From 0e25ba4a3e7e117589ed80272d36056b4afa4984 Mon Sep 17 00:00:00 2001 From: TimoKurz Date: Sun, 15 Feb 2026 16:08:23 +0100 Subject: [PATCH] - added database functionality - added writing of AUs into database in camera_stream - added script to measure ActionUnits and Eyetracking data in one file --- .../camera_handling/camera_stream.py | 36 ++- .../camera_stream_AU_and_ET.py | 296 ++++++++++++++++++ dataset_creation/camera_handling/db_helper.py | 166 ++++++++++ dataset_creation/camera_handling/db_test.py | 54 ++++ 4 files changed, 547 insertions(+), 5 deletions(-) create mode 100644 dataset_creation/camera_handling/camera_stream_AU_and_ET.py create mode 100644 dataset_creation/camera_handling/db_helper.py create mode 100644 dataset_creation/camera_handling/db_test.py diff --git a/dataset_creation/camera_handling/camera_stream.py b/dataset_creation/camera_handling/camera_stream.py index d2416a0..3126258 100644 --- a/dataset_creation/camera_handling/camera_stream.py +++ b/dataset_creation/camera_handling/camera_stream.py @@ -5,8 +5,15 @@ import threading from datetime import datetime from feat import Detector import torch +import pandas as pd + +# Import your helper functions +# from db_helper import connect_db, disconnect_db, insert_rows_into_table, create_table +import db_helper as db + # Konfiguration +DB_PATH = "action_units.db" # TODO CAMERA_INDEX = 0 OUTPUT_DIR = "recordings" VIDEO_DURATION = 50 # Sekunden @@ -39,9 +46,9 @@ def extract_aus(path, skip_frames): return res except Exception as e: print(f"Fehler bei der Summenbildung: {e}") - return 0 + return None -def startAU_creation(video_path): +def startAU_creation(video_path, db_path): """Diese Funktion läuft nun in einem eigenen Thread.""" try: print(f"\n[THREAD START] Analyse läuft für: {video_path}") @@ -51,12 +58,31 @@ def startAU_creation(video_path): print(f"\n--- Ergebnis für {os.path.basename(video_path)} ---") print(output) print("--------------------------------------------------\n") + if output is not None: + # Verbindung für diesen Thread öffnen (SQLite Sicherheit) + conn, cursor = db.connect_db(db_path) + + # Daten vorbereiten: Timestamp + AU Ergebnisse + # Wir wandeln die Series/Dataframe in ein Dictionary um + data_to_insert = output.to_dict() + data_to_insert['timestamp'] = [datetime.now().strftime("%Y-%m-%d %H:%M:%S")] + + # Da die AU-Spaltennamen dynamisch sind, stellen wir sicher, dass sie Listen sind + # (insert_rows_into_table erwartet Listen für jeden Key) + final_payload = {k: [v] if not isinstance(v, list) else v for k, v in data_to_insert.items()} + + + db.insert_rows_into_table(conn, cursor, "actionUnits", final_payload) + + db.disconnect_db(conn, cursor) + print(f"--- Ergebnis für {os.path.basename(video_path)} in DB gespeichert ---") except Exception as e: print(f"Fehler bei der Analyse von {video_path}: {e}") class VideoRecorder: - def __init__(self, filename, width, height): + def __init__(self, filename, width, height, db_path): self.filename = filename + self.db_path = db_path fourcc = cv2.VideoWriter_fourcc(*'XVID') self.out = cv2.VideoWriter(filename, fourcc, FPS, (width, height)) self.frames_to_record = int(VIDEO_DURATION * FPS) @@ -79,7 +105,7 @@ class VideoRecorder: # --- MULTITHREADING HIER --- # Wir starten die Analyse in einem neuen Thread, damit main() sofort weiter frames lesen kann - analysis_thread = threading.Thread(target=startAU_creation, args=(abs_path,)) + analysis_thread = threading.Thread(target=startAU_creation, args=(abs_path, self.db_path)) analysis_thread.daemon = True # Beendet sich, wenn das Hauptprogramm schließt analysis_thread.start() @@ -108,7 +134,7 @@ def main(): if current_time - last_start_time >= START_INTERVAL: timestamp = datetime.now().strftime("%H%M%S") filename = os.path.join(OUTPUT_DIR, f"rec_{timestamp}.avi") - new_recorder = VideoRecorder(filename, width, height) + new_recorder = VideoRecorder(filename, width, height, DB_PATH) active_recorders.append(new_recorder) last_start_time = current_time diff --git a/dataset_creation/camera_handling/camera_stream_AU_and_ET.py b/dataset_creation/camera_handling/camera_stream_AU_and_ET.py new file mode 100644 index 0000000..0d0580f --- /dev/null +++ b/dataset_creation/camera_handling/camera_stream_AU_and_ET.py @@ -0,0 +1,296 @@ +import cv2 +import time +import os +import threading +from datetime import datetime +from feat import Detector +import torch +import mediapipe as mp +import csv + +# Konfiguration +CAMERA_INDEX = 0 +OUTPUT_DIR = "recordings" +VIDEO_DURATION = 10 # Sekunden +START_INTERVAL = 5 # Sekunden bis zum nächsten Start +FPS = 25.0 # Feste FPS + +if not os.path.exists(OUTPUT_DIR): + os.makedirs(OUTPUT_DIR) + +# Globaler Detector, um ihn nicht bei jedem Video neu laden zu müssen (spart massiv Zeit/Speicher) +print("Initialisiere AU-Detector (bitte warten)...") +detector = Detector(au_model="xgb") + +# ===== MediaPipe FaceMesh Setup ===== +mp_face_mesh = mp.solutions.face_mesh +face_mesh = mp_face_mesh.FaceMesh( + static_image_mode=False, + max_num_faces=1, + refine_landmarks=True, # wichtig für Iris + min_detection_confidence=0.5, + min_tracking_confidence=0.5 +) + +LEFT_IRIS = [474, 475, 476, 477] +RIGHT_IRIS = [469, 470, 471, 472] + +LEFT_EYE_LIDS = (159, 145) +RIGHT_EYE_LIDS = (386, 374) + +LEFT_EYE_GAZE_IDXS = (33, 133, 159, 145) +RIGHT_EYE_GAZE_IDXS = (263, 362, 386, 374) + +EYE_OPEN_THRESHOLD = 6 + +# CSV vorbereiten +gaze_csv = open("gaze_data.csv", mode="w", newline="") +gaze_writer = csv.writer(gaze_csv) +gaze_writer.writerow([ + "timestamp", + "left_gaze_x", + "left_gaze_y", + "right_gaze_x", + "right_gaze_y", + "left_valid", + "right_valid", + "left_diameter", + "right_diameter" +]) + +def eye_openness(landmarks, top_idx, bottom_idx, img_height): + top = landmarks[top_idx] + bottom = landmarks[bottom_idx] + return abs(top.y - bottom.y) * img_height + + +def compute_gaze(landmarks, iris_center, indices, w, h): + idx1, idx2, top_idx, bottom_idx = indices + + p1 = landmarks[idx1] + p2 = landmarks[idx2] + top = landmarks[top_idx] + bottom = landmarks[bottom_idx] + + x1 = p1.x * w + x2 = p2.x * w + y_top = top.y * h + y_bottom = bottom.y * h + + iris_x, iris_y = iris_center + + eye_left = min(x1, x2) + eye_right = max(x1, x2) + + eye_width = eye_right - eye_left + eye_height = abs(y_bottom - y_top) + + if eye_width == 0 or eye_height == 0: + return 0.5, 0.5 + + gaze_x = (iris_x - eye_left) / eye_width + gaze_y = (iris_y - min(y_top, y_bottom)) / eye_height + + gaze_x = max(0, min(1, gaze_x)) + gaze_y = max(0, min(1, gaze_y)) + + return gaze_x, gaze_y + +def extract_aus(path, skip_frames): + + # torch.no_grad() deaktiviert die Gradientenberechnung. + # Das löst den "Can't call numpy() on Tensor that requires grad" Fehler. + with torch.no_grad(): + video_prediction = detector.detect_video( + path, + skip_frames=skip_frames, + face_detection_threshold=0.95 + ) + + # Falls video_prediction oder .aus noch Tensoren sind, + # stellen wir sicher, dass sie korrekt summiert werden. + try: + # Wir nehmen die Summe der Action Units über alle detektierten Frames + res = video_prediction.aus.sum() + return res + except Exception as e: + print(f"Fehler bei der Summenbildung: {e}") + return 0 + +def startAU_creation(video_path): + """Diese Funktion läuft nun in einem eigenen Thread.""" + try: + print(f"\n[THREAD START] Analyse läuft für: {video_path}") + # skip_frames berechnen (z.B. alle 5 Sekunden bei 25 FPS = 125) + output = extract_aus(video_path, skip_frames=int(FPS*5)) + + print(f"\n--- Ergebnis für {os.path.basename(video_path)} ---") + print(output) + print("--------------------------------------------------\n") + except Exception as e: + print(f"Fehler bei der Analyse von {video_path}: {e}") + +class VideoRecorder: + def __init__(self, filename, width, height): + self.filename = filename + fourcc = cv2.VideoWriter_fourcc(*'XVID') + self.out = cv2.VideoWriter(filename, fourcc, FPS, (width, height)) + self.frames_to_record = int(VIDEO_DURATION * FPS) + self.frames_count = 0 + self.is_finished = False + + def write_frame(self, frame): + if self.frames_count < self.frames_to_record: + self.out.write(frame) + self.frames_count += 1 + else: + self.finish() + + def finish(self): + if not self.is_finished: + self.out.release() + self.is_finished = True + abs_path = os.path.abspath(self.filename) + print(f"Video fertig gespeichert: {self.filename}") + + # --- MULTITHREADING HIER --- + # Wir starten die Analyse in einem neuen Thread, damit main() sofort weiter frames lesen kann + analysis_thread = threading.Thread(target=startAU_creation, args=(abs_path,)) + analysis_thread.daemon = True # Beendet sich, wenn das Hauptprogramm schließt + analysis_thread.start() + +def main(): + cap = cv2.VideoCapture(CAMERA_INDEX) + if not cap.isOpened(): + print("Fehler: Kamera konnte nicht geöffnet werden.") + return + + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + + active_recorders = [] + last_start_time = 0 + + print("Aufnahme läuft. Drücke 'q' zum Beenden.") + + try: + while True: + ret, frame = cap.read() + if not ret: + break + + rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + h, w, _ = frame.shape + results = face_mesh.process(rgb) + + left_valid = 0 + right_valid = 0 + left_diameter = None + right_diameter = None + + left_gaze_x = None + left_gaze_y = None + right_gaze_x = None + right_gaze_y = None + + if results.multi_face_landmarks: + face_landmarks = results.multi_face_landmarks[0] + + left_open = eye_openness( + face_landmarks.landmark, + LEFT_EYE_LIDS[0], + LEFT_EYE_LIDS[1], + h + ) + + right_open = eye_openness( + face_landmarks.landmark, + RIGHT_EYE_LIDS[0], + RIGHT_EYE_LIDS[1], + h + ) + + left_valid = 1 if left_open > EYE_OPEN_THRESHOLD else 0 + right_valid = 1 if right_open > EYE_OPEN_THRESHOLD else 0 + + for eye_name, eye_indices in [("left", LEFT_IRIS), ("right", RIGHT_IRIS)]: + iris_points = [] + + for idx in eye_indices: + lm = face_landmarks.landmark[idx] + x_i, y_i = int(lm.x * w), int(lm.y * h) + iris_points.append((x_i, y_i)) + + if len(iris_points) == 4: + cx = int(sum(p[0] for p in iris_points) / 4) + cy = int(sum(p[1] for p in iris_points) / 4) + + radius = max( + ((x - cx) ** 2 + (y - cy) ** 2) ** 0.5 + for (x, y) in iris_points + ) + + diameter = 2 * radius + + cv2.circle(frame, (cx, cy), int(radius), (0, 255, 0), 2) + + if eye_name == "left" and left_valid: + left_diameter = diameter + left_gaze_x, left_gaze_y = compute_gaze( + face_landmarks.landmark, + (cx, cy), + LEFT_EYE_GAZE_IDXS, + w, h + ) + + elif eye_name == "right" and right_valid: + right_diameter = diameter + right_gaze_x, right_gaze_y = compute_gaze( + face_landmarks.landmark, + (cx, cy), + RIGHT_EYE_GAZE_IDXS, + w, h + ) + + # CSV schreiben + gaze_writer.writerow([ + time.time(), + left_gaze_x, + left_gaze_y, + right_gaze_x, + right_gaze_y, + left_valid, + right_valid, + left_diameter, + right_diameter + ]) + + current_time = time.time() + + if current_time - last_start_time >= START_INTERVAL: + timestamp = datetime.now().strftime("%H%M%S") + filename = os.path.join(OUTPUT_DIR, f"rec_{timestamp}.avi") + new_recorder = VideoRecorder(filename, width, height) + active_recorders.append(new_recorder) + last_start_time = current_time + + for rec in active_recorders[:]: + rec.write_frame(frame) + if rec.is_finished: + active_recorders.remove(rec) + + cv2.imshow('Kamera Livestream', frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + time.sleep(1/FPS) + + finally: + gaze_csv.close() + face_mesh.close() + cap.release() + cv2.destroyAllWindows() + print("Programm beendet. Warte ggf. auf laufende Analysen...") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/dataset_creation/camera_handling/db_helper.py b/dataset_creation/camera_handling/db_helper.py new file mode 100644 index 0000000..1c6ac48 --- /dev/null +++ b/dataset_creation/camera_handling/db_helper.py @@ -0,0 +1,166 @@ +import os +import sqlite3 +import pandas as pd + + +def connect_db(path_to_file: os.PathLike) -> tuple[sqlite3.Connection, sqlite3.Cursor]: + ''' Establishes a connection with a sqlite3 database. ''' + conn = sqlite3.connect(path_to_file) + cursor = conn.cursor() + return conn, cursor + +def disconnect_db(conn: sqlite3.Connection, cursor: sqlite3.Cursor, commit: bool = True) -> None: + ''' Commits all remaining changes and closes the connection with an sqlite3 database. ''' + cursor.close() + if commit: conn.commit() # commit all pending changes made to the sqlite3 database before closing + conn.close() + +def create_table( + conn: sqlite3.Connection, + cursor: sqlite3.Cursor, + table_name: str, + columns: dict, + constraints: dict, + primary_key: dict, + commit: bool = True + ) -> str: + ''' + Creates a new empty table with the given columns, constraints and primary key. + + :param columns: dict with column names (=keys) and dtypes (=values) (e.g. BIGINT, INT, ...) + :param constraints: dict with column names (=keys) and list of constraints (=values) (like [\'NOT NULL\'(,...)]) + :param primary_key: dict with primary key name (=key) and list of attributes which combined define the table's primary key (=values, like [\'att1\'(,...)]) + ''' + assert len(primary_key.keys()) == 1 + sql = f'CREATE TABLE {table_name} (\n ' + for column,dtype in columns.items(): + sql += f'{column} {dtype}{" "+" ".join(constraints[column]) if column in constraints.keys() else ""},\n ' + if list(primary_key.keys())[0]: sql += f'CONSTRAINT {list(primary_key.keys())[0]} ' + sql += f'PRIMARY KEY ({", ".join(list(primary_key.values())[0])})\n)' + cursor.execute(sql) + if commit: conn.commit() + return sql + +def add_columns_to_table( + conn: sqlite3.Connection, + cursor: sqlite3.Cursor, + table_name: str, + columns: dict, + constraints: dict = dict(), + commit: bool = True + ) -> str: + ''' Adds one/multiple columns (each with a list of constraints) to the given table. ''' + sql_total = '' + for column,dtype in columns.items(): # sqlite can only add one column per query + sql = f'ALTER TABLE {table_name}\n ' + sql += f'ADD "{column}" {dtype}{" "+" ".join(constraints[column]) if column in constraints.keys() else ""}' + sql_total += sql + '\n' + cursor.execute(sql) + if commit: conn.commit() + return sql_total + + + + +def insert_rows_into_table( + conn: sqlite3.Connection, + cursor: sqlite3.Cursor, + table_name: str, + columns: dict, + commit: bool = True + ) -> str: + ''' + Inserts values as multiple rows into the given table. + + :param columns: dict with column names (=keys) and values to insert as lists with at least one element (=values) + + Note: The number of given values per attribute must match the number of rows to insert! + Note: The values for the rows must be of normal python types (e.g. list, str, int, ...) instead of e.g. numpy arrays! + ''' + assert len(set(map(len, columns.values()))) == 1, 'ERROR: Provide equal number of values for each column!' + assert len(set(list(map(type,columns.values())))) == 1 and isinstance(list(columns.values())[0], list), 'ERROR: Provide values as Python lists!' + assert set([type(a) for b in list(columns.values()) for a in b]).issubset({str,int,float,bool}), 'ERROR: Provide values as basic Python data types!' + + values = list(zip(*columns.values())) + sql = f'INSERT INTO {table_name} ({", ".join(columns.keys())})\n VALUES ({("?,"*len(values[0]))[:-1]})' + cursor.executemany(sql, values) + if commit: conn.commit() + return sql + +def update_multiple_rows_in_table( + conn: sqlite3.Connection, + cursor: sqlite3.Cursor, + table_name: str, + new_vals: dict, + conditions: str, + commit: bool = True + ) -> str: + ''' + Updates attribute values of some rows in the given table. + + :param new_vals: dict with column names (=keys) and the new values to set (=values) + :param conditions: string which defines all concatenated conditions (e.g. \'cond1 AND (cond2 OR cond3)\' with cond1: att1=5, ...) + ''' + assignments = ', '.join([f'{k}={v}' for k,v in zip(new_vals.keys(), new_vals.values())]) + sql = f'UPDATE {table_name}\n SET {assignments}\n WHERE {conditions}' + cursor.execute(sql) + if commit: conn.commit() + return sql + +def delete_rows_from_table( + conn: sqlite3.Connection, + cursor: sqlite3.Cursor, + table_name: str, + conditions: str, + commit: bool = True + ) -> str: + ''' + Deletes rows from the given table. + + :param conditions: string which defines all concatenated conditions (e.g. \'cond1 AND (cond2 OR cond3)\' with cond1: att1=5, ...) + ''' + sql = f'DELETE FROM {table_name} WHERE {conditions}' + cursor.execute(sql) + if commit: conn.commit() + return sql + + + +def get_data_from_table( + conn: sqlite3.Connection, + table_name: str, + columns_list: list = ['*'], + aggregations: [None,dict] = None, + where_conditions: [None,str] = None, + order_by: [None, dict] = None, + limit: [None, int] = None, + offset: [None, int] = None + ) -> pd.DataFrame: + ''' + Helper function which returns (if desired: aggregated) contents from the given table as a pandas DataFrame. The rows can be filtered by providing the condition as a string. + + :param columns_list: use if no aggregation is needed to select which columns to get from the table + :param (optional) aggregations: use to apply aggregations on the data from the table; dictionary with column(s) as key(s) and aggregation(s) as corresponding value(s) (e.g. {'col1': 'MIN', 'col2': 'AVG', ...} or {'*': 'COUNT'}) + :param (optional) where_conditions: string which defines all concatenated conditions (e.g. \'cond1 AND (cond2 OR cond3)\' with cond1: att1=5, ...) applied on table. + :param (optional) order_by: dict defining the ordering of the outputs with column(s) as key(s) and ordering as corresponding value(s) (e.g. {'col1': 'ASC'}) + :param (optional) limit: use to limit the number of returned rows + :param (optional) offset: use to skip the first n rows before displaying + + Note: If aggregations is set, the columns_list is ignored. + Note: Get all data as a DataFrame with get_data_from_table(conn, table_name). + Note: If one output is wanted (e.g. count(*) or similar), get it with get_data_from_table(...).iloc[0,0] from the DataFrame. + ''' + assert columns_list or aggregations + + if aggregations: + selection = [f'{agg}({col})' for col,agg in aggregations.items()] + else: + selection = columns_list + selection = ", ".join(selection) + where_conditions = 'WHERE ' + where_conditions if where_conditions else '' + order_by = 'ORDER BY ' + ', '.join([f'{k} {v}' for k,v in order_by.items()]) if order_by else '' + limit = f'LIMIT {limit}' if limit else '' + offset = f'OFFSET {offset}' if offset else '' + + sql = f'SELECT {selection} FROM {table_name} {where_conditions} {order_by} {limit} {offset}' + return pd.read_sql_query(sql, conn) diff --git a/dataset_creation/camera_handling/db_test.py b/dataset_creation/camera_handling/db_test.py new file mode 100644 index 0000000..be077e9 --- /dev/null +++ b/dataset_creation/camera_handling/db_test.py @@ -0,0 +1,54 @@ +import db_helper as db + +DB_PATH = "action_units.db" + +def setup_test_db(): + # 1. Verbindung herstellen (erstellt die Datei, falls nicht vorhanden) + conn, cursor = db.connect_db(DB_PATH) + + # 2. Spalten definieren + # Wir erstellen eine Spalte für den Zeitstempel und beispielhaft einige AUs. + # In SQLite können wir später mit deinem Helper weitere Spalten hinzufügen. + columns = { + "timestamp": "TEXT", + "AU01": "REAL", + "AU02": "REAL", + "AU04": "REAL", + "AU05": "REAL", + "AU06": "REAL", + "AU07": "REAL", + "AU09": "REAL", + "AU10": "REAL", + "AU11": "REAL", + "AU12": "REAL", + "AU14": "REAL", + "AU15": "REAL", + "AU17": "REAL", + "AU20": "REAL", + "AU23": "REAL", + "AU24": "REAL", + "AU25": "REAL", + "AU26": "REAL", + "AU28": "REAL", + "AU43": "REAL", + } + + # Constraints (z.B. Zeitstempel darf nicht leer sein) + constraints = { + "timestamp": ["NOT NULL"] + } + + # Primärschlüssel definieren (Kombination aus Zeitstempel und ggf. ID) + primary_key = {"pk_timestamp": ["timestamp"]} + + try: + sql = db.create_table(conn, cursor, "actionUnits", columns, constraints, primary_key) + print("Tabelle erfolgreich erstellt!") + print(f"SQL-Befehl:\n{sql}") + except Exception as e: + print(f"Hinweis: {e}") + finally: + db.disconnect_db(conn, cursor) + +if __name__ == "__main__": + setup_test_db() \ No newline at end of file