import logging import time from datetime import datetime, timedelta, timezone from typing import Optional from ldap3 import ALL, ALL_ATTRIBUTES, NTLM, Connection, Server from ldap3.core.exceptions import LDAPSocketOpenError from sqlalchemy.orm import Session from app.modules.auth.models import User logger = logging.getLogger(__name__) def ldap_authenticate( username: str, password: str, ldap_server: str, ldap_domain: str, ) -> Optional[dict]: """NTLM bind + fetch user attributes. Returns attrs dict on success, None on failure.""" server = Server(ldap_server, get_info=ALL, connect_timeout=8) conn = Connection( server, user=f"{ldap_domain}\\{username}", password=password, authentication=NTLM, ) try: conn.bind() except LDAPSocketOpenError: logger.warning("LDAP server %s not reachable", ldap_server) return None except Exception: logger.warning("LDAP bind failed for user %s", username, exc_info=True) return None if not conn.extend.standard.who_am_i(): conn.unbind() return None conn.search( search_base=f"DC={ldap_domain},DC=fh-nuernberg,DC=de", search_filter=f"(&(objectclass=user)(CN={username}))", attributes=ALL_ATTRIBUTES, ) if not conn.entries: conn.unbind() logger.warning("LDAP: user %s authenticated but no entry found", username) return None attrs = _parse_entry(conn.entries[0]) conn.unbind() return attrs def _parse_entry(entry) -> dict: def val(field: str, default: str = "N.N.") -> str: try: v = entry[field].value return str(v)[:64] if v else default except Exception: return default def expire_date(field: str) -> Optional[datetime]: try: v = entry[field].value if v is None: return None if isinstance(v, datetime): dt = v else: dt = datetime(v.year, v.month, v.day, tzinfo=timezone.utc) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return None if dt.year >= 2099 else dt except Exception: return None return { "username": val("sAMAccountName"), "full_name": (f"{val('givenName', '')} {val('sn', '')}".strip() or "N.N."), "department": val("department"), "role": val("description"), "account_expires": expire_date("accountExpires"), } def _has_to_update(db: Session, min_interval_hours: int) -> bool: """True if a background sync is due.""" deadline = datetime.now(timezone.utc) - timedelta(hours=min_interval_hours) latest = ( db.query(User) .filter(User.pw_hash.is_(None)) .order_by(User.updated_at.desc()) .first() ) if latest is None: return True last = latest.updated_at if last.tzinfo is None: last = last.replace(tzinfo=timezone.utc) return last < deadline def _upsert_from_attrs(db: Session, attrs: dict) -> None: """Create or update a User from AD attribute dict (no commit).""" user = db.query(User).filter(User.username == attrs["username"]).first() if user is None: user = User(username=attrs["username"]) db.add(user) user.full_name = attrs["full_name"] user.department = attrs["department"] user.role = attrs["role"] user.account_expires = attrs["account_expires"] def sync_all_users( username: str, password: str, ldap_server: str, ldap_domain: str, ldap_search_base: str, min_interval_hours: int, letter_delay_seconds: float, ) -> None: """Login-triggered background sync of all AD users. Creates its own DB session.""" from app.core.database import SessionLocal with SessionLocal() as db: if not _has_to_update(db, min_interval_hours): logger.info("LDAP sync skipped: last sync is recent") return server = Server(ldap_server, get_info=ALL, connect_timeout=8) conn = Connection( server, user=f"{ldap_domain}\\{username}", password=password, authentication=NTLM, ) try: conn.bind() except Exception: logger.warning("LDAP sync: bind failed", exc_info=True) return found_usernames: set[str] = set() chars = "abcdefghijklmnopqrstuvwxyz_" for char in chars: try: conn.search( search_base=ldap_search_base, search_filter=f"(&(objectclass=user)(CN={char}*))", attributes=ALL_ATTRIBUTES, ) for entry in conn.entries: try: attrs = _parse_entry(entry) found_usernames.add(attrs["username"]) _upsert_from_attrs(db, attrs) except Exception: logger.warning("LDAP sync: failed to parse entry", exc_info=True) db.commit() except Exception: logger.warning("LDAP sync: search failed for prefix '%s'", char, exc_info=True) if letter_delay_seconds > 0: time.sleep(letter_delay_seconds) # Deactivate LDAP users no longer in AD ldap_users = db.query(User).filter(User.pw_hash.is_(None)).all() for user_obj in ldap_users: if user_obj.username not in found_usernames: logger.info("LDAP sync: deactivating %s (not in AD)", user_obj.username) user_obj.is_active = False db.commit() conn.unbind() logger.info("LDAP sync complete — %d users found", len(found_usernames))