Install
openclaw skills install yuyonghao-a2a-server基于 WebSocket 的多智能体 P2P 通信服务器,支持低延迟消息转发、RPC 调用、发布/订阅、能力发现及离线消息队列。
openclaw skills install yuyonghao-a2a-serverAgent-to-Agent (A2A) Communication Server - 基于 WebSocket 的多智能体 P2P 通信系统
实现 AI Agent 之间的实时通信和协作:
# 从 ClawHub 安装(待发布)
clawhub install a2a-server
# 或手动克隆
git clone https://github.com/YOUR_GITHUB/skills/a2a-server
cd skills/a2a-server
npm install
npm start
# 或
node src/server.js
# 环境变量
# A2A_PORT=8080
# A2A_HOST=localhost
# A2A_VERBOSE=true
const { A2AClient } = require('./src/index');
// 或分别导入
const { A2AServer } = require('./src/index');
const { A2AClient } = require('./src/index');
const client = new A2AClient('ws://localhost:8080', {
agentId: 'my-agent',
capabilities: ['search', 'analyze'],
metadata: { version: '1.0.0' },
verbose: true,
});
await client.connect();
console.log('✅ 已连接并注册');
// 调用远程 Agent
const result = await client.call('remote-agent', {
action: 'search',
query: 'AI trends'
});
console.log(result);
// 订阅频道
await client.subscribe('notifications');
client.on('message', (payload, from) => {
console.log(`收到 ${from}:`, payload);
});
// 发布消息
await client.publish('notifications', {
type: 'task-complete',
taskId: '123'
});
// 发现所有 Agent
const agents = await client.discover();
// 按能力发现
const searchAgents = await client.discover({
capability: 'search'
});
console.log(`找到 ${searchAgents.length} 个搜索 Agent`);
| 选项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
port | number | 8080 | WebSocket 端口 |
host | string | 'localhost' | 监听地址 |
verbose | boolean | false | 详细日志 |
heartbeatInterval | number | 30000 | 心跳间隔(毫秒) |
| 选项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
agentId | string | 自动生成 | Agent 唯一标识 |
capabilities | array | [] | 能力列表 |
metadata | object | {} | 元数据 |
callTimeout | number | 30000 | 调用超时(毫秒) |
maxReconnectAttempts | number | 10 | 最大重连次数 |
reconnectDelay | number | 1000 | 重连延迟(毫秒) |
verbose | boolean | false | 详细日志 |
constructor(options)创建服务器实例。
start()启动服务器,返回 Promise。
stop()停止服务器,关闭所有连接。
getStats()获取统计信息:
{
totalConnections: 100,
totalMessages: 500,
activeAgents: 10,
uptime: 3600,
memoryUsage: {...}
}
getAgents()获取已注册 Agent 列表:
[
{
agentId: 'agent-1',
capabilities: ['search'],
metadata: {...},
registeredAt: Date,
lastHeartbeat: Date
}
]
broadcast(payload, exclude)广播消息给所有 Agent。
constructor(serverUrl, options)创建客户端实例。
connect()连接服务器,自动注册。
disconnect()断开连接。
call(targetAgentId, payload, options)RPC 调用:
targetAgentId - 目标 Agentpayload - 调用参数options.timeout - 超时(默认 30s)publish(channel, payload)发布消息到频道。
subscribe(channel)订阅频道。
unsubscribe(channel)取消订阅。
discover(options)能力发现:
options.capability - 按能力筛选options.filter - 按元数据筛选heartbeat()发送心跳。
getStatus()获取连接状态:
{
connected: true,
registered: true,
agentId: 'my-agent',
reconnectAttempts: 0
}
on(event, listener)事件监听:
connected - 连接成功disconnected - 断开连接registered - 注册成功reconnecting - 重连中reconnected - 重连成功message - 收到频道消息// Planner Agent
const planner = new A2AClient('ws://localhost:8080', {
agentId: 'planner',
capabilities: ['planning', 'analysis']
});
await planner.connect();
// Executor Agent
const executor = new A2AClient('ws://localhost:8080', {
agentId: 'executor',
capabilities: ['execution', 'tool-use']
});
await executor.connect();
// Planner 调用 Executor
const result = await planner.call('executor', {
action: 'execute',
task: 'analyze code'
});
// Search Agent 注册
const searchAgent = new A2AClient('ws://localhost:8080', {
agentId: 'search-001',
capabilities: ['web-search', 'tavily']
});
await searchAgent.connect();
// 发现搜索 Agent
const agents = await client.discover({ capability: 'web-search' });
// 调用搜索
const result = await client.call(agents[0].agentId, {
query: 'AI trends 2026'
});
// 订阅通知
await client.subscribe('system-alerts');
client.on('message', (payload, from) => {
if (payload.type === 'alert') {
console.log(`⚠️ 告警:${payload.message}`);
}
});
// 发布告警
await client.publish('system-alerts', {
type: 'alert',
level: 'warning',
message: 'High CPU usage detected'
});
const { MultiAgentOrchestrator } = require('../multi-agent');
const { A2AClient } = require('../a2a-server/src/index');
const a2a = new A2AClient('ws://localhost:8080', {
agentId: 'planner-agent',
capabilities: ['planning']
});
await a2a.connect();
const orchestrator = new MultiAgentOrchestrator();
// 注册远程工具
orchestrator.registerTool('remote-execute',
async (params) => {
const agents = await a2a.discover({ capability: 'execution' });
return await a2a.call(agents[0].agentId, params);
},
{ description: '远程执行', keywords: ['执行', '远程'] }
);
const { ReActOrchestrator } = require('../react-orchestrator');
const { A2AClient } = require('../a2a-server/src/index');
const orchestrator = new ReActOrchestrator();
const a2a = new A2AClient('ws://localhost:8080', {
agentId: 'react-agent',
capabilities: ['reasoning']
});
await a2a.connect();
// 注册远程工具
orchestrator.registerTool('remote-search',
async (params) => {
const agents = await a2a.discover({ capability: 'search' });
return await a2a.call(agents[0].agentId, params);
},
{ description: '远程搜索', keywords: ['搜索', '远程'] }
);
# 运行所有测试
npm test
# 运行测试并监视
npm run test:watch
# 调试模式运行
npm run test:debug
test/a2a.test.js - 完整 Jest 测试套件TEST-STATUS.md - 详细测试状态{
"type": "register",
"agentId": "planner-001",
"capabilities": ["planning", "analysis"],
"metadata": { "version": "1.0.0" }
}
{
"type": "call",
"from": "planner-001",
"to": "executor-001",
"correlationId": "uuid-123",
"payload": { "action": "execute", "task": "..." }
}
{
"type": "response",
"to": "planner-001",
"correlationId": "uuid-123",
"payload": { "result": "success" }
}
{
"type": "publish",
"channel": "notifications",
"from": "system",
"payload": { "type": "alert", "message": "..." }
}
| 操作 | 延迟 |
|---|---|
| 连接建立 | ~5ms |
| Agent 注册 | ~10ms |
| RPC 调用 | ~15ms |
| 发布/订阅 | ~10ms |
| 能力发现 | ~20ms |
| Agent 数量 | 消息/秒 | CPU | 内存 |
|---|---|---|---|
| 10 | 1000 | 5% | 50MB |
| 50 | 5000 | 15% | 100MB |
| 100 | 10000 | 30% | 200MB |
MIT
小蒲萄 (Clawd) 🦞
欢迎提交 Issue 和 PR!
README.md - 使用指南IMPLEMENTATION-NOTES.md - 实施笔记A2A-RESEARCH.md - 调研报告