Install
openclaw skills install compound-eng-python-services

Python patterns for CLI tools, async concurrency, and backend services. Use when working with Python code, building CLI apps, FastAPI services, async with asyncio, background jobs, or configuring uv, ruff, ty, pytest, or pyproject.toml.

| Tool | Replaces | Purpose |
|---|---|---|
| uv | pip, virtualenv, pyenv, pipx | Package/dependency management |
| ruff | flake8, black, isort | Linting + formatting |
| ty | mypy, pyright | Type checking (Astral, faster) |

- `uv init --package myproject` for distributable packages, `uv init` for apps
- `uv add <pkg>`, `uv add --group dev <pkg>`; never edit `pyproject.toml` deps manually
- `uv run <cmd>` instead of activating venvs -- runs inside the project venv without explicit activation
- `uv lock --upgrade-package <pkg>` to upgrade a single package without touching others
- `uv tree --outdated` to preview what would be upgraded before committing
- `uv.lock` goes in version control
- `[dependency-groups]` (PEP 735) for dev/test/docs, not `[project.optional-dependencies]`
- `ruff check --fix . && ruff format .` for lint+format in one pass

Standard project layout:

    src/mypackage/
        __init__.py
        main.py
        services/
        models/
    tests/
        conftest.py
        test_main.py
    pyproject.toml

See cli-tools.md for Click patterns, argparse, and CLI project layout.
| Workload | Approach |
|---|---|
| Many concurrent I/O calls | asyncio (gather, create_task) |
| CPU-bound computation | multiprocessing.Pool or concurrent.futures.ProcessPoolExecutor |
| Mixed I/O + CPU | asyncio.to_thread() to offload blocking work |
| Simple scripts, few connections | Stay synchronous |
Use async (asyncio) when:
Stay synchronous when:
Rule of thumb: if the code is not waiting on multiple I/O operations concurrently, sync is simpler and correct. Do not add async complexity for a single sequential pipeline.
Key rule: Stay fully sync or fully async within a call path.
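
The mixed I/O + CPU row and the sync/async rule above can be illustrated with a minimal sketch. It assumes a synchronous library call, represented here by the hypothetical `blocking_lookup`, that has to be used from an otherwise async call path; `fetch_all` is likewise an illustrative name.

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    """Hypothetical stand-in for a synchronous library call."""
    time.sleep(0.05)  # simulates blocking I/O
    return key.upper()


async def fetch_all(keys: list[str]) -> list[str]:
    # Each blocking call runs in a worker thread, so the event loop stays
    # free and the calls overlap instead of running back to back.
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_lookup, key) for key in keys)
    )


if __name__ == "__main__":
    print(asyncio.run(fetch_all(["a", "b", "c"])))
```

The call path stays async from top to bottom; the only sync code is the leaf function, and it never runs on the event loop thread.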
asyncio patterns:

- `asyncio.gather(*tasks)` for concurrent I/O -- use `return_exceptions=True` for partial failure tolerance
- `asyncio.TaskGroup` (3.11+) for structured concurrency -- automatic cancellation of sibling tasks on failure; prefer over `gather` when all tasks must succeed
- `asyncio.Semaphore(n)` to limit concurrency (rate limiting external APIs)
- `asyncio.wait_for(coro, timeout=N)` for timeouts
- `asyncio.Queue` for producer-consumer
- `asyncio.Lock` when coroutines share mutable state
- `asyncio.to_thread(sync_fn)` for sync libs, `aiohttp`/`httpx.AsyncClient` for HTTP
- `CancelledError` -- always re-raise after cleanup
- Async iteration (`async for`) for streaming/pagination

multiprocessing for CPU-bound:

    from concurrent.futures import ProcessPoolExecutor

    # cpu_task must be a picklable, module-level function; items is any iterable of its inputs
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(cpu_task, items))

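
Three of the asyncio patterns listed above (`gather` with `return_exceptions=True`, `Semaphore`, and `wait_for`) combine naturally. The following is a sketch under stated assumptions: `fetch` is a hypothetical I/O call that fails for one input and is too slow for another, and `bounded_fetch` and `fetch_many` are illustrative names.

```python
import asyncio


async def fetch(item: int) -> int:
    """Hypothetical I/O call: item 3 fails, item 4 is too slow."""
    if item == 3:
        raise ConnectionError("upstream refused connection")
    await asyncio.sleep(1.0 if item == 4 else 0.01)
    return item * 10


async def bounded_fetch(sem: asyncio.Semaphore, item: int) -> int:
    async with sem:  # at most `limit` calls are in flight at once
        return await asyncio.wait_for(fetch(item), timeout=0.2)


async def fetch_many(
    items: list[int], limit: int = 2
) -> tuple[list[int], list[BaseException]]:
    sem = asyncio.Semaphore(limit)
    results = await asyncio.gather(
        *(bounded_fetch(sem, item) for item in items),
        return_exceptions=True,  # failures come back as values, in input order
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return ok, failed


if __name__ == "__main__":
    print(asyncio.run(fetch_many([1, 2, 3, 4])))
```

With `return_exceptions=True` one failed or timed-out call does not discard the results of the others. When every task must succeed, `asyncio.TaskGroup` is the better fit.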
See fastapi.md for project structure, lifespan, config, DI, async DB, and repository pattern.

- Poll `/jobs/{id}` for status
- Celery: `@app.task(bind=True, max_retries=3, autoretry_for=(ConnectionError,))` -- exponential backoff: `raise self.retry(countdown=2**self.request.retries * 60)`
- `chain(a.s(), b.s())` for sequential, `group(...)` for parallel, `chord(group, callback)` for fan-out/fan-in

Retries with tenacity:

    from tenacity import (
        retry, retry_if_exception_type, stop_after_attempt,
        stop_after_delay, wait_exponential_jitter,
    )

    # log_retry_attempt is a user-defined callback that receives the RetryCallState
    @retry(
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
        stop=stop_after_attempt(5) | stop_after_delay(60),  # whichever limit is hit first
        wait=wait_exponential_jitter(initial=1, max=30),
        before_sleep=log_retry_attempt,
    )
    def call_api(url: str) -> dict: ...


- `@fail_safe(default=[])` decorator for non-critical paths -- return cached/default on failure
- `functools.lru_cache(maxsize=N)` for pure-function memoization; `functools.cache` (unbounded) for small domains
- Stack decorators, e.g. `@traced @with_timeout(30) @retry(...)` -- separate infra from business logic

Connection pooling is mandatory for production: reuse `httpx.AsyncClient()` across requests, configure SQLAlchemy `pool_size`/`max_overflow`, use `aiohttp.TCPConnector(limit=N)`.
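
The source names a `@fail_safe(default=[])` decorator but does not show it. One possible shape is sketched below, assuming it should log the failure and return a copy of the default; `load_recommendations` is a hypothetical non-critical call used only for illustration.

```python
import copy
import functools
import logging
from typing import Any, Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


def fail_safe(default: T) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Return `default` instead of raising; intended for non-critical paths only."""

    def decorator(fn: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            try:
                return fn(*args, **kwargs)
            except Exception:
                # Log with traceback so the failure is not silently swallowed.
                logger.exception("%s failed; returning default", fn.__name__)
                # Copy so callers cannot mutate the shared default object.
                return copy.copy(default)

        return wrapper

    return decorator


@fail_safe(default=[])
def load_recommendations(user_id: int) -> list[str]:
    # Hypothetical dependency that is currently down.
    raise ConnectionError("recommendation service unreachable")
```

Calling `load_recommendations(1)` logs the error and returns an empty list rather than failing the request. Keeping the fallback in a decorator leaves the business function free of infrastructure concerns.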

- Use a `BaseSettings` model with `model_validator` to parse and validate all environment variables at startup. If invalid, crash before serving traffic. Never discover a missing secret on the first request that needs it.
- Expose `/health` (shallow liveness -- returns 200 if the process responds) and `/ready` (deep readiness -- verifies database, Redis, and critical dependencies are reachable). Load balancers route traffic based on `/ready`; orchestrators restart based on `/health`.
- Structured logging with structlog processors: `JSONRenderer`, `TimeStamper`, `merge_contextvars`
- Correlation ID (from the `X-Correlation-ID` header): bind to contextvars, propagate to downstream calls
- Do not mutate `LogRecord` attributes from a `Formatter`. A custom `logging.Formatter.format()` that rewrites `record.name` (or any record attribute) in place leaks to every other handler attached to the same logger and to pytest `caplog`. `Logger.callHandlers` passes the same `LogRecord` object to each handler; whichever formats first wins the mutation, and downstream handlers and test filters see the modified state. Tests filtering by full logger name (`if r.name == "src.services.foo"`) then silently miss; routing handlers doing `LOGGER_TO_MODEL.get(record.name)` fall through to defaults. Use a `logging.Filter` that adds a new attribute (`record.short_name`) instead of rewriting an existing one, and reference it in the format string as `%(short_name)s`, or override `formatMessage` instead of `format`. A `try`/`finally` restore works for synchronous handler chains but is fragile under async handlers that interleave.
- Measure coverage (`uv run pytest --cov --cov-report=html`)
- Useful pytest flags: `--lf` (last failed), `-x` (stop on first failure), `-k "pattern"` (filter), `--pdb` (debugger on failure)
- `conftest.py` for shared fixtures. Scope wisely: `@pytest.fixture(scope="session")` for expensive setup (DB connections), `scope="function"` (default) for test isolation
- `tmp_path`: built-in fixture for temp files -- no manual cleanup needed
- `@pytest.mark.parametrize("input,expected", [...], ids=["empty", "single", "overflow"])` for readable test names
- `autospec=True` on mocks to catch API drift. `assert_awaited_once()` for async mocks.
- Register markers in `pyproject.toml` under `[tool.pytest.ini_options]` with `markers = ["slow", "integration"]`. Run fast tests with `-m "not slow"`.
- `class Renderable(Protocol)` for structural typing at service boundaries -- enables testing with plain objects instead of mocks
- `@contextmanager` for connection/transaction lifecycle. Always put cleanup in a `finally` block (or in `__exit__` for class-based context managers).
- Catch specific exceptions (`ValueError`, `TypeError`, `KeyError`), not bare `Exception`
- `raise ServiceError("upload failed") from e` -- always chain to preserve debug trail
- Collect per-item outcomes in `BatchResult(succeeded={}, failed={})` -- don't let one item abort the batch
- Pydantic `BaseModel` with `field_validator` for complex input validation
- See the `ia-postgresql` skill for the full PostgreSQL pattern
- Use `--autogenerate` as a starting point for migrations; always review generated SQL before committing
- Define `BaseModel` request/response schemas and FastAPI `response_model` before writing endpoint logic. The schema is the contract -- implementation follows. Generate OpenAPI docs from these models automatically.
- Use `response_model` and `model_config = ConfigDict(extra="forbid")` to control exactly what's serialized -- never return raw dicts or ORM objects from endpoints.
- Add new optional fields (`field: str | None = None`) rather than changing or removing existing ones. Removing a Pydantic field from a response model breaks callers silently. Deprecate first (`Field(deprecated=True)`), remove in a later version.
- Return every error in one envelope: `{"error": {"code": "...", "message": "...", "details": ...}}`. Register `@app.exception_handler` for `RequestValidationError`, `HTTPException`, and application-specific exceptions to normalize into one format. Callers build error handling once.
- `uv run pytest` passes with zero failures
- `uv run ruff check .` passes with zero warnings
- `uv run ty check .` passes with zero errors
- Coverage checked (`uv run pytest --cov`)
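
The `LogRecord` guidance above recommends a `logging.Filter` that adds a new attribute instead of rewriting `record.name`. Here is a minimal sketch of that approach. `ShortNameFilter`, `ListHandler`, and `build_logger` are hypothetical names, and `ListHandler` exists only so the effect can be observed.

```python
import logging


class ShortNameFilter(logging.Filter):
    """Adds record.short_name and leaves record.name untouched."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.short_name = record.name.rsplit(".", 1)[-1]
        return True  # never drop the record


class ListHandler(logging.Handler):
    """Illustrative handler that keeps what it receives."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []
        self.lines: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)
        self.lines.append(self.format(record))


def build_logger() -> tuple[logging.Logger, ListHandler, ListHandler]:
    logger = logging.getLogger("src.services.foo")
    logger.handlers.clear()
    logger.setLevel(logging.INFO)
    logger.propagate = False

    short = ListHandler()  # formats with the short name
    short.addFilter(ShortNameFilter())
    short.setFormatter(logging.Formatter("%(short_name)s: %(message)s"))

    full = ListHandler()  # runs second and still sees the full name
    full.setFormatter(logging.Formatter("%(name)s: %(message)s"))

    logger.addHandler(short)
    logger.addHandler(full)
    return logger, short, full
```

Because the filter only adds `short_name`, the second handler, and any test that filters on `record.name == "src.services.foo"`, keeps seeing the full logger name.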