- moved eyeTrackingFeatures to EDA
- updated eyeTrackingFeatures - added eyeAlt
This commit is contained in:
parent
d7d0414b14
commit
080bdcfbe3
324
EDA/eyeAlt.py
Normal file
324
EDA/eyeAlt.py
Normal file
@ -0,0 +1,324 @@
|
|||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
import h5py
|
||||||
|
import yaml
|
||||||
|
import owncloud
|
||||||
|
import os
|
||||||
|
from sklearn.preprocessing import MinMaxScaler
|
||||||
|
from scipy.signal import welch
|
||||||
|
from pygazeanalyser.detectors import fixation_detection, saccade_detection
|
||||||
|
|
||||||
|
|
||||||
|
##############################################################################
|
||||||
|
# 1. HELFERFUNKTIONEN
|
||||||
|
##############################################################################
|
||||||
|
def clean_eye_df(df):
|
||||||
|
"""
|
||||||
|
Entfernt alle Zeilen, die keine echten Eyetracking-Daten enthalten.
|
||||||
|
Löst das Problem, dass das Haupt-DataFrame NaN-Zeilen für andere Sensoren enthält.
|
||||||
|
"""
|
||||||
|
eye_cols = [c for c in df.columns if ("LEFT_" in c or "RIGHT_" in c)]
|
||||||
|
df_eye = df[eye_cols]
|
||||||
|
|
||||||
|
# INF → NaN
|
||||||
|
df_eye = df_eye.replace([np.inf, -np.inf], np.nan)
|
||||||
|
|
||||||
|
# Nur Zeilen behalten, wo es echte Eyetracking-Daten gibt
|
||||||
|
df_eye = df_eye.dropna(subset=eye_cols, how="all")
|
||||||
|
|
||||||
|
print("Eyetracking-Zeilen vorher:", len(df))
|
||||||
|
print("Eyetracking-Zeilen nachher:", len(df_eye))
|
||||||
|
|
||||||
|
#Index zurücksetzen
|
||||||
|
return df_eye.reset_index(drop=True)
|
||||||
|
|
||||||
|
|
||||||
|
def extract_gaze_signal(df):
|
||||||
|
"""
|
||||||
|
Extrahiert 2D-Gaze-Positionen auf dem Display,
|
||||||
|
maskiert ungültige Samples und interpoliert Lücken.
|
||||||
|
"""
|
||||||
|
|
||||||
|
print("→ extract_gaze_signal(): Eingabegröße:", df.shape)
|
||||||
|
|
||||||
|
# Gaze-Spalten
|
||||||
|
gx_L = df["LEFT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
|
||||||
|
gy_L = df["LEFT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()
|
||||||
|
gx_R = df["RIGHT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
|
||||||
|
gy_R = df["RIGHT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()
|
||||||
|
|
||||||
|
|
||||||
|
# Validity-Spalten (1 = gültig)
|
||||||
|
val_L = (df["LEFT_GAZE_POINT_VALIDITY"] == 1)
|
||||||
|
val_R = (df["RIGHT_GAZE_POINT_VALIDITY"] == 1)
|
||||||
|
|
||||||
|
# Inf ersetzen mit NaN (kommt bei Tobii bei Blinks vor)
|
||||||
|
gx_L.replace([np.inf, -np.inf], np.nan, inplace=True)
|
||||||
|
gy_L.replace([np.inf, -np.inf], np.nan, inplace=True)
|
||||||
|
gx_R.replace([np.inf, -np.inf], np.nan, inplace=True)
|
||||||
|
gy_R.replace([np.inf, -np.inf], np.nan, inplace=True)
|
||||||
|
|
||||||
|
# Ungültige Werte maskieren
|
||||||
|
gx_L[~val_L] = np.nan
|
||||||
|
gy_L[~val_L] = np.nan
|
||||||
|
gx_R[~val_R] = np.nan
|
||||||
|
gy_R[~val_R] = np.nan
|
||||||
|
|
||||||
|
# Mittelwert der beiden Augen pro Sample (nanmean ist robust)
|
||||||
|
gx = np.mean(np.column_stack([gx_L, gx_R]), axis=1)
|
||||||
|
gy = np.mean(np.column_stack([gy_L, gy_R]), axis=1)
|
||||||
|
|
||||||
|
# Interpolation (wichtig für PyGaze!)
|
||||||
|
gx = pd.Series(gx).interpolate(limit=50, limit_direction="both").bfill().ffill()
|
||||||
|
gy = pd.Series(gy).interpolate(limit=50, limit_direction="both").bfill().ffill()
|
||||||
|
|
||||||
|
# xscaler = MinMaxScaler()
|
||||||
|
# gxscale = xscaler.fit_transform(gx.values.reshape(-1, 1))
|
||||||
|
|
||||||
|
# yscaler = MinMaxScaler()
|
||||||
|
# gyscale = yscaler.fit_transform(gx.values.reshape(-1, 1))
|
||||||
|
|
||||||
|
#print("xmax ymax", gxscale.max(), gyscale.max())
|
||||||
|
|
||||||
|
#out = np.column_stack((gxscale, gyscale))
|
||||||
|
out = np.column_stack((gx, gy))
|
||||||
|
|
||||||
|
print("→ extract_gaze_signal(): Ausgabegröße:", out.shape)
|
||||||
|
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def extract_pupil(df):
|
||||||
|
"""Extrahiert Pupillengröße (beide Augen gemittelt)."""
|
||||||
|
|
||||||
|
pl = df["LEFT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
|
||||||
|
pr = df["RIGHT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
|
||||||
|
|
||||||
|
vl = df.get("LEFT_PUPIL_VALIDITY")
|
||||||
|
vr = df.get("RIGHT_PUPIL_VALIDITY")
|
||||||
|
|
||||||
|
if vl is None or vr is None:
|
||||||
|
# Falls Validity-Spalten nicht vorhanden sind, versuchen wir grobe Heuristik:
|
||||||
|
# gültig, wenn Pupillendurchmesser nicht NaN.
|
||||||
|
validity = (~pl.isna() | ~pr.isna()).astype(int).to_numpy()
|
||||||
|
else:
|
||||||
|
# Falls vorhanden: 1 wenn mindestens eines der Augen gültig ist
|
||||||
|
validity = ( (vl == 1) | (vr == 1) ).astype(int).to_numpy()
|
||||||
|
|
||||||
|
# Mittelwert der verfügbaren Pupillen
|
||||||
|
p = np.mean(np.column_stack([pl, pr]), axis=1)
|
||||||
|
|
||||||
|
# INF/NaN reparieren
|
||||||
|
p = pd.Series(p).interpolate(limit=50, limit_direction="both").bfill().ffill()
|
||||||
|
p = p.to_numpy()
|
||||||
|
|
||||||
|
print("→ extract_pupil(): Pupillensignal Länge:", len(p))
|
||||||
|
return p, validity
|
||||||
|
|
||||||
|
|
||||||
|
def detect_blinks(pupil_validity, min_duration=5):
|
||||||
|
"""Erkennt Blinks: Validity=0 → Blink."""
|
||||||
|
blinks = []
|
||||||
|
start = None
|
||||||
|
|
||||||
|
for i, v in enumerate(pupil_validity):
|
||||||
|
if v == 0 and start is None:
|
||||||
|
start = i
|
||||||
|
elif v == 1 and start is not None:
|
||||||
|
if i - start >= min_duration:
|
||||||
|
blinks.append([start, i])
|
||||||
|
start = None
|
||||||
|
|
||||||
|
return blinks
|
||||||
|
|
||||||
|
|
||||||
|
def compute_IPA(pupil, fs=250):
|
||||||
|
"""
|
||||||
|
IPA = Index of Pupillary Activity (nach Duchowski 2018).
|
||||||
|
Hochfrequenzanteile der Pupillenzeitreihe.
|
||||||
|
"""
|
||||||
|
f, Pxx = welch(pupil, fs=fs, nperseg=int(fs*2)) # 2 Sekunden Fenster
|
||||||
|
|
||||||
|
hf_band = (f >= 0.6) & (f <= 2.0)
|
||||||
|
ipa = np.sum(Pxx[hf_band])
|
||||||
|
|
||||||
|
return ipa
|
||||||
|
|
||||||
|
|
||||||
|
##############################################################################
|
||||||
|
# 2. FEATURE-EXTRAKTION (HAUPTFUNKTION)
|
||||||
|
##############################################################################
|
||||||
|
|
||||||
|
def extract_eye_features(df, window_length_sec=50, fs=250):
|
||||||
|
"""
|
||||||
|
df = Tobii DataFrame
|
||||||
|
window_length_sec = Fenstergröße (z.B. W=1s)
|
||||||
|
"""
|
||||||
|
|
||||||
|
print("→ extract_eye_features(): Starte Feature-Berechnung...")
|
||||||
|
print(" Fensterlänge W =", window_length_sec, "s")
|
||||||
|
|
||||||
|
W = int(window_length_sec * fs) # Window größe in Samples
|
||||||
|
|
||||||
|
# Gaze
|
||||||
|
gaze = extract_gaze_signal(df)
|
||||||
|
gx, gy = gaze[:, 0], gaze[:, 1]
|
||||||
|
print("Gültige Werte (gx):", np.sum(~np.isnan(gx)), "von", len(gx))
|
||||||
|
print("Range:", np.nanmin(gx), np.nanmax(gx))
|
||||||
|
print("Gültige Werte (gy):", np.sum(~np.isnan(gy)), "von", len(gy))
|
||||||
|
print("Range:", np.nanmin(gy), np.nanmax(gy))
|
||||||
|
|
||||||
|
# Pupille
|
||||||
|
pupil, pupil_validity = extract_pupil(df)
|
||||||
|
|
||||||
|
features = []
|
||||||
|
|
||||||
|
# Sliding windows
|
||||||
|
for start in range(0, len(df), W):
|
||||||
|
end = start + W
|
||||||
|
if end > len(df):
|
||||||
|
break #das letzte Fenster wird ignoriert
|
||||||
|
|
||||||
|
|
||||||
|
w_gaze = gaze[start:end]
|
||||||
|
w_pupil = pupil[start:end]
|
||||||
|
w_valid = pupil_validity[start:end]
|
||||||
|
|
||||||
|
# ----------------------------
|
||||||
|
# FIXATIONS (PyGaze)
|
||||||
|
# ----------------------------
|
||||||
|
time_ms = np.arange(W) * 1000.0 / fs
|
||||||
|
|
||||||
|
# print("gx im Fenster:", w_gaze[:,0][:20])
|
||||||
|
# print("gy im Fenster:", w_gaze[:,1][:20])
|
||||||
|
# print("gx diff:", np.mean(np.abs(np.diff(w_gaze[:,0]))))
|
||||||
|
|
||||||
|
# print("Werte X im Fenster:", w_gaze[:,0])
|
||||||
|
# print("Werte Y im Fenster:", w_gaze[:,1])
|
||||||
|
# print("X-Stats: min/max/diff", np.nanmin(w_gaze[:,0]), np.nanmax(w_gaze[:,0]), np.nanmean(np.abs(np.diff(w_gaze[:,0]))))
|
||||||
|
# print("Y-Stats: min/max/diff", np.nanmin(w_gaze[:,1]), np.nanmax(w_gaze[:,1]), np.nanmean(np.abs(np.diff(w_gaze[:,1]))))
|
||||||
|
print("time_ms:", time_ms)
|
||||||
|
|
||||||
|
fix, efix = fixation_detection(
|
||||||
|
x=w_gaze[:, 0], y=w_gaze[:, 1], time=time_ms,
|
||||||
|
missing=0.0, maxdist=0.003, mindur=10 # mindur=100ms
|
||||||
|
)
|
||||||
|
|
||||||
|
#print("Raw Fixation Output:", efix[0])
|
||||||
|
|
||||||
|
if start == 0:
|
||||||
|
print("DEBUG fix raw:", fix[:10])
|
||||||
|
|
||||||
|
# Robust fixations: PyGaze may return malformed entries
|
||||||
|
fixation_durations = []
|
||||||
|
for f in efix:
|
||||||
|
print("Efix:", f[2])
|
||||||
|
# start_t = f[1] # in ms
|
||||||
|
# end_t = f[2] # in ms
|
||||||
|
# duration = (end_t - start_t) / 1000.0 # in Sekunden
|
||||||
|
|
||||||
|
#duration = f[2] / 1000.0
|
||||||
|
if np.isfinite(f[2]) and f[2] > 0:
|
||||||
|
fixation_durations.append(f[2])
|
||||||
|
|
||||||
|
# Kategorien laut Paper
|
||||||
|
F_short = sum(66 <= d <= 150 for d in fixation_durations)
|
||||||
|
F_medium = sum(300 <= d <= 500 for d in fixation_durations)
|
||||||
|
F_long = sum(d >= 1000 for d in fixation_durations)
|
||||||
|
F_hundred = sum(d > 100 for d in fixation_durations)
|
||||||
|
F_Cancel = sum(66 < d for d in fixation_durations)
|
||||||
|
|
||||||
|
# ----------------------------
|
||||||
|
# SACCADES
|
||||||
|
# ----------------------------
|
||||||
|
sac, esac = saccade_detection(
|
||||||
|
x=w_gaze[:, 0], y=w_gaze[:, 1], time=time_ms, missing=0, minlen=12, maxvel=0.2, maxacc=1
|
||||||
|
)
|
||||||
|
|
||||||
|
sac_durations = [s[2] for s in esac]
|
||||||
|
sac_amplitudes = [((s[5]-s[3])**2 + (s[6]-s[4])**2)**0.5 for s in esac]
|
||||||
|
|
||||||
|
# ----------------------------
|
||||||
|
# BLINKS
|
||||||
|
# ----------------------------
|
||||||
|
blinks = detect_blinks(w_valid)
|
||||||
|
blink_durations = [(b[1] - b[0]) / fs for b in blinks]
|
||||||
|
|
||||||
|
# ----------------------------
|
||||||
|
# PUPIL
|
||||||
|
# ----------------------------
|
||||||
|
if np.all(np.isnan(w_pupil)):
|
||||||
|
mean_pupil = np.nan
|
||||||
|
ipa = np.nan
|
||||||
|
else:
|
||||||
|
mean_pupil = np.nanmean(w_pupil)
|
||||||
|
ipa = compute_IPA(w_pupil, fs=fs)
|
||||||
|
|
||||||
|
# ----------------------------
|
||||||
|
# FEATURE-TABELLE FÜLLEN
|
||||||
|
# ----------------------------
|
||||||
|
features.append({
|
||||||
|
"Fix_count_short_66_150": F_short,
|
||||||
|
"Fix_count_medium_300_500": F_medium,
|
||||||
|
"Fix_count_long_gt_1000": F_long,
|
||||||
|
"Fix_count_100": F_hundred,
|
||||||
|
"Fix_cancel": F_Cancel,
|
||||||
|
"Fix_mean_duration": np.mean(fixation_durations) if fixation_durations else 0,
|
||||||
|
"Fix_median_duration": np.median(fixation_durations) if fixation_durations else 0,
|
||||||
|
|
||||||
|
"Sac_count": len(sac),
|
||||||
|
"Sac_mean_amp": np.mean(sac_amplitudes) if sac_amplitudes else 0,
|
||||||
|
"Sac_mean_dur": np.mean(sac_durations) if sac_durations else 0,
|
||||||
|
"Sac_median_dur": np.median(sac_durations) if sac_durations else 0,
|
||||||
|
|
||||||
|
"Blink_count": len(blinks),
|
||||||
|
"Blink_mean_dur": np.mean(blink_durations) if blink_durations else 0,
|
||||||
|
"Blink_median_dur": np.median(blink_durations) if blink_durations else 0,
|
||||||
|
|
||||||
|
"Pupil_mean": mean_pupil,
|
||||||
|
"Pupil_IPA": ipa
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
result = pd.DataFrame(features)
|
||||||
|
print("→ extract_eye_features(): Fertig! Ergebnisgröße:", result.shape)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
##############################################################################
|
||||||
|
# 3. MAIN FUNKTION
|
||||||
|
##############################################################################
|
||||||
|
|
||||||
|
def main():
|
||||||
|
print("### STARTE FEATURE-EXTRAKTION ###")
|
||||||
|
print("Aktueller Arbeitsordner:", os.getcwd())
|
||||||
|
|
||||||
|
#df = pd.read_hdf("tmp22.h5", "SIGNALS", mode="r")
|
||||||
|
df = pd.read_parquet("cleaned_0001.parquet")
|
||||||
|
print("DataFrame geladen:", df.shape)
|
||||||
|
|
||||||
|
# Nur Eye-Tracking auswählen
|
||||||
|
#eye_cols = [c for c in df.columns if "EYE_" in c]
|
||||||
|
#df_eye = df[eye_cols]
|
||||||
|
|
||||||
|
#print("Eye-Tracking-Spalten:", len(eye_cols))
|
||||||
|
#print("→", eye_cols[:10], " ...")
|
||||||
|
|
||||||
|
print("Reinige Eyetracking-Daten ...")
|
||||||
|
df_eye = clean_eye_df(df)
|
||||||
|
|
||||||
|
# Feature Extraction
|
||||||
|
features = extract_eye_features(df_eye, window_length_sec=50, fs=250)
|
||||||
|
|
||||||
|
print("\n### FEATURE-MATRIX (HEAD) ###")
|
||||||
|
print(features.head())
|
||||||
|
|
||||||
|
print("\nSpeichere Output in features.csv ...")
|
||||||
|
features.to_csv("features4.csv", index=False)
|
||||||
|
|
||||||
|
print("FERTIG!")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -4,6 +4,7 @@ import h5py
|
|||||||
import yaml
|
import yaml
|
||||||
import owncloud
|
import owncloud
|
||||||
import os
|
import os
|
||||||
|
from sklearn.preprocessing import MinMaxScaler
|
||||||
from scipy.signal import welch
|
from scipy.signal import welch
|
||||||
from pygazeanalyser.detectors import fixation_detection, saccade_detection
|
from pygazeanalyser.detectors import fixation_detection, saccade_detection
|
||||||
|
|
||||||
@ -28,6 +29,7 @@ def clean_eye_df(df):
|
|||||||
print("Eyetracking-Zeilen vorher:", len(df))
|
print("Eyetracking-Zeilen vorher:", len(df))
|
||||||
print("Eyetracking-Zeilen nachher:", len(df_eye))
|
print("Eyetracking-Zeilen nachher:", len(df_eye))
|
||||||
|
|
||||||
|
#Index zurücksetzen
|
||||||
return df_eye.reset_index(drop=True)
|
return df_eye.reset_index(drop=True)
|
||||||
|
|
||||||
|
|
||||||
@ -50,7 +52,7 @@ def extract_gaze_signal(df):
|
|||||||
val_L = (df["EYE_LEFT_GAZE_POINT_VALIDITY"] == 1)
|
val_L = (df["EYE_LEFT_GAZE_POINT_VALIDITY"] == 1)
|
||||||
val_R = (df["EYE_RIGHT_GAZE_POINT_VALIDITY"] == 1)
|
val_R = (df["EYE_RIGHT_GAZE_POINT_VALIDITY"] == 1)
|
||||||
|
|
||||||
# inf ersetzen (kommt bei Tobii bei Blinks vor)
|
# Inf ersetzen mit NaN (kommt bei Tobii bei Blinks vor)
|
||||||
gx_L.replace([np.inf, -np.inf], np.nan, inplace=True)
|
gx_L.replace([np.inf, -np.inf], np.nan, inplace=True)
|
||||||
gy_L.replace([np.inf, -np.inf], np.nan, inplace=True)
|
gy_L.replace([np.inf, -np.inf], np.nan, inplace=True)
|
||||||
gx_R.replace([np.inf, -np.inf], np.nan, inplace=True)
|
gx_R.replace([np.inf, -np.inf], np.nan, inplace=True)
|
||||||
@ -63,14 +65,22 @@ def extract_gaze_signal(df):
|
|||||||
gy_R[~val_R] = np.nan
|
gy_R[~val_R] = np.nan
|
||||||
|
|
||||||
# Mittelwert der beiden Augen pro Sample (nanmean ist robust)
|
# Mittelwert der beiden Augen pro Sample (nanmean ist robust)
|
||||||
gx = np.nanmean(np.column_stack([gx_L, gx_R]), axis=1)
|
gx = np.mean(np.column_stack([gx_L, gx_R]), axis=1)
|
||||||
gy = np.nanmean(np.column_stack([gy_L, gy_R]), axis=1)
|
gy = np.mean(np.column_stack([gy_L, gy_R]), axis=1)
|
||||||
|
|
||||||
# Interpolation (wichtig für PyGaze!)
|
# Interpolation (wichtig für PyGaze!)
|
||||||
gx = pd.Series(gx).interpolate(limit=50, limit_direction="both").bfill().ffill()
|
gx = pd.Series(gx).interpolate(limit=50, limit_direction="both").bfill().ffill()
|
||||||
gy = pd.Series(gy).interpolate(limit=50, limit_direction="both").bfill().ffill()
|
gy = pd.Series(gy).interpolate(limit=50, limit_direction="both").bfill().ffill()
|
||||||
|
|
||||||
out = np.column_stack((gx, gy))
|
xscaler = MinMaxScaler()
|
||||||
|
gxscale = xscaler.fit_transform(gx.values.reshape(-1, 1))
|
||||||
|
|
||||||
|
yscaler = MinMaxScaler()
|
||||||
|
gyscale = yscaler.fit_transform(gx.values.reshape(-1, 1))
|
||||||
|
|
||||||
|
print("xmax ymax", gxscale.max(), gyscale.max())
|
||||||
|
|
||||||
|
out = np.column_stack((gxscale, gyscale))
|
||||||
|
|
||||||
print("→ extract_gaze_signal(): Ausgabegröße:", out.shape)
|
print("→ extract_gaze_signal(): Ausgabegröße:", out.shape)
|
||||||
|
|
||||||
@ -95,7 +105,7 @@ def extract_pupil(df):
|
|||||||
validity = ( (vl == 1) | (vr == 1) ).astype(int).to_numpy()
|
validity = ( (vl == 1) | (vr == 1) ).astype(int).to_numpy()
|
||||||
|
|
||||||
# Mittelwert der verfügbaren Pupillen
|
# Mittelwert der verfügbaren Pupillen
|
||||||
p = np.nanmean(np.column_stack([pl, pr]), axis=1)
|
p = np.mean(np.column_stack([pl, pr]), axis=1)
|
||||||
|
|
||||||
# INF/NaN reparieren
|
# INF/NaN reparieren
|
||||||
p = pd.Series(p).interpolate(limit=50, limit_direction="both").bfill().ffill()
|
p = pd.Series(p).interpolate(limit=50, limit_direction="both").bfill().ffill()
|
||||||
@ -138,7 +148,7 @@ def compute_IPA(pupil, fs=250):
|
|||||||
# 2. FEATURE-EXTRAKTION (HAUPTFUNKTION)
|
# 2. FEATURE-EXTRAKTION (HAUPTFUNKTION)
|
||||||
##############################################################################
|
##############################################################################
|
||||||
|
|
||||||
def extract_eye_features(df, window_length_sec=2, fs=250):
|
def extract_eye_features(df, window_length_sec=50, fs=250):
|
||||||
"""
|
"""
|
||||||
df = Tobii DataFrame
|
df = Tobii DataFrame
|
||||||
window_length_sec = Fenstergröße (z.B. W=1s)
|
window_length_sec = Fenstergröße (z.B. W=1s)
|
||||||
@ -147,10 +157,15 @@ def extract_eye_features(df, window_length_sec=2, fs=250):
|
|||||||
print("→ extract_eye_features(): Starte Feature-Berechnung...")
|
print("→ extract_eye_features(): Starte Feature-Berechnung...")
|
||||||
print(" Fensterlänge W =", window_length_sec, "s")
|
print(" Fensterlänge W =", window_length_sec, "s")
|
||||||
|
|
||||||
W = int(window_length_sec * fs)
|
W = int(window_length_sec * fs) # Window größe in Samples
|
||||||
|
|
||||||
# Gaze
|
# Gaze
|
||||||
gaze = extract_gaze_signal(df)
|
gaze = extract_gaze_signal(df)
|
||||||
|
gx, gy = gaze[:, 0], gaze[:, 1]
|
||||||
|
print("Gültige Werte (gx):", np.sum(~np.isnan(gx)), "von", len(gx))
|
||||||
|
print("Range:", np.nanmin(gx), np.nanmax(gx))
|
||||||
|
print("Gültige Werte (gy):", np.sum(~np.isnan(gy)), "von", len(gy))
|
||||||
|
print("Range:", np.nanmin(gy), np.nanmax(gy))
|
||||||
|
|
||||||
# Pupille
|
# Pupille
|
||||||
pupil, pupil_validity = extract_pupil(df)
|
pupil, pupil_validity = extract_pupil(df)
|
||||||
@ -161,8 +176,8 @@ def extract_eye_features(df, window_length_sec=2, fs=250):
|
|||||||
for start in range(0, len(df), W):
|
for start in range(0, len(df), W):
|
||||||
end = start + W
|
end = start + W
|
||||||
if end > len(df):
|
if end > len(df):
|
||||||
break
|
break #das letzte Fenster wird ignoriert
|
||||||
#print(f"→ Fenster {start}:{end} wird verarbeitet...")
|
|
||||||
|
|
||||||
w_gaze = gaze[start:end]
|
w_gaze = gaze[start:end]
|
||||||
w_pupil = pupil[start:end]
|
w_pupil = pupil[start:end]
|
||||||
@ -171,45 +186,56 @@ def extract_eye_features(df, window_length_sec=2, fs=250):
|
|||||||
# ----------------------------
|
# ----------------------------
|
||||||
# FIXATIONS (PyGaze)
|
# FIXATIONS (PyGaze)
|
||||||
# ----------------------------
|
# ----------------------------
|
||||||
fix, _ = fixation_detection(
|
time_ms = np.arange(W) * 1000.0 / fs
|
||||||
x=w_gaze[:, 0], y=w_gaze[:, 1], time=np.arange(W)/fs,
|
|
||||||
missing=np.nan, maxdist=0.02, mindur=0.1 # mindur=100ms
|
# print("gx im Fenster:", w_gaze[:,0][:20])
|
||||||
|
# print("gy im Fenster:", w_gaze[:,1][:20])
|
||||||
|
# print("gx diff:", np.mean(np.abs(np.diff(w_gaze[:,0]))))
|
||||||
|
|
||||||
|
# print("Werte X im Fenster:", w_gaze[:,0])
|
||||||
|
# print("Werte Y im Fenster:", w_gaze[:,1])
|
||||||
|
# print("X-Stats: min/max/diff", np.nanmin(w_gaze[:,0]), np.nanmax(w_gaze[:,0]), np.nanmean(np.abs(np.diff(w_gaze[:,0]))))
|
||||||
|
# print("Y-Stats: min/max/diff", np.nanmin(w_gaze[:,1]), np.nanmax(w_gaze[:,1]), np.nanmean(np.abs(np.diff(w_gaze[:,1]))))
|
||||||
|
print("time_ms:", time_ms)
|
||||||
|
|
||||||
|
fix, efix = fixation_detection(
|
||||||
|
x=w_gaze[:, 0], y=w_gaze[:, 1], time=time_ms,
|
||||||
|
missing=0.0, maxdist=0.001, mindur=65 # mindur=100ms
|
||||||
)
|
)
|
||||||
|
|
||||||
|
#print("Raw Fixation Output:", efix[0])
|
||||||
|
|
||||||
if start == 0:
|
if start == 0:
|
||||||
print("DEBUG fix raw:", fix[:10])
|
print("DEBUG fix raw:", fix[:10])
|
||||||
|
|
||||||
# nur gültige Fixationen
|
|
||||||
fix = [f for f in fix if isinstance(f, (list, tuple)) and len(f) >= 3]
|
|
||||||
|
|
||||||
# Robust fixations: PyGaze may return malformed entries
|
# Robust fixations: PyGaze may return malformed entries
|
||||||
fixation_durations = []
|
fixation_durations = []
|
||||||
for f in fix:
|
for f in efix:
|
||||||
start_t = f[1] # in ms
|
print("Efix:", f[2])
|
||||||
end_t = f[2] # in ms
|
# start_t = f[1] # in ms
|
||||||
duration = (end_t - start_t) / 1000.0 # in Sekunden
|
# end_t = f[2] # in ms
|
||||||
|
# duration = (end_t - start_t) / 1000.0 # in Sekunden
|
||||||
|
|
||||||
#duration = f[2] / 1000.0
|
#duration = f[2] / 1000.0
|
||||||
if np.isfinite(duration) and duration > 0:
|
if np.isfinite(f[2]) and f[2] > 0:
|
||||||
fixation_durations.append(duration)
|
fixation_durations.append(f[2])
|
||||||
|
|
||||||
# Kategorien laut Paper
|
# Kategorien laut Paper
|
||||||
F_short = sum(0.066 <= d <= 0.150 for d in fixation_durations)
|
F_short = sum(66 <= d <= 150 for d in fixation_durations)
|
||||||
F_medium = sum(0.300 <= d <= 0.500 for d in fixation_durations)
|
F_medium = sum(300 <= d <= 500 for d in fixation_durations)
|
||||||
F_long = sum(d >= 1.000 for d in fixation_durations)
|
F_long = sum(d >= 1000 for d in fixation_durations)
|
||||||
|
F_hundred = sum(d > 100 for d in fixation_durations)
|
||||||
|
F_Cancel = sum(66 < d for d in fixation_durations)
|
||||||
|
|
||||||
# ----------------------------
|
# ----------------------------
|
||||||
# SACCADES
|
# SACCADES
|
||||||
# ----------------------------
|
# ----------------------------
|
||||||
sac, _ = saccade_detection(
|
sac, esac = saccade_detection(
|
||||||
x=w_gaze[:, 0], y=w_gaze[:, 1], time=np.arange(W)/fs, missing=np.nan
|
x=w_gaze[:, 0], y=w_gaze[:, 1], time=time_ms, missing=0, minlen=12, maxvel=0.2, maxacc=1
|
||||||
)
|
)
|
||||||
|
|
||||||
# Korrektes Format: [start_index, end_index, duration_seconds, amplitude_deg]
|
sac_durations = [s[2] for s in esac]
|
||||||
sac = [s for s in sac if isinstance(s, (list, tuple)) and len(s) >= 4]
|
sac_amplitudes = [((s[5]-s[3])**2 + (s[6]-s[4])**2)**0.5 for s in esac]
|
||||||
|
|
||||||
sac_durations = [(s[2] - s[1]) for s in sac]
|
|
||||||
sac_amplitudes = [s[3] for s in sac]
|
|
||||||
|
|
||||||
# ----------------------------
|
# ----------------------------
|
||||||
# BLINKS
|
# BLINKS
|
||||||
@ -234,6 +260,8 @@ def extract_eye_features(df, window_length_sec=2, fs=250):
|
|||||||
"Fix_count_short_66_150": F_short,
|
"Fix_count_short_66_150": F_short,
|
||||||
"Fix_count_medium_300_500": F_medium,
|
"Fix_count_medium_300_500": F_medium,
|
||||||
"Fix_count_long_gt_1000": F_long,
|
"Fix_count_long_gt_1000": F_long,
|
||||||
|
"Fix_count_100": F_hundred,
|
||||||
|
"Fix_cancel": F_Cancel,
|
||||||
"Fix_mean_duration": np.mean(fixation_durations) if fixation_durations else 0,
|
"Fix_mean_duration": np.mean(fixation_durations) if fixation_durations else 0,
|
||||||
"Fix_median_duration": np.median(fixation_durations) if fixation_durations else 0,
|
"Fix_median_duration": np.median(fixation_durations) if fixation_durations else 0,
|
||||||
|
|
||||||
@ -265,6 +293,7 @@ def main():
|
|||||||
print("Aktueller Arbeitsordner:", os.getcwd())
|
print("Aktueller Arbeitsordner:", os.getcwd())
|
||||||
|
|
||||||
df = pd.read_hdf("tmp22.h5", "SIGNALS", mode="r")
|
df = pd.read_hdf("tmp22.h5", "SIGNALS", mode="r")
|
||||||
|
#df = pd.read_parquet("cleaned_0001.parquet")
|
||||||
print("DataFrame geladen:", df.shape)
|
print("DataFrame geladen:", df.shape)
|
||||||
|
|
||||||
# Nur Eye-Tracking auswählen
|
# Nur Eye-Tracking auswählen
|
||||||
@ -278,7 +307,7 @@ def main():
|
|||||||
df_eye = clean_eye_df(df)
|
df_eye = clean_eye_df(df)
|
||||||
|
|
||||||
# Feature Extraction
|
# Feature Extraction
|
||||||
features = extract_eye_features(df_eye, window_length_sec=2, fs=250)
|
features = extract_eye_features(df_eye, window_length_sec=50, fs=250)
|
||||||
|
|
||||||
print("\n### FEATURE-MATRIX (HEAD) ###")
|
print("\n### FEATURE-MATRIX (HEAD) ###")
|
||||||
print(features.head())
|
print(features.head())
|
||||||
Loading…
x
Reference in New Issue
Block a user