Project

bps-kafka

0.01
Repository is archived
Low commit activity in last 3 years
No release in over a year
https://github.com/bsm/bps
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
 Dependencies

Runtime

= 0.2.4
>= 1.1.0.beta1
 Project Readme

BPS

Build Status GoDoc Go Report Card Gem Version

Multi-backend abstraction for message processing and pub/sub queues, available for Go and Ruby.

Documentation

Check auto-generated documentation:

Install

# Go: fetch the core package, plus the package for each backend you use
# (kafka, nats and stan are listed here).
go get -u github.com/bsm/bps
go get -u github.com/bsm/bps/kafka
go get -u github.com/bsm/bps/nats
go get -u github.com/bsm/bps/stan

# Ruby: add the gem for each backend you use to your Gemfile.
bundle add 'bps-kafka'
bundle add 'bps-nats'
bundle add 'bps-stan'

Backends: Go

Backends: Ruby

Publishing: Go

package main

import (
	"context"
	"fmt"

	"github.com/bsm/bps"
)

// main demonstrates publishing with the in-memory publisher: it sends three
// messages across two topics and then prints the number of messages each
// topic reports through Messages().
func main() {
	publisher := bps.NewInMemPublisher()
	defer publisher.Close()

	ctx := context.Background()
	alpha := publisher.Topic("topic-a")
	beta := publisher.Topic("topic-b")

	// The Publish results are discarded to keep the example short; real code
	// should inspect them (presumably errors — confirm against the bps API).
	first := &bps.PubMessage{Data: []byte("message-1")}
	_ = alpha.Publish(ctx, first)

	second := &bps.PubMessage{Data: []byte("message-2")}
	_ = beta.Publish(ctx, second)

	third := &bps.PubMessage{Data: []byte("message-2")}
	_ = alpha.Publish(ctx, third)

	// The type assertions give access to Messages() on the concrete
	// in-memory topic type.
	alphaMessages := alpha.(*bps.InMemPubTopic).Messages()
	fmt.Println(len(alphaMessages))

	betaMessages := beta.(*bps.InMemPubTopic).Messages()
	fmt.Println(len(betaMessages))
}

Publishing: Ruby

require 'bps/kafka'

# Resolve a publisher from the kafka:// URL and obtain a handle for one topic.
publisher = BPS::Publisher.resolve('kafka://localhost:9092')
topic = publisher.topic('topic')

# Publish two messages to that topic.
topic.publish('foo')
topic.publish('bar')

# Close the publisher once everything has been sent.
publisher.close

To seed multiple brokers, use:

# Comma-separated hosts in a single URL; only the last host carries an explicit
# port here (presumably 9092 is applied to every host — confirm against the gem).
BPS::Publisher.resolve('kafka://10.0.0.1,10.0.0.2,10.0.0.3:9092')

If your brokers are on different ports, try:

# Each host carries its own port; the ':' between host and port is
# percent-encoded as %3A.
BPS::Publisher.resolve('kafka://10.0.0.1%3A9092,10.0.0.2%3A9093,10.0.0.3%3A9094')

Subscribing: Go

package main

import (
	"context"
	"fmt"

	"github.com/bsm/bps"
)

// main sends three messages across two in-memory topics and prints the number
// of messages each topic reports through Messages().
//
// NOTE(review): this example sits under "Subscribing: Go" but only publishes;
// it matches the "Publishing: Go" example line for line and never creates a
// subscriber. It looks like the wrong snippet was pasted here — confirm
// against the bps documentation and replace with a real subscriber example.
func main() {
	publisher := bps.NewInMemPublisher()
	defer publisher.Close()

	ctx := context.Background()
	alpha := publisher.Topic("topic-a")
	beta := publisher.Topic("topic-b")

	// The Publish results are discarded to keep the example short; real code
	// should inspect them (presumably errors — confirm against the bps API).
	first := &bps.PubMessage{Data: []byte("message-1")}
	_ = alpha.Publish(ctx, first)

	second := &bps.PubMessage{Data: []byte("message-2")}
	_ = beta.Publish(ctx, second)

	third := &bps.PubMessage{Data: []byte("message-2")}
	_ = alpha.Publish(ctx, third)

	// The type assertions give access to Messages() on the concrete
	// in-memory topic type.
	alphaMessages := alpha.(*bps.InMemPubTopic).Messages()
	fmt.Println(len(alphaMessages))

	betaMessages := beta.(*bps.InMemPubTopic).Messages()
	fmt.Println(len(betaMessages))
}