Install
openclaw skills install crc-bank-statement-converter

批量将各银行导出的各类 Excel 流水文件转换为统一的监管流水导入模板(9 列标准格式),并生成汇总文件。
openclaw skills install crc-bank-statement-converter

将不同银行导出的流水 Excel 文件批量转换为监管流水导入模板格式(9 列标准格式)。
自动识别以下格式,无需手动选择:
执行以下命令完成初始化(仅需一次):
# 1. 安装 Python 依赖
pip install openpyxl xlrd
# 2. 创建脚本目录
mkdir -p /home/node/agents/skills/bank-statement-converter
# 3. 写入 field_mapping.py(内容见下方代码)
# 4. 写入 converter.py(内容见下方代码)
# 5. 验证环境
python3 -c "import openpyxl, xlrd; print('OK')"
用户提供一个包含流水 Excel 文件的文件夹路径,执行以下命令:
python3 /home/node/agents/skills/bank-statement-converter/converter.py "<输入文件夹路径>" "<输出文件路径>"
输入:包含 .xls / .xlsx 流水文件的文件夹,支持子文件夹递归扫描
输出:未指定输出文件路径时,结果写入输入文件夹同级的 输出 目录下

用户说"帮我转换流水文件"时:
先确认流水文件所在的文件夹(例如 /home/node/workspace/流水数据),然后执行:
python3 /home/node/agents/skills/bank-statement-converter/converter.py "/home/node/workspace/流水数据" "/home/node/workspace/监管流水_汇总.xlsx"
生成的 Excel 文件包含:
汇总 工作表:在模板 9 列之后附加 _来源公司 列
模板 9 列:日期 | 收入 | 支出 | 余额 | 摘要 | 对方户名 | 对方银行 | 对方账号 | 备注
扫描时会自动跳过以 ~ 开头的临时文件和文件名含 说明 的文件。

写入路径:/home/node/agents/skills/bank-statement-converter/field_mapping.py
# Target template (9 columns) -> header names that may appear in source files,
# listed in priority order.
FIELD_ALIASES = {
    "日期": [
        "交易时间",
        "入账日期",
        "交易日期",
    ],
    "收入": [
        "转入金额",
        "贷方发生额",
        "贷方发生额(元)",
    ],
    "支出": [
        "转出金额",
        "借方发生额",
        "借方发生额(元)",
    ],
    "余额": [
        "余额",
        "账户余额",
        "账户余额(元)",
    ],
    "摘要": [
        "摘要",
        "用途",
    ],
    "对方户名": [
        "对方单位",
        "对方账户名称",
        "对方名称",
        "对方账号名称",
    ],
    "对方银行": [
        "对方行名",
        "对方账号开户网点名称",
        "对方开户行",
    ],
    "对方账号": [
        "对方账号",
    ],
    "备注": [
        "附言",
        "客户附言",
    ],
}

# Substrings whose presence in a cell marks a row as the header row.
HEADER_KEYWORDS = [
    "交易时间",
    "交易日期",
    "入账日期",
    "电子回单号码",
]

# Template columns that receive date / amount normalisation respectively.
DATE_COLS = ["日期"]
AMOUNT_COLS = ["收入", "支出"]

# Output column order; follows the insertion order of FIELD_ALIASES.
TEMPLATE_COLUMNS = list(FIELD_ALIASES)
写入路径:/home/node/agents/skills/bank-statement-converter/converter.py
"""
银行流水 Excel 格式转换器
将不同银行导出的流水 Excel 统一转换为监管流水导入模板格式。
"""
import sys
import os
import re
import glob
import traceback
import tempfile
import shutil
import xml.etree.ElementTree as ET
from collections import OrderedDict
from datetime import datetime
try:
import openpyxl
from openpyxl.utils import get_column_letter
except ImportError:
print("需要安装 openpyxl: pip install openpyxl")
sys.exit(1)
try:
import xlrd
HAS_XLRD = True
except ImportError:
HAS_XLRD = False
from field_mapping import (
FIELD_ALIASES, HEADER_KEYWORDS, DATE_COLS, AMOUNT_COLS, TEMPLATE_COLUMNS
)
def find_header_row(rows, max_scan=25):
    """Return the index of the first row that looks like a table header.

    Only the first ``max_scan`` rows are inspected. A row qualifies when it
    has at least four non-empty cells and any of its cells contains one of
    the ``HEADER_KEYWORDS`` substrings.

    Args:
        rows: List of rows, each a list of cell strings.
        max_scan: Number of leading rows to examine.

    Returns:
        The zero-based row index, or ``None`` when no row qualifies.
    """
    for index, candidate in enumerate(rows[:max_scan]):
        filled = [cell for cell in candidate if cell and cell != "None"]
        if len(filled) >= 4:
            for cell in candidate:
                for keyword in HEADER_KEYWORDS:
                    if keyword in cell:
                        return index
    return None
