#!/usr/bin/env python3
"""
cn-last30days: topic research tool for Chinese social media platforms
======================================================================
Searches Xiaohongshu, Douyin and WeChat Official Accounts for what people
actually said about a topic during the last 30 days.

Usage:
    python cn_last30days.py "AI视频工具"
    python cn_last30days.py "大模型" --output-format html
    python cn_last30days.py "小红书运营" --platforms xhs,gzh
"""

from __future__ import annotations

# NOTE: some of these names are not used in this part of the file; they are
# kept because later sections of the module may rely on them.
import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib.parse import quote

# Windows consoles do not default to UTF-8; reconfigure the standard streams
# so the Chinese text and emoji emitted below do not raise encoding errors.
if os.name == "nt":
    for stream in (sys.stdout, sys.stderr):
        if hasattr(stream, "reconfigure"):
            stream.reconfigure(encoding="utf-8", errors="replace")

# ─── Constants ─────────────────────────────────────────────────────────────

API_BASE = "https://redfox.hk/story/api"

# Per-platform request configuration. `list_key` names the field of the
# response `data` object that holds the result list. Dates are sent for every
# platform regardless of `requires_dates` (see `_fetch_platform`).
PLATFORMS = {
    "xhs": {
        "endpoint": "/xhs/crawl/work",
        "label": "小红书",
        "source": "多平台话题研究-xhs-ClawHub",
        "list_key": "articles",
        "requires_dates": True,
    },
    "dy": {
        "endpoint": "/dy/search/search",
        "label": "抖音",
        "source": "多平台话题研究-dy-ClawHub",
        "list_key": "articles",
        "requires_dates": False,
    },
    "gzh": {
        "endpoint": "/gzh/search/hotArticle",
        "label": "公众号",
        "source": "多平台话题研究-gzh-ClawHub",
        "list_key": "articles",
        "requires_dates": True,
    },
}

DEFAULT_COUNT = 50
SOURCE_LABEL = "多平台话题研究-GitHub"

# Upper bound on concurrent platform requests issued by `search`.
_MAX_WORKERS = 3

# ─── API key ───────────────────────────────────────────────────────────────

# Shared key shipped with the tool; used only when the caller supplies no
# key of their own.
PUBLIC_API_KEY = "ak_db0e200c049b44288d46da0e758d53dd"


class InsufficientCreditsError(Exception):
    """Raised when the API reports that the key has run out of credits."""


def get_api_key(cli_key: str | None = None) -> str:
    """Resolve the API key to use.

    Priority: command-line value > environment variable
    (``REDFOX_API_KEY``, then ``X_API_KEY``) > built-in public key.

    The environment is consulted before the built-in key so that a personal
    key exported by the user actually takes effect; this is what the
    out-of-credits hint printed by `_fetch_platform` tells users to do.

    Args:
        cli_key: Key passed explicitly by the caller, if any.

    Returns:
        The selected key, or an empty string when none is available.
    """
    if cli_key:
        return cli_key

    for env_name in ("REDFOX_API_KEY", "X_API_KEY"):
        val = os.environ.get(env_name, "").strip()
        if val:
            return val

    # An empty string lets the caller detect "no key" and prompt the user.
    return PUBLIC_API_KEY or ""


# ─── Count parsing ─────────────────────────────────────────────────────────

# Unit suffixes recognised by `parse_count`, checked after the "w" shorthand.
_COUNT_SUFFIXES = (
    ("万", 10_000),
    ("亿", 100_000_000),
    ("k", 1_000),
    ("千", 1_000),
)


def parse_count(value: Any) -> int:
    """Parse a count field into an int.

    Accepts numbers and strings such as ``'1.2w'``, ``'5000+'``, ``'3万'``,
    ``'1.5亿'``, ``'2k'`` or ``'1,234'``. Anything that cannot be parsed,
    including non-finite values like ``inf``/``nan``, yields 0.

    Args:
        value: Raw value from an API record.

    Returns:
        The parsed count, truncated toward zero.
    """
    if value is None:
        return 0
    try:
        if isinstance(value, (int, float)):
            return int(value)

        text = str(value).replace("+", "").replace(",", "").strip().lower()
        if not text:
            return 0
        if "w" in text:
            return int(float(text.replace("w", "")) * 10_000)
        for suffix, factor in _COUNT_SUFFIXES:
            if text.endswith(suffix):
                return int(float(text[: -len(suffix)]) * factor)
        return int(float(text))
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers int(inf); ValueError covers int(nan) and
        # unparseable text.
        return 0


