add streaming responses, file transfer, remote deploy

- Streaming: use claude --output-format stream-json, edit TG message
  every 5s with progress, show tool use status during execution,
  ◎ cursor indicator while processing
- File transfer: download user uploads to ~/incoming/, scan
  ~/outgoing/{sid}/ for new files after claude completes
- Error handling: wrap post-auth logic in handle_inner, all errors
  reply to user instead of silently failing
- Remote deploy: make deploy-hera via SSH, generate service from
  template with dynamic PATH/REPO
- Service: binary installed to ~/bin/noc, WorkingDirectory=%h
- Invoke claude directly instead of ms wrapper
- Session state persisted to disk across restarts
This commit is contained in:
Fam Zheng
2026-04-05 08:20:32 +01:00
parent db8ff94f7c
commit 4d88e80f1c
5 changed files with 402 additions and 44 deletions

1
.gitignore vendored
View File

@@ -1,4 +1,5 @@
/target
config.yaml
config.hera.yaml
state.json
noc.service

View File

@@ -1,6 +1,8 @@
REPO := $(shell pwd)
HERA := heradev
HERA_DIR := noc
.PHONY: build deploy
.PHONY: build deploy deploy-hera
build:
cargo build --release
@@ -9,8 +11,23 @@ noc.service: noc.service.in
sed -e 's|@REPO@|$(REPO)|g' -e 's|@PATH@|$(PATH)|g' $< > $@
deploy: build noc.service
mkdir -p ~/.config/systemd/user
mkdir -p ~/bin ~/.config/systemd/user
systemctl --user stop noc 2>/dev/null || true
cp target/release/noc ~/bin/
cp noc.service ~/.config/systemd/user/
systemctl --user daemon-reload
systemctl --user enable --now noc
systemctl --user restart noc
deploy-hera: build
ssh $(HERA) 'mkdir -p ~/bin ~/$(HERA_DIR) ~/.config/systemd/user && systemctl --user stop noc 2>/dev/null || true'
scp target/release/noc $(HERA):~/bin/
scp config.hera.yaml noc.service.in $(HERA):~/$(HERA_DIR)/
ssh $(HERA) 'bash -lc "\
cd ~/$(HERA_DIR) \
&& mv -f config.hera.yaml config.yaml \
&& sed -e \"s|@REPO@|\$$HOME/$(HERA_DIR)|g\" -e \"s|@PATH@|\$$PATH|g\" noc.service.in > ~/.config/systemd/user/noc.service \
&& systemctl --user daemon-reload \
&& systemctl --user enable --now noc \
&& systemctl --user restart noc \
&& systemctl --user status noc"'

View File

@@ -1,8 +1,10 @@
# TODO
- [ ] Streaming responses — edit message as `ms` output arrives instead of waiting for full completion
- [ ] Markdown formatting — parse `ms` output and send with TG MarkdownV2
- [ ] Timeout handling — kill `ms` if it hangs beyond a threshold
- [ ] Streaming responses — edit message as claude output arrives instead of waiting for full completion
- [ ] Markdown formatting — parse claude output and send with TG MarkdownV2
- [ ] Timeout handling — kill claude if it hangs beyond a threshold
- [ ] Graceful shutdown on SIGTERM
- [ ] `/reset` command to force new session without waiting for 5am
- [ ] Rate limiting per chat
- [ ] Voice message support — STT (whisper.cpp) → text → claude
- [ ] Video/audio file transcription

View File

@@ -6,7 +6,7 @@ Wants=network-online.target
[Service]
Type=simple
WorkingDirectory=%h
ExecStart=@REPO@/target/release/noc
ExecStart=%h/bin/noc
Restart=on-failure
RestartSec=5
Environment=RUST_LOG=noc=info

View File

