add OpenAI-compatible backend, markdown rendering, and sendMessageDraft fix

- Configurable backend: claude (CLI) or openai (API), selected in config.yaml
- OpenAI streaming via SSE with conversation history in memory
- Session isolation: config name included in session UUID
- Markdown to Telegram HTML conversion (pulldown-cmark) for final messages
- Fix sendMessageDraft: skip cursor to preserve monotonic text growth,
  skip empty content chunks from SSE stream
- Simplify Makefile: single deploy target
This commit is contained in:
Fam Zheng
2026-04-09 10:23:50 +01:00
parent eba7d89006
commit 84ba209b3f
4 changed files with 357 additions and 31 deletions

35
Cargo.lock generated
View File

@@ -394,6 +394,15 @@ dependencies = [
"slab", "slab",
] ]
[[package]]
name = "getopts"
version = "0.2.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe4fbac503b8d1f88e6676011885f34b7174f46e59956bba534ba83abded4df"
dependencies = [
"unicode-width",
]
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.17" version = "0.2.17"
@@ -976,6 +985,7 @@ dependencies = [
"anyhow", "anyhow",
"chrono", "chrono",
"dptree", "dptree",
"pulldown-cmark",
"reqwest 0.12.28", "reqwest 0.12.28",
"serde", "serde",
"serde_json", "serde_json",
@@ -1168,6 +1178,25 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "pulldown-cmark"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f86ba2052aebccc42cbbb3ed234b8b13ce76f75c3551a303cb2bcffcff12bb14"
dependencies = [
"bitflags 2.11.0",
"getopts",
"memchr",
"pulldown-cmark-escape",
"unicase",
]
[[package]]
name = "pulldown-cmark-escape"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "007d8adb5ddab6f8e3f491ac63566a7d5002cc7ed73901f72057943fa71ae1ae"
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.45" version = "1.0.45"
@@ -2015,6 +2044,12 @@ version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75"
[[package]]
name = "unicode-width"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254"
[[package]] [[package]]
name = "unicode-xid" name = "unicode-xid"
version = "0.2.6" version = "0.2.6"

View File

@@ -10,6 +10,7 @@ dptree = "0.3"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
serde_yaml = "0.9" serde_yaml = "0.9"
pulldown-cmark = "0.12"
reqwest = { version = "0.12", features = ["json"] } reqwest = { version = "0.12", features = ["json"] }
teloxide = { version = "0.12", features = ["macros"] } teloxide = { version = "0.12", features = ["macros"] }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }

View File

@@ -13,7 +13,7 @@ noc.service: noc.service.in
deploy: build noc.service deploy: build noc.service
mkdir -p ~/bin ~/.config/systemd/user mkdir -p ~/bin ~/.config/systemd/user
systemctl --user stop noc 2>/dev/null || true systemctl --user stop noc 2>/dev/null || true
cp target/release/noc ~/bin/ install target/release/noc ~/bin/noc
cp noc.service ~/.config/systemd/user/ cp noc.service ~/.config/systemd/user/
systemctl --user daemon-reload systemctl --user daemon-reload
systemctl --user enable --now noc systemctl --user enable --now noc

View File

