- POST /tori/api/token — sign ES256 JWT with configurable private key - exec.rs auto-generates and injects TORI_JWT env var for all commands - Config: jwt_private_key field for PEM file path
2299 lines
100 KiB
Rust
2299 lines
100 KiB
Rust
use std::collections::HashMap;
|
||
use std::path::Path;
|
||
use std::sync::Arc;
|
||
use std::sync::atomic::{AtomicU16, Ordering};
|
||
use serde::{Deserialize, Serialize};
|
||
use sqlx::sqlite::SqlitePool;
|
||
use tokio::sync::{mpsc, RwLock, broadcast};
|
||
|
||
use crate::llm::{LlmClient, ChatMessage, Tool, ToolFunction};
|
||
use crate::exec::LocalExecutor;
|
||
use crate::template::{self, LoadedTemplate};
|
||
use crate::tools::ExternalToolManager;
|
||
use crate::LlmConfig;
|
||
|
||
use crate::state::{AgentState, AgentPhase, Artifact, Step, StepStatus, StepResult, StepResultStatus, check_scratchpad_size};
|
||
|
||
/// A background service process registered for a project (see the `start_service` tool).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ServiceInfo {
    /// Port allocated to the service; the `start_service` tool description says it is
    /// passed to the process via the `PORT` environment variable.
    pub port: u16,
    /// Process id of the service process.
    pub pid: u32,
}
|
||
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
#[serde(tag = "type")]
|
||
pub enum AgentEvent {
|
||
NewRequirement { workflow_id: String, requirement: String, template_id: Option<String> },
|
||
Comment { workflow_id: String, content: String },
|
||
}
|
||
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
#[serde(tag = "type")]
|
||
pub enum WsMessage {
|
||
PlanUpdate { workflow_id: String, steps: Vec<PlanStepInfo> },
|
||
StepStatusUpdate { step_id: String, status: String, output: String },
|
||
WorkflowStatusUpdate { workflow_id: String, status: String },
|
||
RequirementUpdate { workflow_id: String, requirement: String },
|
||
ReportReady { workflow_id: String },
|
||
ProjectUpdate { project_id: String, name: String },
|
||
LlmCallLog { workflow_id: String, entry: crate::db::LlmCallLogEntry },
|
||
ActivityUpdate { workflow_id: String, activity: String },
|
||
Error { message: String },
|
||
}
|
||
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
pub struct PlanStepInfo {
|
||
pub order: i32,
|
||
pub description: String,
|
||
pub command: String,
|
||
#[serde(skip_serializing_if = "Option::is_none")]
|
||
pub status: Option<String>,
|
||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||
pub artifacts: Vec<Artifact>,
|
||
}
|
||
|
||
pub fn plan_infos_from_state(state: &AgentState) -> Vec<PlanStepInfo> {
|
||
state.steps.iter().map(|s| {
|
||
let status = match s.status {
|
||
StepStatus::Pending => "pending",
|
||
StepStatus::Running => "running",
|
||
StepStatus::WaitingUser => "waiting_user",
|
||
StepStatus::Done => "done",
|
||
StepStatus::Failed => "failed",
|
||
};
|
||
PlanStepInfo {
|
||
order: s.order,
|
||
description: s.title.clone(),
|
||
command: s.description.clone(),
|
||
status: Some(status.to_string()),
|
||
artifacts: s.artifacts.clone(),
|
||
}
|
||
}).collect()
|
||
}
|
||
|
||
pub struct AgentManager {
|
||
agents: RwLock<HashMap<String, mpsc::Sender<AgentEvent>>>,
|
||
broadcast: RwLock<HashMap<String, broadcast::Sender<WsMessage>>>,
|
||
pub services: RwLock<HashMap<String, ServiceInfo>>,
|
||
next_port: AtomicU16,
|
||
pool: SqlitePool,
|
||
llm_config: LlmConfig,
|
||
template_repo: Option<crate::TemplateRepoConfig>,
|
||
kb: Option<Arc<crate::kb::KbManager>>,
|
||
jwt_private_key_path: Option<String>,
|
||
}
|
||
|
||
impl AgentManager {
|
||
pub fn new(
|
||
pool: SqlitePool,
|
||
llm_config: LlmConfig,
|
||
template_repo: Option<crate::TemplateRepoConfig>,
|
||
kb: Option<Arc<crate::kb::KbManager>>,
|
||
jwt_private_key_path: Option<String>,
|
||
) -> Arc<Self> {
|
||
Arc::new(Self {
|
||
agents: RwLock::new(HashMap::new()),
|
||
broadcast: RwLock::new(HashMap::new()),
|
||
services: RwLock::new(HashMap::new()),
|
||
next_port: AtomicU16::new(9100),
|
||
pool,
|
||
llm_config,
|
||
template_repo,
|
||
kb,
|
||
jwt_private_key_path,
|
||
})
|
||
}
|
||
|
||
pub fn allocate_port(&self) -> u16 {
|
||
self.next_port.fetch_add(1, Ordering::Relaxed)
|
||
}
|
||
|
||
pub async fn get_service_port(&self, project_id: &str) -> Option<u16> {
|
||
self.services.read().await.get(project_id).map(|s| s.port)
|
||
}
|
||
|
||
pub async fn get_broadcast(&self, project_id: &str) -> broadcast::Receiver<WsMessage> {
|
||
let mut map = self.broadcast.write().await;
|
||
let tx = map.entry(project_id.to_string())
|
||
.or_insert_with(|| broadcast::channel(64).0);
|
||
tx.subscribe()
|
||
}
|
||
|
||
pub async fn send_event(self: &Arc<Self>, project_id: &str, event: AgentEvent) {
|
||
let agents = self.agents.read().await;
|
||
if let Some(tx) = agents.get(project_id) {
|
||
let _ = tx.send(event).await;
|
||
} else {
|
||
drop(agents);
|
||
self.spawn_agent(project_id.to_string()).await;
|
||
let agents = self.agents.read().await;
|
||
if let Some(tx) = agents.get(project_id) {
|
||
let _ = tx.send(event).await;
|
||
}
|
||
}
|
||
}
|
||
|
||
async fn spawn_agent(self: &Arc<Self>, project_id: String) {
|
||
let (tx, rx) = mpsc::channel(32);
|
||
self.agents.write().await.insert(project_id.clone(), tx);
|
||
|
||
let broadcast_tx = {
|
||
let mut map = self.broadcast.write().await;
|
||
map.entry(project_id.clone())
|
||
.or_insert_with(|| broadcast::channel(64).0)
|
||
.clone()
|
||
};
|
||
|
||
let mgr = Arc::clone(self);
|
||
tokio::spawn(agent_loop(project_id, rx, broadcast_tx, mgr));
|
||
}
|
||
}
|
||
|
||
// Template system is in crate::template
|
||
|
||
/// Read INSTRUCTIONS.md from workdir if it exists.
|
||
async fn read_instructions(workdir: &str) -> String {
|
||
let path = format!("{}/INSTRUCTIONS.md", workdir);
|
||
tokio::fs::read_to_string(&path).await.unwrap_or_default()
|
||
}
|
||
|
||
async fn ensure_workspace(exec: &LocalExecutor, workdir: &str) {
|
||
let _ = tokio::fs::create_dir_all(workdir).await;
|
||
let setup_script = format!("{}/scripts/setup.sh", workdir);
|
||
if Path::new(&setup_script).exists() {
|
||
tracing::info!("Running setup.sh in {}", workdir);
|
||
let _ = exec.execute("bash scripts/setup.sh", workdir).await;
|
||
} else {
|
||
let venv_path = format!("{}/.venv", workdir);
|
||
if !Path::new(&venv_path).exists() {
|
||
let _ = exec.execute("uv venv .venv", workdir).await;
|
||
let _ = exec.execute("uv pip install httpx fastapi uvicorn requests flask pydantic numpy pandas matplotlib pillow jinja2 pyyaml python-dotenv beautifulsoup4 lxml aiohttp aiofiles pytest rich click typer sqlalchemy", workdir).await;
|
||
}
|
||
}
|
||
}
|
||
|
||
async fn agent_loop(
|
||
project_id: String,
|
||
mut rx: mpsc::Receiver<AgentEvent>,
|
||
broadcast_tx: broadcast::Sender<WsMessage>,
|
||
mgr: Arc<AgentManager>,
|
||
) {
|
||
let pool = mgr.pool.clone();
|
||
let llm_config = mgr.llm_config.clone();
|
||
let llm = LlmClient::new(&llm_config);
|
||
let exec = LocalExecutor::new(mgr.jwt_private_key_path.clone());
|
||
let workdir = format!("/app/data/workspaces/{}", project_id);
|
||
|
||
tracing::info!("Agent loop started for project {}", project_id);
|
||
|
||
while let Some(event) = rx.recv().await {
|
||
match event {
|
||
AgentEvent::NewRequirement { workflow_id, requirement, template_id: forced_template } => {
|
||
tracing::info!("Processing new requirement for workflow {}", workflow_id);
|
||
// Generate project title in background (don't block the agent loop)
|
||
{
|
||
let title_llm = LlmClient::new(&llm_config);
|
||
let title_pool = pool.clone();
|
||
let title_btx = broadcast_tx.clone();
|
||
let title_pid = project_id.clone();
|
||
let title_req = requirement.clone();
|
||
tokio::spawn(async move {
|
||
if let Ok(title) = generate_title(&title_llm, &title_req).await {
|
||
let _ = sqlx::query("UPDATE projects SET name = ? WHERE id = ?")
|
||
.bind(&title)
|
||
.bind(&title_pid)
|
||
.execute(&title_pool)
|
||
.await;
|
||
let _ = title_btx.send(WsMessage::ProjectUpdate {
|
||
project_id: title_pid,
|
||
name: title,
|
||
});
|
||
}
|
||
});
|
||
}
|
||
|
||
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
|
||
workflow_id: workflow_id.clone(),
|
||
status: "executing".into(),
|
||
});
|
||
let _ = sqlx::query("UPDATE workflows SET status = 'executing' WHERE id = ?")
|
||
.bind(&workflow_id)
|
||
.execute(&pool)
|
||
.await;
|
||
|
||
// Template selection + workspace setup
|
||
let template_id = if forced_template.is_some() {
|
||
tracing::info!("Using forced template: {:?}", forced_template);
|
||
forced_template
|
||
} else {
|
||
template::select_template(&llm, &requirement, mgr.template_repo.as_ref()).await
|
||
};
|
||
// Persist template_id to workflow
|
||
if let Some(ref tid) = template_id {
|
||
let _ = sqlx::query("UPDATE workflows SET template_id = ? WHERE id = ?")
|
||
.bind(tid)
|
||
.bind(&workflow_id)
|
||
.execute(&pool)
|
||
.await;
|
||
}
|
||
|
||
let loaded_template = if let Some(ref tid) = template_id {
|
||
tracing::info!("Template selected for workflow {}: {}", workflow_id, tid);
|
||
let _ = tokio::fs::create_dir_all(&workdir).await;
|
||
|
||
if template::is_repo_template(tid) {
|
||
// Repo template: extract from git then load
|
||
match template::extract_repo_template(tid, mgr.template_repo.as_ref()).await {
|
||
Ok(template_dir) => {
|
||
if let Err(e) = template::apply_template(&template_dir, &workdir).await {
|
||
tracing::error!("Failed to apply repo template {}: {}", tid, e);
|
||
}
|
||
match LoadedTemplate::load_from_dir(tid, &template_dir).await {
|
||
Ok(t) => Some(t),
|
||
Err(e) => {
|
||
tracing::error!("Failed to load repo template {}: {}", tid, e);
|
||
None
|
||
}
|
||
}
|
||
}
|
||
Err(e) => {
|
||
tracing::error!("Failed to extract repo template {}: {}", tid, e);
|
||
None
|
||
}
|
||
}
|
||
} else {
|
||
// Local built-in template
|
||
let template_dir = std::path::Path::new(template::templates_dir()).join(tid);
|
||
if let Err(e) = template::apply_template(&template_dir, &workdir).await {
|
||
tracing::error!("Failed to apply template {}: {}", tid, e);
|
||
}
|
||
match LoadedTemplate::load(tid).await {
|
||
Ok(t) => Some(t),
|
||
Err(e) => {
|
||
tracing::error!("Failed to load template {}: {}", tid, e);
|
||
None
|
||
}
|
||
}
|
||
}
|
||
} else {
|
||
None
|
||
};
|
||
|
||
// Import KB files from template
|
||
if let Some(ref t) = loaded_template {
|
||
if let Some(ref kb) = mgr.kb {
|
||
let mut batch_items: Vec<(String, String)> = Vec::new();
|
||
for (title, content) in &t.kb_files {
|
||
// Check if article already exists by title
|
||
let existing: Option<String> = sqlx::query_scalar(
|
||
"SELECT id FROM kb_articles WHERE title = ?"
|
||
)
|
||
.bind(title)
|
||
.fetch_optional(&pool)
|
||
.await
|
||
.ok()
|
||
.flatten();
|
||
|
||
let article_id = if let Some(id) = existing {
|
||
let _ = sqlx::query(
|
||
"UPDATE kb_articles SET content = ?, updated_at = datetime('now') WHERE id = ?"
|
||
)
|
||
.bind(content)
|
||
.bind(&id)
|
||
.execute(&pool)
|
||
.await;
|
||
id
|
||
} else {
|
||
let id = uuid::Uuid::new_v4().to_string();
|
||
let _ = sqlx::query(
|
||
"INSERT INTO kb_articles (id, title, content) VALUES (?, ?, ?)"
|
||
)
|
||
.bind(&id)
|
||
.bind(title)
|
||
.bind(content)
|
||
.execute(&pool)
|
||
.await;
|
||
id
|
||
};
|
||
|
||
batch_items.push((article_id, content.clone()));
|
||
}
|
||
// Batch index: single embed.py call for all articles
|
||
if !batch_items.is_empty() {
|
||
if let Err(e) = kb.index_batch(&batch_items).await {
|
||
tracing::warn!("Failed to batch index KB articles: {}", e);
|
||
}
|
||
}
|
||
tracing::info!("Imported {} KB articles from template", t.kb_files.len());
|
||
}
|
||
}
|
||
|
||
ensure_workspace(&exec, &workdir).await;
|
||
let _ = tokio::fs::write(format!("{}/requirement.md", workdir), &requirement).await;
|
||
|
||
// Run template setup if present
|
||
if let Some(ref tid) = template_id {
|
||
let template_dir = if template::is_repo_template(tid) {
|
||
template::extract_repo_template(tid, mgr.template_repo.as_ref())
|
||
.await
|
||
.ok()
|
||
} else {
|
||
Some(std::path::Path::new(template::templates_dir()).join(tid))
|
||
};
|
||
if let Some(ref tdir) = template_dir {
|
||
if let Err(e) = template::run_setup(tdir, &workdir).await {
|
||
tracing::error!("Template setup failed for {}: {}", tid, e);
|
||
}
|
||
}
|
||
}
|
||
|
||
let instructions = if let Some(ref t) = loaded_template {
|
||
t.instructions.clone()
|
||
} else {
|
||
read_instructions(&workdir).await
|
||
};
|
||
|
||
let ext_tools = loaded_template.as_ref().map(|t| &t.external_tools);
|
||
let plan_approval = loaded_template.as_ref().map_or(false, |t| t.require_plan_approval);
|
||
|
||
tracing::info!("Starting agent loop for workflow {}", workflow_id);
|
||
// Run tool-calling agent loop
|
||
let result = run_agent_loop(
|
||
&llm, &exec, &pool, &broadcast_tx,
|
||
&project_id, &workflow_id, &requirement, &workdir, &mgr,
|
||
&instructions, None, ext_tools, &mut rx,
|
||
plan_approval,
|
||
).await;
|
||
|
||
let final_status = if result.is_ok() { "done" } else { "failed" };
|
||
tracing::info!("Agent loop finished for workflow {}, status: {}", workflow_id, final_status);
|
||
if let Err(e) = &result {
|
||
tracing::error!("Agent error for workflow {}: {}", workflow_id, e);
|
||
let _ = broadcast_tx.send(WsMessage::Error {
|
||
message: format!("Agent error: {}", e),
|
||
});
|
||
}
|
||
|
||
let _ = sqlx::query("UPDATE workflows SET status = ? WHERE id = ?")
|
||
.bind(final_status)
|
||
.bind(&workflow_id)
|
||
.execute(&pool)
|
||
.await;
|
||
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
|
||
workflow_id: workflow_id.clone(),
|
||
status: final_status.into(),
|
||
});
|
||
|
||
// Generate report from execution log
|
||
let log_entries = sqlx::query_as::<_, crate::db::ExecutionLogEntry>(
|
||
"SELECT * FROM execution_log WHERE workflow_id = ? ORDER BY created_at"
|
||
)
|
||
.bind(&workflow_id)
|
||
.fetch_all(&pool)
|
||
.await
|
||
.unwrap_or_default();
|
||
|
||
if let Ok(report) = generate_report(&llm, &requirement, &log_entries, &project_id).await {
|
||
let _ = sqlx::query("UPDATE workflows SET report = ? WHERE id = ?")
|
||
.bind(&report)
|
||
.bind(&workflow_id)
|
||
.execute(&pool)
|
||
.await;
|
||
let _ = broadcast_tx.send(WsMessage::ReportReady {
|
||
workflow_id: workflow_id.clone(),
|
||
});
|
||
}
|
||
}
|
||
AgentEvent::Comment { workflow_id, content } => {
|
||
tracing::info!("Comment on workflow {}: {}", workflow_id, content);
|
||
|
||
let wf = sqlx::query_as::<_, crate::db::Workflow>(
|
||
"SELECT * FROM workflows WHERE id = ?",
|
||
)
|
||
.bind(&workflow_id)
|
||
.fetch_optional(&pool)
|
||
.await
|
||
.ok()
|
||
.flatten();
|
||
|
||
let Some(wf) = wf else { continue };
|
||
|
||
// Load latest state snapshot
|
||
let snapshot = sqlx::query_scalar::<_, String>(
|
||
"SELECT state_json FROM agent_state_snapshots WHERE workflow_id = ? ORDER BY created_at DESC LIMIT 1"
|
||
)
|
||
.bind(&workflow_id)
|
||
.fetch_optional(&pool)
|
||
.await
|
||
.ok()
|
||
.flatten();
|
||
|
||
let mut state = snapshot
|
||
.and_then(|json| serde_json::from_str::<AgentState>(&json).ok())
|
||
.unwrap_or_else(AgentState::new);
|
||
|
||
// Resume directly if: workflow is failed/done/waiting_user,
|
||
// OR if state snapshot has a WaitingUser step (e.g. after pod restart)
|
||
let has_waiting_step = state.steps.iter().any(|s| matches!(s.status, StepStatus::WaitingUser));
|
||
let is_resuming = wf.status == "failed" || wf.status == "done"
|
||
|| wf.status == "waiting_user" || has_waiting_step;
|
||
if is_resuming {
|
||
// Reset Failed/WaitingUser steps so they get re-executed
|
||
for step in &mut state.steps {
|
||
if matches!(step.status, StepStatus::Failed) {
|
||
step.status = StepStatus::Pending;
|
||
}
|
||
if matches!(step.status, StepStatus::WaitingUser) {
|
||
// Mark as Running so it continues (not re-plans)
|
||
step.status = StepStatus::Running;
|
||
}
|
||
}
|
||
// Attach comment as feedback to the first actionable step
|
||
if let Some(order) = state.first_actionable_step() {
|
||
if let Some(step) = state.steps.iter_mut().find(|s| s.order == order) {
|
||
step.user_feedbacks.push(content.clone());
|
||
}
|
||
}
|
||
tracing::info!("[workflow {}] Resuming from state (status={}), first actionable: {:?}",
|
||
workflow_id, wf.status, state.first_actionable_step());
|
||
} else {
|
||
// Active workflow: LLM decides whether to revise plan
|
||
state = process_feedback(
|
||
&llm, &pool, &broadcast_tx,
|
||
&project_id, &workflow_id, state, &content,
|
||
).await;
|
||
}
|
||
|
||
// If there are actionable steps, resume execution
|
||
if state.first_actionable_step().is_some() {
|
||
ensure_workspace(&exec, &workdir).await;
|
||
|
||
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
|
||
workflow_id: workflow_id.clone(),
|
||
status: "executing".into(),
|
||
});
|
||
let _ = sqlx::query("UPDATE workflows SET status = 'executing' WHERE id = ?")
|
||
.bind(&workflow_id)
|
||
.execute(&pool)
|
||
.await;
|
||
|
||
// Prepare state for execution: set first pending step to Running
|
||
if let Some(next) = state.first_actionable_step() {
|
||
let was_same_step = matches!(state.phase, AgentPhase::Executing { step } if step == next);
|
||
if let Some(step) = state.steps.iter_mut().find(|s| s.order == next) {
|
||
if matches!(step.status, StepStatus::Pending) {
|
||
step.status = StepStatus::Running;
|
||
}
|
||
}
|
||
state.phase = AgentPhase::Executing { step: next };
|
||
// Only clear chat history when advancing to a new step;
|
||
// keep it when resuming the same step after ask_user
|
||
if !was_same_step {
|
||
state.current_step_chat_history.clear();
|
||
}
|
||
}
|
||
|
||
let instructions = read_instructions(&workdir).await;
|
||
|
||
// Reload template config if available
|
||
let loaded_template = if !wf.template_id.is_empty() {
|
||
let tid = &wf.template_id;
|
||
if template::is_repo_template(tid) {
|
||
match template::extract_repo_template(tid, mgr.template_repo.as_ref()).await {
|
||
Ok(template_dir) => {
|
||
LoadedTemplate::load_from_dir(tid, &template_dir).await.ok()
|
||
}
|
||
Err(e) => {
|
||
tracing::warn!("Failed to reload template {}: {}", tid, e);
|
||
None
|
||
}
|
||
}
|
||
} else {
|
||
LoadedTemplate::load(tid).await.ok()
|
||
}
|
||
} else {
|
||
None
|
||
};
|
||
let ext_tools = loaded_template.as_ref().map(|t| &t.external_tools);
|
||
let plan_approval = loaded_template.as_ref().map_or(false, |t| t.require_plan_approval);
|
||
|
||
let result = run_agent_loop(
|
||
&llm, &exec, &pool, &broadcast_tx,
|
||
&project_id, &workflow_id, &wf.requirement, &workdir, &mgr,
|
||
&instructions, Some(state), ext_tools, &mut rx,
|
||
plan_approval,
|
||
).await;
|
||
|
||
let final_status = if result.is_ok() { "done" } else { "failed" };
|
||
if let Err(e) = &result {
|
||
let _ = broadcast_tx.send(WsMessage::Error {
|
||
message: format!("Agent error: {}", e),
|
||
});
|
||
}
|
||
|
||
let _ = sqlx::query("UPDATE workflows SET status = ? WHERE id = ?")
|
||
.bind(final_status)
|
||
.bind(&workflow_id)
|
||
.execute(&pool)
|
||
.await;
|
||
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
|
||
workflow_id: workflow_id.clone(),
|
||
status: final_status.into(),
|
||
});
|
||
|
||
// Regenerate report
|
||
let log_entries = sqlx::query_as::<_, crate::db::ExecutionLogEntry>(
|
||
"SELECT * FROM execution_log WHERE workflow_id = ? ORDER BY created_at"
|
||
)
|
||
.bind(&workflow_id)
|
||
.fetch_all(&pool)
|
||
.await
|
||
.unwrap_or_default();
|
||
|
||
if let Ok(report) = generate_report(&llm, &wf.requirement, &log_entries, &project_id).await {
|
||
let _ = sqlx::query("UPDATE workflows SET report = ? WHERE id = ?")
|
||
.bind(&report)
|
||
.bind(&workflow_id)
|
||
.execute(&pool)
|
||
.await;
|
||
let _ = broadcast_tx.send(WsMessage::ReportReady {
|
||
workflow_id: workflow_id.clone(),
|
||
});
|
||
}
|
||
} else {
|
||
// No actionable steps — feedback was informational only
|
||
// Mark workflow back to done
|
||
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
|
||
workflow_id: workflow_id.clone(),
|
||
status: "done".into(),
|
||
});
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
tracing::info!("Agent loop ended for project {}", project_id);
|
||
}
|
||
|
||
// --- Tool definitions ---
|
||
|
||
fn make_tool(name: &str, description: &str, parameters: serde_json::Value) -> Tool {
|
||
Tool {
|
||
tool_type: "function".into(),
|
||
function: ToolFunction {
|
||
name: name.into(),
|
||
description: description.into(),
|
||
parameters,
|
||
},
|
||
}
|
||
}
|
||
|
||
fn tool_read_file() -> Tool {
|
||
make_tool("read_file", "读取工作区中的文件内容", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"path": { "type": "string", "description": "工作区内的相对路径" }
|
||
},
|
||
"required": ["path"]
|
||
}))
|
||
}
|
||
|
||
fn tool_list_files() -> Tool {
|
||
make_tool("list_files", "列出工作区目录中的文件和子目录", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"path": { "type": "string", "description": "工作区内的相对路径,默认为根目录" }
|
||
}
|
||
}))
|
||
}
|
||
|
||
fn tool_kb_search() -> Tool {
|
||
make_tool("kb_search", "搜索知识库中与查询相关的内容片段。返回最相关的 top-5 片段。", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"query": { "type": "string", "description": "搜索查询" }
|
||
},
|
||
"required": ["query"]
|
||
}))
|
||
}
|
||
|
||
fn tool_kb_read() -> Tool {
|
||
make_tool("kb_read", "读取知识库全文内容。", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {}
|
||
}))
|
||
}
|
||
|
||
fn build_planning_tools() -> Vec<Tool> {
|
||
vec![
|
||
make_tool("update_plan", "设置高层执行计划。分析需求后调用此工具提交计划。每个步骤应是一个逻辑阶段(不是具体命令),包含简短标题和详细描述。调用后自动进入执行阶段。", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"steps": {
|
||
"type": "array",
|
||
"items": {
|
||
"type": "object",
|
||
"properties": {
|
||
"title": { "type": "string", "description": "步骤标题,简短概括(如'搭建环境')" },
|
||
"description": { "type": "string", "description": "详细描述,说明具体要做什么、为什么" }
|
||
},
|
||
"required": ["title", "description"]
|
||
},
|
||
"description": "高层计划步骤列表"
|
||
}
|
||
},
|
||
"required": ["steps"]
|
||
})),
|
||
tool_list_files(),
|
||
tool_read_file(),
|
||
tool_kb_search(),
|
||
tool_kb_read(),
|
||
]
|
||
}
|
||
|
||
/// Coordinator tools — used by the main loop after step completion
|
||
fn build_coordinator_tools() -> Vec<Tool> {
|
||
vec![
|
||
make_tool("update_plan", "修改执行计划。提供完整步骤列表,系统自动 diff:description 未变的已完成步骤保留成果,变化的步骤及后续重新执行。", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"steps": {
|
||
"type": "array",
|
||
"items": {
|
||
"type": "object",
|
||
"properties": {
|
||
"title": { "type": "string", "description": "步骤标题" },
|
||
"description": { "type": "string", "description": "详细描述" }
|
||
},
|
||
"required": ["title", "description"]
|
||
}
|
||
}
|
||
},
|
||
"required": ["steps"]
|
||
})),
|
||
make_tool("update_scratchpad", "更新全局备忘录。跨步骤持久化的关键信息。新内容会追加到已有内容之后。", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"content": { "type": "string", "description": "要追加的内容" }
|
||
},
|
||
"required": ["content"]
|
||
})),
|
||
make_tool("update_requirement", "更新项目需求描述。当需要调整方向时使用。", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"requirement": { "type": "string", "description": "更新后的需求描述" }
|
||
},
|
||
"required": ["requirement"]
|
||
})),
|
||
]
|
||
}
|
||
|
||
/// Step execution tools — used by run_step_loop
|
||
fn build_step_tools() -> Vec<Tool> {
|
||
vec![
|
||
make_tool("execute", "在工作区目录中执行 shell 命令", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"command": { "type": "string", "description": "要执行的 shell 命令" }
|
||
},
|
||
"required": ["command"]
|
||
})),
|
||
tool_read_file(),
|
||
make_tool("write_file", "在工作区中写入文件(自动创建父目录)", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"path": { "type": "string", "description": "工作区内的相对路径" },
|
||
"content": { "type": "string", "description": "要写入的文件内容" }
|
||
},
|
||
"required": ["path", "content"]
|
||
})),
|
||
tool_list_files(),
|
||
make_tool("start_service", "启动后台服务进程(如 FastAPI 应用)。系统会自动分配端口并通过环境变量 PORT 传入。服务启动后可通过 /api/projects/{project_id}/app/ 访问。注意:启动命令应监听 0.0.0.0:$PORT。", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"command": { "type": "string", "description": "启动命令,如 'uvicorn main:app --host 0.0.0.0 --port $PORT'" }
|
||
},
|
||
"required": ["command"]
|
||
})),
|
||
make_tool("stop_service", "停止当前项目正在运行的后台服务进程。", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {}
|
||
})),
|
||
make_tool("update_scratchpad", "更新步骤级工作记忆。用于记录本步骤内的中间状态(步骤结束后丢弃,精华写进 summary)。不是日志,只保留当前有用的信息。", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"content": { "type": "string", "description": "要追加的内容" }
|
||
},
|
||
"required": ["content"]
|
||
})),
|
||
make_tool("ask_user", "向用户提问,暂停执行等待用户回复。用于需要用户输入、确认或决策的场景。", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"question": { "type": "string", "description": "要向用户提出的问题或需要确认的内容" }
|
||
},
|
||
"required": ["question"]
|
||
})),
|
||
make_tool("step_done", "完成当前步骤。必须提供摘要和产出物列表(无产出物时传空数组)。", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"summary": { "type": "string", "description": "本步骤的工作摘要" },
|
||
"artifacts": {
|
||
"type": "array",
|
||
"description": "本步骤的产出物列表。无产出物时传空数组 []",
|
||
"items": {
|
||
"type": "object",
|
||
"properties": {
|
||
"name": { "type": "string", "description": "产物名称" },
|
||
"path": { "type": "string", "description": "文件路径(相对工作目录)" },
|
||
"type": { "type": "string", "enum": ["file", "json", "markdown"], "description": "产物类型" },
|
||
"description": { "type": "string", "description": "一句话说明" }
|
||
},
|
||
"required": ["name", "path", "type"]
|
||
}
|
||
}
|
||
},
|
||
"required": ["summary", "artifacts"]
|
||
})),
|
||
tool_kb_search(),
|
||
tool_kb_read(),
|
||
]
|
||
}
|
||
|
||
fn build_planning_prompt(project_id: &str, instructions: &str) -> String {
|
||
let mut prompt = include_str!("prompts/planning.md")
|
||
.replace("{project_id}", project_id);
|
||
if !instructions.is_empty() {
|
||
prompt.push_str(&format!("\n\n## 项目模板指令\n\n{}", instructions));
|
||
}
|
||
prompt
|
||
}
|
||
|
||
fn build_coordinator_prompt(project_id: &str, instructions: &str) -> String {
|
||
let mut prompt = include_str!("prompts/execution.md")
|
||
.replace("{project_id}", project_id);
|
||
if !instructions.is_empty() {
|
||
prompt.push_str(&format!("\n\n## 项目模板指令\n\n{}", instructions));
|
||
}
|
||
prompt
|
||
}
|
||
|
||
fn build_step_execution_prompt(project_id: &str, instructions: &str) -> String {
|
||
let mut prompt = include_str!("prompts/step_execution.md")
|
||
.replace("{project_id}", project_id);
|
||
if !instructions.is_empty() {
|
||
prompt.push_str(&format!("\n\n## 项目模板指令\n\n{}", instructions));
|
||
}
|
||
prompt
|
||
}
|
||
|
||
/// Build user message for a step sub-loop
|
||
fn build_step_user_message(step: &Step, completed_summaries: &[(i32, String, String, Vec<Artifact>)], parent_scratchpad: &str) -> String {
|
||
let mut ctx = String::new();
|
||
|
||
ctx.push_str(&format!("## 当前步骤(步骤 {})\n", step.order));
|
||
ctx.push_str(&format!("标题:{}\n", step.title));
|
||
ctx.push_str(&format!("描述:{}\n", step.description));
|
||
|
||
if !step.user_feedbacks.is_empty() {
|
||
ctx.push_str("\n用户反馈:\n");
|
||
for fb in &step.user_feedbacks {
|
||
ctx.push_str(&format!("- {}\n", fb));
|
||
}
|
||
}
|
||
ctx.push('\n');
|
||
|
||
if !completed_summaries.is_empty() {
|
||
ctx.push_str("## 已完成步骤摘要\n");
|
||
for (order, title, summary, artifacts) in completed_summaries {
|
||
ctx.push_str(&format!("- 步骤 {} ({}): {}\n", order, title, summary));
|
||
if !artifacts.is_empty() {
|
||
let arts: Vec<String> = artifacts.iter()
|
||
.map(|a| format!("{} ({})", a.name, a.artifact_type))
|
||
.collect();
|
||
ctx.push_str(&format!(" 产物: {}\n", arts.join(", ")));
|
||
}
|
||
}
|
||
ctx.push('\n');
|
||
}
|
||
|
||
if !parent_scratchpad.is_empty() {
|
||
ctx.push_str("## 项目备忘录(只读)\n");
|
||
ctx.push_str(parent_scratchpad);
|
||
ctx.push('\n');
|
||
}
|
||
|
||
ctx
|
||
}
|
||
|
||
fn build_feedback_prompt(project_id: &str, state: &AgentState, feedback: &str) -> String {
|
||
let mut plan_state = String::new();
|
||
for s in &state.steps {
|
||
let status = match s.status {
|
||
StepStatus::Done => " [done]",
|
||
StepStatus::Running => " [running]",
|
||
StepStatus::WaitingUser => " [waiting]",
|
||
StepStatus::Failed => " [FAILED]",
|
||
StepStatus::Pending => "",
|
||
};
|
||
plan_state.push_str(&format!("{}. {}{}\n {}\n", s.order, s.title, status, s.description));
|
||
if let Some(summary) = &s.summary {
|
||
plan_state.push_str(&format!(" 摘要: {}\n", summary));
|
||
}
|
||
}
|
||
include_str!("prompts/feedback.md")
|
||
.replace("{project_id}", project_id)
|
||
.replace("{plan_state}", &plan_state)
|
||
.replace("{feedback}", feedback)
|
||
}
|
||
|
||
fn build_feedback_tools() -> Vec<Tool> {
|
||
vec![
|
||
make_tool("revise_plan", "修改执行计划。提供完整步骤列表。系统自动 diff:description 未变的已完成步骤保留成果,变化的步骤及后续重新执行。", serde_json::json!({
|
||
"type": "object",
|
||
"properties": {
|
||
"steps": {
|
||
"type": "array",
|
||
"items": {
|
||
"type": "object",
|
||
"properties": {
|
||
"title": { "type": "string", "description": "步骤标题" },
|
||
"description": { "type": "string", "description": "详细描述" }
|
||
},
|
||
"required": ["title", "description"]
|
||
}
|
||
}
|
||
},
|
||
"required": ["steps"]
|
||
})),
|
||
]
|
||
}
|
||
|
||
// --- Helpers ---
|
||
|
||
/// Returns the longest prefix of `s` that is at most `max_bytes` bytes long and ends on a
/// UTF-8 character boundary, so the slice never splits a multi-byte character.
fn truncate_str(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }
    // Byte index 0 is always a char boundary, so the search always succeeds.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&i| s.is_char_boundary(i))
        .unwrap_or(0);
    &s[..cut]
}
|
||
|
||
// --- Tool execution ---
|
||
|
||
async fn execute_tool(
|
||
name: &str,
|
||
arguments: &str,
|
||
workdir: &str,
|
||
exec: &LocalExecutor,
|
||
) -> String {
|
||
let args: serde_json::Value = serde_json::from_str(arguments).unwrap_or_default();
|
||
|
||
match name {
|
||
"execute" => {
|
||
let cmd = args["command"].as_str().unwrap_or("");
|
||
match exec.execute(cmd, workdir).await {
|
||
Ok(r) => {
|
||
let mut out = r.stdout;
|
||
if !r.stderr.is_empty() {
|
||
out.push_str("\nSTDERR: ");
|
||
out.push_str(&r.stderr);
|
||
}
|
||
if r.exit_code != 0 {
|
||
out.push_str(&format!("\n[exit code: {}]", r.exit_code));
|
||
}
|
||
if out.len() > 8000 {
|
||
let truncated = truncate_str(&out, 8000).to_string();
|
||
out = truncated;
|
||
out.push_str("\n...(truncated)");
|
||
}
|
||
out
|
||
}
|
||
Err(e) => format!("Error: {}", e),
|
||
}
|
||
}
|
||
"read_file" => {
|
||
let path = args["path"].as_str().unwrap_or("");
|
||
if path.contains("..") {
|
||
return "Error: path traversal not allowed".into();
|
||
}
|
||
let full = std::path::PathBuf::from(workdir).join(path);
|
||
match tokio::fs::read_to_string(&full).await {
|
||
Ok(content) => {
|
||
if content.len() > 8000 {
|
||
format!("{}...(truncated, {} bytes total)", truncate_str(&content, 8000), content.len())
|
||
} else {
|
||
content
|
||
}
|
||
}
|
||
Err(e) => format!("Error: {}", e),
|
||
}
|
||
}
|
||
"write_file" => {
|
||
let path = args["path"].as_str().unwrap_or("");
|
||
let content = args["content"].as_str().unwrap_or("");
|
||
if path.contains("..") {
|
||
return "Error: path traversal not allowed".into();
|
||
}
|
||
let full = std::path::PathBuf::from(workdir).join(path);
|
||
if let Some(parent) = full.parent() {
|
||
let _ = tokio::fs::create_dir_all(parent).await;
|
||
}
|
||
match tokio::fs::write(&full, content).await {
|
||
Ok(()) => format!("Written {} bytes to {}", content.len(), path),
|
||
Err(e) => format!("Error: {}", e),
|
||
}
|
||
}
|
||
"list_files" => {
|
||
let path = args["path"].as_str().unwrap_or(".");
|
||
if path.contains("..") {
|
||
return "Error: path traversal not allowed".into();
|
||
}
|
||
let full = std::path::PathBuf::from(workdir).join(path);
|
||
match tokio::fs::read_dir(&full).await {
|
||
Ok(mut entries) => {
|
||
let mut items = Vec::new();
|
||
while let Ok(Some(entry)) = entries.next_entry().await {
|
||
let name = entry.file_name().to_string_lossy().to_string();
|
||
let is_dir = entry.file_type().await.map(|t| t.is_dir()).unwrap_or(false);
|
||
items.push(if is_dir { format!("{}/", name) } else { name });
|
||
}
|
||
items.sort();
|
||
if items.is_empty() { "(empty directory)".into() } else { items.join("\n") }
|
||
}
|
||
Err(e) => format!("Error: {}", e),
|
||
}
|
||
}
|
||
_ => format!("Unknown tool: {}", name),
|
||
}
|
||
}
|
||
|
||
// --- Tool-calling agent loop (state machine) ---
|
||
|
||
/// Save an AgentState snapshot to DB.
|
||
async fn save_state_snapshot(pool: &SqlitePool, workflow_id: &str, step_order: i32, state: &AgentState) {
|
||
let id = uuid::Uuid::new_v4().to_string();
|
||
let json = serde_json::to_string(state).unwrap_or_default();
|
||
let _ = sqlx::query(
|
||
"INSERT INTO agent_state_snapshots (id, workflow_id, step_order, state_json, created_at) VALUES (?, ?, ?, ?, datetime('now'))"
|
||
)
|
||
.bind(&id)
|
||
.bind(workflow_id)
|
||
.bind(step_order)
|
||
.bind(&json)
|
||
.execute(pool)
|
||
.await;
|
||
}
|
||
|
||
/// Log a tool call to execution_log and broadcast to frontend.
|
||
async fn log_execution(
|
||
pool: &SqlitePool,
|
||
broadcast_tx: &broadcast::Sender<WsMessage>,
|
||
workflow_id: &str,
|
||
step_order: i32,
|
||
tool_name: &str,
|
||
tool_input: &str,
|
||
output: &str,
|
||
status: &str,
|
||
) {
|
||
let id = uuid::Uuid::new_v4().to_string();
|
||
let _ = sqlx::query(
|
||
"INSERT INTO execution_log (id, workflow_id, step_order, tool_name, tool_input, output, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))"
|
||
)
|
||
.bind(&id)
|
||
.bind(workflow_id)
|
||
.bind(step_order)
|
||
.bind(tool_name)
|
||
.bind(tool_input)
|
||
.bind(output)
|
||
.bind(status)
|
||
.execute(pool)
|
||
.await;
|
||
|
||
let _ = broadcast_tx.send(WsMessage::StepStatusUpdate {
|
||
step_id: id,
|
||
status: status.into(),
|
||
output: output.to_string(),
|
||
});
|
||
}
|
||
|
||
/// Log an LLM call to llm_call_log and broadcast to frontend.
|
||
#[allow(clippy::too_many_arguments)]
|
||
async fn log_llm_call(
|
||
pool: &SqlitePool,
|
||
broadcast_tx: &broadcast::Sender<WsMessage>,
|
||
workflow_id: &str,
|
||
step_order: i32,
|
||
phase: &str,
|
||
messages_count: i32,
|
||
tools_count: i32,
|
||
tool_calls_json: &str,
|
||
text_response: &str,
|
||
prompt_tokens: Option<u32>,
|
||
completion_tokens: Option<u32>,
|
||
latency_ms: i64,
|
||
) {
|
||
let id = uuid::Uuid::new_v4().to_string();
|
||
let _ = sqlx::query(
|
||
"INSERT INTO llm_call_log (id, workflow_id, step_order, phase, messages_count, tools_count, tool_calls, text_response, prompt_tokens, completion_tokens, latency_ms, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))"
|
||
)
|
||
.bind(&id)
|
||
.bind(workflow_id)
|
||
.bind(step_order)
|
||
.bind(phase)
|
||
.bind(messages_count)
|
||
.bind(tools_count)
|
||
.bind(tool_calls_json)
|
||
.bind(text_response)
|
||
.bind(prompt_tokens.map(|v| v as i32))
|
||
.bind(completion_tokens.map(|v| v as i32))
|
||
.bind(latency_ms as i32)
|
||
.execute(pool)
|
||
.await;
|
||
|
||
let entry = crate::db::LlmCallLogEntry {
|
||
id: id.clone(),
|
||
workflow_id: workflow_id.to_string(),
|
||
step_order,
|
||
phase: phase.to_string(),
|
||
messages_count,
|
||
tools_count,
|
||
tool_calls: tool_calls_json.to_string(),
|
||
text_response: text_response.to_string(),
|
||
prompt_tokens: prompt_tokens.map(|v| v as i32),
|
||
completion_tokens: completion_tokens.map(|v| v as i32),
|
||
latency_ms: latency_ms as i32,
|
||
created_at: String::new(),
|
||
};
|
||
|
||
let _ = broadcast_tx.send(WsMessage::LlmCallLog {
|
||
workflow_id: workflow_id.to_string(),
|
||
entry,
|
||
});
|
||
}
|
||
|
||
/// Process user feedback: call LLM to decide whether to revise the plan.
|
||
/// Returns the (possibly modified) AgentState ready for resumed execution.
|
||
async fn process_feedback(
|
||
llm: &LlmClient,
|
||
pool: &SqlitePool,
|
||
broadcast_tx: &broadcast::Sender<WsMessage>,
|
||
project_id: &str,
|
||
workflow_id: &str,
|
||
mut state: AgentState,
|
||
feedback: &str,
|
||
) -> AgentState {
|
||
let prompt = build_feedback_prompt(project_id, &state, feedback);
|
||
let tools = build_feedback_tools();
|
||
let messages = vec![
|
||
ChatMessage::system(&prompt),
|
||
ChatMessage::user(feedback),
|
||
];
|
||
|
||
tracing::info!("[workflow {}] Processing feedback with LLM", workflow_id);
|
||
let response = match llm.chat_with_tools(messages, &tools).await {
|
||
Ok(r) => r,
|
||
Err(e) => {
|
||
tracing::error!("[workflow {}] Feedback LLM call failed: {}", workflow_id, e);
|
||
// On failure, attach feedback to first non-done step and return unchanged
|
||
if let Some(step) = state.steps.iter_mut().find(|s| !matches!(s.status, StepStatus::Done)) {
|
||
step.user_feedbacks.push(feedback.to_string());
|
||
}
|
||
return state;
|
||
}
|
||
};
|
||
|
||
let choice = match response.choices.into_iter().next() {
|
||
Some(c) => c,
|
||
None => return state,
|
||
};
|
||
|
||
if let Some(tool_calls) = &choice.message.tool_calls {
|
||
for tc in tool_calls {
|
||
if tc.function.name == "revise_plan" {
|
||
let args: serde_json::Value = serde_json::from_str(&tc.function.arguments).unwrap_or_default();
|
||
let raw_steps = args["steps"].as_array().cloned().unwrap_or_default();
|
||
|
||
let new_steps: Vec<Step> = raw_steps.iter().enumerate().map(|(i, item)| {
|
||
let order = (i + 1) as i32;
|
||
Step {
|
||
order,
|
||
title: item["title"].as_str().unwrap_or("").to_string(),
|
||
description: item["description"].as_str().unwrap_or("").to_string(),
|
||
status: StepStatus::Pending,
|
||
summary: None,
|
||
user_feedbacks: Vec::new(),
|
||
db_id: String::new(),
|
||
artifacts: Vec::new(),
|
||
}
|
||
}).collect();
|
||
|
||
// Apply docker-cache diff
|
||
let diff = state.apply_plan_diff(new_steps);
|
||
|
||
// Broadcast updated plan
|
||
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
steps: plan_infos_from_state(&state),
|
||
});
|
||
|
||
tracing::info!("[workflow {}] Plan revised via feedback. First actionable: {:?}",
|
||
workflow_id, state.first_actionable_step());
|
||
|
||
// Log the diff so frontend can show what changed
|
||
let diff_display = format!("```diff\n{}\n```", diff);
|
||
log_execution(pool, broadcast_tx, workflow_id, 0, "revise_plan", "计划变更", &diff_display, "done").await;
|
||
}
|
||
}
|
||
} else {
|
||
// Text response only — feedback is informational, no plan change
|
||
let text = choice.message.content.as_deref().unwrap_or("");
|
||
tracing::info!("[workflow {}] Feedback processed, no plan change: {}", workflow_id, truncate_str(text, 200));
|
||
log_execution(pool, broadcast_tx, workflow_id, state.current_step(), "text_response", "", text, "done").await;
|
||
}
|
||
|
||
// Attach feedback to the first actionable step (or last step)
|
||
let target_order = state.first_actionable_step()
|
||
.unwrap_or_else(|| state.steps.last().map(|s| s.order).unwrap_or(0));
|
||
if let Some(step) = state.steps.iter_mut().find(|s| s.order == target_order) {
|
||
step.user_feedbacks.push(feedback.to_string());
|
||
}
|
||
|
||
// Snapshot after feedback processing
|
||
save_state_snapshot(pool, workflow_id, state.current_step(), &state).await;
|
||
|
||
state
|
||
}
|
||
|
||
/// Run an isolated sub-loop for a single step. Returns StepResult.
|
||
#[allow(clippy::too_many_arguments)]
|
||
async fn run_step_loop(
|
||
llm: &LlmClient,
|
||
exec: &LocalExecutor,
|
||
pool: &SqlitePool,
|
||
broadcast_tx: &broadcast::Sender<WsMessage>,
|
||
project_id: &str,
|
||
workflow_id: &str,
|
||
workdir: &str,
|
||
mgr: &Arc<AgentManager>,
|
||
instructions: &str,
|
||
step: &Step,
|
||
completed_summaries: &[(i32, String, String, Vec<Artifact>)],
|
||
parent_scratchpad: &str,
|
||
external_tools: Option<&ExternalToolManager>,
|
||
event_rx: &mut mpsc::Receiver<AgentEvent>,
|
||
) -> StepResult {
|
||
let system_prompt = build_step_execution_prompt(project_id, instructions);
|
||
let user_message = build_step_user_message(step, completed_summaries, parent_scratchpad);
|
||
|
||
let mut step_tools = build_step_tools();
|
||
if let Some(ext) = external_tools {
|
||
step_tools.extend(ext.tool_definitions());
|
||
}
|
||
|
||
let mut step_chat_history: Vec<ChatMessage> = Vec::new();
|
||
let mut step_scratchpad = String::new();
|
||
let step_order = step.order;
|
||
|
||
for iteration in 0..50 {
|
||
// Build messages: system + user context + chat history
|
||
let mut messages = vec![
|
||
ChatMessage::system(&system_prompt),
|
||
ChatMessage::user(&user_message),
|
||
];
|
||
// If step scratchpad is non-empty, inject it
|
||
if !step_scratchpad.is_empty() {
|
||
let last_user = messages.len() - 1;
|
||
if let Some(content) = &messages[last_user].content {
|
||
let mut amended = content.clone();
|
||
amended.push_str(&format!("\n\n## 步骤工作记忆\n{}", step_scratchpad));
|
||
messages[last_user].content = Some(amended);
|
||
}
|
||
}
|
||
messages.extend(step_chat_history.clone());
|
||
|
||
let msg_count = messages.len() as i32;
|
||
let tool_count = step_tools.len() as i32;
|
||
let phase_label = format!("step({})", step_order);
|
||
|
||
tracing::info!("[workflow {}] Step {} LLM call #{} msgs={}", workflow_id, step_order, iteration + 1, messages.len());
|
||
let _ = broadcast_tx.send(WsMessage::ActivityUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
activity: format!("步骤 {} — 等待 LLM 响应...", step_order),
|
||
});
|
||
let call_start = std::time::Instant::now();
|
||
let response = match llm.chat_with_tools(messages, &step_tools).await {
|
||
Ok(r) => r,
|
||
Err(e) => {
|
||
tracing::error!("[workflow {}] Step {} LLM call failed: {}", workflow_id, step_order, e);
|
||
return StepResult {
|
||
status: StepResultStatus::Failed { error: format!("LLM call failed: {}", e) },
|
||
artifacts: Vec::new(),
|
||
summary: format!("LLM 调用失败: {}", e),
|
||
};
|
||
}
|
||
};
|
||
let latency_ms = call_start.elapsed().as_millis() as i64;
|
||
|
||
let (prompt_tokens, completion_tokens) = response.usage.as_ref()
|
||
.map(|u| (Some(u.prompt_tokens), Some(u.completion_tokens)))
|
||
.unwrap_or((None, None));
|
||
|
||
let choice = match response.choices.into_iter().next() {
|
||
Some(c) => c,
|
||
None => {
|
||
return StepResult {
|
||
status: StepResultStatus::Failed { error: "No response from LLM".into() },
|
||
summary: "LLM 无响应".into(),
|
||
artifacts: Vec::new(),
|
||
};
|
||
}
|
||
};
|
||
|
||
step_chat_history.push(choice.message.clone());
|
||
|
||
let llm_text_response = choice.message.content.clone().unwrap_or_default();
|
||
|
||
if let Some(tool_calls) = &choice.message.tool_calls {
|
||
tracing::info!("[workflow {}] Step {} tool calls: {}", workflow_id, step_order,
|
||
tool_calls.iter().map(|tc| tc.function.name.as_str()).collect::<Vec<_>>().join(", "));
|
||
|
||
let mut step_done_result: Option<StepResult> = None;
|
||
|
||
for tc in tool_calls {
|
||
if step_done_result.is_some() {
|
||
step_chat_history.push(ChatMessage::tool_result(&tc.id, "(skipped: step completed)"));
|
||
continue;
|
||
}
|
||
|
||
let args: serde_json::Value = serde_json::from_str(&tc.function.arguments).unwrap_or_default();
|
||
|
||
match tc.function.name.as_str() {
|
||
"step_done" => {
|
||
let summary = args["summary"].as_str().unwrap_or("").to_string();
|
||
let artifacts: Vec<Artifact> = args.get("artifacts")
|
||
.and_then(|v| v.as_array())
|
||
.map(|arr| arr.iter().filter_map(|a| {
|
||
Some(Artifact {
|
||
name: a["name"].as_str()?.to_string(),
|
||
path: a["path"].as_str()?.to_string(),
|
||
artifact_type: a["type"].as_str().unwrap_or("file").to_string(),
|
||
description: a["description"].as_str().unwrap_or("").to_string(),
|
||
})
|
||
}).collect())
|
||
.unwrap_or_default();
|
||
|
||
// Save artifacts to DB
|
||
for art in &artifacts {
|
||
let art_id = uuid::Uuid::new_v4().to_string();
|
||
let _ = sqlx::query(
|
||
"INSERT INTO step_artifacts (id, workflow_id, step_order, name, path, artifact_type, description) VALUES (?, ?, ?, ?, ?, ?, ?)"
|
||
)
|
||
.bind(&art_id)
|
||
.bind(workflow_id)
|
||
.bind(step_order)
|
||
.bind(&art.name)
|
||
.bind(&art.path)
|
||
.bind(&art.artifact_type)
|
||
.bind(&art.description)
|
||
.execute(pool)
|
||
.await;
|
||
}
|
||
|
||
log_execution(pool, broadcast_tx, workflow_id, step_order, "step_done", &summary, "步骤完成", "done").await;
|
||
step_chat_history.push(ChatMessage::tool_result(&tc.id, "步骤已完成。"));
|
||
step_done_result = Some(StepResult {
|
||
status: StepResultStatus::Done,
|
||
summary,
|
||
artifacts,
|
||
});
|
||
}
|
||
|
||
"update_scratchpad" => {
|
||
let content = args["content"].as_str().unwrap_or("");
|
||
let mut new_pad = step_scratchpad.clone();
|
||
if !new_pad.is_empty() {
|
||
new_pad.push('\n');
|
||
}
|
||
new_pad.push_str(content);
|
||
match check_scratchpad_size(&new_pad) {
|
||
Ok(()) => {
|
||
step_scratchpad = new_pad;
|
||
step_chat_history.push(ChatMessage::tool_result(&tc.id, "步骤工作记忆已更新。"));
|
||
}
|
||
Err(msg) => {
|
||
step_chat_history.push(ChatMessage::tool_result(&tc.id, &msg));
|
||
}
|
||
}
|
||
}
|
||
|
||
"ask_user" => {
|
||
let reason = args["question"].as_str().unwrap_or("等待确认");
|
||
let _ = broadcast_tx.send(WsMessage::ActivityUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
activity: format!("步骤 {} — 等待用户确认: {}", step_order, reason),
|
||
});
|
||
|
||
// Broadcast waiting status
|
||
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
steps: plan_infos_from_state_with_override(step_order, "waiting_user",
|
||
pool, workflow_id).await,
|
||
});
|
||
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
status: "waiting_user".into(),
|
||
});
|
||
let _ = sqlx::query("UPDATE workflows SET status = 'waiting_user' WHERE id = ?")
|
||
.bind(workflow_id)
|
||
.execute(pool)
|
||
.await;
|
||
log_execution(pool, broadcast_tx, workflow_id, step_order, "ask_user", reason, reason, "waiting").await;
|
||
|
||
tracing::info!("[workflow {}] Step {} waiting for approval: {}", workflow_id, step_order, reason);
|
||
|
||
// Block until Comment event
|
||
let approval_content = loop {
|
||
match event_rx.recv().await {
|
||
Some(AgentEvent::Comment { content, .. }) => break content,
|
||
Some(_) => continue,
|
||
None => {
|
||
return StepResult {
|
||
status: StepResultStatus::Failed { error: "Event channel closed".into() },
|
||
summary: "事件通道关闭".into(),
|
||
artifacts: Vec::new(),
|
||
};
|
||
}
|
||
}
|
||
};
|
||
|
||
tracing::info!("[workflow {}] Step {} approval response: {}", workflow_id, step_order, approval_content);
|
||
|
||
if approval_content.starts_with("rejected:") {
|
||
let reason = approval_content.strip_prefix("rejected:").unwrap_or("").trim();
|
||
log_execution(pool, broadcast_tx, workflow_id, step_order, "ask_user", "rejected", reason, "failed").await;
|
||
step_chat_history.push(ChatMessage::tool_result(&tc.id, &format!("用户拒绝: {}", reason)));
|
||
step_done_result = Some(StepResult {
|
||
status: StepResultStatus::Failed { error: format!("用户终止: {}", reason) },
|
||
summary: format!("用户终止了执行: {}", reason),
|
||
artifacts: Vec::new(),
|
||
});
|
||
continue;
|
||
}
|
||
|
||
// Approved
|
||
let feedback = if approval_content.starts_with("approved:") {
|
||
approval_content.strip_prefix("approved:").unwrap_or("").trim().to_string()
|
||
} else {
|
||
approval_content.clone()
|
||
};
|
||
|
||
let _ = sqlx::query("UPDATE workflows SET status = 'executing' WHERE id = ?")
|
||
.bind(workflow_id)
|
||
.execute(pool)
|
||
.await;
|
||
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
status: "executing".into(),
|
||
});
|
||
|
||
let tool_msg = if feedback.is_empty() {
|
||
"用户已确认,继续执行。".to_string()
|
||
} else {
|
||
format!("用户已确认。反馈: {}", feedback)
|
||
};
|
||
step_chat_history.push(ChatMessage::tool_result(&tc.id, &tool_msg));
|
||
}
|
||
|
||
"start_service" => {
|
||
let cmd = args["command"].as_str().unwrap_or("");
|
||
{
|
||
let mut services = mgr.services.write().await;
|
||
if let Some(old) = services.remove(project_id) {
|
||
let _ = nix::sys::signal::kill(
|
||
nix::unistd::Pid::from_raw(old.pid as i32),
|
||
nix::sys::signal::Signal::SIGTERM,
|
||
);
|
||
}
|
||
}
|
||
let port = mgr.allocate_port();
|
||
let cmd_with_port = cmd.replace("$PORT", &port.to_string());
|
||
let venv_bin = format!("{}/.venv/bin", workdir);
|
||
let path_env = match std::env::var("PATH") {
|
||
Ok(p) => format!("{}:{}", venv_bin, p),
|
||
Err(_) => venv_bin,
|
||
};
|
||
let result = match tokio::process::Command::new("sh")
|
||
.arg("-c")
|
||
.arg(&cmd_with_port)
|
||
.current_dir(workdir)
|
||
.env("PORT", port.to_string())
|
||
.env("PATH", &path_env)
|
||
.env("VIRTUAL_ENV", format!("{}/.venv", workdir))
|
||
.stdout(std::process::Stdio::null())
|
||
.stderr(std::process::Stdio::null())
|
||
.spawn()
|
||
{
|
||
Ok(child) => {
|
||
let pid = child.id().unwrap_or(0);
|
||
mgr.services.write().await.insert(project_id.to_string(), ServiceInfo { port, pid });
|
||
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
||
format!("服务已启动,端口 {},访问地址:/api/projects/{}/app/", port, project_id)
|
||
}
|
||
Err(e) => format!("Error: 启动失败:{}", e),
|
||
};
|
||
let status = if result.starts_with("Error:") { "failed" } else { "done" };
|
||
log_execution(pool, broadcast_tx, workflow_id, step_order, "start_service", cmd, &result, status).await;
|
||
step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
|
||
}
|
||
|
||
"stop_service" => {
|
||
let mut services = mgr.services.write().await;
|
||
let result = if let Some(svc) = services.remove(project_id) {
|
||
let _ = nix::sys::signal::kill(
|
||
nix::unistd::Pid::from_raw(svc.pid as i32),
|
||
nix::sys::signal::Signal::SIGTERM,
|
||
);
|
||
"服务已停止。".to_string()
|
||
} else {
|
||
"当前没有运行中的服务。".to_string()
|
||
};
|
||
log_execution(pool, broadcast_tx, workflow_id, step_order, "stop_service", "", &result, "done").await;
|
||
step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
|
||
}
|
||
|
||
"kb_search" => {
|
||
let query = args["query"].as_str().unwrap_or("");
|
||
let result = if let Some(kb) = &mgr.kb {
|
||
match kb.search(query).await {
|
||
Ok(results) if results.is_empty() => "知识库为空或没有匹配结果。".to_string(),
|
||
Ok(results) => {
|
||
results.iter().enumerate().map(|(i, r)| {
|
||
let article_label = if r.article_title.is_empty() {
|
||
String::new()
|
||
} else {
|
||
format!(" [文章: {}]", r.article_title)
|
||
};
|
||
format!("--- 片段 {} (相似度: {:.2}){} ---\n{}", i + 1, r.score, article_label, r.content)
|
||
}).collect::<Vec<_>>().join("\n\n")
|
||
}
|
||
Err(e) => format!("Error: {}", e),
|
||
}
|
||
} else {
|
||
"知识库未初始化。".to_string()
|
||
};
|
||
step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
|
||
}
|
||
|
||
"kb_read" => {
|
||
let result = if let Some(kb) = &mgr.kb {
|
||
match kb.read_all().await {
|
||
Ok(content) if content.is_empty() => "知识库为空。".to_string(),
|
||
Ok(content) => content,
|
||
Err(e) => format!("Error: {}", e),
|
||
}
|
||
} else {
|
||
"知识库未初始化。".to_string()
|
||
};
|
||
step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
|
||
}
|
||
|
||
// External tools
|
||
name if external_tools.as_ref().is_some_and(|e| e.has_tool(name)) => {
|
||
let _ = broadcast_tx.send(WsMessage::ActivityUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
activity: format!("步骤 {} — 工具: {}", step_order, name),
|
||
});
|
||
let result = match external_tools.unwrap().invoke(name, &tc.function.arguments, workdir).await {
|
||
Ok(output) => {
|
||
let truncated = truncate_str(&output, 8192);
|
||
truncated.to_string()
|
||
}
|
||
Err(e) => format!("Tool error: {}", e),
|
||
};
|
||
let status = if result.starts_with("Tool error:") { "failed" } else { "done" };
|
||
log_execution(pool, broadcast_tx, workflow_id, step_order, &tc.function.name, &tc.function.arguments, &result, status).await;
|
||
step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
|
||
}
|
||
|
||
// IO tools: execute, read_file, write_file, list_files
|
||
_ => {
|
||
let tool_desc = match tc.function.name.as_str() {
|
||
"execute" => {
|
||
let cmd_preview = args.get("command").and_then(|v| v.as_str()).unwrap_or("").chars().take(60).collect::<String>();
|
||
format!("执行命令: {}", cmd_preview)
|
||
}
|
||
"read_file" => format!("读取文件: {}", args.get("path").and_then(|v| v.as_str()).unwrap_or("?")),
|
||
"write_file" => format!("写入文件: {}", args.get("path").and_then(|v| v.as_str()).unwrap_or("?")),
|
||
"list_files" => "列出文件".to_string(),
|
||
other => format!("工具: {}", other),
|
||
};
|
||
let _ = broadcast_tx.send(WsMessage::ActivityUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
activity: format!("步骤 {} — {}", step_order, tool_desc),
|
||
});
|
||
let result = execute_tool(&tc.function.name, &tc.function.arguments, workdir, exec).await;
|
||
let status = if result.starts_with("Error:") { "failed" } else { "done" };
|
||
log_execution(pool, broadcast_tx, workflow_id, step_order, &tc.function.name, &tc.function.arguments, &result, status).await;
|
||
step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
|
||
}
|
||
}
|
||
}
|
||
|
||
// Log LLM call
|
||
let tc_json: Vec<serde_json::Value> = tool_calls.iter().map(|tc| {
|
||
serde_json::json!({
|
||
"name": tc.function.name,
|
||
"arguments_preview": truncate_str(&tc.function.arguments, 200),
|
||
})
|
||
}).collect();
|
||
let tc_json_str = serde_json::to_string(&tc_json).unwrap_or_else(|_| "[]".to_string());
|
||
log_llm_call(
|
||
pool, broadcast_tx, workflow_id, step_order,
|
||
&phase_label, msg_count, tool_count,
|
||
&tc_json_str, &llm_text_response,
|
||
prompt_tokens, completion_tokens, latency_ms,
|
||
).await;
|
||
|
||
if let Some(result) = step_done_result {
|
||
return result;
|
||
}
|
||
} else {
|
||
// Text response without tool calls
|
||
let content = choice.message.content.as_deref().unwrap_or("(no content)");
|
||
tracing::info!("[workflow {}] Step {} text response: {}", workflow_id, step_order, truncate_str(content, 200));
|
||
log_execution(pool, broadcast_tx, workflow_id, step_order, "text_response", "", content, "done").await;
|
||
log_llm_call(
|
||
pool, broadcast_tx, workflow_id, step_order,
|
||
&phase_label, msg_count, tool_count,
|
||
"[]", content,
|
||
prompt_tokens, completion_tokens, latency_ms,
|
||
).await;
|
||
// Text response in step loop — continue, LLM may follow up with tool calls
|
||
}
|
||
}
|
||
|
||
// Hit 50-iteration limit
|
||
tracing::warn!("[workflow {}] Step {} hit iteration limit (50)", workflow_id, step_order);
|
||
StepResult {
|
||
status: StepResultStatus::Failed { error: "步骤迭代次数超限(50轮)".into() },
|
||
summary: "步骤执行超过50轮迭代限制,未能完成".into(),
|
||
artifacts: Vec::new(),
|
||
}
|
||
}
|
||
|
||
/// Helper to get plan step infos with a status override for a specific step.
|
||
/// Used during ask_user in the step sub-loop where we don't have
|
||
/// mutable access to the AgentState.
|
||
async fn plan_infos_from_state_with_override(
|
||
step_order: i32,
|
||
override_status: &str,
|
||
pool: &SqlitePool,
|
||
workflow_id: &str,
|
||
) -> Vec<PlanStepInfo> {
|
||
// Read the latest state snapshot to get step info
|
||
let snapshot = sqlx::query_scalar::<_, String>(
|
||
"SELECT state_json FROM agent_state_snapshots WHERE workflow_id = ? ORDER BY created_at DESC LIMIT 1"
|
||
)
|
||
.bind(workflow_id)
|
||
.fetch_optional(pool)
|
||
.await
|
||
.ok()
|
||
.flatten();
|
||
|
||
if let Some(json) = snapshot {
|
||
if let Ok(state) = serde_json::from_str::<AgentState>(&json) {
|
||
return state.steps.iter().map(|s| {
|
||
let status = if s.order == step_order {
|
||
override_status.to_string()
|
||
} else {
|
||
match s.status {
|
||
StepStatus::Pending => "pending",
|
||
StepStatus::Running => "running",
|
||
StepStatus::WaitingUser => "waiting_user",
|
||
StepStatus::Done => "done",
|
||
StepStatus::Failed => "failed",
|
||
}.to_string()
|
||
};
|
||
PlanStepInfo {
|
||
order: s.order,
|
||
description: s.title.clone(),
|
||
command: s.description.clone(),
|
||
status: Some(status),
|
||
artifacts: s.artifacts.clone(),
|
||
}
|
||
}).collect();
|
||
}
|
||
}
|
||
Vec::new()
|
||
}
|
||
|
||
#[allow(clippy::too_many_arguments)]
|
||
async fn run_agent_loop(
|
||
llm: &LlmClient,
|
||
exec: &LocalExecutor,
|
||
pool: &SqlitePool,
|
||
broadcast_tx: &broadcast::Sender<WsMessage>,
|
||
project_id: &str,
|
||
workflow_id: &str,
|
||
requirement: &str,
|
||
workdir: &str,
|
||
mgr: &Arc<AgentManager>,
|
||
instructions: &str,
|
||
initial_state: Option<AgentState>,
|
||
external_tools: Option<&ExternalToolManager>,
|
||
event_rx: &mut mpsc::Receiver<AgentEvent>,
|
||
require_plan_approval: bool,
|
||
) -> anyhow::Result<()> {
|
||
let planning_tools = build_planning_tools();
|
||
let coordinator_tools = build_coordinator_tools();
|
||
|
||
let mut state = initial_state.unwrap_or_else(AgentState::new);
|
||
|
||
// --- Planning phase loop ---
|
||
// Keep iterating until we transition out of Planning
|
||
for iteration in 0..20 {
|
||
if !matches!(state.phase, AgentPhase::Planning) {
|
||
break;
|
||
}
|
||
|
||
let system_prompt = build_planning_prompt(project_id, instructions);
|
||
let messages = state.build_messages(&system_prompt, requirement);
|
||
let msg_count = messages.len() as i32;
|
||
let tool_count = planning_tools.len() as i32;
|
||
|
||
tracing::info!("[workflow {}] Planning LLM call #{} msgs={}", workflow_id, iteration + 1, messages.len());
|
||
let _ = broadcast_tx.send(WsMessage::ActivityUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
activity: "规划中 — 等待 LLM 响应...".to_string(),
|
||
});
|
||
let call_start = std::time::Instant::now();
|
||
let response = match llm.chat_with_tools(messages, &planning_tools).await {
|
||
Ok(r) => r,
|
||
Err(e) => {
|
||
tracing::error!("[workflow {}] LLM call failed: {}", workflow_id, e);
|
||
return Err(e);
|
||
}
|
||
};
|
||
let latency_ms = call_start.elapsed().as_millis() as i64;
|
||
|
||
let (prompt_tokens, completion_tokens) = response.usage.as_ref()
|
||
.map(|u| (Some(u.prompt_tokens), Some(u.completion_tokens)))
|
||
.unwrap_or((None, None));
|
||
|
||
let choice = response.choices.into_iter().next()
|
||
.ok_or_else(|| anyhow::anyhow!("No response from LLM"))?;
|
||
|
||
state.current_step_chat_history.push(choice.message.clone());
|
||
let llm_text_response = choice.message.content.clone().unwrap_or_default();
|
||
|
||
if let Some(tool_calls) = &choice.message.tool_calls {
|
||
let mut phase_transition = false;
|
||
|
||
for tc in tool_calls {
|
||
if phase_transition {
|
||
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, "(skipped: phase transition)"));
|
||
continue;
|
||
}
|
||
|
||
let args: serde_json::Value = serde_json::from_str(&tc.function.arguments).unwrap_or_default();
|
||
|
||
match tc.function.name.as_str() {
|
||
"update_plan" => {
|
||
let raw_steps = args["steps"].as_array().cloned().unwrap_or_default();
|
||
state.steps.clear();
|
||
|
||
for (i, item) in raw_steps.iter().enumerate() {
|
||
let order = (i + 1) as i32;
|
||
let title = item["title"].as_str().unwrap_or(item.as_str().unwrap_or("")).to_string();
|
||
let detail = item["description"].as_str().unwrap_or("").to_string();
|
||
state.steps.push(Step {
|
||
order, title, description: detail,
|
||
status: StepStatus::Pending, summary: None,
|
||
user_feedbacks: Vec::new(), db_id: String::new(),
|
||
artifacts: Vec::new(),
|
||
});
|
||
}
|
||
|
||
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
steps: plan_infos_from_state(&state),
|
||
});
|
||
|
||
save_state_snapshot(pool, workflow_id, 0, &state).await;
|
||
tracing::info!("[workflow {}] Plan set ({} steps)", workflow_id, state.steps.len());
|
||
|
||
// If require_plan_approval, wait for user to confirm the plan
|
||
if require_plan_approval {
|
||
tracing::info!("[workflow {}] Waiting for plan approval", workflow_id);
|
||
let _ = broadcast_tx.send(WsMessage::ActivityUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
activity: "计划已生成 — 等待用户确认...".to_string(),
|
||
});
|
||
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
status: "waiting_user".into(),
|
||
});
|
||
let _ = sqlx::query("UPDATE workflows SET status = 'waiting_user' WHERE id = ?")
|
||
.bind(workflow_id)
|
||
.execute(pool)
|
||
.await;
|
||
log_execution(pool, broadcast_tx, workflow_id, 0, "plan_approval", "等待确认计划", "等待用户确认执行计划", "waiting").await;
|
||
|
||
// Block until Comment event
|
||
let approval_content = loop {
|
||
match event_rx.recv().await {
|
||
Some(AgentEvent::Comment { content, .. }) => break content,
|
||
Some(_) => continue,
|
||
None => {
|
||
anyhow::bail!("Event channel closed while waiting for plan approval");
|
||
}
|
||
}
|
||
};
|
||
|
||
tracing::info!("[workflow {}] Plan approval response: {}", workflow_id, approval_content);
|
||
|
||
if approval_content.starts_with("rejected:") {
|
||
let reason = approval_content.strip_prefix("rejected:").unwrap_or("").trim();
|
||
tracing::info!("[workflow {}] Plan rejected: {}", workflow_id, reason);
|
||
log_execution(pool, broadcast_tx, workflow_id, 0, "plan_approval", "rejected", reason, "failed").await;
|
||
|
||
// Feed rejection back into planning conversation so LLM can re-plan
|
||
state.current_step_chat_history.push(ChatMessage::tool_result(
|
||
&tc.id,
|
||
&format!("用户拒绝了此计划: {}。请根据反馈修改计划后重新调用 update_plan。", reason),
|
||
));
|
||
state.steps.clear();
|
||
|
||
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
status: "executing".into(),
|
||
});
|
||
let _ = sqlx::query("UPDATE workflows SET status = 'executing' WHERE id = ?")
|
||
.bind(workflow_id)
|
||
.execute(pool)
|
||
.await;
|
||
// Stay in Planning phase, continue the loop
|
||
continue;
|
||
}
|
||
|
||
// Approved
|
||
let feedback = if approval_content.starts_with("approved:") {
|
||
approval_content.strip_prefix("approved:").unwrap_or("").trim().to_string()
|
||
} else {
|
||
String::new()
|
||
};
|
||
|
||
log_execution(pool, broadcast_tx, workflow_id, 0, "plan_approval", "approved", &feedback, "done").await;
|
||
let _ = broadcast_tx.send(WsMessage::WorkflowStatusUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
status: "executing".into(),
|
||
});
|
||
let _ = sqlx::query("UPDATE workflows SET status = 'executing' WHERE id = ?")
|
||
.bind(workflow_id)
|
||
.execute(pool)
|
||
.await;
|
||
}
|
||
|
||
// Enter execution phase
|
||
if let Some(first) = state.steps.first_mut() {
|
||
first.status = StepStatus::Running;
|
||
}
|
||
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
steps: plan_infos_from_state(&state),
|
||
});
|
||
state.current_step_chat_history.clear();
|
||
state.phase = AgentPhase::Executing { step: 1 };
|
||
phase_transition = true;
|
||
|
||
save_state_snapshot(pool, workflow_id, 0, &state).await;
|
||
tracing::info!("[workflow {}] Entering Executing", workflow_id);
|
||
}
|
||
// Planning phase IO tools
|
||
_ => {
|
||
let result = execute_tool(&tc.function.name, &tc.function.arguments, workdir, exec).await;
|
||
state.current_step_chat_history.push(ChatMessage::tool_result(&tc.id, &result));
|
||
}
|
||
}
|
||
}
|
||
|
||
let tc_json: Vec<serde_json::Value> = tool_calls.iter().map(|tc| {
|
||
serde_json::json!({ "name": tc.function.name, "arguments_preview": truncate_str(&tc.function.arguments, 200) })
|
||
}).collect();
|
||
let tc_json_str = serde_json::to_string(&tc_json).unwrap_or_else(|_| "[]".to_string());
|
||
log_llm_call(pool, broadcast_tx, workflow_id, 0, "planning", msg_count, tool_count,
|
||
&tc_json_str, &llm_text_response, prompt_tokens, completion_tokens, latency_ms).await;
|
||
} else {
|
||
let content = choice.message.content.as_deref().unwrap_or("(no content)");
|
||
tracing::info!("[workflow {}] Planning text response: {}", workflow_id, truncate_str(content, 200));
|
||
log_execution(pool, broadcast_tx, workflow_id, 0, "text_response", "", content, "done").await;
|
||
log_llm_call(pool, broadcast_tx, workflow_id, 0, "planning", msg_count, tool_count,
|
||
"[]", content, prompt_tokens, completion_tokens, latency_ms).await;
|
||
}
|
||
}
|
||
|
||
// --- Executing phase: step isolation loop ---
|
||
while matches!(state.phase, AgentPhase::Executing { .. }) {
|
||
let step_order = match state.first_actionable_step() {
|
||
Some(o) => o,
|
||
None => {
|
||
state.phase = AgentPhase::Completed;
|
||
break;
|
||
}
|
||
};
|
||
|
||
// Mark step as Running
|
||
if let Some(step) = state.steps.iter_mut().find(|s| s.order == step_order) {
|
||
step.status = StepStatus::Running;
|
||
}
|
||
state.phase = AgentPhase::Executing { step: step_order };
|
||
state.current_step_chat_history.clear();
|
||
|
||
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
steps: plan_infos_from_state(&state),
|
||
});
|
||
save_state_snapshot(pool, workflow_id, step_order, &state).await;
|
||
|
||
// Build completed summaries for context
|
||
let completed_summaries: Vec<(i32, String, String, Vec<Artifact>)> = state.steps.iter()
|
||
.filter(|s| matches!(s.status, StepStatus::Done))
|
||
.map(|s| (s.order, s.title.clone(), s.summary.clone().unwrap_or_default(), s.artifacts.clone()))
|
||
.collect();
|
||
|
||
let step = state.steps.iter().find(|s| s.order == step_order).unwrap().clone();
|
||
|
||
tracing::info!("[workflow {}] Starting step {} sub-loop: {}", workflow_id, step_order, step.title);
|
||
|
||
// Run the isolated step sub-loop
|
||
let step_result = run_step_loop(
|
||
llm, exec, pool, broadcast_tx,
|
||
project_id, workflow_id, workdir, mgr,
|
||
instructions, &step, &completed_summaries, &state.scratchpad,
|
||
external_tools, event_rx,
|
||
).await;
|
||
|
||
tracing::info!("[workflow {}] Step {} completed: {:?}", workflow_id, step_order, step_result.status);
|
||
|
||
// Update step status based on result
|
||
match &step_result.status {
|
||
StepResultStatus::Done => {
|
||
if let Some(s) = state.steps.iter_mut().find(|s| s.order == step_order) {
|
||
s.status = StepStatus::Done;
|
||
s.summary = Some(step_result.summary.clone());
|
||
s.artifacts = step_result.artifacts.clone();
|
||
}
|
||
}
|
||
StepResultStatus::Failed { error } => {
|
||
if let Some(s) = state.steps.iter_mut().find(|s| s.order == step_order) {
|
||
s.status = StepStatus::Failed;
|
||
s.summary = Some(step_result.summary.clone());
|
||
}
|
||
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
steps: plan_infos_from_state(&state),
|
||
});
|
||
save_state_snapshot(pool, workflow_id, step_order, &state).await;
|
||
return Err(anyhow::anyhow!("Step {} failed: {}", step_order, error));
|
||
}
|
||
StepResultStatus::NeedsInput { message: _ } => {
|
||
// This shouldn't normally happen since ask_user is handled inside
|
||
// run_step_loop, but handle gracefully
|
||
if let Some(s) = state.steps.iter_mut().find(|s| s.order == step_order) {
|
||
s.status = StepStatus::WaitingUser;
|
||
}
|
||
save_state_snapshot(pool, workflow_id, step_order, &state).await;
|
||
continue;
|
||
}
|
||
}
|
||
|
||
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
steps: plan_infos_from_state(&state),
|
||
});
|
||
save_state_snapshot(pool, workflow_id, step_order, &state).await;
|
||
|
||
// --- Coordinator review ---
|
||
// Check if there are more steps; if not, we're done
|
||
if state.first_actionable_step().is_none() {
|
||
state.phase = AgentPhase::Completed;
|
||
break;
|
||
}
|
||
|
||
// Coordinator LLM reviews the step result and may update the plan
|
||
let coordinator_prompt = build_coordinator_prompt(project_id, instructions);
|
||
let review_message = format!(
|
||
"步骤 {} 「{}」执行完成。\n\n执行摘要:{}\n\n请审视结果。如需修改后续计划请使用 update_plan,否则回复确认继续。",
|
||
step_order, step.title, step_result.summary
|
||
);
|
||
|
||
// Build coordinator context with plan overview + scratchpad
|
||
let mut coordinator_ctx = String::new();
|
||
coordinator_ctx.push_str("## 计划概览\n");
|
||
for s in &state.steps {
|
||
let marker = match s.status {
|
||
StepStatus::Done => " [done]",
|
||
StepStatus::Running => " [running]",
|
||
StepStatus::WaitingUser => " [waiting]",
|
||
StepStatus::Failed => " [FAILED]",
|
||
StepStatus::Pending => "",
|
||
};
|
||
coordinator_ctx.push_str(&format!("{}. {}{}\n", s.order, s.title, marker));
|
||
if let Some(summary) = &s.summary {
|
||
coordinator_ctx.push_str(&format!(" 摘要: {}\n", summary));
|
||
}
|
||
}
|
||
if !state.scratchpad.is_empty() {
|
||
coordinator_ctx.push_str(&format!("\n## 全局备忘录\n{}\n", state.scratchpad));
|
||
}
|
||
|
||
let coord_messages = vec![
|
||
ChatMessage::system(&coordinator_prompt),
|
||
ChatMessage::user(&coordinator_ctx),
|
||
ChatMessage::user(&review_message),
|
||
];
|
||
|
||
// Add to main chat history for context
|
||
state.current_step_chat_history.clear();
|
||
|
||
tracing::info!("[workflow {}] Coordinator review for step {}", workflow_id, step_order);
|
||
let _ = broadcast_tx.send(WsMessage::ActivityUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
activity: format!("步骤 {} 完成 — 协调器审核中...", step_order),
|
||
});
|
||
let call_start = std::time::Instant::now();
|
||
let coord_response = match llm.chat_with_tools(coord_messages.clone(), &coordinator_tools).await {
|
||
Ok(r) => r,
|
||
Err(e) => {
|
||
tracing::warn!("[workflow {}] Coordinator LLM call failed, continuing: {}", workflow_id, e);
|
||
continue; // Non-fatal, just skip review
|
||
}
|
||
};
|
||
let latency_ms = call_start.elapsed().as_millis() as i64;
|
||
|
||
let (prompt_tokens, completion_tokens) = coord_response.usage.as_ref()
|
||
.map(|u| (Some(u.prompt_tokens), Some(u.completion_tokens)))
|
||
.unwrap_or((None, None));
|
||
|
||
log_llm_call(pool, broadcast_tx, workflow_id, step_order, "coordinator",
|
||
coord_messages.len() as i32, coordinator_tools.len() as i32,
|
||
"[]", "", prompt_tokens, completion_tokens, latency_ms).await;
|
||
|
||
if let Some(choice) = coord_response.choices.into_iter().next() {
|
||
if let Some(tool_calls) = &choice.message.tool_calls {
|
||
for tc in tool_calls {
|
||
let args: serde_json::Value = serde_json::from_str(&tc.function.arguments).unwrap_or_default();
|
||
match tc.function.name.as_str() {
|
||
"update_plan" => {
|
||
let raw_steps = args["steps"].as_array().cloned().unwrap_or_default();
|
||
let new_steps: Vec<Step> = raw_steps.iter().enumerate().map(|(i, item)| {
|
||
Step {
|
||
order: (i + 1) as i32,
|
||
title: item["title"].as_str().unwrap_or("").to_string(),
|
||
description: item["description"].as_str().unwrap_or("").to_string(),
|
||
status: StepStatus::Pending, summary: None,
|
||
user_feedbacks: Vec::new(), db_id: String::new(),
|
||
artifacts: Vec::new(),
|
||
}
|
||
}).collect();
|
||
|
||
state.apply_plan_diff(new_steps);
|
||
let _ = broadcast_tx.send(WsMessage::PlanUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
steps: plan_infos_from_state(&state),
|
||
});
|
||
tracing::info!("[workflow {}] Coordinator revised plan", workflow_id);
|
||
save_state_snapshot(pool, workflow_id, step_order, &state).await;
|
||
}
|
||
"update_scratchpad" => {
|
||
let content = args["content"].as_str().unwrap_or("");
|
||
let mut new_pad = state.scratchpad.clone();
|
||
if !new_pad.is_empty() { new_pad.push('\n'); }
|
||
new_pad.push_str(content);
|
||
if check_scratchpad_size(&new_pad).is_ok() {
|
||
state.scratchpad = new_pad;
|
||
}
|
||
}
|
||
"update_requirement" => {
|
||
let new_req = args["requirement"].as_str().unwrap_or("");
|
||
let _ = sqlx::query("UPDATE workflows SET requirement = ? WHERE id = ?")
|
||
.bind(new_req).bind(workflow_id).execute(pool).await;
|
||
let _ = tokio::fs::write(format!("{}/requirement.md", workdir), new_req).await;
|
||
let _ = broadcast_tx.send(WsMessage::RequirementUpdate {
|
||
workflow_id: workflow_id.to_string(),
|
||
requirement: new_req.to_string(),
|
||
});
|
||
}
|
||
_ => {}
|
||
}
|
||
}
|
||
} else {
|
||
// Text response — coordinator is satisfied, continue
|
||
let content = choice.message.content.as_deref().unwrap_or("");
|
||
tracing::info!("[workflow {}] Coordinator: {}", workflow_id, truncate_str(content, 200));
|
||
}
|
||
}
|
||
}
|
||
|
||
// Final snapshot
|
||
save_state_snapshot(pool, workflow_id, state.current_step(), &state).await;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn generate_report(
|
||
llm: &LlmClient,
|
||
requirement: &str,
|
||
entries: &[crate::db::ExecutionLogEntry],
|
||
project_id: &str,
|
||
) -> anyhow::Result<String> {
|
||
let steps_detail: String = entries
|
||
.iter()
|
||
.map(|e| {
|
||
let output_preview = if e.output.len() > 2000 {
|
||
format!("{}...(truncated)", truncate_str(&e.output, 2000))
|
||
} else {
|
||
e.output.clone()
|
||
};
|
||
format!(
|
||
"### [{}] {} (step {})\nInput: `{}`\nOutput:\n```\n{}\n```\n",
|
||
e.status, e.tool_name, e.step_order, truncate_str(&e.tool_input, 500), output_preview
|
||
)
|
||
})
|
||
.collect::<Vec<_>>()
|
||
.join("\n");
|
||
|
||
let system_prompt = include_str!("prompts/report.md")
|
||
.replace("{project_id}", project_id);
|
||
|
||
let user_msg = format!(
|
||
"需求:\n{}\n\n执行详情:\n{}",
|
||
requirement, steps_detail
|
||
);
|
||
|
||
let report = llm
|
||
.chat(vec![
|
||
ChatMessage::system(&system_prompt),
|
||
ChatMessage::user(&user_msg),
|
||
])
|
||
.await?;
|
||
|
||
Ok(report)
|
||
}
|
||
|
||
async fn generate_title(llm: &LlmClient, requirement: &str) -> anyhow::Result<String> {
|
||
let response = llm
|
||
.chat(vec![
|
||
ChatMessage::system("为给定的需求生成一个简短的项目标题(最多15个汉字)。只回复标题本身,不要加任何其他内容。使用中文。"),
|
||
ChatMessage::user(requirement),
|
||
])
|
||
.await?;
|
||
|
||
let mut title = response.trim().trim_matches('"').to_string();
|
||
// Hard limit: if LLM returns garbage, take only the first line, max 80 chars
|
||
if let Some(first_line) = title.lines().next() {
|
||
title = first_line.to_string();
|
||
}
|
||
if title.len() > 80 {
|
||
title = truncate_str(&title, 80).to_string();
|
||
}
|
||
Ok(title)
|
||
}
|
||
|
||
#[cfg(test)]
mod tests {
    // Unit tests for the pure helpers of this module: step prompt assembly,
    // byte-bounded truncation, the per-phase tool sets and plan-info mapping.