@@ -1,5 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
@@ -7,10 +7,13 @@ use anyhow::Result;
use chrono::{Local, NaiveDate, NaiveTime};
use serde::{Deserialize, Serialize};
use teloxide::dispatching::UpdateFilterExt;
use teloxide::net::Download;
use teloxide::prelude::*;
use teloxide::types::ChatAction;
use teloxide::types::InputFile;
use tokio::io::AsyncBufReadExt;
use tokio::process::Command;
use tokio::sync::RwLock;
use tokio::time::Instant;
use tracing::{error, info, warn};
use uuid::Uuid;
@@ -92,6 +95,18 @@ fn session_uuid(chat_id: i64, refresh_hour: u32) -> String {
Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()).to_string()
}
fn home_dir() -> PathBuf {
PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/tmp".into()))
}
fn incoming_dir() -> PathBuf {
home_dir().join("incoming")
}
fn outgoing_dir(sid: &str) -> PathBuf {
home_dir().join("outgoing").join(sid)
}
// ── main ────────────────────────────────────────────────────────────
#[tokio::main]
@@ -114,6 +129,8 @@ async fn main() {
.unwrap_or_else(|_| PathBuf::from("state.json"));
let state = Arc::new(AppState::load(state_path));
let _ = std::fs::create_dir_all(incoming_dir());
info!("noc bot starting");
let bot = Bot::new(&config.tg.key);
@@ -127,6 +144,50 @@ async fn main() {
.await;
}
// ── file download ───────────────────────────────────────────────────
async fn download_tg_file(bot: &Bot, file_id: &str, filename: &str) -> Result<PathBuf> {
let dir = incoming_dir();
tokio::fs::create_dir_all(&dir).await?;
let dest = dir.join(filename);
let tf = bot.get_file(file_id).await?;
let mut file = tokio::fs::File::create(&dest).await?;
bot.download_file(&tf.path, &mut file).await?;
info!("downloaded {} -> {}", filename, dest.display());
Ok(dest)
}
// ── outgoing scan ───────────────────────────────────────────────────
async fn snapshot_dir(dir: &Path) -> HashSet<PathBuf> {
let mut set = HashSet::new();
if let Ok(mut entries) = tokio::fs::read_dir(dir).await {
while let Ok(Some(entry)) = entries.next_entry().await {
let path = entry.path();
if path.is_file() {
set.insert(path);
}
}
}
set
}
async fn new_files_in(dir: &Path, before: &HashSet<PathBuf>) -> Vec<PathBuf> {
let mut files = Vec::new();
if let Ok(mut entries) = tokio::fs::read_dir(dir).await {
while let Ok(Some(entry)) = entries.next_entry().await {
let path = entry.path();
if path.is_file() && !before.contains(&path) {
files.push(path);
}
}
}
files.sort();
files
}
// ── handler ─────────────────────────────────────────────────────────
async fn handle(
@@ -135,16 +196,11 @@ async fn handle(
state: Arc<AppState>,
config: Arc<Config>,
) -> ResponseResult<()> {
let text = match msg.text() {
Some(t) => t,
None => return Ok(()),
};
let chat_id = msg.chat.id;
let text = msg.text().or(msg.caption()).unwrap_or("").to_string();
let raw_id = chat_id.0;
let date = session_date(config.session.refresh_hour);
// auth gate
let is_authed = {
let p = state.persist.read().await;
p.authed.get(&raw_id) == Some(&date)
@@ -165,76 +221,358 @@ async fn handle(
return Ok(());
}
let sid = session_uuid(raw_id, config.session.refresh_hour);
if let Err(e) = handle_inner(&bot, &msg, chat_id, &text, &state, &config).await {
error!(chat = raw_id, "handle: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
Ok(())
}
async fn handle_inner(
bot: &Bot,
msg: &Message,
chat_id: ChatId,
text: &str,
state: &Arc<AppState>,
config: &Arc<Config>,
) -> Result<()> {
let mut uploaded: Vec<PathBuf> = Vec::new();
let mut download_errors: Vec<String> = Vec::new();
if let Some(doc) = msg.document() {
let name = doc.file_name.as_deref().unwrap_or("file");
match download_tg_file(bot, &doc.file.id, name).await {
Ok(p) => uploaded.push(p),
Err(e) => download_errors.push(format!("{name}: {e:#}")),
}
}
if let Some(photos) = msg.photo() {
if let Some(photo) = photos.last() {
let name = format!("photo_{}.jpg", Local::now().format("%H%M%S"));
match download_tg_file(bot, &photo.file.id, &name).await {
Ok(p) => uploaded.push(p),
Err(e) => download_errors.push(format!("photo: {e:#}")),
}
}
}
if text.is_empty() && uploaded.is_empty() {
if !download_errors.is_empty() {
let err_msg = format!("[文件下载失败]\n{}", download_errors.join("\n"));
bot.send_message(chat_id, err_msg).await?;
}
return Ok(());
}
let sid = session_uuid(chat_id.0, config.session.refresh_hour);
info!(%sid, "recv");
let _ = bot.send_chat_action(chat_id, ChatAction::Typing).await;
let out_dir = outgoing_dir(&sid);
tokio::fs::create_dir_all(&out_dir).await?;
let before = snapshot_dir(&out_dir).await;
let prompt = build_prompt(text, &uploaded, &download_errors, &out_dir);
let known = state.persist.read().await.known_sessions.contains(&sid);
let reply = match invoke_claude(&sid, text, known).await {
Ok(out) => {
let result = invoke_claude_streaming(&sid, &prompt, known, bot, chat_id).await;
match &result {
Ok(_) => {
if !known {
state.persist.write().await.known_sessions.insert(sid.clone());
state.save().await;
}
out
}
Err(e) => {
error!(%sid, "claude: {e:#}");
format!("[error] {e:#}")
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
};
if reply.is_empty() {
return Ok(());
}
for chunk in split_msg(&reply, 4096) {
bot.send_message(chat_id, chunk).await?;
// send new files from outgoing dir
let new_files = new_files_in(&out_dir, &before).await;
for path in &new_files {
info!(%sid, "sending file: {}", path.display());
if let Err(e) = bot.send_document(chat_id, InputFile::file(path)).await {
error!(%sid, "send_document {}: {e:#}", path.display());
let _ = bot
.send_message(chat_id, format!("[发送文件失败: {}]", path.display()))
.await;
}
}
Ok(())
}
// ── claude bridge ───────────────────────────────────────────────────
fn build_prompt(text: &str, uploaded: &[PathBuf], errors: &[String], out_dir: &Path) -> String {
let mut parts = Vec::new();
async fn invoke_claude(sid: &str, prompt: &str, known: bool) -> Result<String> {
if known {
return run_claude(&["--resume", sid], prompt).await;
for f in uploaded {
parts.push(format!("[用户上传了文件: {}]", f.display()));
}
// session might exist from before restart — try resume first
match run_claude(&["--resume", sid], prompt).await {
for e in errors {
parts.push(format!("[文件下载失败: {e}]"));
}
if !text.is_empty() {
parts.push(text.to_string());
}
parts.push(format!(
"\n[系统提示: 如果需要发送文件给用户,将文件写入 {} 目录]",
out_dir.display()
));
parts.join("\n")
}
// ── claude bridge (streaming) ───────────────────────────────────────
/// Stream JSON event types we care about.
#[derive(Deserialize)]
struct StreamEvent {
#[serde(rename = "type")]
event_type: String,
message: Option<AssistantMessage>,
result: Option<String>,
#[serde(default)]
is_error: bool,
}
#[derive(Deserialize)]
struct AssistantMessage {
content: Vec<ContentBlock>,
}
#[derive(Deserialize)]
struct ContentBlock {
#[serde(rename = "type")]
block_type: String,
text: Option<String>,
name: Option<String>,
input: Option<serde_json::Value>,
}
/// Extract all text from an assistant message's content blocks.
fn extract_text(msg: &AssistantMessage) -> String {
msg.content
.iter()
.filter(|b| b.block_type == "text")
.filter_map(|b| b.text.as_deref())
.collect::<Vec<_>>()
.join("")
}
/// Extract tool use status line, e.g. "Bash: echo hello"
fn extract_tool_use(msg: &AssistantMessage) -> Option<String> {
for block in &msg.content {
if block.block_type == "tool_use" {
let name = block.name.as_deref().unwrap_or("tool");
let detail = block
.input
.as_ref()
.and_then(|v| {
// try common fields: command, pattern, file_path, query
v.get("command")
.or(v.get("pattern"))
.or(v.get("file_path"))
.or(v.get("query"))
.or(v.get("prompt"))
.and_then(|s| s.as_str())
})
.unwrap_or("");
let detail_short = truncate_at_char_boundary(detail, 80);
return Some(format!("{name}: {detail_short}"));
}
}
None
}
const EDIT_INTERVAL_MS: u64 = 5000;
const TG_MSG_LIMIT: usize = 4096;
async fn invoke_claude_streaming(
sid: &str,
prompt: &str,
known: bool,
bot: &Bot,
chat_id: ChatId,
) -> Result<String> {
if known {
return run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await;
}
match run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await {
Ok(out) => {
info!(%sid, "resumed existing session");
Ok(out)
}
Err(e) => {
warn!(%sid, "resume failed ({e:#}), creating new session");
run_claude(&["--session-id", sid], prompt).await
run_claude_streaming(&["--session-id", sid], prompt, bot, chat_id).await
}
}
}
async fn run_claude(extra_args: &[&str], prompt: &str) -> Result<String> {
let mut args: Vec<&str> = vec!["--dangerously-skip-permissions", "-p"];
async fn run_claude_streaming(
extra_args: &[&str],
prompt: &str,
bot: &Bot,
chat_id: ChatId,
) -> Result<String> {
let mut args: Vec<&str> = vec![
"--dangerously-skip-permissions",
"-p",
"--output-format",
"stream-json",
"--verbose",
];
args.extend(extra_args);
args.push(prompt);
let out = Command::new("claude")
let mut child = Command::new("claude")
.args(&args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?
.wait_with_output()
.await?;
.spawn()?;
if out.status.success() {
Ok(String::from_utf8_lossy(&out.stdout).to_string())
} else {
let err = String::from_utf8_lossy(&out.stderr);
anyhow::bail!("exit {}: {err}", out.status)
let stdout = child.stdout.take().unwrap();
let mut lines = tokio::io::BufReader::new(stdout).lines();
// send placeholder immediately so user knows we're on it
let mut msg_id: Option<teloxide::types::MessageId> = match bot.send_message(chat_id, CURSOR).await {
Ok(sent) => Some(sent.id),
Err(_) => None,
};
let mut last_sent_text = String::new();
let mut last_edit = Instant::now();
let mut final_result = String::new();
let mut is_error = false;
let mut tool_status = String::new(); // current tool use status line
while let Ok(Some(line)) = lines.next_line().await {
let event: StreamEvent = match serde_json::from_str(&line) {
Ok(e) => e,
Err(_) => continue,
};
match event.event_type.as_str() {
"assistant" => {
if let Some(msg) = &event.message {
// check for tool use — show status
if let Some(status) = extract_tool_use(msg) {
tool_status = format!("[{status}]");
let display = if last_sent_text.is_empty() {
tool_status.clone()
} else {
format!("{last_sent_text}\n\n{tool_status}")
};
let display = truncate_for_display(&display);
if let Some(id) = msg_id {
let _ = bot.edit_message_text(chat_id, id, &display).await;
} else if let Ok(sent) = bot.send_message(chat_id, &display).await {
msg_id = Some(sent.id);
}
last_edit = Instant::now();
continue;
}
// check for text content
let text = extract_text(msg);
if text.is_empty() || text == last_sent_text {
continue;
}
// throttle edits
if last_edit.elapsed().as_millis() < EDIT_INTERVAL_MS as u128 {
continue;
}
tool_status.clear();
let display = truncate_for_display(&text);
if let Some(id) = msg_id {
if bot.edit_message_text(chat_id, id, &display).await.is_ok() {
last_sent_text = text;
last_edit = Instant::now();
}
} else if let Ok(sent) = bot.send_message(chat_id, &display).await {
msg_id = Some(sent.id);
last_sent_text = text;
last_edit = Instant::now();
}
}
}
"result" => {
final_result = event.result.unwrap_or_default();
is_error = event.is_error;
}
_ => {}
}
}
let _ = child.wait().await;
if is_error {
// clean up streaming message if we sent one
if let Some(id) = msg_id {
let _ = bot
.edit_message_text(chat_id, id, format!("[error] {final_result}"))
.await;
}
anyhow::bail!("{final_result}");
}
if final_result.is_empty() {
return Ok(final_result);
}
// final update: replace streaming message with complete result
let chunks: Vec<&str> = split_msg(&final_result, TG_MSG_LIMIT);
if let Some(id) = msg_id {
// edit first message with final text
let _ = bot.edit_message_text(chat_id, id, chunks[0]).await;
// send remaining chunks as new messages
for chunk in &chunks[1..] {
let _ = bot.send_message(chat_id, *chunk).await;
}
} else {
// never got to send a streaming message, send all now
for chunk in &chunks {
let _ = bot.send_message(chat_id, *chunk).await;
}
}
Ok(final_result)
}
const CURSOR: &str = " \u{25CE}";
fn truncate_for_display(s: &str) -> String {
let budget = TG_MSG_LIMIT - CURSOR.len() - 1;
if s.len() <= budget {
format!("{s}{CURSOR}")
} else {
let truncated = truncate_at_char_boundary(s, budget - 2);
format!("{truncated}\n{CURSOR}")
}
}
fn truncate_at_char_boundary(s: &str, max: usize) -> &str {
if s.len() <= max {
return s;
}
let mut end = max;
while !s.is_char_boundary(end) {
end -= 1;
}
&s[..end]
}
fn split_msg(s: &str, max: usize) -> Vec<&str> {