Install
openclaw skills install websocket-reconnectWebSocket connection management with exponential backoff + jitter retry, heartbeat detection, and circuit breaker pattern. Use when you need reliable WebSock...
openclaw skills install websocket-reconnect可靠的 WebSocket 连接管理,包含指数退避 + 抖动重连算法、心跳检测和断路器模式。
npm install ws
const { WebSocketReconnect } = require('./scripts/websocket-reconnect.js');
const ws = new WebSocketReconnect({
url: 'wss://api.example.com/socket',
maxRetries: 10,
baseDelay: 1000,
maxDelay: 30000
});
// 监听事件
ws.on('open', () => {
console.log('✅ Connected');
});
ws.on('message', (event) => {
console.log('📨 Message:', event.data);
});
ws.on('close', (event) => {
console.log('❌ Closed:', event.code, event.reason);
});
ws.on('error', (error) => {
console.error('⚠️ Error:', error.message);
});
ws.on('retry', (info) => {
console.log(`🔄 Retry ${info.attempt}/${info.maxRetries} in ${info.delay}ms`);
});
// 连接
ws.connect();
// 发送消息
ws.send(JSON.stringify({ type: 'subscribe', channel: 'updates' }));
// 关闭连接
// ws.close();
const ws = new WebSocketReconnect({
// 连接配置
url: 'wss://api.example.com/socket',
protocols: ['graphql-ws'],
websocketOptions: {
headers: {
'Authorization': 'Bearer token123'
}
},
// 重连配置
maxRetries: 10, // 最大重试次数
baseDelay: 1000, // 基础延迟 (ms)
maxDelay: 30000, // 最大延迟 (ms)
multiplier: 2, // 退避倍数
jitter: 0.1, // 抖动系数 (0-1)
// 心跳配置
heartbeatInterval: 30000, // 心跳间隔 (ms)
heartbeatTimeout: 5000, // 心跳超时 (ms)
heartbeatMessage: JSON.stringify({ type: 'ping' }),
// 断路器配置
circuitBreaker: {
failureThreshold: 5, // 失败阈值
resetTimeout: 60000, // 重置超时 (ms)
halfOpenMaxRequests: 3 // 半开状态最大请求数
}
});
重连延迟按指数增长,避免频繁重试:
延迟 = baseDelay × (multiplier ^ 尝试次数)
示例 (baseDelay=1000ms, multiplier=2):
添加随机抖动防止多个客户端同时重试:
// jitter = 0.1 时,延迟在 ±10% 范围内随机
实际延迟 = 计算延迟 × (0.9 ~ 1.1)
heartbeatInterval 发送 ping 消息heartbeatTimeout 内回复 pong服务器需要响应 ping 消息:
// 服务器示例 (Node.js + ws)
wss.on('connection', (ws) => {
ws.on('message', (data) => {
const message = JSON.parse(data);
if (message.type === 'ping') {
ws.send(JSON.stringify({ type: 'pong' }));
}
// ... 处理其他消息
});
});
如果服务器使用不同的心跳协议:
const ws = new WebSocketReconnect({
url: 'wss://api.example.com',
heartbeatInterval: 30000,
heartbeatMessage: 'PING', // 字符串
// 或者
heartbeatMessage: { cmd: 'heartbeat' } // 对象 (会自动 JSON.stringify)
});
// 监听 pong 响应
ws.on('message', (event) => {
const data = JSON.parse(event.data);
if (data.type === 'pong') {
console.log('Heartbeat OK');
}
});
CLOSED (正常)
OPEN (熔断)
resetTimeout 后自动转为 HALF-OPENHALF-OPEN (测试)
ws.on('circuitChange', (state) => {
console.log(`Circuit breaker state: ${state}`);
if (state === 'OPEN') {
console.log('⚠️ Service temporarily unavailable');
}
});
// 手动查询状态
const circuitState = ws.getCircuitState(); // 'CLOSED', 'OPEN', or 'HALF-OPEN'
// 手动打开断路器
ws.openCircuit();
// 手动关闭断路器 (重置)
ws.closeCircuit();
| 事件 | 参数 | 描述 |
|---|---|---|
open | 无 | 连接成功建立 |
message | event | 收到消息 (原生 MessageEvent) |
close | {code, reason} | 连接关闭 |
error | Error | 发生错误 |
retry | {attempt, delay, maxRetries} | 即将重试 |
stateChange | state | 连接状态变化 |
circuitChange | state | 断路器状态变化 |
const stats = ws.getStats();
console.log(stats);
// {
// state: 'OPEN',
// circuitState: 'CLOSED',
// retryCount: 2,
// maxRetries: 10,
// circuitFailures: 0,
// failureThreshold: 5,
// lastActivity: 1709532000000,
// uptime: 15000
// }
CLOSED: 未连接CONNECTING: 正在连接OPEN: 已连接CLOSING: 正在关闭BLOCKED: 被断路器阻止FAILED: 超过最大重试次数const { WebSocketReconnect } = require('./scripts/websocket-reconnect.js');
class ChatClient {
constructor(userId) {
this.userId = userId;
this.ws = new WebSocketReconnect({
url: `wss://chat.example.com/socket?user=${userId}`,
maxRetries: 10,
baseDelay: 1000,
maxDelay: 30000,
heartbeatInterval: 30000,
heartbeatTimeout: 5000,
circuitBreaker: {
failureThreshold: 5,
resetTimeout: 60000
}
});
this.setupEventHandlers();
}
setupEventHandlers() {
this.ws.on('open', () => {
console.log('✅ Connected to chat server');
// 发送上线通知
this.ws.send(JSON.stringify({
type: 'online',
userId: this.userId
}));
});
this.ws.on('message', (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
});
this.ws.on('close', () => {
console.log('❌ Disconnected from chat server');
});
this.ws.on('error', (error) => {
console.error('⚠️ Chat error:', error.message);
});
this.ws.on('retry', (info) => {
console.log(`🔄 Reconnecting... Attempt ${info.attempt}/${info.maxRetries}`);
});
this.ws.on('circuitChange', (state) => {
if (state === 'OPEN') {
console.log('⚠️ Chat service temporarily unavailable');
}
});
}
handleMessage(message) {
switch (message.type) {
case 'pong':
// 心跳响应,忽略
break;
case 'chat':
console.log(`[${message.from}]: ${message.content}`);
break;
case 'system':
console.log(`[System]: ${message.content}`);
break;
}
}
sendMessage(content) {
if (this.ws.state === 'OPEN') {
this.ws.send(JSON.stringify({
type: 'chat',
content: content
}));
} else {
console.log('⚠️ Cannot send message: not connected');
}
}
connect() {
return this.ws.connect();
}
disconnect() {
this.ws.close();
}
}
// 使用示例
const chat = new ChatClient('user123');
chat.connect();
chat.sendMessage('Hello, world!');
const { WebSocketReconnect } = require('./scripts/websocket-reconnect.js');
class GraphQLSubscriptionClient {
constructor(endpoint, options = {}) {
this.ws = new WebSocketReconnect({
url: endpoint,
protocols: ['graphql-ws'],
maxRetries: options.maxRetries || 10,
heartbeatInterval: options.heartbeatInterval || 30000,
heartbeatMessage: JSON.stringify({ type: 'connection_init' }),
...options
});
this.subscriptions = new Map();
this.subscriptionId = 0;
this.setupEventHandlers();
}
setupEventHandlers() {
this.ws.on('open', () => {
console.log('✅ GraphQL WebSocket connected');
// 重新订阅所有订阅
this.resubscribeAll();
});
this.ws.on('message', (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
});
}
handleMessage(message) {
switch (message.type) {
case 'connection_ack':
console.log('GraphQL connection acknowledged');
break;
case 'next':
const subscription = this.subscriptions.get(message.id);
if (subscription && subscription.onData) {
subscription.onData(message.payload);
}
break;
case 'complete':
const sub = this.subscriptions.get(message.id);
if (sub && sub.onComplete) {
sub.onComplete();
}
this.subscriptions.delete(message.id);
break;
case 'error':
const s = this.subscriptions.get(message.id);
if (s && s.onError) {
s.onError(message.payload);
}
break;
}
}
subscribe(query, variables, callbacks) {
const id = String(++this.subscriptionId);
this.subscriptions.set(id, {
query,
variables,
onData: callbacks.onData,
onError: callbacks.onError,
onComplete: callbacks.onComplete
});
if (this.ws.state === 'OPEN') {
this.ws.send(JSON.stringify({
id,
type: 'subscribe',
payload: { query, variables }
}));
}
return () => this.unsubscribe(id);
}
unsubscribe(id) {
this.subscriptions.delete(id);
if (this.ws.state === 'OPEN') {
this.ws.send(JSON.stringify({
id,
type: 'complete'
}));
}
}
resubscribeAll() {
for (const [id, sub] of this.subscriptions) {
this.ws.send(JSON.stringify({
id,
type: 'subscribe',
payload: { query: sub.query, variables: sub.variables }
}));
}
}
connect() {
return this.ws.connect();
}
disconnect() {
this.ws.close();
}
}
| 选项 | 类型 | 默认值 | 描述 |
|---|---|---|---|
url | string | 必填 | WebSocket 服务器 URL |
protocols | array | [] | 子协议列表 |
websocketOptions | object | {} | WebSocket 构造函数选项 (Node.js) |
maxRetries | number | 10 | 最大重试次数 |
baseDelay | number | 1000 | 基础延迟 (毫秒) |
maxDelay | number | 30000 | 最大延迟 (毫秒) |
multiplier | number | 2 | 退避倍数 |
jitter | number | 0.1 | 抖动系数 (0-1) |
heartbeatInterval | number | 30000 | 心跳间隔 (毫秒) |
heartbeatTimeout | number | 5000 | 心跳超时 (毫秒) |
heartbeatMessage | string | {"type":"ping"} | 心跳消息 |
circuitBreaker.failureThreshold | number | 5 | 打开断路器的失败次数 |
circuitBreaker.resetTimeout | number | 60000 | 断路器重置超时 (毫秒) |
circuitBreaker.halfOpenMaxRequests | number | 3 | 半开状态最大请求数 |
// 关键业务:更多重试
maxRetries: 20, baseDelay: 2000
// 非关键业务:较少重试
maxRetries: 5, baseDelay: 1000
// 高频交易:短间隔
heartbeatInterval: 10000, heartbeatTimeout: 3000
// 普通应用:标准间隔
heartbeatInterval: 30000, heartbeatTimeout: 5000
// 低带宽场景:长间隔
heartbeatInterval: 60000, heartbeatTimeout: 10000
// 敏感服务:低阈值快速熔断
circuitBreaker: { failureThreshold: 3, resetTimeout: 30000 }
// 稳定服务:高阈值避免误熔断
circuitBreaker: { failureThreshold: 10, resetTimeout: 120000 }
// 应用退出前
ws.close(1001, 'Client shutting down');
// 而不是直接
process.exit();
原因: 服务器不稳定或网络问题
解决:
baseDelay 和 maxDelaymaxRetries原因: 服务器未响应 ping 或网络延迟高
解决:
heartbeatTimeout原因: 服务持续故障
解决:
failureThresholdresetTimeout 更快尝试恢复每个实例约 1-2MB (包括事件处理器和定时器)
支持所有现代浏览器 (Chrome, Firefox, Safari, Edge) 和 Node.js 14+
MIT