step 2: feature table creation script
This commit is contained in:
parent
a619f5ad8e
commit
4ac4a8636b
111
create_feature_table.py
Normal file
111
create_feature_table.py
Normal file
@ -0,0 +1,111 @@
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from pathlib import Path
|
||||
|
||||
def process_parquet_files(input_dir, output_file, window_size=1250, step_size=125):
|
||||
"""
|
||||
Verarbeitet Parquet-Dateien mit Sliding Window Aggregation.
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
input_dir : str
|
||||
Verzeichnis mit Parquet-Dateien
|
||||
output_file : str
|
||||
Pfad für die Ausgabe-Parquet-Datei
|
||||
window_size : int
|
||||
Größe des Sliding Windows (default: 3000)
|
||||
step_size : int
|
||||
Schrittweite in Einträgen (default: 250 = 10 Sekunden bei 25 Hz)
|
||||
"""
|
||||
|
||||
input_path = Path(input_dir)
|
||||
parquet_files = sorted(input_path.glob("*.parquet"))
|
||||
|
||||
if not parquet_files:
|
||||
print(f"Keine Parquet-Dateien in {input_dir} gefunden!")
|
||||
return
|
||||
|
||||
print(f"Gefundene Dateien: {len(parquet_files)}")
|
||||
|
||||
all_windows = []
|
||||
|
||||
for file_idx, parquet_file in enumerate(parquet_files):
|
||||
print(f"\nVerarbeite Datei {file_idx + 1}/{len(parquet_files)}: {parquet_file.name}")
|
||||
|
||||
# Lade Parquet-Datei
|
||||
df = pd.read_parquet(parquet_file)
|
||||
print(f" Einträge: {len(df)}")
|
||||
|
||||
# Identifiziere AU-Spalten
|
||||
au_columns = [col for col in df.columns if col.startswith('AU')]
|
||||
print(f" AU-Spalten: {len(au_columns)}")
|
||||
|
||||
# Gruppiere nach STUDY, LEVEL, PHASE (um Übergänge zu vermeiden)
|
||||
for (study_val, level_val, phase_val), level_df in df.groupby(['STUDY', 'LEVEL', 'PHASE'], sort=False):
|
||||
print(f" STUDY {study_val}, LEVEL {level_val}, PHASE {phase_val}: {len(level_df)} Einträge")
|
||||
|
||||
# Reset index für korrekte Position-Berechnung
|
||||
level_df = level_df.reset_index(drop=True)
|
||||
|
||||
# Sliding Window über dieses Level
|
||||
num_windows = (len(level_df) - window_size) // step_size + 1
|
||||
|
||||
if num_windows <= 0:
|
||||
print(f" Zu wenige Einträge für Window (benötigt {window_size})")
|
||||
continue
|
||||
|
||||
for i in range(num_windows):
|
||||
start_idx = i * step_size
|
||||
end_idx = start_idx + window_size
|
||||
|
||||
window_df = level_df.iloc[start_idx:end_idx]
|
||||
|
||||
# Erstelle aggregiertes Ergebnis
|
||||
result = {
|
||||
'subjectID': window_df['subjectID'].iloc[0],
|
||||
'start_time': window_df['rowID'].iloc[0], # rowID als start_time
|
||||
'STUDY': window_df['STUDY'].iloc[0],
|
||||
'LEVEL': window_df['LEVEL'].iloc[0],
|
||||
'PHASE': window_df['PHASE'].iloc[0]
|
||||
}
|
||||
|
||||
# Summiere alle AU-Spalten
|
||||
for au_col in au_columns:
|
||||
result[f'{au_col}_sum'] = window_df[au_col].sum()
|
||||
|
||||
all_windows.append(result)
|
||||
|
||||
print(f" Windows erstellt: {num_windows}")
|
||||
|
||||
# Erstelle finalen DataFrame
|
||||
result_df = pd.DataFrame(all_windows)
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Gesamt Windows erstellt: {len(result_df)}")
|
||||
print(f"Spalten: {list(result_df.columns)}")
|
||||
|
||||
# Speichere Ergebnis
|
||||
result_df.to_parquet(output_file, index=False)
|
||||
print(f"\nErgebnis gespeichert in: {output_file}")
|
||||
|
||||
return result_df
|
||||
|
||||
|
||||
# Beispiel-Verwendung
|
||||
if __name__ == "__main__":
|
||||
# Anpassen an deine Pfade
|
||||
input_directory = ""
|
||||
output_file = "./output/output_windowed.parquet"
|
||||
|
||||
|
||||
result = process_parquet_files(
|
||||
input_dir=input_directory,
|
||||
output_file=output_file,
|
||||
window_size=1250,
|
||||
step_size=125
|
||||
)
|
||||
|
||||
# Zeige erste Zeilen
|
||||
if result is not None:
|
||||
print("\nErste 5 Zeilen des Ergebnisses:")
|
||||
print(result.head())
|
||||
Loading…
x
Reference in New Issue
Block a user