Celery and RabbitMQ timeouts and connection resets

celery django rabbitmq

I'm using RabbitMQ 3.6.0 and Celery 3.1.20 in a Django application on a Windows 10 machine. Everything is running on the same computer. I've configured Celery to acknowledge tasks late (CELERY_ACKS_LATE = True), and now I'm getting connection problems.

I start the Celery worker, and after 50-60 seconds of handling tasks each worker thread fails with the following message:

Couldn't ack ###, reason:ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)

(### is the number of the task)

When I look at the RabbitMQ logs I see this:

=INFO REPORT==== 10-Feb-2016::22:16:16 ===
accepting AMQP connection <0.247.0> (127.0.0.1:55372 -> 127.0.0.1:5672)

=INFO REPORT==== 10-Feb-2016::22:16:16 ===
accepting AMQP connection <0.254.0> (127.0.0.1:55373 -> 127.0.0.1:5672)

=ERROR REPORT==== 10-Feb-2016::22:17:14 ===
closing AMQP connection <0.247.0> (127.0.0.1:55372 -> 127.0.0.1:5672):
{writer,send_failed,{error,timeout}}

The error occurs exactly when the Celery workers are getting their connection reset.

I thought this was an AMQP Heartbeat issue, so I've added BROKER_HEARTBEAT = 15 to my Celery settings, but it did not make any difference.

Best Answer

I was having a similar issue with Celery on Windows, running long-running tasks with concurrency=1. The following configuration finally worked for me:

CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1

I also started the celery worker daemon with the -Ofair option:

celery -A test worker -l info -Ofair

In my limited understanding, CELERYD_PREFETCH_MULTIPLIER sets how many messages each worker process reserves (prefetches) from the broker ahead of time. By default it is set to 4. If you set it to 1, each worker process reserves only one message at a time and completes that task before it consumes another. I was having issues with long-running tasks because the connection to RabbitMQ was consistently lost in the middle of a long task, and the task was then re-attempted whenever other messages were waiting in the Celery queue.
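
To make the prefetch arithmetic concrete, here is a minimal sketch. It assumes the worker reserves up to (multiplier × concurrency) unacknowledged messages at a time, which is how the Celery documentation describes the prefetch limit. The helper name prefetch_limit is hypothetical and not part of Celery.

```python
def prefetch_limit(multiplier: int, concurrency: int) -> int:
    """Approximate number of unacknowledged messages a worker may hold.

    Hypothetical helper for illustration only; assumes the limit is
    the prefetch multiplier times the number of worker processes.
    """
    return multiplier * concurrency


# Default multiplier (4) with a single worker process:
# several messages can sit reserved behind one long-running task.
print(prefetch_limit(4, 1))

# Multiplier of 1 with a single worker process:
# only the message currently being worked on is reserved.
print(prefetch_limit(1, 1))
```

With late acknowledgement, every reserved message stays unacknowledged on the connection until its task finishes, so a smaller limit means fewer messages are affected if the connection drops.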

The following option was also specific to my situation:

CELERYD_CONCURRENCY = 1

Setting concurrency to 1 made sense for me because I had long-running tasks that needed a large amount of RAM, so each one needed to run on its own.
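
For reference, the settings mentioned in this answer could sit together in the Django settings module roughly as follows. This is only a sketch using the Celery 3.1 setting names quoted above; the comments reflect my reading of each option, and the concurrency value is specific to my workload, not a general recommendation.

```python
# settings.py (Celery 3.1-style setting names, as used in this answer)

# Acknowledge a message only after its task has finished,
# rather than just before the task starts executing.
CELERY_ACKS_LATE = True

# Reserve one message per worker process instead of the default four.
CELERYD_PREFETCH_MULTIPLIER = 1

# Run a single worker process; chosen here because each task
# needed a large amount of RAM (an assumption about my workload).
CELERYD_CONCURRENCY = 1
```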
