Install
openclaw skills install @guyxlouspg/task-interrupt管理和中断长时间运行任务,支持用户命令终止、资源清理和任务状态保存,防止任务卡死或无限执行。
openclaw skills install @guyxlouspg/task-interrupt任务打断机制是一个用于控制和管理OpenClaw Agent任务执行的技能。它解决了Agent执行任务时卡住、运行时间过长或用户想中途终止的问题。
┌─────────────────────────────────────────────────────────────────────────┐
│ 飞书群聊 │
│ 用户发送: /stop 或 "中断" │
└────────────────────────────────┬────────────────────────────────────────┘
│ Webhook/消息事件
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ 主 Agent (maojingli) │
│ ┌──────────────┐ ┌──────────────────┐ ┌───────────────────────┐ │
│ │ 消息监听器 │───▶│ 中断指令识别器 │───▶│ Subagent 管理器 │ │
│ │ │ │ /stop, 中断 │ │ sessions_* │ │
│ │ │ │ cancel, abort │ │ │ │
│ └──────────────┘ └──────────────────┘ └───────────┬───────────┘ │
│ │ │
│ ┌─────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────┐ │
│ │ 中断信号发射器 │ │
│ │ 1. 写入中断标志文件 │ │
│ │ 2. process.kill() │ │
│ └───────────────────────┘ │
└────────────────────────────────┬────────────────────────────────────────┘
│ 中断信号 (信号量/标志文件)
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Subagent (maoxiami 等) │
│ ┌──────────────┐ ┌──────────────────┐ ┌───────────────────────┐ │
│ │ 心跳/轮询器 │───▶│ 中断检测器 │───▶│ 任务执行器 (可中断) │ │
│ │ (每5秒检查) │ │ 检查标志文件 │ │ exec/tool calls │ │
│ └──────────────┘ └──────────────────┘ └───────────┬───────────┘ │
│ ▲ │ │
│ │ ▼ │
│ │ ┌───────────────────────┐ │
│ │ │ 安全停止处理器 │ │
│ │ │ 1. 保存检查点 │ │
│ │ │ 2. 清理临时文件 │ │
│ │ │ 3. 释放资源 │ │
│ └────────────────────────────────────┴───────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
识别来自用户的打断指令:
/stop、/中断、/cancel、/abort停止、中断等关键词将中断信号传递给运行中的subagent:
/tmp/agent-stop-{sessionId}.flagsessions_kill 作为最终手段轮询检查中断标志:
优雅停止并清理资源:
在主Agent的消息处理流程中添加中断指令识别和发射逻辑。参考设计文档中的代码示例。
Subagent需要在启动时调用 startInterruptPolling(),并在任务关键点保存检查点 saveCheckpoint()。
const interruptible = new InterruptibleAgent(sessionId);
// 启动中断轮询
await interruptible.startInterruptPolling();
try {
// 保存检查点
await interruptible.saveCheckpoint('task_start', { step: 1 });
// 执行任务...
await doLongRunningTask();
// 保存检查点
await interruptible.saveCheckpoint('task_complete', { success: true });
} catch (error) {
if (error.message === 'INTERRUPTED') {
// 处理被中断的情况
console.log('任务被用户中断');
}
} finally {
// 停止轮询
await interruptible.stopInterruptPolling();
}
技能提供了三个Shell脚本用于手动测试和管理:
./scripts/create-stop-flag.sh <sessionId> [reason]
示例:
./scripts/create-stop-flag.sh abc123 "User requested stop"
./scripts/check-stop-flag.sh <sessionId> [maxAgeSeconds]
返回码:
示例:
if ./scripts/check-stop-flag.sh abc123 60; then
echo "检测到中断请求"
fi
./scripts/clear-stop-flag.sh <sessionId>
示例:
./scripts/clear-stop-flag.sh abc123
| 属性 | 类型 | 描述 |
|---|---|---|
| sessionId | string | 当前会话ID |
| interruptCheckInterval | number | 轮询间隔(毫秒,默认5000) |
| isInterrupted | boolean | 是否已中断 |
| checkpoints | Array | 已保存的检查点列表 |
启动中断检测轮询。
await agent.startInterruptPolling();
停止中断检测轮询。
await agent.stopInterruptPolling();
手动检查中断标志(通常自动调用)。
const flag = await agent.checkForInterrupt();
// 返回 null 或 { sessionId, reason, timestamp, signal }
保存任务检查点。
await agent.saveCheckpoint('step1', { processed: 100, total: 200 });
获取最后一个检查点。
const checkpoint = agent.getLastCheckpoint();
处理中断(内部调用)。
await agent.handleInterrupt(flag);
提供安全停止和资源清理功能。
执行完整的安全停止流程。
const handler = new SafeStopHandler(agent);
const result = await handler.stop('user_stop');
// 返回 { success: boolean, checkpoint?: object, reason?: string, error?: string }
保存Agent状态到 /tmp/agent-state/{sessionId}.json。
const statePath = await handler.saveState();
清理临时文件模式:
/tmp/agent-work/{sessionId}/*/tmp/agent-downloads/{sessionId}/*await handler.cleanupTempFiles();
关闭数据库连接和文件句柄。
await handler.closeConnections();
取消所有定时任务。
await handler.cancelScheduledTasks();
task-interrupt/
├── SKILL.md # 本文档
├── instructions.md # Agent指令说明
├── claw.json # 技能元数据
└── scripts/
├── create-stop-flag.sh # 创建停止标志
├── check-stop-flag.sh # 检查停止标志
└── clear-stop-flag.sh # 清除停止标志
中断标志文件:/tmp/agent-stop-{sessionId}.flag
状态保存文件:/tmp/agent-state/{sessionId}.json
标志文件格式:
{
"sessionId": "abc123",
"timestamp": 1742345678901,
"reason": "user_request",
"signal": "SIGINT"
}
可通过环境变量调整行为:
| 变量 | 默认值 | 说明 |
|---|---|---|
| INTERRUPT_CHECK_INTERVAL | 5000 | 轮询间隔(毫秒) |
| INTERRUPT_FLAG_DIR | /tmp/agent-stop | 标志文件目录 |
| INTERRUPT_FLAG_MAX_AGE | 60 | 标志最大有效期(秒) |
| AGENT_STATE_DIR | /tmp/agent-state | 状态保存目录 |
startInterruptPolling()/tmp/agent-stop-{sessionId}.flagsafeStopHandler.stop() 是否完整执行clear-stop-flag.sh 是否执行成功// interruptible-agent.js
const path = require('path');
const { exec } = require('child_process');
class InterruptibleAgent {
constructor(sessionId, options = {}) {
this.sessionId = sessionId;
this.interruptCheckInterval = options.interval || 5000;
this.isInterrupted = false;
this.checkpoints = [];
this.pollingTimer = null;
}
startInterruptPolling() {
this.pollingTimer = setInterval(async () => {
await this.checkForInterrupt();
}, this.interruptCheckInterval);
}
stopInterruptPolling() {
if (this.pollingTimer) {
clearInterval(this.pollingTimer);
this.pollingTimer = null;
}
}
async checkForInterrupt() {
try {
const flagPath = `/tmp/agent-stop-${this.sessionId}.flag`;
if (await fs.pathExists(flagPath)) {
const content = await fs.readFile(flagPath, 'utf8');
const flag = JSON.parse(content);
console.log(`[中断] 检测到中断信号: ${flag.reason}`);
this.isInterrupted = true;
await this.handleInterrupt(flag);
}
} catch (error) {
console.error(`[中断] 检查失败: ${error.message}`);
}
}
saveCheckpoint(name, data) {
const checkpoint = {
name,
data,
timestamp: Date.now()
};
this.checkpoints.push(checkpoint);
console.log(`[检查点] 已保存: ${name}`);
}
getLastCheckpoint() {
return this.checkpoints[this.checkpoints.length - 1] || null;
}
async handleInterrupt(flag) {
console.log('[中断] 开始安全停止流程...');
this.stopInterruptPolling();
await this.saveState();
await this.cleanup();
await this.clearFlag(flag);
// 通知主Agent
await this.notifyMainAgent({
interrupted: true,
checkpoint: this.getLastCheckpoint(),
reason: flag.reason
});
}
async clearFlag(flag) {
const flagPath = `/tmp/agent-stop-${flag.sessionId}.flag`;
if (await fs.pathExists(flagPath)) {
await fs.remove(flagPath);
}
}
async saveState() {
const state = {
sessionId: this.sessionId,
checkpoints: this.checkpoints,
timestamp: Date.now()
};
const stateDir = '/tmp/agent-state';
await fs.ensureDir(stateDir);
await fs.writeJson(path.join(stateDir, `${this.sessionId}.json`), state);
console.log(`[状态] 已保存`);
}
async cleanup() {
// 清理临时文件
const tempDir = `/tmp/agent-work/${this.sessionId}`;
if (await fs.pathExists(tempDir)) {
await fs.remove(tempDir);
}
// 关闭连接
if (this.db) await this.db.end();
if (this.fileHandles) {
for (const fh of this.fileHandles) {
await fh.close();
}
}
// 取消定时任务
if (this.schedules) {
for (const s of this.schedules) {
clearTimeout(s.timer);
clearInterval(s.interval);
}
}
console.log('[清理] 完成');
}
async notifyMainAgent(result) {
// 实现通知逻辑
console.log('[通知]', result);
}
}
module.exports = InterruptibleAgent;
提供测试脚本验证功能:
# 测试1: 创建标志
./scripts/create-stop-flag.sh test-session "test reason"
# 测试2: 检查标志
./scripts/check-stop-flag.sh test-session 60
# 测试3: 清除标志
./scripts/clear-stop-flag.sh test-session
欢迎提交 Issue 和 Pull Request!
MIT