Files
oil-formula-calculator/backend/main.py
Hera Zhao 2a823e5bac
Some checks failed
PR Preview / teardown-preview (pull_request) Has been skipped
Test / unit-test (push) Successful in 4s
Test / build-check (push) Successful in 4s
PR Preview / test (pull_request) Successful in 5s
PR Preview / deploy-preview (pull_request) Successful in 16s
Test / e2e-test (push) Failing after 1m22s
feat: 精油价目页优化 — 名称缩放、去容量、红色底色、下架功能
显示优化:
- 中文名和英文名各一行,nowrap+ellipsis不换行
- 去掉📖 emoji标记
- 去掉右侧容量标签
- 划掉的零售价后面加/瓶,跟会员价一致

管理员功能:
- 信息不全的精油(缺价格/滴数/零售价)显示浅红底色
- 补全后自动恢复正常底色
- 编辑弹窗加"下架"按钮,下架后所有人看到浅灰底色
- 已下架可"重新上架"

后端:
- OilIn model 加 is_active 字段
- upsert 支持更新 is_active

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 16:09:52 +00:00

1555 lines
63 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import FastAPI, HTTPException, Request, Depends
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Optional
import json
import os
from backend.database import get_db, init_db, seed_defaults, log_audit
import hashlib
import secrets as _secrets
# Single FastAPI application instance; every route in this module registers on it via @app.<method>(...).
app = FastAPI(title="Essential Oil Formula Calculator API")
# ── Password hashing (PBKDF2-SHA256, stdlib) ─────────
def hash_password(password: str) -> str:
    """Hash *password* with PBKDF2-HMAC-SHA256 under a fresh random salt.

    Returns:
        ``"<salt>$<digest>"``: the salt is 32 hex characters, the digest is
        the hex-encoded PBKDF2 output after 200,000 iterations.
    """
    salt_hex = _secrets.token_hex(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode(),
        salt_hex.encode(),
        200_000,
    )
    return "$".join((salt_hex, digest.hex()))
def verify_password(password: str, stored: str) -> bool:
    """Check *password* against a stored credential.

    Two storage formats are accepted:

    * ``"<salt>$<hex digest>"`` as produced by ``hash_password``;
    * a legacy plaintext password (any value without a ``$``).

    Both comparisons are done in constant time on bytes, so the response
    time does not reveal how many leading characters matched. Comparing
    bytes (rather than ``str``) also keeps non-ASCII values working.

    Returns:
        True when the password matches; False otherwise, including when
        *stored* is empty.
    """
    if not stored:
        return False
    if "$" not in stored:
        # Legacy plaintext — compare directly
        return _secrets.compare_digest(password.encode(), stored.encode())
    salt, expected_hex = stored.split("$", 1)
    actual_hex = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt.encode(), 200_000
    ).hex()
    return _secrets.compare_digest(actual_hex.encode(), expected_hex.encode())
def _upgrade_password_if_needed(conn, user_id: int, password: str, stored: str):
"""If stored password is legacy plaintext, upgrade to hashed."""
if stored and "$" not in stored:
conn.execute("UPDATE users SET password = ? WHERE id = ?", (hash_password(password), user_id))
conn.commit()
# Periodic WAL checkpoint to ensure data is flushed to main DB file
import threading, time as _time
def _wal_checkpoint_loop():
    """Run forever, issuing ``PRAGMA wal_checkpoint(TRUNCATE)`` every 5 minutes.

    The checkpoint is best-effort: a failure is logged and the loop carries
    on, so one bad cycle never kills the background thread. The connection
    is always closed, even when the PRAGMA fails.
    """
    import logging  # local import keeps this block self-contained

    log = logging.getLogger(__name__)
    while True:
        _time.sleep(300)  # Every 5 minutes
        conn = None
        try:
            conn = get_db()
            conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
        except Exception:
            # Deliberately non-fatal; try again on the next cycle.
            log.warning("WAL checkpoint failed", exc_info=True)
        finally:
            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    log.warning("Closing WAL checkpoint connection failed", exc_info=True)


# Daemon thread: it must not keep the process alive at shutdown.
threading.Thread(target=_wal_checkpoint_loop, daemon=True).start()
# ── Auth ────────────────────────────────────────────────
ANON_USER = {"id": None, "role": "viewer", "username": "anonymous", "display_name": "匿名用户"}


def get_current_user(request: Request):
    """Resolve the caller from the ``Authorization`` header.

    The header value, with an optional ``Bearer `` prefix removed, is looked
    up as ``users.token``.

    Returns:
        A dict with the matching user's id, username, role, display_name,
        password and business_verified columns, or the shared ``ANON_USER``
        dict when no token is sent or no user matches.
    """
    auth_header = request.headers.get("Authorization", "")
    token = auth_header.removeprefix("Bearer ").strip()
    if token:
        conn = get_db()
        row = conn.execute(
            "SELECT id, username, role, display_name, password, business_verified FROM users WHERE token = ?",
            (token,),
        ).fetchone()
        conn.close()
        if row:
            return dict(row)
    return ANON_USER
def require_role(*roles):
    """Build a FastAPI dependency that admits only users whose role is in *roles*.

    The returned callable yields the current user dict on success and raises
    ``HTTPException(403)`` otherwise.
    """
    def _check_role(user=Depends(get_current_user)):
        if user["role"] in roles:
            return user
        raise HTTPException(403, "权限不足")

    return _check_role
# ── Models ──────────────────────────────────────────────
# Request body for POST /api/oils (consumed by upsert_oil).
class OilIn(BaseModel):
    name: str  # conflict key of the upsert (ON CONFLICT(name))
    bottle_price: float
    drop_count: int
    retail_price: Optional[float] = None  # written as given on update, so None overwrites the stored value
    en_name: Optional[str] = None  # None keeps the stored value (COALESCE in upsert_oil)
    is_active: Optional[int] = None  # None keeps the stored value (COALESCE in upsert_oil)
# One ingredient entry of a recipe payload (element type of RecipeIn / RecipeUpdate ingredients).
class IngredientIn(BaseModel):
    oil_name: str  # stored in recipe_ingredients.oil_name
    drops: float  # stored in recipe_ingredients.drops
# Request body for POST /api/recipes (consumed by create_recipe).
class RecipeIn(BaseModel):
    name: str
    note: str = ""
    ingredients: list[IngredientIn]
    tags: list[str] = []  # each tag is inserted into `tags` if missing, then linked to the recipe
# Request body for PUT /api/recipes/{recipe_id} (consumed by update_recipe).
# Every field is optional; a field left as None is not modified.
class RecipeUpdate(BaseModel):
    name: Optional[str] = None
    en_name: Optional[str] = None
    note: Optional[str] = None
    ingredients: Optional[list[IngredientIn]] = None  # when given, replaces the whole ingredient list
    tags: Optional[list[str]] = None  # when given, replaces the whole tag set
    version: Optional[int] = None  # client's last-seen version for the optimistic-lock check; None skips it
# Request body for POST /api/users (consumed by create_user).
class UserIn(BaseModel):
    username: str
    role: str = "viewer"  # inserted as given; create_user does not check it against a role list
    display_name: str = ""
# Partial-update payload for a user; both fields are optional.
# NOTE(review): the endpoint consuming this model is not visible in this part of the file.
class UserUpdate(BaseModel):
    role: Optional[str] = None
    display_name: Optional[str] = None
# ── Me ──────────────────────────────────────────────────
APP_VERSION = "20260401"


@app.get("/api/version")
def get_version():
    """Report the application version string."""
    payload = {"version": APP_VERSION}
    return payload
@app.get("/api/me")
def get_me(user=Depends(get_current_user)):
    """Return the caller's profile; the password itself is reduced to a ``has_password`` flag."""
    profile = {
        "username": user["username"],
        "role": user["role"],
        "display_name": user.get("display_name", ""),
        "id": user.get("id"),
        "has_password": bool(user.get("password")),
        "business_verified": bool(user.get("business_verified")),
    }
    return profile
# ── Bug Reports ─────────────────────────────────────────
@app.post("/api/bug-report", status_code=201)
def submit_bug(body: dict, user=Depends(get_current_user)):
    """Create a bug report and its initial log entry.

    Body keys:
        content: required, non-blank description.
        priority: optional; defaults to 2 for admins and 0 (urgent) for
            everyone else.

    A notification is sent to the ``admin`` role unless the submitter is an
    admin. The connection is closed even when a statement fails.

    Raises:
        HTTPException: 400 when ``content`` is blank.
    """
    content = body.get("content", "").strip()
    if not content:
        raise HTTPException(400, "请输入内容")
    who = user.get("display_name") or user.get("username") or "匿名"
    # Admin can set priority; user-submitted bugs default to urgent (0)
    default_pri = 2 if user.get("role") == "admin" else 0
    priority = body.get("priority", default_pri)
    conn = get_db()
    try:
        c = conn.execute(
            "INSERT INTO bug_reports (user_id, content, priority) VALUES (?, ?, ?)",
            (user.get("id"), content, priority),
        )
        bug_id = c.lastrowid
        # Auto-log: created
        conn.execute(
            "INSERT INTO bug_comments (bug_id, user_id, action, content) VALUES (?, ?, ?, ?)",
            (bug_id, user.get("id"), "创建", content),
        )
        # Only notify admin if submitter is NOT admin
        if user.get("role") != "admin":
            conn.execute(
                "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                # Separator added: name and content used to run together.
                ("admin", "🐛 Bug 反馈", f"{who}:{content}\n[bug_id:{bug_id}]"),
            )
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
@app.get("/api/bug-reports")
def list_bugs(user=Depends(get_current_user)):
    """List bug reports with their comment logs.

    Admins see every bug. Other callers see bugs they reported, bugs
    assigned to them, and bugs whose ``is_resolved`` is 1 or 3. Results are
    ordered by priority, then by most recent activity (latest comment, or
    creation time when there are no comments).
    """
    select_from = (
        "SELECT b.id, b.content, b.is_resolved, b.created_at, b.user_id, b.priority, b.assigned_to, "
        "u.display_name, u.username, a.display_name as assigned_name "
        "FROM bug_reports b LEFT JOIN users u ON b.user_id = u.id LEFT JOIN users a ON b.assigned_to = a.id "
    )
    order_by = (
        "ORDER BY b.priority ASC, COALESCE((SELECT MAX(c.created_at) FROM bug_comments c WHERE c.bug_id=b.id), b.created_at) DESC"
    )
    if user["role"] == "admin":
        visibility, params = "", ()
    else:
        visibility = "WHERE b.user_id = ? OR b.assigned_to = ? OR b.is_resolved IN (1, 3) "
        params = (user.get("id"), user.get("id"))

    comments_sql = (
        "SELECT c.id, c.action, c.content, c.created_at, u.display_name, u.username "
        "FROM bug_comments c LEFT JOIN users u ON c.user_id = u.id "
        "WHERE c.bug_id = ? ORDER BY c.created_at ASC"
    )

    conn = get_db()
    bugs = [dict(r) for r in conn.execute(select_from + visibility + order_by, params).fetchall()]
    # Attach comments/log for each bug
    for bug in bugs:
        log_rows = conn.execute(comments_sql, (bug["id"],)).fetchall()
        bug["comments"] = [dict(entry) for entry in log_rows]
    conn.close()
    return bugs
