Install

openclaw skills install data-pipeline-agent

ETL pipeline builder for business data — API extraction, data cleaning, transformation, and warehouse loading. Build, run, and monitor ETL (Extract → Transform → Load) pipelines. Specializes in financial data flows, API integrations, and warehouse loading patterns for accounting and operations teams. Use when you need to move data between systems.
# Extract from REST API, clean, output CSV
import requests, pandas as pd, json
from datetime import datetime, timedelta
def extract(api_url, headers, params=None, timeout=30):
    """Pull paginated JSON from a REST endpoint into a flat list of records.

    Follows ``next_page_url`` links until exhausted. Handles three response
    shapes: a ``{"data": [...], "next_page_url": ...}`` envelope, a bare JSON
    list, or a single JSON object.

    Args:
        api_url: URL of the first page.
        headers: HTTP headers (authorization etc.).
        params: Query params sent with the FIRST request only — later pages
            carry their own query string inside next_page_url.
        timeout: Per-request timeout in seconds (default 30) so a hung
            endpoint cannot stall the pipeline indefinitely.

    Returns:
        List of record dicts accumulated across all pages.

    Raises:
        requests.HTTPError: on any non-2xx response.
    """
    results = []
    while api_url:
        r = requests.get(api_url, headers=headers, params=params, timeout=timeout)
        r.raise_for_status()
        data = r.json()
        if isinstance(data, list):
            # Bare-list response: no envelope, hence no pagination link.
            # (The original called data.get(...) here, which raises
            # AttributeError on a list.)
            results.extend(data)
            break
        results.extend(data.get("data", [data]))
        api_url = data.get("next_page_url")  # None ends the loop
        params = None  # only pass params on first call
    return results
def clean(records, rename_map=None, drop_nulls_on=None, date_cols=None):
    """Normalize raw records into a deduplicated DataFrame.

    Steps, in order: build frame → rename columns → parse dates
    (unparseable values become NaT) → drop rows with nulls in key columns →
    drop exact-duplicate rows.

    Args:
        records: list of dicts (e.g. from extract()); may be empty.
        rename_map: optional {old_name: new_name} column renames.
        drop_nulls_on: optional column names whose null rows are removed.
        date_cols: optional column names to parse with pd.to_datetime.

    Returns:
        Cleaned pandas DataFrame (empty frame for empty input).
    """
    df = pd.DataFrame(records)
    if rename_map:
        df = df.rename(columns=rename_map)
    if date_cols:
        for col in date_cols:
            # Guard: an empty extract yields a column-less frame; the
            # original raised KeyError here in that case.
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors="coerce")
    if drop_nulls_on:
        # dropna raises KeyError for absent subset columns — filter first.
        present = [c for c in drop_nulls_on if c in df.columns]
        if present:
            df = df.dropna(subset=present)
    df = df.drop_duplicates()
    return df
def load_csv(df, output_path):
    """Write *df* to *output_path* as CSV (no index column) and log the row count."""
    row_count = len(df)
    df.to_csv(output_path, index=False)
    print(f"✅ Saved {row_count} rows → {output_path}")
