refactor: extract template and tools modules from agent

Split template selection and external tool management into dedicated
modules for better separation of concerns.
This commit is contained in:
Fam Zheng 2026-03-04 11:47:01 +00:00
parent c0b681adc3
commit fe1370230f
5 changed files with 500 additions and 116 deletions

1
Cargo.lock generated
View File

@ -2087,6 +2087,7 @@ dependencies = [
"serde_yaml", "serde_yaml",
"sqlx", "sqlx",
"tokio", "tokio",
"tokio-util",
"tower-http", "tower-http",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",

View File

@ -25,4 +25,5 @@ chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1", features = ["v4"] } uuid = { version = "1", features = ["v4"] }
anyhow = "1" anyhow = "1"
mime_guess = "2" mime_guess = "2"
tokio-util = { version = "0.7", features = ["io"] }
nix = { version = "0.29", features = ["signal"] } nix = { version = "0.29", features = ["signal"] }

View File

@ -8,6 +8,8 @@ use tokio::sync::{mpsc, RwLock, broadcast};
use crate::llm::{LlmClient, ChatMessage, Tool, ToolFunction}; use crate::llm::{LlmClient, ChatMessage, Tool, ToolFunction};
use crate::exec::LocalExecutor; use crate::exec::LocalExecutor;
use crate::template::{self, LoadedTemplate};
use crate::tools::ExternalToolManager;
use crate::LlmConfig; use crate::LlmConfig;
use crate::state::{AgentState, AgentPhase, Step, StepStatus}; use crate::state::{AgentState, AgentPhase, Step, StepStatus};
@ -131,115 +133,7 @@ impl AgentManager {
} }
} }
// --- Template system --- // Template system is in crate::template
/// Metadata describing one project template, deserialized from the
/// `template.json` file found in the template's directory.
#[derive(Debug, Deserialize)]
struct TemplateInfo {
    // Name shown in the template listing that is sent to the LLM.
    name: String,
    // Description shown in the same listing.
    description: String,
    // Hint about when the template applies; also included in the listing.
    match_hint: String,
}
/// Resolve the directory that holds the project templates.
///
/// Returns `/app/templates` when that path is an existing directory,
/// otherwise the relative path `app-templates`.
fn templates_dir() -> &'static str {
    const PRIMARY: &str = "/app/templates";
    const FALLBACK: &str = "app-templates";

    if !Path::new(PRIMARY).is_dir() {
        return FALLBACK;
    }
    PRIMARY
}
/// Scan available templates and ask the LLM to pick one (or none).
///
/// Every sub-directory of [`templates_dir`] that contains a parseable
/// `template.json` is offered to the model. Returns the chosen template ID
/// (its directory name), or `None` when:
/// - the templates directory cannot be read or contains no usable template,
/// - the LLM call fails,
/// - the model answers `none`, or
/// - the answer does not correspond to any discovered template ID.
async fn select_template(llm: &LlmClient, requirement: &str) -> Option<String> {
    let base = Path::new(templates_dir());
    let mut entries = tokio::fs::read_dir(base).await.ok()?;

    let mut templates: Vec<(String, TemplateInfo)> = Vec::new();
    // Best effort: a directory read error simply ends the scan.
    while let Ok(Some(entry)) = entries.next_entry().await {
        if !entry.file_type().await.map(|t| t.is_dir()).unwrap_or(false) {
            continue;
        }
        let id = entry.file_name().to_string_lossy().to_string();
        let meta_path = entry.path().join("template.json");
        // Directories without a readable, valid template.json are ignored.
        if let Ok(data) = tokio::fs::read_to_string(&meta_path).await {
            if let Ok(info) = serde_json::from_str::<TemplateInfo>(&data) {
                templates.push((id, info));
            }
        }
    }

    if templates.is_empty() {
        return None;
    }

    let listing: String = templates
        .iter()
        .map(|(id, info)| {
            format!(
                "- id: {}\n 名称: {}\n 描述: {}\n 适用场景: {}",
                id, info.name, info.description, info.match_hint
            )
        })
        .collect::<Vec<_>>()
        .join("\n");

    let prompt = format!(
        "以下是可用的项目模板:\n{}\n\n用户需求:{}\n\n选择最匹配的模板 ID如果都不合适则回复 none。只回复模板 ID 或 none不要其他内容。",
        listing, requirement
    );

    let response = llm
        .chat(vec![
            ChatMessage::system("你是一个模板选择助手。根据用户需求选择最合适的项目模板。只回复模板 ID 或 none。"),
            ChatMessage::user(&prompt),
        ])
        .await
        .ok()?;

    let answer = response.trim().to_lowercase();
    if answer == "none" {
        return None;
    }

    // Verify the answer matches an actual template ID. The answer has been
    // lowercased, so an exact comparison alone could never select an ID that
    // contains uppercase characters; fall back to a case-insensitive match.
    // The exact match is tried first so previously matching IDs still win.
    templates
        .iter()
        .find(|(id, _)| *id == answer)
        .or_else(|| templates.iter().find(|(id, _)| id.to_lowercase() == answer))
        .map(|(id, _)| id.clone())
}
/// Populate `workdir` with the files of the template named `template_id`.
///
/// The template's top-level `template.json` is not copied. Returns an error
/// when the template directory does not exist or when copying fails.
async fn apply_template(template_id: &str, workdir: &str) -> anyhow::Result<()> {
    let template_root = Path::new(templates_dir()).join(template_id);
    if template_root.is_dir() {
        copy_dir_recursive(&template_root, Path::new(workdir)).await
    } else {
        Err(anyhow::anyhow!("Template directory not found: {}", template_id))
    }
}
/// Copy the tree rooted at `src` into `dst`, creating directories as needed.
///
/// A file named `template.json` directly inside `src` is left out; files with
/// that name in nested directories are copied normally.
async fn copy_dir_recursive(src: &Path, dst: &Path) -> anyhow::Result<()> {
    use std::path::PathBuf;

    // An explicit work list stands in for async recursion. The flag is true
    // only for the root pair, which is where template.json gets filtered out.
    let mut pending: Vec<(PathBuf, PathBuf, bool)> = Vec::new();
    pending.push((src.to_path_buf(), dst.to_path_buf(), true));

    while let Some((from_dir, to_dir, is_root)) = pending.pop() {
        tokio::fs::create_dir_all(&to_dir).await?;

        let mut reader = tokio::fs::read_dir(&from_dir).await?;
        while let Some(item) = reader.next_entry().await? {
            let file_name = item.file_name();
            if is_root && file_name.to_string_lossy() == "template.json" {
                continue;
            }

            let from = item.path();
            let to = to_dir.join(&file_name);
            if !item.file_type().await?.is_dir() {
                tokio::fs::copy(&from, &to).await?;
            } else {
                pending.push((from, to, false));
            }
        }
    }

    Ok(())
}
/// Read INSTRUCTIONS.md from workdir if it exists. /// Read INSTRUCTIONS.md from workdir if it exists.
async fn read_instructions(workdir: &str) -> String { async fn read_instructions(workdir: &str) -> String {
@ -312,25 +206,90 @@ async fn agent_loop(
.await; .await;
// Template selection + workspace setup // Template selection + workspace setup
let template_id = select_template(&llm, &requirement).await; let template_id = template::select_template(&llm, &requirement).await;
if let Some(ref tid) = template_id { let loaded_template = if let Some(ref tid) = template_id {
tracing::info!("Template selected for workflow {}: {}", workflow_id, tid); tracing::info!("Template selected for workflow {}: {}", workflow_id, tid);
let _ = tokio::fs::create_dir_all(&workdir).await; let _ = tokio::fs::create_dir_all(&workdir).await;
if let Err(e) = apply_template(tid, &workdir).await { if let Err(e) = template::apply_template(tid, &workdir).await {
tracing::error!("Failed to apply template {}: {}", tid, e); tracing::error!("Failed to apply template {}: {}", tid, e);
} }
match LoadedTemplate::load(tid).await {
Ok(t) => Some(t),
Err(e) => {
tracing::error!("Failed to load template {}: {}", tid, e);
None
} }
}
} else {
None
};
// Import KB files from template
if let Some(ref t) = loaded_template {
if let Some(ref kb) = mgr.kb {
let mut batch_items: Vec<(String, String)> = Vec::new();
for (title, content) in &t.kb_files {
// Check if article already exists by title
let existing: Option<String> = sqlx::query_scalar(
"SELECT id FROM kb_articles WHERE title = ?"
)
.bind(title)
.fetch_optional(&pool)
.await
.ok()
.flatten();
let article_id = if let Some(id) = existing {
let _ = sqlx::query(
"UPDATE kb_articles SET content = ?, updated_at = datetime('now') WHERE id = ?"
)
.bind(content)
.bind(&id)
.execute(&pool)
.await;
id
} else {
let id = uuid::Uuid::new_v4().to_string();
let _ = sqlx::query(
"INSERT INTO kb_articles (id, title, content) VALUES (?, ?, ?)"
)
.bind(&id)
.bind(title)
.bind(content)
.execute(&pool)
.await;
id
};
batch_items.push((article_id, content.clone()));
}
// Batch index: single embed.py call for all articles
if !batch_items.is_empty() {
if let Err(e) = kb.index_batch(&batch_items).await {
tracing::warn!("Failed to batch index KB articles: {}", e);
}
}
tracing::info!("Imported {} KB articles from template", t.kb_files.len());
}
}
ensure_workspace(&exec, &workdir).await; ensure_workspace(&exec, &workdir).await;
let _ = tokio::fs::write(format!("{}/requirement.md", workdir), &requirement).await; let _ = tokio::fs::write(format!("{}/requirement.md", workdir), &requirement).await;
let instructions = read_instructions(&workdir).await; let instructions = if let Some(ref t) = loaded_template {
t.instructions.clone()
} else {
read_instructions(&workdir).await
};
let ext_tools = loaded_template.as_ref().map(|t| &t.external_tools);
tracing::info!("Starting agent loop for workflow {}", workflow_id); tracing::info!("Starting agent loop for workflow {}", workflow_id);
// Run tool-calling agent loop // Run tool-calling agent loop
let result = run_agent_loop( let result = run_agent_loop(
&llm, &exec, &pool, &broadcast_tx, &llm, &exec, &pool, &broadcast_tx,
&project_id, &workflow_id, &requirement, &workdir, &mgr, &project_id, &workflow_id, &requirement, &workdir, &mgr,
&instructions, None, &instructions, None, ext_tools,
).await; ).await;
let final_status = if result.is_ok() { "done" } else { "failed" }; let final_status = if result.is_ok() { "done" } else { "failed" };
@ -433,10 +392,12 @@ async fn agent_loop(
let instructions = read_instructions(&workdir).await; let instructions = read_instructions(&workdir).await;
// NOTE: no attempt is made here to detect or re-load the template that was originally used
// (e.g. via its tools/ directory), so external tools are not available when resuming from feedback comments.
let result = run_agent_loop( let result = run_agent_loop(
&llm, &exec, &pool, &broadcast_tx, &llm, &exec, &pool, &broadcast_tx,
&project_id, &workflow_id, &wf.requirement, &workdir, &mgr, &project_id, &workflow_id, &wf.requirement, &workdir, &mgr,
&instructions, Some(state), &instructions, Some(state), None,
).await; ).await;
let final_status = if result.is_ok() { "done" } else { "failed" }; let final_status = if result.is_ok() { "done" } else { "failed" };
@ -990,9 +951,13 @@ async fn run_agent_loop(
mgr: &Arc<AgentManager>, mgr: &Arc<AgentManager>,
instructions: &str, instructions: &str,
initial_state: Option<AgentState>, initial_state: Option<AgentState>,
external_tools: Option<&ExternalToolManager>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let planning_tools = build_planning_tools(); let planning_tools = build_planning_tools();
let execution_tools = build_execution_tools(); let mut execution_tools = build_execution_tools();
if let Some(ext) = external_tools {
execution_tools.extend(ext.tool_definitions());
}
let mut state = initial_state.unwrap_or_else(AgentState::new); let mut state = initial_state.unwrap_or_else(AgentState::new);
@ -1247,6 +1212,20 @@ async fn run_agent_loop(
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
} }
// External tools (convention-based)
name if external_tools.as_ref().is_some_and(|e| e.has_tool(name)) => {
let result = match external_tools.unwrap().invoke(name, &tc.function.arguments, workdir).await {
Ok(output) => {
let truncated = truncate_str(&output, 8192);
truncated.to_string()
}
Err(e) => format!("Tool error: {}", e),
};
let status = if result.starts_with("Tool error:") { "failed" } else { "done" };
log_execution(pool, broadcast_tx, workflow_id, cur, &tc.function.name, &tc.function.arguments, &result, status).await;
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
}
// IO tools: execute, read_file, write_file, list_files // IO tools: execute, read_file, write_file, list_files
_ => { _ => {
let result = execute_tool(&tc.function.name, &tc.function.arguments, workdir, exec).await; let result = execute_tool(&tc.function.name, &tc.function.arguments, workdir, exec).await;

244
src/template.rs Normal file
View File

@ -0,0 +1,244 @@
use std::path::Path;
use serde::Deserialize;
use crate::llm::{ChatMessage, LlmClient};
use crate::tools::ExternalToolManager;
/// Metadata describing one project template, deserialized from the
/// `template.json` file found in the template's directory.
#[derive(Debug, Deserialize)]
pub struct TemplateInfo {
    /// Name shown in the template listing that is sent to the LLM.
    pub name: String,
    /// Description shown in the same listing.
    pub description: String,
    /// Hint about when the template applies; also included in the listing.
    pub match_hint: String,
}
/// Everything read from a template directory by [`LoadedTemplate::load`].
// NOTE(review): the reason for `allow(dead_code)` is not visible here —
// presumably not every field is read by callers yet; confirm and narrow it.
#[allow(dead_code)]
pub struct LoadedTemplate {
    /// Template ID, i.e. the name passed to `load`.
    pub id: String,
    /// Parsed contents of the template's `template.json`.
    pub info: TemplateInfo,
    /// Contents of the template's `INSTRUCTIONS.md`; empty when the file could not be read.
    pub instructions: String,
    /// Tools discovered in the template's `tools/` directory.
    pub external_tools: ExternalToolManager,
    /// `(title, content)` pairs for the files found in the template's `kb/` directory.
    pub kb_files: Vec<(String, String)>,
}
/// Resolve the directory that holds the project templates.
///
/// Returns `/app/templates` when that path is an existing directory,
/// otherwise the relative path `app-templates`.
pub fn templates_dir() -> &'static str {
    const PRIMARY: &str = "/app/templates";
    const FALLBACK: &str = "app-templates";

    if !Path::new(PRIMARY).is_dir() {
        return FALLBACK;
    }
    PRIMARY
}
/// Scan available templates and ask the LLM to pick one (or none).
///
/// Every sub-directory of [`templates_dir`] (symlinked directories included)
/// that contains a parseable `template.json` is offered to the model.
/// Returns the chosen template ID (its directory name), or `None` when:
/// - the templates directory cannot be read or contains no usable template,
/// - the LLM call fails,
/// - the model answers `none`, or
/// - the answer does not correspond to any discovered template ID
///   (a warning is logged in that case).
pub async fn select_template(llm: &LlmClient, requirement: &str) -> Option<String> {
    let base = Path::new(templates_dir());
    let mut entries = tokio::fs::read_dir(base).await.ok()?;

    let mut templates: Vec<(String, TemplateInfo)> = Vec::new();
    // Best effort: a directory read error simply ends the scan.
    while let Ok(Some(entry)) = entries.next_entry().await {
        // Use metadata() instead of file_type() to follow symlinks
        let is_dir = tokio::fs::metadata(entry.path())
            .await
            .map(|m| m.is_dir())
            .unwrap_or(false);
        if !is_dir {
            continue;
        }
        let id = entry.file_name().to_string_lossy().to_string();
        let meta_path = entry.path().join("template.json");
        // Directories without a readable, valid template.json are ignored.
        if let Ok(data) = tokio::fs::read_to_string(&meta_path).await {
            if let Ok(info) = serde_json::from_str::<TemplateInfo>(&data) {
                templates.push((id, info));
            }
        }
    }

    if templates.is_empty() {
        return None;
    }

    let listing: String = templates
        .iter()
        .map(|(id, info)| {
            format!(
                "- id: {}\n 名称: {}\n 描述: {}\n 适用场景: {}",
                id, info.name, info.description, info.match_hint
            )
        })
        .collect::<Vec<_>>()
        .join("\n");

    let prompt = format!(
        "以下是可用的项目模板:\n{}\n\n用户需求:{}\n\n选择最匹配的模板 ID如果都不合适则回复 none。只回复模板 ID 或 none不要其他内容。",
        listing, requirement
    );

    let response = llm
        .chat(vec![
            ChatMessage::system("你是一个模板选择助手。根据用户需求选择最合适的项目模板。只回复模板 ID 或 none。"),
            ChatMessage::user(&prompt),
        ])
        .await
        .ok()?;

    let answer = response.trim().to_lowercase();
    tracing::info!("Template selection LLM response: '{}' (available: {:?})",
        answer, templates.iter().map(|(id, _)| id.as_str()).collect::<Vec<_>>());
    if answer == "none" {
        return None;
    }

    // Verify the answer matches an actual template ID. The answer has been
    // lowercased, so an exact comparison alone could never select an ID that
    // contains uppercase characters; fall back to a case-insensitive match.
    // The exact match is tried first so previously matching IDs still win.
    let result = templates
        .iter()
        .find(|(id, _)| *id == answer)
        .or_else(|| templates.iter().find(|(id, _)| id.to_lowercase() == answer))
        .map(|(id, _)| id.clone());

    if result.is_none() {
        tracing::warn!("Template selection: LLM returned '{}' which doesn't match any template ID", answer);
    }

    result
}
/// Populate `workdir` with the files of the template named `template_id`.
///
/// The template's top-level `template.json`, `tools` and `kb` entries are not
/// copied. Returns an error when the template directory does not exist or
/// when copying fails.
pub async fn apply_template(template_id: &str, workdir: &str) -> anyhow::Result<()> {
    let template_root = Path::new(templates_dir()).join(template_id);
    if template_root.is_dir() {
        copy_dir_recursive(&template_root, Path::new(workdir)).await
    } else {
        Err(anyhow::anyhow!("Template directory not found: {}", template_id))
    }
}
/// Recursively copy directory contents, skipping template.json/tools/kb at the top level.
///
/// Directories are created in `dst` as needed. Entries named `template.json`,
/// `tools` or `kb` directly inside `src` are skipped; entries with those names
/// deeper in the tree are copied normally. Symlinks are followed, so a
/// symlinked directory is copied as a directory tree.
///
/// Returns the first I/O error encountered; files copied before the error
/// are left in place.
async fn copy_dir_recursive(src: &Path, dst: &Path) -> anyhow::Result<()> {
    // Use an explicit stack to avoid async recursion. The flag is true only
    // for the root pair, which is where the convention entries are filtered.
    let mut stack: Vec<(std::path::PathBuf, std::path::PathBuf, bool)> =
        vec![(src.to_path_buf(), dst.to_path_buf(), true)];

    while let Some((src_dir, dst_dir, top_level)) = stack.pop() {
        tokio::fs::create_dir_all(&dst_dir).await?;
        let mut entries = tokio::fs::read_dir(&src_dir).await?;

        while let Some(entry) = entries.next_entry().await? {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();

            // Skip metadata/convention dirs at top level
            if top_level && matches!(name_str.as_ref(), "template.json" | "tools" | "kb") {
                continue;
            }

            let src_path = entry.path();
            let dst_path = dst_dir.join(&name);

            // metadata() follows symlinks, whereas DirEntry::file_type() does
            // not. With file_type() a symlinked directory would be handed to
            // fs::copy, which only copies regular files and would abort the
            // whole template copy. This matches the symlink handling used
            // elsewhere in this module.
            if tokio::fs::metadata(&src_path).await?.is_dir() {
                stack.push((src_path, dst_path, false));
            } else {
                tokio::fs::copy(&src_path, &dst_path).await?;
            }
        }
    }

    Ok(())
}
impl LoadedTemplate {
/// Load a template: discover tools, read KB files, read instructions.
pub async fn load(template_id: &str) -> anyhow::Result<Self> {
let base = Path::new(templates_dir()).join(template_id);
if !base.is_dir() {
anyhow::bail!("Template directory not found: {}", template_id);
}
// Read template.json
let meta_path = base.join("template.json");
let meta_data = tokio::fs::read_to_string(&meta_path).await?;
let info: TemplateInfo = serde_json::from_str(&meta_data)?;
// Read INSTRUCTIONS.md
let instructions_path = base.join("INSTRUCTIONS.md");
let instructions = tokio::fs::read_to_string(&instructions_path)
.await
.unwrap_or_default();
// Discover external tools
let tools_dir = base.join("tools");
let external_tools = ExternalToolManager::discover(&tools_dir).await;
tracing::info!(
"Template '{}': discovered {} external tools",
template_id,
external_tools.len()
);
// Scan KB files
let kb_dir = base.join("kb");
let kb_files = scan_kb_files(&kb_dir).await;
tracing::info!(
"Template '{}': found {} KB files",
template_id,
kb_files.len()
);
Ok(Self {
id: template_id.to_string(),
info,
instructions,
external_tools,
kb_files,
})
}
}
/// Scan kb/ directory for .md files. Returns (title, content) pairs.
/// Title is extracted from the first `# heading` line, or falls back to the filename.
async fn scan_kb_files(kb_dir: &Path) -> Vec<(String, String)> {
let mut results = Vec::new();
let mut entries = match tokio::fs::read_dir(kb_dir).await {
Ok(e) => e,
Err(_) => return results,
};
while let Ok(Some(entry)) = entries.next_entry().await {
let path = entry.path();
// Resolve symlinks
let real_path = match tokio::fs::canonicalize(&path).await {
Ok(p) => p,
Err(_) => path.clone(),
};
// Only process .md files
let ext = real_path
.extension()
.and_then(|e| e.to_str())
.unwrap_or("");
if ext != "md" {
continue;
}
let content = match tokio::fs::read_to_string(&real_path).await {
Ok(c) => c,
Err(e) => {
tracing::warn!("Failed to read KB file {}: {}", real_path.display(), e);
continue;
}
};
// Extract title: first `# heading` line, or filename without extension
let title = content
.lines()
.find(|l| l.starts_with("# "))
.map(|l| l.trim_start_matches("# ").trim().to_string())
.unwrap_or_else(|| {
path.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("untitled")
.to_string()
});
results.push((title, content));
}
results
}

159
src/tools.rs Normal file
View File

@ -0,0 +1,159 @@
use std::collections::HashMap;
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use crate::llm::{Tool, ToolFunction};
/// One external tool discovered on disk.
struct ExternalTool {
    // Path of the executable that is run when the tool is invoked.
    path: PathBuf,
    // Tool definition built from the executable's `--print-schema` output.
    schema: Tool,
}
/// Registry of external tools, populated by `ExternalToolManager::discover`.
pub struct ExternalToolManager {
    // Discovered tools keyed by tool name.
    tools: HashMap<String, ExternalTool>,
}
impl ExternalToolManager {
    /// Scan a tools/ directory, calling `--print-schema` on each executable to discover tools.
    ///
    /// An entry is registered when it is a regular file (symlinks are
    /// followed) with at least one executable bit set, and running it with
    /// `--print-schema` exits successfully and prints a JSON document to
    /// stdout. From that document:
    /// - `name` is the tool name; when missing or empty, the file name is used,
    /// - `description` is the tool description (empty when missing),
    /// - `parameters` is the argument schema; when it is missing or not a JSON
    ///   object, an empty object schema is used, so a JSON `null` is never
    ///   stored as the tool's parameter schema.
    ///
    /// Discovery is best effort: an unreadable directory yields an empty
    /// manager, and entries that fail any step are logged and skipped. When
    /// two executables report the same name, the one scanned later wins and a
    /// warning is logged.
    pub async fn discover(tools_dir: &Path) -> Self {
        let mut tools = HashMap::new();

        let mut entries = match tokio::fs::read_dir(tools_dir).await {
            Ok(e) => e,
            Err(_) => return Self { tools },
        };

        while let Ok(Some(entry)) = entries.next_entry().await {
            let path = entry.path();

            // Skip non-files
            let meta = match tokio::fs::metadata(&path).await {
                Ok(m) => m,
                Err(_) => continue,
            };
            if !meta.is_file() {
                continue;
            }

            // Check executable bit (any of user/group/other)
            if meta.permissions().mode() & 0o111 == 0 {
                tracing::debug!("Skipping non-executable: {}", path.display());
                continue;
            }

            // Call --print-schema
            let output = match tokio::process::Command::new(&path)
                .arg("--print-schema")
                .output()
                .await
            {
                Ok(o) => o,
                Err(e) => {
                    tracing::warn!("Failed to run --print-schema on {}: {}", path.display(), e);
                    continue;
                }
            };

            if !output.status.success() {
                tracing::warn!(
                    "--print-schema failed for {}: {}",
                    path.display(),
                    String::from_utf8_lossy(&output.stderr)
                );
                continue;
            }

            let schema: serde_json::Value = match serde_json::from_slice(&output.stdout) {
                Ok(v) => v,
                Err(e) => {
                    tracing::warn!("Invalid schema JSON from {}: {}", path.display(), e);
                    continue;
                }
            };

            // An empty name is treated like a missing one.
            let name = schema["name"]
                .as_str()
                .filter(|n| !n.is_empty())
                .or_else(|| path.file_name().and_then(|n| n.to_str()))
                .unwrap_or("unknown")
                .to_string();
            let description = schema["description"].as_str().unwrap_or("").to_string();

            // Indexing a missing key yields JSON null; substitute an empty
            // object schema instead of forwarding a non-object value.
            let parameters = match &schema["parameters"] {
                params @ serde_json::Value::Object(_) => params.clone(),
                _ => serde_json::json!({ "type": "object", "properties": {} }),
            };

            let tool = Tool {
                tool_type: "function".into(),
                function: ToolFunction {
                    name: name.clone(),
                    description,
                    parameters,
                },
            };

            tracing::info!("Discovered external tool: {}", name);
            let replaced = tools.insert(
                name.clone(),
                ExternalTool {
                    path: path.clone(),
                    schema: tool,
                },
            );
            if replaced.is_some() {
                tracing::warn!(
                    "External tool name '{}' is defined more than once; using {}",
                    name,
                    path.display()
                );
            }
        }

        Self { tools }
    }

    /// Return all discovered Tool definitions for LLM API calls.
    ///
    /// The order follows the internal hash map and is therefore unspecified.
    pub fn tool_definitions(&self) -> Vec<Tool> {
        self.tools.values().map(|t| t.schema.clone()).collect()
    }

    /// Invoke an external tool by name, passing JSON args as the first argv.
    ///
    /// The executable runs with `workdir` as its current directory and its
    /// stdout is returned (lossily decoded as UTF-8). A non-zero exit is NOT
    /// reported as `Err`: the returned text is stdout followed by the stderr
    /// output (if any) and an `[exit code: N]` marker, where `N` is `-1` when
    /// no exit code is available.
    ///
    /// # Errors
    /// Returns an error when `name` is not a discovered tool or the process
    /// cannot be run.
    pub async fn invoke(
        &self,
        name: &str,
        args_json: &str,
        workdir: &str,
    ) -> anyhow::Result<String> {
        let tool = self
            .tools
            .get(name)
            .ok_or_else(|| anyhow::anyhow!("External tool not found: {}", name))?;

        let output = tokio::process::Command::new(&tool.path)
            .arg(args_json)
            .current_dir(workdir)
            .output()
            .await?;

        let mut result = String::from_utf8_lossy(&output.stdout).into_owned();
        if output.status.success() {
            return Ok(result);
        }

        // Failure: append diagnostics so the caller sees why the tool failed.
        let stderr = String::from_utf8_lossy(&output.stderr);
        if !stderr.is_empty() {
            result.push_str("\nSTDERR: ");
            result.push_str(&stderr);
        }
        result.push_str(&format!(
            "\n[exit code: {}]",
            output.status.code().unwrap_or(-1)
        ));
        Ok(result)
    }

    /// Check if a tool with the given name exists.
    pub fn has_tool(&self, name: &str) -> bool {
        self.tools.contains_key(name)
    }

    /// Number of discovered tools.
    pub fn len(&self) -> usize {
        self.tools.len()
    }

    /// Whether no tools were discovered.
    pub fn is_empty(&self) -> bool {
        self.tools.is_empty()
    }
}