- Add src/output.rs with Output trait and 3 implementations: TelegramOutput (streaming via draft/edit), GiteaOutput (comments), BufferOutput (for worker/tests) - Refactor run_openai_with_tools and execute_tool to use &mut dyn Output - Remove run_claude_streaming, invoke_claude_streaming, run_openai_streaming (dead code — only OpenAI-compatible backend is used now) - Remove BackendConfig::Claude code path from handler - stream.rs: 790 → 150 lines
531 lines
18 KiB
Rust
531 lines
18 KiB
Rust
mod config;
|
|
mod display;
|
|
mod gitea;
|
|
mod life;
|
|
mod output;
|
|
mod state;
|
|
mod stream;
|
|
mod tools;
|
|
|
|
use std::collections::HashSet;
|
|
use std::path::{Path, PathBuf};
|
|
use std::sync::Arc;
|
|
|
|
use anyhow::Result;
|
|
use chrono::{Local, NaiveDate, NaiveTime};
|
|
use teloxide::dispatching::UpdateFilterExt;
|
|
use teloxide::net::Download;
|
|
use teloxide::prelude::*;
|
|
use teloxide::types::InputFile;
|
|
use tracing::{error, info, warn};
|
|
use uuid::Uuid;
|
|
|
|
use config::{BackendConfig, Config};
|
|
use display::build_user_content;
|
|
use output::TelegramOutput;
|
|
use state::{AppState, MAX_WINDOW, SLIDE_SIZE};
|
|
use stream::{build_system_prompt, run_openai_with_tools, summarize_messages};
|
|
|
|
// ── helpers ─────────────────────────────────────────────────────────
|
|
|
|
fn session_date(refresh_hour: u32) -> NaiveDate {
|
|
let now = Local::now();
|
|
let refresh = NaiveTime::from_hms_opt(refresh_hour, 0, 0).unwrap();
|
|
if now.time() < refresh {
|
|
now.date_naive() - chrono::Duration::days(1)
|
|
} else {
|
|
now.date_naive()
|
|
}
|
|
}
|
|
|
|
fn session_uuid(prefix: &str, chat_id: i64, refresh_hour: u32) -> String {
|
|
let date = session_date(refresh_hour);
|
|
let name = format!("{}-{}-{}", prefix, chat_id, date.format("%Y%m%d"));
|
|
Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()).to_string()
|
|
}
|
|
|
|
/// Returns the directory named by the `HOME` environment variable, or
/// `/tmp` when the variable is unset or not valid Unicode.
fn home_dir() -> PathBuf {
    match std::env::var("HOME") {
        Ok(home) => PathBuf::from(home),
        Err(_) => PathBuf::from("/tmp"),
    }
}
|
|
|
|
fn incoming_dir() -> PathBuf {
|
|
home_dir().join("incoming")
|
|
}
|
|
|
|
fn outgoing_dir(sid: &str) -> PathBuf {
|
|
home_dir().join("outgoing").join(sid)
|
|
}
|
|
|
|
// ── main ────────────────────────────────────────────────────────────
|
|
|
|
#[tokio::main]
|
|
async fn main() {
|
|
tracing_subscriber::fmt()
|
|
.with_env_filter(
|
|
tracing_subscriber::EnvFilter::from_default_env()
|
|
.add_directive("noc=info".parse().unwrap()),
|
|
)
|
|
.init();
|
|
|
|
let config_path = std::env::var("NOC_CONFIG").unwrap_or_else(|_| "config.yaml".into());
|
|
let raw = std::fs::read_to_string(&config_path)
|
|
.unwrap_or_else(|e| panic!("read {config_path}: {e}"));
|
|
let mut config: Config =
|
|
serde_yaml::from_str(&raw).unwrap_or_else(|e| panic!("parse config: {e}"));
|
|
if let Some(ref mut gitea) = config.gitea {
|
|
gitea.resolve_token();
|
|
}
|
|
|
|
let state_path = std::env::var("NOC_STATE")
|
|
.map(PathBuf::from)
|
|
.unwrap_or_else(|_| PathBuf::from("state.json"));
|
|
let state = Arc::new(AppState::load(state_path));
|
|
|
|
let _ = std::fs::create_dir_all(incoming_dir());
|
|
|
|
let bot = Bot::new(&config.tg.key);
|
|
let me = bot.get_me().await.unwrap();
|
|
let bot_username = Arc::new(me.username.clone().unwrap_or_default());
|
|
info!(username = %bot_username, "noc bot starting");
|
|
|
|
let handler = Update::filter_message().endpoint(handle);
|
|
|
|
let config = Arc::new(config);
|
|
|
|
// start life loop
|
|
tokio::spawn(life::life_loop(bot.clone(), state.clone(), config.clone()));
|
|
|
|
// start gitea webhook server
|
|
if let Some(gitea_config) = &config.gitea {
|
|
let gc = gitea_config.clone();
|
|
// Use the gitea admin username as the bot user for @mention detection
|
|
let bot_user = std::env::var("GITEA_ADMIN_USER").unwrap_or_else(|_| "noc".into());
|
|
tokio::spawn(async move {
|
|
gitea::start_webhook_server(&gc, bot_user).await;
|
|
});
|
|
}
|
|
|
|
Dispatcher::builder(bot, handler)
|
|
.dependencies(dptree::deps![state, config, bot_username])
|
|
.default_handler(|_| async {})
|
|
.build()
|
|
.dispatch()
|
|
.await;
|
|
}
|
|
|
|
// ── file download ───────────────────────────────────────────────────
|
|
|
|
async fn download_tg_file(bot: &Bot, file_id: &str, filename: &str) -> Result<PathBuf> {
|
|
let dir = incoming_dir();
|
|
tokio::fs::create_dir_all(&dir).await?;
|
|
|
|
let dest = dir.join(filename);
|
|
let tf = bot.get_file(file_id).await?;
|
|
let mut file = tokio::fs::File::create(&dest).await?;
|
|
bot.download_file(&tf.path, &mut file).await?;
|
|
|
|
info!("downloaded {} -> {}", filename, dest.display());
|
|
Ok(dest)
|
|
}
|
|
|
|
// ── outgoing scan ───────────────────────────────────────────────────
|
|
|
|
async fn snapshot_dir(dir: &Path) -> HashSet<PathBuf> {
|
|
let mut set = HashSet::new();
|
|
if let Ok(mut entries) = tokio::fs::read_dir(dir).await {
|
|
while let Ok(Some(entry)) = entries.next_entry().await {
|
|
let path = entry.path();
|
|
if path.is_file() {
|
|
set.insert(path);
|
|
}
|
|
}
|
|
}
|
|
set
|
|
}
|
|
|
|
async fn new_files_in(dir: &Path, before: &HashSet<PathBuf>) -> Vec<PathBuf> {
|
|
let mut files = Vec::new();
|
|
if let Ok(mut entries) = tokio::fs::read_dir(dir).await {
|
|
while let Ok(Some(entry)) = entries.next_entry().await {
|
|
let path = entry.path();
|
|
if path.is_file() && !before.contains(&path) {
|
|
files.push(path);
|
|
}
|
|
}
|
|
}
|
|
files.sort();
|
|
files
|
|
}
|
|
|
|
// ── handler ─────────────────────────────────────────────────────────
|
|
|
|
async fn handle(
|
|
bot: Bot,
|
|
msg: Message,
|
|
state: Arc<AppState>,
|
|
config: Arc<Config>,
|
|
_bot_username: Arc<String>,
|
|
) -> ResponseResult<()> {
|
|
let chat_id = msg.chat.id;
|
|
let is_private = msg.chat.is_private();
|
|
let text = msg.text().or(msg.caption()).unwrap_or("").to_string();
|
|
let raw_id = chat_id.0;
|
|
let date = session_date(config.session.refresh_hour);
|
|
|
|
let is_authed = {
|
|
let p = state.persist.read().await;
|
|
p.authed.get(&raw_id) == Some(&date)
|
|
};
|
|
|
|
if !is_authed {
|
|
if text.trim() == config.auth.passphrase {
|
|
{
|
|
let mut p = state.persist.write().await;
|
|
p.authed.insert(raw_id, date);
|
|
}
|
|
state.save().await;
|
|
bot.send_message(chat_id, "authenticated").await?;
|
|
info!(chat = raw_id, "authed");
|
|
} else {
|
|
bot.send_message(chat_id, "not authenticated").await?;
|
|
}
|
|
return Ok(());
|
|
}
|
|
|
|
if let Err(e) = handle_inner(&bot, &msg, chat_id, &text, is_private, &state, &config).await {
|
|
error!(chat = raw_id, "handle: {e:#}");
|
|
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_inner(
|
|
bot: &Bot,
|
|
msg: &Message,
|
|
chat_id: ChatId,
|
|
text: &str,
|
|
is_private: bool,
|
|
state: &Arc<AppState>,
|
|
config: &Arc<Config>,
|
|
) -> Result<()> {
|
|
let mut uploaded: Vec<PathBuf> = Vec::new();
|
|
let mut download_errors: Vec<String> = Vec::new();
|
|
let mut transcriptions: Vec<String> = Vec::new();
|
|
|
|
if let Some(doc) = msg.document() {
|
|
let name = doc.file_name.as_deref().unwrap_or("file");
|
|
match download_tg_file(bot, &doc.file.id, name).await {
|
|
Ok(p) => uploaded.push(p),
|
|
Err(e) => download_errors.push(format!("{name}: {e:#}")),
|
|
}
|
|
}
|
|
|
|
if let Some(photos) = msg.photo() {
|
|
if let Some(photo) = photos.last() {
|
|
let name = format!("photo_{}.jpg", Local::now().format("%H%M%S"));
|
|
match download_tg_file(bot, &photo.file.id, &name).await {
|
|
Ok(p) => uploaded.push(p),
|
|
Err(e) => download_errors.push(format!("photo: {e:#}")),
|
|
}
|
|
}
|
|
}
|
|
|
|
if let Some(audio) = msg.audio() {
|
|
let fallback = format!("audio_{}.ogg", Local::now().format("%H%M%S"));
|
|
let name = audio.file_name.as_deref().unwrap_or(&fallback);
|
|
match download_tg_file(bot, &audio.file.id, name).await {
|
|
Ok(p) => {
|
|
if let Some(url) = &config.whisper_url {
|
|
match transcribe_audio(url, &p).await {
|
|
Ok(t) if !t.is_empty() => transcriptions.push(t),
|
|
Ok(_) => uploaded.push(p),
|
|
Err(e) => {
|
|
warn!("transcribe failed: {e:#}");
|
|
uploaded.push(p);
|
|
}
|
|
}
|
|
} else {
|
|
uploaded.push(p);
|
|
}
|
|
}
|
|
Err(e) => download_errors.push(format!("audio: {e:#}")),
|
|
}
|
|
}
|
|
|
|
if let Some(voice) = msg.voice() {
|
|
let name = format!("voice_{}.ogg", Local::now().format("%H%M%S"));
|
|
match download_tg_file(bot, &voice.file.id, &name).await {
|
|
Ok(p) => {
|
|
if let Some(url) = &config.whisper_url {
|
|
match transcribe_audio(url, &p).await {
|
|
Ok(t) if !t.is_empty() => transcriptions.push(t),
|
|
Ok(_) => uploaded.push(p),
|
|
Err(e) => {
|
|
warn!("transcribe failed: {e:#}");
|
|
uploaded.push(p);
|
|
}
|
|
}
|
|
} else {
|
|
uploaded.push(p);
|
|
}
|
|
}
|
|
Err(e) => download_errors.push(format!("voice: {e:#}")),
|
|
}
|
|
}
|
|
|
|
if let Some(video) = msg.video() {
|
|
let fallback = format!("video_{}.mp4", Local::now().format("%H%M%S"));
|
|
let name = video.file_name.as_deref().unwrap_or(&fallback);
|
|
match download_tg_file(bot, &video.file.id, name).await {
|
|
Ok(p) => uploaded.push(p),
|
|
Err(e) => download_errors.push(format!("video: {e:#}")),
|
|
}
|
|
}
|
|
|
|
if let Some(vn) = msg.video_note() {
|
|
let name = format!("videonote_{}.mp4", Local::now().format("%H%M%S"));
|
|
match download_tg_file(bot, &vn.file.id, &name).await {
|
|
Ok(p) => uploaded.push(p),
|
|
Err(e) => download_errors.push(format!("video_note: {e:#}")),
|
|
}
|
|
}
|
|
|
|
if text.is_empty() && uploaded.is_empty() && transcriptions.is_empty() {
|
|
if !download_errors.is_empty() {
|
|
let err_msg = format!("[文件下载失败]\n{}", download_errors.join("\n"));
|
|
bot.send_message(chat_id, err_msg).await?;
|
|
}
|
|
return Ok(());
|
|
}
|
|
|
|
let sid = session_uuid(&config.name, chat_id.0, config.session.refresh_hour);
|
|
info!(%sid, "recv");
|
|
|
|
let out_dir = outgoing_dir(&sid);
|
|
tokio::fs::create_dir_all(&out_dir).await?;
|
|
let before = snapshot_dir(&out_dir).await;
|
|
|
|
// handle diag command (OpenAI backend only)
|
|
if text.trim() == "diag" {
|
|
if let BackendConfig::OpenAI { .. } = &config.backend {
|
|
let conv = state.load_conv(&sid).await;
|
|
let count = state.message_count(&sid).await;
|
|
let persona = state.get_config("persona").await.unwrap_or_default();
|
|
let scratch = state.get_scratch().await;
|
|
let tools = tools::discover_tools();
|
|
let empty = vec![];
|
|
let tools_arr = tools.as_array().unwrap_or(&empty);
|
|
|
|
let mut diag = format!(
|
|
"# NOC Diag\n\n\
|
|
## Session\n\
|
|
- id: `{sid}`\n\
|
|
- window: {count}/{MAX_WINDOW} (slide at {MAX_WINDOW}, drop {SLIDE_SIZE})\n\
|
|
- total processed: {}\n\n\
|
|
## Persona ({} chars)\n```\n{}\n```\n\n\
|
|
## Scratch ({} chars)\n```\n{}\n```\n\n\
|
|
## Summary ({} chars)\n```\n{}\n```\n\n\
|
|
## Tools ({} registered)\n",
|
|
conv.total_messages + count,
|
|
persona.len(),
|
|
if persona.is_empty() { "(default)" } else { &persona },
|
|
scratch.len(),
|
|
if scratch.is_empty() { "(empty)" } else { &scratch },
|
|
conv.summary.len(),
|
|
if conv.summary.is_empty() {
|
|
"(empty)".to_string()
|
|
} else {
|
|
conv.summary
|
|
},
|
|
tools_arr.len(),
|
|
);
|
|
for tool in tools_arr {
|
|
let func = &tool["function"];
|
|
let name = func["name"].as_str().unwrap_or("?");
|
|
let desc = func["description"].as_str().unwrap_or("");
|
|
let params = serde_json::to_string_pretty(&func["parameters"])
|
|
.unwrap_or_default();
|
|
diag.push_str(&format!(
|
|
"### `{name}`\n{desc}\n\n```json\n{params}\n```\n\n"
|
|
));
|
|
}
|
|
|
|
let memory_slots = state.get_memory_slots().await;
|
|
diag.push_str(&format!("## Memory Slots ({}/100 used)\n", memory_slots.len()));
|
|
if memory_slots.is_empty() {
|
|
diag.push_str("(empty)\n\n");
|
|
} else {
|
|
for (nr, content, updated_at) in &memory_slots {
|
|
diag.push_str(&format!("- `[{nr}]` {content} ({updated_at})\n"));
|
|
}
|
|
diag.push('\n');
|
|
}
|
|
|
|
let tmp = std::env::temp_dir().join(format!("noc-diag-{sid}.md"));
|
|
tokio::fs::write(&tmp, &diag).await?;
|
|
bot.send_document(chat_id, InputFile::file(&tmp))
|
|
.await?;
|
|
let _ = tokio::fs::remove_file(&tmp).await;
|
|
return Ok(());
|
|
}
|
|
}
|
|
|
|
let prompt = build_prompt(text, &uploaded, &download_errors, &transcriptions);
|
|
|
|
let BackendConfig::OpenAI {
|
|
endpoint,
|
|
model,
|
|
api_key,
|
|
} = &config.backend
|
|
else {
|
|
let _ = bot.send_message(chat_id, "Only OpenAI backend is supported").await;
|
|
return Ok(());
|
|
};
|
|
|
|
let conv = state.load_conv(&sid).await;
|
|
let persona = state.get_config("persona").await.unwrap_or_default();
|
|
let memory_slots = state.get_memory_slots().await;
|
|
let inner = state.get_inner_state().await;
|
|
let system_msg = build_system_prompt(&conv.summary, &persona, &memory_slots, &inner);
|
|
|
|
let mut api_messages = vec![system_msg];
|
|
api_messages.extend(conv.messages);
|
|
|
|
let scratch = state.get_scratch().await;
|
|
let user_content = build_user_content(&prompt, &scratch, &uploaded);
|
|
api_messages.push(serde_json::json!({"role": "user", "content": user_content}));
|
|
|
|
let mut tg_output = TelegramOutput::new(bot.clone(), chat_id, is_private);
|
|
|
|
match run_openai_with_tools(
|
|
endpoint, model, api_key, api_messages, &mut tg_output, state, &sid, config, chat_id.0,
|
|
)
|
|
.await
|
|
{
|
|
Ok(response) => {
|
|
state.push_message(&sid, "user", &prompt).await;
|
|
if !response.is_empty() {
|
|
state.push_message(&sid, "assistant", &response).await;
|
|
}
|
|
|
|
// sliding window
|
|
let count = state.message_count(&sid).await;
|
|
if count >= MAX_WINDOW {
|
|
info!(%sid, "sliding window: {count} messages, summarizing oldest {SLIDE_SIZE}");
|
|
let _ = bot
|
|
.send_message(chat_id, "[整理记忆中...]")
|
|
.await;
|
|
|
|
let to_summarize =
|
|
state.get_oldest_messages(&sid, SLIDE_SIZE).await;
|
|
let current_summary = {
|
|
let db = state.db.lock().await;
|
|
db.query_row(
|
|
"SELECT summary FROM conversations WHERE session_id = ?1",
|
|
[&sid],
|
|
|row| row.get::<_, String>(0),
|
|
)
|
|
.unwrap_or_default()
|
|
};
|
|
|
|
match summarize_messages(
|
|
endpoint,
|
|
model,
|
|
api_key,
|
|
¤t_summary,
|
|
&to_summarize,
|
|
)
|
|
.await
|
|
{
|
|
Ok(new_summary) => {
|
|
state.slide_window(&sid, &new_summary, SLIDE_SIZE).await;
|
|
let remaining = state.message_count(&sid).await;
|
|
info!(%sid, "window slid, {remaining} messages remain, summary {} chars", new_summary.len());
|
|
}
|
|
Err(e) => {
|
|
warn!(%sid, "summarize failed: {e:#}, keeping all messages");
|
|
}
|
|
}
|
|
}
|
|
|
|
// auto-reflect every 10 messages
|
|
let count = state.message_count(&sid).await;
|
|
if count % 10 == 0 && count > 0 {
|
|
let state_c = state.clone();
|
|
let config_c = config.clone();
|
|
tokio::spawn(async move {
|
|
crate::life::reflect(&state_c, &config_c).await;
|
|
});
|
|
}
|
|
}
|
|
Err(e) => {
|
|
error!(%sid, "openai: {e:#}");
|
|
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
|
|
}
|
|
}
|
|
|
|
// send new files from outgoing dir
|
|
let new_files = new_files_in(&out_dir, &before).await;
|
|
for path in &new_files {
|
|
info!(%sid, "sending file: {}", path.display());
|
|
if let Err(e) = bot.send_document(chat_id, InputFile::file(path)).await {
|
|
error!(%sid, "send_document {}: {e:#}", path.display());
|
|
let _ = bot
|
|
.send_message(chat_id, format!("[发送文件失败: {}]", path.display()))
|
|
.await;
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Assembles the prompt text stored and sent for one user message.
///
/// Produces one line per item, in this order: voice transcriptions,
/// uploaded file paths, download errors, and finally the message text
/// (omitted when empty). Lines are joined with `\n`; no inputs yield an
/// empty string.
fn build_prompt(
    text: &str,
    uploaded: &[PathBuf],
    errors: &[String],
    transcriptions: &[String],
) -> String {
    let voice_lines = transcriptions.iter().map(|t| format!("[语音消息] {t}"));
    let file_lines = uploaded
        .iter()
        .map(|f| format!("[用户上传了文件: {}]", f.display()));
    let error_lines = errors.iter().map(|e| format!("[文件下载失败: {e}]"));
    let body = (!text.is_empty()).then(|| text.to_string());

    voice_lines
        .chain(file_lines)
        .chain(error_lines)
        .chain(body)
        .collect::<Vec<String>>()
        .join("\n")
}
|
|
|
|
async fn transcribe_audio(whisper_url: &str, file_path: &Path) -> Result<String> {
|
|
let client = reqwest::Client::builder()
|
|
.timeout(std::time::Duration::from_secs(60))
|
|
.build()?;
|
|
let url = format!("{}/v1/audio/transcriptions", whisper_url.trim_end_matches('/'));
|
|
let file_bytes = tokio::fs::read(file_path).await?;
|
|
let file_name = file_path
|
|
.file_name()
|
|
.and_then(|n| n.to_str())
|
|
.unwrap_or("audio.ogg")
|
|
.to_string();
|
|
let part = reqwest::multipart::Part::bytes(file_bytes)
|
|
.file_name(file_name)
|
|
.mime_str("audio/ogg")?;
|
|
let form = reqwest::multipart::Form::new()
|
|
.part("file", part)
|
|
.text("model", "base");
|
|
let resp = client.post(&url).multipart(form).send().await?.error_for_status()?;
|
|
let json: serde_json::Value = resp.json().await?;
|
|
Ok(json["text"].as_str().unwrap_or("").to_string())
|
|
}
|