Building Async Backends with Django and Celery



This content originally appeared on DEV Community and was authored by Elizabeth Adhiambo

Performance is everything in modern web applications. Users expect fast responses, instant feedback, and seamless experiences even when your backend is running heavy workloads.

So what happens when your app needs to handle tasks that can’t complete in milliseconds? Think of:

  • Generating AI-powered summaries
  • Processing large CSV files for bulk imports
  • Sending thousands of emails or notifications
  • Running complex image or video processing

If you try to handle these in the request/response cycle, your app slows down or worse, times out.

This is where background processing with Django and Celery comes in. In this article, we’ll break down how to build a production-ready async backend that can handle heavy lifting without blocking user requests.

Why Background Processing Matters

Imagine this scenario: your user uploads a 100MB CSV with 50,000 records, and you try to process it in the same request.

  • The request takes minutes to complete
  • Your server thread is blocked
  • The user gets frustrated or abandons the app

Instead, with background processing:

  • The request is accepted quickly
  • The task is queued in the background
  • The user gets an instant response (202 Accepted – Your job is being processed)
  • The heavy lifting happens elsewhere (workers)
  • The user can poll for results or receive notifications

The Architecture

Here’s a simplified flow:

Client → Django View → Celery Task Queue → Worker → Result Backend → Client (WebSocket)

  1. Client → Django View: User submits a request (e.g., upload a file, request AI summary)
  2. Django View → Task Queue: The request is validated and pushed to Celery via a broker like Redis
  3. Worker: Celery workers pick up tasks from the queue and process them asynchronously
  4. Result Backend: Task results and status are stored in Redis or your database
  5. Client: Receives real-time updates via WebSocket (no polling needed!)

Project Setup & Configuration

Installing Dependencies

pip install celery redis channels channels-redis django-rest-framework
pip install google-generativeai PyPDF2 pdfplumber python-docx

Celery Configuration

The key to a robust Celery setup is proper configuration with rate limiting, time limits, and periodic tasks:

# core/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
app = Celery('ai_document_processor')

app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# Periodic tasks for maintenance
app.conf.beat_schedule = {
    'cleanup-failed-jobs': {
        'task': 'documents.tasks.cleanup_failed_jobs',
        'schedule': 3600.0,  # Every hour
    },
}

# Task execution limits and rate control
app.conf.task_annotations = {
    'documents.tasks.process_document': {
        'rate_limit': '10/m',      # 10 tasks per minute
        'time_limit': 1800,        # 30 minutes max
        'soft_time_limit': 1500,   # Warning at 25 minutes
    },
}

Key Configuration Highlights:

  • Rate Limiting: Prevents overwhelming external APIs
  • Time Limits: Prevents runaway tasks from consuming resources
  • Periodic Tasks: Automated cleanup via Celery Beat
  • Auto-discovery: Finds tasks across all Django apps

Django Settings

# settings.py
REDIS_URL = config('REDIS_URL', default='redis://localhost:6379/0')

# Celery
CELERY_BROKER_URL = REDIS_URL
CELERY_RESULT_BACKEND = REDIS_URL
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'

# Channels for WebSockets
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {'hosts': [REDIS_URL]},
    },
}

# AI Configuration
GEMINI_API_KEY = config('GEMINI_API_KEY')
GEMINI_MODEL = 'gemini-pro'

Data Models

We use four key models to track the entire processing lifecycle:

  • Document: Stores file metadata and processing status
  • ProcessingJob: Tracks Celery task execution with progress updates
  • DocumentAnalysis: Stores AI-generated results (summary, sentiment, topics, etc.)
  • ProcessingLog: Detailed logs for debugging

Each model uses UUIDs for better security and includes timestamps, status fields, and relationships that ensure data integrity through OneToOne and ForeignKey constraints.

The Processing Pipeline

Main Task Orchestration

Here’s the heart of the system – the main Celery task that coordinates everything:

