Project

local_bus

0.02
The project is in a healthy, maintained state
LocalBus is a lightweight yet powerful pub/sub system for Ruby applications that enables decoupled communication within a single process. It offers both non-blocking I/O and thread pool processing modes, robust error handling, and fine-grained concurrency controls. Perfect for organizing event-driven architectures, handling background jobs, and managing complex workflows without external dependencies."
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
 Dependencies
 Project Readme

Lines of Code GEM Version GEM Downloads Tests Ruby Style Sponsors Twitter Follow

LocalBus

LocalBus is a lightweight pub/sub system for Ruby that helps organize and simplify intra-process communication.

Table of Contents

  • Why LocalBus?
  • Installation
  • Quick Start
    • Interfaces
    • Bus (immediate processing)
    • Station (background processing)
  • Advanced Usage & Considerations
    • Concurrency Controls
      • Bus Interface (Async)
      • Station Interface (Thread Pool)
    • Error Handling & Recovery
    • Memory Considerations
    • Blocking Operations
    • Shutdown & Cleanup
    • Limitations
  • See Also
  • Sponsors

Why LocalBus?

A message bus (or enterprise service bus) is an architectural pattern that enables different parts of an application to communicate without direct knowledge of each other. Think of it as a smart postal service for your application - components can send messages to topics, and other components can listen for those messages, all without knowing about each other directly.

Even within a single process, this pattern offers powerful benefits:

  • Decouple Components: Break complex systems into maintainable parts that can evolve independently
  • Single Responsibility: Each component can focus on its core task without handling cross-cutting concerns
  • Flexible Architecture: Easily add new features by subscribing to existing events without modifying original code
  • Control Flow: Choose immediate or background processing based on your needs
  • Testing: Simplified testing as components can be tested in isolation
  • Stay Reliable: Built-in error handling and thread safety
  • Non-Blocking: Efficient message processing with async I/O

Installation

bundle add local_bus

Quick Start

Interfaces

  • Bus: Single-threaded, immediate message delivery using Socketry Async with non-blocking I/O operations
  • Station: Multi-threaded message queuing powered by Concurrent Ruby's thread pool, processing messages through the Bus without blocking the main thread

Both interfaces ensure optimal performance:

  • Bus leverages async I/O to prevent blocking on network or disk operations
  • Station offloads work to a managed thread pool, keeping the main thread responsive
  • Both interfaces support an explicit wait for subscribers

Bus (immediate processing)

Best for real-time operations like logging, metrics, and state updates.

bus = LocalBus::Bus.new # ... or LocalBus.instance.bus

bus.subscribe "user.created" do |message|
  AuditLog.record(message.payload)
  true
end

# publish returns a promise-like object that resolves to subscribers
result = bus.publish("user.created", user_id: 123)

result.wait  # blocks until all subscribers complete
result.value # blocks and waits until all subscribers complete and returns the subscribers

Subscribe with an explicit callable.

callable = ->(message) { "Received message: #{message.payload}" }
LocalBus.instance.bus.subscribe "user.created", callable: callable

subscribers = LocalBus.instance.bus.publish("user.created", user_id: 123).value
# => [#<LocalBus::Subscriber:0x0000000126b7cf38 ...>]

subscribers.first.value
# => "Received message: {:user_id=>123}"

# you can use any object that responds to #call
class ExampleCallable
  def call(message)
    "Received message: #{message.payload}"
  end
end

LocalBus.instance.bus.subscribe "user.created", callable: ExampleCallable.new
subscribers = LocalBus.instance.bus.publish("user.created", user_id: 123).value
# => [#<LocalBus::Subscriber:0x0000000126b7cf38 ...>]

subscribers.first.value
# => "Received message: {:user_id=>123}"

Station (background processing)

Best for async operations like emails, notifications, and resource-intensive tasks.

station = LocalBus::Station.new # ... or LocalBus.instance.station

station.subscribe "email.welcome" do |message|
  WelcomeMailer.deliver(message.payload[:user_id])
  true
end

# Returns a Promise or Future that resolves to subscribers
result = station.publish("email.welcome", user_id: 123)

