16 KiB
Agent Runtime 设计
Context 管理现状与设计
现状
当前没有做 context 长度限制,存在超过 model token limit 的风险。
已有的缓解机制
- Phase transition 时 clear:
step_messages在 planning→executing 和 step→step 切换时会clear(),避免跨阶段累积 - 单条 tool output 截断:bash 输出限制 8000 bytes,read_file 超长时也截断
- Step context 摘要:已完成步骤只保留 summary(
step_summaries),不带完整输出
风险场景
- 一个 execution step 内 tool call 轮次过多(反复 bash、read_file),
step_messages无限增长 - 每轮 LLM 的 assistant message + tool result 都 push 进
step_messages,没有上限 - 最终整个 messages 数组超过模型 context window
方案设计
策略:滑动窗口 + 早期消息摘要
当 step_messages 长度超过阈值时,保留最近 N 轮完整对话,早期的 tool call/result 对折叠为一条摘要消息。
[system prompt]
[user: step context]
[summary of early tool interactions] ← 压缩后的历史
[recent assistant + tool messages] ← 完整保留最近 N 轮
具体实现
- Token 估算:用字符数粗估(1 token ≈ 3-4 chars 中英混合),不需要精确 tokenizer
- 阈值:可配置,默认如 80000 chars(约 20k-25k tokens),给 system prompt 和 response 留余量
- 压缩触发:每次构建 messages 时检查总长度,超过阈值则压缩
- 压缩方式:
- 简单版:直接丢弃早期 tool call/result 对,替换为
[已执行 N 次工具调用,最近结果见下文] - 进阶版:用 LLM 生成摘要(额外一次 API 调用,但质量更好)
- 简单版:直接丢弃早期 tool call/result 对,替换为
- 不压缩的部分:system prompt、user context、最近 2-3 轮完整交互
实现位置
在 run_agent_loop 中构建 messages 之后、调用 LLM 之前,插入压缩逻辑:
// agent.rs run_agent_loop 内,约 L706-L725
let (mut messages, tools) = match &state.phase { ... };
// 压缩 context
compact_messages(&mut messages, MAX_CONTEXT_CHARS);
compact_messages 函数:从前往后扫描,保留 system/user 头部,计算总长度,超限时将早期 assistant+tool 消息替换为摘要。
执行中用户反馈处理 (Plan-Centric Feedback)
核心原则
Agent loop 是 以 plan 为中心线性推进 的。用户反馈的处理不应该是简单的"中断/不中断"二元决策,而应该根据反馈落在 plan 的哪个位置来决定行为:
Plan: [step1 ✓] [step2 ✓] [step3 🔄 executing] [step4 ○] [step5 ○]
↑
当前执行位置
用户反馈 → 影响哪里?
├─ 影响已完成部分 (step1/step2) → 回退:中断当前执行,replan from affected step
├─ 改了需求本身 → 回退:中断当前执行,全部 replan
├─ 影响未完成部分 (step4/step5) → 前进:不中断,调整后续 plan steps
└─ 影响当前步骤 (step3) → 视情况:注入到当前 step context 或中断重做
与"中断"的关系
"中断"只是一个实现手段,不是目的。真正的决策是:
- 反馈是否影响当前或已完成的工作? → 必须中断(继续执行是浪费)
- 反馈只影响未来的步骤? → 不中断,修改 plan 里的 pending steps,执行到那里时自然生效
这意味着 cancel token 之类的机制仍然需要(作为中断的执行手段),但决策逻辑是 plan-aware 的。
决策流程
用户 comment 到达
│
├─ workflow idle (done/failed)
│ └─ 当前逻辑:以 comment 为 context 重新执行
│
└─ workflow executing (step N)
│
├─ 1. 分类 LLM 调用(轻量,不阻塞执行)
│ 输入:原始需求、当前 plan、当前步骤进度、用户 comment
│ 输出:{ impact: "past" | "current" | "future" | "requirement_change" }
│
├─ impact = "past" 或 "requirement_change"
│ → cancel 当前执行
│ → replan(带上已完成步骤的摘要 + 用户反馈)
│
├─ impact = "current"
│ → 注入到 step_messages(当前步骤的 context)
│ → agent 在下一次 LLM 调用时自然看到
│ → 如果反馈是否定性的("这样不行"),cancel 当前步骤,重做
│
└─ impact = "future"
→ 修改 plan 中 pending steps(update/insert/delete)
→ 不中断当前执行
→ 执行到对应步骤时自然生效
实现要点
1. Plan 作为可变数据结构
当前 plan steps 存在 DB 里,但 agent 执行时是线性推进的,没有"修改后续步骤"的能力。需要:
- plan steps 支持运行时增删改(不仅是状态更新)
- agent 每执行完一步,从 DB 重新读 plan(而不是用内存快照),这样外部修改能被感知
2. 并发:分类 LLM 与执行 LLM 并行
分类调用不应该阻塞执行。用 tokio::spawn 做分类,分类结果通过 channel 或 shared state 回传。如果分类结果是"需要中断",设置 cancel token。
// 伪代码
tokio::spawn(async move {
let impact = classify_feedback(&llm, &plan, current_step, &comment).await;
match impact {
Impact::Past | Impact::RequirementChange => {
cancel_token.cancel();
// replan 会在 cancel 生效后由主循环触发
}
Impact::Current => {
// 注入到 step context
step_messages_tx.send(comment).await;
}
Impact::Future => {
// 调用 LLM 修改后续 plan steps
replan_future_steps(&llm, &plan, current_step, &comment).await;
}
}
});
3. 中断后的 Context 衔接
中断执行后重启时,需要构建的 context:
- 已完成步骤的摘要(已有
step_summaries机制) - 被中断步骤的部分执行记录
- 用户反馈内容
- "请根据用户反馈重新规划剩余步骤"的指令
这和上面 context 压缩的滑动窗口方案互补——中断后重启本质上就是一次强制的 context 压缩。
与 context 压缩的统一视角
两个问题其实是同一件事的两面:
| Context 压缩 | 用户反馈 | |
|---|---|---|
| 触发条件 | token 数超阈值 | 用户发了 comment |
| 本质操作 | 摘要化早期历史,保留近期 | 根据反馈位置,决定哪些历史需要保留/丢弃/重做 |
| 共享机制 | step_summaries、compact_messages | 同上 + plan 的增删改 |
两者都需要一个健壮的"摘要化已完成工作"的基础能力。先把这个基础能力做好,上面两个场景都能受益。
Plan-Centric Context 优先级
Context 管理也应该围绕 plan 展开。Plan 本身是结构化的、紧凑的,基本不会超。撑爆 context 的是执行日志(一个 step 内反复 tool call 的历史)。这就有了清晰的层级和主次:
优先级(从高到低,压缩时从低往高砍):
1. System prompt + 需求 ← 不可压缩,恒定大小
2. Plan 全貌(步骤标题 + 状态) ← 不可压缩,O(步骤数) 很短
3. 用户 comments ← 保留原文,通常很短
4. 当前步骤的最近 N 轮 tool call ← 完整保留(agent 需要看到最近的执行结果)
5. 当前步骤的早期 tool call ← 第一优先压缩目标,摘要化
6. 已完成步骤的执行摘要 ← 已经是一句话/步骤,极紧凑
关键洞察:Plan 是骨架,执行日志是血肉。 骨架永远保留,血肉按新鲜度裁剪。这和人类记忆一样——你记得"今天做了什么"(plan),但不记得每一步的具体命令输出(执行日志)。
压缩算法因此变得很简单:
- 算总 token 估计
- 如果超阈值,从"当前步骤的早期 tool call"开始砍,替换为
[前 N 次工具调用已折叠,关键结果:...] - 如果还超,压缩已完成步骤的摘要(合并多步为一段)
- Plan 和 system prompt 永远不动
状态管理、并发与一致性
系统中的参与者和他们各自的"世界观"
一致性问题的本质是:多个参与者对同一份状态有各自的视图,这些视图会因为延迟和并发而产生分歧。
| 参与者 | 看到的世界 | 更新方式 | 延迟 |
|---|---|---|---|
| Agent loop | 内存 AgentState |
直接修改 | 0(权威源) |
| LLM | 上一次调用时传入的 messages | 每次调用时重建 | 一次 LLM 调用的时间(数秒) |
| DB | 持久化的 plan_steps, workflow.status | agent 写入 | 取决于 agent 何时 flush |
| 前端 | 最近一次 WS 推送 / API 响应 | WS broadcast | 网络延迟 + WS 推送时机 |
| 用户 | 看着前端 + 自己的判断 | 发 comment | 人类反应时间(秒~分钟) |
问题出在哪:
时间线:
t0 agent 开始执行 step 3
t1 前端显示 step 3 running(WS 推送)
t2 用户看到 step 3,觉得方向不对,开始打字
t3 agent 完成 step 3,开始 step 4
t4 前端显示 step 4 running
t5 用户发出 comment:"step 3 不对,应该这样做"
→ 但此时 step 3 已经做完了,step 4 在跑了
→ 用户的反馈是基于 t1 时的世界观
这种"用户看到的是过去"是不可消除的(人类反应时间),关键是系统怎么处理这个 gap。
核心设计:统一状态 + 单写者
状态 Schema
一个 project 的完整运行时状态 = 一个 ProjectState 对象:
struct ProjectState {
workflow_id: String,
requirement: String,
status: WorkflowStatus, // Idle / Executing / Done / Failed
plan: Vec<PlanStep>, // 有序步骤,每步有 status
current_step: Option<usize>, // 正在执行哪一步
step_summaries: Vec<StepSummary>, // 已完成步骤的紧凑摘要
step_messages: Vec<ChatMessage>, // 当前步骤的 tool call 历史(临时,会膨胀)
memo: String, // agent 备忘
iteration: u32, // LLM 调用轮次
cancel: CancellationToken, // 中断信号
}
用 Arc<RwLock<ProjectState>> 持有。
单写者规则
只有 agent loop 修改 ProjectState。 没有例外。
其他所有参与者(反馈分类器、API handler、WS handler)都是读者或事件发送者:
- 想看状态 → read lock,clone 出快照,立即释放
- 想改状态 → 发 event 到 agent loop 的 channel,由 agent loop 决定怎么改
这彻底消除了写-写竞争。不需要事务,不需要 CAS,不需要冲突解决。
数据流
用户 comment ─→ API handler ─→ mpsc channel ─→ agent loop (唯一写者)
│
write lock ──→ ProjectState
│
write DB (flush)
│
broadcast WS ──→ 前端
一致性分析:三对关系
1. 内存 ↔ DB 一致性
当前问题:AgentState(内存)和 plan_steps(DB)是两套独立数据,各管各的。
解法:内存是 master,DB 是 checkpoint。
- Agent loop 修改
ProjectState(内存),在关键节点 flush 到 DB - 前端 API 读内存(read lock),不读 DB(运行时)
- DB 只在重启恢复时读
Flush 时机(从密到疏):
| 事件 | 写 DB | 原因 |
|---|---|---|
| workflow.status 变更 | 立即 | 重启恢复必需 |
| plan 变更(新建/修改步骤) | 立即 | 重启恢复 + 持久记录 |
| 步骤完成(summary 产出) | 立即 | 重启恢复的关键数据 |
| step_messages 增长 | 不写 | 临时数据,中断时才摘要化 |
| memo 更新 | 步骤切换时 | 丢了问题不大 |
重启恢复 = 从 DB 重建 ProjectState,step_messages 为空(丢失可接受,summaries 提供足够上下文)。
2. 后端 ↔ 前端 一致性
前端通过两个渠道获取状态:
- WS 推送:agent loop 每次修改状态后 broadcast
- API 拉取:前端 mount 时、切换 project 时
一致性模型:最终一致。前端可能落后几百毫秒,但不会出现永久分歧。
需要保证的不变量:
- WS 推送的顺序和状态变更顺序一致(单写者天然保证)
- 前端收到 WS 消息后,可以安全地 patch 本地状态(不需要全量刷新)
- 如果 WS 断连,重连后拉一次全量状态即可恢复
3. Agent loop ↔ LLM ↔ 用户反馈 一致性
这是最微妙的。三方的时间线:
Agent loop: ──[step3 执行中]──[step3 完成]──[step4 开始]──...
LLM: ──[thinking...]──[返回 tool calls]──
用户: ──[看到 step3]──[打字中...]──[发送 comment]──
三个视角可能同时看到不同的状态。处理方式:
a) LLM 的"冻结视角"
每次 LLM 调用时,传入的 messages 是一个 point-in-time 快照。LLM 返回的 tool calls 是基于那个快照的。在 LLM 思考期间,状态可能已经变了(比如用户发了 comment),但 LLM 不知道。
这没关系——LLM 返回后,agent loop 先检查 cancel flag,再决定是否执行 tool calls。如果 cancel 了,丢弃 LLM 的响应,不浪费。
b) 用户反馈的"过时性"
用户发 comment 时看到的可能是几秒前的状态。反馈里提到的"step 3"可能已经完成了。
处理:反馈分类器拿当前快照(read lock),对比用户反馈和当前进度,判断反馈是关于:
- 已完成的步骤 → 需要 revert/redo
- 当前步骤 → 注入或中断
- 未来步骤 → 修改 plan
- 需求级别 → 全部 replan
分类结果作为 event 发给 agent loop。Agent loop 收到时再次基于最新状态做最终决策(双重检查)。
c) 反馈分类器的并发
反馈分类器 tokio::spawn 出去,和 agent loop 并行:
Agent loop: ──[step4 执行中]──────────[step4 完成]──[处理 feedback event]──
↑
Feedback classifier: ──[拿 snapshot]──[LLM 分类...]──[发 event]─┘
分类器拿的 snapshot 可能已经过时了(分类期间 agent 又推进了一步)。但因为分类结果是建议性的,agent loop 收到后基于最新状态做最终决策,所以不会出错。最坏情况:建议无效,忽略。
Agent loop 内部的事件处理
Agent loop 不再是"拿 event → 跑完整个 run_agent_loop → 拿下一个 event"。而是在 tool call 循环内部也能感知新事件:
for iteration in 0..MAX_ITERATIONS {
// 1. 检查中断
if state.cancel.is_cancelled() {
// 摘要化当前 step_messages,存 DB
break;
}
// 2. 检查是否有 pending feedback events(非阻塞)
while let Ok(event) = feedback_rx.try_recv() {
handle_feedback(&mut state, event);
// 可能修改了 plan、设置了 cancel、注入了 step_messages
}
// 3. 构建 messages,调 LLM
let messages = build_messages(&state);
let response = llm.chat(messages).await?;
// 4. 执行 tool calls,更新 state
// ...
}
关键:feedback_rx.try_recv() 是非阻塞的。在每次 LLM 调用之间检查一次。如果有 feedback,立即在 agent loop 内处理(修改 plan / 设置 cancel / 注入 context),然后继续循环。
这保持了单写者——所有状态变更都在这个循环里发生。
实现顺序
- 定义
ProjectState— 集中状态,Arc<RwLock<_>>。Agent loop 内部用起来,行为不变。 - DB flush 策略 — 关键节点写 DB(status 变更、步骤完成),前端 API 改为读内存。
- Step 摘要持久化 — 步骤完成时写 summary 到 DB,重启时重建 ProjectState。
- Context 压缩 — step_messages 滑动窗口。
- CancellationToken + feedback channel — agent loop 内部
try_recv,单写者处理反馈。 - Feedback 分类器 —
tokio::spawn,read lock 拿快照,LLM 分类,结果发回 agent loop。