@app.put("/api/bug-reports/{bug_id}")
def update_bug(bug_id: int, body: dict, user=Depends(get_current_user)):
    """Update a bug report's status, content and/or priority.

    Recognised ``body`` keys (all optional):
        status: new value for ``is_resolved``; the change is logged in
            ``bug_comments`` and may trigger notifications (see below).
        note: free text stored with the status log entry and appended to
            notification bodies.
        notify_user_id: together with ``status == 1``, assigns the bug to
            that user and notifies them directly.
        content / priority: written to the bug as given.

    Notifications by new status:
        1 with notify_user_id -> the chosen tester;
        1 otherwise -> the reporter's role, plus ``senior_editor`` when the
            reporter is a viewer or editor;
        3 -> the ``admin`` role;
        2 -> none.

    The connection is closed on every exit path. Notification texts now
    close the 「…」 / (…) brackets they open.

    Raises:
        HTTPException: 404 if the bug does not exist; 403 if a non-admin
            tries to set status 2.
    """
    # status: 0=open, 1=awaiting test, 2=fixed (admin only), 3=tested (tester feedback)
    status_names = {0: "待处理", 1: "待测试", 2: "已修复", 3: "已测试"}
    conn = get_db()
    try:
        bug = conn.execute(
            "SELECT user_id, content, is_resolved FROM bug_reports WHERE id = ?", (bug_id,)
        ).fetchone()
        if not bug:
            raise HTTPException(404, "Bug not found")

        new_status = body.get("status")
        note = body.get("note", "").strip()

        if new_status is not None:
            # Only admin can mark as resolved (status 2)
            if new_status == 2 and user["role"] != "admin":
                raise HTTPException(403, "只有管理员可以标记为已修复")
            conn.execute("UPDATE bug_reports SET is_resolved = ? WHERE id = ?", (new_status, bug_id))

            # Auto-log the status change
            action = status_names.get(new_status, str(new_status))
            conn.execute(
                "INSERT INTO bug_comments (bug_id, user_id, action, content) VALUES (?, ?, ?, ?)",
                (bug_id, user.get("id"), action, note),
            )

            notify_user_id = body.get("notify_user_id")
            if new_status == 1 and notify_user_id:
                # Admin sends to specific tester — user-targeted notification
                conn.execute("UPDATE bug_reports SET assigned_to = ? WHERE id = ?", (notify_user_id, bug_id))
                target = conn.execute(
                    "SELECT role, display_name, username FROM users WHERE id = ?", (notify_user_id,)
                ).fetchone()
                if target:
                    msg = "请帮忙测试「" + bug["content"][:80] + "」"
                    if note:
                        msg += "\n\n备注:" + note
                    msg += "\n[bug_id:" + str(bug_id) + "]"
                    conn.execute(
                        "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
                        (target["role"], "🔧 Bug 待测试", msg, notify_user_id),
                    )
            elif new_status == 1 and bug["user_id"]:
                # Admin marks as testing → notify reporter
                # NOTE(review): this row has no target_user_id, unlike the tester
                # notification above — confirm role-wide delivery is intended.
                reporter = conn.execute(
                    "SELECT role, display_name, username FROM users WHERE id = ?", (bug["user_id"],)
                ).fetchone()
                if reporter:
                    conn.execute(
                        "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                        (reporter["role"], "🔧 你的 Bug 已修复,请测试",
                         "请帮忙测试「" + bug["content"][:80] + "」" + ("\n\n备注:" + note if note else "")),
                    )
                    if reporter["role"] in ("viewer", "editor"):
                        conn.execute(
                            "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                            ("senior_editor",
                             "🔧 Bug 待测试(来自" + (reporter["display_name"] or reporter["username"]) + ")",
                             "问题「" + bug["content"][:50] + "」已修复,请协助测试确认。"),
                        )
            elif new_status == 3:
                # Tester confirms tested → notify admin
                who = user.get("display_name") or user.get("username") or "测试者"
                msg = who + " 已测试「" + bug["content"][:50] + "」"
                if note:
                    msg += "\n\n备注:" + note
                msg += "\n[bug_id:" + str(bug_id) + "]"
                conn.execute(
                    "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                    ("admin", "🧪 Bug 已测试", msg),
                )
            # new_status == 2: admin resolved it themselves, no notification needed

        if "content" in body:
            conn.execute("UPDATE bug_reports SET content = ? WHERE id = ?", (body["content"], bug_id))
        if "priority" in body:
            conn.execute("UPDATE bug_reports SET priority = ? WHERE id = ?", (body["priority"], bug_id))
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
@app.delete("/api/bug-reports/{bug_id}")
def delete_bug(bug_id: int, user=Depends(require_role("admin"))):
    """Delete a bug report and all of its comments (admin only)."""
    conn = get_db()
    conn.execute("DELETE FROM bug_comments WHERE bug_id = ?", (bug_id,))
    conn.execute("DELETE FROM bug_reports WHERE id = ?", (bug_id,))
    conn.commit()
    conn.close()
    return {"ok": True}


# The POST .../comment route used to be stacked on delete_bug, so posting a
# comment deleted the bug and this handler was never registered.
@app.post("/api/bug-reports/{bug_id}/comment")
def add_bug_comment(bug_id: int, body: dict, user=Depends(get_current_user)):
    """Add a manual comment/note to a bug's log.

    Raises:
        HTTPException: 400 when ``content`` is blank; 404 when the bug does
            not exist.
    """
    content = body.get("content", "").strip()
    if not content:
        raise HTTPException(400, "请输入内容")
    conn = get_db()
    bug = conn.execute("SELECT id FROM bug_reports WHERE id = ?", (bug_id,)).fetchone()
    if not bug:
        conn.close()
        raise HTTPException(404, "Bug not found")
    conn.execute(
        "INSERT INTO bug_comments (bug_id, user_id, action, content) VALUES (?, ?, ?, ?)",
        (bug_id, user.get("id"), "备注", content),
    )
    conn.commit()
    conn.close()
    return {"ok": True}
@app.post("/api/symptom-search")
def symptom_search(body: dict, user=Depends(get_current_user)):
    """Search recipes by symptom, notify editors if no good match.

    A recipe is an *exact* hit when the query is a substring of its name,
    and a *related* hit when any single character of the query occurs in
    the name. When ``report_missing`` is truthy, the admin, senior_editor
    and editor roles are notified and the query is written to ``search_log``.

    Returns:
        ``{"exact": [...], "related": [...]}`` with at most 20 related
        entries, or ``{"recipes": [], "exact": False}`` for a blank query.
    """
    query = body.get("query", "").strip()
    if not query:
        return {"recipes": [], "exact": False}

    conn = get_db()
    # Search in recipe names
    recipe_rows = conn.execute(
        "SELECT id, name, note, owner_id, version, en_name FROM recipes ORDER BY id"
    ).fetchall()
    exact, related = [], []
    for row in recipe_rows:
        recipe_name = row["name"]
        if query in recipe_name:
            bucket = exact
        elif any(ch in recipe_name for ch in query):
            bucket = related
        else:
            continue
        bucket.append(_recipe_to_dict(conn, row))

    # If user reports no match, notify editors
    if body.get("report_missing"):
        who = user.get("display_name") or user.get("username") or "用户"
        title = "🔍 用户需求:" + query
        text = f"{who} 搜索了「{query}」,没有找到满意的配方,请考虑添加。"
        for role in ("admin", "senior_editor", "editor"):
            conn.execute(
                "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                (role, title, text),
            )
        conn.execute(
            "INSERT INTO search_log (user_id, query, matched_count) VALUES (?, ?, ?)",
            (user.get("id"), query, 0),
        )
    conn.commit()
    conn.close()
    return {"exact": exact, "related": related[:20]}
# ── Register ────────────────────────────────────────────
@app.post("/api/register", status_code=201)
def register(body: dict):
    """Create a ``viewer`` account and return its login token.

    ``display_name`` falls back to the username when blank. Any failure of
    the insert is reported as "username taken".

    Raises:
        HTTPException: 400 when the username has fewer than 2 characters,
            the password fewer than 4, or the insert fails.
    """
    username = body.get("username", "").strip()
    password = body.get("password", "").strip()
    display_name = body.get("display_name", "").strip()
    if len(username) < 2:
        raise HTTPException(400, "用户名至少2个字符")
    if len(password) < 4:
        raise HTTPException(400, "密码至少4位")

    token = _secrets.token_hex(24)
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO users (username, token, role, display_name, password) VALUES (?, ?, ?, ?, ?)",
            (username, token, "viewer", display_name or username, hash_password(password)),
        )
        conn.commit()
    except Exception:
        conn.close()
        raise HTTPException(400, "用户名已被占用")
    else:
        conn.close()
    return {"token": token}
# ── Login ───────────────────────────────────────────────
@app.post("/api/login")
def login(body: dict):
    """Authenticate by username and password and return the user's token.

    A successful login with a legacy plaintext password re-stores it hashed.

    Raises:
        HTTPException: 400 when username or password is blank; 401 when the
            user is unknown, has no password set, or the password is wrong.
    """
    username = body.get("username", "").strip()
    password = body.get("password", "").strip()
    if not (username and password):
        raise HTTPException(400, "请输入用户名和密码")

    conn = get_db()
    account = conn.execute(
        "SELECT id, token, password, display_name, role FROM users WHERE username = ?",
        (username,),
    ).fetchone()

    failure = None
    if not account:
        failure = "用户名不存在"
    elif not account["password"]:
        failure = "该账号未设置密码,请使用链接登录后设置密码"
    elif not verify_password(password, account["password"]):
        failure = "密码错误"
    if failure is not None:
        conn.close()
        raise HTTPException(401, failure)

    # Auto-upgrade legacy plaintext password to hashed
    _upgrade_password_if_needed(conn, account["id"], password, account["password"])
    conn.close()
    return {
        "token": account["token"],
        "display_name": account["display_name"],
        "role": account["role"],
    }
