- added new files

- implemented database writing of AU and ET
This commit is contained in:
TimoKurz 2026-02-19 17:14:31 +01:00
parent 0e25ba4a3e
commit 42965a4733
2 changed files with 577 additions and 0 deletions

View File

@ -0,0 +1,372 @@
import warnings
# Silence a noisy protobuf deprecation warning triggered by MediaPipe's
# protobuf usage; must run before mediapipe is imported.
warnings.filterwarnings(
    "ignore",
    message=r".*SymbolDatabase\.GetPrototype\(\) is deprecated.*",
    category=UserWarning,
    module=r"google\.protobuf\.symbol_database"
)
import cv2
import time
import os
import threading
from datetime import datetime
from feat import Detector
import torch
import mediapipe as mp
import pandas as pd
from pathlib import Path
from eyeFeature_new import compute_features_from_parquet
# Import your helper functions
# from db_helper import connect_db, disconnect_db, insert_rows_into_table, create_table
import db_helper as db

# --- Configuration ---
DB_PATH = Path("~/MSY_FS/databases/database.sqlite").expanduser()
CAMERA_INDEX = 0
OUTPUT_DIR = "recordings"
VIDEO_DURATION = 50  # seconds per recording window
START_INTERVAL = 5   # seconds until the next recording window starts
FPS = 25.0           # fixed frame rate assumed for recording and timing

# Module-level dict that GazeRecorder.finish is expected to fill with the
# computed eye-tracking features; startAU_creation merges it into the DB row.
eye_tracking_features = {}

if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)

# Global detector so the model is not reloaded for every video
# (saves a lot of time and memory).
print("Initialisiere AU-Detector (bitte warten)...")
detector = Detector(au_model="xgb")

# ===== MediaPipe FaceMesh setup =====
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    static_image_mode=False,
    max_num_faces=1,
    refine_landmarks=True,  # required to get iris landmarks
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)

# Landmark index sets for the refined FaceMesh model.
# NOTE(review): the LEFT_*/RIGHT_* naming looks mirrored relative to
# MediaPipe's documented iris index ranges (468-472 vs. 473-477) — possibly
# intentional (subject's left vs. image left); confirm against the model docs.
LEFT_IRIS = [474, 475, 476, 477]
RIGHT_IRIS = [469, 470, 471, 472]
LEFT_EYE_LIDS = (159, 145)
RIGHT_EYE_LIDS = (386, 374)
EYE_OPEN_THRESHOLD = 6  # minimum lid distance in pixels to count the eye as open
LEFT_EYE_ALL = [33, 7, 163, 144, 145, 153, 154, 155,
                133, 173, 157, 158, 159, 160, 161, 246
                ]
RIGHT_EYE_ALL = [263, 249, 390, 373, 374, 380, 381, 382,
                 362, 398, 384, 385, 386, 387, 388, 466
                ]
def eye_openness(landmarks, top_idx, bottom_idx, img_height):
    """Return the vertical lid distance of one eye, in pixels.

    `landmarks` holds normalized points with `.y` in [0, 1]; the distance
    between the upper and lower lid landmark is scaled by the image height.
    """
    lid_gap = landmarks[top_idx].y - landmarks[bottom_idx].y
    return abs(lid_gap) * img_height
def compute_gaze(landmarks, iris_center, eye_indices, w, h):
    """Relative iris position inside the eye's bounding box.

    Projects the eye-contour landmarks to pixel space, computes their
    bounding box and returns the iris centre's normalized position
    (gaze_x, gaze_y) within that box. A degenerate box (width or height
    below one pixel) yields the neutral gaze (0.5, 0.5).
    """
    cx, cy = iris_center
    xs = [landmarks[i].x * w for i in eye_indices]
    ys = [landmarks[i].y * h for i in eye_indices]
    box_left, box_right = min(xs), max(xs)
    box_top, box_bottom = min(ys), max(ys)
    box_w = box_right - box_left
    box_h = box_bottom - box_top
    # Guard against a collapsed eye region (e.g. closed eye / bad detection).
    if box_w < 1 or box_h < 1:
        return 0.5, 0.5
    return (cx - box_left) / box_w, (cy - box_top) / box_h
