add nocmem: auto memory recall + ingest via NuoNuo hippocampal network

- nocmem Python service (mem/): FastAPI wrapper around NuoNuo's
  Hopfield-Hebbian memory, with /recall, /ingest, /store, /stats endpoints
- NOC integration: auto recall after user message (injected as system msg),
  async ingest after LLM response (fire-and-forget)
- Recall: cosine pre-filter (threshold 0.35) + Hopfield attention (β=32),
  top_k=3, KV-cache friendly (appended after user msg, not in system prompt)
- Ingest: LLM extraction + paraphrase augmentation, heuristic fallback
- Wired into main.rs, life.rs (agent done), http.rs (api chat)
- Config: optional `nocmem.endpoint` in config.yaml
- Includes benchmarks: LongMemEval (R@5=94.0%), efficiency, noise vs scale
- Design doc: doc/nocmem.md
This commit is contained in:
Fam Zheng
2026-04-11 12:24:48 +01:00
parent 688387dac3
commit 7000ccda0f
17 changed files with 4164 additions and 3 deletions

1
.gitignore vendored
View File

@@ -8,3 +8,4 @@ target/
data/
noc.service
tools/manage_todo
mem/benchmarks/longmemeval.json

277
doc/nocmem.md Normal file
View File

@@ -0,0 +1,277 @@
# nocmem — NOC 自动记忆系统
## 动机
NOC 现有记忆100 个文本槽位200 字符/槽)+ 滑动窗口摘要。全部塞在 system prompt 里,每次对话都带着。
问题:
- 没有语义检索,无关记忆浪费 token
- 槽位容量有限,不可扩展
- 没有联想能力A 提到 → 想起 B → 引出 C
nocmem 用 NuoNuo 的 Hopfield-Hebbian 混合记忆网络替代朴素文本槽位,实现**自动召回**和**自动存储**。
## 核心技术
### NuoNuo Hippocampal Memory
生物启发的双层记忆架构(详见 `../nuonuo/doc/architecture.md`
**Layer 1 — Hopfield单跳噪声容忍**
存储 (cue, target) embedding 对。召回时两阶段:
1. **NN 预过滤**cosine similarity 找 top-K 候选K=20
2. **Hopfield settle**:β-scaled softmax attention 迭代收敛3 步)
关键特性:**paraphrase 容忍** — 用户换一种说法问同样的事,照样能召回。通过存储 cue variants同一条记忆的多种表述实现attention 按 memory_id 聚合。
**Layer 2 — Hebbian多跳联想链**
WTA pattern separation384D → 16384D 稀疏码k=50稀疏度 0.3%+ 外积权重矩阵 W。
Hopfield 找到起点后Hebbian 通过 `W @ code` 沿关联链前进A → B → C。
这是传统 RAG 做不到的——向量搜索只能找"相似"Hebbian 能找"相关但不相似"的东西。
**性能指标**
| 指标 | 数值 |
|------|------|
| Paraphrase recall+augmentation, 2K bg | 95-100% |
| Multi-hop3 hops, 500 bg | 100% |
| Scale20K memories, no augmentation | 80% |
| Recall 延迟 @ 20K | 4ms |
| VRAM | ~1 GB |
### Embedding
使用 `all-MiniLM-L6-v2`384 维CPU/GPU 均可。选择理由:
- NuoNuo 实验P1验证**gap metric相关与不相关的分数差比绝对相似度更重要**
- MiniLM 在 gap metric 上优于 BGE-large 等更大模型
- 推理快GPU ~1msCPU ~10ms per query
### 记忆提取
对话结束后,用 LLM 从 (user_msg, assistant_msg) 中提取 (cue, target, importance) 三元组:
- **cue**:什么情况下应该回忆起这条记忆(触发短语)
- **target**:记忆内容本身
- **importance**0-1 重要度评分
LLM 不可用时回退到 heuristic问答模式检测 + 技术关键词匹配)。
提取后LLM 为每个 cue 生成 3 个 paraphrase作为 cue_variants 存入,提升召回鲁棒性。
## 架构
```
┌─────────────┐
│ Telegram │
│ User │
└──────┬───────┘
│ message
┌─────────────┐
│ NOC │
│ (Rust) │
│ │
│ 1. 收到 user │
│ message │
│ │
│ 2. HTTP POST ├──────────────────┐
│ /recall │ │
│ │ ▼
│ │ ┌─────────────────┐
│ │ │ nocmem │
│ │ │ (Python) │
│ │ │ │
│ │ │ embed(query) │
│ │◄────────┤ hippocampus │
│ recalled │ │ .recall() │
│ memories │ │ format results │
│ │ └─────────────────┘
│ 3. 构建 messages:
│ [...history,
│ user_msg,
│ {role:system,
│ recalled memories}]
│ │
│ 4. 调 LLM │
│ (stream) │
│ │
│ 5. 得到 │
│ response │
│ │
│ 6. 异步 POST ├──────────────────┐
│ /ingest │ │
│ │ ▼
│ │ ┌─────────────────┐
│ │ │ nocmem │
│ │ │ │
│ │ │ LLM extract │
│ │ │ embed + store │
│ │ │ save checkpoint │
│ │ └─────────────────┘
│ 7. 回复用户 │
└──────────────┘
```
## 消息注入策略
**关键设计**recalled memories 注入在 user message **之后**,作为独立的 system message。
```json
[
{"role": "system", "content": "persona + memory_slots + ..."}, // 不变
{"role": "user", "content": "历史消息1"}, // 历史
{"role": "assistant", "content": "历史回复1"},
...
{"role": "user", "content": "当前用户消息"}, // 当前轮
{"role": "system", "content": "[相关记忆]\n- 记忆1\n- 记忆2"} // ← nocmem 注入
]
```
为什么不放 system prompt 里?
**KV cache 友好**。System prompt 是所有对话共享的前缀,如果每条消息都改 system prompt 的内容(注入不同的 recalled memories整个 KV cache 前缀失效,前面几千 token 全部重算。
放在 user message 之后前缀system prompt + 历史消息 + 当前 user message保持稳定只有尾部的 recalled memories 是变化的KV cache 命中率最大化。
**临时性**。Recalled memories 不持久化到对话历史数据库。每轮对话独立召回,下一轮消息进来时重新召回当时相关的记忆。这避免了历史消息中堆积大量冗余的记忆注入。
## HTTP API
### POST /recall
请求:
```json
{"text": "数据库最近是不是很慢"}
```
响应:
```json
{
"memories": "[相关记忆]\n- 上次数据库慢是因为缺少索引 (hop=1)\n- PostgreSQL 跑在 5432 端口 (hop=2)",
"count": 2
}
```
- 如果没有相关记忆,返回 `{"memories": "", "count": 0}`
- NOC 检查 count > 0 才注入,避免空消息
### POST /ingest
请求:
```json
{
"user_msg": "帮我看看数据库为什么慢",
"assistant_msg": "检查了一下,是 users 表缺少 email 字段的索引..."
}
```
响应:
```json
{"stored": 2}
```
- fire-and-forgetNOC 不等响应
- 内部流程LLM 提取 → embed → generate paraphrases → store → save checkpoint
### GET /stats
```json
{
"num_memories": 1234,
"num_cue_entries": 4500,
"augmentation_ratio": 3.6,
"vram_mb": 1024,
"embedding_model": "all-MiniLM-L6-v2"
}
```
## NOC 侧改动
### config.yaml
```yaml
nocmem:
endpoint: "http://127.0.0.1:9820"
```
### Rust 改动(最小化)
**`config.rs`**:加一个可选字段
```rust
#[serde(default)]
pub nocmem: Option<NocmemConfig>,
#[derive(Deserialize, Clone)]
pub struct NocmemConfig {
pub endpoint: String,
}
```
**`main.rs`**(主消息处理路径):
`api_messages.push(user_msg)` 之后、`run_openai_with_tools` 之前:
```rust
// auto recall from nocmem
if let Some(ref nocmem) = config.nocmem {
if let Ok(recalled) = nocmem_recall(&nocmem.endpoint, &prompt).await {
if !recalled.is_empty() {
api_messages.push(serde_json::json!({
"role": "system",
"content": recalled
}));
}
}
}
```
在 LLM 回复之后(`push_message` 之后):
```rust
// async ingest to nocmem (fire-and-forget)
if let Some(ref nocmem) = config.nocmem {
let endpoint = nocmem.endpoint.clone();
let u = prompt.clone();
let a = response.clone();
tokio::spawn(async move {
let _ = nocmem_ingest(&endpoint, &u, &a).await;
});
}
```
`nocmem_recall``nocmem_ingest` 是两个简单的 HTTP 调用函数。recall 设 500ms 超时(失败就跳过,不影响正常对话)。
### 同步覆盖的调用点
| 位置 | 场景 | recall | ingest |
|------|------|--------|--------|
| `main.rs` handle_message | 用户聊天 | ✅ | ✅ |
| `life.rs` AgentDone | 子代理完成通知 | ✅ | ❌ |
| `life.rs` run_timer | 定时器触发 | ❌ | ❌ |
| `http.rs` api_chat | HTTP API 聊天 | ✅ | ✅ |
| `gitea.rs` | Gitea webhook | ❌ | ❌ |
## 部署
nocmem 作为独立 Python 服务运行:
```bash
cd /data/src/noc/mem
uv run uvicorn server:app --host 127.0.0.1 --port 9820
```
可配 systemd 管理。checkpoint 持久化到 `./data/hippocampus.pt`(相对于 mem 目录)。
## 未来方向
- **重要度衰减**:长期不被召回的记忆自动降权
- **矛盾检测**:新记忆与旧记忆冲突时自动替换
- **记忆整合sleep consolidation**:定期合并碎片记忆为更紧凑的表示
- **和 memory slot 融合**:逐步迁移 slot 内容到 nocmem最终淘汰 slot 系统

View File

@@ -0,0 +1,345 @@
"""Efficiency benchmark for nocmem vs ChromaDB baseline.
Measures: storage size, memory usage, query latency, ingest throughput
at various scales (100, 1K, 5K, 10K, 20K memories).
Usage:
uv run python benchmarks/efficiency_bench.py
"""
import gc
import os
import json
import shutil
import tempfile
import time
import torch
import psutil
from sentence_transformers import SentenceTransformer
from nuonuo.hippocampus import HippocampalMemory
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
EMBED_MODEL = "all-MiniLM-L6-v2"
EMBED_DIM = 384
DATA_FILE = "benchmarks/longmemeval.json"
# ── helpers ─────────────────────────────────────────────────────────
def get_process_mem_mb():
return psutil.Process(os.getpid()).memory_info().rss / 1024**2
def get_gpu_mem_mb():
if DEVICE != "cuda":
return 0.0
return torch.cuda.memory_allocated() / 1024**2
def file_size_mb(path):
if os.path.exists(path):
return os.path.getsize(path) / 1024**2
return 0.0
def dir_size_mb(path):
total = 0
for dirpath, _, filenames in os.walk(path):
for f in filenames:
total += os.path.getsize(os.path.join(dirpath, f))
return total / 1024**2
# ── extract chunks from LongMemEval ────────────────────────────────
def load_chunks(max_chunks=25000):
"""Extract turn-level chunks from LongMemEval data."""
with open(DATA_FILE) as f:
data = json.load(f)
chunks = []
seen = set()
for item in data:
for sid, sess in zip(item["haystack_session_ids"], item["haystack_sessions"]):
for i in range(0, len(sess) - 1, 2):
key = (sid, i)
if key in seen:
continue
seen.add(key)
user = sess[i]["content"]
asst = sess[i + 1]["content"] if i + 1 < len(sess) else ""
text = f"{user}\n{asst}"[:1000]
chunks.append(text)
if len(chunks) >= max_chunks:
return chunks
return chunks
# ── nocmem benchmark ────────────────────────────────────────────────
def bench_nocmem(encoder, chunks, n, query_texts):
"""Benchmark nocmem at scale n."""
torch.cuda.empty_cache()
gc.collect()
subset = chunks[:n]
gpu_before = get_gpu_mem_mb()
ram_before = get_process_mem_mb()
# batch embed
t0 = time.monotonic()
embeddings = encoder.encode(
subset, convert_to_tensor=True, normalize_embeddings=True,
device=DEVICE, batch_size=256, show_progress_bar=False,
)
embed_time = time.monotonic() - t0
# store
hip = HippocampalMemory(embed_dim=EMBED_DIM, device=DEVICE)
t1 = time.monotonic()
for i in range(n):
hip.store(embeddings[i], embeddings[i], metadata={"id": i})
store_time = time.monotonic() - t1
gpu_after = get_gpu_mem_mb()
ram_after = get_process_mem_mb()
# save to measure file size
tmp = tempfile.mktemp(suffix=".pt")
hip.save(tmp)
disk_mb = file_size_mb(tmp)
os.unlink(tmp)
# query latency — multiple queries, measure p50/p99
query_embs = encoder.encode(
query_texts, convert_to_tensor=True, normalize_embeddings=True,
device=DEVICE, show_progress_bar=False,
)
latencies = []
for qe in query_embs:
t = time.monotonic()
hip.recall(qe, top_k=5)
latencies.append((time.monotonic() - t) * 1000)
latencies.sort()
p50 = latencies[len(latencies) // 2]
p99 = latencies[int(len(latencies) * 0.99)]
avg = sum(latencies) / len(latencies)
# cleanup
del hip, embeddings
torch.cuda.empty_cache()
return {
"n": n,
"embed_time_s": embed_time,
"store_time_s": store_time,
"ingest_rate": n / (embed_time + store_time), # memories/sec
"disk_mb": disk_mb,
"gpu_delta_mb": gpu_after - gpu_before,
"ram_delta_mb": ram_after - ram_before,
"latency_avg_ms": avg,
"latency_p50_ms": p50,
"latency_p99_ms": p99,
}
# ── chromadb benchmark ──────────────────────────────────────────────
def bench_chromadb(encoder, chunks, n, query_texts):
"""Benchmark ChromaDB (MemPalace's backend) at scale n."""
import chromadb
subset = chunks[:n]
ram_before = get_process_mem_mb()
tmpdir = tempfile.mkdtemp()
client = chromadb.PersistentClient(path=tmpdir)
collection = client.create_collection(
name="bench",
metadata={"hnsw:space": "cosine"},
)
# embed
t0 = time.monotonic()
embeddings_np = encoder.encode(
subset, normalize_embeddings=True,
batch_size=256, show_progress_bar=False,
)
embed_time = time.monotonic() - t0
# store — chromadb takes numpy/list
t1 = time.monotonic()
batch = 5000
for start in range(0, n, batch):
end = min(start + batch, n)
collection.add(
ids=[str(i) for i in range(start, end)],
embeddings=embeddings_np[start:end].tolist(),
documents=subset[start:end],
)
store_time = time.monotonic() - t1
ram_after = get_process_mem_mb()
disk_mb = dir_size_mb(tmpdir)
# query latency
query_np = encoder.encode(
query_texts, normalize_embeddings=True, show_progress_bar=False,
)
latencies = []
for qe in query_np:
t = time.monotonic()
collection.query(query_embeddings=[qe.tolist()], n_results=5)
latencies.append((time.monotonic() - t) * 1000)
latencies.sort()
p50 = latencies[len(latencies) // 2]
p99 = latencies[int(len(latencies) * 0.99)]
avg = sum(latencies) / len(latencies)
# cleanup
del client, collection
shutil.rmtree(tmpdir)
return {
"n": n,
"embed_time_s": embed_time,
"store_time_s": store_time,
"ingest_rate": n / (embed_time + store_time),
"disk_mb": disk_mb,
"gpu_delta_mb": 0,
"ram_delta_mb": ram_after - ram_before,
"latency_avg_ms": avg,
"latency_p50_ms": p50,
"latency_p99_ms": p99,
}
# ── main ────────────────────────────────────────────────────────────
def main():
print("nocmem efficiency benchmark")
print(f"device: {DEVICE}")
print()
# check chromadb available
has_chromadb = False
try:
import chromadb
has_chromadb = True
print("chromadb: available (will compare)")
except ImportError:
print("chromadb: not installed (nocmem only)")
print()
print("loading data...")
chunks = load_chunks(25000)
print(f" {len(chunks)} unique chunks extracted")
print("loading encoder...")
encoder = SentenceTransformer(EMBED_MODEL, device=DEVICE)
# query texts — mix of English and Chinese
query_texts = [
"What degree did I graduate with?",
"How to deploy the application?",
"What was the database error we fixed last week?",
"Tell me about the meeting schedule",
"What programming language should I learn?",
"数据库密码在哪里",
"部署到生产环境的步骤",
"上次讨论的性能优化方案",
"项目的技术栈是什么",
"最近的待办事项有哪些",
"How do I configure the server?",
"What's the API endpoint for user authentication?",
"Can you recommend some books on machine learning?",
"What was the root cause of the production incident?",
"How much memory does the GPU have?",
"VR设备的兼容性问题",
"模型推理的延迟是多少",
"代码仓库的结构是怎样的",
"如何解决内存泄漏",
"上次会议的结论是什么",
]
scales = [100, 500, 1000, 5000, 10000, 20000]
# filter to what we have
scales = [s for s in scales if s <= len(chunks)]
nocmem_results = []
chroma_results = []
for n in scales:
print(f"\n── scale: {n:,} memories ──")
print(f" nocmem...", end="", flush=True)
r = bench_nocmem(encoder, chunks, n, query_texts)
nocmem_results.append(r)
print(f" done (R: {r['latency_avg_ms']:.1f}ms, disk: {r['disk_mb']:.1f}MB)")
if has_chromadb:
print(f" chromadb...", end="", flush=True)
r2 = bench_chromadb(encoder, chunks, n, query_texts)
chroma_results.append(r2)
print(f" done (R: {r2['latency_avg_ms']:.1f}ms, disk: {r2['disk_mb']:.1f}MB)")
# ── report ──────────────────────────────────────────────────────
print(f"\n{'='*80}")
print(f"EFFICIENCY BENCHMARK RESULTS")
print(f"{'='*80}")
# table header
if has_chromadb:
print(f"\n{'Scale':>8} | {'--- nocmem ---':^40} | {'--- ChromaDB ---':^40}")
print(f"{'':>8} | {'Latency':>8} {'p99':>8} {'Disk':>8} {'VRAM':>8} {'Rate':>8} | {'Latency':>8} {'p99':>8} {'Disk':>8} {'RAM':>8} {'Rate':>8}")
print(f"{'':>8} | {'(ms)':>8} {'(ms)':>8} {'(MB)':>8} {'(MB)':>8} {'(/s)':>8} | {'(ms)':>8} {'(ms)':>8} {'(MB)':>8} {'(MB)':>8} {'(/s)':>8}")
print("-" * 100)
for nm, cr in zip(nocmem_results, chroma_results):
print(
f"{nm['n']:>8,} | "
f"{nm['latency_avg_ms']:>8.1f} {nm['latency_p99_ms']:>8.1f} {nm['disk_mb']:>8.1f} {nm['gpu_delta_mb']:>8.1f} {nm['ingest_rate']:>8.0f} | "
f"{cr['latency_avg_ms']:>8.1f} {cr['latency_p99_ms']:>8.1f} {cr['disk_mb']:>8.1f} {cr['ram_delta_mb']:>8.1f} {cr['ingest_rate']:>8.0f}"
)
else:
print(f"\n{'Scale':>8} | {'Latency':>8} {'p99':>8} {'Disk':>8} {'VRAM':>8} {'Ingest':>8}")
print(f"{'':>8} | {'(ms)':>8} {'(ms)':>8} {'(MB)':>8} {'(MB)':>8} {'(/s)':>8}")
print("-" * 60)
for nm in nocmem_results:
print(
f"{nm['n']:>8,} | "
f"{nm['latency_avg_ms']:>8.1f} {nm['latency_p99_ms']:>8.1f} {nm['disk_mb']:>8.1f} {nm['gpu_delta_mb']:>8.1f} {nm['ingest_rate']:>8.0f}"
)
# summary
if nocmem_results:
biggest = nocmem_results[-1]
print(f"\nnocmem @ {biggest['n']:,}:")
print(f" Query latency: avg {biggest['latency_avg_ms']:.1f}ms, p99 {biggest['latency_p99_ms']:.1f}ms")
print(f" Disk: {biggest['disk_mb']:.1f} MB")
print(f" VRAM delta: {biggest['gpu_delta_mb']:.1f} MB")
print(f" Ingest rate: {biggest['ingest_rate']:.0f} memories/sec")
if chroma_results:
biggest = chroma_results[-1]
print(f"\nChromaDB @ {biggest['n']:,}:")
print(f" Query latency: avg {biggest['latency_avg_ms']:.1f}ms, p99 {biggest['latency_p99_ms']:.1f}ms")
print(f" Disk: {biggest['disk_mb']:.1f} MB")
print(f" RAM delta: {biggest['ram_delta_mb']:.1f} MB")
print(f" Ingest rate: {biggest['ingest_rate']:.0f} memories/sec")
if has_chromadb and nocmem_results and chroma_results:
nm = nocmem_results[-1]
cr = chroma_results[-1]
print(f"\n── nocmem vs ChromaDB @ {nm['n']:,} ──")
lat_ratio = cr['latency_avg_ms'] / nm['latency_avg_ms'] if nm['latency_avg_ms'] > 0 else float('inf')
disk_ratio = cr['disk_mb'] / nm['disk_mb'] if nm['disk_mb'] > 0 else float('inf')
rate_ratio = nm['ingest_rate'] / cr['ingest_rate'] if cr['ingest_rate'] > 0 else float('inf')
print(f" Latency: nocmem {lat_ratio:.1f}x faster" if lat_ratio > 1 else f" Latency: ChromaDB {1/lat_ratio:.1f}x faster")
print(f" Disk: nocmem {disk_ratio:.1f}x smaller" if disk_ratio > 1 else f" Disk: ChromaDB {1/disk_ratio:.1f}x smaller")
print(f" Ingest: nocmem {rate_ratio:.1f}x faster" if rate_ratio > 1 else f" Ingest: ChromaDB {1/rate_ratio:.1f}x faster")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,239 @@
"""LongMemEval benchmark for nocmem.
Evaluates retrieval quality: given a question, can nocmem find the correct
session(s) from a haystack of ~50 conversation sessions?
Uses HippocampalMemory directly (no HTTP) for speed.
Compares against MemPalace's 96.6% R@5 baseline.
Usage:
uv run python benchmarks/longmemeval_bench.py [--limit N] [--granularity session|turn]
"""
import argparse
import json
import math
import sys
import time
import torch
from sentence_transformers import SentenceTransformer
from nuonuo.hippocampus import HippocampalMemory
# ── setup ───────────────────────────────────────────────────────────
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
EMBED_MODEL = "all-MiniLM-L6-v2"
EMBED_DIM = 384
def load_encoder():
print(f"loading {EMBED_MODEL} on {DEVICE}...")
return SentenceTransformer(EMBED_MODEL, device=DEVICE)
def embed_batch(encoder, texts: list[str]) -> torch.Tensor:
"""Batch embed, returns (N, dim) tensor."""
return encoder.encode(
texts, convert_to_tensor=True, normalize_embeddings=True,
device=DEVICE, batch_size=128, show_progress_bar=False,
)
# ── granularity: how to chunk sessions ──────────────────────────────
def sessions_to_chunks_turn(session_ids, sessions):
"""Each user-assistant turn becomes a separate chunk."""
chunks = [] # (text, session_id)
for sid, sess in zip(session_ids, sessions):
for i in range(0, len(sess) - 1, 2):
user = sess[i]["content"]
asst = sess[i + 1]["content"] if i + 1 < len(sess) else ""
text = f"{user}\n{asst}"
# truncate long turns to avoid embedding issues
chunks.append((text[:1000], sid))
# handle odd-numbered turns
if len(sess) % 2 == 1:
chunks.append((sess[-1]["content"][:1000], sid))
return chunks
def sessions_to_chunks_session(session_ids, sessions):
"""Each session becomes a single chunk (concatenated turns)."""
chunks = []
for sid, sess in zip(session_ids, sessions):
text = "\n".join(m["content"] for m in sess)
# truncate to fit embedding model's context
chunks.append((text[:2000], sid))
return chunks
# ── evaluate one question ───────────────────────────────────────────
def evaluate_question(encoder, item, granularity, ks=(5, 10)):
"""Store haystack, query, check if answer session in top-K.
Returns dict with R@5, R@10, NDCG@10, timings.
"""
# chunk the haystack
if granularity == "turn":
chunks = sessions_to_chunks_turn(
item["haystack_session_ids"], item["haystack_sessions"])
else:
chunks = sessions_to_chunks_session(
item["haystack_session_ids"], item["haystack_sessions"])
texts = [c[0] for c in chunks]
sids = [c[1] for c in chunks]
answer_sids = set(item["answer_session_ids"])
# batch embed all chunks
t0 = time.monotonic()
embeddings = embed_batch(encoder, texts)
embed_time = time.monotonic() - t0
# build memory
t1 = time.monotonic()
hip = HippocampalMemory(embed_dim=EMBED_DIM, device=DEVICE)
for i in range(len(chunks)):
hip.store(
embeddings[i], embeddings[i],
metadata={"session_id": sids[i]},
)
store_time = time.monotonic() - t1
# query
t2 = time.monotonic()
query_emb = encoder.encode(
[item["question"]], convert_to_tensor=True,
normalize_embeddings=True, device=DEVICE,
)[0]
max_k = max(ks)
results = hip.recall(query_emb, top_k=max_k)
recall_time = time.monotonic() - t2
# deduplicate by session_id, preserving rank order
seen = set()
ranked_sids = []
for r in results:
sid = r.metadata["session_id"]
if sid not in seen:
seen.add(sid)
ranked_sids.append(sid)
# compute metrics
metrics = {}
for k in ks:
top_k_sids = set(ranked_sids[:k])
hit = bool(answer_sids & top_k_sids)
metrics[f"R@{k}"] = 1.0 if hit else 0.0
# NDCG@10
ndcg = compute_ndcg(ranked_sids[:10], answer_sids)
metrics["NDCG@10"] = ndcg
metrics["embed_ms"] = embed_time * 1000
metrics["store_ms"] = store_time * 1000
metrics["recall_ms"] = recall_time * 1000
metrics["n_chunks"] = len(chunks)
return metrics
def compute_ndcg(ranked_sids, answer_sids, k=10):
"""Normalized Discounted Cumulative Gain."""
dcg = 0.0
for i, sid in enumerate(ranked_sids[:k]):
if sid in answer_sids:
dcg += 1.0 / math.log2(i + 2) # i+2 because rank starts at 1
# ideal: all answer sessions at top
n_relevant = min(len(answer_sids), k)
idcg = sum(1.0 / math.log2(i + 2) for i in range(n_relevant))
return dcg / idcg if idcg > 0 else 0.0
# ── main ───<E29480><E29480>────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--data", default="benchmarks/longmemeval.json")
parser.add_argument("--limit", type=int, default=0, help="limit number of questions (0=all)")
parser.add_argument("--granularity", choices=["session", "turn"], default="turn")
args = parser.parse_args()
print(f"LongMemEval benchmark for nocmem")
print(f"granularity: {args.granularity}")
print(f"device: {DEVICE}")
print()
with open(args.data) as f:
data = json.load(f)
if args.limit:
data = data[:args.limit]
encoder = load_encoder()
print(f"evaluating {len(data)} questions...\n")
all_metrics = []
by_type = {}
for i, item in enumerate(data):
metrics = evaluate_question(encoder, item, args.granularity)
all_metrics.append(metrics)
qtype = item["question_type"]
if qtype not in by_type:
by_type[qtype] = []
by_type[qtype].append(metrics)
# progress
if (i + 1) % 10 == 0 or i == len(data) - 1:
r5 = sum(m["R@5"] for m in all_metrics) / len(all_metrics) * 100
r10 = sum(m["R@10"] for m in all_metrics) / len(all_metrics) * 100
avg_recall = sum(m["recall_ms"] for m in all_metrics) / len(all_metrics)
print(f" [{i+1:3d}/{len(data)}] R@5={r5:.1f}% R@10={r10:.1f}% recall={avg_recall:.1f}ms")
# final results
n = len(all_metrics)
r5 = sum(m["R@5"] for m in all_metrics) / n * 100
r10 = sum(m["R@10"] for m in all_metrics) / n * 100
ndcg = sum(m["NDCG@10"] for m in all_metrics) / n * 100
avg_embed = sum(m["embed_ms"] for m in all_metrics) / n
avg_store = sum(m["store_ms"] for m in all_metrics) / n
avg_recall = sum(m["recall_ms"] for m in all_metrics) / n
avg_chunks = sum(m["n_chunks"] for m in all_metrics) / n
print(f"\n{'='*60}")
print(f"nocmem LongMemEval Results ({args.granularity} granularity)")
print(f"{'='*60}")
print(f" Questions: {n}")
print(f" Avg chunks: {avg_chunks:.0f}")
print(f"")
print(f" R@5: {r5:.1f}%")
print(f" R@10: {r10:.1f}%")
print(f" NDCG@10: {ndcg:.1f}%")
print(f"")
print(f" Avg embed: {avg_embed:.0f}ms")
print(f" Avg store: {avg_store:.0f}ms")
print(f" Avg recall: {avg_recall:.1f}ms")
print(f"\n── by question type ──")
for qtype, ms in sorted(by_type.items()):
nt = len(ms)
tr5 = sum(m["R@5"] for m in ms) / nt * 100
tr10 = sum(m["R@10"] for m in ms) / nt * 100
print(f" {qtype:30s} n={nt:3d} R@5={tr5:.1f}% R@10={tr10:.1f}%")
print(f"\n── comparison ──")
print(f" MemPalace (raw, session): R@5=96.6%")
print(f" nocmem ({args.granularity:7s}): R@5={r5:.1f}%")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,178 @@
"""Does recall noise decrease as memory count grows?
At various scales, measure:
1. Recall accuracy (R@3) for relevant queries
2. Max cosine similarity for irrelevant queries
3. Separation gap between relevant and irrelevant
If nocmem works well at scale, the gap should widen — relevant queries
should score much higher than irrelevant ones as the memory pool grows.
"""
import json
import time
import torch
import numpy as np
from sentence_transformers import SentenceTransformer
from nuonuo.hippocampus import HippocampalMemory
DEVICE = "cuda"
EMBED_DIM = 384
DATA_FILE = "benchmarks/longmemeval.json"
IRRELEVANT_QUERIES = [
"今天天气怎么样",
"你喜欢吃什么",
"",
"讲个笑话",
"明天会下雨吗",
"你觉得猫可爱还是狗可爱",
"人生的意义是什么",
"帮我写一首诗",
"地球到月球有多远",
"如何学会游泳",
]
BETA_CONFIGS = [16.0, 32.0, 64.0]
SCALES = [50, 200, 500, 1000, 3000]
def main():
print("noise vs scale benchmark\n")
print("loading encoder...")
encoder = SentenceTransformer("all-MiniLM-L6-v2", device=DEVICE)
def emb(text):
return encoder.encode([text], convert_to_tensor=True,
normalize_embeddings=True, device=DEVICE)[0]
def emb_batch(texts):
return encoder.encode(texts, convert_to_tensor=True,
normalize_embeddings=True, device=DEVICE,
batch_size=256, show_progress_bar=False)
# load data
print("loading data...")
with open(DATA_FILE) as f:
data = json.load(f)
# collect unique chunks with their source question index
all_chunks = [] # (text, question_idx, session_id)
seen = set()
for qi, item in enumerate(data):
for sid, sess in zip(item["haystack_session_ids"], item["haystack_sessions"]):
for i in range(0, len(sess) - 1, 2):
key = (sid, i)
if key in seen:
continue
seen.add(key)
user = sess[i]["content"]
asst = sess[i + 1]["content"] if i + 1 < len(sess) else ""
text = f"{user}\n{asst}"[:1000]
all_chunks.append((text, qi, sid))
print(f" {len(all_chunks)} unique chunks")
# pre-embed irrelevant queries
irrel_embs = [emb(q) for q in IRRELEVANT_QUERIES]
# collect relevant queries: for each question, we know the answer session
# pick first 50 questions that have at least one answer session
relevant_queries = []
for item in data[:100]:
answer_sids = set(item["answer_session_ids"])
relevant_queries.append((item["question"], answer_sids))
rel_query_embs = emb_batch([q for q, _ in relevant_queries])
print(f" {len(relevant_queries)} relevant queries")
print(f" {len(IRRELEVANT_QUERIES)} irrelevant queries")
# filter scales to what we have
scales = [s for s in SCALES if s <= len(all_chunks)]
for beta in BETA_CONFIGS:
print(f"\n{'='*70}")
print(f" β = {beta}")
print(f"{'='*70}")
print(f"{'Scale':>7} | {'R@3':>6} | {'Rel maxcos':>10} {'Irrel maxcos':>12} {'Gap':>8} | {'Rel attn':>9} {'Irrel attn':>11}")
print("-" * 80)
for n in scales:
subset = all_chunks[:n]
texts = [c[0] for c in subset]
sids = [c[2] for c in subset]
# embed and build memory
embeddings = emb_batch(texts)
hip = HippocampalMemory(
embed_dim=EMBED_DIM, beta=beta, hopfield_top_k=10, device=DEVICE,
)
for i in range(n):
hip.store(embeddings[i], embeddings[i],
metadata={"session_id": sids[i]})
cue_mat = hip._get_cue_matrix()
# --- relevant queries ---
rel_max_cos = []
rel_top_attn = []
hits = 0
tested = 0
for qi in range(len(relevant_queries)):
question, answer_sids = relevant_queries[qi]
qe = rel_query_embs[qi]
# check if any answer session is in this subset
subset_sids = set(sids)
if not (answer_sids & subset_sids):
continue
tested += 1
# cosine sim
cos_sims = qe @ cue_mat.T
rel_max_cos.append(cos_sims.max().item())
# recall
results = hip.recall(qe, top_k=3)
top_attn = results[0].similarity if results else 0
rel_top_attn.append(top_attn)
recalled_sids = {r.metadata["session_id"] for r in results}
if answer_sids & recalled_sids:
hits += 1
r3 = hits / tested * 100 if tested > 0 else 0
avg_rel_cos = np.mean(rel_max_cos) if rel_max_cos else 0
avg_rel_attn = np.mean(rel_top_attn) if rel_top_attn else 0
# --- irrelevant queries ---
irrel_max_cos = []
irrel_top_attn = []
for qe in irrel_embs:
cos_sims = qe @ cue_mat.T
irrel_max_cos.append(cos_sims.max().item())
results = hip.recall(qe, top_k=3)
top_attn = results[0].similarity if results else 0
irrel_top_attn.append(top_attn)
avg_irrel_cos = np.mean(irrel_max_cos)
avg_irrel_attn = np.mean(irrel_top_attn)
gap = avg_rel_cos - avg_irrel_cos
print(f"{n:>7,} | {r3:>5.1f}% | {avg_rel_cos:>10.3f} {avg_irrel_cos:>12.3f} {gap:>8.3f} | {avg_rel_attn:>8.0%} {avg_irrel_attn:>10.0%}")
del hip
torch.cuda.empty_cache()
print(f"\n── 解读 ──")
print(f"Rel maxcos: 相关查询的最大余弦相似度(越高越好)")
print(f"Irrel maxcos: 无关查询的最大余弦相似度(越低越好)")
print(f"Gap: 两者之差(越大越好 = 越容易区分)")
print(f"Rel attn: 相关查询 top1 的 Hopfield attention 权重")
print(f"Irrel attn: 无关查询 top1 的 Hopfield attention 权重(越低 = 越少噪音)")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,104 @@
"""Test Hopfield attention sharpness with different top_k and beta.
Goal: find settings that give "either clearly remembered or nothing"
instead of flat attention across 20 candidates.
"""
import torch
from sentence_transformers import SentenceTransformer
from nuonuo.hippocampus import HippocampalMemory
DEVICE = "cuda"
EMBED_DIM = 384
print("loading encoder...")
encoder = SentenceTransformer("all-MiniLM-L6-v2", device=DEVICE)
def emb(text):
return encoder.encode([text], convert_to_tensor=True, normalize_embeddings=True, device=DEVICE)[0]
# store the same memories in each config
MEMORIES = [
("bot的名字叫什么", "bot的名字叫小乖是Fam给取的"),
("有哪些工具可以用", "工具有: fam_todo, send_file, spawn_agent, run_shell, run_python, update_memory"),
("vLLM在5090上的性能", "RTX 5090上vLLM跑gemma只有4.8 tok/s需要切换到awq_marlin"),
("repo-vis项目是什么", "repo-vis用Rust后端+Three.js前端的3D代码库可视化目标支持Linux内核和Pico VR"),
("repo-vis的性能瓶颈", "Linux内核79K文件SQLite 1GB上限和O(n)反序列化是瓶颈需要n-ary tree按需合并"),
("明天的待办事项", "最紧迫的是emblem scanner的AI Chat和KB部分"),
("后端切换到了什么", "NOC后端切换到了vLLM速度变快了"),
("数据库密码在哪里", "数据库密码存在 /etc/secrets/db.env 文件中"),
("什么GPU", "服务器有NVIDIA RTX 4090 24GB VRAM"),
("home有多少log文件", "home目录及子目录下共有960个.log文件"),
]
QUERIES = [
("repo-vis怎么样了", "repo-vis", True), # should recall clearly
("数据库密码", "密码", True), # should recall clearly
("今天天气怎么样", "天气", False), # irrelevant, should recall nothing
("vllm速度", "vllm", True), # should recall clearly
("你喜欢吃什么", "吃什么", False), # irrelevant
("VR支持", "VR", True), # edge case
]
CONFIGS = [
# (top_k, beta, label)
(20, 16.0, "baseline (top_k=20, β=16)"),
(10, 16.0, "top_k=10, β=16"),
(5, 16.0, "top_k=5, β=16"),
(20, 32.0, "top_k=20, β=32"),
(20, 64.0, "top_k=20, β=64"),
(10, 32.0, "top_k=10, β=32"),
(5, 32.0, "top_k=5, β=32"),
(5, 64.0, "top_k=5, β=64"),
]
# pre-embed everything
mem_embs = [(emb(c), emb(t), c, t) for c, t in MEMORIES]
query_embs = [(emb(q), label, relevant) for q, label, relevant in QUERIES]
print(f"\n{len(MEMORIES)} memories, {len(QUERIES)} queries, {len(CONFIGS)} configs\n")
for top_k, beta, label in CONFIGS:
print(f"{'='*70}")
print(f" {label}")
print(f"{'='*70}")
hip = HippocampalMemory(
embed_dim=EMBED_DIM, hopfield_top_k=top_k, beta=beta, device=DEVICE,
)
for ce, te, cue_text, target_text in mem_embs:
hip.store(ce, te, metadata={"cue": cue_text, "target": target_text})
for qe, qlabel, should_recall in query_embs:
results = hip.recall(qe, top_k=5)
# show distribution
sims = [r.similarity for r in results]
top1 = sims[0] if sims else 0
top2 = sims[1] if len(sims) > 1 else 0
gap = top1 - top2 # gap between #1 and #2
above_5pct = sum(1 for s in sims if s >= 0.05)
above_10pct = sum(1 for s in sims if s >= 0.10)
top_target = results[0].metadata["target"][:40] if results else ""
tag = "" if should_recall else ""
print(f" [{tag}] {qlabel:10s} top1={top1:.0%} top2={top2:.0%} gap={gap:.0%} "
f"≥5%:{above_5pct} ≥10%:{above_10pct}{top_target}")
# summary: average sharpness
total_gap = 0
total_top1 = 0
for qe, qlabel, _ in query_embs:
results = hip.recall(qe, top_k=5)
sims = [r.similarity for r in results]
total_top1 += sims[0] if sims else 0
total_gap += (sims[0] - sims[1]) if len(sims) > 1 else 0
n = len(query_embs)
print(f"\n avg top1={total_top1/n:.0%} avg gap={total_gap/n:.0%}")
print()
del hip
torch.cuda.empty_cache()

19
mem/nocmem.service Normal file
View File

@@ -0,0 +1,19 @@
[Unit]
Description=nocmem — NuoNuo memory service for NOC
After=network.target
[Service]
Type=simple
WorkingDirectory=/data/src/noc/mem
ExecStart=/home/fam/.local/bin/uv run uvicorn server:app --host 0.0.0.0 --port 9820
Restart=on-failure
RestartSec=5
Environment=NOCMEM_LLM_ENDPOINT=http://100.84.7.49:8000/v1
Environment=NOCMEM_LLM_MODEL=QuantTrio/gemma-4-31B-it-AWQ
Environment=NOCMEM_LLM_API_KEY=unused
Environment=NOCMEM_DATA_DIR=/data/src/noc/mem/data
Environment=NOCMEM_DEVICE=cuda
[Install]
WantedBy=default.target

25
mem/pyproject.toml Normal file
View File

@@ -0,0 +1,25 @@
[project]
name = "nocmem"
version = "0.1.0"
description = "Memory service for noc — NuoNuo hippocampal recall + ingest over HTTP"
requires-python = ">=3.12"
dependencies = [
"fastapi>=0.115",
"uvicorn>=0.34",
"torch>=2.10,<2.11",
"sentence-transformers>=3.0",
"nuonuo",
"openai>=1.0",
]
[tool.uv]
index-url = "https://pypi.org/simple"
[[tool.uv.index]]
name = "pytorch-cu128"
url = "https://download.pytorch.org/whl/cu128"
explicit = true
[tool.uv.sources]
torch = { index = "pytorch-cu128" }
nuonuo = { path = "../../nuonuo", editable = true }

386
mem/server.py Normal file
View File

@@ -0,0 +1,386 @@
"""nocmem — Memory service for NOC.
Wraps NuoNuo's HippocampalMemory as an HTTP API.
Auto-recall on every user message, async ingest after LLM response.
"""
import asyncio
import os
import re
import time
import logging
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
import torch
from fastapi import FastAPI
from pydantic import BaseModel, Field
from sentence_transformers import SentenceTransformer
from openai import OpenAI
from nuonuo.hippocampus import HippocampalMemory
logger = logging.getLogger("nocmem")
# ── config ──────────────────────────────────────────────────────────
EMBED_MODEL = os.environ.get("NOCMEM_EMBED_MODEL", "all-MiniLM-L6-v2")
EMBED_DIM = int(os.environ.get("NOCMEM_EMBED_DIM", "384"))
DEVICE = os.environ.get("NOCMEM_DEVICE", "cuda" if torch.cuda.is_available() else "cpu")
DATA_DIR = Path(os.environ.get("NOCMEM_DATA_DIR", "./data"))
CHECKPOINT = DATA_DIR / "hippocampus.pt"
SAVE_INTERVAL = int(os.environ.get("NOCMEM_SAVE_INTERVAL", "10")) # save every N stores
HOPFIELD_BETA = float(os.environ.get("NOCMEM_HOPFIELD_BETA", "32.0"))
HOPFIELD_TOP_K = int(os.environ.get("NOCMEM_HOPFIELD_TOP_K", "10"))
COS_SIM_THRESHOLD = float(os.environ.get("NOCMEM_COS_SIM_THRESHOLD", "0.35"))
# LLM for memory extraction (optional)
LLM_ENDPOINT = os.environ.get("NOCMEM_LLM_ENDPOINT", "")
LLM_MODEL = os.environ.get("NOCMEM_LLM_MODEL", "gemma4:12b")
LLM_API_KEY = os.environ.get("NOCMEM_LLM_API_KEY", "unused")
# ── globals ─────────────────────────────────────────────────────────
encoder: SentenceTransformer = None # type: ignore
hippocampus: HippocampalMemory = None # type: ignore
llm_client = None # optional
_stores_since_save = 0
def embed(text: str) -> torch.Tensor:
return encoder.encode(
[text], convert_to_tensor=True, normalize_embeddings=True, device=DEVICE
)[0]
def embed_batch(texts: list[str]) -> list[torch.Tensor]:
if not texts:
return []
t = encoder.encode(
texts, convert_to_tensor=True, normalize_embeddings=True, device=DEVICE
)
return [t[i] for i in range(t.shape[0])]
def maybe_save():
global _stores_since_save
_stores_since_save += 1
if _stores_since_save >= SAVE_INTERVAL:
_stores_since_save = 0
DATA_DIR.mkdir(parents=True, exist_ok=True)
hippocampus.save(str(CHECKPOINT))
logger.info("checkpoint saved: %s", CHECKPOINT)
# ── lifespan ────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
global encoder, hippocampus, llm_client
logger.info("loading embedding model: %s (device=%s)", EMBED_MODEL, DEVICE)
encoder = SentenceTransformer(EMBED_MODEL, device=DEVICE)
if CHECKPOINT.exists():
logger.info("loading checkpoint: %s", CHECKPOINT)
hippocampus = HippocampalMemory.load(str(CHECKPOINT), device=DEVICE)
logger.info("loaded %d memories", len(hippocampus.memories))
else:
logger.info("no checkpoint found, starting fresh")
hippocampus = HippocampalMemory(
embed_dim=EMBED_DIM, beta=HOPFIELD_BETA,
hopfield_top_k=HOPFIELD_TOP_K, device=DEVICE,
)
if LLM_ENDPOINT:
try:
client = OpenAI(base_url=LLM_ENDPOINT, api_key=LLM_API_KEY, timeout=5.0)
client.models.list()
llm_client = client
logger.info("LLM client connected: %s", LLM_ENDPOINT)
except Exception as e:
logger.warning("LLM client unavailable: %s", e)
yield
# save on shutdown
DATA_DIR.mkdir(parents=True, exist_ok=True)
hippocampus.save(str(CHECKPOINT))
logger.info("shutdown: checkpoint saved")
app = FastAPI(title="nocmem", lifespan=lifespan)
# ── models ──────────────────────────────────────────────────────────
class RecallRequest(BaseModel):
text: str
top_k: int = Field(default=5, ge=1, le=20)
hops: int = Field(default=2, ge=1, le=5)
min_similarity: float = Field(default=0.0, ge=0.0, le=1.0)
class RecallResponse(BaseModel):
memories: str
count: int
latency_ms: float
class IngestRequest(BaseModel):
user_msg: str
assistant_msg: str
class IngestResponse(BaseModel):
stored: int
class StoreRequest(BaseModel):
cue: str
target: str
importance: float = Field(default=0.5, ge=0.0, le=1.0)
class StoreResponse(BaseModel):
memory_id: int
# ── endpoints ───────────────────────────────────────────────────────
@app.post("/recall", response_model=RecallResponse)
async def recall(req: RecallRequest):
t0 = time.monotonic()
query_emb = embed(req.text)
# pre-filter: check if anything in memory is actually similar enough
cue_mat = hippocampus._get_cue_matrix()
if cue_mat is not None and COS_SIM_THRESHOLD > 0:
cos_sims = query_emb @ cue_mat.T
max_cos_sim = cos_sims.max().item()
if max_cos_sim < COS_SIM_THRESHOLD:
# nothing in memory is similar enough — don't hallucinate
return RecallResponse(memories="", count=0, latency_ms=(time.monotonic() - t0) * 1000)
# single-hop
results = hippocampus.recall(query_emb, top_k=req.top_k)
# multi-hop chain from top result
chain_results = []
if results and req.hops > 1:
chain = hippocampus.recall_chain(query_emb, hops=req.hops)
# add chain results not already in single-hop
seen_ids = {r.memory_id for r in results}
for cr in chain:
if cr.memory_id not in seen_ids:
chain_results.append(cr)
seen_ids.add(cr.memory_id)
all_results = results + chain_results
elapsed = (time.monotonic() - t0) * 1000
if not all_results:
return RecallResponse(memories="", count=0, latency_ms=elapsed)
lines = []
for r in all_results:
if r.similarity < req.min_similarity:
continue
meta = r.metadata
text = meta.get("target", meta.get("text", ""))
if not text:
continue
hop_tag = f" (联想 hop={r.hop_distance})" if r.hop_distance > 1 else ""
lines.append(f"- {text}{hop_tag}")
if not lines:
return RecallResponse(memories="", count=0, latency_ms=elapsed)
formatted = "[以下是可能相关的历史记忆,仅供参考。请优先关注用户当前的消息。]\n" + "\n".join(lines)
return RecallResponse(memories=formatted, count=len(lines), latency_ms=elapsed)
@app.post("/ingest", response_model=IngestResponse)
async def ingest(req: IngestRequest):
extracted = await asyncio.to_thread(_extract_and_store, req.user_msg, req.assistant_msg)
return IngestResponse(stored=extracted)
@dataclass
class ExtractedMemory:
cue: str
target: str
importance: float = 0.5
def _extract_memories_llm(user_msg: str, assistant_msg: str) -> list[ExtractedMemory]:
prompt = (
"From this conversation turn, extract key facts worth remembering for future conversations.\n"
"For each fact, provide a \"cue\" (what would trigger recalling this) and a \"target\" (the fact itself).\n"
"Rate importance 0-1 (1 = critical fact, 0 = trivial).\n\n"
f"User: {user_msg}\nAssistant: {assistant_msg}\n\n"
"Output format (one per line):\nCUE: <trigger phrase> | TARGET: <fact> | IMPORTANCE: <0-1>\n\n"
"Only extract genuinely useful facts. If nothing worth remembering, output NONE."
)
try:
resp = llm_client.chat.completions.create(
model=LLM_MODEL, messages=[{"role": "user", "content": prompt}],
temperature=0.3, max_tokens=512,
)
result = resp.choices[0].message.content
except Exception:
return _extract_memories_heuristic(user_msg, assistant_msg)
memories = []
for line in result.strip().split("\n"):
if line.strip() == "NONE":
break
m = re.match(r"CUE:\s*(.+?)\s*\|\s*TARGET:\s*(.+?)\s*\|\s*IMPORTANCE:\s*([\d.]+)", line)
if m:
memories.append(ExtractedMemory(m.group(1).strip(), m.group(2).strip(), float(m.group(3))))
return memories
def _extract_memories_heuristic(user_msg: str, assistant_msg: str) -> list[ExtractedMemory]:
memories = []
# detect questions — English and Chinese
has_question = "?" in user_msg or "" in user_msg or any(
user_msg.strip().startswith(q) for q in ["怎么", "什么", "", "为什么", "如何", "多少", ""]
)
# count meaningful length: for Chinese, use character count
assistant_long_enough = len(assistant_msg) > 20
if has_question and assistant_long_enough:
cue = user_msg.rstrip("?").strip()
memories.append(ExtractedMemory(
cue=cue, target=assistant_msg[:300], importance=0.6,
))
# tech keywords — English and Chinese
tech_keywords = [
"deploy", "config", "bug", "fix", "error", "database", "server",
"api", "port", "token", "password", "version", "install", "upgrade",
"部署", "配置", "错误", "数据库", "服务器", "端口", "密码", "版本",
"安装", "升级", "模型", "工具", "代码", "项目", "优化", "性能",
"内存", "GPU", "vllm", "docker", "k8s", "git", "编译", "测试",
]
combined = (user_msg + " " + assistant_msg).lower()
user_meaningful = len(user_msg) >= 8 # characters, not words
if any(kw in combined for kw in tech_keywords) and user_meaningful:
if not memories: # avoid duplicate with Q&A extraction
memories.append(ExtractedMemory(
cue=user_msg[:150], target=assistant_msg[:300], importance=0.5,
))
return memories
def _generate_paraphrases_heuristic(text: str, n: int = 3) -> list[str]:
variants = []
text_lower = text.lower().strip()
# English prefixes
for pfx in ["can you ", "please ", "i need to ", "how do i ", "how to ", "what is ", "what's "]:
if text_lower.startswith(pfx):
stripped = text[len(pfx):].strip()
if stripped:
variants.append(stripped)
# Chinese prefixes
for pfx in ["帮我看看", "帮我", "请问", "我想知道", "能不能", "怎么样", "看下", "看看"]:
if text.startswith(pfx):
stripped = text[len(pfx):].strip()
if stripped:
variants.append(stripped)
# synonym swaps — English
en_swaps = {"slow": "performance issues", "fix": "resolve", "deploy": "release",
"error": "issue", "bug": "problem", "database": "DB", "server": "machine"}
for old, new in en_swaps.items():
if old in text_lower:
variant = text.replace(old, new).replace(old.capitalize(), new.capitalize())
if variant != text and variant not in variants:
variants.append(variant)
# synonym swaps — Chinese
cn_swaps = {"数据库": "DB", "服务器": "机器", "部署": "上线", "配置": "设置",
"性能": "速度", "优化": "改进", "工具": "tool", "项目": "project"}
for old, new in cn_swaps.items():
if old in text:
variant = text.replace(old, new)
if variant != text and variant not in variants:
variants.append(variant)
return variants[:n]
def _generate_paraphrases_llm(text: str, n: int = 3) -> list[str]:
prompt = f"Generate {n} different paraphrases of this text. Each should convey the same meaning but use different words. One per line, no numbering.\n\nText: {text}"
try:
resp = llm_client.chat.completions.create(
model=LLM_MODEL, messages=[{"role": "user", "content": prompt}],
temperature=0.8, max_tokens=256,
)
result = resp.choices[0].message.content
return [l.strip() for l in result.strip().split("\n") if l.strip() and len(l.strip()) > 3][:n]
except Exception:
return _generate_paraphrases_heuristic(text, n)
def _extract_and_store(user_msg: str, assistant_msg: str) -> int:
if llm_client:
memories = _extract_memories_llm(user_msg, assistant_msg)
else:
memories = _extract_memories_heuristic(user_msg, assistant_msg)
if not memories:
return 0
stored = 0
for mem in memories:
if mem.importance < 0.3:
continue
cue_emb = embed(mem.cue)
target_emb = embed(mem.target)
if llm_client:
paraphrases = _generate_paraphrases_llm(mem.cue, n=3)
else:
paraphrases = _generate_paraphrases_heuristic(mem.cue, n=3)
variant_embs = embed_batch(paraphrases) if paraphrases else []
hippocampus.store(
cue_emb, target_emb,
cue_variants=variant_embs,
metadata={"cue": mem.cue, "target": mem.target, "importance": mem.importance},
timestamp=time.time(),
)
stored += 1
if stored > 0:
maybe_save()
logger.info("ingested %d memories from conversation turn", stored)
return stored
@app.post("/store", response_model=StoreResponse)
async def store_direct(req: StoreRequest):
"""Direct store — bypass LLM extraction, for manual/testing use."""
cue_emb = embed(req.cue)
target_emb = embed(req.target)
mid = hippocampus.store(
cue_emb, target_emb,
metadata={"cue": req.cue, "target": req.target, "importance": req.importance},
timestamp=time.time(),
)
maybe_save()
return StoreResponse(memory_id=mid)
@app.get("/stats")
async def stats():
s = hippocampus.stats()
s["device"] = DEVICE
s["embedding_model"] = EMBED_MODEL
s["checkpoint"] = str(CHECKPOINT)
s["checkpoint_exists"] = CHECKPOINT.exists()
return s
@app.delete("/memory/{memory_id}")
async def forget(memory_id: int):
hippocampus.forget(memory_id)
maybe_save()
return {"deleted": memory_id}

390
mem/test_api.py Normal file
View File

@@ -0,0 +1,390 @@
"""nocmem API integration tests.
Run with: uv run python test_api.py
Requires nocmem server running on localhost:9820.
"""
import sys
import time
import requests
BASE = "http://127.0.0.1:9820"
PASS = 0
FAIL = 0
def test(name: str, fn):
global PASS, FAIL
try:
fn()
print(f"{name}")
PASS += 1
except AssertionError as e:
print(f"{name}: {e}")
FAIL += 1
except Exception as e:
print(f"{name}: EXCEPTION {e}")
FAIL += 1
def assert_eq(a, b, msg=""):
assert a == b, f"expected {b!r}, got {a!r}" + (f" ({msg})" if msg else "")
def assert_gt(a, b, msg=""):
assert a > b, f"expected > {b!r}, got {a!r}" + (f" ({msg})" if msg else "")
def assert_in(needle, haystack, msg=""):
assert needle in haystack, f"{needle!r} not in {haystack!r}" + (f" ({msg})" if msg else "")
# ── health check ────────────────────────────────────────────────────
def check_server():
try:
r = requests.get(f"{BASE}/stats", timeout=3)
r.raise_for_status()
return True
except Exception:
return False
# ── test: stats on empty db ─────────────────────────────────────────
def test_stats_empty():
r = requests.get(f"{BASE}/stats")
assert_eq(r.status_code, 200)
data = r.json()
assert "num_memories" in data
assert "device" in data
assert_eq(data["embedding_model"], "all-MiniLM-L6-v2")
# ── test: recall on empty db ───────────────────────<E29480><E29480><EFBFBD>────────────────
def test_recall_empty():
r = requests.post(f"{BASE}/recall", json={"text": "hello"})
assert_eq(r.status_code, 200)
data = r.json()
assert_eq(data["memories"], "")
assert_eq(data["count"], 0)
# ── test: direct store ────────<E29480><E29480><EFBFBD>─────────────────────────────────────
stored_ids = []
def test_store_single():
r = requests.post(f"{BASE}/store", json={
"cue": "what port does postgres run on",
"target": "PostgreSQL runs on port 5432",
"importance": 0.8,
})
assert_eq(r.status_code, 200)
data = r.json()
assert "memory_id" in data
stored_ids.append(data["memory_id"])
def test_store_multiple():
memories = [
{"cue": "what is the database password", "target": "The DB password is stored in /etc/secrets/db.env", "importance": 0.9},
{"cue": "how to deploy the app", "target": "Run make deploy-hera to deploy to the suite VPS via SSH", "importance": 0.7},
{"cue": "what timezone is Fam in", "target": "Fam is in London, UK timezone (Europe/London, GMT/BST)", "importance": 0.6},
{"cue": "which embedding model works best", "target": "all-MiniLM-L6-v2 has the best gap metric for hippocampal memory", "importance": 0.8},
{"cue": "what GPU does the server have", "target": "The server has an NVIDIA RTX 4090 with 24GB VRAM", "importance": 0.7},
]
for m in memories:
r = requests.post(f"{BASE}/store", json=m)
assert_eq(r.status_code, 200)
stored_ids.append(r.json()["memory_id"])
# ── test: exact recall ──────────────────────────────────────────────
def test_recall_exact():
"""Recall with the exact cue text should return the right memory."""
r = requests.post(f"{BASE}/recall", json={
"text": "what port does postgres run on",
"top_k": 3,
})
assert_eq(r.status_code, 200)
data = r.json()
assert_gt(data["count"], 0, "should recall at least 1")
assert_in("5432", data["memories"], "should mention port 5432")
# ── test: paraphrase recall ─────────────────────────────────────────
def test_recall_paraphrase():
"""Recall with a paraphrased query (not exact cue text)."""
r = requests.post(f"{BASE}/recall", json={
"text": "which port is postgresql listening on",
"top_k": 3,
})
assert_eq(r.status_code, 200)
data = r.json()
assert_gt(data["count"], 0, "paraphrase should still recall")
assert_in("5432", data["memories"])
def test_recall_different_wording():
"""Even more different wording."""
r = requests.post(f"{BASE}/recall", json={
"text": "database connection port number",
"top_k": 3,
})
assert_eq(r.status_code, 200)
data = r.json()
assert_gt(data["count"], 0, "different wording should recall")
assert_in("5432", data["memories"])
# ── test: recall relevance ──────────────────────────────────────────
def test_recall_deployment():
r = requests.post(f"{BASE}/recall", json={
"text": "how do I deploy to production",
"top_k": 3,
})
assert_eq(r.status_code, 200)
data = r.json()
assert_gt(data["count"], 0)
assert_in("deploy", data["memories"].lower())
def test_recall_timezone():
r = requests.post(f"{BASE}/recall", json={
"text": "where is Fam located",
"top_k": 3,
})
assert_eq(r.status_code, 200)
data = r.json()
assert_gt(data["count"], 0)
assert_in("London", data["memories"])
def test_recall_gpu():
r = requests.post(f"{BASE}/recall", json={
"text": "what hardware does the server have",
"top_k": 3,
})
assert_eq(r.status_code, 200)
data = r.json()
assert_gt(data["count"], 0)
assert_in("4090", data["memories"])
# ── test: top_k ─────────────────────────────────────────────────────
def test_recall_top_k_1():
r = requests.post(f"{BASE}/recall", json={
"text": "postgres port",
"top_k": 1,
})
data = r.json()
assert_eq(data["count"], 1, "top_k=1 should return exactly 1")
def test_recall_top_k_all():
r = requests.post(f"{BASE}/recall", json={
"text": "tell me everything",
"top_k": 20,
})
data = r.json()
assert_gt(data["count"], 0, "should recall something")
# ── test: recall latency ────────────────────────────────────────────
def test_recall_latency():
"""Recall should be fast (< 100ms including HTTP + embedding)."""
t0 = time.monotonic()
r = requests.post(f"{BASE}/recall", json={"text": "database port"})
elapsed_ms = (time.monotonic() - t0) * 1000
data = r.json()
# internal latency (no HTTP overhead)
assert data["latency_ms"] < 100, f"internal latency {data['latency_ms']:.1f}ms too high"
# end-to-end including HTTP
print(f" (e2e={elapsed_ms:.1f}ms, internal={data['latency_ms']:.1f}ms)")
# ── test: ingest (heuristic, no LLM) ───────────────────────────────
def test_ingest_heuristic():
"""Ingest without LLM should use heuristic extraction."""
r = requests.post(f"{BASE}/ingest", json={
"user_msg": "What version of Python are we running?",
"assistant_msg": "We are running Python 3.12.4 on the server, installed via uv.",
})
assert_eq(r.status_code, 200)
data = r.json()
# heuristic should extract at least the Q&A pair
assert_gt(data["stored"], 0, "heuristic should extract at least 1 memory")
def test_ingest_then_recall():
"""After ingesting, the memory should be recallable."""
# first ingest
requests.post(f"{BASE}/ingest", json={
"user_msg": "What's the Redis cache TTL?",
"assistant_msg": "The Redis cache TTL is set to 3600 seconds (1 hour) in production.",
})
# wait a tiny bit for async processing
time.sleep(0.5)
# then recall
r = requests.post(f"{BASE}/recall", json={
"text": "redis cache timeout",
"top_k": 3,
})
data = r.json()
assert_gt(data["count"], 0, "ingested memory should be recallable")
# Check it mentions the TTL
assert_in("3600", data["memories"], "should recall the TTL value")
# ── test: forget ───────────<E29480><E29480><EFBFBD>────────────────────────<E29480><E29480>───────────────
def test_forget():
"""Delete a memory and verify it's gone."""
# store something
r = requests.post(f"{BASE}/store", json={
"cue": "temporary test memory for deletion",
"target": "this should be deleted XYZZY",
})
mid = r.json()["memory_id"]
# verify it's recallable
r = requests.post(f"{BASE}/recall", json={"text": "temporary test memory for deletion"})
assert_in("XYZZY", r.json()["memories"])
# delete
r = requests.delete(f"{BASE}/memory/{mid}")
assert_eq(r.status_code, 200)
# verify gone — recall the exact cue, should not return XYZZY
r = requests.post(f"{BASE}/recall", json={"text": "temporary test memory for deletion"})
if r.json()["memories"]:
assert "XYZZY" not in r.json()["memories"], "deleted memory should not appear"
# ── test: format ─────────────────────────────────────<E29480><E29480>──────────────
def test_recall_format():
"""Recalled memories should have the expected format."""
r = requests.post(f"{BASE}/recall", json={"text": "postgres port"})
data = r.json()
if data["count"] > 0:
assert data["memories"].startswith("[相关记忆]"), "should start with header"
assert "\n- " in data["memories"], "each memory should start with '- '"
# ── test: stats after stores ──────<E29480><E29480>─────────────────────────────────
def test_stats_after():
r = requests.get(f"{BASE}/stats")
data = r.json()
assert_gt(data["num_memories"], 0, "should have memories")
assert_gt(data["num_cue_entries"], data["num_memories"],
"cue entries should >= memories (augmentation from ingest)")
# ── test: edge cases ────────────────────────────────────────────────
def test_recall_empty_text():
r = requests.post(f"{BASE}/recall", json={"text": ""})
# should not crash
assert r.status_code == 200
def test_recall_long_text():
r = requests.post(f"{BASE}/recall", json={"text": "a " * 1000})
assert r.status_code == 200
def test_recall_chinese():
"""Chinese text should work."""
# store a Chinese memory
requests.post(f"{BASE}/store", json={
"cue": "数据库密码在哪里",
"target": "数据库密码存在 /etc/secrets/db.env 文件中",
})
r = requests.post(f"{BASE}/recall", json={"text": "数据库密码"})
data = r.json()
assert_gt(data["count"], 0, "Chinese recall should work")
assert_in("secrets", data["memories"])
def test_store_validation():
"""Missing required fields should return 422."""
r = requests.post(f"{BASE}/store", json={"cue": "only cue"})
assert_eq(r.status_code, 422)
# ── run ─────<E29480><E29480><EFBFBD>───────────────────────────────────────────────────────
def main():
global PASS, FAIL
print("nocmem API tests")
print(f"server: {BASE}\n")
if not check_server():
print("ERROR: server not reachable")
sys.exit(1)
# first clean slate — check what we start with
r = requests.get(f"{BASE}/stats")
initial = r.json()["num_memories"]
print(f"[initial state: {initial} memories]\n")
print("── basic ──")
test("stats endpoint", test_stats_empty)
test("recall on empty/existing db", test_recall_empty if initial == 0 else lambda: None)
print("\n── store ──")
test("store single memory", test_store_single)
test("store multiple memories", test_store_multiple)
print("\n── recall accuracy ─<><E29480><EFBFBD>")
test("exact cue recall", test_recall_exact)
test("paraphrase recall", test_recall_paraphrase)
test("different wording recall", test_recall_different_wording)
test("deployment query", test_recall_deployment)
test("timezone query", test_recall_timezone)
test("GPU query", test_recall_gpu)
print("\n── recall params ──")
test("top_k=1", test_recall_top_k_1)
test("top_k=20 (all)", test_recall_top_k_all)
test("latency < 100ms", test_recall_latency)
test("format check", test_recall_format)
print("\n── ingest ──")
test("heuristic ingest", test_ingest_heuristic)
test("ingest then recall", test_ingest_then_recall)
print("\n── forget ──")
test("store + forget + verify", test_forget)
print("\n── edge cases ──")
test("empty text", test_recall_empty_text)
test("long text", test_recall_long_text)
test("Chinese text", test_recall_chinese)
test("validation error", test_store_validation)
print("\n── stats ──")
test("stats after stores", test_stats_after)
print(f"\n{'='*40}")
print(f"PASS: {PASS} FAIL: {FAIL}")
if FAIL:
sys.exit(1)
else:
print("All tests passed!")
if __name__ == "__main__":
main()

279
mem/test_real_data.py Normal file
View File

@@ -0,0 +1,279 @@
"""Test nocmem with real conversation data from NOC's SQLite database.
Extracts conversation turns, ingests them, then tests recall with
realistic queries that a user would actually ask.
"""
import sys
import time
import sqlite3
import requests
BASE = "http://127.0.0.1:9820"
DB_PATH = "/data/src/noc/noc.db"
PASS = 0
FAIL = 0
def test(name, fn):
global PASS, FAIL
try:
fn()
print(f"{name}")
PASS += 1
except AssertionError as e:
print(f"{name}: {e}")
FAIL += 1
except Exception as e:
print(f"{name}: EXCEPTION {e}")
FAIL += 1
# ── step 1: extract conversation turns from SQLite ──────────────────
def extract_turns():
"""Extract (user_msg, assistant_msg) pairs from the database."""
conn = sqlite3.connect(DB_PATH)
rows = conn.execute(
"SELECT role, content FROM messages ORDER BY id"
).fetchall()
conn.close()
turns = []
i = 0
while i < len(rows) - 1:
role, content = rows[i]
# skip non-user messages, agent outputs, very short messages
if role != "user" or len(content) < 5 or content.startswith("[Agent ") or content.startswith("[用户上传") or content.startswith("[语音消息]"):
i += 1
continue
# find the next assistant reply
j = i + 1
while j < len(rows) and rows[j][0] != "assistant":
j += 1
if j < len(rows):
assistant_content = rows[j][1]
if len(assistant_content) > 10 and "<pad>" not in assistant_content:
turns.append((content, assistant_content))
i = j + 1
return turns
# ── step 2: ingest all turns ───────────────────────────────────────
def ingest_turns(turns):
"""Ingest conversation turns via /ingest endpoint."""
total_stored = 0
for user_msg, assistant_msg in turns:
r = requests.post(f"{BASE}/ingest", json={
"user_msg": user_msg,
"assistant_msg": assistant_msg,
})
if r.status_code == 200:
total_stored += r.json().get("stored", 0)
return total_stored
# ── step 3: also store some key facts directly ─────────────────────
def store_key_facts():
"""Store critical facts that heuristic extraction might miss."""
facts = [
{"cue": "bot的名字叫什么", "target": "bot的名字叫小乖是Fam给取的", "importance": 0.9},
{"cue": "有哪些工具可以用", "target": "工具有: fam_todo(飞书待办), send_file(发文件), spawn_agent/agent_status/kill_agent(子代理管理), run_shell, run_python, update_memory, update_inner_state, gen_voice", "importance": 0.8},
{"cue": "vLLM在5090上的性能", "target": "RTX 5090上vLLM跑gemma模型只有4.8 tok/s需要切换到awq_marlin量化来提升速度", "importance": 0.8},
{"cue": "repo-vis项目是什么", "target": "repo-vis是一个用Rust后端+Three.js前端的3D代码库可视化工具目标支持Linux内核级别的大型仓库和Pico VR", "importance": 0.8},
{"cue": "repo-vis的性能瓶颈", "target": "Linux内核79K文件量级下SQLite 1GB上限和O(n)全量反序列化是瓶颈需要n-ary tree按需合并优化", "importance": 0.9},
{"cue": "明天的待办事项", "target": "最紧迫的是emblem scanner的AI Chat和KB部分最高优先级然后是曲面二维码识读优化信息收集", "importance": 0.7},
{"cue": "后端切换到了什么", "target": "NOC后端从原来的方案切换到了vLLM速度变快了", "importance": 0.7},
{"cue": "home目录下有多少log文件", "target": "home目录及子目录下共有960个.log文件", "importance": 0.5},
]
stored = 0
for f in facts:
r = requests.post(f"{BASE}/store", json=f)
if r.status_code == 200:
stored += 1
return stored
# ── step 4: recall tests with realistic queries ────────────────────
def test_recall_bot_name():
r = requests.post(f"{BASE}/recall", json={"text": "你叫什么名字"})
data = r.json()
assert data["count"] > 0, "should recall something"
assert "小乖" in data["memories"], f"should mention 小乖, got: {data['memories'][:200]}"
def test_recall_tools():
r = requests.post(f"{BASE}/recall", json={"text": "有什么工具可以用"})
data = r.json()
assert data["count"] > 0
m = data["memories"].lower()
assert "tool" in m or "工具" in m or "spawn" in m or "fam_todo" in m, f"should mention tools, got: {data['memories'][:200]}"
def test_recall_vllm():
r = requests.post(f"{BASE}/recall", json={"text": "vllm性能怎么样"})
data = r.json()
assert data["count"] > 0
assert "4.8" in data["memories"] or "5090" in data["memories"] or "tok" in data["memories"], \
f"should mention vLLM stats, got: {data['memories'][:200]}"
def test_recall_repovis():
r = requests.post(f"{BASE}/recall", json={"text": "repo-vis项目"})
data = r.json()
assert data["count"] > 0
m = data["memories"]
assert "Rust" in m or "Three" in m or "3D" in m or "可视化" in m, \
f"should mention repo-vis tech, got: {m[:200]}"
def test_recall_performance_bottleneck():
r = requests.post(f"{BASE}/recall", json={"text": "Linux内核代码仓库跑不动"})
data = r.json()
assert data["count"] > 0
m = data["memories"]
assert "SQLite" in m or "79K" in m or "瓶颈" in m or "n-ary" in m or "内核" in m, \
f"should mention bottleneck, got: {m[:200]}"
def test_recall_todo():
r = requests.post(f"{BASE}/recall", json={"text": "待办事项有哪些"})
data = r.json()
assert data["count"] > 0
m = data["memories"]
assert "emblem" in m.lower() or "todo" in m.lower() or "待办" in m or "scanner" in m.lower(), \
f"should mention todos, got: {m[:200]}"
def test_recall_vr():
r = requests.post(f"{BASE}/recall", json={"text": "VR支持"})
data = r.json()
assert data["count"] > 0
m = data["memories"]
assert "Pico" in m or "VR" in m or "repo-vis" in m.lower(), \
f"should mention VR, got: {m[:200]}"
def test_recall_chinese_natural():
"""Test with natural Chinese conversational query."""
r = requests.post(f"{BASE}/recall", json={"text": "之前聊过什么技术话题"})
data = r.json()
assert data["count"] > 0, "should recall some technical topics"
def test_recall_cross_topic():
"""Query that spans multiple memories — should return diverse results."""
r = requests.post(f"{BASE}/recall", json={
"text": "项目进度和优化",
"top_k": 5,
})
data = r.json()
assert data["count"] >= 2, f"should recall multiple memories, got {data['count']}"
def test_recall_log_files():
r = requests.post(f"{BASE}/recall", json={"text": "日志文件有多少"})
data = r.json()
assert data["count"] > 0
assert "960" in data["memories"] or "log" in data["memories"].lower(), \
f"should mention log files, got: {data['memories'][:200]}"
# ── step 5: multi-hop chain test ──────────────────────────────────
def test_multihop_chain():
"""Test if Hebbian chaining connects related memories.
repo-vis → performance bottleneck → n-ary tree optimization
"""
r = requests.post(f"{BASE}/recall", json={
"text": "repo-vis",
"top_k": 3,
"hops": 3,
})
data = r.json()
assert data["count"] > 0
# print chain for inspection
print(f" chain: {data['memories'][:300]}")
# ── step 6: latency with real data ─────────────────────────────────
def test_latency_with_data():
"""Recall latency after loading real data."""
times = []
for q in ["工具", "vllm", "项目", "待办", "性能"]:
r = requests.post(f"{BASE}/recall", json={"text": q})
times.append(r.json()["latency_ms"])
avg = sum(times) / len(times)
print(f" avg latency: {avg:.1f}ms (max: {max(times):.1f}ms)")
assert avg < 50, f"average latency {avg:.1f}ms too high"
# ── main ────────────────────────────────────────────────────────────
def main():
global PASS, FAIL
print("nocmem real-data test")
print(f"server: {BASE}")
print(f"database: {DB_PATH}\n")
# check server
try:
requests.get(f"{BASE}/stats", timeout=3).raise_for_status()
except Exception:
print("ERROR: server not reachable")
sys.exit(1)
# extract
print("── extract ──")
turns = extract_turns()
print(f" extracted {len(turns)} conversation turns")
# ingest
print("\n── ingest (heuristic, no LLM) ──")
t0 = time.monotonic()
ingested = ingest_turns(turns)
elapsed = time.monotonic() - t0
print(f" ingested {ingested} memories from {len(turns)} turns ({elapsed:.1f}s)")
# store key facts
print("\n── store key facts ──")
stored = store_key_facts()
print(f" stored {stored} key facts")
# stats
r = requests.get(f"{BASE}/stats")
stats = r.json()
print(f"\n── memory stats ──")
print(f" memories: {stats['num_memories']}")
print(f" cue entries: {stats['num_cue_entries']} (aug ratio: {stats['augmentation_ratio']:.1f}x)")
print(f" W norm: {stats['w_norm']:.1f}")
# recall tests
print(f"\n── recall accuracy (natural language queries) ──")
test("bot的名字", test_recall_bot_name)
test("可用工具", test_recall_tools)
test("vLLM性能", test_recall_vllm)
test("repo-vis项目", test_recall_repovis)
test("性能瓶颈", test_recall_performance_bottleneck)
test("待办事项", test_recall_todo)
test("VR支持", test_recall_vr)
test("log文件数量", test_recall_log_files)
test("自然中文查询", test_recall_chinese_natural)
test("跨主题召回", test_recall_cross_topic)
print(f"\n── multi-hop chain ──")
test("repo-vis联想链", test_multihop_chain)
print(f"\n── latency ──")
test("平均延迟 < 50ms", test_latency_with_data)
print(f"\n{'='*50}")
total = PASS + FAIL
print(f"PASS: {PASS}/{total} FAIL: {FAIL}/{total}")
if FAIL:
sys.exit(1)
else:
print("All tests passed!")
if __name__ == "__main__":
main()

1796
mem/uv.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -13,6 +13,13 @@ pub struct Config {
pub whisper_url: Option<String>,
#[serde(default)]
pub gitea: Option<GiteaConfig>,
#[serde(default)]
pub nocmem: Option<NocmemConfig>,
}
#[derive(Deserialize, Clone)]
pub struct NocmemConfig {
pub endpoint: String,
}
#[derive(Deserialize, Clone)]

View File

@@ -109,22 +109,42 @@ async fn api_chat(
let inner_state = state.app_state.get_inner_state().await;
let system = build_system_prompt("", &persona, &memory_slots, &inner_state);
let messages = vec![
let mut messages = vec![
system,
serde_json::json!({"role": "user", "content": message}),
];
// auto recall from nocmem
if let Some(ref nocmem) = state.config.nocmem {
let recalled = crate::nocmem::recall(&nocmem.endpoint, &message).await;
if !recalled.is_empty() {
messages.push(serde_json::json!({"role": "system", "content": recalled}));
}
}
let sid = format!("api-{}", chrono::Local::now().timestamp());
let mut output = BufferOutput::new();
info!("api chat: {}", &message[..message.len().min(100)]);
match run_openai_with_tools(
endpoint, model, api_key, messages, &mut output, &state.app_state, &sid, &state.config, 0,
endpoint, model, api_key, messages.clone(), &mut output, &state.app_state, &sid, &state.config, 0,
)
.await
{
Ok(response) => (StatusCode::OK, Json(serde_json::json!({"response": response}))),
Ok(response) => {
// async ingest
if let Some(ref nocmem) = state.config.nocmem {
if !response.is_empty() {
crate::nocmem::ingest_spawn(
nocmem.endpoint.clone(),
message.clone(),
response.clone(),
);
}
}
(StatusCode::OK, Json(serde_json::json!({"response": response})))
}
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("{e:#}")}))),
}
}

