# Jruby::Kafka
## Prerequisites

- Apache Kafka version 0.8.2.1 installed and running.
- JRuby installed.
## About

This gem wraps most of the Kafka 0.8.2.1 high-level consumer and producer APIs for JRuby. The Kafka Consumer Group Example is more or less ported directly to this library.

Relevant Kafka documentation:

- Kafka 0.8.2.1 high level consumer
- Kafka 0.8.2.1 java producer
- Kafka 0.8.1.0 scala producer
- Kafka Consumer Group Example

Note that the Scala-based `Kafka::Producer` is being deprecated and the Java-based `Kafka::KafkaProducer` is taking over.
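The newer Java producer is configured with Kafka's new-style producer properties (for example `bootstrap.servers` instead of a broker list). The sketch below only builds such an options hash; the exact `Kafka::KafkaProducer` calls are an assumption based on this gem's wrapper naming and are left commented out:

```ruby
# New-style producer configs use the Java producer's property names;
# "bootstrap.servers" replaces the old :broker_list option.
kafka_producer_options = {
  :bootstrap_servers => "192.168.59.103:9092",
  :value_serializer  => "org.apache.kafka.common.serialization.StringSerializer",
  :key_serializer    => "org.apache.kafka.common.serialization.StringSerializer"
}

# Hypothetical usage (requires a running broker and the jruby-kafka gem):
# producer = Kafka::KafkaProducer.new(kafka_producer_options)
# producer.connect
# producer.send_msg("test", nil, "0", "hello")
```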
## Installation

This package is distributed via RubyGems.org, but you can also build it yourself. From the root of the project, run:

```
$ bundle install
$ rake setup jar package
```

Then install the resulting package:

```
$ gem install jruby-kafka*.gem
```

Add this line to your application's Gemfile:

```ruby
gem 'jruby-kafka'
```
## Tests

If you want to run the tests, make sure you have downloaded Kafka 0.8.X, followed the Kafka quickstart instructions, and have `KAFKA_PATH` set in the environment.
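For example, assuming Kafka was unpacked into your home directory (the path and the rake invocation below are illustrative, not prescribed by this project):

```shell
# Point KAFKA_PATH at your local Kafka 0.8.x install
export KAFKA_PATH="$HOME/kafka_2.10-0.8.2.1"
echo "$KAFKA_PATH"

# then, from the project root (requires a running broker):
# bundle exec rake test
```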
## Usage

The following producer code sends messages to the `test` topic:

```ruby
require 'jruby-kafka'

producer_options = {
  :broker_list => "192.168.59.103:9092",
  "serializer.class" => "kafka.serializer.StringEncoder"
}

producer = Kafka::Producer.new(producer_options)
producer.connect
100.times { |i| producer.send_msg("test", i.to_s, i.to_s) }
```
The following consumer example listens indefinitely to the `test` topic and prints messages as they are received from Kafka:

```ruby
require 'jruby-kafka'

consumer_options = {
  :topic_id => "test",
  :zk_connect => "192.168.59.103:2181",
  :group_id => "test_group",
  :auto_commit_enable => "false",
  :auto_offset_reset => "smallest"
}

messages = Queue.new
consumer_group = Kafka::Group.new(consumer_options)

# run(2) starts two consumer threads; the block is called once per message.
consumer_group.run(2) do |message, metadata|
  messages << [message, metadata]
  consumer_group.commit(metadata)
  sleep 0.5
  print message
end

trap('SIGINT') do
  consumer_group.shutdown
  puts "Consumed #{messages.size} messages"
end
```
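Because the consumer block above only buffers `[message, metadata]` pairs, you will typically drain the queue somewhere, for example after shutdown. A plain-Ruby sketch of that pattern (no Kafka connection needed; the payloads here are made up):

```ruby
require 'thread' # Queue (built in on newer Rubies, harmless elsewhere)

messages = Queue.new
messages << ["payload-1", { :topic => "test" }]
messages << ["payload-2", { :topic => "test" }]

# Drain whatever was buffered, without blocking on an empty queue.
drained = []
drained << messages.pop until messages.empty?
puts "Consumed #{drained.size} messages"  # prints "Consumed 2 messages"
```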
## Using in logstash

Check out this repo: https://github.com/joekiller/logstash-kafka
## Contributing

- Fork it
- Create your feature branch (`git checkout -b my-new-feature`)
- Commit your changes (`git commit -am 'Add some feature'`)
- Push to the branch (`git push origin my-new-feature`)
- Create new Pull Request