#!/usr/bin/env python3
"""Import the song library of the old guitar app from /data/guitar/ into the music API.

Usage (run on oci, with the source data left in place):
    python3 import_guitar.py --src /data/guitar --api https://music.famzheng.me

Each mp3 becomes one piece:
- title / artist are split from the file name "{title} - {artist}.mp3"
- lyrics come from the same-named .lrc file (UTF-8)
- the chord png comes from chords/{sanitize(artist)}-{sanitize(title)}.png (if present)
- play_count comes from playcounts.json[mp3 file name]
- category is set to "流行" (everything in the old guitar app is pop / guitar songs)

Idempotent: (title, artist) is used as the dedup key; pieces that already exist
are skipped, including their attachment uploads.
"""

import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path

CATEGORY = "流行"

# Characters replaced by sanitize(); compiled once because sanitize() is
# called twice per imported song.
_ILLEGAL_FILENAME_CHARS = re.compile(r'[<>:"/\\|?*]')


def sanitize(text: str) -> str:
    """Replace characters that are illegal in file names with '-' and strip.

    Per the original author's note this mirrors the ``sanitize`` of the old
    chord_server.py, so the character set must not be changed: it decides
    which chord png file name is looked up.
    """
    return _ILLEGAL_FILENAME_CHARS.sub('-', text).strip()


def parse_filename(stem: str) -> tuple[str, str]:
    """Split a ``'{title} - {artist}'`` file stem into ``(title, artist)``.

    Only the first ``' - '`` separates title from artist; any further
    occurrences stay inside the artist. A stem without the separator yields
    an empty artist.
    """
    title, _, artist = stem.partition(' - ')
    return title.strip(), artist.strip()


def _escape_quoted(value: str) -> str:
    """Escape a field name / file name for a quoted Content-Disposition value."""
    # Same three escapes browsers apply for multipart/form-data; without them
    # a '"' or a line break in a file name would corrupt the part header.
    return (value.replace('"', '%22')
                 .replace('\r', '%0D')
                 .replace('\n', '%0A'))


def _encode_multipart(file_field, boundary: str) -> bytes:
    """Serialize ``(field_name, filename, content, mime)`` tuples as multipart/form-data."""
    chunks = []
    for field_name, filename, content, mime in file_field:
        chunks.append(f'--{boundary}\r\n'.encode())
        chunks.append(
            (f'Content-Disposition: form-data; name="{_escape_quoted(field_name)}"; '
             f'filename="{_escape_quoted(filename)}"\r\n').encode()
        )
        chunks.append(f'Content-Type: {mime}\r\n\r\n'.encode())
        chunks.append(content)
        chunks.append(b'\r\n')
    chunks.append(f'--{boundary}--\r\n'.encode())
    return b''.join(chunks)


def http_request(method: str, url: str, *, json_body=None, file_field=None,
                 timeout: int = 600) -> dict:
    """Minimal HTTP client that sends one request and returns the decoded JSON reply.

    Args:
        method: HTTP method, e.g. ``'GET'`` or ``'POST'``.
        url: Absolute request URL.
        json_body: If not None, sent as an ``application/json`` body. Takes
            precedence over ``file_field``.
        file_field: If not None (and ``json_body`` is None), an iterable of
            ``(field_name, filename, content_bytes, mime)`` tuples sent as
            ``multipart/form-data``.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The parsed JSON response (annotated as ``dict``, but it is whatever
        JSON value the server sent). An empty dict for a ``204 No Content``
        reply, which has no body to parse.

    Raises:
        urllib.error.URLError: Propagated from ``urlopen`` on failure.
        ValueError: If the response body is not valid UTF-8 JSON.
    """
    headers = {}
    data = None
    if json_body is not None:
        data = json.dumps(json_body).encode('utf-8')
        headers['Content-Type'] = 'application/json'
    elif file_field is not None:
        # Random boundary so it cannot realistically collide with file content.
        boundary = '----music-import-' + os.urandom(8).hex()
        data = _encode_multipart(file_field, boundary)
        headers['Content-Type'] = f'multipart/form-data; boundary={boundary}'

    req = urllib.request.Request(url, data=data, method=method, headers=headers)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        status = getattr(resp, 'status', None)
        payload = resp.read()

    if status == 204:
        # The request succeeded; do not turn the empty body into a JSON error.
        return {}
    return json.loads(payload.decode('utf-8'))
