355 lines
11 KiB
Python
355 lines
11 KiB
Python
import datetime
|
|
import sqlite3
|
|
from datetime import datetime
|
|
|
|
class SecretSantaDB:
|
|
|
|
def __init__(self):
|
|
self.connection = sqlite3.connect("secret_santa_db.db", check_same_thread=False)
|
|
self.connection.execute("PRAGMA foreign_keys = ON;")
|
|
self.cursor = self.connection.cursor()
|
|
self.create_tables()
|
|
|
|
|
|
def create_tables(self):
|
|
self.cursor.executescript(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS CYCLES
|
|
(
|
|
CYCLE_ID INTEGER PRIMARY KEY,
|
|
CONSTELLATION_KEY TEXT NOT NULL,
|
|
START_DATE TEXT NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS ROUNDS
|
|
(
|
|
ROUND_ID INTEGER PRIMARY KEY,
|
|
CYCLE_ID INTEGER NOT NULL,
|
|
ROUND_NAME TEXT,
|
|
CONSTELLATION_KEY TEXT NOT NULL,
|
|
CREATION_DATE TEXT NOT NULL,
|
|
FOREIGN KEY (CYCLE_ID) REFERENCES CYCLES (CYCLE_ID)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS PARTICIPANTS
|
|
(
|
|
IMP_ID INTEGER PRIMARY KEY,
|
|
ROUND_ID INTEGER NOT NULL,
|
|
NAME TEXT NOT NULL,
|
|
WISH TEXT,
|
|
EMAIL TEXT,
|
|
FOREIGN KEY (ROUND_ID) REFERENCES ROUNDS (ROUND_ID),
|
|
UNIQUE (ROUND_ID, EMAIL)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS PAIRS
|
|
(
|
|
ROUND_ID INTEGER NOT NULL,
|
|
GIVER_IMP_ID INTEGER NOT NULL,
|
|
RECEIVER_IMP_ID INTEGER NOT NULL,
|
|
FOREIGN KEY (ROUND_ID) REFERENCES ROUNDS (ROUND_ID),
|
|
FOREIGN KEY (GIVER_IMP_ID) REFERENCES PARTICIPANTS (IMP_ID),
|
|
FOREIGN KEY (RECEIVER_IMP_ID) REFERENCES PARTICIPANTS (IMP_ID),
|
|
UNIQUE (ROUND_ID, GIVER_IMP_ID),
|
|
UNIQUE (ROUND_ID, RECEIVER_IMP_ID),
|
|
CHECK (GIVER_IMP_ID != RECEIVER_IMP_ID)
|
|
);
|
|
|
|
"""
|
|
)
|
|
self.commit_to_db()
|
|
|
|
def flush_tables(self):
|
|
self.connection.execute("PRAGMA foreign_keys = OFF;")
|
|
self.cursor.executescript(
|
|
"""
|
|
DROP TABLE ROUNDS;
|
|
DROP TABLE PARTICIPANTS;
|
|
DROP TABLE PAIRS;
|
|
DROP TABLE CYCLES;
|
|
"""
|
|
)
|
|
self.commit_to_db()
|
|
self.connection.execute("PRAGMA foreign_keys = ON;")
|
|
|
|
def add_new_cycle(self, constellation_key: str) -> int:
|
|
now = datetime.now().isoformat(timespec="seconds")
|
|
self.cursor.execute(
|
|
"""
|
|
INSERT INTO CYCLES (CONSTELLATION_KEY, START_DATE)
|
|
VALUES (?, ?)
|
|
""",
|
|
(constellation_key, now,)
|
|
)
|
|
return self.cursor.lastrowid
|
|
|
|
def get_latest_cycle_id(self, constellation_key: str) -> int | None:
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT CYCLE_ID
|
|
FROM CYCLES
|
|
WHERE CONSTELLATION_KEY = ?
|
|
ORDER BY CYCLE_ID DESC
|
|
LIMIT 1
|
|
""",
|
|
(constellation_key,)
|
|
)
|
|
row = self.cursor.fetchone()
|
|
return row[0] if row else None
|
|
|
|
def count_rounds_in_cycle(self, cycle_id: int) -> int:
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT COUNT(*)
|
|
FROM ROUNDS
|
|
WHERE CYCLE_ID = ?
|
|
""",
|
|
(cycle_id,)
|
|
)
|
|
return self.cursor.fetchone()[0]
|
|
|
|
def pair_used_in_cycle(self, cycle_id: int, giver_email: str, receiver_email: str) -> bool:
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT 1
|
|
FROM PAIRS p
|
|
JOIN ROUNDS r ON r.ROUND_ID = p.ROUND_ID
|
|
JOIN PARTICIPANTS g ON g.IMP_ID = p.GIVER_IMP_ID
|
|
JOIN PARTICIPANTS rec ON rec.IMP_ID = p.RECEIVER_IMP_ID
|
|
WHERE r.CYCLE_ID = ?
|
|
AND lower(g.EMAIL) = lower(?)
|
|
AND lower(rec.EMAIL) = lower(?)
|
|
LIMIT 1
|
|
""",
|
|
(cycle_id, giver_email, receiver_email)
|
|
)
|
|
return self.cursor.fetchone() is not None
|
|
|
|
def add_participants(self, round_id:int, imps: dict):
|
|
for name, data in imps.items():
|
|
wish = data["wish"]
|
|
email = data["email"]
|
|
self.add_new_participant(round_id, name, wish, email)
|
|
|
|
def add_new_round(self,round_name:str, constellation_key: str, cycle_id: int) -> tuple[int, str]:
|
|
created_at = datetime.now().isoformat(timespec="seconds")
|
|
self.cursor.execute(
|
|
"""
|
|
INSERT INTO ROUNDS (CREATION_DATE, CONSTELLATION_KEY, CYCLE_ID, ROUND_NAME)
|
|
VALUES (?, ?, ?, ?)
|
|
""",
|
|
(created_at, constellation_key, cycle_id, round_name,)
|
|
)
|
|
return self.cursor.lastrowid, created_at
|
|
|
|
def add_new_participant(self, round_id:int, name:str, wish: str | None, email: str) -> int:
|
|
self.cursor.execute(
|
|
"""
|
|
INSERT INTO PARTICIPANTS (ROUND_ID, NAME, WISH, EMAIL)
|
|
VALUES (?, ?, ?, ?)
|
|
""",
|
|
(round_id, name, wish, email,)
|
|
)
|
|
return self.cursor.lastrowid
|
|
|
|
|
|
def add_new_pair(self, round_id:int, giver_id:int, receiver_id:int):
|
|
self.cursor.execute(
|
|
"""
|
|
INSERT INTO PAIRS (ROUND_ID, GIVER_IMP_ID, RECEIVER_IMP_ID)
|
|
VALUES (?, ?, ?)
|
|
""",
|
|
(round_id, giver_id, receiver_id,)
|
|
)
|
|
|
|
def check_constellation(self, imps: dict) -> bool:
|
|
prev_id = None
|
|
for name, email, wish in imps.items():
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT ROUND_ID
|
|
FROM PARTICIPANTS
|
|
WHERE EMAIL = ?
|
|
""",
|
|
(imps[email])
|
|
)
|
|
row = self.cursor.fetchone()
|
|
current_id = row[0]
|
|
if prev_id is None:
|
|
prev_id = current_id
|
|
if current_id != prev_id:
|
|
return False
|
|
else:
|
|
prev_id = current_id
|
|
return True
|
|
|
|
|
|
def get_imp_id_from_name(self, round_id, name:str) -> int | None:
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT IMP_ID
|
|
FROM PARTICIPANTS
|
|
WHERE ROUND_ID = ? AND NAME = ?
|
|
""",
|
|
(round_id, name,)
|
|
)
|
|
|
|
row = self.cursor.fetchone()
|
|
return row[0] if row else None
|
|
|
|
def get_rows_from_email(self, email:str) -> list[tuple]:
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT *
|
|
FROM PARTICIPANTS
|
|
WHERE EMAIL = ?
|
|
""",
|
|
(email,)
|
|
)
|
|
return self.cursor.fetchall()
|
|
|
|
def get_dates_with_round_ids_and_round_names_from_email(self, email:str) -> list[list]:
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT rounds.ROUND_ID, rounds.ROUND_NAME, rounds.CREATION_DATE
|
|
FROM ROUNDS rounds
|
|
JOIN PARTICIPANTS participants ON participants.ROUND_ID = rounds.ROUND_ID AND participants.email = ?
|
|
ORDER BY rounds.CREATION_DATE ASC
|
|
""",
|
|
(email,)
|
|
)
|
|
|
|
return [[row[0], row[1], row[2]] for row in self.cursor.fetchall()]
|
|
|
|
def get_round_id_from_date(self, date:str) -> int:
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT ROUND_ID
|
|
FROM ROUNDS
|
|
WHERE CREATION_DATE = ?
|
|
""",
|
|
(date,)
|
|
)
|
|
row = self.cursor.fetchone()
|
|
return row[0]
|
|
|
|
def get_name_from_email_and_round_id(self, email:str, round_id:int) -> str:
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT NAME
|
|
FROM PARTICIPANTS
|
|
WHERE EMAIL = ? AND ROUND_ID = ?
|
|
""",
|
|
(email, round_id,)
|
|
)
|
|
row = self.cursor.fetchone()
|
|
return row[0]
|
|
|
|
def get_name_from_email(self, email:str) -> str:
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT NAME
|
|
FROM PARTICIPANTS
|
|
WHERE EMAIL = ?
|
|
""",
|
|
(email,)
|
|
)
|
|
row = self.cursor.fetchone()
|
|
return row[0]
|
|
|
|
def get_wish_from_id_and_round_id(self, imp_id:int, round_id:int) -> str:
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT WISH
|
|
FROM PARTICIPANTS
|
|
WHERE IMP_ID = ? AND ROUND_ID = ?
|
|
""",
|
|
(imp_id, round_id,)
|
|
)
|
|
row = self.cursor.fetchone()
|
|
return row[0]
|
|
|
|
|
|
def get_participants_from_round_id(self, round_id:int) -> list[str]:
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT NAME
|
|
FROM PARTICIPANTS
|
|
WHERE ROUND_ID = ?
|
|
""",
|
|
(round_id,)
|
|
)
|
|
rows = self.cursor.fetchall()
|
|
return [row[0] for row in rows]
|
|
|
|
def get_round_id_from_email(self, email:str) -> int:
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT rounds.ROUND_ID
|
|
FROM ROUNDS rounds
|
|
JOIN PARTICIPANTS participants ON participants.ROUND_ID = rounds.ROUND_ID
|
|
WHERE lower(participants.EMAIL) = lower(?)
|
|
ORDER BY rounds.CREATION_DATE DESC
|
|
LIMIT 1
|
|
""",
|
|
(email,)
|
|
)
|
|
row = self.cursor.fetchone()
|
|
if not row:
|
|
raise ValueError("Keine Runde für diese Email gefunden.")
|
|
return row[0]
|
|
|
|
def get_pairs(self, round_id:int) -> dict:
|
|
pairs = {}
|
|
self.cursor.execute(
|
|
"""
|
|
SELECT giver.NAME AS giver_name,
|
|
receiver.NAME AS receiver_name
|
|
FROM PAIRS p
|
|
JOIN PARTICIPANTS giver
|
|
ON giver.IMP_ID = p.GIVER_IMP_ID AND giver.ROUND_ID = p.ROUND_ID
|
|
JOIN PARTICIPANTS receiver
|
|
ON receiver.IMP_ID = p.RECEIVER_IMP_ID AND receiver.ROUND_ID = p.ROUND_ID
|
|
WHERE p.ROUND_ID = ?
|
|
""",
|
|
(round_id,)
|
|
)
|
|
rows = self.cursor.fetchall()
|
|
for row in rows:
|
|
giver_name = row[0]
|
|
receiver_name = row[1]
|
|
pairs[giver_name] = receiver_name
|
|
|
|
return pairs
|
|
|
|
def delete_round(self, round_id: int):
|
|
self.cursor.execute("DELETE FROM PAIRS WHERE ROUND_ID = ?", (round_id,))
|
|
self.cursor.execute("DELETE FROM PARTICIPANTS WHERE ROUND_ID = ?", (round_id,))
|
|
self.cursor.execute("DELETE FROM ROUNDS WHERE ROUND_ID = ?", (round_id,))
|
|
|
|
def commit_to_db(self):
|
|
try:
|
|
self.connection.commit()
|
|
except Exception:
|
|
self.connection.rollback()
|
|
raise
|
|
|
|
|
|
def debug_print_table(self, table: str, limit: int = 50):
|
|
self.cursor.execute(f"SELECT * FROM {table} LIMIT ?", (limit,))
|
|
rows = self.cursor.fetchall()
|
|
print(f"--- {table} ({len(rows)} rows) ---")
|
|
for r in rows:
|
|
print(r)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
db_test = SecretSantaDB()
|
|
#db_test.flush_tables()
|
|
#db_test.create_tables()
|
|
#db_test.debug_print_table("CYCLES")
|
|
db_test.debug_print_table("ROUNDS")
|
|
db_test.debug_print_table("PARTICIPANTS")
|
|
db_test.debug_print_table("PAIRS")
|
|
|