Project

exekutor

0.0
No release in over a year
PostgreSQL backed active job adapter which uses `LISTEN/NOTIFY` to listen for jobs and `FOR UPDATE SKIP LOCKED` to reserve jobs.
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
 Dependencies

Development

Runtime

>= 6.0, < 8.0
>= 6.0, < 8.0
~> 2.0
>= 6.0, < 8.0
~> 3.0
 Project Readme

Gem GitHub Workflow Status

Logo

Say hello to Exekutor

Exekutor is a PostgreSQL backed active job adapter, which uses powerful PostgreSQL features for low latency and efficient locking;

Features

  • Designed for active job;
  • Multithreaded job execution using Concurrent Ruby;
  • Uses LISTEN/NOTIFY to listen for jobs and FOR UPDATE SKIP LOCKED to reserve jobs;
  • Custom job options to limit execution time and prevent execution of stale jobs;
  • An Asynchronous module to execute plain ruby methods using active job;
  • Hooks to integrate your error monitoring system;
  • An HTTP status server.

Installation and set up

In a nutshell:

  • Install the exekutor gem;
  • Run rails g exekutor:install and rails db:migrate;
  • Configure Rails to use Exekutor;
  • Start your worker and queue your jobs.

Read the Getting started guide for the detailed documentation.

Configuration

Exekutor can be configured in multiple ways: using code, a yaml file, and command line options.

Code

Most of the configuration options can be configured from the initializer file. This file will be generated for you when you run rails g exekutor:install.

Default queue priority

The default priority for jobs without an explicitly specified priority. The valid range for a job priority is between 1 (highest priority) and 32,767 (lowest priority).

Exekutor.config.default_queue_priority = 16_383

Base record class name

The base class for the active record models of Exekutor.

Exekutor.config.base_record_class_name = "ActiveRecord::Base"

JSON serializer

The JSON serializer to use.

Exekutor.config.json_serializer = JSON

Logger

The logger to use.

Exekutor.config.logger = Rails.active_job.logger

Set DB connection name

Whether the listener should set the DB connection name. When Exekutor is started using the CLI, this option also configures whether to name the other DB connections used by the worker.

Exekutor.config.set_db_connection_name = false # (true for the CLI)

Enable listener

Whether to use the listener. You can set this option to false and decrease the polling interval to make Exekutor work like an old-fashioned polling worker. This is necessary if you have a tool like PgBouncer which does not allow long running connections.

Exekutor.config.enable_listener = true

Polling interval

The polling interval in seconds. Exekutor polls for jobs every 60 seconds by default to check for jobs that the listener might have missed.

Exekutor.config.polling_interval = 60.seconds

Polling jitter

Sets a "jitter" for this polling interval so all worker don't hit the database at the same time if they were started at the same time. A value of 0.1 means the polling interval can deviate up to 10%, from 5% sooner to 5% later.

For example: With a polling interval of 60 and a jitter of 0.1, the actual polling interval can range from 57 to 63 seconds.

Exekutor.config.polling_jitter = 0.1

Minimum execution threads

The minimum number of threads to keep active for executing jobs.

Exekutor.config.min_execution_threads = 1

Maximum execution threads

The maximum number of threads that may be active to execute jobs. By default, Exekutor uses your database connection pool size minus 1. Be aware that if you set this to a value greater than connection_db_config.pool, workers may have to wait for database connections to become available because all connections are occupied by other threads. This may result in an ActiveRecord::ConnectionTimeoutError if the thread has to wait too long.

Exekutor.config.max_execution_threads = 10

Maximum execution thread idletime

The number of seconds that an execution thread may be idle before being reclaimed.

Exekutor.config.max_execution_thread_idletime = 60.seconds

Status server handler

The Rack handler to use for the status server

Exekutor.config.status_handler = "webrick"

Healthcheck timeout

The timeout in minutes after which the status server deems a worker to be down. The worker updates a heartbeat every time it finishes a job and after polling for jobs. This heartbeat is used to check whether the worker is still executing jobs. This means that the timeout should be longer than the execution time of your jobs.

Exekutor.config.healthcheck_timeout = 30.minutes

Quiet

Whether to suppress the logger output to just the errors.

Exekutor.config.quiet = false

YAML

