"""Tests for the auth dependencies.

Covers ``check_permission`` with plain unit tests, and
``get_current_user_optional`` / ``require_permission`` through a small FastAPI
app backed by an in-memory SQLite database.
"""

import pytest
from fastapi import Depends, FastAPI
from fastapi.responses import RedirectResponse
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.modules.auth.dependencies import (
    RequiresLoginException,
    check_permission,
    get_current_user_optional,
    require_permission,
)
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


# --- check_permission: pure unit tests, no DB needed ---


def test_public_allows_anonymous():
    """``public`` grants access when there is no user at all."""
    assert check_permission(None, ["public"]) is True


def test_public_allows_logged_in():
    """``public`` also grants access to a logged-in, non-admin user."""
    user = User(username="u", full_name="U", is_admin=False)
    assert check_permission(user, ["public"]) is True


def test_authenticated_blocks_anonymous():
    """``authenticated`` denies access when there is no user."""
    assert check_permission(None, ["authenticated"]) is False


def test_authenticated_allows_user():
    """``authenticated`` grants access to any user object."""
    user = User(username="u", full_name="U")
    assert check_permission(user, ["authenticated"]) is True


def test_admin_blocks_regular_user():
    """``admin`` denies a user whose ``is_admin`` flag is False."""
    user = User(username="u", full_name="U", is_admin=False)
    assert check_permission(user, ["admin"]) is False


def test_admin_allows_admin():
    """``admin`` grants a user whose ``is_admin`` flag is True."""
    user = User(username="u", full_name="U", is_admin=True)
    assert check_permission(user, ["admin"]) is True


def test_or_logic_grants_access_on_any_match():
    """Matching any one of the listed permissions is enough."""
    regular = User(username="u", full_name="U", is_admin=False)
    assert check_permission(regular, ["admin", "authenticated"]) is True


def test_empty_permissions_blocks_all():
    """An empty permission list denies even a logged-in user."""
    user = User(username="u", full_name="U")
    assert check_permission(user, []) is False


# --- get_current_user_optional and require_permission: integration tests ---


@pytest.fixture
def db():
    """Yield a session bound to a fresh in-memory SQLite database.

    ``StaticPool`` keeps a single connection so every user of the engine sees
    the same in-memory database, and ``check_same_thread=False`` lets that
    connection be used from a thread other than the one that created it.
    """
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(bind=engine)
    session_factory = sessionmaker(bind=engine)
    session = session_factory()
    try:
        yield session
    finally:
        session.close()
        Base.metadata.drop_all(bind=engine)
        # Release the pooled connection so it does not outlive the test.
        engine.dispose()


@pytest.fixture
def test_app(db):
    """Build a minimal FastAPI app exposing one route per permission level.

    ``RequiresLoginException`` is translated into a 307 redirect to
    ``/auth/login``, and ``get_db`` is overridden to return the ``db`` fixture
    session so routes and tests share the same database.
    """
    app = FastAPI()

    @app.exception_handler(RequiresLoginException)
    async def handle(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    @app.get("/public")
    async def public_route(user=Depends(get_current_user_optional)):
        return {"user": user.username if user else None}

    @app.get("/members")
    async def members_route(user=Depends(require_permission(["authenticated"]))):
        return {"user": user.username if user else None}

    @app.get("/admin")
    async def admin_route(user=Depends(require_permission(["admin"]))):
        return {"user": user.username if user else None}

    app.dependency_overrides[get_db] = lambda: db
    return app


@pytest.fixture
def client(test_app):
    """Return a test client that does not follow redirects.

    Redirects are left unfollowed so tests can assert on the 307 response
    itself.
    """
    return TestClient(test_app, follow_redirects=False)


def _cookie(username: str, is_admin: bool) -> dict:
    """Return a cookie mapping holding an access token for *username*."""
    return {"access_token": create_access_token(username=username, is_admin=is_admin)}


def _login(client: TestClient, username: str, *, is_admin: bool) -> None:
    """Store an access-token cookie for *username* in *client*'s cookie jar.

    The cookie is set on the client instead of being passed per request
    because HTTPX deprecates the per-request ``cookies=`` argument.
    """
    client.cookies.update(_cookie(username, is_admin))


def test_optional_returns_none_without_cookie(client):
    """Without a cookie the optional dependency yields no user."""
    response = client.get("/public")
    assert response.status_code == 200
    assert response.json()["user"] is None


def test_optional_returns_user_with_valid_cookie(client, db):
    """With a valid cookie the optional dependency yields the stored user."""
    db.add(User(username="alice", full_name="Alice", pw_hash=hash_password("pw")))
    db.commit()
    _login(client, "alice", is_admin=False)

    response = client.get("/public")

    assert response.status_code == 200
    assert response.json()["user"] == "alice"


def test_require_authenticated_redirects_anonymous(client):
    """An anonymous request to an authenticated route is sent to the login page."""
    response = client.get("/members")
    assert response.status_code == 307
    assert "/auth/login" in response.headers["location"]


def test_require_authenticated_allows_logged_in(client, db):
    """A logged-in user can reach an authenticated route."""
    db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw")))
    db.commit()
    _login(client, "bob", is_admin=False)

    response = client.get("/members")

    assert response.status_code == 200


def test_require_admin_redirects_regular_user(client, db):
    """A logged-in non-admin user is redirected away from an admin route."""
    db.add(
        User(
            username="reg",
            full_name="Reg",
            is_admin=False,
            pw_hash=hash_password("pw"),
        )
    )
    db.commit()
    _login(client, "reg", is_admin=False)

    response = client.get("/admin")

    assert response.status_code == 307
    # Same target as the anonymous case: the only redirect this app
    # configures is the login one.
    assert "/auth/login" in response.headers["location"]


def test_require_admin_allows_admin(client, db):
    """An admin user can reach an admin route."""
    db.add(
        User(
            username="adm",
            full_name="Adm",
            is_admin=True,
            pw_hash=hash_password("pw"),
        )
    )
    db.commit()
    _login(client, "adm", is_admin=True)

    response = client.get("/admin")

    assert response.status_code == 200