"""火一五流程图 — 数据模型 + YAML/JSON 解析 + Mermaid/PlantUML/DOT 代码生成。 支持的图表类型 ============== | 类型 | 输出 DSL | 典型用途 | |--------------------|-------------------|----------| | flowchart | Mermaid flowchart | 普通流程图(含分组 subgraph)| | swimlane | PlantUML activity | 真·泳道图(按角色分栏)| | swimlane_mermaid | Mermaid subgraph | 泳道风格(不需要 Java 时用)| | sequence | Mermaid sequence | 时序图 | | state | Mermaid state v2 | 状态图 | | gantt | Mermaid gantt | 甘特图 | | er | Mermaid erDiagram | ER 图 | | class | Mermaid classDiagram | UML 类图 | | journey | Mermaid journey | 用户旅程 | | pie | Mermaid pie | 饼图 | | architecture | Mermaid flowchart | 系统架构(分层 + 分组) | | c4_context | Mermaid C4Context | C4 上下文图 | | c4_container | Mermaid C4Container | C4 容器图 | | mindmap | Mermaid mindmap | 简单思维导图(正经的用 huo15-mind-map)| 用 YAML 描述;具体字段按 type 不同而不同,见 SKILL.md 示例。 """ from __future__ import annotations import json import re from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Tuple try: import yaml # type: ignore HAS_YAML = True except ImportError: HAS_YAML = False # ----- 通用数据结构 ----- @dataclass class Node: id: str label: str = "" shape: str = "rect" # rect/round/stadium/diamond/hexagon/circle/cylinder/cloud/component lane: Optional[str] = None style_class: Optional[str] = None group: Optional[str] = None # 分组名,用于 subgraph tier: Optional[str] = None # architecture tier 分层名 category: Optional[str] = None # 1..5 或 c1..c5,用于分色(可视层级) @dataclass class Edge: src: str dst: str label: Optional[str] = None # solid / dashed / thick / dotted / bidir # v1.3.2 语义类型:success / warning / error / info(带配色) kind: str = "solid" style_class: Optional[str] = None semantic: Optional[str] = None # success / warning / error / info / neutral @dataclass class Tier: id: str label: str = "" direction: str = "" # 可选 override children: List[str] = field(default_factory=list) @dataclass class Group: id: str label: str = "" direction: str = "" # 可选 override children: List[str] = field(default_factory=list) @dataclass class FlowChart: diagram_type: str = "flowchart" title: str = "" direction: str = "TB" nodes: List[Node] = field(default_factory=list) edges: List[Edge] = field(default_factory=list) groups: List[Group] = field(default_factory=list) tiers: List[Tier] = field(default_factory=list) # architecture 多层分层 lanes: List[str] = field(default_factory=list) raw: Optional[str] = None # 原样 Mermaid/DOT 代码时用 extras: Dict[str, Any] = field(default_factory=dict) # ----- 解析 ----- def parse(text_or_path: str, hint: str = "auto") -> FlowChart: """顶层解析。按 hint / 扩展名 / 内容特征分发。""" import os text = text_or_path if os.path.exists(text_or_path): with open(text_or_path, encoding="utf-8") as f: text = f.read() if hint == "auto": ext = os.path.splitext(text_or_path)[1].lower() hint = {".yaml": "yaml", ".yml": "yaml", ".json": "json", ".mmd": "mermaid", ".mermaid": "mermaid", ".puml": "plantuml", ".plantuml": "plantuml", ".dot": "dot", ".gv": "dot"}.get(ext, "auto") if hint == "auto": stripped = text.lstrip() if stripped.startswith("{"): hint = "json" elif re.match(r"^(flowchart|graph|sequenceDiagram|stateDiagram|gantt|classDiagram|erDiagram|journey|pie|C4Context|C4Container|mindmap|%%\{init)", stripped): hint = "mermaid" elif stripped.startswith("@startuml") or stripped.startswith("@startsalt"): hint = "plantuml" elif stripped.startswith("digraph") or stripped.startswith("graph "): hint = "dot" else: hint = "yaml" if hint == "json": return parse_spec(json.loads(text)) if hint == "yaml": if not HAS_YAML: raise RuntimeError("需要 PyYAML(pip install pyyaml)才能解析 YAML 规格文件") return parse_spec(yaml.safe_load(text)) if hint == "mermaid": return FlowChart(diagram_type="mermaid_raw", raw=text) if hint == "plantuml": return FlowChart(diagram_type="plantuml_raw", raw=text) if hint == "dot": return FlowChart(diagram_type="dot_raw", raw=text) raise ValueError(f"未知输入类型:{hint}") def parse_spec(spec: Dict[str, Any]) -> FlowChart: """从 dict(已解析的 YAML/JSON)构造 FlowChart。""" fc = FlowChart() fc.diagram_type = str(spec.get("type") or spec.get("diagram") or "flowchart").lower() fc.title = spec.get("title", "") or "" fc.direction = spec.get("direction", "TB") fc.extras = {k: v for k, v in spec.items() if k not in { "type", "diagram", "title", "direction", "nodes", "edges", "groups", "lanes", "relations" }} # 节点 for n in spec.get("nodes", []) or []: if isinstance(n, str): fc.nodes.append(Node(id=n, label=n)) continue fc.nodes.append(Node( id=n["id"], label=n.get("label", n["id"]), shape=n.get("shape", "rect"), lane=n.get("lane"), style_class=n.get("class"), group=n.get("group"), tier=n.get("tier"), category=n.get("category") or n.get("cat"), )) # 边(兼容 relations 别名) _SEMANTIC_KINDS = {"success", "warning", "error", "info", "neutral"} for e in (spec.get("edges") or spec.get("relations") or []): if isinstance(e, list): if len(e) == 2: fc.edges.append(Edge(src=e[0], dst=e[1])) elif len(e) >= 3: fc.edges.append(Edge(src=e[0], dst=e[1], label=e[2])) continue raw_kind = e.get("kind", "solid") semantic = e.get("semantic") or e.get("sem") # 允许把语义直接写在 kind 上:kind: success if not semantic and raw_kind in _SEMANTIC_KINDS: semantic = raw_kind raw_kind = "solid" fc.edges.append(Edge( src=e.get("from") or e.get("src") or e["source"], dst=e.get("to") or e.get("dst") or e["target"], label=e.get("label"), kind=raw_kind, style_class=e.get("class"), semantic=semantic, )) # 分层 / tiers for t in spec.get("tiers", []) or []: if isinstance(t, str): fc.tiers.append(Tier(id=t, label=t)) else: fc.tiers.append(Tier( id=t["id"], label=t.get("label", t["id"]), direction=t.get("direction", ""), children=t.get("children", []) or t.get("nodes", []), )) # 分组 / 子图 for g in spec.get("groups", []) or []: if isinstance(g, str): fc.groups.append(Group(id=g, label=g)) continue fc.groups.append(Group( id=g["id"], label=g.get("label", g["id"]), direction=g.get("direction", ""), children=g.get("children", []) or g.get("nodes", []), )) # 泳道 if "lanes" in spec: lanes = spec["lanes"] if lanes and isinstance(lanes[0], dict): for lane in lanes: fc.lanes.append(lane.get("name") or lane.get("id") or "") for step in lane.get("steps") or lane.get("nodes") or []: if isinstance(step, str): fc.nodes.append(Node(id=step, label=step, lane=lane.get("name"))) else: fc.nodes.append(Node( id=step["id"], label=step.get("label", step["id"]), shape=step.get("shape", "rect"), lane=lane.get("name"), )) else: fc.lanes = [str(l) for l in lanes] return fc # ----- Mermaid 生成 ----- _MM_SHAPE = { "rect": ("[", "]"), "round": ("(", ")"), "stadium": ("([", "])"), "subroutine": ("[[", "]]"), "cylinder": ("[(", ")]"), "circle": ("((", "))"), "asymmetric": (">", "]"), "diamond": ("{", "}"), "hexagon": ("{{", "}}"), "parallelogram": ("[/", "/]"), "trapezoid": ("[/", "\\]"), } _MM_EDGE = { "solid": "-->", "dashed": "-.->", "dotted": "-.->", "thick": "==>", "bidir": "<-->", "none": "---", } # ----- Icon 字典(label 里 :name: 语法替换为 emoji) ----- # 覆盖流程图常见语义,贴近 Material / Lucide 图标集 _ICON_ALIASES: Dict[str, str] = { "user": "👤", "users": "👥", "person": "👤", "admin": "🛡️", "login": "🔑", "key": "🔑", "lock": "🔒", "unlock": "🔓", "security": "🛡️", "shield": "🛡️", "start": "🟢", "play": "▶️", "stop": "⏹️", "pause": "⏸️", "check": "✅", "cross": "❌", "success": "✅", "fail": "❌", "error": "❌", "warning": "⚠️", "info": "ℹ️", "question": "❓", "db": "💾", "database": "💾", "cache": "⚡", "storage": "🗄️", "cloud": "☁️", "api": "🔌", "server": "🖥️", "mobile": "📱", "web": "🌐", "browser": "🌐", "globe": "🌐", "mail": "✉️", "email": "✉️", "bell": "🔔", "search": "🔍", "edit": "✏️", "delete": "🗑️", "trash": "🗑️", "cart": "🛒", "pay": "💳", "card": "💳", "money": "💰", "coin": "🪙", "order": "📦", "box": "📦", "package": "📦", "ship": "🚚", "truck": "🚚", "deliver": "📬", "inbox": "📥", "outbox": "📤", "ai": "🤖", "bot": "🤖", "brain": "🧠", "chart": "📊", "graph": "📈", "analytics": "📊", "doc": "📄", "file": "📄", "folder": "📁", "clock": "⏰", "time": "⏱️", "calendar": "📅", "star": "⭐", "heart": "❤️", "like": "👍", "dislike": "👎", "fire": "🔥", "rocket": "🚀", "bulb": "💡", "tools": "🛠️", "gear": "⚙️", "settings": "⚙️", "config": "⚙️", "git": "🔀", "branch": "🔀", "merge": "🔀", "build": "🏗️", "deploy": "📦", "launch": "🚀", "test": "🧪", "bug": "🐛", "check_circle": "✅", "phone": "📞", "link": "🔗", "queue": "📮", "mq": "📮", "log": "📜", "doc_text": "📝", "china": "🇨🇳", "world": "🌍", "alarm": "🚨", "siren": "🚨", } _ICON_RE = re.compile(r":([a-z0-9_\-]+):") def _expand_icons(text: str) -> str: """把 :icon_name: 替换为 emoji;未知 name 原样保留。 支持多次出现;也允许与普通文字混排。 """ if not text or ":" not in text: return text def _repl(m): key = m.group(1).lower() return _ICON_ALIASES.get(key, m.group(0)) return _ICON_RE.sub(_repl, text) def _mm_label(label: str) -> str: if not label: return "" # 展开 :icon_name: → emoji(v1.3.2) label = _expand_icons(label) # Mermaid 特殊字符需要用引号包起来 if any(c in label for c in "()[]{}|<>/\\\"\n"): safe = label.replace('"', '\\"').replace("\n", "
") return f'"{safe}"' # 检测 emoji:常见 pictograph / 变体选择符 / symbol-like 范围 # 避开 CJK 主区 (0x4E00-0x9FFF) for c in label: cp = ord(c) if (0x2600 <= cp <= 0x27BF # miscellaneous symbols & dingbats or 0xFE00 <= cp <= 0xFE0F # variation selectors or 0x1F300 <= cp <= 0x1FAFF # emoji or 0x1F000 <= cp <= 0x1F02F # mahjong/playing card or 0x1F0A0 <= cp <= 0x1F0FF): return f'"{label}"' return label def _mm_node(n: Node) -> str: open_br, close_br = _MM_SHAPE.get(n.shape, _MM_SHAPE["rect"]) return f"{n.id}{open_br}{_mm_label(n.label or n.id)}{close_br}" def _mm_edge(e: Edge) -> str: arrow = _MM_EDGE.get(e.kind, _MM_EDGE["solid"]) if e.label: label_part = f"|{_mm_label(e.label).strip('"')}|" return f"{e.src} {arrow}{label_part} {e.dst}" return f"{e.src} {arrow} {e.dst}" def to_mermaid(fc: FlowChart, style_directive: str = "", style: Optional[Any] = None) -> str: """把 FlowChart 转成 Mermaid 代码。 style_directive - `%%{init:...}%%` 那一行 style - Style 对象(可选,用于注入 decision / database 的 classDef) """ if fc.diagram_type == "mermaid_raw": # 原样 Mermaid 代码,只在开头插入 style_directive raw = fc.raw or "" if style_directive and "%%{init" not in raw: return style_directive + "\n" + raw return raw t = fc.diagram_type body: List[str] if t in ("flowchart", "architecture", "swimlane_mermaid"): body = _mm_flowchart(fc) elif t == "sequence": body = _mm_sequence(fc) elif t == "state": body = _mm_state(fc) elif t == "gantt": body = _mm_gantt(fc) elif t == "er": body = _mm_er(fc) elif t == "class": body = _mm_class(fc) elif t == "journey": body = _mm_journey(fc) elif t == "pie": body = _mm_pie(fc) elif t in ("c4_context", "c4context"): body = _mm_c4(fc, "Context") elif t in ("c4_container", "c4container"): body = _mm_c4(fc, "Container") elif t == "mindmap": body = _mm_mindmap(fc) else: body = _mm_flowchart(fc) # 基于 style 注入的 decision / database / terminal / category classDef(仅 flowchart 家族) if style is not None and t in ("flowchart", "architecture", "swimlane_mermaid"): try: from styles import ( decision_classdef, database_classdef, terminal_classdef, category_classdefs, semantic_colors, ) auto = fc.extras.get("_auto_classdefs", {}) decision_ids: List[str] = auto.get("decision_ids", []) or [] database_ids: List[str] = auto.get("database_ids", []) or [] terminal_ids: List[str] = auto.get("terminal_ids", []) or [] cat_map: Dict[str, List[str]] = auto.get("category_map", {}) or {} sem_map: Dict[str, List[int]] = auto.get("semantic_edges", {}) or {} if decision_ids: body.append(" " + decision_classdef(style)) for nid in decision_ids: body.append(f" class {nid} decision") if database_ids: body.append(" " + database_classdef(style)) for nid in database_ids: body.append(f" class {nid} database") if terminal_ids: body.append(" " + terminal_classdef(style)) for nid in terminal_ids: body.append(f" class {nid} terminal") if cat_map: for cls in category_classdefs(style): body.append(" " + cls) for cid, ids in cat_map.items(): for nid in ids: body.append(f" class {nid} {cid}") # 语义边:linkStyle N stroke:#...,color:#...,stroke-width:2px if sem_map: colors = semantic_colors(style) for sem, idxs in sem_map.items(): col = colors.get(sem, style.line_color) idx_list = ",".join(str(i) for i in idxs) body.append( f" linkStyle {idx_list} " f"stroke:{col},color:{col},stroke-width:2px" ) except Exception: pass # Mermaid 要求 frontmatter(---title---)必须位于文件最前;init 指令跟在其后。 if body and body[0].startswith("---"): first_block = body[0] # 例如 "---\ntitle: xxx\n---" rest = body[1:] if style_directive: return "\n".join([first_block, style_directive] + rest) return "\n".join([first_block] + rest) if style_directive: return "\n".join([style_directive] + body) return "\n".join(body) def _mm_flowchart(fc: FlowChart) -> List[str]: lines = [f"flowchart {fc.direction}"] if fc.title: lines.insert(0, f"---\ntitle: {fc.title}\n---") # 收集需要特殊 classDef 的节点 decision_ids = [n.id for n in fc.nodes if n.shape == "diamond"] database_ids = [n.id for n in fc.nodes if n.shape in ("cylinder",)] terminal_ids = [n.id for n in fc.nodes if n.shape == "stadium"] # category 分组:c1..c5 category_map: Dict[str, List[str]] = {} for n in fc.nodes: if not n.category: continue key = str(n.category).strip().lower().lstrip("c") if key in ("1", "2", "3", "4", "5"): category_map.setdefault(f"c{key}", []).append(n.id) # tiers 优先(architecture 类型);否则用 groups use_tiers = bool(fc.tiers) and fc.diagram_type == "architecture" if use_tiers: groups_by_id: Dict[str, Group] = {t.id: Group(id=t.id, label=t.label, direction=t.direction, children=t.children) for t in fc.tiers} else: groups_by_id = {g.id: g for g in fc.groups} grouped_nodes: Dict[str, List[Node]] = {g.id: [] for g in groups_by_id.values()} ungrouped: List[Node] = [] for n in fc.nodes: key: Optional[str] = None if use_tiers: # architecture tiers:从 node.tier 取 key = getattr(n, 'tier', None) or n.group elif fc.diagram_type == "swimlane_mermaid": key = n.lane if key and key not in groups_by_id: groups_by_id[key] = Group(id=key, label=key) grouped_nodes.setdefault(key, []) else: key = n.group if key and key in groups_by_id: grouped_nodes[key].append(n) else: ungrouped.append(n) # 先输出顶层节点 for n in ungrouped: lines.append(" " + _mm_node(n)) # 输出 subgraph for gid, g in groups_by_id.items(): direction = g.direction or fc.direction title = g.label or gid lines.append(f" subgraph {gid}[\"{title}\"]") if direction: lines.append(f" direction {direction}") # children 列表里的 + grouped_nodes 里的 children_ids = set(g.children) seen = set() for n in grouped_nodes.get(gid, []): lines.append(" " + _mm_node(n)) seen.add(n.id) for cid in g.children: if cid in seen: continue # 这个 id 可能是先前声明过的顶层节点 lines.append(f" {cid}") lines.append(" end") # 边 for e in fc.edges: lines.append(" " + _mm_edge(e)) # classDef / class classes = {n.style_class for n in fc.nodes if n.style_class} for cls in sorted(c for c in classes if c): lines.append(f" classDef {cls} fill:#f9f9f9,stroke:#999,stroke-width:1px") for n in fc.nodes: if n.style_class: lines.append(f" class {n.id} {n.style_class}") # 语义边索引(edge 在 body 里是按 fc.edges 顺序输出的) semantic_edges: Dict[str, List[int]] = {} for idx, e in enumerate(fc.edges): if e.semantic: semantic_edges.setdefault(e.semantic, []).append(idx) # 自动 decision / database / terminal / category classDef(渲染阶段注入) fc.extras["_auto_classdefs"] = { "semantic_edges": semantic_edges, "decision_ids": decision_ids, "database_ids": database_ids, "terminal_ids": terminal_ids, "category_map": category_map, } return lines def _mm_sequence(fc: FlowChart) -> List[str]: lines = ["sequenceDiagram"] if fc.title: lines.append(f" title {fc.title}") # 节点看作 actor / participant for n in fc.nodes: role = "actor" if n.shape == "actor" else "participant" lines.append(f" {role} {n.id} as {n.label or n.id}") for e in fc.edges: arrow = "->>" if e.kind != "dashed" else "-->>" lbl = (e.label or "").replace("\n", " ") lines.append(f" {e.src}{arrow}{e.dst}: {lbl}") # extras 支持 notes/auto_number if fc.extras.get("auto_number"): lines.insert(1, " autonumber") for note in fc.extras.get("notes", []) or []: pos = note.get("position", "over") over = note.get("over") or note.get("participant", "") txt = (note.get("text", "") or "").replace("\n", "
") lines.append(f" Note {pos} {over}: {txt}") return lines def _mm_state(fc: FlowChart) -> List[str]: lines = ["stateDiagram-v2"] if fc.title: lines.insert(0, f"---\ntitle: {fc.title}\n---") if fc.direction: lines.append(f" direction {fc.direction}") for n in fc.nodes: if n.label and n.label != n.id: lines.append(f" {n.id} : {n.label}") for e in fc.edges: lbl = f" : {e.label}" if e.label else "" lines.append(f" {e.src} --> {e.dst}{lbl}") return lines def _mm_gantt(fc: FlowChart) -> List[str]: lines = ["gantt"] if fc.title: lines.append(f" title {fc.title}") lines.append(f" dateFormat {fc.extras.get('dateFormat', 'YYYY-MM-DD')}") if "axisFormat" in fc.extras: lines.append(f" axisFormat {fc.extras['axisFormat']}") # sections / tasks sections = fc.extras.get("sections") or [] if sections: for sec in sections: lines.append(f" section {sec.get('name', '')}") for task in sec.get("tasks", []): _gantt_task(lines, task) else: for task in fc.extras.get("tasks", []) or []: _gantt_task(lines, task) return lines def _gantt_task(lines: List[str], task: Dict[str, Any]) -> None: def _s(v: Any) -> str: if v is None: return "" if hasattr(v, "strftime"): # datetime.date / datetime return v.strftime("%Y-%m-%d") return str(v) name = _s(task.get("name", "")) tid = _s(task.get("id", "")) status = _s(task.get("status", "")) start = _s(task.get("start", "")) dur = _s(task.get("duration", task.get("end", ""))) parts = [p for p in [status, tid, start, dur] if p] lines.append(f" {name} :{', '.join(parts)}") def _mm_er(fc: FlowChart) -> List[str]: lines = ["erDiagram"] if fc.title: lines.insert(0, f"---\ntitle: {fc.title}\n---") # 节点 = 实体;label 可附带字段 for n in fc.nodes: fields = n.label.split("\n") if "\n" in n.label else [] if len(fields) > 1: lines.append(f" {n.id} {{") for f in fields[1:]: lines.append(f" {f}") lines.append(" }") for e in fc.edges: relation = e.kind if e.kind != "solid" else "||--o{" lbl = e.label or "relates to" lines.append(f" {e.src} {relation} {e.dst} : \"{lbl}\"") return lines def _mm_class(fc: FlowChart) -> List[str]: lines = ["classDiagram"] if fc.title: lines.insert(0, f"---\ntitle: {fc.title}\n---") if fc.direction: lines.append(f" direction {fc.direction}") for n in fc.nodes: if "\n" in n.label: parts = n.label.split("\n") lines.append(f" class {n.id} {{") for p in parts[1:]: lines.append(f" {p}") lines.append(" }") else: lines.append(f" class {n.id}") for e in fc.edges: rel = { "extends": "<|--", "implements": "<|..", "composition": "*--", "aggregation": "o--", "dependency": "..>", "association": "-->", }.get(e.kind, "-->") lbl = f" : {e.label}" if e.label else "" lines.append(f" {e.src} {rel} {e.dst}{lbl}") return lines def _mm_journey(fc: FlowChart) -> List[str]: lines = ["journey"] if fc.title: lines.append(f" title {fc.title}") for section in fc.extras.get("sections", []) or []: lines.append(f" section {section.get('name', '')}") for task in section.get("tasks", []) or []: lines.append(f" {task['name']}: {task.get('score', 3)}: {', '.join(task.get('actors', []))}") return lines def _mm_pie(fc: FlowChart) -> List[str]: lines = ["pie" + (" showData" if fc.extras.get("show_data") else "")] if fc.title: lines.append(f" title {fc.title}") for item in fc.extras.get("items", []) or []: lines.append(f" \"{item['name']}\" : {item['value']}") return lines def _mm_c4(fc: FlowChart, level: str) -> List[str]: lines = [f"C4{level}"] if fc.title: lines.append(f" title {fc.title}") # 节点:shape 用 Person / System / System_Ext / Container / Db / ContainerDb shape_to_kind = { "person": "Person", "person_ext": "Person_Ext", "system": "System", "system_ext": "System_Ext", "container": "Container", "container_db": "ContainerDb", "component": "Component", "db": "ContainerDb", } for n in fc.nodes: kind = shape_to_kind.get(n.shape.lower(), "System") desc = "" if "\n" in n.label: parts = n.label.split("\n", 1) lbl, desc = parts[0], parts[1] else: lbl = n.label or n.id desc_part = f', "{desc}"' if desc else "" lines.append(f' {kind}({n.id}, "{lbl}"{desc_part})') for e in fc.edges: lbl = e.label or "" lines.append(f' Rel({e.src}, {e.dst}, "{lbl}")') return lines def _mm_mindmap(fc: FlowChart) -> List[str]: lines = ["mindmap"] # 简化:假定第一个 node 是根,其他都是根的 child(不处理多级) if not fc.nodes: return lines root = fc.nodes[0] lines.append(f" root(({root.label or root.id}))") # 按 edges 建父子关系 children: Dict[str, List[str]] = {} for e in fc.edges: children.setdefault(e.src, []).append(e.dst) node_by_id = {n.id: n for n in fc.nodes} def _render(nid: str, depth: int) -> None: for cid in children.get(nid, []): node = node_by_id.get(cid) label = node.label if node else cid lines.append(" " * (depth + 1) + label) _render(cid, depth + 1) _render(root.id, 1) return lines # ----- PlantUML 生成(真·泳道图) ----- def to_plantuml(fc: FlowChart, style_skinparam: str = "") -> str: if fc.diagram_type == "plantuml_raw": return fc.raw or "" if fc.diagram_type == "swimlane": return _puml_swimlane(fc, style_skinparam) if fc.diagram_type == "sequence": return _puml_sequence(fc, style_skinparam) if fc.diagram_type in ("c4_context", "c4context", "c4_container", "c4container"): return _puml_c4(fc, style_skinparam) # fallback:让 Mermaid 干 raise ValueError(f"PlantUML 当前只支持 swimlane/sequence/c4;请用 type: swimlane_mermaid 等走 Mermaid。") def _puml_swimlane(fc: FlowChart, skin: str) -> str: out = ["@startuml"] if skin: out.append(skin) if fc.title: out.append(f"title {fc.title}") # 按 lane 分组 lane_order = fc.lanes or [] if not lane_order: lane_order = list(dict.fromkeys([n.lane for n in fc.nodes if n.lane])) lane_nodes: Dict[str, List[Node]] = {l: [] for l in lane_order} for n in fc.nodes: if n.lane in lane_nodes: lane_nodes[n.lane].append(n) # 构建 id → node 映射与 children 图 node_by_id = {n.id: n for n in fc.nodes} successors: Dict[str, List[Tuple[str, Optional[str]]]] = {} for e in fc.edges: successors.setdefault(e.src, []).append((e.dst, e.label)) out.append("start") visited = set() def _activity_body(n: Node) -> str: if n.shape == "diamond": return f"if ({n.label or n.id}?) then" return f":{n.label or n.id};" # 按 lane 顺序,深度优先走边 first = fc.nodes[0] if fc.nodes else None cursor = first cur_lane: Optional[str] = None while cursor and cursor.id not in visited: visited.add(cursor.id) if cursor.lane and cursor.lane != cur_lane: out.append(f"|{cursor.lane}|") cur_lane = cursor.lane out.append(_activity_body(cursor)) nexts = successors.get(cursor.id, []) if not nexts: break # 只线性前进(复杂分支建议用原生 PlantUML 写) nxt_id, lbl = nexts[0] if lbl: out.append(f"note right: {lbl}") cursor = node_by_id.get(nxt_id) out.append("stop") out.append("@enduml") return "\n".join(out) def _puml_sequence(fc: FlowChart, skin: str) -> str: out = ["@startuml"] if skin: out.append(skin) if fc.title: out.append(f"title {fc.title}") for n in fc.nodes: role = "actor" if n.shape == "actor" else "participant" out.append(f'{role} "{n.label or n.id}" as {n.id}') for e in fc.edges: arrow = "-->" if e.kind == "dashed" else "->" lbl = f" : {e.label}" if e.label else "" out.append(f"{e.src} {arrow} {e.dst}{lbl}") out.append("@enduml") return "\n".join(out) def _puml_c4(fc: FlowChart, skin: str) -> str: """C4-PlantUML 生成器(Person/System/Container/Rel)。""" level = "Context" if fc.diagram_type in ("c4_context", "c4context") else "Container" out = ["@startuml"] if skin: out.append(skin) if fc.title: out.append(f"title {fc.title}") shape_to_c4 = { "person": "Person", "person_ext": "Person_Ext", "system": "System", "system_ext": "System_Ext", "container": "Container", "container_db": "ContainerDb", "db": "ContainerDb", "component": "Component", } for n in fc.nodes: kind = shape_to_c4.get(n.shape.lower(), "System") if "\n" in n.label: parts = n.label.split("\n", 1) lbl, desc = parts[0], parts[1] else: lbl = n.label or n.id desc = "" if desc: out.append(f'{kind}({n.id}, "{lbl}", "{desc}")') else: # Try to detect technology from group or extras tech = fc.extras.get("technologies", {}).get(n.id, "") if tech: out.append(f'{kind}({n.id}, "{lbl}", "{tech}")') else: out.append(f'{kind}({n.id}, "{lbl}")') for e in fc.edges: lbl = e.label or "" # C4-PlantUML Rel 支持方向:Left/Right/Up/Down out.append(f'Rel({e.src}, {e.dst}, "{lbl}")') out.append("@enduml") return "\n".join(out) # ----- Graphviz DOT 生成(复杂网络拓扑、系统架构备选) ----- def to_dot(fc: FlowChart, style: Optional[Any] = None) -> str: """把 FlowChart 转成 Graphviz DOT 代码。用于网络拓扑 / 架构备选。""" from styles import Style # 避免循环 if fc.diagram_type == "dot_raw": return fc.raw or "" lines = ["digraph G {"] lines.append(f' rankdir={fc.direction if fc.direction in ("TB","BT","LR","RL") else "TB"};') lines.append(' node [shape=box, style="rounded,filled", fontname="PingFang SC"];') if style: lines.append(f' bgcolor="{style.background}";') lines.append(f' node [fillcolor="{style.primary_color}", fontcolor="{style.primary_text_color}", color="{style.primary_border_color}"];') lines.append(f' edge [color="{style.line_color}", fontname="PingFang SC"];') if fc.title: lines.append(f' label="{fc.title}"; labelloc=t; fontsize=18;') # 分组 for g in fc.groups: lines.append(f' subgraph cluster_{g.id} {{') lines.append(f' label="{g.label}";') lines.append(' style="rounded,filled"; fillcolor="#F5F5F5";') for cid in g.children: lines.append(f" {cid};") lines.append(" }") for n in fc.nodes: attrs = [f'label="{n.label or n.id}"'] if n.shape in ("cylinder", "cylinder"): attrs.append('shape=cylinder') elif n.shape == "diamond": attrs.append('shape=diamond') elif n.shape == "circle": attrs.append('shape=circle') lines.append(f" {n.id} [{', '.join(attrs)}];") for e in fc.edges: attrs = [] if e.label: attrs.append(f'label="{e.label}"') if e.kind == "dashed": attrs.append('style=dashed') attr_str = f" [{', '.join(attrs)}]" if attrs else "" lines.append(f" {e.src} -> {e.dst}{attr_str};") lines.append("}") return "\n".join(lines) # ----- draw.io XML 生成(支持导出 .drawio 源文件) ----- _DRAWIO_SHAPE = { "rect": "rectangle", "round": "rectangle", # 用 rounded=1 参数表达圆角 "stadium": "rectangle", # 半圆端 via arcSize=50 "subroutine": "process", "cylinder": "cylinder", "circle": "ellipse", "asymmetric": "rectangle", "diamond": "rhombus", "hexagon": "hexagon", "parallelogram": "parallelogram", "trapezoid": "trapezoid", "person": "umlActor", "person_ext": "umlActor", "system": "rectangle", "system_ext": "rectangle", "container": "rectangle", "container_db": "cylinder", "db": "cylinder", "component": "component", } def _dx_escape(s: str) -> str: """转义 XML 特殊字符。""" return (s.replace("&", "&") .replace("<", "<") .replace(">", ">") .replace('"', """) .replace("'", "'")) def _dx_style(shape: str, params: Dict[str, str]) -> str: """把形状 + 样式参数序列化成 draw.io style 字符串。""" shape_type = _DRAWIO_SHAPE.get(shape, "rectangle") base: Dict[str, str] = { "shape": shape_type, "whiteSpace": "wrap", "html": "1", "align": "center", "verticalAlign": "middle", "spacing": "4", } # 圆角策略 if shape in ("round", "stadium"): base["rounded"] = "1" base["arcSize"] = "45" if shape == "stadium" else "20" elif shape in ("rect", "system", "system_ext", "container", "subroutine"): base["rounded"] = "1" base["arcSize"] = "12" base.update(params) return ";".join(f"{k}={v}" for k, v in base.items() if v != "") def to_drawio(fc: FlowChart, style: Optional[Any] = None, theme: str = "modern", font_family: Optional[str] = None, shadow: bool = True) -> str: """把 FlowChart 转成 draw.io .drawio XML(现代风格:渐变 + 圆角 + 阴影 + 弧形正交连线)。""" # 字体与配色 ff = font_family or ( style.font_family.split(",")[0].strip('"') if style else "PingFang SC" ) if style: fill = style.primary_color stroke = style.primary_border_color font_color = style.primary_text_color bg = style.background line = style.line_color accent = style.accent_color grad_start = getattr(style, "gradient_start", "") or "" grad_end = getattr(style, "gradient_end", "") or "" corner = getattr(style, "corner_radius", 12) sw = getattr(style, "stroke_width", 1.6) sec = style.secondary_color tert = style.tertiary_color else: fill, stroke, font_color = "#1E293B", "#0F172A", "#FFFFFF" bg, line, accent = "#FAFAFA", "#64748B", "#6366F1" grad_start, grad_end = "", "" corner, sw = 12, 1.6 sec, tert = "#F1F5F9", "#E2E8F0" # 形状专用的 fill 选择:判断节点用 accent,数据库/容器用 secondary def node_colors(shape: str) -> Dict[str, str]: if shape == "diamond": return {"fill": accent, "text": "#FFFFFF", "stroke": accent} if shape in ("cylinder", "container_db", "db"): return {"fill": sec, "text": stroke, "stroke": tert} if shape in ("system_ext", "person_ext"): return {"fill": tert, "text": stroke, "stroke": line} return {"fill": fill, "text": font_color, "stroke": stroke} # 布局:tiers 优先,否则 groups,否则单列网格 tier_list = [t.id for t in fc.tiers] if fc.tiers else [] group_list = [g.id for g in fc.groups] if fc.groups else [] uses_tiers = bool(tier_list) container_list = tier_list if uses_tiers else group_list # 把节点按 container 分桶(tier/group/ungrouped) bucket: Dict[str, List[Node]] = {cid: [] for cid in container_list} ungrouped: List[Node] = [] for n in fc.nodes: k = getattr(n, "tier", None) if uses_tiers else n.group if k and k in bucket: bucket[k].append(n) else: ungrouped.append(n) # 计算容器 & 节点坐标 NODE_W, NODE_H = 200, 72 H_GAP, V_GAP = 48, 56 C_PAD = 28 # 容器内边距 C_HEAD = 40 # 容器 header 高度 START_X, START_Y = 48, 64 pos: Dict[str, Tuple[int, int]] = {} container_rect: Dict[str, Tuple[int, int, int, int]] = {} # id -> (x,y,w,h) cur_y = START_Y if container_list: # 所有容器里节点数量最多的 → 决定容器宽度 max_cols = max((len(bucket.get(cid, [])) for cid in container_list), default=1) max_cols = max(max_cols, 1) content_w = max_cols * NODE_W + (max_cols - 1) * H_GAP container_w = content_w + C_PAD * 2 for cid in container_list: rows = bucket.get(cid, []) # 分行:每行至多 max_cols 个 n_rows = max(1, (len(rows) + max_cols - 1) // max_cols) if rows else 1 container_h = C_HEAD + n_rows * NODE_H + (n_rows - 1) * V_GAP + C_PAD * 2 - C_PAD container_rect[cid] = (START_X, cur_y, container_w, container_h) # 节点位置相对画布 for i, n in enumerate(rows): row = i // max_cols col = i % max_cols x = START_X + C_PAD + col * (NODE_W + H_GAP) y = cur_y + C_HEAD + row * (NODE_H + V_GAP) pos[n.id] = (x, y) cur_y += container_h + V_GAP # ungrouped 节点一行排 if ungrouped: ug_cols = max(1, min(len(ungrouped), 4)) for i, n in enumerate(ungrouped): row = i // ug_cols col = i % ug_cols x = START_X + col * (NODE_W + H_GAP) y = cur_y + row * (NODE_H + V_GAP) pos[n.id] = (x, y) # 若仍没有任何节点位置(诡异情况)给个默认 for n in fc.nodes: if n.id not in pos: idx = fc.nodes.index(n) pos[n.id] = (START_X + (idx % 4) * (NODE_W + H_GAP), START_Y + (idx // 4) * (NODE_H + V_GAP)) # 分配 cell id CELL_ROOT = 1 next_id = [2] def alloc() -> int: nid = next_id[0] next_id[0] += 1 return nid container_cell: Dict[str, int] = {} node_cell: Dict[str, int] = {} # ---- 输出 XML ---- shadow_flag = "1" if shadow else "0" diag_name = _dx_escape(fc.title) if fc.title else "FlowChart" lines: List[str] = [ '', '', f' ', f' ', ' ', ' ', f' ', ] # 容器(tier/group)cell for cid in container_list: rect = container_rect.get(cid) if not rect: continue x, y, w, h = rect label_text = "" if uses_tiers: tobj = next((t for t in fc.tiers if t.id == cid), None) label_text = (tobj.label if tobj else cid) or cid else: gobj = next((g for g in fc.groups if g.id == cid), None) label_text = (gobj.label if gobj else cid) or cid cell_no = alloc() container_cell[cid] = cell_no container_style = ";".join([ "rounded=1", f"arcSize={corner + 4}", f"fillColor={sec}", f"strokeColor={tert}", f"strokeWidth={max(sw - 0.4, 1.0)}", f"fontColor={stroke}", "fontSize=14", "fontStyle=1", "verticalAlign=top", "align=left", "spacingTop=8", "spacingLeft=14", "dashed=0", f"shadow={shadow_flag}", "container=1", "collapsible=0", f"fontFamily={ff}", ]) lines.append( f' ' ) lines.append( f' ' ) lines.append(' ') # 节点 cell for n in fc.nodes: x, y = pos[n.id] container_id = (getattr(n, "tier", None) if uses_tiers else n.group) parent_cell = container_cell.get(container_id or "", CELL_ROOT) # 子单元坐标相对父容器 if parent_cell != CELL_ROOT and container_id in container_rect: cx, cy, _, _ = container_rect[container_id] nx, ny = x - cx, y - cy else: nx, ny = x, y colors = node_colors(n.shape) node_style_map: Dict[str, str] = { "fillColor": colors["fill"], "strokeColor": colors["stroke"], "fontColor": colors["text"], "fontFamily": ff, "fontSize": "13", "fontStyle": "1", "strokeWidth": str(sw), "shadow": shadow_flag, } # 渐变(仅当 style 提供了渐变起止色,且形状是"实心"节点时) if grad_start and grad_end and n.shape not in ("cylinder", "container_db", "db", "system_ext", "person_ext"): node_style_map["fillColor"] = grad_start node_style_map["gradientColor"] = grad_end node_style_map["gradientDirection"] = "north" style_str = _dx_style(n.shape, node_style_map) cell_no = alloc() node_cell[n.id] = cell_no label = _dx_escape(n.label or n.id) lines.append( f' ' ) lines.append( f' ' ) lines.append(' ') # 边 cell —— 现代:正交弧线 + 配色 for e in fc.edges: src = node_cell.get(e.src) dst = node_cell.get(e.dst) if src is None or dst is None: continue edge_map: Dict[str, str] = { "edgeStyle": "orthogonalEdgeStyle", "rounded": "1", "curved": "0", "arcSize": "12", "strokeColor": line, "strokeWidth": str(sw), "fontColor": stroke, "fontSize": "12", "fontFamily": ff, "labelBackgroundColor": bg, "html": "1", "endArrow": "classic", "endFill": "1", "jettySize": "auto", "orthogonalLoop": "1", } if e.kind == "dashed" or e.kind == "dotted": edge_map["dashed"] = "1" elif e.kind == "thick": edge_map["strokeWidth"] = str(sw + 1.5) elif e.kind == "bidir": edge_map["startArrow"] = "classic" edge_map["startFill"] = "1" edge_style = ";".join(f"{k}={v}" for k, v in edge_map.items()) cell_no = alloc() label = _dx_escape(e.label or "") lines.append( f' ' ) lines.append( ' ' ) lines.append(' ') lines.extend([ ' ', ' ', ' ', '', ]) return "\n".join(lines)