@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_document(self, document_id: str) -> dict:
    """Process document through the complete pipeline"""
    try:
        document = Document.objects.get(id=document_id)

        # Create processing job
        processing_job = ProcessingJob.objects.create(
            document=document,
            celery_task_id=self.request.id,
            status='started'
        )

        # Step 1: Extract text (20% progress)
        self.update_state(state='PROGRESS', meta={'progress': 20})
        extracted_text = extract_text_from_document(document)
        send_websocket_update(document_id, 'processing_update', {
            'progress': 20,
            'message': 'Text extraction completed'
        })

        # Step 2: AI Analysis (50% progress)
        self.update_state(state='PROGRESS', meta={'progress': 50})
        analysis_results = analyze_document_with_gemini(document, extracted_text)
        send_websocket_update(document_id, 'processing_update', {
            'progress': 90,
            'message': 'AI analysis completed'
        })

        # Step 3: Finalize (100% progress)
        document.status = 'completed'
        document.save()

        send_websocket_update(document_id, 'processing_complete', {
            'status': 'completed'
        })

        return {'status': 'success', 'document_id': document_id}

    except Exception as exc:
        # Handle errors and retry with exponential backoff
        if self.request.retries < self.max_retries:
            raise self.retry(countdown=60 * (2 ** self.request.retries))
        raise exc

Key Features:

  • Progress Tracking: Updates state at each step (20%, 50%, 100%)
  • WebSocket Updates: Real-time notifications to connected clients
  • Retry Logic: Exponential backoff for transient failures
  • Error Handling: Comprehensive exception management

Sending Real-Time Updates

The critical link between Celery workers and WebSocket clients:

def send_websocket_update(document_id: str, event_type: str, data: dict):
    """Send updates to WebSocket clients"""
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        f'document_{document_id}',
        {
            'type': event_type,
            'document_id': document_id,
            'timestamp': timezone.now().isoformat(),
            **data
        }
    )

This bridges the gap between synchronous Celery tasks and asynchronous WebSocket connections, enabling real-time updates.

Text Extraction with Fallbacks

def extract_text_from_pdf(file_path: str) -> str:
    """Extract text with fallback strategy"""
    try:
        # Primary: pdfplumber (more reliable)
        with pdfplumber.open(file_path) as pdf:
            return '\n'.join([page.extract_text() for page in pdf.pages])
    except Exception:
        # Fallback: PyPDF2
        return extract_with_pypdf2(file_path)

Multiple extraction methods ensure robustness – if one library fails, we automatically try another.

AI Analysis

def analyze_document_with_gemini(document, text_content: str) -> dict:
    """Run multiple AI analyses"""
    model = genai.GenerativeModel('gemini-pro')
    analysis = DocumentAnalysis.objects.create(document=document)

    # Run different analysis types
    analysis_types = {
        'summary': 'Provide a 2-3 sentence summary',
        'key_points': 'Extract key points as JSON list',
        'sentiment': 'Analyze sentiment: positive/negative/neutral',
        'topics': 'Identify main topics as JSON list'
    }

    for analysis_type, prompt in analysis_types.items():
        response = model.generate_content(f"{prompt}\n\n{text_content[:4000]}")
        # Store results in analysis object
        setattr(analysis, f'{analysis_type}_completed', True)
        time.sleep(1)  # Respect API rate limits

    analysis.save()
    return analysis

Best Practices:

  • Rate limiting with sleep between calls
  • Token tracking for cost management
  • Flexible JSON parsing with fallbacks
  • Error isolation – one failure doesn’t break others

Real-Time Updates with WebSockets

Server-Side Consumer

class DocumentProcessingConsumer(AsyncWebsocketConsumer):
    """Real-time processing updates"""

    async def connect(self):
        self.document_id = self.scope['url_route']['kwargs']['document_id']
        self.room_group_name = f'document_{self.document_id}'

        # Verify user has access
        if await self.check_document_access():
            await self.channel_layer.group_add(
                self.room_group_name,
                self.channel_name
            )
            await self.accept()
            await self.send_document_status()

    async def processing_update(self, event):
        """Forward processing updates to client"""
        await self.send(text_data=json.dumps({
            'type': 'processing_update',
            'progress': event['progress'],
            'message': event['message']
        }))

WebSocket Features:

  • Authentication checks before accepting connections
  • Room-based broadcasting for multiple clients
  • Automatic status updates on connection
  • Type-specific event handlers

Client-Side Integration

