#!/usr/bin/env python3 """Hera Planner - 数据持久化服务器""" import json, os, shutil, hashlib, time from datetime import datetime from http.server import HTTPServer, SimpleHTTPRequestHandler from pathlib import Path DATA_DIR = os.environ.get('DATA_DIR', '/data') STATIC_DIR = os.environ.get('STATIC_DIR', '/app/static') BACKUP_DIR = os.path.join(DATA_DIR, 'backups') PASS_FILE = os.path.join(DATA_DIR, 'password.txt') BUDDY_FILE = os.path.join(DATA_DIR, 'sleep_buddy.json') BUDDY_PASS_FILE = os.path.join(DATA_DIR, 'buddy_password.txt') DEFAULT_HASH = '8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92' Path(DATA_DIR).mkdir(parents=True, exist_ok=True) Path(BACKUP_DIR).mkdir(parents=True, exist_ok=True) DATA_FILE = os.path.join(DATA_DIR, 'planner_data.json') def get_pass_hash(): if os.path.exists(PASS_FILE): return open(PASS_FILE).read().strip() return DEFAULT_HASH def set_pass_hash(h): with open(PASS_FILE, 'w') as f: f.write(h) def get_buddy_pass(): if os.path.exists(BUDDY_PASS_FILE): return open(BUDDY_PASS_FILE).read().strip() return DEFAULT_HASH def set_buddy_pass(h): with open(BUDDY_PASS_FILE, 'w') as f: f.write(h) def load_data(): if os.path.exists(DATA_FILE): with open(DATA_FILE, 'r', encoding='utf-8') as f: return json.load(f) return {} def save_data(data): tmp = DATA_FILE + '.tmp' with open(tmp, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False) os.replace(tmp, DATA_FILE) def load_buddy(): if os.path.exists(BUDDY_FILE): with open(BUDDY_FILE, 'r', encoding='utf-8') as f: return json.load(f) return {'users': {}, 'notifications': []} def save_buddy(data): tmp = BUDDY_FILE + '.tmp' with open(tmp, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False) os.replace(tmp, BUDDY_FILE) def merge_protected(existing, incoming): """服务器端数据保护:所有重要数据都做合并,防止多设备覆盖丢失""" # 按 id 合并:sp_notes, sp_bugs, sp_goals, sp_checklists, sp_custom_reminders # 按日期合并:sp_sleep # 深度合并:sp_health_checks, sp_music_checks, sp_health_plans, sp_music_plans # 嵌套合并:sp_todos (每个象限按 id),sp_docs (按 doc id + entries) # 不合并(last-write-wins):sp_inbox, sp_reviews, sp_modules, sp_schedule 等简单配置 ARRAY_KEYS_BY_ID = ['sp_notes', 'sp_bugs', 'sp_goals', 'sp_checklists', 'sp_custom_reminders', 'sp_gym', 'sp_period'] # 按 id 合并 ARRAY_KEYS_BY_DATE = ['sp_sleep'] # 睡眠按日期合并 OBJECT_MERGE_KEYS = ['sp_health_checks', 'sp_music_checks', 'sp_health_plans', 'sp_music_plans'] # 打卡数据深度合并 # sp_todos 是 {q1:[],q2:[],q3:[],q4:[]},需要按 id 合并每个象限 NESTED_ARRAY_KEYS = ['sp_todos'] # 对象中的数组按 id 合并 for key in ARRAY_KEYS_BY_ID: if key in existing and key in incoming: try: server = json.loads(existing[key]) if isinstance(existing[key], str) else existing[key] client = json.loads(incoming[key]) if isinstance(incoming[key], str) else incoming[key] if isinstance(server, list) and isinstance(client, list): # 检查删除标记 deleted_key = key + '_deleted' deleted_ids = set() if deleted_key in incoming: try: deleted_ids = set(json.loads(incoming[deleted_key])) except: pass # 按 id 合并,客户端优先,排除已删除的 merged = {r.get('id', i): r for i, r in enumerate(server) if r.get('id') not in deleted_ids} for i, r in enumerate(client): merged[r.get('id', f'c_{i}')] = r incoming[key] = json.dumps(list(merged.values())) except: pass for key in ARRAY_KEYS_BY_DATE: if key in existing and key in incoming: try: server = json.loads(existing[key]) if isinstance(existing[key], str) else existing[key] client = json.loads(incoming[key]) if isinstance(incoming[key], str) else incoming[key] if isinstance(server, list) and isinstance(client, list): # 检查是否有删除标记 deleted_key = key + '_deleted' deleted_dates = set() if deleted_key in incoming: try: deleted_dates = set(json.loads(incoming[deleted_key])) except: pass merged = {r['date']: r for r in server if r['date'] not in deleted_dates} for r in client: merged[r['date']] = r incoming[key] = json.dumps(sorted(merged.values(), key=lambda x: x.get('date',''), reverse=True)) except: pass for key in OBJECT_MERGE_KEYS: if key in existing and key in incoming: try: server = json.loads(existing[key]) if isinstance(existing[key], str) else existing[key] client = json.loads(incoming[key]) if isinstance(incoming[key], str) else incoming[key] if isinstance(server, dict) and isinstance(client, dict): # 深度合并:对于数组值按 id 合并,对于对象值合并 key merged = dict(server) for k, v in client.items(): if isinstance(v, list) and isinstance(merged.get(k), list): m = {r.get('id', i): r for i, r in enumerate(merged[k])} for i, r in enumerate(v): m[r.get('id', f'c_{i}')] = r merged[k] = list(m.values()) elif isinstance(v, dict) and isinstance(merged.get(k), dict): merged[k] = {**merged[k], **v} else: merged[k] = v incoming[key] = json.dumps(merged) except: pass # sp_todos: {q1:[...], q2:[...], ...} — 每个象限的数组按 id 合并 for key in NESTED_ARRAY_KEYS: if key in existing and key in incoming: try: server = json.loads(existing[key]) if isinstance(existing[key], str) else existing[key] client = json.loads(incoming[key]) if isinstance(incoming[key], str) else incoming[key] if isinstance(server, dict) and isinstance(client, dict): merged = {} all_keys = set(list(server.keys()) + list(client.keys())) for k in all_keys: sv = server.get(k, []) cv = client.get(k, []) if isinstance(sv, list) and isinstance(cv, list): m = {r.get('id', i): r for i, r in enumerate(sv)} for i, r in enumerate(cv): m[r.get('id', f'c_{i}')] = r merged[k] = list(m.values()) else: merged[k] = cv if cv else sv incoming[key] = json.dumps(merged) except: pass # sp_docs: 按 doc id 合并,每个 doc 的 entries 也按内容合并 if 'sp_docs' in existing and 'sp_docs' in incoming: try: server_docs = json.loads(existing['sp_docs']) if isinstance(existing['sp_docs'], str) else existing['sp_docs'] client_docs = json.loads(incoming['sp_docs']) if isinstance(incoming['sp_docs'], str) else incoming['sp_docs'] if isinstance(server_docs, list) and isinstance(client_docs, list): merged = {d.get('id', d.get('name', i)): d for i, d in enumerate(server_docs)} for i, d in enumerate(client_docs): did = d.get('id', d.get('name', f'c_{i}')) if did in merged: # 合并 entries:客户端优先,但保留服务器独有的 se = merged[did].get('entries', []) ce = d.get('entries', []) entry_map = {} for e in se: eid = e.get('noteId', '') or e.get('text', '') entry_map[eid] = e for e in ce: eid = e.get('noteId', '') or e.get('text', '') entry_map[eid] = e d['entries'] = list(entry_map.values()) merged[did] = d incoming['sp_docs'] = json.dumps(list(merged.values())) except: pass return incoming def do_backup(): if not os.path.exists(DATA_FILE): return ts = datetime.now().strftime('%Y%m%d_%H%M%S') shutil.copy2(DATA_FILE, os.path.join(BACKUP_DIR, f'planner_{ts}.json')) for old in sorted(Path(BACKUP_DIR).glob('planner_*.json'))[:-30]: old.unlink() def json_response(handler, code, data): body = json.dumps(data, ensure_ascii=False).encode('utf-8') handler.send_response(code) handler.send_header('Content-Type', 'application/json; charset=utf-8') handler.send_header('Content-Length', len(body)) handler.end_headers() handler.wfile.write(body) def read_body(handler): length = int(handler.headers.get('Content-Length', 0)) return json.loads(handler.rfile.read(length)) def no_cache_html(handler, fpath): with open(fpath, 'rb') as f: content = f.read() handler.send_response(200) handler.send_header('Content-Type', 'text/html; charset=utf-8') handler.send_header('Content-Length', len(content)) handler.send_header('Cache-Control', 'no-cache, no-store, must-revalidate, max-age=0') handler.send_header('Pragma', 'no-cache') handler.send_header('Expires', '0') handler.end_headers() handler.wfile.write(content) class Handler(SimpleHTTPRequestHandler): def __init__(self, *args, **kwargs): super().__init__(*args, directory=STATIC_DIR, **kwargs) def do_GET(self): if self.path == '/api/data': json_response(self, 200, load_data()) elif self.path == '/api/backups': backups = sorted(Path(BACKUP_DIR).glob('planner_*.json'), reverse=True) items = [{'name': b.name, 'size': b.stat().st_size, 'time': b.stem.split('_',1)[1]} for b in backups[:20]] json_response(self, 200, items) elif self.path == '/api/sleep-buddy': # 只返回睡眠数据,不返回其他任何数据 json_response(self, 200, load_buddy()) elif self.path == '/api/version': # 返回 HTML 文件的修改时间戳,用于客户端自动刷新 try: mtime = os.path.getmtime(os.path.join(STATIC_DIR, 'index.html')) json_response(self, 200, {'v': str(int(mtime))}) except: json_response(self, 200, {'v': '0'}) elif self.path in ('/', '/index.html'): no_cache_html(self, os.path.join(STATIC_DIR, 'index.html')) elif self.path in ('/sleep', '/sleep-buddy', '/sleep-buddy.html'): fpath = os.path.join(STATIC_DIR, 'sleep-buddy.html') if os.path.exists(fpath): no_cache_html(self, fpath) else: self.send_response(404); self.end_headers() else: super().do_GET() def do_POST(self): if self.path == '/api/login': try: body = read_body(self) if body.get('hash', '') == get_pass_hash(): json_response(self, 200, {'ok': True}) else: json_response(self, 401, {'ok': False, 'error': '密码不正确'}) except Exception as e: json_response(self, 500, {'ok': False, 'error': str(e)}) elif self.path == '/api/change-password': try: body = read_body(self) if body.get('oldHash', '') != get_pass_hash(): json_response(self, 401, {'ok': False, 'error': '当前密码不正确'}) else: set_pass_hash(body.get('newHash', '')) json_response(self, 200, {'ok': True}) except Exception as e: json_response(self, 500, {'ok': False, 'error': str(e)}) elif self.path == '/api/data': try: data = read_body(self) data.pop('sp_pass_hash', None) existing = load_data() # 服务器端数据保护:合并所有数组类型的数据,防止设备推送不完整数据导致丢失 # 对于客户端没有的 key,保留服务器已有的(新设备不会冲掉旧数据) if existing: for k, v in existing.items(): if k.startswith('sp_') and k not in data: data[k] = v # 客户端没这个 key,保留服务器的 data = merge_protected(existing, data) save_data(data) do_backup() json_response(self, 200, {'ok': True}) except Exception as e: json_response(self, 500, {'ok': False, 'error': str(e)}) # ===== Sleep Buddy API(完全独立,不接触主数据) ===== elif self.path == '/api/buddy-register': try: body = read_body(self) username = body.get('username', '').strip() pw_hash = body.get('hash', '') if not username or not pw_hash: json_response(self, 400, {'ok': False, 'error': '请输入用户名和密码'}) else: buddy = load_buddy() accounts = buddy.get('accounts', {}) if username in accounts: json_response(self, 400, {'ok': False, 'error': '用户名已存在'}) else: accounts[username] = {'hash': pw_hash} buddy['accounts'] = accounts if username not in buddy.get('users', {}): buddy.setdefault('users', {})[username] = [] save_buddy(buddy) json_response(self, 200, {'ok': True}) except Exception as e: json_response(self, 500, {'ok': False, 'error': str(e)}) elif self.path == '/api/buddy-delete-user': # 只允许通过 Planner 密码删除(管理员操作) try: body = read_body(self) admin_hash = body.get('adminHash', '') username = body.get('username', '').strip() if admin_hash != get_pass_hash(): json_response(self, 401, {'ok': False, 'error': '需要管理员密码'}) else: buddy = load_buddy() buddy.get('accounts', {}).pop(username, None) buddy.get('users', {}).pop(username, None) save_buddy(buddy) json_response(self, 200, {'ok': True}) except Exception as e: json_response(self, 500, {'ok': False, 'error': str(e)}) elif self.path == '/api/buddy-login': try: body = read_body(self) username = body.get('username', '').strip() pw_hash = body.get('hash', '') buddy = load_buddy() accounts = buddy.get('accounts', {}) if username in accounts and accounts[username].get('hash') == pw_hash: json_response(self, 200, {'ok': True, 'username': username}) else: json_response(self, 401, {'ok': False, 'error': '用户名或密码不正确'}) except Exception as e: json_response(self, 500, {'ok': False, 'error': str(e)}) elif self.path == '/api/sleep-buddy': try: body = read_body(self) buddy = load_buddy() user = body.get('user', '') action = body.get('action', '') if action == 'record': # 记录睡眠 if user not in buddy['users']: buddy['users'][user] = [] record = body.get('record', {}) # 去重 buddy['users'][user] = [r for r in buddy['users'][user] if r.get('date') != record.get('date')] buddy['users'][user].append(record) buddy['users'][user].sort(key=lambda x: x.get('date',''), reverse=True) # 只保留最近60天 buddy['users'][user] = buddy['users'][user][:60] save_buddy(buddy) json_response(self, 200, {'ok': True}) elif action == 'delete-record': date = body.get('date', '') if user in buddy.get('users', {}): buddy['users'][user] = [r for r in buddy['users'][user] if r.get('date') != date] save_buddy(buddy) json_response(self, 200, {'ok': True}) elif action == 'sleep-now': # 标记去睡觉,通知其他人 buddy['notifications'].append({ 'from': user, 'time': datetime.now().strftime('%H:%M'), 'date': datetime.now().strftime('%Y-%m-%d'), 'msg': f'{user} 去睡觉啦,你也早点休息!', 'ts': time.time(), }) # 只保留最近20条通知 buddy['notifications'] = buddy['notifications'][-20:] save_buddy(buddy) json_response(self, 200, {'ok': True}) elif action == 'get-notifications': # 获取未读通知(不是自己发的,最近24小时内的) cutoff = time.time() - 86400 notifs = [n for n in buddy.get('notifications', []) if n.get('from') != user and n.get('ts', 0) > cutoff] json_response(self, 200, {'notifications': notifs}) elif action == 'set-target': target = body.get('target', '22:00') buddy.setdefault('targets', {})[user] = target save_buddy(buddy) json_response(self, 200, {'ok': True}) else: json_response(self, 200, buddy) except Exception as e: json_response(self, 500, {'ok': False, 'error': str(e)}) else: self.send_response(404) self.end_headers() def log_message(self, format, *args): if '/api/' in (args[0] if args else ''): print(f"[{datetime.now():%H:%M:%S}] {args[0]}") if __name__ == '__main__': port = int(os.environ.get('PORT', '8080')) server = HTTPServer(('0.0.0.0', port), Handler) print(f"Planner server running on port {port}") server.serve_forever()