ActionSubscriber
ActionSubscriber is a DSL for for easily intergrating your Rails app with a RabbitMQ messaging server.
Requirements
I test on Ruby 2.2.1 and Jruby 9.x. MRI 1.9 and jRuby 1.7 are still supported.
If you want to use MRI 1.9 you will need to lock down the amq-protocol
and bunny
gems to < 2.0
since they both require ruby 2.0+.
Migrating from ActionSubscriber 3.X or earlier
If you were using the --mode=pop
from the 2.X or 3.X version of ActionSubscriber you can get the same sort of behavior by drawing your routes like this:
::ActionSubscriber.draw_routes do
# instead of creating custom threadpools you set the threadpool size of your connection here in the routes
# you can set the threadpool size for the default connection via the `::ActionSubscriber.configuration.threadpool_size = 16`
route UserSubscriber, :created,
:prefetch => 1,
:concurrency => 16,
:acknowledgements => true
# in user_subscriber.rb make sure to set `at_most_once!` like this
#
# class UserSubscriber < ::ActionSubscriber::Base
# at_most_once!
# end
# If you were previously using custom threadpools for different routes you can mimic that behavior by opening multiple connections
connection(:slow_work, :threadpool_size => 32) do
route UserSubscriber, :created,
:prefetch => 1,
:concurrency => 32,
:acknowledgements => true
end
end
That will give you a similar behavior to the old --mode=pop
where messages polled from the server, but with reduced latency.
Supported Message Types
ActionSubscriber support JSON and plain text out of the box, but you can easily add support for any custom message type.
Example
A subscriber is set up by creating a class that inherits from ActionSubscriber::Base.
class UserSubscriber < ::ActionSubscriber::Base
def created
# do something when a user is created
end
end
checkout the examples dir for more detailed examples.
Usage
In your application setup you will draw your subscription routes. In a rails app this is usually done in config/initializers/action_subscriber.rb
.
::ActionSubscriber.draw_routes do
# you can define routes one-by-one for fine-grained controled
route UserSubscriber, :created
# or you can setup default routes for all the public methods in a subscriber
default_routes_for UserSubscriber
end
Now you can start your subscriber process with:
$ bundle exec action_subscriber start
This will connect your subscribers to the rabbitmq broker and allow it to push messages down to your subscribers.
Around Filters
"around" filters are responsible for running their associated actions by yielding, similar to how Rack middlewares work (and Rails around filters work)
class UserSubscriber < ::ActionSubscriber::Base
around_filter :log_things
def created
# do something when a user is created
end
private
def log_things
puts "before I do some stuff"
yield
puts "I did some stuff"
end
end
Warning: an around filter will only be added once to the chain, duplicate around filters are not supported
Configuration
ActionSubscriber needs to know how to connect to your rabbit server to start getting messages.
In an initializer, you can set the host and the port like this :
ActionSubscriber.configure do |config|
config.hosts = ["rabbit1", "rabbit2", "rabbit3"]
config.port = 5672
end
Other configuration options include :
- config.add_decoder - add a custom decoder for a custom content type
- config.allow_low_priority_methods - Subscribe to
*_low
queues in addition to normal queues. - config.connection_reaping_interval - Connection reaping interval when using a project ActiveRecord
- config.connection_reaping_timeout_interval - Connection reaping timeout interval when using a project ActiveRecord
- config.default_exchange - set the default exchange that your queues will use, using the default RabbitMQ exchange is not recommended
- config.error_handler - handle error like you want to handle them!
- config.heartbeat - number of seconds between hearbeats (default 5) see bunny documentation for more details
- config.hosts - an array of hostnames in your cluster (ie
["rabbit1.myapp.com", "rabbit2.myapp.com"]
) - config.network_recovery_interval - reconnection interval for TCP connection failures (default 1)
- config.password - RabbitMQ password (default "guest")
- config.prefetch - number of messages to hold in the local queue in subscriber mode
- config.resubscribe_on_consumer_cancellation - resubscribe when the consumer is cancelled (queue deleted or cluster fails, default true)
- config.seconds_to_wait_for_graceful_shutdown - time to wait before force stopping server after shutdown signal
- config.threadpool_size - set the number of threads available to action_subscriber
- config.timeout - how many seconds to allow rabbit to respond before timing out
- config.tls - true/false whether to use TLS when connecting to the server
- config.tls_ca_certificats - a list of ca certificates to use for verifying the servers TLS certificate
- config.tls_cert - a client certificate to use during the TLS handshake
- config.tls_key - a key to use during the TLS handshake
- config.username - RabbitMQ username (default "guest")
- config.verify_peer - whether to attempt to validate the server's TLS certificate
- config.virtual_host - RabbitMQ virtual host (default "/")
Note: TLS is not handled identically in
bunny
andmarch_hare
. The configuration options we provide are passed through as provided. For details on expected behavior please check thebunny
ormarch_hare
documentation based on whether you are running in MRI or jRuby.
Message Acknowledgment
no_acknolwedgement!
This mode is the default. Rabbit is told to not expect any message acknowledgements so messages will be lost if an error occurs. This also allows the broker to send messages as quickly as it wants down to your subscriber.
Warning: If messages arrive very quickly this could cause your process to crash as your memory fills up with unprocessed message. We highly recommend you use
at_least_once!
mode to provide a throttle so the broker does not overwhelm your process with messages.
manual_acknowledgement!
This mode leaves it up to the subscriber to handle acknowledging or rejecting messages. In your subscriber you can just call acknowledge
, reject
, or nack
.
at_most_once!
Rabbit is told to expect message acknowledgements, but sending the acknowledgement is left up to ActionSubscriber. We send the acknowledgement right before calling your subscriber.
at_least_once!
Rabbit is told to expect message acknowledgements, but sending the acknowledgement is left up to ActionSubscriber. We send the acknowledgement right after calling your subscriber. If an error is raised your message will be retried on a sent back to rabbitmq and retried on an exponential backoff schedule.
safe_nack
If you turn on acknowledgements and a message is not acknowledged by your code manually or using one of the filters above the ErrorHandler
middleware
which wraps the entire block with call nack
this is a last resort so the connection does not get backed up in cases of unexpected or
unhandled errors.
redeliver
A message can be sent to "redeliver" with ::ActionSubscriber::MessageRetry.redeliver_message_with_backoff
or the DSL method redeliver
and optionally
takes a "backoff schedule" which is a hash of backoff milliseconds for each redeliver, the default:
SCHEDULE = {
2 => 100,
3 => 500,
4 => 2_500,
5 => 12_500,
6 => 62_500,
7 => 312_500,
8 => 1_562_500,
9 => 7_812_500,
10 => 39_062_500,
}
when the schedule "returns" nil
the message will not be retried
Warning: If you use
redeliver
you need to handle reject/acknowledge according how errors are handled; if an error is caught and the ack/reject is already done then you may duplicate the message inat_least_once!
mode
Testing
ActionSubscriber includes support for easy unit testing with RSpec.
In your spec_helper.rb:
require 'action_subscriber/rspec'
RSpec.configure do |config|
config.include ::ActionSubscriber::RSpec
end
In your_subscriber_spec.rb :
subject { mock_subscriber }
Your test subject will be an instance of your subscriber class, and you can easily test your public methods without dependence on data from Rabbit. You can optionally pass data for your mock subscriber to consume if you wish.
subject { mock_subscriber(:header => "test_header", :payload => "payload") }
Development
If you want to work on action_subscriber
you will need to have a rabbitmq instance running locally on port 5672 with a management plugin enabled on port 15672. Usually the easiest way to accomplish this is to use docker and run the command:
$ docker run --net=host --rm=true --hostname diagon --name rabbit rabbitmq:3.6.6-management
Now that rabbitmq is running you can clone this project and run:
$ cd action_subscriber
$ bundle install
$ bundle exec rspec