Wukong-Storm
The Storm plugin for Wukong lets you run Wukong processors and dataflows as Storm topologies that read data from and write data to Kafka.
Before you use Wukong-Storm to develop, test, and run your Storm topologies, you might want to read about Wukong, write some simple processors, and read about some of Storm's core concepts.
You might also want to check out some other projects that enrich the Wukong experience:
- wukong-hadoop: Run Wukong processors and dataflows as mappers and/or reducers within the Hadoop framework. Model jobs locally before you run them.
- wukong-load: Load the output data from your local Wukong jobs and flows into a variety of different data stores.
- wukong-deploy: Orchestrate Wukong and other wu-tools together to support an application running on the Infochimps Platform.
Installation & Setup
Wukong-Storm can be installed as a RubyGem:
$ sudo gem install wukong-storm
If you actually want to run your dataflows as functioning Storm topologies that read from and write to Kafka, you'll of course need access to Storm and Kafka installations. Ironfan is a great tool for building and managing Storm clusters and other distributed infrastructure quickly and easily.
To run Storm jobs through Wukong-Storm, you'll need to copy your Wukong code to each worker of the Storm cluster, install Wukong-Storm on each one, and then log in and launch your job from one of them. Ironfan can help with this configuration as well.
Anatomy of a running topology
Storm defines the concept of a topology. A topology contains spouts and bolts. A spout is a source of data. A bolt processes data. Bolts can be connected to each other and to spouts in arbitrary ways.
Topologies are submitted to Storm's Nimbus but run within Storm supervisors. Each supervisor can dedicate a certain number of workers to a topology. Within each worker, parallelism controls the number of threads the worker assigns to the topology.
Wukong-Storm runs each Wukong dataflow as a single bolt within a single topology. Data is passed to this bolt over STDIN and collected over STDOUT, similar to the way Hadoop streaming operates.
This topology is hooked up to a storm.kafka.trident.OpaqueTridentKafkaSpout (part of storm-contrib), which reads from a single input topic within Kafka.
Output records are written to a default Kafka topic, but this can be overridden on a per-record basis.
Communication protocol
A Wukong dataflow launched within Storm runs as a single bolt (see com.infochimps.wukong.storm.SubprocessFunction). This bolt works by launching an arbitrary command line, sending it records over STDIN, and reading its output over STDOUT. The SubprocessFunction class expects the command it launches to obey a simple protocol: in response to each input record, the command writes each of its output records followed by a newline, and then ends the batch with a batch terminator (default: ---) followed by another newline.
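To make the protocol concrete, here is a minimal sketch in plain Ruby of a loop that obeys it. It is not wu-bolt's implementation: serve_protocol, BATCH_TERMINATOR, and the two lambda processors are hypothetical names, and the terminator is assumed to be the default ---. The sketch is exercised against StringIO objects; pointed at $stdin and $stdout (with $stdout.sync = true), the same loop would be a command of the kind the bolt expects to launch.

```ruby
require "stringio"

# Assumed default batch terminator (the protocol above says it is configurable).
BATCH_TERMINATOR = "---"

# Hypothetical sketch of the command side of the protocol, not wu-bolt's code.
# Reads one input record per line; for each, writes every record the processor
# emits on its own line, then the batch terminator on a line of its own.
# Output records must not contain newlines or equal the terminator.
def serve_protocol(input, output, processor, terminator: BATCH_TERMINATOR)
  input.each_line do |line|
    processor.call(line.chomp) { |record| output.puts(record) }
    output.puts(terminator)
    output.flush # push the finished batch to the reader right away
  end
end

# Hypothetical processors: one mirrors upcaser.rb, one never emits anything.
upcaser = ->(line, &emit) { emit.call(line.upcase) }
dropper = ->(_line, &_emit) {}

out = StringIO.new
serve_protocol(StringIO.new("hello\nworld\n"), out, upcaser)
serve_protocol(StringIO.new("ignored\n"), out, dropper)
print out.string
# prints:
# HELLO
# ---
# WORLD
# ---
# ---
```

The flush after each terminator matters in the stdin/stdout version: the reader cannot finish a batch until it sees the terminator, so an output buffer that held it back would stall the bolt.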
Wukong-Storm comes with a command, wu-bolt, which works very similarly to wu-local but implements this protocol. Here's an example of using wu-bolt directly with a processor:
$ echo 2 | wu-bolt prime_factorizer.rb
2
---
$ echo 12 | wu-bolt prime_factorizer.rb
2
2
3
---
$ echo 19 | wu-bolt prime_factorizer.rb
---
Notice that in the last example, the batch delimiter emitted after each input record makes it easy to tell the difference between "no output records" and "no output records yet", a distinction that is otherwise hard to draw over STDIN/STDOUT.
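The reading side can be sketched the same way. The hypothetical read_batch helper below (not SubprocessFunction's actual code) collects lines until the terminator and replays the wu-bolt output for 12 and 19 shown above: an immediate terminator means an empty batch, while a terminator that has not arrived yet simply leaves the read blocked.

```ruby
require "stringio"

# Hypothetical sketch of the reading side, not SubprocessFunction's code.
# Collects lines from +io+ until the batch terminator. An immediate terminator
# returns [] ("no output records"); until a terminator arrives, IO#gets blocks
# ("no output records yet").
def read_batch(io, terminator: "---")
  records = []
  while (line = io.gets)
    line = line.chomp
    return records if line == terminator
    records << line
  end
  raise EOFError, "stream ended before the batch terminator"
end

# The wu-bolt output for `echo 12` and `echo 19` shown above, back to back.
io = StringIO.new("2\n2\n3\n---\n---\n")
p read_batch(io) # prints ["2", "2", "3"]
p read_batch(io) # prints []
```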
Running a dataflow
A simple processor
Assuming you have correctly installed Wukong-Storm, Storm, Kafka, Zookeeper, etc., and you have defined a simple dataflow (or in this case, just a single processor) like this:
# in upcaser.rb
Wukong.processor(:upcaser) do
  def process line
    yield line.upcase
  end
end
Then you can launch it directly into Storm:
$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic
If a topology named upcaser already exists, you'll get an error. Add the --rm flag to first kill the running topology before launching the new one:
$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic --rm
The default amount of time to wait for the topology to die is 300 seconds (5 minutes), just like the storm kill command (which is used under the hood). When debugging a topology in development, it's helpful to add --wait=1 so that the old topology is killed after just one second.
See exactly what happens behind the scenes by adding the --dry_run flag, which prints the commands without executing them:
$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic --rm --dry_run
A more complicated example
Say you have a dataflow:
# in my_flow.rb
Wukong.dataflow(:my_flow) do
  my_parser | does_something | then_something_else | to_json
end
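As a rough analogy for what the pipes in my_flow.rb express, the plain-Ruby sketch below chains stages so that each one takes a record, emits zero or more records, and hands every emitted record to the next stage. It is not how Wukong implements dataflows; run_chain and the three lambdas are hypothetical stand-ins for stages like my_parser, does_something, and to_json.

```ruby
require "json"

# Plain-Ruby analogy (not Wukong's implementation) of a linear dataflow:
# each stage takes one record and emits zero or more, and every emitted
# record is passed on to the next stage; the last stage's output reaches +sink+.
def run_chain(stages, record, &sink)
  return sink.call(record) if stages.empty?
  first, *rest = stages
  first.call(record) { |out| run_chain(rest, out, &sink) }
end

# Hypothetical stand-ins for the stages in my_flow.rb.
parse     = ->(line, &emit) { emit.call(line.split(",")) }
keep_pair = ->(fields, &emit) { emit.call(fields) if fields.size == 2 }
serialize = ->(fields, &emit) { emit.call(JSON.generate(fields)) }

results = []
%w[a,b c d,e].each do |line|
  run_chain([parse, keep_pair, serialize], line) { |json| results << json }
end
puts results # prints ["a","b"] then ["d","e"], one per line; "c" is dropped
```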
You can launch it using a different topology name as well as target arbitrary locations for your Zookeeper, Kafka, and Storm servers:
$ wu-storm my_flow.rb --name=my_flow_attempt_3 --zookeeper_hosts=10.121.121.121,10.122.122.122 --kafka_hosts=10.123.123.123 --nimbus_host=10.124.124.124 --input=some_input_topic --output=some_output_topic
Running non-Wukong or non-Ruby code
You can also use Wukong-Storm as a harness to run non-Wukong or non-Ruby code. As long as you can specify a command line to run which supports the communication protocol, you can run it with wu-storm:
$ wu-storm --bolt_command='my_cmd --some-option=value -af -q 3' --input=some_input_topic --output=some_output_topic
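Before handing a command to --bolt_command, it can help to check locally that it really obeys the protocol. The sketch below uses Ruby's standard Open3 library; drive_bolt is a hypothetical helper, not part of Wukong-Storm, and the child command is a throwaway Ruby one-liner standing in for something like my_cmd. It sends one record at a time and waits for the matching terminator, so a command that never emits one will leave it blocked.

```ruby
require "open3"
require "rbconfig"

# Hypothetical local harness (not part of Wukong-Storm) for checking that a
# command obeys the protocol. Sends each record on its own line and collects
# one batch (lines up to the terminator) per record, in lockstep.
def drive_bolt(cmd, records, terminator: "---")
  Open3.popen2(*cmd) do |stdin, stdout, wait_thr|
    batches = records.map do |record|
      stdin.puts(record)
      stdin.flush
      batch = []
      loop do
        line = stdout.gets
        raise EOFError, "command exited before emitting #{terminator}" if line.nil?
        break if line.chomp == terminator
        batch << line.chomp
      end
      batch
    end
    stdin.close    # EOF tells the command to finish
    wait_thr.value # reap the child process
    batches
  end
end

# Example child: upcases each record and emits an empty batch for "skip".
# Its output is synced so each batch is written immediately.
child = [RbConfig.ruby, "-e", <<~'RUBY']
  $stdout.sync = true
  $stdin.each_line do |l|
    l = l.chomp
    puts l.upcase unless l == "skip"
    puts "---"
  end
RUBY

p drive_bolt(child, %w[a skip b]) # prints [["A"], [], ["B"]]
```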
Scaling options
Storm provides several options for scaling up or down a topology. Wukong-Storm makes them accessible at launch time via the following options:
- --workers: specify the number of workers (worker processes, each of which occupies one supervisor "slot") for the topology. Defaults to 1.
- --input_parallelism: specify the number of threads within the spout reading from Kafka within each worker. Defaults to 1.
- --parallelism: specify the number of threads within the bolt running Wukong code within each worker. Defaults to 1.