mod config; mod state; mod tools; mod stream; mod display; mod life; use std::collections::HashSet; use std::path::{Path, PathBuf}; use std::sync::Arc; use anyhow::Result; use chrono::{Local, NaiveDate, NaiveTime}; use teloxide::dispatching::UpdateFilterExt; use teloxide::net::Download; use teloxide::prelude::*; use teloxide::types::InputFile; use tracing::{error, info, warn}; use uuid::Uuid; use config::{BackendConfig, Config}; use display::build_user_content; use state::{AppState, MAX_WINDOW, SLIDE_SIZE}; use stream::{ build_system_prompt, invoke_claude_streaming, run_claude_streaming, run_openai_with_tools, summarize_messages, }; use tools::discover_tools; // ── helpers ───────────────────────────────────────────────────────── fn session_date(refresh_hour: u32) -> NaiveDate { let now = Local::now(); let refresh = NaiveTime::from_hms_opt(refresh_hour, 0, 0).unwrap(); if now.time() < refresh { now.date_naive() - chrono::Duration::days(1) } else { now.date_naive() } } fn session_uuid(prefix: &str, chat_id: i64, refresh_hour: u32) -> String { let date = session_date(refresh_hour); let name = format!("{}-{}-{}", prefix, chat_id, date.format("%Y%m%d")); Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()).to_string() } fn home_dir() -> PathBuf { PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/tmp".into())) } fn incoming_dir() -> PathBuf { home_dir().join("incoming") } fn outgoing_dir(sid: &str) -> PathBuf { home_dir().join("outgoing").join(sid) } // ── main ──────────────────────────────────────────────────────────── #[tokio::main] async fn main() { tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::from_default_env() .add_directive("noc=info".parse().unwrap()), ) .init(); let config_path = std::env::var("NOC_CONFIG").unwrap_or_else(|_| "config.yaml".into()); let raw = std::fs::read_to_string(&config_path) .unwrap_or_else(|e| panic!("read {config_path}: {e}")); let config: Config = serde_yaml::from_str(&raw).unwrap_or_else(|e| panic!("parse config: {e}")); let state_path = std::env::var("NOC_STATE") .map(PathBuf::from) .unwrap_or_else(|_| PathBuf::from("state.json")); let state = Arc::new(AppState::load(state_path)); let _ = std::fs::create_dir_all(incoming_dir()); let bot = Bot::new(&config.tg.key); let me = bot.get_me().await.unwrap(); let bot_username = Arc::new(me.username.clone().unwrap_or_default()); info!(username = %bot_username, "noc bot starting"); let handler = Update::filter_message().endpoint(handle); let config = Arc::new(config); // start life loop tokio::spawn(life::life_loop(bot.clone(), state.clone(), config.clone())); Dispatcher::builder(bot, handler) .dependencies(dptree::deps![state, config, bot_username]) .default_handler(|_| async {}) .build() .dispatch() .await; } // ── file download ─────────────────────────────────────────────────── async fn download_tg_file(bot: &Bot, file_id: &str, filename: &str) -> Result { let dir = incoming_dir(); tokio::fs::create_dir_all(&dir).await?; let dest = dir.join(filename); let tf = bot.get_file(file_id).await?; let mut file = tokio::fs::File::create(&dest).await?; bot.download_file(&tf.path, &mut file).await?; info!("downloaded {} -> {}", filename, dest.display()); Ok(dest) } // ── outgoing scan ─────────────────────────────────────────────────── async fn snapshot_dir(dir: &Path) -> HashSet { let mut set = HashSet::new(); if let Ok(mut entries) = tokio::fs::read_dir(dir).await { while let Ok(Some(entry)) = entries.next_entry().await { let path = entry.path(); if path.is_file() { set.insert(path); } } } set } async fn new_files_in(dir: &Path, before: &HashSet) -> Vec { let mut files = Vec::new(); if let Ok(mut entries) = tokio::fs::read_dir(dir).await { while let Ok(Some(entry)) = entries.next_entry().await { let path = entry.path(); if path.is_file() && !before.contains(&path) { files.push(path); } } } files.sort(); files } // ── handler ───────────────────────────────────────────────────────── async fn handle( bot: Bot, msg: Message, state: Arc, config: Arc, _bot_username: Arc, ) -> ResponseResult<()> { let chat_id = msg.chat.id; let is_private = msg.chat.is_private(); let text = msg.text().or(msg.caption()).unwrap_or("").to_string(); let raw_id = chat_id.0; let date = session_date(config.session.refresh_hour); let is_authed = { let p = state.persist.read().await; p.authed.get(&raw_id) == Some(&date) }; if !is_authed { if text.trim() == config.auth.passphrase { { let mut p = state.persist.write().await; p.authed.insert(raw_id, date); } state.save().await; bot.send_message(chat_id, "authenticated").await?; info!(chat = raw_id, "authed"); } else { bot.send_message(chat_id, "not authenticated").await?; } return Ok(()); } if let Err(e) = handle_inner(&bot, &msg, chat_id, &text, is_private, &state, &config).await { error!(chat = raw_id, "handle: {e:#}"); let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; } Ok(()) } async fn handle_inner( bot: &Bot, msg: &Message, chat_id: ChatId, text: &str, is_private: bool, state: &Arc, config: &Arc, ) -> Result<()> { let mut uploaded: Vec = Vec::new(); let mut download_errors: Vec = Vec::new(); let mut transcriptions: Vec = Vec::new(); if let Some(doc) = msg.document() { let name = doc.file_name.as_deref().unwrap_or("file"); match download_tg_file(bot, &doc.file.id, name).await { Ok(p) => uploaded.push(p), Err(e) => download_errors.push(format!("{name}: {e:#}")), } } if let Some(photos) = msg.photo() { if let Some(photo) = photos.last() { let name = format!("photo_{}.jpg", Local::now().format("%H%M%S")); match download_tg_file(bot, &photo.file.id, &name).await { Ok(p) => uploaded.push(p), Err(e) => download_errors.push(format!("photo: {e:#}")), } } } if let Some(audio) = msg.audio() { let fallback = format!("audio_{}.ogg", Local::now().format("%H%M%S")); let name = audio.file_name.as_deref().unwrap_or(&fallback); match download_tg_file(bot, &audio.file.id, name).await { Ok(p) => { if let Some(url) = &config.whisper_url { match transcribe_audio(url, &p).await { Ok(t) if !t.is_empty() => transcriptions.push(t), Ok(_) => uploaded.push(p), Err(e) => { warn!("transcribe failed: {e:#}"); uploaded.push(p); } } } else { uploaded.push(p); } } Err(e) => download_errors.push(format!("audio: {e:#}")), } } if let Some(voice) = msg.voice() { let name = format!("voice_{}.ogg", Local::now().format("%H%M%S")); match download_tg_file(bot, &voice.file.id, &name).await { Ok(p) => { if let Some(url) = &config.whisper_url { match transcribe_audio(url, &p).await { Ok(t) if !t.is_empty() => transcriptions.push(t), Ok(_) => uploaded.push(p), Err(e) => { warn!("transcribe failed: {e:#}"); uploaded.push(p); } } } else { uploaded.push(p); } } Err(e) => download_errors.push(format!("voice: {e:#}")), } } if let Some(video) = msg.video() { let fallback = format!("video_{}.mp4", Local::now().format("%H%M%S")); let name = video.file_name.as_deref().unwrap_or(&fallback); match download_tg_file(bot, &video.file.id, name).await { Ok(p) => uploaded.push(p), Err(e) => download_errors.push(format!("video: {e:#}")), } } if let Some(vn) = msg.video_note() { let name = format!("videonote_{}.mp4", Local::now().format("%H%M%S")); match download_tg_file(bot, &vn.file.id, &name).await { Ok(p) => uploaded.push(p), Err(e) => download_errors.push(format!("video_note: {e:#}")), } } if text.is_empty() && uploaded.is_empty() && transcriptions.is_empty() { if !download_errors.is_empty() { let err_msg = format!("[文件下载失败]\n{}", download_errors.join("\n")); bot.send_message(chat_id, err_msg).await?; } return Ok(()); } let sid = session_uuid(&config.name, chat_id.0, config.session.refresh_hour); info!(%sid, "recv"); let out_dir = outgoing_dir(&sid); tokio::fs::create_dir_all(&out_dir).await?; let before = snapshot_dir(&out_dir).await; // handle diag command (OpenAI backend only) if text.trim() == "diag" { if let BackendConfig::OpenAI { .. } = &config.backend { let conv = state.load_conv(&sid).await; let count = state.message_count(&sid).await; let persona = state.get_config("persona").await.unwrap_or_default(); let scratch = state.get_scratch().await; let tools = discover_tools(); let empty = vec![]; let tools_arr = tools.as_array().unwrap_or(&empty); let mut diag = format!( "# NOC Diag\n\n\ ## Session\n\ - id: `{sid}`\n\ - window: {count}/{MAX_WINDOW} (slide at {MAX_WINDOW}, drop {SLIDE_SIZE})\n\ - total processed: {}\n\n\ ## Persona ({} chars)\n```\n{}\n```\n\n\ ## Scratch ({} chars)\n```\n{}\n```\n\n\ ## Summary ({} chars)\n```\n{}\n```\n\n\ ## Tools ({} registered)\n", conv.total_messages + count, persona.len(), if persona.is_empty() { "(default)" } else { &persona }, scratch.len(), if scratch.is_empty() { "(empty)" } else { &scratch }, conv.summary.len(), if conv.summary.is_empty() { "(empty)".to_string() } else { conv.summary }, tools_arr.len(), ); for tool in tools_arr { let func = &tool["function"]; let name = func["name"].as_str().unwrap_or("?"); let desc = func["description"].as_str().unwrap_or(""); let params = serde_json::to_string_pretty(&func["parameters"]) .unwrap_or_default(); diag.push_str(&format!( "### `{name}`\n{desc}\n\n```json\n{params}\n```\n\n" )); } let memory_slots = state.get_memory_slots().await; diag.push_str(&format!("## Memory Slots ({}/100 used)\n", memory_slots.len())); if memory_slots.is_empty() { diag.push_str("(empty)\n\n"); } else { for (nr, content) in &memory_slots { diag.push_str(&format!("- `[{nr}]` {content}\n")); } diag.push('\n'); } let tmp = std::env::temp_dir().join(format!("noc-diag-{sid}.md")); tokio::fs::write(&tmp, &diag).await?; bot.send_document(chat_id, InputFile::file(&tmp)) .await?; let _ = tokio::fs::remove_file(&tmp).await; return Ok(()); } } // handle "cc" prefix: pass directly to claude -p, no session, no history if let Some(cc_prompt) = text.strip_prefix("cc").map(|s| s.trim_start()) { if !cc_prompt.is_empty() { info!(%sid, "cc passthrough"); let prompt = build_prompt(cc_prompt, &uploaded, &download_errors, &transcriptions); match run_claude_streaming(&[], &prompt, bot, chat_id).await { Ok(_) => {} Err(e) => { error!(%sid, "cc claude: {e:#}"); let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; } } return Ok(()); } } let prompt = build_prompt(text, &uploaded, &download_errors, &transcriptions); match &config.backend { BackendConfig::Claude => { let known = state.persist.read().await.known_sessions.contains(&sid); let result = invoke_claude_streaming(&sid, &prompt, known, bot, chat_id).await; match &result { Ok(_) => { if !known { state.persist.write().await.known_sessions.insert(sid.clone()); state.save().await; } } Err(e) => { error!(%sid, "claude: {e:#}"); let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; } } } BackendConfig::OpenAI { endpoint, model, api_key, } => { let conv = state.load_conv(&sid).await; let persona = state.get_config("persona").await.unwrap_or_default(); let memory_slots = state.get_memory_slots().await; let system_msg = build_system_prompt(&conv.summary, &persona, &memory_slots); let mut api_messages = vec![system_msg]; api_messages.extend(conv.messages); let scratch = state.get_scratch().await; let user_content = build_user_content(&prompt, &scratch, &uploaded); api_messages.push(serde_json::json!({"role": "user", "content": user_content})); match run_openai_with_tools( endpoint, model, api_key, api_messages, bot, chat_id, state, &sid, config, is_private, ) .await { Ok(response) => { state.push_message(&sid, "user", &prompt).await; if !response.is_empty() { state.push_message(&sid, "assistant", &response).await; } // sliding window let count = state.message_count(&sid).await; if count >= MAX_WINDOW { info!(%sid, "sliding window: {count} messages, summarizing oldest {SLIDE_SIZE}"); let _ = bot .send_message(chat_id, "[整理记忆中...]") .await; let to_summarize = state.get_oldest_messages(&sid, SLIDE_SIZE).await; let current_summary = { let db = state.db.lock().await; db.query_row( "SELECT summary FROM conversations WHERE session_id = ?1", [&sid], |row| row.get::<_, String>(0), ) .unwrap_or_default() }; match summarize_messages( endpoint, model, api_key, ¤t_summary, &to_summarize, ) .await { Ok(new_summary) => { state.slide_window(&sid, &new_summary, SLIDE_SIZE).await; let remaining = state.message_count(&sid).await; info!(%sid, "window slid, {remaining} messages remain, summary {} chars", new_summary.len()); } Err(e) => { warn!(%sid, "summarize failed: {e:#}, keeping all messages"); } } } } Err(e) => { error!(%sid, "openai: {e:#}"); let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; } } } } // send new files from outgoing dir let new_files = new_files_in(&out_dir, &before).await; for path in &new_files { info!(%sid, "sending file: {}", path.display()); if let Err(e) = bot.send_document(chat_id, InputFile::file(path)).await { error!(%sid, "send_document {}: {e:#}", path.display()); let _ = bot .send_message(chat_id, format!("[发送文件失败: {}]", path.display())) .await; } } Ok(()) } fn build_prompt( text: &str, uploaded: &[PathBuf], errors: &[String], transcriptions: &[String], ) -> String { let mut parts = Vec::new(); for t in transcriptions { parts.push(format!("[语音消息] {t}")); } for f in uploaded { parts.push(format!("[用户上传了文件: {}]", f.display())); } for e in errors { parts.push(format!("[文件下载失败: {e}]")); } if !text.is_empty() { parts.push(text.to_string()); } parts.join("\n") } async fn transcribe_audio(whisper_url: &str, file_path: &Path) -> Result { let client = reqwest::Client::new(); let url = format!("{}/v1/audio/transcriptions", whisper_url.trim_end_matches('/')); let file_bytes = tokio::fs::read(file_path).await?; let file_name = file_path .file_name() .and_then(|n| n.to_str()) .unwrap_or("audio.ogg") .to_string(); let part = reqwest::multipart::Part::bytes(file_bytes) .file_name(file_name) .mime_str("audio/ogg")?; let form = reqwest::multipart::Form::new() .part("file", part) .text("model", "base"); let resp = client.post(&url).multipart(form).send().await?.error_for_status()?; let json: serde_json::Value = resp.json().await?; Ok(json["text"].as_str().unwrap_or("").to_string()) }