### Imports

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from sklearn.preprocessing import StandardScaler, MinMaxScaler

### Config

In [None]:
dataset_path = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/new_datasets/combined_dataset_25hz.parquet")
# dataset_path = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/new_datasets/60s_combined_dataset_25hz.parquet")
# dataset_path = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/new_datasets/120s_combined_dataset_25hz.parquet")

In [None]:
FILTER_MAD = True
THRESHOLD = 3.5
METHOD = 'minmax'
SCOPE = 'subject'
FILTER_SUBSETS = True

### Calculations

In [None]:
df = pd.read_parquet(dataset_path)
df.shape

In [None]:
if(FILTER_SUBSETS):
 # Special filter: Keep only specific subsets
# - k-drive L1 baseline
# - n-back L1 baseline 
# - k-drive test with levels 1, 2, 3

 df = df[
 (
 # k-drive L1 baseline
 ((df['STUDY'] == 'k-drive') & 
 (df['LEVEL'] == 1) & 
 (df['PHASE'] == 'baseline'))
 ) | 
 (
 # n-back L1 baseline
 ((df['STUDY'] == 'n-back') & 
 (df['LEVEL'] == 1) & 
 (df['PHASE'] == 'baseline'))
 ) | 
 (
 # k-drive test with levels 1, 2, 3
 ((df['STUDY'] == 'k-drive') & 
 (df['LEVEL'].isin([1, 2, 3])) & 
 (df['PHASE'] == 'test'))
 )].copy()

print(f"Filtered dataframe shape: {df.shape}")
print(f"Remaining subsets: {df.groupby(['STUDY', 'LEVEL', 'PHASE']).size()}")

In [None]:
face_au_cols = [c for c in df.columns if c.startswith("FACE_AU")]
eye_cols = ['Fix_count_short_66_150', 'Fix_count_medium_300_500',
 'Fix_count_long_gt_1000', 'Fix_count_100', 'Fix_mean_duration',
 'Fix_median_duration', 'Sac_count', 'Sac_mean_amp', 'Sac_mean_dur',
 'Sac_median_dur', 'Blink_count', 'Blink_mean_dur', 'Blink_median_dur',
 'Pupil_mean', 'Pupil_IPA']
eye_cols_without_blink = ['Fix_count_short_66_150', 'Fix_count_medium_300_500',
 'Fix_count_long_gt_1000', 'Fix_count_100', 'Fix_mean_duration',
 'Fix_median_duration', 'Sac_count', 'Sac_mean_amp', 'Sac_mean_dur',
 'Sac_median_dur', 'Pupil_mean', 'Pupil_IPA']
print(len(eye_cols))
all_signal_columns = eye_cols+face_au_cols
print(len(all_signal_columns))

MAD

In [None]:
def calculate_mad_params(df, columns):
 """
 Calculate median and MAD parameters for each column.
 This should be run ONLY on the training data.
 
 Returns a dictionary: {col: (median, mad)}
 """
 params = {}
 for col in columns:
 median = df[col].median()
 mad = np.median(np.abs(df[col] - median))
 params[col] = (median, mad)
 return params
def apply_mad_filter(df, params, threshold=3.5):
 """
 Apply MAD-based outlier removal using precomputed parameters.
 Works on training, validation, and test data.
 
 df: DataFrame to filter
 params: dictionary {col: (median, mad)} from training data
 threshold: cutoff for robust Z-score
 """
 df_clean = df.copy()

 for col, (median, mad) in params.items():
 if mad == 0:
 continue # no spread; nothing to remove for this column

 robust_z = 0.6745 * (df_clean[col] - median) / mad
 outlier_mask = np.abs(robust_z) > threshold

 # Remove values only in this specific column
 df_clean.loc[outlier_mask, col] = median
 
 
 print(df_clean.shape)
 return df_clean

In [None]:
if(FILTER_MAD):
 mad_params = calculate_mad_params(df, all_signal_columns)
 df = apply_mad_filter(df, mad_params, THRESHOLD)

Normalizer

