Logstash Codec - Avro Schema Registry
v1.2.0
This plugin is used to serialize Logstash events as Avro datums, as well as deserializing Avro datums into Logstash events.
Decode/encode Avro records as Logstash events using the associated Avro schema from a Confluent schema registry. (https://github.com/confluentinc/schema-registry)
Installation
logstash-plugin install logstash-codec-avro_schema_registry
Decoding (input)
When this codec is used to decode the input, you may pass the following options:
-
endpoint
- always required. -
username
- optional. -
password
- optional. -
tag_on_failure
- tag events with_avroparsefailure
when decode fails
If the input stream is binary encoded, you should use the ByteArrayDeserializer
in the Kafka input config.
Encoding (output)
This codec uses the Confluent schema registry to register a schema and encode the data in Avro using schema_id lookups.
When this codec is used to encode, you may pass the following options:
-
endpoint
- always required. -
username
- optional. -
password
- optional. -
schema_id
- when provided, no other options are required. -
subject_name
- required when there is noschema_id
. -
schema_version
- when provided, the schema will be looked up in the registry. -
schema_uri
- when provided, JSON schema is loaded from URL or file. -
schema_string
- required when there is noschema_id
,schema_version
orschema_uri
-
check_compatibility
- will check schema compatibility before encoding. -
register_schema
- will register the JSON schema if it does not exist. -
binary_encoded
- will output the encoded event as a ByteArray. Requires theByteArraySerializer
to be set in the Kafka output config. -
base64_encoded
- will output the encoded event as a base64 string. -
client_certificate
- Client TLS certificate for mutual TLS -
client_key
- Client TLS key for mutual TLS -
ca_certificate
- CA Certificate -
verify_mode
- SSL Verify modes. Valid options areverify_none
,verify_peer
,verify_client_once
, andverify_fail_if_no_peer_cert
. Default isverify_peer
Usage
Basic usage with Kafka input and output.
input {
kafka {
...
codec => avro_schema_registry {
endpoint => "http://schemas.example.com"
}
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}
}
filter {
...
}
output {
kafka {
...
codec => avro_schema_registry {
endpoint => "http://schemas.example.com"
subject_name => "my_kafka_subject_name"
schema_uri => "/app/my_kafka_subject.avsc"
register_schema => true
}
value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
}
}
Using signed certificate for registry authentication
output {
kafka {
...
codec => avro_schema_registry {
endpoint => "http://schemas.example.com"
schema_id => 47
client_key => "./client.key"
client_certificate => "./client.crt"
ca_certificate => "./ca.pem"
verify_mode => "verify_peer"
}
value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
}
}