From 7708ac1b296c4dee089afd2fa1dee1165dd170a2 Mon Sep 17 00:00:00 2001
From: Michael
Date: Thu, 18 Dec 2025 15:25:18 +0100
Subject: [PATCH] combined dataset creation pipeline

CPFC_both.py: creates parquet files chunk-wise that include the STUDY, LEVEL,
PHASE, subjectID, FACE_AU and eye-tracking columns.

combined_feature_creation.py: creates a single dataset by computing the FACE_AU
and eye-tracking features over the same time windows --> a common sample rate
is required.
---
 dataset_creation/CPFC_both.py                 |  97 +++++
 dataset_creation/combined_feature_creation.py | 381 ++++++++++++++++++
 ... => create_multimodal_dataset_by_merge.py} |   0
 3 files changed, 478 insertions(+)
 create mode 100644 dataset_creation/CPFC_both.py
 create mode 100644 dataset_creation/combined_feature_creation.py
 rename dataset_creation/{create_multimodal_dataset.py => create_multimodal_dataset_by_merge.py} (100%)

diff --git a/dataset_creation/CPFC_both.py b/dataset_creation/CPFC_both.py
new file mode 100644
index 0000000..3b37b9d
--- /dev/null
+++ b/dataset_creation/CPFC_both.py
@@ -0,0 +1,97 @@
+import os
+import pandas as pd
+from pathlib import Path
+
+data_dir = Path("/home/jovyan/Fahrsimulator_MSY2526_AI/EDA")
+
+# Get all .h5 files and sort them
+matching_files = sorted(data_dir.glob("*.h5"))
+
+# Chunk size for reading (adjust based on your RAM - 50k rows is roughly 25-50 MB depending on columns)
+CHUNK_SIZE = 50_000
+
+for i, file_path in enumerate(matching_files):
+    print(f"Subject {i} started")
+    print(f"Opened {file_path}")
+
+    # Step 1: Get total number of rows and column names
+    with pd.HDFStore(file_path, mode="r") as store:
+        cols = store.select("SIGNALS", start=0, stop=1).columns
+        nrows = store.get_storer("SIGNALS").nrows
+    print(f"Total columns: {len(cols)}, Total rows: {nrows}")
+
+    # Step 2: Filter columns that start with "FACE_AU" or "EYE_"
+    # Find columns starting with each prefix
+    face_au_cols = [c for c in cols if c.startswith("FACE_AU")]
+    eye_cols = [c for c in cols if c.startswith("EYE_")]
+
+    # Check that both have at least one column
+    if face_au_cols and eye_cols:
+        print(f"FACE_AU columns found: {face_au_cols}")
+        print(f"EYE_ columns found: {eye_cols}")
+    else:
+        missing = []
+        if not face_au_cols:
+            missing.append("FACE_AU")
+        if not eye_cols:
+            missing.append("EYE_")
+        print(f"Missing columns for: {', '.join(missing)}")
+        continue
+
+    # Columns to read
+    columns_to_read = ["STUDY", "LEVEL", "PHASE"] + eye_cols + face_au_cols
+
+    # Step 3: Process file in chunks
+    chunks_to_save = []
+
+    for start_row in range(0, nrows, CHUNK_SIZE):
+        stop_row = min(start_row + CHUNK_SIZE, nrows)
+        print(f"Processing rows {start_row} to {stop_row} ({stop_row/nrows*100:.1f}%)")
+
+        # Read chunk
+        df_chunk = pd.read_hdf(
+            file_path,
+            key="SIGNALS",
+            columns=columns_to_read,
+            start=start_row,
+            stop=stop_row
+        )
+
+        # Add metadata columns
+        df_chunk["subjectID"] = i
+        df_chunk["rowID"] = range(start_row, stop_row)
+
+        # Clean data
+        df_chunk = df_chunk[df_chunk["LEVEL"] != 0]
+        df_chunk = df_chunk.dropna()
+        # problematic, because dropping rows breaks the eye-tracking sampling resolution
+
+        # Only keep non-empty chunks
+        if len(df_chunk) > 0:
+            chunks_to_save.append(df_chunk)
+
+        # Free memory
+        del df_chunk
+
+    print("loading and cleaning done")
+
+    # Step 4: Combine all chunks and save
+    if chunks_to_save:
+        df_final = pd.concat(chunks_to_save, ignore_index=True)
+        print(f"Final dataframe shape: {df_final.shape}")
+
+        # Save to parquet
+        base_dir = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/both_mod_parquet_files")
+        os.makedirs(base_dir, exist_ok=True)
+
+        out_name = base_dir / f"both_mod_{i:04d}.parquet"
+        df_final.to_parquet(out_name, index=False)
+        print(f"Saved to {out_name}")
+
+        # Free memory
+        del df_final
+        del chunks_to_save
+    else:
+        print(f"No valid data found for Subject {i}")
+
+print("All files processed!")
\ No newline at end of file
diff --git a/dataset_creation/combined_feature_creation.py b/dataset_creation/combined_feature_creation.py
new file mode 100644
index 0000000..b1fc341
--- /dev/null
+++ b/dataset_creation/combined_feature_creation.py
@@ -0,0 +1,381 @@
+## joint processing of FACE_AU and eye-tracking columns
+import numpy as np
+import pandas as pd
+from pathlib import Path
+from sklearn.preprocessing import MinMaxScaler
+from scipy.signal import welch
+from pygazeanalyser.detectors import fixation_detection, saccade_detection
+
+
+##############################################################################
+# CONFIGURATION
+##############################################################################
+INPUT_DIR = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/both_mod_parquet_files")
+OUTPUT_FILE = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/new_datasets/combined_dataset_25hz.parquet")
+
+WINDOW_SIZE_SAMPLES = 1250   # 50 s at 25 Hz
+STEP_SIZE_SAMPLES = 125      # 5 s at 25 Hz
+SAMPLING_RATE = 25           # Hz
+
+
+##############################################################################
+# EYE-TRACKING FUNCTIONS
+##############################################################################
+
+def clean_eye_df(df):
+    """Extracts only the eye-tracking columns and removes completely empty rows."""
+    eye_cols = [c for c in df.columns if c.startswith("EYE_")]
+    if not eye_cols:
+        return pd.DataFrame()
+
+    df_eye = df[eye_cols].copy()
+    df_eye = df_eye.replace([np.inf, -np.inf], np.nan)
+    df_eye = df_eye.dropna(subset=eye_cols, how="all")
+
+    return df_eye.reset_index(drop=True)
+
+
+def extract_gaze_signal(df):
+    """Extracts 2D gaze positions, masks invalid samples and interpolates."""
+    gx_L = df["EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
+    gy_L = df["EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()
+    gx_R = df["EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
+    gy_R = df["EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()
+
+    val_L = (df["EYE_LEFT_GAZE_POINT_VALIDITY"] == 1)
+    val_R = (df["EYE_RIGHT_GAZE_POINT_VALIDITY"] == 1)
+
+    # Inf → NaN
+    for arr in [gx_L, gy_L, gx_R, gy_R]:
+        arr.replace([np.inf, -np.inf], np.nan, inplace=True)
+
+    # Mask invalid samples
+    gx_L[~val_L] = np.nan
+    gy_L[~val_L] = np.nan
+    gx_R[~val_R] = np.nan
+    gy_R[~val_R] = np.nan
+
+    # Average of both eyes
+    gx = np.mean(np.column_stack([gx_L, gx_R]), axis=1)
+    gy = np.mean(np.column_stack([gy_L, gy_R]), axis=1)
+
+    # Interpolation
+    gx = pd.Series(gx).interpolate(limit=50, limit_direction="both").bfill().ffill()
+    gy = pd.Series(gy).interpolate(limit=50, limit_direction="both").bfill().ffill()
+
+    # Min-max scaling
+    xscaler = MinMaxScaler()
+    gxscale = xscaler.fit_transform(gx.values.reshape(-1, 1))
+
+    yscaler = MinMaxScaler()
+    gyscale = yscaler.fit_transform(gy.values.reshape(-1, 1))
+
+    return np.column_stack((gxscale, gyscale))
+
+
+def extract_pupil(df):
+    """Extracts pupil diameter (averaged over both eyes)."""
+    pl = df["EYE_LEFT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
+    pr = df["EYE_RIGHT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
+
+    vl = df.get("EYE_LEFT_PUPIL_VALIDITY")
+    vr = df.get("EYE_RIGHT_PUPIL_VALIDITY")
+
+    if vl is None or vr is None:
+        validity = (~pl.isna() | ~pr.isna()).astype(int).to_numpy()
+    else:
+        validity = ((vl == 1) | (vr == 1)).astype(int).to_numpy()
+
+    p = np.mean(np.column_stack([pl, pr]), axis=1)
+    p = pd.Series(p).interpolate(limit=50, limit_direction="both").bfill().ffill()
+
+    return p.to_numpy(), validity
+
+
+def detect_blinks(pupil_validity, min_duration=5):
+    """Detects blinks: pupil validity == 0 counts as a blink (min_duration in samples)."""
+    blinks = []
+    start = None
+
+    for i, v in enumerate(pupil_validity):
+        if v == 0 and start is None:
+            start = i
+        elif v == 1 and start is not None:
+            if i - start >= min_duration:
+                blinks.append([start, i])
+            start = None
+
+    return blinks
+
+
+def compute_IPA(pupil, fs=25):
+    """Index of Pupillary Activity (Duchowski 2018), approximated here as Welch band power in 0.6-2.0 Hz."""
+    f, Pxx = welch(pupil, fs=fs, nperseg=int(fs*2))
+    hf_band = (f >= 0.6) & (f <= 2.0)
+    return np.sum(Pxx[hf_band])
+
+
+def extract_eye_features_window(df_eye_window, fs=25):
+    """
+    Extracts eye-tracking features for a single window.
+    Returns a dictionary with all eye features.
+    """
+    # Gaze
+    gaze = extract_gaze_signal(df_eye_window)
+
+    # Pupil
+    pupil, pupil_validity = extract_pupil(df_eye_window)
+
+    window_size = len(df_eye_window)
+
+    # ----------------------------
+    # FIXATIONS
+    # ----------------------------
+    time_ms = np.arange(window_size) * 1000.0 / fs
+
+    fix, efix = fixation_detection(
+        x=gaze[:, 0], y=gaze[:, 1], time=time_ms,
+        missing=0.0, maxdist=0.003, mindur=10
+    )
+
+    fixation_durations = [f[2] for f in efix if np.isfinite(f[2]) and f[2] > 0]
+
+    # Duration categories (ms)
+    F_short = sum(66 <= d <= 150 for d in fixation_durations)
+    F_medium = sum(300 <= d <= 500 for d in fixation_durations)
+    F_long = sum(d >= 1000 for d in fixation_durations)
+    F_hundred = sum(d > 100 for d in fixation_durations)
+
+    # ----------------------------
+    # SACCADES
+    # ----------------------------
+    sac, esac = saccade_detection(
+        x=gaze[:, 0], y=gaze[:, 1], time=time_ms,
+        missing=0, minlen=12, maxvel=0.2, maxacc=1
+    )
+
+    sac_durations = [s[2] for s in esac]
+    sac_amplitudes = [((s[5]-s[3])**2 + (s[6]-s[4])**2)**0.5 for s in esac]
+
+    # ----------------------------
+    # BLINKS
+    # ----------------------------
+    blinks = detect_blinks(pupil_validity)
+    blink_durations = [(b[1] - b[0]) / fs for b in blinks]
+
+    # ----------------------------
+    # PUPIL
+    # ----------------------------
+    if np.all(np.isnan(pupil)):
+        mean_pupil = np.nan
+        ipa = np.nan
+    else:
+        mean_pupil = np.nanmean(pupil)
+        ipa = compute_IPA(pupil, fs=fs)
+
+    # Feature dictionary
+    return {
+        "Fix_count_short_66_150": F_short,
+        "Fix_count_medium_300_500": F_medium,
+        "Fix_count_long_gt_1000": F_long,
+        "Fix_count_100": F_hundred,
+        "Fix_mean_duration": np.mean(fixation_durations) if fixation_durations else 0,
+        "Fix_median_duration": np.median(fixation_durations) if fixation_durations else 0,
+        "Sac_count": len(sac),
+        "Sac_mean_amp": np.mean(sac_amplitudes) if sac_amplitudes else 0,
+        "Sac_mean_dur": np.mean(sac_durations) if sac_durations else 0,
+        "Sac_median_dur": np.median(sac_durations) if sac_durations else 0,
+        "Blink_count": len(blinks),
+        "Blink_mean_dur": np.mean(blink_durations) if blink_durations else 0,
+        "Blink_median_dur": np.median(blink_durations) if blink_durations else 0,
+        "Pupil_mean": mean_pupil,
+        "Pupil_IPA": ipa
+    }
+
+
+##############################################################################
+# COMBINED FEATURE EXTRACTION
+##############################################################################
+
+def process_combined_features(input_dir, output_file, window_size, step_size, fs=25):
+    """
+    Processes parquet files that contain FACE_AU and EYE_ columns.
+    Extracts both feature sets and combines them per window.
+    """
+    input_path = Path(input_dir)
+    parquet_files = sorted(input_path.glob("*.parquet"))
+
+    if not parquet_files:
+        print(f"ERROR: no parquet files found in {input_dir}!")
+        return None
+
+    print(f"\n{'='*70}")
+    print("COMBINED FEATURE EXTRACTION")
+    print(f"{'='*70}")
+    print(f"Files: {len(parquet_files)}")
+    print(f"Window: {window_size} samples ({window_size/fs:.1f}s at {fs}Hz)")
+    print(f"Step: {step_size} samples ({step_size/fs:.1f}s at {fs}Hz)")
+    print(f"{'='*70}\n")
+
+    all_windows = []
+
+    for file_idx, parquet_file in enumerate(parquet_files, 1):
+        print(f"\n[{file_idx}/{len(parquet_files)}] {parquet_file.name}")
+
+        try:
+            df = pd.read_parquet(parquet_file)
+            print(f"  Rows: {len(df)}")
+
+            # Identify columns
+            au_columns = [col for col in df.columns if col.startswith('FACE_AU')]
+            eye_columns = [col for col in df.columns if col.startswith('EYE_')]
+
+            print(f"  AU columns: {len(au_columns)}")
+            print(f"  Eye columns: {len(eye_columns)}")
+
+            has_au = len(au_columns) > 0
+            has_eye = len(eye_columns) > 0
+
+            if not has_au and not has_eye:
+                print("  WARNING: no AU or eye columns found!")
+                continue
+
+            # Group by STUDY, LEVEL, PHASE
+            group_cols = [col for col in ['STUDY', 'LEVEL', 'PHASE'] if col in df.columns]
+
+            if group_cols:
+                groups = df.groupby(group_cols, sort=False)
+            else:
+                groups = [(None, df)]
+
+            for group_vals, group_df in groups:
+                if group_cols:
+                    print(f"  Group {group_vals}: {len(group_df)} samples")
+
+                group_df = group_df.reset_index(drop=True)
+
+                # Compute number of windows
+                num_windows = (len(group_df) - window_size) // step_size + 1
+
+                if num_windows <= 0:
+                    print(f"    Too few rows for one window (needs {window_size})")
+                    continue
+
+                # Sliding windows
+                for i in range(num_windows):
+                    start_idx = i * step_size
+                    end_idx = start_idx + window_size
+
+                    window_df = group_df.iloc[start_idx:end_idx]
+
+                    # Basic metadata
+                    result = {
+                        'subjectID': window_df['subjectID'].iloc[0],
+                        'start_time': window_df['rowID'].iloc[0],
+                        'STUDY': window_df['STUDY'].iloc[0] if 'STUDY' in window_df.columns else np.nan,
+                        'LEVEL': window_df['LEVEL'].iloc[0] if 'LEVEL' in window_df.columns else np.nan,
+                        'PHASE': window_df['PHASE'].iloc[0] if 'PHASE' in window_df.columns else np.nan
+                    }
+
+                    # FACE AU features
+                    if has_au:
+                        for au_col in au_columns:
+                            result[f'{au_col}_mean'] = window_df[au_col].mean()
+
+                    # Eye-tracking features
+                    if has_eye:
+                        try:
+                            eye_features = extract_eye_features_window(window_df[eye_columns], fs=fs)
+                            result.update(eye_features)
+                        except Exception as e:
+                            print(f"    WARNING: eye feature extraction failed: {str(e)}")
+                            # Fill in NaN values for the eye features
+                            result.update({
+                                "Fix_count_short_66_150": np.nan,
+                                "Fix_count_medium_300_500": np.nan,
+                                "Fix_count_long_gt_1000": np.nan,
+                                "Fix_count_100": np.nan,
+                                "Fix_mean_duration": np.nan,
+                                "Fix_median_duration": np.nan,
+                                "Sac_count": np.nan,
+                                "Sac_mean_amp": np.nan,
+                                "Sac_mean_dur": np.nan,
+                                "Sac_median_dur": np.nan,
+                                "Blink_count": np.nan,
+                                "Blink_mean_dur": np.nan,
+                                "Blink_median_dur": np.nan,
+                                "Pupil_mean": np.nan,
+                                "Pupil_IPA": np.nan
+                            })
+
+                    all_windows.append(result)
+
+                print(f"    → {num_windows} windows created")
+
+        except Exception as e:
+            print(f"  ERROR: {str(e)}")
+            import traceback
+            traceback.print_exc()
+            continue
+
+    # Combine all windows
+    if not all_windows:
+        print("\nNO FEATURES EXTRACTED!")
+        return None
+
+    print(f"\n{'='*70}")
+    print("SUMMARY")
+    print(f"{'='*70}")
+
+    result_df = pd.DataFrame(all_windows)
+
+    print(f"Total windows: {len(result_df)}")
+    print(f"Columns: {len(result_df.columns)}")
+    print(f"Subjects: {result_df['subjectID'].nunique()}")
+
+    # Save
+    output_path = Path(output_file)
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    result_df.to_parquet(output_file, index=False)
+
+    print(f"\n✓ Saved: {output_file}")
+    print(f"{'='*70}\n")
+
+    return result_df
+
+
+##############################################################################
+# MAIN
+##############################################################################
+
+def main():
+    print("\n" + "="*70)
+    print("COMBINED FEATURE EXTRACTION (AU + EYE)")
+    print("="*70)
+
+    result = process_combined_features(
+        input_dir=INPUT_DIR,
+        output_file=OUTPUT_FILE,
+        window_size=WINDOW_SIZE_SAMPLES,
+        step_size=STEP_SIZE_SAMPLES,
+        fs=SAMPLING_RATE
+    )
+
+    if result is not None:
+        print("\nFirst 5 rows:")
+        print(result.head())
+
+        print("\nColumn overview:")
+        print(result.columns.tolist())
+
+        print("\nData types:")
+        print(result.dtypes)
+
+        print("\nStatistics:")
+        print(result.describe())
+
+    print("\n✓ DONE!\n")
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/dataset_creation/create_multimodal_dataset.py b/dataset_creation/create_multimodal_dataset_by_merge.py
similarity index 100%
rename from dataset_creation/create_multimodal_dataset.py
rename to dataset_creation/create_multimodal_dataset_by_merge.py
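A minimal sketch of how the combined dataset written by combined_feature_creation.py
could be consumed downstream (the path is the OUTPUT_FILE configured above; the
modulo-based subject split is only a placeholder illustration, not part of this
pipeline):

    import pandas as pd

    # One row per 50 s window (5 s step): metadata, FACE_AU_*_mean columns and eye features.
    df = pd.read_parquet(
        "/home/jovyan/data-paulusjafahrsimulator-gpu/new_datasets/combined_dataset_25hz.parquet"
    )
    print(df.shape)
    print(df.groupby("subjectID").size())  # number of windows per subject

    # Because consecutive windows overlap (step < window), any train/test split
    # should be subject-wise; the modulo rule below is just an example.
    train = df[df["subjectID"] % 5 != 0]
    test = df[df["subjectID"] % 5 == 0]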