Logstash AWS Kinesis Input Plugin
This is a AWS Kinesis input plugin for Logstash. Under the hood uses the Kinesis Client Library.
Installation
This plugin requires Logstash >= 2.0, and can be installed by Logstash itself.
bin/logstash-plugin install logstash-input-kinesis
Usage
input {
kinesis {
kinesis_stream_name => "my-logging-stream"
codec => json { }
}
}
Using with CloudWatch Logs
If you are looking to read a CloudWatch Logs subscription stream, you'll also want to install and configure the CloudWatch Logs Codec.
Configuration
This are the properties you can configure and what are the default values:
-
application_name
: The name of the application used in DynamoDB for coordination. Only one worker per unique stream partition and application will be actively consuming messages.- required: false
-
default value:
logstash
-
kinesis_stream_name
: The Kinesis stream name.- required: true
-
region
: The AWS region name for Kinesis, DynamoDB and Cloudwatch (if enabled)- required: false
-
default value:
us-east-1
-
checkpoint_interval_seconds
: How many seconds between worker checkpoints to DynamoDB. A low value ussually means lower message replay in case of node failure/restart but it increases CPU+network ussage (which increases the AWS costs).- required: false
-
default value:
60
-
metrics
: Worker metric tracking. By default this is disabled, set it to "cloudwatch" to enable the cloudwatch integration in the Kinesis Client Library.- required: false
-
default value:
nil
-
profile
: The AWS profile name for authentication. This ensures that the~/.aws/credentials
AWS auth provider is used. By default this is empty and the default chain will be used.- required: false
-
role_arn
: The AWS role to assume. This can be used, for example, to access a Kinesis stream in a different AWS account. This role will be assumed after the default credentials or profile credentials are created. By default this is empty and a role will not be assumed.- required: false
-
role_session_name
: Session name to use when assuming an IAM role. This is recorded in CloudTrail logs for example.- required: false
-
default value:
"logstash"
-
initial_position_in_stream
: The value for initialPositionInStream. Accepts "TRIM_HORIZON" or "LATEST".- required: false
-
default value:
"TRIM_HORIZON"
Additional KCL Settings
-
additional_settings
: The KCL provides several configuration options which can be set in KinesisClientLibConfiguration. These options are configured via various function calls that all begin withwith
. Some of these functions take complex types, which are not supported. However, you may invoke any one of thewithX()
functions that take a primitive by providing key-value pairs insnake_case
. For example, to set the dynamodb read and write capacity values, two functions exist, withInitialLeaseTableReadCapacity and withInitialLeaseTableWriteCapacity. To set a value for these, provide a hash ofadditional_settings => {"initial_lease_table_read_capacity" => 25, "initial_lease_table_write_capacity" => 100}
- required: false
-
default value:
{}
Authentication
This plugin uses the default AWS SDK auth chain, DefaultAWSCredentialsProviderChain, to determine which credentials the client will use, unless profile
is set, in which case ProfileCredentialsProvider is used.
The default chain follows this order trying to read the credentials:
-
AWS_ACCESS_KEY_ID
/AWS_SECRET_KEY
environment variables -
~/.aws/credentials
credentials file - EC2 instance profile
The credentials will need access to the following services:
- AWS Kinesis
- AWS DynamoDB: the client library stores information for worker coordination in DynamoDB (offsets and active worker per partition)
- AWS CloudWatch: if the metrics are enabled the credentials need CloudWatch update permisions granted.
Look at the documentation for deeper information on the default chain.
Contributing
- https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md#contribution-steps
- Fork it ( https://github.com/logstash-plugins/logstash-input-kinesis/fork )
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Add some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create a new Pull Request
Development
To download all jars:
bundler exec rake install_jars
To run all specs:
bundler exec rspec