Compare commits

...

10 Commits

14 changed files with 1607 additions and 77 deletions

View File

@ -4,6 +4,8 @@ DATABASE_URL=sqlite:///./app.db
SECRET_KEY=changeme-replace-in-production SECRET_KEY=changeme-replace-in-production
ADMIN_USERNAME=admin ADMIN_USERNAME=admin
ADMIN_PASSWORD=change_me ADMIN_PASSWORD=change_me
LDAP_SYNC_MIN_INTERVAL_HOURS=12
LDAP_SYNC_LETTER_DELAY_SECONDS=5.0
# Produktion (MariaDB + LDAP): # Produktion (MariaDB + LDAP):
# APP_ENV=production # APP_ENV=production

View File

@ -14,6 +14,8 @@ class Settings(BaseSettings):
LDAP_SERVER: str = "gso1.ads1.fh-nuernberg.de" LDAP_SERVER: str = "gso1.ads1.fh-nuernberg.de"
LDAP_DOMAIN: str = "ADS1" LDAP_DOMAIN: str = "ADS1"
LDAP_SEARCH_BASE: str = "OU=users,OU=EFI,OU=Faculties,DC=ADS1,DC=fh-nuernberg,DC=de" LDAP_SEARCH_BASE: str = "OU=users,OU=EFI,OU=Faculties,DC=ADS1,DC=fh-nuernberg,DC=de"
LDAP_SYNC_MIN_INTERVAL_HOURS: int = 12
LDAP_SYNC_LETTER_DELAY_SECONDS: float = 5.0
ADMIN_USERNAME: str = "admin" ADMIN_USERNAME: str = "admin"
ADMIN_PASSWORD: str = "change_me" ADMIN_PASSWORD: str = "change_me"

View File

