Files
schedule-planner/server.py
2026-04-06 13:46:31 +00:00

400 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Hera Planner - 数据持久化服务器"""
import json, os, shutil, hashlib, time
from datetime import datetime
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
DATA_DIR = os.environ.get('DATA_DIR', '/data')
STATIC_DIR = os.environ.get('STATIC_DIR', '/app/static')
BACKUP_DIR = os.path.join(DATA_DIR, 'backups')
PASS_FILE = os.path.join(DATA_DIR, 'password.txt')
BUDDY_FILE = os.path.join(DATA_DIR, 'sleep_buddy.json')
BUDDY_PASS_FILE = os.path.join(DATA_DIR, 'buddy_password.txt')
DEFAULT_HASH = '8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92'
Path(DATA_DIR).mkdir(parents=True, exist_ok=True)
Path(BACKUP_DIR).mkdir(parents=True, exist_ok=True)
DATA_FILE = os.path.join(DATA_DIR, 'planner_data.json')
def get_pass_hash():
if os.path.exists(PASS_FILE): return open(PASS_FILE).read().strip()
return DEFAULT_HASH
def set_pass_hash(h):
with open(PASS_FILE, 'w') as f: f.write(h)
def get_buddy_pass():
if os.path.exists(BUDDY_PASS_FILE): return open(BUDDY_PASS_FILE).read().strip()
return DEFAULT_HASH
def set_buddy_pass(h):
with open(BUDDY_PASS_FILE, 'w') as f: f.write(h)
def load_data():
if os.path.exists(DATA_FILE):
with open(DATA_FILE, 'r', encoding='utf-8') as f: return json.load(f)
return {}
def save_data(data):
tmp = DATA_FILE + '.tmp'
with open(tmp, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False)
os.replace(tmp, DATA_FILE)
def load_buddy():
if os.path.exists(BUDDY_FILE):
with open(BUDDY_FILE, 'r', encoding='utf-8') as f: return json.load(f)
return {'users': {}, 'notifications': []}
def save_buddy(data):
tmp = BUDDY_FILE + '.tmp'
with open(tmp, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False)
os.replace(tmp, BUDDY_FILE)
def merge_protected(existing, incoming):
"""服务器端数据保护:所有重要数据都做合并,防止多设备覆盖丢失"""
# 按 id 合并sp_notes, sp_bugs, sp_goals, sp_checklists, sp_custom_reminders
# 按日期合并sp_sleep
# 深度合并sp_health_checks, sp_music_checks, sp_health_plans, sp_music_plans
# 嵌套合并sp_todos (每个象限按 id)sp_docs (按 doc id + entries)
# 不合并last-write-winssp_inbox, sp_reviews, sp_modules, sp_schedule 等简单配置
ARRAY_KEYS_BY_ID = ['sp_notes', 'sp_bugs', 'sp_goals', 'sp_checklists', 'sp_custom_reminders', 'sp_gym', 'sp_period'] # 按 id 合并
ARRAY_KEYS_BY_DATE = ['sp_sleep'] # 睡眠按日期合并
OBJECT_MERGE_KEYS = ['sp_health_checks', 'sp_music_checks', 'sp_health_plans', 'sp_music_plans'] # 打卡数据深度合并
# sp_todos 是 {q1:[],q2:[],q3:[],q4:[]},需要按 id 合并每个象限
NESTED_ARRAY_KEYS = ['sp_todos'] # 对象中的数组按 id 合并
for key in ARRAY_KEYS_BY_ID:
if key in existing and key in incoming:
try:
server = json.loads(existing[key]) if isinstance(existing[key], str) else existing[key]
client = json.loads(incoming[key]) if isinstance(incoming[key], str) else incoming[key]
if isinstance(server, list) and isinstance(client, list):
# 检查删除标记
deleted_key = key + '_deleted'
deleted_ids = set()
if deleted_key in incoming:
try: deleted_ids = set(json.loads(incoming[deleted_key]))
except: pass
# 按 id 合并,客户端优先,排除已删除的
merged = {r.get('id', i): r for i, r in enumerate(server) if r.get('id') not in deleted_ids}
for i, r in enumerate(client):
merged[r.get('id', f'c_{i}')] = r
incoming[key] = json.dumps(list(merged.values()))
except: pass
for key in ARRAY_KEYS_BY_DATE:
if key in existing and key in incoming:
try:
server = json.loads(existing[key]) if isinstance(existing[key], str) else existing[key]
client = json.loads(incoming[key]) if isinstance(incoming[key], str) else incoming[key]
if isinstance(server, list) and isinstance(client, list):
# 检查是否有删除标记
deleted_key = key + '_deleted'
deleted_dates = set()
if deleted_key in incoming:
try: deleted_dates = set(json.loads(incoming[deleted_key]))
except: pass
merged = {r['date']: r for r in server if r['date'] not in deleted_dates}
for r in client:
merged[r['date']] = r
incoming[key] = json.dumps(sorted(merged.values(), key=lambda x: x.get('date',''), reverse=True))
except: pass
for key in OBJECT_MERGE_KEYS:
if key in existing and key in incoming:
try:
server = json.loads(existing[key]) if isinstance(existing[key], str) else existing[key]
client = json.loads(incoming[key]) if isinstance(incoming[key], str) else incoming[key]
if isinstance(server, dict) and isinstance(client, dict):
# 深度合并:对于数组值按 id 合并,对于对象值合并 key
merged = dict(server)
for k, v in client.items():
if isinstance(v, list) and isinstance(merged.get(k), list):
m = {r.get('id', i): r for i, r in enumerate(merged[k])}
for i, r in enumerate(v):
m[r.get('id', f'c_{i}')] = r
merged[k] = list(m.values())
elif isinstance(v, dict) and isinstance(merged.get(k), dict):
merged[k] = {**merged[k], **v}
else:
merged[k] = v
incoming[key] = json.dumps(merged)
except: pass
# sp_todos: {q1:[...], q2:[...], ...} — 每个象限的数组按 id 合并
for key in NESTED_ARRAY_KEYS:
if key in existing and key in incoming:
try:
server = json.loads(existing[key]) if isinstance(existing[key], str) else existing[key]
client = json.loads(incoming[key]) if isinstance(incoming[key], str) else incoming[key]
if isinstance(server, dict) and isinstance(client, dict):
merged = {}
all_keys = set(list(server.keys()) + list(client.keys()))
for k in all_keys:
sv = server.get(k, [])
cv = client.get(k, [])
if isinstance(sv, list) and isinstance(cv, list):
m = {r.get('id', i): r for i, r in enumerate(sv)}
for i, r in enumerate(cv):
m[r.get('id', f'c_{i}')] = r
merged[k] = list(m.values())
else:
merged[k] = cv if cv else sv
incoming[key] = json.dumps(merged)
except: pass
# sp_docs: 按 doc id 合并,每个 doc 的 entries 也按内容合并
if 'sp_docs' in existing and 'sp_docs' in incoming:
try:
server_docs = json.loads(existing['sp_docs']) if isinstance(existing['sp_docs'], str) else existing['sp_docs']
client_docs = json.loads(incoming['sp_docs']) if isinstance(incoming['sp_docs'], str) else incoming['sp_docs']
if isinstance(server_docs, list) and isinstance(client_docs, list):
merged = {d.get('id', d.get('name', i)): d for i, d in enumerate(server_docs)}
for i, d in enumerate(client_docs):
did = d.get('id', d.get('name', f'c_{i}'))
if did in merged:
# 合并 entries客户端优先但保留服务器独有的
se = merged[did].get('entries', [])
ce = d.get('entries', [])
entry_map = {}
for e in se:
eid = e.get('noteId', '') or e.get('text', '')
entry_map[eid] = e
for e in ce:
eid = e.get('noteId', '') or e.get('text', '')
entry_map[eid] = e
d['entries'] = list(entry_map.values())
merged[did] = d
incoming['sp_docs'] = json.dumps(list(merged.values()))
except: pass
return incoming
def do_backup():
if not os.path.exists(DATA_FILE): return
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
shutil.copy2(DATA_FILE, os.path.join(BACKUP_DIR, f'planner_{ts}.json'))
for old in sorted(Path(BACKUP_DIR).glob('planner_*.json'))[:-30]: old.unlink()
def json_response(handler, code, data):
body = json.dumps(data, ensure_ascii=False).encode('utf-8')
handler.send_response(code)
handler.send_header('Content-Type', 'application/json; charset=utf-8')
handler.send_header('Content-Length', len(body))
handler.end_headers()
handler.wfile.write(body)
def read_body(handler):
length = int(handler.headers.get('Content-Length', 0))
return json.loads(handler.rfile.read(length))
def no_cache_html(handler, fpath):
with open(fpath, 'rb') as f: content = f.read()
handler.send_response(200)
handler.send_header('Content-Type', 'text/html; charset=utf-8')
handler.send_header('Content-Length', len(content))
handler.send_header('Cache-Control', 'no-cache, no-store, must-revalidate, max-age=0')
handler.send_header('Pragma', 'no-cache')
handler.send_header('Expires', '0')
handler.end_headers()
handler.wfile.write(content)
class Handler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=STATIC_DIR, **kwargs)
def do_GET(self):
if self.path == '/api/data':
json_response(self, 200, load_data())
elif self.path == '/api/backups':
backups = sorted(Path(BACKUP_DIR).glob('planner_*.json'), reverse=True)
items = [{'name': b.name, 'size': b.stat().st_size, 'time': b.stem.split('_',1)[1]} for b in backups[:20]]
json_response(self, 200, items)
elif self.path == '/api/sleep-buddy':
# 只返回睡眠数据,不返回其他任何数据
json_response(self, 200, load_buddy())
elif self.path == '/api/version':
# 返回 HTML 文件的修改时间戳,用于客户端自动刷新
try:
mtime = os.path.getmtime(os.path.join(STATIC_DIR, 'index.html'))
json_response(self, 200, {'v': str(int(mtime))})
except: json_response(self, 200, {'v': '0'})
elif self.path in ('/', '/index.html'):
no_cache_html(self, os.path.join(STATIC_DIR, 'index.html'))
elif self.path in ('/sleep', '/sleep-buddy', '/sleep-buddy.html'):
fpath = os.path.join(STATIC_DIR, 'sleep-buddy.html')
if os.path.exists(fpath): no_cache_html(self, fpath)
else: self.send_response(404); self.end_headers()
else:
super().do_GET()
def do_POST(self):
if self.path == '/api/login':
try:
body = read_body(self)
if body.get('hash', '') == get_pass_hash():
json_response(self, 200, {'ok': True})
else:
json_response(self, 401, {'ok': False, 'error': '密码不正确'})
except Exception as e:
json_response(self, 500, {'ok': False, 'error': str(e)})
elif self.path == '/api/change-password':
try:
body = read_body(self)
if body.get('oldHash', '') != get_pass_hash():
json_response(self, 401, {'ok': False, 'error': '当前密码不正确'})
else:
set_pass_hash(body.get('newHash', ''))
json_response(self, 200, {'ok': True})
except Exception as e:
json_response(self, 500, {'ok': False, 'error': str(e)})
elif self.path == '/api/data':
try:
data = read_body(self)
data.pop('sp_pass_hash', None)
existing = load_data()
# 服务器端数据保护:合并所有数组类型的数据,防止设备推送不完整数据导致丢失
# 对于客户端没有的 key保留服务器已有的新设备不会冲掉旧数据
if existing:
for k, v in existing.items():
if k.startswith('sp_') and k not in data:
data[k] = v # 客户端没这个 key保留服务器的
data = merge_protected(existing, data)
save_data(data)
do_backup()
json_response(self, 200, {'ok': True})
except Exception as e:
json_response(self, 500, {'ok': False, 'error': str(e)})
# ===== Sleep Buddy API完全独立不接触主数据 =====
elif self.path == '/api/buddy-register':
try:
body = read_body(self)
username = body.get('username', '').strip()
pw_hash = body.get('hash', '')
if not username or not pw_hash:
json_response(self, 400, {'ok': False, 'error': '请输入用户名和密码'})
else:
buddy = load_buddy()
accounts = buddy.get('accounts', {})
if username in accounts:
json_response(self, 400, {'ok': False, 'error': '用户名已存在'})
else:
accounts[username] = {'hash': pw_hash}
buddy['accounts'] = accounts
if username not in buddy.get('users', {}):
buddy.setdefault('users', {})[username] = []
save_buddy(buddy)
json_response(self, 200, {'ok': True})
except Exception as e:
json_response(self, 500, {'ok': False, 'error': str(e)})
elif self.path == '/api/buddy-delete-user':
# 只允许通过 Planner 密码删除(管理员操作)
try:
body = read_body(self)
admin_hash = body.get('adminHash', '')
username = body.get('username', '').strip()
if admin_hash != get_pass_hash():
json_response(self, 401, {'ok': False, 'error': '需要管理员密码'})
else:
buddy = load_buddy()
buddy.get('accounts', {}).pop(username, None)
buddy.get('users', {}).pop(username, None)
save_buddy(buddy)
json_response(self, 200, {'ok': True})
except Exception as e:
json_response(self, 500, {'ok': False, 'error': str(e)})
elif self.path == '/api/buddy-login':
try:
body = read_body(self)
username = body.get('username', '').strip()
pw_hash = body.get('hash', '')
buddy = load_buddy()
accounts = buddy.get('accounts', {})
if username in accounts and accounts[username].get('hash') == pw_hash:
json_response(self, 200, {'ok': True, 'username': username})
else:
json_response(self, 401, {'ok': False, 'error': '用户名或密码不正确'})
except Exception as e:
json_response(self, 500, {'ok': False, 'error': str(e)})
elif self.path == '/api/sleep-buddy':
try:
body = read_body(self)
buddy = load_buddy()
user = body.get('user', '')
action = body.get('action', '')
if action == 'record':
# 记录睡眠
if user not in buddy['users']: buddy['users'][user] = []
record = body.get('record', {})
# 去重
buddy['users'][user] = [r for r in buddy['users'][user] if r.get('date') != record.get('date')]
buddy['users'][user].append(record)
buddy['users'][user].sort(key=lambda x: x.get('date',''), reverse=True)
# 只保留最近60天
buddy['users'][user] = buddy['users'][user][:60]
save_buddy(buddy)
json_response(self, 200, {'ok': True})
elif action == 'delete-record':
date = body.get('date', '')
if user in buddy.get('users', {}):
buddy['users'][user] = [r for r in buddy['users'][user] if r.get('date') != date]
save_buddy(buddy)
json_response(self, 200, {'ok': True})
elif action == 'sleep-now':
# 标记去睡觉,通知其他人
buddy['notifications'].append({
'from': user,
'time': datetime.now().strftime('%H:%M'),
'date': datetime.now().strftime('%Y-%m-%d'),
'msg': f'{user} 去睡觉啦,你也早点休息!',
'ts': time.time(),
})
# 只保留最近20条通知
buddy['notifications'] = buddy['notifications'][-20:]
save_buddy(buddy)
json_response(self, 200, {'ok': True})
elif action == 'get-notifications':
# 获取未读通知不是自己发的最近24小时内的
cutoff = time.time() - 86400
notifs = [n for n in buddy.get('notifications', [])
if n.get('from') != user and n.get('ts', 0) > cutoff]
json_response(self, 200, {'notifications': notifs})
elif action == 'set-target':
target = body.get('target', '22:00')
buddy.setdefault('targets', {})[user] = target
save_buddy(buddy)
json_response(self, 200, {'ok': True})
else:
json_response(self, 200, buddy)
except Exception as e:
json_response(self, 500, {'ok': False, 'error': str(e)})
else:
self.send_response(404)
self.end_headers()
def log_message(self, format, *args):
if '/api/' in (args[0] if args else ''):
print(f"[{datetime.now():%H:%M:%S}] {args[0]}")
if __name__ == '__main__':
port = int(os.environ.get('PORT', '8080'))
server = HTTPServer(('0.0.0.0', port), Handler)
print(f"Planner server running on port {port}")
server.serve_forever()