tori/src/agent.rs
Fam Zheng f214b67f92 feat: add template setup executable support
Templates can now include a `setup` file that runs in the workdir
before agent execution starts. Used for workspace initialization
like pulling tool binaries or installing dependencies.
2026-03-09 17:02:12 +00:00

2197 lines
94 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU16, Ordering};
use serde::{Deserialize, Serialize};
use sqlx::sqlite::SqlitePool;
use tokio::sync::{mpsc, RwLock, broadcast};
use crate::llm::{LlmClient, ChatMessage, Tool, ToolFunction};
use crate::exec::LocalExecutor;
use crate::template::{self, LoadedTemplate};
use crate::tools::ExternalToolManager;
use crate::LlmConfig;
use crate::state::{AgentState, AgentPhase, Artifact, Step, StepStatus, StepResult, StepResultStatus, check_scratchpad_size};
/// Record describing a service process tracked by [`AgentManager::services`].
///
/// Entries are stored in a map keyed by project id (see `get_service_port`).
pub struct ServiceInfo {
/// Port number recorded for the service.
pub port: u16,
/// Process id recorded for the service.
pub pid: u32,
}
/// Events delivered to a project's agent loop over its mpsc channel.
///
/// Serialized with an internal `"type"` tag holding the variant name.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AgentEvent {
/// A new requirement for `workflow_id`. When `template_id` is `Some`, the agent loop
/// uses that template instead of asking the LLM to select one.
NewRequirement { workflow_id: String, requirement: String, template_id: Option<String> },
/// A user comment on an existing workflow; the agent loop treats it as feedback
/// and may resume execution.
Comment { workflow_id: String, content: String },
}
/// Messages published on a project's broadcast channel.
///
/// Serialized with an internal `"type"` tag holding the variant name.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WsMessage {
/// The full plan of a workflow (sent, for example, after a plan revision).
PlanUpdate { workflow_id: String, steps: Vec<PlanStepInfo> },
/// Emitted by `log_execution`; `step_id` carries the id of the execution-log row.
StepStatusUpdate { step_id: String, status: String, output: String },
/// Workflow status change; values used in this file are "executing", "done" and "failed".
WorkflowStatusUpdate { workflow_id: String, status: String },
/// The requirement text of a workflow changed.
RequirementUpdate { workflow_id: String, requirement: String },
/// A report has been stored for the workflow.
ReportReady { workflow_id: String },
/// The project was renamed (sent after a title has been generated).
ProjectUpdate { project_id: String, name: String },
/// One logged LLM call.
LlmCallLog { workflow_id: String, entry: crate::db::LlmCallLogEntry },
/// Free-form activity text for a workflow.
ActivityUpdate { workflow_id: String, activity: String },
/// An error message to surface to listeners.
Error { message: String },
}
/// Wire representation of one plan step, as carried by `WsMessage::PlanUpdate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanStepInfo {
/// Position of the step within the plan.
pub order: i32,
/// Short text for the step (`plan_infos_from_state` fills this from the step title).
pub description: String,
/// Detailed text for the step (`plan_infos_from_state` fills this from the step description).
pub command: String,
/// Status label; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<String>,
/// Artifacts attached to the step; defaults to empty and is omitted when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub artifacts: Vec<Artifact>,
}
/// Builds the broadcastable plan view for every step in `state`.
///
/// The step title becomes `description`, the step description becomes `command`,
/// and the status enum is rendered as a lowercase snake_case label.
pub fn plan_infos_from_state(state: &AgentState) -> Vec<PlanStepInfo> {
    let mut infos = Vec::with_capacity(state.steps.len());
    for step in &state.steps {
        let label = match step.status {
            StepStatus::Pending => "pending",
            StepStatus::Running => "running",
            StepStatus::WaitingApproval => "waiting_approval",
            StepStatus::Done => "done",
            StepStatus::Failed => "failed",
        };
        infos.push(PlanStepInfo {
            order: step.order,
            description: step.title.clone(),
            command: step.description.clone(),
            status: Some(label.to_owned()),
            artifacts: step.artifacts.clone(),
        });
    }
    infos
}
/// Shared registry of per-project agent loops and their communication channels.
pub struct AgentManager {
// Event sender for each project's agent loop, keyed by project id.
agents: RwLock<HashMap<String, mpsc::Sender<AgentEvent>>>,
// Broadcast sender for each project, keyed by project id.
broadcast: RwLock<HashMap<String, broadcast::Sender<WsMessage>>>,
/// Service records keyed by project id.
pub services: RwLock<HashMap<String, ServiceInfo>>,
// Next port handed out by `allocate_port`.
next_port: AtomicU16,
// Database pool, cloned into each agent loop.
pool: SqlitePool,
// LLM configuration, cloned into each agent loop.
llm_config: LlmConfig,
// Optional template repository configuration passed to the template helpers.
template_repo: Option<crate::TemplateRepoConfig>,
// Optional knowledge-base manager used to index KB files shipped with templates.
kb: Option<Arc<crate::kb::KbManager>>,
}
impl AgentManager {
pub fn new(
pool: SqlitePool,
llm_config: LlmConfig,
template_repo: Option<crate::TemplateRepoConfig>,
kb: Option<Arc<crate::kb::KbManager>>,
) -> Arc<Self> {
Arc::new(Self {
agents: RwLock::new(HashMap::new()),
broadcast: RwLock::new(HashMap::new()),
services: RwLock::new(HashMap::new()),
next_port: AtomicU16::new(9100),
pool,
llm_config,
template_repo,
kb,
})
}
pub fn allocate_port(&self) -> u16 {
self.next_port.fetch_add(1, Ordering::Relaxed)
}
pub async fn get_service_port(&self, project_id: &str) -> Option<u16> {
self.services.read().await.get(project_id).map(|s| s.port)
}
pub async fn get_broadcast(&self, project_id: &str) -> broadcast::Receiver<WsMessage> {
let mut map = self.broadcast.write().await;
let tx = map.entry(project_id.to_string())
.or_insert_with(|| broadcast::channel(64).0);
tx.subscribe()
}
pub async fn send_event(self: &Arc<Self>, project_id: &str, event: AgentEvent) {
let agents = self.agents.read().await;
if let Some(tx) = agents.get(project_id) {
let _ = tx.send(event).await;
} else {
drop(agents);
self.spawn_agent(project_id.to_string()).await;
let agents = self.agents.read().await;
if let Some(tx) = agents.get(project_id) {
let _ = tx.send(event).await;
}
}
}
async fn spawn_agent(self: &Arc<Self>, project_id: String) {
let (tx, rx) = mpsc::channel(32);
self.agents.write().await.insert(project_id.clone(), tx);
let broadcast_tx = {
let mut map = self.broadcast.write().await;
map.entry(project_id.clone())
.or_insert_with(|| broadcast::channel(64).0)
.clone()
};
let mgr = Arc::clone(self);
tokio::spawn(agent_loop(project_id, rx, broadcast_tx, mgr));
}
}
// Template system is in crate::template
/// Loads `<workdir>/INSTRUCTIONS.md`; yields an empty string when the file
/// cannot be read.
async fn read_instructions(workdir: &str) -> String {
    let instructions_path = [workdir, "INSTRUCTIONS.md"].join("/");
    match tokio::fs::read_to_string(&instructions_path).await {
        Ok(text) => text,
        Err(_) => String::new(),
    }
}
/// Prepares the workspace directory for a project (best-effort).
///
/// Creates `workdir`, then either runs `scripts/setup.sh` when the workspace
/// provides one, or, when no `.venv` exists yet, creates a virtualenv with `uv`
/// and installs a default set of Python packages.
///
/// Failures never abort the caller; they are logged so that a broken workspace
/// can be diagnosed instead of silently surfacing later as tool errors.
async fn ensure_workspace(exec: &LocalExecutor, workdir: &str) {
    const DEFAULT_SETUP_COMMANDS: [&str; 2] = [
        "uv venv .venv",
        "uv pip install httpx fastapi uvicorn requests flask pydantic numpy pandas matplotlib pillow jinja2 pyyaml python-dotenv beautifulsoup4 lxml aiohttp aiofiles pytest rich click typer sqlalchemy",
    ];

    if let Err(e) = tokio::fs::create_dir_all(workdir).await {
        tracing::warn!("Failed to create workspace {}: {}", workdir, e);
    }

    let setup_script = format!("{}/scripts/setup.sh", workdir);
    if Path::new(&setup_script).exists() {
        tracing::info!("Running setup.sh in {}", workdir);
        if let Err(e) = exec.execute("bash scripts/setup.sh", workdir).await {
            tracing::warn!("setup.sh failed in {}: {}", workdir, e);
        }
        return;
    }

    let venv_path = format!("{}/.venv", workdir);
    if Path::new(&venv_path).exists() {
        return;
    }
    // Both commands are attempted even if the first one fails, as before.
    for cmd in DEFAULT_SETUP_COMMANDS {
        if let Err(e) = exec.execute(cmd, workdir).await {
            tracing::warn!("Workspace setup command failed in {}: {}", workdir, e);
        }
    }
}
/// Long-running task that serves one project: receives `AgentEvent`s from `rx`
/// and processes them one at a time until the channel is closed.
///
/// * `NewRequirement` — picks and applies a template, imports its KB files,
///   prepares the workspace, runs `run_agent_loop`, stores the final status and
///   generates a report.
/// * `Comment` — reloads the latest state snapshot, folds the comment in as
///   feedback, and resumes execution when there is an actionable step.
///
/// Database and broadcast failures are ignored throughout (`let _ = ...`).
async fn agent_loop(
project_id: String,
mut rx: mpsc::Receiver<AgentEvent>,
broadcast_tx: broadcast::Sender<WsMessage>,
mgr: Arc<AgentManager>,
) {
let pool = mgr.pool.clone();
let llm_config = mgr.llm_config.clone();
let llm = LlmClient::new(&llm_config);
let exec = LocalExecutor::new();
// Every project gets its own workspace directory under a fixed root.
let workdir = format!("/app/data/workspaces/{}", project_id);
tracing::info!("Agent loop started for project {}", project_id);
while let Some(event) = rx.recv().await {
match event {
AgentEvent::NewRequirement { workflow_id, requirement, template_id: forced_template } => {
tracing::info!("Processing new requirement for workflow {}", workflow_id);
// Generate project title in background (don't block the agent loop)
{
let title_llm = LlmClient::new(&llm_config);
let title_pool = pool.clone();
let title_btx = broadcast_tx.clone();
let title_pid = project_id.clone();
let title_req = requirement.clone();
tokio::spawn(async move {
if let Ok(title) = generate_title(&title_llm, &title_req).await {
let _ = sqlx::query("UPDATE projects SET name = ? WHERE id = ?")
.bind(&title)
.bind(&title_pid)
.execute(&title_pool)
.await;
let _ = title_btx.send(WsMessage::ProjectUpdate {
project_id: title_pid,
name: title,
});
}
});
}
// Announce and persist the "executing" status before any slow work starts.
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
workflow_id: workflow_id.clone(),
status: "executing".into(),
});
let _ = sqlx::query("UPDATE workflows SET status = 'executing' WHERE id = ?")
.bind(&workflow_id)
.execute(&pool)
.await;
// Template selection + workspace setup
let template_id = if forced_template.is_some() {
tracing::info!("Using forced template: {:?}", forced_template);
forced_template
} else {
template::select_template(&llm, &requirement, mgr.template_repo.as_ref()).await
};
// Persist template_id to workflow
if let Some(ref tid) = template_id {
let _ = sqlx::query("UPDATE workflows SET template_id = ? WHERE id = ?")
.bind(tid)
.bind(&workflow_id)
.execute(&pool)
.await;
}
// Apply the template files to the workspace and load its metadata.
// A failed apply is only logged; loading is still attempted.
let loaded_template = if let Some(ref tid) = template_id {
tracing::info!("Template selected for workflow {}: {}", workflow_id, tid);
let _ = tokio::fs::create_dir_all(&workdir).await;
if template::is_repo_template(tid) {
// Repo template: extract from git then load
match template::extract_repo_template(tid, mgr.template_repo.as_ref()).await {
Ok(template_dir) => {
if let Err(e) = template::apply_template(&template_dir, &workdir).await {
tracing::error!("Failed to apply repo template {}: {}", tid, e);
}
match LoadedTemplate::load_from_dir(tid, &template_dir).await {
Ok(t) => Some(t),
Err(e) => {
tracing::error!("Failed to load repo template {}: {}", tid, e);
None
}
}
}
Err(e) => {
tracing::error!("Failed to extract repo template {}: {}", tid, e);
None
}
}
} else {
// Local built-in template
let template_dir = std::path::Path::new(template::templates_dir()).join(tid);
if let Err(e) = template::apply_template(&template_dir, &workdir).await {
tracing::error!("Failed to apply template {}: {}", tid, e);
}
match LoadedTemplate::load(tid).await {
Ok(t) => Some(t),
Err(e) => {
tracing::error!("Failed to load template {}: {}", tid, e);
None
}
}
}
} else {
None
};
// Import KB files from template
if let Some(ref t) = loaded_template {
if let Some(ref kb) = mgr.kb {
let mut batch_items: Vec<(String, String)> = Vec::new();
for (title, content) in &t.kb_files {
// Check if article already exists by title
let existing: Option<String> = sqlx::query_scalar(
"SELECT id FROM kb_articles WHERE title = ?"
)
.bind(title)
.fetch_optional(&pool)
.await
.ok()
.flatten();
// Upsert by title: update the existing row or insert one with a fresh UUID.
let article_id = if let Some(id) = existing {
let _ = sqlx::query(
"UPDATE kb_articles SET content = ?, updated_at = datetime('now') WHERE id = ?"
)
.bind(content)
.bind(&id)
.execute(&pool)
.await;
id
} else {
let id = uuid::Uuid::new_v4().to_string();
let _ = sqlx::query(
"INSERT INTO kb_articles (id, title, content) VALUES (?, ?, ?)"
)
.bind(&id)
.bind(title)
.bind(content)
.execute(&pool)
.await;
id
};
batch_items.push((article_id, content.clone()));
}
// Batch index: single embed.py call for all articles
if !batch_items.is_empty() {
if let Err(e) = kb.index_batch(&batch_items).await {
tracing::warn!("Failed to batch index KB articles: {}", e);
}
}
tracing::info!("Imported {} KB articles from template", t.kb_files.len());
}
}
ensure_workspace(&exec, &workdir).await;
let _ = tokio::fs::write(format!("{}/requirement.md", workdir), &requirement).await;
// Run template setup if present
// NOTE(review): for repo templates this calls extract_repo_template a second time;
// the directory obtained while loading the template above is not reused.
if let Some(ref tid) = template_id {
let template_dir = if template::is_repo_template(tid) {
template::extract_repo_template(tid, mgr.template_repo.as_ref())
.await
.ok()
} else {
Some(std::path::Path::new(template::templates_dir()).join(tid))
};
if let Some(ref tdir) = template_dir {
if let Err(e) = template::run_setup(tdir, &workdir).await {
tracing::error!("Template setup failed for {}: {}", tid, e);
}
}
}
// Template instructions take precedence; otherwise fall back to INSTRUCTIONS.md
// in the workspace.
let instructions = if let Some(ref t) = loaded_template {
t.instructions.clone()
} else {
read_instructions(&workdir).await
};
let ext_tools = loaded_template.as_ref().map(|t| &t.external_tools);
tracing::info!("Starting agent loop for workflow {}", workflow_id);
// Run tool-calling agent loop
let result = run_agent_loop(
&llm, &exec, &pool, &broadcast_tx,
&project_id, &workflow_id, &requirement, &workdir, &mgr,
&instructions, None, ext_tools, &mut rx,
).await;
let final_status = if result.is_ok() { "done" } else { "failed" };
tracing::info!("Agent loop finished for workflow {}, status: {}", workflow_id, final_status);
if let Err(e) = &result {
tracing::error!("Agent error for workflow {}: {}", workflow_id, e);
let _ = broadcast_tx.send(WsMessage::Error {
message: format!("Agent error: {}", e),
});
}
let _ = sqlx::query("UPDATE workflows SET status = ? WHERE id = ?")
.bind(final_status)
.bind(&workflow_id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
workflow_id: workflow_id.clone(),
status: final_status.into(),
});
// Generate report from execution log
let log_entries = sqlx::query_as::<_, crate::db::ExecutionLogEntry>(
"SELECT * FROM execution_log WHERE workflow_id = ? ORDER BY created_at"
)
.bind(&workflow_id)
.fetch_all(&pool)
.await
.unwrap_or_default();
if let Ok(report) = generate_report(&llm, &requirement, &log_entries, &project_id).await {
let _ = sqlx::query("UPDATE workflows SET report = ? WHERE id = ?")
.bind(&report)
.bind(&workflow_id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::ReportReady {
workflow_id: workflow_id.clone(),
});
}
}
AgentEvent::Comment { workflow_id, content } => {
tracing::info!("Comment on workflow {}: {}", workflow_id, content);
let wf = sqlx::query_as::<_, crate::db::Workflow>(
"SELECT * FROM workflows WHERE id = ?",
)
.bind(&workflow_id)
.fetch_optional(&pool)
.await
.ok()
.flatten();
// Unknown workflow (or query failure): ignore the comment.
let Some(wf) = wf else { continue };
// Load latest state snapshot
let snapshot = sqlx::query_scalar::<_, String>(
"SELECT state_json FROM agent_state_snapshots WHERE workflow_id = ? ORDER BY created_at DESC LIMIT 1"
)
.bind(&workflow_id)
.fetch_optional(&pool)
.await
.ok()
.flatten();
// A missing or unparsable snapshot falls back to a fresh state.
let mut state = snapshot
.and_then(|json| serde_json::from_str::<AgentState>(&json).ok())
.unwrap_or_else(AgentState::new);
// For failed/done workflows: reset failed steps and continue directly
// For running workflows: process feedback via LLM
let is_resuming = wf.status == "failed" || wf.status == "done";
if is_resuming {
// Reset Failed steps to Pending so they get re-executed
for step in &mut state.steps {
if matches!(step.status, StepStatus::Failed) {
step.status = StepStatus::Pending;
}
}
// Attach comment as feedback to the first actionable step
if let Some(order) = state.first_actionable_step() {
if let Some(step) = state.steps.iter_mut().find(|s| s.order == order) {
step.user_feedbacks.push(content.clone());
}
}
tracing::info!("[workflow {}] Resuming from state, first actionable: {:?}",
workflow_id, state.first_actionable_step());
} else {
// Active workflow: LLM decides whether to revise plan
state = process_feedback(
&llm, &pool, &broadcast_tx,
&project_id, &workflow_id, state, &content,
).await;
}
// If there are actionable steps, resume execution
if state.first_actionable_step().is_some() {
ensure_workspace(&exec, &workdir).await;
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
workflow_id: workflow_id.clone(),
status: "executing".into(),
});
let _ = sqlx::query("UPDATE workflows SET status = 'executing' WHERE id = ?")
.bind(&workflow_id)
.execute(&pool)
.await;
// Prepare state for execution: set first pending step to Running
if let Some(next) = state.first_actionable_step() {
if let Some(step) = state.steps.iter_mut().find(|s| s.order == next) {
if matches!(step.status, StepStatus::Pending) {
step.status = StepStatus::Running;
}
}
state.phase = AgentPhase::Executing { step: next };
state.current_step_chat_history.clear();
}
let instructions = read_instructions(&workdir).await;
// Reload external tools from template if available
let ext_tools = if !wf.template_id.is_empty() {
let tid = &wf.template_id;
if template::is_repo_template(tid) {
match template::extract_repo_template(tid, mgr.template_repo.as_ref()).await {
Ok(template_dir) => {
LoadedTemplate::load_from_dir(tid, &template_dir).await
.ok().map(|t| t.external_tools)
}
Err(e) => {
tracing::warn!("Failed to reload template {}: {}", tid, e);
None
}
}
} else {
LoadedTemplate::load(tid).await.ok().map(|t| t.external_tools)
}
} else {
None
};
// Resume from the prepared state rather than starting a new plan.
let result = run_agent_loop(
&llm, &exec, &pool, &broadcast_tx,
&project_id, &workflow_id, &wf.requirement, &workdir, &mgr,
&instructions, Some(state), ext_tools.as_ref(), &mut rx,
).await;
let final_status = if result.is_ok() { "done" } else { "failed" };
if let Err(e) = &result {
let _ = broadcast_tx.send(WsMessage::Error {
message: format!("Agent error: {}", e),
});
}
let _ = sqlx::query("UPDATE workflows SET status = ? WHERE id = ?")
.bind(final_status)
.bind(&workflow_id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
workflow_id: workflow_id.clone(),
status: final_status.into(),
});
// Regenerate report
let log_entries = sqlx::query_as::<_, crate::db::ExecutionLogEntry>(
"SELECT * FROM execution_log WHERE workflow_id = ? ORDER BY created_at"
)
.bind(&workflow_id)
.fetch_all(&pool)
.await
.unwrap_or_default();
if let Ok(report) = generate_report(&llm, &wf.requirement, &log_entries, &project_id).await {
let _ = sqlx::query("UPDATE workflows SET report = ? WHERE id = ?")
.bind(&report)
.bind(&workflow_id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::ReportReady {
workflow_id: workflow_id.clone(),
});
}
} else {
// No actionable steps — feedback was informational only
// Mark workflow back to done
// NOTE(review): only a broadcast is sent here; the workflows table is not updated,
// and "done" is announced even if the stored status was "failed" — confirm intent.
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
workflow_id: workflow_id.clone(),
status: "done".into(),
});
}
}
}
}
tracing::info!("Agent loop ended for project {}", project_id);
}
// --- Tool definitions ---
/// Assembles a function-type [`Tool`] from its name, description and JSON
/// parameter schema.
fn make_tool(name: &str, description: &str, parameters: serde_json::Value) -> Tool {
    let function = ToolFunction {
        name: name.to_owned(),
        description: description.to_owned(),
        parameters,
    };
    Tool {
        tool_type: String::from("function"),
        function,
    }
}
/// Definition of the `read_file` tool: takes one required `path` string.
fn tool_read_file() -> Tool {
    let schema = serde_json::json!({
        "type": "object",
        "properties": {
            "path": { "type": "string", "description": "工作区内的相对路径" }
        },
        "required": ["path"]
    });
    make_tool("read_file", "读取工作区中的文件内容", schema)
}
/// Definition of the `list_files` tool: takes an optional `path` string.
fn tool_list_files() -> Tool {
    let schema = serde_json::json!({
        "type": "object",
        "properties": {
            "path": { "type": "string", "description": "工作区内的相对路径,默认为根目录" }
        }
    });
    make_tool("list_files", "列出工作区目录中的文件和子目录", schema)
}
/// Definition of the `kb_search` tool: takes one required `query` string.
fn tool_kb_search() -> Tool {
    let schema = serde_json::json!({
        "type": "object",
        "properties": {
            "query": { "type": "string", "description": "搜索查询" }
        },
        "required": ["query"]
    });
    make_tool(
        "kb_search",
        "搜索知识库中与查询相关的内容片段。返回最相关的 top-5 片段。",
        schema,
    )
}
/// Definition of the `kb_read` tool: takes no parameters.
fn tool_kb_read() -> Tool {
    let schema = serde_json::json!({
        "type": "object",
        "properties": {}
    });
    make_tool("kb_read", "读取知识库全文内容。", schema)
}
/// Tools offered during planning: `update_plan` plus the read-only workspace
/// and knowledge-base tools.
fn build_planning_tools() -> Vec<Tool> {
    let step_schema = serde_json::json!({
        "type": "object",
        "properties": {
            "steps": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "title": { "type": "string", "description": "步骤标题,简短概括(如'搭建环境'" },
                        "description": { "type": "string", "description": "详细描述,说明具体要做什么、为什么" }
                    },
                    "required": ["title", "description"]
                },
                "description": "高层计划步骤列表"
            }
        },
        "required": ["steps"]
    });
    let update_plan = make_tool(
        "update_plan",
        "设置高层执行计划。分析需求后调用此工具提交计划。每个步骤应是一个逻辑阶段(不是具体命令),包含简短标题和详细描述。调用后自动进入执行阶段。",
        step_schema,
    );

    let mut tools = Vec::with_capacity(5);
    tools.push(update_plan);
    tools.push(tool_list_files());
    tools.push(tool_read_file());
    tools.push(tool_kb_search());
    tools.push(tool_kb_read());
    tools
}
/// Tools available to the coordinator (the main loop, after a step completes):
/// `update_plan`, `update_scratchpad` and `update_requirement`.
fn build_coordinator_tools() -> Vec<Tool> {
    let update_plan = make_tool(
        "update_plan",
        "修改执行计划。提供完整步骤列表,系统自动 diffdescription 未变的已完成步骤保留成果,变化的步骤及后续重新执行。",
        serde_json::json!({
            "type": "object",
            "properties": {
                "steps": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "title": { "type": "string", "description": "步骤标题" },
                            "description": { "type": "string", "description": "详细描述" }
                        },
                        "required": ["title", "description"]
                    }
                }
            },
            "required": ["steps"]
        }),
    );
    let update_scratchpad = make_tool(
        "update_scratchpad",
        "更新全局备忘录。跨步骤持久化的关键信息。新内容会追加到已有内容之后。",
        serde_json::json!({
            "type": "object",
            "properties": {
                "content": { "type": "string", "description": "要追加的内容" }
            },
            "required": ["content"]
        }),
    );
    let update_requirement = make_tool(
        "update_requirement",
        "更新项目需求描述。当需要调整方向时使用。",
        serde_json::json!({
            "type": "object",
            "properties": {
                "requirement": { "type": "string", "description": "更新后的需求描述" }
            },
            "required": ["requirement"]
        }),
    );
    vec![update_plan, update_scratchpad, update_requirement]
}
/// Tools available inside a single step's sub-loop (`run_step_loop`): shell and
/// file access, service control, step scratchpad, approval, completion, and
/// knowledge-base lookup.
fn build_step_tools() -> Vec<Tool> {
    let execute = make_tool(
        "execute",
        "在工作区目录中执行 shell 命令",
        serde_json::json!({
            "type": "object",
            "properties": {
                "command": { "type": "string", "description": "要执行的 shell 命令" }
            },
            "required": ["command"]
        }),
    );
    let write_file = make_tool(
        "write_file",
        "在工作区中写入文件(自动创建父目录)",
        serde_json::json!({
            "type": "object",
            "properties": {
                "path": { "type": "string", "description": "工作区内的相对路径" },
                "content": { "type": "string", "description": "要写入的文件内容" }
            },
            "required": ["path", "content"]
        }),
    );
    let start_service = make_tool(
        "start_service",
        "启动后台服务进程(如 FastAPI 应用)。系统会自动分配端口并通过环境变量 PORT 传入。服务启动后可通过 /api/projects/{project_id}/app/ 访问。注意:启动命令应监听 0.0.0.0:$PORT。",
        serde_json::json!({
            "type": "object",
            "properties": {
                "command": { "type": "string", "description": "启动命令,如 'uvicorn main:app --host 0.0.0.0 --port $PORT'" }
            },
            "required": ["command"]
        }),
    );
    let stop_service = make_tool(
        "stop_service",
        "停止当前项目正在运行的后台服务进程。",
        serde_json::json!({
            "type": "object",
            "properties": {}
        }),
    );
    let update_scratchpad = make_tool(
        "update_scratchpad",
        "更新步骤级工作记忆。用于记录本步骤内的中间状态(步骤结束后丢弃,精华写进 summary。不是日志只保留当前有用的信息。",
        serde_json::json!({
            "type": "object",
            "properties": {
                "content": { "type": "string", "description": "要追加的内容" }
            },
            "required": ["content"]
        }),
    );
    let wait_for_approval = make_tool(
        "wait_for_approval",
        "暂停执行,等待用户确认后继续。用于关键决策点。",
        serde_json::json!({
            "type": "object",
            "properties": {
                "reason": { "type": "string", "description": "说明为什么需要用户确认" }
            },
            "required": ["reason"]
        }),
    );
    let step_done = make_tool(
        "step_done",
        "完成当前步骤。必须提供摘要和产出物列表(无产出物时传空数组)。",
        serde_json::json!({
            "type": "object",
            "properties": {
                "summary": { "type": "string", "description": "本步骤的工作摘要" },
                "artifacts": {
                    "type": "array",
                    "description": "本步骤的产出物列表。无产出物时传空数组 []",
                    "items": {
                        "type": "object",
                        "properties": {
                            "name": { "type": "string", "description": "产物名称" },
                            "path": { "type": "string", "description": "文件路径(相对工作目录)" },
                            "type": { "type": "string", "enum": ["file", "json", "markdown"], "description": "产物类型" },
                            "description": { "type": "string", "description": "一句话说明" }
                        },
                        "required": ["name", "path", "type"]
                    }
                }
            },
            "required": ["summary", "artifacts"]
        }),
    );

    // The order of the list is kept exactly as before.
    vec![
        execute,
        tool_read_file(),
        write_file,
        tool_list_files(),
        start_service,
        stop_service,
        update_scratchpad,
        wait_for_approval,
        step_done,
        tool_kb_search(),
        tool_kb_read(),
    ]
}
/// Renders the planning system prompt: substitutes `{project_id}` in
/// `prompts/planning.md` and appends the template instructions when non-empty.
fn build_planning_prompt(project_id: &str, instructions: &str) -> String {
    let base = include_str!("prompts/planning.md").replace("{project_id}", project_id);
    if instructions.is_empty() {
        return base;
    }
    format!("{}\n\n## 项目模板指令\n\n{}", base, instructions)
}
/// Renders the coordinator system prompt: substitutes `{project_id}` in
/// `prompts/execution.md` and appends the template instructions when non-empty.
fn build_coordinator_prompt(project_id: &str, instructions: &str) -> String {
    let base = include_str!("prompts/execution.md").replace("{project_id}", project_id);
    if instructions.is_empty() {
        return base;
    }
    format!("{}\n\n## 项目模板指令\n\n{}", base, instructions)
}
/// Renders the step-execution system prompt: substitutes `{project_id}` in
/// `prompts/step_execution.md` and appends the template instructions when non-empty.
fn build_step_execution_prompt(project_id: &str, instructions: &str) -> String {
    let base = include_str!("prompts/step_execution.md").replace("{project_id}", project_id);
    if instructions.is_empty() {
        return base;
    }
    format!("{}\n\n## 项目模板指令\n\n{}", base, instructions)
}
/// Composes the user message for a step sub-loop.
///
/// Sections, in order: the current step (order, title, description and any user
/// feedback), summaries of completed steps as `(order, title, summary, artifacts)`
/// tuples, and the read-only parent scratchpad. Empty sections are omitted.
fn build_step_user_message(step: &Step, completed_summaries: &[(i32, String, String, Vec<Artifact>)], parent_scratchpad: &str) -> String {
    let mut message = format!("## 当前步骤(步骤 {}\n", step.order);
    message += &format!("标题:{}\n", step.title);
    message += &format!("描述:{}\n", step.description);

    if !step.user_feedbacks.is_empty() {
        message += "\n用户反馈:\n";
        message.extend(step.user_feedbacks.iter().map(|fb| format!("- {}\n", fb)));
    }
    message.push('\n');

    if !completed_summaries.is_empty() {
        message += "## 已完成步骤摘要\n";
        for (order, title, summary, artifacts) in completed_summaries {
            message += &format!("- 步骤 {} ({}): {}\n", order, title, summary);
            if artifacts.is_empty() {
                continue;
            }
            let mut rendered: Vec<String> = Vec::with_capacity(artifacts.len());
            for artifact in artifacts {
                rendered.push(format!("{} ({})", artifact.name, artifact.artifact_type));
            }
            message += &format!(" 产物: {}\n", rendered.join(", "));
        }
        message.push('\n');
    }

    if !parent_scratchpad.is_empty() {
        message += "## 项目备忘录(只读)\n";
        message += parent_scratchpad;
        message.push('\n');
    }
    message
}
/// Renders the feedback system prompt from `prompts/feedback.md`, filling in the
/// project id, a textual listing of the current plan (with status markers and
/// summaries), and the user's feedback.
fn build_feedback_prompt(project_id: &str, state: &AgentState, feedback: &str) -> String {
    let plan_state = state.steps.iter().fold(String::new(), |mut listing, step| {
        let marker = match step.status {
            StepStatus::Done => " [done]",
            StepStatus::Running => " [running]",
            StepStatus::WaitingApproval => " [waiting]",
            StepStatus::Failed => " [FAILED]",
            StepStatus::Pending => "",
        };
        listing += &format!("{}. {}{}\n {}\n", step.order, step.title, marker, step.description);
        if let Some(summary) = &step.summary {
            listing += &format!(" 摘要: {}\n", summary);
        }
        listing
    });

    let template = include_str!("prompts/feedback.md");
    let with_project = template.replace("{project_id}", project_id);
    let with_plan = with_project.replace("{plan_state}", &plan_state);
    with_plan.replace("{feedback}", feedback)
}
/// Tools offered while processing user feedback: only `revise_plan`.
fn build_feedback_tools() -> Vec<Tool> {
    let schema = serde_json::json!({
        "type": "object",
        "properties": {
            "steps": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "title": { "type": "string", "description": "步骤标题" },
                        "description": { "type": "string", "description": "详细描述" }
                    },
                    "required": ["title", "description"]
                }
            }
        },
        "required": ["steps"]
    });
    let revise_plan = make_tool(
        "revise_plan",
        "修改执行计划。提供完整步骤列表。系统自动 diffdescription 未变的已完成步骤保留成果,变化的步骤及后续重新执行。",
        schema,
    );
    vec![revise_plan]
}
// --- Helpers ---
/// Returns the longest prefix of `s` that is at most `max_bytes` bytes long and
/// ends on a UTF-8 character boundary.
fn truncate_str(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    // Walk backwards from the byte limit to the nearest boundary; index 0 is
    // always a boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
// --- Tool execution ---
/// Resolves a tool-supplied `path` to a location inside `workdir`.
///
/// Relative paths are joined onto `workdir`. Absolute paths are accepted only
/// when they already point inside `workdir`; `PathBuf::join` would otherwise
/// discard `workdir` entirely and let the path escape the workspace. Any `..`
/// component is rejected. The check works on path components, so file names
/// that merely contain two dots (e.g. `a..b`) are allowed.
///
/// Symlinks are not resolved here.
///
/// Returns the error text to hand back to the model on rejection.
fn resolve_workspace_path(workdir: &str, path: &str) -> Result<std::path::PathBuf, String> {
    use std::path::{Component, Path};

    let root = Path::new(workdir);
    let requested = Path::new(path);
    let relative = if requested.is_absolute() {
        requested
            .strip_prefix(root)
            .map_err(|_| String::from("Error: path must be inside the workspace"))?
    } else {
        requested
    };
    let stays_inside = relative
        .components()
        .all(|c| matches!(c, Component::Normal(_) | Component::CurDir));
    if !stays_inside {
        return Err(String::from("Error: path traversal not allowed"));
    }
    Ok(root.join(relative))
}

/// Executes one built-in tool call and returns its textual result.
///
/// * `name` — tool name: `execute`, `read_file`, `write_file` or `list_files`.
/// * `arguments` — JSON-encoded argument object; unparsable input is treated as
///   having no arguments.
/// * `workdir` — workspace directory that commands run in and paths resolve against.
/// * `exec` — executor used for the `execute` tool.
///
/// Errors are reported in the returned string (prefixed with `Error:`); the
/// function itself never fails. Command output and file contents longer than
/// 8000 bytes are truncated.
async fn execute_tool(
    name: &str,
    arguments: &str,
    workdir: &str,
    exec: &LocalExecutor,
) -> String {
    let args: serde_json::Value = serde_json::from_str(arguments).unwrap_or_default();
    match name {
        "execute" => {
            let cmd = args["command"].as_str().unwrap_or("");
            match exec.execute(cmd, workdir).await {
                Ok(r) => {
                    let mut out = r.stdout;
                    if !r.stderr.is_empty() {
                        out.push_str("\nSTDERR: ");
                        out.push_str(&r.stderr);
                    }
                    if r.exit_code != 0 {
                        out.push_str(&format!("\n[exit code: {}]", r.exit_code));
                    }
                    if out.len() > 8000 {
                        let truncated = truncate_str(&out, 8000).to_string();
                        out = truncated;
                        out.push_str("\n...(truncated)");
                    }
                    out
                }
                Err(e) => format!("Error: {}", e),
            }
        }
        "read_file" => {
            let path = args["path"].as_str().unwrap_or("");
            let full = match resolve_workspace_path(workdir, path) {
                Ok(p) => p,
                Err(msg) => return msg,
            };
            match tokio::fs::read_to_string(&full).await {
                Ok(content) => {
                    if content.len() > 8000 {
                        format!(
                            "{}...(truncated, {} bytes total)",
                            truncate_str(&content, 8000),
                            content.len()
                        )
                    } else {
                        content
                    }
                }
                Err(e) => format!("Error: {}", e),
            }
        }
        "write_file" => {
            let path = args["path"].as_str().unwrap_or("");
            let content = args["content"].as_str().unwrap_or("");
            let full = match resolve_workspace_path(workdir, path) {
                Ok(p) => p,
                Err(msg) => return msg,
            };
            // Parent creation is best-effort; a real failure surfaces from the write below.
            if let Some(parent) = full.parent() {
                let _ = tokio::fs::create_dir_all(parent).await;
            }
            match tokio::fs::write(&full, content).await {
                Ok(()) => format!("Written {} bytes to {}", content.len(), path),
                Err(e) => format!("Error: {}", e),
            }
        }
        "list_files" => {
            let path = args["path"].as_str().unwrap_or(".");
            let full = match resolve_workspace_path(workdir, path) {
                Ok(p) => p,
                Err(msg) => return msg,
            };
            match tokio::fs::read_dir(&full).await {
                Ok(mut entries) => {
                    let mut items = Vec::new();
                    while let Ok(Some(entry)) = entries.next_entry().await {
                        let name = entry.file_name().to_string_lossy().to_string();
                        let is_dir = entry.file_type().await.map(|t| t.is_dir()).unwrap_or(false);
                        // Directories are marked with a trailing slash.
                        items.push(if is_dir { format!("{}/", name) } else { name });
                    }
                    items.sort();
                    if items.is_empty() {
                        "(empty directory)".into()
                    } else {
                        items.join("\n")
                    }
                }
                Err(e) => format!("Error: {}", e),
            }
        }
        _ => format!("Unknown tool: {}", name),
    }
}
// --- Tool-calling agent loop (state machine) ---
/// Save an AgentState snapshot to DB.
async fn save_state_snapshot(pool: &SqlitePool, workflow_id: &str, step_order: i32, state: &AgentState) {
let id = uuid::Uuid::new_v4().to_string();
let json = serde_json::to_string(state).unwrap_or_default();
let _ = sqlx::query(
"INSERT INTO agent_state_snapshots (id, workflow_id, step_order, state_json, created_at) VALUES (?, ?, ?, ?, datetime('now'))"
)
.bind(&id)
.bind(workflow_id)
.bind(step_order)
.bind(&json)
.execute(pool)
.await;
}
/// Records one tool call in `execution_log` and then publishes a
/// `StepStatusUpdate` whose `step_id` is the id of the new log row.
///
/// Both the insert and the broadcast are best-effort; their errors are ignored.
async fn log_execution(
    pool: &SqlitePool,
    broadcast_tx: &broadcast::Sender<WsMessage>,
    workflow_id: &str,
    step_order: i32,
    tool_name: &str,
    tool_input: &str,
    output: &str,
    status: &str,
) {
    const INSERT_SQL: &str = "INSERT INTO execution_log (id, workflow_id, step_order, tool_name, tool_input, output, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))";

    let row_id = uuid::Uuid::new_v4().to_string();
    let insert = sqlx::query(INSERT_SQL)
        .bind(&row_id)
        .bind(workflow_id)
        .bind(step_order)
        .bind(tool_name)
        .bind(tool_input)
        .bind(output)
        .bind(status);
    let _ = insert.execute(pool).await;

    let update = WsMessage::StepStatusUpdate {
        step_id: row_id,
        status: status.to_owned(),
        output: output.to_owned(),
    };
    let _ = broadcast_tx.send(update);
}
/// Records one LLM call in `llm_call_log` and publishes it as a
/// `WsMessage::LlmCallLog`.
///
/// Token counts and latency are stored as `i32`. Values outside that range are
/// saturated to the nearest bound rather than wrapped, which is what a plain
/// `as i32` cast would do. The broadcast entry's `created_at` is left empty; the
/// stored row gets its timestamp from the database.
///
/// Both the insert and the broadcast are best-effort; their errors are ignored.
#[allow(clippy::too_many_arguments)]
async fn log_llm_call(
    pool: &SqlitePool,
    broadcast_tx: &broadcast::Sender<WsMessage>,
    workflow_id: &str,
    step_order: i32,
    phase: &str,
    messages_count: i32,
    tools_count: i32,
    tool_calls_json: &str,
    text_response: &str,
    prompt_tokens: Option<u32>,
    completion_tokens: Option<u32>,
    latency_ms: i64,
) {
    let saturate_tokens = |v: u32| i32::try_from(v).unwrap_or(i32::MAX);
    let prompt_tokens = prompt_tokens.map(saturate_tokens);
    let completion_tokens = completion_tokens.map(saturate_tokens);
    // The clamp guarantees the value fits, so the cast below cannot truncate.
    let latency_ms = latency_ms.clamp(i64::from(i32::MIN), i64::from(i32::MAX)) as i32;

    let id = uuid::Uuid::new_v4().to_string();
    let _ = sqlx::query(
        "INSERT INTO llm_call_log (id, workflow_id, step_order, phase, messages_count, tools_count, tool_calls, text_response, prompt_tokens, completion_tokens, latency_ms, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))"
    )
    .bind(&id)
    .bind(workflow_id)
    .bind(step_order)
    .bind(phase)
    .bind(messages_count)
    .bind(tools_count)
    .bind(tool_calls_json)
    .bind(text_response)
    .bind(prompt_tokens)
    .bind(completion_tokens)
    .bind(latency_ms)
    .execute(pool)
    .await;

    let entry = crate::db::LlmCallLogEntry {
        id,
        workflow_id: workflow_id.to_string(),
        step_order,
        phase: phase.to_string(),
        messages_count,
        tools_count,
        tool_calls: tool_calls_json.to_string(),
        text_response: text_response.to_string(),
        prompt_tokens,
        completion_tokens,
        latency_ms,
        created_at: String::new(),
    };
    let _ = broadcast_tx.send(WsMessage::LlmCallLog {
        workflow_id: workflow_id.to_string(),
        entry,
    });
}
/// Processes user feedback on an active workflow: asks the LLM whether the plan
/// should be revised and returns the (possibly modified) state, ready for
/// resumed execution.
///
/// * If the LLM calls `revise_plan`, the new step list is diffed into the state
///   via `apply_plan_diff` and the updated plan is broadcast.
/// * If the LLM answers with text only, the answer is logged and the plan is
///   left unchanged.
/// * In both cases the feedback is attached to the first actionable step (or the
///   last step when none is actionable) and a state snapshot is saved.
/// * If the LLM call fails or yields no choice, the feedback is attached to the
///   first step that is not done and the state is returned without a snapshot.
///   Feedback is therefore never dropped.
async fn process_feedback(
    llm: &LlmClient,
    pool: &SqlitePool,
    broadcast_tx: &broadcast::Sender<WsMessage>,
    project_id: &str,
    workflow_id: &str,
    mut state: AgentState,
    feedback: &str,
) -> AgentState {
    let prompt = build_feedback_prompt(project_id, &state, feedback);
    let tools = build_feedback_tools();
    let messages = vec![
        ChatMessage::system(&prompt),
        ChatMessage::user(feedback),
    ];
    tracing::info!("[workflow {}] Processing feedback with LLM", workflow_id);

    let first_choice = match llm.chat_with_tools(messages, &tools).await {
        Ok(response) => {
            let first = response.choices.into_iter().next();
            if first.is_none() {
                tracing::warn!("[workflow {}] Feedback LLM call returned no choices", workflow_id);
            }
            first
        }
        Err(e) => {
            tracing::error!("[workflow {}] Feedback LLM call failed: {}", workflow_id, e);
            None
        }
    };
    let Some(choice) = first_choice else {
        // No usable LLM answer: keep the plan as is, but do not lose the feedback.
        if let Some(step) = state.steps.iter_mut().find(|s| !matches!(s.status, StepStatus::Done)) {
            step.user_feedbacks.push(feedback.to_string());
        }
        return state;
    };

    if let Some(tool_calls) = &choice.message.tool_calls {
        for tc in tool_calls {
            if tc.function.name == "revise_plan" {
                let args: serde_json::Value = serde_json::from_str(&tc.function.arguments).unwrap_or_default();
                let raw_steps = args["steps"].as_array().cloned().unwrap_or_default();
                // Steps are numbered from 1 in the order the LLM listed them.
                let new_steps: Vec<Step> = raw_steps.iter().enumerate().map(|(i, item)| {
                    let order = (i + 1) as i32;
                    Step {
                        order,
                        title: item["title"].as_str().unwrap_or("").to_string(),
                        description: item["description"].as_str().unwrap_or("").to_string(),
                        status: StepStatus::Pending,
                        summary: None,
                        user_feedbacks: Vec::new(),
                        db_id: String::new(),
                        artifacts: Vec::new(),
                    }
                }).collect();
                // Apply docker-cache diff
                state.apply_plan_diff(new_steps);
                // Broadcast updated plan
                let _ = broadcast_tx.send(WsMessage::PlanUpdate {
                    workflow_id: workflow_id.to_string(),
                    steps: plan_infos_from_state(&state),
                });
                tracing::info!("[workflow {}] Plan revised via feedback. First actionable: {:?}",
                    workflow_id, state.first_actionable_step());
            }
        }
    } else {
        // Text response only — feedback is informational, no plan change
        let text = choice.message.content.as_deref().unwrap_or("");
        tracing::info!("[workflow {}] Feedback processed, no plan change: {}", workflow_id, truncate_str(text, 200));
        log_execution(pool, broadcast_tx, workflow_id, state.current_step(), "text_response", "", text, "done").await;
    }

    // Attach feedback to the first actionable step (or last step)
    let target_order = state.first_actionable_step()
        .unwrap_or_else(|| state.steps.last().map(|s| s.order).unwrap_or(0));
    if let Some(step) = state.steps.iter_mut().find(|s| s.order == target_order) {
        step.user_feedbacks.push(feedback.to_string());
    }
    // Snapshot after feedback processing
    save_state_snapshot(pool, workflow_id, state.current_step(), &state).await;
    state
}
/// Run an isolated sub-loop for a single step. Returns StepResult.
#[allow(clippy::too_many_arguments)]
async fn run_step_loop(
llm: &LlmClient,
exec: &LocalExecutor,
pool: &SqlitePool,
broadcast_tx: &broadcast::Sender<WsMessage>,
project_id: &str,
workflow_id: &str,
workdir: &str,
mgr: &Arc<AgentManager>,
instructions: &str,
step: &Step,
completed_summaries: &[(i32, String, String, Vec<Artifact>)],
parent_scratchpad: &str,
external_tools: Option<&ExternalToolManager>,
event_rx: &mut mpsc::Receiver<AgentEvent>,
) -> StepResult {
let system_prompt = build_step_execution_prompt(project_id, instructions);
let user_message = build_step_user_message(step, completed_summaries, parent_scratchpad);
let mut step_tools = build_step_tools();
if let Some(ext) = external_tools {
step_tools.extend(ext.tool_definitions());
}
let mut step_chat_history: Vec<ChatMessage> = Vec::new();
let mut step_scratchpad = String::new();
let step_order = step.order;
for iteration in 0..50 {
// Build messages: system + user context + chat history
let mut messages = vec![
ChatMessage::system(&system_prompt),
ChatMessage::user(&user_message),
];
// If step scratchpad is non-empty, inject it
if !step_scratchpad.is_empty() {
let last_user = messages.len() - 1;
if let Some(content) = &messages[last_user].content {
let mut amended = content.clone();
amended.push_str(&format!("\n\n## 步骤工作记忆\n{}", step_scratchpad));
messages[last_user].content = Some(amended);
}
}
messages.extend(step_chat_history.clone());
let msg_count = messages.len() as i32;
let tool_count = step_tools.len() as i32;
let phase_label = format!("step({})", step_order);
tracing::info!("[workflow {}] Step {} LLM call #{} msgs={}", workflow_id, step_order, iteration + 1, messages.len());
let _ = broadcast_tx.send(WsMessage::ActivityUpdate {
workflow_id: workflow_id.to_string(),
activity: format!("步骤 {} — 等待 LLM 响应...", step_order),
});
let call_start = std::time::Instant::now();
let response = match llm.chat_with_tools(messages, &step_tools).await {
Ok(r) => r,
Err(e) => {
tracing::error!("[workflow {}] Step {} LLM call failed: {}", workflow_id, step_order, e);
return StepResult {
status: StepResultStatus::Failed { error: format!("LLM call failed: {}", e) },
artifacts: Vec::new(),
summary: format!("LLM 调用失败: {}", e),
};
}
};
let latency_ms = call_start.elapsed().as_millis() as i64;
let (prompt_tokens, completion_tokens) = response.usage.as_ref()
.map(|u| (Some(u.prompt_tokens), Some(u.completion_tokens)))
.unwrap_or((None, None));
let choice = match response.choices.into_iter().next() {
Some(c) => c,
None => {
return StepResult {
status: StepResultStatus::Failed { error: "No response from LLM".into() },
summary: "LLM 无响应".into(),
artifacts: Vec::new(),
};
}
};
step_chat_history.push(choice.message.clone());
let llm_text_response = choice.message.content.clone().unwrap_or_default();
if let Some(tool_calls) = &choice.message.tool_calls {
tracing::info!("[workflow {}] Step {} tool calls: {}", workflow_id, step_order,
tool_calls.iter().map(|tc| tc.function.name.as_str()).collect::<Vec<_>>().join(", "));
let mut step_done_result: Option<StepResult> = None;
for tc in tool_calls {
if step_done_result.is_some() {
step_chat_history.push(ChatMessage::tool_result(&tc.id, "(skipped: step completed)"));
continue;
}
let args: serde_json::Value = serde_json::from_str(&tc.function.arguments).unwrap_or_default();
match tc.function.name.as_str() {
"step_done" => {
let summary = args["summary"].as_str().unwrap_or("").to_string();
let artifacts: Vec<Artifact> = args.get("artifacts")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|a| {
Some(Artifact {
name: a["name"].as_str()?.to_string(),
path: a["path"].as_str()?.to_string(),
artifact_type: a["type"].as_str().unwrap_or("file").to_string(),
description: a["description"].as_str().unwrap_or("").to_string(),
})
}).collect())
.unwrap_or_default();
// Save artifacts to DB
for art in &artifacts {
let art_id = uuid::Uuid::new_v4().to_string();
let _ = sqlx::query(
"INSERT INTO step_artifacts (id, workflow_id, step_order, name, path, artifact_type, description) VALUES (?, ?, ?, ?, ?, ?, ?)"
)
.bind(&art_id)
.bind(workflow_id)
.bind(step_order)
.bind(&art.name)
.bind(&art.path)
.bind(&art.artifact_type)
.bind(&art.description)
.execute(pool)
.await;
}
log_execution(pool, broadcast_tx, workflow_id, step_order, "step_done", &summary, "步骤完成", "done").await;
step_chat_history.push(ChatMessage::tool_result(&tc.id, "步骤已完成。"));
step_done_result = Some(StepResult {
status: StepResultStatus::Done,
summary,
artifacts,
});
}
"update_scratchpad" => {
let content = args["content"].as_str().unwrap_or("");
let mut new_pad = step_scratchpad.clone();
if !new_pad.is_empty() {
new_pad.push('\n');
}
new_pad.push_str(content);
match check_scratchpad_size(&new_pad) {
Ok(()) => {
step_scratchpad = new_pad;
step_chat_history.push(ChatMessage::tool_result(&tc.id, "步骤工作记忆已更新。"));
}
Err(msg) => {
step_chat_history.push(ChatMessage::tool_result(&tc.id, &msg));
}
}
}
"wait_for_approval" => {
let reason = args["reason"].as_str().unwrap_or("等待确认");
let _ = broadcast_tx.send(WsMessage::ActivityUpdate {
workflow_id: workflow_id.to_string(),
activity: format!("步骤 {} — 等待用户确认: {}", step_order, reason),
});
// Broadcast waiting status
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
workflow_id: workflow_id.to_string(),
steps: plan_infos_from_state_with_override(step_order, "waiting_approval",
pool, workflow_id).await,
});
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
workflow_id: workflow_id.to_string(),
status: "waiting_approval".into(),
});
let _ = sqlx::query("UPDATE workflows SET status = 'waiting_approval' WHERE id = ?")
.bind(workflow_id)
.execute(pool)
.await;
log_execution(pool, broadcast_tx, workflow_id, step_order, "wait_for_approval", reason, reason, "waiting").await;
tracing::info!("[workflow {}] Step {} waiting for approval: {}", workflow_id, step_order, reason);
// Block until Comment event
let approval_content = loop {
match event_rx.recv().await {
Some(AgentEvent::Comment { content, .. }) => break content,
Some(_) => continue,
None => {
return StepResult {
status: StepResultStatus::Failed { error: "Event channel closed".into() },
summary: "事件通道关闭".into(),
artifacts: Vec::new(),
};
}
}
};
tracing::info!("[workflow {}] Step {} approval response: {}", workflow_id, step_order, approval_content);
if approval_content.starts_with("rejected:") {
let reason = approval_content.strip_prefix("rejected:").unwrap_or("").trim();
log_execution(pool, broadcast_tx, workflow_id, step_order, "wait_for_approval", "rejected", reason, "failed").await;
step_chat_history.push(ChatMessage::tool_result(&tc.id, &format!("用户拒绝: {}", reason)));
step_done_result = Some(StepResult {
status: StepResultStatus::Failed { error: format!("用户终止: {}", reason) },
summary: format!("用户终止了执行: {}", reason),
artifacts: Vec::new(),
});
continue;
}
// Approved
let feedback = if approval_content.starts_with("approved:") {
approval_content.strip_prefix("approved:").unwrap_or("").trim().to_string()
} else {
approval_content.clone()
};
let _ = sqlx::query("UPDATE workflows SET status = 'executing' WHERE id = ?")
.bind(workflow_id)
.execute(pool)
.await;
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
workflow_id: workflow_id.to_string(),
status: "executing".into(),
});
let tool_msg = if feedback.is_empty() {
"用户已确认,继续执行。".to_string()
} else {
format!("用户已确认。反馈: {}", feedback)
};
step_chat_history.push(ChatMessage::tool_result(&tc.id, &tool_msg));
}
"start_service" => {
let cmd = args["command"].as_str().unwrap_or("");
{
let mut services = mgr.services.write().await;
if let Some(old) = services.remove(project_id) {
let _ = nix::sys::signal::kill(
nix::unistd::Pid::from_raw(old.pid as i32),
nix::sys::signal::Signal::SIGTERM,
);
}
}
let port = mgr.allocate_port();
let cmd_with_port = cmd.replace("$PORT", &port.to_string());
let venv_bin = format!("{}/.venv/bin", workdir);
let path_env = match std::env::var("PATH") {
Ok(p) => format!("{}:{}", venv_bin, p),
Err(_) => venv_bin,
};
let result = match tokio::process::Command::new("sh")
.arg("-c")
.arg(&cmd_with_port)
.current_dir(workdir)
.env("PORT", port.to_string())
.env("PATH", &path_env)
.env("VIRTUAL_ENV", format!("{}/.venv", workdir))
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
{
Ok(child) => {
let pid = child.id().unwrap_or(0);
mgr.services.write().await.insert(project_id.to_string(), ServiceInfo { port, pid });
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
format!("服务已启动,端口 {},访问地址:/api/projects/{}/app/", port, project_id)
}
Err(e) => format!("Error: 启动失败:{}", e),
};
let status = if result.starts_with("Error:") { "failed" } else { "done" };
log_execution(pool, broadcast_tx, workflow_id, step_order, "start_service", cmd, &result, status).await;
step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
}
"stop_service" => {
let mut services = mgr.services.write().await;
let result = if let Some(svc) = services.remove(project_id) {
let _ = nix::sys::signal::kill(
nix::unistd::Pid::from_raw(svc.pid as i32),
nix::sys::signal::Signal::SIGTERM,
);
"服务已停止。".to_string()
} else {
"当前没有运行中的服务。".to_string()
};
log_execution(pool, broadcast_tx, workflow_id, step_order, "stop_service", "", &result, "done").await;
step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
}
"kb_search" => {
let query = args["query"].as_str().unwrap_or("");
let result = if let Some(kb) = &mgr.kb {
match kb.search(query).await {
Ok(results) if results.is_empty() => "知识库为空或没有匹配结果。".to_string(),
Ok(results) => {
results.iter().enumerate().map(|(i, r)| {
let article_label = if r.article_title.is_empty() {
String::new()
} else {
format!(" [文章: {}]", r.article_title)
};
format!("--- 片段 {} (相似度: {:.2}){} ---\n{}", i + 1, r.score, article_label, r.content)
}).collect::<Vec<_>>().join("\n\n")
}
Err(e) => format!("Error: {}", e),
}
} else {
"知识库未初始化。".to_string()
};
step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
}
"kb_read" => {
let result = if let Some(kb) = &mgr.kb {
match kb.read_all().await {
Ok(content) if content.is_empty() => "知识库为空。".to_string(),
Ok(content) => content,
Err(e) => format!("Error: {}", e),
}
} else {
"知识库未初始化。".to_string()
};
step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
}
// External tools
name if external_tools.as_ref().is_some_and(|e| e.has_tool(name)) => {
let _ = broadcast_tx.send(WsMessage::ActivityUpdate {
workflow_id: workflow_id.to_string(),
activity: format!("步骤 {} — 工具: {}", step_order, name),
});
let result = match external_tools.unwrap().invoke(name, &tc.function.arguments, workdir).await {
Ok(output) => {
let truncated = truncate_str(&output, 8192);
truncated.to_string()
}
Err(e) => format!("Tool error: {}", e),
};
let status = if result.starts_with("Tool error:") { "failed" } else { "done" };
log_execution(pool, broadcast_tx, workflow_id, step_order, &tc.function.name, &tc.function.arguments, &result, status).await;
step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
}
// IO tools: execute, read_file, write_file, list_files
_ => {
let tool_desc = match tc.function.name.as_str() {
"execute" => {
let cmd_preview = args.get("command").and_then(|v| v.as_str()).unwrap_or("").chars().take(60).collect::<String>();
format!("执行命令: {}", cmd_preview)
}
"read_file" => format!("读取文件: {}", args.get("path").and_then(|v| v.as_str()).unwrap_or("?")),
"write_file" => format!("写入文件: {}", args.get("path").and_then(|v| v.as_str()).unwrap_or("?")),
"list_files" => "列出文件".to_string(),
other => format!("工具: {}", other),
};
let _ = broadcast_tx.send(WsMessage::ActivityUpdate {
workflow_id: workflow_id.to_string(),
activity: format!("步骤 {} — {}", step_order, tool_desc),
});
let result = execute_tool(&tc.function.name, &tc.function.arguments, workdir, exec).await;
let status = if result.starts_with("Error:") { "failed" } else { "done" };
log_execution(pool, broadcast_tx, workflow_id, step_order, &tc.function.name, &tc.function.arguments, &result, status).await;
step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
}
}
}
// Log LLM call
let tc_json: Vec<serde_json::Value> = tool_calls.iter().map(|tc| {
serde_json::json!({
"name": tc.function.name,
"arguments_preview": truncate_str(&tc.function.arguments, 200),
})
}).collect();
let tc_json_str = serde_json::to_string(&tc_json).unwrap_or_else(|_| "[]".to_string());
log_llm_call(
pool, broadcast_tx, workflow_id, step_order,
&phase_label, msg_count, tool_count,
&tc_json_str, &llm_text_response,
prompt_tokens, completion_tokens, latency_ms,
).await;
if let Some(result) = step_done_result {
return result;
}
} else {
// Text response without tool calls
let content = choice.message.content.as_deref().unwrap_or("(no content)");
tracing::info!("[workflow {}] Step {} text response: {}", workflow_id, step_order, truncate_str(content, 200));
log_execution(pool, broadcast_tx, workflow_id, step_order, "text_response", "", content, "done").await;
log_llm_call(
pool, broadcast_tx, workflow_id, step_order,
&phase_label, msg_count, tool_count,
"[]", content,
prompt_tokens, completion_tokens, latency_ms,
).await;
// Text response in step loop — continue, LLM may follow up with tool calls
}
}
// Hit 50-iteration limit
tracing::warn!("[workflow {}] Step {} hit iteration limit (50)", workflow_id, step_order);
StepResult {
status: StepResultStatus::Failed { error: "步骤迭代次数超限50轮".into() },
summary: "步骤执行超过50轮迭代限制未能完成".into(),
artifacts: Vec::new(),
}
}
/// Helper to get plan step infos with a status override for a specific step.
/// Used during wait_for_approval in the step sub-loop where we don't have
/// mutable access to the AgentState.
async fn plan_infos_from_state_with_override(
step_order: i32,
override_status: &str,
pool: &SqlitePool,
workflow_id: &str,
) -> Vec<PlanStepInfo> {
// Read the latest state snapshot to get step info
let snapshot = sqlx::query_scalar::<_, String>(
"SELECT state_json FROM agent_state_snapshots WHERE workflow_id = ? ORDER BY created_at DESC LIMIT 1"
)
.bind(workflow_id)
.fetch_optional(pool)
.await
.ok()
.flatten();
if let Some(json) = snapshot {
if let Ok(state) = serde_json::from_str::<AgentState>(&json) {
return state.steps.iter().map(|s| {
let status = if s.order == step_order {
override_status.to_string()
} else {
match s.status {
StepStatus::Pending => "pending",
StepStatus::Running => "running",
StepStatus::WaitingApproval => "waiting_approval",
StepStatus::Done => "done",
StepStatus::Failed => "failed",
}.to_string()
};
PlanStepInfo {
order: s.order,
description: s.title.clone(),
command: s.description.clone(),
status: Some(status),
artifacts: s.artifacts.clone(),
}
}).collect();
}
}
Vec::new()
}
#[allow(clippy::too_many_arguments)]
async fn run_agent_loop(
llm: &LlmClient,
exec: &LocalExecutor,
pool: &SqlitePool,
broadcast_tx: &broadcast::Sender<WsMessage>,
project_id: &str,
workflow_id: &str,
requirement: &str,
workdir: &str,
mgr: &Arc<AgentManager>,
instructions: &str,
initial_state: Option<AgentState>,
external_tools: Option<&ExternalToolManager>,
event_rx: &mut mpsc::Receiver<AgentEvent>,
) -> anyhow::Result<()> {
let planning_tools = build_planning_tools();
let coordinator_tools = build_coordinator_tools();
let mut state = initial_state.unwrap_or_else(AgentState::new);
// --- Planning phase loop ---
// Keep iterating until we transition out of Planning
for iteration in 0..20 {
if !matches!(state.phase, AgentPhase::Planning) {
break;
}
let system_prompt = build_planning_prompt(project_id, instructions);
let messages = state.build_messages(&system_prompt, requirement);
let msg_count = messages.len() as i32;
let tool_count = planning_tools.len() as i32;
tracing::info!("[workflow {}] Planning LLM call #{} msgs={}", workflow_id, iteration + 1, messages.len());
let _ = broadcast_tx.send(WsMessage::ActivityUpdate {
workflow_id: workflow_id.to_string(),
activity: "规划中 — 等待 LLM 响应...".to_string(),
});
let call_start = std::time::Instant::now();
let response = match llm.chat_with_tools(messages, &planning_tools).await {
Ok(r) => r,
Err(e) => {
tracing::error!("[workflow {}] LLM call failed: {}", workflow_id, e);
return Err(e);
}
};
let latency_ms = call_start.elapsed().as_millis() as i64;
let (prompt_tokens, completion_tokens) = response.usage.as_ref()
.map(|u| (Some(u.prompt_tokens), Some(u.completion_tokens)))
.unwrap_or((None, None));
let choice = response.choices.into_iter().next()
.ok_or_else(|| anyhow::anyhow!("No response from LLM"))?;
state.current_step_chat_history.push(choice.message.clone());
let llm_text_response = choice.message.content.clone().unwrap_or_default();
if let Some(tool_calls) = &choice.message.tool_calls {
let mut phase_transition = false;
for tc in tool_calls {
if phase_transition {
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, "(skipped: phase transition)"));
continue;
}
let args: serde_json::Value = serde_json::from_str(&tc.function.arguments).unwrap_or_default();
match tc.function.name.as_str() {
"update_plan" => {
let raw_steps = args["steps"].as_array().cloned().unwrap_or_default();
state.steps.clear();
for (i, item) in raw_steps.iter().enumerate() {
let order = (i + 1) as i32;
let title = item["title"].as_str().unwrap_or(item.as_str().unwrap_or("")).to_string();
let detail = item["description"].as_str().unwrap_or("").to_string();
state.steps.push(Step {
order, title, description: detail,
status: StepStatus::Pending, summary: None,
user_feedbacks: Vec::new(), db_id: String::new(),
artifacts: Vec::new(),
});
}
if let Some(first) = state.steps.first_mut() {
first.status = StepStatus::Running;
}
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
workflow_id: workflow_id.to_string(),
steps: plan_infos_from_state(&state),
});
state.current_step_chat_history.clear();
state.phase = AgentPhase::Executing { step: 1 };
phase_transition = true;
save_state_snapshot(pool, workflow_id, 0, &state).await;
tracing::info!("[workflow {}] Plan set ({} steps), entering Executing", workflow_id, state.steps.len());
}
// Planning phase IO tools
_ => {
let result = execute_tool(&tc.function.name, &tc.function.arguments, workdir, exec).await;
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
}
}
}
let tc_json: Vec<serde_json::Value> = tool_calls.iter().map(|tc| {
serde_json::json!({ "name": tc.function.name, "arguments_preview": truncate_str(&tc.function.arguments, 200) })
}).collect();
let tc_json_str = serde_json::to_string(&tc_json).unwrap_or_else(|_| "[]".to_string());
log_llm_call(pool, broadcast_tx, workflow_id, 0, "planning", msg_count, tool_count,
&tc_json_str, &llm_text_response, prompt_tokens, completion_tokens, latency_ms).await;
} else {
let content = choice.message.content.as_deref().unwrap_or("(no content)");
tracing::info!("[workflow {}] Planning text response: {}", workflow_id, truncate_str(content, 200));
log_execution(pool, broadcast_tx, workflow_id, 0, "text_response", "", content, "done").await;
log_llm_call(pool, broadcast_tx, workflow_id, 0, "planning", msg_count, tool_count,
"[]", content, prompt_tokens, completion_tokens, latency_ms).await;
}
}
// --- Executing phase: step isolation loop ---
while matches!(state.phase, AgentPhase::Executing { .. }) {
let step_order = match state.first_actionable_step() {
Some(o) => o,
None => {
state.phase = AgentPhase::Completed;
break;
}
};
// Mark step as Running
if let Some(step) = state.steps.iter_mut().find(|s| s.order == step_order) {
step.status = StepStatus::Running;
}
state.phase = AgentPhase::Executing { step: step_order };
state.current_step_chat_history.clear();
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
workflow_id: workflow_id.to_string(),
steps: plan_infos_from_state(&state),
});
save_state_snapshot(pool, workflow_id, step_order, &state).await;
// Build completed summaries for context
let completed_summaries: Vec<(i32, String, String, Vec<Artifact>)> = state.steps.iter()
.filter(|s| matches!(s.status, StepStatus::Done))
.map(|s| (s.order, s.title.clone(), s.summary.clone().unwrap_or_default(), s.artifacts.clone()))
.collect();
let step = state.steps.iter().find(|s| s.order == step_order).unwrap().clone();
tracing::info!("[workflow {}] Starting step {} sub-loop: {}", workflow_id, step_order, step.title);
// Run the isolated step sub-loop
let step_result = run_step_loop(
llm, exec, pool, broadcast_tx,
project_id, workflow_id, workdir, mgr,
instructions, &step, &completed_summaries, &state.scratchpad,
external_tools, event_rx,
).await;
tracing::info!("[workflow {}] Step {} completed: {:?}", workflow_id, step_order, step_result.status);
// Update step status based on result
match &step_result.status {
StepResultStatus::Done => {
if let Some(s) = state.steps.iter_mut().find(|s| s.order == step_order) {
s.status = StepStatus::Done;
s.summary = Some(step_result.summary.clone());
s.artifacts = step_result.artifacts.clone();
}
}
StepResultStatus::Failed { error } => {
if let Some(s) = state.steps.iter_mut().find(|s| s.order == step_order) {
s.status = StepStatus::Failed;
s.summary = Some(step_result.summary.clone());
}
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
workflow_id: workflow_id.to_string(),
steps: plan_infos_from_state(&state),
});
save_state_snapshot(pool, workflow_id, step_order, &state).await;
return Err(anyhow::anyhow!("Step {} failed: {}", step_order, error));
}
StepResultStatus::NeedsApproval { message: _ } => {
// This shouldn't normally happen since wait_for_approval is handled inside
// run_step_loop, but handle gracefully
if let Some(s) = state.steps.iter_mut().find(|s| s.order == step_order) {
s.status = StepStatus::WaitingApproval;
}
save_state_snapshot(pool, workflow_id, step_order, &state).await;
continue;
}
}
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
workflow_id: workflow_id.to_string(),
steps: plan_infos_from_state(&state),
});
save_state_snapshot(pool, workflow_id, step_order, &state).await;
// --- Coordinator review ---
// Check if there are more steps; if not, we're done
if state.first_actionable_step().is_none() {
state.phase = AgentPhase::Completed;
break;
}
// Coordinator LLM reviews the step result and may update the plan
let coordinator_prompt = build_coordinator_prompt(project_id, instructions);
let review_message = format!(
"步骤 {}{}」执行完成。\n\n执行摘要:{}\n\n请审视结果。如需修改后续计划请使用 update_plan否则回复确认继续。",
step_order, step.title, step_result.summary
);
// Build coordinator context with plan overview + scratchpad
let mut coordinator_ctx = String::new();
coordinator_ctx.push_str("## 计划概览\n");
for s in &state.steps {
let marker = match s.status {
StepStatus::Done => " [done]",
StepStatus::Running => " [running]",
StepStatus::WaitingApproval => " [waiting]",
StepStatus::Failed => " [FAILED]",
StepStatus::Pending => "",
};
coordinator_ctx.push_str(&format!("{}. {}{}\n", s.order, s.title, marker));
if let Some(summary) = &s.summary {
coordinator_ctx.push_str(&format!(" 摘要: {}\n", summary));
}
}
if !state.scratchpad.is_empty() {
coordinator_ctx.push_str(&format!("\n## 全局备忘录\n{}\n", state.scratchpad));
}
let coord_messages = vec![
ChatMessage::system(&coordinator_prompt),
ChatMessage::user(&coordinator_ctx),
ChatMessage::user(&review_message),
];
// Add to main chat history for context
state.current_step_chat_history.clear();
tracing::info!("[workflow {}] Coordinator review for step {}", workflow_id, step_order);
let _ = broadcast_tx.send(WsMessage::ActivityUpdate {
workflow_id: workflow_id.to_string(),
activity: format!("步骤 {} 完成 — 协调器审核中...", step_order),
});
let call_start = std::time::Instant::now();
let coord_response = match llm.chat_with_tools(coord_messages.clone(), &coordinator_tools).await {
Ok(r) => r,
Err(e) => {
tracing::warn!("[workflow {}] Coordinator LLM call failed, continuing: {}", workflow_id, e);
continue; // Non-fatal, just skip review
}
};
let latency_ms = call_start.elapsed().as_millis() as i64;
let (prompt_tokens, completion_tokens) = coord_response.usage.as_ref()
.map(|u| (Some(u.prompt_tokens), Some(u.completion_tokens)))
.unwrap_or((None, None));
log_llm_call(pool, broadcast_tx, workflow_id, step_order, "coordinator",
coord_messages.len() as i32, coordinator_tools.len() as i32,
"[]", "", prompt_tokens, completion_tokens, latency_ms).await;
if let Some(choice) = coord_response.choices.into_iter().next() {
if let Some(tool_calls) = &choice.message.tool_calls {
for tc in tool_calls {
let args: serde_json::Value = serde_json::from_str(&tc.function.arguments).unwrap_or_default();
match tc.function.name.as_str() {
"update_plan" => {
let raw_steps = args["steps"].as_array().cloned().unwrap_or_default();
let new_steps: Vec<Step> = raw_steps.iter().enumerate().map(|(i, item)| {
Step {
order: (i + 1) as i32,
title: item["title"].as_str().unwrap_or("").to_string(),
description: item["description"].as_str().unwrap_or("").to_string(),
status: StepStatus::Pending, summary: None,
user_feedbacks: Vec::new(), db_id: String::new(),
artifacts: Vec::new(),
}
}).collect();
state.apply_plan_diff(new_steps);
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
workflow_id: workflow_id.to_string(),
steps: plan_infos_from_state(&state),
});
tracing::info!("[workflow {}] Coordinator revised plan", workflow_id);
save_state_snapshot(pool, workflow_id, step_order, &state).await;
}
"update_scratchpad" => {
let content = args["content"].as_str().unwrap_or("");
let mut new_pad = state.scratchpad.clone();
if !new_pad.is_empty() { new_pad.push('\n'); }
new_pad.push_str(content);
if check_scratchpad_size(&new_pad).is_ok() {
state.scratchpad = new_pad;
}
}
"update_requirement" => {
let new_req = args["requirement"].as_str().unwrap_or("");
let _ = sqlx::query("UPDATE workflows SET requirement = ? WHERE id = ?")
.bind(new_req).bind(workflow_id).execute(pool).await;
let _ = tokio::fs::write(format!("{}/requirement.md", workdir), new_req).await;
let _ = broadcast_tx.send(WsMessage::RequirementUpdate {
workflow_id: workflow_id.to_string(),
requirement: new_req.to_string(),
});
}
_ => {}
}
}
} else {
// Text response — coordinator is satisfied, continue
let content = choice.message.content.as_deref().unwrap_or("");
tracing::info!("[workflow {}] Coordinator: {}", workflow_id, truncate_str(content, 200));
}
}
}
// Final snapshot
save_state_snapshot(pool, workflow_id, state.current_step(), &state).await;
Ok(())
}
/// Ask the LLM to write a report for a finished workflow.
///
/// Each execution log entry is rendered as a markdown section (status, tool
/// name, step number, input preview, output preview). Outputs longer than
/// 2000 bytes are cut and marked as truncated; inputs are cut at 500. The
/// sections are sent with the requirement as the user message, under the
/// `prompts/report.md` system prompt with `{project_id}` substituted.
///
/// # Errors
/// Propagates the error of the LLM chat call.
async fn generate_report(
    llm: &LlmClient,
    requirement: &str,
    entries: &[crate::db::ExecutionLogEntry],
    project_id: &str,
) -> anyhow::Result<String> {
    let mut sections: Vec<String> = Vec::with_capacity(entries.len());
    for entry in entries {
        let output_preview = if entry.output.len() > 2000 {
            format!("{}...(truncated)", truncate_str(&entry.output, 2000))
        } else {
            entry.output.clone()
        };
        let input_preview = truncate_str(&entry.tool_input, 500);
        sections.push(format!(
            "### [{}] {} (step {})\nInput: `{}`\nOutput:\n```\n{}\n```\n",
            entry.status, entry.tool_name, entry.step_order, input_preview, output_preview
        ));
    }
    let steps_detail = sections.join("\n");

    let system_prompt = include_str!("prompts/report.md").replace("{project_id}", project_id);
    let user_msg = format!("需求:\n{}\n\n执行详情:\n{}", requirement, steps_detail);

    let messages = vec![
        ChatMessage::system(&system_prompt),
        ChatMessage::user(&user_msg),
    ];
    let report = llm.chat(messages).await?;
    Ok(report)
}
/// Ask the LLM for a short project title for `requirement`.
///
/// The reply is reduced to its first non-empty line, surrounding whitespace
/// and double quotes are stripped, and the result is capped at 80 bytes
/// (through `truncate_str`) in case the model ignores the length instruction.
/// The returned title may be empty if the model replied with nothing usable.
///
/// # Errors
/// Propagates the error of the LLM chat call.
async fn generate_title(llm: &LlmClient, requirement: &str) -> anyhow::Result<String> {
    let response = llm
        .chat(vec![
            ChatMessage::system("为给定的需求生成一个简短的项目标题最多15个汉字。只回复标题本身不要加任何其他内容。使用中文。"),
            ChatMessage::user(requirement),
        ])
        .await?;

    // Take the first line BEFORE stripping quotes: for a reply such as
    // `"Title"\nexplanation`, stripping first would only remove the leading
    // quote and leave `Title"` behind.
    let first_line = response
        .lines()
        .map(str::trim)
        .find(|line| !line.is_empty())
        .unwrap_or("");
    let mut title = first_line.trim_matches('"').trim().to_string();

    // Hard limit in bytes, not characters.
    if title.len() > 80 {
        title = truncate_str(&title, 80).to_string();
    }
    Ok(title)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::state::{Step, StepStatus};
/// Test fixture: a `Step` with the given order, title, description and status,
/// and every other field empty.
fn make_step(order: i32, title: &str, desc: &str, status: StepStatus) -> Step {
    let title = title.to_string();
    let description = desc.to_string();
    Step {
        order,
        title,
        description,
        status,
        summary: None,
        user_feedbacks: vec![],
        db_id: String::new(),
        artifacts: vec![],
    }
}
// --- build_step_user_message ---
/// A step without history, scratchpad or feedback yields only the
/// current-step section.
#[test]
fn step_msg_basic() {
    let step = make_step(1, "Setup env", "Install dependencies", StepStatus::Running);
    let msg = build_step_user_message(&step, &[], "");

    let present = ["## 当前步骤(步骤 1", "标题Setup env", "描述Install dependencies"];
    for needle in present.iter() {
        assert!(msg.contains(*needle));
    }

    // No completed summaries or scratchpad sections
    let absent = ["已完成步骤摘要", "项目备忘录"];
    for needle in absent.iter() {
        assert!(!msg.contains(*needle));
    }
}
/// Summaries of completed steps are listed under their own heading.
#[test]
fn step_msg_with_completed_summaries() {
    let step = make_step(3, "Deploy", "Push to prod", StepStatus::Running);
    let summaries = vec![
        (1, String::from("Setup"), String::from("Installed deps"), Vec::new()),
        (2, String::from("Build"), String::from("Compiled OK"), Vec::new()),
    ];

    let msg = build_step_user_message(&step, &summaries, "");

    let expected = [
        "## 已完成步骤摘要",
        "步骤 1 (Setup): Installed deps",
        "步骤 2 (Build): Compiled OK",
    ];
    for needle in expected.iter() {
        assert!(msg.contains(*needle));
    }
}
/// The parent scratchpad is included, line by line, under a read-only heading.
#[test]
fn step_msg_with_parent_scratchpad() {
    let scratchpad = "DB_HOST=localhost\nDB_PORT=5432";
    let step = make_step(2, "Build", "compile", StepStatus::Running);

    let msg = build_step_user_message(&step, &[], scratchpad);

    let expected = ["## 项目备忘录(只读)", "DB_HOST=localhost", "DB_PORT=5432"];
    for needle in expected.iter() {
        assert!(msg.contains(*needle));
    }
}
/// User feedback attached to the step is rendered as a bullet list.
#[test]
fn step_msg_with_user_feedback() {
    let mut step = make_step(1, "Setup", "setup env", StepStatus::Running);
    step.user_feedbacks = vec![String::from("Use Python 3.12"), String::from("Skip linting")];

    let msg = build_step_user_message(&step, &[], "");

    let expected = ["用户反馈", "- Use Python 3.12", "- Skip linting"];
    for needle in expected.iter() {
        assert!(msg.contains(*needle));
    }
}
/// With summaries, scratchpad and feedback all supplied, every section and
/// every piece of content shows up in the message.
#[test]
fn step_msg_full_context() {
    let mut step = make_step(3, "API", "build REST API", StepStatus::Running);
    step.user_feedbacks = vec![String::from("add caching")];
    let summaries = vec![
        (1, String::from("DB"), String::from("Schema created"), Vec::new()),
        (2, String::from("Models"), String::from("ORM models done"), Vec::new()),
    ];

    let msg = build_step_user_message(&step, &summaries, "tech_stack=FastAPI");

    // All sections present
    let sections = [
        "## 当前步骤(步骤 3",
        "## 已完成步骤摘要",
        "## 项目备忘录(只读)",
        "用户反馈",
    ];
    for heading in sections.iter() {
        assert!(msg.contains(*heading));
    }

    // Content correct
    let contents = ["build REST API", "Schema created", "tech_stack=FastAPI", "add caching"];
    for text in contents.iter() {
        assert!(msg.contains(*text));
    }
}
// --- truncate_str ---
/// A string shorter than the limit is returned unchanged.
#[test]
fn truncate_short_noop() {
    let input = "hello";
    let out = truncate_str(input, 10);
    assert_eq!(out, "hello");
}
/// A string exactly as long as the limit is returned unchanged.
#[test]
fn truncate_exact() {
    let input = "hello";
    let out = truncate_str(input, 5);
    assert_eq!(out, "hello");
}
/// A string longer than the limit is cut down to the limit.
#[test]
fn truncate_cuts() {
    let input = "hello world";
    let out = truncate_str(input, 5);
    assert_eq!(out, "hello");
}
/// A limit that falls inside a multi-byte character is rounded down to the
/// previous character boundary.
#[test]
fn truncate_respects_char_boundary() {
    // each char is 3 bytes; 7 bytes → should cut to 6 (2 chars)
    let source = "你好世界";
    let cut = truncate_str(source, 7);
    assert_eq!(cut.len(), 6);
    assert_eq!(cut, "你好");
}
// --- tool definitions ---
/// The step tool set offers `step_done` and none of the plan-level tools.
#[test]
fn step_tools_have_step_done() {
    let tools = build_step_tools();
    let has = |wanted: &str| tools.iter().any(|t| t.function.name.as_str() == wanted);

    assert!(has("step_done"), "step_done must be in step tools");
    assert!(!has("advance_step"), "advance_step must NOT be in step tools");
    assert!(!has("update_plan"), "update_plan must NOT be in step tools");
    assert!(!has("update_requirement"), "update_requirement must NOT be in step tools");
}
/// The step tool set contains every execution-level tool.
#[test]
fn step_tools_have_execution_tools() {
    let tools = build_step_tools();
    let required = [
        "execute", "read_file", "write_file", "list_files",
        "start_service", "stop_service", "update_scratchpad",
        "wait_for_approval", "kb_search", "kb_read",
    ];
    for expected in required.iter() {
        let found = tools.iter().any(|t| t.function.name.as_str() == *expected);
        assert!(found, "{} must be in step tools", expected);
    }
}
#[test]
fn coordinator_tools_correct() {
    let tools = build_coordinator_tools();
    // Membership check by tool function name.
    let has = |wanted: &str| tools.iter().any(|tool| tool.function.name.as_str() == wanted);

    // Tools that must be present in the coordinator set.
    assert!(has("update_plan"));
    assert!(has("update_scratchpad"));
    assert!(has("update_requirement"));

    // The coordinator set must NOT contain execution tools.
    assert!(!has("execute"));
    assert!(!has("step_done"));
    assert!(!has("advance_step"));
}
#[test]
fn planning_tools_correct() {
    let tools = build_planning_tools();
    // Gather the function name of every tool in the planning set.
    let mut names: Vec<&str> = Vec::new();
    for tool in tools.iter() {
        names.push(tool.function.name.as_str());
    }

    // Tools that must be present in the planning set.
    assert!(names.contains(&"update_plan"));
    assert!(names.contains(&"list_files"));
    assert!(names.contains(&"read_file"));
    assert!(names.contains(&"kb_search"));
    assert!(names.contains(&"kb_read"));

    // Tools that must be absent from the planning set.
    assert!(!names.contains(&"execute"));
    assert!(!names.contains(&"step_done"));
}
// --- plan_infos_from_state ---
#[test]
fn plan_infos_maps_correctly() {
    // Two-step plan: the first step finished with a summary, the second is running.
    let finished = Step {
        status: StepStatus::Done,
        summary: Some("done".into()),
        ..make_step(1, "A", "desc A", StepStatus::Done)
    };
    let running = make_step(2, "B", "desc B", StepStatus::Running);
    let state = AgentState {
        phase: AgentPhase::Executing { step: 2 },
        steps: vec![finished, running],
        current_step_chat_history: Vec::new(),
        scratchpad: String::new(),
    };

    let infos = plan_infos_from_state(&state);
    assert_eq!(infos.len(), 2);

    let first = &infos[0];
    assert_eq!(first.order, 1);
    // The step title ends up in the `description` field...
    assert_eq!(first.description, "A");
    // ...and the step description ends up in the `command` field.
    assert_eq!(first.command, "desc A");
    assert_eq!(first.status.as_deref(), Some("done"));

    let second = &infos[1];
    assert_eq!(second.status.as_deref(), Some("running"));
}
}