fix: robust worker interaction
- Text heartbeat every 30s (instead of WebSocket ping frames) so the connection survives Cloudflare/proxy idle timeouts
- A comment on a completed/failed workflow re-dispatches it to a worker, resuming from the latest saved state
- Dispatch is retried up to 3 times with a 5s delay, so it survives the worker reconnect window
This commit is contained in:
78
src/agent.rs
78
src/agent.rs
@@ -153,7 +153,16 @@ impl AgentManager {
|
||||
require_plan_approval: false,
|
||||
};
|
||||
|
||||
match self.worker_mgr.assign_workflow(assign).await {
|
||||
// Retry dispatch up to 3 times (worker might be reconnecting)
|
||||
let mut dispatch_result = self.worker_mgr.assign_workflow(assign.clone()).await;
|
||||
for attempt in 1..3 {
|
||||
if dispatch_result.is_ok() { break; }
|
||||
tracing::warn!("Dispatch attempt {} failed, retrying in 5s...", attempt);
|
||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||
dispatch_result = self.worker_mgr.assign_workflow(assign.clone()).await;
|
||||
}
|
||||
|
||||
match dispatch_result {
|
||||
Ok(name) => {
|
||||
tracing::info!("Workflow {} dispatched to worker '{}'", workflow_id, name);
|
||||
}
|
||||
@@ -177,8 +186,71 @@ impl AgentManager {
|
||||
}
|
||||
}
|
||||
AgentEvent::Comment { workflow_id, content } => {
|
||||
if let Err(e) = self.worker_mgr.forward_comment(&workflow_id, &content).await {
|
||||
tracing::warn!("Failed to forward comment for workflow {}: {}", workflow_id, e);
|
||||
// Try to forward to running worker first
|
||||
if let Err(_) = self.worker_mgr.forward_comment(&workflow_id, &content).await {
|
||||
// No worker handling this workflow — re-dispatch it
|
||||
// Load workflow to get project_id and requirement
|
||||
let wf = sqlx::query_as::<_, crate::db::Workflow>(
|
||||
"SELECT * FROM workflows WHERE id = ?"
|
||||
).bind(&workflow_id).fetch_optional(&self.pool).await.ok().flatten();
|
||||
|
||||
if let Some(wf) = wf {
|
||||
tracing::info!("Re-dispatching workflow {} with comment", workflow_id);
|
||||
// Load latest state for resume
|
||||
let state_json: Option<String> = sqlx::query_scalar(
|
||||
"SELECT state_json FROM agent_state_snapshots WHERE workflow_id = ? ORDER BY created_at DESC LIMIT 1"
|
||||
).bind(&workflow_id).fetch_optional(&self.pool).await.ok().flatten();
|
||||
|
||||
let mut initial_state = state_json
|
||||
.and_then(|json| serde_json::from_str::<crate::state::AgentState>(&json).ok());
|
||||
|
||||
// Attach comment as user feedback + reset failed/waiting steps
|
||||
if let Some(ref mut state) = initial_state {
|
||||
for step in &mut state.steps {
|
||||
if matches!(step.status, crate::state::StepStatus::Failed) {
|
||||
step.status = crate::state::StepStatus::Pending;
|
||||
}
|
||||
if matches!(step.status, crate::state::StepStatus::WaitingUser) {
|
||||
step.status = crate::state::StepStatus::Running;
|
||||
}
|
||||
}
|
||||
if let Some(order) = state.first_actionable_step() {
|
||||
if let Some(step) = state.steps.iter_mut().find(|s| s.order == order) {
|
||||
step.user_feedbacks.push(content);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let assign = crate::worker::ServerToWorker::WorkflowAssign {
|
||||
workflow_id: workflow_id.clone(),
|
||||
project_id: wf.project_id.clone(),
|
||||
requirement: wf.requirement,
|
||||
template_id: if wf.template_id.is_empty() { None } else { Some(wf.template_id) },
|
||||
initial_state,
|
||||
require_plan_approval: false,
|
||||
};
|
||||
|
||||
let btx = {
|
||||
let mut map = self.broadcast.write().await;
|
||||
map.entry(wf.project_id.clone())
|
||||
.or_insert_with(|| broadcast::channel(64).0)
|
||||
.clone()
|
||||
};
|
||||
|
||||
match self.worker_mgr.assign_workflow(assign).await {
|
||||
Ok(name) => {
|
||||
let _ = sqlx::query("UPDATE workflows SET status = 'executing', status_reason = '' WHERE id = ?")
|
||||
.bind(&workflow_id).execute(&self.pool).await;
|
||||
let _ = btx.send(WsMessage::WorkflowStatusUpdate {
|
||||
workflow_id, status: "executing".into(),
|
||||
});
|
||||
tracing::info!("Workflow re-dispatched to worker '{}'", name);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to re-dispatch workflow {}: {}", workflow_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,19 +92,15 @@ async fn connect_and_run(
|
||||
// Store the new ws_tx so relay tasks can use it
|
||||
*shared_ws_tx.lock().await = Some(ws_tx);
|
||||
|
||||
// Ping keepalive
|
||||
let ping_tx = shared_ws_tx.clone();
|
||||
// Heartbeat keepalive (text message so Cloudflare/proxies don't drop it)
|
||||
let hb_tx = shared_ws_tx.clone();
|
||||
let ping_task = tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let guard = ping_tx.lock().await;
|
||||
if guard.is_none() { break; }
|
||||
// Can't send while holding mutex with Option, drop and re-acquire
|
||||
drop(guard);
|
||||
let mut guard = ping_tx.lock().await;
|
||||
let mut guard = hb_tx.lock().await;
|
||||
if let Some(ref mut tx) = *guard {
|
||||
if tx.send(Message::Ping(vec![].into())).await.is_err() {
|
||||
if tx.send(Message::Text(r#"{"type":"heartbeat"}"#.into())).await.is_err() {
|
||||
*guard = None;
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -90,6 +90,10 @@ async fn handle_worker_socket(socket: WebSocket, state: Arc<WsWorkerState>) {
|
||||
while let Some(Ok(msg)) = receiver.next().await {
|
||||
match msg {
|
||||
Message::Text(text) => {
|
||||
// Fast-path: heartbeat
|
||||
if text.contains("\"heartbeat\"") {
|
||||
continue;
|
||||
}
|
||||
match serde_json::from_str::<WorkerToServer>(&text) {
|
||||
Ok(worker_msg) => {
|
||||
handle_worker_message(&state_clone, worker_msg).await;
|
||||
|
||||
Reference in New Issue
Block a user