persistent auth in SQLite, API chat/logs, agent completion via channel

- Auth: move from state.json to SQLite authed_chats table, with memory cache
- Remove Persistent/state.json, all state now in noc.db
- HTTP API: POST /api/chat (end-to-end LLM), GET /api/logs (failed API requests)
- API logging: store raw request/response for 400 errors in api_log table
- Agent completion: spawn_agent sends LifeEvent::AgentDone via channel,
  life loop picks up with full conversation context and responds
- Config structs: derive Clone for HTTP server
- System prompt: instruct LLM not to add timestamps
- Makefile: rsync without --delete to preserve VPS-only tools
This commit is contained in:
Fam Zheng
2026-04-11 09:31:48 +01:00
parent f7bcdf9b4b
commit 55e9b2f50f
11 changed files with 230 additions and 71 deletions

1
.gitignore vendored
View File

@@ -7,3 +7,4 @@ state.*.json
target/
data/
noc.service
tools/manage_todo

View File

@@ -34,7 +34,7 @@ deploy: test build
scp target/release/noc $(SUITE):~/bin/
scp config.suite.yaml $(SUITE):/data/noc/config.yaml
scp noc.service.in $(SUITE):/data/noc/
scp -r tools/ $(SUITE):/data/noc/tools/
rsync -a tools/ $(SUITE):/data/noc/tools/
ssh $(SUITE) 'bash -lc "\
cd /data/noc \
&& sed -e \"s|@REPO@|/data/noc|g\" -e \"s|@PATH@|\$$PATH|g\" noc.service.in > ~/.config/systemd/user/noc.service \

View File

@@ -12,7 +12,6 @@ RestartSec=5
Environment=RUST_LOG=noc=info
Environment=RUST_BACKTRACE=1
Environment=NOC_CONFIG=@REPO@/config.yaml
Environment=NOC_STATE=@REPO@/state.json
Environment=PATH=@PATH@
[Install]

View File

