ia-python-services

MCP Tools

Python patterns for CLI tools, async concurrency, and backend services. Use when working with Python code, building CLI apps, FastAPI services, async with asyncio, background jobs, or configuring uv, ruff, ty, pytest, or pyproject.toml.

Install

openclaw skills install compound-eng-python-services

Python Services & CLI

Modern Tooling

ToolReplacesPurpose
uvpip, virtualenv, pyenv, pipxPackage/dependency management
ruffflake8, black, isortLinting + formatting
tymypy, pyrightType checking (Astral, faster)
  • uv init --package myproject for distributable packages, uv init for apps
  • uv add <pkg>, uv add --group dev <pkg>, never edit pyproject.toml deps manually
  • uv run <cmd> instead of activating venvs -- auto-activates the venv without explicit activation
  • uv add --upgrade <pkg> to upgrade a single package without touching others
  • uv tree --outdated to preview what would be upgraded before committing
  • uv.lock goes in version control
  • Use [dependency-groups] (PEP 735) for dev/test/docs, not [project.optional-dependencies]
  • PEP 723 inline metadata for standalone scripts with deps
  • ruff check --fix . && ruff format . for lint+format in one pass

Standard project layout:

src/mypackage/
    __init__.py
    main.py
    services/
    models/
tests/
    conftest.py
    test_main.py
pyproject.toml

See cli-tools.md for Click patterns, argparse, and CLI project layout.

Parallelism

WorkloadApproach
Many concurrent I/O callsasyncio (gather, create_task)
CPU-bound computationmultiprocessing.Pool or concurrent.futures.ProcessPoolExecutor
Mixed I/O + CPUasyncio.to_thread() to offload blocking work
Simple scripts, few connectionsStay synchronous

Sync vs Async Decision

Use async (asyncio) when:

  • I/O-bound work has multiple concurrent operations (HTTP calls, database queries, file I/O happening in parallel)
  • WebSocket servers or long-lived connections require it
  • The framework requires it (FastAPI async endpoints, aiohttp)

Stay synchronous when:

  • Work is CPU-bound (computation, data transformation) -- async adds nothing, use multiprocessing instead
  • Building simple scripts and CLI tools with sequential I/O
  • All I/O is sequential anyway (one DB query, process result, one API call)
  • The team lacks async debugging experience (asyncio stack traces are harder to read)

Rule of thumb: if the code is not waiting on multiple I/O operations concurrently, sync is simpler and correct. Do not add async complexity for a single sequential pipeline.

Key rule: Stay fully sync or fully async within a call path.

asyncio patterns:

  • asyncio.gather(*tasks) for concurrent I/O -- use return_exceptions=True for partial failure tolerance
  • asyncio.TaskGroup (3.11+) for structured concurrency -- automatic cancellation of sibling tasks on failure; prefer over gather when all tasks must succeed
  • asyncio.Semaphore(n) to limit concurrency (rate limiting external APIs)
  • asyncio.wait_for(coro, timeout=N) for timeouts
  • asyncio.Queue for producer-consumer
  • asyncio.Lock when coroutines share mutable state
  • Never block the event loop: asyncio.to_thread(sync_fn) for sync libs, aiohttp/httpx.AsyncClient for HTTP
  • Handle CancelledError -- always re-raise after cleanup
  • Async generators (async for) for streaming/pagination

multiprocessing for CPU-bound:

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(cpu_task, items))

See fastapi.md for project structure, lifespan, config, DI, async DB, and repository pattern.

Background Jobs

  • Return job ID immediately, process async. Client polls /jobs/{id} for status
  • Celery: @app.task(bind=True, max_retries=3, autoretry_for=(ConnectionError,)) -- exponential backoff: raise self.retry(countdown=2**self.request.retries * 60)
  • Alternatives: Dramatiq (modern Celery), RQ (simple Redis), cloud-native (SQS+Lambda, Cloud Tasks)
  • Idempotency is mandatory -- tasks may retry. Use idempotency keys for external calls, check-before-write, upsert patterns
  • Dead letter queue for permanently failed tasks after max retries
  • Task workflows: chain(a.s(), b.s()) for sequential, group(...) for parallel, chord(group, callback) for fan-out/fan-in

Resilience

Retries with tenacity:

from tenacity import retry, stop_after_attempt, wait_exponential_jitter, retry_if_exception_type

@retry(
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    stop=stop_after_attempt(5) | stop_after_delay(60),
    wait=wait_exponential_jitter(initial=1, max=30),
    before_sleep=log_retry_attempt,
)
def call_api(url: str) -> dict: ...
  • Retry only transient errors: network, 429/502/503/504. Never retry 4xx (except 429), auth errors, validation errors
  • Every network call needs a timeout
  • @fail_safe(default=[]) decorator for non-critical paths -- return cached/default on failure
  • functools.lru_cache(maxsize=N) for pure-function memoization; functools.cache (unbounded) for small domains
  • Stack decorators: @traced @with_timeout(30) @retry(...) -- separate infra from business logic

Connection pooling is mandatory for production: reuse httpx.AsyncClient() across requests, configure SQLAlchemy pool_size/max_overflow, use aiohttp.TCPConnector(limit=N).

Production Resilience

  • Fail-fast config validation: use a Pydantic BaseSettings model with model_validator to parse and validate all environment variables at startup. If invalid, crash before serving traffic. Never discover a missing secret on the first request that needs it.
  • Health endpoints: expose /health (shallow liveness -- returns 200 if the process responds) and /ready (deep readiness -- verifies database, Redis, and critical dependencies are reachable). Load balancers route traffic based on /ready; orchestrators restart based on /health.