    use super::*;

    /// Returns a step with the given identity and status; every optional
    /// field is left empty.
    fn make_step(order: i32, title: &str, desc: &str, status: StepStatus) -> Step {
        Step {
            order,
            status,
            title: title.into(),
            description: desc.into(),
            summary: None,
            db_id: String::new(),
            user_feedbacks: Vec::new(),
            artifacts: Vec::new(),
        }
    }

    /// Names of all functions advertised by a tool list.
    fn tool_names(tools: &[Tool]) -> Vec<&str> {
        tools.iter().map(|tool| tool.function.name.as_str()).collect()
    }

    /// Fails unless every entry of `needles` occurs somewhere in `text`.
    fn assert_all_present(text: &str, needles: &[&str]) {
        for needle in needles {
            assert!(text.contains(*needle), "missing {:?} in:\n{}", needle, text);
        }
    }

    /// Fails if any entry of `needles` occurs in `text`.
    fn assert_all_absent(text: &str, needles: &[&str]) {
        for needle in needles {
            assert!(!text.contains(*needle), "unexpected {:?} in:\n{}", needle, text);
        }
    }

    // --- build_step_user_message ---

    #[test]
    fn step_msg_basic() {
        let msg = build_step_user_message(
            &make_step(1, "Setup env", "Install dependencies", StepStatus::Running),
            &[],
            "",
        );

        assert_all_present(&msg, &["## 当前步骤(步骤 1)", "标题:Setup env", "描述:Install dependencies"]);
        // Nothing completed and an empty scratchpad: those sections are omitted.
        assert_all_absent(&msg, &["已完成步骤摘要", "项目备忘录"]);
    }

