Glass Octopus
A Kafka consumer framework. Like Rack but for Kafka.
Installation
Add this line to your application's Gemfile:
gem 'glass_octopus'
And then execute:
$ bundle
Or install it yourself as:
$ gem install glass_octopus
This gem requires Ruby 2.1 or higher.
Getting started
Pick your adapter:
-
ruby-kafka
# in your Gemfile gem "glass_octopus" gem "ruby-kafka"
Currently only ruby-kafka
is supported out of the box. If you need to use another adapter you can pass a class to config.adapter
. See documentation for GlassOctopus::Configuration#adapter
.
# in app.rb
require "bundler/setup"
require "glass_octopus"
app = GlassOctopus.build do
use GlassOctopus::Middleware::CommonLogger
run Proc.new { |ctx|
puts "Got message: #{ctx.message.key} => #{ctx.message.value}"
}
end
GlassOctopus.run(app) do |config|
config.adapter :ruby_kafka do |kafka|
kafka.broker_list = %[localhost:9092]
kafka.topic = "mytopic"
kafka.group_id = "mygroup"
kafka.client_id = "myapp"
end
end
Run it with bundle exec ruby app.rb
For more examples look into the example directory.
For the API documentation please see the documentation site
Handling Avro messages with Schema Registry
Glass Octopus can be used with Avro messages validated against a schema. For this, you need a running Schema Registry service.
You also need to have the avro_turf
gem installed.
# in your Gemfile
gem "avro_turf"
Add the AvroParser
middleware with the Schema Registry URL to your app.
# in app.rb
app = GlassOctopus.build do
use GlassOctopus::Middleware::AvroParser, "http://schema_registry_url:8081"
# ...
end
Supported middleware
-
ActiveRecord
Return any active connection to the pool after the message has been processed.
app = GlassOctopus.build do use GlassOctopus::Middleware::ActiveRecord # ... end
-
New Relic
Record message processing as background transactions. Also captures uncaught exceptions.
app = GlassOctopus.build do use GlassOctopus::Middleware::NewRelic, MyConsumer # ... end
-
Sentry
Report uncaught exceptions to Sentry.
app = GlassOctopus.build do use GlassOctopus::Middleware::Sentry # ... end
-
Common logger
Log processed messages and runtime of the processing.
app = GlassOctopus.build do use GlassOctopus::Middleware::CommonLogger # ... end
-
Parse messages as JSON
Parse message value as JSON. The resulting hash is placed in
context.params
.app = GlassOctopus.build do use GlassOctopus::Middleware::JsonParser # ... run MyConsumer end class MyConsumer def call(ctx) puts ctx.params # message value parsed as JSON puts ctx.message # Raw unaltered message end end
Optionally you can specify a class to be instantiated with the message hash.
app = GlassOctopus.build do use GlassOctopus::Middleware::JsonParser, class: MyMessage run MyConsumer end class MyMessage def initialize(attributes) attributes.each { |k,v| public_send("#{k}=", v) } end end
Development
Install docker and docker-compose to run Kafka and Zookeeper for tests.
Start Kafka and Zookeeper
$ docker-compose up
Run all tests including integration tests:
$ rake test:all
Running tests without integration tests:
$ rake # or rake test
When you are done run docker-compose down
to clean up docker containers.
License
The gem is available as open source under the terms of the MIT License.