# Example: QBO Invoice Extract
# Placeholders: <TOKEN> is an OAuth2 bearer token, <REALM> is the QBO company id.
HEADERS = {"Authorization": "Bearer <TOKEN>", "Accept": "application/json"}
# Pull every Invoice via the QBO query endpoint; extract() follows pagination.
records = extract("https://quickbooks.api.intuit.com/v3/company/<REALM>/query?query=SELECT * FROM Invoice", HEADERS)
# Normalize field names and parse the transaction date column.
df = clean(records, rename_map={"TxnDate": "invoice_date", "TotalAmt": "amount"}, date_cols=["invoice_date"])
# Date-stamped filename so daily runs don't overwrite each other.
load_csv(df, f"data/invoices_{datetime.today().date()}.csv")
import pandas as pd
def merge_gl_with_bank(gl_path, bank_path, match_on="amount", date_tolerance_days=3):
"""
Match GL entries to bank transactions.
Flags unmatched rows for manual review.
"""
gl = pd.read_csv(gl_path, parse_dates=["date"])
bank = pd.read_csv(bank_path, parse_dates=["date"])
# Merge on amount + date proximity
merged = pd.merge_asof(
gl.sort_values("date"),
bank.sort_values("date"),
on="date",
by=match_on,
tolerance=pd.Timedelta(days=date_tolerance_days),
direction="nearest",
suffixes=("_gl", "_bank")
)
unmatched_gl = gl[~gl.index.isin(merged.dropna(subset=["date_bank"]).index)]
unmatched_bank = bank[~bank.index.isin(merged.dropna(subset=["date_gl"]).index)]
print(f"✅ Matched: {len(merged.dropna())} | ⚠️ Unmatched GL: {len(unmatched_gl)} | Bank: {len(unmatched_bank)}")
return merged, unmatched_gl, unmatched_bank
import pandas as pd
def audit_dataset(df, required_cols=None, expected_types=None):
    """
    Run data quality checks. Returns a report dict with keys:
    row_count, duplicate_rows, null_summary (per-column null counts),
    and issues (list of human-readable problem strings).

    Args:
        df: DataFrame to audit; may be empty.
        required_cols: optional column names that must be present.
        expected_types: optional {column: dtype-string} expectations.
    """
    report = {
        "row_count": len(df),
        "duplicate_rows": int(df.duplicated().sum()),
        "null_summary": df.isnull().sum().to_dict(),
        "issues": [],
    }
    if required_cols:
        missing = [c for c in required_cols if c not in df.columns]
        if missing:
            report["issues"].append(f"Missing required columns: {missing}")
    if expected_types:
        for col, dtype in expected_types.items():
            if col in df.columns and not pd.api.types.is_dtype_equal(df[col].dtype, dtype):
                report["issues"].append(f"{col}: expected {dtype}, got {df[col].dtype}")
    # Flag columns with >20% nulls. Guarded: the original divided by zero
    # when the frame was empty.
    if len(df):
        for col, nulls in report["null_summary"].items():
            pct = nulls / len(df) * 100
            if pct > 20:
                report["issues"].append(f"{col}: {pct:.1f}% null — review required")
    return report
# Usage
# Audit the AR aging extract before it feeds downstream reports.
df = pd.read_csv("data/ar_aging.csv")
report = audit_dataset(
    df,
    # Columns the downstream aging report cannot work without.
    required_cols=["customer_id", "invoice_date", "amount", "due_date"],
    expected_types={"amount": "float64", "customer_id": "object"}
)
print(report)
#!/bin/bash
# daily-gl-sync.sh — run via cron or OpenClaw cron tool
# Extracts GL, cleans, loads to SQLite, notifies on error
# -e: exit on error, -u: unset variables are errors, -o pipefail: any
# failure inside a pipeline fails the pipeline. Note: commands in the
# &&/|| list below are exempt from -e, which is what lets the FAILED
# branch run instead of aborting the script.
set -euo pipefail
# One log file per calendar day, shared by all three stages.
LOG="logs/gl-sync-$(date +%Y-%m-%d).log"
mkdir -p logs data
echo "[$(date)] Starting GL sync..." | tee -a "$LOG"
# Run the stages in order; the first failure short-circuits the && chain
# and falls through to the || branch (|| binds to the whole chain).
python3 pipelines/gl_extract.py >> "$LOG" 2>&1 && \
python3 pipelines/gl_clean.py >> "$LOG" 2>&1 && \
python3 pipelines/gl_load.py >> "$LOG" 2>&1 && \
echo "[$(date)] ✅ GL sync complete" | tee -a "$LOG" || \
echo "[$(date)] ❌ GL sync FAILED — check $LOG" | tee -a "$LOG"
import pandas as pd
import sqlite3
def load_to_sqlite(df, db_path, table_name, if_exists="replace"):
    """
    Load DataFrame to SQLite. Use if_exists='append' for incremental loads.

    Args:
        df: frame to persist.
        db_path: SQLite file path (created if missing).
        table_name: destination table name.
        if_exists: 'replace' (default), 'append', or 'fail' — forwarded to to_sql.
    """
    conn = sqlite3.connect(db_path)
    try:
        df.to_sql(table_name, conn, if_exists=if_exists, index=False)
    finally:
        # Close even when to_sql raises; the original leaked the handle on error.
        conn.close()
    print(f"✅ Loaded {len(df)} rows → {db_path}::{table_name}")
