#!/usr/bin/env python3
"""语义搜索:拉取指定文件夹的邮件(Header+正文),预处理后输出供智能体做语义分析"""
import argparse
import json
import os
import sys
import imaplib
import re
import ssl
from email.header import decode_header
from email.parser import BytesParser
from email.utils import parseaddr
# IMAP-over-SSL endpoint this script connects to (see main()).
IMAP_HOST = 'imap.qq.com'
IMAP_PORT = 993
# Identifiers that, in this file, are used only to build the legacy
# environment-variable names below.
SKILL_ID = '7637538402895773731'
CRED_NAME = 'qq_email'
# Preferred environment variables holding the mailbox address and auth code.
ENV_EMAIL = 'QQ_EMAIL'
ENV_AUTH_CODE = 'QQ_EMAIL_AUTH_CODE'
# Fallback variable names consulted when the preferred ones are unset or empty.
LEGACY_ENV_EMAIL = f'COZE_{CRED_NAME}_QQ_EMAIL_{SKILL_ID}'
LEGACY_ENV_AUTH_CODE = f'COZE_{CRED_NAME}_QQ_EMAIL_AUTH_CODE_{SKILL_ID}'
def quote_folder(name):
    """Wrap a folder name in double quotes when it contains a space.

    Falsy names (``None``, ``""``) and names that already start and end
    with a double quote are returned unchanged, as are names without any
    space character.
    """
    is_quoted = bool(name) and name.startswith('"') and name.endswith('"')
    wrap = bool(name) and not is_quoted and ' ' in name
    return f'"{name}"' if wrap else name
# Hard upper bound on the number of messages fetched per run (enforced in main()).
MAX_FETCH = 100
# Default number of body characters kept per message before "..." is appended.
BODY_PREVIEW_LEN = 500
def get_credentials():
    """Read the mailbox address and auth code from the environment.

    The preferred variable names are tried first; the legacy names are
    used when the preferred ones are unset or empty.

    Returns:
        ``(email_addr, auth_code)`` when both values are non-empty,
        otherwise ``(None, None)``.
    """
    env = os.environ
    address = env.get(ENV_EMAIL) or env.get(LEGACY_ENV_EMAIL, '')
    secret = env.get(ENV_AUTH_CODE) or env.get(LEGACY_ENV_AUTH_CODE, '')
    if address and secret:
        return address, secret
    return None, None
def decode_str(s):
    """Decode an RFC 2047 encoded header value into a plain string.

    Args:
        s: Raw header value, or ``None``.

    Returns:
        The concatenation of all decoded fragments; ``""`` when *s* is
        ``None``. Byte fragments are decoded with their declared charset
        (UTF-8 when none is declared), replacing undecodable bytes, and
        fall back to UTF-8 when the declared charset is unknown.
    """
    if s is None:
        return ""

    def _as_text(fragment, charset):
        # decode_header yields str fragments for headers without encoded words.
        if not isinstance(fragment, bytes):
            return fragment
        try:
            return fragment.decode(charset or 'utf-8', errors='replace')
        except (LookupError, UnicodeDecodeError):
            return fragment.decode('utf-8', errors='replace')

    return ''.join(
        _as_text(fragment, charset) for fragment, charset in decode_header(s)
    )
def strip_html(html):
    """Strip markup from an HTML fragment and return its visible text.

    Comments and ``<script>``/``<style>`` elements are removed together
    with their contents, remaining tags are replaced by a space, character
    references (``&amp;``, ``&nbsp;``, ``&#65;`` ...) are decoded, and runs
    of whitespace are collapsed to a single space.

    Args:
        html: HTML source text.

    Returns:
        The extracted plain text with leading/trailing whitespace removed.
    """
    # Imported by name because the ``html`` parameter shadows the module.
    from html import unescape

    block_flags = re.DOTALL | re.IGNORECASE
    # Drop comments first: e-mail HTML often hides conditional markup in them.
    text = re.sub(r'<!--.*?-->', ' ', html, flags=re.DOTALL)
    # Script/style bodies are not visible text and may contain '<' or '>'.
    text = re.sub(r'<script\b[^>]*>.*?</script\s*>', ' ', text, flags=block_flags)
    text = re.sub(r'<style\b[^>]*>.*?</style\s*>', ' ', text, flags=block_flags)
    text = re.sub(r'<[^>]+>', ' ', text)
    # Decode entities only after tags are gone, so an escaped "&lt;b&gt;"
    # survives as literal text instead of being stripped as a tag.
    text = unescape(text)
    # \s also matches the non-breaking space produced by "&nbsp;".
    return re.sub(r'\s+', ' ', text).strip()
