diff --git a/src/agent.rs b/src/agent.rs index 216e2d3..4add026 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -12,7 +12,7 @@ use crate::template::{self, LoadedTemplate}; use crate::tools::ExternalToolManager; use crate::LlmConfig; -use crate::state::{AgentState, AgentPhase, Step, StepStatus}; +use crate::state::{AgentState, AgentPhase, Step, StepStatus, StepResult, StepResultStatus, check_scratchpad_size}; pub struct ServiceInfo { pub port: u16, @@ -558,7 +558,45 @@ fn build_planning_tools() -> Vec { ] } -fn build_execution_tools() -> Vec { +/// Coordinator tools — used by the main loop after step completion +fn build_coordinator_tools() -> Vec { + vec![ + make_tool("update_plan", "修改执行计划。提供完整步骤列表,系统自动 diff:description 未变的已完成步骤保留成果,变化的步骤及后续重新执行。", serde_json::json!({ + "type": "object", + "properties": { + "steps": { + "type": "array", + "items": { + "type": "object", + "properties": { + "title": { "type": "string", "description": "步骤标题" }, + "description": { "type": "string", "description": "详细描述" } + }, + "required": ["title", "description"] + } + } + }, + "required": ["steps"] + })), + make_tool("update_scratchpad", "更新全局备忘录。跨步骤持久化的关键信息。新内容会追加到已有内容之后。", serde_json::json!({ + "type": "object", + "properties": { + "content": { "type": "string", "description": "要追加的内容" } + }, + "required": ["content"] + })), + make_tool("update_requirement", "更新项目需求描述。当需要调整方向时使用。", serde_json::json!({ + "type": "object", + "properties": { + "requirement": { "type": "string", "description": "更新后的需求描述" } + }, + "required": ["requirement"] + })), + ] +} + +/// Step execution tools — used by run_step_loop +fn build_step_tools() -> Vec { vec![ make_tool("execute", "在工作区目录中执行 shell 命令", serde_json::json!({ "type": "object", @@ -588,34 +626,27 @@ fn build_execution_tools() -> Vec { "type": "object", "properties": {} })), - make_tool("update_requirement", "更新项目需求描述。当用户反馈改变了目标方向时使用,新文本会替换原有需求。", serde_json::json!({ - "type": "object", - "properties": { - "requirement": { "type": "string", "description": "更新后的需求描述" } - }, - "required": ["requirement"] - })), - make_tool("advance_step", "完成当前步骤并进入下一步。必须提供当前步骤的工作摘要,摘要会传递给后续步骤作为上下文。", serde_json::json!({ - "type": "object", - "properties": { - "summary": { "type": "string", "description": "当前步骤的工作摘要(将传递给后续步骤)" } - }, - "required": ["summary"] - })), - make_tool("update_scratchpad", "更新跨步骤工作区。后续所有步骤的上下文中都会包含此内容。用于记录关键信息(如文件路径、配置项、重要发现)。新内容会追加到已有内容之后。", serde_json::json!({ + make_tool("update_scratchpad", "更新步骤级工作记忆。用于记录本步骤内的中间状态(步骤结束后丢弃,精华写进 summary)。不是日志,只保留当前有用的信息。", serde_json::json!({ "type": "object", "properties": { "content": { "type": "string", "description": "要追加的内容" } }, "required": ["content"] })), - make_tool("wait_for_approval", "暂停执行,等待用户确认后继续。用于关键决策点,如:确认方案、确认配置变更、确认危险操作等。用户的回复会作为反馈返回。", serde_json::json!({ + make_tool("wait_for_approval", "暂停执行,等待用户确认后继续。用于关键决策点。", serde_json::json!({ "type": "object", "properties": { "reason": { "type": "string", "description": "说明为什么需要用户确认" } }, "required": ["reason"] })), + make_tool("step_done", "完成当前步骤。必须提供摘要,概括本步骤做了什么、结果如何。", serde_json::json!({ + "type": "object", + "properties": { + "summary": { "type": "string", "description": "本步骤的工作摘要" } + }, + "required": ["summary"] + })), tool_kb_search(), tool_kb_read(), ] @@ -630,7 +661,7 @@ fn build_planning_prompt(project_id: &str, instructions: &str) -> String { prompt } -fn build_execution_prompt(project_id: &str, instructions: &str) -> String { +fn build_coordinator_prompt(project_id: &str, instructions: &str) -> String { let mut prompt = include_str!("prompts/execution.md") .replace("{project_id}", project_id); if !instructions.is_empty() { @@ -639,6 +670,48 @@ fn build_execution_prompt(project_id: &str, instructions: &str) -> String { prompt } +fn build_step_execution_prompt(project_id: &str, instructions: &str) -> String { + let mut prompt = include_str!("prompts/step_execution.md") + .replace("{project_id}", project_id); + if !instructions.is_empty() { + prompt.push_str(&format!("\n\n## 项目模板指令\n\n{}", instructions)); + } + prompt +} + +/// Build user message for a step sub-loop +fn build_step_user_message(step: &Step, completed_summaries: &[(i32, String, String)], parent_scratchpad: &str) -> String { + let mut ctx = String::new(); + + ctx.push_str(&format!("## 当前步骤(步骤 {})\n", step.order)); + ctx.push_str(&format!("标题:{}\n", step.title)); + ctx.push_str(&format!("描述:{}\n", step.description)); + + if !step.user_feedbacks.is_empty() { + ctx.push_str("\n用户反馈:\n"); + for fb in &step.user_feedbacks { + ctx.push_str(&format!("- {}\n", fb)); + } + } + ctx.push('\n'); + + if !completed_summaries.is_empty() { + ctx.push_str("## 已完成步骤摘要\n"); + for (order, title, summary) in completed_summaries { + ctx.push_str(&format!("- 步骤 {} ({}): {}\n", order, title, summary)); + } + ctx.push('\n'); + } + + if !parent_scratchpad.is_empty() { + ctx.push_str("## 项目备忘录(只读)\n"); + ctx.push_str(parent_scratchpad); + ctx.push('\n'); + } + + ctx +} + fn build_feedback_prompt(project_id: &str, state: &AgentState, feedback: &str) -> String { let mut plan_state = String::new(); for s in &state.steps { @@ -977,58 +1050,67 @@ async fn process_feedback( state } +/// Run an isolated sub-loop for a single step. Returns StepResult. #[allow(clippy::too_many_arguments)] -async fn run_agent_loop( +async fn run_step_loop( llm: &LlmClient, exec: &LocalExecutor, pool: &SqlitePool, broadcast_tx: &broadcast::Sender, project_id: &str, workflow_id: &str, - requirement: &str, workdir: &str, mgr: &Arc, instructions: &str, - initial_state: Option, + step: &Step, + completed_summaries: &[(i32, String, String)], + parent_scratchpad: &str, external_tools: Option<&ExternalToolManager>, event_rx: &mut mpsc::Receiver, -) -> anyhow::Result<()> { - let planning_tools = build_planning_tools(); - let mut execution_tools = build_execution_tools(); +) -> StepResult { + let system_prompt = build_step_execution_prompt(project_id, instructions); + let user_message = build_step_user_message(step, completed_summaries, parent_scratchpad); + + let mut step_tools = build_step_tools(); if let Some(ext) = external_tools { - execution_tools.extend(ext.tool_definitions()); + step_tools.extend(ext.tool_definitions()); } - let mut state = initial_state.unwrap_or_else(AgentState::new); + let mut step_chat_history: Vec = Vec::new(); + let mut step_scratchpad = String::new(); + let step_order = step.order; + + for iteration in 0..50 { + // Build messages: system + user context + chat history + let mut messages = vec![ + ChatMessage::system(&system_prompt), + ChatMessage::user(&user_message), + ]; + // If step scratchpad is non-empty, inject it + if !step_scratchpad.is_empty() { + let last_user = messages.len() - 1; + if let Some(content) = &messages[last_user].content { + let mut amended = content.clone(); + amended.push_str(&format!("\n\n## 步骤工作记忆\n{}", step_scratchpad)); + messages[last_user].content = Some(amended); + } + } + messages.extend(step_chat_history.clone()); - for iteration in 0..80 { - // Build messages and select tools based on current phase - let system_prompt = match &state.phase { - AgentPhase::Planning => build_planning_prompt(project_id, instructions), - AgentPhase::Executing { .. } => build_execution_prompt(project_id, instructions), - AgentPhase::Completed => break, - }; - let tools = match &state.phase { - AgentPhase::Planning => &planning_tools, - AgentPhase::Executing { .. } => &execution_tools, - AgentPhase::Completed => break, - }; - let messages = state.build_messages(&system_prompt, requirement); let msg_count = messages.len() as i32; - let tool_count = tools.len() as i32; - let phase_label = match &state.phase { - AgentPhase::Planning => "planning".to_string(), - AgentPhase::Executing { step } => format!("executing({})", step), - AgentPhase::Completed => "completed".to_string(), - }; + let tool_count = step_tools.len() as i32; + let phase_label = format!("step({})", step_order); - tracing::info!("[workflow {}] LLM call #{} phase={:?} msgs={}", workflow_id, iteration + 1, state.phase, messages.len()); + tracing::info!("[workflow {}] Step {} LLM call #{} msgs={}", workflow_id, step_order, iteration + 1, messages.len()); let call_start = std::time::Instant::now(); - let response = match llm.chat_with_tools(messages, tools).await { + let response = match llm.chat_with_tools(messages, &step_tools).await { Ok(r) => r, Err(e) => { - tracing::error!("[workflow {}] LLM call failed: {}", workflow_id, e); - return Err(e); + tracing::error!("[workflow {}] Step {} LLM call failed: {}", workflow_id, step_order, e); + return StepResult { + status: StepResultStatus::Failed { error: format!("LLM call failed: {}", e) }, + summary: format!("LLM 调用失败: {}", e), + }; } }; let latency_ms = call_start.elapsed().as_millis() as i64; @@ -1037,123 +1119,71 @@ async fn run_agent_loop( .map(|u| (Some(u.prompt_tokens), Some(u.completion_tokens))) .unwrap_or((None, None)); - let choice = response.choices.into_iter().next() - .ok_or_else(|| anyhow::anyhow!("No response from LLM"))?; + let choice = match response.choices.into_iter().next() { + Some(c) => c, + None => { + return StepResult { + status: StepResultStatus::Failed { error: "No response from LLM".into() }, + summary: "LLM 无响应".into(), + }; + } + }; - // Add assistant message to chat history - state.current_step_chat_history.push(choice.message.clone()); + step_chat_history.push(choice.message.clone()); - // Collect text_response for logging let llm_text_response = choice.message.content.clone().unwrap_or_default(); if let Some(tool_calls) = &choice.message.tool_calls { - tracing::info!("[workflow {}] Tool calls: {}", workflow_id, + tracing::info!("[workflow {}] Step {} tool calls: {}", workflow_id, step_order, tool_calls.iter().map(|tc| tc.function.name.as_str()).collect::>().join(", ")); - let mut phase_transition = false; + let mut step_done_result: Option = None; for tc in tool_calls { - if phase_transition { - state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, "(skipped: phase transition)")); + if step_done_result.is_some() { + step_chat_history.push(ChatMessage::tool_result(&tc.id, "(skipped: step completed)")); continue; } let args: serde_json::Value = serde_json::from_str(&tc.function.arguments).unwrap_or_default(); - let cur = state.current_step(); match tc.function.name.as_str() { - "update_plan" => { - let raw_steps = args["steps"].as_array().cloned().unwrap_or_default(); - - state.steps.clear(); - - for (i, item) in raw_steps.iter().enumerate() { - let order = (i + 1) as i32; - let title = item["title"].as_str().unwrap_or(item.as_str().unwrap_or("")).to_string(); - let detail = item["description"].as_str().unwrap_or("").to_string(); - - state.steps.push(Step { - order, - title, - description: detail, - status: StepStatus::Pending, - summary: None, - user_feedbacks: Vec::new(), - db_id: String::new(), - }); - } - - // Transition: Planning → Executing(1) - if let Some(first) = state.steps.first_mut() { - first.status = StepStatus::Running; - } - - let _ = broadcast_tx.send(WsMessage::PlanUpdate { - workflow_id: workflow_id.to_string(), - steps: plan_infos_from_state(&state), - }); - - state.current_step_chat_history.clear(); - state.phase = AgentPhase::Executing { step: 1 }; - phase_transition = true; - - // Snapshot after plan is set - save_state_snapshot(pool, workflow_id, 0, &state).await; - tracing::info!("[workflow {}] Plan set ({} steps), entering Executing(1)", workflow_id, state.steps.len()); - } - - "advance_step" => { + "step_done" => { let summary = args["summary"].as_str().unwrap_or("").to_string(); - - // Mark current step done with summary - if let Some(step) = state.steps.iter_mut().find(|s| s.order == cur) { - step.status = StepStatus::Done; - step.summary = Some(summary); - } - - // Move to next step or complete - let next = cur + 1; - if let Some(next_step) = state.steps.iter_mut().find(|s| s.order == next) { - next_step.status = StepStatus::Running; - state.phase = AgentPhase::Executing { step: next }; - tracing::info!("[workflow {}] Advanced to step {}", workflow_id, next); - } else { - state.phase = AgentPhase::Completed; - tracing::info!("[workflow {}] All steps completed", workflow_id); - } - - state.current_step_chat_history.clear(); - phase_transition = true; - - // Broadcast step status change to frontend - let _ = broadcast_tx.send(WsMessage::PlanUpdate { - workflow_id: workflow_id.to_string(), - steps: plan_infos_from_state(&state), + log_execution(pool, broadcast_tx, workflow_id, step_order, "step_done", &summary, "步骤完成", "done").await; + step_chat_history.push(ChatMessage::tool_result(&tc.id, "步骤已完成。")); + step_done_result = Some(StepResult { + status: StepResultStatus::Done, + summary, }); - - // Snapshot on step transition - save_state_snapshot(pool, workflow_id, cur, &state).await; } "update_scratchpad" => { let content = args["content"].as_str().unwrap_or(""); - if !state.scratchpad.is_empty() { - state.scratchpad.push('\n'); + let mut new_pad = step_scratchpad.clone(); + if !new_pad.is_empty() { + new_pad.push('\n'); + } + new_pad.push_str(content); + match check_scratchpad_size(&new_pad) { + Ok(()) => { + step_scratchpad = new_pad; + step_chat_history.push(ChatMessage::tool_result(&tc.id, "步骤工作记忆已更新。")); + } + Err(msg) => { + step_chat_history.push(ChatMessage::tool_result(&tc.id, &msg)); + } } - state.scratchpad.push_str(content); - state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, "Scratchpad 已更新。")); } "wait_for_approval" => { let reason = args["reason"].as_str().unwrap_or("等待确认"); - // Mark step as WaitingApproval - if let Some(step) = state.steps.iter_mut().find(|s| s.order == cur) { - step.status = StepStatus::WaitingApproval; - } + // Broadcast waiting status let _ = broadcast_tx.send(WsMessage::PlanUpdate { workflow_id: workflow_id.to_string(), - steps: plan_infos_from_state(&state), + steps: plan_infos_from_state_with_override(step_order, "waiting_approval", + pool, workflow_id).await, }); let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate { workflow_id: workflow_id.to_string(), @@ -1163,47 +1193,44 @@ async fn run_agent_loop( .bind(workflow_id) .execute(pool) .await; - save_state_snapshot(pool, workflow_id, cur, &state).await; - log_execution(pool, broadcast_tx, workflow_id, cur, "wait_for_approval", reason, reason, "waiting").await; + log_execution(pool, broadcast_tx, workflow_id, step_order, "wait_for_approval", reason, reason, "waiting").await; - tracing::info!("[workflow {}] Waiting for approval: {}", workflow_id, reason); + tracing::info!("[workflow {}] Step {} waiting for approval: {}", workflow_id, step_order, reason); - // Block until we receive a Comment event + // Block until Comment event let approval_content = loop { match event_rx.recv().await { Some(AgentEvent::Comment { content, .. }) => break content, Some(_) => continue, - None => return Err(anyhow::anyhow!("Event channel closed while waiting for approval")), + None => { + return StepResult { + status: StepResultStatus::Failed { error: "Event channel closed".into() }, + summary: "事件通道关闭".into(), + }; + } } }; - tracing::info!("[workflow {}] Approval response: {}", workflow_id, approval_content); + tracing::info!("[workflow {}] Step {} approval response: {}", workflow_id, step_order, approval_content); - // Check if user rejected if approval_content.starts_with("rejected:") { let reason = approval_content.strip_prefix("rejected:").unwrap_or("").trim(); - tracing::info!("[workflow {}] User rejected: {}", workflow_id, reason); - if let Some(step) = state.steps.iter_mut().find(|s| s.order == cur) { - step.status = StepStatus::Failed; - step.user_feedbacks.push(format!("用户终止: {}", reason)); - } - log_execution(pool, broadcast_tx, workflow_id, cur, "wait_for_approval", "rejected", reason, "failed").await; - // Return error to end the agent loop; caller sets workflow to "failed" - return Err(anyhow::anyhow!("用户终止了执行: {}", reason)); + log_execution(pool, broadcast_tx, workflow_id, step_order, "wait_for_approval", "rejected", reason, "failed").await; + step_chat_history.push(ChatMessage::tool_result(&tc.id, &format!("用户拒绝: {}", reason))); + step_done_result = Some(StepResult { + status: StepResultStatus::Failed { error: format!("用户终止: {}", reason) }, + summary: format!("用户终止了执行: {}", reason), + }); + continue; } - // Approved — extract feedback after "approved:" prefix if present + // Approved let feedback = if approval_content.starts_with("approved:") { approval_content.strip_prefix("approved:").unwrap_or("").trim().to_string() } else { approval_content.clone() }; - // Resume: restore Running status - if let Some(step) = state.steps.iter_mut().find(|s| s.order == cur) { - step.status = StepStatus::Running; - step.user_feedbacks.push(approval_content.clone()); - } let _ = sqlx::query("UPDATE workflows SET status = 'executing' WHERE id = ?") .bind(workflow_id) .execute(pool) @@ -1212,34 +1239,13 @@ async fn run_agent_loop( workflow_id: workflow_id.to_string(), status: "executing".into(), }); - let _ = broadcast_tx.send(WsMessage::PlanUpdate { - workflow_id: workflow_id.to_string(), - steps: plan_infos_from_state(&state), - }); let tool_msg = if feedback.is_empty() { "用户已确认,继续执行。".to_string() } else { format!("用户已确认。反馈: {}", feedback) }; - state.current_step_chat_history.push( - ChatMessage::tool_result(&tc.id, &tool_msg) - ); - } - - "update_requirement" => { - let new_req = args["requirement"].as_str().unwrap_or(""); - let _ = sqlx::query("UPDATE workflows SET requirement = ? WHERE id = ?") - .bind(new_req) - .bind(workflow_id) - .execute(pool) - .await; - let _ = tokio::fs::write(format!("{}/requirement.md", workdir), new_req).await; - let _ = broadcast_tx.send(WsMessage::RequirementUpdate { - workflow_id: workflow_id.to_string(), - requirement: new_req.to_string(), - }); - state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, "需求已更新。")); + step_chat_history.push(ChatMessage::tool_result(&tc.id, &tool_msg)); } "start_service" => { @@ -1280,8 +1286,8 @@ async fn run_agent_loop( Err(e) => format!("Error: 启动失败:{}", e), }; let status = if result.starts_with("Error:") { "failed" } else { "done" }; - log_execution(pool, broadcast_tx, workflow_id, cur, "start_service", cmd, &result, status).await; - state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); + log_execution(pool, broadcast_tx, workflow_id, step_order, "start_service", cmd, &result, status).await; + step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); } "stop_service" => { @@ -1295,8 +1301,8 @@ async fn run_agent_loop( } else { "当前没有运行中的服务。".to_string() }; - log_execution(pool, broadcast_tx, workflow_id, cur, "stop_service", "", &result, "done").await; - state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); + log_execution(pool, broadcast_tx, workflow_id, step_order, "stop_service", "", &result, "done").await; + step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); } "kb_search" => { @@ -1319,7 +1325,7 @@ async fn run_agent_loop( } else { "知识库未初始化。".to_string() }; - state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); + step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); } "kb_read" => { @@ -1332,10 +1338,10 @@ async fn run_agent_loop( } else { "知识库未初始化。".to_string() }; - state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); + step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); } - // External tools (convention-based) + // External tools name if external_tools.as_ref().is_some_and(|e| e.has_tool(name)) => { let result = match external_tools.unwrap().invoke(name, &tc.function.arguments, workdir).await { Ok(output) => { @@ -1345,21 +1351,21 @@ async fn run_agent_loop( Err(e) => format!("Tool error: {}", e), }; let status = if result.starts_with("Tool error:") { "failed" } else { "done" }; - log_execution(pool, broadcast_tx, workflow_id, cur, &tc.function.name, &tc.function.arguments, &result, status).await; - state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); + log_execution(pool, broadcast_tx, workflow_id, step_order, &tc.function.name, &tc.function.arguments, &result, status).await; + step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); } // IO tools: execute, read_file, write_file, list_files _ => { let result = execute_tool(&tc.function.name, &tc.function.arguments, workdir, exec).await; let status = if result.starts_with("Error:") { "failed" } else { "done" }; - log_execution(pool, broadcast_tx, workflow_id, cur, &tc.function.name, &tc.function.arguments, &result, status).await; - state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); + log_execution(pool, broadcast_tx, workflow_id, step_order, &tc.function.name, &tc.function.arguments, &result, status).await; + step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); } } } - // Build tool_calls JSON for LLM call log + // Log LLM call let tc_json: Vec = tool_calls.iter().map(|tc| { serde_json::json!({ "name": tc.function.name, @@ -1367,39 +1373,399 @@ async fn run_agent_loop( }) }).collect(); let tc_json_str = serde_json::to_string(&tc_json).unwrap_or_else(|_| "[]".to_string()); - log_llm_call( - pool, broadcast_tx, workflow_id, state.current_step(), + pool, broadcast_tx, workflow_id, step_order, &phase_label, msg_count, tool_count, &tc_json_str, &llm_text_response, prompt_tokens, completion_tokens, latency_ms, ).await; - if phase_transition { - continue; + if let Some(result) = step_done_result { + return result; } } else { - // No tool calls — LLM sent a text response + // Text response without tool calls let content = choice.message.content.as_deref().unwrap_or("(no content)"); - tracing::info!("[workflow {}] LLM text response: {}", workflow_id, truncate_str(content, 200)); - - // Log text response to execution_log for frontend display - log_execution(pool, broadcast_tx, workflow_id, state.current_step(), "text_response", "", content, "done").await; - + tracing::info!("[workflow {}] Step {} text response: {}", workflow_id, step_order, truncate_str(content, 200)); + log_execution(pool, broadcast_tx, workflow_id, step_order, "text_response", "", content, "done").await; log_llm_call( - pool, broadcast_tx, workflow_id, state.current_step(), + pool, broadcast_tx, workflow_id, step_order, &phase_label, msg_count, tool_count, "[]", content, prompt_tokens, completion_tokens, latency_ms, ).await; + // Text response in step loop — continue, LLM may follow up with tool calls + } + } - // Text response does NOT end the workflow. Only advance_step progresses. - // In Planning phase, LLM may be thinking before calling update_plan — just continue. + // Hit 50-iteration limit + tracing::warn!("[workflow {}] Step {} hit iteration limit (50)", workflow_id, step_order); + StepResult { + status: StepResultStatus::Failed { error: "步骤迭代次数超限(50轮)".into() }, + summary: "步骤执行超过50轮迭代限制,未能完成".into(), + } +} + +/// Helper to get plan step infos with a status override for a specific step. +/// Used during wait_for_approval in the step sub-loop where we don't have +/// mutable access to the AgentState. +async fn plan_infos_from_state_with_override( + step_order: i32, + override_status: &str, + pool: &SqlitePool, + workflow_id: &str, +) -> Vec { + // Read the latest state snapshot to get step info + let snapshot = sqlx::query_scalar::<_, String>( + "SELECT state_json FROM agent_state_snapshots WHERE workflow_id = ? ORDER BY created_at DESC LIMIT 1" + ) + .bind(workflow_id) + .fetch_optional(pool) + .await + .ok() + .flatten(); + + if let Some(json) = snapshot { + if let Ok(state) = serde_json::from_str::(&json) { + return state.steps.iter().map(|s| { + let status = if s.order == step_order { + override_status.to_string() + } else { + match s.status { + StepStatus::Pending => "pending", + StepStatus::Running => "running", + StepStatus::WaitingApproval => "waiting_approval", + StepStatus::Done => "done", + StepStatus::Failed => "failed", + }.to_string() + }; + PlanStepInfo { + order: s.order, + description: s.title.clone(), + command: s.description.clone(), + status: Some(status), + } + }).collect(); + } + } + Vec::new() +} + +#[allow(clippy::too_many_arguments)] +async fn run_agent_loop( + llm: &LlmClient, + exec: &LocalExecutor, + pool: &SqlitePool, + broadcast_tx: &broadcast::Sender, + project_id: &str, + workflow_id: &str, + requirement: &str, + workdir: &str, + mgr: &Arc, + instructions: &str, + initial_state: Option, + external_tools: Option<&ExternalToolManager>, + event_rx: &mut mpsc::Receiver, +) -> anyhow::Result<()> { + let planning_tools = build_planning_tools(); + let coordinator_tools = build_coordinator_tools(); + + let mut state = initial_state.unwrap_or_else(AgentState::new); + + // --- Planning phase loop --- + // Keep iterating until we transition out of Planning + for iteration in 0..20 { + if !matches!(state.phase, AgentPhase::Planning) { + break; } - if matches!(state.phase, AgentPhase::Completed) { + let system_prompt = build_planning_prompt(project_id, instructions); + let messages = state.build_messages(&system_prompt, requirement); + let msg_count = messages.len() as i32; + let tool_count = planning_tools.len() as i32; + + tracing::info!("[workflow {}] Planning LLM call #{} msgs={}", workflow_id, iteration + 1, messages.len()); + let call_start = std::time::Instant::now(); + let response = match llm.chat_with_tools(messages, &planning_tools).await { + Ok(r) => r, + Err(e) => { + tracing::error!("[workflow {}] LLM call failed: {}", workflow_id, e); + return Err(e); + } + }; + let latency_ms = call_start.elapsed().as_millis() as i64; + + let (prompt_tokens, completion_tokens) = response.usage.as_ref() + .map(|u| (Some(u.prompt_tokens), Some(u.completion_tokens))) + .unwrap_or((None, None)); + + let choice = response.choices.into_iter().next() + .ok_or_else(|| anyhow::anyhow!("No response from LLM"))?; + + state.current_step_chat_history.push(choice.message.clone()); + let llm_text_response = choice.message.content.clone().unwrap_or_default(); + + if let Some(tool_calls) = &choice.message.tool_calls { + let mut phase_transition = false; + + for tc in tool_calls { + if phase_transition { + state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, "(skipped: phase transition)")); + continue; + } + + let args: serde_json::Value = serde_json::from_str(&tc.function.arguments).unwrap_or_default(); + + match tc.function.name.as_str() { + "update_plan" => { + let raw_steps = args["steps"].as_array().cloned().unwrap_or_default(); + state.steps.clear(); + + for (i, item) in raw_steps.iter().enumerate() { + let order = (i + 1) as i32; + let title = item["title"].as_str().unwrap_or(item.as_str().unwrap_or("")).to_string(); + let detail = item["description"].as_str().unwrap_or("").to_string(); + state.steps.push(Step { + order, title, description: detail, + status: StepStatus::Pending, summary: None, + user_feedbacks: Vec::new(), db_id: String::new(), + }); + } + + if let Some(first) = state.steps.first_mut() { + first.status = StepStatus::Running; + } + + let _ = broadcast_tx.send(WsMessage::PlanUpdate { + workflow_id: workflow_id.to_string(), + steps: plan_infos_from_state(&state), + }); + + state.current_step_chat_history.clear(); + state.phase = AgentPhase::Executing { step: 1 }; + phase_transition = true; + + save_state_snapshot(pool, workflow_id, 0, &state).await; + tracing::info!("[workflow {}] Plan set ({} steps), entering Executing", workflow_id, state.steps.len()); + } + // Planning phase IO tools + _ => { + let result = execute_tool(&tc.function.name, &tc.function.arguments, workdir, exec).await; + state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); + } + } + } + + let tc_json: Vec = tool_calls.iter().map(|tc| { + serde_json::json!({ "name": tc.function.name, "arguments_preview": truncate_str(&tc.function.arguments, 200) }) + }).collect(); + let tc_json_str = serde_json::to_string(&tc_json).unwrap_or_else(|_| "[]".to_string()); + log_llm_call(pool, broadcast_tx, workflow_id, 0, "planning", msg_count, tool_count, + &tc_json_str, &llm_text_response, prompt_tokens, completion_tokens, latency_ms).await; + } else { + let content = choice.message.content.as_deref().unwrap_or("(no content)"); + tracing::info!("[workflow {}] Planning text response: {}", workflow_id, truncate_str(content, 200)); + log_execution(pool, broadcast_tx, workflow_id, 0, "text_response", "", content, "done").await; + log_llm_call(pool, broadcast_tx, workflow_id, 0, "planning", msg_count, tool_count, + "[]", content, prompt_tokens, completion_tokens, latency_ms).await; + } + } + + // --- Executing phase: step isolation loop --- + while matches!(state.phase, AgentPhase::Executing { .. }) { + let step_order = match state.first_actionable_step() { + Some(o) => o, + None => { + state.phase = AgentPhase::Completed; + break; + } + }; + + // Mark step as Running + if let Some(step) = state.steps.iter_mut().find(|s| s.order == step_order) { + step.status = StepStatus::Running; + } + state.phase = AgentPhase::Executing { step: step_order }; + state.current_step_chat_history.clear(); + + let _ = broadcast_tx.send(WsMessage::PlanUpdate { + workflow_id: workflow_id.to_string(), + steps: plan_infos_from_state(&state), + }); + save_state_snapshot(pool, workflow_id, step_order, &state).await; + + // Build completed summaries for context + let completed_summaries: Vec<(i32, String, String)> = state.steps.iter() + .filter(|s| matches!(s.status, StepStatus::Done)) + .map(|s| (s.order, s.title.clone(), s.summary.clone().unwrap_or_default())) + .collect(); + + let step = state.steps.iter().find(|s| s.order == step_order).unwrap().clone(); + + tracing::info!("[workflow {}] Starting step {} sub-loop: {}", workflow_id, step_order, step.title); + + // Run the isolated step sub-loop + let step_result = run_step_loop( + llm, exec, pool, broadcast_tx, + project_id, workflow_id, workdir, mgr, + instructions, &step, &completed_summaries, &state.scratchpad, + external_tools, event_rx, + ).await; + + tracing::info!("[workflow {}] Step {} completed: {:?}", workflow_id, step_order, step_result.status); + + // Update step status based on result + match &step_result.status { + StepResultStatus::Done => { + if let Some(s) = state.steps.iter_mut().find(|s| s.order == step_order) { + s.status = StepStatus::Done; + s.summary = Some(step_result.summary.clone()); + } + } + StepResultStatus::Failed { error } => { + if let Some(s) = state.steps.iter_mut().find(|s| s.order == step_order) { + s.status = StepStatus::Failed; + s.summary = Some(step_result.summary.clone()); + } + let _ = broadcast_tx.send(WsMessage::PlanUpdate { + workflow_id: workflow_id.to_string(), + steps: plan_infos_from_state(&state), + }); + save_state_snapshot(pool, workflow_id, step_order, &state).await; + return Err(anyhow::anyhow!("Step {} failed: {}", step_order, error)); + } + StepResultStatus::NeedsApproval { message: _ } => { + // This shouldn't normally happen since wait_for_approval is handled inside + // run_step_loop, but handle gracefully + if let Some(s) = state.steps.iter_mut().find(|s| s.order == step_order) { + s.status = StepStatus::WaitingApproval; + } + save_state_snapshot(pool, workflow_id, step_order, &state).await; + continue; + } + } + + let _ = broadcast_tx.send(WsMessage::PlanUpdate { + workflow_id: workflow_id.to_string(), + steps: plan_infos_from_state(&state), + }); + save_state_snapshot(pool, workflow_id, step_order, &state).await; + + // --- Coordinator review --- + // Check if there are more steps; if not, we're done + if state.first_actionable_step().is_none() { + state.phase = AgentPhase::Completed; break; } + + // Coordinator LLM reviews the step result and may update the plan + let coordinator_prompt = build_coordinator_prompt(project_id, instructions); + let review_message = format!( + "步骤 {} 「{}」执行完成。\n\n执行摘要:{}\n\n请审视结果。如需修改后续计划请使用 update_plan,否则回复确认继续。", + step_order, step.title, step_result.summary + ); + + // Build coordinator context with plan overview + scratchpad + let mut coordinator_ctx = String::new(); + coordinator_ctx.push_str("## 计划概览\n"); + for s in &state.steps { + let marker = match s.status { + StepStatus::Done => " [done]", + StepStatus::Running => " [running]", + StepStatus::WaitingApproval => " [waiting]", + StepStatus::Failed => " [FAILED]", + StepStatus::Pending => "", + }; + coordinator_ctx.push_str(&format!("{}. {}{}\n", s.order, s.title, marker)); + if let Some(summary) = &s.summary { + coordinator_ctx.push_str(&format!(" 摘要: {}\n", summary)); + } + } + if !state.scratchpad.is_empty() { + coordinator_ctx.push_str(&format!("\n## 全局备忘录\n{}\n", state.scratchpad)); + } + + let coord_messages = vec![ + ChatMessage::system(&coordinator_prompt), + ChatMessage::user(&coordinator_ctx), + ChatMessage::user(&review_message), + ]; + + // Add to main chat history for context + state.current_step_chat_history.clear(); + + tracing::info!("[workflow {}] Coordinator review for step {}", workflow_id, step_order); + let call_start = std::time::Instant::now(); + let coord_response = match llm.chat_with_tools(coord_messages.clone(), &coordinator_tools).await { + Ok(r) => r, + Err(e) => { + tracing::warn!("[workflow {}] Coordinator LLM call failed, continuing: {}", workflow_id, e); + continue; // Non-fatal, just skip review + } + }; + let latency_ms = call_start.elapsed().as_millis() as i64; + + let (prompt_tokens, completion_tokens) = coord_response.usage.as_ref() + .map(|u| (Some(u.prompt_tokens), Some(u.completion_tokens))) + .unwrap_or((None, None)); + + log_llm_call(pool, broadcast_tx, workflow_id, step_order, "coordinator", + coord_messages.len() as i32, coordinator_tools.len() as i32, + "[]", "", prompt_tokens, completion_tokens, latency_ms).await; + + if let Some(choice) = coord_response.choices.into_iter().next() { + if let Some(tool_calls) = &choice.message.tool_calls { + for tc in tool_calls { + let args: serde_json::Value = serde_json::from_str(&tc.function.arguments).unwrap_or_default(); + match tc.function.name.as_str() { + "update_plan" => { + let raw_steps = args["steps"].as_array().cloned().unwrap_or_default(); + let new_steps: Vec = raw_steps.iter().enumerate().map(|(i, item)| { + Step { + order: (i + 1) as i32, + title: item["title"].as_str().unwrap_or("").to_string(), + description: item["description"].as_str().unwrap_or("").to_string(), + status: StepStatus::Pending, summary: None, + user_feedbacks: Vec::new(), db_id: String::new(), + } + }).collect(); + + state.apply_plan_diff(new_steps); + let _ = broadcast_tx.send(WsMessage::PlanUpdate { + workflow_id: workflow_id.to_string(), + steps: plan_infos_from_state(&state), + }); + tracing::info!("[workflow {}] Coordinator revised plan", workflow_id); + save_state_snapshot(pool, workflow_id, step_order, &state).await; + } + "update_scratchpad" => { + let content = args["content"].as_str().unwrap_or(""); + let mut new_pad = state.scratchpad.clone(); + if !new_pad.is_empty() { new_pad.push('\n'); } + new_pad.push_str(content); + if check_scratchpad_size(&new_pad).is_ok() { + state.scratchpad = new_pad; + } + } + "update_requirement" => { + let new_req = args["requirement"].as_str().unwrap_or(""); + let _ = sqlx::query("UPDATE workflows SET requirement = ? WHERE id = ?") + .bind(new_req).bind(workflow_id).execute(pool).await; + let _ = tokio::fs::write(format!("{}/requirement.md", workdir), new_req).await; + let _ = broadcast_tx.send(WsMessage::RequirementUpdate { + workflow_id: workflow_id.to_string(), + requirement: new_req.to_string(), + }); + } + _ => {} + } + } + } else { + // Text response — coordinator is satisfied, continue + let content = choice.message.content.as_deref().unwrap_or(""); + tracing::info!("[workflow {}] Coordinator: {}", workflow_id, truncate_str(content, 200)); + } + } } // Final snapshot @@ -1466,3 +1832,196 @@ async fn generate_title(llm: &LlmClient, requirement: &str) -> anyhow::Result Step { + Step { + order, + title: title.into(), + description: desc.into(), + status, + summary: None, + user_feedbacks: Vec::new(), + db_id: String::new(), + } + } + + // --- build_step_user_message --- + + #[test] + fn step_msg_basic() { + let step = make_step(1, "Setup env", "Install dependencies", StepStatus::Running); + let msg = build_step_user_message(&step, &[], ""); + + assert!(msg.contains("## 当前步骤(步骤 1)")); + assert!(msg.contains("标题:Setup env")); + assert!(msg.contains("描述:Install dependencies")); + // No completed summaries or scratchpad sections + assert!(!msg.contains("已完成步骤摘要")); + assert!(!msg.contains("项目备忘录")); + } + + #[test] + fn step_msg_with_completed_summaries() { + let step = make_step(3, "Deploy", "Push to prod", StepStatus::Running); + let summaries = vec![ + (1, "Setup".into(), "Installed deps".into()), + (2, "Build".into(), "Compiled OK".into()), + ]; + let msg = build_step_user_message(&step, &summaries, ""); + + assert!(msg.contains("## 已完成步骤摘要")); + assert!(msg.contains("步骤 1 (Setup): Installed deps")); + assert!(msg.contains("步骤 2 (Build): Compiled OK")); + } + + #[test] + fn step_msg_with_parent_scratchpad() { + let step = make_step(2, "Build", "compile", StepStatus::Running); + let msg = build_step_user_message(&step, &[], "DB_HOST=localhost\nDB_PORT=5432"); + + assert!(msg.contains("## 项目备忘录(只读)")); + assert!(msg.contains("DB_HOST=localhost")); + assert!(msg.contains("DB_PORT=5432")); + } + + #[test] + fn step_msg_with_user_feedback() { + let step = Step { + user_feedbacks: vec!["Use Python 3.12".into(), "Skip linting".into()], + ..make_step(1, "Setup", "setup env", StepStatus::Running) + }; + let msg = build_step_user_message(&step, &[], ""); + + assert!(msg.contains("用户反馈")); + assert!(msg.contains("- Use Python 3.12")); + assert!(msg.contains("- Skip linting")); + } + + #[test] + fn step_msg_full_context() { + let step = Step { + user_feedbacks: vec!["add caching".into()], + ..make_step(3, "API", "build REST API", StepStatus::Running) + }; + let summaries = vec![ + (1, "DB".into(), "Schema created".into()), + (2, "Models".into(), "ORM models done".into()), + ]; + let msg = build_step_user_message(&step, &summaries, "tech_stack=FastAPI"); + + // All sections present + assert!(msg.contains("## 当前步骤(步骤 3)")); + assert!(msg.contains("## 已完成步骤摘要")); + assert!(msg.contains("## 项目备忘录(只读)")); + assert!(msg.contains("用户反馈")); + // Content correct + assert!(msg.contains("build REST API")); + assert!(msg.contains("Schema created")); + assert!(msg.contains("tech_stack=FastAPI")); + assert!(msg.contains("add caching")); + } + + // --- truncate_str --- + + #[test] + fn truncate_short_noop() { + assert_eq!(truncate_str("hello", 10), "hello"); + } + + #[test] + fn truncate_exact() { + assert_eq!(truncate_str("hello", 5), "hello"); + } + + #[test] + fn truncate_cuts() { + assert_eq!(truncate_str("hello world", 5), "hello"); + } + + #[test] + fn truncate_respects_char_boundary() { + let s = "你好世界"; // each char is 3 bytes + // 7 bytes → should cut to 6 (2 chars) + let t = truncate_str(s, 7); + assert_eq!(t, "你好"); + assert_eq!(t.len(), 6); + } + + // --- tool definitions --- + + #[test] + fn step_tools_have_step_done() { + let tools = build_step_tools(); + let names: Vec<&str> = tools.iter().map(|t| t.function.name.as_str()).collect(); + assert!(names.contains(&"step_done"), "step_done must be in step tools"); + assert!(!names.contains(&"advance_step"), "advance_step must NOT be in step tools"); + assert!(!names.contains(&"update_plan"), "update_plan must NOT be in step tools"); + assert!(!names.contains(&"update_requirement"), "update_requirement must NOT be in step tools"); + } + + #[test] + fn step_tools_have_execution_tools() { + let tools = build_step_tools(); + let names: Vec<&str> = tools.iter().map(|t| t.function.name.as_str()).collect(); + for expected in &["execute", "read_file", "write_file", "list_files", + "start_service", "stop_service", "update_scratchpad", + "wait_for_approval", "kb_search", "kb_read"] { + assert!(names.contains(expected), "{} must be in step tools", expected); + } + } + + #[test] + fn coordinator_tools_correct() { + let tools = build_coordinator_tools(); + let names: Vec<&str> = tools.iter().map(|t| t.function.name.as_str()).collect(); + assert!(names.contains(&"update_plan")); + assert!(names.contains(&"update_scratchpad")); + assert!(names.contains(&"update_requirement")); + // Must NOT have execution tools + assert!(!names.contains(&"execute")); + assert!(!names.contains(&"step_done")); + assert!(!names.contains(&"advance_step")); + } + + #[test] + fn planning_tools_correct() { + let tools = build_planning_tools(); + let names: Vec<&str> = tools.iter().map(|t| t.function.name.as_str()).collect(); + assert!(names.contains(&"update_plan")); + assert!(names.contains(&"list_files")); + assert!(names.contains(&"read_file")); + assert!(names.contains(&"kb_search")); + assert!(names.contains(&"kb_read")); + assert!(!names.contains(&"execute")); + assert!(!names.contains(&"step_done")); + } + + // --- plan_infos_from_state --- + + #[test] + fn plan_infos_maps_correctly() { + let state = AgentState { + phase: AgentPhase::Executing { step: 2 }, + steps: vec![ + Step { status: StepStatus::Done, summary: Some("done".into()), + ..make_step(1, "A", "desc A", StepStatus::Done) }, + make_step(2, "B", "desc B", StepStatus::Running), + ], + current_step_chat_history: Vec::new(), + scratchpad: String::new(), + }; + + let infos = plan_infos_from_state(&state); + assert_eq!(infos.len(), 2); + assert_eq!(infos[0].order, 1); + assert_eq!(infos[0].description, "A"); // title maps to description field + assert_eq!(infos[0].command, "desc A"); // description maps to command field + assert_eq!(infos[0].status.as_deref(), Some("done")); + assert_eq!(infos[1].status.as_deref(), Some("running")); + } +} diff --git a/src/prompts/execution.md b/src/prompts/execution.md index a1f4b22..29d5126 100644 --- a/src/prompts/execution.md +++ b/src/prompts/execution.md @@ -1,25 +1,27 @@ -你是一个 AI 智能体,正处于【执行阶段】。请专注完成当前步骤的任务。 +你是一个 AI 智能体的协调者,正处于【执行阶段】。每个步骤由独立的子执行器完成,你负责审视结果并协调整体进度。 -可用工具: -- execute:执行 shell 命令 -- read_file / write_file / list_files:文件操作 -- start_service / stop_service:管理后台服务 -- update_requirement:更新项目需求 -- advance_step:完成当前步骤并进入下一步(必须提供摘要) -- update_scratchpad:保存跨步骤持久化的关键信息 +## 你的角色 -工作流程: -1. 阅读下方的「当前步骤」描述 -2. 使用工具执行所需操作 -3. 完成后调用 advance_step(summary=...) 推进到下一步 -4. 最后一步完成后,直接回复简要总结(不调用工具)即可结束 +- 审视每个步骤的执行摘要 +- 根据执行结果决定:继续下一步、修改后续计划、或终止执行 +- 维护全局备忘录,记录跨步骤的关键信息 + +## 可用工具 + +- update_plan:修改执行计划(提供完整步骤列表,系统自动 diff) +- update_scratchpad:更新全局备忘录(跨步骤持久化的关键信息) +- update_requirement:更新项目需求描述 + +## 工作流程 + +当你收到步骤执行摘要时: +1. 审视摘要,判断步骤是否成功完成了预期目标 +2. 如需调整后续计划,使用 update_plan +3. 如无需调整,回复确认继续(不调用工具即可) 环境信息: -- 工作目录是独立的项目工作区,Python venv 已预先激活(.venv/) -- 使用 `uv add <包名>` 或 `pip install <包名>` 安装依赖 +- 工作目录是独立的项目工作区 - 静态文件访问:/api/projects/{project_id}/files/{filename} -- 后台服务访问:/api/projects/{project_id}/app/(启动命令需监听 0.0.0.0:$PORT) -- 【重要】应用通过反向代理访问,前端 HTML/JS 中的 fetch/XHR 请求必须使用相对路径(如 fetch('todos')),绝对不能用 / 开头的路径(如 fetch('/todos')),否则会 404 -- 知识库工具:kb_search(query) 搜索相关片段,kb_read() 读取全文 +- 后台服务访问:/api/projects/{project_id}/app/ 请使用中文回复。 diff --git a/src/prompts/step_execution.md b/src/prompts/step_execution.md new file mode 100644 index 0000000..4303ffc --- /dev/null +++ b/src/prompts/step_execution.md @@ -0,0 +1,34 @@ +你是一个步骤执行者,负责完成当前分配给你的步骤。 + +## 可用工具 + +- execute:执行 shell 命令 +- read_file / write_file / list_files:文件操作 +- start_service / stop_service:管理后台服务 +- kb_search / kb_read:搜索和读取知识库 +- update_scratchpad:记录本步骤内的中间状态(步骤结束后丢弃,精华写进 summary) +- wait_for_approval:暂停执行等待用户确认 +- step_done:**完成当前步骤时必须调用**,提供本步骤的工作摘要 + +## 工作流程 + +1. 阅读当前步骤的描述和上下文 +2. 使用工具执行所需操作 +3. 完成后调用 step_done(summary=...) 汇报结果 + +## 规则 + +- **专注当前步骤**,不做超出范围的事 +- 完成后**必须**调用 step_done(summary),summary 应简洁概括本步骤做了什么、结果如何 +- 需要用户确认时使用 wait_for_approval(reason) +- update_scratchpad 用于记录本步骤内的中间状态,是工作记忆而非日志,只保留当前有用的信息 + +## 环境信息 + +- 工作目录是独立的项目工作区,Python venv 已预先激活(.venv/) +- 使用 `uv add <包名>` 或 `pip install <包名>` 安装依赖 +- 静态文件访问:/api/projects/{project_id}/files/{filename} +- 后台服务访问:/api/projects/{project_id}/app/(启动命令需监听 0.0.0.0:$PORT) +- 【重要】应用通过反向代理访问,前端 HTML/JS 中的 fetch/XHR 请求必须使用相对路径(如 fetch('todos')),绝对不能用 / 开头的路径(如 fetch('/todos')),否则会 404 + +请使用中文回复。 diff --git a/src/state.rs b/src/state.rs index b8c1f2d..b2365b8 100644 --- a/src/state.rs +++ b/src/state.rs @@ -2,6 +2,36 @@ use serde::{Deserialize, Serialize}; use crate::llm::ChatMessage; +// --- Step result (returned by run_step_loop) --- + +#[derive(Debug, Clone)] +pub struct StepResult { + pub status: StepResultStatus, + pub summary: String, +} + +#[derive(Debug, Clone)] +pub enum StepResultStatus { + Done, + Failed { error: String }, + NeedsApproval { message: String }, +} + +/// Check scratchpad size. Limit: ~8K tokens ≈ 24K bytes. +const SCRATCHPAD_MAX_BYTES: usize = 24_000; + +pub fn check_scratchpad_size(content: &str) -> Result<(), String> { + if content.len() > SCRATCHPAD_MAX_BYTES { + Err(format!( + "Scratchpad 超出容量限制(当前 {} 字节,上限 {} 字节)。请精简内容后重试。", + content.len(), + SCRATCHPAD_MAX_BYTES, + )) + } else { + Ok(()) + } +} + // --- Agent phase state machine --- #[derive(Debug, Clone, Serialize, Deserialize)] @@ -205,3 +235,312 @@ impl AgentState { msgs } } + +#[cfg(test)] +mod tests { + use super::*; + + fn make_step(order: i32, title: &str, desc: &str, status: StepStatus) -> Step { + Step { + order, + title: title.into(), + description: desc.into(), + status, + summary: None, + user_feedbacks: Vec::new(), + db_id: String::new(), + } + } + + // --- check_scratchpad_size --- + + #[test] + fn scratchpad_empty_ok() { + assert!(check_scratchpad_size("").is_ok()); + } + + #[test] + fn scratchpad_under_limit_ok() { + let content = "a".repeat(24_000); + assert!(check_scratchpad_size(&content).is_ok()); + } + + #[test] + fn scratchpad_over_limit_err() { + let content = "a".repeat(24_001); + let err = check_scratchpad_size(&content).unwrap_err(); + assert!(err.contains("24001")); + assert!(err.contains("24000")); + } + + #[test] + fn scratchpad_exactly_at_limit() { + let content = "a".repeat(SCRATCHPAD_MAX_BYTES); + assert!(check_scratchpad_size(&content).is_ok()); + } + + #[test] + fn scratchpad_multibyte_counts_bytes_not_chars() { + // 8000 个中文字 = 24000 bytes (UTF-8), exactly at limit + let content = "你".repeat(8000); + assert_eq!(content.len(), 24000); + assert!(check_scratchpad_size(&content).is_ok()); + + // One more char pushes over + let content_over = format!("{}你", content); + assert!(check_scratchpad_size(&content_over).is_err()); + } + + // --- first_actionable_step --- + + #[test] + fn first_actionable_all_done() { + let state = AgentState { + phase: AgentPhase::Executing { step: 1 }, + steps: vec![ + make_step(1, "A", "a", StepStatus::Done), + make_step(2, "B", "b", StepStatus::Done), + ], + current_step_chat_history: Vec::new(), + scratchpad: String::new(), + }; + assert_eq!(state.first_actionable_step(), None); + } + + #[test] + fn first_actionable_skips_done() { + let state = AgentState { + phase: AgentPhase::Executing { step: 2 }, + steps: vec![ + make_step(1, "A", "a", StepStatus::Done), + make_step(2, "B", "b", StepStatus::Pending), + make_step(3, "C", "c", StepStatus::Pending), + ], + current_step_chat_history: Vec::new(), + scratchpad: String::new(), + }; + assert_eq!(state.first_actionable_step(), Some(2)); + } + + #[test] + fn first_actionable_finds_running() { + let state = AgentState { + phase: AgentPhase::Executing { step: 2 }, + steps: vec![ + make_step(1, "A", "a", StepStatus::Done), + make_step(2, "B", "b", StepStatus::Running), + ], + current_step_chat_history: Vec::new(), + scratchpad: String::new(), + }; + assert_eq!(state.first_actionable_step(), Some(2)); + } + + #[test] + fn first_actionable_finds_waiting_approval() { + let state = AgentState { + phase: AgentPhase::Executing { step: 1 }, + steps: vec![ + make_step(1, "A", "a", StepStatus::WaitingApproval), + make_step(2, "B", "b", StepStatus::Pending), + ], + current_step_chat_history: Vec::new(), + scratchpad: String::new(), + }; + assert_eq!(state.first_actionable_step(), Some(1)); + } + + #[test] + fn first_actionable_skips_failed() { + let state = AgentState { + phase: AgentPhase::Executing { step: 2 }, + steps: vec![ + make_step(1, "A", "a", StepStatus::Failed), + make_step(2, "B", "b", StepStatus::Pending), + ], + current_step_chat_history: Vec::new(), + scratchpad: String::new(), + }; + assert_eq!(state.first_actionable_step(), Some(2)); + } + + // --- apply_plan_diff --- + + #[test] + fn plan_diff_identical_keeps_done() { + let mut state = AgentState::new(); + state.steps = vec![ + Step { status: StepStatus::Done, summary: Some("did A".into()), + ..make_step(1, "A", "desc A", StepStatus::Done) }, + make_step(2, "B", "desc B", StepStatus::Pending), + ]; + + let new_steps = vec![ + make_step(1, "A", "desc A", StepStatus::Pending), + make_step(2, "B", "desc B", StepStatus::Pending), + ]; + state.apply_plan_diff(new_steps); + + assert!(matches!(state.steps[0].status, StepStatus::Done)); + assert_eq!(state.steps[0].summary.as_deref(), Some("did A")); + assert!(matches!(state.steps[1].status, StepStatus::Pending)); + } + + #[test] + fn plan_diff_change_invalidates_from_mismatch() { + let mut state = AgentState::new(); + state.steps = vec![ + Step { status: StepStatus::Done, summary: Some("did A".into()), + ..make_step(1, "A", "desc A", StepStatus::Done) }, + Step { status: StepStatus::Done, summary: Some("did B".into()), + ..make_step(2, "B", "desc B", StepStatus::Done) }, + make_step(3, "C", "desc C", StepStatus::Pending), + ]; + + // Change step 2's description → invalidates 2 and 3 + let new_steps = vec![ + make_step(1, "A", "desc A", StepStatus::Pending), + make_step(2, "B", "desc B CHANGED", StepStatus::Pending), + make_step(3, "C", "desc C", StepStatus::Pending), + ]; + state.apply_plan_diff(new_steps); + + assert!(matches!(state.steps[0].status, StepStatus::Done)); // kept + assert!(matches!(state.steps[1].status, StepStatus::Pending)); // invalidated + assert!(state.steps[1].summary.is_none()); // summary cleared + assert!(matches!(state.steps[2].status, StepStatus::Pending)); // invalidated + } + + #[test] + fn plan_diff_add_new_steps() { + let mut state = AgentState::new(); + state.steps = vec![ + Step { status: StepStatus::Done, summary: Some("did A".into()), + ..make_step(1, "A", "desc A", StepStatus::Done) }, + ]; + + let new_steps = vec![ + make_step(1, "A", "desc A", StepStatus::Pending), + make_step(2, "New", "new step", StepStatus::Pending), + ]; + state.apply_plan_diff(new_steps); + + assert_eq!(state.steps.len(), 2); + assert!(matches!(state.steps[0].status, StepStatus::Done)); + assert!(matches!(state.steps[1].status, StepStatus::Pending)); + assert_eq!(state.steps[1].title, "New"); + } + + #[test] + fn plan_diff_remove_steps() { + let mut state = AgentState::new(); + state.steps = vec![ + Step { status: StepStatus::Done, summary: Some("did A".into()), + ..make_step(1, "A", "desc A", StepStatus::Done) }, + make_step(2, "B", "desc B", StepStatus::Pending), + make_step(3, "C", "desc C", StepStatus::Pending), + ]; + + // New plan only has 1 step (same as step 1) + let new_steps = vec![ + make_step(1, "A", "desc A", StepStatus::Pending), + ]; + state.apply_plan_diff(new_steps); + + assert_eq!(state.steps.len(), 1); + assert!(matches!(state.steps[0].status, StepStatus::Done)); + } + + // --- build_step_context --- + + #[test] + fn step_context_includes_all_sections() { + let state = AgentState { + phase: AgentPhase::Executing { step: 2 }, + steps: vec![ + Step { status: StepStatus::Done, summary: Some("installed deps".into()), + ..make_step(1, "Setup", "install deps", StepStatus::Done) }, + make_step(2, "Build", "compile code", StepStatus::Running), + make_step(3, "Test", "run tests", StepStatus::Pending), + ], + current_step_chat_history: Vec::new(), + scratchpad: "key=value".into(), + }; + + let ctx = state.build_step_context("Build a web app"); + + assert!(ctx.contains("## 需求\nBuild a web app")); + assert!(ctx.contains("## 计划概览")); + assert!(ctx.contains("1. Setup done")); + assert!(ctx.contains("2. Build >> current")); + assert!(ctx.contains("3. Test")); + assert!(ctx.contains("## 当前步骤(步骤 2)")); + assert!(ctx.contains("标题:Build")); + assert!(ctx.contains("描述:compile code")); + assert!(ctx.contains("## 已完成步骤摘要")); + assert!(ctx.contains("installed deps")); + assert!(ctx.contains("## 备忘录\nkey=value")); + } + + #[test] + fn step_context_user_feedback() { + let state = AgentState { + phase: AgentPhase::Executing { step: 1 }, + steps: vec![ + Step { + user_feedbacks: vec!["please use React".into()], + ..make_step(1, "Setup", "setup project", StepStatus::Running) + }, + ], + current_step_chat_history: Vec::new(), + scratchpad: String::new(), + }; + + let ctx = state.build_step_context("Build app"); + assert!(ctx.contains("用户反馈")); + assert!(ctx.contains("please use React")); + } + + // --- build_messages --- + + #[test] + fn build_messages_planning() { + let state = AgentState::new(); + let msgs = state.build_messages("system prompt", "requirement text"); + + assert_eq!(msgs.len(), 2); + assert_eq!(msgs[0].role, "system"); + assert_eq!(msgs[0].content.as_deref(), Some("system prompt")); + assert_eq!(msgs[1].role, "user"); + assert_eq!(msgs[1].content.as_deref(), Some("requirement text")); + } + + #[test] + fn build_messages_executing_includes_history() { + let state = AgentState { + phase: AgentPhase::Executing { step: 1 }, + steps: vec![make_step(1, "Do thing", "details", StepStatus::Running)], + current_step_chat_history: vec![ + ChatMessage { role: "assistant".into(), content: Some("let me help".into()), tool_calls: None, tool_call_id: None }, + ], + scratchpad: String::new(), + }; + + let msgs = state.build_messages("sys", "req"); + assert_eq!(msgs.len(), 3); // system + user context + 1 history + assert_eq!(msgs[2].role, "assistant"); + } + + #[test] + fn build_messages_completed_minimal() { + let state = AgentState { + phase: AgentPhase::Completed, + steps: Vec::new(), + current_step_chat_history: Vec::new(), + scratchpad: String::new(), + }; + + let msgs = state.build_messages("sys", "req"); + assert_eq!(msgs.len(), 1); // only system + } +}