Agent loop state machine refactor, unified LLM interface, and UI improvements

- Rewrite agent loop as Planning→Executing(N)→Completed state machine with
  per-step context isolation to prevent token explosion
- Split tools and prompts by phase (planning vs execution)
- Add advance_step/save_memo tools for step transitions and cross-step memory
- Unify LLM interface: remove duplicate types, single chat_with_tools path
- Add UTF-8 safe truncation (truncate_str) to prevent panics on Chinese text
- Extract CreateForm component, add auto-scroll to execution log
- Add report generation with app access URL, non-blocking title generation
- Add timer system, file serving, app proxy, exec module
- Update Dockerfile with uv, deployment config

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-28 22:35:33 +00:00
parent e2d5a6a7eb
commit 2df4e12d30
31 changed files with 3924 additions and 571 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,12 +1,115 @@
mod projects;
mod timers;
mod workflows;
use std::sync::Arc;
use axum::Router;
use axum::{
body::Body,
extract::{Path, State, Request},
http::StatusCode,
response::{IntoResponse, Response},
routing::{get, any},
Router,
};
use crate::AppState;
pub fn router(state: Arc<AppState>) -> Router {
Router::new()
.merge(projects::router(state.clone()))
.merge(workflows::router(state))
.merge(workflows::router(state.clone()))
.merge(timers::router(state.clone()))
.route("/projects/{id}/files/{*path}", get(serve_project_file))
.route("/projects/{id}/app/{*path}", any(proxy_to_service).with_state(state.clone()))
.route("/projects/{id}/app/", any(proxy_to_service_root).with_state(state))
}
async fn proxy_to_service_root(
State(state): State<Arc<AppState>>,
Path(project_id): Path<String>,
req: Request<Body>,
) -> Response {
proxy_impl(&state, &project_id, "/", req).await
}
async fn proxy_to_service(
State(state): State<Arc<AppState>>,
Path((project_id, path)): Path<(String, String)>,
req: Request<Body>,
) -> Response {
proxy_impl(&state, &project_id, &format!("/{}", path), req).await
}
async fn proxy_impl(
state: &AppState,
project_id: &str,
path: &str,
req: Request<Body>,
) -> Response {
let port = match state.agent_mgr.get_service_port(project_id).await {
Some(p) => p,
None => return (StatusCode::SERVICE_UNAVAILABLE, "服务未启动").into_response(),
};
let query = req.uri().query().map(|q| format!("?{}", q)).unwrap_or_default();
let url = format!("http://127.0.0.1:{}{}{}", port, path, query);
let client = reqwest::Client::new();
let method = req.method().clone();
let headers = req.headers().clone();
let body_bytes = match axum::body::to_bytes(req.into_body(), 10 * 1024 * 1024).await {
Ok(b) => b,
Err(_) => return (StatusCode::BAD_REQUEST, "请求体过大").into_response(),
};
let mut upstream_req = client.request(method, &url);
for (key, val) in headers.iter() {
if key != "host" {
upstream_req = upstream_req.header(key, val);
}
}
upstream_req = upstream_req.body(body_bytes);
match upstream_req.send().await {
Ok(resp) => {
let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
let resp_headers = resp.headers().clone();
let body = resp.bytes().await.unwrap_or_default();
let mut response = (status, body).into_response();
for (key, val) in resp_headers.iter() {
if let Ok(name) = axum::http::header::HeaderName::from_bytes(key.as_ref()) {
response.headers_mut().insert(name, val.clone());
}
}
response
}
Err(_) => (StatusCode::BAD_GATEWAY, "无法连接到后端服务").into_response(),
}
}
async fn serve_project_file(
Path((project_id, file_path)): Path<(String, String)>,
) -> Response {
let full_path = std::path::PathBuf::from("/app/data/workspaces")
.join(&project_id)
.join(&file_path);
// Prevent path traversal
if file_path.contains("..") {
return (StatusCode::BAD_REQUEST, "Invalid path").into_response();
}
match tokio::fs::read(&full_path).await {
Ok(bytes) => {
let mime = mime_guess::from_path(&full_path)
.first_or_octet_stream()
.to_string();
(
[(axum::http::header::CONTENT_TYPE, mime)],
bytes,
).into_response()
}
Err(_) => (StatusCode::NOT_FOUND, "File not found").into_response(),
}
}

147
src/api/timers.rs Normal file
View File

