RailsTransactionalOutbox
An implementation of the transactional outbox pattern for use with Rails.
Installation
Add this line to your application's Gemfile:
gem 'rails-transactional-outbox'
And then execute:
$ bundle install
Or install it yourself as:
$ gem install rails-transactional-outbox
Usage
To get familiar with the pattern:
Create the initializer with the following content:
Rails.application.config.to_prepare do
RailsTransactionalOutbox.configure do |config|
config.database_connection_provider = ActiveRecord::Base # required
config.transaction_provider = ActiveRecord::Base # required
config.logger = Rails.logger # required
config.outbox_model = OutboxEntry # required
config.error_handler = Sentry # non-required, but highly recommended, defaults to RailsTransactionalOutbox::ErrorHandlers::NullErrorHandler. When using Sentry, you will probably want to exclude SignalException `config.excluded_exceptions += ["SignalException"]`.
config.transactional_outbox_worker_sleep_seconds = 1 # optional, defaults to 0.5
config.transactional_outbox_worker_idle_delay_multiplier = 5 # optional, defaults to 1, if there are no outbox entries to be processed, then the sleep time for the thread will be equal to transactional_outbox_worker_idle_delay_multiplier * transactional_outbox_worker_sleep_seconds
config.outbox_batch_size = 100 # optional, defaults to 100
config.add_record_processor(MyCustomOperationProcessor) # optional, by default it contains only one processor for ActiveRecord, but you could add more
config.raise_not_found_model_error = true # optional, defaults to true. Should the error be raised if outbox entry model is not found
config.lock_client = Redlock::Client.new([ENV["REDIS_URL"]]) # required if you want to use RailsTransactionalOutbox::OutboxEntriesProcessors::OrderedByCausalityKeyProcessor, defaults to RailsTransactionalOutbox::NullLockClient. Check its interface and the interface of the `redlock` gem. In short: when the lock is acquired, a hash with the structure outlined in RailsTransactionalOutbox::NullLockClient should be yielded; if the lock is not acquired, nil should be yielded.
config.lock_expiry_time = 10_000 # not required, defaults to 10_000, the unit is milliseconds
config.outbox_entries_processor = RailsTransactionalOutbox::OutboxEntriesProcessors::OrderedByCausalityKeyProcessor.new # not required, defaults to RailsTransactionalOutbox::OutboxEntriesProcessors::NonOrderedProcessor.new
config.outbox_entry_causality_key_resolver = ->(model) { model.tenant_id } # not required, defaults to a lambda returning nil. Needed when using RailsTransactionalOutbox::OutboxEntriesProcessors::OrderedByCausalityKeyProcessor
config.unprocessed_causality_keys_limit = 100_000 # not required, defaults to 10_000. Consider decreasing the value if you start experiencing OOMs, as they are likely caused by fetching too many causality keys. This tends to happen when there is a huge number of records to process.
config.datadog_statsd_client = Datadog::Statsd.new("localhost", 8125, namespace: "application_name.production") # needed only for latency tracking, defaults to `nil`
config.high_priority_sidekiq_queue = :critical # not required, defaults to `:rails_transactional_outbox_high_priority`
end
end
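The lock client contract described in the lock_client comment above can be sketched as follows. This is a hypothetical in-process client for illustration only: the class name InMemoryLockClient, the lock(resource, ttl) signature (modeled on the block form of redlock's Client#lock) and the hash keys are assumptions, and RailsTransactionalOutbox::NullLockClient remains the authoritative reference. It does not implement TTL expiry and does not work across processes.

```ruby
require "securerandom"

# Hypothetical lock client, NOT part of the gem. It yields a lock-info hash
# when the lock is acquired and nil when the resource is already locked.
class InMemoryLockClient
  def initialize
    @held = {}
    @mutex = Mutex.new
  end

  # Assumed signature, modeled on the block form of redlock's Client#lock.
  def lock(resource, ttl_milliseconds)
    lock_info = acquire(resource, ttl_milliseconds)
    yield lock_info
  ensure
    # Release only if this call actually acquired the lock.
    release(resource) if lock_info
  end

  private

  def acquire(resource, ttl_milliseconds)
    @mutex.synchronize do
      return nil if @held.key?(resource)

      @held[resource] = true
      # Assumed hash keys, mirroring redlock's lock info.
      { validity: ttl_milliseconds, resource: resource, value: SecureRandom.uuid }
    end
  end

  def release(resource)
    @mutex.synchronize { @held.delete(resource) }
  end
end

client = InMemoryLockClient.new
client.lock("tenant-1", 10_000) do |lock_info|
  puts(lock_info ? "lock acquired for #{lock_info[:resource]}" : "lock not acquired")
end
```

A caller is expected to check the yielded value and skip the work when it is nil, which is how a second worker backs off from a causality key that is already being processed.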
Create OutboxEntry model (or use a different name, just make sure to adjust config/migration) with the following content:
class OutboxEntry < ApplicationRecord
include RailsTransactionalOutbox::OutboxModel
# optional, if you want to use encryption
crypt_keeper :changeset, :arguments, encryptor: :postgres_pgp, key: ENV.fetch("CRYPT_KEEPER_KEY"), encoding: "UTF-8"
outbox_encrypt_json_for :changeset, :arguments
end
And use the following migration:
create_table(:outbox_entries) do |t|
t.string "resource_class"
t.string "resource_id"
t.string "event_name", null: false
t.string "context", null: false
t.datetime "processed_at"
t.text "arguments", null: false, default: "{}"
t.text "changeset", null: false, default: "{}"
t.string "causality_key"
t.datetime "failed_at"
t.datetime "retry_at"
t.string "error_class"
t.string "error_message"
t.integer "attempts", null: false, default: 0
t.datetime "created_at", precision: 6, null: false
t.datetime "updated_at", precision: 6, null: false
t.index %w[resource_class event_name], name: "idx_outbox_enc_entries_on_resource_class_and_event"
t.index %w[resource_class resource_id], name: "idx_outbox_enc_entries_on_resource_class_and_resource_id"
t.index ["context"], name: "idx_outbox_enc_entries_on_topic"
t.index ["created_at"], name: "idx_outbox_enc_entries_on_created_at"
t.index ["created_at"], name: "idx_outbox_enc_entries_on_created_at_not_processed", where: "processed_at IS NULL"
t.index ["causality_key", "created_at"], name: "idx_outbox_enc_entries_on_c_key_crtd_at_n_proc", where: "processed_at IS NULL"
t.index %w[resource_class created_at], name: "idx_outbox_enc_entries_on_resource_class_and_created_at"
t.index %w[resource_class processed_at], name: "idx_outbox_enc_entries_on_resource_class_and_processed_at"
end
Keep in mind that arguments and changeset are text columns here. If you don't want to use encryption, replace them with jsonb columns:
t.jsonb "arguments", null: false, default: {}
t.jsonb "changeset", null: false, default: {}
The following columns: resource_class, resource_id and changeset are dedicated to ActiveRecord integration. Do not try to modify these columns for custom processors.
As the last step, include the RailsTransactionalOutbox::ReliableModel module in the models that are supposed to have reliable after_commit callbacks, for example:
class User < ActiveRecord::Base
include RailsTransactionalOutbox::ReliableModel
end
Now, you can just replace after_commit callbacks with reliable_after_commit. The interface is the same as for after_commit:
- you can provide the :on option to specify when the callback should be executed
- you can use both blocks and symbols as the method names
- you can pass :if and :unless options
- you can also use reliable_after_create_commit, reliable_after_update_commit, reliable_after_destroy_commit and reliable_after_save_commit.
When executing the callbacks, you can use previous_changes, which will contain the changes that are persisted as changesets. One potential gotcha is that Time/Date types are stored as strings, so you might need to handle some conversion to be on the safe side.
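The Time/Date conversion mentioned above could look like the following sketch. The helper name parse_time_changes and the attribute names are hypothetical, and the code assumes the stored strings are ISO 8601 timestamps; check what your changesets actually contain before relying on this.

```ruby
require "time"

# Hypothetical helper: converts the [old, new] string pairs of the given
# time attributes back into Time objects and leaves other attributes as-is.
# Assumes ISO 8601 strings; Time.iso8601 raises ArgumentError otherwise.
def parse_time_changes(changes, time_attributes)
  changes.each_with_object({}) do |(attribute, values), result|
    result[attribute] =
      if time_attributes.include?(attribute)
        values.map { |value| value.nil? ? nil : Time.iso8601(value) }
      else
        values
      end
  end
end

# Stand-in for what previous_changes might contain inside a callback.
changes = {
  "confirmed_at" => [nil, "2024-01-02T03:04:05Z"],
  "name" => ["Old name", "New name"]
}

parsed = parse_time_changes(changes, ["confirmed_at"])
puts parsed["confirmed_at"].last.class
```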
Inclusion of this module will result in OutboxEntry records being created after create/update/destroy. For these entries, the context column will be populated with the active_record value.
Ordering/Preserving causality
There are two types of processors, which behave very differently under concurrency:
- RailsTransactionalOutbox::OutboxEntriesProcessors::NonOrderedProcessor (used by default): the order is preserved only if there is no concurrency (i.e. a single process with a single thread). Internally, .lock("FOR UPDATE SKIP LOCKED") is used to avoid conflicts and other concurrency-related issues, but at the cost of no longer preserving the causality of outbox entries (although the entries are ordered by created_at).
- RailsTransactionalOutbox::OutboxEntriesProcessors::OrderedByCausalityKeyProcessor: uses a lock (e.g. Redlock) to preserve causality determined by causality_key (e.g. a tenant ID).
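To illustrate the idea behind ordering by causality key, here is a simplified in-process simulation. It is not the gem's implementation: CausalEntry, process_by_causality_key and the per-key Mutex (standing in for a distributed lock such as Redlock) are all hypothetical. Several worker threads run concurrently, yet entries sharing a causality key are always handled in created_at order because only one worker holds a given key at a time.

```ruby
# Stand-in for an outbox entry; created_at is a plain number for brevity.
CausalEntry = Struct.new(:id, :causality_key, :created_at)

# Hypothetical simulation: workers pick causality keys from a queue and
# process all entries of a key, in created_at order, while holding its lock.
def process_by_causality_key(entries, workers: 3)
  groups = entries.group_by(&:causality_key)
  locks = groups.keys.to_h { |key| [key, Mutex.new] }
  processed = groups.keys.to_h { |key| [key, []] }

  queue = Queue.new
  groups.each_key { |key| queue << key }
  queue.close # pop returns nil once the closed queue is empty

  Array.new(workers) do
    Thread.new do
      while (key = queue.pop)
        locks[key].synchronize do
          groups[key].sort_by(&:created_at).each { |entry| processed[key] << entry.id }
        end
      end
    end
  end.each(&:join)

  processed
end

entries = [
  CausalEntry.new(3, "tenant-1", 30),
  CausalEntry.new(1, "tenant-1", 10),
  CausalEntry.new(2, "tenant-2", 20)
]
p process_by_causality_key(entries)
```

Order across different keys is not guaranteed here, which matches the trade-off described above: causality is preserved per key, not globally.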
Custom processors
If you want to add a custom processor, either for ActiveRecord or for custom service objects, create an object inheriting from RailsTransactionalOutbox::RecordProcessors::BaseProcessor, which has the following interface:
class RailsTransactionalOutbox
class RecordProcessors
class BaseProcessor
def applies?(_record)
raise "implement me"
end
def call(_record)
raise "implement me"
end
end
end
end
For reference, here is ActiveRecordProcessor:
class RailsTransactionalOutbox
class RecordProcessors
class ActiveRecordProcessor < RailsTransactionalOutbox::RecordProcessors::BaseProcessor
ACTIVE_RECORD_CONTEXT = "active_record"
private_constant :ACTIVE_RECORD_CONTEXT
def self.context
ACTIVE_RECORD_CONTEXT
end
def applies?(record)
record.context == ACTIVE_RECORD_CONTEXT
end
def call(record)
model = record.infer_model or raise CouldNotFindModelError.new(record)
model.previous_changes = record.transformed_changeset.with_indifferent_access
model.reliable_after_commit_callbacks.for_event_type(record.event_type).each do |callback|
callback.call(model)
end
end
class CouldNotFindModelError < StandardError
attr_reader :record
def initialize(record)
super()
@record = record
end
def to_s
"could not find model for outbox record: #{record.id}"
end
end
end
end
end
If you want to extend the behavior of ActiveRecordProcessor, you could create a new processor that handles exactly the same context, since multiple processors can be used for the same context.
When adding a custom processor for service objects/operations, you might want to use the arguments column to keep all the arguments there.
If you use encryption and want to deal with a properly deserialized hash, use the transformed_changeset and transformed_arguments methods (like ActiveRecordProcessor does).
When dealing with custom service objects, remember to create OutboxEntry records inside the same transaction:
class MyServiceObject
def call(user_id)
ActiveRecord::Base.transaction do
execute_some_logic
OutboxEntry.create!(context: "service_object", event_name: "my_service_object_called", arguments: { user_id: user_id })
end
end
end
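A processor for the "service_object" context used above might look like the following sketch. It is self-contained so it can run outside Rails: OutboxRecordStandIn replaces a real OutboxEntry, and MyServiceObjectProcessor and its notifier argument are hypothetical names. In an application the class would inherit from RailsTransactionalOutbox::RecordProcessors::BaseProcessor and be registered with config.add_record_processor. The string hash keys assume arguments were round-tripped through JSON.

```ruby
# Stand-in for an OutboxEntry row, with only the fields the processor reads.
OutboxRecordStandIn = Struct.new(:context, :event_name, :arguments, keyword_init: true)

# Hypothetical processor implementing the applies?/call interface.
class MyServiceObjectProcessor
  CONTEXT = "service_object"

  # The notifier is an illustrative collaborator with a default so that
  # the processor can still be built with a plain `new`.
  def initialize(notifier: ->(event_name, user_id) { puts "#{event_name}: user #{user_id}" })
    @notifier = notifier
  end

  # Handle only the entries created by the service object.
  def applies?(record)
    record.context == CONTEXT
  end

  def call(record)
    user_id = record.arguments.fetch("user_id")
    @notifier.call(record.event_name, user_id)
  end
end

record = OutboxRecordStandIn.new(
  context: "service_object",
  event_name: "my_service_object_called",
  arguments: { "user_id" => 42 }
)

processor = MyServiceObjectProcessor.new
processor.call(record) if processor.applies?(record)
```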
Running outbox worker
Use the following Rake task:
RAILS_TRANSACTIONAL_OUTBOX_THREADS_NUMBER=5 DB_POOL=10 bundle exec rake rails_transactional_outbox:worker
If you want to use just a single thread:
bundle exec rake rails_transactional_outbox:worker
Archiving old outbox records
You will probably want to periodically archive/delete processed outbox records. It's recommended to use tartarus-rb for that.
Here is an example config:
tartarus.register do |item|
item.model = OutboxEntry
item.cron = "5 4 * * *"
item.queue = "default"
item.archive_items_older_than = -> { 3.days.ago }
item.timestamp_field = :processed_at
item.archive_with = :delete_all_using_limit_in_batches
end
Outbox Processing Latency Tracking
It's highly recommended to track the latency of processing outbox records, defined as the difference between the processed_at and created_at timestamps.
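As a rough sketch of that definition, the snippet below computes minimum, maximum and average latency for a set of processed entries. ProcessedEntryStandIn and latency_summary are hypothetical names, not part of the gem, and the reporter shipped with the gem may compute its metrics differently.

```ruby
# Stand-in for processed outbox entries with the two relevant timestamps.
ProcessedEntryStandIn = Struct.new(:created_at, :processed_at)

# Hypothetical helper: latency is processed_at minus created_at, in seconds.
def latency_summary(entries)
  latencies = entries.map { |entry| entry.processed_at - entry.created_at }
  return { minimum: 0, maximum: 0, average: 0 } if latencies.empty?

  {
    minimum: latencies.min,
    maximum: latencies.max,
    average: latencies.sum / latencies.size
  }
end

now = Time.now
entries = [
  ProcessedEntryStandIn.new(now - 5, now - 4),
  ProcessedEntryStandIn.new(now - 5, now - 2)
]
p latency_summary(entries)
```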
RailsTransactionalOutbox.configure do |config|
config.datadog_statsd_client = Datadog::Statsd.new("localhost", 8125, namespace: "application_name.production") # required for latency tracking, defaults to `nil`
config.high_priority_sidekiq_queue = :critical # not required, defaults to `:rails_transactional_outbox_high_priority`
end
You also need to add a job to the sidekiq-cron schedule that will run every minute:
Sidekiq.configure_server do |config|
config.on(:startup) do
RailsTransactionalOutbox::DatadogLatencyReporterScheduler.new.add_to_schedule
end
end
With this setup, you will have the following metrics available in Datadog:
"#{namespace}.rails_transactional_outbox.latency.minimum"
"#{namespace}.rails_transactional_outbox.latency.maximum"
"#{namespace}.rails_transactional_outbox.latency.average"
"#{namespace}.rails_transactional_outbox.latency.highest_since_creation_date"
Health Checks
You need to explicitly enable the health check (e.g. in the initializer):
RailsTransactionalOutbox.enable_outbox_worker_healthcheck
To perform the actual health check, use bin/rails_transactional_outbox_health_check. On success, the script exits with status 0; on failure, it logs the error and exits with status 1.
bundle exec rails_transactional_outbox_health_check
It works for both readiness and liveness checks.
Events, hooks and monitors
You can subscribe to certain events that are published by RailsTransactionalOutbox.monitor. The monitor is based on dry-monitor.
Available events and arguments are:
- "rails_transactional_outbox.started", no arguments
- "rails_transactional_outbox.stopped", no arguments
- "rails_transactional_outbox.shutting_down", no arguments
- "rails_transactional_outbox.record_processing_failed", arguments: outbox_record
- "rails_transactional_outbox.record_processed", arguments: outbox_record
- "rails_transactional_outbox.error", arguments: error, error_message
- "rails_transactional_outbox.heartbeat", no arguments
Testing the logic from reliable_after_commit callbacks
The fastest way to handle it would be to add this to rails_helper.rb:
ApplicationRecord.after_commit do
RailsTransactionalOutbox::OutboxEntriesProcessor.new.call
end
Development
After checking out the repo, run bin/setup to install dependencies. Then, run rake spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and the created tag, and push the .gem file to rubygems.org.
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/BookingSync/rails-transactional-outbox.
License
The gem is available as open source under the terms of the MIT License.