def _decode_part_text(part):
    """Return the decoded text payload of *part*, or ``None`` if it is empty."""
    payload = part.get_payload(decode=True)
    if not payload:
        return None
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='replace')
    except LookupError:
        # Unknown charset label: fall back to UTF-8 (the same fallback
        # decode_str applies to headers) instead of dropping the body.
        return payload.decode('utf-8', errors='replace')


def extract_body_text(msg, max_len=BODY_PREVIEW_LEN):
    """Extract a plain-text preview of a message body.

    For multipart messages the first non-empty ``text/plain`` part that is
    not an attachment wins; if none exists, the first non-empty
    ``text/html`` part is used after tag stripping. Single-part messages
    are decoded directly and stripped of tags when they are ``text/html``.

    Args:
        msg: Parsed e-mail message object.
        max_len: Maximum number of characters to keep.

    Returns:
        The stripped body text, cut to *max_len* characters with ``"..."``
        appended when truncated; ``""`` when no text could be extracted.
    """
    body = ""
    if msg.is_multipart():
        for part in msg.walk():
            if 'attachment' in str(part.get('Content-Disposition', '')):
                continue
            content_type = part.get_content_type()
            if content_type == 'text/plain':
                text = _decode_part_text(part)
                if text is not None:
                    body = text
                    break  # plain text is preferred; stop searching
            elif content_type == 'text/html' and not body:
                text = _decode_part_text(part)
                if text is not None:
                    # Kept as a fallback; a later text/plain part overrides it.
                    body = strip_html(text)
    else:
        text = _decode_part_text(msg)
        if text is not None:
            if msg.get_content_type() == 'text/html':
                body = strip_html(text)
            else:
                body = text
    body = body.strip()
    if len(body) > max_len:
        body = body[:max_len] + "..."
    return body
def parse_date(msg):
    """Format the message's ``Date`` header as ``YYYY-MM-DD HH:MM:SS``.

    Returns ``""`` when the header is missing or empty, and the raw header
    value unchanged when it cannot be parsed.
    """
    raw_date = msg.get('Date', '')
    if raw_date:
        try:
            from email.utils import parsedate_to_datetime
            return parsedate_to_datetime(raw_date).strftime('%Y-%m-%d %H:%M:%S')
        except Exception:
            # Unparseable date: hand back the header text as-is.
            return raw_date
    return ""
def has_attachments(msg):
    """Report whether a multipart message has a part marked as an attachment.

    A part counts when its ``Content-Disposition`` header contains the
    text ``attachment``. Non-multipart messages always yield ``False``.
    """
    if not msg.is_multipart():
        return False
    return any(
        'attachment' in str(part.get('Content-Disposition', ''))
        for part in msg.walk()
    )
