Celery on AWS ECS - Complete Guide

June 29, 2026

Celery on AWS ECS - how hard can it be? One would expect this to be reasonably trivial to set up. You set up a message broker like RabbitMQ, create an ECS service to deploy Celery workers with autoscaling, and then simply start submitting and processing tasks. Following these steps is enough to make it work. But you might soon realize that things are just slightly off. Some tasks are lost, some fail without an error, and some are never processed until the end. Happened to me, and I know it happened to other people as well. So the question is - what's wrong?

In this article, we'll take a look at all the problems you might encounter while using Celery on AWS ECS. Then we'll also look at the solution to each problem.

You can find the repository with the full example here.

Scalable FastAPI Applications on AWS

Build scalable applications with FastAPI and Terraform that run on AWS!

Take Course

Tasks are lost

Interruptions are an inevitable fact of software operations. They might be more or less frequent, but they are always there. The more frequent the scaling and deployments, the more likely interruptions are - by design. On AWS ECS, there are two main sources of task interruptions:

  • Downscaling
  • Deployment

In both cases, ECS is stopping tasks that are currently running. First, it sends SIGTERM. Then it waits for the grace period that's defined inside the task definition. The hard upper limit is 120 seconds. After that, SIGKILL is sent, and the task is forcefully killed.

By default, when Celery receives SIGTERM, it initiates a warm shutdown. That means it waits until the task currently being processed is complete. If this takes more than 120 seconds, the ECS task is killed, and the task gets lost. It gets lost due to Celery's default settings:

  • Tasks are acknowledged as successful as soon as they are submitted: task_acks_late = False.
  • Tasks are not rejected on worker lost: task_reject_on_worker_lost = False
  • Tasks are considered as submitted without broker's confirmation: broker_transport_options = {"confirm_publish": False}.

Tasks are never fully processed

Interruptions don't apply only to lost tasks. If they are frequent and your tasks are too long, some tasks may never be fully processed because there's never a long enough window to go through all the steps. For example, if you deploy 24/7 every 30 minutes, all of your tasks must always be executed in less than 30 minutes. If that's not the case, you might try processing the task multiple times, but it will always be interrupted by the next deployment. Such a task will never be removed from the queue. Eventually, you end up with a completely clogged queue.

In other setups, the warm shutdown might forgive you that, but not on ECS. As described above, SIGKILL is always sent at most 120 seconds after SIGTERM.

Tasks are delayed for a long time

By default, Celery performs task prefetching. Instead of loading a single message before each execution, it keeps a buffer of 5 messages by default to reduce the latency. The reality is that if tasks take more than a fraction of a second, this latency is negligible. It actually causes another problem. These messages are stored in the worker's memory. When using Redis, if the worker is killed, prefetched tasks are retried only after the visibility timeout. With RabbitMQ, they are re-queued when the connection drops. Anyhow, there's another problem. Prefetched tasks might be unnecessarily delayed by the task currently being processed. There might be other free workers who could already process these tasks during that time. So the throughput of the whole system might be lower than it could be.

Tasks are not interrupted by soft-timeout

One of Celery's features is soft-timeout. You can configure the Celery worker to interrupt a task's processing after a certain amount of time. This raises a SoftTimeLimitExceeded exception, which you can intercept and handle with some cleanup or similar. Unfortunately, this can be raised at times, such as during garbage collection. In such cases, there's no caller to receive the exception - so Python goes into the unraisable exception flow. By default, it logs a warning, so your task continues executing without interruption.

Duplicated processing

Celery implements the competing consumers pattern with its broker and workers. What does that mean? There's a single channel - the message broker. All messages for tasks are sent to it. On the other side, there are workers reading from this single message broker. They all try to get the next free message in line - they are competing for it.

This can lead to duplicated processing as two workers pick up the same message. In some cases, that's harmless, but if workers are accessing shared resources, it can cause real damage. For example, imagine workers reading records from a file, cleaning them, and inserting them into the database. Since there are no existing records to lock (e.g., SELECT ... FOR UPDATE ...), you may end up with duplicated records inside the database.