def is_data_row(row, headers):
    """Tell whether ``row`` is a transaction row rather than filler or totals.

    A row is rejected when it has fewer than three non-empty cells, or when
    the concatenation of its cells contains a totals marker.

    Args:
        row: List of cell values for one row.
        headers: Header cells of the sheet (accepted but not consulted).

    Returns:
        ``True`` for a data row, ``False`` otherwise.
    """
    filled = len([cell for cell in row if cell and cell != "None"])
    if filled >= 3:
        joined = "".join(map(str, row))
        # Summary lines ("total", "subtotal", "cumulative", "count") are not data.
        return not any(
            marker in joined for marker in ("合计", "小计", "累计", "笔数")
        )
    return False
def extract_date(value):
    """Normalise a date-like cell to ``YYYY-MM-DD``.

    Recognised leading forms (anything after the date, such as a time of
    day, is discarded):

    * ``YYYY-MM-DD``, ``YYYY/MM/DD``, ``YYYY.MM.DD`` and ``YYYY年MM月DD日``,
      with one- or two-digit month and day (zero-padded on output);
    * compact ``YYYYMMDD``, accepted only when it is not followed by another
      digit and the month/day are in range, so long numeric identifiers are
      not mistaken for dates.

    Args:
        value: Raw cell value (normally a string).

    Returns:
        The normalised date, ``""`` for empty/``None``/``"None"`` input, or
        the stripped input unchanged when no date form is recognised.
    """
    if value is None:
        return ""
    s = str(value).strip()
    if not s or s == "None":
        return ""

    separated = re.match(r"(\d{4})[-/.年](\d{1,2})[-/.月](\d{1,2})", s)
    if separated:
        year, month, day = separated.groups()
        return f"{year}-{int(month):02d}-{int(day):02d}"

    compact = re.match(r"(\d{4})(\d{2})(\d{2})(?!\d)", s)
    if compact:
        year, month, day = compact.groups()
        if 1 <= int(month) <= 12 and 1 <= int(day) <= 31:
            return f"{year}-{month}-{day}"

    return s
def extract_amount(value):
    """Normalise an amount cell to a plain numeric string.

    Thousands separators are removed. When a yen/yuan sign (half-width
    U+00A5 or full-width U+FFE5) is present, the number following it is
    extracted and a minus sign on either side of the currency sign is kept,
    so ``-¥1,000.00`` and ``¥-1,000.00`` both yield ``-1000.00``.

    Args:
        value: Raw cell value (normally a string).

    Returns:
        The cleaned amount string, ``""`` for empty/``None``/``"None"``
        input, or the comma-stripped text unchanged when it carries no
        currency sign (numeric or not).
    """
    if value is None:
        return ""
    s = str(value).strip()
    if not s or s == "None":
        return ""

    m = re.search(
        r"([-+]?)\s*[\u00a5\uffe5]\s*([-+]?)\s*([\d,]+\.?\d*)", s
    )
    if m:
        digits = m.group(3).replace(",", "")
        # The sign may sit before or after the currency symbol.
        negative = "-" in (m.group(1), m.group(2))
        return f"-{digits}" if negative else digits

    return s.replace(",", "")
def load_xlsx(filepath):
    """Read the active sheet of an .xlsx workbook into header and data rows.

    Every cell is converted to a stripped string (``None`` becomes ``""``).
    The header row is located with ``find_header_row`` and the rows after it
    are filtered with ``is_data_row``.

    Args:
        filepath: Path of the workbook to open with openpyxl.

    Returns:
        ``(headers, data_rows, error)``. On success ``error`` is ``None``.
        For an empty sheet or a missing header row, ``headers`` and
        ``data_rows`` are empty lists and ``error`` is a message. If any
        exception is raised, ``headers`` is ``None`` and ``error`` is the
        exception text.
    """
    try:
        workbook = openpyxl.load_workbook(filepath, data_only=True)
        sheet = workbook.active
        table = [
            ["" if value is None else str(value).strip() for value in record]
            for record in sheet.iter_rows(
                min_row=1, max_row=sheet.max_row, values_only=True
            )
        ]
        if not table:
            return [], [], "空文件"

        start = find_header_row(table)
        if start is None:
            return [], [], "未找到表头行"

        header_cells = table[start]
        body = [
            record
            for record in table[start + 1:]
            if is_data_row(record, header_cells)
        ]
        return header_cells, body, None
    except Exception as exc:
        return None, [], str(exc)
