2026-01-09 16:53:37 +01:00

355 lines
11 KiB
Python

import datetime
import sqlite3
from datetime import datetime
class SecretSantaDB:
def __init__(self):
self.connection = sqlite3.connect("secret_santa_db.db", check_same_thread=False)
self.connection.execute("PRAGMA foreign_keys = ON;")
self.cursor = self.connection.cursor()
self.create_tables()
def create_tables(self):
self.cursor.executescript(
"""
CREATE TABLE IF NOT EXISTS CYCLES
(
CYCLE_ID INTEGER PRIMARY KEY,
CONSTELLATION_KEY TEXT NOT NULL,
START_DATE TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS ROUNDS
(
ROUND_ID INTEGER PRIMARY KEY,
CYCLE_ID INTEGER NOT NULL,
ROUND_NAME TEXT,
CONSTELLATION_KEY TEXT NOT NULL,
CREATION_DATE TEXT NOT NULL,
FOREIGN KEY (CYCLE_ID) REFERENCES CYCLES (CYCLE_ID)
);
CREATE TABLE IF NOT EXISTS PARTICIPANTS
(
IMP_ID INTEGER PRIMARY KEY,
ROUND_ID INTEGER NOT NULL,
NAME TEXT NOT NULL,
WISH TEXT,
EMAIL TEXT,
FOREIGN KEY (ROUND_ID) REFERENCES ROUNDS (ROUND_ID),
UNIQUE (ROUND_ID, EMAIL)
);
CREATE TABLE IF NOT EXISTS PAIRS
(
ROUND_ID INTEGER NOT NULL,
GIVER_IMP_ID INTEGER NOT NULL,
RECEIVER_IMP_ID INTEGER NOT NULL,
FOREIGN KEY (ROUND_ID) REFERENCES ROUNDS (ROUND_ID),
FOREIGN KEY (GIVER_IMP_ID) REFERENCES PARTICIPANTS (IMP_ID),
FOREIGN KEY (RECEIVER_IMP_ID) REFERENCES PARTICIPANTS (IMP_ID),
UNIQUE (ROUND_ID, GIVER_IMP_ID),
UNIQUE (ROUND_ID, RECEIVER_IMP_ID),
CHECK (GIVER_IMP_ID != RECEIVER_IMP_ID)
);
"""
)
self.commit_to_db()
def flush_tables(self):
self.connection.execute("PRAGMA foreign_keys = OFF;")
self.cursor.executescript(
"""
DROP TABLE ROUNDS;
DROP TABLE PARTICIPANTS;
DROP TABLE PAIRS;
DROP TABLE CYCLES;
"""
)
self.commit_to_db()
self.connection.execute("PRAGMA foreign_keys = ON;")
def add_new_cycle(self, constellation_key: str) -> int:
now = datetime.now().isoformat(timespec="seconds")
self.cursor.execute(
"""
INSERT INTO CYCLES (CONSTELLATION_KEY, START_DATE)
VALUES (?, ?)
""",
(constellation_key, now,)
)
return self.cursor.lastrowid
def get_latest_cycle_id(self, constellation_key: str) -> int | None:
self.cursor.execute(
"""
SELECT CYCLE_ID
FROM CYCLES
WHERE CONSTELLATION_KEY = ?
ORDER BY CYCLE_ID DESC
LIMIT 1
""",
(constellation_key,)
)
row = self.cursor.fetchone()
return row[0] if row else None
def count_rounds_in_cycle(self, cycle_id: int) -> int:
self.cursor.execute(
"""
SELECT COUNT(*)
FROM ROUNDS
WHERE CYCLE_ID = ?
""",
(cycle_id,)
)
return self.cursor.fetchone()[0]
def pair_used_in_cycle(self, cycle_id: int, giver_email: str, receiver_email: str) -> bool:
self.cursor.execute(
"""
SELECT 1
FROM PAIRS p
JOIN ROUNDS r ON r.ROUND_ID = p.ROUND_ID
JOIN PARTICIPANTS g ON g.IMP_ID = p.GIVER_IMP_ID
JOIN PARTICIPANTS rec ON rec.IMP_ID = p.RECEIVER_IMP_ID
WHERE r.CYCLE_ID = ?
AND lower(g.EMAIL) = lower(?)
AND lower(rec.EMAIL) = lower(?)
LIMIT 1
""",
(cycle_id, giver_email, receiver_email)
)
return self.cursor.fetchone() is not None
def add_participants(self, round_id:int, imps: dict):
for name, data in imps.items():
wish = data["wish"]
email = data["email"]
self.add_new_participant(round_id, name, wish, email)
def add_new_round(self,round_name:str, constellation_key: str, cycle_id: int) -> tuple[int, str]:
created_at = datetime.now().isoformat(timespec="seconds")
self.cursor.execute(
"""
INSERT INTO ROUNDS (CREATION_DATE, CONSTELLATION_KEY, CYCLE_ID, ROUND_NAME)
VALUES (?, ?, ?, ?)
""",
(created_at, constellation_key, cycle_id, round_name,)
)
return self.cursor.lastrowid, created_at
def add_new_participant(self, round_id:int, name:str, wish: str | None, email: str) -> int:
self.cursor.execute(
"""
INSERT INTO PARTICIPANTS (ROUND_ID, NAME, WISH, EMAIL)
VALUES (?, ?, ?, ?)
""",
(round_id, name, wish, email,)
)
return self.cursor.lastrowid
def add_new_pair(self, round_id:int, giver_id:int, receiver_id:int):
self.cursor.execute(
"""
INSERT INTO PAIRS (ROUND_ID, GIVER_IMP_ID, RECEIVER_IMP_ID)
VALUES (?, ?, ?)
""",
(round_id, giver_id, receiver_id,)
)
def check_constellation(self, imps: dict) -> bool:
prev_id = None
for name, email, wish in imps.items():
self.cursor.execute(
"""
SELECT ROUND_ID
FROM PARTICIPANTS
WHERE EMAIL = ?
""",
(imps[email])
)
row = self.cursor.fetchone()
current_id = row[0]
if prev_id is None:
prev_id = current_id
if current_id != prev_id:
return False
else:
prev_id = current_id
return True
def get_imp_id_from_name(self, round_id, name:str) -> int | None:
self.cursor.execute(
"""
SELECT IMP_ID
FROM PARTICIPANTS
WHERE ROUND_ID = ? AND NAME = ?
""",
(round_id, name,)
)
row = self.cursor.fetchone()
return row[0] if row else None
def get_rows_from_email(self, email:str) -> list[tuple]:
self.cursor.execute(
"""
SELECT *
FROM PARTICIPANTS
WHERE EMAIL = ?
""",
(email,)
)
return self.cursor.fetchall()
def get_dates_with_round_ids_and_round_names_from_email(self, email:str) -> list[list]:
self.cursor.execute(
"""
SELECT rounds.ROUND_ID, rounds.ROUND_NAME, rounds.CREATION_DATE
FROM ROUNDS rounds
JOIN PARTICIPANTS participants ON participants.ROUND_ID = rounds.ROUND_ID AND participants.email = ?
ORDER BY rounds.CREATION_DATE ASC
""",
(email,)
)
return [[row[0], row[1], row[2]] for row in self.cursor.fetchall()]
def get_round_id_from_date(self, date:str) -> int:
self.cursor.execute(
"""
SELECT ROUND_ID
FROM ROUNDS
WHERE CREATION_DATE = ?
""",
(date,)
)
row = self.cursor.fetchone()
return row[0]
def get_name_from_email_and_round_id(self, email:str, round_id:int) -> str:
self.cursor.execute(
"""
SELECT NAME
FROM PARTICIPANTS
WHERE EMAIL = ? AND ROUND_ID = ?
""",
(email, round_id,)
)
row = self.cursor.fetchone()
return row[0]
def get_name_from_email(self, email:str) -> str:
self.cursor.execute(
"""
SELECT NAME
FROM PARTICIPANTS
WHERE EMAIL = ?
""",
(email,)
)
row = self.cursor.fetchone()
return row[0]
def get_wish_from_id_and_round_id(self, imp_id:int, round_id:int) -> str:
self.cursor.execute(
"""
SELECT WISH
FROM PARTICIPANTS
WHERE IMP_ID = ? AND ROUND_ID = ?
""",
(imp_id, round_id,)
)
row = self.cursor.fetchone()
return row[0]
def get_participants_from_round_id(self, round_id:int) -> list[str]:
self.cursor.execute(
"""
SELECT NAME
FROM PARTICIPANTS
WHERE ROUND_ID = ?
""",
(round_id,)
)
rows = self.cursor.fetchall()
return [row[0] for row in rows]
def get_round_id_from_email(self, email:str) -> int:
self.cursor.execute(
"""
SELECT rounds.ROUND_ID
FROM ROUNDS rounds
JOIN PARTICIPANTS participants ON participants.ROUND_ID = rounds.ROUND_ID
WHERE lower(participants.EMAIL) = lower(?)
ORDER BY rounds.CREATION_DATE DESC
LIMIT 1
""",
(email,)
)
row = self.cursor.fetchone()
if not row:
raise ValueError("Keine Runde für diese Email gefunden.")
return row[0]
def get_pairs(self, round_id:int) -> dict:
pairs = {}
self.cursor.execute(
"""
SELECT giver.NAME AS giver_name,
receiver.NAME AS receiver_name
FROM PAIRS p
JOIN PARTICIPANTS giver
ON giver.IMP_ID = p.GIVER_IMP_ID AND giver.ROUND_ID = p.ROUND_ID
JOIN PARTICIPANTS receiver
ON receiver.IMP_ID = p.RECEIVER_IMP_ID AND receiver.ROUND_ID = p.ROUND_ID
WHERE p.ROUND_ID = ?
""",
(round_id,)
)
rows = self.cursor.fetchall()
for row in rows:
giver_name = row[0]
receiver_name = row[1]
pairs[giver_name] = receiver_name
return pairs
def delete_round(self, round_id: int):
self.cursor.execute("DELETE FROM PAIRS WHERE ROUND_ID = ?", (round_id,))
self.cursor.execute("DELETE FROM PARTICIPANTS WHERE ROUND_ID = ?", (round_id,))
self.cursor.execute("DELETE FROM ROUNDS WHERE ROUND_ID = ?", (round_id,))
def commit_to_db(self):
try:
self.connection.commit()
except Exception:
self.connection.rollback()
raise
def debug_print_table(self, table: str, limit: int = 50):
self.cursor.execute(f"SELECT * FROM {table} LIMIT ?", (limit,))
rows = self.cursor.fetchall()
print(f"--- {table} ({len(rows)} rows) ---")
for r in rows:
print(r)
if __name__ == "__main__":
db_test = SecretSantaDB()
#db_test.flush_tables()
#db_test.create_tables()
#db_test.debug_print_table("CYCLES")
db_test.debug_print_table("ROUNDS")
db_test.debug_print_table("PARTICIPANTS")
db_test.debug_print_table("PAIRS")