CPFC_both.py: creates parquet files chunkwise that combine the STUDY, LEVEL, PHASE and subjectID columns with the FACE_AU and eye-tracking columns. combined_feature_creation.py: creates a single dataset by computing both feature sets in parallel over the same time window, which requires a common sample rate.

## Combined processing

import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.preprocessing import MinMaxScaler
from scipy.signal import welch
from pygazeanalyser.detectors import fixation_detection, saccade_detection


##############################################################################
# CONFIGURATION
##############################################################################

INPUT_DIR = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/both_mod_parquet_files")
OUTPUT_FILE = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/new_datasets/combined_dataset_25hz.parquet")

WINDOW_SIZE_SAMPLES = 1250  # 50 s at 25 Hz
STEP_SIZE_SAMPLES = 125     # 5 s at 25 Hz
SAMPLING_RATE = 25          # Hz
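

# Quick sanity check of the window arithmetic above (illustrative numbers,
# not part of the pipeline): a 60 s recording at 25 Hz has 1500 samples, so
# a 50 s window advanced in 5 s steps fits (1500 - 1250) // 125 + 1 = 3 times.
def _demo_window_count():
    n_samples = 60 * SAMPLING_RATE  # a hypothetical 60 s recording
    assert (n_samples - WINDOW_SIZE_SAMPLES) // STEP_SIZE_SAMPLES + 1 == 3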


##############################################################################
# EYE-TRACKING FUNCTIONS
##############################################################################

def clean_eye_df(df):
    """Extracts only the eye-tracking columns and removes empty rows."""
    eye_cols = [c for c in df.columns if c.startswith("EYE_")]
    if not eye_cols:
        return pd.DataFrame()

    df_eye = df[eye_cols].copy()
    df_eye = df_eye.replace([np.inf, -np.inf], np.nan)
    df_eye = df_eye.dropna(subset=eye_cols, how="all")

    return df_eye.reset_index(drop=True)
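

# Minimal usage sketch for clean_eye_df (hypothetical toy columns, not part
# of the pipeline): non-EYE_ columns are dropped, and so are rows without a
# single valid eye sample.
def _demo_clean_eye_df():
    df = pd.DataFrame({
        "EYE_LEFT_PUPIL_DIAMETER": [3.1, np.nan],
        "FACE_AU01_r": [0.2, 0.4],
    })
    out = clean_eye_df(df)
    assert list(out.columns) == ["EYE_LEFT_PUPIL_DIAMETER"]
    assert len(out) == 1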


def extract_gaze_signal(df):
    """Extracts 2D gaze positions, masks invalid samples and interpolates."""
    gx_L = df["EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
    gy_L = df["EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()
    gx_R = df["EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
    gy_R = df["EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()

    val_L = (df["EYE_LEFT_GAZE_POINT_VALIDITY"] == 1)
    val_R = (df["EYE_RIGHT_GAZE_POINT_VALIDITY"] == 1)

    # Inf -> NaN
    gx_L, gy_L, gx_R, gy_R = (
        s.replace([np.inf, -np.inf], np.nan) for s in (gx_L, gy_L, gx_R, gy_R)
    )

    # Mask invalid samples
    gx_L[~val_L] = np.nan
    gy_L[~val_L] = np.nan
    gx_R[~val_R] = np.nan
    gy_R[~val_R] = np.nan

    # Mean of both eyes (NaN if either eye is missing; filled by interpolation)
    gx = np.mean(np.column_stack([gx_L, gx_R]), axis=1)
    gy = np.mean(np.column_stack([gy_L, gy_R]), axis=1)

    # Interpolation (gaps of up to 50 samples, i.e. 2 s at 25 Hz)
    gx = pd.Series(gx).interpolate(limit=50, limit_direction="both").bfill().ffill()
    gy = pd.Series(gy).interpolate(limit=50, limit_direction="both").bfill().ffill()

    # Min-max scaling to [0, 1] per axis
    xscaler = MinMaxScaler()
    gxscale = xscaler.fit_transform(gx.values.reshape(-1, 1))

    yscaler = MinMaxScaler()
    gyscale = yscaler.fit_transform(gy.values.reshape(-1, 1))

    return np.column_stack((gxscale, gyscale))
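

# Shape/range sketch for extract_gaze_signal (synthetic random data, all
# samples marked valid; illustrative only): the result is an (n, 2) array
# scaled to [0, 1] per axis.
def _demo_extract_gaze_signal():
    n = 10
    rng = np.random.default_rng(0)
    df = pd.DataFrame({
        "EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_X": rng.random(n),
        "EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_Y": rng.random(n),
        "EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_X": rng.random(n),
        "EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_Y": rng.random(n),
        "EYE_LEFT_GAZE_POINT_VALIDITY": np.ones(n, dtype=int),
        "EYE_RIGHT_GAZE_POINT_VALIDITY": np.ones(n, dtype=int),
    })
    gaze = extract_gaze_signal(df)
    assert gaze.shape == (n, 2)
    assert gaze.min() >= 0.0 and gaze.max() <= 1.0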


def extract_pupil(df):
    """Extracts pupil size (mean of both eyes)."""
    pl = df["EYE_LEFT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
    pr = df["EYE_RIGHT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)

    vl = df.get("EYE_LEFT_PUPIL_VALIDITY")
    vr = df.get("EYE_RIGHT_PUPIL_VALIDITY")

    if vl is None or vr is None:
        # No validity columns: treat any non-NaN sample as valid
        validity = (~pl.isna() | ~pr.isna()).astype(int).to_numpy()
    else:
        validity = ((vl == 1) | (vr == 1)).astype(int).to_numpy()

    p = np.mean(np.column_stack([pl, pr]), axis=1)
    p = pd.Series(p).interpolate(limit=50, limit_direction="both").bfill().ffill()

    return p.to_numpy(), validity
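

# Behaviour sketch for extract_pupil (hypothetical toy values, not part of
# the pipeline): the invalid middle sample is interpolated, and validity is
# the OR of both eyes.
def _demo_extract_pupil():
    df = pd.DataFrame({
        "EYE_LEFT_PUPIL_DIAMETER": [3.0, np.nan, 3.2],
        "EYE_RIGHT_PUPIL_DIAMETER": [3.0, np.nan, 3.0],
        "EYE_LEFT_PUPIL_VALIDITY": [1, 0, 1],
        "EYE_RIGHT_PUPIL_VALIDITY": [1, 0, 1],
    })
    p, validity = extract_pupil(df)
    assert list(validity) == [1, 0, 1]
    assert not np.isnan(p).any()  # the gap was interpolated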


def detect_blinks(pupil_validity, min_duration=5):
    """Detects blinks: runs of validity == 0 lasting at least
    min_duration samples (5 samples = 200 ms at 25 Hz)."""
    blinks = []
    start = None

    for i, v in enumerate(pupil_validity):
        if v == 0 and start is None:
            start = i
        elif v == 1 and start is not None:
            if i - start >= min_duration:
                blinks.append([start, i])
            start = None

    # Close a blink that is still open at the end of the signal
    if start is not None and len(pupil_validity) - start >= min_duration:
        blinks.append([start, len(pupil_validity)])

    return blinks
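

# Sketch for detect_blinks (made-up validity trace, not part of the
# pipeline): a run of six invalid samples starting at index 10 is reported
# as a single blink [start, end).
def _demo_detect_blinks():
    validity = [1] * 10 + [0] * 6 + [1] * 10
    assert detect_blinks(validity) == [[10, 16]]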


def compute_IPA(pupil, fs=25):
    """Index of Pupillary Activity (after Duchowski et al., 2018),
    computed here as Welch band power of the pupil signal in 0.6-2.0 Hz."""
    f, Pxx = welch(pupil, fs=fs, nperseg=int(fs * 2))
    hf_band = (f >= 0.6) & (f <= 2.0)
    return np.sum(Pxx[hf_band])
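

# Plausibility sketch for compute_IPA (synthetic signals with made-up
# amplitudes, not part of the pipeline): adding a 1.2 Hz oscillation inside
# the 0.6-2.0 Hz band raises the index compared to a slow 0.1 Hz drift alone.
def _demo_compute_IPA():
    t = np.arange(0, 50, 1 / SAMPLING_RATE)
    calm = 3.0 + 0.05 * np.sin(2 * np.pi * 0.1 * t)
    busy = calm + 0.10 * np.sin(2 * np.pi * 1.2 * t)
    assert compute_IPA(busy, fs=SAMPLING_RATE) > compute_IPA(calm, fs=SAMPLING_RATE)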


def extract_eye_features_window(df_eye_window, fs=25):
    """
    Extracts the eye-tracking features for a single window.
    Returns a dictionary with all eye features.
    """
    # Gaze
    gaze = extract_gaze_signal(df_eye_window)

    # Pupil
    pupil, pupil_validity = extract_pupil(df_eye_window)

    window_size = len(df_eye_window)

    # ----------------------------
    # FIXATIONS
    # ----------------------------
    time_ms = np.arange(window_size) * 1000.0 / fs

    fix, efix = fixation_detection(
        x=gaze[:, 0], y=gaze[:, 1], time=time_ms,
        missing=0.0, maxdist=0.003, mindur=10
    )

    fixation_durations = [f[2] for f in efix if np.isfinite(f[2]) and f[2] > 0]

    # Duration categories in ms (150-300 ms and 500-1000 ms fall in no bin)
    F_short = sum(66 <= d <= 150 for d in fixation_durations)
    F_medium = sum(300 <= d <= 500 for d in fixation_durations)
    F_long = sum(d >= 1000 for d in fixation_durations)
    F_hundred = sum(d > 100 for d in fixation_durations)

    # ----------------------------
    # SACCADES
    # ----------------------------
    sac, esac = saccade_detection(
        x=gaze[:, 0], y=gaze[:, 1], time=time_ms,
        missing=0, minlen=12, maxvel=0.2, maxacc=1
    )

    sac_durations = [s[2] for s in esac]
    sac_amplitudes = [((s[5] - s[3])**2 + (s[6] - s[4])**2) ** 0.5 for s in esac]

    # ----------------------------
    # BLINKS
    # ----------------------------
    blinks = detect_blinks(pupil_validity)
    blink_durations = [(b[1] - b[0]) / fs for b in blinks]  # in seconds

    # ----------------------------
    # PUPIL
    # ----------------------------
    if np.all(np.isnan(pupil)):
        mean_pupil = np.nan
        ipa = np.nan
    else:
        mean_pupil = np.nanmean(pupil)
        ipa = compute_IPA(pupil, fs=fs)

    # Feature dictionary
    return {
        "Fix_count_short_66_150": F_short,
        "Fix_count_medium_300_500": F_medium,
        "Fix_count_long_gt_1000": F_long,
        "Fix_count_100": F_hundred,
        "Fix_mean_duration": np.mean(fixation_durations) if fixation_durations else 0,
        "Fix_median_duration": np.median(fixation_durations) if fixation_durations else 0,
        "Sac_count": len(sac),
        "Sac_mean_amp": np.mean(sac_amplitudes) if sac_amplitudes else 0,
        "Sac_mean_dur": np.mean(sac_durations) if sac_durations else 0,
        "Sac_median_dur": np.median(sac_durations) if sac_durations else 0,
        "Blink_count": len(blinks),
        "Blink_mean_dur": np.mean(blink_durations) if blink_durations else 0,
        "Blink_median_dur": np.median(blink_durations) if blink_durations else 0,
        "Pupil_mean": mean_pupil,
        "Pupil_IPA": ipa,
    }


##############################################################################
# COMBINED FEATURE EXTRACTION
##############################################################################

def process_combined_features(input_dir, output_file, window_size, step_size, fs=25):
    """
    Processes parquet files containing FACE_AU and EYE columns.
    Extracts both feature sets and combines them.
    """
    input_path = Path(input_dir)
    parquet_files = sorted(input_path.glob("*.parquet"))

    if not parquet_files:
        print(f"ERROR: No parquet files found in {input_dir}!")
        return None

    print(f"\n{'='*70}")
    print("COMBINED FEATURE EXTRACTION")
    print(f"{'='*70}")
    print(f"Files: {len(parquet_files)}")
    print(f"Window: {window_size} samples ({window_size/fs:.1f} s at {fs} Hz)")
    print(f"Step: {step_size} samples ({step_size/fs:.1f} s at {fs} Hz)")
    print(f"{'='*70}\n")

    all_windows = []

    for file_idx, parquet_file in enumerate(parquet_files, 1):
        print(f"\n[{file_idx}/{len(parquet_files)}] {parquet_file.name}")

        try:
            df = pd.read_parquet(parquet_file)
            print(f"  Rows: {len(df)}")

            # Identify columns
            au_columns = [col for col in df.columns if col.startswith('FACE_AU')]
            eye_columns = [col for col in df.columns if col.startswith('EYE_')]

            print(f"  AU columns: {len(au_columns)}")
            print(f"  Eye columns: {len(eye_columns)}")

            has_au = len(au_columns) > 0
            has_eye = len(eye_columns) > 0

            if not has_au and not has_eye:
                print("  WARNING: No AU or EYE columns found!")
                continue

            # Group by STUDY, LEVEL, PHASE
            group_cols = [col for col in ['STUDY', 'LEVEL', 'PHASE'] if col in df.columns]

            if group_cols:
                groups = df.groupby(group_cols, sort=False)
            else:
                groups = [(None, df)]

            for group_vals, group_df in groups:
                if group_cols:
                    print(f"  Group {group_vals}: {len(group_df)} samples")

                group_df = group_df.reset_index(drop=True)

                # Number of sliding windows in this group
                num_windows = (len(group_df) - window_size) // step_size + 1

                if num_windows <= 0:
                    print(f"    Too few rows for a window (needs {window_size})")
                    continue

                # Sliding windows
                for i in range(num_windows):
                    start_idx = i * step_size
                    end_idx = start_idx + window_size

                    window_df = group_df.iloc[start_idx:end_idx]

                    # Basic metadata
                    result = {
                        'subjectID': window_df['subjectID'].iloc[0],
                        'start_time': window_df['rowID'].iloc[0],
                        'STUDY': window_df['STUDY'].iloc[0] if 'STUDY' in window_df.columns else np.nan,
                        'LEVEL': window_df['LEVEL'].iloc[0] if 'LEVEL' in window_df.columns else np.nan,
                        'PHASE': window_df['PHASE'].iloc[0] if 'PHASE' in window_df.columns else np.nan
                    }

                    # FACE AU features: per-window mean of each action unit
                    if has_au:
                        for au_col in au_columns:
                            result[f'{au_col}_mean'] = window_df[au_col].mean()

                    # Eye-tracking features
                    if has_eye:
                        try:
                            eye_features = extract_eye_features_window(window_df[eye_columns], fs=fs)
                            result.update(eye_features)
                        except Exception as e:
                            print(f"    WARNING: eye features failed: {str(e)}")
                            # Fill the eye features with NaN instead
                            result.update({
                                "Fix_count_short_66_150": np.nan,
                                "Fix_count_medium_300_500": np.nan,
                                "Fix_count_long_gt_1000": np.nan,
                                "Fix_count_100": np.nan,
                                "Fix_mean_duration": np.nan,
                                "Fix_median_duration": np.nan,
                                "Sac_count": np.nan,
                                "Sac_mean_amp": np.nan,
                                "Sac_mean_dur": np.nan,
                                "Sac_median_dur": np.nan,
                                "Blink_count": np.nan,
                                "Blink_mean_dur": np.nan,
                                "Blink_median_dur": np.nan,
                                "Pupil_mean": np.nan,
                                "Pupil_IPA": np.nan
                            })

                    all_windows.append(result)

                print(f"    → {num_windows} windows created")

        except Exception as e:
            print(f"  ERROR: {str(e)}")
            import traceback
            traceback.print_exc()
            continue

    # Combine all windows
    if not all_windows:
        print("\nNO FEATURES EXTRACTED!")
        return None

    print(f"\n{'='*70}")
    print("SUMMARY")
    print(f"{'='*70}")

    result_df = pd.DataFrame(all_windows)

    print(f"Total windows: {len(result_df)}")
    print(f"Columns: {len(result_df.columns)}")
    print(f"Subjects: {result_df['subjectID'].nunique()}")

    # Save
    output_path = Path(output_file)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    result_df.to_parquet(output_file, index=False)

    print(f"\n✓ Saved: {output_file}")
    print(f"{'='*70}\n")

    return result_df


##############################################################################
# MAIN
##############################################################################

def main():
    print("\n" + "=" * 70)
    print("COMBINED FEATURE EXTRACTION (AU + EYE)")
    print("=" * 70)

    result = process_combined_features(
        input_dir=INPUT_DIR,
        output_file=OUTPUT_FILE,
        window_size=WINDOW_SIZE_SAMPLES,
        step_size=STEP_SIZE_SAMPLES,
        fs=SAMPLING_RATE
    )

    if result is not None:
        print("\nFirst 5 rows:")
        print(result.head())

        print("\nColumn overview:")
        print(result.columns.tolist())

        print("\nData types:")
        print(result.dtypes)

        print("\nStatistics:")
        print(result.describe())

    print("\n✓ DONE!\n")


if __name__ == "__main__":
    main()