# efihub/tests/test_auth_dependencies.py
#
# 92 lines
# 2.9 KiB
# Python

import pytest
from fastapi import Depends, FastAPI
from fastapi.responses import RedirectResponse
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.modules.auth.dependencies import RequiresLoginException, get_current_user, require_admin
from app.modules.auth.models import User
from app.modules.auth.service import hash_password
@pytest.fixture
def db():
    """Yield a session bound to a fresh in-memory SQLite database.

    The tables registered on ``Base.metadata`` are created before the test
    and dropped afterwards. The engine uses ``StaticPool`` together with
    ``check_same_thread=False`` so that the single ``:memory:`` connection
    can be shared with request handling that runs outside the test thread.

    Yields:
        An open SQLAlchemy session; it is closed during teardown.
    """
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(bind=engine)
    session_factory = sessionmaker(bind=engine)
    session = session_factory()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)
    # Release the pooled connection explicitly instead of leaving it to
    # garbage collection; otherwise every test leaks one SQLite connection.
    engine.dispose()
@pytest.fixture
def test_app(db):
    """Build a minimal FastAPI app that exercises the auth dependencies.

    The app exposes ``/protected`` (guarded by ``get_current_user``) and
    ``/admin-only`` (guarded by ``require_admin``), converts
    ``RequiresLoginException`` into a 307 redirect to ``/auth/login``, and
    overrides ``get_db`` so the dependencies receive the ``db`` fixture
    session.
    """
    application = FastAPI()

    def _use_fixture_session():
        # Hand the fixture's session to anything that depends on get_db.
        return db

    application.dependency_overrides[get_db] = _use_fixture_session

    async def _redirect_to_login(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    application.add_exception_handler(RequiresLoginException, _redirect_to_login)

    @application.get("/protected")
    async def protected(user=Depends(get_current_user)):
        return {"username": user.username}

    @application.get("/admin-only")
    async def admin_only(user=Depends(require_admin)):
        return {"username": user.username}

    return application
@pytest.fixture
def client(test_app):
    """Return a ``TestClient`` for ``test_app`` that does not follow redirects.

    Redirect following is disabled so tests can assert on the 307 response
    itself.
    """
    http_client = TestClient(test_app, follow_redirects=False)
    return http_client
def _token_cookie(username: str, is_admin: bool) -> dict:
    """Return a cookie mapping holding a freshly created access token.

    The token is produced by ``create_access_token`` and stored under the
    ``access_token`` key.
    """
    return {
        "access_token": create_access_token(username=username, is_admin=is_admin),
    }
def test_no_cookie_redirects_to_login(client):
    """A request to ``/protected`` without cookies gets a 307 to the login page."""
    resp = client.get("/protected")
    assert resp.status_code == 307
    location = resp.headers["location"]
    assert "/auth/login" in location
def test_valid_token_returns_user(client, db):
    """A valid token for a stored user makes ``/protected`` return that user."""
    db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw")))
    db.commit()
    # Set the cookie on the client itself: passing ``cookies=`` per request
    # is deprecated by httpx, which the TestClient is built on.
    client.cookies.update(_token_cookie("bob", False))
    response = client.get("/protected")
    assert response.status_code == 200
    assert response.json()["username"] == "bob"
def test_invalid_token_redirects(client):
    """An unparseable ``access_token`` cookie gets a 307 to the login page."""
    # Set the cookie on the client itself: passing ``cookies=`` per request
    # is deprecated by httpx, which the TestClient is built on.
    client.cookies.update({"access_token": "invalid"})
    response = client.get("/protected")
    assert response.status_code == 307
    # Check the redirect target as well, matching the no-cookie test.
    assert "/auth/login" in response.headers["location"]
def test_require_admin_blocks_non_admin(client, db):
    """A non-admin user with a non-admin token gets 403 from ``/admin-only``."""
    db.add(User(username="regular", full_name="R", is_admin=False, pw_hash=hash_password("pw")))
    db.commit()
    # Set the cookie on the client itself: passing ``cookies=`` per request
    # is deprecated by httpx, which the TestClient is built on.
    client.cookies.update(_token_cookie("regular", False))
    response = client.get("/admin-only")
    assert response.status_code == 403
def test_require_admin_allows_admin(client, db):
    """An admin user with an admin token gets 200 from ``/admin-only``."""
    db.add(User(username="admin", full_name="A", is_admin=True, pw_hash=hash_password("pw")))
    db.commit()
    # Set the cookie on the client itself: passing ``cookies=`` per request
    # is deprecated by httpx, which the TestClient is built on.
    client.cookies.update(_token_cookie("admin", True))
    response = client.get("/admin-only")
    assert response.status_code == 200