No commit activity in last 3 years
No release in over 3 years
Fluent output plugin for Cassandra via Datastax Ruby Driver for Apache Cassandra
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
 Dependencies

Development

>= 1.0.0
>= 1.8.4
>= 3.12
>= 0
~> 3.1.4

Runtime

>= 0.12.29
 Project Readme

Cassandra plugin for Fluentd

Cassandra output plugin for Fluentd.

Implemented using the Datastax Ruby Driver for Apache Cassandra gem and targets CQL3 and Cassandra 1.2 - 3.x

Installation

via RubyGems : https://rubygems.org/gems/fluentd-plugin-cassandra-cqlfunction

fluent-gem install fluentd-plugin-cassandra-cqlfunction

Fluentd.conf Configuration

Filter Plugin:

<filter **>
  @type cassandra_selector				# fluent filter plugin
  host 127.0.0.1,127.0.0.2			        # default => 127.0.0.1
  port 9042						# default => 9092
  username						# default => null
  password						# default => null
  connect_timeout				# default => 10 (sec)
  keyspace ex						# cassandra keyspace
  tablename tb_ex					# cassandra table
  field fieldA,fieldB					# select field normal
  field_json fieldC                 # select field json string on base(ex fieldC='{"a":"1"}')
  where_condition fieldA='xxx' and fieldB=':keyfrominput;'	# keyfrominput (fieldB=':a;' --> fieldB='1')
</filter>

ex Filter ::

input -> {'a':'1'}
output 1 rec -> {'a':'1', 'fieldA':'xxx', 'fieldB':'yyy'}
output 2+ rec -> {'a':'1', 'data_cassandra': [{fieldA':'xxx', 'fieldB':'yyy'},{fieldA':'aaa', 'fieldB':'bbb'}]}

Output Plugin:

<match **>
   @type cassandra_upsert
   host 127.0.0.1,127.0.0.2
   port 9042
   username						# default => null
   password						# default => null
   connect_timeout				# default => 10 (sec)
   keyspace ex
   tablename tb_ex
   case_insert_value fieldPk='xxx', fieldB=':keyfrominput;'   #For insert case
   case_update_value fieldA='xxx', fieldB=':keyfrominput;'  #For update case
   where_condition_upd fieldPk='xxx' or fieldPk=':keyfrominput;'
</match>

<match **>
   @type cassandra_insert
   host 127.0.0.1,127.0.0.2
   port 9042
   username						# default => null
   password						# default => null
   connect_timeout				# default => 10 (sec)
  
   keyspace ex
   tablename tb_ex
   insert_value fieldPk='xxx', fieldB=':keyfrominput;'
</match>

<match **>
   @type cassandra_update
   host 127.0.0.1,127.0.0.2
   port 9092
   username						# default => null
   password						# default => null
   connect_timeout				# default => 10 (sec)
  
   keyspace ex
   tablename tb_ex
   update_value fieldPk='xxx', fieldB=':keyfrominput;'
   where_condition_upd fieldPk='xxx' or fieldPk=':keyfrominput;'
</match>