"""notes multi-purpose sidecar.

POST /transcribe -- cut the audio into overlapping pieces with ffmpeg and send
    them to an external ASR service one at a time, working around the ASR
    service's per-request size limit; multi-chunk results are stitched
    together by an LLM.
POST /convert -- run markdown-to-feishu to push meeting notes into a Feishu
    docx.
"""
import json
import logging
import os
import re
import shutil
import subprocess
import tempfile
import uuid
from pathlib import Path
from typing import Optional

import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s: %(message)s')
log = logging.getLogger('feishu')

app = FastAPI()


def probe_duration(src: Path) -> float:
    """Return the duration of ``src`` in seconds.

    Browser-recorded webm/m4a files often have no duration in their metadata
    (recording stopped before the container could be finalized). Try
    ffprobe's ``format.duration`` first; when it is N/A or ffprobe fails, fall
    back to decoding the whole file through ffmpeg's null muxer and taking the
    last ``time=`` progress value ffmpeg prints on stderr.

    Raises:
        HTTPException(500): if neither method yields a duration, or the
            fallback decode exceeds its 900 s timeout.
    """
    try:
        out = subprocess.check_output(
            ['ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', '-of', 'csv=p=0', str(src)],
            timeout=60,
        ).decode().strip()
        if out and out != 'N/A':
            return float(out)
    except (subprocess.CalledProcessError, ValueError, subprocess.TimeoutExpired):
        pass

    log.info("ffprobe format.duration=N/A, decoding to count time")
    try:
        proc = subprocess.run(
            ['ffmpeg', '-i', str(src), '-f', 'null', '-'],
            stderr=subprocess.PIPE, stdout=subprocess.DEVNULL, timeout=900,
        )
    except subprocess.TimeoutExpired:
        # Previously escaped as an opaque 500; give the caller a reason.
        raise HTTPException(500, 'cannot determine duration: ffmpeg decode timed out (>900s)') from None
    matches = re.findall(rb'time=(\d+):(\d+):(\d+(?:\.\d+)?)', proc.stderr)
    if not matches:
        raise HTTPException(500, f'cannot determine duration; ffmpeg stderr tail: {proc.stderr[-300:].decode("utf-8","replace")}')
    # The last progress line is the furthest point ffmpeg decoded to.
    h, m, s = matches[-1]
    return int(h) * 3600 + int(m) * 60 + float(s)


@app.get('/healthz')
def healthz():
    """Health-check endpoint; always returns ``{'ok': True}``."""
    return {'ok': True}


class TranscribeReq(BaseModel):
    # Path to the audio file, readable from inside the sidecar.
    audio_path: str
    # Length of each chunk in seconds; kept far below Qwen3-ASR's
    # 8192-token cache (~7 min of audio).
    chunk_seconds: int = 65
    # Overlap between adjacent chunks in seconds; gives the LLM stitcher an
    # anchor for de-duplicating the seam.
    overlap_seconds: int = 10


def _cut_chunks(src: Path, tmp: Path, duration: float, chunk_seconds: int, stride: int) -> list[dict]:
    """Cut ``src`` into ``chunk_seconds``-long files in ``tmp``, one every ``stride`` s.

    Returns ``{'start', 'path', 'idx'}`` dicts in playback order. Stops as
    soon as a chunk reaches ``duration``, so no trailing chunk lies entirely
    inside the previous chunk's overlap.

    Raises:
        HTTPException(500): if both stream copy and re-encode fail for a chunk.
    """
    ext = src.suffix.lstrip('.') or 'm4a'
    chunks_meta = []
    start = 0.0
    i = 0
    while start < duration:
        cp = tmp / f'chunk_{i:03d}.{ext}'
        # -ss before -i = input seek (fast).
        cut = ['-ss', f'{start:.2f}', '-t', f'{chunk_seconds}', '-i', str(src)]
        try:
            # -c copy: no re-encode, keeps the source container.
            subprocess.run(
                ['ffmpeg', '-y', *cut, '-c', 'copy', str(cp)],
                check=True, capture_output=True, timeout=120,
            )
        except subprocess.CalledProcessError:
            # Stream copy failed: re-encode to mono 16 kHz AAC. AAC cannot be
            # muxed into containers such as WebM/Ogg, so this output is always
            # an .m4a instead of reusing the source extension.
            cp = tmp / f'chunk_{i:03d}.m4a'
            try:
                subprocess.run(
                    ['ffmpeg', '-y', *cut, '-c:a', 'aac', '-b:a', '64k', '-ac', '1', '-ar', '16000', str(cp)],
                    check=True, capture_output=True, timeout=180,
                )
            except subprocess.CalledProcessError as e:
                tail = e.stderr[-300:].decode('utf-8', 'replace') if e.stderr else ''
                raise HTTPException(500, f'ffmpeg failed on chunk {i} (start={start:.2f}s): {tail}') from e
        # A (near-)empty output means we seeked past the real end of the audio.
        if not cp.exists() or cp.stat().st_size < 1024:
            break
        chunks_meta.append({'start': start, 'path': cp, 'idx': i})
        if start + chunk_seconds >= duration:
            break
        start += stride
        i += 1
    return chunks_meta


