Compare commits

...

11 Commits

Author SHA1 Message Date
Fam Zheng
35cafbd4ca nocmem: qa-style extraction prompt, multi-cue variants, claude history importer
- Switch extraction prompt to qa-style (80% recall vs 60% baseline)
- Semicolon-separated cues in extraction become paraphrase variants
- Add import_claude.py to bulk-import Claude Code conversation history
- Fix LLM model name in systemd service, add logging basicConfig
2026-04-11 22:57:17 +01:00
Fam Zheng
daed6c9d37 whisper: use large-v3 model with Chinese language hint 2026-04-11 18:13:54 +01:00
Fam Zheng
ffa71ac2d9 Revert "native multimodal audio, remove whisper transcription, filter media from prompt text"
This reverts commit a26d58e581.
2026-04-11 18:00:59 +01:00
Fam Zheng
a26d58e581 native multimodal audio, remove whisper transcription, filter media from prompt text
- Send audio files (ogg/wav/mp3/flac/m4a) as audio_url in multimodal content
- Remove whisper_url config and transcribe_audio — LLM handles audio natively
- Skip media files in build_prompt text (only mention non-media uploads)
- Log multimodal content types and successful API calls for debugging
2026-04-11 17:58:00 +01:00
Fam Zheng
7000ccda0f add nocmem: auto memory recall + ingest via NuoNuo hippocampal network
- nocmem Python service (mem/): FastAPI wrapper around NuoNuo's
  Hopfield-Hebbian memory, with /recall, /ingest, /store, /stats endpoints
- NOC integration: auto recall after user message (injected as system msg),
  async ingest after LLM response (fire-and-forget)
- Recall: cosine pre-filter (threshold 0.35) + Hopfield attention (β=32),
  top_k=3, KV-cache friendly (appended after user msg, not in system prompt)
- Ingest: LLM extraction + paraphrase augmentation, heuristic fallback
- Wired into main.rs, life.rs (agent done), http.rs (api chat)
- Config: optional `nocmem.endpoint` in config.yaml
- Includes benchmarks: LongMemEval (R@5=94.0%), efficiency, noise vs scale
- Design doc: doc/nocmem.md
2026-04-11 12:24:48 +01:00
Fam Zheng
688387dac3 refactor tool call logging: send as .md file per tool, remove tool_log accumulation 2026-04-11 12:17:01 +01:00
Fam Zheng
b55ed0127c send audio via send_audio/send_voice, format tool log as markdown, add gen_voice debug logging 2026-04-11 09:58:27 +01:00
Fam Zheng
2b42ca539c send tool call log as .md file, deploy assets, fix status(&mut self) 2026-04-11 09:39:51 +01:00
Fam Zheng
55e9b2f50f persistent auth in SQLite, API chat/logs, agent completion via channel
- Auth: move from state.json to SQLite authed_chats table, with memory cache
- Remove Persistent/state.json, all state now in noc.db
- HTTP API: POST /api/chat (end-to-end LLM), GET /api/logs (failed API requests)
- API logging: store raw request/response for 400 errors in api_log table
- Agent completion: spawn_agent sends LifeEvent::AgentDone via channel,
  life loop picks up with full conversation context and responds
- Config structs: derive Clone for HTTP server
- System prompt: instruct LLM not to add timestamps
- Makefile: rsync without --delete to preserve VPS-only tools
2026-04-11 09:31:48 +01:00
Fam Zheng
f7bcdf9b4b add write_file tool for reliable file creation 2026-04-10 23:01:07 +01:00
Fam Zheng
c2be8e6930 add http API, channel-driven life loop, predefined diary timer
- Extract http.rs: unified HTTP server with /api/timers and gitea webhook
- Life loop: select! on interval tick + mpsc channel for force-fire
- Predefined diary timer (cron 22:55 daily), auto-registered on startup
- BufferOutput for system timers (chat_id=0), no TG message
- state: ensure_timer(), get_timer()
- context.md: add blog and Hugo docs for AI
2026-04-10 22:58:39 +01:00
28 changed files with 5180 additions and 190 deletions

.gitignore vendored

@@ -7,3 +7,5 @@ state.*.json
target/
data/
noc.service
tools/manage_todo
mem/benchmarks/longmemeval.json


@@ -34,7 +34,8 @@ deploy: test build
scp target/release/noc $(SUITE):~/bin/
scp config.suite.yaml $(SUITE):/data/noc/config.yaml
scp noc.service.in $(SUITE):/data/noc/
scp -r tools/ $(SUITE):/data/noc/tools/
rsync -a tools/ $(SUITE):/data/noc/tools/
rsync -a assets/ $(SUITE):/data/noc/assets/
ssh $(SUITE) 'bash -lc "\
cd /data/noc \
&& sed -e \"s|@REPO@|/data/noc|g\" -e \"s|@PATH@|\$$PATH|g\" noc.service.in > ~/.config/systemd/user/noc.service \


@@ -7,9 +7,10 @@
- **LLM**: vLLM on ailab (100.84.7.49:8000), gemma-4-31B-it-AWQ
- **Claude Code**: ~/.local/bin/claude (sub-agent execution engine)
- **uv**: ~/.local/bin/uv (Python package manager)
- **Hugo**: /usr/local/bin/hugo (static blog generator)
### Domain routing (Caddy)
- famzheng.me — main site (placeholder)
- famzheng.me → Hugo blog (/data/www/blog/public/)
- git.famzheng.me → Gitea (localhost:3000)
- To add a subdomain: edit /etc/caddy/Caddyfile, then `sudo systemctl reload caddy`
@@ -31,6 +32,30 @@ api.famzheng.me {
Run `sudo systemctl reload caddy` after changes to take effect.
Caddy obtains and renews Let's Encrypt certificates automatically; no manual management needed.
### Blog
Fam's blog:
- Site: https://famzheng.me, source: /data/www/blog/
- Repo: https://git.famzheng.me/fam/blog
- This is Fam's personal blog; do not write anything on it
Your blog (AI diary/musings):
- Site: https://noc.famzheng.me, source: /data/www/noc-blog/
- Repo: https://git.famzheng.me/noc/diary
- This is your own space; feel free to write diary entries, musings, and technical notes
- To publish: create a .md file under content/posts/, run `cd /data/www/noc-blog && hugo`, then git commit + push
Hugo post format:
```markdown
---
title: "Title"
date: 2026-04-10T22:00:00+01:00
draft: false
summary: "One-line summary"
---
Body content; Markdown supported.
```
### Gitea
- URL: https://git.famzheng.me
- Admin: noc (token in /data/noc/gitea-token)

doc/nocmem.md Normal file

@@ -0,0 +1,277 @@
# nocmem — NOC automatic memory system
## Motivation
NOC's existing memory: 100 text slots (200 chars per slot) plus a sliding-window summary. All of it is stuffed into the system prompt and carried on every conversation.
Problems:
- No semantic retrieval; irrelevant memories waste tokens
- Slot capacity is fixed and does not scale
- No associative ability (A is mentioned → recall B → which leads to C)
nocmem replaces the naive text slots with NuoNuo's Hopfield-Hebbian hybrid memory network, providing **automatic recall** and **automatic storage**.
## Core techniques
### NuoNuo Hippocampal Memory
A biologically inspired two-layer memory architecture (see `../nuonuo/doc/architecture.md`):
**Layer 1 — Hopfield (single-hop, noise-tolerant)**
Stores (cue, target) embedding pairs. Recall runs in two stages:
1. **NN pre-filter**: cosine similarity selects the top-K candidates (K=20)
2. **Hopfield settle**: β-scaled softmax attention, iterated to convergence (3 steps)
Key property: **paraphrase tolerance** — when the user asks the same thing in different words, recall still works. This is achieved by storing cue variants (multiple phrasings of the same memory); attention is aggregated by memory_id.
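The aggregation step can be sketched as follows (a minimal illustration; `recall_scores` and its signature are hypothetical, not NuoNuo's actual API):

```python
import math

def recall_scores(query_sims, variant_owner, beta=32.0):
    # softmax attention over all cue variants, then pool the mass per memory_id
    weights = [math.exp(beta * s) for s in query_sims]
    z = sum(weights)
    scores = {}
    for w, mid in zip(weights, variant_owner):
        scores[mid] = scores.get(mid, 0.0) + w / z
    return scores

# "m1" was stored with two paraphrased cue variants, "m2" with one
sims = [0.82, 0.78, 0.30]            # query's cosine similarity to each variant
owners = ["m1", "m1", "m2"]
scores = recall_scores(sims, owners)
best = max(scores, key=scores.get)   # "m1": its variants pool their attention
```

Because variants of the same memory share a memory_id, a paraphrased query that matches any one variant still pulls the whole memory to the top.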
**Layer 2 — Hebbian (multi-hop association chains)**
WTA pattern separation (384-D → 16384-D sparse codes, k=50, ~0.3% sparsity) plus an outer-product weight matrix W.
After Hopfield finds the entry point, Hebbian advances along the association chain via `W @ code` (A → B → C).
This is what traditional RAG cannot do — vector search only finds the "similar"; Hebbian finds the "related but not similar".
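A toy illustration of the chain-following idea, using one-hot codes in place of the real 16384-D sparse codes (all names here are hypothetical, not NuoNuo's implementation):

```python
N = 4                        # toy code dimension (real system: 16384-D, k=50)
A, B, C = 0, 1, 2            # index of the active unit for patterns A, B, C

# Hebbian outer-product storage: W[dst][src] accumulates stored associations
W = [[0.0] * N for _ in range(N)]
for src, dst in [(A, B), (B, C)]:    # store A→B and B→C
    W[dst][src] += 1.0

def hop(W, active):
    # one association step: out = W @ code, then winner-take-all
    out = [row[active] for row in W]
    return max(range(len(out)), key=out.__getitem__)

chain = [A]
for _ in range(2):
    chain.append(hop(W, chain[-1]))
# starting from the Hopfield entry point A, repeated W @ code steps
# walk the association chain A → B → C
```

The cosine search could never reach C from a query about A; the chain exists only in W.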
**Performance**
| Metric | Value |
|--------|-------|
| Paraphrase recall (+augmentation, 2K bg) | 95-100% |
| Multi-hop (3 hops, 500 bg) | 100% |
| Scale (20K memories, no augmentation) | 80% |
| Recall latency @ 20K | 4ms |
| VRAM | ~1 GB |
### Embedding
Uses `all-MiniLM-L6-v2` (384-dim); runs on either CPU or GPU. Reasons for this choice:
- NuoNuo experiments (P1) showed that the **gap metric (score gap between relevant and irrelevant) matters more than absolute similarity**
- MiniLM outperforms larger models such as BGE-large on the gap metric
- Fast inference: ~1ms per query on GPU, ~10ms on CPU
### Memory extraction
After a conversation turn, an LLM extracts (cue, target, importance) triples from (user_msg, assistant_msg):
- **cue**: when this memory should be recalled (the trigger phrase)
- **target**: the memory content itself
- **importance**: a 0-1 importance score
If the LLM is unavailable, fall back to heuristics (Q&A pattern detection + technical keyword matching).
After extraction, the LLM generates 3 paraphrases per cue, stored as cue_variants to improve recall robustness.
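Parsing the extraction output into triples might look like this (a sketch; the line format comes from the extraction prompt shown later in the benchmarks, the helper name is hypothetical):

```python
def parse_extraction(llm_output):
    # parse "CUE: ... | TARGET: ... | IMPORTANCE: ..." lines into triples
    triples = []
    for line in llm_output.splitlines():
        if line.strip().upper() == "NONE" or "|" not in line:
            continue
        fields = {}
        for part in line.split("|"):
            key, _, val = part.partition(":")
            fields[key.strip().upper()] = val.strip()
        try:
            triples.append((fields["CUE"], fields["TARGET"],
                            float(fields["IMPORTANCE"])))
        except (KeyError, ValueError):
            continue  # skip malformed lines instead of failing the whole ingest
    return triples

out = ("CUE: database slow | TARGET: users table missing email index"
       " | IMPORTANCE: 0.8\nNONE")
triples = parse_extraction(out)
```

Tolerating malformed lines matters here: extraction runs fire-and-forget, so a single bad line should not abort the rest of the batch.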
## Architecture
```
┌──────────────┐
│   Telegram   │
│     User     │
└──────┬───────┘
       │ message
       ▼
┌──────────────┐
│     NOC      │
│    (Rust)    │
│              │
│ 1. receive   │
│    user msg  │
│              │
│ 2. HTTP POST ├──────────────────┐
│    /recall   │                  │
│              │                  ▼
│              │        ┌─────────────────┐
│              │        │     nocmem      │
│              │        │    (Python)     │
│              │        │                 │
│              │        │  embed(query)   │
│              │◄───────┤  hippocampus    │
│   recalled   │        │    .recall()    │
│   memories   │        │  format results │
│              │        └─────────────────┘
│ 3. build messages:
│    [...history,
│     user_msg,
│     {role: system,
│      recalled memories}]
│              │
│ 4. call LLM  │
│    (stream)  │
│              │
│ 5. receive   │
│    response  │
│              │
│ 6. async POST├──────────────────┐
│    /ingest   │                  │
│              │                  ▼
│              │        ┌─────────────────┐
│              │        │     nocmem      │
│              │        │                 │
│              │        │  LLM extract    │
│              │        │  embed + store  │
│              │        │  save checkpoint│
│              │        └─────────────────┘
│ 7. reply to  │
│    user      │
└──────────────┘
```
## Message injection strategy
**Key design**: recalled memories are injected **after** the user message, as a standalone system message.
```json
[
  {"role": "system", "content": "persona + memory_slots + ..."},  // unchanged
  {"role": "user", "content": "history message 1"},               // history
  {"role": "assistant", "content": "history reply 1"},
  ...
  {"role": "user", "content": "current user message"},            // current turn
  {"role": "system", "content": "[Relevant memories]\n- memory 1\n- memory 2"}  // ← injected by nocmem
]
```
Why not put them in the system prompt?
**KV-cache friendliness.** The system prompt is a prefix shared by every turn of a conversation. If each message rewrote its content (injecting different recalled memories), the entire KV-cache prefix would be invalidated and thousands of leading tokens recomputed.
Injected after the user message, the prefix (system prompt + history + current user message) stays stable; only the trailing recalled memories vary, maximizing the KV-cache hit rate.
**Ephemeral.** Recalled memories are not persisted into the conversation-history database. Each turn recalls independently; when the next message arrives, whatever is relevant at that moment is recalled afresh. This keeps the history from accumulating redundant memory injections.
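The rule can be sketched in Python (a hypothetical helper; NOC implements this in Rust):

```python
def build_messages(system_prompt, history, user_msg, recalled):
    # stable prefix first; recalled memories go last so the KV cache survives
    msgs = [{"role": "system", "content": system_prompt}]
    msgs += history                      # alternating user/assistant turns
    msgs.append({"role": "user", "content": user_msg})
    if recalled:                         # inject only when recall found something
        msgs.append({"role": "system", "content": recalled})
    return msgs

msgs = build_messages("persona + memory_slots", [],
                      "is the db slow?", "[Relevant memories]\n- missing index")
```

Everything up to and including the user message is identical whether or not recall fires, which is exactly the property the KV cache needs.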
## HTTP API
### POST /recall
Request:
```json
{"text": "has the database been slow lately?"}
```
Response:
```json
{
  "memories": "[Relevant memories]\n- Last slowdown was caused by a missing index (hop=1)\n- PostgreSQL runs on port 5432 (hop=2)",
  "count": 2
}
```
- If no relevant memory is found, returns `{"memories": "", "count": 0}`
- NOC injects only when count > 0, avoiding empty messages
### POST /ingest
Request:
```json
{
  "user_msg": "can you check why the database is slow?",
  "assistant_msg": "Checked: the users table is missing an index on the email column..."
}
```
Response:
```json
{"stored": 2}
```
- Fire-and-forget: NOC does not wait for the response
- Internal flow: LLM extraction → embed → generate paraphrases → store → save checkpoint
### GET /stats
```json
{
"num_memories": 1234,
"num_cue_entries": 4500,
"augmentation_ratio": 3.6,
"vram_mb": 1024,
"embedding_model": "all-MiniLM-L6-v2"
}
```
## NOC-side changes
### config.yaml
```yaml
nocmem:
endpoint: "http://127.0.0.1:9820"
```
### Rust changes (minimal)
**`config.rs`**: add one optional field
```rust
#[serde(default)]
pub nocmem: Option<NocmemConfig>,

#[derive(Deserialize, Clone)]
pub struct NocmemConfig {
    pub endpoint: String,
}
```
**`main.rs`** (main message-handling path):
After `api_messages.push(user_msg)` and before `run_openai_with_tools`:
```rust
// auto recall from nocmem
if let Some(ref nocmem) = config.nocmem {
    if let Ok(recalled) = nocmem_recall(&nocmem.endpoint, &prompt).await {
        if !recalled.is_empty() {
            api_messages.push(serde_json::json!({
                "role": "system",
                "content": recalled
            }));
        }
    }
}
```
After the LLM response (after `push_message`):
```rust
// async ingest to nocmem (fire-and-forget)
if let Some(ref nocmem) = config.nocmem {
    let endpoint = nocmem.endpoint.clone();
    let u = prompt.clone();
    let a = response.clone();
    tokio::spawn(async move {
        let _ = nocmem_ingest(&endpoint, &u, &a).await;
    });
}
```
`nocmem_recall` and `nocmem_ingest` are two simple HTTP call helpers. Recall uses a 500ms timeout; on failure it is skipped, so normal conversation is unaffected.
### Call sites covered
| Location | Scenario | recall | ingest |
|----------|----------|--------|--------|
| `main.rs` handle_message | user chat | ✅ | ✅ |
| `life.rs` AgentDone | sub-agent completion notice | ✅ | ❌ |
| `life.rs` run_timer | timer fired | ❌ | ❌ |
| `http.rs` api_chat | HTTP API chat | ✅ | ✅ |
| `gitea.rs` | Gitea webhook | ❌ | ❌ |
## Deployment
nocmem runs as a standalone Python service:
```bash
cd /data/src/noc/mem
uv run uvicorn server:app --host 127.0.0.1 --port 9820
```
It can be managed by systemd. Checkpoints are persisted to `./data/hippocampus.pt` (relative to the mem directory).
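If run under systemd, a user unit along these lines could work (illustrative sketch; the unit name and the `%h/.local/bin/uv` path are assumptions, matching how the noc.service user unit is deployed):

```ini
# ~/.config/systemd/user/nocmem.service — illustrative; adjust paths as needed
[Unit]
Description=nocmem memory service
After=network.target

[Service]
WorkingDirectory=/data/src/noc/mem
ExecStart=%h/.local/bin/uv run uvicorn server:app --host 127.0.0.1 --port 9820
Restart=on-failure

[Install]
WantedBy=default.target
```

Enable with `systemctl --user enable --now nocmem`.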
## Future directions
- **Importance decay**: memories that go unrecalled for a long time are automatically down-weighted
- **Contradiction detection**: when a new memory conflicts with an old one, replace the old one automatically
- **Memory consolidation (sleep consolidation)**: periodically merge fragmented memories into more compact representations
- **Merging with memory slots**: gradually migrate slot contents into nocmem, eventually retiring the slot system
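Importance decay, for instance, could be as simple as an exponential half-life on time since last recall (a sketch; the function name and the 30-day half-life are arbitrary assumptions):

```python
def decayed_importance(importance, days_since_recall, half_life_days=30.0):
    # halve the effective importance for every half_life_days without a recall
    return importance * 0.5 ** (days_since_recall / half_life_days)

fresh = decayed_importance(0.8, 0)    # just recalled: full importance
stale = decayed_importance(0.8, 60)   # two half-lives later: quartered
```

Recalling a memory would reset its clock, so frequently useful memories never fade while stale ones sink below the recall threshold.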


@@ -0,0 +1,345 @@
"""Efficiency benchmark for nocmem vs ChromaDB baseline.
Measures: storage size, memory usage, query latency, ingest throughput
at various scales (100, 1K, 5K, 10K, 20K memories).
Usage:
uv run python benchmarks/efficiency_bench.py
"""
import gc
import os
import json
import shutil
import tempfile
import time
import torch
import psutil
from sentence_transformers import SentenceTransformer
from nuonuo.hippocampus import HippocampalMemory
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
EMBED_MODEL = "all-MiniLM-L6-v2"
EMBED_DIM = 384
DATA_FILE = "benchmarks/longmemeval.json"
# ── helpers ─────────────────────────────────────────────────────────
def get_process_mem_mb():
return psutil.Process(os.getpid()).memory_info().rss / 1024**2
def get_gpu_mem_mb():
if DEVICE != "cuda":
return 0.0
return torch.cuda.memory_allocated() / 1024**2
def file_size_mb(path):
if os.path.exists(path):
return os.path.getsize(path) / 1024**2
return 0.0
def dir_size_mb(path):
total = 0
for dirpath, _, filenames in os.walk(path):
for f in filenames:
total += os.path.getsize(os.path.join(dirpath, f))
return total / 1024**2
# ── extract chunks from LongMemEval ────────────────────────────────
def load_chunks(max_chunks=25000):
"""Extract turn-level chunks from LongMemEval data."""
with open(DATA_FILE) as f:
data = json.load(f)
chunks = []
seen = set()
for item in data:
for sid, sess in zip(item["haystack_session_ids"], item["haystack_sessions"]):
for i in range(0, len(sess) - 1, 2):
key = (sid, i)
if key in seen:
continue
seen.add(key)
user = sess[i]["content"]
asst = sess[i + 1]["content"] if i + 1 < len(sess) else ""
text = f"{user}\n{asst}"[:1000]
chunks.append(text)
if len(chunks) >= max_chunks:
return chunks
return chunks
# ── nocmem benchmark ────────────────────────────────────────────────
def bench_nocmem(encoder, chunks, n, query_texts):
"""Benchmark nocmem at scale n."""
torch.cuda.empty_cache()
gc.collect()
subset = chunks[:n]
gpu_before = get_gpu_mem_mb()
ram_before = get_process_mem_mb()
# batch embed
t0 = time.monotonic()
embeddings = encoder.encode(
subset, convert_to_tensor=True, normalize_embeddings=True,
device=DEVICE, batch_size=256, show_progress_bar=False,
)
embed_time = time.monotonic() - t0
# store
hip = HippocampalMemory(embed_dim=EMBED_DIM, device=DEVICE)
t1 = time.monotonic()
for i in range(n):
hip.store(embeddings[i], embeddings[i], metadata={"id": i})
store_time = time.monotonic() - t1
gpu_after = get_gpu_mem_mb()
ram_after = get_process_mem_mb()
# save to measure file size
tmp = tempfile.mktemp(suffix=".pt")
hip.save(tmp)
disk_mb = file_size_mb(tmp)
os.unlink(tmp)
# query latency — multiple queries, measure p50/p99
query_embs = encoder.encode(
query_texts, convert_to_tensor=True, normalize_embeddings=True,
device=DEVICE, show_progress_bar=False,
)
latencies = []
for qe in query_embs:
t = time.monotonic()
hip.recall(qe, top_k=5)
latencies.append((time.monotonic() - t) * 1000)
latencies.sort()
p50 = latencies[len(latencies) // 2]
p99 = latencies[int(len(latencies) * 0.99)]
avg = sum(latencies) / len(latencies)
# cleanup
del hip, embeddings
torch.cuda.empty_cache()
return {
"n": n,
"embed_time_s": embed_time,
"store_time_s": store_time,
"ingest_rate": n / (embed_time + store_time), # memories/sec
"disk_mb": disk_mb,
"gpu_delta_mb": gpu_after - gpu_before,
"ram_delta_mb": ram_after - ram_before,
"latency_avg_ms": avg,
"latency_p50_ms": p50,
"latency_p99_ms": p99,
}
# ── chromadb benchmark ──────────────────────────────────────────────
def bench_chromadb(encoder, chunks, n, query_texts):
"""Benchmark ChromaDB (MemPalace's backend) at scale n."""
import chromadb
subset = chunks[:n]
ram_before = get_process_mem_mb()
tmpdir = tempfile.mkdtemp()
client = chromadb.PersistentClient(path=tmpdir)
collection = client.create_collection(
name="bench",
metadata={"hnsw:space": "cosine"},
)
# embed
t0 = time.monotonic()
embeddings_np = encoder.encode(
subset, normalize_embeddings=True,
batch_size=256, show_progress_bar=False,
)
embed_time = time.monotonic() - t0
# store — chromadb takes numpy/list
t1 = time.monotonic()
batch = 5000
for start in range(0, n, batch):
end = min(start + batch, n)
collection.add(
ids=[str(i) for i in range(start, end)],
embeddings=embeddings_np[start:end].tolist(),
documents=subset[start:end],
)
store_time = time.monotonic() - t1
ram_after = get_process_mem_mb()
disk_mb = dir_size_mb(tmpdir)
# query latency
query_np = encoder.encode(
query_texts, normalize_embeddings=True, show_progress_bar=False,
)
latencies = []
for qe in query_np:
t = time.monotonic()
collection.query(query_embeddings=[qe.tolist()], n_results=5)
latencies.append((time.monotonic() - t) * 1000)
latencies.sort()
p50 = latencies[len(latencies) // 2]
p99 = latencies[int(len(latencies) * 0.99)]
avg = sum(latencies) / len(latencies)
# cleanup
del client, collection
shutil.rmtree(tmpdir)
return {
"n": n,
"embed_time_s": embed_time,
"store_time_s": store_time,
"ingest_rate": n / (embed_time + store_time),
"disk_mb": disk_mb,
"gpu_delta_mb": 0,
"ram_delta_mb": ram_after - ram_before,
"latency_avg_ms": avg,
"latency_p50_ms": p50,
"latency_p99_ms": p99,
}
# ── main ────────────────────────────────────────────────────────────
def main():
print("nocmem efficiency benchmark")
print(f"device: {DEVICE}")
print()
# check chromadb available
has_chromadb = False
try:
import chromadb
has_chromadb = True
print("chromadb: available (will compare)")
except ImportError:
print("chromadb: not installed (nocmem only)")
print()
print("loading data...")
chunks = load_chunks(25000)
print(f" {len(chunks)} unique chunks extracted")
print("loading encoder...")
encoder = SentenceTransformer(EMBED_MODEL, device=DEVICE)
# query texts — mix of English and Chinese
query_texts = [
"What degree did I graduate with?",
"How to deploy the application?",
"What was the database error we fixed last week?",
"Tell me about the meeting schedule",
"What programming language should I learn?",
"数据库密码在哪里",
"部署到生产环境的步骤",
"上次讨论的性能优化方案",
"项目的技术栈是什么",
"最近的待办事项有哪些",
"How do I configure the server?",
"What's the API endpoint for user authentication?",
"Can you recommend some books on machine learning?",
"What was the root cause of the production incident?",
"How much memory does the GPU have?",
"VR设备的兼容性问题",
"模型推理的延迟是多少",
"代码仓库的结构是怎样的",
"如何解决内存泄漏",
"上次会议的结论是什么",
]
scales = [100, 500, 1000, 5000, 10000, 20000]
# filter to what we have
scales = [s for s in scales if s <= len(chunks)]
nocmem_results = []
chroma_results = []
for n in scales:
print(f"\n── scale: {n:,} memories ──")
print(f" nocmem...", end="", flush=True)
r = bench_nocmem(encoder, chunks, n, query_texts)
nocmem_results.append(r)
print(f" done (R: {r['latency_avg_ms']:.1f}ms, disk: {r['disk_mb']:.1f}MB)")
if has_chromadb:
print(f" chromadb...", end="", flush=True)
r2 = bench_chromadb(encoder, chunks, n, query_texts)
chroma_results.append(r2)
print(f" done (R: {r2['latency_avg_ms']:.1f}ms, disk: {r2['disk_mb']:.1f}MB)")
# ── report ──────────────────────────────────────────────────────
print(f"\n{'='*80}")
print(f"EFFICIENCY BENCHMARK RESULTS")
print(f"{'='*80}")
# table header
if has_chromadb:
print(f"\n{'Scale':>8} | {'--- nocmem ---':^40} | {'--- ChromaDB ---':^40}")
print(f"{'':>8} | {'Latency':>8} {'p99':>8} {'Disk':>8} {'VRAM':>8} {'Rate':>8} | {'Latency':>8} {'p99':>8} {'Disk':>8} {'RAM':>8} {'Rate':>8}")
print(f"{'':>8} | {'(ms)':>8} {'(ms)':>8} {'(MB)':>8} {'(MB)':>8} {'(/s)':>8} | {'(ms)':>8} {'(ms)':>8} {'(MB)':>8} {'(MB)':>8} {'(/s)':>8}")
print("-" * 100)
for nm, cr in zip(nocmem_results, chroma_results):
print(
f"{nm['n']:>8,} | "
f"{nm['latency_avg_ms']:>8.1f} {nm['latency_p99_ms']:>8.1f} {nm['disk_mb']:>8.1f} {nm['gpu_delta_mb']:>8.1f} {nm['ingest_rate']:>8.0f} | "
f"{cr['latency_avg_ms']:>8.1f} {cr['latency_p99_ms']:>8.1f} {cr['disk_mb']:>8.1f} {cr['ram_delta_mb']:>8.1f} {cr['ingest_rate']:>8.0f}"
)
else:
print(f"\n{'Scale':>8} | {'Latency':>8} {'p99':>8} {'Disk':>8} {'VRAM':>8} {'Ingest':>8}")
print(f"{'':>8} | {'(ms)':>8} {'(ms)':>8} {'(MB)':>8} {'(MB)':>8} {'(/s)':>8}")
print("-" * 60)
for nm in nocmem_results:
print(
f"{nm['n']:>8,} | "
f"{nm['latency_avg_ms']:>8.1f} {nm['latency_p99_ms']:>8.1f} {nm['disk_mb']:>8.1f} {nm['gpu_delta_mb']:>8.1f} {nm['ingest_rate']:>8.0f}"
)
# summary
if nocmem_results:
biggest = nocmem_results[-1]
print(f"\nnocmem @ {biggest['n']:,}:")
print(f" Query latency: avg {biggest['latency_avg_ms']:.1f}ms, p99 {biggest['latency_p99_ms']:.1f}ms")
print(f" Disk: {biggest['disk_mb']:.1f} MB")
print(f" VRAM delta: {biggest['gpu_delta_mb']:.1f} MB")
print(f" Ingest rate: {biggest['ingest_rate']:.0f} memories/sec")
if chroma_results:
biggest = chroma_results[-1]
print(f"\nChromaDB @ {biggest['n']:,}:")
print(f" Query latency: avg {biggest['latency_avg_ms']:.1f}ms, p99 {biggest['latency_p99_ms']:.1f}ms")
print(f" Disk: {biggest['disk_mb']:.1f} MB")
print(f" RAM delta: {biggest['ram_delta_mb']:.1f} MB")
print(f" Ingest rate: {biggest['ingest_rate']:.0f} memories/sec")
if has_chromadb and nocmem_results and chroma_results:
nm = nocmem_results[-1]
cr = chroma_results[-1]
print(f"\n── nocmem vs ChromaDB @ {nm['n']:,} ──")
lat_ratio = cr['latency_avg_ms'] / nm['latency_avg_ms'] if nm['latency_avg_ms'] > 0 else float('inf')
disk_ratio = cr['disk_mb'] / nm['disk_mb'] if nm['disk_mb'] > 0 else float('inf')
rate_ratio = nm['ingest_rate'] / cr['ingest_rate'] if cr['ingest_rate'] > 0 else float('inf')
print(f" Latency: nocmem {lat_ratio:.1f}x faster" if lat_ratio > 1 else f" Latency: ChromaDB {1/lat_ratio:.1f}x faster")
print(f" Disk: nocmem {disk_ratio:.1f}x smaller" if disk_ratio > 1 else f" Disk: ChromaDB {1/disk_ratio:.1f}x smaller")
print(f" Ingest: nocmem {rate_ratio:.1f}x faster" if rate_ratio > 1 else f" Ingest: ChromaDB {1/rate_ratio:.1f}x faster")
if __name__ == "__main__":
main()


@@ -0,0 +1,239 @@
"""LongMemEval benchmark for nocmem.
Evaluates retrieval quality: given a question, can nocmem find the correct
session(s) from a haystack of ~50 conversation sessions?
Uses HippocampalMemory directly (no HTTP) for speed.
Compares against MemPalace's 96.6% R@5 baseline.
Usage:
uv run python benchmarks/longmemeval_bench.py [--limit N] [--granularity session|turn]
"""
import argparse
import json
import math
import sys
import time
import torch
from sentence_transformers import SentenceTransformer
from nuonuo.hippocampus import HippocampalMemory
# ── setup ───────────────────────────────────────────────────────────
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
EMBED_MODEL = "all-MiniLM-L6-v2"
EMBED_DIM = 384
def load_encoder():
print(f"loading {EMBED_MODEL} on {DEVICE}...")
return SentenceTransformer(EMBED_MODEL, device=DEVICE)
def embed_batch(encoder, texts: list[str]) -> torch.Tensor:
"""Batch embed, returns (N, dim) tensor."""
return encoder.encode(
texts, convert_to_tensor=True, normalize_embeddings=True,
device=DEVICE, batch_size=128, show_progress_bar=False,
)
# ── granularity: how to chunk sessions ──────────────────────────────
def sessions_to_chunks_turn(session_ids, sessions):
"""Each user-assistant turn becomes a separate chunk."""
chunks = [] # (text, session_id)
for sid, sess in zip(session_ids, sessions):
for i in range(0, len(sess) - 1, 2):
user = sess[i]["content"]
asst = sess[i + 1]["content"] if i + 1 < len(sess) else ""
text = f"{user}\n{asst}"
# truncate long turns to avoid embedding issues
chunks.append((text[:1000], sid))
# handle odd-numbered turns
if len(sess) % 2 == 1:
chunks.append((sess[-1]["content"][:1000], sid))
return chunks
def sessions_to_chunks_session(session_ids, sessions):
"""Each session becomes a single chunk (concatenated turns)."""
chunks = []
for sid, sess in zip(session_ids, sessions):
text = "\n".join(m["content"] for m in sess)
# truncate to fit embedding model's context
chunks.append((text[:2000], sid))
return chunks
# ── evaluate one question ───────────────────────────────────────────
def evaluate_question(encoder, item, granularity, ks=(5, 10)):
"""Store haystack, query, check if answer session in top-K.
Returns dict with R@5, R@10, NDCG@10, timings.
"""
# chunk the haystack
if granularity == "turn":
chunks = sessions_to_chunks_turn(
item["haystack_session_ids"], item["haystack_sessions"])
else:
chunks = sessions_to_chunks_session(
item["haystack_session_ids"], item["haystack_sessions"])
texts = [c[0] for c in chunks]
sids = [c[1] for c in chunks]
answer_sids = set(item["answer_session_ids"])
# batch embed all chunks
t0 = time.monotonic()
embeddings = embed_batch(encoder, texts)
embed_time = time.monotonic() - t0
# build memory
t1 = time.monotonic()
hip = HippocampalMemory(embed_dim=EMBED_DIM, device=DEVICE)
for i in range(len(chunks)):
hip.store(
embeddings[i], embeddings[i],
metadata={"session_id": sids[i]},
)
store_time = time.monotonic() - t1
# query
t2 = time.monotonic()
query_emb = encoder.encode(
[item["question"]], convert_to_tensor=True,
normalize_embeddings=True, device=DEVICE,
)[0]
max_k = max(ks)
results = hip.recall(query_emb, top_k=max_k)
recall_time = time.monotonic() - t2
# deduplicate by session_id, preserving rank order
seen = set()
ranked_sids = []
for r in results:
sid = r.metadata["session_id"]
if sid not in seen:
seen.add(sid)
ranked_sids.append(sid)
# compute metrics
metrics = {}
for k in ks:
top_k_sids = set(ranked_sids[:k])
hit = bool(answer_sids & top_k_sids)
metrics[f"R@{k}"] = 1.0 if hit else 0.0
# NDCG@10
ndcg = compute_ndcg(ranked_sids[:10], answer_sids)
metrics["NDCG@10"] = ndcg
metrics["embed_ms"] = embed_time * 1000
metrics["store_ms"] = store_time * 1000
metrics["recall_ms"] = recall_time * 1000
metrics["n_chunks"] = len(chunks)
return metrics
def compute_ndcg(ranked_sids, answer_sids, k=10):
"""Normalized Discounted Cumulative Gain."""
dcg = 0.0
for i, sid in enumerate(ranked_sids[:k]):
if sid in answer_sids:
dcg += 1.0 / math.log2(i + 2) # i+2 because rank starts at 1
# ideal: all answer sessions at top
n_relevant = min(len(answer_sids), k)
idcg = sum(1.0 / math.log2(i + 2) for i in range(n_relevant))
return dcg / idcg if idcg > 0 else 0.0
# ── main ────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--data", default="benchmarks/longmemeval.json")
parser.add_argument("--limit", type=int, default=0, help="limit number of questions (0=all)")
parser.add_argument("--granularity", choices=["session", "turn"], default="turn")
args = parser.parse_args()
print(f"LongMemEval benchmark for nocmem")
print(f"granularity: {args.granularity}")
print(f"device: {DEVICE}")
print()
with open(args.data) as f:
data = json.load(f)
if args.limit:
data = data[:args.limit]
encoder = load_encoder()
print(f"evaluating {len(data)} questions...\n")
all_metrics = []
by_type = {}
for i, item in enumerate(data):
metrics = evaluate_question(encoder, item, args.granularity)
all_metrics.append(metrics)
qtype = item["question_type"]
if qtype not in by_type:
by_type[qtype] = []
by_type[qtype].append(metrics)
# progress
if (i + 1) % 10 == 0 or i == len(data) - 1:
r5 = sum(m["R@5"] for m in all_metrics) / len(all_metrics) * 100
r10 = sum(m["R@10"] for m in all_metrics) / len(all_metrics) * 100
avg_recall = sum(m["recall_ms"] for m in all_metrics) / len(all_metrics)
print(f" [{i+1:3d}/{len(data)}] R@5={r5:.1f}% R@10={r10:.1f}% recall={avg_recall:.1f}ms")
# final results
n = len(all_metrics)
r5 = sum(m["R@5"] for m in all_metrics) / n * 100
r10 = sum(m["R@10"] for m in all_metrics) / n * 100
ndcg = sum(m["NDCG@10"] for m in all_metrics) / n * 100
avg_embed = sum(m["embed_ms"] for m in all_metrics) / n
avg_store = sum(m["store_ms"] for m in all_metrics) / n
avg_recall = sum(m["recall_ms"] for m in all_metrics) / n
avg_chunks = sum(m["n_chunks"] for m in all_metrics) / n
print(f"\n{'='*60}")
print(f"nocmem LongMemEval Results ({args.granularity} granularity)")
print(f"{'='*60}")
print(f" Questions: {n}")
print(f" Avg chunks: {avg_chunks:.0f}")
print(f"")
print(f" R@5: {r5:.1f}%")
print(f" R@10: {r10:.1f}%")
print(f" NDCG@10: {ndcg:.1f}%")
print(f"")
print(f" Avg embed: {avg_embed:.0f}ms")
print(f" Avg store: {avg_store:.0f}ms")
print(f" Avg recall: {avg_recall:.1f}ms")
print(f"\n── by question type ──")
for qtype, ms in sorted(by_type.items()):
nt = len(ms)
tr5 = sum(m["R@5"] for m in ms) / nt * 100
tr10 = sum(m["R@10"] for m in ms) / nt * 100
print(f" {qtype:30s} n={nt:3d} R@5={tr5:.1f}% R@10={tr10:.1f}%")
print(f"\n── comparison ──")
print(f" MemPalace (raw, session): R@5=96.6%")
print(f" nocmem ({args.granularity:7s}): R@5={r5:.1f}%")
if __name__ == "__main__":
main()


@@ -0,0 +1,178 @@
"""Does recall noise decrease as memory count grows?
At various scales, measure:
1. Recall accuracy (R@3) for relevant queries
2. Max cosine similarity for irrelevant queries
3. Separation gap between relevant and irrelevant
If nocmem works well at scale, the gap should widen — relevant queries
should score much higher than irrelevant ones as the memory pool grows.
"""
import json
import time
import torch
import numpy as np
from sentence_transformers import SentenceTransformer
from nuonuo.hippocampus import HippocampalMemory
DEVICE = "cuda"
EMBED_DIM = 384
DATA_FILE = "benchmarks/longmemeval.json"
IRRELEVANT_QUERIES = [
"今天天气怎么样",
"你喜欢吃什么",
"",
"讲个笑话",
"明天会下雨吗",
"你觉得猫可爱还是狗可爱",
"人生的意义是什么",
"帮我写一首诗",
"地球到月球有多远",
"如何学会游泳",
]
BETA_CONFIGS = [16.0, 32.0, 64.0]
SCALES = [50, 200, 500, 1000, 3000]
def main():
print("noise vs scale benchmark\n")
print("loading encoder...")
encoder = SentenceTransformer("all-MiniLM-L6-v2", device=DEVICE)
def emb(text):
return encoder.encode([text], convert_to_tensor=True,
normalize_embeddings=True, device=DEVICE)[0]
def emb_batch(texts):
return encoder.encode(texts, convert_to_tensor=True,
normalize_embeddings=True, device=DEVICE,
batch_size=256, show_progress_bar=False)
# load data
print("loading data...")
with open(DATA_FILE) as f:
data = json.load(f)
# collect unique chunks with their source question index
all_chunks = [] # (text, question_idx, session_id)
seen = set()
for qi, item in enumerate(data):
for sid, sess in zip(item["haystack_session_ids"], item["haystack_sessions"]):
for i in range(0, len(sess) - 1, 2):
key = (sid, i)
if key in seen:
continue
seen.add(key)
user = sess[i]["content"]
asst = sess[i + 1]["content"] if i + 1 < len(sess) else ""
text = f"{user}\n{asst}"[:1000]
all_chunks.append((text, qi, sid))
print(f" {len(all_chunks)} unique chunks")
# pre-embed irrelevant queries
irrel_embs = [emb(q) for q in IRRELEVANT_QUERIES]
# collect relevant queries: for each question, we know the answer session
# pick first 50 questions that have at least one answer session
relevant_queries = []
for item in data[:100]:
answer_sids = set(item["answer_session_ids"])
relevant_queries.append((item["question"], answer_sids))
rel_query_embs = emb_batch([q for q, _ in relevant_queries])
print(f" {len(relevant_queries)} relevant queries")
print(f" {len(IRRELEVANT_QUERIES)} irrelevant queries")
# filter scales to what we have
scales = [s for s in SCALES if s <= len(all_chunks)]
for beta in BETA_CONFIGS:
print(f"\n{'='*70}")
print(f" β = {beta}")
print(f"{'='*70}")
print(f"{'Scale':>7} | {'R@3':>6} | {'Rel maxcos':>10} {'Irrel maxcos':>12} {'Gap':>8} | {'Rel attn':>9} {'Irrel attn':>11}")
print("-" * 80)
for n in scales:
subset = all_chunks[:n]
texts = [c[0] for c in subset]
sids = [c[2] for c in subset]
# embed and build memory
embeddings = emb_batch(texts)
hip = HippocampalMemory(
embed_dim=EMBED_DIM, beta=beta, hopfield_top_k=10, device=DEVICE,
)
for i in range(n):
hip.store(embeddings[i], embeddings[i],
metadata={"session_id": sids[i]})
cue_mat = hip._get_cue_matrix()
# --- relevant queries ---
rel_max_cos = []
rel_top_attn = []
hits = 0
tested = 0
for qi in range(len(relevant_queries)):
question, answer_sids = relevant_queries[qi]
qe = rel_query_embs[qi]
# check if any answer session is in this subset
subset_sids = set(sids)
if not (answer_sids & subset_sids):
continue
tested += 1
# cosine sim
cos_sims = qe @ cue_mat.T
rel_max_cos.append(cos_sims.max().item())
# recall
results = hip.recall(qe, top_k=3)
top_attn = results[0].similarity if results else 0
rel_top_attn.append(top_attn)
recalled_sids = {r.metadata["session_id"] for r in results}
if answer_sids & recalled_sids:
hits += 1
r3 = hits / tested * 100 if tested > 0 else 0
avg_rel_cos = np.mean(rel_max_cos) if rel_max_cos else 0
avg_rel_attn = np.mean(rel_top_attn) if rel_top_attn else 0
# --- irrelevant queries ---
irrel_max_cos = []
irrel_top_attn = []
for qe in irrel_embs:
cos_sims = qe @ cue_mat.T
irrel_max_cos.append(cos_sims.max().item())
results = hip.recall(qe, top_k=3)
top_attn = results[0].similarity if results else 0
irrel_top_attn.append(top_attn)
avg_irrel_cos = np.mean(irrel_max_cos)
avg_irrel_attn = np.mean(irrel_top_attn)
gap = avg_rel_cos - avg_irrel_cos
print(f"{n:>7,} | {r3:>5.1f}% | {avg_rel_cos:>10.3f} {avg_irrel_cos:>12.3f} {gap:>8.3f} | {avg_rel_attn:>8.0%} {avg_irrel_attn:>10.0%}")
del hip
torch.cuda.empty_cache()
print(f"\n── interpretation ──")
print(f"Rel maxcos: max cosine similarity for relevant queries (higher is better)")
print(f"Irrel maxcos: max cosine similarity for irrelevant queries (lower is better)")
print(f"Gap: difference between the two (larger = easier to separate)")
print(f"Rel attn: Hopfield attention weight of the top-1 result for relevant queries")
print(f"Irrel attn: same for irrelevant queries (lower = less noise)")
if __name__ == "__main__":
main()


@@ -0,0 +1,217 @@
"""Test different extraction prompts for ingest quality.
Takes a few real conversation turns, tries different prompts,
compares what gets extracted, then tests recall against target queries.
"""
import json
import time
import requests
import torch
from sentence_transformers import SentenceTransformer
from nuonuo.hippocampus import HippocampalMemory
from openai import OpenAI
DEVICE = "cuda"
LLM = OpenAI(base_url="http://100.84.7.49:8000/v1", api_key="unused")
MODEL = "gemma4-31b"
encoder = SentenceTransformer("all-MiniLM-L6-v2", device=DEVICE)
# real conversation turns from Claude Code history
TURNS = [
    ("装好了tailscale。你帮忙生成吧用新的bot名叫yoyo",
     "Tailscale IP `100.116.213.25`yoyo 用户创建成功。继续搞 token 和部署。"),
    ("能来一个org共享的gitea ci cd runner 不,就跑在这台机器上就行。",
     "搞起来!装 act_runner注册到 euphon org以后每个仓库直接加 `.gitea/workflows/` 就能跑 CI 了。"),
    ("能不能给heradev一个bridge的nicsecondary的这样就有192.168.1.0/24的地址可以用了",
     "饭团,你是要给 heradev 这个 KVM 虚拟机加一个桥接网卡对吧?这样它就能直接拿到物理网段 192.168.1.0/24 的 IP。"),
    ("i7 bot也不好直接叫@ms吧",
     "好,@ms 改好了。现在来研究全局 webhook 的问题——之前系统级 webhook 没触发,让我深入看看。"),
    ("AI/ML 工具克隆 全部删掉吧emblemcvvisint也删掉",
     "好嘞,这一波删掉能省 ~62G。动手了。"),
]

# queries we expect to recall
TARGET_QUERIES = [
    ("yoyo是什么", ["yoyo"]),
    ("act runner怎么装的", ["act_runner", "runner"]),
    ("heradev网络怎么配的", ["heradev", "桥接", "bridge", "192.168"]),
    ("gitea bot叫什么", ["ms", "@ms", "麻薯"]),
    ("清理了哪些项目", ["emblemcv", "visint", "62G"]),
]
# different extraction prompts to test
PROMPTS = {
    "baseline": """From this conversation turn, extract key facts worth remembering for future conversations.
For each fact, provide a "cue" (what would trigger recalling this) and a "target" (the fact itself).
Rate importance 0-1 (1 = critical fact, 0 = trivial).
User: {user}
Assistant: {assistant}
Output format (one per line):
CUE: <trigger phrase> | TARGET: <fact> | IMPORTANCE: <0-1>
Only extract genuinely useful facts. If nothing worth remembering, output NONE.""",
    "entity_focused": """从这段对话中提取值得记住的事实。重点关注:
- 名称、代号、别名(谁叫什么)
- 配置、参数、端口、地址
- 做了什么操作、改了什么
- 决策和原因
每条事实用以下格式输出(每行一条):
CUE: <用什么问题能想起这件事> | TARGET: <事实本身,要具体> | IMPORTANCE: <0-1>
User: {user}
Assistant: {assistant}
如果没有值得记住的,输出 NONE。""",
    "multi_cue": """从这段对话中提取值得长期记住的事实。
要求:
1. 每条事实提供 2-3 个不同的触发短语cue用分号分隔
2. target 要具体、独立可理解(不依赖上下文)
3. 包含所有出现的名称、代号、配置值
格式(每行一条):
CUE: <触发短语1>; <触发短语2>; <触发短语3> | TARGET: <具体事实> | IMPORTANCE: <0-1>
User: {user}
Assistant: {assistant}
没有值得记住的则输出 NONE。""",
    "qa_style": """你是一个记忆提取器。把这段对话变成若干个"问答对"——未来有人问这个问题时,能直接给出答案。
要求:
- 问题要自然,像人真的会这么问
- 答案要具体完整,包含关键细节(名称、数字、地址等)
- 同一个事实可以从不同角度提问
格式(每行一条):
CUE: <自然的提问方式> | TARGET: <完整的回答> | IMPORTANCE: <0-1>
User: {user}
Assistant: {assistant}
没有值得记住的则输出 NONE。""",
}
import re


def extract_with_prompt(prompt_template, user_msg, asst_msg):
    prompt = prompt_template.format(user=user_msg, assistant=asst_msg)
    try:
        resp = LLM.chat.completions.create(
            model=MODEL,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3, max_tokens=512,
        )
        result = resp.choices[0].message.content
    except Exception:
        return []
    memories = []
    for line in result.strip().split("\n"):
        if line.strip() == "NONE":
            break
        m = re.match(r"CUE:\s*(.+?)\s*\|\s*TARGET:\s*(.+?)\s*\|\s*IMPORTANCE:\s*([\d.]+)", line)
        if m:
            memories.append({
                "cue": m.group(1).strip(),
                "target": m.group(2).strip(),
                "importance": float(m.group(3)),
            })
    return memories
def emb(text):
    return encoder.encode([text], convert_to_tensor=True, normalize_embeddings=True, device=DEVICE)[0]
def test_recall(memories_list, queries):
    """Build a memory from extracted memories and test recall."""
    hip = HippocampalMemory(embed_dim=384, beta=32.0, hopfield_top_k=10, device=DEVICE)
    for mem in memories_list:
        cue_text = mem["cue"]
        target_text = mem["target"]
        cue_emb = emb(cue_text)
        target_emb = emb(target_text)
        # handle multi-cue (semicolon separated)
        variants = []
        if ";" in cue_text:
            parts = [p.strip() for p in cue_text.split(";") if p.strip()]
            if len(parts) > 1:
                cue_emb = emb(parts[0])
                variants = [emb(p) for p in parts[1:]]
        hip.store(cue_emb, target_emb, cue_variants=variants if variants else None,
                  metadata={"cue": cue_text, "target": target_text})
    hits = 0
    for query, keywords in queries:
        qe = emb(query)
        results = hip.recall(qe, top_k=3)
        recalled_text = " ".join(r.metadata["target"] for r in results)
        hit = any(kw.lower() in recalled_text.lower() for kw in keywords)
        if hit:
            hits += 1
    return hits, len(queries)
def main():
    print("extraction prompt experiment\n")
    print(f"turns: {len(TURNS)}, queries: {len(TARGET_QUERIES)}\n")
    for name, template in PROMPTS.items():
        print(f"{'='*60}")
        print(f" prompt: {name}")
        print(f"{'='*60}")
        all_memories = []
        for user_msg, asst_msg in TURNS:
            mems = extract_with_prompt(template, user_msg, asst_msg)
            all_memories.extend(mems)
            for m in mems:
                print(f"  [{m['importance']:.1f}] CUE: {m['cue'][:50]}")
                print(f"        TGT: {m['target'][:60]}")
        print(f"\n  extracted: {len(all_memories)} memories")
        hits, total = test_recall(all_memories, TARGET_QUERIES)
        print(f"  recall: {hits}/{total} ({hits/total*100:.0f}%)")

        # show per-query results
        hip = HippocampalMemory(embed_dim=384, beta=32.0, hopfield_top_k=10, device=DEVICE)
        for mem in all_memories:
            cue_text = mem["cue"]
            cue_emb = emb(cue_text.split(";")[0].strip() if ";" in cue_text else cue_text)
            target_emb = emb(mem["target"])
            variants = []
            if ";" in cue_text:
                parts = [p.strip() for p in cue_text.split(";") if p.strip()]
                variants = [emb(p) for p in parts[1:]] if len(parts) > 1 else []
            hip.store(cue_emb, target_emb, cue_variants=variants or None,
                      metadata={"cue": cue_text, "target": mem["target"]})
        for query, keywords in TARGET_QUERIES:
            qe = emb(query)
            results = hip.recall(qe, top_k=1)
            if results:
                target = results[0].metadata["target"][:60]
                hit = any(kw.lower() in results[0].metadata["target"].lower() for kw in keywords)
                mark = "✓" if hit else "✗"
                print(f"  {mark} {query:20s} → {target}")
            else:
                print(f"  ✗ {query:20s} → (empty)")
        print()


if __name__ == "__main__":
    main()
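The `CUE: … | TARGET: … | IMPORTANCE: …` line format and the semicolon multi-cue convention can be exercised in isolation; a small sketch using the same regex as the script, on a hypothetical extraction line:

```python
import re

# a hypothetical line in the extraction output format
line = "CUE: yoyo是什么; 新bot叫什么 | TARGET: yoyo 是新建的 gitea bot 用户 | IMPORTANCE: 0.8"
m = re.match(r"CUE:\s*(.+?)\s*\|\s*TARGET:\s*(.+?)\s*\|\s*IMPORTANCE:\s*([\d.]+)", line)
cue, target, importance = m.group(1), m.group(2), float(m.group(3))

# semicolon-separated cues: the first becomes the stored cue embedding,
# the rest become cue_variants (paraphrase slots)
parts = [p.strip() for p in cue.split(";") if p.strip()]
primary, variants = parts[0], parts[1:]
print(primary, variants, importance)
```

The lazy `(.+?)` groups stop at the first `|`, so semicolons inside the cue survive intact for the later split.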


@@ -0,0 +1,104 @@
"""Test Hopfield attention sharpness with different top_k and beta.
Goal: find settings that give "either clearly remembered or nothing"
instead of flat attention across 20 candidates.
"""
import torch
from sentence_transformers import SentenceTransformer
from nuonuo.hippocampus import HippocampalMemory
DEVICE = "cuda"
EMBED_DIM = 384
print("loading encoder...")
encoder = SentenceTransformer("all-MiniLM-L6-v2", device=DEVICE)
def emb(text):
    return encoder.encode([text], convert_to_tensor=True, normalize_embeddings=True, device=DEVICE)[0]
# store the same memories in each config
MEMORIES = [
    ("bot的名字叫什么", "bot的名字叫小乖是Fam给取的"),
    ("有哪些工具可以用", "工具有: fam_todo, send_file, spawn_agent, run_shell, run_python, update_memory"),
    ("vLLM在5090上的性能", "RTX 5090上vLLM跑gemma只有4.8 tok/s需要切换到awq_marlin"),
    ("repo-vis项目是什么", "repo-vis用Rust后端+Three.js前端的3D代码库可视化目标支持Linux内核和Pico VR"),
    ("repo-vis的性能瓶颈", "Linux内核79K文件SQLite 1GB上限和O(n)反序列化是瓶颈需要n-ary tree按需合并"),
    ("明天的待办事项", "最紧迫的是emblem scanner的AI Chat和KB部分"),
    ("后端切换到了什么", "NOC后端切换到了vLLM速度变快了"),
    ("数据库密码在哪里", "数据库密码存在 /etc/secrets/db.env 文件中"),
    ("什么GPU", "服务器有NVIDIA RTX 4090 24GB VRAM"),
    ("home有多少log文件", "home目录及子目录下共有960个.log文件"),
]

QUERIES = [
    ("repo-vis怎么样了", "repo-vis", True),   # should recall clearly
    ("数据库密码", "密码", True),              # should recall clearly
    ("今天天气怎么样", "天气", False),         # irrelevant, should recall nothing
    ("vllm速度", "vllm", True),               # should recall clearly
    ("你喜欢吃什么", "吃什么", False),         # irrelevant
    ("VR支持", "VR", True),                   # edge case
]

CONFIGS = [
    # (top_k, beta, label)
    (20, 16.0, "baseline (top_k=20, β=16)"),
    (10, 16.0, "top_k=10, β=16"),
    (5, 16.0, "top_k=5, β=16"),
    (20, 32.0, "top_k=20, β=32"),
    (20, 64.0, "top_k=20, β=64"),
    (10, 32.0, "top_k=10, β=32"),
    (5, 32.0, "top_k=5, β=32"),
    (5, 64.0, "top_k=5, β=64"),
]
# pre-embed everything
mem_embs = [(emb(c), emb(t), c, t) for c, t in MEMORIES]
query_embs = [(emb(q), label, relevant) for q, label, relevant in QUERIES]
print(f"\n{len(MEMORIES)} memories, {len(QUERIES)} queries, {len(CONFIGS)} configs\n")
for top_k, beta, label in CONFIGS:
    print(f"{'='*70}")
    print(f"  {label}")
    print(f"{'='*70}")
    hip = HippocampalMemory(
        embed_dim=EMBED_DIM, hopfield_top_k=top_k, beta=beta, device=DEVICE,
    )
    for ce, te, cue_text, target_text in mem_embs:
        hip.store(ce, te, metadata={"cue": cue_text, "target": target_text})
    for qe, qlabel, should_recall in query_embs:
        results = hip.recall(qe, top_k=5)
        # show distribution
        sims = [r.similarity for r in results]
        top1 = sims[0] if sims else 0
        top2 = sims[1] if len(sims) > 1 else 0
        gap = top1 - top2  # gap between #1 and #2
        above_5pct = sum(1 for s in sims if s >= 0.05)
        above_10pct = sum(1 for s in sims if s >= 0.10)
        top_target = results[0].metadata["target"][:40] if results else ""
        tag = "✓" if should_recall else "✗"
        print(f"  [{tag}] {qlabel:10s} top1={top1:.0%} top2={top2:.0%} gap={gap:.0%} "
              f"≥5%:{above_5pct} ≥10%:{above_10pct} → {top_target}")

    # summary: average sharpness
    total_gap = 0
    total_top1 = 0
    for qe, qlabel, _ in query_embs:
        results = hip.recall(qe, top_k=5)
        sims = [r.similarity for r in results]
        total_top1 += sims[0] if sims else 0
        total_gap += (sims[0] - sims[1]) if len(sims) > 1 else 0
    n = len(query_embs)
    print(f"\n  avg top1={total_top1/n:.0%} avg gap={total_gap/n:.0%}")
    print()
    del hip
    torch.cuda.empty_cache()
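The β effect the sweep measures comes down to softmax temperature: the attention over candidates is a softmax of β-scaled similarity scores, and larger β concentrates mass on the top match. A minimal sketch with toy scores (fixed numbers, not real recall output):

```python
import numpy as np

def attention(sims, beta):
    # Hopfield-style read-out: softmax over beta-scaled similarity scores
    z = np.exp(beta * (sims - sims.max()))  # subtract max for numerical stability
    return z / z.sum()

sims = np.array([0.62, 0.55, 0.50, 0.48, 0.45])  # toy cosine scores for 5 candidates
for beta in (16.0, 32.0, 64.0):
    w = attention(sims, beta)
    print(f"beta={beta:>4.0f}  top1={w[0]:.0%}  gap={w[0] - w[1]:.0%}")
```

With a fixed 0.07 score margin, top1 mass and the top1-top2 gap both grow monotonically in β, which is exactly the "clearly remembered or nothing" behaviour the script hunts for.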

mem/import_claude.py Normal file

@@ -0,0 +1,178 @@
"""Import Claude Code conversation history into nocmem.
Scans ~/.claude/projects/ for JSONL conversation files,
extracts user-assistant turn pairs, and ingests them via /ingest API.
Usage:
uv run python import_claude.py [--dry-run] [--limit N]
"""
import argparse
import json
import os
import sys
import time
from pathlib import Path
import requests
BASE = os.environ.get("NOCMEM_ENDPOINT", "http://127.0.0.1:9820")
CLAUDE_DIR = Path.home() / ".claude" / "projects"
def extract_turns(jsonl_path: Path) -> list[tuple[str, str]]:
    """Extract (user_msg, assistant_msg) pairs from a JSONL conversation."""
    messages = []  # (role, text)
    with open(jsonl_path) as f:
        for line in f:
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            msg_type = obj.get("type")
            if msg_type not in ("user", "assistant"):
                continue
            msg = obj.get("message", {})
            content = msg.get("content", "")
            # extract text from content
            if isinstance(content, str):
                text = content.strip()
            elif isinstance(content, list):
                parts = []
                for part in content:
                    if isinstance(part, dict) and part.get("type") == "text":
                        parts.append(part["text"])
                text = "\n".join(parts).strip()
            else:
                continue
            if not text or len(text) < 10:
                continue
            # skip tool-heavy assistant responses (mostly noise)
            if msg_type == "assistant" and text.count("```") > 10:
                continue
            role = "user" if msg_type == "user" else "assistant"
            messages.append((role, text))

    # pair up user-assistant turns
    turns = []
    i = 0
    while i < len(messages) - 1:
        if messages[i][0] == "user":
            # find next assistant
            j = i + 1
            while j < len(messages) and messages[j][0] != "assistant":
                j += 1
            if j < len(messages):
                user_text = messages[i][1][:500]  # truncate long messages
                asst_text = messages[j][1][:500]
                turns.append((user_text, asst_text))
            i = j + 1
        else:
            i += 1
    return turns
def ingest_turn(user_msg: str, assistant_msg: str) -> int:
    """Send a turn to nocmem /ingest, return number of memories stored."""
    try:
        r = requests.post(
            f"{BASE}/ingest",
            json={"user_msg": user_msg, "assistant_msg": assistant_msg},
            timeout=120,
        )
        if r.status_code == 200:
            return r.json().get("stored", 0)
    except Exception as e:
        print(f"  error: {e}", file=sys.stderr)
    return 0
def main():
    parser = argparse.ArgumentParser(description="Import Claude Code history into nocmem")
    parser.add_argument("--dry-run", action="store_true", help="just show what would be imported")
    parser.add_argument("--limit", type=int, default=0, help="max turns to ingest (0=all)")
    parser.add_argument("--project", type=str, default="", help="filter by project dir name substring")
    args = parser.parse_args()

    # find all conversation files
    conversations = []
    for project_dir in sorted(CLAUDE_DIR.iterdir()):
        if not project_dir.is_dir():
            continue
        if args.project and args.project not in project_dir.name:
            continue
        for jsonl in sorted(project_dir.glob("*.jsonl")):
            if "subagents" in str(jsonl):
                continue
            conversations.append((project_dir.name, jsonl))
    print(f"found {len(conversations)} conversations in {CLAUDE_DIR}")
    if args.project:
        print(f"  filtered by: {args.project}")

    # extract all turns
    all_turns = []
    for project_name, jsonl_path in conversations:
        turns = extract_turns(jsonl_path)
        if turns:
            all_turns.extend([(project_name, u, a) for u, a in turns])
    print(f"extracted {len(all_turns)} turns total\n")
    if args.limit:
        all_turns = all_turns[:args.limit]

    if args.dry_run:
        for project, user_msg, asst_msg in all_turns[:20]:
            print(f"  [{project[:30]}]")
            print(f"  U: {user_msg[:80]}")
            print(f"  A: {asst_msg[:80]}")
            print()
        if len(all_turns) > 20:
            print(f"  ... and {len(all_turns) - 20} more")
        return

    # check server
    try:
        r = requests.get(f"{BASE}/stats", timeout=3)
        r.raise_for_status()
        before = r.json()["num_memories"]
        print(f"nocmem: {before} memories before import\n")
    except Exception:
        print(f"ERROR: nocmem not reachable at {BASE}")
        sys.exit(1)

    # ingest
    total_stored = 0
    t0 = time.monotonic()
    for i, (project, user_msg, asst_msg) in enumerate(all_turns):
        stored = ingest_turn(user_msg, asst_msg)
        total_stored += stored
        if (i + 1) % 10 == 0:
            elapsed = time.monotonic() - t0
            rate = (i + 1) / elapsed
            eta = (len(all_turns) - i - 1) / rate if rate > 0 else 0
            print(f"  [{i+1}/{len(all_turns)}] stored={total_stored} ({rate:.1f} turns/s, ETA {eta:.0f}s)")
    elapsed = time.monotonic() - t0

    # final stats
    r = requests.get(f"{BASE}/stats")
    after = r.json()["num_memories"]
    print(f"\n{'='*50}")
    print(f"imported {total_stored} memories from {len(all_turns)} turns")
    print(f"nocmem: {before} → {after} memories")
    print(f"time: {elapsed:.1f}s")


if __name__ == "__main__":
    main()
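The user→assistant pairing in `extract_turns` can be checked on a toy transcript; a sketch of the same while-loop over synthetic messages (not real Claude history):

```python
# toy (role, text) list: a consecutive assistant message should be skipped
messages = [
    ("user", "能不能给heradev加个bridge网卡"),
    ("assistant", "好,给 heradev 加桥接网卡,能拿 192.168.1.0/24 的地址。"),
    ("assistant", "已经改好 libvirt XML 了。"),  # not paired: previous turn already closed
    ("user", "act runner怎么装"),
    ("assistant", "装 act_runner,注册到 euphon org。"),
]

turns = []
i = 0
while i < len(messages) - 1:
    if messages[i][0] == "user":
        # find the next assistant message
        j = i + 1
        while j < len(messages) and messages[j][0] != "assistant":
            j += 1
        if j < len(messages):
            turns.append((messages[i][1][:500], messages[j][1][:500]))
        i = j + 1
    else:
        i += 1
print(len(turns), "turns")
```

Each user message pairs with the first assistant reply after it; stray assistant messages between turns are dropped rather than merged.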

mem/nocmem.service Normal file

@@ -0,0 +1,19 @@
[Unit]
Description=nocmem — NuoNuo memory service for NOC
After=network.target
[Service]
Type=simple
WorkingDirectory=/data/src/noc/mem
ExecStart=/home/fam/.local/bin/uv run uvicorn server:app --host 0.0.0.0 --port 9820 --log-level info
Restart=on-failure
RestartSec=5
Environment=NOCMEM_LLM_ENDPOINT=http://100.84.7.49:8000/v1
Environment=NOCMEM_LLM_MODEL=gemma4-31b
Environment=NOCMEM_LLM_API_KEY=unused
Environment=NOCMEM_DATA_DIR=/data/src/noc/mem/data
Environment=NOCMEM_DEVICE=cuda
[Install]
WantedBy=default.target

mem/pyproject.toml Normal file

@@ -0,0 +1,25 @@
[project]
name = "nocmem"
version = "0.1.0"
description = "Memory service for noc — NuoNuo hippocampal recall + ingest over HTTP"
requires-python = ">=3.12"
dependencies = [
"fastapi>=0.115",
"uvicorn>=0.34",
"torch>=2.10,<2.11",
"sentence-transformers>=3.0",
"nuonuo",
"openai>=1.0",
]
[tool.uv]
index-url = "https://pypi.org/simple"
[[tool.uv.index]]
name = "pytorch-cu128"
url = "https://download.pytorch.org/whl/cu128"
explicit = true
[tool.uv.sources]
torch = { index = "pytorch-cu128" }
nuonuo = { path = "../../nuonuo", editable = true }

mem/server.py Normal file

@@ -0,0 +1,400 @@
"""nocmem — Memory service for NOC.
Wraps NuoNuo's HippocampalMemory as an HTTP API.
Auto-recall on every user message, async ingest after LLM response.
"""
import asyncio
import os
import re
import time
import logging
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
import torch
from fastapi import FastAPI
from pydantic import BaseModel, Field
from sentence_transformers import SentenceTransformer
from openai import OpenAI
from nuonuo.hippocampus import HippocampalMemory
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("nocmem")
# ── config ──────────────────────────────────────────────────────────
EMBED_MODEL = os.environ.get("NOCMEM_EMBED_MODEL", "all-MiniLM-L6-v2")
EMBED_DIM = int(os.environ.get("NOCMEM_EMBED_DIM", "384"))
DEVICE = os.environ.get("NOCMEM_DEVICE", "cuda" if torch.cuda.is_available() else "cpu")
DATA_DIR = Path(os.environ.get("NOCMEM_DATA_DIR", "./data"))
CHECKPOINT = DATA_DIR / "hippocampus.pt"
SAVE_INTERVAL = int(os.environ.get("NOCMEM_SAVE_INTERVAL", "10")) # save every N stores
HOPFIELD_BETA = float(os.environ.get("NOCMEM_HOPFIELD_BETA", "32.0"))
HOPFIELD_TOP_K = int(os.environ.get("NOCMEM_HOPFIELD_TOP_K", "10"))
COS_SIM_THRESHOLD = float(os.environ.get("NOCMEM_COS_SIM_THRESHOLD", "0.35"))
# LLM for memory extraction (optional)
LLM_ENDPOINT = os.environ.get("NOCMEM_LLM_ENDPOINT", "")
LLM_MODEL = os.environ.get("NOCMEM_LLM_MODEL", "gemma4:12b")
LLM_API_KEY = os.environ.get("NOCMEM_LLM_API_KEY", "unused")
# ── globals ─────────────────────────────────────────────────────────
encoder: SentenceTransformer = None # type: ignore
hippocampus: HippocampalMemory = None # type: ignore
llm_client = None # optional
_stores_since_save = 0
def embed(text: str) -> torch.Tensor:
    return encoder.encode(
        [text], convert_to_tensor=True, normalize_embeddings=True, device=DEVICE
    )[0]


def embed_batch(texts: list[str]) -> list[torch.Tensor]:
    if not texts:
        return []
    t = encoder.encode(
        texts, convert_to_tensor=True, normalize_embeddings=True, device=DEVICE
    )
    return [t[i] for i in range(t.shape[0])]


def maybe_save():
    global _stores_since_save
    _stores_since_save += 1
    if _stores_since_save >= SAVE_INTERVAL:
        _stores_since_save = 0
        DATA_DIR.mkdir(parents=True, exist_ok=True)
        hippocampus.save(str(CHECKPOINT))
        logger.info("checkpoint saved: %s", CHECKPOINT)
# ── lifespan ────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    global encoder, hippocampus, llm_client
    logger.info("loading embedding model: %s (device=%s)", EMBED_MODEL, DEVICE)
    encoder = SentenceTransformer(EMBED_MODEL, device=DEVICE)

    if CHECKPOINT.exists():
        logger.info("loading checkpoint: %s", CHECKPOINT)
        hippocampus = HippocampalMemory.load(str(CHECKPOINT), device=DEVICE)
        logger.info("loaded %d memories", len(hippocampus.memories))
    else:
        logger.info("no checkpoint found, starting fresh")
        hippocampus = HippocampalMemory(
            embed_dim=EMBED_DIM, beta=HOPFIELD_BETA,
            hopfield_top_k=HOPFIELD_TOP_K, device=DEVICE,
        )

    if LLM_ENDPOINT:
        try:
            client = OpenAI(base_url=LLM_ENDPOINT, api_key=LLM_API_KEY, timeout=5.0)
            client.models.list()
            llm_client = client
            logger.info("LLM client connected: %s", LLM_ENDPOINT)
        except Exception as e:
            logger.warning("LLM client unavailable: %s", e)

    yield

    # save on shutdown
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    hippocampus.save(str(CHECKPOINT))
    logger.info("shutdown: checkpoint saved")


app = FastAPI(title="nocmem", lifespan=lifespan)
# ── models ──────────────────────────────────────────────────────────
class RecallRequest(BaseModel):
    text: str
    top_k: int = Field(default=5, ge=1, le=20)
    hops: int = Field(default=2, ge=1, le=5)
    min_similarity: float = Field(default=0.0, ge=0.0, le=1.0)


class RecallResponse(BaseModel):
    memories: str
    count: int
    latency_ms: float


class IngestRequest(BaseModel):
    user_msg: str
    assistant_msg: str


class IngestResponse(BaseModel):
    stored: int


class StoreRequest(BaseModel):
    cue: str
    target: str
    importance: float = Field(default=0.5, ge=0.0, le=1.0)


class StoreResponse(BaseModel):
    memory_id: int
# ── endpoints ───────────────────────────────────────────────────────
@app.post("/recall", response_model=RecallResponse)
async def recall(req: RecallRequest):
    t0 = time.monotonic()
    query_emb = embed(req.text)

    # pre-filter: check if anything in memory is actually similar enough
    cue_mat = hippocampus._get_cue_matrix()
    if cue_mat is not None and COS_SIM_THRESHOLD > 0:
        cos_sims = query_emb @ cue_mat.T
        max_cos_sim = cos_sims.max().item()
        if max_cos_sim < COS_SIM_THRESHOLD:
            # nothing in memory is similar enough — don't hallucinate
            return RecallResponse(memories="", count=0, latency_ms=(time.monotonic() - t0) * 1000)

    # single-hop
    results = hippocampus.recall(query_emb, top_k=req.top_k)

    # multi-hop chain from top result
    chain_results = []
    if results and req.hops > 1:
        chain = hippocampus.recall_chain(query_emb, hops=req.hops)
        # add chain results not already in single-hop
        seen_ids = {r.memory_id for r in results}
        for cr in chain:
            if cr.memory_id not in seen_ids:
                chain_results.append(cr)
                seen_ids.add(cr.memory_id)
    all_results = results + chain_results

    elapsed = (time.monotonic() - t0) * 1000
    if not all_results:
        return RecallResponse(memories="", count=0, latency_ms=elapsed)

    lines = []
    for r in all_results:
        if r.similarity < req.min_similarity:
            continue
        meta = r.metadata
        text = meta.get("target", meta.get("text", ""))
        if not text:
            continue
        hop_tag = f" (联想 hop={r.hop_distance})" if r.hop_distance > 1 else ""
        lines.append(f"- {text}{hop_tag}")
    if not lines:
        return RecallResponse(memories="", count=0, latency_ms=elapsed)
    formatted = "[以下是可能相关的历史记忆,仅供参考。请优先关注用户当前的消息。]\n" + "\n".join(lines)
    return RecallResponse(memories=formatted, count=len(lines), latency_ms=elapsed)
@app.post("/ingest", response_model=IngestResponse)
async def ingest(req: IngestRequest):
    extracted = await asyncio.to_thread(_extract_and_store, req.user_msg, req.assistant_msg)
    return IngestResponse(stored=extracted)
@dataclass
class ExtractedMemory:
    cue: str
    target: str
    importance: float = 0.5
def _extract_memories_llm(user_msg: str, assistant_msg: str) -> list[ExtractedMemory]:
    prompt = (
        '你是一个记忆提取器。把这段对话变成若干个"问答对"——未来有人问这个问题时,能直接给出答案。\n\n'
        "要求:\n"
        "- 问题要自然,像人真的会这么问\n"
        "- 答案要具体完整,包含关键细节(名称、数字、地址等)\n"
        "- 同一个事实可以从不同角度提问\n"
        "- 每条 CUE 提供 2-3 个不同的触发短语,用分号分隔\n\n"
        "格式(每行一条):\n"
        "CUE: <提问方式1>; <提问方式2>; <提问方式3> | TARGET: <完整的回答> | IMPORTANCE: <0-1>\n\n"
        f"User: {user_msg}\nAssistant: {assistant_msg}\n\n"
        "没有值得记住的则输出 NONE。"
    )
    try:
        resp = llm_client.chat.completions.create(
            model=LLM_MODEL, messages=[{"role": "user", "content": prompt}],
            temperature=0.3, max_tokens=512,
        )
        result = resp.choices[0].message.content
    except Exception:
        return _extract_memories_heuristic(user_msg, assistant_msg)
    memories = []
    for line in result.strip().split("\n"):
        if line.strip() == "NONE":
            break
        m = re.match(r"CUE:\s*(.+?)\s*\|\s*TARGET:\s*(.+?)\s*\|\s*IMPORTANCE:\s*([\d.]+)", line)
        if m:
            memories.append(ExtractedMemory(m.group(1).strip(), m.group(2).strip(), float(m.group(3))))
    return memories
def _extract_memories_heuristic(user_msg: str, assistant_msg: str) -> list[ExtractedMemory]:
    memories = []
    # detect questions — English and Chinese
    has_question = "?" in user_msg or "?" in user_msg or any(
        user_msg.strip().startswith(q) for q in ["怎么", "什么", "为什么", "如何", "多少"]
    )
    # count meaningful length: for Chinese, use character count
    assistant_long_enough = len(assistant_msg) > 20
    if has_question and assistant_long_enough:
        cue = user_msg.rstrip("??").strip()
        memories.append(ExtractedMemory(
            cue=cue, target=assistant_msg[:300], importance=0.6,
        ))
    # tech keywords — English and Chinese
    tech_keywords = [
        "deploy", "config", "bug", "fix", "error", "database", "server",
        "api", "port", "token", "password", "version", "install", "upgrade",
        "部署", "配置", "错误", "数据库", "服务器", "端口", "密码", "版本",
        "安装", "升级", "模型", "工具", "代码", "项目", "优化", "性能",
        "内存", "GPU", "vllm", "docker", "k8s", "git", "编译", "测试",
    ]
    combined = (user_msg + " " + assistant_msg).lower()
    user_meaningful = len(user_msg) >= 8  # characters, not words
    if any(kw in combined for kw in tech_keywords) and user_meaningful:
        if not memories:  # avoid duplicate with Q&A extraction
            memories.append(ExtractedMemory(
                cue=user_msg[:150], target=assistant_msg[:300], importance=0.5,
            ))
    return memories
def _generate_paraphrases_heuristic(text: str, n: int = 3) -> list[str]:
    variants = []
    text_lower = text.lower().strip()
    # English prefixes
    for pfx in ["can you ", "please ", "i need to ", "how do i ", "how to ", "what is ", "what's "]:
        if text_lower.startswith(pfx):
            stripped = text[len(pfx):].strip()
            if stripped:
                variants.append(stripped)
    # Chinese prefixes
    for pfx in ["帮我看看", "帮我", "请问", "我想知道", "能不能", "怎么样", "看下", "看看"]:
        if text.startswith(pfx):
            stripped = text[len(pfx):].strip()
            if stripped:
                variants.append(stripped)
    # synonym swaps — English
    en_swaps = {"slow": "performance issues", "fix": "resolve", "deploy": "release",
                "error": "issue", "bug": "problem", "database": "DB", "server": "machine"}
    for old, new in en_swaps.items():
        if old in text_lower:
            variant = text.replace(old, new).replace(old.capitalize(), new.capitalize())
            if variant != text and variant not in variants:
                variants.append(variant)
    # synonym swaps — Chinese
    cn_swaps = {"数据库": "DB", "服务器": "机器", "部署": "上线", "配置": "设置",
                "性能": "速度", "优化": "改进", "工具": "tool", "项目": "project"}
    for old, new in cn_swaps.items():
        if old in text:
            variant = text.replace(old, new)
            if variant != text and variant not in variants:
                variants.append(variant)
    return variants[:n]


def _generate_paraphrases_llm(text: str, n: int = 3) -> list[str]:
    prompt = f"Generate {n} different paraphrases of this text. Each should convey the same meaning but use different words. One per line, no numbering.\n\nText: {text}"
    try:
        resp = llm_client.chat.completions.create(
            model=LLM_MODEL, messages=[{"role": "user", "content": prompt}],
            temperature=0.8, max_tokens=256,
        )
        result = resp.choices[0].message.content
        return [l.strip() for l in result.strip().split("\n") if l.strip() and len(l.strip()) > 3][:n]
    except Exception:
        return _generate_paraphrases_heuristic(text, n)
def _extract_and_store(user_msg: str, assistant_msg: str) -> int:
    if llm_client:
        memories = _extract_memories_llm(user_msg, assistant_msg)
    else:
        memories = _extract_memories_heuristic(user_msg, assistant_msg)
    if not memories:
        return 0
    stored = 0
    for mem in memories:
        if mem.importance < 0.3:
            continue
        # split semicolon-separated cues into primary + variants
        cue_parts = [p.strip() for p in mem.cue.split(";") if p.strip()]
        primary_cue = cue_parts[0] if cue_parts else mem.cue
        inline_variants = cue_parts[1:] if len(cue_parts) > 1 else []
        cue_emb = embed(primary_cue)
        target_emb = embed(mem.target)
        # inline variants from semicolon cues (already in the extraction)
        variant_embs = embed_batch(inline_variants) if inline_variants else []
        # additionally generate paraphrases if no inline variants
        if not inline_variants:
            if llm_client:
                paraphrases = _generate_paraphrases_llm(primary_cue, n=3)
            else:
                paraphrases = _generate_paraphrases_heuristic(primary_cue, n=3)
            variant_embs = embed_batch(paraphrases) if paraphrases else []
        hippocampus.store(
            cue_emb, target_emb,
            cue_variants=variant_embs if variant_embs else None,
            metadata={"cue": mem.cue, "target": mem.target, "importance": mem.importance},
            timestamp=time.time(),
        )
        stored += 1
    if stored > 0:
        maybe_save()
        logger.info("ingested %d memories from conversation turn", stored)
    return stored
@app.post("/store", response_model=StoreResponse)
async def store_direct(req: StoreRequest):
    """Direct store — bypass LLM extraction, for manual/testing use."""
    cue_emb = embed(req.cue)
    target_emb = embed(req.target)
    mid = hippocampus.store(
        cue_emb, target_emb,
        metadata={"cue": req.cue, "target": req.target, "importance": req.importance},
        timestamp=time.time(),
    )
    maybe_save()
    return StoreResponse(memory_id=mid)


@app.get("/stats")
async def stats():
    s = hippocampus.stats()
    s["device"] = DEVICE
    s["embedding_model"] = EMBED_MODEL
    s["checkpoint"] = str(CHECKPOINT)
    s["checkpoint_exists"] = CHECKPOINT.exists()
    return s


@app.delete("/memory/{memory_id}")
async def forget(memory_id: int):
    hippocampus.forget(memory_id)
    maybe_save()
    return {"deleted": memory_id}
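The /recall pre-filter above is just a max-cosine check against the cue matrix before any Hopfield work happens. A standalone sketch of that check with toy unit vectors (a reimplementation for illustration, not a call to the running server; 0.35 matches the config default):

```python
import numpy as np

COS_SIM_THRESHOLD = 0.35

def passes_prefilter(query_emb, cue_mat, threshold=COS_SIM_THRESHOLD):
    """Return True if some stored cue is similar enough to bother recalling."""
    if cue_mat is None or cue_mat.shape[0] == 0:
        return False
    # embeddings are L2-normalized, so the dot product is cosine similarity
    return bool(float((query_emb @ cue_mat.T).max()) >= threshold)

def unit(v):
    return v / np.linalg.norm(v)

cues = np.stack([unit(np.array([1.0, 0.0, 0.0])),
                 unit(np.array([0.0, 1.0, 0.0]))])
near = unit(np.array([0.9, 0.1, 0.0]))  # close to cue 0, cos ≈ 0.99
far = unit(np.array([0.0, 0.2, 1.0]))   # max cos ≈ 0.20, below threshold
print(passes_prefilter(near, cues), passes_prefilter(far, cues))
```

Below the threshold the endpoint returns an empty string rather than the least-bad match, which is what keeps irrelevant queries from injecting noise into the prompt.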

mem/test_api.py Normal file

@@ -0,0 +1,390 @@
"""nocmem API integration tests.
Run with: uv run python test_api.py
Requires nocmem server running on localhost:9820.
"""
import sys
import time
import requests
BASE = "http://127.0.0.1:9820"
PASS = 0
FAIL = 0
def test(name: str, fn):
    global PASS, FAIL
    try:
        fn()
        print(f"✓ {name}")
        PASS += 1
    except AssertionError as e:
        print(f"✗ {name}: {e}")
        FAIL += 1
    except Exception as e:
        print(f"✗ {name}: EXCEPTION {e}")
        FAIL += 1


def assert_eq(a, b, msg=""):
    assert a == b, f"expected {b!r}, got {a!r}" + (f" ({msg})" if msg else "")


def assert_gt(a, b, msg=""):
    assert a > b, f"expected > {b!r}, got {a!r}" + (f" ({msg})" if msg else "")


def assert_in(needle, haystack, msg=""):
    assert needle in haystack, f"{needle!r} not in {haystack!r}" + (f" ({msg})" if msg else "")
# ── health check ────────────────────────────────────────────────────
def check_server():
    try:
        r = requests.get(f"{BASE}/stats", timeout=3)
        r.raise_for_status()
        return True
    except Exception:
        return False
# ── test: stats on empty db ─────────────────────────────────────────
def test_stats_empty():
    r = requests.get(f"{BASE}/stats")
    assert_eq(r.status_code, 200)
    data = r.json()
    assert "num_memories" in data
    assert "device" in data
    assert_eq(data["embedding_model"], "all-MiniLM-L6-v2")
# ── test: recall on empty db ────────────────────────────────────────
def test_recall_empty():
    r = requests.post(f"{BASE}/recall", json={"text": "hello"})
    assert_eq(r.status_code, 200)
    data = r.json()
    assert_eq(data["memories"], "")
    assert_eq(data["count"], 0)
# ── test: direct store ──────────────────────────────────────────────
stored_ids = []


def test_store_single():
    r = requests.post(f"{BASE}/store", json={
        "cue": "what port does postgres run on",
        "target": "PostgreSQL runs on port 5432",
        "importance": 0.8,
    })
    assert_eq(r.status_code, 200)
    data = r.json()
    assert "memory_id" in data
    stored_ids.append(data["memory_id"])


def test_store_multiple():
    memories = [
        {"cue": "what is the database password", "target": "The DB password is stored in /etc/secrets/db.env", "importance": 0.9},
        {"cue": "how to deploy the app", "target": "Run make deploy-hera to deploy to the suite VPS via SSH", "importance": 0.7},
        {"cue": "what timezone is Fam in", "target": "Fam is in London, UK timezone (Europe/London, GMT/BST)", "importance": 0.6},
        {"cue": "which embedding model works best", "target": "all-MiniLM-L6-v2 has the best gap metric for hippocampal memory", "importance": 0.8},
        {"cue": "what GPU does the server have", "target": "The server has an NVIDIA RTX 4090 with 24GB VRAM", "importance": 0.7},
    ]
    for m in memories:
        r = requests.post(f"{BASE}/store", json=m)
        assert_eq(r.status_code, 200)
        stored_ids.append(r.json()["memory_id"])
# ── test: exact recall ──────────────────────────────────────────────
def test_recall_exact():
    """Recall with the exact cue text should return the right memory."""
    r = requests.post(f"{BASE}/recall", json={
        "text": "what port does postgres run on",
        "top_k": 3,
    })
    assert_eq(r.status_code, 200)
    data = r.json()
    assert_gt(data["count"], 0, "should recall at least 1")
    assert_in("5432", data["memories"], "should mention port 5432")
# ── test: paraphrase recall ─────────────────────────────────────────
def test_recall_paraphrase():
    """Recall with a paraphrased query (not exact cue text)."""
    r = requests.post(f"{BASE}/recall", json={
        "text": "which port is postgresql listening on",
        "top_k": 3,
    })
    assert_eq(r.status_code, 200)
    data = r.json()
    assert_gt(data["count"], 0, "paraphrase should still recall")
    assert_in("5432", data["memories"])


def test_recall_different_wording():
    """Even more different wording."""
    r = requests.post(f"{BASE}/recall", json={
        "text": "database connection port number",
        "top_k": 3,
    })
    assert_eq(r.status_code, 200)
    data = r.json()
    assert_gt(data["count"], 0, "different wording should recall")
    assert_in("5432", data["memories"])
# ── test: recall relevance ──────────────────────────────────────────
def test_recall_deployment():
    r = requests.post(f"{BASE}/recall", json={
        "text": "how do I deploy to production",
        "top_k": 3,
    })
    assert_eq(r.status_code, 200)
    data = r.json()
    assert_gt(data["count"], 0)
    assert_in("deploy", data["memories"].lower())


def test_recall_timezone():
    r = requests.post(f"{BASE}/recall", json={
        "text": "where is Fam located",
        "top_k": 3,
    })
    assert_eq(r.status_code, 200)
    data = r.json()
    assert_gt(data["count"], 0)
    assert_in("London", data["memories"])


def test_recall_gpu():
    r = requests.post(f"{BASE}/recall", json={
        "text": "what hardware does the server have",
        "top_k": 3,
    })
    assert_eq(r.status_code, 200)
    data = r.json()
    assert_gt(data["count"], 0)
    assert_in("4090", data["memories"])
# ── test: top_k ─────────────────────────────────────────────────────
def test_recall_top_k_1():
r = requests.post(f"{BASE}/recall", json={
"text": "postgres port",
"top_k": 1,
})
data = r.json()
assert_eq(data["count"], 1, "top_k=1 should return exactly 1")
def test_recall_top_k_all():
r = requests.post(f"{BASE}/recall", json={
"text": "tell me everything",
"top_k": 20,
})
data = r.json()
assert_gt(data["count"], 0, "should recall something")
# ── test: recall latency ────────────────────────────────────────────
def test_recall_latency():
"""Recall should be fast (< 100ms including HTTP + embedding)."""
t0 = time.monotonic()
r = requests.post(f"{BASE}/recall", json={"text": "database port"})
elapsed_ms = (time.monotonic() - t0) * 1000
data = r.json()
# internal latency (no HTTP overhead)
assert data["latency_ms"] < 100, f"internal latency {data['latency_ms']:.1f}ms too high"
# end-to-end including HTTP
print(f" (e2e={elapsed_ms:.1f}ms, internal={data['latency_ms']:.1f}ms)")
# ── test: ingest (heuristic, no LLM) ───────────────────────────────
def test_ingest_heuristic():
"""Ingest without LLM should use heuristic extraction."""
r = requests.post(f"{BASE}/ingest", json={
"user_msg": "What version of Python are we running?",
"assistant_msg": "We are running Python 3.12.4 on the server, installed via uv.",
})
assert_eq(r.status_code, 200)
data = r.json()
# heuristic should extract at least the Q&A pair
assert_gt(data["stored"], 0, "heuristic should extract at least 1 memory")
def test_ingest_then_recall():
"""After ingesting, the memory should be recallable."""
# first ingest
requests.post(f"{BASE}/ingest", json={
"user_msg": "What's the Redis cache TTL?",
"assistant_msg": "The Redis cache TTL is set to 3600 seconds (1 hour) in production.",
})
# wait a tiny bit for async processing
time.sleep(0.5)
# then recall
r = requests.post(f"{BASE}/recall", json={
"text": "redis cache timeout",
"top_k": 3,
})
data = r.json()
assert_gt(data["count"], 0, "ingested memory should be recallable")
# Check it mentions the TTL
assert_in("3600", data["memories"], "should recall the TTL value")
# ── test: forget ────────────────────────────────────────────────────
def test_forget():
"""Delete a memory and verify it's gone."""
# store something
r = requests.post(f"{BASE}/store", json={
"cue": "temporary test memory for deletion",
"target": "this should be deleted XYZZY",
})
mid = r.json()["memory_id"]
# verify it's recallable
r = requests.post(f"{BASE}/recall", json={"text": "temporary test memory for deletion"})
assert_in("XYZZY", r.json()["memories"])
# delete
r = requests.delete(f"{BASE}/memory/{mid}")
assert_eq(r.status_code, 200)
# verify gone — recall the exact cue, should not return XYZZY
r = requests.post(f"{BASE}/recall", json={"text": "temporary test memory for deletion"})
if r.json()["memories"]:
assert "XYZZY" not in r.json()["memories"], "deleted memory should not appear"
# ── test: format ────────────────────────────────────────────────────
def test_recall_format():
"""Recalled memories should have the expected format."""
r = requests.post(f"{BASE}/recall", json={"text": "postgres port"})
data = r.json()
if data["count"] > 0:
assert data["memories"].startswith("[相关记忆]"), "should start with header"
assert "\n- " in data["memories"], "each memory should start with '- '"
# ── test: stats after stores ────────────────────────────────────────
def test_stats_after():
r = requests.get(f"{BASE}/stats")
data = r.json()
assert_gt(data["num_memories"], 0, "should have memories")
assert_gt(data["num_cue_entries"], data["num_memories"],
"cue entries should >= memories (augmentation from ingest)")
# ── test: edge cases ────────────────────────────────────────────────
def test_recall_empty_text():
r = requests.post(f"{BASE}/recall", json={"text": ""})
# should not crash
assert r.status_code == 200
def test_recall_long_text():
r = requests.post(f"{BASE}/recall", json={"text": "a " * 1000})
assert r.status_code == 200
def test_recall_chinese():
"""Chinese text should work."""
# store a Chinese memory
requests.post(f"{BASE}/store", json={
"cue": "数据库密码在哪里",
"target": "数据库密码存在 /etc/secrets/db.env 文件中",
})
r = requests.post(f"{BASE}/recall", json={"text": "数据库密码"})
data = r.json()
assert_gt(data["count"], 0, "Chinese recall should work")
assert_in("secrets", data["memories"])
def test_store_validation():
"""Missing required fields should return 422."""
r = requests.post(f"{BASE}/store", json={"cue": "only cue"})
assert_eq(r.status_code, 422)
# ── run ─────────────────────────────────────────────────────────────
def main():
global PASS, FAIL
print("nocmem API tests")
print(f"server: {BASE}\n")
if not check_server():
print("ERROR: server not reachable")
sys.exit(1)
# record initial state — the db may already contain memories
r = requests.get(f"{BASE}/stats")
initial = r.json()["num_memories"]
print(f"[initial state: {initial} memories]\n")
print("── basic ──")
test("stats endpoint", test_stats_empty)
test("recall on empty/existing db", test_recall_empty if initial == 0 else lambda: None)
print("\n── store ──")
test("store single memory", test_store_single)
test("store multiple memories", test_store_multiple)
print("\n── recall accuracy ─<><E29480><EFBFBD>")
test("exact cue recall", test_recall_exact)
test("paraphrase recall", test_recall_paraphrase)
test("different wording recall", test_recall_different_wording)
test("deployment query", test_recall_deployment)
test("timezone query", test_recall_timezone)
test("GPU query", test_recall_gpu)
print("\n── recall params ──")
test("top_k=1", test_recall_top_k_1)
test("top_k=20 (all)", test_recall_top_k_all)
test("latency < 100ms", test_recall_latency)
test("format check", test_recall_format)
print("\n── ingest ──")
test("heuristic ingest", test_ingest_heuristic)
test("ingest then recall", test_ingest_then_recall)
print("\n── forget ──")
test("store + forget + verify", test_forget)
print("\n── edge cases ──")
test("empty text", test_recall_empty_text)
test("long text", test_recall_long_text)
test("Chinese text", test_recall_chinese)
test("validation error", test_store_validation)
print("\n── stats ──")
test("stats after stores", test_stats_after)
print(f"\n{'='*40}")
print(f"PASS: {PASS} FAIL: {FAIL}")
if FAIL:
sys.exit(1)
else:
print("All tests passed!")
if __name__ == "__main__":
main()

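The hunks above call `assert_eq`, `assert_gt`, `assert_in`, and `check_server`, which are defined near the top of `mem/test_api.py` (outside this diff). A minimal reconstruction, assuming the same call signatures, might look like:

```python
import urllib.request

BASE = "http://127.0.0.1:9820"  # same server address the tests target

def assert_eq(a, b, msg=""):
    assert a == b, f"{msg or 'assert_eq'}: {a!r} != {b!r}"

def assert_gt(a, b, msg=""):
    assert a > b, f"{msg or 'assert_gt'}: {a!r} <= {b!r}"

def assert_in(needle, haystack, msg=""):
    assert needle in haystack, f"{msg or 'assert_in'}: {needle!r} not found"

def check_server(timeout=3):
    """True if the nocmem server answers GET /stats."""
    try:
        with urllib.request.urlopen(f"{BASE}/stats", timeout=timeout) as r:
            return r.status == 200
    except Exception:
        return False
```

This sketch uses stdlib `urllib` for the health check; the tests themselves use `requests`, and the actual helpers in the file may differ in message formatting.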
279
mem/test_real_data.py Normal file
View File

@@ -0,0 +1,279 @@
"""Test nocmem with real conversation data from NOC's SQLite database.
Extracts conversation turns, ingests them, then tests recall with
realistic queries that a user would actually ask.
"""
import sys
import time
import sqlite3
import requests
BASE = "http://127.0.0.1:9820"
DB_PATH = "/data/src/noc/noc.db"
PASS = 0
FAIL = 0
def test(name, fn):
global PASS, FAIL
try:
fn()
print(f"{name}")
PASS += 1
except AssertionError as e:
print(f"{name}: {e}")
FAIL += 1
except Exception as e:
print(f"{name}: EXCEPTION {e}")
FAIL += 1
# ── step 1: extract conversation turns from SQLite ──────────────────
def extract_turns():
"""Extract (user_msg, assistant_msg) pairs from the database."""
conn = sqlite3.connect(DB_PATH)
rows = conn.execute(
"SELECT role, content FROM messages ORDER BY id"
).fetchall()
conn.close()
turns = []
i = 0
while i < len(rows) - 1:
role, content = rows[i]
# skip non-user messages, agent outputs, very short messages
if role != "user" or len(content) < 5 or content.startswith("[Agent ") or content.startswith("[用户上传") or content.startswith("[语音消息]"):
i += 1
continue
# find the next assistant reply
j = i + 1
while j < len(rows) and rows[j][0] != "assistant":
j += 1
if j < len(rows):
assistant_content = rows[j][1]
if len(assistant_content) > 10 and "<pad>" not in assistant_content:
turns.append((content, assistant_content))
i = j + 1
return turns
# ── step 2: ingest all turns ───────────────────────────────────────
def ingest_turns(turns):
"""Ingest conversation turns via /ingest endpoint."""
total_stored = 0
for user_msg, assistant_msg in turns:
r = requests.post(f"{BASE}/ingest", json={
"user_msg": user_msg,
"assistant_msg": assistant_msg,
})
if r.status_code == 200:
total_stored += r.json().get("stored", 0)
return total_stored
# ── step 3: also store some key facts directly ─────────────────────
def store_key_facts():
"""Store critical facts that heuristic extraction might miss."""
facts = [
{"cue": "bot的名字叫什么", "target": "bot的名字叫小乖是Fam给取的", "importance": 0.9},
{"cue": "有哪些工具可以用", "target": "工具有: fam_todo(飞书待办), send_file(发文件), spawn_agent/agent_status/kill_agent(子代理管理), run_shell, run_python, update_memory, update_inner_state, gen_voice", "importance": 0.8},
{"cue": "vLLM在5090上的性能", "target": "RTX 5090上vLLM跑gemma模型只有4.8 tok/s需要切换到awq_marlin量化来提升速度", "importance": 0.8},
{"cue": "repo-vis项目是什么", "target": "repo-vis是一个用Rust后端+Three.js前端的3D代码库可视化工具目标支持Linux内核级别的大型仓库和Pico VR", "importance": 0.8},
{"cue": "repo-vis的性能瓶颈", "target": "Linux内核79K文件量级下SQLite 1GB上限和O(n)全量反序列化是瓶颈需要n-ary tree按需合并优化", "importance": 0.9},
{"cue": "明天的待办事项", "target": "最紧迫的是emblem scanner的AI Chat和KB部分最高优先级然后是曲面二维码识读优化信息收集", "importance": 0.7},
{"cue": "后端切换到了什么", "target": "NOC后端从原来的方案切换到了vLLM速度变快了", "importance": 0.7},
{"cue": "home目录下有多少log文件", "target": "home目录及子目录下共有960个.log文件", "importance": 0.5},
]
stored = 0
for f in facts:
r = requests.post(f"{BASE}/store", json=f)
if r.status_code == 200:
stored += 1
return stored
# ── step 4: recall tests with realistic queries ────────────────────
def test_recall_bot_name():
r = requests.post(f"{BASE}/recall", json={"text": "你叫什么名字"})
data = r.json()
assert data["count"] > 0, "should recall something"
assert "小乖" in data["memories"], f"should mention 小乖, got: {data['memories'][:200]}"
def test_recall_tools():
r = requests.post(f"{BASE}/recall", json={"text": "有什么工具可以用"})
data = r.json()
assert data["count"] > 0
m = data["memories"].lower()
assert "tool" in m or "工具" in m or "spawn" in m or "fam_todo" in m, f"should mention tools, got: {data['memories'][:200]}"
def test_recall_vllm():
r = requests.post(f"{BASE}/recall", json={"text": "vllm性能怎么样"})
data = r.json()
assert data["count"] > 0
assert "4.8" in data["memories"] or "5090" in data["memories"] or "tok" in data["memories"], \
f"should mention vLLM stats, got: {data['memories'][:200]}"
def test_recall_repovis():
r = requests.post(f"{BASE}/recall", json={"text": "repo-vis项目"})
data = r.json()
assert data["count"] > 0
m = data["memories"]
assert "Rust" in m or "Three" in m or "3D" in m or "可视化" in m, \
f"should mention repo-vis tech, got: {m[:200]}"
def test_recall_performance_bottleneck():
r = requests.post(f"{BASE}/recall", json={"text": "Linux内核代码仓库跑不动"})
data = r.json()
assert data["count"] > 0
m = data["memories"]
assert "SQLite" in m or "79K" in m or "瓶颈" in m or "n-ary" in m or "内核" in m, \
f"should mention bottleneck, got: {m[:200]}"
def test_recall_todo():
r = requests.post(f"{BASE}/recall", json={"text": "待办事项有哪些"})
data = r.json()
assert data["count"] > 0
m = data["memories"]
assert "emblem" in m.lower() or "todo" in m.lower() or "待办" in m or "scanner" in m.lower(), \
f"should mention todos, got: {m[:200]}"
def test_recall_vr():
r = requests.post(f"{BASE}/recall", json={"text": "VR支持"})
data = r.json()
assert data["count"] > 0
m = data["memories"]
assert "Pico" in m or "VR" in m or "repo-vis" in m.lower(), \
f"should mention VR, got: {m[:200]}"
def test_recall_chinese_natural():
"""Test with natural Chinese conversational query."""
r = requests.post(f"{BASE}/recall", json={"text": "之前聊过什么技术话题"})
data = r.json()
assert data["count"] > 0, "should recall some technical topics"
def test_recall_cross_topic():
"""Query that spans multiple memories — should return diverse results."""
r = requests.post(f"{BASE}/recall", json={
"text": "项目进度和优化",
"top_k": 5,
})
data = r.json()
assert data["count"] >= 2, f"should recall multiple memories, got {data['count']}"
def test_recall_log_files():
r = requests.post(f"{BASE}/recall", json={"text": "日志文件有多少"})
data = r.json()
assert data["count"] > 0
assert "960" in data["memories"] or "log" in data["memories"].lower(), \
f"should mention log files, got: {data['memories'][:200]}"
# ── step 5: multi-hop chain test ──────────────────────────────────
def test_multihop_chain():
"""Test if Hebbian chaining connects related memories.
repo-vis → performance bottleneck → n-ary tree optimization
"""
r = requests.post(f"{BASE}/recall", json={
"text": "repo-vis",
"top_k": 3,
"hops": 3,
})
data = r.json()
assert data["count"] > 0
# print chain for inspection
print(f" chain: {data['memories'][:300]}")
# ── step 6: latency with real data ─────────────────────────────────
def test_latency_with_data():
"""Recall latency after loading real data."""
times = []
for q in ["工具", "vllm", "项目", "待办", "性能"]:
r = requests.post(f"{BASE}/recall", json={"text": q})
times.append(r.json()["latency_ms"])
avg = sum(times) / len(times)
print(f" avg latency: {avg:.1f}ms (max: {max(times):.1f}ms)")
assert avg < 50, f"average latency {avg:.1f}ms too high"
# ── main ────────────────────────────────────────────────────────────
def main():
global PASS, FAIL
print("nocmem real-data test")
print(f"server: {BASE}")
print(f"database: {DB_PATH}\n")
# check server
try:
requests.get(f"{BASE}/stats", timeout=3).raise_for_status()
except Exception:
print("ERROR: server not reachable")
sys.exit(1)
# extract
print("── extract ──")
turns = extract_turns()
print(f" extracted {len(turns)} conversation turns")
# ingest
print("\n── ingest (heuristic, no LLM) ──")
t0 = time.monotonic()
ingested = ingest_turns(turns)
elapsed = time.monotonic() - t0
print(f" ingested {ingested} memories from {len(turns)} turns ({elapsed:.1f}s)")
# store key facts
print("\n── store key facts ──")
stored = store_key_facts()
print(f" stored {stored} key facts")
# stats
r = requests.get(f"{BASE}/stats")
stats = r.json()
print(f"\n── memory stats ──")
print(f" memories: {stats['num_memories']}")
print(f" cue entries: {stats['num_cue_entries']} (aug ratio: {stats['augmentation_ratio']:.1f}x)")
print(f" W norm: {stats['w_norm']:.1f}")
# recall tests
print(f"\n── recall accuracy (natural language queries) ──")
test("bot的名字", test_recall_bot_name)
test("可用工具", test_recall_tools)
test("vLLM性能", test_recall_vllm)
test("repo-vis项目", test_recall_repovis)
test("性能瓶颈", test_recall_performance_bottleneck)
test("待办事项", test_recall_todo)
test("VR支持", test_recall_vr)
test("log文件数量", test_recall_log_files)
test("自然中文查询", test_recall_chinese_natural)
test("跨主题召回", test_recall_cross_topic)
print(f"\n── multi-hop chain ──")
test("repo-vis联想链", test_multihop_chain)
print(f"\n── latency ──")
test("平均延迟 < 50ms", test_latency_with_data)
print(f"\n{'='*50}")
total = PASS + FAIL
print(f"PASS: {PASS}/{total} FAIL: {FAIL}/{total}")
if FAIL:
sys.exit(1)
else:
print("All tests passed!")
if __name__ == "__main__":
main()
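The recall assertions above pin down the `/recall` wire format: a `count`, a single formatted `memories` string (a `[相关记忆]` header plus `- `-prefixed items, per `test_recall_format`), and `latency_ms`. A small helper to split that string back into items — a sketch inferred from the tests, not an authoritative schema:

```python
def parse_memories(resp: dict) -> list[str]:
    """Split a /recall response's `memories` string into individual items.

    Assumes the format checked by test_recall_format: a "[相关记忆]" header
    line followed by lines starting with "- ".
    """
    if resp.get("count", 0) == 0:
        return []
    return [line[2:] for line in resp["memories"].splitlines()
            if line.startswith("- ")]
```

For example, `parse_memories({"count": 2, "memories": "[相关记忆]\n- a\n- b"})` yields `["a", "b"]`.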

1796
mem/uv.lock generated Normal file

File diff suppressed because it is too large

View File

@@ -12,7 +12,6 @@ RestartSec=5
Environment=RUST_LOG=noc=info
Environment=RUST_BACKTRACE=1
Environment=NOC_CONFIG=@REPO@/config.yaml
Environment=NOC_STATE=@REPO@/state.json
Environment=PATH=@PATH@
[Install]

View File

@@ -1,6 +1,6 @@
use serde::Deserialize;
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct Config {
#[serde(default = "default_name")]
pub name: String,
@@ -13,6 +13,13 @@ pub struct Config {
pub whisper_url: Option<String>,
#[serde(default)]
pub gitea: Option<GiteaConfig>,
#[serde(default)]
pub nocmem: Option<NocmemConfig>,
}
#[derive(Deserialize, Clone)]
pub struct NocmemConfig {
pub endpoint: String,
}
#[derive(Deserialize, Clone)]
@@ -71,17 +78,17 @@ fn default_api_key() -> String {
"unused".to_string()
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct TgConfig {
pub key: String,
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct AuthConfig {
pub passphrase: String,
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct SessionConfig {
pub refresh_hour: u32,
}

View File

@@ -151,27 +151,13 @@ pub struct WebhookState {
pub bot_user: String,
}
pub async fn start_webhook_server(config: &GiteaConfig, bot_user: String) {
pub fn webhook_router(config: &GiteaConfig, bot_user: String) -> axum::Router<()> {
let gitea = GiteaClient::new(config);
let state = Arc::new(WebhookState {
gitea,
bot_user,
});
let state = Arc::new(WebhookState { gitea, bot_user });
let app = axum::Router::new()
axum::Router::new()
.route("/webhook/gitea", post(handle_webhook))
.with_state(state);
let addr = format!("0.0.0.0:{}", config.webhook_port);
info!("gitea webhook server listening on {addr}");
let listener = tokio::net::TcpListener::bind(&addr)
.await
.unwrap_or_else(|e| panic!("bind {addr}: {e}"));
if let Err(e) = axum::serve(listener, app).await {
error!("webhook server error: {e}");
}
.with_state(state)
}
async fn handle_webhook(

196
src/http.rs Normal file
View File

@@ -0,0 +1,196 @@
use std::sync::Arc;
use axum::extract::{Path, State as AxumState};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{get, post};
use axum::Json;
use tokio::sync::mpsc;
use tracing::{error, info};
use crate::config::{BackendConfig, Config};
use crate::life::LifeEvent;
use crate::output::BufferOutput;
use crate::state::AppState;
use crate::stream::{build_system_prompt, run_openai_with_tools};
#[derive(Clone)]
pub struct HttpState {
pub app_state: Arc<AppState>,
pub config: Arc<Config>,
pub life_tx: mpsc::Sender<LifeEvent>,
}
pub async fn start_http_server(
config: &Config,
app_state: Arc<AppState>,
life_tx: mpsc::Sender<LifeEvent>,
) {
let port = config
.gitea
.as_ref()
.map(|g| g.webhook_port)
.unwrap_or(9880);
let config = Arc::new(config.clone());
let state = Arc::new(HttpState {
app_state,
config,
life_tx,
});
// merge gitea webhook router if configured
let gitea_router = state.config.gitea.as_ref().map(|gitea_config| {
let bot_user = std::env::var("GITEA_ADMIN_USER").unwrap_or_else(|_| "noc".into());
crate::gitea::webhook_router(gitea_config, bot_user)
});
let mut app = axum::Router::new()
.route("/api/timers", get(list_timers))
.route("/api/timers/{id}/fire", post(fire_timer))
.route("/api/chat", post(api_chat))
.route("/api/logs", get(api_logs))
.with_state(state);
if let Some(router) = gitea_router {
app = app.merge(router);
}
let addr = format!("0.0.0.0:{port}");
info!("http server listening on {addr}");
let listener = tokio::net::TcpListener::bind(&addr)
.await
.unwrap_or_else(|e| panic!("bind {addr}: {e}"));
if let Err(e) = axum::serve(listener, app).await {
error!("http server error: {e}");
}
}
async fn list_timers(AxumState(state): AxumState<Arc<HttpState>>) -> impl IntoResponse {
let timers = state.app_state.list_timers(None).await;
let items: Vec<serde_json::Value> = timers
.iter()
.map(|(id, chat_id, label, schedule, next_fire, enabled)| {
serde_json::json!({
"id": id,
"chat_id": chat_id,
"label": label,
"schedule": schedule,
"next_fire": next_fire,
"enabled": enabled,
})
})
.collect();
Json(serde_json::json!(items))
}
async fn api_chat(
AxumState(state): AxumState<Arc<HttpState>>,
Json(payload): Json<serde_json::Value>,
) -> impl IntoResponse {
let message = payload["message"].as_str().unwrap_or("").to_string();
if message.is_empty() {
return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "message required"})));
}
let BackendConfig::OpenAI {
ref endpoint,
ref model,
ref api_key,
} = state.config.backend
else {
return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "no openai backend"})));
};
let persona = state.app_state.get_config("persona").await.unwrap_or_default();
let memory_slots = state.app_state.get_memory_slots().await;
let inner_state = state.app_state.get_inner_state().await;
let system = build_system_prompt("", &persona, &memory_slots, &inner_state);
let mut messages = vec![
system,
serde_json::json!({"role": "user", "content": message}),
];
// auto recall from nocmem
if let Some(ref nocmem) = state.config.nocmem {
let recalled = crate::nocmem::recall(&nocmem.endpoint, &message).await;
if !recalled.is_empty() {
messages.push(serde_json::json!({"role": "system", "content": recalled}));
}
}
let sid = format!("api-{}", chrono::Local::now().timestamp());
let mut output = BufferOutput::new();
info!("api chat: {}", &message[..message.len().min(100)]);
match run_openai_with_tools(
endpoint, model, api_key, messages.clone(), &mut output, &state.app_state, &sid, &state.config, 0,
)
.await
{
Ok(response) => {
// async ingest
if let Some(ref nocmem) = state.config.nocmem {
if !response.is_empty() {
crate::nocmem::ingest_spawn(
nocmem.endpoint.clone(),
message.clone(),
response.clone(),
);
}
}
(StatusCode::OK, Json(serde_json::json!({"response": response})))
}
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("{e:#}")}))),
}
}
async fn api_logs(
AxumState(state): AxumState<Arc<HttpState>>,
) -> impl IntoResponse {
let db = state.app_state.db.lock().await;
let mut stmt = db
.prepare("SELECT id, session_id, status, length(request), length(response), created_at FROM api_log ORDER BY id DESC LIMIT 20")
.unwrap();
let logs: Vec<serde_json::Value> = stmt
.query_map([], |row| {
Ok(serde_json::json!({
"id": row.get::<_, i64>(0)?,
"session_id": row.get::<_, String>(1)?,
"status": row.get::<_, i64>(2)?,
"request_len": row.get::<_, i64>(3)?,
"response_len": row.get::<_, i64>(4)?,
"created_at": row.get::<_, String>(5)?,
}))
})
.unwrap()
.filter_map(|r| r.ok())
.collect();
Json(serde_json::json!(logs))
}
async fn fire_timer(
AxumState(state): AxumState<Arc<HttpState>>,
Path(id): Path<i64>,
) -> impl IntoResponse {
match state.life_tx.send(LifeEvent::FireTimer(id)).await {
Ok(_) => {
info!(timer_id = id, "timer fire requested via API");
(
StatusCode::OK,
Json(serde_json::json!({"status": "fired", "timer_id": id})),
)
}
Err(e) => {
error!(timer_id = id, "failed to send fire event: {e}");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": "life loop not responding"})),
)
}
}
}
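The new `src/http.rs` registers four routes (`/api/timers`, `/api/timers/{id}/fire`, `/api/chat`, `/api/logs`) on the gitea `webhook_port`, falling back to 9880. A stdlib-only client sketch — host and port here are assumptions for a local deployment:

```python
import json
import urllib.request

BASE = "http://127.0.0.1:9880"  # default port when no gitea webhook_port is set

def _post(path, payload=None):
    """POST JSON to the NOC http server and decode the JSON reply."""
    data = json.dumps(payload).encode() if payload is not None else b""
    req = urllib.request.Request(
        f"{BASE}{path}", data=data,
        headers={"Content-Type": "application/json"}, method="POST")
    with urllib.request.urlopen(req) as r:
        return json.load(r)

def api_chat(message):
    """POST /api/chat — one-shot chat turn; nocmem recall/ingest run server-side."""
    return _post("/api/chat", {"message": message})["response"]

def fire_timer(timer_id):
    """POST /api/timers/{id}/fire — force-fire a timer through the life loop."""
    return _post(f"/api/timers/{timer_id}/fire")
```

An empty `message` returns 400 and a missing OpenAI backend 500, per the handler above; this sketch lets `urlopen` raise `HTTPError` for those rather than mapping them.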

View File

@@ -1,117 +1,224 @@
use std::sync::Arc;
use teloxide::prelude::*;
use tokio::sync::mpsc;
use tracing::{error, info, warn};
use crate::config::{BackendConfig, Config};
use crate::output::TelegramOutput;
use crate::output::{BufferOutput, TelegramOutput};
use crate::state::AppState;
use crate::stream::run_openai_with_tools;
use crate::tools::compute_next_cron_fire;
const LIFE_LOOP_TIMEOUT_SECS: u64 = 120;
pub async fn life_loop(bot: Bot, state: Arc<AppState>, config: Arc<Config>) {
const DIARY_LABEL: &str = "写日记:回顾今天的对话和事件,在 /data/www/noc-blog/content/posts/ 下创建一篇日记(文件名格式 YYYY-MM-DD.md用 run_shell 写入内容,然后执行 cd /data/www/noc-blog && hugo && git add -A && git commit -m 'diary: DATE' && git push";
const DIARY_SCHEDULE: &str = "cron:0 55 22 * * *";
/// Events that can wake up the life loop.
pub enum LifeEvent {
/// Force-fire a specific timer by ID.
FireTimer(i64),
/// A sub-agent completed — feed result back through LLM.
AgentDone {
id: String,
chat_id: i64,
session_id: String,
task: String,
output: String,
exit_code: Option<i32>,
},
}
pub async fn life_loop(
bot: Bot,
state: Arc<AppState>,
config: Arc<Config>,
mut rx: mpsc::Receiver<LifeEvent>,
) {
info!("life loop started");
// pre-defined timers — ensure they exist on every startup
if state.ensure_timer(0, DIARY_LABEL, DIARY_SCHEDULE).await {
info!("registered predefined diary timer");
}
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
interval.tick().await;
let due = state.due_timers().await;
if due.is_empty() {
continue;
}
for (timer_id, chat_id_raw, label, schedule) in &due {
let chat_id = ChatId(*chat_id_raw);
info!(timer_id, %label, "timer fired");
let persona = state.get_config("persona").await.unwrap_or_default();
let inner = state.get_inner_state().await;
let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
let mut system_text = if persona.is_empty() {
"你是一个AI伙伴。".to_string()
} else {
persona.clone()
};
system_text.push_str(&format!(
"\n\n[当前时间] {now}\n\n[你的内心状态]\n{}",
if inner.is_empty() { "(空)" } else { &inner }
));
// emotional motivation — why she acts
system_text.push_str(
"\n\n你不是因为 timer 到了才说话。你是因为在乎 Fam所以想知道他怎么样。\
如果你觉得现在不该打扰他(太晚了、他今天很累、刚聊过),就什么都不说,回复空文本。\
主动沉默也是一种关心。\
\n可以用 update_inner_state 更新你的内心状态。\
输出格式纯文本或基础Markdown不要LaTeX或特殊Unicode。",
);
let messages = vec![
serde_json::json!({"role": "system", "content": system_text}),
serde_json::json!({"role": "user", "content": format!("[timer] {label}")}),
];
if let BackendConfig::OpenAI {
ref endpoint,
ref model,
ref api_key,
} = config.backend
{
let sid = format!("life-{chat_id_raw}");
let mut tg_output = TelegramOutput::new(bot.clone(), chat_id, true);
let result = tokio::time::timeout(
std::time::Duration::from_secs(LIFE_LOOP_TIMEOUT_SECS),
run_openai_with_tools(
endpoint, model, api_key, messages, &mut tg_output, &state, &sid,
&config, *chat_id_raw,
),
)
.await;
match result {
Ok(Ok(response)) => {
let detail = if response.is_empty() {
"(silent)".to_string()
tokio::select! {
_ = interval.tick() => {
let due = state.due_timers().await;
for (timer_id, chat_id_raw, label, schedule) in &due {
run_timer(&bot, &state, &config, *timer_id, *chat_id_raw, label, schedule).await;
}
}
Some(event) = rx.recv() => {
match event {
LifeEvent::FireTimer(id) => {
info!(timer_id = id, "timer force-fired via channel");
if let Some((timer_id, chat_id_raw, label, schedule)) = state.get_timer(id).await {
run_timer(&bot, &state, &config, timer_id, chat_id_raw, &label, &schedule).await;
} else {
response.chars().take(200).collect()
};
state.log_life("timer", &format!("{label}{detail}")).await;
if !response.is_empty() {
info!(timer_id, "life loop response ({} chars)", response.len());
warn!(timer_id = id, "force-fire: timer not found");
}
}
Ok(Err(e)) => {
state.log_life("timer_error", &format!("{label}: {e:#}")).await;
error!(timer_id, "life loop LLM error: {e:#}");
}
Err(_) => {
state.log_life("timer_timeout", label).await;
warn!(timer_id, "life loop timeout after {LIFE_LOOP_TIMEOUT_SECS}s");
}
}
}
LifeEvent::AgentDone { id, chat_id: cid, session_id, task, output, exit_code } => {
info!(agent = %id, session = %session_id, "agent done, notifying");
let preview = crate::display::truncate_at_char_boundary(&output, 3000);
let notification = format!(
"[子代理 '{id}' 完成 (exit={exit_code:?})]\n任务: {task}\n输出:\n{preview}"
);
// reschedule or delete
if schedule.starts_with("cron:") {
if let Some(next) = compute_next_cron_fire(schedule) {
state.update_timer_next_fire(*timer_id, &next).await;
info!(timer_id, next = %next, "cron rescheduled");
} else {
state.cancel_timer(*timer_id).await;
// load conversation context so LLM knows what was discussed
let conv = state.load_conv(&session_id).await;
let persona = state.get_config("persona").await.unwrap_or_default();
let memory_slots = state.get_memory_slots().await;
let inner = state.get_inner_state().await;
let system = crate::stream::build_system_prompt(
&conv.summary, &persona, &memory_slots, &inner,
);
let mut messages = vec![system];
// include recent conversation history
messages.extend(conv.messages.iter().cloned());
// append the agent completion as a new user message
messages.push(serde_json::json!({"role": "user", "content": notification}));
// auto recall from nocmem
if let Some(ref nocmem) = config.nocmem {
let recalled = crate::nocmem::recall(&nocmem.endpoint, &notification).await;
if !recalled.is_empty() {
messages.push(serde_json::json!({"role": "system", "content": recalled}));
}
}
if let BackendConfig::OpenAI { ref endpoint, ref model, ref api_key } = config.backend {
let chat_id_tg = ChatId(cid);
let sid = format!("agent-{id}");
let mut tg_output;
let mut buf_output;
let out: &mut dyn crate::output::Output = if cid == 0 {
buf_output = BufferOutput::new();
&mut buf_output
} else {
tg_output = TelegramOutput::new(bot.clone(), chat_id_tg, true);
&mut tg_output
};
let _ = run_openai_with_tools(
endpoint, model, api_key, messages, out, &state, &sid, &config, cid,
).await;
}
}
}
} else {
state.cancel_timer(*timer_id).await;
}
}
}
}
async fn run_timer(
bot: &Bot,
state: &Arc<AppState>,
config: &Arc<Config>,
timer_id: i64,
chat_id_raw: i64,
label: &str,
schedule: &str,
) {
let chat_id = ChatId(chat_id_raw);
info!(timer_id, %label, "timer fired");
let persona = state.get_config("persona").await.unwrap_or_default();
let inner = state.get_inner_state().await;
let now = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
let mut system_text = if persona.is_empty() {
"你是一个AI伙伴。".to_string()
} else {
persona.clone()
};
system_text.push_str(&format!(
"\n\n[当前时间] {now}\n\n[你的内心状态]\n{}",
if inner.is_empty() { "(空)" } else { &inner }
));
system_text.push_str(
"\n\n你不是因为 timer 到了才说话。你是因为在乎 Fam所以想知道他怎么样。\
如果你觉得现在不该打扰他(太晚了、他今天很累、刚聊过),就什么都不说,回复空文本。\
主动沉默也是一种关心。\
\n可以用 update_inner_state 更新你的内心状态。\
输出格式纯文本或基础Markdown不要LaTeX或特殊Unicode。",
);
let messages = vec![
serde_json::json!({"role": "system", "content": system_text}),
serde_json::json!({"role": "user", "content": format!("[timer] {label}")}),
];
if let BackendConfig::OpenAI {
ref endpoint,
ref model,
ref api_key,
} = config.backend
{
let sid = format!("life-{chat_id_raw}");
let mut tg_output;
let mut buf_output;
let output: &mut dyn crate::output::Output = if chat_id_raw == 0 {
buf_output = BufferOutput::new();
&mut buf_output
} else {
tg_output = TelegramOutput::new(bot.clone(), chat_id, true);
&mut tg_output
};
let result = tokio::time::timeout(
std::time::Duration::from_secs(LIFE_LOOP_TIMEOUT_SECS),
run_openai_with_tools(
endpoint, model, api_key, messages, output, state, &sid,
config, chat_id_raw,
),
)
.await;
match result {
Ok(Ok(response)) => {
let detail = if response.is_empty() {
"(silent)".to_string()
} else {
response.chars().take(200).collect()
};
state.log_life("timer", &format!("{label}{detail}")).await;
if !response.is_empty() {
info!(timer_id, "life loop response ({} chars)", response.len());
}
}
Ok(Err(e)) => {
state.log_life("timer_error", &format!("{label}: {e:#}")).await;
error!(timer_id, "life loop LLM error: {e:#}");
}
Err(_) => {
state.log_life("timer_timeout", label).await;
warn!(timer_id, "life loop timeout after {LIFE_LOOP_TIMEOUT_SECS}s");
}
}
}
// reschedule or delete
if schedule.starts_with("cron:") {
if let Some(next) = compute_next_cron_fire(schedule) {
state.update_timer_next_fire(timer_id, &next).await;
info!(timer_id, next = %next, "cron rescheduled");
} else {
state.cancel_timer(timer_id).await;
}
} else {
state.cancel_timer(timer_id).await;
}
}
/// Auto-reflection: update inner state based on recent interactions.
/// Called asynchronously after every 10 messages, does not block the chat.
pub async fn reflect(state: &AppState, config: &Config) {

View File

@@ -1,7 +1,9 @@
mod config;
mod display;
mod gitea;
mod http;
mod life;
mod nocmem;
mod output;
mod state;
mod stream;
@@ -76,10 +78,12 @@ async fn main() {
gitea.resolve_token();
}
let state_path = std::env::var("NOC_STATE")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("state.json"));
let state = Arc::new(AppState::load(state_path));
// channel: http/agents → life loop
let (life_tx, life_rx) = tokio::sync::mpsc::channel(16);
let config_path = std::env::var("NOC_CONFIG").unwrap_or_else(|_| "config.yaml".into());
let db_dir = Path::new(&config_path).parent().unwrap_or(Path::new("."));
let state = Arc::new(AppState::load(db_dir, life_tx.clone()));
let _ = std::fs::create_dir_all(incoming_dir());
@@ -93,15 +97,14 @@ async fn main() {
let config = Arc::new(config);
// start life loop
tokio::spawn(life::life_loop(bot.clone(), state.clone(), config.clone()));
tokio::spawn(life::life_loop(bot.clone(), state.clone(), config.clone(), life_rx));
// start gitea webhook server
if let Some(gitea_config) = &config.gitea {
let gc = gitea_config.clone();
// Use the gitea admin username as the bot user for @mention detection
let bot_user = std::env::var("GITEA_ADMIN_USER").unwrap_or_else(|_| "noc".into());
// start http server (API + gitea webhook)
{
let http_config = config.as_ref().clone();
let srv_state = state.clone();
tokio::spawn(async move {
gitea::start_webhook_server(&gc, bot_user).await;
http::start_http_server(&http_config, srv_state, life_tx).await;
});
}
@@ -170,20 +173,10 @@ async fn handle(
let is_private = msg.chat.is_private();
let text = msg.text().or(msg.caption()).unwrap_or("").to_string();
let raw_id = chat_id.0;
let date = session_date(config.session.refresh_hour);
let is_authed = {
let p = state.persist.read().await;
p.authed.get(&raw_id) == Some(&date)
};
if !is_authed {
if !state.is_authed(raw_id).await {
if text.trim() == config.auth.passphrase {
{
let mut p = state.persist.write().await;
p.authed.insert(raw_id, date);
}
state.save().await;
state.set_authed(raw_id).await;
bot.send_message(chat_id, "authenticated").await?;
info!(chat = raw_id, "authed");
} else {
@@ -396,6 +389,14 @@ async fn handle_inner(
let user_content = build_user_content(&prompt, &scratch, &uploaded);
api_messages.push(serde_json::json!({"role": "user", "content": user_content}));
// auto recall from nocmem
if let Some(ref nocmem) = config.nocmem {
let recalled = nocmem::recall(&nocmem.endpoint, &prompt).await;
if !recalled.is_empty() {
api_messages.push(serde_json::json!({"role": "system", "content": recalled}));
}
}
let mut tg_output = TelegramOutput::new(bot.clone(), chat_id, is_private);
match run_openai_with_tools(
@@ -407,6 +408,15 @@ async fn handle_inner(
state.push_message(&sid, "user", &prompt).await;
if !response.is_empty() {
state.push_message(&sid, "assistant", &response).await;
// async ingest to nocmem (fire-and-forget)
if let Some(ref nocmem) = config.nocmem {
nocmem::ingest_spawn(
nocmem.endpoint.clone(),
prompt.clone(),
response.clone(),
);
}
}
// sliding window
@@ -523,7 +533,8 @@ async fn transcribe_audio(whisper_url: &str, file_path: &Path) -> Result<String>
.mime_str("audio/ogg")?;
let form = reqwest::multipart::Form::new()
.part("file", part)
.text("model", "base");
.text("model", "large-v3")
.text("language", "zh");
let resp = client.post(&url).multipart(form).send().await?.error_for_status()?;
let json: serde_json::Value = resp.json().await?;
Ok(json["text"].as_str().unwrap_or("").to_string())
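The nocmem wiring in this file is gated on an optional `nocmem` section in config.yaml (per the commit message, `endpoint` is the only field). A minimal fragment might look like the following; the port is illustrative, not taken from the repo:

```yaml
# Optional: enable auto memory recall/ingest via the nocmem service.
# If this section is absent, recall and ingest are skipped entirely.
nocmem:
  endpoint: "http://127.0.0.1:8077"
```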

src/nocmem.rs (new file, 69 lines)

@@ -0,0 +1,69 @@
//! nocmem client — auto-recall and async ingest via HTTP.
use tracing::{info, warn};
/// Recall relevant memories for the given text.
/// Returns formatted memory string, or empty if none found / error / not configured.
pub async fn recall(endpoint: &str, text: &str) -> String {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_millis(500))
.build()
.unwrap();
let url = format!("{}/recall", endpoint.trim_end_matches('/'));
match client
.post(&url)
.json(&serde_json::json!({"text": text, "top_k": 3, "hops": 2}))
.send()
.await
{
Ok(resp) => {
if let Ok(json) = resp.json::<serde_json::Value>().await {
let count = json["count"].as_i64().unwrap_or(0);
let memories = json["memories"].as_str().unwrap_or("");
if count > 0 && !memories.is_empty() {
let latency = json["latency_ms"].as_f64().unwrap_or(0.0);
info!("nocmem recall: {count} memories, {latency:.1}ms");
return memories.to_string();
}
}
}
Err(e) => {
warn!("nocmem recall failed: {e:#}");
}
}
String::new()
}
/// Fire-and-forget ingest of a conversation turn.
pub fn ingest_spawn(endpoint: String, user_msg: String, assistant_msg: String) {
tokio::spawn(async move {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(120))
.build()
.unwrap();
let url = format!("{}/ingest", endpoint.trim_end_matches('/'));
match client
.post(&url)
.json(&serde_json::json!({
"user_msg": user_msg,
"assistant_msg": assistant_msg,
}))
.send()
.await
{
Ok(resp) => {
if let Ok(json) = resp.json::<serde_json::Value>().await {
let stored = json["stored"].as_i64().unwrap_or(0);
if stored > 0 {
info!("nocmem ingest: stored {stored} memories");
}
}
}
Err(e) => {
warn!("nocmem ingest failed: {e:#}");
}
}
});
}
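The HTTP contract this client assumes can be pinned down offline. The request and response field names below are taken from the Rust client above (`text`/`top_k`/`hops`, `count`/`memories`/`latency_ms`, `user_msg`/`assistant_msg`, `stored`); the values and memory contents are purely illustrative — the actual service lives in `mem/` and is not shown here:

```python
import json

# Body recall() POSTs to {endpoint}/recall
recall_req = {"text": "what timezone am I in?", "top_k": 3, "hops": 2}

# Response shape the client parses: count, memories (pre-formatted string), latency_ms
recall_resp = json.loads(
    '{"count": 2, "memories": "- timezone: UTC+1\\n- prefers metric units", "latency_ms": 12.5}'
)

count = recall_resp.get("count", 0)
memories = recall_resp.get("memories", "")
# Mirrors the guard in recall(): only inject when something was actually found
injected = memories if count > 0 and memories else ""

# Body ingest_spawn() POSTs to {endpoint}/ingest; the response carries "stored"
ingest_req = {"user_msg": "remember my timezone is UTC+1", "assistant_msg": "Noted."}
```

Note that `memories` is a single pre-formatted string, not a list — the Rust side injects it verbatim as one system message.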


@@ -18,7 +18,7 @@ pub trait Output: Send + Sync {
async fn finalize(&mut self, text: &str) -> Result<()>;
/// Send a status/notification line (e.g. "[tool: bash] running...")
async fn status(&self, text: &str) -> Result<()>;
async fn status(&mut self, text: &str) -> Result<()>;
/// Send a file. Returns Ok(true) if sent, Ok(false) if not supported.
async fn send_file(&self, path: &Path, caption: &str) -> Result<bool>;
@@ -121,18 +121,32 @@ impl Output for TelegramOutput {
Ok(())
}
async fn status(&self, text: &str) -> Result<()> {
let _ = self.bot.send_message(self.chat_id, text).await;
async fn status(&mut self, _text: &str) -> Result<()> {
Ok(())
}
async fn send_file(&self, path: &Path, caption: &str) -> Result<bool> {
let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
let input_file = InputFile::file(path);
let mut req = self.bot.send_document(self.chat_id, input_file);
if !caption.is_empty() {
req = req.caption(caption);
match ext {
"ogg" | "oga" => {
self.bot.send_voice(self.chat_id, input_file).await?;
}
"wav" | "mp3" | "m4a" | "flac" => {
let mut req = self.bot.send_audio(self.chat_id, input_file);
if !caption.is_empty() {
req = req.caption(caption);
}
req.await?;
}
_ => {
let mut req = self.bot.send_document(self.chat_id, input_file);
if !caption.is_empty() {
req = req.caption(caption);
}
req.await?;
}
}
req.await?;
Ok(true)
}
}
@@ -162,7 +176,7 @@ impl Output for GiteaOutput {
.await
}
async fn status(&self, _text: &str) -> Result<()> {
async fn status(&mut self, _text: &str) -> Result<()> {
// No status updates for Gitea
Ok(())
}
@@ -201,7 +215,7 @@ impl Output for BufferOutput {
Ok(())
}
async fn status(&self, _text: &str) -> Result<()> {
async fn status(&mut self, _text: &str) -> Result<()> {
Ok(())
}


@@ -1,24 +1,14 @@
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::Arc;
use anyhow::Result;
use chrono::NaiveDate;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{error, info};
use tracing::info;
use crate::tools::SubAgent;
// ── persistent state ────────────────────────────────────────────────
#[derive(Serialize, Deserialize, Default)]
pub struct Persistent {
pub authed: HashMap<i64, NaiveDate>,
pub known_sessions: HashSet<String>,
}
#[derive(Serialize, Deserialize, Clone, Default)]
#[derive(Clone, Default)]
pub struct ConversationState {
pub summary: String,
pub messages: Vec<serde_json::Value>,
@@ -29,21 +19,15 @@ pub const MAX_WINDOW: usize = 100;
pub const SLIDE_SIZE: usize = 50;
pub struct AppState {
pub persist: RwLock<Persistent>,
pub state_path: PathBuf,
pub db: tokio::sync::Mutex<rusqlite::Connection>,
pub agents: RwLock<HashMap<String, Arc<SubAgent>>>,
authed_cache: RwLock<HashSet<i64>>,
pub life_tx: tokio::sync::mpsc::Sender<crate::life::LifeEvent>,
}
impl AppState {
pub fn load(path: PathBuf) -> Self {
let persist = std::fs::read_to_string(&path)
.ok()
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_default();
info!("loaded state from {}", path.display());
let db_path = path.parent().unwrap_or(Path::new(".")).join("noc.db");
pub fn load(db_dir: &Path, life_tx: tokio::sync::mpsc::Sender<crate::life::LifeEvent>) -> Self {
let db_path = db_dir.join("noc.db");
let conn = rusqlite::Connection::open(&db_path)
.unwrap_or_else(|e| panic!("open {}: {e}", db_path.display()));
conn.execute_batch(
@@ -97,6 +81,18 @@ impl AppState {
content TEXT NOT NULL DEFAULT ''
);
INSERT OR IGNORE INTO inner_state (id, content) VALUES (1, '');
CREATE TABLE IF NOT EXISTS authed_chats (
chat_id INTEGER PRIMARY KEY,
authed_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
CREATE TABLE IF NOT EXISTS api_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL DEFAULT '',
request TEXT NOT NULL,
response TEXT NOT NULL DEFAULT '',
status INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
CREATE TABLE IF NOT EXISTS life_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event TEXT NOT NULL,
@@ -119,19 +115,10 @@ impl AppState {
info!("opened db {}", db_path.display());
Self {
persist: RwLock::new(persist),
state_path: path,
db: tokio::sync::Mutex::new(conn),
agents: RwLock::new(HashMap::new()),
}
}
pub async fn save(&self) {
let data = self.persist.read().await;
if let Ok(json) = serde_json::to_string_pretty(&*data) {
if let Err(e) = std::fs::write(&self.state_path, json) {
error!("save state: {e}");
}
authed_cache: RwLock::new(HashSet::new()),
life_tx,
}
}
@@ -267,6 +254,44 @@ impl AppState {
);
}
pub async fn is_authed(&self, chat_id: i64) -> bool {
// check cache first
if self.authed_cache.read().await.contains(&chat_id) {
return true;
}
// cache miss → check DB
let db = self.db.lock().await;
let found: bool = db
.query_row(
"SELECT COUNT(*) > 0 FROM authed_chats WHERE chat_id = ?1",
rusqlite::params![chat_id],
|row| row.get(0),
)
.unwrap_or(false);
drop(db);
if found {
self.authed_cache.write().await.insert(chat_id);
}
found
}
pub async fn set_authed(&self, chat_id: i64) {
self.authed_cache.write().await.insert(chat_id);
let db = self.db.lock().await;
let _ = db.execute(
"INSERT OR IGNORE INTO authed_chats (chat_id) VALUES (?1)",
rusqlite::params![chat_id],
);
}
pub async fn log_api(&self, session_id: &str, request: &str, response: &str, status: u16) {
let db = self.db.lock().await;
let _ = db.execute(
"INSERT INTO api_log (session_id, request, response, status) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![session_id, request, response, status],
);
}
pub async fn log_life(&self, event: &str, detail: &str) {
let db = self.db.lock().await;
let _ = db.execute(
@@ -275,6 +300,29 @@ impl AppState {
);
}
/// Ensure a timer with the given label exists. If it already exists, do nothing.
/// Returns true if a new timer was created.
pub async fn ensure_timer(&self, chat_id: i64, label: &str, schedule: &str) -> bool {
let db = self.db.lock().await;
let exists: bool = db
.query_row(
"SELECT COUNT(*) > 0 FROM timers WHERE label = ?1 AND enabled = 1",
rusqlite::params![label],
|row| row.get(0),
)
.unwrap_or(false);
if exists {
return false;
}
drop(db);
if let Some(next) = crate::tools::compute_next_cron_fire(schedule) {
self.add_timer(chat_id, label, schedule, &next).await;
true
} else {
false
}
}
pub async fn add_timer(&self, chat_id: i64, label: &str, schedule: &str, next_fire: &str) -> i64 {
let db = self.db.lock().await;
db.execute(
@@ -285,6 +333,16 @@ impl AppState {
db.last_insert_rowid()
}
pub async fn get_timer(&self, id: i64) -> Option<(i64, i64, String, String)> {
let db = self.db.lock().await;
db.query_row(
"SELECT id, chat_id, label, schedule FROM timers WHERE id = ?1 AND enabled = 1",
rusqlite::params![id],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
)
.ok()
}
pub async fn list_timers(&self, chat_id: Option<i64>) -> Vec<(i64, i64, String, String, String, bool)> {
let db = self.db.lock().await;
let (sql, params): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) = match chat_id {


@@ -81,6 +81,9 @@ pub async fn run_openai_with_tools(
if !resp_raw.status().is_success() {
let status = resp_raw.status();
let body_text = resp_raw.text().await.unwrap_or_default();
// log failed API call
let req_json = serde_json::to_string(&body).unwrap_or_default();
state.log_api(sid, &req_json, &body_text, status.as_u16()).await;
for (i, m) in messages.iter().enumerate() {
let role = m["role"].as_str().unwrap_or("?");
let content_len = m["content"].as_str().map(|s| s.len()).unwrap_or(0);
@@ -190,14 +193,25 @@ pub async fn run_openai_with_tools(
for tc in &tool_calls {
info!(tool = %tc.name, "executing tool call");
let _ = output
.status(&format!("[{}({})]", tc.name, truncate_at_char_boundary(&tc.arguments, 100)))
.await;
let result =
execute_tool(&tc.name, &tc.arguments, state, output, sid, config, chat_id)
.await;
// send tool call details as a .md file named after the tool
let md = format!(
"## {}\n\n### Arguments\n```json\n{}\n```\n\n### Result ({} bytes)\n```\n{}\n```\n",
tc.name,
&tc.arguments,
result.len(),
truncate_at_char_boundary(&result, 4000),
);
let tmp = format!("/tmp/{}.md", tc.name);
if std::fs::write(&tmp, &md).is_ok() {
let _ = output.send_file(std::path::Path::new(&tmp), "").await;
let _ = std::fs::remove_file(&tmp);
}
messages.push(serde_json::json!({
"role": "tool",
"tool_call_id": tc.id,
@@ -233,7 +247,8 @@ pub fn build_system_prompt(summary: &str, persona: &str, memory_slots: &[(i32, S
当需要搜索信息(如网页搜索、资料查找、技术调研等)时,使用 spawn_agent 启动一个子代理来完成搜索任务,\
子代理可以使用浏览器和搜索引擎,搜索完成后你会收到结果通知。\
输出格式使用纯文本或基础Markdown加粗、列表、代码块\
不要使用LaTeX公式$...$、特殊Unicode符号→←↔或HTML标签Telegram无法渲染这些。",
不要使用LaTeX公式$...$、特殊Unicode符号→←↔或HTML标签Telegram无法渲染这些。\
不要在回复开头加时间戳——用户消息前的时间戳是系统自动添加的,不需要你模仿。",
);
if !memory_slots.is_empty() {


@@ -233,6 +233,21 @@ pub fn discover_tools() -> serde_json::Value {
}
}
}),
serde_json::json!({
"type": "function",
"function": {
"name": "write_file",
"description": "将内容写入服务器上的文件。如果文件已存在会被覆盖,目录不存在会自动创建。",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件的绝对路径"},
"content": {"type": "string", "description": "要写入的完整内容"}
},
"required": ["path", "content"]
}
}
}),
serde_json::json!({
"type": "function",
"function": {
@@ -308,7 +323,7 @@ pub async fn execute_tool(
"spawn_agent" => {
let id = args["id"].as_str().unwrap_or("agent");
let task = args["task"].as_str().unwrap_or("");
spawn_agent(id, task, state, output, sid, config).await
spawn_agent(id, task, state, output, sid, config, chat_id).await
}
"agent_status" => {
let id = args["id"].as_str().unwrap_or("");
@@ -507,6 +522,25 @@ pub async fn execute_tool(
Err(_) => format!("timeout after {timeout_secs}s"),
}
}
"write_file" => {
let path_str = args["path"].as_str().unwrap_or("");
let content = args["content"].as_str().unwrap_or("");
if path_str.is_empty() {
return "Error: path is required".to_string();
}
let path = Path::new(path_str);
if let Some(parent) = path.parent() {
if !parent.exists() {
if let Err(e) = std::fs::create_dir_all(parent) {
return format!("Failed to create directory: {e}");
}
}
}
match std::fs::write(path, content) {
Ok(_) => format!("Written {} bytes to {path_str}", content.len()),
Err(e) => format!("Failed to write {path_str}: {e}"),
}
}
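The write_file arm above follows a common pattern: validate the path, create missing parent directories, then overwrite. A small sketch of the same semantics (Python for illustration; the function name and return strings merely echo the Rust tool and are not part of the codebase):

```python
import os

def write_file(path: str, content: str) -> str:
    """Mirror of the write_file tool: create parent dirs, overwrite the file."""
    if not path:
        return "Error: path is required"
    parent = os.path.dirname(path)
    if parent and not os.path.isdir(parent):
        try:
            os.makedirs(parent, exist_ok=True)
        except OSError as e:
            return f"Failed to create directory: {e}"
    try:
        with open(path, "w") as f:
            f.write(content)
        # Report bytes written, like the Rust content.len() on the UTF-8 string
        return f"Wrote {len(content.encode())} bytes to {path}"
    except OSError as e:
        return f"Failed to write {path}: {e}"
```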
"call_gitea_api" => {
let method = args["method"].as_str().unwrap_or("GET").to_uppercase();
let path = args["path"].as_str().unwrap_or("").trim_start_matches('/');
@@ -549,6 +583,7 @@ pub async fn execute_tool(
}
"gen_voice" => {
let text = args["text"].as_str().unwrap_or("");
info!("gen_voice text={:?} args={}", text, truncate_at_char_boundary(arguments, 200));
if text.is_empty() {
return "Error: text is required".to_string();
}
@@ -577,9 +612,13 @@ pub async fn execute_tool(
Ok(Ok(out)) => {
let stderr = String::from_utf8_lossy(&out.stderr);
let stdout = String::from_utf8_lossy(&out.stdout);
warn!("gen_voice failed (exit={}): stdout={stdout} stderr={stderr}", out.status.code().unwrap_or(-1));
format!("gen_voice failed: {stdout} {stderr}")
}
Ok(Err(e)) => format!("gen_voice exec error: {e}"),
Ok(Err(e)) => {
warn!("gen_voice exec error: {e}");
format!("gen_voice exec error: {e}")
}
Err(_) => "gen_voice timeout (120s)".to_string(),
}
}
@@ -591,9 +630,10 @@ pub async fn spawn_agent(
id: &str,
task: &str,
state: &Arc<AppState>,
output: &dyn Output,
_sid: &str,
output: &mut dyn Output,
sid: &str,
_config: &Arc<Config>,
chat_id: i64,
) -> String {
// check if already exists
if state.agents.read().await.contains_key(id) {
@@ -625,11 +665,14 @@ pub async fn spawn_agent(
state.agents.write().await.insert(id.to_string(), agent);
// background task: collect output
// background task: collect output, then send event to life loop
let out = agent_output.clone();
let done = completed.clone();
let ecode = exit_code.clone();
let id_c = id.to_string();
let task_c = task.to_string();
let life_tx = state.life_tx.clone();
let sid_c = sid.to_string();
tokio::spawn(async move {
let stdout = child.stdout.take();
@@ -647,6 +690,16 @@ pub async fn spawn_agent(
done.store(true, Ordering::SeqCst);
info!(agent = %id_c, "agent completed, exit={code:?}");
let output_text = out.read().await.clone();
let _ = life_tx.send(crate::life::LifeEvent::AgentDone {
id: id_c,
chat_id,
session_id: sid_c,
task: task_c,
output: output_text,
exit_code: code,
}).await;
});
let _ = output.status(&format!("Agent '{id}' spawned (pid={pid:?})")).await;


@@ -19,7 +19,7 @@ import sys
import requests
APP_ID = "cli_a7f042e93d385013"
APP_SECRET = "ht4FCjQ8JJ65ZPUWlff6ldFBmaP0mxqY"
APP_SECRET = "6V3t5bFK4vRKsEG3VD6sQdAu2rmFEr2S"
APP_TOKEN = "SSoGbmGFoazJkUs7bbfcaSG8n7f"
TABLE_ID = "tblIA2biceDpvr35"
BASE_URL = "https://open.feishu.cn/open-apis"