    #[test]
    fn step_msg_with_completed_summaries() {
        let summaries = vec![
            (1, "Setup".into(), "Installed deps".into(), Vec::new()),
            (2, "Build".into(), "Compiled OK".into(), Vec::new()),
        ];
        let msg = build_step_user_message(
            &make_step(3, "Deploy", "Push to prod", StepStatus::Running),
            &summaries,
            "",
        );

        assert_all_present(
            &msg,
            &["## 已完成步骤摘要", "步骤 1 (Setup): Installed deps", "步骤 2 (Build): Compiled OK"],
        );
    }

    #[test]
    fn step_msg_with_parent_scratchpad() {
        let msg = build_step_user_message(
            &make_step(2, "Build", "compile", StepStatus::Running),
            &[],
            "DB_HOST=localhost\nDB_PORT=5432",
        );

        assert_all_present(&msg, &["## 项目备忘录(只读)", "DB_HOST=localhost", "DB_PORT=5432"]);
    }

    #[test]
    fn step_msg_with_user_feedback() {
        let mut step = make_step(1, "Setup", "setup env", StepStatus::Running);
        step.user_feedbacks = vec!["Use Python 3.12".into(), "Skip linting".into()];

        let msg = build_step_user_message(&step, &[], "");

        assert_all_present(&msg, &["用户反馈", "- Use Python 3.12", "- Skip linting"]);
    }