Celery setup on AWS ECS

So far, we've mentioned only the problems. Let's take a look at how to configure Celery for success on AWS ECS.

Infrastructure

First of all, if you can, I suggest using RabbitMQ from Amazon MQ. It simply comes with better message handling than Redis. It's more expensive, but I think it's worth it if you're doing things on any decent scale. If not, feel free to use Redis.

Second, I suggest setting up one ECS service per queue. Try to keep the number of queues to a minimum to minimize infrastructure complexity. Unless you have a good reason, pick ECS Fargate as the deployment target. This way, you don't need to manage EC2 servers. If you need more workers, you simply scale up.

You can learn how to effectively manage AWS infrastructure with Terraform (incl. ECS, RDS, and VPC) inside my course Scalable FastAPI Applications on AWS.

Settings

As mentioned above, Celery doesn't come with the best defaults for today's applications. For example, task_reject_on_worker_lost is set to False by default to prevent an out-of-memory task from clogging the queue by repeatedly killing the workers. Anyhow, within AWS, especially with Fargate, that's not a concern. If it happens, simply increase the allocated resources and redeploy. You can then investigate and fix the root cause later.

So, for successful processing, we need to change the following defaults:

  • task_acks_late -> True
    • To treat tasks as successfully processed only after processing. Otherwise, tasks are not retried.
  • task_reject_on_worker_lost -> True
    • To ensure tasks are retried if workers die for any reason (e.g., warm shutdown + SIGKILL).
  • worker_prefetch_multiplier -> 1
    • To avoid unnecessarily delayed tasks.
  • broker_connection_retry_on_startup -> True
    • To make startups more reliable.
  • broker_transport_options -> {"confirm_publish": True}
    • To avoid unsubmitted tasks due to message transport issues.

Additionally, make sure exponential retries are enabled. This way, you ensure that tasks are retried in the event of an interruption.

class Task(celery.Task):
    autoretry_for = (Exception,)
    max_retries = 5
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True


celery_app = celery.Celery(
    __name__,
    task_cls=Task,
)

With this in mind, try to keep visibility timeout short. It must not be shorter than the longest task execution time. You can enforce this with soft-timeout or hard-timeout. You should always keep visibility timeout slightly above the timeout limit - just in case.

Last but not least, use signals to exit in case of an unraisable error to make sure soft-timeout is respected:

def _abort_on_unraisable_errors(unraisable: Any) -> None:
    exc_type = unraisable.exc_type
    exc_value = unraisable.exc_value

    if exc_type is SoftTimeLimitExceeded or isinstance(exc_value, SoftTimeLimitExceeded):
        LOGGER.warning("Soft time limit exceeded")

    signal.raise_signal(signal.SIGQUIT)


sys.unraisablehook = _abort_on_unraisable_errors

This ensures that the process exits upon encountering an unraisable error.

Celery Task design

Once you have the settings in place, ensure your tasks are designed to be processed quickly enough within the given constraints. There are two techniques you can use to achieve that: fan out and batching.

Fan out

The first one is called fan out. You can split the work between a "scheduler" task and "execution" tasks. For example: Instead of listing all users and sending an email to all of them inside the same task, you can split the listing and email sending:

# Single task
@celery_app.task(name="send_newsletter")
def send_newsletter():
    session = SessionLocal()
    for user in session.query(User).all():
        LOGGER.info(f"Sending newsletter to {user.email}")
        time.sleep(1)


# Fan out
@celery_app.task(name="send_newsletter_to_user")
def send_newsletter_to_user(user_email):
    LOGGER.info(f"Sending newsletter to {user_email}")
    time.sleep(1)


@celery_app.task(name="send_newsletter_fan_out")
def send_newsletter_fan_out():
    session = SessionLocal()
    for user in session.query(User).all():
        send_newsletter_to_user.apply_async(kwargs={"user_email": user.email})

