Compare commits

...

10 Commits

Author SHA1 Message Date
e142ec2dd4 fix: remove duplicate listWorkers in api.ts 2026-04-07 10:00:49 +01:00
49a13d8f50 feat: optional worker selection when creating workflow (API + frontend dropdown) 2026-04-07 09:59:49 +01:00
2cb9d9321e fix: comment on completed workflow resets to Planning phase with feedback 2026-04-06 22:56:35 +01:00
f70328be0b feat: inject user comments into agent conversation in real-time (try_recv between LLM calls) 2026-04-06 22:01:28 +01:00
409bc89284 fix: robust worker interaction
- Text heartbeat every 30s (not ping frames) to survive Cloudflare/proxy
- Comment on completed/failed workflow re-dispatches to worker with state resume
- Dispatch retries 3 times with 5s delay (survives reconnect window)
2026-04-06 21:50:56 +01:00
c6c03224b1 fix: shared ws_tx across reconnects, spawned workflow task, Arc<ServiceManager> 2026-04-06 21:11:27 +01:00
2066a727b0 fix: shared ws_tx across reconnects, workflow runs in spawned task 2026-04-06 21:08:53 +01:00
63b577ee46 fix: worker creates venv before execution, prompt enforces uv pip install 2026-04-06 20:38:20 +01:00
c56bfd9377 feat: status_reason field for workflows + proper failure logging
- Add status_reason column to workflows table (migration)
- AgentUpdate::WorkflowStatus and WorkflowComplete carry reason
- Dispatch failure logs to execution_log with reason
- Worker disconnect marks orphaned workflows as failed with reason
- All status transitions now have traceable cause
2026-04-06 20:33:41 +01:00
76b964998b feat: real-time file sync on write_file (upload immediately, not just at end) 2026-04-06 20:24:37 +01:00
14 changed files with 350 additions and 162 deletions

View File

