tori/src/agent.rs
Fam Zheng 0a8eee0285 LLM call logging, plan persistence API, quote-to-feedback UX, requirement input improvements
- Add llm_call_log table and per-call timing/token tracking in agent loop
- New GET /workflows/{id}/plan endpoint to restore plan from snapshots on page load
- New GET /workflows/{id}/llm-calls endpoint + WS LlmCallLog broadcast
- Parse Usage from LLM API response (prompt_tokens, completion_tokens)
- Detailed mode toggle in execution log showing LLM call cards with phase/tokens/latency
- Quote-to-feedback: hover quote buttons on plan steps and log entries, multi-quote chips in comment input
- Requirement input: larger textarea, multi-line display with pre-wrap and scroll

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 09:16:51 +00:00

1366 lines
55 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU16, Ordering};
use serde::{Deserialize, Serialize};
use sqlx::sqlite::SqlitePool;
use tokio::sync::{mpsc, RwLock, broadcast};
use crate::llm::{LlmClient, ChatMessage, Tool, ToolFunction};
use crate::exec::LocalExecutor;
use crate::LlmConfig;
use crate::state::{AgentState, AgentPhase, Step, StepStatus};
/// A running background service for a project: the port it listens on and
/// the OS process id used to signal/stop it later.
pub struct ServiceInfo {
    pub port: u16,
    pub pid: u32,
}
/// Events delivered to a project's agent loop, serialized with an internal
/// `"type"` tag.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AgentEvent {
    /// Start processing a brand-new requirement for the given workflow.
    NewRequirement { workflow_id: String, requirement: String },
    /// User feedback/comment on an existing workflow.
    Comment { workflow_id: String, content: String },
}
/// Messages broadcast to frontend WebSocket clients, serialized with an
/// internal `"type"` tag.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WsMessage {
    /// Full (re)publication of a workflow's plan.
    PlanUpdate { workflow_id: String, steps: Vec<PlanStepInfo> },
    /// Status/output update for a single execution-log entry.
    StepStatusUpdate { step_id: String, status: String, output: String },
    /// Workflow-level status transition (e.g. "executing"/"done"/"failed").
    WorkflowStatusUpdate { workflow_id: String, status: String },
    /// The stored requirement text was rewritten (update_requirement tool).
    RequirementUpdate { workflow_id: String, requirement: String },
    /// A report has been generated and stored for the workflow.
    ReportReady { workflow_id: String },
    /// Project metadata changed (e.g. the auto-generated project name).
    ProjectUpdate { project_id: String, name: String },
    /// A single LLM call was logged (per-call timing/token details).
    LlmCallLog { workflow_id: String, entry: crate::db::LlmCallLogEntry },
    /// Non-fatal agent error surfaced to the user.
    Error { message: String },
}
/// Wire representation of a single plan step sent to the frontend.
///
/// Note the field-name mapping (see `plan_infos_from_state`): the internal
/// step *title* travels as `description`, and the internal *description*
/// travels as `command`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanStepInfo {
    // 1-based position of the step within the plan.
    pub order: i32,
    pub description: String,
    pub command: String,
    // "pending" | "running" | "done" | "failed"; omitted from JSON when None.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<String>,
}
/// Convert the agent's internal step list into the wire format sent to the
/// frontend over WebSocket.
pub fn plan_infos_from_state(state: &AgentState) -> Vec<PlanStepInfo> {
    let mut infos = Vec::with_capacity(state.steps.len());
    for step in &state.steps {
        let status = match step.status {
            StepStatus::Pending => "pending",
            StepStatus::Running => "running",
            StepStatus::Done => "done",
            StepStatus::Failed => "failed",
        };
        // Wire field names differ from the internal ones: the step title is
        // sent as `description`, and the detailed description as `command`.
        infos.push(PlanStepInfo {
            order: step.order,
            description: step.title.clone(),
            command: step.description.clone(),
            status: Some(status.to_string()),
        });
    }
    infos
}
/// Registry of per-project agent loops, WebSocket broadcast channels and
/// running services, shared across the HTTP/WS handlers.
pub struct AgentManager {
    // Sender half of each project's agent event channel, keyed by project id.
    agents: RwLock<HashMap<String, mpsc::Sender<AgentEvent>>>,
    // Per-project WS broadcast channels, created lazily on first use.
    broadcast: RwLock<HashMap<String, broadcast::Sender<WsMessage>>>,
    // Background services started by projects, keyed by project id.
    pub services: RwLock<HashMap<String, ServiceInfo>>,
    // Next port handed out by allocate_port (starts at 9100).
    next_port: AtomicU16,
    pool: SqlitePool,
    llm_config: LlmConfig,
    // Optional knowledge-base manager; presumably backs the kb_* tools —
    // confirm at the tool-dispatch call sites.
    kb: Option<Arc<crate::kb::KbManager>>,
}
impl AgentManager {
    /// Create a shared manager around the given DB pool, LLM config and
    /// optional knowledge-base manager.
    pub fn new(pool: SqlitePool, llm_config: LlmConfig, kb: Option<Arc<crate::kb::KbManager>>) -> Arc<Self> {
        Arc::new(Self {
            agents: RwLock::new(HashMap::new()),
            broadcast: RwLock::new(HashMap::new()),
            services: RwLock::new(HashMap::new()),
            next_port: AtomicU16::new(9100),
            pool,
            llm_config,
            kb,
        })
    }

    /// Hand out the next service port. Relaxed ordering is sufficient: the
    /// counter only needs atomicity, not ordering with any other state.
    /// NOTE(review): wraps past u16::MAX if ever exhausted.
    pub fn allocate_port(&self) -> u16 {
        self.next_port.fetch_add(1, Ordering::Relaxed)
    }

    /// Port of the registered service for `project_id`, if any.
    pub async fn get_service_port(&self, project_id: &str) -> Option<u16> {
        self.services.read().await.get(project_id).map(|s| s.port)
    }

    /// Subscribe to the project's WS broadcast channel, creating the channel
    /// on first use.
    pub async fn get_broadcast(&self, project_id: &str) -> broadcast::Receiver<WsMessage> {
        let mut map = self.broadcast.write().await;
        let tx = map.entry(project_id.to_string())
            .or_insert_with(|| broadcast::channel(64).0);
        tx.subscribe()
    }

    /// Deliver an event to the project's agent, lazily spawning the agent
    /// loop if none exists yet.
    pub async fn send_event(self: &Arc<Self>, project_id: &str, event: AgentEvent) {
        let agents = self.agents.read().await;
        if let Some(tx) = agents.get(project_id) {
            let _ = tx.send(event).await;
        } else {
            drop(agents);
            self.spawn_agent(project_id.to_string()).await;
            let agents = self.agents.read().await;
            if let Some(tx) = agents.get(project_id) {
                let _ = tx.send(event).await;
            }
        }
    }