@@ -21,9 +21,41 @@ use uuid::Uuid;
#[derive(Deserialize)] #[derive(Deserialize)]
struct Config { struct Config {
#[serde(default = "default_name")]
name: String,
tg: TgConfig, tg: TgConfig,
auth: AuthConfig, auth: AuthConfig,
session: SessionConfig, session: SessionConfig,
#[serde(default)]
backend: BackendConfig,
}
fn default_name() -> String {
"noc".to_string()
}
#[derive(Deserialize, Clone)]
#[serde(tag = "type")]
enum BackendConfig {
#[serde(rename = "claude")]
Claude,
#[serde(rename = "openai")]
OpenAI {
endpoint: String,
model: String,
#[serde(default = "default_api_key")]
api_key: String,
},
}
fn default_api_key() -> String {
"unused".to_string()
}
impl Default for BackendConfig {
fn default() -> Self {
BackendConfig::Claude
}
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@@ -52,6 +84,7 @@ struct Persistent {
struct AppState { struct AppState {
persist: RwLock<Persistent>, persist: RwLock<Persistent>,
state_path: PathBuf, state_path: PathBuf,
conversations: RwLock<HashMap<String, Vec<serde_json::Value>>>,
} }
impl AppState { impl AppState {
@@ -64,6 +97,7 @@ impl AppState {
Self { Self {
persist: RwLock::new(persist), persist: RwLock::new(persist),
state_path: path, state_path: path,
conversations: RwLock::new(HashMap::new()),
} }
} }
@@ -89,9 +123,9 @@ fn session_date(refresh_hour: u32) -> NaiveDate {
} }
} }
fn session_uuid(chat_id: i64, refresh_hour: u32) -> String { fn session_uuid(prefix: &str, chat_id: i64, refresh_hour: u32) -> String {
let date = session_date(refresh_hour); let date = session_date(refresh_hour);
let name = format!("noc-{}-{}", chat_id, date.format("%Y%m%d")); let name = format!("{}-{}-{}", prefix, chat_id, date.format("%Y%m%d"));
Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()).to_string() Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()).to_string()
} }
@@ -300,7 +334,7 @@ async fn handle_inner(
return Ok(()); return Ok(());
} }
let sid = session_uuid(chat_id.0, config.session.refresh_hour); let sid = session_uuid(&config.name, chat_id.0, config.session.refresh_hour);
info!(%sid, "recv"); info!(%sid, "recv");
let out_dir = outgoing_dir(&sid); let out_dir = outgoing_dir(&sid);
@@ -309,20 +343,46 @@ async fn handle_inner(
let prompt = build_prompt(text, &uploaded, &download_errors, &out_dir); let prompt = build_prompt(text, &uploaded, &download_errors, &out_dir);
let known = state.persist.read().await.known_sessions.contains(&sid); match &config.backend {
BackendConfig::Claude => {
let result = invoke_claude_streaming(&sid, &prompt, known, bot, chat_id).await; let known = state.persist.read().await.known_sessions.contains(&sid);
let result = invoke_claude_streaming(&sid, &prompt, known, bot, chat_id).await;
match &result { match &result {
Ok(_) => { Ok(_) => {
if !known { if !known {
state.persist.write().await.known_sessions.insert(sid.clone()); state.persist.write().await.known_sessions.insert(sid.clone());
state.save().await; state.save().await;
}
}
Err(e) => {
error!(%sid, "claude: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
} }
} }
Err(e) => { BackendConfig::OpenAI {
error!(%sid, "claude: {e:#}"); endpoint,
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await; model,
api_key,
} => {
let mut messages = {
let convos = state.conversations.read().await;
convos.get(&sid).cloned().unwrap_or_default()
};
messages.push(serde_json::json!({"role": "user", "content": &prompt}));
match run_openai_streaming(endpoint, model, api_key, &messages, bot, chat_id).await {
Ok(response) => {
if !response.is_empty() {
messages
.push(serde_json::json!({"role": "assistant", "content": &response}));
}
state.conversations.write().await.insert(sid.clone(), messages);
}
Err(e) => {
error!(%sid, "openai: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
}
} }
} }
@@ -426,7 +486,7 @@ fn extract_tool_use(msg: &AssistantMessage) -> Option<String> {
None None
} }
const EDIT_INTERVAL_MS: u64 = 3000; const EDIT_INTERVAL_MS: u64 = 2000;
const DRAFT_INTERVAL_MS: u64 = 1000; const DRAFT_INTERVAL_MS: u64 = 1000;
const TG_MSG_LIMIT: usize = 4096; const TG_MSG_LIMIT: usize = 4096;
@@ -552,7 +612,12 @@ async fn run_claude_streaming(
(text.clone(), Some(text)) (text.clone(), Some(text))
}; };
let display = truncate_for_display(&display_raw); let display = if use_draft {
// draft mode: no cursor — cursor breaks monotonic text growth
truncate_at_char_boundary(&display_raw, TG_MSG_LIMIT).to_string()
} else {
truncate_for_display(&display_raw)
};
if use_draft { if use_draft {
match send_message_draft( match send_message_draft(
@@ -650,24 +715,134 @@ async fn run_claude_streaming(
return Ok(final_result); return Ok(final_result);
} }
// final result: send as real message(s) — draft auto-disappears send_final_result(bot, chat_id, msg_id, use_draft, &final_result).await;
let chunks: Vec<&str> = split_msg(&final_result, TG_MSG_LIMIT);
if !use_draft && msg_id.is_some() { Ok(final_result)
// edit mode: replace streaming message with final text }
let id = msg_id.unwrap();
let _ = bot.edit_message_text(chat_id, id, chunks[0]).await; // ── openai-compatible backend (streaming) ──────────────────────────
for chunk in &chunks[1..] {
let _ = bot.send_message(chat_id, *chunk).await; async fn run_openai_streaming(
endpoint: &str,
model: &str,
api_key: &str,
messages: &[serde_json::Value],
bot: &Bot,
chat_id: ChatId,
) -> Result<String> {
let client = reqwest::Client::new();
let url = format!("{}/chat/completions", endpoint.trim_end_matches('/'));
let body = serde_json::json!({
"model": model,
"messages": messages,
"stream": true,
});
let mut resp = client
.post(&url)
.header("Authorization", format!("Bearer {api_key}"))
.json(&body)
.send()
.await?
.error_for_status()?;
let token = bot.token().to_owned();
let raw_chat_id = chat_id.0;
let draft_id: i64 = 1;
let mut use_draft = true;
let mut msg_id: Option<teloxide::types::MessageId> = None;
let mut accumulated = String::new();
let mut last_edit = Instant::now();
let mut buffer = String::new();
let mut done = false;
while let Some(chunk) = resp.chunk().await? {
if done {
break;
} }
} else { buffer.push_str(&String::from_utf8_lossy(&chunk));
// draft mode or no existing message: sendMessage replaces the draft
for chunk in &chunks { while let Some(pos) = buffer.find('\n') {
let _ = bot.send_message(chat_id, *chunk).await; let line = buffer[..pos].to_string();
buffer = buffer[pos + 1..].to_string();
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with(':') {
continue;
}
let data = match trimmed.strip_prefix("data: ") {
Some(d) => d,
None => continue,
};
if data.trim() == "[DONE]" {
done = true;
break;
}
if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
if let Some(content) = json["choices"][0]["delta"]["content"].as_str() {
if content.is_empty() {
continue;
}
accumulated.push_str(content);
let interval = if use_draft {
DRAFT_INTERVAL_MS
} else {
EDIT_INTERVAL_MS
};
if last_edit.elapsed().as_millis() < interval as u128 {
continue;
}
let display = if use_draft {
truncate_at_char_boundary(&accumulated, TG_MSG_LIMIT).to_string()
} else {
truncate_for_display(&accumulated)
};
if use_draft {
match send_message_draft(
&client, &token, raw_chat_id, draft_id, &display,
)
.await
{
Ok(_) => {
last_edit = Instant::now();
}
Err(e) => {
warn!("sendMessageDraft failed, falling back: {e:#}");
use_draft = false;
if let Ok(sent) = bot.send_message(chat_id, &display).await {
msg_id = Some(sent.id);
last_edit = Instant::now();
}
}
}
} else if let Some(id) = msg_id {
if bot.edit_message_text(chat_id, id, &display).await.is_ok() {
last_edit = Instant::now();
}
} else if let Ok(sent) = bot.send_message(chat_id, &display).await {
msg_id = Some(sent.id);
last_edit = Instant::now();
}
}
}
} }
} }
Ok(final_result) if accumulated.is_empty() {
return Ok(accumulated);
}
send_final_result(bot, chat_id, msg_id, use_draft, &accumulated).await;
Ok(accumulated)
} }
const CURSOR: &str = " \u{25CE}"; const CURSOR: &str = " \u{25CE}";
@@ -693,6 +868,121 @@ fn truncate_at_char_boundary(s: &str, max: usize) -> &str {
&s[..end] &s[..end]
} }
fn escape_html(s: &str) -> String {
s.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
.replace('"', "&quot;")
}
fn markdown_to_telegram_html(md: &str) -> String {
use pulldown_cmark::{CodeBlockKind, Event, Options, Parser, Tag, TagEnd};
let mut opts = Options::empty();
opts.insert(Options::ENABLE_STRIKETHROUGH);
let parser = Parser::new_ext(md, opts);
let mut html = String::new();
for event in parser {
match event {
Event::Start(tag) => match tag {
Tag::Paragraph => {}
Tag::Heading { .. } => html.push_str("<b>"),
Tag::BlockQuote(_) => html.push_str("<blockquote>"),
Tag::CodeBlock(kind) => match kind {
CodeBlockKind::Fenced(ref lang) if !lang.is_empty() => {
html.push_str(&format!(
"<pre><code class=\"language-{}\">",
escape_html(lang.as_ref())
));
}
_ => html.push_str("<pre><code>"),
},
Tag::Item => html.push_str(""),
Tag::Emphasis => html.push_str("<i>"),
Tag::Strong => html.push_str("<b>"),
Tag::Strikethrough => html.push_str("<s>"),
Tag::Link { dest_url, .. } => {
html.push_str(&format!(
"<a href=\"{}\">",
escape_html(dest_url.as_ref())
));
}
_ => {}
},
Event::End(tag) => match tag {
TagEnd::Paragraph => html.push_str("\n\n"),
TagEnd::Heading(_) => html.push_str("</b>\n\n"),
TagEnd::BlockQuote(_) => html.push_str("</blockquote>"),
TagEnd::CodeBlock => html.push_str("</code></pre>\n\n"),
TagEnd::List(_) => html.push('\n'),
TagEnd::Item => html.push('\n'),
TagEnd::Emphasis => html.push_str("</i>"),
TagEnd::Strong => html.push_str("</b>"),
TagEnd::Strikethrough => html.push_str("</s>"),
TagEnd::Link => html.push_str("</a>"),
_ => {}
},
Event::Text(text) => html.push_str(&escape_html(text.as_ref())),
Event::Code(text) => {
html.push_str("<code>");
html.push_str(&escape_html(text.as_ref()));
html.push_str("</code>");
}
Event::SoftBreak | Event::HardBreak => html.push('\n'),
Event::Rule => html.push_str("\n---\n\n"),
_ => {}
}
}
html.trim_end().to_string()
}
/// Send final result with HTML formatting, fallback to plain text on failure.
async fn send_final_result(
bot: &Bot,
chat_id: ChatId,
msg_id: Option<teloxide::types::MessageId>,
use_draft: bool,
result: &str,
) {
use teloxide::types::ParseMode;
let html = markdown_to_telegram_html(result);
// try HTML as single message
let html_ok = if !use_draft && msg_id.is_some() {
bot.edit_message_text(chat_id, msg_id.unwrap(), &html)
.parse_mode(ParseMode::Html)
.await
.is_ok()
} else {
bot.send_message(chat_id, &html)
.parse_mode(ParseMode::Html)
.await
.is_ok()
};
if html_ok {
return;
}
// fallback: plain text with chunking
let chunks = split_msg(result, TG_MSG_LIMIT);
if !use_draft && msg_id.is_some() {
let id = msg_id.unwrap();
let _ = bot.edit_message_text(chat_id, id, chunks[0]).await;
for chunk in &chunks[1..] {
let _ = bot.send_message(chat_id, *chunk).await;
}
} else {
for chunk in &chunks {
let _ = bot.send_message(chat_id, *chunk).await;
}
}
}
fn split_msg(s: &str, max: usize) -> Vec<&str> { fn split_msg(s: &str, max: usize) -> Vec<&str> {
if s.len() <= max { if s.len() <= max {
return vec![s]; return vec![s];