Nodus
(WARNING: EXPERIMENTAL)
Also, note that most of this README is out-of-date and inconsistent at the moment...
Description
Framework for parallel data-flow based applications.
It is influenced by, inspired by, and in many cases similar to:
- Functional Reactive Programming (also here)
- Kahn Process Networks
- Algorithmic Skeletons (see also here)
- Iteratee IO
- Railway Oriented Programming
- Erlang
- And of course the hundreds of other reinventions with names like
/(((data|signal|packet)?(stream|flow)|pipe(line)?|(flow|event|signal|reactive)-(processing|programming|architecture|computing|language)/
combined with the multitude of parallelization, concurrency, and clustering paradigms, etc. etc...
In More Detail
- Guiding philosophies
- Steady-state
- Composable
- Proportionate
- Real-time & simulation-time
- Resilient (failure modes as first class features, exception/failure paths, bulkheads, circuitbreakers, hibernation, ...)
- Simple single-stream sequential
- URI-like notation
- Token state accumulation
- Single-stream sequential with explicit sink
- Single-stream parallel
- Branches
- Forks
- Waypoints
- Pattern Branching
- Multi-stream
- Getting away with duck-typing
Guiding Assumptions
This library and associated commandline tools are most appropriate for these types of (overlapping and somewhat redundant) problems & constraints:
Dataflow-Oriented | Problems where the easiest way to look at it is a (possibly branching) pipeline of operations on a stream of data. |
Steady-State | Where the processing nodes and overall application have upper bounds on their memory requirements in order to safely and reliably handle very long running streams. i.e., bounded online algorithms and streaming-algorithms |
Functional | Most of the generator & processing nodes are usually assumed to be side-effect-free and pure (at least given all previous inputs). |
Composable | Easy to make nodes out of combinations and networks of other nodes |
Proportionate | Very easy and fast to do a simple pipeline (for example some simple functions that mutate an integer from within the console), but easily scales up to more complex production-ready solutions/projects. It strives to maintain the following inequality: effort ≤ problem‐complexity . |
It is additionally tuned for (but doesn't assume) problems with the following properties:
Parallel | For example, map-reduce type problems, or wherever strictly sequential simply isn't required. |
Ordered | When there is a natural ordering to the tokens. e.g., a time-series, or bytes from most IO streams; |
State-Accumulation | Where the tokens can accumulate state and where processing nodes can look at previous process results relative to that token as they pass through the graph. As opposed to destructive changes to the token at each process or lack of tokens altogether (like simple functions). |
Multi-Stream | Potentially create secondary streams that are unsynchronized with the original stream; |
Coordinated | Dataflow type problems where there is a need to synchronize/coordinate multiple orthogonal streams; |
Decoupled | A way to cache intermediate results (e.g., half way through the pipeline) so that multiple stream processing applications can be simultaneously writing results and other unrelated processes reading a stream of results- sometimes in sync with when they are being written by the decoupled process. (some good examples to come). |
Daemonized | Where certain stream sources and graphs of processors and sinks should be managed as a single long-lived system process when desired. |
Simulations/Reruns | Persistent caching nodes (nexus decouplers) etc. allow one to easily simulate reruns of past parts of a stream- possibly generating a new version of subsequently persisted results. |
Rewinding | Processing nodes that require (bounded) rewinding that propagates to other nodes. |
Use Little's law for queue bounds
Example: Single-Stream Sequential
+---+ +------+ +------+
| G | --> | f(x) | --> | g(x) | -->
+---+ +------+ +------+
- No sink specified at the end, so it simply outputs everything to STDOUT
- Generator types, from simplest to most complex:
- Scalar? Not very useful; does it make sense to allow it?
- Simpler proc/block/lambda than an enumerable? - instantiate via implicit counter generator with an offset which
uses the simple proc as the next block (so you could have a generator such as
->(x){Math.sin(x/100.0)}
) - Any enumerable or even just an object with an
each
method (will be made lazy if it isn't already) - Simple DSL
- State-machine class
- Decoupler
- Synchronized Decoupler
Components
Data
Thing | Otherwise Known As | |
---|---|---|
Stream | Flow, Signal | Related ordered data representation- bundled into tokens (chunks). A gradually consumed state |
Token | Packet, Event, Chunk, Element, Sample | Coherent instance of stream data being mutated and passing through the pipelines (potentially in parallel) - special values of EOF & Exceptionals |
Stream
Defined by:
- The eventual property distribution of its constituent tokens (possibly given as a shorthand name/"token-type"), which, because the token's internal state ends up mirroring the operations that have occurred, means it is also a way of describing the graph of process nodes that act on tokens within the stream (explicitly or implicitly- don't know yet) (usually all or a subset of a nodus application specification);
- An origination node for that specific stream (not necessarily a standalone origination node like for an app)
- A unique session identifier (uuid) identifying this run-instance, possibly shared with other streams (or Nexus' / Decouplers)
- Creates the initial instances of tokens, with a monotonic order indexed from the first token of the session. (Hence, streams can be theoretically infinite into the future, but always have a finite well-defined past, at least in the context of a session).
- Sometimes a version, which can affect caching, for example, or conflict resolution when a sink is permanently saving state from a running stream, etc.
- No cycles. Cycle-like behavior is handled by creating another stream with a delay (for example).
Stream == token type == (the data accumulated in the token are related) & especially (same intervals/timing)
Token
- A chunk of data - the ordered atoms of a stream.
- Has a "type"/name that coincides with the stream
- Analogous to a rolling stone gathering moss. The token starts out very small, and accumulates more pieces of state/data as it moves through the nodes, with it's most recent state being the most relevant for the next node.
- Has a tree-like internal view of the accumulated state that ends up mirroring the topology of the node network.
Session
- The context within which the runtime system executes the dataflows/streams.
Nodes
A node may be a generator for one or more streams AND/OR a sink for one or more streams, AND/OR an operator on one or more streams (accepting tokens from one or more upstream nodes and emitting them to one (or more?) downstream nodes. Generally though most nodes will deal with only a single stream- and most of those will be processor nodes, sandwiched between a generator at the beginning and a sink at the end.
Aspect | Otherwise Known As | |
---|---|---|
Generator | Observable, Source, Origin, Enumerator, Producer, Start | Has one or more output streams that originate there & are not among its incoming streams (although the incoming streams may have participated in its creation). |
Sink | Consumer, Iteratee, Fold, Reducer, End | folds over one or more input streams & emits nothing except a final result/stats if the done/EOF token is encountered. Usually in charge of any external side-effects. |
Processor | Observable, Filter, Enumeratee, Operator, Function, Convolution-operation, Transformer | Receives a token from a stream and either passes it through or advances its state before outputting the same stream-type. |
Junction | Whenever a node has input from more than one signal |
Most nodes are intra-stream nodes...
Parameterization (applying the specified or calculated parameters) only happens once, when the first token is about to hit the node. Any state change after that needs to be handled manually.
(maybe nodes with no generators applied get default behavior if they are run standalone- integer sequence generator, for example, or default to stdin, argf, etc...)
Intra-Stream Nodes
Operate on data within a single stream
Type | Behavior |
---|---|
Pipe | Chains intra-stream nodes together to operate on a token sequentially |
Branch | Sends same token down multiple parallel branches (w/ or w/o conditions, w/ or w/o automatic remerge (wait) (?), w/ or w/o infered token subselection). Some paths may be skipped/short-circuited due to conditions. |
Tap | Observer, splice, tee ... Semantic sugar to specify a non-synchronizing branch off of a point in a stream from the perspective of the branch (as if it were a generator). Can also be dynamic and/or temporary. |
(Cached) | Wrap around a node if it has referential transparency (given all previous input). Might not implement for a while |
Process | (Map, Node, Function, ...) Simple function on latest token data |
System | A specialized Process that interfaces with an external application (via stdin/stdout/stderr for the most part?) |
External | Another specialized Process that interfaces with an external application, this time via some other IPC call/response mechanisms |
View | (Select?) Changes what the next node will consider the "active data" for the token. |
Recombine | (Wait, Merge, Synchronize, Waypoint) Named synchronization point that also causes a "view" to be the combination of all merged branches. timeout logic, subselection logic, etc. |
|
NOTES:
- Branch without remerge -> when first branch path finishes (or none are deemed applicable) parent node operation continues forward- meaning the other data may or may not be available by the time it reaches a sink.
- (possibly named sync-points / stream entry/exit points as syntactical sugar to break up description of graph)
- Special
Wait
nodes might include logic that allows a "partial" token to go ahead depending on certain system conditions etc.
Stream-level Nodes (Inter-stream)
-
Application
: Adhoc collection of process networks meant to be run by itself. Usually with a single generator and a single final output sink- although that stuff can be inferred potentially. While it's meant to be standalone, other nodes should still be allowed to use it when composing more. -
Generators
: Shorthand for a node with zero input streams and one or more output streams. In a broader semantic perspective a generator is a node that creates any new streams regardless of whether it has input streams (maybe we should call thoseOriginators
or something instead?)- sequences/counters
- deterministic
- integer
- monotonic
- sequence ... (e.g., from http://oeis.org)
- real
- ... (monotonic with offsets, scaling, ...)
- ... (e.g., complex?, digits of irrationals like pi, ...)
- integer
- stochastic (or deterministic with seed in some cases)
- rng ... (prng, high quality, ...)
- quasi-random ...
- distributions ...
- deterministic
- clocks/timers ... (including simulated timers? e.g., meaningful timing sequence without waiting the actual intervals between tokens...)
- system (although usually used as normal internal functions for the nodes instead of streams)
- state changes (e.g., network connectivity, service status, ...)
- ... (e.g., resource utilization, uptime, network load, ...)
- interprocess
- signals
- argv
- file
- simple/read
- followed (i.e.,
tail -f
-like or even-F
)
- database ... (via query or change-monitoring etc.)
- pipe
- stdin
- named
- unix pipe (via file descriptor)
- argf (combo stdin and/or file(s))
- message queue
- semaphore
- shared memory
- mmap
- unix socket
- net
- socket
- secure socket
- HTTP
- HTTPS
- ... (e.g., AMQP, Erlang-node, ...)
- sequences/counters
-
Projections
(one input stream, one different output stream with timing change & probably data change):- SeqDelay: (buffer, drop(n), drop-while, fifo-queue) output stream is same as input tokens but delayed by specified number of steps (implied (n - 1) delay when a cycle is specified or detected?)
- TimeDelay: introduce a system-time latency
-
Sink: (fold, accumulator, aggregate, reduce, inject) special ending node that only outputs a result stream with a single
token that reports some reduced version of all input tokens. (can be specified with a single operator, e.g.,
:+
for sum).- Decoupler: persists records
-
Future:
- first
- include?
- any?
- count
- ... other enumerable & functional operators that act on the whole population.
- Reject: (filter) Filters out specified tokens
- Select: (grep, search) Selects specified tokens
- Reactors: Nodes that give an impulse when they encounter certain conditions on the input stream: specifically for monitoring and reacting to certain conditions like queue lengths etc. (can be thought of as being implemented via -->[tap]-->[select])
- Mutate: Timing of new stream is the same as input, but data has been mutated in a way that incompatible with the incoming stream
-
Future:
- pattern detection
- sample/alias (time-based chunking)
- sketch (in lieu of sort, for example)
- chunk
- collect-concat / flat-map
- cycle
- each-cons
- take, take-while, drop, drop-while, skip, skip-while (?)
- ... other enumerable-like projection functions
- ... sink-like enumerable functions but "so far"- e.g., 'current-max,' 'current-minmax', ...
-
DIFFICULT (or impossible considering infinite streams and steady-state, but with some possible partial-solutions)
- reverse, sort, sort-by (although that could be some kind of sketch...)
-
Merge
junctions:- Mux: when token arrives on any input stream an output token is generated with copies of the latest of all input tokens along with how long it has been since those tokens (respectively) arrived.
- Zip: combines specified number of input tokens (at least one each) laterally and emits an output token. Note that this can cause non-steady-state behavior (including even race conditions) if one of the streams bottlenecks and the other stream builds up a huge backlog. (Need to at least throw an error when this seems like it's happening, and ideally even have mechanisms for automatically dropping, etc.)
- AggregateZip: emits an output token when at least one of every input stream is received. Tokens received from streams multiple times while waiting for the first token on one of the other streams are aggregated together (so the final resulting token contains a vector for each incoming stream, with every vector containing at least one item and at least one vector containing only one item [the last stream to give an inbound token while the others were building up]).
- StreamFold: has a kernel that updates state and/or outputs a token at any input receipt from any stream.
-
Future
- concat (if it's ever actually needed- nodus isn't really meant for it...)
-
Scatter
junctions:- Switch: output streams contain subsets of the input tokens where each token travels down a path (or more than one?) determined by a case statement. (can be thought of as being implemented via -->[branch]--*>[filter])
- StateSwitch: input tokens get fed to output stream depending on the last given token from a different input stream.
-
Generic custom
Junction
s: Implementing a Junction node requires that it has to actively use actor-like semantics (including timeouts and possibly polling multiple streams) to request tokens on the input streams. [implemented by the fact that in reality it is waiting on all streams at all times, and then decides whether it applies to the given wait condition].
Application / Runtime
Nodus Application Specification
(could have just as easily been named 'Stream Processing Graph' or 'Nodus Application Specification', ...)
Technically a normal node that happens to:
- Have neither inputs nor (maybe?) outputs
- Contain one or more initial generating nodes
- (therefore) allows execution by the
nodus
execution engine
In practical terms, it:
- Creates a stream which can then potentially create other streams
- Ends with explicit or implicit sinks/end-points/reductions
- Is "executable" in a standalone fashion
- Is generally behave as a steady-state daemon processing a hypothetically infinite stream of tokens
- Is usually specified at least at the highest level in a ".node" file that also describes its high-level purpose
- (in other words) is the top-most specification of a nodus, an Application.
- Will leave side-effects / external actions for the sinks
- Will finish by having executed the external action, outputting a reduced/folded result (not ideal since it implies batch-like single-run behavior rather than daemon-like), or persisting the final tokens (or a form thereof) in a Decoupler (persistent store separate from caches described shortly).
File/spec hierarchy, auto-loading, paths, ... (node-language agnosticism?)
Nodus Execution Engine (nodus
command)
- Parses/processes a nodus specification/application/standalone-process-graph
- Starts everything up for the nodus and (normally) daemonizes
- /etc/init.d-(etc.)-like daemon instance management
- Erlang-like server behavior, ideally (or maybe even actually running on a distributed OTP network...)
- Could eventually conceivably evolve into tools with higher levels of management/abstraction (for distributed processing and cluster management, for example)
Application Groups
- Ability for nodus to specify which applications should be running autonomously all the time (system-level daemonized), including restart policy and footprint monitoring vs. those that are experimental / dev / transient, vs. those that are something in-between.
Scratch
Token: any object instance but usually openstruct with hash-like access as well (so like HashWithIndifferentAccess) but with WRITE-ONCE semantics! (throws error if the same process/node tries to write the same field a second time)
suggested behavior:
- lock-field-value-on-write
- unlock-all-field-values (for when a node starts its turn | possibly specify that this proc/node is one that can write to it)
- exception on rewrite of a field after it's locked
Contributing
- Check out the latest master to make sure the feature hasn't been implemented or the bug hasn't been fixed yet.
- Check out the issue tracker to make sure someone already hasn't requested it and/or contributed it.
- Fork the project.
- Start a feature/bugfix branch.
- Commit and push until you are happy with your contribution.
- Ensure that it is accompanied with tests.
- Please try not to mess with the Rakefile, version, or history etc. If you want to have your own version, or it is otherwise necessary, then fine, but please isolate to its own commit so I can cherry-pick around it.
Copyright
Copyright (c) 2014 Joseph Wecker. MIT License. See LICENSE.txt for further details.