# Auth Permissions Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Öffentliche Routen ermöglichen, Permission-basierte Kachel-Filterung einführen, und Navbar kontextabhängig gestalten. **Architecture:** Drei neue Funktionen in `dependencies.py` (`check_permission`, `get_current_user_optional`, `require_permission`). `MODULES` in `main.py` bekommt `slug` und `permission`, aktive Module werden automatisch registriert. `GET /` wird öffentlich. Navbar in `base.html` zeigt User-Info conditional. Bestehende `get_current_user` und `require_admin` bleiben unverändert. **Tech Stack:** FastAPI, Jinja2, SQLAlchemy, pytest, importlib --- ### Task 1: check_permission, get_current_user_optional, require_permission **Files:** - Modify: `app/modules/auth/dependencies.py` - Create: `tests/test_permissions.py` - [ ] **Step 1: Write failing tests** Create `tests/test_permissions.py`: ```python import pytest from fastapi import Depends, FastAPI from fastapi.responses import RedirectResponse from fastapi.testclient import TestClient from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import StaticPool from app.core.auth import create_access_token from app.core.database import Base, get_db from app.modules.auth.dependencies import ( RequiresLoginException, check_permission, get_current_user_optional, require_permission, ) from app.modules.auth.models import User from app.modules.auth.service import hash_password # --- check_permission: pure unit tests, no DB needed --- def test_public_allows_anonymous(): assert check_permission(None, ["public"]) is True def test_public_allows_logged_in(): user = User(username="u", full_name="U", is_admin=False) assert check_permission(user, ["public"]) is True def test_authenticated_blocks_anonymous(): 
assert check_permission(None, ["authenticated"]) is False def test_authenticated_allows_user(): user = User(username="u", full_name="U") assert check_permission(user, ["authenticated"]) is True def test_admin_blocks_regular_user(): user = User(username="u", full_name="U", is_admin=False) assert check_permission(user, ["admin"]) is False def test_admin_allows_admin(): user = User(username="u", full_name="U", is_admin=True) assert check_permission(user, ["admin"]) is True def test_or_logic_grants_access_on_any_match(): regular = User(username="u", full_name="U", is_admin=False) assert check_permission(regular, ["admin", "authenticated"]) is True def test_empty_permissions_blocks_all(): user = User(username="u", full_name="U") assert check_permission(user, []) is False # --- get_current_user_optional and require_permission: integration tests --- @pytest.fixture def db(): engine = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) Base.metadata.create_all(bind=engine) Session = sessionmaker(bind=engine) session = Session() yield session session.close() Base.metadata.drop_all(bind=engine) @pytest.fixture def test_app(db): app = FastAPI() @app.exception_handler(RequiresLoginException) async def handle(request, exc): return RedirectResponse(url="/auth/login", status_code=307) @app.get("/public") async def public_route(user=Depends(get_current_user_optional)): return {"user": user.username if user else None} @app.get("/members") async def members_route(user=Depends(require_permission(["authenticated"]))): return {"user": user.username if user else None} @app.get("/admin") async def admin_route(user=Depends(require_permission(["admin"]))): return {"user": user.username if user else None} app.dependency_overrides[get_db] = lambda: db return app @pytest.fixture def client(test_app): return TestClient(test_app, follow_redirects=False) def _cookie(username: str, is_admin: bool) -> dict: return {"access_token": 
create_access_token(username=username, is_admin=is_admin)} def test_optional_returns_none_without_cookie(client): response = client.get("/public") assert response.status_code == 200 assert response.json()["user"] is None def test_optional_returns_user_with_valid_cookie(client, db): db.add(User(username="alice", full_name="Alice", pw_hash=hash_password("pw"))) db.commit() response = client.get("/public", cookies=_cookie("alice", False)) assert response.status_code == 200 assert response.json()["user"] == "alice" def test_require_authenticated_redirects_anonymous(client): response = client.get("/members") assert response.status_code == 307 assert "/auth/login" in response.headers["location"] def test_require_authenticated_allows_logged_in(client, db): db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw"))) db.commit() response = client.get("/members", cookies=_cookie("bob", False)) assert response.status_code == 200 def test_require_admin_redirects_regular_user(client, db): db.add(User(username="reg", full_name="Reg", is_admin=False, pw_hash=hash_password("pw"))) db.commit() response = client.get("/admin", cookies=_cookie("reg", False)) assert response.status_code == 307 def test_require_admin_allows_admin(client, db): db.add(User(username="adm", full_name="Adm", is_admin=True, pw_hash=hash_password("pw"))) db.commit() response = client.get("/admin", cookies=_cookie("adm", True)) assert response.status_code == 200 ``` - [ ] **Step 2: Run to confirm failure** ```bash .venv/bin/pytest tests/test_permissions.py -v ``` Expected: FAIL — `ImportError: cannot import name 'check_permission'` - [ ] **Step 3: Update `app/modules/auth/dependencies.py`** ```python from fastapi import Depends, HTTPException, Request from sqlalchemy.orm import Session from typing import Callable from app.core.auth import decode_token, get_token_from_request from app.core.database import get_db from app.modules.auth.models import User from app.modules.auth.service import get_user 
class RequiresLoginException(Exception): pass def check_permission(user: User | None, permissions: list[str]) -> bool: if "public" in permissions: return True if user is None: return False if "authenticated" in permissions: return True if "admin" in permissions and user.is_admin: return True return False # group membership check: Part 2 async def get_current_user_optional( request: Request, db: Session = Depends(get_db) ) -> User | None: token = get_token_from_request(request) if not token: return None payload = decode_token(token) if not payload: return None user = get_user(db, payload.get("sub", "")) if user is None or not user.is_active: return None return user def require_permission(permissions: list[str]) -> Callable: async def _dep( request: Request, db: Session = Depends(get_db) ) -> User | None: user = await get_current_user_optional(request, db) if not check_permission(user, permissions): raise RequiresLoginException() return user return _dep async def get_current_user( request: Request, db: Session = Depends(get_db) ) -> User: token = get_token_from_request(request) if not token: raise RequiresLoginException() payload = decode_token(token) if not payload: raise RequiresLoginException() user = get_user(db, payload.get("sub", "")) if user is None or not user.is_active: raise RequiresLoginException() return user async def require_admin(user: User = Depends(get_current_user)) -> User: if not user.is_admin: raise HTTPException(status_code=403, detail="Admin access required") return user ``` - [ ] **Step 4: Run to confirm pass** ```bash .venv/bin/pytest tests/test_permissions.py -v ``` Expected: All 14 tests PASS. - [ ] **Step 5: Run full suite to confirm no regressions** ```bash .venv/bin/pytest -v ``` Expected: All tests PASS. 
- [ ] **Step 6: Commit** ```bash git add app/modules/auth/dependencies.py tests/test_permissions.py git commit -m "feat: add check_permission, get_current_user_optional, require_permission" ``` --- ### Task 2: MODULES registry + public landing page **Files:** - Modify: `app/main.py` - Modify: `tests/test_landing.py` - [ ] **Step 1: Replace `app/main.py`** ```python import importlib import logging from contextlib import asynccontextmanager from fastapi import Depends, FastAPI, Request, WebSocket from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from app.core.config import get_settings from app.core.database import get_db from app.modules.auth.dependencies import ( RequiresLoginException, check_permission, get_current_user, get_current_user_optional, ) from app.modules.auth.router import router as auth_router logger = logging.getLogger(__name__) settings = get_settings() MODULES = [ { "slug": "rss", "icon": "📡", "name": "RSS-Feed Server", "description": "Aggregiert und verteilt Neuigkeiten der Fakultät als standardkonformen RSS 2.0 Feed.", "status": "active", "permission": ["authenticated"], }, { "slug": "kalender", "icon": "📅", "name": "Kalender", "description": "Veranstaltungen, Prüfungstermine und Fristen im iCal-Format.", "status": "planned", "permission": ["authenticated"], }, { "slug": "benachrichtigungen", "icon": "🔔", "name": "Benachrichtigungen", "description": "Push-Nachrichten und WebSocket-basierte Echtzeit-Alerts für Studierende.", "status": "planned", "permission": ["authenticated"], }, { "slug": "auslastung", "icon": "📊", "name": "Auslastung", "description": "Raum- und Ressourcenauslastung der Fakultät in Echtzeit.", "status": "planned", "permission": ["authenticated"], }, { "slug": "lehrveranstaltungen", "icon": "📚", "name": "Lehrveranstaltungen", "description": "Stundenplan-API und Kursinformationen aus dem Campus-System.", "status": "planned", 
"permission": ["authenticated"], }, ] NAV_ITEMS = [ {"label": "Übersicht", "url": "/", "active": True}, {"label": "RSS-Feeds", "url": "/rss", "active": False}, {"label": "Kalender", "url": "/kalender", "active": False}, {"label": "Docs", "url": "/docs", "active": False}, ] def _reset_admin() -> None: from app.core.database import Base, SessionLocal, engine from app.modules.auth.models import User # noqa: F401 from app.modules.auth.service import get_user, hash_password Base.metadata.create_all(bind=engine) with SessionLocal() as db: user = get_user(db, settings.ADMIN_USERNAME) if user is None: user = User(username=settings.ADMIN_USERNAME, full_name="Administrator") db.add(user) user.pw_hash = hash_password(settings.ADMIN_PASSWORD) user.is_admin = True user.is_active = True db.commit() @asynccontextmanager async def lifespan(app: FastAPI): _reset_admin() yield app = FastAPI( title="University Process Hub", root_path=settings.APP_PREFIX, lifespan=lifespan, ) app.mount("/static", StaticFiles(directory="app/static"), name="static") templates = Jinja2Templates(directory="app/templates") app.include_router(auth_router) for _module in MODULES: if _module["status"] == "active": try: _mod = importlib.import_module(f"app.modules.{_module['slug']}.router") app.include_router(_mod.router, prefix=f"/{_module['slug']}") except ModuleNotFoundError: logger.warning("Module router not found: app.modules.%s.router", _module["slug"]) @app.exception_handler(RequiresLoginException) async def requires_login_handler(request: Request, exc: RequiresLoginException): return RedirectResponse(url="/auth/login", status_code=307) def _db_mode() -> str: url = settings.DATABASE_URL return "SQLite (dev)" if url.startswith("sqlite") else "MariaDB (prod)" @app.get("/", response_class=HTMLResponse) async def root(request: Request, current_user=Depends(get_current_user_optional)): visible = [m for m in MODULES if check_permission(current_user, m["permission"])] return templates.TemplateResponse( 
request, "index.html", { "nav_items": NAV_ITEMS, "modules": visible, "db_mode": _db_mode(), "app_version": "0.1.0", "current_user": current_user, }, ) @app.websocket("/ws/hello/{name}") async def websocket_hello(websocket: WebSocket, name: str): await websocket.accept() await websocket.send_text(f"Hello, {name}!") await websocket.close() ``` - [ ] **Step 2: Replace `tests/test_landing.py`** ```python import pytest from fastapi.testclient import TestClient from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import StaticPool from app.core.auth import create_access_token from app.core.database import Base, get_db from app.main import app from app.modules.auth.models import User from app.modules.auth.service import hash_password @pytest.fixture(autouse=True) def override_db(): engine = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) Base.metadata.create_all(bind=engine) Session = sessionmaker(bind=engine) session = Session() app.dependency_overrides[get_db] = lambda: session yield session app.dependency_overrides.clear() session.close() Base.metadata.drop_all(bind=engine) @pytest.fixture def client(): return TestClient(app, follow_redirects=False) @pytest.fixture def auth_cookies(override_db): user = User(username="testuser", full_name="Test User", pw_hash=hash_password("pw")) override_db.add(user) override_db.commit() token = create_access_token(username="testuser", is_admin=False) return {"access_token": token} @pytest.fixture def admin_cookies(override_db): user = User(username="adminuser", full_name="Admin User", is_admin=True, pw_hash=hash_password("pw")) override_db.add(user) override_db.commit() token = create_access_token(username="adminuser", is_admin=True) return {"access_token": token} def test_landing_accessible_without_auth(client): response = client.get("/") assert response.status_code == 200 assert "text/html" in response.headers["content-type"] def 
test_landing_shows_no_modules_for_anonymous(client): response = client.get("/") assert response.status_code == 200 assert "RSS-Feed Server" not in response.text def test_landing_shows_modules_for_authenticated(client, auth_cookies): response = client.get("/", cookies=auth_cookies) assert response.status_code == 200 assert "RSS-Feed Server" in response.text def test_landing_contains_title(client): response = client.get("/") assert "University Process Hub" in response.text def test_landing_info_strip_shows_db_mode(client, auth_cookies): response = client.get("/", cookies=auth_cookies) assert "SQLite" in response.text or "MariaDB" in response.text ``` - [ ] **Step 3: Run to confirm all tests pass** ```bash .venv/bin/pytest -v ``` Expected: All tests PASS. Note: `test_landing_without_auth_redirects` is replaced by `test_landing_accessible_without_auth`. - [ ] **Step 4: Commit** ```bash git add app/main.py tests/test_landing.py git commit -m "feat: public landing page with permission-based tile filtering" ``` --- ### Task 3: Navbar conditional user display **Files:** - Modify: `app/templates/base.html` - Modify: `tests/test_landing.py` (add navbar tests) - [ ] **Step 1: Add navbar tests to `tests/test_landing.py`** Append these tests to the existing file: ```python def test_navbar_shows_login_button_for_anonymous(client): response = client.get("/") assert "Anmelden" in response.text assert "Abmelden" not in response.text def test_navbar_shows_username_and_logout_when_logged_in(client, auth_cookies): response = client.get("/", cookies=auth_cookies) assert "testuser" in response.text assert "Abmelden" in response.text assert "Anmelden" not in response.text ``` - [ ] **Step 2: Run to confirm failure** ```bash .venv/bin/pytest tests/test_landing.py::test_navbar_shows_username_and_logout_when_logged_in -v ``` Expected: FAIL — navbar currently always shows "Anmelden" button. - [ ] **Step 3: Replace `app/templates/base.html`** ```html