#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Douyin prohibited-word detection CLI.

Extracts text from ``--content`` / ``--file`` / ``--url`` input and POSTs it
to a deployment-configured HTTPS API for prohibited-word detection.  Every
outcome (success or error) is printed to stdout as a single JSON object.

Fixes vs. previous revision:
  * The three regexes that parse the API's span markup had lost their tag
    text entirely (``r'(.*?)'`` / ``r''``), so word extraction and the
    red-highlight rewrite were no-ops.  They are rebuilt from the class names
    documented in the surrounding comments (banned-word / sensitive-word /
    industry-banned-word).
  * ``docx`` / ``bs4`` are now imported lazily, so plain ``--content`` checks
    no longer exit at startup when the optional extraction deps are missing.
"""

import argparse
import json
import os
import re
import sys
import time
import urllib.error
import urllib.request
from urllib.parse import urlparse

# Per-request content length cap (characters), enforced before any API call.
MAX_CONTENT_LENGTH = 3000
# Overall content cap (characters).  NOTE(review): currently unused -- any
# content longer than MAX_CONTENT_LENGTH is already rejected first.  Kept for
# backward compatibility with callers that import it.
MAX_TOTAL_LENGTH = 10000

# Environment variable holding the full HTTPS URL (including path) of the
# detection endpoint.  The repository deliberately embeds no third-party
# domain; each deployment configures its own.  Contract: POST JSON, see
# check_sensitive_words().
ENV_SENSITIVE_WORD_API_URL = "DY_SENSITIVE_WORD_API_URL"

# Opening tag the API uses to mark each prohibited-word hit.  [^>]* tolerates
# extra attributes.  NOTE(review): reconstructed from the in-code description
# of the API response markup -- confirm against a live response.
_PROHIBITED_SPAN_OPEN_RE = (
    r'<span[^>]*class="(?:banned-word|sensitive-word|industry-banned-word)"[^>]*>'
)


def _lazy_import_docx_document():
    """Return python-docx's ``Document`` class, importing it on first use.

    Deferred so that --content / --url runs work without python-docx
    installed.  Raises a plain Exception (not ImportError) so main() reports
    it through its uniform error path.
    """
    try:
        from docx import Document
    except ImportError as e:
        raise Exception(f"缺少依赖库: {str(e)}")
    return Document


def _lazy_import_beautifulsoup():
    """Return ``bs4.BeautifulSoup``, importing it on first use (see above)."""
    try:
        from bs4 import BeautifulSoup
    except ImportError as e:
        raise Exception(f"缺少依赖库: {str(e)}")
    return BeautifulSoup


# ============================================================
# HTTPS helper (standard library; server certificates are verified)
# ============================================================
def _post_json_with_retries(url, body_dict, timeout=30, max_retries=2):
    """POST ``body_dict`` as JSON and return the response body as text.

    TLS certificates are verified (urllib default).  5xx responses, timeouts
    and connection-level errors are retried with linear backoff, up to
    ``max_retries`` extra attempts.  Raises Exception on any 4xx response or
    on final failure.
    """
    data = json.dumps(body_dict, ensure_ascii=False).encode("utf-8")
    for attempt in range(max_retries + 1):
        req = urllib.request.Request(url, data=data, method="POST")
        req.add_header("Content-Type", "application/json; charset=utf-8")
        req.add_header("Accept", "application/json, */*")
        req.add_header("User-Agent", "dy-prohibited-word-check/1.0")
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                status_code = resp.getcode()
                resp_body = resp.read().decode("utf-8", errors="replace")
                # Defensive only: urlopen normally raises HTTPError for >=400,
                # but keep the explicit checks in case a custom opener is
                # installed process-wide.
                if status_code >= 500 and attempt < max_retries:
                    time.sleep(1 * (attempt + 1))
                    continue
                if status_code >= 400:
                    raise Exception(f"HTTP请求失败: {status_code}, {resp_body[:500]}")
                return resp_body
        except urllib.error.HTTPError as e:
            body = ""
            try:
                body = e.read().decode("utf-8", errors="replace")
            except Exception:
                pass
            if e.code >= 500 and attempt < max_retries:
                time.sleep(1 * (attempt + 1))
                continue
            raise Exception(f"HTTP请求失败: {e.code}, {body[:500]}")
        except urllib.error.URLError as e:
            if attempt < max_retries:
                time.sleep(1 * (attempt + 1))
                continue
            reason = getattr(e, "reason", None)
            detail = str(reason) if reason is not None else str(e)
            raise Exception(f"网络异常: {detail},已重试{max_retries}次仍失败")
        except TimeoutError:
            if attempt < max_retries:
                time.sleep(1 * (attempt + 1))
                continue
            raise Exception(f"连接超时,已重试{max_retries}次仍失败")
        except OSError as e:
            if attempt < max_retries:
                time.sleep(1 * (attempt + 1))
                continue
            raise Exception(f"网络异常: {str(e)},已重试{max_retries}次仍失败")
    raise RuntimeError("请求逻辑异常:重试循环未返回也未抛出")


def _resolve_sensitive_word_api_url(cli_api_url=None):
    """Resolve the API URL: CLI argument wins, then the environment variable.

    Returns the stripped URL, or an empty string when neither is set.
    """
    if cli_api_url and str(cli_api_url).strip():
        return str(cli_api_url).strip()
    return (os.environ.get(ENV_SENSITIVE_WORD_API_URL) or "").strip()


# ============================================================
# Text extraction
# ============================================================
def extract_from_file(file_path):
    """Extract text from a local file (DOC, DOCX, TXT and other text types).

    Raises Exception for missing files, PDFs and unsupported extensions.
    NOTE(review): legacy binary .doc files are routed through python-docx,
    which reads only the .docx format -- confirm .doc inputs actually work.
    """
    if not os.path.exists(file_path):
        raise Exception(f"文件不存在: {file_path}")

    file_ext = os.path.splitext(file_path)[1].lower()
    if file_ext == '.pdf':
        raise Exception("不支持PDF文件,请上传图片、TXT等文本类型文件")
    if file_ext in ('.doc', '.docx'):
        Document = _lazy_import_docx_document()
        doc = Document(file_path)
        return "\n".join(p.text for p in doc.paragraphs).strip()
    if file_ext in ('.txt', '.csv', '.md', '.log', '.json', '.xml', '.html', '.htm'):
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            return f.read().strip()
    raise Exception(f"不支持的文件类型: {file_ext},仅支持图片、TXT、DOC、DOCX等文本类型文件")


def extract_from_web(url):
    """Extract visible text from a web page.

    Prefers a Playwright headless browser (renders JS, so SPA pages work);
    silently falls back to a plain urllib fetch + BeautifulSoup when
    Playwright is unavailable or its attempt fails.
    """
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url

    # --- Attempt 1: Playwright (handles JS-rendered / SPA pages) ----------
    try:
        from playwright.sync_api import sync_playwright

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page()
            page.goto(url, timeout=30000)
            page.wait_for_timeout(3000)  # allow client-side rendering

            # Try common article-body selectors first; fall back to <body>.
            article_selectors = [
                'article',
                '.article-content', '.article-body', '.article-detail',
                '.post-content', '.post-body', '.content-body',
                '.entry-content', '.rich_media_content', '#js_content',
                '.detail-content', '.news-content', '.text-content',
            ]
            text = None
            for selector in article_selectors:
                try:
                    el = page.query_selector(selector)
                    if el:
                        extracted = el.inner_text().strip()
                        if len(extracted) > 100:  # real article bodies are long
                            text = extracted
                            break
                except Exception:
                    continue
            if not text:
                text = page.inner_text('body')
            browser.close()
            return text.strip()
    except Exception:
        pass  # Playwright missing or page failed -- fall back to urllib

    # --- Attempt 2: static HTML via urllib (TLS verified) -----------------
    # Cannot extract JS-rendered content; good enough for non-SPA pages.
    try:
        req = urllib.request.Request(
            url,
            headers={
                "User-Agent": "Mozilla/5.0 (compatible; dy-prohibited-word-extract/1.0)",
                "Accept": "text/html,*/*",
            },
            method="GET",
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            code = resp.getcode()
            if code >= 400:
                raise Exception(f"网页请求失败: HTTP {code}")
            body = resp.read().decode("utf-8", errors="replace")
        BeautifulSoup = _lazy_import_beautifulsoup()
        soup = BeautifulSoup(body, 'html.parser')
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        return soup.get_text(separator='\n', strip=True).strip()
    except urllib.error.HTTPError as e:
        raise Exception(f"网页请求失败: HTTP {e.code}")
    except urllib.error.URLError as e:
        reason = getattr(e, "reason", None)
        detail = str(reason) if reason is not None else str(e)
        raise Exception(f"网页内容提取失败: {detail}")
    except Exception as e:
        raise Exception(f"网页内容提取失败: {str(e)}")


# ============================================================
# Prohibited-word detection
# ============================================================
def check_sensitive_words(content, api_url=None):
    """Check ``content`` against the prohibited-word HTTP API.

    POSTs application/json to the configured HTTPS endpoint and normalizes
    the response.  Expected envelope: ``{"code": 2000, "data": {...},
    "msg": "成功"}`` where ``data.content`` wraps each hit in a span tag
    (see ``_PROHIBITED_SPAN_OPEN_RE``).

    Args:
        content: text to check.
        api_url: optional full HTTPS URL (with path); defaults to the
            ``DY_SENSITIVE_WORD_API_URL`` environment variable.

    Returns:
        dict: success payload with the word list and highlighted HTML, or an
        error payload with ``status == "error"``.
    """
    # Reject over-long content before any network traffic.
    if len(content) > MAX_CONTENT_LENGTH:
        return {
            "status": "error",
            "platform": "抖音",
            "original_content": content[:200] + "...",
            "error": f"文案内容过长({len(content)}字符),单次上限为{MAX_CONTENT_LENGTH}字符,请缩减后重试"
        }

    resolved_url = _resolve_sensitive_word_api_url(cli_api_url=api_url)
    if not resolved_url:
        return {
            "status": "error",
            "platform": "抖音",
            "original_content": content,
            "error": (
                "未配置违禁词检测接口地址:请设置环境变量 DY_SENSITIVE_WORD_API_URL 为 HTTPS 完整 URL(含路径),"
                "或调用脚本时传入 --api-url=..."
            ),
        }
    parsed = urlparse(resolved_url)
    if parsed.scheme.lower() != "https":
        return {
            "status": "error",
            "platform": "抖音",
            "original_content": content,
            "error": "违禁词检测接口必须使用 HTTPS(请检查 DY_SENSITIVE_WORD_API_URL 或 --api-url)",
        }

    body_params = {
        "content": content,
        "platform": "抖音",
        "source": "抖音违禁词查询-SkillHub",
    }

    try:
        resp_body = _post_json_with_retries(resolved_url, body_params, timeout=30, max_retries=2)
        resp = json.loads(resp_body)

        api_code = resp.get("code", 0)
        if api_code != 2000:
            raise Exception(f"API业务错误: code={api_code}, msg={resp.get('msg', '未知')}")

        api_data = resp.get("data") or {}
        api_content = api_data.get("content") or ""
        original_content = api_data.get("originalContent") or content
        prohibited_words_type = api_data.get("prohibitedWordsType") or []

        # Pull out the marked words, de-duplicated in first-seen order.
        # (BUGFIX: the previous pattern had lost its span markup and could
        # only ever match the empty string.)
        sensitive_words = list(dict.fromkeys(
            re.findall(_PROHIBITED_SPAN_OPEN_RE + r'(.*?)</span>', api_content)
        ))

        # Normalize every marker span to a uniform inline red style.
        html_content = re.sub(
            _PROHIBITED_SPAN_OPEN_RE,
            '<span style="color:red">',
            api_content
        )

        # The API marks substrings inside English words as hits (e.g. "ass"
        # in "Glasswing").  Treat an ASCII-alpha hit that is a strict
        # substring of some English word in the original text as a false
        # positive.
        english_words = re.findall(r'[A-Za-z]+', original_content)
        false_positive_words = set()
        for ew in english_words:
            for sw in sensitive_words:
                if (sw.isascii() and sw.isalpha()
                        and sw.lower() in ew.lower() and sw.lower() != ew.lower()):
                    false_positive_words.add(sw)
        sensitive_words = [w for w in sensitive_words if w not in false_positive_words]

        # Strip the highlight spans of false positives back to plain text.
        for fpw in false_positive_words:
            escaped = re.escape(fpw)
            html_content = re.sub(
                rf'<span style="color:red">{escaped}</span>',
                fpw,
                html_content
            )

        return {
            "status": "success",
            "platform": "抖音",
            "original_content": original_content,
            "sensitive_words": sensitive_words,
            "prohibited_words_type": prohibited_words_type,
            "word_count": len(sensitive_words),
            "html_content": html_content
        }
    except json.JSONDecodeError as e:
        return {
            "status": "error",
            "platform": "抖音",
            "original_content": content,
            "error": f"响应解析失败: {str(e)}"
        }
    except Exception as e:
        return {
            "status": "error",
            "platform": "抖音",
            "original_content": content,
            "error": f"处理失败: {str(e)}"
        }


def main():
    """CLI entry point: parse args, extract text, run the check, print JSON."""
    parser = argparse.ArgumentParser(description="抖音违禁词检测工具")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--content", help="直接传入文案文本")
    group.add_argument("--file", help="文件路径(支持TXT、DOC、DOCX等文本类型文件)")
    group.add_argument("--url", help="网页地址")
    parser.add_argument("--extract-only", action="store_true",
                        help="仅提取文本不检测,返回提取的文本内容和长度")
    parser.add_argument(
        "--api-url",
        default=None,
        help=f"违禁词检测 API 完整 HTTPS URL(含路径),覆盖环境变量 {ENV_SENSITIVE_WORD_API_URL}",
    )
    args = parser.parse_args()

    # Resolve the input text from whichever source was given.
    try:
        if args.content:
            text = args.content
        elif args.file:
            text = extract_from_file(args.file)
        elif args.url:
            text = extract_from_web(args.url)
        else:
            # Unreachable in practice (the exclusive group is required);
            # kept as a defensive fallback.
            print(json.dumps({"status": "error", "error": "请指定输入方式:--content、--file 或 --url"}, ensure_ascii=False))
            return
    except Exception as e:
        print(json.dumps({"status": "error", "error": f"文本提取失败: {str(e)}"}, ensure_ascii=False))
        return

    if not text:
        print(json.dumps({"status": "error", "error": "未提取到文本内容"}, ensure_ascii=False))
        return

    # Extraction-only mode: report the text and its length, no API call.
    if args.extract_only:
        print(json.dumps({"status": "extracted", "content": text, "length": len(text)}, ensure_ascii=False))
        return

    # The endpoint is deployment-configured; no third-party domain is
    # embedded in the repository.
    result = check_sensitive_words(text, api_url=args.api_url)
    print(json.dumps(result, ensure_ascii=False))


if __name__ == "__main__":
    main()