use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; use std::process::Stdio; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::Result; use chrono::{Local, NaiveDate, NaiveTime}; use serde::{Deserialize, Serialize}; use teloxide::dispatching::UpdateFilterExt; use teloxide::net::Download; use teloxide::prelude::*; use teloxide::types::InputFile; use tokio::io::AsyncBufReadExt; use tokio::process::Command; use tokio::sync::RwLock; use tokio::time::Instant; use tracing::{error, info, warn}; use base64::Engine; use uuid::Uuid; // ── config ────────────────────────────────────────────────────────── #[derive(Deserialize)] struct Config { #[serde(default = "default_name")] name: String, tg: TgConfig, auth: AuthConfig, session: SessionConfig, #[serde(default)] backend: BackendConfig, #[serde(default)] whisper_url: Option, } fn default_name() -> String { "noc".to_string() } #[derive(Deserialize, Clone, Default)] #[serde(tag = "type")] enum BackendConfig { #[serde(rename = "claude")] #[default] Claude, #[serde(rename = "openai")] OpenAI { endpoint: String, model: String, #[serde(default = "default_api_key")] api_key: String, }, } fn default_api_key() -> String { "unused".to_string() } #[derive(Deserialize)] struct TgConfig { key: String, } #[derive(Deserialize)] struct AuthConfig { passphrase: String, } #[derive(Deserialize)] struct SessionConfig { refresh_hour: u32, } // ── persistent state ──────────────────────────────────────────────── #[derive(Serialize, Deserialize, Default)] struct Persistent { authed: HashMap, known_sessions: HashSet, } #[derive(Serialize, Deserialize, Clone, Default)] struct ConversationState { summary: String, messages: Vec, total_messages: usize, } const MAX_WINDOW: usize = 100; const SLIDE_SIZE: usize = 50; // ── subagent & tool call ─────────────────────────────────────────── struct SubAgent { task: String, output: Arc>, completed: Arc, exit_code: Arc>>, pid: Option, } struct ToolCall { id: String, name: String, arguments: String, } fn tools_dir() -> PathBuf { // tools/ relative to the config file location let config_path = std::env::var("NOC_CONFIG").unwrap_or_else(|_| "config.yaml".into()); let config_dir = Path::new(&config_path) .parent() .unwrap_or(Path::new(".")); config_dir.join("tools") } /// Scan tools/ directory for scripts with --schema, merge with built-in tools. /// Called on every API request so new/updated scripts take effect immediately. fn discover_tools() -> serde_json::Value { let mut tools = vec![ serde_json::json!({ "type": "function", "function": { "name": "spawn_agent", "description": "启动一个 Claude Code 子代理异步执行复杂任务。子代理可使用 shell、浏览器和搜索引擎,适合网页搜索、资料查找、技术调研、代码任务等。完成后会收到通知。", "parameters": { "type": "object", "properties": { "id": {"type": "string", "description": "简短唯一标识符(如 'research'、'fix-bug')"}, "task": {"type": "string", "description": "给子代理的详细任务描述"} }, "required": ["id", "task"] } } }), serde_json::json!({ "type": "function", "function": { "name": "agent_status", "description": "查看正在运行或已完成的子代理的状态和输出", "parameters": { "type": "object", "properties": { "id": {"type": "string", "description": "子代理标识符"} }, "required": ["id"] } } }), serde_json::json!({ "type": "function", "function": { "name": "kill_agent", "description": "终止一个正在运行的子代理", "parameters": { "type": "object", "properties": { "id": {"type": "string", "description": "子代理标识符"} }, "required": ["id"] } } }), serde_json::json!({ "type": "function", "function": { "name": "send_file", "description": "通过 Telegram 向用户发送服务器上的文件,文件必须存在于服务器文件系统中。", "parameters": { "type": "object", "properties": { "path": {"type": "string", "description": "服务器上文件的绝对路径"}, "caption": {"type": "string", "description": "可选的文件说明/描述"} }, "required": ["path"] } } }), serde_json::json!({ "type": "function", "function": { "name": "update_scratch", "description": "更新你的草稿区(工作笔记、状态、提醒)。草稿区内容会附加到每条用户消息中,确保你始终可见。用于跨轮次跟踪上下文。", "parameters": { "type": "object", "properties": { "content": {"type": "string", "description": "完整的草稿区内容(替换之前的内容)"} }, "required": ["content"] } } }), serde_json::json!({ "type": "function", "function": { "name": "update_memory", "description": "写入持久记忆槽。共 100 个槽位(0-99),跨会话保留。记忆槽内容会注入到每次对话的 system prompt 中。用于存储关键事实、用户偏好或重要上下文。内容设为空字符串可清除槽位。", "parameters": { "type": "object", "properties": { "slot_nr": {"type": "integer", "description": "槽位编号(0-99)"}, "content": {"type": "string", "description": "要存储的内容(最多200字符),空字符串表示清除该槽位"} }, "required": ["slot_nr", "content"] } } }), serde_json::json!({ "type": "function", "function": { "name": "gen_voice", "description": "将文字合成为语音并直接发送给用户。", "parameters": { "type": "object", "properties": { "text": {"type": "string", "description": "要合成语音的文字内容"} }, "required": ["text"] } } }), ]; // discover script tools let dir = tools_dir(); if let Ok(entries) = std::fs::read_dir(&dir) { for entry in entries.flatten() { let path = entry.path(); if !path.is_file() { continue; } // run --schema with a short timeout let output = std::process::Command::new(&path) .arg("--schema") .output(); match output { Ok(out) if out.status.success() => { let stdout = String::from_utf8_lossy(&out.stdout); match serde_json::from_str::(stdout.trim()) { Ok(schema) => { let name = schema["name"].as_str().unwrap_or("?"); info!(tool = %name, path = %path.display(), "discovered script tool"); tools.push(serde_json::json!({ "type": "function", "function": schema, })); } Err(e) => { warn!(path = %path.display(), "invalid --schema JSON: {e}"); } } } _ => {} // not a tool script, skip silently } } } serde_json::Value::Array(tools) } struct AppState { persist: RwLock, state_path: PathBuf, db: tokio::sync::Mutex, agents: RwLock>>, } impl AppState { fn load(path: PathBuf) -> Self { let persist = std::fs::read_to_string(&path) .ok() .and_then(|s| serde_json::from_str(&s).ok()) .unwrap_or_default(); info!("loaded state from {}", path.display()); let db_path = path.parent().unwrap_or(Path::new(".")).join("noc.db"); let conn = rusqlite::Connection::open(&db_path) .unwrap_or_else(|e| panic!("open {}: {e}", db_path.display())); conn.execute_batch( "CREATE TABLE IF NOT EXISTS conversations ( session_id TEXT PRIMARY KEY, summary TEXT NOT NULL DEFAULT '', total_messages INTEGER NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')) ); CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id); CREATE TABLE IF NOT EXISTS scratch_area ( id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS config ( key TEXT PRIMARY KEY, value TEXT NOT NULL DEFAULT '', create_time TEXT NOT NULL DEFAULT (datetime('now')), update_time TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS config_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT NOT NULL, value TEXT NOT NULL, create_time TEXT NOT NULL, update_time TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS memory_slots ( slot_nr INTEGER PRIMARY KEY CHECK(slot_nr BETWEEN 0 AND 99), content TEXT NOT NULL DEFAULT '' );", ) .expect("init db schema"); // migrations let _ = conn.execute( "ALTER TABLE messages ADD COLUMN created_at TEXT NOT NULL DEFAULT ''", [], ); info!("opened db {}", db_path.display()); Self { persist: RwLock::new(persist), state_path: path, db: tokio::sync::Mutex::new(conn), agents: RwLock::new(HashMap::new()), } } async fn save(&self) { let data = self.persist.read().await; if let Ok(json) = serde_json::to_string_pretty(&*data) { if let Err(e) = std::fs::write(&self.state_path, json) { error!("save state: {e}"); } } } async fn load_conv(&self, sid: &str) -> ConversationState { let db = self.db.lock().await; let (summary, total) = db .query_row( "SELECT summary, total_messages FROM conversations WHERE session_id = ?1", [sid], |row| Ok((row.get::<_, String>(0)?, row.get::<_, usize>(1)?)), ) .unwrap_or_default(); let mut stmt = db .prepare("SELECT role, content, created_at FROM messages WHERE session_id = ?1 ORDER BY id") .unwrap(); let messages: Vec = stmt .query_map([sid], |row| { let role: String = row.get(0)?; let content: String = row.get(1)?; let ts: String = row.get(2)?; let tagged = if ts.is_empty() { content } else { format!("[{ts}] {content}") }; Ok(serde_json::json!({"role": role, "content": tagged})) }) .unwrap() .filter_map(|r| r.ok()) .collect(); ConversationState { summary, messages, total_messages: total, } } async fn push_message(&self, sid: &str, role: &str, content: &str) { let db = self.db.lock().await; let _ = db.execute( "INSERT OR IGNORE INTO conversations (session_id) VALUES (?1)", [sid], ); let _ = db.execute( "INSERT INTO messages (session_id, role, content, created_at) VALUES (?1, ?2, ?3, datetime('now', 'localtime'))", rusqlite::params![sid, role, content], ); } async fn message_count(&self, sid: &str) -> usize { let db = self.db.lock().await; db.query_row( "SELECT COUNT(*) FROM messages WHERE session_id = ?1", [sid], |row| row.get(0), ) .unwrap_or(0) } async fn slide_window(&self, sid: &str, new_summary: &str, slide_size: usize) { let db = self.db.lock().await; let _ = db.execute( "DELETE FROM messages WHERE id IN ( SELECT id FROM messages WHERE session_id = ?1 ORDER BY id LIMIT ?2 )", rusqlite::params![sid, slide_size], ); let _ = db.execute( "UPDATE conversations SET summary = ?1, total_messages = total_messages + ?2 \ WHERE session_id = ?3", rusqlite::params![new_summary, slide_size, sid], ); } async fn get_oldest_messages(&self, sid: &str, count: usize) -> Vec { let db = self.db.lock().await; let mut stmt = db .prepare( "SELECT role, content FROM messages WHERE session_id = ?1 ORDER BY id LIMIT ?2", ) .unwrap(); stmt.query_map(rusqlite::params![sid, count], |row| { let role: String = row.get(0)?; let content: String = row.get(1)?; Ok(serde_json::json!({"role": role, "content": content})) }) .unwrap() .filter_map(|r| r.ok()) .collect() } async fn get_scratch(&self) -> String { let db = self.db.lock().await; db.query_row( "SELECT content FROM scratch_area ORDER BY id DESC LIMIT 1", [], |row| row.get(0), ) .unwrap_or_default() } async fn push_scratch(&self, content: &str) { let db = self.db.lock().await; let _ = db.execute( "INSERT INTO scratch_area (content) VALUES (?1)", [content], ); } async fn get_config(&self, key: &str) -> Option { let db = self.db.lock().await; db.query_row( "SELECT value FROM config WHERE key = ?1", [key], |row| row.get(0), ) .ok() } async fn get_memory_slots(&self) -> Vec<(i32, String)> { let db = self.db.lock().await; let mut stmt = db .prepare("SELECT slot_nr, content FROM memory_slots WHERE content != '' ORDER BY slot_nr") .unwrap(); stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?))) .unwrap() .filter_map(|r| r.ok()) .collect() } async fn set_memory_slot(&self, slot_nr: i32, content: &str) -> Result<()> { if !(0..=99).contains(&slot_nr) { anyhow::bail!("slot_nr must be 0-99, got {slot_nr}"); } if content.len() > 200 { anyhow::bail!("content too long: {} chars (max 200)", content.len()); } let db = self.db.lock().await; db.execute( "INSERT INTO memory_slots (slot_nr, content) VALUES (?1, ?2) \ ON CONFLICT(slot_nr) DO UPDATE SET content = ?2", rusqlite::params![slot_nr, content], )?; Ok(()) } } // ── helpers ───────────────────────────────────────────────────────── fn session_date(refresh_hour: u32) -> NaiveDate { let now = Local::now(); let refresh = NaiveTime::from_hms_opt(refresh_hour, 0, 0).unwrap(); if now.time() < refresh { now.date_naive() - chrono::Duration::days(1) } else { now.date_naive() } } fn session_uuid(prefix: &str, chat_id: i64, refresh_hour: u32) -> String { let date = session_date(refresh_hour); let name = format!("{}-{}-{}", prefix, chat_id, date.format("%Y%m%d")); Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()).to_string() } fn home_dir() -> PathBuf { PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/tmp".into())) } fn incoming_dir() -> PathBuf { home_dir().join("incoming") } fn outgoing_dir(sid: &str) -> PathBuf { home_dir().join("outgoing").join(sid) } // ── main ──────────────────────────────────────────────────────────── #[tokio::main] async fn main() { tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::from_default_env() .add_directive("noc=info".parse().unwrap()), ) .init(); let config_path = std::env::var("NOC_CONFIG").unwrap_or_else(|_| "config.yaml".into()); let raw = std::fs::read_to_string(&config_path) .unwrap_or_else(|e| panic!("read {config_path}: {e}")); let config: Config = serde_yaml::from_str(&raw).unwrap_or_else(|e| panic!("parse config: {e}")); let state_path = std::env::var("NOC_STATE") .map(PathBuf::from) .unwrap_or_else(|_| PathBuf::from("state.json")); let state = Arc::new(AppState::load(state_path)); let _ = std::fs::create_dir_all(incoming_dir()); let bot = Bot::new(&config.tg.key); let me = bot.get_me().await.unwrap(); let bot_username = Arc::new(me.username.clone().unwrap_or_default()); info!(username = %bot_username, "noc bot starting"); let handler = Update::filter_message().endpoint(handle); Dispatcher::builder(bot, handler) .dependencies(dptree::deps![state, Arc::new(config), bot_username]) .default_handler(|_| async {}) .build() .dispatch() .await; } // ── file download ─────────────────────────────────────────────────── async fn download_tg_file(bot: &Bot, file_id: &str, filename: &str) -> Result { let dir = incoming_dir(); tokio::fs::create_dir_all(&dir).await?; let dest = dir.join(filename); let tf = bot.get_file(file_id).await?; let mut file = tokio::fs::File::create(&dest).await?; bot.download_file(&tf.path, &mut file).await?; info!("downloaded {} -> {}", filename, dest.display()); Ok(dest) } // ── outgoing scan ─────────────────────────────────────────────────── async fn snapshot_dir(dir: &Path) -> HashSet { let mut set = HashSet::new(); if let Ok(mut entries) = tokio::fs::read_dir(dir).await { while let Ok(Some(entry)) = entries.next_entry().await { let path = entry.path(); if path.is_file() { set.insert(path); } } } set } async fn new_files_in(dir: &Path, before: &HashSet) -> Vec { let mut files = Vec::new(); if let Ok(mut entries) = tokio::fs::read_dir(dir).await { while let Ok(Some(entry)) = entries.next_entry().await { let path = entry.path(); if path.is_file() && !before.contains(&path) { files.push(path); } } } files.sort(); files } // ── handler ───────────────────────────────────────────────────────── async fn handle( bot: Bot, msg: Message, state: Arc, config: Arc, _bot_username: Arc, ) -> ResponseResult<()> { let chat_id = msg.chat.id; let is_private = msg.chat.is_private(); let text = msg.text().or(msg.caption()).unwrap_or("").to_string(); let raw_id = chat_id.0; let date = session_date(config.session.refresh_hour); let is_authed = { let p = state.persist.read().await; p.authed.get(&raw_id) == Some(&date) }; if !is_authed { if text.trim() == config.auth.passphrase { { let mut p = state.persist.write().await; p.authed.insert(raw_id, date); } state.save().await; bot.send_message(chat_id, "authenticated").await?; info!(chat = raw_id, "authed"); } else { bot.send_message(chat_id, "not authenticated").await?; } return Ok(()); } if let Err(e) = handle_inner(&bot, &msg, chat_id, &text, is_private, &state, &config).await { error!(chat = raw_id, "handle: {e:#}"); let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; } Ok(()) } async fn handle_inner( bot: &Bot, msg: &Message, chat_id: ChatId, text: &str, is_private: bool, state: &Arc, config: &Arc, ) -> Result<()> { let mut uploaded: Vec = Vec::new(); let mut download_errors: Vec = Vec::new(); let mut transcriptions: Vec = Vec::new(); if let Some(doc) = msg.document() { let name = doc.file_name.as_deref().unwrap_or("file"); match download_tg_file(bot, &doc.file.id, name).await { Ok(p) => uploaded.push(p), Err(e) => download_errors.push(format!("{name}: {e:#}")), } } if let Some(photos) = msg.photo() { if let Some(photo) = photos.last() { let name = format!("photo_{}.jpg", Local::now().format("%H%M%S")); match download_tg_file(bot, &photo.file.id, &name).await { Ok(p) => uploaded.push(p), Err(e) => download_errors.push(format!("photo: {e:#}")), } } } if let Some(audio) = msg.audio() { let fallback = format!("audio_{}.ogg", Local::now().format("%H%M%S")); let name = audio.file_name.as_deref().unwrap_or(&fallback); match download_tg_file(bot, &audio.file.id, name).await { Ok(p) => { if let Some(url) = &config.whisper_url { match transcribe_audio(url, &p).await { Ok(t) if !t.is_empty() => transcriptions.push(t), Ok(_) => uploaded.push(p), Err(e) => { warn!("transcribe failed: {e:#}"); uploaded.push(p); } } } else { uploaded.push(p); } } Err(e) => download_errors.push(format!("audio: {e:#}")), } } if let Some(voice) = msg.voice() { let name = format!("voice_{}.ogg", Local::now().format("%H%M%S")); match download_tg_file(bot, &voice.file.id, &name).await { Ok(p) => { if let Some(url) = &config.whisper_url { match transcribe_audio(url, &p).await { Ok(t) if !t.is_empty() => transcriptions.push(t), Ok(_) => uploaded.push(p), Err(e) => { warn!("transcribe failed: {e:#}"); uploaded.push(p); } } } else { uploaded.push(p); } } Err(e) => download_errors.push(format!("voice: {e:#}")), } } if let Some(video) = msg.video() { let fallback = format!("video_{}.mp4", Local::now().format("%H%M%S")); let name = video.file_name.as_deref().unwrap_or(&fallback); match download_tg_file(bot, &video.file.id, name).await { Ok(p) => uploaded.push(p), Err(e) => download_errors.push(format!("video: {e:#}")), } } if let Some(vn) = msg.video_note() { let name = format!("videonote_{}.mp4", Local::now().format("%H%M%S")); match download_tg_file(bot, &vn.file.id, &name).await { Ok(p) => uploaded.push(p), Err(e) => download_errors.push(format!("video_note: {e:#}")), } } if text.is_empty() && uploaded.is_empty() && transcriptions.is_empty() { if !download_errors.is_empty() { let err_msg = format!("[文件下载失败]\n{}", download_errors.join("\n")); bot.send_message(chat_id, err_msg).await?; } return Ok(()); } let sid = session_uuid(&config.name, chat_id.0, config.session.refresh_hour); info!(%sid, "recv"); let out_dir = outgoing_dir(&sid); tokio::fs::create_dir_all(&out_dir).await?; let before = snapshot_dir(&out_dir).await; // handle diag command (OpenAI backend only) if text.trim() == "diag" { if let BackendConfig::OpenAI { .. } = &config.backend { let conv = state.load_conv(&sid).await; let count = state.message_count(&sid).await; let persona = state.get_config("persona").await.unwrap_or_default(); let scratch = state.get_scratch().await; let tools = discover_tools(); let empty = vec![]; let tools_arr = tools.as_array().unwrap_or(&empty); let mut diag = format!( "# NOC Diag\n\n\ ## Session\n\ - id: `{sid}`\n\ - window: {count}/{MAX_WINDOW} (slide at {MAX_WINDOW}, drop {SLIDE_SIZE})\n\ - total processed: {}\n\n\ ## Persona ({} chars)\n```\n{}\n```\n\n\ ## Scratch ({} chars)\n```\n{}\n```\n\n\ ## Summary ({} chars)\n```\n{}\n```\n\n\ ## Tools ({} registered)\n", conv.total_messages + count, persona.len(), if persona.is_empty() { "(default)" } else { &persona }, scratch.len(), if scratch.is_empty() { "(empty)" } else { &scratch }, conv.summary.len(), if conv.summary.is_empty() { "(empty)".to_string() } else { conv.summary }, tools_arr.len(), ); for tool in tools_arr { let func = &tool["function"]; let name = func["name"].as_str().unwrap_or("?"); let desc = func["description"].as_str().unwrap_or(""); let params = serde_json::to_string_pretty(&func["parameters"]) .unwrap_or_default(); diag.push_str(&format!( "### `{name}`\n{desc}\n\n```json\n{params}\n```\n\n" )); } let memory_slots = state.get_memory_slots().await; diag.push_str(&format!("## Memory Slots ({}/100 used)\n", memory_slots.len())); if memory_slots.is_empty() { diag.push_str("(empty)\n\n"); } else { for (nr, content) in &memory_slots { diag.push_str(&format!("- `[{nr}]` {content}\n")); } diag.push('\n'); } let tmp = std::env::temp_dir().join(format!("noc-diag-{sid}.md")); tokio::fs::write(&tmp, &diag).await?; bot.send_document(chat_id, InputFile::file(&tmp)) .await?; let _ = tokio::fs::remove_file(&tmp).await; return Ok(()); } } // handle "cc" prefix: pass directly to claude -p, no session, no history if let Some(cc_prompt) = text.strip_prefix("cc").map(|s| s.trim_start()) { if !cc_prompt.is_empty() { info!(%sid, "cc passthrough"); let prompt = build_prompt(cc_prompt, &uploaded, &download_errors, &transcriptions); match run_claude_streaming(&[], &prompt, bot, chat_id).await { Ok(_) => {} Err(e) => { error!(%sid, "cc claude: {e:#}"); let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; } } return Ok(()); } } let prompt = build_prompt(text, &uploaded, &download_errors, &transcriptions); match &config.backend { BackendConfig::Claude => { let known = state.persist.read().await.known_sessions.contains(&sid); let result = invoke_claude_streaming(&sid, &prompt, known, bot, chat_id).await; match &result { Ok(_) => { if !known { state.persist.write().await.known_sessions.insert(sid.clone()); state.save().await; } } Err(e) => { error!(%sid, "claude: {e:#}"); let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; } } } BackendConfig::OpenAI { endpoint, model, api_key, } => { let conv = state.load_conv(&sid).await; let persona = state.get_config("persona").await.unwrap_or_default(); let memory_slots = state.get_memory_slots().await; let system_msg = build_system_prompt(&conv.summary, &persona, &memory_slots); let mut api_messages = vec![system_msg]; api_messages.extend(conv.messages); let scratch = state.get_scratch().await; let user_content = build_user_content(&prompt, &scratch, &uploaded); api_messages.push(serde_json::json!({"role": "user", "content": user_content})); match run_openai_with_tools( endpoint, model, api_key, api_messages, bot, chat_id, state, &sid, config, is_private, ) .await { Ok(response) => { state.push_message(&sid, "user", &prompt).await; if !response.is_empty() { state.push_message(&sid, "assistant", &response).await; } // sliding window let count = state.message_count(&sid).await; if count >= MAX_WINDOW { info!(%sid, "sliding window: {count} messages, summarizing oldest {SLIDE_SIZE}"); let _ = bot .send_message(chat_id, "[整理记忆中...]") .await; let to_summarize = state.get_oldest_messages(&sid, SLIDE_SIZE).await; let current_summary = { let db = state.db.lock().await; db.query_row( "SELECT summary FROM conversations WHERE session_id = ?1", [&sid], |row| row.get::<_, String>(0), ) .unwrap_or_default() }; match summarize_messages( endpoint, model, api_key, ¤t_summary, &to_summarize, ) .await { Ok(new_summary) => { state.slide_window(&sid, &new_summary, SLIDE_SIZE).await; let remaining = state.message_count(&sid).await; info!(%sid, "window slid, {remaining} messages remain, summary {} chars", new_summary.len()); } Err(e) => { warn!(%sid, "summarize failed: {e:#}, keeping all messages"); } } } } Err(e) => { error!(%sid, "openai: {e:#}"); let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; } } } } // send new files from outgoing dir let new_files = new_files_in(&out_dir, &before).await; for path in &new_files { info!(%sid, "sending file: {}", path.display()); if let Err(e) = bot.send_document(chat_id, InputFile::file(path)).await { error!(%sid, "send_document {}: {e:#}", path.display()); let _ = bot .send_message(chat_id, format!("[发送文件失败: {}]", path.display())) .await; } } Ok(()) } fn build_prompt( text: &str, uploaded: &[PathBuf], errors: &[String], transcriptions: &[String], ) -> String { let mut parts = Vec::new(); for t in transcriptions { parts.push(format!("[语音消息] {t}")); } for f in uploaded { parts.push(format!("[用户上传了文件: {}]", f.display())); } for e in errors { parts.push(format!("[文件下载失败: {e}]")); } if !text.is_empty() { parts.push(text.to_string()); } parts.join("\n") } async fn transcribe_audio(whisper_url: &str, file_path: &Path) -> Result { let client = reqwest::Client::new(); let url = format!("{}/v1/audio/transcriptions", whisper_url.trim_end_matches('/')); let file_bytes = tokio::fs::read(file_path).await?; let file_name = file_path .file_name() .and_then(|n| n.to_str()) .unwrap_or("audio.ogg") .to_string(); let part = reqwest::multipart::Part::bytes(file_bytes) .file_name(file_name) .mime_str("audio/ogg")?; let form = reqwest::multipart::Form::new() .part("file", part) .text("model", "base"); let resp = client.post(&url).multipart(form).send().await?.error_for_status()?; let json: serde_json::Value = resp.json().await?; Ok(json["text"].as_str().unwrap_or("").to_string()) } fn build_system_prompt(summary: &str, persona: &str, memory_slots: &[(i32, String)]) -> serde_json::Value { let mut text = if persona.is_empty() { String::from("你是一个AI助手。") } else { persona.to_string() }; text.push_str( "\n\n你可以使用提供的工具来完成任务。\ 当需要执行命令、运行代码或启动复杂子任务时,直接调用对应的工具,不要只是描述你会怎么做。\ 当需要搜索信息(如网页搜索、资料查找、技术调研等)时,使用 spawn_agent 启动一个子代理来完成搜索任务,\ 子代理可以使用浏览器和搜索引擎,搜索完成后你会收到结果通知。\ 输出格式:使用纯文本或基础Markdown(加粗、列表、代码块)。\ 不要使用LaTeX公式($...$)、特殊Unicode符号(→←↔)或HTML标签,Telegram无法渲染这些。", ); if !memory_slots.is_empty() { text.push_str("\n\n## 持久记忆(跨会话保留)\n"); for (nr, content) in memory_slots { text.push_str(&format!("[{nr}] {content}\n")); } } if !summary.is_empty() { text.push_str("\n\n## 之前的对话总结\n"); text.push_str(summary); } serde_json::json!({"role": "system", "content": text}) } /// Build user message content, with optional images/videos as multimodal input. fn build_user_content( text: &str, scratch: &str, media: &[PathBuf], ) -> serde_json::Value { let full_text = if scratch.is_empty() { text.to_string() } else { format!("{text}\n\n[scratch]\n{scratch}") }; // collect media data (images + videos) let mut media_parts: Vec = Vec::new(); for path in media { let (mime, is_video) = match path .extension() .and_then(|e| e.to_str()) .map(|e| e.to_lowercase()) .as_deref() { Some("jpg" | "jpeg") => ("image/jpeg", false), Some("png") => ("image/png", false), Some("gif") => ("image/gif", false), Some("webp") => ("image/webp", false), Some("mp4") => ("video/mp4", true), Some("webm") => ("video/webm", true), Some("mov") => ("video/quicktime", true), _ => continue, }; if let Ok(data) = std::fs::read(path) { let b64 = base64::engine::general_purpose::STANDARD.encode(&data); let data_url = format!("data:{mime};base64,{b64}"); if is_video { media_parts.push(serde_json::json!({ "type": "video_url", "video_url": {"url": data_url} })); } else { media_parts.push(serde_json::json!({ "type": "image_url", "image_url": {"url": data_url} })); } } } if media_parts.is_empty() { // plain text — more compatible serde_json::Value::String(full_text) } else { // multimodal array let mut content = vec![serde_json::json!({"type": "text", "text": full_text})]; content.extend(media_parts); serde_json::Value::Array(content) } } async fn summarize_messages( endpoint: &str, model: &str, api_key: &str, existing_summary: &str, dropped: &[serde_json::Value], ) -> Result { let msgs_text: String = dropped .iter() .filter_map(|m| { let role = m["role"].as_str()?; let content = m["content"].as_str()?; Some(format!("{role}: {content}")) }) .collect::>() .join("\n\n"); let prompt = if existing_summary.is_empty() { format!( "请将以下对话总结为约4000字符的摘要,保留关键信息和上下文:\n\n{}", msgs_text ) } else { format!( "请将以下新对话内容整合到现有总结中,保持总结在约4000字符以内。\ 保留重要信息,让较旧的话题自然淡出。\n\n\ 现有总结:\n{}\n\n新对话:\n{}", existing_summary, msgs_text ) }; let client = reqwest::Client::new(); let url = format!("{}/chat/completions", endpoint.trim_end_matches('/')); let body = serde_json::json!({ "model": model, "messages": [ {"role": "system", "content": "你是一个对话总结助手。请生成简洁但信息丰富的总结。"}, {"role": "user", "content": prompt} ], }); let resp = client .post(&url) .header("Authorization", format!("Bearer {api_key}")) .json(&body) .send() .await? .error_for_status()?; let json: serde_json::Value = resp.json().await?; let summary = json["choices"][0]["message"]["content"] .as_str() .unwrap_or("") .to_string(); Ok(summary) } // ── tool execution ───────────────────────────────────────────────── async fn execute_tool( name: &str, arguments: &str, state: &Arc, bot: &Bot, chat_id: ChatId, sid: &str, config: &Arc, ) -> String { let args: serde_json::Value = match serde_json::from_str(arguments) { Ok(v) => v, Err(e) => return format!("Invalid arguments: {e}"), }; match name { "spawn_agent" => { let id = args["id"].as_str().unwrap_or("agent"); let task = args["task"].as_str().unwrap_or(""); spawn_agent(id, task, state, bot, chat_id, sid, config).await } "agent_status" => { let id = args["id"].as_str().unwrap_or(""); check_agent_status(id, state).await } "kill_agent" => { let id = args["id"].as_str().unwrap_or(""); kill_agent(id, state).await } "send_file" => { let path_str = args["path"].as_str().unwrap_or(""); let caption = args["caption"].as_str().unwrap_or(""); let path = Path::new(path_str); if !path.exists() { return format!("File not found: {path_str}"); } if !path.is_file() { return format!("Not a file: {path_str}"); } let input_file = InputFile::file(path); let mut req = bot.send_document(chat_id, input_file); if !caption.is_empty() { req = req.caption(caption); } match req.await { Ok(_) => format!("File sent: {path_str}"), Err(e) => format!("Failed to send file: {e:#}"), } } "update_scratch" => { let content = args["content"].as_str().unwrap_or(""); state.push_scratch(content).await; format!("Scratch updated ({} chars)", content.len()) } "update_memory" => { let slot_nr = args["slot_nr"].as_i64().unwrap_or(-1) as i32; let content = args["content"].as_str().unwrap_or(""); match state.set_memory_slot(slot_nr, content).await { Ok(_) => { if content.is_empty() { format!("Memory slot {slot_nr} cleared") } else { format!("Memory slot {slot_nr} updated ({} chars)", content.len()) } } Err(e) => format!("Error: {e}"), } } "gen_voice" => { let text = args["text"].as_str().unwrap_or(""); if text.is_empty() { return "Error: text is required".to_string(); } let script = tools_dir().join("gen_voice"); let result = tokio::time::timeout( std::time::Duration::from_secs(120), tokio::process::Command::new(&script) .arg(arguments) .output(), ) .await; match result { Ok(Ok(out)) if out.status.success() => { let path_str = String::from_utf8_lossy(&out.stdout).trim().to_string(); let path = Path::new(&path_str); if path.exists() { let input_file = InputFile::file(path); match bot.send_voice(chat_id, input_file).await { Ok(_) => format!("语音已发送: {path_str}"), Err(e) => format!("语音生成成功但发送失败: {e:#}"), } } else { format!("语音生成失败: 输出文件不存在 ({path_str})") } } Ok(Ok(out)) => { let stderr = String::from_utf8_lossy(&out.stderr); let stdout = String::from_utf8_lossy(&out.stdout); format!("gen_voice failed: {stdout} {stderr}") } Ok(Err(e)) => format!("gen_voice exec error: {e}"), Err(_) => "gen_voice timeout (120s)".to_string(), } } _ => run_script_tool(name, arguments).await, } } async fn spawn_agent( id: &str, task: &str, state: &Arc, bot: &Bot, chat_id: ChatId, sid: &str, config: &Arc, ) -> String { // check if already exists if state.agents.read().await.contains_key(id) { return format!("Agent '{id}' already exists. Use agent_status to check it."); } let mut child = match Command::new("claude") .args(["--dangerously-skip-permissions", "-p", task]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() { Ok(c) => c, Err(e) => return format!("Failed to spawn agent: {e}"), }; let pid = child.id(); let output = Arc::new(tokio::sync::RwLock::new(String::new())); let completed = Arc::new(AtomicBool::new(false)); let exit_code = Arc::new(tokio::sync::RwLock::new(None)); let agent = Arc::new(SubAgent { task: task.to_string(), output: output.clone(), completed: completed.clone(), exit_code: exit_code.clone(), pid, }); state.agents.write().await.insert(id.to_string(), agent); // background task: collect output and wakeup on completion let out = output.clone(); let done = completed.clone(); let ecode = exit_code.clone(); let bot_c = bot.clone(); let chat_id_c = chat_id; let state_c = state.clone(); let config_c = config.clone(); let sid_c = sid.to_string(); let id_c = id.to_string(); tokio::spawn(async move { let stdout = child.stdout.take(); if let Some(stdout) = stdout { let mut lines = tokio::io::BufReader::new(stdout).lines(); while let Ok(Some(line)) = lines.next_line().await { let mut o = out.write().await; o.push_str(&line); o.push('\n'); } } let status = child.wait().await; let code = status.as_ref().ok().and_then(|s| s.code()); *ecode.write().await = code; done.store(true, Ordering::SeqCst); info!(agent = %id_c, "agent completed, exit={code:?}"); // wakeup: inject result and trigger LLM let result = out.read().await.clone(); let result_short = truncate_at_char_boundary(&result, 4000); let wakeup = format!( "[Agent '{id_c}' 执行完成 (exit={})]\n{result_short}", code.unwrap_or(-1) ); if let Err(e) = agent_wakeup( &config_c, &state_c, &bot_c, chat_id_c, &sid_c, &wakeup, &id_c, ) .await { error!(agent = %id_c, "wakeup failed: {e:#}"); let _ = bot_c .send_message(chat_id_c, format!("[agent wakeup error] {e:#}")) .await; } }); format!("Agent '{id}' spawned (pid={pid:?})") } async fn agent_wakeup( config: &Config, state: &AppState, bot: &Bot, chat_id: ChatId, sid: &str, wakeup_msg: &str, agent_id: &str, ) -> Result<()> { match &config.backend { BackendConfig::OpenAI { endpoint, model, api_key, } => { state.push_message(sid, "user", wakeup_msg).await; let conv = state.load_conv(sid).await; let persona = state.get_config("persona").await.unwrap_or_default(); let memory_slots = state.get_memory_slots().await; let system_msg = build_system_prompt(&conv.summary, &persona, &memory_slots); let mut api_messages = vec![system_msg]; api_messages.extend(conv.messages); info!(agent = %agent_id, "wakeup: sending {} messages to LLM", api_messages.len()); let response = run_openai_streaming(endpoint, model, api_key, &api_messages, bot, chat_id) .await?; if !response.is_empty() { state.push_message(sid, "assistant", &response).await; } Ok(()) } _ => { let _ = bot .send_message(chat_id, format!("[Agent '{agent_id}' done]\n{wakeup_msg}")) .await; Ok(()) } } } async fn check_agent_status(id: &str, state: &AppState) -> String { let agents = state.agents.read().await; match agents.get(id) { Some(agent) => { let status = if agent.completed.load(Ordering::SeqCst) { let code = agent.exit_code.read().await; format!("completed (exit={})", code.unwrap_or(-1)) } else { "running".to_string() }; let output = agent.output.read().await; let out_preview = truncate_at_char_boundary(&output, 3000); format!( "Agent '{id}': {status}\nTask: {}\nOutput ({} bytes):\n{out_preview}", agent.task, output.len() ) } None => format!("Agent '{id}' not found"), } } async fn kill_agent(id: &str, state: &AppState) -> String { let agents = state.agents.read().await; match agents.get(id) { Some(agent) => { if agent.completed.load(Ordering::SeqCst) { return format!("Agent '{id}' already completed"); } if let Some(pid) = agent.pid { unsafe { libc::kill(pid as i32, libc::SIGTERM); } format!("Sent SIGTERM to agent '{id}' (pid={pid})") } else { format!("Agent '{id}' has no PID") } } None => format!("Agent '{id}' not found"), } } async fn run_script_tool(name: &str, arguments: &str) -> String { // find script in tools/ that matches this tool name let dir = tools_dir(); let entries = match std::fs::read_dir(&dir) { Ok(e) => e, Err(_) => return format!("Unknown tool: {name}"), }; for entry in entries.flatten() { let path = entry.path(); if !path.is_file() { continue; } // check if this script provides the requested tool let schema_out = std::process::Command::new(&path) .arg("--schema") .output(); if let Ok(out) = schema_out { if out.status.success() { let stdout = String::from_utf8_lossy(&out.stdout); if let Ok(schema) = serde_json::from_str::(stdout.trim()) { if schema["name"].as_str() == Some(name) { // found it — execute info!(tool = %name, path = %path.display(), "running script tool"); let result = tokio::time::timeout( std::time::Duration::from_secs(60), Command::new(&path).arg(arguments).output(), ) .await; return match result { Ok(Ok(output)) => { let mut s = String::from_utf8_lossy(&output.stdout).to_string(); let stderr = String::from_utf8_lossy(&output.stderr); if !stderr.is_empty() { if !s.is_empty() { s.push_str("\n[stderr]\n"); } s.push_str(&stderr); } if s.is_empty() { format!("(exit={})", output.status.code().unwrap_or(-1)) } else { s } } Ok(Err(e)) => format!("Failed to execute {name}: {e}"), Err(_) => "Timeout after 60s".to_string(), }; } } } } } format!("Unknown tool: {name}") } // ── openai with tool call loop ───────────────────────────────────── #[allow(clippy::too_many_arguments)] async fn run_openai_with_tools( endpoint: &str, model: &str, api_key: &str, mut messages: Vec, bot: &Bot, chat_id: ChatId, state: &Arc, sid: &str, config: &Arc, is_private: bool, ) -> Result { let client = reqwest::Client::new(); let url = format!("{}/chat/completions", endpoint.trim_end_matches('/')); let tools = discover_tools(); loop { let body = serde_json::json!({ "model": model, "messages": messages, "tools": tools, "stream": true, }); info!("API request: {} messages, {} tools", messages.len(), tools.as_array().map(|a| a.len()).unwrap_or(0)); let resp_raw = client .post(&url) .header("Authorization", format!("Bearer {api_key}")) .json(&body) .send() .await?; if !resp_raw.status().is_success() { let status = resp_raw.status(); let body_text = resp_raw.text().await.unwrap_or_default(); // dump messages for debugging for (i, m) in messages.iter().enumerate() { let role = m["role"].as_str().unwrap_or("?"); let content_len = m["content"].as_str().map(|s| s.len()).unwrap_or(0); let has_tc = m.get("tool_calls").is_some(); let has_tcid = m.get("tool_call_id").is_some(); warn!(" msg[{i}] role={role} content_len={content_len} tool_calls={has_tc} tool_call_id={has_tcid}"); } error!("OpenAI API {status}: {body_text}"); anyhow::bail!("OpenAI API {status}: {body_text}"); } let mut resp = resp_raw; let token = bot.token().to_owned(); let raw_chat_id = chat_id.0; let draft_id: i64 = 1; let mut use_draft = is_private; // sendMessageDraft only works in private chats let mut msg_id: Option = None; let mut accumulated = String::new(); let mut last_edit = Instant::now(); let mut buffer = String::new(); let mut done = false; // tool call accumulation let mut tool_calls: Vec = Vec::new(); let mut has_tool_calls = false; while let Some(chunk) = resp.chunk().await? { if done { break; } buffer.push_str(&String::from_utf8_lossy(&chunk)); while let Some(pos) = buffer.find('\n') { let line = buffer[..pos].to_string(); buffer = buffer[pos + 1..].to_string(); let trimmed = line.trim(); if trimmed.is_empty() || trimmed.starts_with(':') { continue; } let data = match trimmed.strip_prefix("data: ") { Some(d) => d, None => continue, }; if data.trim() == "[DONE]" { done = true; break; } if let Ok(json) = serde_json::from_str::(data) { let delta = &json["choices"][0]["delta"]; // handle content delta if let Some(content) = delta["content"].as_str() { if !content.is_empty() { accumulated.push_str(content); } } // handle tool call delta if let Some(tc_arr) = delta["tool_calls"].as_array() { has_tool_calls = true; for tc in tc_arr { let idx = tc["index"].as_u64().unwrap_or(0) as usize; while tool_calls.len() <= idx { tool_calls.push(ToolCall { id: String::new(), name: String::new(), arguments: String::new(), }); } if let Some(id) = tc["id"].as_str() { tool_calls[idx].id = id.to_string(); } if let Some(name) = tc["function"]["name"].as_str() { tool_calls[idx].name = name.to_string(); } if let Some(args) = tc["function"]["arguments"].as_str() { tool_calls[idx].arguments.push_str(args); } } } // display update (only when there's content to show) if accumulated.is_empty() { continue; } { let interval = if use_draft { DRAFT_INTERVAL_MS } else { EDIT_INTERVAL_MS }; if last_edit.elapsed().as_millis() < interval as u128 { continue; } let display = if use_draft { truncate_at_char_boundary(&accumulated, TG_MSG_LIMIT).to_string() } else { truncate_for_display(&accumulated) }; if use_draft { match send_message_draft( &client, &token, raw_chat_id, draft_id, &display, ) .await { Ok(_) => { last_edit = Instant::now(); } Err(e) => { warn!("sendMessageDraft failed, falling back: {e:#}"); use_draft = false; if let Ok(sent) = bot.send_message(chat_id, &display).await { msg_id = Some(sent.id); last_edit = Instant::now(); } } } } else if let Some(id) = msg_id { if bot .edit_message_text(chat_id, id, &display) .await .is_ok() { last_edit = Instant::now(); } } else if let Ok(sent) = bot.send_message(chat_id, &display).await { msg_id = Some(sent.id); last_edit = Instant::now(); } } // end display block } } } // decide what to do based on response type if has_tool_calls && !tool_calls.is_empty() { // append assistant message with tool calls let tc_json: Vec = tool_calls .iter() .map(|tc| { serde_json::json!({ "id": tc.id, "type": "function", "function": { "name": tc.name, "arguments": tc.arguments, } }) }) .collect(); let assistant_msg = serde_json::json!({ "role": "assistant", "content": if accumulated.is_empty() { "" } else { &accumulated }, "tool_calls": tc_json, }); messages.push(assistant_msg); // execute each tool for tc in &tool_calls { info!(tool = %tc.name, "executing tool call"); let _ = bot .send_message(chat_id, format!("[{}({})]", tc.name, truncate_at_char_boundary(&tc.arguments, 100))) .await; let result = execute_tool(&tc.name, &tc.arguments, state, bot, chat_id, sid, config) .await; messages.push(serde_json::json!({ "role": "tool", "tool_call_id": tc.id, "content": result, })); } // clear display state for next round tool_calls.clear(); // loop back to call API again continue; } // content response — send final result if !accumulated.is_empty() { send_final_result(bot, chat_id, msg_id, use_draft, &accumulated).await; } return Ok(accumulated); } } // ── claude bridge (streaming) ─────────────────────────────────────── /// Stream JSON event types we care about. #[derive(Deserialize)] struct StreamEvent { #[serde(rename = "type")] event_type: String, message: Option, result: Option, #[serde(default)] is_error: bool, } #[derive(Deserialize)] struct AssistantMessage { content: Vec, } #[derive(Deserialize)] struct ContentBlock { #[serde(rename = "type")] block_type: String, text: Option, name: Option, input: Option, } /// Extract all text from an assistant message's content blocks. fn extract_text(msg: &AssistantMessage) -> String { msg.content .iter() .filter(|b| b.block_type == "text") .filter_map(|b| b.text.as_deref()) .collect::>() .join("") } /// Extract tool use status line, e.g. "Bash: echo hello" fn extract_tool_use(msg: &AssistantMessage) -> Option { for block in &msg.content { if block.block_type == "tool_use" { let name = block.name.as_deref().unwrap_or("tool"); let detail = block .input .as_ref() .and_then(|v| { // try common fields: command, pattern, file_path, query v.get("command") .or(v.get("pattern")) .or(v.get("file_path")) .or(v.get("query")) .or(v.get("prompt")) .and_then(|s| s.as_str()) }) .unwrap_or(""); let detail_short = truncate_at_char_boundary(detail, 80); return Some(format!("{name}: {detail_short}")); } } None } const EDIT_INTERVAL_MS: u64 = 2000; const DRAFT_INTERVAL_MS: u64 = 1000; const TG_MSG_LIMIT: usize = 4096; async fn invoke_claude_streaming( sid: &str, prompt: &str, known: bool, bot: &Bot, chat_id: ChatId, ) -> Result { if known { return run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await; } match run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await { Ok(out) => { info!(%sid, "resumed existing session"); Ok(out) } Err(e) => { warn!(%sid, "resume failed ({e:#}), creating new session"); run_claude_streaming(&["--session-id", sid], prompt, bot, chat_id).await } } } async fn send_message_draft( client: &reqwest::Client, token: &str, chat_id: i64, draft_id: i64, text: &str, ) -> Result<()> { let url = format!("https://api.telegram.org/bot{token}/sendMessageDraft"); let resp = client .post(&url) .json(&serde_json::json!({ "chat_id": chat_id, "draft_id": draft_id, "text": text, })) .send() .await?; let body: serde_json::Value = resp.json().await?; if body["ok"].as_bool() != Some(true) { anyhow::bail!("sendMessageDraft: {}", body); } Ok(()) } async fn run_claude_streaming( extra_args: &[&str], prompt: &str, bot: &Bot, chat_id: ChatId, ) -> Result { let mut args: Vec<&str> = vec![ "--dangerously-skip-permissions", "-p", "--output-format", "stream-json", "--verbose", ]; args.extend(extra_args); args.push(prompt); let mut child = Command::new("claude") .args(&args) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn()?; let stdout = child.stdout.take().unwrap(); let mut lines = tokio::io::BufReader::new(stdout).lines(); // sendMessageDraft for native streaming, with editMessageText fallback let http = reqwest::Client::new(); let token = bot.token().to_owned(); let raw_chat_id = chat_id.0; let draft_id: i64 = 1; let mut use_draft = true; let mut msg_id: Option = None; let mut last_sent_text = String::new(); let mut last_edit = Instant::now(); let mut final_result = String::new(); let mut is_error = false; let mut tool_status = String::new(); while let Ok(Some(line)) = lines.next_line().await { let event: StreamEvent = match serde_json::from_str(&line) { Ok(e) => e, Err(_) => continue, }; match event.event_type.as_str() { "assistant" => { if let Some(amsg) = &event.message { // determine display content let (display_raw, new_text) = if let Some(status) = extract_tool_use(amsg) { tool_status = format!("[{status}]"); let d = if last_sent_text.is_empty() { tool_status.clone() } else { format!("{last_sent_text}\n\n{tool_status}") }; (d, None) } else { let text = extract_text(amsg); if text.is_empty() || text == last_sent_text { continue; } let interval = if use_draft { DRAFT_INTERVAL_MS } else { EDIT_INTERVAL_MS }; if last_edit.elapsed().as_millis() < interval as u128 { continue; } tool_status.clear(); (text.clone(), Some(text)) }; let display = if use_draft { // draft mode: no cursor — cursor breaks monotonic text growth truncate_at_char_boundary(&display_raw, TG_MSG_LIMIT).to_string() } else { truncate_for_display(&display_raw) }; if use_draft { match send_message_draft( &http, &token, raw_chat_id, draft_id, &display, ) .await { Ok(_) => { if let Some(t) = new_text { last_sent_text = t; } last_edit = Instant::now(); } Err(e) => { warn!("sendMessageDraft failed, falling back: {e:#}"); use_draft = false; if let Ok(sent) = bot.send_message(chat_id, &display).await { msg_id = Some(sent.id); if let Some(t) = new_text { last_sent_text = t; } last_edit = Instant::now(); } } } } else if let Some(id) = msg_id { if bot .edit_message_text(chat_id, id, &display) .await .is_ok() { if let Some(t) = new_text { last_sent_text = t; } last_edit = Instant::now(); } } else if let Ok(sent) = bot.send_message(chat_id, &display).await { msg_id = Some(sent.id); if let Some(t) = new_text { last_sent_text = t; } last_edit = Instant::now(); } } } "result" => { final_result = event.result.unwrap_or_default(); is_error = event.is_error; } _ => {} } } // read stderr before waiting (in case child already exited) let stderr_handle = child.stderr.take(); let status = child.wait().await; // collect stderr for diagnostics let stderr_text = if let Some(mut se) = stderr_handle { let mut buf = String::new(); let _ = tokio::io::AsyncReadExt::read_to_string(&mut se, &mut buf).await; buf } else { String::new() }; // determine error: explicit is_error from stream, or non-zero exit with no result let has_error = is_error || (final_result.is_empty() && status.as_ref().map(|s| !s.success()).unwrap_or(true)); if has_error { let err_detail = if !final_result.is_empty() { final_result.clone() } else if !stderr_text.is_empty() { stderr_text.trim().to_string() } else { format!("claude exited: {:?}", status) }; if !use_draft { if let Some(id) = msg_id { let _ = bot .edit_message_text(chat_id, id, format!("[error] {err_detail}")) .await; } } anyhow::bail!("{err_detail}"); } if final_result.is_empty() { return Ok(final_result); } send_final_result(bot, chat_id, msg_id, use_draft, &final_result).await; Ok(final_result) } // ── openai-compatible backend (streaming) ────────────────────────── async fn run_openai_streaming( endpoint: &str, model: &str, api_key: &str, messages: &[serde_json::Value], bot: &Bot, chat_id: ChatId, ) -> Result { let client = reqwest::Client::new(); let url = format!("{}/chat/completions", endpoint.trim_end_matches('/')); let body = serde_json::json!({ "model": model, "messages": messages, "stream": true, }); let mut resp = client .post(&url) .header("Authorization", format!("Bearer {api_key}")) .json(&body) .send() .await? .error_for_status()?; let token = bot.token().to_owned(); let raw_chat_id = chat_id.0; let draft_id: i64 = 1; let mut use_draft = true; let mut msg_id: Option = None; let mut accumulated = String::new(); let mut last_edit = Instant::now(); let mut buffer = String::new(); let mut done = false; while let Some(chunk) = resp.chunk().await? { if done { break; } buffer.push_str(&String::from_utf8_lossy(&chunk)); while let Some(pos) = buffer.find('\n') { let line = buffer[..pos].to_string(); buffer = buffer[pos + 1..].to_string(); let trimmed = line.trim(); if trimmed.is_empty() || trimmed.starts_with(':') { continue; } let data = match trimmed.strip_prefix("data: ") { Some(d) => d, None => continue, }; if data.trim() == "[DONE]" { done = true; break; } if let Ok(json) = serde_json::from_str::(data) { if let Some(content) = json["choices"][0]["delta"]["content"].as_str() { if content.is_empty() { continue; } accumulated.push_str(content); let interval = if use_draft { DRAFT_INTERVAL_MS } else { EDIT_INTERVAL_MS }; if last_edit.elapsed().as_millis() < interval as u128 { continue; } let display = if use_draft { truncate_at_char_boundary(&accumulated, TG_MSG_LIMIT).to_string() } else { truncate_for_display(&accumulated) }; if use_draft { match send_message_draft( &client, &token, raw_chat_id, draft_id, &display, ) .await { Ok(_) => { last_edit = Instant::now(); } Err(e) => { warn!("sendMessageDraft failed, falling back: {e:#}"); use_draft = false; if let Ok(sent) = bot.send_message(chat_id, &display).await { msg_id = Some(sent.id); last_edit = Instant::now(); } } } } else if let Some(id) = msg_id { if bot.edit_message_text(chat_id, id, &display).await.is_ok() { last_edit = Instant::now(); } } else if let Ok(sent) = bot.send_message(chat_id, &display).await { msg_id = Some(sent.id); last_edit = Instant::now(); } } } } } if accumulated.is_empty() { return Ok(accumulated); } send_final_result(bot, chat_id, msg_id, use_draft, &accumulated).await; Ok(accumulated) } const CURSOR: &str = " \u{25CE}"; fn truncate_for_display(s: &str) -> String { let budget = TG_MSG_LIMIT - CURSOR.len() - 1; if s.len() <= budget { format!("{s}{CURSOR}") } else { let truncated = truncate_at_char_boundary(s, budget - 2); format!("{truncated}\n…{CURSOR}") } } fn truncate_at_char_boundary(s: &str, max: usize) -> &str { if s.len() <= max { return s; } let mut end = max; while !s.is_char_boundary(end) { end -= 1; } &s[..end] } fn escape_html(s: &str) -> String { s.replace('&', "&") .replace('<', "<") .replace('>', ">") .replace('"', """) } fn markdown_to_telegram_html(md: &str) -> String { use pulldown_cmark::{CodeBlockKind, Event, Options, Parser, Tag, TagEnd}; let mut opts = Options::empty(); opts.insert(Options::ENABLE_STRIKETHROUGH); let parser = Parser::new_ext(md, opts); let mut html = String::new(); for event in parser { match event { Event::Start(tag) => match tag { Tag::Paragraph => {} Tag::Heading { .. } => html.push_str(""), Tag::BlockQuote(_) => html.push_str("
"), Tag::CodeBlock(kind) => match kind { CodeBlockKind::Fenced(ref lang) if !lang.is_empty() => { html.push_str(&format!( "
",
                            escape_html(lang.as_ref())
                        ));
                    }
                    _ => html.push_str("
"),
                },
                Tag::Item => html.push_str("• "),
                Tag::Emphasis => html.push_str(""),
                Tag::Strong => html.push_str(""),
                Tag::Strikethrough => html.push_str(""),
                Tag::Link { dest_url, .. } => {
                    html.push_str(&format!(
                        "",
                        escape_html(dest_url.as_ref())
                    ));
                }
                _ => {}
            },
            Event::End(tag) => match tag {
                TagEnd::Paragraph => html.push_str("\n\n"),
                TagEnd::Heading(_) => html.push_str("\n\n"),
                TagEnd::BlockQuote(_) => html.push_str("
"), TagEnd::CodeBlock => html.push_str("\n\n"), TagEnd::List(_) => html.push('\n'), TagEnd::Item => html.push('\n'), TagEnd::Emphasis => html.push_str(""), TagEnd::Strong => html.push_str("
"), TagEnd::Strikethrough => html.push_str(""), TagEnd::Link => html.push_str(""), _ => {} }, Event::Text(text) => html.push_str(&escape_html(text.as_ref())), Event::Code(text) => { html.push_str(""); html.push_str(&escape_html(text.as_ref())); html.push_str(""); } Event::SoftBreak | Event::HardBreak => html.push('\n'), Event::Rule => html.push_str("\n---\n\n"), _ => {} } } html.trim_end().to_string() } /// Send final result with HTML formatting, fallback to plain text on failure. async fn send_final_result( bot: &Bot, chat_id: ChatId, msg_id: Option, use_draft: bool, result: &str, ) { use teloxide::types::ParseMode; let html = markdown_to_telegram_html(result); // try HTML as single message let html_ok = if let (false, Some(id)) = (use_draft, msg_id) { bot.edit_message_text(chat_id, id, &html) .parse_mode(ParseMode::Html) .await .is_ok() } else { bot.send_message(chat_id, &html) .parse_mode(ParseMode::Html) .await .is_ok() }; if html_ok { return; } // fallback: plain text with chunking let chunks = split_msg(result, TG_MSG_LIMIT); if let (false, Some(id)) = (use_draft, msg_id) { let _ = bot.edit_message_text(chat_id, id, chunks[0]).await; for chunk in &chunks[1..] { let _ = bot.send_message(chat_id, *chunk).await; } } else { for chunk in &chunks { let _ = bot.send_message(chat_id, *chunk).await; } } } fn split_msg(s: &str, max: usize) -> Vec<&str> { if s.len() <= max { return vec![s]; } let mut parts = Vec::new(); let mut rest = s; while !rest.is_empty() { if rest.len() <= max { parts.push(rest); break; } let mut end = max; while !rest.is_char_boundary(end) { end -= 1; } let (chunk, tail) = rest.split_at(end); parts.push(chunk); rest = tail; } parts }