In [None]:
def fit_normalizer(train_data, au_columns, method='standard', scope='global'):
 """
 Fit normalization scalers on training data.
 
 Parameters:
 -----------
 train_data : pd.DataFrame
 Training dataframe with AU columns and subjectID
 au_columns : list
 List of AU column names to normalize
 method : str, default='standard'
 Normalization method: 'standard' for StandardScaler or 'minmax' for MinMaxScaler
 scope : str, default='global'
 Normalization scope: 'subject' for per-subject or 'global' for across all subjects
 
 Returns:
 --------
 dict
 Dictionary containing fitted scalers and statistics for new subjects
 """
 if method == 'standard':
 Scaler = StandardScaler
 elif method == 'minmax':
 Scaler = MinMaxScaler
 else:
 raise ValueError("method must be 'standard' or 'minmax'")
 
 scalers = {}
 if scope == 'subject':
 # Fit one scaler per subject
 subject_stats = []
 
 for subject in train_data['subjectID'].unique():
 subject_mask = train_data['subjectID'] == subject
 scaler = Scaler()
 scaler.fit(train_data.loc[subject_mask, au_columns].values)
 scalers[subject] = scaler
 
 # Store statistics for averaging
 if method == 'standard':
 subject_stats.append({
 'mean': scaler.mean_,
 'std': scaler.scale_
 })
 elif method == 'minmax':
 subject_stats.append({
 'min': scaler.data_min_,
 'max': scaler.data_max_
 })
 
 # Calculate average statistics for new subjects
 if method == 'standard':
 avg_mean = np.mean([s['mean'] for s in subject_stats], axis=0)
 avg_std = np.mean([s['std'] for s in subject_stats], axis=0)
 fallback_scaler = StandardScaler()
 fallback_scaler.mean_ = avg_mean
 fallback_scaler.scale_ = avg_std
 fallback_scaler.var_ = avg_std ** 2
 fallback_scaler.n_features_in_ = len(au_columns)
 elif method == 'minmax':
 avg_min = np.mean([s['min'] for s in subject_stats], axis=0)
 avg_max = np.mean([s['max'] for s in subject_stats], axis=0)
 fallback_scaler = MinMaxScaler()
 fallback_scaler.data_min_ = avg_min
 fallback_scaler.data_max_ = avg_max
 fallback_scaler.data_range_ = avg_max - avg_min
 fallback_scaler.scale_ = 1.0 / fallback_scaler.data_range_
 fallback_scaler.min_ = -avg_min * fallback_scaler.scale_
 fallback_scaler.n_features_in_ = len(au_columns)
 
 scalers['_fallback'] = fallback_scaler
 
 elif scope == 'global':
 # Fit one scaler for all subjects
 scaler = Scaler()
 scaler.fit(train_data[au_columns].values)
 scalers['global'] = scaler
 
 else:
 raise ValueError("scope must be 'subject' or 'global'")
 
 return {'scalers': scalers, 'method': method, 'scope': scope}

In [None]:
def apply_normalizer(data, columns, normalizer_dict):
 """
 Apply fitted normalization scalers to data.
 
 Parameters:
 -----------
 data : pd.DataFrame
 Dataframe with AU columns and subjectID
 au_columns : list
 List of AU column names to normalize
 normalizer_dict : dict
 Dictionary containing fitted scalers from fit_normalizer()
 
 Returns:
 --------
 pd.DataFrame
 DataFrame with normalized AU columns
 """
 normalized_data = data.copy()
 scalers = normalizer_dict['scalers']
 scope = normalizer_dict['scope']
 normalized_data[columns] = normalized_data[columns].astype(np.float64)

 if scope == 'subject':
 # Apply per-subject normalization
 for subject in data['subjectID'].unique():
 subject_mask = data['subjectID'] == subject
 
 # Use the subject's scaler if available, otherwise use fallback
 if subject in scalers:
 scaler = scalers[subject]
 else:
 # Use averaged scaler for new subjects
 scaler = scalers['_fallback']
 print(f"Info: Subject {subject} not in training data. Using averaged scaler from training subjects.")
 
 normalized_data.loc[subject_mask, columns] = scaler.transform(
 data.loc[subject_mask, columns].values
 )
 
 elif scope == 'global':
 # Apply global normalization
 scaler = scalers['global']
 normalized_data[columns] = scaler.transform(data[columns].values)
 
 return normalized_data

