Kafka::Retryable
Failures are common in Kafka consumers and, if left unhandled, can lead to data loss. There are many ways to buffer and retry messages that encounter failures. The aim of kafka-retryable is to provide several failure handling policies that can be chosen depending on the need.
Installation
Add this line to your application's Gemfile:
gem 'kafka-retryable'
And then execute:
$ bundle
Or install it yourself as:
$ gem install kafka-retryable
Usage
kafka-retryable allows failed messages to be buffered into a dead_letter_queue inside Kafka. In the future, it will provide retry mechanisms and the ability to buffer in other messaging systems.
- Set up the gem by setting configuration parameters. In a Rails project, this can be done inside config/initializers/kafka_retryable.rb, for instance:
Kafka::Retryable.setup do |config|
  config.failure_handling.enabled = true
  config.buffer.kafka.seed_brokers = ['kafka://localhost:9092']
end
These are the available configurations:
Option | Value type | Description | Default |
---|---|---|---|
failure_handling.enabled | Boolean | Whether failed messages are buffered to a topic in Kafka | true |
buffer.kafka.seed_brokers | Array | List of Kafka broker URLs. Example: kafka://127.0.0.1:9092 or kafka+ssl://127.0.0.1:909 | nil |
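Because seed_brokers defaults to nil, a malformed entry is easy to miss until the first failure has to be buffered. The following is a minimal sketch of a sanity check that could run before calling setup. It assumes that only the two URL schemes shown in the table are accepted; `valid_seed_broker?` and `ALLOWED_SCHEMES` are hypothetical names, not part of kafka-retryable.

```ruby
require 'uri'

# Hypothetical helper, not part of kafka-retryable.
# Assumption: only the two schemes shown in the configuration table are valid.
ALLOWED_SCHEMES = %w[kafka kafka+ssl].freeze

# Returns true when the string parses as a URL with an allowed scheme,
# a host, and an explicit port.
def valid_seed_broker?(url)
  uri = URI.parse(url)
  ALLOWED_SCHEMES.include?(uri.scheme) && !uri.host.nil? && !uri.port.nil?
rescue URI::InvalidURIError
  false
end

valid_seed_broker?('kafka://localhost:9092')      # => true
valid_seed_broker?('kafka+ssl://127.0.0.1:9092')  # => true
valid_seed_broker?('http://localhost:9092')       # => false (wrong scheme)
valid_seed_broker?('kafka://localhost')           # => false (no port)
```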
- Declare the failure handling parameters for your consumer class using the following syntax:
class KafkaConsumer
  include Kafka::Retryable::HandleFailure

  configure_handler buffer: :kafka,
                    dead_letter_queue: :topic_t1,
                    exception_blacklist: [Karafka::InvalidMessageError],
                    after_failure: ->(error, message) { Bugsnag.notify("#{error}-#{message}") }

  def consume
    # Message consumption logic goes here
  end
end
- dead_letter_queue: Topic where the consumer should enqueue the failed message.
- exception_blacklist: List of exception classes for which the error handling logic does not apply.
- exception_whitelist: List of exception classes for which the error handling logic should apply.
- after_failure: Accepts a proc that is executed after failure handling is completed.
If exception_blacklist and exception_whitelist are both missing, the error handling logic applies to all exception classes.
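The interaction between the two lists can be pictured with a small predicate. This is a sketch of the rules as described above, not the gem's actual code: `failure_handling_applies?` is a hypothetical name, subclass matching via `is_a?` is an assumption, and so is letting the blacklist win when both lists are given, since that case is not specified here.

```ruby
# Hypothetical helper, not part of kafka-retryable's documented API.
# Decides whether failure handling applies to a given error.
def failure_handling_applies?(error, blacklist: nil, whitelist: nil)
  # Assumption: a blacklist match always opts the error out.
  return false if blacklist&.any? { |klass| error.is_a?(klass) }
  # With a whitelist, only listed classes (and their subclasses) qualify.
  return whitelist.any? { |klass| error.is_a?(klass) } if whitelist

  # Neither list rules the error out, so it is handled.
  true
end

# Stand-in for Karafka::InvalidMessageError so the sketch runs on its own.
class InvalidMessageError < StandardError; end

failure_handling_applies?(RuntimeError.new)
# => true (both lists missing)
failure_handling_applies?(InvalidMessageError.new, blacklist: [InvalidMessageError])
# => false
failure_handling_applies?(RuntimeError.new, whitelist: [ArgumentError])
# => false
failure_handling_applies?(ArgumentError.new, whitelist: [ArgumentError])
# => true
```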
- Enclose the message processing logic inside the handle_failure(message) helper method provided by kafka-retryable:
handle_failure(message) do
  MessageProcessor.new(message).perform
end
Overall, this is what a Kafka consumer using kafka-retryable looks like:
class KafkaConsumer
  include Kafka::Retryable::HandleFailure

  configure_handler buffer: :kafka,
                    dead_letter_queue: :topic_t1,
                    exception_blacklist: [Karafka::InvalidMessageError],
                    after_failure: ->(error, message) { Bugsnag.notify("#{error}-#{message}") }

  def consume
    handle_failure(message) do
      # Message consumption logic goes here
    end
  end
end
kafka-retryable makes sure that any exception outside the exception_blacklist causes the message to be enqueued into the dead_letter_queue specified in the configure_handler configuration.
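To make this flow concrete, here is a minimal, self-contained sketch of the dead-letter pattern. It is not the gem's implementation: `FakeProducer` and `DeadLetterHandler` are hypothetical names, the in-memory producer stands in for a real Kafka producer, and swallowing the error once the message has been buffered is an assumption about the behavior.

```ruby
# In-memory stand-in for a Kafka producer, so the sketch runs without a broker.
class FakeProducer
  attr_reader :topics

  def initialize
    @topics = Hash.new { |hash, key| hash[key] = [] }
  end

  def produce(payload, topic:)
    @topics[topic] << payload
  end
end

# Hypothetical sketch of the dead-letter pattern; the gem's internals may differ.
class DeadLetterHandler
  def initialize(producer:, dead_letter_queue:, exception_blacklist: [], after_failure: nil)
    @producer = producer
    @dead_letter_queue = dead_letter_queue.to_s
    @exception_blacklist = exception_blacklist
    @after_failure = after_failure
  end

  # Runs the block; on a non-blacklisted error, buffers the message to the
  # dead letter topic and then invokes the after_failure callback.
  def handle_failure(message)
    yield
  rescue StandardError => error
    # Blacklisted errors bypass failure handling and propagate unchanged.
    raise if @exception_blacklist.any? { |klass| error.is_a?(klass) }

    @producer.produce(message, topic: @dead_letter_queue)
    @after_failure&.call(error, message)
    nil # Assumption: the error is swallowed once the message is buffered.
  end
end

notifications = []
producer = FakeProducer.new
handler = DeadLetterHandler.new(
  producer: producer,
  dead_letter_queue: :topic_t1,
  exception_blacklist: [ArgumentError],
  after_failure: ->(error, message) { notifications << "#{error}-#{message}" }
)

handler.handle_failure('order-42') { raise 'processing failed' }
producer.topics['topic_t1']  # => ["order-42"]
notifications                # => ["processing failed-order-42"]
```

Injecting the producer keeps the sketch testable without a broker; in a real consumer the buffering would go through whatever the buffer: :kafka option configures.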
Development
After checking out the repo, run bin/setup to install dependencies. You can also run bin/console for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/[USERNAME]/kafka-retryable. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the Contributor Covenant code of conduct.
License
The gem is available as open source under the terms of the MIT License.
Code of Conduct
Everyone interacting in the Kafka::Retryable project’s codebases, issue trackers, chat rooms and mailing lists is expected to follow the code of conduct.