Michael b8bebc0944 minor fixes in dataset creation
changed paths to paulusja ... directory
changed feature extraction for AUs to mean instead of sum
added v1 of merge script of datasets (needs to be adjusted)
2025-12-18 13:04:11 +01:00

113 lines
3.9 KiB
Python

import pandas as pd
import numpy as np
from pathlib import Path
def process_parquet_files(input_dir, output_file, window_size=1250, step_size=125):
"""
Verarbeitet Parquet-Dateien mit Sliding Window Aggregation.
Parameters:
-----------
input_dir : str
Verzeichnis mit Parquet-Dateien
output_file : str
Pfad für die Ausgabe-Parquet-Datei
window_size : int
Größe des Sliding Windows (default: 3000)
step_size : int
Schrittweite in Einträgen (default: 250 = 10 Sekunden bei 25 Hz)
"""
input_path = Path(input_dir)
parquet_files = sorted(input_path.glob("*.parquet"))
if not parquet_files:
print(f"Keine Parquet-Dateien in {input_dir} gefunden!")
return
print(f"Gefundene Dateien: {len(parquet_files)}")
all_windows = []
for file_idx, parquet_file in enumerate(parquet_files):
print(f"\nVerarbeite Datei {file_idx + 1}/{len(parquet_files)}: {parquet_file.name}")
# Lade Parquet-Datei
df = pd.read_parquet(parquet_file)
print(f" Einträge: {len(df)}")
# Identifiziere AU-Spalten
au_columns = [col for col in df.columns if col.startswith('FACE_AU')]
print(f" AU-Spalten: {len(au_columns)}")
# Gruppiere nach STUDY, LEVEL, PHASE (um Übergänge zu vermeiden)
for (study_val, level_val, phase_val), level_df in df.groupby(['STUDY', 'LEVEL', 'PHASE'], sort=False):
print(f" STUDY {study_val}, LEVEL {level_val}, PHASE {phase_val}: {len(level_df)} Einträge")
# Reset index für korrekte Position-Berechnung
level_df = level_df.reset_index(drop=True)
# Sliding Window über dieses Level
num_windows = (len(level_df) - window_size) // step_size + 1
if num_windows <= 0:
print(f" Zu wenige Einträge für Window (benötigt {window_size})")
continue
for i in range(num_windows):
start_idx = i * step_size
end_idx = start_idx + window_size
window_df = level_df.iloc[start_idx:end_idx]
# Erstelle aggregiertes Ergebnis
result = {
'subjectID': window_df['subjectID'].iloc[0],
'start_time': window_df['rowID'].iloc[0], # rowID als start_time
'STUDY': window_df['STUDY'].iloc[0],
'LEVEL': window_df['LEVEL'].iloc[0],
'PHASE': window_df['PHASE'].iloc[0]
}
# Summiere alle AU-Spalten
for au_col in au_columns:
# result[f'{au_col}_sum'] = window_df[au_col].sum()
result[f'{au_col}_mean'] = window_df[au_col].mean()
all_windows.append(result)
print(f" Windows erstellt: {num_windows}")
# Erstelle finalen DataFrame
result_df = pd.DataFrame(all_windows)
print(f"\n{'='*60}")
print(f"Gesamt Windows erstellt: {len(result_df)}")
print(f"Spalten: {list(result_df.columns)}")
# Speichere Ergebnis
result_df.to_parquet(output_file, index=False)
print(f"\nErgebnis gespeichert in: {output_file}")
return result_df
# Beispiel-Verwendung
if __name__ == "__main__":
# Anpassen an deine Pfade
input_directory = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/new_AU_parquet_files")
output_file = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/new_AU_dataset_mean/AU_dataset_mean.parquet")
result = process_parquet_files(
input_dir=input_directory,
output_file=output_file,
window_size=1250,
step_size=125
)
# Zeige erste Zeilen
if result is not None:
print("\nErste 5 Zeilen des Ergebnisses:")
print(result.head())