- added implementation to create 50sec video files out of camera lifestream for every 5 seconds

- added action unit recognition to the camera stream
This commit is contained in:
TimoKurz 2026-02-07 11:27:40 +01:00
parent 9951d8b4f9
commit 832a765575
2 changed files with 141 additions and 9 deletions

View File

@ -3,11 +3,11 @@ from feat.utils.io import get_test_data_path
from moviepy.video.io.VideoFileClip import VideoFileClip from moviepy.video.io.VideoFileClip import VideoFileClip
import os import os
def extract_aus(path, model): def extract_aus(path, model, skip_frames):
detector = Detector(au_model=model) detector = Detector(au_model=model)
video_prediction = detector.detect( video_prediction = detector.detect(
path, data_type="video", skip_frames=24*5, face_detection_threshold=0.95 # alle 5 Sekunden einbeziehen - 24 Frames pro Sekunde path, data_type="video", skip_frames=skip_frames, face_detection_threshold=0.95 # alle 5 Sekunden einbeziehen - 24 Frames pro Sekunde
) )
return video_prediction.aus.sum() return video_prediction.aus.sum()
@ -38,13 +38,13 @@ def split_video(path, chunk_length=120):
return output_path return output_path
def start(path): # def start(path):
results = [] # results = []
clips = split_video(path) # clips = split_video(path)
for clip in clips: # for clip in clips:
results.append(extract_aus(clip, 'svm')) # results.append(extract_aus(clip, 'svm', 25*5))
return results # return results
if __name__ == "__main__": if __name__ == "__main__":
results = [] results = []
@ -53,6 +53,6 @@ if __name__ == "__main__":
clips = split_video(test_video_path) clips = split_video(test_video_path)
for clippath in clips: for clippath in clips:
results.append(extract_aus(clippath, 'svm')) results.append(extract_aus(clippath, 'svm', 25*5))
print(results) print(results)

View File

@ -0,0 +1,132 @@
import cv2
import time
import os
import threading
from datetime import datetime
from feat import Detector
import torch
# Konfiguration
CAMERA_INDEX = 0
OUTPUT_DIR = "recordings"
VIDEO_DURATION = 50 # Sekunden
START_INTERVAL = 5 # Sekunden bis zum nächsten Start
FPS = 25.0 # Feste FPS
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
# Globaler Detector, um ihn nicht bei jedem Video neu laden zu müssen (spart massiv Zeit/Speicher)
print("Initialisiere AU-Detector (bitte warten)...")
detector = Detector(au_model="xgb")
def extract_aus(path, skip_frames):
# torch.no_grad() deaktiviert die Gradientenberechnung.
# Das löst den "Can't call numpy() on Tensor that requires grad" Fehler.
with torch.no_grad():
video_prediction = detector.detect_video(
path,
skip_frames=skip_frames,
face_detection_threshold=0.95
)
# Falls video_prediction oder .aus noch Tensoren sind,
# stellen wir sicher, dass sie korrekt summiert werden.
try:
# Wir nehmen die Summe der Action Units über alle detektierten Frames
res = video_prediction.aus.sum()
return res
except Exception as e:
print(f"Fehler bei der Summenbildung: {e}")
return 0
def startAU_creation(video_path):
"""Diese Funktion läuft nun in einem eigenen Thread."""
try:
print(f"\n[THREAD START] Analyse läuft für: {video_path}")
# skip_frames berechnen (z.B. alle 5 Sekunden bei 25 FPS = 125)
output = extract_aus(video_path, skip_frames=int(FPS*5))
print(f"\n--- Ergebnis für {os.path.basename(video_path)} ---")
print(output)
print("--------------------------------------------------\n")
except Exception as e:
print(f"Fehler bei der Analyse von {video_path}: {e}")
class VideoRecorder:
def __init__(self, filename, width, height):
self.filename = filename
fourcc = cv2.VideoWriter_fourcc(*'XVID')
self.out = cv2.VideoWriter(filename, fourcc, FPS, (width, height))
self.frames_to_record = int(VIDEO_DURATION * FPS)
self.frames_count = 0
self.is_finished = False
def write_frame(self, frame):
if self.frames_count < self.frames_to_record:
self.out.write(frame)
self.frames_count += 1
else:
self.finish()
def finish(self):
if not self.is_finished:
self.out.release()
self.is_finished = True
abs_path = os.path.abspath(self.filename)
print(f"Video fertig gespeichert: {self.filename}")
# --- MULTITHREADING HIER ---
# Wir starten die Analyse in einem neuen Thread, damit main() sofort weiter frames lesen kann
analysis_thread = threading.Thread(target=startAU_creation, args=(abs_path,))
analysis_thread.daemon = True # Beendet sich, wenn das Hauptprogramm schließt
analysis_thread.start()
def main():
cap = cv2.VideoCapture(CAMERA_INDEX)
if not cap.isOpened():
print("Fehler: Kamera konnte nicht geöffnet werden.")
return
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
active_recorders = []
last_start_time = 0
print("Aufnahme läuft. Drücke 'q' zum Beenden.")
try:
while True:
ret, frame = cap.read()
if not ret:
break
current_time = time.time()
if current_time - last_start_time >= START_INTERVAL:
timestamp = datetime.now().strftime("%H%M%S")
filename = os.path.join(OUTPUT_DIR, f"rec_{timestamp}.avi")
new_recorder = VideoRecorder(filename, width, height)
active_recorders.append(new_recorder)
last_start_time = current_time
for rec in active_recorders[:]:
rec.write_frame(frame)
if rec.is_finished:
active_recorders.remove(rec)
cv2.imshow('Kamera Livestream', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
time.sleep(1/FPS)
finally:
cap.release()
cv2.destroyAllWindows()
print("Programm beendet. Warte ggf. auf laufende Analysen...")
if __name__ == "__main__":
main()