Celery Task Analyzer

v1.0.0

Analyze Celery task configurations for reliability, performance, and debugging. Checks retry policies, task routing, serialization, result backends, worker c...

0· 27· 1 versions· 0 current· 0 all-time· Updated 5h ago· MIT-0

Celery Task Analyzer

Analyze Celery task configurations for reliability, performance, and operational excellence. Reviews task definitions, retry policies, routing rules, serialization settings, result backend configuration, worker tuning, and beat schedules. Acts as a senior distributed systems engineer auditing your Celery deployment.

Usage

Invoke this skill when you need to review Celery task configurations, optimize worker performance, or ensure reliability best practices.

Basic invocation:

Analyze the Celery tasks in /path/to/app/tasks/ Review this Celery configuration for production readiness Check task reliability across the project

Focused analysis:

Audit retry policies for all Celery tasks Review task routing configuration Check beat schedule for overlapping tasks Analyze worker configuration for memory leaks

The agent reads Celery task files, configuration modules, and beat schedules, then produces a comprehensive quality report.

How It Works

Step 1: Discover and Parse Celery Components

The agent locates all Celery-related code:

# Find Celery app configuration
grep -rl "Celery(\|celery_app\|make_celery" /path/to/app/ --include="*.py"

# Find task definitions
grep -rl "@app.task\|@shared_task\|@celery.task" /path/to/app/ --include="*.py"

# Find beat schedule definitions
grep -rl "beat_schedule\|CELERYBEAT_SCHEDULE\|crontab\|solar" /path/to/app/ --include="*.py"

# Find configuration files
grep -rl "broker_url\|result_backend\|CELERY_" /path/to/app/ --include="*.py"

The agent parses each component to extract:

  • App configuration (broker, backend, serialization, timezone)
  • Task definitions (name, module, decorators, arguments)
  • Retry policies (max_retries, retry_backoff, retry_jitter)
  • Routing rules (queues, exchanges, routing keys)
  • Beat schedules (periodic tasks, crontab, intervals)
  • Signal handlers (task_prerun, task_postrun, task_failure)
  • Worker configuration (concurrency, prefetch, pool type)

Step 2: Audit Task Definitions

The agent checks each task's configuration:

Task decorator analysis:

# GOOD: Well-configured task
@shared_task(
    bind=True,
    name="orders.process_payment",
    max_retries=3,
    default_retry_delay=60,
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
    acks_late=True,
    reject_on_worker_lost=True,
    time_limit=300,
    soft_time_limit=270,
    rate_limit="100/m",
    track_started=True,
    ignore_result=False,
)
def process_payment(self, order_id: int) -> dict:
    ...

# PROBLEMS the agent detects:
FAIL: Task "send_email" — no retry configuration
  @shared_task
  def send_email(to, subject, body):
  RISK: Transient SMTP failures cause permanent message loss
  FIX: Add retry policy:
    @shared_task(bind=True, max_retries=3, retry_backoff=True)
    def send_email(self, to, subject, body):
        try: ...
        except SMTPException as exc:
            raise self.retry(exc=exc)

FAIL: Task "process_payment" — no time_limit
  RISK: Stuck payment processing blocks worker indefinitely
  FIX: Add time_limit=300, soft_time_limit=270
  The soft limit raises SoftTimeLimitExceeded so you can clean up

FAIL: Task "generate_report" — acks_late=False (default)
  RISK: If worker crashes mid-task, message is already acknowledged
  The report generation is lost with no retry
  FIX: acks_late=True, reject_on_worker_lost=True

WARN: Task "sync_inventory" — no rate_limit
  This task calls external API with rate limits
  RISK: Worker bursts can trigger API throttling for all consumers
  FIX: rate_limit="30/m" (match API rate limit)

WARN: Task "cleanup_temp_files" — ignore_result not set
  Results are stored but never read for this fire-and-forget task
  FIX: ignore_result=True — saves result backend storage and bandwidth

WARN: Task "update_search_index" — no bind=True
  Cannot access self.retry() or self.request without bind=True
  FIX: @shared_task(bind=True) and add self as first parameter

Task naming audit:

Task Naming Analysis:

  FAIL: Task uses auto-generated name
    "app.tasks.send_email" — derived from module path
    RISK: Renaming the module or moving the file breaks all pending tasks
    FIX: Set explicit name: @shared_task(name="notifications.send_email")

  FAIL: Inconsistent naming convention
    "process_order" (no namespace)
    "payments.charge_card" (dotted namespace)
    "UserSync" (PascalCase)
    FIX: Use consistent dotted namespace: "domain.action_verb"
      "orders.process_order", "payments.charge_card", "users.sync_user"

  WARN: Task name collision risk
    "process" defined in both orders/tasks.py and payments/tasks.py
    Auto-generated names differ but explicit names might collide
    FIX: Always use namespaced explicit names

