112 lines
3.9 KiB
Python
112 lines
3.9 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
from pathlib import Path
|
|
|
|
def process_parquet_files(input_dir, output_file, window_size=1250, step_size=125):
|
|
"""
|
|
Verarbeitet Parquet-Dateien mit Sliding Window Aggregation.
|
|
|
|
Parameters:
|
|
-----------
|
|
input_dir : str
|
|
Verzeichnis mit Parquet-Dateien
|
|
output_file : str
|
|
Pfad für die Ausgabe-Parquet-Datei
|
|
window_size : int
|
|
Größe des Sliding Windows (default: 3000)
|
|
step_size : int
|
|
Schrittweite in Einträgen (default: 250 = 10 Sekunden bei 25 Hz)
|
|
"""
|
|
|
|
input_path = Path(input_dir)
|
|
parquet_files = sorted(input_path.glob("*.parquet"))
|
|
|
|
if not parquet_files:
|
|
print(f"Keine Parquet-Dateien in {input_dir} gefunden!")
|
|
return
|
|
|
|
print(f"Gefundene Dateien: {len(parquet_files)}")
|
|
|
|
all_windows = []
|
|
|
|
for file_idx, parquet_file in enumerate(parquet_files):
|
|
print(f"\nVerarbeite Datei {file_idx + 1}/{len(parquet_files)}: {parquet_file.name}")
|
|
|
|
# Lade Parquet-Datei
|
|
df = pd.read_parquet(parquet_file)
|
|
print(f" Einträge: {len(df)}")
|
|
|
|
# Identifiziere AU-Spalten
|
|
au_columns = [col for col in df.columns if col.startswith('FACE_AU')]
|
|
print(f" AU-Spalten: {len(au_columns)}")
|
|
|
|
# Gruppiere nach STUDY, LEVEL, PHASE (um Übergänge zu vermeiden)
|
|
for (study_val, level_val, phase_val), level_df in df.groupby(['STUDY', 'LEVEL', 'PHASE'], sort=False):
|
|
print(f" STUDY {study_val}, LEVEL {level_val}, PHASE {phase_val}: {len(level_df)} Einträge")
|
|
|
|
# Reset index für korrekte Position-Berechnung
|
|
level_df = level_df.reset_index(drop=True)
|
|
|
|
# Sliding Window über dieses Level
|
|
num_windows = (len(level_df) - window_size) // step_size + 1
|
|
|
|
if num_windows <= 0:
|
|
print(f" Zu wenige Einträge für Window (benötigt {window_size})")
|
|
continue
|
|
|
|
for i in range(num_windows):
|
|
start_idx = i * step_size
|
|
end_idx = start_idx + window_size
|
|
|
|
window_df = level_df.iloc[start_idx:end_idx]
|
|
|
|
# Erstelle aggregiertes Ergebnis
|
|
result = {
|
|
'subjectID': window_df['subjectID'].iloc[0],
|
|
'start_time': window_df['rowID'].iloc[0], # rowID als start_time
|
|
'STUDY': window_df['STUDY'].iloc[0],
|
|
'LEVEL': window_df['LEVEL'].iloc[0],
|
|
'PHASE': window_df['PHASE'].iloc[0]
|
|
}
|
|
|
|
# Summiere alle AU-Spalten
|
|
for au_col in au_columns:
|
|
result[f'{au_col}_sum'] = window_df[au_col].sum()
|
|
|
|
all_windows.append(result)
|
|
|
|
print(f" Windows erstellt: {num_windows}")
|
|
|
|
# Erstelle finalen DataFrame
|
|
result_df = pd.DataFrame(all_windows)
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f"Gesamt Windows erstellt: {len(result_df)}")
|
|
print(f"Spalten: {list(result_df.columns)}")
|
|
|
|
# Speichere Ergebnis
|
|
result_df.to_parquet(output_file, index=False)
|
|
print(f"\nErgebnis gespeichert in: {output_file}")
|
|
|
|
return result_df
|
|
|
|
|
|
# Beispiel-Verwendung
|
|
if __name__ == "__main__":
|
|
# Anpassen an deine Pfade
|
|
input_directory = r"C:\Users\x\FAUbox\WS2526_Fahrsimulator_MSY (Celina Korzer)\new_AU_parquet_files"
|
|
output_file = r"C:\Users\x\FAUbox\WS2526_Fahrsimulator_MSY (Celina Korzer)\new_AU_dataset\AU_dataset.parquet"
|
|
|
|
|
|
|
|
result = process_parquet_files(
|
|
input_dir=input_directory,
|
|
output_file=output_file,
|
|
window_size=1250,
|
|
step_size=125
|
|
)
|
|
|
|
# Zeige erste Zeilen
|
|
if result is not None:
|
|
print("\nErste 5 Zeilen des Ergebnisses:")
|
|
print(result.head()) |