Files
cube/apps/notes/feishu/server.py
T
Fam Zheng ca11a9bda7
deploy notes / build-and-deploy (push) Successful in 1m58s
notes(asr): ffprobe duration=N/A 时回退用 ffmpeg null-muxer 解码统计
浏览器内 MediaRecorder 录的 webm/m4a 经常 metadata 没写 duration
(录到一半浏览器关掉 tab 没正常 finalize 文件)。ffprobe format.duration
返回 N/A。回退跑 `ffmpeg -i input -f null -`,从 stderr 最后一行
"time=HH:MM:SS.MS" parse 出实际秒数。慢一点但永远能拿到。
2026-05-18 00:40:23 +01:00

272 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""notes 多用途 sidecar
POST /transcribe — 用 ffmpeg 切片 + 串行调外部 ASR,绕过单请求大小限制
POST /convert — markdown-to-feishu,把会议纪要 push 飞书 docx
"""
import json
import logging
import os
import re
import shutil
import subprocess
import tempfile
import uuid
from pathlib import Path
from typing import Optional
import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
def probe_duration(src: Path) -> float:
    """Return the duration of the media file *src* in seconds.

    Browser-recorded webm/m4a files often carry no duration in their
    metadata (the recording ended before the container could be finalized),
    in which case ffprobe prints ``N/A``.  ffprobe's ``format.duration`` is
    tried first; when it yields nothing usable the file is decoded once with
    ffmpeg's null muxer and the last ``time=HH:MM:SS.ss`` stamp found on
    stderr is converted to seconds.

    Raises:
        HTTPException: status 500 when the fallback decode times out or when
            no ``time=`` stamp can be found in ffmpeg's stderr.
    """
    reason = 'format.duration missing or N/A'
    try:
        out = subprocess.check_output(
            ['ffprobe', '-v', 'quiet', '-show_entries', 'format=duration',
             '-of', 'csv=p=0', str(src)],
            timeout=60,
        ).decode().strip()
        if out and out != 'N/A':
            return float(out)
    except (subprocess.CalledProcessError, ValueError,
            subprocess.TimeoutExpired) as e:
        # ffprobe failing is not fatal; remember why so the log line below
        # does not claim "N/A" when the real cause was an error or timeout.
        reason = f'ffprobe failed: {e!r}'
    log.info("no usable ffprobe duration (%s), decoding to count time", reason)

    try:
        proc = subprocess.run(
            ['ffmpeg', '-i', str(src), '-f', 'null', '-'],
            stderr=subprocess.PIPE, stdout=subprocess.DEVNULL,
            timeout=900,
        )
    except subprocess.TimeoutExpired as e:
        # Surface the timeout as a readable error instead of an opaque crash.
        raise HTTPException(
            500, 'cannot determine duration; ffmpeg decode timed out (>900s)'
        ) from e

    # ffmpeg prints progress stamps to stderr; the last one is the total time.
    matches = re.findall(rb'time=(\d+):(\d+):(\d+(?:\.\d+)?)', proc.stderr)
    if not matches:
        raise HTTPException(500, f'cannot determine duration; ffmpeg stderr tail: {proc.stderr[-300:].decode("utf-8","replace")}')
    h, m, s = matches[-1]
    return int(h) * 3600 + int(m) * 60 + float(s)
# Configure logging once at import time: INFO level, with timestamp, level
# and logger name in front of every message.
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(levelname)s %(name)s: %(message)s')
# Module-wide logger used by every function in this file.
log = logging.getLogger('feishu')
# FastAPI application object; the route decorators in this file register
# their handlers on it.
app = FastAPI()
@app.get('/healthz')
def healthz():
    """Health check endpoint: unconditionally reports ``ok``."""
    return dict(ok=True)
class TranscribeReq(BaseModel):
    # Request body of POST /transcribe.

    # Path of the audio file to transcribe; the handler checks that it exists.
    audio_path: str

    # Length of one chunk in seconds; far below the Qwen3-ASR 8192-token
    # cache (~7 min).
    chunk_seconds: int = 65

    # Overlap between neighbouring chunks in seconds; gives the LLM stitching
    # step an anchor to align on.
    overlap_seconds: int = 10
def _cut_chunk(src: Path, start: float, seconds: int, out_dir: Path, idx: int) -> Path:
    """Cut ``seconds`` of ``src`` starting at ``start`` into ``out_dir``; return the file written."""
    ext = src.suffix.lstrip('.') or 'm4a'
    # -ss before -i: input seek, which is fast.
    base = ['ffmpeg', '-y', '-ss', f'{start:.2f}', '-t', f'{seconds}',
            '-i', str(src)]

    copied = out_dir / f'chunk_{idx:03d}.{ext}'
    try:
        # -c copy: no re-encoding.
        subprocess.run(base + ['-c', 'copy', str(copied)],
                       check=True, capture_output=True, timeout=120)
        return copied
    except subprocess.CalledProcessError:
        pass  # stream copy rejected; fall through to the re-encode below
    except subprocess.TimeoutExpired as e:
        raise HTTPException(500, f'ffmpeg timed out cutting chunk {idx}') from e

    # The fallback encodes AAC, so it must be written to an MP4-family
    # container: reusing the source extension (e.g. .webm) makes ffmpeg
    # reject the codec.  -vn drops any video stream, which ASR does not need.
    encoded = out_dir / f'chunk_{idx:03d}.m4a'
    try:
        subprocess.run(
            base + ['-vn', '-c:a', 'aac', '-b:a', '64k', '-ac', '1',
                    '-ar', '16000', str(encoded)],
            check=True, capture_output=True, timeout=180,
        )
    except subprocess.CalledProcessError as e:
        tail = (e.stderr or b'')[-300:].decode('utf-8', 'replace')
        raise HTTPException(500, f'ffmpeg failed cutting chunk {idx}: {tail}') from e
    except subprocess.TimeoutExpired as e:
        raise HTTPException(500, f'ffmpeg timed out cutting chunk {idx}') from e
    return encoded


def _asr_chunk(asr_url: str, asr_token: str, path: Path, idx: int) -> str:
    """Upload one audio file to the ASR endpoint and return the stripped ``text`` field."""
    try:
        with open(path, 'rb') as f:
            r = requests.post(
                asr_url,
                headers={'Authorization': f'Bearer {asr_token}'},
                files={'file': (path.name, f, 'audio/mp4')},
                data={'model': 'qwen3-asr', 'response_format': 'json'},
                timeout=300,
            )
    except requests.RequestException as e:
        # Connection errors / timeouts are upstream failures, not server bugs.
        raise HTTPException(502, f'ASR chunk {idx} request failed: {e}') from e
    if not r.ok:
        raise HTTPException(502, f'ASR chunk {idx} {r.status_code}: {r.text[:300]}')
    try:
        return r.json().get('text', '').strip()
    except Exception as e:
        raise HTTPException(502, f'ASR chunk {idx} bad json: {r.text[:200]}') from e


@app.post('/transcribe')
def transcribe(req: TranscribeReq):
    """Cut overlapping chunks with ffmpeg, run ASR on them serially, stitch with an LLM.

    Returns a dict with ``text``, ``chunks`` (number of chunks sent to ASR)
    and ``stitched_by`` (``'single'`` when only one chunk was needed,
    otherwise ``'llm'``).

    Raises:
        HTTPException: 400 for a missing audio file or invalid chunk
            parameters; 500 for missing ASR configuration or ffmpeg
            failures; 502 for ASR request/response failures.
    """
    src = Path(req.audio_path)
    if not src.exists():
        raise HTTPException(400, f'audio not found: {src}')
    if req.chunk_seconds <= 0 or req.overlap_seconds < 0:
        # A non-positive chunk length makes ffmpeg fail or emit empty files;
        # a negative overlap would silently leave gaps between chunks.
        raise HTTPException(400, 'chunk_seconds must be > 0 and overlap_seconds >= 0')
    asr_url = os.environ.get('ASR_URL', '')
    asr_token = os.environ.get('ASR_TOKEN', '')
    if not asr_url or not asr_token:
        raise HTTPException(500, 'ASR_URL/ASR_TOKEN not configured in sidecar')

    tmp = Path(tempfile.gettempdir()) / f'transcribe-{uuid.uuid4().hex}'
    tmp.mkdir(parents=True)
    try:
        # 1) Total duration (falls back to a null-muxer decode when ffprobe
        #    reports N/A).
        duration = probe_duration(src)
        log.info("duration=%.1fs", duration)

        # 2) Cut chunk_seconds-long pieces; consecutive pieces start
        #    stride = chunk_seconds - overlap_seconds apart.
        stride = max(1, req.chunk_seconds - req.overlap_seconds)
        if duration <= req.chunk_seconds + 5:
            # Short recording: one piece is enough, send the file as-is.
            chunks_meta = [{'start': 0.0, 'path': src, 'idx': 0}]
        else:
            chunks_meta = []
            start = 0.0
            while start < duration:
                idx = len(chunks_meta)
                cp = _cut_chunk(src, start, req.chunk_seconds, tmp, idx)
                if cp.stat().st_size < 1024:
                    # Practically empty output: nothing left to transcribe.
                    break
                chunks_meta.append({'start': start, 'path': cp, 'idx': idx})
                if start + req.chunk_seconds >= duration:
                    # This chunk already reaches the end of the audio; one
                    # more would lie entirely inside it and only duplicate
                    # text for the stitching step.
                    break
                start += stride
        if not chunks_meta:
            raise HTTPException(500, 'no chunks produced')
        log.info("chunks=%d, stride=%ds, overlap=%ds",
                 len(chunks_meta), stride, req.overlap_seconds)

        # 3) Serial ASR.
        chunk_texts = []
        for m in chunks_meta:
            log.info("ASR chunk %d/%d (start=%.1fs, %dKB)",
                     m['idx'] + 1, len(chunks_meta), m['start'],
                     m['path'].stat().st_size // 1024)
            chunk_texts.append(_asr_chunk(asr_url, asr_token, m['path'], m['idx']))

        # 4) A single chunk needs no stitching.
        if len(chunk_texts) == 1:
            return {'text': chunk_texts[0], 'chunks': 1, 'stitched_by': 'single'}

        # 5) LLM stitching: the model sees all chunks at once and removes
        #    the duplicated overlap.
        stitched = llm_stitch(chunk_texts, req.overlap_seconds)
        return {
            'text': stitched,
            'chunks': len(chunk_texts),
            'stitched_by': 'llm',
        }
    finally:
        # Always remove the per-request scratch directory.
        shutil.rmtree(tmp, ignore_errors=True)
def llm_stitch(chunks: list[str], overlap_seconds: int) -> str:
    """Merge overlapping ASR chunk texts into one continuous text via an LLM.

    The chunks are numbered and sent to the chat-completions endpoint under
    ``LLM_GATEWAY`` together with instructions to drop the duplicated overlap
    and repair words cut at the boundaries.  This is best effort: when the
    gateway or token is not configured, the call fails, or the reply is
    empty, the chunks joined by newlines are returned instead.
    """
    fallback = '\n'.join(chunks)

    gateway = os.environ.get('LLM_GATEWAY', '').rstrip('/')
    token = os.environ.get('LLM_TOKEN', '')
    model_name = os.environ.get('LLM_MODEL', 'gemma-4-31b-it')
    if not (gateway and token):
        log.warning("LLM not configured, fall back to naive concat")
        return fallback

    # Each chunk is preceded by its 1-based number on a line of its own.
    numbered = "\n\n".join(
        f"{number}\n{body}" for number, body in enumerate(chunks, start=1)
    )
    intro = (
        f"下面是一段会议录音的 ASR 转写,被切成 {len(chunks)} 段。"
        f"相邻段有约 {overlap_seconds} 秒(几句话)的重叠。\n\n"
    )
    instructions = (
        "\n\n请把所有段拼成一段连续文本:去掉相邻段交界处的重复、"
        "修正明显 ASR 错字(结合上下文)、补回被切断的词。\n"
        "不要加任何解释、标题、段号;只输出拼好的连续文本。"
    )
    request_body = {
        "model": model_name,
        "messages": [
            {"role": "system", "content": "你是 ASR 转写后处理助手,专门做去重拼接和错字修正。"},
            {"role": "user", "content": intro + numbered + instructions},
        ],
        "temperature": 0.1,
    }

    try:
        resp = requests.post(
            f'{gateway}/chat/completions',
            headers={'Authorization': f'Bearer {token}'},
            json=request_body,
            timeout=600,
        )
        if resp.ok:
            answer = resp.json()['choices'][0]['message']['content'].strip()
            return answer if answer else fallback
        log.warning("stitch LLM %s: %s", resp.status_code, resp.text[:200])
    except Exception as e:
        # Any failure (transport, malformed reply) degrades to the naive join.
        log.warning("stitch LLM call failed: %s", e)
    return fallback
class ConvertReq(BaseModel):
    # Request body of POST /convert.

    # Path of the markdown file to push; the handler checks that it exists.
    md_path: str

    # Optional value forwarded to the converter as `--title`.
    title: Optional[str] = None

    # Optional document id forwarded to the converter as `--update`.
    existing_doc_id: Optional[str] = None
@app.post('/convert')
def convert(req: ConvertReq):
    """Run ``markdown-to-feishu`` on a markdown file and report the resulting doc.

    Returns a dict with ``doc_id``, ``url``, ``embeds_inserted`` and
    ``embeds_failed`` taken from the JSON object the tool prints last.

    Raises:
        HTTPException: 400 when the markdown file is missing; 504 when the
            tool exceeds ten minutes; 502 when it exits with an unexpected
            code or its output has no usable JSON object.
    """
    md = Path(req.md_path)
    if not md.exists():
        raise HTTPException(400, f'md not found: {md}')

    # Runs under the user identity (`--as user`).  NOTE(review): the original
    # comment indicates OAuth authorization was done manually once on the
    # doc host — confirm.
    cmd = ['/usr/local/bin/markdown-to-feishu', str(md), '--as', 'user']
    if req.existing_doc_id:
        cmd += ['--update', req.existing_doc_id]
    if req.title:
        cmd += ['--title', req.title]
    log.info("run: %s", ' '.join(cmd))

    env = os.environ.copy()
    # Keep the markdown-to-feishu state file on the PVC so it survives restarts.
    env['MD2FEISHU_STATE_DIR'] = '/data/feishu-state'
    Path('/data/feishu-state').mkdir(parents=True, exist_ok=True)
    try:
        proc = subprocess.run(
            cmd, capture_output=True, text=True, timeout=600, env=env,
            cwd=str(md.parent),
        )
    except subprocess.TimeoutExpired:
        raise HTTPException(504, 'markdown-to-feishu timeout (>10min)')

    # Exit code 2 means some embeds failed but the doc was created, so
    # stdout is still parsed.
    if proc.returncode not in (0, 2):
        log.warning("md2feishu exit=%d stderr=%s", proc.returncode, proc.stderr[-500:])
        raise HTTPException(502, f'md2feishu exit {proc.returncode}: '
                                 f'{proc.stderr.strip()[-400:]}')

    # The tool's final print is a JSON object at the end of stdout.  Starting
    # at the very last '{' is wrong when that object contains a nested
    # object, so walk the '{' positions backwards until a slice parses.
    out = proc.stdout.strip()
    pos = out.rfind('{')
    if pos < 0:
        raise HTTPException(502, f'md2feishu no json output. stdout tail: {out[-400:]}')
    data = None
    first_err = None
    while pos >= 0:
        try:
            candidate = json.loads(out[pos:])
        except json.JSONDecodeError as e:
            if first_err is None:
                first_err = e
        else:
            if isinstance(candidate, dict):
                data = candidate
                break
        pos = out.rfind('{', 0, pos)
    if data is None:
        raise HTTPException(502, f'md2feishu json parse: {first_err}; tail: {out[-400:]}')

    doc_id = data.get('doc_id')
    url = data.get('url')
    if not doc_id or not url:
        raise HTTPException(502, f'md2feishu missing doc_id/url: {data}')
    log.info("ok: doc_id=%s url=%s embeds=%s",
             doc_id, url, data.get('embeds_inserted'))
    return {
        'doc_id': doc_id,
        'url': url,
        'embeds_inserted': data.get('embeds_inserted', 0),
        'embeds_failed': data.get('embeds_failed', 0),
    }