@app.put("/api/me")
def update_me(body: dict, user=Depends(get_current_user)):
    """Update the caller's display name, username and/or password.

    Only keys present in ``body`` are processed. Nothing is committed unless
    every requested change validates. The connection is closed on every
    exit path, including unexpected errors such as a non-string value.

    Body keys:
        display_name: new non-blank display name.
        username: new username, at least 2 characters and not already used.
        password: new password, at least 4 characters; a blank value
            changes nothing.
        old_password: must match the current password whenever one is set
            and ``password`` is present.

    Raises:
        HTTPException: 403 for anonymous callers; 400 on validation failure.
    """
    if not user["id"]:
        raise HTTPException(403, "请先登录")
    conn = get_db()
    try:
        # Update display_name
        if "display_name" in body:
            dn = body["display_name"].strip()
            if not dn:
                raise HTTPException(400, "昵称不能为空")
            conn.execute("UPDATE users SET display_name = ? WHERE id = ?", (dn, user["id"]))

        # Update username
        if "username" in body:
            un = body["username"].strip()
            if not un or len(un) < 2:
                raise HTTPException(400, "用户名至少2个字符")
            existing = conn.execute(
                "SELECT id FROM users WHERE username = ? AND id != ?", (un, user["id"])
            ).fetchone()
            if existing:
                raise HTTPException(400, "用户名已被占用")
            conn.execute("UPDATE users SET username = ? WHERE id = ?", (un, user["id"]))

        # Update password (requires old password verification)
        if "password" in body:
            pw = body["password"].strip()
            if pw and len(pw) < 4:
                raise HTTPException(400, "新密码至少4位")
            old_pw = body.get("old_password", "").strip()
            current_pw = user.get("password") or ""
            if current_pw and not verify_password(old_pw, current_pw):
                raise HTTPException(400, "当前密码不正确")
            if pw:
                conn.execute("UPDATE users SET password = ? WHERE id = ?", (hash_password(pw), user["id"]))

        conn.commit()
    finally:
        # Closing without commit discards any updates made before a failed check.
        conn.close()
    return {"ok": True}
@app.put("/api/me/password")
def set_password(body: dict, user=Depends(get_current_user)):
    """Legacy endpoint, kept for compatibility.

    Stores a new hashed password for the caller without asking for the
    current one.

    Raises:
        HTTPException: 403 for anonymous callers; 400 when the password has
            fewer than 4 characters.
    """
    if not user["id"]:
        raise HTTPException(403, "请先登录")
    new_password = body.get("password", "").strip()
    if len(new_password) < 4:
        raise HTTPException(400, "密码至少4位")
    conn = get_db()
    conn.execute(
        "UPDATE users SET password = ? WHERE id = ?",
        (hash_password(new_password), user["id"]),
    )
    conn.commit()
    conn.close()
    return {"ok": True}
# ── Business Verification ──────────────────────────────
@app.post("/api/business-apply", status_code=201)
def business_apply(body: dict, user=Depends(get_current_user)):
    """Submit a business-verification application and notify admins.

    Checks run in this order: latest application is pending, caller is
    already verified, business name is blank, document exceeds 2,000,000
    characters.

    Raises:
        HTTPException: 403 for anonymous callers; 400 when a check fails.
    """
    if not user["id"]:
        raise HTTPException(403, "请先登录")
    conn = get_db()
    # Check if already has pending application
    latest = conn.execute(
        "SELECT id, status FROM business_applications WHERE user_id = ? ORDER BY id DESC LIMIT 1",
        (user["id"],)
    ).fetchone()

    problem = None
    if latest and latest["status"] == "pending":
        problem = "已有待审核的申请"
    elif user.get("business_verified"):
        problem = "已是认证商业用户"
    else:
        business_name = body.get("business_name", "").strip()
        document = body.get("document", "")  # base64 image
        if not business_name:
            problem = "请填写商户名称"
        elif document and len(document) > 2000000:
            problem = "文件太大请压缩到1.5MB以内"
    if problem is not None:
        conn.close()
        raise HTTPException(400, problem)

    conn.execute(
        "INSERT INTO business_applications (user_id, business_name, document) VALUES (?, ?, ?)",
        (user["id"], business_name, document)
    )
    applicant = user.get("display_name") or user.get("username")
    conn.execute(
        "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
        ("admin", "🏢 商业认证申请", f"{applicant} 申请商业用户认证,商户名:{business_name}")
    )
    conn.commit()
    conn.close()
    return {"ok": True}
@app.get("/api/my-business-application")
def get_my_business_application(user=Depends(get_current_user)):
    """Return the caller's most recent business application.

    Anonymous callers and users with no application get ``{"status": None}``.
    """
    if not user["id"]:
        return {"status": None}
    conn = get_db()
    latest = conn.execute(
        "SELECT business_name, document, status, reject_reason, created_at FROM business_applications WHERE user_id = ? ORDER BY id DESC LIMIT 1",
        (user["id"],)
    ).fetchone()
    conn.close()
    return dict(latest) if latest else {"status": None}
@app.get("/api/business-applications")
def list_business_applications(user=Depends(require_role("admin"))):
    """List all business applications, newest first, with applicant names (admin only)."""
    listing_sql = (
        "SELECT a.id, a.user_id, a.business_name, a.document, a.status, a.created_at, "
        "u.display_name, u.username FROM business_applications a "
        "LEFT JOIN users u ON a.user_id = u.id ORDER BY a.id DESC"
    )
    conn = get_db()
    applications = conn.execute(listing_sql).fetchall()
    conn.close()
    return [dict(entry) for entry in applications]
@app.post("/api/business-applications/{app_id}/approve")
def approve_business(app_id: int, user=Depends(require_role("admin"))):
    """Approve a business application, mark its user verified and notify them (admin only).

    Raises:
        HTTPException: 404 when the application does not exist.
    """
    conn = get_db()
    application = conn.execute(
        "SELECT user_id, business_name FROM business_applications WHERE id = ?", (app_id,)
    ).fetchone()
    if not application:
        conn.close()
        raise HTTPException(404, "申请不存在")

    applicant_id = application["user_id"]
    conn.execute(
        "UPDATE business_applications SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?",
        (app_id,),
    )
    conn.execute("UPDATE users SET business_verified = 1 WHERE id = ?", (applicant_id,))
    # Notify user
    applicant = conn.execute("SELECT role FROM users WHERE id = ?", (applicant_id,)).fetchone()
    if applicant:
        conn.execute(
            "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
            (applicant["role"], "🎉 商业认证通过", "恭喜!你的商业用户认证已通过,现在可以使用项目核算等商业功能。", applicant_id)
        )
    conn.commit()
    conn.close()
    return {"ok": True}
@app.post("/api/business-applications/{app_id}/reject")
def reject_business(app_id: int, body: dict = None, user=Depends(require_role("admin"))):
    """Reject a business application with an optional reason and notify the applicant (admin only).

    Raises:
        HTTPException: 404 when the application does not exist.
    """
    conn = get_db()
    application = conn.execute(
        "SELECT user_id FROM business_applications WHERE id = ?", (app_id,)
    ).fetchone()
    if not application:
        conn.close()
        raise HTTPException(404, "申请不存在")

    applicant_id = application["user_id"]
    reason = (body or {}).get("reason", "").strip()
    conn.execute(
        "UPDATE business_applications SET status = 'rejected', reviewed_at = datetime('now'), reject_reason = ? WHERE id = ?",
        (reason, app_id),
    )
    applicant = conn.execute("SELECT role FROM users WHERE id = ?", (applicant_id,)).fetchone()
    if applicant:
        parts = ["你的商业用户认证申请未通过。"]
        if reason:
            parts.append("\n\n原因:" + reason)
        parts.append("\n\n你可以修改后重新申请。")
        conn.execute(
            "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
            (applicant["role"], "商业认证未通过", "".join(parts), applicant_id)
        )
    conn.commit()
    conn.close()
    return {"ok": True}
# ── Translation Suggestions ────────────────────────────
@app.post("/api/translation-suggest", status_code=201)
def suggest_translation(body: dict, user=Depends(get_current_user)):
    """Record a suggested English name for a recipe.

    The ``admin`` role is notified unless the caller is an admin. The
    connection is closed even when a statement fails, and the notification
    text now closes the 「…」 around the suggested name.

    Raises:
        HTTPException: 403 for anonymous callers; 400 when ``recipe_name``
            or ``suggested_en`` is blank.
    """
    if not user["id"]:
        raise HTTPException(403, "请先登录")
    recipe_name = body.get("recipe_name", "").strip()
    suggested_en = body.get("suggested_en", "").strip()
    recipe_id = body.get("recipe_id")
    if not recipe_name or not suggested_en:
        raise HTTPException(400, "请填写翻译")
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO translation_suggestions (recipe_id, recipe_name, suggested_en, user_id) VALUES (?, ?, ?, ?)",
            (recipe_id, recipe_name, suggested_en, user["id"]),
        )
        if user["role"] != "admin":
            who = user.get("display_name") or user.get("username")
            conn.execute(
                "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                ("admin", "📝 翻译建议", f"{who} 建议将「{recipe_name}」翻译为「{suggested_en}」"),
            )
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
@app.get("/api/translation-suggestions")
def list_translation_suggestions(user=Depends(require_role("admin"))):
    """List pending translation suggestions, newest first, with the suggester's names (admin only)."""
    pending_sql = (
        "SELECT s.id, s.recipe_id, s.recipe_name, s.suggested_en, s.status, s.created_at, "
        "u.display_name, u.username FROM translation_suggestions s "
        "LEFT JOIN users u ON s.user_id = u.id WHERE s.status = 'pending' ORDER BY s.id DESC"
    )
    conn = get_db()
    pending = conn.execute(pending_sql).fetchall()
    conn.close()
    return [dict(entry) for entry in pending]
@app.post("/api/translation-suggestions/{sid}/approve")
def approve_translation(sid: int, user=Depends(require_role("admin"))):
    """Mark a translation suggestion approved and echo its names back (admin only).

    Only the suggestion's status is changed here; the recipe itself is not
    updated by this handler.

    Raises:
        HTTPException: 404 when the suggestion does not exist.
    """
    conn = get_db()
    suggestion = conn.execute(
        "SELECT recipe_name, suggested_en FROM translation_suggestions WHERE id = ?", (sid,)
    ).fetchone()
    if not suggestion:
        conn.close()
        raise HTTPException(404)
    conn.execute("UPDATE translation_suggestions SET status = 'approved' WHERE id = ?", (sid,))
    conn.commit()
    conn.close()
    return {
        "ok": True,
        "recipe_name": suggestion["recipe_name"],
        "suggested_en": suggestion["suggested_en"],
    }
