From 4ac4a8636be92a1d93099d53d25061b271e90e4e Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 30 Oct 2025 15:26:05 +0100 Subject: [PATCH] step 2: feature table creation script --- create_feature_table.py | 111 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 create_feature_table.py diff --git a/create_feature_table.py b/create_feature_table.py new file mode 100644 index 0000000..53baa33 --- /dev/null +++ b/create_feature_table.py @@ -0,0 +1,111 @@ +import pandas as pd +import numpy as np +from pathlib import Path + +def process_parquet_files(input_dir, output_file, window_size=1250, step_size=125): + """ + Verarbeitet Parquet-Dateien mit Sliding Window Aggregation. + + Parameters: + ----------- + input_dir : str + Verzeichnis mit Parquet-Dateien + output_file : str + Pfad für die Ausgabe-Parquet-Datei + window_size : int + Größe des Sliding Windows (default: 3000) + step_size : int + Schrittweite in Einträgen (default: 250 = 10 Sekunden bei 25 Hz) + """ + + input_path = Path(input_dir) + parquet_files = sorted(input_path.glob("*.parquet")) + + if not parquet_files: + print(f"Keine Parquet-Dateien in {input_dir} gefunden!") + return + + print(f"Gefundene Dateien: {len(parquet_files)}") + + all_windows = [] + + for file_idx, parquet_file in enumerate(parquet_files): + print(f"\nVerarbeite Datei {file_idx + 1}/{len(parquet_files)}: {parquet_file.name}") + + # Lade Parquet-Datei + df = pd.read_parquet(parquet_file) + print(f" Einträge: {len(df)}") + + # Identifiziere AU-Spalten + au_columns = [col for col in df.columns if col.startswith('AU')] + print(f" AU-Spalten: {len(au_columns)}") + + # Gruppiere nach STUDY, LEVEL, PHASE (um Übergänge zu vermeiden) + for (study_val, level_val, phase_val), level_df in df.groupby(['STUDY', 'LEVEL', 'PHASE'], sort=False): + print(f" STUDY {study_val}, LEVEL {level_val}, PHASE {phase_val}: {len(level_df)} Einträge") + + # Reset index für korrekte Position-Berechnung + level_df = level_df.reset_index(drop=True) + + # Sliding Window über dieses Level + num_windows = (len(level_df) - window_size) // step_size + 1 + + if num_windows <= 0: + print(f" Zu wenige Einträge für Window (benötigt {window_size})") + continue + + for i in range(num_windows): + start_idx = i * step_size + end_idx = start_idx + window_size + + window_df = level_df.iloc[start_idx:end_idx] + + # Erstelle aggregiertes Ergebnis + result = { + 'subjectID': window_df['subjectID'].iloc[0], + 'start_time': window_df['rowID'].iloc[0], # rowID als start_time + 'STUDY': window_df['STUDY'].iloc[0], + 'LEVEL': window_df['LEVEL'].iloc[0], + 'PHASE': window_df['PHASE'].iloc[0] + } + + # Summiere alle AU-Spalten + for au_col in au_columns: + result[f'{au_col}_sum'] = window_df[au_col].sum() + + all_windows.append(result) + + print(f" Windows erstellt: {num_windows}") + + # Erstelle finalen DataFrame + result_df = pd.DataFrame(all_windows) + + print(f"\n{'='*60}") + print(f"Gesamt Windows erstellt: {len(result_df)}") + print(f"Spalten: {list(result_df.columns)}") + + # Speichere Ergebnis + result_df.to_parquet(output_file, index=False) + print(f"\nErgebnis gespeichert in: {output_file}") + + return result_df + + +# Beispiel-Verwendung +if __name__ == "__main__": + # Anpassen an deine Pfade + input_directory = "" + output_file = "./output/output_windowed.parquet" + + + result = process_parquet_files( + input_dir=input_directory, + output_file=output_file, + window_size=1250, + step_size=125 + ) + + # Zeige erste Zeilen + if result is not None: + print("\nErste 5 Zeilen des Ergebnisses:") + print(result.head()) \ No newline at end of file