pg_conduit

No commit activity in last 3 years
No release in over 3 years
Stream data from one postgres database to another

PgConduit

Stream data between two postgres databases. This is mostly an excuse for me to play around with concurrency in Ruby.

This gem is in early development. As such, I would advise against using it in any environment where data integrity is important. I will release version 1.0 once I am confident that the code is sufficiently robust.

Installation

Add this line to your application's Gemfile:

gem 'pg_conduit'

And then execute:

$ bundle

Or install it yourself as:

$ gem install pg_conduit

Quick Start

PgConduit.db_to_db(source, destination)

Returns an instance of PgConduit::Pipe that will execute queries passed to read and write against the source and destination databases, respectively.

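The source and destination arguments are passed to PG::Connection, so anything it accepts can be used: for example, a connection URI such as 'postgres://user:pass@source/db' or a hash of parameters such as { dbname: 'my_local_db' }.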
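Judging from the examples, the read/transform/run chain is a builder: each call returns the pipe so calls can be chained, and run pushes every row through the recorded steps into the destination. The sketch below models only that shape in memory. MiniPipe, its array "source" and its array "sink" are hypothetical stand-ins, not pg_conduit's classes.

```ruby
# Hypothetical, in-memory model of the chainable pipe API used in this README.
# It is NOT pg_conduit's implementation: `rows` stands in for query results
# and `sink` for the destination (database, file, stdout or null).
class MiniPipe
  def initialize(sink)
    @sink = sink
    @steps = []
  end

  # Record the source rows and return self so calls can be chained.
  def read(rows)
    @rows = rows
    self
  end

  # Record a transformation step and return self.
  def transform(&block)
    @steps << block
    self
  end

  # Push every row through all steps, in order, into the sink.
  def run
    @rows.each do |row|
      @sink << @steps.reduce(row) { |acc, step| step.call(acc) }
    end
    @sink
  end
end

out = MiniPipe.new([])
        .read([{ 'id' => '1' }, { 'id' => '2' }])
        .transform { |row| "user #{row['id']}" }
        .run
p out # => ["user 1", "user 2"]
```

Nothing happens until run is called, which is what lets each example above build the whole pipeline before touching the destination.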

Write one row at a time

source      = 'postgres://user:pass@source/db'
destination = { dbname: 'my_local_db' }

pipe = PgConduit.db_to_db(source, destination)

pipe.read('SELECT id, full_name, email FROM users')
    .transform do |user|
      <<-SQL
        INSERT INTO customers(user_id, name, email)
        VALUES ('#{user['id']}', '#{user['full_name']}', '#{user['email']}')
      SQL
    end
    .run
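The transform block above interpolates column values straight into SQL, so a name such as O'Brien ends the string literal early and breaks the INSERT (and untrusted values could inject SQL). A minimal sketch of escaping the values first, assuming standard_conforming_strings is on (the PostgreSQL default since 9.1), in which case doubling single quotes is enough. sql_literal and insert_customer_sql are hypothetical helpers, not part of pg_conduit.

```ruby
# Hypothetical helper (not part of pg_conduit): render a Ruby value as a
# PostgreSQL string literal. Doubling single quotes is sufficient when
# standard_conforming_strings is on; nil becomes SQL NULL.
def sql_literal(value)
  return 'NULL' if value.nil?

  "'#{value.to_s.gsub("'", "''")}'"
end

# Builds the same INSERT as the transform block above, with escaped values.
def insert_customer_sql(user)
  values = %w[id full_name email].map { |col| sql_literal(user[col]) }
  "INSERT INTO customers(user_id, name, email) VALUES (#{values.join(', ')})"
end

row = { 'id' => '7', 'full_name' => "Conan O'Brien", 'email' => 'conan@example.com' }
puts insert_customer_sql(row)
# INSERT INTO customers(user_id, name, email) VALUES ('7', 'Conan O''Brien', 'conan@example.com')
```

Inside a pipe this would read .transform { |user| insert_customer_sql(user) }. If you have a PG::Connection at hand, its escape_literal method is the more robust choice, since it follows the server's actual settings.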

Write in batches

source      = 'postgres://user:pass@source/db'
destination = { dbname: 'my_local_db' }

pipe = PgConduit.db_to_db(source, destination)

pipe.read('SELECT id, full_name, email FROM users')
    .transform do |user| 
      <<-SQL
        ('#{user['id']}', '#{user['full_name']}', '#{user['email']}')
      SQL
    end
    .on_chunk(size: 100) do |values|
      <<-SQL
        INSERT INTO customers(user_id, name, email)
        VALUES #{values.join(',')}
      SQL
    end
    .run
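Batching trades one round trip per row for one multi-row INSERT per chunk. Assuming on_chunk(size: 100) hands its block arrays of at most 100 transformed values, with a smaller final chunk for the remainder (the README does not spell this out), the grouping behaves like Ruby's each_slice. batched_inserts is a hypothetical name used only for illustration.

```ruby
# Hypothetical illustration of chunked writes: group row tuples into batches
# and emit one multi-row INSERT per batch. This mimics what the on_chunk block
# above receives; it is not pg_conduit's implementation.
def batched_inserts(tuples, size:)
  tuples.each_slice(size).map do |values|
    "INSERT INTO customers(user_id, name, email) VALUES #{values.join(',')}"
  end
end

# 250 sample tuples shaped like the output of the transform block above.
tuples = (1..250).map { |i| "('#{i}', 'User #{i}', 'user#{i}@example.com')" }
statements = batched_inserts(tuples, size: 100)
puts statements.length # => 3 (100 + 100 + 50 rows)
```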

PgConduit.db_to_file(source, destination)

Write output from source database to file.

source      = 'postgres://user:pass@source/db'
destination = '/some/system/path/user_count.txt'

pipe = PgConduit.db_to_file(source, destination)

pipe.read('SELECT count(*) FROM users')
    .transform { |res| "Number of users: #{res['count']}" }
    .run
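The README does not say how each transformed value is laid out in the output file. A minimal sketch of a line-oriented file sink, assuming one transformed value per line; LineFileSink is a hypothetical class, not pg_conduit's, and a temporary file stands in for /some/system/path/user_count.txt.

```ruby
require 'tempfile'

# Hypothetical file sink: writes each transformed row as one line.
class LineFileSink
  def initialize(path)
    @path = path
  end

  def call(rows, &transform)
    File.open(@path, 'w') do |file|
      rows.each { |row| file.puts(transform.call(row)) }
    end
  end
end

# A count(*) row as the pg gem returns it by default: a hash of strings.
rows = [{ 'count' => '42' }]

output = Tempfile.create('user_count') do |tmp|
  LineFileSink.new(tmp.path).call(rows) { |res| "Number of users: #{res['count']}" }
  File.read(tmp.path)
end
p output # => "Number of users: 42\n"
```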

PgConduit.db_to_stdout(source)

Write output from source database to stdout.

pipe = PgConduit.db_to_stdout('postgres://user:pass@source/db')

query = <<-SQL
  SELECT posts.user_id, users.email, count(posts.*) FROM users
  JOIN posts ON posts.user_id = users.id
  GROUP BY posts.user_id, users.email
SQL

pipe.read(query)
    .transform do |post_count| 
      "#{post_count['user_id']} | #{post_count['email']} - #{post_count['count']}"
    end
    .run
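To see what each post_count row holds, here is a plain-Ruby equivalent of the GROUP BY query over hypothetical in-memory users and posts data. It assumes rows reach transform as hashes of column name to string value, which is the pg gem's default result format.

```ruby
# Hypothetical in-memory data standing in for the users and posts tables.
users = { 1 => 'ann@example.com', 2 => 'bob@example.com' }
posts = [{ user_id: 1 }, { user_id: 1 }, { user_id: 2 }]

# Plain-Ruby equivalent of the GROUP BY query: one row per user with posts.
# Values are strings, mirroring pg's default text-format results.
post_counts = posts.group_by { |post| post[:user_id] }.map do |user_id, user_posts|
  { 'user_id' => user_id.to_s, 'email' => users[user_id], 'count' => user_posts.size.to_s }
end

# Same formatting as the transform block above.
lines = post_counts.map do |post_count|
  "#{post_count['user_id']} | #{post_count['email']} - #{post_count['count']}"
end
puts lines
# 1 | ann@example.com - 2
# 2 | bob@example.com - 1
```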

PgConduit.db_to_null(source)

Swallow output from source database. Mostly useful for testing. exec is an alias of write.

pipe = PgConduit.db_to_null('postgres://user:pass@source/db')
# PG::Result values are strings by default, so convert before comparing
pipe.read('SELECT count(*) FROM users')
    .peak { |res| raise 'fail' unless res['count'].to_i == 10 }
    .run
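The db_to_null example uses peak to check each row without producing output. A sketch of that idea, assuming (this README does not confirm it) that peak runs its block for its side effect and passes each row through unchanged, much like Object#tap. peek_each is a hypothetical name.

```ruby
# Hypothetical model of a "peek" step: run a side effect on each row and
# pass the row through unchanged, like Object#tap. This is an assumption
# about pg_conduit's peak, not its documented behaviour.
def peek_each(rows, &check)
  rows.map { |row| row.tap(&check) }
end

rows = [{ 'count' => '10' }] # pg returns column values as strings by default
checked = peek_each(rows) { |res| raise 'fail' unless res['count'].to_i == 10 }
puts checked.length # => 1; the row passes through unchanged
```

Comparing res['count'] directly with the Integer 10 would always be false here, because the value is the String "10".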

Development

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/[USERNAME]/pg_conduit.