Emque::Consuming
Emque Consuming is a Ruby application framework that includes everything needed to create and run an application capable of consuming messages from a message broker in a pub/sub architecture. Messages can be produced with the emque-producing library.
Adapters
We currently only support RabbitMQ. If you would like to add your own adapter, take a look at the adapters directory.
Installation
Add this line to your application's Gemfile:
gem "emque-consuming"
# make sure you have bunny for rabbitmq unless you're using a custom adapter
gem "bunny", "~> 2.11.0"
And then execute:
$ bundle
Or install it yourself as:
$ gem install emque-consuming
Setup
Easy
Using the new generator is the easiest way to get up and running quickly.
emque <options> new name
This command creates a directory "name" containing the barebones directories and files you'll need to get started. You can also use the following command line options to customize your application's configuration. Any options you pass are written to the generated config/application.rb file.
emque <options> (start|stop|new|console|help) <name (new only)>
# ...
-S, --socket PATH PATH to the application's unix socket
-b, --bind IP:PORT IP & port for the http status application to listen on.
# ...
-e, --error-limit N Set the max errors before application suicide
-s, --status Run the http status application
-x, --error-expiration SECONDS Expire errors after SECONDS
--app-name NAME Run the application as NAME
# ...
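To make the relationship between --error-limit and --error-expiration concrete, here is a minimal plain-Ruby sketch of an expiring error counter. It is an illustration only, not Emque's internal implementation: the ErrorWindow class, its method names, and the choice to treat the limit as reached when the unexpired count equals it are all assumptions.

```ruby
# Hypothetical illustration: NOT Emque's internal error tracker.
# Errors older than `expiration` seconds stop counting toward `limit`.
class ErrorWindow
  def initialize(limit:, expiration:)
    @limit = limit
    @expiration = expiration
    @timestamps = []
  end

  # Record an error that happened at time `now` (in seconds).
  def record(now)
    @timestamps << now
  end

  # Number of errors still inside the expiration window at `now`.
  def count(now)
    @timestamps.count { |t| now - t < @expiration }
  end

  # True once the unexpired error count reaches the limit (assumed semantics).
  def limit_reached?(now)
    count(now) >= @limit
  end
end

window = ErrorWindow.new(limit: 3, expiration: 300)
window.record(0)
window.record(10)
window.record(20)
puts window.limit_reached?(30)   # three unexpired errors are counted
puts window.limit_reached?(305)  # the error recorded at 0 has expired
```

In this sketch a higher expiration makes old errors count for longer, so the limit is reached sooner under a steady trickle of failures.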
Automatic Error Retries
Version 1.2.0 introduced the ability to automatically retry specific types of errors. This depends on the RabbitMQ Delayed Message Exchange plugin. After installing the plugin, configure Emque::Consuming to use it:
config.enable_delayed_message = true # turn on retryable errors, defaults to false
config.retryable_errors = ["ExampleError"] # an array of strings naming the errors to retry, defaults to an empty array ([])
config.retryable_error_limit = 4 # the number of retries to use before dead lettering the message, defaults to 3
config.delayed_message_workers = 3 # the number of workers processing delayed messages, defaults to 1
This will now allow you to retry known errors with an exponential backoff, which should help with transient errors!
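As a rough picture of what exponential backoff with a retry limit looks like, consider the plain-Ruby sketch below. The base delay, the doubling factor, and the helper names backoff_delay_ms and retry? are assumptions made for illustration; the delays Emque actually uses are not stated here.

```ruby
# Hypothetical illustration of exponential backoff; the base delay and
# doubling factor are assumptions, not values taken from Emque.
BASE_DELAY_MS = 1_000

# Delay before retry number `attempt` (1-based): base * 2^(attempt - 1).
def backoff_delay_ms(attempt)
  BASE_DELAY_MS * (2**(attempt - 1))
end

# Retry only the listed error names, and stop once the limit is used up.
def retry?(error_name, attempt, retryable_errors:, limit:)
  retryable_errors.include?(error_name) && attempt <= limit
end

retryable = ["ExampleError"]
(1..5).each do |attempt|
  if retry?("ExampleError", attempt, retryable_errors: retryable, limit: 4)
    puts "retry #{attempt} after #{backoff_delay_ms(attempt)} ms"
  else
    puts "retry limit reached; the message would be dead lettered"
  end
end
```

With these assumed numbers, the waits grow as 1, 2, 4, and 8 seconds, which gives a transient failure more time to clear on each attempt.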
Custom
Configure Emque::Consuming in your config/application.rb file. Here is an example:
# config/application.rb
require "emque/consuming"
module Example
  class Application
    include Emque::Consuming::Application
    initialize_core!
    config.set_adapter(:rabbit_mq, :url => ENV["RABBITMQ_URL"], :prefetch => 10)
    # customize the error thresholds
    # config.error_limit = 10
    # config.error_expiration = 300
    # enable the status application
    # config.status = :on
    # errors will be logged, but if more is needed, that can be added
    # config.error_handlers << Proc.new {|ex, context|
    #   send an email, send to HoneyBadger, etc
    # }
    # do something when shutdown occurs
    # config.shutdown_handlers << Proc.new { |context|
    #   notify slack, pagerduty or send an email
    # }
  end
end
You'll also want to set up a routes.rb file:
# config/routes.rb
Example::Application.router.map do
  topic "events" => EventsConsumer do
    map "events.new" => "new_event"
    # ...
  end
  # ...
end
and a consumer for each topic:
# app/consumers/events_consumer.rb
class EventsConsumer
  include Emque::Consuming.consumer
  def new_event(message)
    # NOTE: message is an immutable Virtus (https://github.com/solnic/virtus) Value Object.
    # Check it out here: https://github.com/emque/emque-consuming/blob/main/lib/emque/consuming/message.rb
    # You don't have to use pipe (https://github.com/teamsnap/pipe-ruby), but we love it!
    pipe(message, :through => [
      :shared_action, :do_something_with_new_event
    ])
  end
  private
  def shared_action(message)
    # ...
  end
  def do_something_with_new_event(message)
    # ...
  end
end
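The flow described by the route map and the consumer above can be pictured with a small, dependency-free sketch: a message type is looked up, the matching method is called, and the message is passed through a list of steps in order. This is not the Emque router or the pipe-ruby implementation. ROUTES, SketchConsumer, consume, and through are hypothetical names, and a plain Hash stands in for the real message object.

```ruby
# Hypothetical illustration: plain Ruby stand-ins, not the Emque router
# or the pipe-ruby implementation.
ROUTES = { "events.new" => :new_event }.freeze

class SketchConsumer
  # Look up the handler for a message type and call it; unmapped
  # types are ignored in this sketch.
  def consume(type, message)
    handler = ROUTES[type]
    handler ? public_send(handler, message) : nil
  end

  def new_event(message)
    through(message, [:shared_action, :do_something_with_new_event])
  end

  private

  # Pass the value through each step in order, feeding each result
  # into the next step.
  def through(value, steps)
    steps.reduce(value) { |acc, step| send(step, acc) }
  end

  def shared_action(message)
    message.merge(seen: true)
  end

  def do_something_with_new_event(message)
    message.merge(handled: true)
  end
end

result = SketchConsumer.new.consume("events.new", { id: 1 })
puts result.inspect
```

Each step returns a new Hash rather than mutating its input, which mirrors the note above that the real message is immutable.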
Usage
Emque::Consuming provides a command line interface:
$ bundle exec emque help
emque <options> (start|stop|new|console|help) <name (new only)>
-P, --pidfile PATH Store pid in PATH
-S, --socket PATH PATH to the application's unix socket
-b, --bind IP:PORT IP & port for the http status application to listen on.
-d, --daemon Daemonize the application
-e, --error-limit N Set the max errors before application suicide
-s, --status Run the http status application
-x, --error-expiration SECONDS Expire errors after SECONDS
--app-name NAME Run the application as NAME
--env (ex. production) Set the application environment, overrides EMQUE_ENV
--auto-shutdown (false|true) Enable or disable auto shutdown on reaching the error limit
and a series of rake commands:
$ bundle exec rake -T
rake emque:configuration # Show the current configuration of a running instance (accepts SOCKET)
rake emque:console # Start a pry console
rake emque:errors:clear # Clear all outstanding errors (accepts SOCKET)
rake emque:errors:expire_after # Change the number of seconds to SECONDS before future errors expire (accepts SOCKET)
rake emque:errors:limit:down # Decrease the error limit (accepts SOCKET)
rake emque:errors:limit:up # Increase the error limit (accepts SOCKET)
rake emque:restart # Restart the workers inside a running instance (does not reload code; accepts SOCKET)
rake emque:routes # Show the available routes
rake emque:start # Start a new instance (accepts PIDFILE, DAEMON)
rake emque:status # Show the current status of a running instance (accepts SOCKET)
rake emque:stop # Stop a running instance (accepts SOCKET)
To use the rake commands, add the following lines to your application's Rakefile:
require_relative "config/application"
require "emque/consuming/tasks"
Tests
Testing is a bit sparse at the moment, but we're working on it.
To run tests...
osnp run setup
osnp run start
bundle install
bundle exec rspec
Contributing
FIRST: Read our style guides at https://github.com/teamsnap/guides/tree/master/ruby
- Fork it ( http://github.com/emque/emque-consuming/fork )
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create new Pull Request