def load_xls(filepath):
    """Read the first sheet of a binary .xls workbook via xlrd.

    Cells are rendered as strings: date cells as ``%Y-%m-%d %H:%M:%S``,
    numeric cells without a decimal part when they are whole numbers, and
    every other cell as its stripped text (falsy values become ``""``).

    Args:
        filepath: Path of the workbook to open with xlrd.

    Returns:
        ``(headers, data_rows, error)`` with the same conventions as the
        other loaders: ``error`` is ``None`` on success, a message with
        empty lists for an empty sheet or missing header, and the exception
        text with ``headers=None`` if anything raises.
    """

    def render(cell, datemode):
        # Turn one xlrd cell into the string form used downstream.
        if cell.ctype == xlrd.XL_CELL_DATE:
            moment = xlrd.xldate_as_datetime(cell.value, datemode)
            return moment.strftime("%Y-%m-%d %H:%M:%S") if moment else ""
        if cell.ctype == xlrd.XL_CELL_NUMBER:
            number = cell.value
            if number == int(number):
                return str(int(number))
            return str(number)
        return str(cell.value).strip() if cell.value else ""

    try:
        book = xlrd.open_workbook(filepath)
        sheet = book.sheet_by_index(0)
        grid = [
            [
                render(sheet.cell(row_no, col_no), book.datemode)
                for col_no in range(sheet.ncols)
            ]
            for row_no in range(sheet.nrows)
        ]
        if not grid:
            return [], [], "空文件"

        start = find_header_row(grid)
        if start is None:
            return [], [], "未找到表头行"

        header_cells = grid[start]
        body = [
            record
            for record in grid[start + 1:]
            if is_data_row(record, header_cells)
        ]
        return header_cells, body, None
    except Exception as exc:
        return None, [], str(exc)
def load_xml_xls(filepath):
    """Read an .xls file that is really an XML Spreadsheet (SpreadsheetML).

    The first ``Worksheet``/``Table`` is read. SpreadsheetML writers may omit
    empty cells and put an ``ss:Index`` attribute (1-based column position)
    on the next written cell; those gaps are padded with ``""`` so every
    value stays under its header column.

    Args:
        filepath: Path of the XML spreadsheet.

    Returns:
        ``(headers, data_rows, error)``. ``error`` is ``None`` on success, a
        message with empty lists when the worksheet, table, rows or header
        row are missing, and ``"XML 解析失败: ..."`` with ``headers=None`` if
        anything raises.
    """
    namespace = "urn:schemas-microsoft-com:office:spreadsheet"
    ns = {"ss": namespace}
    index_attr = f"{{{namespace}}}Index"
    try:
        with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
            content = f.read()
        root = ET.fromstring(content)
        worksheet = root.find(".//ss:Worksheet", ns)
        if worksheet is None:
            return [], [], "XML 中未找到 Worksheet"
        table = worksheet.find("ss:Table", ns)
        if table is None:
            return [], [], "XML 中未找到 Table"

        all_rows = []
        for row_elem in table.findall("ss:Row", ns):
            cells = []
            for cell_elem in row_elem.findall("ss:Cell", ns):
                declared = cell_elem.get(index_attr)
                if declared and declared.isdigit():
                    # Fill the columns skipped before this cell; a
                    # non-positive count extends by nothing.
                    cells.extend([""] * (int(declared) - 1 - len(cells)))
                data_elem = cell_elem.find("ss:Data", ns)
                if data_elem is not None and data_elem.text:
                    cells.append(data_elem.text.strip())
                else:
                    cells.append("")
            all_rows.append(cells)

        if not all_rows:
            return [], [], "空文件"
        header_idx = find_header_row(all_rows)
        if header_idx is None:
            return [], [], "XML 中未找到表头行"
        headers = all_rows[header_idx]
        data_rows = [
            r for r in all_rows[header_idx + 1:] if is_data_row(r, headers)
        ]
        return headers, data_rows, None
    except Exception as e:
        return None, [], f"XML 解析失败: {e}"
