import pandas as pd import numpy as np from pathlib import Path def process_parquet_files(input_dir, output_file, window_size=1250, step_size=125): """ Verarbeitet Parquet-Dateien mit Sliding Window Aggregation. Parameters: ----------- input_dir : str Verzeichnis mit Parquet-Dateien output_file : str Pfad für die Ausgabe-Parquet-Datei window_size : int Größe des Sliding Windows (default: 3000) step_size : int Schrittweite in Einträgen (default: 250 = 10 Sekunden bei 25 Hz) """ input_path = Path(input_dir) parquet_files = sorted(input_path.glob("*.parquet")) if not parquet_files: print(f"Keine Parquet-Dateien in {input_dir} gefunden!") return print(f"Gefundene Dateien: {len(parquet_files)}") all_windows = [] for file_idx, parquet_file in enumerate(parquet_files): print(f"\nVerarbeite Datei {file_idx + 1}/{len(parquet_files)}: {parquet_file.name}") # Lade Parquet-Datei df = pd.read_parquet(parquet_file) print(f" Einträge: {len(df)}") # Identifiziere AU-Spalten au_columns = [col for col in df.columns if col.startswith('FACE_AU')] print(f" AU-Spalten: {len(au_columns)}") # Gruppiere nach STUDY, LEVEL, PHASE (um Übergänge zu vermeiden) for (study_val, level_val, phase_val), level_df in df.groupby(['STUDY', 'LEVEL', 'PHASE'], sort=False): print(f" STUDY {study_val}, LEVEL {level_val}, PHASE {phase_val}: {len(level_df)} Einträge") # Reset index für korrekte Position-Berechnung level_df = level_df.reset_index(drop=True) # Sliding Window über dieses Level num_windows = (len(level_df) - window_size) // step_size + 1 if num_windows <= 0: print(f" Zu wenige Einträge für Window (benötigt {window_size})") continue for i in range(num_windows): start_idx = i * step_size end_idx = start_idx + window_size window_df = level_df.iloc[start_idx:end_idx] # Erstelle aggregiertes Ergebnis result = { 'subjectID': window_df['subjectID'].iloc[0], 'start_time': window_df['rowID'].iloc[0], # rowID als start_time 'STUDY': window_df['STUDY'].iloc[0], 'LEVEL': window_df['LEVEL'].iloc[0], 'PHASE': window_df['PHASE'].iloc[0] } # Summiere alle AU-Spalten for au_col in au_columns: result[f'{au_col}_sum'] = window_df[au_col].sum() all_windows.append(result) print(f" Windows erstellt: {num_windows}") # Erstelle finalen DataFrame result_df = pd.DataFrame(all_windows) print(f"\n{'='*60}") print(f"Gesamt Windows erstellt: {len(result_df)}") print(f"Spalten: {list(result_df.columns)}") # Speichere Ergebnis result_df.to_parquet(output_file, index=False) print(f"\nErgebnis gespeichert in: {output_file}") return result_df # Beispiel-Verwendung if __name__ == "__main__": # Anpassen an deine Pfade input_directory = r"C:\Users\x\FAUbox\WS2526_Fahrsimulator_MSY (Celina Korzer)\new_AU_parquet_files" output_file = r"C:\Users\x\FAUbox\WS2526_Fahrsimulator_MSY (Celina Korzer)\new_AU_dataset\AU_dataset.parquet" result = process_parquet_files( input_dir=input_directory, output_file=output_file, window_size=1250, step_size=125 ) # Zeige erste Zeilen if result is not None: print("\nErste 5 Zeilen des Ergebnisses:") print(result.head())