Dual-Stream Architecture

Dual-stream event publishing combines Kafka for durability with Redis Pub/Sub for real-time delivery. Use it when building event-driven systems that need both guaranteed delivery and low-latency updates. Triggers on: dual stream, event publishing, Kafka Redis, real-time events, pub/sub, streaming architecture.

MIT-0 · Free to use, modify, and redistribute. No attribution required.
0 · 617 · 2 current installs · 2 all-time installs
Security Scan
VirusTotal
Benign
OpenClaw
Benign
high confidence
Purpose & Capability
The name/description match the included content: SKILL.md is a design/implementation guide for publishing to Kafka (durability) and Redis Pub/Sub (real-time). The code examples and guidance are coherent with the described purpose.
Instruction Scope
SKILL.md contains code snippets, architecture diagrams, and operational guidance only. It does not instruct the agent to read unrelated system files, exfiltrate data, or call external endpoints outside of normal installation guidance. The README contains manual copy/install suggestions (including paths under the user's home) but these are standard packaging instructions rather than runtime data collection.
Install Mechanism
There is no declared install spec in the skill metadata (instruction-only), which is low-risk. The README suggests using npx to fetch/install the package and includes a GitHub tree-style URL in an example; pulling code over npx or using non-standard URLs is a general supply-chain risk — verify the source before running those commands.
Credentials
The skill declares no required environment variables or credentials, which is appropriate for an instruction-only pattern. However, the examples assume Kafka and Redis clients (which in real deployments require connection strings / credentials). The skill does not declare or attempt to access credentials itself — you will need to supply those when you implement the pattern.
Persistence & Privilege
always:false and no config paths or installation hooks are declared. The skill does not request persistent presence or elevated privileges and does not modify other skills or global agent settings.
Assessment
This is an architecture/implementation guide (no executable code bundled), and overall it is coherent with its purpose. Before installing or following the README examples:

1. Verify the origin of any npx or GitHub URL used — don't run install commands that fetch code from untrusted or unclear sources.
2. Expect to provide Kafka and Redis connection configuration (host, port, TLS settings, credentials) in your application — the skill doesn't manage or request them.
3. When you copy or install the skill manually, review the files you pull into your environment.
4. If you intend to let an autonomous agent use this skill, it's low-risk because it doesn't request secrets, but still avoid granting the agent network/credential access to your Kafka/Redis clusters unless you trust it.
5. If you need a packaged/official release, prefer installs from a verified registry or a trusted repository rather than an ad-hoc tree URL.

Like a lobster shell, security has layers — review code before you run it.

Current version: v1.0.0


SKILL.md

Dual-Stream Architecture

Publish events to Kafka (durability) and Redis Pub/Sub (real-time) simultaneously for systems needing both guaranteed delivery and instant updates.

Installation

OpenClaw / Moltbot / Clawbot

npx clawhub@latest install dual-stream-architecture

When to Use

  • Event-driven systems needing both durability AND real-time
  • WebSocket/SSE backends that push live updates
  • Dashboards showing events as they happen
  • Systems where Kafka consumers lag but users still expect instant updates

Core Pattern

type DualPublisher struct {
    kafka  *kafka.Writer
    redis  *redis.Client
    logger *slog.Logger
}

func (p *DualPublisher) Publish(ctx context.Context, event Event) error {
    // 1. Kafka: Critical path - must succeed
    payload, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }
    if err := p.kafka.WriteMessages(ctx, kafka.Message{
        Key:   []byte(event.SourceID),
        Value: payload,
    }); err != nil {
        return fmt.Errorf("kafka publish failed: %w", err)
    }

    // 2. Redis: Best-effort - don't fail the operation
    p.publishToRedis(ctx, event)

    return nil
}

func (p *DualPublisher) publishToRedis(ctx context.Context, event Event) {
    // Lightweight payload (full event in Kafka)
    notification := map[string]interface{}{
        "id":        event.ID,
        "type":      event.Type,
        "source_id": event.SourceID,
    }

    payload, err := json.Marshal(notification)
    if err != nil {
        p.logger.Warn("marshal notification failed", "error", err)
        return
    }
    channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)

    // Fire and forget - log errors but don't propagate
    if err := p.redis.Publish(ctx, channel, payload).Err(); err != nil {
        p.logger.Warn("redis publish failed", "error", err)
    }
}

Architecture

┌──────────────┐     ┌─────────────────┐     ┌──────────────┐
│   Ingester   │────▶│  DualPublisher  │────▶│    Kafka     │──▶ Event Processor
│              │     │                 │     │  (durable)   │
└──────────────┘     │                 │     └──────────────┘
                     │                 │     ┌──────────────┐
                     │                 │────▶│ Redis PubSub │──▶ WebSocket Gateway
                     │                 │     │ (real-time)  │
                     └─────────────────┘     └──────────────┘

Channel Naming Convention

events:{source_type}:{source_id}

Examples:
- events:user:octocat      - Events for user octocat
- events:repo:owner/repo   - Events for a repository
- events:org:microsoft     - Events for an organization
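Centralizing the convention in one helper keeps publishers and subscribers in sync; a minimal sketch (function names are illustrative, not from the skill):

```go
package main

import "fmt"

// ChannelFor builds the pub/sub channel name for one event source,
// following the events:{source_type}:{source_id} convention.
func ChannelFor(sourceType, sourceID string) string {
	return fmt.Sprintf("events:%s:%s", sourceType, sourceID)
}

// PatternFor builds a PSUBSCRIBE glob matching every channel of a
// source type, useful for aggregate consumers.
func PatternFor(sourceType string) string {
	return fmt.Sprintf("events:%s:*", sourceType)
}

func main() {
	fmt.Println(ChannelFor("user", "octocat")) // events:user:octocat
	fmt.Println(PatternFor("repo"))            // events:repo:*
}
```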

Batch Publishing

For high throughput:

func (p *DualPublisher) PublishBatch(ctx context.Context, events []Event) error {
    // 1. Batch to Kafka
    messages := make([]kafka.Message, len(events))
    for i, event := range events {
        payload, err := json.Marshal(event)
        if err != nil {
            return fmt.Errorf("marshal event %s: %w", event.ID, err)
        }
        messages[i] = kafka.Message{
            Key:   []byte(event.SourceID),
            Value: payload,
        }
    }

    if err := p.kafka.WriteMessages(ctx, messages...); err != nil {
        return fmt.Errorf("kafka batch failed: %w", err)
    }

    // 2. Redis: Pipeline for efficiency
    pipe := p.redis.Pipeline()
    for _, event := range events {
        channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)
        notification, err := json.Marshal(map[string]interface{}{
            "id":   event.ID,
            "type": event.Type,
        })
        if err != nil {
            p.logger.Warn("marshal notification failed", "error", err)
            continue
        }
        pipe.Publish(ctx, channel, notification)
    }
    
    if _, err := pipe.Exec(ctx); err != nil {
        p.logger.Warn("redis batch failed", "error", err)
    }

    return nil
}
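A very large slice can exceed broker message-size or request limits in a single WriteMessages call. A hypothetical helper to split events into bounded batches before calling PublishBatch — the Event type here and the batch size are assumptions to tune for your cluster:

```go
package main

import "fmt"

// Event is a stand-in for the guide's event type.
type Event struct {
	ID string
}

// ChunkEvents splits events into slices of at most size so each
// PublishBatch call stays within broker limits. The last batch may
// be smaller; a non-positive size falls back to 1.
func ChunkEvents(events []Event, size int) [][]Event {
	if size <= 0 {
		size = 1
	}
	var batches [][]Event
	for len(events) > size {
		batches = append(batches, events[:size])
		events = events[size:]
	}
	if len(events) > 0 {
		batches = append(batches, events)
	}
	return batches
}

func main() {
	evts := make([]Event, 7)
	for i := range evts {
		evts[i] = Event{ID: fmt.Sprintf("evt-%d", i)}
	}
	for _, b := range ChunkEvents(evts, 3) {
		fmt.Println(len(b)) // batch sizes: 3, 3, 1
	}
}
```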

Decision Tree

| Requirement | Stream | Why |
| --- | --- | --- |
| Must not lose event | Kafka only | Ack required, replicated |
| User sees immediately | Redis only | Sub-ms delivery |
| Both durability + real-time | Dual stream | This pattern |
| High volume (>10k/sec) | Kafka, batch Redis | Redis can bottleneck |
| Many subscribers per channel | Redis + local fan-out | Don't hammer Redis |
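The "Redis + local fan-out" row means the gateway holds a single Redis subscription per channel and re-broadcasts in process to its connected clients, rather than opening one Redis subscription per client. A minimal, Redis-free sketch of such a hub (type and method names are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// Hub fans one upstream message stream out to many in-process
// subscribers, so the gateway needs only one Redis subscription
// per channel regardless of connected-client count.
type Hub struct {
	mu   sync.RWMutex
	subs map[string][]chan string // channel name -> local subscribers
}

func NewHub() *Hub {
	return &Hub{subs: make(map[string][]chan string)}
}

// Subscribe registers a local listener for a channel.
func (h *Hub) Subscribe(channel string) <-chan string {
	ch := make(chan string, 16) // small buffer so slow clients don't block fan-out
	h.mu.Lock()
	h.subs[channel] = append(h.subs[channel], ch)
	h.mu.Unlock()
	return ch
}

// Broadcast delivers one message (e.g. just received from Redis) to
// every local subscriber. Slow subscribers are skipped, mirroring
// the fire-and-forget semantics of Redis Pub/Sub.
func (h *Hub) Broadcast(channel, msg string) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	for _, ch := range h.subs[channel] {
		select {
		case ch <- msg:
		default: // drop rather than block the whole fan-out
		}
	}
}

func main() {
	hub := NewHub()
	a := hub.Subscribe("events:user:octocat")
	b := hub.Subscribe("events:user:octocat")
	hub.Broadcast("events:user:octocat", `{"id":"evt-1"}`)
	fmt.Println(<-a, <-b)
}
```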

NEVER Do

  • NEVER fail on Redis errors — Redis is best-effort. Log and continue.
  • NEVER send full payload to Redis — Send IDs only, clients fetch from API.
  • NEVER create one Redis channel per event — Use source-level channels.
  • NEVER skip Kafka for "unimportant" events — All events go to Kafka for replay.
  • NEVER use Redis Pub/Sub for persistence — Messages are fire-and-forget.

Edge Cases

| Case | Solution |
| --- | --- |
| Redis down | Log warning, continue with Kafka only |
| Client connects mid-stream | Query API for recent events, then subscribe |
| High channel cardinality | Use wildcard patterns or aggregate channels |
| Kafka backpressure | Buffer in memory with timeout, fail if full |
| Need event replay | Consume from Kafka from offset, not Redis |

Files

2 total
