music(chord): 加 yopu.co 吉他谱自动抓取(sidecar 模式)
deploy cube / build-and-deploy (push) Successful in 1m15s
deploy simpleasm / build-and-deploy (push) Successful in 1m19s
deploy music / build-and-deploy (push) Successful in 4m38s

复刻 ../guitar 的功能:
- 新加 chord-fetcher sidecar(python 3.11 + chromium + selenium),跟 main 同 pod 共享 PVC
- yopu.py v2:搜「和弦谱」→ 进 view → 选 谱面样式=功能谱 + 和弦样式=级数名 → 截 sheet-container → PIL 裁白边
- music backend 加 POST /api/pieces/:id/chord/fetch + GET /chord/status,转发 sidecar 并把 png import 成 image attachment role=chord
- 前端 chord tab 在没图时显示「自动抓取」按钮,点了 polling 状态、完成后刷新
- CI build 两个 image(music + music-chord),rollout 同步切版本
This commit is contained in:
Fam Zheng
2026-05-09 22:52:09 +01:00
parent 1a8f297302
commit e111398157
11 changed files with 1688 additions and 12 deletions
+24
View File
@@ -0,0 +1,24 @@
# music chord-fetcher sidecar
# Selenium service that screenshots yopu.co; runs in the same pod as the main
# music container and shares its PVC.
FROM python:3.11-slim-bookworm
# Chromium plus its driver for Selenium. fonts-noto-cjk is presumably installed
# so Chinese sheet text renders in the screenshot (confirm). The apt lists are
# removed in the same layer.
RUN apt-get update && apt-get install -y --no-install-recommends \
chromium chromium-driver fonts-noto-cjk ca-certificates \
&& rm -rf /var/lib/apt/lists/*
# yopu.py reads CHROME_BIN and CHROMEDRIVER_PATH from the environment to locate
# the browser and driver binaries.
ENV CHROME_BIN=/usr/bin/chromium
ENV CHROMEDRIVER_PATH=/usr/bin/chromedriver
ENV PYTHONUNBUFFERED=1
# Runtime dependencies, pinned to exact versions.
RUN pip install --no-cache-dir \
selenium==4.27.1 \
pillow==11.0.0 \
fastapi==0.115.6 \
uvicorn==0.34.0
WORKDIR /app
COPY yopu.py chord_server.py ./
# Same port that uvicorn is told to bind below.
EXPOSE 8001
CMD ["uvicorn", "chord_server:app", "--host", "0.0.0.0", "--port", "8001"]
+127
View File
@@ -0,0 +1,127 @@
"""
chord-fetcher sidecar 的 HTTP service。
跟 music 主容器同 pod,监听 :8001。被 music backend 通过 localhost 调用。
worker 单线程串行(chromium 一次跑一个,省资源),文件落 /data/chord-fetch/{piece_id}.png。
"""
import json
import logging
import queue
import threading
import os
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
import yopu
# Configure root logging at import time; the format includes the logger name.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s %(name)s: %(message)s')
logger = logging.getLogger('chord-server')
# Directory that receives the fetched PNGs. Overridable through the
# CHORD_OUT_DIR environment variable; created at import time.
OUT_DIR = Path(os.getenv('CHORD_OUT_DIR', '/data/chord-fetch'))
OUT_DIR.mkdir(parents=True, exist_ok=True)
app = FastAPI()
# In-memory job state: piece_id -> {status, error, query}.
# Lives only in this process, so it is empty again after a restart.
state: dict[int, dict] = {}
# Guards `state`, which is touched by both the HTTP handlers and worker().
state_lock = threading.Lock()
# Queue of (piece_id, query) jobs consumed by worker().
job_q: queue.Queue = queue.Queue()
def out_path(piece_id: int) -> Path:
    """Return the PNG path inside ``OUT_DIR`` used for *piece_id*."""
    filename = f"{piece_id}.png"
    return OUT_DIR.joinpath(filename)
def worker():
    """Consume ``(piece_id, query)`` jobs from ``job_q`` forever, one at a time.

    For each job the entry in ``state`` moves to ``processing``, then to
    ``completed`` or ``failed`` depending on what ``yopu.fetch_chord_chart``
    returns. Any exception raised while handling a job is logged and recorded
    as ``failed`` so the loop keeps running. ``job_q.task_done()`` is always
    called once per job.
    """

    def publish(piece_id, status, query, error=''):
        # Replace the whole entry under the lock.
        with state_lock:
            state[piece_id] = {'status': status, 'error': error, 'query': query}

    while True:
        piece_id, query = job_q.get()
        publish(piece_id, 'processing', query)
        logger.info("[piece=%d] start fetch query=%r", piece_id, query)
        try:
            ok, msg = yopu.fetch_chord_chart(query, str(out_path(piece_id)))
            if ok:
                publish(piece_id, 'completed', query)
                logger.info("[piece=%d] completed: %s", piece_id, msg)
            else:
                # On failure msg carries the reason reported by the fetcher.
                publish(piece_id, 'failed', query, error=msg)
                logger.warning("[piece=%d] failed: %s", piece_id, msg)
        except Exception as e:
            logger.exception("[piece=%d] worker crash", piece_id)
            publish(piece_id, 'failed', query, error=str(e))
        finally:
            job_q.task_done()
# Start the single background worker when the module is imported.
# daemon=True keeps this thread from blocking interpreter shutdown.
threading.Thread(target=worker, daemon=True).start()
@app.get('/healthz')
def healthz():
    """Health endpoint: always answers ``{'ok': True}``."""
    return dict(ok=True)
@app.post('/fetch')
def fetch(piece_id: int, query: str):
    """Enqueue a fetch job. *query* is normally ``'<artist> <title>'``.

    Idempotent: when the piece is already ``completed`` and its PNG still
    exists, ``completed`` is returned without queueing anything. A piece that
    is already ``pending`` or ``processing`` just reports that status.
    Responds with HTTP 400 when *piece_id* is not positive or *query* is blank.
    """
    invalid = piece_id <= 0 or not query.strip()
    if invalid:
        raise HTTPException(400, 'piece_id / query required')
    with state_lock:
        current_status = state.get(piece_id, {}).get('status')
        if current_status == 'completed' and out_path(piece_id).exists():
            return {'status': 'completed'}
        if current_status in ('pending', 'processing'):
            return {'status': current_status}
        # Anything else (no entry, failed, completed-but-file-gone) is queued afresh.
        state[piece_id] = {'status': 'pending', 'error': '', 'query': query}
        job_q.put((piece_id, query))
    return {'status': 'pending'}
@app.get('/status/{piece_id}')
def status(piece_id: int):
    """Report the job state for *piece_id*, cross-checked against the PNG on disk.

    - recorded ``completed`` but the file is gone -> ``failed`` with an error text;
    - no recorded state but the file exists -> ``completed``;
    - otherwise the recorded fields (status defaults to ``'none'``) plus
      ``file_exists``.
    """
    with state_lock:
        entry = state.get(piece_id, {})
    on_disk = out_path(piece_id).exists()
    recorded = entry.get('status')
    if recorded == 'completed' and not on_disk:
        return {'status': 'failed', 'error': 'png 文件丢了'}
    if on_disk and not entry:
        return {'status': 'completed'}
    return {
        'status': entry.get('status', 'none'),
        'error': entry.get('error', ''),
        'query': entry.get('query', ''),
        'file_exists': on_disk,
    }
@app.get('/image/{piece_id}')
def image(piece_id: int):
    """Serve the fetched PNG for *piece_id*, or HTTP 404 when it does not exist."""
    png = out_path(piece_id)
    if png.exists():
        return FileResponse(png, media_type='image/png')
    raise HTTPException(404, 'not found')
@app.delete('/state/{piece_id}')
def reset(piece_id: int):
    """Forget the job state for *piece_id* and delete its PNG.

    Meant to be called by the music backend once it has imported the image,
    so PNGs do not pile up on the PVC. A failed delete is logged, not raised;
    the response is always ``{'ok': True}``.
    """
    with state_lock:
        state.pop(piece_id, None)
    png = out_path(piece_id)
    if not png.exists():
        return {'ok': True}
    try:
        png.unlink()
    except Exception as e:
        logger.warning("[piece=%d] cleanup unlink: %s", piece_id, e)
    return {'ok': True}
+304
View File
@@ -0,0 +1,304 @@
#!/usr/bin/env python3
"""
yopu.co 和弦谱抓取(v2)
跟旧 guitar 版相比,UI 改了:现在是分立的 row:
- "谱面样式" → 选 "功能谱"
- "和弦样式" → 选 "级数名"
- "和弦图" → 默认(不动)
抓取流程:
1. /explore#q=<query> 搜索
2. 找第一个含「和弦谱」字样的结果 → 进 /view/<id>
3. 在 row label = X 的行里,点 button.option 文本 = Y
4. 撑开 div.sheet-container 容器把 overflow / max-height 砍掉,让全部内容渲染
5. 截图整个 container element
6. PIL 裁白边 + padding,存 PNG
"""
import os
import time
import logging
from pathlib import Path
from urllib.parse import quote, urlparse, urljoin
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException
from PIL import Image
logger = logging.getLogger(__name__)
def setup_driver(window="1920,5000"):
    """Build a headless Chrome WebDriver for scraping.

    Args:
        window: Value for Chrome's ``--window-size`` flag, as ``"width,height"``.

    Returns:
        A ``webdriver.Chrome`` instance. The driver and browser binaries are
        taken from the ``CHROMEDRIVER_PATH`` and ``CHROME_BIN`` environment
        variables when they are set.
    """
    opts = Options()
    flags = (
        '--headless=new',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        f'--window-size={window}',
        '--lang=zh-CN',
        'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
        'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
    )
    for flag in flags:
        opts.add_argument(flag)
    opts.add_experimental_option('prefs', {'intl.accept_languages': 'zh-CN,zh,en-US,en'})

    driver_path = os.getenv('CHROMEDRIVER_PATH')
    service = Service(driver_path) if driver_path else None

    chrome_bin = os.getenv('CHROME_BIN')
    if chrome_bin:
        opts.binary_location = chrome_bin

    return webdriver.Chrome(service=service, options=opts)
def find_first_chord_chart(driver, search_url):
    """Load the search page and return the first chord-chart result.

    Args:
        driver: Selenium WebDriver used to open the page and run the
            in-page scraping script.
        search_url: Full search URL to open.

    Returns:
        ``{'url', 'title', 'text'}`` for the first matching result that has a
        link, with ``url`` resolved to an absolute URL against *search_url*;
        ``None`` when nothing matches.
    """
    logger.info("loading search: %s", search_url)
    driver.get(search_url)
    # Fixed pause before scraping; presumably gives the results time to render.
    time.sleep(3)
    # NOTE(review): the second test in the script below, t.indexOf(''), is
    # always true, so only '和弦' is effectively required. A character may have
    # been lost from that literal -- confirm the intended filter.
    chord_links = driver.execute_script("""
var hits = [];
var posts = document.querySelectorAll('a.post-main');
for (var i = 0; i < posts.length; i++) {
var info = posts[i].querySelector('.one-line-info');
var t = info ? (info.textContent || info.innerText || '') : '';
if (t.indexOf('和弦') >= 0 && t.indexOf('') >= 0) {
hits.push({
href: posts[i].href,
title: (posts[i].querySelector('.title-line .title, .title') || {}).textContent || '',
text: t.trim(),
});
}
}
return hits;
""")
    # Skip hits without a usable link: resolving an empty href would just
    # yield the search URL again.
    first = next((hit for hit in (chord_links or []) if hit.get('href')), None)
    if first is None:
        logger.warning("no '和弦谱' hits in search results")
        return None
    # urljoin leaves absolute URLs untouched and resolves root-relative,
    # relative and protocol-relative ('//host/path') hrefs against the base.
    href = urljoin(search_url, first['href'])
    title = first.get('title') or ''
    logger.info("matched: %s -> %s", title, href)
    return {'url': href, 'title': title, 'text': first.get('text') or ''}
def select_option_in_row(driver, row_label, button_text, timeout=10):
    """Click the button containing *button_text* in the row labelled *row_label*.

    The row is the first ``div`` with class ``row`` that has a ``label`` child
    whose text contains *row_label*; the function waits up to *timeout* seconds
    for it. Only the first button whose text contains *button_text* is tried.

    Returns:
        True when the click went through. False when the row or button is
        missing or the click raised; this is logged as a warning and not
        treated as an error, since the UI wording may have changed.
    """
    xpath = (
        f"//div[contains(@class, 'row')][.//div[contains(@class, 'label') "
        f"and contains(normalize-space(.), '{row_label}')]]"
    )
    try:
        row = WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((By.XPATH, xpath))
        )
    except TimeoutException:
        logger.warning("row '%s' not found", row_label)
        return False

    candidates = row.find_elements(By.CSS_SELECTOR, "button.option, button")
    target = next(
        (b for b in candidates if button_text in (b.text or '').strip()),
        None,
    )
    if target is None:
        logger.warning("button '%s' not found in row '%s' (had: %s)",
                       button_text, row_label,
                       [(b.text or '').strip() for b in candidates])
        return False

    try:
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", target)
        time.sleep(0.3)
        target.click()
        logger.info("clicked '%s' in row '%s'", button_text, row_label)
        # Pause after the click before reporting success.
        time.sleep(1.2)
    except Exception as e:
        logger.warning("click failed in row '%s' / '%s': %s", row_label, button_text, e)
        return False
    return True
def expand_sheet_container(driver, container):
    """Remove clipping from the sheet container and its ancestors.

    Runs a script in the page that walks from *container* up to, but not
    including, ``document.body``. Every node whose computed style has
    ``overflow`` / ``overflowY`` equal to ``hidden`` or ``auto``, or a
    ``maxHeight`` other than ``none``, receives inline ``overflow: visible``,
    ``max-height: none`` and ``height: auto``. The container itself is then
    forced to the same values and given a ``min-height`` equal to its
    ``scrollHeight``, so an element screenshot can capture the whole sheet.

    The container's previous inline style is stored in its ``data-orig-style``
    attribute and the touched nodes in ``window.__yopuModified``; nothing in
    this function restores them.

    Args:
        driver: Selenium WebDriver that executes the script.
        container: The sheet-container element, passed as ``arguments[0]``.

    Returns:
        The script's result: ``scrollHeight`` of the container after the
        change and ``modified``, the number of nodes that were altered.
    """
    return driver.execute_script("""
var c = arguments[0];
var origStyle = c.getAttribute('style') || '';
var modified = [];
var node = c;
while (node && node !== document.body) {
var cs = window.getComputedStyle(node);
if (cs.overflow === 'hidden' || cs.overflow === 'auto'
|| cs.overflowY === 'hidden' || cs.overflowY === 'auto'
|| cs.maxHeight !== 'none') {
modified.push({ el: node, orig: node.getAttribute('style') || '' });
node.style.overflow = 'visible';
node.style.overflowY = 'visible';
node.style.maxHeight = 'none';
node.style.height = 'auto';
}
node = node.parentElement;
}
c.style.overflow = 'visible';
c.style.maxHeight = 'none';
c.style.height = 'auto';
c.style.minHeight = c.scrollHeight + 'px';
c.offsetHeight; // force reflow
c.setAttribute('data-orig-style', origStyle);
window.__yopuModified = modified;
return { scrollHeight: c.scrollHeight, modified: modified.length };
""", container)
def crop_white(path, pad_top=20, pad_bottom=50, pad_left=20, pad_right=20, white_th=250):
    """Crop the near-white margins off the image at *path*, in place.

    A pixel is white when R, G and B are all above *white_th*. A row or column
    is margin when at least 99% of its pixels are white; columns are measured
    only over the rows that were kept. The result is saved back to *path* as
    an RGB PNG, with the requested padding re-added (clamped to the image).

    Args:
        path: Image file to read and overwrite.
        pad_top, pad_bottom, pad_left, pad_right: Pixels of margin to keep
            around the detected content.
        white_th: Per-channel threshold above which a pixel counts as white.

    Returns:
        None. When every row is margin (a blank image) the file is left
        untouched.
    """
    # convert() returns a separate in-memory image, so the source file can be
    # closed before it is overwritten below.
    with Image.open(path) as src:
        img = src.convert('RGB')
    w, h = img.size
    px = img.load()

    def is_white(x, y):
        r, g, b = px[x, y]
        return r > white_th and g > white_th and b > white_th

    def row_has_content(y):
        return sum(is_white(x, y) for x in range(w)) / w < 0.99

    # A None sentinel makes the blank-image case detectable: defaulting top to
    # 0 and bottom to h cannot be told apart from "content spans the image".
    top = next((y for y in range(h) if row_has_content(y)), None)
    if top is None:
        return  # all white, give up
    # Row `top` qualifies, so the reverse scan always finds a row.
    bottom = next(y for y in range(h - 1, -1, -1) if row_has_content(y)) + 1
    span = bottom - top

    def col_has_content(x):
        return sum(is_white(x, y) for y in range(top, bottom)) / span < 0.99

    # Sparse content can leave every column above the 99% threshold; in that
    # case the full width is kept.
    left = next((x for x in range(w) if col_has_content(x)), 0)
    right = next((x + 1 for x in range(w - 1, -1, -1) if col_has_content(x)), w)

    box = (
        max(0, left - pad_left),
        max(0, top - pad_top),
        min(w, right + pad_right),
        min(h, bottom + pad_bottom),
    )
    img.crop(box).save(path, 'PNG')
    logger.info("cropped to %s", box)
def fetch_chord_chart(query: str, output_path: str, *,
                      sheet_style: str = '功能谱',
                      chord_style: str = '级数名',
                      verbose: bool = False) -> tuple[bool, str]:
    """Search yopu.co for *query* and save a screenshot of the first chord chart.

    Flow: open the search page, follow the first chord-chart result, pick the
    requested sheet and chord styles, expand the sheet container, enlarge the
    notation, screenshot the container element and crop its white margins.

    Args:
        query: Search text, URL-quoted into the search URL.
        output_path: Where the PNG is written. Parent directories are created,
            and an existing file at this path is replaced.
        sheet_style: Button text to select in the '谱面样式' row.
        chord_style: Button text to select in the '和弦样式' row.
        verbose: Request DEBUG instead of INFO when configuring logging.

    Returns:
        ``(True, output_path)`` on success, ``(False, reason)`` otherwise.
        No exception escapes: failures are logged and reported in the tuple.
        The browser is always shut down before returning.
    """
    # basicConfig does nothing when the root logger already has handlers, so
    # this only takes effect if the host process has not configured logging.
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format='%(asctime)s %(levelname)s %(message)s',
    )
    search_url = f"https://yopu.co/explore#q={quote(query)}"
    driver = None
    try:
        driver = setup_driver()
        result = find_first_chord_chart(driver, search_url)
        if not result:
            return False, '未找到和弦谱'
        view_url = result['url']
        logger.info("loading view: %s", view_url)
        driver.get(view_url)
        time.sleep(3)

        # Pick the styles (hard-coded MVP combination). Both calls are
        # best-effort: a missing row or button does not abort the fetch.
        select_option_in_row(driver, '谱面样式', sheet_style)
        select_option_in_row(driver, '和弦样式', chord_style)
        # Wait for the content to refresh.
        time.sleep(1.5)

        wait = WebDriverWait(driver, 15)
        sheet = wait.until(EC.presence_of_element_located(
            (By.CSS_SELECTOR, "div.sheet-container")
        ))
        driver.execute_script("arguments[0].scrollIntoView(true);", sheet)
        time.sleep(0.5)
        dims = expand_sheet_container(driver, sheet)
        logger.debug("expanded scrollHeight=%s, modified=%s ancestors",
                     dims['scrollHeight'], dims['modified'])
        time.sleep(1.5)

        # incrButton: enlarge the font / chord size; clicked 3 times, as the
        # old version did. Best-effort: a failure is only logged.
        try:
            buttons = driver.find_elements(By.CSS_SELECTOR, "button.incrButton")
            if buttons:
                driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", buttons[0])
                time.sleep(0.3)
                for _ in range(3):
                    buttons[0].click()
                    time.sleep(0.4)
        except Exception as e:
            logger.warning("incrButton failed: %s", e)
        time.sleep(1.0)

        # Scroll the sheet back to its top, then capture the whole container.
        driver.execute_script("arguments[0].scrollTop = 0;", sheet)
        time.sleep(0.4)
        out = Path(output_path)
        out.parent.mkdir(parents=True, exist_ok=True)
        # Remove any image left by an earlier run so that a failed capture
        # cannot pass the exists/size check below with stale data.
        out.unlink(missing_ok=True)
        # WebElement.screenshot() returns False when the file could not be written.
        captured = sheet.screenshot(str(out))
        if not captured or not out.exists() or out.stat().st_size < 100:
            return False, '截图为空'
        logger.info("screenshot: %s (%d bytes)", out, out.stat().st_size)

        # Cropping is cosmetic; keep the uncropped screenshot if it fails.
        try:
            crop_white(str(out))
        except Exception as e:
            logger.warning("crop failed: %s", e)
        return True, str(out)
    except Exception as e:
        logger.error("fetch failed: %s", e, exc_info=True)
        return False, str(e)
    finally:
        if driver:
            try:
                driver.quit()
            except Exception as e:
                # Never let shutdown trouble mask the result, but leave a
                # trace: a browser process may have been left behind.
                logger.warning("driver.quit failed: %s", e)