Fahrsimulator_MSY2526_AI/EyeTracking/eyetrackingFeatures.py
2025-12-03 13:41:48 +01:00

294 lines
9.4 KiB
Python

import numpy as np
import pandas as pd
import h5py
import yaml
import owncloud
import os
from scipy.signal import welch
from pygazeanalyser.detectors import fixation_detection, saccade_detection
##############################################################################
# 1. HELFERFUNKTIONEN
##############################################################################
def clean_eye_df(df):
"""
Entfernt alle Zeilen, die keine echten Eyetracking-Daten enthalten.
Löst das Problem, dass das Haupt-DataFrame NaN-Zeilen für andere Sensoren enthält.
"""
eye_cols = [c for c in df.columns if "EYE_" in c]
df_eye = df[eye_cols]
# INF → NaN
df_eye = df_eye.replace([np.inf, -np.inf], np.nan)
# Nur Zeilen behalten, wo es echte Eyetracking-Daten gibt
df_eye = df_eye.dropna(subset=eye_cols, how="all")
print("Eyetracking-Zeilen vorher:", len(df))
print("Eyetracking-Zeilen nachher:", len(df_eye))
return df_eye.reset_index(drop=True)
def extract_gaze_signal(df):
"""
Extrahiert 2D-Gaze-Positionen auf dem Display,
maskiert ungültige Samples und interpoliert Lücken.
"""
print("→ extract_gaze_signal(): Eingabegröße:", df.shape)
# Gaze-Spalten
gx_L = df["EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
gy_L = df["EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()
gx_R = df["EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
gy_R = df["EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()
# Validity-Spalten (1 = gültig)
val_L = (df["EYE_LEFT_GAZE_POINT_VALIDITY"] == 1)
val_R = (df["EYE_RIGHT_GAZE_POINT_VALIDITY"] == 1)
# inf ersetzen (kommt bei Tobii bei Blinks vor)
gx_L.replace([np.inf, -np.inf], np.nan, inplace=True)
gy_L.replace([np.inf, -np.inf], np.nan, inplace=True)
gx_R.replace([np.inf, -np.inf], np.nan, inplace=True)
gy_R.replace([np.inf, -np.inf], np.nan, inplace=True)
# Ungültige Werte maskieren
gx_L[~val_L] = np.nan
gy_L[~val_L] = np.nan
gx_R[~val_R] = np.nan
gy_R[~val_R] = np.nan
# Mittelwert der beiden Augen pro Sample (nanmean ist robust)
gx = np.nanmean(np.column_stack([gx_L, gx_R]), axis=1)
gy = np.nanmean(np.column_stack([gy_L, gy_R]), axis=1)
# Interpolation (wichtig für PyGaze!)
gx = pd.Series(gx).interpolate(limit=50, limit_direction="both").bfill().ffill()
gy = pd.Series(gy).interpolate(limit=50, limit_direction="both").bfill().ffill()
out = np.column_stack((gx, gy))
print("→ extract_gaze_signal(): Ausgabegröße:", out.shape)
return out
def extract_pupil(df):
"""Extrahiert Pupillengröße (beide Augen gemittelt)."""
pl = df["EYE_LEFT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
pr = df["EYE_RIGHT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
vl = df.get("EYE_LEFT_PUPIL_VALIDITY")
vr = df.get("EYE_RIGHT_PUPIL_VALIDITY")
if vl is None or vr is None:
# Falls Validity-Spalten nicht vorhanden sind, versuchen wir grobe Heuristik:
# gültig, wenn Pupillendurchmesser nicht NaN.
validity = (~pl.isna() | ~pr.isna()).astype(int).to_numpy()
else:
# Falls vorhanden: 1 wenn mindestens eines der Augen gültig ist
validity = ( (vl == 1) | (vr == 1) ).astype(int).to_numpy()
# Mittelwert der verfügbaren Pupillen
p = np.nanmean(np.column_stack([pl, pr]), axis=1)
# INF/NaN reparieren
p = pd.Series(p).interpolate(limit=50, limit_direction="both").bfill().ffill()
p = p.to_numpy()
print("→ extract_pupil(): Pupillensignal Länge:", len(p))
return p, validity
def detect_blinks(pupil_validity, min_duration=5):
"""Erkennt Blinks: Validity=0 → Blink."""
blinks = []
start = None
for i, v in enumerate(pupil_validity):
if v == 0 and start is None:
start = i
elif v == 1 and start is not None:
if i - start >= min_duration:
blinks.append([start, i])
start = None
return blinks
def compute_IPA(pupil, fs=250):
"""
IPA = Index of Pupillary Activity (nach Duchowski 2018).
Hochfrequenzanteile der Pupillenzeitreihe.
"""
f, Pxx = welch(pupil, fs=fs, nperseg=int(fs*2)) # 2 Sekunden Fenster
hf_band = (f >= 0.6) & (f <= 2.0)
ipa = np.sum(Pxx[hf_band])
return ipa
##############################################################################
# 2. FEATURE-EXTRAKTION (HAUPTFUNKTION)
##############################################################################
def extract_eye_features(df, window_length_sec=2, fs=250):
"""
df = Tobii DataFrame
window_length_sec = Fenstergröße (z.B. W=1s)
"""
print("→ extract_eye_features(): Starte Feature-Berechnung...")
print(" Fensterlänge W =", window_length_sec, "s")
W = int(window_length_sec * fs)
# Gaze
gaze = extract_gaze_signal(df)
# Pupille
pupil, pupil_validity = extract_pupil(df)
features = []
# Sliding windows
for start in range(0, len(df), W):
end = start + W
if end > len(df):
break
#print(f"→ Fenster {start}:{end} wird verarbeitet...")
w_gaze = gaze[start:end]
w_pupil = pupil[start:end]
w_valid = pupil_validity[start:end]
# ----------------------------
# FIXATIONS (PyGaze)
# ----------------------------
fix, _ = fixation_detection(
x=w_gaze[:, 0], y=w_gaze[:, 1], time=np.arange(W)/fs,
missing=np.nan, maxdist=0.02, mindur=0.1 # mindur=100ms
)
if start == 0:
print("DEBUG fix raw:", fix[:10])
# nur gültige Fixationen
fix = [f for f in fix if isinstance(f, (list, tuple)) and len(f) >= 3]
# Robust fixations: PyGaze may return malformed entries
fixation_durations = []
for f in fix:
start_t = f[1] # in ms
end_t = f[2] # in ms
duration = (end_t - start_t) / 1000.0 # in Sekunden
#duration = f[2] / 1000.0
if np.isfinite(duration) and duration > 0:
fixation_durations.append(duration)
# Kategorien laut Paper
F_short = sum(0.066 <= d <= 0.150 for d in fixation_durations)
F_medium = sum(0.300 <= d <= 0.500 for d in fixation_durations)
F_long = sum(d >= 1.000 for d in fixation_durations)
# ----------------------------
# SACCADES
# ----------------------------
sac, _ = saccade_detection(
x=w_gaze[:, 0], y=w_gaze[:, 1], time=np.arange(W)/fs, missing=np.nan
)
# Korrektes Format: [start_index, end_index, duration_seconds, amplitude_deg]
sac = [s for s in sac if isinstance(s, (list, tuple)) and len(s) >= 4]
sac_durations = [(s[2] - s[1]) for s in sac]
sac_amplitudes = [s[3] for s in sac]
# ----------------------------
# BLINKS
# ----------------------------
blinks = detect_blinks(w_valid)
blink_durations = [(b[1] - b[0]) / fs for b in blinks]
# ----------------------------
# PUPIL
# ----------------------------
if np.all(np.isnan(w_pupil)):
mean_pupil = np.nan
ipa = np.nan
else:
mean_pupil = np.nanmean(w_pupil)
ipa = compute_IPA(w_pupil, fs=fs)
# ----------------------------
# FEATURE-TABELLE FÜLLEN
# ----------------------------
features.append({
"Fix_count_short_66_150": F_short,
"Fix_count_medium_300_500": F_medium,
"Fix_count_long_gt_1000": F_long,
"Fix_mean_duration": np.mean(fixation_durations) if fixation_durations else 0,
"Fix_median_duration": np.median(fixation_durations) if fixation_durations else 0,
"Sac_count": len(sac),
"Sac_mean_amp": np.mean(sac_amplitudes) if sac_amplitudes else 0,
"Sac_mean_dur": np.mean(sac_durations) if sac_durations else 0,
"Sac_median_dur": np.median(sac_durations) if sac_durations else 0,
"Blink_count": len(blinks),
"Blink_mean_dur": np.mean(blink_durations) if blink_durations else 0,
"Blink_median_dur": np.median(blink_durations) if blink_durations else 0,
"Pupil_mean": mean_pupil,
"Pupil_IPA": ipa
})
result = pd.DataFrame(features)
print("→ extract_eye_features(): Fertig! Ergebnisgröße:", result.shape)
return result
##############################################################################
# 3. MAIN FUNKTION
##############################################################################
def main():
print("### STARTE FEATURE-EXTRAKTION ###")
print("Aktueller Arbeitsordner:", os.getcwd())
df = pd.read_hdf("tmp22.h5", "SIGNALS", mode="r")
print("DataFrame geladen:", df.shape)
# Nur Eye-Tracking auswählen
#eye_cols = [c for c in df.columns if "EYE_" in c]
#df_eye = df[eye_cols]
#print("Eye-Tracking-Spalten:", len(eye_cols))
#print("→", eye_cols[:10], " ...")
print("Reinige Eyetracking-Daten ...")
df_eye = clean_eye_df(df)
# Feature Extraction
features = extract_eye_features(df_eye, window_length_sec=2, fs=250)
print("\n### FEATURE-MATRIX (HEAD) ###")
print(features.head())
print("\nSpeichere Output in features.csv ...")
features.to_csv("features2.csv", index=False)
print("FERTIG!")
if __name__ == "__main__":
main()