Microservice Patterns

v1.0.0

Provides guidance and patterns for decomposing monoliths, inter-service communication, data management, and resilience in microservice architectures.

2· 994·1 current·1 all-time
Security Scan
VirusTotalVirusTotal
Benign
View report →
OpenClawOpenClaw
Benign
high confidence
Purpose & Capability
Name, README, and SKILL.md all describe microservice design patterns and include illustrative code samples; there are no required binaries, env vars, or other resources that are inconsistent with that purpose.
Instruction Scope
SKILL.md provides architecture guidance and sample code (Python snippets for HTTP clients, Kafka producer, sagas, etc.). It does not instruct the agent to read system files, access secrets, call external endpoints at runtime, or exfiltrate data. Code examples reference service URLs and bootstrap servers only as illustrative placeholders.
Install Mechanism
There is no formal install spec in the registry (lowest risk). README includes example commands (an npx add pointing to a GitHub tree and cp into local skill folders). Those are documentation examples — they are not executed by the platform — but as with any copy/paste install command, users should exercise typical caution before running npx or copying from an unknown source.
Credentials
The skill declares no required environment variables or credentials. Some code examples would, in real deployments, need configuration (e.g., Kafka bootstrap servers, service URLs) — but those are illustrative and do not appear in the skill as required secrets.
Persistence & Privilege
The skill is not always-enabled and does not request system or cross-skill configuration changes. It is instruction-only and does not request persistent privileges.
Assessment
This skill appears to be a documentation/cheatsheet for microservice patterns and is internally consistent. Before installing or running any commands shown in the README: (1) verify the skill source/owner (there's no homepage and the owner ID is unfamiliar), (2) don't run the example npx/cp commands unless you trust the remote repository, (3) treat code snippets as examples — they reference external services (Kafka, service URLs) and will need proper credentials/config if you copy them into a real project, and (4) review any copied code for network calls or credential use before executing. If you want extra assurance, ask the author for a homepage/repo link or run the snippets in an isolated environment first.

Like a lobster shell, security has layers — review code before you run it.

latestvk97e50qjq2vfbmrsj03s75nkyd80wmf6
994downloads
2stars
1versions
Updated 1mo ago
v1.0.0
MIT-0

Microservices Patterns

WHAT

Patterns for building distributed systems: service decomposition, inter-service communication, data management, and resilience. Helps you avoid the "distributed monolith" anti-pattern.

WHEN

  • Decomposing a monolith into microservices
  • Designing service boundaries and contracts
  • Implementing inter-service communication
  • Managing distributed transactions
  • Building resilient distributed systems

KEYWORDS

microservices, service mesh, event-driven, saga, circuit breaker, API gateway, service discovery, distributed transactions, eventual consistency, CQRS


Decision Framework: When to Use Microservices

If you have...Then...
Small team (<5 devs), simple domainStart with monolith
Need independent deployment/scalingConsider microservices
Multiple teams, clear domain boundariesMicroservices work well
Tight deadlines, unknown requirementsMonolith first, extract later

Rule of thumb: If you can't define clear service boundaries, you're not ready for microservices.


Service Decomposition Patterns

Pattern 1: By Business Capability

Organize services around business functions, not technical layers.

E-commerce Example:
├── order-service       # Order lifecycle
├── payment-service     # Payment processing
├── inventory-service   # Stock management
├── shipping-service    # Fulfillment
└── notification-service # Emails, SMS

Pattern 2: Strangler Fig (Monolith Migration)

Gradually extract from monolith without big-bang rewrites.

1. Identify bounded context to extract
2. Create new microservice
3. Route new traffic to microservice
4. Gradually migrate existing functionality
5. Remove from monolith when complete
# API Gateway routing during migration
async def route_orders(request):
    if request.path.startswith("/api/orders/v2"):
        return await new_order_service.forward(request)
    else:
        return await legacy_monolith.forward(request)

Communication Patterns

Pattern 1: Synchronous (REST/gRPC)

Use for: Queries, when you need immediate response.

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

class ServiceClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=5.0)
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
    async def get(self, path: str):
        """GET with automatic retries."""
        response = await self.client.get(f"{self.base_url}{path}")
        response.raise_for_status()
        return response.json()

# Usage
payment_client = ServiceClient("http://payment-service:8001")
result = await payment_client.get(f"/payments/{payment_id}")

Pattern 2: Asynchronous (Events)

Use for: Commands, when eventual consistency is acceptable.

from aiokafka import AIOKafkaProducer
import json

@dataclass
class DomainEvent:
    event_id: str
    event_type: str
    aggregate_id: str
    occurred_at: datetime
    data: dict

class EventBus:
    def __init__(self, bootstrap_servers: List[str]):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode()
        )
    
    async def publish(self, event: DomainEvent):
        await self.producer.send_and_wait(
            event.event_type,  # Topic = event type
            value=asdict(event),
            key=event.aggregate_id.encode()
        )

# Order service publishes
await event_bus.publish(DomainEvent(
    event_id=str(uuid.uuid4()),
    event_type="OrderCreated",
    aggregate_id=order.id,
    occurred_at=datetime.now(),
    data={"order_id": order.id, "customer_id": order.customer_id}
))

# Inventory service subscribes and reacts
async def handle_order_created(event_data: dict):
    order_id = event_data["data"]["order_id"]
    items = event_data["data"]["items"]
    await reserve_inventory(order_id, items)

When to Use Each

SynchronousAsynchronous
Need immediate responseFire-and-forget
Simple query/responseLong-running operations
Low latency requiredDecoupling is priority
Tight coupling acceptableEventual consistency OK

