diff --git a/src/agent.rs b/src/agent.rs index 00f93bf..befe43e 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -158,15 +158,20 @@ impl AgentManager { tracing::info!("Workflow {} dispatched to worker '{}'", workflow_id, name); } Err(e) => { - tracing::error!("Failed to dispatch workflow {}: {}", workflow_id, e); - let _ = sqlx::query("UPDATE workflows SET status = 'failed' WHERE id = ?") - .bind(&workflow_id).execute(&self.pool).await; - let _ = btx.send(WsMessage::WorkflowStatusUpdate { - workflow_id, - status: "failed".into(), + let reason = format!("调度失败: {}", e); + tracing::error!("Failed to dispatch workflow {}: {}", workflow_id, reason); + let _ = sqlx::query("UPDATE workflows SET status = 'failed', status_reason = ? WHERE id = ?") + .bind(&reason).bind(&workflow_id).execute(&self.pool).await; + // Log to execution_log so frontend can show the reason + let log_id = uuid::Uuid::new_v4().to_string(); + let _ = sqlx::query( + "INSERT INTO execution_log (id, workflow_id, step_order, tool_name, tool_input, output, status, created_at) VALUES (?, ?, 0, 'system', 'dispatch', ?, 'failed', datetime('now'))" + ).bind(&log_id).bind(&workflow_id).bind(&reason).execute(&self.pool).await; + let _ = btx.send(WsMessage::StepStatusUpdate { + step_id: log_id, status: "failed".into(), output: reason, }); - let _ = btx.send(WsMessage::Error { - message: format!("No worker available: {}", e), + let _ = btx.send(WsMessage::WorkflowStatusUpdate { + workflow_id, status: "failed".into(), }); } } @@ -745,6 +750,7 @@ pub async fn run_step_loop( let _ = update_tx.send(AgentUpdate::WorkflowStatus { workflow_id: workflow_id.to_string(), status: "waiting_user".into(), + reason: String::new(), }).await; send_execution(update_tx, workflow_id, step_order, "ask_user", reason, reason, "waiting").await; @@ -789,6 +795,7 @@ pub async fn run_step_loop( let _ = update_tx.send(AgentUpdate::WorkflowStatus { workflow_id: workflow_id.to_string(), status: "executing".into(), + reason: String::new(), }).await; let tool_msg = if feedback.is_empty() { @@ -1084,6 +1091,7 @@ pub async fn run_agent_loop( let _ = update_tx.send(AgentUpdate::WorkflowStatus { workflow_id: workflow_id.to_string(), status: "waiting_user".into(), + reason: String::new(), }).await; send_execution(update_tx, workflow_id, 0, "plan_approval", "等待确认计划", "等待用户确认执行计划", "waiting").await; @@ -1114,6 +1122,7 @@ pub async fn run_agent_loop( let _ = update_tx.send(AgentUpdate::WorkflowStatus { workflow_id: workflow_id.to_string(), status: "executing".into(), + reason: String::new(), }).await; // Stay in Planning phase, continue the loop continue; @@ -1130,6 +1139,7 @@ pub async fn run_agent_loop( let _ = update_tx.send(AgentUpdate::WorkflowStatus { workflow_id: workflow_id.to_string(), status: "executing".into(), + reason: String::new(), }).await; } diff --git a/src/db.rs b/src/db.rs index 54922e0..efe42b4 100644 --- a/src/db.rs +++ b/src/db.rs @@ -322,6 +322,16 @@ impl Database { } } + // Migration: add status_reason to workflows + let has_status_reason: bool = sqlx::query_scalar::<_, i32>( + "SELECT COUNT(*) FROM pragma_table_info('workflows') WHERE name='status_reason'" + ).fetch_one(&self.pool).await.unwrap_or(0) > 0; + if !has_status_reason { + let _ = sqlx::query( + "ALTER TABLE workflows ADD COLUMN status_reason TEXT NOT NULL DEFAULT ''" + ).execute(&self.pool).await; + } + Ok(()) } } @@ -348,6 +358,8 @@ pub struct Workflow { pub created_at: String, pub report: String, pub template_id: String, + #[serde(default)] + pub status_reason: String, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] diff --git a/src/sink.rs b/src/sink.rs index 2c94634..dec8e53 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -14,12 +14,12 @@ use crate::state::{AgentState, Artifact}; #[serde(tag = "kind")] pub enum AgentUpdate { PlanUpdate { workflow_id: String, steps: Vec }, - WorkflowStatus { workflow_id: String, status: String }, + WorkflowStatus { workflow_id: String, status: String, #[serde(default)] reason: String }, Activity { workflow_id: String, activity: String }, ExecutionLog { workflow_id: String, step_order: i32, tool_name: String, tool_input: String, output: String, status: String }, LlmCallLog { workflow_id: String, step_order: i32, phase: String, messages_count: i32, tools_count: i32, tool_calls: String, text_response: String, prompt_tokens: Option, completion_tokens: Option, latency_ms: i64 }, StateSnapshot { workflow_id: String, step_order: i32, state: AgentState }, - WorkflowComplete { workflow_id: String, status: String }, + WorkflowComplete { workflow_id: String, status: String, #[serde(default)] reason: String }, ArtifactSave { workflow_id: String, step_order: i32, artifact: Artifact }, RequirementUpdate { workflow_id: String, requirement: String }, /// base64-encoded file content @@ -61,9 +61,9 @@ pub async fn handle_single_update( AgentUpdate::PlanUpdate { workflow_id, steps } => { bcast(broadcast_tx, WsMessage::PlanUpdate { workflow_id: workflow_id.clone(), steps: steps.clone() }); } - AgentUpdate::WorkflowStatus { workflow_id, status } => { - let _ = sqlx::query("UPDATE workflows SET status = ? WHERE id = ?") - .bind(status).bind(workflow_id).execute(pool).await; + AgentUpdate::WorkflowStatus { workflow_id, status, reason } => { + let _ = sqlx::query("UPDATE workflows SET status = ?, status_reason = ? WHERE id = ?") + .bind(status).bind(reason).bind(workflow_id).execute(pool).await; bcast(broadcast_tx, WsMessage::WorkflowStatusUpdate { workflow_id: workflow_id.clone(), status: status.clone() }); } AgentUpdate::Activity { workflow_id, activity } => { @@ -100,9 +100,9 @@ pub async fn handle_single_update( "INSERT INTO agent_state_snapshots (id, workflow_id, step_order, state_json, created_at) VALUES (?, ?, ?, ?, datetime('now'))" ).bind(&id).bind(workflow_id).bind(step_order).bind(&json).execute(pool).await; } - AgentUpdate::WorkflowComplete { workflow_id, status } => { - let _ = sqlx::query("UPDATE workflows SET status = ? WHERE id = ?") - .bind(status).bind(workflow_id).execute(pool).await; + AgentUpdate::WorkflowComplete { workflow_id, status, reason } => { + let _ = sqlx::query("UPDATE workflows SET status = ?, status_reason = ? WHERE id = ?") + .bind(status).bind(reason).bind(workflow_id).execute(pool).await; bcast(broadcast_tx, WsMessage::WorkflowStatusUpdate { workflow_id: workflow_id.clone(), status: status.clone() }); } AgentUpdate::ArtifactSave { workflow_id, step_order, artifact } => { diff --git a/src/worker.rs b/src/worker.rs index 0766246..878cb50 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -163,4 +163,13 @@ impl WorkerManager { pub async fn complete_workflow(&self, workflow_id: &str) { self.assignments.write().await.remove(workflow_id); } + + /// List all workflows assigned to a worker. + pub async fn assignments_for_worker(&self, worker_name: &str) -> Vec { + self.assignments.read().await + .iter() + .filter(|(_, w)| w.as_str() == worker_name) + .map(|(wf_id, _)| wf_id.clone()) + .collect() + } } diff --git a/src/worker_runner.rs b/src/worker_runner.rs index d4adc93..03cbd7d 100644 --- a/src/worker_runner.rs +++ b/src/worker_runner.rs @@ -194,9 +194,11 @@ async fn connect_and_run(server_url: &str, worker_name: &str, llm_config: &crate // Sync all workspace files to server sync_workspace(&update_tx, &project_id, &workdir).await; + let reason = if let Err(ref e) = result { format!("{}", e) } else { String::new() }; let _ = update_tx.send(AgentUpdate::WorkflowComplete { workflow_id: workflow_id.clone(), status: final_status.into(), + reason, }).await; *comment_tx.lock().await = None; diff --git a/src/ws_worker.rs b/src/ws_worker.rs index 0901fb8..a709ab7 100644 --- a/src/ws_worker.rs +++ b/src/ws_worker.rs @@ -111,6 +111,23 @@ async fn handle_worker_socket(socket: WebSocket, state: Arc) { _ = recv_task => {}, } + // Log reason for any orphaned workflows before cleanup + let orphan_workflows: Vec = { + let assignments = mgr_for_cleanup.assignments_for_worker(&name_clone).await; + assignments + }; + if !orphan_workflows.is_empty() { + let reason = format!("Worker '{}' 断开连接", name_clone); + for wf_id in &orphan_workflows { + let _ = sqlx::query("UPDATE workflows SET status = 'failed', status_reason = ? WHERE id = ? AND status IN ('executing', 'planning')") + .bind(&reason).bind(wf_id).execute(&state.pool).await; + let log_id = uuid::Uuid::new_v4().to_string(); + let _ = sqlx::query( + "INSERT INTO execution_log (id, workflow_id, step_order, tool_name, tool_input, output, status, created_at) VALUES (?, ?, 0, 'system', 'worker_disconnect', ?, 'failed', datetime('now'))" + ).bind(&log_id).bind(wf_id).bind(&reason).execute(&state.pool).await; + tracing::warn!("Workflow {} orphaned: {}", wf_id, reason); + } + } mgr_for_cleanup.unregister(&name_clone).await; }