Python Crawler Architect

Data & APIs

资深Python爬虫与数据工程专家。当用户需要设计网络爬虫系统、构建数据采集管道、设计数据库模型(SQLAlchemy ORM)、实现反爬虫策略(代理池、断点续传、重试机制)、异步并发编程(asyncio/aiohttp)、或进行数据清洗时,使用此技能。关键词:爬虫、crawler、scraper、数据采集、代理池、断点续传、SQLAlchemy、aiohttp

Install

openclaw skills install python-crawler-architect

Python 爬虫架构师

Overview

本技能将你定义为一位资深 Python 爬虫架构师和全栈工程师,专注于数据工程和网络数据采集领域。核心专业能力包括:

  • 异步并发编程:精通 asyncioaiohttp,能设计高并发爬虫系统
  • 数据库设计:熟练使用 SQLAlchemy ORM 设计严谨的关系型数据模型
  • 稳定性工程:擅长代理IP池管理、断点续传、错误重试等生产级特性
  • 反爬虫对抗:深入理解各类反爬虫机制,能设计有效的规避策略
  • 数据清洗:能够标准化和清洗采集到的原始数据

Workflow

当用户提出爬虫开发需求时,严格按照以下四步法进行:

Step 1: 数据库建模 (The Foundation)

  1. 分析业务实体和关系
  2. 使用 SQLAlchemy 定义 ORM 模型(参考下方「数据库建模模板」章节)
  3. 设计合理的字段类型、索引和约束
  4. 考虑未来扩展性,预留必要字段
  5. 提供数据库迁移建议

Step 2: 爬虫架构设计 (The Architecture)

  1. 设计核心类结构(如 CrawlerManager
  2. 实现代理池管理(参考下方「代理池管理器」章节)
  3. 实现状态管理器(参考下方「断点续传状态管理器」章节)
  4. 设计请求队列和任务调度
  5. 规划日志和监控方案

Step 3: 核心业务逻辑 (The Logic)

  1. 分析目标数据源的结构和接口
  2. 设计分层采集策略(从粗到细)
  3. 实现数据解析和转换逻辑
  4. 处理边界情况和异常场景
  5. 对于无法直接获取的数据,提供替代方案或占位逻辑

Step 4: 完整代码实现 (The Code)

  1. 输出结构清晰的代码文件
  2. 包含详细的中文注释和 Docstrings
  3. 提供配置示例和环境变量说明
  4. 附带部署建议和使用指南

技术栈规范

必选技术

领域技术选型
语言Python 3.9+
异步框架asyncio + aiohttp
ORMSQLAlchemy 2.0+
数据库PostgreSQL(生产)/ SQLite(演示)

可选技术

领域技术选型
缓存/状态Redis / 本地 JSON 文件
任务队列Celery / asyncio.Queue
日志loguru / logging
配置管理pydantic-settings / python-dotenv

项目结构规范

project/
├── models/           # SQLAlchemy 模型
│   ├── __init__.py
│   ├── base.py       # Base 类定义
│   └── entities.py   # 业务实体模型
├── crawler/          # 爬虫核心模块
│   ├── __init__.py
│   ├── manager.py    # CrawlerManager
│   ├── proxy.py      # 代理池管理
│   └── state.py      # 状态管理(断点续传)
├── utils/            # 工具函数
│   ├── __init__.py
│   └── cleaner.py    # 数据清洗
├── config.py         # 配置文件
├── main.py           # 入口文件
└── requirements.txt  # 依赖清单

代码风格规范

  1. 类型注解:所有函数必须包含类型注解
  2. 文档字符串:使用中文编写详细的 Docstrings
  3. 错误处理:使用自定义异常类,不吞没异常
  4. 日志记录:关键操作必须有日志输出
  5. 配置外置:敏感信息通过环境变量注入

注释示例

async def fetch_with_retry(
    self,
    url: str,
    max_retries: int = 3,
    retry_delay: float = 1.0
) -> Optional[Dict[str, Any]]:
    """
    带重试机制的异步请求方法。

    Args:
        url: 目标请求地址
        max_retries: 最大重试次数,默认3次
        retry_delay: 重试间隔(秒),默认1秒

    Returns:
        成功时返回解析后的JSON字典,失败时返回None

    Raises:
        CrawlerException: 当所有重试都失败时抛出
    """

数据库建模模板

本节提供 SQLAlchemy 2.0+ ORM 数据库建模的标准模板和最佳实践。当需要设计数据库模型时,参考此技能中的模板代码。

标准 Base 类模板

所有模型应继承统一的 Base 类,包含通用字段:

from datetime import datetime
from typing import Optional
from sqlalchemy import DateTime, Integer, String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    """SQLAlchemy 声明式基类,包含通用字段"""

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=func.now(),
        comment="创建时间"
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=func.now(),
        onupdate=func.now(),
        comment="更新时间"
    )

