From 6e7a3160914e2edc14189142e73388f44c01ae7c Mon Sep 17 00:00:00 2001 From: Oliver Hofmann Date: Mon, 27 Apr 2026 13:44:11 +0200 Subject: [PATCH] feat: add check_permission, get_current_user_optional, require_permission --- app/modules/auth/dependencies.py | 40 +++++++++ tests/test_permissions.py | 149 +++++++++++++++++++++++++++++++ 2 files changed, 189 insertions(+) create mode 100644 tests/test_permissions.py diff --git a/app/modules/auth/dependencies.py b/app/modules/auth/dependencies.py index 0deddbc..bedc267 100644 --- a/app/modules/auth/dependencies.py +++ b/app/modules/auth/dependencies.py @@ -1,3 +1,5 @@ +from typing import Callable + from fastapi import Depends, HTTPException, Request from sqlalchemy.orm import Session @@ -11,6 +13,44 @@ class RequiresLoginException(Exception): pass +def check_permission(user: User | None, permissions: list[str]) -> bool: + if "public" in permissions: + return True + if user is None: + return False + if "authenticated" in permissions: + return True + if "admin" in permissions and user.is_admin: + return True + return False # group membership check: Part 2 + + +async def get_current_user_optional( + request: Request, db: Session = Depends(get_db) +) -> User | None: + token = get_token_from_request(request) + if not token: + return None + payload = decode_token(token) + if not payload: + return None + user = get_user(db, payload.get("sub", "")) + if user is None or not user.is_active: + return None + return user + + +def require_permission(permissions: list[str]) -> Callable: + async def _dep( + request: Request, db: Session = Depends(get_db) + ) -> User | None: + user = await get_current_user_optional(request, db) + if not check_permission(user, permissions): + raise RequiresLoginException() + return user + return _dep + + async def get_current_user( request: Request, db: Session = Depends(get_db) ) -> User: diff --git a/tests/test_permissions.py b/tests/test_permissions.py new file mode 100644 index 0000000..3480b1a --- /dev/null +++ b/tests/test_permissions.py @@ -0,0 +1,149 @@ +import pytest +from fastapi import Depends, FastAPI +from fastapi.responses import RedirectResponse +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import StaticPool + +from app.core.auth import create_access_token +from app.core.database import Base, get_db +from app.modules.auth.dependencies import ( + RequiresLoginException, + check_permission, + get_current_user_optional, + require_permission, +) +from app.modules.auth.models import User +from app.modules.auth.service import hash_password + + +# --- check_permission: pure unit tests, no DB needed --- + +def test_public_allows_anonymous(): + assert check_permission(None, ["public"]) is True + + +def test_public_allows_logged_in(): + user = User(username="u", full_name="U", is_admin=False) + assert check_permission(user, ["public"]) is True + + +def test_authenticated_blocks_anonymous(): + assert check_permission(None, ["authenticated"]) is False + + +def test_authenticated_allows_user(): + user = User(username="u", full_name="U") + assert check_permission(user, ["authenticated"]) is True + + +def test_admin_blocks_regular_user(): + user = User(username="u", full_name="U", is_admin=False) + assert check_permission(user, ["admin"]) is False + + +def test_admin_allows_admin(): + user = User(username="u", full_name="U", is_admin=True) + assert check_permission(user, ["admin"]) is True + + +def test_or_logic_grants_access_on_any_match(): + regular = User(username="u", full_name="U", is_admin=False) + assert check_permission(regular, ["admin", "authenticated"]) is True + + +def test_empty_permissions_blocks_all(): + user = User(username="u", full_name="U") + assert check_permission(user, []) is False + + +# --- get_current_user_optional and require_permission: integration tests --- + +@pytest.fixture +def db(): + engine = create_engine( + "sqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + Base.metadata.create_all(bind=engine) + Session = sessionmaker(bind=engine) + session = Session() + yield session + session.close() + Base.metadata.drop_all(bind=engine) + + +@pytest.fixture +def test_app(db): + app = FastAPI() + + @app.exception_handler(RequiresLoginException) + async def handle(request, exc): + return RedirectResponse(url="/auth/login", status_code=307) + + @app.get("/public") + async def public_route(user=Depends(get_current_user_optional)): + return {"user": user.username if user else None} + + @app.get("/members") + async def members_route(user=Depends(require_permission(["authenticated"]))): + return {"user": user.username if user else None} + + @app.get("/admin") + async def admin_route(user=Depends(require_permission(["admin"]))): + return {"user": user.username if user else None} + + app.dependency_overrides[get_db] = lambda: db + return app + + +@pytest.fixture +def client(test_app): + return TestClient(test_app, follow_redirects=False) + + +def _cookie(username: str, is_admin: bool) -> dict: + return {"access_token": create_access_token(username=username, is_admin=is_admin)} + + +def test_optional_returns_none_without_cookie(client): + response = client.get("/public") + assert response.status_code == 200 + assert response.json()["user"] is None + + +def test_optional_returns_user_with_valid_cookie(client, db): + db.add(User(username="alice", full_name="Alice", pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/public", cookies=_cookie("alice", False)) + assert response.status_code == 200 + assert response.json()["user"] == "alice" + + +def test_require_authenticated_redirects_anonymous(client): + response = client.get("/members") + assert response.status_code == 307 + assert "/auth/login" in response.headers["location"] + + +def test_require_authenticated_allows_logged_in(client, db): + db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/members", cookies=_cookie("bob", False)) + assert response.status_code == 200 + + +def test_require_admin_redirects_regular_user(client, db): + db.add(User(username="reg", full_name="Reg", is_admin=False, pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/admin", cookies=_cookie("reg", False)) + assert response.status_code == 307 + + +def test_require_admin_allows_admin(client, db): + db.add(User(username="adm", full_name="Adm", is_admin=True, pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/admin", cookies=_cookie("adm", True)) + assert response.status_code == 200