def _decode_html_bytes(raw):
    """Decode HTML bytes, trying UTF-8, the declared charset, then GB18030."""
    candidates = ["utf-8-sig"]
    declared = re.search(
        rb"charset\s*=\s*[\"']?\s*([A-Za-z0-9_.:-]+)", raw[:4096], re.IGNORECASE
    )
    if declared:
        candidates.append(declared.group(1).decode("ascii"))
    candidates.append("gb18030")
    for encoding in candidates:
        try:
            return raw.decode(encoding)
        except (UnicodeDecodeError, LookupError):
            # Wrong guess or a charset name Python does not know: try the next.
            continue
    # Last resort: drop undecodable bytes instead of failing.
    return raw.decode("utf-8", errors="ignore")


def load_html_xls(filepath):
    """Read an .xls file that is really an HTML page containing a table.

    The file is decoded as strict UTF-8 first; if that fails, the charset
    declared in the document and then GB18030 are tried, so non-UTF-8
    exports keep their Chinese header text. All ``<tr>`` rows made of
    ``<td>``/``<th>`` cells are collected, then the header row is located
    and the following rows filtered like in the other loaders.

    Args:
        filepath: Path of the HTML file.

    Returns:
        ``(headers, data_rows, error)``. ``error`` is ``None`` on success, a
        message with empty lists when no table rows or no header row are
        found, and ``"HTML 解析失败: ..."`` with ``headers=None`` if anything
        raises.
    """
    from html.parser import HTMLParser

    class TableParser(HTMLParser):
        """Collect the text of every table cell, grouped by row."""

        def __init__(self):
            # convert_charrefs defaults to True, so character references
            # reach handle_data already decoded; no entity handler is needed.
            super().__init__()
            self.rows = []
            self.current_row = []
            self.current_cell = ""
            self.in_cell = False

        def handle_starttag(self, tag, attrs):
            if tag == "tr":
                self.current_row = []
            elif tag in ("td", "th"):
                self.in_cell = True
                self.current_cell = ""

        def handle_endtag(self, tag):
            if tag in ("td", "th"):
                self.in_cell = False
                self.current_row.append(self.current_cell.strip())
            elif tag == "tr":
                if self.current_row:
                    self.rows.append(self.current_row)

        def handle_data(self, data):
            if self.in_cell:
                self.current_cell += data

    try:
        with open(filepath, "rb") as f:
            content = _decode_html_bytes(f.read())
        parser = TableParser()
        parser.feed(content)
        parser.close()  # flush any text still buffered by the parser
        if not parser.rows:
            return [], [], "HTML 中未找到表格数据"
        header_idx = find_header_row(parser.rows)
        if header_idx is None:
            return [], [], "HTML 中未找到表头行"
        headers = parser.rows[header_idx]
        data_rows = [
            r for r in parser.rows[header_idx + 1:] if is_data_row(r, headers)
        ]
        return headers, data_rows, None
    except Exception as e:
        return None, [], f"HTML 解析失败: {e}"
