Low commit activity in last 3 years
No release in over a year
Forked from takus/fluent-plugin-dynamodb-streams; with fixes from cosmo0920/fluent-plugin-dynamodb-streams
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
 Dependencies

Development

~> 1.7
~> 10.0
>= 3.1.0

Runtime

>= 0.10.58, < 2
 Project Readme

AWS DynamoDB input plugin for fluentD

Gem Version Build Status

Fluentd input plugin for AWS DynamoDB Streams.

Forked from takus/fluent-plugin-dynamodb-streams; with fixes from cosmo0920/fluent-plugin-dynamodb-streams

Preparation

Create IAM user with a policy like the following.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:DescribeStream",
        "dynamodb:ListStreams"
      ],
      "Resource": "*"
    }
  ]
}

Or define aws_key_id and aws_sec_key in your config file.

Installation

Add this line to your application's Gemfile:

gem 'fluent-plugin-dynamodb-streams-alt'

And then execute:

$ bundle

Or install it yourself as:

$ gem install fluent-plugin-dynamodb-streams-alt

Configuration

<source>
  type dynamodb_streams
  #aws_key_id  AWS_ACCESS_KEY_ID
  #aws_sec_key AWS_SECRET_ACCESS_KEY
  #aws_region  AWS_DEFAULT_REGION
  stream_arn arn:aws:dynamodb:ap-northeast-1:000000000000:table/table_name/stream/2015-01-01T00:00:00.000
  pos_file /var/lib/fluent/dynamodb_streams_table_name
  fetch_interval 1
  fetch_size 1
</source>
  • tag: Fluentd tag.
  • stream_arn: DynamoDB Streams ARN.
  • pos_file: File to store last sequence number.
  • fetch_interval: The interval to fetch records in seconds. Default is 1 sec.
  • fetch_size: The maximum number of records fetches in each iteration. Default is 1.

Output

{
  "aws_region": "ap-northeast-1",
  "event_source": "aws:dynamodb",
  "event_version": "1.0",
  "event_id": "dfbdf4fe-6f2b-4b34-9b17-4b8caae561fa",
  "event_name": "INSERT",
  "dynamodb": {
    "stream_view_type": "NEW_AND_OLD_IMAGES",
    "sequence_number": "000000000000000000001",
    "size_bytes": 14,
    "keys": {
      "key": "value2"
    },
    "old_image": {
      "key": "value1"
    },
    "new_image": {
      "key": "value2"
    }
  }
}

TODO

  • store sequence number to DynamoDB
  • fetch records from each shards concurrently