import datetime import sqlite3 from datetime import datetime class SecretSantaDB: def __init__(self): self.connection = sqlite3.connect("secret_santa_db.db", check_same_thread=False) self.connection.execute("PRAGMA foreign_keys = ON;") self.cursor = self.connection.cursor() self.create_tables() def create_tables(self): self.cursor.executescript( """ CREATE TABLE IF NOT EXISTS CYCLES ( CYCLE_ID INTEGER PRIMARY KEY, CONSTELLATION_KEY TEXT NOT NULL, START_DATE TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS ROUNDS ( ROUND_ID INTEGER PRIMARY KEY, CYCLE_ID INTEGER NOT NULL, ROUND_NAME TEXT, CONSTELLATION_KEY TEXT NOT NULL, CREATION_DATE TEXT NOT NULL, FOREIGN KEY (CYCLE_ID) REFERENCES CYCLES (CYCLE_ID) ); CREATE TABLE IF NOT EXISTS PARTICIPANTS ( IMP_ID INTEGER PRIMARY KEY, ROUND_ID INTEGER NOT NULL, NAME TEXT NOT NULL, WISH TEXT, EMAIL TEXT, FOREIGN KEY (ROUND_ID) REFERENCES ROUNDS (ROUND_ID), UNIQUE (ROUND_ID, EMAIL) ); CREATE TABLE IF NOT EXISTS PAIRS ( ROUND_ID INTEGER NOT NULL, GIVER_IMP_ID INTEGER NOT NULL, RECEIVER_IMP_ID INTEGER NOT NULL, FOREIGN KEY (ROUND_ID) REFERENCES ROUNDS (ROUND_ID), FOREIGN KEY (GIVER_IMP_ID) REFERENCES PARTICIPANTS (IMP_ID), FOREIGN KEY (RECEIVER_IMP_ID) REFERENCES PARTICIPANTS (IMP_ID), UNIQUE (ROUND_ID, GIVER_IMP_ID), UNIQUE (ROUND_ID, RECEIVER_IMP_ID), CHECK (GIVER_IMP_ID != RECEIVER_IMP_ID) ); """ ) self.commit_to_db() def flush_tables(self): self.connection.execute("PRAGMA foreign_keys = OFF;") self.cursor.executescript( """ DROP TABLE ROUNDS; DROP TABLE PARTICIPANTS; DROP TABLE PAIRS; DROP TABLE CYCLES; """ ) self.commit_to_db() self.connection.execute("PRAGMA foreign_keys = ON;") def add_new_cycle(self, constellation_key: str) -> int: now = datetime.now().isoformat(timespec="seconds") self.cursor.execute( """ INSERT INTO CYCLES (CONSTELLATION_KEY, START_DATE) VALUES (?, ?) """, (constellation_key, now,) ) return self.cursor.lastrowid def get_latest_cycle_id(self, constellation_key: str) -> int | None: self.cursor.execute( """ SELECT CYCLE_ID FROM CYCLES WHERE CONSTELLATION_KEY = ? ORDER BY CYCLE_ID DESC LIMIT 1 """, (constellation_key,) ) row = self.cursor.fetchone() return row[0] if row else None def count_rounds_in_cycle(self, cycle_id: int) -> int: self.cursor.execute( """ SELECT COUNT(*) FROM ROUNDS WHERE CYCLE_ID = ? """, (cycle_id,) ) return self.cursor.fetchone()[0] def pair_used_in_cycle(self, cycle_id: int, giver_email: str, receiver_email: str) -> bool: self.cursor.execute( """ SELECT 1 FROM PAIRS p JOIN ROUNDS r ON r.ROUND_ID = p.ROUND_ID JOIN PARTICIPANTS g ON g.IMP_ID = p.GIVER_IMP_ID JOIN PARTICIPANTS rec ON rec.IMP_ID = p.RECEIVER_IMP_ID WHERE r.CYCLE_ID = ? AND lower(g.EMAIL) = lower(?) AND lower(rec.EMAIL) = lower(?) LIMIT 1 """, (cycle_id, giver_email, receiver_email) ) return self.cursor.fetchone() is not None def add_participants(self, round_id:int, imps: dict): for name, data in imps.items(): wish = data["wish"] email = data["email"] self.add_new_participant(round_id, name, wish, email) def add_new_round(self,round_name:str, constellation_key: str, cycle_id: int) -> tuple[int, str]: created_at = datetime.now().isoformat(timespec="seconds") self.cursor.execute( """ INSERT INTO ROUNDS (CREATION_DATE, CONSTELLATION_KEY, CYCLE_ID, ROUND_NAME) VALUES (?, ?, ?, ?) """, (created_at, constellation_key, cycle_id, round_name,) ) return self.cursor.lastrowid, created_at def add_new_participant(self, round_id:int, name:str, wish: str | None, email: str) -> int: self.cursor.execute( """ INSERT INTO PARTICIPANTS (ROUND_ID, NAME, WISH, EMAIL) VALUES (?, ?, ?, ?) """, (round_id, name, wish, email,) ) return self.cursor.lastrowid def add_new_pair(self, round_id:int, giver_id:int, receiver_id:int): self.cursor.execute( """ INSERT INTO PAIRS (ROUND_ID, GIVER_IMP_ID, RECEIVER_IMP_ID) VALUES (?, ?, ?) """, (round_id, giver_id, receiver_id,) ) def check_constellation(self, imps: dict) -> bool: prev_id = None for name, email, wish in imps.items(): self.cursor.execute( """ SELECT ROUND_ID FROM PARTICIPANTS WHERE EMAIL = ? """, (imps[email]) ) row = self.cursor.fetchone() current_id = row[0] if prev_id is None: prev_id = current_id if current_id != prev_id: return False else: prev_id = current_id return True def get_imp_id_from_name(self, round_id, name:str) -> int | None: self.cursor.execute( """ SELECT IMP_ID FROM PARTICIPANTS WHERE ROUND_ID = ? AND NAME = ? """, (round_id, name,) ) row = self.cursor.fetchone() return row[0] if row else None def get_rows_from_email(self, email:str) -> list[tuple]: self.cursor.execute( """ SELECT * FROM PARTICIPANTS WHERE EMAIL = ? """, (email,) ) return self.cursor.fetchall() def get_dates_with_round_ids_and_round_names_from_email(self, email:str) -> list[list]: self.cursor.execute( """ SELECT rounds.ROUND_ID, rounds.ROUND_NAME, rounds.CREATION_DATE FROM ROUNDS rounds JOIN PARTICIPANTS participants ON participants.ROUND_ID = rounds.ROUND_ID AND participants.email = ? ORDER BY rounds.CREATION_DATE ASC """, (email,) ) return [[row[0], row[1], row[2]] for row in self.cursor.fetchall()] def get_round_id_from_date(self, date:str) -> int: self.cursor.execute( """ SELECT ROUND_ID FROM ROUNDS WHERE CREATION_DATE = ? """, (date,) ) row = self.cursor.fetchone() return row[0] def get_name_from_email_and_round_id(self, email:str, round_id:int) -> str: self.cursor.execute( """ SELECT NAME FROM PARTICIPANTS WHERE EMAIL = ? AND ROUND_ID = ? """, (email, round_id,) ) row = self.cursor.fetchone() return row[0] def get_name_from_email(self, email:str) -> str: self.cursor.execute( """ SELECT NAME FROM PARTICIPANTS WHERE EMAIL = ? """, (email,) ) row = self.cursor.fetchone() return row[0] def get_wish_from_id_and_round_id(self, imp_id:int, round_id:int) -> str: self.cursor.execute( """ SELECT WISH FROM PARTICIPANTS WHERE IMP_ID = ? AND ROUND_ID = ? """, (imp_id, round_id,) ) row = self.cursor.fetchone() return row[0] def get_participants_from_round_id(self, round_id:int) -> list[str]: self.cursor.execute( """ SELECT NAME FROM PARTICIPANTS WHERE ROUND_ID = ? """, (round_id,) ) rows = self.cursor.fetchall() return [row[0] for row in rows] def get_round_id_from_email(self, email:str) -> int: self.cursor.execute( """ SELECT rounds.ROUND_ID FROM ROUNDS rounds JOIN PARTICIPANTS participants ON participants.ROUND_ID = rounds.ROUND_ID WHERE lower(participants.EMAIL) = lower(?) ORDER BY rounds.CREATION_DATE DESC LIMIT 1 """, (email,) ) row = self.cursor.fetchone() if not row: raise ValueError("Keine Runde für diese Email gefunden.") return row[0] def get_pairs(self, round_id:int) -> dict: pairs = {} self.cursor.execute( """ SELECT giver.NAME AS giver_name, receiver.NAME AS receiver_name FROM PAIRS p JOIN PARTICIPANTS giver ON giver.IMP_ID = p.GIVER_IMP_ID AND giver.ROUND_ID = p.ROUND_ID JOIN PARTICIPANTS receiver ON receiver.IMP_ID = p.RECEIVER_IMP_ID AND receiver.ROUND_ID = p.ROUND_ID WHERE p.ROUND_ID = ? """, (round_id,) ) rows = self.cursor.fetchall() for row in rows: giver_name = row[0] receiver_name = row[1] pairs[giver_name] = receiver_name return pairs def delete_round(self, round_id: int): self.cursor.execute("DELETE FROM PAIRS WHERE ROUND_ID = ?", (round_id,)) self.cursor.execute("DELETE FROM PARTICIPANTS WHERE ROUND_ID = ?", (round_id,)) self.cursor.execute("DELETE FROM ROUNDS WHERE ROUND_ID = ?", (round_id,)) def commit_to_db(self): try: self.connection.commit() except Exception: self.connection.rollback() raise def debug_print_table(self, table: str, limit: int = 50): self.cursor.execute(f"SELECT * FROM {table} LIMIT ?", (limit,)) rows = self.cursor.fetchall() print(f"--- {table} ({len(rows)} rows) ---") for r in rows: print(r) if __name__ == "__main__": db_test = SecretSantaDB() #db_test.flush_tables() #db_test.create_tables() #db_test.debug_print_table("CYCLES") db_test.debug_print_table("ROUNDS") db_test.debug_print_table("PARTICIPANTS") db_test.debug_print_table("PAIRS")