Project

oplogjam

0.0
No commit activity in last 3 years
No release in over 3 years
A Ruby library to convert MongoDB oplog entries into SQL
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
 Dependencies

Development

~> 2.4
~> 12.0
~> 3.6
~> 0.9

Runtime

~> 4.2
~> 3.3
~> 0.21
~> 5.0
 Project Readme

Oplogjam Build Status

Current version: 0.1.1
Supported Ruby versions: 2.0, 2.1, 2.2
Supported MongoDB versions: 2.4, 2.6, 3.0, 3.2, 3.4
Supported PostgreSQL versions: 9.5, 9.6

An experiment in writing a "safe" MongoDB oplog tailer that converts documents to PostgreSQL JSONB in Ruby.

Based on experiences running Stripe's now deprecated MoSQL project in production, this project provides a core library which stores all MongoDB documents in the same standard table schema in PostgreSQL but leaves all configuration and orchestration to the user. This means that this library can be used to power an end-to-end MoSQL replacement but does not provide all functionality itself.

At its heart, the library connects to a MongoDB replica set oplog and provides an abstraction to users so they can iterate over the operations in the oplog and transform those into equivalent PostgreSQL SQL statements.

DB = Sequel.connect('postgres:///acme')
mongo = Mongo::Client.new('mongodb://localhost')

Oplogjam::Oplog.new(mongo).operations.each do |operation|
  operation.apply('acme.widgets' => DB[:widgets], 'acme.anvils' => DB[:anvils])
end

Requirements

Why does apply take a mapping?

This library expects to replay operations on MongoDB collections on equivalent PostgreSQL tables. As the MongoDB oplog contains all operations on a replica set in a single collection, you must provide a mapping between MongoDB namespaces (e.g. a database and collection name such as foo.bar for a collection bar in the database foo) and PostgreSQL tables (represented by Sequel datasets). Any operations for namespaces not included in the mapping will be ignored.

For example, if we only want to replay operations on foo.bar to a table foo_bar in PostgreSQL, we might have a mapping like so:

DB = Sequel.connect('postgres:///oplogjam_test')
mapping = { 'foo.bar' => DB[:foo_bar] }

Then we can pass this mapping when we call apply on an operation, e.g.

oplog.operations.each do |operation|
  operation.apply(mapping)
end

In order for this to work, the PostgreSQL table foo_bar must have the following schema:

                             Table "public.foo_bar"
   Column   |            Type             |              Modifiers
------------+-----------------------------+-------------------------------------
 uuid       | uuid                        | not null default uuid_generate_v1()
 id         | jsonb                       | not null
 document   | jsonb                       | not null
 created_at | timestamp without time zone | not null
 updated_at | timestamp without time zone | not null
 deleted_at | timestamp without time zone |
Indexes:
    "foo_bar_pkey" PRIMARY KEY, btree (uuid)
    "foo_bar_id_deleted_at_key" UNIQUE CONSTRAINT, btree (id, deleted_at)
    "foo_bar_id_index" UNIQUE, btree (id) WHERE deleted_at IS NULL

We can create this ourselves or use Oplogjam::Schema to do it for us:

schema = Oplogjam::Schema.new(DB)
schema.create_table(:foo_bar)
schema.import(collection, :foo_bar) # Optionally import data from a MongoDB collection
schema.add_indexes(:foo_bar)

Why does this project exist?

Since maintenance of MoSQL by Stripe was ended, there have been several major changes that affect anyone designing a system to replay a MongoDB oplog in PostgreSQL:

  • The MongoDB driver ecosystem was overhauled and the Ruby driver API changed significantly;
  • PostgreSQL 9.5 introduced new JSONB operations such as jsonb_set for updating fields in JSONB objects;
  • PostgreSQL 9.5 also introduced INSERT ON CONFLICT for effectively "upserting" duplicate records on INSERT.

Running MoSQL in production also revealed that we didn't need its rich support for transforming MongoDB documents into typical relational schema with typed columns but instead relied entirely on its JSONB support: effectively mirroring the MongoDB document by storing it in a single JSONB column.

