Downstream

This gem provides a straightforward way to implement communication between Rails Engines using the Publish-Subscribe pattern. Engines communicate through events instead of calling each other directly, which reduces coupling between them. An event is a recorded object in the system that reflects an action the engine performs and the params that led to its creation.

The gem is inspired by active_event_store and was initially based on its codebase. Unlike active_event_store, it does not store published events in a database, which keeps it simple and fast.

Sponsored by Evil Martians

Installation

Add this line to your application's Gemfile:

gem "downstream", "~> 1.0"

Usage

Downstream provides a much handier interface for building reactive apps. Each event has a strict schema described by a separate class. The gem also ships with convenient tooling for writing tests.

Downstream is designed to support different adapters for event handling. The adapter is configured in a Rails initializer, config/initializers/downstream.rb:

Downstream.configure do |config|
  config.pubsub = :stateless # the default adapter
  config.async_queue = :high_priority # nil by default
end

For now, only one adapter is implemented. The stateless adapter is based on ActiveSupport::Notifications and doesn't store event history anywhere. Subscribers are invoked synchronously by default; a subscriber can opt into background processing with the async option described under "Subscribe to events".
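To make the "stateless and synchronous" idea concrete, here is a minimal plain-Ruby sketch of a publish-subscribe bus that keeps no event history. It is only an illustration of the pattern: TinyBus and its methods are hypothetical names and are not how Downstream or ActiveSupport::Notifications is implemented.

```ruby
# Illustrative sketch only: a stateless, synchronous publish-subscribe bus
# in plain Ruby. TinyBus is a hypothetical name, not part of Downstream.
class TinyBus
  def initialize
    # identifier => list of callables; no event history is kept
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(identifier, callable = nil, &block)
    handler = callable || block
    raise ArgumentError, "handler must respond to #call" unless handler.respond_to?(:call)

    @subscribers[identifier] << handler
    handler
  end

  # Calls every subscriber in registration order before returning.
  def publish(identifier, payload)
    @subscribers[identifier].each { |handler| handler.call(payload) }
    nil
  end
end

bus = TinyBus.new
received = []
bus.subscribe("profile_created") { |payload| received << payload[:user] }
bus.publish("profile_created", {user: "alice"})
bus.publish("profile_deleted", {user: "bob"}) # no subscribers, nothing happens
```

Because publish returns only after every handler has run, a slow or failing subscriber directly affects the publisher. That is the trade-off of a synchronous bus.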

Describe events

Events are represented by event classes, which describe event payloads and identifiers:

class ProfileCreated < Downstream::Event
  # (optional)
  # The event identifier is used for streaming events to subscribers.
  # By default, the identifier is the underscored class name.
  # You only need to set it manually to keep backward compatibility
  # after the class name has changed.
  self.identifier = "profile_created"

  # Add attribute accessors
  attributes :user
end

Each event has predefined (reserved) fields:

  • event_id – unique event id
  • type – event type (=identifier)

NOTE: events should be in the past tense and describe what happened (e.g. "ProfileCreated", "EventPublished", etc.).

Event classes live in the app/events folder.
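The event contract described in this section (declared attributes, a generated event_id, and a type that defaults to the underscored class name) can be sketched in plain Ruby. SketchEvent is a hypothetical stand-in written for illustration; it is not Downstream::Event, and details such as the UUID format and the error raised for unknown attributes are assumptions.

```ruby
require "securerandom"

# Illustrative sketch only: SketchEvent is a hypothetical stand-in that mimics
# the behaviour described above; it is not Downstream::Event.
class SketchEvent
  class << self
    attr_writer :identifier

    # Defaults to the underscored class name, e.g. ProfileCreated -> "profile_created".
    # (Namespaced class names are not handled in this sketch.)
    def identifier
      @identifier ||= name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
    end

    # Declares the payload keys and defines a reader for each of them.
    def attributes(*names)
      declared_attributes.concat(names)
      names.each { |attr| define_method(attr) { @data.fetch(attr) } }
    end

    def declared_attributes
      @declared_attributes ||= []
    end
  end

  attr_reader :event_id

  def initialize(**data)
    unknown = data.keys - self.class.declared_attributes
    raise ArgumentError, "unknown attributes: #{unknown.join(", ")}" unless unknown.empty?

    @event_id = SecureRandom.uuid # assumption: any unique id would do
    @data = data
  end

  # The reserved `type` field mirrors the class-level identifier.
  def type
    self.class.identifier
  end
end

class ProfileCreated < SketchEvent
  attributes :user
end

event = ProfileCreated.new(user: "alice")
```

Here event.type is derived from the class name and event.user reads the declared attribute, while an undeclared key is rejected at construction time.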

Publish events

To publish an event, first create an instance of the event class, then pass it to the Downstream.publish method:

event = ProfileCreated.new(user: user)

# then publish the event
Downstream.publish(event)

That's it! Your event has been propagated to its subscribers.

Subscribe to events

To subscribe a handler to an event, use the Downstream.subscribe method.

You should do this in your app or engine initializer:

# some/engine.rb

initializer "my_engine.subscribe_to_events" do
  # Use the load hook to make sure the event store is initialized
  # `store` == `Downstream`
  ActiveSupport.on_load "downstream-events" do |store|
    store.subscribe MyEventHandler, to: ProfileCreated

    # anonymous handler (can only be synchronous)
    store.subscribe(to: ProfileCreated) do |event|
      # do something
    end

    # you can omit event if your subscriber follows the convention
    # for example, the following subscriber would subscribe to
    # ProfileCreated event
    store.subscribe OnProfileCreated::DoThat
  end
end

NOTE: event handler must be a callable object.

A subscriber can be any callable Ruby object that accepts a single argument (the event). We suggest putting subscribers under app/subscribers/on_<event_type>/<subscriber.rb> (e.g. app/subscribers/on_profile_created/create_chat_user.rb).
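The naming convention that lets you omit the event (OnProfileCreated::DoThat subscribing to ProfileCreated) can be pictured with a small plain-Ruby helper. This is a sketch under the assumption that the event name is taken from the On<Event> namespace; infer_event_name is a hypothetical helper, not part of Downstream's API.

```ruby
# Illustrative sketch only: one way the naming convention could be resolved.
# infer_event_name is a hypothetical helper, not part of Downstream's API.
def infer_event_name(subscriber_name)
  namespace = subscriber_name.split("::").find { |part| part.match?(/\AOn[A-Z]/) }
  raise ArgumentError, "#{subscriber_name} does not follow the On<Event> convention" if namespace.nil?

  namespace.delete_prefix("On")
end

infer_event_name("OnProfileCreated::DoThat") # "ProfileCreated"
```

A subscriber whose name has no On<Event> namespace cannot be resolved this way, so it needs an explicit to: option.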

Sometimes you need a temporary subscription. Downstream.subscribed registers a subscriber only for the duration of a block:

subscriber = ->(event) { my_event_handler(event) }
Downstream.subscribed(subscriber, to: ProfileCreated) do
  some_invocation
end
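The idea behind a block-scoped subscription is that the handler is removed when the block exits, even if the block raises. The plain-Ruby sketch below shows that idea with ensure; ScopedBus is a hypothetical class written for illustration and does not reflect Downstream's internals.

```ruby
# Illustrative sketch only: ScopedBus is a hypothetical class showing how a
# block-scoped subscription can be guaranteed to be removed with `ensure`.
class ScopedBus
  def initialize
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  def publish(identifier, payload)
    # dup so handlers may be added or removed while publishing
    @handlers[identifier].dup.each { |handler| handler.call(payload) }
  end

  # Subscribes `handler` only while the block runs.
  def subscribed(handler, to:)
    @handlers[to] << handler
    yield
  ensure
    @handlers[to].delete(handler)
  end
end

bus = ScopedBus.new
seen = []
handler = ->(payload) { seen << payload }

bus.subscribed(handler, to: "profile_created") do
  bus.publish("profile_created", 1) # delivered
end
bus.publish("profile_created", 2)   # handler already removed
```

This shape is handy in tests, where a subscriber should observe one invocation without leaking into other examples.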

If you want to handle events in a background job, you can pass the async: true option:

store.subscribe OnProfileCreated::DoThat, async: true

By default, the job is enqueued into the queue named by async_queue in the Downstream config. You can set a different queue name for a specific subscriber:

store.subscribe OnProfileCreated::DoThat, async: {queue: :low_priority}

NOTE: all subscribers are synchronous by default.
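The three shapes of the async option (omitted, true, or a hash with a queue) can be summarised with a small plain-Ruby helper. It is only a sketch of the rules stated above: subscription_mode is a hypothetical helper, and the returned hash is an assumption made for illustration.

```ruby
# Illustrative sketch only: subscription_mode is a hypothetical helper that
# mirrors the option shapes described above; it is not Downstream's code.
def subscription_mode(async, default_queue = nil)
  case async
  when nil, false then {async: false}                    # synchronous by default
  when true then {async: true, queue: default_queue}     # use the configured async_queue
  when Hash then {async: true, queue: async.fetch(:queue, default_queue)}
  else raise ArgumentError, "unsupported async option: #{async.inspect}"
  end
end

subscription_mode(nil, :high_priority)                     # synchronous
subscription_mode(true, :high_priority)                    # async, default queue
subscription_mode({queue: :low_priority}, :high_priority)  # async, per-subscriber queue
```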

Testing

You can test subscribers as normal Ruby objects.

First, load testing helpers in the spec_helper.rb:

require "downstream/rspec"

To test that a given subscriber is subscribed to an event, you can do the following:

it "is subscribed to some event" do
  allow(MySubscriberService).to receive(:call)

  event = MyEvent.new(some: "data")

  Downstream.publish event

  expect(MySubscriberService).to have_received(:call).with(event)
end

# for asynchronous subscriptions
it "is subscribed to some event" do
  event = MyEvent.new(some: "data")
  expect { Downstream.publish event }.
    to have_enqueued_async_subscriber_for(MySubscriberService).
    with(event)
end

To test publishing, use the have_published_event matcher:

expect { subject }.to have_published_event(ProfileCreated).with(user: user)

NOTE: have_published_event only supports block expectations.

NOTE 2: the with modifier works like the have_attributes matcher (not contain_exactly).
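The difference between partial matching and exact matching can be shown in plain Ruby: only the attributes you list are compared, and any other attributes on the event are ignored. attributes_match? is a hypothetical helper written for illustration; it is not how the matcher is implemented.

```ruby
# Illustrative sketch only: attributes_match? is a hypothetical helper showing
# partial matching; it is not how the matcher is implemented.
def attributes_match?(actual, expected)
  expected.all? { |key, value| actual.key?(key) && actual[key] == value }
end

payload = {user: "alice", source: "signup"}
attributes_match?(payload, {user: "alice"})              # true: extra keys are ignored
attributes_match?(payload, {user: "alice", plan: "pro"}) # false: :plan is missing
attributes_match?(payload, {user: "bob"})                # false: value differs
```

In practice this means with(user: user) keeps passing when the event later gains extra attributes.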