efihub/docs/superpowers/plans/2026-04-27-auth-part1.md
2026-04-27 09:26:24 +02:00

40 KiB

Auth Part 1: Core Auth Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Implement local user authentication with bcrypt passwords, JWT in httpOnly cookies, SQLAlchemy User model, Alembic migrations, and FastAPI route protection.

Architecture: Single users table via SQLAlchemy 2.0 + Alembic. app/core/database.py provides the engine and get_db() dependency. app/core/auth.py handles JWT only (no business logic). Business logic lives in app/modules/auth/service.py. Route protection via get_current_user dependency that raises RequiresLoginException → caught by exception handler in main.py → redirect to /auth/login. LDAP hook is a pass stub in service.py until Part 2.

Tech Stack: SQLAlchemy 2.0, Alembic, python-jose[cryptography], passlib[bcrypt], python-multipart, pytest


Task 1: Dependencies and config

Files:

  • Modify: requirements.txt

  • Modify: app/core/config.py

  • Modify: .env.example

  • Test: tests/test_config.py

  • Step 1: Update requirements.txt

fastapi
uvicorn[standard]
pydantic-settings
sqlalchemy
jinja2
pytest
httpx
alembic
python-jose[cryptography]
passlib[bcrypt]
python-multipart
ldap3
  • Step 2: Install
.venv/bin/pip install -r requirements.txt -q

Expected: no errors. passlib, jose, alembic appear in output.

  • Step 3: Write failing test

Create tests/test_config.py:

from app.core.config import get_settings


def test_new_config_fields_have_defaults():
    s = get_settings()
    assert hasattr(s, "SECRET_KEY")
    assert hasattr(s, "LDAP_ENABLED")
    assert s.LDAP_ENABLED is False
    assert s.LDAP_SERVER == "gso1.ads1.fh-nuernberg.de"
    assert s.LDAP_DOMAIN == "ADS1"
  • Step 4: Run test to confirm failure
.venv/bin/pytest tests/test_config.py -v

Expected: FAIL — AttributeError: 'Settings' object has no attribute 'SECRET_KEY'

  • Step 5: Replace app/core/config.py
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    APP_ENV: str = "development"
    APP_PREFIX: str = ""
    DATABASE_URL: str = "sqlite:///./app.db"
    SECRET_KEY: str = "changeme-replace-in-production"
    LDAP_ENABLED: bool = False
    LDAP_SERVER: str = "gso1.ads1.fh-nuernberg.de"
    LDAP_DOMAIN: str = "ADS1"
    LDAP_SEARCH_BASE: str = "OU=users,OU=EFI,OU=Faculties,DC=ADS1,DC=fh-nuernberg,DC=de"


@lru_cache
def get_settings() -> Settings:
    return Settings()
  • Step 6: Update .env.example
APP_ENV=development
APP_PREFIX=
DATABASE_URL=sqlite:///./app.db
SECRET_KEY=changeme-replace-in-production

# Produktion (MariaDB + LDAP):
# APP_ENV=production
# APP_PREFIX=/uph
# DATABASE_URL=mysql+pymysql://user:password@db:3306/efihub
# SECRET_KEY=<python -c "import secrets; print(secrets.token_hex(32))">
# LDAP_ENABLED=true
  • Step 7: Run test to confirm pass
.venv/bin/pytest tests/test_config.py -v

Expected: PASS.

  • Step 8: Commit
git add requirements.txt app/core/config.py .env.example tests/test_config.py
git commit -m "feat: add auth config fields (SECRET_KEY, LDAP_*)"

Task 2: Database layer

Files:

  • Create: app/core/database.py

  • Test: tests/test_database.py

  • Step 1: Write failing test

Create tests/test_database.py:

from sqlalchemy.orm import Session
from app.core.database import Base, get_db


def test_get_db_yields_session():
    gen = get_db()
    db = next(gen)
    assert isinstance(db, Session)
    try:
        next(gen)
    except StopIteration:
        pass


def test_base_has_metadata():
    assert Base.metadata is not None
  • Step 2: Run to confirm failure
