from collections import deque import threading import time import cv2 import pyramids import heartrate import preprocessing import eulerian import numpy as np class main(): def __init__(self): # Frequency range for Fast-Fourier Transform self.freq_min = 1 self.freq_max = 5 self.BUFFER_LEN = 10 self.BUFFER = deque(maxlen=self.BUFFER_LEN) self.FPS_BUFFER = deque(maxlen=self.BUFFER_LEN) self.buffer_lock = threading.Lock() self.FPS = [] def video(self): cap = cv2.VideoCapture(0) print("VideoCapture") while len(self.BUFFER) < self.BUFFER_LEN: start_time = time.time() ret, frame = cap.read() frame = cv2.resize(frame, (500, 500)) # why resize if we later call a pyramid? self.BUFFER.append(frame) stop_time = time.time() self.FPS_BUFFER.append(stop_time-start_time) self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER))) print("Buffer ready") while True: start_time = time.time() ret, frame = cap.read() frame = cv2.resize(frame, (500, 500)) self.BUFFER.append(frame) stop_time = time.time() self.FPS_BUFFER.append(stop_time-start_time) #threading.Event().wait(0.02) self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER))) def processing(self): # Build Laplacian video pyramid while True: with self.buffer_lock: PROCESS_BUFFER = np.array(self.BUFFER) lap_video = pyramids.build_video_pyramid(PROCESS_BUFFER) amplified_video_pyramid = [] for i, video in enumerate(lap_video): if i == 0 or i == len(lap_video)-1: continue # Eulerian magnification with temporal FFT filtering result, fft, frequencies = eulerian.fft_filter(video, self.freq_min, self.freq_max, self.FPS) lap_video[i] += result # Calculate heart rate heart_rate = heartrate.find_heart_rate(fft, frequencies, self.freq_min, self.freq_max) # Collapse laplacian pyramid to generate final video #amplified_frames = pyramids.collapse_laplacian_video_pyramid(lap_video, len(self.BUFFER)) # Output heart rate and final video print("Heart rate: ", heart_rate, "bpm") threading.Event().wait(2) if __name__ == '__main__': MAIN = main() video_thread = threading.Thread(target=MAIN.video) processing_thread = threading.Thread(target=MAIN.processing) # Starte die Threads video_thread.start() time.sleep(2) print("__SYNCING___") processing_thread.start()