Conflow
Conflow lets you define complex workflows with dependencies. Inspired by Gush (the idea) and Redis::Objects (the implementation), it focuses solely on dependency logic and leaves queueing and executing jobs entirely in the hands of the programmer.
Please have a look at Gush if you already use Rails and ActiveJob: it might suit your needs better.
Installation
Add this line to your application's Gemfile:
gem "conflow"
And then execute:
$ bundle
Or install it yourself as:
$ gem install conflow
Usage
Configuration
Redis connection
To configure the Redis connection, set the Conflow.redis attribute to a Redis or ConnectionPool instance.
Conflow.redis = Redis.new(host: "127.0.0.1", port: 6379)
# or
require "connection_pool"
Conflow.redis = ConnectionPool.new(size: 5, timeout: 5) { Redis.new(host: "127.0.0.1", port: 6379) }
Redis script caching
By default, the gem caches its scripts on the Redis server. To disable this behaviour, set cache_scripts to false:
Conflow::Redis::Scripts.cache_scripts = false
Defining flows
To define a flow, you first need to supply a way to enqueue jobs. Conflow does not make any assumptions about this process: you can enqueue a Sidekiq job, send a RabbitMQ event, or send an email to a Very Important Person with the flow ID and job ID.
class ApplicationFlow < Conflow::Flow
  def queue(job)
    Sidekiq::Client.enqueue(FlowWorkerJob, id, job.id)
  end
end
id (Conflow::Flow#id) and job.id (Conflow::Job#id) are enough to identify a job and execute it properly. Make sure that you send both of these values.
You can define the actual jobs to be performed using the #configure method:
class MyFlow < ApplicationFlow
  def configure(id:, strict:)
    run UpsertJob, params: { id: id }
    run CheckerJob, params: { id: id }, after: UpsertJob if strict
  end
end
To create a flow, use the .create method:
MyFlow.create(id: 320, strict: false)
MyFlow.create(id: 15, strict: true)
Dependencies
You can use the after option to define dependencies. after accepts a Class, a Conflow::Job instance, or an Integer with the ID of the job, or an array with any combination of these.
class MyFlow < ApplicationFlow
  def configure
    first = run FirstJob
    independent = run IndependentJob
    run SecondJob, after: [FirstJob, independent]
    run FinishUp, after: SecondJob
  end
end
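The accepted forms of after can be illustrated with a small, self-contained sketch. It does not use Conflow itself: the Job struct and the dependency_ids helper are hypothetical names invented here, and treating a Class as "every job already added with that class" is an assumption about how such a lookup might work, not a description of the gem's internals.

```ruby
# Hypothetical stand-in for Conflow::Job: only the fields this sketch needs.
Job = Struct.new(:id, :worker_class)

class FirstJob; end
class IndependentJob; end

# Turns an `after:` value into a sorted list of unique job IDs.
# `jobs` is the list of jobs already added to the flow.
def dependency_ids(after, jobs)
  # Wrap manually: Kernel#Array would unpack a Struct into its fields.
  [after].flatten.compact.flat_map do |dep|
    case dep
    when Class   then jobs.select { |job| job.worker_class == dep }.map(&:id)
    when Job     then [dep.id]
    when Integer then [dep]
    else raise ArgumentError, "unsupported dependency: #{dep.inspect}"
    end
  end.uniq.sort
end

jobs = [Job.new(1, FirstJob), Job.new(2, IndependentJob)]

dependency_ids([FirstJob, jobs[1]], jobs) # => [1, 2]
dependency_ids(1, jobs)                   # => [1]
dependency_ids(nil, jobs)                 # => []
```

Whatever form is passed, the result is a plain list of job IDs, which is all that dependency tracking needs.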
Promises
To use the result of one job as a parameter of another job, use Futures:
class MyFlow < ApplicationFlow
  def configure
    first = run FirstJob
    run SecondJob, params: { object_id: first.outcome[:id] }
  end
end
Note that SecondJob will automatically depend on FirstJob. When FirstJob finishes, it is expected to return a hash: { id: "<some object>" }.
The returned object must be serializable to JSON so that it can be properly persisted and handled by the Redis script which resolves promises.
If FirstJob returns { id: 14 }, SecondJob will be run with the { object_id: 14 } parameter.
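The substitution described above can be sketched in plain Ruby. This is only an illustration of the idea: in Conflow the resolution is done by a Redis script, and the Promise struct, the resolve_params helper and the extra mode parameter below are hypothetical names made up for this example.

```ruby
require "json"

# Hypothetical placeholder recorded when a flow calls `first.outcome[:id]`:
# it remembers which job's result is needed and which key to read from it.
Promise = Struct.new(:job_id, :key)

# Replaces every Promise in `params` with the value from the stored results.
# `results` maps job ID => JSON string, mimicking values persisted in Redis.
def resolve_params(params, results)
  params.transform_values do |value|
    next value unless value.is_a?(Promise)

    result = JSON.parse(results.fetch(value.job_id))
    result.fetch(value.key.to_s) # keys of a parsed JSON object are strings
  end
end

# FirstJob (job 1 here) returned { id: 14 }; it is stored as JSON.
results = { 1 => JSON.generate({ id: 14 }) }
params  = { object_id: Promise.new(1, :id), mode: "strict" }

resolved = resolve_params(params, results)
# resolved[:object_id] is 14; resolved[:mode] is left untouched
```

The JSON round trip is also why the returned object has to be JSON-serializable: anything that does not survive serialization cannot be handed over to the dependent job.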
Performing jobs
To perform a job, use the Conflow::Worker mixin. It adds a #perform method, which accepts two arguments: the IDs of the flow and the job.
A simple Conflow::Worker that is also a Sidekiq::Worker:
class FlowWorkerJob
  include Conflow::Worker
  include Sidekiq::Worker # order is important!

  def perform(flow_id, job_id)
    super do |worker_class, params|
      worker_class.new(params).call
    end
  end
end
For the flow defined in the Dependencies section, executing it would result in:
FirstJob.new({}).call
IndependentJob.new({}).call # order of the first two is not defined
SecondJob.new({}).call
FinishUp.new({}).call
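The calls above imply that, with the worker shown in this section, a job class takes its parameters in the constructor and does its work in #call. A minimal sketch of such a class follows. The body of UpsertJob is hypothetical, the symbol keys in params are an assumption, and so is the idea that the value returned from #call is what a dependent job later sees as the outcome.

```ruby
# Hypothetical job class matching the `worker_class.new(params).call`
# convention used by the block in FlowWorkerJob#perform.
class UpsertJob
  def initialize(params)
    @params = params
  end

  def call
    # Real work would go here; this sketch only echoes the ID back
    # as a JSON-serializable hash.
    { id: @params[:id] }
  end
end

# What the block would do for a single job:
worker_class = UpsertJob
params = { id: 320 }
result = worker_class.new(params).call
# result is { id: 320 }
```

Since the convention lives entirely in the block passed to super, a different one (for example a class-level perform method) would work just as well, as long as the block and the job classes agree.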
Theory
The main idea of the gem is, obviously, a directed graph containing information about dependencies. It is stored in Redis in the following fields:
- conflow:job:<id>:successors - (List) containing IDs of jobs which depend on <id>
- conflow:flow:<id>:indegree - (Sorted Set) set of all unqueued jobs with score representing how many dependencies are not yet fulfilled
There are three main actions that can be performed on this graph (Redis-wise):
- Queue jobs: removes all jobs with score 0 from the :indegree set
- Complete job: decrements the scores of all of the job's successors by one
- Add job: adds the job ID to the :successors list of every job on which it depends and adds the job itself to the :indegree set
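The three actions can be modelled in memory to see how they interact. The sketch below is not the gem's implementation: DependencyGraph and its method names are hypothetical, a Hash stands in for the sorted set and another Hash for the per-job lists, and nothing here talks to Redis.

```ruby
# In-memory stand-in for the two Redis structures described above.
class DependencyGraph
  def initialize
    # job ID => IDs of jobs that depend on it (the :successors lists)
    @successors = Hash.new { |hash, id| hash[id] = [] }
    # unqueued job ID => number of unmet dependencies (the :indegree set)
    @indegree = {}
  end

  # Add job: register it as a successor of each dependency and store it
  # with a score equal to the number of dependencies.
  def add_job(id, after: [])
    after.each { |dep| @successors[dep] << id }
    @indegree[id] = after.size
  end

  # Queue jobs: remove and return every job whose score is 0.
  def queue_jobs
    ready = @indegree.select { |_id, score| score.zero? }.keys
    ready.each { |id| @indegree.delete(id) }
    ready
  end

  # Complete job: decrement the score of each successor by one.
  def complete_job(id)
    @successors[id].each { |successor| @indegree[successor] -= 1 }
  end
end

# The flow from the Dependencies section, with made-up job IDs:
# 1 = FirstJob, 2 = IndependentJob, 3 = SecondJob, 4 = FinishUp.
graph = DependencyGraph.new
graph.add_job(1)
graph.add_job(2)
graph.add_job(3, after: [1, 2])
graph.add_job(4, after: [3])

graph.queue_jobs      # => [1, 2]
graph.complete_job(1)
graph.queue_jobs      # => [] (job 3 still waits for job 2)
graph.complete_job(2)
graph.queue_jobs      # => [3]
graph.complete_job(3)
graph.queue_jobs      # => [4]
```

A job becomes eligible for queueing exactly when its score reaches 0, so the order of completion among independent jobs does not matter.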
All of these actions are performed via eval/evalsha. This avoids synchronization problems (as scripts are executed as if in a transaction) and significantly reduces the number of requests made to Redis.
Development
After checking out the repo, run bin/setup to install dependencies. Then, run rake spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/[USERNAME]/conflow. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the Contributor Covenant code of conduct.
License
The gem is available as open source under the terms of the MIT License.
Code of Conduct
Everyone interacting in the Conflow project’s codebases, issue trackers, chat rooms and mailing lists is expected to follow the code of conduct.