Data Pipeline

v1.1.0

Lightweight ETL pipeline orchestrator with lifecycle hooks, bundle execution, stage retry, timeout control, and built-in transformers. 26 tests, 100% pass rate.


Install

OpenClaw Prompt Flow

Install with OpenClaw

Best for remote or guided setup. Copy the exact prompt, then paste it into OpenClaw for pagoda111king/data-pipeline.

Prompt Preview: Install & Setup
Install the skill "Data Pipeline" (pagoda111king/data-pipeline) from ClawHub.
Skill page: https://clawhub.ai/pagoda111king/data-pipeline
Keep the work scoped to this skill only.
After install, inspect the skill metadata and help me finish setup.
Use only the metadata you can verify from ClawHub; do not invent missing requirements.
Ask before making any broader environment changes.

Command Line

CLI Commands

Use the direct CLI path if you want to install manually and keep every step visible.

OpenClaw CLI

Bare skill slug

openclaw skills install data-pipeline

ClawHub CLI


npx clawhub@latest install data-pipeline
Security Scan
VirusTotal
Benign
OpenClaw
Benign
medium confidence
Purpose & Capability
The name and description (an ETL/pipeline engine) match the included src/pipeline.js, examples, and tests. The APIs declared in SKILL.md (Pipeline, Transformers, Validators, PipelineFactory) correspond to the implementation. Nothing in the bundle requests extra credentials, binaries, or config paths unrelated to a data pipeline.
Instruction Scope
SKILL.md contains usage examples and runtime instructions that only reference the local library (require('data-pipeline/src/pipeline')) and typical pipeline operations. It does not instruct the agent to read system files, env vars, or post data to external endpoints. Note: the pipeline API supports user-provided callbacks (onStageComplete) and custom stage functions; those user-supplied functions can perform I/O or network requests if the caller provides them, so data exfiltration is possible only via user code, not the library itself.
Install Mechanism
No install spec; this is instruction-and-source-only. The repository includes source files and a package-lock.json that references npm registry packages (devDependencies). There are no download URLs, extract steps, or non-standard install behaviors in the bundle.
Credentials
The skill declares no required environment variables or credentials. The runtime code does not reference environment variables or sensitive config paths in the provided snippets.
Persistence & Privilege
The skill's always flag is false, and model invocation is allowed (the platform default). Based on the provided files, the skill does not request permanent or privileged presence, nor does it modify other skills or agent-wide configs.
Assessment
This package appears internally coherent for a JavaScript data-pipeline library and doesn't ask for secrets or external installs. However: (1) the package has no homepage or source provenance—treat it as unvetted third-party code; (2) review src/pipeline.js and any included tests yourself before running in production; (3) when using the library, avoid passing untrusted stage functions or callbacks (onStageComplete) that could make network requests or access local files; (4) run it in a sandbox or ephemeral environment if you must execute it before doing a deeper audit; and (5) if you need stronger assurance, ask the author for a repository link or provenance information.

Like a lobster shell, security has layers — review code before you run it.

latest: vk9796744as9qg2st33jtsy1x9d85mn52
77 downloads
0 stars
2 versions
Updated 1d ago
v1.1.0
MIT-0

data-pipeline · Data Processing Pipeline Engine

Composable pipelines for data transformation, validation, and analysis. Process data like snapping building blocks together.


When to Use

When the user mentions: data cleaning, data transformation, ETL, data validation, data grouping, data aggregation, pipeline processing, batch data processing, array processing

Quick Start

const { Pipeline, Transformers, Validators, PipelineFactory } = require('data-pipeline/src/pipeline');

// Create a data-cleaning pipeline
const pipeline = new Pipeline();
pipeline
  .addStage('filter', Transformers.filter(x => x.age >= 18))
  .addStage('pick', Transformers.pick(['name', 'email']))
  .addStage('sort', Transformers.sort('name', 'asc'));

const result = await pipeline.run(users);

Core API

Pipeline

const pipeline = new Pipeline({ strict: true, context: { key: 'value' } });

// Add stages
pipeline.addStage(name, asyncFn, { retryCount: 0, retryDelay: 100, timeout: 30000 });
pipeline.addStages([{ name, fn, options }]);

// Stage management
pipeline.insertBefore(target, name, fn, options);
pipeline.insertAfter(target, name, fn, options);
pipeline.removeStage(name);
pipeline.toggleStage(name, enabled);

// Execution
const result = await pipeline.execute(data);  // returns { data, metadata }
const output = await pipeline.run(data);      // returns the data only

// Metrics
const metrics = pipeline.getMetrics();
pipeline.resetMetrics();
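The retryCount and retryDelay options suggest each stage is wrapped in retry logic. A minimal self-contained sketch of such a wrapper (illustrative only, not the library's actual implementation; the timeout option is omitted here for brevity):

```javascript
// Retry a stage function up to retryCount extra times, pausing
// retryDelay milliseconds between attempts (sketch, not library code).
async function runWithRetry(fn, data, { retryCount = 0, retryDelay = 100 } = {}) {
  let lastErr;
  for (let attempt = 0; attempt <= retryCount; attempt++) {
    try {
      return await fn(data);
    } catch (err) {
      lastErr = err;
      if (attempt < retryCount) {
        await new Promise(res => setTimeout(res, retryDelay));
      }
    }
  }
  throw lastErr; // all attempts failed
}
```

A stage that fails once but succeeds on a later attempt completes normally under this scheme, which is what the options above imply.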

Built-in Transformers

| Transformer | Description | Example |
| --- | --- | --- |
| filter(fn) | Filter items | Transformers.filter(x => x.active) |
| map(fn) | Map items | Transformers.map(x => x.name) |
| reduce(fn, init) | Reduce | Transformers.reduce((a,b) => a+b, 0) |
| groupBy(key) | Group by key | Transformers.groupBy('dept') |
| sort(key, order) | Sort | Transformers.sort('age', 'desc') |
| dedup(key) | Deduplicate | Transformers.dedup('id') |
| flatten(depth) | Flatten | Transformers.flatten(2) |
| paginate(page, size) | Paginate | Transformers.paginate(1, 10) |
| limit(n) | Limit count | Transformers.limit(5) |
| pick(fields) | Pick fields | Transformers.pick(['name', 'age']) |
| rename(map) | Rename fields | Transformers.rename({old: 'new'}) |
| merge(key, ...sources) | Merge sources | Transformers.merge('id', extras) |
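To make the shape of the output concrete, here are self-contained sketches of two of the transformers above (illustrative stand-ins, not the library source):

```javascript
// groupBy(key): bucket items into an object keyed by the given field.
const groupBy = (key) => (items) =>
  items.reduce((acc, item) => {
    const k = item[key];
    (acc[k] = acc[k] || []).push(item);
    return acc;
  }, {});

// dedup(key): keep only the first item seen for each key value.
const dedup = (key) => (items) => {
  const seen = new Set();
  return items.filter(item => {
    const k = item[key];
    if (seen.has(k)) return false;
    seen.add(k);
    return true;
  });
};
```

So groupBy('dept') turns a flat array into { dept1: [...], dept2: [...] }, and dedup('id') drops later duplicates while preserving order.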

Validators

const schema = {
  name: { required: true, type: 'string', minLength: 1 },
  age: { type: 'number', min: 0, max: 150 },
  email: { pattern: /^[^\s@]+@[^\s@]+\.[^\s@]+$/ },
  role: { enum: ['admin', 'user'] },
  password: { validate: (v) => v.length >= 8 ? true : 'Too short' }
};

const validator = Validators.schema(schema);
const result = validator(data);
// { valid: boolean, errors: [...], totalItems, validItems }
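The schema rules above (required, type, min/max, pattern, enum) map naturally to per-field checks. A hedged sketch of how a single object might be validated against such a schema (assumed semantics, not the library's code):

```javascript
// Validate one object against a schema of field rules; collect error
// messages rather than throwing, mirroring the result shape above.
function validateItem(schema, item) {
  const errors = [];
  for (const [field, rules] of Object.entries(schema)) {
    const value = item[field];
    if (rules.required && (value === undefined || value === null)) {
      errors.push(`${field} is required`);
      continue;
    }
    if (value === undefined) continue; // optional field, absent
    if (rules.type && typeof value !== rules.type) errors.push(`${field} must be ${rules.type}`);
    if (rules.min !== undefined && value < rules.min) errors.push(`${field} below min`);
    if (rules.max !== undefined && value > rules.max) errors.push(`${field} above max`);
    if (rules.pattern && !rules.pattern.test(String(value))) errors.push(`${field} has invalid format`);
    if (rules.enum && !rules.enum.includes(value)) errors.push(`${field} not in allowed values`);
  }
  return { valid: errors.length === 0, errors };
}
```

Running this over an array and tallying totalItems/validItems would yield the batch report shape shown above.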

Factory Functions

// ETL pipeline
const etl = PipelineFactory.createETL(extract, transforms, load);

// Data cleaning pipeline
const cleaner = PipelineFactory.createCleaner(schema, { defaultField: 'value' });

// Data analysis pipeline
const analyzer = PipelineFactory.createAnalyzer('groupKey', {
  avgVal: vals => vals.reduce((a,b) => a+b, 0) / vals.length,
  maxVal: vals => Math.max(...vals)
});
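The composition createETL presumably performs can be sketched in a few lines of plain JavaScript (a hypothetical stand-in, not the factory's actual implementation):

```javascript
// Extract -> transform -> load, with each phase awaited in order
// (sketch of the presumed ETL composition; names are illustrative).
async function runEtl(extract, transforms, load) {
  let data = await extract();          // pull raw records
  for (const transform of transforms) {
    data = await transform(data);      // apply each transform in order
  }
  return load(data);                   // hand the result to the loader
}
```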

Use Cases

  1. Data cleaning: validate → dedup → fill defaults → trim strings
  2. ETL flow: extract → transform (map/filter/reduce) → load
  3. Data analysis: group → aggregate → sort → paginate
  4. Data validation: batch-validate arrays of objects and return detailed error reports
  5. API data processing: merge multiple data sources → rename fields → pick output fields
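Use case 1 can be sketched in plain JavaScript without the library, to show the order of operations a cleaning pipeline applies (illustrative field names; the real pipeline would use addStage with the built-in transformers):

```javascript
// Clean a batch of records: drop invalid rows, dedup by id,
// fill a default value, and trim the name string.
function cleanRecords(records) {
  const seen = new Set();
  return records
    .filter(r => r.name)                                  // validate: drop rows with no name
    .filter(r => !seen.has(r.id) && !!seen.add(r.id))     // dedup by id
    .map(r => ({ value: 0, ...r, name: r.name.trim() })); // defaults + trim
}
```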

Error Handling

try {
  const result = await pipeline.execute(data);
} catch (err) {
  if (err instanceof PipelineError) {
    console.log('Failed at:', err.failedStage);
    console.log('Partial data:', err.lastData);
    console.log('Stage results:', err.stageResults);
  }
}
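The fields read in the catch block (failedStage, lastData, stageResults) imply an error type that carries pipeline state alongside the message. A sketch of what such an error class might look like (assumed shape, not the library's definition):

```javascript
// Error type carrying the stage name, the last good data, and
// per-stage results at the point of failure (illustrative sketch).
class PipelineError extends Error {
  constructor(message, { failedStage, lastData, stageResults }) {
    super(message);
    this.name = 'PipelineError';
    this.failedStage = failedStage;
    this.lastData = lastData;
    this.stageResults = stageResults;
  }
}
```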

Performance Metrics

const metrics = pipeline.getMetrics();
// {
//   pipeline: { totalRuns, totalErrors, avgTime },
//   stages: [{ name, calls, errors, avgTime }, ...]
// }
