Project

serfx

0.02
No commit activity in last 3 years
No release in over 3 years
Serfx is a minimal Ruby client for Serf, an event-based, distributed, fault-tolerant orchestration system.

Serfx

A bare-minimum Ruby client for Serf.

Introduction

Serfx uses Serf's RPC protocol under the hood. The protocol involves communicating with a local or remote Serf agent over TCP, using msgpack for serialization. It mandates that handshake (followed by auth, if applicable) be the first command(s) in an RPC session. Serfx exposes all the low-level RPC commands via the Connection class, as well as a convenient connect method that creates a connection object and performs the handshake, auth and connection closing for you. If you create a Serfx::Connection directly, then you have to do the handshake (and auth, if applicable) and close the connection explicitly. For example, the members command can be invoked as:

Serfx.connect do |conn|
  conn.members
end

This is equivalent to:

conn = Serfx::Connection.new
conn.handshake
conn.members
conn.close
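
The ordering that the block form takes care of can be pictured with a small stand-in. This is only a sketch: FakeConnection and with_connection are hypothetical names invented for illustration, they record calls instead of talking to a Serf agent, and they are not Serfx's actual implementation.

```ruby
# Hypothetical stand-in for an RPC connection. It records each command
# instead of sending it to an agent, so the call order is visible.
class FakeConnection
  attr_reader :log

  def initialize(authkey: nil)
    @authkey = authkey
    @log = []
  end

  def authkey?
    !@authkey.nil?
  end

  def handshake
    @log << :handshake
  end

  def auth
    @log << :auth
  end

  def members
    @log << :members
  end

  def close
    @log << :close
  end
end

# Sketch of a block-style helper: handshake first, auth only when a key
# is configured, and close the connection even if the block raises.
def with_connection(conn)
  conn.handshake
  conn.auth if conn.authkey?
  yield conn
ensure
  conn.close
end

conn = FakeConnection.new(authkey: 'secret')
with_connection(conn) { |c| c.members }
p conn.log # => [:handshake, :auth, :members, :close]
```

The ensure clause is the main design point here: the connection is closed whether or not the block finishes normally.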

Sending custom events

Serf allows creating user events. A user event must have a name and can have an optional payload. The following creates a user event named play with the payload PinkFloyd:

conn.event('play', 'PinkFloyd')

Serf agents can be configured to invoke arbitrary scripts as event handlers (http://www.serfdom.io/docs/agent/event-handlers.html). When Serf agents receive the play event, they'll invoke the corresponding handler(s) and pass the payload 'PinkFloyd' via standard input.
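
For a handler written without any helper library, a minimal sketch might look like the following. It relies on the SERF_EVENT and SERF_USER_EVENT environment variables that Serf sets for handlers; the handle_event method name and the 'now playing' message are made up for illustration.

```ruby
# Sketch of a plain event handler. Serf passes event metadata through
# environment variables and the payload on standard input.
# `handle_event` is a hypothetical helper, not part of Serfx.
def handle_event(env, input)
  # Ignore everything except the user event named 'play'.
  return nil unless env['SERF_EVENT'] == 'user'
  return nil unless env['SERF_USER_EVENT'] == 'play'

  payload = input.read.to_s.strip
  "now playing: #{payload}"
end

# When run as a script, use the real environment and standard input.
if __FILE__ == $PROGRAM_NAME
  message = handle_event(ENV, $stdin)
  puts message if message
end
```

Keeping the logic in a method that takes the environment and input stream as arguments makes the handler easy to exercise without a running agent.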

Query and response

Serf queries are special events that can be fired against a set of nodes, and the target nodes can respond to the query. Since Serf's event pipeline is asynchronous, every query is timeboxed (15s by default), and only the responses received within the timeout period are yielded. The following example fires a query event named 'foo' with the payload 'bar' and prints the responses received from any node within the default timeout (15s):

conn.query('foo', 'bar') do |response|
  p response
end
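
The timeboxing idea can be illustrated in isolation. The sketch below is not how Serfx reads responses; each_within is a hypothetical helper that yields items from an in-memory queue until a deadline passes, and the node names are invented.

```ruby
require 'timeout'

# Hypothetical helper: yield items popped from `queue` until `timeout`
# seconds have elapsed, then stop, even if more items arrive later.
def each_within(queue, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    break if remaining <= 0

    begin
      item = Timeout.timeout(remaining) { queue.pop }
    rescue Timeout::Error
      break
    end
    yield item
  end
end

responses = Queue.new
responses << 'node1'                                   # arrives in time
late = Thread.new { sleep 1; responses << 'node2' }    # arrives too late

seen = []
each_within(responses, 0.2) { |response| seen << response }
late.kill
p seen # => ["node1"]
```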

Writing custom handlers

Serf agents can be configured to invoke an executable script when a user event is received.

The Serfx::Utils::Handler module provides a set of helper methods to ease writing Ruby-based Serf event handlers. It wraps the data passed by Serf in a convenient SerfEvent object and provides an observer-like API where callbacks can be registered based on event name and type.

For example, the following script will respond to any query event named 'upcase' and return the uppercase version of the original query event's payload:

require 'serfx/utils/handler'

include Serfx::Utils::Handler

on :query, 'upcase' do |event|
  STDOUT.write(event.payload.upcase)
end

run

Assuming this event handler is configured for the upcase query event (-event-handler 'query:upcase=/path/to/handler'), it can be used as:

serf query -no-ack upcase foo
Response from 'node1': FOO
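
The observer-like registration used by on can be pictured as a lookup table keyed by event type and name. The following is a rough sketch of that idea only; TinyDispatcher and its Event struct are hypothetical and do not reflect how Serfx::Utils::Handler is implemented.

```ruby
# Hypothetical mini-dispatcher, for illustration only.
class TinyDispatcher
  Event = Struct.new(:type, :name, :payload)

  def initialize
    # Maps [type, name] to the list of registered callbacks.
    @callbacks = Hash.new { |hash, key| hash[key] = [] }
  end

  # Register a block for an event type (:query, :user, ...) and name.
  def on(type, name, &block)
    @callbacks[[type.to_s, name.to_s]] << block
    self
  end

  # Call every block registered for this event and collect the results.
  def dispatch(event)
    @callbacks[[event.type.to_s, event.name.to_s]].map { |cb| cb.call(event) }
  end
end

dispatcher = TinyDispatcher.new
dispatcher.on(:query, 'upcase') { |event| event.payload.upcase }

event = TinyDispatcher::Event.new('query', 'upcase', 'foo')
p dispatcher.dispatch(event) # => ["FOO"]
```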

Managing long running tasks via serf handlers

Serf event handler invocations are blocking calls, i.e. Serf will not process any other event while a handler invocation is in progress. Because of this, long-running tasks should not be invoked directly as Serf handlers.

AsyncJob helps build Serf handlers that involve long-running commands. It starts the command in the background, allowing the handler code to return immediately. It does a double fork: the first child process is detached (attached to init as its parent process), and the target long-running task is spawned as a second child process. This allows the first child process to wait for the long-running task and reap its output.

The first child process updates a state file before spawning the long-running task (state='invoking'), during the task's execution (state='running') and after the spawned process returns (state='finished'). This state file provides a convenient way to query the current state of an AsyncJob.
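
The mechanism can be approximated with the standard library alone. The sketch below is an assumption-laden illustration rather than AsyncJob's actual code: start_detached, write_state and read_state are hypothetical helpers, the state file is JSON in a temporary directory, and 'sleep 1' stands in for the long-running task. It requires a platform where fork is available.

```ruby
require 'json'
require 'tmpdir'

# Replace the state file in one step so readers never see a partial write.
def write_state(path, state, extra = {})
  tmp = "#{path}.tmp"
  File.write(tmp, JSON.generate({ 'state' => state }.merge(extra)))
  File.rename(tmp, path)
end

def read_state(path)
  File.exist?(path) ? JSON.parse(File.read(path)) : { 'state' => 'unknown' }
end

# Start `command` in the background and return immediately.
def start_detached(command, state_file)
  supervisor = fork do
    Process.setsid # first child: leave the caller's session
    write_state(state_file, 'invoking')
    task = spawn(command, out: File::NULL, err: File::NULL) # second child
    write_state(state_file, 'running', { 'pid' => task })
    _pid, status = Process.wait2(task) # reap the long-running task
    write_state(state_file, 'finished', { 'status' => status.exitstatus })
    exit!(0) # skip at_exit hooks inherited from the caller
  end
  Process.detach(supervisor) # the caller never has to wait on the child
  supervisor
end

state_file = File.join(Dir.mktmpdir, 'long_task.json')
start_detached('sleep 1', state_file)

# Poll the state file, giving up after ten seconds.
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 10
sleep 0.1 until read_state(state_file)['state'] == 'finished' ||
                Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
p read_state(state_file)
```

In a real handler the caller would exit right after starting the job, at which point the first child is re-parented to init and keeps supervising the task on its own.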

AsyncJob provides four methods to manage jobs. AsyncJob#start starts the task. Once started, AsyncJob#state_info can be used to check whether the job is still running or finished. Once started, a job can be in either the 'running' state or the 'finished' state. AsyncJob#reap deletes the state file once the task is finished. An AsyncJob in the running state can be killed using the AsyncJob#kill method. A new AsyncJob cannot be started until the previous AsyncJob with the same name/state file is reaped.

require 'serfx/utils/async_job'
require 'serfx/utils/handler'

include Serfx::Utils::Handler

job = Serfx::Utils::AsyncJob.new(
  name: "bash_test",
  command: "bash -c 'for i in `seq 1 300`; do echo $i; sleep 1; done'",
  state: '/opt/serf/states/long_task'
)

on :query, 'bash_test' do |event|
  case event.payload
  when 'start'
    puts job.start
  when 'kill'
    puts job.kill
  when 'reap'
    puts job.reap
  when 'check'
    puts job.state_info
  else
    puts 'failed'
  end
end

run

Assuming this handler is configured for bash_test query events (-event-handler query:bash_test=/path/to/handler), it can be used as:

serf query bash_test start
serf query bash_test check # check if job is running or finished
serf query bash_test reap # delete a finished job's state file
serf query bash_test kill

Specifying connection details

By default, Serfx will try to connect to localhost on port 7373 (the Serf agent's default RPC port). Both Serfx::Connection#new and Serfx.connect accept a hash of connection options, such as host, port and authkey, which can be used to specify non-default values.

Serfx.connect(host: 'serf1.example.com', port: 7373, authkey: 'secret')

API documentation

Detailed API documentation is available on RubyDoc: http://rubydoc.info/gems/serfx

Note

Currently, the members RPC method returns the IP address of each cluster member as a byte array. You can use the unpack method to convert it to a string.

Serfx.connect(host: 'serf1.example.com') do |conn|
  response = conn.members
  puts response.body['Members'].first['Addr'] # first member's IP in bytes
  puts response.body['Members'].first['Addr'].unpack('CCCC').join('.') # Same as string
end
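
The conversion can be tried without a running agent. In this sketch, raw_addr is a made-up four-byte value standing in for the 'Addr' field, and IPAddr.new_ntoh from Ruby's standard library is shown as an alternative to unpack; it assumes the field holds exactly four bytes (an IPv4 address).

```ruby
require 'ipaddr'

# Made-up stand-in for response.body['Members'].first['Addr']:
# four raw bytes in network order.
raw_addr = [10, 0, 0, 1].pack('C4')

# Option 1: unpack each byte as an unsigned integer and join with dots.
dotted = raw_addr.unpack('C4').join('.')

# Option 2: let the standard library interpret the network-order bytes.
via_ipaddr = IPAddr.new_ntoh(raw_addr).to_s

puts dotted     # prints 10.0.0.1
puts via_ipaddr # prints 10.0.0.1
```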

Supported ruby versions

Serfx aims to support and is tested against the following Ruby implementations:

  • Ruby 1.9.3
  • Ruby 2.0.0
  • Ruby 2.1.0

License

Copyright (c) 2014, PagerDuty

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.