Mastering Celery for Background Task Processing
If your web application sends emails, resizes images, generates PDFs, or interacts with slow third-party APIs during an HTTP request, you are holding the user hostage. These operations should be offloaded to a background task queue, and in the Python ecosystem Celery is the most widely used one.
The Architecture
Celery requires a Message Broker to send and receive messages. The two industry standards are:
- RabbitMQ: The recommended choice for enterprise. With durable queues and message acknowledgements it is very robust against message loss.
- Redis: Easier to set up if you are already using it for caching, though queued messages can be lost if Redis restarts without persistence configured.
If you want to inspect the state or return values of tasks, you also need a Result Backend (such as Redis or a database like PostgreSQL). It is optional for fire-and-forget tasks.
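The moving parts above can be sketched with a toy, in-process model. This is not Celery: a `queue.Queue` stands in for the broker, a plain dict stands in for the result backend, and the names `send_task`, `worker`, and `registry` are made up for illustration.

```python
import queue
import threading
import uuid

# In-process stand-ins (assumptions for illustration, not Celery APIs)
broker = queue.Queue()   # plays the role of RabbitMQ or Redis
result_backend = {}      # plays the role of the result store
_STOP = object()         # sentinel that tells the worker to exit


def send_task(name, *args):
    """Producer side: enqueue a message and return its ID without waiting."""
    task_id = str(uuid.uuid4())
    result_backend[task_id] = {"state": "PENDING", "result": None}
    broker.put({"id": task_id, "task": name, "args": args})
    return task_id


def worker(registry):
    """Consumer side: pull messages, run the function, record the outcome."""
    while True:
        message = broker.get()
        if message is _STOP:
            break
        func = registry[message["task"]]
        try:
            value = func(*message["args"])
            result_backend[message["id"]] = {"state": "SUCCESS", "result": value}
        except Exception as exc:
            result_backend[message["id"]] = {"state": "FAILURE", "result": repr(exc)}


def add(x, y):
    return x + y


registry = {"add": add}
thread = threading.Thread(target=worker, args=(registry,))
thread.start()

task_id = send_task("add", 2, 3)  # the caller gets an ID back, not the result
broker.put(_STOP)                 # stop the worker once the backlog is drained
thread.join()

print(result_backend[task_id])    # {'state': 'SUCCESS', 'result': 5}
```

The producer never calls `add` directly. It only knows the task name and the ID it got back, which is the same separation a real broker and result backend give you across processes and machines.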
Defining and Calling Tasks
Tasks are simply Python functions decorated with @app.task, where app is your Celery application instance. The golden rule of Celery is:
Always pass IDs, never pass complex objects (like SQLAlchemy models). The message payload must be serialized (to JSON by default), and the database row might change between the time the task is queued and when it executes.
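The serialization half of that rule can be seen without Celery at all. In this minimal sketch, the `Video` dataclass is a hypothetical stand-in for an ORM model, and the standard library `json` module plays the part of the serializer.

```python
import json
from dataclasses import dataclass


@dataclass
class Video:
    """Hypothetical stand-in for an ORM model such as a SQLAlchemy row."""
    id: int
    file_path: str
    status: str


video = Video(id=42, file_path="/tmp/in.mp4", status="UPLOADED")

# Passing the ID: the payload is small and serializes cleanly.
payload = json.dumps({"args": [video.id]})

# Passing the object: the JSON encoder does not know how to serialize it.
try:
    json.dumps({"args": [video]})
    object_is_serializable = True
except TypeError:
    object_is_serializable = False

print(payload)                 # {"args": [42]}
print(object_is_serializable)  # False
```

Even if you worked around the `TypeError` by converting the object to a dict, the task would still run against a snapshot taken at enqueue time, which is the second half of the rule.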
from celery import Celery

# Assumed to exist elsewhere in your project: the `db` session, the `Video`
# model, and the `encode_video_to_h264` helper.
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')


@app.task(bind=True, max_retries=3)
def process_video(self, video_id):
    try:
        # 1. Fetch the latest state from the database using the ID
        video = db.query(Video).get(video_id)
        # 2. Perform the heavy lifting
        encode_video_to_h264(video.file_path)
        # 3. Update the database
        video.status = 'COMPLETED'
        db.commit()
        return f"Video {video_id} processed."
    except Exception as exc:
        # Exponential backoff: wait 1s, 2s, then 4s before each retry
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
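To make the retry timing concrete, here is a small stand-alone sketch that computes the delays produced by `countdown=2 ** self.request.retries`. The function name `backoff_delays` and its `cap` and `jitter` parameters are illustrative additions, not part of Celery.

```python
import random


def backoff_delays(max_retries, base=2, cap=None, jitter=False, rng=random):
    """Return the countdown (in seconds) applied before each retry.

    Mirrors `countdown = base ** retries`, where `retries` is 0 on the
    first failure and increases by one with every retry.
    """
    delays = []
    for retries in range(max_retries):
        delay = base ** retries
        if cap is not None:
            delay = min(delay, cap)   # stop the delay from growing unbounded
        if jitter:
            # Randomize within [0, delay] so failed tasks do not retry in lockstep
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays


print(backoff_delays(3))           # [1, 2, 4]
print(backoff_delays(10, cap=60))  # [1, 2, 4, 8, 16, 32, 60, 60, 60, 60]
```

With `max_retries=3` the whole retry window is only seven seconds, which may be too short for an outage of a downstream service, so a larger base or multiplier is worth considering. Recent Celery versions also provide declarative task options for this pattern (`autoretry_for`, `retry_backoff`, `retry_backoff_max`, `retry_jitter`); check the documentation for your version before relying on them.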
To queue the task from your web API, use the .delay() method:
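from fastapi import FastAPI, UploadFile  # assuming FastAPI as the web framework
from tasks import process_video

api = FastAPI()  # named `api` so it does not clash with the Celery `app`

@api.post("/upload")
def upload_video(file: UploadFile):
    video = save_to_db(file)  # assumed helper that stores the upload and returns a row
    # Non-blocking: only publishes a message to the broker, then returns
    process_video.delay(video.id)
    return {"message": "Video uploaded, processing in background"}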
Routing and Dedicated Workers
As your application grows, you will have different types of tasks. Sending a welcome email might take 500ms, while processing a 4K video might take 30 minutes. If they share the same queue, a backlog of video tasks will prevent time-sensitive emails, such as password resets, from going out.
You must route tasks to specific queues and spin up dedicated workers for those queues.
# celery_config.py
app.conf.task_routes = {
    'tasks.process_video': {'queue': 'heavy_compute'},
    'tasks.send_email': {'queue': 'high_priority'},
}
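As a mental model of what this mapping does, the sketch below resolves a task name to a queue. It is a simplified stand-in, not Celery's router: `resolve_queue` is a hypothetical helper, and `"celery"` is assumed as the fallback because that is Celery's default queue name unless you configure another one.

```python
from fnmatch import fnmatchcase

DEFAULT_QUEUE = "celery"  # Celery's default queue name unless configured otherwise

task_routes = {
    "tasks.process_video": {"queue": "heavy_compute"},
    "tasks.send_email": {"queue": "high_priority"},
}


def resolve_queue(task_name, routes=task_routes, default=DEFAULT_QUEUE):
    """Return the queue for a task name; the first matching route wins."""
    for pattern, options in routes.items():
        # Exact names match themselves; glob patterns like "tasks.*" also work
        if fnmatchcase(task_name, pattern):
            return options["queue"]
    return default


print(resolve_queue("tasks.process_video"))  # heavy_compute
print(resolve_queue("tasks.send_email"))     # high_priority
print(resolve_queue("tasks.cleanup"))        # celery
```

The last line shows a common pitfall: a task with no route goes to the default queue, so if every worker is started with an explicit `-Q` list that omits it, that task will sit unprocessed.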
Then, start your workers independently on your servers:
# Server 1 handles heavy compute tasks, limit concurrency to CPU cores
celery -A tasks worker -Q heavy_compute --concurrency=4
# Server 2 handles high-volume, I/O-bound tasks with the gevent pool (requires the gevent package; eventlet is an alternative)
celery -A tasks worker -Q high_priority -P gevent -c 100
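The cost of sharing one queue can be estimated with a back-of-the-envelope calculation. The sketch below is a simplified model that assumes strict FIFO ordering, workers that are all idle at time zero, and the 30-minute video duration mentioned earlier. `wait_seconds` is a hypothetical helper, not a Celery API.

```python
def wait_seconds(queued_ahead, workers):
    """Time until a newly queued task starts, given the FIFO tasks ahead of it.

    `queued_ahead` lists the durations (in seconds) of tasks already waiting;
    `workers` is the number of worker processes consuming the queue.
    """
    free_at = [0.0] * workers               # when each worker next becomes free
    for duration in queued_ahead:
        idx = free_at.index(min(free_at))   # the next free worker takes the task
        free_at[idx] += duration
    return min(free_at)


VIDEO = 30 * 60          # a 30-minute encode, in seconds
backlog = [VIDEO] * 8    # eight videos already queued

shared = wait_seconds(backlog, workers=4)  # email queued behind the videos
dedicated = wait_seconds([], workers=1)    # email on its own empty queue

print(shared, dedicated)  # 3600.0 0.0
```

Under these assumptions an email queued behind eight videos on four workers waits a full hour before it even starts, while the same email on a dedicated queue starts immediately.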
Conclusion
Celery is a powerful, distributed system. By decoupling long-running operations from your HTTP request-response cycle, you ensure that your web APIs remain lightning-fast and highly available, regardless of the heavy lifting happening in the background.