# Auth Part 1: Core Auth Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Implement local user authentication with bcrypt passwords, JWT in httpOnly cookies, SQLAlchemy User model, Alembic migrations, and FastAPI route protection.

**Architecture:** Single `users` table via SQLAlchemy 2.0 + Alembic. `app/core/database.py` provides the engine and `get_db()` dependency. `app/core/auth.py` handles JWT only (no business logic). Business logic lives in `app/modules/auth/service.py`. Route protection via `get_current_user` dependency that raises `RequiresLoginException` → caught by exception handler in `main.py` → redirect to `/auth/login`. LDAP hook is a `pass` stub in `service.py` until Part 2.

**Tech Stack:** SQLAlchemy 2.0, Alembic, python-jose[cryptography], passlib[bcrypt], python-multipart, pytest

---

### Task 1: Dependencies and config

**Files:**
- Modify: `requirements.txt`
- Modify: `app/core/config.py`
- Modify: `.env.example`
- Test: `tests/test_config.py`

- [ ] **Step 1: Update `requirements.txt`**

```
fastapi
uvicorn[standard]
pydantic-settings
sqlalchemy
jinja2
pytest
httpx
alembic
python-jose[cryptography]
passlib[bcrypt]
python-multipart
ldap3
```

- [ ] **Step 2: Install**

```bash
.venv/bin/pip install -r requirements.txt -q
```

Expected: no errors. Because of `-q`, pip prints almost nothing on success; `.venv/bin/pip list` should afterwards show `passlib`, `python-jose`, and `alembic`.

- [ ] **Step 3: Write failing test**

Create `tests/test_config.py`:

```python
from app.core.config import get_settings


def test_new_config_fields_have_defaults():
    s = get_settings()
    assert hasattr(s, "SECRET_KEY")
    assert hasattr(s, "LDAP_ENABLED")
    assert s.LDAP_ENABLED is False
    assert s.LDAP_SERVER == "gso1.ads1.fh-nuernberg.de"
    assert s.LDAP_DOMAIN == "ADS1"
```

- [ ] **Step 4: Run test to confirm failure**

```bash
.venv/bin/pytest tests/test_config.py -v
```

Expected: FAIL with an `AssertionError` on `assert hasattr(s, "SECRET_KEY")`, because `Settings` does not define the new fields yet.

- [ ] **Step 5: Replace `app/core/config.py`**

```python
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    APP_ENV: str = "development"
    APP_PREFIX: str = ""
    DATABASE_URL: str = "sqlite:///./app.db"
    SECRET_KEY: str = "changeme-replace-in-production"
    LDAP_ENABLED: bool = False
    LDAP_SERVER: str = "gso1.ads1.fh-nuernberg.de"
    LDAP_DOMAIN: str = "ADS1"
    LDAP_SEARCH_BASE: str = "OU=users,OU=EFI,OU=Faculties,DC=ADS1,DC=fh-nuernberg,DC=de"


@lru_cache
def get_settings() -> Settings:
    return Settings()
```

- [ ] **Step 6: Update `.env.example`**

```
APP_ENV=development
APP_PREFIX=
DATABASE_URL=sqlite:///./app.db
SECRET_KEY=changeme-replace-in-production

# Production (MariaDB + LDAP):
# APP_ENV=production
# APP_PREFIX=/uph
# DATABASE_URL=mysql+pymysql://user:password@db:3306/efihub
# SECRET_KEY=
# LDAP_ENABLED=true
```

- [ ] **Step 7: Run test to confirm pass**

```bash
.venv/bin/pytest tests/test_config.py -v
```

Expected: PASS.

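The production `SECRET_KEY` is left empty in `.env.example`, and the default in `config.py` is only a placeholder. One possible way to produce a value is Python's standard `secrets` module. This is a sketch: the helper name `generate_secret_key` and the 32-byte length are assumptions for illustration, not part of the plan.

```python
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return a URL-safe random string suitable for an HMAC signing key.

    Hypothetical helper; 32 random bytes is an assumed, commonly used size.
    """
    return secrets.token_urlsafe(num_bytes)


if __name__ == "__main__":
    # Print a line that can be pasted into the production .env file.
    print(f"SECRET_KEY={generate_secret_key()}")
```

Every call returns a different value, so generate the key once per deployment and keep it stable; changing it invalidates all previously issued tokens.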
- [ ] **Step 8: Commit**

```bash
git add requirements.txt app/core/config.py .env.example tests/test_config.py
git commit -m "feat: add auth config fields (SECRET_KEY, LDAP_*)"
```

---

### Task 2: Database layer

**Files:**
- Create: `app/core/database.py`
- Test: `tests/test_database.py`

- [ ] **Step 1: Write failing test**

Create `tests/test_database.py`:

```python
from sqlalchemy.orm import Session

from app.core.database import Base, get_db


def test_get_db_yields_session():
    gen = get_db()
    db = next(gen)
    assert isinstance(db, Session)
    try:
        next(gen)
    except StopIteration:
        pass


def test_base_has_metadata():
    assert Base.metadata is not None
```

- [ ] **Step 2: Run to confirm failure**

```bash
.venv/bin/pytest tests/test_database.py -v
```

Expected: FAIL with `ModuleNotFoundError: No module named 'app.core.database'`

- [ ] **Step 3: Create `app/core/database.py`**

```python
from collections.abc import Iterator

from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker

from app.core.config import get_settings

settings = get_settings()

# SQLite connections are bound to the creating thread unless this check is disabled.
_connect_args = {"check_same_thread": False} if settings.DATABASE_URL.startswith("sqlite") else {}
engine = create_engine(settings.DATABASE_URL, connect_args=_connect_args)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


class Base(DeclarativeBase):
    pass


def get_db() -> Iterator[Session]:
    # Generator dependency: yields one session and always closes it afterwards.
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
```

- [ ] **Step 4: Run to confirm pass**

```bash
.venv/bin/pytest tests/test_database.py -v
```

Expected: PASS.

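The test above calls `next(gen)` twice because `get_db()` is a generator: the first call runs up to the `yield`, the second resumes after it and triggers the `finally` cleanup. The following sketch shows the same pattern with the standard library's `sqlite3` instead of SQLAlchemy; `get_conn` and `cleanup_log` are hypothetical names used only to make the cleanup observable.

```python
import sqlite3
from collections.abc import Iterator

cleanup_log: list[str] = []  # hypothetical; records that cleanup ran


def get_conn() -> Iterator[sqlite3.Connection]:
    """Hypothetical stand-in for get_db(), built on stdlib sqlite3."""
    conn = sqlite3.connect(":memory:")
    try:
        yield conn
    finally:
        conn.close()
        cleanup_log.append("closed")


gen = get_conn()
conn = next(gen)          # runs up to the yield and hands out the connection
conn.execute("SELECT 1")  # usable while the generator is suspended
next(gen, None)           # resumes after the yield, so the finally block runs
```

FastAPI drives a `yield` dependency in the same two phases: it resumes the generator once the response has been produced, which is what closes the session.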
- [ ] **Step 5: Commit**

```bash
git add app/core/database.py tests/test_database.py
git commit -m "feat: add SQLAlchemy database layer with get_db dependency"
```

---

### Task 3: User model and Alembic migrations

**Files:**
- Create: `app/modules/__init__.py`
- Create: `app/modules/auth/__init__.py`
- Create: `app/modules/auth/models.py`
- Create: `alembic.ini`, `alembic/env.py`, `alembic/versions/`
- Test: `tests/test_user_model.py`

- [ ] **Step 1: Create module packages**

```bash
mkdir -p app/modules/auth
touch app/modules/__init__.py app/modules/auth/__init__.py
```

- [ ] **Step 2: Write failing test**

Create `tests/test_user_model.py`:

```python
import pytest
from sqlalchemy import create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker

from app.core.database import Base
from app.modules.auth.models import User


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


def test_create_user(db):
    user = User(username="testuser", full_name="Test User", pw_hash="hashed")
    db.add(user)
    db.commit()
    db.refresh(user)
    assert user.id is not None
    assert user.username == "testuser"
    assert user.is_active is True
    assert user.is_admin is False
    assert user.created_at is not None


def test_user_defaults(db):
    user = User(username="alice", full_name="Alice")
    db.add(user)
    db.commit()
    db.refresh(user)
    assert user.pw_hash is None
    assert user.email is None
    assert user.department is None
    assert user.last_login is None


def test_username_must_be_unique(db):
    db.add(User(username="dup", full_name="First"))
    db.commit()
    db.add(User(username="dup", full_name="Second"))
    # The unique constraint on username surfaces as an IntegrityError on commit.
    with pytest.raises(IntegrityError):
        db.commit()
```

- [ ] **Step 3: Run to confirm failure**

```bash
.venv/bin/pytest tests/test_user_model.py -v
```

Expected: FAIL with `ModuleNotFoundError: No module named 'app.modules.auth.models'`

- [ ] **Step 4: Create `app/modules/auth/models.py`**

```python
from datetime import datetime

from sqlalchemy import Boolean, DateTime, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.sql import func

from app.core.database import Base


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    username: Mapped[str] = mapped_column(String(64), unique=True, nullable=False, index=True)
    full_name: Mapped[str] = mapped_column(String(128), nullable=False, default="")
    email: Mapped[str | None] = mapped_column(String(128), nullable=True)
    department: Mapped[str | None] = mapped_column(String(64), nullable=True)
    role: Mapped[str | None] = mapped_column(String(64), nullable=True)
    pw_hash: Mapped[str | None] = mapped_column(String(256), nullable=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    is_admin: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
    account_expires: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    last_login: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, server_default=func.now(), nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, server_default=func.now(), onupdate=func.now(), nullable=False
    )
```

- [ ] **Step 5: Run to confirm pass**

```bash
.venv/bin/pytest tests/test_user_model.py -v
```

Expected: 3 tests PASS.

- [ ] **Step 6: Initialize Alembic**

```bash
.venv/bin/alembic init alembic
```

Creates `alembic.ini` and `alembic/` directory.

- [ ] **Step 7: Configure `alembic.ini`**

Find and replace the database URL line:

```ini
# Before:
sqlalchemy.url = driver://user:pass@localhost/dbname

# After:
sqlalchemy.url = sqlite:///./app.db
```

- [ ] **Step 8: Replace `alembic/env.py`**

```python
from logging.config import fileConfig

from sqlalchemy import engine_from_config, pool

from alembic import context
from app.core.database import Base
import app.modules.auth.models  # noqa: F401  (registers User with Base.metadata)

config = context.config

if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata


def run_migrations_offline() -> None:
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
```

- [ ] **Step 9: Generate migration**

```bash
.venv/bin/alembic revision --autogenerate -m "create users table"
```

Expected: `Generating alembic/versions/_create_users_table.py`. Open the file and verify it contains `op.create_table("users", ...)` with all 13 columns.

- [ ] **Step 10: Run migration**

```bash
.venv/bin/alembic upgrade head
```

Expected: `Running upgrade -> , create users table`

- [ ] **Step 11: Commit**

```bash
git add app/modules/ alembic/ alembic.ini tests/test_user_model.py
git commit -m "feat: add User model and Alembic migration for users table"
```

---

### Task 4: Auth schemas and service (local auth)

**Files:**
- Create: `app/modules/auth/schemas.py`
- Create: `app/modules/auth/service.py`
- Test: `tests/test_auth_service.py`

- [ ] **Step 1: Create `app/modules/auth/schemas.py`**

```python
from datetime import datetime

from pydantic import BaseModel


class UserOut(BaseModel):
    id: int
    username: str
    full_name: str
    email: str | None
    department: str | None
    role: str | None
    is_active: bool
    is_admin: bool
    last_login: datetime | None
    created_at: datetime

    model_config = {"from_attributes": True}
```

- [ ] **Step 2: Write failing test**

Create `tests/test_auth_service.py`:

```python
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.database import Base
from app.modules.auth.models import User
from app.modules.auth.service import authenticate_user, get_user, hash_password, verify_password


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def alice(db):
    user = User(username="alice", full_name="Alice Smith", pw_hash=hash_password("secret123"))
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


def test_hash_and_verify_password():
    hashed = hash_password("mypassword")
    assert verify_password("mypassword", hashed) is True
    assert verify_password("wrong", hashed) is False


def test_get_user_found(db, alice):
    assert get_user(db, "alice") is not None


def test_get_user_not_found(db):
    assert get_user(db, "nobody") is None


def test_authenticate_correct_password(db, alice):
    user = authenticate_user(db, "alice", "secret123", ldap_enabled=False)
    assert user is not None
    assert user.last_login is not None


def test_authenticate_wrong_password(db, alice):
    assert authenticate_user(db, "alice", "wrong", ldap_enabled=False) is None


def test_authenticate_unknown_user(db):
    assert authenticate_user(db, "ghost", "any", ldap_enabled=False) is None


def test_authenticate_inactive_user(db):
    user = User(username="inactive", full_name="X", pw_hash=hash_password("pw"), is_active=False)
    db.add(user)
    db.commit()
    assert authenticate_user(db, "inactive", "pw", ldap_enabled=False) is None


def test_authenticate_no_pw_hash_no_ldap(db):
    user = User(username="ldaponly", full_name="Y", pw_hash=None)
    db.add(user)
    db.commit()
    assert authenticate_user(db, "ldaponly", "any", ldap_enabled=False) is None
```

- [ ] **Step 3: Run to confirm failure**

```bash
.venv/bin/pytest tests/test_auth_service.py -v
```

Expected: FAIL with `ModuleNotFoundError: No module named 'app.modules.auth.service'`

- [ ] **Step 4: Create `app/modules/auth/service.py`**

```python
from datetime import datetime, timezone
from typing import Optional

from passlib.context import CryptContext
from sqlalchemy.orm import Session

from app.modules.auth.models import User

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(plain: str) -> str:
    return pwd_context.hash(plain)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)


def get_user(db: Session, username: str) -> Optional[User]:
    return db.query(User).filter(User.username == username).first()


def authenticate_user(
    db: Session, username: str, password: str, ldap_enabled: bool
) -> Optional[User]:
    user = get_user(db, username)
    if user is None or not user.is_active:
        return None

    local_ok = user.pw_hash is not None and verify_password(password, user.pw_hash)
    if local_ok:
        _touch_last_login(db, user)
        return user

    if ldap_enabled:
        # LDAP auth implemented in Part 2
        pass

    return None


def _touch_last_login(db: Session, user: User) -> None:
    user.last_login = datetime.now(timezone.utc)
    db.commit()
```

- [ ] **Step 5: Run to confirm pass**

```bash
.venv/bin/pytest tests/test_auth_service.py -v
```

Expected: All 8 tests PASS.

- [ ] **Step 6: Commit**

```bash
git add app/modules/auth/schemas.py app/modules/auth/service.py tests/test_auth_service.py
git commit -m "feat: add auth service with local bcrypt password authentication"
```

---

### Task 5: JWT and cookie helpers

**Files:**
- Create: `app/core/auth.py`
- Test: `tests/test_core_auth.py`

- [ ] **Step 1: Write failing test**

Create `tests/test_core_auth.py`:

```python
from app.core.auth import COOKIE_NAME, create_access_token, decode_token


def test_create_and_decode_token():
    token = create_access_token(username="alice", is_admin=False)
    payload = decode_token(token)
    assert payload is not None
    assert payload["sub"] == "alice"
    assert payload["is_admin"] is False


def test_admin_claim():
    token = create_access_token(username="admin", is_admin=True)
    assert decode_token(token)["is_admin"] is True


def test_decode_invalid_token():
    assert decode_token("not.a.valid.token") is None


def test_decode_tampered_token():
    token = create_access_token(username="alice", is_admin=False)
    assert decode_token(token[:-4] + "xxxx") is None


def test_cookie_name():
    assert COOKIE_NAME == "access_token"
```

- [ ] **Step 2: Run to confirm failure**

```bash
.venv/bin/pytest tests/test_core_auth.py -v
```

Expected: FAIL with `ModuleNotFoundError: No module named 'app.core.auth'`

- [ ] **Step 3: Create `app/core/auth.py`**

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Request, Response
from jose import JWTError, jwt

from app.core.config import get_settings

settings = get_settings()

COOKIE_NAME = "access_token"
ALGORITHM = "HS256"
TOKEN_EXPIRE_HOURS = 8


def create_access_token(username: str, is_admin: bool) -> str:
    expire = datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRE_HOURS)
    return jwt.encode(
        {"sub": username, "is_admin": is_admin, "exp": expire},
        settings.SECRET_KEY,
        algorithm=ALGORITHM,
    )


def decode_token(token: str) -> Optional[dict]:
    try:
        return jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None


def get_token_from_request(request: Request) -> Optional[str]:
    return request.cookies.get(COOKIE_NAME)


def set_auth_cookie(response: Response, token: str) -> None:
    response.set_cookie(
        key=COOKIE_NAME,
        value=token,
        httponly=True,
        samesite="lax",
        secure=settings.APP_ENV == "production",
    )


def clear_auth_cookie(response: Response) -> None:
    response.delete_cookie(key=COOKIE_NAME, httponly=True, samesite="lax")
```

- [ ] **Step 4: Run to confirm pass**

```bash
.venv/bin/pytest tests/test_core_auth.py -v
```

Expected: All 5 tests PASS.

- [ ] **Step 5: Commit**

```bash
git add app/core/auth.py tests/test_core_auth.py
git commit -m "feat: add JWT creation, decoding and cookie helpers"
```

---

### Task 6: FastAPI auth dependencies

**Files:**
- Create: `app/modules/auth/dependencies.py`
- Test: `tests/test_auth_dependencies.py`

- [ ] **Step 1: Write failing test**

Create `tests/test_auth_dependencies.py`:

```python
import pytest
from fastapi import Depends, FastAPI
from fastapi.responses import RedirectResponse
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.modules.auth.dependencies import RequiresLoginException, get_current_user, require_admin
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def test_app(db):
    app = FastAPI()

    @app.exception_handler(RequiresLoginException)
    async def handle_login_required(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    @app.get("/protected")
    async def protected(user=Depends(get_current_user)):
        return {"username": user.username}

    @app.get("/admin-only")
    async def admin_only(user=Depends(require_admin)):
        return {"username": user.username}

    app.dependency_overrides[get_db] = lambda: db
    return TestClient(test_app, follow_redirects=False)  # noqa (bug: replaced in Step 4)


@pytest.fixture
def client(test_app):
    return TestClient(test_app, follow_redirects=False)


def _token_cookie(username: str, is_admin: bool) -> dict:
    token = create_access_token(username=username, is_admin=is_admin)
    return {"access_token": token}


def test_no_cookie_redirects_to_login(client):
    response = client.get("/protected")
    assert response.status_code == 307
    assert "/auth/login" in response.headers["location"]


def test_valid_token_returns_user(client, db):
    db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/protected", cookies=_token_cookie("bob", False))
    assert response.status_code == 200
    assert response.json()["username"] == "bob"


def test_invalid_token_redirects(client):
    response = client.get("/protected", cookies={"access_token": "invalid"})
    assert response.status_code == 307


def test_require_admin_blocks_non_admin(client, db):
    db.add(User(username="regular", full_name="R", is_admin=False, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("regular", False))
    assert response.status_code == 403


def test_require_admin_allows_admin(client, db):
    db.add(User(username="admin", full_name="A", is_admin=True, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("admin", True))
    assert response.status_code == 200
```

- [ ] **Step 2: Run to confirm failure**

```bash
.venv/bin/pytest tests/test_auth_dependencies.py -v
```

Expected: FAIL with `ModuleNotFoundError: No module named 'app.modules.auth.dependencies'`

- [ ] **Step 3: Create `app/modules/auth/dependencies.py`**

```python
from fastapi import Depends, HTTPException, Request
from sqlalchemy.orm import Session

from app.core.auth import decode_token, get_token_from_request
from app.core.database import get_db
from app.modules.auth.models import User
from app.modules.auth.service import get_user


class RequiresLoginException(Exception):
    pass


async def get_current_user(
    request: Request, db: Session = Depends(get_db)
) -> User:
    token = get_token_from_request(request)
    if not token:
        raise RequiresLoginException()
    payload = decode_token(token)
    if not payload:
        raise RequiresLoginException()
    user = get_user(db, payload.get("sub", ""))
    if user is None or not user.is_active:
        raise RequiresLoginException()
    return user


async def require_admin(user: User = Depends(get_current_user)) -> User:
    if not user.is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")
    return user
```

- [ ] **Step 4: Fix the test fixture (the test above contains a bug)**

In `tests/test_auth_dependencies.py`, the `test_app` fixture returns `TestClient(test_app, ...)`, which wraps the fixture function itself, instead of returning the FastAPI `app`.
Replace the fixture with:

```python
@pytest.fixture
def test_app(db):
    app = FastAPI()

    @app.exception_handler(RequiresLoginException)
    async def handle_login_required(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    @app.get("/protected")
    async def protected(user=Depends(get_current_user)):
        return {"username": user.username}

    @app.get("/admin-only")
    async def admin_only(user=Depends(require_admin)):
        return {"username": user.username}

    app.dependency_overrides[get_db] = lambda: db
    return app


@pytest.fixture
def client(test_app):
    return TestClient(test_app, follow_redirects=False)
```

The full corrected `tests/test_auth_dependencies.py`:

```python
import pytest
from fastapi import Depends, FastAPI
from fastapi.responses import RedirectResponse
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.modules.auth.dependencies import RequiresLoginException, get_current_user, require_admin
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def test_app(db):
    app = FastAPI()

    @app.exception_handler(RequiresLoginException)
    async def handle_login_required(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    @app.get("/protected")
    async def protected(user=Depends(get_current_user)):
        return {"username": user.username}

    @app.get("/admin-only")
    async def admin_only(user=Depends(require_admin)):
        return {"username": user.username}

    app.dependency_overrides[get_db] = lambda: db
    return app


@pytest.fixture
def client(test_app):
    return TestClient(test_app, follow_redirects=False)


def _token_cookie(username: str, is_admin: bool) -> dict:
    token = create_access_token(username=username, is_admin=is_admin)
    return {"access_token": token}


def test_no_cookie_redirects_to_login(client):
    response = client.get("/protected")
    assert response.status_code == 307
    assert "/auth/login" in response.headers["location"]


def test_valid_token_returns_user(client, db):
    db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/protected", cookies=_token_cookie("bob", False))
    assert response.status_code == 200
    assert response.json()["username"] == "bob"


def test_invalid_token_redirects(client):
    response = client.get("/protected", cookies={"access_token": "invalid"})
    assert response.status_code == 307


def test_require_admin_blocks_non_admin(client, db):
    db.add(User(username="regular", full_name="R", is_admin=False, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("regular", False))
    assert response.status_code == 403


def test_require_admin_allows_admin(client, db):
    db.add(User(username="admin", full_name="A", is_admin=True, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("admin", True))
    assert response.status_code == 200
```

- [ ] **Step 5: Run to confirm pass**

```bash
.venv/bin/pytest tests/test_auth_dependencies.py -v
```

Expected: All 5 tests PASS.

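`get_current_user` can trust the `sub` claim only because `decode_token` rejects any token whose signature does not match `SECRET_KEY`. The sketch below illustrates that HS256 check with the standard library alone. It is not python-jose's implementation, and `sign_hs256` / `verify_hs256` are hypothetical names used for illustration.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 without the trailing "=" padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that _b64url stripped before decoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _mac(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Hypothetical helper: build header.payload.signature for an HS256 token."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_mac(signing_input, secret))}"


def verify_hs256(token: str, secret: str) -> Optional[dict]:
    """Hypothetical helper: return the payload, or None if the token is invalid."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = _mac(f"{header_b64}.{payload_b64}", secret)
        # Constant-time comparison avoids leaking how many bytes matched.
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        payload = json.loads(_b64url_decode(payload_b64))
    except (ValueError, UnicodeError):
        return None
    if not isinstance(payload, dict):
        return None
    if payload.get("exp", float("inf")) < time.time():
        return None  # expired
    return payload


token = sign_hs256({"sub": "alice", "is_admin": False, "exp": time.time() + 3600}, "demo-secret")
payload = verify_hs256(token, "demo-secret")
tampered = verify_hs256(token[:-4] + "xxxx", "demo-secret")
```

Because the signature covers the header and payload, changing `is_admin` inside the cookie without knowing the secret yields a token that fails verification, which is the case `test_decode_tampered_token` exercises.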
- [ ] **Step 6: Commit**

```bash
git add app/modules/auth/dependencies.py tests/test_auth_dependencies.py
git commit -m "feat: add get_current_user and require_admin dependencies"
```

---

### Task 7: Auth router and login template

**Files:**
- Create: `app/modules/auth/router.py`
- Create: `app/templates/auth/login.html`
- Test: `tests/test_auth_router.py`

- [ ] **Step 1: Write failing test**

Create `tests/test_auth_router.py`:

```python
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.database import Base, get_db
from app.main import app
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture(autouse=True)
def override_db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    app.dependency_overrides[get_db] = lambda: session
    yield session
    app.dependency_overrides.clear()
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def client():
    return TestClient(app, follow_redirects=False)


@pytest.fixture
def alice(override_db):
    user = User(username="alice", full_name="Alice Smith", pw_hash=hash_password("secret123"))
    override_db.add(user)
    override_db.commit()
    return user


def test_get_login_page(client):
    response = client.get("/auth/login")
    assert response.status_code == 200
    assert "text/html" in response.headers["content-type"]
    assert "Anmelden" in response.text


def test_login_correct_credentials_redirects(client, alice):
    response = client.post("/auth/login", data={"username": "alice", "password": "secret123"})
    assert response.status_code in (302, 303, 307)
    assert "access_token" in response.cookies


def test_login_wrong_password_shows_error(client, alice):
    response = client.post("/auth/login", data={"username": "alice", "password": "wrong"})
    # The router re-renders the login page with status 401 on failed logins.
    assert response.status_code == 401
    assert "Ungültige" in response.text


def test_login_unknown_user_shows_error(client):
    response = client.post("/auth/login", data={"username": "ghost", "password": "any"})
    assert response.status_code == 401
    assert "Ungültige" in response.text


def test_logout_clears_cookie(client, alice):
    client.post("/auth/login", data={"username": "alice", "password": "secret123"})
    response = client.get("/auth/logout")
    assert response.status_code in (302, 303, 307)
    assert response.cookies.get("access_token", "") == ""
```

- [ ] **Step 2: Run to confirm failure**

```bash
.venv/bin/pytest tests/test_auth_router.py -v
```

Expected: FAIL because the router is not yet registered in `app/main.py`

- [ ] **Step 3: Create `app/templates/auth/login.html`**

```bash
mkdir -p app/templates/auth
```

```html
{% extends "base.html" %}
{% block title %}Anmelden · EFI Hub{% endblock %}
{% block content %}

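<section>
  <p>Fakultät EFI · TH Nürnberg</p>
  <h1>Anmelden</h1>
  <p>Bitte melde dich mit deinen Zugangsdaten an.</p>
  {% if error %}
  <p role="alert">{{ error }}</p>
  {% endif %}
  {# Assumption: minimal form markup; field names match the Form(...) parameters in router.py. #}
  <form method="post">
    <label>Benutzername <input type="text" name="username" autocomplete="username" required></label>
    <label>Passwort <input type="password" name="password" autocomplete="current-password" required></label>
    <button type="submit">Anmelden</button>
  </form>
</section>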
{% endblock %} ``` - [ ] **Step 4: Create `app/modules/auth/router.py`** ```python from fastapi import APIRouter, Depends, Form, Request from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates from sqlalchemy.orm import Session from app.core.auth import clear_auth_cookie, create_access_token, set_auth_cookie from app.core.config import get_settings from app.core.database import get_db from app.modules.auth.dependencies import get_current_user from app.modules.auth.schemas import UserOut from app.modules.auth.service import authenticate_user router = APIRouter(prefix="/auth", tags=["auth"]) templates = Jinja2Templates(directory="app/templates") settings = get_settings() _NAV: list[dict] = [] @router.get("/login", response_class=HTMLResponse) async def login_page(request: Request): return templates.TemplateResponse( request, "auth/login.html", {"nav_items": _NAV, "app_version": "0.1.0"} ) @router.post("/login", response_class=HTMLResponse) async def login( request: Request, username: str = Form(...), password: str = Form(...), db: Session = Depends(get_db), ): user = authenticate_user(db, username, password, ldap_enabled=settings.LDAP_ENABLED) if user is None: return templates.TemplateResponse( request, "auth/login.html", {"nav_items": _NAV, "app_version": "0.1.0", "error": "Ungültige Zugangsdaten."}, status_code=401, ) token = create_access_token(username=user.username, is_admin=user.is_admin) response = RedirectResponse(url="/", status_code=303) set_auth_cookie(response, token) return response @router.get("/logout") async def logout(): response = RedirectResponse(url="/auth/login", status_code=303) clear_auth_cookie(response) return response @router.get("/me", response_model=UserOut) async def me(user=Depends(get_current_user)): return user ``` - [ ] **Step 5: Run to confirm failure (router not wired)** ```bash .venv/bin/pytest tests/test_auth_router.py -v ``` Expected: FAIL — routes return 404 (router not 
included in main app yet).

- [ ] **Step 6: Commit templates and router**

```bash
git add app/modules/auth/router.py app/templates/auth/login.html tests/test_auth_router.py
git commit -m "feat: add auth router with login/logout/me and login template"
```

---

### Task 8: Wire auth into main.py and create admin script

**Files:**
- Modify: `app/main.py`
- Create: `scripts/create_admin.py`

- [ ] **Step 1: Replace `app/main.py`**

```python
from fastapi import Depends, FastAPI, Request, WebSocket
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from app.core.auth import get_token_from_request
from app.core.config import get_settings
from app.core.database import get_db
from app.modules.auth.dependencies import RequiresLoginException, get_current_user
from app.modules.auth.router import router as auth_router

settings = get_settings()

app = FastAPI(
    title="University Process Hub",
    root_path=settings.APP_PREFIX,
)

app.mount("/static", StaticFiles(directory="app/static"), name="static")
templates = Jinja2Templates(directory="app/templates")

app.include_router(auth_router)


@app.exception_handler(RequiresLoginException)
async def requires_login_handler(request: Request, exc: RequiresLoginException):
    # Unauthenticated requests are redirected to the login page.
    return RedirectResponse(url="/auth/login", status_code=307)


# Module cards rendered on the landing page (UI strings stay in German).
MODULES = [
    {
        "icon": "📡",
        "name": "RSS-Feed Server",
        "description": "Aggregiert und verteilt Neuigkeiten der Fakultät als standardkonformen RSS 2.0 Feed.",
        "status": "active",
    },
    {
        "icon": "📅",
        "name": "Kalender",
        "description": "Veranstaltungen, Prüfungstermine und Fristen im iCal-Format.",
        "status": "planned",
    },
    {
        "icon": "🔔",
        "name": "Benachrichtigungen",
        "description": "Push-Nachrichten und WebSocket-basierte Echtzeit-Alerts für Studierende.",
        "status": "planned",
    },
    {
        "icon": "📊",
        "name": "Auslastung",
        "description": "Raum- und Ressourcenauslastung der Fakultät in Echtzeit.",
        "status": "planned",
    },
    {
        "icon": "📚",
        "name": "Lehrveranstaltungen",
        "description": "Stundenplan-API und Kursinformationen aus dem Campus-System.",
        "status": "planned",
    },
]

NAV_ITEMS = [
    {"label": "Übersicht", "url": "/", "active": True},
    {"label": "RSS-Feeds", "url": "/rss", "active": False},
    {"label": "Kalender", "url": "/kalender", "active": False},
    {"label": "Docs", "url": "/docs", "active": False},
]


def _db_mode() -> str:
    url = settings.DATABASE_URL
    return "SQLite (dev)" if url.startswith("sqlite") else "MariaDB (prod)"


@app.get("/", response_class=HTMLResponse)
async def root(request: Request, current_user=Depends(get_current_user)):
    # get_current_user raises RequiresLoginException when no valid cookie is present.
    return templates.TemplateResponse(
        request,
        "index.html",
        {
            "nav_items": NAV_ITEMS,
            "modules": MODULES,
            "db_mode": _db_mode(),
            "app_version": "0.1.0",
            "current_user": current_user,
        },
    )


@app.websocket("/ws/hello/{name}")
async def websocket_hello(websocket: WebSocket, name: str):
    await websocket.accept()
    await websocket.send_text(f"Hello, {name}!")
    await websocket.close()
```

- [ ] **Step 2: Update `tests/test_landing.py`**

`GET /` now requires authentication, so the old tests that send no cookie fail. Replace `tests/test_landing.py` completely:

```python
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.main import app
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture(autouse=True)
def override_db():
    # StaticPool keeps one shared connection, so the in-memory database
    # created here is the same one the request-handling thread sees.
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    app.dependency_overrides[get_db] = lambda: session
    yield session
    app.dependency_overrides.clear()
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def client():
    return TestClient(app, follow_redirects=False)


@pytest.fixture
def auth_cookies(override_db):
    user = User(username="testuser", full_name="Test User", pw_hash=hash_password("pw"))
    override_db.add(user)
    override_db.commit()
    token = create_access_token(username="testuser", is_admin=False)
    return {"access_token": token}


def test_landing_without_auth_redirects(client):
    response = client.get("/")
    assert response.status_code == 307
    assert "/auth/login" in response.headers["location"]


def test_landing_returns_html(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert response.status_code == 200
    assert "text/html" in response.headers["content-type"]


def test_landing_contains_title(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "University Process Hub" in response.text


def test_landing_contains_rss_module(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "RSS-Feed Server" in response.text


def test_landing_navbar_links_present(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "Übersicht" in response.text


def test_landing_info_strip_shows_db_mode(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "SQLite" in response.text or "MariaDB" in response.text
```

- [ ] **Step 3: Run all tests**

```bash
.venv/bin/pytest -v
```

Expected: All tests PASS, including `test_auth_router.py`, which now has the router registered.

- [ ] **Step 4: Create admin user script**

```bash
mkdir -p scripts
```

Create `scripts/create_admin.py`:

```python
"""
Create or reset the admin user.

Usage:
    .venv/bin/python scripts/create_admin.py <username> <password>

Example:
    .venv/bin/python scripts/create_admin.py admin geheim123
"""
import sys
from pathlib import Path

# Make the project root importable when the script is run directly.
sys.path.insert(0, str(Path(__file__).parent.parent))

from app.core.database import SessionLocal, engine, Base
from app.modules.auth.models import User  # noqa: F401 - registers table
from app.modules.auth.service import get_user, hash_password


def create_admin(username: str, password: str) -> None:
    Base.metadata.create_all(bind=engine)
    with SessionLocal() as db:
        user = get_user(db, username)
        if user is None:
            user = User(username=username, full_name="Administrator")
            db.add(user)
        # Set or reset the credentials and flags for new and existing users alike.
        user.pw_hash = hash_password(password)
        user.is_admin = True
        user.is_active = True
        db.commit()
    print(f"Admin user '{username}' created/updated.")


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print(__doc__)
        sys.exit(1)
    create_admin(sys.argv[1], sys.argv[2])
```

- [ ] **Step 5: Create initial admin user**

```bash
.venv/bin/python scripts/create_admin.py admin admin
```

Expected: `Admin user 'admin' created/updated.`

- [ ] **Step 6: Start server and verify login**

```bash
.venv/bin/uvicorn app.main:app --reload
```

Open `http://localhost:8000`; it should redirect to `http://localhost:8000/auth/login`. Log in with `admin` / `admin`; you should be redirected to the landing page.

- [ ] **Step 7: Run full test suite one last time**

```bash
.venv/bin/pytest -v
```

Expected: All tests PASS.
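`scripts/create_admin.py` takes the password as a command-line argument, which leaves it in the shell history. The following is a minimal sketch of an alternative argument front end, not part of the plan: `parse_cli` is a hypothetical helper that makes the password argument optional and falls back to a prompt from the standard library's `getpass`. It only parses input and does not touch the database.

```python
import argparse
import getpass
from typing import Callable, Optional, Sequence, Tuple


def parse_cli(
    argv: Sequence[str],
    prompt: Callable[[str], str] = getpass.getpass,
) -> Tuple[str, str]:
    """Return (username, password); ask for the password if it was omitted."""
    parser = argparse.ArgumentParser(description="Create or reset the admin user.")
    parser.add_argument("username")
    # Optional positional: omit it to be prompted instead.
    parser.add_argument("password", nargs="?", default=None)
    args = parser.parse_args(list(argv))

    password: Optional[str] = args.password
    if password is None:
        # `prompt` is injectable so the function can be tested without a terminal.
        password = prompt(f"Password for {args.username}: ")
    if not password:
        # argparse prints the usage text and exits with status 2.
        parser.error("password must not be empty")
    return args.username, password
```

In the script's `__main__` block this could replace the `len(sys.argv)` check, for example `create_admin(*parse_cli(sys.argv[1:]))`.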
- [ ] **Step 8: Commit**

```bash
git add app/main.py scripts/ tests/test_landing.py
git commit -m "feat: wire auth into main app, protect landing page, add create_admin script"
```
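The exception handler in `app/main.py` redirects to the fixed path `/auth/login`. When `APP_PREFIX` is set (the production block in `.env.example` uses `/uph`), the browser-facing URL may need that prefix, depending on whether the reverse proxy strips it before forwarding. A minimal sketch of joining the two, assuming a hypothetical helper `login_url` that is not part of the plan:

```python
def login_url(app_prefix: str, login_path: str = "/auth/login") -> str:
    """Join an optional mount prefix and the login path into one absolute path."""
    # Normalise so "", "/", "uph", "/uph" and "/uph/" are all handled the same way.
    prefix = app_prefix.strip("/")
    path = "/" + login_path.lstrip("/")
    return f"/{prefix}{path}" if prefix else path
```

In the handler this could be used as `RedirectResponse(url=login_url(settings.APP_PREFIX), status_code=307)`. With the default empty prefix the result is still `/auth/login`, so the assertion in `test_landing_without_auth_redirects` would be unaffected.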