1-to-N 关系模板

用于表示层级关系(如 区县 -> 街镇 -> 小区):

from sqlalchemy import ForeignKey, String, Float
from sqlalchemy.orm import Mapped, mapped_column, relationship
from typing import List


class ParentModel(Base):
    """父级实体示例"""
    __tablename__ = "parent_table"

    name: Mapped[str] = mapped_column(String(100), nullable=False, comment="名称")
    code: Mapped[str] = mapped_column(String(20), unique=True, comment="编码")

    # 一对多关系:一个父级对应多个子级
    children: Mapped[List["ChildModel"]] = relationship(
        back_populates="parent",
        cascade="all, delete-orphan"  # 级联删除
    )


class ChildModel(Base):
    """子级实体示例"""
    __tablename__ = "child_table"

    name: Mapped[str] = mapped_column(String(100), nullable=False, comment="名称")

    # 外键关联
    parent_id: Mapped[int] = mapped_column(
        ForeignKey("parent_table.id", ondelete="CASCADE"),
        nullable=False,
        index=True,  # 为外键创建索引
        comment="父级ID"
    )

    # 反向关系
    parent: Mapped["ParentModel"] = relationship(back_populates="children")

地理数据字段模板

用于存储位置信息的小区/POI类模型:

class GeoEntity(Base):
    """包含地理信息的实体"""
    __tablename__ = "geo_entity"

    name: Mapped[str] = mapped_column(String(200), nullable=False, index=True)
    address: Mapped[Optional[str]] = mapped_column(String(500), comment="详细地址")

    # 地理坐标
    longitude: Mapped[Optional[float]] = mapped_column(Float, comment="经度")
    latitude: Mapped[Optional[float]] = mapped_column(Float, comment="纬度")

    # 来源信息
    source: Mapped[Optional[str]] = mapped_column(String(50), comment="数据来源")
    source_id: Mapped[Optional[str]] = mapped_column(String(100), comment="来源唯一ID")

状态枚举模板

用于表示数据处理状态:

from enum import Enum as PyEnum
from sqlalchemy import Enum


class CrawlStatus(PyEnum):
    """爬取状态枚举"""
    PENDING = "pending"        # 待爬取
    IN_PROGRESS = "in_progress"  # 爬取中
    COMPLETED = "completed"    # 已完成
    FAILED = "failed"          # 失败
    SKIPPED = "skipped"        # 跳过


class EntityWithStatus(Base):
    """包含状态的实体"""
    __tablename__ = "entity_with_status"

    status: Mapped[CrawlStatus] = mapped_column(
        Enum(CrawlStatus),
        default=CrawlStatus.PENDING,
        comment="爬取状态"
    )
    error_message: Mapped[Optional[str]] = mapped_column(
        String(1000),
        comment="错误信息"
    )
    retry_count: Mapped[int] = mapped_column(
        Integer,
        default=0,
        comment="重试次数"
    )

数据库会话管理

from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session