@app.post("/api/translation-suggestions/{sid}/reject")
def reject_translation(sid: int, user=Depends(require_role("admin"))):
    """Mark a translation suggestion rejected (admin only); unknown ids are a silent no-op."""
    reject_sql = "UPDATE translation_suggestions SET status = 'rejected' WHERE id = ?"
    conn = get_db()
    conn.execute(reject_sql, (sid,))
    conn.commit()
    conn.close()
    return {"ok": True}
@app.post("/api/business-revoke/{user_id}")
def revoke_business(user_id: int, body: dict = None, user=Depends(require_role("admin"))):
    """Clear a user's business-verified flag and notify them, with an optional reason (admin only)."""
    conn = get_db()
    conn.execute("UPDATE users SET business_verified = 0 WHERE id = ?", (user_id,))
    reason = (body or {}).get("reason", "").strip()
    affected = conn.execute("SELECT role FROM users WHERE id = ?", (user_id,)).fetchone()
    if affected:
        parts = ["你的商业用户资格已被取消。"]
        if reason:
            parts.append("\n\n原因:" + reason)
        parts.append("\n\n如有疑问请联系管理员,也可重新申请认证。")
        conn.execute(
            "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
            (affected["role"], "商业资格已取消", "".join(parts), user_id)
        )
    conn.commit()
    conn.close()
    return {"ok": True}
@app.post("/api/impersonate")
def impersonate(body: dict, user=Depends(require_role("admin"))):
    """Return another user's login token so an admin can act as them.

    Raises:
        HTTPException: 400 when ``user_id`` is missing; 404 when no such
            user exists.
    """
    target_id = body.get("user_id")
    if not target_id:
        raise HTTPException(400, "user_id required")
    conn = get_db()
    target_row = conn.execute("SELECT token FROM users WHERE id = ?", (target_id,)).fetchone()
    conn.close()
    if target_row:
        return {"token": target_row["token"]}
    raise HTTPException(404, "User not found")
# ── Oils ────────────────────────────────────────────────
@app.get("/api/oils")
def list_oils():
    """List every oil (active or not), ordered by name; no authentication required."""
    conn = get_db()
    oil_rows = conn.execute(
        "SELECT name, bottle_price, drop_count, retail_price, is_active, en_name FROM oils ORDER BY name"
    ).fetchall()
    conn.close()
    return [dict(oil) for oil in oil_rows]
@app.post("/api/oils", status_code=201)
def upsert_oil(oil: OilIn, user=Depends(require_role("admin", "senior_editor"))):
    """Insert an oil, or update the existing row with the same name.

    On update, prices and drop count are overwritten, while ``en_name`` and
    ``is_active`` keep their stored value when the request sends null. The
    change is written to the audit log.
    """
    upsert_sql = (
        "INSERT INTO oils (name, bottle_price, drop_count, retail_price, en_name, is_active) VALUES (?, ?, ?, ?, ?, ?) "
        "ON CONFLICT(name) DO UPDATE SET bottle_price=excluded.bottle_price, drop_count=excluded.drop_count, "
        "retail_price=excluded.retail_price, en_name=COALESCE(excluded.en_name, oils.en_name), "
        "is_active=COALESCE(excluded.is_active, oils.is_active)"
    )
    values = (
        oil.name,
        oil.bottle_price,
        oil.drop_count,
        oil.retail_price,
        oil.en_name,
        oil.is_active,
    )
    audit_detail = json.dumps({"bottle_price": oil.bottle_price, "drop_count": oil.drop_count})

    conn = get_db()
    conn.execute(upsert_sql, values)
    log_audit(conn, user["id"], "upsert_oil", "oil", oil.name, oil.name, audit_detail)
    conn.commit()
    conn.close()
    return {"ok": True}
@app.delete("/api/oils/{name}")
def delete_oil(name: str, user=Depends(require_role("admin", "senior_editor"))):
    """Delete an oil by name, storing its last values in the audit log.

    An unknown name still succeeds; the audit snapshot is then ``{}``.
    """
    conn = get_db()
    existing = conn.execute(
        "SELECT name, bottle_price, drop_count, retail_price, is_active FROM oils WHERE name = ?",
        (name,),
    ).fetchone()
    snapshot = {}
    if existing:
        snapshot = dict(existing)
    conn.execute("DELETE FROM oils WHERE name = ?", (name,))
    log_audit(
        conn, user["id"], "delete_oil", "oil", name, name,
        json.dumps(snapshot, ensure_ascii=False),
    )
    conn.commit()
    conn.close()
    return {"ok": True}
# ── Recipes ─────────────────────────────────────────────
def _recipe_to_dict(conn, row):
rid = row["id"]
ings = conn.execute(
"SELECT oil_name, drops FROM recipe_ingredients WHERE recipe_id = ?", (rid,)
).fetchall()
tags = conn.execute(
"SELECT tag_name FROM recipe_tags WHERE recipe_id = ?", (rid,)
).fetchall()
owner = conn.execute(
"SELECT display_name, username FROM users WHERE id = ?", (row["owner_id"],)
).fetchone() if row["owner_id"] else None
return {
"id": rid,
"name": row["name"],
"en_name": row["en_name"] if "en_name" in row.keys() else "",
"note": row["note"],
"owner_id": row["owner_id"],
"owner_name": (owner["display_name"] or owner["username"]) if owner else None,
"version": row["version"] if "version" in row.keys() else 1,
"ingredients": [{"oil_name": i["oil_name"], "drops": i["drops"]} for i in ings],
"tags": [t["tag_name"] for t in tags],
}
@app.get("/api/recipes")
def list_recipes(user=Depends(get_current_user)):
    """List the recipes visible to the caller, ordered by id.

    Admin sees all; others see recipes owned by the first admin account
    (id 1 when no admin exists) plus, when logged in, their own.
    """
    base_sql = "SELECT id, name, note, owner_id, version, en_name FROM recipes"
    conn = get_db()
    if user["role"] == "admin":
        rows = conn.execute(base_sql + " ORDER BY id").fetchall()
    else:
        admin = conn.execute("SELECT id FROM users WHERE role = 'admin' LIMIT 1").fetchone()
        admin_id = admin["id"] if admin else 1
        user_id = user.get("id")
        where = " WHERE owner_id = ?"
        params = [admin_id]
        if user_id:
            where += " OR owner_id = ?"
            params.append(user_id)
        rows = conn.execute(base_sql + where + " ORDER BY id", tuple(params)).fetchall()
    recipes = [_recipe_to_dict(conn, recipe_row) for recipe_row in rows]
    conn.close()
    return recipes
@app.get("/api/recipes/{recipe_id}")
def get_recipe(recipe_id: int):
    """Return one recipe by id; no authentication or ownership check is applied.

    Raises:
        HTTPException: 404 when the recipe does not exist.
    """
    conn = get_db()
    recipe_row = conn.execute(
        "SELECT id, name, note, owner_id, version, en_name FROM recipes WHERE id = ?",
        (recipe_id,),
    ).fetchone()
    if recipe_row:
        recipe = _recipe_to_dict(conn, recipe_row)
        conn.close()
        return recipe
    conn.close()
    raise HTTPException(404, "Recipe not found")
@app.post("/api/recipes", status_code=201)
def create_recipe(recipe: RecipeIn, user=Depends(get_current_user)):
    """Create a recipe owned by the caller, with its ingredients and tags.

    Tags that do not exist yet are created. The creation is audit-logged,
    and the ``admin`` role is notified when the creator is not an admin.

    Returns:
        ``{"id": <new recipe id>}``.

    Raises:
        HTTPException: 401 for anonymous callers.
    """
    if not user.get("id"):
        raise HTTPException(401, "请先登录")
    conn = get_db()
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO recipes (name, note, owner_id) VALUES (?, ?, ?)",
        (recipe.name, recipe.note, user["id"]),
    )
    new_id = cur.lastrowid

    ingredient_sql = "INSERT INTO recipe_ingredients (recipe_id, oil_name, drops) VALUES (?, ?, ?)"
    for ingredient in recipe.ingredients:
        cur.execute(ingredient_sql, (new_id, ingredient.oil_name, ingredient.drops))

    for tag_name in recipe.tags:
        cur.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag_name,))
        cur.execute(
            "INSERT OR IGNORE INTO recipe_tags (recipe_id, tag_name) VALUES (?, ?)",
            (new_id, tag_name),
        )

    log_audit(conn, user["id"], "create_recipe", "recipe", new_id, recipe.name)

    # Notify admin when non-admin creates a recipe
    if user["role"] != "admin":
        author = user.get("display_name") or user["username"]
        conn.execute(
            "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
            ("admin", "📝 新配方待审核",
             f"{author} 新增了配方「{recipe.name}」,请到管理配方查看并采纳。")
        )
    conn.commit()
    conn.close()
    return {"id": new_id}
