Shoryuken::Waiter
Based heavily on the concept of shoryuken-later
, Shoryuken::Waiter
allows jobs to be scheduled greater that 15 minutes into the future when using Shoryuken.
Notice: Version 0.x is tightly coupled Rails and Shoryuken SQS queues. 1.x should add support for Shoryuken workers (currently only Active Job is supported), and more configurable DynamoDB tables. 0.0.x builds rely on Shoryuken changes that have not yet been merged into master.
Usage
Integration with ActiveJob
Because SQS only allows messages to be delayed up to 15 minutes, the Active Job
adapter that ships with Shoryuken
does not allow jobs to be scheduled further than 15 minutes out. Shoryuken::Waiter
provides a replacement adapter that wraps the Shoryuken
adapter, and allows for jobs to be scheduled arbitrarily far into the future.
# config/application.rb
config.active_job.queue_adapter = :shoryuken_waiter
If a job is created with a delay of more than 15 minutes, and Shoryuken::Waiter
finds a DynamoDB table whose name matches the name of the queue for which the job was intended, the properties of the job are captured to that table, allowing it to be rescheduled later on. If a matching table can't be found or the job's is 15 minutes or less the native Shoryuken
adapters handles it.
The schedule poller
There is no separate poller process that is run to watch for scheduled jobs. Whenever the Shoryuken
message processor starts up, a Celluloid supervisor maintains a single Shoryuken::Waiter::Querier
that polls DynamoDB for jobs. The poller shuts down when Shoryuken
stops.
The frequency that tables are queried for scheduleable jobs is 5.minutes
by default. It can be customized by setting the delay
property in the Shoryuken
configuration.
# config/shoryuken.yml
waiter:
delay: 30 # seconds
Queues & Tables
When Shoryuken::Waiter
loads it tries to find a DynamoDB table for each SQS Shoryuken
queue that is configured. When a table with a matching name can't be found, Shoryuken::Waiter
will not poll for jobs intended for that queue, and creating jobs intended for that queue is handled by the the Shoryuken
Active Job
adapter.
Note: There is no technical reason that each queue needs it's own table. It would be more cost effective to put all jobs into a single table, so that may be a change that is made in the future.
Creating DynamoDB Tables
Tables being used with Shoryuken::Waiter
must be created with certain properties to work correctly.
-
Table name: The name of the table must match the name of an SQS queue registered with
Shoryuken
-
Primary key: The
item key
must be aString
with the valuescheduler
, and thesort key
must be aString
with the valuejob_id
-
Secondary index: An index (generally a Local Secondary Index) must be added to the table. The
item key
of the index's primary key must be aString
with the valuescheduler
, and thesort key
of the primary key must be aNumber
with the valueperform_at
. The index name must bescheduler-perform_at-index
. Projected attributes generally can be set toall
.
Other properties of the table, such as the provisioned capacity will be application dependent.
Pricing
Specific pricing details for DynamoDB should be considered when determining the cost of using the service for Shoryuken::Waiter
with your app. AWS's Free Tier, which does not expire for DynamoDB, provides 25 Units of read and write capacity and 25 GB of storage for an account. In general this would be enough capacity to support several Shoryuken::Waiter
tables at no cost for most applications.
Increasing the frequency of polling, scheduling many jobs that must be delayed, or having many jobs delayed for a very long time leading to a large on-disk footprint of the tables could lead to higher costs.
Be aware that the secondary index that is required for Shoryuken::Waiter
counts against additional throughput capacity and storage costs.
Internals
When a job is scheduled normally with Shoryuken
, the message is sent to SQS with four properties queue_url
, message_body
, delay_seconds
, and message_attributes
.
When the job must be delayed, Shoryuken::Waiter
captures enough information to recreate an identical SQS message later on. The message_body
and message_attributes
are stored unaltered. delay_seconds
is tranformed into a timestamp, relative to when the job was created, which can be used to query items from a DynamoDB table. The message's queue_url
is discarded, since it can be recreated from the job's queue name, which is already captured as part of the message_body
.
Query
After a job is delayed and stored in a DynamoDB table, it must be retrieved at the appropriate time, and sent to SQS like was originally intended.
The poller needs a way of efficiently finding jobs in tables, and the DynamoDB API provides the query
operation for that purpose.
The query
operation can either search based on just a hash key, or a hash key and a range key. Shoryuken::Waiter
is looking for all jobs that are scheduled before a certain point in time (15 minutes from the time of the query). It's only possible to make comparison queries with a range key, so using just a hash key would not work. Since the hash key is required for all query operations, but there is no meaningful key in this case, it's value can be arbitrary, as long as it is consistent.
Using the job's timestamp as the range key would be reasonable, except that more than one job may be scheduled for the same time. Since all items are being given the same hash key, identical range keys would violate the primary key uniqueness constraint. Instead of using a value from the table's primary key for the comparison query, a secondary index can be added to the table, which also allows for range keys to be queried.
The range key of the table's primary key simply needs to provide uniqueness, so the job's unique ID can be used for that. The hash key of the secondary index must match the hash key of the table, so it will also be the arbitrary value that is selected. The range key will be the perform_at
timestamp, and since secondary indexes do not require primary keys to be unique, there is no risk of collision between jobs with the same timestamp.
A query can now be performed to find any item in the table with a given hash key (the arbitrary value shared by all items in the table) and a range key that only returns jobs scheduled no later than 15 minutes from now.
If the query returns any items, they must be turned back into SQS messages and sent to their intended queue. The stored values are pulled back out of the item, and transformed if necessary (e.g. turning perform_at
back to delay_seconds
).
The query returns sets of items, and SQS can be sent sets of messages (at most 10). The batches of items are processed into batches of messages, which get sent back to Shoryuken
, and ultimately to SQS.