Files
cube/apps/music/src/main.rs
T
Fam Zheng ccb5ad05ce
deploy music / build-and-deploy (push) Failing after 1m50s
music(inspire): 加「💡 今天练什么」灵感推荐 modal
- 后端 POST /api/inspire 流式 SSE:随机 keyword 池(23 个)+ 用户曲库画像(recent/top/least)+ Tavily 热点搜索 → gemma stream(temperature=1.0)
- Tavily key 走 k8s Secret tavily-creds(复用 mochi config 同一 token)
- 每次按按钮:keyword 随机 + 用户可输 hint("想练快歌" / "陪儿子" / "新东西")
- 输出强制格式:4 首歌('补回来' 2 + '试试新' 2),每首歌名-歌手 + 一句理由
- 前端 topbar 加 💡 按钮,modal 流式渲染(极简 md:**bold** + 列表)
2026-05-10 15:52:00 +01:00

1820 lines
61 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! music.famzheng.me — 听歌 + 练琴。
//!
//! 数据模型:曲目 (piece) → 附件 (attachment, 类型 video/audio/pdf/image)。
//! 元数据走 sqlite,附件 bytes 落 `/data/blobs/<id>`Range 下载交给 tower-http ServeFile。
//!
//! API
//! - `GET /api/pieces` 列表(含附件计数 + 简要类型分布)
//! - `POST /api/pieces` 创建(json: title, category?, notes?
//! - `GET /api/pieces/:id` 详情(含 attachments 列表)
//! - `PATCH /api/pieces/:id` 改 title / category / notes
//! - `DELETE /api/pieces/:id` 删曲目 + 级联删附件 + 同步删磁盘
//! - `POST /api/pieces/:id/attachments` multipart 流式上传,可一次多文件
//! - `GET /api/attachments/:id` 下载(带 Rangevideo/audio 拖动用)
//! - `DELETE /api/attachments/:id` 删单个附件 + 磁盘文件
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use axum::{
body::Body,
extract::{DefaultBodyLimit, Multipart, Path, Query, Request, State},
http::{header, StatusCode},
response::{
sse::{Event, Sse},
IntoResponse, Json as JsonResp, Response,
},
routing::{delete, get, post},
Router,
};
use futures::Stream;
use std::convert::Infallible;
use rusqlite::{params, Connection, OptionalExtension};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::io::AsyncWriteExt;
use tower::ServiceExt;
const SINGLE_FILE_BYTES: usize = 1024 * 1024 * 1024; // 1 GiB / 单附件
const REQUEST_BYTES: usize = 5 * 1024 * 1024 * 1024; // 5 GiB / 单次上传
#[derive(Clone)]
struct AppState {
db: Arc<Mutex<Connection>>,
blobs_dir: PathBuf,
/// 同 pod 的 chord-fetcher sidecar root(默认 http://localhost:8001)。
chord_url: String,
http: reqwest::Client,
/// LLM 网关(OpenAI 兼容 /v1)—— 同 mochi/config.yaml。
chat_gateway: String,
chat_token: String,
chat_model: String,
/// Tavily 网络搜索 token(给灵感推荐 endpoint 用)。
tavily_token: String,
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
cube_core::init_tracing();
let db_path = std::env::var("DB_PATH").unwrap_or_else(|_| "/data/app.db".into());
let blobs_dir =
PathBuf::from(std::env::var("BLOBS_DIR").unwrap_or_else(|_| "/data/blobs".into()));
let dist = std::env::var("MUSIC_DIST_DIR").unwrap_or_else(|_| "/dist".into());
std::fs::create_dir_all(&blobs_dir).expect("mkdir blobs_dir");
let conn = Connection::open(&db_path).expect("open sqlite");
conn.execute_batch(
"PRAGMA journal_mode=WAL;
PRAGMA foreign_keys=ON;
CREATE TABLE IF NOT EXISTS pieces (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
artist TEXT,
category TEXT,
notes TEXT,
lyrics TEXT,
play_count INTEGER NOT NULL DEFAULT 0,
last_played_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS attachments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
piece_id INTEGER NOT NULL,
kind TEXT NOT NULL,
role TEXT,
mime TEXT NOT NULL,
filename TEXT NOT NULL,
size_bytes INTEGER NOT NULL DEFAULT 0,
sort_order INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (piece_id) REFERENCES pieces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_att_piece ON attachments(piece_id);
CREATE TABLE IF NOT EXISTS chat_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
piece_id INTEGER NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (piece_id) REFERENCES pieces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_chat_piece ON chat_messages(piece_id);
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS piece_tags (
piece_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
PRIMARY KEY (piece_id, tag_id),
FOREIGN KEY (piece_id) REFERENCES pieces(id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_pt_tag ON piece_tags(tag_id);
CREATE TABLE IF NOT EXISTS playlists (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS playlist_pieces (
playlist_id INTEGER NOT NULL,
piece_id INTEGER NOT NULL,
sort_order INTEGER NOT NULL DEFAULT 0,
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (playlist_id, piece_id),
FOREIGN KEY (playlist_id) REFERENCES playlists(id) ON DELETE CASCADE,
FOREIGN KEY (piece_id) REFERENCES pieces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_pp_piece ON playlist_pieces(piece_id);",
)
.expect("init schema");
tracing::info!(%db_path, blobs = %blobs_dir.display(), "music ready");
let chord_url =
std::env::var("CHORD_URL").unwrap_or_else(|_| "http://localhost:8001".into());
let chat_gateway =
std::env::var("CHAT_GATEWAY").unwrap_or_else(|_| "http://3.135.65.204:8848/v1".into());
let chat_token = std::env::var("CHAT_TOKEN").unwrap_or_default();
let chat_model =
std::env::var("CHAT_MODEL").unwrap_or_else(|_| "gemma-4-31b-it".into());
let tavily_token = std::env::var("TAVILY_TOKEN").unwrap_or_default();
// 关键:reqwest 默认 timeout 不要给 chat 用 —— chat stream 必须能跑很久。
// 对 chord sidecar 的小请求另外用 .timeout() per-request。
let http = reqwest::Client::builder()
.build()
.expect("build reqwest client");
let state = AppState {
db: Arc::new(Mutex::new(conn)),
blobs_dir,
chord_url,
http,
chat_gateway,
chat_token,
chat_model,
tavily_token,
};
let api = Router::new()
.route("/pieces", get(list_pieces).post(create_piece))
.route(
"/pieces/:id",
get(get_piece).patch(patch_piece).delete(delete_piece),
)
.route("/pieces/:id/play", post(record_play))
.route("/pieces/:id/chord/fetch", post(chord_fetch))
.route("/pieces/:id/chord/status", get(chord_status))
.route(
"/pieces/:id/chat",
get(list_chat).post(post_chat).delete(clear_chat),
)
.route("/inspire", post(post_inspire))
.route(
"/pieces/:id/attachments",
post(upload_attachments).layer(DefaultBodyLimit::max(REQUEST_BYTES)),
)
.route(
"/attachments/:id",
get(get_attachment).delete(delete_attachment),
)
.route("/tags", get(list_tags).post(create_tag))
.route("/tags/:id", delete(delete_tag))
.route("/playlists", get(list_playlists).post(create_playlist))
.route(
"/playlists/:id",
get(get_playlist).patch(patch_playlist).delete(delete_playlist),
)
.route("/playlists/:id/pieces", post(playlist_add_piece))
.route(
"/playlists/:id/pieces/:piece_id",
delete(playlist_remove_piece),
)
.with_state(state);
let app = cube_core::base(dist).nest("/api", api);
cube_core::serve(app, 8080).await
}
// ---------- 类型 ----------
#[derive(Serialize)]
struct PieceSummary {
id: i64,
title: String,
artist: Option<String>,
category: Option<String>,
play_count: i64,
last_played_at: Option<String>,
attachments: i64,
kinds: Vec<String>,
tags: Vec<String>,
has_lyrics: bool,
created_at: String,
}
#[derive(Serialize)]
struct PieceDetail {
id: i64,
title: String,
artist: Option<String>,
category: Option<String>,
notes: Option<String>,
lyrics: Option<String>,
play_count: i64,
last_played_at: Option<String>,
created_at: String,
attachments: Vec<Attachment>,
tags: Vec<String>,
}
#[derive(Serialize)]
struct Attachment {
id: i64,
kind: String,
role: Option<String>,
mime: String,
filename: String,
size_bytes: i64,
sort_order: i64,
created_at: String,
}
#[derive(Deserialize)]
struct UploadQuery {
role: Option<String>,
}
#[derive(Deserialize)]
struct CreatePiece {
title: String,
artist: Option<String>,
category: Option<String>,
notes: Option<String>,
lyrics: Option<String>,
}
#[derive(Deserialize)]
struct PatchPiece {
title: Option<String>,
artist: Option<Option<String>>,
category: Option<Option<String>>,
notes: Option<Option<String>>,
lyrics: Option<Option<String>>,
/// 整体 replace;空数组等于清空
tags: Option<Vec<String>>,
/// admin / import 用:直接写 play_countmvp 无认证)
play_count: Option<i64>,
}
#[derive(Deserialize, Default)]
struct ListPiecesQuery {
tag: Option<String>,
playlist: Option<i64>,
}
// ---------- handlers: pieces ----------
async fn list_pieces(
State(s): State<AppState>,
Query(q): Query<ListPiecesQuery>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
// 构造可选 filter 条件。每行 join 一次拿到 piecesubquery 单独算 attachments / tags
let (filter_join, filter_where, bind): (&str, &str, Vec<rusqlite::types::Value>) =
if let Some(t) = q.tag.as_deref().filter(|s| !s.is_empty()) {
(
"JOIN piece_tags pt ON pt.piece_id = p.id JOIN tags ft ON ft.id = pt.tag_id",
"WHERE ft.name = ?1",
vec![t.to_string().into()],
)
} else if let Some(pid) = q.playlist {
(
"JOIN playlist_pieces pp ON pp.piece_id = p.id",
"WHERE pp.playlist_id = ?1",
vec![pid.into()],
)
} else {
("", "", vec![])
};
let sql = format!(
"SELECT p.id, p.title, p.artist, p.category,
p.play_count, p.last_played_at, p.created_at,
(SELECT COUNT(*) FROM attachments a WHERE a.piece_id = p.id) AS att_count,
COALESCE((SELECT GROUP_CONCAT(DISTINCT a.kind)
FROM attachments a WHERE a.piece_id = p.id), '') AS kinds,
CASE WHEN p.lyrics IS NOT NULL AND length(p.lyrics) > 0 THEN 1 ELSE 0 END AS has_lyrics,
COALESCE((SELECT GROUP_CONCAT(t.name, char(9))
FROM piece_tags pt2 JOIN tags t ON t.id = pt2.tag_id
WHERE pt2.piece_id = p.id), '') AS tags
FROM pieces p
{filter_join}
{filter_where}
GROUP BY p.id
ORDER BY p.title COLLATE NOCASE ASC, p.id ASC"
);
let mut stmt = conn.prepare(&sql)?;
let bind_refs: Vec<&dyn rusqlite::ToSql> = bind.iter().map(|v| v as &dyn rusqlite::ToSql).collect();
let rows = stmt
.query_map(bind_refs.as_slice(), |r| {
let kinds_csv: String = r.get(8)?;
let kinds = if kinds_csv.is_empty() {
Vec::new()
} else {
kinds_csv.split(',').map(|x| x.to_string()).collect()
};
let tags_raw: String = r.get(10)?;
let tags = if tags_raw.is_empty() {
Vec::new()
} else {
tags_raw.split('\t').map(|x| x.to_string()).collect()
};
let has_lyrics: i64 = r.get(9)?;
Ok(PieceSummary {
id: r.get(0)?,
title: r.get(1)?,
artist: r.get(2)?,
category: r.get(3)?,
play_count: r.get(4)?,
last_played_at: r.get(5)?,
created_at: r.get(6)?,
attachments: r.get(7)?,
kinds,
tags,
has_lyrics: has_lyrics != 0,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(JsonResp(json!(rows)))
}
async fn create_piece(
State(s): State<AppState>,
JsonResp(body): JsonResp<CreatePiece>,
) -> Result<JsonResp<Value>, AppError> {
let title = body.title.trim();
if title.is_empty() {
return Err(AppError::bad_request("title required"));
}
let conn = s.db.lock().unwrap();
conn.execute(
"INSERT INTO pieces (title, artist, category, notes, lyrics)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![
title,
body.artist.as_deref().map(str::trim).filter(|s| !s.is_empty()),
body.category.as_deref().map(str::trim).filter(|s| !s.is_empty()),
body.notes.as_deref(),
body.lyrics.as_deref()
],
)?;
let id = conn.last_insert_rowid();
Ok(JsonResp(json!({ "id": id })))
}
async fn get_piece(
State(s): State<AppState>,
Path(id): Path<i64>,
) -> Result<JsonResp<PieceDetail>, AppError> {
let conn = s.db.lock().unwrap();
type PieceRow = (
String,
Option<String>,
Option<String>,
Option<String>,
Option<String>,
i64,
Option<String>,
String,
);
let row: Option<PieceRow> = conn
.query_row(
"SELECT title, artist, category, notes, lyrics, play_count, last_played_at, created_at
FROM pieces WHERE id = ?1",
params![id],
|r| {
Ok((
r.get(0)?,
r.get(1)?,
r.get(2)?,
r.get(3)?,
r.get(4)?,
r.get(5)?,
r.get(6)?,
r.get(7)?,
))
},
)
.optional()?;
let (title, artist, category, notes, lyrics, play_count, last_played_at, created_at) =
row.ok_or(AppError::NotFound)?;
let mut stmt = conn.prepare(
"SELECT id, kind, role, mime, filename, size_bytes, sort_order, created_at
FROM attachments
WHERE piece_id = ?1
ORDER BY sort_order ASC, id ASC",
)?;
let attachments = stmt
.query_map(params![id], |r| {
Ok(Attachment {
id: r.get(0)?,
kind: r.get(1)?,
role: r.get(2)?,
mime: r.get(3)?,
filename: r.get(4)?,
size_bytes: r.get(5)?,
sort_order: r.get(6)?,
created_at: r.get(7)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;
let mut tag_stmt = conn.prepare(
"SELECT t.name FROM piece_tags pt JOIN tags t ON t.id = pt.tag_id
WHERE pt.piece_id = ?1 ORDER BY t.name COLLATE NOCASE",
)?;
let tags: Vec<String> = tag_stmt
.query_map(params![id], |r| r.get::<_, String>(0))?
.collect::<Result<Vec<_>, _>>()?;
Ok(JsonResp(PieceDetail {
id,
title,
artist,
category,
notes,
lyrics,
play_count,
last_played_at,
created_at,
attachments,
tags,
}))
}
async fn patch_piece(
State(s): State<AppState>,
Path(id): Path<i64>,
JsonResp(body): JsonResp<PatchPiece>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
let exists: bool = conn
.query_row("SELECT 1 FROM pieces WHERE id = ?1", params![id], |_| {
Ok(true)
})
.optional()?
.unwrap_or(false);
if !exists {
return Err(AppError::NotFound);
}
if let Some(title) = body.title.as_ref() {
let t = title.trim();
if t.is_empty() {
return Err(AppError::bad_request("title can't be blank"));
}
conn.execute("UPDATE pieces SET title = ?1 WHERE id = ?2", params![t, id])?;
}
if let Some(artist) = body.artist {
let artist = artist.as_deref().map(str::trim).filter(|s| !s.is_empty());
conn.execute(
"UPDATE pieces SET artist = ?1 WHERE id = ?2",
params![artist, id],
)?;
}
if let Some(cat) = body.category {
let cat = cat.as_deref().map(str::trim).filter(|s| !s.is_empty());
conn.execute(
"UPDATE pieces SET category = ?1 WHERE id = ?2",
params![cat, id],
)?;
}
if let Some(notes) = body.notes {
let notes = notes.as_deref();
conn.execute(
"UPDATE pieces SET notes = ?1 WHERE id = ?2",
params![notes, id],
)?;
}
if let Some(lyrics) = body.lyrics {
let lyrics = lyrics.as_deref();
conn.execute(
"UPDATE pieces SET lyrics = ?1 WHERE id = ?2",
params![lyrics, id],
)?;
}
if let Some(pc) = body.play_count {
conn.execute(
"UPDATE pieces SET play_count = ?1 WHERE id = ?2",
params![pc, id],
)?;
}
if let Some(tags) = body.tags {
conn.execute(
"DELETE FROM piece_tags WHERE piece_id = ?1",
params![id],
)?;
for name in tags {
let trimmed = name.trim();
if trimmed.is_empty() {
continue;
}
let tag_id = upsert_tag(&conn, trimmed)?;
conn.execute(
"INSERT OR IGNORE INTO piece_tags (piece_id, tag_id) VALUES (?1, ?2)",
params![id, tag_id],
)?;
}
}
Ok(JsonResp(json!({ "ok": true })))
}
async fn record_play(
State(s): State<AppState>,
Path(id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
let n = conn.execute(
"UPDATE pieces
SET play_count = play_count + 1, last_played_at = CURRENT_TIMESTAMP
WHERE id = ?1",
params![id],
)?;
if n == 0 {
return Err(AppError::NotFound);
}
let count: i64 = conn.query_row(
"SELECT play_count FROM pieces WHERE id = ?1",
params![id],
|r| r.get(0),
)?;
Ok(JsonResp(json!({ "play_count": count })))
}
async fn delete_piece(
State(s): State<AppState>,
Path(id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
let to_unlink: Vec<i64> = {
let conn = s.db.lock().unwrap();
let mut stmt =
conn.prepare("SELECT id FROM attachments WHERE piece_id = ?1")?;
let ids: Vec<i64> = stmt
.query_map(params![id], |r| r.get(0))?
.collect::<Result<Vec<_>, _>>()?;
let n = conn.execute("DELETE FROM pieces WHERE id = ?1", params![id])?;
if n == 0 {
return Err(AppError::NotFound);
}
ids
};
for aid in to_unlink {
let _ = tokio::fs::remove_file(s.blobs_dir.join(aid.to_string())).await;
}
Ok(JsonResp(json!({ "ok": true })))
}
// ---------- handlers: chat ----------
#[derive(Serialize)]
struct ChatMessage {
id: i64,
role: String,
content: String,
created_at: String,
}
#[derive(Deserialize)]
struct PostChat {
message: String,
}
async fn list_chat(
State(s): State<AppState>,
Path(piece_id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id, role, content, created_at FROM chat_messages
WHERE piece_id = ?1 ORDER BY id ASC",
)?;
let rows = stmt
.query_map(params![piece_id], |r| {
Ok(ChatMessage {
id: r.get(0)?,
role: r.get(1)?,
content: r.get(2)?,
created_at: r.get(3)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(JsonResp(json!(rows)))
}
async fn clear_chat(
State(s): State<AppState>,
Path(piece_id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
conn.execute(
"DELETE FROM chat_messages WHERE piece_id = ?1",
params![piece_id],
)?;
Ok(JsonResp(json!({ "ok": true })))
}
/// `POST /api/pieces/:id/chat` — body {"message": "..."},返回 SSE 流
/// 每个 event data 是文本片段(assistant delta content)。结束时 emit 一个 `done` event。
async fn post_chat(
State(s): State<AppState>,
Path(piece_id): Path<i64>,
JsonResp(body): JsonResp<PostChat>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, AppError> {
let user_msg = body.message.trim().to_string();
if user_msg.is_empty() {
return Err(AppError::bad_request("message required"));
}
if s.chat_token.is_empty() {
return Err(AppError::bad_request("CHAT_TOKEN not configured"));
}
// 拼 messagessystem + history + 新 user
let (system_prompt, history) = build_chat_context(&s, piece_id)?;
let mut openai_messages: Vec<Value> = Vec::new();
if !system_prompt.is_empty() {
openai_messages.push(json!({ "role": "system", "content": system_prompt }));
}
for m in &history {
openai_messages.push(json!({ "role": m.role, "content": m.content }));
}
openai_messages.push(json!({ "role": "user", "content": user_msg }));
let payload = json!({
"model": s.chat_model,
"messages": openai_messages,
"stream": true,
});
let url = format!("{}/chat/completions", s.chat_gateway.trim_end_matches('/'));
let req = s
.http
.post(&url)
.bearer_auth(&s.chat_token)
.json(&payload);
// 先存用户消息(不等 LLM 完)
{
let conn = s.db.lock().unwrap();
conn.execute(
"INSERT INTO chat_messages (piece_id, role, content) VALUES (?1, 'user', ?2)",
params![piece_id, &user_msg],
)?;
}
let (tx, rx) = tokio::sync::mpsc::channel::<Result<Event, Infallible>>(64);
let state_clone = s.clone();
tokio::spawn(async move {
let mut full = String::new();
match req.send().await {
Ok(resp) if resp.status().is_success() => {
use futures::StreamExt;
let mut stream = resp.bytes_stream();
let mut buf = String::new();
while let Some(chunk) = stream.next().await {
let chunk = match chunk {
Ok(b) => b,
Err(e) => {
let _ = tx
.send(Ok(Event::default()
.event("error")
.data(format!("stream: {e}"))))
.await;
break;
}
};
buf.push_str(&String::from_utf8_lossy(&chunk));
while let Some(idx) = buf.find('\n') {
let line = buf[..idx].trim().to_string();
buf.drain(..=idx);
let Some(payload) = line.strip_prefix("data:") else {
continue;
};
let payload = payload.trim();
if payload.is_empty() {
continue;
}
if payload == "[DONE]" {
break;
}
match serde_json::from_str::<Value>(payload) {
Ok(v) => {
if let Some(delta) = v
.get("choices")
.and_then(|c| c.get(0))
.and_then(|c| c.get("delta"))
.and_then(|d| d.get("content"))
.and_then(|c| c.as_str())
{
if !delta.is_empty() {
full.push_str(delta);
if tx
.send(Ok(Event::default().data(delta.to_string())))
.await
.is_err()
{
// client gone
return;
}
}
}
}
Err(e) => {
tracing::warn!(error = %e, raw = %payload, "chat: bad delta json");
}
}
}
}
}
Ok(resp) => {
let st = resp.status();
let body = resp.text().await.unwrap_or_default();
let _ = tx
.send(Ok(Event::default()
.event("error")
.data(format!("gateway {st}: {body}"))))
.await;
}
Err(e) => {
let _ = tx
.send(Ok(Event::default()
.event("error")
.data(format!("connect: {e}"))))
.await;
}
}
// 持久化 assistant
if !full.is_empty() {
let conn = state_clone.db.lock().unwrap();
let _ = conn.execute(
"INSERT INTO chat_messages (piece_id, role, content) VALUES (?1, 'assistant', ?2)",
params![piece_id, &full],
);
}
let _ = tx.send(Ok(Event::default().event("done").data(""))).await;
});
Ok(Sse::new(tokio_stream::wrappers::ReceiverStream::new(rx))
.keep_alive(axum::response::sse::KeepAlive::default()))
}
fn build_chat_context(
s: &AppState,
piece_id: i64,
) -> Result<(String, Vec<ChatMessage>), AppError> {
let conn = s.db.lock().unwrap();
type Row = (
String,
Option<String>,
Option<String>,
Option<String>,
Option<String>,
);
let row: Option<Row> = conn
.query_row(
"SELECT title, artist, category, lyrics, notes FROM pieces WHERE id = ?1",
params![piece_id],
|r| {
Ok((
r.get(0)?,
r.get(1)?,
r.get(2)?,
r.get(3)?,
r.get(4)?,
))
},
)
.optional()?;
let (title, artist, category, lyrics, notes) = row.ok_or(AppError::NotFound)?;
// 仅注入当前曲目上下文,不赋予 LLM 任何角色 / 人格。
let mut sys = String::from("Context: 用户正在查看以下曲目。\n");
sys.push_str(&format!("Title: {}\n", title));
if let Some(a) = artist.as_deref().filter(|s| !s.is_empty()) {
sys.push_str(&format!("Artist: {}\n", a));
}
if let Some(c) = category.as_deref().filter(|s| !s.is_empty()) {
sys.push_str(&format!("Category: {}\n", c));
}
if let Some(n) = notes.as_deref().filter(|s| !s.is_empty()) {
sys.push_str(&format!("User notes: {}\n", n));
}
if let Some(l) = lyrics.as_deref().filter(|s| !s.is_empty()) {
// LRC 太长会爆 prompt,截到 4KB
let trimmed = if l.len() > 4096 { &l[..4096] } else { l };
sys.push_str(&format!("Lyrics (truncated to 4KB):\n{}\n", trimmed));
}
let mut stmt = conn.prepare(
"SELECT id, role, content, created_at FROM chat_messages
WHERE piece_id = ?1 ORDER BY id ASC",
)?;
let history = stmt
.query_map(params![piece_id], |r| {
Ok(ChatMessage {
id: r.get(0)?,
role: r.get(1)?,
content: r.get(2)?,
created_at: r.get(3)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok((sys, history))
}
// ---------- handlers: tags ----------
async fn list_tags(State(s): State<AppState>) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT t.id, t.name, COUNT(pt.piece_id) AS n
FROM tags t LEFT JOIN piece_tags pt ON pt.tag_id = t.id
GROUP BY t.id ORDER BY t.name COLLATE NOCASE ASC",
)?;
let rows: Vec<Value> = stmt
.query_map([], |r| {
Ok(json!({
"id": r.get::<_, i64>(0)?,
"name": r.get::<_, String>(1)?,
"count": r.get::<_, i64>(2)?,
}))
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(JsonResp(json!(rows)))
}
#[derive(Deserialize)]
struct CreateTag {
name: String,
}
async fn create_tag(
State(s): State<AppState>,
JsonResp(body): JsonResp<CreateTag>,
) -> Result<JsonResp<Value>, AppError> {
let name = body.name.trim();
if name.is_empty() {
return Err(AppError::bad_request("name required"));
}
let conn = s.db.lock().unwrap();
let id = upsert_tag(&conn, name)?;
Ok(JsonResp(json!({ "id": id, "name": name })))
}
fn upsert_tag(conn: &Connection, name: &str) -> Result<i64, rusqlite::Error> {
conn.execute(
"INSERT INTO tags (name) VALUES (?1) ON CONFLICT(name) DO NOTHING",
params![name],
)?;
conn.query_row("SELECT id FROM tags WHERE name = ?1", params![name], |r| r.get(0))
}
async fn delete_tag(
State(s): State<AppState>,
Path(id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
let n = conn.execute("DELETE FROM tags WHERE id = ?1", params![id])?;
if n == 0 {
return Err(AppError::NotFound);
}
Ok(JsonResp(json!({ "ok": true })))
}
// ---------- handlers: playlists ----------
async fn list_playlists(State(s): State<AppState>) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT p.id, p.name, p.description, p.created_at,
COUNT(pp.piece_id) AS n
FROM playlists p LEFT JOIN playlist_pieces pp ON pp.playlist_id = p.id
GROUP BY p.id ORDER BY p.created_at DESC, p.id DESC",
)?;
let rows: Vec<Value> = stmt
.query_map([], |r| {
Ok(json!({
"id": r.get::<_, i64>(0)?,
"name": r.get::<_, String>(1)?,
"description": r.get::<_, Option<String>>(2)?,
"created_at": r.get::<_, String>(3)?,
"count": r.get::<_, i64>(4)?,
}))
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(JsonResp(json!(rows)))
}
#[derive(Deserialize)]
struct CreatePlaylist {
name: String,
description: Option<String>,
}
async fn create_playlist(
State(s): State<AppState>,
JsonResp(body): JsonResp<CreatePlaylist>,
) -> Result<JsonResp<Value>, AppError> {
let name = body.name.trim();
if name.is_empty() {
return Err(AppError::bad_request("name required"));
}
let conn = s.db.lock().unwrap();
conn.execute(
"INSERT INTO playlists (name, description) VALUES (?1, ?2)",
params![name, body.description.as_deref()],
)?;
Ok(JsonResp(json!({ "id": conn.last_insert_rowid() })))
}
async fn get_playlist(
State(s): State<AppState>,
Path(id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
let meta: Option<(String, Option<String>, String)> = conn
.query_row(
"SELECT name, description, created_at FROM playlists WHERE id = ?1",
params![id],
|r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
)
.optional()?;
let (name, description, created_at) = meta.ok_or(AppError::NotFound)?;
let mut stmt = conn.prepare(
"SELECT p.id, p.title, p.artist, p.category, p.play_count, p.last_played_at,
p.created_at,
(SELECT COUNT(*) FROM attachments a WHERE a.piece_id = p.id) AS att_count,
(SELECT COALESCE(GROUP_CONCAT(DISTINCT a.kind), '')
FROM attachments a WHERE a.piece_id = p.id) AS kinds,
CASE WHEN p.lyrics IS NOT NULL AND length(p.lyrics) > 0 THEN 1 ELSE 0 END,
pp.sort_order
FROM playlist_pieces pp JOIN pieces p ON p.id = pp.piece_id
WHERE pp.playlist_id = ?1
ORDER BY pp.sort_order ASC, pp.added_at ASC",
)?;
let pieces: Vec<Value> = stmt
.query_map(params![id], |r| {
let kinds_csv: String = r.get(8)?;
let kinds: Vec<&str> = if kinds_csv.is_empty() {
Vec::new()
} else {
kinds_csv.split(',').collect()
};
let has_lyrics: i64 = r.get(9)?;
Ok(json!({
"id": r.get::<_, i64>(0)?,
"title": r.get::<_, String>(1)?,
"artist": r.get::<_, Option<String>>(2)?,
"category": r.get::<_, Option<String>>(3)?,
"play_count": r.get::<_, i64>(4)?,
"last_played_at": r.get::<_, Option<String>>(5)?,
"created_at": r.get::<_, String>(6)?,
"attachments": r.get::<_, i64>(7)?,
"kinds": kinds,
"has_lyrics": has_lyrics != 0,
"sort_order": r.get::<_, i64>(10)?,
}))
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(JsonResp(json!({
"id": id,
"name": name,
"description": description,
"created_at": created_at,
"pieces": pieces,
})))
}
#[derive(Deserialize)]
struct PatchPlaylist {
name: Option<String>,
description: Option<Option<String>>,
}
async fn patch_playlist(
State(s): State<AppState>,
Path(id): Path<i64>,
JsonResp(body): JsonResp<PatchPlaylist>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
let exists: bool = conn
.query_row("SELECT 1 FROM playlists WHERE id = ?1", params![id], |_| Ok(true))
.optional()?
.unwrap_or(false);
if !exists {
return Err(AppError::NotFound);
}
if let Some(n) = body.name.as_ref() {
let n = n.trim();
if n.is_empty() {
return Err(AppError::bad_request("name can't be blank"));
}
conn.execute(
"UPDATE playlists SET name = ?1 WHERE id = ?2",
params![n, id],
)?;
}
if let Some(d) = body.description {
conn.execute(
"UPDATE playlists SET description = ?1 WHERE id = ?2",
params![d.as_deref(), id],
)?;
}
Ok(JsonResp(json!({ "ok": true })))
}
async fn delete_playlist(
State(s): State<AppState>,
Path(id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
let n = conn.execute("DELETE FROM playlists WHERE id = ?1", params![id])?;
if n == 0 {
return Err(AppError::NotFound);
}
Ok(JsonResp(json!({ "ok": true })))
}
#[derive(Deserialize)]
struct AddPiece {
piece_id: i64,
}
async fn playlist_add_piece(
State(s): State<AppState>,
Path(id): Path<i64>,
JsonResp(body): JsonResp<AddPiece>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
conn.execute(
"INSERT INTO playlist_pieces (playlist_id, piece_id, sort_order)
VALUES (?1, ?2,
COALESCE((SELECT MAX(sort_order) FROM playlist_pieces WHERE playlist_id = ?1), 0) + 1)
ON CONFLICT(playlist_id, piece_id) DO NOTHING",
params![id, body.piece_id],
)?;
Ok(JsonResp(json!({ "ok": true })))
}
async fn playlist_remove_piece(
State(s): State<AppState>,
Path((id, piece_id)): Path<(i64, i64)>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
conn.execute(
"DELETE FROM playlist_pieces WHERE playlist_id = ?1 AND piece_id = ?2",
params![id, piece_id],
)?;
Ok(JsonResp(json!({ "ok": true })))
}
// ---------- handlers: inspire (今天练什么) ----------
/// keyword pool —— 每次按按钮随机抽 1 个,再加自由 hint,配合 Tavily
/// 让搜索结果天然多变;同时高 temperature gemma 输出,避免推荐固定。
const INSPIRE_KEYWORDS: &[&str] = &[
"2026 华语流行 推荐",
"豆瓣高分 华语 2025",
"B 站 翻唱 热门",
"适合 吉他 弹唱 入门",
"指弹 吉他 名曲 推荐",
"广东话 流行 经典 推荐",
"民谣 治愈系 中文",
"宝藏 华语 歌手",
"华语 R&B 推荐",
"国风 流行 融合",
"近期 KTV 热门 华语",
"伤感 慢歌 弹唱",
"摇滚 简单 三和弦 弹唱",
"经典 老歌 重听",
"lounge 爵士 中文",
"独立 音乐 newcomer",
"华语 新人 2025",
"钢琴 弹唱 抒情",
"城市 民谣 推荐",
"电子 dream pop 中文",
"爵士 标准曲 入门",
"中文 朋克 lo-fi",
"蓝调 入门 学吉他",
];
#[derive(Deserialize, Default)]
struct InspireBody {
/// 用户附加意图,比如"想练快歌" / "今晚陪儿子" / "新东西"。可空。
hint: Option<String>,
}
#[derive(Serialize)]
struct PieceHint {
title: String,
artist: String,
play_count: i64,
}
async fn post_inspire(
State(s): State<AppState>,
JsonResp(body): JsonResp<InspireBody>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, AppError> {
if s.chat_token.is_empty() {
return Err(AppError::bad_request("CHAT_TOKEN not configured"));
}
// 1) 随机选 1 个 keyword
let now_ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis())
.unwrap_or(0);
let kw = {
let idx = (now_ts as usize) % INSPIRE_KEYWORDS.len();
INSPIRE_KEYWORDS[idx]
};
let hint = body
.hint
.as_deref()
.map(|s| s.trim())
.filter(|s| !s.is_empty());
let query = match hint {
Some(h) => format!("{kw} · 角度:{h}"),
None => kw.to_string(),
};
// 2) Tavily 搜热点(5 条;失败不致命)
let web_results = if s.tavily_token.is_empty() {
Vec::new()
} else {
match s
.http
.post("https://api.tavily.com/search")
.timeout(std::time::Duration::from_secs(10))
.json(&json!({
"api_key": s.tavily_token,
"query": query,
"max_results": 5,
"search_depth": "basic",
"include_answer": false,
}))
.send()
.await
{
Ok(r) if r.status().is_success() => {
let v: Value = r.json().await.unwrap_or(json!({}));
v.get("results")
.and_then(|x| x.as_array())
.cloned()
.unwrap_or_default()
}
Ok(r) => {
tracing::warn!(status = %r.status(), "tavily non-200");
Vec::new()
}
Err(e) => {
tracing::warn!(error = %e, "tavily call failed");
Vec::new()
}
}
};
// 3) 用户曲库画像(top played / least / recent
let (recent, top_played, least_played, tags_top, n_total) = {
let conn = s.db.lock().unwrap();
let recent: Vec<PieceHint> = conn
.prepare(
"SELECT title, COALESCE(artist, ''), play_count FROM pieces
WHERE last_played_at IS NOT NULL
ORDER BY last_played_at DESC LIMIT 5",
)?
.query_map([], |r| Ok(PieceHint {
title: r.get(0)?, artist: r.get(1)?, play_count: r.get(2)?,
}))?
.collect::<Result<Vec<_>, _>>()?;
let top: Vec<PieceHint> = conn
.prepare(
"SELECT title, COALESCE(artist, ''), play_count FROM pieces
WHERE play_count > 0 ORDER BY play_count DESC LIMIT 5",
)?
.query_map([], |r| Ok(PieceHint {
title: r.get(0)?, artist: r.get(1)?, play_count: r.get(2)?,
}))?
.collect::<Result<Vec<_>, _>>()?;
let least: Vec<PieceHint> = conn
.prepare(
"SELECT title, COALESCE(artist, ''), play_count FROM pieces
ORDER BY play_count ASC, RANDOM() LIMIT 5",
)?
.query_map([], |r| Ok(PieceHint {
title: r.get(0)?, artist: r.get(1)?, play_count: r.get(2)?,
}))?
.collect::<Result<Vec<_>, _>>()?;
let tags: Vec<String> = conn
.prepare(
"SELECT t.name FROM tags t JOIN piece_tags pt ON pt.tag_id = t.id
GROUP BY t.id ORDER BY COUNT(*) DESC LIMIT 5",
)?
.query_map([], |r| r.get::<_, String>(0))?
.collect::<Result<Vec<_>, _>>()?;
let total: i64 = conn
.query_row("SELECT COUNT(*) FROM pieces", [], |r| r.get(0))?;
(recent, top, least, tags, total)
};
// 4) 构造 system prompt + user prompt
let mut sys = String::from(
"你是音乐推荐助手。基于用户曲库画像和今天的网络热点,推荐 3-5 首具体曲目。\n\
规则:\n\
- 必须推 (歌名 - 歌手) 二元组,不要含糊\n\
- 一半推用户曲库里冷门或久未碰的('补回来'型,标 [📚]\n\
- 一半推网络热点 / 用户没的('试试新'型,标 [✨]\n\
- 每首一句话理由,要具体(说为啥适合现在 / 关联用户偏好 / 编曲特色)\n\
- 不要重复每次的开场白;直奔主题;中文回答\n\
- markdown,每首一行:- **歌名 - 歌手** [📚或✨] 理由",
);
sys.push_str(&format!("\n\n(用户曲库共 {n_total} 首;标签偏好:{}", tags_top.join("")));
let mut user_msg = String::new();
user_msg.push_str(&format!("今天关键词:{query}\n\n"));
if !web_results.is_empty() {
user_msg.push_str("网络热点(前 5 条):\n");
for r in &web_results {
let t = r.get("title").and_then(|v| v.as_str()).unwrap_or("");
let c = r.get("content").and_then(|v| v.as_str()).unwrap_or("");
user_msg.push_str(&format!("- {} :: {}\n", t.trim(), c.trim().chars().take(180).collect::<String>()));
}
user_msg.push('\n');
}
user_msg.push_str("用户曲库画像:\n");
user_msg.push_str(&format!("- 最近常练:{}\n", fmt_pieces(&recent)));
user_msg.push_str(&format!("- 最爱回听:{}\n", fmt_pieces(&top_played)));
user_msg.push_str(&format!("- 收藏但久没碰:{}\n", fmt_pieces(&least_played)));
user_msg.push_str(&format!(
"\n现在 {} 时刻。给我 4 首歌('补回来' 2 + '试试新' 2 推荐),开门见山。",
chrono_like(now_ts),
));
// 5) 用 OpenAI 兼容 stream 调 gemma
let payload = json!({
"model": s.chat_model,
"messages": [
{ "role": "system", "content": sys },
{ "role": "user", "content": user_msg },
],
"stream": true,
"temperature": 1.0,
"top_p": 0.95,
});
let url = format!("{}/chat/completions", s.chat_gateway.trim_end_matches('/'));
let req = s.http.post(&url).bearer_auth(&s.chat_token).json(&payload);
let (tx, rx) = tokio::sync::mpsc::channel::<Result<Event, Infallible>>(64);
tokio::spawn(async move {
match req.send().await {
Ok(resp) if resp.status().is_success() => {
use futures::StreamExt;
let mut stream = resp.bytes_stream();
let mut buf = String::new();
while let Some(chunk) = stream.next().await {
let chunk = match chunk {
Ok(b) => b,
Err(e) => {
let _ = tx.send(Ok(Event::default().event("error").data(format!("stream: {e}")))).await;
break;
}
};
buf.push_str(&String::from_utf8_lossy(&chunk));
while let Some(idx) = buf.find('\n') {
let line = buf[..idx].trim().to_string();
buf.drain(..=idx);
let Some(payload) = line.strip_prefix("data:") else { continue };
let payload = payload.trim();
if payload.is_empty() { continue; }
if payload == "[DONE]" { break; }
if let Ok(v) = serde_json::from_str::<Value>(payload) {
if let Some(delta) = v.get("choices").and_then(|c| c.get(0))
.and_then(|c| c.get("delta")).and_then(|d| d.get("content"))
.and_then(|c| c.as_str())
{
if !delta.is_empty() && tx.send(Ok(Event::default().data(delta.to_string()))).await.is_err() {
return;
}
}
}
}
}
}
Ok(resp) => {
let st = resp.status();
let body = resp.text().await.unwrap_or_default();
let _ = tx.send(Ok(Event::default().event("error").data(format!("gateway {st}: {body}")))).await;
}
Err(e) => {
let _ = tx.send(Ok(Event::default().event("error").data(format!("connect: {e}")))).await;
}
}
let _ = tx.send(Ok(Event::default().event("done").data(""))).await;
});
Ok(Sse::new(tokio_stream::wrappers::ReceiverStream::new(rx))
.keep_alive(axum::response::sse::KeepAlive::default()))
}
fn fmt_pieces(items: &[PieceHint]) -> String {
if items.is_empty() {
return "(无)".to_string();
}
items
.iter()
.map(|p| {
let a = if p.artist.is_empty() {
String::new()
} else {
format!(" - {}", p.artist)
};
format!("{}{}({}次)", p.title, a, p.play_count)
})
.collect::<Vec<_>>()
.join("")
}
fn chrono_like(now_ms: u128) -> String {
// 不引 chrono,简化展示,让 gemma 知道是新一轮 + 大致时间段
let secs = (now_ms / 1000) as i64;
let h = ((secs / 3600) % 24 + 8) % 24; // CST 偏移大致用,准不准无所谓
let bucket = match h {
5..=10 => "清晨",
11..=13 => "中午",
14..=17 => "下午",
18..=21 => "晚上",
_ => "深夜",
};
format!("{}unix {}", bucket, now_ms)
}
// ---------- handlers: chord auto-fetch ----------
#[derive(Deserialize)]
struct ChordModeQuery {
/// 'letters' = 弹唱谱字母版;'functional' = 数字级数版。默认 functional 兼容旧调用。
mode: Option<String>,
}
fn chord_mode_to_role(mode: &str) -> &'static str {
match mode {
"letters" => "chord_letters",
_ => "chord_functional",
}
}
fn parse_mode(q: &ChordModeQuery) -> Result<&'static str, AppError> {
let m = q.mode.as_deref().unwrap_or("functional");
match m {
"letters" => Ok("letters"),
"functional" => Ok("functional"),
other => Err(AppError::bad_request(format!(
"mode must be 'letters' or 'functional', got '{other}'"
))),
}
}
/// `POST /api/pieces/:id/chord/fetch?mode=letters|functional` — 触发 sidecar 抓 yopu 谱。
/// 已有该 mode 的 attachment 直接 completed。
async fn chord_fetch(
State(s): State<AppState>,
Path(piece_id): Path<i64>,
Query(q): Query<ChordModeQuery>,
) -> Result<JsonResp<Value>, AppError> {
let mode = parse_mode(&q)?;
let (title, artist, has) = chord_piece_meta(&s, piece_id, mode)?;
if has {
return Ok(JsonResp(json!({ "status": "completed", "mode": mode, "reason": "already imported" })));
}
let url = format!("{}/fetch", s.chord_url);
let resp = s
.http
.post(&url)
.query(&[
("piece_id", piece_id.to_string()),
("title", title),
("artist", artist.unwrap_or_default()),
("mode", mode.to_string()),
])
.timeout(std::time::Duration::from_secs(15))
.send()
.await
.map_err(|e| AppError::sidecar(format!("post fetch: {e}")))?;
if !resp.status().is_success() {
let st = resp.status();
let body = resp.text().await.unwrap_or_default();
return Err(AppError::sidecar(format!("sidecar {st}: {body}")));
}
let body: Value = resp
.json()
.await
.map_err(|e| AppError::sidecar(format!("decode: {e}")))?;
Ok(JsonResp(body))
}
/// `GET /api/pieces/:id/chord/status?mode=letters|functional`
async fn chord_status(
State(s): State<AppState>,
Path(piece_id): Path<i64>,
Query(q): Query<ChordModeQuery>,
) -> Result<JsonResp<Value>, AppError> {
let mode = parse_mode(&q)?;
let (_title, _artist, has) = chord_piece_meta(&s, piece_id, mode)?;
if has {
return Ok(JsonResp(json!({ "status": "completed", "mode": mode, "imported": true })));
}
let url = format!("{}/status/{}/{}", s.chord_url, piece_id, mode);
let resp = s
.http
.get(&url)
.timeout(std::time::Duration::from_secs(10))
.send()
.await
.map_err(|e| AppError::sidecar(format!("get status: {e}")))?;
if !resp.status().is_success() {
return Err(AppError::sidecar(format!("sidecar status: {}", resp.status())));
}
let body: Value = resp
.json()
.await
.map_err(|e| AppError::sidecar(format!("decode: {e}")))?;
let st = body.get("status").and_then(|v| v.as_str()).unwrap_or("none");
let file_exists = body
.get("file_exists")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if st == "completed" && file_exists {
let attachment_id = import_chord_png(&s, piece_id, mode).await?;
let _ = s
.http
.delete(format!("{}/state/{}/{}", s.chord_url, piece_id, mode))
.timeout(std::time::Duration::from_secs(5))
.send()
.await;
return Ok(JsonResp(json!({
"status": "completed",
"mode": mode,
"imported": true,
"attachment_id": attachment_id,
})));
}
Ok(JsonResp(body))
}
fn chord_piece_meta(
s: &AppState,
piece_id: i64,
mode: &str,
) -> Result<(String, Option<String>, bool), AppError> {
let conn = s.db.lock().unwrap();
let row: Option<(String, Option<String>)> = conn
.query_row(
"SELECT title, artist FROM pieces WHERE id = ?1",
params![piece_id],
|r| Ok((r.get(0)?, r.get(1)?)),
)
.optional()?;
let (title, artist) = row.ok_or(AppError::NotFound)?;
let role = chord_mode_to_role(mode);
let has: bool = conn
.query_row(
"SELECT 1 FROM attachments
WHERE piece_id = ?1 AND kind = 'image' AND role = ?2 LIMIT 1",
params![piece_id, role],
|_| Ok(true),
)
.optional()?
.unwrap_or(false);
Ok((title, artist, has))
}
async fn import_chord_png(s: &AppState, piece_id: i64, mode: &str) -> Result<i64, AppError> {
let src = std::path::PathBuf::from(format!("/data/chord-fetch/{piece_id}-{mode}.png"));
let meta = tokio::fs::metadata(&src).await.map_err(AppError::Io)?;
let size = meta.len() as i64;
let role = chord_mode_to_role(mode);
let filename = format!("{role}.png");
let attachment_id = {
let conn = s.db.lock().unwrap();
conn.execute(
"INSERT INTO attachments
(piece_id, kind, role, mime, filename, size_bytes, sort_order)
VALUES (?1, 'image', ?2, 'image/png', ?3, ?4,
COALESCE((SELECT MAX(sort_order) FROM attachments WHERE piece_id = ?1), 0) + 1)",
params![piece_id, role, filename, size],
)?;
conn.last_insert_rowid()
};
let dst = s.blobs_dir.join(attachment_id.to_string());
if let Err(e) = tokio::fs::copy(&src, &dst).await {
let conn = s.db.lock().unwrap();
let _ = conn.execute("DELETE FROM attachments WHERE id = ?1", params![attachment_id]);
return Err(AppError::Io(e));
}
Ok(attachment_id)
}
// ---------- handlers: attachments ----------
/// `POST /api/pieces/:id/attachments?role=chord|numbered|staff` — multipart 流式上传。
/// 每个 file field(任意 name= 一个附件,`role` query 给整批文件。
async fn upload_attachments(
State(s): State<AppState>,
Path(piece_id): Path<i64>,
Query(q): Query<UploadQuery>,
mut form: Multipart,
) -> Result<JsonResp<Value>, AppError> {
let role = match q.role.as_deref().map(str::trim).filter(|s| !s.is_empty()) {
None => None,
Some(r)
if matches!(
r,
"chord" | "chord_letters" | "chord_functional" | "numbered" | "staff"
) =>
{
Some(r.to_string())
}
Some(other) => {
return Err(AppError::bad_request(format!(
"unsupported role '{other}', expect one of: chord / chord_letters / chord_functional / numbered / staff"
)));
}
};
{
let conn = s.db.lock().unwrap();
let exists: bool = conn
.query_row(
"SELECT 1 FROM pieces WHERE id = ?1",
params![piece_id],
|_| Ok(true),
)
.optional()?
.unwrap_or(false);
if !exists {
return Err(AppError::NotFound);
}
}
let mut created: Vec<Value> = Vec::new();
while let Some(mut field) = form
.next_field()
.await
.map_err(|e| AppError::bad_request(format!("multipart: {e}")))?
{
let filename = field
.file_name()
.map(|s| s.to_string())
.unwrap_or_else(|| "untitled".to_string());
let mime = field
.content_type()
.map(|s| s.to_string())
.unwrap_or_else(|| "application/octet-stream".to_string());
let kind = classify(&mime).ok_or_else(|| {
AppError::bad_request(format!("unsupported mime '{mime}' for '{filename}'"))
})?;
// 占坑拿 attachment id —— 文件名用 id,能唯一确定路径。
let attachment_id = {
let conn = s.db.lock().unwrap();
conn.execute(
"INSERT INTO attachments (piece_id, kind, role, mime, filename, size_bytes, sort_order)
VALUES (?1, ?2, ?3, ?4, ?5, 0,
COALESCE((SELECT MAX(sort_order) FROM attachments WHERE piece_id = ?1), 0) + 1)",
params![piece_id, kind, role, mime, filename],
)?;
conn.last_insert_rowid()
};
let final_path = s.blobs_dir.join(attachment_id.to_string());
let tmp_path = s.blobs_dir.join(format!("{attachment_id}.tmp"));
let written: usize = match stream_to_file(&mut field, &tmp_path).await {
Ok(n) => n,
Err(e) => {
let _ = tokio::fs::remove_file(&tmp_path).await;
let conn = s.db.lock().unwrap();
let _ = conn.execute(
"DELETE FROM attachments WHERE id = ?1",
params![attachment_id],
);
return Err(e);
}
};
if let Err(e) = tokio::fs::rename(&tmp_path, &final_path).await {
let _ = tokio::fs::remove_file(&tmp_path).await;
let conn = s.db.lock().unwrap();
let _ = conn.execute(
"DELETE FROM attachments WHERE id = ?1",
params![attachment_id],
);
return Err(AppError::Io(e));
}
{
let conn = s.db.lock().unwrap();
conn.execute(
"UPDATE attachments SET size_bytes = ?1 WHERE id = ?2",
params![written as i64, attachment_id],
)?;
}
created.push(json!({
"id": attachment_id,
"kind": kind,
"role": role,
"mime": mime,
"filename": filename,
"size_bytes": written,
}));
}
if created.is_empty() {
return Err(AppError::bad_request("no files uploaded"));
}
Ok(JsonResp(json!({ "attachments": created })))
}
async fn stream_to_file(
field: &mut axum::extract::multipart::Field<'_>,
path: &std::path::Path,
) -> Result<usize, AppError> {
let mut file = tokio::fs::File::create(path).await.map_err(AppError::Io)?;
let mut total: usize = 0;
while let Some(chunk) = field
.chunk()
.await
.map_err(|e| AppError::bad_request(format!("upload read: {e}")))?
{
total += chunk.len();
if total > SINGLE_FILE_BYTES {
return Err(AppError::bad_request(format!(
"single file exceeds {SINGLE_FILE_BYTES} bytes"
)));
}
file.write_all(&chunk).await.map_err(AppError::Io)?;
}
file.flush().await.map_err(AppError::Io)?;
file.sync_all().await.map_err(AppError::Io)?;
Ok(total)
}
/// `GET /api/attachments/:id` — Range-aware 下载。
async fn get_attachment(
State(s): State<AppState>,
Path(id): Path<i64>,
req: Request<Body>,
) -> Result<Response, AppError> {
let row: Option<(String, String, String)> = {
let conn = s.db.lock().unwrap();
conn.query_row(
"SELECT mime, filename, kind FROM attachments WHERE id = ?1",
params![id],
|r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
)
.optional()?
};
let (mime, filename, _kind) = row.ok_or(AppError::NotFound)?;
let path = s.blobs_dir.join(id.to_string());
let mime_hv: header::HeaderValue = mime
.parse()
.unwrap_or_else(|_| header::HeaderValue::from_static("application/octet-stream"));
let svc = tower_http::services::ServeFile::new(&path);
let mut resp = svc
.oneshot(req)
.await
.map_err(|e| AppError::Io(std::io::Error::other(e.to_string())))?
.into_response();
// 强一些的缓存头,video 拖动友好
resp.headers_mut()
.insert(header::CACHE_CONTROL, header::HeaderValue::from_static("private, max-age=3600"));
resp.headers_mut().insert(header::CONTENT_TYPE, mime_hv);
if let Ok(disp) = format!(
"inline; filename*=UTF-8''{}",
percent_encode(&filename)
)
.parse()
{
resp.headers_mut().insert(header::CONTENT_DISPOSITION, disp);
}
Ok(resp)
}
async fn delete_attachment(
State(s): State<AppState>,
Path(id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
let n = {
let conn = s.db.lock().unwrap();
conn.execute("DELETE FROM attachments WHERE id = ?1", params![id])?
};
if n == 0 {
return Err(AppError::NotFound);
}
let _ = tokio::fs::remove_file(s.blobs_dir.join(id.to_string())).await;
Ok(JsonResp(json!({ "ok": true })))
}
// ---------- helpers ----------
fn classify(mime: &str) -> Option<&'static str> {
let m = mime.split(';').next().unwrap_or("").trim().to_ascii_lowercase();
if m.starts_with("video/") {
Some("video")
} else if m.starts_with("audio/") {
Some("audio")
} else if m == "application/pdf" {
Some("pdf")
} else if m.starts_with("image/") {
Some("image")
} else {
None
}
}
fn percent_encode(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for b in s.as_bytes() {
match b {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
out.push(*b as char)
}
_ => out.push_str(&format!("%{:02X}", b)),
}
}
out
}
// ---------- error type ----------
enum AppError {
BadRequest(String),
NotFound,
Db(rusqlite::Error),
Io(std::io::Error),
Sidecar(String),
}
impl AppError {
fn bad_request(msg: impl Into<String>) -> Self {
Self::BadRequest(msg.into())
}
fn sidecar(msg: impl Into<String>) -> Self {
Self::Sidecar(msg.into())
}
}
impl From<rusqlite::Error> for AppError {
fn from(e: rusqlite::Error) -> Self {
Self::Db(e)
}
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
match self {
Self::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg).into_response(),
Self::NotFound => (StatusCode::NOT_FOUND, "not found").into_response(),
Self::Db(e) => {
tracing::error!(error = %e, "sqlite error");
(StatusCode::INTERNAL_SERVER_ERROR, "db error").into_response()
}
Self::Io(e) => {
tracing::error!(error = %e, "io error");
(StatusCode::INTERNAL_SERVER_ERROR, "io error").into_response()
}
Self::Sidecar(msg) => {
tracing::warn!(error = %msg, "chord sidecar");
(StatusCode::BAD_GATEWAY, msg).into_response()
}
}
}
}