import socketio import redis import json import asyncio from typing import Dict, List, Any, Optional from datetime import datetime, timedelta class SocketManager: def __init__(self, redis_url: str = "redis://redis:6379"): # Create Socket.IO server with Redis adapter for scaling self.redis_client = redis.Redis.from_url(redis_url, decode_responses=True) # Initialize Socket.IO with CORS self.sio = socketio.AsyncServer( cors_allowed_origins="*", async_mode="asgi", logger=True, engineio_logger=True ) # Track active connections and game sessions self.active_connections: Dict[str, Dict[str, Any]] = {} # Register event handlers self._register_events() def _register_events(self): """Register all Socket.IO event handlers""" @self.sio.event async def connect(sid, environ, auth): print(f"Client {sid} connected") self.active_connections[sid] = { "connected_at": datetime.utcnow(), "player_id": None, "session_id": None, "room": None } await self.sio.emit('connection_status', {'status': 'connected'}, room=sid) @self.sio.event async def disconnect(sid): print(f"Client {sid} disconnected") if sid in self.active_connections: # Leave any rooms connection = self.active_connections[sid] if connection.get('room'): await self.sio.leave_room(sid, connection['room']) # Clean up connection del self.active_connections[sid] @self.sio.event async def join_game(sid, data): """Join a game session room for real-time updates""" try: session_id = data.get('session_id') player_name = data.get('player_name') if not session_id: await self.sio.emit('error', {'message': 'Session ID required'}, room=sid) return # Join the game room room = f"game_{session_id}" await self.sio.enter_room(sid, room) # Update connection info if sid in self.active_connections: self.active_connections[sid].update({ 'session_id': session_id, 'room': room, 'player_name': player_name }) # Notify others in the room await self.sio.emit('player_joined', { 'player_name': player_name, 'session_id': session_id }, room=room, skip_sid=sid) await self.sio.emit('game_joined', { 'room': room, 'session_id': session_id }, room=sid) except Exception as e: await self.sio.emit('error', {'message': str(e)}, room=sid) @self.sio.event async def leave_game(sid, data): """Leave a game session room""" try: if sid in self.active_connections: connection = self.active_connections[sid] room = connection.get('room') player_name = connection.get('player_name') if room: await self.sio.leave_room(sid, room) # Notify others await self.sio.emit('player_left', { 'player_name': player_name }, room=room) # Update connection connection.update({ 'session_id': None, 'room': None }) except Exception as e: await self.sio.emit('error', {'message': str(e)}, room=sid) @self.sio.event async def subscribe_leaderboard(sid): """Subscribe to real-time leaderboard updates""" try: await self.sio.enter_room(sid, 'leaderboard') await self.sio.emit('leaderboard_subscribed', {}, room=sid) # Send current leaderboard leaderboard = await self.get_current_leaderboard() await self.sio.emit('leaderboard_update', leaderboard, room=sid) except Exception as e: await self.sio.emit('error', {'message': str(e)}, room=sid) async def notify_answer_submitted(self, session_id: int, player_name: str, is_correct: bool, points_earned: int, total_score: int): """Notify game room about answer submission""" room = f"game_{session_id}" await self.sio.emit('answer_result', { 'player_name': player_name, 'is_correct': is_correct, 'points_earned': points_earned, 'total_score': total_score, 'timestamp': datetime.utcnow().isoformat() }, room=room) async def notify_question_changed(self, session_id: int, question_data: dict): """Notify game room about new question""" room = f"game_{session_id}" await self.sio.emit('new_question', { 'question': question_data, 'timestamp': datetime.utcnow().isoformat() }, room=room) async def notify_game_ended(self, session_id: int, final_score: int, questions_answered: int): """Notify game room about game end""" room = f"game_{session_id}" await self.sio.emit('game_ended', { 'final_score': final_score, 'questions_answered': questions_answered, 'timestamp': datetime.utcnow().isoformat() }, room=room) async def update_leaderboard(self, leaderboard_data: List[dict]): """Broadcast leaderboard update to all subscribers""" await self.sio.emit('leaderboard_update', { 'leaderboard': leaderboard_data, 'timestamp': datetime.utcnow().isoformat() }, room='leaderboard') async def notify_time_warning(self, session_id: int, time_remaining: int): """Notify about time warnings""" room = f"game_{session_id}" await self.sio.emit('time_warning', { 'time_remaining': time_remaining, 'message': f'⏰ {time_remaining // 60} minutes remaining!', 'timestamp': datetime.utcnow().isoformat() }, room=room) async def get_current_leaderboard(self): """Get current leaderboard from Redis cache or database""" try: # Try to get from Redis cache first cached_leaderboard = self.redis_client.get("leaderboard") if cached_leaderboard: return json.loads(cached_leaderboard) # Fallback to empty leaderboard return [] except Exception as e: print(f"Error getting leaderboard: {e}") return [] async def cache_leaderboard(self, leaderboard_data: List[dict]): """Cache leaderboard in Redis""" try: self.redis_client.setex( "leaderboard", 300, # 5 minutes TTL json.dumps(leaderboard_data) ) except Exception as e: print(f"Error caching leaderboard: {e}") def get_connection_stats(self): """Get statistics about active connections""" return { 'total_connections': len(self.active_connections), 'active_games': len([c for c in self.active_connections.values() if c.get('session_id')]), 'leaderboard_subscribers': len(self.sio.manager.get_participants('/', 'leaderboard')) } async def broadcast_system_message(self, message: str, message_type: str = 'info'): """Broadcast system message to all connected clients""" await self.sio.emit('system_message', { 'message': message, 'type': message_type, 'timestamp': datetime.utcnow().isoformat() }) # Global socket manager instance socket_manager = SocketManager()