
· 9 min read

This is a continuation of Modelling queues on MongoDB - I, where we explained the technological basis for building a rather decent queue middleware by leveraging preexisting DB technologies and adding very little on top

Now we explore how to push the technology further, building on top of what we have so far to add extra, useful features

Adding delay/schedule

This is a feature seldom found in QMWs, but one that should be easy to implement if the persistence is sound: after all, if the items are safely stored, they can remain stored for any arbitrary period of time

The tricky part is to provide this feature while honoring these conditions:

  • performance should not degrade. Both push and pop should remain O(1)
  • items awaiting should not block items that are ready

On the other hand, this feature can be used to implement quite a lot of common logic, so it should be high in the wishlist. Some examples are:

  • exponential backoff if whatever you do with an item goes wrong and you want to retry later
  • simple scheduling of events or actions (items would model both)
  • with some extra logic, it's easy to build a recurring or cron-like system, where items happen periodically

As it turns out, this is quite easy to model on MongoDB while still maintaining all the features and capabilities of the good enough queues depicted before. The model can be expressed as:

operation | implementation base
push      | coll.insertOne ({payload: params.item, when: params.when OR now()})
pop       | coll.findOneAndDelete({when < now()}, {orderby: {when: $asc}}).payload

One of the obvious changes is that we no longer insert the item as is: we encapsulate it inside an envelope where we add extra information; in this case, a timestamp stating when the item becomes eligible for a pop operation. Thus, the pop will only affect items whose when timestamp lies in the past, and ignore those whose timestamp is still in the future

Then, in order to keep the performance close to O(1), we must make sure the collection has an index on when; moreover, it is advisable to order the findOneAndDelete operation by when, ascending: this adds best-effort ordering, where the elements that have been due the longest are popped first
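To make this concrete, here is a minimal sketch of the model using the node.js mongodb driver (collection layout and function names are illustrative only, not keuss's actual internals):

// illustrative sketch of a delayed queue on MongoDB; not keuss's actual code
const { MongoClient } = require ('mongodb');

async function open (url, qname) {
  const client = await MongoClient.connect (url);
  const coll = client.db ('queues').collection (qname);
  await coll.createIndex ({when: 1});  // keeps pop close to O(1)
  return coll;
}

// push: wrap the payload in an envelope carrying its eligibility timestamp
async function push (coll, item, when) {
  await coll.insertOne ({payload: item, when: when || new Date ()});
}

// pop: atomically take the oldest element that is already eligible
async function pop (coll) {
  const res = await coll.findOneAndDelete (
    {when: {$lte: new Date ()}},
    {sort: {when: 1}}
  );
  // driver v6+ returns the document (or null); older versions wrap it in {value}
  const doc = (res && res.value !== undefined) ? res.value : res;
  return doc ? doc.payload : null;
}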

Adding reserve-commit-rollback

A feature that should be offered by every decent QMW is the ability to reserve an item, process it and commit it once done, or roll it back if something fails and we want it to be retried later (possibly by another consumer)

This allows for what's known as at-least-once semantics: every item in the queue is guaranteed to be processed at least once, even in the event of consumer failure. It does not guarantee the absence of duplicates, though. By contrast, the simple pop model provides at-most-once semantics: duplicates are guaranteed not to happen, but at the cost of risking item loss if a consumer malfunctions

The reserve-commit-rollback model can be expressed as the following extension of the delay/schedule model above:

operation | implementation base
push      | coll.insertOne ({payload: params.item, when: params.when OR now(), retries: 0, reserved: false})
pop       | coll.findOneAndDelete({when < now()}, {orderby: {when: $asc}}).payload
reserve   | coll.findOneAndUpdate({when < now()}, {when: (now() + params.timeout), reserved: true}, {orderby: {when: $asc}})
commit    | coll.delete({_id: params.reserved_item._id})
rollback  | coll.findOneAndUpdate({_id: params.reserved_item._id}, {when: (now() + params.delay), reserved: false, retries: $inc})

The general idea is to leverage the existing scheduling feature: to reserve an element is just to set its when time ahead in the future, by a fixed timeout amount; if the consumer is unable to process the element in this time, the item will become eligible again for other consumers.

The commit operation simply deletes the entry using the _id of the element returned by reserve (referred to above as params.reserved_item); the rollback is a bit more complex: it clears the reserved flag, increments the retries counter and, most importantly, sets a when time further in the future. This last bit fulfills the important feature of adding delays to retries, so an element rejected by a consumer for a later retry will not become available immediately (when it is likely to fail again)

Note that the reserved flag is purely informational, although further checks could be made on it to improve robustness. The same goes for retries: it just counts the number of retries; more logic could be added here, for example a deadletter-queue feature: if the number of retries grows too high, the item is moved to a separate queue for more dedicated processing at a later time
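As before, a minimal sketch of these three operations with the node.js driver could look like this (an approximation of the model above, not keuss's actual implementation):

// reserve: atomically push the element's eligibility into the future and flag it
async function reserve (coll, timeout_ms) {
  const res = await coll.findOneAndUpdate (
    {when: {$lte: new Date ()}},
    {$set: {when: new Date (Date.now () + timeout_ms), reserved: true}},
    {sort: {when: 1}}
  );
  return (res && res.value !== undefined) ? res.value : res;  // the reserved element, or null
}

// commit: the element was fully processed, remove it for good
async function commit (coll, reserved_item) {
  await coll.deleteOne ({_id: reserved_item._id});
}

// rollback: make the element eligible again after a delay, counting the retry
async function rollback (coll, reserved_item, delay_ms) {
  await coll.findOneAndUpdate (
    {_id: reserved_item._id},
    {$set: {when: new Date (Date.now () + delay_ms), reserved: false}, $inc: {retries: 1}}
  );
}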

Queues with historic data

Here's another twist: instead of fully removing items once consumed (by means of pop or commit), we just mark them as deleted; then we keep them around for some time, in case we need to inspect past traffic or replay some items. This feature can be desirable in environments where the ability to inspect or even reproduce past traffic is paramount. It can be easily done at the expense of storage space only, with the following variation over the model above:

operation | implementation base
push      | coll.insertOne ({payload: params.item, when: params.when OR now(), retries: 0, reserved: false})
pop       | coll.findOneAndUpdate({when < now(), processed: $nonexistent}, {processed: now(), when: $INFINITE}, {orderby: {when: $asc}}).payload
reserve   | coll.findOneAndUpdate({when < now(), processed: $nonexistent}, {when: (now() + params.timeout), reserved: true}, {orderby: {when: $asc}})
commit    | coll.update({_id: params.reserved._id}, {processed: now(), when: $INFINITE})
rollback  | coll.findOneAndUpdate({_id: params.reserved._id}, {when: (now() + params.delay), reserved: false, retries: $inc})

Then, we need to add a TTL index on the new field processed, with some long-enough expiration time

The main difference is the addition of a processed field that marks both whether the item was processed (that is, deleted, no more, gone to meet its maker) and, if so, when that happened. This field is also used to delete old entries once some fixed time has elapsed. This means those queues can potentially grow very big, since the condition to remove old entries is age, not size

Note that, in order to improve performance a bit, when an element is processed (after either pop or commit) its when is set to some time far in the future (to $INFINITE and beyond, although in practice $INFINITE would be the largest date possible and not a real infinity), to move it 'away' from the pop/reserve query
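A sketch of the two changes (the TTL index plus the mark-as-processed pop), assuming for illustration a one-week retention and the same illustrative names as before:

const FAR_FUTURE = new Date ('9999-01-01');  // stands in for $INFINITE

// TTL index: mongodb removes entries on its own, some time after 'processed' is set
async function ensure_indexes (coll) {
  await coll.createIndex ({when: 1});
  await coll.createIndex ({processed: 1}, {expireAfterSeconds: 7 * 24 * 3600});
}

// pop: instead of deleting, mark as processed and push 'when' out of the way
async function pop (coll) {
  const res = await coll.findOneAndUpdate (
    {when: {$lte: new Date ()}, processed: {$exists: false}},
    {$set: {processed: new Date (), when: FAR_FUTURE}},
    {sort: {when: 1}}
  );
  const doc = (res && res.value !== undefined) ? res.value : res;
  return doc ? doc.payload : null;
}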

Queues fit for ETL pipelines: moving elements from one queue to the next, atomically

This is an interesting concept: one of the common uses of job queues is to build what's known as ETL pipelines: a set of computing stations where items are transformed or otherwise processed, connected by queues. A common example would be a POSIX shell pipeline, where several commands are tied together so the output of one becomes the input of the next; an ETL pipeline can also have forks and loops, so the topology generalizes to a graph, not just a linear pipeline

Let us assume for a moment that messages are never created or duplicated in any station: in other words, an item entering a station will produce zero or one items as output. In this scenario, one reliability problem arises: moving items from one (input) queue to the next (output) queue is usually not an atomic operation. This may lead to either item loss or item duplication if a station malfunctions, even if we use reserve-commit

If we push to the output after committing on the input, we incur the risk of loss:

whereas if we push to the output before committing on the input, we risk duplication:

So the commit-on-input and push-to-output operations must be done atomically; and it turns out it is quite simple to extend the model to accommodate that as a new, atomic move-to-queue operation (although it comes at a price, as we will see)

The easiest way to implement this operation is to require that all queues of a given pipeline be hosted in the same MongoDB collection; then, our item envelope grows to contain an extra field, q. Finally, all operations are augmented to use this new field:

operation | implementation base
push      | coll.insertOne ({q: params.qname, payload: params.item, when: params.when OR now(), retries: 0, reserved: false})
pop       | coll.findOneAndDelete({q: params.qname, when < now()}, {orderby: {when: $asc}}).payload
reserve   | coll.findOneAndUpdate({q: params.qname, when < now()}, {when: (now() + params.timeout), reserved: true}, {orderby: {when: $asc}})
commit    | coll.delete({_id: params.reserved._id})
rollback  | coll.findOneAndUpdate({_id: params.reserved._id}, {when: (now() + params.delay), reserved: false, retries: $inc})

The new operation move-to-queue is expected to act upon a reserved item, and can be modelled as:

operation | implementation base
moveToQ   | coll.findOneAndUpdate({_id: params.reserved._id}, {q: params.new_qname, reserved: false, retries: 0})

The operation is rather similar to a rollback, and it is definitely atomic

Payload mutability

In any ETL worth its salt the payloads of the items being managed should be mutable: most logic would otherwise be nearly impossible, or at least very complicated, to express.

The operations in the model depicted above do not allow that, but nothing prevents it; it can certainly be done as part of the update section of the moveToQ and commit operations. It is simply left out here for clarity
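For illustration only, a moveToQ that also mutates the payload could be sketched as a single atomic update (resetting when so the item is immediately eligible in the destination queue is a choice of this sketch, not part of the model above):

// move a reserved item to another queue and replace its payload, in one atomic update
async function move_to_q (coll, reserved_item, new_qname, new_payload) {
  await coll.findOneAndUpdate (
    {_id: reserved_item._id},
    {$set: {
      q: new_qname,            // the item changes queue...
      payload: new_payload,    // ...and payload, atomically
      when: new Date (),       // make it eligible right away in the destination queue
      reserved: false,
      retries: 0
    }}
  );
}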

· 18 min read

This is a series of articles describing the technical details on which keuss relies to build a rather complete queue middleware (QMW henceforth) as a quite shallow layer on top of MongoDB. The basic approach is well known and understood, but keuss goes well beyond it to provide extra functionality

Some nomenclature

Let us start by establishing some common nomenclature that will appear later on:

  • job queue: A construct where elements can be inserted and extracted, in a FIFO (first in, first out) manner. Elements extracted are removed from the queue and are no longer available

  • queue middleware (qmw): A system that provides queues and means for actors to perform as producers, consumers or both

  • push: action of inserting an element into a queue

  • pop: action of extracting an element from a queue

  • reserve/commit/rollback: operations that provide more control over the extraction of elements: first the element is reserved (making it invisible to other reserve or pop operations, but still present in the queue); then, once the element is processed, it is committed (and only then removed from the queue) or rolled back (meaning it is made eligible again for other reserve or pop operations, possibly after some delay); if neither commit nor rollback happens within some time, an automatic rollback is applied

  • consumer: an actor performing pop and/or reserve-commit-rollback operations on a queue. A queue can have zero or more concurrent consumers

  • producer: an actor performing push operations on a queue. A queue can have zero or more concurrent producers

  • at-most-once: consumer guarantee associated with the pop operation: since the element is first removed from the queue, and then the consumer proceeds to process it, if the consumer dies or crashes in between, the element will be lost. That is, losses are tolerated, but duplications are not

  • at-least-once: consumer guarantee associated with the reserve-commit-rollback operations: if the consumer crashes between reserve and commit the element will eventually be automatically rolled back and processed again (possibly by another consumer). Therefore, duplications are tolerated but losses are not

  • exactly-once: theoretical consumer guarantee where neither losses nor duplications can happen. It involves the use of monotonic identifiers or window-based duplicate detection, is generally extremely complex to achieve, and almost always comes with a hefty performance penalty. It is almost never offered out of the box by any QMW

  • deadletter queue: usually, there is a maximum number of times an element can be rolled back after a reserve, in order to prevent ill-formed or otherwise incorrect messages from staying forever in queues. Upon rollback, if the element has reached the maximum number of rollbacks it is removed from the queue and pushed into the deadletter queue, which is otherwise a regular queue

  • ordered queue: A non-FIFO queue: insertions are not done at the tail of the queue, but at any point. This means insertions are no longer O(1): depending on the technology used they can be O(n) or better, such as O(log(n)) for a btree-based queue; the same goes for pop/reserve operations, which are no longer O(1) either

    Using ordered queues on a QMW is key to implement certain operations: not only the more obvious ones such as delay, schedule or priorities, but also a robust and performant reserve/commit/rollback

    Using a database to implement queues makes ordered queues a breeze: a regular index is usually all you need to get near-constant-complexity operations

  • delay/schedule: Push operation where the element is marked as not eligible for pop or reserve before a certain time. The presence of delayed elements must not impact the rest of the elements in any way (that is, their eligibility must not change) or the queue itself (that is, the presence of delayed elements must not degrade the queue performance or capabilities)

    This feature can be very easily implemented using an ordered queue, where the order is defined by a timestamp representing the mature time: the time when the element can be popped or reserved, and not before

    The delay/schedule feature can be applied also to rollbacks, since a rollback is conceptually a re-insertion; delays in rollbacks provide a way to implement exponential backoff easily, to prevent busy reserve-fail-rollback loops, when one element in the queue is repeatedly rolled back upon processing

Basic building blocks

Here's what you need to build a proper QMW:

  1. A storage subsystem: The contents of the queues have to be stored somewhere. The storage has to provide:

    1. Persistency: data must be stored in a permanent manner, reliably. In-memory QMW has its niche, but we will focus on persistent QMWs
    2. High Availability: we do not want a hardware or network failure to take down the QMW. It should run in a cluster manner, on several machines (possibly in separated geographical locations); if one of the machines fails, the rest can cope with no (or minimal) disruption
    3. Sufficient Throughput: the storage should be able to handle a high number of operations per second
    4. Low Latency: operations should be performed very fast, ideally as independent of throughput as possible
  2. An event bus: all QMW clients need some form of central communication to be aware of certain events in the QMW. For example, if a client is waiting for data to become available in a queue, it should be able to simply wait for an event instead of running a busy poll loop. Another example of a useful event is signalling that a queue has been paused (since it must be paused for all clients)

    This event bus can be a stateless pub/sub bus: only connected clients are made aware of events, and there is no need to save events for clients that may connect later. This simplifies the event bus considerably

The whole idea behind keuss is that all those building blocks are already available out there in the form of database systems, and all there is to add is a thin layer and a few extras.

The need for atomic operations

However, not just any storage (or DB, for that matter) is a good candidate to model queues: there is at least one feature without which queue modelling becomes very difficult, if not impossible: atomic modify operations

An atomic modify operation in the context of a storage system can be defined as the ability to perform a read and a modify on a single record without the possibility of a second modify interfering, changing the record after the read but before the modify (or after the modify and before the read)

If the storage system provides such primitives, it is relatively easy to model queues on top of it; also, the overall performance (throughput and latency) will greatly depend on the performance of such operations: most RDBMSs can do this by packing the read and the modify inside a transaction, but that usually degrades performance greatly, to a point where it is not viable for queue modelling

There are two major storage systems that provide all the needed blocks, along with atomic modifies: MongoDB and Redis. MongoDB has turned out to be an almost perfect fit to back a QMW, as we shall see: it provides a set of atomic operations to read-and-modify and to read-and-remove, which guarantee that an element selected to be read and then modified (or removed) will not be read by others until it is modified (or will not be read at all if it is removed)
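To see why this matters, compare a naive read-then-delete against the atomic equivalent (a purely illustrative node.js sketch):

// RACY: two consumers may findOne() the same document before either one deletes it
async function racy_pop (coll) {
  const doc = await coll.findOne ({});
  if (doc) await coll.deleteOne ({_id: doc._id});  // another consumer may already have taken it
  return doc;
}

// ATOMIC: find-and-remove happens as a single step; no two consumers get the same document
async function atomic_pop (coll) {
  const res = await coll.findOneAndDelete ({});
  return (res && res.value !== undefined) ? res.value : res;
}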

Redis is also a good fit, but it does not provide a good enough storage layer: it is neither persistent nor highly available. Arguably, that's just the default behaviour: Redis Cluster coupled with proper persistence should in theory be up to the task. However, this series of articles will focus on MongoDB only. For now, let us just say that atomic operations are very easily added to Redis by coding them as lua extensions, since all operations in Redis are atomic by design

In the following sections we will see how common QMW operations can indeed be implemented elegantly using the atomic operations provided by MongoDB as the underlying DB/storage

Simple approach: good enough queues

There is a very simple, very common way to model queues on top of MongoDB collections. This model supports neither reserve-commit-rollback nor delay/schedule. It can be succinctly put as:

operation | implementation base
push      | coll.insertOne (item)
pop       | coll.findOneAndDelete()

The key is the use of the atomic operation findOneAndDelete, which is the combination of a findOne and a remove, but run in one single step. Several actors can perform concurrent findOneAndDelete operations without issues, and without interfering with each other

Actors can perform concurrent insertOne operations too, without interference; the same goes for actors performing both findOneAndDelete and insertOne operations. The net result is that many consumers and producers can be served concurrently without interference or loss of performance, which is what one expects of any self-respecting QMW
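A quick, illustrative sketch of this concurrency at work: several consumers and one producer sharing the same collection, with each document delivered to exactly one consumer and no coordination beyond the atomic operation itself:

// each consumer pops in a loop; findOneAndDelete guarantees no document is delivered twice
async function consumer (coll, id) {
  for (;;) {
    const res = await coll.findOneAndDelete ({});
    const doc = (res && res.value !== undefined) ? res.value : res;
    if (!doc) { await new Promise (r => setTimeout (r, 1000)); continue; }  // poll wait (see below)
    console.log ('consumer %d got seq %d', id, doc.payload.seq);
  }
}

async function producer (coll) {
  for (let i = 0; i < 1000; i++) await coll.insertOne ({payload: {seq: i}});
}

// e.g. run five consumers and one producer concurrently against the same collection:
// [1, 2, 3, 4, 5].forEach (id => consumer (coll, id));
// producer (coll);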

This model provides a very simple but rather capable QMW:

  • queues are mostly strict FIFO (FIFO loses its strict meaning when different producers located in different machines are inserting in the same queue, but in practical terms it usually does not matter)
  • we got very good persistence, as good as mongodb's
  • we got very good HA:
    • both consumers and producers have no state, so they can be replicated without problems
    • there is a practical 1:1 equivalence between queues and collections, so all the HA guarantees mongodb provides on collections apply directly to queues
  • we got more than decent performance:
    • mongodb is quite performant on insertions, in the kHz range (that is, thousands of operations per second)
    • on pop operations, findOneAndDelete is less performant than a simple remove or findOne, but it is still able to reach kHz rates. In practice, findOneAndDelete is the bottleneck of this model, because it serializes pop calls within each queue

The main drawback of this model is that pop/reserve operations can only be performed in a poll loop: they either return an element, return 'no elements in queue' or return an error, in all cases pretty much immediately. Therefore, a wait state of arbitrary duration must be inserted in the consumer loop whenever the operation returns 'no elements in queue': otherwise you get a busy loop where your pop/reserve calls relentlessly return 'no elements', eating CPU in the process (incidentally, a textbook case of a poll loop)

In some cases, where latencies in the range of seconds or tens of seconds are of no concern, a poll loop can be happily used, so this makes a valid, simple and effective model, especially if you already use MongoDB. In cases where latencies are expected to be near-realtime, something better is needed

Adding an event bus

At this point, one of the best improvements to the model is to remove the need for poll loops; for that to happen, we need the pop/reserve operations to 'block' while there are no elements, until there are. A naïve way to do so is to add the poll loop inside the pop/reserve calls, so the caller has the illusion of blocking:

As mentioned, this simply moves the poll loop inside the pop/reserve implementation, away from the user's eyes. But it is still a poll loop, with all its limitations. To truly remove the poll loop we need the ability to wake up a waiting consumer when new elements arrive in the queue:

This way the push-to-pop latencies are reduced to close to the theoretical minimum: a consumer is blocked only when it has to be: when there are no elements

Possible implementations

There is an obvious option for the implementation of such an event bus: a pub/sub subsystem. Pub/sub is very well understood, it's stateless and there are a lot of stable implementations. More importantly, there are stable implementations on top of what we already use to store the queues.

The main disadvantages of pub/sub as an event bus are:

  • they cannot handle duplicates: in an HA setup all the replicas of a given client will get a separate copy of each event
  • they have no history: disconnected clients will miss any event published when they're not connected

But none of those is a real disadvantage for us:

  • each client, whether a replica or not, must receive a copy of each event
  • disconnected clients do not need to receive and react to events, since they're not dealing with queues

Let's see the most viable implementations:

In-memory pub/sub

This is a very simple, extremely nimble implementation of a pub/sub that works only within a single (OS) process. It is mostly meant to be used for testing. It can also be seen as the canonical implementation of the event bus

A very good and very simple implementation for node.js is mitt
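For instance, a wake-up channel on top of mitt could be as small as this (the usage and queue names are illustrative, not keuss's internals):

const mitt = require ('mitt');
const bus = mitt ();

// consumer side: wake up whoever is idle-waiting on queue 'q1'
bus.on ('insert', ev => {
  if (ev.q == 'q1') {
    // resolve the promise the idle consumer is awaiting on
  }
});

// producer side: signal that queue 'q1' just received an element
bus.emit ('insert', {q: 'q1'});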

Redis pub/sub

redis offers a simple and very efficient pub/sub implementation, which can be used as is. If you already use redis it's definitely the way to go

MongoDB capped collection (also a pub/sub)

Since mongoDB is used to back the queues, it would be great if it could also power the rest of the needed subsystems; and it can, with just a little implementation work: it is relatively easy to build a pubsub on top of mongoDB capped collections, and there are quite a few implementations readily available. One good example for node.js is mubsub

Using this implementation has the added appeal of not adding any extra dependency: you can just use the same mongoDB server used for the queues

This implementation has the added benefit of keeping history: it operates like a ring buffer, so past events can be accessed. However, this is not needed at all here
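The underlying trick is a capped collection read through a tailable cursor; a very rough sketch of the idea (not mubsub's actual code) looks like this:

// rough sketch of a pubsub over a capped collection (this is what libraries like mubsub wrap)
async function open_events (db) {
  try {
    await db.createCollection ('events', {capped: true, size: 1024 * 1024});
  } catch (e) { /* collection already exists */ }
  return db.collection ('events');
}

async function publish (events, topic, data) {
  await events.insertOne ({topic, data, at: new Date ()});
}

async function subscribe (events, topic, cb) {
  // tailable + awaitData: the cursor stays open and yields new documents as they are inserted
  // (a tailable cursor needs at least one matching document to start; real implementations handle this)
  const cursor = events.find ({topic}, {tailable: true, awaitData: true});
  for await (const ev of cursor) cb (ev.data);
}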

Practical considerations & improvements

There are a few considerations worth noting about how pubsub fits our purpose, and a few extras we can add to improve matters further

Race conditions

A pubsub bus is asynchronous in nature, so under some conditions you may lose events. Take for example the case of a single client that reinserts elements into the same queue it is consuming from: the event fired upon the insertion is produced at about the same time the client attempts to pop another element. If the pop operation establishes that the queue is empty (because the queue was not yet updated by the insert) but the insertion event arrives before the pop operation starts waiting for events, you lose the event and risk waiting forever, even though the queue does have one element

Race conditions such as this one are very hard to prevent entirely, if at all possible. For that reason it is recommended to use a model in which race conditions do not cause major issues

In a system such as a QMW the problems to avoid at all costs are:

  • loss of messages
  • duplication of messages
  • deadlocks and other forms of wait-forever conditions

Race conditions on wake-up events won't cause loss or duplication of messages, since this is guaranteed by the queue model; they can however cause deadlocks, where a consumer is left waiting forever for an event that may never arrive

One way to remove this problem is to add a timeout and a poll loop: in the absence of events, the consumer falls back to a poll loop with a rather long period (this period being the wait-for-events timeout), usually in the range of tens of seconds. With this, deadlocks turn into mere poll cycles, or at worst into increased latency in some rare cases

The consumer pop loop would look like this:

forever do:
  msg := pop_msg_from_queue()  // nonblocking operation: either returns a message or null if none available
  if (!msg) do:
    await wake_up(insertion) or timeout(period)
    continue  // a wakeup event arrived or the timeout was reached: next loop either way
  else:
    return msg
  done
done
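In node.js terms this could translate into something like the following sketch, where wake_up() stands for whatever promise the event bus hands out (illustrative only, not keuss's code):

// consumer loop: nonblocking pop, parking on the event bus (or a long timeout) when empty
const sleep = ms => new Promise (resolve => setTimeout (resolve, ms));

async function consume (coll, wake_up, period_ms) {
  for (;;) {
    const res = await coll.findOneAndDelete ({when: {$lte: new Date ()}}, {sort: {when: 1}});
    const msg = (res && res.value !== undefined) ? res.value : res;
    if (msg) return msg.payload;

    // nothing eligible: wait for an insertion event, or fall back to the poll period
    await Promise.race ([wake_up (), sleep (period_ms)]);
  }
}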

High cardinality of events

A side effect of publishing 'insert' events is that subscribers must be ready to deal with potentially enormous amounts of them: if you're inserting messages in queues at, say, 1 kHz and you have 50 consumers, there are 50,000 individual events per second to deal with. Besides, most of those events carry no information at all: once a consumer is woken up, it is not interested in insert events until the queue is empty again; queue consumers simply ignore events emitted while they are not idle, but the raw amount of events on the bus might still require a noticeable amount of compute and I/O (especially on non-local pubsubs)

A simple way to minimize this is to drop an event if an equivalent one was already emitted within a short period of time; if that is the case, the event is dropped right before the publish, and does not make it to the pubsub at all

Keuss uses exactly this strategy on all the included signal pubsubs, using a window of 50 ms: if the same event was emitted for the same queue within the past 50 ms, it is ignored
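The dedup logic itself is tiny; a sketch of the idea (the 50 ms window matches keuss's choice, everything else is illustrative):

// drop an event if an equivalent one was already published within the window
const WINDOW_MS = 50;
const last_emitted = {};  // key 'queue|type' -> timestamp of the last publish

function publish_dedup (bus, queue, type, data) {
  const key = queue + '|' + type;
  const now = Date.now ();
  if (last_emitted[key] && (now - last_emitted[key]) < WINDOW_MS) return;  // dropped, never published
  last_emitted[key] = now;
  bus.emit (type, {q: queue, data});
}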

Drawbacks

This strategy has a notable drawback: it introduces an apparent race condition.

Take a queue with a single consumer; insert a single message in the queue, which will immediately be taken by the consumer. If the consumer rejects the message with zero delay, it may still see zero elements in its next iteration, so it will block and wait for insert events. The reject does indeed produce an insert event... which is dropped because it is equivalent to the first insert event, emitted at about the same millisecond

This is just a nuisance, since the consumer will time out and rearm itself eventually. But be warned: this can happen in edge cases

Adding another pub/sub implementation

The interface for pubsub in Keuss is very simple, so adding new or different implementations would be quite easy. For example, if you already use mqtt, it makes sense to reuse it to power the event pubsub. This is however out of scope

Final thoughts

At this point we have a rather decent QMW capable of push/pop with concurrent producers and consumers, with persistence and HA, able to manage operations at kHz rates with millisecond latencies; all this with a quite simple and stateless implementation

This model can already solve a great deal of problems where persistent job queues are needed, especially if you already have MongoDB in your mix. Also, it has three advantages over traditional QMWs:

  1. Performance: This model produces great performance figures when compared with traditional QMWs with full persistence/HA activated
  2. Simplicity: the whole of the implementation is client side, and it is stateless and very thin.
  3. Ease of debug: it is very easy to open the trunk, peek inside and see exactly what's in each queue, and it equally easy to tweak and fix whatever problem you find. In some situations this is an invaluable feature

However, we can do better...

· One min read

We got a brand new site for Keuss, powered by Docusaurus v2. Chances are, if you're reading this, you already found the site...

Good old README.md, as simple and convenient as it is, was becoming way too big and difficult to navigate. Also, it is not a proper site worth presenting to the world; hence the decision to add a small, simple enough site to Keuss