This way, there's one fast task that lists the records and submits the tasks. Each email-sending task then sends an email to a single user. And that's also a fast operation. This way, even if SIGTERM is received, tasks are fully processed within the 120-second grace period.

There's one caveat, though. This works well only for high-priority tasks where we want to execute them as soon as possible. Imagine having 100k users. That's 100k submitted tasks. This would jam the queue for quite some time. Therefore, don't use fan-out for tasks where some delay is acceptable.

Batching

Another approach is batching. So instead of doing all the work at once, you split the work into batches. For example: You list the first 100 users and send them emails. After that, you submit the next task with a marker (e.g., the last evaluated key) that indicates which records can be skipped. This way, the next execution picks up where the previous one left off. You repeat that until the end.

See the newsletter example:

BATCH_SIZE = 100


@celery_app.task(
    name="send_newsletter_batching",
)
def send_newsletter_batching(last_evaluated_key=None):
    last_evaluated_key = last_evaluated_key or -1
    session = SessionLocal()
    users = (
        session.query(User)
        .filter(User.id > last_evaluated_key)
        .order_by(User.id)
        .limit(BATCH_SIZE)
        .all()
    )

    for user in users:
        LOGGER.info(f"Sending newsletter to {user.email}")
        time.sleep(1)

    if len(users) < BATCH_SIZE:
        return
    new_last_evaluated_key = users[-1].id
    send_newsletter_batching.apply_async(
        kwargs={"last_evaluated_key": new_last_evaluated_key}
    )

It looks very similar to pagination on the API. It's just that a Celery task is doing it instead of the web app.

You can use batching when you don't need things done immediately, because it lets other tasks use resources in the meantime.

Task locking

To prevent duplicated processing, we can implement locking with Redis. We can use setnx, which sets the key only if it doesn't exist yet. We can check whether the lock is already acquired before the execution starts and send the task into retry if it is. Once the processing is done, we release the lock. We need to make sure that the lock is released only by the process that acquired it:

import datetime
import os
from functools import wraps

from redis.client import Redis

redis = Redis.from_url(os.getenv("REDIS_URL"))
TIME_TO_LIVE = datetime.timedelta(minutes=15).total_seconds()


class LockNotAcquired(Exception):
    pass


def lock(job_id):
    return bool(redis.set(name=job_id, value=job_id, nx=True, ex=int(TIME_TO_LIVE)))


def release(job_id):
    redis.delete(job_id)


def no_parallel_processing_of_task(fun):  # type: ignore
    @wraps(fun)
    def outer(self, *args, **kwargs):  # type: ignore
        acquired = lock(job_id=self.request.id)
        if not acquired:
            raise LockNotAcquired(
                f"Task {self.request.id} is already being processed by another worker"
            )
        try:
            return fun(self, *args, **kwargs)
        finally:
            if acquired:
                release(job_id=self.request.id)

    return outer

Once we have the locking implemented, we can apply it to our task:

@celery_app.task(name="send_newsletter_locking", bind=True)
@no_parallel_processing_of_task
def send_newsletter_locking(self):
    session = SessionLocal()
    for user in session.query(User).all():
        LOGGER.info(f"Sending newsletter to {user.email}")
        time.sleep(1)

So if we use that, the task on the second worker who picked it up won't start. This way, we prevent parallel processing of the same task.

Why retry instead of just early exit? Because we don't know whether the worker who started the processing will actually finish it. There's no guarantee for it. With retries, we ensure that a task is fully processed at least once, possibly twice. That's why it's important for tasks to be idempotent.

Note: If you'd rather make an early exit during the retry, you could add a check to see whether the task already has the result. You could do an early exit returning the same value.

Become a better engineer, one article at a time.

Practices, mindsets, and habits that actually move the needle. Delivered weekly to your inbox.

Conclusion

What might seem trivial in a tutorial can be significantly more complicated once you're in production. Anyhow, if you know what to do, it's not hard to set things up. If you have any questions, feel free to reach out on X or LinkedIn.

Happy scaling!

Share