efihub/tests/test_permissions.py

150 lines
4.6 KiB
Python

import pytest
from fastapi import Depends, FastAPI
from fastapi.responses import RedirectResponse
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.modules.auth.dependencies import (
RequiresLoginException,
check_permission,
get_current_user_optional,
require_permission,
)
from app.modules.auth.models import User
from app.modules.auth.service import hash_password
# --- check_permission: pure unit tests, no DB needed ---
def test_public_allows_anonymous():
assert check_permission(None, ["public"]) is True
def test_public_allows_logged_in():
user = User(username="u", full_name="U", is_admin=False)
assert check_permission(user, ["public"]) is True
def test_authenticated_blocks_anonymous():
assert check_permission(None, ["authenticated"]) is False
def test_authenticated_allows_user():
user = User(username="u", full_name="U")
assert check_permission(user, ["authenticated"]) is True
def test_admin_blocks_regular_user():
user = User(username="u", full_name="U", is_admin=False)
assert check_permission(user, ["admin"]) is False
def test_admin_allows_admin():
user = User(username="u", full_name="U", is_admin=True)
assert check_permission(user, ["admin"]) is True
def test_or_logic_grants_access_on_any_match():
regular = User(username="u", full_name="U", is_admin=False)
assert check_permission(regular, ["admin", "authenticated"]) is True
def test_empty_permissions_blocks_all():
user = User(username="u", full_name="U")
assert check_permission(user, []) is False
# --- get_current_user_optional and require_permission: integration tests ---
@pytest.fixture
def db():
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
Base.metadata.drop_all(bind=engine)
@pytest.fixture
def test_app(db):
app = FastAPI()
@app.exception_handler(RequiresLoginException)
async def handle(request, exc):
return RedirectResponse(url="/auth/login", status_code=307)
@app.get("/public")
async def public_route(user=Depends(get_current_user_optional)):
return {"user": user.username if user else None}
@app.get("/members")
async def members_route(user=Depends(require_permission(["authenticated"]))):
return {"user": user.username if user else None}
@app.get("/admin")
async def admin_route(user=Depends(require_permission(["admin"]))):
return {"user": user.username if user else None}
app.dependency_overrides[get_db] = lambda: db
return app
@pytest.fixture
def client(test_app):
return TestClient(test_app, follow_redirects=False)
def _cookie(username: str, is_admin: bool) -> dict:
return {"access_token": create_access_token(username=username, is_admin=is_admin)}
def test_optional_returns_none_without_cookie(client):
response = client.get("/public")
assert response.status_code == 200
assert response.json()["user"] is None
def test_optional_returns_user_with_valid_cookie(client, db):
db.add(User(username="alice", full_name="Alice", pw_hash=hash_password("pw")))
db.commit()
response = client.get("/public", cookies=_cookie("alice", False))
assert response.status_code == 200
assert response.json()["user"] == "alice"
def test_require_authenticated_redirects_anonymous(client):
response = client.get("/members")
assert response.status_code == 307
assert "/auth/login" in response.headers["location"]
def test_require_authenticated_allows_logged_in(client, db):
db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw")))
db.commit()
response = client.get("/members", cookies=_cookie("bob", False))
assert response.status_code == 200
def test_require_admin_redirects_regular_user(client, db):
db.add(User(username="reg", full_name="Reg", is_admin=False, pw_hash=hash_password("pw")))
db.commit()
response = client.get("/admin", cookies=_cookie("reg", False))
assert response.status_code == 307
def test_require_admin_allows_admin(client, db):
db.add(User(username="adm", full_name="Adm", is_admin=True, pw_hash=hash_password("pw")))
db.commit()
response = client.get("/admin", cookies=_cookie("adm", True))
assert response.status_code == 200