    #[test]
    fn step_msg_full_context() {
        let mut step = make_step(3, "API", "build REST API", StepStatus::Running);
        step.user_feedbacks = vec!["add caching".into()];
        let summaries = vec![
            (1, "DB".into(), "Schema created".into(), Vec::new()),
            (2, "Models".into(), "ORM models done".into(), Vec::new()),
        ];

        let msg = build_step_user_message(&step, &summaries, "tech_stack=FastAPI");

        // Every section header is emitted...
        assert_all_present(
            &msg,
            &["## 当前步骤(步骤 3)", "## 已完成步骤摘要", "## 项目备忘录(只读)", "用户反馈"],
        );
        // ...and carries the expected content.
        assert_all_present(
            &msg,
            &["build REST API", "Schema created", "tech_stack=FastAPI", "add caching"],
        );
    }

    // --- truncate_str ---

    #[test]
    fn truncate_short_noop() {
        assert_eq!(truncate_str("hello", 10), "hello");
    }

    #[test]
    fn truncate_exact() {
        assert_eq!(truncate_str("hello", 5), "hello");
    }

    #[test]
    fn truncate_cuts() {
        assert_eq!(truncate_str("hello world", 5), "hello");
    }

    #[test]
    fn truncate_respects_char_boundary() {
        // Each of these CJK characters is 3 bytes in UTF-8, so a 7-byte budget
        // must back off to the 6-byte boundary (two characters).
        let cut = truncate_str("你好世界", 7);
        assert_eq!(cut.len(), 6);
        assert_eq!(cut, "你好");
    }

