# BulkProcessor

Bulk upload data in a file (e.g. CSV), process it in the background, then send a success or failure report.
## Installation

Add this line to your application's Gemfile:

```ruby
gem 'bulk-processor'
```

And then execute:

    $ bundle

Or install it yourself as:

    $ gem install bulk-processor
## Usage

### Configuration

Bulk processor requires the following configuration.

#### Back end: ActiveJob

Include the `activejob` and back-end queueing gems in your Gemfile, e.g.

```ruby
# Gemfile
gem 'activejob'
gem 'bulk-processor'
gem 'resque'
```

```ruby
BulkProcessor.back_end = :active_job
BulkProcessor.queue_adapter = <adapter>
```

The default `queue_adapter` is `:inline`, which skips queueing and processes synchronously. Since this is backed by ActiveJob, all of the adapters in `ActiveJob::QueueAdapters` are supported, including `:resque`.
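For example, in a Rails app you might choose the adapter per environment. A minimal sketch (the environment check is illustrative, not part of this gem):

```ruby
BulkProcessor.back_end = :active_job
# Process synchronously in tests; queue through Resque everywhere else.
BulkProcessor.queue_adapter = Rails.env.test? ? :inline : :resque
```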
#### Back end: Dynosaur

Include the `dynosaur` gem in your Gemfile, e.g.

```ruby
# Gemfile
gem 'dynosaur'
gem 'resque'
```

```ruby
BulkProcessor.back_end = :dynosaur
BulkProcessor.heroku.api_key = 'my-heroku-api-key'
BulkProcessor.heroku.app_name = 'my-heroku-app-name'
```

```ruby
# Rakefile
require 'bulk_processor/back_end/dynosaur/tasks'

# If you do not already have an :environment rake task, create a no-op one,
# as Dynosaur tasks depend on it.
task :environment do
end
```
#### AWS S3

The CSV file passed to BulkProcessor will be persisted on AWS S3 so that the background job can access it. This requires configuring AWS credentials, the S3 bucket in which to store the file, and a temp directory in which to hold the file locally.

```ruby
BulkProcessor.temp_directory = '/tmp'
BulkProcessor.aws.access_key_id = 'my-aws-access-key'
BulkProcessor.aws.secret_access_key = 'my-aws-secret'
BulkProcessor.aws.bucket = 'my-s3-bucket'
```
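Putting it all together, the configuration typically lives in one place, e.g. a Rails initializer. A sketch assuming credentials supplied via ENV (the file path and variable names here are hypothetical):

```ruby
# config/initializers/bulk_processor.rb
BulkProcessor.back_end = :active_job
BulkProcessor.queue_adapter = :resque

BulkProcessor.temp_directory = '/tmp'
BulkProcessor.aws.access_key_id = ENV['AWS_ACCESS_KEY_ID']
BulkProcessor.aws.secret_access_key = ENV['AWS_SECRET_ACCESS_KEY']
BulkProcessor.aws.bucket = ENV['BULK_PROCESSOR_BUCKET']
```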
### Setting up the processor

You will need to supply a class for CSV processing. This class must respond to the `start` instance method and the `required_columns` and `optional_columns` class methods, and its `initialize` must have the following signature:
```ruby
class PetCSVProcessor
  # @return [Array<String>] column headers that must be present
  def self.required_columns
    ['species', 'name', 'age']
  end

  # @return [Array<String>] column headers that may be present. If a column
  #   header is present that is not in 'required_columns' or 'optional_columns',
  #   the file will be considered invalid and no rows will be processed.
  def self.optional_columns
    ['favorite_toy', 'talents']
  end

  def initialize(csv, payload:)
    # Assign instance variables and do any other setup
  end

  def start
    # Process the CSV
  end
end
```
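To make the contract concrete, here is how such a class could be exercised directly, assuming the `csv` argument behaves like a parsed `CSV::Table` (how the gem invokes it internally may differ):

```ruby
require 'csv'

csv = CSV.parse(<<~DATA, headers: true)
  species,name,age
  dog,Rex,3
  cat,Whiskers,5
DATA

PetCSVProcessor.new(csv, payload: { recipient: 'owner@example.com' }).start
```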
### Swiss Army Knife base class

To account for a common use case, a base `BulkProcessor::CSVProcessor` class is provided, though it must be explicitly required. This base class can be subclassed to build a CSV processor. It implements the initializer and the `#start` method, and returns an empty set for `.optional_columns`. The `#start` method iterates over each row, processes it using a `RowProcessor`, and accumulates the results, which are then passed off to a `Handler`. An example implementation could look like:
```ruby
require 'bulk_processor/csv_processor'

class PetCSVProcessor < BulkProcessor::CSVProcessor
  # Note: this must be overridden in a subclass
  #
  # @return [Array<String>] column headers that must be present
  def self.required_columns
    ['species', 'name', 'age']
  end

  # @return [Array<String>] column headers that may be present. If a column
  #   header is present that is not in 'required_columns' or 'optional_columns',
  #   the file will be considered invalid and no rows will be processed.
  def self.optional_columns
    ['favorite_toy', 'talents']
  end

  # Note: this must be overridden in a subclass
  #
  # @return [RowProcessor] a class that implements the RowProcessor role
  def self.row_processor_class
    PetRowProcessor
  end

  # @return [PreProcessor] a class that implements the PreProcessor role
  def self.pre_processor_class
    PetPreProcessor
  end

  # @return [PostProcessor] a class that implements the PostProcessor role
  def self.post_processor_class
    PetPostProcessor
  end

  # @return [Handler] a class that implements the Handler role
  def self.handler_class
    PetHandler
  end
end
```
```ruby
class PetRowProcessor < BulkProcessor::CSVProcessor::RowProcessor
  # Process the row, e.g. create a new record in the DB, send an email, etc.
  def process!
    pet = Pet.new(row)
    if pet.save
      self.successful = true
    else
      messages.concat(pet.errors.full_messages)
    end
  end

  # Returning these column names lets error messages for a row be identified
  # by these key/value pairs rather than by row number.
  def primary_keys
    ['species', 'name']
  end
end
```
```ruby
class PetPostProcessor
  attr_reader :results, :row_processors

  def initialize(row_processors)
    @row_processors = row_processors
    # Do any other setup
  end

  def start
    cat_count = 0
    @results = []
    row_processors.each do |row_processor|
      cat_count += 1 if row_processor.cat?
    end
    if cat_count > 2
      @results << BulkProcessor::CSVProcessor::Result.new(
        messages: ['Too many cats!'],
        successful: false
      )
    end
  end
end
```
```ruby
class PetPreProcessor
  attr_reader :results, :row_processors

  def initialize(row_processors)
    @row_processors = row_processors
    # Do any other setup
  end

  def start
    @results = []
    row_count = row_processors.length
    if row_count > 2
      @results << BulkProcessor::CSVProcessor::Result.new(
        messages: ['Too many rows!'],
        successful: false
      )
    end
  end

  # Indicates whether CSV processing should be aborted when the pre-processor
  # produces any failed results
  def self.fail_process_if_failed
    true
  end
end
```
```ruby
class PetHandler
  attr_reader :payload, :results

  # @param payload [Hash] the payload passed into 'BulkProcessor.new'; can
  #   be used to pass metadata around, e.g. the email address to send a
  #   completion report to
  # @param results [Array<BulkProcessor::CSVProcessor::Result>] results
  #   from processing the rows (there will be one per row in the CSV, plus
  #   zero or more from pre- and post-processing)
  def initialize(payload:, results:)
    @payload = payload
    @results = results
  end

  # Notify the owner that their pets were processed
  def complete!
    OwnerMailer.completed(results, payload)
  end

  # Notify the owner that processing failed
  #
  # @param fatal_error [StandardError] if nil, then all rows were processed;
  #   otherwise the error that was raised is passed in here
  def fail!(fatal_error)
    OwnerMailer.failed(fatal_error, payload)
  end
end
```
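`OwnerMailer` above is not part of this gem; a bare-bones ActionMailer sketch of what it might look like (mailer name, views, and payload keys are assumptions):

```ruby
class OwnerMailer < ActionMailer::Base
  # Renders app/views/owner_mailer/completed.* with @failures available
  def completed(results, payload)
    @failures = results.reject(&:successful?)
    mail(to: payload[:recipient], subject: 'Your pet import has finished')
  end

  # Renders app/views/owner_mailer/failed.* with @error available
  def failed(fatal_error, payload)
    @error = fatal_error
    mail(to: payload[:recipient], subject: 'Your pet import failed')
  end
end
```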
### Kicking off the process

```ruby
processor = BulkProcessor.new(
  key: file_name,
  stream: file_stream,
  processor_class: PetCSVProcessor,
  payload: { recipient: current_user.email }
)

if processor.start
  # The job has been enqueued; go get a coffee and wait
else
  # Something went wrong; alert the file uploader
  handle_invalid_file(processor.errors)
end
```
### Parallelization

For larger CSV files, you may wish to process rows in parallel. This gem allows you to scale up to an arbitrary number of parallel processes by passing an optional argument to `#start`. Doing so causes the input CSV file to be split into N smaller CSV files, each processed in a separate process. Note that when a boundary column is used (see below), the file must be sorted by that column for the partitioning to deliver on its promise.
```ruby
processor = BulkProcessor.new(
  key: file_name,
  stream: file_stream,
  processor_class: PetCSVProcessor,
  payload: { recipient: current_user.email }
)

if processor.start(5)
  # Split the main CSV into 5 smaller files and process them in parallel
else
  # Something went wrong; alert the file uploader
  handle_invalid_file(processor.errors)
end
```
By default, the file will be split into equal-sized partitions. If you need all rows with the same value for a given column to land in the same partition, define `.boundary_column` on the processor class to return the name of that column. E.g. given a file sorted by `pet_id`:

```
pet_id,meal,meal_date
1,kibble,2015-11-02
1,bits,2015-11-03
...
1,alpo,2015-12-31
2,alpo,2015-11-01
...
```

```ruby
class PetCSVProcessor
  def self.boundary_column
    'pet_id'
  end

  # ...
end
```
Finally, to be notified of any failures in the splitting process, you can define `.handler_class` on your processor class to return a class that implements the Handler role. If an error is raised during splitting, `#fail!` will be called on the Handler with that error.

```ruby
class PetCSVProcessor
  def self.handler_class
    PetHandler
  end

  # ...
end
```
### BulkProcessor::CSVProcessor::Result

The result instances passed from BulkProcessor::CSVProcessor to the Handler respond to the following messages:

- `#messages` [Array<String>] - zero or more messages generated when processing the row
- `#row_num` [Fixnum|nil] - the CSV row number (starting with 2), or nil if the result is from post-processing
- `#primary_attributes` [Hash] - a set of values that can be used to identify which row the messages are for; you must override `#primary_keys` to use this
- `#successful?` - true iff the processing happened with no errors
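These four messages are enough to build a human-readable report inside a Handler; a minimal sketch (`failure_lines` is a hypothetical helper, not part of this gem):

```ruby
# Build one line per failed result, labeled by primary attributes when
# available, by row number otherwise, or as post-processing output.
def failure_lines(results)
  results.reject(&:successful?).map do |result|
    label = if result.primary_attributes.empty?
              result.row_num ? "row #{result.row_num}" : 'post-processing'
            else
              result.primary_attributes.inspect
            end
    "#{label}: #{result.messages.join('; ')}"
  end
end
```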
## Development

After checking out the repo, run `bin/setup` to install dependencies. Then, run `bin/console` for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run `bundle exec rake install`. To release a new version, update the version number in `version.rb`, and then run `bundle exec rake release` to create a git tag for the version, push git commits and tags, and push the `.gem` file to rubygems.org.
## Contributing

1. Fork it ( https://github.com/apartmentlist/bulk-processor/fork )
2. Create your feature branch (`git checkout -b my-new-feature`)
3. Commit your changes (`git commit -am 'Add some feature'`)
4. Push to the branch (`git push origin my-new-feature`)
5. Create a new Pull Request