Files
oil-formula-calculator/backend/database.py
Hera Zhao 4dfe1b7bb4
Some checks failed
PR Preview / teardown-preview (pull_request) Has been skipped
Test / unit-test (push) Successful in 5s
Test / build-check (push) Successful in 3s
PR Preview / test (pull_request) Successful in 5s
Test / e2e-test (push) Has been cancelled
PR Preview / deploy-preview (pull_request) Successful in 13s
fix: 单买成本移到全精油后面;配方名末尾阿拉伯数字转中文数字
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 23:24:43 +00:00

359 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sqlite3
import json
import os
import secrets
# Location of the SQLite database file; the DB_PATH environment variable
# overrides the built-in default.
DB_PATH = os.getenv("DB_PATH", "/data/oil_calculator.db")
def get_db():
    """Open and return a new SQLite connection to the file at DB_PATH.

    The connection yields rows as ``sqlite3.Row`` objects, and the
    ``journal_mode=WAL`` and ``foreign_keys=ON`` pragmas are issued on it
    before it is handed back. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    # Pragmas are applied in this order on every new connection.
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    return connection
def _table_columns(cur, table):
    """Return the column names of ``table`` as reported by PRAGMA table_info."""
    # ``table`` is only ever a hard-coded identifier passed by init_db, so
    # interpolating it into the statement is safe.
    return [row[1] for row in cur.execute(f"PRAGMA table_info({table})").fetchall()]


def _add_missing_columns(cur, table, columns):
    """Add every (name, declaration) pair in ``columns`` that ``table`` lacks."""
    existing = _table_columns(cur, table)
    for name, declaration in columns:
        if name not in existing:
            cur.execute(f"ALTER TABLE {table} ADD COLUMN {name} {declaration}")


def _trailing_digits_to_chinese(name):
    """Return ``name`` with a trailing run of digits 1-9 written as Chinese numerals.

    Characters in the trailing run that have no mapping (for example ``0``)
    are kept unchanged. Names without trailing digits are returned as is.
    """
    import re  # local import keeps this helper self-contained

    match = re.search(r'(\d+)$', name)
    if match is None:
        return name
    digit_table = str.maketrans("123456789", "一二三四五六七八九")
    return name[:match.start()] + match.group(1).translate(digit_table)


def init_db():
    """Create the schema, apply in-place migrations, and seed the admin user.

    Steps, in order:
      1. Make sure the directory that holds DB_PATH exists.
      2. Create every table with CREATE TABLE IF NOT EXISTS.
      3. Add columns that older databases are missing.
      4. Run data migrations (oil renames, recipe renames, trailing digits
         in recipe names converted to Chinese numerals).
      5. If the users table is empty, insert the "hera" admin user and
         assign ownerless recipes to it.

    Everything is committed at the end; the connection is closed even when
    a step raises.
    """
    # dirname() is "" for a bare filename, and os.makedirs("") raises, so
    # only create the directory when there is one.
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = get_db()
    try:
        c = conn.cursor()
        c.executescript("""
            CREATE TABLE IF NOT EXISTS oils (
                name TEXT PRIMARY KEY,
                bottle_price REAL NOT NULL,
                drop_count INTEGER NOT NULL
            );
            CREATE TABLE IF NOT EXISTS recipes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                note TEXT DEFAULT '',
                sort_order INTEGER DEFAULT 0
            );
            CREATE TABLE IF NOT EXISTS recipe_ingredients (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                recipe_id INTEGER NOT NULL REFERENCES recipes(id) ON DELETE CASCADE,
                oil_name TEXT NOT NULL,
                drops REAL NOT NULL
            );
            CREATE TABLE IF NOT EXISTS tags (
                name TEXT PRIMARY KEY
            );
            CREATE TABLE IF NOT EXISTS recipe_tags (
                recipe_id INTEGER NOT NULL REFERENCES recipes(id) ON DELETE CASCADE,
                tag_name TEXT NOT NULL,
                PRIMARY KEY (recipe_id, tag_name)
            );
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                token TEXT UNIQUE NOT NULL,
                role TEXT NOT NULL DEFAULT 'viewer',
                display_name TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS audit_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER,
                action TEXT NOT NULL,
                target_type TEXT,
                target_id TEXT,
                target_name TEXT,
                detail TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS user_inventory (
                user_id INTEGER NOT NULL,
                oil_name TEXT NOT NULL,
                PRIMARY KEY (user_id, oil_name)
            );
            CREATE TABLE IF NOT EXISTS profit_projects (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                ingredients TEXT NOT NULL DEFAULT '[]',
                pricing REAL DEFAULT 0,
                note TEXT DEFAULT '',
                created_by INTEGER,
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS user_diary (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                source_recipe_id INTEGER,
                name TEXT NOT NULL,
                ingredients TEXT NOT NULL DEFAULT '[]',
                note TEXT DEFAULT '',
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS diary_entries (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                diary_id INTEGER NOT NULL REFERENCES user_diary(id) ON DELETE CASCADE,
                content TEXT NOT NULL,
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS user_favorites (
                user_id INTEGER NOT NULL,
                recipe_id INTEGER NOT NULL,
                created_at TEXT DEFAULT (datetime('now')),
                PRIMARY KEY (user_id, recipe_id)
            );
            CREATE TABLE IF NOT EXISTS search_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER,
                query TEXT NOT NULL,
                matched_count INTEGER DEFAULT 0,
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS notifications (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                target_role TEXT NOT NULL DEFAULT 'admin',
                title TEXT NOT NULL,
                body TEXT,
                is_read INTEGER DEFAULT 0,
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS category_modules (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                subtitle TEXT DEFAULT '',
                icon TEXT DEFAULT '🌿',
                bg_image TEXT DEFAULT '',
                color_from TEXT DEFAULT '#7a9e7e',
                color_to TEXT DEFAULT '#5a7d5e',
                tag_name TEXT NOT NULL,
                sort_order INTEGER DEFAULT 0
            );
        """)

        # Migration: add password to users if missing
        _add_missing_columns(c, "users", [("password", "TEXT")])

        # Create bug_reports table
        c.execute("""CREATE TABLE IF NOT EXISTS bug_reports (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            content TEXT NOT NULL,
            is_resolved INTEGER DEFAULT 0,
            created_at TEXT DEFAULT (datetime('now'))
        )""")
        # Migration: add priority to bug_reports
        _add_missing_columns(c, "bug_reports", [("priority", "INTEGER DEFAULT 2")])

        # Create bug_comments table for activity log
        c.execute("""CREATE TABLE IF NOT EXISTS bug_comments (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            bug_id INTEGER NOT NULL REFERENCES bug_reports(id) ON DELETE CASCADE,
            user_id INTEGER,
            action TEXT NOT NULL,
            content TEXT DEFAULT '',
            created_at TEXT DEFAULT (datetime('now'))
        )""")

        # Migration: QR code, branding and account-change fields on users.
        # The list order is the order in which columns are appended.
        _add_missing_columns(c, "users", [
            ("qr_code", "TEXT"),
            ("brand_logo", "TEXT"),
            ("brand_name", "TEXT"),
            ("brand_bg", "TEXT"),
            ("brand_align", "TEXT DEFAULT 'center'"),
            ("role_changed_at", "TEXT"),
            ("username_changed", "INTEGER DEFAULT 0"),
        ])

        # Migration: add tags to user_diary
        _add_missing_columns(c, "user_diary", [("tags", "TEXT DEFAULT '[]'")])

        # Migration: business verification
        _add_missing_columns(c, "users", [("business_verified", "INTEGER DEFAULT 0")])
        c.execute("""CREATE TABLE IF NOT EXISTS business_applications (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            business_name TEXT,
            document TEXT,
            status TEXT DEFAULT 'pending',
            reject_reason TEXT DEFAULT '',
            created_at TEXT DEFAULT (datetime('now')),
            reviewed_at TEXT
        )""")

        # Translation suggestions table
        c.execute("""CREATE TABLE IF NOT EXISTS translation_suggestions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            recipe_id INTEGER,
            recipe_name TEXT NOT NULL,
            suggested_en TEXT NOT NULL,
            user_id INTEGER,
            status TEXT DEFAULT 'pending',
            created_at TEXT DEFAULT (datetime('now'))
        )""")

        # Migration: add reject_reason to business_applications if missing
        _add_missing_columns(c, "business_applications", [("reject_reason", "TEXT DEFAULT ''")])
        # Migration: add target_user_id to notifications for user-specific notifications
        _add_missing_columns(c, "notifications", [("target_user_id", "INTEGER")])
        # Migration: add assigned_to to bug_reports
        _add_missing_columns(c, "bug_reports", [("assigned_to", "INTEGER")])
        # Migration: add version to recipes for optimistic locking
        _add_missing_columns(c, "recipes", [("version", "INTEGER DEFAULT 1")])
        # Migration: add pricing, activation, English-name and unit fields to oils
        _add_missing_columns(c, "oils", [
            ("retail_price", "REAL"),
            ("is_active", "INTEGER DEFAULT 1"),
            ("en_name", "TEXT DEFAULT ''"),
            ("unit", "TEXT DEFAULT 'drop'"),
        ])

        # Migration: rebuild category_modules when it predates the bg_image column.
        # NOTE: this drops the table, so rows in the legacy layout are discarded.
        cat_cols = _table_columns(c, "category_modules")
        if cat_cols and "bg_image" not in cat_cols:
            c.execute("DROP TABLE category_modules")
            c.execute("""CREATE TABLE category_modules (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL, subtitle TEXT DEFAULT '', icon TEXT DEFAULT '🌿',
                bg_image TEXT DEFAULT '', color_from TEXT DEFAULT '#7a9e7e', color_to TEXT DEFAULT '#5a7d5e',
                tag_name TEXT NOT NULL, sort_order INTEGER DEFAULT 0)""")

        # Migration: add ownership, editor, English-name and volume fields to recipes
        _add_missing_columns(c, "recipes", [
            ("owner_id", "INTEGER"),
            ("updated_by", "INTEGER"),
            ("en_name", "TEXT DEFAULT ''"),
            ("volume", "TEXT DEFAULT ''"),
        ])

        # Migration: rename legacy oil names to their current names
        _oil_renames = [("元气", "元气焕能")]
        for old_name, new_name in _oil_renames:
            old_exists = c.execute("SELECT 1 FROM oils WHERE name = ?", (old_name,)).fetchone()
            new_exists = c.execute("SELECT 1 FROM oils WHERE name = ?", (new_name,)).fetchone()
            if old_exists and new_exists:
                # Both exist: delete old, update recipe references to new
                c.execute("DELETE FROM oils WHERE name = ?", (old_name,))
                c.execute("UPDATE recipe_ingredients SET oil_name = ? WHERE oil_name = ?", (new_name, old_name))
            elif old_exists:
                c.execute("UPDATE oils SET name = ? WHERE name = ?", (new_name, old_name))
                c.execute("UPDATE recipe_ingredients SET oil_name = ? WHERE oil_name = ?", (new_name, old_name))

        # Migration: clean up recipe names by replacing the numbered legacy
        # names with the normalized names listed below
        _recipe_renames = {
            "2、神经系统细胞律动": "细胞律动-神经系统",
            "3、消化系统细胞律动": "细胞律动-消化系统",
            "4、骨骼系统细胞律动炎症控制": "细胞律动-骨骼系统(炎症控制)",
            "5、淋巴系统细胞律动": "细胞律动-淋巴系统",
            "6、生殖系统细胞律动": "细胞律动-生殖系统",
            "7、免疫系统细胞律动": "细胞律动-免疫系统",
            "8、循环系统细胞律动": "细胞律动-循环系统",
            "9、内分泌系统细胞律动": "细胞律动-内分泌系统",
            "12、芳香调理技术": "芳香调理技术",
        }
        for old_name, new_name in _recipe_renames.items():
            c.execute("UPDATE recipes SET name = ? WHERE name = ?", (new_name, old_name))

        # Migration: trailing Arabic numerals -> Chinese numerals in recipe names
        for row in c.execute("SELECT id, name FROM recipes").fetchall():
            converted = _trailing_digits_to_chinese(row["name"])
            if converted != row["name"]:
                c.execute("UPDATE recipes SET name = ? WHERE id = ?", (converted, row["id"]))

        # Seed admin user if no users exist
        count = c.execute("SELECT COUNT(*) FROM users").fetchone()[0]
        if count == 0:
            admin_token = os.environ.get("ADMIN_TOKEN", secrets.token_hex(24))
            c.execute(
                "INSERT INTO users (username, token, role, display_name) VALUES (?, ?, ?, ?)",
                ("hera", admin_token, "admin", "Hera"),
            )
            admin_id = c.lastrowid
            # Assign all existing recipes to admin
            c.execute("UPDATE recipes SET owner_id = ? WHERE owner_id IS NULL", (admin_id,))
            # NOTE(review): this writes the admin token to stdout, so it ends up
            # in whatever captures the process output; confirm that is intended.
            print(f"[INIT] Admin user created. Token: {admin_token}")

        conn.commit()
    finally:
        # Close the connection on success and on failure alike.
        conn.close()
def log_audit(conn, user_id, action, target_type=None, target_id=None, target_name=None, detail=None):
    """Insert one row into audit_log using the given connection.

    Args:
        conn: Open database connection that the INSERT is executed on.
        user_id: Id of the acting user.
        action: Name of the action being recorded.
        target_type: Optional kind of object the action applied to.
        target_id: Optional identifier of that object. It is stored as text;
            ``None`` and the empty string are stored as NULL.
        target_name: Optional display name of the object.
        detail: Optional free-form detail text.

    This function only executes the INSERT; it does not commit.
    """
    # A plain truthiness test would also turn a legitimate id of 0 into NULL,
    # so only None and the empty string are treated as "no target id".
    if target_id is None or target_id == "":
        stored_target_id = None
    else:
        stored_target_id = str(target_id)
    conn.execute(
        "INSERT INTO audit_log (user_id, action, target_type, target_id, target_name, detail) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (user_id, action, target_type, stored_target_id, target_name, detail),
    )
def seed_defaults(default_oils_meta: dict, default_recipes: list) -> None:
    """Seed the oils and recipes tables with defaults when they are empty.

    Args:
        default_oils_meta: Mapping of oil name to a dict that provides
            ``"bottlePrice"`` and ``"dropCount"``.
        default_recipes: List of dicts with ``"name"`` and ``"ingredients"``
            (each ingredient a dict with ``"oil"`` and ``"drops"``), plus
            optional ``"note"`` and ``"tags"``.

    Each table is seeded only if it currently has no rows. Seeded recipes
    are owned by the first user whose role is 'admin', or by nobody when
    no such user exists. Changes are committed at the end; the connection
    is closed even if seeding raises.
    """
    conn = get_db()
    try:
        c = conn.cursor()

        # Seed oils
        if c.execute("SELECT COUNT(*) FROM oils").fetchone()[0] == 0:
            c.executemany(
                "INSERT OR IGNORE INTO oils (name, bottle_price, drop_count) VALUES (?, ?, ?)",
                [
                    (name, meta["bottlePrice"], meta["dropCount"])
                    for name, meta in default_oils_meta.items()
                ],
            )

        # Seed recipes
        if c.execute("SELECT COUNT(*) FROM recipes").fetchone()[0] == 0:
            # Get admin user id for ownership
            admin = c.execute("SELECT id FROM users WHERE role = 'admin' LIMIT 1").fetchone()
            admin_id = admin[0] if admin else None
            for r in default_recipes:
                c.execute(
                    "INSERT INTO recipes (name, note, owner_id) VALUES (?, ?, ?)",
                    (r["name"], r.get("note", ""), admin_id),
                )
                rid = c.lastrowid
                for ing in r["ingredients"]:
                    c.execute(
                        "INSERT INTO recipe_ingredients (recipe_id, oil_name, drops) VALUES (?, ?, ?)",
                        (rid, ing["oil"], ing["drops"]),
                    )
                for tag in r.get("tags", []):
                    # Register the tag itself, then link it to the recipe.
                    c.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag,))
                    c.execute(
                        "INSERT OR IGNORE INTO recipe_tags (recipe_id, tag_name) VALUES (?, ?)",
                        (rid, tag),
                    )

        conn.commit()
    finally:
        # Close the connection on success and on failure alike.
        conn.close()