result.wait  # blocks until all subscribers complete
result.value # blocks and waits until all subscribers complete and returns the subscribers

Subscribe with an explicit callable.

callable = ->(message) { "Received message: #{message.payload}" }
LocalBus.instance.station.subscribe "email.welcome", callable: callable

subscribers = LocalBus.instance.station.publish("email.welcome", user_id: 123).value
# => [#<LocalBus::Subscriber:0x0000000126b7cf38 ...>]

subscribers.first.value
# => "Received message: {:user_id=>123}"

# you can use any object that responds to #call
class ExampleCallable
  def call(message)
    "Received message: #{message.payload}"
  end
end

LocalBus.instance.station.subscribe "email.welcome", callable: ExampleCallable.new
subscribers = LocalBus.instance.station.publish("email.welcome", user_id: 123).value
# => [#<LocalBus::Subscriber:0x0000000126b7cf38 ...>]

subscribers.first.value
# => "Received message: {:user_id=>123}"

Advanced Usage & Considerations

Concurrency Controls

Bus Interface (Async)

The Bus interface uses Async's Semaphore to limit resource consumption:

# Configure concurrency limits for the Bus
bus = LocalBus::Bus.new(max_concurrency: 10)

# The semaphore ensures only N concurrent operations run at once
bus.subscribe "resource.intensive" do |message|
  # Only 10 of these will run concurrently
  perform_intensive_operation(message)
end

When the max concurrency limit is reached, new publish operations will wait until a slot becomes available. This prevents memory bloat but means you should be mindful of timeouts in your subscribers.

Station Interface (Thread Pool)

The Station interface uses Concurrent Ruby's fixed thread pool with a fallback policy:

# Configure the thread pool size for the Station
station = LocalBus::Station.new(
  max_queue: 5_000, # Maximum number of queued items
  max_threads: 10, # Maximum pool size
  fallback_policy: :caller_runs # Runs on calling thread
)

The fallback policy determines behavior when the thread pool is saturated:

  • :caller_runs - Executes the task in the publishing thread (can block)
  • :abort - Raises an error
  • :discard - Silently drops the task

Error Handling & Recovery

Both interfaces implement error boundaries to prevent individual subscriber failures from affecting others:

bus.subscribe "user.created" do |message|
  raise "Something went wrong!"
  true # Never reached
end

bus.subscribe "user.created" do |message|
  # This still executes despite the error above
  notify_admin(message)
  true
end

# The publish operation completes with partial success
result = bus.publish("user.created", user_id: 123)
result.wait
errored_subscribers = result.value.select(&:error)

Memory Considerations

Messages are held in memory until all subscribers complete processing. For the Station interface, this includes time spent in the thread pool queue. Consider this when publishing large payloads or during high load:

# Memory-efficient publishing of large datasets
large_dataset.each_slice(100) do |batch|
  station.publish("data.process", items: batch).wait
end

Blocking Operations

The Bus interface uses non-blocking I/O but can still be blocked by CPU-intensive operations:

# Bad - blocks the event loop
bus.subscribe "cpu.intensive" do |message|
  perform_heavy_calculation(message)
end

# Better - offload to Station for CPU-intensive work
station.subscribe "cpu.intensive" do |message|
  perform_heavy_calculation(message)
end

Shutdown & Cleanup

LocalBus does its best to handle graceful shutdown when the process exits, and works to ensure published messages are processed. However, it's possible that some messages may be lost when the process exits.

Factor for potential message loss when designing your system. For example, idempotency (i.e. messages that can be re-published without unintended side effects).

Limitations

  • The Bus interface is single-threaded - long-running subscribers can impact latency
  • The Station interface may drop messages if configured with :discard fallback policy
  • No persistence - pending messages may be lost on process restart
  • No distributed support - communication limited to single process
  • Large payloads can impact memory usage, especially under high load
  • No built-in retry mechanism for failed subscribers

Consider these limitations when designing your system architecture.

See Also

  • Message Bus - A reliable and robust messaging bus for Ruby and Rack
  • Wisper - A micro library providing Ruby objects with Publish-Subscribe capabilities

Sponsors

Proudly sponsored by