.venv/bin/pytest tests/test_database.py -v

Expected: FAIL — ModuleNotFoundError: No module named 'app.core.database'

  • Step 3: Create app/core/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker

from app.core.config import get_settings

settings = get_settings()

_connect_args = {"check_same_thread": False} if settings.DATABASE_URL.startswith("sqlite") else {}
engine = create_engine(settings.DATABASE_URL, connect_args=_connect_args)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


class Base(DeclarativeBase):
    pass


def get_db() -> Session:
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
  • Step 4: Run to confirm pass
.venv/bin/pytest tests/test_database.py -v

Expected: PASS.

  • Step 5: Commit
git add app/core/database.py tests/test_database.py
git commit -m "feat: add SQLAlchemy database layer with get_db dependency"

Task 3: User model and Alembic migrations

Files:

  • Create: app/modules/__init__.py

  • Create: app/modules/auth/__init__.py

  • Create: app/modules/auth/models.py

  • Create: alembic.ini, alembic/env.py, alembic/versions/

  • Test: tests/test_user_model.py

  • Step 1: Create module packages

mkdir -p app/modules/auth
touch app/modules/__init__.py app/modules/auth/__init__.py
  • Step 2: Write failing test

Create tests/test_user_model.py:

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.database import Base
from app.modules.auth.models import User


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


def test_create_user(db):
    user = User(username="testuser", full_name="Test User", pw_hash="hashed")
    db.add(user)
    db.commit()
    db.refresh(user)
    assert user.id is not None
    assert user.username == "testuser"
    assert user.is_active is True
    assert user.is_admin is False
    assert user.created_at is not None


def test_user_defaults(db):
    user = User(username="alice", full_name="Alice")
    db.add(user)
    db.commit()
    db.refresh(user)
    assert user.pw_hash is None
    assert user.email is None
    assert user.department is None
    assert user.last_login is None


def test_username_must_be_unique(db):
    db.add(User(username="dup", full_name="First"))
    db.commit()
    db.add(User(username="dup", full_name="Second"))
    with pytest.raises(Exception):
        db.commit()
  • Step 3: Run to confirm failure
.venv/bin/pytest tests/test_user_model.py -v

Expected: FAIL — ModuleNotFoundError: No module named 'app.modules.auth.models'

  • Step 4: Create app/modules/auth/models.py
from datetime import datetime

from sqlalchemy import Boolean, DateTime, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.sql import func

from app.core.database import Base


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    username: Mapped[str] = mapped_column(String(64), unique=True, nullable=False, index=True)
    full_name: Mapped[str] = mapped_column(String(128), nullable=False, default="")
    email: Mapped[str | None] = mapped_column(String(128), nullable=True)
    department: Mapped[str | None] = mapped_column(String(64), nullable=True)
    role: Mapped[str | None] = mapped_column(String(64), nullable=True)
    pw_hash: Mapped[str | None] = mapped_column(String(256), nullable=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    is_admin: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
    account_expires: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    last_login: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, server_default=func.now(), nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, server_default=func.now(), onupdate=func.now(), nullable=False
    )
  • Step 5: Run to confirm pass
.venv/bin/pytest tests/test_user_model.py -v

Expected: 3 tests PASS.

  • Step 6: Initialize Alembic
.venv/bin/alembic init alembic

Creates alembic.ini and alembic/ directory.

  • Step 7: Configure alembic.ini

Find and replace the database URL line:

# Before:
sqlalchemy.url = driver://user:pass@localhost/dbname

# After:
sqlalchemy.url = sqlite:///./app.db
  • Step 8: Replace alembic/env.py
from logging.config import fileConfig

from sqlalchemy import engine_from_config, pool
from alembic import context

from app.core.database import Base
import app.modules.auth.models  # noqa: F401 — registers User with Base.metadata

config = context.config
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata


def run_migrations_offline() -> None:
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
  • Step 9: Generate migration
.venv/bin/alembic revision --autogenerate -m "create users table"

