SimpleQueues
In the Gang of Four books, one of the first few lines is “Program to an interface, not an implementation.” When you need a queue, the only operations you need are enqueue and dequeue. It doesn’t matter that Redis (a nice and simple queue server when you need it) has a ton of extra features which we aren’t going to use.
This library was written and spec’d on Ruby 1.9.2. It is also in use, in production, on JRuby in 1.8 mode.
Exceptions
All underlying exceptions the Redis gem raises are let through. This means you’ll see Errno::ECONNREFUSED
, Errno::EAGAIN
and friends. Of course, you may also receive ArgumentError if you do something bad.
Usage
require "simple_queues"
# Sane defaults:
# * Defaults to a Redis instance at 127.0.0.1:6379, database 0
# * Defaults to the JSON encoder
Queues = SimpleQueues::Redis.new
Queues.enqueue :pages_to_crawl, :url => "http://blog.teksol.info/"
Queues.enqueue :pages_to_crawl, :url => "http://techcrunch.com/"
# In another process
Queues = SimpleQueues::Redis.new(Redis.new("192.168.1.9", 9281), :encoder => :json)
loop do
# This blocks until a message is dequeued.
message = Queues.dequeue_blocking :pages_to_crawl
process(message)
end
# Alternatively, using a timeout
loop do
message = Queues.dequeue_with_timeout :pages_to_crawl, 5 # seconds
if message then
process(message)
else
# Timed out
break
end
end
<code>
Multiple Queues
Sometimes, you want to dequeue from multiple queues simultaneously, and react appropriately. When that happens, you need to use #on_dequeue
:
require "simple_queues"
Queues = SimpleQueues::Redis.new
$running = true
Queues.on_dequeue :crawler_controls do |message|
$running = false if message["command"] == "quit"
end
Queues.on_dequeue :pages_to_crawl do |message|
# Handle crawling a page
end
while $running do
Queues.dequeue_with_timeout 5 #seconds
end
Alternatively, the block you provide to #on_dequeue can accept two parameters and will be provided with the queue name:
require "simple_queues"
# Provides #underscore, #classify, #constantize and friends
require "active_support/inflector"
# Provides #seconds, #minutes and friends
require "active_support/core_ext/numeric/time"
Queues = SimpleQueues::Redis.new
class BaseWorker
def initialize(message)
@message = message
end
end
class CrawlerControlWorker < BaseWorker
def run
$running = false
end
end
class PagesToCrawlWorker < BaseWorker
def run
# Crawl, do your own stuff here
end
end
def handler(queue_name, message)
klass = (queue_name.to_s << "_worker").classify.constantize
klass.new(message).run
end
Queues.on_dequeue :crawler_controls, &method(:handler)
Queues.on_dequeue :pages_to_crawl, &method(:handler)
while $running
Queues.dequeue_with_timeout(30.seconds)
end
LICENSE
(The MIT License)
Copyright © 2011 François Beausoleil (francois@teksol.info)
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
‘Software’), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED ‘AS IS’, WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.