efihub/docs/superpowers/plans/2026-04-27-auth-part1.md
2026-04-27 09:26:24 +02:00

# Auth Part 1: Core Auth Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Implement local user authentication with bcrypt passwords, JWT in httpOnly cookies, SQLAlchemy User model, Alembic migrations, and FastAPI route protection.
**Architecture:** Single `users` table via SQLAlchemy 2.0 + Alembic. `app/core/database.py` provides the engine and `get_db()` dependency. `app/core/auth.py` handles JWT only (no business logic). Business logic lives in `app/modules/auth/service.py`. Route protection via `get_current_user` dependency that raises `RequiresLoginException` → caught by exception handler in `main.py` → redirect to `/auth/login`. LDAP hook is a `pass` stub in `service.py` until Part 2.
**Tech Stack:** SQLAlchemy 2.0, Alembic, python-jose[cryptography], passlib[bcrypt], python-multipart, pytest
---
### Task 1: Dependencies and config
**Files:**
- Modify: `requirements.txt`
- Modify: `app/core/config.py`
- Modify: `.env.example`
- Test: `tests/test_config.py`
- [ ] **Step 1: Update `requirements.txt`**
```
fastapi
uvicorn[standard]
pydantic-settings
sqlalchemy
jinja2
pytest
httpx
alembic
python-jose[cryptography]
passlib[bcrypt]
python-multipart
ldap3
```
- [ ] **Step 2: Install**
```bash
.venv/bin/pip install -r requirements.txt -q
```
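Expected: no errors. Because of `-q`, pip prints little or nothing on success; to confirm, `.venv/bin/pip list` should include `passlib`, `python-jose`, and `alembic`.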
- [ ] **Step 3: Write failing test**
Create `tests/test_config.py`:
```python
from app.core.config import get_settings


def test_new_config_fields_have_defaults():
    s = get_settings()
    assert hasattr(s, "SECRET_KEY")
    assert hasattr(s, "LDAP_ENABLED")
    assert s.LDAP_ENABLED is False
    assert s.LDAP_SERVER == "gso1.ads1.fh-nuernberg.de"
    assert s.LDAP_DOMAIN == "ADS1"
```
- [ ] **Step 4: Run test to confirm failure**
```bash
.venv/bin/pytest tests/test_config.py -v
```
Expected: FAIL with `AssertionError` on `assert hasattr(s, "SECRET_KEY")`, because `Settings` does not define `SECRET_KEY` yet.
- [ ] **Step 5: Replace `app/core/config.py`**
```python
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    APP_ENV: str = "development"
    APP_PREFIX: str = ""
    DATABASE_URL: str = "sqlite:///./app.db"
    SECRET_KEY: str = "changeme-replace-in-production"
    LDAP_ENABLED: bool = False
    LDAP_SERVER: str = "gso1.ads1.fh-nuernberg.de"
    LDAP_DOMAIN: str = "ADS1"
    LDAP_SEARCH_BASE: str = "OU=users,OU=EFI,OU=Faculties,DC=ADS1,DC=fh-nuernberg,DC=de"


@lru_cache
def get_settings() -> Settings:
    return Settings()
```
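Because `get_settings()` is wrapped in `lru_cache`, the `Settings` object is built once per process, and later changes to the environment are not picked up until the cache is cleared. The following is a minimal stdlib-only sketch of that behavior. `DemoSettings`, `get_demo_settings`, and the `DEMO_APP_ENV` variable are hypothetical stand-ins, not part of the plan:

```python
import os
from functools import lru_cache


class DemoSettings:
    """Hypothetical stand-in for Settings; reads one value from the environment."""

    def __init__(self) -> None:
        self.APP_ENV = os.environ.get("DEMO_APP_ENV", "development")


@lru_cache
def get_demo_settings() -> DemoSettings:
    return DemoSettings()


os.environ.pop("DEMO_APP_ENV", None)
first = get_demo_settings()            # built from the current environment
os.environ["DEMO_APP_ENV"] = "production"
second = get_demo_settings()           # cached: same object, old value
get_demo_settings.cache_clear()
third = get_demo_settings()            # rebuilt: sees the new value
```

A test that needs different settings can likely rely on the same mechanism by calling `get_settings.cache_clear()` after changing the environment.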
- [ ] **Step 6: Update `.env.example`**
```
APP_ENV=development
APP_PREFIX=
DATABASE_URL=sqlite:///./app.db
SECRET_KEY=changeme-replace-in-production
# Production (MariaDB + LDAP):
# APP_ENV=production
# APP_PREFIX=/uph
# DATABASE_URL=mysql+pymysql://user:password@db:3306/efihub
# SECRET_KEY=<python -c "import secrets; print(secrets.token_hex(32))">
# LDAP_ENABLED=true
```
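The commented `SECRET_KEY` line refers to a one-liner built on the standard-library `secrets` module. As a small sketch of what that one-liner produces (the variable name `secret_key` is only illustrative):

```python
import secrets

# 32 random bytes rendered as 64 hexadecimal characters.
secret_key = secrets.token_hex(32)
print(f"SECRET_KEY={secret_key}")
```

The printed line can be pasted into the production `.env`; each run yields a different value.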
- [ ] **Step 7: Run test to confirm pass**
```bash
.venv/bin/pytest tests/test_config.py -v
```
Expected: PASS.
- [ ] **Step 8: Commit**
```bash
git add requirements.txt app/core/config.py .env.example tests/test_config.py
git commit -m "feat: add auth config fields (SECRET_KEY, LDAP_*)"
```
---
### Task 2: Database layer
**Files:**
- Create: `app/core/database.py`
- Test: `tests/test_database.py`
- [ ] **Step 1: Write failing test**
Create `tests/test_database.py`:
```python
from sqlalchemy.orm import Session

from app.core.database import Base, get_db


def test_get_db_yields_session():
    gen = get_db()
    db = next(gen)
    assert isinstance(db, Session)
    try:
        next(gen)
    except StopIteration:
        pass


def test_base_has_metadata():
    assert Base.metadata is not None
```
- [ ] **Step 2: Run to confirm failure**
```bash
.venv/bin/pytest tests/test_database.py -v
```
Expected: FAIL — `ModuleNotFoundError: No module named 'app.core.database'`
- [ ] **Step 3: Create `app/core/database.py`**
```python
from collections.abc import Generator

from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker

from app.core.config import get_settings

settings = get_settings()

# SQLite connections are bound to their creating thread unless this check is disabled.
_connect_args = {"check_same_thread": False} if settings.DATABASE_URL.startswith("sqlite") else {}
engine = create_engine(settings.DATABASE_URL, connect_args=_connect_args)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


class Base(DeclarativeBase):
    pass


def get_db() -> Generator[Session, None, None]:
    # Generator dependency: yields one session and always closes it afterwards.
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
```
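`get_db()` is a generator, so the `finally` block runs only when the generator is resumed or closed after the `yield`. The sketch below illustrates that lifecycle with the standard library alone. `FakeSession` and `get_fake_db` are hypothetical stand-ins for the SQLAlchemy session and the real dependency:

```python
from collections.abc import Iterator


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy Session."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def get_fake_db() -> Iterator[FakeSession]:
    db = FakeSession()
    try:
        yield db
    finally:
        db.close()


gen = get_fake_db()
session = next(gen)                    # runs the generator up to the yield
open_during_use = not session.closed   # still open while the caller holds it
gen.close()                            # raises GeneratorExit at the yield, so finally runs
```

In the application, FastAPI drives the generator itself once the request has been handled, which is why route code never calls `close()` directly.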
- [ ] **Step 4: Run to confirm pass**
```bash
.venv/bin/pytest tests/test_database.py -v
```
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add app/core/database.py tests/test_database.py
git commit -m "feat: add SQLAlchemy database layer with get_db dependency"
```
---
### Task 3: User model and Alembic migrations
**Files:**
- Create: `app/modules/__init__.py`
- Create: `app/modules/auth/__init__.py`
- Create: `app/modules/auth/models.py`
- Create: `alembic.ini`, `alembic/env.py`, `alembic/versions/`
- Test: `tests/test_user_model.py`
- [ ] **Step 1: Create module packages**
```bash
mkdir -p app/modules/auth
touch app/modules/__init__.py app/modules/auth/__init__.py
```
- [ ] **Step 2: Write failing test**
Create `tests/test_user_model.py`:
```python
import pytest
from sqlalchemy import create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker

from app.core.database import Base
from app.modules.auth.models import User


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


def test_create_user(db):
    user = User(username="testuser", full_name="Test User", pw_hash="hashed")
    db.add(user)
    db.commit()
    db.refresh(user)
    assert user.id is not None
    assert user.username == "testuser"
    assert user.is_active is True
    assert user.is_admin is False
    assert user.created_at is not None


def test_user_defaults(db):
    user = User(username="alice", full_name="Alice")
    db.add(user)
    db.commit()
    db.refresh(user)
    assert user.pw_hash is None
    assert user.email is None
    assert user.department is None
    assert user.last_login is None


def test_username_must_be_unique(db):
    db.add(User(username="dup", full_name="First"))
    db.commit()
    db.add(User(username="dup", full_name="Second"))
    # The unique constraint on username is enforced by the database on flush.
    with pytest.raises(IntegrityError):
        db.commit()
```
- [ ] **Step 3: Run to confirm failure**
```bash
.venv/bin/pytest tests/test_user_model.py -v
```
Expected: FAIL — `ModuleNotFoundError: No module named 'app.modules.auth.models'`
- [ ] **Step 4: Create `app/modules/auth/models.py`**
```python
from datetime import datetime

from sqlalchemy import Boolean, DateTime, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.sql import func

from app.core.database import Base


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    username: Mapped[str] = mapped_column(String(64), unique=True, nullable=False, index=True)
    full_name: Mapped[str] = mapped_column(String(128), nullable=False, default="")
    email: Mapped[str | None] = mapped_column(String(128), nullable=True)
    department: Mapped[str | None] = mapped_column(String(64), nullable=True)
    role: Mapped[str | None] = mapped_column(String(64), nullable=True)
    pw_hash: Mapped[str | None] = mapped_column(String(256), nullable=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    is_admin: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
    account_expires: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    last_login: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, server_default=func.now(), nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, server_default=func.now(), onupdate=func.now(), nullable=False
    )
```
- [ ] **Step 5: Run to confirm pass**
```bash
.venv/bin/pytest tests/test_user_model.py -v
```
Expected: 3 tests PASS.
- [ ] **Step 6: Initialize Alembic**
```bash
.venv/bin/alembic init alembic
```
Creates `alembic.ini` and `alembic/` directory.
- [ ] **Step 7: Configure `alembic.ini`**
Find and replace the database URL line:
```ini
# Before:
sqlalchemy.url = driver://user:pass@localhost/dbname
# After:
sqlalchemy.url = sqlite:///./app.db
```
- [ ] **Step 8: Replace `alembic/env.py`**
```python
from logging.config import fileConfig

from sqlalchemy import engine_from_config, pool

from alembic import context
from app.core.database import Base
import app.modules.auth.models  # noqa: F401 (registers User with Base.metadata)

config = context.config
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata


def run_migrations_offline() -> None:
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
```
- [ ] **Step 9: Generate migration**
```bash
.venv/bin/alembic revision --autogenerate -m "create users table"
```
Expected: `Generating alembic/versions/<hash>_create_users_table.py`. Open the file and verify it contains `op.create_table("users", ...)` with all 13 columns.
- [ ] **Step 10: Run migration**
```bash
.venv/bin/alembic upgrade head
```
Expected: `Running upgrade -> <hash>, create users table`
- [ ] **Step 11: Commit**
```bash
git add app/modules/ alembic/ alembic.ini tests/test_user_model.py
git commit -m "feat: add User model and Alembic migration for users table"
```
---
### Task 4: Auth schemas and service (local auth)
**Files:**
- Create: `app/modules/auth/schemas.py`
- Create: `app/modules/auth/service.py`
- Test: `tests/test_auth_service.py`
- [ ] **Step 1: Create `app/modules/auth/schemas.py`**
```python
from datetime import datetime

from pydantic import BaseModel


class UserOut(BaseModel):
    id: int
    username: str
    full_name: str
    email: str | None
    department: str | None
    role: str | None
    is_active: bool
    is_admin: bool
    last_login: datetime | None
    created_at: datetime

    model_config = {"from_attributes": True}
```
- [ ] **Step 2: Write failing test**
Create `tests/test_auth_service.py`:
```python
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.database import Base
from app.modules.auth.models import User
from app.modules.auth.service import authenticate_user, get_user, hash_password, verify_password


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def alice(db):
    user = User(username="alice", full_name="Alice Smith", pw_hash=hash_password("secret123"))
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


def test_hash_and_verify_password():
    hashed = hash_password("mypassword")
    assert verify_password("mypassword", hashed) is True
    assert verify_password("wrong", hashed) is False


def test_get_user_found(db, alice):
    assert get_user(db, "alice") is not None


def test_get_user_not_found(db):
    assert get_user(db, "nobody") is None


def test_authenticate_correct_password(db, alice):
    user = authenticate_user(db, "alice", "secret123", ldap_enabled=False)
    assert user is not None
    assert user.last_login is not None


def test_authenticate_wrong_password(db, alice):
    assert authenticate_user(db, "alice", "wrong", ldap_enabled=False) is None


def test_authenticate_unknown_user(db):
    assert authenticate_user(db, "ghost", "any", ldap_enabled=False) is None


def test_authenticate_inactive_user(db):
    user = User(username="inactive", full_name="X", pw_hash=hash_password("pw"), is_active=False)
    db.add(user)
    db.commit()
    assert authenticate_user(db, "inactive", "pw", ldap_enabled=False) is None


def test_authenticate_no_pw_hash_no_ldap(db):
    user = User(username="ldaponly", full_name="Y", pw_hash=None)
    db.add(user)
    db.commit()
    assert authenticate_user(db, "ldaponly", "any", ldap_enabled=False) is None
```
- [ ] **Step 3: Run to confirm failure**
```bash
.venv/bin/pytest tests/test_auth_service.py -v
```
Expected: FAIL — `ModuleNotFoundError: No module named 'app.modules.auth.service'`
- [ ] **Step 4: Create `app/modules/auth/service.py`**
```python
from datetime import datetime, timezone
from typing import Optional

from passlib.context import CryptContext
from sqlalchemy.orm import Session

from app.modules.auth.models import User

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(plain: str) -> str:
    return pwd_context.hash(plain)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)


def get_user(db: Session, username: str) -> Optional[User]:
    return db.query(User).filter(User.username == username).first()


def authenticate_user(
    db: Session, username: str, password: str, ldap_enabled: bool
) -> Optional[User]:
    user = get_user(db, username)
    # Unknown and deactivated accounts are rejected before any password check.
    if user is None or not user.is_active:
        return None
    local_ok = user.pw_hash is not None and verify_password(password, user.pw_hash)
    if local_ok:
        _touch_last_login(db, user)
        return user
    if ldap_enabled:
        # LDAP auth implemented in Part 2
        pass
    return None


def _touch_last_login(db: Session, user: User) -> None:
    user.last_login = datetime.now(timezone.utc)
    db.commit()
```
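The branch order in `authenticate_user` determines which logins succeed in Part 1. The sketch below mirrors that order in plain Python so the outcomes can be checked without a database. `DemoUser`, `demo_verify`, and `demo_authenticate` are hypothetical names, and `demo_verify` is a plain constant-time comparison used only for illustration, not bcrypt:

```python
import hmac
from dataclasses import dataclass
from typing import Optional


@dataclass
class DemoUser:
    """Hypothetical stand-in for the User model."""
    username: str
    pw_hash: Optional[str]
    is_active: bool = True


def demo_verify(plain: str, stored: str) -> bool:
    # NOT bcrypt: a constant-time comparison standing in for verify_password.
    return hmac.compare_digest(plain.encode(), stored.encode())


def demo_authenticate(
    users: dict[str, DemoUser], username: str, password: str, ldap_enabled: bool
) -> Optional[DemoUser]:
    user = users.get(username)
    if user is None or not user.is_active:
        return None
    if user.pw_hash is not None and demo_verify(password, user.pw_hash):
        return user
    if ldap_enabled:
        pass  # stub, as in Part 1: enabling LDAP does not change the result yet
    return None


users = {
    "alice": DemoUser("alice", pw_hash="secret123"),
    "inactive": DemoUser("inactive", pw_hash="pw", is_active=False),
    "ldaponly": DemoUser("ldaponly", pw_hash=None),
}
ok = demo_authenticate(users, "alice", "secret123", ldap_enabled=False)
ldap_only = demo_authenticate(users, "ldaponly", "any", ldap_enabled=True)
```

Under these assumptions, an account without a local password hash cannot log in until the LDAP branch is implemented, even with `ldap_enabled=True`.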
- [ ] **Step 5: Run to confirm pass**
```bash
.venv/bin/pytest tests/test_auth_service.py -v
```
Expected: All 8 tests PASS.
- [ ] **Step 6: Commit**
```bash
git add app/modules/auth/schemas.py app/modules/auth/service.py tests/test_auth_service.py
git commit -m "feat: add auth service with local bcrypt password authentication"
```
---
### Task 5: JWT and cookie helpers
**Files:**
- Create: `app/core/auth.py`
- Test: `tests/test_core_auth.py`
- [ ] **Step 1: Write failing test**
Create `tests/test_core_auth.py`:
```python
from app.core.auth import COOKIE_NAME, create_access_token, decode_token


def test_create_and_decode_token():
    token = create_access_token(username="alice", is_admin=False)
    payload = decode_token(token)
    assert payload is not None
    assert payload["sub"] == "alice"
    assert payload["is_admin"] is False


def test_admin_claim():
    token = create_access_token(username="admin", is_admin=True)
    assert decode_token(token)["is_admin"] is True


def test_decode_invalid_token():
    assert decode_token("not.a.valid.token") is None


def test_decode_tampered_token():
    token = create_access_token(username="alice", is_admin=False)
    assert decode_token(token[:-4] + "xxxx") is None


def test_cookie_name():
    assert COOKIE_NAME == "access_token"
```
- [ ] **Step 2: Run to confirm failure**
```bash
.venv/bin/pytest tests/test_core_auth.py -v
```
Expected: FAIL — `ModuleNotFoundError: No module named 'app.core.auth'`
- [ ] **Step 3: Create `app/core/auth.py`**
```python
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Request, Response
from jose import JWTError, jwt

from app.core.config import get_settings

settings = get_settings()

COOKIE_NAME = "access_token"
ALGORITHM = "HS256"
TOKEN_EXPIRE_HOURS = 8


def create_access_token(username: str, is_admin: bool) -> str:
    expire = datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRE_HOURS)
    return jwt.encode(
        {"sub": username, "is_admin": is_admin, "exp": expire},
        settings.SECRET_KEY,
        algorithm=ALGORITHM,
    )


def decode_token(token: str) -> Optional[dict]:
    # Returns None for malformed, tampered, or expired tokens.
    try:
        return jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None


def get_token_from_request(request: Request) -> Optional[str]:
    return request.cookies.get(COOKIE_NAME)


def set_auth_cookie(response: Response, token: str) -> None:
    response.set_cookie(
        key=COOKIE_NAME,
        value=token,
        httponly=True,
        samesite="lax",
        secure=settings.APP_ENV == "production",
    )


def clear_auth_cookie(response: Response) -> None:
    response.delete_cookie(key=COOKIE_NAME, httponly=True, samesite="lax")
```
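To see why `decode_token` rejects a tampered token, it can help to look at what an HS256 signature is: an HMAC-SHA256 over the encoded header and payload, keyed with the secret. The following stdlib-only sketch shows that idea. It is not python-jose's implementation, it skips the `exp` check entirely, and `demo_sign` and `demo_verify` are hypothetical names:

```python
import base64
import hashlib
import hmac
import json
from typing import Optional


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: bytes, secret: str) -> str:
    return _b64url(hmac.new(secret.encode(), signing_input, hashlib.sha256).digest())


def demo_sign(payload: dict, secret: str) -> str:
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url(json.dumps(payload).encode())
    return f"{header}.{body}.{_signature(f'{header}.{body}'.encode('ascii'), secret)}"


def demo_verify(token: str, secret: str) -> Optional[dict]:
    try:
        header, body, signature = token.split(".")
        signing_input = f"{header}.{body}".encode("ascii")
    except ValueError:
        return None  # wrong number of segments or non-ASCII input
    expected = _signature(signing_input, secret)
    if not hmac.compare_digest(expected.encode(), signature.encode()):
        return None  # signature does not match header + payload
    return json.loads(_b64url_decode(body))


token = demo_sign({"sub": "alice", "is_admin": False}, "demo-secret")
payload = demo_verify(token, "demo-secret")
```

Changing any character of the token, or verifying with a different secret, makes the recomputed signature differ, so verification returns `None`.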
- [ ] **Step 4: Run to confirm pass**
```bash
.venv/bin/pytest tests/test_core_auth.py -v
```
Expected: All 5 tests PASS.
- [ ] **Step 5: Commit**
```bash
git add app/core/auth.py tests/test_core_auth.py
git commit -m "feat: add JWT creation, decoding and cookie helpers"
```
---
### Task 6: FastAPI auth dependencies
**Files:**
- Create: `app/modules/auth/dependencies.py`
- Test: `tests/test_auth_dependencies.py`
- [ ] **Step 1: Write failing test**
Create `tests/test_auth_dependencies.py`:
```python
import pytest
from fastapi import Depends, FastAPI
from fastapi.responses import RedirectResponse
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.modules.auth.dependencies import RequiresLoginException, get_current_user, require_admin
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def test_app(db):
    app = FastAPI()

    @app.exception_handler(RequiresLoginException)
    async def handle_login_required(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    @app.get("/protected")
    async def protected(user=Depends(get_current_user)):
        return {"username": user.username}

    @app.get("/admin-only")
    async def admin_only(user=Depends(require_admin)):
        return {"username": user.username}

    app.dependency_overrides[get_db] = lambda: db
    return TestClient(test_app, follow_redirects=False)  # noqa (replaced in Step 4)


@pytest.fixture
def client(test_app):
    return TestClient(test_app, follow_redirects=False)


def _token_cookie(username: str, is_admin: bool) -> dict:
    token = create_access_token(username=username, is_admin=is_admin)
    return {"access_token": token}


def test_no_cookie_redirects_to_login(client):
    response = client.get("/protected")
    assert response.status_code == 307
    assert "/auth/login" in response.headers["location"]


def test_valid_token_returns_user(client, db):
    db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/protected", cookies=_token_cookie("bob", False))
    assert response.status_code == 200
    assert response.json()["username"] == "bob"


def test_invalid_token_redirects(client):
    response = client.get("/protected", cookies={"access_token": "invalid"})
    assert response.status_code == 307


def test_require_admin_blocks_non_admin(client, db):
    db.add(User(username="regular", full_name="R", is_admin=False, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("regular", False))
    assert response.status_code == 403


def test_require_admin_allows_admin(client, db):
    db.add(User(username="admin", full_name="A", is_admin=True, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("admin", True))
    assert response.status_code == 200
- [ ] **Step 2: Run to confirm failure**
```bash
.venv/bin/pytest tests/test_auth_dependencies.py -v
```
Expected: FAIL — `ModuleNotFoundError: No module named 'app.modules.auth.dependencies'`
- [ ] **Step 3: Create `app/modules/auth/dependencies.py`**
```python
from fastapi import Depends, HTTPException, Request
from sqlalchemy.orm import Session

from app.core.auth import decode_token, get_token_from_request
from app.core.database import get_db
from app.modules.auth.models import User
from app.modules.auth.service import get_user


class RequiresLoginException(Exception):
    pass


async def get_current_user(
    request: Request, db: Session = Depends(get_db)
) -> User:
    # Missing cookie, invalid token, and unknown or inactive user all lead to the login redirect.
    token = get_token_from_request(request)
    if not token:
        raise RequiresLoginException()
    payload = decode_token(token)
    if not payload:
        raise RequiresLoginException()
    user = get_user(db, payload.get("sub", ""))
    if user is None or not user.is_active:
        raise RequiresLoginException()
    return user


async def require_admin(user: User = Depends(get_current_user)) -> User:
    # The admin flag is read from the database row, not from the token claim.
    if not user.is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")
    return user
```
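- [ ] **Step 4: Fix the test fixture (the Step 1 version contains a bug)**
In `tests/test_auth_dependencies.py`, the `test_app` fixture ends with `return TestClient(test_app, follow_redirects=False)`, which passes the fixture function itself instead of the FastAPI instance. It must return `app`; the separate `client` fixture then wraps it in a `TestClient`. Replace the fixture with: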
```python
@pytest.fixture
def test_app(db):
    app = FastAPI()

    @app.exception_handler(RequiresLoginException)
    async def handle_login_required(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    @app.get("/protected")
    async def protected(user=Depends(get_current_user)):
        return {"username": user.username}

    @app.get("/admin-only")
    async def admin_only(user=Depends(require_admin)):
        return {"username": user.username}

    app.dependency_overrides[get_db] = lambda: db
    return app


@pytest.fixture
def client(test_app):
    return TestClient(test_app, follow_redirects=False)
```
The full corrected `tests/test_auth_dependencies.py`:
```python
import pytest
from fastapi import Depends, FastAPI
from fastapi.responses import RedirectResponse
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.modules.auth.dependencies import RequiresLoginException, get_current_user, require_admin
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def test_app(db):
    app = FastAPI()

    @app.exception_handler(RequiresLoginException)
    async def handle_login_required(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    @app.get("/protected")
    async def protected(user=Depends(get_current_user)):
        return {"username": user.username}

    @app.get("/admin-only")
    async def admin_only(user=Depends(require_admin)):
        return {"username": user.username}

    app.dependency_overrides[get_db] = lambda: db
    return app


@pytest.fixture
def client(test_app):
    return TestClient(test_app, follow_redirects=False)


def _token_cookie(username: str, is_admin: bool) -> dict:
    token = create_access_token(username=username, is_admin=is_admin)
    return {"access_token": token}


def test_no_cookie_redirects_to_login(client):
    response = client.get("/protected")
    assert response.status_code == 307
    assert "/auth/login" in response.headers["location"]


def test_valid_token_returns_user(client, db):
    db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/protected", cookies=_token_cookie("bob", False))
    assert response.status_code == 200
    assert response.json()["username"] == "bob"


def test_invalid_token_redirects(client):
    response = client.get("/protected", cookies={"access_token": "invalid"})
    assert response.status_code == 307


def test_require_admin_blocks_non_admin(client, db):
    db.add(User(username="regular", full_name="R", is_admin=False, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("regular", False))
    assert response.status_code == 403


def test_require_admin_allows_admin(client, db):
    db.add(User(username="admin", full_name="A", is_admin=True, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("admin", True))
    assert response.status_code == 200
```
- [ ] **Step 5: Run to confirm pass**
```bash
.venv/bin/pytest tests/test_auth_dependencies.py -v
```
Expected: All 5 tests PASS.
- [ ] **Step 6: Commit**
```bash
git add app/modules/auth/dependencies.py tests/test_auth_dependencies.py
git commit -m "feat: add get_current_user and require_admin dependencies"
```
---
### Task 7: Auth router and login template
**Files:**
- Create: `app/modules/auth/router.py`
- Create: `app/templates/auth/login.html`
- Test: `tests/test_auth_router.py`
- [ ] **Step 1: Write failing test**
Create `tests/test_auth_router.py`:
```python
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.database import Base, get_db
from app.main import app
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture(autouse=True)
def override_db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    app.dependency_overrides[get_db] = lambda: session
    yield session
    app.dependency_overrides.clear()
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def client():
    return TestClient(app, follow_redirects=False)


@pytest.fixture
def alice(override_db):
    user = User(username="alice", full_name="Alice Smith", pw_hash=hash_password("secret123"))
    override_db.add(user)
    override_db.commit()
    return user


def test_get_login_page(client):
    response = client.get("/auth/login")
    assert response.status_code == 200
    assert "text/html" in response.headers["content-type"]
    assert "Anmelden" in response.text


def test_login_correct_credentials_redirects(client, alice):
    response = client.post("/auth/login", data={"username": "alice", "password": "secret123"})
    assert response.status_code in (302, 303, 307)
    assert "access_token" in response.cookies


def test_login_wrong_password_shows_error(client, alice):
    response = client.post("/auth/login", data={"username": "alice", "password": "wrong"})
    # The router re-renders the login form with status 401 on failed logins.
    assert response.status_code == 401
    assert "Ungültige" in response.text


def test_login_unknown_user_shows_error(client):
    response = client.post("/auth/login", data={"username": "ghost", "password": "any"})
    assert response.status_code == 401
    assert "Ungültige" in response.text


def test_logout_clears_cookie(client, alice):
    client.post("/auth/login", data={"username": "alice", "password": "secret123"})
    response = client.get("/auth/logout")
    assert response.status_code in (302, 303, 307)
    assert response.cookies.get("access_token", "") == ""
```
- [ ] **Step 2: Run to confirm failure**
```bash
.venv/bin/pytest tests/test_auth_router.py -v
```
Expected: FAIL — router not yet registered in `app/main.py`
- [ ] **Step 3: Create `app/templates/auth/login.html`**
```bash
mkdir -p app/templates/auth
```
```html
{% extends "base.html" %}
{% block title %}Anmelden · EFI Hub{% endblock %}
{% block content %}
<div class="min-h-[80vh] flex items-center justify-center px-4">
<div class="w-full max-w-sm">
<div class="text-center mb-8">
<p class="text-xs font-semibold uppercase tracking-widest text-efi mb-1">
Fakultät EFI · TH Nürnberg
</p>
<h1 class="text-xl font-bold text-gray-900">Anmelden</h1>
<p class="text-sm text-gray-500 mt-1">Bitte melde dich mit deinen Zugangsdaten an.</p>
</div>
{% if error %}
<div class="mb-4 px-4 py-3 rounded-md bg-red-50 border border-red-200 text-sm text-red-700">
{{ error }}
</div>
{% endif %}
<form method="post" action="/auth/login" class="bg-white rounded-lg border border-gray-200 shadow-sm p-6 space-y-4">
<div>
<label class="block text-xs font-semibold text-gray-600 uppercase tracking-wide mb-1" for="username">
Benutzername
</label>
<input
id="username" name="username" type="text"
class="w-full border border-gray-300 rounded-md px-3 py-2 text-sm focus:outline-none focus:border-efi focus:ring-1 focus:ring-efi"
autocomplete="username" autofocus required
>
</div>
<div>
<label class="block text-xs font-semibold text-gray-600 uppercase tracking-wide mb-1" for="password">
Passwort
</label>
<input
id="password" name="password" type="password"
class="w-full border border-gray-300 rounded-md px-3 py-2 text-sm focus:outline-none focus:border-efi focus:ring-1 focus:ring-efi"
autocomplete="current-password" required
>
</div>
<button
type="submit"
class="w-full bg-efi text-white text-sm font-semibold py-2 px-4 rounded-md hover:opacity-90 transition-opacity"
>
Anmelden
</button>
</form>
</div>
</div>
{% endblock %}
```
- [ ] **Step 4: Create `app/modules/auth/router.py`**
```python
from fastapi import APIRouter, Depends, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session

from app.core.auth import clear_auth_cookie, create_access_token, set_auth_cookie
from app.core.config import get_settings
from app.core.database import get_db
from app.modules.auth.dependencies import get_current_user
from app.modules.auth.schemas import UserOut
from app.modules.auth.service import authenticate_user

router = APIRouter(prefix="/auth", tags=["auth"])
templates = Jinja2Templates(directory="app/templates")
settings = get_settings()

_NAV: list[dict] = []


@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    return templates.TemplateResponse(
        request, "auth/login.html", {"nav_items": _NAV, "app_version": "0.1.0"}
    )


@router.post("/login", response_class=HTMLResponse)
async def login(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    user = authenticate_user(db, username, password, ldap_enabled=settings.LDAP_ENABLED)
    if user is None:
        # Failed login: re-render the form with an error message and status 401.
        return templates.TemplateResponse(
            request,
            "auth/login.html",
            {"nav_items": _NAV, "app_version": "0.1.0", "error": "Ungültige Zugangsdaten."},
            status_code=401,
        )
    token = create_access_token(username=user.username, is_admin=user.is_admin)
    response = RedirectResponse(url="/", status_code=303)
    set_auth_cookie(response, token)
    return response


@router.get("/logout")
async def logout():
    response = RedirectResponse(url="/auth/login", status_code=303)
    clear_auth_cookie(response)
    return response


@router.get("/me", response_model=UserOut)
async def me(user=Depends(get_current_user)):
    return user
```
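On a successful login the redirect response carries the token in a `Set-Cookie` header. As a rough illustration of the attributes that `set_auth_cookie` requests, the sketch below builds a comparable header with the standard-library `http.cookies` module. `demo_auth_cookie_header` is a hypothetical helper, and `Path=/` is an assumption about the framework default rather than something the plan sets explicitly:

```python
from http.cookies import SimpleCookie


def demo_auth_cookie_header(token: str, production: bool) -> str:
    cookie = SimpleCookie()
    cookie["access_token"] = token
    morsel = cookie["access_token"]
    morsel["path"] = "/"          # assumed default path
    morsel["httponly"] = True     # hidden from JavaScript (document.cookie)
    morsel["samesite"] = "lax"    # not sent on most cross-site subrequests
    if production:
        morsel["secure"] = True   # only sent over HTTPS
    return morsel.OutputString()


dev_header = demo_auth_cookie_header("abc.def.ghi", production=False)
prod_header = demo_auth_cookie_header("abc.def.ghi", production=True)
```

The `Secure` attribute appears only in the production variant, which matches `secure=settings.APP_ENV == "production"` and keeps local development over plain HTTP working.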
- [ ] **Step 5: Run to confirm failure (router not wired)**
```bash
.venv/bin/pytest tests/test_auth_router.py -v
```
Expected: FAIL — routes return 404 (router not included in main app yet).
- [ ] **Step 6: Commit templates and router**
```bash
git add app/modules/auth/router.py app/templates/auth/login.html tests/test_auth_router.py
git commit -m "feat: add auth router with login/logout/me and login template"
```
---
### Task 8: Wire auth into main.py and create admin script
**Files:**
- Modify: `app/main.py`
- Create: `scripts/create_admin.py`
- [ ] **Step 1: Replace `app/main.py`**
```python
from fastapi import Depends, FastAPI, Request, WebSocket
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from app.core.auth import get_token_from_request
from app.core.config import get_settings
from app.core.database import get_db
from app.modules.auth.dependencies import RequiresLoginException, get_current_user
from app.modules.auth.router import router as auth_router

settings = get_settings()

app = FastAPI(
    title="University Process Hub",
    root_path=settings.APP_PREFIX,
)
app.mount("/static", StaticFiles(directory="app/static"), name="static")
templates = Jinja2Templates(directory="app/templates")
app.include_router(auth_router)


@app.exception_handler(RequiresLoginException)
async def requires_login_handler(request: Request, exc: RequiresLoginException):
    return RedirectResponse(url="/auth/login", status_code=307)


MODULES = [
    {
        "icon": "📡",
        "name": "RSS-Feed Server",
        "description": "Aggregiert und verteilt Neuigkeiten der Fakultät als standardkonformen RSS 2.0 Feed.",
        "status": "active",
    },
    {
        "icon": "📅",
        "name": "Kalender",
        "description": "Veranstaltungen, Prüfungstermine und Fristen im iCal-Format.",
        "status": "planned",
    },
    {
        "icon": "🔔",
        "name": "Benachrichtigungen",
        "description": "Push-Nachrichten und WebSocket-basierte Echtzeit-Alerts für Studierende.",
        "status": "planned",
    },
    {
        "icon": "📊",
        "name": "Auslastung",
        "description": "Raum- und Ressourcenauslastung der Fakultät in Echtzeit.",
        "status": "planned",
    },
    {
        "icon": "📚",
        "name": "Lehrveranstaltungen",
        "description": "Stundenplan-API und Kursinformationen aus dem Campus-System.",
        "status": "planned",
    },
]

NAV_ITEMS = [
    {"label": "Übersicht", "url": "/", "active": True},
    {"label": "RSS-Feeds", "url": "/rss", "active": False},
    {"label": "Kalender", "url": "/kalender", "active": False},
    {"label": "Docs", "url": "/docs", "active": False},
]


def _db_mode() -> str:
    url = settings.DATABASE_URL
    return "SQLite (dev)" if url.startswith("sqlite") else "MariaDB (prod)"


@app.get("/", response_class=HTMLResponse)
async def root(request: Request, current_user=Depends(get_current_user)):
    return templates.TemplateResponse(
        request,
        "index.html",
        {
            "nav_items": NAV_ITEMS,
            "modules": MODULES,
            "db_mode": _db_mode(),
            "app_version": "0.1.0",
            "current_user": current_user,
        },
    )


@app.websocket("/ws/hello/{name}")
async def websocket_hello(websocket: WebSocket, name: str):
await websocket.accept()
await websocket.send_text(f"Hello, {name}!")
await websocket.close()
```
- [ ] **Step 2: Update `tests/test_landing.py`**
`GET /` now requires authentication, so the old tests that send no cookie will fail. Replace `tests/test_landing.py` completely:
```python
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.main import app
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture(autouse=True)
def override_db():
    # StaticPool keeps one shared connection, so the in-memory database created
    # here is the same one the app sees when TestClient runs it in another thread.
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    app.dependency_overrides[get_db] = lambda: session
    yield session
    app.dependency_overrides.clear()
    session.close()
    Base.metadata.drop_all(bind=engine)
@pytest.fixture
def client():
    return TestClient(app, follow_redirects=False)


@pytest.fixture
def auth_cookies(override_db):
    user = User(username="testuser", full_name="Test User", pw_hash=hash_password("pw"))
    override_db.add(user)
    override_db.commit()
    token = create_access_token(username="testuser", is_admin=False)
    return {"access_token": token}


def test_landing_without_auth_redirects(client):
    response = client.get("/")
    assert response.status_code == 307
    assert "/auth/login" in response.headers["location"]


def test_landing_returns_html(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert response.status_code == 200
    assert "text/html" in response.headers["content-type"]


def test_landing_contains_title(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "University Process Hub" in response.text


def test_landing_contains_rss_module(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "RSS-Feed Server" in response.text


def test_landing_navbar_links_present(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "Übersicht" in response.text


def test_landing_info_strip_shows_db_mode(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "SQLite" in response.text or "MariaDB" in response.text
```
- [ ] **Step 3: Run all tests**
```bash
.venv/bin/pytest -v
```
Expected: All tests PASS, including `test_auth_router.py` which now has the router registered.
- [ ] **Step 4: Create admin user script**
```bash
mkdir -p scripts
```
Create `scripts/create_admin.py`:
```python
"""
Create or reset the admin user.

Usage:
    .venv/bin/python scripts/create_admin.py <username> <password>

Example:
    .venv/bin/python scripts/create_admin.py admin geheim123
"""
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent))

from app.core.database import SessionLocal, engine, Base
from app.modules.auth.models import User  # noqa: F401 (import registers the table)
from app.modules.auth.service import get_user, hash_password


def create_admin(username: str, password: str) -> None:
    Base.metadata.create_all(bind=engine)
    with SessionLocal() as db:
        user = get_user(db, username)
        if user is None:
            user = User(username=username, full_name="Administrator")
            db.add(user)
        user.pw_hash = hash_password(password)
        user.is_admin = True
        user.is_active = True
        db.commit()
    print(f"Admin user '{username}' created/updated.")


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print(__doc__)
        sys.exit(1)
    create_admin(sys.argv[1], sys.argv[2])
```
- [ ] **Step 5: Create initial admin user**
```bash
.venv/bin/python scripts/create_admin.py admin admin
```
Expected: `Admin user 'admin' created/updated.`
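Passing the password as a command-line argument leaves it in the shell history. One possible variation, sketched here under the assumption that the script is allowed to accept an optional password argument, is to prompt for it with the standard library's `getpass`. The helper name `read_credentials` and its `prompt` parameter are hypothetical and not part of this plan:

```python
import getpass
from typing import Callable, Sequence


def read_credentials(
    argv: Sequence[str],
    prompt: Callable[[str], str] = getpass.getpass,
) -> tuple[str, str]:
    """Return (username, password), prompting when no password is given.

    Hypothetical helper: argv mirrors sys.argv, and prompt is injectable
    so the function can be exercised without a terminal.
    """
    if len(argv) == 3:
        # Same calling convention as the script above.
        return argv[1], argv[2]
    if len(argv) == 2:
        password = prompt(f"Password for {argv[1]}: ")
        if not password:
            raise ValueError("empty password")
        return argv[1], password
    raise ValueError("usage: create_admin.py <username> [password]")
```

With such a helper, the `__main__` block could call `create_admin(*read_credentials(sys.argv))` instead of reading `sys.argv` directly.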
- [ ] **Step 6: Start server and verify login**
```bash
.venv/bin/uvicorn app.main:app --reload
```
Open `http://localhost:8000`. The browser should be redirected to `http://localhost:8000/auth/login`.
Log in with `admin` / `admin`. After a successful login you should be redirected to the landing page.
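The redirect can also be checked without a browser. The following is a minimal sketch using only the standard library; `is_login_redirect` and `fetch_root` are hypothetical helper names, and the default base URL assumes the server from this step is running on port 8000:

```python
import http.client
from urllib.parse import urlsplit


def is_login_redirect(status: int, location: str) -> bool:
    """True if the response is a 307 whose Location points at /auth/login."""
    return status == 307 and urlsplit(location).path.endswith("/auth/login")


def fetch_root(base_url: str = "http://localhost:8000") -> tuple[int, str]:
    """GET / without following redirects; return (status, Location header)."""
    parts = urlsplit(base_url)
    conn = http.client.HTTPConnection(parts.hostname, parts.port or 80, timeout=5)
    try:
        conn.request("GET", "/")
        resp = conn.getresponse()
        return resp.status, resp.getheader("Location", "")
    finally:
        conn.close()


# Usage, with the dev server running:
#   status, location = fetch_root()
#   print(is_login_redirect(status, location))
```

`http.client` does not follow redirects on its own, which is why the 307 status and the `Location` header remain visible to the check.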
- [ ] **Step 7: Run full test suite one last time**
```bash
.venv/bin/pytest -v
```
Expected: All tests PASS.
- [ ] **Step 8: Commit**
```bash
git add app/main.py scripts/ tests/test_landing.py
git commit -m "feat: wire auth into main app, protect landing page, add create_admin script"
```