Fluent::Plugin::Kafka
TODO: Write a gem description.
TODO: Write tests.
Installation
Add this line to your application's Gemfile:
gem 'fluent-plugin-kafka'
And then execute:
$ bundle
Or install it yourself as:
$ gem install fluent-plugin-kafka
Usage
Input plugin (@type 'kafka')
<source>
@type kafka
host <broker host>
port <broker port: default=9092>
topics <topics to listen on, comma-separated>
format <input text type (text|json|ltsv|msgpack)>
message_key <key (Optional, for text format only, default is message)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
max_bytes (integer) :default => nil (Use default of Poseidon)
max_wait_ms (integer) :default => nil (Use default of Poseidon)
min_bytes (integer) :default => nil (Use default of Poseidon)
socket_timeout_ms (integer) :default => nil (Use default of Poseidon)
</source>
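The Ruby sketch below shows one plausible reading of how topics, add_prefix, add_suffix and message_key fit together. The comma-separated topic list is split, each topic name becomes the event tag with the optional prefix and suffix joined by dots, and in text format the raw payload is stored under message_key. The helper names (parse_topics, build_tag, wrap_text) and the dot-joining convention are assumptions for illustration, not the plugin's documented API.

```ruby
# Hypothetical helper: split the comma-separated `topics` value.
def parse_topics(topics)
  topics.split(',').map(&:strip).reject(&:empty?)
end

# Hypothetical helper: derive an event tag from a topic name.
# Assumes prefix and suffix are joined to the topic with dots.
def build_tag(topic, add_prefix: nil, add_suffix: nil)
  tag = topic
  tag = "#{add_prefix}.#{tag}" if add_prefix
  tag = "#{tag}.#{add_suffix}" if add_suffix
  tag
end

# Hypothetical helper: in text format, wrap the raw payload in a record
# under `message_key` (documented default: "message").
def wrap_text(raw, message_key: 'message')
  { message_key => raw }
end

parse_topics('app_log, access_log')       # => ["app_log", "access_log"]
build_tag('app_log', add_prefix: 'kafka') # => "kafka.app_log"
```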
Supports the following Poseidon::PartitionConsumer options:
- max_bytes: default 1048576 (1 MB). Maximum number of bytes to fetch.
- max_wait_ms: default 100 (100 ms). How long to block until the server sends us data.
- min_bytes: default 1 (send us data as soon as it is ready). Smallest amount of data the server should send us.
- socket_timeout_ms: default 10000 (10 s). How long to wait for a reply from the server. Should be higher than max_wait_ms.
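Since each of these options defaults to nil in the plugin config, a reasonable reading is that unset options are simply not passed on, so Poseidon applies its own defaults. The Ruby sketch below builds such an options hash and also checks that socket_timeout_ms is higher than max_wait_ms, using the defaults listed above for any unset value. The method name consumer_options and the validation step are illustrative assumptions; the plugin itself may not perform this check.

```ruby
# Poseidon::PartitionConsumer defaults as listed in this README.
POSEIDON_DEFAULTS = {
  max_bytes: 1_048_576,
  max_wait_ms: 100,
  min_bytes: 1,
  socket_timeout_ms: 10_000
}.freeze

# Hypothetical helper: drop unset (nil) options so Poseidon's defaults apply,
# and reject combinations where the socket would time out before the
# server's max wait has elapsed.
def consumer_options(max_bytes: nil, max_wait_ms: nil, min_bytes: nil, socket_timeout_ms: nil)
  opts = {
    max_bytes: max_bytes,
    max_wait_ms: max_wait_ms,
    min_bytes: min_bytes,
    socket_timeout_ms: socket_timeout_ms
  }.compact

  effective = POSEIDON_DEFAULTS.merge(opts)
  if effective[:socket_timeout_ms] <= effective[:max_wait_ms]
    raise ArgumentError, 'socket_timeout_ms should be higher than max_wait_ms'
  end
  opts
end
```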
You can also start consuming specific topics from an assigned partition and offset:
<source>
@type kafka
host <broker host>
port <broker port: default=9092>
format <input text type (text|json|ltsv|msgpack)>
<topic>
topic <listening topic>
partition <listening partition: default=0>
offset <listening start offset: default=-1>
</topic>
<topic>
topic <listening topic>
partition <listening partition: default=0>
offset <listening start offset: default=-1>
</topic>
</source>
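Each <topic> section pairs a topic with a partition and a starting offset, falling back to partition 0 and offset -1 when they are omitted. The Ruby sketch below models only that defaulting; TopicEntry and from_section are hypothetical names, and how Poseidon interprets offset -1 is outside its scope.

```ruby
# Hypothetical model of one <topic> section. Config values usually arrive
# as strings, so they are converted with Integer().
TopicEntry = Struct.new(:topic, :partition, :offset) do
  def self.from_section(section)
    topic = section.fetch('topic') { raise ArgumentError, 'topic is required' }
    new(topic,
        Integer(section.fetch('partition', 0)),  # documented default: 0
        Integer(section.fetch('offset', -1)))    # documented default: -1
  end
end

sections = [
  { 'topic' => 'app_log' },
  { 'topic' => 'access_log', 'partition' => '1', 'offset' => '42' }
]
entries = sections.map { |s| TopicEntry.from_section(s) }
```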
See the Poseidon::PartitionConsumer documentation for more details.
Input plugin (@type 'kafka_group', supports Kafka consumer groups)
<source>
@type kafka_group
brokers <comma-separated list of broker-host:port, required>
zookeepers <comma-separated list of zookeeper-host:port, required>
zookeeper_path <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for brokers
consumer_group <consumer group name, required>
topics <topics to listen on, comma-separated>
format <input text type (text|json|ltsv|msgpack)>
message_key <key (Optional, for text format only, default is message)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
max_bytes (integer) :default => nil (Use default of Poseidon)
max_wait_ms (integer) :default => nil (Use default of Poseidon)
min_bytes (integer) :default => nil (Use default of Poseidon)
socket_timeout_ms (integer) :default => nil (Use default of Poseidon)
</source>
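Both brokers and zookeepers take a comma-separated list of host:port pairs. A minimal Ruby sketch of parsing such a list follows. The helper parse_host_ports and its default_port fallback are assumptions for illustration (2181 in the usage line is ZooKeeper's conventional client port), and IPv6 literals are not handled.

```ruby
# Hypothetical parser for comma-separated "host:port" lists.
# Returns an array of [host, port] pairs with integer ports.
def parse_host_ports(list, default_port: nil)
  list.split(',').map(&:strip).reject(&:empty?).map do |entry|
    host, port = entry.split(':', 2)
    port ||= default_port
    raise ArgumentError, "missing port in #{entry.inspect}" if port.nil?
    [host, Integer(port)]
  end
end

parse_host_ports('kafka1:9092, kafka2:9093') # => [["kafka1", 9092], ["kafka2", 9093]]
parse_host_ports('zk1', default_port: 2181)   # => [["zk1", 2181]]
```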
Supports the following Poseidon::PartitionConsumer options:
- max_bytes: default 1048576 (1 MB). Maximum number of bytes to fetch.
- max_wait_ms: default 100 (100 ms). How long to block until the server sends us data.
- min_bytes: default 1 (send us data as soon as it is ready). Smallest amount of data the server should send us.
- socket_timeout_ms: default 10000 (10 s). How long to wait for a reply from the server. Should be higher than max_wait_ms.
See the Poseidon::PartitionConsumer documentation for more details.
Output plugin (non-buffered)
<match *.**>
@type kafka
# Brokers: you can choose either brokers or zookeeper.
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
zookeeper <zookeeper_host>:<zookeeper_port> # Set brokers via Zookeeper
zookeeper_path <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for kafka
default_topic <output topic>
default_partition_key (string) :default => nil
output_data_type (json|ltsv|msgpack|attr:<record name>|<formatter name>)
output_include_tag (true|false) :default => false
output_include_time (true|false) :default => false
max_send_retries (integer) :default => 3
required_acks (integer) :default => 0
ack_timeout_ms (integer) :default => 1500
compression_codec (none|gzip|snappy) :default => none
</match>
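The Ruby sketch below illustrates what output_include_tag, output_include_time and output_data_type plausibly do to a record before it is sent: optionally add the tag and time, then serialize it. The field names "tag" and "time", the helper format_record, and the restriction to the json and ltsv types (msgpack and formatter plugins are left out to stay dependency-free) are all assumptions.

```ruby
require 'json'

# Hypothetical sketch: prepare one record for sending to Kafka.
def format_record(tag, time, record, data_type: 'json', include_tag: false, include_time: false)
  record = record.dup # do not mutate the caller's record
  record['tag'] = tag if include_tag
  record['time'] = time if include_time

  case data_type
  when 'json'
    JSON.generate(record)
  when 'ltsv'
    # LTSV: "label:value" pairs separated by tabs
    record.map { |k, v| "#{k}:#{v}" }.join("\t")
  else
    raise ArgumentError, "output_data_type not covered by this sketch: #{data_type}"
  end
end

format_record('app.log', 1_700_000_000, { 'msg' => 'hi' }, include_time: true)
# => "{\"msg\":\"hi\",\"time\":1700000000}"
```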
Supports the following Poseidon::Producer options:
- max_send_retries: default 3. Number of times to retry sending messages to a leader.
- required_acks: default 0. Number of acks required per request.
- ack_timeout_ms: default 1500. How long the producer waits for acks.
- compression_codec: default none. Codec the producer uses to compress messages.
See the Poseidon::Producer documentation for more details.
This plugin also supports the "snappy" compression codec. Install the snappy gem before using snappy compression:
$ gem install snappy
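A small Ruby sketch of validating compression_codec follows. It accepts only the three documented values and, for snappy, fails early with an actionable message if the gem is missing. The helper codec_option, and the assumption that the producer takes the codec as a symbol, are illustrative rather than the plugin's actual code.

```ruby
SUPPORTED_CODECS = %w[none gzip snappy].freeze

# Hypothetical helper: validate the configured codec and convert it to a symbol.
def codec_option(name)
  unless SUPPORTED_CODECS.include?(name)
    raise ArgumentError, "compression_codec must be one of: #{SUPPORTED_CODECS.join(', ')}"
  end

  if name == 'snappy'
    begin
      require 'snappy'
    rescue LoadError
      raise LoadError, 'snappy compression requires the snappy gem (gem install snappy)'
    end
  end

  name.to_sym
end
```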
Load balancing
By default, Poseidon sends messages to brokers in a round-robin manner, but you can set default_partition_key in the config file to route messages to a specific broker.
If a message contains a partition_key field, this plugin uses its value as the partition key.
|default_partition_key|partition_key|behavior|
|-|-|-|
|Not set|Not present|All messages are sent in round-robin|
|Set|Not present|All messages are sent to a specific broker|
|Not set|Present|Messages with a partition_key field are sent to a specific broker; others are sent in round-robin|
|Set|Present|Messages with a partition_key field are sent to a specific broker using partition_key; others are sent to a specific broker using default_partition_key|
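The four rows of the load-balancing table reduce to a single rule: a record's own partition_key wins, otherwise default_partition_key is used, and no key at all means round-robin. The Ruby sketch below expresses that rule; partition_key_for is a hypothetical helper name.

```ruby
# Hypothetical helper: choose the key for one record.
# A nil result means "no key", so the producer falls back to round-robin.
def partition_key_for(record, default_partition_key = nil)
  record['partition_key'] || default_partition_key
end

records = [{ 'msg' => 'a' }, { 'msg' => 'b', 'partition_key' => 'user-1' }]
keys = records.map { |r| partition_key_for(r, 'web') }
# keys == ["web", "user-1"]
```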
Buffered output plugin
<match *.**>
@type kafka_buffered
# Brokers: you can choose either brokers or zookeeper.
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
zookeeper <zookeeper_host>:<zookeeper_port> # Set brokers via Zookeeper
zookeeper_path <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for kafka
default_topic <output topic>
default_partition_key (string) :default => nil
flush_interval <flush interval (sec) :default => 60>
buffer_type (file|memory)
output_data_type (json|ltsv|msgpack|attr:<record name>|<formatter name>)
output_include_tag (true|false) :default => false
output_include_time (true|false) :default => false
max_send_retries (integer) :default => 3
required_acks (integer) :default => 0
ack_timeout_ms (integer) :default => 1500
compression_codec (none|gzip|snappy) :default => none
</match>
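With the buffered plugin, events accumulate in the buffer and are sent in batches when it is flushed (every flush_interval seconds). The Ruby sketch below shows the batching step under stated assumptions: buffered events are [tag, time, record] tuples, every message goes to default_topic, values are JSON, and the key follows the load-balancing rules. OutMessage and build_batch are hypothetical stand-ins; with Poseidon, each message would instead be a Poseidon::MessageToSend passed to Producer#send_messages.

```ruby
require 'json'

# Hypothetical stand-in for a message handed to the producer.
OutMessage = Struct.new(:topic, :value, :key)

# Hypothetical sketch: turn one flushed chunk of events into a single batch.
def build_batch(events, default_topic:, default_partition_key: nil)
  events.map do |_tag, _time, record|
    key = record['partition_key'] || default_partition_key
    OutMessage.new(default_topic, JSON.generate(record), key)
  end
end

events = [
  ['app.log', 1_700_000_000, { 'msg' => 'a' }],
  ['app.log', 1_700_000_001, { 'msg' => 'b', 'partition_key' => 'user-1' }]
]
batch = build_batch(events, default_topic: 'logs')
# The whole batch would then be sent in one producer call.
```

Sending one batch per flush, rather than one request per event, is the main reason to choose the buffered plugin for high-volume streams.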
Supports the following Poseidon::Producer options:
- max_send_retries: default 3. Number of times to retry sending messages to a leader.
- required_acks: default 0. Number of acks required per request.
- ack_timeout_ms: default 1500. How long the producer waits for acks.
- compression_codec: default none. Codec the producer uses to compress messages.
See the Poseidon::Producer documentation for more details.
This plugin also supports the "snappy" compression codec. Install the snappy gem before using snappy compression:
$ gem install snappy
Load balancing
By default, Poseidon sends messages to brokers in a round-robin manner, but you can set default_partition_key in the config file to route messages to a specific broker.
If a message contains a partition_key field, this plugin uses its value as the partition key.
|default_partition_key|partition_key|behavior|
|-|-|-|
|Not set|Not present|All messages are sent in round-robin|
|Set|Not present|All messages are sent to a specific broker|
|Not set|Present|Messages with a partition_key field are sent to a specific broker; others are sent in round-robin|
|Set|Present|Messages with a partition_key field are sent to a specific broker using partition_key; others are sent to a specific broker using default_partition_key|
Contributing
- Fork it
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Added some feature')
- Push to the branch (git push origin my-new-feature)
- Create a new Pull Request