RubyKafkaRetry
The ruby_kafka_retry gem provides a mechanism to handle message retries and dead-letter queue (DLQ) functionality in Ruby applications using Kafka. It ensures messages are retried with an increasing delay before being sent to a DLQ.
Installation
Add this line to your application's Gemfile:
gem 'ruby_kafka_retry', '~> 0.1.0'
And then execute:
$ bundle install
Or install it yourself as:
$ gem install ruby_kafka_retry
Usage
Retrying Messages
To handle message retries, use the RubyKafkaRetry::RetryFailedEvent class. This class allows you to specify the retry topic, DLQ topic, and the message to be retried, along with an optional maximum retry attempt count.
retry_topic = 'my_retry_topic'
dlq_topic = 'my_dlq_topic'
topic_message = { key: 'value' } # The message to be processed
max_retry_attempt = 5 # Optional parameter, default is 3 if not provided
retry_event = RubyKafkaRetry::RetryFailedEvent.new(retry_topic, dlq_topic, topic_message, max_retry_attempt)
retry_event.retry
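In practice the retry call usually sits in the error path of a message handler. The following is a minimal sketch of that pattern, not part of the gem: process_with_retry and its retrier argument are hypothetical names, and the retrier is injected so the example runs without a Kafka broker.

```ruby
# Hypothetical helper (not provided by the gem): runs the given block and,
# if it raises, hands the message to a retrier callable.
def process_with_retry(topic_message, retrier:)
  yield topic_message
  :processed
rescue StandardError => e
  warn "processing failed: #{e.message}"
  retrier.call(topic_message)
  :retried
end

# Stand-in retrier that only records the message, so the sketch is runnable.
# In an application with the gem loaded, it could wrap the call shown above:
#   ->(msg) { RubyKafkaRetry::RetryFailedEvent.new('my_retry_topic', 'my_dlq_topic', msg, 5).retry }
retried_messages = []
recording_retrier = ->(msg) { retried_messages << msg }

process_with_retry({ key: 'value' }, retrier: recording_retrier) do |msg|
  raise ArgumentError, 'bad payload' unless msg.key?(:id)
end
```

Injecting the retrier keeps the handler testable; whether that indirection is worthwhile depends on the application.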
Detailed Description
- First Retry Attempt: If the topic_message does not include the current_retry_attempt key, the gem considers it as the first retry attempt and current_retry_attempt will be appended to the topic_message with the value as 1. The modified topic_message will then be published to the retry_topic.
- Message Format: The topic_message must be a hash. If a non-hash object is passed, the gem will raise an error: raise TypeError, 'topic_message must be a Hash'
- Retry Logic:
  - If the current_retry_attempt value in the topic_message reaches the max_retry_attempt count, the message will be published to the DLQ topic.
  - If the current_retry_attempt value is less than the max_retry_attempt, the current_retry_attempt value will be incremented, and the message will be republished to the retry_topic after a delay.
  - The delay before republishing is calculated as 2 ** current_retry_attempt minutes.
  - The max_retry_attempt parameter is optional. If it is not provided, the default value is 3.
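The rules above can be sketched as a single decision function. This is an illustration of the described behavior, not the gem's source: next_retry_step and DEFAULT_MAX_RETRY_ATTEMPT are hypothetical names. It assumes current_retry_attempt is stored under a symbol key and that the delay is computed from the new attempt number, which matches the example workflow in this README.

```ruby
# Sketch of the retry decision described above; not the gem's actual source.
DEFAULT_MAX_RETRY_ATTEMPT = 3

# Returns [:retry, updated_message, delay_in_seconds] or [:dlq, message].
def next_retry_step(topic_message, max_retry_attempt = DEFAULT_MAX_RETRY_ATTEMPT)
  raise TypeError, 'topic_message must be a Hash' unless topic_message.is_a?(Hash)

  attempt = topic_message[:current_retry_attempt]

  # Retry budget exhausted: route the message to the DLQ unchanged.
  return [:dlq, topic_message] if attempt && attempt >= max_retry_attempt

  # The first failure starts the counter at 1; later failures increment it.
  next_attempt = attempt ? attempt + 1 : 1
  updated = topic_message.merge(current_retry_attempt: next_attempt)

  # Assumption: the delay uses the new attempt number (2 ** 1 = 2 minutes first).
  [:retry, updated, (2**next_attempt) * 60]
end

p next_retry_step({ key: 'value' })
p next_retry_step({ key: 'value', current_retry_attempt: 3 })
```

The function only decides what should happen; publishing to Kafka and scheduling the delay are left out of the sketch.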
Example Workflow
Here's a step-by-step example workflow:
- A message topic_message = { key: 'value' } is received and processed.
- If processing fails, it triggers a retry:
  - current_retry_attempt key is added to the message if not present.
  - Message becomes { key: 'value', current_retry_attempt: 1 }.
- The message is published to the retry_topic after a delay of 2 ** 1 (2 minutes).
- If processing fails again, current_retry_attempt is incremented, and the message is republished after a delay of 2 ** 2 (4 minutes).
- This continues until current_retry_attempt reaches max_retry_attempt.
- Once max_retry_attempt is reached, the message is published to the DLQ topic.
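To get a feel for how long a message can stay in the retry cycle, the delays in this workflow can be tabulated. The snippet below is a small worked calculation that assumes one delayed retry per attempt number from 1 up to max_retry_attempt; retry_delays_in_minutes is a hypothetical helper, not a gem method.

```ruby
# Delay in minutes before each retry, assuming the 2 ** current_retry_attempt rule.
def retry_delays_in_minutes(max_retry_attempt = 3)
  (1..max_retry_attempt).map { |attempt| 2**attempt }
end

delays = retry_delays_in_minutes(5)
puts delays.inspect # [2, 4, 8, 16, 32]
puts delays.sum     # 62
```

Under these assumptions, a message with max_retry_attempt set to 5 spends roughly an hour in scheduled delays before it can reach the DLQ, so the limit is worth choosing with that total in mind.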
Configuration
You need to configure the gem by creating a YAML configuration file at config/ruby_kafka_retry.yml. This file should contain the following settings:
development:
  client_id: "my_kafka_client_id"
  brokers:
    - "localhost:9092"
  ssl_ca_certs_from_system: false
  redis_host: "127.0.0.1"
  redis_db: "10"
  redis_port: "6379"
  sidekiq_queue: "test_retry_queue"
stage:
  client_id: "my_kafka_client_id"
  brokers:
    - "localhost:9092"
  ssl_ca_certs_from_system: false
  redis_host: "127.0.0.1"
  redis_db: "10"
  redis_port: "6379"
  sidekiq_queue: "test_retry_queue"
production:
  client_id: "my_kafka_client_id"
  brokers:
    - "localhost:9092"
  ssl_ca_certs_from_system: false
  redis_host: "127.0.0.1"
  redis_db: "10"
  redis_port: "6379"
  sidekiq_queue: "test_retry_queue"
Add the same sidekiq_queue name to your sidekiq.yml file as well, so that Sidekiq processes the jobs placed on that queue.
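A quick way to catch configuration mistakes is to parse the file and check the keys for the current environment before starting the application. The sketch below is an assumption-laden illustration: load_retry_config and REQUIRED_KEYS are hypothetical, the gem may load and validate its configuration differently, and the YAML is inlined so the example is self-contained.

```ruby
require 'yaml'

# Inline sample for illustration; an application would read
# config/ruby_kafka_retry.yml instead.
SAMPLE_CONFIG = <<~YAML
  development:
    client_id: "my_kafka_client_id"
    brokers:
      - "localhost:9092"
    ssl_ca_certs_from_system: false
    redis_host: "127.0.0.1"
    redis_db: "10"
    redis_port: "6379"
    sidekiq_queue: "test_retry_queue"
YAML

# Assumption: these are the keys an environment block needs.
REQUIRED_KEYS = %w[client_id brokers redis_host redis_db redis_port sidekiq_queue].freeze

# Hypothetical helper: returns the settings for one environment,
# raising if the environment or any required key is missing.
def load_retry_config(yaml_text, env)
  settings = YAML.safe_load(yaml_text).fetch(env) do
    raise KeyError, "no settings for environment #{env.inspect}"
  end
  missing = REQUIRED_KEYS - settings.keys
  raise ArgumentError, "missing keys: #{missing.join(', ')}" unless missing.empty?

  settings
end

config = load_retry_config(SAMPLE_CONFIG, 'development')
puts config['brokers'].inspect
puts config['sidekiq_queue']
```

Note that the Redis values in the sample file are quoted, so they parse as strings rather than integers.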
Dependencies
The ruby_kafka_retry gem depends on the following gems:
- ruby-kafka
- sidekiq
Running Services
To use this gem, ensure the following services are running in the background:
- Kafka Server: Ensure your Kafka server is up and running.
- Sidekiq Server: Start your Sidekiq server to handle background job processing.
Development
After checking out the repo, run bin/setup to install dependencies. You can also run bin/console for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and the created tag, and push the .gem file to rubygems.org.
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/veeraveeraveera/ruby_kafka_retry.