music(chord): 拆两个 tab + 抓两种 (letters/functional)
deploy music / build-and-deploy (push) Successful in 1m54s
deploy music / build-and-deploy (push) Successful in 1m54s
- yopu 切 /song?title=&artist= 搜索(避免歌手词被搜糊)
- 抓的版本按搜索结果 nier-snippet svg <text> 数区分:
>0 = 字母谱 (G/Em7/C 弹唱谱);==0 = 功能谱 (1/4/5/6m 数字级数)
- sidecar fetch/status/state/image 都走 (id, mode) 维度,文件落 /data/chord-fetch/{id}-{mode}.png
- backend chord_fetch / chord_status 接 ?mode=letters|functional,import 时 role 分别为 chord_letters / chord_functional
- 前端 chord tab 拆「吉他谱」+「功能谱」,state/error/poll 各自独立;旧 role='chord' 显示在「吉他谱」兼容历史 import
- verified 标记探测:匿名访问 yopu HTML 里 0 hits(要登录可见),暂时只能按 svg_text 区分
This commit is contained in:
@@ -36,36 +36,38 @@ OUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
VALID_MODES = ('letters', 'functional')
|
||||
|
||||
# in-memory job state. piece_id -> {status, error, query}
|
||||
state: dict[int, dict] = {}
|
||||
# in-memory job state. (piece_id, mode) -> {status, error, title, artist}
|
||||
state: dict[tuple[int, str], dict] = {}
|
||||
state_lock = threading.Lock()
|
||||
job_q: queue.Queue = queue.Queue()
|
||||
|
||||
|
||||
def out_path(piece_id: int) -> Path:
|
||||
return OUT_DIR / f"{piece_id}.png"
|
||||
def out_path(piece_id: int, mode: str) -> Path:
|
||||
return OUT_DIR / f"{piece_id}-{mode}.png"
|
||||
|
||||
|
||||
def worker():
|
||||
while True:
|
||||
piece_id, query = job_q.get()
|
||||
piece_id, mode, title, artist = job_q.get()
|
||||
key = (piece_id, mode)
|
||||
with state_lock:
|
||||
state[piece_id] = {'status': 'processing', 'error': '', 'query': query}
|
||||
logger.info("[piece=%d] start fetch query=%r", piece_id, query)
|
||||
state[key] = {'status': 'processing', 'error': '', 'title': title, 'artist': artist}
|
||||
logger.info("[piece=%d mode=%s] start: title=%r artist=%r", piece_id, mode, title, artist)
|
||||
try:
|
||||
ok, msg = yopu.fetch_chord_chart(query, str(out_path(piece_id)))
|
||||
ok, msg = yopu.fetch_chord_chart(title, artist, str(out_path(piece_id, mode)), mode=mode)
|
||||
with state_lock:
|
||||
if ok:
|
||||
state[piece_id] = {'status': 'completed', 'error': '', 'query': query}
|
||||
logger.info("[piece=%d] completed: %s", piece_id, msg)
|
||||
state[key] = {'status': 'completed', 'error': '', 'title': title, 'artist': artist}
|
||||
logger.info("[piece=%d mode=%s] completed: %s", piece_id, mode, msg)
|
||||
else:
|
||||
state[piece_id] = {'status': 'failed', 'error': msg, 'query': query}
|
||||
logger.warning("[piece=%d] failed: %s", piece_id, msg)
|
||||
state[key] = {'status': 'failed', 'error': msg, 'title': title, 'artist': artist}
|
||||
logger.warning("[piece=%d mode=%s] failed: %s", piece_id, mode, msg)
|
||||
except Exception as e:
|
||||
logger.exception("[piece=%d] worker crash", piece_id)
|
||||
logger.exception("[piece=%d mode=%s] worker crash", piece_id, mode)
|
||||
with state_lock:
|
||||
state[piece_id] = {'status': 'failed', 'error': str(e), 'query': query}
|
||||
state[key] = {'status': 'failed', 'error': str(e), 'title': title, 'artist': artist}
|
||||
finally:
|
||||
job_q.task_done()
|
||||
|
||||
@@ -79,58 +81,69 @@ def healthz():
|
||||
|
||||
|
||||
@app.post('/fetch')
|
||||
def fetch(piece_id: int, query: str):
|
||||
"""加入 fetch 队列。query 一般是 '<artist> <title>'。
|
||||
def fetch(piece_id: int, title: str, artist: str = '', mode: str = 'functional'):
|
||||
"""加入 fetch 队列。
|
||||
mode='letters' = 弹唱谱字母版;mode='functional' = 数字级数版。
|
||||
幂等:已 completed 且文件还在,直接返回 completed。"""
|
||||
if piece_id <= 0 or not query.strip():
|
||||
raise HTTPException(400, 'piece_id / query required')
|
||||
if piece_id <= 0 or not title.strip():
|
||||
raise HTTPException(400, 'piece_id / title required')
|
||||
if mode not in VALID_MODES:
|
||||
raise HTTPException(400, f'mode must be one of {VALID_MODES}')
|
||||
|
||||
key = (piece_id, mode)
|
||||
with state_lock:
|
||||
cur = state.get(piece_id, {})
|
||||
if cur.get('status') == 'completed' and out_path(piece_id).exists():
|
||||
cur = state.get(key, {})
|
||||
if cur.get('status') == 'completed' and out_path(piece_id, mode).exists():
|
||||
return {'status': 'completed'}
|
||||
if cur.get('status') in ('pending', 'processing'):
|
||||
return {'status': cur['status']}
|
||||
state[piece_id] = {'status': 'pending', 'error': '', 'query': query}
|
||||
state[key] = {'status': 'pending', 'error': '', 'title': title, 'artist': artist}
|
||||
|
||||
job_q.put((piece_id, query))
|
||||
job_q.put((piece_id, mode, title.strip(), artist.strip()))
|
||||
return {'status': 'pending'}
|
||||
|
||||
|
||||
@app.get('/status/{piece_id}')
|
||||
def status(piece_id: int):
|
||||
@app.get('/status/{piece_id}/{mode}')
|
||||
def status(piece_id: int, mode: str):
|
||||
if mode not in VALID_MODES:
|
||||
raise HTTPException(400, f'mode must be one of {VALID_MODES}')
|
||||
key = (piece_id, mode)
|
||||
with state_lock:
|
||||
cur = state.get(piece_id, {})
|
||||
file_exists = out_path(piece_id).exists()
|
||||
cur = state.get(key, {})
|
||||
file_exists = out_path(piece_id, mode).exists()
|
||||
if cur.get('status') == 'completed' and not file_exists:
|
||||
return {'status': 'failed', 'error': 'png 文件丢了'}
|
||||
return {'status': 'failed', 'error': 'png 文件丢了', 'mode': mode}
|
||||
if not cur and file_exists:
|
||||
return {'status': 'completed'}
|
||||
return {'status': 'completed', 'mode': mode, 'file_exists': True}
|
||||
return {
|
||||
'status': cur.get('status', 'none'),
|
||||
'error': cur.get('error', ''),
|
||||
'query': cur.get('query', ''),
|
||||
'mode': mode,
|
||||
'file_exists': file_exists,
|
||||
}
|
||||
|
||||
|
||||
@app.get('/image/{piece_id}')
|
||||
def image(piece_id: int):
|
||||
p = out_path(piece_id)
|
||||
@app.get('/image/{piece_id}/{mode}')
|
||||
def image(piece_id: int, mode: str):
|
||||
if mode not in VALID_MODES:
|
||||
raise HTTPException(400, f'mode must be one of {VALID_MODES}')
|
||||
p = out_path(piece_id, mode)
|
||||
if not p.exists():
|
||||
raise HTTPException(404, 'not found')
|
||||
return FileResponse(p, media_type='image/png')
|
||||
|
||||
|
||||
@app.delete('/state/{piece_id}')
|
||||
def reset(piece_id: int):
|
||||
@app.delete('/state/{piece_id}/{mode}')
|
||||
def reset(piece_id: int, mode: str):
|
||||
"""music backend import 完后清状态 + 删 png(防 PVC 越积越多)。"""
|
||||
if mode not in VALID_MODES:
|
||||
raise HTTPException(400, f'mode must be one of {VALID_MODES}')
|
||||
with state_lock:
|
||||
state.pop(piece_id, None)
|
||||
p = out_path(piece_id)
|
||||
state.pop((piece_id, mode), None)
|
||||
p = out_path(piece_id, mode)
|
||||
if p.exists():
|
||||
try:
|
||||
p.unlink()
|
||||
except Exception as e:
|
||||
logger.warning("[piece=%d] cleanup unlink: %s", piece_id, e)
|
||||
logger.warning("[piece=%d mode=%s] cleanup unlink: %s", piece_id, mode, e)
|
||||
return {'ok': True}
|
||||
|
||||
+75
-27
@@ -54,16 +54,22 @@ def setup_driver(window="1920,5000"):
|
||||
return webdriver.Chrome(service=service, options=o)
|
||||
|
||||
|
||||
def find_first_chord_chart(driver, search_url):
|
||||
"""在搜索页找最佳的「功能谱」结果。
|
||||
def find_chart(driver, title: str, artist: str, prefer: str = 'functional'):
|
||||
"""在 /song?title=&artist= 找最佳候选 view。
|
||||
|
||||
yopu 现在搜索结果里同一首歌有多个版本:
|
||||
- 字母谱(chord chart):nier-snippet 里有 SVG <text> 渲染的 chord 字母(G/Em7/C)
|
||||
- 功能谱(数字 / 级数):nier-snippet 里没 SVG <text>(用 HTML/CSS 显示数字 1/4/5)
|
||||
yopu 同一首歌一般有多个版本,按搜索结果里 nier-snippet 内的
|
||||
SVG <text> 数量区分:
|
||||
- svg_text > 0 → chord 字母版(G/Em7/C),民间叫弹唱谱
|
||||
- svg_text == 0 → 功能谱 / 数字级数版
|
||||
|
||||
我们优先取第一个**功能谱**(svgTextCount === 0),fallback 到第一个字母谱。
|
||||
`prefer` ∈ {'letters', 'functional'},按需求挑第一个匹配的。
|
||||
实在没匹配就 fallback 到第一个非空候选。
|
||||
"""
|
||||
logger.info("loading search: %s", search_url)
|
||||
from urllib.parse import urlencode
|
||||
base = 'https://yopu.co/song'
|
||||
# /song 用 hash 传参(跟 yopu 前端约定一致)
|
||||
search_url = f"{base}#title={quote(title)}&artist={quote(artist)}"
|
||||
logger.info("loading /song: %s", search_url)
|
||||
driver.get(search_url)
|
||||
time.sleep(3)
|
||||
|
||||
@@ -77,40 +83,82 @@ def find_first_chord_chart(driver, search_url):
|
||||
var info = p.querySelector('.one-line-info');
|
||||
var snippet = p.querySelector('.nier-snippet');
|
||||
var svgTextCount = snippet ? snippet.querySelectorAll('svg text').length : 0;
|
||||
// 任何子元素 class 含 'verified' 都算(svelte 加了 hash class)
|
||||
var isVerified = p.querySelectorAll('[class*="verified"]').length > 0;
|
||||
out.push({
|
||||
href: p.href,
|
||||
title: titleEl ? (titleEl.textContent || '').trim() : '',
|
||||
subtitle: subEl ? (subEl.textContent || '').trim() : '',
|
||||
info: info ? (info.textContent || '').trim() : '',
|
||||
isFunctional: svgTextCount === 0,
|
||||
svgTextCount: svgTextCount,
|
||||
isLetters: svgTextCount > 0,
|
||||
isFunctional: svgTextCount === 0,
|
||||
isVerified: isVerified,
|
||||
});
|
||||
}
|
||||
return out;
|
||||
""")
|
||||
|
||||
if not hits:
|
||||
logger.warning("no a.post-main found — yopu DOM changed?")
|
||||
logger.warning("no a.post-main found at /song — fallback to /explore")
|
||||
# fallback: yopu /song 偶尔没结果,回退到 /explore
|
||||
from urllib.parse import quote as _q
|
||||
q = (artist + ' ' + title).strip()
|
||||
driver.get(f"https://yopu.co/explore#q={_q(q)}")
|
||||
time.sleep(3)
|
||||
hits = driver.execute_script("""
|
||||
var out = [];
|
||||
var posts = document.querySelectorAll('a.post-main');
|
||||
for (var i = 0; i < posts.length; i++) {
|
||||
var p = posts[i];
|
||||
var titleEl = p.querySelector('.title-line .title, .title');
|
||||
var subEl = p.querySelector('.title-line .subtitle, .subtitle');
|
||||
var info = p.querySelector('.one-line-info');
|
||||
var snippet = p.querySelector('.nier-snippet');
|
||||
var svgTextCount = snippet ? snippet.querySelectorAll('svg text').length : 0;
|
||||
out.push({
|
||||
href: p.href,
|
||||
title: titleEl ? (titleEl.textContent || '').trim() : '',
|
||||
subtitle: subEl ? (subEl.textContent || '').trim() : '',
|
||||
info: info ? (info.textContent || '').trim() : '',
|
||||
svgTextCount: svgTextCount,
|
||||
isLetters: svgTextCount > 0,
|
||||
isFunctional: svgTextCount === 0,
|
||||
isVerified: false,
|
||||
});
|
||||
}
|
||||
return out;
|
||||
""")
|
||||
if not hits:
|
||||
return None
|
||||
|
||||
# 优先功能谱
|
||||
functional = [h for h in hits if h['isFunctional']]
|
||||
if functional:
|
||||
chosen = functional[0]
|
||||
kind = 'functional'
|
||||
else:
|
||||
chosen = hits[0]
|
||||
kind = 'letter-chord (no functional version found)'
|
||||
# 优先匹配 prefer;同时优先 verified(虽然匿名访问大概率全是 false)
|
||||
def _key(h):
|
||||
match_pref = (prefer == 'letters' and h['isLetters']) or \
|
||||
(prefer == 'functional' and h['isFunctional'])
|
||||
# 数值越小越优先:first match_pref+verified, then match_pref, then verified, then all
|
||||
return (0 if (match_pref and h['isVerified']) else
|
||||
1 if match_pref else
|
||||
2 if h['isVerified'] else 3)
|
||||
|
||||
sorted_hits = sorted(hits, key=_key)
|
||||
chosen = sorted_hits[0]
|
||||
matched = (prefer == 'letters' and chosen['isLetters']) or \
|
||||
(prefer == 'functional' and chosen['isFunctional'])
|
||||
kind = prefer if matched else f"{prefer}-fallback"
|
||||
|
||||
href = chosen['href']
|
||||
if href.startswith('/'):
|
||||
p = urlparse(search_url)
|
||||
p = urlparse(driver.current_url)
|
||||
href = f"{p.scheme}://{p.netloc}{href}"
|
||||
elif not href.startswith('http'):
|
||||
href = urljoin(search_url, href)
|
||||
logger.info("[%s] %s — %s [%s] (%d total: %d functional, %d letter)",
|
||||
href = urljoin(driver.current_url, href)
|
||||
logger.info("[%s] %s — %s [%s] verified=%s (total %d, letters=%d, functional=%d, verified=%d)",
|
||||
kind, chosen['title'], chosen['subtitle'], chosen['info'],
|
||||
len(hits), len(functional), len(hits) - len(functional))
|
||||
chosen['isVerified'], len(hits),
|
||||
sum(1 for h in hits if h['isLetters']),
|
||||
sum(1 for h in hits if h['isFunctional']),
|
||||
sum(1 for h in hits if h['isVerified']))
|
||||
return {
|
||||
'url': href,
|
||||
'title': chosen.get('title') or '',
|
||||
@@ -261,24 +309,24 @@ def _save_debug(driver, tag: str):
|
||||
logger.warning("debug snapshot failed: %s", e)
|
||||
|
||||
|
||||
def fetch_chord_chart(query: str, output_path: str, *,
|
||||
def fetch_chord_chart(title: str, artist: str, output_path: str, *,
|
||||
mode: str = 'functional',
|
||||
sheet_style: str = '功能谱',
|
||||
chord_style: str = '级数名',
|
||||
verbose: bool = False) -> tuple[bool, str]:
|
||||
"""
|
||||
搜 yopu.co、进 view 页、按 row 选样式、截图。
|
||||
返回 (ok, msg)。msg 在失败时是错误说明。
|
||||
"""搜 yopu /song、按 mode 挑候选 view、截图。
|
||||
mode='functional' → 数字级数版;mode='letters' → 字母版(弹唱谱)。
|
||||
返回 (ok, msg)。
|
||||
"""
|
||||
if verbose:
|
||||
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s')
|
||||
else:
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
|
||||
|
||||
search_url = f"https://yopu.co/explore#q={quote(query)}"
|
||||
driver = None
|
||||
try:
|
||||
driver = setup_driver()
|
||||
result = find_first_chord_chart(driver, search_url)
|
||||
result = find_chart(driver, title, artist, prefer=mode)
|
||||
if not result:
|
||||
_save_debug(driver, 'no-search-hit')
|
||||
return False, '未找到和弦谱'
|
||||
|
||||
Reference in New Issue
Block a user