Queue API
stats
: Queue stats
q.stats ((err, res) => {
...
})
res
contains usage statistics (elements inserted, elements extracted, paused status).
name
: Queue name
var qname = q.name ()
type
: Queue type
var qtype = q.type ()
Returns a string with the type of the queue (the type of backend which was used to create it).
size
: Queue occupation
q.size ((err, res) => {
...
})
Returns the number of elements in the queue that are already eligible (that is, excluding scheduled elements with a schedule time in the future).
totalSize
: Total queue occupation
q.totalSize ((err, res) => {
...
})
Returns the number of elements in the queue (that is, including scheduled elements with a schedule time in the future).
schedSize
: Size of Scheduled
q.schedSize ((err, res) => {
...
})
Returns the number of scheduled elements in the queue (that is, those with a schedule time in the future). Returns 0 if the queue does not support scheduling.
resvSize
: Reserved elements size
q.resvSize ((err, res) => {
...
})
Returns the number of reserved elements in the queue. Returns null
if the queue does not support reserve.
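The four size calls above can be combined into a single snapshot. The following is a minimal sketch, assuming `q` is a queue object exposing `size`, `totalSize`, `schedSize` and `resvSize` exactly as described; `sizeSnapshot` is a hypothetical helper name, not part of the library.

```javascript
// Hypothetical helper: collects the documented size counters into one object.
// `q` is assumed to expose size/totalSize/schedSize/resvSize with (err, res) callbacks.
function sizeSnapshot (q, cb) {
  const names = ['size', 'totalSize', 'schedSize', 'resvSize'];
  const out = {};
  let pending = names.length;
  let failed = false;

  names.forEach (name => {
    q[name] ((err, res) => {
      if (failed) return;                        // an earlier call already reported an error
      if (err) { failed = true; return cb (err); }
      out[name] = res;                           // resvSize may be null if reserve is unsupported
      if (--pending === 0) cb (null, out);
    });
  });
}
```

With the definitions above, one would expect `totalSize` to equal `size` plus `schedSize` on a backend that supports scheduling, although the counters are read separately and may differ slightly under load.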
pause
/ paused
: Pause/Resume
// pauses the queue
q.pause (true)
// resumes the queue
q.pause (false)
// gets paused status of queue
q.paused ((err, is_paused) => {
...
})
Pauses/Resumes all consumers on this queue (calls to pop()
). Producers are not affected (calls to push()
).
The pause/resume condition is propagated via the signallers, so this affects all consumers, not only those local to the process, if a redis-pubsub or mongo-capped signaller is used.
Also, the paused condition is stored as stats, so any new call to pop()
will honor it.
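As an illustration of how `pause()` and `paused()` fit together, here is a small sketch that flips the current state. `togglePause` is a hypothetical helper; it assumes only the two calls documented above.

```javascript
// Hypothetical helper: reads the paused status and sets the opposite one.
function togglePause (q, cb) {
  q.paused ((err, isPaused) => {
    if (err) return cb (err);
    const next = !isPaused;
    q.pause (next);       // true pauses consumers, false resumes them
    cb (null, next);      // report the state that was just requested
  });
}
```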
next_t
: Time of schedule of next message
q.next_t ((err, res) => {
...
})
Returns a Date, or null if the queue is empty. Queues with no support for schedule/delay always return null.
push
: Add element to queue
q.push (payload, [opts,] (err, res) => {
...
})
Adds payload to the queue and calls the passed callback upon completion. The callback's res will contain the id assigned to the inserted element, if the backend provides one.
payload can be an object, array, string, number or Buffer.
Possible opts:
- mature: unix timestamp at which the element becomes eligible for extraction. It is guaranteed that the element won't be extracted before this time.
- delay: delay in seconds used to calculate the mature timestamp, if mature is not provided. For example, delay=120 guarantees the element won't be extracted until at least 120 seconds have elapsed.
- tries: value to initialize the retry counter; defaults to 0 (no retries yet).
- hdrs: object whose values are scalars (i.e., string, number or boolean), to be added alongside the payload as general-purpose headers.
mature and delay have no effect if the backend does not support delay/schedule.
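A possible way to use the `delay` and `hdrs` options together is sketched below. `pushWithDelay` and `isScalar` are hypothetical helpers; the client-side header check is an assumption made for illustration and is not described as library behaviour.

```javascript
// Hypothetical check mirroring the documented rule: header values must be
// string, number or boolean.
function isScalar (v) {
  return ['string', 'number', 'boolean'].includes (typeof v);
}

// Hypothetical helper: pushes `payload` so that it matures `delaySecs` seconds
// from now, attaching `hdrs` as headers. Rejects non-scalar header values
// before calling q.push (an assumption of this sketch, not of the library).
function pushWithDelay (q, payload, delaySecs, hdrs, cb) {
  const headers = hdrs || {};
  const bad = Object.keys (headers).filter (k => !isScalar (headers[k]));
  if (bad.length) return cb (new Error ('non-scalar headers: ' + bad.join (', ')));

  q.push (payload, {delay: delaySecs, hdrs: headers}, cb);
}
```

On a backend without delay/schedule support the `delay` option would simply have no effect, as noted above.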
pop
: Get element from queue
var tr = q.pop (cid, [opts,] (err, res) => {
...
})
Obtains an element from the queue. The callback is called with the element obtained, if any, or with an error if one happened. If opts.timeout is defined, the operation waits up to that many milliseconds for an element to appear in the queue before bailing out (with both err and res being null). The call itself returns immediately with an id that can be used to cancel the operation at any time.
cid is a string that identifies the consumer entity; it is used only for debugging purposes.
Possible opts:
- timeout: milliseconds to wait for an eligible element to appear in the queue to be returned. If not defined, it will wait forever.
- reserve: if true, the element is only reserved, not completely returned. This means that either an ok or a ko operation is needed on the obtained element once processed; otherwise the element will be rolled back (and made available again) at some point in the future. This is only available on backends capable of reserve/commit.
cancel
: Cancel a pending Pop
var tr = q.pop (cid, opts, (err, res) => {...});
.
.
.
q.cancel (tr);
Cancels a pending pop operation, identified by the value returned by pop(). If no tr param is passed, or it is null, all pending pop operations on the queue are cancelled. Cancelled pop operations will get 'cancel' (a string) in the error parameter of the callback.
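A pop() callback can therefore end in four distinct ways: an element, a timeout (both err and res null), a cancellation (err is the string 'cancel'), or a genuine error. The sketch below tells them apart; `popOutcome` is a hypothetical helper name, and the labels it returns are chosen for this example only.

```javascript
// Hypothetical helper: classifies the (err, res) pair received by a pop() callback.
function popOutcome (err, res) {
  if (err === 'cancel') return 'cancelled';                  // q.cancel() was called
  if (err) return 'error';                                   // any other error
  if (res === null || res === undefined) return 'timeout';   // no err, no element
  return 'element';                                          // res holds the popped element
}

// Possible usage inside a consumer (q is assumed to be an existing queue):
//   q.pop ('my-consumer-id', {timeout: 5000}, (err, res) => {
//     if (popOutcome (err, res) === 'element') { /* process res */ }
//   });
```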
ok
: Commit a reserved element
q.ok (id|obj, (err, res) => {
...
})
Commits a reserved element by its id (available as res._id on the res param of the pop() callback) or by passing the entire object (as it was returned by the reserve call). This effectively erases the element from the queue.
const tr = q.pop ('my-consumer-id', {reserve: true}, (err, res) => {
// do something with res
...
// alternative 1: commit it by id
q.ok (res._id, (err, res) => {
...
});
// alternative 2: commit it by full object
q.ok (res, (err, res) => {
...
});
});
ko
: Roll back a reserved element
q.ko (id|obj, next_t, (err, res) => {
...
})
Rolls back a reserved element by its id (available as res._id on the res param of the pop() callback) or by passing the entire object (as it was returned by the reserve call). This effectively makes the element available again in the queue, marking it to be mature at next_t (next_t being a unix timestamp in milliseconds). If no next_t is specified or a null is passed, now() is assumed.
The res param of the callback can take the following values:
- true if all went ok and an element was in fact rolled back
- nullish if the element does not exist in the queue
- 'deadletter' (as string) if the rollback resulted in the element being moved to deadletter
As with ok()
, you can pass the entire reserved object as id
:
const tr = q.pop ('my-consumer-id', {reserve: true}, (err, res) => {
// do something with res
...
// commit or rollback it
if (succeed) q.ok (res, (err, res) => {
...
})
else q.ko (res, (err, res) => {
if (res === 'deadletter') {
// moved to deadletter
...
}
...
})
});
You must pass the entire res
object for the deadletter feature to work; even if activated at the factory, ko()
will not honor deadletter if you only pass the res._id
as id
.
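The commit/rollback decision can be wrapped in one place so that the whole reserved object is always passed on, which keeps deadletter working. This is a minimal sketch: `settle` is a hypothetical helper, and the outcome labels it reports are chosen for this example.

```javascript
// Hypothetical helper: commits or rolls back a reserved element.
// `elem` must be the full object returned by pop() with {reserve: true}.
// `retryDelayMs` is how long to wait before the element becomes eligible again.
function settle (q, elem, succeeded, retryDelayMs, cb) {
  if (succeeded) {
    return q.ok (elem, err => err ? cb (err) : cb (null, 'committed'));
  }

  // next_t is a unix timestamp in milliseconds
  const next_t = Date.now () + retryDelayMs;

  q.ko (elem, next_t, (err, res) => {
    if (err) return cb (err);
    if (res === 'deadletter') return cb (null, 'deadletter');
    cb (null, res ? 'rolled-back' : 'not-found');
  });
}
```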
remove
: Delete elements from queue by id
q.remove (id, (err, res) => {
...
})
Removes an element from a queue, by using the id returned at insertion time (backends supporting remove will return an id upon insertion). A reserved element cannot be removed, and will be considered as nonexistent.
The res param of the callback can take the following values:
- true if all went ok and the element was removed
- nullish if the element does not exist in the queue
drain
: Drain queue
q.drain (err => {
...
})
Drains a queue. This is a needed operation when a backend does read-ahead upon a pop()
, or buffers push()
operations for later; in this case, you may want to be sure that all extra elements read are actually popped, and all pending pushes are committed.
drain() will immediately inhibit push(): any call to push() will immediately result in a 'drain' (a string) in the error parameter of the callback. The drain callback will be called when all pending pushes are committed, and all read-ahead on a pop() has actually been popped.
In case of success, drain() also calls cancel() on the queue immediately before finishing.
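A typical use of drain() is at shutdown, when several queues need to be flushed before the process exits. The sketch below waits for all of them; `drainAll` is a hypothetical helper and assumes only the `drain (cb)` call documented above.

```javascript
// Hypothetical helper: drains every queue in `queues` and calls `cb` once,
// after all of them have finished. The first error seen, if any, is reported.
function drainAll (queues, cb) {
  let pending = queues.length;
  let firstErr = null;
  if (pending === 0) return cb (null);

  queues.forEach (q => {
    q.drain (err => {
      if (err && !firstErr) firstErr = err;
      if (--pending === 0) cb (firstErr);
    });
  });
}
```

Reporting only the first error keeps the callback signature simple; a caller that needs per-queue results could collect them in an array instead.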