def fuzzy_count(value: Any) -> str:
    """Render a count in coarse form for display.

    Values below 5000 are shown exactly, 5000-9999 as ``'5000+'``, and
    larger values as ``'<N>w+'`` where N is the number of whole ten-thousands.
    Zero, negative or unparseable values are shown as ``'--'``.
    """
    num = parse_count(value)
    if num <= 0:
        return "--"
    if num < 5000:
        return str(num)
    if num < 10000:
        return "5000+"
    wan = num // 10000
    return f"{wan}w+"


# ─── HTTP ──────────────────────────────────────────────────────────────────

def _http_post(url: str, payload: dict, api_key: str, max_retries: int = 3) -> dict:
    """POST `payload` as JSON and return the decoded response, with retries.

    Response codes handled (as read from the ``code`` field of the body):
    3108 is treated as rate limiting and retried after a growing delay,
    3201 means the key is out of credits and is never retried, and any code
    other than 200/2000 is treated as a failure and retried with
    exponential backoff, as are HTTP and network errors.

    Args:
        url: Full endpoint URL.
        payload: JSON-serialisable request body.
        api_key: Value sent in the ``X-API-KEY`` header.
        max_retries: Maximum number of attempts.

    Returns:
        The decoded JSON response.

    Raises:
        InsufficientCreditsError: The API reported code 3201.
        Exception: All attempts failed; the message carries the last error.
    """
    headers = {
        "Content-Type": "application/json",
        "X-API-KEY": api_key,
        "User-Agent": "cn-last30days/1.0",
    }
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")

    last_error = None
    for attempt in range(max_retries):
        is_last_attempt = attempt >= max_retries - 1
        try:
            req = urllib.request.Request(url, data=body, headers=headers, method="POST")
            with urllib.request.urlopen(req, timeout=30) as resp:
                raw = resp.read().decode("utf-8")
            result = json.loads(raw)

            code = result.get("code")
            if code == 3108:
                # Rate limited: remember why, so that exhausting the retries
                # does not report "None", and skip the pointless final sleep.
                last_error = f"限频 code={code}: {result.get('msg', '未知')}"
                if not is_last_attempt:
                    time.sleep(5 * (attempt + 1))
                continue
            if code == 3201:
                raise InsufficientCreditsError(result.get("msg", "积分不足"))
            if code not in (200, 2000):
                raise Exception(f"API 错误 code={code}: {result.get('msg', '未知')}")
            return result
        except InsufficientCreditsError:
            # Must bypass the generic handler below, otherwise it would be
            # retried and re-raised as a plain Exception, hiding it from
            # callers that catch InsufficientCreditsError.
            raise
        except urllib.error.HTTPError as e:
            last_error = f"HTTP {e.code}"
        except urllib.error.URLError as e:
            last_error = f"网络错误: {e.reason}"
        except Exception as e:
            last_error = str(e)

        if not is_last_attempt:
            time.sleep(2 ** attempt)

    raise Exception(f"请求失败: {last_error}(已尝试 {max_retries} 次)")


# ─── Platform fetching ─────────────────────────────────────────────────────

def _date_window(days: int) -> tuple[str, str]:
    """Return ``(start, end)`` as ``YYYY-MM-DD`` strings, ending today (local time)."""
    today = datetime.now()
    start = today - timedelta(days=days)
    return start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d")