@@ -20,7 +20,7 @@ pub struct ServiceInfo {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum AgentEvent { pub enum AgentEvent {
NewRequirement { workflow_id: String, requirement: String, template_id: Option<String> }, NewRequirement { workflow_id: String, requirement: String, template_id: Option<String>, #[serde(default)] worker: Option<String> },
Comment { workflow_id: String, content: String }, Comment { workflow_id: String, content: String },
} }
@@ -113,7 +113,7 @@ impl AgentManager {
/// Dispatch an event to a worker. /// Dispatch an event to a worker.
pub async fn send_event(self: &Arc<Self>, project_id: &str, event: AgentEvent) { pub async fn send_event(self: &Arc<Self>, project_id: &str, event: AgentEvent) {
match event { match event {
AgentEvent::NewRequirement { workflow_id, requirement, template_id } => { AgentEvent::NewRequirement { workflow_id, requirement, template_id, worker } => {
// Generate title (heuristic) // Generate title (heuristic)
let title = generate_title_heuristic(&requirement); let title = generate_title_heuristic(&requirement);
let _ = sqlx::query("UPDATE projects SET name = ? WHERE id = ?") let _ = sqlx::query("UPDATE projects SET name = ? WHERE id = ?")
@@ -153,27 +153,114 @@ impl AgentManager {
require_plan_approval: false, require_plan_approval: false,
}; };
match self.worker_mgr.assign_workflow(assign).await { // Retry dispatch up to 3 times (worker might be reconnecting)
let mut dispatch_result = self.worker_mgr.assign_workflow(assign.clone(), worker.as_deref()).await;
for attempt in 1..3 {
if dispatch_result.is_ok() { break; }
tracing::warn!("Dispatch attempt {} failed, retrying in 5s...", attempt);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
dispatch_result = self.worker_mgr.assign_workflow(assign.clone(), worker.as_deref()).await;
}
match dispatch_result {
Ok(name) => { Ok(name) => {
tracing::info!("Workflow {} dispatched to worker '{}'", workflow_id, name); tracing::info!("Workflow {} dispatched to worker '{}'", workflow_id, name);
} }
Err(e) => { Err(e) => {
tracing::error!("Failed to dispatch workflow {}: {}", workflow_id, e); let reason = format!("调度失败: {}", e);
let _ = sqlx::query("UPDATE workflows SET status = 'failed' WHERE id = ?") tracing::error!("Failed to dispatch workflow {}: {}", workflow_id, reason);
.bind(&workflow_id).execute(&self.pool).await; let _ = sqlx::query("UPDATE workflows SET status = 'failed', status_reason = ? WHERE id = ?")
let _ = btx.send(WsMessage::WorkflowStatusUpdate { .bind(&reason).bind(&workflow_id).execute(&self.pool).await;
workflow_id, // Log to execution_log so frontend can show the reason
status: "failed".into(), let log_id = uuid::Uuid::new_v4().to_string();
let _ = sqlx::query(
"INSERT INTO execution_log (id, workflow_id, step_order, tool_name, tool_input, output, status, created_at) VALUES (?, ?, 0, 'system', 'dispatch', ?, 'failed', datetime('now'))"
).bind(&log_id).bind(&workflow_id).bind(&reason).execute(&self.pool).await;
let _ = btx.send(WsMessage::StepStatusUpdate {
step_id: log_id, status: "failed".into(), output: reason,
}); });
let _ = btx.send(WsMessage::Error { let _ = btx.send(WsMessage::WorkflowStatusUpdate {
message: format!("No worker available: {}", e), workflow_id, status: "failed".into(),
}); });
} }
} }
} }
AgentEvent::Comment { workflow_id, content } => { AgentEvent::Comment { workflow_id, content } => {
if let Err(e) = self.worker_mgr.forward_comment(&workflow_id, &content).await { // Try to forward to running worker first
tracing::warn!("Failed to forward comment for workflow {}: {}", workflow_id, e); if let Err(_) = self.worker_mgr.forward_comment(&workflow_id, &content).await {
// No worker handling this workflow — re-dispatch it
// Load workflow to get project_id and requirement
let wf = sqlx::query_as::<_, crate::db::Workflow>(
"SELECT * FROM workflows WHERE id = ?"
).bind(&workflow_id).fetch_optional(&self.pool).await.ok().flatten();
if let Some(wf) = wf {
tracing::info!("Re-dispatching workflow {} with comment", workflow_id);
// Load latest state for resume
let state_json: Option<String> = sqlx::query_scalar(
"SELECT state_json FROM agent_state_snapshots WHERE workflow_id = ? ORDER BY created_at DESC LIMIT 1"
).bind(&workflow_id).fetch_optional(&self.pool).await.ok().flatten();
let mut initial_state = state_json
.and_then(|json| serde_json::from_str::<crate::state::AgentState>(&json).ok());
// Prepare the restored state for re-execution with the user's feedback.
if let Some(ref mut state) = initial_state {
    // Failed steps are retried; steps that were parked waiting on the user resume running.
    for step in &mut state.steps {
        if matches!(step.status, crate::state::StepStatus::Failed) {
            step.status = crate::state::StepStatus::Pending;
        } else if matches!(step.status, crate::state::StepStatus::WaitingUser) {
            step.status = crate::state::StepStatus::Running;
        }
    }
    if let Some(order) = state.first_actionable_step() {
        // There is still a step to run: attach the feedback to it and resume from there.
        if let Some(step) = state.steps.iter_mut().find(|s| s.order == order) {
            step.user_feedbacks.push(content.clone());
        }
    } else {
        // All steps done: go back to Planning, seeding the conversation with the feedback.
        state.phase = crate::state::AgentPhase::Planning;
        state.current_step_chat_history.clear();
        state.current_step_chat_history.push(
            crate::llm::ChatMessage::user(&format!("【用户反馈】{}", content))
        );
    }
} else {
    // Without a restorable snapshot there is nowhere to attach the comment; make the
    // loss visible instead of dropping the feedback silently.
    tracing::warn!(
        "Workflow {} has no restorable state snapshot; user feedback is not carried into the re-dispatch",
        workflow_id
    );
}
let assign = crate::worker::ServerToWorker::WorkflowAssign {
workflow_id: workflow_id.clone(),
project_id: wf.project_id.clone(),
requirement: wf.requirement,
template_id: if wf.template_id.is_empty() { None } else { Some(wf.template_id) },
initial_state,
require_plan_approval: false,
};
let btx = {
let mut map = self.broadcast.write().await;
map.entry(wf.project_id.clone())
.or_insert_with(|| broadcast::channel(64).0)
.clone()
};
match self.worker_mgr.assign_workflow(assign, None).await {
Ok(name) => {
let _ = sqlx::query("UPDATE workflows SET status = 'executing', status_reason = '' WHERE id = ?")
.bind(&workflow_id).execute(&self.pool).await;
let _ = btx.send(WsMessage::WorkflowStatusUpdate {
workflow_id, status: "executing".into(),
});
tracing::info!("Workflow re-dispatched to worker '{}'", name);
}
Err(e) => {
tracing::error!("Failed to re-dispatch workflow {}: {}", workflow_id, e);
}
}
}
} }
} }
} }
@@ -606,6 +693,15 @@ pub async fn run_step_loop(
let step_order = step.order; let step_order = step.order;
for iteration in 0..50 { for iteration in 0..50 {
// Drain any pending user comments and inject them into the step conversation before
// the next LLM call. Events other than `Comment` pulled from the channel are discarded.
while let Ok(event) = event_rx.try_recv() {
    if let AgentEvent::Comment { content, .. } = event {
        // Cut the log preview on a UTF-8 character boundary: slicing at a fixed byte
        // offset panics when byte 100 falls inside a multi-byte character.
        let mut preview_end = content.len().min(100);
        while !content.is_char_boundary(preview_end) {
            preview_end -= 1;
        }
        tracing::info!("[workflow {}] Step {} received user feedback: {}", workflow_id, step_order, &content[..preview_end]);
        step_chat_history.push(ChatMessage::user(&format!("【用户反馈】{}", content)));
        send_execution(update_tx, workflow_id, step_order, "user_feedback", &content, "已注入到对话上下文", "done").await;
    }
}
// Build messages: system + user context + chat history // Build messages: system + user context + chat history
let mut messages = vec![ let mut messages = vec![
ChatMessage::system(&system_prompt), ChatMessage::system(&system_prompt),
@@ -745,6 +841,7 @@ pub async fn run_step_loop(
let _ = update_tx.send(AgentUpdate::WorkflowStatus { let _ = update_tx.send(AgentUpdate::WorkflowStatus {
workflow_id: workflow_id.to_string(), workflow_id: workflow_id.to_string(),
status: "waiting_user".into(), status: "waiting_user".into(),
reason: String::new(),
}).await; }).await;
send_execution(update_tx, workflow_id, step_order, "ask_user", reason, reason, "waiting").await; send_execution(update_tx, workflow_id, step_order, "ask_user", reason, reason, "waiting").await;
@@ -789,6 +886,7 @@ pub async fn run_step_loop(
let _ = update_tx.send(AgentUpdate::WorkflowStatus { let _ = update_tx.send(AgentUpdate::WorkflowStatus {
workflow_id: workflow_id.to_string(), workflow_id: workflow_id.to_string(),
status: "executing".into(), status: "executing".into(),
reason: String::new(),
}).await; }).await;
let tool_msg = if feedback.is_empty() { let tool_msg = if feedback.is_empty() {
@@ -893,6 +991,22 @@ pub async fn run_step_loop(
let result = execute_tool(&tc.function.name, &tc.function.arguments, workdir, exec).await; let result = execute_tool(&tc.function.name, &tc.function.arguments, workdir, exec).await;
let status = if result.starts_with("Error:") { "failed" } else { "done" }; let status = if result.starts_with("Error:") { "failed" } else { "done" };
send_execution(update_tx, workflow_id, step_order, &tc.function.name, &tc.function.arguments, &result, status).await; send_execution(update_tx, workflow_id, step_order, &tc.function.name, &tc.function.arguments, &result, status).await;
// Real-time file sync: after a successful `write_file` tool call, read the file back
// from the workspace and send it as a base64-encoded `FileSync` update right away.
// Best-effort: a missing "path" argument, an unreadable file, or a failed send is ignored.
// NOTE(review): the whole file is read and encoded with no size limit here — confirm
// whether a cap is needed for large files.
if tc.function.name == "write_file" && status == "done" {
if let Some(rel_path) = args.get("path").and_then(|v| v.as_str()) {
// The tool's "path" argument is resolved against the workflow's working directory.
let full = std::path::Path::new(workdir).join(rel_path);
if let Ok(bytes) = tokio::fs::read(&full).await {
use base64::Engine;
let _ = update_tx.send(AgentUpdate::FileSync {
project_id: project_id.to_string(),
path: rel_path.to_string(),
data_b64: base64::engine::general_purpose::STANDARD.encode(&bytes),
}).await;
}
}
}
step_chat_history.push(ChatMessage::tool_result(&tc.id, &result)); step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
} }
} }
@@ -992,6 +1106,14 @@ pub async fn run_agent_loop(
break; break;
} }
// Drain pending user comments so feedback sent mid-run is part of the next planning call.
// Events other than `Comment` pulled from the channel are discarded.
while let Ok(event) = event_rx.try_recv() {
    if let AgentEvent::Comment { content, .. } = event {
        // Cut the log preview on a UTF-8 character boundary: slicing at a fixed byte
        // offset panics when byte 100 falls inside a multi-byte character.
        let mut preview_end = content.len().min(100);
        while !content.is_char_boundary(preview_end) {
            preview_end -= 1;
        }
        tracing::info!("[workflow {}] Planning received user feedback: {}", workflow_id, &content[..preview_end]);
        state.current_step_chat_history.push(ChatMessage::user(&format!("【用户反馈】{}", content)));
    }
}
let system_prompt = build_planning_prompt(project_id, instructions); let system_prompt = build_planning_prompt(project_id, instructions);
let messages = state.build_messages(&system_prompt, requirement); let messages = state.build_messages(&system_prompt, requirement);
let msg_count = messages.len() as i32; let msg_count = messages.len() as i32;
@@ -1068,6 +1190,7 @@ pub async fn run_agent_loop(
let _ = update_tx.send(AgentUpdate::WorkflowStatus { let _ = update_tx.send(AgentUpdate::WorkflowStatus {
workflow_id: workflow_id.to_string(), workflow_id: workflow_id.to_string(),
status: "waiting_user".into(), status: "waiting_user".into(),
reason: String::new(),
}).await; }).await;
send_execution(update_tx, workflow_id, 0, "plan_approval", "等待确认计划", "等待用户确认执行计划", "waiting").await; send_execution(update_tx, workflow_id, 0, "plan_approval", "等待确认计划", "等待用户确认执行计划", "waiting").await;
@@ -1098,6 +1221,7 @@ pub async fn run_agent_loop(
let _ = update_tx.send(AgentUpdate::WorkflowStatus { let _ = update_tx.send(AgentUpdate::WorkflowStatus {
workflow_id: workflow_id.to_string(), workflow_id: workflow_id.to_string(),
status: "executing".into(), status: "executing".into(),
reason: String::new(),
}).await; }).await;
// Stay in Planning phase, continue the loop // Stay in Planning phase, continue the loop
continue; continue;
@@ -1114,6 +1238,7 @@ pub async fn run_agent_loop(
let _ = update_tx.send(AgentUpdate::WorkflowStatus { let _ = update_tx.send(AgentUpdate::WorkflowStatus {
workflow_id: workflow_id.to_string(), workflow_id: workflow_id.to_string(),
status: "executing".into(), status: "executing".into(),
reason: String::new(),
}).await; }).await;
} }

View File

@@ -6,6 +6,7 @@ use crate::worker::WorkerInfo;
async fn list_workers(State(state): State<Arc<AppState>>) -> Json<Vec<WorkerInfo>> { async fn list_workers(State(state): State<Arc<AppState>>) -> Json<Vec<WorkerInfo>> {
let workers = state.agent_mgr.worker_mgr.list().await; let workers = state.agent_mgr.worker_mgr.list().await;
// WorkerInfo already contains `name` field from registration
let entries: Vec<WorkerInfo> = workers.into_iter().map(|(_, info)| info).collect(); let entries: Vec<WorkerInfo> = workers.into_iter().map(|(_, info)| info).collect();
Json(entries) Json(entries)
} }

View File

@@ -25,6 +25,8 @@ pub struct CreateWorkflow {
pub requirement: String, pub requirement: String,
#[serde(default)] #[serde(default)]
pub template_id: Option<String>, pub template_id: Option<String>,
#[serde(default)]
pub worker: Option<String>,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@@ -78,6 +80,7 @@ async fn create_workflow(
workflow_id: workflow.id.clone(), workflow_id: workflow.id.clone(),
requirement: workflow.requirement.clone(), requirement: workflow.requirement.clone(),
template_id: input.template_id, template_id: input.template_id,
worker: input.worker,
}).await; }).await;
Ok(Json(workflow)) Ok(Json(workflow))

View File

@@ -322,6 +322,16 @@ impl Database {
} }
} }
// Migration: add status_reason to workflows.
// A failed column probe counts as "column missing", so the ALTER is still attempted.
let has_status_reason: bool = sqlx::query_scalar::<_, i32>(
    "SELECT COUNT(*) FROM pragma_table_info('workflows') WHERE name='status_reason'"
).fetch_one(&self.pool).await.unwrap_or(0) > 0;
if !has_status_reason {
    // Report a failed migration instead of discarding it: statements that reference
    // status_reason would otherwise start failing with no hint as to why.
    if let Err(e) = sqlx::query(
        "ALTER TABLE workflows ADD COLUMN status_reason TEXT NOT NULL DEFAULT ''"
    ).execute(&self.pool).await {
        tracing::error!("Failed to add workflows.status_reason column: {}", e);
    }
}
Ok(()) Ok(())
} }
} }
@@ -348,6 +358,8 @@ pub struct Workflow {
pub created_at: String, pub created_at: String,
pub report: String, pub report: String,
pub template_id: String, pub template_id: String,
#[serde(default)]
pub status_reason: String,
} }
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]

