Resume incomplete workflows on server startup

Query workflows with status pending/planning/executing after restart
and re-submit them to the agent manager for continuation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-01 09:02:01 +00:00
parent 77784b3526
commit 837977cd17

View File

@@ -9,6 +9,7 @@ mod ws;
use std::sync::Arc; use std::sync::Arc;
use axum::Router; use axum::Router;
use sqlx::sqlite::SqlitePool;
use tower_http::cors::CorsLayer; use tower_http::cors::CorsLayer;
use tower_http::services::{ServeDir, ServeFile}; use tower_http::services::{ServeDir, ServeFile};
@@ -78,6 +79,9 @@ async fn main() -> anyhow::Result<()> {
timer::start_timer_runner(database.pool.clone(), agent_mgr.clone()); timer::start_timer_runner(database.pool.clone(), agent_mgr.clone());
// Resume incomplete workflows after restart
resume_workflows(database.pool.clone(), agent_mgr.clone()).await;
let state = Arc::new(AppState { let state = Arc::new(AppState {
db: database, db: database,
config: config.clone(), config: config.clone(),
@@ -91,10 +95,43 @@ async fn main() -> anyhow::Result<()> {
.fallback_service(ServeDir::new("web/dist").fallback(ServeFile::new("web/dist/index.html"))) .fallback_service(ServeDir::new("web/dist").fallback(ServeFile::new("web/dist/index.html")))
.layer(CorsLayer::permissive()); .layer(CorsLayer::permissive());
let addr = format!("{}:{}", config.server.host, config.server.port); let addr = format!("{}:{}", &config.server.host, config.server.port);
tracing::info!("Tori server listening on {}", addr); tracing::info!("Tori server listening on {}", addr);
let listener = tokio::net::TcpListener::bind(&addr).await?; let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app).await?; axum::serve(listener, app).await?;
Ok(()) Ok(())
} }
async fn resume_workflows(pool: SqlitePool, agent_mgr: Arc<agent::AgentManager>) {
let rows: Vec<(String, String, String)> = match sqlx::query_as(
"SELECT w.id, w.project_id, w.requirement FROM workflows w \
JOIN projects p ON w.project_id = p.id \
WHERE w.status IN ('pending', 'planning', 'executing') \
AND p.deleted = 0 \
ORDER BY w.created_at ASC"
)
.fetch_all(&pool)
.await
{
Ok(r) => r,
Err(e) => {
tracing::error!("Failed to query incomplete workflows: {}", e);
return;
}
};
if rows.is_empty() {
tracing::info!("No incomplete workflows to resume");
return;
}
tracing::info!("Resuming {} incomplete workflow(s)", rows.len());
for (workflow_id, project_id, requirement) in rows {
tracing::info!("Resuming workflow {} (project {})", workflow_id, project_id);
agent_mgr.send_event(&project_id, agent::AgentEvent::NewRequirement {
workflow_id,
requirement,
}).await;
}
}