def _fetch_platform(platform_key: str, keyword: str, count: int, api_key: str, days: int = 30) -> dict:
    """Fetch and normalise results for one platform.

    Progress and failures are written to stderr. Request failures are not
    raised: the function returns whatever was collected, and sets an
    ``error`` entry when the key is out of credits.

    Args:
        platform_key: Key into `PLATFORMS`.
        keyword: Search keyword.
        count: Maximum number of items to return.
        api_key: API key for the request.
        days: Size of the look-back window in days.

    Returns:
        Dict with ``platform``, ``label``, ``items``, ``total`` and,
        optionally, ``error``.

    Raises:
        KeyError: `platform_key` is not in `PLATFORMS`.
    """
    plat = PLATFORMS[platform_key]
    url = f"{API_BASE}{plat['endpoint']}"
    label = plat["label"]

    sys.stderr.write(f"[{label}] 搜索中...\n")
    sys.stderr.flush()

    # Dates are sent to every platform: required by some, and used as an
    # optional filter by the others.
    start_date, end_date = _date_window(days)
    payload = {
        "keyword": keyword,
        "source": SOURCE_LABEL,
        "startDate": start_date,
        "endDate": end_date,
    }
    if platform_key == "xhs":
        payload["sortType"] = "_0"  # sort by relevance

    credit_error = False
    all_articles = []
    seen_ids = set()

    try:
        response = _http_post(url, payload, api_key)
        data = response.get("data") or {}

        # `data` is normally an object holding the list under `list_key`,
        # but a bare list is accepted too.
        if isinstance(data, dict):
            articles = data.get(plat.get("list_key", "articles")) or []
        elif isinstance(data, list):
            articles = data
        else:
            articles = []

        for art in articles:
            if not isinstance(art, dict):
                continue
            uid = (
                art.get("workUuid")
                or art.get("uuid")
                or art.get("id")
                or art.get("noteId")
                or ""
            )
            # Records without any id cannot be deduplicated and are kept.
            if uid:
                if uid in seen_ids:
                    continue
                seen_ids.add(uid)
            all_articles.append(_normalize_article(art, platform_key, len(all_articles) + 1))
            if len(all_articles) >= count:
                break
    except InsufficientCreditsError as e:
        sys.stderr.write(f"[{label}] ⚠️ {e}\n")
        sys.stderr.write(f"[{label}] 请配置个人 API Key: export REDFOX_API_KEY=你的密钥\n")
        sys.stderr.write(f"[{label}] 注册地址: https://www.redfox.hk/login\n")
        sys.stderr.flush()
        credit_error = True
    except Exception as e:
        # Best effort: report the failure and return what was collected.
        sys.stderr.write(f"[{label}] 请求失败: {e}\n")
        sys.stderr.flush()

    sys.stderr.write(f"[{label}] 获取 {len(all_articles)} 条\n")
    sys.stderr.flush()

    items = all_articles[:count]
    summary = {
        "platform": platform_key,
        "label": label,
        "items": items,
        "total": len(items),
    }
    if credit_error:
        summary["error"] = "积分不足,请配置个人 API Key"
    return summary


def _first_of(art: dict, *keys: str, default: Any = None) -> Any:
    """Return the first value among `keys` that is not None, ``""`` or 0.

    Returns `default` when no key holds such a value.
    """
    for k in keys:
        v = art.get(k)
        if v is not None and v != "" and v != 0:
            return v
    return default


def _clip(value: Any, limit: int) -> str:
    """Return `value` as text truncated to `limit` characters ("" if falsy)."""
    return str(value)[:limit] if value else ""


def _format_publish_time(value: Any) -> str:
    """Return a publish time as text, converting numeric epoch timestamps.

    Numbers above 1e12 are read as epoch milliseconds and numbers above 1e9
    as epoch seconds; both are formatted as local ``YYYY-MM-DD HH:MM:SS``.
    Everything else is returned via ``str()``.
    """
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return str(value)
    if value > 1_000_000_000_000:
        seconds = value / 1000.0
    elif value > 1_000_000_000:
        seconds = float(value)
    else:
        return str(value)
    try:
        return datetime.fromtimestamp(seconds).strftime("%Y-%m-%d %H:%M:%S")
    except (OSError, ValueError, OverflowError):
        return str(value)


# Candidate source keys for each engagement metric, per platform, in lookup
# priority order. Shared by the numeric `engagement` dict and the display
# string so the two can never disagree about which field they read.
_ENGAGEMENT_FIELDS = {
    "xhs": {
        "likes": ("workLikedCount", "likedCount", "likeCount"),
        "comments": ("workCommentsCount", "commentsCount", "commentCount"),
        "collects": ("workCollectedCount", "collectedCount", "collectCount"),
        "shares": ("workSharedCount", "sharedCount", "shareCount"),
        "interactions": ("interactiveCount",),
    },
    "dy": {
        "likes": ("workLikedCount", "likeCount", "likedCount"),
        "comments": ("workCommentsCount", "commentCount", "commentsCount"),
        "collects": ("workCollectedCount", "collectCount", "collectedCount"),
        "shares": ("workSharedCount", "shareCount", "sharedCount"),
    },
    "gzh": {
        "reads": ("clicksCount", "readCount"),
        "likes": ("likeCount", "likedCount"),
        "watches": ("watchCount",),
        "collects": ("collectCount", "collectedCount"),
        "shares": ("shareCount", "sharedCount"),
        "comments": ("commentsCount", "commentCount"),
    },
}