const socket = new WebSocket(`ws://${location.host}/ws/documents/${docId}/`);

socket.onmessage = function(e) {
    const data = JSON.parse(e.data);

    if (data.type === 'processing_update') {
        updateProgressBar(data.progress);
        showMessage(data.message);
    } else if (data.type === 'processing_complete') {
        showSuccess('Processing completed!');
    }
};

REST API Design

Document Processing Endpoints

class DocumentViewSet(viewsets.ModelViewSet):
    permission_classes = [IsAuthenticated]

    def perform_create(self, serializer):
        """Upload and automatically start processing"""
        document = serializer.save(user=self.request.user)
        process_document.delay(str(document.id))

    @action(detail=True, methods=['post'])
    def reprocess(self, request, pk=None):
        """Restart failed processing"""
        document = self.get_object()
        document.status = 'uploaded'
        document.save()
        process_document.delay(str(document.id))
        return Response({'message': 'Reprocessing started'})

    @action(detail=True, methods=['post'])
    def cancel_processing(self, request, pk=None):
        """Cancel active processing"""
        job = self.get_object().processing_job
        from celery import current_app
        current_app.control.revoke(job.celery_task_id, terminate=True)
        return Response({'message': 'Processing cancelled'})

API Highlights:

  • RESTful design with standard HTTP methods
  • Automatic processing on upload
  • Reprocessing for failed documents
  • Task cancellation support

Production Best Practices

1. Task Rate Limiting

app.conf.task_annotations = {
    'documents.tasks.process_document': {
        'rate_limit': '10/m',  # Prevent queue flooding
    },
}

2. Monitoring with Flower

pip install flower
celery -A core flower --port=5555
# Access dashboard at http://localhost:5555

Flower provides:

  • Real-time task monitoring
  • Worker statistics
  • Task history and results
  • Performance graphs

3. Horizontal Scaling

# Run multiple workers
celery -A core worker --concurrency=4 -n worker1@%h
celery -A core worker --concurrency=4 -n worker2@%h

# Separate queues by priority
celery -A core worker -Q high_priority,default -n worker1@%h
celery -A core worker -Q low_priority -n worker2@%h

4. Task Priority Queues

# Configure routing
app.conf.task_routes = {
    'documents.tasks.process_document': {'queue': 'high_priority'},
    'documents.tasks.cleanup_old_logs': {'queue': 'low_priority'},
}

# Send to specific queue
process_document.apply_async(
    args=[document_id],
    queue='high_priority',
    priority=9
)

5. Error Tracking

import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn="your-sentry-dsn",
    integrations=[CeleryIntegration()],
)

Beyond AI: Other Real-Time Use Cases

WebSocket + Celery architecture is perfect for:

  • Live Data Processing: Real-time analytics, log processing, data transformations
  • File Uploads: Show upload progress, virus scanning, format conversion
  • Report Generation: PDF/Excel generation with progress updates
  • Batch Operations: Bulk updates, mass emails, data migrations
  • Integration Workflows: Multi-step API integrations with status updates
  • Machine Learning: Model training progress, inference batching
  • Image/Video Processing: Encoding, resizing, filtering with live previews

Wrapping Up

By combining Django, Celery, and Django Channels, you can:

  • Keep your app responsive under heavy load
  • Provide real-time feedback to users (no polling!)
  • Handle long-running tasks safely in the background
  • Scale horizontally by adding more workers
  • Build modern, real-time user experiences

This architecture powers everything from AI-driven features to bulk data processing anywhere you need to keep users informed while work happens asynchronously.

Quick Reference Commands

# Development
redis-server
celery -A core worker --loglevel=info
python manage.py runserver

# Production
celery -A core worker --loglevel=info --concurrency=4
celery -A core beat --loglevel=info  # Scheduled tasks
celery -A core flower                 # Monitoring
daphne -b 0.0.0.0 -p 8000 core.asgi:application

# Debugging
celery -A core inspect active      # View active tasks
celery -A core inspect stats       # Worker statistics
celery -A core purge              # Clear all tasks

Resources

Repository: The complete code is available at ai-document-processor


This content originally appeared on DEV Community and was authored by Elizabeth Adhiambo