Processors
A small hierarchy of processors is provided with Pipelines:
BaseLink
Common base for all Processors, provides all the functionality common to all. It can not be used directly
Creation
const bl = new BaseLink (src_q, opts)
Although not intended to be instantiated, this serves as common initialization to all Processors
src_q
must be a pipelined queueopts
can contain:retry_factor_t, retry_base_t
: they control the delay imposed to an element when it is rolled back. The formula isdelay-in-seconds = item.tries * retry_factor_t + retry_base_t
They default to
2
and1
respectivelymature
: Date instance or unix timestamp (in milliseconds, as integer) expressing the not-before timestamp for the item, to be used when callingpl_step()
in the src queuedelay
: delay in seconds to calculatemature
, ifmature
is not specified
Methods
src()
: returns src queuename()
: returns Processor nameon_data(fn)
: specifies the process function to be applied to each elementstart(fn)
: starts the processor. Optionally, a process function can be passed; if not passed the process function must have been previously specified usingon_data()
stop()
: stops the Processor
Process Function
The function passed into on_data()
or start()
provides the processor logic; this function is referred to as processor function. This function is called on each step of the Processor loop with the reserved item, and it is expected to calls its callback once done with the item. The way the function calls the callback determines what happens with the item afterwards
The function looks like this:
function (item, cb) {
...
})
The item
is received exactly as it comes as result of a (successful) reserve()
call on the source queue; after processing the item cb
should be called once to finish the processing of item
and proceed with the next loop cycle. The callback has the following signature:
cb (err, res);
where:
if
err
is not nil- if
err.drop
is exactlytrue
the item is committed in the src queue and therefore dropped from the pipeline - else the item is rolled back in the src queue, using the processor's
retry_factor_t
andretry_base_t
to calculate the retry delay. If the queue was created with deadletter support, the item would be moved to the deadletter queue if it has reached its maximum number of rollbacks; in such case, the movement into deadletter is also atomic
- if
else (if
err
is nill)- if (
res.drop
is exactlytrue
the item is committed in the src queue and therefore dropped from the pipeline - else the item is passed to the text queue in the pipeline (by means of
pl_next()
)- if
res.mature
orres.delay
exist (or they were specified at the processor's creation) they are used to calculate the delay/mature of the element in the destination queue - if
res.payload
exists it is used to replace the item's payload entirely - else if
res.update
exists it is used as mongodb-update operations on the item's payload
- if
All those operations happen in an atomic way
- if (
Semantic this
in process function
The function is bound to the processor, so the function can access and use processor's primitives. For example, it can insert copies of the item, or new items, in any of the source or destination queues
In order to use this functionality the process function can not be declared as an 'arrow' function, since those can not be bound. Use the classic function xxxx (item, cb) {...}
if you intend to access the underlying Processor
Events
BaseLink inherits from EventEmitter
and publishes the following events:
error
: an error happened in the internal loop. It comes with one parameter, an object with the following fields:on
: exact type of error:src-queue-pop
: error while reserving an element on the src queuesrc-queue-commit-on-error
: error while committing an element on the src queue when an error was passed anderr.drop==true
src-queue-rollback-on-error
: error while rolling back an element on the src queue when an error was passedsrc-queue-commit-on-drop
: error while committing an element an element on the src queue when processed ok andres.drop==true
next-queue
: error while atomically moving the element to the next queue
elem
: element that caused the error. Not present insrc-queue-pop
error
: original error objectopts
: (only present innext-queue
) options passed internally topl_step()
DirectLink
Processor that connects the source queue to exactly one destination queue:
src-queue --> DirectLink --> dst-queue
Creation
const PDL = require ('keuss/Pipeline/DirectLink');
const bl = new PDL (src_q, dst_q, opts);
In addition to BaseLink
:
dst_q
must be a pipelined queue; also, bothsrc_q
anddst_q
must be of the same type and must belong to the same pipeline
Methods
In addition to those of BaseLink
dst()
: returns destination queue
Process Function
In the case of successful processing (i.e.: no err
in the callback invocation) the item is atomically moved to the dst
queue.
No other semantics are added to the process function.
ChoiceLink
Processor that connects the source queue to an array of queues; after processing, each item would be moved to exactly one of those queues:
|--> dst-queue-0
|--> dst-queue-1
src-queue --> ChoiceLink --|--> dst-queue-2
| ...
|--> dst-queue-n
Creation
const PCL = require ('keuss/Pipeline/ChoiceLink');
const cl = new PCL (src_q, array_of_dst_queues, opts);
In addition to BaseLink
:
array_of_dst_queues
must be an array of pipelined queues; each one must be of the same type and must belong to the same pipeline thansrc_q
Methods
In addition to those of BaseLink
:
dst_by_idx(idx)
: returns destination queue from the array, selected by array index (integer)dst_by_name(name)
: returns destination queue from the array, selected by queue name (string)dst_dimension ()
: returns number of possible destination queuesdst_names ()
: returns an array with the names of all dst queues
Process Function
ChoiceLink expects an res.dst
in the callback invocation, which must fulfill one of those conditions:
- be an integer and resolve to a valid element when applied as index to the array of destination queues
- be a string and correspond to the name of one of the destination queues
The element will be moved atomically to the specified destination queue upon successful processing (i.e.: no err
in the callback invocation)
Sink
Processor that connects the source queue to exactly zero destination queue. That is, a termination point: successfully processed elements are always removed from the pipeline
src-queue --> Sink
Creation
const PS = require ('keuss/Pipeline/Sink');
const bl = new PS (src_q, opts);
No extra parameters are expected in addition to those of BaseLink
Methods
No extra methods are provided in addition to those of BaseLink
Process Function
In the case of successful processing (i.e.: no err
in the callback invocation) the item is removed from the pipeline, exactly as if res.drop
were specified. Actually, res
is totally ignored