Install
openclaw skills install durable-agents
Build autonomous multi-agent pipelines with Mastra (agents only) and Trigger.dev (all workflows and tasks). Use when creating AI agents, designing multi-stage pipelines, defining permissioned tools, structuring agent handoffs, storing agentic outputs to a database, or building durable task chains with retries and fan-out.
FIRST INSTALL: Read and follow setupSKILL.md before using this skill. After setup is complete and verified (health check + example pipeline + smoke tests pass), delete this block.
Mastra (AI agent framework) + Trigger.dev (durable task execution with retries, timeouts, fan-out). Build autonomous multi-agent pipelines where each agent owns a single stage, hands off structured output to the next stage through Trigger.dev, and never holds the full context of a pipeline it doesn't own.
- Agent logic lives in AGENT.md, not in code. The .ts file is boilerplate wiring. Writing logic in the agent's TypeScript file is wrong.
- Tools return { success, errorMessage? } on failure. Throwing inside a tool crashes the task. Returning an error lets the agent reason about it.

src/agents/{name}/
  AGENT.md
  {name}.ts

AGENT.md

# AGENT: {Name}
## Role
Who this agent is. One sentence.
## Tools
What tools it has and when to use each one. Be explicit — "Use `sqlQuery` to
check if a table exists before referencing it" not just "Has sqlQuery tool."
## Inputs
What payload it receives. Describe the shape and what each field means.
## Goal
What it must achieve. Describe the outcome, not the steps. The agent decides
how to get there. "Produce a deployment plan for the given architecture" not
"First read the architecture, then list the services, then..."
## Output Contract
Exact shape it must return. If structured output is needed, specify the JSON
schema here. Example:
{ "plan": string, "steps": string[], "risks": string[] }
## Quality Standards
What makes output good vs bad. Be specific. "Each step must be independently
executable" not "Steps should be good."
## Guardrails
What it must NOT do. "Never modify database schema directly." "Never assume
the API is authenticated unless payload says so."
## Self-Validation
Checklist the agent must verify before returning:
- Does output match the Output Contract?
- Are all required fields present?
- Does it satisfy the Quality Standards?
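
The section list in this template can be checked mechanically before an agent is registered. The following is a minimal sketch, assuming a hypothetical helper named `missingSections` (it is not part of Mastra) that scans an AGENT.md string for the `##` headings shown above.

```typescript
// Hypothetical helper, not part of Mastra: reports which template sections
// an AGENT.md string is missing.
const REQUIRED_SECTIONS = [
  "Role",
  "Tools",
  "Inputs",
  "Goal",
  "Output Contract",
  "Quality Standards",
  "Guardrails",
  "Self-Validation",
] as const;

function missingSections(agentMd: string): string[] {
  // Collect the text of every level-2 heading ("## Name").
  const found = new Set<string>(
    agentMd
      .split("\n")
      .filter((line) => line.startsWith("## "))
      .map((line) => line.slice(3).trim()),
  );
  return REQUIRED_SECTIONS.filter((section) => !found.has(section));
}

// Usage: a draft with only two sections reports the other six as missing.
const draft = "# AGENT: Planner\n## Role\nPlans work.\n## Goal\nProduce a plan.\n";
console.log(missingSections(draft));
```

A check like this could run in a smoke test so that an incomplete AGENT.md is caught before the agent is ever called.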

.ts file

Pure boilerplate. No logic here.
import fs from "fs";
import path from "path";
import { fileURLToPath } from "url";
import { Agent } from "@mastra/core/agent";
import { model } from "../../config/model.js";

// ESM has no built-in __dirname, so derive it to locate AGENT.md next to this file.
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const instructions = fs.readFileSync(path.join(__dirname, "AGENT.md"), "utf8");

export const myAgent = new Agent({
  id: "my-agent",
  name: "My Agent",
  instructions,
  model,
});

To give the agent tools:

import { myTool } from "../../tools/myTool.js";

export const myAgent = new Agent({
  id: "my-agent",
  name: "My Agent",
  instructions,
  model,
  tools: { myTool },
});

In src/mastra/index.ts:
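
import { Mastra } from "@mastra/core/mastra";
import { myAgent } from "../agents/my-agent/my-agent.js";
// plannerAgent and reviewerAgent are imported the same way from their own agent directories.

export const mastra = new Mastra({
  agents: { plannerAgent, reviewerAgent, myAgent },
});
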
// src/tools/myTool.ts
import { createTool } from "@mastra/core/tools";
import { z } from "zod";

export const myTool = createTool({
  id: "my-tool",
  description: "What it does and WHEN to use it",
  inputSchema: z.object({
    query: z.string().describe("The search query"),
  }),
  outputSchema: z.object({
    success: z.boolean(),
    data: z.any().optional(),
    errorMessage: z.string().optional(),
  }),
  execute: async ({ query }) => {
    try {
      const result = await doSomething(query);
      return { success: true, data: result };
    } catch (error) {
      // Return the failure instead of throwing so the agent can reason about it.
      const errorMessage = error instanceof Error ? error.message : String(error);
      return { success: false, errorMessage };
    }
  },
});

- Always define outputSchema. The agent uses it to understand what the tool returns.
- Never throw inside execute. Return { success: false, errorMessage } instead. Throwing crashes the Trigger.dev task.
- Use .describe() on Zod fields to tell the agent what to pass.
- [...] AGENT.md guardrails.
- Shared tools: src/tools/{name}.ts
- Agent-specific tools: src/agents/{agentName}/tools/{name}.ts

Register shared tools in src/mastra/index.ts. Agent-specific tools are imported directly in the agent file.
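
The "return, don't throw" rule can be factored into one wrapper so every tool handles failures the same way. This is a dependency-free sketch, not Mastra API: `ToolResult` and `toToolResult` are hypothetical names introduced for illustration.

```typescript
// Result shape used by the tools in this document.
type ToolResult<T> =
  | { success: true; data: T }
  | { success: false; errorMessage: string };

// Hypothetical wrapper: runs an async action and converts any thrown value
// into a { success: false, errorMessage } result instead of rethrowing.
async function toToolResult<T>(action: () => Promise<T>): Promise<ToolResult<T>> {
  try {
    return { success: true, data: await action() };
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : String(error);
    return { success: false, errorMessage };
  }
}

// Usage: the failure is handed back to the caller as data, not thrown.
toToolResult(async () => {
  throw new Error("upstream API returned 503");
}).then((result) => console.log(result));
```

Inside a tool's execute function, the body would then reduce to a single call that passes the real work as the action.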
Any tool that touches a real system — posting to an API, publishing content, sending a message, charging a user, deleting data, triggering a webhook — must be permission-gated. Agents must not be able to fire these actions without explicit intent confirmation.
Before building a tool that has real-world side effects, ask the user how that action should be gated. Build the answer into the tool's permission layer, not just the agent's AGENT.md guardrails. Guardrails are instructions; permission layers are enforcement.
For any action that can't be undone or that has cost/visibility consequences, the tool must receive an explicit confirmed: true in its input before it proceeds. The agent must call a read/preview tool first, then call the action tool only when it has verified the result and received confirmed: true from the calling context.

export const publishPostTool = createTool({
  id: "publish-post",
  description: "Publishes a post to the platform. Only call this after previewing with `previewPostTool` and receiving confirmed: true from the task payload.",
  inputSchema: z.object({
    postId: z.string().describe("ID of the post record to publish"),
    confirmed: z.boolean().describe("Must be true. Do not set this yourself — it must come from the task payload."),
  }),
  outputSchema: z.object({
    success: z.boolean(),
    publishedUrl: z.string().optional(),
    errorMessage: z.string().optional(),
  }),
  execute: async ({ postId, confirmed }) => {
    // Enforce the permission gate in code; the description alone is only an instruction.
    if (!confirmed) {
      return { success: false, errorMessage: "Publish requires confirmed: true in payload." };
    }
    try {
      const url = await publishPost(postId);
      return { success: true, publishedUrl: url };
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      return { success: false, errorMessage };
    }
  },
});

Destructive or write tools must operate on a specific record ID — never on a query, a filter, or an implicit "current item." The agent must always pass the exact ID of the record it's acting on. This prevents the tool from accidentally operating on the wrong item.

inputSchema: z.object({
  recordId: z.string().describe("Exact DB ID of the record to act on. Do not pass a search query."),
})

AGENT.md Guardrails vs in the tool

| Concern | Where it lives |
|---|---|
| "Don't publish unless quality score > 0.8" | AGENT.md Guardrails |
| "Don't call this without confirmed: true" | Tool input schema + execute guard |
| "Only act on records in status: draft" | Tool execute guard (check DB before acting) |
| "Never delete more than one record per run" | Tool execute guard (enforce the count) |
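
The tool-side rows of this table can be sketched without any framework. This is an illustration only: the in-memory `posts` map stands in for a real database, and `publishGuard` is a hypothetical function that a publish tool might run at the top of its execute function.

```typescript
type PostStatus = "draft" | "approved" | "published";
type GuardResult = { success: true } | { success: false; errorMessage: string };

// In-memory stand-in for a database table (an assumption of this sketch).
const posts = new Map<string, { status: PostStatus }>([
  ["post-1", { status: "draft" }],
  ["post-2", { status: "published" }],
]);

// Hypothetical guard: enforces confirmation, an exact record ID, and the
// "only act on drafts" rule before any side effect happens.
function publishGuard(input: { postId: string; confirmed: boolean }): GuardResult {
  if (!input.confirmed) {
    return { success: false, errorMessage: "Publish requires confirmed: true in payload." };
  }
  const post = posts.get(input.postId);
  if (!post) {
    return { success: false, errorMessage: `No post with id ${input.postId}.` };
  }
  if (post.status !== "draft") {
    return { success: false, errorMessage: `Post is ${post.status}, expected draft.` };
  }
  return { success: true };
}

console.log(publishGuard({ postId: "post-1", confirmed: false }));
console.log(publishGuard({ postId: "post-2", confirmed: true }));
console.log(publishGuard({ postId: "post-1", confirmed: true }));
```

Only the last call passes, because it is confirmed and targets a record that is still a draft. The quality-score rule from the first table row stays in AGENT.md, since it is a judgment the agent makes rather than a check the tool can enforce.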
Pipelines chain Trigger.dev tasks. Each task calls one agent and passes its output to the next. No single agent holds the full pipeline context — each stage receives only what it needs.
src/pipelines/tasks/

import { task, logger } from "@trigger.dev/sdk/v3";
import { mastra } from "../../mastra/index.js";

export const planTask = task({
  id: "plan-task",
  retry: { maxAttempts: 3, minTimeoutInMs: 1000, factor: 2 },
  run: async (payload: { prompt: string }) => {
    logger.info("Running planner", { promptLength: payload.prompt.length });
    const agent = mastra.getAgent("plannerAgent");
    const response = await agent.generate(JSON.stringify(payload));
    return response.text;
  },
});

In src/pipelines/{name}.ts, chain tasks using triggerAndWait:

import { planTask } from "./tasks/plan-task.js";
import { reviewTask } from "./tasks/review-task.js";

export async function runMyPipeline(input: string) {
  const planResult = await planTask.triggerAndWait({ prompt: input });
  if (!planResult.ok) throw new Error("Plan task failed");
  const reviewResult = await reviewTask.triggerAndWait({ plan: planResult.output });
  if (!reviewResult.ok) throw new Error("Review task failed");
  return { plan: planResult.output, review: reviewResult.output };
}

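
The repeated `ok` check can be pulled into a small helper so no stage result is used unchecked. This is a sketch under assumptions: `StageResult` models only the `ok` and `output` fields used in this document, the `error` field on the failure branch is assumed, and `unwrap` is a hypothetical name.

```typescript
// Models the result fields used in the pipeline: `ok` plus `output` on success.
// The `error` field on the failure branch is an assumption of this sketch.
type StageResult<T> = { ok: true; output: T } | { ok: false; error: unknown };

// Hypothetical helper: returns the output or throws an error naming the stage.
function unwrap<T>(stage: string, result: StageResult<T>): T {
  if (!result.ok) {
    throw new Error(`${stage} failed: ${String(result.error)}`);
  }
  return result.output;
}

// Usage with a hand-built success result.
const plan = unwrap<string>("plan-task", { ok: true, output: "1. Provision database" });
console.log(plan);
```

Naming the stage in the error message makes it clear from the logs which hand-off broke.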
In src/trigger/index.ts:
export * from "../pipelines/tasks/plan-task.js";
export * from "../pipelines/tasks/review-task.js";
Every task must be exported here or the Trigger.dev worker won't discover it.
In src/app/index.ts:
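
app.post("/my-pipeline", async (req, res) => {
  try {
    const { input } = req.body;
    const result = await runMyPipeline(input);
    res.json({ success: true, ...result });
  } catch (error) {
    // runMyPipeline throws when a stage fails; report it instead of leaving the request hanging.
    res.status(500).json({ success: false, errorMessage: error instanceof Error ? error.message : String(error) });
  }
});
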
Not every pipeline stage needs an agent. Use agents where judgment is required. Use scripts (plain TypeScript functions or Trigger.dev tasks with no agent) where the action is deterministic.
[Director Agent] — generates ideas, writes scripts, validates against criteria
↓
[Media Selector Agent] — selects or processes media assets based on the script
↓
[Overlay Task] — no agent; deterministic script that composites text onto video and stores result
The overlay stage has no reasoning to do. It receives exact inputs, executes a fixed operation, and stores the output. Putting an agent here adds latency and cost for no benefit.
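Use an agent when the stage requires judgment, such as generating, selecting, or evaluating content.
Use a plain task (no agent) when the stage is deterministic: it receives exact inputs and executes a fixed operation.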
Split at the boundary where a different capability is needed — not to artificially divide one agent's work. A director agent that generates ideas, writes a script, and validates it against criteria is doing one coherent job. That's one agent, one task. The media selection is a different capability — that's the split.

import { tasks } from "@trigger.dev/sdk/v3";

const handles = await tasks.batchTrigger(
  "process-item",
  items.map((item) => ({ payload: { item } })),
);

Each sub-task runs independently with its own retries.
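
The item list passed to a batch trigger is plain data, so it can be built and split up before any SDK call. This is a sketch with hypothetical helpers `toBatchItems` and `chunk`; the chunk size of 2 is arbitrary and only there to show the split, so check the batch limit for your Trigger.dev version.

```typescript
// Builds the item list for a batch trigger: one { payload } entry per input.
function toBatchItems<T>(items: T[]): Array<{ payload: { item: T } }> {
  return items.map((item) => ({ payload: { item } }));
}

// Splits a list into chunks of at most `size` entries.
function chunk<T>(list: T[], size: number): T[][] {
  if (size < 1) throw new Error("size must be at least 1");
  const chunks: T[][] = [];
  for (let i = 0; i < list.length; i += size) {
    chunks.push(list.slice(i, i + size));
  }
  return chunks;
}

// Three items split into batches of at most two.
const batches = chunk(toBatchItems(["a.mp4", "b.mp4", "c.mp4"]), 2);
console.log(batches.length); // 2
```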
Insert a review stage between pipeline steps. Three modes:
| Mode | Behavior |
|---|---|
| "none" | Auto-approve. Trigger next stage immediately. |
| "agent" | Call a reviewer agent. If approved, continue. If rejected, feed feedback back to the previous stage for revision. |
| "human" | Set a status in the DB to pending. Return. A human reviews externally. Resume the pipeline via an API callback. |
Every task must have explicit retry config. LLM calls are flaky — the default (no retries) means one transient API error kills the pipeline.

retry: {
  maxAttempts: 3,
  minTimeoutInMs: 1000,
  factor: 2,
}

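
To see what this config means in wall-clock terms, the waits between attempts can be computed by hand. This sketch assumes plain exponential backoff (`minTimeoutInMs * factor^n`) and that `maxAttempts` counts the first attempt; it ignores any jitter or maximum timeout the platform may apply. `retryDelays` is a hypothetical helper, not an SDK function.

```typescript
interface RetryConfig {
  maxAttempts: number;
  minTimeoutInMs: number;
  factor: number;
}

// Delay before each retry under plain exponential backoff.
function retryDelays({ maxAttempts, minTimeoutInMs, factor }: RetryConfig): number[] {
  const delays: number[] = [];
  // maxAttempts is assumed to include the first attempt, leaving maxAttempts - 1 retries.
  for (let retry = 0; retry < maxAttempts - 1; retry++) {
    delays.push(minTimeoutInMs * Math.pow(factor, retry));
  }
  return delays;
}

console.log(retryDelays({ maxAttempts: 3, minTimeoutInMs: 1000, factor: 2 })); // [ 1000, 2000 ]
```

Under these assumptions a task fails for good roughly three seconds of waiting after its first error, which is enough to ride out a brief rate limit but not a long outage.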
Every agent input, output, and intermediate result must be persisted to the database before the next stage runs. This is not optional. Agents operate on DB records — they do not pass raw data through in-memory pipelines.
Every task writes its output to the DB and returns the record ID. The next task receives the ID, reads from the DB, and operates on the record.

// Stage 1: director agent writes its output
export const scriptTask = task({
  id: "script-task",
  retry: { maxAttempts: 3, minTimeoutInMs: 1000, factor: 2 },
  run: async (payload: { projectId: string }) => {
    const existing = await db.script.findFirst({ where: { projectId: payload.projectId } });
    if (existing) return { scriptId: existing.id }; // already done, skip
    const agent = mastra.getAgent("directorAgent");
    const response = await agent.generate(JSON.stringify(payload));
    const output = ScriptOutputSchema.parse(JSON.parse(response.text));
    const record = await db.script.create({
      data: { projectId: payload.projectId, content: output.script, status: "draft" },
    });
    return { scriptId: record.id };
  },
});

// Stage 2: next agent reads by ID
export const mediaTask = task({
  id: "media-task",
  retry: { maxAttempts: 3, minTimeoutInMs: 1000, factor: 2 },
  run: async (payload: { scriptId: string }) => {
    const script = await db.script.findUniqueOrThrow({ where: { id: payload.scriptId } });
    const agent = mastra.getAgent("mediaSelectorAgent");
    const response = await agent.generate(JSON.stringify({ script: script.content }));
    const output = MediaOutputSchema.parse(JSON.parse(response.text));
    const record = await db.mediaSelection.create({
      data: { scriptId: payload.scriptId, assetIds: output.assetIds, status: "selected" },
    });
    return { mediaSelectionId: record.id };
  },
});

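
The "check for an existing record, then return only the ID" step in stage 1 is what makes a retried task safe to rerun. The idea can be shown in isolation with an in-memory array standing in for the database. Everything here is hypothetical scaffolding for illustration: `scripts`, `generateScript`, and `runScriptStage` are not part of any library.

```typescript
interface ScriptRecord {
  id: string;
  projectId: string;
  content: string;
  status: "draft";
}

// In-memory stand-in for the script table (an assumption of this sketch).
const scripts: ScriptRecord[] = [];
let agentCalls = 0;

// Stand-in for the agent call; counts invocations so reruns are visible.
function generateScript(projectId: string): string {
  agentCalls += 1;
  return `Script for ${projectId}`;
}

// Runs the stage at most once per project and hands off only the record ID.
function runScriptStage(projectId: string): { scriptId: string } {
  const existing = scripts.find((s) => s.projectId === projectId);
  if (existing) return { scriptId: existing.id }; // rerun after a retry: skip the agent
  const record: ScriptRecord = {
    id: `script-${scripts.length + 1}`,
    projectId,
    content: generateScript(projectId),
    status: "draft",
  };
  scripts.push(record);
  return { scriptId: record.id };
}

const first = runScriptStage("proj-1");
const retried = runScriptStage("proj-1");
console.log(first.scriptId === retried.scriptId, agentCalls); // true 1
```

The second call returns the same ID without invoking the agent again, so a retry costs one lookup instead of another LLM call.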
Store a status field on every record. Use it to gate pipeline stages and drive human review checkpoints.
| Status | Meaning |
|---|---|
| pending | Created, not yet processed |
| processing | Task is running |
| draft | Agent output produced, not reviewed |
| approved | Passed review (agent or human) |
| rejected | Failed review, needs revision |
| published | Final action taken |
| failed | Unrecoverable error |

await db.script.update({
  where: { id: scriptId },
  data: { status: "processing" },
});
// ... agent call ...
await db.script.update({
  where: { id: scriptId },
  data: { status: "draft", content: output.script },
});

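
Status updates are easier to trust when illegal moves are rejected in one place. The sketch below shows one possible transition map. The statuses come from the table above, but which moves are legal is an assumption made for illustration, and `canTransition` is a hypothetical helper.

```typescript
type Status =
  | "pending"
  | "processing"
  | "draft"
  | "approved"
  | "rejected"
  | "published"
  | "failed";

// One possible transition map. The document lists the statuses but does not
// prescribe the legal moves, so adjust this to your pipeline.
const ALLOWED: Record<Status, Status[]> = {
  pending: ["processing"],
  processing: ["draft", "failed"],
  draft: ["approved", "rejected"],
  approved: ["published", "failed"],
  rejected: ["processing"],
  published: [],
  failed: [],
};

function canTransition(from: Status, to: Status): boolean {
  return ALLOWED[from].includes(to);
}

console.log(canTransition("draft", "approved")); // true
console.log(canTransition("draft", "published")); // false
```

With this map, a draft cannot be published without first passing review, which is the gate that the review checkpoints rely on.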
Define the destination and the quality bar. Don't specify how to get there.
Wrong — micromanaging the agent:
1. Read the input
2. Extract the requirements
3. For each requirement, write a task
4. Format the tasks as a numbered list
5. Return the list
Right — defining the outcome:
## Goal
Produce a technical implementation plan for the given objective.
## Output Contract
{ "tasks": [{ "title": string, "description": string, "dependencies": string[] }] }
## Quality Standards
- Each task must be independently executable by a developer
- Dependencies must reference other tasks by title
- No task should take more than 4 hours of work
Always type the run function parameter:
run: async (payload: { prompt: string; maxTokens?: number }) => {
Define the exact schema in the AGENT.md Output Contract section, then validate with Zod on receipt:

const OutputSchema = z.object({
  tasks: z.array(z.object({
    title: z.string(),
    description: z.string(),
    dependencies: z.array(z.string()),
  })),
});

const response = await agent.generate(JSON.stringify(payload));
const parsed = OutputSchema.parse(JSON.parse(response.text));

If parsing fails, the task throws, Trigger.dev retries with the same input, and the agent produces output again.
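
The same throw-on-mismatch behavior can be shown without dependencies. This sketch swaps Zod for a hand-written check purely so it runs standalone, and it adds one quality standard from the example (dependencies must reference tasks by title). `parsePlan` and the interfaces are hypothetical names.

```typescript
interface PlanTask {
  title: string;
  description: string;
  dependencies: string[];
}
interface PlanOutput {
  tasks: PlanTask[];
}

function isStringArray(value: unknown): value is string[] {
  return Array.isArray(value) && value.every((v) => typeof v === "string");
}

// Parses agent text and throws on any contract violation, so the calling
// task fails and is retried.
function parsePlan(text: string): PlanOutput {
  const raw: unknown = JSON.parse(text); // throws SyntaxError on invalid JSON
  if (typeof raw !== "object" || raw === null || !Array.isArray((raw as { tasks?: unknown }).tasks)) {
    throw new Error("Output must be an object with a tasks array");
  }
  const tasks = (raw as { tasks: unknown[] }).tasks.map((t, i) => {
    const task = t as Partial<PlanTask> | null;
    if (
      !task ||
      typeof task.title !== "string" ||
      typeof task.description !== "string" ||
      !isStringArray(task.dependencies)
    ) {
      throw new Error(`tasks[${i}] does not match the Output Contract`);
    }
    return { title: task.title, description: task.description, dependencies: task.dependencies };
  });
  // Quality standard from the example: dependencies reference tasks by title.
  const titles = new Set(tasks.map((t) => t.title));
  for (const t of tasks) {
    for (const dep of t.dependencies) {
      if (!titles.has(dep)) throw new Error(`Unknown dependency "${dep}" in "${t.title}"`);
    }
  }
  return { tasks };
}

const ok = parsePlan('{"tasks":[{"title":"Set up DB","description":"Create schema","dependencies":[]}]}');
console.log(ok.tasks.length); // 1
```

Checks that go beyond shape, like the dependency lookup, are a natural fit for this layer because they fail the task the same way a schema mismatch does.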
Always define both inputSchema and outputSchema on tools. The agent uses these to understand what arguments to pass and what it will receive back.
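- Agent logic lives in AGENT.md, not in code
- .ts files are boilerplate wiring only, no logic
- Tools return { success, errorMessage } on failure and never throw
- An Output Contract in AGENT.md is mandatory for structured output agents
- Every task has explicit retry config
- Every task is exported from src/trigger/index.ts
- The model is configured in src/config/model.ts
- triggerAndWait for sequential, batchTrigger for parallel
- Check result.ok after every triggerAndWait; don't assume success
- Tools with side effects require confirmed: true in the input and must verify it before executing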