Observability

  • structlog for JSON structured logging. Configure once at startup with JSONRenderer, TimeStamper, merge_contextvars
  • Correlation IDs -- generate at ingress (X-Correlation-ID header), bind to contextvars, propagate to downstream calls
  • Log levels: DEBUG=diagnostics, INFO=operations, WARNING=anomalies handled, ERROR=failures needing attention. Never log expected behavior at ERROR
  • Prometheus metrics -- track latency (Histogram), traffic (Counter), errors (Counter), saturation (Gauge). Keep label cardinality bounded (no user IDs)
  • OpenTelemetry for distributed tracing across services
  • Never mutate LogRecord attributes from a Formatter. A custom logging.Formatter.format() that rewrites record.name (or any record attribute) in place leaks to every other handler attached to the same logger and to pytest caplog. Logger.callHandlers passes the same LogRecord object to each handler — whichever formats first wins the mutation, and downstream handlers and test filters see the modified state. Tests filtering by full logger name (if r.name == "src.services.foo") then silently miss; routing handlers doing LOGGER_TO_MODEL.get(record.name) fall through to defaults. Use a logging.Filter that adds a non-mutating attribute (record.short_name) and reference it in the format string as %(short_name)s, or override formatMessage instead of format. try/finally restore works for synchronous handler chains but is fragile under async handlers that interleave.

Discipline

  • Simplicity first -- every change as simple as possible, impact minimal code
  • Only touch what's necessary -- avoid introducing unrelated changes
  • No hacky workarounds -- if a fix feels wrong, step back and implement the clean solution
  • Before adding a new abstraction, verify it appears in 3+ places. If not, inline it.
  • Verify: see Verify section below -- pass all checks with zero warnings before declaring done
  • Coverage target: 80%+ (uv run pytest --cov --cov-report=html)

Testing Patterns

  • pytest flags: --lf (last failed), -x (stop on first failure), -k "pattern" (filter), --pdb (debugger on failure)
  • Fixtures: use conftest.py for shared fixtures. Scope wisely: @pytest.fixture(scope="session") for expensive setup (DB connections), scope="function" (default) for test isolation
  • tmp_path: built-in fixture for temp files -- no manual cleanup needed
  • Parametrize with IDs: @pytest.mark.parametrize("input,expected", [...], ids=["empty", "single", "overflow"]) for readable test names
  • Mock discipline: always autospec=True on mocks to catch API drift. assert_awaited_once() for async mocks.
  • Test markers: register in pyproject.toml under [tool.pytest.ini_options] with markers = ["slow", "integration"]. Run fast tests with -m "not slow".
  • Protocol duck typing: use class Renderable(Protocol) for structural typing at service boundaries -- enables testing with plain objects instead of mocks
  • Context managers: @contextmanager for connection/transaction lifecycle. Always implement __exit__ cleanup.

Error Handling

  • Validate inputs at boundaries before expensive ops. Report all errors at once when possible
  • Use specific exceptions: ValueError, TypeError, KeyError, not bare Exception
  • raise ServiceError("upload failed") from e -- always chain to preserve debug trail
  • Convert external data to domain types (enums, Pydantic models) at system boundaries
  • Batch processing: BatchResult(succeeded={}, failed={}) -- don't let one item abort the batch
  • Pydantic BaseModel with field_validator for complex input validation

Migrations

  • Separate schema and data migrations -- data backfills in their own migration file
  • Renames/removals use expand-contract: add new column → backfill → switch reads → drop old (see ia-postgresql skill for the full pattern)
  • Never edit a migration that has already run in a shared environment
  • Alembic: use --autogenerate as a starting point, always review generated SQL before committing
  • Test migrations against production-sized data -- a migration that takes 2ms on dev can lock a table for minutes in production

API Design

  • Contract-first: define Pydantic BaseModel request/response schemas and FastAPI response_model before writing endpoint logic. The schema is the contract -- implementation follows. Generate OpenAPI docs from these models automatically.
  • Hyrum's Law awareness: every observable response field, ordering, or timing becomes a dependency for callers. Use explicit response_model and model_config = ConfigDict(extra="forbid") to control exactly what's serialized -- never return raw dicts or ORM objects from endpoints.
  • Addition over modification: add new optional fields (field: str | None = None) rather than changing or removing existing ones. Removing a Pydantic field from a response model breaks callers silently. Deprecate first (Field(deprecated=True)), remove in a later version.
  • Consistent error structure: all exceptions should produce the same envelope: {"error": {"code": "...", "message": "...", "details": ...}}. Register @app.exception_handler for RequestValidationError, HTTPException, and application-specific exceptions to normalize into one format. Callers build error handling once.
  • Boundary validation via Pydantic: validate at the endpoint/handler level with Pydantic models and FastAPI's automatic request parsing. Internal services and repositories trust that input was validated at entry -- no redundant validation scattered through business logic.
  • Third-party responses are untrusted data: validate shape and content of external API responses before using them in logic, rendering, or decision-making. A compromised or misbehaving service can return unexpected types, malicious content, or missing fields. Parse through a Pydantic model before use.

Verify

  • uv run pytest passes with zero failures
  • uv run ruff check . passes with zero warnings
  • uv run ty check . passes with zero errors
  • Coverage target: 80%+ (uv run pytest --cov)