With that specific use case in mind, I wanted to explore whether a library to safely transform arbitrary MongoDB operations into SQL could be done in Ruby and remain somewhat idiomatic.

Why doesn't this project come with some sort of executable?

While the library is more opinionated about the data schema of PostgreSQL, it doesn't attempt to make any decisions about how you decide which collections you want to replicate and where they should be replicated to. Similarly, connecting to databases, logging, etc. are all left up to the user as something that can differ wildly.

While in future I may add an executable, for now this is up to the user to manage.

API Documentation

  • Oplogjam::Oplog
    • Oplogjam::Oplog.new(client)
    • Oplogjam::Oplog#operations([query])
  • Oplogjam::Schema
    • Oplogjam::Schema.new(db)
    • Oplogjam::Schema#create_table(name)
    • Oplogjam::Schema#add_indexes(name)
    • Oplogjam::Schema#import(collection, name, batch_size = 100)
  • Oplogjam::Operation
    • Oplogjam::Operation.from(bson)
  • Oplogjam::Noop
    • Oplogjam::Noop.from(bson)
    • Oplogjam::Noop#message
    • Oplogjam::Noop#id
    • Oplogjam::Noop#timestamp
    • Oplogjam::Noop#ts
    • Oplogjam::Noop#==(other)
    • Oplogjam::Noop#apply(mapping)
  • Oplogjam::Insert
    • Oplogjam::Insert.from(bson)
    • Oplogjam::Insert#id
    • Oplogjam::Insert#namespace
    • Oplogjam::Insert#document
    • Oplogjam::Insert#timestamp
    • Oplogjam::Insert#ts
    • Oplogjam::Insert#==(other)
    • Oplogjam::Insert#apply(mapping)
  • Oplogjam::Update
    • Oplogjam::Update.from(bson)
    • Oplogjam::Update#id
    • Oplogjam::Update#namespace
    • Oplogjam::Update#update
    • Oplogjam::Update#query
    • Oplogjam::Update#timestamp
    • Oplogjam::Update#ts
    • Oplogjam::Update#==(other)
    • Oplogjam::Update#apply(mapping)
  • Oplogjam::Delete
    • Oplogjam::Delete.from(bson)
    • Oplogjam::Delete#id
    • Oplogjam::Delete#namespace
    • Oplogjam::Delete#query
    • Oplogjam::Delete#timestamp
    • Oplogjam::Delete#ts
    • Oplogjam::Delete#==(other)
    • Oplogjam::Delete#apply(mapping)
  • Oplogjam::ApplyOps
    • Oplogjam::ApplyOps.from(bson)
    • Oplogjam::ApplyOps#id
    • Oplogjam::ApplyOps#namespace
    • Oplogjam::ApplyOps#operations
    • Oplogjam::ApplyOps#timestamp
    • Oplogjam::ApplyOps#ts
    • Oplogjam::ApplyOps#==(other)
    • Oplogjam::ApplyOps#apply(mapping)
  • Oplogjam::Command
    • Oplogjam::Command.from(bson)
    • Oplogjam::Command#id
    • Oplogjam::Command#namespace
    • Oplogjam::Command#command
    • Oplogjam::Command#timestamp
    • Oplogjam::Command#ts
    • Oplogjam::Command#==(other)
    • Oplogjam::Command#apply(mapping)

Oplogjam::Oplog

An object representing a MongoDB oplog.

Oplogjam::Oplog.new(client)

mongo = Mongo::Client.new('mongodb://localhost')
Oplogjam::Oplog.new(mongo)

Return a new Oplogjam::Oplog for the given Mongo::Client client connected to a replica set.

Oplogjam::Oplogjam#operations([query])

oplog.operations.each do |operation|
  # Do something with operation
end

oplog.operations('ts' => { '$gt' => BSON::Timestamp.new(123456, 1) })

Return an infinite Enumerator yielding Operations from the Oplog with an optional MongoDB query which will affect the results from the underlying oplog.

Oplogjam::Schema

A class to manage the PostgreSQL schema used by Oplogjam (e.g. creating tables, importing data, etc.).

Oplogjam::Schema.new(db)

