changed paths to paulusja ... directory changed feature extraction for AUs to mean instead of sum added v1 of merge script of datasets (needs to be adjusted)
323 lines
11 KiB
Python
323 lines
11 KiB
Python
import numpy as np
|
|
import pandas as pd
|
|
import h5py
|
|
import yaml
|
|
import os
|
|
from sklearn.preprocessing import MinMaxScaler
|
|
from scipy.signal import welch
|
|
from pygazeanalyser.detectors import fixation_detection, saccade_detection
|
|
|
|
|
|
##############################################################################
# 1. HELPER FUNCTIONS
##############################################################################
|
|
def clean_eye_df(df):
    """Return only the rows of *df* that carry eye-tracking data.

    The combined recording contains rows that belong to other sensors and are
    therefore NaN in every eye-tracking column.  This function keeps the
    columns whose name contains ``LEFT_`` or ``RIGHT_``, turns +/-inf into
    NaN, drops every row in which all of those columns are NaN, and
    renumbers the remaining rows from zero.  The row counts before and after
    the clean-up are printed.
    """
    selected = [name for name in df.columns if "LEFT_" in name or "RIGHT_" in name]

    # Infinite values count as missing, so they cannot keep an otherwise
    # empty row alive.
    cleaned = df[selected].replace([np.inf, -np.inf], np.nan)
    cleaned = cleaned.dropna(subset=selected, how="all")

    print("Eyetracking-Zeilen vorher:", len(df))
    print("Eyetracking-Zeilen nachher:", len(cleaned))

    return cleaned.reset_index(drop=True)
|
|
|
|
|
|
def extract_gaze_signal(df):
    """Build one gap-free 2D gaze trace from the per-eye display coordinates.

    For each eye the ``*_GAZE_POINT_ON_DISPLAY_AREA_X/Y`` columns are read,
    non-finite values and samples whose ``*_GAZE_POINT_VALIDITY`` is not 1
    are set to NaN.  Both eyes are then averaged per sample while ignoring
    NaN, so a sample with only one valid eye keeps that eye's value instead
    of being discarded.  Remaining gaps are linearly interpolated (up to 50
    samples from each side) and anything still missing is back-/forward-
    filled.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the four gaze-point columns and the two validity
        columns named above.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(len(df), 2)`` with the columns ``(x, y)``.
    """
    print("→ extract_gaze_signal(): Eingabegröße:", df.shape)

    # Validity flags: 1 marks a usable sample.
    left_ok = (df["LEFT_GAZE_POINT_VALIDITY"] == 1).to_numpy()
    right_ok = (df["RIGHT_GAZE_POINT_VALIDITY"] == 1).to_numpy()

    def eye_signal(column, ok):
        """Return *column* as floats with invalid / non-finite samples as NaN."""
        raw = df[column].astype(float).to_numpy()
        return np.where(ok & np.isfinite(raw), raw, np.nan)

    def fuse(left, right):
        """Average both eyes NaN-aware, then close the remaining gaps."""
        # DataFrame.mean skips NaN by default: one valid eye is enough, and a
        # sample with no valid eye stays NaN (without a RuntimeWarning).
        fused = pd.DataFrame({"left": left, "right": right}).mean(axis=1)
        # The event detectors downstream need a signal without gaps.
        return fused.interpolate(limit=50, limit_direction="both").bfill().ffill()

    gx = fuse(
        eye_signal("LEFT_GAZE_POINT_ON_DISPLAY_AREA_X", left_ok),
        eye_signal("RIGHT_GAZE_POINT_ON_DISPLAY_AREA_X", right_ok),
    )
    gy = fuse(
        eye_signal("LEFT_GAZE_POINT_ON_DISPLAY_AREA_Y", left_ok),
        eye_signal("RIGHT_GAZE_POINT_ON_DISPLAY_AREA_Y", right_ok),
    )

    out = np.column_stack((gx, gy))

    print("→ extract_gaze_signal(): Ausgabegröße:", out.shape)

    return out
