What a stupid name
You likely think that this library has something to do with attributes, but it does not. So yes, it probably is a stupid name. Its named after
http://en.wikipedia.org/wiki/Ampex_ATR-100
in the spirit of following the celluloid/reel analog tape metaphor naming convention. But mostly because its short to type and Im lazy.
What it does
Websockets/publishing events to connected clients, using the following as its backbone:
Reel, Celluloid&Celluloid::IO, and Redis
Why use this over other websocket libraries for ruby?
Well dont use this yet because its incomplete. But the goal is mainly:
- Make it easy to publish changes to the connected client
- Event machine is a pile of your preferred synonym for garbage, so, didnt want to use any library that uses event machine. Use CelluloidIO and Reel as a foundation, because Reel is pretty cool.
- Be able to publish events from anywhere in your application eventually, but for now just focus on making it easy to publish when resources themselves are created/updated/destroyed.
- Single thread per websocket request that comes in. That lets you scope the websockets to the lowest level in your application that you choose to. I.E.
Lets say you have a subscriber which has many users. You can scope the publishing of events to the subscriber, and each connected user has its own websocket thread open to prevent chaotic behavior Ive seen happen in many other websocket implementations.
So its strongly focused on listening for events, not triggering, maybe that will change in future maybe not IDK.
Usage
To generate events upon actions taking place within your application, include
include ::Atr::Publishable
Into your model. Example
How it works
class Post < ::ActiveRecord::Base
include ::Atr::Publishable
end
This will set up 3 publishing queues, and 3 after_action callbacks for the respective actions.
post.created
post.destroyed
post.updated
which will basically do
def publish_created_event
routing_key = self.class.build_routing_key_for_record_action(self, "created")
// post.created
event_name = self.class.resource_action_routing_key("created")
// post.created
record_created_event = ::Atr::Event.new(routing_key, event_name, self)
::Atr.publish_event(record_created_event)
end
Etc Etc for updated/destroyed.
So to walkthrough publish_created_event above
- First, we create routing key based on the name of the class, + the action. Additionally if you scope the event, this will be reflected in the routing key (i.e. you can scope it to a particular subscriber or user or whatever, so you can share state and or sync events between multiple users belonging to the same organization). (more on that later)
- we generate an event name based on name of the class + the action (scope doesent matter we just want to describe what happened)
- wrap the record in an ::Atr::Event object
- Publish the event, this will Marshal.dump the record through redis, and if there is a subscriber listening on the routing key of the event, the websocket connection (Atr::Reactor) instance, will receive that message, unmarshall it back into the original event object, and write it to the websocket.
This allows us to publish events with pretty fine grained precision, and broadcast them to specific subscribers. If you're unfamiliar with redis pub/sub, rundown is, if you are listening to the channel at the moment the message is published, the subscriber will get it, otherwise it removes it and pays no regard to the msssage being published. No durability, but thats what we want for websocket events.
How the websocket server works
The websocket server works differently than many other implementations, in that its by design a standalone process, which acts mainly as a router for websocket connections that come in. When a valid websocket request comes in, it will launch a brand new thread, close the original request and detatch the websocket, and pass it into the object which controls the websocket (::Atr::Reactor, for lack of a better name ATM). This once again is mainly about scope, and has arisen out of the past frustrations of using websocket libraries which were built on event machine which I spent countless hours debugging, issues like duplicate events.
Its also IMO the ideal way to model a socket server, 1 thread belonging to each client which connects to it. Close the thread when they disconnect. Only use resources for whats currently relevant.
Starting the socket server
bx atr_server start --server_host="127.0.0.1" --server_port="7777"
(the defaults, the above is the same as)
bx atr_server start
Connecting to socket server via JS
var ws = new WebSocket("ws://127.0.0.1:7777");
Listen for events
ws.onmessage = function(e) {
var event, parsed_event;
event = e.data;
parsed_event = JSON.parse(event);
console.log(parsed_event);
}
Full Example (including client side)
Here is a snippet of code from an inprogress sideproject, using a base angular controller (using angular-ui-router). This is enough to listen to any event in the application, and display growl notifications for all connected members of the "organization", notifying them of what action took place.
$stateProvider.state('base', {
abstract: false,
url: "",
templateUrl: '/templates/base.html',
resolve: {
current_organization: function(CurrentOrganization) {
return CurrentOrganization.get();
},
current_user: function(CurrentUser) {
return CurrentUser.get();
}
},
controller: function($scope, current_organization, current_user, $state, growl) {
$scope.current_organization = current_organization;
$scope.current_user = current_user;
$scope.websocket_params = {
organization: current_organization.id
};
$scope.websocket_base_url = "ws://127.0.0.1:7777";
$scope.websocketUrl = function() {
return [ $scope.websocket_base_url, _.flatten(_.pairs($scope.websocket_params)).join("/") ].join("/");
};
$scope.ws = new WebSocket($scope.websocketUrl());
$scope.ws.onopen = function() {
console.log('opening ws con');
$scope.ws.send(JSON.stringify({action: "do.something." + current_user.id}));
};
$scope.ws.onmessage = function(e) {
$scope.dispatchMessage(e.data);
};
$scope.ws.onclose = function() {
alert("websocket connection closed");
};
$scope.do_something = function() {
$scope.ws.send('do_something');
};
$scope.dispatchMessage = function (message) {
var event = JSON.parse(message);
$scope.$root.$broadcast(event.name, event);
};
_.each(current_organization.websocket_channels, function(channel){
$scope.$on(channel, function(e, websocket_event){
console.log(websocket_event);
growl.addInfoMessage(websocket_event.name);
$scope.$root.$digest();
});
});
}
});
Initializer
::Atr.configure do |config|
config.authenticate_with = ::WebsocketAuthenticator
config.scope_with = ::WebsocketScope
config.event_serializer = ::WebsocketEventSerializer
end
NOTE: the following are bad examples. I.e. Im not really authenticating anything im just checking that the websocket request has a valid organization id in the path, really youd want to use auth token or some way to validate the request. But it's so low level that it should be easy to do whatever you need to w/this middlewarish pattern for scoping/validating.
Websocket Authenticator
class WebsocketAuthenticator < ::Atr::RequestAuthenticator
def matches?
current_organization.present?
end
def current_organization
@current_organization ||= ::Client::Organization.find(segs[1])
end
def segs
@segs ||= request.url.split("/").reject(&:empty?)
end
end
Websocket Scope
class WebsocketScope < ::Atr::RequestScope
VALID_SCOPE_KEYS = ["organization"]
def segs
@segs ||= request.url.split("/").reject(&:empty?)
end
def routing_key
[segs[0], segs[1]].join(".")
end
def valid?
VALID_SCOPE_KEYS.include?(segs[0]) && segs.size == 2
end
end
You also have access to query string params as a hash with params method in either class.
Event Serializer
Im using ActiveModel Serializers, but any serializer that is instantiated as new, passes in the record, and is serialized via the .to_json method should work (so if you want to use decorators or a custom class or something).
class WebsocketEventSerializer < BaseSerializer
self.root = false
attributes :id, :name, :record, :routing_key, :record, :occured_at, :time_ago
def time_ago
distance_of_time_in_words(object.occured_at, ::DateTime.now)
end
def action
object.name.split(".").pop
end
end
Model, and scoping the publication of the event
class Post < ::ActiveRecord::Base
include ::Atr::Publishable
publication_scope :organization_id
end
Kind of ghetto, but works for now, basically, this will using the organization_id attribute, and prepend the key (without the _id), i.e.
organization.#{organization_id}.post.created
Whenever creating routing keys when publishing events. (only for that specific resource though, so you probably want to add that same publication scope, and define a method that gets to that scope for each of your models requiring publication).
Last but not least, for a programatic way of knowing which channels to listen to I.E., in the javascript above
_.each(current_organization.websocket_channels, function(channel){
$scope.$on(channel, function(e, websocket_event){
console.log(websocket_event);
});
});
You can get the channels via the registry
def websocket_channels
::Atr::Registry.channels
end
If any of how it works is confusing, pay attention to the following as it may clear things up:
NOTE: the redis pubsub mechanism is only concerned about the publication scope, i.e.
organization.1234.post.created
But since each connection launches a new thread, listenting to that specific channel, we can then broadcast the event itself to the websocket as
post.created
and it will be scoped appropriately to correct client, as its only actually written out to the websocket threads belonging to that organization.
(we actually just write the entire event object to the socket and the client side JS is responsible for figuring out how to route it and what not)
Scoping and authentication
Quick explanation is, it works much like Rack middleware. Configure a class and it will be passed the request object on initialize.
Class must respond to matches? which will determine whether the request is valid, and in the case of scope_with, will scope the publishing of the record, i.e.
organization.1234.post.created rather than post.created
Advanced Configuration
Todo: explain scoping and authenticating the websocket requests and provide better example.
Important To Do
Allow target redis instance to actually be configurable. Right now just running locally so connects to redis defaults.
Configuration / Initializer
::Atr.configure do |config|
config.authenticate_with = ::WebsocketAuthenticator
config.scope_with = ::WebsocketScope
config.event_serializer = ::WebsocketEventSerializer
end
FYIs / Gotchas / Notes to self
ActiveRecord opens new connection each time request comes in, Im manually closing it as we do need to have AR loaded for the purposes of reading the schema, but after that we dont need a connection at all since all the marshaling/unmarshaling the event does not require it. (As far as I can see at least). The main application w/ the publisher, does the serialization, so the server doesen't need the connections. So no used up connections per the websocket threads that are created. Winning.
If I decide to go the route of websocket rails to allow cruding beyond just listening as it stands right now, the websocket connections will need to use connection pool.
Installation
Add this line to your application's Gemfile:
gem 'atr'
And then execute:
$ bundle
Or install it yourself as:
$ gem install atr
Contributing
- Fork it ( https://github.com/[my-github-username]/atr/fork )
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Add some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create a new Pull Request