Install
openclaw skills install dagster-pipeline-analyzer
Analyze Dagster pipelines and software-defined assets for quality, scheduling, partitioning, IO managers, resource configuration, and observability. Checks asset dependencies, freshness policies, group structure, sensor/schedule configuration, and operational best practices. Use when asked to review Dagster code, audit asset graphs, check Dagster best practices, analyze partition strategies, review IO managers, or optimize Dagster deployments. Triggers on "dagster", "software-defined assets", "dagster assets", "dagster pipeline", "dagster schedule", "dagster sensor", "dagster partition", "io manager", "dagster resources", "dagster ops", "dagster jobs", "asset graph".
Analyze Dagster software-defined assets and pipelines for quality, reliability, and operational excellence. Reviews asset dependency graphs, freshness policies, partitioning strategies, IO managers, resources, sensors/schedules, and observability gaps. Acts as a senior data platform engineer auditing your Dagster deployment.
Basic: Analyze the Dagster project in /path/to/dagster/
Focused: Check asset freshness policies | Analyze partition strategies | Review IO manager configuration | Find assets without observability metadata
find /path/to/project -name "definitions.py" -o -name "repository.py"
find /path/to/project -name "*.py" -path "*/assets/*"
find /path/to/project -name "*.py" \( -path "*/sensors/*" -o -path "*/schedules/*" \)
cat /path/to/project/dagster.yaml /path/to/project/workspace.yaml 2>/dev/null
Parses @asset, @multi_asset, @op, @graph, @job, ConfigurableResource, ConfigurableIOManager, @schedule, @sensor, and partition definitions.
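The parsing step could be approximated with Python's standard `ast` module. The sketch below is illustrative only and is not the skill's actual implementation: the function names `decorator_name` and `find_definitions`, the `DAGSTER_DECORATORS` set, and the sample source are all assumptions. It only parses the source text, so Dagster does not need to be installed.

```python
import ast

# Decorator names to look for (a subset of the list above; an assumption of this sketch).
DAGSTER_DECORATORS = {"asset", "multi_asset", "op", "graph", "job", "schedule", "sensor"}


def decorator_name(node: ast.expr) -> str:
    """Return the bare decorator name for @asset, @asset(...), or @dg.asset(...)."""
    if isinstance(node, ast.Call):
        node = node.func
    if isinstance(node, ast.Attribute):
        return node.attr
    if isinstance(node, ast.Name):
        return node.id
    return ""


def find_definitions(source: str) -> list:
    """List (decorator, function_name) pairs for Dagster-decorated functions."""
    found = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            for dec in node.decorator_list:
                name = decorator_name(dec)
                if name in DAGSTER_DECORATORS:
                    found.append((name, node.name))
    return found


# Hypothetical project source; it is parsed, never executed.
SAMPLE = '''
from dagster import asset, sensor

@asset
def raw_events(): ...

@asset(group_name="staging")
def cleaned_events(raw_events): ...

@sensor(minimum_interval_seconds=300)
def s3_file_sensor(context): ...

def helper(): ...
'''

definitions = find_definitions(SAMPLE)
```

Undecorated functions such as `helper` are ignored, and class-based definitions (`ConfigurableResource`, `ConfigurableIOManager`) would need a separate pass over `ast.ClassDef` nodes.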
Asset Graph: 34 assets, 5 groups (raw, staging, warehouse, analytics, ml)
Max depth: 7 | External assets: 3 | Source assets: 4
[source] s3_raw_events -> raw_events -> cleaned_events
-> event_aggregates -> daily_metrics -> churn_features -> churn_model
FAIL: "orphan_transform" has no downstream consumers
Last materialized 49 days ago. Remove or document purpose.
FAIL: "daily_metrics" depends on 6 upstream assets — fragile bottleneck
If any upstream fails, materialization blocked.
RECOMMEND: Add retry logic or partial materialization support
FAIL: No @asset_check defined for any asset
RECOMMEND: Add row count, null checks, freshness, schema validation
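As a rough illustration of the two graph checks above (orphaned assets and high fan-in), here is a minimal sketch over a plain dependency dictionary. The graph shape, the `src_*` placeholder names, the `terminal` allow-list, and the threshold of 5 are assumptions made for the example, not values taken from the analyzer.

```python
# Toy asset graph: asset name -> list of upstream assets it depends on.
# The src_* entries are placeholders standing in for real upstream assets.
UPSTREAMS = {
    "raw_events": [],
    "cleaned_events": ["raw_events"],
    "event_aggregates": ["cleaned_events"],
    "orphan_transform": ["cleaned_events"],
    "src_a": [], "src_b": [], "src_c": [], "src_d": [], "src_e": [],
    "daily_metrics": ["event_aggregates", "src_a", "src_b", "src_c", "src_d", "src_e"],
    "churn_features": ["daily_metrics"],
    "churn_model": ["churn_features"],
}


def find_orphans(upstreams: dict, terminal: set) -> list:
    """Assets nobody consumes, excluding known final outputs listed in `terminal`."""
    consumed = {dep for deps in upstreams.values() for dep in deps}
    return sorted(a for a in upstreams if a not in consumed and a not in terminal)


def find_high_fan_in(upstreams: dict, max_upstreams: int = 5) -> dict:
    """Assets whose number of direct upstream dependencies exceeds the threshold."""
    return {a: len(deps) for a, deps in upstreams.items() if len(deps) > max_upstreams}


orphans = find_orphans(UPSTREAMS, terminal={"churn_model"})
bottlenecks = find_high_fan_in(UPSTREAMS)
```

The `terminal` set matters: without it, every intentional end product (here `churn_model`) would be reported as an orphan.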
Assets WITH freshness policy: 8/34 (24%)
FAIL: "daily_revenue" — no freshness policy on business-critical metric
FIX: FreshnessPolicy(maximum_lag_minutes=120)
FAIL: Freshness SLA cascade violation:
"event_aggregates" has 60-min freshness
but upstream "cleaned_events" takes ~20 min to materialize
Effective budget: 10 min. Actual avg: 25 min. SLA violated regularly.
FIX: Relax to 90 min or speed up materialization
WARN: "user_segments" has 1440-min (24h) freshness
but downstream "real_time_recommendations" expects fresh data
FIX: Tighten to maximum_lag_minutes=60
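The "user_segments" warning above amounts to comparing freshness targets along each dependency edge. A minimal sketch of that comparison follows, assuming a hypothetical 60-minute target for "real_time_recommendations" (the report only says it expects fresh data) and a hypothetical helper name `find_lag_mismatches`.

```python
# Declared maximum lag per asset, in minutes. Assets without a policy are absent.
# The 60-minute value for real_time_recommendations is an assumption.
MAX_LAG_MINUTES = {
    "user_segments": 1440,
    "real_time_recommendations": 60,
    "event_aggregates": 60,
}

# asset name -> list of direct upstream assets
UPSTREAMS = {
    "real_time_recommendations": ["user_segments"],
    "event_aggregates": ["cleaned_events"],
}


def find_lag_mismatches(upstreams: dict, max_lag: dict) -> list:
    """Return (upstream, downstream) pairs where the upstream may be staler than the downstream allows."""
    mismatches = []
    for downstream, deps in upstreams.items():
        for upstream in deps:
            # Skip edges where either side has no declared freshness target.
            if upstream not in max_lag or downstream not in max_lag:
                continue
            if max_lag[upstream] > max_lag[downstream]:
                mismatches.append((upstream, downstream))
    return sorted(mismatches)


mismatches = find_lag_mismatches(UPSTREAMS, MAX_LAG_MINUTES)
```

Edges with a missing policy are skipped here rather than flagged; the missing-policy case is reported separately in the output above.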
"raw_events": DailyPartitionsDefinition — PASS, matches data pattern
"event_aggregates": MonthlyPartitionsDefinition
WARN: Upstream is daily — verify TimeWindowPartitionMapping
"ml_features": Unpartitioned, processes 2M+ rows every run (45 min)
FAIL: Add DailyPartitionsDefinition for incremental processing (~3 min/partition)
FAIL: No BackfillPolicy on any partitioned asset
Risk: Accidental backfill of 486 partitions = 486 concurrent runs
FIX: BackfillPolicy.multi_run(max_partitions_per_run=10)
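The effect of capping partitions per run is simple arithmetic. The sketch below assumes, as the report does, that a backfill without a policy launches one run per partition; `backfill_run_count` is a hypothetical helper, not a Dagster function.

```python
import math


def backfill_run_count(partitions: int, max_partitions_per_run: int = 1) -> int:
    """Number of runs needed to backfill `partitions` with the given batch size."""
    if max_partitions_per_run < 1:
        raise ValueError("max_partitions_per_run must be at least 1")
    return math.ceil(partitions / max_partitions_per_run)


runs_without_policy = backfill_run_count(486)    # one run per partition
runs_with_policy = backfill_run_count(486, 10)   # batches of up to 10 partitions
```

With the figures from the report, batching by 10 turns 486 runs into 49, the last of which covers the remaining 6 partitions.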
Configured: "io_manager" (Filesystem), "warehouse_io" (Snowflake), "s3_io" (S3Pickle)
FAIL: Default is FilesystemIOManager — data lost on pod restart
FIX: Set default to S3/GCS/database-backed store
FAIL: S3PickleIOManager — not portable, security risk (arbitrary code exec)
FIX: Switch to a Parquet-based S3 IO manager (for example a custom ConfigurableIOManager) for schema enforcement + compression
WARN: No return type annotations — IO manager cannot validate schema
FIX: @asset def daily_metrics(...) -> pd.DataFrame:
Coverage: warehouse_io 35%, s3_io 24%, filesystem 41% (concern)
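The missing-return-annotation warning can be detected statically. This is a sketch under the assumption that assets are plain functions decorated with `@asset`; the helper name `assets_missing_return_type` and the sample source are hypothetical, and the source is only parsed, so neither Dagster nor pandas needs to be importable.

```python
import ast


def assets_missing_return_type(source: str) -> list:
    """Names of @asset functions that declare no return type annotation."""
    missing = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.FunctionDef):
            continue
        decorator_names = set()
        for dec in node.decorator_list:
            target = dec.func if isinstance(dec, ast.Call) else dec
            if isinstance(target, ast.Name):
                decorator_names.add(target.id)
            elif isinstance(target, ast.Attribute):
                decorator_names.add(target.attr)
        if "asset" in decorator_names and node.returns is None:
            missing.append(node.name)
    return sorted(missing)


# Hypothetical asset module; parsed as text, never executed.
SAMPLE = '''
@asset
def daily_metrics(event_aggregates):
    return event_aggregates

@asset
def daily_revenue(daily_metrics) -> pd.DataFrame:
    return daily_metrics
'''

unannotated = assets_missing_return_type(SAMPLE)
```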
FAIL: snowflake_password="..." in definitions.py line 42
FIX: Use EnvVar("SNOWFLAKE_PASSWORD")
FAIL: Resource "spark" defined but never used — remove it
WARN: Resource "dbt" has hardcoded project path
FIX: Use EnvVar or relative path
WARN: No resource-level health checks at startup
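A hardcoded-credential finding like the one above can be approximated with a line-by-line regular expression scan. The pattern, the keyword list, and the sample values below are assumptions for illustration; a real scanner would need a broader rule set and would still produce false positives and negatives.

```python
import re

# Matches assignments such as snowflake_password = "..." where the variable name
# contains a sensitive keyword and the value is a quoted string literal.
SECRET_ASSIGNMENT = re.compile(
    r"""(?P<name>\w*(password|secret|token|api_key)\w*)\s*=\s*(?P<quote>["'])(?P<value>.+?)(?P=quote)""",
    re.IGNORECASE,
)


def find_hardcoded_secrets(source: str) -> list:
    """Return (line_number, variable_name) for each suspicious string assignment."""
    hits = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        match = SECRET_ASSIGNMENT.search(line)
        if match:
            hits.append((lineno, match.group("name")))
    return hits


# Hypothetical definitions.py content with made-up values.
SAMPLE = '''snowflake_user = "analytics"
snowflake_password = "hunter2"
snowflake_token = EnvVar("SNOWFLAKE_TOKEN")
'''

findings = find_hardcoded_secrets(SAMPLE)
```

The `EnvVar(...)` line is not flagged because the value is a call rather than a string literal, which is the shape the FIX line recommends.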
FAIL: "daily_etl" uses cron but assets have DailyPartitionsDefinition
FIX: build_schedule_from_partitioned_job() for auto-alignment
FAIL: "s3_file_sensor" polls every 30s — high API cost, rate limit risk
FIX: minimum_interval_seconds=300 or SQS/SNS event-driven
FAIL: "freshness_sensor" has no error handling — crashes sensor daemon
FIX: Wrap in try/except, return SkipReason on failure
WARN: 14 assets have no automation — manual-only materialization
WARN: No @asset_sensor for cross-job dependency coordination
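To put the polling-interval finding in numbers, the sketch below counts sensor evaluations per day at each interval. It assumes one external API call (for example, one bucket listing) per evaluation, which may not hold for every sensor; `polls_per_day` is a hypothetical helper.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def polls_per_day(interval_seconds: int) -> int:
    """Number of sensor evaluations per day at a fixed polling interval."""
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    return SECONDS_PER_DAY // interval_seconds


current = polls_per_day(30)     # interval flagged in the report
proposed = polls_per_day(300)   # interval suggested by the FIX line
reduction_factor = current // proposed
```

Under that assumption, moving from 30 s to 300 s cuts daily evaluations from 2880 to 288, at the cost of up to five minutes of added detection latency.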
FAIL: 22/34 assets have no description
FAIL: No MaterializeResult metadata (no row counts, schema, quality)
FIX: yield MaterializeResult(metadata={"row_count": len(df), ...})
FAIL: No asset checks defined
WARN: No code_version on any asset — always re-materializes
WARN: No owners defined — alerts not routable
Coverage: descriptions 35%, freshness 24%, code_version 0%,
checks 0%, metadata 15%, owners 0%
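The coverage percentages are rounded ratios over the 34 assets in the graph. A minimal sketch of that calculation, using only counts that appear in the report (8 assets with a freshness policy, 22 without a description); `coverage_pct` is a hypothetical helper.

```python
def coverage_pct(covered: int, total: int) -> int:
    """Share of assets that have a given attribute, as a whole-number percentage."""
    if total == 0:
        return 0
    return round(100 * covered / total)


TOTAL_ASSETS = 34

freshness_coverage = coverage_pct(8, TOTAL_ASSETS)                    # 8 assets have a policy
description_coverage = coverage_pct(TOTAL_ASSETS - 22, TOTAL_ASSETS)  # 22 assets lack one
missing_freshness = coverage_pct(TOTAL_ASSETS - 8, TOTAL_ASSETS)      # share with no policy
```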
# Dagster Pipeline Analysis Report
## Overall Health Score: 51/100
Asset graph: 7/10 Freshness: 4/10 Partitioning: 5/10
IO managers: 4/10 Resources: 5/10 Schedules/sensors: 5/10
Observability: 3/10 Security: 4/10 Documentation: 3/10
## Critical Issues
1. Credentials hardcoded in definitions.py
2. Default IO manager is local filesystem — data loss risk
3. Pickle serialization — security + portability risk
4. No asset checks — data quality issues undetected
5. No backfill safety — accidental mass materialization risk
## High Priority
6. 76% assets have no freshness policy
7. S3 sensor polling every 30s
8. No materialization metadata
9. Freshness SLA cascade violation
10. 14 assets with no automation