""" chord-fetcher sidecar 的 HTTP service。 跟 music 主容器同 pod,监听 :8001。被 music backend 通过 localhost 调用。 worker 单线程串行(chromium 一次跑一个,省资源),文件落 /data/chord-fetch/{piece_id}.png。 """ import json import logging import queue import sys import threading import os from pathlib import Path from typing import Optional from fastapi import FastAPI, HTTPException from fastapi.responses import FileResponse # 调试热更:/data 是 PVC mount,重启容器不丢;放 yopu.py 在 /data/chord-overrides/ # 启动时把它放最高优先级,方便不重 build image 直接 hot-fix selector。 _OVERRIDE_DIR = Path('/data/chord-overrides') _OVERRIDE_DIR.mkdir(parents=True, exist_ok=True) if (_OVERRIDE_DIR / 'yopu.py').exists(): sys.path.insert(0, str(_OVERRIDE_DIR)) print(f"[chord-server] using yopu.py override from {_OVERRIDE_DIR}") import yopu # noqa: E402 logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s: %(message)s') logger = logging.getLogger('chord-server') OUT_DIR = Path(os.getenv('CHORD_OUT_DIR', '/data/chord-fetch')) OUT_DIR.mkdir(parents=True, exist_ok=True) app = FastAPI() # in-memory job state. 
# piece_id -> {status, error, query}; every access is guarded by state_lock.
# Entries are replaced wholesale, never mutated in place, so the worker can
# use object identity to tell whether a job it started is still current.
state: dict[int, dict] = {}
state_lock = threading.Lock()
job_q: queue.Queue = queue.Queue()


def out_path(piece_id: int) -> Path:
    """Return the path of the PNG for *piece_id* inside OUT_DIR."""
    return OUT_DIR / f"{piece_id}.png"


def _remove_output(piece_id: int) -> None:
    """Best-effort delete of the PNG for *piece_id*; failures are only logged."""
    try:
        out_path(piece_id).unlink(missing_ok=True)
    except OSError as e:
        logger.warning("[piece=%d] cleanup unlink: %s", piece_id, e)


def _claim_job(piece_id: int, query: str) -> Optional[dict]:
    """Mark a queued job as processing, or return None if it went stale.

    A job is stale when its state entry is no longer the 'pending' entry that
    /fetch created for this exact query, e.g. the state was reset (and possibly
    re-submitted) while the job was still waiting in the queue.
    """
    with state_lock:
        cur = state.get(piece_id)
        if not cur or cur.get('status') != 'pending' or cur.get('query') != query:
            return None
        entry = {'status': 'processing', 'error': '', 'query': query}
        state[piece_id] = entry
        return entry


def _finish_job(piece_id: int, entry: dict, status: str, error: str) -> bool:
    """Record the outcome of a job if *entry* is still the current state.

    Returns False when the state was reset or replaced while the fetch was
    running. In that case the result is dropped and the PNG is removed, so a
    reset is not undone by a late-finishing job.
    """
    with state_lock:
        if state.get(piece_id) is not entry:
            _remove_output(piece_id)
            return False
        state[piece_id] = {'status': status, 'error': error, 'query': entry['query']}
        return True


def worker():
    """Consume job_q forever, running one yopu fetch at a time.

    Each queue item is a (piece_id, query) tuple. The outcome is written to
    ``state`` as 'completed' or 'failed'; jobs whose state was reset in the
    meantime are skipped or have their result discarded.
    """
    while True:
        piece_id, query = job_q.get()
        try:
            entry = _claim_job(piece_id, query)
            if entry is None:
                logger.info("[piece=%d] skip stale job query=%r", piece_id, query)
                continue
            logger.info("[piece=%d] start fetch query=%r", piece_id, query)
            try:
                ok, msg = yopu.fetch_chord_chart(query, str(out_path(piece_id)))
            except Exception as e:
                logger.exception("[piece=%d] worker crash", piece_id)
                # str(e) is empty for message-less exceptions; fall back to the
                # class name so a failed status always carries a reason.
                _finish_job(piece_id, entry, 'failed', str(e) or type(e).__name__)
                continue
            recorded = _finish_job(
                piece_id,
                entry,
                'completed' if ok else 'failed',
                '' if ok else msg,
            )
            if not recorded:
                logger.info("[piece=%d] result discarded: state changed during fetch", piece_id)
            elif ok:
                logger.info("[piece=%d] completed: %s", piece_id, msg)
            else:
                logger.warning("[piece=%d] failed: %s", piece_id, msg)
        finally:
            job_q.task_done()


# Single daemon worker: jobs run strictly one after another.
threading.Thread(target=worker, daemon=True).start()


@app.get('/healthz')
def healthz():
    """Liveness probe; always reports ok."""
    return {'ok': True}


@app.post('/fetch')
def fetch(piece_id: int, query: str):
    """Enqueue a fetch job for *piece_id*.

    *query* is the search string handed to ``yopu.fetch_chord_chart``.

    Idempotent: if the job is already completed and its PNG still exists,
    'completed' is returned without re-queueing; if a job is pending or
    processing, its current status is returned. Otherwise a new job is queued.

    Raises:
        HTTPException(400): *piece_id* is not positive or *query* is blank.
    """
    if piece_id <= 0 or not query.strip():
        raise HTTPException(400, 'piece_id / query required')
    with state_lock:
        cur_status = state.get(piece_id, {}).get('status')
        if cur_status == 'completed' and out_path(piece_id).exists():
            return {'status': 'completed'}
        if cur_status in ('pending', 'processing'):
            return {'status': cur_status}
        state[piece_id] = {'status': 'pending', 'error': '', 'query': query}
    # Enqueue after the state entry exists so the worker can always claim it.
    job_q.put((piece_id, query))
    return {'status': 'pending'}


@app.get('/status/{piece_id}')
def status(piece_id: int):
    """Report the job status for *piece_id*.

    Always returns the keys ``status``, ``error``, ``query`` and
    ``file_exists``. ``status`` is 'none' when nothing is known, with two
    corrections based on the file system:

    * state says 'completed' but the PNG is gone -> reported as 'failed';
    * no state entry but the PNG exists -> reported as 'completed'.
    """
    with state_lock:
        cur = state.get(piece_id, {})
        file_exists = out_path(piece_id).exists()
    job_status = cur.get('status', 'none')
    error = cur.get('error', '')
    if job_status == 'completed' and not file_exists:
        job_status, error = 'failed', 'png 文件丢了'
    elif not cur and file_exists:
        job_status = 'completed'
    return {
        'status': job_status,
        'error': error,
        'query': cur.get('query', ''),
        'file_exists': file_exists,
    }


@app.get('/image/{piece_id}')
def image(piece_id: int):
    """Serve the PNG for *piece_id*.

    Raises:
        HTTPException(404): no PNG exists for this piece.
    """
    p = out_path(piece_id)
    if not p.exists():
        raise HTTPException(404, 'not found')
    return FileResponse(p, media_type='image/png')


@app.delete('/state/{piece_id}')
def reset(piece_id: int):
    """Drop the state entry and delete the PNG for *piece_id*.

    Called by the music backend once it has imported the image, so PNGs do not
    pile up on the PVC. A job still queued or running for this piece is
    cancelled as a side effect: the worker skips it or discards its result.
    """
    with state_lock:
        state.pop(piece_id, None)
        _remove_output(piece_id)
    return {'ok': True}