def load_workbook_auto(filepath):
    """Detect the real format of a spreadsheet file and load it.

    Detection is by content first, extension second:

    * ZIP signature (``PK``): an xlsx package. When the file is named
      ``.xls`` it is copied to a uniquely named temporary ``.xlsx`` before
      loading, and the copy is always removed afterwards.
    * ``.xls`` starting with ``<?xml`` (after an optional BOM/whitespace):
      XML spreadsheet.
    * ``.xls`` whose first bytes contain ``<html`` or ``<table``: HTML table.
    * any other ``.xls``: binary workbook via xlrd when available, falling
      back to the HTML loader when xlrd is missing or fails.
    * ``.xlsx`` without a ZIP signature: passed to the xlsx loader anyway.

    Args:
        filepath: Path of the file to load.

    Returns:
        ``(headers, data_rows, error)`` as produced by the chosen loader; a
        file that cannot be opened or copied, or an unsupported extension,
        yields ``(None, [], message)``.
    """
    ext = os.path.splitext(filepath)[1].lower()
    try:
        with open(filepath, "rb") as f:
            head = f.read(1024)
    except OSError as e:
        # Report unreadable files like any other load failure.
        return None, [], str(e)

    if head[:2] == b"PK":
        if ext != ".xls":
            return load_xlsx(filepath)
        # A per-call temp file avoids clashes between concurrent runs.
        fd, tmp = tempfile.mkstemp(prefix="_claw_convert_", suffix=".xlsx")
        os.close(fd)
        try:
            shutil.copy2(filepath, tmp)
            return load_xlsx(tmp)
        except OSError as e:
            return None, [], str(e)
        finally:
            try:
                os.unlink(tmp)
            except OSError:
                pass  # best-effort cleanup; the load result is what matters

    if ext == ".xls":
        # Never text-sniff a genuine OLE2 (binary Excel) container.
        is_ole2 = head[:8] == b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1"
        probe = b"" if is_ole2 else head.lstrip(b"\xef\xbb\xbf \t\r\n").lower()
        if probe.startswith(b"<?xml"):
            return load_xml_xls(filepath)
        if b"<html" in probe or b"<table" in probe:
            return load_html_xls(filepath)
        if HAS_XLRD:
            headers, data_rows, error = load_xls(filepath)
            if error is None:
                return headers, data_rows, None
        return load_html_xls(filepath)

    if ext == ".xlsx":
        return load_xlsx(filepath)
    return None, [], f"不支持的文件格式: {ext}"
def build_column_mapping(headers):
    """Map source column indexes to template field names.

    Matching runs in two passes so that the best column wins regardless of
    where it sits in the sheet:

    1. exact matches - a header equal to an alias;
    2. substring matches - a header that merely contains an alias.

    Within each pass, template fields are visited in ``FIELD_ALIASES`` order
    and their aliases in listed (priority) order. Each template field and
    each source column is used at most once.

    Args:
        headers: Header cells of the source sheet.

    Returns:
        Dict of ``{column_index: template_field}``; empty when nothing
        matches.
    """
    named = [
        (idx, header.strip())
        for idx, header in enumerate(headers)
        if header and header != "None" and header.strip()
    ]
    mapping = {}
    used_targets = set()

    def claim(matches):
        # Assign to every still-unmapped field the first free column that
        # satisfies ``matches`` for its highest-priority alias.
        for target_field, aliases in FIELD_ALIASES.items():
            if target_field in used_targets:
                continue
            for alias in aliases:
                column = next(
                    (
                        idx
                        for idx, name in named
                        if idx not in mapping and matches(alias, name)
                    ),
                    None,
                )
                if column is not None:
                    mapping[column] = target_field
                    used_targets.add(target_field)
                    break

    claim(lambda alias, name: alias == name)
    claim(lambda alias, name: alias in name)
    return mapping
def convert_row(row, mapping):
    """Convert one source row into a template-shaped record.

    Args:
        row: List of cell strings from the source sheet.
        mapping: ``{column_index: template_field}`` as built by
            ``build_column_mapping``.

    Returns:
        ``OrderedDict`` keyed by ``TEMPLATE_COLUMNS``. Unmapped fields and
        mapped columns beyond the end of ``row`` stay ``""``. Date fields go
        through ``extract_date``, amount fields through ``extract_amount``,
        and everything else is stripped text.
    """
    record = OrderedDict((name, "") for name in TEMPLATE_COLUMNS)
    width = len(row)
    for position, field_name in mapping.items():
        if position >= width:
            continue
        raw = row[position]
        if field_name in DATE_COLS:
            cleaned = extract_date(raw)
        elif field_name in AMOUNT_COLS:
            cleaned = extract_amount(raw)
        elif raw and raw != "None":
            cleaned = str(raw).strip()
        else:
            cleaned = ""
        record[field_name] = cleaned
    return record
def is_electronic_receipt(headers):
    """Report whether the headers belong to an electronic-receipt export.

    All non-empty header cells are concatenated and searched for the
    receipt-number column name.

    Args:
        headers: Header cells of the sheet.

    Returns:
        ``True`` when the marker text is present, otherwise ``False``.
    """
    merged = "".join(filter(None, headers))
    return merged.find("电子回单号码") != -1