def extract_aus(path, skip_frames):
    """Run py-feat AU detection on a video and return the per-AU mean.

    Parameters:
        path: path of the video file to analyse.
        skip_frames: number of frames to skip between analysed frames.

    Returns:
        The mean of each Action Unit over all detected frames (a pandas
        Series from py-feat's prediction object), or None on failure.
    """
    # torch.no_grad() disables gradient tracking; this avoids the
    # "Can't call numpy() on Tensor that requires grad" error inside py-feat.
    with torch.no_grad():
        video_prediction = detector.detect_video(
            path,
            skip_frames=skip_frames,
            face_detection_threshold=0.95
        )
    # Aggregate the AU columns over all analysed frames.
    try:
        # Mean (not sum) of each Action Unit across detected frames.
        res = video_prediction.aus.mean()
        return res
    except Exception as e:
        print(f"Fehler bei der Summenbildung: {e}")
        return None
def startAU_creation(video_path, db_path):
    """Analyse one finished recording and persist the results to the DB.

    Runs in a dedicated worker thread (started by VideoRecorder.finish):
    extracts AU means from the video, merges them with the module-level
    eye_tracking_features dict, inserts one row into "feature_table" and
    deletes the video plus its gaze parquet file afterwards.
    """
    try:
        print(f"\n[THREAD START] Analyse läuft für: {video_path}")
        # Analyse roughly one frame every 5 seconds of video (25 FPS * 5).
        output = extract_aus(video_path, skip_frames=int(FPS*5))
        print(f"\n--- Ergebnis für {os.path.basename(video_path)} ---")
        print(output)
        print("--------------------------------------------------\n")
        if output is not None:
            # Open a fresh connection inside this thread — SQLite connections
            # must not be shared across threads.
            conn, cursor = db.connect_db(db_path)
            # Prefix the AU columns so they are recognisable in the table.
            data_to_insert = output.to_dict()
            data_to_insert = {
                f"FACE_{k}_mean": v for k, v in data_to_insert.items()
            }
            now = datetime.now()
            ticks = int(time.mktime(now.timetuple()))  # unix seconds
            data_to_insert['start_time'] = [ticks]
            # NOTE(review): eye_tracking_features is written by
            # GazeRecorder.finish from another code path; access is not
            # synchronised, so this merge may see stale or empty features
            # for the current window — confirm the intended pairing.
            data_to_insert = data_to_insert | eye_tracking_features
            #data_to_insert['start_time'] = [datetime.now().strftime("%Y-%m-%d %H:%M:%S")]
            # insert_rows_into_table expects a list of values for every key,
            # so wrap scalar values accordingly.
            final_payload = {k: [v] if not isinstance(v, list) else v for k, v in data_to_insert.items()}
            db.insert_rows_into_table(conn, cursor, "feature_table", final_payload)
            db.disconnect_db(conn, cursor)
            print(f"--- Ergebnis für {os.path.basename(video_path)} in DB gespeichert ---")
            # Clean up the raw recording and its gaze parquet once stored.
            os.remove(video_path)
            os.remove(video_path.replace(".avi", "_gaze.parquet"))
            print(f"Löschen der Datei: {video_path}")
    except Exception as e:
        print(f"Fehler bei der Analyse von {video_path}: {e}")
class VideoRecorder:
    """Writes a fixed-length AVI recording and triggers AU analysis.

    Frames are appended via write_frame until VIDEO_DURATION * FPS frames
    were written; finish() then releases the writer and spawns a daemon
    thread running startAU_creation on the finished file.
    """

    def __init__(self, filename, width, height, db_path):
        # NOTE(review): gaze_data is never used by this class (GazeRecorder
        # owns the gaze buffer) — candidate for removal.
        self.gaze_data = []
        self.filename = filename
        self.db_path = db_path
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        self.out = cv2.VideoWriter(filename, fourcc, FPS, (width, height))
        # Fixed-length window: stop after VIDEO_DURATION seconds at FPS.
        self.frames_to_record = int(VIDEO_DURATION * FPS)
        self.frames_count = 0
        self.is_finished = False

    def write_frame(self, frame):
        """Append one BGR frame; finalise the file once the window is full."""
        if self.frames_count < self.frames_to_record:
            self.out.write(frame)
            self.frames_count += 1
        else:
            self.finish()

    def finish(self):
        """Release the writer and start the AU analysis in the background."""
        if not self.is_finished:
            self.out.release()
            self.is_finished = True
            abs_path = os.path.abspath(self.filename)
            print(f"Video fertig gespeichert: {self.filename}")
            # --- MULTITHREADING ---
            # Run the analysis in a new thread so main() can keep reading
            # frames without blocking on the (slow) AU detection.
            analysis_thread = threading.Thread(target=startAU_creation, args=(abs_path, self.db_path))
            analysis_thread.daemon = True  # dies with the main program
            analysis_thread.start()
