combined dataset creation pipeline

CPFC_both.py: chunkwise creates one parquet file per subject containing the STUDY, LEVEL, PHASE and subjectID columns plus all FACE_AU and eye-tracking columns.
combined_feature_creation.py: creates a single dataset by computing the FACE_AU and eye-tracking features side by side over the same time windows --> both modalities must share the same sample rate (here 25 Hz).
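As a quick sanity check of the pipeline output, a minimal sketch (assuming the paths and file-naming scheme configured in the two scripts below; not part of this commit):

import pandas as pd
from pathlib import Path
# intermediate output of CPFC_both.py: one parquet file per subject (both_mod_{i:04d}.parquet)
chunk_file = Path("/home/jovyan/data-paulusjafahrsimulator-gpu/both_mod_parquet_files/both_mod_0000.parquet")
# final window-level dataset written by combined_feature_creation.py
combined_file = Path("/home/jovyan/data-paulusjafahrsimulator-gpu/new_datasets/combined_dataset_25hz.parquet")
df_raw = pd.read_parquet(chunk_file)
print(df_raw.shape)  # per-sample rows: STUDY/LEVEL/PHASE, EYE_*, FACE_AU*, subjectID, rowID
df_feat = pd.read_parquet(combined_file)
print(df_feat["subjectID"].nunique())  # subjects that produced at least one valid 50 s window
print(df_feat.filter(like="FACE_AU").head())  # per-window AU means
print(df_feat.filter(like="Fix_").describe())  # fixation-based eye features per window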
Michael Weig 2025-12-18 15:25:18 +01:00
parent b8bebc0944
commit 7708ac1b29
3 changed files with 478 additions and 0 deletions

CPFC_both.py
@@ -0,0 +1,97 @@
import os
import pandas as pd
from pathlib import Path
data_dir = Path("/home/jovyan/Fahrsimulator_MSY2526_AI/EDA")
# Get all .h5 files and sort them
matching_files = sorted(data_dir.glob("*.h5"))
# Chunk size for reading (adjust based on available RAM; 100k rows is roughly 50-100 MB depending on the columns)
CHUNK_SIZE = 50_000
for i, file_path in enumerate(matching_files):
print(f"Subject {i} gestartet")
print(f"{file_path} geoeffnet")
# Step 1: Get total number of rows and column names
with pd.HDFStore(file_path, mode="r") as store:
cols = store.select("SIGNALS", start=0, stop=1).columns
nrows = store.get_storer("SIGNALS").nrows
print(f"Total columns: {len(cols)}, Total rows: {nrows}")
# Step 2: Find the columns that start with the "FACE_AU" and "EYE_" prefixes
face_au_cols = [c for c in cols if c.startswith("FACE_AU")]
eye_cols = [c for c in cols if c.startswith("EYE_")]
# Check that both have at least one column
if face_au_cols and eye_cols:
print(f"FACE_AU columns found: {face_au_cols}")
print(f"EYE_ columns found: {eye_cols}")
else:
missing = []
if not face_au_cols:
missing.append("FACE_AU")
if not eye_cols:
missing.append("EYE_")
print(f"Missing columns for: {', '.join(missing)}")
continue
# Columns to read
columns_to_read = ["STUDY", "LEVEL", "PHASE"] + eye_cols + face_au_cols
# Step 3: Process file in chunks
chunks_to_save = []
for start_row in range(0, nrows, CHUNK_SIZE):
stop_row = min(start_row + CHUNK_SIZE, nrows)
print(f"Processing rows {start_row} to {stop_row} ({stop_row/nrows*100:.1f}%)")
# Read chunk
df_chunk = pd.read_hdf(
file_path,
key="SIGNALS",
columns=columns_to_read,
start=start_row,
stop=stop_row
)
# Add metadata columns
df_chunk["subjectID"] = i
df_chunk["rowID"] = range(start_row, stop_row)
# Clean data
df_chunk = df_chunk[df_chunk["LEVEL"] != 0]
df_chunk = df_chunk.dropna()
# note: dropna() above is problematic because it breaks the temporal resolution of the eye-tracking signal
# Only keep non-empty chunks
if len(df_chunk) > 0:
chunks_to_save.append(df_chunk)
# Free memory
del df_chunk
print("load and cleaning done")
# Step 4: Combine all chunks and save
if chunks_to_save:
df_final = pd.concat(chunks_to_save, ignore_index=True)
print(f"Final dataframe shape: {df_final.shape}")
# Save to parquet
base_dir = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/both_mod_parquet_files")
os.makedirs(base_dir, exist_ok=True)
out_name = base_dir / f"both_mod_{i:04d}.parquet"
df_final.to_parquet(out_name, index=False)
print(f"Saved to {out_name}")
# Free memory
del df_final
del chunks_to_save
else:
print(f"No valid data found for Subject {i}")
print("All files processed!")

combined_feature_creation.py
@@ -0,0 +1,381 @@
## joint processing of FACE_AU and eye-tracking data
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.preprocessing import MinMaxScaler
from scipy.signal import welch
from pygazeanalyser.detectors import fixation_detection, saccade_detection
##############################################################################
# CONFIGURATION
##############################################################################
INPUT_DIR = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/both_mod_parquet_files")
OUTPUT_FILE = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/new_datasets/combined_dataset_25hz.parquet")
WINDOW_SIZE_SAMPLES = 1250 # 50 s at 25 Hz
STEP_SIZE_SAMPLES = 125 # 5 s at 25 Hz
SAMPLING_RATE = 25 # Hz
##############################################################################
# EYE-TRACKING FUNCTIONS
##############################################################################
def clean_eye_df(df):
"""Extrahiert nur Eye-Tracking Spalten und entfernt leere Zeilen."""
eye_cols = [c for c in df.columns if c.startswith("EYE_")]
if not eye_cols:
return pd.DataFrame()
df_eye = df[eye_cols].copy()
df_eye = df_eye.replace([np.inf, -np.inf], np.nan)
df_eye = df_eye.dropna(subset=eye_cols, how="all")
return df_eye.reset_index(drop=True)
def extract_gaze_signal(df):
"""Extrahiert 2D-Gaze-Positionen, maskiert ungültige Samples und interpoliert."""
gx_L = df["EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
gy_L = df["EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()
gx_R = df["EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_X"].astype(float).copy()
gy_R = df["EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_Y"].astype(float).copy()
val_L = (df["EYE_LEFT_GAZE_POINT_VALIDITY"] == 1)
val_R = (df["EYE_RIGHT_GAZE_POINT_VALIDITY"] == 1)
# Inf → NaN
for arr in [gx_L, gy_L, gx_R, gy_R]:
arr.replace([np.inf, -np.inf], np.nan, inplace=True)
# Mask invalid samples
gx_L[~val_L] = np.nan
gy_L[~val_L] = np.nan
gx_R[~val_R] = np.nan
gy_R[~val_R] = np.nan
# Average of both eyes (np.mean yields NaN where either eye is NaN; gaps are interpolated below)
gx = np.mean(np.column_stack([gx_L, gx_R]), axis=1)
gy = np.mean(np.column_stack([gy_L, gy_R]), axis=1)
# Interpolation
gx = pd.Series(gx).interpolate(limit=50, limit_direction="both").bfill().ffill()
gy = pd.Series(gy).interpolate(limit=50, limit_direction="both").bfill().ffill()
# Min-max scaling to [0, 1]
xscaler = MinMaxScaler()
gxscale = xscaler.fit_transform(gx.values.reshape(-1, 1))
yscaler = MinMaxScaler()
gyscale = yscaler.fit_transform(gy.values.reshape(-1, 1))
return np.column_stack((gxscale, gyscale))
def extract_pupil(df):
"""Extrahiert Pupillengröße (beide Augen gemittelt)."""
pl = df["EYE_LEFT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
pr = df["EYE_RIGHT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
vl = df.get("EYE_LEFT_PUPIL_VALIDITY")
vr = df.get("EYE_RIGHT_PUPIL_VALIDITY")
if vl is None or vr is None:
validity = (~pl.isna() | ~pr.isna()).astype(int).to_numpy()
else:
validity = ((vl == 1) | (vr == 1)).astype(int).to_numpy()
p = np.mean(np.column_stack([pl, pr]), axis=1)
p = pd.Series(p).interpolate(limit=50, limit_direction="both").bfill().ffill()
return p.to_numpy(), validity
def detect_blinks(pupil_validity, min_duration=5):
"""Erkennt Blinks: Validity=0 → Blink."""
blinks = []
start = None
for i, v in enumerate(pupil_validity):
if v == 0 and start is None:
start = i
elif v == 1 and start is not None:
if i - start >= min_duration:
blinks.append([start, i])
start = None
return blinks
def compute_IPA(pupil, fs=25):
"""Index of Pupillary Activity (Duchowski 2018), approximated as spectral band power."""
# simplified proxy: high-frequency (0.6-2.0 Hz) power of the pupil signal via Welch PSD;
# the original IPA is defined on a wavelet decomposition of the pupil signal
f, Pxx = welch(pupil, fs=fs, nperseg=int(fs*2))
hf_band = (f >= 0.6) & (f <= 2.0)
return np.sum(Pxx[hf_band])
def extract_eye_features_window(df_eye_window, fs=25):
"""
Extracts eye-tracking features for a single window.
Returns a dictionary with all eye features.
"""
# Gaze
gaze = extract_gaze_signal(df_eye_window)
# Pupil
pupil, pupil_validity = extract_pupil(df_eye_window)
window_size = len(df_eye_window)
# ----------------------------
# FIXATIONS
# ----------------------------
time_ms = np.arange(window_size) * 1000.0 / fs
fix, efix = fixation_detection(
x=gaze[:, 0], y=gaze[:, 1], time=time_ms,
missing=0.0, maxdist=0.003, mindur=10
)
fixation_durations = [f[2] for f in efix if np.isfinite(f[2]) and f[2] > 0]
# Duration categories (in ms)
F_short = sum(66 <= d <= 150 for d in fixation_durations)
F_medium = sum(300 <= d <= 500 for d in fixation_durations)
F_long = sum(d >= 1000 for d in fixation_durations)
F_hundred = sum(d > 100 for d in fixation_durations)
# ----------------------------
# SACCADES
# ----------------------------
sac, esac = saccade_detection(
x=gaze[:, 0], y=gaze[:, 1], time=time_ms,
missing=0, minlen=12, maxvel=0.2, maxacc=1
)
sac_durations = [s[2] for s in esac]
sac_amplitudes = [((s[5]-s[3])**2 + (s[6]-s[4])**2)**0.5 for s in esac]
# ----------------------------
# BLINKS
# ----------------------------
blinks = detect_blinks(pupil_validity)
blink_durations = [(b[1] - b[0]) / fs for b in blinks]
# ----------------------------
# PUPIL
# ----------------------------
if np.all(np.isnan(pupil)):
mean_pupil = np.nan
ipa = np.nan
else:
mean_pupil = np.nanmean(pupil)
ipa = compute_IPA(pupil, fs=fs)
# Feature Dictionary
return {
"Fix_count_short_66_150": F_short,
"Fix_count_medium_300_500": F_medium,
"Fix_count_long_gt_1000": F_long,
"Fix_count_100": F_hundred,
"Fix_mean_duration": np.mean(fixation_durations) if fixation_durations else 0,
"Fix_median_duration": np.median(fixation_durations) if fixation_durations else 0,
"Sac_count": len(sac),
"Sac_mean_amp": np.mean(sac_amplitudes) if sac_amplitudes else 0,
"Sac_mean_dur": np.mean(sac_durations) if sac_durations else 0,
"Sac_median_dur": np.median(sac_durations) if sac_durations else 0,
"Blink_count": len(blinks),
"Blink_mean_dur": np.mean(blink_durations) if blink_durations else 0,
"Blink_median_dur": np.median(blink_durations) if blink_durations else 0,
"Pupil_mean": mean_pupil,
"Pupil_IPA": ipa
}
##############################################################################
# COMBINED FEATURE EXTRACTION
##############################################################################
def process_combined_features(input_dir, output_file, window_size, step_size, fs=25):
"""
Processes parquet files containing FACE_AU and EYE columns.
Extracts both feature sets per window and combines them.
"""
input_path = Path(input_dir)
parquet_files = sorted(input_path.glob("*.parquet"))
if not parquet_files:
print(f"FEHLER: Keine Parquet-Dateien in {input_dir} gefunden!")
return None
print(f"\n{'='*70}")
print(f"KOMBINIERTE FEATURE-EXTRAKTION")
print(f"{'='*70}")
print(f"Dateien: {len(parquet_files)}")
print(f"Window: {window_size} Samples ({window_size/fs:.1f}s bei {fs}Hz)")
print(f"Step: {step_size} Samples ({step_size/fs:.1f}s bei {fs}Hz)")
print(f"{'='*70}\n")
all_windows = []
for file_idx, parquet_file in enumerate(parquet_files, 1):
print(f"\n[{file_idx}/{len(parquet_files)}] {parquet_file.name}")
try:
df = pd.read_parquet(parquet_file)
print(f" Einträge: {len(df)}")
# Identifiziere Spalten
au_columns = [col for col in df.columns if col.startswith('FACE_AU')]
eye_columns = [col for col in df.columns if col.startswith('EYE_')]
print(f" AU-Spalten: {len(au_columns)}")
print(f" Eye-Spalten: {len(eye_columns)}")
has_au = len(au_columns) > 0
has_eye = len(eye_columns) > 0
if not has_au and not has_eye:
print(f" WARNUNG: Keine AU oder Eye Spalten gefunden!")
continue
# Group by STUDY, LEVEL, PHASE
group_cols = [col for col in ['STUDY', 'LEVEL', 'PHASE'] if col in df.columns]
if group_cols:
groups = df.groupby(group_cols, sort=False)
else:
groups = [(None, df)]
for group_vals, group_df in groups:
if group_cols:
print(f" Gruppe {group_vals}: {len(group_df)} Samples")
group_df = group_df.reset_index(drop=True)
# Berechne Anzahl Windows
num_windows = (len(group_df) - window_size) // step_size + 1
if num_windows <= 0:
print(f" Zu wenige Einträge für Window (benötigt {window_size})")
continue
# Sliding Windows
for i in range(num_windows):
start_idx = i * step_size
end_idx = start_idx + window_size
window_df = group_df.iloc[start_idx:end_idx]
# Base metadata
result = {
'subjectID': window_df['subjectID'].iloc[0],
'start_time': window_df['rowID'].iloc[0],
'STUDY': window_df['STUDY'].iloc[0] if 'STUDY' in window_df.columns else np.nan,
'LEVEL': window_df['LEVEL'].iloc[0] if 'LEVEL' in window_df.columns else np.nan,
'PHASE': window_df['PHASE'].iloc[0] if 'PHASE' in window_df.columns else np.nan
}
# FACE AU Features
if has_au:
for au_col in au_columns:
result[f'{au_col}_mean'] = window_df[au_col].mean()
# Eye-Tracking Features
if has_eye:
try:
eye_features = extract_eye_features_window(window_df[eye_columns], fs=fs)
result.update(eye_features)
except Exception as e:
print(f" WARNUNG: Eye-Features fehlgeschlagen: {str(e)}")
# Füge NaN-Werte für Eye-Features hinzu
result.update({
"Fix_count_short_66_150": np.nan,
"Fix_count_medium_300_500": np.nan,
"Fix_count_long_gt_1000": np.nan,
"Fix_count_100": np.nan,
"Fix_mean_duration": np.nan,
"Fix_median_duration": np.nan,
"Sac_count": np.nan,
"Sac_mean_amp": np.nan,
"Sac_mean_dur": np.nan,
"Sac_median_dur": np.nan,
"Blink_count": np.nan,
"Blink_mean_dur": np.nan,
"Blink_median_dur": np.nan,
"Pupil_mean": np.nan,
"Pupil_IPA": np.nan
})
all_windows.append(result)
print(f"{num_windows} Windows erstellt")
except Exception as e:
print(f" FEHLER: {str(e)}")
import traceback
traceback.print_exc()
continue
# Combine all windows
if not all_windows:
print("\nNO FEATURES EXTRACTED!")
return None
print(f"\n{'='*70}")
print(f"ZUSAMMENFASSUNG")
print(f"{'='*70}")
result_df = pd.DataFrame(all_windows)
print(f"Gesamt Windows: {len(result_df)}")
print(f"Spalten: {len(result_df.columns)}")
print(f"Subjects: {result_df['subjectID'].nunique()}")
# Speichern
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
result_df.to_parquet(output_file, index=False)
print(f"\n✓ Gespeichert: {output_file}")
print(f"{'='*70}\n")
return result_df
##############################################################################
# MAIN
##############################################################################
def main():
print("\n" + "="*70)
print("KOMBINIERTE FEATURE-EXTRAKTION (AU + EYE)")
print("="*70)
result = process_combined_features(
input_dir=INPUT_DIR,
output_file=OUTPUT_FILE,
window_size=WINDOW_SIZE_SAMPLES,
step_size=STEP_SIZE_SAMPLES,
fs=SAMPLING_RATE
)
if result is not None:
print("\nErste 5 Zeilen:")
print(result.head())
print("\nSpalten-Übersicht:")
print(result.columns.tolist())
print("\nDatentypen:")
print(result.dtypes)
print("\nStatistik:")
print(result.describe())
print("\n✓ FERTIG!\n")
if __name__ == "__main__":
main()