import json
import os
import secrets
import sqlite3
from contextlib import closing

DB_PATH = os.environ.get("DB_PATH", "/data/oil_calculator.db")

# Tables created on every start-up. Each statement is idempotent
# (CREATE TABLE IF NOT EXISTS), so running the script against an existing
# database leaves existing tables untouched.
_BASE_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS oils (
    name TEXT PRIMARY KEY,
    bottle_price REAL NOT NULL,
    drop_count INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS recipes (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    note TEXT DEFAULT '',
    sort_order INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS recipe_ingredients (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    recipe_id INTEGER NOT NULL REFERENCES recipes(id) ON DELETE CASCADE,
    oil_name TEXT NOT NULL,
    drops REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS tags (
    name TEXT PRIMARY KEY
);
CREATE TABLE IF NOT EXISTS recipe_tags (
    recipe_id INTEGER NOT NULL REFERENCES recipes(id) ON DELETE CASCADE,
    tag_name TEXT NOT NULL,
    PRIMARY KEY (recipe_id, tag_name)
);
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    username TEXT UNIQUE NOT NULL,
    token TEXT UNIQUE NOT NULL,
    role TEXT NOT NULL DEFAULT 'viewer',
    display_name TEXT,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS audit_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER,
    action TEXT NOT NULL,
    target_type TEXT,
    target_id TEXT,
    target_name TEXT,
    detail TEXT,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS user_inventory (
    user_id INTEGER NOT NULL,
    oil_name TEXT NOT NULL,
    PRIMARY KEY (user_id, oil_name)
);
CREATE TABLE IF NOT EXISTS profit_projects (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    ingredients TEXT NOT NULL DEFAULT '[]',
    pricing REAL DEFAULT 0,
    note TEXT DEFAULT '',
    created_by INTEGER,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS user_diary (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    source_recipe_id INTEGER,
    name TEXT NOT NULL,
    ingredients TEXT NOT NULL DEFAULT '[]',
    note TEXT DEFAULT '',
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS diary_entries (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    diary_id INTEGER NOT NULL REFERENCES user_diary(id) ON DELETE CASCADE,
    content TEXT NOT NULL,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS user_favorites (
    user_id INTEGER NOT NULL,
    recipe_id INTEGER NOT NULL,
    created_at TEXT DEFAULT (datetime('now')),
    PRIMARY KEY (user_id, recipe_id)
);
CREATE TABLE IF NOT EXISTS search_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER,
    query TEXT NOT NULL,
    matched_count INTEGER DEFAULT 0,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS notifications (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    target_role TEXT NOT NULL DEFAULT 'admin',
    title TEXT NOT NULL,
    body TEXT,
    is_read INTEGER DEFAULT 0,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS bug_reports (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER,
    content TEXT NOT NULL,
    is_resolved INTEGER DEFAULT 0,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS bug_comments (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    bug_id INTEGER NOT NULL REFERENCES bug_reports(id) ON DELETE CASCADE,
    user_id INTEGER,
    action TEXT NOT NULL,
    content TEXT DEFAULT '',
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS business_applications (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    business_name TEXT,
    document TEXT,
    status TEXT DEFAULT 'pending',
    reject_reason TEXT DEFAULT '',
    created_at TEXT DEFAULT (datetime('now')),
    reviewed_at TEXT
);
CREATE TABLE IF NOT EXISTS translation_suggestions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    recipe_id INTEGER,
    recipe_name TEXT NOT NULL,
    suggested_en TEXT NOT NULL,
    user_id INTEGER,
    status TEXT DEFAULT 'pending',
    created_at TEXT DEFAULT (datetime('now'))
);
"""

# Kept separate (and without a trailing semicolon) because it is executed
# both as part of the base schema and on its own when a legacy
# category_modules table is rebuilt.
_CATEGORY_MODULES_DDL = """
CREATE TABLE IF NOT EXISTS category_modules (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    subtitle TEXT DEFAULT '',
    icon TEXT DEFAULT '🌿',
    bg_image TEXT DEFAULT '',
    color_from TEXT DEFAULT '#7a9e7e',
    color_to TEXT DEFAULT '#5a7d5e',
    tag_name TEXT NOT NULL,
    sort_order INTEGER DEFAULT 0
)"""

# Additive column migrations: (table, ((column, declaration), ...)).
# Columns are added in the listed order when missing; entries are never
# removed so that databases of any age converge on the same schema.
_COLUMN_MIGRATIONS = (
    ("users", (
        ("password", "TEXT"),
        ("qr_code", "TEXT"),
        ("brand_logo", "TEXT"),
        ("brand_name", "TEXT"),
        ("brand_bg", "TEXT"),
        ("brand_align", "TEXT DEFAULT 'center'"),
        ("business_verified", "INTEGER DEFAULT 0"),
    )),
    ("bug_reports", (
        ("priority", "INTEGER DEFAULT 2"),
        ("assigned_to", "INTEGER"),
    )),
    ("user_diary", (
        ("tags", "TEXT DEFAULT '[]'"),
    )),
    ("business_applications", (
        ("reject_reason", "TEXT DEFAULT ''"),
    )),
    ("notifications", (
        ("target_user_id", "INTEGER"),
    )),
    ("recipes", (
        ("version", "INTEGER DEFAULT 1"),  # used for optimistic locking
        ("owner_id", "INTEGER"),
        ("updated_by", "INTEGER"),
        ("en_name", "TEXT DEFAULT ''"),
        ("en_oils", "TEXT DEFAULT '{}'"),
    )),
    ("oils", (
        ("retail_price", "REAL"),
        ("is_active", "INTEGER DEFAULT 1"),
    )),
)


def get_db():
    """Open a new connection to the database at ``DB_PATH``.

    Rows are returned as ``sqlite3.Row``; the connection is switched to WAL
    journaling and has foreign-key enforcement turned on. The caller owns
    the connection and is responsible for closing it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    return conn


def _table_columns(cursor, table):
    """Return the column names of *table* (empty list if it does not exist)."""
    return [row[1] for row in cursor.execute(f"PRAGMA table_info({table})").fetchall()]


def _add_missing_columns(cursor, table, columns):
    """Add each ``(name, declaration)`` in *columns* that *table* lacks."""
    existing = set(_table_columns(cursor, table))
    for name, declaration in columns:
        if name not in existing:
            cursor.execute(f"ALTER TABLE {table} ADD COLUMN {name} {declaration}")


def _rebuild_legacy_category_modules(cursor):
    """Replace a pre-``bg_image`` category_modules table with the current one.

    NOTE: the legacy table is dropped, so any rows it held are discarded.
    """
    columns = _table_columns(cursor, "category_modules")
    if columns and "bg_image" not in columns:
        cursor.execute("DROP TABLE category_modules")
        cursor.execute(_CATEGORY_MODULES_DDL)


def _seed_admin_user(cursor):
    """Create the initial admin user when the users table is empty."""
    if cursor.execute("SELECT COUNT(*) FROM users").fetchone()[0] != 0:
        return
    # `or` rather than a .get() default: an ADMIN_TOKEN that is set but
    # empty must not become an empty admin credential.
    admin_token = os.environ.get("ADMIN_TOKEN") or secrets.token_hex(24)
    cursor.execute(
        "INSERT INTO users (username, token, role, display_name) VALUES (?, ?, ?, ?)",
        ("hera", admin_token, "admin", "Hera"),
    )
    admin_id = cursor.lastrowid
    # Recipes that predate ownership tracking are handed to the admin.
    cursor.execute("UPDATE recipes SET owner_id = ? WHERE owner_id IS NULL", (admin_id,))
    print(f"[INIT] Admin user created. Token: {admin_token}")


def init_db():
    """Create the schema, apply column migrations and seed the admin user.

    Safe to call repeatedly: table creation and every migration are
    conditional on the current state of the database. The connection is
    always closed, even when a statement fails.
    """
    # dirname() is "" for a bare filename, and os.makedirs("") raises.
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    with closing(get_db()) as conn:
        c = conn.cursor()
        c.executescript(_BASE_SCHEMA_SQL + _CATEGORY_MODULES_DDL + ";")
        for table, columns in _COLUMN_MIGRATIONS:
            _add_missing_columns(c, table, columns)
        _rebuild_legacy_category_modules(c)
        _seed_admin_user(c)
        conn.commit()


def log_audit(conn, user_id, action, target_type=None, target_id=None, target_name=None, detail=None):
    """Insert one row into ``audit_log`` using *conn*.

    ``target_id`` is stored as text; ``None`` is stored as NULL. This
    function does not commit — the caller decides when to commit.
    """
    conn.execute(
        "INSERT INTO audit_log (user_id, action, target_type, target_id, target_name, detail) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (
            user_id,
            action,
            target_type,
            # Explicit None check so that an id of 0 is recorded as "0".
            str(target_id) if target_id is not None else None,
            target_name,
            detail,
        ),
    )


def seed_defaults(default_oils_meta: dict, default_recipes: list):
    """Seed DB with defaults if empty.

    Oils are inserted only when the ``oils`` table has no rows, and recipes
    (with their ingredients and tags) only when ``recipes`` has no rows.

    Args:
        default_oils_meta: maps oil name to a dict with ``"bottlePrice"``
            and ``"dropCount"`` keys.
        default_recipes: list of dicts with ``"name"`` and ``"ingredients"``
            (each ingredient a dict with ``"oil"`` and ``"drops"``), plus
            optional ``"note"`` and ``"tags"``.
    """
    with closing(get_db()) as conn:
        c = conn.cursor()

        # Seed oils
        if c.execute("SELECT COUNT(*) FROM oils").fetchone()[0] == 0:
            c.executemany(
                "INSERT OR IGNORE INTO oils (name, bottle_price, drop_count) VALUES (?, ?, ?)",
                [
                    (name, meta["bottlePrice"], meta["dropCount"])
                    for name, meta in default_oils_meta.items()
                ],
            )

        # Seed recipes
        if c.execute("SELECT COUNT(*) FROM recipes").fetchone()[0] == 0:
            # Seeded recipes are owned by an admin user when one exists.
            admin = c.execute("SELECT id FROM users WHERE role = 'admin' LIMIT 1").fetchone()
            admin_id = admin[0] if admin else None
            for recipe in default_recipes:
                c.execute(
                    "INSERT INTO recipes (name, note, owner_id) VALUES (?, ?, ?)",
                    (recipe["name"], recipe.get("note", ""), admin_id),
                )
                # Read before any further statements overwrite lastrowid.
                recipe_id = c.lastrowid
                c.executemany(
                    "INSERT INTO recipe_ingredients (recipe_id, oil_name, drops) VALUES (?, ?, ?)",
                    [(recipe_id, ing["oil"], ing["drops"]) for ing in recipe["ingredients"]],
                )
                for tag in recipe.get("tags", []):
                    c.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag,))
                    c.execute(
                        "INSERT OR IGNORE INTO recipe_tags (recipe_id, tag_name) VALUES (?, ?)",
                        (recipe_id, tag),
                    )

        conn.commit()