Aggregates

A ruby gem for writing CQRS applications with pluggable components.

Warning: This Gem is in active development and probably doesn't work correctly. Tests are really light.

Table of Contents

  • Features
  • Requirements
  • Setup
  • Usage
    • Defining AggregateRoots
    • Creating Commands
    • Creating Events
    • Processing Commands
    • Value Objects
    • Filtering Commands
    • Processing Events
    • Building The Domain
    • Executing Your Domain
      • Storage Backends
      • Executing Commands
      • Auditing Aggregates
  • Development
  • Tests
  • Versioning
  • Code of Conduct
  • Contributions
  • License
  • History
  • Credits

Features

  • Pluggable Event / Command Storage Backends
  • Tools for Command Validation, Filtering, and Execution.
  • Opinionated structure for CQRS, Domain-Driven Design, and Event Sourcing.

Requirements

  1. Ruby 3.0+

Setup

To install, run:

gem install aggregates

Or add the following to your Gemfile:

gem "aggregates"

Usage

Defining AggregateRoots

An AggregateRoot is a grouping of domain object(s) that work to encapsulate a single part of your Domain or Business Logic. The general design of aggregate roots should be as follows:

  • Create functions that encapsulate different operations on your Aggregate Roots. These functions should enforce business logic constraints and then capture state changes by creating events.
  • Create event handlers that actually perform the state changes captured by those events.

A simple example is below:

class Post < Aggregates::AggregateRoot
  # Write functions that encapsulate business logic.
  def publish(command)
    apply PostPublished, body: command.body, category: command.category
  end

  # Modify the state of the aggregate from the emitted events.
  on PostPublished do |event|
    @body = event.body
    @category = event.category
  end
end

Note: the message-handling DSL (on) also accepts a superclass of any given event. Every on block that applies to the event is called in order, from most specific to least specific.
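To make the ordering rule concrete, here is a minimal plain-Ruby sketch of how superclass-aware dispatch could work. It does not use the gem, and every name in it (SketchHandlers, SketchPost, and the Sketch* event classes) is hypothetical; the gem's actual implementation may differ.

```ruby
# Sketch only: hypothetical names, not taken from the aggregates gem.
module SketchHandlers
  def handlers
    @handlers ||= Hash.new { |hash, key| hash[key] = [] }
  end

  # Register a block for a message class (or one of its superclasses).
  def on(message_class, &block)
    handlers[message_class] << block
  end

  # Blocks that apply to the message, most specific class first.
  def handlers_for(message)
    message.class.ancestors.flat_map { |klass| handlers.fetch(klass, []) }
  end
end

class SketchEvent; end
class SketchPostEvent < SketchEvent; end
class SketchPostPublished < SketchPostEvent; end

class SketchPost
  extend SketchHandlers
  attr_reader :log

  def initialize
    @log = []
  end

  on(SketchEvent) { |_event| @log << :any_event }
  on(SketchPostPublished) { |_event| @log << :post_published }

  # Run every matching handler in the context of this instance.
  def apply(event)
    self.class.handlers_for(event).each { |handler| instance_exec(event, &handler) }
    self
  end
end

post = SketchPost.new.apply(SketchPostPublished.new)
post.log # the SketchPostPublished handler ran before the SketchEvent handler
```

Walking the class's ancestors list gives the "most specific to least specific" order for free, which is why the sketch does not sort handlers explicitly.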

Creating Commands

Commands are a type of domain message that defines the shape and contract of the data needed to perform an action. Essentially, they provide the API for interacting with your domain. Commands should have descriptive names that capture the change they are intended to make, for instance ChangeUserEmail or AddComment.

class PublishPost < Aggregates::Command
  interacts_with Post

  attribute :body
  attribute :category
  validates_length_of :body, minimum: 10
end

You can declare a command's fields with attribute and use ActiveModel::Validations to enforce data constraints.

Creating Events

An Event describes something that happened. Events are named in the past tense. For instance, if the user's email has changed, then you might create an event type called UserEmailChanged.

class PostPublished < Aggregates::Event
  attribute :body
  attribute :category
end

Processing Commands

The goal of a CommandProcessor is to route commands that have passed validation and filtering to their respective aggregates and invoke business logic on them. This is accomplished with the same message-handling DSL used in AggregateRoots, this time for commands.

class PostCommandProcessor < Aggregates::CommandProcessor
  # Instead of `process`, you may use `on`
  process PublishPost do |command, post|
    post.publish(command)
  end
end

Note: the message-handling DSL (process) also accepts a superclass of any given command. Every process block that applies to the command is called in order, from most specific to least specific.

Value Objects

You will often find data clumps: pieces of data that belong together and share the same rules and schema. Typically these values represent a valid type in your domain and should be combined into a single value. That is where ValueObject comes in. The API is the same as for commands and events.

class Name < Aggregates::ValueObject
  attribute :first_name
  attribute :last_name
  validates_presence_of :first_name, :last_name
end

When you have a command, validation logic will automatically include validating nested value objects to an arbitrary depth.
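The recursive idea behind nested validation can be sketched in plain Ruby as follows. This is a hand-rolled stand-in, not the gem's mechanism (which builds on ActiveModel::Validations); SketchValue, SketchName, SketchRegisterUser, and locally_valid? are all hypothetical names.

```ruby
# Sketch only: hypothetical names; the gem relies on ActiveModel::Validations.
class SketchValue
  def self.attribute(name)
    attribute_names << name
    attr_accessor name
  end

  def self.attribute_names
    @attribute_names ||= []
  end

  def initialize(**attrs)
    attrs.each { |name, value| public_send("#{name}=", value) }
  end

  # Subclasses override this with their own rules.
  def locally_valid?
    true
  end

  # Valid only if this object and every nested SketchValue are valid.
  def valid?
    return false unless locally_valid?

    self.class.attribute_names.all? do |name|
      value = public_send(name)
      value.is_a?(SketchValue) ? value.valid? : true
    end
  end
end

class SketchName < SketchValue
  attribute :first_name
  attribute :last_name

  def locally_valid?
    [first_name, last_name].none? { |part| part.nil? || part.empty? }
  end
end

class SketchRegisterUser < SketchValue
  attribute :name
end

good = SketchRegisterUser.new(name: SketchName.new(first_name: "Ada", last_name: "Lovelace"))
bad = SketchRegisterUser.new(name: SketchName.new(first_name: "Ada", last_name: ""))
good.valid? # nested name passes its own rules
bad.valid?  # nested name fails, so the outer object fails too
```

Because valid? calls itself on each nested value, the check reaches any depth without extra code.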

Filtering Commands

There are times when commands should not be executed by the domain logic. You can opt to include a condition in your command processor or aggregate. However, that does not scale well if the same logic is repeated across many commands. Additionally, it violates the single responsibility principle.

Instead, it is best to support this kind of filtering logic using CommandFilters. A CommandFilter uses the same message-handling DSL as the rest of the Aggregates gem. This time, each block needs to return true or false to tell the gem whether the command should be allowed. You can define many command filters, and each one can provide many filter (or on) blocks. If any one of the filters rejects the command, then the command will not be processed.

class UpdatePostCommand < Aggregates::Command
  interacts_with Post
  attribute :commanding_user_id
end

class UpdatePostBody < UpdatePostCommand
  attribute :body
end

class PostCommandFilter < Aggregates::CommandFilter
  # Instead of `filter`, you may use `on`
  filter UpdatePostCommand do |command, post|
    post.owner_id == command.commanding_user_id
  end
end

In this example, we are using a superclass of UpdatePostBody. As with all MessageProcessors, a filter registered for a superclass is called when any child class is being processed. In other words, the filter UpdatePostCommand block will be called when you call execute_command with an instance of UpdatePostBody.
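The "every applicable filter must allow the command" rule can be sketched in plain Ruby like this. The sketch does not use the gem; FILTERS, allowed?, SketchPostState, and the Sketch* command classes are hypothetical names chosen for illustration.

```ruby
# Sketch only: hypothetical names, not the gem's internals.
class SketchUpdatePost
  attr_reader :commanding_user_id

  def initialize(commanding_user_id:)
    @commanding_user_id = commanding_user_id
  end
end

class SketchUpdatePostBody < SketchUpdatePost; end

SketchPostState = Struct.new(:owner_id)

# Each entry pairs a command class with a block returning true or false.
FILTERS = [
  [SketchUpdatePost, ->(command, post) { post.owner_id == command.commanding_user_id }],
  [SketchUpdatePostBody, ->(_command, post) { !post.owner_id.nil? }]
].freeze

# A command is allowed only when every applicable filter returns true.
# is_a? makes filters registered for a superclass apply to child classes.
def allowed?(command, post)
  FILTERS
    .select { |command_class, _filter| command.is_a?(command_class) }
    .all? { |_command_class, filter| filter.call(command, post) }
end

post = SketchPostState.new(42)
allowed?(SketchUpdatePostBody.new(commanding_user_id: 42), post) # owner matches
allowed?(SketchUpdatePostBody.new(commanding_user_id: 7), post)  # owner differs
```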

Processing Events

Event processors are responsible for responding to events and effecting changes on things that are not the aggregates themselves. Here is where the read side of your CQRS model can take place. Since Aggregates does not enforce a storage solution for any component of the application, you will likely want to provide a helper mechanism for updating projections of aggregates into your read model.

Additionally, the EventProcessor type can be used to perform side effects in other systems. Examples include sending an email to welcome a user or publishing the event to a webhook for a subscribing microservice.

class RssUpdateProcessor < Aggregates::EventProcessor
  def update_feed_for_new_post(event)
    # ...
  end

  on PostPublished do |event|
    update_feed_for_new_post(event)
  end
end
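As one possible shape for the "helper mechanism" mentioned above, the following plain-Ruby sketch keeps an in-memory read model up to date from published events. It does not use the gem; SketchPostProjection, SketchPostPublishedEvent, and the handle method are hypothetical, and a real projection would more likely write to a database or search index.

```ruby
# Sketch only: an in-memory read model fed by events. Names are hypothetical.
SketchPostPublishedEvent = Struct.new(:aggregate_id, :body, :category, keyword_init: true)

class SketchPostProjection
  attr_reader :posts_by_category

  def initialize
    @posts_by_category = Hash.new { |hash, key| hash[key] = [] }
  end

  # Called once per published event, for example from an `on` block.
  def handle(event)
    posts_by_category[event.category] << { id: event.aggregate_id, body: event.body }
  end
end

projection = SketchPostProjection.new
projection.handle(SketchPostPublishedEvent.new(aggregate_id: "a1", body: "Hello CQRS", category: "news"))
projection.handle(SketchPostPublishedEvent.new(aggregate_id: "a2", body: "Event sourcing", category: "news"))
projection.posts_by_category["news"].map { |post| post[:id] } # ids in publication order
```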

Building The Domain

domain = Aggregates.create_domain do
  # Adding Command Processors
  process_commands_with PostCommandProcessor.new
  # Adding Event Processors
  process_events_with RssUpdateProcessor.new
  # Adding Command Filters
  filter_commands_with PostCommandFilter.new
end

Executing Your Domain

Storage Backends

Storage Backends are the method by which events and commands are stored in the system. You need to specify one in order to execute your domain.

executor = domain.execute_with MyAwesomeStorageBackend.new
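This README does not describe the interface a storage backend must implement, so the following is only a rough sketch of the responsibility involved: keeping an append-only history of commands and events per aggregate. The class name and all four method names are assumptions and are not taken from the gem.

```ruby
# Sketch only: the method names below are assumptions, not the interface
# the aggregates gem expects from a storage backend.
class SketchInMemoryBackend
  def initialize
    @events = Hash.new { |hash, key| hash[key] = [] }
    @commands = Hash.new { |hash, key| hash[key] = [] }
  end

  def store_event(aggregate_id, event)
    @events[aggregate_id] << event
  end

  def store_command(aggregate_id, command)
    @commands[aggregate_id] << command
  end

  # Return copies so callers cannot mutate the stored history.
  def events_for(aggregate_id)
    @events.fetch(aggregate_id, []).dup
  end

  def commands_for(aggregate_id)
    @commands.fetch(aggregate_id, []).dup
  end
end

backend = SketchInMemoryBackend.new
backend.store_command("a1", :create_thing)
backend.store_event("a1", :thing_created)
backend.events_for("a1") # the events recorded for aggregate "a1"
```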

Executing Commands

aggregate_id = Aggregates.new_aggregate_id
command = CreateThing.new(foo: 1, bar: false, aggregate_id: aggregate_id)
executor.execute_command(command)

increment = IncrementFooThing.new(aggregate_id: aggregate_id)
toggle = ToggleBarThing.new(aggregate_id: aggregate_id)
executor.execute_command(increment)
executor.execute_command(toggle)

Auditing Aggregates

aggregate_id = Aggregates.new_aggregate_id
# ... Commands and stuff happened.
auditor = executor.audit MyAggregateType, aggregate_id

# Each of these returns a list you can inspect.
events = auditor.events # Or events_processed_by(time) or events_processed_after(time)
commands = auditor.commands # Or commands_processed_by(time) or commands_processed_after(time)

# Or....
# View the state of an aggregate at a certain point in time.
aggregate_at_time = auditor.inspect_state_at(Time.now - 1.hour)
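Inspecting state at a point in time is possible because an event-sourced aggregate can be rebuilt by replaying its events. The plain-Ruby sketch below illustrates that idea only; it does not use the gem, and SketchRecordedEvent, state_at, and the event type symbols are hypothetical names that loosely mirror the foo/bar commands above.

```ruby
# Sketch only: rebuilding state by replaying events up to a point in time.
SketchRecordedEvent = Struct.new(:type, :recorded_at)

def state_at(events, time)
  events
    .select { |event| event.recorded_at <= time } # ignore later events
    .sort_by(&:recorded_at)                        # replay in order
    .each_with_object({ foo: 0, bar: false }) do |event, state|
      case event.type
      when :foo_incremented then state[:foo] += 1
      when :bar_toggled then state[:bar] = !state[:bar]
      end
    end
end

start = Time.at(0)
events = [
  SketchRecordedEvent.new(:foo_incremented, start + 10),
  SketchRecordedEvent.new(:bar_toggled, start + 20),
  SketchRecordedEvent.new(:foo_incremented, start + 30)
]

state_at(events, start + 25) # state after the first two events only
```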

Development

To contribute, run:

git clone https://github.com/resilient-vitality/aggregates.git
cd aggregates
bin/setup

You can also use the IRB console for direct access to all objects:

bin/console

Tests

To test, run:

bundle exec rake

Versioning

Read Semantic Versioning for details. Briefly, it means:

  • Major (X.y.z) - Incremented for any backwards incompatible public API changes.
  • Minor (x.Y.z) - Incremented for new, backwards compatible, public API enhancements/fixes.
  • Patch (x.y.Z) - Incremented for small, backwards compatible, bug fixes.
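In practice, these rules are what make a pessimistic version constraint safe to use in a Gemfile. The short sketch below uses RubyGems' own Gem::Requirement and Gem::Version classes; the version numbers are illustrative and do not refer to actual releases of this gem.

```ruby
require "rubygems"

# "~> 1.2" means ">= 1.2, < 2.0": minor and patch updates are accepted,
# but a new major version (a breaking change) is not.
requirement = Gem::Requirement.new("~> 1.2")

requirement.satisfied_by?(Gem::Version.new("1.2.0")) # true
requirement.satisfied_by?(Gem::Version.new("1.9.3")) # true
requirement.satisfied_by?(Gem::Version.new("2.0.0")) # false
```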

Code of Conduct

Please note that this project is released with a CODE OF CONDUCT. By participating in this project you agree to abide by its terms.

Contributions

Read CONTRIBUTING for details.

License

Copyright 2021 Resilient Vitality. Read LICENSE for details.

History

Read CHANGES for details. Built with Gemsmith.

Credits

Developed by Zach Probst at Resilient Vitality.