ActionPool
ActionPool is a simple thread pool. It supports resizing and a handful of constraints in an unobtrusive way: you can limit how long a task is worked on, as well as how long a thread lives. For programs that use lots of threads, reusing threads instead of constantly recreating them can help.
Install:
gem install actionpool
== Documentation
{rdocs}[http://allgems.ruby-forum.com/gems/ActionPool/]
== Example
Code:
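require 'actionpool'

pool = ActionPool::Pool.new

# This task raises after two seconds; the pool passes the
# exception back to the main thread, ending its sleep below.
pool.process do
  sleep(2)
  raise 'Wakeup main thread'
end

# Twenty short tasks, spread across the pool's worker threads
20.times do
  pool.process do
    puts "Thread: #{Thread.current}"
    sleep(0.1)
  end
end

begin
  sleep
rescue Exception => e
  puts "Thread pool woke me up: #{e}"
end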
Result:
Thread: #<Thread:0x93ebeb8>
Thread: #<Thread:0x93eb92c>
Thread: #<Thread:0x93eb8a0>
Thread: #<Thread:0x93eb814>
Thread: #<Thread:0x93eb788>
Thread: #<Thread:0x93eb670>
Thread: #<Thread:0x93eb5e4>
Thread: #<Thread:0x93eb558>
Thread: #<Thread:0x93eb4cc>
Thread: #<Thread:0x93ebeb8>
Thread: #<Thread:0x93eb92c>
Thread: #<Thread:0x93eb8a0>
Thread: #<Thread:0x93eb814>
Thread: #<Thread:0x93eb788>
Thread: #<Thread:0x93eb670>
Thread: #<Thread:0x93eb5e4>
Thread: #<Thread:0x93eb558>
Thread: #<Thread:0x93eb4cc>
Thread: #<Thread:0x93eb92c>
Thread: #<Thread:0x93eb8a0>
Thread pool woke me up: Wakeup main thread
== Important note
The worker threads in the ActionPool catch any Exception that your block fails to rescue. Instead of swallowing these exceptions, they pass them back to the thread that created the pool (generally the main thread). Keep this in mind if you need to handle unexpected exceptions; it is also what wakes the example code above from its sleep.
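
The hand-off described here can be illustrated with plain Ruby threads. This is only a sketch of the idea, not ActionPool's own code: `run_and_forward` is a hypothetical helper invented for the example, and it assumes the forwarding is done with `Thread#raise`.

```ruby
# Sketch only: plain Ruby threads, not ActionPool's own code.
# run_and_forward is a hypothetical helper invented for this example.
def run_and_forward(owner = Thread.current, &task)
  Thread.new do
    begin
      task.call
    rescue Exception => e
      # Hand the failure to the thread that started us instead of dropping it
      owner.raise(e)
    end
  end
end

caught = nil
begin
  run_and_forward { raise 'Wakeup main thread' }
  sleep 5 # stands in for the main thread's real work
rescue RuntimeError => e
  caught = e.message
end

puts "Woke up by: #{caught}"
```

Because the exception can arrive at any point in the owning thread, the code that waits on the pool should sit inside a `begin`/`rescue` block, as the examples in this README do.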
== The internals
ActionPool has a few simple settings that control how it works. First, the pool has a minimum and a maximum number of threads. On initialization, the minimum number of threads (10 by default) is created and put into the pool. As tasks are added, the pool grows as needed: when there are more tasks in the pool than threads to process them, new threads are added until the maximum is reached. Taking the example above, we can demonstrate this by adjusting the limits:
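require 'actionpool'

# Start with one thread and never grow beyond three
pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 3)

# Occupies one thread for ten seconds, then wakes the main thread
pool.process do
  sleep(10)
  raise 'Wakeup main thread'
end

20.times do
  pool.process do
    puts "Thread: #{Thread.current}"
    sleep(rand) # random pause of less than one second
  end
end

begin
  sleep
rescue Exception => e
  puts "Thread pool woke me up: #{e}"
end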
Which results in:
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1080>
Thread pool woke me up: Wakeup main thread
Our pool starts with a single thread, which is immediately occupied by the sleeping task that will eventually raise an exception. As new tasks are added, the pool grows to accommodate them until it reaches the maximum of 3 threads. From then on, the pool simply works through the tasks until the task list is empty. Only two distinct threads appear in the output because the third is still busy with the sleeping task.
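
The growth rule can be sketched in a few lines of plain Ruby. `TinyPool` below is a hypothetical class written for this README, not ActionPool's implementation; it assumes a pool grows whenever unfinished tasks outnumber threads and the maximum has not been reached. A `Queue` named `gate` keeps every task busy so the pool sizes are predictable.

```ruby
# Hypothetical TinyPool: illustrates the growth rule only,
# it is not ActionPool's implementation.
class TinyPool
  def initialize(min_threads:, max_threads:)
    @max = max_threads
    @queue = Queue.new   # tasks waiting for a worker
    @lock = Mutex.new    # guards @threads and @pending
    @threads = []
    @pending = 0         # tasks submitted but not yet finished
    min_threads.times { spawn_worker }
  end

  def size
    @lock.synchronize { @threads.size }
  end

  def process(&task)
    @lock.synchronize do
      @pending += 1
      # Grow only while tasks outnumber threads and the cap allows it
      spawn_worker if @pending > @threads.size && @threads.size < @max
    end
    @queue << task
  end

  def shutdown
    workers = @lock.synchronize { @threads.dup }
    workers.size.times { @queue << :stop } # one stop marker per worker
    workers.each(&:join)
  end

  private

  def spawn_worker
    @threads << Thread.new do
      while (task = @queue.pop) != :stop
        begin
          task.call
        ensure
          @lock.synchronize { @pending -= 1 }
        end
      end
    end
  end
end

pool = TinyPool.new(min_threads: 1, max_threads: 3)
gate = Queue.new # keeps every task busy until we release it
sizes = []
5.times do
  pool.process { gate.pop }
  sizes << pool.size
end
5.times { gate << :go }
pool.shutdown
p sizes # => [1, 2, 3, 3, 3]
```

The pool size climbs from 1 to 3 as tasks pile up and then stays there, which mirrors the behaviour described above.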
The pool can also limit the amount of time a thread spends working on a given task. By default, a thread works on a task until the task completes or the pool is shut down. However, as the following example shows, it is easy to set a limit so the pool does not get bogged down by long-running tasks:
require 'actionpool'

# A single thread, and :a_to limits the time spent on any one task (1 here)
pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 1, :a_to => 1)

# Runs past the limit, so its raise is never reached
pool.process do
  puts "#{Time.now}: I'm a long running task"
  sleep(100)
  raise 'Wakeup main thread'
end

# Gets the thread once the first task has been cut off
pool.process do
  puts "#{Time.now}: Waiting for my turn"
  raise "I'm waking up the main thread"
end

begin
  sleep
rescue Exception => e
  puts "Thread pool woke me up: #{e}"
end
Results:
2009-10-10 08:47:08 -0700: I'm a long running task
2009-10-10 08:47:09 -0700: Waiting for my turn
Thread pool woke me up: I'm waking up the main thread
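
One way such a per-task limit could be built is with Ruby's standard `Timeout` module. The sketch below is an illustration under that assumption; whether ActionPool uses `Timeout` internally is not stated here, and `run_with_limit` is a hypothetical helper.

```ruby
require 'timeout'

# run_with_limit is a hypothetical helper, not part of ActionPool.
def run_with_limit(seconds)
  Timeout.timeout(seconds) { yield }
  :finished
rescue Timeout::Error
  # The task ran past its limit, so the worker gives up on it
  :abandoned
end

results = []
results << run_with_limit(0.2) { sleep 5 } # exceeds the limit
results << run_with_limit(0.2) { 1 + 1 }   # finishes in time
p results # => [:abandoned, :finished]
```

A task cut off this way never reaches the code after the point where it was interrupted, which is why the first task's `raise` in the example above never fires.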
If you have a number of tasks you would like to schedule at once, it is easy with the add_jobs method:
require 'actionpool'

pool = ActionPool::Pool.new
a = 0
lock = Mutex.new

# Twenty references to the same task; the Mutex guards the shared counter
task = lambda { lock.synchronize { a += 1 } }
tasks = Array.new(20, task)

pool.add_jobs(tasks)
pool.shutdown
puts "Result: #{a}"
Results:
Result: 20
Arguments can also be passed to tasks. Each job is given as a pair of the form [lambda, [arguments]]:
require 'actionpool'
pool = ActionPool::Pool.new
string = 'Hello world'
puts "Original: #{string}. ID: #{string.object_id}"
pool << [lambda{|var| puts "Passed: #{var}. ID: #{var.object_id}"}, [string.dup]]
pool << [lambda{|a,b| puts "Passed: #{a} | #{b}. ID: #{a.object_id} | #{b.object_id}"}, [string, string.dup]]
pool.shutdown
Results:
Original: Hello world. ID: 70651630
Passed: Hello world. ID: 70651250
Passed: Hello world | Hello world. ID: 70651630 | 70651100
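
The object IDs in the output show that arguments are handed to the task as they are: passing `string` gives the task the very same object, while `string.dup` gives it a private copy. The same behaviour can be seen with core Ruby's `Thread.new`, which also forwards its arguments to the block. This sketch uses no ActionPool code:

```ruby
original = 'Hello world'

# Passing the object itself: the thread sees the very same String
same_object = Thread.new(original) { |s| s.equal?(original) }.value

# Passing a copy: the thread can change it without touching the original
copy_result = Thread.new(original.dup) do |s|
  s << '!' # mutates only the copy
  s.equal?(original)
end.value

puts same_object # true
puts copy_result # false
puts original    # Hello world
```

Passing a copy is the safer choice when a task might modify its argument while other threads still use the original.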
== Last remarks
If you find any bugs, please report them through {github}[http://github.com/spox/actionpool/issues].
== License
ActionPool is licensed under the LGPLv3.
Copyright (c) 2012 spox <spox@modspox.com>