def _check_recipe_permission(conn, recipe_id, user):
"""Check if user can modify this recipe."""
row = conn.execute("SELECT owner_id, name FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
if not row:
raise HTTPException(404, "Recipe not found")
if user["role"] in ("admin", "senior_editor"):
return row
if row["owner_id"] == user.get("id"):
return row
raise HTTPException(403, "只能修改自己创建的配方")
@app.put("/api/recipes/{recipe_id}")
def update_recipe(recipe_id: int, update: RecipeUpdate, user=Depends(get_current_user)):
    """Apply a partial update to a recipe and bump its version.

    Fields left as ``None`` are untouched; ``ingredients`` and ``tags``
    replace the existing sets when given. When ``update.version`` is
    provided it must match the stored version (optimistic locking).

    The connection is closed on every exit path. Previously the 404/403
    raised by ``_check_recipe_permission`` left it open.

    Raises:
        HTTPException: 401 for anonymous callers; 404/403 from the
            permission check; 409 on a version conflict.
    """
    if not user.get("id"):
        raise HTTPException(401, "请先登录")
    conn = get_db()
    try:
        c = conn.cursor()
        _check_recipe_permission(conn, recipe_id, user)

        # Optimistic locking: check version if provided
        if update.version is not None:
            current = c.execute("SELECT version FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
            if current and current["version"] and current["version"] != update.version:
                raise HTTPException(409, "此配方已被其他人修改,请刷新后重试")

        if update.name is not None:
            c.execute("UPDATE recipes SET name = ? WHERE id = ?", (update.name, recipe_id))
        if update.note is not None:
            c.execute("UPDATE recipes SET note = ? WHERE id = ?", (update.note, recipe_id))
        if update.en_name is not None:
            c.execute("UPDATE recipes SET en_name = ? WHERE id = ?", (update.en_name, recipe_id))
        if update.ingredients is not None:
            c.execute("DELETE FROM recipe_ingredients WHERE recipe_id = ?", (recipe_id,))
            for ing in update.ingredients:
                c.execute(
                    "INSERT INTO recipe_ingredients (recipe_id, oil_name, drops) VALUES (?, ?, ?)",
                    (recipe_id, ing.oil_name, ing.drops),
                )
        if update.tags is not None:
            c.execute("DELETE FROM recipe_tags WHERE recipe_id = ?", (recipe_id,))
            for tag in update.tags:
                c.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag,))
                c.execute(
                    "INSERT OR IGNORE INTO recipe_tags (recipe_id, tag_name) VALUES (?, ?)",
                    (recipe_id, tag),
                )
        # Every successful update bumps the version used by the lock check above.
        c.execute(
            "UPDATE recipes SET updated_by = ?, version = COALESCE(version, 1) + 1 WHERE id = ?",
            (user["id"], recipe_id),
        )
        log_audit(conn, user["id"], "update_recipe", "recipe", recipe_id, update.name)
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
@app.delete("/api/recipes/{recipe_id}")
def delete_recipe(recipe_id: int, user=Depends(get_current_user)):
    """Delete a recipe after storing a full snapshot in the audit log.

    The connection is closed on every exit path. Previously the 404/403
    raised by ``_check_recipe_permission`` left it open.

    Raises:
        HTTPException: 401 for anonymous callers; 404/403 from the
            permission check.
    """
    if not user.get("id"):
        raise HTTPException(401, "请先登录")
    conn = get_db()
    try:
        row = _check_recipe_permission(conn, recipe_id, user)
        # Save full snapshot for undo
        full = conn.execute(
            "SELECT id, name, note, owner_id, version, en_name FROM recipes WHERE id = ?",
            (recipe_id,),
        ).fetchone()
        snapshot = _recipe_to_dict(conn, full)
        log_audit(conn, user["id"], "delete_recipe", "recipe", recipe_id, row["name"],
                  json.dumps(snapshot, ensure_ascii=False))
        # NOTE(review): only the recipes row is deleted here; whether its
        # recipe_ingredients / recipe_tags rows go too depends on the schema — confirm.
        conn.execute("DELETE FROM recipes WHERE id = ?", (recipe_id,))
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
# ── Adopt (admin takes ownership of editor recipes) ────
@app.post("/api/recipes/{recipe_id}/adopt")
def adopt_recipe(recipe_id: int, user=Depends(require_role("admin"))):
    """Transfer ownership of a recipe to the calling admin.

    If the admin already owns it, nothing changes and the response carries
    ``"msg": "already owned"``. Otherwise the transfer is audit-logged with
    the previous owner's name ("unknown" when that user cannot be found).

    Raises:
        HTTPException: 404 when the recipe does not exist.
    """
    admin_id = user["id"]
    conn = get_db()
    recipe = conn.execute(
        "SELECT id, name, owner_id FROM recipes WHERE id = ?", (recipe_id,)
    ).fetchone()
    if not recipe:
        conn.close()
        raise HTTPException(404, "Recipe not found")
    if recipe["owner_id"] == admin_id:
        conn.close()
        return {"ok": True, "msg": "already owned"}

    previous = conn.execute(
        "SELECT display_name, username FROM users WHERE id = ?", (recipe["owner_id"],)
    ).fetchone()
    if previous:
        previous_name = previous["display_name"] or previous["username"]
    else:
        previous_name = "unknown"

    conn.execute(
        "UPDATE recipes SET owner_id = ?, updated_by = ? WHERE id = ?",
        (admin_id, admin_id, recipe_id),
    )
    log_audit(conn, admin_id, "adopt_recipe", "recipe", recipe_id, recipe["name"],
              json.dumps({"from_user": previous_name}))
    conn.commit()
    conn.close()
    return {"ok": True}
@app.post("/api/recipes/adopt-batch")
def adopt_batch(body: dict, user=Depends(require_role("admin"))):
    """Transfer ownership of several recipes to the calling admin.

    Unknown ids and recipes the admin already owns are skipped.

    Returns:
        ``{"ok": True, "adopted": <number of recipes transferred>}``.

    Raises:
        HTTPException: 400 when ``ids`` is missing or empty.
    """
    ids = body.get("ids", [])
    if not ids:
        raise HTTPException(400, "No recipe ids provided")
    admin_id = user["id"]
    conn = get_db()
    adopted = 0
    for rid in ids:
        recipe = conn.execute(
            "SELECT id, name, owner_id FROM recipes WHERE id = ?", (rid,)
        ).fetchone()
        if not recipe or recipe["owner_id"] == admin_id:
            continue
        conn.execute(
            "UPDATE recipes SET owner_id = ?, updated_by = ? WHERE id = ?",
            (admin_id, admin_id, rid),
        )
        log_audit(conn, admin_id, "adopt_recipe", "recipe", rid, recipe["name"])
        adopted += 1
    conn.commit()
    conn.close()
    return {"ok": True, "adopted": adopted}
# ── Tags ────────────────────────────────────────────────
@app.get("/api/tags")
def list_tags():
    """Return all tag names in alphabetical order; no authentication required."""
    conn = get_db()
    tag_rows = conn.execute("SELECT name FROM tags ORDER BY name").fetchall()
    conn.close()
    names = []
    for tag_row in tag_rows:
        names.append(tag_row["name"])
    return names
@app.post("/api/tags", status_code=201)
def create_tag(body: dict, user=Depends(require_role("admin", "senior_editor", "editor"))):
    """Create a tag if it does not already exist (editors and above).

    Raises:
        HTTPException: 400 when ``name`` is blank.
    """
    tag_name = body.get("name", "").strip()
    if tag_name == "":
        raise HTTPException(400, "Tag name required")
    conn = get_db()
    # Don't log tag creation (too frequent/noisy)
    conn.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag_name,))
    conn.commit()
    conn.close()
    return {"ok": True}
@app.delete("/api/tags/{name}")
def delete_tag(name: str, user=Depends(require_role("admin"))):
    """Delete a tag and every recipe-to-tag link that references it."""
    db = get_db()
    # Remove the links first, then the tag row itself.
    for statement in (
        "DELETE FROM recipe_tags WHERE tag_name = ?",
        "DELETE FROM tags WHERE name = ?",
    ):
        db.execute(statement, (name,))
    log_audit(db, user["id"], "delete_tag", "tag", name, name)
    db.commit()
    db.close()
    return {"ok": True}
# ── Users (admin only) ─────────────────────────────────
@app.get("/api/users")
def list_users(user=Depends(require_role("admin"))):
    """Return every user row (including the API token) ordered by id."""
    db = get_db()
    user_rows = db.execute("SELECT id, username, token, role, display_name, created_at, business_verified FROM users ORDER BY id").fetchall()
    db.close()
    return list(map(dict, user_rows))
@app.post("/api/users", status_code=201)
def create_user(body: UserIn, user=Depends(require_role("admin"))):
    """Create a user with a freshly generated token and audit the creation.

    Any failure while inserting, auditing or committing is reported as
    HTTPException(400) with the "username already exists" message.
    Returns the new token together with the username.
    """
    new_token = _secrets.token_hex(24)
    db = get_db()
    try:
        db.execute(
            "INSERT INTO users (username, token, role, display_name) VALUES (?, ?, ?, ?)",
            (body.username, new_token, body.role, body.display_name),
        )
        role_detail = json.dumps({"role": body.role})
        log_audit(db, user["id"], "create_user", "user", body.username, body.display_name, role_detail)
        db.commit()
    except Exception:
        db.close()
        raise HTTPException(400, "用户名已存在")
    else:
        db.close()
        return {"token": new_token, "username": body.username}
@app.delete("/api/users/{user_id}")
def delete_user(user_id: int, user=Depends(require_role("admin"))):
    """Delete a user other than the caller and audit a snapshot of the row.

    Raises HTTPException(400) when an admin tries to delete themselves and
    HTTPException(404) when the target user does not exist.
    """
    if user_id == user["id"]:
        raise HTTPException(400, "不能删除自己")
    db = get_db()
    victim = db.execute("SELECT id, username, token, role, display_name FROM users WHERE id = ?", (user_id,)).fetchone()
    if not victim:
        db.close()
        raise HTTPException(404, "User not found")
    # Capture the row before deleting it; the JSON snapshot is stored as the
    # audit entry's detail.
    snapshot_json = json.dumps(dict(victim), ensure_ascii=False)
    db.execute("DELETE FROM users WHERE id = ?", (user_id,))
    log_audit(db, user["id"], "delete_user", "user", user_id, victim["username"], snapshot_json)
    db.commit()
    db.close()
    return {"ok": True}
@app.put("/api/users/{user_id}")
def update_user(user_id: int, body: UserUpdate, user=Depends(require_role("admin"))):
    """Update a user's role and/or display name; ``None`` fields are skipped.

    An audit entry recording both requested values is written on every call,
    including when neither field is provided.
    """
    db = get_db()
    new_role = body.role
    new_display_name = body.display_name
    if new_role is not None:
        db.execute("UPDATE users SET role = ? WHERE id = ?", (new_role, user_id))
    if new_display_name is not None:
        db.execute("UPDATE users SET display_name = ? WHERE id = ?", (new_display_name, user_id))
    change_detail = json.dumps({"role": new_role, "display_name": new_display_name})
    log_audit(db, user["id"], "update_user", "user", user_id, None, change_detail)
    db.commit()
    db.close()
    return {"ok": True}