def list_pieces(api: str) -> list: return http_request('GET', f'{api}/api/pieces') def create_piece(api: str, title: str, artist: str, lyrics: str | None) -> int: body = { 'title': title, 'artist': artist or None, 'category': CATEGORY, 'lyrics': lyrics or None, } r = http_request('POST', f'{api}/api/pieces', json_body=body) return int(r['id']) def patch_play_count(api: str, piece_id: int, count: int): http_request('PATCH', f'{api}/api/pieces/{piece_id}', json_body={'play_count': count}) def upload_attachment(api: str, piece_id: int, path: Path, mime: str, role: str | None = None) -> dict: url = f'{api}/api/pieces/{piece_id}/attachments' if role: url += '?role=' + urllib.parse.quote(role) bytes_content = path.read_bytes() return http_request('POST', url, file_field=[ ('files', path.name, bytes_content, mime), ]) def detect_audio_mime(path: Path) -> str: ext = path.suffix.lower() return { '.mp3': 'audio/mpeg', '.m4a': 'audio/mp4', '.flac': 'audio/flac', '.ogg': 'audio/ogg', '.wav': 'audio/wav', }.get(ext, 'application/octet-stream') def main(): ap = argparse.ArgumentParser() ap.add_argument('--src', default='/data/guitar', help='guitar 数据根目录') ap.add_argument('--api', required=True, help='music API base,e.g. https://music.famzheng.me') ap.add_argument('--dry-run', action='store_true', help='只打印不实际调用') ap.add_argument('--limit', type=int, default=0, help='测试用,最多导入 N 首(0=全部)') args = ap.parse_args() src = Path(args.src) if not src.exists(): print(f'[!] {src} 不存在', file=sys.stderr) sys.exit(1) chords_dir = src / 'chords' playcounts_file = src / 'playcounts.json' playcounts = {} if playcounts_file.exists(): try: playcounts = json.loads(playcounts_file.read_text()) except Exception as e: print(f'[!] 
读 playcounts.json 失败: {e}', file=sys.stderr) # 已有 pieces 去重 existing = {} if not args.dry_run: try: for p in list_pieces(args.api): key = (p['title'], p.get('artist') or '') existing[key] = p['id'] print(f'[i] 远端已有 {len(existing)} 首曲目,重复的会跳过创建') except urllib.error.URLError as e: print(f'[!] 连不上 {args.api}: {e}', file=sys.stderr) sys.exit(1) mp3s = sorted(src.glob('*.mp3')) if args.limit: mp3s = mp3s[:args.limit] total = len(mp3s) ok = skipped = failed = 0 for i, mp3 in enumerate(mp3s, 1): stem = mp3.stem title, artist = parse_filename(stem) if not title: print(f'[!] [{i}/{total}] 无法解析文件名: {mp3.name}', file=sys.stderr) failed += 1 continue key = (title, artist) prefix = f'[{i}/{total}] {title} - {artist}' if key in existing: print(f' ↻ {prefix} (已存在 id={existing[key]}, 跳过)') skipped += 1 continue # lrc lrc_path = mp3.with_suffix('.lrc') lyrics = None if lrc_path.exists(): try: lyrics = lrc_path.read_text(encoding='utf-8') except UnicodeDecodeError: lyrics = lrc_path.read_text(encoding='gbk', errors='replace') # chord png chord_path = chords_dir / f'{sanitize(artist)}-{sanitize(title)}.png' if not chord_path.exists(): chord_path = None play_count = int(playcounts.get(mp3.name, 0)) if args.dry_run: print(f' + {prefix}' f'{" [词]" if lyrics else ""}' f'{" [谱]" if chord_path else ""}' f'{f" [{play_count}次]" if play_count else ""}') ok += 1 continue try: piece_id = create_piece(args.api, title, artist, lyrics) print(f' + {prefix} → id={piece_id}', end='', flush=True) upload_attachment(args.api, piece_id, mp3, detect_audio_mime(mp3)) print(' [audio]', end='', flush=True) if chord_path: upload_attachment(args.api, piece_id, chord_path, 'image/png', role='chord') print(' [chord]', end='', flush=True) if play_count: patch_play_count(args.api, piece_id, play_count) print(f' [{play_count}次]', end='', flush=True) print() ok += 1 except Exception as e: print(f'\n[!] 
{prefix} 失败: {e}', file=sys.stderr) failed += 1 print() print(f'完成: ok={ok} skipped={skipped} failed={failed} total={total}') if __name__ == '__main__': main()