#!/usr/bin/env python3 """ stock-feed: A股社媒短讯研究工具 ========================================== 从小红书、抖音、公众号三大平台搜索A股相关短讯, 默认传入17个A股核心关键词,默认查询近7天数据。 Usage: python stock_feed.py python stock_feed.py --days 30 python stock_feed.py --keyword "半导体,芯片" python stock_feed.py --output-format html """ from __future__ import annotations import argparse import json import os import sys import time from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any from urllib.parse import quote # 强制 stdout/stderr UTF-8 编码(兼容所有平台) for _stream in (sys.stdout, sys.stderr): if hasattr(_stream, "reconfigure"): try: _stream.reconfigure(encoding="utf-8", errors="replace") except Exception: pass # ─── 常量 ────────────────────────────────────────────────────────────────────────── API_BASE = "https://redfox.hk/story/api/multiPlatform/workSearch" PLATFORMS = { "xhs": { "label": "小红书", "result_key": "xhsResult", }, "dy": { "label": "抖音", "result_key": "dyResult", }, "gzh": { "label": "公众号", "result_key": "gzhResult", }, } DEFAULT_COUNT = 50 DEFAULT_DAYS = 7 SOURCE_LABEL = "A股社媒资讯-GitHub" # A股默认关键词(17个) DEFAULT_KEYWORDS = ( "A股,A股市场,A股大盘,A股分析,股票,涨停,涨跌," "潜力股,A股龙头,A股复盘,选股,加仓,调仓,补仓," "拆股,仓位管理,A股行情" ) # ─── API Key ──────────────────────────────────────────────────────────────────────── class InsufficientCreditsError(Exception): """API 积分不足错误""" pass def get_api_key(cli_key: str | None = None) -> str: """按优先级获取 API Key: 命令行 > 环境变量 > 配置文件""" if cli_key: return cli_key # 环境变量 for env_name in ("REDFOX_API_KEY", "X_API_KEY"): val = os.environ.get(env_name, "").strip() if val: return val # 配置文件 config_path = os.path.expanduser("~/.qoder/apis/redfox.json") if os.path.isfile(config_path): try: with open(config_path) as f: cfg = json.load(f) val = (cfg.get("api_key") or "").strip() if val: return val except Exception: pass return "" # ─── 数量解析 ─────────────────────────────────────────────────────────────────────── def parse_count(value: Any) -> int: """解析数量字段,支持 '1.2w'、'5000+' 等中文格式""" if value is None: return 0 if isinstance(value, (int, float)): return int(value) text = str(value).replace("+", "").replace(",", "").strip() if not text: return 0 try: if "w" in text.lower(): return int(float(text.lower().replace("w", "")) * 10000) if text.endswith("万"): return int(float(text[:-1]) * 10000) if text.endswith("亿"): return int(float(text[:-1]) * 100000000) return int(float(text)) except (TypeError, ValueError): return 0 def fuzzy_count(value: Any) -> str: """模糊化互动数,5000以下保留原始值""" num = parse_count(value) if num <= 0: return "--" if num < 5000: return str(num) if num < 10000: return "5000+" wan = num // 10000 return f"{wan}w+" # ─── HTTP 请求 ────────────────────────────────────────────────────────────────────── def _http_post(url: str, payload: dict, api_key: str, max_retries: int = 3) -> dict: """带重试的 HTTP POST 请求""" import urllib.request import urllib.error headers = { "Content-Type": "application/json", "X-API-KEY": api_key, "User-Agent": "stock-feed/1.0", } body = json.dumps(payload, ensure_ascii=False).encode("utf-8") last_error = None for attempt in range(max_retries): try: req = urllib.request.Request(url, data=body, headers=headers, method="POST") with urllib.request.urlopen(req, timeout=30) as resp: raw = resp.read().decode("utf-8") result = json.loads(raw) code = result.get("code") if code == 3108: time.sleep(5 * (attempt + 1)) continue if code == 3201: raise InsufficientCreditsError(result.get("msg", "积分不足")) if code not in (200, 2000): raise Exception(f"API 错误 code={code}: {result.get('msg', '未知')}") return result except urllib.error.HTTPError as e: last_error = f"HTTP {e.code}" if attempt < max_retries - 1: time.sleep(2 ** attempt) except urllib.error.URLError as e: last_error = f"网络错误: {e.reason}" if attempt < max_retries - 1: time.sleep(2 ** attempt) except Exception as e: last_error = str(e) if attempt < max_retries - 1: time.sleep(2 ** attempt) raise Exception(f"请求失败: {last_error}(已尝试 {max_retries} 次)") def _first_of(art: dict, *keys: str, default: Any = None) -> Any: """从文章字典中按优先级取第一个非空值""" for k in keys: v = art.get(k) if v is not None and v != "" and v != 0: return v return default def _normalize_article(art: dict, platform: str, idx: int) -> dict: """将不同平台的数据归一化为统一格式""" if platform == "xhs": return _normalize_xhs(art, idx) elif platform == "dy": return _normalize_dy(art, idx) elif platform == "gzh": return _normalize_gzh(art, idx) return art def _normalize_xhs(art: dict, idx: int) -> dict: """归一化小红书数据""" note_id = str(_first_of(art, "workId", "id", "noteId", "workUuid", "uuid", default="")) author_id = str(_first_of(art, "accountUserid", "authorId", "accountId", default="")) title_raw = _first_of(art, "workTitle", "title", "displayTitle", default="") desc_raw = _first_of(art, "workDesc", "desc", "displayDesc", "summary", default="") title = (title_raw or desc_raw or "无标题")[:200] desc = (desc_raw or "")[:500] note_link = _first_of(art, "workUrl", "shareInfoLink", "url", default="") if not note_link and note_id: xsec_token = art.get("xsecToken", "") if xsec_token: note_link = f"https://www.xiaohongshu.com/explore/{note_id}?xsec_token={xsec_token}" else: note_link = f"https://www.xiaohongshu.com/explore/{note_id}" author_link = f"https://www.xiaohongshu.com/user/profile/{author_id}" if author_id else "" author_name = _first_of(art, "accountNickname", "authorNickname", "author", "accountName", "nickname", default="未知") pub_time = _first_of(art, "workPublishTime", "createTime", "publishTime", "time", default="") if isinstance(pub_time, (int, float)) and pub_time > 1000000000000: from datetime import datetime as _dt try: pub_time = _dt.fromtimestamp(pub_time / 1000.0).strftime("%Y-%m-%d %H:%M:%S") except (OSError, ValueError): pub_time = str(pub_time) cover = _first_of(art, "coverUrl", "cover", default="") account_type = _first_of(art, "accountType", default="") work_type = _first_of(art, "workType", "noteType", default="") return { "id": f"XHS{idx}", "platform": "小红书", "platform_key": "xhs", "title": title, "desc": desc, "url": note_link, "author": author_name, "author_id": author_id, "author_link": author_link, "author_fans": fuzzy_count(_first_of(art, "authorFans", "followerCount", default=0)), "published_at": str(pub_time), "engagement": { "likes": parse_count(_first_of(art, "workLikedCount", "likedCount", "likeCount", default=0)), "comments": parse_count(_first_of(art, "workCommentsCount", "commentsCount", "commentCount", default=0)), "collects": parse_count(_first_of(art, "workCollectedCount", "collectedCount", "collectCount", default=0)), "shares": parse_count(_first_of(art, "workSharedCount", "sharedCount", "shareCount", default=0)), "interactions": parse_count(_first_of(art, "interactiveCount", default=0)), }, "engagement_display": _engagement_display(art, "xhs"), "cover": cover, "scores": _extract_scores(art), "account_type": account_type, "work_type": work_type, } def _normalize_dy(art: dict, idx: int) -> dict: """归一化抖音数据""" work_url = _first_of(art, "workUrl", "url", default="") title_raw = _first_of(art, "title", "desc", default="") desc_raw = _first_of(art, "desc", "summary", default="") title = (title_raw or "无标题")[:200] desc = (desc_raw or "")[:500] author_name = _first_of(art, "accountName", "author", "authorNickname", default="未知") author_id = str(_first_of(art, "accountId", "authorId", default="")) pub_time = _first_of(art, "publishTime", "createTime", default="") cover = _first_of(art, "cover", "coverUrl", default="") return { "id": f"DY{idx}", "platform": "抖音", "platform_key": "dy", "title": title, "desc": desc, "url": work_url, "author": author_name, "author_id": author_id, "author_link": f"https://www.douyin.com/user/{author_id}" if author_id else "", "author_fans": fuzzy_count(_first_of(art, "followerCount", "authorFans", default=0)), "published_at": str(pub_time), "engagement": { "likes": parse_count(_first_of(art, "likeCount", "likedCount", default=0)), "comments": parse_count(_first_of(art, "commentCount", "commentsCount", default=0)), "collects": parse_count(_first_of(art, "collectCount", "collectedCount", default=0)), "shares": parse_count(_first_of(art, "shareCount", "sharedCount", default=0)), }, "engagement_display": _engagement_display(art, "dy"), "cover": cover, "scores": _extract_scores(art), } def _normalize_gzh(art: dict, idx: int) -> dict: """归一化公众号数据""" url = _first_of(art, "url", "workUrl", default="") title = (art.get("title") or "无标题")[:200] summary = _first_of(art, "summary", "desc", default="") author_name = _first_of(art, "author", "accountName", default="-") author_id = str(_first_of(art, "accountId", "authorId", default="")) pub_time = _first_of(art, "publicTime", "publishTime", "createTime", default="") cover = _first_of(art, "imageUrl", "coverUrl", "cover", default="") return { "id": f"GZH{idx}", "platform": "公众号", "platform_key": "gzh", "title": title, "desc": (summary or "")[:500], "url": url, "author": author_name, "author_id": author_id, "author_link": "", "author_fans": fuzzy_count(_first_of(art, "followerCount", "authorFans", default=0)), "published_at": str(pub_time), "engagement": { "reads": parse_count(_first_of(art, "clicksCount", "readCount", default=0)), "likes": parse_count(_first_of(art, "likeCount", "likedCount", default=0)), "watches": parse_count(_first_of(art, "watchCount", default=0)), "collects": parse_count(_first_of(art, "collectCount", "collectedCount", default=0)), "shares": parse_count(_first_of(art, "shareCount", "sharedCount", default=0)), "comments": parse_count(_first_of(art, "commentsCount", "commentCount", default=0)), }, "engagement_display": _engagement_display(art, "gzh"), "cover": cover, "scores": _extract_scores(art), } def _engagement_display(art: dict, platform: str) -> str: """生成可读的互动数据字符串""" if platform == "xhs": likes = fuzzy_count(_first_of(art, "workLikedCount", "likedCount", "likeCount", default=0)) comments = fuzzy_count(_first_of(art, "workCommentsCount", "commentsCount", "commentCount", default=0)) collects = fuzzy_count(_first_of(art, "workCollectedCount", "collectedCount", "collectCount", default=0)) interactions = fuzzy_count(_first_of(art, "interactiveCount", default=0)) return f"🔥{interactions}互动 👍{likes} ⭐{collects} 💬{comments}" elif platform == "dy": likes = fuzzy_count(_first_of(art, "workLikedCount", "likeCount", "likedCount", default=0)) comments = fuzzy_count(_first_of(art, "workCommentsCount", "commentCount", "commentsCount", default=0)) shares = fuzzy_count(_first_of(art, "workSharedCount", "shareCount", "sharedCount", default=0)) collects = fuzzy_count(_first_of(art, "workCollectedCount", "collectCount", "collectedCount", default=0)) return f"👍{likes} 💬{comments} ⭐{collects} 🔄{shares}" elif platform == "gzh": reads = fuzzy_count(_first_of(art, "clicksCount", "readCount", default=0)) likes = fuzzy_count(_first_of(art, "likeCount", "likedCount", default=0)) watches = fuzzy_count(_first_of(art, "watchCount", default=0)) comments = fuzzy_count(_first_of(art, "commentsCount", "commentCount", default=0)) shares = fuzzy_count(_first_of(art, "shareCount", "sharedCount", default=0)) return f"📖{reads} 👍{likes} 👁{watches} 💬{comments} 🔄{shares}" return "" def _extract_scores(art: dict) -> dict: """提取评分字段""" return { "total": art.get("totalScore", 0), "relevance": art.get("relevanceScore", 0), "popularity": art.get("popularityScore", 0), "recency": art.get("recencyScore", 0), } # ─── 主搜索函数 ───────────────────────────────────────────────────────────────────── def search( keyword: str, platforms: list[str] | None = None, count: int = DEFAULT_COUNT, api_key: str | None = None, days: int = DEFAULT_DAYS, ) -> dict: """通过统一接口搜索多平台A股话题数据""" if not platforms: platforms = list(PLATFORMS.keys()) key = get_api_key(api_key) if not key: sys.stderr.write("\u274c 未找到 API Key,请先配置:\n") sys.stderr.write(" export REDFOX_API_KEY=ak_你的密钥\n") sys.stderr.write(" 或使用 --api-key 参数传入\n") sys.stderr.write(" 注册地址: https://www.redfox.hk/login\n") sys.stderr.flush() sys.exit(1) # 构建统一请求参数 today = datetime.now() start_date = (today - timedelta(days=days)).strftime("%Y-%m-%d") end_date = today.strftime("%Y-%m-%d") payload = { "keyword": keyword, "source": SOURCE_LABEL, "startDate": start_date, "endDate": end_date, } sys.stderr.write(f"[\u2699\ufe0f] 搜索中: {keyword} ...\n") sys.stderr.flush() results = {} credit_error = False try: result = _http_post(API_BASE, payload, key) data = result.get("data") or {} for p in platforms: if p not in PLATFORMS: results[p] = {"platform": p, "label": p, "items": [], "total": 0, "error": "未知平台"} continue plat = PLATFORMS[p] label = plat["label"] result_key = plat["result_key"] articles = data.get(result_key, []) if isinstance(articles, dict): articles = articles.get("articles", []) if not isinstance(articles, list): articles = [] # 去重并归一化 all_articles = [] seen_ids = set() for art in articles: uid = ( art.get("workUuid") or art.get("uuid") or art.get("id") or art.get("noteId") or "" ) if uid and uid in seen_ids: continue if uid: seen_ids.add(uid) item = _normalize_article(art, p, len(all_articles) + 1) all_articles.append(item) if len(all_articles) >= count: break sys.stderr.write(f"[{label}] 获取 {len(all_articles)} 条\n") sys.stderr.flush() results[p] = { "platform": p, "label": label, "items": all_articles[:count], "total": len(all_articles[:count]), } except InsufficientCreditsError as e: sys.stderr.write(f"⚠️ {e}\n") sys.stderr.write(f"请配置个人 API Key: export REDFOX_API_KEY=你的密钥\n") sys.stderr.write(f"注册地址: https://www.redfox.hk/login\n") sys.stderr.flush() credit_error = True except Exception as e: sys.stderr.write(f"请求失败: {e}\n") sys.stderr.flush() # 为未处理的平台填充空结果 for p in platforms: if p not in results: results[p] = { "platform": p, "label": PLATFORMS[p]["label"], "items": [], "total": 0, } if credit_error: results[p]["error"] = "积分不足,请配置个人 API Key" # 汇总统计 total_items = sum(r["total"] for r in results.values()) today_utc = datetime.now(timezone.utc) return { "keyword": keyword, "searched_at": today_utc.isoformat(), "date_range": { "from": (today_utc - timedelta(days=days)).strftime("%Y-%m-%d"), "to": today_utc.strftime("%Y-%m-%d"), }, "platforms": results, "total_items": total_items, } # ─── JSON 输出 ────────────────────────────────────────────────────────────────────── def format_as_json(data: dict, max_items: int = 50) -> dict: """精简 JSON 格式(供 AI 智能体分析使用)""" output = { "keyword": data["keyword"], "searched_at": data["searched_at"], "date_range": data["date_range"], "total_items": data["total_items"], "platforms": {}, } for pkey, pdata in data["platforms"].items(): items = [] for item in pdata.get("items", [])[:max_items]: items.append({ "id": item["id"], "platform": item["platform"], "title": item["title"], "author": item["author"], "author_fans": item["author_fans"], "published_at": item["published_at"], "engagement_display": item["engagement_display"], "engagement": item["engagement"], "url": item["url"], "desc": item["desc"][:200], "scores": item.get("scores", {}), }) output["platforms"][pkey] = { "label": pdata["label"], "total": pdata["total"], "items": items, } if pdata.get("error"): output["platforms"][pkey]["error"] = pdata["error"] return output # ─── HTML 报告 ────────────────────────────────────────────────────────────────────── def _md_to_html(text: str) -> str: """简易 Markdown → HTML 转换(无第三方依赖)""" import re lines = text.split("\n") out = [] in_list = False for line in lines: stripped = line.strip() if stripped.startswith("### "): if in_list: out.append(""); in_list = False out.append(f'