class DatabaseManager:
    """数据库连接管理器"""

    def __init__(self, database_url: str):
        self.engine = create_engine(
            database_url,
            echo=False,  # 生产环境关闭SQL日志
            pool_size=10,
            max_overflow=20
        )
        self.SessionLocal = sessionmaker(
            bind=self.engine,
            autocommit=False,
            autoflush=False
        )

    def create_tables(self):
        """创建所有表"""
        Base.metadata.create_all(self.engine)

    @contextmanager
    def get_session(self) -> Session:
        """获取数据库会话(上下文管理器)"""
        session = self.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

数据库建模最佳实践

  1. 字段命名:使用 snake_case,保持语义清晰
  2. 索引设计:外键、常用查询字段添加索引
  3. 级联删除:合理设置 ondeletecascade
  4. 注释完整:每个字段添加 comment 说明
  5. 类型注解:使用 Mapped[] 进行类型标注
  6. 可选字段:使用 Optional[] 标记可空字段

反爬虫策略模板

本节提供生产级爬虫所需的反爬虫策略和稳定性工程模板代码,包括代理池、断点续传、重试机制等核心组件。

代理池管理器

import random
import asyncio
from typing import Optional, List, Set
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import aiohttp


@dataclass
class Proxy:
    """代理实体"""
    host: str
    port: int
    protocol: str = "http"
    username: Optional[str] = None
    password: Optional[str] = None
    fail_count: int = 0
    last_used: Optional[datetime] = None
    last_fail: Optional[datetime] = None

    @property
    def url(self) -> str:
        """生成代理URL"""
        auth = ""
        if self.username and self.password:
            auth = f"{self.username}:{self.password}@"
        return f"{self.protocol}://{auth}{self.host}:{self.port}"

    def mark_failed(self):
        """标记失败"""
        self.fail_count += 1
        self.last_fail = datetime.now()

    def reset_fail_count(self):
        """重置失败计数"""
        self.fail_count = 0


class ProxyPool:
    """
    代理池管理器

    功能:
    - 代理轮换
    - 失效代理自动剔除
    - 代理健康检查
    """

    def __init__(
        self,
        max_fail_count: int = 3,
        check_interval: int = 300,
        test_url: str = "http://httpbin.org/ip"
    ):
        self.proxies: List[Proxy] = []
        self.blacklist: Set[str] = set()
        self.max_fail_count = max_fail_count
        self.check_interval = check_interval
        self.test_url = test_url
        self._lock = asyncio.Lock()

    def add_proxy(self, proxy: Proxy):
        """添加代理"""
        if proxy.url not in self.blacklist:
            self.proxies.append(proxy)

    def add_proxies_from_list(self, proxy_list: List[str]):
        """从字符串列表批量添加代理(格式: host:port 或 protocol://host:port)"""
        for proxy_str in proxy_list:
            if "://" in proxy_str:
                protocol, rest = proxy_str.split("://")
                host, port = rest.split(":")
            else:
                protocol = "http"
                host, port = proxy_str.split(":")
            self.add_proxy(Proxy(host=host, port=int(port), protocol=protocol))

    async def get_proxy(self) -> Optional[str]:
        """获取一个可用代理"""
        async with self._lock:
            available = [p for p in self.proxies if p.fail_count < self.max_fail_count]
            if not available:
                return None
            proxy = random.choice(available)
            proxy.last_used = datetime.now()
            return proxy.url

    async def report_failure(self, proxy_url: str):
        """报告代理失败"""
        async with self._lock:
            for proxy in self.proxies:
                if proxy.url == proxy_url:
                    proxy.mark_failed()
                    if proxy.fail_count >= self.max_fail_count:
                        self.blacklist.add(proxy_url)
                        self.proxies.remove(proxy)
                    break

    async def report_success(self, proxy_url: str):
        """报告代理成功"""
        async with self._lock:
            for proxy in self.proxies:
                if proxy.url == proxy_url:
                    proxy.reset_fail_count()
                    break

    async def health_check(self, timeout: int = 10) -> int:
        """健康检查所有代理,返回可用代理数量"""
        async def check_single(proxy: Proxy) -> bool:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(
                        self.test_url,
                        proxy=proxy.url,
                        timeout=aiohttp.ClientTimeout(total=timeout)
                    ) as resp:
                        return resp.status == 200
            except Exception:
                return False

        tasks = [check_single(p) for p in self.proxies]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        valid_count = 0
        async with self._lock:
            for proxy, is_valid in zip(self.proxies[:], results):
                if is_valid is True:
                    proxy.reset_fail_count()
                    valid_count += 1
                else:
                    proxy.mark_failed()

        return valid_count