def _asr_chunk(meta: dict, total: int, asr_url: str, asr_token: str) -> str:
    """POST one chunk to the ASR endpoint and return its stripped ``text``.

    Raises:
        HTTPException(502): on network errors, non-2xx responses, or a body
            that is not a JSON object with a string ``text``.
    """
    path = meta['path']
    log.info("ASR chunk %d/%d (start=%.1fs, %dKB)", meta['idx'] + 1, total, meta['start'],
             path.stat().st_size // 1024)
    try:
        with open(path, 'rb') as f:
            # NOTE(review): content type is always audio/mp4, even for chunks
            # stream-copied from .webm -- confirm the ASR service tolerates it.
            r = requests.post(
                asr_url,
                headers={'Authorization': f'Bearer {asr_token}'},
                files={'file': (path.name, f, 'audio/mp4')},
                data={'model': 'qwen3-asr', 'response_format': 'json'},
                timeout=300,
            )
    except requests.RequestException as e:
        raise HTTPException(502, f'ASR chunk {meta["idx"]} request failed: {e}') from e
    if not r.ok:
        raise HTTPException(502, f'ASR chunk {meta["idx"]} {r.status_code}: {r.text[:300]}')
    try:
        return r.json().get('text', '').strip()
    except (ValueError, AttributeError):
        # ValueError: body is not JSON; AttributeError: JSON is not an object
        # or 'text' is not a string.
        raise HTTPException(502, f'ASR chunk {meta["idx"]} bad json: {r.text[:200]}') from None


@app.post('/transcribe')
def transcribe(req: TranscribeReq):
    """Cut overlapping chunks with ffmpeg -> serial ASR -> LLM stitch/dedup.

    Returns ``{'text', 'chunks', 'stitched_by'}``. ``stitched_by`` is
    ``'single'`` when only one chunk was transcribed and ``'llm'`` otherwise
    (``llm_stitch`` may still have fallen back to a plain newline join).

    Raises:
        HTTPException(400): missing audio file or invalid chunk/overlap values.
        HTTPException(500): ASR not configured, or chunking failed.
        HTTPException(502): an ASR call failed.
    """
    src = Path(req.audio_path)
    if not src.exists():
        raise HTTPException(400, f'audio not found: {src}')
    # overlap >= chunk length would collapse the stride to 1 s and launch
    # thousands of ffmpeg/ASR calls; a negative overlap would leave gaps.
    if req.chunk_seconds <= 0 or not 0 <= req.overlap_seconds < req.chunk_seconds:
        raise HTTPException(
            400,
            f'invalid chunking: need chunk_seconds > 0 and 0 <= overlap_seconds < chunk_seconds '
            f'(got chunk_seconds={req.chunk_seconds}, overlap_seconds={req.overlap_seconds})',
        )
    asr_url = os.environ.get('ASR_URL', '')
    asr_token = os.environ.get('ASR_TOKEN', '')
    if not asr_url or not asr_token:
        raise HTTPException(500, 'ASR_URL/ASR_TOKEN not configured in sidecar')

    tmp = Path(tempfile.gettempdir()) / f'transcribe-{uuid.uuid4().hex}'
    tmp.mkdir(parents=True)
    try:
        # 1) Total duration (falls back to a null-muxer decode when ffprobe says N/A).
        duration = probe_duration(src)
        log.info("duration=%.1fs", duration)

        # 2) Chunks of chunk_seconds, advancing by stride = chunk - overlap.
        stride = max(1, req.chunk_seconds - req.overlap_seconds)
        # Short recordings fit in one request: send the file as-is, no cutting.
        if duration <= req.chunk_seconds + 5:
            chunks_meta = [{'start': 0.0, 'path': src, 'idx': 0}]
        else:
            chunks_meta = _cut_chunks(src, tmp, duration, req.chunk_seconds, stride)
        if not chunks_meta:
            raise HTTPException(500, 'no chunks produced')
        log.info("chunks=%d, stride=%ds, overlap=%ds", len(chunks_meta), stride, req.overlap_seconds)

        # 3) Serial ASR, one request per chunk.
        chunk_texts = [_asr_chunk(m, len(chunks_meta), asr_url, asr_token) for m in chunks_meta]

        # 4) A single chunk needs no stitching.
        if len(chunk_texts) == 1:
            return {'text': chunk_texts[0], 'chunks': 1, 'stitched_by': 'single'}

        # 5) LLM sees every chunk at once to dedup the seams and join them.
        stitched = llm_stitch(chunk_texts, req.overlap_seconds)
        return {
            'text': stitched,
            'chunks': len(chunk_texts),
            'stitched_by': 'llm',
        }
    finally:
        shutil.rmtree(tmp, ignore_errors=True)


def llm_stitch(chunks: list[str], overlap_seconds: int) -> str:
    """Ask an LLM to merge overlapping ASR chunks into one continuous text.

    The model removes duplicated text at chunk seams and fixes boundary
    words. This is best-effort: if the LLM gateway is not configured, the
    call fails, the output was truncated by the token limit, or the reply is
    empty, the chunks are joined with newlines instead.
    """
    gw = os.environ.get('LLM_GATEWAY', '').rstrip('/')
    tok = os.environ.get('LLM_TOKEN', '')
    model = os.environ.get('LLM_MODEL', 'gemma-4-31b-it')
    naive = '\n'.join(chunks)
    if not gw or not tok:
        log.warning("LLM not configured, fall back to naive concat")
        return naive

    parts = [f"段 {i + 1}:\n{c}" for i, c in enumerate(chunks)]
    user = (
        f"下面是一段会议录音的 ASR 转写,被切成 {len(chunks)} 段。"
        f"相邻段有约 {overlap_seconds} 秒(几句话)的重叠。\n\n"
        + "\n\n".join(parts)
        + "\n\n请把所有段拼成一段连续文本:去掉相邻段交界处的重复、"
        "修正明显 ASR 错字(结合上下文)、补回被切断的词。\n"
        "不要加任何解释、标题、段号;只输出拼好的连续文本。"
    )
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": "你是 ASR 转写后处理助手,专门做去重拼接和错字修正。"},
            {"role": "user", "content": user},
        ],
        "temperature": 0.1,
    }
    try:
        r = requests.post(
            gw + '/chat/completions',
            headers={'Authorization': f'Bearer {tok}'},
            json=payload,
            timeout=600,
        )
        if not r.ok:
            log.warning("stitch LLM %s: %s", r.status_code, r.text[:200])
            return naive
        choice = r.json()['choices'][0]
        # A reply cut off by the model's token limit would silently drop the
        # tail of the meeting; the naive join at least keeps every word.
        if choice.get('finish_reason') == 'length':
            log.warning("stitch LLM output truncated (finish_reason=length), fall back to naive concat")
            return naive
        text = choice['message']['content'].strip()
        return text or naive
    except Exception as e:  # deliberate best-effort boundary: never fail the request here
        log.warning("stitch LLM call failed: %s", e)
        return naive


