Websocket Hub Patterns

Horizontally-scalable WebSocket hub pattern with lazy Redis subscriptions, connection registry, and graceful shutdown. Use when building real-time WebSocket servers that scale across multiple instances. Triggers on WebSocket hub, WebSocket scaling, connection registry, Redis WebSocket, real-time gateway, horizontal scaling.

MIT-0 · Free to use, modify, and redistribute. No attribution required.
0 · 617 · 0 current installs · 0 all-time installs
MIT-0
Security Scan
VirusTotalVirusTotal
Benign
View report →
OpenClawOpenClaw
Benign
high confidence
Purpose & Capability
The name/description (WebSocket hub + Redis coordination) match the content: Go code patterns for connection registry, lazy Redis subscriptions, message forwarding, and graceful shutdown. No unexplained credentials, binaries, or unrelated capabilities are requested.
Instruction Scope
SKILL.md contains design explanation and sample Go code only. It does not instruct the agent to read arbitrary files, access unrelated environment variables, exfiltrate data, or contact external endpoints beyond developer install hints. All shown actions are scoped to implementing a WebSocket hub.
Install Mechanism
There is no formal install spec in the registry entry (instruction-only). README shows developer install hints (npx, copying into local skill folders). No downloads or extract-from-URL install steps that would write or execute arbitrary remote code. This is low-risk for the platform.
Credentials
The skill declares no required env vars or credentials. The Go patterns reference a Redis client (expected for Redis-backed coordination) but do not request credentials in the skill metadata—credential needs would arise only when a developer implements the pattern (e.g., providing Redis endpoint/credentials), which is proportionate to the purpose.
Persistence & Privilege
The skill is not always-enabled and is user-invocable. It does not request persistent system-wide configuration changes or access to other skills' configs. Autonomous invocation is allowed (platform default) but reasonable for an instruction-only developer pattern.
Assessment
This skill is a reusable design pattern (sample Go code) for building horizontally-scalable WebSocket hubs with Redis; it is internally consistent. Before using in production: verify the code fits your auth model (you will need to supply Redis credentials and secure client auth), review and test error handling/timeouts/rate-limiting, and confirm the source is trustworthy (homepage is missing and the registry source is 'unknown' — prefer code from a known repo or audit the files you copy). The README references npx/copy-based installation hints rather than a vetted package—treat any downloaded code like third-party code and review it before running in your environment.

Like a lobster shell, security has layers — review code before you run it.

Current versionv1.0.0
Download zip
latestvk970ayy4qk5cq21ns0sdeb6ya580xh66

License

MIT-0
Free to use, modify, and redistribute. No attribution required.

SKILL.md

WebSocket Hub Patterns

Production patterns for horizontally-scalable WebSocket connections with Redis-backed coordination.

Installation

OpenClaw / Moltbot / Clawbot

npx clawhub@latest install websocket-hub-patterns

When to Use

  • Real-time bidirectional communication
  • Chat applications, collaborative editing
  • Live dashboards with client interactions
  • Need horizontal scaling across multiple gateway instances

Hub Structure

type Hub struct {
    // Local state
    connections   map[*Connection]bool
    subscriptions map[string]map[*Connection]bool // channel -> connections

    // Channels
    register   chan *Connection
    unregister chan *Connection
    broadcast  chan *Event

    // Redis for scaling
    redisClient  *redis.Client
    redisSubs    map[string]*goredis.PubSub
    redisSubLock sync.Mutex

    // Optional: Distributed registry
    connRegistry *ConnectionRegistry
    instanceID   string

    // Shutdown
    done chan struct{}
    wg   sync.WaitGroup
}

Hub Main Loop

func (h *Hub) Run() {
    for {
        select {
        case <-h.done:
            return

        case conn := <-h.register:
            h.connections[conn] = true
            if h.connRegistry != nil {
                h.connRegistry.RegisterConnection(ctx, conn.ID(), info)
            }

        case conn := <-h.unregister:
            if _, ok := h.connections[conn]; ok {
                if h.connRegistry != nil {
                    h.connRegistry.UnregisterConnection(ctx, conn.ID())
                }
                h.removeConnection(conn)
            }

        case event := <-h.broadcast:
            h.broadcastToChannel(event)
        }
    }
}

Lazy Redis Subscriptions