# Display templates, filled with `fuzzy_count` output keyed by metric name.
_ENGAGEMENT_TEMPLATES = {
    "xhs": "🔥{interactions}互动 👍{likes} ⭐{collects} 💬{comments}",
    "dy": "👍{likes} 💬{comments} ⭐{collects} 🔄{shares}",
    "gzh": "📖{reads} 👍{likes} 👁{watches} 💬{comments} 🔄{shares}",
}


def _engagement_raw(art: dict, platform: str) -> dict:
    """Return the raw (unparsed) value of every engagement metric for `platform`."""
    fields = _ENGAGEMENT_FIELDS.get(platform, {})
    return {metric: _first_of(art, *keys, default=0) for metric, keys in fields.items()}


def _engagement_counts(art: dict, platform: str) -> dict:
    """Return the engagement metrics for `platform` parsed to ints."""
    return {metric: parse_count(raw) for metric, raw in _engagement_raw(art, platform).items()}


def _normalize_article(art: dict, platform: str, idx: int) -> dict:
    """Convert a raw platform record into the common item format.

    Records for an unrecognised `platform` are returned unchanged.
    """
    normalizers = {
        "xhs": _normalize_xhs,
        "dy": _normalize_dy,
        "gzh": _normalize_gzh,
    }
    normalizer = normalizers.get(platform)
    if normalizer is None:
        return art
    return normalizer(art, idx)


def _normalize_xhs(art: dict, idx: int) -> dict:
    """Normalise a Xiaohongshu record.

    Accepts both the ``work*``-prefixed field names and the plain ones.
    """
    note_id = str(_first_of(art, "workId", "id", "noteId", "workUuid", "uuid", default=""))
    author_id = str(_first_of(art, "accountUserid", "authorId", "accountId", default=""))

    title_raw = _first_of(art, "workTitle", "title", "displayTitle", default="")
    desc_raw = _first_of(art, "workDesc", "desc", "displayDesc", "summary", default="")
    # Notes without a title fall back to their description.
    title = _clip(title_raw or desc_raw or "无标题", 200)
    desc = _clip(desc_raw, 500)

    # Prefer a link supplied by the API; otherwise build one from the note id.
    note_link = _first_of(art, "workUrl", "shareInfoLink", "url", default="")
    if not note_link and note_id:
        xsec_token = art.get("xsecToken", "")
        if xsec_token:
            note_link = f"https://www.xiaohongshu.com/explore/{note_id}?xsec_token={xsec_token}"
        else:
            note_link = f"https://www.xiaohongshu.com/explore/{note_id}"
    author_link = f"https://www.xiaohongshu.com/user/profile/{author_id}" if author_id else ""

    author_name = _first_of(
        art, "accountNickname", "authorNickname", "author", "accountName", "nickname",
        default="未知",
    )
    pub_time = _first_of(art, "workPublishTime", "createTime", "publishTime", "time", default="")
    cover = _first_of(art, "coverUrl", "cover", default="")
    account_type = _first_of(art, "accountType", default="")
    work_type = _first_of(art, "workType", "noteType", default="")

    return {
        "id": f"XHS{idx}",
        "platform": "小红书",
        "platform_key": "xhs",
        "title": title,
        "desc": desc,
        "url": note_link,
        "author": author_name,
        "author_id": author_id,
        "author_link": author_link,
        "author_fans": fuzzy_count(_first_of(art, "authorFans", "followerCount", default=0)),
        "published_at": _format_publish_time(pub_time),
        "engagement": _engagement_counts(art, "xhs"),
        "engagement_display": _engagement_display(art, "xhs"),
        "cover": cover,
        "scores": _extract_scores(art),
        "account_type": account_type,
        "work_type": work_type,
    }


