Jonathan Frei 11 months ago
parent
commit
bbcc1cd3da

+ 18
- 0
EVM ohne Buffer/eulerian.py View File

@@ -0,0 +1,18 @@
import numpy as np
import scipy.fftpack as fftpack


# Temporal bandpass filter with Fast-Fourier Transform
def fft_filter(video, freq_min, freq_max, fps):
fft = fftpack.fft(video, axis=0)
frequencies = fftpack.fftfreq(video.shape[0], d=1.0 / fps)
bound_low = (np.abs(frequencies - freq_min)).argmin()
bound_high = (np.abs(frequencies - freq_max)).argmin()
fft[:bound_low] = 0
fft[bound_high:-bound_high] = 0
fft[-bound_low:] = 0
iff = fftpack.ifft(fft, axis=0)
result = np.abs(iff)
result *= 100 # Amplification factor

return result, fft, frequencies

+ 25
- 0
EVM ohne Buffer/heartrate.py View File

@@ -0,0 +1,25 @@
from scipy import signal


# Calculate heart rate from FFT peaks
def find_heart_rate(fft, freqs, freq_min, freq_max):
fft_maximums = []

for i in range(fft.shape[0]):
if freq_min <= freqs[i] <= freq_max:
fftMap = abs(fft[i])
fft_maximums.append(fftMap.max())
else:
fft_maximums.append(0)

peaks, properties = signal.find_peaks(fft_maximums)
max_peak = -1
max_freq = 0

# Find frequency with max amplitude in peaks
for peak in peaks:
if fft_maximums[peak] > max_freq:
max_freq = fft_maximums[peak]
max_peak = peak

return freqs[max_peak] * 60

+ 95
- 0
EVM ohne Buffer/main.py View File

@@ -0,0 +1,95 @@
from collections import deque
import threading
import time

import cv2
import pyramids
import heartrate
import preprocessing
import eulerian
import numpy as np

class main():
def __init__(self):
# Frequency range for Fast-Fourier Transform
self.freq_min = 1
self.freq_max = 5
self.BUFFER_LEN = 10
self.BUFFER = deque(maxlen=self.BUFFER_LEN)
self.FPS_BUFFER = deque(maxlen=self.BUFFER_LEN)
self.buffer_lock = threading.Lock()
self.FPS = []

def video(self):
cap = cv2.VideoCapture(0)

while len(self.BUFFER) < self.BUFFER_LEN:
start_time = time.time()
ret, frame = cap.read()
frame = cv2.resize(frame, (500, 500))
self.BUFFER.append(frame)
stop_time = time.time()
self.FPS_BUFFER.append(stop_time-start_time)
self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER)))

print("Buffer ready")


while True:
start_time = time.time()
ret, frame = cap.read()
frame = cv2.resize(frame, (500, 500))
self.BUFFER.append(frame)
stop_time = time.time()
self.FPS_BUFFER.append(stop_time-start_time)
#threading.Event().wait(0.02)
self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER)))


def processing(self):
# Build Laplacian video pyramid
while True:
with self.buffer_lock:
PROCESS_BUFFER = np.array(self.BUFFER)
lap_video = pyramids.build_video_pyramid(PROCESS_BUFFER)

amplified_video_pyramid = []

for i, video in enumerate(lap_video):
if i == 0 or i == len(lap_video)-1:
continue

# Eulerian magnification with temporal FFT filtering
result, fft, frequencies = eulerian.fft_filter(video, self.freq_min, self.freq_max, self.FPS)
lap_video[i] += result

# Calculate heart rate
heart_rate = heartrate.find_heart_rate(fft, frequencies, self.freq_min, self.freq_max)

# Collapse laplacian pyramid to generate final video
#amplified_frames = pyramids.collapse_laplacian_video_pyramid(lap_video, len(self.BUFFER))

# Output heart rate and final video
print("Heart rate: ", heart_rate, "bpm")