    // --- tool definitions ---

    #[test]
    fn step_tools_have_step_done() {
        let tools = build_step_tools();
        let names = tool_names(&tools);

        assert!(names.contains(&"step_done"), "step_done must be in step tools");
        // Plan and requirement editing are not available inside a step.
        for forbidden in &["advance_step", "update_plan", "update_requirement"] {
            assert!(!names.contains(forbidden), "{} must NOT be in step tools", forbidden);
        }
    }

    #[test]
    fn step_tools_have_execution_tools() {
        let tools = build_step_tools();
        let names = tool_names(&tools);

        let expected_tools = [
            "execute", "read_file", "write_file", "list_files",
            "start_service", "stop_service", "update_scratchpad",
            "ask_user", "kb_search", "kb_read",
        ];
        for expected in &expected_tools {
            assert!(names.contains(expected), "{} must be in step tools", expected);
        }
    }

    #[test]
    fn coordinator_tools_correct() {
        let tools = build_coordinator_tools();
        let names = tool_names(&tools);

        for required in &["update_plan", "update_scratchpad", "update_requirement"] {
            assert!(names.contains(required));
        }
        // The coordinator must not have execution tools.
        for forbidden in &["execute", "step_done", "advance_step"] {
            assert!(!names.contains(forbidden));
        }
    }

