You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

429 lines
10 KiB
Go

package proxy
import (
"bytes"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/gofiber/fiber/v3"
"github.com/knowfoolery/backend/services/gateway-service/config"
)
// Manager holds one ServiceProxy per backend service, keyed by service name,
// together with the HTTP client used to forward requests to them.
type Manager struct {
// services maps a service name (for example "game-service") to its proxy.
services map[string]*ServiceProxy
// client performs every outbound request made by the manager.
client *http.Client
// config is the services configuration the manager was created from.
config config.ServicesConfig
// mutex guards access to services.
mutex sync.RWMutex
}
// ServiceProxy bundles everything the gateway tracks for one backend service.
type ServiceProxy struct {
// Name is the service's key in the manager, for example "game-service".
Name string
// Config carries the service's URL, timeout and retry settings.
Config config.ServiceConfig
// BaseURL is the result of parsing Config.URL; request paths are appended to it.
BaseURL *url.URL
// CircuitBreaker decides whether requests to this service are currently allowed.
CircuitBreaker *CircuitBreaker
// Stats accumulates request counters and latency for this service.
Stats *ProxyStats
}
// ProxyStats accumulates request counters and latency figures for one service.
// It is updated through RecordSuccess and RecordFailure and read through GetStats.
type ProxyStats struct {
// TotalRequests counts every recorded request, successful or not.
TotalRequests int64
// SuccessRequests counts requests recorded through RecordSuccess.
SuccessRequests int64
// FailedRequests counts requests recorded through RecordFailure.
FailedRequests int64
// TotalLatency is the sum of the durations of all recorded requests.
TotalLatency time.Duration
// AverageLatency is TotalLatency divided by TotalRequests.
AverageLatency time.Duration
// LastRequest is the time at which the most recent request was recorded.
LastRequest time.Time
// mutex guards all of the fields above.
mutex sync.RWMutex
}
// CircuitBreaker tracks failures of one backend service and decides whether
// further requests to it should be attempted.
type CircuitBreaker struct {
// failures counts recorded failures; it is reset to zero by every recorded success.
failures int
// lastFailTime is the time of the most recently recorded failure.
lastFailTime time.Time
// state is the current position of the breaker (closed, open or half-open).
state CircuitState
// config supplies the Enabled flag, the failure Threshold and the open-state Timeout.
config config.CircuitBreakerConfig
// mutex is held by the breaker's methods while they touch failures, lastFailTime and state.
mutex sync.RWMutex
}
// CircuitState names the position of a CircuitBreaker. The string values are
// what gets reported in JSON responses and statistics.
type CircuitState string
const (
// CircuitStateClosed: requests are allowed.
CircuitStateClosed CircuitState = "closed"
// CircuitStateOpen: requests are refused until the configured timeout has
// passed since the last recorded failure.
CircuitStateOpen CircuitState = "open"
// CircuitStateHalfOpen: requests are allowed again; a recorded success
// returns the breaker to the closed state.
CircuitStateHalfOpen CircuitState = "half_open"
)
// NewManager builds a Manager with one ServiceProxy for each of the six
// backend services described in cfg. Every proxy is created with the same
// circuit-breaker settings (cfg.CircuitBreaker), and all of them share a
// single HTTP client with a 30 second overall timeout and a pooled transport.
func NewManager(cfg config.ServicesConfig) *Manager {
	breakerCfg := cfg.CircuitBreaker

	return &Manager{
		services: map[string]*ServiceProxy{
			"game-service":        NewServiceProxy("game-service", cfg.GameService, breakerCfg),
			"question-service":    NewServiceProxy("question-service", cfg.QuestionService, breakerCfg),
			"user-service":        NewServiceProxy("user-service", cfg.UserService, breakerCfg),
			"leaderboard-service": NewServiceProxy("leaderboard-service", cfg.LeaderboardService, breakerCfg),
			"session-service":     NewServiceProxy("session-service", cfg.SessionService, breakerCfg),
			"admin-service":       NewServiceProxy("admin-service", cfg.AdminService, breakerCfg),
		},
		client: &http.Client{
			Timeout: 30 * time.Second,
			Transport: &http.Transport{
				MaxIdleConns:        100,
				MaxIdleConnsPerHost: 10,
				IdleConnTimeout:     30 * time.Second,
			},
		},
		config: cfg,
	}
}
// NewServiceProxy creates the proxy state for one backend service: its parsed
// base URL, a circuit breaker in the closed state configured from cbCfg, and
// an empty statistics record.
//
// The constructor has no error return, so a cfg.URL that cannot be parsed
// must not leave BaseURL nil: callers dereference it when building the target
// URL and would panic. In that case BaseURL falls back to an empty, non-nil
// URL instead.
func NewServiceProxy(name string, cfg config.ServiceConfig, cbCfg config.CircuitBreakerConfig) *ServiceProxy {
	baseURL, err := url.Parse(cfg.URL)
	if err != nil || baseURL == nil {
		// url.Parse returns a nil *url.URL together with the error.
		baseURL = &url.URL{}
	}

	return &ServiceProxy{
		Name:    name,
		Config:  cfg,
		BaseURL: baseURL,
		CircuitBreaker: &CircuitBreaker{
			config: cbCfg,
			state:  CircuitStateClosed,
		},
		Stats: &ProxyStats{},
	}
}
// ProxyToService returns a Fiber handler that forwards each request to the
// backend registered under serviceName.
//
// The handler answers 502 Bad Gateway when no such service is registered and
// 503 Service Unavailable (including the breaker state) when the service's
// circuit breaker refuses the request; otherwise the request is proxied.
func (m *Manager) ProxyToService(serviceName string) fiber.Handler {
	return func(c fiber.Ctx) error {
		// Hold the read lock only for the map lookup itself.
		m.mutex.RLock()
		target, found := m.services[serviceName]
		m.mutex.RUnlock()

		switch {
		case !found:
			return c.Status(fiber.StatusBadGateway).JSON(fiber.Map{
				"error":   "Service not found",
				"service": serviceName,
			})
		case !target.CircuitBreaker.CanRequest():
			return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{
				"error":   "Service circuit breaker is open",
				"service": serviceName,
				"state":   target.CircuitBreaker.GetState(),
			})
		default:
			return m.proxyRequest(c, target)
		}
	}
}
// proxyRequest forwards the incoming request to service and relays the
// upstream response to the client.
//
// The request is attempted once and, on a transport error, retried up to
// service.Config.RetryAttempts more times with service.Config.RetryDelay
// between attempts. A transport failure after all attempts yields a 502
// response. Transport failures and upstream 5xx statuses are recorded as
// failures on the circuit breaker and the statistics; every other outcome is
// recorded as a success. The recorded duration includes retry delays.
func (m *Manager) proxyRequest(c fiber.Ctx, service *ServiceProxy) error {
	start := time.Now()
	targetURL := m.buildTargetURL(c, service)

	req, err := m.createRequest(c, targetURL)
	if err != nil {
		service.CircuitBreaker.RecordFailure()
		service.Stats.RecordFailure(time.Since(start))
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
			"error": fmt.Sprintf("Failed to create request: %v", err),
		})
	}
	m.setProxyHeaders(c, req)

	// A negative retry count would skip the loop entirely and leave resp nil.
	retries := service.Config.RetryAttempts
	if retries < 0 {
		retries = 0
	}

	var resp *http.Response
	var reqErr error
	for attempt := 0; attempt <= retries; attempt++ {
		if attempt > 0 {
			time.Sleep(service.Config.RetryDelay)

			// The previous attempt consumed the body. The request body is a
			// wrapped reader that cannot be seeked, so ask the request for a
			// fresh copy instead. GetBody is nil for bodiless requests.
			if req.GetBody != nil {
				body, bodyErr := req.GetBody()
				if bodyErr != nil {
					reqErr = bodyErr
					break
				}
				req.Body = body
			}
		}

		resp, reqErr = m.client.Do(req)
		if reqErr == nil {
			break
		}
	}
	duration := time.Since(start)

	if reqErr != nil {
		service.CircuitBreaker.RecordFailure()
		service.Stats.RecordFailure(duration)
		return c.Status(fiber.StatusBadGateway).JSON(fiber.Map{
			"error":   fmt.Sprintf("Failed to proxy request: %v", reqErr),
			"service": service.Name,
		})
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 500 {
		service.CircuitBreaker.RecordFailure()
		service.Stats.RecordFailure(duration)
	} else {
		service.CircuitBreaker.RecordSuccess()
		service.Stats.RecordSuccess(duration)
	}
	return m.copyResponse(c, resp)
}
// buildTargetURL returns the upstream URL for the current request: the
// service's base URL, followed by the request path rewritten by
// getServicePath, followed by the original query string when there is one.
func (m *Manager) buildTargetURL(c fiber.Ctx, service *ServiceProxy) string {
	upstreamPath := m.getServicePath(c.Path(), service.Name)
	target := service.BaseURL.String() + upstreamPath

	rawQuery := string(c.Request().URI().QueryString())
	if rawQuery == "" {
		return target
	}
	return target + "?" + rawQuery
}
// getServicePath rewrites a gateway path into the path expected by the
// backend: for a known service, the leading "/api/v1/<segment>/" is replaced
// by "/". The path is returned unchanged when the service is unknown or the
// path does not start with that service's prefix.
func (m *Manager) getServicePath(path, serviceName string) string {
	const apiPrefix = "/api/v1/"

	// Route segment under the API prefix that belongs to each service.
	var segment string
	switch serviceName {
	case "game-service":
		segment = "game/"
	case "question-service":
		segment = "questions/"
	case "user-service":
		segment = "users/"
	case "leaderboard-service":
		segment = "leaderboard/"
	case "session-service":
		segment = "sessions/"
	case "admin-service":
		segment = "admin/"
	default:
		return path
	}

	gatewayPrefix := apiPrefix + segment
	if !strings.HasPrefix(path, gatewayPrefix) {
		return path
	}
	return "/" + path[len(gatewayPrefix):]
}
// createRequest builds the outbound HTTP request for targetURL using the
// incoming request's method and, when it is non-empty, its body. The returned
// request carries the Fiber user context.
func (m *Manager) createRequest(c fiber.Ctx, targetURL string) (*http.Request, error) {
	// payload stays a nil interface for bodiless requests.
	var payload io.Reader
	if raw := c.Request().Body(); len(raw) > 0 {
		payload = bytes.NewReader(raw)
	}

	outbound, err := http.NewRequest(c.Method(), targetURL, payload)
	if err != nil {
		return nil, err
	}
	return outbound.WithContext(c.UserContext()), nil
}
// setProxyHeaders copies the incoming request's headers onto req and then
// adds the gateway's forwarding headers.
//
// Host, Connection, Transfer-Encoding, Upgrade and Proxy-Connection are not
// copied. Headers that occur more than once on the incoming request are
// forwarded with all of their values. X-Forwarded-For, X-Forwarded-Proto,
// X-Forwarded-Host and X-Gateway-Request-ID are always overwritten with
// values derived from the current request; X-User-ID, X-User-Email and
// X-User-Roles are set from the incoming request when present.
func (m *Manager) setProxyHeaders(c fiber.Ctx, req *http.Request) {
	c.Request().Header.VisitAll(func(key, value []byte) {
		keyStr := string(key)
		switch keyStr {
		case "Host", "Connection", "Transfer-Encoding", "Upgrade", "Proxy-Connection":
			return
		default:
			// Add rather than Set: the visitor is called once per header
			// line, and Set would keep only the last line of a repeated header.
			req.Header.Add(keyStr, string(value))
		}
	})

	req.Header.Set("X-Forwarded-For", c.IP())
	req.Header.Set("X-Forwarded-Proto", c.Protocol())
	req.Header.Set("X-Forwarded-Host", c.Hostname())
	req.Header.Set("X-Gateway-Request-ID", c.Get("X-Request-ID", ""))

	if userID := c.Get("X-User-ID"); userID != "" {
		req.Header.Set("X-User-ID", userID)
	}
	if userEmail := c.Get("X-User-Email"); userEmail != "" {
		req.Header.Set("X-User-Email", userEmail)
	}
	if userRoles := c.Get("X-User-Roles"); userRoles != "" {
		req.Header.Set("X-User-Roles", userRoles)
	}
}
// copyResponse relays an upstream response to the client: status code,
// headers (except Connection, Transfer-Encoding and Upgrade) and body.
//
// For each header the first upstream value replaces whatever the response
// already carries under that name and any further values are added, so
// headers the upstream sends several times (for example multiple Vary or Link
// lines) are not reduced to their last value. The body is streamed with
// io.Copy; its error, if any, is returned.
func (m *Manager) copyResponse(c fiber.Ctx, resp *http.Response) error {
	c.Status(resp.StatusCode)

	for key, values := range resp.Header {
		switch key {
		case "Connection", "Transfer-Encoding", "Upgrade":
			continue
		}
		for i, value := range values {
			if i == 0 {
				c.Set(key, value)
				continue
			}
			c.Response().Header.Add(key, value)
		}
	}

	_, err := io.Copy(c.Response().BodyWriter(), resp.Body)
	return err
}
// CanRequest reports whether a request may be sent to the service.
//
// It always returns true when the breaker is disabled. Otherwise requests are
// refused only while the state is open and no more than config.Timeout has
// passed since the last recorded failure. The state itself is not modified.
func (cb *CircuitBreaker) CanRequest() bool {
	if !cb.config.Enabled {
		return true
	}

	cb.mutex.RLock()
	defer cb.mutex.RUnlock()

	if cb.state != CircuitStateOpen {
		return true
	}
	// Open: admit requests again once the cool-down period is over.
	return time.Since(cb.lastFailTime) > cb.config.Timeout
}
// RecordSuccess notes a successful request. It does nothing when the breaker
// is disabled.
//
// The failure counter is reset, and the circuit is closed when the success
// was a probe: either the state is half-open, or it is still marked open
// although config.Timeout has elapsed since the last failure. CanRequest
// admits requests in that second situation without changing the state, so
// without this case a successful probe would leave the breaker open. A
// success that arrives while the breaker is open and still inside its
// timeout does not close it.
func (cb *CircuitBreaker) RecordSuccess() {
	if !cb.config.Enabled {
		return
	}

	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	cb.failures = 0

	switch cb.state {
	case CircuitStateHalfOpen:
		cb.state = CircuitStateClosed
	case CircuitStateOpen:
		if time.Since(cb.lastFailTime) > cb.config.Timeout {
			cb.state = CircuitStateClosed
		}
	}
}
// RecordFailure notes a failed request. It does nothing when the breaker is
// disabled.
//
// The failure counter is incremented and the failure time updated. The
// circuit opens when the counter reaches config.Threshold, and it re-opens
// immediately when the failure happened in the half-open state (a failed
// probe). A failure never moves the breaker towards half-open.
func (cb *CircuitBreaker) RecordFailure() {
	if !cb.config.Enabled {
		return
	}

	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	cb.failures++
	cb.lastFailTime = time.Now()

	if cb.state == CircuitStateHalfOpen || cb.failures >= cb.config.Threshold {
		cb.state = CircuitStateOpen
	}
}
// GetState returns the breaker's current state. As a side effect, an open
// breaker whose config.Timeout has elapsed since the last recorded failure is
// moved to half-open before the state is returned.
//
// Because the state may be written, the exclusive lock is taken rather than
// the read lock; writing under a read lock would race with other readers.
func (cb *CircuitBreaker) GetState() CircuitState {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	if cb.state == CircuitStateOpen && time.Since(cb.lastFailTime) > cb.config.Timeout {
		cb.state = CircuitStateHalfOpen
	}
	return cb.state
}
// RecordSuccess adds one successful request that took duration to the
// statistics: it bumps the total and success counters, adds duration to the
// total latency, recomputes the average latency and stamps the current time
// as the last request.
func (ps *ProxyStats) RecordSuccess(duration time.Duration) {
	ps.mutex.Lock()
	defer ps.mutex.Unlock()

	ps.TotalRequests++
	ps.SuccessRequests++

	accumulated := ps.TotalLatency + duration
	ps.TotalLatency = accumulated
	ps.AverageLatency = accumulated / time.Duration(ps.TotalRequests)

	ps.LastRequest = time.Now()
}
// RecordFailure adds one failed request that took duration to the
// statistics: it bumps the total and failure counters, adds duration to the
// total latency, recomputes the average latency and stamps the current time
// as the last request.
func (ps *ProxyStats) RecordFailure(duration time.Duration) {
	ps.mutex.Lock()
	defer ps.mutex.Unlock()

	ps.TotalRequests++
	ps.FailedRequests++

	accumulated := ps.TotalLatency + duration
	ps.TotalLatency = accumulated
	ps.AverageLatency = accumulated / time.Duration(ps.TotalRequests)

	ps.LastRequest = time.Now()
}
// GetStats returns a snapshot of the statistics as a map suitable for JSON
// output. Counters are returned as numbers, latencies as duration strings,
// and the success rate as a percentage string with two decimals ("0.00%"
// when no request has been recorded yet).
func (ps *ProxyStats) GetStats() map[string]interface{} {
	ps.mutex.RLock()
	defer ps.mutex.RUnlock()

	var ratePercent float64
	if ps.TotalRequests > 0 {
		ratePercent = float64(ps.SuccessRequests) / float64(ps.TotalRequests) * 100
	}

	snapshot := make(map[string]interface{}, 7)
	snapshot["total_requests"] = ps.TotalRequests
	snapshot["success_requests"] = ps.SuccessRequests
	snapshot["failed_requests"] = ps.FailedRequests
	snapshot["success_rate"] = fmt.Sprintf("%.2f%%", ratePercent)
	snapshot["average_latency"] = ps.AverageLatency.String()
	snapshot["total_latency"] = ps.TotalLatency.String()
	snapshot["last_request"] = ps.LastRequest
	return snapshot
}
// GetServiceStats returns, for every registered service, a map with three
// sections: "proxy_stats" (the ProxyStats snapshot), "circuit_breaker"
// (state, enabled flag and failure count) and "config" (URL, timeout and
// retry settings). The result is keyed by service name.
func (m *Manager) GetServiceStats() map[string]interface{} {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	stats := make(map[string]interface{}, len(m.services))
	for name, service := range m.services {
		breaker := service.CircuitBreaker

		// GetState does its own locking, so it must be called before the
		// breaker's mutex is taken below.
		state := breaker.GetState()

		// failures is modified under the breaker's mutex by RecordSuccess and
		// RecordFailure; read it under the same mutex to avoid a data race.
		breaker.mutex.RLock()
		failures := breaker.failures
		breaker.mutex.RUnlock()

		stats[name] = map[string]interface{}{
			"proxy_stats": service.Stats.GetStats(),
			"circuit_breaker": map[string]interface{}{
				"state":    state,
				"enabled":  breaker.config.Enabled,
				"failures": failures,
			},
			"config": map[string]interface{}{
				"url":            service.Config.URL,
				"timeout":        service.Config.Timeout.String(),
				"retry_attempts": service.Config.RetryAttempts,
				"retry_delay":    service.Config.RetryDelay.String(),
			},
		}
	}
	return stats
}