threading.Event().wait(2)


if __name__ == '__main__':
MAIN = main()
video_thread = threading.Thread(target=MAIN.video)
processing_thread = threading.Thread(target=MAIN.processing)

# Starte die Threads
video_thread.start()
time.sleep(2)
print("__SYNCING___")
processing_thread.start()





+ 38
- 0
EVM ohne Buffer/preprocessing.py View File

@@ -0,0 +1,38 @@
import cv2
import numpy as np

faceCascade = cv2.CascadeClassifier("haarcascades/haarcascade_frontalface_alt0.xml")


# Read in and simultaneously preprocess video
def read_video(path):
cap = cv2.VideoCapture(path)
fps = int(cap.get(cv2.CAP_PROP_FPS))
video_frames = []
face_rects = ()

while cap.isOpened():
ret, img = cap.read()
if not ret:
break
gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
roi_frame = img

# Detect face
if len(video_frames) == 0:
face_rects = faceCascade.detectMultiScale(gray, 1.3, 5)

# Select ROI
if len(face_rects) > 0:
for (x, y, w, h) in face_rects:
roi_frame = img[y:y + h, x:x + w]
if roi_frame.size != img.size:
roi_frame = cv2.resize(roi_frame, (500, 500))
frame = np.ndarray(shape=roi_frame.shape, dtype="float")
frame[:] = roi_frame * (1. / 255)
video_frames.append(frame)

frame_ct = len(video_frames)
cap.release()

return video_frames, frame_ct, fps

+ 73
- 0
EVM ohne Buffer/pyramids.py View File

@@ -0,0 +1,73 @@
import cv2
import numpy as np


# Build Gaussian image pyramid
def build_gaussian_pyramid(img, levels):
float_img = np.ndarray(shape=img.shape, dtype="float")
float_img[:] = img
pyramid = [float_img]

for i in range(levels-1):
float_img = cv2.pyrDown(float_img)
pyramid.append(float_img)

return pyramid


# Build Laplacian image pyramid from Gaussian pyramid
def build_laplacian_pyramid(img, levels):
gaussian_pyramid = build_gaussian_pyramid(img, levels)
laplacian_pyramid = []

for i in range(levels-1):
upsampled = cv2.pyrUp(gaussian_pyramid[i+1])
(height, width, depth) = upsampled.shape
gaussian_pyramid[i] = cv2.resize(gaussian_pyramid[i], (height, width))
diff = cv2.subtract(gaussian_pyramid[i],upsampled)
laplacian_pyramid.append(diff)

laplacian_pyramid.append(gaussian_pyramid[-1])

return laplacian_pyramid


# Build video pyramid by building Laplacian pyramid for each frame
def build_video_pyramid(frames):
lap_video = []

for i, frame in enumerate(frames):
pyramid = build_laplacian_pyramid(frame, 3)
for j in range(3):
if i == 0:
lap_video.append(np.zeros((len(frames), pyramid[j].shape[0], pyramid[j].shape[1], 3)))
lap_video[j][i] = pyramid[j]

return lap_video


# Collapse video pyramid by collapsing each frame's Laplacian pyramid
def collapse_laplacian_video_pyramid(video, frame_ct):
collapsed_video = []

for i in range(frame_ct):
prev_frame = video[-1][i]

for level in range(len(video) - 1, 0, -1):
pyr_up_frame = cv2.pyrUp(prev_frame)
(height, width, depth) = pyr_up_frame.shape
prev_level_frame = video[level - 1][i]
prev_level_frame = cv2.resize(prev_level_frame, (height, width))
prev_frame = pyr_up_frame + prev_level_frame

# Normalize pixel values
min_val = min(0.0, prev_frame.min())
prev_frame = prev_frame + min_val
max_val = max(1.0, prev_frame.max())
prev_frame = prev_frame / max_val
prev_frame = prev_frame * 255