def fetch_folder_emails(mail, folder, limit):
    """Fetch and pre-process the newest messages of one folder.

    The folder is selected read-only and messages are fetched with
    ``BODY.PEEK[]``. Messages that fail to fetch or parse are skipped with
    a warning on stderr.

    Args:
        mail: Logged-in IMAP connection object.
        folder: Folder name to select.
        limit: Number of newest messages to fetch; a value <= 0 fetches all.

    Returns:
        ``(emails, total)`` where *emails* is a newest-first list of dicts
        and *total* is the number of messages found in the folder.
        ``([], 0)`` is returned when the folder cannot be selected or
        searched, is empty, or any unexpected error occurs.
    """
    try:
        select_status, _ = mail.select(quote_folder(folder), readonly=True)
        if select_status != 'OK':
            return [], 0

        search_status, search_data = mail.search(None, 'ALL')
        if search_status != 'OK' or not search_data[0]:
            return [], 0

        all_ids = search_data[0].split()
        total = len(all_ids)
        # Keep the newest `limit` messages and walk them newest-first.
        wanted = all_ids[-limit:] if limit > 0 else all_ids
        newest_first = wanted[::-1]

        bytes_parser = BytesParser()
        collected = []
        for mid in newest_first:
            try:
                fetch_status, msg_data = mail.fetch(mid, '(BODY.PEEK[])')
                if fetch_status != 'OK' or not msg_data or not msg_data[0]:
                    continue
                # The raw message is the second element of the first tuple item.
                raw = next(
                    (item[1] for item in msg_data if isinstance(item, tuple)),
                    None,
                )
                if not raw:
                    continue
                msg = bytes_parser.parsebytes(raw)
                message_id = (msg.get('Message-ID', '') or '').strip()
                collected.append({
                    "mail_id": int(mid),
                    "folder": folder,
                    "subject": decode_str(msg.get('Subject', '')),
                    "from": decode_str(msg.get('From', '')),
                    "to": decode_str(msg.get('To', '')),
                    "date": parse_date(msg),
                    "body_preview": extract_body_text(msg),
                    "has_attachment": has_attachments(msg),
                    "message_id": message_id
                })
            except Exception as e:
                print(f"Warning: Failed to parse mail {mid}: {str(e)}", file=sys.stderr)
        return collected, total
    except Exception as e:
        print(f"Error: Failed to fetch folder {folder}: {str(e)}", file=sys.stderr)
        return [], 0
def main():
    """Command-line entry point: fetch a folder and print one JSON document.

    Reads ``--folder`` and ``--limit`` from the command line, obtains the
    credentials from the environment, fetches at most ``MAX_FETCH``
    messages and prints either a success or an error object as JSON on
    stdout. Errors are reported in the JSON payload rather than raised.
    """
    parser = argparse.ArgumentParser(description='语义搜索:拉取邮件供智能体做语义分析')
    parser.add_argument('--folder', required=True, help='邮箱文件夹名称')
    parser.add_argument('--limit', type=int, default=MAX_FETCH,
                        help=f'拉取邮件数量上限,默认{MAX_FETCH};传0表示不限(最多{MAX_FETCH})')
    args = parser.parse_args()

    email_addr, auth_code = get_credentials()
    if not email_addr or not auth_code:
        result = {"status": "error", "message": "缺少邮箱凭证,请配置QQ邮箱授权码"}
        print(json.dumps(result, ensure_ascii=False))
        return

    # Non-positive limits mean "as many as allowed"; never exceed MAX_FETCH.
    limit = args.limit if args.limit > 0 else MAX_FETCH
    limit = min(limit, MAX_FETCH)

    try:
        ctx = ssl.create_default_context()
        mail = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT, ssl_context=ctx)
        try:
            mail.login(email_addr, auth_code)
            # Send command arguments (e.g. non-ASCII folder names) as UTF-8.
            mail._encoding = 'utf-8'
            emails, total = fetch_folder_emails(mail, args.folder, limit)
        finally:
            # Always release the connection, including when login fails.
            # Cleanup is best-effort: a failing LOGOUT must not mask the
            # original error or discard successfully fetched mail.
            try:
                mail.logout()
            except (imaplib.IMAP4.error, OSError):
                pass
        result = {
            "status": "success",
            "folder": args.folder,
            "total_in_folder": total,
            "total_fetched": len(emails),
            "fetched_limit": limit,
            "has_more": total > limit,
            "emails": emails
        }
        print(json.dumps(result, ensure_ascii=False))
    except imaplib.IMAP4.error as e:
        result = {"status": "error", "message": f"IMAP错误: {str(e)}"}
        print(json.dumps(result, ensure_ascii=False))
    except Exception as e:
        result = {"status": "error", "message": f"连接失败: {str(e)}"}
        print(json.dumps(result, ensure_ascii=False))
if __name__ == "__main__":
main()