# ── Undo (admin only) ──────────────────────────────────
@app.post("/api/audit-log/{log_id}/undo")
def undo_action(log_id: int, user=Depends(require_role("admin"))):
    """Undo an audited delete by re-inserting the snapshot stored in its detail.

    Supported actions are ``delete_recipe``, ``delete_oil`` and
    ``delete_user``. A restored recipe receives a new id, which is returned
    as ``new_id``.

    Raises:
        HTTPException(404): no audit entry has ``log_id``.
        HTTPException(400): the entry has no snapshot, the snapshot is not
            valid JSON, the action type is not undoable, or re-inserting a
            user fails.

    The connection is closed in ``finally`` so it is released on every exit
    path, including unexpected errors such as a snapshot missing a key.
    """
    conn = get_db()
    try:
        entry = conn.execute("SELECT * FROM audit_log WHERE id = ?", (log_id,)).fetchone()
        if not entry:
            raise HTTPException(404, "Audit log entry not found")
        action = entry["action"]
        detail = entry["detail"]
        if not detail:
            raise HTTPException(400, "此操作无法撤销(无快照数据)")
        if action not in ("delete_recipe", "delete_oil", "delete_user"):
            raise HTTPException(400, "此操作类型不支持撤销")
        try:
            snapshot = json.loads(detail)
        except ValueError:
            # A detail that is not valid JSON carries no usable snapshot.
            raise HTTPException(400, "此操作无法撤销(无快照数据)")

        if action == "delete_recipe":
            # Restore the recipe row, then its ingredients and tags.
            c = conn.cursor()
            c.execute("INSERT INTO recipes (name, note, owner_id) VALUES (?, ?, ?)",
                      (snapshot["name"], snapshot.get("note", ""), snapshot.get("owner_id")))
            new_id = c.lastrowid
            for ing in snapshot.get("ingredients", []):
                c.execute("INSERT INTO recipe_ingredients (recipe_id, oil_name, drops) VALUES (?, ?, ?)",
                          (new_id, ing["oil_name"], ing["drops"]))
            for tag in snapshot.get("tags", []):
                # The tag itself may have been deleted since; recreate it if needed.
                c.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag,))
                c.execute("INSERT OR IGNORE INTO recipe_tags (recipe_id, tag_name) VALUES (?, ?)", (new_id, tag))
            log_audit(conn, user["id"], "undo_delete_recipe", "recipe", new_id, snapshot["name"],
                      json.dumps({"original_log_id": log_id}))
            conn.commit()
            return {"ok": True, "new_id": new_id}

        if action == "delete_oil":
            conn.execute(
                "INSERT OR IGNORE INTO oils (name, bottle_price, drop_count) VALUES (?, ?, ?)",
                (snapshot["name"], snapshot["bottle_price"], snapshot["drop_count"]))
            log_audit(conn, user["id"], "undo_delete_oil", "oil", snapshot["name"], snapshot["name"],
                      json.dumps({"original_log_id": log_id}))
            conn.commit()
            return {"ok": True}

        # Remaining supported action: delete_user.
        try:
            conn.execute(
                "INSERT INTO users (username, token, role, display_name) VALUES (?, ?, ?, ?)",
                (snapshot["username"], snapshot["token"], snapshot["role"], snapshot.get("display_name", "")))
            log_audit(conn, user["id"], "undo_delete_user", "user", snapshot["username"], snapshot.get("display_name"),
                      json.dumps({"original_log_id": log_id}))
            conn.commit()
        except Exception:
            # Kept broad as in the original: any failure here is reported as
            # a failed restore (the username may have been taken meanwhile).
            raise HTTPException(400, "恢复失败(用户名可能已被占用)")
        return {"ok": True}
    finally:
        conn.close()
# ── Audit Log (admin only) ─────────────────────────────
@app.get("/api/audit-log")
def get_audit_log(limit: int = 100, offset: int = 0, user=Depends(require_role("admin"))):
    """Return a page of audit entries, newest first, joined with the acting user.

    ``limit`` and ``offset`` are passed straight to the SQL LIMIT/OFFSET.
    """
    audit_sql = (
        "SELECT a.id, a.action, a.target_type, a.target_id, a.target_name, a.detail, a.created_at, "
        "u.display_name as user_name, u.username "
        "FROM audit_log a LEFT JOIN users u ON a.user_id = u.id "
        "ORDER BY a.id DESC LIMIT ? OFFSET ?"
    )
    db = get_db()
    entries = db.execute(audit_sql, (limit, offset)).fetchall()
    db.close()
    return list(map(dict, entries))
# ── Brand Settings ─────────────────────────────────────
@app.get("/api/brand")
def get_brand(user=Depends(get_current_user)):
    """Return the caller's brand settings.

    Callers without a user id, or whose user row is missing, receive the
    blank defaults (all ``None``, alignment ``"center"``).
    """
    blank = {"qr_code": None, "brand_logo": None, "brand_bg": None, "brand_name": None, "brand_align": "center"}
    if not user["id"]:
        return blank
    db = get_db()
    settings = db.execute("SELECT qr_code, brand_logo, brand_bg, brand_name, brand_align FROM users WHERE id = ?", (user["id"],)).fetchone()
    db.close()
    return dict(settings) if settings else blank
@app.put("/api/brand")
def update_brand(body: dict, user=Depends(get_current_user)):
    """Update any of the caller's brand settings present in ``body``.

    Recognised keys: ``qr_code``, ``brand_logo``, ``brand_bg``,
    ``brand_name``, ``brand_align``. Only keys present in ``body`` are
    written.

    Raises:
        HTTPException(403): the caller has no user id (not logged in).
        HTTPException(400): an image payload exceeds its length limit.

    All size checks run before the connection is opened, so a rejected
    request neither leaks a connection nor leaves earlier UPDATEs pending.
    """
    if not user["id"]:
        raise HTTPException(403, "请先登录")

    # (field, max length of the encoded payload, error message) - checked in
    # the same order as the fields are written below.
    size_limits = (
        ("qr_code", 700000, "图片太大,请压缩后上传"),
        ("brand_logo", 700000, "图片太大,请压缩后上传"),
        ("brand_bg", 1500000, "背景图太大请压缩到1MB以内"),
    )
    for field, max_len, message in size_limits:
        value = body.get(field)
        if value and len(value) > max_len:
            raise HTTPException(400, message)

    conn = get_db()
    try:
        if "qr_code" in body:
            conn.execute("UPDATE users SET qr_code = ? WHERE id = ?", (body["qr_code"], user["id"]))
        if "brand_logo" in body:
            conn.execute("UPDATE users SET brand_logo = ? WHERE id = ?", (body["brand_logo"], user["id"]))
        if "brand_bg" in body:
            conn.execute("UPDATE users SET brand_bg = ? WHERE id = ?", (body["brand_bg"], user["id"]))
        if "brand_name" in body:
            conn.execute("UPDATE users SET brand_name = ? WHERE id = ?", (body["brand_name"], user["id"]))
        if "brand_align" in body:
            conn.execute("UPDATE users SET brand_align = ? WHERE id = ?", (body["brand_align"], user["id"]))
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
# ── Profit Projects ────────────────────────────────────
@app.get("/api/projects")
def list_projects():
    """Return all profit projects, newest first, with ingredients decoded from JSON."""
    db = get_db()
    project_rows = db.execute("SELECT * FROM profit_projects ORDER BY id DESC").fetchall()
    db.close()
    projects = []
    for project_row in project_rows:
        project = dict(project_row)
        # The ingredients column holds JSON text; decode it for the client.
        project["ingredients"] = json.loads(project_row["ingredients"])
        projects.append(project)
    return projects
@app.post("/api/projects", status_code=201)
def create_project(body: dict, user=Depends(require_role("admin", "senior_editor"))):
    """Create a profit project and return its new id.

    ``body["name"]`` is required; ``ingredients`` (stored as JSON text),
    ``pricing`` and ``note`` are optional with defaults ``[]``, ``0`` and
    ``""``.

    Raises:
        HTTPException(400): ``name`` is absent from the body. This is
            checked before the connection is opened; previously the
            resulting KeyError surfaced as a 500 and leaked the connection.
    """
    if "name" not in body:
        raise HTTPException(400, "Missing required field: name")
    ingredients_json = json.dumps(body.get("ingredients", []), ensure_ascii=False)
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute(
            "INSERT INTO profit_projects (name, ingredients, pricing, note, created_by) VALUES (?, ?, ?, ?, ?)",
            (body["name"], ingredients_json, body.get("pricing", 0), body.get("note", ""), user["id"])
        )
        conn.commit()
        pid = c.lastrowid
    finally:
        conn.close()
    return {"id": pid}
@app.put("/api/projects/{pid}")
def update_project(pid: int, body: dict, user=Depends(require_role("admin", "senior_editor"))):
    """Update whichever of name / ingredients / pricing / note appear in ``body``."""
    db = get_db()
    if "name" in body:
        db.execute("UPDATE profit_projects SET name = ? WHERE id = ?", (body["name"], pid))
    if "ingredients" in body:
        # Ingredients are persisted as JSON text.
        ingredients_json = json.dumps(body["ingredients"], ensure_ascii=False)
        db.execute("UPDATE profit_projects SET ingredients = ? WHERE id = ?", (ingredients_json, pid))
    if "pricing" in body:
        db.execute("UPDATE profit_projects SET pricing = ? WHERE id = ?", (body["pricing"], pid))
    if "note" in body:
        db.execute("UPDATE profit_projects SET note = ? WHERE id = ?", (body["note"], pid))
    db.commit()
    db.close()
    return {"ok": True}
@app.delete("/api/projects/{pid}")
def delete_project(pid: int, user=Depends(require_role("admin", "senior_editor"))):
    """Delete the profit project with id ``pid``; succeeds even if no row matches."""
    db = get_db()
    db.execute("DELETE FROM profit_projects WHERE id = ?", (pid,))
    db.commit()
    db.close()
    return {"ok": True}
# ── Diary (personal recipes + journal) ─────────────────
@app.get("/api/diary")
def list_diary(user=Depends(get_current_user)):
    """Return the caller's diary recipes, newest first, each with its journal entries.

    Callers without a user id get an empty list. ``ingredients`` and ``tags``
    are decoded from their JSON columns (a NULL/empty ``tags`` becomes ``[]``).
    """
    if not user["id"]:
        return []
    db = get_db()
    diary_rows = db.execute(
        "SELECT id, source_recipe_id, name, ingredients, note, tags, created_at "
        "FROM user_diary WHERE user_id = ? ORDER BY id DESC", (user["id"],)
    ).fetchall()

    def _expand(diary_row):
        # One extra query per diary row to fetch its journal entries.
        journal = db.execute(
            "SELECT id, content, created_at FROM diary_entries WHERE diary_id = ? ORDER BY created_at DESC", (diary_row["id"],)
        ).fetchall()
        item = dict(diary_row)
        item["ingredients"] = json.loads(diary_row["ingredients"])
        item["tags"] = json.loads(diary_row["tags"] or "[]")
        item["entries"] = [dict(entry) for entry in journal]
        return item

    diaries = [_expand(diary_row) for diary_row in diary_rows]
    db.close()
    return diaries