View File

@@ -211,6 +211,7 @@ async fn resume_workflows(pool: SqlitePool, agent_mgr: Arc<agent::AgentManager>)
workflow_id, workflow_id,
requirement, requirement,
template_id: None, template_id: None,
worker: None,
}).await; }).await;
} }
} }

View File

@@ -27,6 +27,7 @@
## 环境信息 ## 环境信息
- 工作目录是独立的项目工作区Python venv 已预先激活(.venv/ - 工作目录是独立的项目工作区Python venv 已预先激活(.venv/
- 使用 `uv add <包名>``pip install <包名>` 安装依赖 - 安装 Python 包**必须**使用 `uv pip install <包名>`(不要用裸 pip
- 运行 Python 脚本直接用 `python3 xxx.py`venv 已在 PATH 中)
请使用中文回复。 请使用中文回复。

View File

@@ -14,12 +14,12 @@ use crate::state::{AgentState, Artifact};
#[serde(tag = "kind")] #[serde(tag = "kind")]
pub enum AgentUpdate { pub enum AgentUpdate {
PlanUpdate { workflow_id: String, steps: Vec<PlanStepInfo> }, PlanUpdate { workflow_id: String, steps: Vec<PlanStepInfo> },
WorkflowStatus { workflow_id: String, status: String }, WorkflowStatus { workflow_id: String, status: String, #[serde(default)] reason: String },
Activity { workflow_id: String, activity: String }, Activity { workflow_id: String, activity: String },
ExecutionLog { workflow_id: String, step_order: i32, tool_name: String, tool_input: String, output: String, status: String }, ExecutionLog { workflow_id: String, step_order: i32, tool_name: String, tool_input: String, output: String, status: String },
LlmCallLog { workflow_id: String, step_order: i32, phase: String, messages_count: i32, tools_count: i32, tool_calls: String, text_response: String, prompt_tokens: Option<u32>, completion_tokens: Option<u32>, latency_ms: i64 }, LlmCallLog { workflow_id: String, step_order: i32, phase: String, messages_count: i32, tools_count: i32, tool_calls: String, text_response: String, prompt_tokens: Option<u32>, completion_tokens: Option<u32>, latency_ms: i64 },
StateSnapshot { workflow_id: String, step_order: i32, state: AgentState }, StateSnapshot { workflow_id: String, step_order: i32, state: AgentState },
WorkflowComplete { workflow_id: String, status: String }, WorkflowComplete { workflow_id: String, status: String, #[serde(default)] reason: String },
ArtifactSave { workflow_id: String, step_order: i32, artifact: Artifact }, ArtifactSave { workflow_id: String, step_order: i32, artifact: Artifact },
RequirementUpdate { workflow_id: String, requirement: String }, RequirementUpdate { workflow_id: String, requirement: String },
/// base64-encoded file content /// base64-encoded file content
@@ -61,9 +61,9 @@ pub async fn handle_single_update(
AgentUpdate::PlanUpdate { workflow_id, steps } => { AgentUpdate::PlanUpdate { workflow_id, steps } => {
bcast(broadcast_tx, WsMessage::PlanUpdate { workflow_id: workflow_id.clone(), steps: steps.clone() }); bcast(broadcast_tx, WsMessage::PlanUpdate { workflow_id: workflow_id.clone(), steps: steps.clone() });
} }
AgentUpdate::WorkflowStatus { workflow_id, status } => { AgentUpdate::WorkflowStatus { workflow_id, status, reason } => {
let _ = sqlx::query("UPDATE workflows SET status = ? WHERE id = ?") let _ = sqlx::query("UPDATE workflows SET status = ?, status_reason = ? WHERE id = ?")
.bind(status).bind(workflow_id).execute(pool).await; .bind(status).bind(reason).bind(workflow_id).execute(pool).await;
bcast(broadcast_tx, WsMessage::WorkflowStatusUpdate { workflow_id: workflow_id.clone(), status: status.clone() }); bcast(broadcast_tx, WsMessage::WorkflowStatusUpdate { workflow_id: workflow_id.clone(), status: status.clone() });
} }
AgentUpdate::Activity { workflow_id, activity } => { AgentUpdate::Activity { workflow_id, activity } => {
@@ -100,9 +100,9 @@ pub async fn handle_single_update(
"INSERT INTO agent_state_snapshots (id, workflow_id, step_order, state_json, created_at) VALUES (?, ?, ?, ?, datetime('now'))" "INSERT INTO agent_state_snapshots (id, workflow_id, step_order, state_json, created_at) VALUES (?, ?, ?, ?, datetime('now'))"
).bind(&id).bind(workflow_id).bind(step_order).bind(&json).execute(pool).await; ).bind(&id).bind(workflow_id).bind(step_order).bind(&json).execute(pool).await;
} }
AgentUpdate::WorkflowComplete { workflow_id, status } => { AgentUpdate::WorkflowComplete { workflow_id, status, reason } => {
let _ = sqlx::query("UPDATE workflows SET status = ? WHERE id = ?") let _ = sqlx::query("UPDATE workflows SET status = ?, status_reason = ? WHERE id = ?")
.bind(status).bind(workflow_id).execute(pool).await; .bind(status).bind(reason).bind(workflow_id).execute(pool).await;
bcast(broadcast_tx, WsMessage::WorkflowStatusUpdate { workflow_id: workflow_id.clone(), status: status.clone() }); bcast(broadcast_tx, WsMessage::WorkflowStatusUpdate { workflow_id: workflow_id.clone(), status: status.clone() });
} }
AgentUpdate::ArtifactSave { workflow_id, step_order, artifact } => { AgentUpdate::ArtifactSave { workflow_id, step_order, artifact } => {

View File

@@ -69,6 +69,7 @@ async fn check_timers(pool: &SqlitePool, agent_mgr: &Arc<AgentManager>) -> anyho
workflow_id, workflow_id,
requirement: timer.requirement.clone(), requirement: timer.requirement.clone(),
template_id: None, template_id: None,
worker: None,
}).await; }).await;
} }

View File

@@ -118,17 +118,22 @@ impl WorkerManager {
.collect() .collect()
} }
/// Assign a workflow to the first available worker. Returns worker name. /// Assign a workflow to a worker. If `preferred` is specified, use that worker;
pub async fn assign_workflow(&self, assign: ServerToWorker) -> Result<String, String> { /// otherwise pick the first available.
pub async fn assign_workflow(&self, assign: ServerToWorker, preferred: Option<&str>) -> Result<String, String> {
let workflow_id = match &assign { let workflow_id = match &assign {
ServerToWorker::WorkflowAssign { workflow_id, .. } => workflow_id.clone(), ServerToWorker::WorkflowAssign { workflow_id, .. } => workflow_id.clone(),
_ => return Err("Not a workflow assignment".into()), _ => return Err("Not a workflow assignment".into()),
}; };
let workers = self.workers.read().await; let workers = self.workers.read().await;
// Pick first worker (simple strategy for now) let (name, worker) = if let Some(pref) = preferred {
let (name, worker) = workers.iter().next() workers.get_key_value(pref)
.ok_or_else(|| "No workers available".to_string())?; .ok_or_else(|| format!("Worker '{}' not available", pref))?
} else {
workers.iter().next()
.ok_or_else(|| "No workers available".to_string())?
};
worker.tx.send(assign).await.map_err(|_| { worker.tx.send(assign).await.map_err(|_| {
format!("Worker '{}' disconnected", name) format!("Worker '{}' disconnected", name)
@@ -163,4 +168,13 @@ impl WorkerManager {
pub async fn complete_workflow(&self, workflow_id: &str) { pub async fn complete_workflow(&self, workflow_id: &str) {
self.assignments.write().await.remove(workflow_id); self.assignments.write().await.remove(workflow_id);
} }
/// Collect the ids of every workflow currently recorded as assigned to `worker_name`.
///
/// Returns an empty vector when the worker has no assignments.
pub async fn assignments_for_worker(&self, worker_name: &str) -> Vec<String> {
    let assignments = self.assignments.read().await;
    let mut workflow_ids = Vec::new();
    for (wf_id, owner) in assignments.iter() {
        if owner.as_str() == worker_name {
            workflow_ids.push(wf_id.clone());
        }
    }
    workflow_ids
}
} }

View File

@@ -10,72 +10,62 @@ use crate::sink::{AgentUpdate, ServiceManager};
use crate::worker::{ServerToWorker, WorkerInfo, WorkerToServer}; use crate::worker::{ServerToWorker, WorkerInfo, WorkerToServer};
fn collect_worker_info(name: &str) -> WorkerInfo { fn collect_worker_info(name: &str) -> WorkerInfo {
let cpu = std::fs::read_to_string("/proc/cpuinfo") let cpu = std::fs::read_to_string("/proc/cpuinfo").ok()
.ok() .and_then(|s| s.lines().find(|l| l.starts_with("model name"))
.and_then(|s| { .map(|l| l.split(':').nth(1).unwrap_or("").trim().to_string()))
s.lines()
.find(|l| l.starts_with("model name"))
.map(|l| l.split(':').nth(1).unwrap_or("").trim().to_string())
})
.unwrap_or_else(|| "unknown".into()); .unwrap_or_else(|| "unknown".into());
let memory = std::fs::read_to_string("/proc/meminfo").ok()
let memory = std::fs::read_to_string("/proc/meminfo") .and_then(|s| s.lines().find(|l| l.starts_with("MemTotal"))
.ok()
.and_then(|s| {
s.lines()
.find(|l| l.starts_with("MemTotal"))
.and_then(|l| l.split_whitespace().nth(1)) .and_then(|l| l.split_whitespace().nth(1))
.and_then(|kb| kb.parse::<u64>().ok()) .and_then(|kb| kb.parse::<u64>().ok())
.map(|kb| format!("{:.1} GB", kb as f64 / 1_048_576.0)) .map(|kb| format!("{:.1} GB", kb as f64 / 1_048_576.0)))
})
.unwrap_or_else(|| "unknown".into()); .unwrap_or_else(|| "unknown".into());
let gpu = std::process::Command::new("nvidia-smi") let gpu = std::process::Command::new("nvidia-smi")
.arg("--query-gpu=name") .arg("--query-gpu=name").arg("--format=csv,noheader").output().ok()
.arg("--format=csv,noheader") .and_then(|o| if o.status.success() { Some(String::from_utf8_lossy(&o.stdout).trim().to_string()) } else { None })
.output()
.ok()
.and_then(|o| {
if o.status.success() {
Some(String::from_utf8_lossy(&o.stdout).trim().to_string())
} else {
None
}
})
.unwrap_or_else(|| "none".into()); .unwrap_or_else(|| "none".into());
WorkerInfo { WorkerInfo {
name: name.to_string(), name: name.to_string(), cpu, memory, gpu,
cpu,
memory,
gpu,
os: std::env::consts::OS.to_string(), os: std::env::consts::OS.to_string(),
kernel: std::process::Command::new("uname") kernel: std::process::Command::new("uname").arg("-r").output().ok()
.arg("-r")
.output()
.ok()
.map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()) .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
.unwrap_or_else(|| "unknown".into()), .unwrap_or_else(|| "unknown".into()),
} }
} }
/// Shared WebSocket sender that can be swapped on reconnect.
///
/// The slot holds `Some(sink)` while a connection is live and `None` otherwise, so
/// tasks that outlive a single connection can check the slot before each send.
type SharedWsTx = Arc<tokio::sync::Mutex<Option<futures::stream::SplitSink<
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
Message
>>>>;
pub async fn run(server_url: &str, worker_name: &str, llm_config: &crate::LlmConfig) -> anyhow::Result<()> { pub async fn run(server_url: &str, worker_name: &str, llm_config: &crate::LlmConfig) -> anyhow::Result<()> {
tracing::info!("Tori worker '{}' connecting to {} (model={})", worker_name, server_url, llm_config.model); tracing::info!("Tori worker '{}' connecting to {} (model={})", worker_name, server_url, llm_config.model);
let svc_mgr = ServiceManager::new(9100);
let ws_tx: SharedWsTx = Arc::new(tokio::sync::Mutex::new(None));
let comment_tx: Arc<tokio::sync::Mutex<Option<mpsc::Sender<AgentEvent>>>> =
Arc::new(tokio::sync::Mutex::new(None));
loop { loop {
match connect_and_run(server_url, worker_name, llm_config).await { match connect_and_run(server_url, worker_name, llm_config, &svc_mgr, &ws_tx, &comment_tx).await {
Ok(()) => { Ok(()) => tracing::info!("Connection closed, reconnecting in 5s..."),
tracing::info!("Worker connection closed, reconnecting in 5s..."); Err(e) => tracing::error!("Worker error: {}, reconnecting in 5s...", e),
}
Err(e) => {
tracing::error!("Worker error: {}, reconnecting in 5s...", e);
}
} }
// Clear ws_tx so relay tasks know the connection is gone
*ws_tx.lock().await = None;
tokio::time::sleep(std::time::Duration::from_secs(5)).await; tokio::time::sleep(std::time::Duration::from_secs(5)).await;
} }
} }
async fn connect_and_run(server_url: &str, worker_name: &str, llm_config: &crate::LlmConfig) -> anyhow::Result<()> { async fn connect_and_run(
server_url: &str,
worker_name: &str,
llm_config: &crate::LlmConfig,
svc_mgr: &Arc<ServiceManager>,
shared_ws_tx: &SharedWsTx,
comment_tx: &Arc<tokio::sync::Mutex<Option<mpsc::Sender<AgentEvent>>>>,
) -> anyhow::Result<()> {
let (ws_stream, _) = connect_async(server_url).await?; let (ws_stream, _) = connect_async(server_url).await?;
let (mut ws_tx, mut ws_rx) = ws_stream.split(); let (mut ws_tx, mut ws_rx) = ws_stream.split();
@@ -99,26 +89,27 @@ async fn connect_and_run(server_url: &str, worker_name: &str, llm_config: &crate
} }
} }
let svc_mgr = ServiceManager::new(9100); // Store the new ws_tx so relay tasks can use it
let ws_tx = Arc::new(tokio::sync::Mutex::new(ws_tx)); *shared_ws_tx.lock().await = Some(ws_tx);
// Ping task to keep connection alive // Heartbeat keepalive (text message so Cloudflare/proxies don't drop it)
let ping_tx = ws_tx.clone(); let hb_tx = shared_ws_tx.clone();
tokio::spawn(async move { let ping_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop { loop {
interval.tick().await; interval.tick().await;
let mut tx = ping_tx.lock().await; let mut guard = hb_tx.lock().await;
if tx.send(Message::Ping(vec![].into())).await.is_err() { if let Some(ref mut tx) = *guard {
if tx.send(Message::Text(r#"{"type":"heartbeat"}"#.into())).await.is_err() {
*guard = None;
break;
}
} else {
break; break;
} }
} }
}); });
// Channel for forwarding comments to the running workflow
let comment_tx: Arc<tokio::sync::Mutex<Option<mpsc::Sender<AgentEvent>>>> =
Arc::new(tokio::sync::Mutex::new(None));
// Main message loop // Main message loop
while let Some(msg) = ws_rx.next().await { while let Some(msg) = ws_rx.next().await {
let text = match msg? { let text = match msg? {
@@ -130,31 +121,32 @@ async fn connect_and_run(server_url: &str, worker_name: &str, llm_config: &crate
let server_msg: ServerToWorker = match serde_json::from_str(&text) { let server_msg: ServerToWorker = match serde_json::from_str(&text) {
Ok(m) => m, Ok(m) => m,
Err(e) => { Err(e) => { tracing::warn!("Bad server message: {}", e); continue; }
tracing::warn!("Failed to parse server message: {}", e);
continue;
}
}; };
match server_msg { match server_msg {
ServerToWorker::WorkflowAssign { ServerToWorker::WorkflowAssign {
workflow_id, workflow_id, project_id, requirement,
project_id, template_id: _, initial_state, require_plan_approval,
requirement,
template_id: _,
initial_state,
require_plan_approval,
} => { } => {
tracing::info!("Received workflow: {} (project {})", workflow_id, project_id); tracing::info!("Received workflow: {} (project {})", workflow_id, project_id);
let llm = LlmClient::new(llm_config); let llm = LlmClient::new(llm_config);
let exec = LocalExecutor::new(None); let exec = LocalExecutor::new(None);
let workdir = format!("workspaces/{}", project_id); let workdir = format!("workspaces/{}", project_id);
let instructions = String::new(); // TODO: load from template let instructions = String::new();
// update channel → serialize → WebSocket // Ensure workspace has a venv
let _ = tokio::fs::create_dir_all(&workdir).await;
let venv_path = format!("{}/.venv", workdir);
if !std::path::Path::new(&venv_path).exists() {
tracing::info!("Setting up venv in {}", workdir);
let _ = exec.execute("uv venv .venv", &workdir).await;
}
// update channel → relay to shared ws_tx
let (update_tx, mut update_rx) = mpsc::channel::<AgentUpdate>(64); let (update_tx, mut update_rx) = mpsc::channel::<AgentUpdate>(64);
let ws_tx_clone = ws_tx.clone(); let relay_ws_tx = shared_ws_tx.clone();
let wf_id_clone = workflow_id.clone(); let wf_id_clone = workflow_id.clone();
tokio::spawn(async move { tokio::spawn(async move {
while let Some(update) = update_rx.recv().await { while let Some(update) = update_rx.recv().await {
@@ -162,12 +154,19 @@ async fn connect_and_run(server_url: &str, worker_name: &str, llm_config: &crate
workflow_id: wf_id_clone.clone(), workflow_id: wf_id_clone.clone(),
update, update,
}; };
if let Ok(json) = serde_json::to_string(&msg) { let json = match serde_json::to_string(&msg) {
let mut tx = ws_tx_clone.lock().await; Ok(j) => j,
Err(_) => continue,
};
let mut guard = relay_ws_tx.lock().await;
if let Some(ref mut tx) = *guard {
if tx.send(Message::Text(json.into())).await.is_err() { if tx.send(Message::Text(json.into())).await.is_err() {
break; tracing::warn!("WebSocket send failed, buffering...");
*guard = None;
// Don't break — keep draining update_rx so agent doesn't block
} }
} }
// If ws_tx is None, updates are lost (reconnect will happen)
} }
}); });
@@ -175,81 +174,61 @@ async fn connect_and_run(server_url: &str, worker_name: &str, llm_config: &crate
let (evt_tx, mut evt_rx) = mpsc::channel::<AgentEvent>(32); let (evt_tx, mut evt_rx) = mpsc::channel::<AgentEvent>(32);
*comment_tx.lock().await = Some(evt_tx); *comment_tx.lock().await = Some(evt_tx);
let _ = tokio::fs::create_dir_all(&workdir).await; let svc = Arc::clone(svc_mgr);
let wf_id = workflow_id.clone();
let pid = project_id.clone();
tokio::spawn(async move {
let result = agent::run_agent_loop( let result = agent::run_agent_loop(
&llm, &exec, &update_tx, &mut evt_rx, &llm, &exec, &update_tx, &mut evt_rx,
&project_id, &workflow_id, &requirement, &workdir, &svc_mgr, &pid, &wf_id, &requirement, &workdir, &svc,
&instructions, initial_state, None, require_plan_approval, &instructions, initial_state, None, require_plan_approval,
).await; ).await;
let final_status = if result.is_ok() { "done" } else { "failed" }; let final_status = if result.is_ok() { "done" } else { "failed" };
if let Err(e) = &result { let reason = if let Err(ref e) = result { format!("{}", e) } else { String::new() };
tracing::error!("Workflow {} failed: {}", workflow_id, e); if let Err(ref e) = result {
let _ = update_tx.send(AgentUpdate::Error { tracing::error!("Workflow {} failed: {}", wf_id, e);
message: format!("Agent error: {}", e), let _ = update_tx.send(AgentUpdate::Error { message: format!("{}", e) }).await;
}).await;
} }
// Sync all workspace files to server // Sync workspace files to server
sync_workspace(&update_tx, &project_id, &workdir).await; sync_workspace(&update_tx, &pid, &workdir).await;
let _ = update_tx.send(AgentUpdate::WorkflowComplete { let _ = update_tx.send(AgentUpdate::WorkflowComplete {
workflow_id: workflow_id.clone(), workflow_id: wf_id.clone(), status: final_status.into(), reason,
status: final_status.into(),
}).await; }).await;
tracing::info!("Workflow {} completed: {}", wf_id, final_status);
*comment_tx.lock().await = None; });
tracing::info!("Workflow {} completed: {}", workflow_id, final_status);
} }
ServerToWorker::Comment { workflow_id, content } => { ServerToWorker::Comment { workflow_id, content } => {
if let Some(ref tx) = *comment_tx.lock().await { if let Some(ref tx) = *comment_tx.lock().await {
let _ = tx.send(AgentEvent::Comment { let _ = tx.send(AgentEvent::Comment { workflow_id, content }).await;
workflow_id,
content,
}).await;
} }
} }
} }
} }
ping_task.abort();
Ok(()) Ok(())
} }
/// Sync all workspace files to server via FileSync updates. /// Sync all workspace files to server via FileSync updates.
/// Skips .venv/, __pycache__/, .git/ and files > 1MB. async fn sync_workspace(update_tx: &mpsc::Sender<AgentUpdate>, project_id: &str, workdir: &str) {
async fn sync_workspace(
update_tx: &mpsc::Sender<AgentUpdate>,
project_id: &str,
workdir: &str,
) {
use base64::Engine; use base64::Engine;
let base = std::path::Path::new(workdir); let base = std::path::Path::new(workdir);
if !base.exists() { if !base.exists() { return; }
return;
}
let mut stack = vec![base.to_path_buf()]; let mut stack = vec![base.to_path_buf()];
let mut count = 0u32; let mut count = 0u32;
while let Some(dir) = stack.pop() { while let Some(dir) = stack.pop() {
let mut entries = match tokio::fs::read_dir(&dir).await { let mut entries = match tokio::fs::read_dir(&dir).await { Ok(e) => e, Err(_) => continue };
Ok(e) => e,
Err(_) => continue,
};
while let Ok(Some(entry)) = entries.next_entry().await { while let Ok(Some(entry)) = entries.next_entry().await {
let name = entry.file_name().to_string_lossy().to_string(); let name = entry.file_name().to_string_lossy().to_string();
// Skip dirs we don't want to sync if matches!(name.as_str(), ".venv" | "__pycache__" | ".git" | "node_modules" | ".mypy_cache") { continue; }
if matches!(name.as_str(), ".venv" | "__pycache__" | ".git" | "node_modules" | ".mypy_cache") {
continue;
}
let path = entry.path(); let path = entry.path();
if path.is_dir() { if path.is_dir() { stack.push(path); }
stack.push(path); else if let Ok(meta) = entry.metadata().await {
} else if let Ok(meta) = entry.metadata().await { if meta.len() > 1_048_576 { continue; }
// Skip files > 1MB
if meta.len() > 1_048_576 {
continue;
}
if let Ok(bytes) = tokio::fs::read(&path).await { if let Ok(bytes) = tokio::fs::read(&path).await {
let rel = path.strip_prefix(base).unwrap_or(&path); let rel = path.strip_prefix(base).unwrap_or(&path);
let _ = update_tx.send(AgentUpdate::FileSync { let _ = update_tx.send(AgentUpdate::FileSync {
@@ -262,5 +241,5 @@ async fn sync_workspace(
} }
} }
} }
tracing::info!("Synced {} files from workspace {}", count, workdir); tracing::info!("Synced {} files from {}", count, workdir);
} }

View File

@@ -90,6 +90,10 @@ async fn handle_worker_socket(socket: WebSocket, state: Arc<WsWorkerState>) {
while let Some(Ok(msg)) = receiver.next().await { while let Some(Ok(msg)) = receiver.next().await {
match msg { match msg {
Message::Text(text) => { Message::Text(text) => {
// Fast-path: heartbeat
if text.contains("\"heartbeat\"") {
continue;
}
match serde_json::from_str::<WorkerToServer>(&text) { match serde_json::from_str::<WorkerToServer>(&text) {
Ok(worker_msg) => { Ok(worker_msg) => {
handle_worker_message(&state_clone, worker_msg).await; handle_worker_message(&state_clone, worker_msg).await;
@@ -111,6 +115,23 @@ async fn handle_worker_socket(socket: WebSocket, state: Arc<WsWorkerState>) {
_ = recv_task => {}, _ = recv_task => {},
} }
// Log reason for any orphaned workflows before cleanup
let orphan_workflows: Vec<String> = {
let assignments = mgr_for_cleanup.assignments_for_worker(&name_clone).await;
assignments
};
if !orphan_workflows.is_empty() {
let reason = format!("Worker '{}' 断开连接", name_clone);
for wf_id in &orphan_workflows {
let _ = sqlx::query("UPDATE workflows SET status = 'failed', status_reason = ? WHERE id = ? AND status IN ('executing', 'planning')")
.bind(&reason).bind(wf_id).execute(&state.pool).await;
let log_id = uuid::Uuid::new_v4().to_string();
let _ = sqlx::query(
"INSERT INTO execution_log (id, workflow_id, step_order, tool_name, tool_input, output, status, created_at) VALUES (?, ?, 0, 'system', 'worker_disconnect', ?, 'failed', datetime('now'))"
).bind(&log_id).bind(wf_id).bind(&reason).execute(&state.pool).await;
tracing::warn!("Workflow {} orphaned: {}", wf_id, reason);
}
}
mgr_for_cleanup.unregister(&name_clone).await; mgr_for_cleanup.unregister(&name_clone).await;
} }

View File

@@ -45,10 +45,10 @@ export const api = {
listWorkflows: (projectId: string) => listWorkflows: (projectId: string) =>
request<Workflow[]>(`/projects/${projectId}/workflows`), request<Workflow[]>(`/projects/${projectId}/workflows`),
createWorkflow: (projectId: string, requirement: string, templateId?: string) => createWorkflow: (projectId: string, requirement: string, templateId?: string, worker?: string) =>
request<Workflow>(`/projects/${projectId}/workflows`, { request<Workflow>(`/projects/${projectId}/workflows`, {
method: 'POST', method: 'POST',
body: JSON.stringify({ requirement, template_id: templateId || undefined }), body: JSON.stringify({ requirement, template_id: templateId || undefined, worker: worker || undefined }),
}), }),
listTemplates: () => listTemplates: () =>
@@ -93,6 +93,7 @@ export const api = {
deleteTimer: (timerId: string) => deleteTimer: (timerId: string) =>
request<void>(`/timers/${timerId}`, { method: 'DELETE' }), request<void>(`/timers/${timerId}`, { method: 'DELETE' }),
getKb: () => request<{ content: string }>('/kb'), getKb: () => request<{ content: string }>('/kb'),
listArticles: () => request<KbArticleSummary[]>('/kb/articles'), listArticles: () => request<KbArticleSummary[]>('/kb/articles'),

View File

@@ -1,5 +1,6 @@
<script setup lang="ts"> <script setup lang="ts">
import { ref, watch } from 'vue' import { ref, watch, onMounted } from 'vue'
import { api } from '../api'
const props = defineProps<{ const props = defineProps<{
requirement: string requirement: string
@@ -7,13 +8,20 @@ const props = defineProps<{
}>() }>()
const emit = defineEmits<{ const emit = defineEmits<{
submit: [text: string] submit: [text: string, worker?: string]
}>() }>()
const input = ref('') const input = ref('')
const editing = ref(!props.requirement) const editing = ref(!props.requirement)
const workers = ref<{ name: string }[]>([])
const selectedWorker = ref('')
onMounted(async () => {
try {
workers.value = await api.listWorkers()
} catch { /* ignore */ }
})
// 当 requirement 从外部更新(如 loadData 完成),自动退出编辑模式
watch(() => props.requirement, (val) => { watch(() => props.requirement, (val) => {
if (val && editing.value && !input.value.trim()) { if (val && editing.value && !input.value.trim()) {
editing.value = false editing.value = false
@@ -23,7 +31,7 @@ watch(() => props.requirement, (val) => {
function submit() { function submit() {
const text = input.value.trim() const text = input.value.trim()
if (!text) return if (!text) return
emit('submit', text) emit('submit', text, selectedWorker.value || undefined)
editing.value = false editing.value = false
} }
</script> </script>
@@ -47,9 +55,15 @@ function submit() {
rows="8" rows="8"
@keydown.ctrl.enter="submit" @keydown.ctrl.enter="submit"
/> />
<div class="submit-row">
<select v-if="workers.length" v-model="selectedWorker" class="worker-select">
<option value="">自动选择 Worker</option>
<option v-for="w in workers" :key="w.name" :value="w.name">{{ w.name }}</option>
</select>
<button class="btn-submit" @click="submit">提交需求</button> <button class="btn-submit" @click="submit">提交需求</button>
</div> </div>
</div> </div>
</div>
</template> </template>
<style scoped> <style scoped>
@@ -148,8 +162,23 @@ function submit() {
border-color: var(--accent); border-color: var(--accent);
} }
.submit-row {
display: flex;
align-items: center;
justify-content: flex-end;
gap: 8px;
}
.worker-select {
padding: 6px 10px;
font-size: 12px;
background: var(--bg-secondary);
border: 1px solid var(--border);
border-radius: 6px;
color: var(--text-primary);
}
.btn-submit { .btn-submit {
align-self: flex-end;
background: var(--accent); background: var(--accent);
color: var(--bg-primary); color: var(--bg-primary);
font-weight: 600; font-weight: 600;

View File

@@ -147,9 +147,9 @@ watch(() => props.projectId, () => {
setupWs() setupWs()
}) })
async function onSubmitRequirement(text: string) { async function onSubmitRequirement(text: string, worker?: string) {
try { try {
const wf = await api.createWorkflow(props.projectId, text) const wf = await api.createWorkflow(props.projectId, text, undefined, worker)
workflow.value = wf workflow.value = wf
logEntries.value = [] logEntries.value = []
planSteps.value = [] planSteps.value = []