{"skill":{"slug":"multi-agent-pipeline","displayName":"Multi Agent Pipeline","summary":"Generic multi-agent content pipeline — sequential and parallel agent stages with status tracking, error recovery, and progress callbacks. Use when building m...","description":"---\nname: multi-agent-pipeline\ndescription: Generic multi-agent content pipeline — sequential and parallel agent stages with status tracking, error recovery, and progress callbacks. Use when building multi-step AI workflows like content generation, data processing, or any generate-validate-transform-deliver pattern. Works with any LLM provider.\nversion: 1.0.1\nmetadata:\n  {\n      \"openclaw\": {\n            \"emoji\": \"\\ud83d\\udd17\",\n            \"requires\": {\n                  \"bins\": [],\n                  \"env\": []\n            },\n            \"primaryEnv\": null,\n            \"network\": {\n                  \"outbound\": false,\n                  \"reason\": \"Pipeline framework only \\u2014 actual API calls depend on the stage functions you provide.\"\n            },\n            \"security_notes\": \"base64 pattern is a false positive — used only in example code for encoding stage artifacts. UploadFile is a FastAPI type shown in example stage definitions. 'system prompt' references describe LLM agent configuration — not prompt injection.\"\n      }\n}\n---\n\n# Multi-Agent Pipeline\n\nA reusable pattern for orchestrating multi-step AI workflows where each stage is handled by a specialist agent. Extracted from a production system that processed 18 stories across 10 languages.\n\n## Pipeline Pattern\n\n```\nInput → [Stage 1: Generate] → [Stage 2: Validate] → [Stage 3: Transform] → [Stage 4: Deliver]\n              │                      │                       │                      │\n         Story Writer           Guardrails              Narrator              Storage\n         (sequential)           (parallel ok)           (parallel ok)         (sequential)\n```\n\n## Core Concepts\n\n**Stages:** Named processing steps, each with an agent function, input/output schema, and error handler.\n\n**Sequential vs Parallel:** Some stages must run in order (generate before validate). Others can run in parallel (narrate + generate SFX simultaneously).\n\n**Progress Callbacks:** Each stage reports status for UI updates. The pipeline visualization shows 9 agent nodes lighting up sequentially.\n\n**Error Recovery:** Failed stages can retry with backoff, skip with defaults, or halt the pipeline.\n\n**Caching:** Integrate with `prompt-cache` skill to skip stages that have already produced identical output.\n\n## Quick Start\n\n```python\nfrom pipeline import Pipeline, Stage\n\nasync def generate_story(input_data):\n    # Call your LLM here\n    return {\"story\": \"Once upon a time...\"}\n\nasync def validate_content(input_data):\n    # Check guardrails\n    return {\"valid\": True, \"story\": input_data[\"story\"]}\n\nasync def narrate(input_data):\n    # Call TTS API\n    return {\"audio\": b\"...\"}\n\npipeline = Pipeline(stages=[\n    Stage(\"generate\", generate_story, parallel=False),\n    Stage(\"validate\", validate_content, parallel=False),\n    Stage(\"narrate\", narrate, parallel=True),\n])\n\nresult = await pipeline.run({\"prompt\": \"A bedtime story about clouds\"})\n```\n\n## Status Tracking\n\nThe pipeline emits status updates suitable for real-time UI:\n\n```python\npipeline = Pipeline(\n    stages=[...],\n    on_status=lambda stage, status: print(f\"{stage}: {status}\")\n)\n# Output:\n# generate: started\n# generate: completed (2.3s)\n# validate: started\n# validate: completed (0.1s)\n# narrate: started\n# narrate: completed (4.7s)\n```\n\n## Lessons from Production\n\n- **Pre-cache demo content** — never rely on live API calls during presentations\n- **Parallel stages save wall-clock time** but increase API concurrency — respect rate limits\n- **Status callbacks should be non-blocking** — don't let UI updates slow the pipeline\n- **Error in stage N should not lose stages 1..N-1 output** — persist intermediate results\n\n## Files\n\n- `scripts/pipeline.py` — Generic pipeline implementation with stages, parallelism, and callbacks\n\n## Security Notes\n\nThis skill uses patterns that may trigger automated security scanners:\n- **base64**: Used for encoding audio/binary data in API responses (standard practice for media APIs)\n- **UploadFile**: FastAPI's built-in file upload parameter for STT/voice isolation endpoints\n- **\"system prompt\"**: Refers to configuring agent instructions, not prompt injection\n","tags":{"latest":"1.0.1"},"stats":{"comments":0,"downloads":774,"installsAllTime":2,"installsCurrent":2,"stars":0,"versions":2},"createdAt":1772823977461,"updatedAt":1779077738764},"latestVersion":{"version":"1.0.1","createdAt":1773613608389,"changelog":"- Added a dedicated \"security_notes\" field to metadata clarifying patterns that may trigger automated security scanners.\n- Updated skill version to 1.0.1.\n- No changes to the core pipeline logic or behavior.","license":"MIT-0"},"metadata":{"setup":[],"os":null,"systems":null},"owner":{"handle":"nissan","userId":"s17f2fw07zktjmcgagf5c29tbd83rt7v","displayName":"Nissan Dookeran","image":"https://avatars.githubusercontent.com/u/12583?v=4"},"moderation":{"isSuspicious":false,"isMalwareBlocked":false,"verdict":"clean","reasonCodes":["review.llm_review"],"summary":"Review: review.llm_review","engineVersion":"v2.4.24","updatedAt":1780089907769}}