@@ -0,0 +1,147 @@
use std::sync::Arc;
use axum::{
extract::{Path, State},
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
Json, Router,
};
use serde::Deserialize;
use crate::AppState;
use crate::db::Timer;
type ApiResult<T> = Result<Json<T>, Response>;
fn db_err(e: sqlx::Error) -> Response {
tracing::error!("Database error: {}", e);
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
}
#[derive(Deserialize)]
pub struct CreateTimer {
pub name: String,
pub interval_secs: i64,
pub requirement: String,
}
#[derive(Deserialize)]
pub struct UpdateTimer {
pub name: Option<String>,
pub interval_secs: Option<i64>,
pub requirement: Option<String>,
pub enabled: Option<bool>,
}
pub fn router(state: Arc<AppState>) -> Router {
Router::new()
.route("/projects/{id}/timers", get(list_timers).post(create_timer))
.route("/timers/{id}", get(get_timer).put(update_timer).delete(delete_timer))
.with_state(state)
}
async fn list_timers(
State(state): State<Arc<AppState>>,
Path(project_id): Path<String>,
) -> ApiResult<Vec<Timer>> {
sqlx::query_as::<_, Timer>(
"SELECT * FROM timers WHERE project_id = ? ORDER BY created_at DESC"
)
.bind(&project_id)
.fetch_all(&state.db.pool)
.await
.map(Json)
.map_err(db_err)
}
async fn create_timer(
State(state): State<Arc<AppState>>,
Path(project_id): Path<String>,
Json(input): Json<CreateTimer>,
) -> ApiResult<Timer> {
if input.interval_secs < 60 {
return Err((StatusCode::BAD_REQUEST, "Minimum interval is 60 seconds").into_response());
}
let id = uuid::Uuid::new_v4().to_string();
sqlx::query_as::<_, Timer>(
"INSERT INTO timers (id, project_id, name, interval_secs, requirement) VALUES (?, ?, ?, ?, ?) RETURNING *"
)
.bind(&id)
.bind(&project_id)
.bind(&input.name)
.bind(input.interval_secs)
.bind(&input.requirement)
.fetch_one(&state.db.pool)
.await
.map(Json)
.map_err(db_err)
}
async fn get_timer(
State(state): State<Arc<AppState>>,
Path(timer_id): Path<String>,
) -> ApiResult<Timer> {
sqlx::query_as::<_, Timer>("SELECT * FROM timers WHERE id = ?")
.bind(&timer_id)
.fetch_optional(&state.db.pool)
.await
.map_err(db_err)?
.map(Json)
.ok_or_else(|| (StatusCode::NOT_FOUND, "Timer not found").into_response())
}
async fn update_timer(
State(state): State<Arc<AppState>>,
Path(timer_id): Path<String>,
Json(input): Json<UpdateTimer>,
) -> ApiResult<Timer> {
// Fetch existing
let existing = sqlx::query_as::<_, Timer>("SELECT * FROM timers WHERE id = ?")
.bind(&timer_id)
.fetch_optional(&state.db.pool)
.await
.map_err(db_err)?;
let Some(existing) = existing else {
return Err((StatusCode::NOT_FOUND, "Timer not found").into_response());
};
let name = input.name.unwrap_or(existing.name);
let interval_secs = input.interval_secs.unwrap_or(existing.interval_secs);
let requirement = input.requirement.unwrap_or(existing.requirement);
let enabled = input.enabled.unwrap_or(existing.enabled);
if interval_secs < 60 {
return Err((StatusCode::BAD_REQUEST, "Minimum interval is 60 seconds").into_response());
}
sqlx::query_as::<_, Timer>(
"UPDATE timers SET name = ?, interval_secs = ?, requirement = ?, enabled = ? WHERE id = ? RETURNING *"
)
.bind(&name)
.bind(interval_secs)
.bind(&requirement)
.bind(enabled)
.bind(&timer_id)
.fetch_one(&state.db.pool)
.await
.map(Json)
.map_err(db_err)
}
async fn delete_timer(
State(state): State<Arc<AppState>>,
Path(timer_id): Path<String>,
) -> Result<StatusCode, Response> {
let result = sqlx::query("DELETE FROM timers WHERE id = ?")
.bind(&timer_id)
.execute(&state.db.pool)
.await
.map_err(db_err)?;
if result.rows_affected() > 0 {
Ok(StatusCode::NO_CONTENT)
} else {
Err((StatusCode::NOT_FOUND, "Timer not found").into_response())
}
}

View File

