feat: add get_current_user and require_admin dependencies
This commit is contained in:
parent
fb7284b117
commit
7c9c9e106a
32
app/modules/auth/dependencies.py
Normal file
32
app/modules/auth/dependencies.py
Normal file
@ -0,0 +1,32 @@
|
||||
from fastapi import Depends, HTTPException, Request
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.core.auth import decode_token, get_token_from_request
|
||||
from app.core.database import get_db
|
||||
from app.modules.auth.models import User
|
||||
from app.modules.auth.service import get_user
|
||||
|
||||
|
||||
class RequiresLoginException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
async def get_current_user(
|
||||
request: Request, db: Session = Depends(get_db)
|
||||
) -> User:
|
||||
token = get_token_from_request(request)
|
||||
if not token:
|
||||
raise RequiresLoginException()
|
||||
payload = decode_token(token)
|
||||
if not payload:
|
||||
raise RequiresLoginException()
|
||||
user = get_user(db, payload.get("sub", ""))
|
||||
if user is None or not user.is_active:
|
||||
raise RequiresLoginException()
|
||||
return user
|
||||
|
||||
|
||||
async def require_admin(user: User = Depends(get_current_user)) -> User:
|
||||
if not user.is_admin:
|
||||
raise HTTPException(status_code=403, detail="Admin access required")
|
||||
return user
|
||||
91
tests/test_auth_dependencies.py
Normal file
91
tests/test_auth_dependencies.py
Normal file
@ -0,0 +1,91 @@
|
||||
import pytest
|
||||
from fastapi import Depends, FastAPI
|
||||
from fastapi.responses import RedirectResponse
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.pool import StaticPool
|
||||
|
||||
from app.core.auth import create_access_token
|
||||
from app.core.database import Base, get_db
|
||||
from app.modules.auth.dependencies import RequiresLoginException, get_current_user, require_admin
|
||||
from app.modules.auth.models import User
|
||||
from app.modules.auth.service import hash_password
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def db():
|
||||
engine = create_engine(
|
||||
"sqlite:///:memory:",
|
||||
connect_args={"check_same_thread": False},
|
||||
poolclass=StaticPool,
|
||||
)
|
||||
Base.metadata.create_all(bind=engine)
|
||||
Session = sessionmaker(bind=engine)
|
||||
session = Session()
|
||||
yield session
|
||||
session.close()
|
||||
Base.metadata.drop_all(bind=engine)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_app(db):
|
||||
app = FastAPI()
|
||||
|
||||
@app.exception_handler(RequiresLoginException)
|
||||
async def handle_login_required(request, exc):
|
||||
return RedirectResponse(url="/auth/login", status_code=307)
|
||||
|
||||
@app.get("/protected")
|
||||
async def protected(user=Depends(get_current_user)):
|
||||
return {"username": user.username}
|
||||
|
||||
@app.get("/admin-only")
|
||||
async def admin_only(user=Depends(require_admin)):
|
||||
return {"username": user.username}
|
||||
|
||||
app.dependency_overrides[get_db] = lambda: db
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(test_app):
|
||||
return TestClient(test_app, follow_redirects=False)
|
||||
|
||||
|
||||
def _token_cookie(username: str, is_admin: bool) -> dict:
|
||||
token = create_access_token(username=username, is_admin=is_admin)
|
||||
return {"access_token": token}
|
||||
|
||||
|
||||
def test_no_cookie_redirects_to_login(client):
|
||||
response = client.get("/protected")
|
||||
assert response.status_code == 307
|
||||
assert "/auth/login" in response.headers["location"]
|
||||
|
||||
|
||||
def test_valid_token_returns_user(client, db):
|
||||
db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw")))
|
||||
db.commit()
|
||||
response = client.get("/protected", cookies=_token_cookie("bob", False))
|
||||
assert response.status_code == 200
|
||||
assert response.json()["username"] == "bob"
|
||||
|
||||
|
||||
def test_invalid_token_redirects(client):
|
||||
response = client.get("/protected", cookies={"access_token": "invalid"})
|
||||
assert response.status_code == 307
|
||||
|
||||
|
||||
def test_require_admin_blocks_non_admin(client, db):
|
||||
db.add(User(username="regular", full_name="R", is_admin=False, pw_hash=hash_password("pw")))
|
||||
db.commit()
|
||||
response = client.get("/admin-only", cookies=_token_cookie("regular", False))
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
def test_require_admin_allows_admin(client, db):
|
||||
db.add(User(username="admin", full_name="A", is_admin=True, pw_hash=hash_password("pw")))
|
||||
db.commit()
|
||||
response = client.get("/admin-only", cookies=_token_cookie("admin", True))
|
||||
assert response.status_code == 200
|
||||
Loading…
x
Reference in New Issue
Block a user