@ -1,3 +1,5 @@
import importlib
import logging
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from fastapi import Depends, FastAPI, Request, WebSocket from fastapi import Depends, FastAPI, Request, WebSocket
@ -6,16 +8,70 @@ from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from app.core.config import get_settings from app.core.config import get_settings
from app.core.database import get_db from app.modules.auth.dependencies import (
from app.modules.auth.dependencies import RequiresLoginException, get_current_user RequiresLoginException,
check_permission,
get_current_user_optional,
)
from app.modules.auth.router import router as auth_router from app.modules.auth.router import router as auth_router
logger = logging.getLogger(__name__)
settings = get_settings() settings = get_settings()
MODULES = [
{
"slug": "rss",
"icon": "📡",
"name": "RSS-Feed Server",
"description": "Aggregiert und verteilt Neuigkeiten der Fakultät als standardkonformen RSS 2.0 Feed.",
"status": "active",
"permission": ["authenticated"],
},
{
"slug": "kalender",
"icon": "📅",
"name": "Kalender",
"description": "Veranstaltungen, Prüfungstermine und Fristen im iCal-Format.",
"status": "planned",
"permission": ["authenticated"],
},
{
"slug": "benachrichtigungen",
"icon": "🔔",
"name": "Benachrichtigungen",
"description": "Push-Nachrichten und WebSocket-basierte Echtzeit-Alerts für Studierende.",
"status": "planned",
"permission": ["authenticated"],
},
{
"slug": "auslastung",
"icon": "📊",
"name": "Auslastung",
"description": "Raum- und Ressourcenauslastung der Fakultät in Echtzeit.",
"status": "planned",
"permission": ["authenticated"],
},
{
"slug": "lehrveranstaltungen",
"icon": "📚",
"name": "Lehrveranstaltungen",
"description": "Stundenplan-API und Kursinformationen aus dem Campus-System.",
"status": "planned",
"permission": ["authenticated"],
},
]
NAV_ITEMS = [
{"label": "Übersicht", "url": "/", "active": True},
{"label": "RSS-Feeds", "url": "/rss", "active": False},
{"label": "Kalender", "url": "/kalender", "active": False},
{"label": "Docs", "url": "/docs", "active": False},
]
def _reset_admin() -> None: def _reset_admin() -> None:
from app.core.database import Base, SessionLocal, engine from app.core.database import Base, SessionLocal, engine
from app.modules.auth.models import User # noqa: F401 — registers table from app.modules.auth.models import User # noqa: F401
from app.modules.auth.service import get_user, hash_password from app.modules.auth.service import get_user, hash_password
Base.metadata.create_all(bind=engine) Base.metadata.create_all(bind=engine)
@ -31,7 +87,7 @@ def _reset_admin() -> None:
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(_app: FastAPI):
_reset_admin() _reset_admin()
yield yield
@ -47,66 +103,34 @@ templates = Jinja2Templates(directory="app/templates")
app.include_router(auth_router) app.include_router(auth_router)
for _module in MODULES:
if _module["status"] == "active":
try:
_mod = importlib.import_module(f"app.modules.{_module['slug']}.router")
app.include_router(_mod.router, prefix=f"/{_module['slug']}")
except ModuleNotFoundError:
logger.warning("Module router not found: app.modules.%s.router", _module["slug"])
@app.exception_handler(RequiresLoginException) @app.exception_handler(RequiresLoginException)
async def requires_login_handler(request: Request, exc: RequiresLoginException): async def requires_login_handler(_request: Request, _exc: RequiresLoginException):
return RedirectResponse(url="/auth/login", status_code=307) return RedirectResponse(url="/auth/login", status_code=307)
MODULES = [
{
"icon": "📡",
"name": "RSS-Feed Server",
"description": "Aggregiert und verteilt Neuigkeiten der Fakultät als standardkonformen RSS 2.0 Feed.",
"status": "active",
},
{
"icon": "📅",
"name": "Kalender",
"description": "Veranstaltungen, Prüfungstermine und Fristen im iCal-Format.",
"status": "planned",
},
{
"icon": "🔔",
"name": "Benachrichtigungen",
"description": "Push-Nachrichten und WebSocket-basierte Echtzeit-Alerts für Studierende.",
"status": "planned",
},
{
"icon": "📊",
"name": "Auslastung",
"description": "Raum- und Ressourcenauslastung der Fakultät in Echtzeit.",
"status": "planned",
},
{
"icon": "📚",
"name": "Lehrveranstaltungen",
"description": "Stundenplan-API und Kursinformationen aus dem Campus-System.",
"status": "planned",
},
]
NAV_ITEMS = [
{"label": "Übersicht", "url": "/", "active": True},
{"label": "RSS-Feeds", "url": "/rss", "active": False},
{"label": "Kalender", "url": "/kalender", "active": False},
{"label": "Docs", "url": "/docs", "active": False},
]
def _db_mode() -> str: def _db_mode() -> str:
url = settings.DATABASE_URL url = settings.DATABASE_URL
return "SQLite (dev)" if url.startswith("sqlite") else "MariaDB (prod)" return "SQLite (dev)" if url.startswith("sqlite") else "MariaDB (prod)"
@app.get("/", response_class=HTMLResponse) @app.get("/", response_class=HTMLResponse)
async def root(request: Request, current_user=Depends(get_current_user)): async def root(request: Request, current_user=Depends(get_current_user_optional)):
visible = [m for m in MODULES if check_permission(current_user, m["permission"])]
return templates.TemplateResponse( return templates.TemplateResponse(
request, request,
"index.html", "index.html",
{ {
"nav_items": NAV_ITEMS, "nav_items": NAV_ITEMS,
"modules": MODULES, "modules": visible,
"db_mode": _db_mode(), "db_mode": _db_mode(),
"app_version": "0.1.0", "app_version": "0.1.0",
"current_user": current_user, "current_user": current_user,

181
app/modules/auth/ldap.py Normal file
View File

@ -0,0 +1,181 @@
import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Optional
from ldap3 import ALL, ALL_ATTRIBUTES, NTLM, Connection, Server
from ldap3.core.exceptions import LDAPSocketOpenError
from sqlalchemy.orm import Session
from app.modules.auth.models import User
logger = logging.getLogger(__name__)
def ldap_authenticate(
username: str,
password: str,
ldap_server: str,
ldap_domain: str,
) -> Optional[dict]:
"""NTLM bind + fetch user attributes. Returns attrs dict on success, None on failure."""
server = Server(ldap_server, get_info=ALL, connect_timeout=8)
conn = Connection(
server,
user=f"{ldap_domain}\\{username}",
password=password,
authentication=NTLM,
)
try:
conn.bind()
except LDAPSocketOpenError:
logger.warning("LDAP server %s not reachable", ldap_server)
return None
except Exception:
logger.warning("LDAP bind failed for user %s", username, exc_info=True)
return None
if not conn.extend.standard.who_am_i():
conn.unbind()
return None
conn.search(
search_base=f"DC={ldap_domain},DC=fh-nuernberg,DC=de",
search_filter=f"(&(objectclass=user)(CN={username}))",
attributes=ALL_ATTRIBUTES,
)
if not conn.entries:
conn.unbind()
logger.warning("LDAP: user %s authenticated but no entry found", username)
return None
attrs = _parse_entry(conn.entries[0])
conn.unbind()
return attrs
def _parse_entry(entry) -> dict:
def val(field: str, default: str = "N.N.") -> str:
try:
v = entry[field].value
return str(v)[:64] if v else default
except Exception:
return default
def expire_date(field: str) -> Optional[datetime]:
try:
v = entry[field].value
if v is None:
return None
if isinstance(v, datetime):
dt = v
else:
dt = datetime(v.year, v.month, v.day, tzinfo=timezone.utc)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return None if dt.year >= 2099 else dt
except Exception:
return None
return {
"username": val("sAMAccountName"),
"full_name": (f"{val('givenName', '')} {val('sn', '')}".strip() or "N.N."),
"department": val("department"),
"role": val("description"),
"account_expires": expire_date("accountExpires"),
}
def _has_to_update(db: Session, min_interval_hours: int) -> bool:
"""True if a background sync is due."""
deadline = datetime.now(timezone.utc) - timedelta(hours=min_interval_hours)
latest = (
db.query(User)
.filter(User.pw_hash.is_(None))
.order_by(User.updated_at.desc())
.first()
)
if latest is None:
return True
last = latest.updated_at
if last.tzinfo is None:
last = last.replace(tzinfo=timezone.utc)
return last < deadline
def _upsert_from_attrs(db: Session, attrs: dict) -> None:
"""Create or update a User from AD attribute dict (no commit)."""
user = db.query(User).filter(User.username == attrs["username"]).first()
if user is None:
user = User(username=attrs["username"])
db.add(user)
user.full_name = attrs["full_name"]
user.department = attrs["department"]
user.role = attrs["role"]
user.account_expires = attrs["account_expires"]
def sync_all_users(
username: str,
password: str,
ldap_server: str,
ldap_domain: str,
ldap_search_base: str,
min_interval_hours: int,
letter_delay_seconds: float,
) -> None:
"""Login-triggered background sync of all AD users. Creates its own DB session."""
from app.core.database import SessionLocal
with SessionLocal() as db:
if not _has_to_update(db, min_interval_hours):
logger.info("LDAP sync skipped: last sync is recent")
return
server = Server(ldap_server, get_info=ALL, connect_timeout=8)
conn = Connection(
server,
user=f"{ldap_domain}\\{username}",
password=password,
authentication=NTLM,
)
try:
conn.bind()
except Exception:
logger.warning("LDAP sync: bind failed", exc_info=True)
return
found_usernames: set[str] = set()
chars = "abcdefghijklmnopqrstuvwxyz_"
for char in chars:
try:
conn.search(
search_base=ldap_search_base,
search_filter=f"(&(objectclass=user)(CN={char}*))",
attributes=ALL_ATTRIBUTES,
)
for entry in conn.entries:
try:
attrs = _parse_entry(entry)
found_usernames.add(attrs["username"])
_upsert_from_attrs(db, attrs)
except Exception:
logger.warning("LDAP sync: failed to parse entry", exc_info=True)
db.commit()
except Exception:
logger.warning("LDAP sync: search failed for prefix '%s'", char, exc_info=True)
if letter_delay_seconds > 0:
time.sleep(letter_delay_seconds)
# Deactivate LDAP users no longer in AD
ldap_users = db.query(User).filter(User.pw_hash.is_(None)).all()
for user_obj in ldap_users:
if user_obj.username not in found_usernames:
logger.info("LDAP sync: deactivating %s (not in AD)", user_obj.username)
user_obj.is_active = False
db.commit()
conn.unbind()
logger.info("LDAP sync complete — %d users found", len(found_usernames))

View File

@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, Form, Request from fastapi import APIRouter, BackgroundTasks, Depends, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@ -7,6 +7,7 @@ from app.core.auth import clear_auth_cookie, create_access_token, set_auth_cooki
from app.core.config import get_settings from app.core.config import get_settings
from app.core.database import get_db from app.core.database import get_db
from app.modules.auth.dependencies import get_current_user from app.modules.auth.dependencies import get_current_user
from app.modules.auth.ldap import sync_all_users
from app.modules.auth.schemas import UserOut from app.modules.auth.schemas import UserOut
from app.modules.auth.service import authenticate_user from app.modules.auth.service import authenticate_user
@ -27,6 +28,7 @@ async def login_page(request: Request):
@router.post("/login", response_class=HTMLResponse) @router.post("/login", response_class=HTMLResponse)
async def login( async def login(
request: Request, request: Request,
background_tasks: BackgroundTasks,
username: str = Form(...), username: str = Form(...),
password: str = Form(...), password: str = Form(...),
db: Session = Depends(get_db), db: Session = Depends(get_db),
@ -39,6 +41,17 @@ async def login(
{"nav_items": _NAV, "app_version": "0.1.0", "error": "Ungültige Zugangsdaten."}, {"nav_items": _NAV, "app_version": "0.1.0", "error": "Ungültige Zugangsdaten."},
status_code=200, status_code=200,
) )
if settings.LDAP_ENABLED:
background_tasks.add_task(
sync_all_users,
username,
password,
settings.LDAP_SERVER,
settings.LDAP_DOMAIN,
settings.LDAP_SEARCH_BASE,
settings.LDAP_SYNC_MIN_INTERVAL_HOURS,
settings.LDAP_SYNC_LETTER_DELAY_SECONDS,
)
token = create_access_token(username=user.username, is_admin=user.is_admin) token = create_access_token(username=user.username, is_admin=user.is_admin)
response = RedirectResponse(url="/", status_code=303) response = RedirectResponse(url="/", status_code=303)
set_auth_cookie(response, token) set_auth_cookie(response, token)
@ -47,7 +60,7 @@ async def login(
@router.get("/logout") @router.get("/logout")
async def logout(): async def logout():
response = RedirectResponse(url="/auth/login", status_code=303) response = RedirectResponse(url="/", status_code=303)
clear_auth_cookie(response) clear_auth_cookie(response)
return response return response

View File

@ -21,22 +21,41 @@ def get_user(db: Session, username: str) -> Optional[User]:
return db.query(User).filter(User.username == username).first() return db.query(User).filter(User.username == username).first()
def upsert_ldap_user(db: Session, attrs: dict) -> User:
"""Create or update a User from LDAP attribute dict, commit and return."""
from app.modules.auth.ldap import _upsert_from_attrs
_upsert_from_attrs(db, attrs)
db.commit()
return get_user(db, attrs["username"])
def authenticate_user( def authenticate_user(
db: Session, username: str, password: str, ldap_enabled: bool db: Session, username: str, password: str, ldap_enabled: bool
) -> Optional[User]: ) -> Optional[User]:
user = get_user(db, username) user = get_user(db, username)
if user is None or not user.is_active:
# Explicitly deactivated users are always blocked
if user is not None and not user.is_active:
return None return None
local_ok = user.pw_hash is not None and verify_password(password, user.pw_hash) # Local auth
if local_ok: if user is not None and user.pw_hash is not None:
if verify_password(password, user.pw_hash):
_touch_last_login(db, user)
return user
# LDAP auth
if ldap_enabled:
from app.core.config import get_settings
from app.modules.auth.ldap import ldap_authenticate
s = get_settings()
attrs = ldap_authenticate(username, password, s.LDAP_SERVER, s.LDAP_DOMAIN)
if attrs is None:
return None
user = upsert_ldap_user(db, attrs)
_touch_last_login(db, user) _touch_last_login(db, user)
return user return user
if ldap_enabled:
# LDAP auth implemented in Part 2
pass
return None return None

View File

@ -28,10 +28,19 @@
{{ item.label }} {{ item.label }}
</a> </a>
{% endfor %} {% endfor %}
<div class="ml-auto"> <div class="ml-auto flex items-center gap-3">
<button class="text-white text-sm px-4 py-1.5 rounded border border-white/40 bg-white/15 hover:bg-white/25"> {% if current_user | default(None) %}
Anmelden <span class="text-white/85 text-sm">{{ current_user.username }}</span>
</button> <a href="/auth/logout"
class="text-white text-sm px-4 py-1.5 rounded border border-white/40 bg-white/15 hover:bg-white/25">
Abmelden
</a>
{% else %}
<a href="/auth/login"
class="text-white text-sm px-4 py-1.5 rounded border border-white/40 bg-white/15 hover:bg-white/25">
Anmelden
</a>
{% endif %}
</div> </div>
</nav> </nav>
@ -46,4 +55,4 @@
</footer> </footer>
</body> </body>
</html> </html>

View File

@ -0,0 +1,832 @@
# Auth Part 2: LDAP Integration Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** LDAP-Authentifizierung gegen das Active Directory der TH Nürnberg als Fallback zur lokalen Auth, mit login-getrigentem Background-Sync aller AD-User.
**Architecture:** Neue Datei `app/modules/auth/ldap.py` enthält alle ldap3-Aufrufe (kein FastAPI, keine DB-Abhängigkeiten außer in `sync_all_users`). `service.py` füllt den bestehenden LDAP-Stub und legt User bei LDAP-Erfolg an. `router.py` startet den Sync als FastAPI `BackgroundTask`. `sync_all_users` erzeugt seine eigene DB-Session (long-running task).
**Tech Stack:** ldap3, FastAPI BackgroundTasks, SQLAlchemy, pytest mit unittest.mock
---
### Task 1: Config-Erweiterungen
**Files:**
- Modify: `app/core/config.py`
- Modify: `.env.example`
- Modify: `tests/test_config.py`
- [ ] **Step 1: Update `app/core/config.py`**
```python
from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
APP_ENV: str = "development"
APP_PREFIX: str = ""
DATABASE_URL: str = "sqlite:///./app.db"
SECRET_KEY: str = "changeme-replace-in-production"
LDAP_ENABLED: bool = False
LDAP_SERVER: str = "gso1.ads1.fh-nuernberg.de"
LDAP_DOMAIN: str = "ADS1"
LDAP_SEARCH_BASE: str = "OU=users,OU=EFI,OU=Faculties,DC=ADS1,DC=fh-nuernberg,DC=de"
LDAP_SYNC_MIN_INTERVAL_HOURS: int = 12
LDAP_SYNC_LETTER_DELAY_SECONDS: float = 5.0
ADMIN_USERNAME: str = "admin"
ADMIN_PASSWORD: str = "change_me"
@lru_cache
def get_settings() -> Settings:
return Settings()
```
- [ ] **Step 2: Append to `.env.example`**
Add these two lines in the development section (before the Produktion comment):
```
LDAP_SYNC_MIN_INTERVAL_HOURS=12
LDAP_SYNC_LETTER_DELAY_SECONDS=5.0
```
- [ ] **Step 3: Add assertions to `tests/test_config.py`**
Append to the existing `test_new_config_fields_have_defaults` function:
```python
assert s.LDAP_SYNC_MIN_INTERVAL_HOURS == 12
assert s.LDAP_SYNC_LETTER_DELAY_SECONDS == 5.0
```
- [ ] **Step 4: Run test**
```bash
.venv/bin/pytest tests/test_config.py -v
```
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add app/core/config.py .env.example tests/test_config.py
git commit -m "feat: add LDAP sync interval config fields"
```
---
### Task 2: `app/modules/auth/ldap.py`
**Files:**
- Create: `app/modules/auth/ldap.py`
- Create: `tests/test_ldap.py`
- [ ] **Step 1: Write failing tests**
Create `tests/test_ldap.py`:
```python
import time
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from app.core.database import Base
from app.modules.auth.ldap import (
_has_to_update,
_parse_entry,
_upsert_from_attrs,
ldap_authenticate,
)
from app.modules.auth.models import User
# --- Helpers ---
def _make_entry(**kwargs):
defaults = {
"sAMAccountName": "hofmannol",
"givenName": "Oliver",
"sn": "Hofmann",
"department": "EFI",
"description": "PF",
"accountExpires": None,
}
defaults.update(kwargs)
class _Attr:
def __init__(self, value):
self.value = value
class _Entry:
def __getitem__(self, key):
return _Attr(defaults.get(key))
return _Entry()
@pytest.fixture
def db():
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
Base.metadata.drop_all(bind=engine)
# --- _parse_entry ---
def test_parse_entry_basic():
result = _parse_entry(_make_entry())
assert result["username"] == "hofmannol"
assert result["full_name"] == "Oliver Hofmann"
assert result["department"] == "EFI"
assert result["role"] == "PF"
assert result["account_expires"] is None
def test_parse_entry_truncates_long_fields():
result = _parse_entry(_make_entry(department="A" * 100))
assert len(result["department"]) == 64
def test_parse_entry_uses_nn_for_none_fields():
result = _parse_entry(_make_entry(department=None, description=None))
assert result["department"] == "N.N."
assert result["role"] == "N.N."
def test_parse_entry_handles_none_account_expires():
result = _parse_entry(_make_entry(accountExpires=None))
assert result["account_expires"] is None
# --- _has_to_update ---
def test_has_to_update_true_when_no_ldap_users(db):
assert _has_to_update(db, 12) is True
def test_has_to_update_false_when_recent_ldap_user(db):
user = User(
username="u", full_name="U", pw_hash=None,
updated_at=datetime.now(timezone.utc),
)
db.add(user)
db.commit()
assert _has_to_update(db, 12) is False
def test_has_to_update_true_when_old_ldap_user(db):
old_time = datetime.now(timezone.utc) - timedelta(hours=25)
user = User(username="u", full_name="U", pw_hash=None, updated_at=old_time)
db.add(user)
db.commit()
assert _has_to_update(db, 12) is True
def test_has_to_update_ignores_local_users(db):
# Local user (pw_hash set) should NOT count for sync timing
user = User(
username="admin", full_name="Admin", pw_hash="hash",
updated_at=datetime.now(timezone.utc),
)
db.add(user)
db.commit()
assert _has_to_update(db, 12) is True
# --- _upsert_from_attrs ---
def test_upsert_creates_new_user(db):
attrs = {
"username": "newuser",
"full_name": "New User",
"department": "EFI",
"role": "ST",
"account_expires": None,
}
_upsert_from_attrs(db, attrs)
db.commit()
user = db.query(User).filter(User.username == "newuser").first()
assert user is not None
assert user.full_name == "New User"
assert user.pw_hash is None
def test_upsert_updates_existing_user(db):
user = User(username="existing", full_name="Old Name", pw_hash=None)
db.add(user)
db.commit()
attrs = {
"username": "existing",
"full_name": "New Name",
"department": "EFI",
"role": "PF",
"account_expires": None,
}
_upsert_from_attrs(db, attrs)
db.commit()
db.refresh(user)
assert user.full_name == "New Name"
assert user.role == "PF"
# --- ldap_authenticate ---
@patch("app.modules.auth.ldap.Server")
@patch("app.modules.auth.ldap.Connection")
def test_ldap_authenticate_success(mock_conn_cls, mock_server_cls):
mock_conn = MagicMock()
mock_conn_cls.return_value = mock_conn
mock_conn.extend.standard.who_am_i.return_value = "u:ADS1\\hofmannol"
mock_conn.entries = [_make_entry()]
result = ldap_authenticate("hofmannol", "password", "server.example", "ADS1")
assert result is not None
assert result["username"] == "hofmannol"
assert result["full_name"] == "Oliver Hofmann"
mock_conn.bind.assert_called_once()
mock_conn.unbind.assert_called_once()
@patch("app.modules.auth.ldap.Server")
@patch("app.modules.auth.ldap.Connection")
def test_ldap_authenticate_returns_none_on_socket_error(mock_conn_cls, mock_server_cls):
from ldap3.core.exceptions import LDAPSocketOpenError
mock_conn = MagicMock()
mock_conn_cls.return_value = mock_conn
mock_conn.bind.side_effect = LDAPSocketOpenError("unreachable")
result = ldap_authenticate("hofmannol", "pw", "server.example", "ADS1")
assert result is None
@patch("app.modules.auth.ldap.Server")
@patch("app.modules.auth.ldap.Connection")
def test_ldap_authenticate_returns_none_when_who_am_i_empty(mock_conn_cls, mock_server_cls):
mock_conn = MagicMock()
mock_conn_cls.return_value = mock_conn
mock_conn.extend.standard.who_am_i.return_value = None
result = ldap_authenticate("hofmannol", "wrong", "server.example", "ADS1")
assert result is None
mock_conn.unbind.assert_called_once()
@patch("app.modules.auth.ldap.Server")
@patch("app.modules.auth.ldap.Connection")
def test_ldap_authenticate_returns_none_when_no_entries(mock_conn_cls, mock_server_cls):
mock_conn = MagicMock()
mock_conn_cls.return_value = mock_conn
mock_conn.extend.standard.who_am_i.return_value = "u:ADS1\\hofmannol"
mock_conn.entries = []
result = ldap_authenticate("hofmannol", "pw", "server.example", "ADS1")
assert result is None
```
- [ ] **Step 2: Run to confirm failure**
```bash
.venv/bin/pytest tests/test_ldap.py -v
```
Expected: FAIL — `ModuleNotFoundError: No module named 'app.modules.auth.ldap'`
- [ ] **Step 3: Create `app/modules/auth/ldap.py`**
```python
import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Optional
from ldap3 import ALL, ALL_ATTRIBUTES, NTLM, Connection, Server
from ldap3.core.exceptions import LDAPSocketOpenError
from sqlalchemy.orm import Session
from app.modules.auth.models import User
logger = logging.getLogger(__name__)
def ldap_authenticate(
username: str,
password: str,
ldap_server: str,
ldap_domain: str,
) -> Optional[dict]:
"""NTLM bind + fetch user attributes. Returns attrs dict on success, None on failure."""
server = Server(ldap_server, get_info=ALL, connect_timeout=8)
conn = Connection(
server,
user=f"{ldap_domain}\\{username}",
password=password,
authentication=NTLM,
)
try:
conn.bind()
except LDAPSocketOpenError:
logger.warning("LDAP server %s not reachable", ldap_server)
return None
except Exception:
logger.warning("LDAP bind failed for user %s", username, exc_info=True)
return None
if not conn.extend.standard.who_am_i():
conn.unbind()
return None
conn.search(
search_base=f"DC={ldap_domain},DC=fh-nuernberg,DC=de",
search_filter=f"(&(objectclass=user)(CN={username}))",
attributes=ALL_ATTRIBUTES,
)
if not conn.entries:
conn.unbind()
logger.warning("LDAP: user %s authenticated but no entry found", username)
return None
attrs = _parse_entry(conn.entries[0])
conn.unbind()
return attrs
def _parse_entry(entry) -> dict:
def val(field: str, default: str = "N.N.") -> str:
try:
v = entry[field].value
return str(v)[:64] if v else default
except Exception:
return default
def expire_date(field: str) -> Optional[datetime]:
try:
v = entry[field].value
if v is None:
return None
if isinstance(v, datetime):
dt = v
else:
dt = datetime(v.year, v.month, v.day, tzinfo=timezone.utc)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return None if dt.year >= 2099 else dt
except Exception:
return None
return {
"username": val("sAMAccountName"),
"full_name": (f"{val('givenName', '')} {val('sn', '')}".strip() or "N.N."),
"department": val("department"),
"role": val("description"),
"account_expires": expire_date("accountExpires"),
}
def _has_to_update(db: Session, min_interval_hours: int) -> bool:
"""True if a background sync is due."""
deadline = datetime.now(timezone.utc) - timedelta(hours=min_interval_hours)
latest = (
db.query(User)
.filter(User.pw_hash.is_(None))
.order_by(User.updated_at.desc())
.first()
)
if latest is None:
return True
last = latest.updated_at
if last.tzinfo is None:
last = last.replace(tzinfo=timezone.utc)
return last < deadline
def _upsert_from_attrs(db: Session, attrs: dict) -> None:
"""Create or update a User from AD attribute dict (no commit)."""
user = db.query(User).filter(User.username == attrs["username"]).first()
if user is None:
user = User(username=attrs["username"])
db.add(user)
user.full_name = attrs["full_name"]
user.department = attrs["department"]
user.role = attrs["role"]
user.account_expires = attrs["account_expires"]
def sync_all_users(
username: str,
password: str,
ldap_server: str,
ldap_domain: str,
ldap_search_base: str,
min_interval_hours: int,
letter_delay_seconds: float,
) -> None:
"""Login-triggered background sync of all AD users. Creates its own DB session."""
from app.core.database import SessionLocal
with SessionLocal() as db:
if not _has_to_update(db, min_interval_hours):
logger.info("LDAP sync skipped: last sync is recent")
return
server = Server(ldap_server, get_info=ALL, connect_timeout=8)
conn = Connection(
server,
user=f"{ldap_domain}\\{username}",
password=password,
authentication=NTLM,
)
try:
conn.bind()
except Exception:
logger.warning("LDAP sync: bind failed", exc_info=True)
return
found_usernames: set[str] = set()
chars = "abcdefghijklmnopqrstuvwxyz_"
for char in chars:
try:
conn.search(
search_base=ldap_search_base,
search_filter=f"(&(objectclass=user)(CN={char}*))",
attributes=ALL_ATTRIBUTES,
)
for entry in conn.entries:
try:
attrs = _parse_entry(entry)
found_usernames.add(attrs["username"])
_upsert_from_attrs(db, attrs)
except Exception:
logger.warning("LDAP sync: failed to parse entry", exc_info=True)
db.commit()
except Exception:
logger.warning("LDAP sync: search failed for prefix '%s'", char, exc_info=True)
if letter_delay_seconds > 0:
time.sleep(letter_delay_seconds)
# Deactivate LDAP users no longer in AD
ldap_users = db.query(User).filter(User.pw_hash.is_(None)).all()
for user_obj in ldap_users:
if user_obj.username not in found_usernames:
logger.info("LDAP sync: deactivating %s (not in AD)", user_obj.username)
user_obj.is_active = False
db.commit()
conn.unbind()
logger.info("LDAP sync complete — %d users found", len(found_usernames))
```
- [ ] **Step 4: Run to confirm pass**
```bash
.venv/bin/pytest tests/test_ldap.py -v
```
Expected: All 14 tests PASS.
- [ ] **Step 5: Run full suite**
```bash
.venv/bin/pytest -v
```
Expected: All tests PASS.
- [ ] **Step 6: Commit**
```bash
git add app/modules/auth/ldap.py tests/test_ldap.py
git commit -m "feat: add LDAP functions (authenticate, sync, parse, upsert)"
```
---
### Task 3: service.py — LDAP-Auth-Integration
**Files:**
- Modify: `app/modules/auth/service.py`
- Modify: `tests/test_auth_service.py`
- [ ] **Step 1: Append LDAP tests to `tests/test_auth_service.py`**
```python
# --- LDAP auth integration (uses mocked ldap_authenticate) ---
from unittest.mock import patch as mock_patch
def test_authenticate_creates_user_on_ldap_success(db):
ldap_attrs = {
"username": "newldap",
"full_name": "New LDAP User",
"department": "EFI",
"role": "ST",
"account_expires": None,
}
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs):
user = authenticate_user(db, "newldap", "password", ldap_enabled=True)
assert user is not None
assert user.username == "newldap"
assert user.full_name == "New LDAP User"
assert user.last_login is not None
def test_authenticate_updates_existing_user_on_ldap_success(db):
existing = User(username="existing", full_name="Old Name", pw_hash=None)
db.add(existing)
db.commit()
ldap_attrs = {
"username": "existing",
"full_name": "New Name",
"department": "EFI",
"role": "PF",
"account_expires": None,
}
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs):
user = authenticate_user(db, "existing", "password", ldap_enabled=True)
assert user.full_name == "New Name"
def test_authenticate_returns_none_on_ldap_failure(db):
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=None):
result = authenticate_user(db, "anyone", "wrong", ldap_enabled=True)
assert result is None
def test_authenticate_blocked_user_not_bypassed_by_ldap(db):
blocked = User(username="blocked", full_name="B", pw_hash=None, is_active=False)
db.add(blocked)
db.commit()
ldap_attrs = {"username": "blocked", "full_name": "B", "department": "", "role": "", "account_expires": None}
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs):
result = authenticate_user(db, "blocked", "any", ldap_enabled=True)
assert result is None
```
- [ ] **Step 2: Run to confirm new tests fail**
```bash
.venv/bin/pytest tests/test_auth_service.py -v -k "ldap"
```
Expected: FAIL — `authenticate_user` LDAP stub is still `pass`.
- [ ] **Step 3: Replace `app/modules/auth/service.py`**
```python
from datetime import datetime, timezone
from typing import Optional
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from app.modules.auth.models import User
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(plain: str) -> str:
return pwd_context.hash(plain)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def get_user(db: Session, username: str) -> Optional[User]:
return db.query(User).filter(User.username == username).first()
def upsert_ldap_user(db: Session, attrs: dict) -> User:
"""Create or update a User from LDAP attribute dict, commit and return."""
from app.modules.auth.ldap import _upsert_from_attrs
_upsert_from_attrs(db, attrs)
db.commit()
return get_user(db, attrs["username"])
def authenticate_user(
db: Session, username: str, password: str, ldap_enabled: bool
) -> Optional[User]:
user = get_user(db, username)
# Explicitly deactivated users are always blocked
if user is not None and not user.is_active:
return None
# Local auth
if user is not None and user.pw_hash is not None:
if verify_password(password, user.pw_hash):
_touch_last_login(db, user)
return user
# LDAP auth
if ldap_enabled:
from app.core.config import get_settings
from app.modules.auth.ldap import ldap_authenticate
s = get_settings()
attrs = ldap_authenticate(username, password, s.LDAP_SERVER, s.LDAP_DOMAIN)
if attrs is None:
return None
user = upsert_ldap_user(db, attrs)
_touch_last_login(db, user)
return user
return None
def _touch_last_login(db: Session, user: User) -> None:
user.last_login = datetime.now(timezone.utc)
db.commit()
```
- [ ] **Step 4: Run to confirm all service tests pass**
```bash
.venv/bin/pytest tests/test_auth_service.py -v
```
Expected: All 12 tests PASS (8 existing + 4 new).
- [ ] **Step 5: Run full suite**
```bash
.venv/bin/pytest -v
```
Expected: All tests PASS.
- [ ] **Step 6: Commit**
```bash
git add app/modules/auth/service.py tests/test_auth_service.py
git commit -m "feat: implement LDAP auth fallback in authenticate_user"
```
---
### Task 4: router.py — BackgroundTask für Sync
**Files:**
- Modify: `app/modules/auth/router.py`
- Modify: `tests/test_auth_router.py`
- [ ] **Step 1: Add LDAP login test to `tests/test_auth_router.py`**
Append this test to the existing file:
```python
from unittest.mock import patch as mock_patch
def test_ldap_login_sets_cookie_and_redirects(client, override_db):
"""LDAP login via mocked ldap_authenticate: cookie is set, redirects to /."""
ldap_attrs = {
"username": "ldapuser",
"full_name": "LDAP User",
"department": "EFI",
"role": "ST",
"account_expires": None,
}
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs), \
mock_patch("app.modules.auth.ldap.sync_all_users"), \
mock_patch("app.modules.auth.router.settings") as mock_settings:
mock_settings.LDAP_ENABLED = True
mock_settings.LDAP_SERVER = "test"
mock_settings.LDAP_DOMAIN = "ADS1"
mock_settings.LDAP_SEARCH_BASE = "OU=test"
mock_settings.LDAP_SYNC_MIN_INTERVAL_HOURS = 12
mock_settings.LDAP_SYNC_LETTER_DELAY_SECONDS = 0.0
response = client.post("/auth/login", data={"username": "ldapuser", "password": "any"})
assert response.status_code in (302, 303, 307)
assert "access_token" in response.cookies
```
- [ ] **Step 2: Run to confirm failure**
```bash
.venv/bin/pytest tests/test_auth_router.py::test_ldap_login_sets_cookie_and_redirects -v
```
Expected: FAIL — Router importiert `sync_all_users` und `BackgroundTasks` noch nicht.
- [ ] **Step 3: Replace `app/modules/auth/router.py`**
```python
from fastapi import APIRouter, BackgroundTasks, Depends, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from app.core.auth import clear_auth_cookie, create_access_token, set_auth_cookie
from app.core.config import get_settings
from app.core.database import get_db
from app.modules.auth.dependencies import get_current_user
from app.modules.auth.ldap import sync_all_users
from app.modules.auth.schemas import UserOut
from app.modules.auth.service import authenticate_user
router = APIRouter(prefix="/auth", tags=["auth"])
templates = Jinja2Templates(directory="app/templates")
settings = get_settings()
_NAV: list[dict] = []
@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
return templates.TemplateResponse(
request, "auth/login.html", {"nav_items": _NAV, "app_version": "0.1.0"}
)
@router.post("/login", response_class=HTMLResponse)
async def login(
request: Request,
background_tasks: BackgroundTasks,
username: str = Form(...),
password: str = Form(...),
db: Session = Depends(get_db),
):
user = authenticate_user(db, username, password, ldap_enabled=settings.LDAP_ENABLED)
if user is None:
return templates.TemplateResponse(
request,
"auth/login.html",
{"nav_items": _NAV, "app_version": "0.1.0", "error": "Ungültige Zugangsdaten."},
status_code=200,
)
if settings.LDAP_ENABLED:
background_tasks.add_task(
sync_all_users,
username,
password,
settings.LDAP_SERVER,
settings.LDAP_DOMAIN,
settings.LDAP_SEARCH_BASE,
settings.LDAP_SYNC_MIN_INTERVAL_HOURS,
settings.LDAP_SYNC_LETTER_DELAY_SECONDS,
)
token = create_access_token(username=user.username, is_admin=user.is_admin)
response = RedirectResponse(url="/", status_code=303)
set_auth_cookie(response, token)
return response
@router.get("/logout")
async def logout():
response = RedirectResponse(url="/", status_code=303)
clear_auth_cookie(response)
return response
@router.get("/me", response_model=UserOut)
async def me(user=Depends(get_current_user)):
return user
```
- [ ] **Step 4: Run all router tests**
```bash
.venv/bin/pytest tests/test_auth_router.py -v
```
Expected: All tests PASS.
- [ ] **Step 5: Run full suite**
```bash
.venv/bin/pytest -v
```
Expected: All tests PASS.
- [ ] **Step 6: Commit**
```bash
git add app/modules/auth/router.py tests/test_auth_router.py
git commit -m "feat: trigger LDAP background sync on successful login"
```

View File

@ -0,0 +1,139 @@
# Design Spec: Benutzerverwaltung Part 2 — LDAP-Integration
**Datum:** 2026-04-27
**Status:** Genehmigt
---
## Ziel
LDAP-Authentifizierung gegen das Active Directory der TH Nürnberg als Fallback zur lokalen Auth. Bei erfolgreichem Login wird der lokale User-Eintrag angelegt/aktualisiert und ein Hintergrund-Sync aller AD-User angestoßen (login-getriggert, nicht periodisch).
---
## AD-Verbindungsparameter (bereits in config.py)
| Setting | Wert |
|---|---|
| `LDAP_SERVER` | `gso1.ads1.fh-nuernberg.de` |
| `LDAP_DOMAIN` | `ADS1` |
| `LDAP_SEARCH_BASE` | `OU=users,OU=EFI,OU=Faculties,DC=ADS1,DC=fh-nuernberg,DC=de` |
| `LDAP_ENABLED` | `False` (dev), `True` (prod) |
---
## Section 1: LDAP-Authentifizierung
### Neue Config-Felder (`app/core/config.py`)
```python
LDAP_SYNC_MIN_INTERVAL_HOURS: int = 12 # Mindestabstand zwischen Syncs
LDAP_SYNC_LETTER_DELAY_SECONDS: float = 5.0 # Wartezeit zwischen Buchstaben-Batches
```
### Neue Datei `app/modules/auth/ldap.py`
Reine Funktionen — kein FastAPI, kein DB-Zugriff, nur ldap3.
**`ldap_authenticate(username, password) -> dict | None`**
Öffnet NTLM-Verbindung gegen `LDAP_SERVER`, bindet, liest User-Attribute und schließt die Verbindung wieder. Gibt bei Erfolg ein Dict zurück, bei falschem Passwort oder Verbindungsfehler `None` (Verbindungsfehler wird geloggt). Eine einzige Verbindung für Bind + Attributabfrage.
Rückgabe-Dict:
```python
{
"username": str, # sAMAccountName
"full_name": str, # f"{givenName} {sn}"
"department": str, # department (max 64 Zeichen, "N.N." wenn leer)
"role": str, # description (max 64 Zeichen, "N.N." wenn leer)
"account_expires": datetime | None, # accountExpires, None = kein Ablauf
}
```
**`sync_all_users(username, password, db, settings) -> None`**
Synchronisiert alle AD-User alphabetisch (az + `_`). Läuft in einem `BackgroundTask`-Thread:
1. `has_to_update()` prüfen — wenn letzter Sync < `LDAP_SYNC_MIN_INTERVAL_HOURS`, abbrechen
2. Neue NTLM-Verbindung mit den übergebenen Credentials öffnen
3. Für jeden Buchstaben:
- AD-Einträge mit `CN=<buchstabe>*` laden
- Jeden Entry upserten (full_name, department, role, account_expires)
- `time.sleep(LDAP_SYNC_LETTER_DELAY_SECONDS)`
4. User in lokaler DB die **nicht** in AD gefunden wurden UND `pw_hash is None``is_active = False`
5. Verbindung schließen
`has_to_update()` liest den Zeitstempel des zuletzt geänderten Users aus der DB (entspricht Vorversion).
### Geänderte Datei `app/modules/auth/service.py`
**Geänderter Auth-Flow in `authenticate_user`:**
```
1. user = get_user(db, username)
2. user existiert UND is_active=False → return None (absolute Sperre)
3. user existiert UND pw_hash gesetzt:
→ verify_password(password, pw_hash)
→ OK: last_login setzen, return user
4. ldap_enabled:
→ attrs = ldap_authenticate(username, password)
→ attrs is None: return None
→ OK: upsert_ldap_user(db, attrs), last_login setzen, return user
5. return None
```
**Neue Funktion `upsert_ldap_user(db, username, attrs) -> User`**
Legt User an falls nicht vorhanden, schreibt AD-Felder (full_name, department, role, account_expires). Setzt `pw_hash` nicht (bleibt None für reine LDAP-User).
### Geänderte Datei `app/modules/auth/router.py`
`POST /auth/login` erhält einen `BackgroundTasks`-Parameter. Nach erfolgreichem LDAP-Login wird `sync_all_users` als BackgroundTask gestartet:
```python
@router.post("/login")
async def login(background_tasks: BackgroundTasks, ...):
user = authenticate_user(...)
if user and settings.LDAP_ENABLED:
background_tasks.add_task(sync_all_users, username, password, db, settings)
...
```
---
## Section 2: AD-Sync-Logik
### Sync-Verhalten
| Situation | Aktion |
|---|---|
| User in AD, in DB | full_name, department, role, account_expires aktualisieren |
| User in AD, nicht in DB | Neuen User anlegen (`is_active=True`, `pw_hash=None`) |
| User nicht in AD, in DB, `pw_hash is None` | `is_active = False` |
| User nicht in AD, in DB, `pw_hash` gesetzt | Unverändert (lokaler Account, z. B. Admin) |
### Timing
- Sync läuft nur wenn letzter Sync > `LDAP_SYNC_MIN_INTERVAL_HOURS` her
- Zwischen jedem Buchstaben-Batch: `LDAP_SYNC_LETTER_DELAY_SECONDS` Sleep
- Gesamtdauer ca. 27 × 5s ≈ 23 Minuten (im Hintergrund, blockiert nichts)
---
## Dateistruktur
```
app/modules/auth/
├── ldap.py # neu: ldap_authenticate, ldap_get_user_attrs, sync_all_users
├── service.py # geändert: LDAP-Stub füllen, upsert_ldap_user
└── router.py # geändert: BackgroundTasks in POST /auth/login
app/core/
└── config.py # geändert: LDAP_SYNC_MIN_INTERVAL_HOURS, LDAP_SYNC_LETTER_DELAY_SECONDS
```
---
## Out of Scope
- Serviceaccount / periodischer Celery-Sync (→ mögliche spätere Erweiterung)
- `account_expires`-Prüfung beim Login (Blockierung läuft über AD-Bind-Fehler und `is_active`)
- Admin-UI für User-Verwaltung (→ separater Spec)
- Gruppen aus AD (→ Part 3)

View File

@ -65,8 +65,35 @@ def test_login_unknown_user_shows_error(client):
assert "Ungültige" in response.text assert "Ungültige" in response.text
def test_logout_clears_cookie(client, alice): def test_logout_clears_cookie_and_redirects_to_landing(client, alice):
client.post("/auth/login", data={"username": "alice", "password": "secret123"}) client.post("/auth/login", data={"username": "alice", "password": "secret123"})
response = client.get("/auth/logout") response = client.get("/auth/logout")
assert response.status_code in (302, 303, 307) assert response.status_code in (302, 303, 307)
assert response.headers["location"] == "/"
assert response.cookies.get("access_token", "") == "" assert response.cookies.get("access_token", "") == ""
from unittest.mock import patch as mock_patch
def test_ldap_login_sets_cookie_and_redirects(client, override_db):
"""LDAP login via mocked ldap_authenticate: cookie is set, redirects to /."""
ldap_attrs = {
"username": "ldapuser",
"full_name": "LDAP User",
"department": "EFI",
"role": "ST",
"account_expires": None,
}
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs), \
mock_patch("app.modules.auth.ldap.sync_all_users"), \
mock_patch("app.modules.auth.router.settings") as mock_settings:
mock_settings.LDAP_ENABLED = True
mock_settings.LDAP_SERVER = "test"
mock_settings.LDAP_DOMAIN = "ADS1"
mock_settings.LDAP_SEARCH_BASE = "OU=test"
mock_settings.LDAP_SYNC_MIN_INTERVAL_HOURS = 12
mock_settings.LDAP_SYNC_LETTER_DELAY_SECONDS = 0.0
response = client.post("/auth/login", data={"username": "ldapuser", "password": "any"})
assert response.status_code in (302, 303, 307)
assert "access_token" in response.cookies

View File

@ -67,3 +67,56 @@ def test_authenticate_no_pw_hash_no_ldap(db):
db.add(user) db.add(user)
db.commit() db.commit()
assert authenticate_user(db, "ldaponly", "any", ldap_enabled=False) is None assert authenticate_user(db, "ldaponly", "any", ldap_enabled=False) is None
# --- LDAP auth integration (uses mocked ldap_authenticate) ---
from unittest.mock import patch as mock_patch
def test_authenticate_creates_user_on_ldap_success(db):
ldap_attrs = {
"username": "newldap",
"full_name": "New LDAP User",
"department": "EFI",
"role": "ST",
"account_expires": None,
}
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs):
user = authenticate_user(db, "newldap", "password", ldap_enabled=True)
assert user is not None
assert user.username == "newldap"
assert user.full_name == "New LDAP User"
assert user.last_login is not None
def test_authenticate_updates_existing_user_on_ldap_success(db):
existing = User(username="existing", full_name="Old Name", pw_hash=None)
db.add(existing)
db.commit()
ldap_attrs = {
"username": "existing",
"full_name": "New Name",
"department": "EFI",
"role": "PF",
"account_expires": None,
}
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs):
user = authenticate_user(db, "existing", "password", ldap_enabled=True)
assert user.full_name == "New Name"
def test_authenticate_returns_none_on_ldap_failure(db):
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=None):
result = authenticate_user(db, "anyone", "wrong", ldap_enabled=True)
assert result is None
def test_authenticate_blocked_user_not_bypassed_by_ldap(db):
blocked = User(username="blocked", full_name="B", pw_hash=None, is_active=False)
db.add(blocked)
db.commit()
ldap_attrs = {"username": "blocked", "full_name": "B", "department": "", "role": "", "account_expires": None}
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs):
result = authenticate_user(db, "blocked", "any", ldap_enabled=True)
assert result is None

View File

@ -9,3 +9,5 @@ def test_new_config_fields_have_defaults():
assert s.LDAP_SERVER == "gso1.ads1.fh-nuernberg.de" assert s.LDAP_SERVER == "gso1.ads1.fh-nuernberg.de"
assert s.LDAP_DOMAIN == "ADS1" assert s.LDAP_DOMAIN == "ADS1"
assert "EFI" in s.LDAP_SEARCH_BASE assert "EFI" in s.LDAP_SEARCH_BASE
assert s.LDAP_SYNC_MIN_INTERVAL_HOURS == 12
assert s.LDAP_SYNC_LETTER_DELAY_SECONDS == 5.0

View File

@ -42,33 +42,51 @@ def auth_cookies(override_db):
return {"access_token": token} return {"access_token": token}
def test_landing_without_auth_redirects(client): @pytest.fixture
def admin_cookies(override_db):
user = User(username="adminuser", full_name="Admin User", is_admin=True, pw_hash=hash_password("pw"))
override_db.add(user)
override_db.commit()
token = create_access_token(username="adminuser", is_admin=True)
return {"access_token": token}
def test_landing_accessible_without_auth(client):
response = client.get("/") response = client.get("/")
assert response.status_code == 307
assert "/auth/login" in response.headers["location"]
def test_landing_returns_html(client, auth_cookies):
response = client.get("/", cookies=auth_cookies)
assert response.status_code == 200 assert response.status_code == 200
assert "text/html" in response.headers["content-type"] assert "text/html" in response.headers["content-type"]
def test_landing_contains_title(client, auth_cookies): def test_landing_shows_no_modules_for_anonymous(client):
response = client.get("/", cookies=auth_cookies) response = client.get("/")
assert "University Process Hub" in response.text assert response.status_code == 200
assert "RSS-Feed Server" not in response.text
def test_landing_contains_rss_module(client, auth_cookies): def test_landing_shows_modules_for_authenticated(client, auth_cookies):
response = client.get("/", cookies=auth_cookies) response = client.get("/", cookies=auth_cookies)
assert response.status_code == 200
assert "RSS-Feed Server" in response.text assert "RSS-Feed Server" in response.text
def test_landing_navbar_links_present(client, auth_cookies): def test_landing_contains_title(client):
response = client.get("/", cookies=auth_cookies) response = client.get("/")
assert "Übersicht" in response.text assert "University Process Hub" in response.text
def test_landing_info_strip_shows_db_mode(client, auth_cookies): def test_landing_info_strip_shows_db_mode(client, auth_cookies):
response = client.get("/", cookies=auth_cookies) response = client.get("/", cookies=auth_cookies)
assert "SQLite" in response.text or "MariaDB" in response.text assert "SQLite" in response.text or "MariaDB" in response.text
def test_navbar_shows_login_button_for_anonymous(client):
response = client.get("/")
assert "Anmelden" in response.text
assert "Abmelden" not in response.text
def test_navbar_shows_username_and_logout_when_logged_in(client, auth_cookies):
response = client.get("/", cookies=auth_cookies)
assert "testuser" in response.text
assert "Abmelden" in response.text
assert "Anmelden" not in response.text

209
tests/test_ldap.py Normal file
View File

@ -0,0 +1,209 @@
import time
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from app.core.database import Base
from app.modules.auth.ldap import (
_has_to_update,
_parse_entry,
_upsert_from_attrs,
ldap_authenticate,
)
from app.modules.auth.models import User
# --- Helpers ---
def _make_entry(**kwargs):
defaults = {
"sAMAccountName": "hofmannol",
"givenName": "Oliver",
"sn": "Hofmann",
"department": "EFI",
"description": "PF",
"accountExpires": None,
}
defaults.update(kwargs)
class _Attr:
def __init__(self, value):
self.value = value
class _Entry:
def __getitem__(self, key):
return _Attr(defaults.get(key))
return _Entry()
@pytest.fixture
def db():
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
Base.metadata.drop_all(bind=engine)
# --- _parse_entry ---
def test_parse_entry_basic():
result = _parse_entry(_make_entry())
assert result["username"] == "hofmannol"
assert result["full_name"] == "Oliver Hofmann"
assert result["department"] == "EFI"
assert result["role"] == "PF"
assert result["account_expires"] is None
def test_parse_entry_truncates_long_fields():
result = _parse_entry(_make_entry(department="A" * 100))
assert len(result["department"]) == 64
def test_parse_entry_uses_nn_for_none_fields():
result = _parse_entry(_make_entry(department=None, description=None))
assert result["department"] == "N.N."
assert result["role"] == "N.N."
def test_parse_entry_handles_none_account_expires():
result = _parse_entry(_make_entry(accountExpires=None))
assert result["account_expires"] is None
# --- _has_to_update ---
def test_has_to_update_true_when_no_ldap_users(db):
assert _has_to_update(db, 12) is True
def test_has_to_update_false_when_recent_ldap_user(db):
user = User(
username="u", full_name="U", pw_hash=None,
updated_at=datetime.now(timezone.utc),
)
db.add(user)
db.commit()
assert _has_to_update(db, 12) is False
def test_has_to_update_true_when_old_ldap_user(db):
old_time = datetime.now(timezone.utc) - timedelta(hours=25)
user = User(username="u", full_name="U", pw_hash=None, updated_at=old_time)
db.add(user)
db.commit()
assert _has_to_update(db, 12) is True
def test_has_to_update_ignores_local_users(db):
# Local user (pw_hash set) should NOT count for sync timing
user = User(
username="admin", full_name="Admin", pw_hash="hash",
updated_at=datetime.now(timezone.utc),
)
db.add(user)
db.commit()
assert _has_to_update(db, 12) is True
# --- _upsert_from_attrs ---
def test_upsert_creates_new_user(db):
attrs = {
"username": "newuser",
"full_name": "New User",
"department": "EFI",
"role": "ST",
"account_expires": None,
}
_upsert_from_attrs(db, attrs)
db.commit()
user = db.query(User).filter(User.username == "newuser").first()
assert user is not None
assert user.full_name == "New User"
assert user.pw_hash is None
def test_upsert_updates_existing_user(db):
user = User(username="existing", full_name="Old Name", pw_hash=None)
db.add(user)
db.commit()
attrs = {
"username": "existing",
"full_name": "New Name",
"department": "EFI",
"role": "PF",
"account_expires": None,
}
_upsert_from_attrs(db, attrs)
db.commit()
db.refresh(user)
assert user.full_name == "New Name"
assert user.role == "PF"
# --- ldap_authenticate ---
@patch("app.modules.auth.ldap.Server")
@patch("app.modules.auth.ldap.Connection")
def test_ldap_authenticate_success(mock_conn_cls, mock_server_cls):
mock_conn = MagicMock()
mock_conn_cls.return_value = mock_conn
mock_conn.extend.standard.who_am_i.return_value = "u:ADS1\\hofmannol"
mock_conn.entries = [_make_entry()]
result = ldap_authenticate("hofmannol", "password", "server.example", "ADS1")
assert result is not None
assert result["username"] == "hofmannol"
assert result["full_name"] == "Oliver Hofmann"
mock_conn.bind.assert_called_once()
mock_conn.unbind.assert_called_once()
@patch("app.modules.auth.ldap.Server")
@patch("app.modules.auth.ldap.Connection")
def test_ldap_authenticate_returns_none_on_socket_error(mock_conn_cls, mock_server_cls):
from ldap3.core.exceptions import LDAPSocketOpenError
mock_conn = MagicMock()
mock_conn_cls.return_value = mock_conn
mock_conn.bind.side_effect = LDAPSocketOpenError("unreachable")
result = ldap_authenticate("hofmannol", "pw", "server.example", "ADS1")
assert result is None
@patch("app.modules.auth.ldap.Server")
@patch("app.modules.auth.ldap.Connection")
def test_ldap_authenticate_returns_none_when_who_am_i_empty(mock_conn_cls, mock_server_cls):
mock_conn = MagicMock()
mock_conn_cls.return_value = mock_conn
mock_conn.extend.standard.who_am_i.return_value = None
result = ldap_authenticate("hofmannol", "wrong", "server.example", "ADS1")
assert result is None
mock_conn.unbind.assert_called_once()
@patch("app.modules.auth.ldap.Server")
@patch("app.modules.auth.ldap.Connection")
def test_ldap_authenticate_returns_none_when_no_entries(mock_conn_cls, mock_server_cls):
mock_conn = MagicMock()
mock_conn_cls.return_value = mock_conn
mock_conn.extend.standard.who_am_i.return_value = "u:ADS1\\hofmannol"
mock_conn.entries = []
result = ldap_authenticate("hofmannol", "pw", "server.example", "ADS1")
assert result is None