feat: step artifacts framework

- Add Artifact type to Step (name, path, artifact_type, description)
- step_done tool accepts optional artifacts parameter
- Save artifacts to step_artifacts DB table
- Display artifacts in frontend PlanSection (tag style)
- Show artifacts in step context for sub-agents and coordinator
- Add LLM client retry with exponential backoff
This commit is contained in:
Fam Zheng 2026-03-09 12:01:29 +00:00
parent 29f026e383
commit fa800b1601
7 changed files with 273 additions and 47 deletions

View File

@ -12,7 +12,7 @@ use crate::template::{self, LoadedTemplate};
use crate::tools::ExternalToolManager;
use crate::LlmConfig;
use crate::state::{AgentState, AgentPhase, Step, StepStatus, StepResult, StepResultStatus, check_scratchpad_size};
use crate::state::{AgentState, AgentPhase, Artifact, Step, StepStatus, StepResult, StepResultStatus, check_scratchpad_size};
pub struct ServiceInfo {
pub port: u16,
@ -47,6 +47,8 @@ pub struct PlanStepInfo {
pub command: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub artifacts: Vec<Artifact>,
}
pub fn plan_infos_from_state(state: &AgentState) -> Vec<PlanStepInfo> {
@ -63,6 +65,7 @@ pub fn plan_infos_from_state(state: &AgentState) -> Vec<PlanStepInfo> {
description: s.title.clone(),
command: s.description.clone(),
status: Some(status.to_string()),
artifacts: s.artifacts.clone(),
}
}).collect()
}
@ -221,6 +224,15 @@ async fn agent_loop(
} else {
template::select_template(&llm, &requirement, mgr.template_repo.as_ref()).await
};
// Persist template_id to workflow
if let Some(ref tid) = template_id {
let _ = sqlx::query("UPDATE workflows SET template_id = ? WHERE id = ?")
.bind(tid)
.bind(&workflow_id)
.execute(&pool)
.await;
}
let loaded_template = if let Some(ref tid) = template_id {
tracing::info!("Template selected for workflow {}: {}", workflow_id, tid);
let _ = tokio::fs::create_dir_all(&workdir).await;
@ -394,15 +406,35 @@ async fn agent_loop(
.ok()
.flatten();
let state = snapshot
let mut state = snapshot
.and_then(|json| serde_json::from_str::<AgentState>(&json).ok())
.unwrap_or_else(AgentState::new);
// Process feedback: LLM decides whether to revise plan
let state = process_feedback(
&llm, &pool, &broadcast_tx,
&project_id, &workflow_id, state, &content,
).await;
// For failed/done workflows: reset failed steps and continue directly
// For running workflows: process feedback via LLM
let is_resuming = wf.status == "failed" || wf.status == "done";
if is_resuming {
// Reset Failed steps to Pending so they get re-executed
for step in &mut state.steps {
if matches!(step.status, StepStatus::Failed) {
step.status = StepStatus::Pending;
}
}
// Attach comment as feedback to the first actionable step
if let Some(order) = state.first_actionable_step() {
if let Some(step) = state.steps.iter_mut().find(|s| s.order == order) {
step.user_feedbacks.push(content.clone());
}
}
tracing::info!("[workflow {}] Resuming from state, first actionable: {:?}",
workflow_id, state.first_actionable_step());
} else {
// Active workflow: LLM decides whether to revise plan
state = process_feedback(
&llm, &pool, &broadcast_tx,
&project_id, &workflow_id, state, &content,
).await;
}
// If there are actionable steps, resume execution
if state.first_actionable_step().is_some() {
@ -418,7 +450,6 @@ async fn agent_loop(
.await;
// Prepare state for execution: set first pending step to Running
let mut state = state;
if let Some(next) = state.first_actionable_step() {
if let Some(step) = state.steps.iter_mut().find(|s| s.order == next) {
if matches!(step.status, StepStatus::Pending) {
@ -431,12 +462,31 @@ async fn agent_loop(
let instructions = read_instructions(&workdir).await;
// Try to detect which template was used (check for tools/ in workdir parent template)
// For comments, we don't re-load the template — external tools are not available in feedback resume
// Reload external tools from template if available
let ext_tools = if !wf.template_id.is_empty() {
let tid = &wf.template_id;
if template::is_repo_template(tid) {
match template::extract_repo_template(tid, mgr.template_repo.as_ref()).await {
Ok(template_dir) => {
LoadedTemplate::load_from_dir(tid, &template_dir).await
.ok().map(|t| t.external_tools)
}
Err(e) => {
tracing::warn!("Failed to reload template {}: {}", tid, e);
None
}
}
} else {
LoadedTemplate::load(tid).await.ok().map(|t| t.external_tools)
}
} else {
None
};
let result = run_agent_loop(
&llm, &exec, &pool, &broadcast_tx,
&project_id, &workflow_id, &wf.requirement, &workdir, &mgr,
&instructions, Some(state), None, &mut rx,
&instructions, Some(state), ext_tools.as_ref(), &mut rx,
).await;
let final_status = if result.is_ok() { "done" } else { "failed" };
@ -648,10 +698,24 @@ fn build_step_tools() -> Vec<Tool> {
},
"required": ["reason"]
})),
make_tool("step_done", "完成当前步骤。必须提供摘要,概括本步骤做了什么、结果如何", serde_json::json!({
make_tool("step_done", "完成当前步骤。必须提供摘要。可选声明本步骤的产出物", serde_json::json!({
"type": "object",
"properties": {
"summary": { "type": "string", "description": "本步骤的工作摘要" }
"summary": { "type": "string", "description": "本步骤的工作摘要" },
"artifacts": {
"type": "array",
"description": "本步骤的产出物列表",
"items": {
"type": "object",
"properties": {
"name": { "type": "string", "description": "产物名称" },
"path": { "type": "string", "description": "文件路径(相对工作目录)" },
"type": { "type": "string", "enum": ["file", "json", "markdown"], "description": "产物类型" },
"description": { "type": "string", "description": "一句话说明" }
},
"required": ["name", "path", "type"]
}
}
},
"required": ["summary"]
})),
@ -688,7 +752,7 @@ fn build_step_execution_prompt(project_id: &str, instructions: &str) -> String {
}
/// Build user message for a step sub-loop
fn build_step_user_message(step: &Step, completed_summaries: &[(i32, String, String)], parent_scratchpad: &str) -> String {
fn build_step_user_message(step: &Step, completed_summaries: &[(i32, String, String, Vec<Artifact>)], parent_scratchpad: &str) -> String {
let mut ctx = String::new();
ctx.push_str(&format!("## 当前步骤(步骤 {}\n", step.order));
@ -705,8 +769,14 @@ fn build_step_user_message(step: &Step, completed_summaries: &[(i32, String, Str
if !completed_summaries.is_empty() {
ctx.push_str("## 已完成步骤摘要\n");
for (order, title, summary) in completed_summaries {
for (order, title, summary, artifacts) in completed_summaries {
ctx.push_str(&format!("- 步骤 {} ({}): {}\n", order, title, summary));
if !artifacts.is_empty() {
let arts: Vec<String> = artifacts.iter()
.map(|a| format!("{} ({})", a.name, a.artifact_type))
.collect();
ctx.push_str(&format!(" 产物: {}\n", arts.join(", ")));
}
}
ctx.push('\n');
}
@ -1022,6 +1092,7 @@ async fn process_feedback(
summary: None,
user_feedbacks: Vec::new(),
db_id: String::new(),
artifacts: Vec::new(),
}
}).collect();
@ -1071,7 +1142,7 @@ async fn run_step_loop(
mgr: &Arc<AgentManager>,
instructions: &str,
step: &Step,
completed_summaries: &[(i32, String, String)],
completed_summaries: &[(i32, String, String, Vec<Artifact>)],
parent_scratchpad: &str,
external_tools: Option<&ExternalToolManager>,
event_rx: &mut mpsc::Receiver<AgentEvent>,
@ -1121,6 +1192,7 @@ async fn run_step_loop(
tracing::error!("[workflow {}] Step {} LLM call failed: {}", workflow_id, step_order, e);
return StepResult {
status: StepResultStatus::Failed { error: format!("LLM call failed: {}", e) },
artifacts: Vec::new(),
summary: format!("LLM 调用失败: {}", e),
};
}
@ -1137,6 +1209,7 @@ async fn run_step_loop(
return StepResult {
status: StepResultStatus::Failed { error: "No response from LLM".into() },
summary: "LLM 无响应".into(),
artifacts: Vec::new(),
};
}
};
@ -1162,11 +1235,41 @@ async fn run_step_loop(
match tc.function.name.as_str() {
"step_done" => {
let summary = args["summary"].as_str().unwrap_or("").to_string();
let artifacts: Vec<Artifact> = args.get("artifacts")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|a| {
Some(Artifact {
name: a["name"].as_str()?.to_string(),
path: a["path"].as_str()?.to_string(),
artifact_type: a["type"].as_str().unwrap_or("file").to_string(),
description: a["description"].as_str().unwrap_or("").to_string(),
})
}).collect())
.unwrap_or_default();
// Save artifacts to DB
for art in &artifacts {
let art_id = uuid::Uuid::new_v4().to_string();
let _ = sqlx::query(
"INSERT INTO step_artifacts (id, workflow_id, step_order, name, path, artifact_type, description) VALUES (?, ?, ?, ?, ?, ?, ?)"
)
.bind(&art_id)
.bind(workflow_id)
.bind(step_order)
.bind(&art.name)
.bind(&art.path)
.bind(&art.artifact_type)
.bind(&art.description)
.execute(pool)
.await;
}
log_execution(pool, broadcast_tx, workflow_id, step_order, "step_done", &summary, "步骤完成", "done").await;
step_chat_history.push(ChatMessage::tool_result(&tc.id, "步骤已完成。"));
step_done_result = Some(StepResult {
status: StepResultStatus::Done,
summary,
artifacts,
});
}
@ -1222,6 +1325,7 @@ async fn run_step_loop(
return StepResult {
status: StepResultStatus::Failed { error: "Event channel closed".into() },
summary: "事件通道关闭".into(),
artifacts: Vec::new(),
};
}
}
@ -1236,6 +1340,7 @@ async fn run_step_loop(
step_done_result = Some(StepResult {
status: StepResultStatus::Failed { error: format!("用户终止: {}", reason) },
summary: format!("用户终止了执行: {}", reason),
artifacts: Vec::new(),
});
continue;
}
@ -1437,6 +1542,7 @@ async fn run_step_loop(
StepResult {
status: StepResultStatus::Failed { error: "步骤迭代次数超限50轮".into() },
summary: "步骤执行超过50轮迭代限制未能完成".into(),
artifacts: Vec::new(),
}
}
@ -1478,6 +1584,7 @@ async fn plan_infos_from_state_with_override(
description: s.title.clone(),
command: s.description.clone(),
status: Some(status),
artifacts: s.artifacts.clone(),
}
}).collect();
}
@ -1567,6 +1674,7 @@ async fn run_agent_loop(
order, title, description: detail,
status: StepStatus::Pending, summary: None,
user_feedbacks: Vec::new(), db_id: String::new(),
artifacts: Vec::new(),
});
}
@ -1633,9 +1741,9 @@ async fn run_agent_loop(
save_state_snapshot(pool, workflow_id, step_order, &state).await;
// Build completed summaries for context
let completed_summaries: Vec<(i32, String, String)> = state.steps.iter()
let completed_summaries: Vec<(i32, String, String, Vec<Artifact>)> = state.steps.iter()
.filter(|s| matches!(s.status, StepStatus::Done))
.map(|s| (s.order, s.title.clone(), s.summary.clone().unwrap_or_default()))
.map(|s| (s.order, s.title.clone(), s.summary.clone().unwrap_or_default(), s.artifacts.clone()))
.collect();
let step = state.steps.iter().find(|s| s.order == step_order).unwrap().clone();
@ -1658,6 +1766,7 @@ async fn run_agent_loop(
if let Some(s) = state.steps.iter_mut().find(|s| s.order == step_order) {
s.status = StepStatus::Done;
s.summary = Some(step_result.summary.clone());
s.artifacts = step_result.artifacts.clone();
}
}
StepResultStatus::Failed { error } => {
@ -1769,6 +1878,7 @@ async fn run_agent_loop(
description: item["description"].as_str().unwrap_or("").to_string(),
status: StepStatus::Pending, summary: None,
user_feedbacks: Vec::new(), db_id: String::new(),
artifacts: Vec::new(),
}
}).collect();
@ -1889,6 +1999,7 @@ mod tests {
summary: None,
user_feedbacks: Vec::new(),
db_id: String::new(),
artifacts: Vec::new(),
}
}
@ -1911,8 +2022,8 @@ mod tests {
fn step_msg_with_completed_summaries() {
let step = make_step(3, "Deploy", "Push to prod", StepStatus::Running);
let summaries = vec![
(1, "Setup".into(), "Installed deps".into()),
(2, "Build".into(), "Compiled OK".into()),
(1, "Setup".into(), "Installed deps".into(), Vec::new()),
(2, "Build".into(), "Compiled OK".into(), Vec::new()),
];
let msg = build_step_user_message(&step, &summaries, "");
@ -1951,8 +2062,8 @@ mod tests {
..make_step(3, "API", "build REST API", StepStatus::Running)
};
let summaries = vec![
(1, "DB".into(), "Schema created".into()),
(2, "Models".into(), "ORM models done".into()),
(1, "DB".into(), "Schema created".into(), Vec::new()),
(2, "Models".into(), "ORM models done".into(), Vec::new()),
];
let msg = build_step_user_message(&step, &summaries, "tech_stack=FastAPI");

View File

@ -59,6 +59,13 @@ impl Database {
.execute(&self.pool)
.await;
// Migration: add template_id column to workflows
let _ = sqlx::query(
"ALTER TABLE workflows ADD COLUMN template_id TEXT NOT NULL DEFAULT ''"
)
.execute(&self.pool)
.await;
// Migration: add deleted column to projects
let _ = sqlx::query(
"ALTER TABLE projects ADD COLUMN deleted INTEGER NOT NULL DEFAULT 0"
@ -208,6 +215,20 @@ impl Database {
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS step_artifacts (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL,
step_order INTEGER NOT NULL,
name TEXT NOT NULL,
path TEXT NOT NULL,
artifact_type TEXT NOT NULL DEFAULT 'file',
description TEXT NOT NULL DEFAULT ''
)"
)
.execute(&self.pool)
.await?;
Ok(())
}
}
@ -231,6 +252,7 @@ pub struct Workflow {
pub status: String,
pub created_at: String,
pub report: String,
pub template_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]

View File

@ -93,7 +93,11 @@ pub struct ChatChoice {
impl LlmClient {
pub fn new(config: &LlmConfig) -> Self {
Self {
client: reqwest::Client::new(),
client: reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(120))
.connect_timeout(std::time::Duration::from_secs(10))
.build()
.expect("Failed to build HTTP client"),
config: config.clone(),
}
}
@ -106,34 +110,65 @@ impl LlmClient {
.unwrap_or_default())
}
/// Chat with tool definitions — returns full response for tool-calling loop
/// Chat with tool definitions — returns full response for tool-calling loop.
/// Retries up to 3 times with exponential backoff on transient errors.
pub async fn chat_with_tools(&self, messages: Vec<ChatMessage>, tools: &[Tool]) -> anyhow::Result<ChatResponse> {
let url = format!("{}/chat/completions", self.config.base_url);
tracing::debug!("LLM request to {} model={} messages={} tools={}", url, self.config.model, messages.len(), tools.len());
let http_resp = self.client
.post(&url)
.header("Authorization", format!("Bearer {}", self.config.api_key))
.json(&ChatRequest {
model: self.config.model.clone(),
messages,
tools: tools.to_vec(),
})
.send()
.await?;
let max_retries = 3u32;
let mut last_err = None;
let tools_vec = tools.to_vec();
let status = http_resp.status();
if !status.is_success() {
let body = http_resp.text().await.unwrap_or_default();
tracing::error!("LLM API error {}: {}", status, &body[..body.len().min(500)]);
anyhow::bail!("LLM API error {}: {}", status, body);
for attempt in 0..max_retries {
if attempt > 0 {
let delay = std::time::Duration::from_secs(2u64.pow(attempt));
tracing::warn!("LLM retry #{} after {}s", attempt, delay.as_secs());
tokio::time::sleep(delay).await;
}
tracing::debug!("LLM request to {} model={} messages={} tools={} attempt={}", url, self.config.model, messages.len(), tools_vec.len(), attempt + 1);
let result = self.client
.post(&url)
.header("Authorization", format!("Bearer {}", self.config.api_key))
.json(&ChatRequest {
model: self.config.model.clone(),
messages: messages.clone(),
tools: tools_vec.clone(),
})
.send()
.await;
let http_resp = match result {
Ok(r) => r,
Err(e) => {
tracing::warn!("LLM request error (attempt {}): {}", attempt + 1, e);
last_err = Some(anyhow::anyhow!("{}", e));
continue;
}
};
let status = http_resp.status();
if status.is_server_error() || status.as_u16() == 429 {
let body = http_resp.text().await.unwrap_or_default();
tracing::warn!("LLM API error {} (attempt {}): {}", status, attempt + 1, &body[..body.len().min(200)]);
last_err = Some(anyhow::anyhow!("LLM API error {}: {}", status, body));
continue;
}
if !status.is_success() {
let body = http_resp.text().await.unwrap_or_default();
tracing::error!("LLM API error {}: {}", status, &body[..body.len().min(500)]);
anyhow::bail!("LLM API error {}: {}", status, body);
}
let body = http_resp.text().await?;
let resp: ChatResponse = serde_json::from_str(&body).map_err(|e| {
tracing::error!("LLM response parse error: {}. Body: {}", e, &body[..body.len().min(500)]);
anyhow::anyhow!("Failed to parse LLM response: {}", e)
})?;
return Ok(resp);
}
let body = http_resp.text().await?;
let resp: ChatResponse = serde_json::from_str(&body).map_err(|e| {
tracing::error!("LLM response parse error: {}. Body: {}", e, &body[..body.len().min(500)]);
anyhow::anyhow!("Failed to parse LLM response: {}", e)
})?;
Ok(resp)
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("LLM call failed after {} retries", max_retries)))
}
}

View File

@ -20,6 +20,7 @@
- **专注当前步骤**,不做超出范围的事
- 完成后**必须**调用 step_done(summary)summary 应简洁概括本步骤做了什么、结果如何
- 完成步骤时,用 `step_done``artifacts` 参数声明本步骤产出的文件。每个产出物需要 name、path、type (file/json/markdown)
- 需要用户确认时使用 wait_for_approval(reason)
- update_scratchpad 用于记录本步骤内的中间状态,是工作记忆而非日志,只保留当前有用的信息

View File

@ -8,6 +8,7 @@ use crate::llm::ChatMessage;
pub struct StepResult {
pub status: StepResultStatus,
pub summary: String,
pub artifacts: Vec<Artifact>,
}
#[derive(Debug, Clone)]
@ -54,6 +55,15 @@ pub enum StepStatus {
Failed,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Artifact {
pub name: String,
pub path: String,
pub artifact_type: String,
#[serde(default)]
pub description: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Step {
pub order: i32,
@ -68,6 +78,9 @@ pub struct Step {
pub user_feedbacks: Vec<String>,
#[serde(default)]
pub db_id: String,
/// 步骤产出物
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub artifacts: Vec<Artifact>,
}
// --- Core state ---
@ -203,6 +216,12 @@ impl AgentState {
for s in done {
let summary = s.summary.as_deref().unwrap_or("(no summary)");
ctx.push_str(&format!("- 步骤 {}: {}\n", s.order, summary));
if !s.artifacts.is_empty() {
let arts: Vec<String> = s.artifacts.iter()
.map(|a| format!("{} ({})", a.name, a.artifact_type))
.collect();
ctx.push_str(&format!(" 产物: {}\n", arts.join(", ")));
}
}
ctx.push('\n');
}
@ -249,6 +268,7 @@ mod tests {
summary: None,
user_feedbacks: Vec::new(),
db_id: String::new(),
artifacts: Vec::new(),
}
}

View File

@ -59,6 +59,11 @@ function quoteStep(e: Event, step: PlanStepInfo) {
<div v-if="step.command && expandedSteps.has(step.order)" class="step-detail">
{{ step.command }}
</div>
<div v-if="step.artifacts?.length" class="step-artifacts">
<span v-for="a in step.artifacts" :key="a.path" class="artifact-tag">
📄 {{ a.name }} <span class="artifact-type">{{ a.artifact_type }}</span>
</span>
</div>
</div>
<div v-if="!steps.length" class="empty-state">
AI 将在这里展示执行计划
@ -188,6 +193,30 @@ function quoteStep(e: Event, step: PlanStepInfo) {
border-top: 1px solid var(--border);
}
.step-artifacts {
padding: 4px 10px 8px 44px;
display: flex;
flex-wrap: wrap;
gap: 4px;
}
.artifact-tag {
display: inline-flex;
align-items: center;
gap: 4px;
font-size: 11px;
color: var(--text-secondary);
background: var(--bg-tertiary);
padding: 2px 8px;
border-radius: 4px;
}
.artifact-type {
font-size: 10px;
color: var(--accent);
opacity: 0.8;
}
.empty-state {
color: var(--text-secondary);
font-size: 13px;

View File

@ -26,11 +26,19 @@ export interface ExecutionLogEntry {
created_at: string
}
export interface StepArtifact {
name: string
path: string
artifact_type: string
description: string
}
export interface PlanStepInfo {
order: number
description: string
command: string
status?: 'pending' | 'running' | 'done' | 'failed'
artifacts?: StepArtifact[]
}
export interface Comment {