You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

219 lines
8.5 KiB
Python

import socketio
import redis
import json
import asyncio
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
class SocketManager:
    """Socket.IO server wrapper for real-time game and leaderboard events.

    Owns the ``socketio.AsyncServer`` instance (``self.sio``), a synchronous
    Redis client used only as a leaderboard cache (``self.redis_client``) and
    an in-process registry of connected clients (``self.active_connections``).

    Rooms used:
        * ``game_<session_id>`` - one per game session.
        * ``leaderboard`` - subscribers to leaderboard broadcasts.
    """

    LEADERBOARD_ROOM = 'leaderboard'
    LEADERBOARD_CACHE_KEY = "leaderboard"
    LEADERBOARD_CACHE_TTL_SECONDS = 300  # 5 minutes

    def __init__(self, redis_url: str = "redis://redis:6379"):
        """Create the Redis client and the Socket.IO server, then register handlers.

        Args:
            redis_url: URL passed to ``redis.Redis.from_url``.
        """
        # Redis is used here only for the leaderboard cache. No client_manager
        # is passed to AsyncServer below, so room membership is tracked by the
        # server's default manager rather than through Redis.
        self.redis_client = redis.Redis.from_url(redis_url, decode_responses=True)

        # NOTE(review): cors_allowed_origins="*" accepts any origin and both
        # loggers are enabled - confirm this is intended outside development.
        self.sio = socketio.AsyncServer(
            cors_allowed_origins="*",
            async_mode="asgi",
            logger=True,
            engineio_logger=True
        )

        # sid -> {"connected_at", "player_id", "session_id", "room"[, "player_name"]}
        self.active_connections: Dict[str, Dict[str, Any]] = {}

        self._register_events()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _leaderboard_payload(leaderboard_data):
        """Build the payload used for every ``leaderboard_update`` emit."""
        return {
            'leaderboard': leaderboard_data,
            'timestamp': datetime.utcnow().isoformat()
        }

    @staticmethod
    def _format_time_remaining(time_remaining: int) -> str:
        """Return the human-readable warning text for ``time_remaining`` seconds.

        Whole minutes are reported from 60 seconds upwards (rounded down, as
        before); below that the value is reported in seconds so the message is
        never "0 minutes remaining!". Negative values are clamped to zero.
        """
        seconds = max(0, int(time_remaining))
        if seconds >= 60:
            minutes = seconds // 60
            unit = 'minute' if minutes == 1 else 'minutes'
            return f'{minutes} {unit} remaining!'
        unit = 'second' if seconds == 1 else 'seconds'
        return f'{seconds} {unit} remaining!'

    async def _run_blocking(self, func, *args):
        """Run a blocking callable in the default executor and await its result.

        ``self.redis_client`` is a synchronous client; calling it directly from
        a coroutine would stall the event loop for the duration of the network
        round trip.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    async def _leave_current_room(self, sid, connection):
        """Remove ``sid`` from its current game room and tell the room.

        Does nothing when the connection is not in a room. After leaving,
        ``session_id`` and ``room`` are reset to ``None``; ``player_name`` is
        kept on the connection record.
        """
        room = connection.get('room')
        if not room:
            return
        await self.sio.leave_room(sid, room)
        # Emitted after leaving, so the departing client is not a recipient.
        await self.sio.emit('player_left', {
            'player_name': connection.get('player_name')
        }, room=room)
        connection.update({
            'session_id': None,
            'room': None
        })

    # ------------------------------------------------------------------
    # Event handlers
    # ------------------------------------------------------------------

    def _register_events(self):
        """Register all Socket.IO event handlers on ``self.sio``.

        Each nested function is registered through ``@self.sio.event``, so the
        function names below are the event names and must not be renamed.
        Payload arguments default to ``None`` so that an event sent without a
        payload does not fail on a missing argument.
        """

        @self.sio.event
        async def connect(sid, environ, auth=None):
            """Record the new connection and acknowledge it to the client."""
            print(f"Client {sid} connected")
            self.active_connections[sid] = {
                "connected_at": datetime.utcnow(),
                "player_id": None,
                "session_id": None,
                "room": None
            }
            await self.sio.emit('connection_status', {'status': 'connected'}, room=sid)

        @self.sio.event
        async def disconnect(sid):
            """Leave the client's game room (if any) and drop its record."""
            print(f"Client {sid} disconnected")
            connection = self.active_connections.get(sid)
            if connection is None:
                return
            if connection.get('room'):
                await self.sio.leave_room(sid, connection['room'])
            del self.active_connections[sid]

        @self.sio.event
        async def join_game(sid, data=None):
            """Join a game session room for real-time updates.

            Expects ``data`` to be a dict with ``session_id`` (required) and
            ``player_name`` (optional). Emits ``error`` to the client when the
            session id is missing, otherwise ``player_joined`` to the rest of
            the room and ``game_joined`` to the client.
            """
            try:
                payload = data if isinstance(data, dict) else {}
                session_id = payload.get('session_id')
                player_name = payload.get('player_name')
                if not session_id:
                    await self.sio.emit('error', {'message': 'Session ID required'}, room=sid)
                    return

                room = f"game_{session_id}"
                connection = self.active_connections.get(sid)

                # Switching sessions: leave the previous room first, otherwise
                # the client keeps receiving its events and leave_game can no
                # longer reach it (only the latest room is remembered).
                if connection is not None:
                    previous_room = connection.get('room')
                    if previous_room and previous_room != room:
                        await self._leave_current_room(sid, connection)

                await self.sio.enter_room(sid, room)

                if connection is not None:
                    connection.update({
                        'session_id': session_id,
                        'room': room,
                        'player_name': player_name
                    })

                # Notify others in the room
                await self.sio.emit('player_joined', {
                    'player_name': player_name,
                    'session_id': session_id
                }, room=room, skip_sid=sid)
                await self.sio.emit('game_joined', {
                    'room': room,
                    'session_id': session_id
                }, room=sid)
            except Exception as e:
                # Handler boundary: report the failure to the client.
                await self.sio.emit('error', {'message': str(e)}, room=sid)

        @self.sio.event
        async def leave_game(sid, data=None):
            """Leave the current game session room and notify the room."""
            try:
                connection = self.active_connections.get(sid)
                if connection is not None:
                    await self._leave_current_room(sid, connection)
            except Exception as e:
                await self.sio.emit('error', {'message': str(e)}, room=sid)

        @self.sio.event
        async def subscribe_leaderboard(sid, data=None):
            """Subscribe to real-time leaderboard updates.

            Adds the client to the leaderboard room, confirms with
            ``leaderboard_subscribed`` and sends the currently cached
            leaderboard as a ``leaderboard_update`` in the same shape that
            ``update_leaderboard`` broadcasts.
            """
            try:
                await self.sio.enter_room(sid, self.LEADERBOARD_ROOM)
                await self.sio.emit('leaderboard_subscribed', {}, room=sid)
                leaderboard = await self.get_current_leaderboard()
                await self.sio.emit(
                    'leaderboard_update',
                    self._leaderboard_payload(leaderboard),
                    room=sid
                )
            except Exception as e:
                await self.sio.emit('error', {'message': str(e)}, room=sid)

    # ------------------------------------------------------------------
    # Server-initiated notifications
    # ------------------------------------------------------------------

    async def notify_answer_submitted(self, session_id: int, player_name: str,
                                      is_correct: bool, points_earned: int, total_score: int):
        """Emit ``answer_result`` to the game room of ``session_id``."""
        room = f"game_{session_id}"
        await self.sio.emit('answer_result', {
            'player_name': player_name,
            'is_correct': is_correct,
            'points_earned': points_earned,
            'total_score': total_score,
            'timestamp': datetime.utcnow().isoformat()
        }, room=room)

    async def notify_question_changed(self, session_id: int, question_data: dict):
        """Emit ``new_question`` with ``question_data`` to the game room."""
        room = f"game_{session_id}"
        await self.sio.emit('new_question', {
            'question': question_data,
            'timestamp': datetime.utcnow().isoformat()
        }, room=room)

    async def notify_game_ended(self, session_id: int, final_score: int,
                                questions_answered: int):
        """Emit ``game_ended`` with the final totals to the game room."""
        room = f"game_{session_id}"
        await self.sio.emit('game_ended', {
            'final_score': final_score,
            'questions_answered': questions_answered,
            'timestamp': datetime.utcnow().isoformat()
        }, room=room)

    async def update_leaderboard(self, leaderboard_data: List[dict]):
        """Broadcast ``leaderboard_update`` to everyone in the leaderboard room."""
        await self.sio.emit(
            'leaderboard_update',
            self._leaderboard_payload(leaderboard_data),
            room=self.LEADERBOARD_ROOM
        )

    async def notify_time_warning(self, session_id: int, time_remaining: int):
        """Emit ``time_warning`` to the game room.

        Args:
            session_id: Game session whose room receives the warning.
            time_remaining: Remaining time in seconds; sent as-is in
                ``time_remaining`` and rendered into ``message``.
        """
        room = f"game_{session_id}"
        await self.sio.emit('time_warning', {
            'time_remaining': time_remaining,
            'message': self._format_time_remaining(time_remaining),
            'timestamp': datetime.utcnow().isoformat()
        }, room=room)

    # ------------------------------------------------------------------
    # Leaderboard cache
    # ------------------------------------------------------------------

    async def get_current_leaderboard(self):
        """Return the leaderboard cached in Redis, or ``[]`` when unavailable.

        Best-effort: a missing key, or any error while reading or decoding the
        cache, is logged and yields an empty list rather than an exception.
        """
        try:
            cached_leaderboard = await self._run_blocking(
                self.redis_client.get, self.LEADERBOARD_CACHE_KEY
            )
            if cached_leaderboard:
                return json.loads(cached_leaderboard)
            # Fallback to empty leaderboard
            return []
        except Exception as e:
            print(f"Error getting leaderboard: {e}")
            return []

    async def cache_leaderboard(self, leaderboard_data: List[dict]):
        """Store ``leaderboard_data`` as JSON in Redis with a 5 minute TTL.

        Best-effort: failures are logged and not raised to the caller.
        """
        try:
            serialized = json.dumps(leaderboard_data)
            await self._run_blocking(
                self.redis_client.setex,
                self.LEADERBOARD_CACHE_KEY,
                self.LEADERBOARD_CACHE_TTL_SECONDS,
                serialized
            )
        except Exception as e:
            print(f"Error caching leaderboard: {e}")

    # ------------------------------------------------------------------
    # Introspection / broadcast
    # ------------------------------------------------------------------

    def get_connection_stats(self):
        """Return counts describing the currently connected clients.

        Returns:
            dict with ``total_connections``, ``active_games`` (number of
            connections that currently have a ``session_id``; connections in
            the same session are counted individually) and
            ``leaderboard_subscribers``.
        """
        in_game = sum(
            1 for connection in self.active_connections.values()
            if connection.get('session_id')
        )
        # get_participants() is documented as returning an iterable, which has
        # no len(); count by iterating instead.
        subscribers = sum(
            1 for _ in self.sio.manager.get_participants('/', self.LEADERBOARD_ROOM)
        )
        return {
            'total_connections': len(self.active_connections),
            'active_games': in_game,
            'leaderboard_subscribers': subscribers
        }

    async def broadcast_system_message(self, message: str, message_type: str = 'info'):
        """Emit ``system_message`` to all connected clients (no room filter)."""
        await self.sio.emit('system_message', {
            'message': message,
            'type': message_type,
            'timestamp': datetime.utcnow().isoformat()
        })
# Global socket manager instance, constructed at import time with the
# default redis_url ("redis://redis:6379").
socket_manager = SocketManager()