Project

turbine

0.01
No commit activity in last 3 years
No release in over 3 years
Performance-oriented stream processing built on Kafka and Zookeeper
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
 Dependencies

Development

~> 1.9
~> 10.0
~> 3.2

Runtime

 Project Readme

Turbine

Gem Version Build Status Code Climate Coverage Status

Fault-tolerant multithreaded stream processing for Ruby.

Turbine is a performance-oriented stream processing library built on Zookeeper. It presently supports Kafka as a message queue, but is designed to be pluggable in order to potentially support other message queues in the future.

Turbine is not a job queue and is missing most of the features you'd expect from a job queue by design. Turbine is designed to be small, simple, and fast.

Installation

Add these lines to your application's Gemfile:

gem "turbine"

And then execute:

$ bundle

Or install it yourself as:

$ gem install turbine

Getting Started

Please see the Quickstart on the Wiki for a quick tutorial on writing your first stream processing job using Turbine.

Usage

Turbine presently supports stream processing from the Kafka message queue using the poseidon_cluster gem, which implements self-rebalancing Consumer Groups.

To create a new Kafka consumer for a topic, do the following:

require "turbine"
require "turbine/consumer/kafka"

consumer = Turbine::Consumer::Kafka.new(
  "my-group",                               # Group name
  ["kafka1.host:9092", "kafka2.host:9092"], # Kafka brokers
  ["zk1.host:2181",    "zk2.host:2181"],    # Zookeeper hosts
  "my-topic"                                # Topic name
)

processor = Turbine::Processor.new(min_threads: 5, max_threads: 5, max_queue: 1000)

processor.process(consumer) do |msg|
   ...
end

Error Handling

By default, Turbine prints exceptions that occur during message processing to STDERR. Chances are, you'd probably rather log them to an exception logging service.

After creating a processor object, you can configure a custom exception handler so you can log these exceptions to a more appropriate place than STDERR:

processor = Turbine::Processor.new(min_threads: 5, max_threads: 5, max_queue: 1000)

processor.error_handler do |ex, _msg|
  MyBugHandler.notify_or_ignore(ex)
end

processor.process(consumer) do |msg|
   ...
end

Semantics (PLEASE READ)

Turbine automatically reschedules processing of messages in the stream in the event of faults or rebalancing of resources. Because of this, the same message may be received multiple times. Stream processing jobs written in Turbine MUST account for this.

An example of where things could go wrong is a "counter" job. Imagine we look for a particular event and increment a counter in statsd/memcached/redis. This will not give accurate numbers, because message replays will spuriously increment the counter.

The contract of Turbine is as follows:

  • Turbine messages are guaranteed to be delivered AT LEAST once but Turbine MAY replay the same message many times
  • Because of this, stream processing jobs written in Turbine MUST be idempotent (i.e. repeat processing of the same message is gracefully tolerated)

Development

After checking out the repo, run bin/setup to install dependencies. Then, run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release to create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.

Contributing

  • Fork this repository on github
  • Make your changes and send us a pull request
  • If we like them we'll merge them
  • If we've accepted a patch, feel free to ask for commit access

License

Copyright (c) 2015 Tony Arcieri. Distributed under the MIT License. See LICENSE.txt for further details.