Changed paths to the paulusja ... directory; changed AU feature extraction to mean instead of sum; added v1 of the dataset merge script (needs to be adjusted)
import pandas as pd
import numpy as np
from pathlib import Path


def process_parquet_files(input_dir, output_file, window_size=1250, step_size=125):
    """
    Process Parquet files with sliding-window aggregation.

    Parameters
    ----------
    input_dir : str
        Directory containing the Parquet files
    output_file : str
        Path for the output Parquet file
    window_size : int
        Size of the sliding window in entries (default: 1250)
    step_size : int
        Step size in entries (default: 125 = 5 seconds at 25 Hz)
    """
    input_path = Path(input_dir)
    parquet_files = sorted(input_path.glob("*.parquet"))

    if not parquet_files:
        print(f"No Parquet files found in {input_dir}!")
        return

    print(f"Files found: {len(parquet_files)}")

    all_windows = []

    for file_idx, parquet_file in enumerate(parquet_files):
        print(f"\nProcessing file {file_idx + 1}/{len(parquet_files)}: {parquet_file.name}")

        # Load the Parquet file
        df = pd.read_parquet(parquet_file)
        print(f"  Entries: {len(df)}")

        # Identify the AU (Action Unit) columns
        au_columns = [col for col in df.columns if col.startswith('FACE_AU')]
        print(f"  AU columns: {len(au_columns)}")

        # Group by STUDY, LEVEL, PHASE so that no window spans a transition
        # between conditions
        for (study_val, level_val, phase_val), level_df in df.groupby(['STUDY', 'LEVEL', 'PHASE'], sort=False):
            print(f"    STUDY {study_val}, LEVEL {level_val}, PHASE {phase_val}: {len(level_df)} entries")

            # Reset the index for correct position calculation
            level_df = level_df.reset_index(drop=True)

            # Slide a window over this level
            num_windows = (len(level_df) - window_size) // step_size + 1
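            # Example: 2000 entries yield (2000 - 1250) // 125 + 1 = 7 windows,
            # the last one covering entries 750..1999.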

            if num_windows <= 0:
                print(f"    Too few entries for a window (requires {window_size})")
                continue

            for i in range(num_windows):
                start_idx = i * step_size
                end_idx = start_idx + window_size

                window_df = level_df.iloc[start_idx:end_idx]

                # Build the aggregated result row
                result = {
                    'subjectID': window_df['subjectID'].iloc[0],
                    'start_time': window_df['rowID'].iloc[0],  # rowID serves as start_time
                    'STUDY': window_df['STUDY'].iloc[0],
                    'LEVEL': window_df['LEVEL'].iloc[0],
                    'PHASE': window_df['PHASE'].iloc[0]
                }
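                # subjectID, STUDY, LEVEL and PHASE are presumably constant
                # within a group, so the window's first row is representative.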

                # Average all AU columns over the window
                for au_col in au_columns:
                    # result[f'{au_col}_sum'] = window_df[au_col].sum()
                    result[f'{au_col}_mean'] = window_df[au_col].mean()
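                    # The mean replaces the earlier sum (see commit message);
                    # with a fixed window_size the two differ only by a constant
                    # factor, but the mean stays comparable if window_size changes.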

                all_windows.append(result)

            print(f"    Windows created: {num_windows}")

    # Build the final DataFrame
    result_df = pd.DataFrame(all_windows)

    print(f"\n{'='*60}")
    print(f"Total windows created: {len(result_df)}")
    print(f"Columns: {list(result_df.columns)}")

    # Save the result
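    # Note: to_parquet does not create missing directories, so the parent
    # directory of output_file must already exist.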
    result_df.to_parquet(output_file, index=False)
    print(f"\nResult saved to: {output_file}")

    return result_df


# Example usage
if __name__ == "__main__":
    # Adjust these to your paths
    input_directory = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/new_AU_parquet_files")
    output_file = Path(r"/home/jovyan/data-paulusjafahrsimulator-gpu/new_AU_dataset_mean/AU_dataset_mean.parquet")

    result = process_parquet_files(
        input_dir=input_directory,
        output_file=output_file,
        window_size=1250,
        step_size=125
    )

    # Show the first rows
    if result is not None:
        print("\nFirst 5 rows of the result:")
        print(result.head())