Step 3: Analyze Retry Policies

The agent evaluates retry configurations for reliability:

Retry Policy Analysis:

  Task: "orders.process_payment"
    max_retries: 3
    retry_backoff: True (exponential: 1s, 2s, 4s)
    retry_backoff_max: 600s
    retry_jitter: True
    PASS: Well-configured — exponential backoff with jitter prevents thundering herd

  Task: "integrations.sync_crm"
    max_retries: 10
    default_retry_delay: 5
    retry_backoff: False
    FAIL: Fixed 5-second delay with 10 retries = only 50 seconds of retry window
    For CRM API issues that may last minutes, this is insufficient
    FIX: retry_backoff=True, retry_backoff_max=1800
    This gives: 5s, 10s, 20s, 40s, ... up to 30 min — covers longer outages

  Task: "reports.generate_pdf"
    max_retries: None (infinite retries!)
    FAIL: Infinite retries — a poison message retries forever
    RISK: Consumes worker slots indefinitely, fills dead letter queue
    FIX: Set finite max_retries (3-5 for most tasks)

  Task: "notifications.send_push"
    Retries on all exceptions (bare except)
    FAIL: Retries on ValueError, TypeError — bugs never surface
    FIX: Only retry on transient errors:
      except (ConnectionError, TimeoutError) as exc:
          raise self.retry(exc=exc)
      # Let ValueError, TypeError propagate as failures

  Task: "etl.import_csv"
    autoretry_for=(Exception,)
    FAIL: autoretry_for too broad — retries on programming errors
    FIX: autoretry_for=(IOError, ConnectionError, OperationalError)

  Retry Summary:
    Tasks with retry:       8/15
    Tasks without retry:    5/15 (REVIEW NEEDED)
    Tasks with bad retry:   2/15 (NEEDS FIX)
    Infinite retry tasks:   1/15 (CRITICAL)

Step 4: Review Task Routing and Queues

The agent analyzes routing configuration:

Routing Analysis:

  Queue Configuration:
    Queues defined: ["default", "high_priority", "bulk", "notifications"]

  FAIL: No task_routes configured
    All 15 tasks go to "default" queue
    RISK: Bulk import tasks starve payment processing tasks
    FIX: Configure routing:
      task_routes = {
          "orders.process_payment": {"queue": "high_priority"},
          "etl.import_*": {"queue": "bulk"},
          "notifications.*": {"queue": "notifications"},
      }

  FAIL: No dedicated workers per queue
    Single worker process consumes all queues
    RISK: Cannot scale queues independently, cannot set per-queue concurrency
    FIX: Run separate workers:
      celery -A app worker -Q high_priority -c 4
      celery -A app worker -Q bulk -c 2
      celery -A app worker -Q notifications -c 8

  WARN: No priority_steps configured
    Default priority is FIFO only — cannot prioritize within a queue
    FIX: For RabbitMQ: task_queue_max_priority=10
    Then: process_payment.apply_async(priority=9)

  WARN: No dead letter exchange configured
    Failed messages after max_retries are silently discarded
    FIX: Configure DLX for post-mortem analysis:
      task_reject_on_worker_lost = True
      task_acks_on_failure_or_timeout = False

Step 5: Audit Serialization and Security

The agent checks serialization settings:

Serialization Analysis:

  FAIL: task_serializer = "pickle"
    RISK: Pickle deserialization allows arbitrary code execution
    Any message injected into the broker can execute code on workers
    FIX: task_serializer = "json"
    AND: accept_content = ["json"]  (reject pickle messages)

  WARN: result_serializer not explicitly set
    Defaults to task_serializer — if pickle, results are also pickled
    FIX: result_serializer = "json"

  FAIL: accept_content = ["json", "pickle"]
    Still accepts pickle — attacker can send pickle-serialized message
    FIX: accept_content = ["json"] only

  WARN: Task "ml.train_model" passes non-JSON-serializable argument
    Argument: numpy array, pandas DataFrame
    RISK: Will fail with json serializer
    FIX: Serialize data before sending:
      - Pass file path or S3 key instead of raw data
      - Use msgpack serializer for binary data
      - Convert to list/dict before sending

  PASS: No task passes ORM model instances as arguments
    (ORM objects are not serializable and create stale data issues)

Step 6: Review Result Backend Configuration

The agent evaluates result backend setup:

