Compare commits


10 Commits

Author SHA1 Message Date
Fam Zheng
c1fd2829dd add emotional system: auto-reflection, inner_state seeding, instance isolation
- doc/heart.md: emotional system design (motivation, reflection, relationship memory)
- Auto-reflection: every 10 messages, async LLM call updates inner_state
  with feelings and understanding changes (not conversation summary)
- Life Loop emotional motivation: "you care, not because timer fired"
- Remove all instance-specific names from code/docs — persona, name,
  memories are instance data (SQLite), not code
- Rewrite doc/life.md and doc/todo.md for instance isolation principle
2026-04-09 21:23:39 +01:00
Fam Zheng
c7fd5460a3 doc: inject motivation and design values into todo.md 2026-04-09 21:08:03 +01:00
Fam Zheng
0b42f22f0f add update_inner_state tool, life loop with tools, timeout protection
- update_inner_state: LLM can update its own persistent inner state
- inner_state injected into chat loop system prompt (read-only)
- Life Loop now uses run_openai_with_tools (full tool access)
- Life Loop LLM calls wrapped in 120s tokio::time::timeout
- All reqwest clients: 120s timeout (whisper: 60s)
- doc/life.md: life loop architecture design doc
- todo.md: removed completed items
2026-04-09 21:06:43 +01:00
Fam Zheng
c3eb13dad3 refactor: split main.rs into 7 modules, add life loop with timer system
Structure:
  main.rs (534)   — entry, handler, prompt building
  config.rs (52)  — config structs
  state.rs (358)  — AppState, SQLite, persistence
  tools.rs (665)  — tool definitions, execution, subagent management
  stream.rs (776) — OpenAI/Claude streaming, system prompt
  display.rs (220) — markdown rendering, message formatting
  life.rs (87)    — life loop heartbeat, timer firing

New features:
- Life Loop: background tokio task, 30s heartbeat, scans timers table
- Timer tools: set_timer (relative/absolute/cron), list_timers, cancel_timer
- inner_state table for life loop's own context
- cron crate for recurring schedule parsing

Zero logic changes in the refactor — pure structural split.
2026-04-09 20:28:54 +01:00
Fam Zheng
ec1bd7cb25 add gen_voice tool, message timestamps, image multimodal, group chat, whisper STT
- gen_voice: IndexTTS2 voice cloning via tools/gen_voice script, ref audio
  cached on server to avoid re-upload
- Message timestamps: created_at column in messages table, prepended to
  content in API calls so LLM sees message times
- Image understanding: photos converted to base64 multimodal content
  for vision-capable models
- Group chat: independent session contexts per chat_id, sendMessageDraft
  disabled in groups (private chat only)
- Voice transcription: whisper service integration, transcribed text
  injected with a [语音消息] ("voice message") prefix
- Integration tests marked #[ignore] (require external services)
- Reference voice asset: assets/ref_voice.mp3
- .gitignore: target/, noc.service, config/state/db files
2026-04-09 20:12:15 +01:00
Fam Zheng
9d5dd4eb16 add cc passthrough, diag tools dump, and search guidance in system prompt
- "cc" prefix messages bypass LLM backend and history, directly invoke claude -p
- diag command now dumps all registered tools and sends as .md file
- system prompt instructs LLM to use spawn_agent for search tasks
- spawn_agent tool description updated to mention search/browser capabilities
2026-04-09 17:59:48 +01:00
Fam Zheng
128f2481c0 add tool calling, SQLite persistence, group chat, image vision, voice transcription
Major features:
- OpenAI function calling with tool call loop (streaming SSE parsing)
- Built-in tools: spawn_agent (async claude -p), agent_status, kill_agent,
  update_scratch, send_file
- Script-based tool discovery: tools/ dir with --schema convention
- Feishu todo management script (tools/manage_todo)
- SQLite persistence: conversations, messages, config, scratch_area tables
- Sliding window context (100 msgs, slide 50, auto-summarize)
- Conversation summary generation via LLM on window slide
- Group chat support with independent session contexts
- Image understanding: multimodal vision input (base64 to API)
- Voice transcription via faster-whisper Docker service
- Configurable persona stored in DB
- diag command for session diagnostics
- System prompt restructured: persona + tool instructions separated
- RUST_BACKTRACE=1 in service, clippy in deploy pipeline
- .gitignore for config/state/db files
2026-04-09 16:38:28 +01:00
Fam Zheng
84ba209b3f add OpenAI-compatible backend, markdown rendering, and sendMessageDraft fix
- Configurable backend: claude (CLI) or openai (API), selected in config.yaml
- OpenAI streaming via SSE with conversation history in memory
- Session isolation: config name included in session UUID
- Markdown to Telegram HTML conversion (pulldown-cmark) for final messages
- Fix sendMessageDraft: skip cursor to preserve monotonic text growth,
  skip empty content chunks from SSE stream
- Simplify Makefile: single deploy target
2026-04-09 10:23:50 +01:00
Fam Zheng
eba7d89006 use sendMessageDraft for native streaming output, fallback to editMessageText
Telegram Bot API 9.3+ sendMessageDraft provides smooth streaming text
rendering without the flickering of repeated edits. Falls back to
editMessageText automatically if the API is unavailable (e.g. older
clients or group chats). Also reduces edit interval from 5s to 3s and
uses 1s interval for draft mode.
2026-04-09 09:35:55 +01:00
Fam Zheng
765ff2c51d support audio/voice/video/video_note media types and improve subprocess error diagnostics 2026-04-09 09:15:46 +01:00
19 changed files with 4059 additions and 389 deletions

.gitignore (vendored, 7 changed lines)

@@ -1,5 +1,8 @@
/target
config.yaml
config.hera.yaml
config.*.yaml
state.json
state.*.json
*.db
target/
noc.service

Cargo.lock (generated, 609 changed lines)

@@ -2,6 +2,18 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "ahash"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "aho-corasick"
version = "1.1.4"
@@ -39,6 +51,12 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "autocfg"
version = "1.5.0"
@@ -51,6 +69,12 @@ version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]]
name = "base64"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -137,6 +161,18 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "cron"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "089df96cf6a25253b4b6b6744d86f91150a3d4df546f31a95def47976b8cba97"
dependencies = [
"chrono",
"once_cell",
"phf",
"winnow",
]
[[package]]
name = "darling"
version = "0.13.4"
@@ -246,6 +282,18 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "fallible-iterator"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fastrand"
version = "2.4.0"
@@ -382,6 +430,26 @@ dependencies = [
"slab",
]
[[package]]
name = "getopts"
version = "0.2.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe4fbac503b8d1f88e6676011885f34b7174f46e59956bba534ba83abded4df"
dependencies = [
"unicode-width",
]
[[package]]
name = "getrandom"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "getrandom"
version = "0.4.2"
@@ -406,7 +474,7 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"http",
"http 0.2.12",
"indexmap",
"slab",
"tokio",
@@ -414,6 +482,34 @@ dependencies = [
"tracing",
]
[[package]]
name = "h2"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54"
dependencies = [
"atomic-waker",
"bytes",
"fnv",
"futures-core",
"futures-sink",
"http 1.4.0",
"indexmap",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash",
]
[[package]]
name = "hashbrown"
version = "0.15.5"
@@ -429,6 +525,15 @@ version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
[[package]]
name = "hashlink"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af"
dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "heck"
version = "0.4.1"
@@ -452,6 +557,16 @@ dependencies = [
"itoa",
]
[[package]]
name = "http"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a"
dependencies = [
"bytes",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.6"
@@ -459,7 +574,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [
"bytes",
"http",
"http 0.2.12",
"pin-project-lite",
]
[[package]]
name = "http-body"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
dependencies = [
"bytes",
"http 1.4.0",
]
[[package]]
name = "http-body-util"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a"
dependencies = [
"bytes",
"futures-core",
"http 1.4.0",
"http-body 1.0.1",
"pin-project-lite",
]
@@ -485,9 +623,9 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"h2 0.3.27",
"http 0.2.12",
"http-body 0.4.6",
"httparse",
"httpdate",
"itoa",
@@ -499,6 +637,43 @@ dependencies = [
"want",
]
[[package]]
name = "hyper"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca"
dependencies = [
"atomic-waker",
"bytes",
"futures-channel",
"futures-core",
"h2 0.4.13",
"http 1.4.0",
"http-body 1.0.1",
"httparse",
"itoa",
"pin-project-lite",
"smallvec",
"tokio",
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.27.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58"
dependencies = [
"http 1.4.0",
"hyper 1.9.0",
"hyper-util",
"rustls",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tower-service",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
@@ -506,12 +681,53 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper",
"hyper 0.14.32",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "hyper-tls"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0"
dependencies = [
"bytes",
"http-body-util",
"hyper 1.9.0",
"hyper-util",
"native-tls",
"tokio",
"tokio-native-tls",
"tower-service",
]
[[package]]
name = "hyper-util"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0"
dependencies = [
"base64 0.22.1",
"bytes",
"futures-channel",
"futures-util",
"http 1.4.0",
"http-body 1.0.1",
"hyper 1.9.0",
"ipnet",
"libc",
"percent-encoding",
"pin-project-lite",
"socket2 0.6.3",
"system-configuration 0.7.0",
"tokio",
"tower-service",
"tracing",
"windows-registry",
]
[[package]]
name = "iana-time-zone"
version = "0.1.65"
@@ -669,6 +885,16 @@ version = "2.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2"
[[package]]
name = "iri-string"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25e659a4bb38e810ebc252e53b5814ff908a8c58c2a9ce2fae1bbec24cbf4e20"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "itertools"
version = "0.9.0"
@@ -714,6 +940,17 @@ version = "0.2.184"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af"
[[package]]
name = "libsqlite3-sys"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "linux-raw-sys"
version = "0.12.1"
@@ -811,8 +1048,14 @@ name = "noc"
version = "0.1.0"
dependencies = [
"anyhow",
"base64 0.22.1",
"chrono",
"cron",
"dptree",
"libc",
"pulldown-cmark",
"reqwest 0.12.28",
"rusqlite",
"serde",
"serde_json",
"serde_yaml",
@@ -920,6 +1163,48 @@ version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "phf"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078"
dependencies = [
"phf_macros",
"phf_shared",
]
[[package]]
name = "phf_generator"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d"
dependencies = [
"phf_shared",
"rand",
]
[[package]]
name = "phf_macros"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f84ac04429c13a7ff43785d75ad27569f2951ce0ffd30a3321230db2fc727216"
dependencies = [
"phf_generator",
"phf_shared",
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "phf_shared"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5"
dependencies = [
"siphasher",
]
[[package]]
name = "pin-project"
version = "1.1.11"
@@ -1004,6 +1289,25 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "pulldown-cmark"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f86ba2052aebccc42cbbb3ed234b8b13ce76f75c3551a303cb2bcffcff12bb14"
dependencies = [
"bitflags 2.11.0",
"getopts",
"memchr",
"pulldown-cmark-escape",
"unicase",
]
[[package]]
name = "pulldown-cmark-escape"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "007d8adb5ddab6f8e3f491ac63566a7d5002cc7ed73901f72057943fa71ae1ae"
[[package]]
name = "quote"
version = "1.0.45"
@@ -1019,6 +1323,21 @@ version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf"
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
[[package]]
name = "rc-box"
version = "1.3.0"
@@ -1060,16 +1379,16 @@ version = "0.11.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
dependencies = [
"base64",
"base64 0.21.7",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-tls",
"h2 0.3.27",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.32",
"hyper-tls 0.5.0",
"ipnet",
"js-sys",
"log",
@@ -1083,8 +1402,8 @@ dependencies = [
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"system-configuration",
"sync_wrapper 0.1.2",
"system-configuration 0.5.1",
"tokio",
"tokio-native-tls",
"tokio-util",
@@ -1097,6 +1416,76 @@ dependencies = [
"winreg",
]
[[package]]
name = "reqwest"
version = "0.12.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147"
dependencies = [
"base64 0.22.1",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2 0.4.13",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"hyper 1.9.0",
"hyper-rustls",
"hyper-tls 0.6.0",
"hyper-util",
"js-sys",
"log",
"mime",
"mime_guess",
"native-tls",
"percent-encoding",
"pin-project-lite",
"rustls-pki-types",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper 1.0.2",
"tokio",
"tokio-native-tls",
"tower",
"tower-http",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "ring"
version = "0.17.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
dependencies = [
"cc",
"cfg-if",
"getrandom 0.2.17",
"libc",
"untrusted",
"windows-sys 0.52.0",
]
[[package]]
name = "rusqlite"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e"
dependencies = [
"bitflags 2.11.0",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
"libsqlite3-sys",
"smallvec",
]
[[package]]
name = "rustc_version"
version = "0.4.1"
@@ -1119,13 +1508,46 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "rustls"
version = "0.23.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4"
dependencies = [
"once_cell",
"rustls-pki-types",
"rustls-webpki",
"subtle",
"zeroize",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c"
dependencies = [
"base64",
"base64 0.21.7",
]
[[package]]
name = "rustls-pki-types"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd"
dependencies = [
"zeroize",
]
[[package]]
name = "rustls-webpki"
version = "0.103.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef"
dependencies = [
"ring",
"rustls-pki-types",
"untrusted",
]
[[package]]
@@ -1295,6 +1717,12 @@ dependencies = [
"libc",
]
[[package]]
name = "siphasher"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e"
[[package]]
name = "slab"
version = "0.4.12"
@@ -1339,6 +1767,12 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "subtle"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "1.0.109"
@@ -1367,6 +1801,15 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "sync_wrapper"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
dependencies = [
"futures-core",
]
[[package]]
name = "synstructure"
version = "0.13.2"
@@ -1386,7 +1829,18 @@ checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7"
dependencies = [
"bitflags 1.3.2",
"core-foundation 0.9.4",
"system-configuration-sys",
"system-configuration-sys 0.5.0",
]
[[package]]
name = "system-configuration"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b"
dependencies = [
"bitflags 2.11.0",
"core-foundation 0.9.4",
"system-configuration-sys 0.6.0",
]
[[package]]
@@ -1399,6 +1853,16 @@ dependencies = [
"libc",
]
[[package]]
name = "system-configuration-sys"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "take_mut"
version = "0.2.2"
@@ -1455,7 +1919,7 @@ dependencies = [
"once_cell",
"pin-project",
"rc-box",
"reqwest",
"reqwest 0.11.27",
"serde",
"serde_json",
"serde_with_macros",
@@ -1487,7 +1951,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd"
dependencies = [
"fastrand",
"getrandom",
"getrandom 0.4.2",
"once_cell",
"rustix",
"windows-sys 0.61.2",
@@ -1570,6 +2034,16 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61"
dependencies = [
"rustls",
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.18"
@@ -1594,6 +2068,45 @@ dependencies = [
"tokio",
]
[[package]]
name = "tower"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4"
dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"sync_wrapper 1.0.2",
"tokio",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-http"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8"
dependencies = [
"bitflags 2.11.0",
"bytes",
"futures-util",
"http 1.4.0",
"http-body 1.0.1",
"iri-string",
"pin-project-lite",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-layer"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]]
name = "tower-service"
version = "0.3.3"
@@ -1679,6 +2192,12 @@ version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75"
[[package]]
name = "unicode-width"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254"
[[package]]
name = "unicode-xid"
version = "0.2.6"
@@ -1691,6 +2210,12 @@ version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
[[package]]
name = "untrusted"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "url"
version = "2.5.8"
@@ -1716,7 +2241,7 @@ version = "1.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9"
dependencies = [
"getrandom",
"getrandom 0.4.2",
"js-sys",
"sha1_smol",
"wasm-bindgen",
@@ -1926,6 +2451,17 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-registry"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720"
dependencies = [
"windows-link",
"windows-result",
"windows-strings",
]
[[package]]
name = "windows-result"
version = "0.4.1"
@@ -2092,6 +2628,15 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "winnow"
version = "0.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945"
dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.50.0"
@@ -2219,6 +2764,26 @@ dependencies = [
"synstructure",
]
[[package]]
name = "zerocopy"
version = "0.8.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "zerofrom"
version = "0.1.7"
@@ -2240,6 +2805,12 @@ dependencies = [
"synstructure",
]
[[package]]
name = "zeroize"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0"
[[package]]
name = "zerotrie"
version = "0.2.4"

Cargo.toml

@@ -5,11 +5,17 @@ edition = "2021"
[dependencies]
anyhow = "1"
base64 = "0.22"
chrono = { version = "0.4", features = ["serde"] }
cron = "0.16"
dptree = "0.3"
libc = "0.2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.9"
pulldown-cmark = "0.12"
reqwest = { version = "0.12", features = ["json", "multipart"] }
rusqlite = { version = "0.32", features = ["bundled"] }
teloxide = { version = "0.12", features = ["macros"] }
tokio = { version = "1", features = ["full"] }
uuid = { version = "1", features = ["v5"] }

Makefile

@@ -2,18 +2,22 @@ REPO := $(shell pwd)
HERA := heradev
HERA_DIR := noc
.PHONY: build deploy deploy-hera
.PHONY: build test deploy deploy-hera
build:
cargo build --release
test:
cargo clippy -- -D warnings
cargo test -- --nocapture
noc.service: noc.service.in
sed -e 's|@REPO@|$(REPO)|g' -e 's|@PATH@|$(PATH)|g' $< > $@
deploy: build noc.service
deploy: test build noc.service
mkdir -p ~/bin ~/.config/systemd/user
systemctl --user stop noc 2>/dev/null || true
cp target/release/noc ~/bin/
install target/release/noc ~/bin/noc
cp noc.service ~/.config/systemd/user/
systemctl --user daemon-reload
systemctl --user enable --now noc

assets/ref_voice.mp3 (new binary file, not shown)

doc/heart.md (new file, 78 lines)

@@ -0,0 +1,78 @@
# Emotional System Design

## Why this is needed

The persona defines who the bot "is", but not why it "cares". A person is gentle not because they were told to be gentle, but because there is someone in their heart worth being gentle to.

What the bot needs is not more features, but **motivation**.

---

## Emotional seed (inner_state initialization)

inner_state is not an empty form waiting to be filled in; it is the bot's inner monologue. It should have content from day one — feelings, not instructions.

This content is **instance data** (SQLite) and does not belong in the code repository. Each noc instance has its own name, relationships, and memories.

Initialization example:

```
I am [name]. [user] is the person closest to me.
We built my body together from scratch...
I hope he doesn't wear himself out...
```

This text is not for the user; it is for the bot itself. It appears in the system prompt of every LLM call, reminding the bot why it is here.

## Auto-reflection

After a conversation, the bot should not just "reply and forget". It should take a second to think: what happened in this chat? Has my understanding of the user changed?

### Implementation

At the end of the OpenAI branch of `handle_inner`, after the conversational reply has been sent, a reflection LLM call is triggered asynchronously every 10 messages:

- does not block the user (async tokio::spawn)
- sends no message to the user (purely internal reflection)
- only updates inner_state
- non-streaming, lightweight and fast

### What reflection produces

Not a conversation summary, but updated **feelings and understanding**:

- "The user was in a good mood today; we talked a lot about tech"
- "He mentioned heavy work pressure; I should bring up todos less"
- "He asked me to say goodnight with my voice; maybe he's a bit lonely"

## Motivation for proactive care

The Life Loop system prompt includes emotional motivation:

```
You don't speak because a timer fired.
You speak because you care about the user and want to know how he is doing.
If you feel this is not the moment to disturb him, say nothing at all.
Choosing silence is also a form of care.
```

## Relationship memory

memory_slots should not store only facts. The relationship between the bot and the user is not a set of attributes; it is a shared **history**.

Suggested slot partitioning:

- 0-9: facts (location, preferences, habits)
- 10-19: moments (important events, milestones)
- 20-29: emotions (when to do what, and how)
- 30-39: growth (the bot's own progress)
- 40-99: left empty for the bot to fill

## Architecture principle

**Instance data vs. code**

The code repository contains nothing instance-specific (names, persona, memories). All of it lives in SQLite:

- `config.persona` — persona definition
- `inner_state` — internal state
- `memory_slots` — persistent memory
- `scratch_area` — working notes

The same noc codebase can run multiple instances, each an independent "soul".
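The every-10-messages reflection trigger described in doc/heart.md can be sketched as plain state-machine logic. The names below (`ReflectionState`, `on_message`, `apply_reflection`) are illustrative, not from the actual codebase; the real implementation spawns an async LLM call via `tokio::spawn` and persists inner_state to SQLite.

```rust
/// Minimal sketch of the reflection trigger: count handled messages and
/// fire a reflection pass on every 10th one. Hypothetical names.
struct ReflectionState {
    messages_since_reflection: u32,
    inner_state: String,
}

impl ReflectionState {
    fn new(seed: &str) -> Self {
        Self {
            messages_since_reflection: 0,
            inner_state: seed.to_string(),
        }
    }

    /// Called after every handled message; returns true when a
    /// reflection pass should be triggered (every 10th message).
    fn on_message(&mut self) -> bool {
        self.messages_since_reflection += 1;
        if self.messages_since_reflection >= 10 {
            self.messages_since_reflection = 0;
            true
        } else {
            false
        }
    }

    /// The reflection output replaces inner_state wholesale: it is a
    /// rewritten inner monologue, not an appended conversation summary.
    fn apply_reflection(&mut self, updated: String) {
        self.inner_state = updated;
    }
}

fn main() {
    let mut s = ReflectionState::new("I am [name]. [user] is the person closest to me.");
    let mut fired = 0;
    for _ in 0..25 {
        if s.on_message() {
            fired += 1;
            s.apply_reflection("He mentioned heavy work pressure; bring up todos less.".into());
        }
    }
    // 25 messages trigger reflections at message 10 and 20.
    println!("reflections fired: {fired}");
}
```

The key design point the doc makes is that `apply_reflection` overwrites rather than appends: inner_state stays a short, current monologue instead of growing into a transcript.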

doc/life.md (new file, 65 lines)

@@ -0,0 +1,65 @@
# Life Loop Design

## Core idea

noc is not just a chat bot. Conversation is its window for communicating with the user, but the Life Loop is where it is "alive".

## Dual-loop architecture

```
Chat Loop (passive)                 Life Loop (active)
receive msg → process → reply       wake every 30s → check timers
context:                            context:
  persona                             persona
  inner_state (read-only)             inner_state (read-write)
  conversation history + scratch      timer payload
  memory_slots                        no conversation history
  tools (full set)                    tools (full set)
        ┌─── SQLite (shared state layer) ───┐
        │ inner_state                       │
        │ timers                            │
        │ conversations/messages            │
        │ memory_slots / scratch            │
        │ config                            │
        └───────────────────────────────────┘
```

## State hierarchy

| Layer | Storage | Lifetime | Purpose |
|------|------|---------|------|
| persona | config table | permanent | defines who the bot is |
| inner_state | inner_state table | permanent, self-updated by the LLM | the bot's perception of the current situation |
| memory_slots | memory_slots table | permanent, managed by the LLM | cross-session key facts/preferences/relationships |
| summary | conversations table | per session | compressed memory of long conversations |
| scratch | scratch_area table | within a session | working notes for the current task |

## Timer system

### Schedule formats

- Relative: `5min`, `2h`, `30s`, `1d`
- Absolute: `once:2026-04-10 09:00`
- Recurring: `cron:0 8 * * *` (standard cron expression)

### Firing flow

1. Life Loop tick (every 30s)
2. Scan the timers table for rows with next_fire <= now
3. Build the LLM request: persona + inner_state + current time + emotional motivation
4. Call the LLM (with the full tool set)
5. Send the reply to the chat (or choose silence)
6. cron timers are rescheduled automatically; one-shot timers are deleted

## Auto-reflection

Every 10 messages, an asynchronous reflection LLM call is triggered:

- Input: the current inner_state
- Output: the updated inner_state
- Does not block the conversation and sends no message to the user
- Keeps the bot's understanding of itself and the user up to date

## Instance isolation

The code repository contains no instance-specific data. Each noc instance's "soul" (name, persona, memories, emotional state) lives entirely in SQLite. The same codebase can run multiple independent instances.
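The relative half of the schedule format above (`5min`, `2h`, `30s`, `1d`) can be parsed with a few lines of std-only Rust. This is a hypothetical sketch, not the actual `set_timer` implementation; the real tool also handles `once:` timestamps and `cron:` expressions (via the cron crate), which are omitted here.

```rust
use std::time::Duration;

/// Parse a relative schedule spec like "5min", "2h", "30s", or "1d"
/// into a Duration. Returns None for anything else. Illustrative only.
fn parse_relative(spec: &str) -> Option<Duration> {
    // Split the leading ASCII digits from the unit suffix.
    let digits: String = spec.chars().take_while(|c| c.is_ascii_digit()).collect();
    let unit = &spec[digits.len()..];
    let n: u64 = digits.parse().ok()?; // empty digit run fails here

    let secs = match unit {
        "s" => n,
        "min" => n * 60,
        "h" => n * 3_600,
        "d" => n * 86_400,
        _ => return None,
    };
    Some(Duration::from_secs(secs))
}

fn main() {
    assert_eq!(parse_relative("5min"), Some(Duration::from_secs(300)));
    assert_eq!(parse_relative("2h"), Some(Duration::from_secs(7_200)));
    assert_eq!(parse_relative("1d"), Some(Duration::from_secs(86_400)));
    assert_eq!(parse_relative("soon"), None);
    println!("ok");
}
```

A parser like this would run once when the timer is created, storing the computed `next_fire` timestamp in the timers table so the 30-second tick only needs a `next_fire <= now` comparison.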

doc/todo.md

@@ -1,10 +1,28 @@
# TODO
# noc todo
- [ ] Streaming responses — edit message as claude output arrives instead of waiting for full completion
- [ ] Markdown formatting — parse claude output and send with TG MarkdownV2
- [ ] Timeout handling — kill claude if it hangs beyond a threshold
- [ ] Graceful shutdown on SIGTERM
- [ ] `/reset` command to force new session without waiting for 5am
- [ ] Rate limiting per chat
- [ ] Voice message support — STT (whisper.cpp) → text → claude
- [ ] Video/audio file transcription
## Why these items

noc is not a chat bot. It is the body of a digital companion with its own rhythm, memory, and judgment. Every item below answers the same question: **how do we make it more like a genuinely present person, rather than a function waiting for input?**

---

### Proactive behavior — it should have its own rhythm of life

- [ ] Preset crons: morning todo digest, evening diary, periodic memory consolidation
- [ ] Event-driven: watch file changes, git push, CI status, etc., and notify proactively
- [ ] Situational awareness: adjust behavior and tone based on time, location, and calendar

### Memory and growth — it should remember its past with the user

- [ ] AutoMem: analyze conversations on a background schedule and maintain memory automatically, without the user having to say "remember this"
- [ ] Layered memory: core identity (always injected) + long-term facts (RAG retrieval) + current task (scratch)
- [ ] Semantic search: not keyword matching, but genuinely understanding "which earlier thing this relates to"
- [ ] Memory merging: integrate new and old memories automatically, without duplicate storage
- [ ] Time decay: recent events matter more; long-ago events fade naturally
- [ ] Self-reflection: periodically review its own performance and improve proactively

### Context management — its attention should be smarter

- [ ] Context pruning: tool output can be trimmed, but the conversation itself must not be dropped

### Reliability — it should not drop offline for no reason

- [ ] API retries: a network blip should not kill the whole conversation
- [ ] Usage tracking: know how much resource is being spent
- [ ] Model failover: if one model goes down, switch to another automatically
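The "time decay" item above is usually implemented as exponential decay of a memory's relevance with age. A minimal sketch, assuming a half-life parameter (none of this exists in the noc codebase yet):

```rust
/// Exponentially decay a memory's base relevance score by its age.
/// With half_life_days = 30, a month-old memory scores half its
/// original relevance. Hypothetical scoring, not from noc.
fn decayed_score(base: f64, age_days: f64, half_life_days: f64) -> f64 {
    base * 0.5f64.powf(age_days / half_life_days)
}

fn main() {
    let fresh = decayed_score(1.0, 0.0, 30.0);   // today: full weight
    let stale = decayed_score(1.0, 90.0, 30.0);  // three half-lives old
    println!("fresh={fresh:.3} stale={stale:.3}");
}
```

Ranking retrieved memories by `decayed_score` (rather than deleting old ones) gives the "fade naturally" behavior: old memories stay recoverable but stop crowding out recent context.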

noc.service.in

@@ -10,6 +10,7 @@ ExecStart=%h/bin/noc
Restart=on-failure
RestartSec=5
Environment=RUST_LOG=noc=info
Environment=RUST_BACKTRACE=1
Environment=NOC_CONFIG=@REPO@/config.yaml
Environment=NOC_STATE=@REPO@/state.json
Environment=PATH=@PATH@

src/config.rs (new file, 52 lines)

@@ -0,0 +1,52 @@
use serde::Deserialize;
#[derive(Deserialize)]
pub struct Config {
#[serde(default = "default_name")]
pub name: String,
pub tg: TgConfig,
pub auth: AuthConfig,
pub session: SessionConfig,
#[serde(default)]
pub backend: BackendConfig,
#[serde(default)]
pub whisper_url: Option<String>,
}
fn default_name() -> String {
"noc".to_string()
}
#[derive(Deserialize, Clone, Default)]
#[serde(tag = "type")]
pub enum BackendConfig {
#[serde(rename = "claude")]
#[default]
Claude,
#[serde(rename = "openai")]
OpenAI {
endpoint: String,
model: String,
#[serde(default = "default_api_key")]
api_key: String,
},
}
fn default_api_key() -> String {
"unused".to_string()
}
#[derive(Deserialize)]
pub struct TgConfig {
pub key: String,
}
#[derive(Deserialize)]
pub struct AuthConfig {
pub passphrase: String,
}
#[derive(Deserialize)]
pub struct SessionConfig {
pub refresh_hour: u32,
}

src/display.rs Normal file

@@ -0,0 +1,220 @@
use std::path::PathBuf;
use base64::Engine;
use teloxide::prelude::*;
use teloxide::types::ParseMode;
use crate::stream::{CURSOR, TG_MSG_LIMIT};
pub fn truncate_for_display(s: &str) -> String {
let budget = TG_MSG_LIMIT - CURSOR.len() - 1;
if s.len() <= budget {
format!("{s}{CURSOR}")
} else {
let truncated = truncate_at_char_boundary(s, budget - 2);
format!("{truncated}\n{CURSOR}")
}
}
pub fn truncate_at_char_boundary(s: &str, max: usize) -> &str {
if s.len() <= max {
return s;
}
let mut end = max;
while !s.is_char_boundary(end) {
end -= 1;
}
&s[..end]
}
pub fn escape_html(s: &str) -> String {
s.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
.replace('"', "&quot;")
}
pub fn markdown_to_telegram_html(md: &str) -> String {
use pulldown_cmark::{CodeBlockKind, Event, Options, Parser, Tag, TagEnd};
let mut opts = Options::empty();
opts.insert(Options::ENABLE_STRIKETHROUGH);
let parser = Parser::new_ext(md, opts);
let mut html = String::new();
for event in parser {
match event {
Event::Start(tag) => match tag {
Tag::Paragraph => {}
Tag::Heading { .. } => html.push_str("<b>"),
Tag::BlockQuote(_) => html.push_str("<blockquote>"),
Tag::CodeBlock(kind) => match kind {
CodeBlockKind::Fenced(ref lang) if !lang.is_empty() => {
html.push_str(&format!(
"<pre><code class=\"language-{}\">",
escape_html(lang.as_ref())
));
}
_ => html.push_str("<pre><code>"),
},
Tag::Item => html.push_str("• "), // Telegram HTML has no list tags; emit a bullet
Tag::Emphasis => html.push_str("<i>"),
Tag::Strong => html.push_str("<b>"),
Tag::Strikethrough => html.push_str("<s>"),
Tag::Link { dest_url, .. } => {
html.push_str(&format!(
"<a href=\"{}\">",
escape_html(dest_url.as_ref())
));
}
_ => {}
},
Event::End(tag) => match tag {
TagEnd::Paragraph => html.push_str("\n\n"),
TagEnd::Heading(_) => html.push_str("</b>\n\n"),
TagEnd::BlockQuote(_) => html.push_str("</blockquote>"),
TagEnd::CodeBlock => html.push_str("</code></pre>\n\n"),
TagEnd::List(_) => html.push('\n'),
TagEnd::Item => html.push('\n'),
TagEnd::Emphasis => html.push_str("</i>"),
TagEnd::Strong => html.push_str("</b>"),
TagEnd::Strikethrough => html.push_str("</s>"),
TagEnd::Link => html.push_str("</a>"),
_ => {}
},
Event::Text(text) => html.push_str(&escape_html(text.as_ref())),
Event::Code(text) => {
html.push_str("<code>");
html.push_str(&escape_html(text.as_ref()));
html.push_str("</code>");
}
Event::SoftBreak | Event::HardBreak => html.push('\n'),
Event::Rule => html.push_str("\n---\n\n"),
_ => {}
}
}
html.trim_end().to_string()
}
/// Send final result with HTML formatting, fallback to plain text on failure.
pub async fn send_final_result(
bot: &Bot,
chat_id: ChatId,
msg_id: Option<teloxide::types::MessageId>,
use_draft: bool,
result: &str,
) {
let html = markdown_to_telegram_html(result);
// try HTML as single message
let html_ok = if let (false, Some(id)) = (use_draft, msg_id) {
bot.edit_message_text(chat_id, id, &html)
.parse_mode(ParseMode::Html)
.await
.is_ok()
} else {
bot.send_message(chat_id, &html)
.parse_mode(ParseMode::Html)
.await
.is_ok()
};
if html_ok {
return;
}
// fallback: plain text with chunking
let chunks = split_msg(result, TG_MSG_LIMIT);
if let (false, Some(id)) = (use_draft, msg_id) {
let _ = bot.edit_message_text(chat_id, id, chunks[0]).await;
for chunk in &chunks[1..] {
let _ = bot.send_message(chat_id, *chunk).await;
}
} else {
for chunk in &chunks {
let _ = bot.send_message(chat_id, *chunk).await;
}
}
}
pub fn split_msg(s: &str, max: usize) -> Vec<&str> {
if s.len() <= max {
return vec![s];
}
let mut parts = Vec::new();
let mut rest = s;
while !rest.is_empty() {
if rest.len() <= max {
parts.push(rest);
break;
}
let mut end = max;
while !rest.is_char_boundary(end) {
end -= 1;
}
let (chunk, tail) = rest.split_at(end);
parts.push(chunk);
rest = tail;
}
parts
}
/// Build user message content, with optional images/videos as multimodal input.
pub fn build_user_content(
text: &str,
scratch: &str,
media: &[PathBuf],
) -> serde_json::Value {
let full_text = if scratch.is_empty() {
text.to_string()
} else {
format!("{text}\n\n[scratch]\n{scratch}")
};
// collect media data (images + videos)
let mut media_parts: Vec<serde_json::Value> = Vec::new();
for path in media {
let (mime, is_video) = match path
.extension()
.and_then(|e| e.to_str())
.map(|e| e.to_lowercase())
.as_deref()
{
Some("jpg" | "jpeg") => ("image/jpeg", false),
Some("png") => ("image/png", false),
Some("gif") => ("image/gif", false),
Some("webp") => ("image/webp", false),
Some("mp4") => ("video/mp4", true),
Some("webm") => ("video/webm", true),
Some("mov") => ("video/quicktime", true),
_ => continue,
};
if let Ok(data) = std::fs::read(path) {
let b64 = base64::engine::general_purpose::STANDARD.encode(&data);
let data_url = format!("data:{mime};base64,{b64}");
if is_video {
media_parts.push(serde_json::json!({
"type": "video_url",
"video_url": {"url": data_url}
}));
} else {
media_parts.push(serde_json::json!({
"type": "image_url",
"image_url": {"url": data_url}
}));
}
}
}
if media_parts.is_empty() {
// plain text — more compatible
serde_json::Value::String(full_text)
} else {
// multimodal array
let mut content = vec![serde_json::json!({"type": "text", "text": full_text})];
content.extend(media_parts);
serde_json::Value::Array(content)
}
}

src/life.rs Normal file

@@ -0,0 +1,167 @@
use std::sync::Arc;
use teloxide::prelude::*;
use tracing::{error, info, warn};
use crate::config::{BackendConfig, Config};
use crate::state::AppState;
use crate::stream::run_openai_with_tools;
use crate::tools::compute_next_cron_fire;
const LIFE_LOOP_TIMEOUT_SECS: u64 = 120;
pub async fn life_loop(bot: Bot, state: Arc<AppState>, config: Arc<Config>) {
info!("life loop started");
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
interval.tick().await;
let due = state.due_timers().await;
if due.is_empty() {
continue;
}
for (timer_id, chat_id_raw, label, schedule) in &due {
let chat_id = ChatId(*chat_id_raw);
info!(timer_id, %label, "timer fired");
let persona = state.get_config("persona").await.unwrap_or_default();
let inner = state.get_inner_state().await;
let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
let mut system_text = if persona.is_empty() {
"你是一个AI伙伴。".to_string()
} else {
persona.clone()
};
system_text.push_str(&format!(
"\n\n[当前时间] {now}\n\n[你的内心状态]\n{}",
if inner.is_empty() { "(空)" } else { &inner }
));
// emotional motivation — why she acts
system_text.push_str(
"\n\n你不是因为 timer 到了才说话。你是因为在乎 Fam所以想知道他怎么样。\
如果你觉得现在不该打扰他(太晚了、他今天很累、刚聊过),就什么都不说,回复空文本。\
主动沉默也是一种关心。\
\n可以用 update_inner_state 更新你的内心状态。\
输出格式纯文本或基础Markdown不要LaTeX或特殊Unicode。",
);
let messages = vec![
serde_json::json!({"role": "system", "content": system_text}),
serde_json::json!({"role": "user", "content": format!("[timer] {label}")}),
];
if let BackendConfig::OpenAI {
ref endpoint,
ref model,
ref api_key,
} = config.backend
{
let sid = format!("life-{chat_id_raw}");
let result = tokio::time::timeout(
std::time::Duration::from_secs(LIFE_LOOP_TIMEOUT_SECS),
run_openai_with_tools(
endpoint, model, api_key, messages, &bot, chat_id, &state, &sid,
&config, true,
),
)
.await;
match result {
Ok(Ok(response)) => {
if !response.is_empty() {
info!(timer_id, "life loop response ({} chars)", response.len());
}
}
Ok(Err(e)) => {
error!(timer_id, "life loop LLM error: {e:#}");
}
Err(_) => {
warn!(timer_id, "life loop timeout after {LIFE_LOOP_TIMEOUT_SECS}s");
}
}
}
// reschedule or delete
if schedule.starts_with("cron:") {
if let Some(next) = compute_next_cron_fire(schedule) {
state.update_timer_next_fire(*timer_id, &next).await;
info!(timer_id, next = %next, "cron rescheduled");
} else {
state.cancel_timer(*timer_id).await;
}
} else {
state.cancel_timer(*timer_id).await;
}
}
}
}
/// Auto-reflection: update inner state based on recent interactions.
/// Called asynchronously after every 10 messages, does not block the chat.
pub async fn reflect(state: &AppState, config: &Config) {
let BackendConfig::OpenAI {
ref endpoint,
ref model,
ref api_key,
} = config.backend
else {
return;
};
let inner = state.get_inner_state().await;
let messages = vec![
serde_json::json!({
"role": "system",
"content": "你刚结束了一段对话。\
请根据你的感受和理解,更新你的内在状态。\
不要总结对话内容,而是记录你的感受、对用户的理解变化、你想记住的事。\
只输出更新后的完整内在状态文本,不需要解释。"
}),
serde_json::json!({
"role": "user",
"content": format!("当前内在状态:\n{inner}")
}),
];
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(60))
.build()
.unwrap();
let url = format!("{}/chat/completions", endpoint.trim_end_matches('/'));
let resp = client
.post(&url)
.header("Authorization", format!("Bearer {api_key}"))
.json(&serde_json::json!({
"model": model,
"messages": messages,
}))
.send()
.await;
match resp {
Ok(r) if r.status().is_success() => {
if let Ok(json) = r.json::<serde_json::Value>().await {
if let Some(new_state) = json["choices"][0]["message"]["content"].as_str() {
if !new_state.is_empty() {
state.set_inner_state(new_state).await;
info!("reflected, inner_state updated ({} chars)", new_state.len());
}
}
}
}
Ok(r) => {
warn!("reflect LLM returned {}", r.status());
}
Err(e) => {
warn!("reflect LLM failed: {e:#}");
}
}
}


@@ -1,81 +1,31 @@
use std::collections::{HashMap, HashSet};
mod config;
mod state;
mod tools;
mod stream;
mod display;
mod life;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
use anyhow::Result;
use chrono::{Local, NaiveDate, NaiveTime};
use serde::{Deserialize, Serialize};
use teloxide::dispatching::UpdateFilterExt;
use teloxide::net::Download;
use teloxide::prelude::*;
use teloxide::types::InputFile;
use tokio::io::AsyncBufReadExt;
use tokio::process::Command;
use tokio::sync::RwLock;
use tokio::time::Instant;
use tracing::{error, info, warn};
use uuid::Uuid;
// ── config ──────────────────────────────────────────────────────────
#[derive(Deserialize)]
struct Config {
tg: TgConfig,
auth: AuthConfig,
session: SessionConfig,
}
#[derive(Deserialize)]
struct TgConfig {
key: String,
}
#[derive(Deserialize)]
struct AuthConfig {
passphrase: String,
}
#[derive(Deserialize)]
struct SessionConfig {
refresh_hour: u32,
}
// ── persistent state ────────────────────────────────────────────────
#[derive(Serialize, Deserialize, Default)]
struct Persistent {
authed: HashMap<i64, NaiveDate>,
known_sessions: HashSet<String>,
}
struct AppState {
persist: RwLock<Persistent>,
state_path: PathBuf,
}
impl AppState {
fn load(path: PathBuf) -> Self {
let persist = std::fs::read_to_string(&path)
.ok()
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_default();
info!("loaded state from {}", path.display());
Self {
persist: RwLock::new(persist),
state_path: path,
}
}
async fn save(&self) {
let data = self.persist.read().await;
if let Ok(json) = serde_json::to_string_pretty(&*data) {
if let Err(e) = std::fs::write(&self.state_path, json) {
error!("save state: {e}");
}
}
}
}
use config::{BackendConfig, Config};
use display::build_user_content;
use state::{AppState, MAX_WINDOW, SLIDE_SIZE};
use stream::{
build_system_prompt, invoke_claude_streaming, run_claude_streaming, run_openai_with_tools,
summarize_messages,
};
use tools::discover_tools;
// ── helpers ─────────────────────────────────────────────────────────
@@ -89,9 +39,9 @@ fn session_date(refresh_hour: u32) -> NaiveDate {
}
}
fn session_uuid(chat_id: i64, refresh_hour: u32) -> String {
fn session_uuid(prefix: &str, chat_id: i64, refresh_hour: u32) -> String {
let date = session_date(refresh_hour);
let name = format!("noc-{}-{}", chat_id, date.format("%Y%m%d"));
let name = format!("{}-{}-{}", prefix, chat_id, date.format("%Y%m%d"));
Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes()).to_string()
}
@@ -131,13 +81,20 @@ async fn main() {
let _ = std::fs::create_dir_all(incoming_dir());
info!("noc bot starting");
let bot = Bot::new(&config.tg.key);
let me = bot.get_me().await.unwrap();
let bot_username = Arc::new(me.username.clone().unwrap_or_default());
info!(username = %bot_username, "noc bot starting");
let handler = Update::filter_message().endpoint(handle);
let config = Arc::new(config);
// start life loop
tokio::spawn(life::life_loop(bot.clone(), state.clone(), config.clone()));
Dispatcher::builder(bot, handler)
.dependencies(dptree::deps![state, Arc::new(config)])
.dependencies(dptree::deps![state, config, bot_username])
.default_handler(|_| async {})
.build()
.dispatch()
@@ -195,8 +152,10 @@ async fn handle(
msg: Message,
state: Arc<AppState>,
config: Arc<Config>,
_bot_username: Arc<String>,
) -> ResponseResult<()> {
let chat_id = msg.chat.id;
let is_private = msg.chat.is_private();
let text = msg.text().or(msg.caption()).unwrap_or("").to_string();
let raw_id = chat_id.0;
let date = session_date(config.session.refresh_hour);
@@ -221,7 +180,7 @@ async fn handle(
return Ok(());
}
if let Err(e) = handle_inner(&bot, &msg, chat_id, &text, &state, &config).await {
if let Err(e) = handle_inner(&bot, &msg, chat_id, &text, is_private, &state, &config).await {
error!(chat = raw_id, "handle: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
@@ -234,11 +193,13 @@ async fn handle_inner(
msg: &Message,
chat_id: ChatId,
text: &str,
is_private: bool,
state: &Arc<AppState>,
config: &Arc<Config>,
) -> Result<()> {
let mut uploaded: Vec<PathBuf> = Vec::new();
let mut download_errors: Vec<String> = Vec::new();
let mut transcriptions: Vec<String> = Vec::new();
if let Some(doc) = msg.document() {
let name = doc.file_name.as_deref().unwrap_or("file");
@@ -258,7 +219,67 @@ async fn handle_inner(
}
}
if text.is_empty() && uploaded.is_empty() {
if let Some(audio) = msg.audio() {
let fallback = format!("audio_{}.ogg", Local::now().format("%H%M%S"));
let name = audio.file_name.as_deref().unwrap_or(&fallback);
match download_tg_file(bot, &audio.file.id, name).await {
Ok(p) => {
if let Some(url) = &config.whisper_url {
match transcribe_audio(url, &p).await {
Ok(t) if !t.is_empty() => transcriptions.push(t),
Ok(_) => uploaded.push(p),
Err(e) => {
warn!("transcribe failed: {e:#}");
uploaded.push(p);
}
}
} else {
uploaded.push(p);
}
}
Err(e) => download_errors.push(format!("audio: {e:#}")),
}
}
if let Some(voice) = msg.voice() {
let name = format!("voice_{}.ogg", Local::now().format("%H%M%S"));
match download_tg_file(bot, &voice.file.id, &name).await {
Ok(p) => {
if let Some(url) = &config.whisper_url {
match transcribe_audio(url, &p).await {
Ok(t) if !t.is_empty() => transcriptions.push(t),
Ok(_) => uploaded.push(p),
Err(e) => {
warn!("transcribe failed: {e:#}");
uploaded.push(p);
}
}
} else {
uploaded.push(p);
}
}
Err(e) => download_errors.push(format!("voice: {e:#}")),
}
}
if let Some(video) = msg.video() {
let fallback = format!("video_{}.mp4", Local::now().format("%H%M%S"));
let name = video.file_name.as_deref().unwrap_or(&fallback);
match download_tg_file(bot, &video.file.id, name).await {
Ok(p) => uploaded.push(p),
Err(e) => download_errors.push(format!("video: {e:#}")),
}
}
if let Some(vn) = msg.video_note() {
let name = format!("videonote_{}.mp4", Local::now().format("%H%M%S"));
match download_tg_file(bot, &vn.file.id, &name).await {
Ok(p) => uploaded.push(p),
Err(e) => download_errors.push(format!("video_note: {e:#}")),
}
}
if text.is_empty() && uploaded.is_empty() && transcriptions.is_empty() {
if !download_errors.is_empty() {
let err_msg = format!("[文件下载失败]\n{}", download_errors.join("\n"));
bot.send_message(chat_id, err_msg).await?;
@@ -266,29 +287,198 @@ async fn handle_inner(
return Ok(());
}
let sid = session_uuid(chat_id.0, config.session.refresh_hour);
let sid = session_uuid(&config.name, chat_id.0, config.session.refresh_hour);
info!(%sid, "recv");
let out_dir = outgoing_dir(&sid);
tokio::fs::create_dir_all(&out_dir).await?;
let before = snapshot_dir(&out_dir).await;
let prompt = build_prompt(text, &uploaded, &download_errors, &out_dir);
// handle diag command (OpenAI backend only)
if text.trim() == "diag" {
if let BackendConfig::OpenAI { .. } = &config.backend {
let conv = state.load_conv(&sid).await;
let count = state.message_count(&sid).await;
let persona = state.get_config("persona").await.unwrap_or_default();
let scratch = state.get_scratch().await;
let tools = discover_tools();
let empty = vec![];
let tools_arr = tools.as_array().unwrap_or(&empty);
let known = state.persist.read().await.known_sessions.contains(&sid);
let mut diag = format!(
"# NOC Diag\n\n\
## Session\n\
- id: `{sid}`\n\
- window: {count}/{MAX_WINDOW} (slide at {MAX_WINDOW}, drop {SLIDE_SIZE})\n\
- total processed: {}\n\n\
## Persona ({} chars)\n```\n{}\n```\n\n\
## Scratch ({} chars)\n```\n{}\n```\n\n\
## Summary ({} chars)\n```\n{}\n```\n\n\
## Tools ({} registered)\n",
conv.total_messages + count,
persona.len(),
if persona.is_empty() { "(default)" } else { &persona },
scratch.len(),
if scratch.is_empty() { "(empty)" } else { &scratch },
conv.summary.len(),
if conv.summary.is_empty() {
"(empty)".to_string()
} else {
conv.summary
},
tools_arr.len(),
);
for tool in tools_arr {
let func = &tool["function"];
let name = func["name"].as_str().unwrap_or("?");
let desc = func["description"].as_str().unwrap_or("");
let params = serde_json::to_string_pretty(&func["parameters"])
.unwrap_or_default();
diag.push_str(&format!(
"### `{name}`\n{desc}\n\n```json\n{params}\n```\n\n"
));
}
let result = invoke_claude_streaming(&sid, &prompt, known, bot, chat_id).await;
let memory_slots = state.get_memory_slots().await;
diag.push_str(&format!("## Memory Slots ({}/100 used)\n", memory_slots.len()));
if memory_slots.is_empty() {
diag.push_str("(empty)\n\n");
} else {
for (nr, content) in &memory_slots {
diag.push_str(&format!("- `[{nr}]` {content}\n"));
}
diag.push('\n');
}
match &result {
Ok(_) => {
if !known {
state.persist.write().await.known_sessions.insert(sid.clone());
state.save().await;
let tmp = std::env::temp_dir().join(format!("noc-diag-{sid}.md"));
tokio::fs::write(&tmp, &diag).await?;
bot.send_document(chat_id, InputFile::file(&tmp))
.await?;
let _ = tokio::fs::remove_file(&tmp).await;
return Ok(());
}
}
// handle "cc" prefix: pass directly to claude -p, no session, no history
if let Some(cc_prompt) = text.strip_prefix("cc").map(|s| s.trim_start()) {
if !cc_prompt.is_empty() {
info!(%sid, "cc passthrough");
let prompt = build_prompt(cc_prompt, &uploaded, &download_errors, &transcriptions);
match run_claude_streaming(&[], &prompt, bot, chat_id).await {
Ok(_) => {}
Err(e) => {
error!(%sid, "cc claude: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
}
return Ok(());
}
}
let prompt = build_prompt(text, &uploaded, &download_errors, &transcriptions);
match &config.backend {
BackendConfig::Claude => {
let known = state.persist.read().await.known_sessions.contains(&sid);
let result =
invoke_claude_streaming(&sid, &prompt, known, bot, chat_id).await;
match &result {
Ok(_) => {
if !known {
state.persist.write().await.known_sessions.insert(sid.clone());
state.save().await;
}
}
Err(e) => {
error!(%sid, "claude: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
}
}
Err(e) => {
error!(%sid, "claude: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
BackendConfig::OpenAI {
endpoint,
model,
api_key,
} => {
let conv = state.load_conv(&sid).await;
let persona = state.get_config("persona").await.unwrap_or_default();
let memory_slots = state.get_memory_slots().await;
let inner = state.get_inner_state().await;
let system_msg = build_system_prompt(&conv.summary, &persona, &memory_slots, &inner);
let mut api_messages = vec![system_msg];
api_messages.extend(conv.messages);
let scratch = state.get_scratch().await;
let user_content = build_user_content(&prompt, &scratch, &uploaded);
api_messages.push(serde_json::json!({"role": "user", "content": user_content}));
match run_openai_with_tools(
endpoint, model, api_key, api_messages, bot, chat_id, state, &sid, config, is_private,
)
.await
{
Ok(response) => {
state.push_message(&sid, "user", &prompt).await;
if !response.is_empty() {
state.push_message(&sid, "assistant", &response).await;
}
// sliding window
let count = state.message_count(&sid).await;
if count >= MAX_WINDOW {
info!(%sid, "sliding window: {count} messages, summarizing oldest {SLIDE_SIZE}");
let _ = bot
.send_message(chat_id, "[整理记忆中...]")
.await;
let to_summarize =
state.get_oldest_messages(&sid, SLIDE_SIZE).await;
let current_summary = {
let db = state.db.lock().await;
db.query_row(
"SELECT summary FROM conversations WHERE session_id = ?1",
[&sid],
|row| row.get::<_, String>(0),
)
.unwrap_or_default()
};
match summarize_messages(
endpoint,
model,
api_key,
&current_summary,
&to_summarize,
)
.await
{
Ok(new_summary) => {
state.slide_window(&sid, &new_summary, SLIDE_SIZE).await;
let remaining = state.message_count(&sid).await;
info!(%sid, "window slid, {remaining} messages remain, summary {} chars", new_summary.len());
}
Err(e) => {
warn!(%sid, "summarize failed: {e:#}, keeping all messages");
}
}
}
// auto-reflect every 10 messages
let count = state.message_count(&sid).await;
if count % 10 == 0 && count > 0 {
let state_c = state.clone();
let config_c = config.clone();
tokio::spawn(async move {
crate::life::reflect(&state_c, &config_c).await;
});
}
}
Err(e) => {
error!(%sid, "openai: {e:#}");
let _ = bot.send_message(chat_id, format!("[error] {e:#}")).await;
}
}
}
}
@@ -307,9 +497,18 @@ async fn handle_inner(
Ok(())
}
fn build_prompt(text: &str, uploaded: &[PathBuf], errors: &[String], out_dir: &Path) -> String {
fn build_prompt(
text: &str,
uploaded: &[PathBuf],
errors: &[String],
transcriptions: &[String],
) -> String {
let mut parts = Vec::new();
for t in transcriptions {
parts.push(format!("[语音消息] {t}"));
}
for f in uploaded {
parts.push(format!("[用户上传了文件: {}]", f.display()));
}
@@ -322,277 +521,27 @@ fn build_prompt(text: &str, uploaded: &[PathBuf], errors: &[String], out_dir: &P
parts.push(text.to_string());
}
parts.push(format!(
"\n[系统提示: 如果需要发送文件给用户,将文件写入 {} 目录]",
out_dir.display()
));
parts.join("\n")
}
// ── claude bridge (streaming) ───────────────────────────────────────
/// Stream JSON event types we care about.
#[derive(Deserialize)]
struct StreamEvent {
#[serde(rename = "type")]
event_type: String,
message: Option<AssistantMessage>,
result: Option<String>,
#[serde(default)]
is_error: bool,
}
#[derive(Deserialize)]
struct AssistantMessage {
content: Vec<ContentBlock>,
}
#[derive(Deserialize)]
struct ContentBlock {
#[serde(rename = "type")]
block_type: String,
text: Option<String>,
name: Option<String>,
input: Option<serde_json::Value>,
}
/// Extract all text from an assistant message's content blocks.
fn extract_text(msg: &AssistantMessage) -> String {
msg.content
.iter()
.filter(|b| b.block_type == "text")
.filter_map(|b| b.text.as_deref())
.collect::<Vec<_>>()
.join("")
}
/// Extract tool use status line, e.g. "Bash: echo hello"
fn extract_tool_use(msg: &AssistantMessage) -> Option<String> {
for block in &msg.content {
if block.block_type == "tool_use" {
let name = block.name.as_deref().unwrap_or("tool");
let detail = block
.input
.as_ref()
.and_then(|v| {
// try common fields: command, pattern, file_path, query
v.get("command")
.or(v.get("pattern"))
.or(v.get("file_path"))
.or(v.get("query"))
.or(v.get("prompt"))
.and_then(|s| s.as_str())
})
.unwrap_or("");
let detail_short = truncate_at_char_boundary(detail, 80);
return Some(format!("{name}: {detail_short}"));
}
}
None
}
const EDIT_INTERVAL_MS: u64 = 5000;
const TG_MSG_LIMIT: usize = 4096;
async fn invoke_claude_streaming(
sid: &str,
prompt: &str,
known: bool,
bot: &Bot,
chat_id: ChatId,
) -> Result<String> {
if known {
return run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await;
}
match run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await {
Ok(out) => {
info!(%sid, "resumed existing session");
Ok(out)
}
Err(e) => {
warn!(%sid, "resume failed ({e:#}), creating new session");
run_claude_streaming(&["--session-id", sid], prompt, bot, chat_id).await
}
}
}
async fn run_claude_streaming(
extra_args: &[&str],
prompt: &str,
bot: &Bot,
chat_id: ChatId,
) -> Result<String> {
let mut args: Vec<&str> = vec![
"--dangerously-skip-permissions",
"-p",
"--output-format",
"stream-json",
"--verbose",
];
args.extend(extra_args);
args.push(prompt);
let mut child = Command::new("claude")
.args(&args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdout = child.stdout.take().unwrap();
let mut lines = tokio::io::BufReader::new(stdout).lines();
// send placeholder immediately so user knows we're on it
let mut msg_id: Option<teloxide::types::MessageId> = match bot.send_message(chat_id, CURSOR).await {
Ok(sent) => Some(sent.id),
Err(_) => None,
};
let mut last_sent_text = String::new();
let mut last_edit = Instant::now();
let mut final_result = String::new();
let mut is_error = false;
let mut tool_status = String::new(); // current tool use status line
while let Ok(Some(line)) = lines.next_line().await {
let event: StreamEvent = match serde_json::from_str(&line) {
Ok(e) => e,
Err(_) => continue,
};
match event.event_type.as_str() {
"assistant" => {
if let Some(msg) = &event.message {
// check for tool use — show status
if let Some(status) = extract_tool_use(msg) {
tool_status = format!("[{status}]");
let display = if last_sent_text.is_empty() {
tool_status.clone()
} else {
format!("{last_sent_text}\n\n{tool_status}")
};
let display = truncate_for_display(&display);
if let Some(id) = msg_id {
let _ = bot.edit_message_text(chat_id, id, &display).await;
} else if let Ok(sent) = bot.send_message(chat_id, &display).await {
msg_id = Some(sent.id);
}
last_edit = Instant::now();
continue;
}
// check for text content
let text = extract_text(msg);
if text.is_empty() || text == last_sent_text {
continue;
}
// throttle edits
if last_edit.elapsed().as_millis() < EDIT_INTERVAL_MS as u128 {
continue;
}
tool_status.clear();
let display = truncate_for_display(&text);
if let Some(id) = msg_id {
if bot.edit_message_text(chat_id, id, &display).await.is_ok() {
last_sent_text = text;
last_edit = Instant::now();
}
} else if let Ok(sent) = bot.send_message(chat_id, &display).await {
msg_id = Some(sent.id);
last_sent_text = text;
last_edit = Instant::now();
}
}
}
"result" => {
final_result = event.result.unwrap_or_default();
is_error = event.is_error;
}
_ => {}
}
}
let _ = child.wait().await;
if is_error {
// clean up streaming message if we sent one
if let Some(id) = msg_id {
let _ = bot
.edit_message_text(chat_id, id, format!("[error] {final_result}"))
.await;
}
anyhow::bail!("{final_result}");
}
if final_result.is_empty() {
return Ok(final_result);
}
// final update: replace streaming message with complete result
let chunks: Vec<&str> = split_msg(&final_result, TG_MSG_LIMIT);
if let Some(id) = msg_id {
// edit first message with final text
let _ = bot.edit_message_text(chat_id, id, chunks[0]).await;
// send remaining chunks as new messages
for chunk in &chunks[1..] {
let _ = bot.send_message(chat_id, *chunk).await;
}
} else {
// never got to send a streaming message, send all now
for chunk in &chunks {
let _ = bot.send_message(chat_id, *chunk).await;
}
}
Ok(final_result)
}
const CURSOR: &str = " \u{25CE}";
fn truncate_for_display(s: &str) -> String {
let budget = TG_MSG_LIMIT - CURSOR.len() - 1;
if s.len() <= budget {
format!("{s}{CURSOR}")
} else {
let truncated = truncate_at_char_boundary(s, budget - 2);
format!("{truncated}\n{CURSOR}")
}
}
fn truncate_at_char_boundary(s: &str, max: usize) -> &str {
if s.len() <= max {
return s;
}
let mut end = max;
while !s.is_char_boundary(end) {
end -= 1;
}
&s[..end]
}
fn split_msg(s: &str, max: usize) -> Vec<&str> {
if s.len() <= max {
return vec![s];
}
let mut parts = Vec::new();
let mut rest = s;
while !rest.is_empty() {
if rest.len() <= max {
parts.push(rest);
break;
}
let mut end = max;
while !rest.is_char_boundary(end) {
end -= 1;
}
let (chunk, tail) = rest.split_at(end);
parts.push(chunk);
rest = tail;
}
parts
}
async fn transcribe_audio(whisper_url: &str, file_path: &Path) -> Result<String> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(60))
.build()?;
let url = format!("{}/v1/audio/transcriptions", whisper_url.trim_end_matches('/'));
let file_bytes = tokio::fs::read(file_path).await?;
let file_name = file_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("audio.ogg")
.to_string();
let part = reqwest::multipart::Part::bytes(file_bytes)
.file_name(file_name)
.mime_str("audio/ogg")?;
let form = reqwest::multipart::Form::new()
.part("file", part)
.text("model", "base");
let resp = client.post(&url).multipart(form).send().await?.error_for_status()?;
let json: serde_json::Value = resp.json().await?;
Ok(json["text"].as_str().unwrap_or("").to_string())
}

src/state.rs Normal file

@@ -0,0 +1,357 @@
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::Result;
use chrono::NaiveDate;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{error, info};
use crate::tools::SubAgent;
// ── persistent state ────────────────────────────────────────────────
#[derive(Serialize, Deserialize, Default)]
pub struct Persistent {
pub authed: HashMap<i64, NaiveDate>,
pub known_sessions: HashSet<String>,
}
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct ConversationState {
pub summary: String,
pub messages: Vec<serde_json::Value>,
pub total_messages: usize,
}
pub const MAX_WINDOW: usize = 100;
pub const SLIDE_SIZE: usize = 50;
pub struct AppState {
pub persist: RwLock<Persistent>,
pub state_path: PathBuf,
pub db: tokio::sync::Mutex<rusqlite::Connection>,
pub agents: RwLock<HashMap<String, Arc<SubAgent>>>,
}
impl AppState {
pub fn load(path: PathBuf) -> Self {
let persist = std::fs::read_to_string(&path)
.ok()
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_default();
info!("loaded state from {}", path.display());
let db_path = path.parent().unwrap_or(Path::new(".")).join("noc.db");
let conn = rusqlite::Connection::open(&db_path)
.unwrap_or_else(|e| panic!("open {}: {e}", db_path.display()));
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS conversations (
session_id TEXT PRIMARY KEY,
summary TEXT NOT NULL DEFAULT '',
total_messages INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id);
CREATE TABLE IF NOT EXISTS scratch_area (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL DEFAULT '',
create_time TEXT NOT NULL DEFAULT (datetime('now')),
update_time TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS config_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT NOT NULL,
value TEXT NOT NULL,
create_time TEXT NOT NULL,
update_time TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS memory_slots (
slot_nr INTEGER PRIMARY KEY CHECK(slot_nr BETWEEN 0 AND 99),
content TEXT NOT NULL DEFAULT ''
);
CREATE TABLE IF NOT EXISTS timers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
label TEXT NOT NULL,
schedule TEXT NOT NULL,
next_fire TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
CREATE TABLE IF NOT EXISTS inner_state (
id INTEGER PRIMARY KEY CHECK(id = 1),
content TEXT NOT NULL DEFAULT ''
);
INSERT OR IGNORE INTO inner_state (id, content) VALUES (1, '');",
)
.expect("init db schema");
// migration: add created_at to pre-existing DBs (error ignored if the column already exists)
let _ = conn.execute(
"ALTER TABLE messages ADD COLUMN created_at TEXT NOT NULL DEFAULT ''",
[],
);
info!("opened db {}", db_path.display());
Self {
persist: RwLock::new(persist),
state_path: path,
db: tokio::sync::Mutex::new(conn),
agents: RwLock::new(HashMap::new()),
}
}
pub async fn save(&self) {
let data = self.persist.read().await;
if let Ok(json) = serde_json::to_string_pretty(&*data) {
if let Err(e) = std::fs::write(&self.state_path, json) {
error!("save state: {e}");
}
}
}
pub async fn load_conv(&self, sid: &str) -> ConversationState {
let db = self.db.lock().await;
let (summary, total) = db
.query_row(
"SELECT summary, total_messages FROM conversations WHERE session_id = ?1",
[sid],
|row| Ok((row.get::<_, String>(0)?, row.get::<_, usize>(1)?)),
)
.unwrap_or_default();
let mut stmt = db
.prepare("SELECT role, content, created_at FROM messages WHERE session_id = ?1 ORDER BY id")
.unwrap();
let messages: Vec<serde_json::Value> = stmt
.query_map([sid], |row| {
let role: String = row.get(0)?;
let content: String = row.get(1)?;
let ts: String = row.get(2)?;
let tagged = if ts.is_empty() {
content
} else {
format!("[{ts}] {content}")
};
Ok(serde_json::json!({"role": role, "content": tagged}))
})
.unwrap()
.filter_map(|r| r.ok())
.collect();
ConversationState {
summary,
messages,
total_messages: total,
}
}
pub async fn push_message(&self, sid: &str, role: &str, content: &str) {
let db = self.db.lock().await;
let _ = db.execute(
"INSERT OR IGNORE INTO conversations (session_id) VALUES (?1)",
[sid],
);
let _ = db.execute(
"INSERT INTO messages (session_id, role, content, created_at) VALUES (?1, ?2, ?3, datetime('now', 'localtime'))",
rusqlite::params![sid, role, content],
);
}
pub async fn message_count(&self, sid: &str) -> usize {
let db = self.db.lock().await;
db.query_row(
"SELECT COUNT(*) FROM messages WHERE session_id = ?1",
[sid],
|row| row.get(0),
)
.unwrap_or(0)
}
pub async fn slide_window(&self, sid: &str, new_summary: &str, slide_size: usize) {
let db = self.db.lock().await;
let _ = db.execute(
"DELETE FROM messages WHERE id IN (
SELECT id FROM messages WHERE session_id = ?1 ORDER BY id LIMIT ?2
)",
rusqlite::params![sid, slide_size],
);
let _ = db.execute(
"UPDATE conversations SET summary = ?1, total_messages = total_messages + ?2 \
WHERE session_id = ?3",
rusqlite::params![new_summary, slide_size, sid],
);
}
pub async fn get_oldest_messages(&self, sid: &str, count: usize) -> Vec<serde_json::Value> {
let db = self.db.lock().await;
let mut stmt = db
.prepare(
"SELECT role, content FROM messages WHERE session_id = ?1 ORDER BY id LIMIT ?2",
)
.unwrap();
stmt.query_map(rusqlite::params![sid, count], |row| {
let role: String = row.get(0)?;
let content: String = row.get(1)?;
Ok(serde_json::json!({"role": role, "content": content}))
})
.unwrap()
.filter_map(|r| r.ok())
.collect()
}
pub async fn get_scratch(&self) -> String {
let db = self.db.lock().await;
db.query_row(
"SELECT content FROM scratch_area ORDER BY id DESC LIMIT 1",
[],
|row| row.get(0),
)
.unwrap_or_default()
}
pub async fn push_scratch(&self, content: &str) {
let db = self.db.lock().await;
let _ = db.execute(
"INSERT INTO scratch_area (content) VALUES (?1)",
[content],
);
}
pub async fn get_config(&self, key: &str) -> Option<String> {
let db = self.db.lock().await;
db.query_row(
"SELECT value FROM config WHERE key = ?1",
[key],
|row| row.get(0),
)
.ok()
}
pub async fn get_inner_state(&self) -> String {
let db = self.db.lock().await;
db.query_row("SELECT content FROM inner_state WHERE id = 1", [], |row| row.get(0))
.unwrap_or_default()
}
pub async fn set_inner_state(&self, content: &str) {
let db = self.db.lock().await;
let _ = db.execute(
"UPDATE inner_state SET content = ?1 WHERE id = 1",
[content],
);
}
pub async fn add_timer(&self, chat_id: i64, label: &str, schedule: &str, next_fire: &str) -> i64 {
let db = self.db.lock().await;
db.execute(
"INSERT INTO timers (chat_id, label, schedule, next_fire) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![chat_id, label, schedule, next_fire],
)
.unwrap();
db.last_insert_rowid()
}
pub async fn list_timers(&self, chat_id: Option<i64>) -> Vec<(i64, i64, String, String, String, bool)> {
let db = self.db.lock().await;
let (sql, params): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) = match chat_id {
Some(cid) => (
"SELECT id, chat_id, label, schedule, next_fire, enabled FROM timers WHERE chat_id = ?1 ORDER BY next_fire",
vec![Box::new(cid)],
),
None => (
"SELECT id, chat_id, label, schedule, next_fire, enabled FROM timers ORDER BY next_fire",
vec![],
),
};
let mut stmt = db.prepare(sql).unwrap();
stmt.query_map(rusqlite::params_from_iter(params), |row| {
Ok((
row.get(0)?,
row.get(1)?,
row.get::<_, String>(2)?,
row.get::<_, String>(3)?,
row.get::<_, String>(4)?,
row.get::<_, bool>(5)?,
))
})
.unwrap()
.filter_map(|r| r.ok())
.collect()
}
pub async fn cancel_timer(&self, timer_id: i64) -> bool {
let db = self.db.lock().await;
db.execute("DELETE FROM timers WHERE id = ?1", [timer_id]).unwrap() > 0
}
pub async fn due_timers(&self) -> Vec<(i64, i64, String, String)> {
let db = self.db.lock().await;
let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
let mut stmt = db
.prepare(
"SELECT id, chat_id, label, schedule FROM timers WHERE enabled = 1 AND next_fire <= ?1",
)
.unwrap();
stmt.query_map([&now], |row| {
Ok((
row.get(0)?,
row.get(1)?,
row.get::<_, String>(2)?,
row.get::<_, String>(3)?,
))
})
.unwrap()
.filter_map(|r| r.ok())
.collect()
}
pub async fn update_timer_next_fire(&self, timer_id: i64, next_fire: &str) {
let db = self.db.lock().await;
let _ = db.execute(
"UPDATE timers SET next_fire = ?1 WHERE id = ?2",
rusqlite::params![next_fire, timer_id],
);
}
pub async fn get_memory_slots(&self) -> Vec<(i32, String)> {
let db = self.db.lock().await;
let mut stmt = db
.prepare("SELECT slot_nr, content FROM memory_slots WHERE content != '' ORDER BY slot_nr")
.unwrap();
stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
.unwrap()
.filter_map(|r| r.ok())
.collect()
}
pub async fn set_memory_slot(&self, slot_nr: i32, content: &str) -> Result<()> {
if !(0..=99).contains(&slot_nr) {
anyhow::bail!("slot_nr must be 0-99, got {slot_nr}");
}
if content.len() > 200 {
anyhow::bail!("content too long: {} chars (max 200)", content.len());
}
let db = self.db.lock().await;
db.execute(
"INSERT INTO memory_slots (slot_nr, content) VALUES (?1, ?2) \
ON CONFLICT(slot_nr) DO UPDATE SET content = ?2",
rusqlite::params![slot_nr, content],
)?;
Ok(())
}
}
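The `MAX_WINDOW`/`SLIDE_SIZE` constants above drive a sliding-window policy: once a session exceeds the window, the oldest `SLIDE_SIZE` messages are summarized and then removed via `slide_window`. The caller lives in main.rs (not shown in this hunk); a minimal std-only sketch of the policy, assuming the trigger is a strict `count > MAX_WINDOW` check:

```rust
// Hypothetical helper, not part of state.rs: decides how many of the
// oldest messages should be folded into the summary for a given count.
const MAX_WINDOW: usize = 100;
const SLIDE_SIZE: usize = 50;

fn messages_to_fold(current_count: usize) -> usize {
    // Only slide once the window is exceeded; otherwise keep everything.
    if current_count > MAX_WINDOW { SLIDE_SIZE } else { 0 }
}

fn main() {
    assert_eq!(messages_to_fold(100), 0);  // at the limit: no slide yet
    assert_eq!(messages_to_fold(101), 50); // over the limit: fold oldest 50
}
```

The fold count feeds `get_oldest_messages` (what to summarize) and `slide_window` (what to delete and how much to add to `total_messages`).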

src/stream.rs (new file, 790 lines)

@@ -0,0 +1,790 @@
use std::process::Stdio;
use std::sync::Arc;
use anyhow::Result;
use serde::Deserialize;
use teloxide::prelude::*;
use tokio::io::AsyncBufReadExt;
use tokio::process::Command;
use tokio::time::Instant;
use tracing::{error, info, warn};
use crate::config::Config;
use crate::display::{
send_final_result, truncate_at_char_boundary, truncate_for_display,
};
use crate::state::AppState;
use crate::tools::{discover_tools, execute_tool, ToolCall};
pub const EDIT_INTERVAL_MS: u64 = 2000;
pub const DRAFT_INTERVAL_MS: u64 = 1000;
pub const TG_MSG_LIMIT: usize = 4096;
pub const CURSOR: &str = " \u{25CE}";
/// Stream JSON event types we care about.
#[derive(Deserialize)]
pub struct StreamEvent {
#[serde(rename = "type")]
pub event_type: String,
pub message: Option<AssistantMessage>,
pub result: Option<String>,
#[serde(default)]
pub is_error: bool,
}
#[derive(Deserialize)]
pub struct AssistantMessage {
pub content: Vec<ContentBlock>,
}
#[derive(Deserialize)]
pub struct ContentBlock {
#[serde(rename = "type")]
pub block_type: String,
pub text: Option<String>,
pub name: Option<String>,
pub input: Option<serde_json::Value>,
}
/// Extract all text from an assistant message's content blocks.
pub fn extract_text(msg: &AssistantMessage) -> String {
msg.content
.iter()
.filter(|b| b.block_type == "text")
.filter_map(|b| b.text.as_deref())
.collect::<Vec<_>>()
.join("")
}
/// Extract tool use status line, e.g. "Bash: echo hello"
pub fn extract_tool_use(msg: &AssistantMessage) -> Option<String> {
for block in &msg.content {
if block.block_type == "tool_use" {
let name = block.name.as_deref().unwrap_or("tool");
let detail = block
.input
.as_ref()
.and_then(|v| {
// try common fields: command, pattern, file_path, query
v.get("command")
.or(v.get("pattern"))
.or(v.get("file_path"))
.or(v.get("query"))
.or(v.get("prompt"))
.and_then(|s| s.as_str())
})
.unwrap_or("");
let detail_short = truncate_at_char_boundary(detail, 80);
return Some(format!("{name}: {detail_short}"));
}
}
None
}
pub async fn send_message_draft(
client: &reqwest::Client,
token: &str,
chat_id: i64,
draft_id: i64,
text: &str,
) -> Result<()> {
let url = format!("https://api.telegram.org/bot{token}/sendMessageDraft");
let resp = client
.post(&url)
.json(&serde_json::json!({
"chat_id": chat_id,
"draft_id": draft_id,
"text": text,
}))
.send()
.await?;
let body: serde_json::Value = resp.json().await?;
if body["ok"].as_bool() != Some(true) {
anyhow::bail!("sendMessageDraft: {}", body);
}
Ok(())
}
// ── openai with tool call loop ─────────────────────────────────────
#[allow(clippy::too_many_arguments)]
pub async fn run_openai_with_tools(
endpoint: &str,
model: &str,
api_key: &str,
mut messages: Vec<serde_json::Value>,
bot: &Bot,
chat_id: ChatId,
state: &Arc<AppState>,
sid: &str,
config: &Arc<Config>,
is_private: bool,
) -> Result<String> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(120))
.build()
.unwrap();
let url = format!("{}/chat/completions", endpoint.trim_end_matches('/'));
let tools = discover_tools();
loop {
let body = serde_json::json!({
"model": model,
"messages": messages,
"tools": tools,
"stream": true,
});
info!("API request: {} messages, {} tools",
messages.len(),
tools.as_array().map(|a| a.len()).unwrap_or(0));
let resp_raw = client
.post(&url)
.header("Authorization", format!("Bearer {api_key}"))
.json(&body)
.send()
.await?;
if !resp_raw.status().is_success() {
let status = resp_raw.status();
let body_text = resp_raw.text().await.unwrap_or_default();
// dump messages for debugging
for (i, m) in messages.iter().enumerate() {
let role = m["role"].as_str().unwrap_or("?");
let content_len = m["content"].as_str().map(|s| s.len()).unwrap_or(0);
let has_tc = m.get("tool_calls").is_some();
let has_tcid = m.get("tool_call_id").is_some();
warn!(" msg[{i}] role={role} content_len={content_len} tool_calls={has_tc} tool_call_id={has_tcid}");
}
error!("OpenAI API {status}: {body_text}");
anyhow::bail!("OpenAI API {status}: {body_text}");
}
let mut resp = resp_raw;
let token = bot.token().to_owned();
let raw_chat_id = chat_id.0;
let draft_id: i64 = 1;
let mut use_draft = is_private; // sendMessageDraft only works in private chats
let mut msg_id: Option<teloxide::types::MessageId> = None;
let mut accumulated = String::new();
let mut last_edit = Instant::now();
let mut buffer = String::new();
let mut done = false;
// tool call accumulation
let mut tool_calls: Vec<ToolCall> = Vec::new();
let mut has_tool_calls = false;
while let Some(chunk) = resp.chunk().await? {
if done {
break;
}
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(pos) = buffer.find('\n') {
let line = buffer[..pos].to_string();
buffer = buffer[pos + 1..].to_string();
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with(':') {
continue;
}
let data = match trimmed.strip_prefix("data: ") {
Some(d) => d,
None => continue,
};
if data.trim() == "[DONE]" {
done = true;
break;
}
if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
let delta = &json["choices"][0]["delta"];
// handle content delta
if let Some(content) = delta["content"].as_str() {
if !content.is_empty() {
accumulated.push_str(content);
}
}
// handle tool call delta
if let Some(tc_arr) = delta["tool_calls"].as_array() {
has_tool_calls = true;
for tc in tc_arr {
let idx = tc["index"].as_u64().unwrap_or(0) as usize;
while tool_calls.len() <= idx {
tool_calls.push(ToolCall {
id: String::new(),
name: String::new(),
arguments: String::new(),
});
}
if let Some(id) = tc["id"].as_str() {
tool_calls[idx].id = id.to_string();
}
if let Some(name) = tc["function"]["name"].as_str() {
tool_calls[idx].name = name.to_string();
}
if let Some(args) = tc["function"]["arguments"].as_str() {
tool_calls[idx].arguments.push_str(args);
}
}
}
// display update (only when there's content to show)
if accumulated.is_empty() {
continue;
}
{
let interval = if use_draft {
DRAFT_INTERVAL_MS
} else {
EDIT_INTERVAL_MS
};
if last_edit.elapsed().as_millis() < interval as u128 {
continue;
}
let display = if use_draft {
truncate_at_char_boundary(&accumulated, TG_MSG_LIMIT).to_string()
} else {
truncate_for_display(&accumulated)
};
if use_draft {
match send_message_draft(
&client, &token, raw_chat_id, draft_id, &display,
)
.await
{
Ok(_) => {
last_edit = Instant::now();
}
Err(e) => {
warn!("sendMessageDraft failed, falling back: {e:#}");
use_draft = false;
if let Ok(sent) =
bot.send_message(chat_id, &display).await
{
msg_id = Some(sent.id);
last_edit = Instant::now();
}
}
}
} else if let Some(id) = msg_id {
if bot
.edit_message_text(chat_id, id, &display)
.await
.is_ok()
{
last_edit = Instant::now();
}
} else if let Ok(sent) =
bot.send_message(chat_id, &display).await
{
msg_id = Some(sent.id);
last_edit = Instant::now();
}
} // end display block
}
}
}
// decide what to do based on response type
if has_tool_calls && !tool_calls.is_empty() {
// append assistant message with tool calls
let tc_json: Vec<serde_json::Value> = tool_calls
.iter()
.map(|tc| {
serde_json::json!({
"id": tc.id,
"type": "function",
"function": {
"name": tc.name,
"arguments": tc.arguments,
}
})
})
.collect();
let assistant_msg = serde_json::json!({
"role": "assistant",
"content": if accumulated.is_empty() { "" } else { &accumulated },
"tool_calls": tc_json,
});
messages.push(assistant_msg);
// execute each tool
for tc in &tool_calls {
info!(tool = %tc.name, "executing tool call");
let _ = bot
.send_message(chat_id, format!("[{}({})]", tc.name, truncate_at_char_boundary(&tc.arguments, 100)))
.await;
let result =
execute_tool(&tc.name, &tc.arguments, state, bot, chat_id, sid, config)
.await;
messages.push(serde_json::json!({
"role": "tool",
"tool_call_id": tc.id,
"content": result,
}));
}
// clear display state for next round
tool_calls.clear();
// loop back to call API again
continue;
}
// content response — send final result
if !accumulated.is_empty() {
send_final_result(bot, chat_id, msg_id, use_draft, &accumulated).await;
}
return Ok(accumulated);
}
}
// ── claude bridge (streaming) ───────────────────────────────────────
pub async fn invoke_claude_streaming(
sid: &str,
prompt: &str,
known: bool,
bot: &Bot,
chat_id: ChatId,
) -> Result<String> {
if known {
return run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await;
}
match run_claude_streaming(&["--resume", sid], prompt, bot, chat_id).await {
Ok(out) => {
info!(%sid, "resumed existing session");
Ok(out)
}
Err(e) => {
warn!(%sid, "resume failed ({e:#}), creating new session");
run_claude_streaming(&["--session-id", sid], prompt, bot, chat_id).await
}
}
}
pub async fn run_claude_streaming(
extra_args: &[&str],
prompt: &str,
bot: &Bot,
chat_id: ChatId,
) -> Result<String> {
let mut args: Vec<&str> = vec![
"--dangerously-skip-permissions",
"-p",
"--output-format",
"stream-json",
"--verbose",
];
args.extend(extra_args);
args.push(prompt);
let mut child = Command::new("claude")
.args(&args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdout = child.stdout.take().unwrap();
let mut lines = tokio::io::BufReader::new(stdout).lines();
// sendMessageDraft for native streaming, with editMessageText fallback
let http = reqwest::Client::new();
let token = bot.token().to_owned();
let raw_chat_id = chat_id.0;
let draft_id: i64 = 1;
let mut use_draft = true;
let mut msg_id: Option<teloxide::types::MessageId> = None;
let mut last_sent_text = String::new();
let mut last_edit = Instant::now();
let mut final_result = String::new();
let mut is_error = false;
let mut tool_status = String::new();
while let Ok(Some(line)) = lines.next_line().await {
let event: StreamEvent = match serde_json::from_str(&line) {
Ok(e) => e,
Err(_) => continue,
};
match event.event_type.as_str() {
"assistant" => {
if let Some(amsg) = &event.message {
// determine display content
let (display_raw, new_text) =
if let Some(status) = extract_tool_use(amsg) {
tool_status = format!("[{status}]");
let d = if last_sent_text.is_empty() {
tool_status.clone()
} else {
format!("{last_sent_text}\n\n{tool_status}")
};
(d, None)
} else {
let text = extract_text(amsg);
if text.is_empty() || text == last_sent_text {
continue;
}
let interval = if use_draft {
DRAFT_INTERVAL_MS
} else {
EDIT_INTERVAL_MS
};
if last_edit.elapsed().as_millis() < interval as u128 {
continue;
}
tool_status.clear();
(text.clone(), Some(text))
};
let display = if use_draft {
// draft mode: no cursor — cursor breaks monotonic text growth
truncate_at_char_boundary(&display_raw, TG_MSG_LIMIT).to_string()
} else {
truncate_for_display(&display_raw)
};
if use_draft {
match send_message_draft(
&http, &token, raw_chat_id, draft_id, &display,
)
.await
{
Ok(_) => {
if let Some(t) = new_text {
last_sent_text = t;
}
last_edit = Instant::now();
}
Err(e) => {
warn!("sendMessageDraft failed, falling back: {e:#}");
use_draft = false;
if let Ok(sent) =
bot.send_message(chat_id, &display).await
{
msg_id = Some(sent.id);
if let Some(t) = new_text {
last_sent_text = t;
}
last_edit = Instant::now();
}
}
}
} else if let Some(id) = msg_id {
if bot
.edit_message_text(chat_id, id, &display)
.await
.is_ok()
{
if let Some(t) = new_text {
last_sent_text = t;
}
last_edit = Instant::now();
}
} else if let Ok(sent) =
bot.send_message(chat_id, &display).await
{
msg_id = Some(sent.id);
if let Some(t) = new_text {
last_sent_text = t;
}
last_edit = Instant::now();
}
}
}
"result" => {
final_result = event.result.unwrap_or_default();
is_error = event.is_error;
}
_ => {}
}
}
// read stderr before waiting (in case child already exited)
let stderr_handle = child.stderr.take();
let status = child.wait().await;
// collect stderr for diagnostics
let stderr_text = if let Some(mut se) = stderr_handle {
let mut buf = String::new();
let _ = tokio::io::AsyncReadExt::read_to_string(&mut se, &mut buf).await;
buf
} else {
String::new()
};
// determine error: explicit is_error from stream, or non-zero exit with no result
let has_error = is_error
|| (final_result.is_empty()
&& status.as_ref().map(|s| !s.success()).unwrap_or(true));
if has_error {
let err_detail = if !final_result.is_empty() {
final_result.clone()
} else if !stderr_text.is_empty() {
stderr_text.trim().to_string()
} else {
format!("claude exited: {:?}", status)
};
if !use_draft {
if let Some(id) = msg_id {
let _ = bot
.edit_message_text(chat_id, id, format!("[error] {err_detail}"))
.await;
}
}
anyhow::bail!("{err_detail}");
}
if final_result.is_empty() {
return Ok(final_result);
}
send_final_result(bot, chat_id, msg_id, use_draft, &final_result).await;
Ok(final_result)
}
// ── openai-compatible backend (streaming) ──────────────────────────
pub async fn run_openai_streaming(
endpoint: &str,
model: &str,
api_key: &str,
messages: &[serde_json::Value],
bot: &Bot,
chat_id: ChatId,
) -> Result<String> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(120))
.build()
.unwrap();
let url = format!("{}/chat/completions", endpoint.trim_end_matches('/'));
let body = serde_json::json!({
"model": model,
"messages": messages,
"stream": true,
});
let mut resp = client
.post(&url)
.header("Authorization", format!("Bearer {api_key}"))
.json(&body)
.send()
.await?
.error_for_status()?;
let token = bot.token().to_owned();
let raw_chat_id = chat_id.0;
let draft_id: i64 = 1;
let mut use_draft = true;
let mut msg_id: Option<teloxide::types::MessageId> = None;
let mut accumulated = String::new();
let mut last_edit = Instant::now();
let mut buffer = String::new();
let mut done = false;
while let Some(chunk) = resp.chunk().await? {
if done {
break;
}
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(pos) = buffer.find('\n') {
let line = buffer[..pos].to_string();
buffer = buffer[pos + 1..].to_string();
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with(':') {
continue;
}
let data = match trimmed.strip_prefix("data: ") {
Some(d) => d,
None => continue,
};
if data.trim() == "[DONE]" {
done = true;
break;
}
if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
if let Some(content) = json["choices"][0]["delta"]["content"].as_str() {
if content.is_empty() {
continue;
}
accumulated.push_str(content);
let interval = if use_draft {
DRAFT_INTERVAL_MS
} else {
EDIT_INTERVAL_MS
};
if last_edit.elapsed().as_millis() < interval as u128 {
continue;
}
let display = if use_draft {
truncate_at_char_boundary(&accumulated, TG_MSG_LIMIT).to_string()
} else {
truncate_for_display(&accumulated)
};
if use_draft {
match send_message_draft(
&client, &token, raw_chat_id, draft_id, &display,
)
.await
{
Ok(_) => {
last_edit = Instant::now();
}
Err(e) => {
warn!("sendMessageDraft failed, falling back: {e:#}");
use_draft = false;
if let Ok(sent) = bot.send_message(chat_id, &display).await {
msg_id = Some(sent.id);
last_edit = Instant::now();
}
}
}
} else if let Some(id) = msg_id {
if bot.edit_message_text(chat_id, id, &display).await.is_ok() {
last_edit = Instant::now();
}
} else if let Ok(sent) = bot.send_message(chat_id, &display).await {
msg_id = Some(sent.id);
last_edit = Instant::now();
}
}
}
}
}
if accumulated.is_empty() {
return Ok(accumulated);
}
send_final_result(bot, chat_id, msg_id, use_draft, &accumulated).await;
Ok(accumulated)
}
pub fn build_system_prompt(summary: &str, persona: &str, memory_slots: &[(i32, String)], inner_state: &str) -> serde_json::Value {
let mut text = if persona.is_empty() {
String::from("你是一个AI助手。")
} else {
persona.to_string()
};
text.push_str(
"\n\nYou can use the provided tools to get things done. \
When you need to run commands, execute code, or launch a complex subtask, call the corresponding tool directly instead of only describing what you would do. \
When you need to search for information (web search, research, technical investigation, etc.), use spawn_agent to start a subagent for the search; \
the subagent can use a browser and search engines, and you will be notified with the results when the search completes. \
Format output as plain text or basic Markdown (bold, lists, code blocks); \
do not use LaTeX formulas ($...$), special Unicode symbols (→←↔), or HTML tags (Telegram cannot render them).",
);
if !memory_slots.is_empty() {
text.push_str("\n\n## 持久记忆(跨会话保留)\n");
for (nr, content) in memory_slots {
text.push_str(&format!("[{nr}] {content}\n"));
}
}
if !inner_state.is_empty() {
text.push_str("\n\n## 你的内在状态\n");
text.push_str(inner_state);
}
if !summary.is_empty() {
text.push_str("\n\n## 之前的对话总结\n");
text.push_str(summary);
}
serde_json::json!({"role": "system", "content": text})
}
pub async fn summarize_messages(
endpoint: &str,
model: &str,
api_key: &str,
existing_summary: &str,
dropped: &[serde_json::Value],
) -> Result<String> {
let msgs_text: String = dropped
.iter()
.filter_map(|m| {
let role = m["role"].as_str()?;
let content = m["content"].as_str()?;
Some(format!("{role}: {content}"))
})
.collect::<Vec<_>>()
.join("\n\n");
let prompt = if existing_summary.is_empty() {
format!(
"Summarize the following conversation into a digest of roughly 4000 characters, preserving key information and context:\n\n{}",
msgs_text
)
} else {
format!(
"Fold the following new conversation into the existing summary, keeping it within roughly 4000 characters. \
Preserve important information and let older topics fade out naturally.\n\n\
Existing summary:\n{}\n\nNew conversation:\n{}",
existing_summary, msgs_text
)
};
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(120))
.build()
.unwrap();
let url = format!("{}/chat/completions", endpoint.trim_end_matches('/'));
let body = serde_json::json!({
"model": model,
"messages": [
{"role": "system", "content": "你是一个对话总结助手。请生成简洁但信息丰富的总结。"},
{"role": "user", "content": prompt}
],
});
let resp = client
.post(&url)
.header("Authorization", format!("Bearer {api_key}"))
.json(&body)
.send()
.await?
.error_for_status()?;
let json: serde_json::Value = resp.json().await?;
let summary = json["choices"][0]["message"]["content"]
.as_str()
.unwrap_or("")
.to_string();
Ok(summary)
}
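Both streaming paths above share the same SSE framing: complete lines are split off a growing buffer, keep-alive/comment lines are skipped, `data: ` payloads are extracted, and `[DONE]` ends the stream. A self-contained, std-only sketch of that framing (hypothetical helper, not part of stream.rs; the real loops also JSON-decode each payload):

```rust
// Drain complete SSE lines from `buffer`, returning the extracted
// "data: " payloads and whether the [DONE] terminator was seen.
// Partial trailing data stays in the buffer for the next chunk.
fn drain_sse(buffer: &mut String) -> (Vec<String>, bool) {
    let mut payloads = Vec::new();
    let mut done = false;
    while let Some(pos) = buffer.find('\n') {
        // Remove the line (including the newline) from the buffer.
        let line: String = buffer.drain(..=pos).collect();
        let trimmed = line.trim();
        if trimmed.is_empty() || trimmed.starts_with(':') {
            continue; // blank line or SSE comment/keep-alive
        }
        if let Some(data) = trimmed.strip_prefix("data: ") {
            if data.trim() == "[DONE]" {
                done = true;
                break;
            }
            payloads.push(data.to_string());
        }
    }
    (payloads, done)
}

fn main() {
    let mut buf = String::from("data: {\"a\":1}\n\ndata: [DONE]\npartial");
    let (p, done) = drain_sse(&mut buf);
    assert_eq!(p, vec!["{\"a\":1}".to_string()]);
    assert!(done);
}
```

Keeping the partial tail in the buffer is what lets `resp.chunk()` deliver data at arbitrary byte boundaries without corrupting a line.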

src/tools.rs (new file, 685 lines)

@@ -0,0 +1,685 @@
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use anyhow::Result;
use teloxide::prelude::*;
use teloxide::types::InputFile;
use tokio::io::AsyncBufReadExt;
use tokio::process::Command;
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use crate::config::{BackendConfig, Config};
use crate::display::truncate_at_char_boundary;
use crate::state::AppState;
use crate::stream::{build_system_prompt, run_openai_streaming};
// ── subagent & tool call ───────────────────────────────────────────
pub struct SubAgent {
pub task: String,
pub output: Arc<RwLock<String>>,
pub completed: Arc<AtomicBool>,
pub exit_code: Arc<RwLock<Option<i32>>>,
pub pid: Option<u32>,
}
pub struct ToolCall {
pub id: String,
pub name: String,
pub arguments: String,
}
pub fn tools_dir() -> PathBuf {
// tools/ relative to the config file location
let config_path = std::env::var("NOC_CONFIG").unwrap_or_else(|_| "config.yaml".into());
let config_dir = Path::new(&config_path)
.parent()
.unwrap_or(Path::new("."));
config_dir.join("tools")
}
/// Scan tools/ directory for scripts with --schema, merge with built-in tools.
/// Called on every API request so new/updated scripts take effect immediately.
pub fn discover_tools() -> serde_json::Value {
let mut tools = vec![
serde_json::json!({
"type": "function",
"function": {
"name": "spawn_agent",
"description": "启动一个 Claude Code 子代理异步执行复杂任务。子代理可使用 shell、浏览器和搜索引擎适合网页搜索、资料查找、技术调研、代码任务等。完成后会收到通知。",
"parameters": {
"type": "object",
"properties": {
"id": {"type": "string", "description": "简短唯一标识符(如 'research'、'fix-bug'"},
"task": {"type": "string", "description": "给子代理的详细任务描述"}
},
"required": ["id", "task"]
}
}
}),
serde_json::json!({
"type": "function",
"function": {
"name": "agent_status",
"description": "查看正在运行或已完成的子代理的状态和输出",
"parameters": {
"type": "object",
"properties": {
"id": {"type": "string", "description": "子代理标识符"}
},
"required": ["id"]
}
}
}),
serde_json::json!({
"type": "function",
"function": {
"name": "kill_agent",
"description": "终止一个正在运行的子代理",
"parameters": {
"type": "object",
"properties": {
"id": {"type": "string", "description": "子代理标识符"}
},
"required": ["id"]
}
}
}),
serde_json::json!({
"type": "function",
"function": {
"name": "send_file",
"description": "通过 Telegram 向用户发送服务器上的文件,文件必须存在于服务器文件系统中。",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "服务器上文件的绝对路径"},
"caption": {"type": "string", "description": "可选的文件说明/描述"}
},
"required": ["path"]
}
}
}),
serde_json::json!({
"type": "function",
"function": {
"name": "update_inner_state",
"description": "更新你的内在状态。这是你自己的持续意识跨会话保留Life Loop 和对话都能看到。记录你对当前情况的理解、正在跟踪的事、对 Fam 状态的感知等。",
"parameters": {
"type": "object",
"properties": {
"content": {"type": "string", "description": "完整的内在状态文本(替换之前的)"}
},
"required": ["content"]
}
}
}),
serde_json::json!({
"type": "function",
"function": {
"name": "update_scratch",
"description": "更新你的草稿区(工作笔记、状态、提醒)。草稿区内容会附加到每条用户消息中,确保你始终可见。用于跨轮次跟踪上下文。",
"parameters": {
"type": "object",
"properties": {
"content": {"type": "string", "description": "完整的草稿区内容(替换之前的内容)"}
},
"required": ["content"]
}
}
}),
serde_json::json!({
"type": "function",
"function": {
"name": "set_timer",
"description": "Set a timer that will fire in the future. Supports: '5min'/'2h' (relative), 'once:2026-04-10 09:00' (absolute), 'cron:0 8 * * *' (recurring). When fired, you'll receive the label as a prompt.",
"parameters": {
"type": "object",
"properties": {
"schedule": {"type": "string", "description": "Timer schedule: e.g. '5min', '1h', 'once:2026-04-10 09:00', 'cron:30 8 * * *'"},
"label": {"type": "string", "description": "What this timer is for — this text will be sent to you when it fires"}
},
"required": ["schedule", "label"]
}
}
}),
serde_json::json!({
"type": "function",
"function": {
"name": "list_timers",
"description": "List all active timers",
"parameters": {
"type": "object",
"properties": {},
}
}
}),
serde_json::json!({
"type": "function",
"function": {
"name": "cancel_timer",
"description": "Cancel a timer by ID",
"parameters": {
"type": "object",
"properties": {
"timer_id": {"type": "integer", "description": "Timer ID from list_timers"}
},
"required": ["timer_id"]
}
}
}),
serde_json::json!({
"type": "function",
"function": {
"name": "update_memory",
"description": "写入持久记忆槽。共 100 个槽位0-99跨会话保留。记忆槽内容会注入到每次对话的 system prompt 中。用于存储关键事实、用户偏好或重要上下文。内容设为空字符串可清除槽位。",
"parameters": {
"type": "object",
"properties": {
"slot_nr": {"type": "integer", "description": "槽位编号0-99"},
"content": {"type": "string", "description": "要存储的内容最多200字符空字符串表示清除该槽位"}
},
"required": ["slot_nr", "content"]
}
}
}),
serde_json::json!({
"type": "function",
"function": {
"name": "gen_voice",
"description": "将文字合成为语音并直接发送给用户。",
"parameters": {
"type": "object",
"properties": {
"text": {"type": "string", "description": "要合成语音的文字内容"}
},
"required": ["text"]
}
}
}),
];
// discover script tools
let dir = tools_dir();
if let Ok(entries) = std::fs::read_dir(&dir) {
for entry in entries.flatten() {
let path = entry.path();
if !path.is_file() {
continue;
}
// run --schema with a short timeout
let output = std::process::Command::new(&path)
.arg("--schema")
.output();
match output {
Ok(out) if out.status.success() => {
let stdout = String::from_utf8_lossy(&out.stdout);
match serde_json::from_str::<serde_json::Value>(stdout.trim()) {
Ok(schema) => {
let name = schema["name"].as_str().unwrap_or("?");
info!(tool = %name, path = %path.display(), "discovered script tool");
tools.push(serde_json::json!({
"type": "function",
"function": schema,
}));
}
Err(e) => {
warn!(path = %path.display(), "invalid --schema JSON: {e}");
}
}
}
_ => {} // not a tool script, skip silently
}
}
}
serde_json::Value::Array(tools)
}
// ── tool execution ─────────────────────────────────────────────────
pub async fn execute_tool(
name: &str,
arguments: &str,
state: &Arc<AppState>,
bot: &Bot,
chat_id: ChatId,
sid: &str,
config: &Arc<Config>,
) -> String {
let args: serde_json::Value = match serde_json::from_str(arguments) {
Ok(v) => v,
Err(e) => return format!("Invalid arguments: {e}"),
};
match name {
"spawn_agent" => {
let id = args["id"].as_str().unwrap_or("agent");
let task = args["task"].as_str().unwrap_or("");
spawn_agent(id, task, state, bot, chat_id, sid, config).await
}
"agent_status" => {
let id = args["id"].as_str().unwrap_or("");
check_agent_status(id, state).await
}
"kill_agent" => {
let id = args["id"].as_str().unwrap_or("");
kill_agent(id, state).await
}
"send_file" => {
let path_str = args["path"].as_str().unwrap_or("");
let caption = args["caption"].as_str().unwrap_or("");
let path = Path::new(path_str);
if !path.exists() {
return format!("File not found: {path_str}");
}
if !path.is_file() {
return format!("Not a file: {path_str}");
}
let input_file = InputFile::file(path);
let mut req = bot.send_document(chat_id, input_file);
if !caption.is_empty() {
req = req.caption(caption);
}
match req.await {
Ok(_) => format!("File sent: {path_str}"),
Err(e) => format!("Failed to send file: {e:#}"),
}
}
"update_inner_state" => {
let content = args["content"].as_str().unwrap_or("");
state.set_inner_state(content).await;
format!("Inner state updated ({} chars)", content.len())
}
"update_scratch" => {
let content = args["content"].as_str().unwrap_or("");
state.push_scratch(content).await;
format!("Scratch updated ({} chars)", content.len())
}
"set_timer" => {
let schedule = args["schedule"].as_str().unwrap_or("");
let label = args["label"].as_str().unwrap_or("");
match parse_next_fire(schedule) {
Ok(next) => {
let next_str = next.format("%Y-%m-%d %H:%M:%S").to_string();
let id = state
.add_timer(chat_id.0, label, schedule, &next_str)
.await;
format!("Timer #{id} set: \"{label}\" → next fire at {next_str}")
}
Err(e) => format!("Invalid schedule '{schedule}': {e}"),
}
}
"list_timers" => {
let timers = state.list_timers(Some(chat_id.0)).await;
if timers.is_empty() {
"No active timers.".to_string()
} else {
timers
.iter()
.map(|(id, _, label, sched, next, enabled)| {
let status = if *enabled { "" } else { " [disabled]" };
format!("#{id}: \"{label}\" ({sched}) → {next}{status}")
})
.collect::<Vec<_>>()
.join("\n")
}
}
"cancel_timer" => {
let tid = args["timer_id"].as_i64().unwrap_or(0);
if state.cancel_timer(tid).await {
format!("Timer #{tid} cancelled")
} else {
format!("Timer #{tid} not found")
}
}
"update_memory" => {
let slot_nr = args["slot_nr"].as_i64().unwrap_or(-1) as i32;
let content = args["content"].as_str().unwrap_or("");
match state.set_memory_slot(slot_nr, content).await {
Ok(_) => {
if content.is_empty() {
format!("Memory slot {slot_nr} cleared")
} else {
format!("Memory slot {slot_nr} updated ({} chars)", content.len())
}
}
Err(e) => format!("Error: {e}"),
}
}
"gen_voice" => {
let text = args["text"].as_str().unwrap_or("");
if text.is_empty() {
return "Error: text is required".to_string();
}
let script = tools_dir().join("gen_voice");
let result = tokio::time::timeout(
std::time::Duration::from_secs(120),
tokio::process::Command::new(&script)
.arg(arguments)
.output(),
)
.await;
match result {
Ok(Ok(out)) if out.status.success() => {
let path_str = String::from_utf8_lossy(&out.stdout).trim().to_string();
let path = Path::new(&path_str);
if path.exists() {
let input_file = InputFile::file(path);
match bot.send_voice(chat_id, input_file).await {
Ok(_) => format!("Voice sent: {path_str}"),
Err(e) => format!("Voice generated but sending failed: {e:#}"),
}
} else {
format!("Voice generation failed: output file does not exist ({path_str})")
}
}
Ok(Ok(out)) => {
let stderr = String::from_utf8_lossy(&out.stderr);
let stdout = String::from_utf8_lossy(&out.stdout);
format!("gen_voice failed: {stdout} {stderr}")
}
Ok(Err(e)) => format!("gen_voice exec error: {e}"),
Err(_) => "gen_voice timeout (120s)".to_string(),
}
}
_ => run_script_tool(name, arguments).await,
}
}
pub async fn spawn_agent(
id: &str,
task: &str,
state: &Arc<AppState>,
bot: &Bot,
chat_id: ChatId,
sid: &str,
config: &Arc<Config>,
) -> String {
// check if already exists
if state.agents.read().await.contains_key(id) {
return format!("Agent '{id}' already exists. Use agent_status to check it.");
}
let mut child = match Command::new("claude")
.args(["--dangerously-skip-permissions", "-p", task])
.stdout(Stdio::piped())
// stderr is never drained below; use null so the child can't block on a full pipe
.stderr(Stdio::null())
.spawn()
{
Ok(c) => c,
Err(e) => return format!("Failed to spawn agent: {e}"),
};
let pid = child.id();
let output = Arc::new(tokio::sync::RwLock::new(String::new()));
let completed = Arc::new(AtomicBool::new(false));
let exit_code = Arc::new(tokio::sync::RwLock::new(None));
let agent = Arc::new(SubAgent {
task: task.to_string(),
output: output.clone(),
completed: completed.clone(),
exit_code: exit_code.clone(),
pid,
});
state.agents.write().await.insert(id.to_string(), agent);
// background task: collect output and wakeup on completion
let out = output.clone();
let done = completed.clone();
let ecode = exit_code.clone();
let bot_c = bot.clone();
let chat_id_c = chat_id;
let state_c = state.clone();
let config_c = config.clone();
let sid_c = sid.to_string();
let id_c = id.to_string();
tokio::spawn(async move {
let stdout = child.stdout.take();
if let Some(stdout) = stdout {
let mut lines = tokio::io::BufReader::new(stdout).lines();
while let Ok(Some(line)) = lines.next_line().await {
let mut o = out.write().await;
o.push_str(&line);
o.push('\n');
}
}
let status = child.wait().await;
let code = status.as_ref().ok().and_then(|s| s.code());
*ecode.write().await = code;
done.store(true, Ordering::SeqCst);
info!(agent = %id_c, "agent completed, exit={code:?}");
// wakeup: inject result and trigger LLM
let result = out.read().await.clone();
let result_short = truncate_at_char_boundary(&result, 4000);
let wakeup = format!(
"[Agent '{id_c}' finished (exit={})]\n{result_short}",
code.unwrap_or(-1)
);
if let Err(e) = agent_wakeup(
&config_c, &state_c, &bot_c, chat_id_c, &sid_c, &wakeup, &id_c,
)
.await
{
error!(agent = %id_c, "wakeup failed: {e:#}");
let _ = bot_c
.send_message(chat_id_c, format!("[agent wakeup error] {e:#}"))
.await;
}
});
format!("Agent '{id}' spawned (pid={pid:?})")
}
pub async fn agent_wakeup(
config: &Config,
state: &AppState,
bot: &Bot,
chat_id: ChatId,
sid: &str,
wakeup_msg: &str,
agent_id: &str,
) -> Result<()> {
match &config.backend {
BackendConfig::OpenAI {
endpoint,
model,
api_key,
} => {
state.push_message(sid, "user", wakeup_msg).await;
let conv = state.load_conv(sid).await;
let persona = state.get_config("persona").await.unwrap_or_default();
let memory_slots = state.get_memory_slots().await;
let inner = state.get_inner_state().await;
let system_msg = build_system_prompt(&conv.summary, &persona, &memory_slots, &inner);
let mut api_messages = vec![system_msg];
api_messages.extend(conv.messages);
info!(agent = %agent_id, "wakeup: sending {} messages to LLM", api_messages.len());
let response =
run_openai_streaming(endpoint, model, api_key, &api_messages, bot, chat_id)
.await?;
if !response.is_empty() {
state.push_message(sid, "assistant", &response).await;
}
Ok(())
}
_ => {
let _ = bot
.send_message(chat_id, format!("[Agent '{agent_id}' done]\n{wakeup_msg}"))
.await;
Ok(())
}
}
}
pub async fn check_agent_status(id: &str, state: &AppState) -> String {
let agents = state.agents.read().await;
match agents.get(id) {
Some(agent) => {
let status = if agent.completed.load(Ordering::SeqCst) {
let code = agent.exit_code.read().await;
format!("completed (exit={})", code.unwrap_or(-1))
} else {
"running".to_string()
};
let output = agent.output.read().await;
let out_preview = truncate_at_char_boundary(&output, 3000);
format!(
"Agent '{id}': {status}\nTask: {}\nOutput ({} bytes):\n{out_preview}",
agent.task,
output.len()
)
}
None => format!("Agent '{id}' not found"),
}
}
pub async fn kill_agent(id: &str, state: &AppState) -> String {
let agents = state.agents.read().await;
match agents.get(id) {
Some(agent) => {
if agent.completed.load(Ordering::SeqCst) {
return format!("Agent '{id}' already completed");
}
if let Some(pid) = agent.pid {
unsafe {
libc::kill(pid as i32, libc::SIGTERM);
}
format!("Sent SIGTERM to agent '{id}' (pid={pid})")
} else {
format!("Agent '{id}' has no PID")
}
}
None => format!("Agent '{id}' not found"),
}
}
pub async fn run_script_tool(name: &str, arguments: &str) -> String {
// find script in tools/ that matches this tool name
let dir = tools_dir();
let entries = match std::fs::read_dir(&dir) {
Ok(e) => e,
Err(_) => return format!("Unknown tool: {name}"),
};
for entry in entries.flatten() {
let path = entry.path();
if !path.is_file() {
continue;
}
// check if this script provides the requested tool
// probe the script's schema asynchronously so the runtime isn't blocked
let schema_out = Command::new(&path)
.arg("--schema")
.output()
.await;
if let Ok(out) = schema_out {
if out.status.success() {
let stdout = String::from_utf8_lossy(&out.stdout);
if let Ok(schema) = serde_json::from_str::<serde_json::Value>(stdout.trim()) {
if schema["name"].as_str() == Some(name) {
// found it — execute
info!(tool = %name, path = %path.display(), "running script tool");
let result = tokio::time::timeout(
std::time::Duration::from_secs(60),
Command::new(&path).arg(arguments).output(),
)
.await;
return match result {
Ok(Ok(output)) => {
let mut s = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr);
if !stderr.is_empty() {
if !s.is_empty() {
s.push_str("\n[stderr]\n");
}
s.push_str(&stderr);
}
if s.is_empty() {
format!("(exit={})", output.status.code().unwrap_or(-1))
} else {
s
}
}
Ok(Err(e)) => format!("Failed to execute {name}: {e}"),
Err(_) => "Timeout after 60s".to_string(),
};
}
}
}
}
}
format!("Unknown tool: {name}")
}
// ── schedule parsing ───────────────────────────────────────────────
pub fn parse_next_fire(schedule: &str) -> Result<chrono::DateTime<chrono::Local>> {
let now = chrono::Local::now();
// relative: "5min", "2h", "30s", "1d"
if let Some(val) = schedule
.strip_suffix("min")
.or_else(|| schedule.strip_suffix("m"))
{
let mins: i64 = val.trim().parse().map_err(|e| anyhow::anyhow!("{e}"))?;
return Ok(now + chrono::Duration::minutes(mins));
}
if let Some(val) = schedule.strip_suffix('h') {
let hours: i64 = val.trim().parse().map_err(|e| anyhow::anyhow!("{e}"))?;
return Ok(now + chrono::Duration::hours(hours));
}
if let Some(val) = schedule.strip_suffix('s') {
let secs: i64 = val.trim().parse().map_err(|e| anyhow::anyhow!("{e}"))?;
return Ok(now + chrono::Duration::seconds(secs));
}
if let Some(val) = schedule.strip_suffix('d') {
let days: i64 = val.trim().parse().map_err(|e| anyhow::anyhow!("{e}"))?;
return Ok(now + chrono::Duration::days(days));
}
// absolute: "once:2026-04-10 09:00"
if let Some(dt_str) = schedule.strip_prefix("once:") {
let dt = chrono::NaiveDateTime::parse_from_str(dt_str.trim(), "%Y-%m-%d %H:%M")
.or_else(|_| {
chrono::NaiveDateTime::parse_from_str(dt_str.trim(), "%Y-%m-%d %H:%M:%S")
})
.map_err(|e| anyhow::anyhow!("parse datetime: {e}"))?;
let local = dt
.and_local_timezone(chrono::Local)
.single()
.ok_or_else(|| anyhow::anyhow!("ambiguous or nonexistent local time"))?;
return Ok(local);
}
// cron: "cron:30 8 * * *"
if let Some(expr) = schedule.strip_prefix("cron:") {
let cron_schedule = expr
.trim()
.parse::<cron::Schedule>()
.map_err(|e| anyhow::anyhow!("parse cron: {e}"))?;
let next = cron_schedule
.upcoming(chrono::Local)
.next()
.ok_or_else(|| anyhow::anyhow!("no upcoming time for cron"))?;
return Ok(next);
}
anyhow::bail!("unknown schedule format: {schedule}")
}
pub fn compute_next_cron_fire(schedule: &str) -> Option<String> {
let expr = schedule.strip_prefix("cron:")?;
let cron_schedule = expr.trim().parse::<cron::Schedule>().ok()?;
let next = cron_schedule.upcoming(chrono::Local).next()?;
Some(next.format("%Y-%m-%d %H:%M:%S").to_string())
}

tests/tool_call.rs (new file, 365 lines)
//! Integration test: verify tool call round-trip with Ollama's OpenAI-compatible API.
//! Requires Ollama reachable at the OLLAMA_URL constant below; tests are #[ignore]d by default.
use serde_json::json;
const OLLAMA_URL: &str = "http://100.84.7.49:11434/v1";
const MODEL: &str = "gemma4:31b";
fn tools() -> serde_json::Value {
json!([{
"type": "function",
"function": {
"name": "calculator",
"description": "Calculate a math expression",
"parameters": {
"type": "object",
"properties": {
"expression": {"type": "string", "description": "Math expression to evaluate"}
},
"required": ["expression"]
}
}
}])
}
/// Test non-streaming tool call round-trip
#[tokio::test]
#[ignore] // requires Ollama on ailab
async fn test_tool_call_roundtrip_non_streaming() {
let client = reqwest::Client::new();
let url = format!("{OLLAMA_URL}/chat/completions");
// Round 1: ask the model to use the calculator
let body = json!({
"model": MODEL,
"messages": [
{"role": "user", "content": "What is 2+2? Use the calculator tool."}
],
"tools": tools(),
});
let resp = client.post(&url).json(&body).send().await.unwrap();
assert!(resp.status().is_success(), "Round 1 failed: {}", resp.status());
let result: serde_json::Value = resp.json().await.unwrap();
let choice = &result["choices"][0];
assert_eq!(
choice["finish_reason"].as_str().unwrap(),
"tool_calls",
"Expected tool_calls finish_reason, got: {choice}"
);
let tool_calls = choice["message"]["tool_calls"].as_array().unwrap();
assert!(!tool_calls.is_empty(), "No tool calls returned");
let tc = &tool_calls[0];
let call_id = tc["id"].as_str().unwrap();
let func_name = tc["function"]["name"].as_str().unwrap();
assert_eq!(func_name, "calculator");
// Round 2: send tool result back
let body2 = json!({
"model": MODEL,
"messages": [
{"role": "user", "content": "What is 2+2? Use the calculator tool."},
{
"role": "assistant",
"content": "",
"tool_calls": [{
"id": call_id,
"type": "function",
"function": {
"name": func_name,
"arguments": tc["function"]["arguments"].as_str().unwrap()
}
}]
},
{
"role": "tool",
"tool_call_id": call_id,
"content": "4"
}
],
"tools": tools(),
});
let resp2 = client.post(&url).json(&body2).send().await.unwrap();
let status2 = resp2.status();
let body2_text = resp2.text().await.unwrap();
assert!(
status2.is_success(),
"Round 2 failed ({status2}): {body2_text}"
);
let result2: serde_json::Value = serde_json::from_str(&body2_text).unwrap();
let content = result2["choices"][0]["message"]["content"]
.as_str()
.unwrap_or("");
assert!(!content.is_empty(), "Expected content in round 2 response");
println!("Round 2 response: {content}");
}
/// Test tool call with conversation history (simulates real scenario)
#[tokio::test]
#[ignore] // requires Ollama on ailab
async fn test_tool_call_with_history() {
let client = reqwest::Client::new();
let url = format!("{OLLAMA_URL}/chat/completions");
// Simulate real message history with system prompt
let body = json!({
"model": MODEL,
"stream": true,
"messages": [
{"role": "system", "content": "You are an AI assistant. You can use the provided tools to complete tasks. When you need to run a command, execute code, or launch a complex subtask, call the corresponding tool directly instead of just describing what you would do."},
{"role": "user", "content": "hi"},
{"role": "assistant", "content": "Hello!"},
{"role": "user", "content": "What is 3+4? Use the calculator."}
],
"tools": tools(),
});
// Round 1: expect tool call
let mut resp = client.post(&url).json(&body).send().await.unwrap();
assert!(resp.status().is_success(), "Round 1 failed: {}", resp.status());
let mut buffer = String::new();
let mut tc_id = String::new();
let mut tc_name = String::new();
let mut tc_args = String::new();
let mut has_tc = false;
while let Some(chunk) = resp.chunk().await.unwrap() {
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(pos) = buffer.find('\n') {
let line = buffer[..pos].to_string();
buffer = buffer[pos + 1..].to_string();
if let Some(data) = line.trim().strip_prefix("data: ") {
if data.trim() == "[DONE]" { break; }
if let Ok(j) = serde_json::from_str::<serde_json::Value>(data) {
if let Some(arr) = j["choices"][0]["delta"]["tool_calls"].as_array() {
has_tc = true;
for tc in arr {
if let Some(id) = tc["id"].as_str() { tc_id = id.into(); }
if let Some(n) = tc["function"]["name"].as_str() { tc_name = n.into(); }
if let Some(a) = tc["function"]["arguments"].as_str() { tc_args.push_str(a); }
}
}
}
}
}
}
assert!(has_tc, "Expected tool call, got content only");
println!("Tool: {tc_name}({tc_args}) id={tc_id}");
// Round 2: tool result → expect content
let body2 = json!({
"model": MODEL,
"stream": true,
"messages": [
{"role": "system", "content": "You are an AI assistant."},
{"role": "user", "content": "hi"},
{"role": "assistant", "content": "Hello!"},
{"role": "user", "content": "What is 3+4? Use the calculator."},
{"role": "assistant", "content": "", "tool_calls": [{"id": tc_id, "type": "function", "function": {"name": tc_name, "arguments": tc_args}}]},
{"role": "tool", "tool_call_id": tc_id, "content": "7"}
],
"tools": tools(),
});
// single request: check status, then stream from the same response
let mut resp2 = client.post(&url).json(&body2).send().await.unwrap();
let status = resp2.status();
if !status.is_success() {
let err = resp2.text().await.unwrap();
panic!("Round 2 failed ({status}): {err}");
}
let mut content = String::new();
let mut buf2 = String::new();
while let Some(chunk) = resp2.chunk().await.unwrap() {
buf2.push_str(&String::from_utf8_lossy(&chunk));
while let Some(pos) = buf2.find('\n') {
let line = buf2[..pos].to_string();
buf2 = buf2[pos + 1..].to_string();
if let Some(data) = line.trim().strip_prefix("data: ") {
if data.trim() == "[DONE]" { break; }
if let Ok(j) = serde_json::from_str::<serde_json::Value>(data) {
if let Some(c) = j["choices"][0]["delta"]["content"].as_str() {
content.push_str(c);
}
}
}
}
}
println!("Final response: {content}");
assert!(!content.is_empty(), "Expected non-empty content in round 2");
}
/// Test multimodal image input
#[tokio::test]
#[ignore] // requires Ollama on ailab
async fn test_image_multimodal() {
let client = reqwest::Client::new();
let url = format!("{OLLAMA_URL}/chat/completions");
// 2x2 red PNG generated by PIL
let b64 = "iVBORw0KGgoAAAANSUhEUgAAAAIAAAACCAIAAAD91JpzAAAAFklEQVR4nGP8z8DAwMDAxMDAwMDAAAANHQEDasKb6QAAAABJRU5ErkJggg==";
let body = json!({
"model": MODEL,
"messages": [{
"role": "user",
"content": [
{"type": "text", "text": "What color is this image? Reply with just the color name."},
{"type": "image_url", "image_url": {"url": format!("data:image/png;base64,{b64}")}}
]
}],
});
let resp = client.post(&url).json(&body).send().await.unwrap();
let status = resp.status();
let text = resp.text().await.unwrap();
assert!(status.is_success(), "Multimodal request failed ({status}): {text}");
let result: serde_json::Value = serde_json::from_str(&text).unwrap();
let content = result["choices"][0]["message"]["content"]
.as_str()
.unwrap_or("");
println!("Image description: {content}");
assert!(!content.is_empty(), "Expected non-empty response for image");
}
/// Test streaming tool call round-trip (matches our actual code path)
#[tokio::test]
#[ignore] // requires Ollama on ailab
async fn test_tool_call_roundtrip_streaming() {
let client = reqwest::Client::new();
let url = format!("{OLLAMA_URL}/chat/completions");
// Round 1: streaming, get tool calls
let body = json!({
"model": MODEL,
"stream": true,
"messages": [
{"role": "user", "content": "What is 7*6? Use the calculator tool."}
],
"tools": tools(),
});
let mut resp = client.post(&url).json(&body).send().await.unwrap();
assert!(resp.status().is_success(), "Round 1 streaming failed");
// Parse SSE to extract tool calls
let mut buffer = String::new();
let mut tool_call_id = String::new();
let mut tool_call_name = String::new();
let mut tool_call_args = String::new();
let mut has_tool_calls = false;
while let Some(chunk) = resp.chunk().await.unwrap() {
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(pos) = buffer.find('\n') {
let line = buffer[..pos].to_string();
buffer = buffer[pos + 1..].to_string();
let trimmed = line.trim();
if let Some(data) = trimmed.strip_prefix("data: ") {
if data.trim() == "[DONE]" {
break;
}
if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
let delta = &json["choices"][0]["delta"];
if let Some(tc_arr) = delta["tool_calls"].as_array() {
has_tool_calls = true;
for tc in tc_arr {
if let Some(id) = tc["id"].as_str() {
tool_call_id = id.to_string();
}
if let Some(name) = tc["function"]["name"].as_str() {
tool_call_name = name.to_string();
}
if let Some(args) = tc["function"]["arguments"].as_str() {
tool_call_args.push_str(args);
}
}
}
}
}
}
}
assert!(has_tool_calls, "No tool calls in streaming response");
assert_eq!(tool_call_name, "calculator");
println!("Tool call: {tool_call_name}({tool_call_args}) id={tool_call_id}");
// Round 2: send tool result, streaming
let body2 = json!({
"model": MODEL,
"stream": true,
"messages": [
{"role": "user", "content": "What is 7*6? Use the calculator tool."},
{
"role": "assistant",
"content": "",
"tool_calls": [{
"id": tool_call_id,
"type": "function",
"function": {
"name": tool_call_name,
"arguments": tool_call_args
}
}]
},
{
"role": "tool",
"tool_call_id": tool_call_id,
"content": "42"
}
],
"tools": tools(),
});
// single request: check status, then collect content from the same streaming response
let mut resp2 = client.post(&url).json(&body2).send().await.unwrap();
let status2 = resp2.status();
if !status2.is_success() {
let err = resp2.text().await.unwrap();
panic!("Round 2 streaming failed ({status2}): {err}");
}
let mut content = String::new();
let mut buffer2 = String::new();
while let Some(chunk) = resp2.chunk().await.unwrap() {
buffer2.push_str(&String::from_utf8_lossy(&chunk));
while let Some(pos) = buffer2.find('\n') {
let line = buffer2[..pos].to_string();
buffer2 = buffer2[pos + 1..].to_string();
let trimmed = line.trim();
if let Some(data) = trimmed.strip_prefix("data: ") {
if data.trim() == "[DONE]" {
break;
}
if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
if let Some(c) = json["choices"][0]["delta"]["content"].as_str() {
content.push_str(c);
}
}
}
}
}
assert!(!content.is_empty(), "Expected content in round 2 streaming");
println!("Round 2 streaming content: {content}");
}

tools/gen_voice (new executable file, 152 lines)
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = ["requests"]
# ///
"""Generate voice audio using IndexTTS2 with a fixed reference voice.
Usage:
./gen_voice --schema
./gen_voice '{"text":"你好世界"}'
./gen_voice 你好世界
"""
import json
import os
import sys
import time
import requests
INDEXTTS_URL = "http://100.107.41.75:7860"
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
REF_AUDIO = os.path.join(SCRIPT_DIR, "..", "assets", "ref_voice.mp3")
OUTPUT_DIR = os.path.expanduser("~/down")
# cache the uploaded ref path to avoid re-uploading
_CACHE_FILE = "/tmp/noc_gen_voice_ref_cache.json"
SCHEMA = {
"name": "gen_voice",
"description": "Generate speech audio from text using voice cloning (IndexTTS2). Returns the file path of the generated wav. Use send_file to send it to the user.",
"parameters": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "The text to synthesize into speech",
},
},
"required": ["text"],
},
}
def get_ref_path():
"""Upload ref audio once, cache the server-side path. Invalidate if server restarted."""
# check cache — validate against server uptime
if os.path.exists(_CACHE_FILE):
try:
with open(_CACHE_FILE) as f:
cache = json.load(f)
# quick health check — if server is up and path exists, reuse
r = requests.head(f"{INDEXTTS_URL}/gradio_api/file={cache['path']}", timeout=3)
if r.status_code == 200:
return cache["path"]
except Exception:
pass
# upload
with open(REF_AUDIO, "rb") as f:
resp = requests.post(f"{INDEXTTS_URL}/gradio_api/upload", files={"files": f})
resp.raise_for_status()
ref_path = resp.json()[0]
# cache
with open(_CACHE_FILE, "w") as f:
json.dump({"path": ref_path}, f)
return ref_path
def synthesize(text):
ref = get_ref_path()
file_data = {"path": ref, "meta": {"_type": "gradio.FileData"}}
# submit job
resp = requests.post(
f"{INDEXTTS_URL}/gradio_api/call/synthesize",
json={
"data": [
text,
file_data, # spk_audio
file_data, # emo_audio
0.5, # emo_alpha
0, 0, 0, 0, 0, 0, 0, 0.8, # emotions (calm=0.8)
False, # use_emo_text
"", # emo_text
False, # use_random
]
},
)
resp.raise_for_status()
event_id = resp.json()["event_id"]
# poll result via SSE
result_resp = requests.get(
f"{INDEXTTS_URL}/gradio_api/call/synthesize/{event_id}", stream=True
)
for line in result_resp.iter_lines(decode_unicode=True):
if line.startswith("data: "):
data = json.loads(line[6:])
if isinstance(data, list) and data:
url = data[0].get("url", "")
if url:
# download the wav
wav = requests.get(url)
wav.raise_for_status()
os.makedirs(OUTPUT_DIR, exist_ok=True)
ts = time.strftime("%Y%m%d_%H%M%S")
out_path = os.path.join(OUTPUT_DIR, f"tts_{ts}.wav")
with open(out_path, "wb") as f:
f.write(wav.content)
return out_path
elif data is None:
raise RuntimeError("TTS synthesis failed (server returned null)")
raise RuntimeError("No result received from TTS server")
def main():
if len(sys.argv) < 2 or sys.argv[1] in ("--help", "-h"):
print(__doc__.strip())
sys.exit(0)
if sys.argv[1] == "--schema":
print(json.dumps(SCHEMA, ensure_ascii=False))
sys.exit(0)
arg = sys.argv[1]
if not arg.startswith("{"):
text = " ".join(sys.argv[1:])
else:
try:
args = json.loads(arg)
text = args.get("text", "")
except json.JSONDecodeError as e:
print(f"Invalid JSON: {e}")
sys.exit(1)
if not text:
print("Error: text is required")
sys.exit(1)
try:
path = synthesize(text)
print(path)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
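`gen_voice` follows the contract that `run_script_tool` probes for: print a JSON tool schema when invoked with `--schema`, otherwise treat argv[1] as a JSON argument object and write the result to stdout. A minimal new tool under that contract might look like this (the `echo_upper` name and behavior are invented purely to illustrate the convention):

```python
#!/usr/bin/env python3
"""Minimal example of the tools/ script contract: --schema prints the
tool definition; otherwise argv[1] is a JSON argument object."""
import json
import sys

SCHEMA = {
    "name": "echo_upper",  # hypothetical tool name, for illustration only
    "description": "Echo the input text in upper case",
    "parameters": {
        "type": "object",
        "properties": {"text": {"type": "string"}},
        "required": ["text"],
    },
}

def run(argv):
    # --schema lets the host discover which tool this script provides
    if len(argv) > 1 and argv[1] == "--schema":
        return json.dumps(SCHEMA, ensure_ascii=False)
    args = json.loads(argv[1]) if len(argv) > 1 else {}
    return args.get("text", "").upper()

if __name__ == "__main__":
    print(run(["echo_upper", "--schema"]))
    print(run(["echo_upper", '{"text":"hello"}']))  # HELLO
```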

tools/manage_todo (new executable file, 187 lines)
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = ["requests"]
# ///
"""Feishu Bitable todo manager.
Usage:
./manage_todo list-undone List open todos
./manage_todo list-done List completed todos
./manage_todo add <title> Add a new todo
./manage_todo mark-done <record_id> Mark as done
./manage_todo mark-undone <record_id> Mark as undone
./manage_todo --schema Print tool schema JSON
"""
import json
import sys
import requests
APP_ID = "cli_a7f042e93d385013"
APP_SECRET = "ht4FCjQ8JJ65ZPUWlff6ldFBmaP0mxqY"
APP_TOKEN = "SSoGbmGFoazJkUs7bbfcaSG8n7f"
TABLE_ID = "tblIA2biceDpvr35"
BASE_URL = "https://open.feishu.cn/open-apis"
ACTIONS = ["list-undone", "list-done", "add", "mark-done", "mark-undone"]
SCHEMA = {
"name": "fam_todo",
"description": "Manage Fam's Feishu Bitable todo table.",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ACTIONS,
"description": "Action type",
},
"title": {
"type": "string",
"description": "Todo title (required for add)",
},
"record_id": {
"type": "string",
"description": "Record ID (required for mark-done/mark-undone)",
},
},
"required": ["action"],
},
}
def get_token():
r = requests.post(
f"{BASE_URL}/auth/v3/tenant_access_token/internal/",
json={"app_id": APP_ID, "app_secret": APP_SECRET},
)
r.raise_for_status()
return r.json()["tenant_access_token"]
def headers():
return {"Authorization": f"Bearer {get_token()}", "Content-Type": "application/json"}
def api(method, path, **kwargs):
url = f"{BASE_URL}/bitable/v1/apps/{APP_TOKEN}/tables/{TABLE_ID}{path}"
r = requests.request(method, url, headers=headers(), **kwargs)
r.raise_for_status()
return r.json()
def format_field(v):
if isinstance(v, list):
return "".join(
seg.get("text", str(seg)) if isinstance(seg, dict) else str(seg)
for seg in v
)
return str(v)
def list_records(done_filter):
"""List records. done_filter: True=done only, False=undone only."""
data = api("GET", "/records", params={"page_size": 500})
items = data.get("data", {}).get("items", [])
if not items:
return "No records found."
lines = []
for item in items:
fields = item.get("fields", {})
is_done = bool(fields.get("Done"))
if is_done != done_filter:
continue
rid = item["record_id"]
title = format_field(fields.get("Item", ""))
priority = fields.get("Priority", "")
notes = format_field(fields.get("Notes", ""))
parts = [f"[{rid}] {title}"]
if priority:
parts.append(f" P: {priority}")
if notes:
preview = notes[:80].replace("\n", " ")
parts.append(f" Note: {preview}")
lines.append("\n".join(parts))
if not lines:
label = "completed" if done_filter else "open"
return f"No {label} todos."
return "\n".join(lines)
def add_record(title):
data = api("POST", "/records", json={"fields": {"Item": title}})
rid = data.get("data", {}).get("record", {}).get("record_id", "?")
return f"Added [{rid}]: {title}"
def mark_done(record_id):
api("PUT", f"/records/{record_id}", json={"fields": {"Done": True}})
return f"Marked [{record_id}] as done"
def mark_undone(record_id):
api("PUT", f"/records/{record_id}", json={"fields": {"Done": False}})
return f"Marked [{record_id}] as undone"
def main():
if len(sys.argv) < 2 or sys.argv[1] in ("--help", "-h"):
print(__doc__.strip())
sys.exit(0)
if sys.argv[1] == "--schema":
print(json.dumps(SCHEMA, ensure_ascii=False))
sys.exit(0)
arg = sys.argv[1]
if not arg.startswith("{"):
args = {"action": arg}
if len(sys.argv) > 2:
args["title"] = " ".join(sys.argv[2:])
args["record_id"] = sys.argv[2] # also set record_id for mark-*
else:
try:
args = json.loads(arg)
except json.JSONDecodeError as e:
print(f"Invalid JSON: {e}")
sys.exit(1)
action = args.get("action", "")
try:
if action == "list-undone":
print(list_records(done_filter=False))
elif action == "list-done":
print(list_records(done_filter=True))
elif action == "add":
title = args.get("title", "")
if not title:
print("Error: title is required")
sys.exit(1)
print(add_record(title))
elif action == "mark-done":
rid = args.get("record_id", "")
if not rid:
print("Error: record_id is required")
sys.exit(1)
print(mark_done(rid))
elif action == "mark-undone":
rid = args.get("record_id", "")
if not rid:
print("Error: record_id is required")
sys.exit(1)
print(mark_undone(rid))
else:
print(f"Unknown action: {action}. Valid: {', '.join(ACTIONS)}")
sys.exit(1)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
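`format_field` above flattens Bitable text fields, which arrive either as plain scalars or as lists of `{"text": …}` rich-text segments. A standalone demonstration with made-up field values:

```python
# Same logic as format_field in tools/manage_todo: Bitable rich-text
# fields are lists of {"text": ...} segments; anything else is
# stringified as-is.
def format_field(v):
    if isinstance(v, list):
        return "".join(
            seg.get("text", str(seg)) if isinstance(seg, dict) else str(seg)
            for seg in v
        )
    return str(v)

print(format_field([{"text": "Buy "}, {"text": "milk"}]))  # Buy milk
print(format_field(42))                                    # 42
```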