
Modelling queues on MongoDB - II


This is a continuation of Modelling queues on MongoDB - I, where we explained the technological basis for building a rather decent queue middleware by leveraging preexisting DB technologies and adding very little on top.

Now we explore how to push the technology further, building on what we have so far to add extra, useful features.

Adding delay/schedule

This feature is seldom found in QMWs, but it should be easy to implement if the persistence is sound: after all, if the items are safely stored, they can remain stored for any arbitrary period of time.

The tricky part is to provide this feature while honoring these conditions:

  • performance should not degrade: both push and pop should remain O(1)
  • items that are still waiting should not block items that are ready

On the other hand, this feature can be used to implement quite a lot of common logic, so it should be high on the wishlist. Some examples are:

  • exponential backoff, if whatever you do with an item goes wrong and you want to retry later
  • simple scheduling of events or actions (items would model both)
  • with some extra logic, a recurring or cron-like system, where items recur periodically

As it turns out, this is quite easy to model on MongoDB while still keeping all the features and capabilities of the good-enough queues described in part I. The model can be expressed as:

operation | implementation base
--- | ---
push | coll.insertOne({payload: params.item, when: params.when OR now()})
pop | coll.findOneAndDelete({when < now()}, {orderby: {when: $asc}}).payload

The most obvious change is that we no longer insert the item as is: we wrap it in an envelope that carries extra information, in this case a timestamp stating when the item becomes eligible for a pop operation. The pop will then only affect items whose when timestamp lies in the past, and ignore those whose timestamp is still in the future.

Then, to keep performance close to O(1) (strictly speaking, an index lookup is O(log n), which is close enough in practice), we must make sure the collection has an index on when. Moreover, it is advisable to sort the findOneAndDelete operation by when, ascending: this adds best-effort ordering, where the elements that have been due the longest are popped first.
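To make these semantics concrete, here is a minimal in-memory sketch in Python. It is an illustration under assumptions, not MongoDB code: the class name and the injectable `clock` are hypothetical, and a binary heap keyed on `when` stands in for the index, so pop only ever inspects the smallest timestamp.

```python
import heapq
import itertools
import time


class DelayedQueue:
    """Hypothetical in-memory stand-in for the delay/schedule model.

    A binary heap ordered by `when` plays the role of the MongoDB index on
    `when`; nothing here talks to MongoDB.
    """

    def __init__(self, clock=time.time):
        self._heap = []                # entries: (when, seq, payload)
        self._seq = itertools.count()  # tie-breaker: FIFO among equal `when`
        self._clock = clock

    def push(self, item, when=None):
        # Envelope {payload, when}; by default the item is ready right away
        when = self._clock() if when is None else when
        heapq.heappush(self._heap, (when, next(self._seq), item))

    def pop(self):
        # Like findOneAndDelete({when <= now}) sorted by `when` ascending:
        # only the smallest `when` needs checking; if even that one lies in
        # the future, nothing is ready yet.
        if self._heap and self._heap[0][0] <= self._clock():
            return heapq.heappop(self._heap)[2]
        return None
```

Because the earliest `when` is always checked first, an item scheduled for the future can never hide one that is already due, which is the second condition listed above. The sketch uses `<=` rather than the strict `<` of the pseudocode, so that an item pushed with the default `when` is ready immediately even with a frozen test clock.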

Adding reserve-commit-rollback

A feature that every decent QMW should offer is the ability to reserve an item, process it, and then commit it once done, or roll it back if something fails and we want it to be retried later (possibly by another consumer).

This provides what is known as at-least-once semantics: every item in the queue is guaranteed to be processed at least once, even in the event of consumer failure. It does not guarantee the absence of duplicates, though. By contrast, the simple pop model provides at-most-once semantics: duplicates are guaranteed not to happen, but at the risk of losing items if a consumer malfunctions.

The reserve-commit-rollback model can be expressed as the following extension of the delay/schedule model above:

operation | implementation base
--- | ---
push | coll.insertOne({payload: params.item, when: params.when OR now(), retries: 0, reserved: false})
pop | coll.findOneAndDelete({when < now()}, {orderby: {when: $asc}}).payload
reserve | coll.findOneAndUpdate({when < now()}, {when: (now() + params.timeout), reserved: true}, {orderby: {when: $asc}})
commit | coll.deleteOne({_id: params.reserved_item._id})
rollback | coll.findOneAndUpdate({_id: params.reserved_item._id}, {when: (now() + params.delay), reserved: false, retries: $inc})

The general idea is to leverage the existing scheduling feature: reserving an element simply means moving its when time into the future by a fixed timeout; if the consumer is unable to process the element within that time, the item becomes eligible again for other consumers.

The commit operation simply deletes the entry, using the _id of the element returned by reserve (referred to above as params.reserved_item). The rollback is a bit more complex: it removes the reserved flag, increments the retries counter and, most importantly, sets a when time further in the future. This last bit provides the important feature of delayed retries, so an element rejected by a consumer will not be available again immediately (when it would likely fail again).
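The reserve/commit/rollback cycle can be sketched in memory as well. This is a hypothetical Python model, not a driver API: documents sit in a dict keyed by `_id`, and a linear scan replaces the index on `when`, so it shows the semantics only, not the performance. The `timeout` and `delay` parameters correspond to `params.timeout` and `params.delay` above.

```python
import itertools
import time


class ReservableQueue:
    """Hypothetical in-memory stand-in for the reserve-commit-rollback model."""

    def __init__(self, clock=time.time):
        self._docs = {}                  # _id -> envelope
        self._ids = itertools.count(1)
        self._clock = clock

    def push(self, item, when=None):
        doc_id = next(self._ids)
        self._docs[doc_id] = {"_id": doc_id, "payload": item,
                              "when": self._clock() if when is None else when,
                              "retries": 0, "reserved": False}
        return doc_id

    def _next_ready(self):
        # Due items only, earliest `when` first (linear scan, not an index)
        now = self._clock()
        ready = [d for d in self._docs.values() if d["when"] <= now]
        return min(ready, key=lambda d: d["when"]) if ready else None

    def reserve(self, timeout):
        # Reserving = pushing `when` into the future by `timeout`; if the item
        # is never committed or rolled back, it becomes eligible again.
        doc = self._next_ready()
        if doc is not None:
            doc["when"] = self._clock() + timeout
            doc["reserved"] = True
        return doc

    def commit(self, reserved_item):
        # Equivalent of deleteOne({_id: reserved_item._id})
        self._docs.pop(reserved_item["_id"], None)

    def rollback(self, reserved_item, delay):
        # Clear the flag, count the retry, postpone the next attempt by `delay`
        doc = self._docs.get(reserved_item["_id"])
        if doc is not None:
            doc["when"] = self._clock() + delay
            doc["reserved"] = False
            doc["retries"] += 1
```

If a consumer reserves an item and then dies, nobody needs to clean up: once `when` passes, the next `reserve` simply picks the item up again.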

Note that the reserved flag is purely informational, although further checks could be built on it to improve robustness. The same goes for retries: it just counts the number of retries. More logic could be added here, for example a dead-letter queue feature: if the number of retries grows too high, the item is moved to a separate queue for more dedicated processing at a later time.
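A dead-letter variant of rollback could look like the following Python sketch. Everything in it is an assumption for illustration: the function name, the `max_retries` threshold and the use of two dicts standing in for two collections. Note that with two real collections the move is an insert plus a delete, so it is not atomic by itself.

```python
def rollback_or_dead_letter(queue, dead_letter, reserved_item, now, delay,
                            max_retries=5):
    """Hypothetical rollback with a dead-letter queue.

    `queue` and `dead_letter` are dicts mapping _id -> envelope, standing in
    for two collections; `max_retries` is an assumed policy parameter.
    Returns "retried", "dead-lettered" or "missing".
    """
    doc = queue.get(reserved_item["_id"])
    if doc is None:
        return "missing"
    if doc["retries"] >= max_retries:
        # Already retried max_retries times: park it for dedicated processing,
        # counting this last failure as well
        del queue[doc["_id"]]
        dead_letter[doc["_id"]] = dict(doc, reserved=False,
                                       retries=doc["retries"] + 1)
        return "dead-lettered"
    # Plain rollback: clear the flag, count the retry, postpone the next attempt
    doc.update(when=now + delay, reserved=False, retries=doc["retries"] + 1)
    return "retried"
```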

Queues with historic data

Here's another twist: instead of fully removing items once consumed (by means of pop or commit), we just mark them as deleted and keep them around for some time, in case we need to inspect past traffic or replay some items. This feature is desirable in environments where the ability to inspect, or even reproduce, past traffic is paramount. It can also be done easily, at the expense of storage space only, with the following variation on the model above:

operation | implementation base
--- | ---
push | coll.insertOne({payload: params.item, when: params.when OR now(), retries: 0, reserved: false})
pop | coll.findOneAndUpdate({when < now(), processed: $nonexistent}, {processed: now(), when: $INFINITE}, {orderby: {when: $asc}}).payload
reserve | coll.findOneAndUpdate({when < now(), processed: $nonexistent}, {when: (now() + params.timeout), reserved: true}, {orderby: {when: $asc}})
commit | coll.updateOne({_id: params.reserved_item._id}, {processed: now(), when: $INFINITE})
rollback | coll.findOneAndUpdate({_id: params.reserved_item._id}, {when: (now() + params.delay), reserved: false, retries: $inc})

Then we need to add a TTL index on the new processed field, with a long-enough expiration time.

The main difference is the addition of a processed field, which records both whether the item was processed (that is, deleted, no more, gone to meet its maker) and, if so, when that happened. This field also drives the deletion of old entries once a fixed time has elapsed. This means these queues can potentially grow very big, because old entries are removed based on age, not size.

Note that, to improve performance a bit, when an element is processed (by either pop or commit) its when is set to some time far in the future (to $INFINITE and beyond, although in practice $INFINITE would be the largest representable date rather than a real infinity), to move it away from the pop/reserve query.
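The pseudocode maps fairly directly onto MongoDB's query language. As a sketch (the helper names and `FAR_FUTURE`, our stand-in for `$INFINITE`, are assumptions), these Python functions only build the filter, update and sort arguments. With PyMongo they could be passed as `coll.find_one_and_update(flt, upd, sort=sort)` for pop and reserve, and `coll.update_one(flt, upd)` for commit and rollback; the TTL index would be created once with `coll.create_index("processed", expireAfterSeconds=...)`, choosing the retention period to taste.

```python
from datetime import datetime, timedelta, timezone

# Stand-in for $INFINITE: a date far beyond any real `when` (assumption)
FAR_FUTURE = datetime(9999, 12, 31, tzinfo=timezone.utc)
ASC = 1  # MongoDB ascending sort direction


def _due_and_unprocessed(now):
    # `processed: $nonexistent` becomes an $exists check
    return {"when": {"$lt": now}, "processed": {"$exists": False}}


def pop_args(now):
    """Filter, update and sort for the historic-data pop."""
    upd = {"$set": {"processed": now, "when": FAR_FUTURE}}
    return _due_and_unprocessed(now), upd, [("when", ASC)]


def reserve_args(now, timeout=timedelta(seconds=30)):
    upd = {"$set": {"when": now + timeout, "reserved": True}}
    return _due_and_unprocessed(now), upd, [("when", ASC)]


def commit_args(reserved_item, now):
    # Mark as processed instead of deleting; the TTL index cleans up later
    return ({"_id": reserved_item["_id"]},
            {"$set": {"processed": now, "when": FAR_FUTURE}})


def rollback_args(reserved_item, now, delay):
    return ({"_id": reserved_item["_id"]},
            {"$set": {"when": now + delay, "reserved": False},
             "$inc": {"retries": 1}})
```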

Queues fit for ETL pipelines: moving elements from one queue to the next, atomically

This is an interesting concept: one of the common uses of job queues is to build what are known as ETL pipelines: a set of computing stations where items are transformed or otherwise processed, connected by queues. A common example would be a POSIX shell pipeline, where several commands are tied together so the output of one becomes the input of the next; an ETL pipeline can also have forks and loops, so its topology generalizes to a graph, not just a linear pipeline.

Let us assume for a moment that messages are never created or duplicated in any station: in other words, an item entering a station will produce zero or one items as output. In this scenario, one of the reliability problems that arise is that, usually, moving items from one (input) queue to the next (output) queue is not an atomic operation. This may lead to either item loss or item duplication in the case of station malfunction, even when using reserve-commit.

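If we push to output after committing on input, we run the risk of loss: should the station fail between the two steps, the item is already gone from the input queue but never reaches the output queue.

If, instead, we push to output before committing on input, we run the risk of duplication: should the station fail between the two steps, the item is already in the output queue, yet its reservation on input eventually times out and it gets processed, and pushed, a second time.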
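Both failure windows can be reproduced with a toy model. In this hypothetical Python sketch, lists stand in for the input and output queues, an exception stands in for the station dying between the two steps, and the retry loop plays the role of another consumer picking up the item once its reservation times out.

```python
class Crash(Exception):
    """Simulated station failure between the two steps."""


def move(item, inp, out, push_first, crash):
    """Non-atomic move of `item` from list `inp` to list `out`."""
    steps = [lambda: out.append(item),   # push on output
             lambda: inp.remove(item)]   # commit on input
    if not push_first:
        steps.reverse()                  # commit on input first, then push
    steps[0]()
    if crash:
        raise Crash()
    steps[1]()


def run(push_first):
    inp, out = ["x"], []
    try:
        move("x", inp, out, push_first, crash=True)
    except Crash:
        pass
    # Reservation timed out: a healthy consumer retries whatever is left
    for item in list(inp):
        move(item, inp, out, push_first, crash=False)
    return out


print(run(push_first=False))  # commit first: the item is lost
print(run(push_first=True))   # push first: the item arrives twice
```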

So, the commit-in-input and push-on-output operations must be done atomically; and it turns out it is quite simple to extend the model to accommodate that as a new, atomic move-to-queue operation (although it comes at a price, as we will see)

The easiest way to implement this operation is to require that all queues of a given pipeline be hosted in the same MongoDB collection; then our item envelope grows to contain an extra field, q. Finally, all operations are augmented to use this new field:

operation | implementation base
--- | ---
push | coll.insertOne({q: params.qname, payload: params.item, when: params.when OR now(), retries: 0, reserved: false})
pop | coll.findOneAndDelete({q: params.qname, when < now()}, {orderby: {when: $asc}}).payload
reserve | coll.findOneAndUpdate({q: params.qname, when < now()}, {when: (now() + params.timeout), reserved: true}, {orderby: {when: $asc}})
commit | coll.deleteOne({_id: params.reserved_item._id})
rollback | coll.findOneAndUpdate({_id: params.reserved_item._id}, {when: (now() + params.delay), reserved: false, retries: $inc})
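Translated to MongoDB syntax, the only change to the filters is the extra equality on `q`. In this Python sketch the helper names are hypothetical and only the argument documents are built; the compound index on `q` then `when` is our suggestion (an assumption, not part of the model above) so that each queue-scoped query can still be served from an index.

```python
from datetime import datetime, timedelta, timezone

ASC = 1  # MongoDB ascending sort direction (pymongo.ASCENDING is also 1)

# Compound index for queue-scoped queries: equality on `q` first, then the
# `when` range and sort (e.g. coll.create_index(QUEUE_INDEX) with PyMongo)
QUEUE_INDEX = [("q", ASC), ("when", ASC)]


def push_doc(qname, item, now, when=None):
    """Envelope for insertOne, tagged with the name of its queue."""
    return {"q": qname, "payload": item,
            "when": now if when is None else when,
            "retries": 0, "reserved": False}


def ready_filter(qname, now):
    """Filter shared by pop and reserve: due items of queue `qname` only."""
    return {"q": qname, "when": {"$lt": now}}


def reserve_args(qname, now, timeout=timedelta(seconds=30)):
    """Filter, update and sort for a findOneAndUpdate-based reserve."""
    return (ready_filter(qname, now),
            {"$set": {"when": now + timeout, "reserved": True}},
            [("when", ASC)])


# Example: an envelope for the hypothetical "parse" queue
example = push_doc("parse", {"line": "a"}, datetime(2024, 1, 1, tzinfo=timezone.utc))
```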

The new operation move-to-queue is expected to act upon a reserved item, and can be modelled as:

operation | implementation base
--- | ---
moveToQ | coll.findOneAndUpdate({_id: params.reserved_item._id}, {q: params.new_qname, when: now(), reserved: false, retries: 0})

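The operation is rather similar to a rollback, and it is definitely atomic: it is an update on a single document, and MongoDB guarantees atomicity for single-document writes.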
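The following hypothetical in-memory Python model shows why the move needs no second step: the input commit and the output push are the same change to one document. The sketch also sets `when` to the current time, so the item is ready at once in the next queue instead of waiting out the reservation timeout it got on input.

```python
import itertools


class PipelineCollection:
    """Hypothetical stand-in for one collection holding every queue of a
    pipeline; each envelope carries its queue name in `q`."""

    def __init__(self):
        self.docs = {}
        self._ids = itertools.count(1)

    def push(self, qname, item, now):
        doc_id = next(self._ids)
        self.docs[doc_id] = {"_id": doc_id, "q": qname, "payload": item,
                             "when": now, "retries": 0, "reserved": False}

    def reserve(self, qname, now, timeout):
        # Due items of queue `qname`, earliest `when` first
        ready = [d for d in self.docs.values()
                 if d["q"] == qname and d["when"] <= now]
        if not ready:
            return None
        doc = min(ready, key=lambda d: d["when"])
        doc.update(when=now + timeout, reserved=True)
        return doc

    def move_to_q(self, reserved_item, new_qname, now):
        # One update on one document: commit-on-input and push-on-output
        # happen together (in MongoDB, a single-document update is atomic)
        doc = self.docs[reserved_item["_id"]]
        doc.update(q=new_qname, when=now, reserved=False, retries=0)
        return doc
```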

Payload mutability

In any ETL worth its salt, the payloads of the items being managed should be mutable: otherwise, most logic would be nearly impossible, or very complicated, to express.

The operations in the model depicted above do not allow that, but nothing prevents it: it can certainly be done as part of the update section of the moveToQ and commit operations. It is simply left out here for clarity.
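As an illustration, here is how the payload change could ride along in the moveToQ update, sketched in Python as a MongoDB filter/update pair (the function name and the `new_payload` parameter are hypothetical). Since both fields change in the same single-document update, the transformation and the move happen atomically together.

```python
from datetime import datetime, timezone


def move_to_q_args(reserved_item, new_qname, now, new_payload=None):
    """Filter and update for a findOneAndUpdate implementing moveToQ.

    If `new_payload` is given, the transformed payload is written in the
    same single-document update as the queue change.
    """
    fields = {"q": new_qname, "when": now, "reserved": False, "retries": 0}
    if new_payload is not None:
        fields["payload"] = new_payload
    return {"_id": reserved_item["_id"]}, {"$set": fields}


# Example: move item 3 to a hypothetical "load" queue with a new payload
example = move_to_q_args({"_id": 3}, "load",
                         datetime(2024, 1, 1, tzinfo=timezone.utc),
                         new_payload={"n": 2})
```

Passing `None` leaves the payload untouched, so this particular sketch cannot be used to blank a payload on purpose.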