class GazeRecorder:
    """Buffers per-frame gaze samples and flushes them to a parquet file.

    Mirrors VideoRecorder's fixed-length window: after VIDEO_DURATION * FPS
    samples the buffer is written to `filename`, eye-tracking features are
    computed from it and published via the module-level
    `eye_tracking_features` dict (merged into the DB row by
    startAU_creation).
    """

    def __init__(self, filename):
        self.filename = filename
        self.frames_to_record = int(VIDEO_DURATION * FPS)
        self.frames_count = 0
        self.gaze_data = []
        self.is_finished = False

    def write_frame(self, gaze_row):
        """Append one gaze-sample dict; finalise once the window is full."""
        if self.frames_count < self.frames_to_record:
            self.gaze_data.append(gaze_row)
            self.frames_count += 1
        else:
            self.finish()

    def finish(self):
        """Write the parquet file, compute features and publish them."""
        # FIX: without this declaration the assignment at the bottom bound a
        # function-local name, so the module-level dict consumed by
        # startAU_creation was never updated and the DB row always missed
        # the eye-tracking features.
        global eye_tracking_features
        if not self.is_finished:
            df = pd.DataFrame(self.gaze_data)
            df.to_parquet(self.filename, engine="pyarrow", index=False)
            print(f"Gaze-Parquet gespeichert: {self.filename}")
            features = compute_features_from_parquet(self.filename)
            print("Features:", features)
            self.is_finished = True
            # compute_features_from_parquet returns None for an empty window;
            # fall back to {} so the later `dict | dict` merge cannot raise.
            eye_tracking_features = features if features is not None else {}
