Dagster Pipeline Analyzer

v1.0.0

Analyze Dagster pipelines and software-defined assets for quality, scheduling, partitioning, IO managers, resource configuration, and observability.

MIT-0 · 1 version · updated 2h ago

Install

openclaw skills install dagster-pipeline-analyzer

Dagster Pipeline Analyzer

Analyze Dagster software-defined assets and pipelines for quality, reliability, and operational excellence. Reviews asset dependency graphs, freshness policies, partitioning strategies, IO managers, resources, sensors/schedules, and observability gaps. Acts as a senior data platform engineer auditing your Dagster deployment.

Usage

Basic: Analyze the Dagster project in /path/to/dagster/
Focused: Check asset freshness policies | Analyze partition strategies | Review IO manager configuration | Find assets without observability metadata

How It Works

Step 1: Discover Dagster Definitions

find /path/to/project -name "definitions.py" -o -name "repository.py"
find /path/to/project -name "*.py" -path "*/assets/*"
find /path/to/project -name "*.py" -path "*/sensors/*" -o -path "*/schedules/*"
cat /path/to/project/dagster.yaml /path/to/project/workspace.yaml 2>/dev/null

Parses @asset, @multi_asset, @op, @graph, @job, ConfigurableResource, ConfigurableIOManager, @schedule, @sensor, and partition definitions.
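
For orientation, a minimal definitions.py of the kind the analyzer discovers might look like the sketch below. It is illustrative only; the asset, job, and cron values are assumptions that mirror the example findings later in this page.

from dagster import Definitions, ScheduleDefinition, asset, define_asset_job

@asset(group_name="raw")
def raw_events():
    """Raw events landed from S3."""
    ...

daily_etl = define_asset_job("daily_etl", selection="*")

defs = Definitions(
    assets=[raw_events],
    jobs=[daily_etl],
    schedules=[ScheduleDefinition(job=daily_etl, cron_schedule="0 6 * * *")],
)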

Step 2: Audit Asset Graph Structure

Asset Graph: 34 assets, 5 groups (raw, staging, warehouse, analytics, ml)
Max depth: 7 | External assets: 3 | Source assets: 4

  [source] s3_raw_events -> raw_events -> cleaned_events
    -> event_aggregates -> daily_metrics -> churn_features -> churn_model

  FAIL: "orphan_transform" has no downstream consumers
    Last materialized 49 days ago. Remove or document purpose.

  FAIL: "daily_metrics" depends on 6 upstream assets — fragile bottleneck
    If any upstream fails, materialization blocked.
    RECOMMEND: Add retry logic or partial materialization support

  FAIL: No @asset_check defined for any asset
    RECOMMEND: Add row count, null checks, freshness, schema validation
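
A single row-count check is a reasonable first step toward closing the missing-check gap flagged above. A minimal sketch, assuming the asset name from the example graph and pandas outputs:

import pandas as pd
from dagster import AssetCheckResult, asset_check

@asset_check(asset="cleaned_events")
def cleaned_events_not_empty(cleaned_events: pd.DataFrame) -> AssetCheckResult:
    # Fail the check when the latest materialization produced zero rows.
    row_count = len(cleaned_events)
    return AssetCheckResult(passed=row_count > 0, metadata={"row_count": row_count})

Register the check through Definitions(asset_checks=[cleaned_events_not_empty]) so it runs alongside each materialization.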

Step 3: Review Freshness Policies

  Assets WITH freshness policy: 8/34 (24%)

  FAIL: "daily_revenue" — no freshness policy on business-critical metric
    FIX: FreshnessPolicy(maximum_lag_minutes=120)

  FAIL: Freshness SLA cascade violation:
    "event_aggregates" has 60-min freshness
    but upstream "cleaned_events" takes ~20 min to materialize
    Effective budget: 10 min. Actual avg: 25 min. SLA violated regularly.
    FIX: Relax to 90 min or speed up materialization

  WARN: "user_segments" has 1440-min (24h) freshness
    but downstream "real_time_recommendations" expects fresh data
    FIX: Tighten to maximum_lag_minutes=60
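
Attaching a policy is a one-line change per asset. A sketch for the daily_revenue case above, using the 120-minute lag from the example FIX (note that newer Dagster releases lean toward freshness checks rather than FreshnessPolicy):

from dagster import FreshnessPolicy, asset

@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=120))
def daily_revenue():
    """Business-critical revenue rollup; flag if more than 2 hours stale."""
    ...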

Step 4: Analyze Partitioning

  "raw_events": DailyPartitionsDefinition — PASS, matches data pattern
  "event_aggregates": MonthlyPartitionsDefinition
    WARN: Upstream is daily — verify TimeWindowPartitionMapping
  "ml_features": Unpartitioned, processes 2M+ rows every run (45 min)
    FAIL: Add DailyPartitionsDefinition for incremental processing (~3 min/partition)

  FAIL: No BackfillPolicy on any partitioned asset
    Risk: Accidental backfill of 486 partitions = 486 concurrent runs
    FIX: BackfillPolicy(max_partitions_per_run=10)
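
A sketch of the partitioning and backfill fixes above, assuming a daily data pattern; the start date and window handling are placeholders:

from dagster import BackfillPolicy, DailyPartitionsDefinition, asset

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(
    partitions_def=daily_partitions,
    backfill_policy=BackfillPolicy.multi_run(max_partitions_per_run=10),
)
def ml_features(context):
    # Process only this partition's time window instead of the full history.
    window = context.partition_time_window
    ...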

Step 5: Review IO Managers

  Configured: "io_manager" (Filesystem), "warehouse_io" (Snowflake), "s3_io" (S3Pickle)

  FAIL: Default is FilesystemIOManager — data lost on pod restart
    FIX: Set default to S3/GCS/database-backed store

  FAIL: S3PickleIOManager — not portable, security risk (arbitrary code exec)
    FIX: Switch to S3ParquetIOManager for schema enforcement + compression

  WARN: No return type annotations — IO manager cannot validate schema
    FIX: @asset def daily_metrics(...) -> pd.DataFrame:

  Coverage: warehouse_io 35%, s3_io 24%, filesystem 41% (concern)
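
The S3ParquetIOManager named in the FIX appears to be a custom class rather than a Dagster built-in. A minimal version might look like the sketch below; the bucket and prefix are placeholders, and writing s3:// paths with pandas assumes s3fs and pyarrow are installed.

import pandas as pd
from dagster import ConfigurableIOManager, InputContext, OutputContext

class S3ParquetIOManager(ConfigurableIOManager):
    bucket: str
    prefix: str = "dagster"

    def _path(self, context) -> str:
        return f"s3://{self.bucket}/{self.prefix}/{'/'.join(context.asset_key.path)}.parquet"

    def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
        # Parquet keeps an explicit schema and compresses well, unlike pickle.
        obj.to_parquet(self._path(context))

    def load_input(self, context: InputContext) -> pd.DataFrame:
        return pd.read_parquet(self._path(context))

Pair this with return type annotations (-> pd.DataFrame) on assets so the IO manager always handles a known type.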

Step 6: Audit Resources

  FAIL: snowflake_password="..." in definitions.py line 42
    FIX: Use EnvVar("SNOWFLAKE_PASSWORD")

  FAIL: Resource "spark" defined but never used — remove it

  WARN: Resource "dbt" has hardcoded project path
    FIX: Use EnvVar or relative path

  WARN: No resource-level health checks at startup
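
A sketch of the credential fix using a generic ConfigurableResource; the resource class and the account/user values are hypothetical. EnvVar defers reading SNOWFLAKE_PASSWORD until run time, so the secret never lands in code or run config.

from dagster import ConfigurableResource, Definitions, EnvVar

class WarehouseResource(ConfigurableResource):
    account: str
    user: str
    password: str

defs = Definitions(
    resources={
        "warehouse": WarehouseResource(
            account="acme-prod",
            user="dagster_svc",
            password=EnvVar("SNOWFLAKE_PASSWORD"),
        ),
    },
)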

Step 7: Review Schedules and Sensors

  FAIL: "daily_etl" uses cron but assets have DailyPartitionsDefinition
    FIX: build_schedule_from_partitioned_job() for auto-alignment

  FAIL: "s3_file_sensor" polls every 30s — high API cost, rate limit risk
    FIX: minimum_interval_seconds=300 or SQS/SNS event-driven

  FAIL: "freshness_sensor" has no error handling — crashes sensor daemon
    FIX: Wrap in try/except, return SkipReason on failure

  WARN: 14 assets have no automation — manual-only materialization
  WARN: No @asset_sensor for cross-job dependency coordination
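
A sketch of the schedule and sensor fixes above. Job names and the listing helper are hypothetical, and build_schedule_from_partitioned_job assumes the job selects assets that share the daily partitions definition.

from dagster import (
    RunRequest,
    SkipReason,
    build_schedule_from_partitioned_job,
    define_asset_job,
    sensor,
)

# Select the daily-partitioned assets so the schedule inherits their cadence.
daily_etl = define_asset_job("daily_etl", selection=["raw_events"])
daily_etl_schedule = build_schedule_from_partitioned_job(daily_etl)

# Hypothetical unpartitioned job for the sensor to target.
ingest_job = define_asset_job("ingest_new_files", selection=["cleaned_events"])

def list_new_objects() -> list:
    # Placeholder for the real S3 listing (e.g. boto3 list_objects_v2).
    return []

@sensor(job=ingest_job, minimum_interval_seconds=300)
def s3_file_sensor(context):
    try:
        new_keys = list_new_objects()
    except Exception as exc:
        # Never let a transient API error crash the sensor daemon.
        return SkipReason(f"S3 listing failed: {exc}")
    if not new_keys:
        return SkipReason("no new files")
    return [RunRequest(run_key=key) for key in new_keys]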

Step 8: Check Observability

  FAIL: 22/34 assets have no description
  FAIL: No MaterializeResult metadata (no row counts, schema, quality)
    FIX: yield MaterializeResult(metadata={"row_count": len(df), ...})
  FAIL: No asset checks defined
  WARN: No code_version on any asset — always re-materializes
  WARN: No owners defined — alerts not routable

  Coverage: descriptions 35%, freshness 24%, code_version 0%,
            checks 0%, metadata 15%, owners 0%
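
Most of these gaps are metadata on the asset definition plus a richer return value. A sketch for daily_metrics, where the owner handle and the transform stub are placeholders:

import pandas as pd
from dagster import MaterializeResult, asset

def build_daily_metrics() -> pd.DataFrame:
    # Placeholder transform; the real asset reads from its upstream assets.
    return pd.DataFrame({"date": [], "revenue": []})

@asset(
    description="Daily revenue and engagement metrics rolled up from event_aggregates.",
    owners=["team:data-platform"],
    code_version="1",
)
def daily_metrics() -> MaterializeResult:
    df = build_daily_metrics()
    return MaterializeResult(
        metadata={"row_count": len(df), "columns": list(df.columns)},
    )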

Step 9: Final Report

# Dagster Pipeline Analysis Report

## Overall Health Score: 51/100
  Asset graph: 7/10       Freshness: 4/10       Partitioning: 5/10
  IO managers: 4/10       Resources: 5/10       Schedules/sensors: 5/10
  Observability: 3/10     Security: 4/10        Documentation: 3/10

## Critical Issues
  1. Credentials hardcoded in definitions.py
  2. Default IO manager is local filesystem — data loss risk
  3. Pickle serialization — security + portability risk
  4. No asset checks — data quality issues undetected
  5. No backfill safety — accidental mass materialization risk

## High Priority
  6. 76% of assets have no freshness policy
  7. S3 sensor polling every 30s
  8. No materialization metadata
  9. Freshness SLA cascade violation
  10. 14 assets with no automation

Output

  • Asset graph visualization with dependency depth, bottlenecks, orphans
  • Freshness analysis with SLA cascade validation
  • Partition audit covering strategy review and backfill safety
  • IO manager review for serialization, durability, consistency
  • Resource/schedule/sensor audit with security and config checks
  • Observability gaps for metadata, checks, documentation coverage
  • Health score 0-100 with per-category breakdown and remediation code

Tips for Best Results

  • Point the agent at your Dagster project root directory
  • Include dagster.yaml and workspace.yaml for deployment context
  • Provide Dagster version for version-appropriate recommendations
  • Run before major asset graph changes or quarterly as a health check

Version tags

latest: vk975pncz1cxjcdwqgzjas8qyhd85tr05