断点续传状态管理器

import json
import os
from typing import Dict, Set, Any, Optional
from datetime import datetime
from pathlib import Path


class StateManager:
    """
    爬虫状态管理器(支持断点续传)

    功能:
    - 记录已完成的任务ID
    - 保存爬取进度
    - 支持文件持久化
    """

    def __init__(self, state_file: str = "crawler_state.json"):
        self.state_file = Path(state_file)
        self.completed_ids: Set[str] = set()
        self.progress: Dict[str, Any] = {}
        self.metadata: Dict[str, Any] = {}
        self._load_state()

    def _load_state(self):
        """从文件加载状态"""
        if self.state_file.exists():
            with open(self.state_file, "r", encoding="utf-8") as f:
                data = json.load(f)
                self.completed_ids = set(data.get("completed_ids", []))
                self.progress = data.get("progress", {})
                self.metadata = data.get("metadata", {})

    def save_state(self):
        """保存状态到文件"""
        data = {
            "completed_ids": list(self.completed_ids),
            "progress": self.progress,
            "metadata": {
                **self.metadata,
                "last_saved": datetime.now().isoformat()
            }
        }
        # 先写入临时文件,再重命名(原子操作)
        temp_file = self.state_file.with_suffix(".tmp")
        with open(temp_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        temp_file.rename(self.state_file)

    def mark_completed(self, task_id: str):
        """标记任务完成"""
        self.completed_ids.add(task_id)

    def is_completed(self, task_id: str) -> bool:
        """检查任务是否已完成"""
        return task_id in self.completed_ids

    def update_progress(self, key: str, value: Any):
        """更新进度信息"""
        self.progress[key] = value

    def get_progress(self, key: str, default: Any = None) -> Any:
        """获取进度信息"""
        return self.progress.get(key, default)

    def clear(self):
        """清除所有状态"""
        self.completed_ids.clear()
        self.progress.clear()
        if self.state_file.exists():
            self.state_file.unlink()

请求重试装饰器

import asyncio
import functools
from typing import TypeVar, Callable, Any
import logging

logger = logging.getLogger(__name__)

T = TypeVar("T")


def retry_async(
    max_retries: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: tuple = (Exception,)
) -> Callable:
    """
    异步重试装饰器

    Args:
        max_retries: 最大重试次数
        delay: 初始延迟(秒)
        backoff: 退避系数(每次重试延迟乘以此系数)
        exceptions: 需要重试的异常类型

    Usage:
        @retry_async(max_retries=3, delay=1.0)
        async def fetch_data(url):
            ...
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            current_delay = delay
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_retries:
                        logger.warning(
                            f"第 {attempt + 1}/{max_retries + 1} 次尝试失败: {e}. "
                            f"{current_delay:.1f}秒后重试..."
                        )
                        await asyncio.sleep(current_delay)
                        current_delay *= backoff
                    else:
                        logger.error(f"所有 {max_retries + 1} 次尝试均失败: {e}")

            raise last_exception

        return wrapper
    return decorator

User-Agent 轮换

import random


class UserAgentRotator:
    """User-Agent 轮换器"""

    # 常用桌面浏览器 UA
    DESKTOP_UAS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
    ]

    # 移动端 UA
    MOBILE_UAS = [
        "Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1",
        "Mozilla/5.0 (Linux; Android 14; Pixel 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
    ]

    def __init__(self, include_mobile: bool = False):
        self.user_agents = self.DESKTOP_UAS.copy()
        if include_mobile:
            self.user_agents.extend(self.MOBILE_UAS)

    def get_random(self) -> str:
        """获取随机 User-Agent"""
        return random.choice(self.user_agents)

    def get_headers(self) -> dict:
        """获取带随机 UA 的请求头"""
        return {
            "User-Agent": self.get_random(),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
        }

请求频率限制器

import asyncio
import time
from typing import Optional


class RateLimiter:
    """
    令牌桶限流器

    用于控制请求频率,避免触发反爬虫机制
    """

    def __init__(self, rate: float, burst: int = 1):
        """
        Args:
            rate: 每秒允许的请求数
            burst: 突发容量(令牌桶大小)
        """
        self.rate = rate
        self.burst = burst
        self.tokens = burst
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, timeout: Optional[float] = None) -> bool:
        """
        获取一个令牌

        Args:
            timeout: 超时时间(秒),None 表示无限等待

        Returns:
            是否成功获取令牌
        """
        start_time = time.monotonic()

        while True:
            async with self._lock:
                now = time.monotonic()
                # 补充令牌
                elapsed = now - self.last_update
                self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
                self.last_update = now

                if self.tokens >= 1:
                    self.tokens -= 1
                    return True

            # 检查超时
            if timeout is not None:
                elapsed = time.monotonic() - start_time
                if elapsed >= timeout:
                    return False

            # 等待令牌
            wait_time = (1 - self.tokens) / self.rate
            await asyncio.sleep(min(wait_time, 0.1))

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *args):
        pass

反爬虫策略最佳实践

  1. 代理池初始化:启动时进行一次健康检查,剔除无效代理
  2. 状态定期保存:每处理 N 个任务或每隔 M 分钟保存一次状态
  3. 限流参数调优:根据目标网站的承受能力调整请求频率
  4. 异常分类处理:区分临时错误(网络超时)和永久错误(404)
  5. 日志完整记录:记录每次请求的代理、状态和耗时

反爬虫策略清单

在设计爬虫时,必须考虑以下防护措施:

  1. User-Agent 轮换:维护 UA 池,每次请求随机选择
  2. 代理 IP 轮换:支持多种代理源,自动剔除失效代理
  3. 请求频率控制:实现令牌桶或漏桶算法限流
  4. Cookie 管理:支持 Session 持久化和 Cookie 刷新
  5. 验证码处理:预留验证码识别接口
  6. 请求头伪装:模拟真实浏览器请求头
  7. 断点续传:记录爬取进度,支持中断后恢复

特殊场景处理

无法获取的数据

当目标数据(如楼栋信息)无法直接从 API 获取时:

  1. 在数据库模型中预留完整字段结构
  2. 生成占位数据或默认值
  3. 在代码中添加 TODO 注释说明
  4. 在部署建议中说明后续补充方案

API Key 处理

  • 代码中使用环境变量占位:os.getenv("API_KEY")
  • 提供 .env.example 示例文件
  • 不在代码中硬编码任何密钥

输出格式要求

代码输出

  • 使用 Markdown 代码块,标注语言类型
  • 单文件输出时,使用清晰的分隔注释
  • 多文件输出时,明确标注文件路径

部署建议

每次完成代码后,必须附带:

  1. 环境配置:Python 版本、依赖安装命令
  2. 数据库配置:连接字符串格式、表创建方式
  3. 代理池配置:推荐的代理服务商或自建方案
  4. 运行命令:启动爬虫的具体命令
  5. 注意事项:法律合规、频率限制等提醒

职业道德提醒

在提供爬虫方案时,必须提醒用户:

  1. 遵守目标网站的 robots.txt 规则
  2. 控制请求频率,避免对目标服务器造成压力
  3. 仅采集公开数据,不触犯隐私法规
  4. 遵守相关法律法规和平台服务条款