#!/usr/bin/env python3
"""markdown-to-feishu — convert a Markdown file (with rich embeds) into a Feishu
docx, using the lark-cli wrapper. Tables, images (URL + local), Mermaid /
PlantUML diagrams, and arbitrary attachments (PDF / CSV / log / anything) all
get planted as real DocxXML blocks. Re-runs against the same .md by default
update the previously-created doc instead of spawning a new one.
"""
from __future__ import annotations
import argparse
import html as html_lib
import json
import os
import re
import subprocess
import sys
import textwrap
import time
import uuid
from html.parser import HTMLParser
from pathlib import Path
from urllib.parse import urlparse
import markdown
# Directory that holds this tool's persisted state. Overridable through the
# MD2FEISHU_STATE_DIR environment variable; otherwise
# ~/.local/share/markdown-to-feishu under the current user's home.
STATE_DIR = Path(os.environ.get("MD2FEISHU_STATE_DIR", str(Path.home() / ".local/share/markdown-to-feishu")))
# JSON file inside STATE_DIR that load_state() reads and save_state() writes.
STATE_FILE = STATE_DIR / "state.json"
# Leading component of every placeholder token built by _next_sentinel().
SENTINEL_PREFIX = "MD2FEISHU_SENTINEL"
# Version string of this tool.
VERSION = "0.1.0"
# ---------------------------------------------------------------------------
# State (markdown abs path -> doc id) so re-runs update in place
# ---------------------------------------------------------------------------
def load_state() -> dict:
    """Load the persisted state mapping (markdown abs path -> doc id).

    Returns:
        The JSON object stored in ``STATE_FILE``, or an empty dict when the
        file is missing, unreadable, not valid UTF-8, not valid JSON, or holds
        something other than a JSON object.
    """
    try:
        loaded = json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except (ValueError, OSError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError (corrupt bytes).
        return {}
    # Callers are promised a dict; a stray list/number/string is treated the
    # same as a corrupt file.
    return loaded if isinstance(loaded, dict) else {}
def save_state(state: dict) -> None:
    """Persist *state* to ``STATE_FILE`` as indented UTF-8 JSON.

    ``STATE_DIR`` is created if needed. The JSON is first written to a sibling
    ``.tmp`` file and then renamed over ``STATE_FILE``, so an interrupted run
    cannot leave a half-written state file behind.

    Args:
        state: JSON-serialisable mapping to store.
    """
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(state, indent=2, ensure_ascii=False)
    tmp_file = STATE_FILE.with_name(STATE_FILE.name + ".tmp")
    tmp_file.write_text(payload, encoding="utf-8")
    # Path.replace overwrites the destination in a single rename step.
    tmp_file.replace(STATE_FILE)
# ---------------------------------------------------------------------------
# lark-cli runner
# ---------------------------------------------------------------------------
class LarkError(RuntimeError):
    """Raised when a lark-cli call fails or returns an unusable response."""
def run_lark(args: list[str], *, stdin: str | None = None, identity: str = "user", verbose: bool = False, cwd: str | None = None) -> dict:
    """Run ``lark-cli --as <identity> <args...>`` and return its decoded output.

    Args:
        args: Arguments appended after ``lark-cli --as <identity>``.
        stdin: Optional text piped to the child's standard input.
        identity: Value passed to ``--as``.
        verbose: When true, echo the command line (and cwd, if set) to stderr.
        cwd: Optional working directory for the child process.

    Returns:
        ``{}`` when stdout is blank; the decoded JSON object when stdout is a
        JSON object; otherwise ``{"_raw": <stdout>}``.

    Raises:
        LarkError: If lark-cli exits with a non-zero status.
    """
    cmd = ["lark-cli", "--as", identity, *args]
    cmd_text = " ".join(cmd)
    if verbose:
        cwd_note = f" (cwd={cwd})" if cwd else ""
        sys.stderr.write(f"[lark] {cmd_text}{cwd_note}\n")
    proc = subprocess.run(
        cmd,
        input=stdin,
        capture_output=True,
        text=True,
        # Pin the pipe codec instead of inheriting the locale's: this file
        # reads and writes everything else as UTF-8, and document content may
        # contain characters a non-UTF-8 locale cannot encode.
        # NOTE(review): assumes lark-cli itself emits UTF-8 - confirm.
        encoding="utf-8",
        errors="replace",
        cwd=cwd,
    )
    if proc.returncode != 0:
        raise LarkError(
            f"lark-cli failed (exit {proc.returncode}): {cmd_text}\n"
            f"stderr: {proc.stderr.strip()}\n"
            f"stdout: {proc.stdout.strip()}"
        )
    if not proc.stdout.strip():
        return {}
    try:
        parsed = json.loads(proc.stdout)
    except json.JSONDecodeError:
        return {"_raw": proc.stdout}
    # Callers use dict access on the result; wrap valid-but-non-object JSON
    # the same way as unparseable output.
    return parsed if isinstance(parsed, dict) else {"_raw": proc.stdout}
# ---------------------------------------------------------------------------
# Markdown helpers
# ---------------------------------------------------------------------------
def is_http_url(s: str) -> bool:
    """Return True when *s* parses as a URL with an ``http`` or ``https`` scheme.

    Input that ``urlparse`` rejects outright (for example an unbalanced IPv6
    bracket in the host part) is reported as False instead of raising, so one
    malformed link cannot abort a whole conversion.
    """
    try:
        scheme = urlparse(s).scheme
    except ValueError:
        return False
    return scheme in ("http", "https")
def is_anchor(s: str) -> bool:
    """Tell whether *s* begins with ``#``."""
    return s[:1] == "#"
def preprocess_markdown(text: str) -> str:
"""Handle GFM extras python-markdown core misses."""
# Strip BOM
if text.startswith(""):
text = text[1:]
out_lines: list[str] = []
in_fence = False
fence_re = re.compile(r"^\s*```")
strike_re = re.compile(r"~~(\S(?:.*?\S)?)~~")
# GFM task-list items at top level: "- [x] text" / "* [ ] text" / "1. [x] text"
# Convert to a stand-alone HTML SENTINEL are currently open in our output stream
# ---- sentinel handling ----
def _next_sentinel(self) -> str:
    """Build the placeholder token for the next embed to be registered.

    The token is ``SENTINEL_PREFIX``, the session tag and the current number
    of registered embeds (zero-padded to four digits), joined by underscores.
    """
    # The fixed prefix is capitals and underscores, chosen so the token does
    # not collide with normal markdown prose.
    counter = f"{len(self.embeds):04d}"
    return "_".join([SENTINEL_PREFIX, f"{self.session_tag}", counter])
def _resolve_local(self, src: str) -> Path | None:
    """Map a link or image target to an existing local file, if there is one.

    The query string and fragment are dropped first. HTTP(S) URLs and empty
    targets never resolve. A relative path is resolved against
    ``self.md_dir``. The literal spelling is tried first; if it does not name
    a file, the percent-decoded spelling is tried as well, so a target such as
    ``my%20file.pdf`` finds ``my file.pdf`` on disk.

    Args:
        src: Raw ``href`` / ``src`` value.

    Returns:
        Path of the existing regular file, or ``None`` when nothing matches.
    """
    from urllib.parse import unquote  # function-scope import; used only here

    # Strip query/fragment so "doc.pdf#page=2" still finds "doc.pdf".
    clean = src.split("#", 1)[0].split("?", 1)[0]
    if not clean or is_http_url(clean) or is_anchor(clean):
        return None
    spellings = [clean]
    decoded = unquote(clean)
    if decoded != clean:
        spellings.append(decoded)
    for spelling in spellings:
        candidate = Path(spelling)
        if not candidate.is_absolute():
            candidate = (self.md_dir / candidate).resolve()
        # is_file() is already False for paths that do not exist.
        if candidate.is_file():
            return candidate
    return None
# ---- emit helpers ----
def _emit(self, s: str) -> None:
    """Append *s* to the table buffer while one is open, otherwise to ``self.out``."""
    sink = self.out if self._table_buf is None else self._table_buf
    sink.append(s)
def _emit_placeholder(self, file: Path, kind: str, caption: str | None = None) -> None:
sentinel = self._next_sentinel()
self.embeds.append({
"sentinel": sentinel,
"file": str(file),
"type": kind,
"caption": caption,
})
# The placeholder must end up as its own top-level so media-insert
# can anchor on it cleanly and the cleanup pass can block_delete it.
# If we're currently inside a , split: close, emit standalone, reopen.
if self._table_buf is not None:
# Inside a table cell — best we can do is emit the sentinel as
# inline text and rely on str_replace cleanup. Media still lands at
# top level (per --selection-with-ellipsis semantics).
self._emit(sentinel)
return
if self._p_depth > 0:
self.out.append(" {sentinel} ")
return
self._emit(f" {sentinel} ")
return
if tag == "checkbox":
# Emitted by our preprocessor for GFM task list items.
done = attrd.get("done", "false")
self._emit(f'\1", line))
return "\n".join(out_lines)
# ---------------------------------------------------------------------------
# HTML -> DocxXML converter
# ---------------------------------------------------------------------------
# Tag names classed as inline-level.
# NOTE(review): no use of this set is visible in this part of the file - confirm how the converter consults it.
INLINE_TAGS = {"a", "b", "strong", "em", "i", "u", "del", "s", "strike", "code", "span", "br", "img", "cite", "latex"}
# Block-level tag names, presumably copied through without transformation.
# NOTE(review): usage is not visible in this part of the file - confirm.
BLOCK_PASSTHROUGH = {"p", "h1", "h2", "h3", "h4", "h5", "h6", "h7", "h8", "h9", "hr", "br"}
def xml_escape_text(s: str) -> str:
    """Escape ``&``, ``<`` and ``>`` so *s* is safe as XML text content.

    Quote characters are left untouched; use ``xml_escape_attr`` for attribute
    values.
    """
    # html.escape replaces "&" before the angle brackets, so the entities it
    # produces are not escaped a second time.
    return html_lib.escape(s, quote=False)


def xml_escape_attr(s: str) -> str:
    """Escape *s* for use inside a double-quoted XML attribute value.

    Applies the same escaping as ``xml_escape_text`` and additionally replaces
    every double quote with its entity.
    """
    return xml_escape_text(s).replace('"', "&quot;")
class DocxXMLBuilder(HTMLParser):
"""Walks python-markdown HTML and emits DocxXML.
Local images / attachments / non-inline-able media become placeholder
: capture verbatim
if self._in_pre:
# Don't recurse, but still record raw markup if any nested tags appear
if tag == "code":
self._code_lang = self._extract_lang(attrd.get("class", ""))
self._code_buf = []
return
# Table buffer mode: just copy markup through, no transformations needed
if self._table_buf is not None:
self._table_buf.append(self._raw_tag(tag, attrd))
if tag == "table":
self._table_depth += 1
return
if tag == "table":
self._table_buf = []
self._table_depth = 1
self._table_buf.append(self._raw_tag(tag, attrd))
return
if tag == "pre":
self._in_pre = True
return
if tag == "img":
self._emit_img(attrd)
return
if tag == "a":
href = attrd.get("href", "")
local = self._resolve_local(href) if href else None
if local is not None:
# Inline attachment: keep the link text in the prose so the
# paragraph still reads naturally, and queue a placeholder so
# the attachment block appears right after this paragraph.
caption = attrd.get("title") or None
self._emit_placeholder(local, "file", caption)
# Drop the tags (keep their text children) by pushing
# a "transparent" marker on the inline stack.
self._inline_stack.append("__TRANSPARENT_A__")
return
# Regular link
self._inline_stack.append("a")
attrs_s = self._attrs_string({"href": href})
self._emit(f"")
return
if tag in {"b", "strong"}:
self._inline_stack.append("b")
self._emit("")
return
if tag in {"em", "i"}:
self._inline_stack.append("em")
self._emit("")
return
if tag in {"u"}:
self._inline_stack.append("u")
self._emit("")
return
if tag in {"del", "s", "strike"}:
self._inline_stack.append("del")
self._emit("")
return
if tag == "span":
if self._inline_stack and self._inline_stack[-1] == "span":
self._inline_stack.pop()
self._emit("")
return
if tag == "ul":
if self._li_stack and self._li_stack[-1] == "ul":
self._li_stack.pop()
self._emit("")
return
if tag == "ol":
if self._li_stack and self._li_stack[-1] == "ol":
self._li_stack.pop()
self._emit("")
return
if tag == "li":
self._emit("")
return
if tag == "blockquote":
self._blockquote_depth = max(0, self._blockquote_depth - 1)
self._emit("")
return
if tag == "p":
self._p_depth = max(0, self._p_depth - 1)
self._emit("")
return
if tag == "code":
self._inline_stack.append("code")
self._emit("")
return
if tag in {"b", "strong"}:
if self._inline_stack and self._inline_stack[-1] == "b":
self._inline_stack.pop()
self._emit("")
return
if tag in {"em", "i"}:
if self._inline_stack and self._inline_stack[-1] == "em":
self._inline_stack.pop()
self._emit("")
return
if tag in {"u"}:
if self._inline_stack and self._inline_stack[-1] == "u":
self._inline_stack.pop()
self._emit("")
return
if tag in {"del", "s", "strike"}:
if self._inline_stack and self._inline_stack[-1] == "del":
self._inline_stack.pop()
self._emit("")
return
if tag == "code":
if self._inline_stack and self._inline_stack[-1] == "code":
self._inline_stack.pop()
self._emit("")
return
if tag == "br":
self._emit("
")
return
if tag == "ul":
self._li_stack.append("ul")
self._emit("")
return
if tag == "ol":
self._li_stack.append("ol")
self._emit("
")
return
if tag == "li":
if self._li_stack and self._li_stack[-1] == "ol":
self._emit('
")
return
if tag == "p":
self._p_depth += 1
self._emit("
outside we also escape (shouldn't normally happen)
self._emit(xml_escape_text(data))
# ---- code / language extraction ----
@staticmethod
def _extract_lang(class_attr: str) -> str:
    """Pull the language name out of a ``class`` attribute value.

    Tokens are scanned left to right; the text after the first
    ``language-`` or ``lang-`` prefix is returned (python-markdown's
    fenced_code emits e.g. ``class="language-mermaid"``). Returns ``""`` when
    no token carries either prefix.
    """
    prefixes = ("language-", "lang-")
    for token in class_attr.split():
        for prefix in prefixes:
            if token.startswith(prefix):
                return token[len(prefix):]
    return ""
def _flush_code(self) -> None:
body = "".join(self._code_buf or [])
lang = (self._code_lang or "").strip().lower()
self._code_buf = None
self._code_lang = None
# Mermaid / PlantUML get rendered as whiteboards
if lang in {"mermaid"}:
self._emit(f'{xml_escape_text(body.rstrip())} ')
return
if lang in {"plantuml", "puml"}:
self._emit(f'{xml_escape_text(body.rstrip())} ')
return
# Strip trailing newline that python-markdown adds inside
body = body.rstrip("\n")
lang_attr = f' lang="{xml_escape_attr(lang)}"' if lang else ""
self._emit(f"{xml_escape_text(body)}
")
# ---- image emit ----
def _emit_img(self, attrd: dict) -> None:
src = attrd.get("src", "").strip()
alt = attrd.get("alt", "").strip()
title = attrd.get("title", "").strip()
caption = title or alt or None
if not src:
return
if is_http_url(src):
attrs_s = self._attrs_string({"href": src, "caption": caption, "name": alt or None})
self._emit(f"
")
return
local = self._resolve_local(src)
if local is None:
sys.stderr.write(f"[warn] image not found, dropping: {src}\n")
return
self._emit_placeholder(local, "image", caption)
# ---- attrs helpers ----
@staticmethod
def _attrs_string(d: dict) -> str:
    """Render *d* as a run of `` key="value"`` attribute pairs.

    Entries whose value is ``None`` or the empty string are left out. Every
    other value is stringified and passed through ``xml_escape_attr``. Pairs
    appear in the dict's iteration order, each preceded by one space.
    """
    return "".join(
        f' {key}="{xml_escape_attr(str(value))}"'
        for key, value in d.items()
        if not (value is None or value == "")
    )
@staticmethod
def _raw_tag(tag: str, attrd: dict) -> str:
    """Re-serialise an opening tag: the tag name followed by its rendered attributes."""
    rendered_attrs = DocxXMLBuilder._attrs_string(attrd)
    return f"<{tag}{rendered_attrs}>"
@staticmethod
def _sanitise_table(html: str) -> str:
    """Rewrite python-markdown's table HTML towards DocxXML-legal markup.

    Applied in this order:
      1. ``strong`` tags are renamed to ``b`` and ``i`` tags to ``em``
         (opening and closing forms alike);
      2. every ``style="..."`` attribute is removed (DocxXML does not use CSS);
      3. every ``align="..."`` attribute is removed (alignment is not mapped).
    """
    rewrites = (
        (r"<(/?)strong\b", r"<\1b"),
        (r"<(/?)i\b", r"<\1em"),
        (r'\s+style="[^"]*"', ""),
        (r'\s+align="[^"]*"', ""),
    )
    for pattern, replacement in rewrites:
        html = re.sub(pattern, replacement, html)
    return html
# ---------------------------------------------------------------------------
# Driver
# ---------------------------------------------------------------------------
def _find_first_h1(lines: list[str]) -> "tuple[int, str] | None":
    """Locate the first ``# heading`` line that is not inside a fenced code block.

    Returns ``(line_index, heading_text)`` or ``None`` when there is no such
    line. Fences opened with three backticks or three tildes are tracked so a
    ``# comment`` inside a code sample is never taken for the heading.
    """
    open_fence = None
    for idx, line in enumerate(lines):
        # A BOM on the very first line would otherwise hide a leading "# ".
        stripped = line.lstrip("\ufeff").strip()
        marker = stripped[:3]
        if open_fence is not None:
            if marker == open_fence:
                open_fence = None
            continue
        if marker == "~~~" or (marker == "```" and "`" not in stripped.lstrip("`")):
            # The backtick check keeps an inline code span that merely starts
            # the line from being read as a fence opener.
            open_fence = marker
            continue
        if stripped.startswith("# "):
            return idx, stripped[2:].strip()
    return None


def derive_title(md_text: str, md_path: Path) -> str:
    """Pick the document title for *md_text*.

    Returns the text of the first H1 (``# ...``) found outside fenced code, or
    the file name without its extension when there is no such heading.
    """
    found = _find_first_h1(md_text.splitlines())
    if found is None:
        # fallback: filename without extension
        return md_path.stem
    return found[1]


def strip_first_h1(md_text: str) -> str:
    """Return *md_text* without its first H1 line, if it has one.

    The heading removed is the same one ``derive_title`` reports, so the title
    is not repeated in the body. Lines are re-joined with ``"\\n"``; all other
    lines, including ``#`` comments inside fenced code, are kept.
    """
    lines = md_text.splitlines()
    found = _find_first_h1(lines)
    if found is not None:
        del lines[found[0]]
    return "\n".join(lines)
def build_xml(md_path: Path, *, title: str, session_tag: str) -> tuple[str, list[dict]]:
raw = md_path.read_text(encoding="utf-8")
raw = preprocess_markdown(raw)
body_md = strip_first_h1(raw)
html = markdown.markdown(
body_md,
extensions=["fenced_code", "tables", "sane_lists"],
output_format="xhtml",
)
builder = DocxXMLBuilder(md_dir=md_path.parent, session_tag=session_tag)
builder.feed(html)
builder.close()
body_xml = "".join(builder.out)
# Unwrap stray ...
around block-level (python-markdown
# wraps unknown HTML tags in ); then collapse empty
left over
# from the placeholder split.
body_xml = re.sub(
r"\s*([^<]* )\s*
",
r"\1",
body_xml,
)
body_xml = re.sub(r"\s*
", "", body_xml)
title_xml = f"{xml_escape_text(title)} "
return title_xml + body_xml, builder.embeds
def create_or_overwrite_doc(*, doc_id: str | None, content: str, identity: str, parent_token: str | None, parent_position: str | None, verbose: bool) -> dict:
    """Send *content* (XML) to Feishu, overwriting ``doc_id`` or creating a doc.

    The content always travels over stdin (``--content -``) to avoid argv
    length and shell escaping pitfalls.

    Args:
        doc_id: Existing document to overwrite; falsy means "create a new one".
        content: XML document body.
        identity: Forwarded to ``run_lark``.
        parent_token: Optional ``--parent-token`` for a newly created doc.
        parent_position: Optional ``--parent-position`` for a newly created doc.
        verbose: When true, log the chosen action to stderr.

    Returns:
        Overwrite: ``{"doc_id", "result"}``. Create: ``{"doc_id", "url",
        "result"}``, with id and url taken from ``data.document`` of the
        lark-cli response.

    Raises:
        LarkError: When the create response carries no ``document_id``.
    """
    if not doc_id:
        if verbose:
            sys.stderr.write("[md2feishu] creating new doc\n")
        create_cmd = [
            "docs", "+create",
            "--api-version", "v2",
            "--doc-format", "xml",
            "--content", "-",
        ]
        optional_flags = (
            ("--parent-token", parent_token),
            ("--parent-position", parent_position),
        )
        for flag, value in optional_flags:
            if value:
                create_cmd.extend((flag, value))
        response = run_lark(create_cmd, stdin=content, identity=identity, verbose=verbose)
        doc_info = (response.get("data") or {}).get("document") or {}
        created_id = doc_info.get("document_id")
        if not created_id:
            raise LarkError(f"docs +create did not return a document_id: {json.dumps(response, ensure_ascii=False)}")
        return {"doc_id": created_id, "url": doc_info.get("url"), "result": response}

    if verbose:
        sys.stderr.write(f"[md2feishu] overwriting existing doc {doc_id}\n")
    overwrite_cmd = [
        "docs", "+update",
        "--api-version", "v2",
        "--doc", doc_id,
        "--command", "overwrite",
        "--doc-format", "xml",
        "--content", "-",
    ]
    response = run_lark(overwrite_cmd, stdin=content, identity=identity, verbose=verbose)
    return {"doc_id": doc_id, "result": response}
def insert_embed(doc_id: str, embed: dict, *, identity: str, verbose: bool) -> None:
    """Upload one embed's file into *doc_id*, positioned before its sentinel text.

    Args:
        doc_id: Target document.
        embed: Record with ``file``, ``type`` and ``sentinel`` keys and an
            optional ``caption``; the caption is forwarded only for images.
        identity: Forwarded to ``run_lark``.
        verbose: Forwarded to ``run_lark``.
    """
    # lark-cli refuses absolute paths for --file, so the command runs from the
    # file's own directory and is given just the basename.
    target = Path(embed["file"]).resolve()
    kind = embed["type"]
    caption = embed.get("caption")
    caption_flags = ["--caption", caption] if caption and kind == "image" else []
    cmd = [
        "docs", "+media-insert",
        "--doc", doc_id,
        "--file", target.name,
        "--type", kind,
        "--selection-with-ellipsis", embed["sentinel"],
        "--before",
        *caption_flags,
    ]
    run_lark(cmd, identity=identity, verbose=verbose, cwd=str(target.parent))
def cleanup_sentinels(doc_id: str, session_tag: str, embeds: list[dict], *, identity: str, verbose: bool) -> None:
"""Two-pass cleanup:
1. block_delete any paragraph whose entire text is a sentinel
2. str_replace any remaining sentinel occurrences (handles sentinels
that ended up inline inside table cells or mixed prose)
"""
res = run_lark([
"docs", "+fetch",
"--api-version", "v2",
"--doc", doc_id,
"--detail", "with-ids",
"--doc-format", "xml",
], identity=identity, verbose=verbose)
xml_payload = ((res.get("data") or {}).get("document") or {}).get("content") or ""
if not xml_payload:
xml_payload = json.dumps(res, ensure_ascii=False)
sentinel_re = re.compile(
rf']*\bid="([^"]+)"[^>]*>\s*{SENTINEL_PREFIX}_{session_tag}_\d+\s*
'
)
ids = sentinel_re.findall(xml_payload)
if ids:
if verbose:
sys.stderr.write(f"[md2feishu] deleting {len(ids)} sentinel paragraph(s)\n")
try:
run_lark([
"docs", "+update",
"--api-version", "v2",
"--doc", doc_id,
"--command", "block_delete",
"--block-id", ",".join(ids),
], identity=identity, verbose=verbose)
except LarkError as e:
sys.stderr.write(f"[warn] block_delete cleanup failed: {e}\n")
# Fallback: scrub any inline sentinel text still in the doc
for embed in embeds:
sentinel = embed["sentinel"]
if sentinel in xml_payload and (not ids or f">{sentinel}<" not in xml_payload):
try:
run_lark([
"docs", "+update",
"--api-version", "v2",
"--doc", doc_id,
"--command", "str_replace",
"--pattern", sentinel,
"--content", "",
], identity=identity, verbose=verbose)
except LarkError as e:
sys.stderr.write(f"[warn] str_replace cleanup for {sentinel} failed: {e}\n")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
HELP_EPILOG = textwrap.dedent("""
EXAMPLES
# First run — creates a new Feishu doc, remembers the mapping
markdown-to-feishu ./report.md
# Re-run on the same file — updates the same doc in place (no new doc spawned)
markdown-to-feishu ./report.md
# Force a brand-new doc even when state already has a mapping
markdown-to-feishu --new ./report.md
# Update a specific doc explicitly, ignoring state file
markdown-to-feishu --update doxcnAbc123 ./report.md
# Drop into a particular folder when creating
markdown-to-feishu --parent-token fldcnXXXX ./report.md
# Put it under your personal knowledge library
markdown-to-feishu --parent-position my_library ./report.md
# Override the document title (default = first H1 or filename stem)
markdown-to-feishu --title "2026 Q2 OKR" ./okr.md
# Inspect the generated XML and embed plan, without touching Feishu
markdown-to-feishu --dry-run ./report.md
# Forget the mapping for a file (does NOT delete the Feishu doc)
markdown-to-feishu --forget ./report.md
# Show the recorded mapping for this file
markdown-to-feishu --show ./report.md
SUPPORTED MARKDOWN -> FEISHU BLOCK MAPPING
# / ## / ... / ###### -> ... (the first H1 becomes the
document )
paragraphs ->
**bold** / __bold__ ->
*italic* / _italic_ ->
~~strike~~ (GFM) ->
`inline code` ->
[text](https://...) -> text
[text](./local.pdf) -> attachment block (file uploaded via
docs +media-insert --type file)
 ->
(URL is fetched
server-side by Feishu)
 -> inline image block (file uploaded via
docs +media-insert --type image; alt /
title becomes caption)
> blockquote ->
--- / *** ->
- item / * item / 1. item ->