Subscribe to Redis only when first local subscriber joins:

func (h *Hub) subscribeToChannel(conn *Connection, channel string) error {
    // Add to local subscriptions
    if h.subscriptions[channel] == nil {
        h.subscriptions[channel] = make(map[*Connection]bool)
    }
    h.subscriptions[channel][conn] = true

    // Lazy: Only subscribe to Redis on first subscriber
    h.redisSubLock.Lock()
    defer h.redisSubLock.Unlock()

    if _, exists := h.redisSubs[channel]; !exists {
        pubsub := h.redisClient.Subscribe(context.Background(), channel)
        h.redisSubs[channel] = pubsub
        go h.forwardRedisMessages(channel, pubsub)
    }

    return nil
}

func (h *Hub) unsubscribeFromChannel(conn *Connection, channel string) {
    if subs, ok := h.subscriptions[channel]; ok {
        delete(subs, conn)

        // Cleanup when no local subscribers
        if len(subs) == 0 {
            delete(h.subscriptions, channel)
            h.closeRedisSubscription(channel)
        }
    }
}

Redis Message Forwarding

func (h *Hub) forwardRedisMessages(channel string, pubsub *goredis.PubSub) {
    ch := pubsub.Channel()
    for {
        select {
        case <-h.done:
            return
        case msg, ok := <-ch:
            if !ok {
                return
            }
            h.broadcast <- &Event{
                Channel: channel,
                Data:    []byte(msg.Payload),
            }
        }
    }
}

func (h *Hub) broadcastToChannel(event *Event) {
    subs := h.subscriptions[event.Channel]
    for conn := range subs {
        select {
        case conn.send <- event.Data:
            // Sent
        default:
            // Buffer full - close slow client
            h.removeConnection(conn)
        }
    }
}

Connection Write Pump

func (c *Connection) writePump() {
    ticker := time.NewTicker(54 * time.Second) // Ping interval
    defer func() {
        ticker.Stop()
        c.conn.Close()
    }()

    for {
        select {
        case message, ok := <-c.send:
            c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if !ok {
                c.conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }
            c.conn.WriteMessage(websocket.TextMessage, message)

            // Batch drain queue
            for i := 0; i < len(c.send); i++ {
                c.conn.WriteMessage(websocket.TextMessage, <-c.send)
            }

        case <-ticker.C:
            if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

Connection Registry for Horizontal Scaling

type ConnectionRegistry struct {
    client     *redis.Client
    instanceID string
}

func (r *ConnectionRegistry) RegisterConnection(ctx context.Context, connID string, info ConnectionInfo) error {
    info.InstanceID = r.instanceID
    data, _ := json.Marshal(info)
    return r.client.Set(ctx, "ws:conn:"+connID, data, 2*time.Minute).Err()
}

func (r *ConnectionRegistry) HeartbeatInstance(ctx context.Context, connectionCount int) error {
    info := InstanceInfo{
        InstanceID:  r.instanceID,
        Connections: connectionCount,
    }
    data, _ := json.Marshal(info)
    return r.client.Set(ctx, "ws:instance:"+r.instanceID, data, 30*time.Second).Err()
}

Graceful Shutdown

func (h *Hub) Shutdown() {
    close(h.done)

    // Close all Redis subscriptions
    h.redisSubLock.Lock()
    for channel, pubsub := range h.redisSubs {
        pubsub.Close()
        delete(h.redisSubs, channel)
    }
    h.redisSubLock.Unlock()

    // Close all connections
    for conn := range h.connections {
        conn.Close()
    }

    h.wg.Wait()
}

Decision Tree

SituationApproach
Single instanceSkip ConnectionRegistry
Multi-instanceEnable ConnectionRegistry
No subscribers to channelLazy unsubscribe from Redis
Slow clientClose on buffer overflow
Need message historyUse Redis Streams + Pub/Sub

Related Skills


NEVER Do

  • NEVER block on conn.send — Use select with default to detect overflow
  • NEVER skip graceful shutdown — Clients need close frames
  • NEVER share pubsub across channels — Each channel needs own subscription
  • NEVER forget instance heartbeat — Dead instances leave orphaned connections
  • NEVER send without ping/pong — Load balancers close "idle" connections

Files

2 total
Select a file
Select a file to preview.

Comments

Loading comments…