{stripped[4:]}

') elif stripped.startswith("## "): if in_list: out.append(""); in_list = False out.append(f'

{stripped[2:]}

') elif stripped.startswith("# "): if in_list: out.append(""); in_list = False out.append(f'

{stripped[2:]}

') elif stripped == "---": if in_list: out.append(""); in_list = False out.append('
') elif stripped.startswith("- "): if not in_list: out.append('"); in_list = False out.append('') else: if in_list: out.append(""); in_list = False out.append(f'

{_md_inline(stripped)}

') if in_list: out.append("") return "\n".join(out) def _md_inline(text: str) -> str: """行内 Markdown 转换:粗体、链接""" import re text = text.replace("&", "&").replace("<", "<").replace(">", ">") text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', text) text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) return text def format_as_html(data: dict, max_items: int = 50, report_html: str = "") -> str: """生成网站风格 HTML 报告""" keyword = data["keyword"] total = data["total_items"] date_range = data["date_range"] platform_meta = { "xhs": {"primary": "#ff2442", "bg": "#fff1f0", "icon": "📕", "name": "小红书"}, "dy": {"primary": "#161823", "bg": "#f5f5f5", "icon": "🎵", "name": "抖音"}, "gzh": {"primary": "#07c160", "bg": "#f0fff4", "icon": "📖", "name": "公众号"}, } stats_html = "" for pkey, pdata in data["platforms"].items(): meta = platform_meta.get(pkey, platform_meta["xhs"]) ptotal = pdata["total"] m_primary = meta["primary"] m_bg = meta["bg"] m_icon = meta["icon"] m_name = meta["name"] total_likes = sum(it.get("engagement", {}).get("likes", 0) for it in pdata.get("items", [])[:max_items]) total_reads = sum( it.get("engagement", {}).get("reads", 0) + it.get("engagement", {}).get("likes", 0) + it.get("engagement", {}).get("collects", 0) + it.get("engagement", {}).get("shares", 0) + it.get("engagement", {}).get("comments", 0) for it in pdata.get("items", [])[:max_items] ) stats_html += ( f'\n
\n' f'
{m_icon}
\n' f'
\n' f'
{m_name}
\n' f'
{ptotal}
\n' f'
\n' f'
' ) tabs_html = "" panels_html = "" for pkey, pdata in data["platforms"].items(): meta = platform_meta.get(pkey, platform_meta["xhs"]) label = pdata["label"] ptotal = pdata["total"] m_primary = meta["primary"] m_icon = meta["icon"] is_first = pkey == list(data["platforms"].keys())[0] active = " active" if is_first else "" tabs_html += ( '\n' ) display = "block" if is_first else "none" items = pdata.get("items", [])[:max_items] error_html = "" if pdata.get("error"): error_html = f'
⚠️ {pdata["error"]}
' cards = "" for idx, item in enumerate(items): title_escaped = item["title"].replace("&", "&").replace("<", "<").replace(">", ">").replace('"', """) desc_escaped = item["desc"][:200].replace("&", "&").replace("<", "<").replace(">", ">") if item.get("desc") else "" author_escaped = item["author"].replace("&", "&").replace("<", "<").replace(">", ">") item_url = item.get("url", "") url_attr = 'href="' + item_url + '"' if item_url else 'href="#"' author_link = item.get("author_link", "") author_html = ('' + author_escaped + '') if author_link else ('' + author_escaped + '') # 互动数据标签 — 按平台展示对应字段,始终显示(包括0值),使用fuzzy_count格式化 eng = item.get("engagement", {}) eng_tags = "" if pkey == "gzh": reads = fuzzy_count(eng.get("reads", 0)) likes = fuzzy_count(eng.get("likes", 0)) shares = fuzzy_count(eng.get("shares", 0)) eng_tags = ('📖 ' + reads + '' + '👍 ' + likes + '' + '🔄 ' + shares + '') elif pkey == "xhs": likes = fuzzy_count(eng.get("likes", 0)) collects = fuzzy_count(eng.get("collects", 0)) comments = fuzzy_count(eng.get("comments", 0)) eng_tags = ('👍 ' + likes + '' + '⭐ ' + collects + '' + '💬 ' + comments + '') elif pkey == "dy": likes = fuzzy_count(eng.get("likes", 0)) comments = fuzzy_count(eng.get("comments", 0)) shares = fuzzy_count(eng.get("shares", 0)) eng_tags = ('👍 ' + likes + '' + '💬 ' + comments + '' + '🔄 ' + shares + '') author_fans = item.get("author_fans", "--") pub_date = item.get("published_at", "")[:10] if item.get("published_at") else "--" desc_html = ('

