2025-03-24 14:34:56 +01:00

168 lines
4.9 KiB
Python

import logging
import datetime
import pytz
import string
import threading
import time
from ldap3 import Server, Connection, ALL, NTLM, ALL_ATTRIBUTES
from django.core.exceptions import ValidationError
from django.utils import timezone
from authstuff.models import ActiveDirectoryEntry
from django.conf import settings
# Module-level logger used by the helper functions in this file.
logger = logging.getLogger(__name__)
# Names of the attributes read from each LDAP search result entry; they also
# serve as the keys of the intermediate dict built by parse_entry_dic().
LOGIN_FIELD = "sAMAccountName"  # stored as ActiveDirectoryEntry.canonicalName
EXPIRE_FIELD = "accountExpires"  # stored as ActiveDirectoryEntry.accountExpires
DEPARTMENT_FIELD = "department"  # stored as ActiveDirectoryEntry.department
GIVENNAME_FIELD = "givenName"  # stored as ActiveDirectoryEntry.givenName
LASTNAME_FIELD = "sn"  # stored as ActiveDirectoryEntry.lastName
ROLE_FIELD = "description"  # stored as ActiveDirectoryEntry.role
class UpdateThread(threading.Thread):
    """Background thread that refreshes the stored Active Directory entries.

    The thread takes ownership of the LDAP connection it is given: the
    connection is unbound when ``run`` finishes, whether or not an update
    was performed and whether or not an error occurred.
    """

    def __init__(self, connection, domain, delta):
        """Store the parameters of the update run.

        Args:
            connection: LDAP connection used for the searches; it is unbound
                at the end of ``run``.
            domain: Domain name passed on to
                ``update_entries_with_leading_char``.
            delta: Value subtracted from the current time by
                ``has_to_update`` to decide whether a refresh is due.
        """
        super(UpdateThread, self).__init__(group=None)
        self.logger = logging.getLogger(__name__)
        self.connection = connection
        self.domain = domain
        self.delta = delta

    def run(self):
        """Update all entries if a refresh is due, then release the connection.

        The directory is queried once per leading character (``a``-``z`` and
        ``_``) with a five second pause after each query. Any exception,
        including one raised while checking whether an update is due, is
        logged instead of propagated.
        """
        try:
            if has_to_update(self.delta):
                for char in string.ascii_lowercase + "_":
                    self.logger.info('Update AD entries with leading {}'.format(char))
                    update_entries_with_leading_char(self.connection, self.domain, char)
                    # Pause between the per-character queries.
                    time.sleep(5)
        except Exception as e:
            self.logger.error(str(e))
        finally:
            # Always release the connection, on success, on failure and when
            # no update was necessary.
            self.connection.unbind()
def last_update():
    """Return the ``modified`` value of the most recently modified entry.

    Returns:
        The ``modified`` attribute of the latest ``ActiveDirectoryEntry``,
        or the naive ``datetime.datetime.min`` when no entry exists yet.

    Raises:
        Any error other than ``ActiveDirectoryEntry.DoesNotExist`` raised by
        the lookup is propagated instead of being silently swallowed.
    """
    try:
        result = ActiveDirectoryEntry.objects.latest('modified').modified
    except ActiveDirectoryEntry.DoesNotExist:
        # Empty table: fall back to the earliest representable timestamp.
        logger.debug('No last update found')
        result = datetime.datetime.min
    logger.info('Last update: {}'.format(result))
    return result
def has_to_update(delta):
    """Decide whether the stored entries need to be refreshed.

    A refresh is due when ``settings.LDAP_FORCE_UPDATE`` is truthy or when
    the last update lies before ``now - delta``. Both timestamps get their
    tzinfo replaced with UTC before they are compared.

    Args:
        delta: Value subtracted from the current time to obtain the deadline.

    Returns:
        ``settings.LDAP_FORCE_UPDATE`` if it is truthy, otherwise the result
        of the comparison.
    """
    threshold = (timezone.now() - delta).replace(tzinfo=pytz.UTC)
    previous = last_update().replace(tzinfo=pytz.UTC)
    needs_update = settings.LDAP_FORCE_UPDATE or (previous < threshold)
    logger.info('Has to update: {}'.format(needs_update))
    return needs_update
def update_entries(user, pwd, delta):
    """Open a connection for the ``ADS1`` domain and start an update run.

    Args:
        user: Account name passed to ``get_connection``.
        pwd: Password passed to ``get_connection``.
        delta: Passed on to ``update_entries_with_conn``.
    """
    domain = "ADS1"
    update_entries_with_conn(get_connection(user, pwd, domain), domain, delta)
def update_entries_with_conn(conn, dom, delta):
    """Start an ``UpdateThread`` for the given connection and return at once.

    Args:
        conn: LDAP connection handed over to the thread.
        dom: Domain name handed over to the thread.
        delta: Update interval handed over to the thread.
    """
    UpdateThread(conn, dom, delta).start()
def update_entries_with_leading_char(conn, dom, char):
    """Search users whose CN starts with ``char`` and store each result.

    A failure while processing a single entry is logged and does not stop
    the processing of the remaining entries.

    Args:
        conn: LDAP connection used for the search.
        dom: Domain name inserted as the first ``DC`` component of the
            search base.
        char: Leading character(s) of the CN to match.
    """
    base_dn = 'OU=users,OU=EFI,OU=Faculties,DC=' + dom + ',DC=fh-nuernberg,DC=de'
    cn_filter = '(&(objectclass=user)(CN=' + char + '*))'
    conn.search(
        search_base=base_dn,
        search_filter=cn_filter,
        attributes=ALL_ATTRIBUTES)
    found = conn.entries
    logger.info('Found {} entries'.format(len(found)))
    for ad_entry in found:
        try:
            clear_and_update_entry(ad_entry)
        except Exception as err:
            logger.error(str(err))
