You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

163 lines
4.6 KiB
Python

from __future__ import annotations
import base64
from dataclasses import replace
import hashlib
import hmac
import json
import time
import pytest
from fastapi import HTTPException
from app.config import get_settings
from app.security.auth import AuthBackend
def _make_settings(**overrides: object):
    """Return a copy of the application settings with *overrides* applied.

    The object returned by ``get_settings()`` is passed through
    ``dataclasses.replace`` so each test works on its own copy.
    """
    base_settings = get_settings()
    return replace(base_settings, **overrides)
def _build_jwt(secret: str, claims: dict[str, object]) -> str:
    """Build a three-segment token string signed with HMAC-SHA256.

    Args:
        secret: Shared secret; its UTF-8 bytes are the HMAC key.
        claims: Payload mapping encoded as the middle segment.

    Returns:
        ``<header>.<payload>.<signature>`` where every segment is
        URL-safe base64 with trailing ``=`` padding removed.
    """
    segments = [
        _b64url_json({"alg": "HS256", "typ": "JWT"}),
        _b64url_json(claims),
    ]
    unsigned_token = ".".join(segments)
    # The signature covers the encoded header and payload joined by a dot.
    mac = hmac.new(
        secret.encode("utf-8"),
        unsigned_token.encode("utf-8"),
        hashlib.sha256,
    )
    signature_segment = (
        base64.urlsafe_b64encode(mac.digest()).decode("utf-8").rstrip("=")
    )
    return f"{unsigned_token}.{signature_segment}"
def _b64url_json(value: dict[str, object]) -> str:
raw = json.dumps(value, separators=(",", ":"), sort_keys=True).encode("utf-8")
return base64.urlsafe_b64encode(raw).decode("utf-8").rstrip("=")
def test_auth_backend_api_key_mode_accepts_x_api_key() -> None:
    """In ``api_key`` mode, the configured key passed as ``x_api_key`` authenticates.

    The returned context is expected to report ``auth_type`` ``"api_key"``,
    subject ``"api-key"``, and to contain ``"*"`` among its scopes.
    """
    configured_key = "test-api-key"
    backend = AuthBackend(
        settings=_make_settings(
            auth_mode="api_key",
            agent_api_key=configured_key,
            auth_jwt_secret="",
        )
    )

    result = backend.authenticate(
        x_api_key=configured_key,
        authorization=None,
        required_scopes={"availability:read"},
    )

    assert result.auth_type == "api_key"
    assert result.subject == "api-key"
    assert "*" in result.scopes
def test_auth_backend_api_key_mode_rejects_invalid_key() -> None:
    """In ``api_key`` mode, a key that differs from the configured one is rejected.

    The call is expected to raise ``HTTPException`` with status 401 and
    detail ``"Invalid API key."``.
    """
    backend = AuthBackend(
        settings=_make_settings(
            auth_mode="api_key",
            agent_api_key="expected",
            auth_jwt_secret="",
        )
    )

    with pytest.raises(HTTPException) as raised:
        backend.authenticate(
            x_api_key="wrong",
            authorization=None,
            required_scopes={"availability:read"},
        )

    # Checked after the ``with`` block so the assertions run once the
    # exception has been captured.
    error = raised.value
    assert error.status_code == 401
    assert str(error.detail) == "Invalid API key."
def test_auth_backend_jwt_mode_validates_scope_and_claims() -> None:
    """In ``jwt`` mode, a signed token with matching issuer, audience and scope is accepted.

    The token carries the same ``iss`` and ``aud`` values as the settings
    and a space-separated ``scope`` claim that includes the required scope.
    """
    signing_secret = "jwt-secret"
    issuer = "https://issuer.example"
    audience = "personal-agent"
    backend = AuthBackend(
        settings=_make_settings(
            auth_mode="jwt",
            auth_jwt_secret=signing_secret,
            auth_jwt_issuer=issuer,
            auth_jwt_audience=audience,
            agent_api_key="",
        )
    )

    claims: dict[str, object] = {
        "sub": "agent-123",
        "iss": issuer,
        "aud": audience,
        "scope": "availability:read unsubscribe:read",
        # Expiry one hour ahead of the current time.
        "exp": int(time.time()) + 3600,
    }
    bearer_header = "Bearer " + _build_jwt(secret=signing_secret, claims=claims)

    result = backend.authenticate(
        x_api_key=None,
        authorization=bearer_header,
        required_scopes={"availability:read"},
    )

    assert result.auth_type == "jwt"
    assert result.subject == "agent-123"
    assert "availability:read" in result.scopes
def test_auth_backend_jwt_mode_rejects_missing_scope() -> None:
    """In ``jwt`` mode, a validly signed token lacking the required scope is rejected.

    The token only carries ``unsubscribe:read`` while ``availability:read``
    is required; the call is expected to raise ``HTTPException`` with
    status 403 and detail ``"Missing required scope."``.
    """
    signing_secret = "jwt-secret"
    backend = AuthBackend(
        settings=_make_settings(
            auth_mode="jwt",
            auth_jwt_secret=signing_secret,
            auth_jwt_issuer=None,
            auth_jwt_audience=None,
            agent_api_key="",
        )
    )

    claims: dict[str, object] = {
        "sub": "agent-123",
        "scope": "unsubscribe:read",
        # Expiry one hour ahead of the current time.
        "exp": int(time.time()) + 3600,
    }
    bearer_header = "Bearer " + _build_jwt(secret=signing_secret, claims=claims)

    with pytest.raises(HTTPException) as raised:
        backend.authenticate(
            x_api_key=None,
            authorization=bearer_header,
            required_scopes={"availability:read"},
        )

    # Checked after the ``with`` block so the assertions run once the
    # exception has been captured.
    error = raised.value
    assert error.status_code == 403
    assert str(error.detail) == "Missing required scope."
def test_auth_backend_hybrid_mode_uses_jwt_when_api_key_missing() -> None:
    """In ``hybrid`` mode, a Bearer token is used when no ``x_api_key`` is supplied.

    Both an API key and a JWT secret are configured, but the request only
    carries an ``Authorization`` header; the resulting context is expected
    to report ``auth_type`` ``"jwt"`` and the token's ``sub`` as subject.
    """
    signing_secret = "jwt-secret"
    backend = AuthBackend(
        settings=_make_settings(
            auth_mode="hybrid",
            agent_api_key="expected-api-key",
            auth_jwt_secret=signing_secret,
            auth_jwt_issuer=None,
            auth_jwt_audience=None,
        )
    )

    claims: dict[str, object] = {
        "sub": "fallback-jwt",
        "scope": "availability:read",
        # Expiry one hour ahead of the current time.
        "exp": int(time.time()) + 3600,
    }
    bearer_header = "Bearer " + _build_jwt(secret=signing_secret, claims=claims)

    result = backend.authenticate(
        x_api_key=None,
        authorization=bearer_header,
        required_scopes={"availability:read"},
    )

    assert result.auth_type == "jwt"
    assert result.subject == "fallback-jwt"