From fe1370230f38a7c8d64f46512704f594c97d0a3e Mon Sep 17 00:00:00 2001 From: Fam Zheng Date: Wed, 4 Mar 2026 11:47:01 +0000 Subject: [PATCH] refactor: extract template and tools modules from agent Split template selection and external tool management into dedicated modules for better separation of concerns. --- Cargo.lock | 1 + Cargo.toml | 1 + src/agent.rs | 211 +++++++++++++++++++---------------------- src/template.rs | 244 ++++++++++++++++++++++++++++++++++++++++++++++++ src/tools.rs | 159 +++++++++++++++++++++++++++++++ 5 files changed, 500 insertions(+), 116 deletions(-) create mode 100644 src/template.rs create mode 100644 src/tools.rs diff --git a/Cargo.lock b/Cargo.lock index 778f640..cc2b364 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2087,6 +2087,7 @@ dependencies = [ "serde_yaml", "sqlx", "tokio", + "tokio-util", "tower-http", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index cbff9d1..9ed2677 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,4 +25,5 @@ chrono = { version = "0.4", features = ["serde"] } uuid = { version = "1", features = ["v4"] } anyhow = "1" mime_guess = "2" +tokio-util = { version = "0.7", features = ["io"] } nix = { version = "0.29", features = ["signal"] } diff --git a/src/agent.rs b/src/agent.rs index d7ccdac..63801c9 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -8,6 +8,8 @@ use tokio::sync::{mpsc, RwLock, broadcast}; use crate::llm::{LlmClient, ChatMessage, Tool, ToolFunction}; use crate::exec::LocalExecutor; +use crate::template::{self, LoadedTemplate}; +use crate::tools::ExternalToolManager; use crate::LlmConfig; use crate::state::{AgentState, AgentPhase, Step, StepStatus}; @@ -131,115 +133,7 @@ impl AgentManager { } } -// --- Template system --- - -#[derive(Debug, Deserialize)] -struct TemplateInfo { - name: String, - description: String, - match_hint: String, -} - -fn templates_dir() -> &'static str { - if Path::new("/app/templates").is_dir() { - "/app/templates" - } else { - "app-templates" - } -} - -/// Scan available templates and ask LLM to pick one (or none). -async fn select_template(llm: &LlmClient, requirement: &str) -> Option { - let base = Path::new(templates_dir()); - let mut entries = match tokio::fs::read_dir(base).await { - Ok(e) => e, - Err(_) => return None, - }; - - let mut templates: Vec<(String, TemplateInfo)> = Vec::new(); - while let Ok(Some(entry)) = entries.next_entry().await { - if !entry.file_type().await.map(|t| t.is_dir()).unwrap_or(false) { - continue; - } - let id = entry.file_name().to_string_lossy().to_string(); - let meta_path = entry.path().join("template.json"); - if let Ok(data) = tokio::fs::read_to_string(&meta_path).await { - if let Ok(info) = serde_json::from_str::(&data) { - templates.push((id, info)); - } - } - } - - if templates.is_empty() { - return None; - } - - let listing: String = templates - .iter() - .map(|(id, info)| format!("- id: {}\n 名称: {}\n 描述: {}\n 适用场景: {}", id, info.name, info.description, info.match_hint)) - .collect::>() - .join("\n"); - - let prompt = format!( - "以下是可用的项目模板:\n{}\n\n用户需求:{}\n\n选择最匹配的模板 ID,如果都不合适则回复 none。只回复模板 ID 或 none,不要其他内容。", - listing, requirement - ); - - let response = llm - .chat(vec![ - ChatMessage::system("你是一个模板选择助手。根据用户需求选择最合适的项目模板。只回复模板 ID 或 none。"), - ChatMessage::user(&prompt), - ]) - .await - .ok()?; - - let answer = response.trim().to_lowercase(); - if answer == "none" { - return None; - } - - // Verify the answer matches an actual template ID - templates.iter().find(|(id, _)| id == &answer).map(|(id, _)| id.clone()) -} - -/// Copy template contents to workdir (excluding template.json). -async fn apply_template(template_id: &str, workdir: &str) -> anyhow::Result<()> { - let src = Path::new(templates_dir()).join(template_id); - if !src.is_dir() { - anyhow::bail!("Template directory not found: {}", template_id); - } - copy_dir_recursive(&src, Path::new(workdir)).await -} - -/// Recursively copy directory contents, skipping template.json at the top level. -async fn copy_dir_recursive(src: &Path, dst: &Path) -> anyhow::Result<()> { - // Use a stack to avoid async recursion - let mut stack: Vec<(std::path::PathBuf, std::path::PathBuf, bool)> = - vec![(src.to_path_buf(), dst.to_path_buf(), true)]; - - while let Some((src_dir, dst_dir, top_level)) = stack.pop() { - tokio::fs::create_dir_all(&dst_dir).await?; - let mut entries = tokio::fs::read_dir(&src_dir).await?; - while let Some(entry) = entries.next_entry().await? { - let name = entry.file_name(); - let name_str = name.to_string_lossy(); - - if top_level && name_str == "template.json" { - continue; - } - - let src_path = entry.path(); - let dst_path = dst_dir.join(&name); - - if entry.file_type().await?.is_dir() { - stack.push((src_path, dst_path, false)); - } else { - tokio::fs::copy(&src_path, &dst_path).await?; - } - } - } - Ok(()) -} +// Template system is in crate::template /// Read INSTRUCTIONS.md from workdir if it exists. async fn read_instructions(workdir: &str) -> String { @@ -312,25 +206,90 @@ async fn agent_loop( .await; // Template selection + workspace setup - let template_id = select_template(&llm, &requirement).await; - if let Some(ref tid) = template_id { + let template_id = template::select_template(&llm, &requirement).await; + let loaded_template = if let Some(ref tid) = template_id { tracing::info!("Template selected for workflow {}: {}", workflow_id, tid); let _ = tokio::fs::create_dir_all(&workdir).await; - if let Err(e) = apply_template(tid, &workdir).await { + if let Err(e) = template::apply_template(tid, &workdir).await { tracing::error!("Failed to apply template {}: {}", tid, e); } + match LoadedTemplate::load(tid).await { + Ok(t) => Some(t), + Err(e) => { + tracing::error!("Failed to load template {}: {}", tid, e); + None + } + } + } else { + None + }; + + // Import KB files from template + if let Some(ref t) = loaded_template { + if let Some(ref kb) = mgr.kb { + let mut batch_items: Vec<(String, String)> = Vec::new(); + for (title, content) in &t.kb_files { + // Check if article already exists by title + let existing: Option = sqlx::query_scalar( + "SELECT id FROM kb_articles WHERE title = ?" + ) + .bind(title) + .fetch_optional(&pool) + .await + .ok() + .flatten(); + + let article_id = if let Some(id) = existing { + let _ = sqlx::query( + "UPDATE kb_articles SET content = ?, updated_at = datetime('now') WHERE id = ?" + ) + .bind(content) + .bind(&id) + .execute(&pool) + .await; + id + } else { + let id = uuid::Uuid::new_v4().to_string(); + let _ = sqlx::query( + "INSERT INTO kb_articles (id, title, content) VALUES (?, ?, ?)" + ) + .bind(&id) + .bind(title) + .bind(content) + .execute(&pool) + .await; + id + }; + + batch_items.push((article_id, content.clone())); + } + // Batch index: single embed.py call for all articles + if !batch_items.is_empty() { + if let Err(e) = kb.index_batch(&batch_items).await { + tracing::warn!("Failed to batch index KB articles: {}", e); + } + } + tracing::info!("Imported {} KB articles from template", t.kb_files.len()); + } } + ensure_workspace(&exec, &workdir).await; let _ = tokio::fs::write(format!("{}/requirement.md", workdir), &requirement).await; - let instructions = read_instructions(&workdir).await; + let instructions = if let Some(ref t) = loaded_template { + t.instructions.clone() + } else { + read_instructions(&workdir).await + }; + + let ext_tools = loaded_template.as_ref().map(|t| &t.external_tools); tracing::info!("Starting agent loop for workflow {}", workflow_id); // Run tool-calling agent loop let result = run_agent_loop( &llm, &exec, &pool, &broadcast_tx, &project_id, &workflow_id, &requirement, &workdir, &mgr, - &instructions, None, + &instructions, None, ext_tools, ).await; let final_status = if result.is_ok() { "done" } else { "failed" }; @@ -433,10 +392,12 @@ async fn agent_loop( let instructions = read_instructions(&workdir).await; + // Try to detect which template was used (check for tools/ in workdir parent template) + // For comments, we don't re-load the template — external tools are not available in feedback resume let result = run_agent_loop( &llm, &exec, &pool, &broadcast_tx, &project_id, &workflow_id, &wf.requirement, &workdir, &mgr, - &instructions, Some(state), + &instructions, Some(state), None, ).await; let final_status = if result.is_ok() { "done" } else { "failed" }; @@ -990,9 +951,13 @@ async fn run_agent_loop( mgr: &Arc, instructions: &str, initial_state: Option, + external_tools: Option<&ExternalToolManager>, ) -> anyhow::Result<()> { let planning_tools = build_planning_tools(); - let execution_tools = build_execution_tools(); + let mut execution_tools = build_execution_tools(); + if let Some(ext) = external_tools { + execution_tools.extend(ext.tool_definitions()); + } let mut state = initial_state.unwrap_or_else(AgentState::new); @@ -1247,6 +1212,20 @@ async fn run_agent_loop( state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); } + // External tools (convention-based) + name if external_tools.as_ref().is_some_and(|e| e.has_tool(name)) => { + let result = match external_tools.unwrap().invoke(name, &tc.function.arguments, workdir).await { + Ok(output) => { + let truncated = truncate_str(&output, 8192); + truncated.to_string() + } + Err(e) => format!("Tool error: {}", e), + }; + let status = if result.starts_with("Tool error:") { "failed" } else { "done" }; + log_execution(pool, broadcast_tx, workflow_id, cur, &tc.function.name, &tc.function.arguments, &result, status).await; + state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); + } + // IO tools: execute, read_file, write_file, list_files _ => { let result = execute_tool(&tc.function.name, &tc.function.arguments, workdir, exec).await; diff --git a/src/template.rs b/src/template.rs new file mode 100644 index 0000000..a31db10 --- /dev/null +++ b/src/template.rs @@ -0,0 +1,244 @@ +use std::path::Path; + +use serde::Deserialize; + +use crate::llm::{ChatMessage, LlmClient}; +use crate::tools::ExternalToolManager; + +#[derive(Debug, Deserialize)] +pub struct TemplateInfo { + pub name: String, + pub description: String, + pub match_hint: String, +} + +#[allow(dead_code)] +pub struct LoadedTemplate { + pub id: String, + pub info: TemplateInfo, + pub instructions: String, + pub external_tools: ExternalToolManager, + pub kb_files: Vec<(String, String)>, +} + +pub fn templates_dir() -> &'static str { + if Path::new("/app/templates").is_dir() { + "/app/templates" + } else { + "app-templates" + } +} + +/// Scan available templates and ask LLM to pick one (or none). +pub async fn select_template(llm: &LlmClient, requirement: &str) -> Option { + let base = Path::new(templates_dir()); + let mut entries = match tokio::fs::read_dir(base).await { + Ok(e) => e, + Err(_) => return None, + }; + + let mut templates: Vec<(String, TemplateInfo)> = Vec::new(); + while let Ok(Some(entry)) = entries.next_entry().await { + // Use metadata() instead of file_type() to follow symlinks + let is_dir = tokio::fs::metadata(entry.path()) + .await + .map(|m| m.is_dir()) + .unwrap_or(false); + if !is_dir { + continue; + } + let id = entry.file_name().to_string_lossy().to_string(); + let meta_path = entry.path().join("template.json"); + if let Ok(data) = tokio::fs::read_to_string(&meta_path).await { + if let Ok(info) = serde_json::from_str::(&data) { + templates.push((id, info)); + } + } + } + + if templates.is_empty() { + return None; + } + + let listing: String = templates + .iter() + .map(|(id, info)| { + format!( + "- id: {}\n 名称: {}\n 描述: {}\n 适用场景: {}", + id, info.name, info.description, info.match_hint + ) + }) + .collect::>() + .join("\n"); + + let prompt = format!( + "以下是可用的项目模板:\n{}\n\n用户需求:{}\n\n选择最匹配的模板 ID,如果都不合适则回复 none。只回复模板 ID 或 none,不要其他内容。", + listing, requirement + ); + + let response = llm + .chat(vec![ + ChatMessage::system("你是一个模板选择助手。根据用户需求选择最合适的项目模板。只回复模板 ID 或 none。"), + ChatMessage::user(&prompt), + ]) + .await + .ok()?; + + let answer = response.trim().to_lowercase(); + tracing::info!("Template selection LLM response: '{}' (available: {:?})", + answer, templates.iter().map(|(id, _)| id.as_str()).collect::>()); + if answer == "none" { + return None; + } + + // Verify the answer matches an actual template ID + let result = templates + .iter() + .find(|(id, _)| id == &answer) + .map(|(id, _)| id.clone()); + if result.is_none() { + tracing::warn!("Template selection: LLM returned '{}' which doesn't match any template ID", answer); + } + result +} + +/// Copy template contents to workdir (excluding template.json, tools/, kb/). +pub async fn apply_template(template_id: &str, workdir: &str) -> anyhow::Result<()> { + let src = Path::new(templates_dir()).join(template_id); + if !src.is_dir() { + anyhow::bail!("Template directory not found: {}", template_id); + } + copy_dir_recursive(&src, Path::new(workdir)).await +} + +/// Recursively copy directory contents, skipping template.json/tools/kb at the top level. +async fn copy_dir_recursive(src: &Path, dst: &Path) -> anyhow::Result<()> { + let mut stack: Vec<(std::path::PathBuf, std::path::PathBuf, bool)> = + vec![(src.to_path_buf(), dst.to_path_buf(), true)]; + + while let Some((src_dir, dst_dir, top_level)) = stack.pop() { + tokio::fs::create_dir_all(&dst_dir).await?; + let mut entries = tokio::fs::read_dir(&src_dir).await?; + while let Some(entry) = entries.next_entry().await? { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + + // Skip metadata/convention dirs at top level + if top_level && (name_str == "template.json" || name_str == "tools" || name_str == "kb") + { + continue; + } + + let src_path = entry.path(); + let dst_path = dst_dir.join(&name); + + if entry.file_type().await?.is_dir() { + stack.push((src_path, dst_path, false)); + } else { + tokio::fs::copy(&src_path, &dst_path).await?; + } + } + } + Ok(()) +} + +impl LoadedTemplate { + /// Load a template: discover tools, read KB files, read instructions. + pub async fn load(template_id: &str) -> anyhow::Result { + let base = Path::new(templates_dir()).join(template_id); + if !base.is_dir() { + anyhow::bail!("Template directory not found: {}", template_id); + } + + // Read template.json + let meta_path = base.join("template.json"); + let meta_data = tokio::fs::read_to_string(&meta_path).await?; + let info: TemplateInfo = serde_json::from_str(&meta_data)?; + + // Read INSTRUCTIONS.md + let instructions_path = base.join("INSTRUCTIONS.md"); + let instructions = tokio::fs::read_to_string(&instructions_path) + .await + .unwrap_or_default(); + + // Discover external tools + let tools_dir = base.join("tools"); + let external_tools = ExternalToolManager::discover(&tools_dir).await; + tracing::info!( + "Template '{}': discovered {} external tools", + template_id, + external_tools.len() + ); + + // Scan KB files + let kb_dir = base.join("kb"); + let kb_files = scan_kb_files(&kb_dir).await; + tracing::info!( + "Template '{}': found {} KB files", + template_id, + kb_files.len() + ); + + Ok(Self { + id: template_id.to_string(), + info, + instructions, + external_tools, + kb_files, + }) + } +} + +/// Scan kb/ directory for .md files. Returns (title, content) pairs. +/// Title is extracted from the first `# heading` line, or falls back to the filename. +async fn scan_kb_files(kb_dir: &Path) -> Vec<(String, String)> { + let mut results = Vec::new(); + + let mut entries = match tokio::fs::read_dir(kb_dir).await { + Ok(e) => e, + Err(_) => return results, + }; + + while let Ok(Some(entry)) = entries.next_entry().await { + let path = entry.path(); + + // Resolve symlinks + let real_path = match tokio::fs::canonicalize(&path).await { + Ok(p) => p, + Err(_) => path.clone(), + }; + + // Only process .md files + let ext = real_path + .extension() + .and_then(|e| e.to_str()) + .unwrap_or(""); + if ext != "md" { + continue; + } + + let content = match tokio::fs::read_to_string(&real_path).await { + Ok(c) => c, + Err(e) => { + tracing::warn!("Failed to read KB file {}: {}", real_path.display(), e); + continue; + } + }; + + // Extract title: first `# heading` line, or filename without extension + let title = content + .lines() + .find(|l| l.starts_with("# ")) + .map(|l| l.trim_start_matches("# ").trim().to_string()) + .unwrap_or_else(|| { + path.file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("untitled") + .to_string() + }); + + results.push((title, content)); + } + + results +} diff --git a/src/tools.rs b/src/tools.rs new file mode 100644 index 0000000..adb8fd7 --- /dev/null +++ b/src/tools.rs @@ -0,0 +1,159 @@ +use std::collections::HashMap; +use std::os::unix::fs::PermissionsExt; +use std::path::{Path, PathBuf}; + +use crate::llm::{Tool, ToolFunction}; + +struct ExternalTool { + path: PathBuf, + schema: Tool, +} + +pub struct ExternalToolManager { + tools: HashMap, +} + +impl ExternalToolManager { + /// Scan a tools/ directory, calling `--print-schema` on each executable to discover tools. + pub async fn discover(tools_dir: &Path) -> Self { + let mut tools = HashMap::new(); + + let mut entries = match tokio::fs::read_dir(tools_dir).await { + Ok(e) => e, + Err(_) => return Self { tools }, + }; + + while let Ok(Some(entry)) = entries.next_entry().await { + let path = entry.path(); + + // Skip non-files + let meta = match tokio::fs::metadata(&path).await { + Ok(m) => m, + Err(_) => continue, + }; + if !meta.is_file() { + continue; + } + + // Check executable bit + if meta.permissions().mode() & 0o111 == 0 { + tracing::debug!("Skipping non-executable: {}", path.display()); + continue; + } + + // Call --print-schema + let output = match tokio::process::Command::new(&path) + .arg("--print-schema") + .output() + .await + { + Ok(o) => o, + Err(e) => { + tracing::warn!("Failed to run --print-schema on {}: {}", path.display(), e); + continue; + } + }; + + if !output.status.success() { + tracing::warn!( + "--print-schema failed for {}: {}", + path.display(), + String::from_utf8_lossy(&output.stderr) + ); + continue; + } + + let schema: serde_json::Value = match serde_json::from_slice(&output.stdout) { + Ok(v) => v, + Err(e) => { + tracing::warn!("Invalid schema JSON from {}: {}", path.display(), e); + continue; + } + }; + + let name = schema["name"] + .as_str() + .unwrap_or_else(|| { + path.file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown") + }) + .to_string(); + + let description = schema["description"].as_str().unwrap_or("").to_string(); + let parameters = schema["parameters"].clone(); + + let tool = Tool { + tool_type: "function".into(), + function: ToolFunction { + name: name.clone(), + description, + parameters, + }, + }; + + tracing::info!("Discovered external tool: {}", name); + tools.insert( + name, + ExternalTool { + path: path.clone(), + schema: tool, + }, + ); + } + + Self { tools } + } + + /// Return all discovered Tool definitions for LLM API calls. + pub fn tool_definitions(&self) -> Vec { + self.tools.values().map(|t| t.schema.clone()).collect() + } + + /// Invoke an external tool by name, passing JSON args as the first argv. + pub async fn invoke( + &self, + name: &str, + args_json: &str, + workdir: &str, + ) -> anyhow::Result { + let tool = self + .tools + .get(name) + .ok_or_else(|| anyhow::anyhow!("External tool not found: {}", name))?; + + let output = tokio::process::Command::new(&tool.path) + .arg(args_json) + .current_dir(workdir) + .output() + .await?; + + let stdout = String::from_utf8_lossy(&output.stdout).to_string(); + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + + if output.status.success() { + Ok(stdout) + } else { + let mut result = stdout; + if !stderr.is_empty() { + result.push_str("\nSTDERR: "); + result.push_str(&stderr); + } + result.push_str(&format!( + "\n[exit code: {}]", + output.status.code().unwrap_or(-1) + )); + Ok(result) + } + } + + /// Check if a tool with the given name exists. + pub fn has_tool(&self, name: &str) -> bool { + self.tools.contains_key(name) + } + + /// Number of discovered tools. + pub fn len(&self) -> usize { + self.tools.len() + } +}