In [None]:
scaler = fit_normalizer(df, all_signal_columns, method=METHOD, scope=SCOPE)
df_min_max_normalised = apply_normalizer(df, all_signal_columns, scaler)

In [None]:
a= df_min_max_normalised[['STUDY','LEVEL','PHASE']]
print(a.dtypes)

In [None]:
# Define signal columns (adjust only once)
signal_columns = all_signal_columns

# Get all unique combinations of STUDY, LEVEL and PHASE
unique_combinations = df_min_max_normalised[['STUDY', 'LEVEL', 'PHASE']].drop_duplicates().reset_index(drop=True)

# Dictionary to store subsets
subsets = {}
subset_sizes = {}

for idx, row in unique_combinations.iterrows():
 study = row['STUDY']
 level = row['LEVEL']
 phase = row['PHASE']
 key = f"{study}_L{level}_P{phase}"
 subset = df_min_max_normalised[
 (df_min_max_normalised['STUDY'] == study) & 
 (df_min_max_normalised['LEVEL'] == level) & 
 (df_min_max_normalised['PHASE'] == phase)
 ]
 subsets[key] = subset
 subset_sizes[key] = len(subset)

# Output subset sizes
print("Number of samples per subset:")
print("=" * 40)
for key, size in subset_sizes.items():
 print(f"{key}: {size} samples")
print("=" * 40)
print(f"Total number of subsets: {len(subsets)}")

In [None]:
import numpy as np

# Function to categorize subsets
def categorize_subset(key):
 """Categorizes a subset as 'low' or 'high' based on the given logic"""
 parts = key.split('_')
 study = parts[0]
 level = int(parts[1][1:]) # 'L1' -> 1
 phase = parts[2][1:] # 'Pbaseline' -> 'baseline'
 
 # LOW: baseline OR (n-back with level 1 or 4)
 if phase == "baseline":
 return 'low'
 elif study == "n-back" and level in [1, 4]:
 return 'low'
 
 # HIGH: (n-back with level 2,3,5,6 and phase train/test) OR (k-drive not baseline)
 elif study == "n-back" and level in [2, 3, 5, 6] and phase in ["train", "test"]:
 return 'high'
 elif study == "k-drive" and phase != "baseline":
 return 'high'
 
 return None

# Categorize subsets
low_subsets = {}
high_subsets = {}

for key, subset in subsets.items():
 category = categorize_subset(key)
 if category == 'low':
 low_subsets[key] = subset
 elif category == 'high':
 high_subsets[key] = subset

# Output statistics
print("\n" + "=" * 50)
print("SUBSET CATEGORIZATION")
print("=" * 50)

print("\nLOW subsets (Blue):")
print("-" * 50)
low_total = 0
for key in sorted(low_subsets.keys()):
 size = subset_sizes[key]
 low_total += size
 print(f" {key}: {size} samples")
print(f"{'TOTAL LOW:':<30} {low_total} samples")
print(f"{'NUMBER OF LOW SUBSETS:':<30} {len(low_subsets)}")

print("\nHIGH subsets (Red):")
print("-" * 50)
high_total = 0
for key in sorted(high_subsets.keys()):
 size = subset_sizes[key]
 high_total += size
 print(f" {key}: {size} samples")
print(f"{'TOTAL HIGH:':<30} {high_total} samples")
print(f"{'NUMBER OF HIGH SUBSETS:':<30} {len(high_subsets)}")

print("\n" + "=" * 50)
print(f"TOTAL SAMPLES: {low_total + high_total}")
print(f"TOTAL SUBSETS: {len(low_subsets) + len(high_subsets)}")
print("=" * 50)

