Python

Mastering Celery for Background Task Processing

By Mohd Baquir Qureshi

If your web application sends emails, resizes images, generates PDFs, or calls slow third-party APIs during an HTTP request, you are holding the user hostage. These operations should be offloaded to a background task queue. In the Python ecosystem, Celery is the de facto standard for task queues.

The Architecture

Celery requires a Message Broker to carry task messages from your application to the workers. The two most common choices are:

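  1. RabbitMQ: The recommended choice for enterprise. With durable queues and message acknowledgements it offers strong delivery guarantees, and it is very robust under load.
  2. Redis: Easier to set up if you are already using it for caching, though tasks can be lost or redelivered in edge cases if it is not configured properly (for example, a restart without persistence, or a task that runs longer than the visibility timeout).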

If you want to read task state or return values, you also need a Result Backend (like Redis or PostgreSQL) to store the outcome of completed tasks.
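To make the three roles concrete, here is a toy, in-process model of the same flow using only the standard library. It is not how Celery is implemented: `queue.Queue` stands in for the broker, a dict for the result backend, and a thread for the worker. All names (`send_task`, `TASKS`, `worker`) are illustrative assumptions.

```python
import queue
import threading
import uuid

broker = queue.Queue()   # stands in for RabbitMQ or Redis
result_backend = {}      # stands in for the result store

TASKS = {"add": lambda x, y: x + y}  # registry of known task functions


def send_task(name, *args):
    """Producer side: enqueue a message and return a task ID immediately."""
    task_id = str(uuid.uuid4())
    result_backend[task_id] = {"state": "PENDING", "result": None}
    broker.put({"id": task_id, "task": name, "args": args})
    return task_id


def worker():
    """Consumer side: pull messages, run the task, record the outcome."""
    while True:
        message = broker.get()
        if message is None:  # sentinel: shut the worker down
            break
        try:
            value = TASKS[message["task"]](*message["args"])
            result_backend[message["id"]] = {"state": "SUCCESS", "result": value}
        except Exception as exc:
            result_backend[message["id"]] = {"state": "FAILURE", "result": repr(exc)}


thread = threading.Thread(target=worker)
thread.start()
task_id = send_task("add", 2, 3)  # returns before the work is done
broker.put(None)                  # ask the worker to stop after draining
thread.join()
outcome = result_backend[task_id]
```

The caller only ever touches the queue and the result store, which is why the web process and the workers can live on different machines.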

Defining and Calling Tasks

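Tasks are simply Python functions decorated with @app.task, where app is your Celery instance. The golden rule of Celery is: always pass IDs, never complex objects (like SQLAlchemy models). There are two reasons. The message payload is serialized (to JSON by default), and an ORM object does not serialize cleanly. And the database row might change between the time the task is queued and the time it executes, so the worker should load fresh state itself.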
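The serialization half of that rule can be seen with the standard library alone. The sketch below uses `json` directly rather than Celery's own serializer, and `Video` is a hypothetical dataclass standing in for an ORM model. Celery reports the failure differently, but the constraint is the same.

```python
import json
from dataclasses import dataclass


@dataclass
class Video:  # hypothetical stand-in for an ORM model
    id: int
    file_path: str
    status: str = "PENDING"


video = Video(id=42, file_path="/tmp/clip.mp4")

# Passing the ID: a plain int survives the JSON round trip unchanged.
payload = json.dumps({"args": [video.id]})
restored_id = json.loads(payload)["args"][0]

# Passing the object: the JSON encoder rejects it.
try:
    json.dumps({"args": [video]})
    object_serialized = True
except TypeError:
    object_serialized = False
```

The worker then uses the ID to load the current row, as the task definition that follows does.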

from celery import Celery

# Broker on Redis database 0, result backend on database 1
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

# db, Video, and encode_video_to_h264 are application-specific and assumed
# to be defined elsewhere (e.g. a SQLAlchemy session and model).

@app.task(bind=True, max_retries=3)
def process_video(self, video_id):
    try:
        # 1. Fetch the latest state from the database using the ID
        video = db.query(Video).get(video_id)

        # 2. Perform the heavy lifting
        encode_video_to_h264(video.file_path)

        # 3. Update the database
        video.status = 'COMPLETED'
        db.commit()
        return f"Video {video_id} processed."
    except Exception as exc:
        # Exponential backoff: retry after 1s, 2s, then 4s before giving up
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
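The countdown expression above is plain arithmetic, so the resulting schedule can be checked without a worker. The helper below is a hypothetical sketch, not part of Celery. It reproduces `2 ** retries` and adds two common refinements, a cap and random jitter, as assumptions about what a production setup might want.

```python
import random


def backoff_schedule(max_retries, base=2, cap=None, jitter=False):
    """Return the delay in seconds before each retry attempt."""
    delays = []
    for attempt in range(max_retries):
        delay = base ** attempt           # 1, 2, 4, 8, ...
        if cap is not None:
            delay = min(delay, cap)       # never wait longer than `cap`
        if jitter:
            delay = random.uniform(0, delay)  # spread out simultaneous retries
        delays.append(delay)
    return delays


schedule = backoff_schedule(3)        # matches max_retries=3 in the task
capped = backoff_schedule(6, cap=10)  # growth stops once the cap is reached
```

Celery also ships task options that cover the same ground (`autoretry_for`, `retry_backoff`, `retry_jitter`); check the documentation for your version before relying on them.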

To queue the task from your web API, use the .delay() method:

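# Assuming FastAPI, which the UploadFile type and the post decorator suggest
from fastapi import FastAPI, UploadFile

api = FastAPI()  # named `api` so it does not clash with the Celery `app`

@api.post("/upload")
def upload_video(file: UploadFile):
    # save_to_db is an application-specific helper, assumed defined elsewhere
    video = save_to_db(file)

    # Non-blocking: only enqueues a message, then returns immediately
    process_video.delay(video.id)

    return {"message": "Video uploaded, processing in background"}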

Routing and Dedicated Workers

As your application grows, you will have different types of tasks. Sending a welcome email takes 500ms. Processing a 4K video takes 30 minutes. If they share the same queue, a backlog of video tasks will prevent password reset emails from going out.
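The cost of sharing one queue is easy to put in numbers. This back-of-the-envelope sketch uses the durations from the paragraph above and assumes, hypothetically, a single worker processing tasks strictly in order with three videos queued ahead of one email.

```python
EMAIL_SECONDS = 0.5      # a welcome email takes about 500 ms
VIDEO_SECONDS = 30 * 60  # a 4K video takes about 30 minutes


def finish_time(durations):
    """Seconds until the last task in a FIFO queue finishes on one worker."""
    return sum(durations)


# Shared queue: the email sits behind three video jobs.
shared_wait = finish_time([VIDEO_SECONDS] * 3 + [EMAIL_SECONDS])

# Dedicated queue: the email is the only thing its worker has to do.
dedicated_wait = finish_time([EMAIL_SECONDS])
```

Under these assumptions the email goes out after roughly an hour and a half on the shared queue, versus half a second on its own queue.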

You must route tasks to specific queues and spin up dedicated workers for those queues.

# celery_config.py
app.conf.task_routes = {
    'tasks.process_video': {'queue': 'heavy_compute'},
    'tasks.send_email': {'queue': 'high_priority'},
}

Then, start your workers independently on your servers:

# Server 1 handles heavy compute tasks, limit concurrency to CPU cores
celery -A tasks worker -Q heavy_compute --concurrency=4

# Server 2 handles high-volume, I/O-bound fast tasks using the gevent pool (requires the gevent package; Eventlet is an alternative)
celery -A tasks worker -Q high_priority -P gevent -c 100

Conclusion

Celery is a powerful, distributed system. By decoupling long-running operations from your HTTP request-response cycle, you keep your web APIs fast and responsive, regardless of the heavy lifting happening in the background.