Dual Stream Architecture
Dual-stream event publishing combining Kafka for durability with Redis Pub/Sub for real-time delivery. Use when building event-driven systems needing both guaranteed delivery and low-latency updates. Triggers on dual stream, event publishing, Kafka Redis, real-time events, pub/sub, streaming architecture.
MIT-0 · Free to use, modify, and redistribute. No attribution required.
⭐ 0 · 617 · 2 current installs · 2 all-time installs
by@wpank
MIT-0
Security Scan
OpenClaw
Benign
high confidencePurpose & Capability
The name/description match the included content: SKILL.md is a design/implementation guide for publishing to Kafka (durability) and Redis Pub/Sub (real-time). The code examples and guidance are coherent with the described purpose.
Instruction Scope
SKILL.md contains code snippets, architecture diagrams, and operational guidance only. It does not instruct the agent to read unrelated system files, exfiltrate data, or call external endpoints outside of normal installation guidance. The README contains manual copy/install suggestions (including paths under the user's home) but these are standard packaging instructions rather than runtime data collection.
Install Mechanism
There is no declared install spec in the skill metadata (instruction-only), which is low-risk. The README suggests using npx to fetch/install the package and includes a GitHub tree-style URL in an example; pulling code over npx or using non-standard URLs is a general supply-chain risk — verify the source before running those commands.
Credentials
The skill declares no required environment variables or credentials, which is appropriate for an instruction-only pattern. However, the examples assume Kafka and Redis clients (which in real deployments require connection strings / credentials). The skill does not declare or attempt to access credentials itself — you will need to supply those when you implement the pattern.
Persistence & Privilege
always:false and no config paths or installation hooks are declared. The skill does not request persistent presence or elevated privileges and does not modify other skills or global agent settings.
Assessment
This is an architecture/implementation guide (no executable code bundled), and overall it is coherent with its purpose. Before installing or following the README examples: 1) Verify the origin of any npx or GitHub URL used — don't run install commands that fetch code from untrusted or unclear sources. 2) Expect to provide Kafka and Redis connection configuration (host, port, TLS settings, credentials) in your application — the skill doesn't manage or request them. 3) When you copy or install the skill manually, review the files you pull into your environment. 4) If you intend to let an autonomous agent use this skill, it's low-risk because it doesn't request secrets, but still avoid granting the agent network/credential access to your Kafka/Redis clusters unless you trust it. 5) If you need a packaged/official release, prefer installs from a verified registry or a trusted repository rather than an ad-hoc tree URL.Like a lobster shell, security has layers — review code before you run it.
Current versionv1.0.0
Download ziplatest
License
MIT-0
Free to use, modify, and redistribute. No attribution required.
SKILL.md
Dual-Stream Architecture
Publish events to Kafka (durability) and Redis Pub/Sub (real-time) simultaneously for systems needing both guaranteed delivery and instant updates.
Installation
OpenClaw / Moltbot / Clawbot
npx clawhub@latest install dual-stream-architecture
When to Use
- Event-driven systems needing both durability AND real-time
- WebSocket/SSE backends that push live updates
- Dashboards showing events as they happen
- Kafka consumers have lag but users expect instant updates
Core Pattern
type DualPublisher struct {
kafka *kafka.Writer
redis *redis.Client
logger *slog.Logger
}
func (p *DualPublisher) Publish(ctx context.Context, event Event) error {
// 1. Kafka: Critical path - must succeed
payload, _ := json.Marshal(event)
err := p.kafka.WriteMessages(ctx, kafka.Message{
Key: []byte(event.SourceID),
Value: payload,
})
if err != nil {
return fmt.Errorf("kafka publish failed: %w", err)
}
// 2. Redis: Best-effort - don't fail the operation
p.publishToRedis(ctx, event)
return nil
}
func (p *DualPublisher) publishToRedis(ctx context.Context, event Event) {
// Lightweight payload (full event in Kafka)
notification := map[string]interface{}{
"id": event.ID,
"type": event.Type,
"source_id": event.SourceID,
}
payload, _ := json.Marshal(notification)
channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)
// Fire and forget - log errors but don't propagate
if err := p.redis.Publish(ctx, channel, payload).Err(); err != nil {
p.logger.Warn("redis publish failed", "error", err)
}
}
Architecture
┌──────────────┐ ┌─────────────────┐ ┌──────────────┐
│ Ingester │────▶│ DualPublisher │────▶│ Kafka │──▶ Event Processor
│ │ │ │ │ (durable) │
└──────────────┘ │ │ └──────────────┘
│ │ ┌──────────────┐
│ │────▶│ Redis PubSub │──▶ WebSocket Gateway
│ │ │ (real-time) │
└─────────────────┘ └──────────────┘
Channel Naming Convention
events:{source_type}:{source_id}
Examples:
- events:user:octocat - Events for user octocat
- events:repo:owner/repo - Events for a repository
- events:org:microsoft - Events for an organization
Batch Publishing
For high throughput:
func (p *DualPublisher) PublishBatch(ctx context.Context, events []Event) error {
// 1. Batch to Kafka
messages := make([]kafka.Message, len(events))
for i, event := range events {
payload, _ := json.Marshal(event)
messages[i] = kafka.Message{
Key: []byte(event.SourceID),
Value: payload,
}
}
if err := p.kafka.WriteMessages(ctx, messages...); err != nil {
return fmt.Errorf("kafka batch failed: %w", err)
}
// 2. Redis: Pipeline for efficiency
pipe := p.redis.Pipeline()
for _, event := range events {
channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)
notification, _ := json.Marshal(map[string]interface{}{
"id": event.ID,
"type": event.Type,
})
pipe.Publish(ctx, channel, notification)
}
if _, err := pipe.Exec(ctx); err != nil {
p.logger.Warn("redis batch failed", "error", err)
}
return nil
}
Decision Tree
| Requirement | Stream | Why |
|---|---|---|
| Must not lose event | Kafka only | Ack required, replicated |
| User sees immediately | Redis only | Sub-ms delivery |
| Both durability + real-time | Dual stream | This pattern |
| High volume (>10k/sec) | Kafka, batch Redis | Redis can bottleneck |
| Many subscribers per channel | Redis + local fan-out | Don't hammer Redis |
Related Skills
- Meta-skill: ai/skills/meta/realtime-dashboard/ — Complete realtime dashboard guide
- websocket-hub-patterns — WebSocket gateway
- backend/service-layer-architecture — Service integration
NEVER Do
- NEVER fail on Redis errors — Redis is best-effort. Log and continue.
- NEVER send full payload to Redis — Send IDs only, clients fetch from API.
- NEVER create one Redis channel per event — Use source-level channels.
- NEVER skip Kafka for "unimportant" events — All events go to Kafka for replay.
- NEVER use Redis Pub/Sub for persistence — Messages are fire-and-forget.
Edge Cases
| Case | Solution |
|---|---|
| Redis down | Log warning, continue with Kafka only |
| Client connects mid-stream | Query API for recent events, then subscribe |
| High channel cardinality | Use wildcard patterns or aggregate channels |
| Kafka backpressure | Buffer in memory with timeout, fail if full |
| Need event replay | Consume from Kafka from offset, not Redis |
Files
2 totalSelect a file
Select a file to preview.
Comments
Loading comments…
