@@ -1,2 +0,0 @@
# EVM
@@ -0,0 +1,18 @@
import numpy as np
import scipy.fftpack as fftpack

# Temporal bandpass filter with Fast-Fourier Transform
def fft_filter(video, freq_min, freq_max, fps):
    fft = fftpack.fft(video, axis=0)
    frequencies = fftpack.fftfreq(video.shape[0], d=1.0 / fps)
    bound_low = (np.abs(frequencies - freq_min)).argmin()
    bound_high = (np.abs(frequencies - freq_max)).argmin()
    # Guard against bound_low == 0, where fft[-0:] would zero the whole array
    bound_low = max(bound_low, 1)
    # Zero every bin outside [freq_min, freq_max] in both spectrum halves
    fft[:bound_low] = 0
    fft[bound_high:-bound_high] = 0
    fft[-bound_low:] = 0
    iff = fftpack.ifft(fft, axis=0)
    result = np.abs(iff)
    result *= 100  # Amplification factor
    return result, fft, frequencies
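
A minimal usage sketch, not part of the diff: drive fft_filter with a synthetic clip carrying a known 1.5 Hz oscillation, assuming the clip is a (frames, height, width) float array sampled at 30 fps.

t = np.arange(300) / 30.0  # 10 s at 30 fps
clip = np.sin(2 * np.pi * 1.5 * t)[:, None, None] * np.ones((1, 4, 4))  # 1.5 Hz = 90 bpm
result, fft, freqs = fft_filter(clip, freq_min=1, freq_max=5, fps=30)
print(result.shape)  # (300, 4, 4): amplified, band-passed signal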
@@ -0,0 +1,25 @@
from scipy import signal

# Calculate heart rate in bpm from the strongest FFT peak in the passband
def find_heart_rate(fft, freqs, freq_min, freq_max):
    # Max FFT amplitude per frequency bin, zeroed outside the passband
    fft_maximums = []
    for i in range(fft.shape[0]):
        if freq_min <= freqs[i] <= freq_max:
            fft_map = abs(fft[i])
            fft_maximums.append(fft_map.max())
        else:
            fft_maximums.append(0)

    peaks, properties = signal.find_peaks(fft_maximums)

    # Find frequency with max amplitude among the peaks
    max_peak = -1
    max_amplitude = 0
    for peak in peaks:
        if fft_maximums[peak] > max_amplitude:
            max_amplitude = fft_maximums[peak]
            max_peak = peak

    if max_peak == -1:
        return None  # No peak found in the passband
    return freqs[max_peak] * 60  # Convert Hz to beats per minute
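
An end-to-end sketch under the same synthetic-clip assumption as above: the FFT peak sits at 1.5 Hz, so the detected rate should come out near 90 bpm (eulerian is the module containing fft_filter).

import numpy as np
import eulerian

t = np.arange(300) / 30.0
clip = np.sin(2 * np.pi * 1.5 * t)[:, None, None] * np.ones((1, 4, 4))
_, fft, freqs = eulerian.fft_filter(clip, 1, 5, fps=30)
print(find_heart_rate(fft, freqs, 1, 5))  # ~90.0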
@@ -0,0 +1,94 @@
from collections import deque
import threading
import time

import cv2
import numpy as np

import pyramids
import heartrate
import preprocessing
import eulerian


class Main:
    def __init__(self):
        # Frequency range for Fast-Fourier Transform (1-5 Hz covers 60-300 bpm)
        self.freq_min = 1
        self.freq_max = 5
        self.BUFFER_LEN = 10
        self.BUFFER = deque(maxlen=self.BUFFER_LEN)
        self.FPS_BUFFER = deque(maxlen=self.BUFFER_LEN)
        self.buffer_lock = threading.Lock()
        self.FPS = 0  # Estimated capture rate, set once the buffer fills

    def video(self):
        cap = cv2.VideoCapture(0)

        # Fill the frame buffer before processing starts
        while len(self.BUFFER) < self.BUFFER_LEN:
            start_time = time.time()
            ret, frame = cap.read()
            if not ret:
                continue
            frame = cv2.resize(frame, (500, 500))
            with self.buffer_lock:
                self.BUFFER.append(frame)
            stop_time = time.time()
            self.FPS_BUFFER.append(stop_time - start_time)

        self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER)))
        print("Buffer ready")

        # Keep the rolling buffer and the FPS estimate up to date
        while True:
            start_time = time.time()
            ret, frame = cap.read()
            if not ret:
                continue
            frame = cv2.resize(frame, (500, 500))
            with self.buffer_lock:
                self.BUFFER.append(frame)
            stop_time = time.time()
            self.FPS_BUFFER.append(stop_time - start_time)
            #threading.Event().wait(0.02)
            self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER)))

    def processing(self):
        while True:
            # Snapshot the buffer under the lock, then work on the copy
            with self.buffer_lock:
                PROCESS_BUFFER = np.array(self.BUFFER)

            # Build Laplacian video pyramid
            lap_video = pyramids.build_video_pyramid(PROCESS_BUFFER)

            for i, video in enumerate(lap_video):
                # Skip the finest level and the Gaussian residual
                if i == 0 or i == len(lap_video) - 1:
                    continue

                # Eulerian magnification with temporal FFT filtering
                result, fft, frequencies = eulerian.fft_filter(video, self.freq_min, self.freq_max, self.FPS)
                lap_video[i] += result

                # Calculate heart rate
                heart_rate = heartrate.find_heart_rate(fft, frequencies, self.freq_min, self.freq_max)

                # Collapse Laplacian pyramid to generate final video
                #amplified_frames = pyramids.collapse_laplacian_video_pyramid(lap_video, len(self.BUFFER))

                # Output heart rate
                if heart_rate is not None:
                    print("Heart rate: ", heart_rate, "bpm")

            threading.Event().wait(2)


if __name__ == '__main__':
    MAIN = Main()
    video_thread = threading.Thread(target=MAIN.video)
    processing_thread = threading.Thread(target=MAIN.processing)

    # Start the threads
    video_thread.start()
    time.sleep(2)
    print("__SYNCING___")
    processing_thread.start()
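
Both loops above run forever, so the script only stops when the process is killed. One possible variant, an assumption rather than part of the diff, marks the threads as daemons and parks the main thread so Ctrl+C ends everything cleanly:

video_thread = threading.Thread(target=MAIN.video, daemon=True)
processing_thread = threading.Thread(target=MAIN.processing, daemon=True)
video_thread.start()
time.sleep(2)
processing_thread.start()
try:
    while True:
        time.sleep(1)  # Keep the main thread alive; Ctrl+C exits
except KeyboardInterrupt:
    pass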
@@ -0,0 +1,38 @@
import cv2
import numpy as np

faceCascade = cv2.CascadeClassifier("haarcascades/haarcascade_frontalface_alt.xml")

# Read in and simultaneously preprocess video
def read_video(path):
    cap = cv2.VideoCapture(path)
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    video_frames = []
    face_rects = ()

    while cap.isOpened():
        ret, img = cap.read()
        if not ret:
            break
        # OpenCV delivers BGR frames, so convert with COLOR_BGR2GRAY
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        roi_frame = img

        # Detect face on the first frame only and reuse the rectangles
        if len(video_frames) == 0:
            face_rects = faceCascade.detectMultiScale(gray, 1.3, 5)

        # Select ROI
        if len(face_rects) > 0:
            for (x, y, w, h) in face_rects:
                roi_frame = img[y:y + h, x:x + w]
            if roi_frame.size != img.size:
                roi_frame = cv2.resize(roi_frame, (500, 500))
                # Scale pixel values to [0, 1]
                frame = roi_frame.astype("float") / 255.0
                video_frames.append(frame)

    frame_ct = len(video_frames)
    cap.release()
    return video_frames, frame_ct, fps
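
A quick usage sketch; the clip name is hypothetical and assumes the Haar cascade path resolves:

frames, frame_ct, fps = read_video("face_clip.mp4")  # hypothetical input file
print(frame_ct, "face-ROI frames at", fps, "fps")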
@@ -0,0 +1,73 @@
import cv2
import numpy as np

# Build Gaussian image pyramid
def build_gaussian_pyramid(img, levels):
    float_img = img.astype("float")
    pyramid = [float_img]
    for i in range(levels - 1):
        float_img = cv2.pyrDown(float_img)
        pyramid.append(float_img)
    return pyramid

# Build Laplacian image pyramid from Gaussian pyramid
def build_laplacian_pyramid(img, levels):
    gaussian_pyramid = build_gaussian_pyramid(img, levels)
    laplacian_pyramid = []
    for i in range(levels - 1):
        upsampled = cv2.pyrUp(gaussian_pyramid[i + 1])
        (height, width, depth) = upsampled.shape
        # cv2.resize expects dsize as (width, height)
        gaussian_pyramid[i] = cv2.resize(gaussian_pyramid[i], (width, height))
        diff = cv2.subtract(gaussian_pyramid[i], upsampled)
        laplacian_pyramid.append(diff)
    laplacian_pyramid.append(gaussian_pyramid[-1])
    return laplacian_pyramid

# Build video pyramid by building Laplacian pyramid for each frame
def build_video_pyramid(frames):
    lap_video = []
    for i, frame in enumerate(frames):
        pyramid = build_laplacian_pyramid(frame, 3)
        for j in range(3):
            if i == 0:
                lap_video.append(np.zeros((len(frames), pyramid[j].shape[0], pyramid[j].shape[1], 3)))
            lap_video[j][i] = pyramid[j]
    return lap_video

# Collapse video pyramid by collapsing each frame's Laplacian pyramid
def collapse_laplacian_video_pyramid(video, frame_ct):
    collapsed_video = []
    for i in range(frame_ct):
        prev_frame = video[-1][i]
        for level in range(len(video) - 1, 0, -1):
            pyr_up_frame = cv2.pyrUp(prev_frame)
            (height, width, depth) = pyr_up_frame.shape
            prev_level_frame = video[level - 1][i]
            # cv2.resize expects dsize as (width, height)
            prev_level_frame = cv2.resize(prev_level_frame, (width, height))
            prev_frame = pyr_up_frame + prev_level_frame

        # Normalize pixel values to [0, 255]
        min_val = min(0.0, prev_frame.min())
        prev_frame = prev_frame - min_val  # Shift so the minimum is non-negative
        max_val = max(1.0, prev_frame.max())
        prev_frame = prev_frame / max_val
        prev_frame = prev_frame * 255

        prev_frame = cv2.convertScaleAbs(prev_frame)
        collapsed_video.append(prev_frame)
    return collapsed_video
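
A round-trip sketch, not part of the diff: build and collapse the pyramid for a small synthetic clip and check that shapes and dtype survive.

frames = np.random.rand(4, 64, 64, 3)  # 4 synthetic 64x64 RGB frames in [0, 1]
lap_video = build_video_pyramid(frames)
print([level.shape for level in lap_video])  # (4,64,64,3), (4,32,32,3), (4,16,16,3)
collapsed = collapse_laplacian_video_pyramid(lap_video, frame_ct=4)
print(collapsed[0].shape, collapsed[0].dtype)  # (64, 64, 3) uint8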
@@ -1,3 +0,0 @@ | |||
print("Hallo, Welt!") | |||
print("Hallo, Welt 2 !") | |||
print("Hallo, Welt 3 !") |