68671784f6
deploy notes / build-and-deploy (push) Failing after 2m2s
- backend: POST /api/recordings/:id/feishu → 拼 markdown (总结在最上 + 附件链接到转录/录音 + 转写全文) → 写 /data/feishu-tmp/<id>/ → HTTP POST 到 feishu sidecar
- 复用:已有 feishu_doc_id 时 --update 同一个 doc,前端按钮文案变「↻ 重新生成」
- schema 加 feishu_doc_id + feishu_url 两列(ALTER TABLE 兼容旧 db)
- LLM prompt 改:行动项用 markdown checkbox `- [ ] 谁·做什么·何时`
- sidecar apps/notes/feishu: node:20 + python3 + python3-markdown + @larksuite/cli + COPY 自己的 markdown-to-feishu script + FastAPI /convert
- k8s: deployment 加 feishu container 共享 PVC;lark-cli-creds Secret 挂 /root/.lark-cli/config.json
- CI: 主 image --no-cache(cube 规矩),sidecar 保留 layer cache(chromium-free,但 apt/npm 也大)
- 前端: content 头部加「📤 一键转飞书文档」按钮;已转过显示飞书链接 + 按钮变重生成
971 lines
36 KiB
Python
Executable File
971 lines
36 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""markdown-to-feishu — convert a Markdown file (with rich embeds) into a Feishu
|
|
docx, using the lark-cli wrapper. Tables, images (URL + local), Mermaid /
|
|
PlantUML diagrams, and arbitrary attachments (PDF / CSV / log / anything) all
|
|
get planted as real DocxXML blocks. Re-runs against the same .md by default
|
|
update the previously-created doc instead of spawning a new one.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import html as html_lib
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import textwrap
|
|
import time
|
|
import uuid
|
|
from html.parser import HTMLParser
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse
|
|
|
|
import markdown
|
|
|
|
|
|
# Directory holding the persistent state; $MD2FEISHU_STATE_DIR overrides the
# per-user default under ~/.local/share.
STATE_DIR = Path(
    os.environ.get(
        "MD2FEISHU_STATE_DIR",
        str(Path.home().joinpath(".local", "share", "markdown-to-feishu")),
    )
)
# JSON file mapping markdown absolute paths to the docs created for them.
STATE_FILE = STATE_DIR.joinpath("state.json")

# Prefix of the placeholder tokens planted where local media will be inserted.
SENTINEL_PREFIX = "MD2FEISHU_SENTINEL"

# Reported by --version.
VERSION = "0.1.0"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# State (markdown abs path -> doc id) so re-runs update in place
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_state() -> dict:
    """Load the markdown-path -> doc mapping stored in ``STATE_FILE``.

    Returns:
        The decoded mapping, or an empty dict when the file is missing,
        unreadable, not valid UTF-8, not valid JSON, or when its top-level
        value is not a JSON object.
    """
    try:
        # A missing file raises FileNotFoundError (an OSError), so no
        # separate exists() probe is needed.
        loaded = json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {}
    # Callers use dict methods on the result; a list/str/number root would
    # crash them, so treat it like a corrupt file.
    return loaded if isinstance(loaded, dict) else {}
|
|
|
|
|
|
def save_state(state: dict) -> None:
    """Write *state* to ``STATE_FILE`` as indented UTF-8 JSON.

    ``STATE_DIR`` (and any missing parents) is created first.
    """
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    serialised = json.dumps(state, indent=2, ensure_ascii=False)
    STATE_FILE.write_text(serialised, encoding="utf-8")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# lark-cli runner
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class LarkError(RuntimeError):
    """Raised when a lark-cli call fails or returns an unusable response."""
|
|
|
|
|
|
def run_lark(args: list[str], *, stdin: str | None = None, identity: str = "user", verbose: bool = False, cwd: str | None = None) -> dict:
    """Run ``lark-cli --as <identity> <args...>`` and return its JSON output.

    Args:
        args: Arguments appended after ``lark-cli --as <identity>``.
        stdin: Optional text piped to the process' standard input.
        identity: Value passed to ``--as``.
        verbose: When true, log the command line (and cwd) to stderr.
        cwd: Optional working directory for the child process.

    Returns:
        The decoded JSON object printed on stdout; ``{}`` when stdout is
        blank; ``{"_raw": <stdout>}`` when stdout is not a JSON object.

    Raises:
        LarkError: The executable could not be started, or it exited with a
            non-zero status.
    """
    cmd = ["lark-cli", "--as", identity] + args
    if verbose:
        cwd_note = f" (cwd={cwd})" if cwd else ""
        sys.stderr.write(f"[lark] {' '.join(cmd)}{cwd_note}\n")
    try:
        proc = subprocess.run(
            cmd,
            input=stdin,
            capture_output=True,
            text=True,
            # Document content is frequently non-ASCII; do not depend on the
            # locale of the (often minimal container) environment.
            encoding="utf-8",
            errors="replace",
            cwd=cwd,
        )
    except OSError as e:
        # Missing binary, bad cwd, permission problems: surface them through
        # the one exception type callers already handle.
        raise LarkError(f"could not run lark-cli ({e}): {' '.join(cmd)}") from e
    if proc.returncode != 0:
        raise LarkError(
            f"lark-cli failed (exit {proc.returncode}): {' '.join(cmd)}\n"
            f"stderr: {proc.stderr.strip()}\n"
            f"stdout: {proc.stdout.strip()}"
        )
    if not proc.stdout.strip():
        return {}
    try:
        parsed = json.loads(proc.stdout)
    except json.JSONDecodeError:
        return {"_raw": proc.stdout}
    # Callers call .get() on the result, so only hand back JSON objects.
    return parsed if isinstance(parsed, dict) else {"_raw": proc.stdout}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Markdown helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def is_http_url(s: str) -> bool:
    """Return True when *s* parses as a URL with an http or https scheme.

    Malformed input that ``urlparse`` rejects (for example an unbalanced
    IPv6 bracket) is reported as False instead of raising.
    """
    try:
        scheme = urlparse(s).scheme
    except ValueError:
        return False
    return scheme in ("http", "https")
