Project

inst-jobs

0.05
A long-lived project that still receives updates
Instructure-maintained fork of delayed_job
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
 Project Readme

Instructure Delayed Jobs

Build Status

This gem was forked from delayed_job in late 2010. While we have tried to maintain compatibility with delayed_job where possible, so much code has been added and rewritten that you should approach this as a distinct library.

It's still useful to highlight the primary differences with delayed_job, for those familiar with it:

  • inst-jobs was adapted for Canvas LMS, where it has been battle-hardened over the last 5+ years, scaling with Canvas from zero to tens of millions of jobs run per day.
    • To achieve this we are using some PostgreSQL specific features, which means support for MySQL and other ActiveRecord backends has been dropped.
    • Pushing and popping from the queue is very highly optimized, for a SQL-based queue. A typical PostgreSQL database running on a c3.4xlarge EC2 instance can handle queueing and running more than 11 million jobs per day while staying below 30% CPU.
    • The architecture is designed to support a mix of long-running (even multi-hour) jobs alongside large numbers of very short (less than one second) jobs.
  • Daemon management is highly reliable.
    • Dead workers will be restarted, and any jobs they were working on will go through the normal failure handling code.
  • Reliable and distributed "Cron" style jobs through the built-in periodic jobs functionality.
  • Strands, allowing for ordered sequences of jobs based on ad-hoc name tags.
  • N-Strands, building on strands, allowing one to throttle how many jobs on a strand can be running concurrently.
  • Singleton jobs, where at most one job is running, and one queued at a time.
  • A simple jobs admin UI, usable by any Rails or Rack application.
  • A separate failed_jobs table for tracking failed jobs.
  • Automatic tracking of what code enqueued each job, if Marginalia is enabled.

Installation

inst-jobs requires Rails 3.2 or above, and Ruby 2.1 or above. It is tested through Rails 5.0 and Ruby 2.3.

Add this line to your Rails application's Gemfile:

gem 'inst-jobs'

And then execute:

$ bundle

Or install it yourself as:

$ gem install inst-jobs

Setup

ActiveRecord Backend

If you are using the ActiveRecord backend, you'll need to install and run the migrations:

$ rake delayed_engine:install:migrations
$ rake db:migrate

To use a separate database connection, specify it in an initializer:

Delayed::Backend::ActiveRecord::Job.establish_connection(my_db_queue_config)

When upgrading inst-jobs, make sure to run rake delayed_engine:install:migrations again to add any new migrations.

The ActiveRecord backend only supports PostgreSQL.

Worker Configuration

Worker and queue information defaults to read from config/delayed_jobs.yml, this can be overridden using the --config option from the command line.

development:
  workers:
  - workers: 2

production:
  workers:
  - workers: 10

An initializer can also be used to set preferred values for any settings that control specific interal delayed job behavior:

Delayed::Settings.max_attempts              = 1
Delayed::Settings.queue                     = "canvas_queue"
Delayed::Settings.sleep_delay               = ->{ 2.0 }

You can find a list of available settings in lib/delayed_job/settings.rb.

Usage

Signal Handling

Inst-jobs makes an attempt at being well behaved with respect to how child processes are handled. When the pool receives SIGQUIT it will pass that on and wait Settings.slow_exit_timeout seconds (default 20) for all children to finish their currently active task and exit. If they take longer than this a SIGTERM will be sent telling them to clean up and bail quickly, if that doesn't happen within 2 seconds SIGKILL is then sent. This graceful exit can be expedited by sending SIGTERM/SIGINT to the pool, this will still allow the slow_exit_timeout period for the workers to exit but they should exit almost immediately.

The old behavior of the pool exiting and leaving the child processes orphaned can be preserved by setting kill_workers_on_exit to false. This will cause the first signal sent to the pool to be propagated to all of the child processes after which the pool will exit.

Running Workers

$ inst_jobs # display help
$ inst_jobs start # start a worker in the background
$ inst_jobs run # start a worker in the foreground

Queueing Jobs

In the simplest form, this means just calling delay on any object before calling the actual method:

@user.delay.activate!