    #[test]
    fn planning_tools_correct() {
        let tools = build_planning_tools();
        let names = tool_names(&tools);

        for required in &["update_plan", "list_files", "read_file", "kb_search", "kb_read"] {
            assert!(names.contains(required));
        }
        for forbidden in &["execute", "step_done"] {
            assert!(!names.contains(forbidden));
        }
    }

    // --- plan_infos_from_state ---

    #[test]
    fn plan_infos_maps_correctly() {
        let finished = Step {
            summary: Some("done".into()),
            ..make_step(1, "A", "desc A", StepStatus::Done)
        };
        let state = AgentState {
            phase: AgentPhase::Executing { step: 2 },
            steps: vec![finished, make_step(2, "B", "desc B", StepStatus::Running)],
            current_step_chat_history: Vec::new(),
            scratchpad: String::new(),
        };

        let infos = plan_infos_from_state(&state);
        assert_eq!(infos.len(), 2);

        let first = &infos[0];
        assert_eq!(first.order, 1);
        // The step title maps to the `description` field and the step
        // description maps to the `command` field.
        assert_eq!(first.description, "A");
        assert_eq!(first.command, "desc A");
        assert_eq!(first.status.as_deref(), Some("done"));

        assert_eq!(infos[1].status.as_deref(), Some("running"));
    }
}
|