## gemeinsame Verarbeitung import numpy as np import pandas as pd from pathlib import Path from sklearn.preprocessing import MinMaxScaler from scipy.signal import welch from pygazeanalyser.detectors import fixation_detection, saccade_detection ############################################################################## # KONFIGURATION ############################################################################## INPUT_DIR = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/both_mod_parquet_files") OUTPUT_FILE = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/new_datasets/fix_blink_combined_dataset_25hz.parquet") WINDOW_SIZE_SAMPLES = 1250 # 50s bei 25Hz STEP_SIZE_SAMPLES = 125 # 5s bei 25Hz SAMPLING_RATE = 25 # Hz MIN_DUR_BLINKS = 2 # x * 40ms ############################################################################## # EYE-TRACKING FUNKTIONEN ############################################################################## def clean_eye_df(df): """Extrahiert nur Eye-Tracking Spalten und entfernt leere Zeilen.""" eye_cols = [c for c in df.columns if c.startswith("EYE_")] if not eye_cols: return pd.DataFrame() df_eye = df[eye_cols].copy() df_eye = df_eye.replace([np.inf, -np.inf], np.nan) df_eye = df_eye.dropna(subset=eye_cols, how="all") return df_eye.reset_index(drop=True) def extract_gaze_signal(df): """Extrahiert 2D-Gaze-Positionen, maskiert ungültige Samples und interpoliert.""" gx_L = df["EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy() gy_L = df["EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy() gx_R = df["EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy() gy_R = df["EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy() val_L = (df["EYE_LEFT_GAZE_POINT_VALIDITY"] == 1) val_R = (df["EYE_RIGHT_GAZE_POINT_VALIDITY"] == 1) # Inf → NaN for arr in [gx_L, gy_L, gx_R, gy_R]: arr.replace([np.inf, -np.inf], np.nan, inplace=True) # Ungültige maskieren gx_L[~val_L] = np.nan gy_L[~val_L] = np.nan gx_R[~val_R] = np.nan gy_R[~val_R] = np.nan # Mittelwert beider Augen gx = np.mean(np.column_stack([gx_L, gx_R]), axis=1) gy = np.mean(np.column_stack([gy_L, gy_R]), axis=1) # Interpolation gx = pd.Series(gx).interpolate(limit=50, limit_direction="both").bfill().ffill() gy = pd.Series(gy).interpolate(limit=50, limit_direction="both").bfill().ffill() # MinMax Skalierung xscaler = MinMaxScaler() gxscale = xscaler.fit_transform(gx.values.reshape(-1, 1)) yscaler = MinMaxScaler() gyscale = yscaler.fit_transform(gy.values.reshape(-1, 1)) return np.column_stack((gxscale, gyscale)) def extract_pupil(df): """Extrahiert Pupillengröße (beide Augen gemittelt).""" pl = df["EYE_LEFT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan) pr = df["EYE_RIGHT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan) vl = df.get("EYE_LEFT_PUPIL_VALIDITY") vr = df.get("EYE_RIGHT_PUPIL_VALIDITY") if vl is None or vr is None: validity = (~pl.isna() | ~pr.isna()).astype(int).to_numpy() else: validity = ((vl == 1) | (vr == 1)).astype(int).to_numpy() p = np.mean(np.column_stack([pl, pr]), axis=1) p = pd.Series(p).interpolate(limit=50, limit_direction="both").bfill().ffill() return p.to_numpy(), validity def detect_blinks(pupil_validity, min_duration=5): """Erkennt Blinks: Validity=0 → Blink.""" blinks = [] start = None for i, v in enumerate(pupil_validity): if v == 0 and start is None: start = i elif v == 1 and start is not None: if i - start >= min_duration: blinks.append([start, i]) start = None return blinks def compute_IPA(pupil, fs=25): """Index of Pupillary Activity (Duchowski 2018).""" f, Pxx = welch(pupil, fs=fs, nperseg=int(fs*2)) hf_band = (f >= 0.6) & (f <= 2.0) return np.sum(Pxx[hf_band]) def extract_eye_features_window(df_eye_window, fs=25, min_dur_blinks=2): """ Extrahiert Eye-Tracking Features für ein einzelnes Window. Gibt Dictionary mit allen Eye-Features zurück. """ # Gaze gaze = extract_gaze_signal(df_eye_window) # Pupille pupil, pupil_validity = extract_pupil(df_eye_window) window_size = len(df_eye_window) # ---------------------------- # FIXATIONS # ---------------------------- time_ms = np.arange(window_size) * 1000.0 / fs fix, efix = fixation_detection( x=gaze[:, 0], y=gaze[:, 1], time=time_ms, missing=0.0, maxdist=0.003, mindur=10 ) fixation_durations = [f[2] for f in efix if np.isfinite(f[2]) and f[2] > 0] # Kategorien F_short = sum(66 <= d <= 150 for d in fixation_durations) F_medium = sum(300 <= d <= 500 for d in fixation_durations) F_long = sum(d >= 1000 for d in fixation_durations) F_hundred = sum(d > 100 for d in fixation_durations) # ---------------------------- # SACCADES # ---------------------------- sac, esac = saccade_detection( x=gaze[:, 0], y=gaze[:, 1], time=time_ms, missing=0, minlen=12, maxvel=0.2, maxacc=1 ) sac_durations = [s[2] for s in esac] sac_amplitudes = [((s[5]-s[3])**2 + (s[6]-s[4])**2)**0.5 for s in esac] # ---------------------------- # BLINKS # ---------------------------- blinks = detect_blinks(pupil_validity, min_duration=min_dur_blinks) blink_durations = [(b[1] - b[0]) / fs for b in blinks] # ---------------------------- # PUPIL # ---------------------------- if np.all(np.isnan(pupil)): mean_pupil = np.nan ipa = np.nan else: mean_pupil = np.nanmean(pupil) ipa = compute_IPA(pupil, fs=fs) # Feature Dictionary return { "Fix_count_short_66_150": F_short, "Fix_count_medium_300_500": F_medium, "Fix_count_long_gt_1000": F_long, "Fix_count_100": F_hundred, "Fix_mean_duration": np.mean(fixation_durations) if fixation_durations else 0, "Fix_median_duration": np.median(fixation_durations) if fixation_durations else 0, "Sac_count": len(sac), "Sac_mean_amp": np.mean(sac_amplitudes) if sac_amplitudes else 0, "Sac_mean_dur": np.mean(sac_durations) if sac_durations else 0, "Sac_median_dur": np.median(sac_durations) if sac_durations else 0, "Blink_count": len(blinks), "Blink_mean_dur": np.mean(blink_durations) if blink_durations else 0, "Blink_median_dur": np.median(blink_durations) if blink_durations else 0, "Pupil_mean": mean_pupil, "Pupil_IPA": ipa } ############################################################################## # KOMBINIERTE FEATURE-EXTRAKTION ############################################################################## def process_combined_features(input_dir, output_file, window_size, step_size, fs=25,min_duration_blinks=2): """ Verarbeitet Parquet-Dateien mit FACE_AU und EYE Spalten. Extrahiert beide Feature-Sets und kombiniert sie. """ input_path = Path(input_dir) parquet_files = sorted(input_path.glob("*.parquet")) if not parquet_files: print(f"FEHLER: Keine Parquet-Dateien in {input_dir} gefunden!") return None print(f"\n{'='*70}") print(f"KOMBINIERTE FEATURE-EXTRAKTION") print(f"{'='*70}") print(f"Dateien: {len(parquet_files)}") print(f"Window: {window_size} Samples ({window_size/fs:.1f}s bei {fs}Hz)") print(f"Step: {step_size} Samples ({step_size/fs:.1f}s bei {fs}Hz)") print(f"{'='*70}\n") all_windows = [] for file_idx, parquet_file in enumerate(parquet_files, 1): print(f"\n[{file_idx}/{len(parquet_files)}] {parquet_file.name}") try: df = pd.read_parquet(parquet_file) print(f" Einträge: {len(df)}") # Identifiziere Spalten au_columns = [col for col in df.columns if col.startswith('FACE_AU')] eye_columns = [col for col in df.columns if col.startswith('EYE_')] print(f" AU-Spalten: {len(au_columns)}") print(f" Eye-Spalten: {len(eye_columns)}") has_au = len(au_columns) > 0 has_eye = len(eye_columns) > 0 if not has_au and not has_eye: print(f" WARNUNG: Keine AU oder Eye Spalten gefunden!") continue # Gruppiere nach STUDY, LEVEL, PHASE group_cols = [col for col in ['STUDY', 'LEVEL', 'PHASE'] if col in df.columns] if group_cols: groups = df.groupby(group_cols, sort=False) else: groups = [(None, df)] for group_vals, group_df in groups: if group_cols: print(f" Gruppe {group_vals}: {len(group_df)} Samples") group_df = group_df.reset_index(drop=True) # Berechne Anzahl Windows num_windows = (len(group_df) - window_size) // step_size + 1 if num_windows <= 0: print(f" Zu wenige Einträge für Window (benötigt {window_size})") continue # Sliding Windows for i in range(num_windows): start_idx = i * step_size end_idx = start_idx + window_size window_df = group_df.iloc[start_idx:end_idx] # Basis-Metadaten result = { 'subjectID': window_df['subjectID'].iloc[0], 'start_time': window_df['rowID'].iloc[0], 'STUDY': window_df['STUDY'].iloc[0] if 'STUDY' in window_df.columns else np.nan, 'LEVEL': window_df['LEVEL'].iloc[0] if 'LEVEL' in window_df.columns else np.nan, 'PHASE': window_df['PHASE'].iloc[0] if 'PHASE' in window_df.columns else np.nan } # FACE AU Features if has_au: for au_col in au_columns: result[f'{au_col}_mean'] = window_df[au_col].mean() # Eye-Tracking Features if has_eye: try: eye_features = extract_eye_features_window(window_df[eye_columns], fs=fs,min_dur_blinks=min_duration_blinks) result.update(eye_features) except Exception as e: print(f" WARNUNG: Eye-Features fehlgeschlagen: {str(e)}") # Füge NaN-Werte für Eye-Features hinzu result.update({ "Fix_count_short_66_150": np.nan, "Fix_count_medium_300_500": np.nan, "Fix_count_long_gt_1000": np.nan, "Fix_count_100": np.nan, "Fix_mean_duration": np.nan, "Fix_median_duration": np.nan, "Sac_count": np.nan, "Sac_mean_amp": np.nan, "Sac_mean_dur": np.nan, "Sac_median_dur": np.nan, "Blink_count": np.nan, "Blink_mean_dur": np.nan, "Blink_median_dur": np.nan, "Pupil_mean": np.nan, "Pupil_IPA": np.nan }) all_windows.append(result) print(f" → {num_windows} Windows erstellt") except Exception as e: print(f" FEHLER: {str(e)}") import traceback traceback.print_exc() continue # Kombiniere alle Windows if not all_windows: print("\nKEINE FEATURES EXTRAHIERT!") return None print(f"\n{'='*70}") print(f"ZUSAMMENFASSUNG") print(f"{'='*70}") result_df = pd.DataFrame(all_windows) print(f"Gesamt Windows: {len(result_df)}") print(f"Spalten: {len(result_df.columns)}") print(f"Subjects: {result_df['subjectID'].nunique()}") # Speichern output_path = Path(output_file) output_path.parent.mkdir(parents=True, exist_ok=True) result_df.to_parquet(output_file, index=False) print(f"\n✓ Gespeichert: {output_file}") print(f"{'='*70}\n") return result_df ############################################################################## # MAIN ############################################################################## def main(): print("\n" + "="*70) print("KOMBINIERTE FEATURE-EXTRAKTION (AU + EYE)") print("="*70) result = process_combined_features( input_dir=INPUT_DIR, output_file=OUTPUT_FILE, window_size=WINDOW_SIZE_SAMPLES, step_size=STEP_SIZE_SAMPLES, fs=SAMPLING_RATE, min_duration_blinks=MIN_DUR_BLINKS ) if result is not None: print("\nErste 5 Zeilen:") print(result.head()) print("\nSpalten-Übersicht:") print(result.columns.tolist()) print("\nDatentypen:") print(result.dtypes) print("\nStatistik:") print(result.describe()) print("\n✓ FERTIG!\n") if __name__ == "__main__": main()