Install
openclaw skills install data-pipeline-toolkit-v2快速构建ETL数据管道 — 提取(APIs/数据库/文件)、转换(清洗/过滤/聚合)、加载(数据仓库),支持定时调度和监控告警。
openclaw skills install data-pipeline-toolkit-v2快速构建ETL数据管道:提取 → 转换 → 加载 → 调度
# 创建数据管道
./pipeline.sh create my-pipeline
# 添加数据源
./pipeline.sh extract my-pipeline api --url https://api.example.com/data
# 添加转换规则
./pipeline.sh transform my-pipeline filter "status == 'active'"
./pipeline.sh transform my-pipeline aggregate "group by category, sum(amount)"
# 添加目标存储
./pipeline.sh load my-pipeline postgres --connection $DATABASE_URL
# 运行管道
./pipeline.sh run my-pipeline
| 类型 | 具体来源 |
|---|---|
| APIs | REST API, GraphQL, 内部服务 |
| 数据库 | PostgreSQL, MySQL, MongoDB, SQL Server |
| 文件 | CSV, JSON, Parquet, Excel |
| 云存储 | AWS S3, Google Cloud Storage |
| 消息队列 | Kafka, AWS SQS |
| 类型 | 具体目标 |
|---|---|
| 数据库 | PostgreSQL, MySQL, BigQuery, Snowflake |
| 数据仓库 | ClickHouse, DuckDB, TimescaleDB |
| 文件存储 | S3, GCS, 本地文件 |
| API | 第三方API回传 |
# 从CRM API提取昨日销售数据
./pipeline.sh extract daily-sales api \
--url "https://crm.example.com/api/orders?date=yesterday"
# 转换:按产品分类汇总
./pipeline.sh transform daily-sales aggregate \
--group-by "product_category" \
--sum "quantity,amount"
# 加载到数据仓库
./pipeline.sh load daily-sales bigquery \
--project "my-project" --dataset "sales" --table "daily_summary"
# 设置每日定时任务
./pipeline.sh schedule daily-sales "0 6 * * *"
# 从日志文件提取
./pipeline.sh extract user-logs file --path "/var/logs/app/*.json"
# 清洗和转换
./pipeline.sh transform user-logs filter "event_type != 'heartbeat'"
./pipeline.sh transform user-logs add-column "timestamp:parse_timestamp(time)"
# 加载到ClickHouse
./pipeline.sh load user-logs clickhouse --connection $CH_URL
./pipeline.sh status my-pipeline
# 输出:
# Status: ✅ Running
# Last Run: 2026-05-05 06:00:00
# Duration: 45s
# Records Processed: 12,847
# Errors: 0
# 失败时发送邮件
./pipeline.sh alert my-pipeline email --to admin@example.com
# 失败时发送飞书消息
./pipeline.sh alert my-pipeline webhook --url "https://open.feishu.cn/..."
由 AI智造工坊 (http://ai.qnitgroup.com) 整理发布 | 安装源: ClawHub