If a method should always be run in the background, you can call #handle_asynchronously after the method declaration:

class Device
  def deliver
    # long running method
  end
  handle_asynchronously :deliver
end

device = Device.new
device.deliver

Job Parameters

To pass parameters to the jobs engine, send them to the delay method:

@user.delay(max_attempts: 1, priority: 50).activate!(other_user)

handle_asynchronously and delay take these parameters:

  • :priority (number): lower numbers run first; default is 10 but can be reconfigured.
  • :run_at (Time): run the job on or after this time; default is now.
  • :queue (string): named queue to put this job in, if using separate queues.
  • :max_attempts (number): the max number of attempts to make before permanently failing the job; default is 1. For a job to be retried, it should raise an error. By default, an exponential backoff algorithm is used to determine the run_at for retried jobs (5 + attempt**4 seconds later).
  • :strand (string): strand to assign this job to; default is not to assign to a strand.
  • :n_strand (string): n_strand to assign this job to; default is none.
  • :singleton (string): singleton to assign this job to; default is none.
  • :on_conflict (:use_earliest|:overwrite|:loose): option for how to handle the new job if a singleton[#singleton-jobs] job of the same type already exists.
  • :on_failure (symbol): method name to call on failure. The method should accept a single argument (the error). Note that if max_attempts is greater than 1, this method will be invoked each time the job fails, prior to rescheduling. Also note that upon permanent failure (see :max_attempts for definition of 'permanent failure'), both the :on_failure and :on_permanent_failure methods will be invoked.
  • :on_permanent_failure (symbol): method name to call on permanent failure (see :max_attempts for definition of 'permanent failure'). The method should accept a single argument (the error).

Features

Strands

A strand is a set of jobs that must be run in queue order. When a job is assigned to a strand, it will not start running until all previous jobs assigned to that strand have either completed or failed permanently. This is very useful when you have sequences of jobs that need to run in order.

An example use case is the "ZIP file import" functionality in Canvas LMS. Each job queued up processes an uploaded ZIP file and updates the specified course's files. It's important to make sure that only one import job is ever running for a course, but we don't want to globally serialize the ZIP imports, we only want to serialize them per-course.

Strands make this simple. We simply use the course's unique identifier as part of the strand name, and we get the desired behavior. The (simplified) code is:

zip_file_import.delay(strand: "zip_file_import:#{course.uuid}").process

Strand names are just freeform strings, and don't need to be created in advance. The system is designed to perform well with any number of active strands.

N Strands

Strands are also useful when not required for correctness, but to avoid one particular set of jobs monopolizing too many job workers. This can also be done by using a different :queue parameter for the jobs, and setting up a separate pool of workers for that queue. But this is often overkill, and can result in wasted, idle workers for less-frequent jobs.

Another option is to use the n_strand parameter. This uses the same strand functionality to cap the number of jobs that can run in parallel for the specified n_strand. The limit can be changed at runtime, as well.

# The given proc will be called each time inst-jobs queues an n_strand job, to
# determine how many jobs with this strand will be allowed to run in parallel.
Delayed::Settings.num_strands = proc do |strand_name|
  if strand_name == "external_api_call"
    3
  else
    1
  end
end

my_api.delay(n_strand: "external_api_call").make_call

Singleton Jobs

Singleton jobs don't queue another job if a job is already queued with the given strand name:

# If a job is already queued on the strand with this name, this job will not be
# queued. It doesn't matter if previous jobs were queued on this strand but have
# already completed, it only matters what is currently on the queue.
grader.delay(singleton: "grade_student:#{student.uuid}").grade_student

If a job is currently running, it doesn't count as being in the queue for the purposes of singleton jobs. This is usually the desired behavior to avoid race conditions.

You can also pass an on_conflict option. The default of :use_earliest means that the queued job will be updated to the earliest run_at of the existing and the new job. Assuming you're using the default run_at of now, that means the new job will simply be dropped. It can also be used if you run the singleton on a schedule (like a periodic job), but occasionally want it to run now.

The second option is :overwrite and will always update the pending job to use the run_at of the new job. This is useful for "debouncing" - you have some cleanup that needs to run after a trigger action, but there are many of that trigger action and it's not useful to run the single cleanup job until the trigger action calms down. This is also useful if the arguments to the job might change, and you want it to run with the latest version of those arguments.

The last option is :loose, which will drop the new job if there is already a job queued.

After enqueuing a job, the job object will have a hint indicating if a new job was inserted, an existing job was updated, or the new job was dropped:

  • :inserted - a new job was inserted into the queue
  • :updated - an existing job was updated (both :use_earliest and :overwrite can update the existing job's run_at, but only :overwrite will update the handler)
  • :dropped - the new job was dropped due to an existing job in the queue

Mixing N-Strand and Singleton

It is allowed to use both n_strand and singleton at the same time. This could be useful if you want to throttle the number of jobs hitting an external service, but also only allow one instance of the job along another axis. For example, a job to refresh the state of a particular user with an external service:

user.delay(singleton: "service_update:#{user.id}", n_strand: "service_updates").update_service_data

The rules for singletons and N-strands are intersected - a job is ready to run only if there are no more than max_concurrent running for the strand, and if there is not another job running for its singleton. In a strand with three jobs, and max concurrency of two, if the first two jobs are on one singleton, but the third is on another, the third job will run, even though the second one cannot yet (due to waiting for the first job on the same singleton to complete).

Periodic Jobs

Periodic jobs are a reliable, distributed way of running recurring tasks, in other words "distributed fault-tolerant Cron".

Periodic jobs need to be configured during application startup, so that workers have access to the schedules. For instance, in a Rails app it's suggested to create a config/initializers/periodic_jobs.rb file:

# The first argument is a name tag for the job, and must be unique. The 2nd
# argument is the run schedule, in Cron syntax.
Delayed::Periodic.cron 'My Periodic Task', '30 11 * * *' do
  MyApp::SomeTask.run()
end

Periodic Jobs are queued just like normal jobs, and run by your same pool of workers. Jobs are configured with max_attempts set to 1, so if the job fails, it will not run again until the next scheduled interval.

This works by storing a registry of periodic jobs with their intervals, enqueueing each job immediately for the next time it should be run at, and then having an extra step after the job is performed that re-enqueues it for the NEXT time it will run. It's expected that every periodic job will be in the queue all the time (either executing or queued for the next time it will execute).

Periodic jobs are singletons (see docs above on singleton jobs).

Lifecycle Events

There are several callbacks you can hook into from outside the library, find them at the top of the "lifecycle.rb" class.

To hook into a callback, write something that looks like this in an initializer:

Delayed::Worker.lifecycle.before(:error) do |worker, exception|
  ErrorThingy.notify(exception)
end

:error

runs whenever a job throws an exception during invocation

:retry

runs when a job throws a Delayed::RetriableError during invocation (usually within the rescue block of an "expected" exception so the proximate cause is the inner exception). If the job CAN retry/reschedule, it will attempt to do so, otherwise it will run the :error callback instead. Think of this as a way to handle infrastructural or other "transient" exceptions without firing your error notification process.

Work Queue

By default, each Worker process will independently query and lock jobs in the queue. There is an experimental ParentProcess WorkQueue implementation that has each Worker on a server communicate to a separate process on the server that centrally handles querying and locking jobs. This can be enabled in the yml config:

production:
  work_queue: parent_process

This will cut down on DB lock contention drastically, at the cost of potentially taking a bit longer to find new jobs. It also enables another lifecycle callback that can be used by plugins for added functionality. This may become the default or only implementation, eventually.

Sentry Error Reporting

The standard delayed_job integration will work with inst-jobs as well. Just add in an initializer along with your other raven-ruby configuration:

require 'raven/integrations/delayed_job'

Worker Health Checking and Unlocking Orphaned Jobs

Occasionally a worker will unexpectedly terminate without being allowed to run any cleanup code, when this happens it causes any jobs that process had locked to remain locked indefinitely. To alleviate this each worker can register itself with a locally running consul agent which will watch that each process is still alive, when a process is found to be dead it will automatically be deregistered from the agent causing another process to come along and reschedule the locked job.

Configuring the Consul health check

In order to use the Consul health check you must include the diplomat gem, version 2.5.x, in your application's Gemfile. It is not included in the default dependencies because it is an optional feature.

# Enable the consul health check
Delayed::Settings.worker_health_check_type = :consul

# Configure the health check
Delayed::Settings.worker_health_check_config = {
  service_name: 'canvas-worker', # Optional, defaults to 'inst-jobs_worker'
  check_interval: '7m', # Optional, defaults to 5m
}

# allow the worker pool to fork off a process
# periodically to kill any jobs that are failing their
# health checks
Delayed::Settings.disable_abandoned_job_cleanup = false

Testing

To write tests that interact with inst-jobs, you'll need to configure an actual database.

Locally

By default, if you have postgres running on its default ports, and if you have run:

$> createdb inst-jobs-test-1

Then you should be able to run the tests that come with the library with:

$> bundle exec rspec spec

In Docker

Alternatively, if you have docker-compose set up, you can run the CI build, which spins up the necessary services in docker:

$> ./build.sh

Running individual tests in Docker

First, you'll want a persistent gems volume, which you can get by:

$> cp docker-compose.override.yml.example docker-compose.override.yml

Then you can install bundler and gems, which you'll want to do in your ruby version of choice:

$> docker-compose run --rm app bash -lc "bundle install"

Now, to run an individual spec:

$> docker-compose run --rm app bash -lc "bundle exec rspec spec/delayed/worker_spec.rb"

You can also run the whole suite, but under just one rvm context, with:

$> docker-compose run --rm app bash -lc "bundle exec rake spec"

Writing Tests

There are a few basic testing helpers available:

require 'delayed/testing'

Delayed::Testing.drain # run all queued jobs
Delayed::Testing.run_job(job) # run a single job

before(:each) do
  Delayed::Testing.clear_all! # delete all queued jobs
end

Web UI

inst-jobs has a built-in web ui that allows users to view running jobs. Optionally, this web ui can support basic job management as well (hold, unhold, and delete operations are supported). To enable this feature, pass a hash containing update: true into the Delayed::Server constructor. You probably want to ensure that the jobs endpoint requires authentication before enabling this feature.

For Rails Apps

To use the web UI in your existing Rails application there are two options, first "The Rails Way" as shown just below this text or the Rack way shown at the very end of this section.

For "The Rails Way" to work there are two changes that need to be made to your application.

First you'll need to add Sinatra and sinatra-contrib to your Gemfile (these dependencies are excluded from the default list so those who aren't using this feature don't get the extra gems). For Rails 5.x applications, you'll need to use Sinatra 2.x, which is in beta at the time of this writing.

Second, you'll need to add the following to your routes file:

require 'delayed/server'

Rails.application.routes.draw do
  # The delayed jobs server can mounted at any route you desire, delayed_jobs is
  # just for this example
  mount Delayed::Server.new => '/delayed_jobs'
end

Additionally, if you wish to restrict who has access to this route it is recommended that users wrap this route in a constraint.

For Rack and Sinatra Apps

To use the web UI in your Rack app you can simply mount the app just like any other Rack app in your config.ru file:

require 'delayed/server'

# The delayed jobs server can mounted at any route you desire, delayed_jobs is
# just for this example
map '/delayed_jobs' do
  run Delayed::Server.new
end

run MyApp

Contributing

  1. Fork it ( https://github.com/instructure/inst-jobs/fork )
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

Publishing

Ready to release a new version of inst-jobs? Make sure you're an owner (https://rubygems.org/gems/inst-jobs)

If your rubygems credentials are already set in ~/.gem/credentials, you can just run the release task: bundle exec rake release

If they are not, you can do this manually for now, and it will cache your credentials as part of the process:

bundle exec rake build
# -> inst-jobs VERSION built to pkg/inst-jobs-VERSION.gem
gem push pkg/inst-jobs-VERSION.gem
# -> follow prompts to enter your login information

Future releases you can now just use the release rake task, although if you have MFA enabled (and you should!) and your MFA valid period expires, you'll have to do the gem push manually to enter a new MFA code.