Files
Fam Zheng 1a8f297302
deploy cube / build-and-deploy (push) Successful in 1m10s
deploy music / build-and-deploy (push) Successful in 1m47s
deploy simpleasm / build-and-deploy (push) Successful in 1m20s
music: 新建 music app,替换 piano-sheet
听歌 + 练琴曲目管理:
- 数据:piece (title/artist/category/lyrics/play_count/notes) + attachments (audio/video/pdf/image; image 带 role=chord/numbered/staff)
- 后端 axum + sqlite,附件流式落 PVC,ServeFile 支持 Range(视频拖动)
- 前端 guitar 风格 player:左 sidebar + tabs(歌词/吉他谱/简谱/五线谱/PDF/视频),LRC 同步、快捷键、笔记自动保存
- ns cube-music + music.famzheng.me + bodylimit 5GiB
- scripts/import_guitar.py 用于把 oci /data/guitar/ 旧曲库导入
2026-05-09 22:36:14 +01:00

223 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
从 /data/guitar/ 把旧 guitar app 的曲库导入到 music API。
跑法(在 oci 上,源数据原地):
python3 import_guitar.py --src /data/guitar --api https://music.famzheng.me
每个 mp3 = 一个 piece
- title / artist 从文件名 "{title} - {artist}.mp3"
- lyrics 来自同名 .lrcUTF-8
- chord png 来自 chords/{sanitize(artist)}-{sanitize(title)}.png(如有)
- play_count 来自 playcounts.json[mp3 文件名]
- category 设为 "流行"(旧 guitar 里都是流行/吉他歌)
幂等:以 (title, artist) 作主键去重,已有 piece 跳过附件重传。
"""
import argparse
import json
import re
import sys
import urllib.request
import urllib.parse
import urllib.error
import os
from pathlib import Path
CATEGORY = "流行"
def sanitize(text: str) -> str:
"""匹配旧 chord_server.py 的 sanitize:替换非法文件名字符为 -"""
return re.sub(r'[<>:"/\\|?*]', '-', text).strip()
def parse_filename(stem: str) -> tuple[str, str]:
"""'{title} - {artist}' → (title, artist)"""
parts = stem.split(' - ')
title = parts[0].strip()
artist = ' - '.join(parts[1:]).strip() if len(parts) > 1 else ''
return title, artist
def http_request(method: str, url: str, *, json_body=None, file_field=None,
timeout: int = 600) -> dict:
"""简单 HTTP 客户端,return JSON dict。失败抛异常。"""
headers = {}
data = None
if json_body is not None:
data = json.dumps(json_body).encode('utf-8')
headers['Content-Type'] = 'application/json'
elif file_field is not None:
boundary = '----music-import-' + os.urandom(8).hex()
body = []
for field_name, filename, content, mime in file_field:
body.append(f'--{boundary}\r\n'.encode())
disp = (f'Content-Disposition: form-data; name="{field_name}"; '
f'filename="{filename}"\r\n').encode()
body.append(disp)
body.append(f'Content-Type: {mime}\r\n\r\n'.encode())
body.append(content)
body.append(b'\r\n')
body.append(f'--{boundary}--\r\n'.encode())
data = b''.join(body)
headers['Content-Type'] = f'multipart/form-data; boundary={boundary}'
req = urllib.request.Request(url, data=data, method=method, headers=headers)
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode('utf-8'))
def list_pieces(api: str) -> list:
return http_request('GET', f'{api}/api/pieces')
def create_piece(api: str, title: str, artist: str, lyrics: str | None) -> int:
body = {
'title': title,
'artist': artist or None,
'category': CATEGORY,
'lyrics': lyrics or None,
}
r = http_request('POST', f'{api}/api/pieces', json_body=body)
return int(r['id'])
def patch_play_count(api: str, piece_id: int, count: int):
http_request('PATCH', f'{api}/api/pieces/{piece_id}',
json_body={'play_count': count})
def upload_attachment(api: str, piece_id: int, path: Path,
mime: str, role: str | None = None) -> dict:
url = f'{api}/api/pieces/{piece_id}/attachments'
if role:
url += '?role=' + urllib.parse.quote(role)
bytes_content = path.read_bytes()
return http_request('POST', url, file_field=[
('files', path.name, bytes_content, mime),
])
def detect_audio_mime(path: Path) -> str:
ext = path.suffix.lower()
return {
'.mp3': 'audio/mpeg',
'.m4a': 'audio/mp4',
'.flac': 'audio/flac',
'.ogg': 'audio/ogg',
'.wav': 'audio/wav',
}.get(ext, 'application/octet-stream')
def main():
ap = argparse.ArgumentParser()
ap.add_argument('--src', default='/data/guitar', help='guitar 数据根目录')
ap.add_argument('--api', required=True, help='music API basee.g. https://music.famzheng.me')
ap.add_argument('--dry-run', action='store_true', help='只打印不实际调用')
ap.add_argument('--limit', type=int, default=0, help='测试用,最多导入 N 首(0=全部)')
args = ap.parse_args()
src = Path(args.src)
if not src.exists():
print(f'[!] {src} 不存在', file=sys.stderr)
sys.exit(1)
chords_dir = src / 'chords'
playcounts_file = src / 'playcounts.json'
playcounts = {}
if playcounts_file.exists():
try:
playcounts = json.loads(playcounts_file.read_text())
except Exception as e:
print(f'[!] 读 playcounts.json 失败: {e}', file=sys.stderr)
# 已有 pieces 去重
existing = {}
if not args.dry_run:
try:
for p in list_pieces(args.api):
key = (p['title'], p.get('artist') or '')
existing[key] = p['id']
print(f'[i] 远端已有 {len(existing)} 首曲目,重复的会跳过创建')
except urllib.error.URLError as e:
print(f'[!] 连不上 {args.api}: {e}', file=sys.stderr)
sys.exit(1)
mp3s = sorted(src.glob('*.mp3'))
if args.limit:
mp3s = mp3s[:args.limit]
total = len(mp3s)
ok = skipped = failed = 0
for i, mp3 in enumerate(mp3s, 1):
stem = mp3.stem
title, artist = parse_filename(stem)
if not title:
print(f'[!] [{i}/{total}] 无法解析文件名: {mp3.name}', file=sys.stderr)
failed += 1
continue
key = (title, artist)
prefix = f'[{i}/{total}] {title} - {artist}'
if key in existing:
print(f'{prefix} (已存在 id={existing[key]}, 跳过)')
skipped += 1
continue
# lrc
lrc_path = mp3.with_suffix('.lrc')
lyrics = None
if lrc_path.exists():
try:
lyrics = lrc_path.read_text(encoding='utf-8')
except UnicodeDecodeError:
lyrics = lrc_path.read_text(encoding='gbk', errors='replace')
# chord png
chord_path = chords_dir / f'{sanitize(artist)}-{sanitize(title)}.png'
if not chord_path.exists():
chord_path = None
play_count = int(playcounts.get(mp3.name, 0))
if args.dry_run:
print(f' + {prefix}'
f'{" [词]" if lyrics else ""}'
f'{" [谱]" if chord_path else ""}'
f'{f" [{play_count}次]" if play_count else ""}')
ok += 1
continue
try:
piece_id = create_piece(args.api, title, artist, lyrics)
print(f' + {prefix} → id={piece_id}', end='', flush=True)
upload_attachment(args.api, piece_id, mp3,
detect_audio_mime(mp3))
print(' [audio]', end='', flush=True)
if chord_path:
upload_attachment(args.api, piece_id, chord_path,
'image/png', role='chord')
print(' [chord]', end='', flush=True)
if play_count:
patch_play_count(args.api, piece_id, play_count)
print(f' [{play_count}次]', end='', flush=True)
print()
ok += 1
except Exception as e:
print(f'\n[!] {prefix} 失败: {e}', file=sys.stderr)
failed += 1
print()
print(f'完成: ok={ok} skipped={skipped} failed={failed} total={total}')
if __name__ == '__main__':
main()