#!/usr/bin/env python3 """markdown-to-feishu — convert a Markdown file (with rich embeds) into a Feishu docx, using the lark-cli wrapper. Tables, images (URL + local), Mermaid / PlantUML diagrams, and arbitrary attachments (PDF / CSV / log / anything) all get planted as real DocxXML blocks. Re-runs against the same .md by default update the previously-created doc instead of spawning a new one. """ from __future__ import annotations import argparse import html as html_lib import json import os import re import subprocess import sys import textwrap import time import uuid from html.parser import HTMLParser from pathlib import Path from urllib.parse import urlparse import markdown STATE_DIR = Path(os.environ.get("MD2FEISHU_STATE_DIR", str(Path.home() / ".local/share/markdown-to-feishu"))) STATE_FILE = STATE_DIR / "state.json" SENTINEL_PREFIX = "MD2FEISHU_SENTINEL" VERSION = "0.1.0" # --------------------------------------------------------------------------- # State (markdown abs path -> doc id) so re-runs update in place # --------------------------------------------------------------------------- def load_state() -> dict: if not STATE_FILE.exists(): return {} try: return json.loads(STATE_FILE.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): return {} def save_state(state: dict) -> None: STATE_DIR.mkdir(parents=True, exist_ok=True) STATE_FILE.write_text(json.dumps(state, indent=2, ensure_ascii=False), encoding="utf-8") # --------------------------------------------------------------------------- # lark-cli runner # --------------------------------------------------------------------------- class LarkError(RuntimeError): pass def run_lark(args: list[str], *, stdin: str | None = None, identity: str = "user", verbose: bool = False, cwd: str | None = None) -> dict: cmd = ["lark-cli", "--as", identity] + args if verbose: cwd_note = f" (cwd={cwd})" if cwd else "" sys.stderr.write(f"[lark] {' '.join(cmd)}{cwd_note}\n") proc = subprocess.run( cmd, input=stdin, capture_output=True, text=True, cwd=cwd, ) if proc.returncode != 0: raise LarkError( f"lark-cli failed (exit {proc.returncode}): {' '.join(cmd)}\n" f"stderr: {proc.stderr.strip()}\n" f"stdout: {proc.stdout.strip()}" ) if not proc.stdout.strip(): return {} try: return json.loads(proc.stdout) except json.JSONDecodeError: return {"_raw": proc.stdout} # --------------------------------------------------------------------------- # Markdown helpers # --------------------------------------------------------------------------- def is_http_url(s: str) -> bool: p = urlparse(s) return p.scheme in ("http", "https") def is_anchor(s: str) -> bool: return s.startswith("#") def preprocess_markdown(text: str) -> str: """Handle GFM extras python-markdown core misses.""" # Strip BOM if text.startswith(""): text = text[1:] out_lines: list[str] = [] in_fence = False fence_re = re.compile(r"^\s*```") strike_re = re.compile(r"~~(\S(?:.*?\S)?)~~") # GFM task-list items at top level: "- [x] text" / "* [ ] text" / "1. [x] text" # Convert to a stand-alone HTML block so python-markdown passes # it through. Leading whitespace becomes a marker (so nested checkboxes # don't get hoisted to top level). task_re = re.compile(r"^(\s*)(?:[-*+]|\d+\.)\s+\[([ xX])\]\s+(.*)$") for line in text.split("\n"): if fence_re.match(line): in_fence = not in_fence out_lines.append(line) continue if in_fence: out_lines.append(line) continue m = task_re.match(line) if m and not m.group(1): # top-level only; nested stays a list item done = "true" if m.group(2).lower() == "x" else "false" body = m.group(3).strip() # Surround with blank lines so it parses as raw HTML block out_lines.append("") out_lines.append(f'{html_lib.escape(body)}') out_lines.append("") continue out_lines.append(strike_re.sub(r"\1", line)) return "\n".join(out_lines) # --------------------------------------------------------------------------- # HTML -> DocxXML converter # --------------------------------------------------------------------------- INLINE_TAGS = {"a", "b", "strong", "em", "i", "u", "del", "s", "strike", "code", "span", "br", "img", "cite", "latex"} BLOCK_PASSTHROUGH = {"p", "h1", "h2", "h3", "h4", "h5", "h6", "h7", "h8", "h9", "hr", "br"} def xml_escape_text(s: str) -> str: return s.replace("&", "&").replace("<", "<").replace(">", ">") def xml_escape_attr(s: str) -> str: return xml_escape_text(s).replace('"', """) class DocxXMLBuilder(HTMLParser): """Walks python-markdown HTML and emits DocxXML. Local images / attachments / non-inline-able media become placeholder

