# Listing metadata (not code): 323 lines, 11 KiB, Python

import numpy as np
import pandas as pd
import h5py
import yaml
import owncloud
import os
from sklearn.preprocessing import MinMaxScaler
from scipy.signal import welch
from pygazeanalyser.detectors import fixation_detection, saccade_detection
##############################################################################
# 1. HELFERFUNKTIONEN
##############################################################################
def clean_eye_df(df):
    """
    Reduce a mixed-sensor DataFrame to its eye-tracking rows and columns.

    Only columns whose name contains "EYE_" are kept. Infinite values are
    turned into NaN, and rows in which every eye-tracking column is NaN
    (e.g. rows that only carry data from other sensors) are removed.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; column names are expected to be strings.

    Returns
    -------
    pandas.DataFrame
        The eye-tracking columns of the remaining rows, with a fresh
        0..n-1 index.
    """
    eye_columns = [name for name in df.columns if "EYE_" in name]

    # Select -> inf to NaN -> drop rows without any eye-tracking value.
    cleaned = (
        df[eye_columns]
        .replace([np.inf, -np.inf], np.nan)
        .dropna(subset=eye_columns, how="all")
    )

    print("Eyetracking-Zeilen vorher:", len(df))
    print("Eyetracking-Zeilen nachher:", len(cleaned))

    return cleaned.reset_index(drop=True)
def extract_gaze_signal(df):
    """
    Build one normalized 2D gaze trace from the binocular display-area columns.

    Processing steps:
      1. Cast the per-eye X/Y gaze columns to float and turn +/-inf into NaN.
      2. Blank out samples whose validity flag is not 1.
      3. Average both eyes per sample, ignoring NaN, so a sample with only
         one valid eye keeps that eye's value.
      4. Fill gaps by interpolation (then back-/forward-fill the remainder).
      5. Min-max scale X and Y independently.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain EYE_LEFT/EYE_RIGHT ``_GAZE_POINT_ON_DISPLAY_AREA_X``,
        ``_GAZE_POINT_ON_DISPLAY_AREA_Y`` and ``_GAZE_POINT_VALIDITY``
        columns (validity 1 = valid).

    Returns
    -------
    numpy.ndarray
        Array of shape (len(df), 2); column 0 is the scaled X trace and
        column 1 the scaled Y trace.
    """
    print("→ extract_gaze_signal(): Eingabegröße:", df.shape)

    def masked(column, valid):
        # Float copy of the column with inf and invalid samples set to NaN.
        values = df[column].astype(float).replace([np.inf, -np.inf], np.nan)
        return values.where(valid).to_numpy()

    def binocular_mean(left, right):
        # DataFrame.mean skips NaN: one valid eye is enough, and rows where
        # both eyes are missing stay NaN (without a NumPy warning).
        return pd.DataFrame({"left": left, "right": right}).mean(axis=1)

    def fill_gaps(series):
        # The detectors downstream need a gap-free signal.
        return series.interpolate(limit=50, limit_direction="both").bfill().ffill()

    val_L = df["EYE_LEFT_GAZE_POINT_VALIDITY"] == 1
    val_R = df["EYE_RIGHT_GAZE_POINT_VALIDITY"] == 1

    gx_L = masked("EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_X", val_L)
    gy_L = masked("EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_Y", val_L)
    gx_R = masked("EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_X", val_R)
    gy_R = masked("EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_Y", val_R)

    gx = fill_gaps(binocular_mean(gx_L, gx_R))
    gy = fill_gaps(binocular_mean(gy_L, gy_R))

    # Fix: the Y channel used to be scaled from gx, which made both output
    # columns identical. MinMaxScaler scales each column independently, so a
    # single scaler on the stacked (n, 2) array is equivalent to two scalers.
    out = MinMaxScaler().fit_transform(
        np.column_stack((gx.to_numpy(), gy.to_numpy()))
    )

    print("xmax ymax", out[:, 0].max(), out[:, 1].max())
    print("→ extract_gaze_signal(): Ausgabegröße:", out.shape)
    return out