class ConvertReq(BaseModel):
    # Path to the markdown file to publish.
    md_path: str
    # Optional title, passed through as --title.
    title: Optional[str] = None
    # When set, update this doc (--update) instead of creating a new one.
    existing_doc_id: Optional[str] = None


def _last_json_object(text: str) -> dict:
    """Return the JSON object that ends ``text`` (e.g. a script's final print).

    Precondition: ``text`` contains at least one ``'{'``. Candidate ``'{'``
    positions are tried from the end backwards, and the first one that
    decodes to an object followed only by whitespace wins. This handles
    nested or pretty-printed objects and ``'{'`` inside string values, which
    a plain "parse from the last '{'" does not.

    Raises:
        json.JSONDecodeError: if no candidate decodes to a trailing object.
    """
    decoder = json.JSONDecoder()
    pos = text.rfind('{')
    while pos >= 0:
        try:
            obj, end = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            pass
        else:
            if not text[end:].strip():
                return obj
        pos = text.rfind('{', 0, pos)
    # No candidate worked: re-run the naive parse from the last '{' purely to
    # raise a JSONDecodeError describing the tail of the output.
    return json.loads(text[text.rfind('{'):])


@app.post('/convert')
def convert(req: ConvertReq):
    """Publish ``req.md_path`` to Feishu via markdown-to-feishu.

    Returns ``{'doc_id', 'url', 'embeds_inserted', 'embeds_failed'}`` parsed
    from the JSON object the script prints last on stdout.

    Raises:
        HTTPException(400): markdown file not found.
        HTTPException(500): the markdown-to-feishu binary could not be started.
        HTTPException(502): bad exit code or unusable JSON output.
        HTTPException(504): the script ran longer than 10 minutes.
    """
    md = Path(req.md_path)
    if not md.exists():
        raise HTTPException(400, f'md not found: {md}')
    # Run with user identity so the doc is owned by fam (OAuth was authorized
    # manually once on the host).
    cmd = ['/usr/local/bin/markdown-to-feishu', str(md), '--as', 'user']
    if req.existing_doc_id:
        cmd += ['--update', req.existing_doc_id]
    if req.title:
        cmd += ['--title', req.title]
    log.info("run: %s", ' '.join(cmd))

    env = os.environ.copy()
    # Keep markdown-to-feishu's state file on the PVC so it survives restarts.
    env['MD2FEISHU_STATE_DIR'] = '/data/feishu-state'
    Path('/data/feishu-state').mkdir(parents=True, exist_ok=True)

    try:
        proc = subprocess.run(
            cmd, capture_output=True, text=True, timeout=600,
            env=env, cwd=str(md.parent),
        )
    except subprocess.TimeoutExpired:
        raise HTTPException(504, 'markdown-to-feishu timeout (>10min)') from None
    except OSError as e:
        raise HTTPException(500, f'cannot run markdown-to-feishu: {e}') from e

    # Exit code 2 = some embeds failed but the doc was still created, so
    # stdout is parsed in that case too.
    if proc.returncode not in (0, 2):
        log.warning("md2feishu exit=%d stderr=%s", proc.returncode, proc.stderr[-500:])
        raise HTTPException(502, f'md2feishu exit {proc.returncode}: '
                                 f'{proc.stderr.strip()[-400:]}')

    # The script's final print is a JSON object at the end of stdout.
    out = proc.stdout.strip()
    if out.rfind('{') < 0:
        raise HTTPException(502, f'md2feishu no json output. stdout tail: {out[-400:]}')
    try:
        data = _last_json_object(out)
    except json.JSONDecodeError as e:
        raise HTTPException(502, f'md2feishu json parse: {e}; tail: {out[-400:]}') from e

    doc_id = data.get('doc_id')
    url = data.get('url')
    if not doc_id or not url:
        raise HTTPException(502, f'md2feishu missing doc_id/url: {data}')
    log.info("ok: doc_id=%s url=%s embeds=%s", doc_id, url, data.get('embeds_inserted'))
    return {
        'doc_id': doc_id,
        'url': url,
        'embeds_inserted': data.get('embeds_inserted', 0),
        'embeds_failed': data.get('embeds_failed', 0),
    }