"""SQLite storage helpers: connection factory, schema/migrations, and seeding."""

import json
import os
import re
import secrets
import sqlite3

DB_PATH = os.environ.get("DB_PATH", "/data/oil_calculator.db")

# Base schema. Every statement is idempotent (CREATE TABLE IF NOT EXISTS), so
# the script can be executed on every start-up.
_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS oils (
    name TEXT PRIMARY KEY,
    bottle_price REAL NOT NULL,
    drop_count INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS recipes (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    note TEXT DEFAULT '',
    sort_order INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS recipe_ingredients (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    recipe_id INTEGER NOT NULL REFERENCES recipes(id) ON DELETE CASCADE,
    oil_name TEXT NOT NULL,
    drops REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS tags (
    name TEXT PRIMARY KEY
);
CREATE TABLE IF NOT EXISTS recipe_tags (
    recipe_id INTEGER NOT NULL REFERENCES recipes(id) ON DELETE CASCADE,
    tag_name TEXT NOT NULL,
    PRIMARY KEY (recipe_id, tag_name)
);
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    username TEXT UNIQUE NOT NULL,
    token TEXT UNIQUE NOT NULL,
    role TEXT NOT NULL DEFAULT 'viewer',
    display_name TEXT,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS audit_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER,
    action TEXT NOT NULL,
    target_type TEXT,
    target_id TEXT,
    target_name TEXT,
    detail TEXT,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS user_inventory (
    user_id INTEGER NOT NULL,
    oil_name TEXT NOT NULL,
    PRIMARY KEY (user_id, oil_name)
);
CREATE TABLE IF NOT EXISTS profit_projects (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    ingredients TEXT NOT NULL DEFAULT '[]',
    pricing REAL DEFAULT 0,
    note TEXT DEFAULT '',
    created_by INTEGER,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS user_diary (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    source_recipe_id INTEGER,
    name TEXT NOT NULL,
    ingredients TEXT NOT NULL DEFAULT '[]',
    note TEXT DEFAULT '',
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS diary_entries (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    diary_id INTEGER NOT NULL REFERENCES user_diary(id) ON DELETE CASCADE,
    content TEXT NOT NULL,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS user_favorites (
    user_id INTEGER NOT NULL,
    recipe_id INTEGER NOT NULL,
    created_at TEXT DEFAULT (datetime('now')),
    PRIMARY KEY (user_id, recipe_id)
);
CREATE TABLE IF NOT EXISTS search_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER,
    query TEXT NOT NULL,
    matched_count INTEGER DEFAULT 0,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS notifications (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    target_role TEXT NOT NULL DEFAULT 'admin',
    title TEXT NOT NULL,
    body TEXT,
    is_read INTEGER DEFAULT 0,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS category_modules (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    subtitle TEXT DEFAULT '',
    icon TEXT DEFAULT '🌿',
    bg_image TEXT DEFAULT '',
    color_from TEXT DEFAULT '#7a9e7e',
    color_to TEXT DEFAULT '#5a7d5e',
    tag_name TEXT NOT NULL,
    sort_order INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS oil_cards (
    name TEXT PRIMARY KEY REFERENCES oils(name) ON DELETE CASCADE,
    emoji TEXT DEFAULT '',
    en TEXT DEFAULT '',
    effects TEXT DEFAULT '',
    usage TEXT DEFAULT '',
    method TEXT DEFAULT '',
    caution TEXT DEFAULT ''
);
CREATE TABLE IF NOT EXISTS bug_reports (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER,
    content TEXT NOT NULL,
    is_resolved INTEGER DEFAULT 0,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS bug_comments (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    bug_id INTEGER NOT NULL REFERENCES bug_reports(id) ON DELETE CASCADE,
    user_id INTEGER,
    action TEXT NOT NULL,
    content TEXT DEFAULT '',
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS business_applications (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    business_name TEXT,
    document TEXT,
    status TEXT DEFAULT 'pending',
    reject_reason TEXT DEFAULT '',
    created_at TEXT DEFAULT (datetime('now')),
    reviewed_at TEXT
);
CREATE TABLE IF NOT EXISTS translation_suggestions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    recipe_id INTEGER,
    recipe_name TEXT NOT NULL,
    suggested_en TEXT NOT NULL,
    user_id INTEGER,
    status TEXT DEFAULT 'pending',
    created_at TEXT DEFAULT (datetime('now'))
);
"""

# Columns added after the initial schema shipped, as
# (table, ((column, definition), ...)). Per-table order is the order in which
# the columns are appended to a fresh database, so keep it stable.
_COLUMN_MIGRATIONS = (
    ("users", (
        ("password", "TEXT"),
        ("qr_code", "TEXT"),
        ("brand_logo", "TEXT"),
        ("brand_name", "TEXT"),
        ("brand_bg", "TEXT"),
        ("brand_align", "TEXT DEFAULT 'center'"),
        ("role_changed_at", "TEXT"),
        ("username_changed", "INTEGER DEFAULT 0"),
        ("business_verified", "INTEGER DEFAULT 0"),
    )),
    ("bug_reports", (
        ("priority", "INTEGER DEFAULT 2"),
        ("assigned_to", "INTEGER"),
    )),
    ("user_diary", (
        ("tags", "TEXT DEFAULT '[]'"),
    )),
    ("business_applications", (
        ("reject_reason", "TEXT DEFAULT ''"),
    )),
    ("notifications", (
        ("target_user_id", "INTEGER"),
    )),
    ("recipes", (
        ("version", "INTEGER DEFAULT 1"),  # used for optimistic locking
        ("owner_id", "INTEGER"),
        ("updated_by", "INTEGER"),
        ("en_name", "TEXT DEFAULT ''"),
        ("volume", "TEXT DEFAULT ''"),
    )),
    ("oils", (
        ("retail_price", "REAL"),
        ("is_active", "INTEGER DEFAULT 1"),
        ("en_name", "TEXT DEFAULT ''"),
        ("unit", "TEXT DEFAULT 'drop'"),
    )),
)

# Oil renames as (old_name, new_name). Only this one rename is active.
_OIL_RENAMES = (("元气", "元气焕能"),)

# Recipe name clean-up: drop leading numbering and normalise the 细胞律动 format.
_RECIPE_RENAMES = {
    "2、神经系统细胞律动": "细胞律动-神经系统",
    "3、消化系统细胞律动": "细胞律动-消化系统",
    "4、骨骼系统细胞律动(炎症控制)": "细胞律动-骨骼系统(炎症控制)",
    "5、淋巴系统细胞律动": "细胞律动-淋巴系统",
    "6、生殖系统细胞律动": "细胞律动-生殖系统",
    "7、免疫系统细胞律动": "细胞律动-免疫系统",
    "8、循环系统细胞律动": "细胞律动-循环系统",
    "9、内分泌系统细胞律动": "细胞律动-内分泌系统",
    "12、芳香调理技术": "芳香调理技术",
    "普拉提根基配方:2": "普拉提根基配方(二)",
}

_DIGIT_TO_CN = {
    '1': '一', '2': '二', '3': '三', '4': '四', '5': '五',
    '6': '六', '7': '七', '8': '八', '9': '九',
}
_CN_NUMERALS = list('一二三四五六七八九十')

_TRAILING_DIGITS = re.compile(r'(\d+)$')
_TRAILING_CN = re.compile(r'([一二三四五六七八九十]+)$')
# The parentheses around the numeral are escaped so they match the literal
# "(" / ")" written by the numbering normalisation; group(1) is the base
# name and group(2) the Chinese numeral.
_NUMBERED_NAME = re.compile(r'^(.+?)\(([一二三四五六七八九十]+)\)$')


def get_db():
    """Open a connection to the database at DB_PATH.

    Rows are returned as sqlite3.Row; WAL journaling and foreign-key
    enforcement are switched on for the connection. The caller owns the
    connection and must close it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    return conn


def _add_missing_columns(cur, table, columns):
    """Append each (name, definition) in *columns* that *table* lacks."""
    existing = {row[1] for row in cur.execute(f"PRAGMA table_info({table})").fetchall()}
    for column, definition in columns:
        if column not in existing:
            # table/column/definition come from module constants, never from input
            cur.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")


def _rebuild_legacy_category_modules(cur):
    """Drop and recreate category_modules when it predates the bg_image column.

    Existing rows in the legacy table are discarded.
    """
    cat_cols = [row[1] for row in cur.execute("PRAGMA table_info(category_modules)").fetchall()]
    if cat_cols and "bg_image" not in cat_cols:
        cur.execute("DROP TABLE category_modules")
        cur.execute("""CREATE TABLE category_modules (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            subtitle TEXT DEFAULT '',
            icon TEXT DEFAULT '🌿',
            bg_image TEXT DEFAULT '',
            color_from TEXT DEFAULT '#7a9e7e',
            color_to TEXT DEFAULT '#5a7d5e',
            tag_name TEXT NOT NULL,
            sort_order INTEGER DEFAULT 0)""")


def _rename_oils(cur):
    """Apply _OIL_RENAMES to oils and to recipe_ingredients references."""
    for old_name, new_name in _OIL_RENAMES:
        old_exists = cur.execute("SELECT 1 FROM oils WHERE name = ?", (old_name,)).fetchone()
        new_exists = cur.execute("SELECT 1 FROM oils WHERE name = ?", (new_name,)).fetchone()
        if not old_exists:
            continue
        if new_exists:
            # Both exist: drop the old oil and point recipes at the new one.
            cur.execute("DELETE FROM oils WHERE name = ?", (old_name,))
        else:
            # NOTE(review): oil_cards references oils(name) without
            # ON UPDATE CASCADE; with foreign_keys=ON this UPDATE may fail if
            # a card exists for old_name -- confirm against real data.
            cur.execute("UPDATE oils SET name = ? WHERE name = ?", (new_name, old_name))
        cur.execute(
            "UPDATE recipe_ingredients SET oil_name = ? WHERE oil_name = ?",
            (new_name, old_name),
        )


def _rename_recipes(cur):
    """Apply the fixed old-name -> new-name table in _RECIPE_RENAMES."""
    for old_name, new_name in _RECIPE_RENAMES.items():
        cur.execute("UPDATE recipes SET name = ? WHERE name = ?", (new_name, old_name))


def _normalize_trailing_numbers(cur):
    """Rewrite a trailing number in recipe names as a parenthesised Chinese numeral.

    Trailing Arabic digits are mapped digit by digit (灰指甲1 -> 灰指甲(一));
    a trailing bare Chinese numeral just gets parentheses (灰指甲一 -> 灰指甲(一)).
    """
    for row in cur.execute("SELECT id, name FROM recipes").fetchall():
        original = row['name']
        name = original
        digits = _TRAILING_DIGITS.search(name)
        if digits:
            chinese = ''.join(_DIGIT_TO_CN.get(d, d) for d in digits.group(1))
            name = name[:digits.start()] + '(' + chinese + ')'
        # After the digit rewrite the name ends with ")", so this can only
        # match names that were not touched above.
        bare = _TRAILING_CN.search(name)
        if bare:
            name = name[:bare.start()] + '(' + bare.group(1) + ')'
        if name != original:
            cur.execute("UPDATE recipes SET name = ? WHERE id = ?", (name, row['id']))


def _number_base_recipes(cur):
    """Give un-numbered recipes a number when numbered siblings exist.

    A recipe named X, with siblings X(一), X(二), ... is renamed to X(<next
    free numeral>); when 一..十 are all taken, 十 is used.
    """
    rows = cur.execute("SELECT id, name FROM recipes").fetchall()
    used_by_base = {}
    for row in rows:
        m = _NUMBERED_NAME.match(row['name'])
        if m:
            used_by_base.setdefault(m.group(1), set()).add(m.group(2))
    for row in rows:
        used = used_by_base.get(row['name'])
        if used is None:
            continue
        next_num = next((n for n in _CN_NUMERALS if n not in used), '十')
        cur.execute(
            "UPDATE recipes SET name = ? WHERE id = ?",
            (row['name'] + '(' + next_num + ')', row['id']),
        )
        used.add(next_num)


def _seed_admin(cur):
    """Create the initial admin user when the users table is empty."""
    count = cur.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    if count != 0:
        return
    # `or` (not a .get default) so an empty ADMIN_TOKEN cannot yield an
    # admin account with an empty token.
    admin_token = os.environ.get("ADMIN_TOKEN") or secrets.token_hex(24)
    cur.execute(
        "INSERT INTO users (username, token, role, display_name) VALUES (?, ?, ?, ?)",
        ("hera", admin_token, "admin", "Hera"),
    )
    admin_id = cur.lastrowid
    # Assign all existing recipes to admin
    cur.execute("UPDATE recipes SET owner_id = ? WHERE owner_id IS NULL", (admin_id,))
    print(f"[INIT] Admin user created. Token: {admin_token}")


def init_db():
    """Create the schema, run all migrations, and seed the admin user.

    Safe to call on every start-up: table creation and column additions are
    conditional. The connection is closed even if a step raises.
    """
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        # dirname is "" for a bare filename; os.makedirs("") would raise.
        os.makedirs(db_dir, exist_ok=True)
    conn = get_db()
    try:
        c = conn.cursor()
        c.executescript(_SCHEMA_SQL)
        for table, columns in _COLUMN_MIGRATIONS:
            _add_missing_columns(c, table, columns)
        _rebuild_legacy_category_modules(c)
        _rename_oils(c)
        _rename_recipes(c)
        _normalize_trailing_numbers(c)
        _number_base_recipes(c)
        _seed_admin(c)
        conn.commit()
    finally:
        conn.close()


def log_audit(conn, user_id, action, target_type=None, target_id=None, target_name=None, detail=None):
    """Insert one row into audit_log on *conn*; the caller commits.

    target_id is stored as text. None and "" are stored as NULL; any other
    value (including 0) is stringified.
    """
    stored_id = None if target_id is None or target_id == "" else str(target_id)
    conn.execute(
        "INSERT INTO audit_log (user_id, action, target_type, target_id, target_name, detail) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (user_id, action, target_type, stored_id, target_name, detail),
    )


def seed_defaults(default_oils_meta: dict, default_recipes: list, default_oil_cards: dict = None):
    """Seed DB with defaults if empty.

    Each of oils, recipes (with their ingredients and tags) and oil_cards is
    seeded only when its table currently has no rows. Oil metadata is read
    from the "bottlePrice"/"dropCount" keys; recipes from "name", optional
    "note", "ingredients" (items with "oil" and "drops") and optional "tags".
    """
    conn = get_db()
    try:
        c = conn.cursor()

        # Seed oils
        if c.execute("SELECT COUNT(*) FROM oils").fetchone()[0] == 0:
            for name, meta in default_oils_meta.items():
                c.execute(
                    "INSERT OR IGNORE INTO oils (name, bottle_price, drop_count) VALUES (?, ?, ?)",
                    (name, meta["bottlePrice"], meta["dropCount"]),
                )

        # Seed recipes
        if c.execute("SELECT COUNT(*) FROM recipes").fetchone()[0] == 0:
            # Seeded recipes are owned by the first admin user, if any
            admin = c.execute("SELECT id FROM users WHERE role = 'admin' LIMIT 1").fetchone()
            admin_id = admin[0] if admin else None
            for r in default_recipes:
                c.execute(
                    "INSERT INTO recipes (name, note, owner_id) VALUES (?, ?, ?)",
                    (r["name"], r.get("note", ""), admin_id),
                )
                rid = c.lastrowid
                for ing in r["ingredients"]:
                    c.execute(
                        "INSERT INTO recipe_ingredients (recipe_id, oil_name, drops) VALUES (?, ?, ?)",
                        (rid, ing["oil"], ing["drops"]),
                    )
                for tag in r.get("tags", []):
                    c.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag,))
                    c.execute(
                        "INSERT OR IGNORE INTO recipe_tags (recipe_id, tag_name) VALUES (?, ?)",
                        (rid, tag),
                    )

        # Seed oil_cards from static data if table is empty
        if default_oil_cards:
            if c.execute("SELECT COUNT(*) FROM oil_cards").fetchone()[0] == 0:
                for name, card in default_oil_cards.items():
                    c.execute(
                        "INSERT OR IGNORE INTO oil_cards (name, emoji, en, effects, usage, method, caution) "
                        "VALUES (?, ?, ?, ?, ?, ?, ?)",
                        (name, card.get("emoji", ""), card.get("en", ""), card.get("effects", ""),
                         card.get("usage", ""), card.get("method", ""), card.get("caution", "")),
                    )

        conn.commit()
    finally:
        conn.close()