|
|
|
|
|
|
def is_anchor(s: str) -> bool:
    """Return True when *s* is an in-document anchor reference (``#...``)."""
    return s[:1] == "#"
|
|
|
|
|
|
def preprocess_markdown(text: str) -> str:
    """Rewrite GFM extras that python-markdown core does not understand.

    * A leading byte-order mark is removed.
    * Top-level task-list items (``- [x] text``, ``* [ ] text``,
      ``1. [x] text``) become a stand-alone ``<checkbox done="...">`` line
      surrounded by blank lines; indented (nested) items are left alone.
    * ``~~strike~~`` becomes ``<del>strike</del>``.

    Lines inside fenced code blocks (``` or ~~~) are passed through
    untouched.
    """
    # Strip BOM. The escape is spelled out: an invisible literal is easily
    # lost in transit, and startswith("") is true for every string.
    if text.startswith("\ufeff"):
        text = text[1:]
    out_lines: list[str] = []
    # Fence character ("`" or "~") of the block we are inside, else None.
    open_fence: str | None = None
    fence_re = re.compile(r"^\s*(`{3,}|~{3,})")
    strike_re = re.compile(r"~~(\S(?:.*?\S)?)~~")
    task_re = re.compile(r"^(\s*)(?:[-*+]|\d+\.)\s+\[([ xX])\]\s+(.*)$")
    for line in text.split("\n"):
        fence = fence_re.match(line)
        if fence:
            marker = fence.group(1)[0]
            if open_fence is None:
                open_fence = marker
            elif marker == open_fence:
                # Only the same fence character closes the block.
                open_fence = None
            out_lines.append(line)
            continue
        if open_fence is not None:
            out_lines.append(line)
            continue
        m = task_re.match(line)
        if m and not m.group(1):  # top-level only; nested stays a list item
            done = "true" if m.group(2).lower() == "x" else "false"
            body = m.group(3).strip()
            # Surround with blank lines so it parses as a raw HTML block
            out_lines.append("")
            out_lines.append(f'<checkbox done="{done}">{html_lib.escape(body)}</checkbox>')
            out_lines.append("")
            continue
        out_lines.append(strike_re.sub(r"<del>\1</del>", line))
    return "\n".join(out_lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTML -> DocxXML converter
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Tag names classified as inline-level markup.
INLINE_TAGS = {
    "a",
    "b", "strong",
    "em", "i",
    "u",
    "del", "s", "strike",
    "code",
    "span",
    "br",
    "img",
    "cite",
    "latex",
}

# Block-level tags whose open/close markers are copied to the output as-is.
BLOCK_PASSTHROUGH = {
    "p",
    "h1", "h2", "h3", "h4", "h5", "h6", "h7", "h8", "h9",
    "hr",
    "br",
}
|
|
|
|
|
|
def xml_escape_text(s: str) -> str:
    """Escape ``&``, ``<`` and ``>`` in *s* for use as XML character data."""
    # "&" must be replaced first, otherwise the ampersands introduced by the
    # other two replacements would be escaped a second time.
    return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
|
|
|
|
|
|
def xml_escape_attr(s: str) -> str:
    """Escape *s* for use inside a double-quoted XML attribute value.

    Applies ``xml_escape_text`` and additionally replaces the double quote
    with its entity so the value cannot terminate the attribute early.
    """
    return xml_escape_text(s).replace('"', "&quot;")
|
|
|
|
|
|
class DocxXMLBuilder(HTMLParser):
|
|
"""Walks python-markdown HTML and emits DocxXML.
|
|
|
|
Local images / attachments / non-inline-able media become placeholder
|
|
<p>SENTINEL</p> paragraphs; each one is recorded in ``self.embeds`` so the
|
|
caller can media-insert the real file in the correct position afterwards.
|
|
"""
|
|
|
|
def __init__(self, md_dir: Path, session_tag: str):
|
|
super().__init__(convert_charrefs=True)
|
|
self.md_dir = md_dir
|
|
self.session_tag = session_tag
|
|
self.out: list[str] = []
|
|
self.embeds: list[dict] = [] # {sentinel, file, type, caption}
|
|
self._code_buf: list[str] | None = None
|
|
self._code_lang: str | None = None
|
|
self._table_buf: list[str] | None = None # we buffer the entire table so colspan/rowspan etc. just round-trip
|
|
self._table_depth = 0
|
|
self._in_pre = False
|
|
self._inline_stack: list[str] = []
|
|
self._li_stack: list[str] = [] # track ul/ol type for current li
|
|
self._blockquote_depth = 0
|
|
self._p_depth = 0 # how many <p> are currently open in our output stream
|
|
|
|
# ---- sentinel handling ----
|
|
def _next_sentinel(self) -> str:
|
|
n = len(self.embeds)
|
|
# All caps + underscores so it never collides with normal markdown prose
|
|
return f"{SENTINEL_PREFIX}_{self.session_tag}_{n:04d}"
|
|
|
|
def _resolve_local(self, src: str) -> Path | None:
|
|
# Strip query/fragment for sanity
|
|
clean = src.split("#", 1)[0].split("?", 1)[0]
|
|
if not clean or is_http_url(clean) or is_anchor(clean):
|
|
return None
|
|
p = Path(clean)
|
|
if not p.is_absolute():
|
|
p = (self.md_dir / p).resolve()
|
|
return p if p.exists() and p.is_file() else None
|
|
|
|
# ---- emit helpers ----
|
|
def _emit(self, s: str) -> None:
|
|
# If we're buffering a table, append there instead
|
|
if self._table_buf is not None:
|
|
self._table_buf.append(s)
|
|
else:
|
|
self.out.append(s)
|
|
|
|
def _emit_placeholder(self, file: Path, kind: str, caption: str | None = None) -> None:
|
|
sentinel = self._next_sentinel()
|
|
self.embeds.append({
|
|
"sentinel": sentinel,
|
|
"file": str(file),
|
|
"type": kind,
|
|
"caption": caption,
|
|
})
|
|
# The placeholder must end up as its own top-level <p> so media-insert
|
|
# can anchor on it cleanly and the cleanup pass can block_delete it.
|
|
# If we're currently inside a <p>, split: close, emit standalone, reopen.
|
|
if self._table_buf is not None:
|
|
# Inside a table cell — best we can do is emit the sentinel as
|
|
# inline text and rely on str_replace cleanup. Media still lands at
|
|
# top level (per --selection-with-ellipsis semantics).
|
|
self._emit(sentinel)
|
|
return
|
|
if self._p_depth > 0:
|
|
self.out.append("</p>")
|
|
self.out.append(f"<p>{sentinel}</p>")
|
|
self.out.append("<p>")
|
|
return
|
|
self._emit(f"<p>{sentinel}</p>")
|
|
|
|
# ---- HTMLParser hooks ----
|
|
def handle_starttag(self, tag, attrs):
|
|
attrd = dict(attrs)
|
|
|
|
# Inside <pre><code>: capture verbatim
|
|
if self._in_pre:
|
|
# Don't recurse, but still record raw markup if any nested tags appear
|
|
if tag == "code":
|
|
self._code_lang = self._extract_lang(attrd.get("class", ""))
|
|
self._code_buf = []
|
|
return
|
|
|
|
# Table buffer mode: just copy markup through, no transformations needed
|
|
if self._table_buf is not None:
|
|
self._table_buf.append(self._raw_tag(tag, attrd))
|
|
if tag == "table":
|
|
self._table_depth += 1
|
|
return
|
|
|
|
if tag == "table":
|
|
self._table_buf = []
|
|
self._table_depth = 1
|
|
self._table_buf.append(self._raw_tag(tag, attrd))
|
|
return
|
|
|
|
if tag == "pre":
|
|
self._in_pre = True
|
|
return
|
|
|
|
if tag == "img":
|
|
self._emit_img(attrd)
|
|
return
|
|
|
|
if tag == "a":
|
|
href = attrd.get("href", "")
|
|
local = self._resolve_local(href) if href else None
|
|
if local is not None:
|
|
# Inline attachment: keep the link text in the prose so the
|
|
# paragraph still reads naturally, and queue a placeholder so
|
|
# the attachment block appears right after this paragraph.
|
|
caption = attrd.get("title") or None
|
|
self._emit_placeholder(local, "file", caption)
|
|
# Drop the <a> tags (keep their text children) by pushing
|
|
# a "transparent" marker on the inline stack.
|
|
self._inline_stack.append("__TRANSPARENT_A__")
|
|
return
|
|
# Regular link
|
|
self._inline_stack.append("a")
|
|
attrs_s = self._attrs_string({"href": href})
|
|
self._emit(f"<a{attrs_s}>")
|
|
return
|
|
|
|
if tag in {"b", "strong"}:
|
|
self._inline_stack.append("b")
|
|
self._emit("<b>")
|
|
return
|
|
if tag in {"em", "i"}:
|
|
self._inline_stack.append("em")
|
|
self._emit("<em>")
|
|
return
|
|
if tag in {"u"}:
|
|
self._inline_stack.append("u")
|
|
self._emit("<u>")
|
|
return
|
|
if tag in {"del", "s", "strike"}:
|
|
self._inline_stack.append("del")
|
|
self._emit("<del>")
|
|
return
|
|
if tag == "code":
|
|
self._inline_stack.append("code")
|
|
self._emit("<code>")
|
|
return
|
|
if tag == "br":
|
|
self._emit("<br/>")
|
|
return
|
|
|
|
if tag == "ul":
|
|
self._li_stack.append("ul")
|
|
self._emit("<ul>")
|
|
return
|
|
if tag == "ol":
|
|
self._li_stack.append("ol")
|
|
self._emit("<ol>")
|
|
return
|
|
if tag == "li":
|
|
if self._li_stack and self._li_stack[-1] == "ol":
|
|
self._emit('<li seq="auto">')
|
|
else:
|
|
self._emit("<li>")
|
|
return
|
|
|
|
if tag == "blockquote":
|
|
self._blockquote_depth += 1
|
|
self._emit("<blockquote>")
|
|
return
|
|
|
|
if tag == "p":
|
|
self._p_depth += 1
|
|
self._emit("<p>")
|
|
return
|
|
|
|
if tag == "checkbox":
|
|
# Emitted by our preprocessor for GFM task list items.
|
|
done = attrd.get("done", "false")
|
|
self._emit(f'<checkbox done="{xml_escape_attr(done)}">')
|
|
self._inline_stack.append("checkbox")
|
|
return
|
|
|
|
if tag in BLOCK_PASSTHROUGH:
|
|
self._emit(f"<{tag}>")
|
|
return
|
|
|
|
# span etc.
|
|
if tag == "span":
|
|
self._inline_stack.append("span")
|
|
self._emit("<span>")
|
|
return
|
|
|
|
# Anything else we don't recognise — drop the tag, keep its text
|
|
self._inline_stack.append("__UNKNOWN__")
|
|
|
|
def handle_endtag(self, tag):
|
|
if self._in_pre:
|
|
if tag == "code":
|
|
self._flush_code()
|
|
elif tag == "pre":
|
|
self._in_pre = False
|
|
return
|
|
|
|
if self._table_buf is not None:
|
|
self._table_buf.append(f"</{tag}>")
|
|
if tag == "table":
|
|
self._table_depth -= 1
|
|
if self._table_depth == 0:
|
|
table_xml = "".join(self._table_buf)
|
|
self._table_buf = None
|
|
# Clean the buffered HTML so it's valid DocxXML
|
|
self.out.append(self._sanitise_table(table_xml))
|
|
return
|
|
|
|
if tag == "pre":
|
|
self._in_pre = False
|
|
return
|
|
|
|
if tag == "img":
|
|
return
|
|
|
|
if tag == "a":
|
|
top = self._inline_stack.pop() if self._inline_stack else None
|
|
if top == "__TRANSPARENT_A__":
|
|
return
|
|
self._emit("</a>")
|
|
return
|
|
|
|
if tag in {"b", "strong"}:
|
|
if self._inline_stack and self._inline_stack[-1] == "b":
|
|
self._inline_stack.pop()
|
|
self._emit("</b>")
|
|
return
|
|
if tag in {"em", "i"}:
|
|
if self._inline_stack and self._inline_stack[-1] == "em":
|
|
self._inline_stack.pop()
|
|
self._emit("</em>")
|
|
return
|
|
if tag in {"u"}:
|
|
if self._inline_stack and self._inline_stack[-1] == "u":
|
|
self._inline_stack.pop()
|
|
self._emit("</u>")
|
|
return
|
|
if tag in {"del", "s", "strike"}:
|
|
if self._inline_stack and self._inline_stack[-1] == "del":
|
|
self._inline_stack.pop()
|
|
self._emit("</del>")
|
|
return
|
|
if tag == "code":
|
|
if self._inline_stack and self._inline_stack[-1] == "code":
|
|
self._inline_stack.pop()
|
|
self._emit("</code>")
|
|
return
|
|
if tag == "span":
|
|
if self._inline_stack and self._inline_stack[-1] == "span":
|
|
self._inline_stack.pop()
|
|
self._emit("</span>")
|
|
return
|
|
|
|
if tag == "ul":
|
|
if self._li_stack and self._li_stack[-1] == "ul":
|
|
self._li_stack.pop()
|
|
self._emit("</ul>")
|
|
return
|
|
if tag == "ol":
|
|
if self._li_stack and self._li_stack[-1] == "ol":
|
|
self._li_stack.pop()
|
|
self._emit("</ol>")
|
|
return
|
|
if tag == "li":
|
|
self._emit("</li>")
|
|
return
|
|
|
|
if tag == "blockquote":
|
|
self._blockquote_depth = max(0, self._blockquote_depth - 1)
|
|
self._emit("</blockquote>")
|
|
return
|
|
|
|
if tag == "p":
|
|
self._p_depth = max(0, self._p_depth - 1)
|
|
self._emit("</p>")
|
|
return
|
|
|
|
if tag == "checkbox":
|
|
if self._inline_stack and self._inline_stack[-1] == "checkbox":
|
|
self._inline_stack.pop()
|
|
self._emit("</checkbox>")
|
|
return
|
|
|
|
if tag in BLOCK_PASSTHROUGH:
|
|
self._emit(f"</{tag}>")
|
|
return
|
|
|
|
if self._inline_stack and self._inline_stack[-1] == "__UNKNOWN__":
|
|
self._inline_stack.pop()
|
|
|
|
def handle_startendtag(self, tag, attrs):
|
|
attrd = dict(attrs)
|
|
if tag == "img":
|
|
self._emit_img(attrd)
|
|
return
|
|
if tag == "br":
|
|
self._emit("<br/>")
|
|
return
|
|
if tag == "hr":
|
|
self._emit("<hr/>")
|
|
return
|
|
# Treat as start+end
|
|
self.handle_starttag(tag, attrs)
|
|
self.handle_endtag(tag)
|
|
|
|
def handle_data(self, data):
|
|
if not data:
|
|
return
|
|
if self._in_pre and self._code_buf is not None:
|
|
self._code_buf.append(data)
|
|
return
|
|
if self._table_buf is not None:
|
|
self._table_buf.append(xml_escape_text(data))
|
|
return
|
|
# Preserve user text but escape XML specials
|
|
# In <pre> outside <code> we also escape (shouldn't normally happen)
|
|
self._emit(xml_escape_text(data))
|
|
|
|
# ---- code / language extraction ----
|
|
@staticmethod
|
|
def _extract_lang(class_attr: str) -> str:
|
|
# python-markdown fenced_code emits e.g. class="language-mermaid"
|
|
for tok in class_attr.split():
|
|
if tok.startswith("language-"):
|
|
return tok[len("language-"):]
|
|
if tok.startswith("lang-"):
|
|
return tok[len("lang-"):]
|
|
return ""
|
|
|
|
def _flush_code(self) -> None:
|
|
body = "".join(self._code_buf or [])
|
|
lang = (self._code_lang or "").strip().lower()
|
|
self._code_buf = None
|
|
self._code_lang = None
|
|
# Mermaid / PlantUML get rendered as whiteboards
|
|
if lang in {"mermaid"}:
|
|
self._emit(f'<whiteboard type="mermaid">{xml_escape_text(body.rstrip())}</whiteboard>')
|
|
return
|
|
if lang in {"plantuml", "puml"}:
|
|
self._emit(f'<whiteboard type="plantuml">{xml_escape_text(body.rstrip())}</whiteboard>')
|
|
return
|
|
# Strip trailing newline that python-markdown adds inside <code>
|
|
body = body.rstrip("\n")
|
|
lang_attr = f' lang="{xml_escape_attr(lang)}"' if lang else ""
|
|
self._emit(f"<pre{lang_attr}><code>{xml_escape_text(body)}</code></pre>")
|
|
|
|
# ---- image emit ----
|
|
def _emit_img(self, attrd: dict) -> None:
|
|
src = attrd.get("src", "").strip()
|
|
alt = attrd.get("alt", "").strip()
|
|
title = attrd.get("title", "").strip()
|
|
caption = title or alt or None
|
|
if not src:
|
|
return
|
|
if is_http_url(src):
|
|
attrs_s = self._attrs_string({"href": src, "caption": caption, "name": alt or None})
|
|
self._emit(f"<img{attrs_s}/>")
|
|
return
|
|
local = self._resolve_local(src)
|
|
if local is None:
|
|
sys.stderr.write(f"[warn] image not found, dropping: {src}\n")
|
|
return
|
|
self._emit_placeholder(local, "image", caption)
|
|
|
|
# ---- attrs helpers ----
|
|
@staticmethod
|
|
def _attrs_string(d: dict) -> str:
|
|
parts = []
|
|
for k, v in d.items():
|
|
if v is None or v == "":
|
|
continue
|
|
parts.append(f' {k}="{xml_escape_attr(str(v))}"')
|
|
return "".join(parts)
|
|
|
|
@staticmethod
|
|
def _raw_tag(tag: str, attrd: dict) -> str:
|
|
return f"<{tag}{DocxXMLBuilder._attrs_string(attrd)}>"
|
|
|
|
@staticmethod
|
|
def _sanitise_table(html: str) -> str:
|
|
"""Coerce python-markdown's HTML table into DocxXML-legal markup:
|
|
- <strong>/<em>/<i> become <b>/<em>
|
|
- Drop style="..." attributes (DocxXML uses background-color /
|
|
vertical-align, not CSS)
|
|
- Drop unknown attributes on cells
|
|
"""
|
|
# tag rename
|
|
html = re.sub(r"<(/?)strong\b", r"<\1b", html)
|
|
html = re.sub(r"<(/?)i\b", r"<\1em", html)
|
|
# drop style="..." on th/td/tr/table
|
|
html = re.sub(r'\s+style="[^"]*"', "", html)
|
|
# drop align="..." on th/td (we don't try to map to DocxXML alignment)
|
|
html = re.sub(r'\s+align="[^"]*"', "", html)
|
|
return html
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Driver
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def derive_title(md_text: str, md_path: Path) -> str:
    """Return the document title for *md_text*.

    The title is the text of the first ``# `` heading line found outside
    fenced code blocks; when there is none, the stem of *md_path* is used.
    """
    open_fence = None  # "```" or "~~~" while inside a fenced block
    for line in md_text.splitlines():
        line = line.strip()
        marker = line[:3]
        if marker in ("```", "~~~"):
            if open_fence is None:
                open_fence = marker
            elif marker == open_fence:
                open_fence = None
            continue
        # A "# ..." line inside a fence is a code comment, not a heading.
        if open_fence is None and line.startswith("# "):
            return line[2:].strip()
    # fallback: filename without extension
    return md_path.stem
|
|
|
|
|
|
def strip_first_h1(md_text: str) -> str:
    """Drop the first H1 line if present — it is conveyed via <title> instead.

    Only a ``# `` line outside fenced code blocks counts as the H1. The
    remaining lines are returned joined with ``\\n``.
    """
    out_lines: list[str] = []
    dropped = False
    open_fence = None  # "```" or "~~~" while inside a fenced block
    for line in md_text.splitlines():
        stripped = line.strip()
        marker = stripped[:3]
        if marker in ("```", "~~~"):
            if open_fence is None:
                open_fence = marker
            elif marker == open_fence:
                open_fence = None
        elif not dropped and open_fence is None and stripped.startswith("# "):
            # Comment lines inside code fences never reach this branch.
            dropped = True
            continue
        out_lines.append(line)
    return "\n".join(out_lines)
|
|
|
|
|
|
def build_xml(md_path: Path, *, title: str, session_tag: str) -> tuple[str, list[dict]]:
    """Render the markdown file at *md_path* to DocxXML.

    Returns:
        ``(xml, embeds)``: the ``<title>`` element followed by the body, and
        the list of embeds recorded by the builder while walking the HTML.
    """
    source = preprocess_markdown(md_path.read_text(encoding="utf-8"))
    rendered = markdown.markdown(
        strip_first_h1(source),
        extensions=["fenced_code", "tables", "sane_lists"],
        output_format="xhtml",
    )

    walker = DocxXMLBuilder(md_dir=md_path.parent, session_tag=session_tag)
    walker.feed(rendered)
    walker.close()

    # python-markdown wraps the unknown <checkbox> tag in <p>; unwrap it so
    # the checkbox stands as its own block.
    wrapped_checkbox = re.compile(
        r"<p>\s*(<checkbox\s+done=\"(?:true|false)\">[^<]*</checkbox>)\s*</p>"
    )
    xml_body = wrapped_checkbox.sub(r"\1", "".join(walker.out))
    # Remove the empty paragraphs left behind by the placeholder split.
    xml_body = re.sub(r"<p>\s*</p>", "", xml_body)

    return f"<title>{xml_escape_text(title)}</title>" + xml_body, walker.embeds
|
|
|
|
|
|
def create_or_overwrite_doc(*, doc_id: str | None, content: str, identity: str, parent_token: str | None, parent_position: str | None, verbose: bool) -> dict:
    """Overwrite doc *doc_id* with *content*, or create a new doc when it is falsy.

    The XML is always sent on stdin (``--content -``) to avoid argv length
    and shell escaping pitfalls.

    Returns:
        ``{"doc_id", "result"}`` for an overwrite; ``{"doc_id", "url",
        "result"}`` for a creation.

    Raises:
        LarkError: lark-cli failed, or the create response carried no
            ``document_id``.
    """
    if not doc_id:
        if verbose:
            sys.stderr.write("[md2feishu] creating new doc\n")
        create_args = [
            "docs", "+create",
            "--api-version", "v2",
            "--doc-format", "xml",
            "--content", "-",
        ]
        # Placement options only make sense for a brand-new document.
        if parent_token:
            create_args.extend(["--parent-token", parent_token])
        if parent_position:
            create_args.extend(["--parent-position", parent_position])
        response = run_lark(create_args, stdin=content, identity=identity, verbose=verbose)
        doc_info = (response.get("data") or {}).get("document") or {}
        created_id = doc_info.get("document_id")
        if not created_id:
            raise LarkError(f"docs +create did not return a document_id: {json.dumps(response, ensure_ascii=False)}")
        return {"doc_id": created_id, "url": doc_info.get("url"), "result": response}

    if verbose:
        sys.stderr.write(f"[md2feishu] overwriting existing doc {doc_id}\n")
    overwrite_args = [
        "docs", "+update",
        "--api-version", "v2",
        "--doc", doc_id,
        "--command", "overwrite",
        "--doc-format", "xml",
        "--content", "-",
    ]
    response = run_lark(overwrite_args, stdin=content, identity=identity, verbose=verbose)
    return {"doc_id": doc_id, "result": response}
|
|
|
|
|
|
def insert_embed(doc_id: str, embed: dict, *, identity: str, verbose: bool) -> None:
    """Upload one embed's file into doc *doc_id*, anchored before its sentinel.

    Raises:
        LarkError: propagated from ``run_lark``.
    """
    # lark-cli refuses absolute paths for --file, so the command runs from
    # the file's directory and is given only the basename.
    source = Path(embed["file"]).resolve()
    kind = embed["type"]
    cli_args = [
        "docs", "+media-insert",
        "--doc", doc_id,
        "--file", source.name,
        "--type", kind,
        "--selection-with-ellipsis", embed["sentinel"],
        "--before",
    ]
    caption = embed.get("caption")
    # Captions are only passed along for images.
    if caption and kind == "image":
        cli_args.extend(["--caption", caption])
    run_lark(cli_args, identity=identity, verbose=verbose, cwd=str(source.parent))
|
|
|
|
|
|
def cleanup_sentinels(doc_id: str, session_tag: str, embeds: list[dict], *, identity: str, verbose: bool) -> None:
    """Remove this run's sentinel placeholders from doc *doc_id*.

    Two passes:
      1. block_delete every paragraph whose entire text is a sentinel
      2. str_replace every sentinel that is present in the doc but was not
         removed by pass 1 (sentinels inline in table cells or mixed prose,
         or all of them when the block_delete call failed)

    Failures of the individual update calls are reported on stderr and do
    not abort the cleanup.

    Raises:
        LarkError: the initial fetch of the document failed.
    """
    res = run_lark([
        "docs", "+fetch",
        "--api-version", "v2",
        "--doc", doc_id,
        "--detail", "with-ids",
        "--doc-format", "xml",
    ], identity=identity, verbose=verbose)
    xml_payload = ((res.get("data") or {}).get("document") or {}).get("content") or ""
    if not xml_payload:
        # Unknown response shape: search the whole serialised response.
        xml_payload = json.dumps(res, ensure_ascii=False)

    # Group 1: block id of the paragraph; group 2: the sentinel it holds.
    sentinel_re = re.compile(
        rf'<p[^>]*\bid="([^"]+)"[^>]*>\s*'
        rf'({re.escape(SENTINEL_PREFIX)}_{re.escape(session_tag)}_\d+)\s*</p>'
    )
    standalone = sentinel_re.findall(xml_payload)
    ids = [block_id for block_id, _ in standalone]
    removed: set[str] = set()
    if ids:
        if verbose:
            sys.stderr.write(f"[md2feishu] deleting {len(ids)} sentinel paragraph(s)\n")
        try:
            run_lark([
                "docs", "+update",
                "--api-version", "v2",
                "--doc", doc_id,
                "--command", "block_delete",
                "--block-id", ",".join(ids),
            ], identity=identity, verbose=verbose)
        except LarkError as e:
            sys.stderr.write(f"[warn] block_delete cleanup failed: {e}\n")
        else:
            # Only sentinels whose paragraph was really deleted are done.
            removed = {sentinel for _, sentinel in standalone}

    # Fallback: scrub any sentinel text still in the doc
    for embed in embeds:
        sentinel = embed["sentinel"]
        if sentinel not in xml_payload or sentinel in removed:
            continue
        try:
            run_lark([
                "docs", "+update",
                "--api-version", "v2",
                "--doc", doc_id,
                "--command", "str_replace",
                "--pattern", sentinel,
                "--content", "",
            ], identity=identity, verbose=verbose)
        except LarkError as e:
            sys.stderr.write(f"[warn] str_replace cleanup for {sentinel} failed: {e}\n")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Long-form help shown after the argparse option list (used as the parser
# epilog). Section headings sit at column 0, entries are indented two spaces.
HELP_EPILOG = textwrap.dedent("""
EXAMPLES
  # First run — creates a new Feishu doc, remembers the mapping
  markdown-to-feishu ./report.md

  # Re-run on the same file — updates the same doc in place (no new doc spawned)
  markdown-to-feishu ./report.md

  # Force a brand-new doc even when state already has a mapping
  markdown-to-feishu --new ./report.md

  # Update a specific doc explicitly, ignoring state file
  markdown-to-feishu --update doxcnAbc123 ./report.md

  # Drop into a particular folder when creating
  markdown-to-feishu --parent-token fldcnXXXX ./report.md

  # Put it under your personal knowledge library
  markdown-to-feishu --parent-position my_library ./report.md

  # Override the document title (default = first H1 or filename stem)
  markdown-to-feishu --title "2026 Q2 OKR" ./okr.md

  # Inspect the generated XML and embed plan, without touching Feishu
  markdown-to-feishu --dry-run ./report.md

  # Forget the mapping for a file (does NOT delete the Feishu doc)
  markdown-to-feishu --forget ./report.md

  # Show the recorded mapping for this file
  markdown-to-feishu --show ./report.md

SUPPORTED MARKDOWN -> FEISHU BLOCK MAPPING
  # / ## / ... / ######          -> <h1> ... <h6>  (the first H1 becomes the
                                    document <title>)
  paragraphs                     -> <p>
  **bold** / __bold__            -> <b>
  *italic* / _italic_            -> <em>
  ~~strike~~ (GFM)               -> <del>
  `inline code`                  -> <code>
  [text](https://...)            -> <a href="...">text</a>
  [text](./local.pdf)            -> attachment block (file uploaded via
                                    docs +media-insert --type file)
  ![alt](https://...)            -> <img href="https://..."/>  (URL is fetched
                                    server-side by Feishu)
  ![alt](./local.png)            -> inline image block (file uploaded via
                                    docs +media-insert --type image; alt /
                                    title becomes caption)
  > blockquote                   -> <blockquote>
  --- / ***                      -> <hr/>
  - item / * item / 1. item      -> <ul> / <ol> with seq="auto"
  nested lists (4-space indent)  -> nested <ul> / <ol>
  | a | b |  GFM tables          -> <table><thead><tr><th>...
  ```lang ... ```                -> <pre lang="lang"><code>...</code></pre>
  ```mermaid ... ```             -> <whiteboard type="mermaid">...</whiteboard>
  ```plantuml ... ```            -> <whiteboard type="plantuml">...</whiteboard>

ATTACHMENT DETECTION
  Any [text](path) link whose href is NOT an http(s) URL and NOT an in-doc
  anchor (#foo), and which resolves to an existing local file (relative to
  the markdown file's directory), is uploaded as a Feishu file block. The
  visible link text stays in the prose as plain text — the attachment block
  carries the filename itself. This is what makes pasting PDFs / CSVs / logs /
  arbitrary binaries feel "native".

  Caveat: if a link resolves to a missing local file, it falls through to a
  regular <a> link (the path will appear as-is). Run with --verbose to see
  each resolution decision.

IDENTITY
  Defaults to --as user so the created doc is owned by YOUR Feishu account,
  not the bot. This means you can manage / move / delete it directly from
  Feishu without any ownership transfer dance. Use --as bot only if you
  explicitly want bot-owned documents.

UPDATE-BY-DEFAULT BEHAVIOUR
  State lives at ~/.local/share/markdown-to-feishu/state.json (override with
  $MD2FEISHU_STATE_DIR or --state-file). Keyed by the markdown file's
  absolute path. When state has a doc_id for the given path:

    - default        -> overwrite that doc in place
    - --new          -> ignore state, create a fresh doc, replace
                        the mapping with the new id
    - --update <id>  -> overwrite the given id and update state

  overwrite replays the full XML and re-uploads every local media file from
  source, so the doc always matches the markdown 1:1. Comments on the doc
  survive overwrite; manual edits inside the doc do NOT (markdown is the
  source of truth).

EXIT CODES
  0  success
  1  generic error (bad args, file not found, lark-cli failure)
  2  partial success — doc created/updated but at least one embed failed

ENVIRONMENT
  MD2FEISHU_STATE_DIR  override the directory holding state.json
  LARK_CLI_PROFILE     passed through; honoured by lark-cli itself

DEPENDENCIES
  python3, python3-markdown, lark-cli (must be authenticated as user via
  `lark-cli auth login`)
""")
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the command line *argv* (without the program name)."""
    parser = argparse.ArgumentParser(
        prog="markdown-to-feishu",
        description="Convert a Markdown file (with rich embeds: tables, images, mermaid, attachments) into a Feishu docx. Re-runs update the previously-created doc by default.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=HELP_EPILOG,
    )
    add = parser.add_argument

    # Positional is optional so that a missing path yields our own message.
    add("markdown", nargs="?", help="path to the .md file")

    # Create-vs-update selection
    add("--new", action="store_true", help="force-create a new doc even if state already has a mapping for this file")
    add("--update", metavar="DOC_ID", help="overwrite the given doc id (URL also accepted); ignores and then updates state")

    # Document properties / placement
    add("--title", help="override document title (default: first H1, else filename stem)")
    add("--parent-token", help="parent folder or wiki node token (only used when creating)")
    add("--parent-position", help="parent position keyword, e.g. my_library (only used when creating)")
    add("--as", dest="identity", choices=["user", "bot"], default="user", help="identity for lark-cli (default: user, so you own the doc)")

    # Inspection and state management
    add("--dry-run", action="store_true", help="print generated XML + embed plan without calling lark-cli")
    add("--state-file", help="override path to state.json (default: ~/.local/share/markdown-to-feishu/state.json)")
    add("--forget", action="store_true", help="remove the state mapping for this file (does not delete the Feishu doc) and exit")
    add("--show", action="store_true", help="print the recorded mapping for this file (if any) and exit")

    add("-v", "--verbose", action="store_true", help="verbose logging (every lark-cli invocation)")
    add("--version", action="version", version=f"markdown-to-feishu {VERSION}")
    return parser.parse_args(argv)
|
|
|
|
|
|
def main(argv: list[str]) -> int:
    """Run the CLI.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        Process exit code: 0 on success, 1 on error, 2 when the doc was
        created/updated but at least one embed could not be inserted.
    """
    global STATE_FILE, STATE_DIR
    args = parse_args(argv)
    if args.state_file:
        STATE_FILE = Path(args.state_file).expanduser().resolve()
        STATE_DIR = STATE_FILE.parent

    if not args.markdown:
        sys.stderr.write("error: missing markdown file (use --help)\n")
        return 1

    md_path = Path(args.markdown).expanduser().resolve()
    key = str(md_path)

    state = load_state()
    recorded = state.get(key)
    if not isinstance(recorded, dict):
        # Missing or hand-mangled entry: behave as if nothing is recorded.
        recorded = None

    # --show / --forget only touch the state file, so they must keep working
    # after the markdown file itself has been moved or deleted.
    if args.show:
        if recorded is None:
            print(f"no mapping recorded for {md_path}")
        else:
            print(json.dumps(recorded, indent=2, ensure_ascii=False))
        return 0

    if args.forget:
        if key in state:
            state.pop(key)
            save_state(state)
            print(f"forgot mapping for {md_path}")
        else:
            print(f"no mapping recorded for {md_path}")
        return 0

    if not md_path.exists() or not md_path.is_file():
        sys.stderr.write(f"error: {md_path} is not a file\n")
        return 1

    try:
        # utf-8-sig drops a leading BOM, which would otherwise hide an H1 on
        # the first line from derive_title.
        md_text = md_path.read_text(encoding="utf-8-sig")
    except (OSError, UnicodeDecodeError) as e:
        sys.stderr.write(f"error: cannot read {md_path}: {e}\n")
        return 1
    title = args.title or derive_title(md_text, md_path)
    session_tag = uuid.uuid4().hex[:8].upper()

    try:
        content, embeds = build_xml(md_path, title=title, session_tag=session_tag)
    except Exception as e:
        # Top-level boundary: report any conversion failure as a CLI error.
        sys.stderr.write(f"error: failed to build XML: {e}\n")
        return 1

    if args.dry_run:
        print("=== GENERATED DOCXXML ===")
        print(content)
        print()
        print("=== EMBED PLAN ===")
        if not embeds:
            print("(no out-of-band embeds)")
        else:
            for e in embeds:
                print(json.dumps(e, ensure_ascii=False))
        target = "new doc"
        if args.update:
            target = f"update doc {args.update}"
        elif not args.new and recorded is not None:
            target = f"update existing doc {recorded.get('doc_id')}"
        print()
        print(f"=== TARGET ===\n{target}")
        return 0

    # Decide create-vs-update
    explicit_doc = args.update
    if explicit_doc and explicit_doc.startswith("http"):
        # extract /docx/<id>
        m = re.search(r"/docx/([A-Za-z0-9]+)", explicit_doc)
        if m:
            explicit_doc = m.group(1)
    target_doc_id = None
    if explicit_doc:
        target_doc_id = explicit_doc
    elif not args.new and recorded is not None:
        target_doc_id = recorded.get("doc_id")

    try:
        outcome = create_or_overwrite_doc(
            doc_id=target_doc_id,
            content=content,
            identity=args.identity,
            parent_token=args.parent_token,
            parent_position=args.parent_position,
            verbose=args.verbose,
        )
    except LarkError as e:
        sys.stderr.write(f"error: {e}\n")
        return 1

    doc_id = outcome["doc_id"]
    failed_embeds: list[dict] = []
    for embed in embeds:
        try:
            insert_embed(doc_id, embed, identity=args.identity, verbose=args.verbose)
        except LarkError as e:
            sys.stderr.write(f"[warn] failed to insert {embed['file']}: {e}\n")
            failed_embeds.append(embed)

    # Always try to clean up sentinels we managed to anchor
    if embeds:
        try:
            cleanup_sentinels(doc_id, session_tag, embeds, identity=args.identity, verbose=args.verbose)
        except LarkError as e:
            sys.stderr.write(f"[warn] cleanup failed: {e}\n")

    # Save state
    entry = recorded if recorded is not None else {}
    entry.update({
        "doc_id": doc_id,
        # Overwrites return no URL; keep the one recorded at creation time.
        "url": outcome.get("url") or entry.get("url"),
        "updated_at": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
        "title": title,
    })
    state[key] = entry
    try:
        save_state(state)
    except OSError as e:
        # The doc already exists remotely; still report it below so the
        # caller does not lose the doc id.
        sys.stderr.write(f"[warn] could not save state to {STATE_FILE}: {e}\n")

    print(json.dumps({
        "doc_id": doc_id,
        "url": entry.get("url"),
        "title": title,
        "embeds_inserted": len(embeds) - len(failed_embeds),
        "embeds_failed": len(failed_embeds),
    }, indent=2, ensure_ascii=False))

    return 2 if failed_embeds else 0
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
|