RabbitFeed
** This gem is not maintained anymore **
A gem providing asynchronous event publish and subscribe capabilities with RabbitMQ.
Core concepts
- Fire and forget: An application can publish an event without knowing or caring how that event is consumed.
- Persistent events: Once an event has been published, it will persist until it has been processed successfully.
- Self-describing events: The event not only contains a payload, but also a schema that describes that payload.
- Multiple subscribers: Multiple applications can subscribe to the same events.
- Event versioning: Consumers can customize event handling based on the event version.
Installation
Add this line to your application's Gemfile:
gem 'rabbit_feed'
Configuration
Create a config/rabbit_feed.yml file. The following options should be specified:
environment:
  host: RabbitMQ host
  user: RabbitMQ user name
  password: RabbitMQ password
  application: Application name
Sample:
development:
  host: localhost
  user: guest
  password: guest
  application: beavers
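As a rough illustration of how a per-environment file like the sample above is structured, the following sketch parses it with Ruby's standard YAML library and picks out the settings for one environment. This is not RabbitFeed's own loading code; the `settings_for` helper and the `SAMPLE_CONFIG` constant are hypothetical names used only here.

```ruby
require 'yaml'

# Sample configuration, keyed by environment name (same shape as the sample above).
SAMPLE_CONFIG = <<~YAML
  development:
    host: localhost
    user: guest
    password: guest
    application: beavers
YAML

# Hypothetical helper: returns the settings hash for one environment,
# raising KeyError if that environment is missing from the file.
def settings_for(yaml_text, environment)
  config = YAML.safe_load(yaml_text)
  config.fetch(environment) do
    raise KeyError, "no configuration for environment '#{environment}'"
  end
end

settings = settings_for(SAMPLE_CONFIG, 'development')
puts settings['host']        # localhost
puts settings['application'] # beavers
```

Keying the file by environment lets one file serve development, test and production, with the active section chosen at start-up.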
Configuration options that can be specified are:
Name | Use |
---|---|
host | Hostname or IP of the RabbitMQ server |
hosts | Array of hostnames or IPs of a RabbitMQ cluster |
port | The port to use on the RabbitMQ server |
user | The user to authenticate with the RabbitMQ server |
password | The password to authenticate with the RabbitMQ server |
application | The name of the application - used for routing events to the specified consumers |
environment | The environment of the application - used for routing events to the specified consumers |
exchange | The name of the RabbitMQ exchange to which events are published |
heartbeat | The interval at which to send heartbeats to the RabbitMQ server (in seconds) |
connect_timeout | The timeout (in seconds) for connecting to the RabbitMQ server |
network_recovery_interval | Recovery interval (in seconds) periodic network recovery will use |
auto_delete_queue | If true, any queues created will auto-delete upon disconnect |
auto_delete_exchange | If true, any exchanges created will auto-delete upon disconnect |
route_prefix_extension | If set, the routing key/queue of the exchange will become env.route_prefix_extension.application.event |
consumer_exit_after_fail | If set, consumer will stop consuming as soon as it fails to process an event. By default, it will continue to consume and log the error(s) |
Initialisation
If installing in a Rails application, the following should be defined in config/initializers/rabbit_feed.rb:
# Define the events (if producing)
EventDefinitions do
  define_event('user_creates_beaver', version: '1.0.0') do
    defined_as do
      'A beaver has been created'
    end
    payload_contains do
      field('beaver_name', type: 'string', definition: 'The name of the beaver')
    end
  end
end
# Define the event routing (if consuming)
EventRouting do
  accept_from('beavers') do
    event('foo') do |event|
      # Do something...
    end
  end
end
You may also override the log location (defaults to log/rabbit_feed.log if a log directory exists, else STDOUT), the environment (defaults to the RAILS_ENV or RACK_ENV), and the path to the RabbitFeed config file (defaults to config/rabbit_feed.yml) in the initializer, like this:
RabbitFeed.instance_eval do
  self.log = Logger.new(Rails.root.join('log', 'rabbit_feed.log'))
  self.environment = Rails.env
  self.configuration_file_path = Rails.root.join('config', 'rabbit_feed.yml')
end
Logging in JSON
RabbitFeed log messages are constructed in such a way that they are friendly to JSON log formats. This is done to simplify log aggregation with tools like Kibana. To log in JSON format, set the RabbitFeed log to use the JSON formatter, e.g.
RabbitFeed.log.formatter = RabbitFeed::JsonLogFormatter
Producing events
The producer defines the events and their payloads using the Event Definitions DSL. In a Rails app, this can be defined in the initialiser.
To produce an event:
require 'rabbit_feed'
RabbitFeed::Producer.publish_event 'Event name', { 'payload_field' => 'payload field value' }
Event name: This tells you what the event is.
Event payload: This is the data about the event. This should be a hash.
The event will be published to the configured exchange on RabbitMQ (amq.topic by default) with a routing key having the pattern of: [environment].[producer application name].[event name].
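The routing key pattern can be sketched as a simple string join. This is an illustration only, not RabbitFeed's internal code; `routing_key_for` is a hypothetical helper introduced for this example.

```ruby
# Hypothetical helper that builds a routing key following the
# [environment].[producer application name].[event name] pattern.
def routing_key_for(environment:, application:, event_name:)
  [environment, application, event_name].join('.')
end

puts routing_key_for(environment: 'development',
                     application: 'beavers',
                     event_name: 'user_creates_beaver')
# prints: development.beavers.user_creates_beaver
```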
Returned Events
In the case that there are no consumers configured to subscribe to an event, the event will be returned to the producer. The returned event will be logged, and if your project uses Airbrake (v5.0+) or Airbrake-Ruby, an error will be reported there.
Testing the Producer
To prevent RabbitFeed from publishing events to RabbitMQ during tests, add the following to spec_helper.rb:
RSpec.configure do |config|
  RabbitFeed::TestingSupport.capture_published_events(config)
end
Or, if using Cucumber, put this in your env.rb:
Before do
  RabbitFeed::TestingSupport.capture_published_events_in_context(self)
end
RSpec
To verify that your application publishes an event, use the custom RSpec matcher provided with this application.
To make the custom RSpec matcher available to your tests, add the following to spec_helper.rb:
RSpec.configure do |config|
  RabbitFeed::TestingSupport.setup(config)
end
Validating an event was published by event name:
require 'spec_helper'
describe BeaversController do
  describe 'POST create' do
    it 'publishes a create event' do
      expect {
        post :create, beaver: { name: 'beaver' }
      }.to publish_event('user_creates_beaver')
    end
  end
end
Validating an event was published by event name and payload:
require 'spec_helper'
describe BeaversController do
  describe 'POST create' do
    it 'publishes a create event' do
      expect {
        post :create, beaver: { name: 'beaver' }
      }.to publish_event('user_creates_beaver', 'beaver_name' => 'beaver')
    end
  end
end
Validating an event was published by event name and partial payload match:
require 'spec_helper'
describe BeaversController do
  describe 'POST create' do
    it 'publishes a create event' do
      expect {
        post :create, beaver: { name: 'beaver' }
      }.to publish_event('user_creates_beaver').including('beaver_name' => 'beaver')
    end
  end
end
You can also use the .asserting chain, allowing you to provide your own expectation on the event's payload:
it 'publishes a create event' do
  expect {
    post :create, beaver: { name: 'beaver' }
  }.to publish_event('user_creates_beaver', nil).asserting do |payload|
    expect(payload['beaver_name']).to match(/ea/)
  end
end
Consuming events
The consumer defines which events it subscribes to, as well as how it handles them, using the Event Routing DSL. In a Rails app, this can be defined in the initialiser.
An Event contains the following information:
- metadata: Information about the event, including:
  - environment: The environment in which the event was created (e.g. development, test, production)
  - application: The name of the application that generated the event (as specified in rabbit_feed.yml)
  - version: The version of the event payload (as specified in the event definition)
  - name: The name of the event
  - host: The hostname of the server on which the event was generated
  - created_at_utc: The time (in UTC) that the event was created
  - schema_version: The version of the event schema
- payload: The payload of the event
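Because the payload version is carried in the metadata, a handler can branch on it. The following is a minimal sketch of that idea using a stand-in event object rather than the gem itself. It assumes the metadata and payload are hashes with string keys, as in the `RabbitFeed::Event.new` example in "Testing the Consumer". `SketchEvent` and `describe_beaver` are hypothetical names.

```ruby
# Stand-in for an event, used only for this sketch. It assumes metadata and
# payload are plain hashes with string keys.
SketchEvent = Struct.new(:metadata, :payload)

# Hypothetical handler that branches on the major version of the event payload.
def describe_beaver(event)
  major = event.metadata.fetch('version').split('.').first.to_i
  case major
  when 1 then "Beaver: #{event.payload['beaver_name']}"
  else        "Unsupported event version #{event.metadata['version']}"
  end
end

v1 = SketchEvent.new({ 'name' => 'user_creates_beaver', 'version' => '1.0.0' },
                     { 'beaver_name' => 'Bucky' })
v9 = SketchEvent.new({ 'name' => 'user_creates_beaver', 'version' => '9.0.0' }, {})

puts describe_beaver(v1) # Beaver: Bucky
puts describe_beaver(v9) # Unsupported event version 9.0.0
```

Inside a real route, the same branching would sit in the block passed to `event(...)` in the Event Routing DSL.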
Running the consumer
bundle exec rabbit_feed consume --environment development
More information about the consumer command line options can be found in the Command Line Tools section.
Event Processing Errors
In the case that your consumer raises an error whilst processing an event, the error will be logged. If your project uses Airbrake (v5.0+) or Airbrake-Ruby, the error will also be reported there. The event that was being processed will remain on the RabbitMQ queue, and will be redelivered to the consumer until it is processed without error.
Testing the Consumer
If you want to test that your routes are behaving as expected without actually using RabbitMQ, you can invoke rabbit_feed_consumer.consume_event(event). The following is an example:
describe 'consuming events' do
  accumulator = []
  let(:define_route) do
    EventRouting do
      accept_from('application_name') do
        event('event_name') do |event|
          accumulator << event
        end
      end
    end
  end
  let(:event) { RabbitFeed::Event.new({'application' => 'application_name', 'name' => 'event_name'}, {'stuff' => 'some_stuff'}) }
  before { define_route }
  it 'routes to the correct service' do
    rabbit_feed_consumer.consume_event(event)
    expect(accumulator.size).to eq(1)
  end
end
To make the rabbit_feed_consumer method available to your tests, add the following to spec_helper.rb:
RSpec.configure do |config|
  RabbitFeed::TestingSupport.setup(config)
end
Command Line Tools
Event Publish
bundle exec bin/rabbit_feed produce --payload 'Event payload' --name 'Event name' --environment test --config spec/fixtures/configuration.yml --logfile test.log --require rabbit_feed.rb --verbose
Publishes an event. Note: until you've specified the event definitions, this will not publish any events. Options are as follows:
--payload The payload of the event
--name The name of the event
--environment The environment to run in
--config The location of the rabbit_feed configuration file
--logfile The location of the log file
--require The project file containing the dependencies
--verbose Turns on DEBUG logging
--help Print the available options
Consumer
bundle exec bin/rabbit_feed consume --environment test --config spec/fixtures/configuration.yml --logfile test.log --require rabbit_feed.rb --pidfile rabbit_feed.pid --verbose --daemon
Starts a consumer. Note: until you've specified the event routing, this will not receive any events. Options are as follows:
--environment The environment to run in
--application The name of the application (used for routing events)
--config The location of the rabbit_feed configuration file
--logfile The location of the log file
--require The project file containing the dependencies (only necessary if running with a non-Rails application)
--pidfile The location at which to write a pid file
--verbose Turns on DEBUG logging
--daemon Run the consumer as a daemon
--help Print the available options
Stopping the consumer
bundle exec bin/rabbit_feed shutdown
This only applies if you've started the consumer with the --daemon option.
Console Consumer
bundle exec bin/rabbit_feed console --environment development --config spec/fixtures/configuration.yml --logfile development.log --pidfile rabbit_feed.pid --verbose
The console consumer will accept any event from any application and will print the event metadata and payload to the console. This is useful during development to get instant feedback on the events being published.
Example Console Output
RabbitFeed console starting at 2016-02-08 16:22:22 UTC...
Environment: development
Queue: development.rabbit_feed_console
There are currently 4 message(s) in the console's queue.
Would you like to purge the queue before proceeding? (y/N)>
n
Ready. Press CTRL+C to exit.
----------------------------user_creates_beaver: 2016-02-08 16:19:30 UTC----------------------------
#Event metadata
application: rails_app
created_at_utc: 2016-02-08T16:19:30.530210Z
environment: development
host: localhost
name: user_creates_beaver
schema_version: 2.0.0
version: 1.0.0
****************************************************************************************************
#Event payload
beaver_name: 02/08/16 16:19:30
----------------------------------------------------------------------------------------------------
1 events received.
Event Definitions DSL
Provides a means to define all events that are published by an application. Defines the event names and the payload associated with each event. The DSL is converted into a schema that is serialized along with the event payload, meaning the events are self-describing. This is accomplished using Apache Avro. This also validates the event payload against its schema before it is published.
Event definitions are cumulative, meaning you can load multiple EventDefinitions blocks.
Here is an example DSL:
EventDefinitions do
  define_event('user_creates_beaver', version: '1.0.0') do
    defined_as do
      'A beaver has been created'
    end
    payload_contains do
      field('beaver_name', type: 'string', definition: 'The name of the beaver')
    end
  end
  define_event('user_updates_beaver', version: '1.0.0') do
    defined_as do
      'A beaver has been updated'
    end
    payload_contains do
      field('beaver_name', type: 'string', definition: 'The name of the beaver')
    end
  end
end
This defines two events:
user_creates_beaver
user_updates_beaver
Each event has a mandatory string field in its payload, called beaver_name.
The available field types are described here.
Publishing a user_creates_beaver event would look like this:
RabbitFeed::Producer.publish_event 'user_creates_beaver', { 'beaver_name' => @beaver.name }
Event Routing DSL
Provides a means for a consumer to specify which events it subscribes to, as well as how it handles them. This is accomplished using a custom DSL backed by a RabbitMQ topic exchange.
Event routing definitions are cumulative, meaning you can load multiple EventRouting blocks.
Here is an example DSL:
EventRouting do
  accept_from('beavers') do
    event('user_creates_beaver') do |event|
      puts event.payload
    end
    event('user_updates_beaver') do |event|
      puts event.payload
    end
  end
end
This will subscribe to the specified events originating from the beavers application. We have specified that we would like to subscribe to user_creates_beaver and user_updates_beaver events. If either event type is received, its payload will be printed to the screen.
When the consumer is started, it will create its queue named using this pattern: [environment].[consumer application name]. It will bind the queue to the amq.topic exchange on the routing keys as defined in the event routing. In this example, it will bind on:
environment.beavers.user_creates_beaver
environment.beavers.user_updates_beaver
Note: The consumer queues will automatically expire (delete) after 7 days without any consumer connections. This is to prevent unused queues from hanging around once their associated consumer has been terminated.
Wildcards
Applications that wish to handle events from any application or wish to handle any event can achieve this using the :any key.
For example, the following would consume any event published by RabbitFeed:
EventRouting do
  accept_from(:any) do
    event(:any) do |event|
      puts event.payload
    end
  end
end
In this example, the consumer queue will bind to the specified exchange on the following routing keys:
environment.*.*
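On a RabbitMQ topic exchange, `*` in a binding key matches exactly one dot-separated word. The sketch below imitates that matching in plain Ruby to show which routing keys a binding such as `development.*.*` would accept. It is an approximation for illustration, not broker or RabbitFeed code. The helper names are hypothetical, and the `#` wildcard (zero or more words) is deliberately left out.

```ruby
# Converts a topic binding key into a regular expression. Only the '*'
# wildcard (exactly one dot-separated word) is handled in this sketch.
def binding_regex(binding_key)
  parts = binding_key.split('.').map do |word|
    word == '*' ? '[^.]+' : Regexp.escape(word)
  end
  Regexp.new("\\A#{parts.join('\\.')}\\z")
end

# True if the routing key would be matched by the binding key.
def matches_binding?(binding_key, routing_key)
  binding_regex(binding_key).match?(routing_key)
end

puts matches_binding?('development.*.*', 'development.beavers.user_creates_beaver') # true
puts matches_binding?('development.*.*', 'production.beavers.user_creates_beaver')  # false
puts matches_binding?('development.*.*', 'development.beavers')                     # false
```

The last case shows why two `*` segments are needed: each wildcard stands in for exactly one word, so a two-word key does not match a three-word binding.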
Delivery Semantics
RabbitFeed provides 'at least once' delivery semantics. There are two use-cases where an event may be delivered more than once:
- If the subscriber raises an exception whilst processing an event, RabbitFeed will re-deliver the event to the subscriber until the event is processed without error.
- If an event is pushed to the subscriber, and the subscriber loses connectivity with RabbitMQ before it can send an acknowledgement back to RabbitMQ, RabbitMQ will push the event again once connectivity has been restored.
It is advisable to run RabbitFeed in a clustered RabbitMQ environment to prevent the loss of messages in the case that a RabbitMQ node is lost. By default, RabbitFeed will declare queues to be mirrored across all nodes of the cluster.
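Since an event may arrive more than once, handlers are usually written to be idempotent. The sketch below shows one possible approach: remembering which events have already been processed and skipping repeats. It is not part of RabbitFeed. The `IdempotentHandler` class is hypothetical, the de-duplication key built from metadata fields is an assumption (the metadata described above has no dedicated unique ID), and the in-memory set would need to be a persistent store in a real deployment.

```ruby
require 'set'

# Wraps a handler so that an event that was already processed is skipped.
class IdempotentHandler
  def initialize(&handler)
    @handler = handler
    @seen = Set.new
  end

  # Returns true if the handler ran, false if the event was a duplicate.
  def call(metadata, payload)
    # Assumed de-duplication key; choose whatever uniquely identifies an event.
    key = metadata.values_at('application', 'name', 'host', 'created_at_utc')
    return false if @seen.include?(key)

    @handler.call(metadata, payload)
    # Mark as seen only after success, so a failed event is retried on redelivery.
    @seen.add(key)
    true
  end
end

processed = []
handler = IdempotentHandler.new { |_metadata, payload| processed << payload }
metadata = { 'application' => 'beavers', 'name' => 'user_creates_beaver',
             'host' => 'localhost', 'created_at_utc' => '2016-02-08T16:19:30.530210Z' }

handler.call(metadata, { 'beaver_name' => 'Bucky' }) # handler runs
handler.call(metadata, { 'beaver_name' => 'Bucky' }) # duplicate, skipped
puts processed.size # 1
```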
Thread Safety
RabbitFeed event publishing is thread-safe. This is achieved by only allowing one thread to publish an event at any given time.
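One common way to allow only one publishing thread at a time is to guard the publish call with a Mutex. The sketch below shows that general technique with a stand-in publisher. It is not RabbitFeed's actual implementation, and `SerializedPublisher` is a hypothetical class that records events in an array instead of talking to RabbitMQ.

```ruby
# Stand-in publisher: a Mutex ensures only one thread is inside #publish at a time.
class SerializedPublisher
  attr_reader :published

  def initialize
    @lock = Mutex.new
    @published = []
  end

  def publish(name, payload)
    @lock.synchronize do
      # A real publisher would write to the RabbitMQ channel here.
      @published << [name, payload]
    end
  end
end

publisher = SerializedPublisher.new
threads = 4.times.map do |i|
  Thread.new do
    25.times { |n| publisher.publish('user_creates_beaver', { 'beaver_name' => "beaver-#{i}-#{n}" }) }
  end
end
threads.each(&:join)
puts publisher.published.size # 100
```

Serializing publishes this way trades some throughput for simplicity: threads queue up on the lock rather than sharing the underlying connection concurrently.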
Developing
See ./DEVELOPING.md for instructions on how to develop RabbitFeed.