Parallel
Run any code in parallel using Processes (to use all CPUs), Threads (to speed up blocking operations), or Ractors (to use all CPUs).
Best suited for map-reduce or, for example, parallel downloads/uploads.
Install
gem install parallel
Usage
# 2 CPUs -> work in 2 processes (a,b + c)
results = Parallel.map(['a','b','c']) do |one_letter|
  SomeClass.expensive_calculation(one_letter)
end
# 3 Processes -> finished after 1 run
results = Parallel.map(['a','b','c'], in_processes: 3) { |one_letter| SomeClass.expensive_calculation(one_letter) }
# 3 Threads -> finished after 1 run
results = Parallel.map(['a','b','c'], in_threads: 3) { |one_letter| SomeClass.expensive_calculation(one_letter) }
# 3 Ractors -> finished after 1 run
results = Parallel.map(['a','b','c'], in_ractors: 3, ractor: [SomeClass, :expensive_calculation])
The same can be done with each:
Parallel.each(['a','b','c']) { |one_letter| ... }
or with each_with_index, map_with_index, or flat_map.
Produce one item at a time with a lambda (anything that responds to .call) or a Queue.
items = [1,2,3]
Parallel.each( -> { items.pop || Parallel::Stop }) { |number| ... }
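A minimal sketch of the producer pattern above, using plain Ruby threads instead of the gem: each worker asks the callable for the next item until it returns a stop sentinel. STOP and each_from_producer are hypothetical names standing in for Parallel::Stop and Parallel.each; this is not the gem's implementation.

```ruby
# STOP is a hypothetical sentinel playing the role of Parallel::Stop.
STOP = Object.new.freeze

# Run `workers` threads that keep asking the producer for the next item
# until it returns STOP. The producer is called under a mutex so only one
# thread pulls from it at a time; the block itself runs outside the lock.
def each_from_producer(producer, workers: 3, &block)
  lock = Mutex.new
  threads = Array.new(workers) do
    Thread.new do
      loop do
        item = lock.synchronize { producer.call }
        break if item.equal?(STOP)
        block.call(item)
      end
    end
  end
  threads.each(&:join)
end

items = [1, 2, 3]
seen = Queue.new # thread-safe collector for the results
each_from_producer(-> { items.pop || STOP }) { |number| seen << number * 10 }
collected = []
collected << seen.pop until seen.empty?
```

Calling the producer under a lock keeps the callable simple: it does not need to be thread-safe itself.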
Also supports any? and all?
Parallel.any?([1,2,3,4,5,6,7]) { |number| number == 4 }
# => true
Parallel.all?([1,2,nil,4,5]) { |number| number != nil }
# => false
Processes and threads are workers: each one grabs the next piece of work as soon as it finishes the current one.
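To illustrate the worker model, here is a thread-only sketch (pool_map is a hypothetical helper, not part of the gem): a fixed number of workers each claim the next unprocessed item when they finish, so one slow item does not hold up the rest, and results are stored by index to keep the input order.

```ruby
# A fixed pool of threads; each claims the next unprocessed index as soon
# as it is done with its current item. Results are written by index, so the
# returned array keeps the input order.
def pool_map(items, workers: 2, &block)
  results = Array.new(items.size)
  next_index = 0
  lock = Mutex.new
  threads = Array.new(workers) do
    Thread.new do
      loop do
        i = lock.synchronize do
          claimed = next_index
          next_index += 1
          claimed
        end
        break if i >= items.size
        results[i] = block.call(items[i]) # each slot is written by one thread only
      end
    end
  end
  threads.each(&:join)
  results
end

squares = pool_map([3, 1, 2, 5], workers: 2) { |n| sleep(n * 0.01); n * n }
```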
Processes
- Speedup through multiple CPUs
- Speedup for blocking operations
- Variables are protected from change
- Extra memory used
- Child processes are killed when your main process is killed through Ctrl+C or kill -2
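The isolation between processes can be seen with a plain fork: the child changes its own copy of a variable, and the parent only learns the result when it is sent back explicitly. This sketch assumes a platform that supports fork (not Windows) and uses a pipe plus Marshal as one way to return a result; it is not the gem's code.

```ruby
counter = 0
reader, writer = IO.pipe

pid = fork do
  reader.close
  counter += 100                      # changes the child's copy only
  writer.write(Marshal.dump(counter)) # ship the result back to the parent
  writer.close
  exit!(0)                            # skip at_exit hooks inherited from the parent
end

writer.close
child_result = Marshal.load(reader.read) # read until the child closes its end
reader.close
Process.wait(pid)
```

After this runs, the parent's counter is still 0 while child_result holds the child's value.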
Threads
- Speedup for blocking operations
- Variables can be shared/modified
- No extra memory used
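By contrast, threads share the same variables, so concurrent updates to shared state need a Mutex. A minimal sketch in plain Ruby:

```ruby
total = 0
lock = Mutex.new

threads = 4.times.map do
  Thread.new do
    # every thread updates the same local variable; the lock prevents lost updates
    1_000.times { lock.synchronize { total += 1 } }
  end
end
threads.each(&:join)
```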
Ractors
- Ruby 3.0+ only
- Speedup for blocking operations
- No extra memory used
- Very fast to spawn
- Experimental and unstable
- start and finish hooks are called on the main thread
- Variables must be passed in: Parallel.map([1,2,3].map { |i| [i, ARGV, local_var] }, ...
- Use Ractor.make_shareable to pass in global objects
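A small sketch of the pass-it-in approach for Ractors (Ruby 3.0+): bundle the outer values into each input, then make the whole structure shareable. Ractor.make_shareable deep-freezes the objects in place, so local_var itself becomes frozen. The values are made up for illustration, and the sketch stops short of spawning Ractors.

```ruby
# Hypothetical outer value that each work item needs.
local_var = { "mode" => "fast", "hosts" => ["a.example", "b.example"] }
inputs = [1, 2, 3].map { |i| [i, local_var] }

# Deep-freezes inputs and everything it references, in place.
Ractor.make_shareable(inputs)
```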
ActiveRecord
Connection Lost
- Multithreading needs connection pooling; forks need reconnects
- Adjust the connection pool size in config/database.yml when multithreading
# reproducibly fixes things (spec/cases/map_with_ar.rb)
Parallel.each(User.all, in_processes: 8) do |user|
  user.update_attribute(:some_attribute, some_value)
end
User.connection.reconnect!
# maybe helps: explicitly use connection pool
Parallel.each(User.all, in_threads: 8) do |user|
  ActiveRecord::Base.connection_pool.with_connection do
    user.update_attribute(:some_attribute, some_value)
  end
end
# maybe helps: reconnect once inside every fork
Parallel.each(User.all, in_processes: 8) do |user|
  @reconnected ||= User.connection.reconnect! || true
  user.update_attribute(:some_attribute, some_value)
end
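Why the pool size matters when multithreading: a thread that cannot get a connection has to wait for one to be returned. The sketch below models this with a hypothetical TinyPool built on SizedQueue, handing out plain strings as fake connections; it is not ActiveRecord's pool, which gives up with an error after a checkout timeout instead of waiting indefinitely.

```ruby
# Hypothetical minimal pool: a SizedQueue holds the free "connections",
# so a thread that finds none available blocks until one is checked in.
class TinyPool
  def initialize(size)
    @available = SizedQueue.new(size)
    size.times { |n| @available << "conn-#{n}" }
  end

  # Check a connection out, yield it, and always check it back in.
  def with_connection
    conn = @available.pop
    yield conn
  ensure
    @available << conn if conn
  end
end

pool = TinyPool.new(2) # deliberately smaller than the number of threads
used = Queue.new
threads = 4.times.map do
  Thread.new { pool.with_connection { |conn| used << conn; sleep 0.01 } }
end
threads.each(&:join)

names = []
names << used.pop until used.empty?
```

All four threads finish, but two of them have to wait their turn; sizing the pool at least as large as the thread count avoids that wait.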
NameError: uninitialized constant
A race happens when ActiveRecord models are autoloaded inside parallel threads in environments that lazy-load, like development, test, or migrations.
To fix it, autoload the classes before the parallel block with either require '<modelname>' or ModelName.class.
Break
Parallel.map([1, 2, 3]) do |i|
  raise Parallel::Break # -> stops after all current items are finished
end
Parallel.map([1, 2, 3]) { |i| raise Parallel::Break, i if i == 2 } == 2
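A thread-based sketch of the Break semantics described above: once an item raises, workers stop taking new items, items already running finish, and the value carried by the exception (if any) replaces the result array. BreakWithValue and map_with_break are hypothetical names; the gem's own behavior may differ in details.

```ruby
# Hypothetical stand-in for Parallel::Break that can carry a value.
class BreakWithValue < StandardError
  attr_reader :value

  def initialize(value = nil)
    @value = value
    super("break")
  end
end

def map_with_break(items, workers: 2, &block)
  results = Array.new(items.size)
  jobs = Queue.new
  items.each_with_index { |item, i| jobs << [item, i] }
  jobs.close # pop returns nil once all jobs have been taken
  stop = nil
  lock = Mutex.new

  threads = Array.new(workers) do
    Thread.new do
      while (job = jobs.pop)
        break if lock.synchronize { stop } # take no new work after a break
        item, i = job
        begin
          results[i] = block.call(item)
        rescue BreakWithValue => e
          lock.synchronize { stop ||= e } # the first break wins
        end
      end
    end
  end
  threads.each(&:join)
  stop ? stop.value : results
end

doubled = map_with_break([1, 2, 3]) { |i| i * 2 }
broken = map_with_break([1, 2, 3]) { |i| i == 2 ? raise(BreakWithValue.new(i)) : i }
```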
Kill
Only use this if whatever is executing in the sub-command is safe to kill at any point.
Parallel.map([1,2,3]) do |x|
  raise Parallel::Kill if x == 1 # -> stop all sub-processes, killing them instantly
  sleep 100 # Do stuff
end
Progress / ETA
# gem install ruby-progressbar
Parallel.map(1..50, progress: "Doing stuff") { sleep 1 }
# Doing stuff | ETA: 00:00:02 | ==================== | Time: 00:00:10
Use the :finish or :start hook to get progress information.
- :start has item and index
- :finish has item, index, and result
They are called on the main process and protected with a mutex.
(To just get the index, use the more performant Parallel.each_with_index.)
Parallel.map(1..100, finish: -> (item, i, result) { ... do something ... }) { sleep 1 }
Set finish_in_order: true to call the :finish hook in input order (the first output will take longer to appear).
Parallel.map(1..9, finish: -> (item, i, result) { puts "#{item} ok" }, finish_in_order: true) { sleep rand }
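The idea behind finish_in_order can be sketched as a reordering buffer: results that complete early wait until every earlier index has been reported, which is also why the first output can be delayed. InOrderEmitter is a hypothetical class, and the completions are simulated by hand rather than produced by real workers.

```ruby
# Reports results in input order even when they complete out of order.
class InOrderEmitter
  def initialize(&on_finish)
    @on_finish = on_finish
    @pending = {}   # index => [item, result] for results not yet reported
    @next_index = 0 # lowest index that has not been reported yet
    @lock = Mutex.new
  end

  def completed(index, item, result)
    @lock.synchronize do
      @pending[index] = [item, result]
      # flush every result whose predecessors have all been reported
      while @pending.key?(@next_index)
        done_item, done_result = @pending.delete(@next_index)
        @on_finish.call(done_item, @next_index, done_result)
        @next_index += 1
      end
    end
  end
end

log = []
emitter = InOrderEmitter.new { |item, i, result| log << [item, i, result] }
# Simulated completions arriving out of order: index 2, then 0, then 1.
emitter.completed(2, "c", "C")
emitter.completed(0, "a", "A")
emitter.completed(1, "b", "B")
```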
Worker number
Use Parallel.worker_number to determine the worker slot in which your task is running.
Parallel.each(1..5, in_processes: 2) { |i| puts "Item: #{i}, Worker: #{Parallel.worker_number}" }
Item: 1, Worker: 1
Item: 2, Worker: 0
Item: 3, Worker: 1
Item: 4, Worker: 0
Item: 5, Worker: 1
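One way a worker number can be exposed is a thread-local variable set once per worker slot. In this sketch each_with_slot and the :worker_slot key are hypothetical; Parallel.worker_number may be implemented differently, and the gem's process workers are not modeled.

```ruby
def each_with_slot(items, workers: 2, &block)
  jobs = Queue.new
  items.each { |item| jobs << item }
  jobs.close
  threads = Array.new(workers) do |slot|
    Thread.new do
      Thread.current[:worker_slot] = slot # visible to code running on this thread
      # stops at the first nil/false item, which is fine for this example
      while (item = jobs.pop)
        block.call(item)
      end
    end
  end
  threads.each(&:join)
end

reports = Queue.new
each_with_slot(1..5) { |i| reports << [i, Thread.current[:worker_slot]] }
pairs = []
pairs << reports.pop until reports.empty?
```

Which item lands on which slot depends on scheduling, just like in the output above.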
Dynamically generating jobs
Example: wait for work to arrive or sleep
queue = []
Thread.new { loop { queue << rand(100); sleep 2 } } # job producer
Parallel.map(Proc.new { queue.pop }, in_processes: 3) { |f| f ? puts("#{f} received") : sleep(1) }
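The example above loops forever and polls with sleep. With plain threads, Thread::Queue gives a terminating variant: pop blocks until work arrives, and close tells the consumers that no more work is coming. This is a stdlib sketch, not a Parallel API.

```ruby
jobs = Queue.new
producer = Thread.new do
  5.times { |n| jobs << n; sleep 0.01 } # jobs trickle in over time
  jobs.close                            # no more work is coming
end

received = Queue.new
consumers = Array.new(3) do
  Thread.new do
    # pop blocks while the queue is empty and returns nil once it is closed and drained
    while (job = jobs.pop)
      received << job
    end
  end
end
[producer, *consumers].each(&:join)

got = []
got << received.pop until received.empty?
```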
Tips
- [Benchmark/Test] Disable threading/forking with in_threads: 0 or in_processes: 0 to run the same code with different setups
- [Isolation] Do not reuse previous worker processes: isolation: true
- [Stop all processes with an alternate interrupt signal] 'INT' (from Ctrl+C) is caught by default. Catch 'TERM' (from kill) with interrupt_signal: 'TERM'
- [Process count via ENV] PARALLEL_PROCESSOR_COUNT=16 will use 16 instead of the number of processors detected. This is useful for reconfiguring a tool that uses parallel without inserting custom logic.
- [Process count] By default, parallel uses the number of processors seen by the OS as the process count. To use a value that takes CPU quota into account, add concurrent-ruby to your Gemfile.
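The ENV override described above amounts to an override-with-fallback lookup. A sketch, where worker_count is a hypothetical helper and the CPU-quota handling via concurrent-ruby is not modeled:

```ruby
require "etc"

# Prefer an explicit override from the environment; otherwise fall back
# to the processor count reported by the OS.
def worker_count(env = ENV)
  override = env["PARALLEL_PROCESSOR_COUNT"]
  override ? Integer(override) : Etc.nprocessors # Integer() rejects non-numeric values
end
```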
TODO
- Replace signal trapping with a simple rescue Interrupt handler
Authors
- Przemyslaw Wroblewski
- TJ Holowaychuk
- Masatomo Nakano
- Fred Wu
- mikezter
- Jeremy Durham
- Nick Gauthier
- Andrew Bowerman
- Byron Bowerman
- Mikko Kokkonen
- brian p o'rourke
- Norio Sato
- Neal Stewart
- Jurriaan Pruis
- Rob Worley
- Tasveer Singh
- Joachim
- yaoguai
- Bartosz Dziewoński
- yaoguai
- Guillaume Hain
- Adam Wróbel
- Matthew Brennan
- Brendan Dougherty
- Daniel Finnie
- Philip M. White
- Arlan Jaska
- Sean Walbran
- Nathan Broadbent
- Yuki Inoue
- Takumasa Ochi
- Shai Coleman
- Earlopain
Michael Grosser
michael@grosser.it
License: MIT