diff --git a/Cargo.lock b/Cargo.lock index 1ec623a..f2ec589 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -394,6 +394,15 @@ dependencies = [ "slab", ] +[[package]] +name = "getopts" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe4fbac503b8d1f88e6676011885f34b7174f46e59956bba534ba83abded4df" +dependencies = [ + "unicode-width", +] + [[package]] name = "getrandom" version = "0.2.17" @@ -976,6 +985,7 @@ dependencies = [ "anyhow", "chrono", "dptree", + "pulldown-cmark", "reqwest 0.12.28", "serde", "serde_json", @@ -1168,6 +1178,25 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "pulldown-cmark" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f86ba2052aebccc42cbbb3ed234b8b13ce76f75c3551a303cb2bcffcff12bb14" +dependencies = [ + "bitflags 2.11.0", + "getopts", + "memchr", + "pulldown-cmark-escape", + "unicase", +] + +[[package]] +name = "pulldown-cmark-escape" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "007d8adb5ddab6f8e3f491ac63566a7d5002cc7ed73901f72057943fa71ae1ae" + [[package]] name = "quote" version = "1.0.45" @@ -2015,6 +2044,12 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "unicode-width" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" + [[package]] name = "unicode-xid" version = "0.2.6" diff --git a/Cargo.toml b/Cargo.toml index 8fe2a4f..2a22d0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ dptree = "0.3" serde = { version = "1", features = ["derive"] } serde_json = "1" serde_yaml = "0.9" +pulldown-cmark = "0.12" reqwest = { version = "0.12", features = ["json"] } teloxide = { version = "0.12", features = ["macros"] } tokio = { version = "1", features = ["full"] } diff --git a/Makefile b/Makefile index 424d012..3b9ab7a 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ noc.service: noc.service.in deploy: build noc.service mkdir -p ~/bin ~/.config/systemd/user systemctl --user stop noc 2>/dev/null || true - cp target/release/noc ~/bin/ + install target/release/noc ~/bin/noc cp noc.service ~/.config/systemd/user/ systemctl --user daemon-reload systemctl --user enable --now noc diff --git a/src/main.rs b/src/main.rs index 7c1d8a2..8d8cc80 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,9 +21,41 @@ use uuid::Uuid; #[derive(Deserialize)] struct Config { + #[serde(default = "default_name")] + name: String, tg: TgConfig, auth: AuthConfig, session: SessionConfig, + #[serde(default)] + backend: BackendConfig, +} + +fn default_name() -> String { + "noc".to_string() +} + +#[derive(Deserialize, Clone)] +#[serde(tag = "type")] +enum BackendConfig { + #[serde(rename = "claude")] + Claude, + #[serde(rename = "openai")] + OpenAI { + endpoint: String, + model: String, + #[serde(default = "default_api_key")] + api_key: String, + }, +} + +fn default_api_key() -> String { + "unused".to_string() +} + +impl Default for BackendConfig { + fn default() -> Self { + BackendConfig::Claude + } } #[derive(Deserialize)] @@ -52,6 +84,7 @@ struct Persistent { struct AppState { persist: RwLock, state_path: PathBuf, + conversations: RwLock>>, } impl AppState { @@ -64,6 +97,7 @@ impl AppState { Self { persist: RwLock::new(persist), state_path: path, + conversations: RwLock::new(HashMap::new()), } } @@ -89,9 +123,9 @@ fn session_date(refresh_hour: u32) -> NaiveDate { } } -fn session_uuid(chat_id: i64, refresh_hour: u32) -> String { +fn session_uuid(prefix: &str, chat_id: i64, refresh_hour: u32) -> String { let date = session_date(refresh_hour); - let name = format!("noc-{}-{}", chat_id, date.format("%Y%m%d")); + let name = format!("{}-{}-{}", prefix, chat_id, date.format("%Y%m%d")); Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()).to_string() } @@ -300,7 +334,7 @@ async fn handle_inner( return Ok(()); } - let sid = session_uuid(chat_id.0, config.session.refresh_hour); + let sid = session_uuid(&config.name, chat_id.0, config.session.refresh_hour); info!(%sid, "recv"); let out_dir = outgoing_dir(&sid); @@ -309,20 +343,46 @@ async fn handle_inner( let prompt = build_prompt(text, &uploaded, &download_errors, &out_dir); - let known = state.persist.read().await.known_sessions.contains(&sid); - - let result = invoke_claude_streaming(&sid, &prompt, known, bot, chat_id).await; - - match &result { - Ok(_) => { - if !known { - state.persist.write().await.known_sessions.insert(sid.clone()); - state.save().await; + match &config.backend { + BackendConfig::Claude => { + let known = state.persist.read().await.known_sessions.contains(&sid); + let result = invoke_claude_streaming(&sid, &prompt, known, bot, chat_id).await; + match &result { + Ok(_) => { + if !known { + state.persist.write().await.known_sessions.insert(sid.clone()); + state.save().await; + } + } + Err(e) => { + error!(%sid, "claude: {e:#}"); + let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; + } } } - Err(e) => { - error!(%sid, "claude: {e:#}"); - let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; + BackendConfig::OpenAI { + endpoint, + model, + api_key, + } => { + let mut messages = { + let convos = state.conversations.read().await; + convos.get(&sid).cloned().unwrap_or_default() + }; + messages.push(serde_json::json!({"role": "user", "content": &prompt})); + match run_openai_streaming(endpoint, model, api_key, &messages, bot, chat_id).await { + Ok(response) => { + if !response.is_empty() { + messages + .push(serde_json::json!({"role": "assistant", "content": &response})); + } + state.conversations.write().await.insert(sid.clone(), messages); + } + Err(e) => { + error!(%sid, "openai: {e:#}"); + let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; + } + } } } @@ -426,7 +486,7 @@ fn extract_tool_use(msg: &AssistantMessage) -> Option { None } -const EDIT_INTERVAL_MS: u64 = 3000; +const EDIT_INTERVAL_MS: u64 = 2000; const DRAFT_INTERVAL_MS: u64 = 1000; const TG_MSG_LIMIT: usize = 4096; @@ -552,7 +612,12 @@ async fn run_claude_streaming( (text.clone(), Some(text)) }; - let display = truncate_for_display(&display_raw); + let display = if use_draft { + // draft mode: no cursor — cursor breaks monotonic text growth + truncate_at_char_boundary(&display_raw, TG_MSG_LIMIT).to_string() + } else { + truncate_for_display(&display_raw) + }; if use_draft { match send_message_draft( @@ -650,24 +715,134 @@ async fn run_claude_streaming( return Ok(final_result); } - // final result: send as real message(s) — draft auto-disappears - let chunks: Vec<&str> = split_msg(&final_result, TG_MSG_LIMIT); + send_final_result(bot, chat_id, msg_id, use_draft, &final_result).await; - if !use_draft && msg_id.is_some() { - // edit mode: replace streaming message with final text - let id = msg_id.unwrap(); - let _ = bot.edit_message_text(chat_id, id, chunks[0]).await; - for chunk in &chunks[1..] { - let _ = bot.send_message(chat_id, *chunk).await; + Ok(final_result) +} + +// ── openai-compatible backend (streaming) ────────────────────────── + +async fn run_openai_streaming( + endpoint: &str, + model: &str, + api_key: &str, + messages: &[serde_json::Value], + bot: &Bot, + chat_id: ChatId, +) -> Result { + let client = reqwest::Client::new(); + let url = format!("{}/chat/completions", endpoint.trim_end_matches('/')); + + let body = serde_json::json!({ + "model": model, + "messages": messages, + "stream": true, + }); + + let mut resp = client + .post(&url) + .header("Authorization", format!("Bearer {api_key}")) + .json(&body) + .send() + .await? + .error_for_status()?; + + let token = bot.token().to_owned(); + let raw_chat_id = chat_id.0; + let draft_id: i64 = 1; + let mut use_draft = true; + + let mut msg_id: Option = None; + let mut accumulated = String::new(); + let mut last_edit = Instant::now(); + let mut buffer = String::new(); + let mut done = false; + + while let Some(chunk) = resp.chunk().await? { + if done { + break; } - } else { - // draft mode or no existing message: sendMessage replaces the draft - for chunk in &chunks { - let _ = bot.send_message(chat_id, *chunk).await; + buffer.push_str(&String::from_utf8_lossy(&chunk)); + + while let Some(pos) = buffer.find('\n') { + let line = buffer[..pos].to_string(); + buffer = buffer[pos + 1..].to_string(); + + let trimmed = line.trim(); + if trimmed.is_empty() || trimmed.starts_with(':') { + continue; + } + + let data = match trimmed.strip_prefix("data: ") { + Some(d) => d, + None => continue, + }; + + if data.trim() == "[DONE]" { + done = true; + break; + } + + if let Ok(json) = serde_json::from_str::(data) { + if let Some(content) = json["choices"][0]["delta"]["content"].as_str() { + if content.is_empty() { + continue; + } + accumulated.push_str(content); + + let interval = if use_draft { + DRAFT_INTERVAL_MS + } else { + EDIT_INTERVAL_MS + }; + if last_edit.elapsed().as_millis() < interval as u128 { + continue; + } + + let display = if use_draft { + truncate_at_char_boundary(&accumulated, TG_MSG_LIMIT).to_string() + } else { + truncate_for_display(&accumulated) + }; + + if use_draft { + match send_message_draft( + &client, &token, raw_chat_id, draft_id, &display, + ) + .await + { + Ok(_) => { + last_edit = Instant::now(); + } + Err(e) => { + warn!("sendMessageDraft failed, falling back: {e:#}"); + use_draft = false; + if let Ok(sent) = bot.send_message(chat_id, &display).await { + msg_id = Some(sent.id); + last_edit = Instant::now(); + } + } + } + } else if let Some(id) = msg_id { + if bot.edit_message_text(chat_id, id, &display).await.is_ok() { + last_edit = Instant::now(); + } + } else if let Ok(sent) = bot.send_message(chat_id, &display).await { + msg_id = Some(sent.id); + last_edit = Instant::now(); + } + } + } } } - Ok(final_result) + if accumulated.is_empty() { + return Ok(accumulated); + } + + send_final_result(bot, chat_id, msg_id, use_draft, &accumulated).await; + + Ok(accumulated) } const CURSOR: &str = " \u{25CE}"; @@ -693,6 +868,121 @@ fn truncate_at_char_boundary(s: &str, max: usize) -> &str { &s[..end] } +fn escape_html(s: &str) -> String { + s.replace('&', "&") + .replace('<', "<") + .replace('>', ">") + .replace('"', """) +} + +fn markdown_to_telegram_html(md: &str) -> String { + use pulldown_cmark::{CodeBlockKind, Event, Options, Parser, Tag, TagEnd}; + + let mut opts = Options::empty(); + opts.insert(Options::ENABLE_STRIKETHROUGH); + + let parser = Parser::new_ext(md, opts); + let mut html = String::new(); + + for event in parser { + match event { + Event::Start(tag) => match tag { + Tag::Paragraph => {} + Tag::Heading { .. } => html.push_str(""), + Tag::BlockQuote(_) => html.push_str("
"), + Tag::CodeBlock(kind) => match kind { + CodeBlockKind::Fenced(ref lang) if !lang.is_empty() => { + html.push_str(&format!( + "
",
+                            escape_html(lang.as_ref())
+                        ));
+                    }
+                    _ => html.push_str("
"),
+                },
+                Tag::Item => html.push_str("• "),
+                Tag::Emphasis => html.push_str(""),
+                Tag::Strong => html.push_str(""),
+                Tag::Strikethrough => html.push_str(""),
+                Tag::Link { dest_url, .. } => {
+                    html.push_str(&format!(
+                        "",
+                        escape_html(dest_url.as_ref())
+                    ));
+                }
+                _ => {}
+            },
+            Event::End(tag) => match tag {
+                TagEnd::Paragraph => html.push_str("\n\n"),
+                TagEnd::Heading(_) => html.push_str("\n\n"),
+                TagEnd::BlockQuote(_) => html.push_str("
"), + TagEnd::CodeBlock => html.push_str("\n\n"), + TagEnd::List(_) => html.push('\n'), + TagEnd::Item => html.push('\n'), + TagEnd::Emphasis => html.push_str(""), + TagEnd::Strong => html.push_str("
"), + TagEnd::Strikethrough => html.push_str(""), + TagEnd::Link => html.push_str(""), + _ => {} + }, + Event::Text(text) => html.push_str(&escape_html(text.as_ref())), + Event::Code(text) => { + html.push_str(""); + html.push_str(&escape_html(text.as_ref())); + html.push_str(""); + } + Event::SoftBreak | Event::HardBreak => html.push('\n'), + Event::Rule => html.push_str("\n---\n\n"), + _ => {} + } + } + + html.trim_end().to_string() +} + +/// Send final result with HTML formatting, fallback to plain text on failure. +async fn send_final_result( + bot: &Bot, + chat_id: ChatId, + msg_id: Option, + use_draft: bool, + result: &str, +) { + use teloxide::types::ParseMode; + + let html = markdown_to_telegram_html(result); + + // try HTML as single message + let html_ok = if !use_draft && msg_id.is_some() { + bot.edit_message_text(chat_id, msg_id.unwrap(), &html) + .parse_mode(ParseMode::Html) + .await + .is_ok() + } else { + bot.send_message(chat_id, &html) + .parse_mode(ParseMode::Html) + .await + .is_ok() + }; + + if html_ok { + return; + } + + // fallback: plain text with chunking + let chunks = split_msg(result, TG_MSG_LIMIT); + if !use_draft && msg_id.is_some() { + let id = msg_id.unwrap(); + let _ = bot.edit_message_text(chat_id, id, chunks[0]).await; + for chunk in &chunks[1..] { + let _ = bot.send_message(chat_id, *chunk).await; + } + } else { + for chunk in &chunks { + let _ = bot.send_message(chat_id, *chunk).await; + } + } +} + fn split_msg(s: &str, max: usize) -> Vec<&str> { if s.len() <= max { return vec![s];