Files
tori/src/db.rs
Fam Zheng 28a00dd2f3 feat: configurable OAuth (Google + TikTok SSO), project membership, inline file preview
- Auth: configurable OAuthProvider enum supporting Google OAuth and TikTok SSO
- Auth: /auth/provider endpoint for frontend to detect active provider
- Auth: user role system (admin via ADMIN_USERS env var sees all projects)
- Projects: project_members many-to-many table with role (owner/member)
- Projects: membership-based access control, auto-add creator as owner
- Projects: member management API (list/add/remove)
- Files: remove Content-Disposition attachment header, let browser decide
- Health: public /tori/api/health endpoint for k8s probes
2026-03-17 03:42:38 +00:00

408 lines
13 KiB
Rust

use serde::{Deserialize, Serialize};
use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
/// Handle to the application's SQLite database.
///
/// Built by `Database::new`; the schema is created and upgraded by
/// `Database::migrate`.
#[derive(Clone)]
pub struct Database {
/// Underlying sqlx SQLite connection pool.
pub pool: SqlitePool,
}
impl Database {
pub async fn new(path: &str) -> anyhow::Result<Self> {
let url = format!("sqlite:{}?mode=rwc", path);
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect(&url)
.await?;
Ok(Self { pool })
}
pub async fn migrate(&self) -> anyhow::Result<()> {
sqlx::query(
"CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
)"
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS workflows (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(id),
requirement TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)"
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS comments (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL REFERENCES workflows(id),
content TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)"
)
.execute(&self.pool)
.await?;
// Migration: add report column to workflows
let _ = sqlx::query(
"ALTER TABLE workflows ADD COLUMN report TEXT NOT NULL DEFAULT ''"
)
.execute(&self.pool)
.await;
// Migration: add template_id column to workflows
let _ = sqlx::query(
"ALTER TABLE workflows ADD COLUMN template_id TEXT NOT NULL DEFAULT ''"
)
.execute(&self.pool)
.await;
// Migration: add deleted column to projects
let _ = sqlx::query(
"ALTER TABLE projects ADD COLUMN deleted INTEGER NOT NULL DEFAULT 0"
)
.execute(&self.pool)
.await;
// Migration: add owner_id column to projects
let _ = sqlx::query(
"ALTER TABLE projects ADD COLUMN owner_id TEXT NOT NULL DEFAULT ''"
)
.execute(&self.pool)
.await;
// KB tables
sqlx::query(
"CREATE TABLE IF NOT EXISTS kb_articles (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
content TEXT NOT NULL DEFAULT '',
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
)"
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS kb_chunks (
id TEXT PRIMARY KEY,
article_id TEXT NOT NULL,
title TEXT NOT NULL DEFAULT '',
content TEXT NOT NULL,
embedding BLOB NOT NULL
)"
)
.execute(&self.pool)
.await?;
// Migration: add article_id to kb_chunks if missing
let _ = sqlx::query(
"ALTER TABLE kb_chunks ADD COLUMN article_id TEXT NOT NULL DEFAULT ''"
)
.execute(&self.pool)
.await;
// Migrate old kb_content to kb_articles
let has_old_table: bool = sqlx::query_scalar(
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='kb_content'"
)
.fetch_one(&self.pool)
.await
.unwrap_or(false);
if has_old_table {
let old_content: Option<String> = sqlx::query_scalar(
"SELECT content FROM kb_content WHERE id = 1"
)
.fetch_optional(&self.pool)
.await
.unwrap_or(None);
if let Some(content) = old_content {
if !content.is_empty() {
let id = uuid::Uuid::new_v4().to_string();
let _ = sqlx::query(
"INSERT OR IGNORE INTO kb_articles (id, title, content) VALUES (?, '导入的知识库', ?)"
)
.bind(&id)
.bind(&content)
.execute(&self.pool)
.await;
}
}
let _ = sqlx::query("DROP TABLE kb_content")
.execute(&self.pool)
.await;
}
// New tables: agent_state_snapshots + execution_log
sqlx::query(
"CREATE TABLE IF NOT EXISTS agent_state_snapshots (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL REFERENCES workflows(id),
step_order INTEGER NOT NULL,
state_json TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)"
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS execution_log (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL REFERENCES workflows(id),
step_order INTEGER NOT NULL,
tool_name TEXT NOT NULL,
tool_input TEXT NOT NULL DEFAULT '',
output TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'running',
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)"
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS llm_call_log (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL REFERENCES workflows(id),
step_order INTEGER NOT NULL,
phase TEXT NOT NULL,
messages_count INTEGER NOT NULL,
tools_count INTEGER NOT NULL,
tool_calls TEXT NOT NULL DEFAULT '[]',
text_response TEXT NOT NULL DEFAULT '',
prompt_tokens INTEGER,
completion_tokens INTEGER,
latency_ms INTEGER NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)"
)
.execute(&self.pool)
.await?;
// Migration: add text_response column to llm_call_log
let _ = sqlx::query(
"ALTER TABLE llm_call_log ADD COLUMN text_response TEXT NOT NULL DEFAULT ''"
)
.execute(&self.pool)
.await;
sqlx::query(
"CREATE TABLE IF NOT EXISTS timers (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(id),
name TEXT NOT NULL,
interval_secs INTEGER NOT NULL,
requirement TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
last_run_at TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)"
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)"
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
email TEXT NOT NULL UNIQUE,
name TEXT NOT NULL DEFAULT '',
picture TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (datetime('now')),
last_login_at TEXT NOT NULL DEFAULT (datetime('now'))
)"
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS step_artifacts (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL,
step_order INTEGER NOT NULL,
name TEXT NOT NULL,
path TEXT NOT NULL,
artifact_type TEXT NOT NULL DEFAULT 'file',
description TEXT NOT NULL DEFAULT ''
)"
)
.execute(&self.pool)
.await?;
// Migration: add role column to users (admin = see all projects)
let _ = sqlx::query(
"ALTER TABLE users ADD COLUMN role TEXT NOT NULL DEFAULT 'user'"
)
.execute(&self.pool)
.await;
sqlx::query(
"CREATE TABLE IF NOT EXISTS project_members (
project_id TEXT NOT NULL REFERENCES projects(id),
user_id TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'owner',
created_at TEXT NOT NULL DEFAULT (datetime('now')),
PRIMARY KEY (project_id, user_id)
)"
)
.execute(&self.pool)
.await?;
// Migration: assign all existing memberless projects to the first user (or leave for manual assignment)
// When auth is not configured, owner_id is empty — these projects are visible to everyone
// When a user logs in and creates projects, they get auto-added as admin
{
// Find existing projects with owner_id set but no members yet
let owned: Vec<(String, String)> = sqlx::query_as(
"SELECT p.id, p.owner_id FROM projects p \
WHERE p.deleted = 0 AND p.owner_id != '' \
AND NOT EXISTS (SELECT 1 FROM project_members pm WHERE pm.project_id = p.id)"
)
.fetch_all(&self.pool)
.await
.unwrap_or_default();
for (pid, uid) in owned {
let _ = sqlx::query(
"INSERT OR IGNORE INTO project_members (project_id, user_id, role) VALUES (?, ?, 'owner')"
)
.bind(&pid)
.bind(&uid)
.execute(&self.pool)
.await;
}
// For orphan projects (no owner, no members), assign to first user if one exists
let first_user: Option<(String,)> = sqlx::query_as(
"SELECT id FROM users ORDER BY created_at ASC LIMIT 1"
)
.fetch_optional(&self.pool)
.await
.unwrap_or(None);
if let Some((first_uid,)) = first_user {
let orphans: Vec<(String,)> = sqlx::query_as(
"SELECT p.id FROM projects p \
WHERE p.deleted = 0 \
AND NOT EXISTS (SELECT 1 FROM project_members pm WHERE pm.project_id = p.id)"
)
.fetch_all(&self.pool)
.await
.unwrap_or_default();
for (pid,) in orphans {
let _ = sqlx::query(
"INSERT OR IGNORE INTO project_members (project_id, user_id, role) VALUES (?, ?, 'owner')"
)
.bind(&pid)
.bind(&first_uid)
.execute(&self.pool)
.await;
}
}
}
Ok(())
}
}
/// One row of the `projects` table.
///
/// Field names match the column names so the struct can be loaded through
/// `sqlx::FromRow` and (de)serialized with serde.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Project {
/// Primary key (`TEXT`).
pub id: String,
/// Project name (`NOT NULL`).
pub name: String,
/// Free-form description; the column defaults to the empty string.
pub description: String,
/// Creation timestamp stored as text; the column defaults to `datetime('now')`.
pub created_at: String,
/// Last-update timestamp stored as text; the column defaults to `datetime('now')`.
pub updated_at: String,
/// Deletion flag, stored as `INTEGER` with column default 0.
/// Defaults to `false` when absent from deserialized input.
#[serde(default)]
pub deleted: bool,
/// Owning user's id; the column defaults to the empty string.
/// Defaults to an empty string when absent from deserialized input.
#[serde(default)]
pub owner_id: String,
}
/// One row of the `workflows` table.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Workflow {
/// Primary key (`TEXT`).
pub id: String,
/// Id of the project this workflow belongs to (references `projects.id`).
pub project_id: String,
/// Requirement text the workflow was created with (`NOT NULL`).
pub requirement: String,
/// Status string; the column defaults to `'pending'`.
pub status: String,
/// Creation timestamp stored as text; the column defaults to `datetime('now')`.
pub created_at: String,
/// Report text; the column defaults to the empty string.
pub report: String,
/// Template identifier; the column defaults to the empty string.
pub template_id: String,
}
/// One row of the `execution_log` table.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct ExecutionLogEntry {
/// Primary key (`TEXT`).
pub id: String,
/// Id of the workflow this entry belongs to (references `workflows.id`).
pub workflow_id: String,
/// Step number within the workflow (`INTEGER NOT NULL`).
pub step_order: i32,
/// Name of the tool recorded for this entry.
pub tool_name: String,
/// Tool input text; the column defaults to the empty string.
pub tool_input: String,
/// Tool output text; the column defaults to the empty string.
pub output: String,
/// Status string; the column defaults to `'running'`.
pub status: String,
/// Creation timestamp stored as text; the column defaults to `datetime('now')`.
pub created_at: String,
}
/// One row of the `comments` table.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Comment {
/// Primary key (`TEXT`).
pub id: String,
/// Id of the workflow the comment is attached to (references `workflows.id`).
pub workflow_id: String,
/// Comment text (`NOT NULL`).
pub content: String,
/// Creation timestamp stored as text; the column defaults to `datetime('now')`.
pub created_at: String,
}
/// One row of the `kb_articles` (knowledge-base article) table.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct KbArticle {
/// Primary key (`TEXT`).
pub id: String,
/// Article title (`NOT NULL`).
pub title: String,
/// Article body; the column defaults to the empty string.
pub content: String,
/// Last-update timestamp stored as text; the column defaults to `datetime('now')`.
pub updated_at: String,
}
/// One row of the `timers` table.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Timer {
/// Primary key (`TEXT`).
pub id: String,
/// Id of the project the timer belongs to (references `projects.id`).
pub project_id: String,
/// Timer name (`NOT NULL`).
pub name: String,
/// Interval between runs; presumably seconds, going by the column name.
pub interval_secs: i64,
/// Requirement text stored with the timer (`NOT NULL`).
pub requirement: String,
/// Enabled flag, stored as `INTEGER` with column default 1.
pub enabled: bool,
/// Last-run timestamp stored as text; the column defaults to the empty string.
pub last_run_at: String,
/// Creation timestamp stored as text; the column defaults to `datetime('now')`.
pub created_at: String,
}
/// One row of the `llm_call_log` table.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct LlmCallLogEntry {
/// Primary key (`TEXT`).
pub id: String,
/// Id of the workflow this call belongs to (references `workflows.id`).
pub workflow_id: String,
/// Step number within the workflow (`INTEGER NOT NULL`).
pub step_order: i32,
/// Phase label recorded for the call (`NOT NULL`).
pub phase: String,
/// Value of the `messages_count` column (`INTEGER NOT NULL`).
pub messages_count: i32,
/// Value of the `tools_count` column (`INTEGER NOT NULL`).
pub tools_count: i32,
/// Tool calls stored as text; the column defaults to `'[]'`.
pub tool_calls: String,
/// Text response; the column defaults to the empty string.
pub text_response: String,
/// Prompt token count; the column is nullable, hence `Option`.
pub prompt_tokens: Option<i32>,
/// Completion token count; the column is nullable, hence `Option`.
pub completion_tokens: Option<i32>,
/// Call latency; presumably milliseconds, going by the column name.
pub latency_ms: i32,
/// Creation timestamp stored as text; the column defaults to `datetime('now')`.
pub created_at: String,
}