EVM/main.py

96 lines
2.7 KiB
Python

from collections import deque
import threading
import time
import cv2
import pyramids
import heartrate
import preprocessing
import eulerian
import numpy as np
class main():
def __init__(self):
# Frequency range for Fast-Fourier Transform
self.freq_min = 1
self.freq_max = 5
self.BUFFER_LEN = 10
self.BUFFER = deque(maxlen=self.BUFFER_LEN)
self.FPS_BUFFER = deque(maxlen=self.BUFFER_LEN)
self.buffer_lock = threading.Lock()
self.FPS = []
def video(self):
cap = cv2.VideoCapture(0)
while len(self.BUFFER) < self.BUFFER_LEN:
start_time = time.time()
ret, frame = cap.read()
frame = cv2.resize(frame, (500, 500))
self.BUFFER.append(frame)
stop_time = time.time()
self.FPS_BUFFER.append(stop_time-start_time)
self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER)))
print("Buffer ready")
while True:
start_time = time.time()
ret, frame = cap.read()
frame = cv2.resize(frame, (500, 500))
self.BUFFER.append(frame)
stop_time = time.time()
self.FPS_BUFFER.append(stop_time-start_time)
#threading.Event().wait(0.02)
self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER)))
def processing(self):
# Build Laplacian video pyramid
while True:
with self.buffer_lock:
PROCESS_BUFFER = np.array(self.BUFFER)
lap_video = pyramids.build_video_pyramid(PROCESS_BUFFER)
amplified_video_pyramid = []
for i, video in enumerate(lap_video):
if i == 0 or i == len(lap_video)-1:
continue
# Eulerian magnification with temporal FFT filtering
result, fft, frequencies = eulerian.fft_filter(video, self.freq_min, self.freq_max, self.FPS)
lap_video[i] += result
# Calculate heart rate
heart_rate = heartrate.find_heart_rate(fft, frequencies, self.freq_min, self.freq_max)
# Collapse laplacian pyramid to generate final video
#amplified_frames = pyramids.collapse_laplacian_video_pyramid(lap_video, len(self.BUFFER))
# Output heart rate and final video
print("Heart rate: ", heart_rate, "bpm")
threading.Event().wait(2)
if __name__ == '__main__':
MAIN = main()
video_thread = threading.Thread(target=MAIN.video)
processing_thread = threading.Thread(target=MAIN.processing)
# Starte die Threads
video_thread.start()
time.sleep(2)
print("__SYNCING___")
processing_thread.start()