Security warning — review recommended

ClawHub Security found sensitive or high-impact capabilities. Review the scan results before using.

Multi Agent Pipeline

Generic multi-agent content pipeline — sequential and parallel agent stages with status tracking, error recovery, and progress callbacks. Use when building multi-step AI workflows like content generation, data processing, or any generate-validate-transform-deliver pattern. Works with any LLM provider.

Audits

Warn

Install

openclaw skills install multi-agent-pipeline

Multi-Agent Pipeline

A reusable pattern for orchestrating multi-step AI workflows where each stage is handled by a specialist agent. Extracted from a production system that processed 18 stories across 10 languages.

Pipeline Pattern

Input → [Stage 1: Generate] → [Stage 2: Validate] → [Stage 3: Transform] → [Stage 4: Deliver]
              │                      │                       │                      │
         Story Writer           Guardrails              Narrator              Storage
         (sequential)           (parallel ok)           (parallel ok)         (sequential)

Core Concepts

Stages: Named processing steps, each with an agent function, input/output schema, and error handler.

Sequential vs Parallel: Some stages must run in order (generate before validate). Others can run in parallel (narrate + generate SFX simultaneously).

Progress Callbacks: Each stage reports status for UI updates. The pipeline visualization shows 9 agent nodes lighting up sequentially.

Error Recovery: Failed stages can retry with backoff, skip with defaults, or halt the pipeline.

Caching: Integrate with prompt-cache skill to skip stages that have already produced identical output.

Quick Start

from pipeline import Pipeline, Stage

async def generate_story(input_data):
    # Call your LLM here
    return {"story": "Once upon a time..."}

async def validate_content(input_data):
    # Check guardrails
    return {"valid": True, "story": input_data["story"]}

async def narrate(input_data):
    # Call TTS API
    return {"audio": b"..."}

pipeline = Pipeline(stages=[
    Stage("generate", generate_story, parallel=False),
    Stage("validate", validate_content, parallel=False),
    Stage("narrate", narrate, parallel=True),
])

result = await pipeline.run({"prompt": "A bedtime story about clouds"})

Status Tracking

The pipeline emits status updates suitable for real-time UI:

pipeline = Pipeline(
    stages=[...],
    on_status=lambda stage, status: print(f"{stage}: {status}")
)
# Output:
# generate: started
# generate: completed (2.3s)
# validate: started
# validate: completed (0.1s)
# narrate: started
# narrate: completed (4.7s)

Lessons from Production

  • Pre-cache demo content — never rely on live API calls during presentations
  • Parallel stages save wall-clock time but increase API concurrency — respect rate limits
  • Status callbacks should be non-blocking — don't let UI updates slow the pipeline
  • Error in stage N should not lose stages 1..N-1 output — persist intermediate results

Files

  • scripts/pipeline.py — Generic pipeline implementation with stages, parallelism, and callbacks

Security Notes

This skill uses patterns that may trigger automated security scanners:

  • base64: Used for encoding audio/binary data in API responses (standard practice for media APIs)
  • UploadFile: FastAPI's built-in file upload parameter for STT/voice isolation endpoints
  • "system prompt": Refers to configuring agent instructions, not prompt injection