prev_frame = cv2.convertScaleAbs(prev_frame)
collapsed_video.append(prev_frame)

return collapsed_video

+ 18
- 0
eulerian.py View File

@@ -0,0 +1,18 @@
import numpy as np
import scipy.fftpack as fftpack


# Temporal bandpass filter with Fast-Fourier Transform
def fft_filter(video, freq_min, freq_max, fps):
fft = fftpack.fft(video, axis=0)
frequencies = fftpack.fftfreq(video.shape[0], d=1.0 / fps)
bound_low = (np.abs(frequencies - freq_min)).argmin()
bound_high = (np.abs(frequencies - freq_max)).argmin()
fft[:bound_low] = 0
fft[bound_high:-bound_high] = 0
fft[-bound_low:] = 0
iff = fftpack.ifft(fft, axis=0)
result = np.abs(iff)
result *= 100 # Amplification factor

return result, fft, frequencies

+ 25
- 0
heartrate.py View File

@@ -0,0 +1,25 @@
from scipy import signal


# Calculate heart rate from FFT peaks
def find_heart_rate(fft, freqs, freq_min, freq_max):
fft_maximums = []

for i in range(fft.shape[0]):
if freq_min <= freqs[i] <= freq_max:
fftMap = abs(fft[i])
fft_maximums.append(fftMap.max())
else:
fft_maximums.append(0)

peaks, properties = signal.find_peaks(fft_maximums)
max_peak = -1
max_freq = 0

# Find frequency with max amplitude in peaks
for peak in peaks:
if fft_maximums[peak] > max_freq:
max_freq = fft_maximums[peak]
max_peak = peak

return freqs[max_peak] * 60

+ 95
- 0
main.py View File

@@ -0,0 +1,95 @@
from collections import deque
import threading
import time

import cv2
import pyramids
import heartrate
import preprocessing
import eulerian
import numpy as np

class main():
def __init__(self):
# Frequency range for Fast-Fourier Transform
self.freq_min = 1
self.freq_max = 5
self.BUFFER_LEN = 10
self.BUFFER = deque(maxlen=self.BUFFER_LEN)
self.FPS_BUFFER = deque(maxlen=self.BUFFER_LEN)
self.buffer_lock = threading.Lock()
self.FPS = []

def video(self):
cap = cv2.VideoCapture(0)

while len(self.BUFFER) < self.BUFFER_LEN:
start_time = time.time()
ret, frame = cap.read()
frame = cv2.resize(frame, (500, 500))
self.BUFFER.append(frame)
stop_time = time.time()
self.FPS_BUFFER.append(stop_time-start_time)
self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER)))

print("Buffer ready")


while True:
start_time = time.time()
ret, frame = cap.read()
frame = cv2.resize(frame, (500, 500))
self.BUFFER.append(frame)
stop_time = time.time()
self.FPS_BUFFER.append(stop_time-start_time)
#threading.Event().wait(0.02)
self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER)))


def processing(self):
# Build Laplacian video pyramid
while True:
with self.buffer_lock:
PROCESS_BUFFER = np.array(self.BUFFER)
lap_video = pyramids.build_video_pyramid(PROCESS_BUFFER)

amplified_video_pyramid = []

for i, video in enumerate(lap_video):
if i == 0 or i == len(lap_video)-1:
continue

# Eulerian magnification with temporal FFT filtering
result, fft, frequencies = eulerian.fft_filter(video, self.freq_min, self.freq_max, self.FPS)
lap_video[i] += result

# Calculate heart rate
heart_rate = heartrate.find_heart_rate(fft, frequencies, self.freq_min, self.freq_max)

# Collapse laplacian pyramid to generate final video
#amplified_frames = pyramids.collapse_laplacian_video_pyramid(lap_video, len(self.BUFFER))

# Output heart rate and final video
print("Heart rate: ", heart_rate, "bpm")

threading.Event().wait(2)