def extract_pupil(df):
    """
    Extract a gap-free pupil-diameter signal averaged over both eyes.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain EYE_LEFT_PUPIL_DIAMETER and EYE_RIGHT_PUPIL_DIAMETER.
        EYE_LEFT_PUPIL_VALIDITY / EYE_RIGHT_PUPIL_VALIDITY are optional.

    Returns
    -------
    p : numpy.ndarray
        Per-sample mean pupil diameter of the available eyes, with gaps
        filled by interpolation (then back-/forward-fill).
    validity : numpy.ndarray of int
        1 where at least one eye is valid, else 0. If a validity column is
        missing, a sample counts as valid when at least one diameter is
        not NaN.
    """
    pl = df["EYE_LEFT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
    pr = df["EYE_RIGHT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)

    vl = df.get("EYE_LEFT_PUPIL_VALIDITY")
    vr = df.get("EYE_RIGHT_PUPIL_VALIDITY")

    if vl is None or vr is None:
        # No validity columns: fall back to "diameter is present".
        validity = (~pl.isna() | ~pr.isna()).astype(int).to_numpy()
    else:
        validity = ((vl == 1) | (vr == 1)).astype(int).to_numpy()

    # Fix: use a NaN-skipping mean so a sample with one usable eye keeps that
    # eye's value instead of becoming NaN (np.mean propagated the NaN).
    both_eyes = pd.DataFrame({
        "left": pl.to_numpy(dtype=float),
        "right": pr.to_numpy(dtype=float),
    })
    p = both_eyes.mean(axis=1)

    # Repair the remaining gaps (samples where both eyes are missing).
    p = p.interpolate(limit=50, limit_direction="both").bfill().ffill()
    p = p.to_numpy()

    print("→ extract_pupil(): Pupillensignal Länge:", len(p))
    return p, validity
def detect_blinks(pupil_validity, min_duration=5):
    """
    Detect blinks as runs of invalid pupil samples.

    A blink is a run of consecutive samples with validity 0 that is at least
    ``min_duration`` samples long.

    Parameters
    ----------
    pupil_validity : iterable of int
        Per-sample validity flags (0 = invalid, 1 = valid).
    min_duration : int
        Minimum run length in samples for a run to count as a blink.

    Returns
    -------
    list of [int, int]
        ``[start, end]`` sample indices per blink, with ``end`` exclusive
        (the index of the first valid sample after the run, or the input
        length when the run lasts until the end).
    """
    blinks = []
    run_start = None
    n_samples = 0

    for idx, flag in enumerate(pupil_validity):
        n_samples = idx + 1
        if flag == 0:
            if run_start is None:
                run_start = idx
        elif flag == 1 and run_start is not None:
            if idx - run_start >= min_duration:
                blinks.append([run_start, idx])
            run_start = None

    # Fix: a run that is still open when the data ends used to be dropped;
    # close it at the end of the sequence.
    if run_start is not None and n_samples - run_start >= min_duration:
        blinks.append([run_start, n_samples])

    return blinks
def compute_IPA(pupil, fs=250):
    """
    Return a spectral pupil-activity index for one pupil time series.

    The power spectral density is estimated with Welch's method using
    segments of ``int(fs * 2)`` samples (two seconds), and the PSD values of
    all frequency bins from 0.6 Hz to 2.0 Hz (inclusive) are summed.

    NOTE(review): the original author labels this "IPA (Index of Pupillary
    Activity, after Duchowski 2018)"; what is computed here is band power
    from a Welch PSD — confirm this matches the intended definition.

    Parameters
    ----------
    pupil : array-like
        Pupil-diameter samples.
    fs : float
        Sampling rate in Hz.

    Returns
    -------
    float
        Summed PSD within the 0.6–2.0 Hz band.
    """
    segment_length = int(fs * 2)
    freqs, psd = welch(pupil, fs=fs, nperseg=segment_length)

    in_band = np.logical_and(freqs >= 0.6, freqs <= 2.0)
    return np.sum(psd[in_band])
##############################################################################
# 2. FEATURE-EXTRAKTION (HAUPTFUNKTION)
##############################################################################
def extract_eye_features(df, window_length_sec=50, fs=250):
    """
    Compute per-window eye-tracking features from a cleaned Tobii DataFrame.

    The recording is cut into consecutive, non-overlapping windows of
    ``int(window_length_sec * fs)`` samples; an incomplete window at the end
    is skipped. For every window, fixations and saccades are detected with
    PyGaze on the normalized gaze trace, blinks are derived from the pupil
    validity flags, and pupil statistics are computed.

    Parameters
    ----------
    df : pandas.DataFrame
        Eye-tracking samples providing the columns read by
        extract_gaze_signal() and extract_pupil().
    window_length_sec : float
        Window length in seconds.
    fs : float
        Sampling rate in Hz, used to convert between samples and time.

    Returns
    -------
    pandas.DataFrame
        One row per complete window. Columns: fixation counts per duration
        bin (66–150, 300–500, >=1000, >100, >66), fixation mean/median
        duration, saccade count, saccade mean amplitude and mean/median
        duration, blink count and mean/median duration (seconds), mean
        pupil size and the pupil activity index. Statistics of empty event
        lists are reported as 0.
    """
    print("→ extract_eye_features(): Starte Feature-Berechnung...")
    print(" Fensterlänge W =", window_length_sec, "s")
    samples_per_window = int(window_length_sec * fs)

    def stat_or_zero(stat, values):
        # Empty event lists yield 0 rather than NaN.
        return stat(values) if values else 0

    # Normalized gaze trace, columns = (x, y).
    gaze = extract_gaze_signal(df)
    gaze_x = gaze[:, 0]
    gaze_y = gaze[:, 1]
    print("Gültige Werte (gx):", np.sum(~np.isnan(gaze_x)), "von", len(gaze_x))
    print("Range:", np.nanmin(gaze_x), np.nanmax(gaze_x))
    print("Gültige Werte (gy):", np.sum(~np.isnan(gaze_y)), "von", len(gaze_y))
    print("Range:", np.nanmin(gaze_y), np.nanmax(gaze_y))

    pupil, pupil_validity = extract_pupil(df)

    n_samples = len(df)
    rows = []

    for first in range(0, n_samples, samples_per_window):
        last = first + samples_per_window
        if last > n_samples:
            # The trailing, incomplete window is ignored.
            break

        window = slice(first, last)
        gaze_win = gaze[window]
        pupil_win = pupil[window]
        valid_win = pupil_validity[window]

        # Window-relative time stamps in milliseconds for the detectors.
        time_ms = np.arange(samples_per_window) * 1000.0 / fs
        print("time_ms:", time_ms)

        # --- Fixations (PyGaze) ---
        fix, efix = fixation_detection(
            x=gaze_win[:, 0],
            y=gaze_win[:, 1],
            time=time_ms,
            missing=0.0,
            maxdist=0.001,
            mindur=65,
        )
        if first == 0:
            print("DEBUG fix raw:", fix[:10])

        # Element 2 of each ended-fixation entry is used as its duration
        # (presumably in the unit of time_ms, i.e. ms — confirm against the
        # PyGaze documentation). Non-finite or non-positive values are skipped.
        fix_durations = []
        for event in efix:
            duration = event[2]
            print("Efix:", duration)
            if np.isfinite(duration) and duration > 0:
                fix_durations.append(duration)

        # Duration bins as used by the original author ("per the paper").
        n_short = sum(66 <= d <= 150 for d in fix_durations)
        n_medium = sum(300 <= d <= 500 for d in fix_durations)
        n_long = sum(d >= 1000 for d in fix_durations)
        n_over_100 = sum(d > 100 for d in fix_durations)
        n_over_66 = sum(66 < d for d in fix_durations)

        # --- Saccades (PyGaze) ---
        sac, esac = saccade_detection(
            x=gaze_win[:, 0],
            y=gaze_win[:, 1],
            time=time_ms,
            missing=0,
            minlen=12,
            maxvel=0.2,
            maxacc=1,
        )
        sac_durations = []
        sac_amplitudes = []
        for event in esac:
            sac_durations.append(event[2])
            # Euclidean distance between elements (3, 4) and (5, 6) of the
            # entry — presumably start and end position; confirm with PyGaze.
            dx = event[5] - event[3]
            dy = event[6] - event[4]
            sac_amplitudes.append((dx**2 + dy**2)**0.5)

        # --- Blinks (from pupil validity), durations in seconds ---
        blinks = detect_blinks(valid_win)
        blink_durations = [(stop - begin) / fs for begin, stop in blinks]

        # --- Pupil ---
        mean_pupil = np.nan
        ipa = np.nan
        if not np.all(np.isnan(pupil_win)):
            mean_pupil = np.nanmean(pupil_win)
            ipa = compute_IPA(pupil_win, fs=fs)

        rows.append({
            "Fix_count_short_66_150": n_short,
            "Fix_count_medium_300_500": n_medium,
            "Fix_count_long_gt_1000": n_long,
            "Fix_count_100": n_over_100,
            "Fix_cancel": n_over_66,
            "Fix_mean_duration": stat_or_zero(np.mean, fix_durations),
            "Fix_median_duration": stat_or_zero(np.median, fix_durations),
            "Sac_count": len(sac),
            "Sac_mean_amp": stat_or_zero(np.mean, sac_amplitudes),
            "Sac_mean_dur": stat_or_zero(np.mean, sac_durations),
            "Sac_median_dur": stat_or_zero(np.median, sac_durations),
            "Blink_count": len(blinks),
            "Blink_mean_dur": stat_or_zero(np.mean, blink_durations),
            "Blink_median_dur": stat_or_zero(np.median, blink_durations),
            "Pupil_mean": mean_pupil,
            "Pupil_IPA": ipa,
        })

    result = pd.DataFrame(rows)
    print("→ extract_eye_features(): Fertig! Ergebnisgröße:", result.shape)
    return result
##############################################################################
# 3. MAIN FUNKTION
##############################################################################
def main(input_path="tmp22.h5", output_path="features2.csv",
         window_length_sec=50, fs=250):
    """
    Run the full pipeline: load signals, clean them, extract features, save.

    Parameters
    ----------
    input_path : str
        HDF5 file to read; the DataFrame is loaded from the "SIGNALS" key.
    output_path : str
        CSV file the feature table is written to (without the index).
    window_length_sec : float
        Window length in seconds passed to extract_eye_features().
    fs : float
        Sampling rate in Hz passed to extract_eye_features().

    The defaults reproduce the previously hard-coded behaviour.
    """
    print("### STARTE FEATURE-EXTRAKTION ###")
    print("Aktueller Arbeitsordner:", os.getcwd())

    df = pd.read_hdf(input_path, "SIGNALS", mode="r")
    print("DataFrame geladen:", df.shape)

    print("Reinige Eyetracking-Daten ...")
    df_eye = clean_eye_df(df)

    features = extract_eye_features(
        df_eye, window_length_sec=window_length_sec, fs=fs
    )

    print("\n### FEATURE-MATRIX (HEAD) ###")
    print(features.head())

    # Fix: the message used to name "features.csv" while the file actually
    # written was "features2.csv"; report the real target path.
    print("\nSpeichere Output in", output_path, "...")
    features.to_csv(output_path, index=False)
    print("FERTIG!")
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()