You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
226 lines
6.4 KiB
Python
226 lines
6.4 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
from dataclasses import replace
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import time
|
|
|
|
import pytest
|
|
from fastapi import HTTPException
|
|
|
|
from app.config import get_settings
|
|
from app.security.auth import AuthBackend
|
|
|
|
|
|
def _make_settings(**overrides: object):
    """Return a copy of the application settings with ``overrides`` applied.

    The object returned by ``get_settings()`` is passed through
    ``dataclasses.replace``, so each test works on its own modified copy.
    """
    base_settings = get_settings()
    return replace(base_settings, **overrides)
|
|
|
|
|
|
def _build_jwt(secret: str, claims: dict[str, object]) -> str:
    """Assemble a compact HS256-signed JWT string for use in tests.

    Args:
        secret: Shared secret used as the HMAC-SHA256 key (UTF-8 encoded).
        claims: Payload claims to serialize into the token body.

    Returns:
        The token as ``<header>.<payload>.<signature>``, each segment
        base64url-encoded with trailing ``=`` padding removed.
    """
    encoded_header = _b64url_json({"alg": "HS256", "typ": "JWT"})
    encoded_claims = _b64url_json(claims)
    unsigned_token = f"{encoded_header}.{encoded_claims}"

    # The signature covers the exact bytes of "<header>.<payload>".
    mac = hmac.new(
        secret.encode("utf-8"),
        unsigned_token.encode("utf-8"),
        hashlib.sha256,
    )
    encoded_signature = (
        base64.urlsafe_b64encode(mac.digest()).decode("utf-8").rstrip("=")
    )
    return f"{unsigned_token}.{encoded_signature}"
|
|
|
|
|
|
def _b64url_json(value: dict[str, object]) -> str:
|
|
raw = json.dumps(value, separators=(",", ":"), sort_keys=True).encode("utf-8")
|
|
return base64.urlsafe_b64encode(raw).decode("utf-8").rstrip("=")
|
|
|
|
|
|
def test_auth_backend_api_key_mode_accepts_x_api_key() -> None:
    """In ``api_key`` mode, the configured key passed as ``x_api_key`` is accepted."""
    auth = AuthBackend(
        settings=_make_settings(
            auth_mode="api_key",
            agent_api_key="test-api-key",
            auth_jwt_secret="",
        )
    )

    result = auth.authenticate(
        x_api_key="test-api-key",
        authorization=None,
        required_scopes={"availability:read"},
    )

    # The returned context reports API-key auth with the wildcard scope.
    assert result.auth_type == "api_key"
    assert result.subject == "api-key"
    assert "*" in result.scopes
|
|
|
|
|
|
def test_auth_backend_api_key_mode_rejects_invalid_key() -> None:
    """In ``api_key`` mode, a key that differs from the configured one raises a 401."""
    auth = AuthBackend(
        settings=_make_settings(
            auth_mode="api_key",
            agent_api_key="expected",
            auth_jwt_secret="",
        )
    )

    with pytest.raises(HTTPException) as raised:
        auth.authenticate(
            x_api_key="wrong",
            authorization=None,
            required_scopes={"availability:read"},
        )

    error = raised.value
    assert error.status_code == 401
    assert str(error.detail) == "Invalid API key."
|
|
|
|
|
|
def test_auth_backend_jwt_mode_validates_scope_and_claims() -> None:
    """In ``jwt`` mode, a signed token with matching issuer, audience and scope is accepted."""
    signing_secret = "jwt-secret"
    auth = AuthBackend(
        settings=_make_settings(
            auth_mode="jwt",
            auth_jwt_secret=signing_secret,
            auth_jwt_issuer="https://issuer.example",
            auth_jwt_audience="personal-agent",
            agent_api_key="",
        )
    )
    # Claims mirror the issuer/audience configured above; expiry is one hour ahead.
    token_claims: dict[str, object] = {
        "sub": "agent-123",
        "iss": "https://issuer.example",
        "aud": "personal-agent",
        "scope": "availability:read unsubscribe:read",
        "exp": int(time.time()) + 3600,
    }
    bearer_token = _build_jwt(secret=signing_secret, claims=token_claims)

    result = auth.authenticate(
        x_api_key=None,
        authorization=f"Bearer {bearer_token}",
        required_scopes={"availability:read"},
    )

    assert result.auth_type == "jwt"
    assert result.subject == "agent-123"
    assert "availability:read" in result.scopes