@@ -11,6 +11,11 @@ use crate::AppState;
use crate::agent::AgentEvent;
use crate::db::{Workflow, PlanStep, Comment};
#[derive(serde::Serialize)]
struct ReportResponse {
report: String,
}
type ApiResult<T> = Result<Json<T>, Response>;
fn db_err(e: sqlx::Error) -> Response {
@@ -33,6 +38,7 @@ pub fn router(state: Arc<AppState>) -> Router {
.route("/projects/{id}/workflows", get(list_workflows).post(create_workflow))
.route("/workflows/{id}/steps", get(list_steps))
.route("/workflows/{id}/comments", get(list_comments).post(create_comment))
.route("/workflows/{id}/report", get(get_report))
.with_state(state)
}
@@ -134,3 +140,22 @@ async fn create_comment(
Ok(Json(comment))
}
async fn get_report(
State(state): State<Arc<AppState>>,
Path(workflow_id): Path<String>,
) -> Result<Json<ReportResponse>, Response> {
let wf = sqlx::query_as::<_, Workflow>(
"SELECT * FROM workflows WHERE id = ?"
)
.bind(&workflow_id)
.fetch_optional(&state.db.pool)
.await
.map_err(db_err)?;
match wf {
Some(w) if !w.report.is_empty() => Ok(Json(ReportResponse { report: w.report })),
Some(_) => Err((StatusCode::NOT_FOUND, "Report not yet generated").into_response()),
None => Err((StatusCode::NOT_FOUND, "Workflow not found").into_response()),
}
}

View File

@@ -47,6 +47,7 @@ impl Database {
workflow_id TEXT NOT NULL REFERENCES workflows(id),
step_order INTEGER NOT NULL,
description TEXT NOT NULL,
command TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'pending',
output TEXT NOT NULL DEFAULT ''
)"
@@ -65,6 +66,49 @@ impl Database {
.execute(&self.pool)
.await?;
// Migration: add report column to workflows
let _ = sqlx::query(
"ALTER TABLE workflows ADD COLUMN report TEXT NOT NULL DEFAULT ''"
)
.execute(&self.pool)
.await;
// Migration: add created_at to plan_steps
let _ = sqlx::query(
"ALTER TABLE plan_steps ADD COLUMN created_at TEXT NOT NULL DEFAULT ''"
)
.execute(&self.pool)
.await;
// Migration: add kind to plan_steps ('plan' or 'log')
let _ = sqlx::query(
"ALTER TABLE plan_steps ADD COLUMN kind TEXT NOT NULL DEFAULT 'log'"
)
.execute(&self.pool)
.await;
// Migration: add plan_step_id to plan_steps (log entries reference their parent plan step)
let _ = sqlx::query(
"ALTER TABLE plan_steps ADD COLUMN plan_step_id TEXT NOT NULL DEFAULT ''"
)
.execute(&self.pool)
.await;
sqlx::query(
"CREATE TABLE IF NOT EXISTS timers (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(id),
name TEXT NOT NULL,
interval_secs INTEGER NOT NULL,
requirement TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
last_run_at TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)"
)
.execute(&self.pool)
.await?;
Ok(())
}
}
@@ -85,6 +129,7 @@ pub struct Workflow {
pub requirement: String,
pub status: String,
pub created_at: String,
pub report: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
@@ -93,8 +138,12 @@ pub struct PlanStep {
pub workflow_id: String,
pub step_order: i32,
pub description: String,
pub command: String,
pub status: String,
pub output: String,
pub created_at: String,
pub kind: String,
pub plan_step_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
@@ -104,3 +153,15 @@ pub struct Comment {
pub content: String,
pub created_at: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Timer {
pub id: String,
pub project_id: String,
pub name: String,
pub interval_secs: i64,
pub requirement: String,
pub enabled: bool,
pub last_run_at: String,
pub created_at: String,
}

40
src/exec.rs Normal file
View File

@@ -0,0 +1,40 @@
pub struct LocalExecutor;
impl LocalExecutor {
pub fn new() -> Self {
Self
}
pub async fn execute(&self, command: &str, workdir: &str) -> anyhow::Result<ExecResult> {
// Ensure workdir exists
tokio::fs::create_dir_all(workdir).await?;
// Prepend venv bin to PATH so `python3`/`pip` resolve to venv
let venv_bin = format!("{}/.venv/bin", workdir);
let path = match std::env::var("PATH") {
Ok(p) => format!("{}:{}", venv_bin, p),
Err(_) => venv_bin,
};
let output = tokio::process::Command::new("sh")
.arg("-c")
.arg(command)
.current_dir(workdir)
.env("PATH", &path)
.env("VIRTUAL_ENV", format!("{}/.venv", workdir))
.output()
.await?;
Ok(ExecResult {
stdout: String::from_utf8_lossy(&output.stdout).to_string(),
stderr: String::from_utf8_lossy(&output.stderr).to_string(),
exit_code: output.status.code().unwrap_or(-1),
})
}
}
pub struct ExecResult {
pub stdout: String,
pub stderr: String,
pub exit_code: i32,
}

View File

@@ -10,22 +10,73 @@ pub struct LlmClient {
struct ChatRequest {
model: String,
messages: Vec<ChatMessage>,
#[serde(skip_serializing_if = "Vec::is_empty")]
tools: Vec<Tool>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatMessage {
pub role: String,
pub content: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub content: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_calls: Option<Vec<ToolCall>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_call_id: Option<String>,
}
impl ChatMessage {
pub fn system(content: &str) -> Self {
Self { role: "system".into(), content: Some(content.into()), tool_calls: None, tool_call_id: None }
}
pub fn user(content: &str) -> Self {
Self { role: "user".into(), content: Some(content.into()), tool_calls: None, tool_call_id: None }
}
pub fn tool_result(tool_call_id: &str, content: &str) -> Self {
Self { role: "tool".into(), content: Some(content.into()), tool_calls: None, tool_call_id: Some(tool_call_id.into()) }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Tool {
#[serde(rename = "type")]
pub tool_type: String,
pub function: ToolFunction,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolFunction {
pub name: String,
pub description: String,
pub parameters: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolCall {
pub id: String,
#[serde(rename = "type")]
pub call_type: String,
pub function: ToolCallFunction,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolCallFunction {
pub name: String,
pub arguments: String,
}
#[derive(Debug, Deserialize)]
struct ChatResponse {
choices: Vec<Choice>,
pub struct ChatResponse {
pub choices: Vec<ChatChoice>,
}
#[derive(Debug, Deserialize)]
struct Choice {
message: ChatMessage,
pub struct ChatChoice {
pub message: ChatMessage,
#[allow(dead_code)]
pub finish_reason: Option<String>,
}
impl LlmClient {
@@ -36,21 +87,42 @@ impl LlmClient {
}
}
/// Simple chat without tools — returns content string
pub async fn chat(&self, messages: Vec<ChatMessage>) -> anyhow::Result<String> {
let resp = self.client
.post(format!("{}/chat/completions", self.config.base_url))
let resp = self.chat_with_tools(messages, &[]).await?;
Ok(resp.choices.into_iter().next()
.and_then(|c| c.message.content)
.unwrap_or_default())
}
/// Chat with tool definitions — returns full response for tool-calling loop
pub async fn chat_with_tools(&self, messages: Vec<ChatMessage>, tools: &[Tool]) -> anyhow::Result<ChatResponse> {
let url = format!("{}/chat/completions", self.config.base_url);
tracing::debug!("LLM request to {} model={} messages={} tools={}", url, self.config.model, messages.len(), tools.len());
let http_resp = self.client
.post(&url)
.header("Authorization", format!("Bearer {}", self.config.api_key))
.json(&ChatRequest {
model: self.config.model.clone(),
messages,
tools: tools.to_vec(),
})
.send()
.await?
.json::<ChatResponse>()
.await?;
Ok(resp.choices.first()
.map(|c| c.message.content.clone())
.unwrap_or_default())
let status = http_resp.status();
if !status.is_success() {
let body = http_resp.text().await.unwrap_or_default();
tracing::error!("LLM API error {}: {}", status, &body[..body.len().min(500)]);
anyhow::bail!("LLM API error {}: {}", status, body);
}
let body = http_resp.text().await?;
let resp: ChatResponse = serde_json::from_str(&body).map_err(|e| {
tracing::error!("LLM response parse error: {}. Body: {}", e, &body[..body.len().min(500)]);
anyhow::anyhow!("Failed to parse LLM response: {}", e)
})?;
Ok(resp)
}
}

View File

@@ -2,13 +2,14 @@ mod api;
mod agent;
mod db;
mod llm;
mod ssh;
mod exec;
mod timer;
mod ws;
use std::sync::Arc;
use axum::Router;
use tower_http::cors::CorsLayer;
use tower_http::services::ServeDir;
use tower_http::services::{ServeDir, ServeFile};
pub struct AppState {
pub db: db::Database,
@@ -19,7 +20,6 @@ pub struct AppState {
#[derive(Debug, Clone, serde::Deserialize)]
pub struct Config {
pub llm: LlmConfig,
pub ssh: SshConfig,
pub server: ServerConfig,
pub database: DatabaseConfig,
}
@@ -31,13 +31,6 @@ pub struct LlmConfig {
pub model: String,
}
#[derive(Debug, Clone, serde::Deserialize)]
pub struct SshConfig {
pub host: String,
pub user: String,
pub key_path: String,
}
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ServerConfig {
pub host: String,
@@ -66,9 +59,10 @@ async fn main() -> anyhow::Result<()> {
let agent_mgr = agent::AgentManager::new(
database.pool.clone(),
config.llm.clone(),
config.ssh.clone(),
);
timer::start_timer_runner(database.pool.clone(), agent_mgr.clone());
let state = Arc::new(AppState {
db: database,
config: config.clone(),
@@ -78,7 +72,7 @@ async fn main() -> anyhow::Result<()> {
let app = Router::new()
.nest("/api", api::router(state))
.nest("/ws", ws::router(agent_mgr))
.fallback_service(ServeDir::new("web/dist"))
.fallback_service(ServeDir::new("web/dist").fallback(ServeFile::new("web/dist/index.html")))
.layer(CorsLayer::permissive());
let addr = format!("{}:{}", config.server.host, config.server.port);

View File

@@ -1,36 +0,0 @@
use crate::SshConfig;
pub struct SshExecutor {
config: SshConfig,
}
impl SshExecutor {
pub fn new(config: &SshConfig) -> Self {
Self {
config: config.clone(),
}
}
pub async fn execute(&self, command: &str) -> anyhow::Result<SshResult> {
let output = tokio::process::Command::new("ssh")
.arg("-i")
.arg(&self.config.key_path)
.arg("-o").arg("StrictHostKeyChecking=no")
.arg(format!("{}@{}", self.config.user, self.config.host))
.arg(command)
.output()
.await?;
Ok(SshResult {
stdout: String::from_utf8_lossy(&output.stdout).to_string(),
stderr: String::from_utf8_lossy(&output.stderr).to_string(),
exit_code: output.status.code().unwrap_or(-1),
})
}
}
pub struct SshResult {
pub stdout: String,
pub stderr: String,
pub exit_code: i32,
}

75
src/timer.rs Normal file
View File

@@ -0,0 +1,75 @@
use std::sync::Arc;
use sqlx::sqlite::SqlitePool;
use crate::agent::{AgentEvent, AgentManager};
use crate::db::Timer;
pub fn start_timer_runner(pool: SqlitePool, agent_mgr: Arc<AgentManager>) {
tokio::spawn(timer_loop(pool, agent_mgr));
}
async fn timer_loop(pool: SqlitePool, agent_mgr: Arc<AgentManager>) {
tracing::info!("Timer runner started");
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
if let Err(e) = check_timers(&pool, &agent_mgr).await {
tracing::error!("Timer check error: {}", e);
}
}
}
async fn check_timers(pool: &SqlitePool, agent_mgr: &Arc<AgentManager>) -> anyhow::Result<()> {
let timers = sqlx::query_as::<_, Timer>(
"SELECT * FROM timers WHERE enabled = 1"
)
.fetch_all(pool)
.await?;
let now = chrono::Utc::now();
for timer in timers {
let due = if timer.last_run_at.is_empty() {
true
} else if let Ok(last) = chrono::NaiveDateTime::parse_from_str(&timer.last_run_at, "%Y-%m-%d %H:%M:%S") {
let last_utc = last.and_utc();
let elapsed = now.signed_duration_since(last_utc).num_seconds();
elapsed >= timer.interval_secs
} else {
true
};
if !due {
continue;
}
tracing::info!("Timer '{}' fired for project {}", timer.name, timer.project_id);
// Update last_run_at
let now_str = now.format("%Y-%m-%d %H:%M:%S").to_string();
let _ = sqlx::query("UPDATE timers SET last_run_at = ? WHERE id = ?")
.bind(&now_str)
.bind(&timer.id)
.execute(pool)
.await;
// Create a workflow for this timer
let workflow_id = uuid::Uuid::new_v4().to_string();
let _ = sqlx::query(
"INSERT INTO workflows (id, project_id, requirement) VALUES (?, ?, ?)"
)
.bind(&workflow_id)
.bind(&timer.project_id)
.bind(&timer.requirement)
.execute(pool)
.await;
// Send event to agent
agent_mgr.send_event(&timer.project_id, AgentEvent::NewRequirement {
workflow_id,
requirement: timer.requirement.clone(),
}).await;
}
Ok(())
}