:advanced_sneakers
adapter for ActiveJob
Drop-in replacement for :sneakers
adapter of ActiveJob. Extra features:
- Tries to handle unrouted messages
- Respects
queue_as
of ActiveJob and defines consumer class per RabbitMQ queue - Supports custom message options
- Allows to run ActiveJob consumers separately from native Sneakers consumers
- Support for
delayed jobs
GuestsCleanupJob.set(wait: 1.week).perform_later(guest)
- Exponential backoff*
- Exposes
#delivery_info
&#headers
AMQP metadata to job
Installation
Add this line to your application's Gemfile:
gem 'advanced-sneakers-activejob'
And then execute:
$ bundle
Or install it yourself as:
$ gem install advanced-sneakers-activejob
Usage
config.active_job.queue_adapter = :advanced_sneakers
Run worker for all queues of ActiveJob
rake sneakers:active_job
Run worker for picked queues of ActiveJob
QUEUES=mailers,foo,bar rake sneakers:active_job
Wildcards are supported for queues names with "words" (separator is .
). Algorithm is similar to the way the topic exchange matches routing keys. *
(star) substitutes for exactly one word. #
(hash) substitutes for zero or more words
QUEUES=mailers,index.*,telemetery.# rake sneakers:active_job
Unrouted messages
If message is published before routing has been configured (e.g. by consumer), it might be lost. To mitigate this problem the adapter uses :mandatory option for publishing messages. RabbitMQ returns unrouted messages back and the publisher is able to handle them:
- Create queue
- Create binding
- Re-publish message
There is a setting handle_unrouted_messages
in configuration to disable this behavior. If it is disabled, publisher will only log unrouted messages.
Take into accout that this process is asynchronous. It means that in case of network failures or process exit unrouted messages could be lost. The adapter tries to postpone application exit up to 5 seconds in case if there are unrouted messages, but it does not provide any guarantees.
Delayed messages are not handled! If job is delayed GuestsCleanupJob.set(wait: 1.week).perform_later(guest)
and there is no proper routing defined at the moment of job execution, it would be lost.
Custom message options
Advanced sneakers adapter allows to set custom message options (e.g. routing keys) on class-level.
class MyJob < ActiveJob::Base
queue_as :some_name
message_options routing_key: 'my.custom.routing.key',
headers: { 'foo' => 'bar' }
def perform(params)
# ProcessData.new(params).call
end
end
Procs are also supported
class MyJob < ActiveJob::Base
queue_as :some_name
message_options routing_key: ->(job) { "process_user_data.#{job.arguments.first.vip? ? 'urgent' : 'regular' }" }
def perform(user)
# ProcessUserData.new(user).call
end
end
And also supports custom message options per job
MyJob.set(priority: 1, headers: { 'foo' => 'bar' }).perform_later('baz')
Read more about message properties:
- https://www.rabbitmq.com/publishers.html#message-properties
- http://reference.rubybunny.info/Bunny/Exchange.html#publish-instance_method
Take into accout that custom message options are used for publishing only.
How to separate ActiveJob consumers
Sneakers comes with rake sneakers:run
task, which would run all consumers (including ActiveJob ones). If you need to run native sneakers consumers apart from ActiveJob consumers:
- Set
activejob_workers_strategy
to:exclude
in configuration - Run
rake sneakers:run
task to run native Sneakers consumers - Run
rake sneakers:active_job
task to run ActiveJob consumers
Tip: if you want to see how consumers are grouped, exec Sneakers::Worker::Classes
in rails console.
Exponential backoff*
The adapter enforces AdvancedSneakersActiveJob::Handler
for ActiveJob consumers. This handler applies exponential backoff if failure is not handled by ActiveJob rescue_from
/retry_on
/discard_on
.
Error name is tracked in x-last-error-name
, error full message is tracked in x-last-error-details
gzipped & encoded by Base64. To decode error details:
ActiveSupport::Gzip.decompress(Base64.decode64(data_from_header))
* For RabbitMQ queues amount optimization exponential backoff is not calculated by formula, but predifined. You can customize retry_delay_proc
in configuration
AMQP metadata
Each message in AMQP comes with delivery_info
and headers
. :advanced_sneakers
adapter provides them on job level.
class SomeComplexJob < ActiveJob::Base
before :perform do |job|
# metadata is available in callbacks
logger.debug({delivery_info: job.delivery_info, headers: job.headers})
end
def perform(msg)
# metadata is available here as well
logger.debug({delivery_info: delivery_info, headers: headers})
end
end
Configuration
AdvancedSneakersActiveJob.configure do |config|
# Should AdvancedSneakersActiveJob try to handle unrouted messages?
# There are still no guarantees that unrouted message is not lost in case of network failure or process exit.
# Delayed unrouted messages are not handled.
config.handle_unrouted_messages = true
# Should Sneakers build-in runner (e.g. `rake sneakers:run`) run ActiveJob consumers?
# :include - yes
# :exclude - no
# :only - Sneakers runner will run _only_ ActiveJob consumers
#
# This setting might be helpful if you want to run ActiveJob consumers apart from native Sneakers consumers.
# In that case set strategy to :exclude and use `rake sneakers:run` for native and `rake sneakers:active_job` for ActiveJob consumers
config.activejob_workers_strategy = :include
# All delayed messages delays are rounded to seconds.
config.delay_proc = ->(timestamp) { (timestamp - Time.now.to_f).round } } # integer result is expected
# Delayed queues can be filtered by this prefix (e.g. delayed:60 - queue for messages with 1 minute delay)
config.delayed_queue_prefix = 'delayed'
# Custom sneakers configuration for ActiveJob publisher & runner
config.sneakers = {
connection: Bunny.new('CUSTOM_URL', with: { other: 'options' }),
exchange: 'activejob',
handler: AdvancedSneakersActiveJob::Handler
}
# Define custom delay for retries, but remember - each unique delay leads to new queue on RabbitMQ side
config.retry_delay_proc = ->(count) { AdvancedSneakersActiveJob::EXPONENTIAL_BACKOFF[count] }
# Connection for publisher (fallbacks to connection of consumers)
config.publish_connection = Bunny.new('CUSTOM_URL', with: { other: 'options' })
# Log level of "rake sneakers:active_job" output
config.log_level = :info
end
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/veeqo/advanced-sneakers-activejob.
License
The gem is available as open source under the terms of the MIT License.
About Veeqo
At Veeqo, our team of Engineers is on a mission to create a world-class Inventory and Shipping platform, built to the highest standards in best coding practices. We are a growing team, looking for other passionate developers to join us on our journey. If you're looking for a career working for one of the most exciting tech companies in ecommerce, we want to hear from you.