Files
tori/src/timer.rs

78 lines
2.3 KiB
Rust

use std::sync::Arc;
use sqlx::sqlite::SqlitePool;
use crate::agent::{AgentEvent, AgentManager};
use crate::db::Timer;
/// Launches the background timer runner.
///
/// Builds the [`timer_loop`] future from the given pool and agent manager and
/// hands it to `tokio::spawn`, so this has to be called from within a Tokio
/// runtime context. The returned join handle is discarded: the task is
/// detached and this function returns immediately.
pub fn start_timer_runner(pool: SqlitePool, agent_mgr: Arc<AgentManager>) {
    let runner = timer_loop(pool, agent_mgr);
    tokio::spawn(runner);
}
/// Polls the timers forever, one check every 30 seconds.
///
/// Each iteration sleeps first and only then calls [`check_timers`], so the
/// first check happens 30 seconds after the loop starts. A failed check is
/// logged and the loop keeps going; this function never returns on its own.
async fn timer_loop(pool: SqlitePool, agent_mgr: Arc<AgentManager>) {
    tracing::info!("Timer runner started");

    let poll_period = tokio::time::Duration::from_secs(30);
    loop {
        tokio::time::sleep(poll_period).await;

        match check_timers(&pool, &agent_mgr).await {
            Ok(()) => {}
            Err(e) => {
                tracing::error!("Timer check error: {}", e);
            }
        }
    }
}
async fn check_timers(pool: &SqlitePool, agent_mgr: &Arc<AgentManager>) -> anyhow::Result<()> {
let timers = sqlx::query_as::<_, Timer>(
"SELECT * FROM timers WHERE enabled = 1"
)
.fetch_all(pool)
.await?;
let now = chrono::Utc::now();
for timer in timers {
let due = if timer.last_run_at.is_empty() {
true
} else if let Ok(last) = chrono::NaiveDateTime::parse_from_str(&timer.last_run_at, "%Y-%m-%d %H:%M:%S") {
let last_utc = last.and_utc();
let elapsed = now.signed_duration_since(last_utc).num_seconds();
elapsed >= timer.interval_secs
} else {
true
};
if !due {
continue;
}
tracing::info!("Timer '{}' fired for project {}", timer.name, timer.project_id);
// Update last_run_at
let now_str = now.format("%Y-%m-%d %H:%M:%S").to_string();
let _ = sqlx::query("UPDATE timers SET last_run_at = ? WHERE id = ?")
.bind(&now_str)
.bind(&timer.id)
.execute(pool)
.await;
// Create a workflow for this timer
let workflow_id = uuid::Uuid::new_v4().to_string();
let _ = sqlx::query(
"INSERT INTO workflows (id, project_id, requirement) VALUES (?, ?, ?)"
)
.bind(&workflow_id)
.bind(&timer.project_id)
.bind(&timer.requirement)
.execute(pool)
.await;
// Send event to agent
agent_mgr.send_event(&timer.project_id, AgentEvent::NewRequirement {
workflow_id,
requirement: timer.requirement.clone(),
template_id: None,
worker: None,
}).await;
}
Ok(())
}