if __name__ == '__main__':
MAIN = main()
video_thread = threading.Thread(target=MAIN.video)
processing_thread = threading.Thread(target=MAIN.processing)

# Starte die Threads
video_thread.start()
time.sleep(2)
print("__SYNCING___")
processing_thread.start()





+ 38
- 0
preprocessing.py View File

@@ -0,0 +1,38 @@
import cv2
import numpy as np

faceCascade = cv2.CascadeClassifier("haarcascades/haarcascade_frontalface_alt0.xml")


# Read in and simultaneously preprocess video
def read_video(path):
cap = cv2.VideoCapture(path)
fps = int(cap.get(cv2.CAP_PROP_FPS))
video_frames = []
face_rects = ()

while cap.isOpened():
ret, img = cap.read()
if not ret:
break
gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
roi_frame = img

# Detect face
if len(video_frames) == 0:
face_rects = faceCascade.detectMultiScale(gray, 1.3, 5)

# Select ROI
if len(face_rects) > 0:
for (x, y, w, h) in face_rects:
roi_frame = img[y:y + h, x:x + w]
if roi_frame.size != img.size:
roi_frame = cv2.resize(roi_frame, (500, 500))
frame = np.ndarray(shape=roi_frame.shape, dtype="float")
frame[:] = roi_frame * (1. / 255)
video_frames.append(frame)

frame_ct = len(video_frames)
cap.release()

return video_frames, frame_ct, fps

+ 73
- 0
pyramids.py View File

@@ -0,0 +1,73 @@
import cv2
import numpy as np


# Build Gaussian image pyramid
def build_gaussian_pyramid(img, levels):
float_img = np.ndarray(shape=img.shape, dtype="float")
float_img[:] = img
pyramid = [float_img]

for i in range(levels-1):
float_img = cv2.pyrDown(float_img)
pyramid.append(float_img)

return pyramid


# Build Laplacian image pyramid from Gaussian pyramid
def build_laplacian_pyramid(img, levels):
gaussian_pyramid = build_gaussian_pyramid(img, levels)
laplacian_pyramid = []

for i in range(levels-1):
upsampled = cv2.pyrUp(gaussian_pyramid[i+1])
(height, width, depth) = upsampled.shape
gaussian_pyramid[i] = cv2.resize(gaussian_pyramid[i], (height, width))
diff = cv2.subtract(gaussian_pyramid[i],upsampled)
laplacian_pyramid.append(diff)

laplacian_pyramid.append(gaussian_pyramid[-1])

return laplacian_pyramid


# Build video pyramid by building Laplacian pyramid for each frame
def build_video_pyramid(frames):
lap_video = []

for i, frame in enumerate(frames):
pyramid = build_laplacian_pyramid(frame, 3)
for j in range(3):
if i == 0:
lap_video.append(np.zeros((len(frames), pyramid[j].shape[0], pyramid[j].shape[1], 3)))
lap_video[j][i] = pyramid[j]

return lap_video


# Collapse video pyramid by collapsing each frame's Laplacian pyramid
def collapse_laplacian_video_pyramid(video, frame_ct):
collapsed_video = []

for i in range(frame_ct):
prev_frame = video[-1][i]

for level in range(len(video) - 1, 0, -1):
pyr_up_frame = cv2.pyrUp(prev_frame)
(height, width, depth) = pyr_up_frame.shape
prev_level_frame = video[level - 1][i]
prev_level_frame = cv2.resize(prev_level_frame, (height, width))
prev_frame = pyr_up_frame + prev_level_frame

# Normalize pixel values
min_val = min(0.0, prev_frame.min())
prev_frame = prev_frame + min_val
max_val = max(1.0, prev_frame.max())
prev_frame = prev_frame / max_val
prev_frame = prev_frame * 255

prev_frame = cv2.convertScaleAbs(prev_frame)
collapsed_video.append(prev_frame)

return collapsed_video

Loading…
Cancel
Save