""" 轻量级服务端 Session 存储。 以客户端 IP + User-Agent 的哈希值作为 session key, 无需 cookie,无需前端改动。 每次 getScenes 写入,StartVoiceChat/StopVoiceChat 读取。 """ import hashlib from fastapi import Request # { session_key: { scene_id: { RoomId, UserId, TaskId } } } _store: dict[str, dict[str, dict]] = {} # { room_id: [{ role, content }] } _history_store: dict[str, list[dict]] = {} def _key(request: Request) -> str: forwarded = request.headers.get("X-Forwarded-For") ip = ( forwarded.split(",")[0].strip() if forwarded else (request.client.host if request.client else "unknown") ) ua = request.headers.get("User-Agent", "") return hashlib.sha256(f"{ip}:{ua}".encode()).hexdigest() def save_session(request: Request, scene_id: str, data: dict) -> None: key = _key(request) if key not in _store: _store[key] = {} _store[key][scene_id] = data def load_session(request: Request, scene_id: str) -> dict: return _store.get(_key(request), {}).get(scene_id, {}) def save_room_history(room_id: str, messages: list[dict]) -> None: _history_store[room_id] = messages def get_room_history(room_id: str) -> list[dict]: return _history_store.get(room_id, []) def clear_room_history(room_id: str) -> None: _history_store.pop(room_id, None)