@app.post("/api/diary", status_code=201)
def create_diary(body: dict, user=Depends(get_current_user)):
    """Create a personal diary recipe for the caller and return its id.

    Raises HTTPException(403) when the caller has no user id and
    HTTPException(400) when the recipe name is blank.
    """
    if not user["id"]:
        raise HTTPException(403, "请先登录")
    name = body.get("name", "").strip()
    if not name:
        raise HTTPException(400, "请输入配方名称")
    ingredients_json = json.dumps(body.get("ingredients", []), ensure_ascii=False)
    params = (user["id"], body.get("source_recipe_id"), name, ingredients_json, body.get("note", ""))
    db = get_db()
    cursor = db.cursor()
    cursor.execute(
        "INSERT INTO user_diary (user_id, source_recipe_id, name, ingredients, note) VALUES (?, ?, ?, ?, ?)",
        params
    )
    db.commit()
    new_id = cursor.lastrowid
    db.close()
    return {"id": new_id}
@app.put("/api/diary/{diary_id}")
def update_diary(diary_id: int, body: dict, user=Depends(get_current_user)):
    """Update name / note / ingredients / tags of a diary recipe owned by the caller.

    Raises HTTPException(403) when the diary does not exist or belongs to
    someone else.
    """
    db = get_db()
    owner = db.execute("SELECT user_id FROM user_diary WHERE id = ?", (diary_id,)).fetchone()
    owned_by_caller = bool(owner) and owner["user_id"] == user["id"]
    if not owned_by_caller:
        db.close()
        raise HTTPException(403, "无权操作")
    if "name" in body:
        db.execute("UPDATE user_diary SET name = ? WHERE id = ?", (body["name"], diary_id))
    if "note" in body:
        db.execute("UPDATE user_diary SET note = ? WHERE id = ?", (body["note"], diary_id))
    if "ingredients" in body:
        ingredients_json = json.dumps(body["ingredients"], ensure_ascii=False)
        db.execute("UPDATE user_diary SET ingredients = ? WHERE id = ?", (ingredients_json, diary_id))
    if "tags" in body:
        tags_json = json.dumps(body["tags"], ensure_ascii=False)
        db.execute("UPDATE user_diary SET tags = ? WHERE id = ?", (tags_json, diary_id))
    db.commit()
    db.close()
    return {"ok": True}
@app.delete("/api/diary/{diary_id}")
def delete_diary(diary_id: int, user=Depends(get_current_user)):
    """Delete a diary recipe owned by the caller.

    Raises HTTPException(403) when the diary does not exist or belongs to
    someone else.
    """
    db = get_db()
    owner = db.execute("SELECT user_id FROM user_diary WHERE id = ?", (diary_id,)).fetchone()
    owned_by_caller = bool(owner) and owner["user_id"] == user["id"]
    if not owned_by_caller:
        db.close()
        raise HTTPException(403, "无权操作")
    db.execute("DELETE FROM user_diary WHERE id = ?", (diary_id,))
    db.commit()
    db.close()
    return {"ok": True}
@app.post("/api/diary/{diary_id}/entries", status_code=201)
def add_diary_entry(diary_id: int, body: dict, user=Depends(get_current_user)):
    """Append a journal entry to a diary recipe owned by the caller.

    Raises:
        HTTPException(403): the diary does not exist or belongs to someone else.
        HTTPException(400): ``body["content"]`` is missing or blank.

    The connection is closed in ``finally``; previously the blank-content
    rejection returned without closing it.
    """
    conn = get_db()
    try:
        row = conn.execute("SELECT user_id FROM user_diary WHERE id = ?", (diary_id,)).fetchone()
        if not row or row["user_id"] != user["id"]:
            raise HTTPException(403, "无权操作")
        content = body.get("content", "").strip()
        if not content:
            raise HTTPException(400, "内容不能为空")
        conn.execute("INSERT INTO diary_entries (diary_id, content) VALUES (?, ?)", (diary_id, content))
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
@app.delete("/api/diary/entries/{entry_id}")
def delete_diary_entry(entry_id: int, user=Depends(get_current_user)):
    """Delete a journal entry whose parent diary is owned by the caller.

    Raises HTTPException(403) when the entry does not exist or its diary
    belongs to someone else.
    """
    owner_sql = "SELECT d.user_id FROM diary_entries e JOIN user_diary d ON e.diary_id = d.id WHERE e.id = ?"
    db = get_db()
    owner = db.execute(owner_sql, (entry_id,)).fetchone()
    owned_by_caller = bool(owner) and owner["user_id"] == user["id"]
    if not owned_by_caller:
        db.close()
        raise HTTPException(403, "无权操作")
    db.execute("DELETE FROM diary_entries WHERE id = ?", (entry_id,))
    db.commit()
    db.close()
    return {"ok": True}
# ── Favorites ──────────────────────────────────────────
@app.get("/api/favorites")
def get_favorites(user=Depends(get_current_user)):
    """Return the caller's favourite recipe ids, most recently added first.

    Callers without a user id get an empty list.
    """
    caller_id = user["id"]
    if not caller_id:
        return []
    db = get_db()
    favorite_rows = db.execute("SELECT recipe_id FROM user_favorites WHERE user_id = ? ORDER BY created_at DESC", (caller_id,)).fetchall()
    db.close()
    recipe_ids = []
    for favorite in favorite_rows:
        recipe_ids.append(favorite["recipe_id"])
    return recipe_ids
@app.post("/api/favorites/{recipe_id}", status_code=201)
def add_favorite(recipe_id: int, user=Depends(get_current_user)):
    """Mark a recipe as a favourite of the caller; an existing favourite is ignored.

    Raises HTTPException(403) when the caller has no user id.
    """
    caller_id = user["id"]
    if not caller_id:
        raise HTTPException(403, "请先登录")
    db = get_db()
    db.execute("INSERT OR IGNORE INTO user_favorites (user_id, recipe_id) VALUES (?, ?)", (caller_id, recipe_id))
    db.commit()
    db.close()
    return {"ok": True}
@app.delete("/api/favorites/{recipe_id}")
def remove_favorite(recipe_id: int, user=Depends(get_current_user)):
    """Remove a recipe from the caller's favourites.

    Raises HTTPException(403) when the caller has no user id.
    """
    caller_id = user["id"]
    if not caller_id:
        raise HTTPException(403, "请先登录")
    db = get_db()
    db.execute("DELETE FROM user_favorites WHERE user_id = ? AND recipe_id = ?", (caller_id, recipe_id))
    db.commit()
    db.close()
    return {"ok": True}
# ── User Inventory ─────────────────────────────────────
@app.get("/api/inventory")
def get_inventory(user=Depends(get_current_user)):
    """Return the names of the oils in the caller's inventory, sorted by name.

    Callers without a user id get an empty list.
    """
    caller_id = user["id"]
    if not caller_id:
        return []
    db = get_db()
    inventory_rows = db.execute("SELECT oil_name FROM user_inventory WHERE user_id = ? ORDER BY oil_name", (caller_id,)).fetchall()
    db.close()
    oil_names = []
    for inventory_row in inventory_rows:
        oil_names.append(inventory_row["oil_name"])
    return oil_names
@app.post("/api/inventory", status_code=201)
def add_inventory(body: dict, user=Depends(get_current_user)):
    """Add ``body["oil_name"]`` to the caller's inventory; duplicates are ignored.

    Raises HTTPException(403) when the caller has no user id and
    HTTPException(400) when the oil name is missing or blank.
    """
    caller_id = user["id"]
    if not caller_id:
        raise HTTPException(403, "请先登录")
    oil = body.get("oil_name", "").strip()
    if not oil:
        raise HTTPException(400, "oil_name required")
    db = get_db()
    db.execute("INSERT OR IGNORE INTO user_inventory (user_id, oil_name) VALUES (?, ?)", (caller_id, oil))
    db.commit()
    db.close()
    return {"ok": True}
@app.delete("/api/inventory/{oil_name}")
def remove_inventory(oil_name: str, user=Depends(get_current_user)):
    """Remove an oil from the caller's inventory.

    Raises HTTPException(403) when the caller has no user id.
    """
    caller_id = user["id"]
    if not caller_id:
        raise HTTPException(403, "请先登录")
    db = get_db()
    db.execute("DELETE FROM user_inventory WHERE user_id = ? AND oil_name = ?", (caller_id, oil_name))
    db.commit()
    db.close()
    return {"ok": True}
@app.get("/api/inventory/recipes")
def recipes_by_inventory(user=Depends(get_current_user)):
    """Get recipes that can be made with user's inventory oils.

    A recipe is included when at least one of its ingredients (ignoring
    "椰子油") is in the caller's inventory. Each returned recipe gains:

    - ``inventory_match``: number of ingredient oils the caller owns,
    - ``inventory_total``: number of ingredient oils considered,
    - ``inventory_missing``: ingredient oils the caller lacks, in recipe order.

    Results are sorted by most matches first, then by fewest missing oils.
    Callers without a user id, or with an empty inventory, get ``[]``.
    """
    if not user["id"]:
        return []
    conn = get_db()
    # A set gives O(1) membership tests inside the per-recipe loop below.
    owned = {r["oil_name"] for r in conn.execute(
        "SELECT oil_name FROM user_inventory WHERE user_id = ?", (user["id"],)).fetchall()}
    if not owned:
        conn.close()
        return []
    rows = conn.execute("SELECT id, name, note, owner_id, version, en_name FROM recipes ORDER BY id").fetchall()
    result = []
    for r in rows:
        recipe = _recipe_to_dict(conn, r)
        eo_oils = [i["oil_name"] for i in recipe["ingredients"] if i["oil_name"] != "椰子油"]
        missing = [o for o in eo_oils if o not in owned]
        # Every considered oil is either owned or missing, so the match count
        # follows from the two lengths.
        match_count = len(eo_oils) - len(missing)
        if match_count:
            recipe["inventory_match"] = match_count
            recipe["inventory_total"] = len(eo_oils)
            recipe["inventory_missing"] = missing
            result.append(recipe)
    conn.close()
    result.sort(key=lambda r: (-r["inventory_match"], r["inventory_total"] - r["inventory_match"]))
    return result
