LocalBus
LocalBus is a lightweight pub/sub system for Ruby that helps organize and simplify intra-process communication.
Table of Contents
- Why LocalBus?
- Installation
- Quick Start
- Interfaces
- Bus (immediate processing)
- Station (background processing)
- Advanced Usage & Considerations
- Concurrency Controls
- Bus Interface (Async)
- Station Interface (Thread Pool)
- Error Handling & Recovery
- Memory Considerations
- Blocking Operations
- Shutdown & Cleanup
- Limitations
- Concurrency Controls
- See Also
- Sponsors
Why LocalBus?
A message bus (or enterprise service bus) is an architectural pattern that enables different parts of an application to communicate without direct knowledge of each other. Think of it as a smart postal service for your application - components can send messages to topics, and other components can listen for those messages, all without knowing about each other directly.
Even within a single process, this pattern offers powerful benefits:
- Decouple Components: Break complex systems into maintainable parts that can evolve independently
- Single Responsibility: Each component can focus on its core task without handling cross-cutting concerns
- Flexible Architecture: Easily add new features by subscribing to existing events without modifying original code
- Control Flow: Choose immediate or background processing based on your needs
- Testing: Simplified testing as components can be tested in isolation
- Stay Reliable: Built-in error handling and thread safety
- Non-Blocking: Efficient message processing with async I/O
Installation
bundle add local_bus
Quick Start
Interfaces
- Bus: Single-threaded, immediate message delivery using Socketry Async with non-blocking I/O operations
- Station: Multi-threaded message queuing powered by Concurrent Ruby's thread pool, processing messages through the Bus without blocking the main thread
Both interfaces ensure optimal performance:
- Bus leverages async I/O to prevent blocking on network or disk operations
- Station offloads work to a managed thread pool, keeping the main thread responsive
- Both interfaces support an explicit
wait
for subscribers
Bus (immediate processing)
Best for real-time operations like logging, metrics, and state updates.
bus = LocalBus::Bus.new # ... or LocalBus.instance.bus
bus.subscribe "user.created" do |message|
AuditLog.record(message.payload)
true
end
# publish returns a promise-like object that resolves to subscribers
result = bus.publish("user.created", user_id: 123)
result.wait # blocks until all subscribers complete
result.value # blocks and waits until all subscribers complete and returns the subscribers
Subscribe with an explicit callable
.
callable = ->(message) { "Received message: #{message.payload}" }
LocalBus.instance.bus.subscribe "user.created", callable: callable
subscribers = LocalBus.instance.bus.publish("user.created", user_id: 123).value
# => [#<LocalBus::Subscriber:0x0000000126b7cf38 ...>]
subscribers.first.value
# => "Received message: {:user_id=>123}"
# you can use any object that responds to #call
class ExampleCallable
def call(message)
"Received message: #{message.payload}"
end
end
LocalBus.instance.bus.subscribe "user.created", callable: ExampleCallable.new
subscribers = LocalBus.instance.bus.publish("user.created", user_id: 123).value
# => [#<LocalBus::Subscriber:0x0000000126b7cf38 ...>]
subscribers.first.value
# => "Received message: {:user_id=>123}"
Station (background processing)
Best for async operations like emails, notifications, and resource-intensive tasks.
station = LocalBus::Station.new # ... or LocalBus.instance.station
station.subscribe "email.welcome" do |message|
WelcomeMailer.deliver(message.payload[:user_id])
true
end
# Returns a Promise or Future that resolves to subscribers
result = station.publish("email.welcome", user_id: 123)
result.wait # blocks until all subscribers complete
result.value # blocks and waits until all subscribers complete and returns the subscribers
Subscribe with an explicit callable
.
callable = ->(message) { "Received message: #{message.payload}" }
LocalBus.instance.station.subscribe "email.welcome", callable: callable
subscribers = LocalBus.instance.station.publish("email.welcome", user_id: 123).value
# => [#<LocalBus::Subscriber:0x0000000126b7cf38 ...>]
subscribers.first.value
# => "Received message: {:user_id=>123}"
# you can use any object that responds to #call
class ExampleCallable
def call(message)
"Received message: #{message.payload}"
end
end
LocalBus.instance.station.subscribe "email.welcome", callable: ExampleCallable.new
subscribers = LocalBus.instance.station.publish("email.welcome", user_id: 123).value
# => [#<LocalBus::Subscriber:0x0000000126b7cf38 ...>]
subscribers.first.value
# => "Received message: {:user_id=>123}"
Advanced Usage & Considerations
Concurrency Controls
Bus Interface (Async)
The Bus interface uses Async's Semaphore to limit resource consumption:
# Configure concurrency limits for the Bus
bus = LocalBus::Bus.new(max_concurrency: 10)
# The semaphore ensures only N concurrent operations run at once
bus.subscribe "resource.intensive" do |message|
# Only 10 of these will run concurrently
perform_intensive_operation(message)
end
When the max concurrency limit is reached, new publish operations will wait until a slot becomes available. This prevents memory bloat but means you should be mindful of timeouts in your subscribers.
Station Interface (Thread Pool)
The Station interface uses Concurrent Ruby's fixed thread pool with a fallback policy:
# Configure the thread pool size for the Station
station = LocalBus::Station.new(
max_queue: 5_000, # Maximum number of queued items
max_threads: 10, # Maximum pool size
fallback_policy: :caller_runs # Runs on calling thread
)
The fallback policy determines behavior when the thread pool is saturated:
-
:caller_runs
- Executes the task in the publishing thread (can block) -
:abort
- Raises an error -
:discard
- Silently drops the task
Error Handling & Recovery
Both interfaces implement error boundaries to prevent individual subscriber failures from affecting others:
bus.subscribe "user.created" do |message|
raise "Something went wrong!"
true # Never reached
end
bus.subscribe "user.created" do |message|
# This still executes despite the error above
notify_admin(message)
true
end
# The publish operation completes with partial success
result = bus.publish("user.created", user_id: 123)
result.wait
errored_subscribers = result.value.select(&:error)
Memory Considerations
Messages are held in memory until all subscribers complete processing. For the Station interface, this includes time spent in the thread pool queue. Consider this when publishing large payloads or during high load:
# Memory-efficient publishing of large datasets
large_dataset.each_slice(100) do |batch|
station.publish("data.process", items: batch).wait
end
Blocking Operations
The Bus interface uses non-blocking I/O but can still be blocked by CPU-intensive operations:
# Bad - blocks the event loop
bus.subscribe "cpu.intensive" do |message|
perform_heavy_calculation(message)
end
# Better - offload to Station for CPU-intensive work
station.subscribe "cpu.intensive" do |message|
perform_heavy_calculation(message)
end
Shutdown & Cleanup
LocalBus does its best to handle graceful shutdown when the process exits, and works to ensure published messages are processed. However, it's possible that some messages may be lost when the process exits.
Factor for potential message loss when designing your system. For example, idempotency (i.e. messages that can be re-published without unintended side effects).
Limitations
- The Bus interface is single-threaded - long-running subscribers can impact latency
- The Station interface may drop messages if configured with
:discard
fallback policy - No persistence - pending messages may be lost on process restart
- No distributed support - communication limited to single process
- Large payloads can impact memory usage, especially under high load
- No built-in retry mechanism for failed subscribers
Consider these limitations when designing your system architecture.
See Also
- Message Bus - A reliable and robust messaging bus for Ruby and Rack
- Wisper - A micro library providing Ruby objects with Publish-Subscribe capabilities
Sponsors
Proudly sponsored by