def main():
    """Capture loop: record overlapping fixed-length video + gaze windows.

    Reads frames from the camera, runs MediaPipe FaceMesh per frame to
    derive per-eye openness, iris diameter and gaze position, starts a new
    VideoRecorder/GazeRecorder pair every START_INTERVAL seconds and feeds
    every active pair each frame. 'q' quits.
    """
    cap = cv2.VideoCapture(CAMERA_INDEX)
    if not cap.isOpened():
        print("Fehler: Kamera konnte nicht geöffnet werden.")
        return
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Several recorder pairs can be active at once because windows overlap
    # (VIDEO_DURATION > START_INTERVAL).
    active_video_recorders = []
    active_gaze_recorders = []
    last_start_time = 0  # 0 forces a recorder start on the first frame
    print("Aufnahme läuft. Drücke 'q' zum Beenden.")
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # MediaPipe expects RGB, OpenCV delivers BGR.
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            h, w, _ = frame.shape
            results = face_mesh.process(rgb)
            # Per-frame defaults: invalid eyes, no diameter/gaze measured.
            left_valid = 0
            right_valid = 0
            left_diameter = None
            right_diameter = None
            left_gaze_x = None
            left_gaze_y = None
            right_gaze_x = None
            right_gaze_y = None
            if results.multi_face_landmarks:
                # max_num_faces=1, so only the first face is relevant.
                face_landmarks = results.multi_face_landmarks[0]
                left_open = eye_openness(
                    face_landmarks.landmark,
                    LEFT_EYE_LIDS[0],
                    LEFT_EYE_LIDS[1],
                    h
                )
                right_open = eye_openness(
                    face_landmarks.landmark,
                    RIGHT_EYE_LIDS[0],
                    RIGHT_EYE_LIDS[1],
                    h
                )
                # An eye only counts as valid when the lids are far enough
                # apart (EYE_OPEN_THRESHOLD pixels).
                left_valid = 1 if left_open > EYE_OPEN_THRESHOLD else 0
                right_valid = 1 if right_open > EYE_OPEN_THRESHOLD else 0
                for eye_name, eye_indices in [("left", LEFT_IRIS), ("right", RIGHT_IRIS)]:
                    iris_points = []
                    for idx in eye_indices:
                        lm = face_landmarks.landmark[idx]
                        x_i, y_i = int(lm.x * w), int(lm.y * h)
                        iris_points.append((x_i, y_i))
                    if len(iris_points) == 4:
                        # Iris centre = mean of the four iris landmarks;
                        # radius = farthest landmark from the centre.
                        cx = int(sum(p[0] for p in iris_points) / 4)
                        cy = int(sum(p[1] for p in iris_points) / 4)
                        radius = max(
                            ((x - cx) ** 2 + (y - cy) ** 2) ** 0.5
                            for (x, y) in iris_points
                        )
                        diameter = 2 * radius
                        cv2.circle(frame, (cx, cy), int(radius), (0, 255, 0), 2)
                        # NOTE(review): the "left" iris is paired with
                        # RIGHT_EYE_ALL and vice versa — presumably to
                        # reconcile MediaPipe's mirrored index naming with
                        # the constants above; confirm this is intentional.
                        if eye_name == "left" and left_valid:
                            left_diameter = diameter
                            left_gaze_x, left_gaze_y = compute_gaze(
                                face_landmarks.landmark,
                                (cx, cy),
                                RIGHT_EYE_ALL,
                                w, h
                            )
                        elif eye_name == "right" and right_valid:
                            right_diameter = diameter
                            right_gaze_x, right_gaze_y = compute_gaze(
                                face_landmarks.landmark,
                                (cx, cy),
                                LEFT_EYE_ALL,
                                w, h
                            )
            # One gaze sample per frame — written even when no face was
            # found (validity flags are 0 and measurements None then).
            gaze_row = {
                "timestamp": time.time(),
                "EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_X": left_gaze_x,
                "EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_Y": left_gaze_y,
                "EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_X": right_gaze_x,
                "EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_Y": right_gaze_y,
                "EYE_LEFT_PUPIL_VALIDITY": left_valid,
                "EYE_RIGHT_PUPIL_VALIDITY": right_valid,
                "EYE_LEFT_PUPIL_DIAMETER": left_diameter,
                "EYE_RIGHT_PUPIL_DIAMETER": right_diameter
            }
            current_time = time.time()
            # Start a new overlapping recorder pair every START_INTERVAL s.
            if current_time - last_start_time >= START_INTERVAL:
                timestamp = datetime.now().strftime("%H%M%S")
                filename = os.path.join(OUTPUT_DIR, f"rec_{timestamp}.avi")
                video_recorder = VideoRecorder(filename, width, height, DB_PATH)
                gaze_filename = filename.replace(".avi", "_gaze.parquet")
                gaze_recorder = GazeRecorder(gaze_filename)
                active_video_recorders.append(video_recorder)
                active_gaze_recorders.append(gaze_recorder)
                last_start_time = current_time
            # Iterate over copies so finished recorders can be removed
            # from the live lists while looping.
            for v_rec, g_rec in zip(active_video_recorders[:], active_gaze_recorders[:]):
                v_rec.write_frame(frame)
                g_rec.write_frame(gaze_row)
                if v_rec.is_finished:
                    active_video_recorders.remove(v_rec)
                if g_rec.is_finished:
                    active_gaze_recorders.remove(g_rec)
            cv2.imshow('Kamera Livestream', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            # NOTE(review): sleeping a full frame period on top of capture
            # and FaceMesh processing pushes the effective rate below FPS,
            # so recorded windows last longer than VIDEO_DURATION — confirm.
            time.sleep(1/FPS)
    finally:
        # Always release camera/model resources, even on exceptions.
        face_mesh.close()
        cap.release()
        cv2.destroyAllWindows()
        print("Programm beendet. Warte ggf. auf laufende Analysen...")


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,205 @@
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.preprocessing import MinMaxScaler
from scipy.signal import welch
from pygazeanalyser.detectors import fixation_detection, saccade_detection

##############################################################################
# CONFIGURATION
##############################################################################
SAMPLING_RATE = 25   # Hz — gaze sampling rate (matches the camera FPS)
MIN_DUR_BLINKS = 2   # minimum blink length in samples (x * 40 ms at 25 Hz)

##############################################################################
# EYE-TRACKING FUNCTIONS
##############################################################################
def clean_eye_df(df):
    """Reduce a raw frame to its EYE_* columns and drop fully-empty rows.

    Infinities are treated as missing. Returns an empty DataFrame when no
    EYE_* column exists; otherwise a re-indexed frame containing only rows
    with at least one non-missing eye value.
    """
    eye_cols = [col for col in df.columns if col.startswith("EYE_")]
    if not eye_cols:
        return pd.DataFrame()
    cleaned = (
        df[eye_cols]
        .copy()
        .replace([np.inf, -np.inf], np.nan)
        .dropna(subset=eye_cols, how="all")
    )
    return cleaned.reset_index(drop=True)
def extract_gaze_signal(df):
    """Build the combined 2D gaze signal for one window.

    Steps per axis:
      1. mask samples whose eye's validity flag is not 1 (and +/-inf) as NaN,
      2. average the left and right eye (a sample stays NaN if either eye
         is missing, as in the original plain mean),
      3. interpolate and back/forward-fill remaining gaps,
      4. min-max scale to [0, 1].

    The scaling replaces the previous per-call sklearn MinMaxScaler with an
    equivalent vectorized expression (identical output, including the
    constant-signal -> all-zeros case), removing the heavyweight dependency
    from this function.

    Parameters:
        df: DataFrame with EYE_{LEFT,RIGHT}_GAZE_POINT_ON_DISPLAY_AREA_{X,Y}
            and EYE_{LEFT,RIGHT}_PUPIL_VALIDITY columns.

    Returns:
        np.ndarray of shape (n_samples, 2) with columns (x, y) in [0, 1].
    """

    def _axis(left_col, right_col, valid_left, valid_right):
        # One gaze axis: mask invalid samples, average both eyes, fill gaps.
        left = df[left_col].astype(float).replace([np.inf, -np.inf], np.nan)
        right = df[right_col].astype(float).replace([np.inf, -np.inf], np.nan)
        left[~valid_left] = np.nan
        right[~valid_right] = np.nan
        merged = np.mean(np.column_stack([left, right]), axis=1)
        filled = (
            pd.Series(merged)
            .interpolate(limit=None, limit_direction="both")
            .bfill()
            .ffill()
        )
        return filled.to_numpy()

    def _minmax(arr):
        # Scale to [0, 1]; a constant signal maps to all zeros, exactly as
        # sklearn's MinMaxScaler (feature_range=(0, 1)) would.
        lo = np.min(arr)
        span = np.max(arr) - lo
        if span == 0:
            return np.zeros_like(arr)
        return (arr - lo) / span

    val_L = df["EYE_LEFT_PUPIL_VALIDITY"] == 1
    val_R = df["EYE_RIGHT_PUPIL_VALIDITY"] == 1
    gx = _minmax(_axis("EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_X",
                       "EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_X", val_L, val_R))
    gy = _minmax(_axis("EYE_LEFT_GAZE_POINT_ON_DISPLAY_AREA_Y",
                       "EYE_RIGHT_GAZE_POINT_ON_DISPLAY_AREA_Y", val_L, val_R))
    return np.column_stack((gx, gy))
def extract_pupil(df):
    """Return (pupil, validity) for one window.

    pupil: per-sample mean of left/right pupil diameter with gaps
    interpolated (up to 50 samples) and edge-filled.
    validity: 1 where at least one eye's validity flag is 1; if validity
    columns are absent, falls back to "any diameter present".
    """
    left = df["EYE_LEFT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
    right = df["EYE_RIGHT_PUPIL_DIAMETER"].replace([np.inf, -np.inf], np.nan)
    left_flag = df.get("EYE_LEFT_PUPIL_VALIDITY")
    right_flag = df.get("EYE_RIGHT_PUPIL_VALIDITY")
    if left_flag is not None and right_flag is not None:
        validity = ((left_flag == 1) | (right_flag == 1)).astype(int).to_numpy()
    else:
        # No explicit flags: a sample is valid if either diameter exists.
        validity = (left.notna() | right.notna()).astype(int).to_numpy()
    merged = pd.Series(np.mean(np.column_stack([left, right]), axis=1))
    merged = merged.interpolate(limit=50, limit_direction="both").bfill().ffill()
    return merged.to_numpy(), validity
def detect_blinks(pupil_validity, min_duration=5):
    """Detect blink intervals from a per-sample validity signal.

    A blink is a contiguous run of validity == 0 lasting at least
    `min_duration` samples; it is reported as [start, end] where `end` is
    the index of the first valid sample after the run. A run still open at
    the end of the signal is not reported (matches the original behaviour;
    NOTE(review): presumably intentional since its true length is unknown).
    """
    intervals = []
    run_start = None
    for idx, flag in enumerate(pupil_validity):
        if flag == 0:
            if run_start is None:
                run_start = idx
        elif flag == 1 and run_start is not None:
            if idx - run_start >= min_duration:
                intervals.append([run_start, idx])
            run_start = None
    return intervals
def compute_IPA(pupil, fs=25):
    """Index of Pupillary Activity (Duchowski et al., 2018).

    Estimates the power spectral density of the pupil signal via Welch's
    method (2-second segments) and returns the total power in the
    0.6-2.0 Hz band.
    """
    freqs, power = welch(pupil, fs=fs, nperseg=int(fs * 2))
    in_band = np.logical_and(freqs >= 0.6, freqs <= 2.0)
    return np.sum(power[in_band])
def extract_eye_features(df_eye, fs=25, min_dur_blinks=2):
    """
    Extract eye-tracking features for a single window.

    Parameters:
        df_eye: cleaned EYE_* DataFrame (see clean_eye_df).
        fs: sampling rate in Hz.
        min_dur_blinks: minimum blink length in samples.

    Returns:
        Dictionary with fixation, saccade, blink and pupil features.
    """
    # Gaze positions, min-max scaled to [0, 1] per axis
    gaze = extract_gaze_signal(df_eye)
    # Pupil diameter (both eyes averaged) + per-sample validity
    pupil, pupil_validity = extract_pupil(df_eye)
    # ----------------------------
    # FIXATIONS
    # ----------------------------
    # Synthetic timestamps in ms, assuming a constant sampling rate.
    time_ms = np.arange(len(df_eye)) * 1000.0 / fs
    # NOTE(review): gaze is normalized to [0, 1], so maxdist is in
    # normalized units and missing=0.0 collides with a legitimate corner
    # coordinate — confirm against pygazeanalyser's detector semantics.
    fix, efix = fixation_detection(
        x=gaze[:, 0], y=gaze[:, 1], time=time_ms,
        missing=0.0, maxdist=0.003, mindur=10
    )
    # Index 2 of each efix entry is the fixation duration
    # (pygazeanalyser convention); keep only finite positive durations.
    fixation_durations = [f[2] for f in efix if np.isfinite(f[2]) and f[2] > 0]
    # Duration-category counts (thresholds in ms)
    F_short = sum(66 <= d <= 150 for d in fixation_durations)
    F_medium = sum(300 <= d <= 500 for d in fixation_durations)
    F_long = sum(d >= 1000 for d in fixation_durations)
    F_hundred = sum(d > 100 for d in fixation_durations)
    # ----------------------------
    # SACCADES
    # ----------------------------
    sac, esac = saccade_detection(
        x=gaze[:, 0], y=gaze[:, 1], time=time_ms,
        missing=0, minlen=12, maxvel=0.2, maxacc=1
    )
    # esac entries: presumably [start, end, duration, sx, sy, ex, ey] —
    # amplitude is the Euclidean start->end distance in normalized units.
    sac_durations = [s[2] for s in esac]
    sac_amplitudes = [((s[5]-s[3])**2 + (s[6]-s[4])**2)**0.5 for s in esac]
    # ----------------------------
    # BLINKS
    # ----------------------------
    # Runs of validity == 0 lasting at least min_dur_blinks samples.
    blinks = detect_blinks(pupil_validity, min_duration=min_dur_blinks)
    blink_durations = [(b[1] - b[0]) / fs for b in blinks]  # seconds
    # ----------------------------
    # PUPIL
    # ----------------------------
    if np.all(np.isnan(pupil)):
        mean_pupil = np.nan
        ipa = np.nan
    else:
        mean_pupil = np.nanmean(pupil)
        ipa = compute_IPA(pupil, fs=fs)
    # Feature dictionary
    return {
        "Fix_count_short_66_150": F_short,
        "Fix_count_medium_300_500": F_medium,
        "Fix_count_long_gt_1000": F_long,
        "Fix_count_100": F_hundred,
        "Fix_mean_duration": np.mean(fixation_durations) if fixation_durations else 0,
        "Fix_median_duration": np.median(fixation_durations) if fixation_durations else 0,
        "Sac_count": len(sac),
        "Sac_mean_amp": np.mean(sac_amplitudes) if sac_amplitudes else 0,
        "Sac_mean_dur": np.mean(sac_durations) if sac_durations else 0,
        "Sac_median_dur": np.median(sac_durations) if sac_durations else 0,
        "Blink_count": len(blinks),
        "Blink_mean_dur": np.mean(blink_durations) if blink_durations else 0,
        "Blink_median_dur": np.median(blink_durations) if blink_durations else 0,
        "Pupil_mean": mean_pupil,
        "Pupil_IPA": ipa
    }
def compute_features_from_parquet(parquet_path):
    """Load a gaze parquet file and compute its eye-tracking features.

    Returns the feature dict from extract_eye_features, or None when the
    file contains no usable EYE_* data.
    """
    raw = pd.read_parquet(parquet_path)
    eye_df = clean_eye_df(raw)
    if eye_df.empty:
        return None
    return extract_eye_features(
        eye_df,
        fs=SAMPLING_RATE,
        min_dur_blinks=MIN_DUR_BLINKS
    )