From c2be8e69303e834c740b40bedaf8c1798960c50e Mon Sep 17 00:00:00 2001 From: Fam Zheng Date: Fri, 10 Apr 2026 22:58:39 +0100 Subject: [PATCH] add http API, channel-driven life loop, predefined diary timer - Extract http.rs: unified HTTP server with /api/timers and gitea webhook - Life loop: select! on interval tick + mpsc channel for force-fire - Predefined diary timer (cron 22:55 daily), auto-registered on startup - BufferOutput for system timers (chat_id=0), no TG message - state: ensure_timer(), get_timer() - context.md: add blog and Hugo docs for AI --- context.md | 27 +++++- src/gitea.rs | 22 +---- src/http.rs | 98 ++++++++++++++++++++++ src/life.rs | 228 +++++++++++++++++++++++++++++++-------------------- src/main.rs | 19 +++-- src/state.rs | 33 ++++++++ 6 files changed, 311 insertions(+), 116 deletions(-) create mode 100644 src/http.rs diff --git a/context.md b/context.md index 50c7d5e..3cd44cf 100644 --- a/context.md +++ b/context.md @@ -7,9 +7,10 @@ - **LLM**: vLLM on ailab (100.84.7.49:8000), gemma-4-31B-it-AWQ - **Claude Code**: ~/.local/bin/claude (子代��执行引擎) - **uv**: ~/.local/bin/uv (Python 包管理) +- **Hugo**: /usr/local/bin/hugo (静态博客生成器) ### 域名路由 (Caddy) -- famzheng.me — 主站(占位) +- famzheng.me → Hugo 博客 (/data/www/blog/public/) - git.famzheng.me → Gitea (localhost:3000) - 新增子域名:编辑 /etc/caddy/Caddyfile,然后 `sudo systemctl reload caddy` @@ -31,6 +32,30 @@ api.famzheng.me { 修改后执行 `sudo systemctl reload caddy` 生效。 Caddy 自动申请和续期 Let's Encrypt 证书,无需手动管理。 +### 博客 +Fam 的博客: +- 站点: https://famzheng.me, 源码: /data/www/blog/ +- Repo: https://git.famzheng.me/fam/blog +- 这是 Fam 的个人博客,不要在上面写东西 + +你的博客 (AI 日记/随想): +- 站点: https://noc.famzheng.me, 源码: /data/www/noc-blog/ +- Repo: https://git.famzheng.me/noc/diary +- 这是你自己的空间,可以自由写日记、随想、技术笔记 +- 写新文章: 在 content/posts/ 下创建 .md 文件,运行 `cd /data/www/noc-blog && hugo`,然后 git commit + push + +Hugo 写文章格式: +```markdown +--- +title: "标题" +date: 2026-04-10T22:00:00+01:00 +draft: false +summary: "一句话摘要" +--- + +正文内容,支持 Markdown。 +``` + ### Gitea - URL: https://git.famzheng.me - Admin: noc (token 在 /data/noc/gitea-token) diff --git a/src/gitea.rs b/src/gitea.rs index d5e3ed8..93d6d0a 100644 --- a/src/gitea.rs +++ b/src/gitea.rs @@ -151,27 +151,13 @@ pub struct WebhookState { pub bot_user: String, } -pub async fn start_webhook_server(config: &GiteaConfig, bot_user: String) { +pub fn webhook_router(config: &GiteaConfig, bot_user: String) -> axum::Router<()> { let gitea = GiteaClient::new(config); - let state = Arc::new(WebhookState { - gitea, - bot_user, - }); + let state = Arc::new(WebhookState { gitea, bot_user }); - let app = axum::Router::new() + axum::Router::new() .route("/webhook/gitea", post(handle_webhook)) - .with_state(state); - - let addr = format!("0.0.0.0:{}", config.webhook_port); - info!("gitea webhook server listening on {addr}"); - - let listener = tokio::net::TcpListener::bind(&addr) - .await - .unwrap_or_else(|e| panic!("bind {addr}: {e}")); - - if let Err(e) = axum::serve(listener, app).await { - error!("webhook server error: {e}"); - } + .with_state(state) } async fn handle_webhook( diff --git a/src/http.rs b/src/http.rs new file mode 100644 index 0000000..2949192 --- /dev/null +++ b/src/http.rs @@ -0,0 +1,98 @@ +use std::sync::Arc; + +use axum::extract::{Path, State as AxumState}; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::routing::{get, post}; +use axum::Json; +use tokio::sync::mpsc; +use tracing::{error, info}; + +use crate::config::Config; +use crate::life::LifeEvent; +use crate::state::AppState; + +#[derive(Clone)] +pub struct HttpState { + pub app_state: Arc, + pub life_tx: mpsc::Sender, +} + +pub async fn start_http_server( + config: &Config, + app_state: Arc, + life_tx: mpsc::Sender, +) { + let port = config + .gitea + .as_ref() + .map(|g| g.webhook_port) + .unwrap_or(9880); + + let state = Arc::new(HttpState { + app_state, + life_tx, + }); + + let mut app = axum::Router::new() + .route("/api/timers", get(list_timers)) + .route("/api/timers/{id}/fire", post(fire_timer)) + .with_state(state); + + // merge gitea webhook router if configured + if let Some(gitea_config) = &config.gitea { + let bot_user = std::env::var("GITEA_ADMIN_USER").unwrap_or_else(|_| "noc".into()); + app = app.merge(crate::gitea::webhook_router(gitea_config, bot_user)); + } + + let addr = format!("0.0.0.0:{port}"); + info!("http server listening on {addr}"); + + let listener = tokio::net::TcpListener::bind(&addr) + .await + .unwrap_or_else(|e| panic!("bind {addr}: {e}")); + + if let Err(e) = axum::serve(listener, app).await { + error!("http server error: {e}"); + } +} + +async fn list_timers(AxumState(state): AxumState>) -> impl IntoResponse { + let timers = state.app_state.list_timers(None).await; + let items: Vec = timers + .iter() + .map(|(id, chat_id, label, schedule, next_fire, enabled)| { + serde_json::json!({ + "id": id, + "chat_id": chat_id, + "label": label, + "schedule": schedule, + "next_fire": next_fire, + "enabled": enabled, + }) + }) + .collect(); + Json(serde_json::json!(items)) +} + +async fn fire_timer( + AxumState(state): AxumState>, + Path(id): Path, +) -> impl IntoResponse { + match state.life_tx.send(LifeEvent::FireTimer(id)).await { + Ok(_) => { + info!(timer_id = id, "timer fire requested via API"); + ( + StatusCode::OK, + Json(serde_json::json!({"status": "fired", "timer_id": id})), + ) + } + Err(e) => { + error!(timer_id = id, "failed to send fire event: {e}"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": "life loop not responding"})), + ) + } + } +} diff --git a/src/life.rs b/src/life.rs index 3c81cea..e5fc774 100644 --- a/src/life.rs +++ b/src/life.rs @@ -1,117 +1,167 @@ use std::sync::Arc; use teloxide::prelude::*; +use tokio::sync::mpsc; use tracing::{error, info, warn}; use crate::config::{BackendConfig, Config}; -use crate::output::TelegramOutput; +use crate::output::{BufferOutput, TelegramOutput}; use crate::state::AppState; use crate::stream::run_openai_with_tools; use crate::tools::compute_next_cron_fire; const LIFE_LOOP_TIMEOUT_SECS: u64 = 120; -pub async fn life_loop(bot: Bot, state: Arc, config: Arc) { +const DIARY_LABEL: &str = "写日记:回顾今天的对话和事件,在 /data/www/noc-blog/content/posts/ 下创建一篇日记(文件名格式 YYYY-MM-DD.md),用 run_shell 写入内容,然后执行 cd /data/www/noc-blog && hugo && git add -A && git commit -m 'diary: DATE' && git push"; +const DIARY_SCHEDULE: &str = "cron:0 55 22 * * *"; + +/// Events that can wake up the life loop. +pub enum LifeEvent { + /// Force-fire a specific timer by ID. + FireTimer(i64), +} + +pub async fn life_loop( + bot: Bot, + state: Arc, + config: Arc, + mut rx: mpsc::Receiver, +) { info!("life loop started"); + + // pre-defined timers — ensure they exist on every startup + if state.ensure_timer(0, DIARY_LABEL, DIARY_SCHEDULE).await { + info!("registered predefined diary timer"); + } + let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); loop { - interval.tick().await; - - let due = state.due_timers().await; - if due.is_empty() { - continue; - } - - for (timer_id, chat_id_raw, label, schedule) in &due { - let chat_id = ChatId(*chat_id_raw); - info!(timer_id, %label, "timer fired"); - - let persona = state.get_config("persona").await.unwrap_or_default(); - let inner = state.get_inner_state().await; - let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(); - - let mut system_text = if persona.is_empty() { - "你是一个AI伙伴。".to_string() - } else { - persona.clone() - }; - - system_text.push_str(&format!( - "\n\n[当前时间] {now}\n\n[你的内心状态]\n{}", - if inner.is_empty() { "(空)" } else { &inner } - )); - - // emotional motivation — why she acts - system_text.push_str( - "\n\n你不是因为 timer 到了才说话。你是因为在乎 Fam,所以想知道他怎么样。\ - 如果你觉得现在不该打扰他(太晚了、他今天很累、刚聊过),就什么都不说,回复空文本。\ - 主动沉默也是一种关心。\ - \n可以用 update_inner_state 更新你的内心状态。\ - 输出格式:纯文本或基础Markdown,不要LaTeX或特殊Unicode。", - ); - - let messages = vec![ - serde_json::json!({"role": "system", "content": system_text}), - serde_json::json!({"role": "user", "content": format!("[timer] {label}")}), - ]; - - if let BackendConfig::OpenAI { - ref endpoint, - ref model, - ref api_key, - } = config.backend - { - let sid = format!("life-{chat_id_raw}"); - let mut tg_output = TelegramOutput::new(bot.clone(), chat_id, true); - - let result = tokio::time::timeout( - std::time::Duration::from_secs(LIFE_LOOP_TIMEOUT_SECS), - run_openai_with_tools( - endpoint, model, api_key, messages, &mut tg_output, &state, &sid, - &config, *chat_id_raw, - ), - ) - .await; - - match result { - Ok(Ok(response)) => { - let detail = if response.is_empty() { - "(silent)".to_string() + tokio::select! { + _ = interval.tick() => { + let due = state.due_timers().await; + for (timer_id, chat_id_raw, label, schedule) in &due { + run_timer(&bot, &state, &config, *timer_id, *chat_id_raw, label, schedule).await; + } + } + Some(event) = rx.recv() => { + match event { + LifeEvent::FireTimer(id) => { + info!(timer_id = id, "timer force-fired via channel"); + if let Some((timer_id, chat_id_raw, label, schedule)) = state.get_timer(id).await { + run_timer(&bot, &state, &config, timer_id, chat_id_raw, &label, &schedule).await; } else { - response.chars().take(200).collect() - }; - state.log_life("timer", &format!("{label} → {detail}")).await; - if !response.is_empty() { - info!(timer_id, "life loop response ({} chars)", response.len()); + warn!(timer_id = id, "force-fire: timer not found"); } } - Ok(Err(e)) => { - state.log_life("timer_error", &format!("{label}: {e:#}")).await; - error!(timer_id, "life loop LLM error: {e:#}"); - } - Err(_) => { - state.log_life("timer_timeout", label).await; - warn!(timer_id, "life loop timeout after {LIFE_LOOP_TIMEOUT_SECS}s"); - } } } - - // reschedule or delete - if schedule.starts_with("cron:") { - if let Some(next) = compute_next_cron_fire(schedule) { - state.update_timer_next_fire(*timer_id, &next).await; - info!(timer_id, next = %next, "cron rescheduled"); - } else { - state.cancel_timer(*timer_id).await; - } - } else { - state.cancel_timer(*timer_id).await; - } } } } +async fn run_timer( + bot: &Bot, + state: &Arc, + config: &Arc, + timer_id: i64, + chat_id_raw: i64, + label: &str, + schedule: &str, +) { + let chat_id = ChatId(chat_id_raw); + info!(timer_id, %label, "timer fired"); + + let persona = state.get_config("persona").await.unwrap_or_default(); + let inner = state.get_inner_state().await; + let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(); + + let mut system_text = if persona.is_empty() { + "你是一个AI伙伴。".to_string() + } else { + persona.clone() + }; + + system_text.push_str(&format!( + "\n\n[当前时间] {now}\n\n[你的内心状态]\n{}", + if inner.is_empty() { "(空)" } else { &inner } + )); + + system_text.push_str( + "\n\n你不是因为 timer 到了才说话。你是因为在乎 Fam,所以想知道他怎么样。\ + 如果你觉得现在不该打扰他(太晚了、他今天很累、刚聊过),就什么都不说,回复空文本。\ + 主动沉默也是一种关心。\ + \n可以用 update_inner_state 更新你的内心状态。\ + 输出格式:纯文本或基础Markdown,不要LaTeX或特殊Unicode。", + ); + + let messages = vec![ + serde_json::json!({"role": "system", "content": system_text}), + serde_json::json!({"role": "user", "content": format!("[timer] {label}")}), + ]; + + if let BackendConfig::OpenAI { + ref endpoint, + ref model, + ref api_key, + } = config.backend + { + let sid = format!("life-{chat_id_raw}"); + let mut tg_output; + let mut buf_output; + let output: &mut dyn crate::output::Output = if chat_id_raw == 0 { + buf_output = BufferOutput::new(); + &mut buf_output + } else { + tg_output = TelegramOutput::new(bot.clone(), chat_id, true); + &mut tg_output + }; + + let result = tokio::time::timeout( + std::time::Duration::from_secs(LIFE_LOOP_TIMEOUT_SECS), + run_openai_with_tools( + endpoint, model, api_key, messages, output, state, &sid, + config, chat_id_raw, + ), + ) + .await; + + match result { + Ok(Ok(response)) => { + let detail = if response.is_empty() { + "(silent)".to_string() + } else { + response.chars().take(200).collect() + }; + state.log_life("timer", &format!("{label} → {detail}")).await; + if !response.is_empty() { + info!(timer_id, "life loop response ({} chars)", response.len()); + } + } + Ok(Err(e)) => { + state.log_life("timer_error", &format!("{label}: {e:#}")).await; + error!(timer_id, "life loop LLM error: {e:#}"); + } + Err(_) => { + state.log_life("timer_timeout", label).await; + warn!(timer_id, "life loop timeout after {LIFE_LOOP_TIMEOUT_SECS}s"); + } + } + } + + // reschedule or delete + if schedule.starts_with("cron:") { + if let Some(next) = compute_next_cron_fire(schedule) { + state.update_timer_next_fire(timer_id, &next).await; + info!(timer_id, next = %next, "cron rescheduled"); + } else { + state.cancel_timer(timer_id).await; + } + } else { + state.cancel_timer(timer_id).await; + } +} + /// Auto-reflection: update inner state based on recent interactions. /// Called asynchronously after every 10 messages, does not block the chat. pub async fn reflect(state: &AppState, config: &Config) { diff --git a/src/main.rs b/src/main.rs index 2e559a2..906625d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod config; mod display; mod gitea; +mod http; mod life; mod output; mod state; @@ -92,16 +93,18 @@ async fn main() { let config = Arc::new(config); - // start life loop - tokio::spawn(life::life_loop(bot.clone(), state.clone(), config.clone())); + // channel: http server → life loop + let (life_tx, life_rx) = tokio::sync::mpsc::channel(16); - // start gitea webhook server - if let Some(gitea_config) = &config.gitea { - let gc = gitea_config.clone(); - // Use the gitea admin username as the bot user for @mention detection - let bot_user = std::env::var("GITEA_ADMIN_USER").unwrap_or_else(|_| "noc".into()); + // start life loop + tokio::spawn(life::life_loop(bot.clone(), state.clone(), config.clone(), life_rx)); + + // start http server (API + gitea webhook) + { + let srv_config = config.clone(); + let srv_state = state.clone(); tokio::spawn(async move { - gitea::start_webhook_server(&gc, bot_user).await; + http::start_http_server(&srv_config, srv_state, life_tx).await; }); } diff --git a/src/state.rs b/src/state.rs index 53a90e7..e2c1801 100644 --- a/src/state.rs +++ b/src/state.rs @@ -275,6 +275,29 @@ impl AppState { ); } + /// Ensure a timer with the given label exists. If it already exists, do nothing. + /// Returns true if a new timer was created. + pub async fn ensure_timer(&self, chat_id: i64, label: &str, schedule: &str) -> bool { + let db = self.db.lock().await; + let exists: bool = db + .query_row( + "SELECT COUNT(*) > 0 FROM timers WHERE label = ?1 AND enabled = 1", + rusqlite::params![label], + |row| row.get(0), + ) + .unwrap_or(false); + if exists { + return false; + } + drop(db); + if let Some(next) = crate::tools::compute_next_cron_fire(schedule) { + self.add_timer(chat_id, label, schedule, &next).await; + true + } else { + false + } + } + pub async fn add_timer(&self, chat_id: i64, label: &str, schedule: &str, next_fire: &str) -> i64 { let db = self.db.lock().await; db.execute( @@ -285,6 +308,16 @@ impl AppState { db.last_insert_rowid() } + pub async fn get_timer(&self, id: i64) -> Option<(i64, i64, String, String)> { + let db = self.db.lock().await; + db.query_row( + "SELECT id, chat_id, label, schedule FROM timers WHERE id = ?1 AND enabled = 1", + rusqlite::params![id], + |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), + ) + .ok() + } + pub async fn list_timers(&self, chat_id: Option) -> Vec<(i64, i64, String, String, String, bool)> { let db = self.db.lock().await; let (sql, params): (&str, Vec>) = match chat_id {