# PostgreSQL version (requires psycopg2 + sqlalchemy)
from sqlalchemy import create_engine
def load_to_postgres(df, conn_str, table_name, schema="public", if_exists="replace"):
    """
    Load DataFrame to PostgreSQL (requires psycopg2 + sqlalchemy).

    Args:
        df: frame to persist.
        conn_str: SQLAlchemy connection string, e.g. postgresql://user:pw@host/db.
        table_name: destination table name.
        schema: target schema (default "public").
        if_exists: 'replace' (default), 'append', or 'fail' — forwarded to to_sql.
    """
    engine = create_engine(conn_str)
    try:
        df.to_sql(table_name, engine, schema=schema, if_exists=if_exists, index=False)
    finally:
        # Release the connection pool; the original never disposed the engine,
        # leaking connections on every call.
        engine.dispose()
    print(f"✅ Loaded {len(df)} rows → {schema}.{table_name}")
Pipeline recipe — AR Aging Report:
1. Extract: QBO Invoices API → raw JSON
2. Transform: Calculate days_outstanding, aging_bucket (0-30, 31-60, 61-90, 90+)
3. Enrich: Join with customer contact data
4. Load: Google Sheets "AR Aging" tab + SQLite archive
5. Alert: Flag invoices >60 days for follow-up queue

Pipeline recipe — Bank Reconciliation:
1. Extract: Bank API (Plaid/CSV export) + QBO GL
2. Transform: Normalize dates, amounts, memo fields
3. Match: Fuzzy join on amount + date (±3 days tolerance)
4. Flag: Unmatched transactions → manual review CSV
5. Load: Reconciliation log → SQLite + email summary

Pipeline recipe — Payroll Journal Entries:
1. Extract: Payroll system CSV export (Gusto, ADP, etc.)
2. Transform: Map payroll codes → GL account numbers
3. Validate: Totals match payroll register
4. Load: Journal entry template → QBO batch import format
5. Archive: Raw + transformed files in dated folder
Common data-cleaning problems to handle before building any pipeline:
| Problem | Solution |
|---|---|
| Mixed date formats | pd.to_datetime(col, format="mixed") — pandas ≥2.0; infer_datetime_format is deprecated |
| Currency strings ("$1,234.56") | col.str.replace(r'[$,]', '', regex=True).astype(float) |
| Duplicate rows | df.drop_duplicates(subset=['id']) |
| Null amounts | df['amount'].fillna(0) or df.dropna(subset=['amount']) |
| Inconsistent casing | df['name'].str.strip().str.title() |
| Leading/trailing spaces | df.apply(lambda x: x.str.strip() if x.dtype == 'object' else x) |
| Outlier detection | df[df['amount'].between(df['amount'].quantile(.01), df['amount'].quantile(.99))] |
# Daily GL sync at 6 AM US Central (America/Chicago — handles CST/CDT automatically)
Schedule: cron "0 6 * * *" tz=America/Chicago
Payload: agentTurn — "Run the daily GL sync pipeline in ~/workspace/pipelines/"
Delivery: announce to Telegram on completion or failure
Install Python data stack:
pip install pandas requests sqlalchemy psycopg2-binary openpyxl xlrd
# For Google Sheets
pip install gspread gspread-dataframe google-auth
# For Plaid bank feeds
pip install plaid-python
workspace/
pipelines/
gl_extract.py
gl_clean.py
gl_load.py
ar_aging.py
bank_reconcile.py
data/
raw/ ← API responses, CSV imports (never edited)
processed/ ← cleaned, transformed data
archive/ ← date-stamped historical snapshots
logs/
pipeline-YYYY-MM-DD.log
scripts/
run-daily-pipelines.sh