From f646391f1450911ed8512d91779a948d8b1c2fe6 Mon Sep 17 00:00:00 2001 From: Fam Zheng Date: Fri, 10 Apr 2026 16:54:39 +0000 Subject: [PATCH] extract Output trait: decouple AI core from Telegram MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add src/output.rs with Output trait and 3 implementations: TelegramOutput (streaming via draft/edit), GiteaOutput (comments), BufferOutput (for worker/tests) - Refactor run_openai_with_tools and execute_tool to use &mut dyn Output - Remove run_claude_streaming, invoke_claude_streaming, run_openai_streaming (dead code — only OpenAI-compatible backend is used now) - Remove BackendConfig::Claude code path from handler - stream.rs: 790 → 150 lines --- Cargo.lock | 12 ++ Cargo.toml | 1 + src/life.rs | 6 +- src/main.rs | 209 +++++++++------------ src/output.rs | 207 +++++++++++++++++++++ src/stream.rs | 494 +------------------------------------------------- src/tools.rs | 117 +++--------- 7 files changed, 344 insertions(+), 702 deletions(-) create mode 100644 src/output.rs diff --git a/Cargo.lock b/Cargo.lock index cf653dc..fc39022 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -51,6 +51,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1045,6 +1056,7 @@ name = "noc" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "axum", "base64 0.22.1", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 3ca2c3b..9e4f3df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] anyhow = "1" +async-trait = "0.1" axum = "0.8" base64 = "0.22" chrono = { version = "0.4", features = ["serde"] } diff --git a/src/life.rs b/src/life.rs index f4a4b2d..3c81cea 100644 --- a/src/life.rs +++ b/src/life.rs @@ -4,6 +4,7 @@ use teloxide::prelude::*; use tracing::{error, info, warn}; use crate::config::{BackendConfig, Config}; +use crate::output::TelegramOutput; use crate::state::AppState; use crate::stream::run_openai_with_tools; use crate::tools::compute_next_cron_fire; @@ -62,12 +63,13 @@ pub async fn life_loop(bot: Bot, state: Arc, config: Arc) { } = config.backend { let sid = format!("life-{chat_id_raw}"); + let mut tg_output = TelegramOutput::new(bot.clone(), chat_id, true); let result = tokio::time::timeout( std::time::Duration::from_secs(LIFE_LOOP_TIMEOUT_SECS), run_openai_with_tools( - endpoint, model, api_key, messages, &bot, chat_id, &state, &sid, - &config, true, + endpoint, model, api_key, messages, &mut tg_output, &state, &sid, + &config, *chat_id_raw, ), ) .await; diff --git a/src/main.rs b/src/main.rs index a9535a8..2e559a2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ mod config; mod display; mod gitea; mod life; +mod output; mod state; mod stream; mod tools; @@ -21,12 +22,9 @@ use uuid::Uuid; use config::{BackendConfig, Config}; use display::build_user_content; +use output::TelegramOutput; use state::{AppState, MAX_WINDOW, SLIDE_SIZE}; -use stream::{ - build_system_prompt, invoke_claude_streaming, run_claude_streaming, run_openai_with_tools, - summarize_messages, -}; -use tools::discover_tools; +use stream::{build_system_prompt, run_openai_with_tools, summarize_messages}; // ── helpers ───────────────────────────────────────────────────────── @@ -315,7 +313,7 @@ async fn handle_inner( let count = state.message_count(&sid).await; let persona = state.get_config("persona").await.unwrap_or_default(); let scratch = state.get_scratch().await; - let tools = discover_tools(); + let tools = tools::discover_tools(); let empty = vec![]; let tools_arr = tools.as_array().unwrap_or(&empty); @@ -373,126 +371,97 @@ async fn handle_inner( } } - // handle "cc" prefix: pass directly to claude -p, no session, no history - if let Some(cc_prompt) = text.strip_prefix("cc").map(|s| s.trim_start()) { - if !cc_prompt.is_empty() { - info!(%sid, "cc passthrough"); - let prompt = build_prompt(cc_prompt, &uploaded, &download_errors, &transcriptions); - match run_claude_streaming(&[], &prompt, bot, chat_id).await { - Ok(_) => {} - Err(e) => { - error!(%sid, "cc claude: {e:#}"); - let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; - } - } - return Ok(()); - } - } - let prompt = build_prompt(text, &uploaded, &download_errors, &transcriptions); - match &config.backend { - BackendConfig::Claude => { - let known = state.persist.read().await.known_sessions.contains(&sid); - let result = - invoke_claude_streaming(&sid, &prompt, known, bot, chat_id).await; - match &result { - Ok(_) => { - if !known { - state.persist.write().await.known_sessions.insert(sid.clone()); - state.save().await; + let BackendConfig::OpenAI { + endpoint, + model, + api_key, + } = &config.backend + else { + let _ = bot.send_message(chat_id, "Only OpenAI backend is supported").await; + return Ok(()); + }; + + let conv = state.load_conv(&sid).await; + let persona = state.get_config("persona").await.unwrap_or_default(); + let memory_slots = state.get_memory_slots().await; + let inner = state.get_inner_state().await; + let system_msg = build_system_prompt(&conv.summary, &persona, &memory_slots, &inner); + + let mut api_messages = vec![system_msg]; + api_messages.extend(conv.messages); + + let scratch = state.get_scratch().await; + let user_content = build_user_content(&prompt, &scratch, &uploaded); + api_messages.push(serde_json::json!({"role": "user", "content": user_content})); + + let mut tg_output = TelegramOutput::new(bot.clone(), chat_id, is_private); + + match run_openai_with_tools( + endpoint, model, api_key, api_messages, &mut tg_output, state, &sid, config, chat_id.0, + ) + .await + { + Ok(response) => { + state.push_message(&sid, "user", &prompt).await; + if !response.is_empty() { + state.push_message(&sid, "assistant", &response).await; + } + + // sliding window + let count = state.message_count(&sid).await; + if count >= MAX_WINDOW { + info!(%sid, "sliding window: {count} messages, summarizing oldest {SLIDE_SIZE}"); + let _ = bot + .send_message(chat_id, "[整理记忆中...]") + .await; + + let to_summarize = + state.get_oldest_messages(&sid, SLIDE_SIZE).await; + let current_summary = { + let db = state.db.lock().await; + db.query_row( + "SELECT summary FROM conversations WHERE session_id = ?1", + [&sid], + |row| row.get::<_, String>(0), + ) + .unwrap_or_default() + }; + + match summarize_messages( + endpoint, + model, + api_key, + ¤t_summary, + &to_summarize, + ) + .await + { + Ok(new_summary) => { + state.slide_window(&sid, &new_summary, SLIDE_SIZE).await; + let remaining = state.message_count(&sid).await; + info!(%sid, "window slid, {remaining} messages remain, summary {} chars", new_summary.len()); + } + Err(e) => { + warn!(%sid, "summarize failed: {e:#}, keeping all messages"); } } - Err(e) => { - error!(%sid, "claude: {e:#}"); - let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; - } + } + + // auto-reflect every 10 messages + let count = state.message_count(&sid).await; + if count % 10 == 0 && count > 0 { + let state_c = state.clone(); + let config_c = config.clone(); + tokio::spawn(async move { + crate::life::reflect(&state_c, &config_c).await; + }); } } - BackendConfig::OpenAI { - endpoint, - model, - api_key, - } => { - let conv = state.load_conv(&sid).await; - let persona = state.get_config("persona").await.unwrap_or_default(); - let memory_slots = state.get_memory_slots().await; - let inner = state.get_inner_state().await; - let system_msg = build_system_prompt(&conv.summary, &persona, &memory_slots, &inner); - - let mut api_messages = vec![system_msg]; - api_messages.extend(conv.messages); - - let scratch = state.get_scratch().await; - let user_content = build_user_content(&prompt, &scratch, &uploaded); - api_messages.push(serde_json::json!({"role": "user", "content": user_content})); - - match run_openai_with_tools( - endpoint, model, api_key, api_messages, bot, chat_id, state, &sid, config, is_private, - ) - .await - { - Ok(response) => { - state.push_message(&sid, "user", &prompt).await; - if !response.is_empty() { - state.push_message(&sid, "assistant", &response).await; - } - - // sliding window - let count = state.message_count(&sid).await; - if count >= MAX_WINDOW { - info!(%sid, "sliding window: {count} messages, summarizing oldest {SLIDE_SIZE}"); - let _ = bot - .send_message(chat_id, "[整理记忆中...]") - .await; - - let to_summarize = - state.get_oldest_messages(&sid, SLIDE_SIZE).await; - let current_summary = { - let db = state.db.lock().await; - db.query_row( - "SELECT summary FROM conversations WHERE session_id = ?1", - [&sid], - |row| row.get::<_, String>(0), - ) - .unwrap_or_default() - }; - - match summarize_messages( - endpoint, - model, - api_key, - ¤t_summary, - &to_summarize, - ) - .await - { - Ok(new_summary) => { - state.slide_window(&sid, &new_summary, SLIDE_SIZE).await; - let remaining = state.message_count(&sid).await; - info!(%sid, "window slid, {remaining} messages remain, summary {} chars", new_summary.len()); - } - Err(e) => { - warn!(%sid, "summarize failed: {e:#}, keeping all messages"); - } - } - } - - // auto-reflect every 10 messages - let count = state.message_count(&sid).await; - if count % 10 == 0 && count > 0 { - let state_c = state.clone(); - let config_c = config.clone(); - tokio::spawn(async move { - crate::life::reflect(&state_c, &config_c).await; - }); - } - } - Err(e) => { - error!(%sid, "openai: {e:#}"); - let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; - } - } + Err(e) => { + error!(%sid, "openai: {e:#}"); + let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; } } diff --git a/src/output.rs b/src/output.rs new file mode 100644 index 0000000..916e935 --- /dev/null +++ b/src/output.rs @@ -0,0 +1,207 @@ +use anyhow::Result; +use async_trait::async_trait; +use std::path::Path; + +/// Output trait — abstraction over where AI responses go. +/// +/// Implementations: +/// - TelegramOutput: send/edit messages in Telegram chat +/// - GiteaOutput: post comments on issues/PRs +/// - BufferOutput: collect text in memory (for Worker, tests) +#[async_trait] +pub trait Output: Send + Sync { + /// Send or update streaming text. Called repeatedly as tokens arrive. + /// Implementation decides whether to create new message or edit existing one. + async fn stream_update(&mut self, text: &str) -> Result<()>; + + /// Finalize the message — called once when streaming is done. + async fn finalize(&mut self, text: &str) -> Result<()>; + + /// Send a status/notification line (e.g. "[tool: bash] running...") + async fn status(&self, text: &str) -> Result<()>; + + /// Send a file. Returns Ok(true) if sent, Ok(false) if not supported. + async fn send_file(&self, path: &Path, caption: &str) -> Result; +} + +// ── Telegram ─────────────────────────────────────────────────────── + +use teloxide::prelude::*; +use teloxide::types::InputFile; +use tokio::time::Instant; + +use crate::display::{truncate_at_char_boundary, truncate_for_display}; +use crate::stream::{send_message_draft, DRAFT_INTERVAL_MS, EDIT_INTERVAL_MS, TG_MSG_LIMIT}; + +pub struct TelegramOutput { + pub bot: Bot, + pub chat_id: ChatId, + pub is_private: bool, + // internal state + msg_id: Option, + use_draft: bool, + last_edit: Instant, + http: reqwest::Client, +} + +impl TelegramOutput { + pub fn new(bot: Bot, chat_id: ChatId, is_private: bool) -> Self { + Self { + bot, + chat_id, + is_private, + msg_id: None, + use_draft: is_private, + last_edit: Instant::now(), + http: reqwest::Client::new(), + } + } +} + +#[async_trait] +impl Output for TelegramOutput { + async fn stream_update(&mut self, text: &str) -> Result<()> { + let interval = if self.use_draft { + DRAFT_INTERVAL_MS + } else { + EDIT_INTERVAL_MS + }; + if self.last_edit.elapsed().as_millis() < interval as u128 { + return Ok(()); + } + + let display = if self.use_draft { + truncate_at_char_boundary(text, TG_MSG_LIMIT).to_string() + } else { + truncate_for_display(text) + }; + + if self.use_draft { + let token = self.bot.token().to_owned(); + match send_message_draft(&self.http, &token, self.chat_id.0, 1, &display).await { + Ok(_) => { + self.last_edit = Instant::now(); + } + Err(e) => { + tracing::warn!("sendMessageDraft failed, falling back: {e:#}"); + self.use_draft = false; + if let Ok(sent) = self.bot.send_message(self.chat_id, &display).await { + self.msg_id = Some(sent.id); + self.last_edit = Instant::now(); + } + } + } + } else if let Some(id) = self.msg_id { + if self + .bot + .edit_message_text(self.chat_id, id, &display) + .await + .is_ok() + { + self.last_edit = Instant::now(); + } + } else if let Ok(sent) = self.bot.send_message(self.chat_id, &display).await { + self.msg_id = Some(sent.id); + self.last_edit = Instant::now(); + } + + Ok(()) + } + + async fn finalize(&mut self, text: &str) -> Result<()> { + crate::display::send_final_result( + &self.bot, + self.chat_id, + self.msg_id, + self.use_draft, + text, + ) + .await; + Ok(()) + } + + async fn status(&self, text: &str) -> Result<()> { + let _ = self.bot.send_message(self.chat_id, text).await; + Ok(()) + } + + async fn send_file(&self, path: &Path, caption: &str) -> Result { + let input_file = InputFile::file(path); + let mut req = self.bot.send_document(self.chat_id, input_file); + if !caption.is_empty() { + req = req.caption(caption); + } + req.await?; + Ok(true) + } +} + +// ── Gitea ────────────────────────────────────────────────────────── + +use crate::gitea::GiteaClient; + +pub struct GiteaOutput { + pub client: GiteaClient, + pub owner: String, + pub repo: String, + pub issue_nr: u64, +} + +#[async_trait] +impl Output for GiteaOutput { + async fn stream_update(&mut self, _text: &str) -> Result<()> { + // Gitea comments don't support streaming — just accumulate + Ok(()) + } + + async fn finalize(&mut self, text: &str) -> Result<()> { + self.client + .post_comment(&self.owner, &self.repo, self.issue_nr, text) + .await + } + + async fn status(&self, _text: &str) -> Result<()> { + // No status updates for Gitea + Ok(()) + } + + async fn send_file(&self, _path: &Path, _caption: &str) -> Result { + // Gitea comments can't send files directly + Ok(false) + } +} + +// ── Buffer (for Worker, tests) ───────────────────────────────────── + +pub struct BufferOutput { + pub text: String, +} + +impl BufferOutput { + pub fn new() -> Self { + Self { + text: String::new(), + } + } +} + +#[async_trait] +impl Output for BufferOutput { + async fn stream_update(&mut self, text: &str) -> Result<()> { + self.text = text.to_string(); + Ok(()) + } + + async fn finalize(&mut self, text: &str) -> Result<()> { + self.text = text.to_string(); + Ok(()) + } + + async fn status(&self, _text: &str) -> Result<()> { + Ok(()) + } + + async fn send_file(&self, _path: &Path, _caption: &str) -> Result { + Ok(false) + } +} diff --git a/src/stream.rs b/src/stream.rs index d47bc55..46e5327 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,18 +1,11 @@ -use std::process::Stdio; use std::sync::Arc; use anyhow::Result; -use serde::Deserialize; -use teloxide::prelude::*; -use tokio::io::AsyncBufReadExt; -use tokio::process::Command; -use tokio::time::Instant; use tracing::{error, info, warn}; use crate::config::Config; -use crate::display::{ - send_final_result, truncate_at_char_boundary, truncate_for_display, -}; +use crate::display::truncate_at_char_boundary; +use crate::output::Output; use crate::state::AppState; use crate::tools::{discover_tools, execute_tool, ToolCall}; @@ -21,66 +14,6 @@ pub const DRAFT_INTERVAL_MS: u64 = 1000; pub const TG_MSG_LIMIT: usize = 4096; pub const CURSOR: &str = " \u{25CE}"; -/// Stream JSON event types we care about. -#[derive(Deserialize)] -pub struct StreamEvent { - #[serde(rename = "type")] - pub event_type: String, - pub message: Option, - pub result: Option, - #[serde(default)] - pub is_error: bool, -} - -#[derive(Deserialize)] -pub struct AssistantMessage { - pub content: Vec, -} - -#[derive(Deserialize)] -pub struct ContentBlock { - #[serde(rename = "type")] - pub block_type: String, - pub text: Option, - pub name: Option, - pub input: Option, -} - -/// Extract all text from an assistant message's content blocks. -pub fn extract_text(msg: &AssistantMessage) -> String { - msg.content - .iter() - .filter(|b| b.block_type == "text") - .filter_map(|b| b.text.as_deref()) - .collect::>() - .join("") -} - -/// Extract tool use status line, e.g. "Bash: echo hello" -pub fn extract_tool_use(msg: &AssistantMessage) -> Option { - for block in &msg.content { - if block.block_type == "tool_use" { - let name = block.name.as_deref().unwrap_or("tool"); - let detail = block - .input - .as_ref() - .and_then(|v| { - // try common fields: command, pattern, file_path, query - v.get("command") - .or(v.get("pattern")) - .or(v.get("file_path")) - .or(v.get("query")) - .or(v.get("prompt")) - .and_then(|s| s.as_str()) - }) - .unwrap_or(""); - let detail_short = truncate_at_char_boundary(detail, 80); - return Some(format!("{name}: {detail_short}")); - } - } - None -} - pub async fn send_message_draft( client: &reqwest::Client, token: &str, @@ -113,12 +46,11 @@ pub async fn run_openai_with_tools( model: &str, api_key: &str, mut messages: Vec, - bot: &Bot, - chat_id: ChatId, + output: &mut dyn Output, state: &Arc, sid: &str, config: &Arc, - is_private: bool, + chat_id: i64, ) -> Result { let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(120)) @@ -149,7 +81,6 @@ pub async fn run_openai_with_tools( if !resp_raw.status().is_success() { let status = resp_raw.status(); let body_text = resp_raw.text().await.unwrap_or_default(); - // dump messages for debugging for (i, m) in messages.iter().enumerate() { let role = m["role"].as_str().unwrap_or("?"); let content_len = m["content"].as_str().map(|s| s.len()).unwrap_or(0); @@ -162,15 +93,7 @@ pub async fn run_openai_with_tools( } let mut resp = resp_raw; - - let token = bot.token().to_owned(); - let raw_chat_id = chat_id.0; - let draft_id: i64 = 1; - let mut use_draft = is_private; // sendMessageDraft only works in private chats - - let mut msg_id: Option = None; let mut accumulated = String::new(); - let mut last_edit = Instant::now(); let mut buffer = String::new(); let mut done = false; @@ -206,14 +129,12 @@ pub async fn run_openai_with_tools( if let Ok(json) = serde_json::from_str::(data) { let delta = &json["choices"][0]["delta"]; - // handle content delta if let Some(content) = delta["content"].as_str() { if !content.is_empty() { accumulated.push_str(content); } } - // handle tool call delta if let Some(tc_arr) = delta["tool_calls"].as_array() { has_tool_calls = true; for tc in tc_arr { @@ -237,70 +158,15 @@ pub async fn run_openai_with_tools( } } - // display update (only when there's content to show) - if accumulated.is_empty() { - continue; + if !accumulated.is_empty() { + let _ = output.stream_update(&accumulated).await; } - - { - - let interval = if use_draft { - DRAFT_INTERVAL_MS - } else { - EDIT_INTERVAL_MS - }; - if last_edit.elapsed().as_millis() < interval as u128 { - continue; - } - - let display = if use_draft { - truncate_at_char_boundary(&accumulated, TG_MSG_LIMIT).to_string() - } else { - truncate_for_display(&accumulated) - }; - - if use_draft { - match send_message_draft( - &client, &token, raw_chat_id, draft_id, &display, - ) - .await - { - Ok(_) => { - last_edit = Instant::now(); - } - Err(e) => { - warn!("sendMessageDraft failed, falling back: {e:#}"); - use_draft = false; - if let Ok(sent) = - bot.send_message(chat_id, &display).await - { - msg_id = Some(sent.id); - last_edit = Instant::now(); - } - } - } - } else if let Some(id) = msg_id { - if bot - .edit_message_text(chat_id, id, &display) - .await - .is_ok() - { - last_edit = Instant::now(); - } - } else if let Ok(sent) = - bot.send_message(chat_id, &display).await - { - msg_id = Some(sent.id); - last_edit = Instant::now(); - } - } // end display block } } } // decide what to do based on response type if has_tool_calls && !tool_calls.is_empty() { - // append assistant message with tool calls let tc_json: Vec = tool_calls .iter() .map(|tc| { @@ -322,15 +188,14 @@ pub async fn run_openai_with_tools( }); messages.push(assistant_msg); - // execute each tool for tc in &tool_calls { info!(tool = %tc.name, "executing tool call"); - let _ = bot - .send_message(chat_id, format!("[{}({})]", tc.name, truncate_at_char_boundary(&tc.arguments, 100))) + let _ = output + .status(&format!("[{}({})]", tc.name, truncate_at_char_boundary(&tc.arguments, 100))) .await; let result = - execute_tool(&tc.name, &tc.arguments, state, bot, chat_id, sid, config) + execute_tool(&tc.name, &tc.arguments, state, output, sid, config, chat_id) .await; messages.push(serde_json::json!({ @@ -340,357 +205,18 @@ pub async fn run_openai_with_tools( })); } - // clear display state for next round tool_calls.clear(); - // loop back to call API again continue; } - // content response — send final result if !accumulated.is_empty() { - send_final_result(bot, chat_id, msg_id, use_draft, &accumulated).await; + let _ = output.finalize(&accumulated).await; } return Ok(accumulated); } } -// ── claude bridge (streaming) ─────────────────────────────────────── - -pub async fn invoke_claude_streaming( - sid: &str, - prompt: &str, - known: bool, - bot: &Bot, - chat_id: ChatId, -) -> Result { - if known { - return run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await; - } - - match run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await { - Ok(out) => { - info!(%sid, "resumed existing session"); - Ok(out) - } - Err(e) => { - warn!(%sid, "resume failed ({e:#}), creating new session"); - run_claude_streaming(&["--session-id", sid], prompt, bot, chat_id).await - } - } -} - -pub async fn run_claude_streaming( - extra_args: &[&str], - prompt: &str, - bot: &Bot, - chat_id: ChatId, -) -> Result { - let mut args: Vec<&str> = vec![ - "--dangerously-skip-permissions", - "-p", - "--output-format", - "stream-json", - "--verbose", - ]; - args.extend(extra_args); - args.push(prompt); - - let mut child = Command::new("claude") - .args(&args) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - - let stdout = child.stdout.take().unwrap(); - let mut lines = tokio::io::BufReader::new(stdout).lines(); - - // sendMessageDraft for native streaming, with editMessageText fallback - let http = reqwest::Client::new(); - let token = bot.token().to_owned(); - let raw_chat_id = chat_id.0; - let draft_id: i64 = 1; - let mut use_draft = true; - - let mut msg_id: Option = None; - let mut last_sent_text = String::new(); - let mut last_edit = Instant::now(); - let mut final_result = String::new(); - let mut is_error = false; - let mut tool_status = String::new(); - - while let Ok(Some(line)) = lines.next_line().await { - let event: StreamEvent = match serde_json::from_str(&line) { - Ok(e) => e, - Err(_) => continue, - }; - - match event.event_type.as_str() { - "assistant" => { - if let Some(amsg) = &event.message { - // determine display content - let (display_raw, new_text) = - if let Some(status) = extract_tool_use(amsg) { - tool_status = format!("[{status}]"); - let d = if last_sent_text.is_empty() { - tool_status.clone() - } else { - format!("{last_sent_text}\n\n{tool_status}") - }; - (d, None) - } else { - let text = extract_text(amsg); - if text.is_empty() || text == last_sent_text { - continue; - } - let interval = if use_draft { - DRAFT_INTERVAL_MS - } else { - EDIT_INTERVAL_MS - }; - if last_edit.elapsed().as_millis() < interval as u128 { - continue; - } - tool_status.clear(); - (text.clone(), Some(text)) - }; - - let display = if use_draft { - // draft mode: no cursor — cursor breaks monotonic text growth - truncate_at_char_boundary(&display_raw, TG_MSG_LIMIT).to_string() - } else { - truncate_for_display(&display_raw) - }; - - if use_draft { - match send_message_draft( - &http, &token, raw_chat_id, draft_id, &display, - ) - .await - { - Ok(_) => { - if let Some(t) = new_text { - last_sent_text = t; - } - last_edit = Instant::now(); - } - Err(e) => { - warn!("sendMessageDraft failed, falling back: {e:#}"); - use_draft = false; - if let Ok(sent) = - bot.send_message(chat_id, &display).await - { - msg_id = Some(sent.id); - if let Some(t) = new_text { - last_sent_text = t; - } - last_edit = Instant::now(); - } - } - } - } else if let Some(id) = msg_id { - if bot - .edit_message_text(chat_id, id, &display) - .await - .is_ok() - { - if let Some(t) = new_text { - last_sent_text = t; - } - last_edit = Instant::now(); - } - } else if let Ok(sent) = - bot.send_message(chat_id, &display).await - { - msg_id = Some(sent.id); - if let Some(t) = new_text { - last_sent_text = t; - } - last_edit = Instant::now(); - } - } - } - "result" => { - final_result = event.result.unwrap_or_default(); - is_error = event.is_error; - } - _ => {} - } - } - - // read stderr before waiting (in case child already exited) - let stderr_handle = child.stderr.take(); - let status = child.wait().await; - - // collect stderr for diagnostics - let stderr_text = if let Some(mut se) = stderr_handle { - let mut buf = String::new(); - let _ = tokio::io::AsyncReadExt::read_to_string(&mut se, &mut buf).await; - buf - } else { - String::new() - }; - - // determine error: explicit is_error from stream, or non-zero exit with no result - let has_error = is_error - || (final_result.is_empty() - && status.as_ref().map(|s| !s.success()).unwrap_or(true)); - - if has_error { - let err_detail = if !final_result.is_empty() { - final_result.clone() - } else if !stderr_text.is_empty() { - stderr_text.trim().to_string() - } else { - format!("claude exited: {:?}", status) - }; - if !use_draft { - if let Some(id) = msg_id { - let _ = bot - .edit_message_text(chat_id, id, format!("[error] {err_detail}")) - .await; - } - } - anyhow::bail!("{err_detail}"); - } - - if final_result.is_empty() { - return Ok(final_result); - } - - send_final_result(bot, chat_id, msg_id, use_draft, &final_result).await; - - Ok(final_result) -} - -// ── openai-compatible backend (streaming) ────────────────────────── - -pub async fn run_openai_streaming( - endpoint: &str, - model: &str, - api_key: &str, - messages: &[serde_json::Value], - bot: &Bot, - chat_id: ChatId, -) -> Result { - let client = reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(120)) - .build() - .unwrap(); - let url = format!("{}/chat/completions", endpoint.trim_end_matches('/')); - - let body = serde_json::json!({ - "model": model, - "messages": messages, - "stream": true, - }); - - let mut resp = client - .post(&url) - .header("Authorization", format!("Bearer {api_key}")) - .json(&body) - .send() - .await? - .error_for_status()?; - - let token = bot.token().to_owned(); - let raw_chat_id = chat_id.0; - let draft_id: i64 = 1; - let mut use_draft = true; - - let mut msg_id: Option = None; - let mut accumulated = String::new(); - let mut last_edit = Instant::now(); - let mut buffer = String::new(); - let mut done = false; - - while let Some(chunk) = resp.chunk().await? { - if done { - break; - } - buffer.push_str(&String::from_utf8_lossy(&chunk)); - - while let Some(pos) = buffer.find('\n') { - let line = buffer[..pos].to_string(); - buffer = buffer[pos + 1..].to_string(); - - let trimmed = line.trim(); - if trimmed.is_empty() || trimmed.starts_with(':') { - continue; - } - - let data = match trimmed.strip_prefix("data: ") { - Some(d) => d, - None => continue, - }; - - if data.trim() == "[DONE]" { - done = true; - break; - } - - if let Ok(json) = serde_json::from_str::(data) { - if let Some(content) = json["choices"][0]["delta"]["content"].as_str() { - if content.is_empty() { - continue; - } - accumulated.push_str(content); - - let interval = if use_draft { - DRAFT_INTERVAL_MS - } else { - EDIT_INTERVAL_MS - }; - if last_edit.elapsed().as_millis() < interval as u128 { - continue; - } - - let display = if use_draft { - truncate_at_char_boundary(&accumulated, TG_MSG_LIMIT).to_string() - } else { - truncate_for_display(&accumulated) - }; - - if use_draft { - match send_message_draft( - &client, &token, raw_chat_id, draft_id, &display, - ) - .await - { - Ok(_) => { - last_edit = Instant::now(); - } - Err(e) => { - warn!("sendMessageDraft failed, falling back: {e:#}"); - use_draft = false; - if let Ok(sent) = bot.send_message(chat_id, &display).await { - msg_id = Some(sent.id); - last_edit = Instant::now(); - } - } - } - } else if let Some(id) = msg_id { - if bot.edit_message_text(chat_id, id, &display).await.is_ok() { - last_edit = Instant::now(); - } - } else if let Ok(sent) = bot.send_message(chat_id, &display).await { - msg_id = Some(sent.id); - last_edit = Instant::now(); - } - } - } - } - } - - if accumulated.is_empty() { - return Ok(accumulated); - } - - send_final_result(bot, chat_id, msg_id, use_draft, &accumulated).await; - - Ok(accumulated) -} - pub fn build_system_prompt(summary: &str, persona: &str, memory_slots: &[(i32, String, String)], inner_state: &str) -> serde_json::Value { let mut text = if persona.is_empty() { String::from("你是一个AI助手。") diff --git a/src/tools.rs b/src/tools.rs index 3503e28..989ae97 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -4,17 +4,15 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::Result; -use teloxide::prelude::*; -use teloxide::types::InputFile; use tokio::io::AsyncBufReadExt; use tokio::process::Command; use tokio::sync::RwLock; use tracing::{error, info, warn}; -use crate::config::{BackendConfig, Config}; +use crate::config::Config; use crate::display::truncate_at_char_boundary; +use crate::output::Output; use crate::state::AppState; -use crate::stream::{build_system_prompt, run_openai_streaming}; // ── subagent & tool call ─────────────────────────────────────────── @@ -261,10 +259,10 @@ pub async fn execute_tool( name: &str, arguments: &str, state: &Arc, - bot: &Bot, - chat_id: ChatId, + output: &mut dyn Output, sid: &str, config: &Arc, + chat_id: i64, ) -> String { let args: serde_json::Value = match serde_json::from_str(arguments) { Ok(v) => v, @@ -275,7 +273,7 @@ pub async fn execute_tool( "spawn_agent" => { let id = args["id"].as_str().unwrap_or("agent"); let task = args["task"].as_str().unwrap_or(""); - spawn_agent(id, task, state, bot, chat_id, sid, config).await + spawn_agent(id, task, state, output, sid, config).await } "agent_status" => { let id = args["id"].as_str().unwrap_or(""); @@ -295,13 +293,9 @@ pub async fn execute_tool( if !path.is_file() { return format!("Not a file: {path_str}"); } - let input_file = InputFile::file(path); - let mut req = bot.send_document(chat_id, input_file); - if !caption.is_empty() { - req = req.caption(caption); - } - match req.await { - Ok(_) => format!("File sent: {path_str}"), + match output.send_file(path, caption).await { + Ok(true) => format!("File sent: {path_str}"), + Ok(false) => format!("File sending not supported in this context: {path_str}"), Err(e) => format!("Failed to send file: {e:#}"), } } @@ -322,7 +316,7 @@ pub async fn execute_tool( Ok(next) => { let next_str = next.format("%Y-%m-%d %H:%M:%S").to_string(); let id = state - .add_timer(chat_id.0, label, schedule, &next_str) + .add_timer(chat_id, label, schedule, &next_str) .await; format!("Timer #{id} set: \"{label}\" → next fire at {next_str}") } @@ -330,7 +324,7 @@ pub async fn execute_tool( } } "list_timers" => { - let timers = state.list_timers(Some(chat_id.0)).await; + let timers = state.list_timers(Some(chat_id)).await; if timers.is_empty() { "No active timers.".to_string() } else { @@ -424,9 +418,9 @@ pub async fn execute_tool( let path_str = String::from_utf8_lossy(&out.stdout).trim().to_string(); let path = Path::new(&path_str); if path.exists() { - let input_file = InputFile::file(path); - match bot.send_voice(chat_id, input_file).await { - Ok(_) => format!("语音已发送: {path_str}"), + match output.send_file(path, "").await { + Ok(true) => format!("语音已发送: {path_str}"), + Ok(false) => format!("语音生成成功但当前通道不支持发送文件: {path_str}"), Err(e) => format!("语音生成成功但发送失败: {e:#}"), } } else { @@ -450,10 +444,9 @@ pub async fn spawn_agent( id: &str, task: &str, state: &Arc, - bot: &Bot, - chat_id: ChatId, - sid: &str, - config: &Arc, + output: &dyn Output, + _sid: &str, + _config: &Arc, ) -> String { // check if already exists if state.agents.read().await.contains_key(id) { @@ -471,13 +464,13 @@ pub async fn spawn_agent( }; let pid = child.id(); - let output = Arc::new(tokio::sync::RwLock::new(String::new())); + let agent_output = Arc::new(tokio::sync::RwLock::new(String::new())); let completed = Arc::new(AtomicBool::new(false)); let exit_code = Arc::new(tokio::sync::RwLock::new(None)); let agent = Arc::new(SubAgent { task: task.to_string(), - output: output.clone(), + output: agent_output.clone(), completed: completed.clone(), exit_code: exit_code.clone(), pid, @@ -485,15 +478,10 @@ pub async fn spawn_agent( state.agents.write().await.insert(id.to_string(), agent); - // background task: collect output and wakeup on completion - let out = output.clone(); + // background task: collect output + let out = agent_output.clone(); let done = completed.clone(); let ecode = exit_code.clone(); - let bot_c = bot.clone(); - let chat_id_c = chat_id; - let state_c = state.clone(); - let config_c = config.clone(); - let sid_c = sid.to_string(); let id_c = id.to_string(); tokio::spawn(async move { @@ -512,75 +500,12 @@ pub async fn spawn_agent( done.store(true, Ordering::SeqCst); info!(agent = %id_c, "agent completed, exit={code:?}"); - - // wakeup: inject result and trigger LLM - let result = out.read().await.clone(); - let result_short = truncate_at_char_boundary(&result, 4000); - let wakeup = format!( - "[Agent '{id_c}' 执行完成 (exit={})]\n{result_short}", - code.unwrap_or(-1) - ); - - if let Err(e) = agent_wakeup( - &config_c, &state_c, &bot_c, chat_id_c, &sid_c, &wakeup, &id_c, - ) - .await - { - error!(agent = %id_c, "wakeup failed: {e:#}"); - let _ = bot_c - .send_message(chat_id_c, format!("[agent wakeup error] {e:#}")) - .await; - } }); + let _ = output.status(&format!("Agent '{id}' spawned (pid={pid:?})")).await; format!("Agent '{id}' spawned (pid={pid:?})") } -pub async fn agent_wakeup( - config: &Config, - state: &AppState, - bot: &Bot, - chat_id: ChatId, - sid: &str, - wakeup_msg: &str, - agent_id: &str, -) -> Result<()> { - match &config.backend { - BackendConfig::OpenAI { - endpoint, - model, - api_key, - } => { - state.push_message(sid, "user", wakeup_msg).await; - let conv = state.load_conv(sid).await; - let persona = state.get_config("persona").await.unwrap_or_default(); - let memory_slots = state.get_memory_slots().await; - let inner = state.get_inner_state().await; - let system_msg = build_system_prompt(&conv.summary, &persona, &memory_slots, &inner); - let mut api_messages = vec![system_msg]; - api_messages.extend(conv.messages); - - info!(agent = %agent_id, "wakeup: sending {} messages to LLM", api_messages.len()); - - let response = - run_openai_streaming(endpoint, model, api_key, &api_messages, bot, chat_id) - .await?; - - if !response.is_empty() { - state.push_message(sid, "assistant", &response).await; - } - - Ok(()) - } - _ => { - let _ = bot - .send_message(chat_id, format!("[Agent '{agent_id}' done]\n{wakeup_msg}")) - .await; - Ok(()) - } - } -} - pub async fn check_agent_status(id: &str, state: &AppState) -> String { let agents = state.agents.read().await; match agents.get(id) {