From 837977cd1766fc141f0e5eeab4efd38a331a981e Mon Sep 17 00:00:00 2001 From: Fam Zheng Date: Sun, 1 Mar 2026 09:02:01 +0000 Subject: [PATCH] Resume incomplete workflows on server startup Query workflows with status pending/planning/executing after restart and re-submit them to the agent manager for continuation. Co-Authored-By: Claude Opus 4.6 --- src/main.rs | 39 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index f44a4e2..82dd362 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ mod ws; use std::sync::Arc; use axum::Router; +use sqlx::sqlite::SqlitePool; use tower_http::cors::CorsLayer; use tower_http::services::{ServeDir, ServeFile}; @@ -78,6 +79,9 @@ async fn main() -> anyhow::Result<()> { timer::start_timer_runner(database.pool.clone(), agent_mgr.clone()); + // Resume incomplete workflows after restart + resume_workflows(database.pool.clone(), agent_mgr.clone()).await; + let state = Arc::new(AppState { db: database, config: config.clone(), @@ -91,10 +95,43 @@ async fn main() -> anyhow::Result<()> { .fallback_service(ServeDir::new("web/dist").fallback(ServeFile::new("web/dist/index.html"))) .layer(CorsLayer::permissive()); - let addr = format!("{}:{}", config.server.host, config.server.port); + let addr = format!("{}:{}", &config.server.host, config.server.port); tracing::info!("Tori server listening on {}", addr); let listener = tokio::net::TcpListener::bind(&addr).await?; axum::serve(listener, app).await?; Ok(()) } + +async fn resume_workflows(pool: SqlitePool, agent_mgr: Arc) { + let rows: Vec<(String, String, String)> = match sqlx::query_as( + "SELECT w.id, w.project_id, w.requirement FROM workflows w \ + JOIN projects p ON w.project_id = p.id \ + WHERE w.status IN ('pending', 'planning', 'executing') \ + AND p.deleted = 0 \ + ORDER BY w.created_at ASC" + ) + .fetch_all(&pool) + .await + { + Ok(r) => r, + Err(e) => { + tracing::error!("Failed to query incomplete workflows: {}", e); + return; + } + }; + + if rows.is_empty() { + tracing::info!("No incomplete workflows to resume"); + return; + } + + tracing::info!("Resuming {} incomplete workflow(s)", rows.len()); + for (workflow_id, project_id, requirement) in rows { + tracing::info!("Resuming workflow {} (project {})", workflow_id, project_id); + agent_mgr.send_event(&project_id, agent::AgentEvent::NewRequirement { + workflow_id, + requirement, + }).await; + } +}