No release in over 3 years
Low commit activity in last 3 years
Sidekiq extension providing a workflow API on top of Sidekiq Pro's batches

sidekiq_workflows

Sidekiq extension providing a workflow API on top of Sidekiq Pro's batches. To use this gem, you need a Sidekiq Pro license and must provide the credentials for the gems.contribsys.com repository via Bundler:

bundle config gems.contribsys.com username:password

Alternatively, export BUNDLE_GEMS__CONTRIBSYS__COM=username:password

Rationale

While Sidekiq Pro's batches are powerful, they expose only a rather low-level API. Take this example:

https://github.com/mperham/sidekiq/wiki/Really-Complex-Workflows-with-Batches

This is a lot of complex code, scattered across various callbacks, just to enable a straightforward workflow. It is easy to make mistakes when writing such code, and it is hard to debug. This gem provides an API to define a workflow in a single place, abstracting the Batch API away.

Version 1 (breaking changes)

Since version 1.0, retries are supported. Instead of the on_complete callback, success and death events are now used. The documentation has been updated to reflect the new interface.

Usage

require 'sidekiq_workflows'

Defining a workflow

A workflow consists of Sidekiq workers which can execute in parallel. On successful completion (all workers within a group have completed successfully), a follow-up group of workers can be launched. If a worker within a group dies, the follow-up group will not be started. Retries are supported. Make sure that the Sidekiq workers being used have retries enabled and dead set to true (use sidekiq_options retry: 0 if you don't want any retries).

class A; include Sidekiq::Worker; def perform(x); end; end
class B; include Sidekiq::Worker; def perform(x, y); end; end
class C; include Sidekiq::Worker; def perform(x, y, z); end; end
class D; include Sidekiq::Worker; def perform(x); end; end
class E; include Sidekiq::Worker; def perform(x); end; end
class F; include Sidekiq::Worker; def perform; end; end

workflow = SidekiqWorkflows.build do
  perform(A, 'first param to perform')
  perform(B, 'first', 'second').then do
    perform(C, 'first', 'second', 'third')
    perform([
      {worker: D, payload: ['first']},
      {worker: E, payload: ['first']}
    ]).then do
      perform(F)
    end
  end
end

A and B run in parallel. As soon as B completes successfully, C, D and E are launched and run in parallel as well. Finally, when D and E have both completed successfully, F is launched.
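The launch order described above can be sketched with a small toy model. This is not how the gem is implemented, and the names DEPENDENCIES and launch_waves are hypothetical. The sketch only restates which workers must succeed before another one may start, then groups workers by the earliest step at which they could launch.

```ruby
# Toy model only: this is NOT the gem's implementation.
# Each worker name maps to the workers that must succeed before it launches.
# DEPENDENCIES and launch_waves are hypothetical names used for illustration.
DEPENDENCIES = {
  'A' => [],
  'B' => [],
  'C' => ['B'],
  'D' => ['B'],
  'E' => ['B'],
  'F' => ['D', 'E']
}.freeze

# Groups workers by the earliest step at which they could launch:
# a worker joins a wave once all of its dependencies appear in earlier waves.
def launch_waves(dependencies)
  done = []
  waves = []
  until done.size == dependencies.size
    ready = dependencies.keys
                        .reject { |worker| done.include?(worker) }
                        .select { |worker| (dependencies[worker] - done).empty? }
    raise ArgumentError, 'cyclic dependencies' if ready.empty?

    waves << ready
    done.concat(ready)
  end
  waves
end

launch_waves(DEPENDENCIES).each_with_index do |wave, index|
  puts "wave #{index + 1}: #{wave.join(', ')}"
end
```

In the real workflow the launches are event-driven rather than stepwise: C, D and E wait only for B, not for A, and F waits only for D and E, not for C.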

Workflow example diagram

Additional parameters

SidekiqWorkflows.build can take some additional parameters:

  • workflow_uuid: To identify this workflow instance, you may want to provide an ID.
  • except: An array of worker classes to be entirely skipped in this workflow instance.
  • on_partial_complete: A callback that is called whenever a group of workers within the workflow has completed (either by success or death). Modifying the example above:

class WorkflowCallbacks; def on_partial_complete(status, options); end; end

workflow = SidekiqWorkflows.build(on_partial_complete: 'WorkflowCallbacks#on_partial_complete') do
  ...
end

This is especially useful if you want to report progress of the workflow to a client (for example, send a notification). When using the example above, the callback will be called 5 times in total (for A, B, C, [D, E], F). The status hash contains the workflow_uuid if present. For more details on status and options, see: https://github.com/mperham/sidekiq/wiki/Batches#callbacks
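As a rough illustration of progress reporting, the callback body might look like the sketch below. It assumes that status responds to bid, total, pending and failures, as the Sidekiq Pro batch status object documented on the wiki page linked above does. The notify method and the FakeStatus struct are hypothetical stand-ins so that the sketch runs without Sidekiq Pro.

```ruby
# Sketch of a progress-reporting callback, under the assumptions named above.
class WorkflowCallbacks
  def on_partial_complete(status, options)
    # "pending" counts jobs that have not succeeded yet.
    succeeded = status.total - status.pending
    notify("batch=#{status.bid} succeeded=#{succeeded}/#{status.total} " \
           "failures=#{status.failures}")
  end

  private

  # Hypothetical notification hook; swap in a push message, log line, etc.
  def notify(message)
    puts message
    message
  end
end

# Stand-in for the real status object (assumption, for local experimentation).
FakeStatus = Struct.new(:bid, :total, :pending, :failures)

WorkflowCallbacks.new.on_partial_complete(FakeStatus.new('b-123', 5, 3, 0), {})
```

The callback is then wired up exactly as in the README example, by passing on_partial_complete: 'WorkflowCallbacks#on_partial_complete' to SidekiqWorkflows.build.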

There is an additional parameter to perform as well:

  • delay: Wait an amount of time (to_i must yield seconds as an integer) before launching this worker. This uses Sidekiq's perform_in under the hood.

Modifying the example above:

perform(C, 'first', 'second', 'third', delay: 5.minutes)

After B has completed successfully, C will be scheduled to execute 5 minutes in the future.
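Note that 5.minutes comes from ActiveSupport. Outside of Rails, a plain number of seconds should satisfy the stated to_i requirement just as well. The sketch below illustrates this; the Minutes struct and the FIVE_MINUTES constant are hypothetical, and whether the gem accepts arbitrary duration objects beyond that requirement has not been verified here.

```ruby
# Plain integer seconds: Integer#to_i returns the integer itself.
FIVE_MINUTES = 5 * 60

# Hypothetical value object: anything whose to_i yields seconds would
# match the requirement described above.
Minutes = Struct.new(:count) do
  def to_i
    count * 60
  end
end

puts FIVE_MINUTES.to_i
puts Minutes.new(5).to_i

# Inside a workflow definition this would read (not executed here,
# because it requires the gem and Sidekiq Pro):
#   perform(C, 'first', 'second', 'third', delay: FIVE_MINUTES)
```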

Generally, perform can be used in two ways:

  • perform(C, 'first', 'second', 'third', delay: 5.minutes)
  • perform([{worker: C, payload: ['first', 'second', 'third'], delay: 5.minutes}])

The second form can be used to perform multiple workers within a single batch.
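To make the relationship between the two forms concrete, here is a minimal sketch that normalizes both call styles into the same list of job descriptions. The helper normalize_perform_args is hypothetical and is not part of the gem; it only mirrors the argument shapes shown above. The class C is an empty placeholder so that the sketch runs on its own.

```ruby
# Hypothetical helper, NOT part of the gem: converts either call style into
# a list of {worker:, payload:, delay:} hashes.
def normalize_perform_args(first, *payload, delay: nil)
  if first.is_a?(Array)
    # Second form: an array of job hashes, possibly several per batch.
    first.map do |job|
      { worker: job[:worker], payload: job.fetch(:payload, []), delay: job[:delay] }
    end
  else
    # First form: worker class followed by its arguments.
    [{ worker: first, payload: payload, delay: delay }]
  end
end

class C; end # placeholder; a real worker would include Sidekiq::Worker

single = normalize_perform_args(C, 'first', 'second', 'third', delay: 300)
multi  = normalize_perform_args(
  [{ worker: C, payload: ['first', 'second', 'third'], delay: 300 }]
)

puts single == multi
```

Under this reading, the first form is shorthand for the second form with a single entry.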

Launching a workflow

Once defined, you can launch a workflow like this:

batch_id = SidekiqWorkflows::Worker.perform_workflow(workflow)

This method returns a Sidekiq Pro batch ID. This batch represents the workflow.

Additional parameters

SidekiqWorkflows::Worker.perform_workflow can take some additional parameters:

  • on_success: A callback that is called once, when the workflow has completed successfully.
  • on_success_options: A hash of key/value options which will be part of the options hash of the callback.
  • on_death: A callback that is called once, when the workflow has 'died' (that is, retries have been exhausted for at least one worker in the hierarchy).
  • on_death_options: A hash of key/value options which will be part of the options hash of the callback.

class WorkflowCallbacks; def on_success(status, options); end; end

SidekiqWorkflows::Worker.perform_workflow(workflow, on_success: 'WorkflowCallbacks#on_success', on_success_options: {stuff: 1})

If workflow_uuid has been passed into SidekiqWorkflows.build, it will also be present inside the options hash.
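A callback class handling both outcomes could be sketched as follows. The record helper and the StubStatus struct are hypothetical, and the sketch assumes only that status responds to bid. Whether option keys arrive as symbols or strings after serialization is not stated here, so the options hash is passed through untouched.

```ruby
# Sketch of a callback class covering both terminal outcomes.
class WorkflowCallbacks
  def on_success(status, options)
    record(:success, status, options)
  end

  def on_death(status, options)
    record(:death, status, options)
  end

  private

  # Hypothetical helper: builds and prints a summary of the outcome.
  def record(outcome, status, options)
    summary = { outcome: outcome, batch: status.bid, options: options }
    puts summary.inspect
    summary
  end
end

# Stand-in for the real status object (assumption, for local experimentation).
StubStatus = Struct.new(:bid)

WorkflowCallbacks.new.on_success(StubStatus.new('b-123'), { 'stuff' => 1 })

# Launching with both callbacks would then read (not executed here,
# because it requires the gem and Sidekiq Pro):
#   SidekiqWorkflows::Worker.perform_workflow(
#     workflow,
#     on_success: 'WorkflowCallbacks#on_success', on_success_options: {stuff: 1},
#     on_death: 'WorkflowCallbacks#on_death', on_death_options: {stuff: 1}
#   )
```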

Configuration

There are some additional configuration options:

SidekiqWorkflows.worker_queue = 'some_queue'
SidekiqWorkflows.callback_queue = 'another_queue'

worker_queue is the name of the Sidekiq queue which will be used for the gem's own meta worker. This worker usually has an execution time of only a few milliseconds, so you may want to use an appropriate queue for it.

callback_queue is the name of the Sidekiq queue which will be used for the on_partial_complete, on_success and on_death callback workers.

If not specified, the default Sidekiq queue will be used.

Development

Run tests locally

  1. CONTRIBSYS_CREDENTIALS=your-secret-sidekiq-pro-creds docker-compose build
  2. docker-compose run --rm sidekiq-workflows-test bundle exec rake test