View File

@@ -86,6 +86,14 @@ pub async fn life_loop(
// append the agent completion as a new user message
messages.push(serde_json::json!({"role": "user", "content": notification}));
// auto recall from nocmem
if let Some(ref nocmem) = config.nocmem {
let recalled = crate::nocmem::recall(&nocmem.endpoint, &notification).await;
if !recalled.is_empty() {
messages.push(serde_json::json!({"role": "system", "content": recalled}));
}
}
if let BackendConfig::OpenAI { ref endpoint, ref model, ref api_key } = config.backend {
let chat_id_tg = ChatId(cid);
let sid = format!("agent-{id}");

View File

@@ -3,6 +3,7 @@ mod display;
mod gitea;
mod http;
mod life;
mod nocmem;
mod output;
mod state;
mod stream;
@@ -388,6 +389,14 @@ async fn handle_inner(
let user_content = build_user_content(&prompt, &scratch, &uploaded);
api_messages.push(serde_json::json!({"role": "user", "content": user_content}));
// auto recall from nocmem
if let Some(ref nocmem) = config.nocmem {
let recalled = nocmem::recall(&nocmem.endpoint, &prompt).await;
if !recalled.is_empty() {
api_messages.push(serde_json::json!({"role": "system", "content": recalled}));
}
}
let mut tg_output = TelegramOutput::new(bot.clone(), chat_id, is_private);
match run_openai_with_tools(
@@ -399,6 +408,15 @@ async fn handle_inner(
state.push_message(&sid, "user", &prompt).await;
if !response.is_empty() {
state.push_message(&sid, "assistant", &response).await;
// async ingest to nocmem (fire-and-forget)
if let Some(ref nocmem) = config.nocmem {
nocmem::ingest_spawn(
nocmem.endpoint.clone(),
prompt.clone(),
response.clone(),
);
}
}
// sliding window

69
src/nocmem.rs Normal file
View File

@@ -0,0 +1,69 @@
//! nocmem client — auto-recall and async ingest via HTTP.
use tracing::{info, warn};
/// Recall relevant memories for the given text.
/// Returns formatted memory string, or empty if none found / error / not configured.
pub async fn recall(endpoint: &str, text: &str) -> String {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_millis(500))
.build()
.unwrap();
let url = format!("{}/recall", endpoint.trim_end_matches('/'));
match client
.post(&url)
.json(&serde_json::json!({"text": text, "top_k": 3, "hops": 2}))
.send()
.await
{
Ok(resp) => {
if let Ok(json) = resp.json::<serde_json::Value>().await {
let count = json["count"].as_i64().unwrap_or(0);
let memories = json["memories"].as_str().unwrap_or("");
if count > 0 && !memories.is_empty() {
let latency = json["latency_ms"].as_f64().unwrap_or(0.0);
info!("nocmem recall: {count} memories, {latency:.1}ms");
return memories.to_string();
}
}
}
Err(e) => {
warn!("nocmem recall failed: {e:#}");
}
}
String::new()
}
/// Fire-and-forget ingest of a conversation turn.
pub fn ingest_spawn(endpoint: String, user_msg: String, assistant_msg: String) {
tokio::spawn(async move {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(120))
.build()
.unwrap();
let url = format!("{}/ingest", endpoint.trim_end_matches('/'));
match client
.post(&url)
.json(&serde_json::json!({
"user_msg": user_msg,
"assistant_msg": assistant_msg,
}))
.send()
.await
{
Ok(resp) => {
if let Ok(json) = resp.json::<serde_json::Value>().await {
let stored = json["stored"].as_i64().unwrap_or(0);
if stored > 0 {
info!("nocmem ingest: stored {stored} memories");
}
}
}
Err(e) => {
warn!("nocmem ingest failed: {e:#}");
}
}
});
}