Compare commits
10 Commits
6e7a316091
...
97ec689341
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
97ec689341 | ||
|
|
21574d3a57 | ||
|
|
c92351786a | ||
|
|
c9d8273680 | ||
|
|
361ba93afc | ||
|
|
929ce0c8d5 | ||
|
|
3832b81174 | ||
|
|
f3781b0de1 | ||
|
|
ffa037e240 | ||
|
|
9e8778f015 |
@ -4,6 +4,8 @@ DATABASE_URL=sqlite:///./app.db
|
|||||||
SECRET_KEY=changeme-replace-in-production
|
SECRET_KEY=changeme-replace-in-production
|
||||||
ADMIN_USERNAME=admin
|
ADMIN_USERNAME=admin
|
||||||
ADMIN_PASSWORD=change_me
|
ADMIN_PASSWORD=change_me
|
||||||
|
LDAP_SYNC_MIN_INTERVAL_HOURS=12
|
||||||
|
LDAP_SYNC_LETTER_DELAY_SECONDS=5.0
|
||||||
|
|
||||||
# Produktion (MariaDB + LDAP):
|
# Produktion (MariaDB + LDAP):
|
||||||
# APP_ENV=production
|
# APP_ENV=production
|
||||||
|
|||||||
@ -14,6 +14,8 @@ class Settings(BaseSettings):
|
|||||||
LDAP_SERVER: str = "gso1.ads1.fh-nuernberg.de"
|
LDAP_SERVER: str = "gso1.ads1.fh-nuernberg.de"
|
||||||
LDAP_DOMAIN: str = "ADS1"
|
LDAP_DOMAIN: str = "ADS1"
|
||||||
LDAP_SEARCH_BASE: str = "OU=users,OU=EFI,OU=Faculties,DC=ADS1,DC=fh-nuernberg,DC=de"
|
LDAP_SEARCH_BASE: str = "OU=users,OU=EFI,OU=Faculties,DC=ADS1,DC=fh-nuernberg,DC=de"
|
||||||
|
LDAP_SYNC_MIN_INTERVAL_HOURS: int = 12
|
||||||
|
LDAP_SYNC_LETTER_DELAY_SECONDS: float = 5.0
|
||||||
ADMIN_USERNAME: str = "admin"
|
ADMIN_USERNAME: str = "admin"
|
||||||
ADMIN_PASSWORD: str = "change_me"
|
ADMIN_PASSWORD: str = "change_me"
|
||||||
|
|
||||||
|
|||||||
120
app/main.py
120
app/main.py
@ -1,3 +1,5 @@
|
|||||||
|
import importlib
|
||||||
|
import logging
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
from fastapi import Depends, FastAPI, Request, WebSocket
|
from fastapi import Depends, FastAPI, Request, WebSocket
|
||||||
@ -6,16 +8,70 @@ from fastapi.staticfiles import StaticFiles
|
|||||||
from fastapi.templating import Jinja2Templates
|
from fastapi.templating import Jinja2Templates
|
||||||
|
|
||||||
from app.core.config import get_settings
|
from app.core.config import get_settings
|
||||||
from app.core.database import get_db
|
from app.modules.auth.dependencies import (
|
||||||
from app.modules.auth.dependencies import RequiresLoginException, get_current_user
|
RequiresLoginException,
|
||||||
|
check_permission,
|
||||||
|
get_current_user_optional,
|
||||||
|
)
|
||||||
from app.modules.auth.router import router as auth_router
|
from app.modules.auth.router import router as auth_router
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
settings = get_settings()
|
settings = get_settings()
|
||||||
|
|
||||||
|
MODULES = [
|
||||||
|
{
|
||||||
|
"slug": "rss",
|
||||||
|
"icon": "📡",
|
||||||
|
"name": "RSS-Feed Server",
|
||||||
|
"description": "Aggregiert und verteilt Neuigkeiten der Fakultät als standardkonformen RSS 2.0 Feed.",
|
||||||
|
"status": "active",
|
||||||
|
"permission": ["authenticated"],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"slug": "kalender",
|
||||||
|
"icon": "📅",
|
||||||
|
"name": "Kalender",
|
||||||
|
"description": "Veranstaltungen, Prüfungstermine und Fristen im iCal-Format.",
|
||||||
|
"status": "planned",
|
||||||
|
"permission": ["authenticated"],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"slug": "benachrichtigungen",
|
||||||
|
"icon": "🔔",
|
||||||
|
"name": "Benachrichtigungen",
|
||||||
|
"description": "Push-Nachrichten und WebSocket-basierte Echtzeit-Alerts für Studierende.",
|
||||||
|
"status": "planned",
|
||||||
|
"permission": ["authenticated"],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"slug": "auslastung",
|
||||||
|
"icon": "📊",
|
||||||
|
"name": "Auslastung",
|
||||||
|
"description": "Raum- und Ressourcenauslastung der Fakultät in Echtzeit.",
|
||||||
|
"status": "planned",
|
||||||
|
"permission": ["authenticated"],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"slug": "lehrveranstaltungen",
|
||||||
|
"icon": "📚",
|
||||||
|
"name": "Lehrveranstaltungen",
|
||||||
|
"description": "Stundenplan-API und Kursinformationen aus dem Campus-System.",
|
||||||
|
"status": "planned",
|
||||||
|
"permission": ["authenticated"],
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
NAV_ITEMS = [
|
||||||
|
{"label": "Übersicht", "url": "/", "active": True},
|
||||||
|
{"label": "RSS-Feeds", "url": "/rss", "active": False},
|
||||||
|
{"label": "Kalender", "url": "/kalender", "active": False},
|
||||||
|
{"label": "Docs", "url": "/docs", "active": False},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def _reset_admin() -> None:
|
def _reset_admin() -> None:
|
||||||
from app.core.database import Base, SessionLocal, engine
|
from app.core.database import Base, SessionLocal, engine
|
||||||
from app.modules.auth.models import User # noqa: F401 — registers table
|
from app.modules.auth.models import User # noqa: F401
|
||||||
from app.modules.auth.service import get_user, hash_password
|
from app.modules.auth.service import get_user, hash_password
|
||||||
|
|
||||||
Base.metadata.create_all(bind=engine)
|
Base.metadata.create_all(bind=engine)
|
||||||
@ -31,7 +87,7 @@ def _reset_admin() -> None:
|
|||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(_app: FastAPI):
|
||||||
_reset_admin()
|
_reset_admin()
|
||||||
yield
|
yield
|
||||||
|
|
||||||
@ -47,66 +103,34 @@ templates = Jinja2Templates(directory="app/templates")
|
|||||||
|
|
||||||
app.include_router(auth_router)
|
app.include_router(auth_router)
|
||||||
|
|
||||||
|
for _module in MODULES:
|
||||||
|
if _module["status"] == "active":
|
||||||
|
try:
|
||||||
|
_mod = importlib.import_module(f"app.modules.{_module['slug']}.router")
|
||||||
|
app.include_router(_mod.router, prefix=f"/{_module['slug']}")
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
logger.warning("Module router not found: app.modules.%s.router", _module["slug"])
|
||||||
|
|
||||||
|
|
||||||
@app.exception_handler(RequiresLoginException)
|
@app.exception_handler(RequiresLoginException)
|
||||||
async def requires_login_handler(request: Request, exc: RequiresLoginException):
|
async def requires_login_handler(_request: Request, _exc: RequiresLoginException):
|
||||||
return RedirectResponse(url="/auth/login", status_code=307)
|
return RedirectResponse(url="/auth/login", status_code=307)
|
||||||
|
|
||||||
|
|
||||||
MODULES = [
|
|
||||||
{
|
|
||||||
"icon": "📡",
|
|
||||||
"name": "RSS-Feed Server",
|
|
||||||
"description": "Aggregiert und verteilt Neuigkeiten der Fakultät als standardkonformen RSS 2.0 Feed.",
|
|
||||||
"status": "active",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"icon": "📅",
|
|
||||||
"name": "Kalender",
|
|
||||||
"description": "Veranstaltungen, Prüfungstermine und Fristen im iCal-Format.",
|
|
||||||
"status": "planned",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"icon": "🔔",
|
|
||||||
"name": "Benachrichtigungen",
|
|
||||||
"description": "Push-Nachrichten und WebSocket-basierte Echtzeit-Alerts für Studierende.",
|
|
||||||
"status": "planned",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"icon": "📊",
|
|
||||||
"name": "Auslastung",
|
|
||||||
"description": "Raum- und Ressourcenauslastung der Fakultät in Echtzeit.",
|
|
||||||
"status": "planned",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"icon": "📚",
|
|
||||||
"name": "Lehrveranstaltungen",
|
|
||||||
"description": "Stundenplan-API und Kursinformationen aus dem Campus-System.",
|
|
||||||
"status": "planned",
|
|
||||||
},
|
|
||||||
]
|
|
||||||
|
|
||||||
NAV_ITEMS = [
|
|
||||||
{"label": "Übersicht", "url": "/", "active": True},
|
|
||||||
{"label": "RSS-Feeds", "url": "/rss", "active": False},
|
|
||||||
{"label": "Kalender", "url": "/kalender", "active": False},
|
|
||||||
{"label": "Docs", "url": "/docs", "active": False},
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def _db_mode() -> str:
|
def _db_mode() -> str:
|
||||||
url = settings.DATABASE_URL
|
url = settings.DATABASE_URL
|
||||||
return "SQLite (dev)" if url.startswith("sqlite") else "MariaDB (prod)"
|
return "SQLite (dev)" if url.startswith("sqlite") else "MariaDB (prod)"
|
||||||
|
|
||||||
|
|
||||||
@app.get("/", response_class=HTMLResponse)
|
@app.get("/", response_class=HTMLResponse)
|
||||||
async def root(request: Request, current_user=Depends(get_current_user)):
|
async def root(request: Request, current_user=Depends(get_current_user_optional)):
|
||||||
|
visible = [m for m in MODULES if check_permission(current_user, m["permission"])]
|
||||||
return templates.TemplateResponse(
|
return templates.TemplateResponse(
|
||||||
request,
|
request,
|
||||||
"index.html",
|
"index.html",
|
||||||
{
|
{
|
||||||
"nav_items": NAV_ITEMS,
|
"nav_items": NAV_ITEMS,
|
||||||
"modules": MODULES,
|
"modules": visible,
|
||||||
"db_mode": _db_mode(),
|
"db_mode": _db_mode(),
|
||||||
"app_version": "0.1.0",
|
"app_version": "0.1.0",
|
||||||
"current_user": current_user,
|
"current_user": current_user,
|
||||||
|
|||||||
181
app/modules/auth/ldap.py
Normal file
181
app/modules/auth/ldap.py
Normal file
@ -0,0 +1,181 @@
|
|||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from ldap3 import ALL, ALL_ATTRIBUTES, NTLM, Connection, Server
|
||||||
|
from ldap3.core.exceptions import LDAPSocketOpenError
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from app.modules.auth.models import User
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def ldap_authenticate(
|
||||||
|
username: str,
|
||||||
|
password: str,
|
||||||
|
ldap_server: str,
|
||||||
|
ldap_domain: str,
|
||||||
|
) -> Optional[dict]:
|
||||||
|
"""NTLM bind + fetch user attributes. Returns attrs dict on success, None on failure."""
|
||||||
|
server = Server(ldap_server, get_info=ALL, connect_timeout=8)
|
||||||
|
conn = Connection(
|
||||||
|
server,
|
||||||
|
user=f"{ldap_domain}\\{username}",
|
||||||
|
password=password,
|
||||||
|
authentication=NTLM,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
conn.bind()
|
||||||
|
except LDAPSocketOpenError:
|
||||||
|
logger.warning("LDAP server %s not reachable", ldap_server)
|
||||||
|
return None
|
||||||
|
except Exception:
|
||||||
|
logger.warning("LDAP bind failed for user %s", username, exc_info=True)
|
||||||
|
return None
|
||||||
|
|
||||||
|
if not conn.extend.standard.who_am_i():
|
||||||
|
conn.unbind()
|
||||||
|
return None
|
||||||
|
|
||||||
|
conn.search(
|
||||||
|
search_base=f"DC={ldap_domain},DC=fh-nuernberg,DC=de",
|
||||||
|
search_filter=f"(&(objectclass=user)(CN={username}))",
|
||||||
|
attributes=ALL_ATTRIBUTES,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not conn.entries:
|
||||||
|
conn.unbind()
|
||||||
|
logger.warning("LDAP: user %s authenticated but no entry found", username)
|
||||||
|
return None
|
||||||
|
|
||||||
|
attrs = _parse_entry(conn.entries[0])
|
||||||
|
conn.unbind()
|
||||||
|
return attrs
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_entry(entry) -> dict:
|
||||||
|
def val(field: str, default: str = "N.N.") -> str:
|
||||||
|
try:
|
||||||
|
v = entry[field].value
|
||||||
|
return str(v)[:64] if v else default
|
||||||
|
except Exception:
|
||||||
|
return default
|
||||||
|
|
||||||
|
def expire_date(field: str) -> Optional[datetime]:
|
||||||
|
try:
|
||||||
|
v = entry[field].value
|
||||||
|
if v is None:
|
||||||
|
return None
|
||||||
|
if isinstance(v, datetime):
|
||||||
|
dt = v
|
||||||
|
else:
|
||||||
|
dt = datetime(v.year, v.month, v.day, tzinfo=timezone.utc)
|
||||||
|
if dt.tzinfo is None:
|
||||||
|
dt = dt.replace(tzinfo=timezone.utc)
|
||||||
|
return None if dt.year >= 2099 else dt
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return {
|
||||||
|
"username": val("sAMAccountName"),
|
||||||
|
"full_name": (f"{val('givenName', '')} {val('sn', '')}".strip() or "N.N."),
|
||||||
|
"department": val("department"),
|
||||||
|
"role": val("description"),
|
||||||
|
"account_expires": expire_date("accountExpires"),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _has_to_update(db: Session, min_interval_hours: int) -> bool:
|
||||||
|
"""True if a background sync is due."""
|
||||||
|
deadline = datetime.now(timezone.utc) - timedelta(hours=min_interval_hours)
|
||||||
|
latest = (
|
||||||
|
db.query(User)
|
||||||
|
.filter(User.pw_hash.is_(None))
|
||||||
|
.order_by(User.updated_at.desc())
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if latest is None:
|
||||||
|
return True
|
||||||
|
last = latest.updated_at
|
||||||
|
if last.tzinfo is None:
|
||||||
|
last = last.replace(tzinfo=timezone.utc)
|
||||||
|
return last < deadline
|
||||||
|
|
||||||
|
|
||||||
|
def _upsert_from_attrs(db: Session, attrs: dict) -> None:
|
||||||
|
"""Create or update a User from AD attribute dict (no commit)."""
|
||||||
|
user = db.query(User).filter(User.username == attrs["username"]).first()
|
||||||
|
if user is None:
|
||||||
|
user = User(username=attrs["username"])
|
||||||
|
db.add(user)
|
||||||
|
user.full_name = attrs["full_name"]
|
||||||
|
user.department = attrs["department"]
|
||||||
|
user.role = attrs["role"]
|
||||||
|
user.account_expires = attrs["account_expires"]
|
||||||
|
|
||||||
|
|
||||||
|
def sync_all_users(
|
||||||
|
username: str,
|
||||||
|
password: str,
|
||||||
|
ldap_server: str,
|
||||||
|
ldap_domain: str,
|
||||||
|
ldap_search_base: str,
|
||||||
|
min_interval_hours: int,
|
||||||
|
letter_delay_seconds: float,
|
||||||
|
) -> None:
|
||||||
|
"""Login-triggered background sync of all AD users. Creates its own DB session."""
|
||||||
|
from app.core.database import SessionLocal
|
||||||
|
|
||||||
|
with SessionLocal() as db:
|
||||||
|
if not _has_to_update(db, min_interval_hours):
|
||||||
|
logger.info("LDAP sync skipped: last sync is recent")
|
||||||
|
return
|
||||||
|
|
||||||
|
server = Server(ldap_server, get_info=ALL, connect_timeout=8)
|
||||||
|
conn = Connection(
|
||||||
|
server,
|
||||||
|
user=f"{ldap_domain}\\{username}",
|
||||||
|
password=password,
|
||||||
|
authentication=NTLM,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
conn.bind()
|
||||||
|
except Exception:
|
||||||
|
logger.warning("LDAP sync: bind failed", exc_info=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
found_usernames: set[str] = set()
|
||||||
|
chars = "abcdefghijklmnopqrstuvwxyz_"
|
||||||
|
|
||||||
|
for char in chars:
|
||||||
|
try:
|
||||||
|
conn.search(
|
||||||
|
search_base=ldap_search_base,
|
||||||
|
search_filter=f"(&(objectclass=user)(CN={char}*))",
|
||||||
|
attributes=ALL_ATTRIBUTES,
|
||||||
|
)
|
||||||
|
for entry in conn.entries:
|
||||||
|
try:
|
||||||
|
attrs = _parse_entry(entry)
|
||||||
|
found_usernames.add(attrs["username"])
|
||||||
|
_upsert_from_attrs(db, attrs)
|
||||||
|
except Exception:
|
||||||
|
logger.warning("LDAP sync: failed to parse entry", exc_info=True)
|
||||||
|
db.commit()
|
||||||
|
except Exception:
|
||||||
|
logger.warning("LDAP sync: search failed for prefix '%s'", char, exc_info=True)
|
||||||
|
|
||||||
|
if letter_delay_seconds > 0:
|
||||||
|
time.sleep(letter_delay_seconds)
|
||||||
|
|
||||||
|
# Deactivate LDAP users no longer in AD
|
||||||
|
ldap_users = db.query(User).filter(User.pw_hash.is_(None)).all()
|
||||||
|
for user_obj in ldap_users:
|
||||||
|
if user_obj.username not in found_usernames:
|
||||||
|
logger.info("LDAP sync: deactivating %s (not in AD)", user_obj.username)
|
||||||
|
user_obj.is_active = False
|
||||||
|
db.commit()
|
||||||
|
conn.unbind()
|
||||||
|
logger.info("LDAP sync complete — %d users found", len(found_usernames))
|
||||||
@ -1,4 +1,4 @@
|
|||||||
from fastapi import APIRouter, Depends, Form, Request
|
from fastapi import APIRouter, BackgroundTasks, Depends, Form, Request
|
||||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||||
from fastapi.templating import Jinja2Templates
|
from fastapi.templating import Jinja2Templates
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
@ -7,6 +7,7 @@ from app.core.auth import clear_auth_cookie, create_access_token, set_auth_cooki
|
|||||||
from app.core.config import get_settings
|
from app.core.config import get_settings
|
||||||
from app.core.database import get_db
|
from app.core.database import get_db
|
||||||
from app.modules.auth.dependencies import get_current_user
|
from app.modules.auth.dependencies import get_current_user
|
||||||
|
from app.modules.auth.ldap import sync_all_users
|
||||||
from app.modules.auth.schemas import UserOut
|
from app.modules.auth.schemas import UserOut
|
||||||
from app.modules.auth.service import authenticate_user
|
from app.modules.auth.service import authenticate_user
|
||||||
|
|
||||||
@ -27,6 +28,7 @@ async def login_page(request: Request):
|
|||||||
@router.post("/login", response_class=HTMLResponse)
|
@router.post("/login", response_class=HTMLResponse)
|
||||||
async def login(
|
async def login(
|
||||||
request: Request,
|
request: Request,
|
||||||
|
background_tasks: BackgroundTasks,
|
||||||
username: str = Form(...),
|
username: str = Form(...),
|
||||||
password: str = Form(...),
|
password: str = Form(...),
|
||||||
db: Session = Depends(get_db),
|
db: Session = Depends(get_db),
|
||||||
@ -39,6 +41,17 @@ async def login(
|
|||||||
{"nav_items": _NAV, "app_version": "0.1.0", "error": "Ungültige Zugangsdaten."},
|
{"nav_items": _NAV, "app_version": "0.1.0", "error": "Ungültige Zugangsdaten."},
|
||||||
status_code=200,
|
status_code=200,
|
||||||
)
|
)
|
||||||
|
if settings.LDAP_ENABLED:
|
||||||
|
background_tasks.add_task(
|
||||||
|
sync_all_users,
|
||||||
|
username,
|
||||||
|
password,
|
||||||
|
settings.LDAP_SERVER,
|
||||||
|
settings.LDAP_DOMAIN,
|
||||||
|
settings.LDAP_SEARCH_BASE,
|
||||||
|
settings.LDAP_SYNC_MIN_INTERVAL_HOURS,
|
||||||
|
settings.LDAP_SYNC_LETTER_DELAY_SECONDS,
|
||||||
|
)
|
||||||
token = create_access_token(username=user.username, is_admin=user.is_admin)
|
token = create_access_token(username=user.username, is_admin=user.is_admin)
|
||||||
response = RedirectResponse(url="/", status_code=303)
|
response = RedirectResponse(url="/", status_code=303)
|
||||||
set_auth_cookie(response, token)
|
set_auth_cookie(response, token)
|
||||||
@ -47,7 +60,7 @@ async def login(
|
|||||||
|
|
||||||
@router.get("/logout")
|
@router.get("/logout")
|
||||||
async def logout():
|
async def logout():
|
||||||
response = RedirectResponse(url="/auth/login", status_code=303)
|
response = RedirectResponse(url="/", status_code=303)
|
||||||
clear_auth_cookie(response)
|
clear_auth_cookie(response)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|||||||
@ -21,22 +21,41 @@ def get_user(db: Session, username: str) -> Optional[User]:
|
|||||||
return db.query(User).filter(User.username == username).first()
|
return db.query(User).filter(User.username == username).first()
|
||||||
|
|
||||||
|
|
||||||
|
def upsert_ldap_user(db: Session, attrs: dict) -> User:
|
||||||
|
"""Create or update a User from LDAP attribute dict, commit and return."""
|
||||||
|
from app.modules.auth.ldap import _upsert_from_attrs
|
||||||
|
_upsert_from_attrs(db, attrs)
|
||||||
|
db.commit()
|
||||||
|
return get_user(db, attrs["username"])
|
||||||
|
|
||||||
|
|
||||||
def authenticate_user(
|
def authenticate_user(
|
||||||
db: Session, username: str, password: str, ldap_enabled: bool
|
db: Session, username: str, password: str, ldap_enabled: bool
|
||||||
) -> Optional[User]:
|
) -> Optional[User]:
|
||||||
user = get_user(db, username)
|
user = get_user(db, username)
|
||||||
if user is None or not user.is_active:
|
|
||||||
|
# Explicitly deactivated users are always blocked
|
||||||
|
if user is not None and not user.is_active:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
local_ok = user.pw_hash is not None and verify_password(password, user.pw_hash)
|
# Local auth
|
||||||
if local_ok:
|
if user is not None and user.pw_hash is not None:
|
||||||
|
if verify_password(password, user.pw_hash):
|
||||||
|
_touch_last_login(db, user)
|
||||||
|
return user
|
||||||
|
|
||||||
|
# LDAP auth
|
||||||
|
if ldap_enabled:
|
||||||
|
from app.core.config import get_settings
|
||||||
|
from app.modules.auth.ldap import ldap_authenticate
|
||||||
|
s = get_settings()
|
||||||
|
attrs = ldap_authenticate(username, password, s.LDAP_SERVER, s.LDAP_DOMAIN)
|
||||||
|
if attrs is None:
|
||||||
|
return None
|
||||||
|
user = upsert_ldap_user(db, attrs)
|
||||||
_touch_last_login(db, user)
|
_touch_last_login(db, user)
|
||||||
return user
|
return user
|
||||||
|
|
||||||
if ldap_enabled:
|
|
||||||
# LDAP auth implemented in Part 2
|
|
||||||
pass
|
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -28,10 +28,19 @@
|
|||||||
{{ item.label }}
|
{{ item.label }}
|
||||||
</a>
|
</a>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
<div class="ml-auto">
|
<div class="ml-auto flex items-center gap-3">
|
||||||
<button class="text-white text-sm px-4 py-1.5 rounded border border-white/40 bg-white/15 hover:bg-white/25">
|
{% if current_user | default(None) %}
|
||||||
Anmelden
|
<span class="text-white/85 text-sm">{{ current_user.username }}</span>
|
||||||
</button>
|
<a href="/auth/logout"
|
||||||
|
class="text-white text-sm px-4 py-1.5 rounded border border-white/40 bg-white/15 hover:bg-white/25">
|
||||||
|
Abmelden
|
||||||
|
</a>
|
||||||
|
{% else %}
|
||||||
|
<a href="/auth/login"
|
||||||
|
class="text-white text-sm px-4 py-1.5 rounded border border-white/40 bg-white/15 hover:bg-white/25">
|
||||||
|
Anmelden
|
||||||
|
</a>
|
||||||
|
{% endif %}
|
||||||
</div>
|
</div>
|
||||||
</nav>
|
</nav>
|
||||||
|
|
||||||
@ -46,4 +55,4 @@
|
|||||||
</footer>
|
</footer>
|
||||||
|
|
||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
|
|||||||
832
docs/superpowers/plans/2026-04-27-auth-part2-ldap.md
Normal file
832
docs/superpowers/plans/2026-04-27-auth-part2-ldap.md
Normal file
@ -0,0 +1,832 @@
|
|||||||
|
# Auth Part 2: LDAP Integration Implementation Plan
|
||||||
|
|
||||||
|
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||||
|
|
||||||
|
**Goal:** LDAP-Authentifizierung gegen das Active Directory der TH Nürnberg als Fallback zur lokalen Auth, mit login-getrigentem Background-Sync aller AD-User.
|
||||||
|
|
||||||
|
**Architecture:** Neue Datei `app/modules/auth/ldap.py` enthält alle ldap3-Aufrufe (kein FastAPI, keine DB-Abhängigkeiten außer in `sync_all_users`). `service.py` füllt den bestehenden LDAP-Stub und legt User bei LDAP-Erfolg an. `router.py` startet den Sync als FastAPI `BackgroundTask`. `sync_all_users` erzeugt seine eigene DB-Session (long-running task).
|
||||||
|
|
||||||
|
**Tech Stack:** ldap3, FastAPI BackgroundTasks, SQLAlchemy, pytest mit unittest.mock
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 1: Config-Erweiterungen
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `app/core/config.py`
|
||||||
|
- Modify: `.env.example`
|
||||||
|
- Modify: `tests/test_config.py`
|
||||||
|
|
||||||
|
- [ ] **Step 1: Update `app/core/config.py`**
|
||||||
|
|
||||||
|
```python
|
||||||
|
from functools import lru_cache
|
||||||
|
|
||||||
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||||
|
|
||||||
|
|
||||||
|
class Settings(BaseSettings):
|
||||||
|
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
|
||||||
|
|
||||||
|
APP_ENV: str = "development"
|
||||||
|
APP_PREFIX: str = ""
|
||||||
|
DATABASE_URL: str = "sqlite:///./app.db"
|
||||||
|
SECRET_KEY: str = "changeme-replace-in-production"
|
||||||
|
LDAP_ENABLED: bool = False
|
||||||
|
LDAP_SERVER: str = "gso1.ads1.fh-nuernberg.de"
|
||||||
|
LDAP_DOMAIN: str = "ADS1"
|
||||||
|
LDAP_SEARCH_BASE: str = "OU=users,OU=EFI,OU=Faculties,DC=ADS1,DC=fh-nuernberg,DC=de"
|
||||||
|
LDAP_SYNC_MIN_INTERVAL_HOURS: int = 12
|
||||||
|
LDAP_SYNC_LETTER_DELAY_SECONDS: float = 5.0
|
||||||
|
ADMIN_USERNAME: str = "admin"
|
||||||
|
ADMIN_PASSWORD: str = "change_me"
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache
|
||||||
|
def get_settings() -> Settings:
|
||||||
|
return Settings()
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: Append to `.env.example`**
|
||||||
|
|
||||||
|
Add these two lines in the development section (before the Produktion comment):
|
||||||
|
|
||||||
|
```
|
||||||
|
LDAP_SYNC_MIN_INTERVAL_HOURS=12
|
||||||
|
LDAP_SYNC_LETTER_DELAY_SECONDS=5.0
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 3: Add assertions to `tests/test_config.py`**
|
||||||
|
|
||||||
|
Append to the existing `test_new_config_fields_have_defaults` function:
|
||||||
|
|
||||||
|
```python
|
||||||
|
assert s.LDAP_SYNC_MIN_INTERVAL_HOURS == 12
|
||||||
|
assert s.LDAP_SYNC_LETTER_DELAY_SECONDS == 5.0
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 4: Run test**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv/bin/pytest tests/test_config.py -v
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected: PASS.
|
||||||
|
|
||||||
|
- [ ] **Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add app/core/config.py .env.example tests/test_config.py
|
||||||
|
git commit -m "feat: add LDAP sync interval config fields"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 2: `app/modules/auth/ldap.py`
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Create: `app/modules/auth/ldap.py`
|
||||||
|
- Create: `tests/test_ldap.py`
|
||||||
|
|
||||||
|
- [ ] **Step 1: Write failing tests**
|
||||||
|
|
||||||
|
Create `tests/test_ldap.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
from sqlalchemy.orm import sessionmaker
|
||||||
|
from sqlalchemy.pool import StaticPool
|
||||||
|
|
||||||
|
from app.core.database import Base
|
||||||
|
from app.modules.auth.ldap import (
|
||||||
|
_has_to_update,
|
||||||
|
_parse_entry,
|
||||||
|
_upsert_from_attrs,
|
||||||
|
ldap_authenticate,
|
||||||
|
)
|
||||||
|
from app.modules.auth.models import User
|
||||||
|
|
||||||
|
|
||||||
|
# --- Helpers ---
|
||||||
|
|
||||||
|
def _make_entry(**kwargs):
|
||||||
|
defaults = {
|
||||||
|
"sAMAccountName": "hofmannol",
|
||||||
|
"givenName": "Oliver",
|
||||||
|
"sn": "Hofmann",
|
||||||
|
"department": "EFI",
|
||||||
|
"description": "PF",
|
||||||
|
"accountExpires": None,
|
||||||
|
}
|
||||||
|
defaults.update(kwargs)
|
||||||
|
|
||||||
|
class _Attr:
|
||||||
|
def __init__(self, value):
|
||||||
|
self.value = value
|
||||||
|
|
||||||
|
class _Entry:
|
||||||
|
def __getitem__(self, key):
|
||||||
|
return _Attr(defaults.get(key))
|
||||||
|
|
||||||
|
return _Entry()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def db():
|
||||||
|
engine = create_engine(
|
||||||
|
"sqlite:///:memory:",
|
||||||
|
connect_args={"check_same_thread": False},
|
||||||
|
poolclass=StaticPool,
|
||||||
|
)
|
||||||
|
Base.metadata.create_all(bind=engine)
|
||||||
|
Session = sessionmaker(bind=engine)
|
||||||
|
session = Session()
|
||||||
|
yield session
|
||||||
|
session.close()
|
||||||
|
Base.metadata.drop_all(bind=engine)
|
||||||
|
|
||||||
|
|
||||||
|
# --- _parse_entry ---
|
||||||
|
|
||||||
|
def test_parse_entry_basic():
|
||||||
|
result = _parse_entry(_make_entry())
|
||||||
|
assert result["username"] == "hofmannol"
|
||||||
|
assert result["full_name"] == "Oliver Hofmann"
|
||||||
|
assert result["department"] == "EFI"
|
||||||
|
assert result["role"] == "PF"
|
||||||
|
assert result["account_expires"] is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_entry_truncates_long_fields():
|
||||||
|
result = _parse_entry(_make_entry(department="A" * 100))
|
||||||
|
assert len(result["department"]) == 64
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_entry_uses_nn_for_none_fields():
|
||||||
|
result = _parse_entry(_make_entry(department=None, description=None))
|
||||||
|
assert result["department"] == "N.N."
|
||||||
|
assert result["role"] == "N.N."
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_entry_handles_none_account_expires():
|
||||||
|
result = _parse_entry(_make_entry(accountExpires=None))
|
||||||
|
assert result["account_expires"] is None
|
||||||
|
|
||||||
|
|
||||||
|
# --- _has_to_update ---
|
||||||
|
|
||||||
|
def test_has_to_update_true_when_no_ldap_users(db):
|
||||||
|
assert _has_to_update(db, 12) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_has_to_update_false_when_recent_ldap_user(db):
|
||||||
|
user = User(
|
||||||
|
username="u", full_name="U", pw_hash=None,
|
||||||
|
updated_at=datetime.now(timezone.utc),
|
||||||
|
)
|
||||||
|
db.add(user)
|
||||||
|
db.commit()
|
||||||
|
assert _has_to_update(db, 12) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_has_to_update_true_when_old_ldap_user(db):
|
||||||
|
old_time = datetime.now(timezone.utc) - timedelta(hours=25)
|
||||||
|
user = User(username="u", full_name="U", pw_hash=None, updated_at=old_time)
|
||||||
|
db.add(user)
|
||||||
|
db.commit()
|
||||||
|
assert _has_to_update(db, 12) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_has_to_update_ignores_local_users(db):
|
||||||
|
# Local user (pw_hash set) should NOT count for sync timing
|
||||||
|
user = User(
|
||||||
|
username="admin", full_name="Admin", pw_hash="hash",
|
||||||
|
updated_at=datetime.now(timezone.utc),
|
||||||
|
)
|
||||||
|
db.add(user)
|
||||||
|
db.commit()
|
||||||
|
assert _has_to_update(db, 12) is True
|
||||||
|
|
||||||
|
|
||||||
|
# --- _upsert_from_attrs ---
|
||||||
|
|
||||||
|
def test_upsert_creates_new_user(db):
|
||||||
|
attrs = {
|
||||||
|
"username": "newuser",
|
||||||
|
"full_name": "New User",
|
||||||
|
"department": "EFI",
|
||||||
|
"role": "ST",
|
||||||
|
"account_expires": None,
|
||||||
|
}
|
||||||
|
_upsert_from_attrs(db, attrs)
|
||||||
|
db.commit()
|
||||||
|
user = db.query(User).filter(User.username == "newuser").first()
|
||||||
|
assert user is not None
|
||||||
|
assert user.full_name == "New User"
|
||||||
|
assert user.pw_hash is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_upsert_updates_existing_user(db):
|
||||||
|
user = User(username="existing", full_name="Old Name", pw_hash=None)
|
||||||
|
db.add(user)
|
||||||
|
db.commit()
|
||||||
|
attrs = {
|
||||||
|
"username": "existing",
|
||||||
|
"full_name": "New Name",
|
||||||
|
"department": "EFI",
|
||||||
|
"role": "PF",
|
||||||
|
"account_expires": None,
|
||||||
|
}
|
||||||
|
_upsert_from_attrs(db, attrs)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(user)
|
||||||
|
assert user.full_name == "New Name"
|
||||||
|
assert user.role == "PF"
|
||||||
|
|
||||||
|
|
||||||
|
# --- ldap_authenticate ---
|
||||||
|
|
||||||
|
@patch("app.modules.auth.ldap.Server")
|
||||||
|
@patch("app.modules.auth.ldap.Connection")
|
||||||
|
def test_ldap_authenticate_success(mock_conn_cls, mock_server_cls):
|
||||||
|
mock_conn = MagicMock()
|
||||||
|
mock_conn_cls.return_value = mock_conn
|
||||||
|
mock_conn.extend.standard.who_am_i.return_value = "u:ADS1\\hofmannol"
|
||||||
|
mock_conn.entries = [_make_entry()]
|
||||||
|
|
||||||
|
result = ldap_authenticate("hofmannol", "password", "server.example", "ADS1")
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert result["username"] == "hofmannol"
|
||||||
|
assert result["full_name"] == "Oliver Hofmann"
|
||||||
|
mock_conn.bind.assert_called_once()
|
||||||
|
mock_conn.unbind.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
@patch("app.modules.auth.ldap.Server")
|
||||||
|
@patch("app.modules.auth.ldap.Connection")
|
||||||
|
def test_ldap_authenticate_returns_none_on_socket_error(mock_conn_cls, mock_server_cls):
|
||||||
|
from ldap3.core.exceptions import LDAPSocketOpenError
|
||||||
|
mock_conn = MagicMock()
|
||||||
|
mock_conn_cls.return_value = mock_conn
|
||||||
|
mock_conn.bind.side_effect = LDAPSocketOpenError("unreachable")
|
||||||
|
|
||||||
|
result = ldap_authenticate("hofmannol", "pw", "server.example", "ADS1")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
@patch("app.modules.auth.ldap.Server")
|
||||||
|
@patch("app.modules.auth.ldap.Connection")
|
||||||
|
def test_ldap_authenticate_returns_none_when_who_am_i_empty(mock_conn_cls, mock_server_cls):
|
||||||
|
mock_conn = MagicMock()
|
||||||
|
mock_conn_cls.return_value = mock_conn
|
||||||
|
mock_conn.extend.standard.who_am_i.return_value = None
|
||||||
|
|
||||||
|
result = ldap_authenticate("hofmannol", "wrong", "server.example", "ADS1")
|
||||||
|
assert result is None
|
||||||
|
mock_conn.unbind.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
@patch("app.modules.auth.ldap.Server")
|
||||||
|
@patch("app.modules.auth.ldap.Connection")
|
||||||
|
def test_ldap_authenticate_returns_none_when_no_entries(mock_conn_cls, mock_server_cls):
|
||||||
|
mock_conn = MagicMock()
|
||||||
|
mock_conn_cls.return_value = mock_conn
|
||||||
|
mock_conn.extend.standard.who_am_i.return_value = "u:ADS1\\hofmannol"
|
||||||
|
mock_conn.entries = []
|
||||||
|
|
||||||
|
result = ldap_authenticate("hofmannol", "pw", "server.example", "ADS1")
|
||||||
|
assert result is None
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: Run to confirm failure**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv/bin/pytest tests/test_ldap.py -v
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected: FAIL — `ModuleNotFoundError: No module named 'app.modules.auth.ldap'`
|
||||||
|
|
||||||
|
- [ ] **Step 3: Create `app/modules/auth/ldap.py`**
|
||||||
|
|
||||||
|
```python
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from ldap3 import ALL, ALL_ATTRIBUTES, NTLM, Connection, Server
|
||||||
|
from ldap3.core.exceptions import LDAPSocketOpenError
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from app.modules.auth.models import User
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def ldap_authenticate(
|
||||||
|
username: str,
|
||||||
|
password: str,
|
||||||
|
ldap_server: str,
|
||||||
|
ldap_domain: str,
|
||||||
|
) -> Optional[dict]:
|
||||||
|
"""NTLM bind + fetch user attributes. Returns attrs dict on success, None on failure."""
|
||||||
|
server = Server(ldap_server, get_info=ALL, connect_timeout=8)
|
||||||
|
conn = Connection(
|
||||||
|
server,
|
||||||
|
user=f"{ldap_domain}\\{username}",
|
||||||
|
password=password,
|
||||||
|
authentication=NTLM,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
conn.bind()
|
||||||
|
except LDAPSocketOpenError:
|
||||||
|
logger.warning("LDAP server %s not reachable", ldap_server)
|
||||||
|
return None
|
||||||
|
except Exception:
|
||||||
|
logger.warning("LDAP bind failed for user %s", username, exc_info=True)
|
||||||
|
return None
|
||||||
|
|
||||||
|
if not conn.extend.standard.who_am_i():
|
||||||
|
conn.unbind()
|
||||||
|
return None
|
||||||
|
|
||||||
|
conn.search(
|
||||||
|
search_base=f"DC={ldap_domain},DC=fh-nuernberg,DC=de",
|
||||||
|
search_filter=f"(&(objectclass=user)(CN={username}))",
|
||||||
|
attributes=ALL_ATTRIBUTES,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not conn.entries:
|
||||||
|
conn.unbind()
|
||||||
|
logger.warning("LDAP: user %s authenticated but no entry found", username)
|
||||||
|
return None
|
||||||
|
|
||||||
|
attrs = _parse_entry(conn.entries[0])
|
||||||
|
conn.unbind()
|
||||||
|
return attrs
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_entry(entry) -> dict:
|
||||||
|
def val(field: str, default: str = "N.N.") -> str:
|
||||||
|
try:
|
||||||
|
v = entry[field].value
|
||||||
|
return str(v)[:64] if v else default
|
||||||
|
except Exception:
|
||||||
|
return default
|
||||||
|
|
||||||
|
def expire_date(field: str) -> Optional[datetime]:
|
||||||
|
try:
|
||||||
|
v = entry[field].value
|
||||||
|
if v is None:
|
||||||
|
return None
|
||||||
|
if isinstance(v, datetime):
|
||||||
|
dt = v
|
||||||
|
else:
|
||||||
|
dt = datetime(v.year, v.month, v.day, tzinfo=timezone.utc)
|
||||||
|
if dt.tzinfo is None:
|
||||||
|
dt = dt.replace(tzinfo=timezone.utc)
|
||||||
|
return None if dt.year >= 2099 else dt
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return {
|
||||||
|
"username": val("sAMAccountName"),
|
||||||
|
"full_name": (f"{val('givenName', '')} {val('sn', '')}".strip() or "N.N."),
|
||||||
|
"department": val("department"),
|
||||||
|
"role": val("description"),
|
||||||
|
"account_expires": expire_date("accountExpires"),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _has_to_update(db: Session, min_interval_hours: int) -> bool:
|
||||||
|
"""True if a background sync is due."""
|
||||||
|
deadline = datetime.now(timezone.utc) - timedelta(hours=min_interval_hours)
|
||||||
|
latest = (
|
||||||
|
db.query(User)
|
||||||
|
.filter(User.pw_hash.is_(None))
|
||||||
|
.order_by(User.updated_at.desc())
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if latest is None:
|
||||||
|
return True
|
||||||
|
last = latest.updated_at
|
||||||
|
if last.tzinfo is None:
|
||||||
|
last = last.replace(tzinfo=timezone.utc)
|
||||||
|
return last < deadline
|
||||||
|
|
||||||
|
|
||||||
|
def _upsert_from_attrs(db: Session, attrs: dict) -> None:
|
||||||
|
"""Create or update a User from AD attribute dict (no commit)."""
|
||||||
|
user = db.query(User).filter(User.username == attrs["username"]).first()
|
||||||
|
if user is None:
|
||||||
|
user = User(username=attrs["username"])
|
||||||
|
db.add(user)
|
||||||
|
user.full_name = attrs["full_name"]
|
||||||
|
user.department = attrs["department"]
|
||||||
|
user.role = attrs["role"]
|
||||||
|
user.account_expires = attrs["account_expires"]
|
||||||
|
|
||||||
|
|
||||||
|
def sync_all_users(
|
||||||
|
username: str,
|
||||||
|
password: str,
|
||||||
|
ldap_server: str,
|
||||||
|
ldap_domain: str,
|
||||||
|
ldap_search_base: str,
|
||||||
|
min_interval_hours: int,
|
||||||
|
letter_delay_seconds: float,
|
||||||
|
) -> None:
|
||||||
|
"""Login-triggered background sync of all AD users. Creates its own DB session."""
|
||||||
|
from app.core.database import SessionLocal
|
||||||
|
|
||||||
|
with SessionLocal() as db:
|
||||||
|
if not _has_to_update(db, min_interval_hours):
|
||||||
|
logger.info("LDAP sync skipped: last sync is recent")
|
||||||
|
return
|
||||||
|
|
||||||
|
server = Server(ldap_server, get_info=ALL, connect_timeout=8)
|
||||||
|
conn = Connection(
|
||||||
|
server,
|
||||||
|
user=f"{ldap_domain}\\{username}",
|
||||||
|
password=password,
|
||||||
|
authentication=NTLM,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
conn.bind()
|
||||||
|
except Exception:
|
||||||
|
logger.warning("LDAP sync: bind failed", exc_info=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
found_usernames: set[str] = set()
|
||||||
|
chars = "abcdefghijklmnopqrstuvwxyz_"
|
||||||
|
|
||||||
|
for char in chars:
|
||||||
|
try:
|
||||||
|
conn.search(
|
||||||
|
search_base=ldap_search_base,
|
||||||
|
search_filter=f"(&(objectclass=user)(CN={char}*))",
|
||||||
|
attributes=ALL_ATTRIBUTES,
|
||||||
|
)
|
||||||
|
for entry in conn.entries:
|
||||||
|
try:
|
||||||
|
attrs = _parse_entry(entry)
|
||||||
|
found_usernames.add(attrs["username"])
|
||||||
|
_upsert_from_attrs(db, attrs)
|
||||||
|
except Exception:
|
||||||
|
logger.warning("LDAP sync: failed to parse entry", exc_info=True)
|
||||||
|
db.commit()
|
||||||
|
except Exception:
|
||||||
|
logger.warning("LDAP sync: search failed for prefix '%s'", char, exc_info=True)
|
||||||
|
|
||||||
|
if letter_delay_seconds > 0:
|
||||||
|
time.sleep(letter_delay_seconds)
|
||||||
|
|
||||||
|
# Deactivate LDAP users no longer in AD
|
||||||
|
ldap_users = db.query(User).filter(User.pw_hash.is_(None)).all()
|
||||||
|
for user_obj in ldap_users:
|
||||||
|
if user_obj.username not in found_usernames:
|
||||||
|
logger.info("LDAP sync: deactivating %s (not in AD)", user_obj.username)
|
||||||
|
user_obj.is_active = False
|
||||||
|
db.commit()
|
||||||
|
conn.unbind()
|
||||||
|
logger.info("LDAP sync complete — %d users found", len(found_usernames))
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 4: Run to confirm pass**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv/bin/pytest tests/test_ldap.py -v
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected: All 14 tests PASS.
|
||||||
|
|
||||||
|
- [ ] **Step 5: Run full suite**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv/bin/pytest -v
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected: All tests PASS.
|
||||||
|
|
||||||
|
- [ ] **Step 6: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add app/modules/auth/ldap.py tests/test_ldap.py
|
||||||
|
git commit -m "feat: add LDAP functions (authenticate, sync, parse, upsert)"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 3: service.py — LDAP-Auth-Integration
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `app/modules/auth/service.py`
|
||||||
|
- Modify: `tests/test_auth_service.py`
|
||||||
|
|
||||||
|
- [ ] **Step 1: Append LDAP tests to `tests/test_auth_service.py`**
|
||||||
|
|
||||||
|
```python
|
||||||
|
# --- LDAP auth integration (uses mocked ldap_authenticate) ---
|
||||||
|
|
||||||
|
from unittest.mock import patch as mock_patch
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticate_creates_user_on_ldap_success(db):
|
||||||
|
ldap_attrs = {
|
||||||
|
"username": "newldap",
|
||||||
|
"full_name": "New LDAP User",
|
||||||
|
"department": "EFI",
|
||||||
|
"role": "ST",
|
||||||
|
"account_expires": None,
|
||||||
|
}
|
||||||
|
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs):
|
||||||
|
user = authenticate_user(db, "newldap", "password", ldap_enabled=True)
|
||||||
|
assert user is not None
|
||||||
|
assert user.username == "newldap"
|
||||||
|
assert user.full_name == "New LDAP User"
|
||||||
|
assert user.last_login is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticate_updates_existing_user_on_ldap_success(db):
|
||||||
|
existing = User(username="existing", full_name="Old Name", pw_hash=None)
|
||||||
|
db.add(existing)
|
||||||
|
db.commit()
|
||||||
|
ldap_attrs = {
|
||||||
|
"username": "existing",
|
||||||
|
"full_name": "New Name",
|
||||||
|
"department": "EFI",
|
||||||
|
"role": "PF",
|
||||||
|
"account_expires": None,
|
||||||
|
}
|
||||||
|
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs):
|
||||||
|
user = authenticate_user(db, "existing", "password", ldap_enabled=True)
|
||||||
|
assert user.full_name == "New Name"
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticate_returns_none_on_ldap_failure(db):
|
||||||
|
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=None):
|
||||||
|
result = authenticate_user(db, "anyone", "wrong", ldap_enabled=True)
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticate_blocked_user_not_bypassed_by_ldap(db):
|
||||||
|
blocked = User(username="blocked", full_name="B", pw_hash=None, is_active=False)
|
||||||
|
db.add(blocked)
|
||||||
|
db.commit()
|
||||||
|
ldap_attrs = {"username": "blocked", "full_name": "B", "department": "", "role": "", "account_expires": None}
|
||||||
|
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs):
|
||||||
|
result = authenticate_user(db, "blocked", "any", ldap_enabled=True)
|
||||||
|
assert result is None
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: Run to confirm new tests fail**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv/bin/pytest tests/test_auth_service.py -v -k "ldap"
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected: FAIL — `authenticate_user` LDAP stub is still `pass`.
|
||||||
|
|
||||||
|
- [ ] **Step 3: Replace `app/modules/auth/service.py`**
|
||||||
|
|
||||||
|
```python
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from passlib.context import CryptContext
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from app.modules.auth.models import User
|
||||||
|
|
||||||
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||||
|
|
||||||
|
|
||||||
|
def hash_password(plain: str) -> str:
|
||||||
|
return pwd_context.hash(plain)
|
||||||
|
|
||||||
|
|
||||||
|
def verify_password(plain: str, hashed: str) -> bool:
|
||||||
|
return pwd_context.verify(plain, hashed)
|
||||||
|
|
||||||
|
|
||||||
|
def get_user(db: Session, username: str) -> Optional[User]:
|
||||||
|
return db.query(User).filter(User.username == username).first()
|
||||||
|
|
||||||
|
|
||||||
|
def upsert_ldap_user(db: Session, attrs: dict) -> User:
|
||||||
|
"""Create or update a User from LDAP attribute dict, commit and return."""
|
||||||
|
from app.modules.auth.ldap import _upsert_from_attrs
|
||||||
|
_upsert_from_attrs(db, attrs)
|
||||||
|
db.commit()
|
||||||
|
return get_user(db, attrs["username"])
|
||||||
|
|
||||||
|
|
||||||
|
def authenticate_user(
|
||||||
|
db: Session, username: str, password: str, ldap_enabled: bool
|
||||||
|
) -> Optional[User]:
|
||||||
|
user = get_user(db, username)
|
||||||
|
|
||||||
|
# Explicitly deactivated users are always blocked
|
||||||
|
if user is not None and not user.is_active:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Local auth
|
||||||
|
if user is not None and user.pw_hash is not None:
|
||||||
|
if verify_password(password, user.pw_hash):
|
||||||
|
_touch_last_login(db, user)
|
||||||
|
return user
|
||||||
|
|
||||||
|
# LDAP auth
|
||||||
|
if ldap_enabled:
|
||||||
|
from app.core.config import get_settings
|
||||||
|
from app.modules.auth.ldap import ldap_authenticate
|
||||||
|
s = get_settings()
|
||||||
|
attrs = ldap_authenticate(username, password, s.LDAP_SERVER, s.LDAP_DOMAIN)
|
||||||
|
if attrs is None:
|
||||||
|
return None
|
||||||
|
user = upsert_ldap_user(db, attrs)
|
||||||
|
_touch_last_login(db, user)
|
||||||
|
return user
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _touch_last_login(db: Session, user: User) -> None:
|
||||||
|
user.last_login = datetime.now(timezone.utc)
|
||||||
|
db.commit()
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 4: Run to confirm all service tests pass**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv/bin/pytest tests/test_auth_service.py -v
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected: All 12 tests PASS (8 existing + 4 new).
|
||||||
|
|
||||||
|
- [ ] **Step 5: Run full suite**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv/bin/pytest -v
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected: All tests PASS.
|
||||||
|
|
||||||
|
- [ ] **Step 6: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add app/modules/auth/service.py tests/test_auth_service.py
|
||||||
|
git commit -m "feat: implement LDAP auth fallback in authenticate_user"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 4: router.py — BackgroundTask für Sync
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `app/modules/auth/router.py`
|
||||||
|
- Modify: `tests/test_auth_router.py`
|
||||||
|
|
||||||
|
- [ ] **Step 1: Add LDAP login test to `tests/test_auth_router.py`**
|
||||||
|
|
||||||
|
Append this test to the existing file:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from unittest.mock import patch as mock_patch
|
||||||
|
|
||||||
|
|
||||||
|
def test_ldap_login_sets_cookie_and_redirects(client, override_db):
|
||||||
|
"""LDAP login via mocked ldap_authenticate: cookie is set, redirects to /."""
|
||||||
|
ldap_attrs = {
|
||||||
|
"username": "ldapuser",
|
||||||
|
"full_name": "LDAP User",
|
||||||
|
"department": "EFI",
|
||||||
|
"role": "ST",
|
||||||
|
"account_expires": None,
|
||||||
|
}
|
||||||
|
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs), \
|
||||||
|
mock_patch("app.modules.auth.ldap.sync_all_users"), \
|
||||||
|
mock_patch("app.modules.auth.router.settings") as mock_settings:
|
||||||
|
mock_settings.LDAP_ENABLED = True
|
||||||
|
mock_settings.LDAP_SERVER = "test"
|
||||||
|
mock_settings.LDAP_DOMAIN = "ADS1"
|
||||||
|
mock_settings.LDAP_SEARCH_BASE = "OU=test"
|
||||||
|
mock_settings.LDAP_SYNC_MIN_INTERVAL_HOURS = 12
|
||||||
|
mock_settings.LDAP_SYNC_LETTER_DELAY_SECONDS = 0.0
|
||||||
|
response = client.post("/auth/login", data={"username": "ldapuser", "password": "any"})
|
||||||
|
assert response.status_code in (302, 303, 307)
|
||||||
|
assert "access_token" in response.cookies
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: Run to confirm failure**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv/bin/pytest tests/test_auth_router.py::test_ldap_login_sets_cookie_and_redirects -v
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected: FAIL — Router importiert `sync_all_users` und `BackgroundTasks` noch nicht.
|
||||||
|
|
||||||
|
- [ ] **Step 3: Replace `app/modules/auth/router.py`**
|
||||||
|
|
||||||
|
```python
|
||||||
|
from fastapi import APIRouter, BackgroundTasks, Depends, Form, Request
|
||||||
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||||
|
from fastapi.templating import Jinja2Templates
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from app.core.auth import clear_auth_cookie, create_access_token, set_auth_cookie
|
||||||
|
from app.core.config import get_settings
|
||||||
|
from app.core.database import get_db
|
||||||
|
from app.modules.auth.dependencies import get_current_user
|
||||||
|
from app.modules.auth.ldap import sync_all_users
|
||||||
|
from app.modules.auth.schemas import UserOut
|
||||||
|
from app.modules.auth.service import authenticate_user
|
||||||
|
|
||||||
|
router = APIRouter(prefix="/auth", tags=["auth"])
|
||||||
|
templates = Jinja2Templates(directory="app/templates")
|
||||||
|
settings = get_settings()
|
||||||
|
|
||||||
|
_NAV: list[dict] = []
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/login", response_class=HTMLResponse)
|
||||||
|
async def login_page(request: Request):
|
||||||
|
return templates.TemplateResponse(
|
||||||
|
request, "auth/login.html", {"nav_items": _NAV, "app_version": "0.1.0"}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/login", response_class=HTMLResponse)
|
||||||
|
async def login(
|
||||||
|
request: Request,
|
||||||
|
background_tasks: BackgroundTasks,
|
||||||
|
username: str = Form(...),
|
||||||
|
password: str = Form(...),
|
||||||
|
db: Session = Depends(get_db),
|
||||||
|
):
|
||||||
|
user = authenticate_user(db, username, password, ldap_enabled=settings.LDAP_ENABLED)
|
||||||
|
if user is None:
|
||||||
|
return templates.TemplateResponse(
|
||||||
|
request,
|
||||||
|
"auth/login.html",
|
||||||
|
{"nav_items": _NAV, "app_version": "0.1.0", "error": "Ungültige Zugangsdaten."},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
if settings.LDAP_ENABLED:
|
||||||
|
background_tasks.add_task(
|
||||||
|
sync_all_users,
|
||||||
|
username,
|
||||||
|
password,
|
||||||
|
settings.LDAP_SERVER,
|
||||||
|
settings.LDAP_DOMAIN,
|
||||||
|
settings.LDAP_SEARCH_BASE,
|
||||||
|
settings.LDAP_SYNC_MIN_INTERVAL_HOURS,
|
||||||
|
settings.LDAP_SYNC_LETTER_DELAY_SECONDS,
|
||||||
|
)
|
||||||
|
token = create_access_token(username=user.username, is_admin=user.is_admin)
|
||||||
|
response = RedirectResponse(url="/", status_code=303)
|
||||||
|
set_auth_cookie(response, token)
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/logout")
|
||||||
|
async def logout():
|
||||||
|
response = RedirectResponse(url="/", status_code=303)
|
||||||
|
clear_auth_cookie(response)
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/me", response_model=UserOut)
|
||||||
|
async def me(user=Depends(get_current_user)):
|
||||||
|
return user
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 4: Run all router tests**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv/bin/pytest tests/test_auth_router.py -v
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected: All tests PASS.
|
||||||
|
|
||||||
|
- [ ] **Step 5: Run full suite**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv/bin/pytest -v
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected: All tests PASS.
|
||||||
|
|
||||||
|
- [ ] **Step 6: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add app/modules/auth/router.py tests/test_auth_router.py
|
||||||
|
git commit -m "feat: trigger LDAP background sync on successful login"
|
||||||
|
```
|
||||||
139
docs/superpowers/specs/2026-04-27-auth-part2-ldap-design.md
Normal file
139
docs/superpowers/specs/2026-04-27-auth-part2-ldap-design.md
Normal file
@ -0,0 +1,139 @@
|
|||||||
|
# Design Spec: Benutzerverwaltung Part 2 — LDAP-Integration
|
||||||
|
|
||||||
|
**Datum:** 2026-04-27
|
||||||
|
**Status:** Genehmigt
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Ziel
|
||||||
|
|
||||||
|
LDAP-Authentifizierung gegen das Active Directory der TH Nürnberg als Fallback zur lokalen Auth. Bei erfolgreichem Login wird der lokale User-Eintrag angelegt/aktualisiert und ein Hintergrund-Sync aller AD-User angestoßen (login-getriggert, nicht periodisch).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## AD-Verbindungsparameter (bereits in config.py)
|
||||||
|
|
||||||
|
| Setting | Wert |
|
||||||
|
|---|---|
|
||||||
|
| `LDAP_SERVER` | `gso1.ads1.fh-nuernberg.de` |
|
||||||
|
| `LDAP_DOMAIN` | `ADS1` |
|
||||||
|
| `LDAP_SEARCH_BASE` | `OU=users,OU=EFI,OU=Faculties,DC=ADS1,DC=fh-nuernberg,DC=de` |
|
||||||
|
| `LDAP_ENABLED` | `False` (dev), `True` (prod) |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Section 1: LDAP-Authentifizierung
|
||||||
|
|
||||||
|
### Neue Config-Felder (`app/core/config.py`)
|
||||||
|
|
||||||
|
```python
|
||||||
|
LDAP_SYNC_MIN_INTERVAL_HOURS: int = 12 # Mindestabstand zwischen Syncs
|
||||||
|
LDAP_SYNC_LETTER_DELAY_SECONDS: float = 5.0 # Wartezeit zwischen Buchstaben-Batches
|
||||||
|
```
|
||||||
|
|
||||||
|
### Neue Datei `app/modules/auth/ldap.py`
|
||||||
|
|
||||||
|
Reine Funktionen — kein FastAPI, kein DB-Zugriff, nur ldap3.
|
||||||
|
|
||||||
|
**`ldap_authenticate(username, password) -> dict | None`**
|
||||||
|
Öffnet NTLM-Verbindung gegen `LDAP_SERVER`, bindet, liest User-Attribute und schließt die Verbindung wieder. Gibt bei Erfolg ein Dict zurück, bei falschem Passwort oder Verbindungsfehler `None` (Verbindungsfehler wird geloggt). Eine einzige Verbindung für Bind + Attributabfrage.
|
||||||
|
|
||||||
|
Rückgabe-Dict:
|
||||||
|
|
||||||
|
```python
|
||||||
|
{
|
||||||
|
"username": str, # sAMAccountName
|
||||||
|
"full_name": str, # f"{givenName} {sn}"
|
||||||
|
"department": str, # department (max 64 Zeichen, "N.N." wenn leer)
|
||||||
|
"role": str, # description (max 64 Zeichen, "N.N." wenn leer)
|
||||||
|
"account_expires": datetime | None, # accountExpires, None = kein Ablauf
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**`sync_all_users(username, password, db, settings) -> None`**
|
||||||
|
Synchronisiert alle AD-User alphabetisch (a–z + `_`). Läuft in einem `BackgroundTask`-Thread:
|
||||||
|
|
||||||
|
1. `has_to_update()` prüfen — wenn letzter Sync < `LDAP_SYNC_MIN_INTERVAL_HOURS`, abbrechen
|
||||||
|
2. Neue NTLM-Verbindung mit den übergebenen Credentials öffnen
|
||||||
|
3. Für jeden Buchstaben:
|
||||||
|
- AD-Einträge mit `CN=<buchstabe>*` laden
|
||||||
|
- Jeden Entry upserten (full_name, department, role, account_expires)
|
||||||
|
- `time.sleep(LDAP_SYNC_LETTER_DELAY_SECONDS)`
|
||||||
|
4. User in lokaler DB die **nicht** in AD gefunden wurden UND `pw_hash is None` → `is_active = False`
|
||||||
|
5. Verbindung schließen
|
||||||
|
|
||||||
|
`has_to_update()` liest den Zeitstempel des zuletzt geänderten Users aus der DB (entspricht Vorversion).
|
||||||
|
|
||||||
|
### Geänderte Datei `app/modules/auth/service.py`
|
||||||
|
|
||||||
|
**Geänderter Auth-Flow in `authenticate_user`:**
|
||||||
|
|
||||||
|
```
|
||||||
|
1. user = get_user(db, username)
|
||||||
|
2. user existiert UND is_active=False → return None (absolute Sperre)
|
||||||
|
3. user existiert UND pw_hash gesetzt:
|
||||||
|
→ verify_password(password, pw_hash)
|
||||||
|
→ OK: last_login setzen, return user
|
||||||
|
4. ldap_enabled:
|
||||||
|
→ attrs = ldap_authenticate(username, password)
|
||||||
|
→ attrs is None: return None
|
||||||
|
→ OK: upsert_ldap_user(db, attrs), last_login setzen, return user
|
||||||
|
5. return None
|
||||||
|
```
|
||||||
|
|
||||||
|
**Neue Funktion `upsert_ldap_user(db, username, attrs) -> User`**
|
||||||
|
Legt User an falls nicht vorhanden, schreibt AD-Felder (full_name, department, role, account_expires). Setzt `pw_hash` nicht (bleibt None für reine LDAP-User).
|
||||||
|
|
||||||
|
### Geänderte Datei `app/modules/auth/router.py`
|
||||||
|
|
||||||
|
`POST /auth/login` erhält einen `BackgroundTasks`-Parameter. Nach erfolgreichem LDAP-Login wird `sync_all_users` als BackgroundTask gestartet:
|
||||||
|
|
||||||
|
```python
|
||||||
|
@router.post("/login")
|
||||||
|
async def login(background_tasks: BackgroundTasks, ...):
|
||||||
|
user = authenticate_user(...)
|
||||||
|
if user and settings.LDAP_ENABLED:
|
||||||
|
background_tasks.add_task(sync_all_users, username, password, db, settings)
|
||||||
|
...
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Section 2: AD-Sync-Logik
|
||||||
|
|
||||||
|
### Sync-Verhalten
|
||||||
|
|
||||||
|
| Situation | Aktion |
|
||||||
|
|---|---|
|
||||||
|
| User in AD, in DB | full_name, department, role, account_expires aktualisieren |
|
||||||
|
| User in AD, nicht in DB | Neuen User anlegen (`is_active=True`, `pw_hash=None`) |
|
||||||
|
| User nicht in AD, in DB, `pw_hash is None` | `is_active = False` |
|
||||||
|
| User nicht in AD, in DB, `pw_hash` gesetzt | Unverändert (lokaler Account, z. B. Admin) |
|
||||||
|
|
||||||
|
### Timing
|
||||||
|
|
||||||
|
- Sync läuft nur wenn letzter Sync > `LDAP_SYNC_MIN_INTERVAL_HOURS` her
|
||||||
|
- Zwischen jedem Buchstaben-Batch: `LDAP_SYNC_LETTER_DELAY_SECONDS` Sleep
|
||||||
|
- Gesamtdauer ca. 27 × 5s ≈ 2–3 Minuten (im Hintergrund, blockiert nichts)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Dateistruktur
|
||||||
|
|
||||||
|
```
|
||||||
|
app/modules/auth/
|
||||||
|
├── ldap.py # neu: ldap_authenticate, ldap_get_user_attrs, sync_all_users
|
||||||
|
├── service.py # geändert: LDAP-Stub füllen, upsert_ldap_user
|
||||||
|
└── router.py # geändert: BackgroundTasks in POST /auth/login
|
||||||
|
app/core/
|
||||||
|
└── config.py # geändert: LDAP_SYNC_MIN_INTERVAL_HOURS, LDAP_SYNC_LETTER_DELAY_SECONDS
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Out of Scope
|
||||||
|
|
||||||
|
- Serviceaccount / periodischer Celery-Sync (→ mögliche spätere Erweiterung)
|
||||||
|
- `account_expires`-Prüfung beim Login (Blockierung läuft über AD-Bind-Fehler und `is_active`)
|
||||||
|
- Admin-UI für User-Verwaltung (→ separater Spec)
|
||||||
|
- Gruppen aus AD (→ Part 3)
|
||||||
@ -65,8 +65,35 @@ def test_login_unknown_user_shows_error(client):
|
|||||||
assert "Ungültige" in response.text
|
assert "Ungültige" in response.text
|
||||||
|
|
||||||
|
|
||||||
def test_logout_clears_cookie(client, alice):
|
def test_logout_clears_cookie_and_redirects_to_landing(client, alice):
|
||||||
client.post("/auth/login", data={"username": "alice", "password": "secret123"})
|
client.post("/auth/login", data={"username": "alice", "password": "secret123"})
|
||||||
response = client.get("/auth/logout")
|
response = client.get("/auth/logout")
|
||||||
assert response.status_code in (302, 303, 307)
|
assert response.status_code in (302, 303, 307)
|
||||||
|
assert response.headers["location"] == "/"
|
||||||
assert response.cookies.get("access_token", "") == ""
|
assert response.cookies.get("access_token", "") == ""
|
||||||
|
|
||||||
|
|
||||||
|
from unittest.mock import patch as mock_patch
|
||||||
|
|
||||||
|
|
||||||
|
def test_ldap_login_sets_cookie_and_redirects(client, override_db):
|
||||||
|
"""LDAP login via mocked ldap_authenticate: cookie is set, redirects to /."""
|
||||||
|
ldap_attrs = {
|
||||||
|
"username": "ldapuser",
|
||||||
|
"full_name": "LDAP User",
|
||||||
|
"department": "EFI",
|
||||||
|
"role": "ST",
|
||||||
|
"account_expires": None,
|
||||||
|
}
|
||||||
|
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs), \
|
||||||
|
mock_patch("app.modules.auth.ldap.sync_all_users"), \
|
||||||
|
mock_patch("app.modules.auth.router.settings") as mock_settings:
|
||||||
|
mock_settings.LDAP_ENABLED = True
|
||||||
|
mock_settings.LDAP_SERVER = "test"
|
||||||
|
mock_settings.LDAP_DOMAIN = "ADS1"
|
||||||
|
mock_settings.LDAP_SEARCH_BASE = "OU=test"
|
||||||
|
mock_settings.LDAP_SYNC_MIN_INTERVAL_HOURS = 12
|
||||||
|
mock_settings.LDAP_SYNC_LETTER_DELAY_SECONDS = 0.0
|
||||||
|
response = client.post("/auth/login", data={"username": "ldapuser", "password": "any"})
|
||||||
|
assert response.status_code in (302, 303, 307)
|
||||||
|
assert "access_token" in response.cookies
|
||||||
|
|||||||
@ -67,3 +67,56 @@ def test_authenticate_no_pw_hash_no_ldap(db):
|
|||||||
db.add(user)
|
db.add(user)
|
||||||
db.commit()
|
db.commit()
|
||||||
assert authenticate_user(db, "ldaponly", "any", ldap_enabled=False) is None
|
assert authenticate_user(db, "ldaponly", "any", ldap_enabled=False) is None
|
||||||
|
|
||||||
|
|
||||||
|
# --- LDAP auth integration (uses mocked ldap_authenticate) ---
|
||||||
|
|
||||||
|
from unittest.mock import patch as mock_patch
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticate_creates_user_on_ldap_success(db):
|
||||||
|
ldap_attrs = {
|
||||||
|
"username": "newldap",
|
||||||
|
"full_name": "New LDAP User",
|
||||||
|
"department": "EFI",
|
||||||
|
"role": "ST",
|
||||||
|
"account_expires": None,
|
||||||
|
}
|
||||||
|
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs):
|
||||||
|
user = authenticate_user(db, "newldap", "password", ldap_enabled=True)
|
||||||
|
assert user is not None
|
||||||
|
assert user.username == "newldap"
|
||||||
|
assert user.full_name == "New LDAP User"
|
||||||
|
assert user.last_login is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticate_updates_existing_user_on_ldap_success(db):
|
||||||
|
existing = User(username="existing", full_name="Old Name", pw_hash=None)
|
||||||
|
db.add(existing)
|
||||||
|
db.commit()
|
||||||
|
ldap_attrs = {
|
||||||
|
"username": "existing",
|
||||||
|
"full_name": "New Name",
|
||||||
|
"department": "EFI",
|
||||||
|
"role": "PF",
|
||||||
|
"account_expires": None,
|
||||||
|
}
|
||||||
|
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs):
|
||||||
|
user = authenticate_user(db, "existing", "password", ldap_enabled=True)
|
||||||
|
assert user.full_name == "New Name"
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticate_returns_none_on_ldap_failure(db):
|
||||||
|
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=None):
|
||||||
|
result = authenticate_user(db, "anyone", "wrong", ldap_enabled=True)
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticate_blocked_user_not_bypassed_by_ldap(db):
|
||||||
|
blocked = User(username="blocked", full_name="B", pw_hash=None, is_active=False)
|
||||||
|
db.add(blocked)
|
||||||
|
db.commit()
|
||||||
|
ldap_attrs = {"username": "blocked", "full_name": "B", "department": "", "role": "", "account_expires": None}
|
||||||
|
with mock_patch("app.modules.auth.ldap.ldap_authenticate", return_value=ldap_attrs):
|
||||||
|
result = authenticate_user(db, "blocked", "any", ldap_enabled=True)
|
||||||
|
assert result is None
|
||||||
|
|||||||
@ -9,3 +9,5 @@ def test_new_config_fields_have_defaults():
|
|||||||
assert s.LDAP_SERVER == "gso1.ads1.fh-nuernberg.de"
|
assert s.LDAP_SERVER == "gso1.ads1.fh-nuernberg.de"
|
||||||
assert s.LDAP_DOMAIN == "ADS1"
|
assert s.LDAP_DOMAIN == "ADS1"
|
||||||
assert "EFI" in s.LDAP_SEARCH_BASE
|
assert "EFI" in s.LDAP_SEARCH_BASE
|
||||||
|
assert s.LDAP_SYNC_MIN_INTERVAL_HOURS == 12
|
||||||
|
assert s.LDAP_SYNC_LETTER_DELAY_SECONDS == 5.0
|
||||||
|
|||||||
@ -42,33 +42,51 @@ def auth_cookies(override_db):
|
|||||||
return {"access_token": token}
|
return {"access_token": token}
|
||||||
|
|
||||||
|
|
||||||
def test_landing_without_auth_redirects(client):
|
@pytest.fixture
|
||||||
|
def admin_cookies(override_db):
|
||||||
|
user = User(username="adminuser", full_name="Admin User", is_admin=True, pw_hash=hash_password("pw"))
|
||||||
|
override_db.add(user)
|
||||||
|
override_db.commit()
|
||||||
|
token = create_access_token(username="adminuser", is_admin=True)
|
||||||
|
return {"access_token": token}
|
||||||
|
|
||||||
|
|
||||||
|
def test_landing_accessible_without_auth(client):
|
||||||
response = client.get("/")
|
response = client.get("/")
|
||||||
assert response.status_code == 307
|
|
||||||
assert "/auth/login" in response.headers["location"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_landing_returns_html(client, auth_cookies):
|
|
||||||
response = client.get("/", cookies=auth_cookies)
|
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert "text/html" in response.headers["content-type"]
|
assert "text/html" in response.headers["content-type"]
|
||||||
|
|
||||||
|
|
||||||
def test_landing_contains_title(client, auth_cookies):
|
def test_landing_shows_no_modules_for_anonymous(client):
|
||||||
response = client.get("/", cookies=auth_cookies)
|
response = client.get("/")
|
||||||
assert "University Process Hub" in response.text
|
assert response.status_code == 200
|
||||||
|
assert "RSS-Feed Server" not in response.text
|
||||||
|
|
||||||
|
|
||||||
def test_landing_contains_rss_module(client, auth_cookies):
|
def test_landing_shows_modules_for_authenticated(client, auth_cookies):
|
||||||
response = client.get("/", cookies=auth_cookies)
|
response = client.get("/", cookies=auth_cookies)
|
||||||
|
assert response.status_code == 200
|
||||||
assert "RSS-Feed Server" in response.text
|
assert "RSS-Feed Server" in response.text
|
||||||
|
|
||||||
|
|
||||||
def test_landing_navbar_links_present(client, auth_cookies):
|
def test_landing_contains_title(client):
|
||||||
response = client.get("/", cookies=auth_cookies)
|
response = client.get("/")
|
||||||
assert "Übersicht" in response.text
|
assert "University Process Hub" in response.text
|
||||||
|
|
||||||
|
|
||||||
def test_landing_info_strip_shows_db_mode(client, auth_cookies):
|
def test_landing_info_strip_shows_db_mode(client, auth_cookies):
|
||||||
response = client.get("/", cookies=auth_cookies)
|
response = client.get("/", cookies=auth_cookies)
|
||||||
assert "SQLite" in response.text or "MariaDB" in response.text
|
assert "SQLite" in response.text or "MariaDB" in response.text
|
||||||
|
|
||||||
|
|
||||||
|
def test_navbar_shows_login_button_for_anonymous(client):
|
||||||
|
response = client.get("/")
|
||||||
|
assert "Anmelden" in response.text
|
||||||
|
assert "Abmelden" not in response.text
|
||||||
|
|
||||||
|
|
||||||
|
def test_navbar_shows_username_and_logout_when_logged_in(client, auth_cookies):
|
||||||
|
response = client.get("/", cookies=auth_cookies)
|
||||||
|
assert "testuser" in response.text
|
||||||
|
assert "Abmelden" in response.text
|
||||||
|
assert "Anmelden" not in response.text
|
||||||
|
|||||||
209
tests/test_ldap.py
Normal file
209
tests/test_ldap.py
Normal file
@ -0,0 +1,209 @@
|
|||||||
|
import time
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
from sqlalchemy.orm import sessionmaker
|
||||||
|
from sqlalchemy.pool import StaticPool
|
||||||
|
|
||||||
|
from app.core.database import Base
|
||||||
|
from app.modules.auth.ldap import (
|
||||||
|
_has_to_update,
|
||||||
|
_parse_entry,
|
||||||
|
_upsert_from_attrs,
|
||||||
|
ldap_authenticate,
|
||||||
|
)
|
||||||
|
from app.modules.auth.models import User
|
||||||
|
|
||||||
|
|
||||||
|
# --- Helpers ---
|
||||||
|
|
||||||
|
def _make_entry(**kwargs):
|
||||||
|
defaults = {
|
||||||
|
"sAMAccountName": "hofmannol",
|
||||||
|
"givenName": "Oliver",
|
||||||
|
"sn": "Hofmann",
|
||||||
|
"department": "EFI",
|
||||||
|
"description": "PF",
|
||||||
|
"accountExpires": None,
|
||||||
|
}
|
||||||
|
defaults.update(kwargs)
|
||||||
|
|
||||||
|
class _Attr:
|
||||||
|
def __init__(self, value):
|
||||||
|
self.value = value
|
||||||
|
|
||||||
|
class _Entry:
|
||||||
|
def __getitem__(self, key):
|
||||||
|
return _Attr(defaults.get(key))
|
||||||
|
|
||||||
|
return _Entry()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def db():
|
||||||
|
engine = create_engine(
|
||||||
|
"sqlite:///:memory:",
|
||||||
|
connect_args={"check_same_thread": False},
|
||||||
|
poolclass=StaticPool,
|
||||||
|
)
|
||||||
|
Base.metadata.create_all(bind=engine)
|
||||||
|
Session = sessionmaker(bind=engine)
|
||||||
|
session = Session()
|
||||||
|
yield session
|
||||||
|
session.close()
|
||||||
|
Base.metadata.drop_all(bind=engine)
|
||||||
|
|
||||||
|
|
||||||
|
# --- _parse_entry ---
|
||||||
|
|
||||||
|
def test_parse_entry_basic():
|
||||||
|
result = _parse_entry(_make_entry())
|
||||||
|
assert result["username"] == "hofmannol"
|
||||||
|
assert result["full_name"] == "Oliver Hofmann"
|
||||||
|
assert result["department"] == "EFI"
|
||||||
|
assert result["role"] == "PF"
|
||||||
|
assert result["account_expires"] is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_entry_truncates_long_fields():
|
||||||
|
result = _parse_entry(_make_entry(department="A" * 100))
|
||||||
|
assert len(result["department"]) == 64
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_entry_uses_nn_for_none_fields():
|
||||||
|
result = _parse_entry(_make_entry(department=None, description=None))
|
||||||
|
assert result["department"] == "N.N."
|
||||||
|
assert result["role"] == "N.N."
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_entry_handles_none_account_expires():
|
||||||
|
result = _parse_entry(_make_entry(accountExpires=None))
|
||||||
|
assert result["account_expires"] is None
|
||||||
|
|
||||||
|
|
||||||
|
# --- _has_to_update ---
|
||||||
|
|
||||||
|
def test_has_to_update_true_when_no_ldap_users(db):
|
||||||
|
assert _has_to_update(db, 12) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_has_to_update_false_when_recent_ldap_user(db):
|
||||||
|
user = User(
|
||||||
|
username="u", full_name="U", pw_hash=None,
|
||||||
|
updated_at=datetime.now(timezone.utc),
|
||||||
|
)
|
||||||
|
db.add(user)
|
||||||
|
db.commit()
|
||||||
|
assert _has_to_update(db, 12) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_has_to_update_true_when_old_ldap_user(db):
|
||||||
|
old_time = datetime.now(timezone.utc) - timedelta(hours=25)
|
||||||
|
user = User(username="u", full_name="U", pw_hash=None, updated_at=old_time)
|
||||||
|
db.add(user)
|
||||||
|
db.commit()
|
||||||
|
assert _has_to_update(db, 12) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_has_to_update_ignores_local_users(db):
|
||||||
|
# Local user (pw_hash set) should NOT count for sync timing
|
||||||
|
user = User(
|
||||||
|
username="admin", full_name="Admin", pw_hash="hash",
|
||||||
|
updated_at=datetime.now(timezone.utc),
|
||||||
|
)
|
||||||
|
db.add(user)
|
||||||
|
db.commit()
|
||||||
|
assert _has_to_update(db, 12) is True
|
||||||
|
|
||||||
|
|
||||||
|
# --- _upsert_from_attrs ---
|
||||||
|
|
||||||
|
def test_upsert_creates_new_user(db):
|
||||||
|
attrs = {
|
||||||
|
"username": "newuser",
|
||||||
|
"full_name": "New User",
|
||||||
|
"department": "EFI",
|
||||||
|
"role": "ST",
|
||||||
|
"account_expires": None,
|
||||||
|
}
|
||||||
|
_upsert_from_attrs(db, attrs)
|
||||||
|
db.commit()
|
||||||
|
user = db.query(User).filter(User.username == "newuser").first()
|
||||||
|
assert user is not None
|
||||||
|
assert user.full_name == "New User"
|
||||||
|
assert user.pw_hash is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_upsert_updates_existing_user(db):
|
||||||
|
user = User(username="existing", full_name="Old Name", pw_hash=None)
|
||||||
|
db.add(user)
|
||||||
|
db.commit()
|
||||||
|
attrs = {
|
||||||
|
"username": "existing",
|
||||||
|
"full_name": "New Name",
|
||||||
|
"department": "EFI",
|
||||||
|
"role": "PF",
|
||||||
|
"account_expires": None,
|
||||||
|
}
|
||||||
|
_upsert_from_attrs(db, attrs)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(user)
|
||||||
|
assert user.full_name == "New Name"
|
||||||
|
assert user.role == "PF"
|
||||||
|
|
||||||
|
|
||||||
|
# --- ldap_authenticate ---
|
||||||
|
|
||||||
|
@patch("app.modules.auth.ldap.Server")
|
||||||
|
@patch("app.modules.auth.ldap.Connection")
|
||||||
|
def test_ldap_authenticate_success(mock_conn_cls, mock_server_cls):
|
||||||
|
mock_conn = MagicMock()
|
||||||
|
mock_conn_cls.return_value = mock_conn
|
||||||
|
mock_conn.extend.standard.who_am_i.return_value = "u:ADS1\\hofmannol"
|
||||||
|
mock_conn.entries = [_make_entry()]
|
||||||
|
|
||||||
|
result = ldap_authenticate("hofmannol", "password", "server.example", "ADS1")
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert result["username"] == "hofmannol"
|
||||||
|
assert result["full_name"] == "Oliver Hofmann"
|
||||||
|
mock_conn.bind.assert_called_once()
|
||||||
|
mock_conn.unbind.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
@patch("app.modules.auth.ldap.Server")
|
||||||
|
@patch("app.modules.auth.ldap.Connection")
|
||||||
|
def test_ldap_authenticate_returns_none_on_socket_error(mock_conn_cls, mock_server_cls):
|
||||||
|
from ldap3.core.exceptions import LDAPSocketOpenError
|
||||||
|
mock_conn = MagicMock()
|
||||||
|
mock_conn_cls.return_value = mock_conn
|
||||||
|
mock_conn.bind.side_effect = LDAPSocketOpenError("unreachable")
|
||||||
|
|
||||||
|
result = ldap_authenticate("hofmannol", "pw", "server.example", "ADS1")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
@patch("app.modules.auth.ldap.Server")
|
||||||
|
@patch("app.modules.auth.ldap.Connection")
|
||||||
|
def test_ldap_authenticate_returns_none_when_who_am_i_empty(mock_conn_cls, mock_server_cls):
|
||||||
|
mock_conn = MagicMock()
|
||||||
|
mock_conn_cls.return_value = mock_conn
|
||||||
|
mock_conn.extend.standard.who_am_i.return_value = None
|
||||||
|
|
||||||
|
result = ldap_authenticate("hofmannol", "wrong", "server.example", "ADS1")
|
||||||
|
assert result is None
|
||||||
|
mock_conn.unbind.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
@patch("app.modules.auth.ldap.Server")
|
||||||
|
@patch("app.modules.auth.ldap.Connection")
|
||||||
|
def test_ldap_authenticate_returns_none_when_no_entries(mock_conn_cls, mock_server_cls):
|
||||||
|
mock_conn = MagicMock()
|
||||||
|
mock_conn_cls.return_value = mock_conn
|
||||||
|
mock_conn.extend.standard.who_am_i.return_value = "u:ADS1\\hofmannol"
|
||||||
|
mock_conn.entries = []
|
||||||
|
|
||||||
|
result = ldap_authenticate("hofmannol", "pw", "server.example", "ADS1")
|
||||||
|
assert result is None
|
||||||
Loading…
x
Reference in New Issue
Block a user