# Wukong-Load
This Wukong plugin makes it easy to load data from the command-line into and out of various data stores.
It is assumed that you will independently deploy and configure each data store yourself (but see Ironfan).
## Installation & Setup

Wukong-Load can be installed as a RubyGem:
$ sudo gem install wukong-load
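If you manage your project's dependencies with Bundler, you can instead declare the gem in your Gemfile:

gem 'wukong-load'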
## wu-load

Wukong-Load provides a command-line program `wu-load` you can use to load data fed in over STDIN into a data store. It's designed to work effectively with `wu-local` and `wu-dump` as part of UNIX pipelines.
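For example, the three tools can be chained in a single pipeline; the Kafka topic, processor script, and ElasticSearch index below are all hypothetical:

$ wu-dump kafka --topic=raw_events | wu-local cleaner.rb | wu-load elasticsearch --index=events --es_type=event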
Get help on `wu-load` by running
$ wu-load --help
All input to `wu-load` should consist of newline-separated records over STDIN. For some data stores, JSON-formatted, Hash-like input is expected. Keys in the record may be interpreted as metadata about the record or about how to route the record within the data store.
Here are some quick examples which do what you think they do:
$ echo 'an arbitrary line of text' | wu-load kafka --topic=foo
$ echo '{"this": "record", "will": { "be": "indexed"}} | wu-load elasticsearch --index=foo --es_type=bar
$ echo '{"this": "record", "will": { "be": "updated"}, "_id": "existing_id"}' | wu-load elasticsearch --index=foo --es_type=bar
$ echo '{"this": "record", "will": { "be": "indexed"}} | wu-load mongodb --database=foo --collection=bar
$ echo '{"this": "record", "will": { "be": "upserted"}, "_id": "existing_id"}' | wu-load mongodb --database=foo --collection=bar
$ echo '{"this": "record", "is": "indexed"} | wu-load sql --database=foo --table=bar
$ echo '{"this": "record", "is": "upserted"}, "_id": "existing_id"}' | wu-load sql --database=foo --table=bar
Further details and options will depend on the data store you're writing to. See more information on each specific data store below. You can also get help for a specific data store with:
$ wu-load STORE_TYPE --help
Note: The `wu-load` program is not designed to handle any significant scale of data. It is only intended as a convenience tool for modeling how Wukong dataflows (which can scale) will interact with data stores.
<a name="wu-dump>
Wukong-Load provides a command-line program wu-dump
you can use to
dump data from a data store to STDOUT. It's designed to work
effectively with wu-local
and wu-load
in UNIX pipelines.
Get help on `wu-dump` by running

$ wu-dump --help
All output from `wu-dump` will be newline-separated records over STDOUT. For some data stores, JSON-formatted, Hash-like output is produced.
Here are some quick examples which do what you think they do:
$ wu-dump kafka --topic=foo
line 1
line 2
line 3
...
$ wu-dump file --input=/data/foo.tsv
foo a
foo b
foo c
...
$ wu-dump file --input=/data/bar.tsv.gz
bar x
bar y
bar z
...
$ wu-dump file --input=data.zip
data.zip 1 foo a
data.zip 2 foo b
...
data.zip 1 bar x
data.zip 2 bar y
...
$ wu-dump directory --input=/data
/data/foo.tsv 1 foo a
/data/foo.tsv 2 foo b
...
/data/bar.tsv.gz 1 bar x
/data/bar.tsv.gz 2 bar y
...
Further details and options will depend on the data store you're reading from. See more information on each specific data store below. You can also get help for a specific data store with:
$ wu-dump STORE_TYPE --help
Note: The `wu-dump` program is not designed to handle any significant scale of data. It is only intended as a convenience tool for modeling how Wukong dataflows (which can scale) will interact with data stores.
<a name="wu-sync>
wu-sync
Wukong-Load also provides a program wu-sync
which can be used to
sync data between filesystem like data stores.
wu-sync
itself provides several types of syncs that can be chained
together to create transactional, scalable processing for batch files.
Here's an example which syncs data from an FTP server to S3, providing a per-file transactional guarantee throughout:
$ wu-sync ftp --host=ftp.example.com --path=/remote/data --output=/data/incoming
$ wu-sync prepare --input=/data/incoming --output=/data/received
$ wu-sync prepare --input=/data/incoming --output=/data/received
$ wu-sync s3 --input=/data/received --bucket=s3://example.com/data
Further details and options will depend on the type of sync being performed. See more information on each specific sync below. You can also get help for a specific sync with:
$ wu-sync SYNC_TYPE --help
## Kafka

Kafka uses the concept of "topics" which contain numbered "partitions" that contain a sequence of events concatenated together. Producers (writers) to Kafka dump in events and consumers (readers) request data between particular byte offsets on particular partitions of particular topics. Kafka's data model is very simple (everything is a sequence of bytes) so it can store any kind of data.
Get options for loading data from or dumping data to Kafka:
$ wu-load kafka --help
$ wu-dump kafka --help
### Connecting

`wu-load` and `wu-dump` try to connect to a Kafka broker at a default host (localhost) and port (9092). You can change this:
$ cat data.txt | wu-load kafka --host=10.122.123.124 --port=1234
$ wu-dump kafka --host=10.122.123.124 --port=1234
### Routing

`wu-load` and `wu-dump` both assume a default topic (`wukong`) and partition (0), but you can change these:
$ cat data.txt | wu-load kafka --topic=messages --partition=6
$ wu-dump kafka --topic=messages --partition=6
### Producing
Writing data to Kafka is simple: pick a topic and partition and push in some bytes:
$ cat data.txt | wu-load kafka --topic=messages
To achieve higher throughput (though, remember, `wu-dump` and `wu-load` are not designed for significant load), set the `--batch_size` option up a bit:
$ cat data.txt | wu-load kafka --topic=messages --batch_size=1000
### Consuming
Consuming data from Kafka requires picking a topic, partition, and a byte-offset.
It's rare to want to choose a specific byte-offset (this typically only occurs in delicate situations that no one wants to be in), but you can do it easily:
$ wu-dump kafka --topic=foo --offset=65536
The usual approach is to consume either from the beginning or end of available data on a topic & partition. `wu-dump`'s default behavior is to start from the end of a topic:
$ wu-dump kafka --topic=foo
but you can also start from the beginning:
$ wu-dump kafka --topic=foo --from_beginning
Another common pattern among Kafka consumers is to choose an offset that is "wherever you last left off". Kafka does not provide a mechanism to remember per-consumer last-offsets, so it's up to consumers to do this for themselves. Since `wu-dump` is not designed to run continuously, it does not store "wherever you last left off".
## ElasticSearch

Note: ElasticSearch support for `wu-dump` is not currently implemented.

ElasticSearch uses the concept of "indices" which contain "types" to store "documents", each of which is a schema-less, Hash-like structure. Every document must also have an ID which can be generated by ElasticSearch at index (write) time.
`wu-load`, like many tools that interact with ElasticSearch, uses JSON serialization. See full options with
$ wu-load elasticsearch --help
### Connecting

`wu-load` tries to connect to an ElasticSearch server at a default host (localhost) and port (9200). You can change these:
$ cat data.json | wu-load elasticsearch --host=10.122.123.124 --port=80
### Routing

`wu-load` loads each document into a default index (`wukong`) and type (`streaming_record`), but you can change these:
$ cat data.json | wu-load elasticsearch --index=publication --es_type=book
A record with an `_index` or `_type` field will override these default settings. You can change the names of the fields used with the `--index_field` and `--es_type_field` options:
$ cat data.json | wu-load elasticsearch --index=industry --es_type=publication --index_field=publisher --es_type_field=classification
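For example, a record can carry its own routing; the index, type, and title below are illustrative:

$ echo '{"_index": "books", "_type": "paperback", "title": "Example Title"}' | wu-load elasticsearch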
### Creates vs. Updates

If an input record contains a value for the field `_id` then that value will be used as the ID of the document when written, possibly overwriting a document that already exists -- an update.

You can change the field used for the ElasticSearch ID property:
$ cat data.json | wu-load elasticsearch --index=media --es_type=books --id_field="ISBN"
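For example, a record like the following (the ISBN and title are made up) would be written using its ISBN as the document ID, updating any existing document with that ID:

$ echo '{"ISBN": "9780000000000", "title": "Example Title"}' | wu-load elasticsearch --index=media --es_type=books --id_field="ISBN"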
## MongoDB

Note: MongoDB support for `wu-dump` is not currently implemented.

MongoDB uses the concept of "databases" which contain "collections" to store "documents", each of which is a schema-less, Hash-like structure. Every document must also have an ID which can be generated by MongoDB at insert (write) time.
`wu-load`, like many tools that interact with MongoDB, uses JSON serialization. See full options with
$ wu-load mongodb --help
### Connecting

`wu-load` tries to connect to a MongoDB server at a default host (localhost) and port (27017). You can change these:
$ cat data.json | wu-load mongodb --host=10.122.123.124 --port=1234
### Routing

`wu-load` loads each document into a default database (`wukong`) and collection (`streaming_record`), but you can change these:
$ cat data.json | wu-load mongodb --database=publication --collection=book
A record with a `_database` or `_collection` field will override these default settings. You can change the names of the fields used with the `--database_field` and `--collection_field` options:
$ cat data.json | wu-load mongodb --database=industry --collection=publication --database_field=publisher --collection_field=classification
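As with ElasticSearch, a record can route itself; the database, collection, and title below are illustrative:

$ echo '{"_database": "publication", "_collection": "book", "title": "Example Title"}' | wu-load mongodb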
### Creates vs. Updates

If an input record contains a value for the field `_id` then that value will be used as the ID of the document when written, possibly overwriting a document that already exists -- an update.

You can change the field used for the MongoDB ID property:
$ cat data.json | wu-load mongodb --database=publication --collection=books --id_field="ISBN"
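For example, a record like the following (the ISBN and title are made up) would be upserted using its ISBN as the document ID:

$ echo '{"ISBN": "9780000000000", "title": "Example Title"}' | wu-load mongodb --database=publication --collection=books --id_field="ISBN"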
## Syncing

Syncing data between filesystems is always challenging to do well, though there are many good tools available.

In a Big Data setting, syncs are even more challenging because of the size of the files involved, which exacerbates problems of blocking, atomicity, transactionality, throughput, memory, &c. The best approach is to split the process up into several steps, each of which does one simple operation robustly, correctly, and in a way that can be tracked.
The `wu-sync` command provides several types of syncs that can be used together to create transactional, scalable processing for batch files. Each `wu-sync` command is meant to be run on a regular schedule every few minutes, perhaps via `cron`.
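Here's a sketch of what such a schedule might look like in a crontab; the paths and the five-minute interval are illustrative, and the commands mirror the FTP-to-S3 example above:

*/5 * * * * wu-sync ftp --host=ftp.example.com --path=/remote/data --output=/data/incoming
*/5 * * * * wu-sync prepare --input=/data/incoming --output=/data/received
*/5 * * * * wu-sync s3 --input=/data/received --bucket=s3://example.com/data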
All of the `wu-sync` sync types accept the `--dry_run` option, which walks through the sync and shows what would be done without actually doing it.
### From FTP

The `ftp` sync will sync data from an FTP server to a local disk. It supports several different flavors of FTP, including FTPS and SFTP.
On each invocation, `wu-sync ftp` will download any files (or parts of files) present under `--path` on the remote server but not present in the local `--output` directory. Files present on the local filesystem but not on the remote server will be ignored.
The `lftp` program is required for `wu-sync ftp` to function.
Get general help on `wu-sync ftp` with
$ wu-sync ftp --help
Note: Since FTP is conceptually "single-threaded", no throughput gains are achieved by having multiple processes try to read the same file on the same FTP server. For this reason, `wu-sync ftp` can be used in a production setting as the fundamental building block of a more distributed system in which each `wu-sync ftp` process is responsible for some files on some FTP servers.
### Specifying FTP protocol, host, and credentials

By default, `wu-sync ftp` will try to connect anonymously to an FTP server running on the local machine. The `--protocol`, `--host`, `--port`, `--username`, and `--password` options can be used to configure this behavior. Here's an example of connecting to a remote FTP server using a secure FTPS connection:
$ wu-sync ftp --host=ftp.example.com --protocol=ftps --username=bob --password=<password> --output=/data/ftp
You can use the `FTP_PASSWORD` environment variable if you don't want to pass the password on the command line.
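For example, the previous command could be run without `--password` (the host and username are still illustrative):

$ FTP_PASSWORD=<password> wu-sync ftp --host=ftp.example.com --protocol=ftps --username=bob --output=/data/ftp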
The port is determined automatically from the protocol (e.g. 21 for `ftp`, 22 for `sftp`, 443 for `ftps`) but can be explicitly given with the `--port` flag.
### Preparing Files for Downstream Consumption

One of the immediate issues that arises when syncing files is that a client may not be able to tell when a file on the remote server has finished uploading. It may also not be clear when a file has finished downloading to the local filesystem.
The approach taken by Wukong is to run a separate "archival" step which syncs data in a local `--input` directory with data in a local `--output` directory. Files will only show up in the `--output` directory when they are complete, as measured by their having stopped growing in the `--input` directory.

`wu-sync` is not a continuously running process, however, and so the only way for it to reliably know when files have stopped growing is to store file sizes from one invocation in order to compare them to file sizes on the next invocation. For this reason, creating files in the `--output` directory requires at least two invocations of `wu-sync` (with the same set of parameters).
Here's an example of the `--input` directory `/data/ftp`:
/data/ftp
├── alice
│ └── project_1
│ └── file_1
├── bob
│ ├── project_1
│ │ ├── file_1
│ │ ├── file_2
│ │ └── file_3
│ └── project_2
│ ├── file_1
│ └── file_2
└── README
After running `wu-sync prepare` twice
$ wu-sync prepare --input=/data/ftp --output=/data/clean
$ wu-sync prepare --input=/data/ftp --output=/data/clean
the `--output` directory `/data/clean` should look exactly the same as the input directory:
/data/clean
├── alice
│ └── project_1
│ └── file_1
├── bob
│ ├── project_1
│ │ ├── file_1
│ │ ├── file_2
│ │ └── file_3
│ └── project_2
│ ├── file_1
│ └── file_2
└── README
Nothing here seems magical, but the log output of `wu-sync prepare` reveals a lot that's going on to ensure that only complete files are processed.
Note: The files created in the `--output` directory are hardlinks pointing at the original files in the `--input` directory.
### Splitting input files

The `--split` option will make `wu-sync prepare` split large files in the `--input` directory into many manageable, smaller files in the `--output` directory.
Assume that Alice has been busy and that the file `/data/clean/alice/project_1/file_1` is big. After running `wu-sync prepare` twice:
$ wu-sync prepare --input=/data/ftp --output=/data/clean --split
$ wu-sync prepare --input=/data/ftp --output=/data/clean --split
the `--output` directory `/data/clean` will look like this:
/data/clean
├── alice
│ └── project_1
│ ├── file_1.part-0000
│ ├── file_1.part-0001
│ └── file_1.part-0002
├── bob
│ ├── project_1
│ │ ├── file_1.part-0000
│ │ ├── file_2.part-0000
│ │ └── file_3.part-0000
│ └── project_2
│ ├── file_1.part-0000
│ └── file_2.part-0000
└── README.part-0000
Alice's big `--input` file has been split into three `--output` files. Bob's `--input` files were smaller than the split size, so they each resulted in a single `--output` file.
You can change the number of lines in each file split with the `--lines` option and switch to splitting by bytes with the `--bytes` option.
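For example, to cap each split at 10,000 lines (the value is arbitrary), following the usual doubled-invocation pattern:

$ wu-sync prepare --input=/data/ftp --output=/data/clean --split --lines=10000
$ wu-sync prepare --input=/data/ftp --output=/data/clean --split --lines=10000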
Note: When splitting, the files in the `--output` directory are real files instead of hardlinks as they are by default.
### Ordering output files

Some tools require their input to be ordered, and for these tools `wu-sync prepare` provides an `--ordered` option which will reorganize files in the `--output` directory so that they are totally ordered:
$ wu-sync prepare --input=/data/ftp --output=/data/clean --ordered
$ wu-sync prepare --input=/data/ftp --output=/data/clean --ordered
This would result in the following structure for the `--output` directory:
/data/clean
├── alice
│ └── 2013
│ └── 09
│ └── 20
│ └── 20130920-071142-1-alice-project_1-file_1
├── bob
│ └── 2013
│ └── 09
│ └── 20
│ ├── 20130920-071142-2-bob-project_1-file_1
│ ├── 20130920-071142-3-bob-project_1-file_2
│ ├── 20130920-071142-4-bob-project_1-file_3
│ ├── 20130920-071142-5-bob-project_2-file_1
│ └── 20130920-071142-6-bob-project_2-file_2
└── root
└── 2013
└── 09
└── 20
└── 20130920-071142-0-README
The ordering is built up from:
- Each top-level subdirectory of the `--input` directory appears in the root of the `--output` directory. (Files that were in the `--input` directory itself are put in the `root` subdirectory of the `--output` directory.)

- Within each top-level subdirectory, a daily directory is created based on the time the corresponding `--input` file was recognized as completed.

- All files within the top-level subdirectory of the `--input` directory are placed within this daily directory with basenames constructed from:

  a. the time the corresponding `--input` file was recognized as completed
  b. an incrementing counter
  c. the path of the `--input` file relative to the `--input` directory
The `--ordered` option can be combined with the `--split` option:
$ wu-sync prepare --input=/data/ftp --output=/data/clean --ordered --split
$ wu-sync prepare --input=/data/ftp --output=/data/clean --ordered --split
to get
/data/clean
├── alice
│ └── 2013
│ └── 09
│ └── 20
│ ├── 20130920-071643-1-alice-project_1-file_1.part-0000
│ ├── 20130920-071643-1-alice-project_1-file_1.part-0001
│ └── 20130920-071643-1-alice-project_1-file_1.part-0002
├── bob
│ └── 2013
│ └── 09
│ └── 20
│ ├── 20130920-071643-2-bob-project_1-file_1.part-0000
│ ├── 20130920-071643-3-bob-project_1-file_2.part-0000
│ ├── 20130920-071643-4-bob-project_1-file_3.part-0000
│ ├── 20130920-071643-5-bob-project_2-file_1.part-0000
│ └── 20130920-071643-6-bob-project_2-file_2.part-0000
└── root
└── 2013
└── 09
└── 20
└── 20130920-071643-0-README.part-0000
### Including metadata

The `--metadata` option produces a JSON-formatted metadata file for each data file produced in the `--output` directory.
Metadata files are stored in a separate hierarchy in the `--output` directory from the data files. Within this separate hierarchy they have the same relative path as their corresponding data files, but with an extra suffix of `.meta`. Here's an example:
$ wu-sync prepare --input=/data/ftp --output=/data/clean --metadata
$ wu-sync prepare --input=/data/ftp --output=/data/clean --metadata
which produces
/data/clean
├── alice
│ └── 2013
│ └── 09
│ └── 20
│ └── 20130920-072539-1-alice-project_1-file_1
├── alice_meta
│ └── 2013
│ └── 09
│ └── 20
│ └── 20130920-072539-1-alice-project_1-file_1.meta
├── bob
│ └── 2013
│ └── 09
│ └── 20
│ ├── 20130920-072539-2-bob-project_1-file_1
│ ├── 20130920-072539-3-bob-project_1-file_2
│ ├── 20130920-072539-4-bob-project_1-file_3
│ ├── 20130920-072539-5-bob-project_2-file_1
│ └── 20130920-072539-6-bob-project_2-file_2
├── bob_meta
│ └── 2013
│ └── 09
│ └── 20
│ ├── 20130920-072539-2-bob-project_1-file_1.meta
│ ├── 20130920-072539-3-bob-project_1-file_2.meta
│ ├── 20130920-072539-4-bob-project_1-file_3.meta
│ ├── 20130920-072539-5-bob-project_2-file_1.meta
│ └── 20130920-072539-6-bob-project_2-file_2.meta
├── root
│ └── 2013
│ └── 09
│ └── 20
│ └── 20130920-072539-0-README
└── root_meta
└── 2013
└── 09
└── 20
└── 20130920-072539-0-README.meta
Notice that the `--metadata` option implies the `--ordered` option. This is so metadata files are guaranteed to come later than their corresponding data file in any lexicographically ordered walk of the `--output` directory.
This is so that syncing tools will transfer a metadata file after they transfer its corresponding data file. Since metadata files are also small, this means that downstream tools can use the presence or absence of a metadata file to know for sure whether a data file has already been transferred completely -- they won't have to do the dance that `wu-sync prepare` is doing.
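A downstream consumer might use this convention with a sketch like the following, which processes only data files whose metadata file has already arrived; the paths follow the example above, and process_file stands in for a hypothetical downstream command:

find /data/clean/alice_meta -name '*.meta' | while read -r meta; do
  rel="${meta#/data/clean/alice_meta/}"   # e.g. 2013/09/20/20130920-072539-1-alice-project_1-file_1.meta
  data="/data/clean/alice/${rel%.meta}"   # the corresponding data file
  process_file "$data"                    # hypothetical downstream command
done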
The content of a metadata file is very simple. Here's `/data/clean/alice_meta/2013/09/20/20130920-072539-1-alice-project_1-file_1.meta`:
{
"path": "alice/2013/09/20/20130920-072539-1-alice-project_1-file_1",
"meta_path": "alice_meta/2013/09/20/20130920-072539-1-alice-project_1-file_1.meta",
"size": 168894,
"md5": "0a61f0919f546ce04fc119b028b88a2e"
}
Both `path` and `meta_path` are relative to the `--output` directory.
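Since each metadata file records the size and MD5 of its data file, a downstream tool could also verify the transfer; for example, using the values above and the standard md5sum utility:

$ cd /data/clean && echo "0a61f0919f546ce04fc119b028b88a2e  alice/2013/09/20/20130920-072539-1-alice-project_1-file_1" | md5sum -c -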
The `--metadata` option can also be combined with the `--split` option.
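For example, again running twice:

$ wu-sync prepare --input=/data/ftp --output=/data/clean --metadata --split
$ wu-sync prepare --input=/data/ftp --output=/data/clean --metadata --split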
### Multiple Output Directories

Using multiple `--output` directories mounted on different devices can greatly speed up operation as large files are read & written. When using multiple `--output` directories, each consecutively processed file in the `--input` directory will be assigned one of the output directories in a round-robin fashion:
$ wu-sync prepare --input=/data/ftp --output=/data/clean_1,/data/clean_2
$ wu-sync prepare --input=/data/ftp --output=/data/clean_1,/data/clean_2
### To S3

The `s3` sync will sync data from a local `--input` directory to an S3 `--bucket` and path. It requires the `s3cmd` program.
Here's an example.
$ wu-sync s3 --input=/data/clean --bucket=s3://example.com/archive
This example assumes that the underlying `s3cmd` has been installed with appropriate credentials to write to the `s3://example.com` bucket (or that the bucket `s3://example.com` is world-writable). If not, an `s3cmd` configuration file can be passed in with the `--s3cmd_config` option:
$ wu-sync s3 --input=/data/clean --bucket=s3://example.com/archive --s3cmd_config=config/s3cfg
The `wu-sync s3` command also works with multiple `--input` directories. This is to work nicely with the `wu-sync prepare` command which has multiple `--output` directories:
$ wu-sync s3 --input=/data/clean_1,/data/clean_2 --bucket=s3://example.com/archive --s3cmd_config=config/s3cfg
## wu-sync-all

Note: This functionality is designed to work in the context of a deploy pack only.

Syncing several FTP sources is possible with `wu-sync`, but it's a lot to type. The `wu-sync-all` command can read a configuration file with pre-defined data sources and run each of the sync types on each of the sources (or just some of them).
Within one of the configuration files in your deploy pack (either `config/settings.yml` or an environment-specific `config/environments/ENVIRONMENT.yml`), create a listener for each of your sources as follows:
---
listeners:
nasa:
ftp:
host: ftp.nasa.gov
username: narmstrong
password: first!
path: /data/latest
prepare:
ordered: true
metadata: true
s3:
bucket: s3://archive.example.com/nasa
usaf:
ftp:
host: ftp.usaf.gov
username: bobross
password: xxx
path: /data/latest
prepare:
split: true
lines: 1_000_000
s3:
bucket: s3://archive.example.com/usaf
...
The top-level keys in the `listeners` Hash (`nasa`, `usaf`, &c.) are each the name of a data source. The next-level keys (`ftp`, `prepare`, `s3`) each name a sync type and give the options for that type. These options are exactly the same as the usual options for that sync type. Options not supplied via the configuration file (typically "system" parameters like `--input` and `--output` directories) are expected to be supplied on the command-line at runtime.
Given a proper configuration with valid listeners, the following commands will perform a sync from FTP to S3 of all data sources:
$ wu-sync-all ftp --output=/data/incoming
$ wu-sync-all prepare --input=/data/incoming --output=/data/received
$ wu-sync-all prepare --input=/data/incoming --output=/data/received
$ wu-sync-all s3 --input=/data/received
The `--only` and `--except` options can be used to limit `wu-sync-all`'s action to a desired subset of sources:
$ wu-sync-all ftp --output=/data/incoming --only=nasa,usaf
$ wu-sync-all ftp --output=/data/incoming --except=wto