When starting a worker from the CLI, a number of configuration options can be overridden using a YAML configuration file.

Queues

The queues this worker should perform jobs out of.

exekutor:
  queues: [ "queues", "to", "watch" ]

Job priorities

The minimum and maximum job priority to perform

exekutor:
  min_priority: 100
  max_priority: 200

Json serializer

The JSON serializer to use for deserializing jobs by this worker.

exekutor:
  json_serializer: "Oj"

Set db connection name

Whether to set the application name of the DB connections for this worker.

exekutor:
  set_db_connection_name: true

Enable listener

Whether to enable the listener for this worker.

exekutor:
  enable_listener: true

Polling interval / jitter

The polling interval (in seconds) and jitter for this worker.

exekutor:
  polling_interval: 60
  polling_jitter: 0.1

Execution thread options

The minimum and maximum threads this worker should spawn and the thread idletime (in seconds) for reclaiming threads.

exekutor:
  min_execution_threads: 1
  max_execution_threads: 14
  max_execution_thread_idletime: 60

Status server options

The status server handler, the port to use, and the worker timeout (in minutes).

exekutor:
  status_server_port: 10100
  status_server_handler: webrick
  healthcheck_timeout: 60

Quiet

Whether to suppress log output to just the errors.

exekutor:
  quiet: true

Wait for termination

Whether and how long to wait for the execution threads to finish upon exit.

  • If the value is false or nil, the worker will not wait for the execution threads to finish but will not kill the threads either;
  • If the value is zero, the worker will kill the execution threads immediately upon exit;
  • If the value is a positive number, the worker will wait for the indicated amount of seconds to let the execution threads finish and will kill the threads if the timeout is exceeded.
  • Otherwise the worker will wait for the execution threads to finish indefinitely.
exekutor:
  wait_for_termination: 120

Command line options

A small number of options are also configurable through the command line. The command line options override the values set from the YAML and initializer files.

Queues

The queues this worker should perform jobs out of. The option can be specified mulitple time to indicate multiple queues.

exekutor start --queue queue --queue another_queue --queue third_queue

Job priorities

The minimum and maximum job priority to perform, specified as min:max. If only 1 value is specified, it will be used as a minimum

exekutor start --priority 100:200

Polling interval

The polling interval in seconds.

exekutor start --poll_interval 90

Number of execution threads

The minimum and maximum number of execution threads, specified as min:max. If only 1 value is specified, the thread pool will have a fixed size.

exekutor start --threads 10:20

Command line interface

Start

Starts a worker.

exekutor start [options]

Options

  • --help – Show the help message.
  • --identifier=arg – The identifier for this worker. This identifier is shown in the process name and the connection name.
  • --pidfile=path – The path to the PID file for a daemonized worker.
  • --configfile=path – The path to the YAML configfile.
  • --daemonize – Whether to daemonize the worker.
  • --env – The Rails environment to load.
  • --poll_interval – The poll interval for this worker.
  • --max_threads – The maximum execution threads for this worker.
  • --queue – The queue to work off. Can be specified multiple times.

Stop

Stops a daemonized worker.

exekutor stop [options]

Options

  • --identifier=arg – The identifier of the worker to stop. (translates to --pidfile=tmp/pids/exekutor.%{identifier}.pid)
  • --pidfile=path – The path to the PID file of the worker to stop.
  • --all – Stops all daemonized workers with default pidfiles (ie. tmp/pids/exekutor*.pid). You can use pidfile option to use a custom pidfile pattern.
  • --shutdown_timeout=int – The amount of seconds to wait before killing a worker process.

Restart

Restarts a daemonized worker with the specified options.

exekutor restart [options]

Options

See start.

Exekutor will not remember the original start options, they have to be fully specified again when you restart a worker.

  • --shutdown_timeout=int – The amount of seconds to wait before killing a worker process.

Info

Prints info about the active workers and pending jobs.

exekutor info [options]

Options

  • --environment – The Rails environment to load.

Cleanup

Cleans up finished jobs and/or stale workers

exekutor cleanup [all|jobs|workers] [options]

Options

  • --environment=arg – The Rails environment to load.
  • --job_status=arg – The statuses to purge.
  • --timeout=int – The timeout in hours. Workers and jobs before the timeout will be purged.
  • --job_timeout=int – The job timeout in hours (overrides --timeout). Jobs where scheduled_at is before the timeout will be purged.
  • --worker_timeout=int – The worker timeout in hours (overrides --timeout). Workers where the last heartbeat is before the timeout will be purged.

