MedPipe
A Rails engine that provides mechanisms for processing datasets ranging from 1 million to several billion records.
Concept
MedPipe::Pipeline
Register PipelineTask instances with the 'apply' method, then execute them sequentially with 'run'.
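The register-then-run flow can be illustrated with a small stand-in. This is a minimal sketch, not the gem's source: SketchPipeline, NumberSource, and Doubler are hypothetical names, and it assumes that 'apply' returns the pipeline so calls can be chained and that each task receives the values yielded by the previous one.

```ruby
# Hypothetical stand-in for MedPipe::Pipeline; not the gem's actual implementation.
class SketchPipeline
  def initialize
    @tasks = []
  end

  # Register a task. Returning self (an assumption) allows chaining.
  def apply(task)
    @tasks << task
    self
  end

  # Run tasks in registration order, passing each task the previous result.
  def run(context = {})
    @tasks.reduce(nil) do |prev_result, task|
      results = []
      task.call(context, prev_result) { |value| results << value }
      results
    end
  end
end

# Two toy tasks that follow the call(context, prev_result) contract.
class NumberSource
  def call(_context, _prev_result)
    [1, 2, 3].each { |n| yield n }
  end
end

class Doubler
  def call(_context, prev_result)
    prev_result.each { |n| yield n * 2 }
  end
end

result = SketchPipeline.new.apply(NumberSource.new).apply(Doubler.new).run
p result # => [2, 4, 6]
```

This sketch collects every result eagerly to keep the flow easy to follow; it does not stream data between tasks.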
MedPipe::PipelineTask
This is the basic unit of processing registered in the pipeline.
Tasks are divided into specific operations such as reading from DB or uploading to S3.
When handling large datasets, Enumerator::Lazy can be used to process data in chunks.
You need to implement the 'call' method:
# @param context [Hash] Stores data during pipeline execution
# @param prev_result [Object] The result of the previous task
def call(context, prev_result)
  yield "data_to_pass_to_next_task"
end
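To show how a yielding 'call' method combines with lazy enumeration, here is a sketch under stated assumptions. BatchReader, BatchSummer, and the :total and :batch_size context keys are hypothetical, and wrapping a task with enum_for(...).lazy is one possible way to connect tasks, not necessarily how MedPipe does it.

```ruby
# Hypothetical tasks; class names and context keys are illustrative only.
class BatchReader
  # Yields records in batches instead of loading everything at once.
  def call(context, _prev_result)
    (1..context[:total]).each_slice(context[:batch_size]) { |batch| yield batch }
  end
end

class BatchSummer
  # prev_result is expected to be an enumerable of batches.
  def call(_context, prev_result)
    prev_result.each { |batch| yield batch.sum }
  end
end

context = { total: 10, batch_size: 4 }

# enum_for turns the block-yielding call into an Enumerator; lazy defers the work
# until values are actually requested.
batches = BatchReader.new.enum_for(:call, context, nil).lazy
sums = BatchSummer.new.enum_for(:call, context, batches).lazy

p sums.first(2) # => [10, 26]
```

Because both stages are lazy, nothing is read until first(2) asks for values.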
MedPipe::PipelinePlan
A model for storing pipeline state, options, and results.
There are two ways to pass options for tasks: either retrieve from PipelinePlan or propagate through context.
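The two approaches can be contrasted in a short sketch. PlanStub is a plain Struct standing in for a PipelinePlan record, and PlanDrivenReader, ContextDrivenReader, and the context keys are hypothetical names chosen for illustration.

```ruby
require "date"

# Stand-in for a MedPipe::PipelinePlan record (the real model is persisted).
PlanStub = Struct.new(:name, :target_date, keyword_init: true)

# Approach 1: the task is given the plan and reads its options from it.
class PlanDrivenReader
  def initialize(plan)
    @plan = plan
  end

  def call(_context, _prev_result)
    yield "#{@plan.name}:#{@plan.target_date.iso8601}"
  end
end

# Approach 2: the options are placed in context and the task reads them there.
class ContextDrivenReader
  def call(context, _prev_result)
    yield "#{context.fetch(:name)}:#{context.fetch(:target_date).iso8601}"
  end
end

plan = PlanStub.new(name: "point_events", target_date: Date.new(2024, 6, 1))
context = { name: plan.name, target_date: plan.target_date }

PlanDrivenReader.new(plan).call(context, nil) { |v| puts v } # prints "point_events:2024-06-01"
ContextDrivenReader.new.call(context, nil) { |v| puts v }    # prints "point_events:2024-06-01"
```

Reading from the plan keeps each task explicit about its inputs, while reading from context lets tasks stay unaware of the plan model.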
MedPipe::PipelineGroup
A model for grouping plans.
Execution can be interrupted by setting parallel_limit to 0 during runtime.
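One way to picture this interruption is a loop that re-checks parallel_limit before picking up each plan. This is only a sketch: GroupStub and run_plans are hypothetical, and it assumes the check happens between plans, which the gem may handle differently.

```ruby
# Stand-in for MedPipe::PipelineGroup; the real model would reload the value from the database.
GroupStub = Struct.new(:parallel_limit)

# Processes plans one by one, re-checking parallel_limit before each plan.
def run_plans(group, plans)
  done = []
  plans.each do |plan|
    break if group.parallel_limit.zero? # limit set to 0: stop picking up new plans
    done << plan
    yield plan if block_given?
  end
  done
end

group = GroupStub.new(10)
done = run_plans(group, %w[a b c d]) do |plan|
  group.parallel_limit = 0 if plan == "b" # simulate pausing the group at runtime
end
p done # => ["a", "b"]
```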
Usage
- Create PipelineTask classes such as a Reader and an Uploader (see the samples).
- Create a PipelineRunner (see the sample).
- Create a job that executes pipelines in parallel (see the sample).
- Write code that registers PipelinePlan records.
- Execute the pipeline like this:
# add plan
pipeline_group = MedPipe::PipelineGroup.create!(parallel_limit: 10)
date_range = Date.new(2024, 6, 1)..Date.new(2024, 6, 30)
date_range.each do |date|
  pipeline_group.pipeline_plans.status_waiting.create!(name: 'point_events', output_unit: :daily, target_date: date)
end
# execute
ExecutePipelineJob.perform_later(pipeline_group.id)
Installation
Add this line to your application's Gemfile:
gem "med_pipe"
Then copy the engine's migration files into your application:
$ rails med_pipe:install:migrations
Test
Add this line to config/environments/test.rb to use the factories bundled with med_pipe:
config.factory_bot.definition_file_paths << MedPipe::Engine.root.join('spec/factories')
Contributing
Bug reports and pull requests are welcome.
License
The gem is available as open source under the terms of the MIT License.