use std::path::{Path, PathBuf}; use std::process::Stdio; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::Result; use teloxide::prelude::*; use teloxide::types::InputFile; use tokio::io::AsyncBufReadExt; use tokio::process::Command; use tokio::sync::RwLock; use tracing::{error, info, warn}; use crate::config::{BackendConfig, Config}; use crate::display::truncate_at_char_boundary; use crate::state::AppState; use crate::stream::{build_system_prompt, run_openai_streaming}; // ── subagent & tool call ─────────────────────────────────────────── pub struct SubAgent { pub task: String, pub output: Arc>, pub completed: Arc, pub exit_code: Arc>>, pub pid: Option, } pub struct ToolCall { pub id: String, pub name: String, pub arguments: String, } pub fn tools_dir() -> PathBuf { // tools/ relative to the config file location let config_path = std::env::var("NOC_CONFIG").unwrap_or_else(|_| "config.yaml".into()); let config_dir = Path::new(&config_path) .parent() .unwrap_or(Path::new(".")); config_dir.join("tools") } /// Scan tools/ directory for scripts with --schema, merge with built-in tools. /// Called on every API request so new/updated scripts take effect immediately. pub fn discover_tools() -> serde_json::Value { let mut tools = vec![ serde_json::json!({ "type": "function", "function": { "name": "spawn_agent", "description": "启动一个 Claude Code 子代理异步执行复杂任务。子代理可使用 shell、浏览器和搜索引擎,适合网页搜索、资料查找、技术调研、代码任务等。完成后会收到通知。", "parameters": { "type": "object", "properties": { "id": {"type": "string", "description": "简短唯一标识符(如 'research'、'fix-bug')"}, "task": {"type": "string", "description": "给子代理的详细任务描述"} }, "required": ["id", "task"] } } }), serde_json::json!({ "type": "function", "function": { "name": "agent_status", "description": "查看正在运行或已完成的子代理的状态和输出", "parameters": { "type": "object", "properties": { "id": {"type": "string", "description": "子代理标识符"} }, "required": ["id"] } } }), serde_json::json!({ "type": "function", "function": { "name": "kill_agent", "description": "终止一个正在运行的子代理", "parameters": { "type": "object", "properties": { "id": {"type": "string", "description": "子代理标识符"} }, "required": ["id"] } } }), serde_json::json!({ "type": "function", "function": { "name": "send_file", "description": "通过 Telegram 向用户发送服务器上的文件,文件必须存在于服务器文件系统中。", "parameters": { "type": "object", "properties": { "path": {"type": "string", "description": "服务器上文件的绝对路径"}, "caption": {"type": "string", "description": "可选的文件说明/描述"} }, "required": ["path"] } } }), serde_json::json!({ "type": "function", "function": { "name": "update_inner_state", "description": "更新你的内在状态。这是你自己的持续意识,跨会话保留,Life Loop 和对话都能看到。记录你对当前情况的理解、正在跟踪的事、对 Fam 状态的感知等。", "parameters": { "type": "object", "properties": { "content": {"type": "string", "description": "完整的内在状态文本(替换之前的)"} }, "required": ["content"] } } }), serde_json::json!({ "type": "function", "function": { "name": "update_scratch", "description": "更新你的草稿区(工作笔记、状态、提醒)。草稿区内容会附加到每条用户消息中,确保你始终可见。用于跨轮次跟踪上下文。", "parameters": { "type": "object", "properties": { "content": {"type": "string", "description": "完整的草稿区内容(替换之前的内容)"} }, "required": ["content"] } } }), serde_json::json!({ "type": "function", "function": { "name": "set_timer", "description": "Set a timer that will fire in the future. Supports: '5min'/'2h' (relative), 'once:2026-04-10 09:00' (absolute), 'cron:0 8 * * *' (recurring). When fired, you'll receive the label as a prompt.", "parameters": { "type": "object", "properties": { "schedule": {"type": "string", "description": "Timer schedule: e.g. '5min', '1h', 'once:2026-04-10 09:00', 'cron:30 8 * * *'"}, "label": {"type": "string", "description": "What this timer is for — this text will be sent to you when it fires"} }, "required": ["schedule", "label"] } } }), serde_json::json!({ "type": "function", "function": { "name": "list_timers", "description": "List all active timers", "parameters": { "type": "object", "properties": {}, } } }), serde_json::json!({ "type": "function", "function": { "name": "cancel_timer", "description": "Cancel a timer by ID", "parameters": { "type": "object", "properties": { "timer_id": {"type": "integer", "description": "Timer ID from list_timers"} }, "required": ["timer_id"] } } }), serde_json::json!({ "type": "function", "function": { "name": "update_memory", "description": "写入持久记忆槽。共 100 个槽位(0-99),跨会话保留。记忆槽内容会注入到每次对话的 system prompt 中。用于存储关键事实、用户偏好或重要上下文。内容设为空字符串可清除槽位。", "parameters": { "type": "object", "properties": { "slot_nr": {"type": "integer", "description": "槽位编号(0-99)"}, "content": {"type": "string", "description": "要存储的内容(最多200字符),空字符串表示清除该槽位"} }, "required": ["slot_nr", "content"] } } }), serde_json::json!({ "type": "function", "function": { "name": "gen_voice", "description": "将文字合成为语音并直接发送给用户。", "parameters": { "type": "object", "properties": { "text": {"type": "string", "description": "要合成语音的文字内容"} }, "required": ["text"] } } }), ]; // discover script tools let dir = tools_dir(); if let Ok(entries) = std::fs::read_dir(&dir) { for entry in entries.flatten() { let path = entry.path(); if !path.is_file() { continue; } // run --schema with a short timeout let output = std::process::Command::new(&path) .arg("--schema") .output(); match output { Ok(out) if out.status.success() => { let stdout = String::from_utf8_lossy(&out.stdout); match serde_json::from_str::(stdout.trim()) { Ok(schema) => { let name = schema["name"].as_str().unwrap_or("?"); info!(tool = %name, path = %path.display(), "discovered script tool"); tools.push(serde_json::json!({ "type": "function", "function": schema, })); } Err(e) => { warn!(path = %path.display(), "invalid --schema JSON: {e}"); } } } _ => {} // not a tool script, skip silently } } } serde_json::Value::Array(tools) } // ── tool execution ───────────────────────────────────────────────── pub async fn execute_tool( name: &str, arguments: &str, state: &Arc, bot: &Bot, chat_id: ChatId, sid: &str, config: &Arc, ) -> String { let args: serde_json::Value = match serde_json::from_str(arguments) { Ok(v) => v, Err(e) => return format!("Invalid arguments: {e}"), }; match name { "spawn_agent" => { let id = args["id"].as_str().unwrap_or("agent"); let task = args["task"].as_str().unwrap_or(""); spawn_agent(id, task, state, bot, chat_id, sid, config).await } "agent_status" => { let id = args["id"].as_str().unwrap_or(""); check_agent_status(id, state).await } "kill_agent" => { let id = args["id"].as_str().unwrap_or(""); kill_agent(id, state).await } "send_file" => { let path_str = args["path"].as_str().unwrap_or(""); let caption = args["caption"].as_str().unwrap_or(""); let path = Path::new(path_str); if !path.exists() { return format!("File not found: {path_str}"); } if !path.is_file() { return format!("Not a file: {path_str}"); } let input_file = InputFile::file(path); let mut req = bot.send_document(chat_id, input_file); if !caption.is_empty() { req = req.caption(caption); } match req.await { Ok(_) => format!("File sent: {path_str}"), Err(e) => format!("Failed to send file: {e:#}"), } } "update_inner_state" => { let content = args["content"].as_str().unwrap_or(""); state.set_inner_state(content).await; format!("Inner state updated ({} chars)", content.len()) } "update_scratch" => { let content = args["content"].as_str().unwrap_or(""); state.push_scratch(content).await; format!("Scratch updated ({} chars)", content.len()) } "set_timer" => { let schedule = args["schedule"].as_str().unwrap_or(""); let label = args["label"].as_str().unwrap_or(""); match parse_next_fire(schedule) { Ok(next) => { let next_str = next.format("%Y-%m-%d %H:%M:%S").to_string(); let id = state .add_timer(chat_id.0, label, schedule, &next_str) .await; format!("Timer #{id} set: \"{label}\" → next fire at {next_str}") } Err(e) => format!("Invalid schedule '{schedule}': {e}"), } } "list_timers" => { let timers = state.list_timers(Some(chat_id.0)).await; if timers.is_empty() { "No active timers.".to_string() } else { timers .iter() .map(|(id, _, label, sched, next, enabled)| { let status = if *enabled { "" } else { " [disabled]" }; format!("#{id}: \"{label}\" ({sched}) → {next}{status}") }) .collect::>() .join("\n") } } "cancel_timer" => { let tid = args["timer_id"].as_i64().unwrap_or(0); if state.cancel_timer(tid).await { format!("Timer #{tid} cancelled") } else { format!("Timer #{tid} not found") } } "update_memory" => { let slot_nr = args["slot_nr"].as_i64().unwrap_or(-1) as i32; let content = args["content"].as_str().unwrap_or(""); match state.set_memory_slot(slot_nr, content).await { Ok(_) => { if content.is_empty() { format!("Memory slot {slot_nr} cleared") } else { format!("Memory slot {slot_nr} updated ({} chars)", content.len()) } } Err(e) => format!("Error: {e}"), } } "gen_voice" => { let text = args["text"].as_str().unwrap_or(""); if text.is_empty() { return "Error: text is required".to_string(); } let script = tools_dir().join("gen_voice"); let result = tokio::time::timeout( std::time::Duration::from_secs(120), tokio::process::Command::new(&script) .arg(arguments) .output(), ) .await; match result { Ok(Ok(out)) if out.status.success() => { let path_str = String::from_utf8_lossy(&out.stdout).trim().to_string(); let path = Path::new(&path_str); if path.exists() { let input_file = InputFile::file(path); match bot.send_voice(chat_id, input_file).await { Ok(_) => format!("语音已发送: {path_str}"), Err(e) => format!("语音生成成功但发送失败: {e:#}"), } } else { format!("语音生成失败: 输出文件不存在 ({path_str})") } } Ok(Ok(out)) => { let stderr = String::from_utf8_lossy(&out.stderr); let stdout = String::from_utf8_lossy(&out.stdout); format!("gen_voice failed: {stdout} {stderr}") } Ok(Err(e)) => format!("gen_voice exec error: {e}"), Err(_) => "gen_voice timeout (120s)".to_string(), } } _ => run_script_tool(name, arguments).await, } } pub async fn spawn_agent( id: &str, task: &str, state: &Arc, bot: &Bot, chat_id: ChatId, sid: &str, config: &Arc, ) -> String { // check if already exists if state.agents.read().await.contains_key(id) { return format!("Agent '{id}' already exists. Use agent_status to check it."); } let mut child = match Command::new("claude") .args(["--dangerously-skip-permissions", "-p", task]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() { Ok(c) => c, Err(e) => return format!("Failed to spawn agent: {e}"), }; let pid = child.id(); let output = Arc::new(tokio::sync::RwLock::new(String::new())); let completed = Arc::new(AtomicBool::new(false)); let exit_code = Arc::new(tokio::sync::RwLock::new(None)); let agent = Arc::new(SubAgent { task: task.to_string(), output: output.clone(), completed: completed.clone(), exit_code: exit_code.clone(), pid, }); state.agents.write().await.insert(id.to_string(), agent); // background task: collect output and wakeup on completion let out = output.clone(); let done = completed.clone(); let ecode = exit_code.clone(); let bot_c = bot.clone(); let chat_id_c = chat_id; let state_c = state.clone(); let config_c = config.clone(); let sid_c = sid.to_string(); let id_c = id.to_string(); tokio::spawn(async move { let stdout = child.stdout.take(); if let Some(stdout) = stdout { let mut lines = tokio::io::BufReader::new(stdout).lines(); while let Ok(Some(line)) = lines.next_line().await { let mut o = out.write().await; o.push_str(&line); o.push('\n'); } } let status = child.wait().await; let code = status.as_ref().ok().and_then(|s| s.code()); *ecode.write().await = code; done.store(true, Ordering::SeqCst); info!(agent = %id_c, "agent completed, exit={code:?}"); // wakeup: inject result and trigger LLM let result = out.read().await.clone(); let result_short = truncate_at_char_boundary(&result, 4000); let wakeup = format!( "[Agent '{id_c}' 执行完成 (exit={})]\n{result_short}", code.unwrap_or(-1) ); if let Err(e) = agent_wakeup( &config_c, &state_c, &bot_c, chat_id_c, &sid_c, &wakeup, &id_c, ) .await { error!(agent = %id_c, "wakeup failed: {e:#}"); let _ = bot_c .send_message(chat_id_c, format!("[agent wakeup error] {e:#}")) .await; } }); format!("Agent '{id}' spawned (pid={pid:?})") } pub async fn agent_wakeup( config: &Config, state: &AppState, bot: &Bot, chat_id: ChatId, sid: &str, wakeup_msg: &str, agent_id: &str, ) -> Result<()> { match &config.backend { BackendConfig::OpenAI { endpoint, model, api_key, } => { state.push_message(sid, "user", wakeup_msg).await; let conv = state.load_conv(sid).await; let persona = state.get_config("persona").await.unwrap_or_default(); let memory_slots = state.get_memory_slots().await; let inner = state.get_inner_state().await; let system_msg = build_system_prompt(&conv.summary, &persona, &memory_slots, &inner); let mut api_messages = vec![system_msg]; api_messages.extend(conv.messages); info!(agent = %agent_id, "wakeup: sending {} messages to LLM", api_messages.len()); let response = run_openai_streaming(endpoint, model, api_key, &api_messages, bot, chat_id) .await?; if !response.is_empty() { state.push_message(sid, "assistant", &response).await; } Ok(()) } _ => { let _ = bot .send_message(chat_id, format!("[Agent '{agent_id}' done]\n{wakeup_msg}")) .await; Ok(()) } } } pub async fn check_agent_status(id: &str, state: &AppState) -> String { let agents = state.agents.read().await; match agents.get(id) { Some(agent) => { let status = if agent.completed.load(Ordering::SeqCst) { let code = agent.exit_code.read().await; format!("completed (exit={})", code.unwrap_or(-1)) } else { "running".to_string() }; let output = agent.output.read().await; let out_preview = truncate_at_char_boundary(&output, 3000); format!( "Agent '{id}': {status}\nTask: {}\nOutput ({} bytes):\n{out_preview}", agent.task, output.len() ) } None => format!("Agent '{id}' not found"), } } pub async fn kill_agent(id: &str, state: &AppState) -> String { let agents = state.agents.read().await; match agents.get(id) { Some(agent) => { if agent.completed.load(Ordering::SeqCst) { return format!("Agent '{id}' already completed"); } if let Some(pid) = agent.pid { unsafe { libc::kill(pid as i32, libc::SIGTERM); } format!("Sent SIGTERM to agent '{id}' (pid={pid})") } else { format!("Agent '{id}' has no PID") } } None => format!("Agent '{id}' not found"), } } pub async fn run_script_tool(name: &str, arguments: &str) -> String { // find script in tools/ that matches this tool name let dir = tools_dir(); let entries = match std::fs::read_dir(&dir) { Ok(e) => e, Err(_) => return format!("Unknown tool: {name}"), }; for entry in entries.flatten() { let path = entry.path(); if !path.is_file() { continue; } // check if this script provides the requested tool let schema_out = std::process::Command::new(&path) .arg("--schema") .output(); if let Ok(out) = schema_out { if out.status.success() { let stdout = String::from_utf8_lossy(&out.stdout); if let Ok(schema) = serde_json::from_str::(stdout.trim()) { if schema["name"].as_str() == Some(name) { // found it — execute info!(tool = %name, path = %path.display(), "running script tool"); let result = tokio::time::timeout( std::time::Duration::from_secs(60), Command::new(&path).arg(arguments).output(), ) .await; return match result { Ok(Ok(output)) => { let mut s = String::from_utf8_lossy(&output.stdout).to_string(); let stderr = String::from_utf8_lossy(&output.stderr); if !stderr.is_empty() { if !s.is_empty() { s.push_str("\n[stderr]\n"); } s.push_str(&stderr); } if s.is_empty() { format!("(exit={})", output.status.code().unwrap_or(-1)) } else { s } } Ok(Err(e)) => format!("Failed to execute {name}: {e}"), Err(_) => "Timeout after 60s".to_string(), }; } } } } } format!("Unknown tool: {name}") } // ── schedule parsing ─────────────────────────────────────────────── pub fn parse_next_fire(schedule: &str) -> Result> { let now = chrono::Local::now(); // relative: "5min", "2h", "30s", "1d" if let Some(val) = schedule .strip_suffix("min") .or_else(|| schedule.strip_suffix("m")) { let mins: i64 = val.trim().parse().map_err(|e| anyhow::anyhow!("{e}"))?; return Ok(now + chrono::Duration::minutes(mins)); } if let Some(val) = schedule.strip_suffix('h') { let hours: i64 = val.trim().parse().map_err(|e| anyhow::anyhow!("{e}"))?; return Ok(now + chrono::Duration::hours(hours)); } if let Some(val) = schedule.strip_suffix('s') { let secs: i64 = val.trim().parse().map_err(|e| anyhow::anyhow!("{e}"))?; return Ok(now + chrono::Duration::seconds(secs)); } if let Some(val) = schedule.strip_suffix('d') { let days: i64 = val.trim().parse().map_err(|e| anyhow::anyhow!("{e}"))?; return Ok(now + chrono::Duration::days(days)); } // absolute: "once:2026-04-10 09:00" if let Some(dt_str) = schedule.strip_prefix("once:") { let dt = chrono::NaiveDateTime::parse_from_str(dt_str.trim(), "%Y-%m-%d %H:%M") .or_else(|_| { chrono::NaiveDateTime::parse_from_str(dt_str.trim(), "%Y-%m-%d %H:%M:%S") }) .map_err(|e| anyhow::anyhow!("parse datetime: {e}"))?; return Ok(dt.and_local_timezone(chrono::Local).unwrap()); } // cron: "cron:30 8 * * *" if let Some(expr) = schedule.strip_prefix("cron:") { let cron_schedule = expr .trim() .parse::() .map_err(|e| anyhow::anyhow!("parse cron: {e}"))?; let next = cron_schedule .upcoming(chrono::Local) .next() .ok_or_else(|| anyhow::anyhow!("no upcoming time for cron"))?; return Ok(next); } anyhow::bail!("unknown schedule format: {schedule}") } pub fn compute_next_cron_fire(schedule: &str) -> Option { let expr = schedule.strip_prefix("cron:")?; let cron_schedule = expr.trim().parse::().ok()?; let next = cron_schedule.upcoming(chrono::Local).next()?; Some(next.format("%Y-%m-%d %H:%M:%S").to_string()) }