数据管道工具箱

快速构建ETL数据管道 — 提取(APIs/数据库/文件)、转换(清洗/过滤/聚合)、加载(数据仓库),支持定时调度和监控告警。

Audits

Pass

Install

openclaw skills install data-pipeline-toolkit-v2

数据管道工具箱

快速构建ETL数据管道:提取 → 转换 → 加载 → 调度

核心功能

  1. 多源提取 — REST APIs、GraphQL、SQL数据库、CSV/JSON/Parquet文件、S3/云存储、Kafka/SQS
  2. 数据转换 — 清洗、过滤、聚合、关联、跨表Join
  3. 多目标加载 — PostgreSQL/MySQL、Snowflake/BigQuery、S3、数据仓库
  4. 定时调度 — Cron任务或事件触发
  5. 监控告警 — 失败自动通知,可视化运行状态

快速开始

# 创建数据管道
./pipeline.sh create my-pipeline

# 添加数据源
./pipeline.sh extract my-pipeline api --url https://api.example.com/data

# 添加转换规则
./pipeline.sh transform my-pipeline filter "status == 'active'"
./pipeline.sh transform my-pipeline aggregate "group by category, sum(amount)"

# 添加目标存储
./pipeline.sh load my-pipeline postgres --connection $DATABASE_URL

# 运行管道
./pipeline.sh run my-pipeline

支持的数据源

类型具体来源
APIsREST API, GraphQL, 内部服务
数据库PostgreSQL, MySQL, MongoDB, SQL Server
文件CSV, JSON, Parquet, Excel
云存储AWS S3, Google Cloud Storage
消息队列Kafka, AWS SQS

支持的目标存储

类型具体目标
数据库PostgreSQL, MySQL, BigQuery, Snowflake
数据仓库ClickHouse, DuckDB, TimescaleDB
文件存储S3, GCS, 本地文件
API第三方API回传

典型使用场景

场景1:每日销售数据汇总

# 从CRM API提取昨日销售数据
./pipeline.sh extract daily-sales api \
  --url "https://crm.example.com/api/orders?date=yesterday"

# 转换:按产品分类汇总
./pipeline.sh transform daily-sales aggregate \
  --group-by "product_category" \
  --sum "quantity,amount"

# 加载到数据仓库
./pipeline.sh load daily-sales bigquery \
  --project "my-project" --dataset "sales" --table "daily_summary"

# 设置每日定时任务
./pipeline.sh schedule daily-sales "0 6 * * *"

场景2:用户行为数据同步

# 从日志文件提取
./pipeline.sh extract user-logs file --path "/var/logs/app/*.json"

# 清洗和转换
./pipeline.sh transform user-logs filter "event_type != 'heartbeat'"
./pipeline.sh transform user-logs add-column "timestamp:parse_timestamp(time)"

# 加载到ClickHouse
./pipeline.sh load user-logs clickhouse --connection $CH_URL

监控与告警

查看运行状态

./pipeline.sh status my-pipeline
# 输出:
# Status: ✅ Running
# Last Run: 2026-05-05 06:00:00
# Duration: 45s
# Records Processed: 12,847
# Errors: 0

配置告警

# 失败时发送邮件
./pipeline.sh alert my-pipeline email --to admin@example.com

# 失败时发送飞书消息
./pipeline.sh alert my-pipeline webhook --url "https://open.feishu.cn/..."

推荐资源


由 AI智造工坊 (http://ai.qnitgroup.com) 整理发布 | 安装源: ClawHub