|
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- from collections import deque
- import threading
- import time
-
- import cv2
- import pyramids
- import heartrate
- import preprocessing
- import eulerian
- import numpy as np
-
class main():
    """Estimate a heart rate from webcam frames.

    `video` fills a rolling buffer with resized camera frames, while
    `processing` periodically snapshots that buffer, builds a video pyramid,
    runs the temporal FFT filter on the intermediate levels and prints the
    resulting heart rate. The two methods are meant to run in separate
    threads and share `BUFFER` under `buffer_lock`.
    """

    def __init__(self):
        # Frequency range for Fast-Fourier Transform
        # (presumably in Hz -- confirm against eulerian.fft_filter).
        self.freq_min = 1
        self.freq_max = 5
        self.BUFFER_LEN = 10
        # Rolling window of the most recent frames.
        self.BUFFER = deque(maxlen=self.BUFFER_LEN)
        # Rolling window of per-frame capture durations, in seconds.
        self.FPS_BUFFER = deque(maxlen=self.BUFFER_LEN)
        # Guards BUFFER, which is written by `video` and read by `processing`.
        self.buffer_lock = threading.Lock()
        # Frames per second derived from FPS_BUFFER; 0 until a frame was timed.
        self.FPS = 0

    def video(self):
        """Capture frames from camera 0 into the rolling buffer, forever.

        Prints "Buffer ready" once the buffer holds `BUFFER_LEN` frames and
        then keeps replacing the oldest frame with the newest one.

        Raises:
            RuntimeError: if the camera cannot be opened or stops
                delivering frames.
        """
        cap = cv2.VideoCapture(0)
        try:
            if not cap.isOpened():
                raise RuntimeError("Could not open video capture device 0")

            # Only this thread appends to BUFFER, so its length can be read
            # here without taking the lock.
            while len(self.BUFFER) < self.BUFFER_LEN:
                self._capture_frame(cap)

            print("Buffer ready")

            while True:
                self._capture_frame(cap)
        finally:
            # Always hand the camera back, including when a read fails.
            cap.release()

    def _capture_frame(self, cap):
        """Read one frame from `cap`, resize it, buffer it and time the step."""
        start_time = time.perf_counter()
        ret, frame = cap.read()
        if not ret or frame is None:
            raise RuntimeError("Failed to read a frame from the camera")
        frame = cv2.resize(frame, (500, 500))
        with self.buffer_lock:
            self.BUFFER.append(frame)
        self._record_frame_time(time.perf_counter() - start_time)

    def _record_frame_time(self, duration):
        """Store a capture duration (seconds) and refresh the `FPS` estimate."""
        self.FPS_BUFFER.append(duration)
        mean_duration = float(np.mean(np.array(self.FPS_BUFFER)))
        # A zero mean would divide by zero; keep the previous estimate.
        if mean_duration > 0:
            self.FPS = round(1 / mean_duration)

    def processing(self):
        """Print a heart-rate estimate for the buffered frames every 2 seconds.

        Runs forever. A cycle is skipped (with a short wait) while the frame
        buffer is not yet full or no frame rate has been measured.
        """
        while True:
            # Hold the lock only for the snapshot so capturing is not blocked
            # during the expensive pyramid/FFT work.
            with self.buffer_lock:
                buffer_full = len(self.BUFFER) == self.BUFFER_LEN
                frames = np.array(self.BUFFER) if buffer_full else None
            fps = self.FPS

            if frames is None or not fps:
                time.sleep(0.1)
                continue

            heart_rate = self._estimate_heart_rate(frames, fps)

            # Output heart rate
            if heart_rate is None:
                print("Heart rate: n/a (video pyramid has no intermediate levels)")
            else:
                print("Heart rate: ", heart_rate, "bpm")

            time.sleep(2)

    def _estimate_heart_rate(self, frames, fps):
        """Return the heart rate found in `frames`, or None if none was computed.

        Every pyramid level except the first and the last is filtered; the
        value from the last filtered level is returned.
        """
        # Build Laplacian video pyramid
        lap_video = pyramids.build_video_pyramid(frames)
        last_level = len(lap_video) - 1

        heart_rate = None
        for i, video in enumerate(lap_video):
            if i == 0 or i == last_level:
                continue

            # Eulerian magnification with temporal FFT filtering
            result, fft, frequencies = eulerian.fft_filter(
                video, self.freq_min, self.freq_max, fps
            )
            # NOTE: the amplified level is only needed by the pyramid
            # collapse step (pyramids.collapse_laplacian_video_pyramid),
            # which is currently not run.
            lap_video[i] += result

            # Calculate heart rate
            heart_rate = heartrate.find_heart_rate(
                fft, frequencies, self.freq_min, self.freq_max
            )

        return heart_rate
-
-
-
if __name__ == '__main__':
    MAIN = main()

    # Both workers loop forever, so run them as daemon threads: otherwise the
    # interpreter waits on them at exit and Ctrl+C cannot end the program.
    video_thread = threading.Thread(target=MAIN.video, daemon=True)
    processing_thread = threading.Thread(target=MAIN.processing, daemon=True)

    # Start the threads; give the capture thread a head start so frames are
    # available before processing begins.
    video_thread.start()
    time.sleep(2)
    print("__SYNCING___")
    processing_thread.start()

    # Keep the main thread alive while both workers run. Joining with a
    # timeout keeps it responsive to KeyboardInterrupt.
    try:
        while video_thread.is_alive() and processing_thread.is_alive():
            video_thread.join(timeout=0.5)
    except KeyboardInterrupt:
        pass
-
-
-
-
-
|