def clear_and_update_entry(entry):
    """Convert a search result entry to a cleaned dict and store it.

    The entry is parsed with ``parse_entry_dic``, passed through the expire
    date, department and description clean-up steps in that order, and then
    handed to ``update_entry``.

    Args:
        entry: A single search result entry.
    """
    cleaned = parse_entry_dic(entry)
    for cleaner in (clear_expire_date, clear_department, clear_description):
        cleaned = cleaner(cleaned)
    update_entry(cleaned)
def update_entry(entry_dic):
    """Create or update the ``ActiveDirectoryEntry`` for one parsed entry.

    The entry is looked up by ``canonicalName``; a new instance is created
    only when no such entry exists. Validation problems are logged as a
    warning and the entry is saved regardless.

    Args:
        entry_dic: Dict keyed by the ``*_FIELD`` constants, as produced by
            ``parse_entry_dic`` and the ``clear_*`` helpers.

    Raises:
        Lookup errors other than ``ActiveDirectoryEntry.DoesNotExist`` (for
        example ``MultipleObjectsReturned``) are propagated instead of being
        mistaken for "entry not found", which would create a duplicate row.
    """
    login = entry_dic[LOGIN_FIELD]
    try:
        entry = ActiveDirectoryEntry.objects.get(canonicalName=login)
    except ActiveDirectoryEntry.DoesNotExist:
        logger.debug('New entry {}'.format(login))
        entry = ActiveDirectoryEntry()
    else:
        logger.debug('Update entry {}'.format(login))
    entry.canonicalName = login
    entry.accountExpires = entry_dic[EXPIRE_FIELD]
    entry.department = entry_dic[DEPARTMENT_FIELD]
    entry.givenName = entry_dic[GIVENNAME_FIELD]
    entry.lastName = entry_dic[LASTNAME_FIELD]
    entry.role = entry_dic[ROLE_FIELD]
    entry.modified = timezone.now()
    logger.debug(entry)
    try:
        entry.full_clean()
    except ValidationError as e:
        # Invalid data is only reported; the entry is still saved below.
        logger.warning('AD entry not valid: {}'.format(e.message_dict))
    entry.save()
def get_connection(user, pwd, dom):
    """Create an NTLM-authenticated LDAP connection and try to bind it.

    Args:
        user: Account name without the domain prefix.
        pwd: Password of the account.
        dom: Domain name; prefixed to the user as ``dom\\user``.

    Returns:
        The ``Connection`` object. It is returned even when the bind failed,
        in which case the failure is logged.
    """
    server = Server('gso1.ads1.fh-nuernberg.de', get_info=ALL)
    conn = Connection(
        server,
        user=dom + "\\" + user,
        password=pwd,
        authentication=NTLM)
    # Report a failed bind instead of ignoring it; otherwise the problem only
    # surfaces later as searches that find nothing.
    if not conn.bind():
        logger.error('LDAP bind failed for {}\\{}: {}'.format(dom, user, conn.result))
    return conn
def parse_entry_dic(entry):
    """Extract the attributes of interest from a search result entry.

    Args:
        entry: Object supporting ``entry[name].value`` lookups.

    Returns:
        Dict mapping each ``*_FIELD`` constant to the attribute value, or to
        ``None`` when the attribute cannot be read.
    """
    def value_of_field(field_name):
        try:
            return entry[field_name].value
        except Exception:
            # Missing or unreadable attribute: treat as "no value".
            return None

    field_names = (
        LOGIN_FIELD,
        EXPIRE_FIELD,
        DEPARTMENT_FIELD,
        GIVENNAME_FIELD,
        LASTNAME_FIELD,
        ROLE_FIELD,
    )
    return {name: value_of_field(name) for name in field_names}
def clear_expire_date(dic):
    """Normalize the expire date stored under ``EXPIRE_FIELD``.

    A missing value becomes 31 December of ``datetime.MAXYEAR``; an existing
    value gets its tzinfo replaced with UTC. Any date in ``datetime.MAXYEAR``
    is then moved to the year 2099 with the time set to midnight.

    Args:
        dic: Entry dict; modified in place.

    Returns:
        The same dict.
    """
    raw_expiry = dic[EXPIRE_FIELD]
    if raw_expiry is None:
        expiry = datetime.datetime(datetime.MAXYEAR, 12, 31, tzinfo=pytz.UTC)
    else:
        expiry = raw_expiry.replace(tzinfo=pytz.UTC)
    if expiry.year == datetime.MAXYEAR:
        expiry = expiry.replace(year=2099, hour=0, minute=0, second=0, microsecond=0)
    dic[EXPIRE_FIELD] = expiry
    return dic
def clear_department(dic):
    """Replace the department value with its ``string_adjust`` result.

    Args:
        dic: Entry dict; modified in place.

    Returns:
        The same dict.
    """
    adjusted = string_adjust(dic[DEPARTMENT_FIELD])
    dic[DEPARTMENT_FIELD] = adjusted
    return dic
def clear_description(dic):
    """Replace the role (description) value with its ``string_adjust`` result.

    Args:
        dic: Entry dict; modified in place.

    Returns:
        The same dict.
    """
    adjusted = string_adjust(dic[ROLE_FIELD])
    dic[ROLE_FIELD] = adjusted
    return dic
def string_adjust(s):
    """Return ``s`` cut to at most 32 characters, or ``"N.N."`` for ``None``.

    Args:
        s: The value to adjust, or ``None``.

    Returns:
        The first 32 items of ``s``, or the placeholder ``"N.N."`` when
        ``s`` is ``None``.
    """
    return "N.N." if s is None else s[:32]