Fluentd Cloudwatch Plugin
Introduction
I am no longer actively using this plugin and am looking for maintainers. In my experience Cloudwatch Log Subscriptions are vastly superior to external API consumers, which are subject to rate limiting and state synchronization issues.
This gem was created out of frustration with existing solutions for Cloudwatch log ingestion into a Fluentd pipeline. Specifically, it has been designed to support:
- The 0.14.x fluentd plugin API
- Native IAM including cross-account authentication via STS
- Tidy state serialization
- HA configurations without ingestion duplication
Installation
Add this line to your application's Gemfile:
gem 'fluent-plugin-cloudwatch-ingest'
And then execute:
$ bundle
Or install it yourself as:
$ gem install fluent-plugin-cloudwatch-ingest
Usage
<source>
@type cloudwatch_ingest
region us-east-1
sts_enabled true
sts_arn arn:aws:iam::123456789012:role/role_in_another_account
sts_session_name fluentd-dev
aws_logging_enabled true
log_group_name_prefix /aws/lambda
log_stream_name_prefix 2017
log_group_exclude_regexp ^[A-Z]{1}.* # exclude log groups that start with a capital
state_file_name /mnt/nfs/cloudwatch.state
interval 60
max_log_streams_per_group 50
error_interval 5 # Time to wait between error conditions before retry
get_log_events_interval 0.0 # Time to pause between get_log_events to reduce throttle error
limit_events 10000 # Number of events to fetch in any given iteration
event_start_time 0 # Do not fetch events before this time (UNIX epoch, milliseconds)
oldest_logs_first false # When true fetch the oldest logs first
drop_blank_events true # Fluentd may throw an exception if a blank event is emitted
telemetry false # Produce statsd telemetry
statsd_endpoint localhost # Endpoint to which telemetry should be sent
<parse>
@type cloudwatch_ingest
expression /^(?<message>.+)$/
time_format %Y-%m-%d %H:%M:%S.%L
event_time true # take time from the Cloudwatch event, rather than parse it from the body
inject_group_name true # inject the group name into the record
inject_stream_name true # inject the stream name into the record
inject_cloudwatch_ingestion_time field_name # inject the iso8601 `ingestion_time` as returned by the Cloudwatch API
inject_plugin_ingestion_time field_name # inject the iso8601 time at which the plugin ingested the event
parse_json_body false # Attempt to parse the body as json and add structured fields from the result
fail_on_unparsable_json false # If the body cannot be parsed as json do not ingest the record
telemetry false # Produce statsd telemetry
statsd_endpoint localhost # Endpoint to which telemetry should be sent
</parse>
</source>
Authentication
The plugin expects an IAM instance role to be available. When the `sts_*` options are not set, that role is used directly for authentication. When they are set, the plugin will attempt to `sts:AssumeRole` the `sts_arn`. This is useful for fetching logs from many accounts where the fluentd infrastructure lives in one single account.
Prefixes
Both the `log_group_name_prefix` and `log_stream_name_prefix` may be omitted, in which case all groups and streams will be ingested. For performance reasons it is often desirable to set the `log_stream_name_prefix` to today's date, managed by a configuration management system.
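One way a configuration management system might compute that prefix is sketched below. It assumes stream names that begin with a `YYYY/MM/DD` date in UTC, as Lambda log streams do; other services name their streams differently, and the helper name `stream_prefix_for` is hypothetical.

```ruby
# Hypothetical helper: build a date-based value for log_stream_name_prefix.
# Assumes stream names start with YYYY/MM/DD in UTC (the Lambda convention).
def stream_prefix_for(time)
  # getutc returns a UTC copy and leaves the caller's Time object untouched.
  time.getutc.strftime('%Y/%m/%d')
end

# Render the configuration line for today's date.
puts "log_stream_name_prefix #{stream_prefix_for(Time.now)}"
```

The rendered line would need to be regenerated, and fluentd reloaded, at least once a day for the prefix to stay current.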
State file
The state file is a YAML serialization of the current ingestion state. When running in a HA configuration this should be placed on a shared filesystem, such as EFS. The state file is opened with an exclusive write call and as such also functions as a lock file in HA configurations. See below.
HA Setup
When the state file is located on a shared filesystem an exclusive write lock will be attempted each `interval`.
As such it is safe to run multiple instances of this plugin consuming from the same CloudWatch logging source without fear of duplication, as long as they share a state file.
In a properly configured auto-scaling group this provides for uninterrupted log ingestion in the event of a failure of any single node.
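The locking pattern described above can be sketched roughly as follows. This is an illustration, not the plugin's actual code: the helper name `with_state_lock` and the shape of the state hash are assumptions, and advisory locks via `flock` may behave differently depending on the shared filesystem in use.

```ruby
require 'yaml'

# Hypothetical helper, not the plugin's actual implementation.
# Yields the current state only if an exclusive, non-blocking lock is obtained.
# Returns true when the block ran, false when another holder has the lock.
def with_state_lock(path)
  File.open(path, File::RDWR | File::CREAT, 0o644) do |file|
    # With LOCK_NB, flock returns false at once if the lock is held elsewhere.
    return false unless file.flock(File::LOCK_EX | File::LOCK_NB)

    # An empty file yields nil (or false on older Psych), so fall back to {}.
    state = YAML.safe_load(file.read) || {}
    new_state = yield(state)

    # Overwrite the file in place while still holding the lock.
    file.rewind
    file.write(YAML.dump(new_state))
    file.flush
    file.truncate(file.pos)
    true
  end
end
```

Under this sketch each instance would call `with_state_lock` once per interval and simply skip the cycle when it returns false, so only the lock holder ingests and advances the state.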
JSON parsing
With the `parse_json_body` option set to true the plugin will attempt to parse the body of the log entry as JSON. If this is successful any field/value pairs found will be added to the emitted record as structured fields.
If `fail_on_unparsable_json` is set to true a record body consisting of malformed JSON will cause the record to be rejected. You may wish to leave this setting as false if the plugin is ingesting multiple log groups with a mixture of JSON/structured and unstructured content.
The `expression` is applied before JSON parsing is attempted. One may therefore extract a JSON fragment from within the event body if it is decorated with additional free-form text.
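The order of operations described above (expression first, then JSON parsing, then the failure policy) can be sketched as below. The function `parse_event` is hypothetical and only mirrors the documented behaviour; in particular, keeping the raw `message` field alongside the parsed fields is an assumption.

```ruby
require 'json'

# Hypothetical sketch of the documented behaviour, not the plugin's parser.
# Returns a record hash, or nil when the event should be rejected.
def parse_event(body, expression:, parse_json_body: false, fail_on_unparsable_json: false)
  # 1. The expression runs first and selects the part of the body to keep.
  match = expression.match(body)
  return nil unless match

  record = { 'message' => match[:message] }
  return record unless parse_json_body

  begin
    # 2. Parse the captured text and merge any fields into the record.
    parsed = JSON.parse(record['message'])
    record.merge!(parsed) if parsed.is_a?(Hash)
  rescue JSON::ParserError
    # 3. Malformed JSON rejects the record only when asked to.
    return nil if fail_on_unparsable_json
  end
  record
end

# A JSON fragment preceded by free-form text (here, a log level).
expression = /^\S+ (?<message>\{.*\})$/
p parse_event('INFO {"user":"bob","code":7}', expression: expression, parse_json_body: true)
```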
High volume Log Groups
If you're having ingestion problems from high volume log groups you're advised to enable telemetry in both the main plugin and the parser, and to also set both `inject_cloudwatch_ingestion_time` and `inject_plugin_ingestion_time` (each takes the name of the field to inject, as in the Usage example).
This lets you plot the state of your rate limiting and the effect of the ingestion delay inside Cloudwatch Logs (`timestamp` vs `ingestion_time`), and then take appropriate tuning action.
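As a rough illustration of the two delays worth plotting, the sketch below derives them from a single event. It relies on the CloudWatch Logs API reporting `timestamp` and `ingestion_time` in milliseconds since the UNIX epoch; the helper name `skews_ms` and the choice of milliseconds as the output unit are assumptions, not necessarily what the plugin reports.

```ruby
# Hypothetical helper: derive two delays from a CloudWatch Logs event.
# All arguments are milliseconds since the UNIX epoch.
def skews_ms(timestamp:, ingestion_time:, now_ms:)
  {
    ingestion_skew: ingestion_time - timestamp, # delay inside CloudWatch Logs
    plugin_skew: now_ms - timestamp             # total delay until the plugin saw the event
  }
end

now_ms = (Time.now.to_f * 1000).to_i
p skews_ms(timestamp: now_ms - 4_000, ingestion_time: now_ms - 2_500, now_ms: now_ms)
```

A plugin skew that grows while the ingestion skew stays flat would suggest the plugin is falling behind, for example because of API throttling, rather than CloudWatch Logs itself being slow.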
Telemetry
With `telemetry` set to true and a valid `statsd_endpoint` the plugin will emit telemetry in statsd format to UDP port 8125. It is up to you to configure your statsd-speaking daemon to add any prefix or tagging that you might want.
The metrics emitted in this version are:
api.calls.describeloggroups.attempted
api.calls.describeloggroups.failed
api.calls.describelogstreams.attempted
api.calls.describelogstreams.failed
api.calls.describeloggroups.excluded # due to log_group_exclude_regexp
api.calls.getlogevents.attempted
api.calls.getlogevents.failed
api.calls.getlogevents.invalid_token
events.emitted.success
events.emitted.blocked
Likewise when telemetry is enabled for the parser, the emitted metrics are:
parser.record.attempted
parser.record.success
parser.json.success # if json parsing is enabled
parser.json.failed # if json parsing is enabled
parser.ingestion_skew # the difference between `timestamp` and `ingestion_time` as returned by the Cloudwatch API
parser.plugin_skew # the difference between "now" and `timestamp`
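For reference, a statsd datagram is a single line of text sent over UDP. The sketch below shows what a counter increment for one of the metrics above might look like on the wire; whether the plugin emits counters, gauges or timers for any given metric is an assumption here, and the helper names are hypothetical.

```ruby
require 'socket'

# Hypothetical helper: format a statsd counter line, e.g. "name:1|c".
def statsd_counter(name, value = 1)
  "#{name}:#{value}|c"
end

# Hypothetical helper: fire-and-forget a single datagram to a statsd daemon.
def send_metric(host, port, payload)
  sock = UDPSocket.new
  sock.send(payload, 0, host, port)
ensure
  sock&.close
end

# UDP is connectionless, so this succeeds even if nothing is listening.
send_metric('127.0.0.1', 8125, statsd_counter('events.emitted.success'))
```

Listening on UDP port 8125 with a throwaway socket is a quick way to confirm that metrics are leaving the host before pointing the plugin at a real daemon.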
Sub-second timestamps
When using `event_time true` the `@timestamp` field for the record is taken from the time recorded against the event by Cloudwatch. This is the most common mode to run in as it's an easy path to normalization: your Lambdas and other AWS services need not all share the same valid `time_format`, nor do you need a regex that matches every case.
If your output plugin supports sub-second precision (and you're running fluentd 0.14.x) you'll "enjoy" sub-second precision.
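To make the precision concrete: Cloudwatch event times are milliseconds since the UNIX epoch, so keeping the sub-second part means carrying seconds and a fractional component separately instead of truncating to whole seconds. The sketch below is an illustration with a hypothetical helper, `split_epoch_ms`; the seconds/nanoseconds pair is the same shape that fluentd 0.14's `Fluent::EventTime` holds, but this code does not use fluentd itself.

```ruby
require 'time'

# Hypothetical helper: split epoch milliseconds into [seconds, nanoseconds].
def split_epoch_ms(ms)
  sec, rem_ms = ms.divmod(1000)
  [sec, rem_ms * 1_000_000]
end

sec, nsec = split_epoch_ms(1_489_500_000_123)

# Time.at accepts a sub-second part and its unit (Ruby 2.5 or later).
puts Time.at(sec, nsec, :nsec).utc.iso8601(3)
```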
Elasticsearch
It is a common pattern to use fluentd alongside the fluent-plugin-elasticsearch plugin, either directly or via fluent-plugin-aws-elasticsearch-service, to ingest logs into Elasticsearch.
Prior to version 1.9.5 there was a bug within that plugin which, via an unwise cast, caused records without a named timestamp field to be cast to `DateTime`, losing the precision. This issue was fixed by PR uken/fluent-plugin-elasticsearch#249.
IAM
IAM is a tricky and often bespoke subject. Here's a starter that will ingest all of the logs for all of your Lambdas in the account in which the plugin is running:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
"logs:DescribeMetricFilters",
"logs:FilterLogEvents",
"logs:GetLogEvents"
],
"Resource": [
"arn:aws:logs:eu-west-1:123456789012:log-group:/aws/lambda/*:*"
]
},
{
"Effect": "Allow",
"Action": [
"logs:DescribeLogGroups"
],
"Resource": [
"arn:aws:logs:eu-west-1:123456789012:log-group:*:*"
]
}
]
}
Cross-account authentication
Broadly speaking the IAM instance role of the host on which the plugin is running needs to be able to `sts:AssumeRole` the `sts_arn` (and obviously `sts_enabled` needs to be true).
The assumed role should look more-or-less like that above in terms of the actions and resource combinations required.
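As a starting point, the permission on the instance role might look like the policy generated by the sketch below. The helper name `assume_role_policy` is hypothetical, and the ARN is the placeholder from the Usage example rather than a real role.

```ruby
require 'json'

# Hypothetical helper: a policy for the instance role in the fluentd account,
# permitting it to assume the role named by sts_arn.
def assume_role_policy(sts_arn)
  {
    'Version' => '2012-10-17',
    'Statement' => [
      { 'Effect' => 'Allow', 'Action' => 'sts:AssumeRole', 'Resource' => sts_arn }
    ]
  }
end

# Placeholder ARN taken from the Usage example.
puts JSON.pretty_generate(
  assume_role_policy('arn:aws:iam::123456789012:role/role_in_another_account')
)
```

The role being assumed also needs a trust policy in its own account that names the instance role (or its account) as a principal; without that, the `sts:AssumeRole` call is denied regardless of the policy above.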
Development
After checking out the repo, run `bin/setup` to install dependencies. Then, run `rake spec` to run the tests. You can also run `bin/console` for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run `bundle exec rake install`. To release a new version, update the version number in `version.rb`, and then run `bundle exec rake release`, which will create a git tag for the version, push git commits and tags, and push the `.gem` file to rubygems.org.
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/sampointer/fluent-plugin-cloudwatch-ingest.