|
|
|
|
|
|
def test_auth_backend_jwt_mode_rejects_missing_scope() -> None:
    """In ``jwt`` mode, a valid token lacking the required scope raises a 403."""
    signing_secret = "jwt-secret"
    auth = AuthBackend(
        settings=_make_settings(
            auth_mode="jwt",
            auth_jwt_secret=signing_secret,
            auth_jwt_issuer=None,
            auth_jwt_audience=None,
            agent_api_key="",
        )
    )
    # The token only carries "unsubscribe:read", not the scope requested below.
    token_claims: dict[str, object] = {
        "sub": "agent-123",
        "scope": "unsubscribe:read",
        "exp": int(time.time()) + 3600,
    }
    bearer_token = _build_jwt(secret=signing_secret, claims=token_claims)

    with pytest.raises(HTTPException) as raised:
        auth.authenticate(
            x_api_key=None,
            authorization=f"Bearer {bearer_token}",
            required_scopes={"availability:read"},
        )

    error = raised.value
    assert error.status_code == 403
    assert str(error.detail) == "Missing required scope."
|
|
|
|
|
|
def test_auth_backend_hybrid_mode_uses_jwt_when_api_key_missing() -> None:
    """In ``hybrid`` mode, a request with no API key but a bearer JWT authenticates as JWT."""
    signing_secret = "jwt-secret"
    auth = AuthBackend(
        settings=_make_settings(
            auth_mode="hybrid",
            agent_api_key="expected-api-key",
            auth_jwt_secret=signing_secret,
            auth_jwt_issuer=None,
            auth_jwt_audience=None,
        )
    )
    token_claims: dict[str, object] = {
        "sub": "fallback-jwt",
        "scope": "availability:read",
        "exp": int(time.time()) + 3600,
    }
    bearer_token = _build_jwt(secret=signing_secret, claims=token_claims)

    # No x_api_key is supplied; only the Authorization header carries credentials.
    result = auth.authenticate(
        x_api_key=None,
        authorization=f"Bearer {bearer_token}",
        required_scopes={"availability:read"},
    )

    assert result.auth_type == "jwt"
    assert result.subject == "fallback-jwt"
|
|
|
|
|
|
def test_auth_backend_oauth_mode_validates_introspection_token(monkeypatch) -> None:
    """In ``oauth`` mode, an active introspection result with matching claims is accepted."""
    auth = AuthBackend(
        settings=_make_settings(
            auth_mode="oauth",
            agent_api_key="",
            auth_jwt_secret="",
        ),
        oauth_introspection_url="https://issuer.example/introspect",
        oauth_issuer="https://issuer.example",
        oauth_audience="personal-agent-mcp",
    )

    def _fake_introspection(_):
        # Stand-in for the backend's introspection call: always reports an
        # active token whose expiry is one hour after the moment of the call.
        return {
            "active": True,
            "sub": "oauth-agent",
            "iss": "https://issuer.example",
            "aud": "personal-agent-mcp",
            "scope": "availability:read unsubscribe:read",
            "exp": int(time.time()) + 3600,
        }

    monkeypatch.setattr(auth, "_introspect_oauth_token", _fake_introspection)

    result = auth.authenticate(
        x_api_key=None,
        authorization="Bearer oauth-token",
        required_scopes={"availability:read"},
    )

    assert result.auth_type == "oauth"
    assert result.subject == "oauth-agent"
    assert "availability:read" in result.scopes
|
|
|
|
|
|
def test_auth_backend_oauth_mode_rejects_inactive_token(monkeypatch) -> None:
    """In ``oauth`` mode, an introspection result with ``active`` false raises a 401."""
    auth = AuthBackend(
        settings=_make_settings(
            auth_mode="oauth",
            agent_api_key="",
            auth_jwt_secret="",
        ),
        oauth_introspection_url="https://issuer.example/introspect",
    )

    def _inactive_introspection(_):
        # Stand-in for the backend's introspection call: token is not active.
        return {"active": False}

    monkeypatch.setattr(auth, "_introspect_oauth_token", _inactive_introspection)

    with pytest.raises(HTTPException) as raised:
        auth.authenticate(
            x_api_key=None,
            authorization="Bearer oauth-token",
            required_scopes={"availability:read"},
        )

    error = raised.value
    assert error.status_code == 401
    assert str(error.detail) == "Inactive OAuth token."
|