feat: add check_permission, get_current_user_optional, require_permission
This commit is contained in:
parent
35312c41f5
commit
6e7a316091
@ -1,3 +1,5 @@
|
|||||||
|
from typing import Callable
|
||||||
|
|
||||||
from fastapi import Depends, HTTPException, Request
|
from fastapi import Depends, HTTPException, Request
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
@ -11,6 +13,44 @@ class RequiresLoginException(Exception):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def check_permission(user: User | None, permissions: list[str]) -> bool:
|
||||||
|
if "public" in permissions:
|
||||||
|
return True
|
||||||
|
if user is None:
|
||||||
|
return False
|
||||||
|
if "authenticated" in permissions:
|
||||||
|
return True
|
||||||
|
if "admin" in permissions and user.is_admin:
|
||||||
|
return True
|
||||||
|
return False # group membership check: Part 2
|
||||||
|
|
||||||
|
|
||||||
|
async def get_current_user_optional(
|
||||||
|
request: Request, db: Session = Depends(get_db)
|
||||||
|
) -> User | None:
|
||||||
|
token = get_token_from_request(request)
|
||||||
|
if not token:
|
||||||
|
return None
|
||||||
|
payload = decode_token(token)
|
||||||
|
if not payload:
|
||||||
|
return None
|
||||||
|
user = get_user(db, payload.get("sub", ""))
|
||||||
|
if user is None or not user.is_active:
|
||||||
|
return None
|
||||||
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
def require_permission(permissions: list[str]) -> Callable:
|
||||||
|
async def _dep(
|
||||||
|
request: Request, db: Session = Depends(get_db)
|
||||||
|
) -> User | None:
|
||||||
|
user = await get_current_user_optional(request, db)
|
||||||
|
if not check_permission(user, permissions):
|
||||||
|
raise RequiresLoginException()
|
||||||
|
return user
|
||||||
|
return _dep
|
||||||
|
|
||||||
|
|
||||||
async def get_current_user(
|
async def get_current_user(
|
||||||
request: Request, db: Session = Depends(get_db)
|
request: Request, db: Session = Depends(get_db)
|
||||||
) -> User:
|
) -> User:
|
||||||
|
|||||||
149
tests/test_permissions.py
Normal file
149
tests/test_permissions.py
Normal file
@ -0,0 +1,149 @@
|
|||||||
|
import pytest
|
||||||
|
from fastapi import Depends, FastAPI
|
||||||
|
from fastapi.responses import RedirectResponse
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
from sqlalchemy.orm import sessionmaker
|
||||||
|
from sqlalchemy.pool import StaticPool
|
||||||
|
|
||||||
|
from app.core.auth import create_access_token
|
||||||
|
from app.core.database import Base, get_db
|
||||||
|
from app.modules.auth.dependencies import (
|
||||||
|
RequiresLoginException,
|
||||||
|
check_permission,
|
||||||
|
get_current_user_optional,
|
||||||
|
require_permission,
|
||||||
|
)
|
||||||
|
from app.modules.auth.models import User
|
||||||
|
from app.modules.auth.service import hash_password
|
||||||
|
|
||||||
|
|
||||||
|
# --- check_permission: pure unit tests, no DB needed ---
|
||||||
|
|
||||||
|
def test_public_allows_anonymous():
|
||||||
|
assert check_permission(None, ["public"]) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_public_allows_logged_in():
|
||||||
|
user = User(username="u", full_name="U", is_admin=False)
|
||||||
|
assert check_permission(user, ["public"]) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticated_blocks_anonymous():
|
||||||
|
assert check_permission(None, ["authenticated"]) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticated_allows_user():
|
||||||
|
user = User(username="u", full_name="U")
|
||||||
|
assert check_permission(user, ["authenticated"]) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_admin_blocks_regular_user():
|
||||||
|
user = User(username="u", full_name="U", is_admin=False)
|
||||||
|
assert check_permission(user, ["admin"]) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_admin_allows_admin():
|
||||||
|
user = User(username="u", full_name="U", is_admin=True)
|
||||||
|
assert check_permission(user, ["admin"]) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_or_logic_grants_access_on_any_match():
|
||||||
|
regular = User(username="u", full_name="U", is_admin=False)
|
||||||
|
assert check_permission(regular, ["admin", "authenticated"]) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_empty_permissions_blocks_all():
|
||||||
|
user = User(username="u", full_name="U")
|
||||||
|
assert check_permission(user, []) is False
|
||||||
|
|
||||||
|
|
||||||
|
# --- get_current_user_optional and require_permission: integration tests ---
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def db():
|
||||||
|
engine = create_engine(
|
||||||
|
"sqlite:///:memory:",
|
||||||
|
connect_args={"check_same_thread": False},
|
||||||
|
poolclass=StaticPool,
|
||||||
|
)
|
||||||
|
Base.metadata.create_all(bind=engine)
|
||||||
|
Session = sessionmaker(bind=engine)
|
||||||
|
session = Session()
|
||||||
|
yield session
|
||||||
|
session.close()
|
||||||
|
Base.metadata.drop_all(bind=engine)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def test_app(db):
|
||||||
|
app = FastAPI()
|
||||||
|
|
||||||
|
@app.exception_handler(RequiresLoginException)
|
||||||
|
async def handle(request, exc):
|
||||||
|
return RedirectResponse(url="/auth/login", status_code=307)
|
||||||
|
|
||||||
|
@app.get("/public")
|
||||||
|
async def public_route(user=Depends(get_current_user_optional)):
|
||||||
|
return {"user": user.username if user else None}
|
||||||
|
|
||||||
|
@app.get("/members")
|
||||||
|
async def members_route(user=Depends(require_permission(["authenticated"]))):
|
||||||
|
return {"user": user.username if user else None}
|
||||||
|
|
||||||
|
@app.get("/admin")
|
||||||
|
async def admin_route(user=Depends(require_permission(["admin"]))):
|
||||||
|
return {"user": user.username if user else None}
|
||||||
|
|
||||||
|
app.dependency_overrides[get_db] = lambda: db
|
||||||
|
return app
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def client(test_app):
|
||||||
|
return TestClient(test_app, follow_redirects=False)
|
||||||
|
|
||||||
|
|
||||||
|
def _cookie(username: str, is_admin: bool) -> dict:
|
||||||
|
return {"access_token": create_access_token(username=username, is_admin=is_admin)}
|
||||||
|
|
||||||
|
|
||||||
|
def test_optional_returns_none_without_cookie(client):
|
||||||
|
response = client.get("/public")
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json()["user"] is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_optional_returns_user_with_valid_cookie(client, db):
|
||||||
|
db.add(User(username="alice", full_name="Alice", pw_hash=hash_password("pw")))
|
||||||
|
db.commit()
|
||||||
|
response = client.get("/public", cookies=_cookie("alice", False))
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json()["user"] == "alice"
|
||||||
|
|
||||||
|
|
||||||
|
def test_require_authenticated_redirects_anonymous(client):
|
||||||
|
response = client.get("/members")
|
||||||
|
assert response.status_code == 307
|
||||||
|
assert "/auth/login" in response.headers["location"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_require_authenticated_allows_logged_in(client, db):
|
||||||
|
db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw")))
|
||||||
|
db.commit()
|
||||||
|
response = client.get("/members", cookies=_cookie("bob", False))
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
def test_require_admin_redirects_regular_user(client, db):
|
||||||
|
db.add(User(username="reg", full_name="Reg", is_admin=False, pw_hash=hash_password("pw")))
|
||||||
|
db.commit()
|
||||||
|
response = client.get("/admin", cookies=_cookie("reg", False))
|
||||||
|
assert response.status_code == 307
|
||||||
|
|
||||||
|
|
||||||
|
def test_require_admin_allows_admin(client, db):
|
||||||
|
db.add(User(username="adm", full_name="Adm", is_admin=True, pw_hash=hash_password("pw")))
|
||||||
|
db.commit()
|
||||||
|
response = client.get("/admin", cookies=_cookie("adm", True))
|
||||||
|
assert response.status_code == 200
|
||||||
Loading…
x
Reference in New Issue
Block a user