About
Pipelines
Pipelines is a Keuss extension for building ETL processing graphs with ease while guaranteeing atomicity in the processing: whatever happens during the processing of an element, the element is guaranteed to end up in either the source or the destination queue; never in both, never in neither.
Keuss pipelines are built upon Keuss Queues with pipeline capability, which means Pipelines inherit all their advantages in terms of HA, durability and performance. So far, Keuss offers only one Queue backend with pipeline capability: `pl-mongo`.
Queues are linked together by processing units named Processors, which glue together a source queue with zero or more destination queues. Each processor encapsulates a loop that can be described, in its simplest form, as follows:
```
forever do
  src_queue.reserve () -> element    # reserve an element from the entry queue
  process (element) -> err, res      # process the element

  if (err) then
    if (err.drop) then               # error tells processor to drop/ignore the element
      src_queue.commit (element)
    else
      src_queue.rollback (element)   # regular error: rollback, it will be retried
    end
  else
    if (res.drop) then               # processed ok, but drop the item anyway
      src_queue.commit (element)
    else
      # commit on entry queue and insert into the exit queue, all in one atomic operation
      # modifications in the payload are conserved
      move_to_next_queue (element, src_queue)
    end
  end
end
```
- The `process()` part is user-provided, passed as a function on the initialization of the processor
- The exact semantics of `move_to_next_queue()` vary depending on the specific type of Processor chosen
Real, simple example
Here is the simplest possible example: 2 queues connected by a very simple processor. Elements in the source queue are taken, a `passed: true` is added to them, and they are moved to the next queue:
```javascript
const MQ    = require ('keuss/backends/pl-mongo');
const PDL   = require ('keuss/Pipeline/DirectLink');
const async = require ('async');

const factory_opts = {
  url: 'mongodb://localhost/qeus'
};

// initialize factory
MQ (factory_opts, (err, factory) => {
  if (err) return console.error (err);

  // factory ready, create 2 queues on the default pipeline
  const q_opts = {};

  async.parallel ({
    q1: cb => factory.queue ('test_pl_1', q_opts, cb),
    q2: cb => factory.queue ('test_pl_2', q_opts, cb),
  }, (err, queues) => {
    if (err) return console.error (err);

    // tie them up, q1 -> q2
    const pdl = new PDL (queues.q1, queues.q2);

    pdl.start ((elem, done) => {
      // pass element to next queue, set payload.passed to true
      done (null, {
        update: {
          $set: {passed: true}
        }
      });
    });

    // insert elements in the entry queue
    async.timesLimit (111, 3, (n, next) => queues.q1.push ({a: n, b: 'see it spin...'}, next));

    // read elements at the outer end
    async.timesLimit (111, 3, (n, next) => queues.q2.pop ('exit', (err, res) => {
      console.log ('end point get', res);
      next ();
    }));
  });
});
```
Just run this example and you will see 111 elements being inserted at q1, processed at the pdl processor, and then popped from q2.
Pipeline-aware Queues
As stated before, only one Keuss Queue backend, `pl-mongo`, is compatible with pipelines. This is the pipeline-related option available at the backend:

- `pipeline`: specifies the pipeline name for this queue. Only queues within the same pipeline (that is, same mongodb url and same pipeline name) can actually work together in a pipeline. Defaults to `default`

In the example above both queues, q1 and q2, are created in a pipeline named `default`. To use a different one you just change the code into:
```javascript
const q_opts = {pipeline: 'some_other_pipeline'};

async.parallel ({
  q1: cb => factory.queue ('test_pl_1', q_opts, cb),
  q2: cb => factory.queue ('test_pl_2', q_opts, cb),
}, (err, queues) => {
  ...
```
Also, pipeline-aware queues provide a new operation:

```javascript
pl_step (id, next_queue, opts, callback)
```

where:

- `id` is the id of a previously reserved element
- `next_queue` is the queue to (atomically) move the item to
- `opts` are extra options for the operation:
  - `mature`: Date instance with the not-before timestamp for the item, to be used when inserted into `next_queue`. Defaults to `now()`
  - `tries`: number of tries for the item, to be used when inserted into `next_queue`. Defaults to `0`
  - `payload`: if specified, use this as the item's payload when moving it to `next_queue`. This totally substitutes the previous payload
  - `hdrs`: a key-value object containing extra headers to set on the message. Headers can only be set, not unset or removed
  - `update`: optional object containing mongodb update operations. Those are mapped so they are applied to the message's `payload`. For example, in the example above:

    ```javascript
    done (null, {
      update: {
        $set: {passed: true}
      }
    });
    ```

    The `update` parameter of the second argument to `done()` is passed internally to `pl_step()` as `opts.update`: this causes the message's `payload.passed` to be set to `true`, even if there is no explicit mention of `payload`
The whole `pl_step()` operation is guaranteed to be atomic; this includes the application of `opts.payload` or `opts.update`, if present.

Also, `opts.payload` takes precedence over `opts.update`: if both are specified, only the former is taken into account and the latter is totally ignored.