from fastapi import FastAPI, HTTPException, Request, Depends from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from typing import Optional import json import os from backend.database import get_db, init_db, seed_defaults, log_audit import hashlib import secrets as _secrets app = FastAPI(title="Essential Oil Formula Calculator API") # ── Password hashing (PBKDF2-SHA256, stdlib) ───────── def hash_password(password: str) -> str: salt = _secrets.token_hex(16) h = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 200_000) return f"{salt}${h.hex()}" def verify_password(password: str, stored: str) -> bool: if not stored: return False if "$" not in stored: # Legacy plaintext — compare directly return password == stored salt, h = stored.split("$", 1) return hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 200_000).hex() == h def _upgrade_password_if_needed(conn, user_id: int, password: str, stored: str): """If stored password is legacy plaintext, upgrade to hashed.""" if stored and "$" not in stored: conn.execute("UPDATE users SET password = ? WHERE id = ?", (hash_password(password), user_id)) conn.commit() # Periodic WAL checkpoint to ensure data is flushed to main DB file import threading, time as _time def _wal_checkpoint_loop(): while True: _time.sleep(300) # Every 5 minutes try: conn = get_db() conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") conn.close() except: pass threading.Thread(target=_wal_checkpoint_loop, daemon=True).start() # ── Auth ──────────────────────────────────────────────── ANON_USER = {"id": None, "role": "viewer", "username": "anonymous", "display_name": "匿名用户"} def get_current_user(request: Request): token = request.headers.get("Authorization", "").removeprefix("Bearer ").strip() if not token: return ANON_USER conn = get_db() user = conn.execute("SELECT id, username, role, display_name, password, business_verified FROM users WHERE token = ?", (token,)).fetchone() conn.close() if not user: return ANON_USER return dict(user) def require_role(*roles): """Returns a dependency that checks the user has one of the given roles.""" def checker(user=Depends(get_current_user)): if user["role"] not in roles: raise HTTPException(403, "权限不足") return user return checker # ── Models ────────────────────────────────────────────── class OilIn(BaseModel): name: str bottle_price: float drop_count: int retail_price: Optional[float] = None class IngredientIn(BaseModel): oil_name: str drops: float class RecipeIn(BaseModel): name: str note: str = "" ingredients: list[IngredientIn] tags: list[str] = [] class RecipeUpdate(BaseModel): name: Optional[str] = None en_name: Optional[str] = None note: Optional[str] = None ingredients: Optional[list[IngredientIn]] = None tags: Optional[list[str]] = None version: Optional[int] = None class UserIn(BaseModel): username: str role: str = "viewer" display_name: str = "" class UserUpdate(BaseModel): role: Optional[str] = None display_name: Optional[str] = None # ── Me ────────────────────────────────────────────────── APP_VERSION = "20260401" @app.get("/api/version") def get_version(): return {"version": APP_VERSION} @app.get("/api/me") def get_me(user=Depends(get_current_user)): return {"username": user["username"], "role": user["role"], "display_name": user.get("display_name", ""), "id": user.get("id"), "has_password": bool(user.get("password")), "business_verified": bool(user.get("business_verified"))} # ── Bug Reports ───────────────────────────────────────── @app.post("/api/bug-report", status_code=201) def submit_bug(body: dict, user=Depends(get_current_user)): content = body.get("content", "").strip() if not content: raise HTTPException(400, "请输入内容") conn = get_db() who = user.get("display_name") or user.get("username") or "匿名" # Admin can set priority; user-submitted bugs default to urgent (0) default_pri = 2 if user.get("role") == "admin" else 0 priority = body.get("priority", default_pri) c = conn.execute("INSERT INTO bug_reports (user_id, content, priority) VALUES (?, ?, ?)", (user.get("id"), content, priority)) bug_id = c.lastrowid # Auto-log: created conn.execute( "INSERT INTO bug_comments (bug_id, user_id, action, content) VALUES (?, ?, ?, ?)", (bug_id, user.get("id"), "创建", content), ) # Only notify admin if submitter is NOT admin if user.get("role") != "admin": conn.execute( "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)", ("admin", "🐛 Bug 反馈", f"{who}:{content}\n[bug_id:{bug_id}]") ) conn.commit() conn.close() return {"ok": True} @app.get("/api/bug-reports") def list_bugs(user=Depends(get_current_user)): conn = get_db() if user["role"] == "admin": rows = conn.execute( "SELECT b.id, b.content, b.is_resolved, b.created_at, b.user_id, b.priority, b.assigned_to, " "u.display_name, u.username, a.display_name as assigned_name " "FROM bug_reports b LEFT JOIN users u ON b.user_id = u.id LEFT JOIN users a ON b.assigned_to = a.id " "ORDER BY b.priority ASC, COALESCE((SELECT MAX(c.created_at) FROM bug_comments c WHERE c.bug_id=b.id), b.created_at) DESC" ).fetchall() else: rows = conn.execute( "SELECT b.id, b.content, b.is_resolved, b.created_at, b.user_id, b.priority, b.assigned_to, " "u.display_name, u.username, a.display_name as assigned_name " "FROM bug_reports b LEFT JOIN users u ON b.user_id = u.id LEFT JOIN users a ON b.assigned_to = a.id " "WHERE b.user_id = ? OR b.assigned_to = ? OR b.is_resolved IN (1, 3) " "ORDER BY b.priority ASC, COALESCE((SELECT MAX(c.created_at) FROM bug_comments c WHERE c.bug_id=b.id), b.created_at) DESC", (user.get("id"), user.get("id")) ).fetchall() bugs = [dict(r) for r in rows] # Attach comments/log for each bug for bug in bugs: comments = conn.execute( "SELECT c.id, c.action, c.content, c.created_at, u.display_name, u.username " "FROM bug_comments c LEFT JOIN users u ON c.user_id = u.id " "WHERE c.bug_id = ? ORDER BY c.created_at ASC", (bug["id"],) ).fetchall() bug["comments"] = [dict(c) for c in comments] conn.close() return bugs @app.put("/api/bug-reports/{bug_id}") def update_bug(bug_id: int, body: dict, user=Depends(get_current_user)): conn = get_db() bug = conn.execute("SELECT user_id, content, is_resolved FROM bug_reports WHERE id = ?", (bug_id,)).fetchone() if not bug: conn.close() raise HTTPException(404, "Bug not found") # status: 0=open, 1=待测试, 2=已修复(admin only), 3=已测试(tester feedback) status_names = {0: "待处理", 1: "待测试", 2: "已修复", 3: "已测试"} new_status = body.get("status") note = body.get("note", "").strip() if new_status is not None: # Only admin can mark as resolved (status 2) if new_status == 2 and user["role"] != "admin": conn.close() raise HTTPException(403, "只有管理员可以标记为已修复") conn.execute("UPDATE bug_reports SET is_resolved = ? WHERE id = ?", (new_status, bug_id)) # Auto-log the status change action = "→ " + status_names.get(new_status, str(new_status)) conn.execute( "INSERT INTO bug_comments (bug_id, user_id, action, content) VALUES (?, ?, ?, ?)", (bug_id, user.get("id"), action, note), ) notify_user_id = body.get("notify_user_id") if new_status == 1 and notify_user_id: # Admin sends to specific tester — user-targeted notification conn.execute("UPDATE bug_reports SET assigned_to = ? WHERE id = ?", (notify_user_id, bug_id)) target = conn.execute("SELECT role, display_name, username FROM users WHERE id = ?", (notify_user_id,)).fetchone() if target: msg = "请帮忙测试「" + bug["content"][:80] + "」" if note: msg += "\n\n备注:" + note msg += "\n[bug_id:" + str(bug_id) + "]" conn.execute( "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)", (target["role"], "🔧 Bug 待测试", msg, notify_user_id) ) elif new_status == 1 and bug["user_id"]: # Admin marks as testing → notify reporter reporter = conn.execute("SELECT role, display_name, username FROM users WHERE id = ?", (bug["user_id"],)).fetchone() if reporter: conn.execute( "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)", (reporter["role"], "🔧 你的 Bug 已修复,请测试", "请帮忙测试「" + bug["content"][:80] + "」" + ("\n\n备注:" + note if note else "")) ) if reporter["role"] in ("viewer", "editor"): conn.execute( "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)", ("senior_editor", "🔧 Bug 待测试(来自" + (reporter["display_name"] or reporter["username"]) + ")", "问题「" + bug["content"][:50] + "」已修复,请协助测试确认。") ) elif new_status == 3: # Tester confirms tested → notify admin who = user.get("display_name") or user.get("username") or "测试者" msg = who + " 已测试「" + bug["content"][:50] + "」" if note: msg += "\n\n备注:" + note msg += "\n[bug_id:" + str(bug_id) + "]" conn.execute( "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)", ("admin", "🧪 Bug 已测试", msg) ) elif new_status == 2: # Admin marks as resolved pass # No notification needed, admin did it themselves if "content" in body: conn.execute("UPDATE bug_reports SET content = ? WHERE id = ?", (body["content"], bug_id)) if "priority" in body: conn.execute("UPDATE bug_reports SET priority = ? WHERE id = ?", (body["priority"], bug_id)) conn.commit() conn.close() return {"ok": True} @app.post("/api/bug-reports/{bug_id}/comment") @app.delete("/api/bug-reports/{bug_id}") def delete_bug(bug_id: int, user=Depends(require_role("admin"))): conn = get_db() conn.execute("DELETE FROM bug_comments WHERE bug_id = ?", (bug_id,)) conn.execute("DELETE FROM bug_reports WHERE id = ?", (bug_id,)) conn.commit() conn.close() return {"ok": True} def add_bug_comment(bug_id: int, body: dict, user=Depends(get_current_user)): """Add a manual comment/note to a bug's log.""" content = body.get("content", "").strip() if not content: raise HTTPException(400, "请输入内容") conn = get_db() bug = conn.execute("SELECT id FROM bug_reports WHERE id = ?", (bug_id,)).fetchone() if not bug: conn.close() raise HTTPException(404, "Bug not found") conn.execute( "INSERT INTO bug_comments (bug_id, user_id, action, content) VALUES (?, ?, ?, ?)", (bug_id, user.get("id"), "备注", content), ) conn.commit() conn.close() return {"ok": True} @app.post("/api/symptom-search") def symptom_search(body: dict, user=Depends(get_current_user)): """Search recipes by symptom, notify editors if no good match.""" query = body.get("query", "").strip() if not query: return {"recipes": [], "exact": False} conn = get_db() # Search in recipe names rows = conn.execute( "SELECT id, name, note, owner_id, version, en_name FROM recipes ORDER BY id" ).fetchall() exact = [] related = [] for r in rows: if query in r["name"]: exact.append(_recipe_to_dict(conn, r)) elif any(c in r["name"] for c in query): related.append(_recipe_to_dict(conn, r)) # If user reports no match, notify editors if body.get("report_missing"): who = user.get("display_name") or user.get("username") or "用户" for role in ("admin", "senior_editor", "editor"): conn.execute( "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)", (role, "🔍 用户需求:" + query, f"{who} 搜索了「{query}」,没有找到满意的配方,请考虑添加。") ) conn.execute("INSERT INTO search_log (user_id, query, matched_count) VALUES (?, ?, ?)", (user.get("id"), query, 0)) conn.commit() conn.close() return {"exact": exact, "related": related[:20]} # ── Register ──────────────────────────────────────────── @app.post("/api/register", status_code=201) def register(body: dict): username = body.get("username", "").strip() password = body.get("password", "").strip() display_name = body.get("display_name", "").strip() if not username or len(username) < 2: raise HTTPException(400, "用户名至少2个字符") if not password or len(password) < 4: raise HTTPException(400, "密码至少4位") token = _secrets.token_hex(24) conn = get_db() try: conn.execute( "INSERT INTO users (username, token, role, display_name, password) VALUES (?, ?, ?, ?, ?)", (username, token, "viewer", display_name or username, hash_password(password)) ) conn.commit() except Exception: conn.close() raise HTTPException(400, "用户名已被占用") conn.close() return {"token": token} # ── Login ─────────────────────────────────────────────── @app.post("/api/login") def login(body: dict): username = body.get("username", "").strip() password = body.get("password", "").strip() if not username or not password: raise HTTPException(400, "请输入用户名和密码") conn = get_db() user = conn.execute("SELECT id, token, password, display_name, role FROM users WHERE username = ?", (username,)).fetchone() if not user: conn.close() raise HTTPException(401, "用户名不存在") if not user["password"]: conn.close() raise HTTPException(401, "该账号未设置密码,请使用链接登录后设置密码") if not verify_password(password, user["password"]): conn.close() raise HTTPException(401, "密码错误") # Auto-upgrade legacy plaintext password to hashed _upgrade_password_if_needed(conn, user["id"], password, user["password"]) conn.close() return {"token": user["token"], "display_name": user["display_name"], "role": user["role"]} @app.put("/api/me") def update_me(body: dict, user=Depends(get_current_user)): if not user["id"]: raise HTTPException(403, "请先登录") conn = get_db() # Update display_name if "display_name" in body: dn = body["display_name"].strip() if not dn: conn.close() raise HTTPException(400, "昵称不能为空") conn.execute("UPDATE users SET display_name = ? WHERE id = ?", (dn, user["id"])) # Update username if "username" in body: un = body["username"].strip() if not un or len(un) < 2: conn.close() raise HTTPException(400, "用户名至少2个字符") existing = conn.execute("SELECT id FROM users WHERE username = ? AND id != ?", (un, user["id"])).fetchone() if existing: conn.close() raise HTTPException(400, "用户名已被占用") conn.execute("UPDATE users SET username = ? WHERE id = ?", (un, user["id"])) # Update password (requires old password verification) if "password" in body: pw = body["password"].strip() if pw and len(pw) < 4: conn.close() raise HTTPException(400, "新密码至少4位") old_pw = body.get("old_password", "").strip() current_pw = user.get("password") or "" if current_pw and not verify_password(old_pw, current_pw): conn.close() raise HTTPException(400, "当前密码不正确") if pw: conn.execute("UPDATE users SET password = ? WHERE id = ?", (hash_password(pw), user["id"])) conn.commit() conn.close() return {"ok": True} @app.put("/api/me/password") def set_password(body: dict, user=Depends(get_current_user)): """Legacy endpoint, kept for compatibility.""" if not user["id"]: raise HTTPException(403, "请先登录") pw = body.get("password", "").strip() if not pw or len(pw) < 4: raise HTTPException(400, "密码至少4位") conn = get_db() conn.execute("UPDATE users SET password = ? WHERE id = ?", (hash_password(pw), user["id"])) conn.commit() conn.close() return {"ok": True} # ── Business Verification ────────────────────────────── @app.post("/api/business-apply", status_code=201) def business_apply(body: dict, user=Depends(get_current_user)): if not user["id"]: raise HTTPException(403, "请先登录") conn = get_db() # Check if already has pending application existing = conn.execute( "SELECT id, status FROM business_applications WHERE user_id = ? ORDER BY id DESC LIMIT 1", (user["id"],) ).fetchone() if existing and existing["status"] == "pending": conn.close() raise HTTPException(400, "已有待审核的申请") if user.get("business_verified"): conn.close() raise HTTPException(400, "已是认证商业用户") business_name = body.get("business_name", "").strip() document = body.get("document", "") # base64 image if not business_name: conn.close() raise HTTPException(400, "请填写商户名称") if document and len(document) > 2000000: conn.close() raise HTTPException(400, "文件太大,请压缩到1.5MB以内") conn.execute( "INSERT INTO business_applications (user_id, business_name, document) VALUES (?, ?, ?)", (user["id"], business_name, document) ) who = user.get("display_name") or user.get("username") conn.execute( "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)", ("admin", "🏢 商业认证申请", f"{who} 申请商业用户认证,商户名:{business_name}") ) conn.commit() conn.close() return {"ok": True} @app.get("/api/my-business-application") def get_my_business_application(user=Depends(get_current_user)): if not user["id"]: return {"status": None} conn = get_db() row = conn.execute( "SELECT business_name, document, status, reject_reason, created_at FROM business_applications WHERE user_id = ? ORDER BY id DESC LIMIT 1", (user["id"],) ).fetchone() conn.close() if not row: return {"status": None} return dict(row) @app.get("/api/business-applications") def list_business_applications(user=Depends(require_role("admin"))): conn = get_db() rows = conn.execute( "SELECT a.id, a.user_id, a.business_name, a.document, a.status, a.created_at, " "u.display_name, u.username FROM business_applications a " "LEFT JOIN users u ON a.user_id = u.id ORDER BY a.id DESC" ).fetchall() conn.close() return [dict(r) for r in rows] @app.post("/api/business-applications/{app_id}/approve") def approve_business(app_id: int, user=Depends(require_role("admin"))): conn = get_db() app = conn.execute("SELECT user_id, business_name FROM business_applications WHERE id = ?", (app_id,)).fetchone() if not app: conn.close() raise HTTPException(404, "申请不存在") conn.execute("UPDATE business_applications SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?", (app_id,)) conn.execute("UPDATE users SET business_verified = 1 WHERE id = ?", (app["user_id"],)) # Notify user target = conn.execute("SELECT role FROM users WHERE id = ?", (app["user_id"],)).fetchone() if target: conn.execute( "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)", (target["role"], "🎉 商业认证通过", "恭喜!你的商业用户认证已通过,现在可以使用项目核算等商业功能。", app["user_id"]) ) conn.commit() conn.close() return {"ok": True} @app.post("/api/business-applications/{app_id}/reject") def reject_business(app_id: int, body: dict = None, user=Depends(require_role("admin"))): conn = get_db() app = conn.execute("SELECT user_id FROM business_applications WHERE id = ?", (app_id,)).fetchone() if not app: conn.close() raise HTTPException(404, "申请不存在") reason = (body or {}).get("reason", "").strip() conn.execute("UPDATE business_applications SET status = 'rejected', reviewed_at = datetime('now'), reject_reason = ? WHERE id = ?", (reason, app_id)) target = conn.execute("SELECT role FROM users WHERE id = ?", (app["user_id"],)).fetchone() if target: msg = "你的商业用户认证申请未通过。" if reason: msg += "\n\n原因:" + reason msg += "\n\n你可以修改后重新申请。" conn.execute( "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)", (target["role"], "商业认证未通过", msg, app["user_id"]) ) conn.commit() conn.close() return {"ok": True} # ── Translation Suggestions ──────────────────────────── @app.post("/api/translation-suggest", status_code=201) def suggest_translation(body: dict, user=Depends(get_current_user)): if not user["id"]: raise HTTPException(403, "请先登录") recipe_name = body.get("recipe_name", "").strip() suggested_en = body.get("suggested_en", "").strip() recipe_id = body.get("recipe_id") if not recipe_name or not suggested_en: raise HTTPException(400, "请填写翻译") conn = get_db() conn.execute( "INSERT INTO translation_suggestions (recipe_id, recipe_name, suggested_en, user_id) VALUES (?, ?, ?, ?)", (recipe_id, recipe_name, suggested_en, user["id"]) ) who = user.get("display_name") or user.get("username") if user["role"] != "admin": conn.execute( "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)", ("admin", "📝 翻译建议", f"{who} 建议将「{recipe_name}」翻译为「{suggested_en}」") ) conn.commit() conn.close() return {"ok": True} @app.get("/api/translation-suggestions") def list_translation_suggestions(user=Depends(require_role("admin"))): conn = get_db() rows = conn.execute( "SELECT s.id, s.recipe_id, s.recipe_name, s.suggested_en, s.status, s.created_at, " "u.display_name, u.username FROM translation_suggestions s " "LEFT JOIN users u ON s.user_id = u.id WHERE s.status = 'pending' ORDER BY s.id DESC" ).fetchall() conn.close() return [dict(r) for r in rows] @app.post("/api/translation-suggestions/{sid}/approve") def approve_translation(sid: int, user=Depends(require_role("admin"))): conn = get_db() row = conn.execute("SELECT recipe_name, suggested_en FROM translation_suggestions WHERE id = ?", (sid,)).fetchone() if not row: conn.close() raise HTTPException(404) conn.execute("UPDATE translation_suggestions SET status = 'approved' WHERE id = ?", (sid,)) conn.commit() conn.close() return {"ok": True, "recipe_name": row["recipe_name"], "suggested_en": row["suggested_en"]} @app.post("/api/translation-suggestions/{sid}/reject") def reject_translation(sid: int, user=Depends(require_role("admin"))): conn = get_db() conn.execute("UPDATE translation_suggestions SET status = 'rejected' WHERE id = ?", (sid,)) conn.commit() conn.close() return {"ok": True} @app.post("/api/business-revoke/{user_id}") def revoke_business(user_id: int, body: dict = None, user=Depends(require_role("admin"))): conn = get_db() conn.execute("UPDATE users SET business_verified = 0 WHERE id = ?", (user_id,)) reason = (body or {}).get("reason", "").strip() target = conn.execute("SELECT role FROM users WHERE id = ?", (user_id,)).fetchone() if target: msg = "你的商业用户资格已被取消。" if reason: msg += "\n\n原因:" + reason msg += "\n\n如有疑问请联系管理员,也可重新申请认证。" conn.execute( "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)", (target["role"], "商业资格已取消", msg, user_id) ) conn.commit() conn.close() return {"ok": True} @app.post("/api/impersonate") def impersonate(body: dict, user=Depends(require_role("admin"))): target_id = body.get("user_id") if not target_id: raise HTTPException(400, "user_id required") conn = get_db() target = conn.execute("SELECT token FROM users WHERE id = ?", (target_id,)).fetchone() conn.close() if not target: raise HTTPException(404, "User not found") return {"token": target["token"]} # ── Oils ──────────────────────────────────────────────── @app.get("/api/oils") def list_oils(): conn = get_db() rows = conn.execute("SELECT name, bottle_price, drop_count, retail_price, is_active FROM oils ORDER BY name").fetchall() conn.close() return [dict(r) for r in rows] @app.post("/api/oils", status_code=201) def upsert_oil(oil: OilIn, user=Depends(require_role("admin", "senior_editor"))): conn = get_db() conn.execute( "INSERT INTO oils (name, bottle_price, drop_count, retail_price) VALUES (?, ?, ?, ?) " "ON CONFLICT(name) DO UPDATE SET bottle_price=excluded.bottle_price, drop_count=excluded.drop_count, retail_price=excluded.retail_price", (oil.name, oil.bottle_price, oil.drop_count, oil.retail_price), ) log_audit(conn, user["id"], "upsert_oil", "oil", oil.name, oil.name, json.dumps({"bottle_price": oil.bottle_price, "drop_count": oil.drop_count})) conn.commit() conn.close() return {"ok": True} @app.delete("/api/oils/{name}") def delete_oil(name: str, user=Depends(require_role("admin", "senior_editor"))): conn = get_db() row = conn.execute("SELECT name, bottle_price, drop_count, retail_price, is_active FROM oils WHERE name = ?", (name,)).fetchone() snapshot = dict(row) if row else {} conn.execute("DELETE FROM oils WHERE name = ?", (name,)) log_audit(conn, user["id"], "delete_oil", "oil", name, name, json.dumps(snapshot, ensure_ascii=False)) conn.commit() conn.close() return {"ok": True} # ── Recipes ───────────────────────────────────────────── def _recipe_to_dict(conn, row): rid = row["id"] ings = conn.execute( "SELECT oil_name, drops FROM recipe_ingredients WHERE recipe_id = ?", (rid,) ).fetchall() tags = conn.execute( "SELECT tag_name FROM recipe_tags WHERE recipe_id = ?", (rid,) ).fetchall() owner = conn.execute( "SELECT display_name, username FROM users WHERE id = ?", (row["owner_id"],) ).fetchone() if row["owner_id"] else None return { "id": rid, "name": row["name"], "en_name": row["en_name"] if "en_name" in row.keys() else "", "note": row["note"], "owner_id": row["owner_id"], "owner_name": (owner["display_name"] or owner["username"]) if owner else None, "version": row["version"] if "version" in row.keys() else 1, "ingredients": [{"oil_name": i["oil_name"], "drops": i["drops"]} for i in ings], "tags": [t["tag_name"] for t in tags], } @app.get("/api/recipes") def list_recipes(user=Depends(get_current_user)): conn = get_db() # Admin sees all; others see admin-owned (adopted) + their own if user["role"] == "admin": rows = conn.execute("SELECT id, name, note, owner_id, version, en_name FROM recipes ORDER BY id").fetchall() else: admin = conn.execute("SELECT id FROM users WHERE role = 'admin' LIMIT 1").fetchone() admin_id = admin["id"] if admin else 1 user_id = user.get("id") if user_id: rows = conn.execute( "SELECT id, name, note, owner_id, version, en_name FROM recipes WHERE owner_id = ? OR owner_id = ? ORDER BY id", (admin_id, user_id) ).fetchall() else: rows = conn.execute( "SELECT id, name, note, owner_id, version, en_name FROM recipes WHERE owner_id = ? ORDER BY id", (admin_id,) ).fetchall() result = [_recipe_to_dict(conn, r) for r in rows] conn.close() return result @app.get("/api/recipes/{recipe_id}") def get_recipe(recipe_id: int): conn = get_db() row = conn.execute("SELECT id, name, note, owner_id, version, en_name FROM recipes WHERE id = ?", (recipe_id,)).fetchone() if not row: conn.close() raise HTTPException(404, "Recipe not found") result = _recipe_to_dict(conn, row) conn.close() return result @app.post("/api/recipes", status_code=201) def create_recipe(recipe: RecipeIn, user=Depends(get_current_user)): if not user.get("id"): raise HTTPException(401, "请先登录") conn = get_db() c = conn.cursor() c.execute("INSERT INTO recipes (name, note, owner_id) VALUES (?, ?, ?)", (recipe.name, recipe.note, user["id"])) rid = c.lastrowid for ing in recipe.ingredients: c.execute( "INSERT INTO recipe_ingredients (recipe_id, oil_name, drops) VALUES (?, ?, ?)", (rid, ing.oil_name, ing.drops), ) for tag in recipe.tags: c.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag,)) c.execute("INSERT OR IGNORE INTO recipe_tags (recipe_id, tag_name) VALUES (?, ?)", (rid, tag)) log_audit(conn, user["id"], "create_recipe", "recipe", rid, recipe.name) # Notify admin when non-admin creates a recipe if user["role"] != "admin": who = user.get("display_name") or user["username"] conn.execute( "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)", ("admin", "📝 新配方待审核", f"{who} 新增了配方「{recipe.name}」,请到管理配方查看并采纳。") ) conn.commit() conn.close() return {"id": rid} def _check_recipe_permission(conn, recipe_id, user): """Check if user can modify this recipe.""" row = conn.execute("SELECT owner_id, name FROM recipes WHERE id = ?", (recipe_id,)).fetchone() if not row: raise HTTPException(404, "Recipe not found") if user["role"] in ("admin", "senior_editor"): return row if row["owner_id"] == user.get("id"): return row raise HTTPException(403, "只能修改自己创建的配方") @app.put("/api/recipes/{recipe_id}") def update_recipe(recipe_id: int, update: RecipeUpdate, user=Depends(get_current_user)): if not user.get("id"): raise HTTPException(401, "请先登录") conn = get_db() c = conn.cursor() _check_recipe_permission(conn, recipe_id, user) # Optimistic locking: check version if provided if update.version is not None: current = c.execute("SELECT version FROM recipes WHERE id = ?", (recipe_id,)).fetchone() if current and current["version"] and current["version"] != update.version: conn.close() raise HTTPException(409, "此配方已被其他人修改,请刷新后重试") if update.name is not None: c.execute("UPDATE recipes SET name = ? WHERE id = ?", (update.name, recipe_id)) if update.note is not None: c.execute("UPDATE recipes SET note = ? WHERE id = ?", (update.note, recipe_id)) if update.en_name is not None: c.execute("UPDATE recipes SET en_name = ? WHERE id = ?", (update.en_name, recipe_id)) if update.ingredients is not None: c.execute("DELETE FROM recipe_ingredients WHERE recipe_id = ?", (recipe_id,)) for ing in update.ingredients: c.execute( "INSERT INTO recipe_ingredients (recipe_id, oil_name, drops) VALUES (?, ?, ?)", (recipe_id, ing.oil_name, ing.drops), ) if update.tags is not None: c.execute("DELETE FROM recipe_tags WHERE recipe_id = ?", (recipe_id,)) for tag in update.tags: c.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag,)) c.execute( "INSERT OR IGNORE INTO recipe_tags (recipe_id, tag_name) VALUES (?, ?)", (recipe_id, tag), ) c.execute("UPDATE recipes SET updated_by = ?, version = COALESCE(version, 1) + 1 WHERE id = ?", (user["id"], recipe_id)) log_audit(conn, user["id"], "update_recipe", "recipe", recipe_id, update.name) conn.commit() conn.close() return {"ok": True} @app.delete("/api/recipes/{recipe_id}") def delete_recipe(recipe_id: int, user=Depends(get_current_user)): if not user.get("id"): raise HTTPException(401, "请先登录") conn = get_db() row = _check_recipe_permission(conn, recipe_id, user) # Save full snapshot for undo full = conn.execute("SELECT id, name, note, owner_id, version, en_name FROM recipes WHERE id = ?", (recipe_id,)).fetchone() snapshot = _recipe_to_dict(conn, full) log_audit(conn, user["id"], "delete_recipe", "recipe", recipe_id, row["name"], json.dumps(snapshot, ensure_ascii=False)) conn.execute("DELETE FROM recipes WHERE id = ?", (recipe_id,)) conn.commit() conn.close() return {"ok": True} # ── Adopt (admin takes ownership of editor recipes) ──── @app.post("/api/recipes/{recipe_id}/adopt") def adopt_recipe(recipe_id: int, user=Depends(require_role("admin"))): conn = get_db() row = conn.execute("SELECT id, name, owner_id FROM recipes WHERE id = ?", (recipe_id,)).fetchone() if not row: conn.close() raise HTTPException(404, "Recipe not found") if row["owner_id"] == user["id"]: conn.close() return {"ok": True, "msg": "already owned"} old_owner = conn.execute("SELECT display_name, username FROM users WHERE id = ?", (row["owner_id"],)).fetchone() old_name = (old_owner["display_name"] or old_owner["username"]) if old_owner else "unknown" conn.execute("UPDATE recipes SET owner_id = ?, updated_by = ? WHERE id = ?", (user["id"], user["id"], recipe_id)) log_audit(conn, user["id"], "adopt_recipe", "recipe", recipe_id, row["name"], json.dumps({"from_user": old_name})) conn.commit() conn.close() return {"ok": True} @app.post("/api/recipes/adopt-batch") def adopt_batch(body: dict, user=Depends(require_role("admin"))): ids = body.get("ids", []) if not ids: raise HTTPException(400, "No recipe ids provided") conn = get_db() adopted = 0 for rid in ids: row = conn.execute("SELECT id, name, owner_id FROM recipes WHERE id = ?", (rid,)).fetchone() if row and row["owner_id"] != user["id"]: conn.execute("UPDATE recipes SET owner_id = ?, updated_by = ? WHERE id = ?", (user["id"], user["id"], rid)) log_audit(conn, user["id"], "adopt_recipe", "recipe", rid, row["name"]) adopted += 1 conn.commit() conn.close() return {"ok": True, "adopted": adopted} # ── Tags ──────────────────────────────────────────────── @app.get("/api/tags") def list_tags(): conn = get_db() rows = conn.execute("SELECT name FROM tags ORDER BY name").fetchall() conn.close() return [r["name"] for r in rows] @app.post("/api/tags", status_code=201) def create_tag(body: dict, user=Depends(require_role("admin", "senior_editor", "editor"))): name = body.get("name", "").strip() if not name: raise HTTPException(400, "Tag name required") conn = get_db() conn.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (name,)) # Don't log tag creation (too frequent/noisy) conn.commit() conn.close() return {"ok": True} @app.delete("/api/tags/{name}") def delete_tag(name: str, user=Depends(require_role("admin"))): conn = get_db() conn.execute("DELETE FROM recipe_tags WHERE tag_name = ?", (name,)) conn.execute("DELETE FROM tags WHERE name = ?", (name,)) log_audit(conn, user["id"], "delete_tag", "tag", name, name) conn.commit() conn.close() return {"ok": True} # ── Users (admin only) ───────────────────────────────── @app.get("/api/users") def list_users(user=Depends(require_role("admin"))): conn = get_db() rows = conn.execute("SELECT id, username, token, role, display_name, created_at, business_verified FROM users ORDER BY id").fetchall() conn.close() return [dict(r) for r in rows] @app.post("/api/users", status_code=201) def create_user(body: UserIn, user=Depends(require_role("admin"))): token = _secrets.token_hex(24) conn = get_db() try: conn.execute( "INSERT INTO users (username, token, role, display_name) VALUES (?, ?, ?, ?)", (body.username, token, body.role, body.display_name), ) log_audit(conn, user["id"], "create_user", "user", body.username, body.display_name, json.dumps({"role": body.role})) conn.commit() except Exception: conn.close() raise HTTPException(400, "用户名已存在") conn.close() return {"token": token, "username": body.username} @app.delete("/api/users/{user_id}") def delete_user(user_id: int, user=Depends(require_role("admin"))): if user_id == user["id"]: raise HTTPException(400, "不能删除自己") conn = get_db() target = conn.execute("SELECT id, username, token, role, display_name FROM users WHERE id = ?", (user_id,)).fetchone() if not target: conn.close() raise HTTPException(404, "User not found") snapshot = dict(target) conn.execute("DELETE FROM users WHERE id = ?", (user_id,)) log_audit(conn, user["id"], "delete_user", "user", user_id, target["username"], json.dumps(snapshot, ensure_ascii=False)) conn.commit() conn.close() return {"ok": True} @app.put("/api/users/{user_id}") def update_user(user_id: int, body: UserUpdate, user=Depends(require_role("admin"))): conn = get_db() if body.role is not None: conn.execute("UPDATE users SET role = ? WHERE id = ?", (body.role, user_id)) if body.display_name is not None: conn.execute("UPDATE users SET display_name = ? WHERE id = ?", (body.display_name, user_id)) log_audit(conn, user["id"], "update_user", "user", user_id, None, json.dumps({"role": body.role, "display_name": body.display_name})) conn.commit() conn.close() return {"ok": True} # ── Undo (admin only) ────────────────────────────────── @app.post("/api/audit-log/{log_id}/undo") def undo_action(log_id: int, user=Depends(require_role("admin"))): conn = get_db() entry = conn.execute("SELECT * FROM audit_log WHERE id = ?", (log_id,)).fetchone() if not entry: conn.close() raise HTTPException(404, "Audit log entry not found") action = entry["action"] detail = entry["detail"] if not detail: conn.close() raise HTTPException(400, "此操作无法撤销(无快照数据)") snapshot = json.loads(detail) if action == "delete_recipe": # Restore recipe from snapshot c = conn.cursor() c.execute("INSERT INTO recipes (name, note, owner_id) VALUES (?, ?, ?)", (snapshot["name"], snapshot.get("note", ""), snapshot.get("owner_id"))) new_id = c.lastrowid for ing in snapshot.get("ingredients", []): c.execute("INSERT INTO recipe_ingredients (recipe_id, oil_name, drops) VALUES (?, ?, ?)", (new_id, ing["oil_name"], ing["drops"])) for tag in snapshot.get("tags", []): c.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag,)) c.execute("INSERT OR IGNORE INTO recipe_tags (recipe_id, tag_name) VALUES (?, ?)", (new_id, tag)) log_audit(conn, user["id"], "undo_delete_recipe", "recipe", new_id, snapshot["name"], json.dumps({"original_log_id": log_id})) conn.commit() conn.close() return {"ok": True, "new_id": new_id} elif action == "delete_oil": conn.execute( "INSERT OR IGNORE INTO oils (name, bottle_price, drop_count) VALUES (?, ?, ?)", (snapshot["name"], snapshot["bottle_price"], snapshot["drop_count"])) log_audit(conn, user["id"], "undo_delete_oil", "oil", snapshot["name"], snapshot["name"], json.dumps({"original_log_id": log_id})) conn.commit() conn.close() return {"ok": True} elif action == "delete_user": try: conn.execute( "INSERT INTO users (username, token, role, display_name) VALUES (?, ?, ?, ?)", (snapshot["username"], snapshot["token"], snapshot["role"], snapshot.get("display_name", ""))) log_audit(conn, user["id"], "undo_delete_user", "user", snapshot["username"], snapshot.get("display_name"), json.dumps({"original_log_id": log_id})) conn.commit() except Exception: conn.close() raise HTTPException(400, "恢复失败(用户名可能已被占用)") conn.close() return {"ok": True} conn.close() raise HTTPException(400, "此操作类型不支持撤销") # ── Audit Log (admin only) ───────────────────────────── @app.get("/api/audit-log") def get_audit_log(limit: int = 100, offset: int = 0, user=Depends(require_role("admin"))): conn = get_db() rows = conn.execute( "SELECT a.id, a.action, a.target_type, a.target_id, a.target_name, a.detail, a.created_at, " "u.display_name as user_name, u.username " "FROM audit_log a LEFT JOIN users u ON a.user_id = u.id " "ORDER BY a.id DESC LIMIT ? OFFSET ?", (limit, offset), ).fetchall() conn.close() return [dict(r) for r in rows] # ── Brand Settings ───────────────────────────────────── @app.get("/api/brand") def get_brand(user=Depends(get_current_user)): if not user["id"]: return {"qr_code": None, "brand_logo": None, "brand_bg": None, "brand_name": None, "brand_align": "center"} conn = get_db() row = conn.execute("SELECT qr_code, brand_logo, brand_bg, brand_name, brand_align FROM users WHERE id = ?", (user["id"],)).fetchone() conn.close() if not row: return {"qr_code": None, "brand_logo": None, "brand_bg": None, "brand_name": None, "brand_align": "center"} return dict(row) @app.put("/api/brand") def update_brand(body: dict, user=Depends(get_current_user)): if not user["id"]: raise HTTPException(403, "请先登录") conn = get_db() if "qr_code" in body: # Limit size: ~500KB base64 if body["qr_code"] and len(body["qr_code"]) > 700000: raise HTTPException(400, "图片太大,请压缩后上传") conn.execute("UPDATE users SET qr_code = ? WHERE id = ?", (body["qr_code"], user["id"])) if "brand_logo" in body: if body["brand_logo"] and len(body["brand_logo"]) > 700000: raise HTTPException(400, "图片太大,请压缩后上传") conn.execute("UPDATE users SET brand_logo = ? WHERE id = ?", (body["brand_logo"], user["id"])) if "brand_bg" in body: if body["brand_bg"] and len(body["brand_bg"]) > 1500000: raise HTTPException(400, "背景图太大,请压缩到1MB以内") conn.execute("UPDATE users SET brand_bg = ? WHERE id = ?", (body["brand_bg"], user["id"])) if "brand_name" in body: conn.execute("UPDATE users SET brand_name = ? WHERE id = ?", (body["brand_name"], user["id"])) if "brand_align" in body: conn.execute("UPDATE users SET brand_align = ? WHERE id = ?", (body["brand_align"], user["id"])) conn.commit() conn.close() return {"ok": True} # ── Profit Projects ──────────────────────────────────── @app.get("/api/projects") def list_projects(): conn = get_db() rows = conn.execute("SELECT * FROM profit_projects ORDER BY id DESC").fetchall() conn.close() return [{ **dict(r), "ingredients": json.loads(r["ingredients"]) } for r in rows] @app.post("/api/projects", status_code=201) def create_project(body: dict, user=Depends(require_role("admin", "senior_editor"))): conn = get_db() c = conn.cursor() c.execute( "INSERT INTO profit_projects (name, ingredients, pricing, note, created_by) VALUES (?, ?, ?, ?, ?)", (body["name"], json.dumps(body.get("ingredients", []), ensure_ascii=False), body.get("pricing", 0), body.get("note", ""), user["id"]) ) conn.commit() pid = c.lastrowid conn.close() return {"id": pid} @app.put("/api/projects/{pid}") def update_project(pid: int, body: dict, user=Depends(require_role("admin", "senior_editor"))): conn = get_db() if "name" in body: conn.execute("UPDATE profit_projects SET name = ? WHERE id = ?", (body["name"], pid)) if "ingredients" in body: conn.execute("UPDATE profit_projects SET ingredients = ? WHERE id = ?", (json.dumps(body["ingredients"], ensure_ascii=False), pid)) if "pricing" in body: conn.execute("UPDATE profit_projects SET pricing = ? WHERE id = ?", (body["pricing"], pid)) if "note" in body: conn.execute("UPDATE profit_projects SET note = ? WHERE id = ?", (body["note"], pid)) conn.commit() conn.close() return {"ok": True} @app.delete("/api/projects/{pid}") def delete_project(pid: int, user=Depends(require_role("admin", "senior_editor"))): conn = get_db() conn.execute("DELETE FROM profit_projects WHERE id = ?", (pid,)) conn.commit() conn.close() return {"ok": True} # ── Diary (personal recipes + journal) ───────────────── @app.get("/api/diary") def list_diary(user=Depends(get_current_user)): if not user["id"]: return [] conn = get_db() rows = conn.execute( "SELECT id, source_recipe_id, name, ingredients, note, tags, created_at " "FROM user_diary WHERE user_id = ? ORDER BY id DESC", (user["id"],) ).fetchall() result = [] for r in rows: entries = conn.execute( "SELECT id, content, created_at FROM diary_entries WHERE diary_id = ? ORDER BY created_at DESC", (r["id"],) ).fetchall() d = dict(r) d["ingredients"] = json.loads(r["ingredients"]) d["tags"] = json.loads(r["tags"] or "[]") d["entries"] = [dict(e) for e in entries] result.append(d) conn.close() return result @app.post("/api/diary", status_code=201) def create_diary(body: dict, user=Depends(get_current_user)): if not user["id"]: raise HTTPException(403, "请先登录") name = body.get("name", "").strip() ingredients = body.get("ingredients", []) note = body.get("note", "") source_id = body.get("source_recipe_id") if not name: raise HTTPException(400, "请输入配方名称") conn = get_db() c = conn.cursor() c.execute( "INSERT INTO user_diary (user_id, source_recipe_id, name, ingredients, note) VALUES (?, ?, ?, ?, ?)", (user["id"], source_id, name, json.dumps(ingredients, ensure_ascii=False), note) ) conn.commit() did = c.lastrowid conn.close() return {"id": did} @app.put("/api/diary/{diary_id}") def update_diary(diary_id: int, body: dict, user=Depends(get_current_user)): conn = get_db() row = conn.execute("SELECT user_id FROM user_diary WHERE id = ?", (diary_id,)).fetchone() if not row or row["user_id"] != user["id"]: conn.close() raise HTTPException(403, "无权操作") if "name" in body: conn.execute("UPDATE user_diary SET name = ? WHERE id = ?", (body["name"], diary_id)) if "note" in body: conn.execute("UPDATE user_diary SET note = ? WHERE id = ?", (body["note"], diary_id)) if "ingredients" in body: conn.execute("UPDATE user_diary SET ingredients = ? WHERE id = ?", (json.dumps(body["ingredients"], ensure_ascii=False), diary_id)) if "tags" in body: conn.execute("UPDATE user_diary SET tags = ? WHERE id = ?", (json.dumps(body["tags"], ensure_ascii=False), diary_id)) conn.commit() conn.close() return {"ok": True} @app.delete("/api/diary/{diary_id}") def delete_diary(diary_id: int, user=Depends(get_current_user)): conn = get_db() row = conn.execute("SELECT user_id FROM user_diary WHERE id = ?", (diary_id,)).fetchone() if not row or row["user_id"] != user["id"]: conn.close() raise HTTPException(403, "无权操作") conn.execute("DELETE FROM user_diary WHERE id = ?", (diary_id,)) conn.commit() conn.close() return {"ok": True} @app.post("/api/diary/{diary_id}/entries", status_code=201) def add_diary_entry(diary_id: int, body: dict, user=Depends(get_current_user)): conn = get_db() row = conn.execute("SELECT user_id FROM user_diary WHERE id = ?", (diary_id,)).fetchone() if not row or row["user_id"] != user["id"]: conn.close() raise HTTPException(403, "无权操作") content = body.get("content", "").strip() if not content: raise HTTPException(400, "内容不能为空") conn.execute("INSERT INTO diary_entries (diary_id, content) VALUES (?, ?)", (diary_id, content)) conn.commit() conn.close() return {"ok": True} @app.delete("/api/diary/entries/{entry_id}") def delete_diary_entry(entry_id: int, user=Depends(get_current_user)): conn = get_db() row = conn.execute( "SELECT d.user_id FROM diary_entries e JOIN user_diary d ON e.diary_id = d.id WHERE e.id = ?", (entry_id,) ).fetchone() if not row or row["user_id"] != user["id"]: conn.close() raise HTTPException(403, "无权操作") conn.execute("DELETE FROM diary_entries WHERE id = ?", (entry_id,)) conn.commit() conn.close() return {"ok": True} # ── Favorites ────────────────────────────────────────── @app.get("/api/favorites") def get_favorites(user=Depends(get_current_user)): if not user["id"]: return [] conn = get_db() rows = conn.execute("SELECT recipe_id FROM user_favorites WHERE user_id = ? ORDER BY created_at DESC", (user["id"],)).fetchall() conn.close() return [r["recipe_id"] for r in rows] @app.post("/api/favorites/{recipe_id}", status_code=201) def add_favorite(recipe_id: int, user=Depends(get_current_user)): if not user["id"]: raise HTTPException(403, "请先登录") conn = get_db() conn.execute("INSERT OR IGNORE INTO user_favorites (user_id, recipe_id) VALUES (?, ?)", (user["id"], recipe_id)) conn.commit() conn.close() return {"ok": True} @app.delete("/api/favorites/{recipe_id}") def remove_favorite(recipe_id: int, user=Depends(get_current_user)): if not user["id"]: raise HTTPException(403, "请先登录") conn = get_db() conn.execute("DELETE FROM user_favorites WHERE user_id = ? AND recipe_id = ?", (user["id"], recipe_id)) conn.commit() conn.close() return {"ok": True} # ── User Inventory ───────────────────────────────────── @app.get("/api/inventory") def get_inventory(user=Depends(get_current_user)): if not user["id"]: return [] conn = get_db() rows = conn.execute("SELECT oil_name FROM user_inventory WHERE user_id = ? ORDER BY oil_name", (user["id"],)).fetchall() conn.close() return [r["oil_name"] for r in rows] @app.post("/api/inventory", status_code=201) def add_inventory(body: dict, user=Depends(get_current_user)): if not user["id"]: raise HTTPException(403, "请先登录") name = body.get("oil_name", "").strip() if not name: raise HTTPException(400, "oil_name required") conn = get_db() conn.execute("INSERT OR IGNORE INTO user_inventory (user_id, oil_name) VALUES (?, ?)", (user["id"], name)) conn.commit() conn.close() return {"ok": True} @app.delete("/api/inventory/{oil_name}") def remove_inventory(oil_name: str, user=Depends(get_current_user)): if not user["id"]: raise HTTPException(403, "请先登录") conn = get_db() conn.execute("DELETE FROM user_inventory WHERE user_id = ? AND oil_name = ?", (user["id"], oil_name)) conn.commit() conn.close() return {"ok": True} @app.get("/api/inventory/recipes") def recipes_by_inventory(user=Depends(get_current_user)): """Get recipes that can be made with user's inventory oils.""" if not user["id"]: return [] conn = get_db() inv = [r["oil_name"] for r in conn.execute( "SELECT oil_name FROM user_inventory WHERE user_id = ?", (user["id"],)).fetchall()] if not inv: conn.close() return [] rows = conn.execute("SELECT id, name, note, owner_id, version, en_name FROM recipes ORDER BY id").fetchall() result = [] for r in rows: recipe = _recipe_to_dict(conn, r) eo_oils = [i["oil_name"] for i in recipe["ingredients"] if i["oil_name"] != "椰子油"] matched = [o for o in eo_oils if o in inv] if matched: recipe["inventory_match"] = len(matched) recipe["inventory_total"] = len(eo_oils) recipe["inventory_missing"] = [o for o in eo_oils if o not in inv] result.append(recipe) conn.close() result.sort(key=lambda r: (-r["inventory_match"], r["inventory_total"] - r["inventory_match"])) return result # ── Search Logging ───────────────────────────────────── @app.post("/api/search-log") def log_search(body: dict, user=Depends(get_current_user)): query = body.get("query", "").strip() matched = body.get("matched_count", 0) if not query: return {"ok": True} conn = get_db() conn.execute("INSERT INTO search_log (user_id, query, matched_count) VALUES (?, ?, ?)", (user.get("id"), query, matched)) # Instant notification when no match found if matched == 0: who = user.get("display_name") or user.get("username") or "用户" for role in ("admin", "senior_editor"): conn.execute( "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)", (role, "🔍 有人搜索了未收录的配方", f"{who} 搜索了「{query}」但没有找到匹配配方,请考虑添加。") ) conn.commit() conn.close() return {"ok": True, "matched": matched} @app.get("/api/search-log/unmatched") def get_unmatched_searches(days: int = 7, user=Depends(require_role("admin", "senior_editor"))): conn = get_db() rows = conn.execute( "SELECT query, COUNT(*) as cnt, MAX(created_at) as last_at " "FROM search_log WHERE matched_count = 0 AND created_at > datetime('now', ?) " "GROUP BY query ORDER BY cnt DESC LIMIT 50", (f"-{days} days",) ).fetchall() conn.close() return [dict(r) for r in rows] # ── Notifications ────────────────────────────────────── @app.get("/api/notifications") def get_notifications(user=Depends(get_current_user)): if not user["id"]: return [] conn = get_db() rows = conn.execute( "SELECT id, title, body, is_read, created_at FROM notifications " "WHERE (target_user_id = ? OR (target_user_id IS NULL AND (target_role = ? OR target_role = 'all'))) " "ORDER BY is_read ASC, id DESC LIMIT 200", (user["id"], user["role"]) ).fetchall() conn.close() return [dict(r) for r in rows] @app.post("/api/notifications/{nid}/read") def mark_notification_read(nid: int, body: dict = None, user=Depends(get_current_user)): conn = get_db() notif = conn.execute("SELECT title FROM notifications WHERE id = ?", (nid,)).fetchone() # Bug test notifications can only be marked read with force=true (from "已测试" flow) force = (body or {}).get("force", False) if body else False if notif and "待测试" in (notif["title"] or "") and not force: conn.close() raise HTTPException(400, "请先点击「已测试」完成测试") conn.execute("UPDATE notifications SET is_read = 1 WHERE id = ?", (nid,)) conn.commit() conn.close() return {"ok": True} @app.post("/api/notifications/{nid}/unread") def mark_notification_unread(nid: int, user=Depends(get_current_user)): conn = get_db() conn.execute("UPDATE notifications SET is_read = 0 WHERE id = ?", (nid,)) conn.commit() conn.close() return {"ok": True} @app.post("/api/notifications/read-all") def mark_all_notifications_read(user=Depends(get_current_user)): conn = get_db() # Mark all as read EXCEPT bug test notifications (title contains '待测试') conn.execute( "UPDATE notifications SET is_read = 1 WHERE is_read = 0 " "AND title NOT LIKE '%待测试%' " "AND (target_user_id = ? OR (target_user_id IS NULL AND (target_role = ? OR target_role = 'all')))", (user["id"], user["role"]) ) conn.commit() conn.close() return {"ok": True} @app.post("/api/cron/weekly-review") def weekly_review(user=Depends(require_role("admin"))): """Generate weekly notifications. Call via cron or manually.""" conn = get_db() # 1. Pending recipes for admin review admin = conn.execute("SELECT id FROM users WHERE role = 'admin' LIMIT 1").fetchone() admin_id = admin["id"] if admin else 1 pending = conn.execute( "SELECT COUNT(*) as cnt FROM recipes WHERE owner_id != ?", (admin_id,) ).fetchone()["cnt"] if pending > 0: conn.execute( "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)", ("admin", f"有 {pending} 条配方待审核", "其他用户新增了配方,请到「管理配方」→「待审核」查看并采纳。") ) # 2. Unmatched searches for admin + senior_editor unmatched = conn.execute( "SELECT query, COUNT(*) as cnt FROM search_log " "WHERE matched_count = 0 AND created_at > datetime('now', '-7 days') " "GROUP BY query ORDER BY cnt DESC LIMIT 10" ).fetchall() if unmatched: queries = "、".join([f"「{r['query']}」({r['cnt']}次)" for r in unmatched]) for role in ("admin", "senior_editor"): conn.execute( "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)", (role, "本周未匹配的搜索需求", f"以下搜索没有找到配方:{queries}。请考虑完善相关配方。") ) conn.commit() conn.close() return {"ok": True, "pending_recipes": pending, "unmatched_queries": len(unmatched)} # ── Category Modules (homepage) ──────────────────────── @app.get("/api/categories") def list_categories(): conn = get_db() rows = conn.execute("SELECT * FROM category_modules ORDER BY sort_order, id").fetchall() conn.close() return [dict(r) for r in rows] @app.post("/api/categories", status_code=201) def create_category(body: dict, user=Depends(require_role("admin"))): conn = get_db() conn.execute( "INSERT INTO category_modules (name, subtitle, icon, bg_image, color_from, color_to, tag_name, sort_order) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", (body["name"], body.get("subtitle", ""), body.get("icon", "🌿"), body.get("bg_image", ""), body.get("color_from", "#7a9e7e"), body.get("color_to", "#5a7d5e"), body["tag_name"], body.get("sort_order", 0)) ) conn.commit() conn.close() return {"ok": True} @app.delete("/api/categories/{cat_id}") def delete_category(cat_id: int, user=Depends(require_role("admin"))): conn = get_db() conn.execute("DELETE FROM category_modules WHERE id = ?", (cat_id,)) conn.commit() conn.close() return {"ok": True} # ── Static files (frontend) ──────────────────────────── FRONTEND_DIR = os.environ.get("FRONTEND_DIR", "/app/frontend") @app.on_event("startup") def startup(): init_db() defaults_path = os.path.join(os.path.dirname(__file__), "defaults.json") if os.path.exists(defaults_path): with open(defaults_path) as f: data = json.load(f) seed_defaults(data["oils_meta"], data["recipes"]) if os.path.isdir(FRONTEND_DIR): # Serve static assets (js/css/images) directly app.mount("/assets", StaticFiles(directory=os.path.join(FRONTEND_DIR, "assets")), name="assets") app.mount("/public", StaticFiles(directory=FRONTEND_DIR), name="public") # SPA fallback: any non-API, non-asset route returns index.html from fastapi.responses import FileResponse @app.get("/{path:path}") async def spa_fallback(path: str): # Serve actual files if they exist (favicon, icons, etc.) file_path = os.path.join(FRONTEND_DIR, path) if os.path.isfile(file_path): return FileResponse(file_path) # Otherwise return index.html for Vue Router return FileResponse(os.path.join(FRONTEND_DIR, "index.html"))