GlassOctopus provides a minimal, modular and adaptable interface for developing Kafka consumers in Ruby. In its philosophy it is very close to Rack.

Glass Octopus

A Kafka consumer framework. Like Rack but for Kafka.

Installation

Add this line to your application's Gemfile:

gem 'glass_octopus'

And then execute:

$ bundle

Or install it yourself as:

$ gem install glass_octopus

This gem requires Ruby 2.1 or higher.

Getting started

Pick your adapter:

  • ruby-kafka

      # in your Gemfile
      gem "glass_octopus"
      gem "ruby-kafka"
    

Currently only ruby-kafka is supported out of the box. If you need to use another adapter you can pass a class to config.adapter. See documentation for GlassOctopus::Configuration#adapter.

# in app.rb
require "bundler/setup"
require "glass_octopus"

app = GlassOctopus.build do
  use GlassOctopus::Middleware::CommonLogger

  run Proc.new { |ctx|
    puts "Got message: #{ctx.message.key} => #{ctx.message.value}"
  }
end

GlassOctopus.run(app) do |config|
  config.adapter :ruby_kafka do |kafka|
    kafka.broker_list = %w[localhost:9092] # array of "host:port" strings
    kafka.topic       = "mytopic"
    kafka.group_id    = "mygroup"
    kafka.client_id   = "myapp"
  end
end

Run it with bundle exec ruby app.rb.

For more examples, see the example directory.

For the API documentation, please see the documentation site.
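To make the Rack analogy concrete, here is a minimal, self-contained sketch of how a use/run stack can wrap a consumer in middleware. It does not use GlassOctopus or reproduce its internals: TinyBuilder, Context and Upcase are hypothetical names invented for this illustration, and the initialize(app) / call(ctx) middleware shape is an assumption borrowed from Rack.

```ruby
# Hypothetical stand-in for the context object passed through the stack.
Context = Struct.new(:message, :params)

# Hypothetical builder: collects middleware with `use` and a final app with `run`.
class TinyBuilder
  def initialize(&block)
    @entries = []
    @app = nil
    instance_eval(&block)
  end

  def use(klass, *args)
    @entries << [klass, args]
  end

  def run(app)
    @app = app
  end

  # Wrap the app so that the first middleware declared ends up outermost.
  def to_app
    @entries.reverse.reduce(@app) { |inner, (klass, args)| klass.new(inner, *args) }
  end
end

# Example middleware (assumed shape): fills ctx.params, then calls the next layer.
class Upcase
  def initialize(app)
    @app = app
  end

  def call(ctx)
    ctx.params = ctx.message.upcase
    @app.call(ctx)
  end
end

seen = []
app = TinyBuilder.new do
  use Upcase
  run ->(ctx) { seen << ctx.params }
end.to_app

app.call(Context.new("hello", nil))
```

Each layer only needs to respond to call and to hand the context to the next layer, which is why any callable, such as the Proc in the example above, can sit at the end of the stack.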

Handling Avro messages with Schema Registry

Glass Octopus can be used with Avro messages validated against a schema. For this, you need a running Schema Registry service. You also need to have the avro_turf gem installed.

# in your Gemfile
gem "avro_turf"

Add the AvroParser middleware with the Schema Registry URL to your app.

# in app.rb
app = GlassOctopus.build do
  use GlassOctopus::Middleware::AvroParser, "http://schema_registry_url:8081"
  # ...
end

Supported middleware

  • ActiveRecord

    Return any active connection to the pool after the message has been processed.

    app = GlassOctopus.build do
      use GlassOctopus::Middleware::ActiveRecord
      # ...
    end
  • New Relic

    Record message processing as background transactions and capture uncaught exceptions.

    app = GlassOctopus.build do
      use GlassOctopus::Middleware::NewRelic, MyConsumer
      # ...
    end
  • Sentry

    Report uncaught exceptions to Sentry.

    app = GlassOctopus.build do
      use GlassOctopus::Middleware::Sentry
      # ...
    end
  • Common logger

    Log each processed message and how long it took to process.

    app = GlassOctopus.build do
      use GlassOctopus::Middleware::CommonLogger
      # ...
    end
  • Parse messages as JSON

    Parse message value as JSON. The resulting hash is placed in context.params.

    app = GlassOctopus.build do
      use GlassOctopus::Middleware::JsonParser
      # ...
      run MyConsumer.new
    end
    
    class MyConsumer
      def call(ctx)
        puts ctx.params # message value parsed as JSON
        puts ctx.message # Raw unaltered message
      end
    end

    Optionally you can specify a class to be instantiated with the message hash.

    app = GlassOctopus.build do
      use GlassOctopus::Middleware::JsonParser, class: MyMessage
      run MyConsumer.new
    end
    
    class MyMessage
      def initialize(attributes)
        attributes.each { |k,v| public_send("#{k}=", v) }
      end
    end
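The class: option above implies that the parsed hash is handed to the class's constructor. The sketch below reproduces only that step in plain Ruby with the standard json library, without GlassOctopus; the id and name attributes and the sample payload are hypothetical. Note that public_send("#{k}=", v) only works if the class defines a writer for every key in the message.

```ruby
require "json"

class MyMessage
  # Hypothetical attributes; a writer must exist for each key in the payload.
  attr_accessor :id, :name

  def initialize(attributes)
    attributes.each { |k, v| public_send("#{k}=", v) }
  end
end

# Stand-in for the value of a Kafka message.
raw = '{"id": 42, "name": "octopus"}'
message = MyMessage.new(JSON.parse(raw))

puts message.id
puts message.name
```

A payload with a key that has no matching writer raises NoMethodError, so a message class like this doubles as a loose check on the shape of incoming messages.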

Development

Install Docker and docker-compose to run Kafka and ZooKeeper for the tests.

Start Kafka and ZooKeeper:

$ docker-compose up

Run all tests including integration tests:

$ rake test:all

Run the tests without the integration tests:

$ rake # or rake test

When you are done, run docker-compose down to clean up the Docker containers.

License

The gem is available as open source under the terms of the MIT License.