From 55e9b2f50f8ccc222e0db0110adf94e3dee50047 Mon Sep 17 00:00:00 2001 From: Fam Zheng Date: Sat, 11 Apr 2026 09:31:48 +0100 Subject: [PATCH] persistent auth in SQLite, API chat/logs, agent completion via channel - Auth: move from state.json to SQLite authed_chats table, with memory cache - Remove Persistent/state.json, all state now in noc.db - HTTP API: POST /api/chat (end-to-end LLM), GET /api/logs (failed API requests) - API logging: store raw request/response for 400 errors in api_log table - Agent completion: spawn_agent sends LifeEvent::AgentDone via channel, life loop picks up with full conversation context and responds - Config structs: derive Clone for HTTP server - System prompt: instruct LLM not to add timestamps - Makefile: rsync without --delete to preserve VPS-only tools --- .gitignore | 1 + Makefile | 2 +- noc.service.in | 1 - src/config.rs | 8 ++-- src/http.rs | 88 +++++++++++++++++++++++++++++++++++++++++--- src/life.rs | 49 +++++++++++++++++++++++++ src/main.rs | 31 +++++----------- src/state.rs | 93 ++++++++++++++++++++++++++++++----------------- src/stream.rs | 6 ++- src/tools.rs | 20 ++++++++-- tools/manage_todo | 2 +- 11 files changed, 230 insertions(+), 71 deletions(-) diff --git a/.gitignore b/.gitignore index 2b19974..2b4f214 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ state.*.json target/ data/ noc.service +tools/manage_todo diff --git a/Makefile b/Makefile index 337a9ed..8454bcc 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,7 @@ deploy: test build scp target/release/noc $(SUITE):~/bin/ scp config.suite.yaml $(SUITE):/data/noc/config.yaml scp noc.service.in $(SUITE):/data/noc/ - scp -r tools/ $(SUITE):/data/noc/tools/ + rsync -a tools/ $(SUITE):/data/noc/tools/ ssh $(SUITE) 'bash -lc "\ cd /data/noc \ && sed -e \"s|@REPO@|/data/noc|g\" -e \"s|@PATH@|\$$PATH|g\" noc.service.in > ~/.config/systemd/user/noc.service \ diff --git a/noc.service.in b/noc.service.in index b6a7c0e..dfd0576 100644 --- a/noc.service.in +++ b/noc.service.in @@ -12,7 +12,6 @@ RestartSec=5 Environment=RUST_LOG=noc=info Environment=RUST_BACKTRACE=1 Environment=NOC_CONFIG=@REPO@/config.yaml -Environment=NOC_STATE=@REPO@/state.json Environment=PATH=@PATH@ [Install] diff --git a/src/config.rs b/src/config.rs index 1a85c23..083501b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,6 @@ use serde::Deserialize; -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct Config { #[serde(default = "default_name")] pub name: String, @@ -71,17 +71,17 @@ fn default_api_key() -> String { "unused".to_string() } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct TgConfig { pub key: String, } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct AuthConfig { pub passphrase: String, } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct SessionConfig { pub refresh_hour: u32, } diff --git a/src/http.rs b/src/http.rs index 2949192..ad45d7f 100644 --- a/src/http.rs +++ b/src/http.rs @@ -8,13 +8,16 @@ use axum::Json; use tokio::sync::mpsc; use tracing::{error, info}; -use crate::config::Config; +use crate::config::{BackendConfig, Config}; use crate::life::LifeEvent; +use crate::output::BufferOutput; use crate::state::AppState; +use crate::stream::{build_system_prompt, run_openai_with_tools}; #[derive(Clone)] pub struct HttpState { pub app_state: Arc, + pub config: Arc, pub life_tx: mpsc::Sender, } @@ -29,20 +32,28 @@ pub async fn start_http_server( .map(|g| g.webhook_port) .unwrap_or(9880); + let config = Arc::new(config.clone()); let state = Arc::new(HttpState { app_state, + config, life_tx, }); + // merge gitea webhook router if configured + let gitea_router = state.config.gitea.as_ref().map(|gitea_config| { + let bot_user = std::env::var("GITEA_ADMIN_USER").unwrap_or_else(|_| "noc".into()); + crate::gitea::webhook_router(gitea_config, bot_user) + }); + let mut app = axum::Router::new() .route("/api/timers", get(list_timers)) .route("/api/timers/{id}/fire", post(fire_timer)) + .route("/api/chat", post(api_chat)) + .route("/api/logs", get(api_logs)) .with_state(state); - // merge gitea webhook router if configured - if let Some(gitea_config) = &config.gitea { - let bot_user = std::env::var("GITEA_ADMIN_USER").unwrap_or_else(|_| "noc".into()); - app = app.merge(crate::gitea::webhook_router(gitea_config, bot_user)); + if let Some(router) = gitea_router { + app = app.merge(router); } let addr = format!("0.0.0.0:{port}"); @@ -75,6 +86,73 @@ async fn list_timers(AxumState(state): AxumState>) -> impl IntoRe Json(serde_json::json!(items)) } +async fn api_chat( + AxumState(state): AxumState>, + Json(payload): Json, +) -> impl IntoResponse { + let message = payload["message"].as_str().unwrap_or("").to_string(); + if message.is_empty() { + return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "message required"}))); + } + + let BackendConfig::OpenAI { + ref endpoint, + ref model, + ref api_key, + } = state.config.backend + else { + return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "no openai backend"}))); + }; + + let persona = state.app_state.get_config("persona").await.unwrap_or_default(); + let memory_slots = state.app_state.get_memory_slots().await; + let inner_state = state.app_state.get_inner_state().await; + + let system = build_system_prompt("", &persona, &memory_slots, &inner_state); + let messages = vec![ + system, + serde_json::json!({"role": "user", "content": message}), + ]; + + let sid = format!("api-{}", chrono::Local::now().timestamp()); + let mut output = BufferOutput::new(); + + info!("api chat: {}", &message[..message.len().min(100)]); + + match run_openai_with_tools( + endpoint, model, api_key, messages, &mut output, &state.app_state, &sid, &state.config, 0, + ) + .await + { + Ok(response) => (StatusCode::OK, Json(serde_json::json!({"response": response}))), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("{e:#}")}))), + } +} + +async fn api_logs( + AxumState(state): AxumState>, +) -> impl IntoResponse { + let db = state.app_state.db.lock().await; + let mut stmt = db + .prepare("SELECT id, session_id, status, length(request), length(response), created_at FROM api_log ORDER BY id DESC LIMIT 20") + .unwrap(); + let logs: Vec = stmt + .query_map([], |row| { + Ok(serde_json::json!({ + "id": row.get::<_, i64>(0)?, + "session_id": row.get::<_, String>(1)?, + "status": row.get::<_, i64>(2)?, + "request_len": row.get::<_, i64>(3)?, + "response_len": row.get::<_, i64>(4)?, + "created_at": row.get::<_, String>(5)?, + })) + }) + .unwrap() + .filter_map(|r| r.ok()) + .collect(); + Json(serde_json::json!(logs)) +} + async fn fire_timer( AxumState(state): AxumState>, Path(id): Path, diff --git a/src/life.rs b/src/life.rs index e5fc774..6c64439 100644 --- a/src/life.rs +++ b/src/life.rs @@ -19,6 +19,15 @@ const DIARY_SCHEDULE: &str = "cron:0 55 22 * * *"; pub enum LifeEvent { /// Force-fire a specific timer by ID. FireTimer(i64), + /// A sub-agent completed — feed result back through LLM. + AgentDone { + id: String, + chat_id: i64, + session_id: String, + task: String, + output: String, + exit_code: Option, + }, } pub async fn life_loop( @@ -54,6 +63,46 @@ pub async fn life_loop( warn!(timer_id = id, "force-fire: timer not found"); } } + LifeEvent::AgentDone { id, chat_id: cid, session_id, task, output, exit_code } => { + info!(agent = %id, session = %session_id, "agent done, notifying"); + let preview = crate::display::truncate_at_char_boundary(&output, 3000); + let notification = format!( + "[子代理 '{id}' 完成 (exit={exit_code:?})]\n任务: {task}\n输出:\n{preview}" + ); + + // load conversation context so LLM knows what was discussed + let conv = state.load_conv(&session_id).await; + let persona = state.get_config("persona").await.unwrap_or_default(); + let memory_slots = state.get_memory_slots().await; + let inner = state.get_inner_state().await; + + let system = crate::stream::build_system_prompt( + &conv.summary, &persona, &memory_slots, &inner, + ); + + let mut messages = vec![system]; + // include recent conversation history + messages.extend(conv.messages.iter().cloned()); + // append the agent completion as a new user message + messages.push(serde_json::json!({"role": "user", "content": notification})); + + if let BackendConfig::OpenAI { ref endpoint, ref model, ref api_key } = config.backend { + let chat_id_tg = ChatId(cid); + let sid = format!("agent-{id}"); + let mut tg_output; + let mut buf_output; + let out: &mut dyn crate::output::Output = if cid == 0 { + buf_output = BufferOutput::new(); + &mut buf_output + } else { + tg_output = TelegramOutput::new(bot.clone(), chat_id_tg, true); + &mut tg_output + }; + let _ = run_openai_with_tools( + endpoint, model, api_key, messages, out, &state, &sid, &config, cid, + ).await; + } + } } } } diff --git a/src/main.rs b/src/main.rs index 906625d..6763cfb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -77,10 +77,12 @@ async fn main() { gitea.resolve_token(); } - let state_path = std::env::var("NOC_STATE") - .map(PathBuf::from) - .unwrap_or_else(|_| PathBuf::from("state.json")); - let state = Arc::new(AppState::load(state_path)); + // channel: http/agents → life loop + let (life_tx, life_rx) = tokio::sync::mpsc::channel(16); + + let config_path = std::env::var("NOC_CONFIG").unwrap_or_else(|_| "config.yaml".into()); + let db_dir = Path::new(&config_path).parent().unwrap_or(Path::new(".")); + let state = Arc::new(AppState::load(db_dir, life_tx.clone())); let _ = std::fs::create_dir_all(incoming_dir()); @@ -93,18 +95,15 @@ async fn main() { let config = Arc::new(config); - // channel: http server → life loop - let (life_tx, life_rx) = tokio::sync::mpsc::channel(16); - // start life loop tokio::spawn(life::life_loop(bot.clone(), state.clone(), config.clone(), life_rx)); // start http server (API + gitea webhook) { - let srv_config = config.clone(); + let http_config = config.as_ref().clone(); let srv_state = state.clone(); tokio::spawn(async move { - http::start_http_server(&srv_config, srv_state, life_tx).await; + http::start_http_server(&http_config, srv_state, life_tx).await; }); } @@ -173,20 +172,10 @@ async fn handle( let is_private = msg.chat.is_private(); let text = msg.text().or(msg.caption()).unwrap_or("").to_string(); let raw_id = chat_id.0; - let date = session_date(config.session.refresh_hour); - let is_authed = { - let p = state.persist.read().await; - p.authed.get(&raw_id) == Some(&date) - }; - - if !is_authed { + if !state.is_authed(raw_id).await { if text.trim() == config.auth.passphrase { - { - let mut p = state.persist.write().await; - p.authed.insert(raw_id, date); - } - state.save().await; + state.set_authed(raw_id).await; bot.send_message(chat_id, "authenticated").await?; info!(chat = raw_id, "authed"); } else { diff --git a/src/state.rs b/src/state.rs index e2c1801..88d6d7f 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,24 +1,14 @@ use std::collections::{HashMap, HashSet}; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::sync::Arc; use anyhow::Result; -use chrono::NaiveDate; -use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; -use tracing::{error, info}; +use tracing::info; use crate::tools::SubAgent; -// ── persistent state ──────────────────────────────────────────────── - -#[derive(Serialize, Deserialize, Default)] -pub struct Persistent { - pub authed: HashMap, - pub known_sessions: HashSet, -} - -#[derive(Serialize, Deserialize, Clone, Default)] +#[derive(Clone, Default)] pub struct ConversationState { pub summary: String, pub messages: Vec, @@ -29,21 +19,15 @@ pub const MAX_WINDOW: usize = 100; pub const SLIDE_SIZE: usize = 50; pub struct AppState { - pub persist: RwLock, - pub state_path: PathBuf, pub db: tokio::sync::Mutex, pub agents: RwLock>>, + authed_cache: RwLock>, + pub life_tx: tokio::sync::mpsc::Sender, } impl AppState { - pub fn load(path: PathBuf) -> Self { - let persist = std::fs::read_to_string(&path) - .ok() - .and_then(|s| serde_json::from_str(&s).ok()) - .unwrap_or_default(); - info!("loaded state from {}", path.display()); - - let db_path = path.parent().unwrap_or(Path::new(".")).join("noc.db"); + pub fn load(db_dir: &Path, life_tx: tokio::sync::mpsc::Sender) -> Self { + let db_path = db_dir.join("noc.db"); let conn = rusqlite::Connection::open(&db_path) .unwrap_or_else(|e| panic!("open {}: {e}", db_path.display())); conn.execute_batch( @@ -97,6 +81,18 @@ impl AppState { content TEXT NOT NULL DEFAULT '' ); INSERT OR IGNORE INTO inner_state (id, content) VALUES (1, ''); + CREATE TABLE IF NOT EXISTS authed_chats ( + chat_id INTEGER PRIMARY KEY, + authed_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')) + ); + CREATE TABLE IF NOT EXISTS api_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL DEFAULT '', + request TEXT NOT NULL, + response TEXT NOT NULL DEFAULT '', + status INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')) + ); CREATE TABLE IF NOT EXISTS life_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, event TEXT NOT NULL, @@ -119,19 +115,10 @@ impl AppState { info!("opened db {}", db_path.display()); Self { - persist: RwLock::new(persist), - state_path: path, db: tokio::sync::Mutex::new(conn), agents: RwLock::new(HashMap::new()), - } - } - - pub async fn save(&self) { - let data = self.persist.read().await; - if let Ok(json) = serde_json::to_string_pretty(&*data) { - if let Err(e) = std::fs::write(&self.state_path, json) { - error!("save state: {e}"); - } + authed_cache: RwLock::new(HashSet::new()), + life_tx, } } @@ -267,6 +254,44 @@ impl AppState { ); } + pub async fn is_authed(&self, chat_id: i64) -> bool { + // check cache first + if self.authed_cache.read().await.contains(&chat_id) { + return true; + } + // cache miss → check DB + let db = self.db.lock().await; + let found: bool = db + .query_row( + "SELECT COUNT(*) > 0 FROM authed_chats WHERE chat_id = ?1", + rusqlite::params![chat_id], + |row| row.get(0), + ) + .unwrap_or(false); + drop(db); + if found { + self.authed_cache.write().await.insert(chat_id); + } + found + } + + pub async fn set_authed(&self, chat_id: i64) { + self.authed_cache.write().await.insert(chat_id); + let db = self.db.lock().await; + let _ = db.execute( + "INSERT OR IGNORE INTO authed_chats (chat_id) VALUES (?1)", + rusqlite::params![chat_id], + ); + } + + pub async fn log_api(&self, session_id: &str, request: &str, response: &str, status: u16) { + let db = self.db.lock().await; + let _ = db.execute( + "INSERT INTO api_log (session_id, request, response, status) VALUES (?1, ?2, ?3, ?4)", + rusqlite::params![session_id, request, response, status], + ); + } + pub async fn log_life(&self, event: &str, detail: &str) { let db = self.db.lock().await; let _ = db.execute( diff --git a/src/stream.rs b/src/stream.rs index 826c1d0..37543cf 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -81,6 +81,9 @@ pub async fn run_openai_with_tools( if !resp_raw.status().is_success() { let status = resp_raw.status(); let body_text = resp_raw.text().await.unwrap_or_default(); + // log failed API call + let req_json = serde_json::to_string(&body).unwrap_or_default(); + state.log_api(sid, &req_json, &body_text, status.as_u16()).await; for (i, m) in messages.iter().enumerate() { let role = m["role"].as_str().unwrap_or("?"); let content_len = m["content"].as_str().map(|s| s.len()).unwrap_or(0); @@ -233,7 +236,8 @@ pub fn build_system_prompt(summary: &str, persona: &str, memory_slots: &[(i32, S 当需要搜索信息(如网页搜索、资料查找、技术调研等)时,使用 spawn_agent 启动一个子代理来完成搜索任务,\ 子代理可以使用浏览器和搜索引擎,搜索完成后你会收到结果通知。\ 输出格式:使用纯文本或基础Markdown(加粗、列表、代码块)。\ - 不要使用LaTeX公式($...$)、特殊Unicode符号(→←↔)或HTML标签,Telegram无法渲染这些。", + 不要使用LaTeX公式($...$)、特殊Unicode符号(→←↔)或HTML标签,Telegram无法渲染这些。\ + 不要在回复开头加时间戳——用户消息前的时间戳是系统自动添加的,不需要你模仿。", ); if !memory_slots.is_empty() { diff --git a/src/tools.rs b/src/tools.rs index aa9a5cd..b090b4e 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -323,7 +323,7 @@ pub async fn execute_tool( "spawn_agent" => { let id = args["id"].as_str().unwrap_or("agent"); let task = args["task"].as_str().unwrap_or(""); - spawn_agent(id, task, state, output, sid, config).await + spawn_agent(id, task, state, output, sid, config, chat_id).await } "agent_status" => { let id = args["id"].as_str().unwrap_or(""); @@ -626,8 +626,9 @@ pub async fn spawn_agent( task: &str, state: &Arc, output: &dyn Output, - _sid: &str, + sid: &str, _config: &Arc, + chat_id: i64, ) -> String { // check if already exists if state.agents.read().await.contains_key(id) { @@ -659,11 +660,14 @@ pub async fn spawn_agent( state.agents.write().await.insert(id.to_string(), agent); - // background task: collect output + // background task: collect output, then send event to life loop let out = agent_output.clone(); let done = completed.clone(); let ecode = exit_code.clone(); let id_c = id.to_string(); + let task_c = task.to_string(); + let life_tx = state.life_tx.clone(); + let sid_c = sid.to_string(); tokio::spawn(async move { let stdout = child.stdout.take(); @@ -681,6 +685,16 @@ pub async fn spawn_agent( done.store(true, Ordering::SeqCst); info!(agent = %id_c, "agent completed, exit={code:?}"); + + let output_text = out.read().await.clone(); + let _ = life_tx.send(crate::life::LifeEvent::AgentDone { + id: id_c, + chat_id, + session_id: sid_c, + task: task_c, + output: output_text, + exit_code: code, + }).await; }); let _ = output.status(&format!("Agent '{id}' spawned (pid={pid:?})")).await; diff --git a/tools/manage_todo b/tools/manage_todo index ca18c5d..6d61101 100755 --- a/tools/manage_todo +++ b/tools/manage_todo @@ -19,7 +19,7 @@ import sys import requests APP_ID = "cli_a7f042e93d385013" -APP_SECRET = "ht4FCjQ8JJ65ZPUWlff6ldFBmaP0mxqY" +APP_SECRET = "6V3t5bFK4vRKsEG3VD6sQdAu2rmFEr2S" APP_TOKEN = "SSoGbmGFoazJkUs7bbfcaSG8n7f" TABLE_ID = "tblIA2biceDpvr35" BASE_URL = "https://open.feishu.cn/open-apis"