Expected: Generating alembic/versions/<hash>_create_users_table.py. Open the file and verify it contains op.create_table("users", ...) with all 13 columns.

  • Step 10: Run migration
.venv/bin/alembic upgrade head

Expected: Running upgrade -> <hash>, create users table

  • Step 11: Commit
git add app/modules/ alembic/ alembic.ini tests/test_user_model.py
git commit -m "feat: add User model and Alembic migration for users table"

Task 4: Auth schemas and service (local auth)

Files:

  • Create: app/modules/auth/schemas.py

  • Create: app/modules/auth/service.py

  • Test: tests/test_auth_service.py

  • Step 1: Create app/modules/auth/schemas.py

from datetime import datetime

from pydantic import BaseModel


class UserOut(BaseModel):
    id: int
    username: str
    full_name: str
    email: str | None
    department: str | None
    role: str | None
    is_active: bool
    is_admin: bool
    last_login: datetime | None
    created_at: datetime

    model_config = {"from_attributes": True}
  • Step 2: Write failing test

Create tests/test_auth_service.py:

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.database import Base
from app.modules.auth.models import User
from app.modules.auth.service import authenticate_user, get_user, hash_password, verify_password


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def alice(db):
    user = User(username="alice", full_name="Alice Smith", pw_hash=hash_password("secret123"))
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


def test_hash_and_verify_password():
    hashed = hash_password("mypassword")
    assert verify_password("mypassword", hashed) is True
    assert verify_password("wrong", hashed) is False


def test_get_user_found(db, alice):
    assert get_user(db, "alice") is not None


def test_get_user_not_found(db):
    assert get_user(db, "nobody") is None


def test_authenticate_correct_password(db, alice):
    user = authenticate_user(db, "alice", "secret123", ldap_enabled=False)
    assert user is not None
    assert user.last_login is not None


def test_authenticate_wrong_password(db, alice):
    assert authenticate_user(db, "alice", "wrong", ldap_enabled=False) is None


def test_authenticate_unknown_user(db):
    assert authenticate_user(db, "ghost", "any", ldap_enabled=False) is None


def test_authenticate_inactive_user(db):
    user = User(username="inactive", full_name="X", pw_hash=hash_password("pw"), is_active=False)
    db.add(user)
    db.commit()
    assert authenticate_user(db, "inactive", "pw", ldap_enabled=False) is None


def test_authenticate_no_pw_hash_no_ldap(db):
    user = User(username="ldaponly", full_name="Y", pw_hash=None)
    db.add(user)
    db.commit()
    assert authenticate_user(db, "ldaponly", "any", ldap_enabled=False) is None
  • Step 3: Run to confirm failure
.venv/bin/pytest tests/test_auth_service.py -v

Expected: FAIL — ModuleNotFoundError: No module named 'app.modules.auth.service'

  • Step 4: Create app/modules/auth/service.py
from datetime import datetime, timezone
from typing import Optional

from passlib.context import CryptContext
from sqlalchemy.orm import Session

from app.modules.auth.models import User

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(plain: str) -> str:
    return pwd_context.hash(plain)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)


def get_user(db: Session, username: str) -> Optional[User]:
    return db.query(User).filter(User.username == username).first()


def authenticate_user(
    db: Session, username: str, password: str, ldap_enabled: bool
) -> Optional[User]:
    user = get_user(db, username)
    if user is None or not user.is_active:
        return None

    local_ok = user.pw_hash is not None and verify_password(password, user.pw_hash)
    if local_ok:
        _touch_last_login(db, user)
        return user

    if ldap_enabled:
        # LDAP auth implemented in Part 2
        pass

    return None


def _touch_last_login(db: Session, user: User) -> None:
    user.last_login = datetime.now(timezone.utc)
    db.commit()
  • Step 5: Run to confirm pass
.venv/bin/pytest tests/test_auth_service.py -v

Expected: All 8 tests PASS.

  • Step 6: Commit
git add app/modules/auth/schemas.py app/modules/auth/service.py tests/test_auth_service.py
git commit -m "feat: add auth service with local bcrypt password authentication"