def _normalize_dy(art: dict, idx: int) -> dict:
    """Normalise a Douyin record."""
    work_url = _first_of(art, "workUrl", "url", default="")
    title_raw = _first_of(art, "title", "desc", default="")
    desc_raw = _first_of(art, "desc", "summary", default="")
    author_name = _first_of(art, "accountName", "author", "authorNickname", default="未知")
    author_id = str(_first_of(art, "accountId", "authorId", default=""))
    pub_time = _first_of(art, "publishTime", "createTime", default="")
    cover = _first_of(art, "cover", "coverUrl", default="")

    return {
        "id": f"DY{idx}",
        "platform": "抖音",
        "platform_key": "dy",
        "title": _clip(title_raw or "无标题", 200),
        "desc": _clip(desc_raw, 500),
        "url": work_url,
        "author": author_name,
        "author_id": author_id,
        "author_link": f"https://www.douyin.com/user/{author_id}" if author_id else "",
        "author_fans": fuzzy_count(_first_of(art, "followerCount", "authorFans", default=0)),
        "published_at": _format_publish_time(pub_time),
        "engagement": _engagement_counts(art, "dy"),
        "engagement_display": _engagement_display(art, "dy"),
        "cover": cover,
        "scores": _extract_scores(art),
    }


def _normalize_gzh(art: dict, idx: int) -> dict:
    """Normalise a WeChat Official Account article record."""
    url = _first_of(art, "url", "workUrl", default="")
    summary = _first_of(art, "summary", "desc", default="")
    author_name = _first_of(art, "author", "accountName", default="-")
    author_id = str(_first_of(art, "accountId", "authorId", default=""))
    pub_time = _first_of(art, "publicTime", "publishTime", "createTime", default="")
    cover = _first_of(art, "imageUrl", "coverUrl", "cover", default="")

    return {
        "id": f"GZH{idx}",
        "platform": "公众号",
        "platform_key": "gzh",
        "title": _clip(art.get("title") or "无标题", 200),
        "desc": _clip(summary, 500),
        "url": url,
        "author": author_name,
        "author_id": author_id,
        "author_link": "",
        "author_fans": fuzzy_count(_first_of(art, "followerCount", "authorFans", default=0)),
        "published_at": _format_publish_time(pub_time),
        "engagement": _engagement_counts(art, "gzh"),
        "engagement_display": _engagement_display(art, "gzh"),
        "cover": cover,
        "scores": _extract_scores(art),
    }


def _engagement_display(art: dict, platform: str) -> str:
    """Build the human-readable engagement string for a raw record.

    Returns an empty string for an unrecognised `platform`.
    """
    template = _ENGAGEMENT_TEMPLATES.get(platform)
    if template is None:
        return ""
    shown = {metric: fuzzy_count(raw) for metric, raw in _engagement_raw(art, platform).items()}
    return template.format(**shown)


def _extract_scores(art: dict) -> dict:
    """Extract the score fields of a record, defaulting each to 0."""
    return {
        "total": art.get("totalScore", 0),
        "relevance": art.get("relevanceScore", 0),
        "popularity": art.get("popularityScore", 0),
        "recency": art.get("recencyScore", 0),
    }


# ─── Main search entry point ───────────────────────────────────────────────

def search(
    keyword: str,
    platforms: list[str] | None = None,
    count: int = DEFAULT_COUNT,
    api_key: str | None = None,
    days: int = 30,
) -> dict:
    """Search several platforms for a keyword and aggregate the results.

    Platforms are queried concurrently. Results are keyed by platform in the
    order requested (duplicates removed). A platform that fails, or whose
    key is not in `PLATFORMS`, gets an entry with an ``error`` field and no
    items instead of aborting the whole search.

    Args:
        keyword: Search keyword.
        platforms: Platform keys to query; all of `PLATFORMS` when empty.
        count: Maximum number of items per platform.
        api_key: Explicit API key; resolved through `get_api_key`.
        days: Size of the look-back window in days.

    Returns:
        Dict with ``keyword``, ``searched_at`` (UTC ISO timestamp),
        ``date_range``, ``platforms`` and ``total_items``.
    """
    if not platforms:
        platforms = list(PLATFORMS.keys())
    requested = list(dict.fromkeys(platforms))
    known = [p for p in requested if p in PLATFORMS]

    key = get_api_key(api_key)

    futures = {}
    if known:
        with ThreadPoolExecutor(max_workers=min(_MAX_WORKERS, len(known))) as executor:
            futures = {
                p: executor.submit(_fetch_platform, p, keyword, count, key, days)
                for p in known
            }
        # Leaving the `with` block waits for every request to finish.

    results = {}
    for p in requested:
        future = futures.get(p)
        if future is None:
            results[p] = {
                "platform": p,
                "label": p,
                "items": [],
                "total": 0,
                "error": f"未知平台: {p}",
            }
            continue
        try:
            results[p] = future.result()
        except Exception as e:
            results[p] = {
                "platform": p,
                "label": PLATFORMS[p]["label"],
                "items": [],
                "total": 0,
                "error": str(e),
            }

    total_items = sum(r["total"] for r in results.values())
    # Report the same window that was sent in the requests.
    start_date, end_date = _date_window(days)

    return {
        "keyword": keyword,
        "searched_at": datetime.now(timezone.utc).isoformat(),
        "date_range": {
            "from": start_date,
            "to": end_date,
        },
        "platforms": results,
        "total_items": total_items,
    }