Result Backend Analysis:

  Backend: Redis (redis://localhost:6379/1)

  FAIL: result_expires not configured
    Default: 24 hours — results accumulate in Redis
    For 10,000 tasks/day = 10,000 keys consuming memory
    FIX: result_expires = 3600  (1 hour, or match your read pattern)

  FAIL: Same Redis instance for broker and results
    redis://localhost:6379/0 (broker)
    redis://localhost:6379/1 (results)
    RISK: Result storage OOM can crash the broker
    FIX: Use separate Redis instances for broker and results

  WARN: result_extended = False (default)
    Extended results include task args, worker hostname, timestamps
    Valuable for debugging production issues
    FIX: result_extended = True

  WARN: 8 tasks have ignore_result=False but results are never read
    These tasks store results that nobody fetches
    FIX: Set ignore_result=True on fire-and-forget tasks:
      send_email, update_cache, sync_analytics, log_event,
      send_webhook, cleanup_files, update_counter, emit_metric

  FAIL: No result_backend_transport_options
    No connection pooling or timeout configuration
    FIX: result_backend_transport_options = {
        "retry_policy": {"max_retries": 3, "interval_start": 0.2},
        "socket_timeout": 5,
    }

Step 7: Analyze Beat Schedule

The agent reviews periodic task configuration:

Beat Schedule Analysis:

  Schedule: 12 periodic tasks configured

  FAIL: Overlapping schedules detected
    "sync_all_users" runs every 5 minutes (crontab(*/5))
    "sync_premium_users" runs every 3 minutes (crontab(*/3))
    Both query user table — concurrent runs cause lock contention
    FIX: Stagger schedules or merge into single task with priority flag

  FAIL: Task "generate_daily_report" — no expires setting
    Schedule: crontab(hour=2, minute=0) — runs at 2 AM
    If worker is down at 2 AM, task queues and runs when worker recovers
    By then the report may be stale or a new execution is already scheduled
    FIX: Add expires=3600 — discard if not executed within 1 hour

  FAIL: No task locking — concurrent executions possible
    "sync_inventory" runs every 10 minutes, average runtime 12 minutes
    Overlap: new execution starts before previous completes
    FIX: Use celery-once or implement distributed lock:
      from celery_once import QueueOnce
      @task(base=QueueOnce, once={"graceful": True})

  WARN: Beat schedule uses intervals instead of crontab for daily tasks
    "cleanup" — schedule=timedelta(hours=24)
    RISK: Interval drifts over time (runs at 2:00, then 2:01, 2:03...)
    FIX: Use crontab for fixed-time tasks:
      schedule=crontab(hour=2, minute=0)

  WARN: No jitter on high-frequency tasks
    5 tasks run every minute — all hit broker simultaneously
    FIX: Stagger start times:
      crontab(minute="*/5")  — :00, :05, :10...
      crontab(minute="1-59/5")  — :01, :06, :11... (offset by 1)

  Beat Health:
    Total periodic tasks: 12
    Tasks with expires: 3/12 (NEEDS ATTENTION)
    Tasks with lock protection: 1/12 (CRITICAL)
    Overlapping schedules: 2 pairs detected

Step 8: Review Worker Configuration

The agent audits worker settings:

Worker Configuration Analysis:

  FAIL: worker_prefetch_multiplier = 4 (default)
    With long-running tasks, worker grabs 4 tasks but can only process 1
    Other tasks wait in prefetch buffer — poor load distribution
    FIX: worker_prefetch_multiplier = 1 for long tasks
    OR: worker_prefetch_multiplier = 0 (disable) for strict fair scheduling

  FAIL: No worker_max_tasks_per_child configured
    RISK: Memory leaks in tasks accumulate until worker OOM
    FIX: worker_max_tasks_per_child = 1000
    Worker restarts after 1000 tasks — recovers leaked memory

  FAIL: No worker_max_memory_per_child configured
    RISK: Single memory-intensive task can crash the worker
    FIX: worker_max_memory_per_child = 400000  (400 MB in KB)
    Worker restarts if memory exceeds limit

  WARN: worker_pool = "prefork" with I/O-bound tasks
    Most tasks are API calls and database queries (I/O bound)
    Prefork creates heavy processes for each worker
    CONSIDER: worker_pool = "gevent" or "eventlet" for I/O-bound workloads
    CAUTION: Not safe if tasks use non-greenlet-safe libraries

  WARN: worker_concurrency not explicitly set
    Defaults to CPU count — may be too high for memory-heavy tasks
    FIX: Set based on workload:
      CPU-bound: worker_concurrency = CPU_COUNT
      I/O-bound (prefork): worker_concurrency = CPU_COUNT * 2
      I/O-bound (gevent): worker_concurrency = 100-500

  FAIL: No task_soft_time_limit default
    Individual tasks can run forever if no per-task limit set
    FIX: Set global defaults:
      task_soft_time_limit = 300  (5 minutes)
      task_time_limit = 330  (hard kill 30s after soft limit)

Step 9: Check Task Design Patterns

The agent evaluates task design for correctness:

Task Design Analysis:

  FAIL: Task "process_batch" accepts queryset as argument
    def process_batch(self, queryset):
    RISK: Django querysets are not serializable
    RISK: Even if pickled, data is stale by the time task runs
    FIX: Pass IDs, query in the task:
      def process_batch(self, item_ids: list[int]):
          items = Item.objects.filter(id__in=item_ids)

  FAIL: Task "update_dashboard" has database transaction spanning task
    with transaction.atomic():
        ... (entire task body)
    RISK: Long-running transaction holds locks, blocks other queries
    FIX: Use smaller transactions within the task, or use bulk operations

  FAIL: Task "send_notifications" iterates and sends one by one
    for user in users:
        send_push(user.id)  # Synchronous call inside task
    RISK: If task fails at user #500 of 1000, all 500 are lost
    FIX: Fan out to individual tasks:
      group(send_push.s(user_id) for user_id in user_ids).apply_async()

  WARN: Task "import_data" has no idempotency check
    Re-running creates duplicate records
    FIX: Use unique constraints and upsert logic, or check task ID:
      if cache.get(f"task_done:{self.request.id}"):
          return  # Already processed

  WARN: Task "process_order" calls another task synchronously
    result = charge_card.delay(order_id).get(timeout=30)
    FAIL: .get() blocks the worker — deadlock risk if workers exhausted
    FIX: Use chain or callback:
      chain(charge_card.s(order_id), fulfill_order.s()).apply_async()

  PASS: Task arguments are JSON-serializable primitives
  PASS: No global mutable state accessed between tasks

Step 10: Produce the Analysis Report

The agent generates a comprehensive report:

# Celery Task Analysis Report
# Project: /path/to/app/ | Date: April 30, 2026

## Overview
  Tasks: 15
  Periodic tasks: 12
  Queues: 4 (default, high_priority, bulk, notifications)
  Broker: RabbitMQ
  Result backend: Redis
  Worker pool: prefork

## Overall Health Score: 51/100

## Category Scores
  Task Configuration:     5/10  (missing retries, no time limits)
  Retry Policies:         4/10  (infinite retries, broad exception catch)
  Routing & Queues:       4/10  (no routing, single worker)
  Serialization:          3/10  (pickle in use — security risk)
  Result Backend:         5/10  (no expiry, shared instance)
  Beat Schedule:          5/10  (overlaps, no expires, no locks)
  Worker Config:          4/10  (no memory limits, no prefetch tuning)
  Task Design:            6/10  (queryset args, sync calls)
  Monitoring:             5/10  (no Flower, no metrics)

## Critical Issues
  1. Pickle serializer enabled — remote code execution risk
  2. Infinite retry task — poison messages never cleared
  3. No task time limits — stuck tasks block workers indefinitely
  4. Synchronous .get() call inside task — deadlock risk
  5. No worker memory limits — OOM crashes

## Recommendations Summary
  Estimated effort: 3-5 days for critical + high priority fixes
  Expected improvement: 51 -> 80 health score
  Risk reduction: Eliminates security vulnerability and 3 crash scenarios

Output

The agent produces:

  • Health score: 0-100 overall Celery configuration quality rating
  • Category scores: granular ratings for each quality dimension
  • Critical issues: problems that pose reliability or security risk
  • Per-task analysis: configuration audit for each task definition
  • Retry policy review: evaluation of each task's failure handling
  • Routing map: visual representation of task-to-queue assignments
  • Beat schedule timeline: periodic task execution timeline with overlap detection
  • Worker tuning recommendations: pool, concurrency, and memory settings
  • Remediation code: exact Python code to fix each issue
  • Priority matrix: issues ranked by risk and effort

Scope Options

ScopeWhat It Covers
Full (default)All tasks, config, beat, workers
Single taskDeep analysis of one task definition
Config onlyBroker, backend, serialization, worker settings
Beat onlyPeriodic task schedule analysis
Retry auditRetry policies across all tasks
SecuritySerialization, auth, and message signing

Broker Support

The agent adapts recommendations based on the broker detected:

FeatureRabbitMQRedisSQS
Priority queuesNative supportLimited (0-9)Not supported
Dead letter exchangeNative DLXManual implementationNative DLQ
Message persistencePer-messageAOF/RDBAlways persistent
Task routingExchange/routing keyQueue name onlyQueue URL
Visibility timeoutack_late + prefetchvisibility_timeoutDefault 30s
Max message size128 MB default512 MB256 KB (use S3)

Tips for Best Results

  • Point the agent at your entire project for full task discovery
  • Include celeryconfig.py or Django settings for configuration analysis
  • Share Flower metrics or task logs for runtime behavior analysis
  • Run before major deployments or scaling events
  • Combine with broker monitoring to correlate config issues with runtime failures
  • For Django projects, include settings.py and all apps with tasks.py files

Version tags

latestvk9704hbx2r30ae1qzm7qhmdkmx85v65k