You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
429 lines
10 KiB
Go
429 lines
10 KiB
Go
package proxy
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gofiber/fiber/v3"
|
|
|
|
"github.com/knowfoolery/backend/services/gateway-service/config"
|
|
)
|
|
|
|
type Manager struct {
|
|
services map[string]*ServiceProxy
|
|
client *http.Client
|
|
config config.ServicesConfig
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
type ServiceProxy struct {
|
|
Name string
|
|
Config config.ServiceConfig
|
|
BaseURL *url.URL
|
|
CircuitBreaker *CircuitBreaker
|
|
Stats *ProxyStats
|
|
}
|
|
|
|
// ProxyStats accumulates request counters and latency figures for a single
// proxied service. All fields are protected by mutex; use the RecordSuccess,
// RecordFailure and GetStats methods rather than touching fields directly
// from concurrent code.
type ProxyStats struct {
	// TotalRequests counts every recorded request, successful or not.
	TotalRequests int64
	// SuccessRequests counts requests recorded as successful.
	SuccessRequests int64
	// FailedRequests counts requests recorded as failed.
	FailedRequests int64
	// TotalLatency is the sum of the durations of all recorded requests.
	TotalLatency time.Duration
	// AverageLatency is TotalLatency divided by TotalRequests.
	AverageLatency time.Duration
	// LastRequest is the time the most recent request was recorded.
	LastRequest time.Time
	// mutex guards every field above.
	mutex sync.RWMutex
}
|
|
|
|
type CircuitBreaker struct {
|
|
failures int
|
|
lastFailTime time.Time
|
|
state CircuitState
|
|
config config.CircuitBreakerConfig
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// CircuitState names the state a CircuitBreaker is in. The string values are
// what gets reported in JSON responses and statistics.
type CircuitState string

// The states a circuit breaker moves through.
const (
	// CircuitStateClosed means requests flow normally.
	CircuitStateClosed CircuitState = "closed"
	// CircuitStateOpen means requests are rejected until the timeout elapses.
	CircuitStateOpen CircuitState = "open"
	// CircuitStateHalfOpen means the timeout has elapsed and requests are
	// being let through again.
	CircuitStateHalfOpen CircuitState = "half_open"
)
|
|
|
|
func NewManager(cfg config.ServicesConfig) *Manager {
|
|
services := make(map[string]*ServiceProxy)
|
|
|
|
client := &http.Client{
|
|
Timeout: 30 * time.Second,
|
|
Transport: &http.Transport{
|
|
MaxIdleConns: 100,
|
|
MaxIdleConnsPerHost: 10,
|
|
IdleConnTimeout: 30 * time.Second,
|
|
},
|
|
}
|
|
|
|
services["game-service"] = NewServiceProxy("game-service", cfg.GameService, cfg.CircuitBreaker)
|
|
services["question-service"] = NewServiceProxy("question-service", cfg.QuestionService, cfg.CircuitBreaker)
|
|
services["user-service"] = NewServiceProxy("user-service", cfg.UserService, cfg.CircuitBreaker)
|
|
services["leaderboard-service"] = NewServiceProxy("leaderboard-service", cfg.LeaderboardService, cfg.CircuitBreaker)
|
|
services["session-service"] = NewServiceProxy("session-service", cfg.SessionService, cfg.CircuitBreaker)
|
|
services["admin-service"] = NewServiceProxy("admin-service", cfg.AdminService, cfg.CircuitBreaker)
|
|
|
|
return &Manager{
|
|
services: services,
|
|
client: client,
|
|
config: cfg,
|
|
}
|
|
}
|
|
|
|
func NewServiceProxy(name string, cfg config.ServiceConfig, cbCfg config.CircuitBreakerConfig) *ServiceProxy {
|
|
baseURL, _ := url.Parse(cfg.URL)
|
|
|
|
return &ServiceProxy{
|
|
Name: name,
|
|
Config: cfg,
|
|
BaseURL: baseURL,
|
|
CircuitBreaker: &CircuitBreaker{
|
|
config: cbCfg,
|
|
state: CircuitStateClosed,
|
|
},
|
|
Stats: &ProxyStats{},
|
|
}
|
|
}
|
|
|
|
func (m *Manager) ProxyToService(serviceName string) fiber.Handler {
|
|
return func(c fiber.Ctx) error {
|
|
m.mutex.RLock()
|
|
service, exists := m.services[serviceName]
|
|
m.mutex.RUnlock()
|
|
|
|
if !exists {
|
|
return c.Status(fiber.StatusBadGateway).JSON(fiber.Map{
|
|
"error": "Service not found",
|
|
"service": serviceName,
|
|
})
|
|
}
|
|
|
|
if !service.CircuitBreaker.CanRequest() {
|
|
return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{
|
|
"error": "Service circuit breaker is open",
|
|
"service": serviceName,
|
|
"state": service.CircuitBreaker.GetState(),
|
|
})
|
|
}
|
|
|
|
return m.proxyRequest(c, service)
|
|
}
|
|
}
|
|
|
|
func (m *Manager) proxyRequest(c fiber.Ctx, service *ServiceProxy) error {
|
|
start := time.Now()
|
|
|
|
targetURL := m.buildTargetURL(c, service)
|
|
|
|
req, err := m.createRequest(c, targetURL)
|
|
if err != nil {
|
|
service.CircuitBreaker.RecordFailure()
|
|
service.Stats.RecordFailure(time.Since(start))
|
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to create request: %v", err),
|
|
})
|
|
}
|
|
|
|
m.setProxyHeaders(c, req)
|
|
|
|
var resp *http.Response
|
|
var reqErr error
|
|
|
|
for attempt := 0; attempt <= service.Config.RetryAttempts; attempt++ {
|
|
if attempt > 0 {
|
|
time.Sleep(service.Config.RetryDelay)
|
|
}
|
|
|
|
resp, reqErr = m.client.Do(req)
|
|
if reqErr == nil {
|
|
break
|
|
}
|
|
|
|
if attempt == service.Config.RetryAttempts {
|
|
break
|
|
}
|
|
|
|
if req.Body != nil {
|
|
if seeker, ok := req.Body.(io.Seeker); ok {
|
|
seeker.Seek(0, io.SeekStart)
|
|
}
|
|
}
|
|
}
|
|
|
|
duration := time.Since(start)
|
|
|
|
if reqErr != nil {
|
|
service.CircuitBreaker.RecordFailure()
|
|
service.Stats.RecordFailure(duration)
|
|
return c.Status(fiber.StatusBadGateway).JSON(fiber.Map{
|
|
"error": fmt.Sprintf("Failed to proxy request: %v", reqErr),
|
|
"service": service.Name,
|
|
})
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode >= 500 {
|
|
service.CircuitBreaker.RecordFailure()
|
|
service.Stats.RecordFailure(duration)
|
|
} else {
|
|
service.CircuitBreaker.RecordSuccess()
|
|
service.Stats.RecordSuccess(duration)
|
|
}
|
|
|
|
return m.copyResponse(c, resp)
|
|
}
|
|
|
|
func (m *Manager) buildTargetURL(c fiber.Ctx, service *ServiceProxy) string {
|
|
path := c.Path()
|
|
|
|
servicePath := m.getServicePath(path, service.Name)
|
|
|
|
targetURL := service.BaseURL.String() + servicePath
|
|
|
|
if queryString := string(c.Request().URI().QueryString()); queryString != "" {
|
|
targetURL += "?" + queryString
|
|
}
|
|
|
|
return targetURL
|
|
}
|
|
|
|
func (m *Manager) getServicePath(path, serviceName string) string {
|
|
prefix := "/api/v1/"
|
|
if !strings.HasPrefix(path, prefix) {
|
|
return path
|
|
}
|
|
|
|
pathWithoutPrefix := path[len(prefix):]
|
|
|
|
var servicePrefix string
|
|
switch serviceName {
|
|
case "game-service":
|
|
servicePrefix = "game/"
|
|
case "question-service":
|
|
servicePrefix = "questions/"
|
|
case "user-service":
|
|
servicePrefix = "users/"
|
|
case "leaderboard-service":
|
|
servicePrefix = "leaderboard/"
|
|
case "session-service":
|
|
servicePrefix = "sessions/"
|
|
case "admin-service":
|
|
servicePrefix = "admin/"
|
|
default:
|
|
return path
|
|
}
|
|
|
|
if strings.HasPrefix(pathWithoutPrefix, servicePrefix) {
|
|
return "/" + pathWithoutPrefix[len(servicePrefix):]
|
|
}
|
|
|
|
return path
|
|
}
|
|
|
|
func (m *Manager) createRequest(c fiber.Ctx, targetURL string) (*http.Request, error) {
|
|
var body io.Reader
|
|
if c.Request().Body() != nil && len(c.Request().Body()) > 0 {
|
|
body = bytes.NewReader(c.Request().Body())
|
|
}
|
|
|
|
req, err := http.NewRequest(c.Method(), targetURL, body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req = req.WithContext(c.UserContext())
|
|
|
|
return req, nil
|
|
}
|
|
|
|
func (m *Manager) setProxyHeaders(c fiber.Ctx, req *http.Request) {
|
|
c.Request().Header.VisitAll(func(key, value []byte) {
|
|
keyStr := string(key)
|
|
valueStr := string(value)
|
|
|
|
switch keyStr {
|
|
case "Host", "Connection", "Transfer-Encoding", "Upgrade", "Proxy-Connection":
|
|
return
|
|
default:
|
|
req.Header.Set(keyStr, valueStr)
|
|
}
|
|
})
|
|
|
|
req.Header.Set("X-Forwarded-For", c.IP())
|
|
req.Header.Set("X-Forwarded-Proto", c.Protocol())
|
|
req.Header.Set("X-Forwarded-Host", c.Hostname())
|
|
req.Header.Set("X-Gateway-Request-ID", c.Get("X-Request-ID", ""))
|
|
|
|
if userID := c.Get("X-User-ID"); userID != "" {
|
|
req.Header.Set("X-User-ID", userID)
|
|
}
|
|
if userEmail := c.Get("X-User-Email"); userEmail != "" {
|
|
req.Header.Set("X-User-Email", userEmail)
|
|
}
|
|
if userRoles := c.Get("X-User-Roles"); userRoles != "" {
|
|
req.Header.Set("X-User-Roles", userRoles)
|
|
}
|
|
}
|
|
|
|
func (m *Manager) copyResponse(c fiber.Ctx, resp *http.Response) error {
|
|
c.Status(resp.StatusCode)
|
|
|
|
for key, values := range resp.Header {
|
|
switch key {
|
|
case "Connection", "Transfer-Encoding", "Upgrade":
|
|
continue
|
|
default:
|
|
for _, value := range values {
|
|
c.Set(key, value)
|
|
}
|
|
}
|
|
}
|
|
|
|
_, err := io.Copy(c.Response().BodyWriter(), resp.Body)
|
|
return err
|
|
}
|
|
|
|
func (cb *CircuitBreaker) CanRequest() bool {
|
|
if !cb.config.Enabled {
|
|
return true
|
|
}
|
|
|
|
cb.mutex.RLock()
|
|
defer cb.mutex.RUnlock()
|
|
|
|
switch cb.state {
|
|
case CircuitStateClosed:
|
|
return true
|
|
case CircuitStateOpen:
|
|
return time.Since(cb.lastFailTime) > cb.config.Timeout
|
|
case CircuitStateHalfOpen:
|
|
return true
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
|
|
func (cb *CircuitBreaker) RecordSuccess() {
|
|
if !cb.config.Enabled {
|
|
return
|
|
}
|
|
|
|
cb.mutex.Lock()
|
|
defer cb.mutex.Unlock()
|
|
|
|
cb.failures = 0
|
|
|
|
if cb.state == CircuitStateHalfOpen {
|
|
cb.state = CircuitStateClosed
|
|
}
|
|
}
|
|
|
|
func (cb *CircuitBreaker) RecordFailure() {
|
|
if !cb.config.Enabled {
|
|
return
|
|
}
|
|
|
|
cb.mutex.Lock()
|
|
defer cb.mutex.Unlock()
|
|
|
|
cb.failures++
|
|
cb.lastFailTime = time.Now()
|
|
|
|
if cb.failures >= cb.config.Threshold {
|
|
cb.state = CircuitStateOpen
|
|
} else if cb.state == CircuitStateOpen && time.Since(cb.lastFailTime) > cb.config.Timeout {
|
|
cb.state = CircuitStateHalfOpen
|
|
}
|
|
}
|
|
|
|
func (cb *CircuitBreaker) GetState() CircuitState {
|
|
cb.mutex.RLock()
|
|
defer cb.mutex.RUnlock()
|
|
|
|
if cb.state == CircuitStateOpen && time.Since(cb.lastFailTime) > cb.config.Timeout {
|
|
cb.state = CircuitStateHalfOpen
|
|
}
|
|
|
|
return cb.state
|
|
}
|
|
|
|
func (ps *ProxyStats) RecordSuccess(duration time.Duration) {
|
|
ps.mutex.Lock()
|
|
defer ps.mutex.Unlock()
|
|
|
|
ps.TotalRequests++
|
|
ps.SuccessRequests++
|
|
ps.TotalLatency += duration
|
|
ps.AverageLatency = ps.TotalLatency / time.Duration(ps.TotalRequests)
|
|
ps.LastRequest = time.Now()
|
|
}
|
|
|
|
func (ps *ProxyStats) RecordFailure(duration time.Duration) {
|
|
ps.mutex.Lock()
|
|
defer ps.mutex.Unlock()
|
|
|
|
ps.TotalRequests++
|
|
ps.FailedRequests++
|
|
ps.TotalLatency += duration
|
|
ps.AverageLatency = ps.TotalLatency / time.Duration(ps.TotalRequests)
|
|
ps.LastRequest = time.Now()
|
|
}
|
|
|
|
func (ps *ProxyStats) GetStats() map[string]interface{} {
|
|
ps.mutex.RLock()
|
|
defer ps.mutex.RUnlock()
|
|
|
|
successRate := float64(0)
|
|
if ps.TotalRequests > 0 {
|
|
successRate = float64(ps.SuccessRequests) / float64(ps.TotalRequests) * 100
|
|
}
|
|
|
|
return map[string]interface{}{
|
|
"total_requests": ps.TotalRequests,
|
|
"success_requests": ps.SuccessRequests,
|
|
"failed_requests": ps.FailedRequests,
|
|
"success_rate": fmt.Sprintf("%.2f%%", successRate),
|
|
"average_latency": ps.AverageLatency.String(),
|
|
"total_latency": ps.TotalLatency.String(),
|
|
"last_request": ps.LastRequest,
|
|
}
|
|
}
|
|
|
|
func (m *Manager) GetServiceStats() map[string]interface{} {
|
|
m.mutex.RLock()
|
|
defer m.mutex.RUnlock()
|
|
|
|
stats := make(map[string]interface{})
|
|
|
|
for name, service := range m.services {
|
|
stats[name] = map[string]interface{}{
|
|
"proxy_stats": service.Stats.GetStats(),
|
|
"circuit_breaker": map[string]interface{}{
|
|
"state": service.CircuitBreaker.GetState(),
|
|
"enabled": service.CircuitBreaker.config.Enabled,
|
|
"failures": service.CircuitBreaker.failures,
|
|
},
|
|
"config": map[string]interface{}{
|
|
"url": service.Config.URL,
|
|
"timeout": service.Config.Timeout.String(),
|
|
"retry_attempts": service.Config.RetryAttempts,
|
|
"retry_delay": service.Config.RetryDelay.String(),
|
|
},
|
|
}
|
|
}
|
|
|
|
return stats
|
|
} |