From 4d88e80f1c67292a6dcf37c2f7377aee4614c9de Mon Sep 17 00:00:00 2001 From: Fam Zheng Date: Sun, 5 Apr 2026 08:20:32 +0100 Subject: [PATCH] add streaming responses, file transfer, remote deploy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Streaming: use claude --output-format stream-json, edit TG message every 5s with progress, show tool use status during execution, ◎ cursor indicator while processing - File transfer: download user uploads to ~/incoming/, scan ~/outgoing/{sid}/ for new files after claude completes - Error handling: wrap post-auth logic in handle_inner, all errors reply to user instead of silently failing - Remote deploy: make deploy-hera via SSH, generate service from template with dynamic PATH/REPO - Service: binary installed to ~/bin/noc, WorkingDirectory=%h - Invoke claude directly instead of ms wrapper - Session state persisted to disk across restarts --- .gitignore | 1 + Makefile | 21 ++- doc/todo.md | 8 +- noc.service.in | 2 +- src/main.rs | 414 ++++++++++++++++++++++++++++++++++++++++++++----- 5 files changed, 402 insertions(+), 44 deletions(-) diff --git a/.gitignore b/.gitignore index ac6b74c..f775163 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /target config.yaml +config.hera.yaml state.json noc.service diff --git a/Makefile b/Makefile index 75feeae..424d012 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,8 @@ REPO := $(shell pwd) +HERA := heradev +HERA_DIR := noc -.PHONY: build deploy +.PHONY: build deploy deploy-hera build: cargo build --release @@ -9,8 +11,23 @@ noc.service: noc.service.in sed -e 's|@REPO@|$(REPO)|g' -e 's|@PATH@|$(PATH)|g' $< > $@ deploy: build noc.service - mkdir -p ~/.config/systemd/user + mkdir -p ~/bin ~/.config/systemd/user + systemctl --user stop noc 2>/dev/null || true + cp target/release/noc ~/bin/ cp noc.service ~/.config/systemd/user/ systemctl --user daemon-reload systemctl --user enable --now noc systemctl --user restart noc + +deploy-hera: build + ssh $(HERA) 'mkdir -p ~/bin ~/$(HERA_DIR) ~/.config/systemd/user && systemctl --user stop noc 2>/dev/null || true' + scp target/release/noc $(HERA):~/bin/ + scp config.hera.yaml noc.service.in $(HERA):~/$(HERA_DIR)/ + ssh $(HERA) 'bash -lc "\ + cd ~/$(HERA_DIR) \ + && mv -f config.hera.yaml config.yaml \ + && sed -e \"s|@REPO@|\$$HOME/$(HERA_DIR)|g\" -e \"s|@PATH@|\$$PATH|g\" noc.service.in > ~/.config/systemd/user/noc.service \ + && systemctl --user daemon-reload \ + && systemctl --user enable --now noc \ + && systemctl --user restart noc \ + && systemctl --user status noc"' diff --git a/doc/todo.md b/doc/todo.md index 4f46a9e..701f7d0 100644 --- a/doc/todo.md +++ b/doc/todo.md @@ -1,8 +1,10 @@ # TODO -- [ ] Streaming responses — edit message as `ms` output arrives instead of waiting for full completion -- [ ] Markdown formatting — parse `ms` output and send with TG MarkdownV2 -- [ ] Timeout handling — kill `ms` if it hangs beyond a threshold +- [ ] Streaming responses — edit message as claude output arrives instead of waiting for full completion +- [ ] Markdown formatting — parse claude output and send with TG MarkdownV2 +- [ ] Timeout handling — kill claude if it hangs beyond a threshold - [ ] Graceful shutdown on SIGTERM - [ ] `/reset` command to force new session without waiting for 5am - [ ] Rate limiting per chat +- [ ] Voice message support — STT (whisper.cpp) → text → claude +- [ ] Video/audio file transcription diff --git a/noc.service.in b/noc.service.in index 65c2044..c0021dd 100644 --- a/noc.service.in +++ b/noc.service.in @@ -6,7 +6,7 @@ Wants=network-online.target [Service] Type=simple WorkingDirectory=%h -ExecStart=@REPO@/target/release/noc +ExecStart=%h/bin/noc Restart=on-failure RestartSec=5 Environment=RUST_LOG=noc=info diff --git a/src/main.rs b/src/main.rs index f1e8eca..c6653f5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use std::collections::{HashMap, HashSet}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::process::Stdio; use std::sync::Arc; @@ -7,10 +7,13 @@ use anyhow::Result; use chrono::{Local, NaiveDate, NaiveTime}; use serde::{Deserialize, Serialize}; use teloxide::dispatching::UpdateFilterExt; +use teloxide::net::Download; use teloxide::prelude::*; -use teloxide::types::ChatAction; +use teloxide::types::InputFile; +use tokio::io::AsyncBufReadExt; use tokio::process::Command; use tokio::sync::RwLock; +use tokio::time::Instant; use tracing::{error, info, warn}; use uuid::Uuid; @@ -92,6 +95,18 @@ fn session_uuid(chat_id: i64, refresh_hour: u32) -> String { Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()).to_string() } +fn home_dir() -> PathBuf { + PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/tmp".into())) +} + +fn incoming_dir() -> PathBuf { + home_dir().join("incoming") +} + +fn outgoing_dir(sid: &str) -> PathBuf { + home_dir().join("outgoing").join(sid) +} + // ── main ──────────────────────────────────────────────────────────── #[tokio::main] @@ -114,6 +129,8 @@ async fn main() { .unwrap_or_else(|_| PathBuf::from("state.json")); let state = Arc::new(AppState::load(state_path)); + let _ = std::fs::create_dir_all(incoming_dir()); + info!("noc bot starting"); let bot = Bot::new(&config.tg.key); @@ -127,6 +144,50 @@ async fn main() { .await; } +// ── file download ─────────────────────────────────────────────────── + +async fn download_tg_file(bot: &Bot, file_id: &str, filename: &str) -> Result { + let dir = incoming_dir(); + tokio::fs::create_dir_all(&dir).await?; + + let dest = dir.join(filename); + let tf = bot.get_file(file_id).await?; + let mut file = tokio::fs::File::create(&dest).await?; + bot.download_file(&tf.path, &mut file).await?; + + info!("downloaded {} -> {}", filename, dest.display()); + Ok(dest) +} + +// ── outgoing scan ─────────────────────────────────────────────────── + +async fn snapshot_dir(dir: &Path) -> HashSet { + let mut set = HashSet::new(); + if let Ok(mut entries) = tokio::fs::read_dir(dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let path = entry.path(); + if path.is_file() { + set.insert(path); + } + } + } + set +} + +async fn new_files_in(dir: &Path, before: &HashSet) -> Vec { + let mut files = Vec::new(); + if let Ok(mut entries) = tokio::fs::read_dir(dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let path = entry.path(); + if path.is_file() && !before.contains(&path) { + files.push(path); + } + } + } + files.sort(); + files +} + // ── handler ───────────────────────────────────────────────────────── async fn handle( @@ -135,16 +196,11 @@ async fn handle( state: Arc, config: Arc, ) -> ResponseResult<()> { - let text = match msg.text() { - Some(t) => t, - None => return Ok(()), - }; - let chat_id = msg.chat.id; + let text = msg.text().or(msg.caption()).unwrap_or("").to_string(); let raw_id = chat_id.0; let date = session_date(config.session.refresh_hour); - // auth gate let is_authed = { let p = state.persist.read().await; p.authed.get(&raw_id) == Some(&date) @@ -165,76 +221,358 @@ async fn handle( return Ok(()); } - let sid = session_uuid(raw_id, config.session.refresh_hour); + if let Err(e) = handle_inner(&bot, &msg, chat_id, &text, &state, &config).await { + error!(chat = raw_id, "handle: {e:#}"); + let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; + } + + Ok(()) +} + +async fn handle_inner( + bot: &Bot, + msg: &Message, + chat_id: ChatId, + text: &str, + state: &Arc, + config: &Arc, +) -> Result<()> { + let mut uploaded: Vec = Vec::new(); + let mut download_errors: Vec = Vec::new(); + + if let Some(doc) = msg.document() { + let name = doc.file_name.as_deref().unwrap_or("file"); + match download_tg_file(bot, &doc.file.id, name).await { + Ok(p) => uploaded.push(p), + Err(e) => download_errors.push(format!("{name}: {e:#}")), + } + } + + if let Some(photos) = msg.photo() { + if let Some(photo) = photos.last() { + let name = format!("photo_{}.jpg", Local::now().format("%H%M%S")); + match download_tg_file(bot, &photo.file.id, &name).await { + Ok(p) => uploaded.push(p), + Err(e) => download_errors.push(format!("photo: {e:#}")), + } + } + } + + if text.is_empty() && uploaded.is_empty() { + if !download_errors.is_empty() { + let err_msg = format!("[文件下载失败]\n{}", download_errors.join("\n")); + bot.send_message(chat_id, err_msg).await?; + } + return Ok(()); + } + + let sid = session_uuid(chat_id.0, config.session.refresh_hour); info!(%sid, "recv"); - let _ = bot.send_chat_action(chat_id, ChatAction::Typing).await; + let out_dir = outgoing_dir(&sid); + tokio::fs::create_dir_all(&out_dir).await?; + let before = snapshot_dir(&out_dir).await; + + let prompt = build_prompt(text, &uploaded, &download_errors, &out_dir); let known = state.persist.read().await.known_sessions.contains(&sid); - let reply = match invoke_claude(&sid, text, known).await { - Ok(out) => { + let result = invoke_claude_streaming(&sid, &prompt, known, bot, chat_id).await; + + match &result { + Ok(_) => { if !known { state.persist.write().await.known_sessions.insert(sid.clone()); state.save().await; } - out } Err(e) => { error!(%sid, "claude: {e:#}"); - format!("[error] {e:#}") + let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; } - }; - - if reply.is_empty() { - return Ok(()); } - for chunk in split_msg(&reply, 4096) { - bot.send_message(chat_id, chunk).await?; + // send new files from outgoing dir + let new_files = new_files_in(&out_dir, &before).await; + for path in &new_files { + info!(%sid, "sending file: {}", path.display()); + if let Err(e) = bot.send_document(chat_id, InputFile::file(path)).await { + error!(%sid, "send_document {}: {e:#}", path.display()); + let _ = bot + .send_message(chat_id, format!("[发送文件失败: {}]", path.display())) + .await; + } } + Ok(()) } -// ── claude bridge ─────────────────────────────────────────────────── +fn build_prompt(text: &str, uploaded: &[PathBuf], errors: &[String], out_dir: &Path) -> String { + let mut parts = Vec::new(); -async fn invoke_claude(sid: &str, prompt: &str, known: bool) -> Result { - if known { - return run_claude(&["--resume", sid], prompt).await; + for f in uploaded { + parts.push(format!("[用户上传了文件: {}]", f.display())); } - // session might exist from before restart — try resume first - match run_claude(&["--resume", sid], prompt).await { + for e in errors { + parts.push(format!("[文件下载失败: {e}]")); + } + + if !text.is_empty() { + parts.push(text.to_string()); + } + + parts.push(format!( + "\n[系统提示: 如果需要发送文件给用户,将文件写入 {} 目录]", + out_dir.display() + )); + + parts.join("\n") +} + +// ── claude bridge (streaming) ─────────────────────────────────────── + +/// Stream JSON event types we care about. +#[derive(Deserialize)] +struct StreamEvent { + #[serde(rename = "type")] + event_type: String, + message: Option, + result: Option, + #[serde(default)] + is_error: bool, +} + +#[derive(Deserialize)] +struct AssistantMessage { + content: Vec, +} + +#[derive(Deserialize)] +struct ContentBlock { + #[serde(rename = "type")] + block_type: String, + text: Option, + name: Option, + input: Option, +} + +/// Extract all text from an assistant message's content blocks. +fn extract_text(msg: &AssistantMessage) -> String { + msg.content + .iter() + .filter(|b| b.block_type == "text") + .filter_map(|b| b.text.as_deref()) + .collect::>() + .join("") +} + +/// Extract tool use status line, e.g. "Bash: echo hello" +fn extract_tool_use(msg: &AssistantMessage) -> Option { + for block in &msg.content { + if block.block_type == "tool_use" { + let name = block.name.as_deref().unwrap_or("tool"); + let detail = block + .input + .as_ref() + .and_then(|v| { + // try common fields: command, pattern, file_path, query + v.get("command") + .or(v.get("pattern")) + .or(v.get("file_path")) + .or(v.get("query")) + .or(v.get("prompt")) + .and_then(|s| s.as_str()) + }) + .unwrap_or(""); + let detail_short = truncate_at_char_boundary(detail, 80); + return Some(format!("{name}: {detail_short}")); + } + } + None +} + +const EDIT_INTERVAL_MS: u64 = 5000; +const TG_MSG_LIMIT: usize = 4096; + +async fn invoke_claude_streaming( + sid: &str, + prompt: &str, + known: bool, + bot: &Bot, + chat_id: ChatId, +) -> Result { + if known { + return run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await; + } + + match run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await { Ok(out) => { info!(%sid, "resumed existing session"); Ok(out) } Err(e) => { warn!(%sid, "resume failed ({e:#}), creating new session"); - run_claude(&["--session-id", sid], prompt).await + run_claude_streaming(&["--session-id", sid], prompt, bot, chat_id).await } } } -async fn run_claude(extra_args: &[&str], prompt: &str) -> Result { - let mut args: Vec<&str> = vec!["--dangerously-skip-permissions", "-p"]; +async fn run_claude_streaming( + extra_args: &[&str], + prompt: &str, + bot: &Bot, + chat_id: ChatId, +) -> Result { + let mut args: Vec<&str> = vec![ + "--dangerously-skip-permissions", + "-p", + "--output-format", + "stream-json", + "--verbose", + ]; args.extend(extra_args); args.push(prompt); - let out = Command::new("claude") + let mut child = Command::new("claude") .args(&args) .stdout(Stdio::piped()) .stderr(Stdio::piped()) - .spawn()? - .wait_with_output() - .await?; + .spawn()?; - if out.status.success() { - Ok(String::from_utf8_lossy(&out.stdout).to_string()) - } else { - let err = String::from_utf8_lossy(&out.stderr); - anyhow::bail!("exit {}: {err}", out.status) + let stdout = child.stdout.take().unwrap(); + let mut lines = tokio::io::BufReader::new(stdout).lines(); + + // send placeholder immediately so user knows we're on it + let mut msg_id: Option = match bot.send_message(chat_id, CURSOR).await { + Ok(sent) => Some(sent.id), + Err(_) => None, + }; + let mut last_sent_text = String::new(); + let mut last_edit = Instant::now(); + let mut final_result = String::new(); + let mut is_error = false; + let mut tool_status = String::new(); // current tool use status line + + while let Ok(Some(line)) = lines.next_line().await { + let event: StreamEvent = match serde_json::from_str(&line) { + Ok(e) => e, + Err(_) => continue, + }; + + match event.event_type.as_str() { + "assistant" => { + if let Some(msg) = &event.message { + // check for tool use — show status + if let Some(status) = extract_tool_use(msg) { + tool_status = format!("[{status}]"); + let display = if last_sent_text.is_empty() { + tool_status.clone() + } else { + format!("{last_sent_text}\n\n{tool_status}") + }; + let display = truncate_for_display(&display); + + if let Some(id) = msg_id { + let _ = bot.edit_message_text(chat_id, id, &display).await; + } else if let Ok(sent) = bot.send_message(chat_id, &display).await { + msg_id = Some(sent.id); + } + last_edit = Instant::now(); + continue; + } + + // check for text content + let text = extract_text(msg); + if text.is_empty() || text == last_sent_text { + continue; + } + + // throttle edits + if last_edit.elapsed().as_millis() < EDIT_INTERVAL_MS as u128 { + continue; + } + + tool_status.clear(); + let display = truncate_for_display(&text); + + if let Some(id) = msg_id { + if bot.edit_message_text(chat_id, id, &display).await.is_ok() { + last_sent_text = text; + last_edit = Instant::now(); + } + } else if let Ok(sent) = bot.send_message(chat_id, &display).await { + msg_id = Some(sent.id); + last_sent_text = text; + last_edit = Instant::now(); + } + } + } + "result" => { + final_result = event.result.unwrap_or_default(); + is_error = event.is_error; + } + _ => {} + } } + + let _ = child.wait().await; + + if is_error { + // clean up streaming message if we sent one + if let Some(id) = msg_id { + let _ = bot + .edit_message_text(chat_id, id, format!("[error] {final_result}")) + .await; + } + anyhow::bail!("{final_result}"); + } + + if final_result.is_empty() { + return Ok(final_result); + } + + // final update: replace streaming message with complete result + let chunks: Vec<&str> = split_msg(&final_result, TG_MSG_LIMIT); + + if let Some(id) = msg_id { + // edit first message with final text + let _ = bot.edit_message_text(chat_id, id, chunks[0]).await; + // send remaining chunks as new messages + for chunk in &chunks[1..] { + let _ = bot.send_message(chat_id, *chunk).await; + } + } else { + // never got to send a streaming message, send all now + for chunk in &chunks { + let _ = bot.send_message(chat_id, *chunk).await; + } + } + + Ok(final_result) +} + +const CURSOR: &str = " \u{25CE}"; + +fn truncate_for_display(s: &str) -> String { + let budget = TG_MSG_LIMIT - CURSOR.len() - 1; + if s.len() <= budget { + format!("{s}{CURSOR}") + } else { + let truncated = truncate_at_char_boundary(s, budget - 2); + format!("{truncated}\n…{CURSOR}") + } +} + +fn truncate_at_char_boundary(s: &str, max: usize) -> &str { + if s.len() <= max { + return s; + } + let mut end = max; + while !s.is_char_boundary(end) { + end -= 1; + } + &s[..end] } fn split_msg(s: &str, max: usize) -> Vec<&str> {