DB = Sequel.connect('postgres:///oplogjam_test')
schema = Oplogjam::Schema.new(DB)

Return a new Oplogjam::Schema for the given Sequel database connection.

Oplogjam::Schema#create_table(name)

schema.create_table(:foo_bar)

Attempt to create a table for Oplogjam's use in PostgreSQL with the given name if it doesn't already exist. Note that the name may be a single String, Symbol or a Sequel qualified identifier if you're using PostgreSQL schema.

A table will be created with the following schema:

  • uuid: a UUID v1 primary key (v1 so that they are sequential);
  • id: a jsonb representation of the primary key of the MongoDB document;
  • document: a jsonb representation of the entire MongoDB document;
  • created_at: the timestamp when this row was created by Oplogjam (not by MongoDB);
  • updated_at: the timestamp when this row was last updated by Oplogjam (not by MongoDB);
  • deleted_at: the timestamp when this row was deleted by Oplogjam (not by MongoDB).

If the table already exists, the method will do nothing.

Oplogjam::Schema#add_indexes(name)

schema.add_indexes(name)

Add the following indexes and constraints to the table with the given name if they don't already exist:

  • A unique index on id and deleted_at so no two records can have the same MongoDB ID and deletion time;
  • A partial unique index on id where deleted_at is NULL so no two records can have the same ID and not be deleted.

Note that the name may be a single String, Symbol or a Sequel qualified identifier if you're using PostgreSQL schema.

If the indexes already exist on the given table, the method will do nothing.

Oplogjam::Schema#import(collection, name[, batch_size = 100])

schema.import(mongo[:bar], :foo_bar)

Import all existing documents from a given Mongo::Collection collection into the PostgreSQL table with the given name in batches of batch_size (defaults to 100). Note that the name may be a single String, Symbol or Sequel qualified identifier if you're using PostgreSQL schema.

For performance, it's better to import existing data before adding indexes to the table (hence the separate create_table and add_indexes methods).

Oplogjam::Operation

A class representing a single MongoDB oplog operation.

Oplogjam::Operation.from(bson)

Oplogjam::Operation.from(document)

Convert a BSON document representing a MongoDB oplog operation into a corresponding Ruby object:

  • Oplogjam::Noop
  • Oplogjam::Insert
  • Oplogjam::Update
  • Oplogjam::Delete
  • Oplogjam::ApplyOps
  • Oplogjam::Command

Raises a Oplogjam::InvalidOperation if the type of operation is not recognised.

Oplogjam::Noop

A class representing a MongoDB no-op.

Oplogjam::Noop.from(bson)

Oplogjam::Noop.from(document)

Convert a BSON document representing a MongoDB oplog no-op into an Oplogjam::Noop instance.

Raises a Oplogjam::InvalidNoop error if the given document is not a valid no-op.

Oplogjam::Noop#message

noop.message
#=> "initiating set"

Return the internal message of the no-op.

Oplogjam::Noop#id

noop.id
#=> -2135725856567446411

Return the internal, unique identifier for the no-op.

Oplogjam::Noop#timestamp

noop.timestamp
#=> 2017-09-09 16:11:18 +0100

Return the time of the no-op as a Time.

Oplogjam::Noop#ts

noop.ts
#=> #<BSON::Timestamp:0x007fcadfa44500 @increment=1, @seconds=1479419535>

Return the raw, underlying BSON Timestamp of the no-op.

Oplogjam::Noop#==(other)

noop == other_noop
#=> false

Compares the identifiers of two no-ops and returns true if they are equal.

Oplogjam::Noop#apply(mapping)

noop.apply('foo.bar' => DB[:bar])

Apply this no-op to a mapping of MongoDB namespaces (e.g. foo.bar) to Sequel datasets representing PostgreSQL tables. As no-ops do nothing, this performs no operation.

Oplogjam::Insert

A class representing a MongoDB insert.

Oplogjam::Insert.from(bson)

Oplogjam::Insert.from(document)

Convert a BSON document representing a MongoDB oplog insert into an Oplogjam::Insert instance.

Raises a Oplogjam::InvalidInsert error if the given document is not a valid insert.

Oplogjam::Insert#id

