From 7c9c9e106a5f7a19049dfd60814d1a52f8d27a1a Mon Sep 17 00:00:00 2001 From: Oliver Hofmann Date: Mon, 27 Apr 2026 09:51:16 +0200 Subject: [PATCH] feat: add get_current_user and require_admin dependencies --- app/modules/auth/dependencies.py | 32 +++++++++++ tests/test_auth_dependencies.py | 91 ++++++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+) create mode 100644 app/modules/auth/dependencies.py create mode 100644 tests/test_auth_dependencies.py diff --git a/app/modules/auth/dependencies.py b/app/modules/auth/dependencies.py new file mode 100644 index 0000000..0deddbc --- /dev/null +++ b/app/modules/auth/dependencies.py @@ -0,0 +1,32 @@ +from fastapi import Depends, HTTPException, Request +from sqlalchemy.orm import Session + +from app.core.auth import decode_token, get_token_from_request +from app.core.database import get_db +from app.modules.auth.models import User +from app.modules.auth.service import get_user + + +class RequiresLoginException(Exception): + pass + + +async def get_current_user( + request: Request, db: Session = Depends(get_db) +) -> User: + token = get_token_from_request(request) + if not token: + raise RequiresLoginException() + payload = decode_token(token) + if not payload: + raise RequiresLoginException() + user = get_user(db, payload.get("sub", "")) + if user is None or not user.is_active: + raise RequiresLoginException() + return user + + +async def require_admin(user: User = Depends(get_current_user)) -> User: + if not user.is_admin: + raise HTTPException(status_code=403, detail="Admin access required") + return user diff --git a/tests/test_auth_dependencies.py b/tests/test_auth_dependencies.py new file mode 100644 index 0000000..6f48038 --- /dev/null +++ b/tests/test_auth_dependencies.py @@ -0,0 +1,91 @@ +import pytest +from fastapi import Depends, FastAPI +from fastapi.responses import RedirectResponse +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import StaticPool + +from app.core.auth import create_access_token +from app.core.database import Base, get_db +from app.modules.auth.dependencies import RequiresLoginException, get_current_user, require_admin +from app.modules.auth.models import User +from app.modules.auth.service import hash_password + + +@pytest.fixture +def db(): + engine = create_engine( + "sqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + Base.metadata.create_all(bind=engine) + Session = sessionmaker(bind=engine) + session = Session() + yield session + session.close() + Base.metadata.drop_all(bind=engine) + + +@pytest.fixture +def test_app(db): + app = FastAPI() + + @app.exception_handler(RequiresLoginException) + async def handle_login_required(request, exc): + return RedirectResponse(url="/auth/login", status_code=307) + + @app.get("/protected") + async def protected(user=Depends(get_current_user)): + return {"username": user.username} + + @app.get("/admin-only") + async def admin_only(user=Depends(require_admin)): + return {"username": user.username} + + app.dependency_overrides[get_db] = lambda: db + return app + + +@pytest.fixture +def client(test_app): + return TestClient(test_app, follow_redirects=False) + + +def _token_cookie(username: str, is_admin: bool) -> dict: + token = create_access_token(username=username, is_admin=is_admin) + return {"access_token": token} + + +def test_no_cookie_redirects_to_login(client): + response = client.get("/protected") + assert response.status_code == 307 + assert "/auth/login" in response.headers["location"] + + +def test_valid_token_returns_user(client, db): + db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/protected", cookies=_token_cookie("bob", False)) + assert response.status_code == 200 + assert response.json()["username"] == "bob" + + +def test_invalid_token_redirects(client): + response = client.get("/protected", cookies={"access_token": "invalid"}) + assert response.status_code == 307 + + +def test_require_admin_blocks_non_admin(client, db): + db.add(User(username="regular", full_name="R", is_admin=False, pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/admin-only", cookies=_token_cookie("regular", False)) + assert response.status_code == 403 + + +def test_require_admin_allows_admin(client, db): + db.add(User(username="admin", full_name="A", is_admin=True, pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/admin-only", cookies=_token_cookie("admin", True)) + assert response.status_code == 200