# ─── JSON output ───────────────────────────────────────────────────────────

def format_as_json(data: dict, max_items: int = 50) -> dict:
    """Build a trimmed-down copy of a `search` result for downstream analysis.

    Keeps at most `max_items` items per platform, a fixed subset of each
    item's fields, and truncates descriptions to 200 characters.

    Args:
        data: Result dict as returned by `search`.
        max_items: Maximum number of items kept per platform.

    Returns:
        A new dict; `data` is not modified.
    """
    output = {
        "keyword": data["keyword"],
        "searched_at": data["searched_at"],
        "date_range": data["date_range"],
        "total_items": data["total_items"],
        "platforms": {},
    }

    for pkey, pdata in data["platforms"].items():
        items = [
            {
                "id": item["id"],
                "platform": item["platform"],
                "title": item["title"],
                "author": item["author"],
                "author_fans": item["author_fans"],
                "published_at": item["published_at"],
                "engagement_display": item["engagement_display"],
                "engagement": item["engagement"],
                "url": item["url"],
                "desc": item["desc"][:200],
                "scores": item.get("scores", {}),
            }
            for item in pdata.get("items", [])[:max_items]
        ]
        entry = {
            "label": pdata["label"],
            "total": pdata["total"],
            "items": items,
        }
        if pdata.get("error"):
            entry["error"] = pdata["error"]
        output["platforms"][pkey] = entry

    return output


# ─── HTML report ───────────────────────────────────────────────────────────

def _md_to_html(text: str) -> str:
    """简易 Markdown → HTML 转换(无第三方依赖)"""
import re lines = text.split("\n") out = [] in_list = False for line in lines: stripped = line.strip() # 标题 if stripped.startswith("### "): if in_list: out.append(""); in_list = False out.append(f'
{_md_inline(stripped)}
') if in_list: out.append("") return "\n".join(out) def _md_inline(text: str) -> str: """行内 Markdown 转换:粗体、链接""" import re text = text.replace("&", "&").replace("<", "<").replace(">", ">") # [text](url) text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', text) # **bold** text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) return text def format_as_html(data: dict, max_items: int = 50, report_html: str = "") -> str: """生成网站风格 HTML 报告""" keyword = data["keyword"] total = data["total_items"] date_range = data["date_range"] # 平台配色和图标 platform_meta = { "xhs": {"primary": "#ff2442", "bg": "#fff1f0", "icon": "📕", "name": "小红书"}, "dy": {"primary": "#161823", "bg": "#f5f5f5", "icon": "🎵", "name": "抖音"}, "gzh": {"primary": "#07c160", "bg": "#f0fff4", "icon": "📖", "name": "公众号"}, } # 统计卡片 stats_html = "" for pkey, pdata in data["platforms"].items(): meta = platform_meta.get(pkey, platform_meta["xhs"]) ptotal = pdata["total"] # 统计总互动 total_likes = sum(it.get("engagement", {}).get("likes", 0) for it in pdata.get("items", [])[:max_items]) total_reads = sum( it.get("engagement", {}).get("reads", 0) + it.get("engagement", {}).get("likes", 0) + it.get("engagement", {}).get("collects", 0) + it.get("engagement", {}).get("shares", 0) + it.get("engagement", {}).get("comments", 0) for it in pdata.get("items", [])[:max_items] ) stats_html += f'''' + desc_escaped + '
' if desc_escaped else ''}未查询到相关内容,建议更换关键词重试。