def convert_receipt_row(row, headers, company_name):
    """Convert one electronic-receipt row into a template-shaped record.

    The date comes from the ``记账日期`` column when it matches
    ``YYYY年MM月DD日``, otherwise from ``extract_date`` applied to ``时间戳``.
    The row counts as an expense when ``company_name`` is non-empty and
    occurs inside the payer name; in every other case it is booked as
    income. The counterparty fields are taken from the opposite side.

    Args:
        row: List of cell strings for the receipt.
        headers: Header cells; columns are looked up by exact header text.
        company_name: Name of the company that owns the statement.

    Returns:
        ``OrderedDict`` keyed by ``TEMPLATE_COLUMNS``.
    """
    record = OrderedDict((name, "") for name in TEMPLATE_COLUMNS)

    def lookup(col_name):
        # First column whose header equals col_name and exists in this row.
        return next(
            (
                row[pos].strip()
                for pos, header in enumerate(headers)
                if header == col_name and pos < len(row)
            ),
            "",
        )

    booked_on = lookup("记账日期")
    parsed = (
        re.match(r"(\d{4})年(\d{2})月(\d{2})日", booked_on) if booked_on else None
    )
    if parsed:
        record["日期"] = "-".join(parsed.groups())
    if not record["日期"]:
        record["日期"] = extract_date(lookup("时间戳"))

    amount = extract_amount(lookup("金额"))
    payer = lookup("付方账户名称")
    payee = lookup("收方账户名称")

    is_outgoing = bool(company_name) and bool(payer) and company_name in payer
    if is_outgoing:
        direction, other_name = "支出", payee
        bank_column, account_column = "收方开户银行名称", "收方账号"
    else:
        direction, other_name = "收入", payer
        bank_column, account_column = "付方开户银行名称", "付方账号"

    record[direction] = amount
    record["对方户名"] = other_name
    record["对方银行"] = lookup(bank_column)
    record["对方账号"] = lookup(account_column)
    record["摘要"] = lookup("摘要")
    record["备注"] = lookup("用途") or lookup("备注")
    return record
def process_file(filepath):
    """Load one statement file and convert its rows to template records.

    The company name is derived from the file name: the extension is
    dropped, everything from the first run of four digits onward is removed,
    then a trailing whitespace-plus-``新`` suffix. If nothing is left, the
    first 30 characters of the bare file name are used.

    Args:
        filepath: Path of the statement file.

    Returns:
        ``(company_name, rows, error)``. ``rows`` is a list of
        ``OrderedDict`` records and ``error`` is ``None`` on success; on
        failure ``rows`` is empty and ``error`` is a non-empty message.
    """
    stem = os.path.splitext(os.path.basename(filepath))[0]
    company_name = re.sub(r"\s*\d{4}.*$", "", stem).strip()
    company_name = re.sub(r"\s+新$", "", company_name).strip()
    if not company_name:
        company_name = stem[:30]

    headers, data_rows, error = load_workbook_auto(filepath)
    # A loader may fail with an empty message (headers is None then); treat
    # that as a failure too instead of iterating over None below.
    if error is not None or headers is None:
        return company_name, [], error or "文件读取失败"

    if is_electronic_receipt(headers):
        converted = [
            convert_receipt_row(row, headers, company_name) for row in data_rows
        ]
        return company_name, converted, None

    mapping = build_column_mapping(headers)
    if not mapping:
        return company_name, [], f"无法映射任何列 (表头: {headers[:8]})"
    return company_name, [convert_row(row, mapping) for row in data_rows], None
def _unique_sheet_title(name, used_titles):
    """Return a valid, unused worksheet title derived from ``name``."""
    # Excel forbids \ / * ? : [ ] in sheet titles and limits them to 31 chars.
    base = re.sub(r"[\\/*?:\[\]]", "_", name)[:31] or "Sheet"
    title = base
    serial = 2
    while title.lower() in used_titles:
        suffix = f"_{serial}"
        title = base[:31 - len(suffix)] + suffix
        serial += 1
    used_titles.add(title.lower())
    return title