    /// Spawn the agent loop task for a project and register its sender.
    ///
    /// Idempotent under races: two concurrent `send_event` misses can both
    /// reach here; unconditionally inserting would replace the live sender
    /// and leave a second, orphaned agent loop running. Check-and-insert is
    /// done under a single write-lock acquisition so only one loop ever
    /// exists per project.
    async fn spawn_agent(self: &Arc<Self>, project_id: String) {
        let mut agents = self.agents.write().await;
        if agents.contains_key(&project_id) {
            return;
        }
        let (tx, rx) = mpsc::channel(32);
        agents.insert(project_id.clone(), tx);
        drop(agents);
        let broadcast_tx = {
            let mut map = self.broadcast.write().await;
            map.entry(project_id.clone())
                .or_insert_with(|| broadcast::channel(64).0)
                .clone()
        };
        let mgr = Arc::clone(self);
        tokio::spawn(agent_loop(project_id, rx, broadcast_tx, mgr));
    }
}
// --- Template system ---
/// Metadata parsed from a template directory's `template.json`.
#[derive(Debug, Deserialize)]
struct TemplateInfo {
    name: String,
    description: String,
    // Hint describing when this template applies; shown to the LLM selector.
    match_hint: String,
}
/// Root directory for project templates: the container path when it exists,
/// otherwise the local development fallback.
fn templates_dir() -> &'static str {
    match Path::new("/app/templates").is_dir() {
        true => "/app/templates",
        false => "app-templates",
    }
}
/// Scan available templates and ask LLM to pick one (or none).
///
/// Reads every subdirectory of the templates dir that contains a parseable
/// `template.json`, presents them to the LLM, and returns the chosen
/// directory name. Returns `None` when the templates dir is unreadable,
/// there are no templates, the LLM call fails, or the answer is "none" or
/// does not match any template.
async fn select_template(llm: &LlmClient, requirement: &str) -> Option<String> {
    let base = Path::new(templates_dir());
    let mut entries = match tokio::fs::read_dir(base).await {
        Ok(e) => e,
        Err(_) => return None,
    };
    let mut templates: Vec<(String, TemplateInfo)> = Vec::new();
    while let Ok(Some(entry)) = entries.next_entry().await {
        // Only directories can be templates.
        if !entry.file_type().await.map(|t| t.is_dir()).unwrap_or(false) {
            continue;
        }
        let id = entry.file_name().to_string_lossy().to_string();
        let meta_path = entry.path().join("template.json");
        // Directories with a missing or malformed template.json are skipped.
        if let Ok(data) = tokio::fs::read_to_string(&meta_path).await {
            if let Ok(info) = serde_json::from_str::<TemplateInfo>(&data) {
                templates.push((id, info));
            }
        }
    }
    if templates.is_empty() {
        return None;
    }
    let listing: String = templates
        .iter()
        .map(|(id, info)| format!("- id: {}\n 名称: {}\n 描述: {}\n 适用场景: {}", id, info.name, info.description, info.match_hint))
        .collect::<Vec<_>>()
        .join("\n");
    let prompt = format!(
        "以下是可用的项目模板:\n{}\n\n用户需求:{}\n\n选择最匹配的模板 ID如果都不合适则回复 none。只回复模板 ID 或 none不要其他内容。",
        listing, requirement
    );
    let response = llm
        .chat(vec![
            ChatMessage::system("你是一个模板选择助手。根据用户需求选择最合适的项目模板。只回复模板 ID 或 none。"),
            ChatMessage::user(&prompt),
        ])
        .await
        .ok()?;
    let answer = response.trim().to_lowercase();
    if answer == "none" {
        return None;
    }
    // Verify the answer matches an actual template ID. Compare
    // case-insensitively: `answer` was lowercased above, so an exact `==`
    // could never match a template directory containing uppercase letters.
    // The original (correctly-cased) id is returned.
    templates
        .iter()
        .find(|(id, _)| id.to_lowercase() == answer)
        .map(|(id, _)| id.clone())
}
/// Copy template contents to workdir (excluding template.json).
///
/// Fails when the template directory does not exist or any copy step errors.
async fn apply_template(template_id: &str, workdir: &str) -> anyhow::Result<()> {
    let src = Path::new(templates_dir()).join(template_id);
    if src.is_dir() {
        copy_dir_recursive(&src, Path::new(workdir)).await
    } else {
        anyhow::bail!("Template directory not found: {}", template_id)
    }
}
/// Recursively copy directory contents, skipping template.json at the top level.
///
/// Implemented with an explicit work list instead of recursion, since a
/// recursive async fn would require boxing its future.
async fn copy_dir_recursive(src: &Path, dst: &Path) -> anyhow::Result<()> {
    let mut pending = vec![(src.to_path_buf(), dst.to_path_buf(), true)];
    while let Some((from, to, is_root)) = pending.pop() {
        tokio::fs::create_dir_all(&to).await?;
        let mut dir = tokio::fs::read_dir(&from).await?;
        while let Some(item) = dir.next_entry().await? {
            let file_name = item.file_name();
            // template.json is template metadata, not project content —
            // skip it at the root directory only.
            if is_root && file_name.to_string_lossy() == "template.json" {
                continue;
            }
            let target = to.join(&file_name);
            if item.file_type().await?.is_dir() {
                pending.push((item.path(), target, false));
            } else {
                tokio::fs::copy(item.path(), &target).await?;
            }
        }
    }
    Ok(())
}
/// Read INSTRUCTIONS.md from workdir if it exists.
///
/// A missing or unreadable file yields an empty string.
async fn read_instructions(workdir: &str) -> String {
    match tokio::fs::read_to_string(format!("{}/INSTRUCTIONS.md", workdir)).await {
        Ok(text) => text,
        Err(_) => String::new(),
    }
}
/// Prepare the project workspace: create the directory, then either run the
/// template's setup script (if present) or fall back to creating a Python
/// virtualenv. All failures are ignored — setup is best-effort.
async fn ensure_workspace(exec: &LocalExecutor, workdir: &str) {
    let _ = tokio::fs::create_dir_all(workdir).await;
    let setup_script = format!("{}/scripts/setup.sh", workdir);
    if !Path::new(&setup_script).exists() {
        // No setup script: make sure a virtualenv exists for Python work.
        let venv_path = format!("{}/.venv", workdir);
        if !Path::new(&venv_path).exists() {
            let _ = exec.execute("uv venv .venv", workdir).await;
        }
        return;
    }
    tracing::info!("Running setup.sh in {}", workdir);
    let _ = exec.execute("bash scripts/setup.sh", workdir).await;
}
/// Long-lived task driving one project's workflows.
///
/// Consumes `AgentEvent`s until the channel closes. `NewRequirement` runs
/// the full template-selection / plan / execute / report cycle from scratch;
/// `Comment` restores the latest state snapshot, lets the LLM decide whether
/// to revise the plan, and resumes execution when actionable steps remain.
async fn agent_loop(
    project_id: String,
    mut rx: mpsc::Receiver<AgentEvent>,
    broadcast_tx: broadcast::Sender<WsMessage>,
    mgr: Arc<AgentManager>,
) {
    let pool = mgr.pool.clone();
    let llm_config = mgr.llm_config.clone();
    let llm = LlmClient::new(&llm_config);
    let exec = LocalExecutor::new();
    // Each project works in its own isolated workspace directory.
    let workdir = format!("/app/data/workspaces/{}", project_id);
    tracing::info!("Agent loop started for project {}", project_id);
    while let Some(event) = rx.recv().await {
        match event {
            AgentEvent::NewRequirement { workflow_id, requirement } => {
                tracing::info!("Processing new requirement for workflow {}", workflow_id);
                // Generate project title in background (don't block the agent loop)
                {
                    let title_llm = LlmClient::new(&llm_config);
                    let title_pool = pool.clone();
                    let title_btx = broadcast_tx.clone();
                    let title_pid = project_id.clone();
                    let title_req = requirement.clone();
                    tokio::spawn(async move {
                        if let Ok(title) = generate_title(&title_llm, &title_req).await {
                            let _ = sqlx::query("UPDATE projects SET name = ? WHERE id = ?")
                                .bind(&title)
                                .bind(&title_pid)
                                .execute(&title_pool)
                                .await;
                            let _ = title_btx.send(WsMessage::ProjectUpdate {
                                project_id: title_pid,
                                name: title,
                            });
                        }
                    });
                }
                // Mark the workflow as executing, both over WS and in the DB.
                let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
                    workflow_id: workflow_id.clone(),
                    status: "executing".into(),
                });
                let _ = sqlx::query("UPDATE workflows SET status = 'executing' WHERE id = ?")
                    .bind(&workflow_id)
                    .execute(&pool)
                    .await;
                // Template selection + workspace setup
                let template_id = select_template(&llm, &requirement).await;
                if let Some(ref tid) = template_id {
                    tracing::info!("Template selected for workflow {}: {}", workflow_id, tid);
                    let _ = tokio::fs::create_dir_all(&workdir).await;
                    // Template failures are non-fatal: the agent continues
                    // with whatever is in the workspace.
                    if let Err(e) = apply_template(tid, &workdir).await {
                        tracing::error!("Failed to apply template {}: {}", tid, e);
                    }
                }
                ensure_workspace(&exec, &workdir).await;
                // Persist the requirement into the workspace so the agent's
                // file tools can read it.
                let _ = tokio::fs::write(format!("{}/requirement.md", workdir), &requirement).await;
                let instructions = read_instructions(&workdir).await;
                tracing::info!("Starting agent loop for workflow {}", workflow_id);
                // Run tool-calling agent loop
                let result = run_agent_loop(
                    &llm, &exec, &pool, &broadcast_tx,
                    &project_id, &workflow_id, &requirement, &workdir, &mgr,
                    &instructions, None,
                ).await;
                let final_status = if result.is_ok() { "done" } else { "failed" };
                tracing::info!("Agent loop finished for workflow {}, status: {}", workflow_id, final_status);
                if let Err(e) = &result {
                    tracing::error!("Agent error for workflow {}: {}", workflow_id, e);
                    let _ = broadcast_tx.send(WsMessage::Error {
                        message: format!("Agent error: {}", e),
                    });
                }
                let _ = sqlx::query("UPDATE workflows SET status = ? WHERE id = ?")
                    .bind(final_status)
                    .bind(&workflow_id)
                    .execute(&pool)
                    .await;
                let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
                    workflow_id: workflow_id.clone(),
                    status: final_status.into(),
                });
                // Generate report from execution log
                let log_entries = sqlx::query_as::<_, crate::db::ExecutionLogEntry>(
                    "SELECT * FROM execution_log WHERE workflow_id = ? ORDER BY created_at"
                )
                .bind(&workflow_id)
                .fetch_all(&pool)
                .await
                .unwrap_or_default();
                if let Ok(report) = generate_report(&llm, &requirement, &log_entries, &project_id).await {
                    let _ = sqlx::query("UPDATE workflows SET report = ? WHERE id = ?")
                        .bind(&report)
                        .bind(&workflow_id)
                        .execute(&pool)
                        .await;
                    let _ = broadcast_tx.send(WsMessage::ReportReady {
                        workflow_id: workflow_id.clone(),
                    });
                }
            }
            AgentEvent::Comment { workflow_id, content } => {
                tracing::info!("Comment on workflow {}: {}", workflow_id, content);
                let wf = sqlx::query_as::<_, crate::db::Workflow>(
                    "SELECT * FROM workflows WHERE id = ?",
                )
                .bind(&workflow_id)
                .fetch_optional(&pool)
                .await
                .ok()
                .flatten();
                // Ignore comments on unknown workflows.
                let Some(wf) = wf else { continue };
                // Load latest state snapshot
                let snapshot = sqlx::query_scalar::<_, String>(
                    "SELECT state_json FROM agent_state_snapshots WHERE workflow_id = ? ORDER BY created_at DESC LIMIT 1"
                )
                .bind(&workflow_id)
                .fetch_optional(&pool)
                .await
                .ok()
                .flatten();
                // Fall back to a fresh state when no snapshot exists or it
                // fails to deserialize.
                let state = snapshot
                    .and_then(|json| serde_json::from_str::<AgentState>(&json).ok())
                    .unwrap_or_else(AgentState::new);
                // Process feedback: LLM decides whether to revise plan
                let state = process_feedback(
                    &llm, &pool, &broadcast_tx,
                    &project_id, &workflow_id, state, &content,
                ).await;
                // If there are actionable steps, resume execution
                if state.first_actionable_step().is_some() {
                    ensure_workspace(&exec, &workdir).await;
                    let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
                        workflow_id: workflow_id.clone(),
                        status: "executing".into(),
                    });
                    let _ = sqlx::query("UPDATE workflows SET status = 'executing' WHERE id = ?")
                        .bind(&workflow_id)
                        .execute(&pool)
                        .await;
                    // Prepare state for execution: set first pending step to Running
                    let mut state = state;
                    if let Some(next) = state.first_actionable_step() {
                        if let Some(step) = state.steps.iter_mut().find(|s| s.order == next) {
                            if matches!(step.status, StepStatus::Pending) {
                                step.status = StepStatus::Running;
                            }
                        }
                        state.phase = AgentPhase::Executing { step: next };
                        // A resumed step starts from a clean chat history.
                        state.current_step_chat_history.clear();
                    }
                    let instructions = read_instructions(&workdir).await;
                    let result = run_agent_loop(
                        &llm, &exec, &pool, &broadcast_tx,
                        &project_id, &workflow_id, &wf.requirement, &workdir, &mgr,
                        &instructions, Some(state),
                    ).await;
                    let final_status = if result.is_ok() { "done" } else { "failed" };
                    if let Err(e) = &result {
                        let _ = broadcast_tx.send(WsMessage::Error {
                            message: format!("Agent error: {}", e),
                        });
                    }
                    let _ = sqlx::query("UPDATE workflows SET status = ? WHERE id = ?")
                        .bind(final_status)
                        .bind(&workflow_id)
                        .execute(&pool)
                        .await;
                    let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
                        workflow_id: workflow_id.clone(),
                        status: final_status.into(),
                    });
                    // Regenerate report
                    let log_entries = sqlx::query_as::<_, crate::db::ExecutionLogEntry>(
                        "SELECT * FROM execution_log WHERE workflow_id = ? ORDER BY created_at"
                    )
                    .bind(&workflow_id)
                    .fetch_all(&pool)
                    .await
                    .unwrap_or_default();
                    if let Ok(report) = generate_report(&llm, &wf.requirement, &log_entries, &project_id).await {
                        let _ = sqlx::query("UPDATE workflows SET report = ? WHERE id = ?")
                            .bind(&report)
                            .bind(&workflow_id)
                            .execute(&pool)
                            .await;
                        let _ = broadcast_tx.send(WsMessage::ReportReady {
                            workflow_id: workflow_id.clone(),
                        });
                    }
                } else {
                    // No actionable steps — feedback was informational only
                    // Mark workflow back to done
                    let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
                        workflow_id: workflow_id.clone(),
                        status: "done".into(),
                    });
                }
            }
        }
    }
    tracing::info!("Agent loop ended for project {}", project_id);
}
// --- Tool definitions ---
/// Build an OpenAI-style function tool descriptor from a name, description
/// and JSON-schema parameter definition.
fn make_tool(name: &str, description: &str, parameters: serde_json::Value) -> Tool {
    let function = ToolFunction {
        name: name.to_string(),
        description: description.to_string(),
        parameters,
    };
    Tool {
        tool_type: "function".to_string(),
        function,
    }
}
/// Tool descriptor: read a file from the workspace (relative path required).
fn tool_read_file() -> Tool {
    make_tool("read_file", "读取工作区中的文件内容", serde_json::json!({
        "type": "object",
        "properties": {
            "path": { "type": "string", "description": "工作区内的相对路径" }
        },
        "required": ["path"]
    }))
}
/// Tool descriptor: list files/subdirectories of a workspace directory.
/// `path` is optional and defaults to the workspace root.
fn tool_list_files() -> Tool {
    make_tool("list_files", "列出工作区目录中的文件和子目录", serde_json::json!({
        "type": "object",
        "properties": {
            "path": { "type": "string", "description": "工作区内的相对路径,默认为根目录" }
        }
    }))
}
/// Tool descriptor: search the knowledge base, returning the top-5 most
/// relevant snippets for a query.
fn tool_kb_search() -> Tool {
    make_tool("kb_search", "搜索知识库中与查询相关的内容片段。返回最相关的 top-5 片段。", serde_json::json!({
        "type": "object",
        "properties": {
            "query": { "type": "string", "description": "搜索查询" }
        },
        "required": ["query"]
    }))
}
/// Tool descriptor: read the knowledge base's full text (no parameters).
fn tool_kb_read() -> Tool {
    make_tool("kb_read", "读取知识库全文内容。", serde_json::json!({
        "type": "object",
        "properties": {}
    }))
}
/// Tools available during the planning phase: plan submission plus
/// read-only inspection of the workspace and knowledge base.
fn build_planning_tools() -> Vec<Tool> {
    vec![
        // Calling update_plan transitions the agent from Planning into
        // Executing (handled in run_agent_loop).
        make_tool("update_plan", "设置高层执行计划。分析需求后调用此工具提交计划。每个步骤应是一个逻辑阶段(不是具体命令),包含简短标题和详细描述。调用后自动进入执行阶段。", serde_json::json!({
            "type": "object",
            "properties": {
                "steps": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "title": { "type": "string", "description": "步骤标题,简短概括(如'搭建环境'" },
                            "description": { "type": "string", "description": "详细描述,说明具体要做什么、为什么" }
                        },
                        "required": ["title", "description"]
                    },
                    "description": "高层计划步骤列表"
                }
            },
            "required": ["steps"]
        })),
        tool_list_files(),
        tool_read_file(),
        tool_kb_search(),
        tool_kb_read(),
    ]
}
/// Tools available during the execution phase: shell execution, file I/O,
/// service lifecycle, requirement/scratchpad updates, step advancement and
/// knowledge-base access.
fn build_execution_tools() -> Vec<Tool> {
    vec![
        // Run a shell command inside the workspace directory.
        make_tool("execute", "在工作区目录中执行 shell 命令", serde_json::json!({
            "type": "object",
            "properties": {
                "command": { "type": "string", "description": "要执行的 shell 命令" }
            },
            "required": ["command"]
        })),
        tool_read_file(),
        // Write a file (parent directories are created automatically).
        make_tool("write_file", "在工作区中写入文件(自动创建父目录)", serde_json::json!({
            "type": "object",
            "properties": {
                "path": { "type": "string", "description": "工作区内的相对路径" },
                "content": { "type": "string", "description": "要写入的文件内容" }
            },
            "required": ["path", "content"]
        })),
        tool_list_files(),
        // Start a background service; the system allocates a port and passes
        // it via the PORT environment variable.
        make_tool("start_service", "启动后台服务进程(如 FastAPI 应用)。系统会自动分配端口并通过环境变量 PORT 传入。服务启动后可通过 /api/projects/{project_id}/app/ 访问。注意:启动命令应监听 0.0.0.0:$PORT。", serde_json::json!({
            "type": "object",
            "properties": {
                "command": { "type": "string", "description": "启动命令,如 'uvicorn main:app --host 0.0.0.0 --port $PORT'" }
            },
            "required": ["command"]
        })),
        // Stop the project's currently-running background service.
        make_tool("stop_service", "停止当前项目正在运行的后台服务进程。", serde_json::json!({
            "type": "object",
            "properties": {}
        })),
        // Replace the stored requirement text when feedback changes the goal.
        make_tool("update_requirement", "更新项目需求描述。当用户反馈改变了目标方向时使用,新文本会替换原有需求。", serde_json::json!({
            "type": "object",
            "properties": {
                "requirement": { "type": "string", "description": "更新后的需求描述" }
            },
            "required": ["requirement"]
        })),
        // Finish the current step; the summary is carried into later steps.
        make_tool("advance_step", "完成当前步骤并进入下一步。必须提供当前步骤的工作摘要,摘要会传递给后续步骤作为上下文。", serde_json::json!({
            "type": "object",
            "properties": {
                "summary": { "type": "string", "description": "当前步骤的工作摘要(将传递给后续步骤)" }
            },
            "required": ["summary"]
        })),
        // Append to the cross-step scratchpad included in later contexts.
        make_tool("update_scratchpad", "更新跨步骤工作区。后续所有步骤的上下文中都会包含此内容。用于记录关键信息(如文件路径、配置项、重要发现)。新内容会追加到已有内容之后。", serde_json::json!({
            "type": "object",
            "properties": {
                "content": { "type": "string", "description": "要追加的内容" }
            },
            "required": ["content"]
        })),
        tool_kb_search(),
        tool_kb_read(),
    ]
}
/// System prompt for the planning phase: the bundled planning template with
/// `{project_id}` substituted, plus optional template instructions appended.
fn build_planning_prompt(project_id: &str, instructions: &str) -> String {
    let base = include_str!("prompts/planning.md").replace("{project_id}", project_id);
    if instructions.is_empty() {
        base
    } else {
        format!("{}\n\n## 项目模板指令\n\n{}", base, instructions)
    }
}
/// System prompt for the execution phase: the bundled execution template with
/// `{project_id}` substituted, plus optional template instructions appended.
fn build_execution_prompt(project_id: &str, instructions: &str) -> String {
    let base = include_str!("prompts/execution.md").replace("{project_id}", project_id);
    if instructions.is_empty() {
        base
    } else {
        format!("{}\n\n## 项目模板指令\n\n{}", base, instructions)
    }
}
/// Build the feedback-phase system prompt: renders the current plan with
/// per-step status markers and summaries, then substitutes plan, feedback
/// and project id into the bundled feedback template.
fn build_feedback_prompt(project_id: &str, state: &AgentState, feedback: &str) -> String {
    let mut plan_state = String::new();
    for step in &state.steps {
        let marker = match step.status {
            StepStatus::Done => " [done]",
            StepStatus::Running => " [running]",
            StepStatus::Failed => " [FAILED]",
            StepStatus::Pending => "",
        };
        plan_state.push_str(&format!("{}. {}{}\n {}\n", step.order, step.title, marker, step.description));
        if let Some(summary) = &step.summary {
            plan_state.push_str(&format!(" 摘要: {}\n", summary));
        }
    }
    include_str!("prompts/feedback.md")
        .replace("{project_id}", project_id)
        .replace("{plan_state}", &plan_state)
        .replace("{feedback}", feedback)
}
/// Tools available while processing user feedback: only plan revision.
/// A text-only response (no tool call) means the feedback is informational.
fn build_feedback_tools() -> Vec<Tool> {
    vec![
        make_tool("revise_plan", "修改执行计划。提供完整步骤列表。系统自动 diffdescription 未变的已完成步骤保留成果,变化的步骤及后续重新执行。", serde_json::json!({
            "type": "object",
            "properties": {
                "steps": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "title": { "type": "string", "description": "步骤标题" },
                            "description": { "type": "string", "description": "详细描述" }
                        },
                        "required": ["title", "description"]
                    }
                }
            },
            "required": ["steps"]
        })),
    ]
}
// --- Helpers ---
/// Truncate a string at a char boundary, returning at most `max_bytes` bytes.
///
/// If `max_bytes` falls inside a multi-byte character, the cut point moves
/// backwards to the previous boundary, so the result is always valid UTF-8.
fn truncate_str(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        s
    } else {
        // Scan backwards from max_bytes for the nearest char boundary;
        // index 0 is always a boundary, so the search cannot fail.
        let cut = (0..=max_bytes)
            .rev()
            .find(|&i| s.is_char_boundary(i))
            .unwrap_or(0);
        &s[..cut]
    }
}
// --- Tool execution ---
async fn execute_tool(
name: &str,
arguments: &str,
workdir: &str,
exec: &LocalExecutor,
) -> String {
let args: serde_json::Value = serde_json::from_str(arguments).unwrap_or_default();
match name {
"execute" => {
let cmd = args["command"].as_str().unwrap_or("");
match exec.execute(cmd, workdir).await {
Ok(r) => {
let mut out = r.stdout;
if !r.stderr.is_empty() {
out.push_str("\nSTDERR: ");
out.push_str(&r.stderr);
}
if r.exit_code != 0 {
out.push_str(&format!("\n[exit code: {}]", r.exit_code));
}
if out.len() > 8000 {
let truncated = truncate_str(&out, 8000).to_string();
out = truncated;
out.push_str("\n...(truncated)");
}
out
}
Err(e) => format!("Error: {}", e),
}
}
"read_file" => {
let path = args["path"].as_str().unwrap_or("");
if path.contains("..") {
return "Error: path traversal not allowed".into();
}
let full = std::path::PathBuf::from(workdir).join(path);
match tokio::fs::read_to_string(&full).await {
Ok(content) => {
if content.len() > 8000 {
format!("{}...(truncated, {} bytes total)", truncate_str(&content, 8000), content.len())
} else {
content
}
}
Err(e) => format!("Error: {}", e),
}
}
"write_file" => {
let path = args["path"].as_str().unwrap_or("");
let content = args["content"].as_str().unwrap_or("");
if path.contains("..") {
return "Error: path traversal not allowed".into();
}
let full = std::path::PathBuf::from(workdir).join(path);
if let Some(parent) = full.parent() {
let _ = tokio::fs::create_dir_all(parent).await;
}
match tokio::fs::write(&full, content).await {
Ok(()) => format!("Written {} bytes to {}", content.len(), path),
Err(e) => format!("Error: {}", e),
}
}
"list_files" => {
let path = args["path"].as_str().unwrap_or(".");
if path.contains("..") {
return "Error: path traversal not allowed".into();
}
let full = std::path::PathBuf::from(workdir).join(path);
match tokio::fs::read_dir(&full).await {
Ok(mut entries) => {
let mut items = Vec::new();
while let Ok(Some(entry)) = entries.next_entry().await {
let name = entry.file_name().to_string_lossy().to_string();
let is_dir = entry.file_type().await.map(|t| t.is_dir()).unwrap_or(false);
items.push(if is_dir { format!("{}/", name) } else { name });
}
items.sort();
if items.is_empty() { "(empty directory)".into() } else { items.join("\n") }
}
Err(e) => format!("Error: {}", e),
}
}
_ => format!("Unknown tool: {}", name),
}
}
// --- Tool-calling agent loop (state machine) ---
/// Save an AgentState snapshot to DB.
///
/// Serializes `state` to JSON and inserts a new row keyed by a fresh UUID.
/// Failures (serialization or SQL) are deliberately ignored: snapshots are
/// best-effort and must never abort the agent loop.
async fn save_state_snapshot(pool: &SqlitePool, workflow_id: &str, step_order: i32, state: &AgentState) {
    let id = uuid::Uuid::new_v4().to_string();
    let json = serde_json::to_string(state).unwrap_or_default();
    let _ = sqlx::query(
        "INSERT INTO agent_state_snapshots (id, workflow_id, step_order, state_json, created_at) VALUES (?, ?, ?, ?, datetime('now'))"
    )
    .bind(&id)
    .bind(workflow_id)
    .bind(step_order)
    .bind(&json)
    .execute(pool)
    .await;
}
/// Log a tool call to execution_log and broadcast to frontend.
///
/// The freshly generated row id doubles as the `step_id` in the broadcast
/// `StepStatusUpdate`. Both the insert and the broadcast are best-effort:
/// errors are ignored so logging never interrupts execution.
async fn log_execution(
    pool: &SqlitePool,
    broadcast_tx: &broadcast::Sender<WsMessage>,
    workflow_id: &str,
    step_order: i32,
    tool_name: &str,
    tool_input: &str,
    output: &str,
    status: &str,
) {
    let id = uuid::Uuid::new_v4().to_string();
    let _ = sqlx::query(
        "INSERT INTO execution_log (id, workflow_id, step_order, tool_name, tool_input, output, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))"
    )
    .bind(&id)
    .bind(workflow_id)
    .bind(step_order)
    .bind(tool_name)
    .bind(tool_input)
    .bind(output)
    .bind(status)
    .execute(pool)
    .await;
    let _ = broadcast_tx.send(WsMessage::StepStatusUpdate {
        step_id: id,
        status: status.into(),
        output: output.to_string(),
    });
}
/// Log an LLM call to llm_call_log and broadcast to frontend.
///
/// Persists per-call metadata (phase, message/tool counts, tool calls, text
/// response, token usage, latency) and mirrors the same entry to WS clients
/// as `WsMessage::LlmCallLog`. Token counts and latency are stored as i32
/// columns; both insert and broadcast are best-effort (errors ignored).
#[allow(clippy::too_many_arguments)]
async fn log_llm_call(
    pool: &SqlitePool,
    broadcast_tx: &broadcast::Sender<WsMessage>,
    workflow_id: &str,
    step_order: i32,
    phase: &str,
    messages_count: i32,
    tools_count: i32,
    tool_calls_json: &str,
    text_response: &str,
    prompt_tokens: Option<u32>,
    completion_tokens: Option<u32>,
    latency_ms: i64,
) {
    let id = uuid::Uuid::new_v4().to_string();
    let _ = sqlx::query(
        "INSERT INTO llm_call_log (id, workflow_id, step_order, phase, messages_count, tools_count, tool_calls, text_response, prompt_tokens, completion_tokens, latency_ms, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))"
    )
    .bind(&id)
    .bind(workflow_id)
    .bind(step_order)
    .bind(phase)
    .bind(messages_count)
    .bind(tools_count)
    .bind(tool_calls_json)
    .bind(text_response)
    .bind(prompt_tokens.map(|v| v as i32))
    .bind(completion_tokens.map(|v| v as i32))
    .bind(latency_ms as i32)
    .execute(pool)
    .await;
    // Mirror the inserted row to WS subscribers. created_at is left empty
    // here (the DB fills it server-side with datetime('now')).
    let entry = crate::db::LlmCallLogEntry {
        id: id.clone(),
        workflow_id: workflow_id.to_string(),
        step_order,
        phase: phase.to_string(),
        messages_count,
        tools_count,
        tool_calls: tool_calls_json.to_string(),
        text_response: text_response.to_string(),
        prompt_tokens: prompt_tokens.map(|v| v as i32),
        completion_tokens: completion_tokens.map(|v| v as i32),
        latency_ms: latency_ms as i32,
        created_at: String::new(),
    };
    let _ = broadcast_tx.send(WsMessage::LlmCallLog {
        workflow_id: workflow_id.to_string(),
        entry,
    });
}
/// Process user feedback: call LLM to decide whether to revise the plan.
/// Returns the (possibly modified) AgentState ready for resumed execution.
///
/// On LLM failure the state is returned unchanged apart from the feedback
/// being attached to the first non-done step. In all other cases the
/// feedback text is attached to the first actionable step (or the last
/// step) and a state snapshot is persisted before returning.
async fn process_feedback(
    llm: &LlmClient,
    pool: &SqlitePool,
    broadcast_tx: &broadcast::Sender<WsMessage>,
    project_id: &str,
    workflow_id: &str,
    mut state: AgentState,
    feedback: &str,
) -> AgentState {
    let prompt = build_feedback_prompt(project_id, &state, feedback);
    let tools = build_feedback_tools();
    let messages = vec![
        ChatMessage::system(&prompt),
        ChatMessage::user(feedback),
    ];
    tracing::info!("[workflow {}] Processing feedback with LLM", workflow_id);
    let response = match llm.chat_with_tools(messages, &tools).await {
        Ok(r) => r,
        Err(e) => {
            tracing::error!("[workflow {}] Feedback LLM call failed: {}", workflow_id, e);
            // On failure, attach feedback to first non-done step and return unchanged
            if let Some(step) = state.steps.iter_mut().find(|s| !matches!(s.status, StepStatus::Done)) {
                step.user_feedbacks.push(feedback.to_string());
            }
            return state;
        }
    };
    let choice = match response.choices.into_iter().next() {
        Some(c) => c,
        None => return state,
    };
    if let Some(tool_calls) = &choice.message.tool_calls {
        for tc in tool_calls {
            if tc.function.name == "revise_plan" {
                let args: serde_json::Value = serde_json::from_str(&tc.function.arguments).unwrap_or_default();
                let raw_steps = args["steps"].as_array().cloned().unwrap_or_default();
                // Rebuild the full step list from the LLM's arguments;
                // orders are assigned 1-based from the array position.
                let new_steps: Vec<Step> = raw_steps.iter().enumerate().map(|(i, item)| {
                    let order = (i + 1) as i32;
                    Step {
                        order,
                        title: item["title"].as_str().unwrap_or("").to_string(),
                        description: item["description"].as_str().unwrap_or("").to_string(),
                        status: StepStatus::Pending,
                        summary: None,
                        user_feedbacks: Vec::new(),
                        db_id: String::new(),
                    }
                }).collect();
                // Apply the plan diff Docker-layer-style: per the revise_plan
                // tool contract, completed steps whose description is
                // unchanged keep their results; changed steps and everything
                // after them are re-executed.
                state.apply_plan_diff(new_steps);
                // Broadcast updated plan
                let _ = broadcast_tx.send(WsMessage::PlanUpdate {
                    workflow_id: workflow_id.to_string(),
                    steps: plan_infos_from_state(&state),
                });
                tracing::info!("[workflow {}] Plan revised via feedback. First actionable: {:?}",
                    workflow_id, state.first_actionable_step());
            }
        }
    } else {
        // Text response only — feedback is informational, no plan change
        let text = choice.message.content.as_deref().unwrap_or("");
        tracing::info!("[workflow {}] Feedback processed, no plan change: {}", workflow_id, truncate_str(text, 200));
        log_execution(pool, broadcast_tx, workflow_id, state.current_step(), "text_response", "", text, "done").await;
    }
    // Attach feedback to the first actionable step (or last step)
    let target_order = state.first_actionable_step()
        .unwrap_or_else(|| state.steps.last().map(|s| s.order).unwrap_or(0));
    if let Some(step) = state.steps.iter_mut().find(|s| s.order == target_order) {
        step.user_feedbacks.push(feedback.to_string());
    }
    // Snapshot after feedback processing
    save_state_snapshot(pool, workflow_id, state.current_step(), &state).await;
    state
}
#[allow(clippy::too_many_arguments)]
async fn run_agent_loop(
llm: &LlmClient,
exec: &LocalExecutor,
pool: &SqlitePool,
broadcast_tx: &broadcast::Sender<WsMessage>,
project_id: &str,
workflow_id: &str,
requirement: &str,
workdir: &str,
mgr: &Arc<AgentManager>,
instructions: &str,
initial_state: Option<AgentState>,
) -> anyhow::Result<()> {
let planning_tools = build_planning_tools();
let execution_tools = build_execution_tools();
let mut state = initial_state.unwrap_or_else(AgentState::new);
for iteration in 0..80 {
// Build messages and select tools based on current phase
let system_prompt = match &state.phase {
AgentPhase::Planning => build_planning_prompt(project_id, instructions),
AgentPhase::Executing { .. } => build_execution_prompt(project_id, instructions),
AgentPhase::Completed => break,
};
let tools = match &state.phase {
AgentPhase::Planning => &planning_tools,
AgentPhase::Executing { .. } => &execution_tools,
AgentPhase::Completed => break,
};
let messages = state.build_messages(&system_prompt, requirement);
let msg_count = messages.len() as i32;
let tool_count = tools.len() as i32;
let phase_label = match &state.phase {
AgentPhase::Planning => "planning".to_string(),
AgentPhase::Executing { step } => format!("executing({})", step),
AgentPhase::Completed => "completed".to_string(),
};
tracing::info!("[workflow {}] LLM call #{} phase={:?} msgs={}", workflow_id, iteration + 1, state.phase, messages.len());
let call_start = std::time::Instant::now();
let response = match llm.chat_with_tools(messages, tools).await {
Ok(r) => r,
Err(e) => {
tracing::error!("[workflow {}] LLM call failed: {}", workflow_id, e);
return Err(e);
}
};
let latency_ms = call_start.elapsed().as_millis() as i64;
let (prompt_tokens, completion_tokens) = response.usage.as_ref()
.map(|u| (Some(u.prompt_tokens), Some(u.completion_tokens)))
.unwrap_or((None, None));
let choice = response.choices.into_iter().next()
.ok_or_else(|| anyhow::anyhow!("No response from LLM"))?;
// Add assistant message to chat history
state.current_step_chat_history.push(choice.message.clone());
// Collect text_response for logging
let llm_text_response = choice.message.content.clone().unwrap_or_default();
if let Some(tool_calls) = &choice.message.tool_calls {
tracing::info!("[workflow {}] Tool calls: {}", workflow_id,
tool_calls.iter().map(|tc| tc.function.name.as_str()).collect::<Vec<_>>().join(", "));
let mut phase_transition = false;
for tc in tool_calls {
if phase_transition {
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, "(skipped: phase transition)"));
continue;
}
let args: serde_json::Value = serde_json::from_str(&tc.function.arguments).unwrap_or_default();
let cur = state.current_step();
match tc.function.name.as_str() {
"update_plan" => {
let raw_steps = args["steps"].as_array().cloned().unwrap_or_default();
state.steps.clear();
for (i, item) in raw_steps.iter().enumerate() {
let order = (i + 1) as i32;
let title = item["title"].as_str().unwrap_or(item.as_str().unwrap_or("")).to_string();
let detail = item["description"].as_str().unwrap_or("").to_string();
state.steps.push(Step {
order,
title,
description: detail,
status: StepStatus::Pending,
summary: None,
user_feedbacks: Vec::new(),
db_id: String::new(),
});
}
// Transition: Planning → Executing(1)
if let Some(first) = state.steps.first_mut() {
first.status = StepStatus::Running;
}
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
workflow_id: workflow_id.to_string(),
steps: plan_infos_from_state(&state),
});
state.current_step_chat_history.clear();
state.phase = AgentPhase::Executing { step: 1 };
phase_transition = true;
// Snapshot after plan is set
save_state_snapshot(pool, workflow_id, 0, &state).await;
tracing::info!("[workflow {}] Plan set ({} steps), entering Executing(1)", workflow_id, state.steps.len());
}
"advance_step" => {
let summary = args["summary"].as_str().unwrap_or("").to_string();
// Mark current step done with summary
if let Some(step) = state.steps.iter_mut().find(|s| s.order == cur) {
step.status = StepStatus::Done;
step.summary = Some(summary);
}
// Move to next step or complete
let next = cur + 1;
if let Some(next_step) = state.steps.iter_mut().find(|s| s.order == next) {
next_step.status = StepStatus::Running;
state.phase = AgentPhase::Executing { step: next };
tracing::info!("[workflow {}] Advanced to step {}", workflow_id, next);
} else {
state.phase = AgentPhase::Completed;
tracing::info!("[workflow {}] All steps completed", workflow_id);
}
state.current_step_chat_history.clear();
phase_transition = true;
// Broadcast step status change to frontend
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
workflow_id: workflow_id.to_string(),
steps: plan_infos_from_state(&state),
});
// Snapshot on step transition
save_state_snapshot(pool, workflow_id, cur, &state).await;
}
"update_scratchpad" => {
let content = args["content"].as_str().unwrap_or("");
if !state.scratchpad.is_empty() {
state.scratchpad.push('\n');
}
state.scratchpad.push_str(content);
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, "Scratchpad 已更新。"));
}
"update_requirement" => {
let new_req = args["requirement"].as_str().unwrap_or("");
let _ = sqlx::query("UPDATE workflows SET requirement = ? WHERE id = ?")
.bind(new_req)
.bind(workflow_id)
.execute(pool)
.await;
let _ = tokio::fs::write(format!("{}/requirement.md", workdir), new_req).await;
let _ = broadcast_tx.send(WsMessage::RequirementUpdate {
workflow_id: workflow_id.to_string(),
requirement: new_req.to_string(),
});
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, "需求已更新。"));
}
"start_service" => {
let cmd = args["command"].as_str().unwrap_or("");
{
let mut services = mgr.services.write().await;
if let Some(old) = services.remove(project_id) {
let _ = nix::sys::signal::kill(
nix::unistd::Pid::from_raw(old.pid as i32),
nix::sys::signal::Signal::SIGTERM,
);
}
}
let port = mgr.allocate_port();
let cmd_with_port = cmd.replace("$PORT", &port.to_string());
let venv_bin = format!("{}/.venv/bin", workdir);
let path_env = match std::env::var("PATH") {
Ok(p) => format!("{}:{}", venv_bin, p),
Err(_) => venv_bin,
};
let result = match tokio::process::Command::new("sh")
.arg("-c")
.arg(&cmd_with_port)
.current_dir(workdir)
.env("PORT", port.to_string())
.env("PATH", &path_env)
.env("VIRTUAL_ENV", format!("{}/.venv", workdir))
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
{
Ok(child) => {
let pid = child.id().unwrap_or(0);
mgr.services.write().await.insert(project_id.to_string(), ServiceInfo { port, pid });
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
format!("服务已启动,端口 {},访问地址:/api/projects/{}/app/", port, project_id)
}
Err(e) => format!("Error: 启动失败:{}", e),
};
let status = if result.starts_with("Error:") { "failed" } else { "done" };
log_execution(pool, broadcast_tx, workflow_id, cur, "start_service", cmd, &result, status).await;
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
}
"stop_service" => {
let mut services = mgr.services.write().await;
let result = if let Some(svc) = services.remove(project_id) {
let _ = nix::sys::signal::kill(
nix::unistd::Pid::from_raw(svc.pid as i32),
nix::sys::signal::Signal::SIGTERM,
);
"服务已停止。".to_string()
} else {
"当前没有运行中的服务。".to_string()
};
log_execution(pool, broadcast_tx, workflow_id, cur, "stop_service", "", &result, "done").await;
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
}
"kb_search" => {
let query = args["query"].as_str().unwrap_or("");
let result = if let Some(kb) = &mgr.kb {
match kb.search(query).await {
Ok(results) if results.is_empty() => "知识库为空或没有匹配结果。".to_string(),
Ok(results) => {
results.iter().enumerate().map(|(i, r)| {
let article_label = if r.article_title.is_empty() {
String::new()
} else {
format!(" [文章: {}]", r.article_title)
};
format!("--- 片段 {} (相似度: {:.2}){} ---\n{}", i + 1, r.score, article_label, r.content)
}).collect::<Vec<_>>().join("\n\n")
}
Err(e) => format!("Error: {}", e),
}
} else {
"知识库未初始化。".to_string()
};
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
}
"kb_read" => {
let result = if let Some(kb) = &mgr.kb {
match kb.read_all().await {
Ok(content) if content.is_empty() => "知识库为空。".to_string(),
Ok(content) => content,
Err(e) => format!("Error: {}", e),
}
} else {
"知识库未初始化。".to_string()
};
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
}
// IO tools: execute, read_file, write_file, list_files
_ => {
let result = execute_tool(&tc.function.name, &tc.function.arguments, workdir, exec).await;
let status = if result.starts_with("Error:") { "failed" } else { "done" };
log_execution(pool, broadcast_tx, workflow_id, cur, &tc.function.name, &tc.function.arguments, &result, status).await;
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
}
}
}
// Build tool_calls JSON for LLM call log
let tc_json: Vec<serde_json::Value> = tool_calls.iter().map(|tc| {
serde_json::json!({
"name": tc.function.name,
"arguments_preview": truncate_str(&tc.function.arguments, 200),
})
}).collect();
let tc_json_str = serde_json::to_string(&tc_json).unwrap_or_else(|_| "[]".to_string());
log_llm_call(
pool, broadcast_tx, workflow_id, state.current_step(),
&phase_label, msg_count, tool_count,
&tc_json_str, &llm_text_response,
prompt_tokens, completion_tokens, latency_ms,
).await;
if phase_transition {
continue;
}
} else {
// No tool calls — LLM sent a text response
let content = choice.message.content.as_deref().unwrap_or("(no content)");
tracing::info!("[workflow {}] LLM text response: {}", workflow_id, truncate_str(content, 200));
// Log text response to execution_log for frontend display
log_execution(pool, broadcast_tx, workflow_id, state.current_step(), "text_response", "", content, "done").await;
log_llm_call(
pool, broadcast_tx, workflow_id, state.current_step(),
&phase_label, msg_count, tool_count,
"[]", content,
prompt_tokens, completion_tokens, latency_ms,
).await;
// Text response does NOT end the workflow. Only advance_step progresses.
// In Planning phase, LLM may be thinking before calling update_plan — just continue.
}
if matches!(state.phase, AgentPhase::Completed) {
break;
}
}
// Final snapshot
save_state_snapshot(pool, workflow_id, state.current_step(), &state).await;
Ok(())
}
/// Produce a Markdown completion report for a workflow by asking the LLM to
/// summarise the execution log against the original requirement.
///
/// Each log entry is rendered as a Markdown section (status, tool name, step
/// order, input preview capped at 500, output capped at 2000 with a
/// "...(truncated)" marker), then sent together with `requirement` to the
/// report prompt template (`prompts/report.md`, with `{project_id}` filled in).
///
/// Returns the LLM's report text, or an error if the chat call fails.
async fn generate_report(
    llm: &LlmClient,
    requirement: &str,
    entries: &[crate::db::ExecutionLogEntry],
    project_id: &str,
) -> anyhow::Result<String> {
    // Render every execution-log entry into its own Markdown section.
    let mut sections: Vec<String> = Vec::with_capacity(entries.len());
    for e in entries {
        // Cap very long outputs so the prompt stays within a reasonable size.
        let output_preview = if e.output.len() > 2000 {
            format!("{}...(truncated)", truncate_str(&e.output, 2000))
        } else {
            e.output.clone()
        };
        sections.push(format!(
            "### [{}] {} (step {})\nInput: `{}`\nOutput:\n```\n{}\n```\n",
            e.status, e.tool_name, e.step_order, truncate_str(&e.tool_input, 500), output_preview
        ));
    }
    let steps_detail = sections.join("\n");
    // Prompt template lives next to the source; {project_id} is substituted in.
    let system_prompt = include_str!("prompts/report.md").replace("{project_id}", project_id);
    let user_msg = format!(
        "需求:\n{}\n\n执行详情:\n{}",
        requirement, steps_detail
    );
    let messages = vec![
        ChatMessage::system(&system_prompt),
        ChatMessage::user(&user_msg),
    ];
    let report = llm.chat(messages).await?;
    Ok(report)
}
/// Ask the LLM for a short Chinese project title derived from `requirement`.
///
/// The raw response is trimmed of whitespace and surrounding double quotes,
/// then defensively reduced to its first line and capped at 80 bytes via
/// `truncate_str` in case the model ignores the length instruction.
///
/// Returns the cleaned title, or an error if the chat call fails.
async fn generate_title(llm: &LlmClient, requirement: &str) -> anyhow::Result<String> {
    let raw = llm
        .chat(vec![
            ChatMessage::system("为给定的需求生成一个简短的项目标题最多15个汉字。只回复标题本身不要加任何其他内容。使用中文。"),
            ChatMessage::user(requirement),
        ])
        .await?;
    // Normalise: strip whitespace and any quote characters the model wrapped
    // the title in.
    let cleaned = raw.trim().trim_matches('"');
    // Hard limit: if the LLM returns garbage, keep only the first line…
    let mut title = cleaned.lines().next().unwrap_or(cleaned).to_string();
    // …and cap the length at 80 bytes.
    if title.len() > 80 {
        title = truncate_str(&title, 80).to_string();
    }
    Ok(title)
}