SENTINEL

paragraphs; each one is recorded in ``self.embeds`` so the caller can media-insert the real file in the correct position afterwards. """ def __init__(self, md_dir: Path, session_tag: str): super().__init__(convert_charrefs=True) self.md_dir = md_dir self.session_tag = session_tag self.out: list[str] = [] self.embeds: list[dict] = [] # {sentinel, file, type, caption} self._code_buf: list[str] | None = None self._code_lang: str | None = None self._table_buf: list[str] | None = None # we buffer the entire table so colspan/rowspan etc. just round-trip self._table_depth = 0 self._in_pre = False self._inline_stack: list[str] = [] self._li_stack: list[str] = [] # track ul/ol type for current li self._blockquote_depth = 0 self._p_depth = 0 # how many

are currently open in our output stream # ---- sentinel handling ---- def _next_sentinel(self) -> str: n = len(self.embeds) # All caps + underscores so it never collides with normal markdown prose return f"{SENTINEL_PREFIX}_{self.session_tag}_{n:04d}" def _resolve_local(self, src: str) -> Path | None: # Strip query/fragment for sanity clean = src.split("#", 1)[0].split("?", 1)[0] if not clean or is_http_url(clean) or is_anchor(clean): return None p = Path(clean) if not p.is_absolute(): p = (self.md_dir / p).resolve() return p if p.exists() and p.is_file() else None # ---- emit helpers ---- def _emit(self, s: str) -> None: # If we're buffering a table, append there instead if self._table_buf is not None: self._table_buf.append(s) else: self.out.append(s) def _emit_placeholder(self, file: Path, kind: str, caption: str | None = None) -> None: sentinel = self._next_sentinel() self.embeds.append({ "sentinel": sentinel, "file": str(file), "type": kind, "caption": caption, }) # The placeholder must end up as its own top-level

so media-insert # can anchor on it cleanly and the cleanup pass can block_delete it. # If we're currently inside a

, split: close, emit standalone, reopen. if self._table_buf is not None: # Inside a table cell — best we can do is emit the sentinel as # inline text and rely on str_replace cleanup. Media still lands at # top level (per --selection-with-ellipsis semantics). self._emit(sentinel) return if self._p_depth > 0: self.out.append("

") self.out.append(f"

{sentinel}

") self.out.append("

") return self._emit(f"

{sentinel}

") # ---- HTMLParser hooks ---- def handle_starttag(self, tag, attrs): attrd = dict(attrs) # Inside
: capture verbatim
        if self._in_pre:
            # Don't recurse, but still record raw markup if any nested tags appear
            if tag == "code":
                self._code_lang = self._extract_lang(attrd.get("class", ""))
                self._code_buf = []
            return

        # Table buffer mode: just copy markup through, no transformations needed
        if self._table_buf is not None:
            self._table_buf.append(self._raw_tag(tag, attrd))
            if tag == "table":
                self._table_depth += 1
            return

        if tag == "table":
            self._table_buf = []
            self._table_depth = 1
            self._table_buf.append(self._raw_tag(tag, attrd))
            return

        if tag == "pre":
            self._in_pre = True
            return

        if tag == "img":
            self._emit_img(attrd)
            return

        if tag == "a":
            href = attrd.get("href", "")
            local = self._resolve_local(href) if href else None
            if local is not None:
                # Inline attachment: keep the link text in the prose so the
                # paragraph still reads naturally, and queue a placeholder so
                # the attachment block appears right after this paragraph.
                caption = attrd.get("title") or None
                self._emit_placeholder(local, "file", caption)
                # Drop the  tags (keep their text children) by pushing
                # a "transparent" marker on the inline stack.
                self._inline_stack.append("__TRANSPARENT_A__")
                return
            # Regular link
            self._inline_stack.append("a")
            attrs_s = self._attrs_string({"href": href})
            self._emit(f"")
            return

        if tag in {"b", "strong"}:
            self._inline_stack.append("b")
            self._emit("")
            return
        if tag in {"em", "i"}:
            self._inline_stack.append("em")
            self._emit("")
            return
        if tag in {"u"}:
            self._inline_stack.append("u")
            self._emit("")
            return
        if tag in {"del", "s", "strike"}:
            self._inline_stack.append("del")
            self._emit("")
            return
        if tag == "code":
            self._inline_stack.append("code")
            self._emit("")
            return
        if tag == "br":
            self._emit("
") return if tag == "ul": self._li_stack.append("ul") self._emit("
") return if tag == "ol": if self._li_stack and self._li_stack[-1] == "ol": self._li_stack.pop() self._emit("") return if tag == "li": self._emit("") return if tag == "blockquote": self._blockquote_depth = max(0, self._blockquote_depth - 1) self._emit("") return if tag == "p": self._p_depth = max(0, self._p_depth - 1) self._emit("

") return if tag == "checkbox": if self._inline_stack and self._inline_stack[-1] == "checkbox": self._inline_stack.pop() self._emit("") return if tag in BLOCK_PASSTHROUGH: self._emit(f"") return if self._inline_stack and self._inline_stack[-1] == "__UNKNOWN__": self._inline_stack.pop() def handle_startendtag(self, tag, attrs): attrd = dict(attrs) if tag == "img": self._emit_img(attrd) return if tag == "br": self._emit("
") return if tag == "hr": self._emit("
") return # Treat as start+end self.handle_starttag(tag, attrs) self.handle_endtag(tag) def handle_data(self, data): if not data: return if self._in_pre and self._code_buf is not None: self._code_buf.append(data) return if self._table_buf is not None: self._table_buf.append(xml_escape_text(data)) return # Preserve user text but escape XML specials # In
 outside  we also escape (shouldn't normally happen)
        self._emit(xml_escape_text(data))

    # ---- code / language extraction ----
    @staticmethod
    def _extract_lang(class_attr: str) -> str:
        # python-markdown fenced_code emits e.g. class="language-mermaid"
        for tok in class_attr.split():
            if tok.startswith("language-"):
                return tok[len("language-"):]
            if tok.startswith("lang-"):
                return tok[len("lang-"):]
        return ""

    def _flush_code(self) -> None:
        body = "".join(self._code_buf or [])
        lang = (self._code_lang or "").strip().lower()
        self._code_buf = None
        self._code_lang = None
        # Mermaid / PlantUML get rendered as whiteboards
        if lang in {"mermaid"}:
            self._emit(f'{xml_escape_text(body.rstrip())}')
            return
        if lang in {"plantuml", "puml"}:
            self._emit(f'{xml_escape_text(body.rstrip())}')
            return
        # Strip trailing newline that python-markdown adds inside 
        body = body.rstrip("\n")
        lang_attr = f' lang="{xml_escape_attr(lang)}"' if lang else ""
        self._emit(f"{xml_escape_text(body)}
") # ---- image emit ---- def _emit_img(self, attrd: dict) -> None: src = attrd.get("src", "").strip() alt = attrd.get("alt", "").strip() title = attrd.get("title", "").strip() caption = title or alt or None if not src: return if is_http_url(src): attrs_s = self._attrs_string({"href": src, "caption": caption, "name": alt or None}) self._emit(f"") return local = self._resolve_local(src) if local is None: sys.stderr.write(f"[warn] image not found, dropping: {src}\n") return self._emit_placeholder(local, "image", caption) # ---- attrs helpers ---- @staticmethod def _attrs_string(d: dict) -> str: parts = [] for k, v in d.items(): if v is None or v == "": continue parts.append(f' {k}="{xml_escape_attr(str(v))}"') return "".join(parts) @staticmethod def _raw_tag(tag: str, attrd: dict) -> str: return f"<{tag}{DocxXMLBuilder._attrs_string(attrd)}>" @staticmethod def _sanitise_table(html: str) -> str: """Coerce python-markdown's HTML table into DocxXML-legal markup: - // become / - Drop style="..." attributes (DocxXML uses background-color / vertical-align, not CSS) - Drop unknown attributes on cells """ # tag rename html = re.sub(r"<(/?)strong\b", r"<\1b", html) html = re.sub(r"<(/?)i\b", r"<\1em", html) # drop style="..." on th/td/tr/table html = re.sub(r'\s+style="[^"]*"', "", html) # drop align="..." on th/td (we don't try to map to DocxXML alignment) html = re.sub(r'\s+align="[^"]*"', "", html) return html # --------------------------------------------------------------------------- # Driver # --------------------------------------------------------------------------- def derive_title(md_text: str, md_path: Path) -> str: for line in md_text.splitlines(): line = line.strip() if line.startswith("# "): return line[2:].strip() # fallback: filename without extension return md_path.stem def strip_first_h1(md_text: str) -> str: """Drop the first H1 line if present — we'll convey it via instead.""" out_lines: list[str] = [] dropped = False for line in md_text.splitlines(): if not dropped and line.strip().startswith("# "): dropped = True continue out_lines.append(line) return "\n".join(out_lines) def build_xml(md_path: Path, *, title: str, session_tag: str) -> tuple[str, list[dict]]: raw = md_path.read_text(encoding="utf-8") raw = preprocess_markdown(raw) body_md = strip_first_h1(raw) html = markdown.markdown( body_md, extensions=["fenced_code", "tables", "sane_lists"], output_format="xhtml", ) builder = DocxXMLBuilder(md_dir=md_path.parent, session_tag=session_tag) builder.feed(html) builder.close() body_xml = "".join(builder.out) # Unwrap stray <p>...</p> around block-level <checkbox> (python-markdown # wraps unknown HTML tags in <p>); then collapse empty <p></p> left over # from the placeholder split. body_xml = re.sub( r"<p>\s*(<checkbox\s+done=\"(?:true|false)\">[^<]*</checkbox>)\s*</p>", r"\1", body_xml, ) body_xml = re.sub(r"<p>\s*</p>", "", body_xml) title_xml = f"<title>{xml_escape_text(title)}" return title_xml + body_xml, builder.embeds def create_or_overwrite_doc(*, doc_id: str | None, content: str, identity: str, parent_token: str | None, parent_position: str | None, verbose: bool) -> dict: if doc_id: if verbose: sys.stderr.write(f"[md2feishu] overwriting existing doc {doc_id}\n") # Use stdin for content to avoid argv length / shell escaping pitfalls args = [ "docs", "+update", "--api-version", "v2", "--doc", doc_id, "--command", "overwrite", "--doc-format", "xml", "--content", "-", ] res = run_lark(args, stdin=content, identity=identity, verbose=verbose) return {"doc_id": doc_id, "result": res} if verbose: sys.stderr.write("[md2feishu] creating new doc\n") args = [ "docs", "+create", "--api-version", "v2", "--doc-format", "xml", "--content", "-", ] if parent_token: args += ["--parent-token", parent_token] if parent_position: args += ["--parent-position", parent_position] res = run_lark(args, stdin=content, identity=identity, verbose=verbose) document = (res.get("data") or {}).get("document") or {} new_id = document.get("document_id") if not new_id: raise LarkError(f"docs +create did not return a document_id: {json.dumps(res, ensure_ascii=False)}") return {"doc_id": new_id, "url": document.get("url"), "result": res} def insert_embed(doc_id: str, embed: dict, *, identity: str, verbose: bool) -> None: # lark-cli refuses absolute paths for --file. cd into the file's parent # and pass just the basename. file_path = Path(embed["file"]).resolve() args = [ "docs", "+media-insert", "--doc", doc_id, "--file", file_path.name, "--type", embed["type"], "--selection-with-ellipsis", embed["sentinel"], "--before", ] if embed.get("caption") and embed["type"] == "image": args += ["--caption", embed["caption"]] run_lark(args, identity=identity, verbose=verbose, cwd=str(file_path.parent)) def cleanup_sentinels(doc_id: str, session_tag: str, embeds: list[dict], *, identity: str, verbose: bool) -> None: """Two-pass cleanup: 1. block_delete any paragraph whose entire text is a sentinel 2. str_replace any remaining sentinel occurrences (handles sentinels that ended up inline inside table cells or mixed prose) """ res = run_lark([ "docs", "+fetch", "--api-version", "v2", "--doc", doc_id, "--detail", "with-ids", "--doc-format", "xml", ], identity=identity, verbose=verbose) xml_payload = ((res.get("data") or {}).get("document") or {}).get("content") or "" if not xml_payload: xml_payload = json.dumps(res, ensure_ascii=False) sentinel_re = re.compile( rf']*\bid="([^"]+)"[^>]*>\s*{SENTINEL_PREFIX}_{session_tag}_\d+\s*

' ) ids = sentinel_re.findall(xml_payload) if ids: if verbose: sys.stderr.write(f"[md2feishu] deleting {len(ids)} sentinel paragraph(s)\n") try: run_lark([ "docs", "+update", "--api-version", "v2", "--doc", doc_id, "--command", "block_delete", "--block-id", ",".join(ids), ], identity=identity, verbose=verbose) except LarkError as e: sys.stderr.write(f"[warn] block_delete cleanup failed: {e}\n") # Fallback: scrub any inline sentinel text still in the doc for embed in embeds: sentinel = embed["sentinel"] if sentinel in xml_payload and (not ids or f">{sentinel}<" not in xml_payload): try: run_lark([ "docs", "+update", "--api-version", "v2", "--doc", doc_id, "--command", "str_replace", "--pattern", sentinel, "--content", "", ], identity=identity, verbose=verbose) except LarkError as e: sys.stderr.write(f"[warn] str_replace cleanup for {sentinel} failed: {e}\n") # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- HELP_EPILOG = textwrap.dedent(""" EXAMPLES # First run — creates a new Feishu doc, remembers the mapping markdown-to-feishu ./report.md # Re-run on the same file — updates the same doc in place (no new doc spawned) markdown-to-feishu ./report.md # Force a brand-new doc even when state already has a mapping markdown-to-feishu --new ./report.md # Update a specific doc explicitly, ignoring state file markdown-to-feishu --update doxcnAbc123 ./report.md # Drop into a particular folder when creating markdown-to-feishu --parent-token fldcnXXXX ./report.md # Put it under your personal knowledge library markdown-to-feishu --parent-position my_library ./report.md # Override the document title (default = first H1 or filename stem) markdown-to-feishu --title "2026 Q2 OKR" ./okr.md # Inspect the generated XML and embed plan, without touching Feishu markdown-to-feishu --dry-run ./report.md # Forget the mapping for a file (does NOT delete the Feishu doc) markdown-to-feishu --forget ./report.md # Show the recorded mapping for this file markdown-to-feishu --show ./report.md SUPPORTED MARKDOWN -> FEISHU BLOCK MAPPING # / ## / ... / ###### ->

... (the first H1 becomes the document ) paragraphs -> <p> **bold** / __bold__ -> <b> *italic* / _italic_ -> <em> ~~strike~~ (GFM) -> <del> `inline code` -> <code> [text](https://...) -> <a href="...">text</a> [text](./local.pdf) -> attachment block (file uploaded via docs +media-insert --type file) ![alt](https://...) -> <img href="https://..."/> (URL is fetched server-side by Feishu) ![alt](./local.png) -> inline image block (file uploaded via docs +media-insert --type image; alt / title becomes caption) > blockquote -> <blockquote> --- / *** -> <hr/> - item / * item / 1. item -> <ul> / <ol> with seq="auto" nested lists (4-space indent) -> nested <ul> / <ol> | a | b | GFM tables -> <table><thead><tr><th>... ```lang ... ``` -> <pre lang="lang"><code>...</code></pre> ```mermaid ... ``` -> <whiteboard type="mermaid">...</whiteboard> ```plantuml ... ``` -> <whiteboard type="plantuml">...</whiteboard> ATTACHMENT DETECTION Any [text](path) link whose href is NOT an http(s) URL and NOT an in-doc anchor (#foo), and which resolves to an existing local file (relative to the markdown file's directory), is uploaded as a Feishu file block. The visible link text is dropped — the attachment block carries the filename itself. This is what makes pasting PDFs / CSVs / logs / arbitrary binaries feel "native". Caveat: if a link resolves to a missing local file, it falls through to a regular <a> link (the path will appear as-is). Run with --verbose to see each resolution decision. IDENTITY Defaults to --as user so the created doc is owned by YOUR Feishu account, not the bot. This means you can manage / move / delete it directly from Feishu without any ownership transfer dance. Use --as bot only if you explicitly want bot-owned documents. UPDATE-BY-DEFAULT BEHAVIOUR State lives at ~/.local/share/markdown-to-feishu/state.json (override with $MD2FEISHU_STATE_DIR or --state-file). Keyed by the markdown file's absolute path. When state has a doc_id for the given path: - default -> overwrite that doc in place - --new -> ignore state, create a fresh doc, replace the mapping with the new id - --update <id> -> overwrite the given id and update state overwrite replays the full XML and re-uploads every local media file from source, so the doc always matches the markdown 1:1. Comments on the doc survive overwrite; manual edits inside the doc do NOT (markdown is the source of truth). EXIT CODES 0 success 1 generic error (bad args, file not found, lark-cli failure) 2 partial success — doc created/updated but at least one embed failed ENVIRONMENT MD2FEISHU_STATE_DIR override the directory holding state.json LARK_CLI_PROFILE passed through; honoured by lark-cli itself DEPENDENCIES python3, python3-markdown, lark-cli (must be authenticated as user via `lark-cli auth login`) """) def parse_args(argv: list[str]) -> argparse.Namespace: p = argparse.ArgumentParser( prog="markdown-to-feishu", description="Convert a Markdown file (with rich embeds: tables, images, mermaid, attachments) into a Feishu docx. Re-runs update the previously-created doc by default.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=HELP_EPILOG, ) p.add_argument("markdown", nargs="?", help="path to the .md file") p.add_argument("--new", action="store_true", help="force-create a new doc even if state already has a mapping for this file") p.add_argument("--update", metavar="DOC_ID", help="overwrite the given doc id (URL also accepted); ignores and then updates state") p.add_argument("--title", help="override document title (default: first H1, else filename stem)") p.add_argument("--parent-token", help="parent folder or wiki node token (only used when creating)") p.add_argument("--parent-position", help="parent position keyword, e.g. my_library (only used when creating)") p.add_argument("--as", dest="identity", choices=["user", "bot"], default="user", help="identity for lark-cli (default: user, so you own the doc)") p.add_argument("--dry-run", action="store_true", help="print generated XML + embed plan without calling lark-cli") p.add_argument("--state-file", help="override path to state.json (default: ~/.local/share/markdown-to-feishu/state.json)") p.add_argument("--forget", action="store_true", help="remove the state mapping for this file (does not delete the Feishu doc) and exit") p.add_argument("--show", action="store_true", help="print the recorded mapping for this file (if any) and exit") p.add_argument("-v", "--verbose", action="store_true", help="verbose logging (every lark-cli invocation)") p.add_argument("--version", action="version", version=f"markdown-to-feishu {VERSION}") return p.parse_args(argv) def main(argv: list[str]) -> int: args = parse_args(argv) global STATE_FILE, STATE_DIR if args.state_file: STATE_FILE = Path(args.state_file).expanduser().resolve() STATE_DIR = STATE_FILE.parent if not args.markdown: sys.stderr.write("error: missing markdown file (use --help)\n") return 1 md_path = Path(args.markdown).expanduser().resolve() if not md_path.exists() or not md_path.is_file(): sys.stderr.write(f"error: {md_path} is not a file\n") return 1 key = str(md_path) state = load_state() if args.show: entry = state.get(key) if entry is None: print(f"no mapping recorded for {md_path}") else: print(json.dumps(entry, indent=2, ensure_ascii=False)) return 0 if args.forget: if key in state: state.pop(key) save_state(state) print(f"forgot mapping for {md_path}") else: print(f"no mapping recorded for {md_path}") return 0 md_text = md_path.read_text(encoding="utf-8") title = args.title or derive_title(md_text, md_path) session_tag = uuid.uuid4().hex[:8].upper() try: content, embeds = build_xml(md_path, title=title, session_tag=session_tag) except Exception as e: sys.stderr.write(f"error: failed to build XML: {e}\n") return 1 if args.dry_run: print("=== GENERATED DOCXXML ===") print(content) print() print("=== EMBED PLAN ===") if not embeds: print("(no out-of-band embeds)") else: for e in embeds: print(json.dumps(e, ensure_ascii=False)) target = "new doc" if args.update: target = f"update doc {args.update}" elif not args.new and key in state: target = f"update existing doc {state[key].get('doc_id')}" print() print(f"=== TARGET ===\n{target}") return 0 # Decide create-vs-update explicit_doc = args.update if explicit_doc and explicit_doc.startswith("http"): # extract /docx/<id> m = re.search(r"/docx/([A-Za-z0-9]+)", explicit_doc) if m: explicit_doc = m.group(1) target_doc_id = None if explicit_doc: target_doc_id = explicit_doc elif not args.new and key in state: target_doc_id = state[key].get("doc_id") try: outcome = create_or_overwrite_doc( doc_id=target_doc_id, content=content, identity=args.identity, parent_token=args.parent_token, parent_position=args.parent_position, verbose=args.verbose, ) except LarkError as e: sys.stderr.write(f"error: {e}\n") return 1 doc_id = outcome["doc_id"] failed_embeds: list[dict] = [] for embed in embeds: try: insert_embed(doc_id, embed, identity=args.identity, verbose=args.verbose) except LarkError as e: sys.stderr.write(f"[warn] failed to insert {embed['file']}: {e}\n") failed_embeds.append(embed) # Always try to clean up sentinels we managed to anchor if embeds: try: cleanup_sentinels(doc_id, session_tag, embeds, identity=args.identity, verbose=args.verbose) except LarkError as e: sys.stderr.write(f"[warn] cleanup failed: {e}\n") # Save state entry = state.get(key, {}) entry.update({ "doc_id": doc_id, "url": outcome.get("url") or entry.get("url"), "updated_at": time.strftime("%Y-%m-%dT%H:%M:%S%z"), "title": title, }) if entry.get("url") is None and not target_doc_id: # Fetch URL via a separate call if it wasn't returned (shouldn't happen on create) pass state[key] = entry save_state(state) print(json.dumps({ "doc_id": doc_id, "url": entry.get("url"), "title": title, "embeds_inserted": len(embeds) - len(failed_embeds), "embeds_failed": len(failed_embeds), }, indent=2, ensure_ascii=False)) return 2 if failed_embeds else 0 if __name__ == "__main__": sys.exit(main(sys.argv[1:]))