From 951550be96b8100b66b605626cc06917139d7cc5 Mon Sep 17 00:00:00 2001
From: Michael
Date: Thu, 4 Dec 2025 12:32:34 +0100
Subject: [PATCH] added eye feature dataset generator script

---
 dataset_creation/create_eye_feature_table.py |   1 -
 dataset_creation/eye_batch_processor.py      | 435 +++++++++++++++++++
 2 files changed, 435 insertions(+), 1 deletion(-)
 delete mode 100644 dataset_creation/create_eye_feature_table.py
 create mode 100644 dataset_creation/eye_batch_processor.py

diff --git a/dataset_creation/create_eye_feature_table.py b/dataset_creation/create_eye_feature_table.py
deleted file mode 100644
index 386860a..0000000
--- a/dataset_creation/create_eye_feature_table.py
+++ /dev/null
@@ -1 +0,0 @@
-import pandas as pd
\ No newline at end of file
diff --git a/dataset_creation/eye_batch_processor.py b/dataset_creation/eye_batch_processor.py
new file mode 100644
index 0000000..09b906d
--- /dev/null
+++ b/dataset_creation/eye_batch_processor.py
@@ -0,0 +1,435 @@
+import numpy as np
+import pandas as pd
+from pathlib import Path
+from scipy.signal import welch
+from pygazeanalyser.detectors import fixation_detection, saccade_detection
+
+
+##############################################################################
+# CONFIGURATION - ADJUST HERE!
+##############################################################################
+INPUT_DIR = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/parquet_Eye_features_old/")
+OUTPUT_FILE = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/Eye_dataset_old/eye_dataset_old.parquet")
+
+WINDOW_SIZE_SAMPLES = 12500  # samples per window (12500 = 50 s at 250 Hz)
+STEP_SIZE_SAMPLES = 1250     # step size (1250 = 5 s at 250 Hz)
+SAMPLING_RATE = 250          # Hz
+
+
+##############################################################################
+# 1. HELPER FUNCTIONS
+##############################################################################
+def clean_eye_df(df):
+    """
+    Removes all rows that contain no actual eye-tracking data.
+    Solves the problem that the main DataFrame contains NaN rows
+    belonging to other sensors.
+    """
+    eye_cols = [c for c in df.columns if ("LEFT_" in c or "RIGHT_" in c)]
+    df_eye = df[eye_cols]
+
+    # INF -> NaN
+    df_eye = df_eye.replace([np.inf, -np.inf], np.nan)
+
+    # Keep only rows that contain real eye-tracking data
+    df_eye = df_eye.dropna(subset=eye_cols, how="all")
+
+    print(f"  Eye-tracking rows: {len(df)} → {len(df_eye)}")
+
+    # Keep the original index so the metadata rows can be aligned later
+    # (resetting it here would silently misalign metadata downstream)
+    return df_eye
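+
+
+# Illustrative sketch (not part of the pipeline): clean_eye_df keeps only the
+# LEFT_/RIGHT_ columns, drops rows where all of them are NaN, and preserves
+# the original index. "ecg_value" below is a hypothetical stand-in for a
+# non-eye sensor column.
+#
+#   demo = pd.DataFrame({
+#       "LEFT_PUPIL_DIAMETER": [3.1, np.nan, 2.9],
+#       "ecg_value":           [0.2, 0.3, 0.4],
+#   })
+#   clean_eye_df(demo)   # -> rows 0 and 2 only, index [0, 2]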
+
+
+def extract_gaze_signal(df):
+    """
+    Extracts 2D gaze positions on the display,
+    masks invalid samples and interpolates gaps.
+    """
+    # Gaze columns
+    gx_L = df["LEFT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
+    gy_L = df["LEFT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()
+    gx_R = df["RIGHT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
+    gy_R = df["RIGHT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()
+
+    # Validity columns (1 = valid)
+    val_L = (df["LEFT_GAZE_POINT_VALIDITY"] == 1)
+    val_R = (df["RIGHT_GAZE_POINT_VALIDITY"] == 1)
+
+    # Replace inf with NaN (Tobii reports inf during blinks)
+    gx_L.replace([np.inf, -np.inf], np.nan, inplace=True)
+    gy_L.replace([np.inf, -np.inf], np.nan, inplace=True)
+    gx_R.replace([np.inf, -np.inf], np.nan, inplace=True)
+    gy_R.replace([np.inf, -np.inf], np.nan, inplace=True)
+
+    # Mask invalid values
+    gx_L[~val_L] = np.nan
+    gy_L[~val_L] = np.nan
+    gx_R[~val_R] = np.nan
+    gy_R[~val_R] = np.nan
+
+    # Per-sample mean of both eyes (nanmean falls back to the one valid eye)
+    gx = np.nanmean(np.column_stack([gx_L, gx_R]), axis=1)
+    gy = np.nanmean(np.column_stack([gy_L, gy_R]), axis=1)
+
+    # Interpolation (important for PyGaze!)
+    gx = pd.Series(gx).interpolate(limit=50, limit_direction="both").bfill().ffill()
+    gy = pd.Series(gy).interpolate(limit=50, limit_direction="both").bfill().ffill()
+
+    out = np.column_stack((gx, gy))
+    return out
+
+
+def extract_pupil(df):
+    """Extracts pupil size (mean of both eyes)."""
+    pl = df["LEFT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
+    pr = df["RIGHT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
+
+    vl = df.get("LEFT_PUPIL_VALIDITY")
+    vr = df.get("RIGHT_PUPIL_VALIDITY")
+
+    if vl is None or vr is None:
+        validity = (~pl.isna() | ~pr.isna()).astype(int).to_numpy()
+    else:
+        validity = ((vl == 1) | (vr == 1)).astype(int).to_numpy()
+
+    # Mean of the available pupils (nanmean ignores the missing eye)
+    p = np.nanmean(np.column_stack([pl, pr]), axis=1)
+
+    # Repair remaining INF/NaN
+    p = pd.Series(p).interpolate(limit=50, limit_direction="both").bfill().ffill()
+    p = p.to_numpy()
+
+    return p, validity
+
+
+def detect_blinks(pupil_validity, min_duration=5):
+    """Detects blinks: validity == 0 → blink."""
+    blinks = []
+    start = None
+
+    for i, v in enumerate(pupil_validity):
+        if v == 0 and start is None:
+            start = i
+        elif v == 1 and start is not None:
+            if i - start >= min_duration:
+                blinks.append([start, i])
+            start = None
+
+    # Close a blink that runs up to the end of the window
+    if start is not None and len(pupil_validity) - start >= min_duration:
+        blinks.append([start, len(pupil_validity)])
+
+    return blinks
+
+
+def compute_IPA(pupil, fs=250):
+    """
+    IPA = Index of Pupillary Activity (after Duchowski et al. 2018):
+    high-frequency content of the pupil time series. Note that the
+    original IPA is wavelet-based; this is a simplified spectral
+    approximation using Welch's method.
+    """
+    f, Pxx = welch(pupil, fs=fs, nperseg=int(fs * 2))  # 2-second segments
+
+    hf_band = (f >= 0.6) & (f <= 2.0)
+    ipa = np.sum(Pxx[hf_band])
+
+    return ipa
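+
+# Worked example (assumed values): with fs = 250 and nperseg = 500, Welch's
+# frequency resolution is fs / nperseg = 0.5 Hz, so the 0.6-2.0 Hz band
+# contains exactly the bins at 1.0, 1.5 and 2.0 Hz:
+#
+#   rng = np.random.default_rng(0)
+#   pupil = 3.0 + 0.1 * rng.standard_normal(12500)  # synthetic 50 s trace
+#   compute_IPA(pupil, fs=250)                      # sums power over 3 bins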
+
+
+##############################################################################
+# 2. FEATURE EXTRACTION WITH SLIDING WINDOW
+##############################################################################
+
+def extract_eye_features_sliding(df_eye, df_meta, window_size, step_size, fs=250):
+    """
+    Extracts features with a sliding window from a single level/phase.
+
+    Parameters
+    ----------
+    df_eye : DataFrame
+        Eye-tracking data (already cleaned)
+    df_meta : DataFrame
+        Metadata (subjectID, rowID, STUDY, LEVEL, PHASE)
+    window_size : int
+        Number of samples per window
+    step_size : int
+        Step size in samples
+    fs : int
+        Sampling rate in Hz
+    """
+    # Gaze
+    gaze = extract_gaze_signal(df_eye)
+
+    # Pupil
+    pupil, pupil_validity = extract_pupil(df_eye)
+
+    features = []
+    num_windows = (len(df_eye) - window_size) // step_size + 1
+
+    if num_windows <= 0:
+        return pd.DataFrame()
+
+    for i in range(num_windows):
+        start_idx = i * step_size
+        end_idx = start_idx + window_size
+
+        w_gaze = gaze[start_idx:end_idx]
+        w_pupil = pupil[start_idx:end_idx]
+        w_valid = pupil_validity[start_idx:end_idx]
+
+        # Metadata for this window
+        meta_row = df_meta.iloc[start_idx]
+
+        # ----------------------------
+        # FIXATIONS (PyGaze)
+        # ----------------------------
+        time_ms = np.arange(window_size) * 1000.0 / fs
+
+        fix, efix = fixation_detection(
+            x=w_gaze[:, 0], y=w_gaze[:, 1], time=time_ms,
+            missing=0.0, maxdist=0.003, mindur=10
+        )
+
+        fixation_durations = []
+        for f in efix:
+            # efix entries are [starttime, endtime, duration, endx, endy]
+            if np.isfinite(f[2]) and f[2] > 0:
+                fixation_durations.append(f[2])
+
+        # Duration categories (in ms) as defined in the paper
+        F_short = sum(66 <= d <= 150 for d in fixation_durations)
+        F_medium = sum(300 <= d <= 500 for d in fixation_durations)
+        F_long = sum(d >= 1000 for d in fixation_durations)
+        F_hundred = sum(d > 100 for d in fixation_durations)
+        # F_Cancel = sum(66 < d for d in fixation_durations)
+
+        # ----------------------------
+        # SACCADES
+        # ----------------------------
+        sac, esac = saccade_detection(
+            x=w_gaze[:, 0], y=w_gaze[:, 1], time=time_ms,
+            missing=0, minlen=12, maxvel=0.2, maxacc=1
+        )
+
+        # esac entries are [starttime, endtime, duration, startx, starty, endx, endy]
+        sac_durations = [s[2] for s in esac]
+        sac_amplitudes = [((s[5]-s[3])**2 + (s[6]-s[4])**2)**0.5 for s in esac]
+
+        # ----------------------------
+        # BLINKS
+        # ----------------------------
+        blinks = detect_blinks(w_valid)
+        blink_durations = [(b[1] - b[0]) / fs for b in blinks]  # in seconds
+
+        # ----------------------------
+        # PUPIL
+        # ----------------------------
+        if np.all(np.isnan(w_pupil)):
+            mean_pupil = np.nan
+            ipa = np.nan
+        else:
+            mean_pupil = np.nanmean(w_pupil)
+            ipa = compute_IPA(w_pupil, fs=fs)
+
+        # ----------------------------
+        # FEATURE DICTIONARY
+        # ----------------------------
+        features.append({
+            # Metadata
+            'subjectID': meta_row['subjectID'],
+            'start_time': meta_row['rowID'],
+            'STUDY': meta_row.get('STUDY', np.nan),
+            'LEVEL': meta_row.get('LEVEL', np.nan),
+            'PHASE': meta_row.get('PHASE', np.nan),
+
+            # Fixation features
+            "Fix_count_short_66_150": F_short,
+            "Fix_count_medium_300_500": F_medium,
+            "Fix_count_long_gt_1000": F_long,
+            "Fix_count_100": F_hundred,
+            # "Fix_cancel": F_Cancel,
+            "Fix_mean_duration": np.mean(fixation_durations) if fixation_durations else 0,
+            "Fix_median_duration": np.median(fixation_durations) if fixation_durations else 0,
+
+            # Saccade features
+            "Sac_count": len(esac),  # completed saccades, consistent with durations
+            "Sac_mean_amp": np.mean(sac_amplitudes) if sac_amplitudes else 0,
+            "Sac_mean_dur": np.mean(sac_durations) if sac_durations else 0,
+            "Sac_median_dur": np.median(sac_durations) if sac_durations else 0,
+
+            # Blink features
+            "Blink_count": len(blinks),
+            "Blink_mean_dur": np.mean(blink_durations) if blink_durations else 0,
+            "Blink_median_dur": np.median(blink_durations) if blink_durations else 0,
+
+            # Pupil features
+            "Pupil_mean": mean_pupil,
+            "Pupil_IPA": ipa
+        })
+
+    return pd.DataFrame(features)
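+
+# Worked example (assumed numbers): a 60 s recording at 250 Hz has 15000
+# samples; with window_size = 12500 and step_size = 1250 this yields
+# (15000 - 12500) // 1250 + 1 = 3 windows, starting at samples 0, 1250, 2500.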
+
+
+##############################################################################
+# 3. BATCH PROCESSING
+##############################################################################
+
+def process_parquet_directory(input_dir, output_file, window_size, step_size, fs=250):
+    """
+    Processes all parquet files in a directory.
+
+    Parameters
+    ----------
+    input_dir : str or Path
+        Path to the directory containing the parquet files
+    output_file : str or Path
+        Path for the output parquet file
+    window_size : int
+        Window size in samples
+    step_size : int
+        Step size in samples
+    fs : int
+        Sampling rate in Hz
+    """
+    input_path = Path(input_dir)
+    parquet_files = sorted(input_path.glob("*.parquet"))
+
+    if not parquet_files:
+        print(f"ERROR: No parquet files found in {input_dir}!")
+        return
+
+    print(f"\n{'='*70}")
+    print(f"STARTING BATCH PROCESSING")
+    print(f"{'='*70}")
+    print(f"Files found: {len(parquet_files)}")
+    print(f"Window size: {window_size} samples ({window_size/fs:.1f}s at {fs}Hz)")
+    print(f"Step size: {step_size} samples ({step_size/fs:.1f}s at {fs}Hz)")
+    print(f"{'='*70}\n")
+
+    all_features = []
+
+    for file_idx, parquet_file in enumerate(parquet_files, 1):
+        print(f"\n[{file_idx}/{len(parquet_files)}] Processing: {parquet_file.name}")
+
+        try:
+            # Load parquet file
+            df = pd.read_parquet(parquet_file)
+            print(f"  Rows loaded: {len(df)}")
+
+            # Check that the required columns are present
+            required_cols = ['subjectID', 'rowID']
+            missing_cols = [col for col in required_cols if col not in df.columns]
+            if missing_cols:
+                print(f"  WARNING: Missing columns: {missing_cols} - skipping file")
+                continue
+
+            # Clean the eye-tracking data
+            df_eye = clean_eye_df(df)
+
+            if len(df_eye) == 0:
+                print(f"  WARNING: No valid eye-tracking data - skipping file")
+                continue
+
+            # Extract metadata, aligned with df_eye via its preserved index
+            # (df_eye keeps the original row labels, so .loc selects exactly
+            # the metadata rows that survived cleaning)
+            meta_cols = ['subjectID', 'rowID']
+            if 'STUDY' in df.columns:
+                meta_cols.append('STUDY')
+            if 'LEVEL' in df.columns:
+                meta_cols.append('LEVEL')
+            if 'PHASE' in df.columns:
+                meta_cols.append('PHASE')
+
+            df_meta = df.loc[df_eye.index, meta_cols].reset_index(drop=True)
+            df_eye = df_eye.reset_index(drop=True)
+
+            # Group by STUDY, LEVEL, PHASE (if present)
+            group_cols = [col for col in ['STUDY', 'LEVEL', 'PHASE'] if col in df_meta.columns]
+
+            if group_cols:
+                print(f"  Grouping by: {', '.join(group_cols)}")
+                for group_vals, group_df in df_meta.groupby(group_cols, sort=False):
+                    group_eye = df_eye.iloc[group_df.index].reset_index(drop=True)
+                    group_meta = group_df.reset_index(drop=True)
+
+                    print(f"    Group {group_vals}: {len(group_eye)} samples", end=" → ")
+
+                    features_df = extract_eye_features_sliding(
+                        group_eye, group_meta, window_size, step_size, fs
+                    )
+
+                    if not features_df.empty:
+                        all_features.append(features_df)
+                        print(f"{len(features_df)} windows")
+                    else:
+                        print("too little data")
+            else:
+                # No grouping
+                print(f"  No grouping columns found")
+                features_df = extract_eye_features_sliding(
+                    df_eye, df_meta, window_size, step_size, fs
+                )
+
+                if not features_df.empty:
+                    all_features.append(features_df)
+                    print(f"  → {len(features_df)} windows created")
+                else:
+                    print(f"  → too little data")
+
+        except Exception as e:
+            print(f"  ERROR during processing: {str(e)}")
+            import traceback
+            traceback.print_exc()
+            continue
+
+    # Combine all features
+    if not all_features:
+        print("\nNO FEATURES EXTRACTED!")
+        return None
+
+    print(f"\n{'='*70}")
+    print(f"SUMMARY")
+    print(f"{'='*70}")
+
+    final_df = pd.concat(all_features, ignore_index=True)
+
+    print(f"Total windows: {len(final_df)}")
+    print(f"Columns: {len(final_df.columns)}")
+    print(f"Subjects: {final_df['subjectID'].nunique()}")
+
+    # Save result
+    output_path = Path(output_file)
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    final_df.to_parquet(output_file, index=False)
+
+    print(f"\n✓ Result saved: {output_file}")
+    print(f"{'='*70}\n")
+
+    return final_df
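+
+# A minimal usage sketch (assuming the output file above exists): read the
+# generated dataset back for a quick per-subject sanity check.
+#
+#   features = pd.read_parquet(OUTPUT_FILE)
+#   print(features.groupby("subjectID")["Pupil_mean"].describe())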
+
+
+##############################################################################
+# 4. MAIN
+##############################################################################
+
+def main():
+    print("\n" + "="*70)
+    print("EYE-TRACKING FEATURE EXTRACTION - BATCH MODE")
+    print("="*70)
+
+    result = process_parquet_directory(
+        input_dir=INPUT_DIR,
+        output_file=OUTPUT_FILE,
+        window_size=WINDOW_SIZE_SAMPLES,
+        step_size=STEP_SIZE_SAMPLES,
+        fs=SAMPLING_RATE
+    )
+
+    if result is not None:
+        print("\nFirst 5 rows of the result:")
+        print(result.head())
+
+        print("\nColumn overview:")
+        print(result.columns.tolist())
+
+        print("\nData types:")
+        print(result.dtypes)
+
+    print("\n✓ DONE!\n")
+
+
+if __name__ == "__main__":
+    main()