commit b926f604a9c412cb22f159523c02c03e4659fed5 Author: Legaeli Date: Fri Jan 9 16:53:37 2026 +0100 Backend v1 (Prototype) diff --git a/api.py b/api.py new file mode 100644 index 0000000..b1930a5 --- /dev/null +++ b/api.py @@ -0,0 +1,131 @@ +from fastapi import FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel, EmailStr +from typing import List, Optional, Any + +from db import SecretSantaDB +from generator import SecretSantaGenerator + +app = FastAPI() +database = SecretSantaDB() + +app.add_middleware( + CORSMiddleware, + allow_origins=[ + "http://localhost:5173", + "http://127.0.0.1:5173", + "http://localhost:5500", + "http://127.0.0.1:5500", + "http://localhost:3000", + "http://127.0.0.1:3000", + ], + allow_credentials=True, + allow_methods=["*"], +) + +class Participant(BaseModel): + name: str + email: EmailStr + wish: str | None = None + +class CreateRoundRequest(BaseModel): + round_name: Optional[str] + participants: List[Participant] + +class CreateRoundResponse(BaseModel): + round_id: int + created_at: str + +class RevealRequest(BaseModel): + email: EmailStr + choice_index: Optional[int] = None + +class RoundChoice(BaseModel): + index: int + round_name: str + created_at: str + participants: list[str] + +class RevealResponse(BaseModel): + giver: Optional[str] = None + receiver: Optional[str] = None + wish: Optional[str] = None + choices: Optional[List[RoundChoice]] = None + + +@app.post("/api/rounds", response_model=CreateRoundResponse) +def create_round(req: CreateRoundRequest): + participants_list = req.participants + round_name = req.round_name + if len(participants_list) < 3: + raise HTTPException(status_code=400, detail="Mindestens 3 Teilnehmer nötig.") + + participants = { + participant.name: {"email": str(participant.email), "wish": participant.wish} + for participant in participants_list + } + + generator = SecretSantaGenerator(imp_size=len(participants_list), db=database) + round_id, created_at = generator.create_new_round(round_name, participants) + + return { + "round_id": round_id, + "created_at": created_at + } + + + +@app.post("/api/reveal", response_model=RevealResponse) +def reveal(req: RevealRequest): + email = str(req.email).strip().lower() + + rows = database.get_rows_from_email(email) + appearances = len(rows) + + if appearances > 1: + choices = get_choices(email) + + response_choice = req.choice_index + if response_choice is None: + public_choices = [ + RoundChoice(index=i, created_at=choice["date"], participants=choice.get("participants", []), round_name=choice["round_name"]) + for i, choice in choices.items() + ] + return RevealResponse(choices=public_choices) + + if response_choice not in choices: + raise HTTPException(status_code=400, detail="Ungültige Auswahl.") + + round_id = choices[response_choice]["round_id"] + + + giver = database.get_name_from_email_and_round_id(email, round_id) + imp_pairs = database.get_pairs(round_id) + + else: + giver = database.get_name_from_email(email) + try: + round_id = database.get_round_id_from_email(email) + imp_pairs = database.get_pairs(round_id) + except Exception: + raise RuntimeError("Bitte erst eine Runde erstellen...") + + if giver and imp_pairs: + try: + receiver = imp_pairs[giver] + receiver_id = database.get_imp_id_from_name(round_id, receiver) + wish = database.get_wish_from_id_and_round_id(receiver_id, round_id) + return {"giver": giver, "receiver": receiver, "wish": wish} + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Etwas ist schiefgelaufen: {e}") + + +def get_choices(email) -> dict[int, dict[str, list[str] | Any]]: + choices = {} + rows = database.get_dates_with_round_ids_and_round_names_from_email(email) + for i, row in enumerate(rows, start=1): + round_id = row[0] + participants = database.get_participants_from_round_id(round_id) + choices[i] = {"round_id": row[0], "round_name": row[1], "date": row[2], "participants": participants} + return choices \ No newline at end of file diff --git a/db.py b/db.py new file mode 100644 index 0000000..5079b56 --- /dev/null +++ b/db.py @@ -0,0 +1,354 @@ +import datetime +import sqlite3 +from datetime import datetime + +class SecretSantaDB: + + def __init__(self): + self.connection = sqlite3.connect("secret_santa_db.db", check_same_thread=False) + self.connection.execute("PRAGMA foreign_keys = ON;") + self.cursor = self.connection.cursor() + self.create_tables() + + + def create_tables(self): + self.cursor.executescript( + """ + CREATE TABLE IF NOT EXISTS CYCLES + ( + CYCLE_ID INTEGER PRIMARY KEY, + CONSTELLATION_KEY TEXT NOT NULL, + START_DATE TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS ROUNDS + ( + ROUND_ID INTEGER PRIMARY KEY, + CYCLE_ID INTEGER NOT NULL, + ROUND_NAME TEXT, + CONSTELLATION_KEY TEXT NOT NULL, + CREATION_DATE TEXT NOT NULL, + FOREIGN KEY (CYCLE_ID) REFERENCES CYCLES (CYCLE_ID) + ); + + CREATE TABLE IF NOT EXISTS PARTICIPANTS + ( + IMP_ID INTEGER PRIMARY KEY, + ROUND_ID INTEGER NOT NULL, + NAME TEXT NOT NULL, + WISH TEXT, + EMAIL TEXT, + FOREIGN KEY (ROUND_ID) REFERENCES ROUNDS (ROUND_ID), + UNIQUE (ROUND_ID, EMAIL) + ); + + CREATE TABLE IF NOT EXISTS PAIRS + ( + ROUND_ID INTEGER NOT NULL, + GIVER_IMP_ID INTEGER NOT NULL, + RECEIVER_IMP_ID INTEGER NOT NULL, + FOREIGN KEY (ROUND_ID) REFERENCES ROUNDS (ROUND_ID), + FOREIGN KEY (GIVER_IMP_ID) REFERENCES PARTICIPANTS (IMP_ID), + FOREIGN KEY (RECEIVER_IMP_ID) REFERENCES PARTICIPANTS (IMP_ID), + UNIQUE (ROUND_ID, GIVER_IMP_ID), + UNIQUE (ROUND_ID, RECEIVER_IMP_ID), + CHECK (GIVER_IMP_ID != RECEIVER_IMP_ID) + ); + + """ + ) + self.commit_to_db() + + def flush_tables(self): + self.connection.execute("PRAGMA foreign_keys = OFF;") + self.cursor.executescript( + """ + DROP TABLE ROUNDS; + DROP TABLE PARTICIPANTS; + DROP TABLE PAIRS; + DROP TABLE CYCLES; + """ + ) + self.commit_to_db() + self.connection.execute("PRAGMA foreign_keys = ON;") + + def add_new_cycle(self, constellation_key: str) -> int: + now = datetime.now().isoformat(timespec="seconds") + self.cursor.execute( + """ + INSERT INTO CYCLES (CONSTELLATION_KEY, START_DATE) + VALUES (?, ?) + """, + (constellation_key, now,) + ) + return self.cursor.lastrowid + + def get_latest_cycle_id(self, constellation_key: str) -> int | None: + self.cursor.execute( + """ + SELECT CYCLE_ID + FROM CYCLES + WHERE CONSTELLATION_KEY = ? + ORDER BY CYCLE_ID DESC + LIMIT 1 + """, + (constellation_key,) + ) + row = self.cursor.fetchone() + return row[0] if row else None + + def count_rounds_in_cycle(self, cycle_id: int) -> int: + self.cursor.execute( + """ + SELECT COUNT(*) + FROM ROUNDS + WHERE CYCLE_ID = ? + """, + (cycle_id,) + ) + return self.cursor.fetchone()[0] + + def pair_used_in_cycle(self, cycle_id: int, giver_email: str, receiver_email: str) -> bool: + self.cursor.execute( + """ + SELECT 1 + FROM PAIRS p + JOIN ROUNDS r ON r.ROUND_ID = p.ROUND_ID + JOIN PARTICIPANTS g ON g.IMP_ID = p.GIVER_IMP_ID + JOIN PARTICIPANTS rec ON rec.IMP_ID = p.RECEIVER_IMP_ID + WHERE r.CYCLE_ID = ? + AND lower(g.EMAIL) = lower(?) + AND lower(rec.EMAIL) = lower(?) + LIMIT 1 + """, + (cycle_id, giver_email, receiver_email) + ) + return self.cursor.fetchone() is not None + + def add_participants(self, round_id:int, imps: dict): + for name, data in imps.items(): + wish = data["wish"] + email = data["email"] + self.add_new_participant(round_id, name, wish, email) + + def add_new_round(self,round_name:str, constellation_key: str, cycle_id: int) -> tuple[int, str]: + created_at = datetime.now().isoformat(timespec="seconds") + self.cursor.execute( + """ + INSERT INTO ROUNDS (CREATION_DATE, CONSTELLATION_KEY, CYCLE_ID, ROUND_NAME) + VALUES (?, ?, ?, ?) + """, + (created_at, constellation_key, cycle_id, round_name,) + ) + return self.cursor.lastrowid, created_at + + def add_new_participant(self, round_id:int, name:str, wish: str | None, email: str) -> int: + self.cursor.execute( + """ + INSERT INTO PARTICIPANTS (ROUND_ID, NAME, WISH, EMAIL) + VALUES (?, ?, ?, ?) + """, + (round_id, name, wish, email,) + ) + return self.cursor.lastrowid + + + def add_new_pair(self, round_id:int, giver_id:int, receiver_id:int): + self.cursor.execute( + """ + INSERT INTO PAIRS (ROUND_ID, GIVER_IMP_ID, RECEIVER_IMP_ID) + VALUES (?, ?, ?) + """, + (round_id, giver_id, receiver_id,) + ) + + def check_constellation(self, imps: dict) -> bool: + prev_id = None + for name, email, wish in imps.items(): + self.cursor.execute( + """ + SELECT ROUND_ID + FROM PARTICIPANTS + WHERE EMAIL = ? + """, + (imps[email]) + ) + row = self.cursor.fetchone() + current_id = row[0] + if prev_id is None: + prev_id = current_id + if current_id != prev_id: + return False + else: + prev_id = current_id + return True + + + def get_imp_id_from_name(self, round_id, name:str) -> int | None: + self.cursor.execute( + """ + SELECT IMP_ID + FROM PARTICIPANTS + WHERE ROUND_ID = ? AND NAME = ? + """, + (round_id, name,) + ) + + row = self.cursor.fetchone() + return row[0] if row else None + + def get_rows_from_email(self, email:str) -> list[tuple]: + self.cursor.execute( + """ + SELECT * + FROM PARTICIPANTS + WHERE EMAIL = ? + """, + (email,) + ) + return self.cursor.fetchall() + + def get_dates_with_round_ids_and_round_names_from_email(self, email:str) -> list[list]: + self.cursor.execute( + """ + SELECT rounds.ROUND_ID, rounds.ROUND_NAME, rounds.CREATION_DATE + FROM ROUNDS rounds + JOIN PARTICIPANTS participants ON participants.ROUND_ID = rounds.ROUND_ID AND participants.email = ? + ORDER BY rounds.CREATION_DATE ASC + """, + (email,) + ) + + return [[row[0], row[1], row[2]] for row in self.cursor.fetchall()] + + def get_round_id_from_date(self, date:str) -> int: + self.cursor.execute( + """ + SELECT ROUND_ID + FROM ROUNDS + WHERE CREATION_DATE = ? + """, + (date,) + ) + row = self.cursor.fetchone() + return row[0] + + def get_name_from_email_and_round_id(self, email:str, round_id:int) -> str: + self.cursor.execute( + """ + SELECT NAME + FROM PARTICIPANTS + WHERE EMAIL = ? AND ROUND_ID = ? + """, + (email, round_id,) + ) + row = self.cursor.fetchone() + return row[0] + + def get_name_from_email(self, email:str) -> str: + self.cursor.execute( + """ + SELECT NAME + FROM PARTICIPANTS + WHERE EMAIL = ? + """, + (email,) + ) + row = self.cursor.fetchone() + return row[0] + + def get_wish_from_id_and_round_id(self, imp_id:int, round_id:int) -> str: + self.cursor.execute( + """ + SELECT WISH + FROM PARTICIPANTS + WHERE IMP_ID = ? AND ROUND_ID = ? + """, + (imp_id, round_id,) + ) + row = self.cursor.fetchone() + return row[0] + + + def get_participants_from_round_id(self, round_id:int) -> list[str]: + self.cursor.execute( + """ + SELECT NAME + FROM PARTICIPANTS + WHERE ROUND_ID = ? + """, + (round_id,) + ) + rows = self.cursor.fetchall() + return [row[0] for row in rows] + + def get_round_id_from_email(self, email:str) -> int: + self.cursor.execute( + """ + SELECT rounds.ROUND_ID + FROM ROUNDS rounds + JOIN PARTICIPANTS participants ON participants.ROUND_ID = rounds.ROUND_ID + WHERE lower(participants.EMAIL) = lower(?) + ORDER BY rounds.CREATION_DATE DESC + LIMIT 1 + """, + (email,) + ) + row = self.cursor.fetchone() + if not row: + raise ValueError("Keine Runde für diese Email gefunden.") + return row[0] + + def get_pairs(self, round_id:int) -> dict: + pairs = {} + self.cursor.execute( + """ + SELECT giver.NAME AS giver_name, + receiver.NAME AS receiver_name + FROM PAIRS p + JOIN PARTICIPANTS giver + ON giver.IMP_ID = p.GIVER_IMP_ID AND giver.ROUND_ID = p.ROUND_ID + JOIN PARTICIPANTS receiver + ON receiver.IMP_ID = p.RECEIVER_IMP_ID AND receiver.ROUND_ID = p.ROUND_ID + WHERE p.ROUND_ID = ? + """, + (round_id,) + ) + rows = self.cursor.fetchall() + for row in rows: + giver_name = row[0] + receiver_name = row[1] + pairs[giver_name] = receiver_name + + return pairs + + def delete_round(self, round_id: int): + self.cursor.execute("DELETE FROM PAIRS WHERE ROUND_ID = ?", (round_id,)) + self.cursor.execute("DELETE FROM PARTICIPANTS WHERE ROUND_ID = ?", (round_id,)) + self.cursor.execute("DELETE FROM ROUNDS WHERE ROUND_ID = ?", (round_id,)) + + def commit_to_db(self): + try: + self.connection.commit() + except Exception: + self.connection.rollback() + raise + + + def debug_print_table(self, table: str, limit: int = 50): + self.cursor.execute(f"SELECT * FROM {table} LIMIT ?", (limit,)) + rows = self.cursor.fetchall() + print(f"--- {table} ({len(rows)} rows) ---") + for r in rows: + print(r) + + +if __name__ == "__main__": + db_test = SecretSantaDB() + #db_test.flush_tables() + #db_test.create_tables() + #db_test.debug_print_table("CYCLES") + db_test.debug_print_table("ROUNDS") + db_test.debug_print_table("PARTICIPANTS") + db_test.debug_print_table("PAIRS") + diff --git a/generator.py b/generator.py new file mode 100644 index 0000000..14b2b3d --- /dev/null +++ b/generator.py @@ -0,0 +1,120 @@ +import random +from db import SecretSantaDB +import hashlib + +class SecretSantaGenerator: + + def __init__(self, imp_size: int, db: SecretSantaDB): + self.imp_size = imp_size + self.db = db + + def constellation_key(self, imps: dict) -> str: + emails = sorted([data["email"].strip().lower() for data in imps.values()]) + raw = "|".join(emails) + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + def derangements(self, n: int) -> int: + if n == 0: return 1 + if n == 1: return 0 + a, b = 1, 0 + for k in range(2, n + 1): + a, b = b, (k - 1) * (a + b) + return b + + def pair_exists(self, cycle_id: int, imp_pairs: dict, giver: str, receiver: str, imps: dict) -> bool: + if giver == receiver: + return True + if giver in imp_pairs: + return True + if receiver in imp_pairs.values(): + return True + + giver_email = imps[giver]["email"] + receiver_email = imps[receiver]["email"] + + if self.db.pair_used_in_cycle(cycle_id, giver_email, receiver_email): + return True + + return False + + def generate_imp_pairs(self, round_id: int, imps: dict, cycle_id: int): + names = list(imps.keys()) + n = len(names) + + emails = {name: imps[name]["email"] for name in names} + + def is_allowed(giver: str, receiver: str, current: dict) -> bool: + if giver == receiver: + return False + if giver in current: + return False + if receiver in current.values(): + return False + return not self.db.pair_used_in_cycle(cycle_id, emails[giver], emails[receiver]) + + givers = names[:] + + def backtrack(i: int, current: dict) -> dict | None: + if i == n: + return current + + giver = givers[i] + + candidates = names[:] + random.shuffle(candidates) + + for receiver in candidates: + if not is_allowed(giver, receiver, current): + continue + current[giver] = receiver + result = backtrack(i + 1, current) + if result is not None: + return result + del current[giver] + + return None + + pairs = backtrack(0, {}) + if pairs is None: + raise RuntimeError("Keine gültige Zuordnung im aktuellen Cycle möglich. Neuer Cycle nötig.") + + for giver, receiver in pairs.items(): + giver_id = self.db.get_imp_id_from_name(round_id, giver) + receiver_id = self.db.get_imp_id_from_name(round_id, receiver) + self.db.add_new_pair(round_id, giver_id, receiver_id) + + def create_new_round(self,round_name: str, participants: dict[str, dict]) -> tuple[int, str]: + + try: + key = self.constellation_key(participants) + + latest_cycle = self.db.get_latest_cycle_id(key) + + max_permutations = self.derangements(len(participants)) + + if latest_cycle is None: + cycle_id = self.db.add_new_cycle(key) + else: + used_permutations = self.db.count_rounds_in_cycle(latest_cycle) + if used_permutations < max_permutations: + cycle_id = latest_cycle + else: + cycle_id = self.db.add_new_cycle(key) + + round_id, created_at = self.db.add_new_round(round_name, key, cycle_id) + + self.db.add_participants(round_id, participants) + try: + self.generate_imp_pairs(round_id, participants, cycle_id) + except RuntimeError: + cycle_id = self.db.add_new_cycle(key) + round_id, created_at = self.db.add_new_round(round_name, key, cycle_id) + self.db.add_participants(round_id, participants) + self.generate_imp_pairs(round_id, participants, cycle_id) + + self.db.commit_to_db() + return round_id, created_at + + except Exception as e: + self.db.connection.rollback() + raise RuntimeError(f"Fehler beim Erstellen der Runde: {e}") diff --git a/secret_santa_db.db b/secret_santa_db.db new file mode 100644 index 0000000..aad085a Binary files /dev/null and b/secret_santa_db.db differ