diff --git a/docs/superpowers/plans/2026-04-27-auth-part1.md b/docs/superpowers/plans/2026-04-27-auth-part1.md new file mode 100644 index 0000000..885247c --- /dev/null +++ b/docs/superpowers/plans/2026-04-27-auth-part1.md @@ -0,0 +1,1500 @@ +# Auth Part 1: Core Auth Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Implement local user authentication with bcrypt passwords, JWT in httpOnly cookies, SQLAlchemy User model, Alembic migrations, and FastAPI route protection. + +**Architecture:** Single `users` table via SQLAlchemy 2.0 + Alembic. `app/core/database.py` provides the engine and `get_db()` dependency. `app/core/auth.py` handles JWT only (no business logic). Business logic lives in `app/modules/auth/service.py`. Route protection via `get_current_user` dependency that raises `RequiresLoginException` → caught by exception handler in `main.py` → redirect to `/auth/login`. LDAP hook is a `pass` stub in `service.py` until Part 2. + +**Tech Stack:** SQLAlchemy 2.0, Alembic, python-jose[cryptography], passlib[bcrypt], python-multipart, pytest + +--- + +### Task 1: Dependencies and config + +**Files:** +- Modify: `requirements.txt` +- Modify: `app/core/config.py` +- Modify: `.env.example` +- Test: `tests/test_config.py` + +- [ ] **Step 1: Update `requirements.txt`** + +``` +fastapi +uvicorn[standard] +pydantic-settings +sqlalchemy +jinja2 +pytest +httpx +alembic +python-jose[cryptography] +passlib[bcrypt] +python-multipart +ldap3 +``` + +- [ ] **Step 2: Install** + +```bash +.venv/bin/pip install -r requirements.txt -q +``` + +Expected: no errors. `passlib`, `jose`, `alembic` appear in output. + +- [ ] **Step 3: Write failing test** + +Create `tests/test_config.py`: + +```python +from app.core.config import get_settings + + +def test_new_config_fields_have_defaults(): + s = get_settings() + assert hasattr(s, "SECRET_KEY") + assert hasattr(s, "LDAP_ENABLED") + assert s.LDAP_ENABLED is False + assert s.LDAP_SERVER == "gso1.ads1.fh-nuernberg.de" + assert s.LDAP_DOMAIN == "ADS1" +``` + +- [ ] **Step 4: Run test to confirm failure** + +```bash +.venv/bin/pytest tests/test_config.py -v +``` + +Expected: FAIL — `AttributeError: 'Settings' object has no attribute 'SECRET_KEY'` + +- [ ] **Step 5: Replace `app/core/config.py`** + +```python +from functools import lru_cache + +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8") + + APP_ENV: str = "development" + APP_PREFIX: str = "" + DATABASE_URL: str = "sqlite:///./app.db" + SECRET_KEY: str = "changeme-replace-in-production" + LDAP_ENABLED: bool = False + LDAP_SERVER: str = "gso1.ads1.fh-nuernberg.de" + LDAP_DOMAIN: str = "ADS1" + LDAP_SEARCH_BASE: str = "OU=users,OU=EFI,OU=Faculties,DC=ADS1,DC=fh-nuernberg,DC=de" + + +@lru_cache +def get_settings() -> Settings: + return Settings() +``` + +- [ ] **Step 6: Update `.env.example`** + +``` +APP_ENV=development +APP_PREFIX= +DATABASE_URL=sqlite:///./app.db +SECRET_KEY=changeme-replace-in-production + +# Produktion (MariaDB + LDAP): +# APP_ENV=production +# APP_PREFIX=/uph +# DATABASE_URL=mysql+pymysql://user:password@db:3306/efihub +# SECRET_KEY= +# LDAP_ENABLED=true +``` + +- [ ] **Step 7: Run test to confirm pass** + +```bash +.venv/bin/pytest tests/test_config.py -v +``` + +Expected: PASS. + +- [ ] **Step 8: Commit** + +```bash +git add requirements.txt app/core/config.py .env.example tests/test_config.py +git commit -m "feat: add auth config fields (SECRET_KEY, LDAP_*)" +``` + +--- + +### Task 2: Database layer + +**Files:** +- Create: `app/core/database.py` +- Test: `tests/test_database.py` + +- [ ] **Step 1: Write failing test** + +Create `tests/test_database.py`: + +```python +from sqlalchemy.orm import Session +from app.core.database import Base, get_db + + +def test_get_db_yields_session(): + gen = get_db() + db = next(gen) + assert isinstance(db, Session) + try: + next(gen) + except StopIteration: + pass + + +def test_base_has_metadata(): + assert Base.metadata is not None +``` + +- [ ] **Step 2: Run to confirm failure** + +```bash +.venv/bin/pytest tests/test_database.py -v +``` + +Expected: FAIL — `ModuleNotFoundError: No module named 'app.core.database'` + +- [ ] **Step 3: Create `app/core/database.py`** + +```python +from sqlalchemy import create_engine +from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker + +from app.core.config import get_settings + +settings = get_settings() + +_connect_args = {"check_same_thread": False} if settings.DATABASE_URL.startswith("sqlite") else {} +engine = create_engine(settings.DATABASE_URL, connect_args=_connect_args) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +class Base(DeclarativeBase): + pass + + +def get_db() -> Session: + db = SessionLocal() + try: + yield db + finally: + db.close() +``` + +- [ ] **Step 4: Run to confirm pass** + +```bash +.venv/bin/pytest tests/test_database.py -v +``` + +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add app/core/database.py tests/test_database.py +git commit -m "feat: add SQLAlchemy database layer with get_db dependency" +``` + +--- + +### Task 3: User model and Alembic migrations + +**Files:** +- Create: `app/modules/__init__.py` +- Create: `app/modules/auth/__init__.py` +- Create: `app/modules/auth/models.py` +- Create: `alembic.ini`, `alembic/env.py`, `alembic/versions/` +- Test: `tests/test_user_model.py` + +- [ ] **Step 1: Create module packages** + +```bash +mkdir -p app/modules/auth +touch app/modules/__init__.py app/modules/auth/__init__.py +``` + +- [ ] **Step 2: Write failing test** + +Create `tests/test_user_model.py`: + +```python +import pytest +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from app.core.database import Base +from app.modules.auth.models import User + + +@pytest.fixture +def db(): + engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False}) + Base.metadata.create_all(bind=engine) + Session = sessionmaker(bind=engine) + session = Session() + yield session + session.close() + Base.metadata.drop_all(bind=engine) + + +def test_create_user(db): + user = User(username="testuser", full_name="Test User", pw_hash="hashed") + db.add(user) + db.commit() + db.refresh(user) + assert user.id is not None + assert user.username == "testuser" + assert user.is_active is True + assert user.is_admin is False + assert user.created_at is not None + + +def test_user_defaults(db): + user = User(username="alice", full_name="Alice") + db.add(user) + db.commit() + db.refresh(user) + assert user.pw_hash is None + assert user.email is None + assert user.department is None + assert user.last_login is None + + +def test_username_must_be_unique(db): + db.add(User(username="dup", full_name="First")) + db.commit() + db.add(User(username="dup", full_name="Second")) + with pytest.raises(Exception): + db.commit() +``` + +- [ ] **Step 3: Run to confirm failure** + +```bash +.venv/bin/pytest tests/test_user_model.py -v +``` + +Expected: FAIL — `ModuleNotFoundError: No module named 'app.modules.auth.models'` + +- [ ] **Step 4: Create `app/modules/auth/models.py`** + +```python +from datetime import datetime + +from sqlalchemy import Boolean, DateTime, Integer, String +from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy.sql import func + +from app.core.database import Base + + +class User(Base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True) + username: Mapped[str] = mapped_column(String(64), unique=True, nullable=False, index=True) + full_name: Mapped[str] = mapped_column(String(128), nullable=False, default="") + email: Mapped[str | None] = mapped_column(String(128), nullable=True) + department: Mapped[str | None] = mapped_column(String(64), nullable=True) + role: Mapped[str | None] = mapped_column(String(64), nullable=True) + pw_hash: Mapped[str | None] = mapped_column(String(256), nullable=True) + is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False) + is_admin: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) + account_expires: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) + last_login: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime, server_default=func.now(), nullable=False + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime, server_default=func.now(), onupdate=func.now(), nullable=False + ) +``` + +- [ ] **Step 5: Run to confirm pass** + +```bash +.venv/bin/pytest tests/test_user_model.py -v +``` + +Expected: 3 tests PASS. + +- [ ] **Step 6: Initialize Alembic** + +```bash +.venv/bin/alembic init alembic +``` + +Creates `alembic.ini` and `alembic/` directory. + +- [ ] **Step 7: Configure `alembic.ini`** + +Find and replace the database URL line: +```ini +# Before: +sqlalchemy.url = driver://user:pass@localhost/dbname + +# After: +sqlalchemy.url = sqlite:///./app.db +``` + +- [ ] **Step 8: Replace `alembic/env.py`** + +```python +from logging.config import fileConfig + +from sqlalchemy import engine_from_config, pool +from alembic import context + +from app.core.database import Base +import app.modules.auth.models # noqa: F401 — registers User with Base.metadata + +config = context.config +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +target_metadata = Base.metadata + + +def run_migrations_offline() -> None: + url = config.get_main_option("sqlalchemy.url") + context.configure(url=url, target_metadata=target_metadata, literal_binds=True) + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + with connectable.connect() as connection: + context.configure(connection=connection, target_metadata=target_metadata) + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() +``` + +- [ ] **Step 9: Generate migration** + +```bash +.venv/bin/alembic revision --autogenerate -m "create users table" +``` + +Expected: `Generating alembic/versions/_create_users_table.py`. Open the file and verify it contains `op.create_table("users", ...)` with all 13 columns. + +- [ ] **Step 10: Run migration** + +```bash +.venv/bin/alembic upgrade head +``` + +Expected: `Running upgrade -> , create users table` + +- [ ] **Step 11: Commit** + +```bash +git add app/modules/ alembic/ alembic.ini tests/test_user_model.py +git commit -m "feat: add User model and Alembic migration for users table" +``` + +--- + +### Task 4: Auth schemas and service (local auth) + +**Files:** +- Create: `app/modules/auth/schemas.py` +- Create: `app/modules/auth/service.py` +- Test: `tests/test_auth_service.py` + +- [ ] **Step 1: Create `app/modules/auth/schemas.py`** + +```python +from datetime import datetime + +from pydantic import BaseModel + + +class UserOut(BaseModel): + id: int + username: str + full_name: str + email: str | None + department: str | None + role: str | None + is_active: bool + is_admin: bool + last_login: datetime | None + created_at: datetime + + model_config = {"from_attributes": True} +``` + +- [ ] **Step 2: Write failing test** + +Create `tests/test_auth_service.py`: + +```python +import pytest +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from app.core.database import Base +from app.modules.auth.models import User +from app.modules.auth.service import authenticate_user, get_user, hash_password, verify_password + + +@pytest.fixture +def db(): + engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False}) + Base.metadata.create_all(bind=engine) + Session = sessionmaker(bind=engine) + session = Session() + yield session + session.close() + Base.metadata.drop_all(bind=engine) + + +@pytest.fixture +def alice(db): + user = User(username="alice", full_name="Alice Smith", pw_hash=hash_password("secret123")) + db.add(user) + db.commit() + db.refresh(user) + return user + + +def test_hash_and_verify_password(): + hashed = hash_password("mypassword") + assert verify_password("mypassword", hashed) is True + assert verify_password("wrong", hashed) is False + + +def test_get_user_found(db, alice): + assert get_user(db, "alice") is not None + + +def test_get_user_not_found(db): + assert get_user(db, "nobody") is None + + +def test_authenticate_correct_password(db, alice): + user = authenticate_user(db, "alice", "secret123", ldap_enabled=False) + assert user is not None + assert user.last_login is not None + + +def test_authenticate_wrong_password(db, alice): + assert authenticate_user(db, "alice", "wrong", ldap_enabled=False) is None + + +def test_authenticate_unknown_user(db): + assert authenticate_user(db, "ghost", "any", ldap_enabled=False) is None + + +def test_authenticate_inactive_user(db): + user = User(username="inactive", full_name="X", pw_hash=hash_password("pw"), is_active=False) + db.add(user) + db.commit() + assert authenticate_user(db, "inactive", "pw", ldap_enabled=False) is None + + +def test_authenticate_no_pw_hash_no_ldap(db): + user = User(username="ldaponly", full_name="Y", pw_hash=None) + db.add(user) + db.commit() + assert authenticate_user(db, "ldaponly", "any", ldap_enabled=False) is None +``` + +- [ ] **Step 3: Run to confirm failure** + +```bash +.venv/bin/pytest tests/test_auth_service.py -v +``` + +Expected: FAIL — `ModuleNotFoundError: No module named 'app.modules.auth.service'` + +- [ ] **Step 4: Create `app/modules/auth/service.py`** + +```python +from datetime import datetime, timezone +from typing import Optional + +from passlib.context import CryptContext +from sqlalchemy.orm import Session + +from app.modules.auth.models import User + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + + +def hash_password(plain: str) -> str: + return pwd_context.hash(plain) + + +def verify_password(plain: str, hashed: str) -> bool: + return pwd_context.verify(plain, hashed) + + +def get_user(db: Session, username: str) -> Optional[User]: + return db.query(User).filter(User.username == username).first() + + +def authenticate_user( + db: Session, username: str, password: str, ldap_enabled: bool +) -> Optional[User]: + user = get_user(db, username) + if user is None or not user.is_active: + return None + + local_ok = user.pw_hash is not None and verify_password(password, user.pw_hash) + if local_ok: + _touch_last_login(db, user) + return user + + if ldap_enabled: + # LDAP auth implemented in Part 2 + pass + + return None + + +def _touch_last_login(db: Session, user: User) -> None: + user.last_login = datetime.now(timezone.utc) + db.commit() +``` + +- [ ] **Step 5: Run to confirm pass** + +```bash +.venv/bin/pytest tests/test_auth_service.py -v +``` + +Expected: All 8 tests PASS. + +- [ ] **Step 6: Commit** + +```bash +git add app/modules/auth/schemas.py app/modules/auth/service.py tests/test_auth_service.py +git commit -m "feat: add auth service with local bcrypt password authentication" +``` + +--- + +### Task 5: JWT and cookie helpers + +**Files:** +- Create: `app/core/auth.py` +- Test: `tests/test_core_auth.py` + +- [ ] **Step 1: Write failing test** + +Create `tests/test_core_auth.py`: + +```python +from app.core.auth import COOKIE_NAME, create_access_token, decode_token + + +def test_create_and_decode_token(): + token = create_access_token(username="alice", is_admin=False) + payload = decode_token(token) + assert payload is not None + assert payload["sub"] == "alice" + assert payload["is_admin"] is False + + +def test_admin_claim(): + token = create_access_token(username="admin", is_admin=True) + assert decode_token(token)["is_admin"] is True + + +def test_decode_invalid_token(): + assert decode_token("not.a.valid.token") is None + + +def test_decode_tampered_token(): + token = create_access_token(username="alice", is_admin=False) + assert decode_token(token[:-4] + "xxxx") is None + + +def test_cookie_name(): + assert COOKIE_NAME == "access_token" +``` + +- [ ] **Step 2: Run to confirm failure** + +```bash +.venv/bin/pytest tests/test_core_auth.py -v +``` + +Expected: FAIL — `ModuleNotFoundError: No module named 'app.core.auth'` + +- [ ] **Step 3: Create `app/core/auth.py`** + +```python +from datetime import datetime, timedelta, timezone +from typing import Optional + +from fastapi import Request, Response +from jose import JWTError, jwt + +from app.core.config import get_settings + +settings = get_settings() + +COOKIE_NAME = "access_token" +ALGORITHM = "HS256" +TOKEN_EXPIRE_HOURS = 8 + + +def create_access_token(username: str, is_admin: bool) -> str: + expire = datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRE_HOURS) + return jwt.encode( + {"sub": username, "is_admin": is_admin, "exp": expire}, + settings.SECRET_KEY, + algorithm=ALGORITHM, + ) + + +def decode_token(token: str) -> Optional[dict]: + try: + return jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM]) + except JWTError: + return None + + +def get_token_from_request(request: Request) -> Optional[str]: + return request.cookies.get(COOKIE_NAME) + + +def set_auth_cookie(response: Response, token: str) -> None: + response.set_cookie( + key=COOKIE_NAME, + value=token, + httponly=True, + samesite="lax", + secure=settings.APP_ENV == "production", + ) + + +def clear_auth_cookie(response: Response) -> None: + response.delete_cookie(key=COOKIE_NAME, httponly=True, samesite="lax") +``` + +- [ ] **Step 4: Run to confirm pass** + +```bash +.venv/bin/pytest tests/test_core_auth.py -v +``` + +Expected: All 5 tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add app/core/auth.py tests/test_core_auth.py +git commit -m "feat: add JWT creation, decoding and cookie helpers" +``` + +--- + +### Task 6: FastAPI auth dependencies + +**Files:** +- Create: `app/modules/auth/dependencies.py` +- Test: `tests/test_auth_dependencies.py` + +- [ ] **Step 1: Write failing test** + +Create `tests/test_auth_dependencies.py`: + +```python +import pytest +from fastapi import Depends, FastAPI +from fastapi.responses import RedirectResponse +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from app.core.auth import create_access_token +from app.core.database import Base, get_db +from app.modules.auth.dependencies import RequiresLoginException, get_current_user, require_admin +from app.modules.auth.models import User +from app.modules.auth.service import hash_password + + +@pytest.fixture +def db(): + engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False}) + Base.metadata.create_all(bind=engine) + Session = sessionmaker(bind=engine) + session = Session() + yield session + session.close() + Base.metadata.drop_all(bind=engine) + + +@pytest.fixture +def test_app(db): + app = FastAPI() + + @app.exception_handler(RequiresLoginException) + async def handle_login_required(request, exc): + return RedirectResponse(url="/auth/login", status_code=307) + + @app.get("/protected") + async def protected(user=Depends(get_current_user)): + return {"username": user.username} + + @app.get("/admin-only") + async def admin_only(user=Depends(require_admin)): + return {"username": user.username} + + app.dependency_overrides[get_db] = lambda: db + return TestClient(test_app, follow_redirects=False) # noqa — replaced below + + +@pytest.fixture +def client(test_app): + return TestClient(test_app, follow_redirects=False) + + +def _token_cookie(username: str, is_admin: bool) -> dict: + token = create_access_token(username=username, is_admin=is_admin) + return {"access_token": token} + + +def test_no_cookie_redirects_to_login(client): + response = client.get("/protected") + assert response.status_code == 307 + assert "/auth/login" in response.headers["location"] + + +def test_valid_token_returns_user(client, db): + db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/protected", cookies=_token_cookie("bob", False)) + assert response.status_code == 200 + assert response.json()["username"] == "bob" + + +def test_invalid_token_redirects(client): + response = client.get("/protected", cookies={"access_token": "invalid"}) + assert response.status_code == 307 + + +def test_require_admin_blocks_non_admin(client, db): + db.add(User(username="regular", full_name="R", is_admin=False, pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/admin-only", cookies=_token_cookie("regular", False)) + assert response.status_code == 403 + + +def test_require_admin_allows_admin(client, db): + db.add(User(username="admin", full_name="A", is_admin=True, pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/admin-only", cookies=_token_cookie("admin", True)) + assert response.status_code == 200 +``` + +- [ ] **Step 2: Run to confirm failure** + +```bash +.venv/bin/pytest tests/test_auth_dependencies.py -v +``` + +Expected: FAIL — `ModuleNotFoundError: No module named 'app.modules.auth.dependencies'` + +- [ ] **Step 3: Create `app/modules/auth/dependencies.py`** + +```python +from fastapi import Depends, HTTPException +from sqlalchemy.orm import Session + +from app.core.auth import decode_token, get_token_from_request +from app.core.database import get_db +from app.modules.auth.models import User +from app.modules.auth.service import get_user + +from fastapi import Request + + +class RequiresLoginException(Exception): + pass + + +async def get_current_user( + request: Request, db: Session = Depends(get_db) +) -> User: + token = get_token_from_request(request) + if not token: + raise RequiresLoginException() + payload = decode_token(token) + if not payload: + raise RequiresLoginException() + user = get_user(db, payload.get("sub", "")) + if user is None or not user.is_active: + raise RequiresLoginException() + return user + + +async def require_admin(user: User = Depends(get_current_user)) -> User: + if not user.is_admin: + raise HTTPException(status_code=403, detail="Admin access required") + return user +``` + +- [ ] **Step 4: Fix the test fixture — there is a bug in the test above** + +In `tests/test_auth_dependencies.py`, the `test_app` fixture returns `TestClient` incorrectly. Replace the fixture with: + +```python +@pytest.fixture +def test_app(db): + app = FastAPI() + + @app.exception_handler(RequiresLoginException) + async def handle_login_required(request, exc): + return RedirectResponse(url="/auth/login", status_code=307) + + @app.get("/protected") + async def protected(user=Depends(get_current_user)): + return {"username": user.username} + + @app.get("/admin-only") + async def admin_only(user=Depends(require_admin)): + return {"username": user.username} + + app.dependency_overrides[get_db] = lambda: db + return app + + +@pytest.fixture +def client(test_app): + return TestClient(test_app, follow_redirects=False) +``` + +The full corrected `tests/test_auth_dependencies.py`: + +```python +import pytest +from fastapi import Depends, FastAPI +from fastapi.responses import RedirectResponse +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from app.core.auth import create_access_token +from app.core.database import Base, get_db +from app.modules.auth.dependencies import RequiresLoginException, get_current_user, require_admin +from app.modules.auth.models import User +from app.modules.auth.service import hash_password + + +@pytest.fixture +def db(): + engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False}) + Base.metadata.create_all(bind=engine) + Session = sessionmaker(bind=engine) + session = Session() + yield session + session.close() + Base.metadata.drop_all(bind=engine) + + +@pytest.fixture +def test_app(db): + app = FastAPI() + + @app.exception_handler(RequiresLoginException) + async def handle_login_required(request, exc): + return RedirectResponse(url="/auth/login", status_code=307) + + @app.get("/protected") + async def protected(user=Depends(get_current_user)): + return {"username": user.username} + + @app.get("/admin-only") + async def admin_only(user=Depends(require_admin)): + return {"username": user.username} + + app.dependency_overrides[get_db] = lambda: db + return app + + +@pytest.fixture +def client(test_app): + return TestClient(test_app, follow_redirects=False) + + +def _token_cookie(username: str, is_admin: bool) -> dict: + token = create_access_token(username=username, is_admin=is_admin) + return {"access_token": token} + + +def test_no_cookie_redirects_to_login(client): + response = client.get("/protected") + assert response.status_code == 307 + assert "/auth/login" in response.headers["location"] + + +def test_valid_token_returns_user(client, db): + db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/protected", cookies=_token_cookie("bob", False)) + assert response.status_code == 200 + assert response.json()["username"] == "bob" + + +def test_invalid_token_redirects(client): + response = client.get("/protected", cookies={"access_token": "invalid"}) + assert response.status_code == 307 + + +def test_require_admin_blocks_non_admin(client, db): + db.add(User(username="regular", full_name="R", is_admin=False, pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/admin-only", cookies=_token_cookie("regular", False)) + assert response.status_code == 403 + + +def test_require_admin_allows_admin(client, db): + db.add(User(username="admin", full_name="A", is_admin=True, pw_hash=hash_password("pw"))) + db.commit() + response = client.get("/admin-only", cookies=_token_cookie("admin", True)) + assert response.status_code == 200 +``` + +- [ ] **Step 5: Run to confirm pass** + +```bash +.venv/bin/pytest tests/test_auth_dependencies.py -v +``` + +Expected: All 5 tests PASS. + +- [ ] **Step 6: Commit** + +```bash +git add app/modules/auth/dependencies.py tests/test_auth_dependencies.py +git commit -m "feat: add get_current_user and require_admin dependencies" +``` + +--- + +### Task 7: Auth router and login template + +**Files:** +- Create: `app/modules/auth/router.py` +- Create: `app/templates/auth/login.html` +- Test: `tests/test_auth_router.py` + +- [ ] **Step 1: Write failing test** + +Create `tests/test_auth_router.py`: + +```python +import pytest +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from app.core.database import Base, get_db +from app.main import app +from app.modules.auth.models import User +from app.modules.auth.service import hash_password + + +@pytest.fixture(autouse=True) +def override_db(): + engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False}) + Base.metadata.create_all(bind=engine) + Session = sessionmaker(bind=engine) + session = Session() + app.dependency_overrides[get_db] = lambda: session + yield session + app.dependency_overrides.clear() + session.close() + Base.metadata.drop_all(bind=engine) + + +@pytest.fixture +def client(): + return TestClient(app, follow_redirects=False) + + +@pytest.fixture +def alice(override_db): + user = User(username="alice", full_name="Alice Smith", pw_hash=hash_password("secret123")) + override_db.add(user) + override_db.commit() + return user + + +def test_get_login_page(client): + response = client.get("/auth/login") + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + assert "Anmelden" in response.text + + +def test_login_correct_credentials_redirects(client, alice): + response = client.post("/auth/login", data={"username": "alice", "password": "secret123"}) + assert response.status_code in (302, 303, 307) + assert "access_token" in response.cookies + + +def test_login_wrong_password_shows_error(client, alice): + response = client.post("/auth/login", data={"username": "alice", "password": "wrong"}) + assert response.status_code == 200 + assert "Ungültige" in response.text + + +def test_login_unknown_user_shows_error(client): + response = client.post("/auth/login", data={"username": "ghost", "password": "any"}) + assert response.status_code == 200 + assert "Ungültige" in response.text + + +def test_logout_clears_cookie(client, alice): + client.post("/auth/login", data={"username": "alice", "password": "secret123"}) + response = client.get("/auth/logout") + assert response.status_code in (302, 303, 307) + assert response.cookies.get("access_token", "") == "" +``` + +- [ ] **Step 2: Run to confirm failure** + +```bash +.venv/bin/pytest tests/test_auth_router.py -v +``` + +Expected: FAIL — router not yet registered in `app/main.py` + +- [ ] **Step 3: Create `app/templates/auth/login.html`** + +```bash +mkdir -p app/templates/auth +``` + +```html +{% extends "base.html" %} + +{% block title %}Anmelden · EFI Hub{% endblock %} + +{% block content %} +
+
+ +
+