@@ -1,6 +1,6 @@
use serde::Deserialize;
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct Config {
#[serde(default = "default_name")]
pub name: String,
@@ -71,17 +71,17 @@ fn default_api_key() -> String {
"unused".to_string()
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct TgConfig {
pub key: String,
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct AuthConfig {
pub passphrase: String,
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct SessionConfig {
pub refresh_hour: u32,
}

View File

@@ -8,13 +8,16 @@ use axum::Json;
use tokio::sync::mpsc;
use tracing::{error, info};
use crate::config::Config;
use crate::config::{BackendConfig, Config};
use crate::life::LifeEvent;
use crate::output::BufferOutput;
use crate::state::AppState;
use crate::stream::{build_system_prompt, run_openai_with_tools};
#[derive(Clone)]
pub struct HttpState {
pub app_state: Arc<AppState>,
pub config: Arc<Config>,
pub life_tx: mpsc::Sender<LifeEvent>,
}
@@ -29,20 +32,28 @@ pub async fn start_http_server(
.map(|g| g.webhook_port)
.unwrap_or(9880);
let config = Arc::new(config.clone());
let state = Arc::new(HttpState {
app_state,
config,
life_tx,
});
// merge gitea webhook router if configured
let gitea_router = state.config.gitea.as_ref().map(|gitea_config| {
let bot_user = std::env::var("GITEA_ADMIN_USER").unwrap_or_else(|_| "noc".into());
crate::gitea::webhook_router(gitea_config, bot_user)
});
let mut app = axum::Router::new()
.route("/api/timers", get(list_timers))
.route("/api/timers/{id}/fire", post(fire_timer))
.route("/api/chat", post(api_chat))
.route("/api/logs", get(api_logs))
.with_state(state);
// merge gitea webhook router if configured
if let Some(gitea_config) = &config.gitea {
let bot_user = std::env::var("GITEA_ADMIN_USER").unwrap_or_else(|_| "noc".into());
app = app.merge(crate::gitea::webhook_router(gitea_config, bot_user));
if let Some(router) = gitea_router {
app = app.merge(router);
}
let addr = format!("0.0.0.0:{port}");
@@ -75,6 +86,73 @@ async fn list_timers(AxumState(state): AxumState<Arc<HttpState>>) -> impl IntoRe
Json(serde_json::json!(items))
}
async fn api_chat(
AxumState(state): AxumState<Arc<HttpState>>,
Json(payload): Json<serde_json::Value>,
) -> impl IntoResponse {
let message = payload["message"].as_str().unwrap_or("").to_string();
if message.is_empty() {
return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "message required"})));
}
let BackendConfig::OpenAI {
ref endpoint,
ref model,
ref api_key,
} = state.config.backend
else {
return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "no openai backend"})));
};
let persona = state.app_state.get_config("persona").await.unwrap_or_default();
let memory_slots = state.app_state.get_memory_slots().await;
let inner_state = state.app_state.get_inner_state().await;
let system = build_system_prompt("", &persona, &memory_slots, &inner_state);
let messages = vec![
system,
serde_json::json!({"role": "user", "content": message}),
];
let sid = format!("api-{}", chrono::Local::now().timestamp());
let mut output = BufferOutput::new();
info!("api chat: {}", &message[..message.len().min(100)]);
match run_openai_with_tools(
endpoint, model, api_key, messages, &mut output, &state.app_state, &sid, &state.config, 0,
)
.await
{
Ok(response) => (StatusCode::OK, Json(serde_json::json!({"response": response}))),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("{e:#}")}))),
}
}
async fn api_logs(
AxumState(state): AxumState<Arc<HttpState>>,
) -> impl IntoResponse {
let db = state.app_state.db.lock().await;
let mut stmt = db
.prepare("SELECT id, session_id, status, length(request), length(response), created_at FROM api_log ORDER BY id DESC LIMIT 20")
.unwrap();
let logs: Vec<serde_json::Value> = stmt
.query_map([], |row| {
Ok(serde_json::json!({
"id": row.get::<_, i64>(0)?,
"session_id": row.get::<_, String>(1)?,
"status": row.get::<_, i64>(2)?,
"request_len": row.get::<_, i64>(3)?,
"response_len": row.get::<_, i64>(4)?,
"created_at": row.get::<_, String>(5)?,
}))
})
.unwrap()
.filter_map(|r| r.ok())
.collect();
Json(serde_json::json!(logs))
}
async fn fire_timer(
AxumState(state): AxumState<Arc<HttpState>>,
Path(id): Path<i64>,

View File

@@ -19,6 +19,15 @@ const DIARY_SCHEDULE: &str = "cron:0 55 22 * * *";
pub enum LifeEvent {
/// Force-fire a specific timer by ID.
FireTimer(i64),
/// A sub-agent completed — feed result back through LLM.
AgentDone {
id: String,
chat_id: i64,
session_id: String,
task: String,
output: String,
exit_code: Option<i32>,
},
}
pub async fn life_loop(
@@ -54,6 +63,46 @@ pub async fn life_loop(
warn!(timer_id = id, "force-fire: timer not found");
}
}
LifeEvent::AgentDone { id, chat_id: cid, session_id, task, output, exit_code } => {
info!(agent = %id, session = %session_id, "agent done, notifying");
let preview = crate::display::truncate_at_char_boundary(&output, 3000);
let notification = format!(
"[子代理 '{id}' 完成 (exit={exit_code:?})]\n任务: {task}\n输出:\n{preview}"
);
// load conversation context so LLM knows what was discussed
let conv = state.load_conv(&session_id).await;
let persona = state.get_config("persona").await.unwrap_or_default();
let memory_slots = state.get_memory_slots().await;
let inner = state.get_inner_state().await;
let system = crate::stream::build_system_prompt(
&conv.summary, &persona, &memory_slots, &inner,
);
let mut messages = vec![system];
// include recent conversation history
messages.extend(conv.messages.iter().cloned());
// append the agent completion as a new user message
messages.push(serde_json::json!({"role": "user", "content": notification}));
if let BackendConfig::OpenAI { ref endpoint, ref model, ref api_key } = config.backend {
let chat_id_tg = ChatId(cid);
let sid = format!("agent-{id}");
let mut tg_output;
let mut buf_output;
let out: &mut dyn crate::output::Output = if cid == 0 {
buf_output = BufferOutput::new();
&mut buf_output
} else {
tg_output = TelegramOutput::new(bot.clone(), chat_id_tg, true);
&mut tg_output
};
let _ = run_openai_with_tools(
endpoint, model, api_key, messages, out, &state, &sid, &config, cid,
).await;
}
}
}
}
}

View File

