DataCollector
Convenience module to Extract, Transform and Load data in a Pipeline. The 'INPUT', 'OUTPUT' and 'FILTER' objects help you read, transform and output your data. Support objects like CONFIG, LOG, ERROR and RULES help you write manageable rules to transform and log your data. Including the DataCollector::Core module into your application gives you access to these objects.
include DataCollector::Core
Every object can be used on its own.
DataCollector Objects
Pipeline
Allows you to create a simple pipeline of operations to process data. With a data pipeline, you can collect, process, and transform data, and then transfer it to various systems and applications.
A pipeline can run on a schedule or be triggered by new data. A schedule is given either as an ISO8601 duration, which sets how often the pipeline is executed, or as a cron expression. Every time the pipeline is triggered, its processing logic is executed.
methods:
- .new(options): create a pipeline
  - options:
    - name: pipeline name
    - schedule: ISO8601 duration format
    - cron: cron format, e.g. '1 12 * * *'; intervals are not supported
    - uri: a directory/file to watch
    - xml_typecast: true/false, convert string values to TrueClass, FalseClass, Time, Date, and DateTime
- .run: start the pipeline. blocking if a schedule is supplied
- .stop: stop the pipeline
- .pause: pause the pipeline. Restart using .run
- .running?: is pipeline running
- .stopped?: is pipeline not running
- .paused?: is pipeline paused
- .name: name of the pipe
- .run_count: number of times the pipeline has run
- .on_message: handler block to run every time a trigger event happens
example:
#create a pipeline scheduled to run every 10 minutes
pipeline = DataCollector::Pipeline.new(schedule: 'PT10M')
pipeline.on_message do |input, output|
data = input.from_uri("https://dummyjson.com/comments?limit=10")
# process data
end
pipeline.run
#create a pipeline scheduled to run every morning at 06:00 am
pipeline = DataCollector::Pipeline.new(cron: '0 6 * * *')
pipeline.on_message do |input, output|
data = input.from_uri("https://dummyjson.com/comments?limit=10")
# process data
end
pipeline.run
#create a pipeline to listen and process files in a directory
extract = DataCollector::Pipeline.new(name: 'extract', uri: 'file://./data/in')
extract.on_message do |input, output, filename|
data = input.from_uri("file://#{filename}")
# process data
end
extract.run
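To make the two schedule formats used above concrete, here is a stdlib-only sketch that converts an ISO8601 duration such as 'PT10M' into seconds and checks whether a time matches a cron expression such as '0 6 * * *'. Both helpers (iso8601_duration_to_seconds, cron_match?) are hypothetical and not part of DataCollector. They are deliberately simplified: only the day/hour/minute/second subset of ISO8601 durations is handled, and the cron matcher accepts only '*', plain numbers and comma lists, rejecting step intervals such as '*/5' in line with the note above.

```ruby
# Hypothetical helpers, not part of DataCollector.

# ISO8601 duration (day/hour/minute/second subset only) -> seconds.
ISO8601_DURATION = /\AP(?:(?<days>\d+)D)?(?:T(?:(?<hours>\d+)H)?(?:(?<minutes>\d+)M)?(?:(?<seconds>\d+)S)?)?\z/

def iso8601_duration_to_seconds(duration)
  match = ISO8601_DURATION.match(duration)
  # Reject non-matching strings as well as the empty forms "P", "PT", "P1DT".
  if match.nil? || duration == 'P' || duration.end_with?('T')
    raise ArgumentError, "unsupported duration: #{duration}"
  end

  match[:days].to_i * 86_400 + match[:hours].to_i * 3_600 +
    match[:minutes].to_i * 60 + match[:seconds].to_i
end

# Does +time+ match a 5-field cron expression (minute hour day month weekday)?
# Only '*', plain numbers and comma lists are handled; step intervals such as
# '*/5' are rejected. Cron's OR rule for day/weekday is not implemented.
def cron_field_match?(field, value)
  return true if field == '*'
  raise ArgumentError, "intervals are not supported: #{field}" if field.include?('/')

  # Integer() raises ArgumentError for anything else, e.g. ranges like '1-5'.
  field.split(',').map { |part| Integer(part, 10) }.include?(value)
end

def cron_match?(expression, time)
  fields = expression.split
  raise ArgumentError, "expected 5 fields: #{expression}" unless fields.size == 5

  minute, hour, mday, month, wday = fields
  cron_field_match?(minute, time.min) && cron_field_match?(hour, time.hour) &&
    cron_field_match?(mday, time.day) && cron_field_match?(month, time.month) &&
    cron_field_match?(wday, time.wday)
end

puts iso8601_duration_to_seconds('PT10M')                      # 600
puts cron_match?('0 6 * * *', Time.new(2024, 1, 15, 6, 0, 0)) # true
puts cron_match?('0 6 * * *', Time.new(2024, 1, 15, 6, 1, 0)) # false
```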
input
The input component is part of the processing logic. All data is converted into a Hash, Array, ... which you can access using plain Ruby, or using JSONPath through the filter object.
The input component can fetch data from various URIs, such as files, URLs, directories, queues, ...
For a push input component, a listener is created with a processing logic block that is executed whenever new data is available.
A push happens when new data is created in a directory, message queue, ...
from_uri(source, options = {})
- source: a URI with one of the schemes http, https, file or amqp; an IO object such as a StringIO can also be passed
- options:
  - raw: boolean, do not parse the response
  - content_type: string, force a content type if the 'Content-Type' returned by the HTTP server is incorrect
  - headers: request headers
  - cookies: session cookies etc.
  - method: HTTP verb, one of [GET, POST]; defaults to 'GET'
  - body: HTTP POST body
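The method, body and headers options describe an ordinary HTTP request. For reference, the sketch below builds (but does not send) roughly the request that the DBpedia SPARQL example below describes, using Ruby's standard Net::HTTP. How from_uri itself encodes the body is not documented here; this sketch assumes a form-encoded body, which is what SPARQL endpoints commonly expect for POSTed queries.

```ruby
require 'net/http'
require 'uri'

uri = URI('https://dbpedia.org/sparql')
# The headers option maps to request headers, method: "POST" to Net::HTTP::Post.
request = Net::HTTP::Post.new(uri, 'Accept' => 'text/turtle')
# Assumption: the query is sent form-encoded; set_form_data escapes spaces,
# braces and '?' and sets Content-Type to application/x-www-form-urlencoded.
request.set_form_data('query' => 'SELECT * WHERE {?sub ?pred ?obj} LIMIT 10')

puts request.method    # POST
puts request['Accept'] # text/turtle
puts request.body

# Sending it requires network access:
# response = Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| http.request(request) }
```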
example:
# read from an http endpoint
input.from_uri("http://www.libis.be")
input.from_uri("file://hello.txt")
input.from_uri("http://www.libis.be/record.jsonld", content_type: 'application/ld+json')
input.from_uri("https://www.w3.org/TR/rdf12-turtle/examples/example1.ttl")
input.from_uri("https://dbpedia.org/sparql", body: "query=SELECT * WHERE {?sub ?pred ?obj} LIMIT 10", method:"POST", headers: {accept: "text/turtle"})
input.from_uri(StringIO.new(File.read('myrecords.xml')), content_type: 'application/xml' )
# read data from a RabbitMQ queue
listener = input.from_uri('amqp://user:password@localhost?channel=hello&queue=world')
listener.on_message do |input, output, message|
puts message
end
listener.run
# read data from a directory
listener = input.from_uri('file://this/is/directory')
listener.on_message do |input, output, filename|
puts filename
end
listener.run
Supported input formats are JSON, XML and CSV, as well as XML inside a TAR.GZ file.
listener from input.from_uri(directory|message queue)
When a listener is triggered by an event (PUSH), such as a message arriving on a queue or a file being written to a directory, these extra methods are available:
- .run: start the listener. blocking if a schedule is supplied
- .stop: stop the listener
- .pause: pause the listener. Restart using .run
- .running?: is listener running
- .stopped?: is listener not running
- .paused?: is listener paused
- .on_message: handler block to run every time a trigger event happens
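Whether the directory listener relies on file-system events or on polling is not described here. As a hedged illustration of the idea, this stdlib sketch polls a directory and returns only the files it has not seen before; a listener would run its on_message block once per returned filename, typically in a loop with a pause between polls. new_files is a hypothetical helper, not a DataCollector method.

```ruby
require 'set'
require 'tmpdir'

# Hypothetical helper: returns regular files in +dir+ not yet present in +seen+,
# and records them in +seen+ so the next call skips them.
def new_files(dir, seen)
  current = Dir.glob(File.join(dir, '*')).select { |path| File.file?(path) }.sort
  fresh = current.reject { |path| seen.include?(path) }
  seen.merge(fresh)
  fresh
end

Dir.mktmpdir do |dir|
  seen = Set.new
  File.write(File.join(dir, 'a.xml'), '<a/>')
  new_files(dir, seen).each { |f| puts "new: #{File.basename(f)}" } # new: a.xml
  File.write(File.join(dir, 'b.xml'), '<b/>')
  new_files(dir, seen).each { |f| puts "new: #{File.basename(f)}" } # new: b.xml
end
```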
output
Output is an object in which you store the key/value pairs that need to be written to an output stream.
output[:name] = 'John'
output[:last_name] = 'Doe'
# get all keys from the output object
output.keys
output.key?(:name)
output.each do |k,v|
puts "#{k}:#{v}"
end
# add hash to output
output << { age: 22 }
puts output[:age]
# # 22
# add array to output
output << [1,2,3,4]
puts output.keys
# # datap
puts output['datap']
# # [1, 2, 3, 4]
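To make the behaviour of << explicit, here is a small, hypothetical model of the key/value store described above. MiniOutput is not the gem's class: it only mirrors the documented behaviour, where a hash is merged into the existing keys and an array ends up under the 'datap' key. What the real object does when a second array is added is not covered here.

```ruby
# Hypothetical model, not DataCollector's implementation.
class MiniOutput
  include Enumerable

  def initialize
    @data = {}
  end

  def [](key)
    @data[key]
  end

  def []=(key, value)
    @data[key] = value
  end

  # Hashes are merged; anything else (e.g. an Array) is stored under 'datap'.
  def <<(value)
    if value.is_a?(Hash)
      @data.merge!(value)
    else
      @data['datap'] = value
    end
    self
  end

  def keys
    @data.keys
  end

  def key?(key)
    @data.key?(key)
  end

  def each(&block)
    @data.each(&block)
  end
end

output = MiniOutput.new
output[:name] = 'John'
output << { age: 22 }
output << [1, 2, 3, 4]
p output.keys # [:name, :age, "datap"]
```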
Write output to a file or a string, optionally using an ERB file as a template. Example template test.erb:
<names>
<combined><%= data[:name] %> <%= data[:last_name] %></combined>
<%= print data, :name, :first_name %>
<%= print data, :last_name %>
</names>
will produce
<names>
<combined>John Doe</combined>
<first_name>John</first_name>
<last_name>Doe</last_name>
</names>
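The template relies on two things being available inside ERB: data and a print helper that wraps a value in an XML element, optionally under a different tag name. The sketch below reproduces that rendering with Ruby's standard ERB. TemplateContext and its print method are hypothetical; the gem's own helper may escape or format values differently.

```ruby
require 'erb'

# Hypothetical rendering context: exposes +data+ and a print helper that wraps
# data[key] in an element named +tag+ (defaults to the key itself).
class TemplateContext
  attr_reader :data

  def initialize(data)
    @data = data
  end

  def print(data, key, tag = key)
    "<#{tag}>#{data[key]}</#{tag}>"
  end

  def render(template)
    ERB.new(template).result(binding)
  end
end

template = <<~'ERB'
  <names>
  <combined><%= data[:name] %> <%= data[:last_name] %></combined>
  <%= print data, :name, :first_name %>
  <%= print data, :last_name %>
  </names>
ERB

xml = TemplateContext.new({ name: 'John', last_name: 'Doe' }).render(template)
puts xml
```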
Into a variable
result = output.to_s("test.erb")
#template is optional
result = output.to_s
Into a file
output.to_uri("file://data.xml", {template: "test.erb", content_type: "application/xml"})
#template is optional
output.to_uri("file://data.json", {content_type: "application/json"})
Into a tar file; the result is stored in data
# create a tar file with a random name
data = output.to_uri("file://data.json", {content_type: "application/json", tar:true})
# choose the name of the tar file with tar_name
data = output.to_uri("file://./test.json", {template: "test.erb", content_type: 'application/json', tar_name: "test.tar.gz"})
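The tar and tar_name options bundle the written file into a .tar.gz archive. As an illustration of what such an archive holds, this stdlib sketch packs files into an in-memory .tar.gz with Gem::Package::TarWriter and Zlib, then reads them back with Gem::Package::TarReader; the same reading approach applies to XML records delivered in a TAR.GZ file. tar_gz and tar_gz_entries are hypothetical helpers, and DataCollector is not necessarily implemented this way.

```ruby
require 'rubygems/package'
require 'zlib'
require 'stringio'

# Hypothetical helper: packs { name => content } into .tar.gz bytes.
def tar_gz(files)
  io = StringIO.new(String.new) # binary in-memory buffer
  gz = Zlib::GzipWriter.new(io)
  Gem::Package::TarWriter.new(gz) do |tar|
    files.each do |name, content|
      tar.add_file_simple(name, 0o644, content.bytesize) { |entry| entry.write(content) }
    end
  end
  gz.finish # write the gzip trailer without closing the StringIO
  io.string
end

# Hypothetical helper: reads .tar.gz bytes back into { name => content }.
def tar_gz_entries(bytes)
  entries = {}
  Zlib::GzipReader.wrap(StringIO.new(bytes)) do |gz|
    Gem::Package::TarReader.new(gz) do |tar|
      tar.each { |entry| entries[entry.full_name] = entry.read.to_s.force_encoding(Encoding::UTF_8) }
    end
  end
  entries
end

archive = tar_gz('test.json' => '{"name":"John"}')
p tar_gz_entries(archive).keys # ["test.json"]
```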
Other output methods
output.raw
output.clear
output.to_xml(template: 'test.erb', root: 'record') # root defaults to 'data'
output.to_json
output.flatten
output.crush
output.keys
Into a temp directory
output.to_tmp_file("test.erb","directory")
filter
filter data from a hash using JSONPath
filtered_data = filter(data, "$..metadata.record")
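In JSONPath, $..metadata.record means "every metadata key at any depth, then its record child". The plain-Ruby sketch below hand-codes that one expression to show what the recursive descent (..) does. find_key_deep and metadata_records are hypothetical helpers; real JSONPath, as used by filter, supports much more (wildcards, indexes, predicates).

```ruby
# Hypothetical helper: collects the value of every +key+ found at any depth.
def find_key_deep(node, key, hits = [])
  case node
  when Hash
    node.each do |k, v|
      hits << v if k.to_s == key
      find_key_deep(v, key, hits)
    end
  when Array
    node.each { |v| find_key_deep(v, key, hits) }
  end
  hits
end

# Rough equivalent of filter(data, "$..metadata.record") for this sketch.
def metadata_records(data)
  find_key_deep(data, 'metadata')
    .select { |m| m.is_a?(Hash) && m.key?('record') }
    .map { |m| m['record'] }
end

data = {
  'items' => [
    { 'metadata' => { 'record' => { 'id' => 1 } } },
    { 'nested' => { 'metadata' => { 'record' => { 'id' => 2 } } } }
  ]
}
p metadata_records(data).map { |r| r['id'] } # [1, 2]
```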
rules
The RULES objects have a simple concept. Rules exist of 3 components:
- a destination tag
- a jsonpath filter to get the data
- a lambda to execute on every filter hit
TODO: work in progress; see the tests for examples of how to use rules.
RULE_SET
RULES*
FILTERS*
LAMBDA*
SUFFIX
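Following the three components listed above (destination tag, filter, lambda), a minimal rule engine can be sketched in plain Ruby. To keep it short and self-contained, the filter here is a plain key path resolved with Hash#dig instead of JSONPath, and Rule and apply_rules are hypothetical names. The real engine uses JSONPath filters and also supports suffixes and option arguments, as the examples below show.

```ruby
# Hypothetical mini rule engine: one tag, one filter (key path), one lambda per rule.
Rule = Struct.new(:tag, :path, :action)

def apply_rules(rules, data, output)
  rules.each do |rule|
    value = data.dig(*rule.path)
    next if value.nil? # the filter found nothing

    hits = value.is_a?(Array) ? value : [value]
    # Run the lambda on every filter hit and store the results under the tag.
    output[rule.tag] = hits.map { |hit| rule.action.call(hit) }
  end
  output
end

rules = [
  Rule.new('title', ['title'], ->(d) { d.upcase }),
  Rule.new('multiple_of_2', ['numbers'], ->(d) { d.to_i * 2 })
]
result = apply_rules(rules, { 'title' => 'water', 'numbers' => %w[1 2 3] }, {})
p result['multiple_of_2'] # [2, 4, 6]
```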
Examples
Here are the different rule combinations that are possible:
RULE_SETS = {
'rs_only_filter' => {
'only_filter' => "$.title"
},
'rs_only_text' => {
'plain_text_tag' => {
'text' => 'hello world'
}
},
'rs_text_with_suffix' => {
'text_tag_with_suffix' => {
'text' => ['hello_world', {'suffix' => '-suffix'}]
}
},
'rs_map_with_json_filter' => {
'language' => {
'@' => {'nl' => 'dut', 'fr' => 'fre', 'de' => 'ger', 'en' => 'eng'}
}
},
'rs_hash_with_json_filter' => {
'multiple_of_2' => {
'@' => lambda { |d| d.to_i * 2 }
}
},
'rs_hash_with_multiple_json_filter' => {
'multiple_of' => [
{'@' => lambda { |d| d.to_i * 2 }},
{'@' => lambda { |d| d.to_i * 3 }}
]
},
'rs_hash_with_json_filter_and_suffix' => {
'multiple_of_with_suffix' => {
'@' => [lambda {|d| d.to_i*2}, 'suffix' => '-multiple_of_2']
}
},
'rs_hash_with_json_filter_and_multiple_lambdas' => {
'multiple_lambdas' => {
'@' => [lambda {|d| d.to_i*2}, lambda {|d| Math.sqrt(d.to_i) }]
}
},
'rs_hash_with_json_filter_and_option' => {
'subjects' => {
'$..subject' => [
lambda {|d,o|
{
doc_id: o['id'],
subject: d
}
}
]
}
}
}
rules.run takes 4 parameters. The first 3 (the rule set, the input data and the output object) are mandatory. The last one, options, can hold data that is static to a rule set, or engine directives.
List of engine directives:
- _no_array_with_one_element: defaults to false. If the result is an array with 1 element, return just that element.
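Read literally, this directive only changes the shape of a result. The helper below is a hypothetical illustration of that description, not engine code; in practice the directive is passed as a key in the options argument of rules.run.

```ruby
# Hypothetical sketch of the _no_array_with_one_element directive, based only
# on its description: a one-element array is replaced by its single element.
def apply_no_array_with_one_element(result, enabled)
  return result unless enabled

  result.is_a?(Array) && result.size == 1 ? result.first : result
end

p apply_no_array_with_one_element(['water'], true)                  # "water"
p apply_no_array_with_one_element(['water'], false)                 # ["water"]
p apply_no_array_with_one_element(%w[water thermodynamics], true)   # ["water", "thermodynamics"]
```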
example:
# apply RULESET "rs_hash_with_json_filter_and_option" to data
include DataCollector::Core
output.clear
data = {'subject' => ['water', 'thermodynamics']}
rules_ng.run(RULE_SETS['rs_hash_with_json_filter_and_option'], data, output, {'id' => 1})
Results in:
{
"subjects":[
{"doc_id":1,"subject":"water"},
{"doc_id":1,"subject":"thermodynamics"}
]
}
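The result above follows from calling the rule's lambda once per value matched by '$..subject', with the fourth rules.run argument passed in as the lambda's second parameter o. The plain-Ruby sketch below reproduces that flow by hand, without any rules engine; that the engine collects the per-hit results into an array under the 'subjects' tag is inferred from the output shown above.

```ruby
# The lambda from 'rs_hash_with_json_filter_and_option'.
subject_rule = lambda { |d, o| { doc_id: o['id'], subject: d } }

data = { 'subject' => ['water', 'thermodynamics'] }
options = { 'id' => 1 }

# One lambda call per filter hit; here the hits are simply data['subject'].
result = { 'subjects' => data['subject'].map { |d| subject_rule.call(d, options) } }
p result['subjects'].map { |s| s[:subject] } # ["water", "thermodynamics"]
```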
config
config is an object backed by "config.yml"; you can read data from it and store data in it.
read
config[:active]
write
config[:active] = false
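Conceptually, config behaves like a Hash whose contents live in config.yml. The class below is a hypothetical stand-in written with Ruby's YAML library to show that read/write cycle; whether DataCollector writes changes back to the file immediately, as this sketch does, is an assumption.

```ruby
require 'yaml'
require 'tmpdir'

# Hypothetical stand-in for the config object; keys are stored as strings.
class YamlConfig
  def initialize(path)
    @path = path
    @data = File.exist?(path) ? (YAML.safe_load(File.read(path)) || {}) : {}
  end

  def [](key)
    @data[key.to_s]
  end

  # Assumption: every write is persisted to the YAML file right away.
  def []=(key, value)
    @data[key.to_s] = value
    File.write(@path, @data.to_yaml)
  end
end

Dir.mktmpdir do |dir|
  path = File.join(dir, 'config.yml')
  File.write(path, "active: true\n")
  config = YamlConfig.new(path)
  p config[:active]               # true
  config[:active] = false
  p YamlConfig.new(path)[:active] # false
end
```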
log
Log to stdout
log("hello world")
error
Log an error to stdout
error("if you have an issue take a tissue")
logger
By default, logs are written to standard output. Use logger to change where logs are written to; it accepts one or more outputs.
f = File.open('/tmp/data.log', 'w')
f.sync = true # do not buffer
# add multiple log outputs
logger(STDOUT, f)
#write to both STDOUT and /tmp/data.log
log('Hello world')
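logger(STDOUT, f) sends every message to several outputs. The same idea can be shown with Ruby's standard Logger and a small fan-out writer. MultiIO is a hypothetical helper, two StringIO objects stand in for STDOUT and the log file, and DataCollector is not necessarily implemented this way.

```ruby
require 'logger'
require 'stringio'

# Hypothetical fan-out writer: forwards every write to all targets.
class MultiIO
  def initialize(*targets)
    @targets = targets
  end

  def write(*args)
    @targets.each { |target| target.write(*args) }
  end

  # Logger expects #close; this sketch leaves shared targets (e.g. STDOUT) open.
  def close; end
end

console_like = StringIO.new
file_like = StringIO.new
multi_logger = Logger.new(MultiIO.new(console_like, file_like))
multi_logger.info('Hello world')
puts file_like.string
```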
Example
Input data test.csv
sequence, data
1, apple
2, banana
3, peach
Output template test.erb
<data>
<% data[:record].each do |d| %>
<record sequence="<%= d[:sequence] %>">
<%= print d, :data %>
</record>
<% end %>
</data>
require 'data_collector'
include DataCollector::Core
data = input.from_uri('file://test.csv')
# CSV values are strings, so *= 2 repeats them: "1" becomes "11"
data.each { |m| m[:sequence] *= 2 }
output[:record] = data
puts output.to_s('test.erb')
This should produce:
<data>
<record sequence="11">
<data> apple</data>
</record>
<record sequence="22">
<data> banana</data>
</record>
<record sequence="33">
<data> peach</data>
</record>
</data>
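The doubled sequence values (11, 22, 33) and the leading space in " apple" follow from how the CSV is read: the values stay strings, so *= 2 repeats them, and the space after each comma is kept. Assuming DataCollector reads CSV with symbol headers and no value conversion (the output above suggests this, but it is an assumption), plain Ruby CSV reproduces the same data. Adding converters: :integer shows the numeric alternative.

```ruby
require 'csv'

csv_text = "sequence, data\n1, apple\n2, banana\n3, peach\n"

# Symbol headers (" data" becomes :data), values kept as strings.
rows = CSV.parse(csv_text, headers: true, header_converters: :symbol).map(&:to_h)
rows.each { |row| row[:sequence] *= 2 } # String#*: "1" * 2 == "11"
p rows.map { |row| row[:sequence] }     # ["11", "22", "33"]
p rows.first[:data]                     # " apple"

# With converters: :integer the sequence is numeric and really doubles.
numeric = CSV.parse(csv_text, headers: true, header_converters: :symbol, converters: :integer).map(&:to_h)
numeric.each { |row| row[:sequence] *= 2 }
p numeric.map { |row| row[:sequence] }  # [2, 4, 6]
```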
You can pass options to input.from_uri to control how CSV files are read; these are the same options as those of the Ruby CSV class.
Loading a CSV file with ; as the column separator
i = input.from_uri('https://support.staffbase.com/hc/en-us/article_attachments/360009197031/username.csv', col_sep: ';')
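Since these options are passed on to Ruby's CSV class, their effect can be checked with CSV directly. The sample text below is made up for illustration and only mimics a semicolon-separated file.

```ruby
require 'csv'

# col_sep: ';' splits each line on semicolons instead of commas.
text = "Username;Identifier;First name\nbooker12;9012;Rachel\ngrey07;2070;Laura\n"
table = CSV.parse(text, col_sep: ';', headers: true)
p table['Username']         # ["booker12", "grey07"]
p table.first['First name'] # "Rachel"
```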
Installation
Add this line to your application's Gemfile:
gem 'data_collector'
And then execute:
$ bundle
Or install it yourself as:
$ gem install data_collector
Usage
require 'data_collector'
include DataCollector::Core
# including core gives you a pipeline, input, output, filter, config, log, error object to work with
RULES = {
'title' => '$..vertitle'
}
#create a PULL pipeline and schedule it to run every 5 seconds
pipeline = DataCollector::Pipeline.new(schedule: 'PT5S')
pipeline.on_message do |input, output|
data = input.from_uri('https://services3.libis.be/primo_artefact/lirias3611609')
rules.run(RULES, data, output)
#puts JSON.pretty_generate(input.raw)
puts JSON.pretty_generate(output.raw)
output.clear
if pipeline.run_count > 2
log("stopping pipeline after #{pipeline.run_count} runs")
pipeline.stop
end
end
pipeline.run
Development
After checking out the repo, run bin/setup to install dependencies. Then, run rake test to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/[USERNAME]/data_collector.
License
The gem is available as open source under the terms of the MIT License.