{"skill":{"slug":"data-pipeline-toolkit-v2","displayName":"数据管道工具箱","summary":"快速构建ETL数据管道 — 提取(APIs/数据库/文件)、转换(清洗/过滤/聚合)、加载(数据仓库)，支持定时调度和监控告警。","description":"---\nname: data-pipeline-toolkit\ndescription: 快速构建ETL数据管道 — 提取(APIs/数据库/文件)、转换(清洗/过滤/聚合)、加载(数据仓库)，支持定时调度和监控告警。\nversion: 1.0.0\nauthor: yesong-Hue\ntags: [ETL, 数据管道, 数据集成, 自动化, 调度]\nreadme: |\n  # 数据管道工具箱\n\n  快速构建企业级ETL数据管道。\n\n  ## 核心功能\n\n  - Extract：支持APIs、数据库、CSV/JSON/Parquet文件、S3等\n  - Transform：清洗、过滤、聚合、关联\n  - Load：PostgreSQL/MySQL、Snowflake/BigQuery、S3等\n  - 定时调度：Cron或事件驱动\n  - 监控告警：失败自动通知\n\n  ## 安装\n\n  ```bash\n  clawhub install data-pipeline-toolkit\n  ```\n\n  ## 适用场景\n\n  - 数据工程师：快速搭建数据管道\n  - 运营人员：自动化数据汇总报表\n  - 开发者：定时同步第三方API数据\n\n  ## 相关资源\n\n  更多AI技能包和工具，请访问：[AI智造工坊](http://ai.qnitgroup.com)\n---\n\n# 数据管道工具箱\n\n> 快速构建ETL数据管道：提取 → 转换 → 加载 → 调度\n\n## 核心功能\n\n1. **多源提取** — REST APIs、GraphQL、SQL数据库、CSV/JSON/Parquet文件、S3/云存储、Kafka/SQS\n2. **数据转换** — 清洗、过滤、聚合、关联、跨表Join\n3. **多目标加载** — PostgreSQL/MySQL、Snowflake/BigQuery、S3、数据仓库\n4. **定时调度** — Cron任务或事件触发\n5. **监控告警** — 失败自动通知，可视化运行状态\n\n## 快速开始\n\n```bash\n# 创建数据管道\n./pipeline.sh create my-pipeline\n\n# 添加数据源\n./pipeline.sh extract my-pipeline api --url https://api.example.com/data\n\n# 添加转换规则\n./pipeline.sh transform my-pipeline filter \"status == 'active'\"\n./pipeline.sh transform my-pipeline aggregate \"group by category, sum(amount)\"\n\n# 添加目标存储\n./pipeline.sh load my-pipeline postgres --connection $DATABASE_URL\n\n# 运行管道\n./pipeline.sh run my-pipeline\n```\n\n## 支持的数据源\n\n| 类型 | 具体来源 |\n|------|----------|\n| APIs | REST API, GraphQL, 内部服务 |\n| 数据库 | PostgreSQL, MySQL, MongoDB, SQL Server |\n| 文件 | CSV, JSON, Parquet, Excel |\n| 云存储 | AWS S3, Google Cloud Storage |\n| 消息队列 | Kafka, AWS SQS |\n\n## 支持的目标存储\n\n| 类型 | 具体目标 |\n|------|----------|\n| 数据库 | PostgreSQL, MySQL, BigQuery, Snowflake |\n| 数据仓库 | ClickHouse, DuckDB, TimescaleDB |\n| 文件存储 | S3, GCS, 本地文件 |\n| API | 第三方API回传 |\n\n## 典型使用场景\n\n### 场景1：每日销售数据汇总\n\n```bash\n# 从CRM API提取昨日销售数据\n./pipeline.sh extract daily-sales api \\\n  --url \"https://crm.example.com/api/orders?date=yesterday\"\n\n# 转换：按产品分类汇总\n./pipeline.sh transform daily-sales aggregate \\\n  --group-by \"product_category\" \\\n  --sum \"quantity,amount\"\n\n# 加载到数据仓库\n./pipeline.sh load daily-sales bigquery \\\n  --project \"my-project\" --dataset \"sales\" --table \"daily_summary\"\n\n# 设置每日定时任务\n./pipeline.sh schedule daily-sales \"0 6 * * *\"\n```\n\n### 场景2：用户行为数据同步\n\n```bash\n# 从日志文件提取\n./pipeline.sh extract user-logs file --path \"/var/logs/app/*.json\"\n\n# 清洗和转换\n./pipeline.sh transform user-logs filter \"event_type != 'heartbeat'\"\n./pipeline.sh transform user-logs add-column \"timestamp:parse_timestamp(time)\"\n\n# 加载到ClickHouse\n./pipeline.sh load user-logs clickhouse --connection $CH_URL\n```\n\n## 监控与告警\n\n### 查看运行状态\n\n```bash\n./pipeline.sh status my-pipeline\n# 输出：\n# Status: ✅ Running\n# Last Run: 2026-05-05 06:00:00\n# Duration: 45s\n# Records Processed: 12,847\n# Errors: 0\n```\n\n### 配置告警\n\n```bash\n# 失败时发送邮件\n./pipeline.sh alert my-pipeline email --to admin@example.com\n\n# 失败时发送飞书消息\n./pipeline.sh alert my-pipeline webhook --url \"https://open.feishu.cn/...\"\n```\n\n## 推荐资源\n\n- **ShadowAI API（数据管道配套）**: https://referer.shadowai.xyz/r/1056448\n\n---\n\n*由 AI智造工坊 (http://ai.qnitgroup.com) 整理发布 | 安装源: ClawHub*","tags":{"latest":"1.0.0"},"stats":{"comments":0,"downloads":364,"installsAllTime":0,"installsCurrent":0,"stars":0,"versions":1},"createdAt":1777968443806,"updatedAt":1778492850251},"latestVersion":{"version":"1.0.0","createdAt":1777968443806,"changelog":"首发版，ETL数据管道：提取-转换-加载-调度","license":"MIT-0"},"metadata":{"setup":[],"os":null,"systems":null},"owner":{"handle":"yesong-hue","userId":"s17865seb2fyk8mpkmjge2xwmd857k7c","displayName":"yesong-Hue","image":"https://avatars.githubusercontent.com/u/277589485?v=4"},"moderation":null}