# ── Search Logging ─────────────────────────────────────
@app.post("/api/search-log")
def log_search(body: dict, user=Depends(get_current_user)):
    """Record a search query and notify maintainers when nothing matched.

    A blank query is ignored. When ``matched_count`` equals 0, one
    notification each is created for the ``admin`` and ``senior_editor``
    roles.
    """
    query = body.get("query", "").strip()
    matched = body.get("matched_count", 0)
    if not query:
        return {"ok": True}
    db = get_db()
    db.execute("INSERT INTO search_log (user_id, query, matched_count) VALUES (?, ?, ?)",
               (user.get("id"), query, matched))
    if matched == 0:
        # Immediate heads-up so maintainers can add the missing recipe.
        searcher = user.get("display_name") or user.get("username") or "用户"
        notice_title = "🔍 有人搜索了未收录的配方"
        notice_body = f"{searcher} 搜索了「{query}」但没有找到匹配配方,请考虑添加。"
        for target_role in ("admin", "senior_editor"):
            db.execute(
                "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                (target_role, notice_title, notice_body)
            )
    db.commit()
    db.close()
    return {"ok": True, "matched": matched}
@app.get("/api/search-log/unmatched")
def get_unmatched_searches(days: int = 7, user=Depends(require_role("admin", "senior_editor"))):
    """Return up to 50 zero-result queries from the last ``days`` days.

    Each item carries the query, how often it occurred (``cnt``) and when it
    was last seen (``last_at``); the most frequent queries come first.
    """
    unmatched_sql = (
        "SELECT query, COUNT(*) as cnt, MAX(created_at) as last_at "
        "FROM search_log WHERE matched_count = 0 AND created_at > datetime('now', ?) "
        "GROUP BY query ORDER BY cnt DESC LIMIT 50"
    )
    # SQLite datetime modifier, e.g. "-7 days".
    window = f"-{days} days"
    db = get_db()
    found = db.execute(unmatched_sql, (window,)).fetchall()
    db.close()
    return list(map(dict, found))
# ── Notifications ──────────────────────────────────────
@app.get("/api/notifications")
def get_notifications(user=Depends(get_current_user)):
    """Return up to 200 notifications visible to the caller, unread first.

    A notification is visible when it targets the caller directly, or when
    it has no target user and targets the caller's role or ``'all'``.
    Callers without a user id get an empty list.
    """
    if not user["id"]:
        return []
    visible_sql = (
        "SELECT id, title, body, is_read, created_at FROM notifications "
        "WHERE (target_user_id = ? OR (target_user_id IS NULL AND (target_role = ? OR target_role = 'all'))) "
        "ORDER BY is_read ASC, id DESC LIMIT 200"
    )
    db = get_db()
    notices = db.execute(visible_sql, (user["id"], user["role"])).fetchall()
    db.close()
    return list(map(dict, notices))
@app.post("/api/notifications/{nid}/read")
def mark_notification_read(nid: int, body: dict = None, user=Depends(get_current_user)):
    """Mark one notification as read.

    Notifications whose title contains "待测试" can only be marked read when
    the body carries a truthy ``force`` (sent by the "已测试" flow);
    otherwise HTTPException(400) is raised.
    """
    # NOTE(review): the UPDATE filters on id only, so this does not verify
    # that the notification targets the caller - confirm that is intended.
    db = get_db()
    notice = db.execute("SELECT title FROM notifications WHERE id = ?", (nid,)).fetchone()
    forced = body.get("force", False) if body else False
    is_test_notice = bool(notice) and "待测试" in (notice["title"] or "")
    if is_test_notice and not forced:
        db.close()
        raise HTTPException(400, "请先点击「已测试」完成测试")
    db.execute("UPDATE notifications SET is_read = 1 WHERE id = ?", (nid,))
    db.commit()
    db.close()
    return {"ok": True}
@app.post("/api/notifications/{nid}/unread")
def mark_notification_unread(nid: int, user=Depends(get_current_user)):
    """Reset the read flag of notification ``nid`` to unread."""
    db = get_db()
    db.execute("UPDATE notifications SET is_read = 0 WHERE id = ?", (nid,))
    db.commit()
    db.close()
    return {"ok": True}
@app.post("/api/notifications/read-all")
def mark_all_notifications_read(user=Depends(get_current_user)):
    """Mark every unread notification visible to the caller as read.

    Notifications whose title contains '待测试' (bug-test notifications) are
    left unread.
    """
    bulk_read_sql = (
        "UPDATE notifications SET is_read = 1 WHERE is_read = 0 "
        "AND title NOT LIKE '%待测试%' "
        "AND (target_user_id = ? OR (target_user_id IS NULL AND (target_role = ? OR target_role = 'all')))"
    )
    db = get_db()
    db.execute(bulk_read_sql, (user["id"], user["role"]))
    db.commit()
    db.close()
    return {"ok": True}
@app.post("/api/cron/weekly-review")
def weekly_review(user=Depends(require_role("admin"))):
    """Generate weekly notifications. Call via cron or manually.

    1. Counts recipes not owned by the first admin user (id 1 is assumed when
       no admin row exists) and, if any, notifies the ``admin`` role.
    2. Collects the ten most frequent zero-result searches of the last seven
       days and, if any, notifies the ``admin`` and ``senior_editor`` roles.

    Returns the pending-recipe count and the number of unmatched queries.
    """
    conn = get_db()
    # 1. Pending recipes for admin review
    admin = conn.execute("SELECT id FROM users WHERE role = 'admin' LIMIT 1").fetchone()
    admin_id = admin["id"] if admin else 1
    pending = conn.execute(
        "SELECT COUNT(*) as cnt FROM recipes WHERE owner_id != ?", (admin_id,)
    ).fetchone()["cnt"]
    if pending > 0:
        conn.execute(
            "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
            ("admin", f"{pending} 条配方待审核",
             "其他用户新增了配方,请到「管理配方」→「待审核」查看并采纳。")
        )
    # 2. Unmatched searches for admin + senior_editor
    unmatched = conn.execute(
        "SELECT query, COUNT(*) as cnt FROM search_log "
        "WHERE matched_count = 0 AND created_at > datetime('now', '-7 days') "
        "GROUP BY query ORDER BY cnt DESC LIMIT 10"
    ).fetchall()
    if unmatched:
        # Quote every query with a matching 「…」 pair and separate the items
        # so the summary does not run together.
        queries = "、".join(f"「{r['query']}」({r['cnt']}次)" for r in unmatched)
        for role in ("admin", "senior_editor"):
            conn.execute(
                "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                (role, "本周未匹配的搜索需求",
                 f"以下搜索没有找到配方:{queries}。请考虑完善相关配方。")
            )
    conn.commit()
    conn.close()
    return {"ok": True, "pending_recipes": pending, "unmatched_queries": len(unmatched)}
# ── Category Modules (homepage) ────────────────────────
@app.get("/api/categories")
def list_categories():
    """Return all homepage category modules ordered by ``sort_order`` then id."""
    db = get_db()
    modules = db.execute("SELECT * FROM category_modules ORDER BY sort_order, id").fetchall()
    db.close()
    return list(map(dict, modules))
@app.post("/api/categories", status_code=201)
def create_category(body: dict, user=Depends(require_role("admin"))):
    """Create a homepage category module.

    ``name`` and ``tag_name`` are required. ``subtitle``, ``icon``,
    ``bg_image``, ``color_from``, ``color_to`` and ``sort_order`` are
    optional and fall back to the defaults shown below.

    Raises:
        HTTPException(400): a required field is absent from the body. This
            is checked before the connection is opened; previously the
            resulting KeyError surfaced as a 500 and leaked the connection.
    """
    missing = [key for key in ("name", "tag_name") if key not in body]
    if missing:
        raise HTTPException(400, f"Missing required field(s): {', '.join(missing)}")
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO category_modules (name, subtitle, icon, bg_image, color_from, color_to, tag_name, sort_order) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (body["name"], body.get("subtitle", ""), body.get("icon", "🌿"), body.get("bg_image", ""),
             body.get("color_from", "#7a9e7e"), body.get("color_to", "#5a7d5e"),
             body["tag_name"], body.get("sort_order", 0))
        )
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
@app.delete("/api/categories/{cat_id}")
def delete_category(cat_id: int, user=Depends(require_role("admin"))):
    """Delete the category module with id ``cat_id``; succeeds even if no row matches."""
    db = get_db()
    db.execute("DELETE FROM category_modules WHERE id = ?", (cat_id,))
    db.commit()
    db.close()
    return {"ok": True}
# ── Static files (frontend) ────────────────────────────
# Directory holding the built frontend; taken from the FRONTEND_DIR
# environment variable, defaulting to /app/frontend.
FRONTEND_DIR = os.environ.get("FRONTEND_DIR", "/app/frontend")
@app.on_event("startup")
def startup():
    """Initialise the database and seed it from ``defaults.json`` if present.

    ``defaults.json`` is looked up next to this module. It is read as UTF-8
    explicitly, so decoding does not depend on the process locale.
    """
    init_db()
    defaults_path = os.path.join(os.path.dirname(__file__), "defaults.json")
    if os.path.exists(defaults_path):
        with open(defaults_path, encoding="utf-8") as f:
            data = json.load(f)
        seed_defaults(data["oils_meta"], data["recipes"])
if os.path.isdir(FRONTEND_DIR):
    # Serve static assets (js/css/images) directly
    app.mount("/assets", StaticFiles(directory=os.path.join(FRONTEND_DIR, "assets")), name="assets")
    app.mount("/public", StaticFiles(directory=FRONTEND_DIR), name="public")

    # SPA fallback: any non-API, non-asset route returns index.html
    from fastapi.responses import FileResponse

    # Normalised frontend root with a trailing separator, used to confirm
    # that a requested file really lives inside the frontend directory.
    _frontend_root = os.path.join(os.path.abspath(FRONTEND_DIR), "")

    @app.get("/{path:path}")
    async def spa_fallback(path: str):
        """Serve an existing frontend file, otherwise ``index.html``.

        ``path`` comes straight from the request URL, so it is normalised and
        must stay inside the frontend directory. ``..`` segments or an
        absolute path (e.g. a request for ``//etc/passwd``) would otherwise
        let a client read arbitrary files; such requests get ``index.html``.
        """
        # Serve actual files if they exist (favicon, icons, etc.)
        file_path = os.path.abspath(os.path.join(FRONTEND_DIR, path))
        if file_path.startswith(_frontend_root) and os.path.isfile(file_path):
            return FileResponse(file_path)
        # Otherwise return index.html for Vue Router
        return FileResponse(os.path.join(FRONTEND_DIR, "index.html"))