+ Fakultät EFI · TH Nürnberg +

+

Anmelden

+

Bitte melde dich mit deinen Zugangsdaten an.

+
+ + {% if error %} +
+ {{ error }} +
+ {% endif %} + +
+
+ + +
+
+ + +
+ +
+ +
+
+{% endblock %} +``` + +- [ ] **Step 4: Create `app/modules/auth/router.py`** + +```python +from fastapi import APIRouter, Depends, Form, Request +from fastapi.responses import HTMLResponse, RedirectResponse +from fastapi.templating import Jinja2Templates +from sqlalchemy.orm import Session + +from app.core.auth import clear_auth_cookie, create_access_token, set_auth_cookie +from app.core.config import get_settings +from app.core.database import get_db +from app.modules.auth.dependencies import get_current_user +from app.modules.auth.schemas import UserOut +from app.modules.auth.service import authenticate_user + +router = APIRouter(prefix="/auth", tags=["auth"]) +templates = Jinja2Templates(directory="app/templates") +settings = get_settings() + +_NAV: list[dict] = [] + + +@router.get("/login", response_class=HTMLResponse) +async def login_page(request: Request): + return templates.TemplateResponse( + request, "auth/login.html", {"nav_items": _NAV, "app_version": "0.1.0"} + ) + + +@router.post("/login", response_class=HTMLResponse) +async def login( + request: Request, + username: str = Form(...), + password: str = Form(...), + db: Session = Depends(get_db), +): + user = authenticate_user(db, username, password, ldap_enabled=settings.LDAP_ENABLED) + if user is None: + return templates.TemplateResponse( + request, + "auth/login.html", + {"nav_items": _NAV, "app_version": "0.1.0", "error": "Ungültige Zugangsdaten."}, + status_code=401, + ) + token = create_access_token(username=user.username, is_admin=user.is_admin) + response = RedirectResponse(url="/", status_code=303) + set_auth_cookie(response, token) + return response + + +@router.get("/logout") +async def logout(): + response = RedirectResponse(url="/auth/login", status_code=303) + clear_auth_cookie(response) + return response + + +@router.get("/me", response_model=UserOut) +async def me(user=Depends(get_current_user)): + return user +``` + +- [ ] **Step 5: Run to confirm failure (router not wired)** + +```bash +.venv/bin/pytest tests/test_auth_router.py -v +``` + +Expected: FAIL — routes return 404 (router not included in main app yet). + +- [ ] **Step 6: Commit templates and router** + +```bash +git add app/modules/auth/router.py app/templates/auth/login.html tests/test_auth_router.py +git commit -m "feat: add auth router with login/logout/me and login template" +``` + +--- + +### Task 8: Wire auth into main.py and create admin script + +**Files:** +- Modify: `app/main.py` +- Create: `scripts/create_admin.py` + +- [ ] **Step 1: Replace `app/main.py`** + +```python +from fastapi import Depends, FastAPI, Request, WebSocket +from fastapi.responses import HTMLResponse, RedirectResponse +from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates + +from app.core.auth import get_token_from_request +from app.core.config import get_settings +from app.core.database import get_db +from app.modules.auth.dependencies import RequiresLoginException, get_current_user +from app.modules.auth.router import router as auth_router + +settings = get_settings() + +app = FastAPI( + title="University Process Hub", + root_path=settings.APP_PREFIX, +) + +app.mount("/static", StaticFiles(directory="app/static"), name="static") +templates = Jinja2Templates(directory="app/templates") + +app.include_router(auth_router) + + +@app.exception_handler(RequiresLoginException) +async def requires_login_handler(request: Request, exc: RequiresLoginException): + return RedirectResponse(url="/auth/login", status_code=307) + + +MODULES = [ + { + "icon": "📡", + "name": "RSS-Feed Server", + "description": "Aggregiert und verteilt Neuigkeiten der Fakultät als standardkonformen RSS 2.0 Feed.", + "status": "active", + }, + { + "icon": "📅", + "name": "Kalender", + "description": "Veranstaltungen, Prüfungstermine und Fristen im iCal-Format.", + "status": "planned", + }, + { + "icon": "🔔", + "name": "Benachrichtigungen", + "description": "Push-Nachrichten und WebSocket-basierte Echtzeit-Alerts für Studierende.", + "status": "planned", + }, + { + "icon": "📊", + "name": "Auslastung", + "description": "Raum- und Ressourcenauslastung der Fakultät in Echtzeit.", + "status": "planned", + }, + { + "icon": "📚", + "name": "Lehrveranstaltungen", + "description": "Stundenplan-API und Kursinformationen aus dem Campus-System.", + "status": "planned", + }, +] + +NAV_ITEMS = [ + {"label": "Übersicht", "url": "/", "active": True}, + {"label": "RSS-Feeds", "url": "/rss", "active": False}, + {"label": "Kalender", "url": "/kalender", "active": False}, + {"label": "Docs", "url": "/docs", "active": False}, +] + + +def _db_mode() -> str: + url = settings.DATABASE_URL + return "SQLite (dev)" if url.startswith("sqlite") else "MariaDB (prod)" + + +@app.get("/", response_class=HTMLResponse) +async def root(request: Request, current_user=Depends(get_current_user)): + return templates.TemplateResponse( + request, + "index.html", + { + "nav_items": NAV_ITEMS, + "modules": MODULES, + "db_mode": _db_mode(), + "app_version": "0.1.0", + "current_user": current_user, + }, + ) + + +@app.websocket("/ws/hello/{name}") +async def websocket_hello(websocket: WebSocket, name: str): + await websocket.accept() + await websocket.send_text(f"Hello, {name}!") + await websocket.close() +``` + +- [ ] **Step 2: Update `tests/test_landing.py`** + +`GET /` erfordert jetzt Auth — die alten Tests ohne Cookie schlagen fehl. Ersetze `tests/test_landing.py` vollständig: + +```python +import pytest +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from app.core.auth import create_access_token +from app.core.database import Base, get_db +from app.main import app +from app.modules.auth.models import User +from app.modules.auth.service import hash_password + + +@pytest.fixture(autouse=True) +def override_db(): + engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False}) + Base.metadata.create_all(bind=engine) + Session = sessionmaker(bind=engine) + session = Session() + app.dependency_overrides[get_db] = lambda: session + yield session + app.dependency_overrides.clear() + session.close() + Base.metadata.drop_all(bind=engine) + + +@pytest.fixture +def client(): + return TestClient(app, follow_redirects=False) + + +@pytest.fixture +def auth_cookies(override_db): + user = User(username="testuser", full_name="Test User", pw_hash=hash_password("pw")) + override_db.add(user) + override_db.commit() + token = create_access_token(username="testuser", is_admin=False) + return {"access_token": token} + + +def test_landing_without_auth_redirects(client): + response = client.get("/") + assert response.status_code == 307 + assert "/auth/login" in response.headers["location"] + + +def test_landing_returns_html(client, auth_cookies): + response = client.get("/", cookies=auth_cookies) + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + + +def test_landing_contains_title(client, auth_cookies): + response = client.get("/", cookies=auth_cookies) + assert "University Process Hub" in response.text + + +def test_landing_contains_rss_module(client, auth_cookies): + response = client.get("/", cookies=auth_cookies) + assert "RSS-Feed Server" in response.text + + +def test_landing_navbar_links_present(client, auth_cookies): + response = client.get("/", cookies=auth_cookies) + assert "Übersicht" in response.text + +def test_landing_info_strip_shows_db_mode(client, auth_cookies): + response = client.get("/", cookies=auth_cookies) + assert "SQLite" in response.text or "MariaDB" in response.text +``` + +- [ ] **Step 3: Run all tests** + +```bash +.venv/bin/pytest -v +``` + +Expected: All tests PASS, including `test_auth_router.py` which now has the router registered. + +- [ ] **Step 4: Create admin user script** + +```bash +mkdir -p scripts +``` + +Create `scripts/create_admin.py`: + +```python +""" +Create or reset the admin user. + +Usage: + .venv/bin/python scripts/create_admin.py + +Example: + .venv/bin/python scripts/create_admin.py admin geheim123 +""" +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from app.core.database import SessionLocal, engine, Base +from app.modules.auth.models import User # noqa: F401 — registers table +from app.modules.auth.service import get_user, hash_password + + +def create_admin(username: str, password: str) -> None: + Base.metadata.create_all(bind=engine) + with SessionLocal() as db: + user = get_user(db, username) + if user is None: + user = User(username=username, full_name="Administrator") + db.add(user) + user.pw_hash = hash_password(password) + user.is_admin = True + user.is_active = True + db.commit() + print(f"Admin user '{username}' created/updated.") + + +if __name__ == "__main__": + if len(sys.argv) != 3: + print(__doc__) + sys.exit(1) + create_admin(sys.argv[1], sys.argv[2]) +``` + +- [ ] **Step 5: Create initial admin user** + +```bash +.venv/bin/python scripts/create_admin.py admin admin +``` + +Expected: `Admin user 'admin' created/updated.` + +- [ ] **Step 6: Start server and verify login** + +```bash +.venv/bin/uvicorn app.main:app --reload +``` + +Open `http://localhost:8000` — should redirect to `http://localhost:8000/auth/login`. +Log in with `admin` / `admin`. Should redirect to landing page. + +- [ ] **Step 7: Run full test suite one last time** + +```bash +.venv/bin/pytest -v +``` + +Expected: All tests PASS. + +- [ ] **Step 8: Commit** + +```bash +git add app/main.py scripts/ +git commit -m "feat: wire auth into main app, protect landing page, add create_admin script" +``` \ No newline at end of file