diff --git a/src/agent.rs b/src/agent.rs index befe43e..72c1bc4 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -153,7 +153,16 @@ impl AgentManager { require_plan_approval: false, }; - match self.worker_mgr.assign_workflow(assign).await { + // Retry dispatch up to 3 times (worker might be reconnecting) + let mut dispatch_result = self.worker_mgr.assign_workflow(assign.clone()).await; + for attempt in 1..3 { + if dispatch_result.is_ok() { break; } + tracing::warn!("Dispatch attempt {} failed, retrying in 5s...", attempt); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + dispatch_result = self.worker_mgr.assign_workflow(assign.clone()).await; + } + + match dispatch_result { Ok(name) => { tracing::info!("Workflow {} dispatched to worker '{}'", workflow_id, name); } @@ -177,8 +186,71 @@ impl AgentManager { } } AgentEvent::Comment { workflow_id, content } => { - if let Err(e) = self.worker_mgr.forward_comment(&workflow_id, &content).await { - tracing::warn!("Failed to forward comment for workflow {}: {}", workflow_id, e); + // Try to forward to running worker first + if let Err(_) = self.worker_mgr.forward_comment(&workflow_id, &content).await { + // No worker handling this workflow — re-dispatch it + // Load workflow to get project_id and requirement + let wf = sqlx::query_as::<_, crate::db::Workflow>( + "SELECT * FROM workflows WHERE id = ?" + ).bind(&workflow_id).fetch_optional(&self.pool).await.ok().flatten(); + + if let Some(wf) = wf { + tracing::info!("Re-dispatching workflow {} with comment", workflow_id); + // Load latest state for resume + let state_json: Option = sqlx::query_scalar( + "SELECT state_json FROM agent_state_snapshots WHERE workflow_id = ? ORDER BY created_at DESC LIMIT 1" + ).bind(&workflow_id).fetch_optional(&self.pool).await.ok().flatten(); + + let mut initial_state = state_json + .and_then(|json| serde_json::from_str::(&json).ok()); + + // Attach comment as user feedback + reset failed/waiting steps + if let Some(ref mut state) = initial_state { + for step in &mut state.steps { + if matches!(step.status, crate::state::StepStatus::Failed) { + step.status = crate::state::StepStatus::Pending; + } + if matches!(step.status, crate::state::StepStatus::WaitingUser) { + step.status = crate::state::StepStatus::Running; + } + } + if let Some(order) = state.first_actionable_step() { + if let Some(step) = state.steps.iter_mut().find(|s| s.order == order) { + step.user_feedbacks.push(content); + } + } + } + + let assign = crate::worker::ServerToWorker::WorkflowAssign { + workflow_id: workflow_id.clone(), + project_id: wf.project_id.clone(), + requirement: wf.requirement, + template_id: if wf.template_id.is_empty() { None } else { Some(wf.template_id) }, + initial_state, + require_plan_approval: false, + }; + + let btx = { + let mut map = self.broadcast.write().await; + map.entry(wf.project_id.clone()) + .or_insert_with(|| broadcast::channel(64).0) + .clone() + }; + + match self.worker_mgr.assign_workflow(assign).await { + Ok(name) => { + let _ = sqlx::query("UPDATE workflows SET status = 'executing', status_reason = '' WHERE id = ?") + .bind(&workflow_id).execute(&self.pool).await; + let _ = btx.send(WsMessage::WorkflowStatusUpdate { + workflow_id, status: "executing".into(), + }); + tracing::info!("Workflow re-dispatched to worker '{}'", name); + } + Err(e) => { + tracing::error!("Failed to re-dispatch workflow {}: {}", workflow_id, e); + } + } + } } } } diff --git a/src/worker_runner.rs b/src/worker_runner.rs index 79a4503..529e084 100644 --- a/src/worker_runner.rs +++ b/src/worker_runner.rs @@ -92,19 +92,15 @@ async fn connect_and_run( // Store the new ws_tx so relay tasks can use it *shared_ws_tx.lock().await = Some(ws_tx); - // Ping keepalive - let ping_tx = shared_ws_tx.clone(); + // Heartbeat keepalive (text message so Cloudflare/proxies don't drop it) + let hb_tx = shared_ws_tx.clone(); let ping_task = tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); loop { interval.tick().await; - let guard = ping_tx.lock().await; - if guard.is_none() { break; } - // Can't send while holding mutex with Option, drop and re-acquire - drop(guard); - let mut guard = ping_tx.lock().await; + let mut guard = hb_tx.lock().await; if let Some(ref mut tx) = *guard { - if tx.send(Message::Ping(vec![].into())).await.is_err() { + if tx.send(Message::Text(r#"{"type":"heartbeat"}"#.into())).await.is_err() { *guard = None; break; } diff --git a/src/ws_worker.rs b/src/ws_worker.rs index a709ab7..6bf1745 100644 --- a/src/ws_worker.rs +++ b/src/ws_worker.rs @@ -90,6 +90,10 @@ async fn handle_worker_socket(socket: WebSocket, state: Arc) { while let Some(Ok(msg)) = receiver.next().await { match msg { Message::Text(text) => { + // Fast-path: heartbeat + if text.contains("\"heartbeat\"") { + continue; + } match serde_json::from_str::(&text) { Ok(worker_msg) => { handle_worker_message(&state_clone, worker_msg).await;