add http API, channel-driven life loop, predefined diary timer

- Extract http.rs: unified HTTP server with /api/timers and gitea webhook
- Life loop: select! on interval tick + mpsc channel for force-fire
- Predefined diary timer (cron 22:55 daily), auto-registered on startup
- BufferOutput for system timers (chat_id=0), no TG message
- state: ensure_timer(), get_timer()
- context.md: add blog and Hugo docs for AI
This commit is contained in:
Fam Zheng
2026-04-10 22:58:39 +01:00
parent 9d2d2af33f
commit c2be8e6930
6 changed files with 311 additions and 116 deletions

View File

@@ -7,9 +7,10 @@
- **LLM**: vLLM on ailab (100.84.7.49:8000), gemma-4-31B-it-AWQ
- **Claude Code**: ~/.local/bin/claude (子代<E5AD90><E4BBA3>执行引擎)
- **uv**: ~/.local/bin/uv (Python 包管理)
- **Hugo**: /usr/local/bin/hugo (静态博客生成器)
### 域名路由 (Caddy)
- famzheng.me — 主站(占位)
- famzheng.me → Hugo 博客 (/data/www/blog/public/)
- git.famzheng.me → Gitea (localhost:3000)
- 新增子域名:编辑 /etc/caddy/Caddyfile然后 `sudo systemctl reload caddy`
@@ -31,6 +32,30 @@ api.famzheng.me {
修改后执行 `sudo systemctl reload caddy` 生效。
Caddy 自动申请和续期 Let's Encrypt 证书,无需手动管理。
### 博客
Fam 的博客:
- 站点: https://famzheng.me, 源码: /data/www/blog/
- Repo: https://git.famzheng.me/fam/blog
- 这是 Fam 的个人博客,不要在上面写东西
你的博客 (AI 日记/随想):
- 站点: https://noc.famzheng.me, 源码: /data/www/noc-blog/
- Repo: https://git.famzheng.me/noc/diary
- 这是你自己的空间,可以自由写日记、随想、技术笔记
- 写新文章: 在 content/posts/ 下创建 .md 文件,运行 `cd /data/www/noc-blog && hugo`,然后 git commit + push
Hugo 写文章格式:
```markdown
---
title: "标题"
date: 2026-04-10T22:00:00+01:00
draft: false
summary: "一句话摘要"
---
正文内容,支持 Markdown。
```
### Gitea
- URL: https://git.famzheng.me
- Admin: noc (token 在 /data/noc/gitea-token)

View File

@@ -151,27 +151,13 @@ pub struct WebhookState {
pub bot_user: String,
}
pub async fn start_webhook_server(config: &GiteaConfig, bot_user: String) {
pub fn webhook_router(config: &GiteaConfig, bot_user: String) -> axum::Router<()> {
let gitea = GiteaClient::new(config);
let state = Arc::new(WebhookState {
gitea,
bot_user,
});
let state = Arc::new(WebhookState { gitea, bot_user });
let app = axum::Router::new()
axum::Router::new()
.route("/webhook/gitea", post(handle_webhook))
.with_state(state);
let addr = format!("0.0.0.0:{}", config.webhook_port);
info!("gitea webhook server listening on {addr}");
let listener = tokio::net::TcpListener::bind(&addr)
.await
.unwrap_or_else(|e| panic!("bind {addr}: {e}"));
if let Err(e) = axum::serve(listener, app).await {
error!("webhook server error: {e}");
}
.with_state(state)
}
async fn handle_webhook(

98
src/http.rs Normal file
View File

@@ -0,0 +1,98 @@
use std::sync::Arc;
use axum::extract::{Path, State as AxumState};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{get, post};
use axum::Json;
use tokio::sync::mpsc;
use tracing::{error, info};
use crate::config::Config;
use crate::life::LifeEvent;
use crate::state::AppState;
#[derive(Clone)]
pub struct HttpState {
pub app_state: Arc<AppState>,
pub life_tx: mpsc::Sender<LifeEvent>,
}
pub async fn start_http_server(
config: &Config,
app_state: Arc<AppState>,
life_tx: mpsc::Sender<LifeEvent>,
) {
let port = config
.gitea
.as_ref()
.map(|g| g.webhook_port)
.unwrap_or(9880);
let state = Arc::new(HttpState {
app_state,
life_tx,
});
let mut app = axum::Router::new()
.route("/api/timers", get(list_timers))
.route("/api/timers/{id}/fire", post(fire_timer))
.with_state(state);
// merge gitea webhook router if configured
if let Some(gitea_config) = &config.gitea {
let bot_user = std::env::var("GITEA_ADMIN_USER").unwrap_or_else(|_| "noc".into());
app = app.merge(crate::gitea::webhook_router(gitea_config, bot_user));
}
let addr = format!("0.0.0.0:{port}");
info!("http server listening on {addr}");
let listener = tokio::net::TcpListener::bind(&addr)
.await
.unwrap_or_else(|e| panic!("bind {addr}: {e}"));
if let Err(e) = axum::serve(listener, app).await {
error!("http server error: {e}");
}
}
async fn list_timers(AxumState(state): AxumState<Arc<HttpState>>) -> impl IntoResponse {
let timers = state.app_state.list_timers(None).await;
let items: Vec<serde_json::Value> = timers
.iter()
.map(|(id, chat_id, label, schedule, next_fire, enabled)| {
serde_json::json!({
"id": id,
"chat_id": chat_id,
"label": label,
"schedule": schedule,
"next_fire": next_fire,
"enabled": enabled,
})
})
.collect();
Json(serde_json::json!(items))
}
async fn fire_timer(
AxumState(state): AxumState<Arc<HttpState>>,
Path(id): Path<i64>,
) -> impl IntoResponse {
match state.life_tx.send(LifeEvent::FireTimer(id)).await {
Ok(_) => {
info!(timer_id = id, "timer fire requested via API");
(
StatusCode::OK,
Json(serde_json::json!({"status": "fired", "timer_id": id})),
)
}
Err(e) => {
error!(timer_id = id, "failed to send fire event: {e}");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": "life loop not responding"})),
)
}
}
}

View File

@@ -1,30 +1,75 @@
use std::sync::Arc;
use teloxide::prelude::*;
use tokio::sync::mpsc;
use tracing::{error, info, warn};
use crate::config::{BackendConfig, Config};
use crate::output::TelegramOutput;
use crate::output::{BufferOutput, TelegramOutput};
use crate::state::AppState;
use crate::stream::run_openai_with_tools;
use crate::tools::compute_next_cron_fire;
const LIFE_LOOP_TIMEOUT_SECS: u64 = 120;
pub async fn life_loop(bot: Bot, state: Arc<AppState>, config: Arc<Config>) {
const DIARY_LABEL: &str = "写日记:回顾今天的对话和事件,在 /data/www/noc-blog/content/posts/ 下创建一篇日记(文件名格式 YYYY-MM-DD.md用 run_shell 写入内容,然后执行 cd /data/www/noc-blog && hugo && git add -A && git commit -m 'diary: DATE' && git push";
const DIARY_SCHEDULE: &str = "cron:0 55 22 * * *";
/// Events that can wake up the life loop.
pub enum LifeEvent {
/// Force-fire a specific timer by ID.
FireTimer(i64),
}
pub async fn life_loop(
bot: Bot,
state: Arc<AppState>,
config: Arc<Config>,
mut rx: mpsc::Receiver<LifeEvent>,
) {
info!("life loop started");
// pre-defined timers — ensure they exist on every startup
if state.ensure_timer(0, DIARY_LABEL, DIARY_SCHEDULE).await {
info!("registered predefined diary timer");
}
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
interval.tick().await;
tokio::select! {
_ = interval.tick() => {
let due = state.due_timers().await;
if due.is_empty() {
continue;
}
for (timer_id, chat_id_raw, label, schedule) in &due {
let chat_id = ChatId(*chat_id_raw);
run_timer(&bot, &state, &config, *timer_id, *chat_id_raw, label, schedule).await;
}
}
Some(event) = rx.recv() => {
match event {
LifeEvent::FireTimer(id) => {
info!(timer_id = id, "timer force-fired via channel");
if let Some((timer_id, chat_id_raw, label, schedule)) = state.get_timer(id).await {
run_timer(&bot, &state, &config, timer_id, chat_id_raw, &label, &schedule).await;
} else {
warn!(timer_id = id, "force-fire: timer not found");
}
}
}
}
}
}
}
async fn run_timer(
bot: &Bot,
state: &Arc<AppState>,
config: &Arc<Config>,
timer_id: i64,
chat_id_raw: i64,
label: &str,
schedule: &str,
) {
let chat_id = ChatId(chat_id_raw);
info!(timer_id, %label, "timer fired");
let persona = state.get_config("persona").await.unwrap_or_default();
@@ -42,7 +87,6 @@ pub async fn life_loop(bot: Bot, state: Arc<AppState>, config: Arc<Config>) {
if inner.is_empty() { "(空)" } else { &inner }
));
// emotional motivation — why she acts
system_text.push_str(
"\n\n你不是因为 timer 到了才说话。你是因为在乎 Fam所以想知道他怎么样。\
如果你觉得现在不该打扰他(太晚了、他今天很累、刚聊过),就什么都不说,回复空文本。\
@@ -63,13 +107,21 @@ pub async fn life_loop(bot: Bot, state: Arc<AppState>, config: Arc<Config>) {
} = config.backend
{
let sid = format!("life-{chat_id_raw}");
let mut tg_output = TelegramOutput::new(bot.clone(), chat_id, true);
let mut tg_output;
let mut buf_output;
let output: &mut dyn crate::output::Output = if chat_id_raw == 0 {
buf_output = BufferOutput::new();
&mut buf_output
} else {
tg_output = TelegramOutput::new(bot.clone(), chat_id, true);
&mut tg_output
};
let result = tokio::time::timeout(
std::time::Duration::from_secs(LIFE_LOOP_TIMEOUT_SECS),
run_openai_with_tools(
endpoint, model, api_key, messages, &mut tg_output, &state, &sid,
&config, *chat_id_raw,
endpoint, model, api_key, messages, output, state, &sid,
config, chat_id_raw,
),
)
.await;
@@ -100,15 +152,13 @@ pub async fn life_loop(bot: Bot, state: Arc<AppState>, config: Arc<Config>) {
// reschedule or delete
if schedule.starts_with("cron:") {
if let Some(next) = compute_next_cron_fire(schedule) {
state.update_timer_next_fire(*timer_id, &next).await;
state.update_timer_next_fire(timer_id, &next).await;
info!(timer_id, next = %next, "cron rescheduled");
} else {
state.cancel_timer(*timer_id).await;
state.cancel_timer(timer_id).await;
}
} else {
state.cancel_timer(*timer_id).await;
}
}
state.cancel_timer(timer_id).await;
}
}

View File

@@ -1,6 +1,7 @@
mod config;
mod display;
mod gitea;
mod http;
mod life;
mod output;
mod state;
@@ -92,16 +93,18 @@ async fn main() {
let config = Arc::new(config);
// start life loop
tokio::spawn(life::life_loop(bot.clone(), state.clone(), config.clone()));
// channel: http server → life loop
let (life_tx, life_rx) = tokio::sync::mpsc::channel(16);
// start gitea webhook server
if let Some(gitea_config) = &config.gitea {
let gc = gitea_config.clone();
// Use the gitea admin username as the bot user for @mention detection
let bot_user = std::env::var("GITEA_ADMIN_USER").unwrap_or_else(|_| "noc".into());
// start life loop
tokio::spawn(life::life_loop(bot.clone(), state.clone(), config.clone(), life_rx));
// start http server (API + gitea webhook)
{
let srv_config = config.clone();
let srv_state = state.clone();
tokio::spawn(async move {
gitea::start_webhook_server(&gc, bot_user).await;
http::start_http_server(&srv_config, srv_state, life_tx).await;
});
}

View File

@@ -275,6 +275,29 @@ impl AppState {
);
}
/// Ensure a timer with the given label exists. If it already exists, do nothing.
/// Returns true if a new timer was created.
pub async fn ensure_timer(&self, chat_id: i64, label: &str, schedule: &str) -> bool {
let db = self.db.lock().await;
let exists: bool = db
.query_row(
"SELECT COUNT(*) > 0 FROM timers WHERE label = ?1 AND enabled = 1",
rusqlite::params![label],
|row| row.get(0),
)
.unwrap_or(false);
if exists {
return false;
}
drop(db);
if let Some(next) = crate::tools::compute_next_cron_fire(schedule) {
self.add_timer(chat_id, label, schedule, &next).await;
true
} else {
false
}
}
pub async fn add_timer(&self, chat_id: i64, label: &str, schedule: &str, next_fire: &str) -> i64 {
let db = self.db.lock().await;
db.execute(
@@ -285,6 +308,16 @@ impl AppState {
db.last_insert_rowid()
}
pub async fn get_timer(&self, id: i64) -> Option<(i64, i64, String, String)> {
let db = self.db.lock().await;
db.query_row(
"SELECT id, chat_id, label, schedule FROM timers WHERE id = ?1 AND enabled = 1",
rusqlite::params![id],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
)
.ok()
}
pub async fn list_timers(&self, chat_id: Option<i64>) -> Vec<(i64, i64, String, String, String, bool)> {
let db = self.db.lock().await;
let (sql, params): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) = match chat_id {