diff --git a/eulerian.py b/eulerian.py
deleted file mode 100644
index 729f49f..0000000
--- a/eulerian.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import numpy as np
-import scipy.fftpack as fftpack
-
-
-# Temporal bandpass filter using a Fast Fourier Transform
-def fft_filter(video, freq_min, freq_max, fps):
-    fft = fftpack.fft(video, axis=0)
-    frequencies = fftpack.fftfreq(video.shape[0], d=1.0 / fps)
-    bound_low = (np.abs(frequencies - freq_min)).argmin()
-    bound_high = (np.abs(frequencies - freq_max)).argmin()
-    fft[:bound_low] = 0
-    fft[bound_high:-bound_high] = 0
-    fft[-bound_low:] = 0
-    iff = fftpack.ifft(fft, axis=0)
-    result = np.abs(iff)
-    result *= 100  # Amplification factor
-
-    return result, fft, frequencies
\ No newline at end of file
diff --git a/heartrate.py b/heartrate.py
deleted file mode 100644
index 3168300..0000000
--- a/heartrate.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from scipy import signal
-
-
-# Calculate heart rate (bpm) from the dominant FFT peak
-def find_heart_rate(fft, freqs, freq_min, freq_max):
-    fft_maximums = []
-
-    for i in range(fft.shape[0]):
-        if freq_min <= freqs[i] <= freq_max:
-            fft_map = abs(fft[i])
-            fft_maximums.append(fft_map.max())
-        else:
-            fft_maximums.append(0)
-
-    peaks, properties = signal.find_peaks(fft_maximums)
-    max_peak = -1
-    max_amp = 0
-
-    # Find the peak frequency with the largest amplitude
-    for peak in peaks:
-        if fft_maximums[peak] > max_amp:
-            max_amp = fft_maximums[peak]
-            max_peak = peak
-
-    return freqs[max_peak] * 60
diff --git a/main.py b/main.py
deleted file mode 100644
index 2906e28..0000000
--- a/main.py
+++ /dev/null
@@ -1,95 +0,0 @@
-from collections import deque
-import threading
-import time
-
-import cv2
-import pyramids
-import heartrate
-import preprocessing
-import eulerian
-import numpy as np
-
-class Main:
-    def __init__(self):
-        # Frequency range (Hz) for the FFT bandpass
-        self.freq_min = 1
-        self.freq_max = 5
-        self.BUFFER_LEN = 10
-        self.BUFFER = deque(maxlen=self.BUFFER_LEN)
-        self.FPS_BUFFER = deque(maxlen=self.BUFFER_LEN)
-        self.buffer_lock = threading.Lock()
-        self.FPS = []
-
-    def video(self):
-        cap = cv2.VideoCapture(0)
-
-        while len(self.BUFFER) < self.BUFFER_LEN:
-            start_time = time.time()
-            ret, frame = cap.read()
-            frame = cv2.resize(frame, (500, 500))
-            self.BUFFER.append(frame)
-            stop_time = time.time()
-            self.FPS_BUFFER.append(stop_time - start_time)
-        self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER)))
-
-        print("Buffer ready")
-
-
-        while True:
-            start_time = time.time()
-            ret, frame = cap.read()
-            frame = cv2.resize(frame, (500, 500))
-            self.BUFFER.append(frame)
-            stop_time = time.time()
-            self.FPS_BUFFER.append(stop_time - start_time)
-            #threading.Event().wait(0.02)
-            self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER)))
-
-
-
-    def processing(self):
-        # Build Laplacian video pyramid
-        while True:
-            with self.buffer_lock:
-                PROCESS_BUFFER = np.array(self.BUFFER)
-            lap_video = pyramids.build_video_pyramid(PROCESS_BUFFER)
-
-            amplified_video_pyramid = []
-
-            for i, video in enumerate(lap_video):
-                if i == 0 or i == len(lap_video) - 1:
-                    continue
-
-                # Eulerian magnification with temporal FFT filtering
-                result, fft, frequencies = eulerian.fft_filter(video, self.freq_min, self.freq_max, self.FPS)
-                lap_video[i] += result
-
-                # Calculate heart rate
-                heart_rate = heartrate.find_heart_rate(fft, frequencies, self.freq_min, self.freq_max)
-
-            # Collapse Laplacian pyramid to generate final video
-            #amplified_frames = pyramids.collapse_laplacian_video_pyramid(lap_video, len(self.BUFFER))
-
-            # Output heart rate and final video
print("Heart rate: ", heart_rate, "bpm") - - threading.Event().wait(2) - - - -if __name__ == '__main__': - MAIN = main() - - video_thread = threading.Thread(target=MAIN.video) - processing_thread = threading.Thread(target=MAIN.processing) - - # Starte die Threads - video_thread.start() - time.sleep(2) - print("__SYNCING___") - processing_thread.start() - - - - - diff --git a/preprocessing.py b/preprocessing.py deleted file mode 100644 index a62eb40..0000000 --- a/preprocessing.py +++ /dev/null @@ -1,38 +0,0 @@ -import cv2 -import numpy as np - -faceCascade = cv2.CascadeClassifier("haarcascades/haarcascade_frontalface_alt0.xml") - - -# Read in and simultaneously preprocess video -def read_video(path): - cap = cv2.VideoCapture(path) - fps = int(cap.get(cv2.CAP_PROP_FPS)) - video_frames = [] - face_rects = () - - while cap.isOpened(): - ret, img = cap.read() - if not ret: - break - gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) - roi_frame = img - - # Detect face - if len(video_frames) == 0: - face_rects = faceCascade.detectMultiScale(gray, 1.3, 5) - - # Select ROI - if len(face_rects) > 0: - for (x, y, w, h) in face_rects: - roi_frame = img[y:y + h, x:x + w] - if roi_frame.size != img.size: - roi_frame = cv2.resize(roi_frame, (500, 500)) - frame = np.ndarray(shape=roi_frame.shape, dtype="float") - frame[:] = roi_frame * (1. / 255) - video_frames.append(frame) - - frame_ct = len(video_frames) - cap.release() - - return video_frames, frame_ct, fps diff --git a/pyramids.py b/pyramids.py deleted file mode 100644 index 814df40..0000000 --- a/pyramids.py +++ /dev/null @@ -1,73 +0,0 @@ -import cv2 -import numpy as np - - -# Build Gaussian image pyramid -def build_gaussian_pyramid(img, levels): - float_img = np.ndarray(shape=img.shape, dtype="float") - float_img[:] = img - pyramid = [float_img] - - for i in range(levels-1): - float_img = cv2.pyrDown(float_img) - pyramid.append(float_img) - - return pyramid - - -# Build Laplacian image pyramid from Gaussian pyramid -def build_laplacian_pyramid(img, levels): - gaussian_pyramid = build_gaussian_pyramid(img, levels) - laplacian_pyramid = [] - - for i in range(levels-1): - upsampled = cv2.pyrUp(gaussian_pyramid[i+1]) - (height, width, depth) = upsampled.shape - gaussian_pyramid[i] = cv2.resize(gaussian_pyramid[i], (height, width)) - diff = cv2.subtract(gaussian_pyramid[i],upsampled) - laplacian_pyramid.append(diff) - - laplacian_pyramid.append(gaussian_pyramid[-1]) - - return laplacian_pyramid - - -# Build video pyramid by building Laplacian pyramid for each frame -def build_video_pyramid(frames): - lap_video = [] - - for i, frame in enumerate(frames): - pyramid = build_laplacian_pyramid(frame, 3) - for j in range(3): - if i == 0: - lap_video.append(np.zeros((len(frames), pyramid[j].shape[0], pyramid[j].shape[1], 3))) - lap_video[j][i] = pyramid[j] - - return lap_video - - -# Collapse video pyramid by collapsing each frame's Laplacian pyramid -def collapse_laplacian_video_pyramid(video, frame_ct): - collapsed_video = [] - - for i in range(frame_ct): - prev_frame = video[-1][i] - - for level in range(len(video) - 1, 0, -1): - pyr_up_frame = cv2.pyrUp(prev_frame) - (height, width, depth) = pyr_up_frame.shape - prev_level_frame = video[level - 1][i] - prev_level_frame = cv2.resize(prev_level_frame, (height, width)) - prev_frame = pyr_up_frame + prev_level_frame - - # Normalize pixel values - min_val = min(0.0, prev_frame.min()) - prev_frame = prev_frame + min_val - max_val = max(1.0, prev_frame.max()) - prev_frame = prev_frame / max_val - prev_frame = 
-        prev_frame = prev_frame * 255
-
-        prev_frame = cv2.convertScaleAbs(prev_frame)
-        collapsed_video.append(prev_frame)
-
-    return collapsed_video
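
Note for reviewers: the deleted modules formed one pipeline — preprocessing.read_video (file-based face-ROI extraction), pyramids (Laplacian video pyramid), eulerian.fft_filter (temporal bandpass plus amplification), and heartrate.find_heart_rate (peak picking). Below is a minimal offline sketch of that flow, assuming an illustrative clip path "face.mp4" (main.py itself fed the pipeline from a webcam and never called read_video):

import pyramids
import eulerian
import heartrate
import preprocessing

freq_min, freq_max = 1, 5  # heart-rate band in Hz (60-300 bpm), as in main.py

# Read the clip and crop each frame to the detected face ROI
video_frames, frame_ct, fps = preprocessing.read_video("face.mp4")  # illustrative path

# Build a Laplacian video pyramid: one (frames, h, w, 3) array per level
lap_video = pyramids.build_video_pyramid(video_frames)

# Bandpass-filter the middle pyramid levels and estimate the pulse
for i, level in enumerate(lap_video):
    if i == 0 or i == len(lap_video) - 1:
        continue
    result, fft, frequencies = eulerian.fft_filter(level, freq_min, freq_max, fps)
    lap_video[i] += result
    print("Heart rate:", heartrate.find_heart_rate(fft, frequencies, freq_min, freq_max), "bpm")

# Collapse the amplified pyramid back into displayable frames
amplified_frames = pyramids.collapse_laplacian_video_pyramid(lap_video, frame_ct)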