0.0
No commit activity in last 3 years
No release in over 3 years
Abstracts common patterns used with AMQP
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
 Dependencies

Development

Runtime

< 6.0, > 4.0
~> 1.5
~> 2.1
~> 3.0
 Project Readme

mercury

Mercury is a messaging layer intended to hide complexity for typical messaging scenarios. It is backed by the AMQP gem and consequently runs in an EventMachine reactor and has an asynchronous API. Mercury consists of sources, work queues, and listeners. A message is published to a source, to which one or more work queues and/or listeners are attached. These map roughly to AMQP constructs:

  • source: topic exchange
  • work queue: durable named queue
  • listener: temporary anonymous queue
  • tag: routing key

At the moment, mercury is backed by AMQP and serializes messages as JSON. In the future, additional transports and message formats may be supported.

Mercury writes string messages directly without encoding; this allows a client to pre-encode a message using an arbitrary encoding. The receiving client receives the encoded bytes as the message content (assuming the encoded message fails to parse as JSON).

require 'mercury'

def run
  EventMachine.run do
    Mercury.open do |m|
      m.start_worker('cooks', 'orders', method(:handle_message)) do
        m.publish('orders', {'table' => 5, 'items' => ['salad', 'steak', 'cake']})
      end
    end
  end
end

def handle_message(msg)
  order = msg.content
  cook(order)
  msg.ack
end

Notably, mercury also has a monadic interface that hides the explicit continuation passing introduced by asynchrony, which has the effect of flattening chained calls. This is particularly useful for testing, where the same code plays both sides of a conversation. Compare:

require 'mercury'

Mercury.open do |m|
  m.start_listener(source, proc{}) do
    m.source_exists?(source) do |r1|
      expect(r1).to be true
      m.delete_source(source) do
        m.source_exists?(source) do |r2|
          expect(r2).to be false
          m.close do
            done
          end
        end
      end
    end
  end
end

# ... vs ...

require 'mercury/monadic'

seq do
  let(:m)  { Mercury::Monadic.open    }
  and_then { m.start_listener(source) }
  let(:r1) { m.source_exists?(source) }
  and_lift { expect(r1).to be true    }
  and_then { m.delete_source(source)  }
  let(:r2) { m.source_exists?(source) }
  and_lift { expect(r2).to be false   }
  and_then { m.close                  }
  and_lift { done                     }
end