# Auth Part 1: Core Auth Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Implement local user authentication with bcrypt passwords, JWT in httpOnly cookies, SQLAlchemy User model, Alembic migrations, and FastAPI route protection.

**Architecture:** Single `users` table via SQLAlchemy 2.0 + Alembic. `app/core/database.py` provides the engine and `get_db()` dependency. `app/core/auth.py` handles JWT only (no business logic). Business logic lives in `app/modules/auth/service.py`. Route protection via `get_current_user` dependency that raises `RequiresLoginException` → caught by exception handler in `main.py` → redirect to `/auth/login`. LDAP hook is a `pass` stub in `service.py` until Part 2.

**Tech Stack:** SQLAlchemy 2.0, Alembic, python-jose[cryptography], passlib[bcrypt], python-multipart, pytest

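
The route-protection flow named under Architecture can be sketched without any framework. This is only an illustration of the control flow: `USERS` and `handle_request` are hypothetical stand-ins (for the `users` table and for a route plus the exception handler in `main.py`), the cookie value is treated as a plain username rather than a signed JWT, and the 307 status is borrowed from the test app in Task 6, so the real handler may choose a different redirect code.

```python
class RequiresLoginException(Exception):
    """Raised when a request carries no usable login."""


# Hypothetical stand-in for the users table.
USERS = {"alice": {"username": "alice", "is_active": True}}


def get_current_user(cookies: dict) -> dict:
    # In the plan the cookie holds a signed JWT; here it is just a username.
    username = cookies.get("access_token")
    user = USERS.get(username)
    # A missing cookie, an unknown user and an inactive user are treated alike.
    if user is None or not user["is_active"]:
        raise RequiresLoginException()
    return user


def handle_request(cookies: dict) -> tuple[int, str]:
    """Plays the role of a protected route plus the exception handler."""
    try:
        user = get_current_user(cookies)
    except RequiresLoginException:
        return 307, "/auth/login"  # redirect instead of an error page
    return 200, user["username"]
```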
---

### Task 1: Dependencies and config

**Files:**
- Modify: `requirements.txt`
- Modify: `app/core/config.py`
- Modify: `.env.example`
- Test: `tests/test_config.py`

- [ ] **Step 1: Update `requirements.txt`**

```
fastapi
uvicorn[standard]
pydantic-settings
sqlalchemy
jinja2
pytest
httpx
alembic
python-jose[cryptography]
passlib[bcrypt]
python-multipart
ldap3
```

- [ ] **Step 2: Install**

```bash
.venv/bin/pip install -r requirements.txt -q
```

Expected: no errors. Because `-q` suppresses most of pip's output, confirm with `.venv/bin/pip list` that `passlib`, `python-jose`, and `alembic` are installed.

- [ ] **Step 3: Write failing test**

Create `tests/test_config.py`:

```python
from app.core.config import get_settings


def test_new_config_fields_have_defaults():
    s = get_settings()
    assert hasattr(s, "SECRET_KEY")
    assert hasattr(s, "LDAP_ENABLED")
    assert s.LDAP_ENABLED is False
    assert s.LDAP_SERVER == "gso1.ads1.fh-nuernberg.de"
    assert s.LDAP_DOMAIN == "ADS1"
```

- [ ] **Step 4: Run test to confirm failure**

```bash
.venv/bin/pytest tests/test_config.py -v
```

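Expected: FAIL with an `AssertionError` on `assert hasattr(s, "SECRET_KEY")`, because `Settings` does not define `SECRET_KEY` yet. (`hasattr` returns `False` for a missing attribute; it does not raise `AttributeError`.)
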
- [ ] **Step 5: Replace `app/core/config.py`**

```python
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    APP_ENV: str = "development"
    APP_PREFIX: str = ""
    DATABASE_URL: str = "sqlite:///./app.db"
    SECRET_KEY: str = "changeme-replace-in-production"
    LDAP_ENABLED: bool = False
    LDAP_SERVER: str = "gso1.ads1.fh-nuernberg.de"
    LDAP_DOMAIN: str = "ADS1"
    LDAP_SEARCH_BASE: str = "OU=users,OU=EFI,OU=Faculties,DC=ADS1,DC=fh-nuernberg,DC=de"


@lru_cache
def get_settings() -> Settings:
    return Settings()
```

- [ ] **Step 6: Update `.env.example`**

```
APP_ENV=development
APP_PREFIX=
DATABASE_URL=sqlite:///./app.db
SECRET_KEY=changeme-replace-in-production

# Production (MariaDB + LDAP):
# APP_ENV=production
# APP_PREFIX=/uph
# DATABASE_URL=mysql+pymysql://user:password@db:3306/efihub
# SECRET_KEY=<python -c "import secrets; print(secrets.token_hex(32))">
# LDAP_ENABLED=true
```

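
The `SECRET_KEY` placeholder in `.env.example` points at `secrets.token_hex(32)`. The sketch below shows what that call produces and one possible startup guard against shipping the default key. `check_secret_key` is a hypothetical helper, not part of this plan; only the default string and the `APP_ENV` value are taken from the config above.

```python
import secrets

# Default from app/core/config.py; safe only for local development.
DEFAULT_SECRET = "changeme-replace-in-production"


def generate_secret_key() -> str:
    # 32 random bytes rendered as 64 hexadecimal characters.
    return secrets.token_hex(32)


def check_secret_key(app_env: str, secret_key: str) -> None:
    """Hypothetical guard: refuse to run production with the default key."""
    if app_env == "production" and secret_key == DEFAULT_SECRET:
        raise RuntimeError("SECRET_KEY must be replaced in production")
```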
- [ ] **Step 7: Run test to confirm pass**

```bash
.venv/bin/pytest tests/test_config.py -v
```

Expected: PASS.

- [ ] **Step 8: Commit**

```bash
git add requirements.txt app/core/config.py .env.example tests/test_config.py
git commit -m "feat: add auth config fields (SECRET_KEY, LDAP_*)"
```

---

### Task 2: Database layer

**Files:**
- Create: `app/core/database.py`
- Test: `tests/test_database.py`

- [ ] **Step 1: Write failing test**

Create `tests/test_database.py`:

```python
from sqlalchemy.orm import Session
from app.core.database import Base, get_db


def test_get_db_yields_session():
    gen = get_db()
    db = next(gen)
    assert isinstance(db, Session)
    try:
        next(gen)
    except StopIteration:
        pass


def test_base_has_metadata():
    assert Base.metadata is not None
```

- [ ] **Step 2: Run to confirm failure**

```bash
.venv/bin/pytest tests/test_database.py -v
```

Expected: FAIL with `ModuleNotFoundError: No module named 'app.core.database'`

- [ ] **Step 3: Create `app/core/database.py`**

```python
from collections.abc import Iterator

from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker

from app.core.config import get_settings

settings = get_settings()

# check_same_thread=False lets a SQLite connection be used from threads
# other than the one that created it; other backends need no extra args.
_connect_args = {"check_same_thread": False} if settings.DATABASE_URL.startswith("sqlite") else {}
engine = create_engine(settings.DATABASE_URL, connect_args=_connect_args)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


class Base(DeclarativeBase):
    pass


def get_db() -> Iterator[Session]:
    """Yield one session and close it when the caller is done with it."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
```

- [ ] **Step 4: Run to confirm pass**

```bash
.venv/bin/pytest tests/test_database.py -v
```

Expected: PASS.

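
The Step 1 test calls `next(gen)` a second time to drive the generator past its `yield`, which is what runs the `finally` block and closes the session. A minimal sketch of that behaviour, using a hypothetical `FakeSession` in place of a SQLAlchemy `Session` so it runs without a database:

```python
from collections.abc import Iterator


class FakeSession:
    """Hypothetical stand-in that only records whether close() was called."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def get_db() -> Iterator[FakeSession]:
    db = FakeSession()
    try:
        yield db
    finally:
        db.close()


def demo() -> tuple[bool, bool]:
    gen = get_db()
    db = next(gen)  # runs up to the yield; the session is open
    closed_while_in_use = db.closed
    try:
        next(gen)  # resumes after the yield, runs finally, then stops
    except StopIteration:
        pass
    return closed_while_in_use, db.closed
```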
- [ ] **Step 5: Commit**

```bash
git add app/core/database.py tests/test_database.py
git commit -m "feat: add SQLAlchemy database layer with get_db dependency"
```

---

### Task 3: User model and Alembic migrations

**Files:**
- Create: `app/modules/__init__.py`
- Create: `app/modules/auth/__init__.py`
- Create: `app/modules/auth/models.py`
- Create: `alembic.ini`, `alembic/env.py`, `alembic/versions/`
- Test: `tests/test_user_model.py`

- [ ] **Step 1: Create module packages**

```bash
mkdir -p app/modules/auth
touch app/modules/__init__.py app/modules/auth/__init__.py
```

- [ ] **Step 2: Write failing test**

Create `tests/test_user_model.py`:

```python
import pytest
from sqlalchemy import create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker

from app.core.database import Base
from app.modules.auth.models import User


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


def test_create_user(db):
    user = User(username="testuser", full_name="Test User", pw_hash="hashed")
    db.add(user)
    db.commit()
    db.refresh(user)
    assert user.id is not None
    assert user.username == "testuser"
    assert user.is_active is True
    assert user.is_admin is False
    assert user.created_at is not None


def test_user_defaults(db):
    user = User(username="alice", full_name="Alice")
    db.add(user)
    db.commit()
    db.refresh(user)
    assert user.pw_hash is None
    assert user.email is None
    assert user.department is None
    assert user.last_login is None


def test_username_must_be_unique(db):
    db.add(User(username="dup", full_name="First"))
    db.commit()
    db.add(User(username="dup", full_name="Second"))
    # The unique constraint on username surfaces as an IntegrityError on commit.
    with pytest.raises(IntegrityError):
        db.commit()
```

- [ ] **Step 3: Run to confirm failure**

```bash
.venv/bin/pytest tests/test_user_model.py -v
```

Expected: FAIL with `ModuleNotFoundError: No module named 'app.modules.auth.models'`

- [ ] **Step 4: Create `app/modules/auth/models.py`**

```python
from datetime import datetime

from sqlalchemy import Boolean, DateTime, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.sql import func

from app.core.database import Base


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    username: Mapped[str] = mapped_column(String(64), unique=True, nullable=False, index=True)
    full_name: Mapped[str] = mapped_column(String(128), nullable=False, default="")
    email: Mapped[str | None] = mapped_column(String(128), nullable=True)
    department: Mapped[str | None] = mapped_column(String(64), nullable=True)
    role: Mapped[str | None] = mapped_column(String(64), nullable=True)
    pw_hash: Mapped[str | None] = mapped_column(String(256), nullable=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    is_admin: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
    account_expires: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    last_login: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, server_default=func.now(), nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, server_default=func.now(), onupdate=func.now(), nullable=False
    )
```

- [ ] **Step 5: Run to confirm pass**

```bash
.venv/bin/pytest tests/test_user_model.py -v
```

Expected: 3 tests PASS.

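
Two behaviours the model tests rely on come from the database rather than from Python: the unique constraint on `username` and the server-side default for `created_at`. The sketch below reproduces both with the standard-library `sqlite3` module. The table is a hand-written, reduced approximation of the `users` table, not the DDL that SQLAlchemy or Alembic would emit.

```python
import sqlite3


def demo() -> tuple[bool, bool]:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE users ("
        " id INTEGER PRIMARY KEY,"
        " username VARCHAR(64) NOT NULL UNIQUE,"
        " created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP)"
    )
    # created_at is omitted on purpose: the database fills it in.
    conn.execute("INSERT INTO users (username) VALUES ('dup')")
    created_at = conn.execute("SELECT created_at FROM users").fetchone()[0]
    try:
        conn.execute("INSERT INTO users (username) VALUES ('dup')")
        duplicate_rejected = False
    except sqlite3.IntegrityError:
        # Same failure class that SQLAlchemy wraps in its own IntegrityError.
        duplicate_rejected = True
    conn.close()
    return created_at is not None, duplicate_rejected
```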
- [ ] **Step 6: Initialize Alembic**

```bash
.venv/bin/alembic init alembic
```

Creates `alembic.ini` and `alembic/` directory.

- [ ] **Step 7: Configure `alembic.ini`**

Find and replace the database URL line:

```ini
# Before:
sqlalchemy.url = driver://user:pass@localhost/dbname

# After:
sqlalchemy.url = sqlite:///./app.db
```

- [ ] **Step 8: Replace `alembic/env.py`**

```python
from logging.config import fileConfig

from sqlalchemy import engine_from_config, pool
from alembic import context

from app.core.database import Base
import app.modules.auth.models  # noqa: F401 (registers User with Base.metadata)

config = context.config
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata


def run_migrations_offline() -> None:
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
```

- [ ] **Step 9: Generate migration**

```bash
.venv/bin/alembic revision --autogenerate -m "create users table"
```

Expected: `Generating alembic/versions/<hash>_create_users_table.py`. Open the file and verify it contains `op.create_table("users", ...)` with all 13 columns.

- [ ] **Step 10: Run migration**

```bash
.venv/bin/alembic upgrade head
```

Expected: `Running upgrade -> <hash>, create users table`

- [ ] **Step 11: Commit**

```bash
git add app/modules/ alembic/ alembic.ini tests/test_user_model.py
git commit -m "feat: add User model and Alembic migration for users table"
```

---

### Task 4: Auth schemas and service (local auth)

**Files:**
- Create: `app/modules/auth/schemas.py`
- Create: `app/modules/auth/service.py`
- Test: `tests/test_auth_service.py`

- [ ] **Step 1: Create `app/modules/auth/schemas.py`**

```python
from datetime import datetime

from pydantic import BaseModel


class UserOut(BaseModel):
    id: int
    username: str
    full_name: str
    email: str | None
    department: str | None
    role: str | None
    is_active: bool
    is_admin: bool
    last_login: datetime | None
    created_at: datetime

    model_config = {"from_attributes": True}
```

- [ ] **Step 2: Write failing test**

Create `tests/test_auth_service.py`:

```python
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.database import Base
from app.modules.auth.models import User
from app.modules.auth.service import authenticate_user, get_user, hash_password, verify_password


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def alice(db):
    user = User(username="alice", full_name="Alice Smith", pw_hash=hash_password("secret123"))
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


def test_hash_and_verify_password():
    hashed = hash_password("mypassword")
    assert verify_password("mypassword", hashed) is True
    assert verify_password("wrong", hashed) is False


def test_get_user_found(db, alice):
    assert get_user(db, "alice") is not None


def test_get_user_not_found(db):
    assert get_user(db, "nobody") is None


def test_authenticate_correct_password(db, alice):
    user = authenticate_user(db, "alice", "secret123", ldap_enabled=False)
    assert user is not None
    assert user.last_login is not None


def test_authenticate_wrong_password(db, alice):
    assert authenticate_user(db, "alice", "wrong", ldap_enabled=False) is None


def test_authenticate_unknown_user(db):
    assert authenticate_user(db, "ghost", "any", ldap_enabled=False) is None


def test_authenticate_inactive_user(db):
    user = User(username="inactive", full_name="X", pw_hash=hash_password("pw"), is_active=False)
    db.add(user)
    db.commit()
    assert authenticate_user(db, "inactive", "pw", ldap_enabled=False) is None


def test_authenticate_no_pw_hash_no_ldap(db):
    user = User(username="ldaponly", full_name="Y", pw_hash=None)
    db.add(user)
    db.commit()
    assert authenticate_user(db, "ldaponly", "any", ldap_enabled=False) is None
```

- [ ] **Step 3: Run to confirm failure**

```bash
.venv/bin/pytest tests/test_auth_service.py -v
```

Expected: FAIL with `ModuleNotFoundError: No module named 'app.modules.auth.service'`

- [ ] **Step 4: Create `app/modules/auth/service.py`**

```python
from datetime import datetime, timezone
from typing import Optional

from passlib.context import CryptContext
from sqlalchemy.orm import Session

from app.modules.auth.models import User

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(plain: str) -> str:
    return pwd_context.hash(plain)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)


def get_user(db: Session, username: str) -> Optional[User]:
    return db.query(User).filter(User.username == username).first()


def authenticate_user(
    db: Session, username: str, password: str, ldap_enabled: bool
) -> Optional[User]:
    user = get_user(db, username)
    if user is None or not user.is_active:
        return None

    local_ok = user.pw_hash is not None and verify_password(password, user.pw_hash)
    if local_ok:
        _touch_last_login(db, user)
        return user

    if ldap_enabled:
        # LDAP auth implemented in Part 2
        pass

    return None


def _touch_last_login(db: Session, user: User) -> None:
    user.last_login = datetime.now(timezone.utc)
    db.commit()
```

- [ ] **Step 5: Run to confirm pass**

```bash
.venv/bin/pytest tests/test_auth_service.py -v
```

Expected: All 8 tests PASS.

- [ ] **Step 6: Commit**

```bash
git add app/modules/auth/schemas.py app/modules/auth/service.py tests/test_auth_service.py
git commit -m "feat: add auth service with local bcrypt password authentication"
```

---

### Task 5: JWT and cookie helpers

**Files:**
- Create: `app/core/auth.py`
- Test: `tests/test_core_auth.py`

- [ ] **Step 1: Write failing test**

Create `tests/test_core_auth.py`:

```python
from app.core.auth import COOKIE_NAME, create_access_token, decode_token


def test_create_and_decode_token():
    token = create_access_token(username="alice", is_admin=False)
    payload = decode_token(token)
    assert payload is not None
    assert payload["sub"] == "alice"
    assert payload["is_admin"] is False


def test_admin_claim():
    token = create_access_token(username="admin", is_admin=True)
    assert decode_token(token)["is_admin"] is True


def test_decode_invalid_token():
    assert decode_token("not.a.valid.token") is None


def test_decode_tampered_token():
    token = create_access_token(username="alice", is_admin=False)
    assert decode_token(token[:-4] + "xxxx") is None


def test_cookie_name():
    assert COOKIE_NAME == "access_token"
```

- [ ] **Step 2: Run to confirm failure**

```bash
.venv/bin/pytest tests/test_core_auth.py -v
```

Expected: FAIL with `ModuleNotFoundError: No module named 'app.core.auth'`

- [ ] **Step 3: Create `app/core/auth.py`**

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Request, Response
from jose import JWTError, jwt

from app.core.config import get_settings

settings = get_settings()

COOKIE_NAME = "access_token"
ALGORITHM = "HS256"
TOKEN_EXPIRE_HOURS = 8


def create_access_token(username: str, is_admin: bool) -> str:
    expire = datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRE_HOURS)
    return jwt.encode(
        {"sub": username, "is_admin": is_admin, "exp": expire},
        settings.SECRET_KEY,
        algorithm=ALGORITHM,
    )


def decode_token(token: str) -> Optional[dict]:
    try:
        return jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None


def get_token_from_request(request: Request) -> Optional[str]:
    return request.cookies.get(COOKIE_NAME)


def set_auth_cookie(response: Response, token: str) -> None:
    response.set_cookie(
        key=COOKIE_NAME,
        value=token,
        httponly=True,
        samesite="lax",
        secure=settings.APP_ENV == "production",
    )


def clear_auth_cookie(response: Response) -> None:
    response.delete_cookie(key=COOKIE_NAME, httponly=True, samesite="lax")
```

- [ ] **Step 4: Run to confirm pass**

```bash
.venv/bin/pytest tests/test_core_auth.py -v
```

Expected: All 5 tests PASS.

- [ ] **Step 5: Commit**

```bash
git add app/core/auth.py tests/test_core_auth.py
git commit -m "feat: add JWT creation, decoding and cookie helpers"
```

---

### Task 6: FastAPI auth dependencies

**Files:**
- Create: `app/modules/auth/dependencies.py`
- Test: `tests/test_auth_dependencies.py`

- [ ] **Step 1: Write failing test**

Create `tests/test_auth_dependencies.py`:

```python
import pytest
from fastapi import Depends, FastAPI
from fastapi.responses import RedirectResponse
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.modules.auth.dependencies import RequiresLoginException, get_current_user, require_admin
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def test_app(db):
    app = FastAPI()

    @app.exception_handler(RequiresLoginException)
    async def handle_login_required(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    @app.get("/protected")
    async def protected(user=Depends(get_current_user)):
        return {"username": user.username}

    @app.get("/admin-only")
    async def admin_only(user=Depends(require_admin)):
        return {"username": user.username}

    app.dependency_overrides[get_db] = lambda: db
    return TestClient(test_app, follow_redirects=False)  # noqa (known bug, replaced in Step 4)


@pytest.fixture
def client(test_app):
    return TestClient(test_app, follow_redirects=False)


def _token_cookie(username: str, is_admin: bool) -> dict:
    token = create_access_token(username=username, is_admin=is_admin)
    return {"access_token": token}


def test_no_cookie_redirects_to_login(client):
    response = client.get("/protected")
    assert response.status_code == 307
    assert "/auth/login" in response.headers["location"]


def test_valid_token_returns_user(client, db):
    db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/protected", cookies=_token_cookie("bob", False))
    assert response.status_code == 200
    assert response.json()["username"] == "bob"


def test_invalid_token_redirects(client):
    response = client.get("/protected", cookies={"access_token": "invalid"})
    assert response.status_code == 307


def test_require_admin_blocks_non_admin(client, db):
    db.add(User(username="regular", full_name="R", is_admin=False, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("regular", False))
    assert response.status_code == 403


def test_require_admin_allows_admin(client, db):
    db.add(User(username="admin", full_name="A", is_admin=True, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("admin", True))
    assert response.status_code == 200
```

- [ ] **Step 2: Run to confirm failure**

```bash
.venv/bin/pytest tests/test_auth_dependencies.py -v
```

Expected: FAIL with `ModuleNotFoundError: No module named 'app.modules.auth.dependencies'`

- [ ] **Step 3: Create `app/modules/auth/dependencies.py`**

```python
from fastapi import Depends, HTTPException, Request
from sqlalchemy.orm import Session

from app.core.auth import decode_token, get_token_from_request
from app.core.database import get_db
from app.modules.auth.models import User
from app.modules.auth.service import get_user


class RequiresLoginException(Exception):
    pass


async def get_current_user(
    request: Request, db: Session = Depends(get_db)
) -> User:
    token = get_token_from_request(request)
    if not token:
        raise RequiresLoginException()
    payload = decode_token(token)
    if not payload:
        raise RequiresLoginException()
    user = get_user(db, payload.get("sub", ""))
    if user is None or not user.is_active:
        raise RequiresLoginException()
    return user


async def require_admin(user: User = Depends(get_current_user)) -> User:
    if not user.is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")
    return user
```

- [ ] **Step 4: Fix the test fixture (the Step 1 version has a bug)**

In `tests/test_auth_dependencies.py`, the `test_app` fixture from Step 1 returns `TestClient(test_app, ...)`, which passes the fixture function itself instead of the `FastAPI` instance. The fixture should return `app`; the separate `client` fixture then wraps it in a `TestClient`. Replace the fixture with:

```python
@pytest.fixture
def test_app(db):
    app = FastAPI()

    @app.exception_handler(RequiresLoginException)
    async def handle_login_required(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    @app.get("/protected")
    async def protected(user=Depends(get_current_user)):
        return {"username": user.username}

    @app.get("/admin-only")
    async def admin_only(user=Depends(require_admin)):
        return {"username": user.username}

    app.dependency_overrides[get_db] = lambda: db
    return app


@pytest.fixture
def client(test_app):
    return TestClient(test_app, follow_redirects=False)
```

The full corrected `tests/test_auth_dependencies.py`:

```python
import pytest
from fastapi import Depends, FastAPI
from fastapi.responses import RedirectResponse
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.modules.auth.dependencies import RequiresLoginException, get_current_user, require_admin
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture
def db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def test_app(db):
    app = FastAPI()

    @app.exception_handler(RequiresLoginException)
    async def handle_login_required(request, exc):
        return RedirectResponse(url="/auth/login", status_code=307)

    @app.get("/protected")
    async def protected(user=Depends(get_current_user)):
        return {"username": user.username}

    @app.get("/admin-only")
    async def admin_only(user=Depends(require_admin)):
        return {"username": user.username}

    app.dependency_overrides[get_db] = lambda: db
    return app


@pytest.fixture
def client(test_app):
    return TestClient(test_app, follow_redirects=False)


def _token_cookie(username: str, is_admin: bool) -> dict:
    token = create_access_token(username=username, is_admin=is_admin)
    return {"access_token": token}


def test_no_cookie_redirects_to_login(client):
    response = client.get("/protected")
    assert response.status_code == 307
    assert "/auth/login" in response.headers["location"]


def test_valid_token_returns_user(client, db):
    db.add(User(username="bob", full_name="Bob", pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/protected", cookies=_token_cookie("bob", False))
    assert response.status_code == 200
    assert response.json()["username"] == "bob"


def test_invalid_token_redirects(client):
    response = client.get("/protected", cookies={"access_token": "invalid"})
    assert response.status_code == 307


def test_require_admin_blocks_non_admin(client, db):
    db.add(User(username="regular", full_name="R", is_admin=False, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("regular", False))
    assert response.status_code == 403


def test_require_admin_allows_admin(client, db):
    db.add(User(username="admin", full_name="A", is_admin=True, pw_hash=hash_password("pw")))
    db.commit()
    response = client.get("/admin-only", cookies=_token_cookie("admin", True))
    assert response.status_code == 200
```

- [ ] **Step 5: Run to confirm pass**

```bash
.venv/bin/pytest tests/test_auth_dependencies.py -v
```

Expected: All 5 tests PASS.

- [ ] **Step 6: Commit**

```bash
git add app/modules/auth/dependencies.py tests/test_auth_dependencies.py
git commit -m "feat: add get_current_user and require_admin dependencies"
```

---

### Task 7: Auth router and login template

**Files:**
- Create: `app/modules/auth/router.py`
- Create: `app/templates/auth/login.html`
- Test: `tests/test_auth_router.py`

- [ ] **Step 1: Write failing test**

Create `tests/test_auth_router.py`:

```python
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.database import Base, get_db
from app.main import app
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture(autouse=True)
def override_db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    app.dependency_overrides[get_db] = lambda: session
    yield session
    app.dependency_overrides.clear()
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def client():
    return TestClient(app, follow_redirects=False)


@pytest.fixture
def alice(override_db):
    user = User(username="alice", full_name="Alice Smith", pw_hash=hash_password("secret123"))
    override_db.add(user)
    override_db.commit()
    return user


def test_get_login_page(client):
    response = client.get("/auth/login")
    assert response.status_code == 200
    assert "text/html" in response.headers["content-type"]
    assert "Anmelden" in response.text


def test_login_correct_credentials_redirects(client, alice):
    response = client.post("/auth/login", data={"username": "alice", "password": "secret123"})
    assert response.status_code in (302, 303, 307)
    assert "access_token" in response.cookies


def test_login_wrong_password_shows_error(client, alice):
    response = client.post("/auth/login", data={"username": "alice", "password": "wrong"})
    assert response.status_code == 200
    assert "Ungültige" in response.text


def test_login_unknown_user_shows_error(client):
    response = client.post("/auth/login", data={"username": "ghost", "password": "any"})
    assert response.status_code == 200
    assert "Ungültige" in response.text


def test_logout_clears_cookie(client, alice):
    client.post("/auth/login", data={"username": "alice", "password": "secret123"})
    response = client.get("/auth/logout")
    assert response.status_code in (302, 303, 307)
    assert response.cookies.get("access_token", "") == ""
```

- [ ] **Step 2: Run to confirm failure**

```bash
.venv/bin/pytest tests/test_auth_router.py -v
```

Expected: FAIL because the router is not yet registered in `app/main.py`

- [ ] **Step 3: Create `app/templates/auth/login.html`**
|
|
|
|
```bash
|
|
mkdir -p app/templates/auth
|
|
```
|
|
|
|
```html
{% extends "base.html" %}

{% block title %}Anmelden · EFI Hub{% endblock %}

{% block content %}
<div class="min-h-[80vh] flex items-center justify-center px-4">
  <div class="w-full max-w-sm">

    <div class="text-center mb-8">
      <p class="text-xs font-semibold uppercase tracking-widest text-efi mb-1">
        Fakultät EFI · TH Nürnberg
      </p>
      <h1 class="text-xl font-bold text-gray-900">Anmelden</h1>
      <p class="text-sm text-gray-500 mt-1">Bitte melde dich mit deinen Zugangsdaten an.</p>
    </div>

    {% if error %}
    <div class="mb-4 px-4 py-3 rounded-md bg-red-50 border border-red-200 text-sm text-red-700">
      {{ error }}
    </div>
    {% endif %}

    <form method="post" action="/auth/login" class="bg-white rounded-lg border border-gray-200 shadow-sm p-6 space-y-4">
      <div>
        <label class="block text-xs font-semibold text-gray-600 uppercase tracking-wide mb-1" for="username">
          Benutzername
        </label>
        <input
          id="username" name="username" type="text"
          class="w-full border border-gray-300 rounded-md px-3 py-2 text-sm focus:outline-none focus:border-efi focus:ring-1 focus:ring-efi"
          autocomplete="username" autofocus required
        >
      </div>
      <div>
        <label class="block text-xs font-semibold text-gray-600 uppercase tracking-wide mb-1" for="password">
          Passwort
        </label>
        <input
          id="password" name="password" type="password"
          class="w-full border border-gray-300 rounded-md px-3 py-2 text-sm focus:outline-none focus:border-efi focus:ring-1 focus:ring-efi"
          autocomplete="current-password" required
        >
      </div>
      <button
        type="submit"
        class="w-full bg-efi text-white text-sm font-semibold py-2 px-4 rounded-md hover:opacity-90 transition-opacity"
      >
        Anmelden
      </button>
    </form>

  </div>
</div>
{% endblock %}
```

- [ ] **Step 4: Create `app/modules/auth/router.py`**

```python
from fastapi import APIRouter, Depends, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session

from app.core.auth import clear_auth_cookie, create_access_token, set_auth_cookie
from app.core.config import get_settings
from app.core.database import get_db
from app.modules.auth.dependencies import get_current_user
from app.modules.auth.schemas import UserOut
from app.modules.auth.service import authenticate_user

router = APIRouter(prefix="/auth", tags=["auth"])
templates = Jinja2Templates(directory="app/templates")
settings = get_settings()

# The login page renders without navigation entries.
_NAV: list[dict] = []


@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    return templates.TemplateResponse(
        request, "auth/login.html", {"nav_items": _NAV, "app_version": "0.1.0"}
    )


@router.post("/login", response_class=HTMLResponse)
async def login(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    user = authenticate_user(db, username, password, ldap_enabled=settings.LDAP_ENABLED)
    if user is None:
        # Re-render the form with the default 200 status; the router tests
        # assert `status_code == 200` for failed logins.
        return templates.TemplateResponse(
            request,
            "auth/login.html",
            {"nav_items": _NAV, "app_version": "0.1.0", "error": "Ungültige Zugangsdaten."},
        )
    # Successful login: issue the JWT and redirect to the landing page.
    token = create_access_token(username=user.username, is_admin=user.is_admin)
    response = RedirectResponse(url="/", status_code=303)
    set_auth_cookie(response, token)
    return response


@router.get("/logout")
async def logout():
    response = RedirectResponse(url="/auth/login", status_code=303)
    clear_auth_cookie(response)
    return response


@router.get("/me", response_model=UserOut)
async def me(user=Depends(get_current_user)):
    return user
```
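
The login handler answers the form POST with a 303 redirect. A 303 makes the client fetch the target with GET, whereas a 307 repeats the original method and would re-send the credentials POST to `/`. The rule can be sketched as follows; `method_after_redirect` is a hypothetical helper written for this illustration and covers only the 303, 307 and 308 cases.

```python
from http import HTTPStatus


def method_after_redirect(status: int, original_method: str) -> str:
    """Return the HTTP method a client uses for the follow-up request.

    Hypothetical helper for illustration; only 303, 307 and 308 are handled.
    """
    if status == HTTPStatus.SEE_OTHER:  # 303: follow with GET (HEAD stays HEAD)
        return "HEAD" if original_method == "HEAD" else "GET"
    if status in (HTTPStatus.TEMPORARY_REDIRECT, HTTPStatus.PERMANENT_REDIRECT):
        return original_method  # 307/308: method and body are preserved
    raise ValueError(f"status {status} is not covered by this sketch")


# Login form: POST /auth/login answered with 303, the browser then GETs "/".
print(method_after_redirect(303, "POST"))
# The same POST answered with 307 would be repeated as a POST.
print(method_after_redirect(307, "POST"))
```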

- [ ] **Step 5: Run to confirm failure (router not wired)**

```bash
.venv/bin/pytest tests/test_auth_router.py -v
```

Expected: FAIL. The routes return 404 because the router is not included in the main app yet.

- [ ] **Step 6: Commit templates and router**

```bash
git add app/modules/auth/router.py app/templates/auth/login.html tests/test_auth_router.py
git commit -m "feat: add auth router with login/logout/me and login template"
```

---

### Task 8: Wire auth into main.py and create admin script

**Files:**
- Modify: `app/main.py`
- Modify: `tests/test_landing.py`
- Create: `scripts/create_admin.py`

- [ ] **Step 1: Replace `app/main.py`**

```python
from fastapi import Depends, FastAPI, Request, WebSocket
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from app.core.auth import get_token_from_request
from app.core.config import get_settings
from app.core.database import get_db
from app.modules.auth.dependencies import RequiresLoginException, get_current_user
from app.modules.auth.router import router as auth_router

settings = get_settings()

app = FastAPI(
    title="University Process Hub",
    root_path=settings.APP_PREFIX,
)

app.mount("/static", StaticFiles(directory="app/static"), name="static")
templates = Jinja2Templates(directory="app/templates")

app.include_router(auth_router)


@app.exception_handler(RequiresLoginException)
async def requires_login_handler(request: Request, exc: RequiresLoginException):
    return RedirectResponse(url="/auth/login", status_code=307)


MODULES = [
    {
        "icon": "📡",
        "name": "RSS-Feed Server",
        "description": "Aggregiert und verteilt Neuigkeiten der Fakultät als standardkonformen RSS 2.0 Feed.",
        "status": "active",
    },
    {
        "icon": "📅",
        "name": "Kalender",
        "description": "Veranstaltungen, Prüfungstermine und Fristen im iCal-Format.",
        "status": "planned",
    },
    {
        "icon": "🔔",
        "name": "Benachrichtigungen",
        "description": "Push-Nachrichten und WebSocket-basierte Echtzeit-Alerts für Studierende.",
        "status": "planned",
    },
    {
        "icon": "📊",
        "name": "Auslastung",
        "description": "Raum- und Ressourcenauslastung der Fakultät in Echtzeit.",
        "status": "planned",
    },
    {
        "icon": "📚",
        "name": "Lehrveranstaltungen",
        "description": "Stundenplan-API und Kursinformationen aus dem Campus-System.",
        "status": "planned",
    },
]

NAV_ITEMS = [
    {"label": "Übersicht", "url": "/", "active": True},
    {"label": "RSS-Feeds", "url": "/rss", "active": False},
    {"label": "Kalender", "url": "/kalender", "active": False},
    {"label": "Docs", "url": "/docs", "active": False},
]


def _db_mode() -> str:
    url = settings.DATABASE_URL
    return "SQLite (dev)" if url.startswith("sqlite") else "MariaDB (prod)"


@app.get("/", response_class=HTMLResponse)
async def root(request: Request, current_user=Depends(get_current_user)):
    return templates.TemplateResponse(
        request,
        "index.html",
        {
            "nav_items": NAV_ITEMS,
            "modules": MODULES,
            "db_mode": _db_mode(),
            "app_version": "0.1.0",
            "current_user": current_user,
        },
    )


@app.websocket("/ws/hello/{name}")
async def websocket_hello(websocket: WebSocket, name: str):
    await websocket.accept()
    await websocket.send_text(f"Hello, {name}!")
    await websocket.close()
```

- [ ] **Step 2: Update `tests/test_landing.py`**

`GET /` now requires authentication, so the old tests that send no cookie fail. Replace `tests/test_landing.py` completely:

```python
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.auth import create_access_token
from app.core.database import Base, get_db
from app.main import app
from app.modules.auth.models import User
from app.modules.auth.service import hash_password


@pytest.fixture(autouse=True)
def override_db():
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    Base.metadata.create_all(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    app.dependency_overrides[get_db] = lambda: session
    yield session
    app.dependency_overrides.clear()
    session.close()
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def client():
    return TestClient(app, follow_redirects=False)


@pytest.fixture
def auth_cookies(override_db):
    user = User(username="testuser", full_name="Test User", pw_hash=hash_password("pw"))
    override_db.add(user)
    override_db.commit()
    token = create_access_token(username="testuser", is_admin=False)
    return {"access_token": token}


def test_landing_without_auth_redirects(client):
    response = client.get("/")
    assert response.status_code == 307
    assert "/auth/login" in response.headers["location"]


def test_landing_returns_html(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert response.status_code == 200
    assert "text/html" in response.headers["content-type"]


def test_landing_contains_title(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "University Process Hub" in response.text


def test_landing_contains_rss_module(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "RSS-Feed Server" in response.text


def test_landing_navbar_links_present(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "Übersicht" in response.text


def test_landing_info_strip_shows_db_mode(client, auth_cookies):
    response = client.get("/", cookies=auth_cookies)
    assert "SQLite" in response.text or "MariaDB" in response.text
```

- [ ] **Step 3: Run all tests**

```bash
.venv/bin/pytest -v
```

Expected: All tests PASS, including `test_auth_router.py`, which now has the router registered.

- [ ] **Step 4: Create admin user script**

```bash
mkdir -p scripts
```

Create `scripts/create_admin.py`:

```python
"""
Create or reset the admin user.

Usage:
    .venv/bin/python scripts/create_admin.py <username> <password>

Example:
    .venv/bin/python scripts/create_admin.py admin geheim123
"""
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent))

from app.core.database import SessionLocal, engine, Base
from app.modules.auth.models import User  # noqa: F401 (registers table)
from app.modules.auth.service import get_user, hash_password


def create_admin(username: str, password: str) -> None:
    Base.metadata.create_all(bind=engine)
    with SessionLocal() as db:
        user = get_user(db, username)
        if user is None:
            user = User(username=username, full_name="Administrator")
            db.add(user)
        user.pw_hash = hash_password(password)
        user.is_admin = True
        user.is_active = True
        db.commit()
    print(f"Admin user '{username}' created/updated.")


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print(__doc__)
        sys.exit(1)
    create_admin(sys.argv[1], sys.argv[2])
```
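
Passing the password as a command-line argument leaves it in the shell history. One possible variant, sketched here with the standard library only, makes the password argument optional and prompts for it when it is missing. The function name `read_credentials` and its injectable `prompt` parameter are assumptions for illustration and are not part of the script above.

```python
import argparse
import getpass
from typing import Callable, Sequence


def read_credentials(
    argv: Sequence[str],
    prompt: Callable[[str], str] = getpass.getpass,
) -> tuple[str, str]:
    """Parse `<username> [password]`; ask interactively if no password is given.

    Hypothetical helper for illustration only.
    """
    parser = argparse.ArgumentParser(description="Create or reset the admin user.")
    parser.add_argument("username")
    parser.add_argument("password", nargs="?", default=None)
    args = parser.parse_args(argv)
    # Fall back to a hidden terminal prompt when the password was not passed.
    password = args.password if args.password is not None else prompt("Password: ")
    if not password:
        parser.error("password must not be empty")
    return args.username, password


# Non-interactive demo: the lambda stands in for the terminal prompt.
print(read_credentials(["admin"], prompt=lambda _msg: "geheim123"))
```

In `scripts/create_admin.py` the result could then be passed on as `create_admin(*read_credentials(sys.argv[1:]))`.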

- [ ] **Step 5: Create initial admin user**

```bash
.venv/bin/python scripts/create_admin.py admin admin
```

Expected: `Admin user 'admin' created/updated.`

- [ ] **Step 6: Start server and verify login**

```bash
.venv/bin/uvicorn app.main:app --reload
```

Open `http://localhost:8000`; it should redirect to `http://localhost:8000/auth/login`.
Log in with `admin` / `admin`. You should be redirected to the landing page.

- [ ] **Step 7: Run full test suite one last time**

```bash
.venv/bin/pytest -v
```

Expected: All tests PASS.

- [ ] **Step 8: Commit**

```bash
git add app/main.py tests/test_landing.py scripts/
git commit -m "feat: wire auth into main app, protect landing page, add create_admin script"
```