Quebert
Quebert is a ruby background worker library that works with the very fast and simple beanstalkd deamon.
Why Quebert?
Because it has really low latency. Other Ruby queuing frameworks, like DJ or Resque, have to poll their queue servers periodicly. You could think of it as a "pull" queue. Quebert is a "push" queue. It maintains a persistent connection with beanstalkd and when is enqueued, it's instantly pushed to the workers and executed.
Sidekiq uses Redis's "push" primitives so it has low latency, but it doesn't support class reloading in a development environment. Sidekiq is also threaded, which means there are no guarantees of reliability when running non-threadsafe code.
Backburner is very similar to Quebert. It offers more options for concurrency (threading, forking, etc.) than Quebert but lacks pluggable back-ends, which means you'll be stubbing and mocking async calls.
Who uses it?
Quebert is a serious project. It's used in a production environment at Poll Everywhere to handle everything from SMS message processing to account downgrades.
Features
- Multiple back-ends (InProcess, Sync, and Beanstalk)
- Rails/ActiveRecord integration similar to async_observer
- Pluggable exception handling (for Hoptoad integration)
- Run workers with pid, log, and config files. These do not daemonize (do it yourself punk!)
- Custom hooks that may be called before, after & around jobs are run
Some features that are currently missing that I will soon add include:
- Rails plugin support (The AR integrations have to be done manually today)
- Auto-detecting serializers. Enhanced ClassRegistry to more efficiently look up serializers for objects.
How to use
There are two ways to enqueue jobs with Quebert: through the Job itself, provided you set a default back-end for the job, or put it on the backend.
Supported Ruby Versions
Quebert officially is supported to run on the currently supported versions of MRI.
This includes versions >= 2.4.3
. Have a look at the .travis.yml
configuration file to see all Ruby versions we support.
Jobs
Quebert includes a Job class so you can implement how you want certain types of Jobs performed.
Quebert.backend = Quebert::Backend::InProcess.new
class WackyMathWizard < Quebert::Job
def perform(*nums)
nums.inject(0){|sum, n| sum = sum + n}
end
end
You can either drop a job in a queue:
Quebert.backend.put WackyMathWizard.new(1, 2, 3)
Or drop it in right from the job:
# Run job right away!
WackyMathWizard.new(4, 5, 6).enqueue
# Run a lower priority job in 10 seconds for a max of 120 seconds
WackyMathWizard.new(10, 10, 10).enqueue(ttr: 120, priority: 100, delay: 10)
Then perform the jobs!
Quebert.backend.reserve.perform # => 6
Quebert.backend.reserve.perform # => 15
Quebert.backend.reserve.perform # => 30
Rails integration
config/quebert.yml:
development:
backend: beanstalk
host: localhost:11300
queue: myapp-development
test:
backend: sync
# etc.
config/initializers/quebert.rb:
Quebert.config.from_hash(Rails.application.config.quebert)
Quebert.config.logger = Rails.logger
Global Job hooks
Quebert has support for providing custom hooks to be called before, after & around your jobs are being run. A common example is making sure that any active ActiveRecord database connections are put back on the connection pool after a job is done:
Quebert.config.after_job do
ActiveRecord::Base.clear_active_connections!
end
Quebert.config.before_job do |job|
# all hooks take an optional job argument
# in case you want to do something with that job
end
Quebert.config.around_job do |job|
# this hook gets called twice
# once before & once after a job is performed
end
Beanstalk Job hooks
Jobs can define their own business logic that will get called surrounding Beanstalk events:
class FooJob < Quebert::Job
def around_bury
# custom pre-bury code
yield
# custom post-bury code
end
end
Supported Beanstalk event hooks: around_bury
, around_release
, around_delete
Async sender
Take any ol' class and include the Quebert::AsyncSender.
Quebert.backend = Quebert::Backend::InProcess.new
class Greeter
include Quebert::AsyncSender::Class
def initialize(name)
@name = name
end
def sleep_and_greet(time_of_day)
sleep 10000 # Sleeping, get it?
"Oh! Hi #{name}! Good #{time_of_day}."
end
def self.budweiser_greeting(name)
"waaazup #{name}!"
end
end
walmart_greeter = Greeter.new("Brad")
Remember the send method in ruby?
walmart_greeter.send(:sleep_and_greet, "morning")
# ... time passes, you wait as greeter snores obnoxiously ...
# => "Oh! Hi Brad! Good morning."
What if the method takes a long time to run and you want to queue it? async.send it!
walmart_greeter.async.sleep_and_greet("morning")
# ... do some shopping and come back later when the dude wakes up
Quebert figures out how to serialize the class, throw it on a worker queue, re-instantiate it on the other side, and finish up the work.
Quebert.backend.reserve.perform # => "Oh! Hi Brad! Good morning."
# ... Sorry dude! I'm shopping already
Does it work on Class methods? Yeah, that was easier than making instance methods work:
Quebert.async.budweiser_greeting("Coraline")
Quebert.backend.reserve.perform # => "waazup Coraline!"
- Only basic data types are included for serialization. Serializers may be customized to include support for different types.
Backends
- Beanstalk: Enqueue jobs in a beanstalkd service. The workers run in a separate process. Typically used in production environments.
- Sync: Perform jobs immediately upon enqueuing. Typically used in testing environments.
- InProcess: Enqueue jobs in an in-memory array. A worker will need to reserve a job to perform.
Multiple queues
To start a worker pointed at a non-default queue (e.g., a Quebert "tube"), start the process with -q
:
bundle exec quebert -q other-tube
Then specify the queue name in your job:
class FooJob < Quebert::Job
def queue
"other-tube"
end
def perform(args)
# ...
end
end
Setting job defaults
A Quebert::Job
is a Plain Ol' Ruby Object. The defaults of a job, including its ttr
, queue_name
, and delay
may be overridden in a super class as follows:
# Assuming you're in Rails or using ActiveSupport
class FooJob < Quebert::Job
def ttr
5.minutes
end
def delay
30.seconds
end
def queue_name
"long-running-delayed-jobs"
end
def perform(args)
# ...
end
end
Take a look at the Quebert::Job
class code for more details on methods you may ovveride.