From fa800b160141954e56abd8fc6fa6a7542a87e578 Mon Sep 17 00:00:00 2001 From: Fam Zheng Date: Mon, 9 Mar 2026 12:01:29 +0000 Subject: [PATCH] feat: step artifacts framework - Add Artifact type to Step (name, path, artifact_type, description) - step_done tool accepts optional artifacts parameter - Save artifacts to step_artifacts DB table - Display artifacts in frontend PlanSection (tag style) - Show artifacts in step context for sub-agents and coordinator - Add LLM client retry with exponential backoff --- src/agent.rs | 155 +++++++++++++++++++++++++---- src/db.rs | 22 ++++ src/llm.rs | 85 +++++++++++----- src/prompts/step_execution.md | 1 + src/state.rs | 20 ++++ web/src/components/PlanSection.vue | 29 ++++++ web/src/types.ts | 8 ++ 7 files changed, 273 insertions(+), 47 deletions(-) diff --git a/src/agent.rs b/src/agent.rs index 98688c7..ab40c78 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -12,7 +12,7 @@ use crate::template::{self, LoadedTemplate}; use crate::tools::ExternalToolManager; use crate::LlmConfig; -use crate::state::{AgentState, AgentPhase, Step, StepStatus, StepResult, StepResultStatus, check_scratchpad_size}; +use crate::state::{AgentState, AgentPhase, Artifact, Step, StepStatus, StepResult, StepResultStatus, check_scratchpad_size}; pub struct ServiceInfo { pub port: u16, @@ -47,6 +47,8 @@ pub struct PlanStepInfo { pub command: String, #[serde(skip_serializing_if = "Option::is_none")] pub status: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub artifacts: Vec, } pub fn plan_infos_from_state(state: &AgentState) -> Vec { @@ -63,6 +65,7 @@ pub fn plan_infos_from_state(state: &AgentState) -> Vec { description: s.title.clone(), command: s.description.clone(), status: Some(status.to_string()), + artifacts: s.artifacts.clone(), } }).collect() } @@ -221,6 +224,15 @@ async fn agent_loop( } else { template::select_template(&llm, &requirement, mgr.template_repo.as_ref()).await }; + // Persist template_id to workflow + if let Some(ref tid) = template_id { + let _ = sqlx::query("UPDATE workflows SET template_id = ? WHERE id = ?") + .bind(tid) + .bind(&workflow_id) + .execute(&pool) + .await; + } + let loaded_template = if let Some(ref tid) = template_id { tracing::info!("Template selected for workflow {}: {}", workflow_id, tid); let _ = tokio::fs::create_dir_all(&workdir).await; @@ -394,15 +406,35 @@ async fn agent_loop( .ok() .flatten(); - let state = snapshot + let mut state = snapshot .and_then(|json| serde_json::from_str::(&json).ok()) .unwrap_or_else(AgentState::new); - // Process feedback: LLM decides whether to revise plan - let state = process_feedback( - &llm, &pool, &broadcast_tx, - &project_id, &workflow_id, state, &content, - ).await; + // For failed/done workflows: reset failed steps and continue directly + // For running workflows: process feedback via LLM + let is_resuming = wf.status == "failed" || wf.status == "done"; + if is_resuming { + // Reset Failed steps to Pending so they get re-executed + for step in &mut state.steps { + if matches!(step.status, StepStatus::Failed) { + step.status = StepStatus::Pending; + } + } + // Attach comment as feedback to the first actionable step + if let Some(order) = state.first_actionable_step() { + if let Some(step) = state.steps.iter_mut().find(|s| s.order == order) { + step.user_feedbacks.push(content.clone()); + } + } + tracing::info!("[workflow {}] Resuming from state, first actionable: {:?}", + workflow_id, state.first_actionable_step()); + } else { + // Active workflow: LLM decides whether to revise plan + state = process_feedback( + &llm, &pool, &broadcast_tx, + &project_id, &workflow_id, state, &content, + ).await; + } // If there are actionable steps, resume execution if state.first_actionable_step().is_some() { @@ -418,7 +450,6 @@ async fn agent_loop( .await; // Prepare state for execution: set first pending step to Running - let mut state = state; if let Some(next) = state.first_actionable_step() { if let Some(step) = state.steps.iter_mut().find(|s| s.order == next) { if matches!(step.status, StepStatus::Pending) { @@ -431,12 +462,31 @@ async fn agent_loop( let instructions = read_instructions(&workdir).await; - // Try to detect which template was used (check for tools/ in workdir parent template) - // For comments, we don't re-load the template — external tools are not available in feedback resume + // Reload external tools from template if available + let ext_tools = if !wf.template_id.is_empty() { + let tid = &wf.template_id; + if template::is_repo_template(tid) { + match template::extract_repo_template(tid, mgr.template_repo.as_ref()).await { + Ok(template_dir) => { + LoadedTemplate::load_from_dir(tid, &template_dir).await + .ok().map(|t| t.external_tools) + } + Err(e) => { + tracing::warn!("Failed to reload template {}: {}", tid, e); + None + } + } + } else { + LoadedTemplate::load(tid).await.ok().map(|t| t.external_tools) + } + } else { + None + }; + let result = run_agent_loop( &llm, &exec, &pool, &broadcast_tx, &project_id, &workflow_id, &wf.requirement, &workdir, &mgr, - &instructions, Some(state), None, &mut rx, + &instructions, Some(state), ext_tools.as_ref(), &mut rx, ).await; let final_status = if result.is_ok() { "done" } else { "failed" }; @@ -648,10 +698,24 @@ fn build_step_tools() -> Vec { }, "required": ["reason"] })), - make_tool("step_done", "完成当前步骤。必须提供摘要,概括本步骤做了什么、结果如何。", serde_json::json!({ + make_tool("step_done", "完成当前步骤。必须提供摘要。可选声明本步骤的产出物。", serde_json::json!({ "type": "object", "properties": { - "summary": { "type": "string", "description": "本步骤的工作摘要" } + "summary": { "type": "string", "description": "本步骤的工作摘要" }, + "artifacts": { + "type": "array", + "description": "本步骤的产出物列表", + "items": { + "type": "object", + "properties": { + "name": { "type": "string", "description": "产物名称" }, + "path": { "type": "string", "description": "文件路径(相对工作目录)" }, + "type": { "type": "string", "enum": ["file", "json", "markdown"], "description": "产物类型" }, + "description": { "type": "string", "description": "一句话说明" } + }, + "required": ["name", "path", "type"] + } + } }, "required": ["summary"] })), @@ -688,7 +752,7 @@ fn build_step_execution_prompt(project_id: &str, instructions: &str) -> String { } /// Build user message for a step sub-loop -fn build_step_user_message(step: &Step, completed_summaries: &[(i32, String, String)], parent_scratchpad: &str) -> String { +fn build_step_user_message(step: &Step, completed_summaries: &[(i32, String, String, Vec)], parent_scratchpad: &str) -> String { let mut ctx = String::new(); ctx.push_str(&format!("## 当前步骤(步骤 {})\n", step.order)); @@ -705,8 +769,14 @@ fn build_step_user_message(step: &Step, completed_summaries: &[(i32, String, Str if !completed_summaries.is_empty() { ctx.push_str("## 已完成步骤摘要\n"); - for (order, title, summary) in completed_summaries { + for (order, title, summary, artifacts) in completed_summaries { ctx.push_str(&format!("- 步骤 {} ({}): {}\n", order, title, summary)); + if !artifacts.is_empty() { + let arts: Vec = artifacts.iter() + .map(|a| format!("{} ({})", a.name, a.artifact_type)) + .collect(); + ctx.push_str(&format!(" 产物: {}\n", arts.join(", "))); + } } ctx.push('\n'); } @@ -1022,6 +1092,7 @@ async fn process_feedback( summary: None, user_feedbacks: Vec::new(), db_id: String::new(), + artifacts: Vec::new(), } }).collect(); @@ -1071,7 +1142,7 @@ async fn run_step_loop( mgr: &Arc, instructions: &str, step: &Step, - completed_summaries: &[(i32, String, String)], + completed_summaries: &[(i32, String, String, Vec)], parent_scratchpad: &str, external_tools: Option<&ExternalToolManager>, event_rx: &mut mpsc::Receiver, @@ -1121,6 +1192,7 @@ async fn run_step_loop( tracing::error!("[workflow {}] Step {} LLM call failed: {}", workflow_id, step_order, e); return StepResult { status: StepResultStatus::Failed { error: format!("LLM call failed: {}", e) }, + artifacts: Vec::new(), summary: format!("LLM 调用失败: {}", e), }; } @@ -1137,6 +1209,7 @@ async fn run_step_loop( return StepResult { status: StepResultStatus::Failed { error: "No response from LLM".into() }, summary: "LLM 无响应".into(), + artifacts: Vec::new(), }; } }; @@ -1162,11 +1235,41 @@ async fn run_step_loop( match tc.function.name.as_str() { "step_done" => { let summary = args["summary"].as_str().unwrap_or("").to_string(); + let artifacts: Vec = args.get("artifacts") + .and_then(|v| v.as_array()) + .map(|arr| arr.iter().filter_map(|a| { + Some(Artifact { + name: a["name"].as_str()?.to_string(), + path: a["path"].as_str()?.to_string(), + artifact_type: a["type"].as_str().unwrap_or("file").to_string(), + description: a["description"].as_str().unwrap_or("").to_string(), + }) + }).collect()) + .unwrap_or_default(); + + // Save artifacts to DB + for art in &artifacts { + let art_id = uuid::Uuid::new_v4().to_string(); + let _ = sqlx::query( + "INSERT INTO step_artifacts (id, workflow_id, step_order, name, path, artifact_type, description) VALUES (?, ?, ?, ?, ?, ?, ?)" + ) + .bind(&art_id) + .bind(workflow_id) + .bind(step_order) + .bind(&art.name) + .bind(&art.path) + .bind(&art.artifact_type) + .bind(&art.description) + .execute(pool) + .await; + } + log_execution(pool, broadcast_tx, workflow_id, step_order, "step_done", &summary, "步骤完成", "done").await; step_chat_history.push(ChatMessage::tool_result(&tc.id, "步骤已完成。")); step_done_result = Some(StepResult { status: StepResultStatus::Done, summary, + artifacts, }); } @@ -1222,6 +1325,7 @@ async fn run_step_loop( return StepResult { status: StepResultStatus::Failed { error: "Event channel closed".into() }, summary: "事件通道关闭".into(), + artifacts: Vec::new(), }; } } @@ -1236,6 +1340,7 @@ async fn run_step_loop( step_done_result = Some(StepResult { status: StepResultStatus::Failed { error: format!("用户终止: {}", reason) }, summary: format!("用户终止了执行: {}", reason), + artifacts: Vec::new(), }); continue; } @@ -1437,6 +1542,7 @@ async fn run_step_loop( StepResult { status: StepResultStatus::Failed { error: "步骤迭代次数超限(50轮)".into() }, summary: "步骤执行超过50轮迭代限制,未能完成".into(), + artifacts: Vec::new(), } } @@ -1478,6 +1584,7 @@ async fn plan_infos_from_state_with_override( description: s.title.clone(), command: s.description.clone(), status: Some(status), + artifacts: s.artifacts.clone(), } }).collect(); } @@ -1567,6 +1674,7 @@ async fn run_agent_loop( order, title, description: detail, status: StepStatus::Pending, summary: None, user_feedbacks: Vec::new(), db_id: String::new(), + artifacts: Vec::new(), }); } @@ -1633,9 +1741,9 @@ async fn run_agent_loop( save_state_snapshot(pool, workflow_id, step_order, &state).await; // Build completed summaries for context - let completed_summaries: Vec<(i32, String, String)> = state.steps.iter() + let completed_summaries: Vec<(i32, String, String, Vec)> = state.steps.iter() .filter(|s| matches!(s.status, StepStatus::Done)) - .map(|s| (s.order, s.title.clone(), s.summary.clone().unwrap_or_default())) + .map(|s| (s.order, s.title.clone(), s.summary.clone().unwrap_or_default(), s.artifacts.clone())) .collect(); let step = state.steps.iter().find(|s| s.order == step_order).unwrap().clone(); @@ -1658,6 +1766,7 @@ async fn run_agent_loop( if let Some(s) = state.steps.iter_mut().find(|s| s.order == step_order) { s.status = StepStatus::Done; s.summary = Some(step_result.summary.clone()); + s.artifacts = step_result.artifacts.clone(); } } StepResultStatus::Failed { error } => { @@ -1769,6 +1878,7 @@ async fn run_agent_loop( description: item["description"].as_str().unwrap_or("").to_string(), status: StepStatus::Pending, summary: None, user_feedbacks: Vec::new(), db_id: String::new(), + artifacts: Vec::new(), } }).collect(); @@ -1889,6 +1999,7 @@ mod tests { summary: None, user_feedbacks: Vec::new(), db_id: String::new(), + artifacts: Vec::new(), } } @@ -1911,8 +2022,8 @@ mod tests { fn step_msg_with_completed_summaries() { let step = make_step(3, "Deploy", "Push to prod", StepStatus::Running); let summaries = vec![ - (1, "Setup".into(), "Installed deps".into()), - (2, "Build".into(), "Compiled OK".into()), + (1, "Setup".into(), "Installed deps".into(), Vec::new()), + (2, "Build".into(), "Compiled OK".into(), Vec::new()), ]; let msg = build_step_user_message(&step, &summaries, ""); @@ -1951,8 +2062,8 @@ mod tests { ..make_step(3, "API", "build REST API", StepStatus::Running) }; let summaries = vec![ - (1, "DB".into(), "Schema created".into()), - (2, "Models".into(), "ORM models done".into()), + (1, "DB".into(), "Schema created".into(), Vec::new()), + (2, "Models".into(), "ORM models done".into(), Vec::new()), ]; let msg = build_step_user_message(&step, &summaries, "tech_stack=FastAPI"); diff --git a/src/db.rs b/src/db.rs index d1f3180..657b884 100644 --- a/src/db.rs +++ b/src/db.rs @@ -59,6 +59,13 @@ impl Database { .execute(&self.pool) .await; + // Migration: add template_id column to workflows + let _ = sqlx::query( + "ALTER TABLE workflows ADD COLUMN template_id TEXT NOT NULL DEFAULT ''" + ) + .execute(&self.pool) + .await; + // Migration: add deleted column to projects let _ = sqlx::query( "ALTER TABLE projects ADD COLUMN deleted INTEGER NOT NULL DEFAULT 0" @@ -208,6 +215,20 @@ impl Database { .execute(&self.pool) .await?; + sqlx::query( + "CREATE TABLE IF NOT EXISTS step_artifacts ( + id TEXT PRIMARY KEY, + workflow_id TEXT NOT NULL, + step_order INTEGER NOT NULL, + name TEXT NOT NULL, + path TEXT NOT NULL, + artifact_type TEXT NOT NULL DEFAULT 'file', + description TEXT NOT NULL DEFAULT '' + )" + ) + .execute(&self.pool) + .await?; + Ok(()) } } @@ -231,6 +252,7 @@ pub struct Workflow { pub status: String, pub created_at: String, pub report: String, + pub template_id: String, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] diff --git a/src/llm.rs b/src/llm.rs index a52094f..8b8e49d 100644 --- a/src/llm.rs +++ b/src/llm.rs @@ -93,7 +93,11 @@ pub struct ChatChoice { impl LlmClient { pub fn new(config: &LlmConfig) -> Self { Self { - client: reqwest::Client::new(), + client: reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(120)) + .connect_timeout(std::time::Duration::from_secs(10)) + .build() + .expect("Failed to build HTTP client"), config: config.clone(), } } @@ -106,34 +110,65 @@ impl LlmClient { .unwrap_or_default()) } - /// Chat with tool definitions — returns full response for tool-calling loop + /// Chat with tool definitions — returns full response for tool-calling loop. + /// Retries up to 3 times with exponential backoff on transient errors. pub async fn chat_with_tools(&self, messages: Vec, tools: &[Tool]) -> anyhow::Result { let url = format!("{}/chat/completions", self.config.base_url); - tracing::debug!("LLM request to {} model={} messages={} tools={}", url, self.config.model, messages.len(), tools.len()); - let http_resp = self.client - .post(&url) - .header("Authorization", format!("Bearer {}", self.config.api_key)) - .json(&ChatRequest { - model: self.config.model.clone(), - messages, - tools: tools.to_vec(), - }) - .send() - .await?; + let max_retries = 3u32; + let mut last_err = None; + let tools_vec = tools.to_vec(); - let status = http_resp.status(); - if !status.is_success() { - let body = http_resp.text().await.unwrap_or_default(); - tracing::error!("LLM API error {}: {}", status, &body[..body.len().min(500)]); - anyhow::bail!("LLM API error {}: {}", status, body); + for attempt in 0..max_retries { + if attempt > 0 { + let delay = std::time::Duration::from_secs(2u64.pow(attempt)); + tracing::warn!("LLM retry #{} after {}s", attempt, delay.as_secs()); + tokio::time::sleep(delay).await; + } + + tracing::debug!("LLM request to {} model={} messages={} tools={} attempt={}", url, self.config.model, messages.len(), tools_vec.len(), attempt + 1); + let result = self.client + .post(&url) + .header("Authorization", format!("Bearer {}", self.config.api_key)) + .json(&ChatRequest { + model: self.config.model.clone(), + messages: messages.clone(), + tools: tools_vec.clone(), + }) + .send() + .await; + + let http_resp = match result { + Ok(r) => r, + Err(e) => { + tracing::warn!("LLM request error (attempt {}): {}", attempt + 1, e); + last_err = Some(anyhow::anyhow!("{}", e)); + continue; + } + }; + + let status = http_resp.status(); + if status.is_server_error() || status.as_u16() == 429 { + let body = http_resp.text().await.unwrap_or_default(); + tracing::warn!("LLM API error {} (attempt {}): {}", status, attempt + 1, &body[..body.len().min(200)]); + last_err = Some(anyhow::anyhow!("LLM API error {}: {}", status, body)); + continue; + } + + if !status.is_success() { + let body = http_resp.text().await.unwrap_or_default(); + tracing::error!("LLM API error {}: {}", status, &body[..body.len().min(500)]); + anyhow::bail!("LLM API error {}: {}", status, body); + } + + let body = http_resp.text().await?; + let resp: ChatResponse = serde_json::from_str(&body).map_err(|e| { + tracing::error!("LLM response parse error: {}. Body: {}", e, &body[..body.len().min(500)]); + anyhow::anyhow!("Failed to parse LLM response: {}", e) + })?; + + return Ok(resp); } - let body = http_resp.text().await?; - let resp: ChatResponse = serde_json::from_str(&body).map_err(|e| { - tracing::error!("LLM response parse error: {}. Body: {}", e, &body[..body.len().min(500)]); - anyhow::anyhow!("Failed to parse LLM response: {}", e) - })?; - - Ok(resp) + Err(last_err.unwrap_or_else(|| anyhow::anyhow!("LLM call failed after {} retries", max_retries))) } } diff --git a/src/prompts/step_execution.md b/src/prompts/step_execution.md index 4303ffc..2ca6537 100644 --- a/src/prompts/step_execution.md +++ b/src/prompts/step_execution.md @@ -20,6 +20,7 @@ - **专注当前步骤**,不做超出范围的事 - 完成后**必须**调用 step_done(summary),summary 应简洁概括本步骤做了什么、结果如何 +- 完成步骤时,用 `step_done` 的 `artifacts` 参数声明本步骤产出的文件。每个产出物需要 name、path、type (file/json/markdown) - 需要用户确认时使用 wait_for_approval(reason) - update_scratchpad 用于记录本步骤内的中间状态,是工作记忆而非日志,只保留当前有用的信息 diff --git a/src/state.rs b/src/state.rs index b2365b8..e589599 100644 --- a/src/state.rs +++ b/src/state.rs @@ -8,6 +8,7 @@ use crate::llm::ChatMessage; pub struct StepResult { pub status: StepResultStatus, pub summary: String, + pub artifacts: Vec, } #[derive(Debug, Clone)] @@ -54,6 +55,15 @@ pub enum StepStatus { Failed, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Artifact { + pub name: String, + pub path: String, + pub artifact_type: String, + #[serde(default)] + pub description: String, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Step { pub order: i32, @@ -68,6 +78,9 @@ pub struct Step { pub user_feedbacks: Vec, #[serde(default)] pub db_id: String, + /// 步骤产出物 + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub artifacts: Vec, } // --- Core state --- @@ -203,6 +216,12 @@ impl AgentState { for s in done { let summary = s.summary.as_deref().unwrap_or("(no summary)"); ctx.push_str(&format!("- 步骤 {}: {}\n", s.order, summary)); + if !s.artifacts.is_empty() { + let arts: Vec = s.artifacts.iter() + .map(|a| format!("{} ({})", a.name, a.artifact_type)) + .collect(); + ctx.push_str(&format!(" 产物: {}\n", arts.join(", "))); + } } ctx.push('\n'); } @@ -249,6 +268,7 @@ mod tests { summary: None, user_feedbacks: Vec::new(), db_id: String::new(), + artifacts: Vec::new(), } } diff --git a/web/src/components/PlanSection.vue b/web/src/components/PlanSection.vue index d860dbb..278bf78 100644 --- a/web/src/components/PlanSection.vue +++ b/web/src/components/PlanSection.vue @@ -59,6 +59,11 @@ function quoteStep(e: Event, step: PlanStepInfo) {
{{ step.command }}
+
+ + 📄 {{ a.name }} {{ a.artifact_type }} + +
AI 将在这里展示执行计划 @@ -188,6 +193,30 @@ function quoteStep(e: Event, step: PlanStepInfo) { border-top: 1px solid var(--border); } +.step-artifacts { + padding: 4px 10px 8px 44px; + display: flex; + flex-wrap: wrap; + gap: 4px; +} + +.artifact-tag { + display: inline-flex; + align-items: center; + gap: 4px; + font-size: 11px; + color: var(--text-secondary); + background: var(--bg-tertiary); + padding: 2px 8px; + border-radius: 4px; +} + +.artifact-type { + font-size: 10px; + color: var(--accent); + opacity: 0.8; +} + .empty-state { color: var(--text-secondary); font-size: 13px; diff --git a/web/src/types.ts b/web/src/types.ts index d2ba5e2..7d1f8d4 100644 --- a/web/src/types.ts +++ b/web/src/types.ts @@ -26,11 +26,19 @@ export interface ExecutionLogEntry { created_at: string } +export interface StepArtifact { + name: string + path: string + artifact_type: string + description: string +} + export interface PlanStepInfo { order: number description: string command: string status?: 'pending' | 'running' | 'done' | 'failed' + artifacts?: StepArtifact[] } export interface Comment {