Using Celery with RabbitMQ's Lazy Queue

2020-04-15 / #celery #rabbitmq #lazy queue


Celery & RabbitMQ basics

Skip this part if you know what Celery & RabbitMQ is.

Celery is a task queue system in Python. Celery can help you run something in the background, schedule cronjobs and distribute workloads across multiple servers.

RabbitMQ is a message broker. A message broker is a program to help you send messages. Celery needs to use a message broker to send & receive messages, and they highly recommend using RabbitMQ.

What is Lazy Queue?

When you publish a message into a RabbitMQ's regular queue, the message is stored into the memory so they can be delivered to consumers pretty fast. RabbitMQ will try to keep as much as messages it can until it hit a memory threshold. When it happens, RabbitMQ will save messages to disk("page out"), which will block the queue and prevent publishers from publishing any new messages.

As you may notice, if you have a lot of messages to publish, a regular queue will

  1. use a lot of memory, depending on your message size and number
  2. sometimes stop receiving new messages ( when paging out ), make the performance less predictive.

A lazy queue will try to save the message to disk as early as possible, therefore minimize the memory usage ( use constant space in most cases ) with the cost of throughput time, disk I/O and CPU usage.

Should I use Lazy Queue?

I'd always use lazy queues unless there are some really high-performance requirements. Lazy queues use significantly less memory and can hold a lot more messages than regular queues. CloudAMQP recommends to enable lazy queues to get predictable performance..

But before you do, I highly recommend you to carefully read the Caveats and Limitations section of lazy queues' documentation, and do some benchmarks yourself.

How to Celery with RabbitMQ's Lazy Queue

Specify queue arguments in Celery

Say you have a celery app

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

app.conf.task_routes = ([
    ('account.tasks.*', {
        'queue': 'account'
    }),
    ('server.tasks.*', {
        'queue': 'server'
    })
])

You can specify queue_arguments to make queues lazy.

from celery import Celery
from kombu import Queue

app = Celery('tasks', broker='amqp://guest@localhost//')

app.conf.task_queues = [
    Queue('account', queue_arguments={'x-queue-mode': 'lazy'}),
    Queue('server', queue_arguments={'x-queue-mode': 'lazy'}),
]

app.conf.task_routes = ([
    ('account.tasks.*', {
        'queue': 'account'
    }),
    ('server.tasks.*', {
        'queue': 'server'
    })
])

Or, if you haven't specified any task routes

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

In this case, Celery will route them into a default queue called celery by default. You need to change the default queue into lazy mode.

from celery import Celery
from kombu import Queue

app = Celery('tasks', broker='amqp://guest@localhost//')

app.conf.task_queues = [
    Queue('celery', queue_arguments={'x-queue-mode': 'lazy'}),
]


@app.task
def add(x, y):
    return x + y

If the queue is already created, you may encounter a PRECONDITION_FAILED error saying the queue was declared with inequivalent arg 'x-queue-mode'. You should consider deleting the queue ( WILL LOSE ALL MESSAGES INSIDE IT ) or changing queue mode via policy.

Changing Queue Mode using RabbitMQ's policy

You can also change queues' mode using RabbitMQ's policy. This allows you to change an existing queue's mode without removing & recreating it. You can set a policy in shell using rabbitmqctl:

rabbitmqctl set_policy Lazy "^celery$" '{"queue-mode":"default"}' --apply-to queues

The above command changes the queue celery into lazy mode.

I prefer to specify queue arguments in Celery since it can be version controlled and don't require extra operations on the production server.

Enjoy.