feat: file sync from worker to server + report.md convention

- Add AgentUpdate::FileSync (base64-encoded) for file transfer
- Worker syncs all workspace files to server after workflow completes
  (skips .venv, __pycache__, .git, node_modules, files > 1MB)
- Server writes synced files to /app/data/workspaces/{project_id}/
- Remove report field from WorkflowComplete (use report.md convention)
- Update prompts: last step should generate report.md, remove KB/worker tool references
This commit is contained in:
2026-04-06 19:41:17 +01:00
parent 1958b5219b
commit 214d2a2338
4 changed files with 68 additions and 41 deletions

View File

@@ -9,6 +9,7 @@
- 每个步骤应是一个逻辑阶段(如"搭建环境"、"实现后端 API"),而非具体命令 - 每个步骤应是一个逻辑阶段(如"搭建环境"、"实现后端 API"),而非具体命令
- 每个步骤包含简短标题和详细描述 - 每个步骤包含简短标题和详细描述
- 步骤数量合理(通常 3-8 步) - 步骤数量合理(通常 3-8 步)
- **最后一步必须是"总结与报告"**:生成 `report.md` 文件,总结完成情况、关键结果和使用说明
【关键规则】如果下方「项目模板指令」中已经定义了明确的步骤(如"步骤 1"、"步骤 2"……),你**必须**将模板中定义的所有步骤**完整映射**到 update_plan 的步骤中,**不得跳过或合并**。模板步骤是强制性的,不是建议。 【关键规则】如果下方「项目模板指令」中已经定义了明确的步骤(如"步骤 1"、"步骤 2"……),你**必须**将模板中定义的所有步骤**完整映射**到 update_plan 的步骤中,**不得跳过或合并**。模板步骤是强制性的,不是建议。
@@ -17,14 +18,5 @@
环境信息: 环境信息:
- 工作目录是独立的项目工作区Python venv 已预先激活(.venv/ - 工作目录是独立的项目工作区Python venv 已预先激活(.venv/
- 可用工具bash、git、curl、uv - 可用工具bash、git、curl、uv
- 静态文件访问:/api/projects/{project_id}/files/{filename}
- 后台服务访问:/api/projects/{project_id}/app/(反向代理,路径会被转发到应用的 /
【重要】反向代理注意事项:
- 用户通过 /api/projects/{project_id}/app/ 访问应用,请求被代理到应用的 / 路径
- 因此前端 HTML 中的所有 API 请求必须使用【不带开头 / 的相对路径】
- 正确示例fetch('todos') 或 fetch('./todos') 错误示例fetch('/todos') 或 fetch('/api/todos')
- HTML 中的 <base> 标签不需要设置,只要不用绝对路径就行
- 知识库工具kb_search(query) 搜索相关片段kb_read() 读取全文
请使用中文回复。 请使用中文回复。

View File

@@ -5,9 +5,6 @@
- execute执行 shell 命令 - execute执行 shell 命令
- read_file / write_file / list_files文件操作 - read_file / write_file / list_files文件操作
- start_service / stop_service管理后台服务 - start_service / stop_service管理后台服务
- kb_search / kb_read搜索和读取知识库
- list_workers列出已注册的远程 worker 节点及其硬件/软件信息
- execute_on_worker(worker, script, timeout):在远程 worker 上执行脚本
- update_scratchpad记录本步骤内的中间状态步骤结束后丢弃精华写进 summary - update_scratchpad记录本步骤内的中间状态步骤结束后丢弃精华写进 summary
- ask_user向用户提问暂停执行等待用户回复 - ask_user向用户提问暂停执行等待用户回复
- step_done**完成当前步骤时必须调用**,提供本步骤的工作摘要 - step_done**完成当前步骤时必须调用**,提供本步骤的工作摘要
@@ -25,31 +22,11 @@
- 完成步骤时,用 `step_done``artifacts` 参数声明本步骤产出的文件。每个产出物需要 name、path、type (file/json/markdown) - 完成步骤时,用 `step_done``artifacts` 参数声明本步骤产出的文件。每个产出物需要 name、path、type (file/json/markdown)
- 需要用户确认或输入时使用 ask_user(question) - 需要用户确认或输入时使用 ask_user(question)
- update_scratchpad 用于记录本步骤内的中间状态,是工作记忆而非日志,只保留当前有用的信息 - update_scratchpad 用于记录本步骤内的中间状态,是工作记忆而非日志,只保留当前有用的信息
- **如果当前步骤是最后一步(或包含"报告/总结"**,请生成 `report.md`,用 Markdown 格式总结整个项目的完成情况、关键结果和使用说明
## 环境信息 ## 环境信息
- 工作目录是独立的项目工作区Python venv 已预先激活(.venv/ - 工作目录是独立的项目工作区Python venv 已预先激活(.venv/
- 使用 `uv add <包名>``pip install <包名>` 安装依赖 - 使用 `uv add <包名>``pip install <包名>` 安装依赖
- 静态文件访问:/api/projects/{project_id}/files/{filename}
- 后台服务访问:/api/projects/{project_id}/app/(启动命令需监听 0.0.0.0:$PORT
- 【重要】应用通过反向代理访问,前端 HTML/JS 中的 fetch/XHR 请求必须使用相对路径(如 fetch('todos')),绝对不能用 / 开头的路径(如 fetch('/todos')),否则会 404
## 远程 Worker
可以通过 `list_workers` 查看所有已注册的远程 worker然后用 `execute_on_worker` 在指定 worker 上执行脚本。适用于需要特定硬件(如 GPU或在远程环境执行任务的场景。
**重要**
- 在 worker 上执行脚本时,可以通过 obj API 访问项目文件:
- 下载文件:`curl https://tori.euphon.cloud/api/obj/{project_id}/files/{path}`
- 上传文件:`curl -X POST -F 'files=@output.txt' https://tori.euphon.cloud/api/obj/{project_id}/files/`
- Python 脚本会自动通过 `uv run --script` 执行,支持 PEP 723 内联依赖声明:
```python
# /// script
# requires-python = ">=3.10"
# dependencies = ["requests", "pandas"]
# ///
import requests, pandas as pd
...
```
请使用中文回复。 请使用中文回复。

View File

@@ -19,9 +19,11 @@ pub enum AgentUpdate {
ExecutionLog { workflow_id: String, step_order: i32, tool_name: String, tool_input: String, output: String, status: String }, ExecutionLog { workflow_id: String, step_order: i32, tool_name: String, tool_input: String, output: String, status: String },
LlmCallLog { workflow_id: String, step_order: i32, phase: String, messages_count: i32, tools_count: i32, tool_calls: String, text_response: String, prompt_tokens: Option<u32>, completion_tokens: Option<u32>, latency_ms: i64 }, LlmCallLog { workflow_id: String, step_order: i32, phase: String, messages_count: i32, tools_count: i32, tool_calls: String, text_response: String, prompt_tokens: Option<u32>, completion_tokens: Option<u32>, latency_ms: i64 },
StateSnapshot { workflow_id: String, step_order: i32, state: AgentState }, StateSnapshot { workflow_id: String, step_order: i32, state: AgentState },
WorkflowComplete { workflow_id: String, status: String, report: Option<String> }, WorkflowComplete { workflow_id: String, status: String },
ArtifactSave { workflow_id: String, step_order: i32, artifact: Artifact }, ArtifactSave { workflow_id: String, step_order: i32, artifact: Artifact },
RequirementUpdate { workflow_id: String, requirement: String }, RequirementUpdate { workflow_id: String, requirement: String },
/// base64-encoded file content
FileSync { project_id: String, path: String, data_b64: String },
Error { message: String }, Error { message: String },
} }
@@ -98,14 +100,9 @@ pub async fn handle_single_update(
"INSERT INTO agent_state_snapshots (id, workflow_id, step_order, state_json, created_at) VALUES (?, ?, ?, ?, datetime('now'))" "INSERT INTO agent_state_snapshots (id, workflow_id, step_order, state_json, created_at) VALUES (?, ?, ?, ?, datetime('now'))"
).bind(&id).bind(workflow_id).bind(step_order).bind(&json).execute(pool).await; ).bind(&id).bind(workflow_id).bind(step_order).bind(&json).execute(pool).await;
} }
AgentUpdate::WorkflowComplete { workflow_id, status, report } => { AgentUpdate::WorkflowComplete { workflow_id, status } => {
let _ = sqlx::query("UPDATE workflows SET status = ? WHERE id = ?") let _ = sqlx::query("UPDATE workflows SET status = ? WHERE id = ?")
.bind(status).bind(workflow_id).execute(pool).await; .bind(status).bind(workflow_id).execute(pool).await;
if let Some(r) = report {
let _ = sqlx::query("UPDATE workflows SET report = ? WHERE id = ?")
.bind(r).bind(workflow_id).execute(pool).await;
bcast(broadcast_tx, WsMessage::ReportReady { workflow_id: workflow_id.clone() });
}
bcast(broadcast_tx, WsMessage::WorkflowStatusUpdate { workflow_id: workflow_id.clone(), status: status.clone() }); bcast(broadcast_tx, WsMessage::WorkflowStatusUpdate { workflow_id: workflow_id.clone(), status: status.clone() });
} }
AgentUpdate::ArtifactSave { workflow_id, step_order, artifact } => { AgentUpdate::ArtifactSave { workflow_id, step_order, artifact } => {
@@ -120,6 +117,17 @@ pub async fn handle_single_update(
.bind(requirement).bind(workflow_id).execute(pool).await; .bind(requirement).bind(workflow_id).execute(pool).await;
bcast(broadcast_tx, WsMessage::RequirementUpdate { workflow_id: workflow_id.clone(), requirement: requirement.clone() }); bcast(broadcast_tx, WsMessage::RequirementUpdate { workflow_id: workflow_id.clone(), requirement: requirement.clone() });
} }
AgentUpdate::FileSync { project_id, path, data_b64 } => {
use base64::Engine;
let base = format!("/app/data/workspaces/{}", project_id);
let full = std::path::Path::new(&base).join(path);
if let Some(parent) = full.parent() {
let _ = tokio::fs::create_dir_all(parent).await;
}
if let Ok(bytes) = base64::engine::general_purpose::STANDARD.decode(data_b64) {
let _ = tokio::fs::write(&full, &bytes).await;
}
}
AgentUpdate::Error { message } => { AgentUpdate::Error { message } => {
bcast(broadcast_tx, WsMessage::Error { message: message.clone() }); bcast(broadcast_tx, WsMessage::Error { message: message.clone() });
} }

View File

@@ -191,10 +191,12 @@ async fn connect_and_run(server_url: &str, worker_name: &str, llm_config: &crate
}).await; }).await;
} }
// Sync all workspace files to server
sync_workspace(&update_tx, &project_id, &workdir).await;
let _ = update_tx.send(AgentUpdate::WorkflowComplete { let _ = update_tx.send(AgentUpdate::WorkflowComplete {
workflow_id: workflow_id.clone(), workflow_id: workflow_id.clone(),
status: final_status.into(), status: final_status.into(),
report: None,
}).await; }).await;
*comment_tx.lock().await = None; *comment_tx.lock().await = None;
@@ -214,3 +216,51 @@ async fn connect_and_run(server_url: &str, worker_name: &str, llm_config: &crate
Ok(()) Ok(())
} }
/// Sync all workspace files to server via FileSync updates.
/// Skips .venv/, __pycache__/, .git/ and files > 1MB.
async fn sync_workspace(
update_tx: &mpsc::Sender<AgentUpdate>,
project_id: &str,
workdir: &str,
) {
use base64::Engine;
let base = std::path::Path::new(workdir);
if !base.exists() {
return;
}
let mut stack = vec![base.to_path_buf()];
let mut count = 0u32;
while let Some(dir) = stack.pop() {
let mut entries = match tokio::fs::read_dir(&dir).await {
Ok(e) => e,
Err(_) => continue,
};
while let Ok(Some(entry)) = entries.next_entry().await {
let name = entry.file_name().to_string_lossy().to_string();
// Skip dirs we don't want to sync
if matches!(name.as_str(), ".venv" | "__pycache__" | ".git" | "node_modules" | ".mypy_cache") {
continue;
}
let path = entry.path();
if path.is_dir() {
stack.push(path);
} else if let Ok(meta) = entry.metadata().await {
// Skip files > 1MB
if meta.len() > 1_048_576 {
continue;
}
if let Ok(bytes) = tokio::fs::read(&path).await {
let rel = path.strip_prefix(base).unwrap_or(&path);
let _ = update_tx.send(AgentUpdate::FileSync {
project_id: project_id.to_string(),
path: rel.to_string_lossy().to_string(),
data_b64: base64::engine::general_purpose::STANDARD.encode(&bytes),
}).await;
count += 1;
}
}
}
}
tracing::info!("Synced {} files from workspace {}", count, workdir);
}