|
|
|
|
|
|
def extract_pupil(df):
    """Return the binocular pupil diameter and a per-sample validity flag.

    The left and right ``*_PUPIL_DIAMETER`` columns are averaged per sample
    while ignoring NaN/inf, so a sample measured by only one eye keeps that
    measurement.  Samples without any measurement are linearly interpolated
    (up to 50 samples from each side) and then back-/forward-filled.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``LEFT_PUPIL_DIAMETER`` and ``RIGHT_PUPIL_DIAMETER``.
        ``LEFT_PUPIL_VALIDITY`` / ``RIGHT_PUPIL_VALIDITY`` are optional.

    Returns
    -------
    tuple(numpy.ndarray, numpy.ndarray)
        ``(pupil, validity)``.  ``validity`` is 1 where at least one eye is
        valid and 0 otherwise; it is derived from the validity columns when
        both exist, else from "diameter is not NaN".
    """
    left = df["LEFT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
    right = df["RIGHT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)

    left_valid = df.get("LEFT_PUPIL_VALIDITY")
    right_valid = df.get("RIGHT_PUPIL_VALIDITY")

    if left_valid is None or right_valid is None:
        # No validity columns: fall back to "a diameter was measured".
        validity = (left.notna() | right.notna()).astype(int).to_numpy()
    else:
        # A sample is valid as soon as one eye reports validity 1.
        validity = ((left_valid == 1) | (right_valid == 1)).astype(int).to_numpy()

    # NOTE(review): unlike the gaze signal, diameters are not masked by the
    # validity columns here — confirm that invalid samples arrive as NaN.
    both_eyes = pd.DataFrame({
        "left": left.to_numpy(dtype=float),
        "right": right.to_numpy(dtype=float),
    })
    # DataFrame.mean skips NaN by default, i.e. it averages the eyes that
    # are actually available for each sample.
    pupil = both_eyes.mean(axis=1)

    # Close the gaps left by samples without any measurement.
    pupil = pupil.interpolate(limit=50, limit_direction="both").bfill().ffill()
    pupil = pupil.to_numpy()

    print("→ extract_pupil(): Pupillensignal Länge:", len(pupil))
    return pupil, validity
|
|
|
|
|
|
def detect_blinks(pupil_validity, min_duration=5):
    """Find blinks as runs of validity 0 in *pupil_validity*.

    Parameters
    ----------
    pupil_validity : iterable of int
        Per-sample flag, 1 = valid, 0 = invalid.
    min_duration : int
        Minimum run length in samples for a run to count as a blink.

    Returns
    -------
    list of [start, end]
        Sample indices of each blink; ``start`` is the first invalid sample
        and ``end`` is exclusive (the first valid sample after the run, or
        the sequence length when the run lasts until the end).
    """
    blinks = []
    run_start = None
    n_samples = 0

    for index, flag in enumerate(pupil_validity):
        n_samples = index + 1
        if flag == 0:
            if run_start is None:
                run_start = index
        elif flag == 1 and run_start is not None:
            if index - run_start >= min_duration:
                blinks.append([run_start, index])
            run_start = None

    # A blink that is still open when the data ends would otherwise be lost.
    if run_start is not None and n_samples - run_start >= min_duration:
        blinks.append([run_start, n_samples])

    return blinks
|
|
|
|
|
|
def compute_IPA(pupil, fs=250):
    """Return the pupil signal's spectral power between 0.6 Hz and 2.0 Hz.

    The power spectral density is estimated with Welch's method using
    2-second segments (shortened to the signal length for shorter inputs,
    which is what SciPy would do anyway, but without the warning).  The PSD
    bins inside the 0.6-2.0 Hz band are summed.

    NOTE(review): this is used as a stand-in for the Index of Pupillary
    Activity (Duchowski 2018); the published IPA is presumably defined
    differently (wavelet based) — confirm that this band-power proxy is
    intended.

    Parameters
    ----------
    pupil : array-like
        Pupil diameter samples.
    fs : float
        Sampling rate in Hz.

    Returns
    -------
    float
        Sum of the PSD values in the 0.6-2.0 Hz band.
    """
    segment_length = int(fs * 2)  # nominal 2-second Welch segments
    n_samples = len(pupil)
    if 0 < n_samples < segment_length:
        segment_length = n_samples

    freqs, psd = welch(pupil, fs=fs, nperseg=segment_length)

    in_band = (freqs >= 0.6) & (freqs <= 2.0)
    return np.sum(psd[in_band])
|
|
|
|
|
|
##############################################################################
# 2. FEATURE EXTRACTION (MAIN FUNCTION)
##############################################################################
|
|
|
|
def extract_eye_features(df, window_length_sec=50, fs=250, debug=False):
    """Compute fixation, saccade, blink and pupil features per time window.

    The recording is cut into consecutive, non-overlapping windows of
    ``window_length_sec`` seconds.  A trailing window that is not completely
    filled is ignored.  For every window, PyGaze's ``fixation_detection`` and
    ``saccade_detection`` are run on the gaze trace, blinks are taken from
    the pupil validity flags and the pupil signal is summarised.

    Parameters
    ----------
    df : pandas.DataFrame
        Eye-tracking samples; must provide the columns read by
        ``extract_gaze_signal`` and ``extract_pupil``.
    window_length_sec : float
        Window length in seconds.
    fs : float
        Sampling rate in Hz.
    debug : bool
        When true, additionally print the window time vector, the raw
        fixation output of the first window and every fixation duration.

    Returns
    -------
    pandas.DataFrame
        One row per complete window.  Fixation and saccade durations are in
        the unit of the time vector handed to PyGaze (milliseconds here);
        blink durations are in seconds (samples divided by ``fs``).  Empty
        when the recording is shorter than one window.

    Raises
    ------
    ValueError
        If the window is shorter than one sample.
    """
    print("→ extract_eye_features(): Starte Feature-Berechnung...")
    print(" Fensterlänge W =", window_length_sec, "s")

    W = int(window_length_sec * fs)  # window size in samples
    if W < 1:
        raise ValueError("window_length_sec * fs must be at least one sample")

    # Gaze
    gaze = extract_gaze_signal(df)
    gx, gy = gaze[:, 0], gaze[:, 1]
    for label, axis_values in (("gx", gx), ("gy", gy)):
        print(f"Gültige Werte ({label}):", np.sum(~np.isnan(axis_values)), "von", len(axis_values))
        # nanmin/nanmax raise on an empty array, so skip the range then.
        if len(axis_values):
            print("Range:", np.nanmin(axis_values), np.nanmax(axis_values))

    # Pupil
    pupil, pupil_validity = extract_pupil(df)

    # Every window has the same length, so its time axis (ms relative to
    # the window start) is computed once.
    time_ms = np.arange(W) * 1000.0 / fs
    if debug:
        print("time_ms:", time_ms)

    features = []

    # Non-overlapping windows; the range stops before an incomplete last one.
    for start in range(0, len(df) - W + 1, W):
        end = start + W

        w_gaze = gaze[start:end]
        w_pupil = pupil[start:end]
        w_valid = pupil_validity[start:end]

        # ----------------------------
        # FIXATIONS (PyGaze)
        # ----------------------------
        # NOTE(review): maxdist presumably refers to normalised display
        # coordinates, and mindur is in the unit of `time` (ms).  An earlier
        # comment said "100 ms" while the value passed is 10 — confirm.
        fix, efix = fixation_detection(
            x=w_gaze[:, 0], y=w_gaze[:, 1], time=time_ms,
            missing=0.0, maxdist=0.003, mindur=10,
        )

        if debug and start == 0:
            print("DEBUG fix raw:", fix[:10])
        if debug:
            for entry in efix:
                print("Efix:", entry[2])

        # Index 2 of each entry is used as the fixation duration; keep only
        # finite, positive values to guard against malformed entries.
        fixation_durations = [
            entry[2] for entry in efix
            if np.isfinite(entry[2]) and entry[2] > 0
        ]

        # Duration categories (thresholds taken from the reference paper)
        F_short = sum(66 <= d <= 150 for d in fixation_durations)
        F_medium = sum(300 <= d <= 500 for d in fixation_durations)
        F_long = sum(d >= 1000 for d in fixation_durations)
        F_hundred = sum(d > 100 for d in fixation_durations)
        F_Cancel = sum(66 < d for d in fixation_durations)

        # ----------------------------
        # SACCADES
        # ----------------------------
        sac, esac = saccade_detection(
            x=w_gaze[:, 0], y=w_gaze[:, 1], time=time_ms,
            missing=0, minlen=12, maxvel=0.2, maxacc=1,
        )

        # Index 2 is used as the duration; indices 3-6 as start x/y and
        # end x/y, whose Euclidean distance is the amplitude.
        sac_durations = [s[2] for s in esac]
        sac_amplitudes = [
            ((s[5] - s[3]) ** 2 + (s[6] - s[4]) ** 2) ** 0.5 for s in esac
        ]

        # ----------------------------
        # BLINKS
        # ----------------------------
        blinks = detect_blinks(w_valid)
        blink_durations = [(b[1] - b[0]) / fs for b in blinks]

        # ----------------------------
        # PUPIL
        # ----------------------------
        if np.all(np.isnan(w_pupil)):
            mean_pupil = np.nan
            ipa = np.nan
        else:
            mean_pupil = np.nanmean(w_pupil)
            ipa = compute_IPA(w_pupil, fs=fs)

        # ----------------------------
        # FILL FEATURE TABLE
        # ----------------------------
        features.append({
            "Fix_count_short_66_150": F_short,
            "Fix_count_medium_300_500": F_medium,
            "Fix_count_long_gt_1000": F_long,
            "Fix_count_100": F_hundred,
            "Fix_cancel": F_Cancel,
            "Fix_mean_duration": np.mean(fixation_durations) if fixation_durations else 0,
            "Fix_median_duration": np.median(fixation_durations) if fixation_durations else 0,

            "Sac_count": len(sac),
            "Sac_mean_amp": np.mean(sac_amplitudes) if sac_amplitudes else 0,
            "Sac_mean_dur": np.mean(sac_durations) if sac_durations else 0,
            "Sac_median_dur": np.median(sac_durations) if sac_durations else 0,

            "Blink_count": len(blinks),
            "Blink_mean_dur": np.mean(blink_durations) if blink_durations else 0,
            "Blink_median_dur": np.median(blink_durations) if blink_durations else 0,

            "Pupil_mean": mean_pupil,
            "Pupil_IPA": ipa,
        })

    result = pd.DataFrame(features)
    print("→ extract_eye_features(): Fertig! Ergebnisgröße:", result.shape)

    return result
|
|
|
|
##############################################################################
# 3. MAIN FUNCTION
##############################################################################
|
|
|
|
def main(input_path="cleaned_0001.parquet", output_path="features4.csv"):
    """Run the feature extraction for one recording and store the result.

    Reads the recording from *input_path* (Parquet), reduces it to the rows
    with eye-tracking data, extracts the windowed features (50 s windows at
    250 Hz) and writes them to *output_path* as CSV without the index.

    Parameters
    ----------
    input_path : str
        Parquet file holding the recording.
    output_path : str
        Destination CSV file for the feature table.
    """
    print("### STARTE FEATURE-EXTRAKTION ###")
    print("Aktueller Arbeitsordner:", os.getcwd())

    df = pd.read_parquet(input_path)
    print("DataFrame geladen:", df.shape)

    print("Reinige Eyetracking-Daten ...")
    df_eye = clean_eye_df(df)

    # Feature extraction
    features = extract_eye_features(df_eye, window_length_sec=50, fs=250)

    print("\n### FEATURE-MATRIX (HEAD) ###")
    print(features.head())

    # Report the file that is actually written (the message used to name a
    # different file than the one passed to to_csv).
    print(f"\nSpeichere Output in {output_path} ...")
    features.to_csv(output_path, index=False)

    print("FERTIG!")
|
|
|
|
|
|
|
|
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()