Minion: super simple job queue over amqp
Minion makes processing jobs over AMQP simple and easy.
Warning
I got issues with this minion version with latest rabbit (>1.0.8) and AMQP, getting errors at setup. I do not use anymore this code, using directly the AMQP code.
Setup
Minion pulls the AMQP credentials out the environment via AMQP_URL.
$ export AMQP_URL="amqp://johndoe:abc123@localhost/my_vhost"
Alternativly you can explicitly set it programmatically like this:
Minion.amqp_url = "amqp://johndoe:abc123@localhost/my_vhost"
If no URL is supplied, Minion defaults to "amqp://guest:guest@localhost/" which is the default credentials for Rabbitmq running locally.
Principles
Minion treats your jobs with respect. The queues are durable and not autodelete. When popping jobs off the queue, they will not receive an ack until the job is done. You can rest assured that once queued, the job will not be lost.
Sends are done synchronously and receives are done asynchronously. This allows you to Minion.enqueue() from the console, or in a mongrel and you don't need to worry about eventmachine. It also means that when enqueue returns, the AMQP server has received your message. Daemons set to receive messages however use eventmachine.
Message processing is done one at a time (prefetch 1). If you want tasks done in parallel, run two minions.
Push a job onto the queue
Its easy to push a job onto the queue.
Minion.enqueue("make.sandwich", { "for" => "me", "with" => "bread" })
Minion expects a queue name (and will create it if needed). The second argument needs to be a hash.
Processing a job
require 'mb-minion'
include Minion
job "make.sandwich" do |args|
Sandwich.make(args["for"],args["with"])
end
Chaining multiple steps
If you have a task that requires more than one step just pass an array of queues when you enqueue.
Minion.enqueue([ "make.sandwich", "eat.sandwich" ], "for" => "me")
job "make.sandwich" do
# this return value is merged with for => me and sent to the next queue
{ "type" => "ham on rye" }
end
job "eat.sandwich" do |msg|
puts "I have #{msg.content["type"]} sandwich for #{msg.content["me"]}"
end
Conditional Processing
If you want a minion worker to only subscribe to a queue under specific conditions there is a :when parameter that takes a lambda as an argument. For example, if you had a queue that makes sandwiches but only if there is bread on hand, it would be.
job "make.sandwich", :when => lambda { not Bread.out? } do
Sandwich.make
end
Batch Processing
If you want a minion worker to subscribe to a queue and batch messages together you can use the "batch" options. This will group messages into groups of "batch_size" unless there are too few messages available.
job "make.sandwich", :batch_size => 5 do |msg|
Sandwich.make_a_bunch msg.batch
end
If you want your worker to wait until the exact batch_size is reached, then tell it so:
job "make.sandwich", :batch_size => 5, :wait => true do |msg|
Sandwich.make_a_bunch msg.batch
end
That'll wait indefinitely, but maybe you'll give up after a bit. Just tell it how many seconds:
job "make.sandwich", :batch_size => 5, :wait => 5 do |msg|
Sandwich.make_a_bunch msg.batch
end
This is especially helpful since any short delay can create some weird batch sizes. See the examples for more information.
Error handling
When an error is thrown in a job handler, the job is requeued to be done later and the minion process exits. If you define an error handler, however, the error handler is run and the job is removed from the queue.
error do |e|
puts "got an error! #{e}"
end
Logging
Minion logs to stdout via "puts". You can specify a custom logger like this:
logger do |msg|
puts msg
end
Testing
When running the Minion test suite you will need to have a RabbitMQ instance running locally with the default admin user "guest" still intact.
Meta
Created by Orion Henry
Patches contributed by Adam Wiggins, Kyle Drake
Released under the MIT License: www.opensource.org/licenses/mit-license.php