AmqpDelayer

AmqpDelayer is an application that handles delayed messages for RabbitMQ message queues. It reads messages from a RabbitMQ queue and, based on the "timestamp" header property, holds each message until its designated delay expires. When the delay expires, AmqpDelayer publishes the message back to RabbitMQ for further processing. Messages without a timestamp, or those marked for immediate release, are forwarded right away. Messages can be stored on disk or in MongoDB.

We developed this application for MailerQ users seeking alternatives to prolonged message storage in RabbitMQ. When an email is rescheduled for delayed delivery, such as due to temporarily unreachable mail servers or graylisting, MailerQ returns it to RabbitMQ, where it is potentially held for up to 48 hours. Since RabbitMQ isn’t optimized for such long-term storage, AmqpDelayer can serve as an optional intermediary, offloading these retry messages from RabbitMQ and retaining them on disk or in MongoDB until they’re ready for re-queueing.

Although the application was originally developed for use in combination with MailerQ, it is a generic application that can be used in all sorts of RabbitMQ-based topologies where messages need to be offloaded from RabbitMQ for a while.

Installation

The application is stored in our APT repository. If you have already enabled our APT repository, you can install it with this command:

sudo apt install mailerq-amqpdelayer

Integration with MailerQ

A typical use case is to route the MailerQ retries to a dedicated queue and configure AmqpDelayer to read from this queue. AmqpDelayer then routes its output back to the outbox queue, where MailerQ picks the messages up again for further delivery.

Running

You typically run this program from the command line. It reads its settings from the config file /etc/copernica/amqpdelayer.txt, but you can also pass settings as command line arguments, for example:

amqpdelayer --rabbitmq-address amqp://user:password@hostname/vhost

Configuration

All options can be supplied in the system-wide config file (/etc/copernica/amqpdelayer.txt), as command line arguments, or as environment variables. For an environment variable, the setting name is converted to uppercase and dashes are replaced by underscores. The following settings are thus equivalent:

All other configuration settings can be converted in a similar manner.
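The naming rule described above can be sketched in a few lines of Python. This only illustrates the conversion and is not the application's own parser. The helper names are hypothetical, and rabbitmq-address is the only setting name taken from this document.

```python
def to_env_name(setting: str) -> str:
    """Environment variable form: uppercase, dashes replaced by underscores."""
    return setting.upper().replace("-", "_")


def to_cli_flag(setting: str) -> str:
    """Command line form: the setting name prefixed with two dashes."""
    return "--" + setting


if __name__ == "__main__":
    name = "rabbitmq-address"
    print(to_env_name(name))
    print(to_cli_flag(name))
```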

Supported options for RabbitMQ

AmqpDelayer reads messages from a RabbitMQ message queue, holds them for a while, and then publishes them back to RabbitMQ. The following options can be used to configure this:

If you leave the routing key empty, messages are published with their original routing key. If usereplyto is set to true, messages with a reply-to set in their envelope are sent back to that reply-to exchange, and the exchange from the config file is ignored.

Supported options for storage

There are two types of storage supported: disk-based storage and MongoDB-based storage.

Disk-based storage is implemented using LMDB and uses separate data and index files. The mapsize config file setting holds the max size for such files. It is not used for MongoDB-based storage.

AmqpDelayer does not use locks or other mechanisms to prevent data races: if you run multiple AmqpDelayer instances, each one must have its own directory or MongoDB collection where data is stored.

Options to limit message delays

Messages are delayed based on the timestamp header in the message. When it is set to a future timestamp, messages are stored. Other messages are immediately published back to RabbitMQ. The following settings can be used to further control this:

These settings can be used to prevent messages that expire soon from being sent to storage. All values are in seconds.
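The store-or-forward decision can be pictured roughly as follows. This is a minimal sketch and not the application's code. It assumes the timestamp is a Unix time in seconds, and the min_delay parameter is a hypothetical stand-in for a "do not store messages that expire soon" setting.

```python
from typing import Optional


def should_store(timestamp: Optional[float], now: float, min_delay: float = 0.0) -> bool:
    """Return True to send the message to storage, False to publish it right away.

    timestamp: value of the message's timestamp header (assumed Unix seconds),
               or None when the header is absent.
    now:       current time in the same unit.
    min_delay: hypothetical threshold in seconds; messages that would expire
               within this window are forwarded instead of stored.
    """
    if timestamp is None:
        # No timestamp header: nothing to wait for.
        return False
    # Only store when the remaining delay exceeds the threshold.
    return (timestamp - now) > min_delay
```

With min_delay left at 0, any message with a future timestamp is stored, which matches the default behaviour described above.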

Operation and shutdown procedure

The application normally takes care of both delaying and reviving messages. With the mode setting, you can configure the application to just delay messages (without ever reviving them), or to just revive them (without ever delaying):

When the application shuts down, messages are kept in storage and are picked up again when the application is restarted. You can alternatively set the shutdown procedure to "return" or "forward". With "return" all stored messages are published back to the queue from which they were read, and with "forward" the messages are passed to the configured exchange and routingkey as if they already expired.

The shutdown option could be helpful when running in a containerized environment with non-persistent storage (like emptyDir on Kubernetes). Messages are published back to RabbitMQ when the application closes down and storage is removed. The shutdown procedure is not implemented for MongoDB-based storage.
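The difference between the two shutdown procedures can be sketched as a small routing function. This is an illustration under assumptions, not the actual implementation. In particular, publishing to the source queue through the AMQP default exchange (empty exchange name, routing key equal to the queue name) is assumed here, and all function and parameter names are hypothetical. Only the values "return" and "forward" come from this document.

```python
from typing import NamedTuple, Optional


class Target(NamedTuple):
    exchange: str
    routing_key: str


def shutdown_target(procedure: Optional[str], source_queue: str,
                    exchange: str, routing_key: str,
                    original_routing_key: str) -> Optional[Target]:
    """Decide where a stored message goes at shutdown; None means keep it in storage."""
    if procedure == "return":
        # Assumption: the default exchange ("") routes by queue name.
        return Target("", source_queue)
    if procedure == "forward":
        # Same destination as a message whose delay has expired; an empty
        # configured routing key falls back to the original routing key.
        return Target(exchange, routing_key or original_routing_key)
    # Default: messages stay in storage until the next start.
    return None
```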

RabbitMQ and storage tuning

AmqpDelayer uses several settings to improve throughput for incoming messages and publishing to RabbitMQ. Below are key configuration options to tune RabbitMQ quality of service and storage operations.

RabbitMQ Quality of Service (QoS) and transaction sizes

The group size sets the write transaction’s size. A smaller group size keeps messages in memory for less time, while a larger group size reduces write frequency. The application postpones flushing messages to storage until the transaction is full ('groupsize' messages have been read from RabbitMQ), or until the timeout is reached.

Higher QoS and groupsize values allow AmqpDelayer to load multiple messages into memory, bundling them into larger write transactions. QoS should always be set higher than the group size to ensure efficient bundling. Messages are acked to RabbitMQ after they are written to storage.
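The grouping behaviour described above can be sketched as a small buffer that flushes when it is full or when a timeout passes, and acks only after the write. This is a simplified model, not the real implementation. The class and callback names are hypothetical, and measuring the timeout from the first message of a group is an assumption.

```python
from typing import Callable, List, Optional, Tuple


class GroupWriter:
    """Collects messages and writes them to storage in groups."""

    def __init__(self, groupsize: int, timeout: float,
                 write: Callable[[List[bytes]], None],
                 ack: Callable[[int], None]) -> None:
        self.groupsize = groupsize
        self.timeout = timeout
        self._write = write            # stores one group in a single transaction
        self._ack = ack                # acknowledges one delivery tag to RabbitMQ
        self._pending: List[Tuple[int, bytes]] = []
        self._first_at: Optional[float] = None

    def add(self, tag: int, body: bytes, now: float) -> None:
        """Buffer a message; flush as soon as the group is full."""
        if not self._pending:
            self._first_at = now       # assumption: timeout starts at first message
        self._pending.append((tag, body))
        if len(self._pending) >= self.groupsize:
            self.flush()

    def tick(self, now: float) -> None:
        """Call periodically; flushes a partial group once the timeout is reached."""
        if self._pending and now - self._first_at >= self.timeout:
            self.flush()

    def flush(self) -> None:
        if not self._pending:
            return
        group, self._pending = self._pending, []
        self._first_at = None
        self._write([body for _, body in group])
        # Ack only after the write has completed.
        for tag, _ in group:
            self._ack(tag)
```

Because unacked messages count against the QoS limit, a QoS below the group size would stall such a buffer until the timeout fires, which is why the QoS should be the larger of the two.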

Throttling and message revival

To avoid overwhelming RabbitMQ when republishing messages, use these settings:

The throttle prevents AmqpDelayer from sending too many messages to RabbitMQ simultaneously. Once the throttle limit is hit and this number of publish operations are in flight, AmqpDelayer pauses loading from storage until at least batchsize messages are acknowledged. After that, it proceeds with loading more messages from storage.

These two settings combined ensure that RabbitMQ is not flooded with publish operations on the one hand, and at the same time that the storage platform is not flooded with many small read operations. Batchsize must be smaller than the throttle.
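The interplay between throttle and batchsize can be modelled with a simple in-flight counter. The sketch below is an illustration of the rule as described here, not the application's code, and the class and method names are hypothetical.

```python
class ReviveThrottle:
    """Tracks unacknowledged publishes and decides how much to load from storage."""

    def __init__(self, throttle: int, batchsize: int) -> None:
        if batchsize >= throttle:
            raise ValueError("batchsize must be smaller than throttle")
        self.throttle = throttle
        self.batchsize = batchsize
        self.in_flight = 0

    def load_count(self) -> int:
        """Number of messages to load from storage now; 0 means stay paused."""
        room = self.throttle - self.in_flight
        # Wait until at least batchsize publishes have been acknowledged,
        # so storage is not hit with many small reads.
        return room if room >= self.batchsize else 0

    def published(self, count: int) -> None:
        self.in_flight += count

    def acknowledged(self, count: int) -> None:
        self.in_flight -= count
```

With a throttle of 100 and a batchsize of 20, loading stops once 100 publishes are in flight and resumes with a read of 20 messages after 20 of them have been acknowledged.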

Built-in webserver

The application has a small built-in web server that exposes some metrics that can be used in a Prometheus stack. There are two supported pages: / and /metrics.