Tori: AI agent workflow manager - initial implementation

Rust (Axum) + Vue 3 + SQLite. Features:
- Project CRUD REST API with proper error handling
- Per-project agent loop (mpsc + broadcast channels)
- LLM-driven plan generation and replan on user feedback
- SSH command execution with status streaming
- WebSocket real-time updates to frontend
- Four-zone UI: requirement, plan (left), execution (right), comment

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-28 10:36:50 +00:00
parent 1122ab27dd
commit 7edbbee471
43 changed files with 7164 additions and 83 deletions

520
src/agent.rs Normal file
View File

@@ -0,0 +1,520 @@
use std::collections::HashMap;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use sqlx::sqlite::SqlitePool;
use tokio::sync::{mpsc, RwLock, broadcast};
use crate::llm::{LlmClient, ChatMessage};
use crate::ssh::SshExecutor;
use crate::{LlmConfig, SshConfig};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AgentEvent {
NewRequirement { workflow_id: String, requirement: String },
Comment { workflow_id: String, content: String },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WsMessage {
PlanUpdate { workflow_id: String, steps: Vec<PlanStepInfo> },
StepStatusUpdate { step_id: String, status: String, output: String },
WorkflowStatusUpdate { workflow_id: String, status: String },
Error { message: String },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanStepInfo {
pub order: i32,
pub description: String,
pub command: String,
}
pub struct AgentManager {
agents: RwLock<HashMap<String, mpsc::Sender<AgentEvent>>>,
broadcast: RwLock<HashMap<String, broadcast::Sender<WsMessage>>>,
pool: SqlitePool,
llm_config: LlmConfig,
ssh_config: SshConfig,
}
impl AgentManager {
pub fn new(pool: SqlitePool, llm_config: LlmConfig, ssh_config: SshConfig) -> Arc<Self> {
Arc::new(Self {
agents: RwLock::new(HashMap::new()),
broadcast: RwLock::new(HashMap::new()),
pool,
llm_config,
ssh_config,
})
}
pub async fn get_broadcast(&self, project_id: &str) -> broadcast::Receiver<WsMessage> {
let mut map = self.broadcast.write().await;
let tx = map.entry(project_id.to_string())
.or_insert_with(|| broadcast::channel(64).0);
tx.subscribe()
}
pub async fn send_event(self: &Arc<Self>, project_id: &str, event: AgentEvent) {
let agents = self.agents.read().await;
if let Some(tx) = agents.get(project_id) {
let _ = tx.send(event).await;
} else {
drop(agents);
self.spawn_agent(project_id.to_string()).await;
let agents = self.agents.read().await;
if let Some(tx) = agents.get(project_id) {
let _ = tx.send(event).await;
}
}
}
async fn spawn_agent(self: &Arc<Self>, project_id: String) {
let (tx, rx) = mpsc::channel(32);
self.agents.write().await.insert(project_id.clone(), tx);
let broadcast_tx = {
let mut map = self.broadcast.write().await;
map.entry(project_id.clone())
.or_insert_with(|| broadcast::channel(64).0)
.clone()
};
let pool = self.pool.clone();
let llm_config = self.llm_config.clone();
let ssh_config = self.ssh_config.clone();
tokio::spawn(agent_loop(project_id, rx, broadcast_tx, pool, llm_config, ssh_config));
}
}
async fn agent_loop(
project_id: String,
mut rx: mpsc::Receiver<AgentEvent>,
broadcast_tx: broadcast::Sender<WsMessage>,
pool: SqlitePool,
llm_config: LlmConfig,
ssh_config: SshConfig,
) {
let llm = LlmClient::new(&llm_config);
let ssh = SshExecutor::new(&ssh_config);
tracing::info!("Agent loop started for project {}", project_id);
while let Some(event) = rx.recv().await {
match event {
AgentEvent::NewRequirement { workflow_id, requirement } => {
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
workflow_id: workflow_id.clone(),
status: "planning".into(),
});
let plan_result = generate_plan(&llm, &requirement).await;
match plan_result {
Ok(steps) => {
// Save steps to DB
for (i, step) in steps.iter().enumerate() {
let step_id = uuid::Uuid::new_v4().to_string();
let _ = sqlx::query(
"INSERT INTO plan_steps (id, workflow_id, step_order, description, status) VALUES (?, ?, ?, ?, 'pending')"
)
.bind(&step_id)
.bind(&workflow_id)
.bind(i as i32 + 1)
.bind(&step.description)
.execute(&pool)
.await;
}
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
workflow_id: workflow_id.clone(),
steps: steps.clone(),
});
let _ = sqlx::query("UPDATE workflows SET status = 'executing' WHERE id = ?")
.bind(&workflow_id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
workflow_id: workflow_id.clone(),
status: "executing".into(),
});
// Execute each step
let db_steps = sqlx::query_as::<_, crate::db::PlanStep>(
"SELECT * FROM plan_steps WHERE workflow_id = ? ORDER BY step_order"
)
.bind(&workflow_id)
.fetch_all(&pool)
.await
.unwrap_or_default();
let mut all_ok = true;
for (i, db_step) in db_steps.iter().enumerate() {
let _ = sqlx::query("UPDATE plan_steps SET status = 'running' WHERE id = ?")
.bind(&db_step.id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::StepStatusUpdate {
step_id: db_step.id.clone(),
status: "running".into(),
output: String::new(),
});
let cmd = &steps[i].command;
if cmd.is_empty() {
let _ = sqlx::query("UPDATE plan_steps SET status = 'done', output = 'Skipped (no command)' WHERE id = ?")
.bind(&db_step.id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::StepStatusUpdate {
step_id: db_step.id.clone(),
status: "done".into(),
output: "Skipped (no command)".into(),
});
continue;
}
match ssh.execute(cmd).await {
Ok(result) => {
let output = if result.stderr.is_empty() {
result.stdout.clone()
} else {
format!("{}\nSTDERR: {}", result.stdout, result.stderr)
};
let status = if result.exit_code == 0 { "done" } else { "failed" };
let _ = sqlx::query("UPDATE plan_steps SET status = ?, output = ? WHERE id = ?")
.bind(status)
.bind(&output)
.bind(&db_step.id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::StepStatusUpdate {
step_id: db_step.id.clone(),
status: status.into(),
output,
});
if result.exit_code != 0 {
all_ok = false;
break;
}
}
Err(e) => {
let _ = sqlx::query("UPDATE plan_steps SET status = 'failed', output = ? WHERE id = ?")
.bind(e.to_string())
.bind(&db_step.id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::StepStatusUpdate {
step_id: db_step.id.clone(),
status: "failed".into(),
output: e.to_string(),
});
all_ok = false;
break;
}
}
}
let final_status = if all_ok { "done" } else { "failed" };
let _ = sqlx::query("UPDATE workflows SET status = ? WHERE id = ?")
.bind(final_status)
.bind(&workflow_id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
workflow_id: workflow_id.clone(),
status: final_status.into(),
});
}
Err(e) => {
let _ = broadcast_tx.send(WsMessage::Error {
message: format!("Plan generation failed: {}", e),
});
let _ = sqlx::query("UPDATE workflows SET status = 'failed' WHERE id = ?")
.bind(&workflow_id)
.execute(&pool)
.await;
}
}
}
AgentEvent::Comment { workflow_id, content } => {
tracing::info!("Comment on workflow {}: {}", workflow_id, content);
// Get current workflow and steps for context
let wf = sqlx::query_as::<_, crate::db::Workflow>(
"SELECT * FROM workflows WHERE id = ?",
)
.bind(&workflow_id)
.fetch_optional(&pool)
.await
.ok()
.flatten();
let Some(wf) = wf else { continue };
let current_steps = sqlx::query_as::<_, crate::db::PlanStep>(
"SELECT * FROM plan_steps WHERE workflow_id = ? ORDER BY step_order",
)
.bind(&workflow_id)
.fetch_all(&pool)
.await
.unwrap_or_default();
// Ask LLM to replan
let replan_result =
replan(&llm, &wf.requirement, &current_steps, &content).await;
match replan_result {
Ok(new_steps) => {
// Clear old pending steps, keep done ones
let _ = sqlx::query(
"DELETE FROM plan_steps WHERE workflow_id = ? AND status IN ('pending', 'failed')",
)
.bind(&workflow_id)
.execute(&pool)
.await;
let done_count = sqlx::query_scalar::<_, i32>(
"SELECT COUNT(*) FROM plan_steps WHERE workflow_id = ?",
)
.bind(&workflow_id)
.fetch_one(&pool)
.await
.unwrap_or(0);
for (i, step) in new_steps.iter().enumerate() {
let step_id = uuid::Uuid::new_v4().to_string();
let _ = sqlx::query(
"INSERT INTO plan_steps (id, workflow_id, step_order, description, status) VALUES (?, ?, ?, ?, 'pending')",
)
.bind(&step_id)
.bind(&workflow_id)
.bind(done_count + i as i32 + 1)
.bind(&step.description)
.execute(&pool)
.await;
}
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
workflow_id: workflow_id.clone(),
steps: new_steps.clone(),
});
// Resume execution
let _ = sqlx::query(
"UPDATE workflows SET status = 'executing' WHERE id = ?",
)
.bind(&workflow_id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
workflow_id: workflow_id.clone(),
status: "executing".into(),
});
let db_steps = sqlx::query_as::<_, crate::db::PlanStep>(
"SELECT * FROM plan_steps WHERE workflow_id = ? AND status = 'pending' ORDER BY step_order",
)
.bind(&workflow_id)
.fetch_all(&pool)
.await
.unwrap_or_default();
let mut all_ok = true;
for (i, db_step) in db_steps.iter().enumerate() {
let _ = sqlx::query(
"UPDATE plan_steps SET status = 'running' WHERE id = ?",
)
.bind(&db_step.id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::StepStatusUpdate {
step_id: db_step.id.clone(),
status: "running".into(),
output: String::new(),
});
let cmd = if i < new_steps.len() {
&new_steps[i].command
} else {
&String::new()
};
if cmd.is_empty() {
let _ = sqlx::query(
"UPDATE plan_steps SET status = 'done', output = 'Skipped (no command)' WHERE id = ?",
)
.bind(&db_step.id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::StepStatusUpdate {
step_id: db_step.id.clone(),
status: "done".into(),
output: "Skipped (no command)".into(),
});
continue;
}
match ssh.execute(cmd).await {
Ok(result) => {
let output = if result.stderr.is_empty() {
result.stdout.clone()
} else {
format!("{}\nSTDERR: {}", result.stdout, result.stderr)
};
let status = if result.exit_code == 0 {
"done"
} else {
"failed"
};
let _ = sqlx::query(
"UPDATE plan_steps SET status = ?, output = ? WHERE id = ?",
)
.bind(status)
.bind(&output)
.bind(&db_step.id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::StepStatusUpdate {
step_id: db_step.id.clone(),
status: status.into(),
output,
});
if result.exit_code != 0 {
all_ok = false;
break;
}
}
Err(e) => {
let _ = sqlx::query(
"UPDATE plan_steps SET status = 'failed', output = ? WHERE id = ?",
)
.bind(e.to_string())
.bind(&db_step.id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::StepStatusUpdate {
step_id: db_step.id.clone(),
status: "failed".into(),
output: e.to_string(),
});
all_ok = false;
break;
}
}
}
let final_status = if all_ok { "done" } else { "failed" };
let _ = sqlx::query(
"UPDATE workflows SET status = ? WHERE id = ?",
)
.bind(final_status)
.bind(&workflow_id)
.execute(&pool)
.await;
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
workflow_id: workflow_id.clone(),
status: final_status.into(),
});
}
Err(e) => {
let _ = broadcast_tx.send(WsMessage::Error {
message: format!("Replan failed: {}", e),
});
}
}
}
}
}
tracing::info!("Agent loop ended for project {}", project_id);
}
async fn generate_plan(llm: &LlmClient, requirement: &str) -> anyhow::Result<Vec<PlanStepInfo>> {
let system_prompt = r#"You are an AI workflow planner. Given a requirement, generate a list of executable steps.
Respond in JSON format only, as an array of objects:
[
{"order": 1, "description": "what this step does", "command": "shell command to execute via SSH"},
...
]
Keep the plan practical and each command should be a single shell command.
If a step doesn't need a shell command (e.g., verification), set command to empty string."#;
let response = llm.chat(vec![
ChatMessage { role: "system".into(), content: system_prompt.into() },
ChatMessage { role: "user".into(), content: requirement.into() },
]).await?;
let json_str = extract_json_array(&response);
let steps: Vec<PlanStepInfo> = serde_json::from_str(json_str)?;
Ok(steps)
}
fn extract_json_array(response: &str) -> &str {
if let Some(start) = response.find('[') {
if let Some(end) = response.rfind(']') {
return &response[start..=end];
}
}
response
}
async fn replan(
llm: &LlmClient,
requirement: &str,
current_steps: &[crate::db::PlanStep],
comment: &str,
) -> anyhow::Result<Vec<PlanStepInfo>> {
let steps_summary: String = current_steps
.iter()
.map(|s| format!(" {}. [{}] {}", s.step_order, s.status, s.description))
.collect::<Vec<_>>()
.join("\n");
let system_prompt = r#"You are an AI workflow planner. The user has provided feedback on an existing plan.
Based on the original requirement, current step statuses, and user feedback, generate ONLY the new/remaining steps that need to be executed.
Do NOT include steps that are already done.
Respond in JSON format only, as an array of objects:
[
{"order": 1, "description": "what this step does", "command": "shell command to execute via SSH"},
...
]
If no new steps are needed (feedback is just informational), return an empty array: []"#;
let user_msg = format!(
"Original requirement:\n{}\n\nCurrent steps:\n{}\n\nUser feedback:\n{}",
requirement, steps_summary, comment
);
let response = llm
.chat(vec![
ChatMessage {
role: "system".into(),
content: system_prompt.into(),
},
ChatMessage {
role: "user".into(),
content: user_msg,
},
])
.await?;
let json_str = extract_json_array(&response);
let steps: Vec<PlanStepInfo> = serde_json::from_str(json_str)?;
Ok(steps)
}