Fahrsimulator_MSY2526_AI/dataset_creation/combined_feature_creation.py
CPFC_both.py: creates parquet files chunkwise that include STUDY, LEVEL, PHASE, subjectID, FACE_AU, and eye-tracking columns.
combined_feature_creation.py: creates a single dataset by computing both feature sets in parallel over the same time windows --> the same sample rate is required.


## Combined processing
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.preprocessing import MinMaxScaler
from scipy.signal import welch
from pygazeanalyser.detectors import fixation_detection, saccade_detection
##############################################################################
# CONFIGURATION
##############################################################################
INPUT_DIR = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/both_mod_parquet_files")
OUTPUT_FILE = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/new_datasets/combined_dataset_25hz.parquet")
WINDOW_SIZE_SAMPLES = 1250  # 50 s at 25 Hz
STEP_SIZE_SAMPLES = 125     # 5 s at 25 Hz -> consecutive windows overlap by 45 s
SAMPLING_RATE = 25          # Hz
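# Expected input schema (inferred from the columns read below, not verified
# against the data): each parquet file holds one 25 Hz recording with metadata
# columns (subjectID, rowID, STUDY, LEVEL, PHASE), FACE_AU* action-unit
# columns, and EYE_* columns such as EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_X/Y,
# EYE_*_GAZE_POINT_VALIDITY, EYE_*_PUPIL_DIAMETER, and EYE_*_PUPIL_VALIDITY.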
##############################################################################
# EYE-TRACKING FUNCTIONS
##############################################################################
def clean_eye_df(df):
    """Extract only eye-tracking columns and drop rows that are entirely empty.

    NOTE: this helper is currently not called by the pipeline below.
    """
    eye_cols = [c for c in df.columns if c.startswith("EYE_")]
    if not eye_cols:
        return pd.DataFrame()
    df_eye = df[eye_cols].copy()
    df_eye = df_eye.replace([np.inf, -np.inf], np.nan)
    df_eye = df_eye.dropna(subset=eye_cols, how="all")
    return df_eye.reset_index(drop=True)
def extract_gaze_signal(df):
    """Extract 2D gaze positions, mask invalid samples, and interpolate."""
    gx_L = df["EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
    gy_L = df["EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()
    gx_R = df["EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
    gy_R = df["EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()
    val_L = (df["EYE_LEFT_GAZE_POINT_VALIDITY"] == 1)
    val_R = (df["EYE_RIGHT_GAZE_POINT_VALIDITY"] == 1)
    # Inf -> NaN
    for arr in [gx_L, gy_L, gx_R, gy_R]:
        arr.replace([np.inf, -np.inf], np.nan, inplace=True)
    # Mask invalid samples
    gx_L[~val_L] = np.nan
    gy_L[~val_L] = np.nan
    gx_R[~val_R] = np.nan
    gy_R[~val_R] = np.nan
    # Average both eyes; np.nanmean falls back to the single valid eye
    # instead of discarding the sample whenever one eye is invalid
    gx = np.nanmean(np.column_stack([gx_L, gx_R]), axis=1)
    gy = np.nanmean(np.column_stack([gy_L, gy_R]), axis=1)
    # Interpolate gaps of up to 50 samples (2 s at 25 Hz)
    gx = pd.Series(gx).interpolate(limit=50, limit_direction="both").bfill().ffill()
    gy = pd.Series(gy).interpolate(limit=50, limit_direction="both").bfill().ffill()
    # Min-max scaling per window
    xscaler = MinMaxScaler()
    gxscale = xscaler.fit_transform(gx.values.reshape(-1, 1))
    yscaler = MinMaxScaler()
    gyscale = yscaler.fit_transform(gy.values.reshape(-1, 1))
    return np.column_stack((gxscale, gyscale))
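# Shape sketch (illustrative): for a window of n samples the result is an
# (n, 2) array of gaze coordinates, min-max scaled to [0, 1] per window:
#   gaze = extract_gaze_signal(window_df)
#   gaze.shape  # (len(window_df), 2)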
def extract_pupil(df):
    """Extract pupil diameter (averaged over both eyes)."""
    pl = df["EYE_LEFT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
    pr = df["EYE_RIGHT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
    vl = df.get("EYE_LEFT_PUPIL_VALIDITY")
    vr = df.get("EYE_RIGHT_PUPIL_VALIDITY")
    if vl is None or vr is None:
        # No validity columns: treat any non-NaN diameter as valid
        validity = (~pl.isna() | ~pr.isna()).astype(int).to_numpy()
    else:
        validity = ((vl == 1) | (vr == 1)).astype(int).to_numpy()
    # np.nanmean keeps monocular samples instead of dropping them
    p = np.nanmean(np.column_stack([pl, pr]), axis=1)
    p = pd.Series(p).interpolate(limit=50, limit_direction="both").bfill().ffill()
    return p.to_numpy(), validity
def detect_blinks(pupil_validity, min_duration=5):
    """Detect blinks: runs of validity == 0 lasting at least min_duration samples."""
    blinks = []
    start = None
    for i, v in enumerate(pupil_validity):
        if v == 0 and start is None:
            start = i
        elif v == 1 and start is not None:
            if i - start >= min_duration:
                blinks.append([start, i])
            start = None
    # Close a blink that runs to the end of the window
    if start is not None and len(pupil_validity) - start >= min_duration:
        blinks.append([start, len(pupil_validity)])
    return blinks
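# Example (hypothetical input): five or more consecutive invalid samples at
# 25 Hz (i.e. >= 200 ms) count as one blink, reported as [start, end) indices:
#   >>> detect_blinks([1, 1, 0, 0, 0, 0, 0, 1, 1])
#   [[2, 7]]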
def compute_IPA(pupil, fs=25):
    """Spectral approximation of the Index of Pupillary Activity (Duchowski et al. 2018):
    integrated high-frequency (0.6-2.0 Hz) power of the pupil signal via Welch's method."""
    f, Pxx = welch(pupil, fs=fs, nperseg=int(fs * 2))
    hf_band = (f >= 0.6) & (f <= 2.0)
    return np.sum(Pxx[hf_band])
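# Sanity-check sketch (synthetic signal, illustrative only): a pupil trace
# oscillating at 1 Hz should yield a clearly larger value than a constant one:
#   t = np.arange(0, 50, 1 / 25)
#   compute_IPA(3.0 + 0.1 * np.sin(2 * np.pi * t))  # >> compute_IPA(np.full(1250, 3.0))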
def extract_eye_features_window(df_eye_window, fs=25):
    """
    Extract eye-tracking features for a single window.
    Returns a dictionary with all eye features.
    Fixation and saccade durations are in ms, blink durations in seconds.
    """
    # Gaze
    gaze = extract_gaze_signal(df_eye_window)
    # Pupil
    pupil, pupil_validity = extract_pupil(df_eye_window)
    window_size = len(df_eye_window)
    # ----------------------------
    # FIXATIONS
    # ----------------------------
    time_ms = np.arange(window_size) * 1000.0 / fs
    fix, efix = fixation_detection(
        x=gaze[:, 0], y=gaze[:, 1], time=time_ms,
        missing=0.0, maxdist=0.003, mindur=10
    )
    fixation_durations = [f[2] for f in efix if np.isfinite(f[2]) and f[2] > 0]
    # Duration categories (ms)
    F_short = sum(66 <= d <= 150 for d in fixation_durations)
    F_medium = sum(300 <= d <= 500 for d in fixation_durations)
    F_long = sum(d >= 1000 for d in fixation_durations)
    F_hundred = sum(d > 100 for d in fixation_durations)
    # ----------------------------
    # SACCADES
    # ----------------------------
    sac, esac = saccade_detection(
        x=gaze[:, 0], y=gaze[:, 1], time=time_ms,
        missing=0, minlen=12, maxvel=0.2, maxacc=1
    )
    sac_durations = [s[2] for s in esac]
    # Euclidean distance between saccade start (s[3], s[4]) and end (s[5], s[6])
    sac_amplitudes = [((s[5] - s[3])**2 + (s[6] - s[4])**2)**0.5 for s in esac]
    # ----------------------------
    # BLINKS
    # ----------------------------
    blinks = detect_blinks(pupil_validity)
    blink_durations = [(b[1] - b[0]) / fs for b in blinks]  # seconds
    # ----------------------------
    # PUPIL
    # ----------------------------
    if np.all(np.isnan(pupil)):
        mean_pupil = np.nan
        ipa = np.nan
    else:
        mean_pupil = np.nanmean(pupil)
        ipa = compute_IPA(pupil, fs=fs)
    # Feature dictionary
    return {
        "Fix_count_short_66_150": F_short,
        "Fix_count_medium_300_500": F_medium,
        "Fix_count_long_gt_1000": F_long,
        "Fix_count_100": F_hundred,
        "Fix_mean_duration": np.mean(fixation_durations) if fixation_durations else 0,
        "Fix_median_duration": np.median(fixation_durations) if fixation_durations else 0,
        "Sac_count": len(sac),
        "Sac_mean_amp": np.mean(sac_amplitudes) if sac_amplitudes else 0,
        "Sac_mean_dur": np.mean(sac_durations) if sac_durations else 0,
        "Sac_median_dur": np.median(sac_durations) if sac_durations else 0,
        "Blink_count": len(blinks),
        "Blink_mean_dur": np.mean(blink_durations) if blink_durations else 0,
        "Blink_median_dur": np.median(blink_durations) if blink_durations else 0,
        "Pupil_mean": mean_pupil,
        "Pupil_IPA": ipa
    }
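# Usage sketch (hypothetical window; assumes a DataFrame df_eye with the EYE_*
# columns read above, sampled at 25 Hz):
#   window = df_eye.iloc[0:WINDOW_SIZE_SAMPLES]
#   feats = extract_eye_features_window(window, fs=SAMPLING_RATE)
#   feats["Blink_count"], feats["Pupil_IPA"]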
##############################################################################
# COMBINED FEATURE EXTRACTION
##############################################################################
def process_combined_features(input_dir, output_file, window_size, step_size, fs=25):
    """
    Process parquet files containing FACE_AU and EYE columns.
    Extracts both feature sets per sliding window and combines them.
    """
    input_path = Path(input_dir)
    parquet_files = sorted(input_path.glob("*.parquet"))
    if not parquet_files:
        print(f"ERROR: no parquet files found in {input_dir}!")
        return None
    print(f"\n{'='*70}")
    print("COMBINED FEATURE EXTRACTION")
    print(f"{'='*70}")
    print(f"Files: {len(parquet_files)}")
    print(f"Window: {window_size} samples ({window_size/fs:.1f} s at {fs} Hz)")
    print(f"Step: {step_size} samples ({step_size/fs:.1f} s at {fs} Hz)")
    print(f"{'='*70}\n")
    all_windows = []
    for file_idx, parquet_file in enumerate(parquet_files, 1):
        print(f"\n[{file_idx}/{len(parquet_files)}] {parquet_file.name}")
        try:
            df = pd.read_parquet(parquet_file)
            print(f"  Rows: {len(df)}")
            # Identify feature columns
            au_columns = [col for col in df.columns if col.startswith('FACE_AU')]
            eye_columns = [col for col in df.columns if col.startswith('EYE_')]
            print(f"  AU columns: {len(au_columns)}")
            print(f"  Eye columns: {len(eye_columns)}")
            has_au = len(au_columns) > 0
            has_eye = len(eye_columns) > 0
            if not has_au and not has_eye:
                print("  WARNING: no AU or eye columns found!")
                continue
            # Group by STUDY, LEVEL, PHASE so windows never span group borders
            group_cols = [col for col in ['STUDY', 'LEVEL', 'PHASE'] if col in df.columns]
            if group_cols:
                groups = df.groupby(group_cols, sort=False)
            else:
                groups = [(None, df)]
            for group_vals, group_df in groups:
                if group_cols:
                    print(f"  Group {group_vals}: {len(group_df)} samples")
                group_df = group_df.reset_index(drop=True)
                # Number of sliding windows in this group
                num_windows = (len(group_df) - window_size) // step_size + 1
                if num_windows <= 0:
                    print(f"    Too few rows for a window (requires {window_size})")
                    continue
                # Sliding windows
                for i in range(num_windows):
                    start_idx = i * step_size
                    end_idx = start_idx + window_size
                    window_df = group_df.iloc[start_idx:end_idx]
                    # Window metadata
                    result = {
                        'subjectID': window_df['subjectID'].iloc[0],
                        'start_time': window_df['rowID'].iloc[0],
                        'STUDY': window_df['STUDY'].iloc[0] if 'STUDY' in window_df.columns else np.nan,
                        'LEVEL': window_df['LEVEL'].iloc[0] if 'LEVEL' in window_df.columns else np.nan,
                        'PHASE': window_df['PHASE'].iloc[0] if 'PHASE' in window_df.columns else np.nan
                    }
                    # FACE AU features
                    if has_au:
                        for au_col in au_columns:
                            result[f'{au_col}_mean'] = window_df[au_col].mean()
                    # Eye-tracking features
                    if has_eye:
                        try:
                            eye_features = extract_eye_features_window(window_df[eye_columns], fs=fs)
                            result.update(eye_features)
                        except Exception as e:
                            print(f"    WARNING: eye feature extraction failed: {e}")
                            # Fill eye features with NaN so every window shares the same schema
                            result.update({
                                "Fix_count_short_66_150": np.nan,
                                "Fix_count_medium_300_500": np.nan,
                                "Fix_count_long_gt_1000": np.nan,
                                "Fix_count_100": np.nan,
                                "Fix_mean_duration": np.nan,
                                "Fix_median_duration": np.nan,
                                "Sac_count": np.nan,
                                "Sac_mean_amp": np.nan,
                                "Sac_mean_dur": np.nan,
                                "Sac_median_dur": np.nan,
                                "Blink_count": np.nan,
                                "Blink_mean_dur": np.nan,
                                "Blink_median_dur": np.nan,
                                "Pupil_mean": np.nan,
                                "Pupil_IPA": np.nan
                            })
                    all_windows.append(result)
                print(f"    {num_windows} windows created")
        except Exception as e:
            print(f"  ERROR: {e}")
            import traceback
            traceback.print_exc()
            continue
    # Combine all windows
    if not all_windows:
        print("\nNO FEATURES EXTRACTED!")
        return None
    print(f"\n{'='*70}")
    print("SUMMARY")
    print(f"{'='*70}")
    result_df = pd.DataFrame(all_windows)
    print(f"Total windows: {len(result_df)}")
    print(f"Columns: {len(result_df.columns)}")
    print(f"Subjects: {result_df['subjectID'].nunique()}")
    # Save
    output_path = Path(output_file)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    result_df.to_parquet(output_file, index=False)
    print(f"\n✓ Saved: {output_file}")
    print(f"{'='*70}\n")
    return result_df
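# Downstream usage sketch (reads the OUTPUT_FILE defined above; column names
# as produced by this script):
#   df = pd.read_parquet(OUTPUT_FILE)
#   df.groupby(["STUDY", "LEVEL", "PHASE"])["Pupil_mean"].mean()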
##############################################################################
# MAIN
##############################################################################
def main():
    print("\n" + "=" * 70)
    print("COMBINED FEATURE EXTRACTION (AU + EYE)")
    print("=" * 70)
    result = process_combined_features(
        input_dir=INPUT_DIR,
        output_file=OUTPUT_FILE,
        window_size=WINDOW_SIZE_SAMPLES,
        step_size=STEP_SIZE_SAMPLES,
        fs=SAMPLING_RATE
    )
    if result is not None:
        print("\nFirst 5 rows:")
        print(result.head())
        print("\nColumn overview:")
        print(result.columns.tolist())
        print("\nData types:")
        print(result.dtypes)
        print("\nSummary statistics:")
        print(result.describe())
    print("\n✓ DONE!\n")

if __name__ == "__main__":
    main()