#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Fetch and render the WeChat official-account read-growth leaderboard."""

import argparse
import sys
from datetime import datetime, timedelta

try:
    from coze_workload_identity import requests
except ImportError as _exc:
    # Defer the failure to call time so the date/formatting helpers stay
    # importable (and `--help` keeps working) when this package is missing.
    requests = None
    _REQUESTS_IMPORT_ERROR = _exc
else:
    _REQUESTS_IMPORT_ERROR = None

API_URL = "https://onetotenvip.com/story/cozeSkill/getGzhCozeSkillDataRaise"
# NOTE(review): the N-Token credential is committed in source; consider
# loading it from the environment or a secret store instead.
API_HEADERS = {
    "Content-Type": "application/json",
    "N-Token": "2f9f88dbb743423dbf0a8db2977c49eb",
}


def parse_date(date_str: str) -> str:
    """Resolve a date argument to a ``YYYY-MM-DD`` string.

    Accepts ``yesterday``, ``today`` (case-insensitive, surrounding
    whitespace ignored) or an explicit ``YYYY-MM-DD`` date.

    Args:
        date_str: The raw command-line value.

    Returns:
        The resolved date, zero-padded in ISO ``YYYY-MM-DD`` form.

    Raises:
        Exception: If the value is not a valid date, is more than 30 days
            in the past, or lies in the future.
    """
    key = date_str.strip().lower()
    today = datetime.now().date()

    if key == "yesterday":
        return (today - timedelta(days=1)).isoformat()
    if key == "today":
        return today.isoformat()

    try:
        requested = datetime.strptime(key, "%Y-%m-%d").date()
    except ValueError:
        raise Exception("日期格式错误,请使用 YYYY-MM-DD 格式") from None

    # Compare calendar dates, not datetimes: comparing a midnight datetime
    # against "now minus 30 days" would reject the 30th day itself for most
    # of the day because of the time-of-day component.
    if requested < today - timedelta(days=30):
        raise Exception("日期不能早于30天前")
    if requested > today:
        raise Exception("日期不能晚于今天")

    # Return the normalized form so inputs like "2024-1-5" reach the API
    # zero-padded.
    return requested.isoformat()