@@ -77,10 +77,12 @@ async fn main() {
gitea.resolve_token();
}
let state_path = std::env::var("NOC_STATE")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("state.json"));
let state = Arc::new(AppState::load(state_path));
// channel: http/agents → life loop
let (life_tx, life_rx) = tokio::sync::mpsc::channel(16);
let config_path = std::env::var("NOC_CONFIG").unwrap_or_else(|_| "config.yaml".into());
let db_dir = Path::new(&config_path).parent().unwrap_or(Path::new("."));
let state = Arc::new(AppState::load(db_dir, life_tx.clone()));
let _ = std::fs::create_dir_all(incoming_dir());
@@ -93,18 +95,15 @@ async fn main() {
let config = Arc::new(config);
// channel: http server → life loop
let (life_tx, life_rx) = tokio::sync::mpsc::channel(16);
// start life loop
tokio::spawn(life::life_loop(bot.clone(), state.clone(), config.clone(), life_rx));
// start http server (API + gitea webhook)
{
let srv_config = config.clone();
let http_config = config.as_ref().clone();
let srv_state = state.clone();
tokio::spawn(async move {
http::start_http_server(&srv_config, srv_state, life_tx).await;
http::start_http_server(&http_config, srv_state, life_tx).await;
});
}
@@ -173,20 +172,10 @@ async fn handle(
let is_private = msg.chat.is_private();
let text = msg.text().or(msg.caption()).unwrap_or("").to_string();
let raw_id = chat_id.0;
let date = session_date(config.session.refresh_hour);
let is_authed = {
let p = state.persist.read().await;
p.authed.get(&raw_id) == Some(&date)
};
if !is_authed {
if !state.is_authed(raw_id).await {
if text.trim() == config.auth.passphrase {
{
let mut p = state.persist.write().await;
p.authed.insert(raw_id, date);
}
state.save().await;
state.set_authed(raw_id).await;
bot.send_message(chat_id, "authenticated").await?;
info!(chat = raw_id, "authed");
} else {

View File

@@ -1,24 +1,14 @@
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::Arc;
use anyhow::Result;
use chrono::NaiveDate;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{error, info};
use tracing::info;
use crate::tools::SubAgent;
// ── persistent state ────────────────────────────────────────────────
#[derive(Serialize, Deserialize, Default)]
pub struct Persistent {
pub authed: HashMap<i64, NaiveDate>,
pub known_sessions: HashSet<String>,
}
#[derive(Serialize, Deserialize, Clone, Default)]
#[derive(Clone, Default)]
pub struct ConversationState {
pub summary: String,
pub messages: Vec<serde_json::Value>,
@@ -29,21 +19,15 @@ pub const MAX_WINDOW: usize = 100;
pub const SLIDE_SIZE: usize = 50;
pub struct AppState {
pub persist: RwLock<Persistent>,
pub state_path: PathBuf,
pub db: tokio::sync::Mutex<rusqlite::Connection>,
pub agents: RwLock<HashMap<String, Arc<SubAgent>>>,
authed_cache: RwLock<HashSet<i64>>,
pub life_tx: tokio::sync::mpsc::Sender<crate::life::LifeEvent>,
}
impl AppState {
pub fn load(path: PathBuf) -> Self {
let persist = std::fs::read_to_string(&path)
.ok()
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_default();
info!("loaded state from {}", path.display());
let db_path = path.parent().unwrap_or(Path::new(".")).join("noc.db");
pub fn load(db_dir: &Path, life_tx: tokio::sync::mpsc::Sender<crate::life::LifeEvent>) -> Self {
let db_path = db_dir.join("noc.db");
let conn = rusqlite::Connection::open(&db_path)
.unwrap_or_else(|e| panic!("open {}: {e}", db_path.display()));
conn.execute_batch(
@@ -97,6 +81,18 @@ impl AppState {
content TEXT NOT NULL DEFAULT ''
);
INSERT OR IGNORE INTO inner_state (id, content) VALUES (1, '');
CREATE TABLE IF NOT EXISTS authed_chats (
chat_id INTEGER PRIMARY KEY,
authed_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
CREATE TABLE IF NOT EXISTS api_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL DEFAULT '',
request TEXT NOT NULL,
response TEXT NOT NULL DEFAULT '',
status INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
CREATE TABLE IF NOT EXISTS life_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event TEXT NOT NULL,
@@ -119,19 +115,10 @@ impl AppState {
info!("opened db {}", db_path.display());
Self {
persist: RwLock::new(persist),
state_path: path,
db: tokio::sync::Mutex::new(conn),
agents: RwLock::new(HashMap::new()),
}
}
pub async fn save(&self) {
let data = self.persist.read().await;
if let Ok(json) = serde_json::to_string_pretty(&*data) {
if let Err(e) = std::fs::write(&self.state_path, json) {
error!("save state: {e}");
}
authed_cache: RwLock::new(HashSet::new()),
life_tx,
}
}
@@ -267,6 +254,44 @@ impl AppState {
);
}
pub async fn is_authed(&self, chat_id: i64) -> bool {
// check cache first
if self.authed_cache.read().await.contains(&chat_id) {
return true;
}
// cache miss → check DB
let db = self.db.lock().await;
let found: bool = db
.query_row(
"SELECT COUNT(*) > 0 FROM authed_chats WHERE chat_id = ?1",
rusqlite::params![chat_id],
|row| row.get(0),
)
.unwrap_or(false);
drop(db);
if found {
self.authed_cache.write().await.insert(chat_id);
}
found
}
pub async fn set_authed(&self, chat_id: i64) {
self.authed_cache.write().await.insert(chat_id);
let db = self.db.lock().await;
let _ = db.execute(
"INSERT OR IGNORE INTO authed_chats (chat_id) VALUES (?1)",
rusqlite::params![chat_id],
);
}
pub async fn log_api(&self, session_id: &str, request: &str, response: &str, status: u16) {
let db = self.db.lock().await;
let _ = db.execute(
"INSERT INTO api_log (session_id, request, response, status) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![session_id, request, response, status],
);
}
pub async fn log_life(&self, event: &str, detail: &str) {
let db = self.db.lock().await;
let _ = db.execute(

View File

@@ -81,6 +81,9 @@ pub async fn run_openai_with_tools(
if !resp_raw.status().is_success() {
let status = resp_raw.status();
let body_text = resp_raw.text().await.unwrap_or_default();
// log failed API call
let req_json = serde_json::to_string(&body).unwrap_or_default();
state.log_api(sid, &req_json, &body_text, status.as_u16()).await;
for (i, m) in messages.iter().enumerate() {
let role = m["role"].as_str().unwrap_or("?");
let content_len = m["content"].as_str().map(|s| s.len()).unwrap_or(0);
@@ -233,7 +236,8 @@ pub fn build_system_prompt(summary: &str, persona: &str, memory_slots: &[(i32, S
当需要搜索信息(如网页搜索、资料查找、技术调研等)时,使用 spawn_agent 启动一个子代理来完成搜索任务,\
子代理可以使用浏览器和搜索引擎,搜索完成后你会收到结果通知。\
输出格式使用纯文本或基础Markdown加粗、列表、代码块\
不要使用LaTeX公式$...$、特殊Unicode符号→←↔或HTML标签Telegram无法渲染这些。",
不要使用LaTeX公式$...$、特殊Unicode符号→←↔或HTML标签Telegram无法渲染这些。\
不要在回复开头加时间戳——用户消息前的时间戳是系统自动添加的,不需要你模仿。",
);
if !memory_slots.is_empty() {

View File

@@ -323,7 +323,7 @@ pub async fn execute_tool(
"spawn_agent" => {
let id = args["id"].as_str().unwrap_or("agent");
let task = args["task"].as_str().unwrap_or("");
spawn_agent(id, task, state, output, sid, config).await
spawn_agent(id, task, state, output, sid, config, chat_id).await
}
"agent_status" => {
let id = args["id"].as_str().unwrap_or("");
@@ -626,8 +626,9 @@ pub async fn spawn_agent(
task: &str,
state: &Arc<AppState>,
output: &dyn Output,
_sid: &str,
sid: &str,
_config: &Arc<Config>,
chat_id: i64,
) -> String {
// check if already exists
if state.agents.read().await.contains_key(id) {
@@ -659,11 +660,14 @@ pub async fn spawn_agent(
state.agents.write().await.insert(id.to_string(), agent);
// background task: collect output
// background task: collect output, then send event to life loop
let out = agent_output.clone();
let done = completed.clone();
let ecode = exit_code.clone();
let id_c = id.to_string();
let task_c = task.to_string();
let life_tx = state.life_tx.clone();
let sid_c = sid.to_string();
tokio::spawn(async move {
let stdout = child.stdout.take();
@@ -681,6 +685,16 @@ pub async fn spawn_agent(
done.store(true, Ordering::SeqCst);
info!(agent = %id_c, "agent completed, exit={code:?}");
let output_text = out.read().await.clone();
let _ = life_tx.send(crate::life::LifeEvent::AgentDone {
id: id_c,
chat_id,
session_id: sid_c,
task: task_c,
output: output_text,
exit_code: code,
}).await;
});
let _ = output.status(&format!("Agent '{id}' spawned (pid={pid:?})")).await;

View File

@@ -19,7 +19,7 @@ import sys
import requests
APP_ID = "cli_a7f042e93d385013"
APP_SECRET = "ht4FCjQ8JJ65ZPUWlff6ldFBmaP0mxqY"
APP_SECRET = "6V3t5bFK4vRKsEG3VD6sQdAu2rmFEr2S"
APP_TOKEN = "SSoGbmGFoazJkUs7bbfcaSG8n7f"
TABLE_ID = "tblIA2biceDpvr35"
BASE_URL = "https://open.feishu.cn/open-apis"