package ent import ( "context" "errors" "strconv" "strings" "time" "github.com/google/uuid" "github.com/jackc/pgx/v5" domain "knowfoolery/backend/services/leaderboard-service/internal/domain/leaderboard" sharedtypes "knowfoolery/backend/shared/domain/types" ) // LeaderboardRepository implements leaderboard persistence. type LeaderboardRepository struct { client *Client } // NewLeaderboardRepository creates leaderboard repository. func NewLeaderboardRepository(client *Client) *LeaderboardRepository { return &LeaderboardRepository{client: client} } // EnsureSchema creates required tables and indexes. func (r *LeaderboardRepository) EnsureSchema(ctx context.Context) error { const ddl = ` CREATE TABLE IF NOT EXISTS leaderboard_entries ( id UUID PRIMARY KEY, session_id VARCHAR(64) NOT NULL UNIQUE, player_id VARCHAR(128) NOT NULL, player_name VARCHAR(50) NOT NULL, score INT NOT NULL, questions_asked INT NOT NULL, questions_correct INT NOT NULL, hints_used INT NOT NULL DEFAULT 0, duration_seconds INT NOT NULL, success_rate NUMERIC(5,2) NOT NULL, completion_type VARCHAR(16) NOT NULL, completed_at TIMESTAMPTZ NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_lb_entries_rank_order ON leaderboard_entries (score DESC, duration_seconds ASC, completed_at ASC); CREATE INDEX IF NOT EXISTS idx_lb_entries_player_time ON leaderboard_entries (player_id, completed_at DESC); CREATE INDEX IF NOT EXISTS idx_lb_entries_completion_time ON leaderboard_entries (completion_type, completed_at DESC); CREATE INDEX IF NOT EXISTS idx_lb_entries_created ON leaderboard_entries (created_at DESC); CREATE TABLE IF NOT EXISTS leaderboard_player_stats ( player_id VARCHAR(128) PRIMARY KEY, player_name VARCHAR(50) NOT NULL, games_played INT NOT NULL DEFAULT 0, games_completed INT NOT NULL DEFAULT 0, total_score BIGINT NOT NULL DEFAULT 0, best_score INT NOT NULL DEFAULT 0, avg_score NUMERIC(10,2) NOT NULL DEFAULT 0, avg_success_rate NUMERIC(5,2) NOT NULL DEFAULT 0, total_questions BIGINT NOT NULL DEFAULT 0, total_correct BIGINT NOT NULL DEFAULT 0, best_duration_seconds INT NULL, last_played_at TIMESTAMPTZ NULL, updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_lb_player_rank_order ON leaderboard_player_stats (best_score DESC, best_duration_seconds ASC, last_played_at ASC); CREATE INDEX IF NOT EXISTS idx_lb_player_last_played ON leaderboard_player_stats (last_played_at DESC); ` _, err := r.client.Pool.Exec(ctx, ddl) return err } // IngestEntry inserts one new entry and upserts player stats atomically. func (r *LeaderboardRepository) IngestEntry( ctx context.Context, entry *domain.LeaderboardEntry, ) (*domain.LeaderboardEntry, bool, error) { tx, err := r.client.Pool.Begin(ctx) if err != nil { return nil, false, err } defer func() { _ = tx.Rollback(ctx) }() existing, getErr := r.getEntryBySessionIDTx(ctx, tx, entry.SessionID) if getErr == nil { if commitErr := tx.Commit(ctx); commitErr != nil { return nil, false, commitErr } return existing, true, nil } if !errors.Is(getErr, pgx.ErrNoRows) { return nil, false, getErr } inserted, err := r.insertEntryTx(ctx, tx, entry) if err != nil { return nil, false, err } if err := r.upsertPlayerStatsTx(ctx, tx, inserted); err != nil { return nil, false, err } if err := tx.Commit(ctx); err != nil { return nil, false, err } return inserted, false, nil } // ListTop lists top entries for given filter. func (r *LeaderboardRepository) ListTop( ctx context.Context, filter domain.TopFilter, limit int, ) ([]*domain.LeaderboardEntry, error) { if limit <= 0 { limit = 10 } where, args := buildFilterWhere(filter) args = append(args, limit) q := ` SELECT id, session_id, player_id, player_name, score, questions_asked, questions_correct, hints_used, success_rate, duration_seconds, completion_type, completed_at, created_at FROM leaderboard_entries ` + where + ` ORDER BY score DESC, duration_seconds ASC, completed_at ASC LIMIT $` + strconvI(len(args)) rows, err := r.client.Pool.Query(ctx, q, args...) if err != nil { return nil, err } defer rows.Close() items := make([]*domain.LeaderboardEntry, 0) for rows.Next() { item, scanErr := scanEntry(rows) if scanErr != nil { return nil, scanErr } items = append(items, item) } return items, rows.Err() } // GetPlayerStats returns one player's aggregate stats. func (r *LeaderboardRepository) GetPlayerStats(ctx context.Context, playerID string) (*domain.PlayerStats, error) { const q = ` SELECT player_id, player_name, games_played, games_completed, total_score, best_score, avg_score, avg_success_rate, total_questions, total_correct, best_duration_seconds, last_played_at, updated_at FROM leaderboard_player_stats WHERE player_id=$1` row := r.client.Pool.QueryRow(ctx, q, playerID) stats, err := scanPlayerStats(row) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return nil, domain.ErrPlayerNotFound } return nil, err } return stats, nil } // GetPlayerRank computes current rank for one player. func (r *LeaderboardRepository) GetPlayerRank(ctx context.Context, playerID string) (int64, error) { const q = ` WITH ranked AS ( SELECT player_id, RANK() OVER ( ORDER BY best_score DESC, best_duration_seconds ASC NULLS LAST, last_played_at ASC NULLS LAST ) AS rank_value FROM leaderboard_player_stats ) SELECT rank_value FROM ranked WHERE player_id=$1` row := r.client.Pool.QueryRow(ctx, q, playerID) var rank int64 if err := row.Scan(&rank); err != nil { if errors.Is(err, pgx.ErrNoRows) { return 0, domain.ErrPlayerNotFound } return 0, err } return rank, nil } // ListPlayerHistory returns paginated history for one player. func (r *LeaderboardRepository) ListPlayerHistory( ctx context.Context, playerID string, pagination sharedtypes.Pagination, ) ([]*domain.LeaderboardEntry, int64, error) { pagination.Normalize() const qCount = `SELECT COUNT(1) FROM leaderboard_entries WHERE player_id=$1` var total int64 if err := r.client.Pool.QueryRow(ctx, qCount, playerID).Scan(&total); err != nil { return nil, 0, err } const q = ` SELECT id, session_id, player_id, player_name, score, questions_asked, questions_correct, hints_used, success_rate, duration_seconds, completion_type, completed_at, created_at FROM leaderboard_entries WHERE player_id=$1 ORDER BY completed_at DESC LIMIT $2 OFFSET $3` rows, err := r.client.Pool.Query(ctx, q, playerID, pagination.Limit(), pagination.Offset()) if err != nil { return nil, 0, err } defer rows.Close() items := make([]*domain.LeaderboardEntry, 0) for rows.Next() { item, scanErr := scanEntry(rows) if scanErr != nil { return nil, 0, scanErr } items = append(items, item) } return items, total, rows.Err() } // GetGlobalStats computes global leaderboard statistics. func (r *LeaderboardRepository) GetGlobalStats( ctx context.Context, filter domain.TopFilter, ) (*domain.GlobalStats, error) { where, args := buildFilterWhere(filter) q := ` SELECT COUNT(1) AS total_games, COUNT(DISTINCT player_id) AS total_players, COALESCE(AVG(score), 0) AS avg_score, COALESCE(AVG(success_rate), 0) AS avg_success_rate, COALESCE(MAX(score), 0) AS max_score, COALESCE(percentile_cont(0.5) WITHIN GROUP (ORDER BY score), 0) AS score_p50, COALESCE(percentile_cont(0.9) WITHIN GROUP (ORDER BY score), 0) AS score_p90, COALESCE(percentile_cont(0.99) WITHIN GROUP (ORDER BY score), 0) AS score_p99 FROM leaderboard_entries ` + where row := r.client.Pool.QueryRow(ctx, q, args...) stats := &domain.GlobalStats{UpdatedAt: time.Now().UTC()} if err := row.Scan( &stats.TotalGames, &stats.TotalPlayers, &stats.AvgScore, &stats.AvgSuccessRate, &stats.MaxScore, &stats.ScoreP50, &stats.ScoreP90, &stats.ScoreP99, ); err != nil { return nil, err } return stats, nil } func (r *LeaderboardRepository) getEntryBySessionIDTx( ctx context.Context, tx pgx.Tx, sessionID string, ) (*domain.LeaderboardEntry, error) { const q = ` SELECT id, session_id, player_id, player_name, score, questions_asked, questions_correct, hints_used, success_rate, duration_seconds, completion_type, completed_at, created_at FROM leaderboard_entries WHERE session_id=$1` return scanEntry(tx.QueryRow(ctx, q, sessionID)) } func (r *LeaderboardRepository) insertEntryTx( ctx context.Context, tx pgx.Tx, entry *domain.LeaderboardEntry, ) (*domain.LeaderboardEntry, error) { const q = ` INSERT INTO leaderboard_entries ( id, session_id, player_id, player_name, score, questions_asked, questions_correct, hints_used, success_rate, duration_seconds, completion_type, completed_at, created_at ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,NOW()) RETURNING id, session_id, player_id, player_name, score, questions_asked, questions_correct, hints_used, success_rate, duration_seconds, completion_type, completed_at, created_at` row := tx.QueryRow(ctx, q, uuid.NewString(), entry.SessionID, entry.PlayerID, entry.PlayerName, entry.Score, entry.QuestionsAsked, entry.QuestionsCorrect, entry.HintsUsed, entry.SuccessRate, entry.DurationSeconds, string(entry.CompletionType), entry.CompletedAt, ) return scanEntry(row) } func (r *LeaderboardRepository) upsertPlayerStatsTx( ctx context.Context, tx pgx.Tx, entry *domain.LeaderboardEntry, ) error { const q = ` INSERT INTO leaderboard_player_stats ( player_id, player_name, games_played, games_completed, total_score, best_score, avg_score, avg_success_rate, total_questions, total_correct, best_duration_seconds, last_played_at, updated_at ) VALUES ( $1, $2, 1, CASE WHEN $3='completed' THEN 1 ELSE 0 END, $4::bigint, ($4::bigint)::int, ($4::bigint)::numeric, $5, $6, $7, $8, $9, NOW() ) ON CONFLICT (player_id) DO UPDATE SET player_name = EXCLUDED.player_name, games_played = leaderboard_player_stats.games_played + 1, games_completed = leaderboard_player_stats.games_completed + CASE WHEN EXCLUDED.games_completed > 0 THEN 1 ELSE 0 END, total_score = leaderboard_player_stats.total_score + EXCLUDED.total_score, best_score = GREATEST(leaderboard_player_stats.best_score, EXCLUDED.best_score), avg_score = ( (leaderboard_player_stats.total_score + EXCLUDED.total_score)::numeric / (leaderboard_player_stats.games_played + 1)::numeric ), total_questions = leaderboard_player_stats.total_questions + EXCLUDED.total_questions, total_correct = leaderboard_player_stats.total_correct + EXCLUDED.total_correct, avg_success_rate = CASE WHEN (leaderboard_player_stats.total_questions + EXCLUDED.total_questions) = 0 THEN 0 ELSE ( (leaderboard_player_stats.total_correct + EXCLUDED.total_correct)::numeric * 100 / (leaderboard_player_stats.total_questions + EXCLUDED.total_questions)::numeric ) END, best_duration_seconds = CASE WHEN EXCLUDED.best_score > leaderboard_player_stats.best_score THEN EXCLUDED.best_duration_seconds WHEN EXCLUDED.best_score = leaderboard_player_stats.best_score THEN LEAST( COALESCE(leaderboard_player_stats.best_duration_seconds, EXCLUDED.best_duration_seconds), EXCLUDED.best_duration_seconds ) ELSE leaderboard_player_stats.best_duration_seconds END, last_played_at = GREATEST( COALESCE(leaderboard_player_stats.last_played_at, EXCLUDED.last_played_at), EXCLUDED.last_played_at ), updated_at = NOW()` _, err := tx.Exec(ctx, q, entry.PlayerID, entry.PlayerName, string(entry.CompletionType), entry.Score, entry.SuccessRate, entry.QuestionsAsked, entry.QuestionsCorrect, entry.DurationSeconds, entry.CompletedAt, ) return err } func buildFilterWhere(filter domain.TopFilter) (string, []any) { parts := make([]string, 0, 2) args := make([]any, 0, 2) if strings.TrimSpace(filter.CompletionType) != "" { args = append(args, filter.CompletionType) parts = append(parts, "completion_type=$"+strconvI(len(args))) } switch filter.Window { case domain.WindowAll: // no time filter case domain.Window24h: args = append(args, time.Now().UTC().Add(-24*time.Hour)) parts = append(parts, "completed_at>=$"+strconvI(len(args))) case domain.Window7d: args = append(args, time.Now().UTC().Add(-7*24*time.Hour)) parts = append(parts, "completed_at>=$"+strconvI(len(args))) case domain.Window30d: args = append(args, time.Now().UTC().Add(-30*24*time.Hour)) parts = append(parts, "completed_at>=$"+strconvI(len(args))) } if len(parts) == 0 { return "", args } return "WHERE " + strings.Join(parts, " AND "), args } func scanEntry(scanner interface { Scan(dest ...interface{}) error }) (*domain.LeaderboardEntry, error) { var entry domain.LeaderboardEntry var completionType string if err := scanner.Scan( &entry.ID, &entry.SessionID, &entry.PlayerID, &entry.PlayerName, &entry.Score, &entry.QuestionsAsked, &entry.QuestionsCorrect, &entry.HintsUsed, &entry.SuccessRate, &entry.DurationSeconds, &completionType, &entry.CompletedAt, &entry.CreatedAt, ); err != nil { return nil, err } entry.CompletionType = domain.CompletionType(completionType) return &entry, nil } func scanPlayerStats(scanner interface { Scan(dest ...interface{}) error }) (*domain.PlayerStats, error) { var stats domain.PlayerStats if err := scanner.Scan( &stats.PlayerID, &stats.PlayerName, &stats.GamesPlayed, &stats.GamesCompleted, &stats.TotalScore, &stats.BestScore, &stats.AvgScore, &stats.AvgSuccessRate, &stats.TotalQuestions, &stats.TotalCorrect, &stats.BestDurationSec, &stats.LastPlayedAt, &stats.UpdatedAt, ); err != nil { return nil, err } return &stats, nil } func strconvI(n int) string { return strconv.FormatInt(int64(n), 10) } var _ domain.Repository = (*LeaderboardRepository)(nil)