Celery: Background Processing in Python with FastAPI, Django, and Flask


When we develop web applications, we often encounter heavy tasks that should not block the response to the user. The classic example is sending emails, but it could be thousands of other things: generating PDFs, processing videos, or performing complex calculations. To solve this in the Python ecosystem, we use Celery.

How does it work?

Celery lets you create work queues. Instead of executing the heavy task on the main server process, it is sent to a queue and a Celery worker processes it in the background. This keeps the web server responsive. For this to work, we need a broker (usually Redis), which is excellent at managing these messages in the background.

Key Concepts: Broker and Backend

To configure Celery, we must understand two fundamental components:

  • Broker (The Mailbox): This is where the framework (FastAPI, Flask, or Django) leaves the messages. Celery, together with Redis, manages this queue.
  • Backend (The Result Store): This is where the Celery worker saves the result. It lets us check later what happened with the task: whether it completed successfully, failed, or is still pending.
from celery import Celery

# --- 1. Celery Configuration (The Broker and Backend) ---
# We define the Celery instance.
# broker: the "mailbox" where FastAPI leaves the messages (Redis).
# backend: where the worker saves the results so FastAPI can read them later (Redis).
celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

Installation and Basic Commands

The installation is universal for all three frameworks. We usually install Celery with Redis support (the quotes keep shells like zsh from interpreting the brackets):

$ pip install "celery[redis]"

Execution of Services

To work in development, we need to run two processes in parallel:

The Web Server:

  • FastAPI: uvicorn api:app --reload
  • Django: python manage.py runserver
  • Flask: python app.py

The Celery Worker:

$ celery -A config_file_name worker --loglevel=info

(Where -A indicates the application or module where the Celery configuration resides).

Practical Implementation (FastAPI Example)

Regardless of the framework, the logic is very similar. We define a function with the @celery_app.task decorator and call it using the .delay() method.

Configuration

Here we configure Celery to use Redis as both the broker and the result backend:

mycelery.py

from celery import Celery
from fastapi import APIRouter
import time
# --- 1. Celery Configuration (The Broker and Backend) ---
celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

The Heavy Task

We simulate a long process (like generating a video) with time.sleep(10):

mycelery.py

# --- 2. The Worker ---
# This function does NOT execute on the FastAPI server.
# It executes in the separate Celery process (the "basement").
@celery_app.task
def slow_task(name: str):
    print(f"Starting heavy task for: {name}")
    time.sleep(10)  # Simulates a 10-second process (e.g., generating a PDF)
    return f"Task completed for {name}!"

The Endpoint (User Consumption)

The user makes the request, the server returns a task ID immediately, and Celery keeps working in the background:

mycelery.py

celery_router = APIRouter()
@celery_router.get("/run-task/{name}")
async def run_background_task(name: str):
    """
    Endpoint that receives the request and delegates it to Celery.
    Immediately returns the task ID without waiting for it to finish.
    """
    # .delay() is the key: it sends the message to Redis and returns instantly.
    task = slow_task.delay(name)
    
    return {
        "message": "Task received and sent to the worker (basement)",
        "task_id": task.id,
        "info": "Use the ID to check the status at /celery/status/{task_id}"
    }

Additionally, we configure another endpoint to check the status of tasks:

mycelery.py

@celery_router.get("/status/{task_id}")
async def get_task_status(task_id: str):
    """
    Checks the status of a task in the Celery backend (Redis).
    """
    # We get the asynchronous result using the ID
    task_result = celery_app.AsyncResult(task_id)
    
    return {
        "task_id": task_id,
        "status": task_result.status, # States: PENDING, STARTED, SUCCESS, FAILURE
        "result": task_result.result  # The return value of the slow_task function (if finished)
    }

The .delay() method acts as a wrapper that serializes the task and sends it to Redis. Thanks to this, the code remains very clean.

Commands

To try it out, start the web server:

$ uvicorn api:app --reload      

And the Celery service command pointing to the configuration:

$ celery -A mycelery.celery_app worker --loglevel=info

Once both processes are running, trigger a few tasks and you will see in the worker's terminal how Celery receives and completes them, for example:

Starting heavy task for: Andres
Task mycelery.slow_task[2571353f-390a-4364-8599-c1019807f27d] received
[2026-03-02 11:41:01,649: INFO/ForkPoolWorker-8] Task mycelery.slow_task[d5b6fd83-6e6c-4337-b9d4-7b50261e5c4b] succeeded in 10.007935125002405s: 'Task completed for Andres!'
Task mycelery.slow_task[2571353f-390a-4364-8599-c1019807f27d] succeeded in 10.015040790996864s: 'Task completed for Andres!'
Task mycelery.slow_task[c5353a8a-de50-4f63-9a67-a7762f444482] succeeded in 10.008552334002161s: 'Task completed for Andres!'

The task ID is the identifier shown in brackets next to the task name:

2571353f-390a-4364-8599-c1019807f27d

Practical Implementation (Flask Example)

In Flask, Celery is usually configured as a standalone Celery object, with routes organized in Blueprints, much like FastAPI's routers.

my_app/tasks.py

from celery import Celery
import time
# We configure Celery using Redis as the mailbox (broker)
celery_app = Celery(
   "tasks",
   broker="redis://localhost:6379/0",
   backend="redis://localhost:6379/0" # To save the result
)
@celery_app.task
def slow_task(name):
   """This is the task that the worker will execute in the background."""
   print(f"Starting heavy task for: {name}")
   time.sleep(10)  # Simulates a 10-second process
   return f"Task completed for {name}!"

The endpoint:

my_app/celery_views.py

from flask import Blueprint, jsonify
from my_app.tasks import slow_task
# We define a Blueprint for Celery-related routes
celery_bp = Blueprint('celery_views', __name__)
@celery_bp.route("/run-task/<name>", methods=["POST", "GET"])
def run_background_task(name):
    # .delay() tells Python: "Don't execute this here, send it to the worker"
    # This is asynchronous: Flask responds immediately to the user.
    task = slow_task.delay(name)
    
    return jsonify({
        "message": "Task received and sent to the basement",
        "task_id": task.id
    })

The commands for this project are:

$ python run.py
$ celery -A my_app.tasks.celery_app worker --loglevel=info

Practical Implementation (Django Example)

The Django setup is a bit more verbose. The configuration usually lives in a celery.py file at the core of the project, and tasks are defined in tasks.py files within each app. Django also lets you centralize the Celery settings in settings.py itself.

mystore/settings.py

# Celery Configuration
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"

elements/tasks.py

from celery import shared_task
import time
@shared_task
def slow_task(name):
    print(f"Starting heavy task for: {name}")
    time.sleep(10)  # Simulates a 10-second process (e.g., generating a PDF)
    return f"Task completed for {name}!"

mystore/celery.py

import os
from celery import Celery
# Set the default Django settings module
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mystore.settings')
app = Celery('mystore')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps
app.autodiscover_tasks()

mystore/__init__.py

To make Django load the Celery app automatically at startup, mystore/__init__.py must import it:

import pymysql
# PyMySQL shim so Django's MySQL backend accepts it as MySQLdb
pymysql.version_info = (2, 2, 7, "final", 0)
pymysql.install_as_MySQLdb()
# Expose the Celery app so it is loaded with the project
from .celery import app as celery_app
__all__ = ('celery_app',)

The commands for this project are:

$ python manage.py runserver
$ celery -A mystore.celery.app worker --loglevel=info

Status Verification

In all cases, we can create an additional endpoint that receives the task_id to consult if the task is "PENDING" or "SUCCESS". This is vital to know if, for instance, the email was sent or the rocket has already reached the moon.
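The state lifecycle can be observed even without a web framework, directly through AsyncResult. A sketch in eager mode (an assumption so it runs without Redis; with a real broker you would typically see PENDING first, then SUCCESS or FAILURE):

```python
from celery import Celery

# Eager mode: illustration only, no Redis required.
app = Celery("demo")
app.conf.task_always_eager = True
app.conf.task_eager_propagates = False  # failures become FAILURE results

@app.task
def send_email(to):
    if "@" not in to:
        raise ValueError(f"invalid address: {to}")
    return f"email sent to {to}"

ok = send_email.delay("user@example.com")
bad = send_email.delay("not-an-address")

print(ok.status, ok.result)  # SUCCESS email sent to user@example.com
print(bad.status)            # FAILURE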

Conclusion

Celery is a professional and robust tool. Unlike FastAPI's native BackgroundTasks (simple, but any pending tasks are lost if the server restarts), Celery is designed for production environments where reliability and scalability are paramount.


Andrés Cruz
