Sbmt-KafkaProducer

This gem is used to produce Kafka messages. It is a wrapper over the waterdrop gem, and it is recommended for use as a transport with the sbmt-outbox gem.

Installation

Add this line to your app's Gemfile:

gem "sbmt-kafka_producer"

And then execute:

bundle install

Demo

Learn how to use this gem and how it works with Ruby on Rails here: https://github.com/Kuper-Tech/outbox-example-apps

Auto configuration

We recommend creating the configuration and the required files with the following Rails generators. Run any generator with the --help option to learn more about its available arguments.

Initial configuration

If you are adding the gem to your application for the first time, you can generate the initial configuration:

rails g kafka_producer:install

As a result, the config/kafka_producer.yml file will be created.

Producer class

A producer class can be generated with the following command:

rails g kafka_producer:producer MaybeNamespaced::Name sync topic

As a result, a sync producer will be created.

Outbox producer

To generate an Outbox producer for use with the sbmt-outbox gem, run the following command:

rails g kafka_producer:outbox_producer SomeOutboxItem

Manual configuration

The config/kafka_producer.yml file is the main configuration for this gem.

default: &default
  deliver: true
  ignore_kafka_error: true
  # see more options at https://github.com/karafka/waterdrop/blob/master/lib/waterdrop/config.rb
  wait_on_queue_full: true
  max_payload_size: 1000012
  max_wait_timeout: 60000
  auth:
    kind: plaintext
  kafka:
    servers: "kafka:9092" # required
    max_retries: 2 # optional, default: 2
    required_acks: -1 # optional, default: -1
    ack_timeout: 1000 # in milliseconds, optional, default: 1000
    retry_backoff: 1000 # in milliseconds, optional, default: 1000
    connect_timeout: 2000 # in milliseconds, optional, default: 2000
    message_timeout: 55000 # in milliseconds, optional, default: 55000
    # kafka_config: # optional, low-level custom Kafka options (see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
    #   queue.buffering.max.messages: 100000
    #   queue.buffering.max.ms: 5

development:
  <<: *default

test:
  <<: *default
  deliver: false
  wait_on_queue_full: false

production:
  <<: *default
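
The `<<: *default` merge keys mean that each environment inherits the default section and overrides only the keys it lists. The following is a minimal sketch of that behavior, using Ruby's standard YAML library on a trimmed copy of the configuration. It only illustrates how the merge resolves and is not meant to show how the gem itself loads the file.

```ruby
require "yaml"

# Trimmed copy of config/kafka_producer.yml, inlined for illustration.
CONFIG_YAML = <<~YAML
  default: &default
    deliver: true
    wait_on_queue_full: true
    kafka:
      servers: "kafka:9092"

  test:
    <<: *default
    deliver: false
    wait_on_queue_full: false

  production:
    <<: *default
YAML

# aliases: true is required because the file uses the &default anchor.
config = YAML.safe_load(CONFIG_YAML, aliases: true)

test_config = config.fetch("test")
production_config = config.fetch("production")

puts test_config["deliver"]              # overridden in the test section
puts production_config["deliver"]        # inherited from default
puts test_config.dig("kafka", "servers") # nested keys are inherited too
```

In this sketch the test environment turns delivery off while production keeps every default value unchanged.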

auth config section

The gem supports two authentication variants: plaintext (the default) and SASL-plaintext.

SASL-plaintext:

auth:
  kind: sasl_plaintext
  sasl_username: user
  sasl_password: pwd
  sasl_mechanism: SCRAM-SHA-512
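
For orientation, the keys of the auth section correspond to well-known librdkafka options (`security.protocol`, `sasl.username`, `sasl.password`, `sasl.mechanism`). The sketch below shows one way such a translation could look. The helper `auth_to_rdkafka` is hypothetical, and the gem's actual internal mapping is an assumption here, not something this README documents.

```ruby
# Hypothetical helper, not part of the gem: translates an auth section
# (as a Hash with string keys) into librdkafka option names.
def auth_to_rdkafka(auth)
  case auth.fetch("kind", "plaintext")
  when "plaintext"
    {"security.protocol" => "plaintext"}
  when "sasl_plaintext"
    {
      "security.protocol" => "sasl_plaintext",
      "sasl.username" => auth.fetch("sasl_username"),
      "sasl.password" => auth.fetch("sasl_password"),
      "sasl.mechanism" => auth.fetch("sasl_mechanism")
    }
  else
    raise ArgumentError, "unsupported auth kind: #{auth["kind"].inspect}"
  end
end

sasl_options = auth_to_rdkafka(
  "kind" => "sasl_plaintext",
  "sasl_username" => "user",
  "sasl_password" => "pwd",
  "sasl_mechanism" => "SCRAM-SHA-512"
)
puts sasl_options["security.protocol"]
```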

kafka config section

The servers key is required and should be in rdkafka format, without the kafka:// prefix, for example: srv1:port1,srv2:port2,....

The kafka_config section may contain any rdkafka option.
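
If your broker list comes from an environment variable or another system that includes the kafka:// prefix, it can be normalized before it reaches the config file. This is a minimal sketch; `normalize_servers` is a hypothetical helper and not part of the gem.

```ruby
# Hypothetical helper, not part of the gem: converts a comma-separated
# broker list into the rdkafka format expected by the servers key.
def normalize_servers(servers)
  servers
    .split(",")
    .map { |server| server.strip.delete_prefix("kafka://") }
    .reject(&:empty?)
    .join(",")
end

puts normalize_servers("kafka://srv1:9092, kafka://srv2:9092")
```

Entries that already lack the prefix pass through unchanged, so the helper is safe to apply to an already valid value.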

Producer class

To create a producer that will be responsible for sending messages to Kafka, copy the following code:

# app/producers/some_producer.rb
class SomeProducer < Sbmt::KafkaProducer::BaseProducer
  option :topic, default: -> { "topic" }

  def publish(payload, **options)
    sync_publish(payload, options)
    # async_publish(payload, options)
  end
end

Outbox producer config

Add the following lines to your config/outbox.yml file in the transports section:

outbox_items:
  some_outbox_item:
    transports:
      sbmt/kafka_producer:
        topic: 'topic'
        kafka: # optional kafka options
          required_acks: -1

Usage

To send a message to a Kafka topic, call the producer's publish method:

SomeProducer.new.publish(payload, key: "123", headers: {"some-header" => "some-value"})

Metrics

The gem collects basic producer metrics using Yabeda. See the metrics in YabedaConfigurer.

Testing

To stub producer requests to a real Kafka broker, you can use a fake class. To do this, add require "sbmt/kafka_producer/testing" to spec/rails_helper.rb.
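
To illustrate the general idea of a fake, the sketch below defines a hand-written stand-in with the same call shape as `SomeProducer#publish`. `RecordingProducer` is a hypothetical class written for this example. It is not the fake class that the gem's testing file provides, and that class's behavior is not shown here.

```ruby
# Hypothetical stand-in, not the gem's own fake class: records messages
# in memory instead of sending them to Kafka.
class RecordingProducer
  attr_reader :messages

  def initialize(topic: "topic")
    @topic = topic
    @messages = []
  end

  # Same call shape as SomeProducer#publish, but nothing leaves the process.
  def publish(payload, **options)
    @messages << {topic: @topic, payload: payload, **options}
    true
  end
end

producer = RecordingProducer.new
producer.publish("payload", key: "123", headers: {"some-header" => "some-value"})
puts producer.messages.size
```

A spec can then assert on `producer.messages` to check the payload, key, and headers that the code under test tried to publish.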

Development

Install dip, then run:

dip provision
dip rspec