This is a quick gem created to manage KubeMQ with Ruby.
To install ArangoRB: gem install RBKubeMQ
To use it in your application: require rbkubemq
For examples, look the tests in "/spec/lib/spec_helper".
It requires the gems "HTTParty", "Oj" and "faye-websocket".
Used classes
- RBKubeMQ::Client: to manage a general client
- RBKubeMQ::Sender: to manage a sender
- RBKubeMQ::Streamer: to create a websocket to send data
- RBKubeMQ::Subscriber: to create a websocket to receive data
- RBKubeMQ::Utility: to parse and load easier the data
- RBKubeMQ::Error: to manage generic error
RBKubeMQ::Client
Arango::Server is used to manage a connection with KubeMQ.
client = RBKubeMQ::Client.new host: "YOUR_HOST", port: "8080", tls: false # tls is true then it will make your requests with https and wss instead of http or ws
RBKubeMQ::Sender
Sender it is used to do HTTP requests to KubeMQ. It manage event, request, query and response requests.
sender = client.sender client_id: "YOUR_CLIENT", channel: "YOUR_CHANNEL",
meta: nil, store: false, timeout: 1000, cache_key: nil, cache_ttl: nil
The possible request that you can do are the following:
sender.event("YOUR MESSAGE") # send an event
sender.request("YOUR MESSAGE") # send a request (we do not expect a body)
sender.query("YOUR MESSAGE") # send a query (we expect a body)
sender.response(received_request, message: "YOUR MESSAGE") # send a response to a request (the received request is the one received with a subscriber)
You can overwrite the default values by inserting them as attributes, like this:
sender.event("YOUR MESSAGE", client_id: "client_id")
Note that client_id, channel are mandatory values that need to be insert either at the initialization or during the request.
RBKubeMQ::Streamer
Streamer it is used to create a stream websocket that it will be used to communicate with the KubeMQ. By using an eventmachine the structure of a streamer can be similar to one of a websocket.
i = 0
EM.run do
streamer = client.streamer(client_id: "YOUR_CLIENT", channel: "YOUR_CHANNEL",
meta: nil, store: false)
streamer.on :open do |event|
p [:open]
end
streamer.on :message do |event|
p [:message, RBKubeMQ::Utility.load(event.data)]
end
streamer.on :close do |event|
p [:close]
streamer = nil; EM.stop
end
# Send a message every second
timer = EM::PeriodicTimer.new(1) do
i += 1
puts "SENDING #{i}"
streamer.send(i, meta: "Stream")
end
end
RBKubeMQ::Subscriber
Subscriber is liked the streamer but it is used only to subscribe to a client_id and a channel. You can use groups to subdivide the queue between different machines. It cannot be use to send data.
EM.run do
subscriber = client.subscriber(client_id: "YOUR_CLIENT", channel: "YOUR_CHANNEL",
group: "YOUR_GROUP")
subscriber.on :open do |event|
p [:open]
end
subscriber.on :message do |event|
p [:message, RBKubeMQ::Utility.load(event.data)]
end
subscriber.on :close do |event|
p [:close]
subscriber = nil; EM.stop
end
end
In the example the subscriber is for events. You can subscribe to "events_store", "commands", and "queries" by specifying "type: events_store" during the initialization.
RBKubeMQ::Utility
RBKubeMQ::Utility.dump(hash) # Convert hash in correct format for KubeMQ
RBKubeMQ::Utility.load(string) # Parse hash in human format from KubeMQ
RBKubeMQ::Error
RBKubeMQ::Error is used to manage generic errors inside of the gem.
Test
To test, create a file "config.yml" in the "spec" folder with inside:
host: YOUR_KYBEMQ_HOST
Then run the tests with rspec spec/lib/sender.rb
.