insert.id
#=> -2135725856567446411

Return the internal, unique identifier for the insert.

Oplogjam::Insert#namespace

insert.namespace
#=> "foo.bar"

Return the namespace the insert affects. This will be a String of the form database.collection, e.g. foo.bar.

Oplogjam::Insert#document

insert.document
#=> {"_id"=>1}

Return the BSON::Document being inserted.

Oplogjam::Insert#timestamp

insert.timestamp
#=> 2017-09-09 16:11:18 +0100

Return the time of the insert as a Time.

Oplogjam::Insert#ts

insert.ts
#=> #<BSON::Timestamp:0x007fcadfa44500 @increment=1, @seconds=1479419535>

Return the raw, underlying BSON Timestamp of the insert.

Oplogjam::Insert#==(other)

insert == other_insert
#=> false

Compares the identifiers of two inserts and returns true if they are equal.

Oplogjam::Insert#apply(mapping)

insert.apply('foo.bar' => DB[:bar])

Apply this insert to a mapping of MongoDB namespaces (e.g. foo.bar) to Sequel datasets representing PostgreSQL tables. If the namespace of the insert maps to a dataset in the mapping, insert this document into the dataset with the following values:

  • A unique UUID v1 identifier;
  • The value of the document's _id stored as a JSONB value;
  • The entire document stored as a JSONB value;
  • The current time as created_at;
  • The current time as updated_at.

Oplogjam::Update

A class representing a MongoDB update.

Oplogjam::Update.from(bson)

Oplogjam::Update.from(document)

Convert a BSON document representing a MongoDB oplog update into an Oplogjam::Update instance.

Raises a Oplogjam::InvalidUpdate error if the given document is not a valid update.

Oplogjam::Update#id

update.id
#=> -2135725856567446411

Return the internal, unique identifier for the update.

Oplogjam::Update#namespace

update.namespace
#=> "foo.bar"

Return the namespace the update affects. This will be a String of the form database.collection, e.g. foo.bar.

Oplogjam::Update#update

update.update
#=> {"$set"=>{"name"=>"Alice"}}

Return the update to be applied as a BSON::Document.

Oplogjam::Update#query

update.query
#=> {"_id"=>1}

Return the query identifying which documents should be updated as a BSON::Document.

Oplogjam::Update#timestamp

update.timestamp
#=> 2017-09-09 16:11:18 +0100

Return the time of the update as a Time.

Oplogjam::Update#ts

update.ts
#=> #<BSON::Timestamp:0x007fcadfa44500 @increment=1, @seconds=1479419535>

Return the raw, underlying BSON Timestamp of the update.

Oplogjam::Update#==(other)

update == other_update
#=> false

Compares the identifiers of two updates and returns true if they are equal.

Oplogjam::Update#apply(mapping)

update.apply('foo.bar' => DB[:bar])

Apply this update to a mapping of MongoDB namespaces (e.g. foo.bar) to Sequel datasets representing PostgreSQL tables. If the namespace of the update maps to a dataset in the mapping, perform the update by finding the relevant row based on the query and transforming the MongoDB update into an equivalent PostgreSQL update.

This will also update the updated_at column of the target document to the current time.

Oplogjam::Delete

A class representing a MongoDB deletion.

Oplogjam::Delete.from(bson)

Oplogjam::Delete.from(document)

Convert a BSON document representing a MongoDB oplog delete into an Oplogjam::Delete instance.

Raises a Oplogjam::InvalidDelete error if the given document is not a valid delete.

Oplogjam::Delete#id

delete.id
#=> -2135725856567446411

Return the internal, unique identifier for the delete.

Oplogjam::Delete#namespace

delete.namespace
#=> "foo.bar"

Return the namespace the delete affects. This will be a String of the form database.collection, e.g. foo.bar.

Oplogjam::Delete#query

delete.query
#=> {"_id"=>1}

Return the query identifying which documents should be deleted as a BSON::Document.

Oplogjam::Delete#timestamp

delete.timestamp
#=> 2017-09-09 16:11:18 +0100

Return the time of the delete as a Time.

Oplogjam::Delete#ts