# Find minimum subset size
min_subset_size = min(subset_sizes.values())
print(f"\nMinimum subset size: {min_subset_size}")

# Number of points to plot per subset (50% of minimum size)
sampling_factor = 1
n_samples_per_subset = int(sampling_factor * min_subset_size)
print(f"Number of randomly drawn points per subset: {n_samples_per_subset}")

### Plot

In [None]:
# Create comparison plots
fig, axes = plt.subplots(len(signal_columns), 1, figsize=(14, 4 * len(signal_columns)))

# If only one signal column exists, convert axes to list
if len(signal_columns) == 1:
 axes = [axes]

# Create a plot for each signal column
for i, signal_col in enumerate(signal_columns):
 ax = axes[i]
 
 y_pos = 0
 labels = []
 
 # First plot all LOW subsets (sorted, blue)
 for label in sorted(low_subsets.keys()):
 subset = low_subsets[label]
 if len(subset) > 0 and signal_col in subset.columns:
 # Draw random sample
 n_samples = min(n_samples_per_subset, len(subset))
 sampled_data = subset[signal_col].sample(n=n_samples, random_state=42)
 
 # Calculate mean and median
 mean_val = subset[signal_col].mean()
 median_val = subset[signal_col].median()
 
 # Plot points in blue
 ax.scatter(sampled_data, [y_pos] * len(sampled_data), 
 alpha=0.5, s=30, color='blue')
 
 # Mean as black cross
 ax.plot(mean_val, y_pos, 'x', markersize=12, markeredgewidth=3, 
 color='black', zorder=5)
 
 # Median as brown cross
 ax.plot(median_val, y_pos, 'x', markersize=12, markeredgewidth=3, 
 color='brown', zorder=5)
 
 labels.append(f"{label} (n={subset_sizes[label]})")
 y_pos += 1
 
 # Separation line between LOW and HIGH
 if len(low_subsets) > 0 and len(high_subsets) > 0:
 ax.axhline(y=y_pos - 0.5, color='gray', linestyle='--', linewidth=2, alpha=0.7)
 
 # Then plot all HIGH subsets (sorted, red)
 for label in sorted(high_subsets.keys()):
 subset = high_subsets[label]
 if len(subset) > 0 and signal_col in subset.columns:
 # Draw random sample
 n_samples = min(n_samples_per_subset, len(subset))
 sampled_data = subset[signal_col].sample(n=n_samples, random_state=42)
 
 # Calculate mean and median
 mean_val = subset[signal_col].mean()
 median_val = subset[signal_col].median()
 
 # Plot points in red
 ax.scatter(sampled_data, [y_pos] * len(sampled_data), 
 alpha=0.5, s=30, color='red')
 
 # Mean as black cross
 ax.plot(mean_val, y_pos, 'x', markersize=12, markeredgewidth=3, 
 color='black', zorder=5)
 
 # Median as brown cross
 ax.plot(median_val, y_pos, 'x', markersize=12, markeredgewidth=3, 
 color='brown', zorder=5)
 
 labels.append(f"{label} (n={subset_sizes[label]})")
 y_pos += 1
 
 ax.set_yticks(range(len(labels)))
 ax.set_yticklabels(labels)
 ax.set_xlabel(f'{signal_col} value')
 ax.set_title(f'{signal_col}: LOW (Blue) vs HIGH (Red) | {n_samples_per_subset} points/subset | Black X = Mean, Brown X = Median')
 ax.grid(True, alpha=0.3, axis='x')
 ax.axvline(0, color='gray', linestyle='--', alpha=0.5)

plt.tight_layout()
plt.show()

print(f"\nNote: {n_samples_per_subset} random points were plotted per subset.")
print("Blue points = LOW subsets | Red points = HIGH subsets")
print("Black 'X' = Mean of entire subset | Brown 'X' = Median of entire subset")
print(f"Total subsets plotted: {len(low_subsets)} LOW + {len(high_subsets)} HIGH = {len(low_subsets) + len(high_subsets)} subsets")