Install
openclaw skills install airflow-dag-analyzer

Analyze Apache Airflow DAG definitions for quality, reliability, and operational excellence. Reviews task dependencies, retry policies, SLA configurations, resource allocation, sensor patterns, error handling, and DAG structure. Acts as a senior data platform engineer auditing your Airflow deployment.
Invoke this skill when you need to review Airflow DAGs for quality issues, optimize performance, or ensure operational best practices.
Basic invocation:
Analyze the Airflow DAGs in /path/to/dags/
Review this DAG file for best practices
Check DAG quality for production readiness
Focused analysis:
Check retry policies across all DAGs
Find DAGs without SLA configurations
Analyze task dependency structure for bottlenecks
Review sensor configurations for timeout risks
The agent reads DAG Python files, parses task definitions, and produces a comprehensive quality report.
The agent locates and reads all DAG definitions:
```bash
# Find all DAG files
find /path/to/dags/ -name "*.py" -type f

# Identify which files contain DAG definitions
grep -rl "DAG\|@dag" /path/to/dags/ --include="*.py"

# Check for dynamic DAG generation
grep -rn "globals()\|create_dag\|dag_factory\|generate_dag" /path/to/dags/ --include="*.py"
```
The agent parses each DAG file to extract the DAG-level configuration, default arguments, task definitions, and the dependency structure between tasks.
The agent then checks each DAG's top-level configuration:
Schedule configuration:
```python
# GOOD: Explicit schedule with catchup disabled
from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="etl_daily_orders",
    schedule_interval="0 6 * * *",
    catchup=False,
    start_date=datetime(2026, 1, 1),
    tags=["etl", "orders", "production"],
)
```
# PROBLEMS the agent detects:
FAIL: dag_id="etl_daily_orders"
catchup=True (default) — will backfill all dates from start_date
RISK: Accidental mass backfill on deploy, resource exhaustion
FIX: Set catchup=False unless intentional backfill is needed
FAIL: dag_id="report_generator"
start_date=datetime.now() — dynamic start_date
RISK: Start date changes on every scheduler restart, breaks idempotency
FIX: Use a fixed date: start_date=datetime(2026, 1, 1)
WARN: dag_id="data_sync"
No schedule_interval defined — DAG defaults to daily
FIX: Explicitly set schedule_interval (even if daily)
FAIL: dag_id="ml_training"
max_active_runs=None (unlimited)
RISK: Overlapping runs cause resource contention and data corruption
FIX: Set max_active_runs=1 for data pipelines
WARN: dag_id="analytics_refresh"
No tags defined
FIX: Add tags for filtering in the UI: tags=["analytics", "production"]
WARN: dag_id="customer_export"
No doc_md or description
FIX: Add DAG documentation for operators and on-call engineers
Default arguments audit:
```python
# The agent checks default_args for completeness:
from datetime import timedelta

default_args = {
    "owner": "data-team",                     # WHO: Required for accountability
    "depends_on_past": False,                 # DEPENDENCY: Usually False for resilience
    "email": ["data-team@company.com"],       # ALERTS: Team email for notifications
    "email_on_failure": True,                 # ALERTS: Must be True for production
    "email_on_retry": False,                  # ALERTS: Usually False (too noisy)
    "retries": 3,                             # RESILIENCE: Required for production
    "retry_delay": timedelta(minutes=5),      # RESILIENCE: Required with retries
    "retry_exponential_backoff": True,        # RESILIENCE: Prevents thundering herd
    "max_retry_delay": timedelta(minutes=60),
    "execution_timeout": timedelta(hours=2),  # SAFETY: Prevents hanging tasks
    "sla": timedelta(hours=4),                # SLA: When lateness becomes a concern
    "on_failure_callback": alert_slack,       # ALERTS: Custom alerting (user-defined)
}
```
Default Args Audit: etl_daily_orders
FAIL: No "retries" defined — tasks will not retry on transient failures
Production DAGs should have retries >= 2
FAIL: No "execution_timeout" — tasks can hang indefinitely
Risk: Zombie tasks consuming worker slots for hours/days
Recommend: Set based on expected duration + buffer (2x typical)
FAIL: No "on_failure_callback" — team not notified on failures
Recommend: Add Slack/PagerDuty callback for production DAGs
WARN: No "sla" defined — no SLA tracking
Recommend: Set SLA based on business requirements
WARN: "owner" = "airflow" (default) — no ownership attribution
Recommend: Set to team or individual responsible for the DAG
PASS: "depends_on_past" = False — good for resilience
PASS: "email_on_failure" = True — email alerts configured
The agent maps the dependency graph and checks for structural issues:
DAG Structure Analysis: etl_daily_orders
Tasks: 14
Max depth: 6 (levels of sequential dependencies)
Max width: 4 (maximum parallel tasks at any level)
Critical path: extract -> validate -> transform -> aggregate -> load -> notify
Critical path duration: estimated 45 min (sum of median task durations)
Dependency Graph:
```
extract_orders ─┐
extract_items  ─┤
extract_users  ─┴─> validate_data ─> transform ─> aggregate ─┐
                                                             ├─> load_warehouse
extract_config ──────────────────────────────────────────────┤
                                                             └─> load_cache
load_warehouse ─┐
load_cache     ─┴─> run_tests ─> notify_complete
```
Structural checks:
FAIL: Task "transform" has 3 upstream dependencies but creates a bottleneck
All downstream tasks wait for transform to complete
RECOMMEND: Split transform into independent sub-tasks that can run in parallel:
transform_orders, transform_items, transform_users
FAIL: Linear chain of 6 tasks where 3 could run in parallel
extract -> validate -> transform -> test -> load -> notify
RECOMMEND: Parallelize extract tasks, parallelize test + load where possible
WARN: Task "extract_config" has no downstream dependencies
This task runs but nothing depends on its output
Is this intentional? May be a dead task.
WARN: DAG has 14 tasks — approaching complexity threshold
RECOMMEND: Consider using TaskGroups to organize related tasks (see the sketch after this list)
FAIL: Circular dependency risk detected in dynamic task generation
Loop generates tasks but dependency chain references previous iteration
RISK: Can create circular dependency at runtime depending on data
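As referenced above, a minimal TaskGroup sketch that groups the extract and load steps of the example DAG (the dag_id is hypothetical; EmptyOperator requires Airflow 2.3+, use DummyOperator on earlier 2.x releases):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id="etl_daily_orders_grouped",  # hypothetical variant of the example DAG
    schedule_interval="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    with TaskGroup(group_id="extract") as extract:
        # Independent sources extract in parallel inside one collapsible group.
        for source in ("orders", "items", "users", "config"):
            EmptyOperator(task_id=f"extract_{source}")

    validate = EmptyOperator(task_id="validate_data")

    with TaskGroup(group_id="load") as load:
        EmptyOperator(task_id="load_warehouse")
        EmptyOperator(task_id="load_cache")

    extract >> validate >> load
```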
Fan-out / fan-in analysis:
Fan-out at "validate_data": 1 -> 4 tasks
OK: Reasonable parallelism
Fan-in at "load_warehouse": 4 -> 1 tasks
WARN: Single point of failure — if any upstream fails, load fails
Consider: Use trigger_rule="none_failed_min_one_success" if partial loads acceptable
The agent evaluates resilience configuration:
Retry Policy Analysis:
Task: extract_api_data
retries: 0 <-- FAIL: API calls should always retry
Recommend: retries=3, retry_delay=timedelta(minutes=2),
retry_exponential_backoff=True
Task: load_to_warehouse
retries: 5, retry_delay: 1 minute
WARN: 5 retries * 1 min = only 5 min total retry window
For warehouse operations, recommend:
retries=3, retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True,
max_retry_delay=timedelta(minutes=30)
This gives ~35 min retry window for transient warehouse issues
Task: send_email_report
retries: 10, retry_delay: 10 seconds
WARN: Aggressive retry on email — may trigger rate limits
Recommend: retries=3, retry_delay=timedelta(minutes=1)
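Putting the recommended settings together, a sketch of an API extraction task with exponential backoff (the callable is a placeholder, and the task is assumed to live inside a with DAG(...) block):

```python
from datetime import timedelta

from airflow.operators.python import PythonOperator


def fetch_from_api(**_):
    ...  # placeholder for the real extraction logic


# Assumes a surrounding `with DAG(...)` block.
extract_api_data = PythonOperator(
    task_id="extract_api_data",
    python_callable=fetch_from_api,
    retries=3,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=True,         # delays grow: ~2, ~4, ~8 minutes
    max_retry_delay=timedelta(minutes=30),  # cap the backoff
)
```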
Trigger rule analysis:
Task: run_quality_checks
trigger_rule: "all_success" (default)
OK: Quality checks should only run if all data loaded
Task: send_failure_alert
trigger_rule: "all_success" (default)
FAIL: Alert task should fire on failure
FIX: trigger_rule="one_failed" or "all_done"
Task: cleanup_temp_files
trigger_rule: "all_success"
WARN: Cleanup won't run if upstream fails — temp files accumulate
FIX: trigger_rule="all_done" — cleanup should always run
Sensors are a common source of production issues. The agent checks each sensor:
Sensor Analysis:
FAIL: ExternalTaskSensor("wait_for_upstream_dag")
mode="poke" (default), poke_interval=60s, timeout=3600s
PROBLEM: Poke mode holds a worker slot while waiting
At 60s interval for up to 1 hour = 1 worker slot blocked
FIX: mode="reschedule" — releases worker between pokes
FAIL: S3KeySensor("wait_for_file")
timeout=7200s (2 hours), no explicit timeout action
PROBLEM: If file never arrives, sensor blocks for 2 hours then fails
FIX: Add soft_fail=True to skip downstream tasks gracefully
OR: Add on_failure_callback to alert when file is late
WARN: SqlSensor("check_data_ready")
poke_interval=30s
PROBLEM: Polling database every 30s for extended periods
FIX: Increase poke_interval to 300s, use mode="reschedule"
WARN: HttpSensor("wait_for_api")
No exponential_backoff=True
RISK: Fixed-interval polling may hit rate limits
FIX: Add exponential_backoff=True
Sensor best practices checklist:
| Setting | Recommended | Why |
|---|---|---|
mode | "reschedule" | Releases worker slot between pokes |
timeout | Set explicitly | Prevents indefinite waits |
poke_interval | 300s+ for external deps | Reduces load on external systems |
soft_fail | True for optional deps | Prevents blocking entire DAG |
exponential_backoff | True for APIs | Prevents rate limiting |
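Applying the checklist, a sketch of a well-configured S3 sensor (the bucket key is illustrative; the import path matches recent versions of the Amazon provider package, and the task is assumed to live inside a with DAG(...) block):

```python
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id="wait_for_file",
    bucket_key="s3://bucket/exports/orders/{{ ds }}/orders.csv",  # hypothetical key
    mode="reschedule",    # release the worker slot between pokes
    poke_interval=300,    # poke every 5 minutes
    timeout=2 * 60 * 60,  # give up after 2 hours
    soft_fail=True,       # skip downstream tasks instead of failing the run
)
```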
The agent reviews resource configuration:
Resource Analysis:
WARN: No pool assignments detected
All 14 tasks run in default pool (128 slots)
RISK: This DAG competes with all other DAGs for worker slots
RECOMMEND: Assign pools for resource-intensive tasks:
- "api_pool" (limit=5) for API extraction tasks
- "warehouse_pool" (limit=10) for database operations
WARN: Task "heavy_transform" has no resource constraints
KubernetesPodOperator without resource requests/limits
RISK: Can consume unlimited cluster resources
FIX: Add resources=k8s.V1ResourceRequirements(
         requests={"memory": "2Gi", "cpu": "1"},
         limits={"memory": "4Gi", "cpu": "2"},
     )
WARN: No priority_weight set on critical path tasks
All tasks have equal priority (default=1)
RECOMMEND: Set higher priority_weight for critical path tasks
to ensure they're scheduled first when workers are constrained
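A sketch of pool assignment plus critical-path prioritization. The pool itself must exist first, for example via the CLI: airflow pools set api_pool 5 "API extraction tasks". The callable is a placeholder:

```python
from airflow.operators.python import PythonOperator


def _extract(**_):
    ...  # placeholder for the real extraction logic


# Assumes a surrounding `with DAG(...)` block.
extract_api_data = PythonOperator(
    task_id="extract_api_data",
    python_callable=_extract,
    pool="api_pool",     # caps concurrent API tasks across all DAGs (limit=5)
    priority_weight=10,  # critical-path tasks jump the queue (default weight is 1)
)
```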
The agent checks that operators are used correctly:
Operator Analysis:
FAIL: BashOperator with inline script (15 lines)
Task: "complex_transformation"
PROBLEM: Complex logic in BashOperator is hard to test and maintain
FIX: Move to PythonOperator or a standalone script
FAIL: PythonOperator calling subprocess.run()
Task: "run_spark_job"
PROBLEM: Bypasses Airflow's operator framework
FIX: Use SparkSubmitOperator or BashOperator for CLI commands
WARN: BranchPythonOperator without join task
Task: "check_data_quality"
Branches to "process_good_data" or "handle_bad_data"
But no downstream join — one branch's downstream tasks are skipped
RECOMMEND: Add a join task with trigger_rule="none_failed_min_one_success" (sketched after this list)
WARN: Multiple ShortCircuitOperator in sequence
Tasks: "check_weekday", "check_data_exists", "check_flag"
PROBLEM: Unclear which condition caused the circuit to break
FIX: Combine conditions or add logging to each operator
FAIL: Using SubDagOperator
Task: "process_subtasks"
SubDagOperator is DEPRECATED and causes deadlocks
FIX: Replace with TaskGroup (Airflow 2.0+)
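As noted in the branching item above, a sketch of a branch with an explicit join (the branch logic and dag_id are placeholders; EmptyOperator requires Airflow 2.3+):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule


def _route(**_):
    # Placeholder quality gate: return the task_id of the branch to follow.
    return "process_good_data"


with DAG(
    dag_id="branch_join_example",  # hypothetical DAG
    start_date=datetime(2026, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    check_data_quality = BranchPythonOperator(
        task_id="check_data_quality", python_callable=_route
    )
    process_good_data = EmptyOperator(task_id="process_good_data")
    handle_bad_data = EmptyOperator(task_id="handle_bad_data")
    join = EmptyOperator(
        task_id="join",
        # Succeeds when whichever branch actually ran succeeded; the
        # skipped branch does not block it.
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )
    check_data_quality >> [process_good_data, handle_bad_data] >> join
```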
The agent checks operational visibility:
SLA Configuration:
FAIL: 8 of 12 production DAGs have no SLA defined
DAGs without SLA: etl_orders, etl_users, report_daily, ...
RISK: Data pipeline delays go unnoticed until stakeholders complain
WARN: DAG "etl_critical" has SLA=6 hours, schedule=daily
SLA allows data to be 6+ hours late before alert
RECOMMEND: Tighten to 2 hours based on downstream consumer requirements
FAIL: No sla_miss_callback defined for any DAG
Email-only SLA alerts are often missed
RECOMMEND: Add callback for Slack/PagerDuty integration
Monitoring Gaps:
FAIL: No task-level execution_timeout on 23 tasks
Risk: Zombie tasks silently consume resources
WARN: No Airflow metrics exported (statsd/prometheus)
Recommend: Enable statsd metrics for:
- scheduler.heartbeat
- dag_processing.total_parse_time
- executor.open_slots
- task_instance.duration
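Enabling statsd export is a configuration change rather than a DAG change. A minimal airflow.cfg sketch for Airflow 2.x (the section was [scheduler] in 1.x; host, port, and prefix are deployment-specific):

```ini
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
```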
The agent evaluates whether DAGs are safe to re-run:
Idempotency Analysis:
FAIL: Task "load_orders" uses INSERT without UPSERT logic
Re-running this task will create duplicate rows
FIX: Use INSERT ... ON CONFLICT DO UPDATE (PostgreSQL)
OR: Use MERGE statement (Snowflake/BigQuery)
OR: Use DELETE+INSERT pattern within a transaction
FAIL: Task "export_to_s3" writes to fixed S3 key
s3://bucket/exports/orders.csv
Re-running overwrites without versioning
FIX: Use execution_date in path:
s3://bucket/exports/orders/{{ ds }}/orders.csv
WARN: Task "send_notification" has no idempotency check
Re-running sends duplicate Slack messages
RECOMMEND: Add check for already-sent notification for this execution_date
PASS: Task "create_report" uses execution_date in output path
Safe to re-run — produces same output for same execution_date
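A sketch of the PostgreSQL upsert fix for load_orders (table, columns, and connection id are assumptions; requires the Postgres provider package and a surrounding with DAG(...) block):

```python
from airflow.providers.postgres.operators.postgres import PostgresOperator

load_orders = PostgresOperator(
    task_id="load_orders",
    postgres_conn_id="warehouse",  # hypothetical connection id
    sql="""
        -- Re-running for the same {{ ds }} updates rows instead of duplicating them.
        INSERT INTO orders (order_id, amount, load_date)
        SELECT order_id, amount, '{{ ds }}'::date
        FROM staging_orders
        ON CONFLICT (order_id) DO UPDATE
        SET amount = EXCLUDED.amount,
            load_date = EXCLUDED.load_date;
    """,
)
```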
The agent generates a comprehensive report:
# Airflow DAG Analysis Report
# DAGs Directory: /opt/airflow/dags/ | Date: April 30, 2026
## Overview
Total DAGs: 12
Total tasks: 87
Production DAGs: 9
Development DAGs: 3
## Overall Health Score: 58/100
## Category Scores
DAG Configuration: 6/10 (catchup, schedule, max_active_runs)
Default Args: 4/10 (retries, timeouts, callbacks)
Task Dependencies: 7/10 (DAG structure, bottlenecks)
Retry Policies: 5/10 (missing retries, bad intervals)
Sensor Config: 4/10 (poke mode, no timeouts)
Resource Allocation: 5/10 (no pools, no limits)
Operator Usage: 6/10 (deprecated operators, misuse)
SLA & Monitoring: 3/10 (missing SLAs, no callbacks)
Idempotency: 5/10 (duplicate risk on re-run)
Code Quality: 7/10 (imports, structure, DRY)
## Critical Issues (Production Risk)
1. 8 DAGs have catchup=True — risk of accidental mass backfill
2. 23 tasks have no execution_timeout — zombie task risk
3. SubDagOperator in use — deprecated, causes deadlocks
4. 4 sensors using poke mode — blocking worker slots
5. No SLA monitoring on critical data pipelines
## High Priority
6. No retry policies on API-dependent tasks
7. No pools configured — resource contention risk
8. INSERT without upsert — duplicate data on re-runs
9. No failure callbacks — team not alerted on failures
## Recommendations Summary
Estimated effort: 3-5 days for critical + high priority fixes
Expected improvement: 58 -> 82 health score
Risk reduction: Eliminates 4 classes of production incidents
The analysis can be scoped to match the request:
| Scope | What It Covers |
|---|---|
| Full (default) | All DAGs, all checks |
| Single DAG | Deep analysis of one DAG file |
| Category | One check category across all DAGs (e.g., only retry policies) |
| Production only | Only DAGs tagged as production |
| Changed | Only DAG files changed in current git branch |
The agent adapts its recommendations based on the Airflow version detected:
| Feature | Airflow 1.x | Airflow 2.x |
|---|---|---|
| TaskGroups | N/A (use SubDagOperator) | Recommended over SubDag |
| TaskFlow API | N/A | Recommended for Python tasks |
| Timetables | N/A | Recommended over cron strings for complex schedules |
| Dynamic Task Mapping | N/A | Recommended for variable task counts |
| Dataset-driven scheduling | N/A | Recommended for event-driven pipelines (2.4+) |
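For reference, a minimal TaskFlow API sketch in the recommended Airflow 2.x style (DAG and task names are illustrative):

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule_interval="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def etl_orders_taskflow():
    @task(retries=3)
    def extract():
        return [{"order_id": 1, "amount": 42.0}]  # placeholder payload

    @task
    def load(rows):
        print(f"loading {len(rows)} rows")

    # XCom passing and dependencies are inferred from the function call.
    load(extract())


etl_orders_taskflow()
```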