Data Patterns

Database Per Service

Each service owns its data. No shared databases.

order-service     → orders_db (PostgreSQL)
payment-service   → payments_db (PostgreSQL)
product-service   → products_db (MongoDB)
analytics-service → analytics_db (ClickHouse)

Saga Pattern (Distributed Transactions)

For operations spanning multiple services that need rollback capability.

class SagaStep:
    def __init__(self, name: str, action: Callable, compensation: Callable):
        self.name = name
        self.action = action
        self.compensation = compensation

class OrderFulfillmentSaga:
    def __init__(self):
        self.steps = [
            SagaStep("create_order", self.create_order, self.cancel_order),
            SagaStep("reserve_inventory", self.reserve_inventory, self.release_inventory),
            SagaStep("process_payment", self.process_payment, self.refund_payment),
            SagaStep("confirm_order", self.confirm_order, self.cancel_confirmation),
        ]
    
    async def execute(self, order_data: dict) -> SagaResult:
        completed_steps = []
        context = {"order_data": order_data}
        
        for step in self.steps:
            try:
                result = await step.action(context)
                if not result.success:
                    await self.compensate(completed_steps, context)
                    return SagaResult(status="failed", error=result.error)
                completed_steps.append(step)
                context.update(result.data)
            except Exception as e:
                await self.compensate(completed_steps, context)
                return SagaResult(status="failed", error=str(e))
        
        return SagaResult(status="completed", data=context)
    
    async def compensate(self, completed_steps: List[SagaStep], context: dict):
        """Execute compensating actions in reverse order."""
        for step in reversed(completed_steps):
            try:
                await step.compensation(context)
            except Exception as e:
                # Log but continue compensating
                logger.error(f"Compensation failed for {step.name}: {e}")

Resilience Patterns

Circuit Breaker

Fail fast when a service is down. Prevents cascade failures.

from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open" # Testing recovery

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.failure_count = 0
        self.success_count = 0
        self.state = CircuitState.CLOSED
        self.opened_at = None
    
    async def call(self, func: Callable, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpen("Service unavailable")
        
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
    
    def _on_success(self):
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0
    
    def _on_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()
    
    def _should_attempt_reset(self) -> bool:
        return datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout)

# Usage
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)

async def call_payment_service(data: dict):
    return await breaker.call(payment_client.post, "/payments", json=data)

Retry with Exponential Backoff

For transient failures.

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((httpx.TimeoutException, httpx.HTTPStatusError))
)
async def fetch_user(user_id: str):
    response = await client.get(f"/users/{user_id}")
    response.raise_for_status()
    return response.json()

Bulkhead

Isolate resources to limit impact of failures.

import asyncio

class Bulkhead:
    def __init__(self, max_concurrent: int):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def call(self, func: Callable, *args, **kwargs):
        async with self.semaphore:
            return await func(*args, **kwargs)

# Limit concurrent calls to each service
payment_bulkhead = Bulkhead(max_concurrent=10)
inventory_bulkhead = Bulkhead(max_concurrent=20)

result = await payment_bulkhead.call(payment_service.charge, amount)

API Gateway Pattern

Single entry point for all clients.

from fastapi import FastAPI, Depends, HTTPException
from circuitbreaker import circuit

app = FastAPI()

class APIGateway:
    def __init__(self):
        self.clients = {
            "orders": httpx.AsyncClient(base_url="http://order-service:8000"),
            "payments": httpx.AsyncClient(base_url="http://payment-service:8001"),
            "inventory": httpx.AsyncClient(base_url="http://inventory-service:8002"),
        }
    
    @circuit(failure_threshold=5, recovery_timeout=30)
    async def forward(self, service: str, path: str, **kwargs):
        client = self.clients[service]
        response = await client.request(**kwargs, url=path)
        response.raise_for_status()
        return response.json()
    
    async def aggregate(self, order_id: str) -> dict:
        """Aggregate data from multiple services."""
        results = await asyncio.gather(
            self.forward("orders", f"/orders/{order_id}", method="GET"),
            self.forward("payments", f"/payments/order/{order_id}", method="GET"),
            self.forward("inventory", f"/reservations/order/{order_id}", method="GET"),
            return_exceptions=True
        )
        
        return {
            "order": results[0] if not isinstance(results[0], Exception) else None,
            "payment": results[1] if not isinstance(results[1], Exception) else None,
            "inventory": results[2] if not isinstance(results[2], Exception) else None,
        }

gateway = APIGateway()

@app.get("/api/orders/{order_id}")
async def get_order_aggregate(order_id: str):
    return await gateway.aggregate(order_id)

Health Checks

Every service needs liveness and readiness probes.

@app.get("/health/live")
async def liveness():
    """Is the process running?"""
    return {"status": "alive"}

@app.get("/health/ready")
async def readiness():
    """Can we serve traffic?"""
    checks = {
        "database": await check_database(),
        "cache": await check_redis(),
    }
    
    all_healthy = all(checks.values())
    status = "ready" if all_healthy else "not_ready"
    
    return {"status": status, "checks": checks}

NEVER

  • Shared Databases: Creates tight coupling, defeats the purpose
  • Synchronous Chains: A → B → C → D = fragile, slow
  • No Circuit Breakers: One service down takes everything down
  • Distributed Monolith: Services that must deploy together
  • Ignoring Network Failures: Assume the network WILL fail
  • No Compensation Logic: Can't undo failed distributed transactions
  • Starting with Microservices: Always start with a well-structured monolith
  • Chatty Services: Too many inter-service calls = latency death

Comments

Loading comments...