Heller
This project is no longer maintained. I strongly suggest that you use one of the other available Kafka gems; if you're specifically looking for a JRuby gem, I recommend burtcorp/kafka-clients-jruby.
Heller is a JRuby wrapper around the Kafka Producer and (Simple)Consumer APIs, much like Mikka is for Akka's Java API.
The goal of Heller is to make the Producer and Consumer APIs of Kafka a bit more Rubyesque, and useful as building blocks for creating more advanced producer and consumer implementations.
Producer API
Heller::Producer is an extremely simple wrapper class around kafka.javaapi.producer.Producer and provides some convenience in configuring the producer with more Rubyesque names for configuration parameters.
All configuration parameters are supported and can be used in the following way:
require 'heller'
producer = Heller::Producer.new('localhost:9092,localhost:9093', {
  :type => :async,
  :serializer => 'kafka.serializer.StringEncoder',
  :key_serializer => 'kafka.serializer.DefaultEncoder',
  :partitioner => 'kafka.producer.DefaultPartitioner',
  :compression => :gzip,
  :num_retries => 5,
  :retry_backoff => 1500,
  :metadata_refresh_interval => 5000,
  :batch_size => 2000,
  :client_id => 'spec-client',
  :request_timeout => 10000,
  :buffer_limit => 100 * 100,
  :buffer_timeout => 1000 * 100,
  :enqueue_timeout => 1000,
  :socket_buffer => 1024 * 1000,
  :ack => -1
})
Check the official Kafka docs for possible values for each parameter.
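To give a feel for what the Rubyesque names stand for, the sketch below translates a handful of the option keys above into Kafka 0.8 producer property names. The property names come from the Kafka producer configuration. The pairing of each option key with a property (OPTION_TO_PROPERTY) and the to_kafka_properties helper are assumptions made for illustration, not copied from Heller's source, so check the gem before relying on any of them.

```ruby
# Hypothetical mapping from Heller-style option keys to Kafka 0.8 producer
# property names. The pairings are an educated guess, not Heller's own table.
OPTION_TO_PROPERTY = {
  :type          => 'producer.type',
  :serializer    => 'serializer.class',
  :compression   => 'compression.codec',
  :num_retries   => 'message.send.max.retries',
  :retry_backoff => 'retry.backoff.ms',
  :batch_size    => 'batch.num.messages',
  :client_id     => 'client.id',
  :ack           => 'request.required.acks'
}.freeze

# Translate an options hash into a flat hash of property name => string value.
# Unknown keys raise an ArgumentError so that typos are caught early.
def to_kafka_properties(brokers, options = {})
  properties = { 'metadata.broker.list' => brokers }
  options.each do |key, value|
    name = OPTION_TO_PROPERTY.fetch(key) do
      raise ArgumentError, "unknown option: #{key.inspect}"
    end
    properties[name] = value.to_s
  end
  properties
end

props = to_kafka_properties('localhost:9092', :type => :async, :ack => -1)
props.each { |name, value| puts "#{name}=#{value}" }
```

Raising on unknown keys is a design choice of this sketch; a wrapper could just as well pass unrecognised keys through untouched.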
To send messages, create instances of Heller::Message and feed them to the #push method of the producer:
messages = 3.times.map { Heller::Message.new('test', 'my message!') }
producer.push(messages)
Want to partition messages based on some key? Sure, no problem:
messages = [0, 1, 2].map { |key| Heller::Message.new('test', "my message using #{key} as key!", key.to_s) }
producer.push(messages)
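How a key ends up on a particular partition depends on the configured partitioner class. As a rough model only, the sketch below assumes a hash-based partitioner that takes Java's String#hashCode of the key, clears the sign bit and takes the remainder by the number of partitions. Both helpers (java_string_hash, partition_for) and the partition count of 3 are hypothetical and exist only to illustrate the idea.

```ruby
# Java's String#hashCode reproduced with 32-bit signed overflow.
# Uses code points, which matches Java for ASCII keys like the ones above.
def java_string_hash(string)
  hash = 0
  string.each_char do |char|
    hash = (31 * hash + char.ord) & 0xffffffff
  end
  hash >= 0x80000000 ? hash - 0x100000000 : hash
end

# Assumed partitioning rule: clear the sign bit, then take the remainder.
def partition_for(key, num_partitions)
  (java_string_hash(key) & 0x7fffffff) % num_partitions
end

%w[0 1 2].each do |key|
  puts "key #{key} -> partition #{partition_for(key, 3)}"
end
```

Under these assumptions the keys '0', '1' and '2' land on partitions 0, 1 and 2 of a three-partition topic, and messages that share a key always share a partition.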
Consumer API
Heller::Consumer wraps kafka.javaapi.consumer.SimpleConsumer and provides basically the same methods, but with a bit more convenience (or at least I'd like to think so).
A Consumer can be created in the following way:
require 'heller'
options = {
  # 'generic' consumer options
  :timeout => 5000, # socket timeout
  :buffer_size => 128 * 1024, # socket buffer size
  :client_id => 'my-consumer', # id of consumer
  # fetch request related options
  :max_wait => 4500, # maximum time (ms) the consumer will wait for response of a request
  :min_bytes => 1024 # minimum amount of bytes the server (broker) should return for a fetch request
}
consumer = Heller::Consumer.new('localhost:9092', options)
The options in the options hash are also described in the official Kafka docs, although there they are described in the context of Kafka's high-level consumer.
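In the example options, :max_wait (4500 ms) is lower than :timeout (5000 ms). That ordering is usually what you want: if the broker may hold a fetch request for longer than the socket timeout, the client can give up before the reply arrives. The check below is a minimal sketch of that rule; consumer_option_warnings is a hypothetical helper and not part of Heller.

```ruby
# Hypothetical sanity check (not part of Heller): collect warnings about
# timeout settings that are likely to work against each other.
def consumer_option_warnings(options)
  warnings = []
  timeout  = options[:timeout]
  max_wait = options[:max_wait]
  if timeout && max_wait && max_wait >= timeout
    warnings << ":max_wait (#{max_wait} ms) is not lower than :timeout (#{timeout} ms)"
  end
  warnings
end

options = { :timeout => 5000, :max_wait => 4500, :min_bytes => 1024 }
consumer_option_warnings(options).each { |warning| warn warning }
```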
The consumer API exposes the following methods: #fetch, #metadata, #offsets_before, #earliest_offset and #latest_offset. Their usage is described below.
Fetching messages
topic = 'my-topic'
partition = offset = 0
response = consumer.fetch(Heller::FetchRequest.new(topic, partition, offset))
if response.error? && (error_code = response.error(topic, partition)) != 0
  puts "Got error #{Heller::Errors.error_for(error_code)}!"
else
  message_enumerator = response.messages(topic, partition)
  message_enumerator.each do |offset, message_payload|
    puts "#{offset}: #{message_payload}"
  end
end
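Because each message is yielded together with its offset, a consumer loop can remember where the next fetch should start. The sketch below uses plain arrays of [offset, payload] pairs as a stand-in for the enumerator returned by response.messages; next_fetch_offset is a hypothetical helper, not something Heller provides.

```ruby
# Given [offset, payload] pairs (a stand-in for response.messages(topic, partition)),
# print each message and return the offset to use for the following fetch request.
def next_fetch_offset(messages, current_offset)
  last_offset = current_offset - 1
  messages.each do |offset, payload|
    puts "#{offset}: #{payload}"
    last_offset = offset
  end
  last_offset + 1
end

batch = [[0, 'first'], [1, 'second'], [2, 'third']]
offset = next_fetch_offset(batch, 0)   # 3
offset = next_fetch_offset([], offset) # an empty batch leaves the offset at 3
```

With a real consumer, the returned value would be passed as the offset argument of the next Heller::FetchRequest.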
See Heller::FetchResponse (and the related specs) for usage of other methods.
It's also possible to pass an array of FetchRequest objects to #fetch:
requests = [0, 1, 2].map { |i| Heller::FetchRequest.new(topic, i, offset) }
fetch_response = consumer.fetch(requests)
Topic and partition metadata
kafka.javaapi.consumer.SimpleConsumer exposes a method called #topic_metadata, which in Heller has been "renamed" to just #metadata.
topics = [1, 2, 3].map { |i| "my-topic-#{i}" }
response = consumer.metadata(topics)
response.each do |topic, partition_metadata|
  puts "Got metadata for (#{topic}:#{partition_metadata.partition_id})"
end
leader = response.leader_for('my-topic-1', 0)
puts "Leader for my-topic-1:0 is at #{leader.connection_string} (#{leader.zk_string})"
isrs = response.isr_for('my-topic-1', 0) # also aliased as #in_sync_replicas_for
isrs.each do |isr|
  puts "An in-sync replica for my-topic-1:0 is at #{isr.connection_string} (#{isr.zk_string})"
end
Get offsets for topic-partition combinations
# arguments = *[topic, partition, timestamp (ms), max number of offsets]
request = Heller::OffsetRequest.new('my-topic', 0, Time.now.to_i * 1000, 10)
response = consumer.offsets_before(request)
if response.error? && (error_code = response.error('my-topic', 0)) != 0
  puts "Got error #{Heller::Errors.error_for(error_code)}!"
else
  offsets = response.offsets('my-topic', 0)
  puts "Got #{offsets.join(', ')} offsets for my-topic:0"
end
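The timestamp argument is given in milliseconds. The sketch below shows one way to build it from a Ruby Time without losing sub-second precision. It also names the two special values that Kafka's own offset API understands (-1 for latest, -2 for earliest). Whether Heller::OffsetRequest passes those through unchanged is an assumption here, and to_kafka_timestamp and both constant names are hypothetical.

```ruby
# Special timestamp values used by Kafka's offset API
# (kafka.api.OffsetRequest.LatestTime and EarliestTime).
LATEST_TIME   = -1
EARLIEST_TIME = -2

# Convert a Ruby Time to whole milliseconds since the epoch.
# Time#to_r avoids floating point rounding of the fractional part.
def to_kafka_timestamp(time)
  (time.to_r * 1000).to_i
end

one_hour_ago = to_kafka_timestamp(Time.now - 3600)
puts "Asking for offsets written before #{one_hour_ago}"
```

A value such as one_hour_ago would take the place of Time.now.to_i * 1000 in the request above.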
Heller::Consumer also exposes (as does SimpleConsumer) two convenience methods for retrieving the earliest / latest offset for a topic-partition combination.
earliest_offset = consumer.earliest_offset('my-topic', 0)
latest_offset = consumer.latest_offset('my-topic', 0)
puts "Earliest available offset is #{earliest_offset}"
puts "Latest available offset is #{latest_offset}"
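One common use of these two values is to check that a stored offset still points at data the broker has, and to fall back to the nearest valid offset when it does not. The sketch below works on plain integers so it runs without a broker; clamp_offset is a hypothetical helper, and the numbers are made up.

```ruby
# Hypothetical helper: keep a stored offset inside the range
# reported by #earliest_offset and #latest_offset.
def clamp_offset(stored, earliest, latest)
  return earliest if stored < earliest
  return latest if stored > latest
  stored
end

earliest_offset = 120 # made-up value standing in for consumer.earliest_offset(...)
latest_offset   = 500 # made-up value standing in for consumer.latest_offset(...)

puts "Resuming from #{clamp_offset(42, earliest_offset, latest_offset)}"
puts "Offsets in range: #{latest_offset - earliest_offset}"
```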
Copyright
Copyright 2013-2015 Mathias Söderberg and contributors
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.