No commit activity in last 3 years
No release in over 3 years
Fluent input plugin for azure queue input
 Dependencies

Development

>= 1.3.3
>= 0.9.2
>= 3.0.8

Runtime

>= 1.8
< 1.1.0, >= 1.0.1
< 1.1.0, >= 1.0.1
< 0.14, >= 0.12.2
 Project Readme

A Fluentd plugin to read from azure queues and event hubs


This gem provides two Fluentd input plugins: azure_queue and azure_event_hub_capture. In my tests, the azure_queue input plugin handles about 30 messages/second. If you need more throughput from event hubs, I suggest using the azure_event_hub_capture plugin.
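
As a rough sizing aid, the measured rate can be turned into a drain-time estimate. The Ruby sketch below is only back-of-the-envelope arithmetic: `drain_seconds` is a hypothetical helper that is not part of this gem, and it assumes the rate stays steady at the roughly 30 messages/second reported above, which your environment may not match.

```ruby
# Hypothetical helper (not part of the gem): estimate how long the azure_queue
# plugin would need to drain a backlog, assuming a steady rate.
# The default of 30.0 msg/s is the figure reported in this README.
def drain_seconds(backlog, rate_per_second = 30.0)
  raise ArgumentError, 'rate must be positive' unless rate_per_second.positive?

  backlog / rate_per_second.to_f
end

# 108,000 queued messages at 30 msg/s take one hour.
puts drain_seconds(108_000) # prints 3600.0
```

If the estimate is too slow for your event volume, that is a hint to look at the capture-based plugin instead.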

Dependencies

fluentd v0.12

azure_queue Input Plugin

Input: Configuration

<source>
  @type azure_queue

  tag queue_input
  storage_account_name my_storage_account
  storage_access_key my_storage_access_key
  queue_name my_storage_queue
  fetch_interval 5
  lease_duration 30
</source>

tag (required)

The tag for the input

storage_account_name (required)

The storage account name

storage_access_key (required)

The storage account access key

queue_name (required)

The storage queue name

message_key

The record key to put the message data into. Default 'message'

lease_duration

The time to lease the messages for. Default 300

max_fetch_threads

The maximum number of threads to fetch and delete queue messages with. Default 30
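
To make the lease_duration setting more concrete, here is a small in-memory model of lease semantics. It is a sketch under assumptions, not the plugin's code or the Azure SDK: `FakeQueue` is hypothetical, and it assumes lease_duration behaves like a storage queue visibility timeout, where a leased message is hidden from other readers and reappears if it is not deleted before the lease ends.

```ruby
# Illustrative in-memory model of lease (visibility timeout) semantics.
# FakeQueue is hypothetical; it is NOT the plugin's code or the Azure SDK.
class FakeQueue
  Message = Struct.new(:id, :body, :visible_at)

  def initialize
    @messages = []
    @next_id = 0
  end

  def put(body)
    @messages << Message.new(@next_id += 1, body, 0)
  end

  # Return [id, body] pairs visible at `now`, hiding them for `lease_duration`.
  def lease(now, lease_duration)
    visible = @messages.select { |m| m.visible_at <= now }
    visible.each { |m| m.visible_at = now + lease_duration }
    visible.map { |m| [m.id, m.body] }
  end

  # Deleting a message before its lease ends removes it for good.
  def delete(id)
    @messages.reject! { |m| m.id == id }
  end
end

queue = FakeQueue.new
queue.put('a')
queue.put('b')

first = queue.lease(0, 30)    # both messages are leased until t=30
queue.delete(first[0][0])     # 'a' was processed and deleted in time
hidden = queue.lease(10, 30)  # 'b' is still leased, so nothing is returned
again = queue.lease(30, 30)   # the lease expired, so 'b' is delivered again
```

Under this model, a lease that is shorter than the time needed to emit and delete a message leads to duplicate deliveries, which is one reason to keep the lease comfortably long.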

Integration with Azure Event Hub

You can use an Azure Function to forward messages from event hubs to storage queues for easy ingestion by this gem. This is not recommended for high volumes, but it should serve as a stopgap until a complete Azure Event Hub gem is created.

using System;
using Microsoft.WindowsAzure.Storage.Queue;

public static void Run(string[] hubMessages, ICollector<string> outputQueue, TraceWriter log)
{
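    foreach (string message in hubMessages)
    {
        // Estimate the size as UTF-16: two bytes per char.
        int bytes = message.Length * sizeof(Char);
        // Forward only messages that stay under the 64k threshold.
        if (bytes < 64000)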
        {
            outputQueue.Add(message);
        }
        else
        {
            log.Warning($"Message is larger than 64k with {bytes} bytes. Dropping message");
        }
    }
}
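
To check locally which messages the function above would drop, the same size test can be reproduced in Ruby. This is a sketch, not part of the gem: `utf16_bytes` and `fits_in_queue?` are hypothetical names, and the 64,000 threshold is simply copied from the C# code.

```ruby
# Mirrors the C# check: String.Length * sizeof(char) counts UTF-16 code units
# at 2 bytes each, which equals the byte size of the UTF-16LE encoding.
MAX_QUEUE_MESSAGE_BYTES = 64_000 # threshold copied from the function above

# Hypothetical helper: size of the message when encoded as UTF-16.
def utf16_bytes(message)
  message.encode('UTF-16LE').bytesize
end

# Hypothetical helper: true if the function above would forward the message.
def fits_in_queue?(message)
  utf16_bytes(message) < MAX_QUEUE_MESSAGE_BYTES
end

puts utf16_bytes('hello')          # 5 characters at 2 bytes each: prints 10
puts fits_in_queue?('x' * 31_999)  # 63,998 bytes: prints true
puts fits_in_queue?('x' * 32_000)  # 64,000 bytes: prints false
```

In other words, the function forwards messages of up to 31,999 UTF-16 code units and drops anything longer.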

azure_event_hub_capture Input Plugin

This plugin is designed to work with blobs stored to a container via Azure Event Hubs Capture.

Warning: This plugin will delete the blobs after emitting the contents in fluentd.

Input: Configuration

<source>
  @type azure_event_hub_capture

  tag event_hub_input
  storage_account_name my_storage_account
  storage_access_key my_storage_access_key
  container_names my_capture_container
  fetch_interval 30
  lease_duration 30
</source>

tag (required)

The tag for the input

storage_account_name (required)

The storage account name

storage_access_key (required)

The storage account access key

container_names (required)

The capture container names, comma separated.

message_key

The record key to put the message data into. Default 'message'

fetch_interval

The time in seconds to sleep between fetching the blob list. Default 30

lease_duration

The time to lease the messages for. Default 60

queue_lease_time

The time to lease the queue message for. Default 60

queue_name

If set, get the blob names from a storage queue. This helps when there are many blobs, because listblobs is not reliable.
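
Since container_names takes a comma-separated value, the sketch below shows one way such a value could be split into individual names. `parse_container_names` is a hypothetical helper, not the plugin's parser; whether the plugin itself trims whitespace or ignores empty entries is an assumption here.

```ruby
# Hypothetical helper, not the plugin's parser: split a comma-separated
# container_names value, trimming whitespace and dropping empty entries.
def parse_container_names(value)
  value.split(',').map(&:strip).reject(&:empty?)
end

# Made-up container names for illustration.
p parse_container_names('capture-a, capture-b,capture-c')
```

To stay on the safe side, write the value without spaces around the commas in the actual configuration.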

Integration with Azure Event Hub

Use this Azure Function if you want to ingest Event Hubs Capture blob names from a storage queue.

// Blob trigger binding to a CloudBlockBlob
#r "Microsoft.WindowsAzure.Storage"

using Microsoft.WindowsAzure.Storage.Blob;

public static void Run(CloudBlockBlob myBlob, out BlobIdentifier outputQueueItem, TraceWriter log)
{
    log.Info($"C# Blob trigger function Processed blob Name:{myBlob.Name} URI:{myBlob.StorageUri}");
    BlobIdentifier blobId = new BlobIdentifier { Name = myBlob.Name, Container = myBlob.Container.Name };
    //outputQueueItem.add(blobId);
    outputQueueItem = blobId;
}


public class BlobIdentifier
{
    public string Container { get; set; }
    public string Name { get; set; }
}
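
For orientation, this is roughly what a consumer of that queue has to do with each message. The Ruby sketch assumes the function's output binding writes the BlobIdentifier as a JSON object with Container and Name keys and that the payload has already been decoded to plain text (some queue clients base64-encode message bodies). `BlobRef` and `parse_blob_identifier` are hypothetical names and are not taken from the plugin.

```ruby
require 'json'

# Hypothetical consumer-side sketch: decode a queue message that carries a
# BlobIdentifier serialized as JSON (assumed shape: {"Container":..,"Name":..}).
BlobRef = Struct.new(:container, :name)

def parse_blob_identifier(payload)
  data = JSON.parse(payload)
  # fetch raises KeyError if the message does not have the expected keys.
  BlobRef.new(data.fetch('Container'), data.fetch('Name'))
end

# Made-up sample payload for illustration.
ref = parse_blob_identifier('{"Container":"my_capture_container","Name":"example/blob.avro"}')
puts "#{ref.container}/#{ref.name}"
```

Failing loudly on a missing key keeps malformed queue messages from being mistaken for valid blob references.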