' + desc_escaped + '

') if desc_escaped else '' rank_num = idx + 1 # 公众号没有粉丝数,不展示粉丝字段 if pkey == "gzh": fans_html = '' else: fans_html = ( ' ·\n' ' ' + str(author_fans) + '粉\n' ) cards += ( '\n
\n' '
' + str(rank_num) + '
\n' '
\n' ' ' + title_escaped + '\n' '
\n' ' ' + author_html + '\n' + fans_html + ' ·\n' ' ' + pub_date + '\n' '
\n' ' ' + desc_html + '\n' ' \n' '
\n' '
' ) no_data_html = "

未查询到相关内容,建议更换关键词重试。

" if not items else "" # 小红书风控提示 xhs_notice_html = "" if pkey == "xhs": xhs_notice_html = ( '
' '⚠️ 受小红书风控规则限制,部分作品链接可能无法正常跳转,' '您可复制对应作品标题前往小红书搜索查看,感谢理解🙇\u200d♀️🙇\u200d♀️' '
' ) panels_html += ( '\n
\n' ' ' + error_html + '\n' ' ' + xhs_notice_html + '\n' '
\n' ' ' + cards + '\n' '
\n' ' ' + no_data_html + '\n' '
' ) report_section = "" if report_html: report_section = f'''

