EventMachine ZMTP1.0 protocol (ZMQ2.x protocol)
It is implementation of ZMTP 1.0 - ZMQ 2.x transport protocol using facilites provided by EventMachine. There are implementations of ZMQ socket types which try to be similar to original, but not too hard. Moreover, you may create your own behaviour. Library is tested against native ZMQ using ffi-rzmq.
Installation
Add this line to your application's Gemfile:
gem 'em-zmq-tp10'
And then execute:
$ bundle
Or install it yourself as:
$ gem install em-zmq-tp10
Usage
Library provides callback oriented classes which tries to emulate behaviour of standard ZMQ classes, but already integrated with EventMachine eventloop.
Main difference in behaviour is in highwatermark handling and balancing:
For DEALER and REQ zmq provides roundrobin load balancing until HighWaterMark reached, then send operation blocks or returns error EAGAIN
.
This implementation do roundrobin until fixed internal highwatermark (2048bytes) reached, and then pushes to common queue until userdefined watermark is reached.
This internal per connect buffer is handled by EventMachine itself, and there is no precise control over it, so that, if peer is disconnected, all pushed messages to this buffer are lost. So that, compared to raw ZMQ, you loose not only content of OS's internal socket buffer, but EventMachine buffer as well :(
But since ZMQ is never pretended on durability, it is not big issue (for me).
There is two strategy of highwatermark handling: drop_first
and drop_last
.
drop_last
- is ignoring any try to send message if queue is full - this is default strategy for ZMQ if you use nonblocking sending.
drop_first
- dropping earliest message in a queue, so that newly inserted message will have more chanches to be sent. You can react on such dropping by overriding cancel_message
(or cancel_request
for Req). I like this strategy more cause old request tends to be less usefull, but drop_last
is still default for "compatibility".
There is also simplified classes without internal queue (PreDealer
, PreReq
, PreRouter
, PreRep
, PrePub
), so that you can implement your own strategy of queueing.
And you could do any crazy thing using base EM::Protocols::Zmq2::Socket
class
Socket
Base class. It provides #connect and #bind methods for establishing endpoints. This method could be called outside EM event loop (even before EM.run called), cause they use EM.schedule. TCP and IPC endpoints are supported and fully interoperable with native ZMQ. INPROC supported as well, but you should treat them as connections inside EventMachine's context, so that you could not connect to native ZMQ inproc endpoints.
Dealer
class MyPreDealer < EM::Protocols::Zmq2::PreDealer
def receive_message(message)
puts "Message received: #{message.inspect}"
end
end
dealer = MyPreDealer.new
dealer.connect('tcp://127.0.0.1:8000')
dealer.bind('unix://dealer.sock')
EM.schedule {
if !dealer.send_message(['asdf','fdas'])
puts "Could not send message (no free peers)"
end
}
class MyDealer < EM::Protocols::Zmq2::Dealer
def receive_message(message)
puts "Message received: #{message.inspect}"
end
end
dealer = MyDealer.new(hwm: 1000, hwm_strategy: :drop_last)
dealer.connect('tcp://127.0.0.1:8000')
EM.schedule {
if !dealer.send_message(['asdf','fdas'])
puts "No free peers and outgoing queue is full"
end
}
dealer = EM::Protocols::Zmq2::DealerCb.new do |message|
puts "Receive message #{message.inspect}"
end
dealer.connect('ipc://rep')
EM.schedule {
dealer.send_message(['hello','world'])
}
Req
class MyPreReq < EM::Protocols::Zmq2::PreReq
def receive_reply(message, data, request_id)
puts "Received message #{message} and stored data #{data}"
end
end
req = MyPreReq.new
req.bind(...)
req.connect(...)
if request_id = req.send_request(['this is', 'message'], 'saved_data')
puts "Message sent"
else
puts "there is no free peer"
end
class MyReq < EM::Protocols::Zmq2::PreReq
def receive_reply(message, data, request_id)
puts "Received message #{message} and stored data #{data}"
end
end
req = MyReq.new
req.bind(...)
req.connect(...)
if request_id = req.send_request(['hi'], 'ho')
puts "Message sent"
end
req = EM::Protocols::Zmq2::ReqCb.new
req.bind('ipc://req')
timer = nil
request_id = req.send_request(['hello', 'world']) do |message|
EM.cancel_timer(timer)
puts "Message #{message}"
end
if request_id
timer = EM.add_timer(1) {
req.cancel_request(request_id)
}
end
req = EM::Protocols::Zmq2::ReqDefer.new
req.bind('ipc://req')
data = {hi: 'ho'}
deferable = req.send_request(['hello', 'world'], data) do |reply, data|
puts "Reply received #{reply} #{data}"
end
deferable.timeout 1
deferable.errback do
puts "Message canceled"
end
deferable.callback do |reply, data|
puts "Another callback #{reply} #{data}"
end
Router
Router stores peer identity in a message, as ZMQ router do.
And it sends message to a peer, which idenitity equals to first message string.
PreRouter
does no any queue caching, Router
saves message in queue per peer, controlled by highwatermark strategy.
class MyPreRouter < EM::Protocols::Zmq2::PreRouter
def receive_message(message)
puts "Received message #{message} (and it contains envelope)"
end
end
router = MyPreRouter.new
router.bind(...)
router.send_message(message)
class MyRouter < EM::Protocols::Zmq2::Router
def receive_message(message)
puts "Received message #{message}"
message[-1] = 'reply'
send_message(message)
end
end
router = MyPreRouter.new(hwm: 1000, hwm_strategy: :drop_first)
router.bind(...)
router.send_message(message)
Rep
REP differs from Router mainly in methods signature.
class EchoBangPreRep < EM::Protocols::Zmq2::PreRep
def receive_request(message, envelope)
message << "!"
if send_reply(message, envelope)
puts "reply sent successfuly"
end
end
end
rep = EchoBangPreRep.new
rep.bind('ipc://rep')
class EchoBangRep < EM::Protocols::Zmq2::Rep
def receive_request(message, envelope)
message << "!"
if send_reply(message, envelope)
puts "reply sent successfuly"
end
end
end
rep = EchoBangRep.new
rep.bind('ipc://rep')
Sub
Unless ZMQ sub, this Sub
accepts not only strings, but also RegExps and procs for subscribing.
Note that as in ZMQ 2.x filtering occurs on Sub side.
Since subscriptions could be defined with callback passed to :subscribe
option, subscribe
or subscribe_many
methods, you could use this class without overloading.
class MySub < EM::Protocols::Zmq2::Sub
def receive_message(message)
puts "default handler"
end
end
sub = MySub.new(subscribe: ['this', 'that'])
sub.subscribe /^callback/i, do |message|
puts "Callback subscribe #{message}"
end
sub.subscribe_many(
proc{|s| s.end_with?("END")} => proc{|message| puts "TILL END #{message}"},
'' => nil # also to default
)
Pub
PrePub
sends messages only to connected and not busy peers. send_message
returns true, if there is at least one peer with short EventMachine's outgoing queue, to which message is scheduled.
Pub
tries to queue messages for all connected and for disconnected peers with explicit identity set.
Since there is no incoming data, there is no need to overload methods.
pub = EM::Protocols::Zmq2::PrePub.new
pub.bind(...)
pub.send_message(['hi', 'you'])
pub = EM::Protocols::Zmq2::Pub.new
pub.bind(...)
pub.send_message(['hi', 'you'])
Contributing
- Fork it
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Added some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create new Pull Request