diff --git a/docs/superpowers/plans/2026-04-27-auth-permissions.md b/docs/superpowers/plans/2026-04-27-auth-permissions.md new file mode 100644 index 0000000..0763b5b --- /dev/null +++ b/docs/superpowers/plans/2026-04-27-auth-permissions.md @@ -0,0 +1,661 @@ +# Auth Permissions Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Öffentliche Routen ermöglichen, Permission-basierte Kachel-Filterung einführen, und Navbar kontextabhängig gestalten. + +**Architecture:** Drei neue Funktionen in `dependencies.py` (`check_permission`, `get_current_user_optional`, `require_permission`). `MODULES` in `main.py` bekommt `slug` und `permission`, aktive Module werden automatisch registriert. `GET /` wird öffentlich. Navbar in `base.html` zeigt User-Info conditional. Bestehende `get_current_user` und `require_admin` bleiben unverändert. + +**Tech Stack:** FastAPI, Jinja2, SQLAlchemy, pytest, importlib + +--- + +### Task 1: check_permission, get_current_user_optional, require_permission + +**Files:** +- Modify: `app/modules/auth/dependencies.py` +- Create: `tests/test_permissions.py` + +- [ ] **Step 1: Write failing tests** + +Create `tests/test_permissions.py`: + +```python +import pytest +from fastapi import Depends, FastAPI +from fastapi.responses import RedirectResponse +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import StaticPool + +from app.core.auth import create_access_token +from app.core.database import Base, get_db +from app.modules.auth.dependencies import ( + RequiresLoginException, + check_permission, + get_current_user_optional, + require_permission, +) +from app.modules.auth.models import User +from app.modules.auth.service import hash_password + + +# --- check_permission: pure unit tests, no DB needed --- + +def test_public_allows_anonymous(): + assert check_permission(None, ["public"]) is True + + +def test_public_allows_logged_in(): + user = User(username="u", full_name="U", is_admin=False) + assert check_permission(user, ["public"]) is True + + +def test_authenticated_blocks_anonymous(): + assert check_permission(None, ["authenticated"]) is False + + +def test_authenticated_allows_user(): + user = User(username="u", full_name="U") + assert check_permission(user, ["authenticated"]) is True + + +def test_admin_blocks_regular_user(): + user = User(username="u", full_name="U", is_admin=False) + assert check_permission(user, ["admin"]) is False + + +def test_admin_allows_admin(): + user = User(username="u", full_name="U", is_admin=True) + assert check_permission(user, ["admin"]) is True + + +def test_or_logic_grants_access_on_any_match(): + regular = User(username="u", full_name="U", is_admin=False) + assert check_permission(regular, ["admin", "authenticated"]) is True + + +def test_empty_permissions_blocks_all(): + user = User(username="u", full_name="U") + assert check_permission(user, []) is False + + +# --- get_current_user_optional and require_permission: integration tests --- + +@pytest.fixture +def db(): + engine = create_engine( + "sqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + Base.metadata.create_all(bind=engine) + Session = sessionmaker(bind=engine) + session = Session() + yield session + session.close() + Base.metadata.drop_all(bind=engine) + + +@pytest.fixture +def test_app(db): + app = FastAPI() + + @app.exception_handler(RequiresLoginException) + async def handle(request, exc): + return RedirectResponse(url="/auth/login", status_code=307) + + @app.get("/public") + async def public_route(user=Depends(get_current_user_optional)): + return {"user": user.username if user else None} + + @app.get("/members") + async def members_route(user=Depends(require_permission(["authenticated"]))): + return {"user": user.username if user else None} + + @app.get("/admin") + async def admin_route(user=Depends(require_permission(["admin"]))): + return {"user": user.username if user else None} + + app.dependency_overrides[get_db] = lambda: db + return app + + +@pytest.fixture +def client(test_app): + return TestClient(test_app, follow_redirects=False) + + +def _cookie(username: str, is_admin: bool) -> dict: + return {"access_token": create_access_token(username=username, is_admin=is_admin)} + + +def test_optional_returns_none_without_cookie(client): + response = client.get("/public") + assert response.status_code == 200 + assert response.json()["user"] is None + + +def test_optional_returns_user_with_valid_cookie(client, db): + db.add(User(username="alice", full_name="Alice", pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/public", cookies=_cookie("alice", False)) + assert response.status_code == 200 + assert response.json()["user"] == "alice" + + +def test_require_authenticated_redirects_anonymous(client): + response = client.get("/members") + assert response.status_code == 307 + assert "/auth/login" in response.headers["location"] + + +def test_require_authenticated_allows_logged_in(client, db): + db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/members", cookies=_cookie("bob", False)) + assert response.status_code == 200 + + +def test_require_admin_redirects_regular_user(client, db): + db.add(User(username="reg", full_name="Reg", is_admin=False, pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/admin", cookies=_cookie("reg", False)) + assert response.status_code == 307 + + +def test_require_admin_allows_admin(client, db): + db.add(User(username="adm", full_name="Adm", is_admin=True, pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/admin", cookies=_cookie("adm", True)) + assert response.status_code == 200 +``` + +- [ ] **Step 2: Run to confirm failure** + +```bash +.venv/bin/pytest tests/test_permissions.py -v +``` + +Expected: FAIL — `ImportError: cannot import name 'check_permission'` + +- [ ] **Step 3: Update `app/modules/auth/dependencies.py`** + +```python +from fastapi import Depends, HTTPException, Request +from sqlalchemy.orm import Session +from typing import Callable + +from app.core.auth import decode_token, get_token_from_request +from app.core.database import get_db +from app.modules.auth.models import User +from app.modules.auth.service import get_user + + +class RequiresLoginException(Exception): + pass + + +def check_permission(user: User | None, permissions: list[str]) -> bool: + if "public" in permissions: + return True + if user is None: + return False + if "authenticated" in permissions: + return True + if "admin" in permissions and user.is_admin: + return True + return False # group membership check: Part 2 + + +async def get_current_user_optional( + request: Request, db: Session = Depends(get_db) +) -> User | None: + token = get_token_from_request(request) + if not token: + return None + payload = decode_token(token) + if not payload: + return None + user = get_user(db, payload.get("sub", "")) + if user is None or not user.is_active: + return None + return user + + +def require_permission(permissions: list[str]) -> Callable: + async def _dep( + request: Request, db: Session = Depends(get_db) + ) -> User | None: + user = await get_current_user_optional(request, db) + if not check_permission(user, permissions): + raise RequiresLoginException() + return user + return _dep + + +async def get_current_user( + request: Request, db: Session = Depends(get_db) +) -> User: + token = get_token_from_request(request) + if not token: + raise RequiresLoginException() + payload = decode_token(token) + if not payload: + raise RequiresLoginException() + user = get_user(db, payload.get("sub", "")) + if user is None or not user.is_active: + raise RequiresLoginException() + return user + + +async def require_admin(user: User = Depends(get_current_user)) -> User: + if not user.is_admin: + raise HTTPException(status_code=403, detail="Admin access required") + return user +``` + +- [ ] **Step 4: Run to confirm pass** + +```bash +.venv/bin/pytest tests/test_permissions.py -v +``` + +Expected: All 16 tests PASS. + +- [ ] **Step 5: Run full suite to confirm no regressions** + +```bash +.venv/bin/pytest -v +``` + +Expected: All tests PASS. + +- [ ] **Step 6: Commit** + +```bash +git add app/modules/auth/dependencies.py tests/test_permissions.py +git commit -m "feat: add check_permission, get_current_user_optional, require_permission" +``` + +--- + +### Task 2: MODULES registry + public landing page + +**Files:** +- Modify: `app/main.py` +- Modify: `tests/test_landing.py` + +- [ ] **Step 1: Replace `app/main.py`** + +```python +import importlib +import logging +from contextlib import asynccontextmanager + +from fastapi import Depends, FastAPI, Request, WebSocket +from fastapi.responses import HTMLResponse, RedirectResponse +from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates + +from app.core.config import get_settings +from app.core.database import get_db +from app.modules.auth.dependencies import ( + RequiresLoginException, + check_permission, + get_current_user, + get_current_user_optional, +) +from app.modules.auth.router import router as auth_router + +logger = logging.getLogger(__name__) +settings = get_settings() + +MODULES = [ + { + "slug": "rss", + "icon": "📡", + "name": "RSS-Feed Server", + "description": "Aggregiert und verteilt Neuigkeiten der Fakultät als standardkonformen RSS 2.0 Feed.", + "status": "active", + "permission": ["authenticated"], + }, + { + "slug": "kalender", + "icon": "📅", + "name": "Kalender", + "description": "Veranstaltungen, Prüfungstermine und Fristen im iCal-Format.", + "status": "planned", + "permission": ["authenticated"], + }, + { + "slug": "benachrichtigungen", + "icon": "🔔", + "name": "Benachrichtigungen", + "description": "Push-Nachrichten und WebSocket-basierte Echtzeit-Alerts für Studierende.", + "status": "planned", + "permission": ["authenticated"], + }, + { + "slug": "auslastung", + "icon": "📊", + "name": "Auslastung", + "description": "Raum- und Ressourcenauslastung der Fakultät in Echtzeit.", + "status": "planned", + "permission": ["authenticated"], + }, + { + "slug": "lehrveranstaltungen", + "icon": "📚", + "name": "Lehrveranstaltungen", + "description": "Stundenplan-API und Kursinformationen aus dem Campus-System.", + "status": "planned", + "permission": ["authenticated"], + }, +] + +NAV_ITEMS = [ + {"label": "Übersicht", "url": "/", "active": True}, + {"label": "RSS-Feeds", "url": "/rss", "active": False}, + {"label": "Kalender", "url": "/kalender", "active": False}, + {"label": "Docs", "url": "/docs", "active": False}, +] + + +def _reset_admin() -> None: + from app.core.database import Base, SessionLocal, engine + from app.modules.auth.models import User # noqa: F401 + from app.modules.auth.service import get_user, hash_password + + Base.metadata.create_all(bind=engine) + with SessionLocal() as db: + user = get_user(db, settings.ADMIN_USERNAME) + if user is None: + user = User(username=settings.ADMIN_USERNAME, full_name="Administrator") + db.add(user) + user.pw_hash = hash_password(settings.ADMIN_PASSWORD) + user.is_admin = True + user.is_active = True + db.commit() + + +@asynccontextmanager +async def lifespan(app: FastAPI): + _reset_admin() + yield + + +app = FastAPI( + title="University Process Hub", + root_path=settings.APP_PREFIX, + lifespan=lifespan, +) + +app.mount("/static", StaticFiles(directory="app/static"), name="static") +templates = Jinja2Templates(directory="app/templates") + +app.include_router(auth_router) + +for _module in MODULES: + if _module["status"] == "active": + try: + _mod = importlib.import_module(f"app.modules.{_module['slug']}.router") + app.include_router(_mod.router, prefix=f"/{_module['slug']}") + except ModuleNotFoundError: + logger.warning("Module router not found: app.modules.%s.router", _module["slug"]) + + +@app.exception_handler(RequiresLoginException) +async def requires_login_handler(request: Request, exc: RequiresLoginException): + return RedirectResponse(url="/auth/login", status_code=307) + + +def _db_mode() -> str: + url = settings.DATABASE_URL + return "SQLite (dev)" if url.startswith("sqlite") else "MariaDB (prod)" + + +@app.get("/", response_class=HTMLResponse) +async def root(request: Request, current_user=Depends(get_current_user_optional)): + visible = [m for m in MODULES if check_permission(current_user, m["permission"])] + return templates.TemplateResponse( + request, + "index.html", + { + "nav_items": NAV_ITEMS, + "modules": visible, + "db_mode": _db_mode(), + "app_version": "0.1.0", + "current_user": current_user, + }, + ) + + +@app.websocket("/ws/hello/{name}") +async def websocket_hello(websocket: WebSocket, name: str): + await websocket.accept() + await websocket.send_text(f"Hello, {name}!") + await websocket.close() +``` + +- [ ] **Step 2: Replace `tests/test_landing.py`** + +```python +import pytest +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import StaticPool + +from app.core.auth import create_access_token +from app.core.database import Base, get_db +from app.main import app +from app.modules.auth.models import User +from app.modules.auth.service import hash_password + + +@pytest.fixture(autouse=True) +def override_db(): + engine = create_engine( + "sqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + Base.metadata.create_all(bind=engine) + Session = sessionmaker(bind=engine) + session = Session() + app.dependency_overrides[get_db] = lambda: session + yield session + app.dependency_overrides.clear() + session.close() + Base.metadata.drop_all(bind=engine) + + +@pytest.fixture +def client(): + return TestClient(app, follow_redirects=False) + + +@pytest.fixture +def auth_cookies(override_db): + user = User(username="testuser", full_name="Test User", pw_hash=hash_password("pw")) + override_db.add(user) + override_db.commit() + token = create_access_token(username="testuser", is_admin=False) + return {"access_token": token} + + +@pytest.fixture +def admin_cookies(override_db): + user = User(username="adminuser", full_name="Admin User", is_admin=True, pw_hash=hash_password("pw")) + override_db.add(user) + override_db.commit() + token = create_access_token(username="adminuser", is_admin=True) + return {"access_token": token} + + +def test_landing_accessible_without_auth(client): + response = client.get("/") + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + + +def test_landing_shows_no_modules_for_anonymous(client): + response = client.get("/") + assert response.status_code == 200 + assert "RSS-Feed Server" not in response.text + + +def test_landing_shows_modules_for_authenticated(client, auth_cookies): + response = client.get("/", cookies=auth_cookies) + assert response.status_code == 200 + assert "RSS-Feed Server" in response.text + + +def test_landing_contains_title(client): + response = client.get("/") + assert "University Process Hub" in response.text + + +def test_landing_info_strip_shows_db_mode(client, auth_cookies): + response = client.get("/", cookies=auth_cookies) + assert "SQLite" in response.text or "MariaDB" in response.text +``` + +- [ ] **Step 3: Run to confirm all tests pass** + +```bash +.venv/bin/pytest -v +``` + +Expected: All tests PASS. Note: `test_landing_without_auth_redirects` is replaced by `test_landing_accessible_without_auth`. + +- [ ] **Step 4: Commit** + +```bash +git add app/main.py tests/test_landing.py +git commit -m "feat: public landing page with permission-based tile filtering" +``` + +--- + +### Task 3: Navbar conditional user display + +**Files:** +- Modify: `app/templates/base.html` +- Modify: `tests/test_landing.py` (add navbar tests) + +- [ ] **Step 1: Add navbar tests to `tests/test_landing.py`** + +Append these tests to the existing file: + +```python +def test_navbar_shows_login_button_for_anonymous(client): + response = client.get("/") + assert "Anmelden" in response.text + assert "Abmelden" not in response.text + + +def test_navbar_shows_username_and_logout_when_logged_in(client, auth_cookies): + response = client.get("/", cookies=auth_cookies) + assert "testuser" in response.text + assert "Abmelden" in response.text + assert "Anmelden" not in response.text +``` + +- [ ] **Step 2: Run to confirm failure** + +```bash +.venv/bin/pytest tests/test_landing.py::test_navbar_shows_username_and_logout_when_logged_in -v +``` + +Expected: FAIL — navbar currently always shows "Anmelden" button. + +- [ ] **Step 3: Replace `app/templates/base.html`** + +```html + + + + + + {% block title %}EFI Hub{% endblock %} + + + + + + + +
+ {% block content %}{% endblock %} +
+ + + + + +``` + +Wichtig: `current_user | default(None)` verhindert `UndefinedError` auf Seiten (z. B. Login-Page) die `current_user` nicht im Template-Context übergeben. + +- [ ] **Step 4: Run to confirm pass** + +```bash +.venv/bin/pytest tests/test_landing.py -v +``` + +Expected: All landing page tests PASS. + +- [ ] **Step 5: Run full suite** + +```bash +.venv/bin/pytest -v +``` + +Expected: All tests PASS. + +- [ ] **Step 6: Commit** + +```bash +git add app/templates/base.html tests/test_landing.py +git commit -m "feat: conditional navbar — shows username/logout when logged in" +```