{"skill":{"slug":"afrexai-data-engineering","displayName":"Data Engineering","summary":"Design and operate scalable data pipelines and architectures using best-fit patterns, tools, and modeling methodologies without external dependencies.","description":"# Data Engineering Command Center\n\nComplete methodology for designing, building, operating, and scaling data pipelines and infrastructure. Zero dependencies — pure agent skill.\n\n---\n\n## Phase 1: Data Architecture Assessment\n\nBefore building anything, understand the landscape.\n\n### Architecture Brief\n\n```yaml\nproject_name: \"\"\nbusiness_context: \"\"\ndata_consumers:\n  - team: \"\"\n    use_case: \"\"          # analytics | ML | operational | reporting | reverse-ETL\n    latency_requirement: \"\"  # real-time (<1s) | near-real-time (<5min) | batch (hourly+)\n    query_pattern: \"\"     # ad-hoc | scheduled | API | dashboard\n\ncurrent_state:\n  sources: []             # list every system producing data\n  storage: []             # where data lives today\n  pain_points: []         # what's broken, slow, unreliable\n  data_volume:\n    current_gb_per_day: 0\n    growth_rate_percent: 0\n    retention_months: 0\n\nconstraints:\n  budget_monthly_usd: 0\n  team_size: 0\n  skill_level: \"\"         # junior | mid | senior | mixed\n  compliance: []          # GDPR, HIPAA, SOX, PCI, none\n  cloud_provider: \"\"      # AWS | GCP | Azure | multi | on-prem\n```\n\n### Architecture Pattern Decision Matrix\n\n| Signal | Pattern | When to Use |\n|--------|---------|-------------|\n| All consumers need data hourly+ | **Batch ETL** | Reporting, warehousing, most analytics |\n| Some need <5 min latency | **Micro-batch** | Dashboard freshness, near-real-time analytics |\n| Events need <1s processing | **Streaming** | Fraud detection, real-time pricing, alerts |\n| Need both batch + streaming | **Lambda** | When batch accuracy + real-time speed both matter |\n| Want to simplify Lambda | **Kappa** | When you can reprocess 
from stream replay |\n| Data lake + warehouse combined | **Lakehouse** | When you need both cheap storage + fast SQL |\n| Sources change independently | **Data Mesh** | Large orgs, domain-owned data, >5 teams |\n| ML is primary consumer | **Feature Store** | ML-heavy orgs with feature reuse needs |\n\n### Technology Selection Guide\n\n#### Orchestration\n\n| Tool | Best For | Avoid When |\n|------|----------|------------|\n| **Airflow** | Complex DAGs, Python-native teams, mature ecosystem | Simple pipelines (<5 tasks) |\n| **Dagster** | Software-defined assets, strong typing, dev experience | Legacy team resistant to new paradigms |\n| **Prefect** | Dynamic workflows, cloud-native, Python-first | Need on-prem with no cloud dependency |\n| **dbt** | SQL transformations, ELT, analytics engineering | Non-SQL transforms, streaming |\n| **Temporal** | Long-running workflows, retry-heavy, microservices | Simple ETL, small teams |\n| **Cron + scripts** | <3 pipelines, solo engineer, simple schedules | Anything with dependencies or retries |\n\n#### Processing\n\n| Tool | Best For | Avoid When |\n|------|----------|------------|\n| **Spark** | >100GB, complex transforms, ML pipelines | <10GB (overkill), real-time streaming |\n| **DuckDB** | Local analytics, <100GB, SQL on files | Distributed processing, production streaming |\n| **Polars** | Single-node, Rust-speed, <50GB, DataFrames | Distributed, need Spark ecosystem |\n| **Pandas** | <1GB, quick analysis, prototyping | Production pipelines, anything >5GB |\n| **Flink** | True streaming, event-time processing | Batch-only, small team (steep learning curve) |\n| **SQL (warehouse)** | ELT in Snowflake/BigQuery/Redshift | Complex ML transforms, binary data |\n\n#### Storage\n\n| Tool | Best For | Avoid When |\n|------|----------|------------|\n| **Snowflake** | Analytics, separation of compute/storage, multi-cloud | Tight budget, real-time OLTP |\n| **BigQuery** | GCP-native, serverless, large-scale analytics | 
Multi-cloud, need fine-grained cost control |\n| **Redshift** | AWS-native, existing AWS ecosystem | Elastic scaling needs, multi-cloud |\n| **Databricks** | ML + analytics unified, Spark-native, lakehouse | Pure SQL analytics, small data |\n| **PostgreSQL** | OLTP + light analytics, <500GB, budget-conscious | >1TB analytics, real-time dashboards on large data |\n| **S3/GCS/ADLS** | Raw data lake, cheap storage, any format | Direct SQL queries (need compute layer) |\n| **Delta Lake/Iceberg** | Table format on data lake, ACID on files | Simple file storage, no lakehouse need |\n\n---\n\n## Phase 2: Data Modeling\n\n### Modeling Methodology Decision\n\n| Approach | Best For | Key Concept |\n|----------|----------|-------------|\n| **Kimball (Dimensional)** | BI/reporting, star schemas | Facts + Dimensions, business-process-centric |\n| **Inmon (3NF)** | Enterprise data warehouse, single source of truth | Normalized, subject-area-centric |\n| **Data Vault 2.0** | Agile warehousing, auditability, multiple sources | Hubs + Links + Satellites, insert-only |\n| **One Big Table (OBT)** | Simple analytics, few joins, dashboard performance | Pre-joined, denormalized, fast queries |\n| **Activity Schema** | Event analytics, product analytics | Entity + Activity + Feature columns |\n\n### Dimensional Model Template\n\n```yaml\nfact_table:\n  name: \"fact_[business_process]\"\n  grain: \"\"                    # one row = one [what]?\n  grain_statement: \"One row per [transaction/event/snapshot] at [time grain]\"\n  measures:\n    - name: \"\"\n      type: \"\"                 # additive | semi-additive | non-additive\n      aggregation: \"\"          # SUM | AVG | COUNT | MIN | MAX | COUNT DISTINCT\n      business_definition: \"\"\n  degenerate_dimensions: []    # IDs stored in fact (order_number, invoice_id)\n  foreign_keys: []             # links to dimension tables\n\ndimensions:\n  - name: \"dim_[entity]\"\n    type: \"\"                   # Type 1 (overwrite) | Type 2 
(history) | Type 3 (previous value)\n    natural_key: \"\"            # business key from source\n    surrogate_key: \"\"          # warehouse-generated key\n    attributes:\n      - name: \"\"\n        source: \"\"\n        scd_type: \"\"           # 1 | 2 | 3\n    hierarchy: []              # e.g., [country, region, city, store]\n```\n\n### SCD Type Decision Guide\n\n| Scenario | SCD Type | Implementation |\n|----------|----------|----------------|\n| Don't care about history | **Type 1** | UPDATE in place |\n| Need full history | **Type 2** | New row + valid_from/valid_to + is_current flag |\n| Only need previous value | **Type 3** | Add previous_[column] |\n| Track changes with timestamps | **Type 4** | Mini-dimension (history table) |\n| Hybrid: some attrs Type 1, some Type 2 | **Type 6** | Combine 1+2+3 in one table |\n\n**Default recommendation:** Type 2 for anything business-critical (customer status, product price, employee department). Type 1 for everything else.\n\n### Naming Conventions\n\n| Object | Convention | Example |\n|--------|-----------|---------|\n| Raw/staging tables | `raw_[source]_[table]` | `raw_stripe_payments` |\n| Staging models | `stg_[source]__[entity]` | `stg_stripe__payments` |\n| Intermediate models | `int_[entity]_[verb]` | `int_orders_pivoted` |\n| Mart/fact tables | `fct_[business_process]` | `fct_orders` |\n| Dimension tables | `dim_[entity]` | `dim_customers` |\n| Metrics/aggregates | `mrt_[domain]_[metric]` | `mrt_sales_daily` |\n| Snapshots | `snp_[entity]_[grain]` | `snp_inventory_daily` |\n| Columns: boolean | `is_[state]` or `has_[thing]` | `is_active`, `has_subscription` |\n| Columns: timestamp | `[event]_at` | `created_at`, `shipped_at` |\n| Columns: date | `[event]_date` | `order_date` |\n| Columns: ID | `[entity]_id` | `customer_id` |\n| Columns: amount | `[thing]_amount` | `order_amount` |\n| Columns: count | `[thing]_count` | `line_item_count` |\n\n---\n\n## Phase 3: Pipeline Design Patterns\n\n### Universal 
Pipeline Template\n\n```yaml\npipeline:\n  name: \"\"\n  owner: \"\"\n  schedule: \"\"               # cron expression\n  sla_minutes: 0             # max acceptable runtime\n  tier: \"\"                   # 1 (critical) | 2 (important) | 3 (nice-to-have)\n\n  extract:\n    source_system: \"\"\n    connection: \"\"\n    strategy: \"\"             # full | incremental | CDC | log-based\n    incremental_key: \"\"      # column for incremental (e.g., updated_at)\n    watermark_storage: \"\"    # where to persist last-extracted position\n\n  transform:\n    engine: \"\"               # SQL | Spark | Python | dbt\n    stages:\n      - name: \"clean\"\n        operations: []       # dedupe, null handling, type casting, trimming\n      - name: \"conform\"\n        operations: []       # standardize codes, currencies, timezones\n      - name: \"enrich\"\n        operations: []       # lookups, calculations, derived fields\n      - name: \"aggregate\"\n        operations: []       # rollups, pivots, window functions\n\n  load:\n    target_system: \"\"\n    strategy: \"\"             # append | upsert | merge | truncate-reload | partition-swap\n    merge_keys: []\n    partition_key: \"\"\n    clustering_keys: []\n\n  quality_gates:\n    pre_load: []             # checks before writing\n    post_load: []            # checks after writing\n\n  error_handling:\n    strategy: \"\"             # fail-fast | dead-letter | retry | skip-and-alert\n    max_retries: 3\n    retry_delay_seconds: 300\n    alert_channels: []\n```\n\n### Extraction Strategy Decision Tree\n\n```\nIs the source a database?\n├── Yes → Does it support CDC?\n│   ├── Yes → Use CDC (Debezium, AWS DMS, Fivetran)\n│   │   Best for: high-volume, low-latency, minimal source impact\n│   └── No → Does it have a reliable updated_at column?\n│       ├── Yes → Incremental extraction on updated_at\n│       │   ⚠️ Won't catch hard deletes — add periodic full reconciliation\n│       └── No → Full extraction\n│           Only 
viable for small tables (<1M rows)\n├── Is it an API?\n│   ├── Supports webhooks? → Event-driven ingestion\n│   ├── Has cursor/pagination? → Incremental with cursor bookmark\n│   └── No pagination? → Full pull with rate-limit handling\n├── Is it files (S3, SFTP, email)?\n│   └── Event-triggered (S3 notification, file watcher)\n│       Validate: schema, completeness, filename pattern\n└── Is it streaming (Kafka, Kinesis, Pub/Sub)?\n    └── Consumer group with offset management\n        Key decisions: at-least-once vs exactly-once, consumer lag alerting\n```\n\n### Load Strategy Decision\n\n| Strategy | When | Trade-off |\n|----------|------|-----------|\n| **Append** | Event/log data, immutable facts | Simple but grows forever — partition + retain |\n| **Upsert/Merge** | Dimension updates, SCD Type 1 | Handles updates but slower on large tables |\n| **Truncate-Reload** | Small tables (<1M), reference data | Simple but window of missing data |\n| **Partition Swap** | Large fact tables, daily loads | Atomic, fast, but needs partition alignment |\n| **Soft Delete** | Need audit trail of deletions | Adds complexity to every downstream query |\n\n### Idempotency Rules (NON-NEGOTIABLE)\n\nEvery pipeline MUST be re-runnable without side effects:\n\n1. **Use MERGE/UPSERT, never blind INSERT** for mutable data\n2. **Partition-swap for immutable data** — drop partition + reload\n3. **Store watermarks externally** — not in the pipeline code\n4. **Dedup at ingestion** — use source natural keys\n5. 
**Test by running twice** — output must be identical both times\n\n---\n\n## Phase 4: Data Quality Framework\n\n### Quality Dimensions\n\n| Dimension | Definition | Example Check |\n|-----------|-----------|---------------|\n| **Completeness** | No missing values where required | `NOT NULL` on required fields, row count within range |\n| **Uniqueness** | No unexpected duplicates | Primary key uniqueness, natural key uniqueness |\n| **Validity** | Values within expected domain | Enum checks, range checks, regex patterns |\n| **Accuracy** | Data matches real-world truth | Cross-system reconciliation, manual spot checks |\n| **Freshness** | Data arrives on time | `MAX(loaded_at) > NOW() - INTERVAL '2 hours'` |\n| **Consistency** | Same data agrees across systems | Sum reconciliation between source and target |\n\n### Quality Check Templates\n\n```sql\n-- Completeness: Required fields not null\nSELECT COUNT(*) AS null_violations\nFROM {table}\nWHERE {required_column} IS NULL;\n-- Threshold: 0\n\n-- Uniqueness: No duplicate primary keys\nSELECT {pk_column}, COUNT(*) AS dupe_count\nFROM {table}\nGROUP BY {pk_column}\nHAVING COUNT(*) > 1;\n-- Threshold: 0\n\n-- Freshness: Data arrived within SLA\nSELECT CASE\n  WHEN MAX({timestamp_col}) > CURRENT_TIMESTAMP - INTERVAL '{sla_hours} hours'\n  THEN 'PASS' ELSE 'FAIL'\nEND AS freshness_check\nFROM {table};\n\n-- Volume: Row count within expected range\nSELECT CASE\n  WHEN COUNT(*) BETWEEN {min_expected} AND {max_expected}\n  THEN 'PASS' ELSE 'FAIL'\nEND AS volume_check\nFROM {table}\nWHERE {partition_col} = '{run_date}';\n\n-- Referential: FK integrity\nSELECT COUNT(*) AS orphan_count\nFROM {fact_table} f\nLEFT JOIN {dim_table} d ON f.{fk} = d.{pk}\nWHERE d.{pk} IS NULL;\n-- Threshold: 0\n\n-- Distribution: No unexpected skew\nSELECT {column}, COUNT(*) AS cnt,\n  ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct\nFROM {table}\nGROUP BY {column}\nORDER BY cnt DESC;\n-- Alert if any single value > {max_pct}%\n\n-- 
Cross-system reconciliation\n-- Totals are computed in a CTE because many dialects cannot reference\n-- a column alias from the same SELECT list\nWITH totals AS (\n  SELECT\n    (SELECT SUM(amount) FROM source_system.orders WHERE date = '{date}') AS source_total,\n    (SELECT SUM(amount) FROM warehouse.fct_orders WHERE order_date = '{date}') AS target_total\n)\nSELECT source_total, target_total,\n  ABS(source_total - target_total) AS variance\nFROM totals;\n-- Threshold: variance < 0.01 * source_total (1%)\n```\n\n### Data Contract Template\n\n```yaml\ncontract:\n  name: \"\"\n  version: \"\"\n  owner: \"\"                    # team responsible for producing this data\n  consumers: []                # teams consuming this data\n  sla:\n    freshness_hours: 0\n    availability_percent: 99.9\n    support_hours: \"\"          # business-hours | 24x7\n\n  schema:\n    - column: \"\"\n      type: \"\"\n      nullable: false\n      description: \"\"\n      business_definition: \"\"\n      pii: false\n      checks:\n        - type: \"\"             # not_null | unique | range | enum | regex | custom\n          params: {}\n\n  breaking_change_policy: \"\"   # notify-30-days | version-bump | never-break\n  notification_channel: \"\"\n```\n\n### Quality Severity Levels\n\n| Level | Definition | Response |\n|-------|-----------|----------|\n| **P0 — Critical** | Data corruption, wrong numbers in production dashboards, compliance data wrong | Stop pipeline, alert immediately, rollback if possible |\n| **P1 — High** | Missing data for key reports, SLA breach, >5% of records affected | Alert team, fix within 4 hours, post-mortem required |\n| **P2 — Medium** | Non-critical field quality, <1% records affected, no downstream impact | Fix in next sprint, add monitoring to prevent recurrence |\n| **P3 — Low** | Cosmetic issues, edge cases, non-critical data | Backlog, fix when convenient |\n\n---\n\n## Phase 5: Performance Optimization\n\n### SQL Optimization Checklist\n\n| Problem | Fix | Impact |\n|---------|-----|--------|\n| Full table scan | Add/use partition pruning | 10-100x faster |\n| Large joins | Pre-aggregate before joining | 5-50x faster |\n| SELECT * | 
Select only needed columns | 2-10x faster (columnar stores) |\n| Correlated subquery | Rewrite as JOIN or window function | 10-100x faster |\n| DISTINCT on large result | Fix upstream duplication instead | 2-5x faster |\n| ORDER BY without LIMIT | Add LIMIT or remove if not needed | Prevents memory spills |\n| String operations in WHERE | Pre-compute, use lookup table | Enables index usage |\n| Multiple passes over same data | Combine with CASE WHEN + GROUP BY | 2-5x faster |\n| NOT IN with NULLs | Use NOT EXISTS or LEFT JOIN IS NULL | Correctness + performance |\n\n### Spark Optimization Guide\n\n| Problem | Solution |\n|---------|----------|\n| Shuffle-heavy joins | Broadcast small table (`broadcast(df)`) if <100MB |\n| Data skew | Salt the skewed key: add random prefix, join on salted key, aggregate |\n| Small files | Coalesce output: `.coalesce(target_files)` or use adaptive query execution |\n| Too many partitions | `spark.sql.shuffle.partitions` = 2-3x cluster cores |\n| OOM errors | Increase `spark.executor.memory`, reduce partition size, spill to disk |\n| Slow writes | Use Parquet with snappy, partition by date, avoid small writes |\n| Repeated computation | `.cache()` or `.persist()` DataFrames used >1 time |\n| Complex transformations | Push down predicates, filter early, select early |\n\n### Partitioning Strategy\n\n| Data Type | Partition Key | Why |\n|-----------|--------------|-----|\n| Transactional/event | Date (daily or monthly) | Most queries filter by time range |\n| Multi-tenant | Tenant ID + date | Isolate tenant queries, time-range pruning |\n| Geospatial | Region + date | Regional queries are common |\n| Log data | Date + hour | High volume needs finer partitions |\n| Reference/dimension | Don't partition | Too small, full scan is fine |\n\n**Rules:**\n- Target 100MB-1GB per partition (compressed)\n- <10,000 total partitions per table\n- Never partition on high-cardinality columns (user_id)\n- Always include partition key in WHERE 
clauses\n\n---\n\n## Phase 6: Data Governance & Cataloging\n\n### Data Classification\n\n| Level | Examples | Controls |\n|-------|---------|----------|\n| **Public** | Product catalog, published stats | No restrictions |\n| **Internal** | Aggregated metrics, non-PII analytics | Auth required, audit logging |\n| **Confidential** | Customer PII, financial records, HR data | Encryption, column-level access, masking |\n| **Restricted** | SSN, payment cards, health records, passwords | Encryption at rest + transit, tokenization, audit every access, retention limits |\n\n### PII Handling Rules\n\n1. **Identify:** Scan all sources for PII columns (name, email, phone, SSN, DOB, address, IP)\n2. **Classify:** Tag each with sensitivity level\n3. **Minimize:** Only ingest PII you actually need\n4. **Protect:** \n   - Hash or tokenize in staging (SHA-256 with salt for pseudonymization)\n   - Dynamic masking for non-privileged users\n   - Column-level encryption for restricted data\n5. **Retain:** Auto-delete after retention period\n6. **Audit:** Log every query touching PII columns\n7. 
**Right to delete:** Build a deletion pipeline that propagates across all derived tables\n\n### Data Catalog Entry Template\n\n```yaml\ndataset:\n  name: \"\"\n  description: \"\"\n  owner_team: \"\"\n  steward: \"\"                  # person responsible for quality\n  domain: \"\"                   # sales | marketing | finance | product | engineering\n  tier: \"\"                     # gold (trusted) | silver (cleaned) | bronze (raw)\n  \n  lineage:\n    sources: []                # upstream datasets/systems\n    transformations: \"\"        # brief description of key transforms\n    downstream: []             # who consumes this\n  \n  refresh:\n    schedule: \"\"\n    sla_hours: 0\n    last_successful_run: \"\"\n  \n  quality:\n    tests: []                  # list of quality checks\n    last_score: 0              # 0-100\n    known_issues: []\n  \n  access:\n    classification: \"\"         # public | internal | confidential | restricted\n    pii_columns: []\n    access_request_process: \"\" # how to get access\n  \n  usage:\n    avg_daily_queries: 0\n    top_consumers: []\n    cost_monthly_usd: 0\n```\n\n---\n\n## Phase 7: Pipeline Monitoring & Alerting\n\n### Pipeline Health Dashboard\n\n```yaml\ndashboard:\n  pipeline_metrics:\n    - metric: \"Pipeline Success Rate\"\n      formula: \"successful_runs / total_runs * 100\"\n      target: \">99%\"\n      alert_threshold: \"<95%\"\n\n    - metric: \"Average Runtime\"\n      formula: \"avg(end_time - start_time) over 7 days\"\n      target: \"<SLA\"\n      alert_threshold: \">80% of SLA\"\n\n    - metric: \"Data Freshness\"\n      formula: \"NOW() - MAX(loaded_at)\"\n      target: \"<SLA hours\"\n      alert_threshold: \">SLA\"\n\n    - metric: \"Data Volume Variance\"\n      formula: \"abs(today_rows - avg_7d_rows) / avg_7d_rows * 100\"\n      target: \"<20%\"\n      alert_threshold: \">50%\"\n\n    - metric: \"Quality Check Pass Rate\"\n      formula: \"passed_checks / total_checks * 100\"\n      target: 
\"100%\"\n      alert_threshold: \"<95%\"\n\n    - metric: \"Failed Pipeline Count\"\n      formula: \"count where status = failed in last 24h\"\n      target: \"0\"\n      alert_threshold: \">0\"\n\n    - metric: \"Backfill Queue\"\n      formula: \"count of pending backfill requests\"\n      target: \"0\"\n      alert_threshold: \">5\"\n\n    - metric: \"Infrastructure Cost\"\n      formula: \"compute + storage + egress\"\n      target: \"<budget\"\n      alert_threshold: \">110% budget\"\n```\n\n### Alert Severity\n\n| Severity | Condition | Response Time | Example |\n|----------|-----------|---------------|---------|\n| **P0** | Revenue/compliance impacting | 15 min | Payment pipeline down, regulatory report delayed |\n| **P1** | Business-critical dashboard stale | 1 hour | Executive dashboard >4h stale |\n| **P2** | Non-critical pipeline failed | 4 hours | Marketing attribution delayed |\n| **P3** | Warning/degradation | Next business day | Pipeline 80% of SLA, minor quality drift |\n\n### Structured Logging Standard\n\nEvery pipeline run MUST log:\n\n```json\n{\n  \"pipeline_name\": \"\",\n  \"run_id\": \"\",\n  \"started_at\": \"\",\n  \"completed_at\": \"\",\n  \"status\": \"success|failed|partial\",\n  \"stage\": \"\",\n  \"rows_extracted\": 0,\n  \"rows_transformed\": 0,\n  \"rows_loaded\": 0,\n  \"rows_rejected\": 0,\n  \"quality_checks_passed\": 0,\n  \"quality_checks_failed\": 0,\n  \"duration_seconds\": 0,\n  \"error_message\": \"\",\n  \"watermark_before\": \"\",\n  \"watermark_after\": \"\"\n}\n```\n\n---\n\n## Phase 8: Testing Strategy\n\n### Pipeline Test Pyramid\n\n| Layer | What to Test | How | When |\n|-------|-------------|-----|------|\n| **Unit** | Individual transforms, business logic | pytest with fixtures, dbt unit tests | Every PR |\n| **Integration** | Source connectivity, schema compatibility | Test against staging/dev environment | Daily + PR |\n| **Contract** | Schema hasn't changed, data types stable | Schema registry, contract 
tests | Every pipeline run |\n| **Data Quality** | Completeness, uniqueness, freshness, validity | Quality framework checks | Every pipeline run |\n| **E2E** | Full pipeline produces correct output | Golden dataset comparison | Weekly + release |\n| **Performance** | Runtime within SLA, no regression | Benchmark against historical runs | Weekly |\n\n### dbt Testing Checklist\n\n```yaml\n# For every model, define at minimum:\nmodels:\n  - name: fct_orders\n    columns:\n      - name: order_id\n        tests:\n          - unique\n          - not_null\n      - name: customer_id\n        tests:\n          - not_null\n          - relationships:\n              to: ref('dim_customers')\n              field: customer_id\n      - name: order_amount\n        tests:\n          - not_null\n          - dbt_utils.accepted_range:\n              min_value: 0\n              max_value: 1000000\n      - name: order_status\n        tests:\n          - accepted_values:\n              values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']\n      - name: ordered_at\n        tests:\n          - not_null\n          - dbt_utils.recency:\n              datepart: day\n              field: ordered_at\n              interval: 2\n```\n\n### Backfill Protocol\n\nWhen you need to reprocess historical data:\n\n1. **Scope:** Define exact date range and affected tables\n2. **Impact assessment:** What downstream models/dashboards will be affected?\n3. **Communication:** Notify consumers of temporary data inconsistency\n4. **Isolation:** Run backfill in separate compute to avoid impacting current pipelines\n5. **Validation:** Compare row counts and key metrics pre/post backfill\n6. **Execution:** Process in reverse-chronological order (most recent first)\n7. **Monitoring:** Watch for resource spikes, duplicate creation\n8. **Verification:** Reconcile against source after completion\n9. 
**Documentation:** Log what was backfilled, why, and any anomalies found\n\n---\n\n## Phase 9: Cost Optimization\n\n### Cloud Cost Reduction Strategies\n\n| Strategy | Savings | Effort |\n|----------|---------|--------|\n| Right-size compute (auto-scaling) | 20-40% | Low |\n| Use spot/preemptible instances for batch | 60-80% | Medium |\n| Compress data (Parquet + Snappy/Zstd) | 50-80% storage | Low |\n| Lifecycle policies (hot → warm → cold → archive) | 40-70% storage | Low |\n| Eliminate unused tables/pipelines | 10-30% | Low |\n| Optimize query patterns (partition pruning) | 30-60% compute | Medium |\n| Reserved capacity for steady-state | 30-50% | Medium |\n| Cache expensive queries | 20-50% compute | Medium |\n\n### Cost Allocation Template\n\n```yaml\ncost_tracking:\n  by_pipeline:\n    - pipeline: \"\"\n      compute_monthly_usd: 0\n      storage_monthly_usd: 0\n      egress_monthly_usd: 0\n      total: 0\n      cost_per_row: 0        # total / rows_processed\n      business_value: \"\"     # what revenue/decision does this enable?\n      roi_justified: true    # is the cost worth it?\n\n  optimization_opportunities:\n    - description: \"\"\n      estimated_savings_usd: 0\n      effort: \"\"             # low | medium | high\n      priority: 0            # 1 = do now\n```\n\n### Cost Red Flags\n\n- Single pipeline >30% of total spend\n- Cost per row increasing month-over-month\n- Tables with 0 queries in 30 days\n- Dev/staging environments running 24/7\n- Full table scans on >1TB tables\n- Uncompressed data in cloud storage\n- Cross-region data transfer\n\n---\n\n## Phase 10: Operational Runbooks\n\n### Pipeline Failure Triage\n\n```\nPipeline failed →\n1. 
Check error message in logs\n   ├── Connection timeout → Check source availability, network, credentials\n   ├── Schema mismatch → Source schema changed → update extract + notify\n   ├── Data quality check failed → Investigate source data, check for anomalies\n   ├── Out of memory → Increase resources or optimize query\n   ├── Permission denied → Check IAM roles, token expiry\n   ├── Duplicate key violation → Check idempotency, investigate source dupes\n   └── Timeout (SLA breach) → Check data volume spike, query plan, cluster health\n\n2. Determine impact\n   ├── What dashboards/reports are affected?\n   ├── What's the data freshness SLA?\n   └── Who needs to be notified?\n\n3. Fix\n   ├── Transient (network, timeout) → Retry\n   ├── Data issue → Fix source data, re-run with quality gate override if safe\n   ├── Schema change → Update pipeline, backfill if needed\n   └── Infrastructure → Scale up, file ticket with cloud provider\n\n4. Post-fix\n   ├── Verify data correctness\n   ├── Update runbook with new failure mode\n   └── Add monitoring/alerting to catch earlier next time\n```\n\n### Schema Change Management\n\nWhen a source system changes schema:\n\n1. **Detect:** Schema comparison check in extraction pipeline (hash schema, compare to registered)\n2. **Classify:**\n   - **Additive** (new column): Usually safe — add to pipeline, backfill if needed\n   - **Rename**: Map old → new in transform, update downstream\n   - **Type change**: Assess compatibility, may need cast or historical rebuild\n   - **Column removed**: Critical — breaks queries, need immediate attention\n3. **Test:** Run pipeline in dry-run mode with new schema\n4. **Deploy:** Update transforms, quality checks, documentation\n5. 
**Communicate:** Notify downstream consumers via data contract channel\n\n### Disaster Recovery\n\n| Scenario | RPO | RTO | Recovery Steps |\n|----------|-----|-----|----------------|\n| Pipeline code lost | 0 (git) | 1h | Redeploy from git, restore orchestrator state |\n| Warehouse data corrupted | Varies | 4h | Restore from Time Travel/snapshot, re-run affected pipelines |\n| Source system down | N/A | Wait | Queue extractions, catch up with incremental once restored |\n| Cloud region outage | 24h | 8h | Failover to DR region if configured, else wait |\n| Credential compromise | 0 | 2h | Rotate all credentials, audit access logs, re-run affected pipelines |\n\n---\n\n## Phase 11: Advanced Patterns\n\n### Slowly Changing Dimension Type 2 (SQL Template)\n\n```sql\n-- Merge pattern for SCD Type 2\nMERGE INTO dim_customer AS target\nUSING (\n  SELECT * FROM stg_customers\n  WHERE updated_at > (SELECT MAX(valid_from) FROM dim_customer)\n) AS source\nON target.customer_natural_key = source.customer_id\n   AND target.is_current = TRUE\n\n-- Update: close old record\nWHEN MATCHED AND (\n  target.customer_name != source.name OR\n  target.customer_status != source.status\n  -- list all Type 2 tracked columns\n) THEN UPDATE SET\n  is_current = FALSE,\n  valid_to = CURRENT_TIMESTAMP\n\n-- Insert: brand-new customers only (changed customers still match their\n-- current row above, so their new versions come from the INSERT below)\nWHEN NOT MATCHED THEN INSERT (\n  customer_natural_key, customer_name, customer_status,\n  valid_from, valid_to, is_current\n) VALUES (\n  source.customer_id, source.name, source.status,\n  CURRENT_TIMESTAMP, '9999-12-31', TRUE\n);\n\n-- Then insert new versions of changed records\n-- Assumes CURRENT_TIMESTAMP returns the same value in both statements (for example,\n-- both run in one PostgreSQL transaction); otherwise capture a run timestamp first\nINSERT INTO dim_customer (\n  customer_natural_key, customer_name, customer_status,\n  valid_from, valid_to, is_current\n)\nSELECT customer_id, name, status,\n  CURRENT_TIMESTAMP, '9999-12-31', TRUE\nFROM stg_customers s\nWHERE EXISTS (\n  SELECT 1 FROM dim_customer d\n  WHERE d.customer_natural_key = s.customer_id\n    AND d.is_current = FALSE\n    AND 
d.valid_to = CURRENT_TIMESTAMP\n);\n```\n\n### CDC with Debezium (Architecture Pattern)\n\n```\nSource DB → Debezium Connector → Kafka Topic → \n  ├── Stream processor (Flink/Spark Streaming) → Target DB\n  ├── S3 sink connector → Data Lake (raw)\n  └── Elasticsearch sink → Search index\n```\n\nKey decisions:\n- **Topic per table** or **single topic**: Per table (easier routing, independent scaling)\n- **Schema registry**: Always use (Confluent Schema Registry or AWS Glue)\n- **Serialization**: Avro (compact + schema evolution) or Protobuf (strict + fast)\n- **Offset management**: Connector manages; monitor consumer lag\n\n### Feature Store Pattern\n\n```yaml\nfeature_store:\n  entity: \"customer\"\n  entity_key: \"customer_id\"\n  \n  features:\n    - name: \"total_orders_30d\"\n      description: \"Total orders in last 30 days\"\n      type: \"INT\"\n      source: \"fct_orders\"\n      computation: \"batch\"      # batch | streaming | on-demand\n      freshness: \"daily\"\n      ttl_hours: 48\n    \n    - name: \"avg_order_value_90d\"\n      description: \"Average order value last 90 days\"\n      type: \"FLOAT\"\n      source: \"fct_orders\"\n      computation: \"batch\"\n      freshness: \"daily\"\n      ttl_hours: 48\n    \n    - name: \"last_login_minutes_ago\"\n      description: \"Minutes since last login event\"\n      type: \"INT\"\n      source: \"events_stream\"\n      computation: \"streaming\"\n      freshness: \"real-time\"\n      ttl_hours: 1\n  \n  serving:\n    online: true               # low-latency feature serving (Redis/DynamoDB)\n    offline: true              # batch feature retrieval for training\n    point_in_time_correct: true  # prevent feature leakage in ML training\n```\n\n### Data Mesh Principles\n\nIf operating at scale (>5 data teams):\n\n1. **Domain ownership**: Each business domain owns its data products (not central data team)\n2. **Data as a product**: Treat datasets like products — SLAs, documentation, discoverability\n3. 
**Self-serve platform**: Central team builds the platform, domains build on top\n4. **Federated governance**: Standards and interoperability maintained centrally, implementation decentralized\n\n**When NOT to use Data Mesh:**\n- <5 data producers/consumers\n- Small team (<20 engineers total)\n- Single business domain\n- Early-stage company (over-engineering)\n\n---\n\n## Quality Scoring Rubric (0-100)\n\n| Dimension | Weight | Scoring |\n|-----------|--------|---------|\n| **Pipeline Reliability** | 20 | 0=frequent failures, 10=some failures with manual recovery, 20=99.5%+ success rate with auto-retry |\n| **Data Quality** | 20 | 0=no checks, 10=basic null/unique checks, 20=comprehensive quality framework with contracts |\n| **Performance** | 15 | 0=regularly breaches SLA, 8=meets SLA, 15=well under SLA with optimization |\n| **Documentation** | 10 | 0=none, 5=basic README, 10=full catalog entries with lineage and business definitions |\n| **Monitoring** | 15 | 0=no alerts, 8=failure alerts only, 15=proactive monitoring with dashboards and anomaly detection |\n| **Testing** | 10 | 0=no tests, 5=basic smoke tests, 10=full test pyramid (unit+integration+contract+E2E) |\n| **Cost Efficiency** | 10 | 0=no cost tracking, 5=tracked, 10=optimized with ROI justification per pipeline |\n\n**Scoring guide:**\n- 0-40: Critical gaps — prioritize pipeline reliability and data quality\n- 41-60: Functional but fragile — add monitoring, testing, documentation\n- 61-80: Solid — optimize performance, cost, governance\n- 81-100: Excellent — maintain, innovate, mentor\n\n---\n\n## Edge Cases & Gotchas\n\n### Timezone Traps\n- Store everything in UTC. 
Convert only at presentation layer\n- Event timestamps: use event time, not processing time\n- Daylight saving: use `TIMESTAMP WITH TIME ZONE`, never `TIMESTAMP WITHOUT TIME ZONE`\n- Late-arriving data: watermark strategy + allowed lateness window\n\n### Late-Arriving Data\n- Define maximum acceptable lateness per source\n- Reprocess affected partitions when late data arrives\n- Track late arrival rate as a quality metric\n- Consider a separate \"late data\" pipeline that patches late records into existing partitions\n\n### Exactly-Once Processing\n- True exactly-once is expensive. Most systems need at-least-once + idempotent writes\n- Use transaction IDs or natural keys for deduplication\n- Kafka: use idempotent producer + transactions, with consumers reading at `read_committed` isolation\n- Database: MERGE/UPSERT on natural key\n\n### Schema Evolution\n- **Backward compatible**: New code reads old data (safe to deploy new readers first)\n- **Forward compatible**: Old code reads new data (safe to deploy new writers first)\n- **Full compatible**: Both directions (safest, most restrictive)\n- Use Avro or Protobuf with schema registry for streaming data\n\n### Multi-Tenant Data\n- Tenant ID in every table, every query, every log\n- Row-level security in warehouse\n- Separate compute per tenant (or at least workload isolation)\n- Never join across tenants without explicit business reason\n- Tenant-aware backfill (don't rebuild all tenants for one tenant's issue)\n\n### Data Lake Anti-Patterns\n- \"Data Swamp\": ingesting everything with no organization or catalog → only ingest what has a known consumer\n- Small files: thousands of <1MB files → compact regularly (target 100MB-1GB)\n- No table format: raw Parquet/CSV without Delta/Iceberg → loses ACID, schema evolution, time travel\n- No access controls: single bucket, everyone admin → implement IAM per domain/team\n\n---\n\n## Natural Language Commands\n\nSay any of these to activate specific workflows:\n\n1. 
**\"Design a data pipeline for [source] to [target]\"** → Full pipeline template with extraction strategy, transforms, load pattern, quality checks\n2. **\"Model [entity/domain] for analytics\"** → Dimensional model with fact/dimension tables, grain, measures, SCD types\n3. **\"Optimize this query/pipeline\"** → Performance analysis with specific recommendations\n4. **\"Set up data quality for [table/pipeline]\"** → Quality framework with checks, contracts, monitoring\n5. **\"Audit our data infrastructure\"** → Full assessment using scoring rubric\n6. **\"Help with [Spark/Airflow/dbt/Kafka] issue\"** → Troubleshooting with technology-specific guidance\n7. **\"Design a data catalog for our org\"** → Catalog template with governance, classification, lineage\n8. **\"Plan a data migration from [old] to [new]\"** → Migration plan with validation, rollback, parallel-run\n9. **\"Set up monitoring for our pipelines\"** → Dashboard template with alerts, logging standards, runbooks\n10. **\"Review our data costs\"** → Cost analysis with optimization strategies and ROI framework\n11. **\"Handle schema change in [source]\"** → Change management protocol with impact assessment\n12. 
**\"Backfill [table] for [date range]\"** → Backfill protocol with validation and communication plan\n","tags":{"analytics":"1.0.0","data":"1.0.0","engineering":"1.0.0","etl":"1.0.0","latest":"1.0.0","pipeline":"1.0.0"},"stats":{"comments":0,"downloads":1112,"installsAllTime":42,"installsCurrent":5,"stars":0,"versions":1},"createdAt":1771435551171,"updatedAt":1778491574637},"latestVersion":{"version":"1.0.0","createdAt":1771435551171,"changelog":"Data Engineering Command Center skill, initial release:\n\n- Provides a complete, zero-dependency methodology for designing, building, and scaling data pipelines and infrastructure.\n- Includes detailed assessment templates, architecture pattern decision matrices, and technology selection guides for orchestration, processing, and storage.\n- Offers best-practice data modeling methodologies, templates, SCD guidance, and naming conventions.\n- Supplies universal pipeline design patterns for extraction, transformation, and loading phases.\n- Designed for clarity and actionability—no external dependencies or opinionated tooling biases.","license":null},"metadata":null,"owner":{"handle":"1kalin","userId":"s17e1q0nx23qnh4n429zzqc05x83hvsw","displayName":"1kalin","image":"https://avatars.githubusercontent.com/u/15705344?v=4"},"moderation":null}