def fetch_data(rank_date: str, source: str) -> dict:
    """Call the leaderboard API and return the decoded JSON body.

    Args:
        rank_date: Leaderboard date, sent as the ``rankDate`` query parameter.
        source: Data source, sent as the ``source`` query parameter.

    Returns:
        Whatever ``resp.json()`` yields for the response.

    Raises:
        Exception: If the HTTP client could not be imported, or the request,
            the status check or the JSON decoding fails. The original error
            is chained as ``__cause__``.
    """
    if requests is None:
        raise Exception(
            f"API调用失败: {_REQUESTS_IMPORT_ERROR}"
        ) from _REQUESTS_IMPORT_ERROR

    try:
        resp = requests.get(
            API_URL,
            params={"rankDate": rank_date, "source": source},
            headers=API_HEADERS,
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        raise Exception(f"API调用失败: {e}") from e


def safe_str(val) -> str:
    """Return ``str(val)``, or ``"-"`` when *val* is ``None`` or empty."""
    return "-" if val is None or val == "" else str(val)


def parse_int(val) -> int:
    """Convert *val* to ``int``, returning 0 for missing or invalid values."""
    if val is None or val == "":
        return 0
    try:
        return int(val)
    except (ValueError, TypeError):
        return 0


def format_count(val) -> str:
    """Format a counter for display.

    Missing values (``None`` or ``""``) render as ``"-"``. Values from
    10000 up to 999999 render as ``"<n>w+"`` where ``n`` is the value
    floor-divided by 10000; everything else renders as the plain integer.
    """
    if val is None or val == "":
        return "-"
    num = parse_int(val)
    if 10000 <= num < 1000000:
        return f"{num // 10000}w+"
    return str(num)


def make_link(title: str, url: str) -> str:
    """Build a Markdown link, falling back to the bare title text.

    Returns ``[title](url)`` when both parts are present; otherwise returns
    the title alone (``"-"`` when the title itself is missing).
    """
    t, u = safe_str(title), safe_str(url)
    return f"[{t}]({u})" if t != "-" and u != "-" else t


def _escape_cell(text: str) -> str:
    """Make *text* safe inside a Markdown table cell.

    A raw ``|`` would start a new column and a line break would end the row,
    so pipes are backslash-escaped and line breaks collapsed to spaces.
    """
    return " ".join(text.splitlines()).replace("|", "\\|")


def _min_max_normalize(value, low, high) -> float:
    """Scale *value* into 0..1 over [low, high]; 0.5 when the range is empty."""
    if high == low:
        return 0.5
    return (value - low) / (high - low)


def calc_score_v2(items: list) -> list:
    """Score each item between 8 and 10 relative to the other items.

    Two engagement measures are read from each item's ``maxWork`` dict:

    * total interactions: share + watch + like (weight 40%)
    * weighted interactions: share*5 + watch*3 + like*2 (weight 60%)

    Each measure is min-max normalized across *items* (0.5 when all items
    tie), combined with the weights above and mapped linearly onto 8..10.

    Args:
        items: Account dicts, each optionally holding a ``maxWork`` dict with
            ``shareCount``, ``watchCount`` and ``likeCount``.

    Returns:
        A list of ``(item, score)`` tuples in the input order; empty when
        *items* is empty.
    """
    if not items:
        return []

    measured = []
    for item in items:
        work = item.get("maxWork") or {}
        share = parse_int(work.get("shareCount"))
        watch = parse_int(work.get("watchCount"))
        like = parse_int(work.get("likeCount"))
        measured.append(
            (item, share + watch + like, share * 5 + watch * 3 + like * 2)
        )

    totals = [total for _, total, _ in measured]
    weighteds = [weighted for _, _, weighted in measured]
    total_min, total_max = min(totals), max(totals)
    weighted_min, weighted_max = min(weighteds), max(weighteds)

    results = []
    for item, total, weighted in measured:
        norm_total = _min_max_normalize(total, total_min, total_max)
        norm_weighted = _min_max_normalize(weighted, weighted_min, weighted_max)
        combined = norm_total * 0.4 + norm_weighted * 0.6
        results.append((item, 8 + combined * 2))
    return results


def render_table(data_list: list) -> str:
    """Render the leaderboard as a Markdown table sorted by score.

    Args:
        data_list: Account dicts as accepted by ``calc_score_v2``; the row
            also reads ``userName`` and, from ``maxWork``, ``title``,
            ``oriUrl``, ``clicksCount`` and ``publicTime``.

    Returns:
        The explanatory line, the table header and one row per account,
        joined with newlines. Rows are ordered by descending score; ties
        keep their input order.
    """
    lines = [
        "数据说明:筛选冷门账号中阅读数10w+的爆款文章,以及综合评分指数。\n",
        "| 序号 | 作者 | 最高阅读数文章 | 在看数 | 点赞数 | 转发数 | 阅读数 | 发布时间 | 综合评分指数 |",
        "| :---: | :---: | :--- | :---: | :---: | :---: | :---: | :---: | :---: |",
    ]

    scored = calc_score_v2(data_list)
    scored.sort(key=lambda pair: pair[1], reverse=True)

    for rank_num, (acc, score) in enumerate(scored, 1):
        work = acc.get("maxWork") or {}
        # Escape free-text fields so a "|" or line break in the data cannot
        # break the table layout.
        user = _escape_cell(safe_str(acc.get("userName")))
        title = _escape_cell(safe_str(work.get("title")))
        clicks = _escape_cell(safe_str(work.get("clicksCount")))
        published = _escape_cell(safe_str(work.get("publicTime")))
        lines.append(
            f"| {rank_num} | {user} | {make_link(title, work.get('oriUrl'))} | "
            f"{format_count(work.get('watchCount'))} | {format_count(work.get('likeCount'))} | "
            f"{format_count(work.get('shareCount'))} | {clicks} | "
            f"{published} | {score:.2f} |"
        )
    return "\n".join(lines)
def main():
    """Command-line entry point: fetch one day's leaderboard and print it.

    Reads the required ``--rankDate`` and ``--source`` options, queries the
    API and prints the rendered table to stdout. When ``yesterday`` was
    requested and yields no rows, the day before yesterday is tried once.
    Any failure is reported on stderr and the process exits with status 1.
    """
    cli = argparse.ArgumentParser(description="获取公众号阅读增长率排行榜")
    cli.add_argument("--rankDate", required=True, help="榜单日期")
    cli.add_argument("--source", required=True, help="数据源")
    opts = cli.parse_args()

    try:
        rank_date = parse_date(opts.rankDate)
        payload = fetch_data(rank_date, opts.source)
        if payload.get("code") != 2000:
            raise Exception(f"API错误: {payload.get('message', '未知错误')}")
        rows = payload.get("data", [])

        # Yesterday's board may not be published yet: fall back one more day.
        asked_for_yesterday = opts.rankDate.strip().lower() == "yesterday"
        if asked_for_yesterday and not rows:
            fallback_date = (datetime.now() - timedelta(days=2)).strftime("%Y-%m-%d")
            payload = fetch_data(fallback_date, opts.source)
            if payload.get("code") == 2000 and payload.get("data"):
                rows, rank_date = payload["data"], fallback_date

        if rows:
            print(f"\n榜单日期: {rank_date} | 榜单数量: {len(rows)} 个账号\n")
            print(render_table(rows))
            print("\n数据获取完成")
        else:
            print(f"\n榜单日期: {rank_date} | 暂无数据\n")
    except Exception as err:
        print(f"错误: {err}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()