Files
oil-formula-calculator/backend/main.py
Hera Zhao c04bb53ddd
Some checks failed
Test / unit-test (push) Successful in 7s
Test / build-check (push) Successful in 4s
PR Preview / teardown-preview (pull_request) Has been skipped
Test / e2e-test (push) Failing after 1m12s
PR Preview / test (pull_request) Successful in 7s
PR Preview / deploy-preview (pull_request) Successful in 19s
fix: 精油编辑统一保存到 DB + 导出 Excel 增加功效列
- 新增 oil_cards 表,持久化知识卡片(功效/用法/方法/注意/emoji)
- POST /api/oils 扩展接受 card_* 字段,在同一事务里 upsert oil_cards
- GET /api/oil-cards 返回全部卡片
- 前端 getOilCard 优先查 DB,再 fallback 静态表
- saveEditOil 统一走 saveOil,不再分两套保存
- 精油价目 Excel 导出增加「功效」列
- 首次部署自动从静态 OIL_CARDS 播种到 oil_cards 表

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 10:56:30 +00:00

2049 lines
98 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import FastAPI, HTTPException, Request, Depends
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Optional
import json
import os
from backend.database import get_db, init_db, seed_defaults, log_audit
from backend.translate import auto_translate
import hashlib
import secrets as _secrets
# Application object; the route decorators in this file (@app.get, @app.post, ...) register on it.
app = FastAPI(title="Essential Oil Formula Calculator API")
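# Default oil knowledge cards used for DB seeding (mirrors the frontend OIL_CARDS table).
# Keyed by the oil's Chinese name; each card carries: emoji, en, effects, usage, method, caution.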
DEFAULT_OIL_CARDS = {
"野橘": {"emoji": "🍊", "en": "Wild Orange", "effects": "安抚镇静、驱散负能量、紧张情绪和压力\n抗氧化、消炎、抗病毒、增强免疫\n提振食欲,刺激胆汁分泌,促进消化\n促进循环", "usage": "日常香薰,提振情绪、舒压,营造愉悦氛围\n饮水加入 1至2 滴,护肝抗病毒\n泡澡时滴 3至4 滴,消除疲劳,放松身心\n扫地、拖地时加入 3至5 滴,清新空气\n涂抹腹部促进消化\n涂抹肝区帮助肝脏排毒\n口腔溃疡时加入水中漱口", "method": "🔹香薰 🔸内用 🔺涂抹", "caution": "轻微光敏,白天涂抹注意防晒"},
"冬青": {"emoji": "🌿", "en": "Wintergreen", "effects": "强效镇痛(肌肉、关节)\n抗炎、促进循环\n舒缓紧绷肌肉,抗痉挛", "usage": "牙疼时加 1 滴到水中漱口\n扭伤、落枕、酸痛(如肩颈酸痛)处稀释涂抹\n运动前后按摩", "method": "🔹香薰 |🔺涂抹(需 6 倍稀释)", "caution": "不可内用、孕期慎用、避免儿童误食"},
"生姜": {"emoji": "🫚", "en": "Ginger", "effects": "促进消化、暖胃\n活血、改善循环、祛湿\n抗炎、抗氧化、强健免疫\n缓解恶心、晕车\n促进骨骼、肌肉和关节的健康", "usage": "胀气、腹冷时,稀释涂抹腹部或喝 1 滴\n手脚冰凉时稀释涂抹脚底或将1滴加入热饮中\n晕车时,吸闻或滴在手心嗅吸\n祛除风寒可将 2 滴加入热水中泡脚\n痛经时,稀释涂抹于小腹并按摩\n做菜时可加入 1 滴帮助增添风味", "method": "🔹香薰 🔸内用 🔺涂抹(需稀释)", "caution": ""},
"柠檬草": {"emoji": "🍃", "en": "Lemongrass", "effects": "强效抗菌、抗炎\n驱虫、净化空气\n扩张血管,促进循环,缓解肌肉疼痛", "usage": "筋膜紧绷、腿麻或肌肉酸痛时稀释涂抹\n肩周炎时6 倍稀释后涂抹于肩颈部位并按摩\n做菜时加入 1 滴,增加泰式风味\n加入椰子油中制成家居喷雾,涂抹在裸露肌肤上驱蚊虫\n洗衣时加 3至5 滴祛味杀菌\n日常香薰平衡情绪", "method": "🔹香薰 🔸内用 🔺涂抹(需 6 倍稀释)", "caution": ""},
"柑橘清新": {"emoji": "🍬", "en": "Citrus Bliss", "effects": "提振精神,改善负面情绪\n净化空间\n降低压力", "usage": "日常香薰提升愉悦感,提振精神,净化空间\n拖地时加几滴清新空气\n加入到护手霜中,滋润手部肌肤,享受清新香气", "method": "🔹香薰 🔺涂抹", "caution": "含柑橘类,光敏注意白天涂抹"},
"芳香调理": {"emoji": "🤲", "en": "AromaTouch", "effects": "放松紧绷肌肉,放松关节\n促进血液循环\n促进淋巴排毒\n提升免疫\n舒缓放松,减少紧张", "usage": "稀释涂抹于太阳穴,缓解头痛,改善紧张情绪\n稀释涂抹于僵硬的身体部位如肩颈处并按摩,促进肌肉放松\n日常香薰或加入热水中泡澡,释放压力", "method": "🔹香薰 🔺涂抹", "caution": ""},
"西洋蓍草": {"emoji": "🔵", "en": "Yarrow | Pom", "effects": "改善肌肤老化症状\n美白肌肤,改善瑕疵\n呵护敏感肌肤,对抗炎症\n提升整体免疫", "usage": "早晚护肤时涂抹3至4滴于面部改善皱纹和细纹美白肌肤\n每天早晚舌下含服1滴促进细胞健康提升免疫", "method": "🔸内用 🔺涂抹", "caution": ""},
"新瑞活力": {"emoji": "🌿", "en": "MetaPWR", "effects": "促进新陈代谢,减肥\n抑制食欲,减少对甜食的渴望\n稳定血糖波动\n提振情绪,激励身心", "usage": "饭前喝1至2滴控制食欲稳定血糖提升代谢\n日常香薰可以帮助恢复能量,消除疲乏感\n稀释涂抹与身体需紧致的部位,帮助紧致塑形\n加入饮品中,帮助增添风味", "method": "🔹香薰 🔸内用 🔺涂抹(需稀释)", "caution": ""},
"安定情绪": {"emoji": "🌳", "en": "Balance", "effects": "促进全身的放松\n减轻焦虑,缓解紧张情绪\n带来宁静和安定感", "usage": "日常香薰稳定情绪,放松\n夜间香薰促进睡眠\n涂抹脚底或脊椎放松情绪,放松肌肉\n冥想、瑜伽前涂抹", "method": "🔹香薰 🔺涂抹", "caution": ""},
"安宁神气": {"emoji": "😴", "en": "Serenity", "effects": "促进深度睡眠\n放松身体,缓解焦虑\n平衡情绪\n平衡自律神经系统", "usage": "夜间香薰或稀释涂抹脚底促进深度睡眠,释放压力\n稀释涂抹太阳穴或脚底舒缓压力\n吸闻缓解焦虑和紧张情绪", "method": "🔹香薰 🔺涂抹", "caution": ""},
"元气焕能": {"emoji": "🔥", "en": "Zendocrine", "effects": "帮助身体净化,排毒\n维持肝脏和肾脏健康\n平衡情绪", "usage": "饭前内用1至2滴帮助代谢\n稀释涂抹肝区或内服3滴帮助养护肝脏\n稀释涂抹后腰脊椎出帮助养护肾脏,排除毒素\n日常香薰消除压力", "method": "🔹香薰 🔸内用 🔺涂抹", "caution": ""},
"温柔呵护": {"emoji": "🌸", "en": "Soft Talk", "effects": "平衡荷尔蒙\n抚平情绪波动\n调理经期不适\n舒缓压力\n提升女性魅力", "usage": "稀释涂抹下腹部帮助平衡荷尔蒙,或进行经期调理\n手心嗅吸帮助舒缓压力,平衡情绪\n2滴直接涂抹于脖颈后侧或手腕动脉处提升女性魅力", "method": "🔹香薰 🔺涂抹", "caution": ""},
"柠檬": {"emoji": "🍋", "en": "Lemon", "effects": "清洁身体与环境\n强健免疫系统\n帮助肝脏代谢、排毒\n抗氧化\n净化空气、去异味\n蔬果清洗、保鲜\n促进循环、提振精神", "usage": "添加至护肤品中晚上使用\n添加至牙膏里美白牙齿\n滴入口中或水里喝下一天三次每次3至5滴净化身体\n洗水果和蔬菜时添加 1至2 滴浸泡\n嗓子疼或感冒初期时含服柠檬1至2滴\n日常香薰提振情绪,护肝", "method": "🔹香薰 🔸内用 🔺涂抹(夜间)", "caution": "光敏性,白天避免涂抹"},
"薰衣草": {"emoji": "💜", "en": "Lavender", "effects": "镇静安神、改善睡眠、缓解头痛\n舒缓压力、平衡情绪、抗抑郁\n烧烫伤修复、疤痕、痘印\n促进伤口修复、止血\n促进细胞再生,修复结缔组织\n抗炎、抗过敏、止痛\n皮肤舒缓止痒,如蚊虫叮咬", "usage": "烧伤、烫伤、割伤及任何伤口处涂抹,止血防疤\n夜间香薰助眠,白天香薰舒缓情绪\n鱼刺卡嗓子时滴入口中\n加入护肤品中平衡油脂、改善痘痘、去疤痕", "method": "🔹香薰 🔸内用 🔺涂抹(儿童/敏感肌需稀释)", "caution": ""},
"椒样薄荷": {"emoji": "🌿", "en": "Peppermint", "effects": "促进健康的呼吸系统\n祛痰、抗粘膜发炎、打开呼吸道\n强肝利胆,促进消化\n退热、缓解中暑\n清凉止痒\n提神醒脑、提升专注、缓解头痛", "usage": "白天香薰提神醒脑,清新空气\n按摩头部缓解头疼、提神醒脑\n蚊虫叮咬后,涂抹止痒\n混入水中进行漱口,清新口气\n发烧时涂抹额头腋下帮助降温\n打嗝、咳嗽、鼻塞时吸闻\n消化不良时稀释涂抹于腹部或内用 2 滴", "method": "🔹香薰 🔸内用 🔺涂抹(儿童/敏感肌需稀释)", "caution": "孕期/高血压慎用,晚上少用"},
"茶树": {"emoji": "🌱", "en": "Tea Tree", "effects": "抗菌、抗病毒、抗真菌\n提升免疫力\n头皮屑护理\n预防化脓\n居家杀菌净化", "usage": "各种痤疮处点涂\n加入护肤品中,清洁皮肤\n洗头时加 1 滴到洗头膏,去头皮屑\n洗衣服时加入 3至5 滴,杀菌祛味\n脚气时用茶树泡脚\n感冒时涂抹,杀菌抗病毒", "method": "🔹香薰 🔸内用 🔺涂抹(儿童/敏感肌需稀释)", "caution": ""},
"西班牙牛至": {"emoji": "🔥", "en": "Oregano", "effects": "强抗菌、抗病毒、抗顽固性真菌\n成人炎症辅助\n促进消化\n强抗氧化、抗衰老\n免疫力提升", "usage": "洗衣服或拖地时加入 3至5 滴,消炎杀菌\n吃坏肚子时灌于胶囊中内用\n灰指甲时稀释涂抹于患处\n流感季节时香薰,杀灭空气中微生物", "method": "🔹香薰 🔸内用(胶囊) 🔺涂抹(需高倍稀释)", "caution": ""},
"保卫": {"emoji": "🛡", "en": "On Guard", "effects": "强化免疫力\n抗氧化\n天然杀菌、净化空气\n维护口腔健康", "usage": "日常香熏净化空气,强化免疫力\n流感季节或换季时香薰\n混入水中漱口,保持口气清新\n日常稀释涂抹于脊椎或脚底,强化免疫力\n感冒时涂抹,抗菌抗病毒", "method": "🔹香薰 🔸内用 🔺涂抹(儿童/敏感肌需稀释)", "caution": "含肉桂丁香,不宜频繁涂抹"},
"顺畅呼吸": {"emoji": "🌬", "en": "Breathe", "effects": "帮助缓解鼻炎、感冒等呼吸道不适\n促进呼吸系统健康\n净化空气", "usage": "日常香薰,强健呼吸系统,净化空气\n咳嗽、鼻塞时香薰、吸闻、涂抹于鼻翼、喉咙或肺部\n打鼾、哮喘、鼻炎可日常吸闻\n运动前吸闻,扩张呼吸道", "method": "🔹香薰 🔺涂抹(儿童/敏感肌需稀释)", "caution": ""},
"乐活": {"emoji": "🍃", "en": "DigestZen", "effects": "促进消化\n缓解胀气、消化不良、便秘等胃肠不适", "usage": "便秘时,稀释涂抹肚脐周围并顺时针揉腹\n喝酒前后各喝2滴解酒护肝\n晕车时吸闻或稀释涂抹肚脐周围\n拉肚子时逆时针揉腹", "method": "🔹熏香 🔸内用 🔺涂抹(儿童/敏感肌需稀释)", "caution": ""},
"舒缓": {"emoji": "🌿", "en": "Deep Blue", "effects": "缓解肌肉酸痛\n抗痉挛,抗炎", "usage": "肌肉酸痛、扭伤、挫伤、肩颈紧绷、落枕、关节疼痛时稀释涂抹于患处", "method": "🔺涂抹(需稀释)", "caution": ""},
"乳香": {"emoji": "👑", "en": "Frankincense", "effects": "促进伤口愈合,促进细胞健康\n抗氧化、抗发炎、抗衰老、淡纹\n活血行气\n疏通血管\n滋养大脑神经", "usage": "加入护肤品中,淡斑,抗衰\n稀释后涂抹大眼眶,改善视力\n早晚舌下含服 2 滴,提高血氧含量\n夜间香薰,滋养大脑,安眠\n任何情况下,想不起来用什么就用乳香", "method": "🔹香薰 🔸内用 🔺涂抹", "caution": ""},
}
def title_case(s: str) -> str:
    """Return *s* stripped and title-cased, e.g. 'pain relief' -> 'Pain Relief'.

    Falsy input (``None`` or the empty string) is handed back unchanged.
    """
    if not s:
        return s
    trimmed = s.strip()
    return trimmed.title()
# ── Password hashing (PBKDF2-SHA256, stdlib) ─────────
def hash_password(password: str) -> str:
    """Hash *password* with PBKDF2-HMAC-SHA256 under a fresh random salt.

    Returns:
        ``"<salt>$<digest>"`` where ``<salt>`` is 32 random hex characters
        and ``<digest>`` is the hex form of 200,000 PBKDF2 iterations.
    """
    salt_hex = _secrets.token_hex(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode(),
        salt_hex.encode(),  # the hex text itself (not the decoded bytes) is the salt
        200_000,
    )
    return "$".join((salt_hex, digest.hex()))
def verify_password(password: str, stored: str) -> bool:
    """Check *password* against the value stored for a user.

    Args:
        password: The candidate password supplied by the caller.
        stored: Either ``"<salt>$<hex digest>"`` (PBKDF2-HMAC-SHA256,
            200,000 iterations) or a legacy plaintext password (any value
            without a ``$``).

    Returns:
        ``True`` when the password matches; ``False`` otherwise, including
        when *stored* is empty or ``None``.
    """
    if not stored:
        return False
    if "$" not in stored:
        # Legacy plaintext. Compare as bytes: compare_digest rejects
        # non-ASCII str, and passwords may contain any characters.
        return _secrets.compare_digest(password.encode(), stored.encode())
    salt, expected_hex = stored.split("$", 1)
    actual_hex = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt.encode(), 200_000
    ).hex()
    # Constant-time comparison so response timing does not leak how many
    # leading digest characters matched.
    return _secrets.compare_digest(actual_hex.encode(), expected_hex.encode())
def _upgrade_password_if_needed(conn, user_id: int, password: str, stored: str):
"""If stored password is legacy plaintext, upgrade to hashed."""
if stored and "$" not in stored:
conn.execute("UPDATE users SET password = ? WHERE id = ?", (hash_password(password), user_id))
conn.commit()
# Periodic WAL checkpoint to ensure data is flushed to main DB file
import threading, time as _time
def _wal_checkpoint_loop():
    """Run ``PRAGMA wal_checkpoint(TRUNCATE)`` every five minutes, forever.

    The checkpoint is best-effort: a failure is logged and the loop carries
    on with the next cycle. The connection is always closed, even when the
    PRAGMA raises.
    """
    # Local import: logging is not among the imports visible at the top of this file.
    import logging

    logger = logging.getLogger(__name__)
    while True:
        _time.sleep(300)  # Every 5 minutes
        try:
            conn = get_db()
            try:
                conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
            finally:
                conn.close()
        except Exception:
            # Keep the loop alive, but leave a trace instead of swallowing silently.
            logger.warning("WAL checkpoint failed", exc_info=True)
# Start the checkpoint loop at import time; daemon=True so the thread does not keep the process alive.
threading.Thread(target=_wal_checkpoint_loop, daemon=True).start()
# ── Auth ────────────────────────────────────────────────
# Identity returned for requests that carry no token, or a token matching no user.
ANON_USER = {"id": None, "role": "viewer", "username": "anonymous", "display_name": "匿名用户"}


def get_current_user(request: Request):
    """Resolve the caller from the ``Authorization: Bearer <token>`` header.

    Returns:
        The matching ``users`` row as a dict (id, username, role,
        display_name, password, business_verified, username_changed), or
        ``ANON_USER`` when the header is missing/empty or the token is unknown.
    """
    auth_header = request.headers.get("Authorization", "")
    token = auth_header.removeprefix("Bearer ").strip()
    if not token:
        return ANON_USER
    conn = get_db()
    row = conn.execute(
        "SELECT id, username, role, display_name, password, business_verified, username_changed FROM users WHERE token = ?",
        (token,),
    ).fetchone()
    conn.close()
    return dict(row) if row else ANON_USER
def require_role(*roles):
    """Build a dependency that admits only users whose role is one of *roles*.

    The returned callable resolves the caller through ``get_current_user``,
    returns that user when the role is allowed, and raises
    ``HTTPException(403)`` otherwise.
    """
    def checker(user=Depends(get_current_user)):
        if user["role"] in roles:
            return user
        raise HTTPException(403, "权限不足")

    return checker
# ── Models ──────────────────────────────────────────────
# Request body of POST /api/oils (see upsert_oil).
# name / bottle_price / drop_count are required; everything else is optional.
class OilIn(BaseModel):
    name: str
    bottle_price: float
    drop_count: int
    retail_price: Optional[float] = None
    en_name: Optional[str] = None
    is_active: Optional[int] = None
    unit: Optional[str] = None
    # Oil card fields (optional, saved to oil_cards table)
    card_emoji: Optional[str] = None
    card_effects: Optional[str] = None
    card_usage: Optional[str] = None
    card_method: Optional[str] = None
    card_caution: Optional[str] = None
# One ingredient entry of a recipe payload: an oil name plus a (possibly fractional) drop count.
class IngredientIn(BaseModel):
    oil_name: str
    drops: float
# Full recipe payload: name and ingredients are required; note, tags and en_name are optional.
# NOTE(review): presumably the create-recipe request body — the endpoint is outside this view.
class RecipeIn(BaseModel):
    name: str
    note: str = ""
    ingredients: list[IngredientIn]
    tags: list[str] = []
    en_name: Optional[str] = None
# Partial recipe payload: every field is optional and defaults to None.
# NOTE(review): presumably None means "leave unchanged" — confirm in the update endpoint (not in view).
class RecipeUpdate(BaseModel):
    name: Optional[str] = None
    en_name: Optional[str] = None
    note: Optional[str] = None
    ingredients: Optional[list[IngredientIn]] = None
    tags: Optional[list[str]] = None
    version: Optional[int] = None
    volume: Optional[str] = None
# User payload: username is required; role defaults to "viewer" and display_name to "".
class UserIn(BaseModel):
    username: str
    role: str = "viewer"
    display_name: str = ""
# Partial user payload: role and display_name are both optional and default to None.
class UserUpdate(BaseModel):
    role: Optional[str] = None
    display_name: Optional[str] = None
# ── Me ──────────────────────────────────────────────────
# Version string reported by GET /api/version.
APP_VERSION = "20260401"


@app.get("/api/version")
def get_version():
    """Report the application version as ``{"version": APP_VERSION}``."""
    payload = {"version": APP_VERSION}
    return payload
@app.get("/api/me")
def get_me(user=Depends(get_current_user)):
    """Describe the calling user: identity, role and three boolean flags."""
    profile = {
        "username": user["username"],
        "role": user["role"],
        # NOTE(review): display_name is filled from username, not from
        # user["display_name"] — confirm this is intended.
        "display_name": user["username"],
        "id": user.get("id"),
    }
    # Each flag is the truthiness of the corresponding user field.
    flag_sources = (
        ("has_password", "password"),
        ("business_verified", "business_verified"),
        ("username_changed", "username_changed"),
    )
    for flag, field in flag_sources:
        profile[flag] = bool(user.get(field))
    return profile
# ── Bug Reports ─────────────────────────────────────────
@app.post("/api/bug-report", status_code=201)
def submit_bug(body: dict, user=Depends(get_current_user)):
    """Create a bug report, log its creation and notify admins.

    Body keys: ``content`` (required, non-blank) and ``priority`` (optional;
    defaults to 2 when the submitter is an admin, otherwise 0).

    Raises:
        HTTPException: 400 when ``content`` is missing or blank.
    """
    # `or ""` also covers an explicit JSON null.
    content = (body.get("content") or "").strip()
    if not content:
        raise HTTPException(400, "请输入内容")

    who = user.get("display_name") or user.get("username") or "匿名"
    is_admin = user.get("role") == "admin"
    # Reports from non-admins default to urgent (0); admins default to 2.
    priority = body.get("priority", 2 if is_admin else 0)

    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO bug_reports (user_id, content, priority) VALUES (?, ?, ?)",
            (user.get("id"), content, priority),
        )
        bug_id = cur.lastrowid
        # Auto-log: created
        conn.execute(
            "INSERT INTO bug_comments (bug_id, user_id, action, content) VALUES (?, ?, ?, ?)",
            (bug_id, user.get("id"), "创建", content),
        )
        # Only notify admin if submitter is NOT admin
        if not is_admin:
            conn.execute(
                "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                # Separator added: name and content used to run together.
                ("admin", "🐛 Bug 反馈", f"{who}:{content}\n[bug_id:{bug_id}]"),
            )
        conn.commit()
    finally:
        # Close on every path, including a failing INSERT.
        conn.close()
    return {"ok": True}
@app.get("/api/bug-reports")
def list_bugs(user=Depends(get_current_user)):
    """List bug reports with their comment logs.

    Admins see every report. Other callers see reports they submitted,
    reports assigned to them, and reports whose status is 1 or 3. Rows are
    ordered by priority, then by latest activity (newest comment, falling
    back to the report's creation time).
    """
    select_from = (
        "SELECT b.id, b.content, b.is_resolved, b.created_at, b.user_id, b.priority, b.assigned_to, "
        "u.display_name, u.username, a.display_name as assigned_name "
        "FROM bug_reports b LEFT JOIN users u ON b.user_id = u.id LEFT JOIN users a ON b.assigned_to = a.id "
    )
    order_by = (
        "ORDER BY b.priority ASC, COALESCE((SELECT MAX(c.created_at) FROM bug_comments c WHERE c.bug_id=b.id), b.created_at) DESC"
    )

    conn = get_db()
    if user["role"] == "admin":
        cursor = conn.execute(select_from + order_by)
    else:
        uid = user.get("id")
        visibility = "WHERE b.user_id = ? OR b.assigned_to = ? OR b.is_resolved IN (1, 3) "
        cursor = conn.execute(select_from + visibility + order_by, (uid, uid))

    bugs = [dict(row) for row in cursor.fetchall()]
    # Attach the comment/log entries of each bug, oldest first.
    for bug in bugs:
        comment_rows = conn.execute(
            "SELECT c.id, c.action, c.content, c.created_at, u.display_name, u.username "
            "FROM bug_comments c LEFT JOIN users u ON c.user_id = u.id "
            "WHERE c.bug_id = ? ORDER BY c.created_at ASC",
            (bug["id"],),
        ).fetchall()
        bug["comments"] = [dict(entry) for entry in comment_rows]
    conn.close()
    return bugs
@app.put("/api/bug-reports/{bug_id}")
def update_bug(bug_id: int, body: dict, user=Depends(get_current_user)):
    """Change a bug's status, content and/or priority.

    Body keys (all optional):
        status: 0=open, 1=awaiting test, 2=fixed (admin only), 3=tested.
        note: free text stored with the status-change log entry.
        notify_user_id: with status 1, the tester to assign and notify.
        content / priority: new values written to the report.

    A status change is logged in ``bug_comments`` and may create
    notifications (assigned tester, reporter, senior editors or admins).

    Raises:
        HTTPException: 404 when the bug does not exist; 403 when a
            non-admin tries to set status 2.
    """
    # status: 0=open, 1=awaiting test, 2=fixed (admin only), 3=tested (tester feedback)
    status_names = {0: "待处理", 1: "待测试", 2: "已修复", 3: "已测试"}
    new_status = body.get("status")
    # `or ""` also covers an explicit JSON null.
    note = (body.get("note") or "").strip()

    conn = get_db()
    try:
        bug = conn.execute(
            "SELECT user_id, content, is_resolved FROM bug_reports WHERE id = ?", (bug_id,)
        ).fetchone()
        if not bug:
            raise HTTPException(404, "Bug not found")

        if new_status is not None:
            # Only admin can mark as resolved (status 2)
            if new_status == 2 and user["role"] != "admin":
                raise HTTPException(403, "只有管理员可以标记为已修复")
            conn.execute("UPDATE bug_reports SET is_resolved = ? WHERE id = ?", (new_status, bug_id))
            # Auto-log the status change
            action = status_names.get(new_status, str(new_status))
            conn.execute(
                "INSERT INTO bug_comments (bug_id, user_id, action, content) VALUES (?, ?, ?, ?)",
                (bug_id, user.get("id"), action, note),
            )

            note_suffix = ("\n\n备注:" + note) if note else ""
            bug_tag = "\n[bug_id:" + str(bug_id) + "]"
            notify_user_id = body.get("notify_user_id")

            if new_status == 1 and notify_user_id:
                # Admin sends to specific tester — user-targeted notification
                conn.execute("UPDATE bug_reports SET assigned_to = ? WHERE id = ?", (notify_user_id, bug_id))
                target = conn.execute(
                    "SELECT role, display_name, username FROM users WHERE id = ?", (notify_user_id,)
                ).fetchone()
                if target:
                    msg = "请帮忙测试「" + bug["content"][:80] + "」" + note_suffix + bug_tag
                    conn.execute(
                        "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
                        (target["role"], "🔧 Bug 待测试", msg, notify_user_id),
                    )
            elif new_status == 1 and bug["user_id"]:
                # Admin marks as testing → notify reporter
                reporter = conn.execute(
                    "SELECT role, display_name, username FROM users WHERE id = ?", (bug["user_id"],)
                ).fetchone()
                if reporter:
                    # Addressed to the reporter only: the title says "your bug",
                    # so it must not go to everyone sharing the reporter's role.
                    conn.execute(
                        "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
                        (
                            reporter["role"],
                            "🔧 你的 Bug 已修复,请测试",
                            "请帮忙测试「" + bug["content"][:80] + "」" + note_suffix,
                            bug["user_id"],
                        ),
                    )
                    if reporter["role"] in ("viewer", "editor"):
                        # Also ask senior editors (role-wide) to help confirm the fix.
                        reporter_name = reporter["display_name"] or reporter["username"]
                        conn.execute(
                            "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                            (
                                "senior_editor",
                                "🔧 Bug 待测试(来自" + reporter_name + ")",
                                "问题「" + bug["content"][:50] + "」已修复,请协助测试确认。",
                            ),
                        )
            elif new_status == 3:
                # Tester confirms tested → notify admin
                who = user.get("display_name") or user.get("username") or "测试者"
                msg = who + " 已测试「" + bug["content"][:50] + "」" + note_suffix + bug_tag
                conn.execute(
                    "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                    ("admin", "🧪 Bug 已测试", msg),
                )
            # new_status == 2: the admin resolved it themselves, no notification needed.

        if "content" in body:
            conn.execute("UPDATE bug_reports SET content = ? WHERE id = ?", (body["content"], bug_id))
        if "priority" in body:
            conn.execute("UPDATE bug_reports SET priority = ? WHERE id = ?", (body["priority"], bug_id))
        conn.commit()
    finally:
        # Close on every path, including the 404/403 exits and failing statements.
        conn.close()
    return {"ok": True}
@app.delete("/api/bug-reports/{bug_id}")
def delete_bug(bug_id: int, user=Depends(require_role("admin"))):
    """Delete a bug report together with its comment log (admin only)."""
    conn = get_db()
    try:
        conn.execute("DELETE FROM bug_comments WHERE bug_id = ?", (bug_id,))
        conn.execute("DELETE FROM bug_reports WHERE id = ?", (bug_id,))
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}


# The POST .../comment decorator used to sit on top of delete_bug, so posting
# a comment ran the delete handler and this function was never routed.
@app.post("/api/bug-reports/{bug_id}/comment")
def add_bug_comment(bug_id: int, body: dict, user=Depends(get_current_user)):
    """Add a manual comment/note to a bug's log.

    Raises:
        HTTPException: 400 when ``content`` is missing or blank; 404 when
            the bug does not exist.
    """
    # `or ""` also covers an explicit JSON null.
    content = (body.get("content") or "").strip()
    if not content:
        raise HTTPException(400, "请输入内容")
    conn = get_db()
    try:
        bug = conn.execute("SELECT id FROM bug_reports WHERE id = ?", (bug_id,)).fetchone()
        if not bug:
            raise HTTPException(404, "Bug not found")
        conn.execute(
            "INSERT INTO bug_comments (bug_id, user_id, action, content) VALUES (?, ?, ?, ?)",
            (bug_id, user.get("id"), "备注", content),
        )
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
@app.post("/api/symptom-search")
def symptom_search(body: dict, user=Depends(get_current_user)):
    """Search recipes by symptom and optionally report a missing recipe.

    A recipe is an *exact* hit when the query is a substring of its name,
    and a *related* hit when any single character of the query occurs in
    the name. At most 20 related hits are returned. With a truthy
    ``report_missing`` in the body, admins and senior editors are notified
    and the query is written to ``search_log``.
    """
    query = body.get("query", "").strip()
    if not query:
        return {"recipes": [], "exact": False}

    conn = get_db()
    # Only recipe names are searched.
    rows = conn.execute(
        "SELECT id, name, note, owner_id, version, en_name, volume FROM recipes ORDER BY id"
    ).fetchall()

    exact, related = [], []
    for row in rows:
        recipe_name = row["name"]
        if query in recipe_name:
            bucket = exact
        elif any(ch in recipe_name for ch in query):
            bucket = related
        else:
            continue
        bucket.append(_recipe_to_dict(conn, row))

    # If user reports no match, notify editors
    if body.get("report_missing"):
        who = user.get("display_name") or user.get("username") or "用户"
        title = "🔍 用户需求:" + query
        message = f"{who} 搜索了「{query}」,没有找到满意的配方,请考虑添加。"
        for role in ("admin", "senior_editor"):
            conn.execute(
                "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                (role, title, message),
            )
        # NOTE(review): the search_log insert and commit are assumed to belong
        # to this branch (matched_count is hard-coded to 0) — confirm against
        # the original indentation.
        conn.execute(
            "INSERT INTO search_log (user_id, query, matched_count) VALUES (?, ?, ?)",
            (user.get("id"), query, 0),
        )
        conn.commit()

    conn.close()
    return {"exact": exact, "related": related[:20]}
# ── Register ────────────────────────────────────────────
@app.post("/api/register", status_code=201)
def register(body: dict):
    """Create a ``viewer`` account and return its login token.

    Username (min. 2 chars) and password (min. 4 chars) are both stripped
    of surrounding whitespace first. Usernames are unique ignoring case.

    Raises:
        HTTPException: 400 for a too-short username/password, a taken
            username, or any error while inserting the account.
    """
    username = body.get("username", "").strip()
    password = body.get("password", "").strip()
    if len(username) < 2:
        raise HTTPException(400, "用户名至少2个字符")
    if len(password) < 4:
        raise HTTPException(400, "密码至少4位")

    conn = get_db()
    # Case-insensitive uniqueness check
    clash = conn.execute(
        "SELECT id FROM users WHERE LOWER(username) = LOWER(?)", (username,)
    ).fetchone()
    if clash:
        conn.close()
        raise HTTPException(400, "用户名已被占用")

    token = _secrets.token_hex(24)
    try:
        conn.execute(
            "INSERT INTO users (username, token, role, display_name, password) VALUES (?, ?, ?, ?, ?)",
            (username, token, "viewer", username, hash_password(password)),
        )
        created = conn.execute("SELECT id FROM users WHERE username = ?", (username,)).fetchone()
        new_id = created["id"] if created else None
        log_audit(conn, new_id, "register", "user", username, username, None)
        conn.commit()
    except Exception:
        # Any failure here is reported to the client as "username taken".
        conn.close()
        raise HTTPException(400, "用户名已被占用")
    conn.close()
    return {"token": token}
# ── Login ───────────────────────────────────────────────
@app.post("/api/login")
def login(body: dict):
    """Authenticate by username (case-insensitive) and password.

    Returns the user's token, display name and role. A legacy plaintext
    password is re-stored as a hash after a successful login.

    Raises:
        HTTPException: 400 when username or password is blank; 401 when
            the user is unknown, has no password set, or the password is wrong.
    """
    username = body.get("username", "").strip()
    password = body.get("password", "").strip()
    if not (username and password):
        raise HTTPException(400, "请输入用户名和密码")

    conn = get_db()
    account = conn.execute(
        "SELECT id, token, password, display_name, role, username FROM users WHERE LOWER(username) = LOWER(?)",
        (username,),
    ).fetchone()

    failure = None
    if not account:
        failure = "用户名不存在"
    elif not account["password"]:
        failure = "该账号未设置密码,请使用链接登录后设置密码"
    elif not verify_password(password, account["password"]):
        failure = "密码错误"
    if failure is not None:
        conn.close()
        raise HTTPException(401, failure)

    # Auto-upgrade legacy plaintext password to hashed
    _upgrade_password_if_needed(conn, account["id"], password, account["password"])
    conn.close()
    return {
        "token": account["token"],
        "display_name": account["display_name"],
        "role": account["role"],
    }
@app.put("/api/me")
def update_me(body: dict, user=Depends(get_current_user)):
    """Update the caller's display name, username and/or password.

    Body keys (all optional): ``display_name``, ``username``, ``password``
    and ``old_password``. When the account already has a password,
    ``old_password`` must match it before the password is changed. Nothing
    is committed unless every requested change validates.

    Raises:
        HTTPException: 403 for anonymous callers; 400 for a blank display
            name, a too-short or already-taken username, a too-short new
            password, or a wrong current password.
    """
    if not user["id"]:
        raise HTTPException(403, "请先登录")
    conn = get_db()
    try:
        # Update display_name
        if "display_name" in body:
            dn = body["display_name"].strip()
            if not dn:
                raise HTTPException(400, "昵称不能为空")
            conn.execute("UPDATE users SET display_name = ? WHERE id = ?", (dn, user["id"]))

        # Update username
        # NOTE(review): unlike PUT /api/me/username, this path neither checks
        # nor sets username_changed — confirm whether the once-only rule
        # should apply here too.
        if "username" in body:
            un = body["username"].strip()
            if not un or len(un) < 2:
                raise HTTPException(400, "用户名至少2个字符")
            # Case-insensitive, matching register/login/change_username, so
            # "Alice" and "alice" cannot coexist.
            existing = conn.execute(
                "SELECT id FROM users WHERE LOWER(username) = LOWER(?) AND id != ?",
                (un, user["id"]),
            ).fetchone()
            if existing:
                raise HTTPException(400, "用户名已被占用")
            conn.execute("UPDATE users SET username = ? WHERE id = ?", (un, user["id"]))

        # Update password (requires old password verification)
        if "password" in body:
            pw = body["password"].strip()
            if pw and len(pw) < 4:
                raise HTTPException(400, "新密码至少4位")
            old_pw = body.get("old_password", "").strip()
            current_pw = user.get("password") or ""
            if current_pw and not verify_password(old_pw, current_pw):
                raise HTTPException(400, "当前密码不正确")
            if pw:
                conn.execute(
                    "UPDATE users SET password = ? WHERE id = ?",
                    (hash_password(pw), user["id"]),
                )
        conn.commit()
    finally:
        # Closing without commit discards earlier statements when validation fails.
        conn.close()
    return {"ok": True}
@app.put("/api/me/password")
def set_password(body: dict, user=Depends(get_current_user)):
    """Legacy endpoint, kept for compatibility.

    Stores a hash of ``body["password"]`` (stripped, min. 4 chars) for the
    caller.

    Raises:
        HTTPException: 403 for anonymous callers; 400 for a too-short password.
    """
    # NOTE(review): unlike PUT /api/me, this does not verify the current
    # password before overwriting it — confirm that is acceptable.
    if not user["id"]:
        raise HTTPException(403, "请先登录")
    new_password = body.get("password", "").strip()
    if len(new_password) < 4:
        raise HTTPException(400, "密码至少4位")
    conn = get_db()
    hashed = hash_password(new_password)
    conn.execute("UPDATE users SET password = ? WHERE id = ?", (hashed, user["id"]))
    conn.commit()
    conn.close()
    return {"ok": True}
@app.put("/api/me/username")
def change_username(body: dict, user=Depends(get_current_user)):
    """Rename the caller once; display_name is set to the new username too.

    Raises:
        HTTPException: 403 for anonymous callers; 400 when the username
            was already changed once, is shorter than 2 characters, or is
            taken (ignoring case).
    """
    if not user["id"]:
        raise HTTPException(403, "请先登录")
    uid = user["id"]
    conn = get_db()

    def fail(message):
        # Close the connection, then reject with HTTP 400.
        conn.close()
        raise HTTPException(400, message)

    flags = conn.execute("SELECT username_changed FROM users WHERE id = ?", (uid,)).fetchone()
    if flags and flags["username_changed"]:
        fail("用户名只能修改一次")

    new_name = body.get("username", "").strip()
    if len(new_name) < 2:
        fail("用户名至少2个字符")

    taken = conn.execute(
        "SELECT id FROM users WHERE LOWER(username) = LOWER(?) AND id != ?",
        (new_name, uid),
    ).fetchone()
    if taken:
        fail("用户名已被占用")

    conn.execute(
        "UPDATE users SET username = ?, display_name = ?, username_changed = 1 WHERE id = ?",
        (new_name, new_name, uid),
    )
    conn.commit()
    conn.close()
    return {"ok": True, "username": new_name}
# ── Business Verification ──────────────────────────────
@app.post("/api/business-apply", status_code=201)
def business_apply(body: dict, user=Depends(get_current_user)):
    """Submit a business-verification application for the caller.

    Body keys: ``business_name`` (required) and ``document`` (optional
    base64 image, at most 2,000,000 characters). Admins are notified and
    the application is written to the audit log.

    Raises:
        HTTPException: 403 for anonymous callers; 400 when the latest
            application is still pending, the user is already verified,
            the name is blank, or the document is too large.
    """
    if not user["id"]:
        raise HTTPException(403, "请先登录")
    uid = user["id"]
    conn = get_db()

    def reject(message):
        # Close the connection, then reject with HTTP 400.
        conn.close()
        raise HTTPException(400, message)

    # Check if already has pending application
    latest = conn.execute(
        "SELECT id, status FROM business_applications WHERE user_id = ? ORDER BY id DESC LIMIT 1",
        (uid,),
    ).fetchone()
    if latest and latest["status"] == "pending":
        reject("已有待审核的申请")
    if user.get("business_verified"):
        reject("已是认证商业用户")

    business_name = body.get("business_name", "").strip()
    document = body.get("document", "")  # base64 image
    if not business_name:
        reject("请填写商户名称")
    if document and len(document) > 2000000:
        reject("文件太大请压缩到1.5MB以内")

    conn.execute(
        "INSERT INTO business_applications (user_id, business_name, document) VALUES (?, ?, ?)",
        (uid, business_name, document),
    )
    who = user.get("display_name") or user.get("username")
    conn.execute(
        "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
        ("admin", "🏢 商业认证申请", f"{who} 申请商业用户认证,商户名:{business_name}"),
    )
    audit_detail = json.dumps({"business_name": business_name})
    log_audit(conn, uid, "business_apply", "user", uid, who, audit_detail)
    conn.commit()
    conn.close()
    return {"ok": True}
@app.get("/api/my-business-application")
def get_my_business_application(user=Depends(get_current_user)):
    """Return the caller's most recent business application.

    Anonymous callers and users without any application get
    ``{"status": None}``.
    """
    nothing = {"status": None}
    if not user["id"]:
        return nothing
    conn = get_db()
    latest = conn.execute(
        "SELECT business_name, document, status, reject_reason, created_at FROM business_applications WHERE user_id = ? ORDER BY id DESC LIMIT 1",
        (user["id"],),
    ).fetchone()
    conn.close()
    return dict(latest) if latest else nothing
@app.get("/api/business-applications")
def list_business_applications(user=Depends(require_role("admin"))):
    """List every business application, newest first, with the applicant's names (admin only)."""
    sql = (
        "SELECT a.id, a.user_id, a.business_name, a.document, a.status, a.reject_reason, a.created_at, "
        "u.display_name, u.username FROM business_applications a "
        "LEFT JOIN users u ON a.user_id = u.id ORDER BY a.id DESC"
    )
    conn = get_db()
    applications = conn.execute(sql).fetchall()
    conn.close()
    return [dict(row) for row in applications]
@app.post("/api/business-applications/{app_id}/approve")
def approve_business(app_id: int, user=Depends(require_role("admin"))):
    """Approve a business application (admin only).

    Marks the application approved, sets ``business_verified`` on the
    applicant, notifies them and records the decision in the audit log.

    Raises:
        HTTPException: 404 when the application does not exist.
    """
    conn = get_db()
    # Local is named `application` so it does not shadow the module-level `app`.
    application = conn.execute(
        "SELECT user_id, business_name FROM business_applications WHERE id = ?", (app_id,)
    ).fetchone()
    if not application:
        conn.close()
        raise HTTPException(404, "申请不存在")

    applicant_id = application["user_id"]
    conn.execute(
        "UPDATE business_applications SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?",
        (app_id,),
    )
    conn.execute("UPDATE users SET business_verified = 1 WHERE id = ?", (applicant_id,))

    target = conn.execute(
        "SELECT role, display_name, username FROM users WHERE id = ?", (applicant_id,)
    ).fetchone()
    if target:
        target_name = target["display_name"] or target["username"]
        conn.execute(
            "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
            (target["role"], "🎉 商业认证通过", "恭喜!你的商业用户认证已通过,现在可以使用项目核算等商业功能。", applicant_id),
        )
    else:
        target_name = "unknown"

    log_audit(
        conn, user["id"], "approve_business", "user", applicant_id, target_name,
        json.dumps({"business_name": application["business_name"]}),
    )
    conn.commit()
    conn.close()
    return {"ok": True}
@app.post("/api/business-applications/{app_id}/reject")
def reject_business(app_id: int, body: dict = None, user=Depends(require_role("admin"))):
    """Reject a business application, optionally with a reason (admin only).

    The applicant is notified (the reason is included when given) and the
    decision is written to the audit log.

    Raises:
        HTTPException: 404 when the application does not exist.
    """
    conn = get_db()
    # Local is named `application` so it does not shadow the module-level `app`.
    application = conn.execute(
        "SELECT user_id FROM business_applications WHERE id = ?", (app_id,)
    ).fetchone()
    if not application:
        conn.close()
        raise HTTPException(404, "申请不存在")

    applicant_id = application["user_id"]
    payload = body or {}
    reason = payload.get("reason", "").strip()
    conn.execute(
        "UPDATE business_applications SET status = 'rejected', reviewed_at = datetime('now'), reject_reason = ? WHERE id = ?",
        (reason, app_id),
    )

    target = conn.execute(
        "SELECT role, display_name, username FROM users WHERE id = ?", (applicant_id,)
    ).fetchone()
    if target:
        target_name = target["display_name"] or target["username"]
        parts = ["你的商业用户认证申请未通过。"]
        if reason:
            parts.append("\n\n原因:" + reason)
        parts.append("\n\n你可以修改后重新申请。")
        conn.execute(
            "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
            (target["role"], "商业认证未通过", "".join(parts), applicant_id),
        )
    else:
        target_name = "unknown"

    log_audit(
        conn, user["id"], "reject_business", "user", applicant_id, target_name,
        json.dumps({"reason": reason}),
    )
    conn.commit()
    conn.close()
    return {"ok": True}
# ── Translation Suggestions ────────────────────────────
@app.post("/api/translation-suggest", status_code=201)
def suggest_translation(body: dict, user=Depends(get_current_user)):
    """Record a suggested English name for a recipe.

    Body keys: ``recipe_name`` and ``suggested_en`` (both required,
    non-blank) and ``recipe_id`` (optional). Admins are notified unless the
    suggestion comes from an admin.

    Raises:
        HTTPException: 403 for anonymous callers; 400 when either required
            field is blank.
    """
    if not user["id"]:
        raise HTTPException(403, "请先登录")
    # `or ""` also covers an explicit JSON null.
    recipe_name = (body.get("recipe_name") or "").strip()
    suggested_en = (body.get("suggested_en") or "").strip()
    recipe_id = body.get("recipe_id")
    if not recipe_name or not suggested_en:
        raise HTTPException(400, "请填写翻译")

    who = user.get("display_name") or user.get("username")
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO translation_suggestions (recipe_id, recipe_name, suggested_en, user_id) VALUES (?, ?, ?, ?)",
            (recipe_id, recipe_name, suggested_en, user["id"]),
        )
        if user["role"] != "admin":
            conn.execute(
                "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                # Closing 」 added: the suggested text was left unbalanced.
                ("admin", "📝 翻译建议", f"{who} 建议将「{recipe_name}」翻译为「{suggested_en}」"),
            )
        conn.commit()
    finally:
        # Close on every path, including a failing INSERT.
        conn.close()
    return {"ok": True}
@app.get("/api/translation-suggestions")
def list_translation_suggestions(user=Depends(require_role("admin"))):
    """List pending translation suggestions, newest first, with the suggester's names (admin only)."""
    sql = (
        "SELECT s.id, s.recipe_id, s.recipe_name, s.suggested_en, s.status, s.created_at, "
        "u.display_name, u.username FROM translation_suggestions s "
        "LEFT JOIN users u ON s.user_id = u.id WHERE s.status = 'pending' ORDER BY s.id DESC"
    )
    conn = get_db()
    pending = conn.execute(sql).fetchall()
    conn.close()
    return [dict(row) for row in pending]
@app.post("/api/translation-suggestions/{sid}/approve")
def approve_translation(sid: int, user=Depends(require_role("admin"))):
    """Mark a translation suggestion as approved (admin only).

    Only the suggestion's status is updated here; the recipe name and the
    suggested English name are echoed back in the response.

    Raises:
        HTTPException: 404 when the suggestion does not exist.
    """
    conn = get_db()
    suggestion = conn.execute(
        "SELECT recipe_name, suggested_en FROM translation_suggestions WHERE id = ?", (sid,)
    ).fetchone()
    if not suggestion:
        conn.close()
        raise HTTPException(404)
    conn.execute("UPDATE translation_suggestions SET status = 'approved' WHERE id = ?", (sid,))
    conn.commit()
    conn.close()
    response = {"ok": True}
    response["recipe_name"] = suggestion["recipe_name"]
    response["suggested_en"] = suggestion["suggested_en"]
    return response
@app.post("/api/translation-suggestions/{sid}/reject")
def reject_translation(sid: int, user=Depends(require_role("admin"))):
    """Mark a translation suggestion as rejected (admin only).

    No existence check is made; an unknown ``sid`` still yields ``{"ok": True}``.
    """
    conn = get_db()
    params = (sid,)
    conn.execute("UPDATE translation_suggestions SET status = 'rejected' WHERE id = ?", params)
    conn.commit()
    conn.close()
    return {"ok": True}
@app.post("/api/business-grant/{user_id}")
def grant_business(user_id: int, user=Depends(require_role("admin"))):
    """Set ``business_verified`` for a user directly (admin only).

    The user is notified when the row exists; the action is always written
    to the audit log.
    """
    conn = get_db()
    conn.execute("UPDATE users SET business_verified = 1 WHERE id = ?", (user_id,))
    target = conn.execute(
        "SELECT role, display_name, username FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    if target:
        target_name = target["display_name"] or target["username"]
        conn.execute(
            "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
            (target["role"], "🎉 商业认证已开通", "管理员已为你开通商业用户认证,现在可以使用商业核算等功能。", user_id),
        )
    else:
        target_name = "unknown"
    log_audit(conn, user["id"], "grant_business", "user", user_id, target_name, None)
    conn.commit()
    conn.close()
    return {"ok": True}
@app.post("/api/business-revoke/{user_id}")
def revoke_business(user_id: int, body: dict = None, user=Depends(require_role("admin"))):
    """Clear ``business_verified`` for a user, optionally with a reason (admin only).

    The user is notified when the row exists; the action is written to the
    audit log, with the reason as detail when one was given.
    """
    conn = get_db()
    conn.execute("UPDATE users SET business_verified = 0 WHERE id = ?", (user_id,))
    payload = body or {}
    reason = payload.get("reason", "").strip()

    target = conn.execute(
        "SELECT role, display_name, username FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    if target:
        parts = ["你的商业用户资格已被取消。"]
        if reason:
            parts.append("\n\n原因:" + reason)
        parts.append("\n\n如有疑问请联系管理员,也可重新申请认证。")
        conn.execute(
            "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
            (target["role"], "商业资格已取消", "".join(parts), user_id),
        )
        target_name = target["display_name"] or target["username"]
    else:
        target_name = "unknown"

    audit_detail = json.dumps({"reason": reason}) if reason else None
    log_audit(conn, user["id"], "revoke_business", "user", user_id, target_name, audit_detail)
    conn.commit()
    conn.close()
    return {"ok": True}
@app.post("/api/impersonate")
def impersonate(body: dict, user=Depends(require_role("admin"))):
    """Return another user's token (admin only).

    Raises:
        HTTPException: 400 when ``user_id`` is missing or falsy; 404 when
            no such user exists.
    """
    target_id = body.get("user_id")
    if not target_id:
        raise HTTPException(400, "user_id required")
    conn = get_db()
    row = conn.execute("SELECT token FROM users WHERE id = ?", (target_id,)).fetchone()
    conn.close()
    if row:
        return {"token": row["token"]}
    raise HTTPException(404, "User not found")
# ── Oils ────────────────────────────────────────────────
@app.get("/api/oils")
def list_oils():
    """Return every oil (price, drop count, names, unit, active flag), sorted by name."""
    sql = "SELECT name, bottle_price, drop_count, retail_price, is_active, en_name, unit FROM oils ORDER BY name"
    conn = get_db()
    oils = conn.execute(sql).fetchall()
    conn.close()
    return [dict(row) for row in oils]
@app.post("/api/oils", status_code=201)
def upsert_oil(oil: OilIn, user=Depends(require_role("admin", "senior_editor"))):
    """Create or update an oil and, when card fields are given, its knowledge card.

    ``oils``: price fields are always overwritten; ``en_name``,
    ``is_active`` and ``unit`` keep their stored value when omitted.

    ``oil_cards``: touched only when at least one ``card_*`` field is not
    ``None``. A field left as ``None`` keeps its stored value on update
    (and is stored as ``''`` on first insert); an explicit ``''`` clears it.

    Both writes and the audit entry are committed together.
    """
    en_name = title_case(oil.en_name) if oil.en_name else oil.en_name
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO oils (name, bottle_price, drop_count, retail_price, en_name, is_active, unit) VALUES (?, ?, ?, ?, ?, ?, ?) "
            "ON CONFLICT(name) DO UPDATE SET bottle_price=excluded.bottle_price, drop_count=excluded.drop_count, "
            "retail_price=excluded.retail_price, en_name=COALESCE(excluded.en_name, oils.en_name), "
            "is_active=COALESCE(excluded.is_active, oils.is_active), unit=COALESCE(excluded.unit, oils.unit)",
            (oil.name, oil.bottle_price, oil.drop_count, oil.retail_price, en_name, oil.is_active, oil.unit),
        )

        # Upsert oil_cards if any card field provided
        card_fields = (oil.card_emoji, oil.card_effects, oil.card_usage, oil.card_method, oil.card_caution)
        if any(value is not None for value in card_fields):
            # Column order: emoji, en, effects, usage, method, caution.
            # None (not '') marks "not supplied", so COALESCE can fall back.
            card_values = (
                oil.card_emoji,
                en_name or None,
                oil.card_effects,
                oil.card_usage,
                oil.card_method,
                oil.card_caution,
            )
            conn.execute(
                "INSERT INTO oil_cards (name, emoji, en, effects, usage, method, caution) "
                "VALUES (?, COALESCE(?, ''), COALESCE(?, ''), COALESCE(?, ''), "
                "COALESCE(?, ''), COALESCE(?, ''), COALESCE(?, '')) "
                "ON CONFLICT(name) DO UPDATE SET "
                # The values are bound a second time here rather than read
                # from excluded.*: excluded.* already holds the '' defaults
                # of the INSERT branch, which would blank omitted fields.
                "emoji=COALESCE(?, oil_cards.emoji), "
                "en=COALESCE(?, oil_cards.en), "
                "effects=COALESCE(?, oil_cards.effects), "
                "usage=COALESCE(?, oil_cards.usage), "
                "method=COALESCE(?, oil_cards.method), "
                "caution=COALESCE(?, oil_cards.caution)",
                (oil.name, *card_values, *card_values),
            )

        log_audit(
            conn, user["id"], "upsert_oil", "oil", oil.name, oil.name,
            json.dumps({"bottle_price": oil.bottle_price, "drop_count": oil.drop_count}),
        )
        conn.commit()
    finally:
        # Close on every path, including a failing statement.
        conn.close()
    return {"ok": True}
@app.delete("/api/oils/{name}")
def delete_oil(name: str, user=Depends(require_role("admin", "senior_editor"))):
    """Delete an oil by name and store its previous values in the audit log.

    The audit detail is a JSON snapshot of the deleted row, or ``{}`` when no
    row with that name existed.
    """
    db = get_db()
    existing = db.execute(
        "SELECT name, bottle_price, drop_count, retail_price, is_active FROM oils WHERE name = ?", (name,)
    ).fetchone()
    if existing:
        snapshot = dict(existing)
    else:
        snapshot = {}
    db.execute("DELETE FROM oils WHERE name = ?", (name,))
    snapshot_json = json.dumps(snapshot, ensure_ascii=False)
    log_audit(db, user["id"], "delete_oil", "oil", name, name, snapshot_json)
    db.commit()
    db.close()
    return {"ok": True}
# ── Oil Cards ──────────────────────────────────────────
@app.get("/api/oil-cards")
def list_oil_cards():
    """Return every oil knowledge card as a dict, ordered by name."""
    db = get_db()
    cursor = db.execute("SELECT name, emoji, en, effects, usage, method, caution FROM oil_cards ORDER BY name")
    records = cursor.fetchall()
    db.close()
    cards = []
    for record in records:
        cards.append(dict(record))
    return cards
@app.get("/api/oil-cards/{name}")
def get_oil_card(name: str):
    """Return the knowledge card stored for ``name``; 404 when there is none."""
    db = get_db()
    card = db.execute(
        "SELECT name, emoji, en, effects, usage, method, caution FROM oil_cards WHERE name = ?", (name,)
    ).fetchone()
    db.close()
    if card:
        return dict(card)
    raise HTTPException(404, "Oil card not found")
# ── Recipes ─────────────────────────────────────────────
def _recipe_to_dict(conn, row):
    """Expand a recipes row into the dict shape returned by the recipe API.

    Ingredients, tags and the owner's display name are loaded with extra
    queries on ``conn``. The ``en_name``, ``version`` and ``volume`` columns
    are optional in ``row``; when absent they default to "", 1 and "".
    """
    recipe_id = row["id"]
    available = row.keys()

    def optional(column, fallback):
        # Older callers may select fewer columns, so fall back when missing.
        return row[column] if column in available else fallback

    ingredient_rows = conn.execute(
        "SELECT oil_name, drops FROM recipe_ingredients WHERE recipe_id = ?", (recipe_id,)
    ).fetchall()
    tag_rows = conn.execute(
        "SELECT tag_name FROM recipe_tags WHERE recipe_id = ?", (recipe_id,)
    ).fetchall()

    owner_row = None
    if row["owner_id"]:
        owner_row = conn.execute(
            "SELECT display_name, username FROM users WHERE id = ?", (row["owner_id"],)
        ).fetchone()
    owner_name = None
    if owner_row:
        owner_name = owner_row["display_name"] or owner_row["username"]

    ingredients = []
    for item in ingredient_rows:
        ingredients.append({"oil_name": item["oil_name"], "drops": item["drops"]})

    return {
        "id": recipe_id,
        "name": row["name"],
        "en_name": optional("en_name", ""),
        "note": row["note"],
        "owner_id": row["owner_id"],
        "owner_name": owner_name,
        "version": optional("version", 1),
        "ingredients": ingredients,
        "tags": [tag["tag_name"] for tag in tag_rows],
        "volume": optional("volume", ""),
    }
@app.get("/api/recipes")
def list_recipes(user=Depends(get_current_user)):
    """List the recipes visible to the caller, ordered by id.

    An admin gets every recipe. Anyone else gets the recipes owned by the
    first admin account (id 1 is assumed when no admin row exists) plus,
    when the caller has an id, the recipes they own themselves.
    """
    db = get_db()
    if user["role"] == "admin":
        sql = "SELECT id, name, note, owner_id, version, en_name, volume FROM recipes ORDER BY id"
        params = ()
    else:
        admin_row = db.execute("SELECT id FROM users WHERE role = 'admin' LIMIT 1").fetchone()
        admin_id = admin_row["id"] if admin_row else 1
        viewer_id = user.get("id")
        if viewer_id:
            sql = "SELECT id, name, note, owner_id, version, en_name, volume FROM recipes WHERE owner_id = ? OR owner_id = ? ORDER BY id"
            params = (admin_id, viewer_id)
        else:
            sql = "SELECT id, name, note, owner_id, version, en_name, volume FROM recipes WHERE owner_id = ? ORDER BY id"
            params = (admin_id,)
    records = db.execute(sql, params).fetchall()
    recipes = []
    for record in records:
        recipes.append(_recipe_to_dict(db, record))
    db.close()
    return recipes
@app.get("/api/recipes/{recipe_id}")
def get_recipe(recipe_id: int):
    """Return one recipe in API dict form; 404 when the id is unknown."""
    db = get_db()
    record = db.execute(
        "SELECT id, name, note, owner_id, version, en_name, volume FROM recipes WHERE id = ?", (recipe_id,)
    ).fetchone()
    if record:
        recipe = _recipe_to_dict(db, record)
        db.close()
        return recipe
    db.close()
    raise HTTPException(404, "Recipe not found")
@app.post("/api/recipes", status_code=201)
def create_recipe(recipe: RecipeIn, user=Depends(get_current_user)):
    """Create a recipe with its ingredients and tags and return its new id.

    A senior editor's recipe is stored under the first admin account when one
    exists; everyone else owns what they create. The English name is the
    title-cased ``en_name`` when given, otherwise ``auto_translate`` of the
    name. Admin and senior-editor adds are audited; non-admin adds also
    create an admin notification.
    """
    if not user.get("id"):
        raise HTTPException(401, "请先登录")
    db = get_db()
    cur = db.cursor()
    role = user["role"]

    # Senior editors publish straight to the shared library via the admin owner.
    owner_id = user["id"]
    if role == "senior_editor":
        admin_row = cur.execute("SELECT id FROM users WHERE role = 'admin' LIMIT 1").fetchone()
        if admin_row:
            owner_id = admin_row["id"]

    if recipe.en_name:
        en_name = title_case(recipe.en_name)
    else:
        en_name = auto_translate(recipe.name)
    cur.execute("INSERT INTO recipes (name, note, owner_id, en_name) VALUES (?, ?, ?, ?)",
                (recipe.name, recipe.note, owner_id, en_name))
    new_id = cur.lastrowid

    for item in recipe.ingredients:
        cur.execute(
            "INSERT INTO recipe_ingredients (recipe_id, oil_name, drops) VALUES (?, ?, ?)",
            (new_id, item.oil_name, item.drops),
        )
    for label in recipe.tags:
        cur.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (label,))
        cur.execute("INSERT OR IGNORE INTO recipe_tags (recipe_id, tag_name) VALUES (?, ?)", (new_id, label))

    # Direct adds by admin/senior editor are logged now; other users' recipes
    # are logged when they get adopted.
    if role in ("admin", "senior_editor"):
        log_audit(db, user["id"], "share_recipe", "recipe", new_id, recipe.name)

    who = user.get("display_name") or user["username"]
    notice = None
    if role == "senior_editor":
        # Informational only: the recipe is already in the public library.
        notice = ("admin", "📋 新配方已添加",
                  f"{who} 将配方「{recipe.name}」添加到了公共配方库。\n[recipe_id:{new_id}]")
    elif role != "admin":
        # Everyone else needs an admin review.
        notice = ("admin", "📝 新配方待审核",
                  f"{who} 共享了配方「{recipe.name}」,请到管理配方查看。\n[recipe_id:{new_id}]")
    if notice is not None:
        db.execute("INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)", notice)

    db.commit()
    db.close()
    return {"id": new_id}
def _check_recipe_permission(conn, recipe_id, user):
    """Return the recipe's (owner_id, name) row if ``user`` may modify it.

    Raises 404 when the recipe does not exist and 403 unless the user's role
    is admin, senior_editor or editor.
    """
    recipe = conn.execute("SELECT owner_id, name FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
    if not recipe:
        raise HTTPException(404, "Recipe not found")
    allowed_roles = ("admin", "senior_editor", "editor")
    if user["role"] not in allowed_roles:
        raise HTTPException(403, "权限不足")
    return recipe
@app.put("/api/recipes/{recipe_id}")
def update_recipe(recipe_id: int, update: RecipeUpdate, user=Depends(get_current_user)):
    """Apply a partial update to a recipe and bump its version.

    Only fields that are not None on ``update`` are written. Changing the
    name without supplying ``en_name`` re-derives the English name with
    ``auto_translate``. Supplying ``version`` enables an optimistic-lock
    check against the stored version.

    When a non-admin edits a recipe that carries (or is given) the
    "再次审核" tag, admins receive a notification with a before/after diff.

    Raises 401 when not logged in, 404/403 from the permission check, and 409
    on a version mismatch. Returns ``{"ok": True}``.
    """
    if not user.get("id"):
        raise HTTPException(401, "请先登录")
    conn = get_db()
    try:
        c = conn.cursor()
        # May raise 404/403; the surrounding finally still closes the connection.
        _check_recipe_permission(conn, recipe_id, user)
        # Optimistic locking: check version if provided
        if update.version is not None:
            current = c.execute("SELECT version FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
            if current and current["version"] and current["version"] != update.version:
                raise HTTPException(409, "此配方已被其他人修改,请刷新后重试")
        # Snapshot before state for re-review diff notification
        before_row = c.execute("SELECT name, note, en_name, volume FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
        before_ings = [dict(r) for r in c.execute("SELECT oil_name, drops FROM recipe_ingredients WHERE recipe_id = ?", (recipe_id,)).fetchall()]
        before_tags = set(r["tag_name"] for r in c.execute("SELECT tag_name FROM recipe_tags WHERE recipe_id = ?", (recipe_id,)).fetchall())
        if update.name is not None:
            c.execute("UPDATE recipes SET name = ? WHERE id = ?", (update.name, recipe_id))
            # Re-translate en_name if name changed and no explicit en_name provided
            if update.en_name is None:
                c.execute("UPDATE recipes SET en_name = ? WHERE id = ?", (auto_translate(update.name), recipe_id))
        if update.note is not None:
            c.execute("UPDATE recipes SET note = ? WHERE id = ?", (update.note, recipe_id))
        if update.en_name is not None:
            c.execute("UPDATE recipes SET en_name = ? WHERE id = ?", (title_case(update.en_name), recipe_id))
        if update.volume is not None:
            c.execute("UPDATE recipes SET volume = ? WHERE id = ?", (update.volume, recipe_id))
        if update.ingredients is not None:
            # Ingredients are replaced wholesale rather than diffed.
            c.execute("DELETE FROM recipe_ingredients WHERE recipe_id = ?", (recipe_id,))
            for ing in update.ingredients:
                c.execute(
                    "INSERT INTO recipe_ingredients (recipe_id, oil_name, drops) VALUES (?, ?, ?)",
                    (recipe_id, ing.oil_name, ing.drops),
                )
        if update.tags is not None:
            c.execute("DELETE FROM recipe_tags WHERE recipe_id = ?", (recipe_id,))
            for tag in update.tags:
                c.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag,))
                c.execute(
                    "INSERT OR IGNORE INTO recipe_tags (recipe_id, tag_name) VALUES (?, ?)",
                    (recipe_id, tag),
                )
        c.execute("UPDATE recipes SET updated_by = ?, version = COALESCE(version, 1) + 1 WHERE id = ?", (user["id"], recipe_id))
        # Get recipe name for log
        rname = c.execute("SELECT name FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
        changed = []
        if update.name is not None:
            changed.append("名称")
        if update.ingredients is not None:
            changed.append("成分")
        if update.tags is not None:
            changed.append("标签")
        if update.note is not None:
            changed.append("备注")
        if update.en_name is not None:
            changed.append("英文名")
        if update.volume is not None:
            # Volume edits were previously invisible to the audit log and to
            # the re-review notification.
            changed.append("容量")
        log_audit(conn, user["id"], "update_recipe", "recipe", recipe_id,
                  rname["name"] if rname else update.name,
                  json.dumps({"changed": "、".join(changed)}, ensure_ascii=False) if changed else None)
        # Notify admin when non-admin user edits a recipe tagged 再次审核
        after_tags = before_tags if update.tags is None else set(update.tags)
        needs_review = "再次审核" in (before_tags | after_tags)
        if user.get("role") != "admin" and needs_review and changed:
            after_row = c.execute("SELECT name, note, en_name, volume FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
            after_ings = [dict(r) for r in c.execute("SELECT oil_name, drops FROM recipe_ingredients WHERE recipe_id = ?", (recipe_id,)).fetchall()]
            diff_lines = []

            def _fmt_ings(ings):
                # Separate ingredients so adjacent entries stay readable.
                return "、".join(f"{i['oil_name']} {i['drops']}" for i in ings) or "(空)"

            def _fmt_tags(tags):
                return "、".join(sorted(tags)) or "(空)"

            # Each line reads "label:before → after".
            if update.name is not None and before_row["name"] != after_row["name"]:
                diff_lines.append(f"名称:{before_row['name']} → {after_row['name']}")
            if update.ingredients is not None and before_ings != after_ings:
                diff_lines.append(f"成分:{_fmt_ings(before_ings)} → {_fmt_ings(after_ings)}")
            if update.tags is not None and before_tags != after_tags:
                diff_lines.append(f"标签:{_fmt_tags(before_tags)} → {_fmt_tags(after_tags)}")
            if update.note is not None and (before_row["note"] or "") != (after_row["note"] or ""):
                diff_lines.append(f"备注:{before_row['note'] or '(空)'} → {after_row['note'] or '(空)'}")
            if update.en_name is not None and (before_row["en_name"] or "") != (after_row["en_name"] or ""):
                diff_lines.append(f"英文名:{before_row['en_name'] or '(空)'} → {after_row['en_name'] or '(空)'}")
            if update.volume is not None and (before_row["volume"] or "") != (after_row["volume"] or ""):
                diff_lines.append(f"容量:{before_row['volume'] or '(空)'} → {after_row['volume'] or '(空)'}")
            if diff_lines:
                editor = user.get("display_name") or user.get("username") or f"user#{user['id']}"
                title = f"📝 再次审核配方被修改:{after_row['name']}"
                body = f"{editor} 修改了配方「{after_row['name']}」:\n\n" + "\n".join(diff_lines)
                conn.execute(
                    "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                    ("admin", title, body),
                )
        conn.commit()
    finally:
        # Runs on success and on every raised HTTPException alike.
        conn.close()
    return {"ok": True}
@app.delete("/api/recipes/{recipe_id}")
def delete_recipe(recipe_id: int, user=Depends(get_current_user)):
    """Delete a recipe after storing a full snapshot in the audit log.

    The snapshot (recipe fields, ingredients and tags) is what the undo
    endpoint restores from. Ingredient and tag links are removed explicitly,
    matching ``reject_recipe``.

    Raises 401 when not logged in and 404/403 from the permission check.
    Returns ``{"ok": True}``.
    """
    if not user.get("id"):
        raise HTTPException(401, "请先登录")
    conn = get_db()
    try:
        # May raise 404/403; the finally below still closes the connection.
        row = _check_recipe_permission(conn, recipe_id, user)
        # Save full snapshot for undo
        full = conn.execute("SELECT id, name, note, owner_id, version, en_name, volume FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
        snapshot = _recipe_to_dict(conn, full)
        log_audit(conn, user["id"], "delete_recipe", "recipe", recipe_id, row["name"],
                  json.dumps(snapshot, ensure_ascii=False))
        # Remove child rows first so no orphaned links remain if the schema
        # does not cascade deletes.
        conn.execute("DELETE FROM recipe_ingredients WHERE recipe_id = ?", (recipe_id,))
        conn.execute("DELETE FROM recipe_tags WHERE recipe_id = ?", (recipe_id,))
        conn.execute("DELETE FROM recipes WHERE id = ?", (recipe_id,))
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
# ── Adopt (admin takes ownership of editor recipes) ────
@app.post("/api/recipes/{recipe_id}/adopt")
def adopt_recipe(recipe_id: int, user=Depends(require_role("admin"))):
    """Transfer ownership of a recipe to the calling admin.

    Fills in a missing English name via ``auto_translate``, audits the
    adoption with the previous owner's name and notifies that owner. Returns
    early with ``msg: "already owned"`` when the caller already owns it;
    raises 404 when the recipe does not exist.
    """
    db = get_db()
    recipe = db.execute("SELECT id, name, owner_id FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
    if not recipe:
        db.close()
        raise HTTPException(404, "Recipe not found")
    admin_id = user["id"]
    if recipe["owner_id"] == admin_id:
        db.close()
        return {"ok": True, "msg": "already owned"}

    previous = db.execute(
        "SELECT id, role, display_name, username FROM users WHERE id = ?", (recipe["owner_id"],)
    ).fetchone()
    if previous:
        previous_name = previous["display_name"] or previous["username"]
    else:
        previous_name = "unknown"

    # Auto-fill en_name if missing
    stored = db.execute("SELECT en_name FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
    if not stored["en_name"]:
        db.execute("UPDATE recipes SET en_name = ? WHERE id = ?", (auto_translate(recipe["name"]), recipe_id))

    db.execute("UPDATE recipes SET owner_id = ?, updated_by = ? WHERE id = ?", (admin_id, admin_id, recipe_id))
    log_audit(db, admin_id, "adopt_recipe", "recipe", recipe_id, recipe["name"],
              json.dumps({"from_user": previous_name}))

    # Tell the submitter their recipe was approved.
    if previous and previous["id"] != admin_id:
        db.execute(
            "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
            (previous["role"], "🎉 配方已采纳",
             f"你共享的配方「{recipe['name']}」已被采纳到公共配方库!", previous["id"])
        )
    db.commit()
    db.close()
    return {"ok": True}
@app.post("/api/recipes/{recipe_id}/reject")
def reject_recipe(recipe_id: int, body: dict = None, user=Depends(require_role("admin", "senior_editor"))):
    """Reject a shared recipe: notify its owner, delete it and audit the action.

    An optional ``reason`` in the JSON body is included in the notification
    and in the audit detail. Raises 404 when the recipe does not exist.
    """
    db = get_db()
    recipe = db.execute("SELECT id, name, owner_id FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
    if not recipe:
        db.close()
        raise HTTPException(404, "Recipe not found")
    reason = (body or {}).get("reason", "").strip()

    # Notify the submitter, unless the caller is rejecting their own recipe.
    submitter = db.execute(
        "SELECT id, role, display_name, username FROM users WHERE id = ?", (recipe["owner_id"],)
    ).fetchone()
    if submitter and submitter["id"] != user["id"]:
        pieces = [f"你共享的配方「{recipe['name']}」未被采纳。"]
        if reason:
            pieces.append(f"\n原因:{reason}")
        pieces.append("\n你可以修改后重新共享。")
        db.execute(
            "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
            (submitter["role"], "配方未被采纳", "".join(pieces), submitter["id"])
        )

    # Remove the recipe together with its ingredient and tag links.
    for statement in (
        "DELETE FROM recipe_ingredients WHERE recipe_id = ?",
        "DELETE FROM recipe_tags WHERE recipe_id = ?",
        "DELETE FROM recipes WHERE id = ?",
    ):
        db.execute(statement, (recipe_id,))

    if submitter:
        from_name = submitter["display_name"] or submitter["username"]
    else:
        from_name = "unknown"
    log_audit(db, user["id"], "reject_recipe", "recipe", recipe_id, recipe["name"],
              json.dumps({"reason": reason, "from_user": from_name}))
    db.commit()
    db.close()
    return {"ok": True}
@app.post("/api/recipes/{recipe_id}/recommend")
def recommend_recipe(recipe_id: int, body: dict = None, user=Depends(get_current_user)):
    """Let a senior editor (or admin) recommend a recipe to admins.

    Only writes an admin notification; the recipe itself is not changed.
    Raises 403 for other roles and 404 when the recipe does not exist.
    """
    allowed_roles = ("senior_editor", "admin")
    if user["role"] not in allowed_roles:
        raise HTTPException(403, "权限不足")
    db = get_db()
    found = db.execute("SELECT name, owner_id FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
    if not found:
        db.close()
        raise HTTPException(404, "配方不存在")
    who = user.get("display_name") or user.get("username")
    message = f"{who} 审核了配方「{found['name']}」并推荐通过,请最终确认。\n[recipe_id:{recipe_id}]"
    db.execute(
        "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
        ("admin", "👍 配方推荐通过", message)
    )
    db.commit()
    db.close()
    return {"ok": True}
@app.post("/api/recipes/{recipe_id}/assign-review")
def assign_recipe_review(recipe_id: int, body: dict, user=Depends(require_role("admin"))):
    """Ask a specific user to review a recipe by sending them a notification.

    Reads the reviewer's ``user_id`` from the body. Raises 400 when it is
    missing and 404 when either the recipe or the reviewer does not exist.
    """
    reviewer_id = body.get("user_id")
    if not reviewer_id:
        raise HTTPException(400, "请选择审核人")
    db = get_db()
    found = db.execute("SELECT name FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
    if not found:
        db.close()
        raise HTTPException(404, "配方不存在")
    reviewer = db.execute(
        "SELECT role, display_name, username FROM users WHERE id = ?", (reviewer_id,)
    ).fetchone()
    if not reviewer:
        db.close()
        raise HTTPException(404, "用户不存在")
    message = f"管理员指派你审核配方「{found['name']}」,请到管理配方页面查看并反馈意见。\n[recipe_id:{recipe_id}]"
    db.execute(
        "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
        (reviewer["role"], "📋 请审核配方", message, reviewer_id)
    )
    db.commit()
    db.close()
    return {"ok": True}
@app.post("/api/recipes/adopt-batch")
def adopt_batch(body: dict, user=Depends(require_role("admin"))):
    """Adopt several recipes at once and report how many changed owner.

    Ids that do not exist, or that the caller already owns, are skipped.
    Raises 400 when ``ids`` is missing or empty.
    """
    recipe_ids = body.get("ids", [])
    if not recipe_ids:
        raise HTTPException(400, "No recipe ids provided")
    db = get_db()
    admin_id = user["id"]
    adopted = 0
    for recipe_id in recipe_ids:
        found = db.execute("SELECT id, name, owner_id FROM recipes WHERE id = ?", (recipe_id,)).fetchone()
        if not found or found["owner_id"] == admin_id:
            continue
        db.execute("UPDATE recipes SET owner_id = ?, updated_by = ? WHERE id = ?", (admin_id, admin_id, recipe_id))
        log_audit(db, admin_id, "adopt_recipe", "recipe", recipe_id, found["name"])
        adopted += 1
    db.commit()
    db.close()
    return {"ok": True, "adopted": adopted}
# ── Tags ────────────────────────────────────────────────
@app.get("/api/tags")
def list_tags():
    """Return all tag names, sorted by name."""
    db = get_db()
    records = db.execute("SELECT name FROM tags ORDER BY name").fetchall()
    db.close()
    names = []
    for record in records:
        names.append(record["name"])
    return names
@app.post("/api/tags", status_code=201)
def create_tag(body: dict, user=Depends(require_role("admin", "senior_editor", "editor"))):
    """Create a tag (a no-op if it already exists) and audit the request.

    Raises 400 when the trimmed ``name`` is empty.
    """
    tag_name = body.get("name", "").strip()
    if tag_name:
        db = get_db()
        db.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag_name,))
        log_audit(db, user["id"], "create_tag", "tag", tag_name, tag_name, None)
        db.commit()
        db.close()
        return {"ok": True}
    raise HTTPException(400, "Tag name required")
@app.delete("/api/tags/{name}")
def delete_tag(name: str, user=Depends(require_role("admin"))):
    """Delete a tag, detach it from every recipe and audit the deletion."""
    db = get_db()
    # Drop the recipe links first, then the tag itself.
    for statement in (
        "DELETE FROM recipe_tags WHERE tag_name = ?",
        "DELETE FROM tags WHERE name = ?",
    ):
        db.execute(statement, (name,))
    log_audit(db, user["id"], "delete_tag", "tag", name, name)
    db.commit()
    db.close()
    return {"ok": True}
# ── Users (admin only) ─────────────────────────────────
@app.get("/api/users")
def list_users(user=Depends(require_role("admin"))):
    """Return every user row (including tokens) as a dict, ordered by id."""
    db = get_db()
    records = db.execute(
        "SELECT id, username, token, role, display_name, created_at, business_verified FROM users ORDER BY id"
    ).fetchall()
    db.close()
    accounts = []
    for record in records:
        accounts.append(dict(record))
    return accounts
@app.post("/api/users", status_code=201)
def create_user(body: UserIn, user=Depends(require_role("admin"))):
    """Create a user with a freshly generated token and return that token.

    Any failure while inserting, auditing or committing is reported as 400
    with the "username already exists" message.
    """
    new_token = _secrets.token_hex(24)
    db = get_db()
    try:
        db.execute(
            "INSERT INTO users (username, token, role, display_name) VALUES (?, ?, ?, ?)",
            (body.username, new_token, body.role, body.display_name),
        )
        role_detail = json.dumps({"role": body.role})
        log_audit(db, user["id"], "create_user", "user", body.username, body.display_name, role_detail)
        db.commit()
    except Exception:
        db.close()
        raise HTTPException(400, "用户名已存在")
    else:
        db.close()
    return {"token": new_token, "username": body.username}
@app.delete("/api/users/{user_id}")
def delete_user(user_id: int, user=Depends(require_role("admin"))):
    """Delete a user, first handing their diary recipes over to the admin.

    A diary entry whose ingredient list matches one already present in the
    admin's diary or in any recipe is deleted instead of transferred; the
    rest are renamed and re-assigned to the calling admin. The audit detail
    holds a snapshot of the user row plus the transfer counts.

    Raises 400 when the admin targets themselves and 404 for an unknown id.
    """
    admin_id = user["id"]
    if user_id == admin_id:
        raise HTTPException(400, "不能删除自己")
    db = get_db()
    account = db.execute(
        "SELECT id, username, token, role, display_name FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    if not account:
        db.close()
        raise HTTPException(404, "User not found")

    # Transfer personal diary recipes to admin before deletion (skip duplicates)
    account_name = account["display_name"] or account["username"]
    diary_rows = db.execute("SELECT id, name, ingredients FROM user_diary WHERE user_id = ?", (user_id,)).fetchall()
    moved = 0
    if diary_rows:
        def fingerprint(raw_json):
            """Normalize an ingredients JSON string into a comparable tuple."""
            try:
                items = json.loads(raw_json) if isinstance(raw_json, str) else []
                pairs = [(it.get("oil") or it.get("oil_name", ""), it.get("drops", 0)) for it in items]
                return tuple(sorted(pairs))
            except Exception:
                # Unparseable entries get the empty key, which never counts as a duplicate.
                return ()

        # Fingerprints already known: the admin's diary plus every recipe.
        admin_rows = db.execute("SELECT ingredients FROM user_diary WHERE user_id = ?", (admin_id,)).fetchall()
        known = {fingerprint(entry["ingredients"]) for entry in admin_rows}
        for recipe in db.execute("SELECT id FROM recipes").fetchall():
            parts = db.execute(
                "SELECT oil_name, drops FROM recipe_ingredients WHERE recipe_id = ?", (recipe["id"],)
            ).fetchall()
            known.add(tuple(sorted((part["oil_name"], part["drops"]) for part in parts)))

        for diary in diary_rows:
            key = fingerprint(diary["ingredients"])
            if key != () and key in known:
                db.execute("DELETE FROM user_diary WHERE id = ?", (diary["id"],))
                continue
            renamed = f"{diary['name']}{account_name}"
            db.execute("UPDATE user_diary SET user_id = ?, name = ? WHERE id = ?",
                       (admin_id, renamed, diary["id"]))
            moved += 1

    detail = dict(account)
    db.execute("DELETE FROM users WHERE id = ?", (user_id,))
    if diary_rows:
        detail["transferred_diary_count"] = moved
        detail["skipped_duplicate_count"] = len(diary_rows) - moved
    log_audit(db, admin_id, "delete_user", "user", user_id, account["username"],
              json.dumps(detail, ensure_ascii=False))
    db.commit()
    db.close()
    return {"ok": True}
@app.put("/api/users/{user_id}")
def update_user(user_id: int, body: UserUpdate, user=Depends(require_role("admin"))):
    """Change a user's role and/or display name and audit what changed.

    Promoting anyone to admin is refused with 403. The audit detail records
    the role transition (as Chinese labels) and the new display name.
    """
    db = get_db()
    account = db.execute("SELECT role, display_name, username FROM users WHERE id = ?", (user_id,)).fetchone()
    if account:
        previous_role = account["role"]
        label = account["display_name"] or account["username"]
    else:
        previous_role = "unknown"
        label = "unknown"

    new_role = body.role
    new_display = body.display_name
    if new_role is not None:
        if new_role == "admin":
            db.close()
            raise HTTPException(403, "不能将用户设为管理员")
        db.execute("UPDATE users SET role = ?, role_changed_at = datetime('now') WHERE id = ?", (new_role, user_id))
    if new_display is not None:
        db.execute("UPDATE users SET display_name = ? WHERE id = ?", (new_display, user_id))

    role_labels = {"admin": "管理员", "senior_editor": "高级编辑", "editor": "编辑", "viewer": "查看者"}
    detail = {}
    role_changed = new_role is not None and new_role != previous_role
    if role_changed:
        detail["from_role"] = role_labels.get(previous_role, previous_role)
        detail["to_role"] = role_labels.get(new_role, new_role)
    if new_display is not None:
        detail["display_name"] = new_display
    log_audit(db, user["id"], "update_user", "user", user_id, label,
              json.dumps(detail, ensure_ascii=False))
    db.commit()
    db.close()
    return {"ok": True}
# ── Undo (admin only) ──────────────────────────────────
@app.post("/api/audit-log/{log_id}/undo")
def undo_action(log_id: int, user=Depends(require_role("admin"))):
    """Undo a delete_recipe, delete_oil or delete_user audit entry.

    The entry's ``detail`` JSON is used as the snapshot to restore from:

    * delete_recipe: re-creates the recipe (under a new id) with its
      ingredients and tags; ``en_name`` and ``volume`` are restored when the
      snapshot has them. Returns ``{"ok": True, "new_id": ...}``.
    * delete_oil: re-inserts the oil if no oil of that name exists, restoring
      ``retail_price`` and ``is_active`` when the snapshot has them.
    * delete_user: re-inserts the user row with its original token.

    Raises 404 for an unknown log id and 400 when the entry has no usable
    snapshot, the action type is not supported, or the user restore fails.
    """
    no_snapshot = "此操作无法撤销(无快照数据)"
    conn = get_db()
    try:
        entry = conn.execute("SELECT * FROM audit_log WHERE id = ?", (log_id,)).fetchone()
        if not entry:
            raise HTTPException(404, "Audit log entry not found")
        action = entry["action"]
        detail = entry["detail"]
        if not detail:
            raise HTTPException(400, no_snapshot)
        try:
            snapshot = json.loads(detail)
        except (TypeError, ValueError):
            # Malformed detail is treated like a missing snapshot below.
            snapshot = None

        def _require(*keys):
            # Reject snapshots such as "{}" (logged when the deleted row did
            # not exist) instead of failing later with a KeyError.
            if not isinstance(snapshot, dict) or any(key not in snapshot for key in keys):
                raise HTTPException(400, no_snapshot)

        def _insert_sql(verb, table, columns):
            # Column names come only from the fixed lists below, never from
            # request data, so formatting them into the statement is safe.
            placeholders = ", ".join("?" for _ in columns)
            return f"{verb} INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"

        if action == "delete_recipe":
            _require("name")
            # Restore recipe from snapshot
            columns = ["name", "note", "owner_id"]
            values = [snapshot["name"], snapshot.get("note", ""), snapshot.get("owner_id")]
            for optional in ("en_name", "volume"):
                # Older snapshots may lack these keys; leave the column default then.
                if optional in snapshot:
                    columns.append(optional)
                    values.append(snapshot[optional])
            c = conn.cursor()
            c.execute(_insert_sql("INSERT", "recipes", columns), values)
            new_id = c.lastrowid
            for ing in snapshot.get("ingredients", []):
                c.execute("INSERT INTO recipe_ingredients (recipe_id, oil_name, drops) VALUES (?, ?, ?)",
                          (new_id, ing["oil_name"], ing["drops"]))
            for tag in snapshot.get("tags", []):
                c.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag,))
                c.execute("INSERT OR IGNORE INTO recipe_tags (recipe_id, tag_name) VALUES (?, ?)", (new_id, tag))
            log_audit(conn, user["id"], "undo_delete_recipe", "recipe", new_id, snapshot["name"],
                      json.dumps({"original_log_id": log_id}))
            conn.commit()
            return {"ok": True, "new_id": new_id}

        if action == "delete_oil":
            _require("name", "bottle_price", "drop_count")
            columns = ["name", "bottle_price", "drop_count"]
            values = [snapshot["name"], snapshot["bottle_price"], snapshot["drop_count"]]
            for optional in ("retail_price", "is_active"):
                if optional in snapshot:
                    columns.append(optional)
                    values.append(snapshot[optional])
            conn.execute(_insert_sql("INSERT OR IGNORE", "oils", columns), values)
            log_audit(conn, user["id"], "undo_delete_oil", "oil", snapshot["name"], snapshot["name"],
                      json.dumps({"original_log_id": log_id}))
            conn.commit()
            return {"ok": True}

        if action == "delete_user":
            _require("username", "token", "role")
            try:
                conn.execute(
                    "INSERT INTO users (username, token, role, display_name) VALUES (?, ?, ?, ?)",
                    (snapshot["username"], snapshot["token"], snapshot["role"], snapshot.get("display_name", "")))
                log_audit(conn, user["id"], "undo_delete_user", "user", snapshot["username"], snapshot.get("display_name"),
                          json.dumps({"original_log_id": log_id}))
                conn.commit()
            except Exception:
                raise HTTPException(400, "恢复失败(用户名可能已被占用)")
            return {"ok": True}

        raise HTTPException(400, "此操作类型不支持撤销")
    finally:
        # Single close for every return and raise path above.
        conn.close()
# ── Audit Log (admin only) ─────────────────────────────
@app.get("/api/audit-log")
def get_audit_log(limit: int = 100, offset: int = 0, user=Depends(require_role("admin"))):
    """Return one page of audit entries, newest first.

    Each entry is joined with the acting user's display name and username;
    ``limit`` and ``offset`` select the page.
    """
    query = (
        "SELECT a.id, a.action, a.target_type, a.target_id, a.target_name, a.detail, a.created_at, "
        "u.display_name as user_name, u.username "
        "FROM audit_log a LEFT JOIN users u ON a.user_id = u.id "
        "ORDER BY a.id DESC LIMIT ? OFFSET ?"
    )
    db = get_db()
    records = db.execute(query, (limit, offset)).fetchall()
    db.close()
    entries = []
    for record in records:
        entries.append(dict(record))
    return entries
# ── Brand Settings ─────────────────────────────────────
@app.get("/api/brand")
def get_brand(user=Depends(get_current_user)):
    """Return the caller's brand settings.

    A caller without an id, or whose user row is missing, gets a placeholder
    with every field None except ``brand_align`` = "center".
    """
    placeholder = {"qr_code": None, "brand_logo": None, "brand_bg": None, "brand_name": None, "brand_align": "center"}
    if not user["id"]:
        return placeholder
    db = get_db()
    settings = db.execute(
        "SELECT qr_code, brand_logo, brand_bg, brand_name, brand_align FROM users WHERE id = ?", (user["id"],)
    ).fetchone()
    db.close()
    if settings:
        return dict(settings)
    return placeholder
@app.put("/api/brand")
def update_brand(body: dict, user=Depends(get_current_user)):
    """Update the caller's brand settings with whichever fields the body has.

    Recognised keys: ``qr_code``, ``brand_logo``, ``brand_bg``,
    ``brand_name``, ``brand_align``. Image fields are length-checked first
    (700000 characters for the QR code and logo, 1500000 for the background),
    so an oversize upload is rejected before anything is written.

    Raises 403 when the caller has no id and 400 for an oversize image.
    Returns ``{"ok": True}``.
    """
    if not user["id"]:
        raise HTTPException(403, "请先登录")

    # Validate before opening the connection: rejecting a request must not
    # leave an open handle behind. Check order matches the original handler.
    size_limits = (
        ("qr_code", 700000, "图片太大,请压缩后上传"),
        ("brand_logo", 700000, "图片太大,请压缩后上传"),
        ("brand_bg", 1500000, "背景图太大请压缩到1MB以内"),
    )
    for field, limit, message in size_limits:
        value = body.get(field)
        if value and len(value) > limit:
            raise HTTPException(400, message)

    conn = get_db()
    try:
        # Column names come from this fixed tuple, never from the request.
        for column in ("qr_code", "brand_logo", "brand_bg", "brand_name", "brand_align"):
            if column in body:
                conn.execute(f"UPDATE users SET {column} = ? WHERE id = ?", (body[column], user["id"]))
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
# ── Profit Projects ────────────────────────────────────
@app.get("/api/projects")
def list_projects():
    """Return all profit projects, newest first.

    ``ingredients`` is decoded from JSON. When ``pricing`` holds a JSON
    object its keys are merged into the project dict; unparseable pricing is
    ignored.
    """
    db = get_db()
    records = db.execute("SELECT * FROM profit_projects ORDER BY id DESC").fetchall()
    db.close()
    projects = []
    for record in records:
        project = dict(record)
        project["ingredients"] = json.loads(record["ingredients"])
        raw_pricing = record["pricing"]
        try:
            pricing = json.loads(raw_pricing) if raw_pricing else {}
            if isinstance(pricing, dict):
                project.update(pricing)
        except (json.JSONDecodeError, TypeError):
            # Non-JSON pricing is tolerated; the project is returned without extras.
            pass
        projects.append(project)
    return projects
@app.post("/api/projects", status_code=201)
def create_project(body: dict, user=Depends(require_role("admin", "senior_editor"))):
    """Create a profit project and return its new id.

    The cost fields present in the body (selling_price, packaging_cost,
    labor_cost, other_cost, quantity) are stored together as JSON in the
    ``pricing`` column.
    """
    db = get_db()
    cur = db.cursor()
    cost_keys = ("selling_price", "packaging_cost", "labor_cost", "other_cost", "quantity")
    costs = {key: body[key] for key in cost_keys if key in body}
    if costs:
        pricing_json = json.dumps(costs)
    else:
        pricing_json = '{}'
    ingredients_json = json.dumps(body.get("ingredients", []), ensure_ascii=False)
    cur.execute(
        "INSERT INTO profit_projects (name, ingredients, pricing, note, created_by) VALUES (?, ?, ?, ?, ?)",
        (body["name"], ingredients_json, pricing_json, body.get("note", ""), user["id"])
    )
    db.commit()
    project_id = cur.lastrowid
    db.close()
    return {"id": project_id}
@app.put("/api/projects/{pid}")
def update_project(pid: int, body: dict, user=Depends(require_role("admin", "senior_editor"))):
    """Update the fields of a profit project that appear in the body.

    ``name``, ``ingredients``, ``pricing`` and ``note`` are written as given
    (ingredients as JSON). If any cost field is present, the ``pricing``
    column is then overwritten with a JSON object of just those cost fields.
    """
    db = get_db()
    if "name" in body:
        db.execute("UPDATE profit_projects SET name = ? WHERE id = ?", (body["name"], pid))
    if "ingredients" in body:
        ingredients_json = json.dumps(body["ingredients"], ensure_ascii=False)
        db.execute("UPDATE profit_projects SET ingredients = ? WHERE id = ?", (ingredients_json, pid))
    if "pricing" in body:
        db.execute("UPDATE profit_projects SET pricing = ? WHERE id = ?", (body["pricing"], pid))
    if "note" in body:
        db.execute("UPDATE profit_projects SET note = ? WHERE id = ?", (body["note"], pid))

    # Store extra cost fields in pricing as JSON
    cost_keys = ("selling_price", "packaging_cost", "labor_cost", "other_cost", "quantity")
    costs = {key: body[key] for key in cost_keys if key in body}
    if costs:
        db.execute("UPDATE profit_projects SET pricing = ? WHERE id = ?",
                   (json.dumps(costs, ensure_ascii=False), pid))
    db.commit()
    db.close()
    return {"ok": True}
@app.delete("/api/projects/{pid}")
def delete_project(pid: int, user=Depends(require_role("admin", "senior_editor"))):
    """Delete the profit project with the given id."""
    db = get_db()
    statement = "DELETE FROM profit_projects WHERE id = ?"
    db.execute(statement, (pid,))
    db.commit()
    db.close()
    return {"ok": True}
# ── Diary (personal recipes + journal) ─────────────────
@app.get("/api/diary")
def list_diary(user=Depends(get_current_user)):
    """Return the caller's diary recipes, newest first, with their entries.

    ``ingredients`` and ``tags`` are decoded from JSON and each recipe gets
    an ``entries`` list ordered by creation time, newest first. A caller
    without an id gets an empty list.
    """
    if not user["id"]:
        return []
    db = get_db()
    diary_rows = db.execute(
        "SELECT id, source_recipe_id, name, ingredients, note, tags, created_at "
        "FROM user_diary WHERE user_id = ? ORDER BY id DESC", (user["id"],)
    ).fetchall()
    diaries = []
    for diary in diary_rows:
        entry_rows = db.execute(
            "SELECT id, content, created_at FROM diary_entries WHERE diary_id = ? ORDER BY created_at DESC", (diary["id"],)
        ).fetchall()
        item = dict(diary)
        item["ingredients"] = json.loads(diary["ingredients"])
        item["tags"] = json.loads(diary["tags"] or "[]")
        item["entries"] = [dict(entry) for entry in entry_rows]
        diaries.append(item)
    db.close()
    return diaries
@app.post("/api/diary", status_code=201)
def create_diary(body: dict, user=Depends(get_current_user)):
    """Create a personal diary recipe for the caller and return its id.

    Raises 403 when the caller has no id and 400 when the trimmed name is
    empty. Ingredients and tags are stored as JSON text.
    """
    if not user["id"]:
        raise HTTPException(403, "请先登录")
    title = body.get("name", "").strip()
    if not title:
        raise HTTPException(400, "请输入配方名称")
    ingredients_json = json.dumps(body.get("ingredients", []), ensure_ascii=False)
    tags_json = json.dumps(body.get("tags", []), ensure_ascii=False)
    values = (user["id"], body.get("source_recipe_id"), title, ingredients_json, body.get("note", ""), tags_json)
    db = get_db()
    cur = db.cursor()
    cur.execute(
        "INSERT INTO user_diary (user_id, source_recipe_id, name, ingredients, note, tags) VALUES (?, ?, ?, ?, ?, ?)",
        values
    )
    db.commit()
    diary_id = cur.lastrowid
    db.close()
    return {"id": diary_id}
@app.put("/api/diary/{diary_id}")
def update_diary(diary_id: int, body: dict, user=Depends(get_current_user)):
    """Update the fields of one of the caller's diary recipes.

    Only keys present in the body are written. Raises 403 when the diary
    does not exist or belongs to someone else.
    """
    db = get_db()
    owner = db.execute("SELECT user_id FROM user_diary WHERE id = ?", (diary_id,)).fetchone()
    if not (owner and owner["user_id"] == user["id"]):
        db.close()
        raise HTTPException(403, "无权操作")
    if "name" in body:
        db.execute("UPDATE user_diary SET name = ? WHERE id = ?", (body["name"], diary_id))
    if "note" in body:
        db.execute("UPDATE user_diary SET note = ? WHERE id = ?", (body["note"], diary_id))
    if "ingredients" in body:
        ingredients_json = json.dumps(body["ingredients"], ensure_ascii=False)
        db.execute("UPDATE user_diary SET ingredients = ? WHERE id = ?", (ingredients_json, diary_id))
    if "tags" in body:
        tags_json = json.dumps(body["tags"], ensure_ascii=False)
        db.execute("UPDATE user_diary SET tags = ? WHERE id = ?", (tags_json, diary_id))
    db.commit()
    db.close()
    return {"ok": True}
@app.delete("/api/diary/{diary_id}")
def delete_diary(diary_id: int, user=Depends(get_current_user)):
    """Delete one of the caller's diary recipes.

    Raises 403 when the diary does not exist or belongs to someone else.
    """
    db = get_db()
    owner = db.execute("SELECT user_id FROM user_diary WHERE id = ?", (diary_id,)).fetchone()
    if not (owner and owner["user_id"] == user["id"]):
        db.close()
        raise HTTPException(403, "无权操作")
    db.execute("DELETE FROM user_diary WHERE id = ?", (diary_id,))
    db.commit()
    db.close()
    return {"ok": True}
@app.post("/api/diary/{diary_id}/entries", status_code=201)
def add_diary_entry(diary_id: int, body: dict, user=Depends(get_current_user)):
    """Append a text entry to a diary record owned by the current user.

    Raises:
        HTTPException: 403 when the diary does not exist or belongs to
            another user; 400 when ``content`` is missing, blank, or not a
            string.
    """
    conn = get_db()
    try:
        row = conn.execute("SELECT user_id FROM user_diary WHERE id = ?", (diary_id,)).fetchone()
        if not row or row["user_id"] != user["id"]:
            raise HTTPException(403, "无权操作")
        raw = body.get("content")
        # A JSON null or non-string content would crash on .strip(); treat it as empty.
        content = raw.strip() if isinstance(raw, str) else ""
        if not content:
            raise HTTPException(400, "内容不能为空")
        conn.execute("INSERT INTO diary_entries (diary_id, content) VALUES (?, ?)", (diary_id, content))
        conn.commit()
        return {"ok": True}
    finally:
        # The 400 path previously raised without closing the connection.
        conn.close()
@app.delete("/api/diary/entries/{entry_id}")
def delete_diary_entry(entry_id: int, user=Depends(get_current_user)):
    """Delete one diary entry, provided its parent diary belongs to the caller.

    Ownership is resolved by joining the entry to its ``user_diary`` row.
    Responds 403 when the entry does not exist or the diary belongs to
    another user.
    """
    db = get_db()
    ownership_sql = "SELECT d.user_id FROM diary_entries e JOIN user_diary d ON e.diary_id = d.id WHERE e.id = ?"
    owner = db.execute(ownership_sql, (entry_id,)).fetchone()
    allowed = bool(owner) and owner["user_id"] == user["id"]
    if not allowed:
        db.close()
        raise HTTPException(403, "无权操作")
    db.execute("DELETE FROM diary_entries WHERE id = ?", (entry_id,))
    db.commit()
    db.close()
    return {"ok": True}
# ── Favorites ──────────────────────────────────────────
@app.get("/api/favorites")
def get_favorites(user=Depends(get_current_user)):
    """Return the recipe ids favorited by the current user, most recently added first.

    Callers without a user id get an empty list.
    """
    uid = user["id"]
    if not uid:
        return []
    db = get_db()
    cursor = db.execute(
        "SELECT recipe_id FROM user_favorites WHERE user_id = ? ORDER BY created_at DESC",
        (uid,),
    )
    favorite_ids = [row["recipe_id"] for row in cursor.fetchall()]
    db.close()
    return favorite_ids
@app.post("/api/favorites/{recipe_id}", status_code=201)
def add_favorite(recipe_id: int, user=Depends(get_current_user)):
    """Add a recipe to the current user's favorites.

    Uses ``INSERT OR IGNORE``, so favoriting the same recipe again is a
    no-op. Responds 403 when the caller has no user id.
    """
    uid = user["id"]
    if not uid:
        raise HTTPException(403, "请先登录")
    db = get_db()
    db.execute(
        "INSERT OR IGNORE INTO user_favorites (user_id, recipe_id) VALUES (?, ?)",
        (uid, recipe_id),
    )
    db.commit()
    db.close()
    return {"ok": True}
@app.delete("/api/favorites/{recipe_id}")
def remove_favorite(recipe_id: int, user=Depends(get_current_user)):
    """Remove a recipe from the current user's favorites.

    Responds 403 when the caller has no user id. Succeeds whether or not the
    favorite existed.
    """
    uid = user["id"]
    if not uid:
        raise HTTPException(403, "请先登录")
    db = get_db()
    db.execute(
        "DELETE FROM user_favorites WHERE user_id = ? AND recipe_id = ?",
        (uid, recipe_id),
    )
    db.commit()
    db.close()
    return {"ok": True}
# ── User Inventory ─────────────────────────────────────
@app.get("/api/inventory")
def get_inventory(user=Depends(get_current_user)):
    """Return the oil names in the current user's inventory, sorted by name.

    Callers without a user id get an empty list.
    """
    uid = user["id"]
    if not uid:
        return []
    db = get_db()
    cursor = db.execute(
        "SELECT oil_name FROM user_inventory WHERE user_id = ? ORDER BY oil_name",
        (uid,),
    )
    oil_names = [row["oil_name"] for row in cursor.fetchall()]
    db.close()
    return oil_names
@app.post("/api/inventory", status_code=201)
def add_inventory(body: dict, user=Depends(get_current_user)):
    """Add an oil (``body["oil_name"]``) to the current user's inventory.

    Uses ``INSERT OR IGNORE``, so adding an oil twice is a no-op. Responds
    403 when the caller has no user id and 400 when the name is blank.
    """
    uid = user["id"]
    if not uid:
        raise HTTPException(403, "请先登录")
    oil = body.get("oil_name", "").strip()
    if not oil:
        raise HTTPException(400, "oil_name required")
    db = get_db()
    db.execute(
        "INSERT OR IGNORE INTO user_inventory (user_id, oil_name) VALUES (?, ?)",
        (uid, oil),
    )
    db.commit()
    db.close()
    return {"ok": True}
@app.delete("/api/inventory/{oil_name}")
def remove_inventory(oil_name: str, user=Depends(get_current_user)):
    """Remove an oil from the current user's inventory.

    Responds 403 when the caller has no user id. Succeeds whether or not the
    oil was present.
    """
    uid = user["id"]
    if not uid:
        raise HTTPException(403, "请先登录")
    db = get_db()
    db.execute(
        "DELETE FROM user_inventory WHERE user_id = ? AND oil_name = ?",
        (uid, oil_name),
    )
    db.commit()
    db.close()
    return {"ok": True}
@app.get("/api/inventory/recipes")
def recipes_by_inventory(user=Depends(get_current_user)):
    """Get recipes that can be made with user's inventory oils.

    Every recipe that uses at least one oil from the user's inventory is
    returned, annotated with:

    * ``inventory_match``   - number of its oils the user owns,
    * ``inventory_total``   - number of oils counted (椰子油 is excluded),
    * ``inventory_missing`` - the oils the user does not own.

    Results are ordered by most matches first, then by fewest missing oils.
    Callers without a user id, or with an empty inventory, get ``[]``.
    """
    if not user["id"]:
        return []
    conn = get_db()
    try:
        # A set makes each membership test O(1) instead of scanning a list.
        owned = {r["oil_name"] for r in conn.execute(
            "SELECT oil_name FROM user_inventory WHERE user_id = ?", (user["id"],)).fetchall()}
        if not owned:
            return []
        rows = conn.execute("SELECT id, name, note, owner_id, version, en_name, volume FROM recipes ORDER BY id").fetchall()
        result = []
        for r in rows:
            recipe = _recipe_to_dict(conn, r)
            eo_oils = [i["oil_name"] for i in recipe["ingredients"] if i["oil_name"] != "椰子油"]
            # One pass yields both the missing list and the match count.
            missing = [o for o in eo_oils if o not in owned]
            match_count = len(eo_oils) - len(missing)
            if match_count:
                recipe["inventory_match"] = match_count
                recipe["inventory_total"] = len(eo_oils)
                recipe["inventory_missing"] = missing
                result.append(recipe)
    finally:
        # Close the connection even if building a recipe dict fails.
        conn.close()
    result.sort(key=lambda r: (-r["inventory_match"], r["inventory_total"] - r["inventory_match"]))
    return result
# ── Search Logging ─────────────────────────────────────
@app.post("/api/search-log")
def log_search(body: dict, user=Depends(get_current_user)):
    """Record a search and alert editors when it matched nothing.

    Blank queries are ignored. Otherwise the query and ``matched_count`` are
    written to ``search_log``; when ``matched_count`` is 0 a notification is
    also inserted for the ``admin`` and ``senior_editor`` roles.
    """
    term = body.get("query", "").strip()
    hit_count = body.get("matched_count", 0)
    if not term:
        return {"ok": True}
    db = get_db()
    db.execute(
        "INSERT INTO search_log (user_id, query, matched_count) VALUES (?, ?, ?)",
        (user.get("id"), term, hit_count),
    )
    # Instant notification when no match found
    if hit_count == 0:
        searcher = user.get("display_name") or user.get("username") or "用户"
        alert_title = "🔍 有人搜索了未收录的配方"
        alert_body = f"{searcher} 搜索了「{term}」但没有找到匹配配方,请考虑添加。"
        for role in ("admin", "senior_editor"):
            db.execute(
                "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                (role, alert_title, alert_body),
            )
    db.commit()
    db.close()
    return {"ok": True, "matched": hit_count}
@app.get("/api/search-log/unmatched")
def get_unmatched_searches(days: int = 7, user=Depends(require_role("admin", "senior_editor"))):
    """List zero-match queries logged within the last ``days`` days.

    Returns up to 50 rows, most frequent first, each with ``query``, ``cnt``
    and ``last_at`` (the latest ``created_at`` for that query).
    """
    window = f"-{days} days"
    sql = (
        "SELECT query, COUNT(*) as cnt, MAX(created_at) as last_at "
        "FROM search_log WHERE matched_count = 0 AND created_at > datetime('now', ?) "
        "GROUP BY query ORDER BY cnt DESC LIMIT 50"
    )
    db = get_db()
    found = db.execute(sql, (window,)).fetchall()
    db.close()
    return [dict(entry) for entry in found]
# ── Recipe review history ──────────────────────────────
@app.get("/api/recipe-reviews")
def list_recipe_reviews(user=Depends(require_role("admin"))):
    """Return the 100 most recent recipe adopt/reject audit entries.

    Each entry is joined to the acting user's ``display_name`` and
    ``username`` (both null when the user row no longer exists).
    """
    sql = (
        "SELECT a.id, a.action, a.target_name, a.detail, a.created_at, "
        "u.display_name, u.username "
        "FROM audit_log a LEFT JOIN users u ON a.user_id = u.id "
        "WHERE a.action IN ('adopt_recipe', 'reject_recipe') "
        "ORDER BY a.id DESC LIMIT 100"
    )
    db = get_db()
    reviews = db.execute(sql).fetchall()
    db.close()
    return [dict(review) for review in reviews]
# ── Contribution stats ─────────────────────────────────
@app.get("/api/me/contribution")
def my_contribution(user=Depends(get_current_user)):
    """Summarize the current user's recipe contributions.

    Returns a dict with:

    * ``adopted_names`` / ``adopted_count`` - recipe names whose
      ``adopt_recipe`` audit entry names this user as ``from_user``;
    * ``pending_names`` - names of recipes the user still owns (always empty
      for admins);
    * ``shared_count`` - number of distinct names across adopted, pending and
      rejected recipes.

    Callers without a user id get zero counts and empty lists.
    """
    if not user.get("id"):
        return {"adopted_count": 0, "shared_count": 0, "adopted_names": [], "pending_names": []}
    display = user.get("display_name") or user.get("username")
    # Escape LIKE metacharacters so a name such as "a_b" or "50%" only matches
    # itself instead of acting as a wildcard against other users' entries.
    escaped = str(display).replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    pattern = f'%"from_user": "{escaped}"%'
    conn = get_db()
    try:
        # adopted: unique recipe names adopted from this user
        adopted_rows = conn.execute(
            "SELECT DISTINCT target_name FROM audit_log "
            "WHERE action = 'adopt_recipe' AND detail LIKE ? ESCAPE '\\'",
            (pattern,)
        ).fetchall()
        adopted_names = list({r["target_name"] for r in adopted_rows if r["target_name"]})
        # pending: recipes still owned by user in public library
        # (skip admin — admin owns all public recipes)
        if user.get("role") == "admin":
            pending_names = []
        else:
            pending_rows = conn.execute(
                "SELECT name FROM recipes WHERE owner_id = ?", (user["id"],)
            ).fetchall()
            pending_names = [r["name"] for r in pending_rows]
        # rejected: unique recipe names rejected for this user
        rejected_rows = conn.execute(
            "SELECT DISTINCT target_name FROM audit_log "
            "WHERE action = 'reject_recipe' AND detail LIKE ? ESCAPE '\\'",
            (pattern,)
        ).fetchall()
        rejected_names = {r["target_name"] for r in rejected_rows if r["target_name"]}
    finally:
        conn.close()
    # Unique names across all: same recipe rejected then re-submitted counts as 1
    all_names = set(adopted_names) | set(pending_names) | rejected_names
    return {
        "adopted_count": len(adopted_names),
        "shared_count": len(all_names),
        "adopted_names": adopted_names,
        "pending_names": pending_names,
    }
# ── Notifications ──────────────────────────────────────
@app.get("/api/notifications")
def get_notifications(user=Depends(get_current_user)):
    """Return up to 200 notifications visible to the current user.

    A notification is visible when it targets the user directly, or has no
    target user and targets the user's role or ``'all'``. Notifications older
    than the user's registration or last role change (whichever is later)
    are hidden. Unread items come first, then newest by id.
    """
    uid = user["id"]
    if not uid:
        return []
    db = get_db()
    account = db.execute(
        "SELECT created_at, role_changed_at FROM users WHERE id = ?", (uid,)
    ).fetchone()
    # Fallback cutoff when the user row or its timestamps are missing.
    cutoff = "2000-01-01"
    if account:
        registered_at = account["created_at"]
        role_changed_at = account["role_changed_at"]
        if registered_at:
            cutoff = registered_at
        if role_changed_at and role_changed_at > cutoff:
            cutoff = role_changed_at
    sql = (
        "SELECT id, title, body, is_read, created_at FROM notifications "
        "WHERE (target_user_id = ? OR (target_user_id IS NULL AND (target_role = ? OR target_role = 'all'))) "
        "AND created_at >= ? "
        "ORDER BY is_read ASC, id DESC LIMIT 200"
    )
    visible = db.execute(sql, (uid, user["role"], cutoff)).fetchall()
    db.close()
    return [dict(item) for item in visible]
@app.post("/api/notifications/{nid}/read")
def mark_notification_read(nid: int, body: dict = None, user=Depends(get_current_user)):
    """Mark one notification as read.

    Notifications whose title contains "待测试" are refused with a 400 unless
    the request body carries a truthy ``force`` flag.
    """
    db = get_db()
    found = db.execute("SELECT title FROM notifications WHERE id = ?", (nid,)).fetchone()
    # Bug test notifications can only be marked read with force=true (from "已测试" flow)
    forced = body.get("force", False) if body else False
    title = (found["title"] or "") if found else ""
    if "待测试" in title and not forced:
        db.close()
        raise HTTPException(400, "请先点击「已测试」完成测试")
    db.execute("UPDATE notifications SET is_read = 1 WHERE id = ?", (nid,))
    db.commit()
    db.close()
    return {"ok": True}
@app.post("/api/notifications/{nid}/added")
def mark_notification_added(nid: int, user=Depends(get_current_user)):
    """Mark a 'search missing' notification as handled: notify others and the original requester.

    Steps:
      1. Mark notification ``nid`` as read.
      2. Work out the searched query, then mark the sibling notifications for
         the same query as read.
      3. Tell the ``admin`` and ``senior_editor`` roles it has been handled.
      4. If the body names the searcher ("XXX 搜索了「YYY」...") and that user
         exists, notify them that a recipe was added.

    Raises:
        HTTPException: 404 when the notification does not exist.
    """
    conn = get_db()
    try:
        notif = conn.execute("SELECT title, body FROM notifications WHERE id = ?", (nid,)).fetchone()
        if not notif:
            raise HTTPException(404, "通知不存在")
        title = notif["title"] or ""
        body_text = notif["body"] or ""
        who = user.get("display_name") or user.get("username")
        # Mark this one as read
        conn.execute("UPDATE notifications SET is_read = 1 WHERE id = ?", (nid,))

        if "用户需求" in title:
            # Per-query title ("🔍 用户需求:XXX"): the title alone identifies the request.
            query = title.replace("🔍 用户需求:", "").strip()
            conn.execute("UPDATE notifications SET is_read = 1 WHERE title = ? AND is_read = 0", (title,))
        else:
            # Search alerts share one fixed title, so the query only lives in
            # the body ("XXX 搜索了「YYY」..."). Take the text between the
            # brackets instead of echoing the generic title back.
            quoted = body_text.partition("搜索了「")[2].rpartition("」")[0]
            if quoted:
                query = quoted
                # Restrict to notifications about this query; matching on the
                # shared title alone would clear unrelated search alerts.
                conn.execute(
                    "UPDATE notifications SET is_read = 1 "
                    "WHERE title = ? AND is_read = 0 AND instr(body, ?) > 0",
                    (title, f"「{quoted}」")
                )
            else:
                # Unknown format: fall back to treating the title as the query.
                query = title
                conn.execute("UPDATE notifications SET is_read = 1 WHERE title = ? AND is_read = 0", (title,))

        # Notify other editors that it's been handled
        for role in ("admin", "senior_editor"):
            conn.execute(
                "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                (role, "✅ 配方已添加",
                 f"{who} 已为「{query}」添加了配方,无需重复处理。")
            )
        # Notify the original requester (the name precedes " 搜索了" in the body)
        if "搜索了" in body_text:
            requester_name = body_text.split(" 搜索了")[0].strip()
            requester = conn.execute(
                "SELECT id, role FROM users WHERE display_name = ? OR username = ?",
                (requester_name, requester_name)
            ).fetchone()
            if requester:
                conn.execute(
                    "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
                    (requester["role"], "🎉 你搜索的配方已添加",
                     f"你之前搜索的「{query}」已有编辑添加了配方,快去查看吧!", requester["id"])
                )
        conn.commit()
        return {"ok": True}
    finally:
        conn.close()
@app.post("/api/notifications/{nid}/unread")
def mark_notification_unread(nid: int, user=Depends(get_current_user)):
    """Reset notification ``nid`` to unread (``is_read = 0``)."""
    db = get_db()
    db.execute(
        "UPDATE notifications SET is_read = 0 WHERE id = ?",
        (nid,),
    )
    db.commit()
    db.close()
    return {"ok": True}
@app.post("/api/notifications/read-all")
def mark_all_notifications_read(user=Depends(get_current_user)):
    """Mark every unread notification visible to the caller as read.

    Covers notifications targeting the user directly, or with no target user
    and targeting the user's role or ``'all'``. Notifications whose title
    contains '待测试' are left unread.
    """
    sql = (
        "UPDATE notifications SET is_read = 1 WHERE is_read = 0 "
        "AND title NOT LIKE '%待测试%' "
        "AND (target_user_id = ? OR (target_user_id IS NULL AND (target_role = ? OR target_role = 'all')))"
    )
    params = (user["id"], user["role"])
    db = get_db()
    db.execute(sql, params)
    db.commit()
    db.close()
    return {"ok": True}
@app.post("/api/cron/weekly-review")
def weekly_review(user=Depends(require_role("admin"))):
    """Generate weekly notifications. Call via cron or manually.

    1. Counts recipes not owned by the first admin account (falling back to
       id 1) and, if any exist, notifies the ``admin`` role that they await
       review.
    2. Collects the ten most frequent zero-match searches from the last seven
       days and, if any exist, notifies ``admin`` and ``senior_editor``.

    Returns:
        ``{"ok": True, "pending_recipes": <count>, "unmatched_queries": <count>}``.
    """
    conn = get_db()
    try:
        # 1. Pending recipes for admin review
        admin = conn.execute("SELECT id FROM users WHERE role = 'admin' LIMIT 1").fetchone()
        admin_id = admin["id"] if admin else 1
        pending = conn.execute(
            "SELECT COUNT(*) as cnt FROM recipes WHERE owner_id != ?", (admin_id,)
        ).fetchone()["cnt"]
        if pending > 0:
            conn.execute(
                "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                ("admin", f"{pending} 条配方待审核",
                 "其他用户新增了配方,请到「管理配方」→「待审核」查看并采纳。")
            )
        # 2. Unmatched searches for admin + senior_editor
        unmatched = conn.execute(
            "SELECT query, COUNT(*) as cnt FROM search_log "
            "WHERE matched_count = 0 AND created_at > datetime('now', '-7 days') "
            "GROUP BY query ORDER BY cnt DESC LIMIT 10"
        ).fetchall()
        if unmatched:
            # Wrap each query in matching 「」 brackets and separate the items
            # with "、"; previously they ran together with only a closing bracket.
            queries = "、".join(f"「{r['query']}」({r['cnt']}次)" for r in unmatched)
            for role in ("admin", "senior_editor"):
                conn.execute(
                    "INSERT INTO notifications (target_role, title, body) VALUES (?, ?, ?)",
                    (role, "本周未匹配的搜索需求",
                     f"以下搜索没有找到配方:{queries}。请考虑完善相关配方。")
                )
        conn.commit()
        return {"ok": True, "pending_recipes": pending, "unmatched_queries": len(unmatched)}
    finally:
        conn.close()
# ── Category Modules (homepage) ────────────────────────
@app.get("/api/categories")
def list_categories():
    """Return every row of ``category_modules`` ordered by ``sort_order`` then ``id``."""
    db = get_db()
    modules = db.execute(
        "SELECT * FROM category_modules ORDER BY sort_order, id"
    ).fetchall()
    db.close()
    return [dict(module) for module in modules]
@app.post("/api/categories", status_code=201)
def create_category(body: dict, user=Depends(require_role("admin"))):
    """Create a homepage category module (admin only).

    ``name`` and ``tag_name`` are required. ``subtitle``, ``icon``,
    ``bg_image``, ``color_from``, ``color_to`` and ``sort_order`` are optional
    and fall back to the defaults used in the INSERT below.

    Raises:
        HTTPException: 400 when ``name`` or ``tag_name`` is missing or empty.
    """
    # Reject incomplete payloads up front; indexing body[...] directly would
    # surface a missing key as an unhandled KeyError (HTTP 500).
    if not body.get("name") or not body.get("tag_name"):
        raise HTTPException(400, "name and tag_name required")
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO category_modules (name, subtitle, icon, bg_image, color_from, color_to, tag_name, sort_order) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (body["name"], body.get("subtitle", ""), body.get("icon", "🌿"), body.get("bg_image", ""),
             body.get("color_from", "#7a9e7e"), body.get("color_to", "#5a7d5e"),
             body["tag_name"], body.get("sort_order", 0))
        )
        conn.commit()
        return {"ok": True}
    finally:
        conn.close()
@app.delete("/api/categories/{cat_id}")
def delete_category(cat_id: int, user=Depends(require_role("admin"))):
    """Delete the category module with id ``cat_id`` (admin only).

    Succeeds whether or not a matching row existed.
    """
    db = get_db()
    db.execute(
        "DELETE FROM category_modules WHERE id = ?",
        (cat_id,),
    )
    db.commit()
    db.close()
    return {"ok": True}
# ── Static files (frontend) ────────────────────────────
# Directory containing the frontend files served below; overridable through the
# FRONTEND_DIR environment variable, defaulting to /app/frontend.
FRONTEND_DIR = os.environ.get("FRONTEND_DIR", "/app/frontend")
@app.on_event("startup")
def startup():
    """Initialize the database and run idempotent start-up migrations.

    1. Create the schema (``init_db``) and, when ``defaults.json`` sits next
       to this module, seed default oils, recipes and oil cards.
    2. Sync ``display_name`` to ``username`` for users where they differ and
       send the username-change notice to all users, at most once.
    3. Fill in missing ``en_name`` values on recipes via ``auto_translate``.
    """
    init_db()
    defaults_path = os.path.join(os.path.dirname(__file__), "defaults.json")
    if os.path.exists(defaults_path):
        # JSON is UTF-8; do not depend on the process locale to decode it.
        with open(defaults_path, encoding="utf-8") as f:
            data = json.load(f)
        seed_defaults(data["oils_meta"], data["recipes"], DEFAULT_OIL_CARDS)

    # One-time migration: sync display_name = username, notify about username change
    notice_title = "📢 用户名更新提醒"
    # The dedupe check used to look for this title while the insert used the
    # one above, so the check never matched; accept both.
    legacy_notice_title = "📢 用户名变更通知"
    conn = get_db()
    try:
        needs_sync = conn.execute("SELECT id, username, display_name FROM users WHERE display_name != username AND display_name IS NOT NULL").fetchall()
        if needs_sync:
            for row in needs_sync:
                conn.execute("UPDATE users SET display_name = ? WHERE id = ?", (row["username"], row["id"]))
            # Send notification once (check if already sent)
            already_notified = conn.execute(
                "SELECT id FROM notifications WHERE title IN (?, ?)",
                (notice_title, legacy_notice_title)
            ).fetchone()
            if not already_notified:
                all_users = conn.execute("SELECT id FROM users").fetchall()
                for u in all_users:
                    conn.execute(
                        "INSERT INTO notifications (target_role, title, body, target_user_id) VALUES (?, ?, ?, ?)",
                        ("viewer", notice_title,
                         "为了统一体验,系统已将显示名称合并为用户名。你有一次修改用户名的机会,修改后将不可更改,请慎重选择。", u["id"])
                    )
            conn.commit()
            print(f"[INIT] Synced display_name for {len(needs_sync)} users")
    finally:
        conn.close()

    # Auto-fill missing en_name for existing recipes
    conn = get_db()
    try:
        missing = conn.execute("SELECT id, name FROM recipes WHERE en_name IS NULL OR en_name = ''").fetchall()
        for row in missing:
            conn.execute("UPDATE recipes SET en_name = ? WHERE id = ?", (auto_translate(row["name"]), row["id"]))
        if missing:
            conn.commit()
            print(f"[INIT] Auto-translated {len(missing)} recipe names to English")
    finally:
        conn.close()
if os.path.isdir(FRONTEND_DIR):
    # Serve static assets (js/css/images) directly
    app.mount("/assets", StaticFiles(directory=os.path.join(FRONTEND_DIR, "assets")), name="assets")
    app.mount("/public", StaticFiles(directory=FRONTEND_DIR), name="public")
    # SPA fallback: GET paths not handled by a route registered earlier
    # return a frontend file or index.html
    from fastapi.responses import FileResponse

    @app.get("/{path:path}")
    async def spa_fallback(path: str):
        """Serve a file from FRONTEND_DIR if it exists, otherwise index.html.

        The requested path comes straight from the URL, so it is normalized
        and must stay inside FRONTEND_DIR. Without this check, ``..``
        segments or an absolute path (``os.path.join`` discards the base when
        the second argument is absolute) could expose arbitrary files.
        """
        root = os.path.abspath(FRONTEND_DIR)
        candidate = os.path.abspath(os.path.join(root, path))
        # os.path.join(root, "") appends a trailing separator, so a sibling
        # directory such as "/app/frontend-old" does not pass as inside root.
        inside_root = candidate.startswith(os.path.join(root, ""))
        # Serve actual files if they exist (favicon, icons, etc.)
        if inside_root and os.path.isfile(candidate):
            return FileResponse(candidate)
        # Otherwise return index.html for Vue Router
        return FileResponse(os.path.join(FRONTEND_DIR, "index.html"))