NSQ adapter and testing objects for using the NSQ messaging system in your Ruby project.
This library is intended to facilitate publishing and consuming messages on an NSQ messaging queue.
We also include fakes to make testing easier.
This library is dependent
on the nsq-ruby-fastly
gem.
Please use GitHub Issues to report bugs.
Install
fastly_nsq
is a Ruby Gem
tested against Rails >= 4.2
and Ruby >= 2.1.8
.
To get started,
add fastly_nsq
to your Gemfile
and bundle install
.
Usage
Connections
NSQD cconnections can be discovered via nsqlookupd's or specified explicity for consumers and producers.
Using nsqlookup:
Set the ENV variable to a comma sepearated string of lookups:
ENV['NSQLOOKUPD_HTTP_ADDRESS'] = "lookup01:1234,lookup01:1234"
Or configure them directly:
FastlyNsq.configure do |config|
config.lookupd_http_addresses = ["lookup01:1234", "lookup02:1234"]
end
Using nsqd directly:
NSQD connections can be specified for consumers and producers. Being able to set different sets for consumers and producers facilitates removing and adding new instances without downtime.
Set the following ENV variables to a comma sepearted string of nsqds:
ENV['NSQD_CONSUMERS']="nsqd01:4150,nsd02:4150"
ENV['NSQD_PRODUCERS']="nsqd01:4150,nsd02:4150"
Or configure them directly:
FastlyNsq.configure do |config|
config.consumer_nsqds = ["nsqd01:4150", "nsqd02:4150"]
config.producer_nsqds = ["nsqd01:4150", "nsqd02:4150"]
end
Connection Priority
When FastlyNsq.consumer_nsqds
or FastlyNsq.producer_nsqds
are set they
will be used instead of FastlyNsq.lookupd_http_addresses
.
TLS
Set the following ENV variables to enable TLS support:
NSQ_SSL_KEY
NSQ_SSL_CERTIFICATE
NSQ_SSL_CA_CERTIFICATE
NSQ_SSL_VERIFY_MODE (optional)
FastlyNsq::Producer
This is a class which provides an adapter to the fake and real NSQ producers. These are used to write messages onto the queue:
message_data = {
"data" => {
"key" => "value"
}
}
producer = FastlyNsq::Producer.new(
topic: topic,
)
producer.write(message_data.to_json)
The mock/real strategy used can be switched by requiring the test file and configuring the mode.
require 'fastly_nsq/testing'
FastlyNsq::Testing.enabled? #=> true
FastlyNsq::Testing.disabled? #=> false
producer = FastlyNsq::Producer.new(topic: topic)
listener = FastlyNsq::Listener.new(topic: topic, channel: channel, processor: ->(m) { puts 'got: '+ m.body })
FastlyNsq::Testing.fake! # default, messages accumulate on the listeners
producer.write '{"foo":"bar"}'
listener.messages.size #=> 1
FastlyNsq::Testing.reset! # remove all accumulated messages
listener.messages.size #=> 0
producer.write '{"foo":"bar"}'
listener.messages.size #=> 1
listener.drain
# got: {"foo"=>"bar"}
listener.messages.size #=> 0
FastlyNsq::Testing.inline! # messages are processed as they are produced
producer.write '{"foo":"bar"}'
# got: {"foo"=>"bar"}
listener.messages.size #=> 0
FastlyNsq::Testing.disable! # do it live
FastlyNsq::Testing.enable! # re-enable testing mode
FastlyNsq::Consumer
This is a class which provides an adapter to the fake and real NSQ consumers. These are used to read messages off of the queue:
consumer = FastlyNsq::Consumer.new(
topic: 'topic',
channel: 'channel'
)
consumer.size #=> 1
message = consumer.pop
message.body #=> "{ 'data': { 'key': 'value' } }"
message.finish
consumer.size #=> 0
consumer.terminate
FastlyNsq::Listener
To process the next message on the queue:
topic = 'user_created'
channel = 'my_consuming_service'
processor = MessageProcessor
FastlyNsq::Listener.new(topic: topic, channel: channel, processor: processor)
This will send messages through FastlyNsq.manager.pool
off of the queue
and send the JSON text body
to MessageProcessor.call(message)
.
Specify a topic priority by providing a number (default is 0)
topic = 'user_created'
channel = 'my_consuming_service'
processor = MessageProcessor
priority = 1 # a little higher
FastlyNsq::Listener.new(topic: topic, channel: channel, processor: processor, priority: priority)
FastlyNsq::CLI
To help facilitate running the FastlyNsq::Listener
in a blocking fashion
outside your application, a CLI
and bin script fastly_nsq
are provided.
This can be setup ahead of time by calling FastlyNsq.configure
and passing block.
# config/fastly_nsq.rb
FastlyNsq.configure do |config|
config.channel = 'fnsq'
config.logger = Logger.new
config.preprocessor = ->(_) { FastlyNsq.logger.info 'PREPROCESSESES' }
config.max_attempts = 20
config.max_req_timeout = (60 * 60 * 4 * 1_000) # 4 hours
config.max_processing_pool_threads = 10
lc.listen 'posts', ->(m) { puts "posts: #{m.body}" }
lc.listen 'blogs', ->(m) { puts "blogs: #{m.body}" }, priority: 3
end
An example of using the cli:
./bin/fastly_nsq -r config/fastly_nsq.rb -L ./test.log -P ./fastly_nsq.pid -v -d -t 4 -c 10
FastlyNsq::Messenger
Wrapper around a producer for sending messages and persisting producer objects.
FastlyNsq::Messenger.deliver(message: msg, topic: 'my_topic', originating_service: 'my service')
You can also optionally pass custom metadata.
FastlyNsq::Messenger.deliver(message: msg, topic: 'my_topic', originating_service: 'my service', meta: { test: 'test' })
This will use a FastlyNsq::Producer for the given topic or create on if it isn't
already persisted. Then it will write the passed message to the queue. If you don't set
the originating service it will use unknown
You can also set the originating service for all deliver
calls:
FastlyNsq::Messenger.originating_service = 'some awesome service'
FastlyNsq::Messenger
also spuports delivering multiple message at once and will
use the NSQ mpub
directive under the hood.
FastlyNsq::Messenger.deliver_multi(messages: array_of_msgs, topic: 'my_topic')
FastlyNsq::Messenger
can also be used to manage Producer connections
# get a producer:
producer = FastlyNsq::Messenger.producer_for(topic: 'hot_topic')
# get a hash of all persisted producers:
producers = FastlyNsq::Messenger.producers
# terminate a producer
FastlyNsq::Messenger.terminate_producer(topic: 'hot_topic')
# terminate all producers
FastlyNsq::Messenger.terminate_all_producers
FastlyNsq::Http
Wrappers around nsqd
and nsqlookupd
http api's described here:
Nsqd
Implements most of the Nsqd api.
Example usage:
FastlyNsq::Http::Nsqd.ping
FastlyNsq::Http::Nsqd.create_channel(topic: 'foo', channel: 'bar')
FastlyNsq::Http::Nsqd.stats(topic: 'foo', format: '')
TODO:
- Debug endpoints (
/debug/*
) - Config PUT (
/config/nsqlookupd_tcp_address
) - Correct Handling of
mpub
binary
mode
Nsqlookupd
Implements all of the Nsqlookupd api.
Example usage:
FastlyNsq::Http::Nsqlookupd.nodes
FastlyNsq::Http::Nsqlookupd.lookup(topic: 'foo')
Testing
FastlyNsq
provides a test mode and a helper class to make testing easier.
In order to test classes that use FastlyNsq without having real connections to NSQ:
require 'fastly_nsq/testing'
RSpec.configure do |config|
config.before(:each) do
FastlyNsq::Testing.fake!
FastlyNsq::Testing.reset!
end
end
To test processor classes you can create test messages:
test_message = FastlyNsq::Testing.message(data: { 'count' => 123 })
My::ProcessorKlass.call(test_message)
expect(some_result)
Configuration
See the documentation for additional settings
Example:
FastlyNsq.configure do |config|
config.channel = "z"
config.producer_nsqds = ["nsqd01:4150", "nsqd02:4150"]
config.lookupd_http_addresses = ["lookupd01:4161", "lookupd02:4161"]
config.logger = Logger.new(STDOUT)
config.max_attempts = 10
config.max_req_timeout = 10_000
config.max_processing_pool_threads = 42
end
Environment Variables
The URLs for the various
NSQ endpoints are expected
in ENV
variables.
Below are the required variables and sample values for using stock NSQ on OS X, installed via Homebrew:
NSQD_HTTP_ADDRESS='127.0.0.1:4151'
NSQLOOKUPD_HTTP_ADDRESS='127.0.0.1:4161, 10.1.1.101:4161'
NSQD_CONSUMERS='127.0.0.1:4150'
NSQD_PRODUCERS='127.0.0.1:4150'
See the .sample.env
file
for more detail.
Development
The fastest way to get up and running for development is to use the Docker container provided by Docker Compose:
- Clone:
git clone https://github.com/fastly/fastly_nsq.git
cd fastly_nsq
- run
bundle install
- run
docker-compose up -d
rake spec
You will still need the ENV
variables as defined above.
Contributors
- Adarsh Pandit (@adarsh)
- Thomas O'Neil (@alieander)
- Joshua Wehner (@jaw6)
- Lukas Eklund (@leklund)
- Josh Lane (@lanej)
- Hassan Shahid (@set5think)
Acknowledgements
- Documentation inspired by Steve Losh's "Teach Don't Tell" post.
- Thanks to Wistia for
nsq-ruby
.
Copyright
Copyright (c) 2016 Fastly, Inc under an MIT license.
See LICENSE.txt for details.
Metadata
- Ignore