Files
cube/apps/notes/feishu/server.py
T
Fam Zheng 1ee35b4d19
deploy notes / build-and-deploy (push) Successful in 2m7s
notes(asr): overlap 切片 + LLM 拼接去重
- ffmpeg 用 -ss/-t 顺序切 65s 段,stride 55s(10s overlap);单段 ≤70s 整段不切
- 串行喂外部 ASR 后,把全部 chunk_texts 喂一次 LLM 让它去重 + 修边界字
- 单段直接返回 naive,LLM 失败也 fallback naive,不卡流程
- sidecar 注入 LLM_GATEWAY/LLM_MODEL/LLM_TOKEN env
2026-05-17 22:47:06 +01:00

250 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""notes 多用途 sidecar
POST /transcribe — 用 ffmpeg 切片 + 串行调外部 ASR,绕过单请求大小限制
POST /convert — markdown-to-feishu,把会议纪要 push 飞书 docx
"""
import json
import logging
import os
import shutil
import subprocess
import tempfile
import uuid
from pathlib import Path
from typing import Optional
import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# One shared line format for every logger in the sidecar process.
_LOG_FORMAT = '%(asctime)s %(levelname)s %(name)s: %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module logger used by every endpoint below.
log = logging.getLogger('feishu')

app = FastAPI()
@app.get('/healthz')
def healthz():
    """Liveness probe for the sidecar; always answers ``{'ok': True}``."""
    payload = {'ok': True}
    return payload
class TranscribeReq(BaseModel):
    """Request body for POST /transcribe."""
    # Path of the audio file on the sidecar's filesystem (/transcribe answers 400 if it is missing)
    audio_path: str
    chunk_seconds: int = 65  # length of one chunk; well below Qwen3-ASR's 8192-token cache (~7 min)
    overlap_seconds: int = 10  # overlap between adjacent chunks, leaves an anchor for LLM stitching
def _probe_duration(src: Path) -> float:
    """Return the container duration of ``src`` in seconds, as reported by ffprobe.

    Raises:
        HTTPException(500): ffprobe cannot run, fails, times out, or prints
            something that is not a float.
    """
    try:
        out = subprocess.check_output(
            ['ffprobe', '-v', 'quiet', '-show_entries', 'format=duration',
             '-of', 'csv=p=0', str(src)],
            timeout=60,
        )
    except (OSError, subprocess.SubprocessError) as e:
        raise HTTPException(500, f'ffprobe failed: {e}') from e
    try:
        return float(out.decode().strip())
    except ValueError:
        raise HTTPException(500, f'ffprobe duration parse: {out!r}') from None


def _ffmpeg_cut(src: Path, tmp: Path, idx: int, start: float,
                chunk_seconds: int, ext: str) -> Path:
    """Write chunk ``idx`` of ``src`` into ``tmp`` and return the file written.

    Tries a lossless stream copy first; if ffmpeg rejects that, re-encodes to
    mono 16 kHz 64 kbit/s AAC.

    Raises:
        HTTPException(500): ffmpeg cannot run or times out, or the re-encode fails.
    """
    # -ss before -i: input seek, which is fast.
    base = ['ffmpeg', '-y', '-ss', f'{start:.2f}', '-t', f'{chunk_seconds}',
            '-i', str(src)]
    cp = tmp / f'chunk_{idx:03d}.{ext}'
    try:
        # -c copy: no re-encoding.
        subprocess.run(base + ['-c', 'copy', str(cp)],
                       check=True, capture_output=True, timeout=120)
        return cp
    except subprocess.CalledProcessError:
        pass  # stream copy rejected -> fall through to re-encode
    except (OSError, subprocess.SubprocessError) as e:
        raise HTTPException(500, f'ffmpeg chunk {idx} failed: {e}') from e

    # AAC needs an MP4-family container; the source extension (e.g. .mp3)
    # may not be able to hold it, so the re-encoded chunk is always .m4a.
    cp = tmp / f'chunk_{idx:03d}.m4a'
    try:
        subprocess.run(
            base + ['-c:a', 'aac', '-b:a', '64k', '-ac', '1', '-ar', '16000',
                    str(cp)],
            check=True, capture_output=True, timeout=180,
        )
    except (OSError, subprocess.SubprocessError) as e:
        raise HTTPException(500, f'ffmpeg chunk {idx} failed: {e}') from e
    return cp


def _cut_chunks(src: Path, tmp: Path, duration: float,
                chunk_seconds: int, stride: int) -> list[dict]:
    """Cut ``src`` into overlapping ``chunk_seconds``-long files under ``tmp``.

    Chunks start every ``stride`` seconds. Cutting stops as soon as one chunk
    reaches the end of the recording: a further chunk would lie entirely
    inside that chunk's overlap and only yield duplicate text.

    Returns:
        A list of ``{'start': float, 'path': Path, 'idx': int}`` in order.
    """
    ext = src.suffix.lstrip('.') or 'm4a'
    chunks_meta = []
    start = 0.0
    idx = 0
    while start < duration:
        cp = _ffmpeg_cut(src, tmp, idx, start, chunk_seconds, ext)
        # A (near-)empty output means there was no audio left to cut.
        if cp.stat().st_size < 1024:
            break
        chunks_meta.append({'start': start, 'path': cp, 'idx': idx})
        if start + chunk_seconds >= duration:
            break
        start += stride
        idx += 1
    return chunks_meta


def _asr_chunk(meta: dict, total: int, asr_url: str, asr_token: str) -> str:
    """POST one chunk file to the ASR endpoint and return its stripped ``text``.

    Raises:
        HTTPException(502): the request fails, returns a non-2xx status, or
            the body is not a JSON object with a string ``text``.
    """
    path = meta['path']
    idx = meta['idx']
    log.info("ASR chunk %d/%d (start=%.1fs, %dKB)",
             idx + 1, total, meta['start'], path.stat().st_size // 1024)
    try:
        with open(path, 'rb') as f:
            r = requests.post(
                asr_url,
                headers={'Authorization': f'Bearer {asr_token}'},
                files={'file': (path.name, f, 'audio/mp4')},
                data={'model': 'qwen3-asr', 'response_format': 'json'},
                timeout=300,
            )
    except requests.RequestException as e:
        raise HTTPException(502, f'ASR chunk {idx} request failed: {e}') from e
    if not r.ok:
        raise HTTPException(502, f'ASR chunk {idx} {r.status_code}: {r.text[:300]}')
    try:
        return r.json().get('text', '').strip()
    except Exception as e:
        raise HTTPException(502, f'ASR chunk {idx} bad json: {r.text[:200]}') from e


@app.post('/transcribe')
def transcribe(req: TranscribeReq):
    """Transcribe a local audio file through the external ASR service.

    Pipeline: ffprobe the duration -> cut overlapping chunks with ffmpeg ->
    send the chunks to ASR one at a time -> stitch the chunk texts with
    ``llm_stitch``. Chunking keeps each ASR request small, sidestepping the
    service's per-request size limit. Recordings of at most
    ``chunk_seconds + 5`` seconds are sent whole, without cutting.

    Returns:
        ``{'text', 'chunks', 'stitched_by'}``; ``stitched_by`` is ``'single'``
        for a one-chunk recording, otherwise ``'llm'`` (``llm_stitch`` may
        itself fall back to a plain join).

    Raises:
        HTTPException: 400 if the audio file is missing; 500 if ASR is not
            configured or ffprobe/ffmpeg fail; 502 on ASR request errors.
    """
    src = Path(req.audio_path)
    if not src.exists():
        raise HTTPException(400, f'audio not found: {src}')
    asr_url = os.environ.get('ASR_URL', '')
    asr_token = os.environ.get('ASR_TOKEN', '')
    if not asr_url or not asr_token:
        raise HTTPException(500, 'ASR_URL/ASR_TOKEN not configured in sidecar')

    tmp = Path(tempfile.mkdtemp(prefix='transcribe-'))
    try:
        duration = _probe_duration(src)
        log.info("duration=%.1fs", duration)

        stride = max(1, req.chunk_seconds - req.overlap_seconds)
        if duration <= req.chunk_seconds + 5:
            # Short recording: one request is enough, send the file as-is.
            chunks_meta = [{'start': 0.0, 'path': src, 'idx': 0}]
        else:
            chunks_meta = _cut_chunks(src, tmp, duration, req.chunk_seconds, stride)
        if not chunks_meta:
            raise HTTPException(500, 'no chunks produced')
        log.info("chunks=%d, stride=%ds, overlap=%ds",
                 len(chunks_meta), stride, req.overlap_seconds)

        # Deliberately serial: one request at a time to the external ASR.
        chunk_texts = [
            _asr_chunk(m, len(chunks_meta), asr_url, asr_token)
            for m in chunks_meta
        ]

        if len(chunk_texts) == 1:
            return {'text': chunk_texts[0], 'chunks': 1, 'stitched_by': 'single'}

        # The LLM sees all chunks at once to deduplicate overlaps and join them.
        stitched = llm_stitch(chunk_texts, req.overlap_seconds)
        return {
            'text': stitched,
            'chunks': len(chunk_texts),
            'stitched_by': 'llm',
        }
    finally:
        shutil.rmtree(tmp, ignore_errors=True)
def llm_stitch(chunks: list[str], overlap_seconds: int) -> str:
    """Merge overlapping ASR chunk texts into one continuous transcript via an LLM.

    The LLM is asked to drop text duplicated at adjacent chunk boundaries, fix
    obvious ASR typos from context, and restore words cut at boundaries. This
    is best-effort: if the LLM is not configured, the call fails, or the
    response is malformed, empty or truncated, the naive newline join of the
    chunks is returned instead, so transcription never blocks on the LLM.

    Args:
        chunks: Per-chunk ASR texts in playback order. Blank chunks are
            ignored; they carry no text to stitch.
        overlap_seconds: Approximate overlap between adjacent chunks; only
            used to describe the input to the LLM.

    Returns:
        The stitched text, or the naive join on failure. With at most one
        non-blank chunk there is nothing to stitch, so that text is returned
        directly without calling the LLM.

    Env:
        LLM_GATEWAY (OpenAI-compatible base URL), LLM_TOKEN,
        LLM_MODEL (default ``gemma-4-31b-it``).
    """
    texts = [c for c in chunks if c.strip()]
    naive = '\n'.join(texts)
    if len(texts) <= 1:
        return naive

    gw = os.environ.get('LLM_GATEWAY', '').rstrip('/')
    tok = os.environ.get('LLM_TOKEN', '')
    model = os.environ.get('LLM_MODEL', 'gemma-4-31b-it')
    if not gw or not tok:
        log.warning("LLM not configured, fall back to naive concat")
        return naive

    # Explicit segment markers: a bare number on its own line would be
    # indistinguishable from transcript text that happens to be a number.
    body = "\n\n".join(f"[段 {i}]\n{c}" for i, c in enumerate(texts, start=1))
    user = (
        f"下面是一段会议录音的 ASR 转写,被切成 {len(texts)} 段。"
        f"相邻段有约 {overlap_seconds} 秒(几句话)的重叠。\n\n"
        + body
        + "\n\n请把所有段拼成一段连续文本:去掉相邻段交界处的重复、"
        "修正明显 ASR 错字(结合上下文)、补回被切断的词。\n"
        "不要加任何解释、标题、段号;只输出拼好的连续文本。"
    )
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": "你是 ASR 转写后处理助手,专门做去重拼接和错字修正。"},
            {"role": "user", "content": user},
        ],
        "temperature": 0.1,
    }
    try:
        r = requests.post(
            gw + '/chat/completions',
            headers={'Authorization': f'Bearer {tok}'},
            json=payload,
            timeout=600,
        )
        if not r.ok:
            log.warning("stitch LLM %s: %s", r.status_code, r.text[:200])
            return naive
        choice = r.json()['choices'][0]
        # A reply cut off at the output-token limit would silently drop the
        # end of the meeting; the naive join at least keeps every word.
        if choice.get('finish_reason') == 'length':
            log.warning("stitch LLM output truncated (finish_reason=length)")
            return naive
        text = (choice['message']['content'] or '').strip()
        return text or naive
    except Exception as e:
        # Deliberate best-effort boundary: any LLM problem degrades to naive.
        log.warning("stitch LLM call failed: %s", e)
        return naive
class ConvertReq(BaseModel):
    """Request body for POST /convert."""
    md_path: str  # markdown file to push; /convert runs the converter in its parent directory
    title: Optional[str] = None  # forwarded as `--title <title>` when set
    existing_doc_id: Optional[str] = None  # forwarded as `--update <id>` when set (presumably updates that doc in place)
def _last_json_object(text: str) -> Optional[dict]:
    """Return the last top-level JSON object that ends ``text``, or None.

    Scans ``{`` positions from the end backwards and returns the first one
    that decodes to an object extending to the end of ``text`` (trailing
    whitespace allowed). Unlike slicing from ``text.rfind('{')``, this also
    works when that final object contains nested objects.
    """
    decoder = json.JSONDecoder()
    pos = text.rfind('{')
    while pos >= 0:
        try:
            obj, end = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(obj, dict) and not text[end:].strip():
                return obj
        pos = text.rfind('{', 0, pos)
    return None


@app.post('/convert')
def convert(req: ConvertReq):
    """Push a markdown file to a Feishu docx via the ``markdown-to-feishu`` CLI.

    Runs the CLI with ``--as user`` from the markdown file's directory, adding
    ``--update <existing_doc_id>`` and/or ``--title <title>`` when given. The
    CLI's state directory is pinned to the PVC so it survives restarts.
    The last JSON object on the CLI's stdout must carry ``doc_id`` and ``url``.

    Returns:
        ``{'doc_id', 'url', 'embeds_inserted', 'embeds_failed'}``; the embed
        counters default to 0 when the CLI omits them.

    Raises:
        HTTPException: 400 if the markdown file is missing; 500 if the CLI
            cannot be started; 504 if it runs longer than 600 s; 502 on a
            failing exit code (anything except 0 and 2) or missing/unparseable
            JSON output, or when ``doc_id``/``url`` are absent.
    """
    md = Path(req.md_path)
    if not md.exists():
        raise HTTPException(400, f'md not found: {md}')
    cmd = ['/usr/local/bin/markdown-to-feishu', str(md), '--as', 'user']
    if req.existing_doc_id:
        cmd += ['--update', req.existing_doc_id]
    if req.title:
        cmd += ['--title', req.title]
    log.info("run: %s", ' '.join(cmd))

    # Keep the markdown-to-feishu state file on the PVC so restarts don't lose it.
    state_dir = Path('/data/feishu-state')
    state_dir.mkdir(parents=True, exist_ok=True)
    env = {**os.environ, 'MD2FEISHU_STATE_DIR': str(state_dir)}
    try:
        proc = subprocess.run(
            cmd, capture_output=True, text=True, timeout=600, env=env,
            cwd=str(md.parent),
        )
    except subprocess.TimeoutExpired:
        raise HTTPException(504, 'markdown-to-feishu timeout (>10min)') from None
    except OSError as e:
        raise HTTPException(500, f'markdown-to-feishu not runnable: {e}') from e

    # Exit code 2 = some embeds failed but the doc was created; still parse stdout.
    if proc.returncode not in (0, 2):
        log.warning("md2feishu exit=%d stderr=%s", proc.returncode, proc.stderr[-500:])
        raise HTTPException(502, f'md2feishu exit {proc.returncode}: '
                                 f'{proc.stderr.strip()[-400:]}')

    # The script's final print is a JSON object; other output may precede it.
    out = proc.stdout.strip()
    if '{' not in out:
        raise HTTPException(502, f'md2feishu no json output. stdout tail: {out[-400:]}')
    data = _last_json_object(out)
    if data is None:
        raise HTTPException(502, 'md2feishu json parse: no complete JSON object '
                                 f'at end of stdout; tail: {out[-400:]}')

    doc_id = data.get('doc_id')
    url = data.get('url')
    if not doc_id or not url:
        raise HTTPException(502, f'md2feishu missing doc_id/url: {data}')
    log.info("ok: doc_id=%s url=%s embeds=%s",
             doc_id, url, data.get('embeds_inserted'))
    return {
        'doc_id': doc_id,
        'url': url,
        'embeds_inserted': data.get('embeds_inserted', 0),
        'embeds_failed': data.get('embeds_failed', 0),
    }