delete.ts
#=> #<BSON::Timestamp:0x007fcadfa44500 @increment=1, @seconds=1479419535>

Return the raw, underlying BSON Timestamp of the delete.

Oplogjam::Delete#==(other)

delete == other_delete
#=> false

Compares the identifiers of two deletes and returns true if they are equal.

Oplogjam::Delete#apply(mapping)

delete.apply('foo.bar' => DB[:bar])

Apply this delete to a mapping of MongoDB namespaces (e.g. foo.bar) to Sequel datasets representing PostgreSQL tables. If the namespace of the delete maps to a dataset in the mapping, perform a soft deletion by finding the relevant row based on the query and setting deleted_at to the current time.

This will also update the updated_at column of the target document to the current time.

Oplogjam::ApplyOps

A class representing a series of MongoDB operations in a single operation.

Oplogjam::ApplyOps.from(bson)

Oplogjam::ApplyOps.from(document)

Convert a BSON document representing a MongoDB oplog apply ops into an Oplogjam::ApplyOps instance.

Raises a Oplogjam::InvalidApplyOps error if the given document is not a valid apply ops.

Oplogjam::ApplyOps#id

apply_ops.id
#=> -2135725856567446411

Return the internal, unique identifier for the apply ops.

Oplogjam::ApplyOps#namespace

apply_ops.namespace
#=> "foo.bar"

Return the namespace the apply ops affects. This will be a String of the form database.collection, e.g. foo.bar.

Oplogjam::ApplyOps#operations

apply_ops.operations
#=> [#<Oplogjam::Insert ...>]

Return the operations within the apply ops as Oplogjam operations of the appropriate type.

Oplogjam::ApplyOps#timestamp

apply_ops.timestamp
#=> 2017-09-09 16:11:18 +0100

Return the time of the apply ops as a Time.

Oplogjam::ApplyOps#ts

apply_ops.ts
#=> #<BSON::Timestamp:0x007fcadfa44500 @increment=1, @seconds=1479419535>

Return the raw, underlying BSON Timestamp of the apply ops.

Oplogjam::ApplyOps#==(other)

apply_ops == other_apply_ops
#=> false

Compares the identifiers of two apply ops and returns true if they are equal.

Oplogjam::ApplyOps#apply(mapping)

apply_ops.apply('foo.bar' => DB[:bar])

Apply all of the operations inside this apply ops to a mapping of MongoDB namespaces (e.g. foo.bar) to Sequel datasets representing PostgreSQL tables. If the namespace of the operations map to a dataset in the mapping, apply them as described in each operation type's apply method.

Oplogjam::Command

A class representing a MongoDB command.

Oplogjam::Command.from(bson)

Oplogjam::Command.from(document)

Convert a BSON document representing a MongoDB oplog command into an Oplogjam::Command instance.

Raises a Oplogjam::InvalidCommand error if the given document is not a valid command.

Oplogjam::Command#id

command.id
#=> -2135725856567446411

Return the internal, unique identifier for the command.

Oplogjam::Command#namespace

command.namespace
#=> "foo.bar"

Return the namespace the command affects. This will be a String of the form database.collection, e.g. foo.bar.

Oplogjam::Command#command

command.command
#=> {"create"=>"bar"}

Return the contents of the command as a BSON::Document.

Oplogjam::Command#timestamp

command.timestamp
#=> 2017-09-09 16:11:18 +0100

Return the time of the command as a Time.

Oplogjam::Command#ts

command.ts
#=> #<BSON::Timestamp:0x007fcadfa44500 @increment=1, @seconds=1479419535>

Return the raw, underlying BSON Timestamp of the command.

Oplogjam::Command#==(other)

command == other_command
#=> false

Compares the identifiers of two commands and returns true if they are equal.

Oplogjam::Command#apply(mapping)

command.apply('foo.bar' => DB[:bar])

Apply this command to a mapping of MongoDB namespaces (e.g. foo.bar) to Sequel datasets representing PostgreSQL tables. As commands have no equivalent in PostgreSQL, this performs no operation.

Acknowledgements

License

Copyright © 2017 Paul Mucur.

Distributed under the MIT License.