efihub/app/modules/auth/service.py

65 lines
1.8 KiB
Python

from datetime import datetime, timezone
from typing import Optional
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from app.modules.auth.models import User
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(plain: str) -> str:
return pwd_context.hash(plain)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def get_user(db: Session, username: str) -> Optional[User]:
return db.query(User).filter(User.username == username).first()
def upsert_ldap_user(db: Session, attrs: dict) -> User:
"""Create or update a User from LDAP attribute dict, commit and return."""
from app.modules.auth.ldap import _upsert_from_attrs
_upsert_from_attrs(db, attrs)
db.commit()
return get_user(db, attrs["username"])
def authenticate_user(
db: Session, username: str, password: str, ldap_enabled: bool
) -> Optional[User]:
user = get_user(db, username)
# Explicitly deactivated users are always blocked
if user is not None and not user.is_active:
return None
# Local auth
if user is not None and user.pw_hash is not None:
if verify_password(password, user.pw_hash):
_touch_last_login(db, user)
return user
# LDAP auth
if ldap_enabled:
from app.core.config import get_settings
from app.modules.auth.ldap import ldap_authenticate
s = get_settings()
attrs = ldap_authenticate(username, password, s.LDAP_SERVER, s.LDAP_DOMAIN)
if attrs is None:
return None
user = upsert_ldap_user(db, attrs)
_touch_last_login(db, user)
return user
return None
def _touch_last_login(db: Session, user: User) -> None:
user.last_login = datetime.now(timezone.utc)
db.commit()