Job options

You can include the Exekutor::JobObtions mixin into your active job class to use custom job options.

Execution timeout

Limit the execution time of your job.

Be aware that Timeout::timeout is used internally for this, which can raise an error at any line of code in your application. Use with caution

class MyJob < ActiveJob::Base
  include Exekutor::JobOptions
  exekutor_options execution_timeout: 10.seconds
end

# Or per job
MyJob.set(execution_timeout: 1.minute).perform_later

Queue timeout

When a queue timeout is specified, Exekutor will not execute or job if it has been in the queue for longer than the timeout.

class MyJob < ActiveJob::Base
  include Exekutor::JobOptions
  exekutor_options queue_timeout: 1.hour
end

# Or per job
MyJob.set(queue_timeout: 15.minutes).perform_later

Asynchronous methods

Include the Exekutor::Asynchronous mixin in any class to make one or more of its methods be executed asynchronously through active job.

class MyRecord < ActiveRecord::Base
  include Exekutor::Asynchronous

  def method(arg1, arg2)
    puts "arg1: #{arg1.inspect}; arg2: #{arg2.inspect}"
  end

  perform_asynchronously :method

  def self.class_method(arg1, arg2)
    puts "arg1: #{arg1.inspect}; arg2: #{arg2.inspect}"
  end

  perform_asynchronously :class_method, class_method: true
end

Caveats

Method arguments

Exekutor can only perform methods asynchronously if all the arguments can be serialized by active job. See the active job documentation for the supported arguments.

Executing instance methods

Exekutor can only perform instance methods asynchronously if the class instance can be serialized by active job. In practice, this means that you can only do this on active record models because they are serialized to a GlobalID. If you want to use this mixin on another class, you'll have to write your own active job serializer.

Hooks

You can register hooks to be called for certain lifecycle events in Exekutor. These hooks work similar to ActiveSupport::Callbacks.

class MyHook
  include Exekutor::Hook

  around_job_execution :instrument
  after_job_failure { |_job, error| report_error error }
  after_fatal_error :report_error

  def instrument(job)
    ErrorMonitoring.monitor_transaction(job) { yield }
  end

  def report_error(error)
    ErrorMonitoring.report error
  end
end

Hook types

  • before_enqueue – Called before a job is enqueued. Receives the job as an argument.
  • around_enqueue – Called when a job is enqueued, yield must be called to propagate the call. Receives the job as an argument.
  • after_enqueue – Called after a job is enqueued. Receives the job as an argument.
  • before_job_execution – Called before a job is executed. Receives a Hash with job info as an argument.
  • around_job_execution – Called when a job is executed, yield must be called to propagate the call. Receives a Hash with job info as an argument.
  • after_job_execution – Called after a job is executed. Receives a Hash with job info as an argument.
  • on_job_failure – Called after a job has raised an error. Receives a Hash with job info and the raised error as arguments.
  • on_fatal_error – Called after an error was raised outside job execution. Receives the raised error as an argument.
  • before_startup – Called before starting up a worker. Receives the worker as an argument.
  • after_startup – Called after a worker has started up. Receives the worker as an argument.
  • before_shutdown – Called before shutting down a worker. Receives the worker as an argument.
  • after_shutdown – Called after a worker has shutdown. Receives the worker as an argument.

The job execution hooks

The job execution hooks receive a Hash with job info instead of a active job instance. This has contains the following values:

  • id – The Exekutor id of the job.
  • options – The custom Exekutor options for this job.
  • payload – The active job payload for this job.
  • scheduled_at – The time this job was meant to be executed.

Running a worker from Ruby

You can also start a worker from Ruby code:

Exekutor::Worker.start(worker_options)