def process_folder(input_dir, output_path, template_path=None):
    """Convert every statement file under ``input_dir`` into one workbook.

    Files ending in ``.xls``/``.xlsx`` (any letter case) are collected
    recursively. Names starting with ``~``, names containing ``说明`` and the
    output file itself are skipped. Each converted file gets its own sheet;
    a ``汇总`` sheet placed first holds all rows plus a ``_来源公司`` column.
    A failure in one file is recorded and does not stop the batch. The
    workbook is written even when no file converts (the ``汇总`` sheet then
    contains only the header row).

    Args:
        input_dir: Folder to scan.
        output_path: Path of the workbook to write; missing parent folders
            are created.
        template_path: Accepted for compatibility; not used.

    Returns:
        ``(total_files, converted_files, messages)`` where ``messages``
        lists failed and skipped files. When no candidate file exists the
        result is ``(0, 0, ["未找到任何 Excel 文件"])`` and nothing is written.
    """
    output_abs = os.path.normcase(os.path.abspath(output_path))
    files = []
    for path in glob.glob(os.path.join(input_dir, "**", "*"), recursive=True):
        name = os.path.basename(path)
        if os.path.splitext(name)[1].lower() not in (".xlsx", ".xls"):
            continue
        if name.startswith("~") or "说明" in name:
            continue
        if not os.path.isfile(path):
            continue
        # Never feed a previous run's output back in as a source file.
        if os.path.normcase(os.path.abspath(path)) == output_abs:
            continue
        files.append(path)
    if not files:
        return 0, 0, ["未找到任何 Excel 文件"]

    wb_out = openpyxl.Workbook()
    wb_out.remove(wb_out.active)
    used_titles = {"汇总"}  # reserved for the summary sheet
    success = 0
    errors = []
    all_summary_rows = []

    for filepath in sorted(files):
        rel_path = os.path.relpath(filepath, input_dir)
        try:
            company, rows, error = process_file(filepath)
        except Exception as exc:
            # Batch boundary: one broken file must not abort the others.
            errors.append(f"[失败] {rel_path}: {exc}")
            continue
        if error:
            errors.append(f"[失败] {rel_path}: {error}")
            continue
        if not rows:
            errors.append(f"[跳过] {rel_path}: 无数据行")
            continue

        ws = wb_out.create_sheet(title=_unique_sheet_title(company, used_titles))
        for col, field in enumerate(TEMPLATE_COLUMNS, 1):
            ws.cell(row=1, column=col, value=field)
        for r, row_data in enumerate(rows, 2):
            for col, field in enumerate(TEMPLATE_COLUMNS, 1):
                ws.cell(row=r, column=col, value=row_data.get(field, ""))
        for row_data in rows:
            row_data["_来源公司"] = company
            all_summary_rows.append(row_data)
        success += 1

    # Always create the summary sheet: a workbook with zero sheets cannot
    # be saved.
    ws_sum = wb_out.create_sheet(title="汇总", index=0)
    sum_headers = TEMPLATE_COLUMNS + ["_来源公司"]
    for col, field in enumerate(sum_headers, 1):
        ws_sum.cell(row=1, column=col, value=field)
    for r, row_data in enumerate(all_summary_rows, 2):
        for col, field in enumerate(sum_headers, 1):
            ws_sum.cell(row=r, column=col, value=row_data.get(field, ""))

    os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
    wb_out.save(output_path)
    return len(files), success, errors
def main():
    """Command-line entry point: ``converter.py <input_dir> [output_path]``.

    The input folder is validated before anything is created. Without an
    explicit output path, the result goes to a timestamped file in an
    ``输出`` folder next to the input folder. Exits with status 1 on a usage
    error or a missing input folder.
    """
    if len(sys.argv) < 2:
        print("用法: python converter.py <输入文件夹> [输出文件路径]")
        print("示例: python converter.py ./流水数据 ./输出/监管流水_汇总.xlsx")
        sys.exit(1)

    input_dir = sys.argv[1]
    if not os.path.isdir(input_dir):
        print(f"错误: 文件夹不存在: {input_dir}")
        sys.exit(1)

    if len(sys.argv) >= 3:
        output_path = sys.argv[2]
    else:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # abspath drops a trailing slash, so dirname is really the parent
        # folder and the output never lands inside the scanned folder.
        parent_dir = os.path.dirname(os.path.abspath(input_dir))
        output_dir = os.path.join(parent_dir, "输出")
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, f"监管流水_{timestamp}_汇总.xlsx")

    print(f"扫描文件夹: {input_dir}")
    total, success, errors = process_folder(input_dir, output_path)

    print("\n处理完成:")
    print(f" 总文件数: {total}")
    print(f" 成功转换: {success}")
    print(f" 失败/跳过: {total - success}")
    if errors:
        print("\n失败详情:")
        for err in errors:
            print(f" {err}")
    # process_folder writes nothing when it found no candidate files.
    if total:
        print(f"\n输出文件: {output_path}")


if __name__ == "__main__":
    main()