Backburner-allq is a allq-powered job queue that can handle a very high volume of jobs. You create background jobs and place them on multiple work queues to be processed later.
Processing background jobs reliably has never been easier than with allq and Backburner. This gem works with any ruby-based web framework, but is especially suited for use with Sinatra, Padrino and Rails.
If you want to use allq for your job processing, consider using Backburner. Backburner is heavily inspired by Resque and DelayedJob. Backburner stores all jobs as simple JSON message payloads. Persistent queues are supported when allq persistence mode is enabled.
Backburner supports multiple queues, job priorities, delays, and timeouts. In addition, Backburner has robust support for retrying failed jobs, handling error cases, custom logging, and extensible plugin hooks.
Why Backburner?
Backburner is well tested and has a familiar, no-nonsense approach to job processing, but that is of secondary importance. Let's face it, there are a lot of options for background job processing. DelayedJob, and Resque are the first that come to mind immediately. So, how do we make sense of which one to use? And why use Backburner over other alternatives?
The key to understanding the differences lies in understanding the different projects and protocols that power these popular queue libraries under the hood. Every job queue requires a queue store that jobs are put into and pulled out of. In the case of Resque, jobs are processed through Redis, a persistent key-value store. In the case of DelayedJob, jobs are processed through ActiveRecord and a database such as PostgreSQL.
You will quickly see that allq is an underrated but incredible project that is extremely well-suited as a job queue. Significantly better suited for this task than Redis or a database.
A single instance of Allq is perfectly capable of handling thousands of jobs a second (or more, depending on your job size) because it is an in-memory, event-driven system.
Allq supports the following features out of the box:
Feature | Description |
---|---|
Parallelized | Supports multiple work queues created on demand. |
Reliable | Beanstalk’s reserve, work, delete cycle ensures reliable processing. |
Scheduling | Delay enqueuing jobs by a specified interval to schedule processing later |
Fast | Processes thousands of jobs per second without breaking a sweat. |
Priorities | Specify priority so important jobs can be processed quickly. |
Persistence | Jobs are stored in memory for speed, but logged to disk for safe keeping. |
Federation | Horizontal scalability provided through federation by the client. |
Error Handling | Bury any job which causes an error for later debugging and inspection. |
Fair Queueing | Allows you to shard work across queues for fair client queuing. |
Workflows | Allows you tell Job C to wait for Job A and Job B to finish first. |
Keep in mind that these features are supported out of the box with allq and require no special code within this gem to support. In the end, allq is the ideal job queue while also being ridiculously easy to install and setup.
Installation
$ gem install backburner
Configuration
Backburner is extremely simple to setup. Just configure basic settings for backburner:
Backburner.configure do |config|
config.beanstalk_url = "allq://127.0.0.1:8090"
config.tube_namespace = "some.app.production"
config.namespace_separator = "."
config.on_error = lambda { |e| puts e }
config.max_job_retries = 3 # default 0 retries
config.retry_delay = 2 # default 5 seconds
config.retry_delay_proc = lambda { |min_retry_delay, num_retries| min_retry_delay + (num_retries ** 3) }
config.default_priority = 65536
config.respond_timeout = 120
config.default_worker = Backburner::Workers::Simple
config.logger = Logger.new(STDOUT)
config.primary_queue = "backburner-jobs"
config.priority_labels = { :custom => 50, :useless => 1000 }
config.reserve_timeout = nil
config.job_serializer_proc = lambda { |body| JSON.dump(body) }
config.job_parser_proc = lambda { |body| JSON.parse(body) }
end
The key options available are:
Option | Description |
---|---|
allq_url |
Address for allq connection i.e 'allq://127.0.0.1:8090' |
tube_namespace |
Prefix used for all tubes related to this backburner queue. |
namespace_separator |
Separator used for namespace and queue name |
on_error |
Lambda invoked with the error whenever any job in the system fails. |
max_job_retries |
Integer defines how many times to retry a job before burying. |
retry_delay |
Integer defines the base time to wait (in secs) between job retries. |
retry_delay_proc |
Lambda calculates the delay used, allowing for exponential back-off. |
default_priority |
Integer The default priority of jobs |
respond_timeout |
Integer defines how long a job has to complete its task |
default_worker |
Worker class that will be used if no other worker is specified. |
logger |
Logger recorded to when backburner wants to report info or errors. |
primary_queue |
Primary queue used for a job when an alternate queue is not given. |
priority_labels |
Hash of named priority definitions for your app. |
reserve_timeout |
Duration to wait for work from a single server, or nil for forever. |
job_serializer_proc |
Lambda serializes a job body to a string to write to the task |
job_parser_proc |
Lambda parses a task body string to a hash |
Usage
Backburner allows you to create jobs and place them onto any number of allq tubes, and later pull those jobs off the tubes and process them asynchronously with a worker.
Enqueuing Jobs
At the core, Backburner is about jobs that can be processed asynchronously. Jobs are simple ruby objects which respond to perform
.
Job objects are queued as JSON onto a tube to be later processed by a worker. Here's an example:
class NewsletterJob
# required
def self.perform(email, body)
NewsletterMailer.deliver_text_to_email(email, body)
end
# optional, defaults to 'backburner-jobs' tube
def self.queue
"newsletter-sender"
end
# optional, defaults to default_priority
def self.queue_priority
1000 # most urgent priority is 0
end
# optional, defaults to respond_timeout in config
def self.queue_respond_timeout
300 # number of seconds before job times out, 0 to avoid timeout. NB: A timeout of 1 second will likely lead to race conditions between Backburner and allq and should be avoided
end
# optional, defaults to retry_delay_proc in config
def self.queue_retry_delay_proc
lambda { |min_retry_delay, num_retries| min_retry_delay + (num_retries ** 5) }
end
# optional, defaults to retry_delay in config
def self.queue_retry_delay
5
end
# optional, defaults to max_job_retries in config
def self.queue_max_job_retries
5
end
end
You can include the optional Backburner::Queue
module so you can easily specify queue settings for this job:
class NewsletterJob
include Backburner::Queue
queue "newsletter-sender" # defaults to 'backburner-jobs' tube
queue_priority 1000 # most urgent priority is 0
queue_respond_timeout 300 # number of seconds before job times out, 0 to avoid timeout
def self.perform(email, body)
NewsletterMailer.deliver_text_to_email(email, body)
end
end
Jobs can be enqueued with:
Backburner.enqueue NewsletterJob, 'foo@admin.com', 'lorem ipsum...'
Backburner.enqueue
accepts first a ruby object that supports perform
and then a series of parameters
to that object's perform
method. The queue name used by default is {namespace}.backburner-jobs
unless otherwise specified.
You may also pass a lambda as the queue name and it will be evaluated when enqueuing a job (and passed the Job's class as an argument). This is especially useful when combined with "Simple Async Jobs" (see below).
Simple Async Jobs
In addition to defining custom jobs, a job can also be enqueued by invoking the async
method on any object which
includes Backburner::Performable
. Async enqueuing works for both instance and class methods on any performable object.
class User
include Backburner::Performable
queue "user-jobs" # defaults to 'user'
queue_priority 500 # most urgent priority is 0
queue_respond_timeout 300 # number of seconds before job times out, 0 to avoid timeout
def activate(device_id)
@device = Device.find(device_id)
# ...
end
def self.reset_password(user_id)
# ...
end
end
# Async works for instance methods on a persisted object with an `id`
@user = User.first
@user.async(:ttr => 100, :queue => "activate").activate(@device.id)
# ..and for class methods
User.async(:pri => 100, :delay => 10.seconds).reset_password(@user.id)
This automatically enqueues a job for that user record that will run activate
with the specified argument.
Note that you can set the queue name and queue priority at the class level and
you are also able to pass pri
, ttr
, delay
and queue
directly as options into async
.
The queue name used by default is {namespace}.backburner-jobs
if not otherwise
specified.
If a lambda is given for queue
, then it will be called and given the
performable object's class as an argument:
# Given the User class above
User.async(:queue => lambda { |user_klass| ["queue1","queue2"].sample(1).first }).do_hard_work # would add the job to either queue1 or queue2 randomly
Using Async Asynchronously
It's often useful to be able to configure your app in production such that every invocation of a method is asynchronous by default as seen in delayed_job. To accomplish this, the Backburner::Performable
module exposes two handle_asynchronously
convenience methods
which accept the same options as the async
method:
class User
include Backburner::Performable
def send_welcome_email
# ...
end
# ---> For instance methods
handle_asynchronously :send_welcome_email, queue: 'send-mail', pri: 5000, ttr: 60
def self.update_recent_visitors
# ...
end
# ---> For class methods
handle_static_asynchronously :update_recent_visitors, queue: 'long-tasks', ttr: 300
end
Now, all calls to User.update_recent_visitors
or User#send_welcome_email
will automatically be handled asynchronously when invoked. Similarly, you can call these methods directly on the Backburner::Performable
module to apply async behavior outside the class:
# Given the User class above
Backburner::Performable.handle_asynchronously(User, :activate, ttr: 100, queue: 'activate')
Now all calls to the activate
method on a User
instance will be async with the provided options.
A Note About Auto-Async
Because an async proxy is injected and used in place of the original method, you must not rely on the return value of the method. Using the example User
class above, if my send_welcome_email
returned the status of an email submission and I relied on that to take some further action, I will be surprised after rewiring things with handle_asynchronously
because the async proxy actually returns the (boolean) result of Backburner::Worker.enqueue
.
Working Jobs
Backburner workers are processes that run forever handling jobs that are reserved from the queue. Starting a worker in ruby code is simple:
Backburner.work
This will process jobs in all queues but you can also restrict processing to specific queues:
Backburner.work('newsletter-sender', 'push-notifier')
The Backburner worker also exists as a rake task:
require 'backburner/tasks'
so you can run:
$ QUEUE=newsletter-sender,push-notifier rake backburner:work
You can also run the backburner binary for a convenient worker:
bundle exec backburner -q newsletter-sender,push-notifier -d -P /var/run/backburner.pid -l /var/log/backburner.log
This will daemonize the worker and store the pid and logs automatically. For Rails and Padrino, the environment should
load automatically. For other cases, use the -r
flag to specify a file to require.
Delaying Jobs
In Backburner, jobs can be delayed by specifying the delay
option whenever you enqueue a job. If you want to schedule a job for an hour from now, simply add that option while enqueuing the standard job:
Backburner::Worker.enqueue(NewsletterJob, ['foo@admin.com', 'lorem ipsum...'], :delay => 1.hour)
or while you schedule an async method call:
User.async(:delay => 1.hour).reset_password(@user.id)
Backburner will take care of the rest!
Persistence
Jobs are persisted to queues as JSON objects. Let's take our User
example from above. We'll run the following code to create a job:
User.async.reset_password(@user.id)
The following JSON will be put on the {namespace}.backburner-jobs
queue:
{
'class': 'User',
'args': [nil, 'reset_password', 123]
}
The first argument is the 'id' of the object in the case of an instance method being async'ed. For example:
@device = Device.find(987)
@user = User.find(246)
@user.async.activate(@device.id)
would be stored as:
{
'class': 'User',
'args': [246, 'activate', 987]
}
Since all jobs are persisted in JSON, your jobs must only accept arguments that can be encoded into that format. This is why our examples use object IDs instead of passing around objects.
Named Priorities
As of v0.4.0, Backburner has support for named priorities. allq priorities are numerical but
backburner supports a mapping between a word and a numerical value. The following priorities are
available by default: high
is 0, medium
is 4, and low
is 9.
Priorities can be customized with:
Backburner.configure do |config|
config.priority_labels = { :custom => 1, :useful => 5 }
# or append to default priorities with
# config.priority_labels = Backburner::Configuration::PRIORITY_LABELS.merge(:foo => 5)
end
and then these aliases can be used anywhere that a numerical value can:
Backburner::Worker.enqueue NewsletterJob, ["foo", "bar"], :pri => :custom
User.async(:pri => :useful, :delay => 10.seconds).reset_password(@user.id)
Using named priorities can greatly simplify priority management.
Processing Strategies
In Backburner, there are several different strategies for processing jobs which are reflected by multiple worker subclasses. Custom workers can be defined fairly easily. By default, Backburner comes with the following workers built-in:
Worker | Description |
---|---|
Backburner::Workers::Simple |
Single threaded, no forking worker. Simplest option. |
Backburner::Workers::Forking |
Basic forking worker that manages crashes and memory bloat. |
Backburner::Workers::ThreadsOnFork |
Forking worker that utilizes threads for concurrent processing. |
Backburner::Workers::Threading |
Utilizes thread pools for concurrent processing. |
You can select the default worker for processing with:
Backburner.configure do |config|
config.default_worker = Backburner::Workers::Forking
end
or determine the worker on the fly when invoking work
:
Backburner.work('newsletter-sender', :worker => Backburner::Workers::ThreadsOnFork)
or through associated rake tasks with:
$ QUEUE=newsletter-sender,push-message THREADS=2 GARBAGE=1000 rake backburner:threads_on_fork:work
When running on MRI or another Ruby implementation with a Global Interpreter Lock (GIL), do not be surprised if you're unable to saturate multiple cores, even with the threads_on_fork worker. To utilize multiple cores, you must run multiple worker processes.
Additional concurrency strategies will hopefully be contributed in the future. If you are interested in helping out, please let us know.
More info: Threads on Fork Worker
For more information on the threads_on_fork worker, check out the
ThreadsOnFork Worker documentation. Please note that the ThreadsOnFork
worker does not work on Windows due to its lack of fork
.
More info: Threading Worker (thread-pool-based)
Configuration options for the Threading worker are similar to the threads_on_fork worker, sans the garbage option. When running via the backburner
CLI, it's simplest to provide the queue names and maximum number of threads in the format "{queue name}:{max threads in pool}[,{name}:{threads}]":
$ bundle exec backburner -q queue1:4,queue2:4 # and then other options, like environment, pidfile, app root, etc. See docs for the CLI
Default Queues
Workers can be easily restricted to processing only a specific set of queues as shown above. However, if you want a worker to process all queues instead, then you can leave the queue list blank.
When you execute a worker without any queues specified, queues for known job queue class with include Backburner::Queue
will be processed.
To access the list of known queue classes, you can use:
Backburner::Worker.known_queue_classes
# => [NewsletterJob, SomeOtherJob]
Dynamic queues created by passing queue options will not be processed by a default worker. For this reason, you may want to take control over the default list of
queues processed when none are specified. To do this, you can use the default_queues
class method:
Backburner.default_queues.concat(["foo", "bar"])
This will ensure that the foo and bar queues are processed by any default workers. You can also add job queue names with:
Backburner.default_queues << NewsletterJob.queue
The default_queues
stores the specific list of queues that should be processed by default by a worker.
Failures
When a job fails in backburner (usually because an exception was raised), the job will be released
and retried again until the max_job_retries
configuration is reached.
Backburner.configure do |config|
config.max_job_retries = 3 # retry jobs 3 times
config.retry_delay = 2 # wait 2 seconds in between retries
end
Note the default max_job_retries
is 0, meaning that by default jobs are not retried.
As jobs are retried, a progressively-increasing delay is added to give time for transient
problems to resolve themselves. This may be configured using retry_delay_proc
. It expects
an object that responds to #call
and receives the value of retry_delay
and the number
of times the job has been retried already. The default is a cubic back-off, eg:
Backburner.configure do |config|
config.retry_delay = 2 # The minimum number of seconds a retry will be delayed
config.retry_delay_proc = lambda { |min_retry_delay, num_retries| min_retry_delay + (num_retries ** 3) }
end
If continued retry attempts fail, the job will be buried and can be 'kicked' later for inspection.
You can also setup a custom error handler for jobs using configure:
Backburner.configure do |config|
config.on_error = lambda { |ex| Airbrake.notify(ex) }
end
Now all backburner queue errors will appear on airbrake for deeper inspection.
If you wish to retry a job without logging an error (for example when handling transient issues in a cloud or service oriented environment),
simply raise a Backburner::Job::RetryJob
error.
Logging
Logging in backburner is rather simple. When a job is run, the log records that. When a job fails, the log records that. When any exceptions occur during processing, the log records that.
By default, the log will print to standard out. You can customize the log to output to any standard logger by controlling the configuration option:
Backburner.configure do |config|
config.logger = Logger.new(STDOUT)
end
Be sure to check logs whenever things do not seem to be processing.
Hooks
Backburner is highly extensible and can be tailored to your needs by using various hooks that can be triggered across the job processing lifecycle. Often using hooks is much easier then trying to monkey patch the externals.
Check out HOOKS.md for a detailed overview on using hooks.
Workers in Production
Once you have Backburner setup in your application, starting workers is really easy. Simply add the task to your Rakefile:
# Rakefile
require 'backburner/tasks'
and then you can start the rake task with:
$ rake backburner:work
$ QUEUE=newsletter-sender,push-notifier rake backburner:work
The best way to deploy these rake tasks is using a monitoring library. We suggest God which watches processes and ensures their stability. A simple God recipe for Backburner can be found in examples/god.
Command-Line Interface
Instead of using the Rake tasks, you can use Backburner's command-line interface (CLI) – powered by the Dante gem – to launch daemonized workers. Several flags are available to control the process. Many of these are provided by Dante itself, such as flags for logging (-l
), the process' PID (-P
), whether to daemonize (-d
) or kill a running process (-k
). Backburner provides a few more:
Queues (-q
)
Control which queues the worker will watch with the -q
flag. Comma-separate multiple queue names and, if you're using the ThreadsOnFork
worker, colon-separate the settings for thread limit, garbage limit and retries limit (eg. send_mail:4:10:3
). See its wiki page for some more details.
backburner -q send_mail,create_thumbnail # You may need to use `bundle exec`
Boot an app (-r
)
Load an app with the -r
flag. Backburner supports automatic loading for both Rails and Padrino apps when started from the their root folder. However, you may point to a specific app's root using this flag, which is very useful when running workers from a service script.
path="/var/www/my-app/current"
backburner -r "$path"
Load an environment (-e
)
Use the -e
flag to control which environment your app should use:
environment="production"
backburner -e $environment
Reconnecting
In Backburner, if the allq connection is temporarily severed, several retries to establish the connection will be attempted.
You can manually catch this exception, and attempt another manual retry using Backburner::Worker.retry_connection!
.
Web Front-end
Be sure to check out the Sinatra-powered project beanstalkd_view by denniskuczynski which provides an excellent overview of the tubes and jobs processed by your beanstalk workers. An excellent addition to your Backburner setup.
Acknowledgements
- Nathan Esquenazi - Project maintainer
- Dave Myron - Multiple features and doc improvements
- Kristen Tucker - Coming up with the gem name
- Tim Lee, Josh Hull, Nico Taing - Helping me work through the idea
- Miso - Open-source friendly place to work
- Evgeniy Denisov - Multiple fixes and cleanups
- Andy Bakun - Fixes to how multiple beanstalkd instances are processed
- Renan T. Fernandes - Added threads_on_fork worker
- Daniel Farrell - Added forking worker
Contributing
- Fork it
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Added some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create new Pull Request
References
The code in this project has been made in light of a few excellent projects:
Thanks to these projects for inspiration and certain design and implementation decisions.
Links
- Code:
git clone git://github.com/nesquena/backburner.git
- Home: http://github.com/nesquena/backburner
- Docs: http://rdoc.info/github/nesquena/backburner/master/frames
- Bugs: http://github.com/nesquena/backburner/issues
- Gems: http://gemcutter.org/gems/backburner