Options

  • :identifier – the identifier for this worker
  • :queues – the queues to work on
  • :min_priority – the minimum job priority to execute
  • :max_priority – the maximum job priority to execute
  • :enable_listener – whether to enable the listener
  • :min_threads – the minimum number of execution threads that should be active
  • :max_threads – the maximum number of execution threads that may be active
  • :max_thread_idletime – the maximum duration a thread may be idle before being stopped
  • :polling_interval – the polling interval
  • :poling_jitter – the polling jitter
  • :set_db_connection_name – whether the DB connection name should be set
  • :wait_for_termination – how long the worker should wait on jobs to be completed before exiting
  • :status_server_port – the port to run the status server on
  • :status_server_handler – The name of the rack handler to use for the status server
  • :healthcheck_timeout – The timeout of a worker before the status server deems it as down

The default values for most of the options can be fetched by:

Exekutor.config.worker_options # => { enable_listener: … } 

Methods

Start

Starts the worker in the background. The method will return immediately after startup.

worker = Worker.new(options)
worker.start

# or

worker = Worker.start(options)

Stop

Stops the worker. This method may block until the worker has finished its jobs depending wait_for_termination option.

worker.stop

Kill

Kills the worker. This method cancels job execution and return the jobs back to the pending state. This method does not invoke the shutdown hooks.

worker.kill

Join

Joins the current thread with the worker thread. This method blocks until the worker shuts down.

worker.join

Cleanup

You can clean up the jobs from the CLI and from Ruby:

See Command line interface for the CLI options.

cleanup = Exekutor::Cleanup.new
cleanup.cleanup_workers(options)
cleanup.cleanup_jos(options)

Cleanup workers

Cleans up stale workers. Worker records should be automatically purged upon shutdown, but when for example the DB connection is down or when a worker is killed the records might be left behind.

Exekutor has a DB trigger to automatically release unfinished jobs when a worker record is deleted. This means that stale worker records can lock jobs without them being executed. Cleaning up these records will release these jobs so they can be performed by other workers.

Options

  • :timeout – The timeout for worker heartbeats. Workers where the last heartbeat is before the timeout will be purged. Make sure the timeout is not shorter than the execution time of your jobs.

Cleanup jobs

Exekutor does not delete jobs after they are finished. This means the jobs table will fill up with finished jobs, which will slow down your table. Regularly purging these jobs will make sure your jobs table will remain blazing fast.

Options

  • :timeout – The timeout for jobs. Jobs where scheduled_at is before the timeout will be purged.
  • :status – The job statuses to purgs. Only jobs with the specified status will be purged.

Deployment

When deploying on a server, use a process monitoring tool like Eye, Bluepill, or God to manage your workers in a production environment. This will ensure your workers will be kept active.

Error reporting

Use a hook to report any failed jobs and low level errors to your favorite error monitoring tool. If you want to add your monitoring tool of choice as a plugin, feel free to open a PR!

There is only 1 error monitoring plugin for now: Appsignal

Exekutor.load_plugin :appsignal

Status server

Use the status server to check if your worker is running by curl localhost:[port]/ready or …/live.

The ready endpoint checks if the worker is running and if the database connection is active. The live endpoint checks if the worker is running and is active by looking at the worker heartbeat.

$ curl localhost:9000/ready
[OK] ID: f1a2ee6a-cdac-459c-a4b8-de7c6a8bbae6; State: started

The …/threads endpoint reports on the usage of the thread-pool and can be used for autoscaling.

$ curl localhost:9000/threads
{"minimum":1,"maximum":10,"available":4,"usage_percent":60.0}

Caveats

No run-once guarantee

Make your jobs idempotent

Although Exekutor does it's best to execute a job only once, there is no guarantee this actually happens. If the database connection is lost while executing a job, a worker cannot mark the job as completed. While the connection is down, the worker keeps track of which jobs were finished and mark them as such as soon as the database connection comes back up.

If the worker is stopped or killed before this happens, the job will be stuck in the executing status (and the worker record will become stale). The cleanup task will purge any stale workers and release the jobs, after which these jobs will be executed again by another worker.

This means you have to design your jobs to be idempotent: executing it multiple times should have the same effect. It also means that it's not wise to shutdown your workers without an active database connection.

Development

After checking out the repo, run bin/setup to install dependencies. Then, run rake test to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and the created tag, and push the .gem file to rubygems.org.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/devdicated/exekutor. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the code of conduct.

License

The gem is available as open source under the terms of the MIT License.

Code of Conduct

Everyone interacting in the Exekutor project's codebases, issue trackers, chat rooms and mailing lists is expected to follow the code of conduct.