extract Output trait: decouple AI core from Telegram

- Add src/output.rs with Output trait and 3 implementations:
  TelegramOutput (streaming via draft/edit), GiteaOutput (comments),
  BufferOutput (for worker/tests)
- Refactor run_openai_with_tools and execute_tool to use &mut dyn Output
- Remove run_claude_streaming, invoke_claude_streaming, run_openai_streaming
  (dead code — only OpenAI-compatible backend is used now)
- Remove BackendConfig::Claude code path from handler
- stream.rs: 790 → 150 lines
This commit is contained in:
Fam Zheng
2026-04-10 16:54:39 +00:00
parent dbd729ecb8
commit f646391f14
7 changed files with 344 additions and 702 deletions

12
Cargo.lock generated
View File

@@ -51,6 +51,17 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "async-trait"
version = "0.1.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
@@ -1045,6 +1056,7 @@ name = "noc"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"axum",
"base64 0.22.1",
"chrono",

View File

@@ -5,6 +5,7 @@ edition = "2021"
[dependencies]
anyhow = "1"
async-trait = "0.1"
axum = "0.8"
base64 = "0.22"
chrono = { version = "0.4", features = ["serde"] }

View File

@@ -4,6 +4,7 @@ use teloxide::prelude::*;
use tracing::{error, info, warn};
use crate::config::{BackendConfig, Config};
use crate::output::TelegramOutput;
use crate::state::AppState;
use crate::stream::run_openai_with_tools;
use crate::tools::compute_next_cron_fire;
@@ -62,12 +63,13 @@ pub async fn life_loop(bot: Bot, state: Arc<AppState>, config: Arc<Config>) {
} = config.backend
{
let sid = format!("life-{chat_id_raw}");
let mut tg_output = TelegramOutput::new(bot.clone(), chat_id, true);
let result = tokio::time::timeout(
std::time::Duration::from_secs(LIFE_LOOP_TIMEOUT_SECS),
run_openai_with_tools(
endpoint, model, api_key, messages, &bot, chat_id, &state, &sid,
&config, true,
endpoint, model, api_key, messages, &mut tg_output, &state, &sid,
&config, *chat_id_raw,
),
)
.await;

View File

@@ -2,6 +2,7 @@ mod config;
mod display;
mod gitea;
mod life;
mod output;
mod state;
mod stream;
mod tools;
@@ -21,12 +22,9 @@ use uuid::Uuid;
use config::{BackendConfig, Config};
use display::build_user_content;
use output::TelegramOutput;
use state::{AppState, MAX_WINDOW, SLIDE_SIZE};
use stream::{
build_system_prompt, invoke_claude_streaming, run_claude_streaming, run_openai_with_tools,
summarize_messages,
};
use tools::discover_tools;
use stream::{build_system_prompt, run_openai_with_tools, summarize_messages};
// ── helpers ─────────────────────────────────────────────────────────
@@ -315,7 +313,7 @@ async fn handle_inner(
let count = state.message_count(&sid).await;
let persona = state.get_config("persona").await.unwrap_or_default();
let scratch = state.get_scratch().await;
let tools = discover_tools();
let tools = tools::discover_tools();
let empty = vec![];
let tools_arr = tools.as_array().unwrap_or(&empty);
@@ -373,126 +371,97 @@ async fn handle_inner(
}
}
// handle "cc" prefix: pass directly to claude -p, no session, no history
if let Some(cc_prompt) = text.strip_prefix("cc").map(|s| s.trim_start()) {
if !cc_prompt.is_empty() {
info!(%sid, "cc passthrough");
let prompt = build_prompt(cc_prompt, &uploaded, &download_errors, &transcriptions);
match run_claude_streaming(&[], &prompt, bot, chat_id).await {
Ok(_) => {}
Err(e) => {
error!(%sid, "cc claude: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
}
return Ok(());
}
}
let prompt = build_prompt(text, &uploaded, &download_errors, &transcriptions);
match &config.backend {
BackendConfig::Claude => {
let known = state.persist.read().await.known_sessions.contains(&sid);
let result =
invoke_claude_streaming(&sid, &prompt, known, bot, chat_id).await;
match &result {
Ok(_) => {
if !known {
state.persist.write().await.known_sessions.insert(sid.clone());
state.save().await;
let BackendConfig::OpenAI {
endpoint,
model,
api_key,
} = &config.backend
else {
let _ = bot.send_message(chat_id, "Only OpenAI backend is supported").await;
return Ok(());
};
let conv = state.load_conv(&sid).await;
let persona = state.get_config("persona").await.unwrap_or_default();
let memory_slots = state.get_memory_slots().await;
let inner = state.get_inner_state().await;
let system_msg = build_system_prompt(&conv.summary, &persona, &memory_slots, &inner);
let mut api_messages = vec![system_msg];
api_messages.extend(conv.messages);
let scratch = state.get_scratch().await;
let user_content = build_user_content(&prompt, &scratch, &uploaded);
api_messages.push(serde_json::json!({"role": "user", "content": user_content}));
let mut tg_output = TelegramOutput::new(bot.clone(), chat_id, is_private);
match run_openai_with_tools(
endpoint, model, api_key, api_messages, &mut tg_output, state, &sid, config, chat_id.0,
)
.await
{
Ok(response) => {
state.push_message(&sid, "user", &prompt).await;
if !response.is_empty() {
state.push_message(&sid, "assistant", &response).await;
}
// sliding window
let count = state.message_count(&sid).await;
if count >= MAX_WINDOW {
info!(%sid, "sliding window: {count} messages, summarizing oldest {SLIDE_SIZE}");
let _ = bot
.send_message(chat_id, "[整理记忆中...]")
.await;
let to_summarize =
state.get_oldest_messages(&sid, SLIDE_SIZE).await;
let current_summary = {
let db = state.db.lock().await;
db.query_row(
"SELECT summary FROM conversations WHERE session_id = ?1",
[&sid],
|row| row.get::<_, String>(0),
)
.unwrap_or_default()
};
match summarize_messages(
endpoint,
model,
api_key,
&current_summary,
&to_summarize,
)
.await
{
Ok(new_summary) => {
state.slide_window(&sid, &new_summary, SLIDE_SIZE).await;
let remaining = state.message_count(&sid).await;
info!(%sid, "window slid, {remaining} messages remain, summary {} chars", new_summary.len());
}
Err(e) => {
warn!(%sid, "summarize failed: {e:#}, keeping all messages");
}
}
Err(e) => {
error!(%sid, "claude: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
}
// auto-reflect every 10 messages
let count = state.message_count(&sid).await;
if count % 10 == 0 && count > 0 {
let state_c = state.clone();
let config_c = config.clone();
tokio::spawn(async move {
crate::life::reflect(&state_c, &config_c).await;
});
}
}
BackendConfig::OpenAI {
endpoint,
model,
api_key,
} => {
let conv = state.load_conv(&sid).await;
let persona = state.get_config("persona").await.unwrap_or_default();
let memory_slots = state.get_memory_slots().await;
let inner = state.get_inner_state().await;
let system_msg = build_system_prompt(&conv.summary, &persona, &memory_slots, &inner);
let mut api_messages = vec![system_msg];
api_messages.extend(conv.messages);
let scratch = state.get_scratch().await;
let user_content = build_user_content(&prompt, &scratch, &uploaded);
api_messages.push(serde_json::json!({"role": "user", "content": user_content}));
match run_openai_with_tools(
endpoint, model, api_key, api_messages, bot, chat_id, state, &sid, config, is_private,
)
.await
{
Ok(response) => {
state.push_message(&sid, "user", &prompt).await;
if !response.is_empty() {
state.push_message(&sid, "assistant", &response).await;
}
// sliding window
let count = state.message_count(&sid).await;
if count >= MAX_WINDOW {
info!(%sid, "sliding window: {count} messages, summarizing oldest {SLIDE_SIZE}");
let _ = bot
.send_message(chat_id, "[整理记忆中...]")
.await;
let to_summarize =
state.get_oldest_messages(&sid, SLIDE_SIZE).await;
let current_summary = {
let db = state.db.lock().await;
db.query_row(
"SELECT summary FROM conversations WHERE session_id = ?1",
[&sid],
|row| row.get::<_, String>(0),
)
.unwrap_or_default()
};
match summarize_messages(
endpoint,
model,
api_key,
&current_summary,
&to_summarize,
)
.await
{
Ok(new_summary) => {
state.slide_window(&sid, &new_summary, SLIDE_SIZE).await;
let remaining = state.message_count(&sid).await;
info!(%sid, "window slid, {remaining} messages remain, summary {} chars", new_summary.len());
}
Err(e) => {
warn!(%sid, "summarize failed: {e:#}, keeping all messages");
}
}
}
// auto-reflect every 10 messages
let count = state.message_count(&sid).await;
if count % 10 == 0 && count > 0 {
let state_c = state.clone();
let config_c = config.clone();
tokio::spawn(async move {
crate::life::reflect(&state_c, &config_c).await;
});
}
}
Err(e) => {
error!(%sid, "openai: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
}
Err(e) => {
error!(%sid, "openai: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
}

207
src/output.rs Normal file
View File

@@ -0,0 +1,207 @@
use anyhow::Result;
use async_trait::async_trait;
use std::path::Path;
/// Output trait — abstraction over where AI responses go.
///
/// Implementations:
/// - TelegramOutput: send/edit messages in Telegram chat
/// - GiteaOutput: post comments on issues/PRs
/// - BufferOutput: collect text in memory (for Worker, tests)
#[async_trait]
pub trait Output: Send + Sync {
/// Send or update streaming text. Called repeatedly as tokens arrive.
/// Implementation decides whether to create new message or edit existing one.
async fn stream_update(&mut self, text: &str) -> Result<()>;
/// Finalize the message — called once when streaming is done.
async fn finalize(&mut self, text: &str) -> Result<()>;
/// Send a status/notification line (e.g. "[tool: bash] running...")
async fn status(&self, text: &str) -> Result<()>;
/// Send a file. Returns Ok(true) if sent, Ok(false) if not supported.
async fn send_file(&self, path: &Path, caption: &str) -> Result<bool>;
}
// ── Telegram ───────────────────────────────────────────────────────
use teloxide::prelude::*;
use teloxide::types::InputFile;
use tokio::time::Instant;
use crate::display::{truncate_at_char_boundary, truncate_for_display};
use crate::stream::{send_message_draft, DRAFT_INTERVAL_MS, EDIT_INTERVAL_MS, TG_MSG_LIMIT};
pub struct TelegramOutput {
pub bot: Bot,
pub chat_id: ChatId,
pub is_private: bool,
// internal state
msg_id: Option<teloxide::types::MessageId>,
use_draft: bool,
last_edit: Instant,
http: reqwest::Client,
}
impl TelegramOutput {
pub fn new(bot: Bot, chat_id: ChatId, is_private: bool) -> Self {
Self {
bot,
chat_id,
is_private,
msg_id: None,
use_draft: is_private,
last_edit: Instant::now(),
http: reqwest::Client::new(),
}
}
}
#[async_trait]
impl Output for TelegramOutput {
async fn stream_update(&mut self, text: &str) -> Result<()> {
let interval = if self.use_draft {
DRAFT_INTERVAL_MS
} else {
EDIT_INTERVAL_MS
};
if self.last_edit.elapsed().as_millis() < interval as u128 {
return Ok(());
}
let display = if self.use_draft {
truncate_at_char_boundary(text, TG_MSG_LIMIT).to_string()
} else {
truncate_for_display(text)
};
if self.use_draft {
let token = self.bot.token().to_owned();
match send_message_draft(&self.http, &token, self.chat_id.0, 1, &display).await {
Ok(_) => {
self.last_edit = Instant::now();
}
Err(e) => {
tracing::warn!("sendMessageDraft failed, falling back: {e:#}");
self.use_draft = false;
if let Ok(sent) = self.bot.send_message(self.chat_id, &display).await {
self.msg_id = Some(sent.id);
self.last_edit = Instant::now();
}
}
}
} else if let Some(id) = self.msg_id {
if self
.bot
.edit_message_text(self.chat_id, id, &display)
.await
.is_ok()
{
self.last_edit = Instant::now();
}
} else if let Ok(sent) = self.bot.send_message(self.chat_id, &display).await {
self.msg_id = Some(sent.id);
self.last_edit = Instant::now();
}
Ok(())
}
async fn finalize(&mut self, text: &str) -> Result<()> {
crate::display::send_final_result(
&self.bot,
self.chat_id,
self.msg_id,
self.use_draft,
text,
)
.await;
Ok(())
}
async fn status(&self, text: &str) -> Result<()> {
let _ = self.bot.send_message(self.chat_id, text).await;
Ok(())
}
async fn send_file(&self, path: &Path, caption: &str) -> Result<bool> {
let input_file = InputFile::file(path);
let mut req = self.bot.send_document(self.chat_id, input_file);
if !caption.is_empty() {
req = req.caption(caption);
}
req.await?;
Ok(true)
}
}
// ── Gitea ──────────────────────────────────────────────────────────
use crate::gitea::GiteaClient;
pub struct GiteaOutput {
pub client: GiteaClient,
pub owner: String,
pub repo: String,
pub issue_nr: u64,
}
#[async_trait]
impl Output for GiteaOutput {
async fn stream_update(&mut self, _text: &str) -> Result<()> {
// Gitea comments don't support streaming — just accumulate
Ok(())
}
async fn finalize(&mut self, text: &str) -> Result<()> {
self.client
.post_comment(&self.owner, &self.repo, self.issue_nr, text)
.await
}
async fn status(&self, _text: &str) -> Result<()> {
// No status updates for Gitea
Ok(())
}
async fn send_file(&self, _path: &Path, _caption: &str) -> Result<bool> {
// Gitea comments can't send files directly
Ok(false)
}
}
// ── Buffer (for Worker, tests) ─────────────────────────────────────
pub struct BufferOutput {
pub text: String,
}
impl BufferOutput {
pub fn new() -> Self {
Self {
text: String::new(),
}
}
}
#[async_trait]
impl Output for BufferOutput {
async fn stream_update(&mut self, text: &str) -> Result<()> {
self.text = text.to_string();
Ok(())
}
async fn finalize(&mut self, text: &str) -> Result<()> {
self.text = text.to_string();
Ok(())
}
async fn status(&self, _text: &str) -> Result<()> {
Ok(())
}
async fn send_file(&self, _path: &Path, _caption: &str) -> Result<bool> {
Ok(false)
}
}

View File

@@ -1,18 +1,11 @@
use std::process::Stdio;
use std::sync::Arc;
use anyhow::Result;
use serde::Deserialize;
use teloxide::prelude::*;
use tokio::io::AsyncBufReadExt;
use tokio::process::Command;
use tokio::time::Instant;
use tracing::{error, info, warn};
use crate::config::Config;
use crate::display::{
send_final_result, truncate_at_char_boundary, truncate_for_display,
};
use crate::display::truncate_at_char_boundary;
use crate::output::Output;
use crate::state::AppState;
use crate::tools::{discover_tools, execute_tool, ToolCall};
@@ -21,66 +14,6 @@ pub const DRAFT_INTERVAL_MS: u64 = 1000;
pub const TG_MSG_LIMIT: usize = 4096;
pub const CURSOR: &str = " \u{25CE}";
/// Stream JSON event types we care about.
#[derive(Deserialize)]
pub struct StreamEvent {
#[serde(rename = "type")]
pub event_type: String,
pub message: Option<AssistantMessage>,
pub result: Option<String>,
#[serde(default)]
pub is_error: bool,
}
#[derive(Deserialize)]
pub struct AssistantMessage {
pub content: Vec<ContentBlock>,
}
#[derive(Deserialize)]
pub struct ContentBlock {
#[serde(rename = "type")]
pub block_type: String,
pub text: Option<String>,
pub name: Option<String>,
pub input: Option<serde_json::Value>,
}
/// Extract all text from an assistant message's content blocks.
pub fn extract_text(msg: &AssistantMessage) -> String {
msg.content
.iter()
.filter(|b| b.block_type == "text")
.filter_map(|b| b.text.as_deref())
.collect::<Vec<_>>()
.join("")
}
/// Extract tool use status line, e.g. "Bash: echo hello"
pub fn extract_tool_use(msg: &AssistantMessage) -> Option<String> {
for block in &msg.content {
if block.block_type == "tool_use" {
let name = block.name.as_deref().unwrap_or("tool");
let detail = block
.input
.as_ref()
.and_then(|v| {
// try common fields: command, pattern, file_path, query
v.get("command")
.or(v.get("pattern"))
.or(v.get("file_path"))
.or(v.get("query"))
.or(v.get("prompt"))
.and_then(|s| s.as_str())
})
.unwrap_or("");
let detail_short = truncate_at_char_boundary(detail, 80);
return Some(format!("{name}: {detail_short}"));
}
}
None
}
pub async fn send_message_draft(
client: &reqwest::Client,
token: &str,
@@ -113,12 +46,11 @@ pub async fn run_openai_with_tools(
model: &str,
api_key: &str,
mut messages: Vec<serde_json::Value>,
bot: &Bot,
chat_id: ChatId,
output: &mut dyn Output,
state: &Arc<AppState>,
sid: &str,
config: &Arc<Config>,
is_private: bool,
chat_id: i64,
) -> Result<String> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(120))
@@ -149,7 +81,6 @@ pub async fn run_openai_with_tools(
if !resp_raw.status().is_success() {
let status = resp_raw.status();
let body_text = resp_raw.text().await.unwrap_or_default();
// dump messages for debugging
for (i, m) in messages.iter().enumerate() {
let role = m["role"].as_str().unwrap_or("?");
let content_len = m["content"].as_str().map(|s| s.len()).unwrap_or(0);
@@ -162,15 +93,7 @@ pub async fn run_openai_with_tools(
}
let mut resp = resp_raw;
let token = bot.token().to_owned();
let raw_chat_id = chat_id.0;
let draft_id: i64 = 1;
let mut use_draft = is_private; // sendMessageDraft only works in private chats
let mut msg_id: Option<teloxide::types::MessageId> = None;
let mut accumulated = String::new();
let mut last_edit = Instant::now();
let mut buffer = String::new();
let mut done = false;
@@ -206,14 +129,12 @@ pub async fn run_openai_with_tools(
if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
let delta = &json["choices"][0]["delta"];
// handle content delta
if let Some(content) = delta["content"].as_str() {
if !content.is_empty() {
accumulated.push_str(content);
}
}
// handle tool call delta
if let Some(tc_arr) = delta["tool_calls"].as_array() {
has_tool_calls = true;
for tc in tc_arr {
@@ -237,70 +158,15 @@ pub async fn run_openai_with_tools(
}
}
// display update (only when there's content to show)
if accumulated.is_empty() {
continue;
if !accumulated.is_empty() {
let _ = output.stream_update(&accumulated).await;
}
{
let interval = if use_draft {
DRAFT_INTERVAL_MS
} else {
EDIT_INTERVAL_MS
};
if last_edit.elapsed().as_millis() < interval as u128 {
continue;
}
let display = if use_draft {
truncate_at_char_boundary(&accumulated, TG_MSG_LIMIT).to_string()
} else {
truncate_for_display(&accumulated)
};
if use_draft {
match send_message_draft(
&client, &token, raw_chat_id, draft_id, &display,
)
.await
{
Ok(_) => {
last_edit = Instant::now();
}
Err(e) => {
warn!("sendMessageDraft failed, falling back: {e:#}");
use_draft = false;
if let Ok(sent) =
bot.send_message(chat_id, &display).await
{
msg_id = Some(sent.id);
last_edit = Instant::now();
}
}
}
} else if let Some(id) = msg_id {
if bot
.edit_message_text(chat_id, id, &display)
.await
.is_ok()
{
last_edit = Instant::now();
}
} else if let Ok(sent) =
bot.send_message(chat_id, &display).await
{
msg_id = Some(sent.id);
last_edit = Instant::now();
}
} // end display block
}
}
}
// decide what to do based on response type
if has_tool_calls && !tool_calls.is_empty() {
// append assistant message with tool calls
let tc_json: Vec<serde_json::Value> = tool_calls
.iter()
.map(|tc| {
@@ -322,15 +188,14 @@ pub async fn run_openai_with_tools(
});
messages.push(assistant_msg);
// execute each tool
for tc in &tool_calls {
info!(tool = %tc.name, "executing tool call");
let _ = bot
.send_message(chat_id, format!("[{}({})]", tc.name, truncate_at_char_boundary(&tc.arguments, 100)))
let _ = output
.status(&format!("[{}({})]", tc.name, truncate_at_char_boundary(&tc.arguments, 100)))
.await;
let result =
execute_tool(&tc.name, &tc.arguments, state, bot, chat_id, sid, config)
execute_tool(&tc.name, &tc.arguments, state, output, sid, config, chat_id)
.await;
messages.push(serde_json::json!({
@@ -340,357 +205,18 @@ pub async fn run_openai_with_tools(
}));
}
// clear display state for next round
tool_calls.clear();
// loop back to call API again
continue;
}
// content response — send final result
if !accumulated.is_empty() {
send_final_result(bot, chat_id, msg_id, use_draft, &accumulated).await;
let _ = output.finalize(&accumulated).await;
}
return Ok(accumulated);
}
}
// ── claude bridge (streaming) ───────────────────────────────────────
pub async fn invoke_claude_streaming(
sid: &str,
prompt: &str,
known: bool,
bot: &Bot,
chat_id: ChatId,
) -> Result<String> {
if known {
return run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await;
}
match run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await {
Ok(out) => {
info!(%sid, "resumed existing session");
Ok(out)
}
Err(e) => {
warn!(%sid, "resume failed ({e:#}), creating new session");
run_claude_streaming(&["--session-id", sid], prompt, bot, chat_id).await
}
}
}
pub async fn run_claude_streaming(
extra_args: &[&str],
prompt: &str,
bot: &Bot,
chat_id: ChatId,
) -> Result<String> {
let mut args: Vec<&str> = vec![
"--dangerously-skip-permissions",
"-p",
"--output-format",
"stream-json",
"--verbose",
];
args.extend(extra_args);
args.push(prompt);
let mut child = Command::new("claude")
.args(&args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdout = child.stdout.take().unwrap();
let mut lines = tokio::io::BufReader::new(stdout).lines();
// sendMessageDraft for native streaming, with editMessageText fallback
let http = reqwest::Client::new();
let token = bot.token().to_owned();
let raw_chat_id = chat_id.0;
let draft_id: i64 = 1;
let mut use_draft = true;
let mut msg_id: Option<teloxide::types::MessageId> = None;
let mut last_sent_text = String::new();
let mut last_edit = Instant::now();
let mut final_result = String::new();
let mut is_error = false;
let mut tool_status = String::new();
while let Ok(Some(line)) = lines.next_line().await {
let event: StreamEvent = match serde_json::from_str(&line) {
Ok(e) => e,
Err(_) => continue,
};
match event.event_type.as_str() {
"assistant" => {
if let Some(amsg) = &event.message {
// determine display content
let (display_raw, new_text) =
if let Some(status) = extract_tool_use(amsg) {
tool_status = format!("[{status}]");
let d = if last_sent_text.is_empty() {
tool_status.clone()
} else {
format!("{last_sent_text}\n\n{tool_status}")
};
(d, None)
} else {
let text = extract_text(amsg);
if text.is_empty() || text == last_sent_text {
continue;
}
let interval = if use_draft {
DRAFT_INTERVAL_MS
} else {
EDIT_INTERVAL_MS
};
if last_edit.elapsed().as_millis() < interval as u128 {
continue;
}
tool_status.clear();
(text.clone(), Some(text))
};
let display = if use_draft {
// draft mode: no cursor — cursor breaks monotonic text growth
truncate_at_char_boundary(&display_raw, TG_MSG_LIMIT).to_string()
} else {
truncate_for_display(&display_raw)
};
if use_draft {
match send_message_draft(
&http, &token, raw_chat_id, draft_id, &display,
)
.await
{
Ok(_) => {
if let Some(t) = new_text {
last_sent_text = t;
}
last_edit = Instant::now();
}
Err(e) => {
warn!("sendMessageDraft failed, falling back: {e:#}");
use_draft = false;
if let Ok(sent) =
bot.send_message(chat_id, &display).await
{
msg_id = Some(sent.id);
if let Some(t) = new_text {
last_sent_text = t;
}
last_edit = Instant::now();
}
}
}
} else if let Some(id) = msg_id {
if bot
.edit_message_text(chat_id, id, &display)
.await
.is_ok()
{
if let Some(t) = new_text {
last_sent_text = t;
}
last_edit = Instant::now();
}
} else if let Ok(sent) =
bot.send_message(chat_id, &display).await
{
msg_id = Some(sent.id);
if let Some(t) = new_text {
last_sent_text = t;
}
last_edit = Instant::now();
}
}
}
"result" => {
final_result = event.result.unwrap_or_default();
is_error = event.is_error;
}
_ => {}
}
}
// read stderr before waiting (in case child already exited)
let stderr_handle = child.stderr.take();
let status = child.wait().await;
// collect stderr for diagnostics
let stderr_text = if let Some(mut se) = stderr_handle {
let mut buf = String::new();
let _ = tokio::io::AsyncReadExt::read_to_string(&mut se, &mut buf).await;
buf
} else {
String::new()
};
// determine error: explicit is_error from stream, or non-zero exit with no result
let has_error = is_error
|| (final_result.is_empty()
&& status.as_ref().map(|s| !s.success()).unwrap_or(true));
if has_error {
let err_detail = if !final_result.is_empty() {
final_result.clone()
} else if !stderr_text.is_empty() {
stderr_text.trim().to_string()
} else {
format!("claude exited: {:?}", status)
};
if !use_draft {
if let Some(id) = msg_id {
let _ = bot
.edit_message_text(chat_id, id, format!("[error] {err_detail}"))
.await;
}
}
anyhow::bail!("{err_detail}");
}
if final_result.is_empty() {
return Ok(final_result);
}
send_final_result(bot, chat_id, msg_id, use_draft, &final_result).await;
Ok(final_result)
}
// ── openai-compatible backend (streaming) ──────────────────────────
pub async fn run_openai_streaming(
endpoint: &str,
model: &str,
api_key: &str,
messages: &[serde_json::Value],
bot: &Bot,
chat_id: ChatId,
) -> Result<String> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(120))
.build()
.unwrap();
let url = format!("{}/chat/completions", endpoint.trim_end_matches('/'));
let body = serde_json::json!({
"model": model,
"messages": messages,
"stream": true,
});
let mut resp = client
.post(&url)
.header("Authorization", format!("Bearer {api_key}"))
.json(&body)
.send()
.await?
.error_for_status()?;
let token = bot.token().to_owned();
let raw_chat_id = chat_id.0;
let draft_id: i64 = 1;
let mut use_draft = true;
let mut msg_id: Option<teloxide::types::MessageId> = None;
let mut accumulated = String::new();
let mut last_edit = Instant::now();
let mut buffer = String::new();
let mut done = false;
while let Some(chunk) = resp.chunk().await? {
if done {
break;
}
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(pos) = buffer.find('\n') {
let line = buffer[..pos].to_string();
buffer = buffer[pos + 1..].to_string();
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with(':') {
continue;
}
let data = match trimmed.strip_prefix("data: ") {
Some(d) => d,
None => continue,
};
if data.trim() == "[DONE]" {
done = true;
break;
}
if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
if let Some(content) = json["choices"][0]["delta"]["content"].as_str() {
if content.is_empty() {
continue;
}
accumulated.push_str(content);
let interval = if use_draft {
DRAFT_INTERVAL_MS
} else {
EDIT_INTERVAL_MS
};
if last_edit.elapsed().as_millis() < interval as u128 {
continue;
}
let display = if use_draft {
truncate_at_char_boundary(&accumulated, TG_MSG_LIMIT).to_string()
} else {
truncate_for_display(&accumulated)
};
if use_draft {
match send_message_draft(
&client, &token, raw_chat_id, draft_id, &display,
)
.await
{
Ok(_) => {
last_edit = Instant::now();
}
Err(e) => {
warn!("sendMessageDraft failed, falling back: {e:#}");
use_draft = false;
if let Ok(sent) = bot.send_message(chat_id, &display).await {
msg_id = Some(sent.id);
last_edit = Instant::now();
}
}
}
} else if let Some(id) = msg_id {
if bot.edit_message_text(chat_id, id, &display).await.is_ok() {
last_edit = Instant::now();
}
} else if let Ok(sent) = bot.send_message(chat_id, &display).await {
msg_id = Some(sent.id);
last_edit = Instant::now();
}
}
}
}
}
if accumulated.is_empty() {
return Ok(accumulated);
}
send_final_result(bot, chat_id, msg_id, use_draft, &accumulated).await;
Ok(accumulated)
}
pub fn build_system_prompt(summary: &str, persona: &str, memory_slots: &[(i32, String, String)], inner_state: &str) -> serde_json::Value {
let mut text = if persona.is_empty() {
String::from("你是一个AI助手。")

View File

@@ -4,17 +4,15 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use anyhow::Result;
use teloxide::prelude::*;
use teloxide::types::InputFile;
use tokio::io::AsyncBufReadExt;
use tokio::process::Command;
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use crate::config::{BackendConfig, Config};
use crate::config::Config;
use crate::display::truncate_at_char_boundary;
use crate::output::Output;
use crate::state::AppState;
use crate::stream::{build_system_prompt, run_openai_streaming};
// ── subagent & tool call ───────────────────────────────────────────
@@ -261,10 +259,10 @@ pub async fn execute_tool(
name: &str,
arguments: &str,
state: &Arc<AppState>,
bot: &Bot,
chat_id: ChatId,
output: &mut dyn Output,
sid: &str,
config: &Arc<Config>,
chat_id: i64,
) -> String {
let args: serde_json::Value = match serde_json::from_str(arguments) {
Ok(v) => v,
@@ -275,7 +273,7 @@ pub async fn execute_tool(
"spawn_agent" => {
let id = args["id"].as_str().unwrap_or("agent");
let task = args["task"].as_str().unwrap_or("");
spawn_agent(id, task, state, bot, chat_id, sid, config).await
spawn_agent(id, task, state, output, sid, config).await
}
"agent_status" => {
let id = args["id"].as_str().unwrap_or("");
@@ -295,13 +293,9 @@ pub async fn execute_tool(
if !path.is_file() {
return format!("Not a file: {path_str}");
}
let input_file = InputFile::file(path);
let mut req = bot.send_document(chat_id, input_file);
if !caption.is_empty() {
req = req.caption(caption);
}
match req.await {
Ok(_) => format!("File sent: {path_str}"),
match output.send_file(path, caption).await {
Ok(true) => format!("File sent: {path_str}"),
Ok(false) => format!("File sending not supported in this context: {path_str}"),
Err(e) => format!("Failed to send file: {e:#}"),
}
}
@@ -322,7 +316,7 @@ pub async fn execute_tool(
Ok(next) => {
let next_str = next.format("%Y-%m-%d %H:%M:%S").to_string();
let id = state
.add_timer(chat_id.0, label, schedule, &next_str)
.add_timer(chat_id, label, schedule, &next_str)
.await;
format!("Timer #{id} set: \"{label}\" → next fire at {next_str}")
}
@@ -330,7 +324,7 @@ pub async fn execute_tool(
}
}
"list_timers" => {
let timers = state.list_timers(Some(chat_id.0)).await;
let timers = state.list_timers(Some(chat_id)).await;
if timers.is_empty() {
"No active timers.".to_string()
} else {
@@ -424,9 +418,9 @@ pub async fn execute_tool(
let path_str = String::from_utf8_lossy(&out.stdout).trim().to_string();
let path = Path::new(&path_str);
if path.exists() {
let input_file = InputFile::file(path);
match bot.send_voice(chat_id, input_file).await {
Ok(_) => format!("语音已发送: {path_str}"),
match output.send_file(path, "").await {
Ok(true) => format!("语音已发送: {path_str}"),
Ok(false) => format!("语音生成成功但当前通道不支持发送文件: {path_str}"),
Err(e) => format!("语音生成成功但发送失败: {e:#}"),
}
} else {
@@ -450,10 +444,9 @@ pub async fn spawn_agent(
id: &str,
task: &str,
state: &Arc<AppState>,
bot: &Bot,
chat_id: ChatId,
sid: &str,
config: &Arc<Config>,
output: &dyn Output,
_sid: &str,
_config: &Arc<Config>,
) -> String {
// check if already exists
if state.agents.read().await.contains_key(id) {
@@ -471,13 +464,13 @@ pub async fn spawn_agent(
};
let pid = child.id();
let output = Arc::new(tokio::sync::RwLock::new(String::new()));
let agent_output = Arc::new(tokio::sync::RwLock::new(String::new()));
let completed = Arc::new(AtomicBool::new(false));
let exit_code = Arc::new(tokio::sync::RwLock::new(None));
let agent = Arc::new(SubAgent {
task: task.to_string(),
output: output.clone(),
output: agent_output.clone(),
completed: completed.clone(),
exit_code: exit_code.clone(),
pid,
@@ -485,15 +478,10 @@ pub async fn spawn_agent(
state.agents.write().await.insert(id.to_string(), agent);
// background task: collect output and wakeup on completion
let out = output.clone();
// background task: collect output
let out = agent_output.clone();
let done = completed.clone();
let ecode = exit_code.clone();
let bot_c = bot.clone();
let chat_id_c = chat_id;
let state_c = state.clone();
let config_c = config.clone();
let sid_c = sid.to_string();
let id_c = id.to_string();
tokio::spawn(async move {
@@ -512,75 +500,12 @@ pub async fn spawn_agent(
done.store(true, Ordering::SeqCst);
info!(agent = %id_c, "agent completed, exit={code:?}");
// wakeup: inject result and trigger LLM
let result = out.read().await.clone();
let result_short = truncate_at_char_boundary(&result, 4000);
let wakeup = format!(
"[Agent '{id_c}' 执行完成 (exit={})]\n{result_short}",
code.unwrap_or(-1)
);
if let Err(e) = agent_wakeup(
&config_c, &state_c, &bot_c, chat_id_c, &sid_c, &wakeup, &id_c,
)
.await
{
error!(agent = %id_c, "wakeup failed: {e:#}");
let _ = bot_c
.send_message(chat_id_c, format!("[agent wakeup error] {e:#}"))
.await;
}
});
let _ = output.status(&format!("Agent '{id}' spawned (pid={pid:?})")).await;
format!("Agent '{id}' spawned (pid={pid:?})")
}
pub async fn agent_wakeup(
config: &Config,
state: &AppState,
bot: &Bot,
chat_id: ChatId,
sid: &str,
wakeup_msg: &str,
agent_id: &str,
) -> Result<()> {
match &config.backend {
BackendConfig::OpenAI {
endpoint,
model,
api_key,
} => {
state.push_message(sid, "user", wakeup_msg).await;
let conv = state.load_conv(sid).await;
let persona = state.get_config("persona").await.unwrap_or_default();
let memory_slots = state.get_memory_slots().await;
let inner = state.get_inner_state().await;
let system_msg = build_system_prompt(&conv.summary, &persona, &memory_slots, &inner);
let mut api_messages = vec![system_msg];
api_messages.extend(conv.messages);
info!(agent = %agent_id, "wakeup: sending {} messages to LLM", api_messages.len());
let response =
run_openai_streaming(endpoint, model, api_key, &api_messages, bot, chat_id)
.await?;
if !response.is_empty() {
state.push_message(sid, "assistant", &response).await;
}
Ok(())
}
_ => {
let _ = bot
.send_message(chat_id, format!("[Agent '{agent_id}' done]\n{wakeup_msg}"))
.await;
Ok(())
}
}
}
pub async fn check_agent_status(id: &str, state: &AppState) -> String {
let agents = state.agents.read().await;
match agents.get(id) {