noc/src/main.rs
Fam Zheng 128f2481c0 add tool calling, SQLite persistence, group chat, image vision, voice transcription
Major features:
- OpenAI function calling with tool call loop (streaming SSE parsing)
- Built-in tools: spawn_agent (async claude -p), agent_status, kill_agent,
  update_scratch, send_file
- Script-based tool discovery: tools/ dir with --schema convention
- Feishu todo management script (tools/manage_todo)
- SQLite persistence: conversations, messages, config, scratch_area tables
- Sliding window context (100 msgs, slide 50, auto-summarize)
- Conversation summary generation via LLM on window slide
- Group chat support with independent session contexts
- Image understanding: multimodal vision input (base64 to API)
- Voice transcription via faster-whisper Docker service
- Configurable persona stored in DB
- diag command for session diagnostics
- System prompt restructured: persona + tool instructions separated
- RUST_BACKTRACE=1 in service, clippy in deploy pipeline
- .gitignore for config/state/db files
2026-04-09 16:38:28 +01:00

2114 lines
72 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use anyhow::Result;
use chrono::{Local, NaiveDate, NaiveTime};
use serde::{Deserialize, Serialize};
use teloxide::dispatching::UpdateFilterExt;
use teloxide::net::Download;
use teloxide::prelude::*;
use teloxide::types::InputFile;
use tokio::io::AsyncBufReadExt;
use tokio::process::Command;
use tokio::sync::RwLock;
use tokio::time::Instant;
use tracing::{error, info, warn};
use base64::Engine;
use uuid::Uuid;
// ── config ──────────────────────────────────────────────────────────
/// Top-level bot configuration, deserialized from the YAML config file read in `main`.
#[derive(Deserialize)]
struct Config {
/// Bot name; used as the prefix when deriving session ids. Defaults to `default_name()`.
#[serde(default = "default_name")]
name: String,
/// Telegram settings (bot API key).
tg: TgConfig,
/// Authentication settings (passphrase a chat must send before being served).
auth: AuthConfig,
/// Session settings (hour at which the session date rolls over).
session: SessionConfig,
/// Which LLM backend handles prompts; defaults to `BackendConfig::Claude` when omitted.
#[serde(default)]
backend: BackendConfig,
/// Base URL of the transcription service; when `None`, audio/voice files are
/// passed along as plain uploads instead of being transcribed.
#[serde(default)]
whisper_url: Option<String>,
}
/// Serde default for `Config::name`: the bot is called "noc" unless configured otherwise.
fn default_name() -> String {
    String::from("noc")
}
/// LLM backend selection. Internally tagged: the `type` key in the config
/// chooses the variant (`"claude"` or `"openai"`).
#[derive(Deserialize, Clone, Default)]
#[serde(tag = "type")]
enum BackendConfig {
/// Default backend; prompts are handed to `invoke_claude_streaming`.
#[serde(rename = "claude")]
#[default]
Claude,
/// OpenAI-style chat-completions backend with tool calling.
#[serde(rename = "openai")]
OpenAI {
/// Base URL; `/chat/completions` is appended when building requests.
endpoint: String,
/// Model name sent in the request body.
model: String,
/// Bearer token for the `Authorization` header; defaults to `default_api_key()`.
#[serde(default = "default_api_key")]
api_key: String,
},
}
/// Serde default for the OpenAI backend's `api_key`: the placeholder value "unused".
fn default_api_key() -> String {
    String::from("unused")
}
/// Telegram section of the config.
#[derive(Deserialize)]
struct TgConfig {
/// Bot token passed to `Bot::new` at startup.
key: String,
}
/// Authentication section of the config.
#[derive(Deserialize)]
struct AuthConfig {
/// A chat becomes authenticated for the current session date when it sends
/// a message whose trimmed text equals this value.
passphrase: String,
}
/// Session section of the config.
#[derive(Deserialize)]
struct SessionConfig {
/// Local hour of day at which the session date advances (see `session_date`).
/// `session_date` unwraps `NaiveTime::from_hms_opt(refresh_hour, 0, 0)`, so the
/// value must be a valid hour.
refresh_hour: u32,
}
// ── persistent state ────────────────────────────────────────────────
/// State persisted as JSON in the state file (see `AppState::load` / `AppState::save`).
#[derive(Serialize, Deserialize, Default)]
struct Persistent {
/// Raw chat id -> session date on which that chat authenticated.
authed: HashMap<i64, NaiveDate>,
/// Session ids for which a Claude-backend invocation has already succeeded.
known_sessions: HashSet<String>,
}
/// Snapshot of one session's conversation as loaded from SQLite by `AppState::load_conv`.
#[derive(Serialize, Deserialize, Clone, Default)]
struct ConversationState {
/// Summary text stored in the `conversations` row (empty when none exists).
summary: String,
/// Messages currently stored for the session, oldest first, as `{"role", "content"}` objects.
messages: Vec<serde_json::Value>,
/// Value of the `total_messages` column, which `slide_window` increases when it
/// removes messages from the window.
total_messages: usize,
}
/// Once a session holds at least this many stored messages, the oldest ones are summarized.
const MAX_WINDOW: usize = 100;
/// Number of oldest messages summarized and removed each time the window slides.
const SLIDE_SIZE: usize = 50;
// ── subagent & tool call ───────────────────────────────────────────
/// Bookkeeping for one spawned `claude` subprocess (see `spawn_agent`).
struct SubAgent {
/// Task text the agent was started with.
task: String,
/// Stdout collected so far, appended line by line by the background reader task.
output: Arc<tokio::sync::RwLock<String>>,
/// Set to `true` after the child process has been waited on.
completed: Arc<AtomicBool>,
/// Exit code once the process has finished; stays `None` when no code is available.
exit_code: Arc<tokio::sync::RwLock<Option<i32>>>,
/// OS process id captured right after spawning, if one was available.
pid: Option<u32>,
}
/// One tool invocation requested by the model.
/// NOTE(review): the code that fills this in is outside the visible part of the file;
/// the field notes below follow the `execute_tool(name, arguments, ..)` signature — confirm.
struct ToolCall {
/// Identifier of the call — presumably the id assigned by the API; verify against the parser.
id: String,
/// Name of the tool to run.
name: String,
/// Arguments as a raw JSON string (`execute_tool` parses its `arguments` with `serde_json`).
arguments: String,
}
/// Returns the `tools/` directory located next to the config file.
///
/// The config file path comes from `NOC_CONFIG` (falling back to `config.yaml`);
/// if that path has no parent component, `.` is used as the base.
fn tools_dir() -> PathBuf {
    let config_file = match std::env::var("NOC_CONFIG") {
        Ok(value) => value,
        Err(_) => String::from("config.yaml"),
    };
    match Path::new(&config_file).parent() {
        Some(base) => base.join("tools"),
        None => Path::new(".").join("tools"),
    }
}
/// Builds the tool list sent to the API: the built-in tools followed by every
/// script in `tools/` that answers `--schema` with valid JSON.
///
/// Runs on every API request so new or updated scripts are picked up immediately.
/// Each regular file in the directory is executed synchronously with `--schema`;
/// no timeout is enforced on that probe. Files that fail to run or exit
/// non-zero are skipped silently; invalid JSON is logged as a warning.
fn discover_tools() -> serde_json::Value {
    /// Wraps a function definition in the `{"type": "function", ...}` envelope.
    fn as_tool(function: serde_json::Value) -> serde_json::Value {
        serde_json::json!({
            "type": "function",
            "function": function,
        })
    }

    /// Describes one built-in tool whose parameters form a flat JSON object.
    fn builtin(
        name: &str,
        description: &str,
        properties: serde_json::Value,
        required: &[&str],
    ) -> serde_json::Value {
        as_tool(serde_json::json!({
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required
            }
        }))
    }

    let mut tools = vec![
        builtin(
            "spawn_agent",
            "Spawn a Claude Code subagent to handle a complex task asynchronously. You'll be notified when it completes.",
            serde_json::json!({
                "id": {"type": "string", "description": "Short unique identifier (e.g. 'research', 'fix-bug')"},
                "task": {"type": "string", "description": "Detailed task description for the agent"}
            }),
            &["id", "task"],
        ),
        builtin(
            "agent_status",
            "Check the current status and output of a running or completed agent",
            serde_json::json!({
                "id": {"type": "string", "description": "The agent identifier"}
            }),
            &["id"],
        ),
        builtin(
            "kill_agent",
            "Terminate a running agent",
            serde_json::json!({
                "id": {"type": "string", "description": "The agent identifier"}
            }),
            &["id"],
        ),
        builtin(
            "send_file",
            "Send a file from the server to the user via Telegram. The file must exist on the server filesystem.",
            serde_json::json!({
                "path": {"type": "string", "description": "Absolute path to the file on the server"},
                "caption": {"type": "string", "description": "Optional caption/description for the file"}
            }),
            &["path"],
        ),
        builtin(
            "update_scratch",
            "Update your scratch area (working notes, state, reminders). This content is appended to every user message so you always see it. Use it to track ongoing context across turns.",
            serde_json::json!({
                "content": {"type": "string", "description": "The full scratch area content (replaces previous)"}
            }),
            &["content"],
        ),
    ];

    // Script tools: any regular file in tools/ that prints a JSON schema for `--schema`.
    if let Ok(listing) = std::fs::read_dir(tools_dir()) {
        let candidates = listing
            .flatten()
            .map(|entry| entry.path())
            .filter(|candidate| candidate.is_file());
        for candidate in candidates {
            let probe = std::process::Command::new(&candidate)
                .arg("--schema")
                .output();
            let raw_schema = match probe {
                Ok(out) if out.status.success() => out.stdout,
                // Not a tool script: skip without logging.
                _ => continue,
            };
            let schema_text = String::from_utf8_lossy(&raw_schema);
            match serde_json::from_str::<serde_json::Value>(schema_text.trim()) {
                Ok(schema) => {
                    let name = schema["name"].as_str().unwrap_or("?");
                    info!(tool = %name, path = %candidate.display(), "discovered script tool");
                    tools.push(as_tool(schema));
                }
                Err(e) => {
                    warn!(path = %candidate.display(), "invalid --schema JSON: {e}");
                }
            }
        }
    }

    serde_json::Value::Array(tools)
}
/// Shared application state handed to every message handler.
struct AppState {
/// Auth/session bookkeeping that is mirrored to the JSON state file.
persist: RwLock<Persistent>,
/// Location of the JSON state file written by `save`.
state_path: PathBuf,
/// SQLite connection (`noc.db` next to the state file), serialized behind an async mutex.
db: tokio::sync::Mutex<rusqlite::Connection>,
/// Spawned subagents keyed by the id given to `spawn_agent`.
agents: RwLock<HashMap<String, Arc<SubAgent>>>,
}
impl AppState {
fn load(path: PathBuf) -> Self {
let persist = std::fs::read_to_string(&path)
.ok()
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_default();
info!("loaded state from {}", path.display());
let db_path = path.parent().unwrap_or(Path::new(".")).join("noc.db");
let conn = rusqlite::Connection::open(&db_path)
.unwrap_or_else(|e| panic!("open {}: {e}", db_path.display()));
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS conversations (
session_id TEXT PRIMARY KEY,
summary TEXT NOT NULL DEFAULT '',
total_messages INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id);
CREATE TABLE IF NOT EXISTS scratch_area (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL DEFAULT '',
create_time TEXT NOT NULL DEFAULT (datetime('now')),
update_time TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS config_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT NOT NULL,
value TEXT NOT NULL,
create_time TEXT NOT NULL,
update_time TEXT NOT NULL
);",
)
.expect("init db schema");
info!("opened db {}", db_path.display());
Self {
persist: RwLock::new(persist),
state_path: path,
db: tokio::sync::Mutex::new(conn),
agents: RwLock::new(HashMap::new()),
}
}
async fn save(&self) {
let data = self.persist.read().await;
if let Ok(json) = serde_json::to_string_pretty(&*data) {
if let Err(e) = std::fs::write(&self.state_path, json) {
error!("save state: {e}");
}
}
}
async fn load_conv(&self, sid: &str) -> ConversationState {
let db = self.db.lock().await;
let (summary, total) = db
.query_row(
"SELECT summary, total_messages FROM conversations WHERE session_id = ?1",
[sid],
|row| Ok((row.get::<_, String>(0)?, row.get::<_, usize>(1)?)),
)
.unwrap_or_default();
let mut stmt = db
.prepare("SELECT role, content FROM messages WHERE session_id = ?1 ORDER BY id")
.unwrap();
let messages: Vec<serde_json::Value> = stmt
.query_map([sid], |row| {
let role: String = row.get(0)?;
let content: String = row.get(1)?;
Ok(serde_json::json!({"role": role, "content": content}))
})
.unwrap()
.filter_map(|r| r.ok())
.collect();
ConversationState {
summary,
messages,
total_messages: total,
}
}
async fn push_message(&self, sid: &str, role: &str, content: &str) {
let db = self.db.lock().await;
let _ = db.execute(
"INSERT OR IGNORE INTO conversations (session_id) VALUES (?1)",
[sid],
);
let _ = db.execute(
"INSERT INTO messages (session_id, role, content) VALUES (?1, ?2, ?3)",
rusqlite::params![sid, role, content],
);
}
async fn message_count(&self, sid: &str) -> usize {
let db = self.db.lock().await;
db.query_row(
"SELECT COUNT(*) FROM messages WHERE session_id = ?1",
[sid],
|row| row.get(0),
)
.unwrap_or(0)
}
async fn slide_window(&self, sid: &str, new_summary: &str, slide_size: usize) {
let db = self.db.lock().await;
let _ = db.execute(
"DELETE FROM messages WHERE id IN (
SELECT id FROM messages WHERE session_id = ?1 ORDER BY id LIMIT ?2
)",
rusqlite::params![sid, slide_size],
);
let _ = db.execute(
"UPDATE conversations SET summary = ?1, total_messages = total_messages + ?2 \
WHERE session_id = ?3",
rusqlite::params![new_summary, slide_size, sid],
);
}
async fn get_oldest_messages(&self, sid: &str, count: usize) -> Vec<serde_json::Value> {
let db = self.db.lock().await;
let mut stmt = db
.prepare(
"SELECT role, content FROM messages WHERE session_id = ?1 ORDER BY id LIMIT ?2",
)
.unwrap();
stmt.query_map(rusqlite::params![sid, count], |row| {
let role: String = row.get(0)?;
let content: String = row.get(1)?;
Ok(serde_json::json!({"role": role, "content": content}))
})
.unwrap()
.filter_map(|r| r.ok())
.collect()
}
async fn get_scratch(&self) -> String {
let db = self.db.lock().await;
db.query_row(
"SELECT content FROM scratch_area ORDER BY id DESC LIMIT 1",
[],
|row| row.get(0),
)
.unwrap_or_default()
}
async fn push_scratch(&self, content: &str) {
let db = self.db.lock().await;
let _ = db.execute(
"INSERT INTO scratch_area (content) VALUES (?1)",
[content],
);
}
async fn get_config(&self, key: &str) -> Option<String> {
let db = self.db.lock().await;
db.query_row(
"SELECT value FROM config WHERE key = ?1",
[key],
|row| row.get(0),
)
.ok()
}
}
// ── helpers ─────────────────────────────────────────────────────────
/// Returns the date the current session belongs to.
///
/// The session day starts at `refresh_hour:00:00` local time: before that
/// moment the previous calendar day is still in effect.
///
/// # Panics
/// Panics if `refresh_hour` is not a valid hour of day (`from_hms_opt` returns `None`).
fn session_date(refresh_hour: u32) -> NaiveDate {
    let now = Local::now();
    let rollover = NaiveTime::from_hms_opt(refresh_hour, 0, 0).unwrap();
    let today = now.date_naive();
    if now.time() >= rollover {
        today
    } else {
        today - chrono::Duration::days(1)
    }
}
/// Derives the session id for a chat on the current session date.
///
/// The id is a version-5 UUID (OID namespace) of `"{prefix}-{chat_id}-{YYYYMMDD}"`,
/// so the same chat gets the same id until the session date changes.
fn session_uuid(prefix: &str, chat_id: i64, refresh_hour: u32) -> String {
    let day = session_date(refresh_hour).format("%Y%m%d");
    let seed = format!("{prefix}-{chat_id}-{day}");
    let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, seed.as_bytes());
    id.to_string()
}
/// Returns `$HOME`, or `/tmp` when the variable is unset or not valid Unicode.
fn home_dir() -> PathBuf {
    match std::env::var("HOME") {
        Ok(home) => PathBuf::from(home),
        Err(_) => PathBuf::from("/tmp"),
    }
}
/// Directory where files received from Telegram are stored: `<home>/incoming`.
fn incoming_dir() -> PathBuf {
    let mut dir = home_dir();
    dir.push("incoming");
    dir
}
/// Per-session directory scanned for files to send back: `<home>/outgoing/<sid>`.
fn outgoing_dir(sid: &str) -> PathBuf {
    let mut dir = home_dir();
    dir.push("outgoing");
    dir.push(sid);
    dir
}
// ── main ────────────────────────────────────────────────────────────
/// Entry point: sets up logging, reads the config and persisted state, then
/// runs the Telegram dispatcher until it stops.
///
/// Environment: `NOC_CONFIG` (config path, default `config.yaml`) and
/// `NOC_STATE` (state file path, default `state.json`).
///
/// # Panics
/// Panics if the config cannot be read or parsed, or if `get_me` fails.
#[tokio::main]
async fn main() {
    let log_filter = tracing_subscriber::EnvFilter::from_default_env()
        .add_directive("noc=info".parse().unwrap());
    tracing_subscriber::fmt().with_env_filter(log_filter).init();

    let config_path = std::env::var("NOC_CONFIG").unwrap_or_else(|_| "config.yaml".into());
    let raw = match std::fs::read_to_string(&config_path) {
        Ok(contents) => contents,
        Err(e) => panic!("read {config_path}: {e}"),
    };
    let config: Config = match serde_yaml::from_str(&raw) {
        Ok(parsed) => parsed,
        Err(e) => panic!("parse config: {e}"),
    };

    let state_path = match std::env::var("NOC_STATE") {
        Ok(custom) => PathBuf::from(custom),
        Err(_) => PathBuf::from("state.json"),
    };
    let state = Arc::new(AppState::load(state_path));

    // Best effort: downloads create the directory again if this fails.
    let _ = std::fs::create_dir_all(incoming_dir());

    let bot = Bot::new(&config.tg.key);
    let me = bot.get_me().await.unwrap();
    let bot_username = Arc::new(me.username.clone().unwrap_or_default());
    info!(username = %bot_username, "noc bot starting");

    let handler = Update::filter_message().endpoint(handle);
    let mut dispatcher = Dispatcher::builder(bot, handler)
        .dependencies(dptree::deps![state, Arc::new(config), bot_username])
        .default_handler(|_| async {})
        .build();
    dispatcher.dispatch().await;
}
// ── file download ───────────────────────────────────────────────────
async fn download_tg_file(bot: &Bot, file_id: &str, filename: &str) -> Result<PathBuf> {
let dir = incoming_dir();
tokio::fs::create_dir_all(&dir).await?;
let dest = dir.join(filename);
let tf = bot.get_file(file_id).await?;
let mut file = tokio::fs::File::create(&dest).await?;
bot.download_file(&tf.path, &mut file).await?;
info!("downloaded {} -> {}", filename, dest.display());
Ok(dest)
}
// ── outgoing scan ───────────────────────────────────────────────────
/// Collects the paths of all regular files directly inside `dir`.
///
/// An unreadable directory yields an empty set; iteration stops at the first
/// entry that fails to read.
async fn snapshot_dir(dir: &Path) -> HashSet<PathBuf> {
    let mut seen = HashSet::new();
    let mut reader = match tokio::fs::read_dir(dir).await {
        Ok(reader) => reader,
        Err(_) => return seen,
    };
    while let Ok(Some(item)) = reader.next_entry().await {
        let candidate = item.path();
        if candidate.is_file() {
            seen.insert(candidate);
        }
    }
    seen
}
/// Lists, in sorted order, the regular files in `dir` that are not part of the
/// earlier snapshot `before`.
///
/// An unreadable directory yields an empty list; iteration stops at the first
/// entry that fails to read.
async fn new_files_in(dir: &Path, before: &HashSet<PathBuf>) -> Vec<PathBuf> {
    let mut fresh = Vec::new();
    let mut reader = match tokio::fs::read_dir(dir).await {
        Ok(reader) => reader,
        Err(_) => return fresh,
    };
    while let Ok(Some(item)) = reader.next_entry().await {
        let candidate = item.path();
        let is_new = candidate.is_file() && !before.contains(&candidate);
        if is_new {
            fresh.push(candidate);
        }
    }
    fresh.sort();
    fresh
}
// ── handler ─────────────────────────────────────────────────────────
/// Dispatcher endpoint for every incoming message.
///
/// A chat counts as authenticated when its recorded auth date equals the
/// current session date. Unauthenticated chats can only send the passphrase:
/// a match records the chat and replies "authenticated", anything else gets
/// "not authenticated". Authenticated messages go to `handle_inner`; its
/// errors are logged and reported to the chat rather than propagated.
async fn handle(
    bot: Bot,
    msg: Message,
    state: Arc<AppState>,
    config: Arc<Config>,
    _bot_username: Arc<String>,
) -> ResponseResult<()> {
    let chat_id = msg.chat.id;
    let raw_id = chat_id.0;
    let is_private = msg.chat.is_private();
    let text = msg.text().or(msg.caption()).unwrap_or("").to_string();
    let today = session_date(config.session.refresh_hour);

    // The read guard is a temporary and is released at the end of this statement.
    let authed_today = state.persist.read().await.authed.get(&raw_id) == Some(&today);

    if authed_today {
        let outcome =
            handle_inner(&bot, &msg, chat_id, &text, is_private, &state, &config).await;
        if let Err(e) = outcome {
            error!(chat = raw_id, "handle: {e:#}");
            let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
        }
        return Ok(());
    }

    if text.trim() != config.auth.passphrase {
        bot.send_message(chat_id, "not authenticated").await?;
        return Ok(());
    }

    // The write guard must be gone before `save`, which takes the read lock.
    state.persist.write().await.authed.insert(raw_id, today);
    state.save().await;
    bot.send_message(chat_id, "authenticated").await?;
    info!(chat = raw_id, "authed");
    Ok(())
}
/// Processes one message from an authenticated chat.
///
/// Steps, in order: download attachments (transcribing audio/voice when
/// `whisper_url` is configured), answer the `diag` command (OpenAI backend
/// only), build the prompt, run the configured backend, and finally send any
/// files that appeared in the session's outgoing directory during the run.
///
/// Backend failures are logged and reported to the chat here; they do not
/// make this function return `Err`.
async fn handle_inner(
bot: &Bot,
msg: &Message,
chat_id: ChatId,
text: &str,
is_private: bool,
state: &Arc<AppState>,
config: &Arc<Config>,
) -> Result<()> {
// Files saved to disk, per-attachment failure descriptions, and transcribed audio text.
let mut uploaded: Vec<PathBuf> = Vec::new();
let mut download_errors: Vec<String> = Vec::new();
let mut transcriptions: Vec<String> = Vec::new();
if let Some(doc) = msg.document() {
let name = doc.file_name.as_deref().unwrap_or("file");
match download_tg_file(bot, &doc.file.id, name).await {
Ok(p) => uploaded.push(p),
Err(e) => download_errors.push(format!("{name}: {e:#}")),
}
}
if let Some(photos) = msg.photo() {
// Only the last entry of the photo list is downloaded.
if let Some(photo) = photos.last() {
let name = format!("photo_{}.jpg", Local::now().format("%H%M%S"));
match download_tg_file(bot, &photo.file.id, &name).await {
Ok(p) => uploaded.push(p),
Err(e) => download_errors.push(format!("photo: {e:#}")),
}
}
}
if let Some(audio) = msg.audio() {
let fallback = format!("audio_{}.ogg", Local::now().format("%H%M%S"));
let name = audio.file_name.as_deref().unwrap_or(&fallback);
match download_tg_file(bot, &audio.file.id, name).await {
Ok(p) => {
// With a transcription service configured, a non-empty transcript replaces
// the upload; otherwise the file itself is passed along.
if let Some(url) = &config.whisper_url {
match transcribe_audio(url, &p).await {
Ok(t) if !t.is_empty() => transcriptions.push(t),
Ok(_) => uploaded.push(p),
Err(e) => {
warn!("transcribe failed: {e:#}");
uploaded.push(p);
}
}
} else {
uploaded.push(p);
}
}
Err(e) => download_errors.push(format!("audio: {e:#}")),
}
}
if let Some(voice) = msg.voice() {
let name = format!("voice_{}.ogg", Local::now().format("%H%M%S"));
match download_tg_file(bot, &voice.file.id, &name).await {
Ok(p) => {
// Same transcription fallback logic as for audio above.
if let Some(url) = &config.whisper_url {
match transcribe_audio(url, &p).await {
Ok(t) if !t.is_empty() => transcriptions.push(t),
Ok(_) => uploaded.push(p),
Err(e) => {
warn!("transcribe failed: {e:#}");
uploaded.push(p);
}
}
} else {
uploaded.push(p);
}
}
Err(e) => download_errors.push(format!("voice: {e:#}")),
}
}
if let Some(video) = msg.video() {
let fallback = format!("video_{}.mp4", Local::now().format("%H%M%S"));
let name = video.file_name.as_deref().unwrap_or(&fallback);
match download_tg_file(bot, &video.file.id, name).await {
Ok(p) => uploaded.push(p),
Err(e) => download_errors.push(format!("video: {e:#}")),
}
}
if let Some(vn) = msg.video_note() {
let name = format!("videonote_{}.mp4", Local::now().format("%H%M%S"));
match download_tg_file(bot, &vn.file.id, &name).await {
Ok(p) => uploaded.push(p),
Err(e) => download_errors.push(format!("video_note: {e:#}")),
}
}
// Nothing usable arrived: report download failures (if any) and stop.
if text.is_empty() && uploaded.is_empty() && transcriptions.is_empty() {
if !download_errors.is_empty() {
let err_msg = format!("[文件下载失败]\n{}", download_errors.join("\n"));
bot.send_message(chat_id, err_msg).await?;
}
return Ok(());
}
let sid = session_uuid(&config.name, chat_id.0, config.session.refresh_hour);
info!(%sid, "recv");
// Snapshot the outgoing directory so files created during this run can be detected later.
let out_dir = outgoing_dir(&sid);
tokio::fs::create_dir_all(&out_dir).await?;
let before = snapshot_dir(&out_dir).await;
// handle diag command (OpenAI backend only)
if text.trim() == "diag" {
if let BackendConfig::OpenAI { .. } = &config.backend {
let conv = state.load_conv(&sid).await;
let count = state.message_count(&sid).await;
let persona = state.get_config("persona").await.unwrap_or_default();
let scratch = state.get_scratch().await;
let diag = format!(
"session: {sid}\n\
window: {count}/{MAX_WINDOW} (slide at {MAX_WINDOW}, drop {SLIDE_SIZE})\n\
total processed: {}\n\n\
persona ({} chars):\n{}\n\n\
scratch ({} chars):\n{}\n\n\
summary ({} chars):\n{}",
conv.total_messages + count,
persona.len(),
if persona.is_empty() { "(default)" } else { &persona },
scratch.len(),
if scratch.is_empty() { "(empty)" } else { &scratch },
conv.summary.len(),
if conv.summary.is_empty() {
"(empty)".to_string()
} else {
conv.summary
}
);
bot.send_message(chat_id, diag).await?;
return Ok(());
}
}
let prompt = build_prompt(text, &uploaded, &download_errors, &transcriptions);
match &config.backend {
BackendConfig::Claude => {
// `known` is passed to the Claude invocation; the session is recorded as known
// only after a successful run.
let known = state.persist.read().await.known_sessions.contains(&sid);
let result =
invoke_claude_streaming(&sid, &prompt, known, bot, chat_id).await;
match &result {
Ok(_) => {
if !known {
state.persist.write().await.known_sessions.insert(sid.clone());
state.save().await;
}
}
Err(e) => {
error!(%sid, "claude: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
}
}
BackendConfig::OpenAI {
endpoint,
model,
api_key,
} => {
// Request context: system prompt (persona + summary), stored history, then the new
// user message with scratch text and any images attached.
let conv = state.load_conv(&sid).await;
let persona = state.get_config("persona").await.unwrap_or_default();
let system_msg = build_system_prompt(&conv.summary, &persona);
let mut api_messages = vec![system_msg];
api_messages.extend(conv.messages);
let scratch = state.get_scratch().await;
let user_content = build_user_content(&prompt, &scratch, &uploaded);
api_messages.push(serde_json::json!({"role": "user", "content": user_content}));
match run_openai_with_tools(
endpoint, model, api_key, api_messages, bot, chat_id, state, &sid, config, is_private,
)
.await
{
Ok(response) => {
// Only the plain prompt and the final response text are persisted, and only
// after the request succeeded.
state.push_message(&sid, "user", &prompt).await;
if !response.is_empty() {
state.push_message(&sid, "assistant", &response).await;
}
// sliding window
let count = state.message_count(&sid).await;
if count >= MAX_WINDOW {
info!(%sid, "sliding window: {count} messages, summarizing oldest {SLIDE_SIZE}");
let _ = bot
.send_message(chat_id, "[整理记忆中...]")
.await;
let to_summarize =
state.get_oldest_messages(&sid, SLIDE_SIZE).await;
let current_summary = {
let db = state.db.lock().await;
db.query_row(
"SELECT summary FROM conversations WHERE session_id = ?1",
[&sid],
|row| row.get::<_, String>(0),
)
.unwrap_or_default()
};
match summarize_messages(
endpoint,
model,
api_key,
&current_summary,
&to_summarize,
)
.await
{
Ok(new_summary) => {
state.slide_window(&sid, &new_summary, SLIDE_SIZE).await;
let remaining = state.message_count(&sid).await;
info!(%sid, "window slid, {remaining} messages remain, summary {} chars", new_summary.len());
}
Err(e) => {
// No messages are dropped when summarization fails.
warn!(%sid, "summarize failed: {e:#}, keeping all messages");
}
}
}
}
Err(e) => {
error!(%sid, "openai: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
}
}
}
// send new files from outgoing dir
let new_files = new_files_in(&out_dir, &before).await;
for path in &new_files {
info!(%sid, "sending file: {}", path.display());
if let Err(e) = bot.send_document(chat_id, InputFile::file(path)).await {
error!(%sid, "send_document {}: {e:#}", path.display());
let _ = bot
.send_message(chat_id, format!("[发送文件失败: {}]", path.display()))
.await;
}
}
Ok(())
}
/// Assembles the textual prompt for one incoming message.
///
/// Lines are emitted in this order, joined by `\n`: voice transcriptions,
/// uploaded file notices, download failure notices, then the message text
/// (only when non-empty).
fn build_prompt(
    text: &str,
    uploaded: &[PathBuf],
    errors: &[String],
    transcriptions: &[String],
) -> String {
    let voice_lines = transcriptions.iter().map(|t| format!("[语音消息] {t}"));
    let upload_lines = uploaded
        .iter()
        .map(|f| format!("[用户上传了文件: {}]", f.display()));
    let failure_lines = errors.iter().map(|e| format!("[文件下载失败: {e}]"));
    let typed_text = if text.is_empty() {
        None
    } else {
        Some(text.to_string())
    };

    voice_lines
        .chain(upload_lines)
        .chain(failure_lines)
        .chain(typed_text)
        .collect::<Vec<String>>()
        .join("\n")
}
/// Sends an audio file to the transcription service and returns the text.
///
/// The file is posted as multipart form data (`file` part with MIME type
/// `audio/ogg`, `model` = `base`) to `<whisper_url>/v1/audio/transcriptions`.
/// A response without a string `text` field yields an empty string.
///
/// # Errors
/// Fails if the file cannot be read, the request fails, the server answers
/// with an error status, or the body is not valid JSON.
async fn transcribe_audio(whisper_url: &str, file_path: &Path) -> Result<String> {
    let client = reqwest::Client::new();
    let base = whisper_url.trim_end_matches('/');
    let url = format!("{}/v1/audio/transcriptions", base);

    let audio = tokio::fs::read(file_path).await?;
    let upload_name = match file_path.file_name().and_then(|n| n.to_str()) {
        Some(name) => name.to_string(),
        None => "audio.ogg".to_string(),
    };

    let file_part = reqwest::multipart::Part::bytes(audio)
        .file_name(upload_name)
        .mime_str("audio/ogg")?;
    let form = reqwest::multipart::Form::new()
        .part("file", file_part)
        .text("model", "base");

    let response = client.post(&url).multipart(form).send().await?;
    let response = response.error_for_status()?;
    let payload: serde_json::Value = response.json().await?;
    let transcript = payload["text"].as_str().unwrap_or("");
    Ok(transcript.to_string())
}
/// Builds the system message: persona (or a default one), fixed tool and
/// output-format instructions, and the conversation summary when present.
fn build_system_prompt(summary: &str, persona: &str) -> serde_json::Value {
    const DEFAULT_PERSONA: &str = "你是一个AI助手。";
    const TOOL_INSTRUCTIONS: &str = "\n\n你可以使用提供的工具来完成任务。\
        当需要执行命令、运行代码或启动复杂子任务时,直接调用对应的工具,不要只是描述你会怎么做。\
        输出格式使用纯文本或基础Markdown加粗、列表、代码块\
        不要使用LaTeX公式$...$、特殊Unicode符号→←↔或HTML标签Telegram无法渲染这些。";
    const SUMMARY_HEADING: &str = "\n\n## 之前的对话总结\n";

    let opening = if persona.is_empty() {
        DEFAULT_PERSONA
    } else {
        persona
    };
    let mut prompt = String::from(opening);
    prompt += TOOL_INSTRUCTIONS;
    if !summary.is_empty() {
        prompt += SUMMARY_HEADING;
        prompt += summary;
    }
    serde_json::json!({"role": "system", "content": prompt})
}
/// Build user message content, with optional images as multimodal input.
fn build_user_content(
text: &str,
scratch: &str,
images: &[PathBuf],
) -> serde_json::Value {
let full_text = if scratch.is_empty() {
text.to_string()
} else {
format!("{text}\n\n[scratch]\n{scratch}")
};
// collect image data
let mut image_parts: Vec<serde_json::Value> = Vec::new();
for path in images {
let mime = match path
.extension()
.and_then(|e| e.to_str())
.map(|e| e.to_lowercase())
.as_deref()
{
Some("jpg" | "jpeg") => "image/jpeg",
Some("png") => "image/png",
Some("gif") => "image/gif",
Some("webp") => "image/webp",
_ => continue,
};
if let Ok(data) = std::fs::read(path) {
let b64 = base64::engine::general_purpose::STANDARD.encode(&data);
image_parts.push(serde_json::json!({
"type": "image_url",
"image_url": {"url": format!("data:{mime};base64,{b64}")}
}));
}
}
if image_parts.is_empty() {
// plain text — more compatible
serde_json::Value::String(full_text)
} else {
// multimodal array
let mut content = vec![serde_json::json!({"type": "text", "text": full_text})];
content.extend(image_parts);
serde_json::Value::Array(content)
}
}
async fn summarize_messages(
endpoint: &str,
model: &str,
api_key: &str,
existing_summary: &str,
dropped: &[serde_json::Value],
) -> Result<String> {
let msgs_text: String = dropped
.iter()
.filter_map(|m| {
let role = m["role"].as_str()?;
let content = m["content"].as_str()?;
Some(format!("{role}: {content}"))
})
.collect::<Vec<_>>()
.join("\n\n");
let prompt = if existing_summary.is_empty() {
format!(
"请将以下对话总结为约4000字符的摘要保留关键信息和上下文\n\n{}",
msgs_text
)
} else {
format!(
"请将以下新对话内容整合到现有总结中保持总结在约4000字符以内。\
保留重要信息,让较旧的话题自然淡出。\n\n\
现有总结:\n{}\n\n新对话:\n{}",
existing_summary, msgs_text
)
};
let client = reqwest::Client::new();
let url = format!("{}/chat/completions", endpoint.trim_end_matches('/'));
let body = serde_json::json!({
"model": model,
"messages": [
{"role": "system", "content": "你是一个对话总结助手。请生成简洁但信息丰富的总结。"},
{"role": "user", "content": prompt}
],
});
let resp = client
.post(&url)
.header("Authorization", format!("Bearer {api_key}"))
.json(&body)
.send()
.await?
.error_for_status()?;
let json: serde_json::Value = resp.json().await?;
let summary = json["choices"][0]["message"]["content"]
.as_str()
.unwrap_or("")
.to_string();
Ok(summary)
}
// ── tool execution ─────────────────────────────────────────────────
/// Runs one tool call and returns the text handed back to the model.
///
/// `arguments` must be a JSON document; a parse failure is returned as an
/// "Invalid arguments" message. Built-in tools are handled here; any other
/// name is forwarded to `run_script_tool`.
async fn execute_tool(
    name: &str,
    arguments: &str,
    state: &Arc<AppState>,
    bot: &Bot,
    chat_id: ChatId,
    sid: &str,
    config: &Arc<Config>,
) -> String {
    /// Reads a string argument, using `fallback` when it is missing or not a string.
    fn text_arg<'a>(args: &'a serde_json::Value, key: &str, fallback: &'a str) -> &'a str {
        args[key].as_str().unwrap_or(fallback)
    }

    let args = match serde_json::from_str::<serde_json::Value>(arguments) {
        Ok(parsed) => parsed,
        Err(e) => return format!("Invalid arguments: {e}"),
    };

    match name {
        "spawn_agent" => {
            let agent_id = text_arg(&args, "id", "agent");
            let task = text_arg(&args, "task", "");
            spawn_agent(agent_id, task, state, bot, chat_id, sid, config).await
        }
        "agent_status" => check_agent_status(text_arg(&args, "id", ""), state).await,
        "kill_agent" => kill_agent(text_arg(&args, "id", ""), state).await,
        "send_file" => {
            let path_str = text_arg(&args, "path", "");
            let caption = text_arg(&args, "caption", "");
            let target = Path::new(path_str);
            if !target.exists() {
                return format!("File not found: {path_str}");
            }
            if !target.is_file() {
                return format!("Not a file: {path_str}");
            }
            let request = bot.send_document(chat_id, InputFile::file(target));
            let outcome = if caption.is_empty() {
                request.await
            } else {
                request.caption(caption).await
            };
            match outcome {
                Ok(_) => format!("File sent: {path_str}"),
                Err(e) => format!("Failed to send file: {e:#}"),
            }
        }
        "update_scratch" => {
            let content = text_arg(&args, "content", "");
            state.push_scratch(content).await;
            format!("Scratch updated ({} chars)", content.len())
        }
        _ => run_script_tool(name, arguments).await,
    }
}
/// Starts a `claude -p <task>` subprocess under the identifier `id` and
/// returns a status line for the model.
///
/// A background task collects the child's stdout line by line, records the
/// exit code, marks the agent completed and then calls `agent_wakeup` with up
/// to 4000 characters of the output so the conversation can continue.
///
/// Two details guard against hangs and duplicates:
/// - stderr is discarded. Nothing ever read it, and a piped but undrained
///   stderr can block the child once the pipe buffer fills, which would also
///   keep stdout open forever.
/// - the id check and the registration happen under one write lock, so two
///   concurrent calls with the same id cannot both start an agent.
async fn spawn_agent(
    id: &str,
    task: &str,
    state: &Arc<AppState>,
    bot: &Bot,
    chat_id: ChatId,
    sid: &str,
    config: &Arc<Config>,
) -> String {
    // Hold the write lock from the existence check through the insert.
    let mut agents = state.agents.write().await;
    if agents.contains_key(id) {
        return format!("Agent '{id}' already exists. Use agent_status to check it.");
    }
    let mut child = match Command::new("claude")
        .args(["--dangerously-skip-permissions", "-p", task])
        .stdout(Stdio::piped())
        .stderr(Stdio::null())
        .spawn()
    {
        Ok(c) => c,
        Err(e) => return format!("Failed to spawn agent: {e}"),
    };
    let pid = child.id();
    let output = Arc::new(tokio::sync::RwLock::new(String::new()));
    let completed = Arc::new(AtomicBool::new(false));
    let exit_code = Arc::new(tokio::sync::RwLock::new(None));
    let agent = Arc::new(SubAgent {
        task: task.to_string(),
        output: output.clone(),
        completed: completed.clone(),
        exit_code: exit_code.clone(),
        pid,
    });
    agents.insert(id.to_string(), agent);
    drop(agents);

    // background task: collect output and wakeup on completion
    let out = output.clone();
    let done = completed.clone();
    let ecode = exit_code.clone();
    let bot_c = bot.clone();
    let chat_id_c = chat_id;
    let state_c = state.clone();
    let config_c = config.clone();
    let sid_c = sid.to_string();
    let id_c = id.to_string();
    tokio::spawn(async move {
        let stdout = child.stdout.take();
        if let Some(stdout) = stdout {
            let mut lines = tokio::io::BufReader::new(stdout).lines();
            while let Ok(Some(line)) = lines.next_line().await {
                let mut o = out.write().await;
                o.push_str(&line);
                o.push('\n');
            }
        }
        let status = child.wait().await;
        let code = status.as_ref().ok().and_then(|s| s.code());
        *ecode.write().await = code;
        done.store(true, Ordering::SeqCst);
        info!(agent = %id_c, "agent completed, exit={code:?}");
        // wakeup: inject result and trigger LLM
        let result = out.read().await.clone();
        let result_short = truncate_at_char_boundary(&result, 4000);
        let wakeup = format!(
            "[Agent '{id_c}' 执行完成 (exit={})]\n{result_short}",
            code.unwrap_or(-1)
        );
        if let Err(e) = agent_wakeup(
            &config_c, &state_c, &bot_c, chat_id_c, &sid_c, &wakeup, &id_c,
        )
        .await
        {
            error!(agent = %id_c, "wakeup failed: {e:#}");
            let _ = bot_c
                .send_message(chat_id_c, format!("[agent wakeup error] {e:#}"))
                .await;
        }
    });
    format!("Agent '{id}' spawned (pid={pid:?})")
}
/// Delivers a finished agent's result to the conversation.
///
/// With the OpenAI backend the wakeup text is stored as a user message, the
/// full conversation is sent to the model, and a non-empty reply is stored as
/// the assistant message. With any other backend the wakeup text is simply
/// posted to the chat (send failures are ignored).
///
/// # Errors
/// Returns the error from the streaming model request, if it fails.
async fn agent_wakeup(
    config: &Config,
    state: &AppState,
    bot: &Bot,
    chat_id: ChatId,
    sid: &str,
    wakeup_msg: &str,
    agent_id: &str,
) -> Result<()> {
    let (endpoint, model, api_key) = match &config.backend {
        BackendConfig::OpenAI {
            endpoint,
            model,
            api_key,
        } => (endpoint, model, api_key),
        _ => {
            let notice = format!("[Agent '{agent_id}' done]\n{wakeup_msg}");
            let _ = bot.send_message(chat_id, notice).await;
            return Ok(());
        }
    };

    // Persist the wakeup first so it is part of the history loaded below.
    state.push_message(sid, "user", wakeup_msg).await;
    let conv = state.load_conv(sid).await;
    let persona = state.get_config("persona").await.unwrap_or_default();

    let mut api_messages = vec![build_system_prompt(&conv.summary, &persona)];
    api_messages.extend(conv.messages);
    info!(agent = %agent_id, "wakeup: sending {} messages to LLM", api_messages.len());

    let reply =
        run_openai_streaming(endpoint, model, api_key, &api_messages, bot, chat_id).await?;
    if !reply.is_empty() {
        state.push_message(sid, "assistant", &reply).await;
    }
    Ok(())
}
/// Describes agent `id`: running or completed (with exit code), its task, the
/// total output size in bytes and a preview of the output produced by
/// `truncate_at_char_boundary(.., 3000)`. Unknown ids yield a "not found" message.
async fn check_agent_status(id: &str, state: &AppState) -> String {
    let registry = state.agents.read().await;
    let agent = match registry.get(id) {
        Some(found) => found,
        None => return format!("Agent '{id}' not found"),
    };

    let status = if !agent.completed.load(Ordering::SeqCst) {
        "running".to_string()
    } else {
        let code = agent.exit_code.read().await;
        format!("completed (exit={})", code.unwrap_or(-1))
    };

    let output = agent.output.read().await;
    let out_preview = truncate_at_char_boundary(&output, 3000);
    let total_bytes = output.len();
    format!(
        "Agent '{id}': {status}\nTask: {}\nOutput ({} bytes):\n{out_preview}",
        agent.task, total_bytes
    )
}
/// Sends SIGTERM to the process of agent `id` and returns a status line.
///
/// Nothing is signalled when the agent is unknown, already completed, has no
/// recorded PID, or has a PID that cannot be a valid target. A failing `kill`
/// call is reported with the OS error instead of being presented as success.
async fn kill_agent(id: &str, state: &AppState) -> String {
    let agents = state.agents.read().await;
    let agent = match agents.get(id) {
        Some(agent) => agent,
        None => return format!("Agent '{id}' not found"),
    };
    if agent.completed.load(Ordering::SeqCst) {
        return format!("Agent '{id}' already completed");
    }
    let pid = match agent.pid {
        Some(pid) => pid,
        None => return format!("Agent '{id}' has no PID"),
    };
    // kill(0, ..) targets the caller's whole process group and a wrapped negative
    // value targets other groups, so only strictly positive in-range PIDs are accepted.
    if pid == 0 || pid > i32::MAX as u32 {
        return format!("Agent '{id}' has an invalid PID ({pid})");
    }
    // SAFETY: `kill` takes plain integers and touches no memory owned by this
    // program; the PID was range-checked above, so the cast cannot wrap.
    let rc = unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
    if rc == 0 {
        format!("Sent SIGTERM to agent '{id}' (pid={pid})")
    } else {
        let err = std::io::Error::last_os_error();
        format!("Failed to send SIGTERM to agent '{id}' (pid={pid}): {err}")
    }
}
/// Execute the script in the tools directory that implements tool `name`.
///
/// Every regular file in `tools_dir()` is probed with `--schema`; the first
/// one whose JSON schema has a matching `"name"` is run with `arguments`
/// passed as its single command-line argument, limited to 60 seconds.
///
/// Returns the script's stdout, followed by a `[stderr]` section when stderr
/// is non-empty, or `(exit=N)` when it printed nothing. Failures are reported
/// as text as well: `Unknown tool: …`, `Failed to execute …`, or
/// `Timeout after 60s`.
async fn run_script_tool(name: &str, arguments: &str) -> String {
    let dir = tools_dir();
    let Ok(entries) = std::fs::read_dir(&dir) else {
        return format!("Unknown tool: {name}");
    };

    for entry in entries.flatten() {
        let path = entry.path();
        if !path.is_file() {
            continue;
        }

        // Probe the script's schema with the async `Command` so that a slow
        // script does not block the runtime's worker thread.
        let probe = Command::new(&path)
            .arg("--schema")
            .kill_on_drop(true)
            .output()
            .await;
        let Ok(out) = probe else { continue };
        if !out.status.success() {
            continue;
        }
        let stdout = String::from_utf8_lossy(&out.stdout);
        let Ok(schema) = serde_json::from_str::<serde_json::Value>(stdout.trim()) else {
            continue;
        };
        if schema["name"].as_str() != Some(name) {
            continue;
        }

        // found it — execute
        info!(tool = %name, path = %path.display(), "running script tool");
        let result = tokio::time::timeout(
            std::time::Duration::from_secs(60),
            // kill_on_drop: when the timeout fires the future is dropped, and
            // without this flag the script would keep running in the background.
            Command::new(&path)
                .arg(arguments)
                .kill_on_drop(true)
                .output(),
        )
        .await;

        return match result {
            Ok(Ok(output)) => {
                let mut s = String::from_utf8_lossy(&output.stdout).to_string();
                let stderr = String::from_utf8_lossy(&output.stderr);
                if !stderr.is_empty() {
                    if !s.is_empty() {
                        s.push_str("\n[stderr]\n");
                    }
                    s.push_str(&stderr);
                }
                if s.is_empty() {
                    format!("(exit={})", output.status.code().unwrap_or(-1))
                } else {
                    s
                }
            }
            Ok(Err(e)) => format!("Failed to execute {name}: {e}"),
            Err(_) => "Timeout after 60s".to_string(),
        };
    }

    format!("Unknown tool: {name}")
}
// ── openai with tool call loop ─────────────────────────────────────
/// Run a streaming chat completion against an OpenAI-compatible endpoint and
/// keep executing the tool calls the model requests until it answers with
/// plain content.
///
/// Each round posts `messages` plus the discovered tool schemas, parses the
/// SSE stream, and shows partial content in the chat (via `sendMessageDraft`
/// when `is_private`, otherwise by sending and then editing a message). If the
/// round ended with tool calls, the assistant message and one `tool` result
/// message per call are appended to `messages` and the API is called again.
///
/// Returns the final assistant text (possibly empty). Errors on transport
/// failures and on non-success HTTP statuses, after logging a summary of the
/// conversation that was sent.
// The argument list mirrors everything a tool invocation needs; bundling it
// into a struct would change every caller.
#[allow(clippy::too_many_arguments)]
async fn run_openai_with_tools(
    endpoint: &str,
    model: &str,
    api_key: &str,
    mut messages: Vec<serde_json::Value>,
    bot: &Bot,
    chat_id: ChatId,
    state: &Arc<AppState>,
    sid: &str,
    config: &Arc<Config>,
    is_private: bool,
) -> Result<String> {
    let client = reqwest::Client::new();
    let url = format!("{}/chat/completions", endpoint.trim_end_matches('/'));
    let tools = discover_tools();

    loop {
        let body = serde_json::json!({
            "model": model,
            "messages": messages,
            "tools": tools,
            "stream": true,
        });
        info!(
            "API request: {} messages, {} tools",
            messages.len(),
            tools.as_array().map(|a| a.len()).unwrap_or(0)
        );
        let resp_raw = client
            .post(&url)
            .header("Authorization", format!("Bearer {api_key}"))
            .json(&body)
            .send()
            .await?;
        if !resp_raw.status().is_success() {
            let status = resp_raw.status();
            let body_text = resp_raw.text().await.unwrap_or_default();
            // dump messages for debugging
            for (i, m) in messages.iter().enumerate() {
                let role = m["role"].as_str().unwrap_or("?");
                let content_len = m["content"].as_str().map(|s| s.len()).unwrap_or(0);
                let has_tc = m.get("tool_calls").is_some();
                let has_tcid = m.get("tool_call_id").is_some();
                warn!("  msg[{i}] role={role} content_len={content_len} tool_calls={has_tc} tool_call_id={has_tcid}");
            }
            error!("OpenAI API {status}: {body_text}");
            anyhow::bail!("OpenAI API {status}: {body_text}");
        }
        let mut resp = resp_raw;

        // Per-round display state.
        let token = bot.token().to_owned();
        let raw_chat_id = chat_id.0;
        let draft_id: i64 = 1;
        let mut use_draft = is_private; // sendMessageDraft only works in private chats
        let mut msg_id: Option<teloxide::types::MessageId> = None;
        let mut accumulated = String::new();
        let mut last_edit = Instant::now();

        // Raw bytes are buffered and only decoded one complete line at a time:
        // a multi-byte UTF-8 character may be split across network chunks, and
        // decoding each chunk on its own would turn both halves into U+FFFD.
        let mut buffer: Vec<u8> = Vec::new();

        // tool call accumulation
        let mut tool_calls: Vec<ToolCall> = Vec::new();
        let mut has_tool_calls = false;

        'stream: while let Some(chunk) = resp.chunk().await? {
            buffer.extend_from_slice(&chunk);
            while let Some(pos) = buffer.iter().position(|&b| b == b'\n') {
                let raw_line: Vec<u8> = buffer.drain(..=pos).collect();
                let line = String::from_utf8_lossy(&raw_line);
                let trimmed = line.trim();
                // Blank separators and `:` comment lines carry no data.
                if trimmed.is_empty() || trimmed.starts_with(':') {
                    continue;
                }
                let Some(data) = trimmed.strip_prefix("data:") else {
                    continue;
                };
                let data = data.trim();
                if data == "[DONE]" {
                    // Stop right away instead of waiting for another chunk.
                    break 'stream;
                }
                let Ok(json) = serde_json::from_str::<serde_json::Value>(data) else {
                    continue;
                };
                let delta = &json["choices"][0]["delta"];

                // handle content delta
                if let Some(content) = delta["content"].as_str() {
                    if !content.is_empty() {
                        accumulated.push_str(content);
                    }
                }

                // handle tool call delta: fragments are keyed by `index`, and
                // the argument string arrives in pieces that must be appended.
                if let Some(tc_arr) = delta["tool_calls"].as_array() {
                    has_tool_calls = true;
                    for tc in tc_arr {
                        let idx = tc["index"].as_u64().unwrap_or(0) as usize;
                        while tool_calls.len() <= idx {
                            tool_calls.push(ToolCall {
                                id: String::new(),
                                name: String::new(),
                                arguments: String::new(),
                            });
                        }
                        if let Some(id) = tc["id"].as_str() {
                            tool_calls[idx].id = id.to_string();
                        }
                        if let Some(name) = tc["function"]["name"].as_str() {
                            tool_calls[idx].name = name.to_string();
                        }
                        if let Some(args) = tc["function"]["arguments"].as_str() {
                            tool_calls[idx].arguments.push_str(args);
                        }
                    }
                }

                // display update (only when there's content to show)
                if accumulated.is_empty() {
                    continue;
                }
                let interval = if use_draft {
                    DRAFT_INTERVAL_MS
                } else {
                    EDIT_INTERVAL_MS
                };
                if last_edit.elapsed().as_millis() < interval as u128 {
                    continue;
                }
                let display = if use_draft {
                    truncate_at_char_boundary(&accumulated, TG_MSG_LIMIT).to_string()
                } else {
                    truncate_for_display(&accumulated)
                };
                if use_draft {
                    match send_message_draft(&client, &token, raw_chat_id, draft_id, &display)
                        .await
                    {
                        Ok(_) => {
                            last_edit = Instant::now();
                        }
                        Err(e) => {
                            warn!("sendMessageDraft failed, falling back: {e:#}");
                            use_draft = false;
                            if let Ok(sent) = bot.send_message(chat_id, &display).await {
                                msg_id = Some(sent.id);
                                last_edit = Instant::now();
                            }
                        }
                    }
                } else if let Some(id) = msg_id {
                    if bot
                        .edit_message_text(chat_id, id, &display)
                        .await
                        .is_ok()
                    {
                        last_edit = Instant::now();
                    }
                } else if let Ok(sent) = bot.send_message(chat_id, &display).await {
                    msg_id = Some(sent.id);
                    last_edit = Instant::now();
                }
            }
        }

        // decide what to do based on response type
        if has_tool_calls && !tool_calls.is_empty() {
            // append assistant message with tool calls
            let tc_json: Vec<serde_json::Value> = tool_calls
                .iter()
                .map(|tc| {
                    serde_json::json!({
                        "id": tc.id,
                        "type": "function",
                        "function": {
                            "name": tc.name,
                            "arguments": tc.arguments,
                        }
                    })
                })
                .collect();
            let assistant_msg = serde_json::json!({
                "role": "assistant",
                "content": accumulated.as_str(),
                "tool_calls": tc_json,
            });
            messages.push(assistant_msg);

            // execute each tool and feed its result back to the model
            for tc in &tool_calls {
                info!(tool = %tc.name, "executing tool call");
                let _ = bot
                    .send_message(
                        chat_id,
                        format!(
                            "[{}({})]",
                            tc.name,
                            truncate_at_char_boundary(&tc.arguments, 100)
                        ),
                    )
                    .await;
                let result =
                    execute_tool(&tc.name, &tc.arguments, state, bot, chat_id, sid, config).await;
                messages.push(serde_json::json!({
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": result,
                }));
            }

            // All per-round state is re-created at the top of the loop.
            continue;
        }

        // content response — send final result
        if !accumulated.is_empty() {
            send_final_result(bot, chat_id, msg_id, use_draft, &accumulated).await;
        }
        return Ok(accumulated);
    }
}
// ── claude bridge (streaming) ───────────────────────────────────────
/// One line of the `claude` CLI's `stream-json` output, reduced to the fields
/// this bridge reads. Lines that fail to deserialize into this shape are
/// skipped by the reader.
#[derive(Deserialize)]
struct StreamEvent {
/// Event discriminator (JSON key `type`); the reader acts on `"assistant"`
/// and `"result"` and ignores every other value.
#[serde(rename = "type")]
event_type: String,
/// Assistant payload; read for `"assistant"` events.
message: Option<AssistantMessage>,
/// Final text; read for `"result"` events.
result: Option<String>,
/// Error flag; `false` when the key is absent.
#[serde(default)]
is_error: bool,
}
/// Body of an `"assistant"` stream event.
#[derive(Deserialize)]
struct AssistantMessage {
/// Content blocks in the order they were received.
content: Vec<ContentBlock>,
}
/// A single block inside an assistant message.
#[derive(Deserialize)]
struct ContentBlock {
/// Block discriminator (JSON key `type`); the helpers in this file look for
/// `"text"` and `"tool_use"`.
#[serde(rename = "type")]
block_type: String,
/// Text payload; read for `"text"` blocks.
text: Option<String>,
/// Tool name; read for `"tool_use"` blocks.
name: Option<String>,
/// Tool arguments; read for `"tool_use"` blocks.
input: Option<serde_json::Value>,
}
/// Concatenate the text of every `"text"` content block in `msg`, in order.
///
/// Blocks of any other type, and text blocks without a `text` value, add
/// nothing. No separator is inserted between fragments.
fn extract_text(msg: &AssistantMessage) -> String {
    let mut combined = String::new();
    for block in &msg.content {
        if block.block_type != "text" {
            continue;
        }
        if let Some(fragment) = block.text.as_deref() {
            combined.push_str(fragment);
        }
    }
    combined
}
/// Describe the first `"tool_use"` block of `msg` as a short status line,
/// e.g. `Bash: echo hello`.
///
/// The tool name falls back to `tool`. The detail is taken from the first of
/// the input keys `command`, `pattern`, `file_path`, `query`, `prompt` that is
/// present; it is empty when none is present or that value is not a string,
/// and is cut to at most 80 bytes. Returns `None` when the message contains
/// no tool-use block.
fn extract_tool_use(msg: &AssistantMessage) -> Option<String> {
    // Lookup order matters: the first key that exists wins.
    const DETAIL_KEYS: [&str; 5] = ["command", "pattern", "file_path", "query", "prompt"];

    let block = msg.content.iter().find(|b| b.block_type == "tool_use")?;
    let name = block.name.as_deref().unwrap_or("tool");
    let detail = block
        .input
        .as_ref()
        .and_then(|input| DETAIL_KEYS.iter().find_map(|key| input.get(*key)))
        .and_then(|value| value.as_str())
        .unwrap_or("");
    let detail_short = truncate_at_char_boundary(detail, 80);
    Some(format!("{name}: {detail_short}"))
}
/// Minimum time in milliseconds between two streaming updates when the
/// preview is shown by sending and then editing a regular message.
const EDIT_INTERVAL_MS: u64 = 2000;
/// Minimum time in milliseconds between two streaming updates when the
/// preview is shown through `sendMessageDraft`.
const DRAFT_INTERVAL_MS: u64 = 1000;
/// Size budget for one outgoing message. The truncation and splitting helpers
/// apply it to the UTF-8 byte length.
/// NOTE(review): presumably mirrors Telegram's 4096-character message limit — confirm.
const TG_MSG_LIMIT: usize = 4096;
/// Run `prompt` through the Claude CLI for session `sid`, streaming progress
/// into `chat_id`.
///
/// The session is always resumed first. When `known` is true the outcome of
/// that attempt is returned as is; otherwise a failed resume is retried once
/// with `--session-id` so that a new session with this id is created.
async fn invoke_claude_streaming(
    sid: &str,
    prompt: &str,
    known: bool,
    bot: &Bot,
    chat_id: ChatId,
) -> Result<String> {
    let resumed = run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await;
    if known {
        return resumed;
    }

    match resumed {
        Err(e) => {
            warn!(%sid, "resume failed ({e:#}), creating new session");
            run_claude_streaming(&["--session-id", sid], prompt, bot, chat_id).await
        }
        Ok(reply) => {
            info!(%sid, "resumed existing session");
            Ok(reply)
        }
    }
}
async fn send_message_draft(
client: &reqwest::Client,
token: &str,
chat_id: i64,
draft_id: i64,
text: &str,
) -> Result<()> {
let url = format!("https://api.telegram.org/bot{token}/sendMessageDraft");
let resp = client
.post(&url)
.json(&serde_json::json!({
"chat_id": chat_id,
"draft_id": draft_id,
"text": text,
}))
.send()
.await?;
let body: serde_json::Value = resp.json().await?;
if body["ok"].as_bool() != Some(true) {
anyhow::bail!("sendMessageDraft: {}", body);
}
Ok(())
}
/// Run the `claude` CLI in print mode with `stream-json` output and mirror its
/// progress into `chat_id`.
///
/// `extra_args` are inserted before the prompt (the callers in this file pass
/// `--resume <sid>` or `--session-id <sid>`). While the process runs, assistant
/// text and a `[Tool: detail]` status line are shown via `sendMessageDraft`,
/// falling back to sending and editing a regular message if that call fails.
///
/// Returns the text of the `result` event and delivers it as the final
/// message when it is non-empty. Fails when the result event is flagged as an
/// error, or when no result arrived and the process did not exit successfully;
/// the error text is the result, else stderr, else the exit status.
async fn run_claude_streaming(
    extra_args: &[&str],
    prompt: &str,
    bot: &Bot,
    chat_id: ChatId,
) -> Result<String> {
    let mut args: Vec<&str> = vec![
        "--dangerously-skip-permissions",
        "-p",
        "--output-format",
        "stream-json",
        "--verbose",
    ];
    args.extend(extra_args);
    args.push(prompt);

    let mut child = Command::new("claude")
        .args(&args)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| anyhow::anyhow!("claude stdout was not captured"))?;
    let mut lines = tokio::io::BufReader::new(stdout).lines();

    // Drain stderr concurrently with stdout. Reading it only after stdout hits
    // EOF can deadlock: once the stderr pipe is full the child blocks on the
    // write and never gets to close stdout.
    let stderr_task = child.stderr.take().map(|mut se| {
        tokio::spawn(async move {
            let mut buf = String::new();
            let _ = tokio::io::AsyncReadExt::read_to_string(&mut se, &mut buf).await;
            buf
        })
    });

    // sendMessageDraft for native streaming, with editMessageText fallback
    let http = reqwest::Client::new();
    let token = bot.token().to_owned();
    let raw_chat_id = chat_id.0;
    let draft_id: i64 = 1;
    let mut use_draft = true;
    let mut msg_id: Option<teloxide::types::MessageId> = None;
    // Last assistant text that was successfully shown; tool status lines are
    // appended to it for display but never stored in it.
    let mut last_sent_text = String::new();
    let mut last_edit = Instant::now();
    let mut final_result = String::new();
    let mut is_error = false;
    let mut tool_status = String::new();

    while let Ok(Some(line)) = lines.next_line().await {
        // Lines that are not a recognizable event are skipped.
        let event: StreamEvent = match serde_json::from_str(&line) {
            Ok(e) => e,
            Err(_) => continue,
        };
        match event.event_type.as_str() {
            "assistant" => {
                if let Some(amsg) = &event.message {
                    // determine display content
                    let (display_raw, new_text) =
                        if let Some(status) = extract_tool_use(amsg) {
                            // Tool status is shown immediately (no rate limit).
                            tool_status = format!("[{status}]");
                            let d = if last_sent_text.is_empty() {
                                tool_status.clone()
                            } else {
                                format!("{last_sent_text}\n\n{tool_status}")
                            };
                            (d, None)
                        } else {
                            let text = extract_text(amsg);
                            if text.is_empty() || text == last_sent_text {
                                continue;
                            }
                            let interval = if use_draft {
                                DRAFT_INTERVAL_MS
                            } else {
                                EDIT_INTERVAL_MS
                            };
                            if last_edit.elapsed().as_millis() < interval as u128 {
                                continue;
                            }
                            tool_status.clear();
                            (text.clone(), Some(text))
                        };
                    let display = if use_draft {
                        // draft mode: no cursor — cursor breaks monotonic text growth
                        truncate_at_char_boundary(&display_raw, TG_MSG_LIMIT).to_string()
                    } else {
                        truncate_for_display(&display_raw)
                    };
                    if use_draft {
                        match send_message_draft(&http, &token, raw_chat_id, draft_id, &display)
                            .await
                        {
                            Ok(_) => {
                                if let Some(t) = new_text {
                                    last_sent_text = t;
                                }
                                last_edit = Instant::now();
                            }
                            Err(e) => {
                                warn!("sendMessageDraft failed, falling back: {e:#}");
                                use_draft = false;
                                if let Ok(sent) = bot.send_message(chat_id, &display).await {
                                    msg_id = Some(sent.id);
                                    if let Some(t) = new_text {
                                        last_sent_text = t;
                                    }
                                    last_edit = Instant::now();
                                }
                            }
                        }
                    } else if let Some(id) = msg_id {
                        if bot
                            .edit_message_text(chat_id, id, &display)
                            .await
                            .is_ok()
                        {
                            if let Some(t) = new_text {
                                last_sent_text = t;
                            }
                            last_edit = Instant::now();
                        }
                    } else if let Ok(sent) = bot.send_message(chat_id, &display).await {
                        msg_id = Some(sent.id);
                        if let Some(t) = new_text {
                            last_sent_text = t;
                        }
                        last_edit = Instant::now();
                    }
                }
            }
            "result" => {
                final_result = event.result.unwrap_or_default();
                is_error = event.is_error;
            }
            _ => {}
        }
    }

    let status = child.wait().await;

    // collect stderr for diagnostics (empty if the drain task failed)
    let stderr_text = match stderr_task {
        Some(task) => task.await.unwrap_or_default(),
        None => String::new(),
    };

    // determine error: explicit is_error from stream, or non-zero exit with no result
    let has_error = is_error
        || (final_result.is_empty()
            && status.as_ref().map(|s| !s.success()).unwrap_or(true));

    if has_error {
        let err_detail = if !final_result.is_empty() {
            final_result.clone()
        } else if !stderr_text.is_empty() {
            stderr_text.trim().to_string()
        } else {
            format!("claude exited: {:?}", status)
        };
        // Only a regular message can be rewritten in place.
        if !use_draft {
            if let Some(id) = msg_id {
                let _ = bot
                    .edit_message_text(chat_id, id, format!("[error] {err_detail}"))
                    .await;
            }
        }
        anyhow::bail!("{err_detail}");
    }

    if final_result.is_empty() {
        return Ok(final_result);
    }
    send_final_result(bot, chat_id, msg_id, use_draft, &final_result).await;
    Ok(final_result)
}
// ── openai-compatible backend (streaming) ──────────────────────────
/// Run a single streaming chat completion (no tools) against an
/// OpenAI-compatible endpoint and mirror the growing reply into `chat_id`.
///
/// Partial content is shown via `sendMessageDraft`, falling back to sending
/// and then editing a regular message if that call fails. Once the stream
/// ends, a non-empty reply is delivered as the final message.
///
/// Returns the complete reply text (possibly empty). Errors on transport
/// failures and on non-success HTTP statuses.
async fn run_openai_streaming(
    endpoint: &str,
    model: &str,
    api_key: &str,
    messages: &[serde_json::Value],
    bot: &Bot,
    chat_id: ChatId,
) -> Result<String> {
    let client = reqwest::Client::new();
    let url = format!("{}/chat/completions", endpoint.trim_end_matches('/'));
    let body = serde_json::json!({
        "model": model,
        "messages": messages,
        "stream": true,
    });
    let mut resp = client
        .post(&url)
        .header("Authorization", format!("Bearer {api_key}"))
        .json(&body)
        .send()
        .await?
        .error_for_status()?;

    let token = bot.token().to_owned();
    let raw_chat_id = chat_id.0;
    let draft_id: i64 = 1;
    let mut use_draft = true;
    let mut msg_id: Option<teloxide::types::MessageId> = None;
    let mut accumulated = String::new();
    let mut last_edit = Instant::now();

    // Raw bytes are buffered and only decoded one complete line at a time:
    // a multi-byte UTF-8 character may be split across network chunks, and
    // decoding each chunk on its own would turn both halves into U+FFFD.
    let mut buffer: Vec<u8> = Vec::new();

    'stream: while let Some(chunk) = resp.chunk().await? {
        buffer.extend_from_slice(&chunk);
        while let Some(pos) = buffer.iter().position(|&b| b == b'\n') {
            let raw_line: Vec<u8> = buffer.drain(..=pos).collect();
            let line = String::from_utf8_lossy(&raw_line);
            let trimmed = line.trim();
            // Blank separators and `:` comment lines carry no data.
            if trimmed.is_empty() || trimmed.starts_with(':') {
                continue;
            }
            let Some(data) = trimmed.strip_prefix("data:") else {
                continue;
            };
            let data = data.trim();
            if data == "[DONE]" {
                // Stop right away instead of waiting for another chunk.
                break 'stream;
            }
            let Ok(json) = serde_json::from_str::<serde_json::Value>(data) else {
                continue;
            };
            let Some(content) = json["choices"][0]["delta"]["content"].as_str() else {
                continue;
            };
            if content.is_empty() {
                continue;
            }
            accumulated.push_str(content);

            // Rate-limit the preview updates.
            let interval = if use_draft {
                DRAFT_INTERVAL_MS
            } else {
                EDIT_INTERVAL_MS
            };
            if last_edit.elapsed().as_millis() < interval as u128 {
                continue;
            }
            let display = if use_draft {
                truncate_at_char_boundary(&accumulated, TG_MSG_LIMIT).to_string()
            } else {
                truncate_for_display(&accumulated)
            };
            if use_draft {
                match send_message_draft(&client, &token, raw_chat_id, draft_id, &display).await
                {
                    Ok(_) => {
                        last_edit = Instant::now();
                    }
                    Err(e) => {
                        warn!("sendMessageDraft failed, falling back: {e:#}");
                        use_draft = false;
                        if let Ok(sent) = bot.send_message(chat_id, &display).await {
                            msg_id = Some(sent.id);
                            last_edit = Instant::now();
                        }
                    }
                }
            } else if let Some(id) = msg_id {
                if bot.edit_message_text(chat_id, id, &display).await.is_ok() {
                    last_edit = Instant::now();
                }
            } else if let Ok(sent) = bot.send_message(chat_id, &display).await {
                msg_id = Some(sent.id);
                last_edit = Instant::now();
            }
        }
    }

    if accumulated.is_empty() {
        return Ok(accumulated);
    }
    send_final_result(bot, chat_id, msg_id, use_draft, &accumulated).await;
    Ok(accumulated)
}
/// Marker appended to in-progress messages to show that more text is coming.
const CURSOR: &str = " \u{25CE}";

/// Format `s` as an in-progress preview: the text followed by [`CURSOR`].
///
/// Text that fits the byte budget (`TG_MSG_LIMIT` minus the cursor and one
/// spare byte) is kept whole with the cursor on the same line. Longer text is
/// cut on a character boundary two bytes below the budget and the cursor is
/// placed on a line of its own.
fn truncate_for_display(s: &str) -> String {
    let budget = TG_MSG_LIMIT - CURSOR.len() - 1;
    let (shown, separator) = if s.len() > budget {
        (truncate_at_char_boundary(s, budget - 2), "\n")
    } else {
        (s, "")
    };

    let mut out = String::with_capacity(shown.len() + separator.len() + CURSOR.len());
    out.push_str(shown);
    out.push_str(separator);
    out.push_str(CURSOR);
    out
}
/// Return the longest prefix of `s` that is at most `max` bytes long and ends
/// on a UTF-8 character boundary.
///
/// `s` is returned unchanged when it already fits. The result may be empty
/// when `max` is smaller than the first character.
fn truncate_at_char_boundary(s: &str, max: usize) -> &str {
    if max >= s.len() {
        return s;
    }
    // Offset 0 is always a boundary, so the search cannot come up empty.
    let cut = (0..=max)
        .rev()
        .find(|&offset| s.is_char_boundary(offset))
        .unwrap_or(0);
    &s[..cut]
}
/// Escape the characters `&`, `<`, `>` and `"` as HTML entities so that `s`
/// can be placed inside HTML text or a double-quoted attribute value.
fn escape_html(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            other => escaped.push(other),
        }
    }
    escaped
}
/// Convert Markdown into the HTML subset used for Telegram messages.
///
/// Mapping: headings and strong text become `<b>`, emphasis `<i>`,
/// strikethrough `<s>`, block quotes `<blockquote>`, inline code `<code>`,
/// code blocks `<pre><code>` (with a `language-…` class for fenced blocks
/// that name a language), links `<a href>`. List items are prefixed with a
/// bullet and rules are rendered as `---`. All text is HTML-escaped, and
/// trailing whitespace is trimmed from the result.
///
/// Raw HTML in the input is emitted as escaped literal text rather than
/// dropped, so replies such as `Vec<String>` keep the `<String>` part.
fn markdown_to_telegram_html(md: &str) -> String {
    use pulldown_cmark::{CodeBlockKind, Event, Options, Parser, Tag, TagEnd};

    let mut opts = Options::empty();
    opts.insert(Options::ENABLE_STRIKETHROUGH);

    let mut html = String::new();
    for event in Parser::new_ext(md, opts) {
        match event {
            Event::Start(tag) => match tag {
                Tag::Paragraph => {}
                Tag::Heading { .. } => html.push_str("<b>"),
                Tag::BlockQuote(_) => html.push_str("<blockquote>"),
                Tag::CodeBlock(kind) => match kind {
                    CodeBlockKind::Fenced(ref lang) if !lang.is_empty() => {
                        html.push_str(&format!(
                            "<pre><code class=\"language-{}\">",
                            escape_html(lang.as_ref())
                        ));
                    }
                    _ => html.push_str("<pre><code>"),
                },
                // There is no list markup in the output, so mark each item
                // with a bullet to keep lists recognizable.
                Tag::Item => html.push_str("• "),
                Tag::Emphasis => html.push_str("<i>"),
                Tag::Strong => html.push_str("<b>"),
                Tag::Strikethrough => html.push_str("<s>"),
                Tag::Link { dest_url, .. } => {
                    html.push_str(&format!(
                        "<a href=\"{}\">",
                        escape_html(dest_url.as_ref())
                    ));
                }
                _ => {}
            },
            Event::End(tag) => match tag {
                TagEnd::Paragraph => html.push_str("\n\n"),
                TagEnd::Heading(_) => html.push_str("</b>\n\n"),
                TagEnd::BlockQuote(_) => html.push_str("</blockquote>"),
                TagEnd::CodeBlock => html.push_str("</code></pre>\n\n"),
                TagEnd::List(_) => html.push('\n'),
                TagEnd::Item => html.push('\n'),
                TagEnd::Emphasis => html.push_str("</i>"),
                TagEnd::Strong => html.push_str("</b>"),
                TagEnd::Strikethrough => html.push_str("</s>"),
                TagEnd::Link => html.push_str("</a>"),
                _ => {}
            },
            // Raw HTML (block or inline) is shown literally: anything shaped
            // like a tag, e.g. the `<String>` in `Vec<String>`, is parsed as
            // HTML and would otherwise vanish from the message.
            Event::Text(text) | Event::Html(text) | Event::InlineHtml(text) => {
                html.push_str(&escape_html(text.as_ref()));
            }
            Event::Code(text) => {
                html.push_str("<code>");
                html.push_str(&escape_html(text.as_ref()));
                html.push_str("</code>");
            }
            Event::SoftBreak | Event::HardBreak => html.push('\n'),
            Event::Rule => html.push_str("\n---\n\n"),
            _ => {}
        }
    }
    html.trim_end().to_string()
}
/// Deliver the finished reply `result` to `chat_id`.
///
/// The Markdown is first rendered to HTML and sent as one message. If that
/// fails, the raw text is sent without formatting, split into chunks of at
/// most `TG_MSG_LIMIT` bytes. When the reply was streamed by editing a
/// regular message (`use_draft` is false and `msg_id` is set), that message is
/// edited in place and only the remaining chunks are sent as new messages.
/// Errors from the plain-text fallback are ignored.
async fn send_final_result(
    bot: &Bot,
    chat_id: ChatId,
    msg_id: Option<teloxide::types::MessageId>,
    use_draft: bool,
    result: &str,
) {
    use teloxide::types::ParseMode;

    // A previously sent message is only reused when we were not in draft mode.
    let editable = if use_draft { None } else { msg_id };

    // try HTML as single message
    let rendered = markdown_to_telegram_html(result);
    let delivered = match editable {
        Some(id) => bot
            .edit_message_text(chat_id, id, &rendered)
            .parse_mode(ParseMode::Html)
            .await
            .is_ok(),
        None => bot
            .send_message(chat_id, &rendered)
            .parse_mode(ParseMode::Html)
            .await
            .is_ok(),
    };
    if delivered {
        return;
    }

    // fallback: plain text with chunking
    let chunks = split_msg(result, TG_MSG_LIMIT);
    let mut pending = chunks.iter();
    if let Some(id) = editable {
        if let Some(first) = pending.next() {
            let _ = bot.edit_message_text(chat_id, id, *first).await;
        }
    }
    for chunk in pending {
        let _ = bot.send_message(chat_id, *chunk).await;
    }
}
/// Split `s` into consecutive pieces of at most `max` bytes each, cutting only
/// on UTF-8 character boundaries.
///
/// A string that already fits is returned as a single piece (so the result is
/// never empty, even for `""`). Concatenating the pieces yields `s` again.
///
/// If `max` is smaller than the character at the start of a piece (including
/// `max == 0`), that character is emitted whole and the piece exceeds `max`;
/// this guarantees progress instead of looping forever on empty pieces.
fn split_msg(s: &str, max: usize) -> Vec<&str> {
    if s.len() <= max {
        return vec![s];
    }

    let mut parts = Vec::new();
    let mut rest = s;
    while rest.len() > max {
        // Back off from `max` to the nearest character boundary.
        let mut end = max;
        while !rest.is_char_boundary(end) {
            end -= 1;
        }
        if end == 0 {
            // Not even one character fits: take it anyway so the loop advances.
            end = rest.chars().next().map_or(rest.len(), char::len_utf8);
        }
        let (chunk, tail) = rest.split_at(end);
        parts.push(chunk);
        rest = tail;
    }
    if !rest.is_empty() {
        parts.push(rest);
    }
    parts
}