msgpack-rpc-stack
A module of implementation for MessagePack-RPC protocol stack.
Installation
Add this line to your application's Gemfile:
gem 'msgpack-rpc-stack'
And then execute:
$ bundle
Or install it yourself as:
$ gem install msgpack-rpc-stack
Example
server side
with EventMachine
#! /usr/bin/env ruby
# coding: utf-8
require 'logger'
require 'eventmachine'
require 'msgpack/rpc/server'
$logger = Logger.new(STDOUT)
$logger.datetime_format = "%Y-%m-%dT%H:%M:%S"
class RpcServer < EM::Connection
include MessagePack::Rpc::Server
# EM::Connection#receive_dataをMessagePack::Rpc::Server#receive_streamの
# aliasにすることによって自動的にデータが流し込まれるようにしている。
alias :receive_data :receive_stream
def post_init
info = Socket.unpack_sockaddr_in(get_peername())
@addr = info[1]
@port = info[0]
$logger.info("connection from #{@addr}:#{@port}")
end
def unbind
$logger.info("connection close #{@addr}:#{@port}")
end
#
# require for MessagePack::Rpc::Server
#
# send_dataメソッドはEM::Connectionで提供されるので定義なし
def on_error(e)
$logger.error(e.message)
end
#
# declar procedure
#
def bar
$logger.info("call `bar` from #{@addr}:#{@port}")
return "hello"
end
remote_public :bar
def foo(df)
$logger.info("call `foo` from #{@addr}:#{@port}")
EM.defer {
sleep 3
df.resolve("timeout")
}
end
remote_async :foo
def bye
$logger.info("receive notify `bye` from #{@addr}:#{@port}")
EM.stop
end
remote_public :bye
end
#
# main process
#
EM.run {
$logger.info("start dummy server")
EM.start_server("localhost", 9001, RpcServer)
}
use raw TCP socket
require 'logger'
require 'socket'
require 'msgpack/rpc/server'
$logger = Logger.new(STDOUT)
$logger.datetime_format = "%Y-%m-%dT%H:%M:%S"
#
# 簡略化のため、個別にスレッドを立ち上げる実装にしています。本来であれば、
# いちいちスレッドを立ち上げるのはリソースがもったいないので、ちゃんとし
# たコードを書く場合はIO.selectで多重化するようにして下さい。
#
class Server
include MessagePack::Rpc::Server
class Exit < Exception; end
class << self
def up(host, port)
$logger.info("start dummy server")
@server = TCPServer.open(host, port)
@thread = Thread.current
loop {
begin
sock = @server.accept
self.new(sock)
rescue Exit
break
end
}
end
def down
@thread.raise(Exit)
@thread.join
@server.close
$logger.info("stop dummy server")
end
end
def initialize(sock)
@sock = sock
info = @sock.peeraddr
@addr = info[2]
@port = info[1]
$logger.info("connection from #{@addr}:#{@port}")
Thread.new {
until @sock.eof?
# 受け取ったデータをMessagePack::Rpc::Server#receive_streamで
# モジュールにデータを流し込む
receive_stream(@sock.readpartial(1024))
end
$logger.info("connection close #{@addr}:#{@port}")
@sock.close
}
end
#
# require for MessagePack::Rpc::Server
#
def send_data(data)
@sock.write(data)
end
private :send_data
def on_error(e)
$logger.error(e.message)
end
private :on_error
#
# declar procedures
#
def bar
$logger.info("call `bar` from #{@addr}:#{@port}")
return "hello"
end
remote_public :bar
def foo(df)
$logger.info("call `foo` from #{@addr}:#{@port}")
Thread.new {
sleep 3
df.resolve("timeout")
}
end
remote_async :foo
def bye
$logger.info("receive notify `bye` from #{@addr}:#{@port}")
self.class.down
end
remote_public :bye
end
#
# main process
#
Server.up("localhost", 9001)
client side
#! /usr/bin/env ruby
# coding: utf-8
require 'socket'
require 'msgpack/rpc/client'
class SampleClient
include MessagePack::Rpc::Client
class Exit < Exception; end
class Error < Exception
def initialize(data)
@data = data
end
attr_reader :data
end
class << self
def open(host, port)
ret = self.allocate
ret.instance_eval {
@sock = TCPSocket.open(host, port)
@sock.sync
@thread = Thread.fork {
begin
loop {
# 受け取ったデータをMessagePack::Rpc::Client#receive_streamで
# モジュールにデータを流し込む
receive_stream(@sock.readpartial(1024))
}
rescue Exit
end
}
}
return ret
end
end
def close
@thread.raise(Exit)
@thread.join
@sock.close
end
#
# require for MessagePack::Rpc::Client
#
def send_data(data)
@sock.write(data)
end
private :send_data
end
#
# main process
#
port = SampleClient.open("localhost", 9001)
que = Queue.new
port.call(:foo) { |resp, error|
que << [:foo, resp, error]
}
port.call(:bar) { |resp, error|
que << [:bar, resp, error]
}
port.call(:baz) { |resp, error|
que << [:baz, resp, error]
}
p que.deq
p que.deq
p que.deq
port.notify(:bye)
port.close