You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

448 lines
14 KiB
Go

package ent
import (
"context"
"errors"
"strconv"
"strings"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
domain "knowfoolery/backend/services/leaderboard-service/internal/domain/leaderboard"
sharedtypes "knowfoolery/backend/shared/domain/types"
)
// LeaderboardRepository implements leaderboard persistence. Every query it
// issues goes through the connection pool exposed by its Client.
type LeaderboardRepository struct {
// client supplies the Pool used to run statements and begin transactions.
client *Client
}
// NewLeaderboardRepository builds a LeaderboardRepository bound to client.
// The client is stored as given; no validation or connection check is done
// here.
func NewLeaderboardRepository(client *Client) *LeaderboardRepository {
	repo := new(LeaderboardRepository)
	repo.client = client
	return repo
}
// EnsureSchema creates the leaderboard_entries and leaderboard_player_stats
// tables together with their indexes. Every statement uses IF NOT EXISTS, and
// the whole script is sent to the pool in a single Exec call whose error, if
// any, is returned unchanged.
func (r *LeaderboardRepository) EnsureSchema(ctx context.Context) error {
	const schemaDDL = `
CREATE TABLE IF NOT EXISTS leaderboard_entries (
id UUID PRIMARY KEY,
session_id VARCHAR(64) NOT NULL UNIQUE,
player_id VARCHAR(128) NOT NULL,
player_name VARCHAR(50) NOT NULL,
score INT NOT NULL,
questions_asked INT NOT NULL,
questions_correct INT NOT NULL,
hints_used INT NOT NULL DEFAULT 0,
duration_seconds INT NOT NULL,
success_rate NUMERIC(5,2) NOT NULL,
completion_type VARCHAR(16) NOT NULL,
completed_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_lb_entries_rank_order
ON leaderboard_entries (score DESC, duration_seconds ASC, completed_at ASC);
CREATE INDEX IF NOT EXISTS idx_lb_entries_player_time
ON leaderboard_entries (player_id, completed_at DESC);
CREATE INDEX IF NOT EXISTS idx_lb_entries_completion_time
ON leaderboard_entries (completion_type, completed_at DESC);
CREATE INDEX IF NOT EXISTS idx_lb_entries_created
ON leaderboard_entries (created_at DESC);
CREATE TABLE IF NOT EXISTS leaderboard_player_stats (
player_id VARCHAR(128) PRIMARY KEY,
player_name VARCHAR(50) NOT NULL,
games_played INT NOT NULL DEFAULT 0,
games_completed INT NOT NULL DEFAULT 0,
total_score BIGINT NOT NULL DEFAULT 0,
best_score INT NOT NULL DEFAULT 0,
avg_score NUMERIC(10,2) NOT NULL DEFAULT 0,
avg_success_rate NUMERIC(5,2) NOT NULL DEFAULT 0,
total_questions BIGINT NOT NULL DEFAULT 0,
total_correct BIGINT NOT NULL DEFAULT 0,
best_duration_seconds INT NULL,
last_played_at TIMESTAMPTZ NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_lb_player_rank_order
ON leaderboard_player_stats (best_score DESC, best_duration_seconds ASC, last_played_at ASC);
CREATE INDEX IF NOT EXISTS idx_lb_player_last_played
ON leaderboard_player_stats (last_played_at DESC);
`
	if _, err := r.client.Pool.Exec(ctx, schemaDDL); err != nil {
		return err
	}
	return nil
}
// IngestEntry records a finished session on the leaderboard inside a single
// transaction.
//
// If an entry with the same session ID is already stored, that entry is
// returned together with true and nothing is written. Otherwise the entry is
// inserted, the player's aggregate stats are upserted, and the stored entry is
// returned together with false. On any error the transaction is left
// uncommitted and the deferred rollback discards partial work.
//
// NOTE(review): the existence check and the insert are separate statements, so
// two concurrent calls for the same session may both pass the check; one of
// them would then presumably fail on the session_id UNIQUE constraint —
// confirm callers tolerate or retry that error.
func (r *LeaderboardRepository) IngestEntry(
	ctx context.Context,
	entry *domain.LeaderboardEntry,
) (*domain.LeaderboardEntry, bool, error) {
	tx, err := r.client.Pool.Begin(ctx)
	if err != nil {
		return nil, false, err
	}
	// Rollback after a successful Commit is expected to fail harmlessly, so
	// its result is deliberately ignored.
	defer func() { _ = tx.Rollback(ctx) }()

	stored, lookupErr := r.getEntryBySessionIDTx(ctx, tx, entry.SessionID)
	switch {
	case lookupErr == nil:
		// Duplicate delivery: close the transaction and hand back what we have.
		if err := tx.Commit(ctx); err != nil {
			return nil, false, err
		}
		return stored, true, nil
	case !errors.Is(lookupErr, pgx.ErrNoRows):
		return nil, false, lookupErr
	}

	created, err := r.insertEntryTx(ctx, tx, entry)
	if err != nil {
		return nil, false, err
	}
	if err = r.upsertPlayerStatsTx(ctx, tx, created); err != nil {
		return nil, false, err
	}
	if err = tx.Commit(ctx); err != nil {
		return nil, false, err
	}
	return created, false, nil
}
// ListTop returns up to limit leaderboard entries matching filter, ordered by
// highest score, then shortest duration, then earliest completion time. A
// non-positive limit falls back to 10. The returned slice is never nil on
// success; if row iteration ends with an error, that error is returned
// alongside the entries read so far.
func (r *LeaderboardRepository) ListTop(
	ctx context.Context,
	filter domain.TopFilter,
	limit int,
) ([]*domain.LeaderboardEntry, error) {
	const defaultLimit = 10
	const selectEntries = `
SELECT id, session_id, player_id, player_name, score, questions_asked, questions_correct, hints_used,
success_rate, duration_seconds, completion_type, completed_at, created_at
FROM leaderboard_entries
`
	const orderAndLimit = `
ORDER BY score DESC, duration_seconds ASC, completed_at ASC
LIMIT $`

	if limit < 1 {
		limit = defaultLimit
	}

	whereClause, params := buildFilterWhere(filter)
	// The limit is always the last placeholder, numbered after the filter args.
	params = append(params, limit)
	query := selectEntries + whereClause + orderAndLimit + strconvI(len(params))

	rows, err := r.client.Pool.Query(ctx, query, params...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	entries := make([]*domain.LeaderboardEntry, 0)
	for rows.Next() {
		entry, err := scanEntry(rows)
		if err != nil {
			return nil, err
		}
		entries = append(entries, entry)
	}
	return entries, rows.Err()
}
// GetPlayerStats loads the aggregate stats row for playerID from
// leaderboard_player_stats. It returns domain.ErrPlayerNotFound when no row
// exists for that player; any other error is returned unchanged.
func (r *LeaderboardRepository) GetPlayerStats(ctx context.Context, playerID string) (*domain.PlayerStats, error) {
	const selectStats = `
SELECT player_id, player_name, games_played, games_completed, total_score, best_score, avg_score,
avg_success_rate, total_questions, total_correct, best_duration_seconds, last_played_at, updated_at
FROM leaderboard_player_stats
WHERE player_id=$1`

	stats, err := scanPlayerStats(r.client.Pool.QueryRow(ctx, selectStats, playerID))
	switch {
	case err == nil:
		return stats, nil
	case errors.Is(err, pgx.ErrNoRows):
		return nil, domain.ErrPlayerNotFound
	default:
		return nil, err
	}
}
// GetPlayerRank returns the player's current rank among all rows of
// leaderboard_player_stats. Rank is computed with RANK() over best score
// (descending), then best duration and last-played time (both ascending, NULLs
// last), so players tied on all three share a rank. It returns
// domain.ErrPlayerNotFound when the player has no stats row.
func (r *LeaderboardRepository) GetPlayerRank(ctx context.Context, playerID string) (int64, error) {
	const rankQuery = `
WITH ranked AS (
SELECT player_id,
RANK() OVER (
ORDER BY best_score DESC, best_duration_seconds ASC NULLS LAST, last_played_at ASC NULLS LAST
) AS rank_value
FROM leaderboard_player_stats
)
SELECT rank_value FROM ranked WHERE player_id=$1`

	var position int64
	err := r.client.Pool.QueryRow(ctx, rankQuery, playerID).Scan(&position)
	switch {
	case err == nil:
		return position, nil
	case errors.Is(err, pgx.ErrNoRows):
		return 0, domain.ErrPlayerNotFound
	default:
		return 0, err
	}
}
// ListPlayerHistory returns one page of a player's leaderboard entries, newest
// first, together with the total number of entries stored for that player.
//
// pagination is received by value and normalized locally before its Limit and
// Offset are used (presumably clamping page and size to valid ranges — verify
// against sharedtypes.Pagination). The returned slice is never nil on success;
// a player with no entries yields an empty slice and a total of 0.
//
// NOTE(review): the count and the page are fetched by two separate statements
// outside a transaction, so total may not exactly match the page contents if
// entries are ingested between them.
func (r *LeaderboardRepository) ListPlayerHistory(
	ctx context.Context,
	playerID string,
	pagination sharedtypes.Pagination,
) ([]*domain.LeaderboardEntry, int64, error) {
	pagination.Normalize()

	const qCount = `SELECT COUNT(1) FROM leaderboard_entries WHERE player_id=$1`
	var total int64
	if err := r.client.Pool.QueryRow(ctx, qCount, playerID).Scan(&total); err != nil {
		return nil, 0, err
	}

	// id is appended as a tie-breaker: with LIMIT/OFFSET the sort order must be
	// unique, otherwise rows sharing the same completed_at can be repeated or
	// skipped when moving from one page to the next.
	const q = `
SELECT id, session_id, player_id, player_name, score, questions_asked, questions_correct, hints_used,
success_rate, duration_seconds, completion_type, completed_at, created_at
FROM leaderboard_entries
WHERE player_id=$1
ORDER BY completed_at DESC, id DESC
LIMIT $2 OFFSET $3`
	rows, err := r.client.Pool.Query(ctx, q, playerID, pagination.Limit(), pagination.Offset())
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()

	items := make([]*domain.LeaderboardEntry, 0)
	for rows.Next() {
		item, scanErr := scanEntry(rows)
		if scanErr != nil {
			return nil, 0, scanErr
		}
		items = append(items, item)
	}
	return items, total, rows.Err()
}
// GetGlobalStats aggregates leaderboard_entries matching filter into a single
// GlobalStats value: game and distinct-player counts, average score and
// success rate, maximum score, and the 50th/90th/99th score percentiles.
// Aggregates that would be NULL on an empty result set are coalesced to 0.
// UpdatedAt is stamped with the current UTC time on the application side.
func (r *LeaderboardRepository) GetGlobalStats(
	ctx context.Context,
	filter domain.TopFilter,
) (*domain.GlobalStats, error) {
	whereClause, params := buildFilterWhere(filter)
	query := `
SELECT
COUNT(1) AS total_games,
COUNT(DISTINCT player_id) AS total_players,
COALESCE(AVG(score), 0) AS avg_score,
COALESCE(AVG(success_rate), 0) AS avg_success_rate,
COALESCE(MAX(score), 0) AS max_score,
COALESCE(percentile_cont(0.5) WITHIN GROUP (ORDER BY score), 0) AS score_p50,
COALESCE(percentile_cont(0.9) WITHIN GROUP (ORDER BY score), 0) AS score_p90,
COALESCE(percentile_cont(0.99) WITHIN GROUP (ORDER BY score), 0) AS score_p99
FROM leaderboard_entries
` + whereClause
	row := r.client.Pool.QueryRow(ctx, query, params...)

	result := &domain.GlobalStats{UpdatedAt: time.Now().UTC()}
	// Destinations are listed in the same order as the SELECT columns above.
	dest := []any{
		&result.TotalGames,
		&result.TotalPlayers,
		&result.AvgScore,
		&result.AvgSuccessRate,
		&result.MaxScore,
		&result.ScoreP50,
		&result.ScoreP90,
		&result.ScoreP99,
	}
	if err := row.Scan(dest...); err != nil {
		return nil, err
	}
	return result, nil
}
// getEntryBySessionIDTx looks up the leaderboard entry for sessionID within
// tx. The scan error is returned unchanged, so a missing row surfaces as the
// driver's no-rows error for the caller to inspect.
func (r *LeaderboardRepository) getEntryBySessionIDTx(
	ctx context.Context,
	tx pgx.Tx,
	sessionID string,
) (*domain.LeaderboardEntry, error) {
	const selectBySession = `
SELECT id, session_id, player_id, player_name, score, questions_asked, questions_correct, hints_used,
success_rate, duration_seconds, completion_type, completed_at, created_at
FROM leaderboard_entries WHERE session_id=$1`

	row := tx.QueryRow(ctx, selectBySession, sessionID)
	return scanEntry(row)
}
// insertEntryTx inserts entry into leaderboard_entries within tx and returns
// the row as stored (via RETURNING). The id is a freshly generated UUID and
// created_at is set by the database with NOW(); the ID and CreatedAt values on
// the input entry are not used.
func (r *LeaderboardRepository) insertEntryTx(
	ctx context.Context,
	tx pgx.Tx,
	entry *domain.LeaderboardEntry,
) (*domain.LeaderboardEntry, error) {
	const insertSQL = `
INSERT INTO leaderboard_entries (
id, session_id, player_id, player_name, score, questions_asked, questions_correct, hints_used,
success_rate, duration_seconds, completion_type, completed_at, created_at
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,NOW())
RETURNING id, session_id, player_id, player_name, score, questions_asked, questions_correct, hints_used,
success_rate, duration_seconds, completion_type, completed_at, created_at`

	// Positional arguments $1..$12, in the column order of the INSERT list.
	params := []any{
		uuid.NewString(),
		entry.SessionID,
		entry.PlayerID,
		entry.PlayerName,
		entry.Score,
		entry.QuestionsAsked,
		entry.QuestionsCorrect,
		entry.HintsUsed,
		entry.SuccessRate,
		entry.DurationSeconds,
		string(entry.CompletionType),
		entry.CompletedAt,
	}
	return scanEntry(tx.QueryRow(ctx, insertSQL, params...))
}
// upsertPlayerStatsTx folds one leaderboard entry into the player's aggregate
// row in leaderboard_player_stats, using the supplied transaction.
//
// A player without a row gets a new one seeded from the entry: games_played is
// 1, games_completed is 1 only when the completion type is 'completed', and
// total, best and average score all equal the entry's score.
//
// When a row already exists (conflict on player_id) the statement:
//   - overwrites player_name with the name on the entry;
//   - increments games_played, and games_completed for a 'completed' entry;
//   - adds the entry's score, questions asked and correct answers to the totals;
//   - recomputes avg_score as total_score / games_played and avg_success_rate
//     as total_correct * 100 / total_questions (0 when no questions are recorded);
//   - keeps the greater best_score; best_duration_seconds is taken from the
//     entry when it sets a new best score, is the smaller of the two on a tie,
//     and is otherwise left unchanged;
//   - sets last_played_at to the later of the stored and the entry's value.
//
// It returns the error from tx.Exec, if any.
func (r *LeaderboardRepository) upsertPlayerStatsTx(
ctx context.Context,
tx pgx.Tx,
entry *domain.LeaderboardEntry,
) error {
const q = `
INSERT INTO leaderboard_player_stats (
player_id, player_name, games_played, games_completed, total_score, best_score, avg_score,
avg_success_rate, total_questions, total_correct, best_duration_seconds, last_played_at, updated_at
) VALUES (
$1, $2, 1, CASE WHEN $3='completed' THEN 1 ELSE 0 END, $4::bigint, ($4::bigint)::int,
($4::bigint)::numeric, $5, $6, $7, $8, $9, NOW()
)
ON CONFLICT (player_id)
DO UPDATE SET
player_name = EXCLUDED.player_name,
games_played = leaderboard_player_stats.games_played + 1,
games_completed = leaderboard_player_stats.games_completed
+ CASE WHEN EXCLUDED.games_completed > 0 THEN 1 ELSE 0 END,
total_score = leaderboard_player_stats.total_score + EXCLUDED.total_score,
best_score = GREATEST(leaderboard_player_stats.best_score, EXCLUDED.best_score),
avg_score = (
(leaderboard_player_stats.total_score + EXCLUDED.total_score)::numeric
/ (leaderboard_player_stats.games_played + 1)::numeric
),
total_questions = leaderboard_player_stats.total_questions + EXCLUDED.total_questions,
total_correct = leaderboard_player_stats.total_correct + EXCLUDED.total_correct,
avg_success_rate = CASE
WHEN (leaderboard_player_stats.total_questions + EXCLUDED.total_questions) = 0 THEN 0
ELSE (
(leaderboard_player_stats.total_correct + EXCLUDED.total_correct)::numeric * 100
/ (leaderboard_player_stats.total_questions + EXCLUDED.total_questions)::numeric
)
END,
best_duration_seconds = CASE
WHEN EXCLUDED.best_score > leaderboard_player_stats.best_score THEN EXCLUDED.best_duration_seconds
WHEN EXCLUDED.best_score = leaderboard_player_stats.best_score THEN LEAST(
COALESCE(leaderboard_player_stats.best_duration_seconds, EXCLUDED.best_duration_seconds),
EXCLUDED.best_duration_seconds
)
ELSE leaderboard_player_stats.best_duration_seconds
END,
last_played_at = GREATEST(
COALESCE(leaderboard_player_stats.last_played_at, EXCLUDED.last_played_at),
EXCLUDED.last_played_at
),
updated_at = NOW()`
// Arguments bind positionally to $1..$9 in the order listed; $4 (the score)
// is referenced three times in the VALUES list above.
_, err := tx.Exec(ctx, q,
entry.PlayerID,
entry.PlayerName,
string(entry.CompletionType),
entry.Score,
entry.SuccessRate,
entry.QuestionsAsked,
entry.QuestionsCorrect,
entry.DurationSeconds,
entry.CompletedAt,
)
return err
}
// buildFilterWhere translates filter into a SQL WHERE clause with numbered
// placeholders ($1, $2, ...) and the matching argument list.
//
// A non-blank completion type adds an equality predicate on completion_type;
// the value is bound with surrounding whitespace removed, so the same trimmed
// text that passes the blank check is the one compared in SQL. A 24h, 7d or 30d
// window adds a lower bound on completed_at relative to the current UTC time;
// WindowAll and any unrecognized window add no time predicate.
//
// It returns an empty clause (and an empty, non-nil argument slice) when no
// predicate applies; otherwise the clause starts with "WHERE " and predicates
// are joined with " AND ".
func buildFilterWhere(filter domain.TopFilter) (string, []any) {
	conds := make([]string, 0, 2)
	args := make([]any, 0, 2)

	if completionType := strings.TrimSpace(filter.CompletionType); completionType != "" {
		args = append(args, completionType)
		conds = append(conds, "completion_type=$"+strconvI(len(args)))
	}

	// Map the window to a look-back duration; zero means "no time filter".
	var lookback time.Duration
	switch filter.Window {
	case domain.WindowAll:
		// no time filter
	case domain.Window24h:
		lookback = 24 * time.Hour
	case domain.Window7d:
		lookback = 7 * 24 * time.Hour
	case domain.Window30d:
		lookback = 30 * 24 * time.Hour
	}
	if lookback > 0 {
		args = append(args, time.Now().UTC().Add(-lookback))
		conds = append(conds, "completed_at>=$"+strconvI(len(args)))
	}

	if len(conds) == 0 {
		return "", args
	}
	return "WHERE " + strings.Join(conds, " AND "), args
}
// scanEntry reads one leaderboard_entries row from scanner into a new
// LeaderboardEntry. Columns must arrive in the order used by the SELECT and
// RETURNING lists in this file: id, session_id, player_id, player_name, score,
// questions_asked, questions_correct, hints_used, success_rate,
// duration_seconds, completion_type, completed_at, created_at. The completion
// type is read as a string and converted to domain.CompletionType. On a scan
// error it returns nil and that error unchanged.
func scanEntry(scanner interface {
	Scan(dest ...any) error
}) (*domain.LeaderboardEntry, error) {
	result := new(domain.LeaderboardEntry)
	var rawCompletion string

	dest := []any{
		&result.ID,
		&result.SessionID,
		&result.PlayerID,
		&result.PlayerName,
		&result.Score,
		&result.QuestionsAsked,
		&result.QuestionsCorrect,
		&result.HintsUsed,
		&result.SuccessRate,
		&result.DurationSeconds,
		&rawCompletion,
		&result.CompletedAt,
		&result.CreatedAt,
	}
	if err := scanner.Scan(dest...); err != nil {
		return nil, err
	}

	result.CompletionType = domain.CompletionType(rawCompletion)
	return result, nil
}
// scanPlayerStats reads one leaderboard_player_stats row from scanner into a
// new PlayerStats. Columns must arrive in the order: player_id, player_name,
// games_played, games_completed, total_score, best_score, avg_score,
// avg_success_rate, total_questions, total_correct, best_duration_seconds,
// last_played_at, updated_at. On a scan error it returns nil and that error
// unchanged.
//
// NOTE(review): best_duration_seconds and last_played_at are nullable in the
// schema — confirm the BestDurationSec and LastPlayedAt fields can accept NULL.
func scanPlayerStats(scanner interface {
	Scan(dest ...any) error
}) (*domain.PlayerStats, error) {
	result := new(domain.PlayerStats)

	dest := []any{
		&result.PlayerID,
		&result.PlayerName,
		&result.GamesPlayed,
		&result.GamesCompleted,
		&result.TotalScore,
		&result.BestScore,
		&result.AvgScore,
		&result.AvgSuccessRate,
		&result.TotalQuestions,
		&result.TotalCorrect,
		&result.BestDurationSec,
		&result.LastPlayedAt,
		&result.UpdatedAt,
	}
	if err := scanner.Scan(dest...); err != nil {
		return nil, err
	}
	return result, nil
}
// strconvI formats n in base 10. It is used to number SQL placeholders
// ("$1", "$2", ...) when queries are assembled dynamically.
func strconvI(n int) string {
	return strconv.Itoa(n)
}
// Compile-time check that *LeaderboardRepository implements domain.Repository.
var _ domain.Repository = (*LeaderboardRepository)(nil)