Files:

  • Create: app/core/auth.py

  • Test: tests/test_core_auth.py

  • Step 1: Write failing test

Create tests/test_core_auth.py:

from app.core.auth import COOKIE_NAME, create_access_token, decode_token


def test_create_and_decode_token():
    token = create_access_token(username="alice", is_admin=False)
    payload = decode_token(token)
    assert payload is not None
    assert payload["sub"] == "alice"
    assert payload["is_admin"] is False


def test_admin_claim():
    token = create_access_token(username="admin", is_admin=True)
    assert decode_token(token)["is_admin"] is True


def test_decode_invalid_token():
    assert decode_token("not.a.valid.token") is None


def test_decode_tampered_token():
    token = create_access_token(username="alice", is_admin=False)
    assert decode_token(token[:-4] + "xxxx") is None


def test_cookie_name():
    assert COOKIE_NAME == "access_token"
  • Step 2: Run to confirm failure
.venv/bin/pytest tests/test_core_auth.py -v

Expected: FAIL — ModuleNotFoundError: No module named 'app.core.auth'

  • Step 3: Create app/core/auth.py
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Request, Response
from jose import JWTError, jwt

from app.core.config import get_settings

settings = get_settings()

COOKIE_NAME = "access_token"
ALGORITHM = "HS256"
TOKEN_EXPIRE_HOURS = 8


def create_access_token(username: str, is_admin: bool) -> str:
    expire = datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRE_HOURS)
    return jwt.encode(
        {"sub": username, "is_admin": is_admin, "exp": expire},
        settings.SECRET_KEY,
        algorithm=ALGORITHM,
    )


def decode_token(token: str) -> Optional[dict]:
    try:
        return jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None


def get_token_from_request(request: Request) -> Optional[str]:
    return request.cookies.get(COOKIE_NAME)


def set_auth_cookie(response: Response, token: str) -> None:
    response.set_cookie(
        key=COOKIE_NAME,
        value=token,
        httponly=True,
        samesite="lax",
        secure=settings.APP_ENV == "production",
    )


def clear_auth_cookie(response: Response) -> None:
    response.delete_cookie(key=COOKIE_NAME, httponly=True, samesite="lax")
  • Step 4: Run to confirm pass
.venv/bin/pytest tests/test_core_auth.py -v

Expected: All 5 tests PASS.

  • Step 5: Commit
git add app/core/auth.py tests/test_core_auth.py
git commit -m "feat: add JWT creation, decoding and cookie helpers"

Task 6: FastAPI auth dependencies

Files:

  • Create: app/modules/auth/dependencies.py

  • Test: tests/test_auth_dependencies.py

  • Step 1: Write failing test

Create tests/test_auth_dependencies.py:

import pytest
from fastapi import Depends, FastAPI
from fastapi.responses import RedirectResponse
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.modules.auth.dependencies import RequiresLoginException, get_current_user, require_admin
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def test_app(db):
    app = FastAPI()

    @app.exception_handler(RequiresLoginException)
    async def handle_login_required(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    @app.get("/protected")
    async def protected(user=Depends(get_current_user)):
        return {"username": user.username}

    @app.get("/admin-only")
    async def admin_only(user=Depends(require_admin)):
        return {"username": user.username}

    app.dependency_overrides[get_db] = lambda: db
    return TestClient(test_app, follow_redirects=False)  # noqa — replaced below


@pytest.fixture
def client(test_app):
    return TestClient(test_app, follow_redirects=False)


def _token_cookie(username: str, is_admin: bool) -> dict:
    token = create_access_token(username=username, is_admin=is_admin)
    return {"access_token": token}


def test_no_cookie_redirects_to_login(client):
    response = client.get("/protected")
    assert response.status_code == 307
    assert "/auth/login" in response.headers["location"]


def test_valid_token_returns_user(client, db):
    db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/protected", cookies=_token_cookie("bob", False))
    assert response.status_code == 200
    assert response.json()["username"] == "bob"


def test_invalid_token_redirects(client):
    response = client.get("/protected", cookies={"access_token": "invalid"})
    assert response.status_code == 307


def test_require_admin_blocks_non_admin(client, db):
    db.add(User(username="regular", full_name="R", is_admin=False, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("regular", False))
    assert response.status_code == 403


def test_require_admin_allows_admin(client, db):
    db.add(User(username="admin", full_name="A", is_admin=True, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("admin", True))
    assert response.status_code == 200
  • Step 2: Run to confirm failure
.venv/bin/pytest tests/test_auth_dependencies.py -v

Expected: FAIL — ModuleNotFoundError: No module named 'app.modules.auth.dependencies'

  • Step 3: Create app/modules/auth/dependencies.py
from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session

from app.core.auth import decode_token, get_token_from_request
from app.core.database import get_db
from app.modules.auth.models import User
from app.modules.auth.service import get_user

from fastapi import Request


class RequiresLoginException(Exception):
    pass


async def get_current_user(
    request: Request, db: Session = Depends(get_db)
) -> User:
    token = get_token_from_request(request)
    if not token:
        raise RequiresLoginException()
    payload = decode_token(token)
    if not payload:
        raise RequiresLoginException()
    user = get_user(db, payload.get("sub", ""))
    if user is None or not user.is_active:
        raise RequiresLoginException()
    return user


async def require_admin(user: User = Depends(get_current_user)) -> User:
    if not user.is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")
    return user
  • Step 4: Fix the test fixture — there is a bug in the test above

In tests/test_auth_dependencies.py, the test_app fixture returns TestClient incorrectly. Replace the fixture with:

@pytest.fixture
def test_app(db):
    app = FastAPI()

    @app.exception_handler(RequiresLoginException)
    async def handle_login_required(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    @app.get("/protected")
    async def protected(user=Depends(get_current_user)):
        return {"username": user.username}

    @app.get("/admin-only")
    async def admin_only(user=Depends(require_admin)):
        return {"username": user.username}

    app.dependency_overrides[get_db] = lambda: db
    return app


@pytest.fixture
def client(test_app):
    return TestClient(test_app, follow_redirects=False)

The full corrected tests/test_auth_dependencies.py:

import pytest
from fastapi import Depends, FastAPI
from fastapi.responses import RedirectResponse
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.modules.auth.dependencies import RequiresLoginException, get_current_user, require_admin
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def test_app(db):
    app = FastAPI()

    @app.exception_handler(RequiresLoginException)
    async def handle_login_required(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    @app.get("/protected")
    async def protected(user=Depends(get_current_user)):
        return {"username": user.username}

    @app.get("/admin-only")
    async def admin_only(user=Depends(require_admin)):
        return {"username": user.username}

    app.dependency_overrides[get_db] = lambda: db
    return app


@pytest.fixture
def client(test_app):
    return TestClient(test_app, follow_redirects=False)


def _token_cookie(username: str, is_admin: bool) -> dict:
    token = create_access_token(username=username, is_admin=is_admin)
    return {"access_token": token}


def test_no_cookie_redirects_to_login(client):
    response = client.get("/protected")
    assert response.status_code == 307
    assert "/auth/login" in response.headers["location"]


def test_valid_token_returns_user(client, db):
    db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/protected", cookies=_token_cookie("bob", False))
    assert response.status_code == 200
    assert response.json()["username"] == "bob"


def test_invalid_token_redirects(client):
    response = client.get("/protected", cookies={"access_token": "invalid"})
    assert response.status_code == 307


def test_require_admin_blocks_non_admin(client, db):
    db.add(User(username="regular", full_name="R", is_admin=False, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("regular", False))
    assert response.status_code == 403


def test_require_admin_allows_admin(client, db):
    db.add(User(username="admin", full_name="A", is_admin=True, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("admin", True))
    assert response.status_code == 200
  • Step 5: Run to confirm pass
.venv/bin/pytest tests/test_auth_dependencies.py -v

Expected: All 5 tests PASS.

  • Step 6: Commit
git add app/modules/auth/dependencies.py tests/test_auth_dependencies.py
git commit -m "feat: add get_current_user and require_admin dependencies"

Task 7: Auth router and login template

Files:

  • Create: app/modules/auth/router.py

  • Create: app/templates/auth/login.html

  • Test: tests/test_auth_router.py

  • Step 1: Write failing test

Create tests/test_auth_router.py:

import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.database import Base, get_db
from app.main import app
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture(autouse=True)
def override_db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    app.dependency_overrides[get_db] = lambda: session
    yield session
    app.dependency_overrides.clear()
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def client():
    return TestClient(app, follow_redirects=False)


@pytest.fixture
def alice(override_db):
    user = User(username="alice", full_name="Alice Smith", pw_hash=hash_password("secret123"))
    override_db.add(user)
    override_db.commit()
    return user


def test_get_login_page(client):
    response = client.get("/auth/login")
    assert response.status_code == 200
    assert "text/html" in response.headers["content-type"]
    assert "Anmelden" in response.text


def test_login_correct_credentials_redirects(client, alice):
    response = client.post("/auth/login", data={"username": "alice", "password": "secret123"})
    assert response.status_code in (302, 303, 307)
    assert "access_token" in response.cookies


def test_login_wrong_password_shows_error(client, alice):
    response = client.post("/auth/login", data={"username": "alice", "password": "wrong"})
    assert response.status_code == 200
    assert "Ungültige" in response.text


def test_login_unknown_user_shows_error(client):
    response = client.post("/auth/login", data={"username": "ghost", "password": "any"})
    assert response.status_code == 200
    assert "Ungültige" in response.text


def test_logout_clears_cookie(client, alice):
    client.post("/auth/login", data={"username": "alice", "password": "secret123"})
    response = client.get("/auth/logout")
    assert response.status_code in (302, 303, 307)
    assert response.cookies.get("access_token", "") == ""
  • Step 2: Run to confirm failure
.venv/bin/pytest tests/test_auth_router.py -v

Expected: FAIL — router not yet registered in app/main.py

  • Step 3: Create app/templates/auth/login.html
mkdir -p app/templates/auth
{% extends "base.html" %}

{% block title %}Anmelden · EFI Hub{% endblock %}

{% block content %}
<div class="min-h-[80vh] flex items-center justify-center px-4">
  <div class="w-full max-w-sm">

    <div class="text-center mb-8">
      <p class="text-xs font-semibold uppercase tracking-widest text-efi mb-1">
        Fakultät EFI · TH Nürnberg
      </p>
      <h1 class="text-xl font-bold text-gray-900">Anmelden</h1>
      <p class="text-sm text-gray-500 mt-1">Bitte melde dich mit deinen Zugangsdaten an.</p>
    </div>

    {% if error %}
    <div class="mb-4 px-4 py-3 rounded-md bg-red-50 border border-red-200 text-sm text-red-700">
      {{ error }}
    </div>
    {% endif %}

    <form method="post" action="/auth/login" class="bg-white rounded-lg border border-gray-200 shadow-sm p-6 space-y-4">
      <div>
        <label class="block text-xs font-semibold text-gray-600 uppercase tracking-wide mb-1" for="username">
          Benutzername
        </label>
        <input
          id="username" name="username" type="text"
          class="w-full border border-gray-300 rounded-md px-3 py-2 text-sm focus:outline-none focus:border-efi focus:ring-1 focus:ring-efi"
          autocomplete="username" autofocus required
        >
      </div>
      <div>
        <label class="block text-xs font-semibold text-gray-600 uppercase tracking-wide mb-1" for="password">
          Passwort
        </label>
        <input
          id="password" name="password" type="password"
          class="w-full border border-gray-300 rounded-md px-3 py-2 text-sm focus:outline-none focus:border-efi focus:ring-1 focus:ring-efi"
          autocomplete="current-password" required
        >
      </div>
      <button
        type="submit"
        class="w-full bg-efi text-white text-sm font-semibold py-2 px-4 rounded-md hover:opacity-90 transition-opacity"
      >
        Anmelden
      </button>
    </form>

  </div>
</div>
{% endblock %}
  • Step 4: Create app/modules/auth/router.py
from fastapi import APIRouter, Depends, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session

from app.core.auth import clear_auth_cookie, create_access_token, set_auth_cookie
from app.core.config import get_settings
from app.core.database import get_db
from app.modules.auth.dependencies import get_current_user
from app.modules.auth.schemas import UserOut
from app.modules.auth.service import authenticate_user

router = APIRouter(prefix="/auth", tags=["auth"])
templates = Jinja2Templates(directory="app/templates")
settings = get_settings()

_NAV: list[dict] = []


@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    return templates.TemplateResponse(
        request, "auth/login.html", {"nav_items": _NAV, "app_version": "0.1.0"}
    )


@router.post("/login", response_class=HTMLResponse)
async def login(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    user = authenticate_user(db, username, password, ldap_enabled=settings.LDAP_ENABLED)
    if user is None:
        return templates.TemplateResponse(
            request,
            "auth/login.html",
            {"nav_items": _NAV, "app_version": "0.1.0", "error": "Ungültige Zugangsdaten."},
            status_code=401,
        )
    token = create_access_token(username=user.username, is_admin=user.is_admin)
    response = RedirectResponse(url="/", status_code=303)
    set_auth_cookie(response, token)
    return response


@router.get("/logout")
async def logout():
    response = RedirectResponse(url="/auth/login", status_code=303)
    clear_auth_cookie(response)
    return response


@router.get("/me", response_model=UserOut)
async def me(user=Depends(get_current_user)):
    return user
  • Step 5: Run to confirm failure (router not wired)
.venv/bin/pytest tests/test_auth_router.py -v

Expected: FAIL — routes return 404 (router not included in main app yet).

  • Step 6: Commit templates and router
git add app/modules/auth/router.py app/templates/auth/login.html tests/test_auth_router.py
git commit -m "feat: add auth router with login/logout/me and login template"

Task 8: Wire auth into main.py and create admin script

Files:

  • Modify: app/main.py

  • Create: scripts/create_admin.py

  • Step 1: Replace app/main.py

from fastapi import Depends, FastAPI, Request, WebSocket
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from app.core.auth import get_token_from_request
from app.core.config import get_settings
from app.core.database import get_db
from app.modules.auth.dependencies import RequiresLoginException, get_current_user
from app.modules.auth.router import router as auth_router

settings = get_settings()

app = FastAPI(
    title="University Process Hub",
    root_path=settings.APP_PREFIX,
)

app.mount("/static", StaticFiles(directory="app/static"), name="static")
templates = Jinja2Templates(directory="app/templates")

app.include_router(auth_router)


@app.exception_handler(RequiresLoginException)
async def requires_login_handler(request: Request, exc: RequiresLoginException):
    return RedirectResponse(url="/auth/login", status_code=307)


MODULES = [
    {
        "icon": "📡",
        "name": "RSS-Feed Server",
        "description": "Aggregiert und verteilt Neuigkeiten der Fakultät als standardkonformen RSS 2.0 Feed.",
        "status": "active",
    },
    {
        "icon": "📅",
        "name": "Kalender",
        "description": "Veranstaltungen, Prüfungstermine und Fristen im iCal-Format.",
        "status": "planned",
    },
    {
        "icon": "🔔",
        "name": "Benachrichtigungen",
        "description": "Push-Nachrichten und WebSocket-basierte Echtzeit-Alerts für Studierende.",
        "status": "planned",
    },
    {
        "icon": "📊",
        "name": "Auslastung",
        "description": "Raum- und Ressourcenauslastung der Fakultät in Echtzeit.",
        "status": "planned",
    },
    {
        "icon": "📚",
        "name": "Lehrveranstaltungen",
        "description": "Stundenplan-API und Kursinformationen aus dem Campus-System.",
        "status": "planned",
    },
]

NAV_ITEMS = [
    {"label": "Übersicht", "url": "/", "active": True},
    {"label": "RSS-Feeds", "url": "/rss", "active": False},
    {"label": "Kalender", "url": "/kalender", "active": False},
    {"label": "Docs", "url": "/docs", "active": False},
]


def _db_mode() -> str:
    url = settings.DATABASE_URL
    return "SQLite (dev)" if url.startswith("sqlite") else "MariaDB (prod)"


@app.get("/", response_class=HTMLResponse)
async def root(request: Request, current_user=Depends(get_current_user)):
    return templates.TemplateResponse(
        request,
        "index.html",
        {
            "nav_items": NAV_ITEMS,
            "modules": MODULES,
            "db_mode": _db_mode(),
            "app_version": "0.1.0",
            "current_user": current_user,
        },
    )


@app.websocket("/ws/hello/{name}")
async def websocket_hello(websocket: WebSocket, name: str):
    await websocket.accept()
    await websocket.send_text(f"Hello, {name}!")
    await websocket.close()
  • Step 2: Update tests/test_landing.py

GET / erfordert jetzt Auth — die alten Tests ohne Cookie schlagen fehl. Ersetze tests/test_landing.py vollständig:

import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.main import app
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture(autouse=True)
def override_db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    app.dependency_overrides[get_db] = lambda: session
    yield session
    app.dependency_overrides.clear()
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def client():
    return TestClient(app, follow_redirects=False)


@pytest.fixture
def auth_cookies(override_db):
    user = User(username="testuser", full_name="Test User", pw_hash=hash_password("pw"))
    override_db.add(user)
    override_db.commit()
    token = create_access_token(username="testuser", is_admin=False)
    return {"access_token": token}


def test_landing_without_auth_redirects(client):
    response = client.get("/")
    assert response.status_code == 307
    assert "/auth/login" in response.headers["location"]


def test_landing_returns_html(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert response.status_code == 200
    assert "text/html" in response.headers["content-type"]


def test_landing_contains_title(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "University Process Hub" in response.text


def test_landing_contains_rss_module(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "RSS-Feed Server" in response.text


def test_landing_navbar_links_present(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "Übersicht" in response.text

def test_landing_info_strip_shows_db_mode(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "SQLite" in response.text or "MariaDB" in response.text
  • Step 3: Run all tests
.venv/bin/pytest -v

Expected: All tests PASS, including test_auth_router.py which now has the router registered.

  • Step 4: Create admin user script
mkdir -p scripts

Create scripts/create_admin.py:

"""
Create or reset the admin user.

Usage:
    .venv/bin/python scripts/create_admin.py <username> <password>

Example:
    .venv/bin/python scripts/create_admin.py admin geheim123
"""
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent))

from app.core.database import SessionLocal, engine, Base
from app.modules.auth.models import User  # noqa: F401 — registers table
from app.modules.auth.service import get_user, hash_password


def create_admin(username: str, password: str) -> None:
    Base.metadata.create_all(bind=engine)
    with SessionLocal() as db:
        user = get_user(db, username)
        if user is None:
            user = User(username=username, full_name="Administrator")
            db.add(user)
        user.pw_hash = hash_password(password)
        user.is_admin = True
        user.is_active = True
        db.commit()
        print(f"Admin user '{username}' created/updated.")


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print(__doc__)
        sys.exit(1)
    create_admin(sys.argv[1], sys.argv[2])
  • Step 5: Create initial admin user
.venv/bin/python scripts/create_admin.py admin admin

Expected: Admin user 'admin' created/updated.

  • Step 6: Start server and verify login
.venv/bin/uvicorn app.main:app --reload

Open http://localhost:8000 — should redirect to http://localhost:8000/auth/login. Log in with admin / admin. Should redirect to landing page.

  • Step 7: Run full test suite one last time
.venv/bin/pytest -v

Expected: All tests PASS.

  • Step 8: Commit
git add app/main.py scripts/
git commit -m "feat: wire auth into main app, protect landing page, add create_admin script"