📝 研究报告

{report_html}
''' html = f''' A股社媒短讯 · {keyword}

A股社媒短讯研究

{keyword}
{date_range["from"]} ~ {date_range["to"]}
{stats_html}
📊
合计
{total}
{report_section}
{tabs_html}
{panels_html}
''' return html # ─── CLI ───────────────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="stock-feed: A股社媒短讯研究工具" ) parser.add_argument( "keyword", nargs="?", default=None, help="搜索关键词(默认使用A股17个核心关键词,--from-json 模式下可省略)" ) parser.add_argument( "--platforms", "-p", default="xhs,dy,gzh", help="平台列表,逗号分隔(默认: xhs,dy,gzh)" ) parser.add_argument( "--count", "-n", type=int, default=DEFAULT_COUNT, help=f"每个平台获取条数(默认: {DEFAULT_COUNT})" ) parser.add_argument( "--days", "-d", type=int, default=DEFAULT_DAYS, help=f"搜索时间范围,最近多少天(默认: {DEFAULT_DAYS},最大: 30)" ) parser.add_argument( "--output-format", "-f", choices=["json", "html", "both"], default="json", help="输出格式(默认: json,综合报告后再按需生成HTML)" ) parser.add_argument( "--output-dir", default=str(Path.home() / "Downloads" / "StockFeed"), help="HTML 输出目录" ) parser.add_argument( "--api-key", default=None, help="API Key(覆盖环境变量和配置文件)" ) parser.add_argument( "--max-items", type=int, default=50, help="输出中最多展示条数(默认: 50)" ) parser.add_argument( "--from-json", default=None, help="从已有 JSON 文件生成 HTML,不调用 API(值: JSON 文件路径)" ) parser.add_argument( "--report-file", default=None, help="研究报告 Markdown 文件路径(嵌入到 HTML 报告顶部)" ) parser.add_argument( "--debug", action="store_true", help="调试模式,打印原始 API 响应" ) args = parser.parse_args() # ── 从 JSON 生成 HTML 模式 ── if args.from_json: json_path = Path(args.from_json) if not json_path.exists(): sys.stderr.write(f"错误: JSON 文件不存在: {json_path}\n") sys.exit(1) raw = json.loads(json_path.read_text(encoding="utf-8")) data = { "keyword": raw.get("keyword", ""), "total_items": raw.get("total_items", 0), "date_range": raw.get("date_range", {}), "platforms": {}, } for pkey, pdata in raw.get("platforms", {}).items(): items = [] for it in pdata.get("items", []): item = dict(it) item["source"] = pkey items.append(item) data["platforms"][pkey] = { "label": pdata.get("label", pkey), "total": pdata.get("total", len(items)), "items": items, } report_html = "" if args.report_file: report_path = Path(args.report_file) if report_path.exists(): report_md = report_path.read_text(encoding="utf-8") report_html = _md_to_html(report_md) else: sys.stderr.write(f"⚠️ 报告文件不存在: {report_path},跳过\n") html_content = format_as_html(data, max_items=args.max_items, report_html=report_html) output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) base_name = json_path.stem html_file = output_dir / f"{base_name}.html" html_file.write_text(html_content, encoding="utf-8") sys.stderr.write(f"✅ HTML 已保存: {html_file}\n") sys.stdout.write(json.dumps({"files": {"html": str(html_file)}}, ensure_ascii=False) + "\n") sys.stdout.flush() return # ── 正常搜索模式 ── platforms = [] for p in args.platforms.split(","): p = p.strip().lower() if p in PLATFORMS: platforms.append(p) else: sys.stderr.write(f"未知平台: {p},可用: {', '.join(PLATFORMS.keys())}\n") if not platforms: sys.stderr.write("错误: 未指定有效平台\n") sys.exit(1) # 使用默认关键词或用户自定义关键词 keyword = args.keyword.strip() if args.keyword else DEFAULT_KEYWORDS sys.stderr.write(f"\n{'='*60}\n") sys.stderr.write(f"📈 A股社媒短讯 · 搜索: {keyword}\n") sys.stderr.write(f"平台: {', '.join(PLATFORMS[p]['label'] for p in platforms)}\n") sys.stderr.write(f"每平台: {args.count} 条 | 时间: 近{args.days}天\n") sys.stderr.write(f"{'='*60}\n\n") sys.stderr.flush() try: data = search( keyword=keyword, platforms=platforms, count=args.count, api_key=args.api_key, days=args.days, ) except Exception as e: sys.stderr.write(f"\n❌ 搜索失败: {e}\n") sys.exit(1) output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) keyword_safe = keyword.replace('"', '').replace(' ', '_')[:30] timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") base_name = f"stock_feed_{keyword_safe}_{timestamp}" json_file = None if args.output_format in ("json", "both"): json_data = format_as_json(data, max_items=args.max_items) json_file = output_dir / f"{base_name}.json" json_file.write_text( json.dumps(json_data, ensure_ascii=False, indent=2), encoding="utf-8", ) sys.stderr.write(f"\n✅ JSON 已保存: {json_file}\n") html_file = None if args.output_format in ("html", "both"): html_content = format_as_html(data, max_items=args.max_items) html_file = output_dir / f"{base_name}.html" html_file.write_text(html_content, encoding="utf-8") sys.stderr.write(f"✅ HTML 已保存: {html_file}\n") sys.stderr.write(f"\n{'='*60}\n") sys.stderr.write(f"搜索完成!共 {data['total_items']} 条结果\n") for pkey, pdata in data["platforms"].items(): status = f"✅ {pdata['total']} 条" if not pdata.get("error") else f"❌ {pdata['error']}" sys.stderr.write(f" {pdata['label']}: {status}\n") sys.stderr.write(f"{'='*60}\n") sys.stderr.flush() # stdout 输出精简摘要(关键词截断避免终端截断) keyword_display = keyword if len(keyword) <= 40 else keyword[:40] + f"...(共{len(keyword.split(','))}词)" summary = { "keyword": keyword_display, "date_range": data["date_range"], "total_items": data["total_items"], "platforms": {p: v["total"] for p, v in data["platforms"].items()}, "files": {}, } if json_file: summary["files"]["json"] = str(json_file) if html_file: summary["files"]["html"] = str(html_file) sys.stdout.write(json.dumps(summary, ensure_ascii=False) + "\n") sys.stdout.flush() if __name__ == "__main__": main()