import numpy as np
from scipy import fftpack, signal
from scipy.signal import butter, lfilter

import pyramid
import video
# Temporal bandpass filter with Fast-Fourier Transform | |||||
def fft_filter(video, freq_min, freq_max, fps): | |||||
fft = fftpack.fft(video, axis=0) | |||||
frequencies = fftpack.fftfreq(video.shape[0], d=1.0 / fps) | |||||
bound_low = (np.abs(frequencies - freq_min)).argmin() | |||||
bound_high = (np.abs(frequencies - freq_max)).argmin() | |||||
# Zero out frequencies outside the desired range | |||||
fft[:bound_low] = 0 | |||||
fft[bound_high:] = 0 | |||||
# Apply inverse FFT to get the filtered video | |||||
filtered_video = np.abs(fftpack.ifft(fft, axis=0)) | |||||
return filtered_video, fft, frequencies | |||||
def find_breath_rate(fft, freqs, freq_min, freq_max): | |||||
fft_maximums = [] | |||||
# Iterate through the frequencies and accumulate FFT amplitude | |||||
for i in range(fft.shape[0]): | |||||
if freq_min <= freqs[i] <= freq_max: | |||||
fft_maximums.append(np.abs(fft[i]).max()) | |||||
else: | |||||
fft_maximums.append(0) | |||||
peaks, _ = signal.find_peaks(fft_maximums) | |||||
if len(peaks) == 0: | |||||
return 0 # No peaks found | |||||
max_peak = peaks[np.argmax([fft_maximums[peak] for peak in peaks])] | |||||
return freqs[max_peak] * 60 # Convert frequency (Hz) to breaths per minute (bpm) | |||||
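# A minimal self-test sketch for the two helpers above. It feeds a synthetic
# 0.25 Hz sine (an assumption standing in for a real chest-motion signal) through
# fft_filter and find_breath_rate and should report roughly 15 breaths per minute.
# It is not part of the processing pipeline.
def _demo_breath_rate_from_synthetic_signal(fps=30, freq_min=0.1, freq_max=0.5):
    duration = 20  # seconds of synthetic signal
    t = np.arange(0, duration, 1.0 / fps)
    signal_1d = np.sin(2 * np.pi * 0.25 * t) + 0.1 * np.random.randn(t.size)
    _, fft_data, freqs = fft_filter(signal_1d, freq_min, freq_max, fps)
    bpm = find_breath_rate(fft_data, freqs, freq_min, freq_max)
    print(f'Estimated breathing rate: {bpm:.1f} bpm (expected ~15)')
    return bpm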
def butter_bandpass(lowcut, highcut, fs, order=1): | |||||
""" | |||||
Calculates the Butterworth bandpass filter coefficients. | |||||
:param lowcut: Low frequency cutoff (e.g., 0.1 Hz for breath detection). | |||||
:param highcut: High frequency cutoff (e.g., 0.5 Hz for breath detection). | |||||
:param fs: Video frame rate (sampling frequency). | |||||
:param order: Filter order (default is 1). | |||||
:return: Numerator (b) and denominator (a) polynomials of the IIR filter. | |||||
""" | |||||
low = lowcut / (0.5 * fs) # Normalize the frequencies by Nyquist frequency | |||||
high = highcut / (0.5 * fs) | |||||
b, a = butter(order, [low, high], btype='band') | |||||
return b, a | |||||
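# A small usage sketch for butter_bandpass: design the breathing-band filter and
# run it over a synthetic signal containing one in-band (0.3 Hz) and one
# out-of-band (3 Hz) component. The test signal is an assumption for illustration only.
def _demo_butter_bandpass(fs=30.0):
    b, a = butter_bandpass(lowcut=0.1, highcut=0.5, fs=fs, order=1)
    t = np.arange(0, 30, 1.0 / fs)
    raw = np.sin(2 * np.pi * 0.3 * t) + np.sin(2 * np.pi * 3.0 * t)
    filtered = lfilter(b, a, raw)
    # The 3 Hz component should be noticeably attenuated relative to the 0.3 Hz one
    print('std before filtering:', raw.std(), 'std after filtering:', filtered.std())
    return filtered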
def apply_butter(laplace_video_list, levels, alpha, cutoff=20, low=0.1, high=0.5, fps=30, width=512, height=512, linearAttenuation=True): | |||||
""" | |||||
Applies the Butterworth filter to the video sequence, magnifies the filtered video sequence, | |||||
and attenuates spatial frequencies for breath detection. | |||||
:param laplace_video_list: Laplace video pyramid. | |||||
:param levels: Pyramid levels. | |||||
:param alpha: Magnification factor. | |||||
:param cutoff: Spatial frequencies cutoff factor. | |||||
:param low: Temporal low frequency cutoff (related to the breath rate). | |||||
:param high: Temporal high frequency cutoff. | |||||
:param fps: Video frame rate. | |||||
:param width: Video frame width. | |||||
:param height: Video frame height. | |||||
:param linearAttenuation: Whether to apply linear attenuation. | |||||
:return: List of filtered video frames. | |||||
""" | |||||
print('Applying Butterworth filter...') | |||||
filtered_video_list = [] | |||||
b, a = butter_bandpass(low, high, fps, order=1) | |||||
# Spatial wavelength (lambda) | |||||
lambda1 = (width ** 2 + height ** 2) ** 0.5 | |||||
delta = cutoff / 8 / (1 + alpha) | |||||
for i in range(levels): # Iterate through pyramid levels | |||||
        current_alpha = lambda1 / (8 * delta) - 1  # Representative alpha for this level (EVM bound: alpha < lambda/(8*delta) - 1)
        current_alpha /= 2
# Apply the Butterworth filter to the temporal image sequence | |||||
filtered = lfilter(b, a, laplace_video_list[i], axis=0) | |||||
# Ignore the lowest and highest pyramid levels | |||||
if i == levels - 1 or i == 0: | |||||
filtered *= 0 | |||||
# Spatial frequencies attenuation | |||||
if current_alpha > alpha: | |||||
filtered *= alpha | |||||
else: | |||||
filtered *= current_alpha if linearAttenuation else 0 | |||||
filtered_video_list.append(filtered) | |||||
lambda1 /= 2 # Decrease lambda for the next level | |||||
return filtered_video_list | |||||
def start(video_frames, alpha=50, cutoff=16, low=0.1, high=0.5, linearAttenuation=True, chromAttenuation=0.4, fps=30, width=512, height=512, time_window = 5): | |||||
""" | |||||
Performs motion magnification on the video frames by applying Butterworth bandpass filter and saves the output video. | |||||
This can be used for detecting breathing patterns. | |||||
    :param video_frames: Numpy array containing video frames (shape: [num_frames, height, width, channels])
    :param alpha: Magnification factor (e.g., 50 for subtle motion like breathing).
    :param cutoff: Spatial frequencies cutoff factor.
    :param low: Temporal low frequency cutoff (e.g., 0.1 Hz for breaths).
    :param high: Temporal high frequency cutoff (e.g., 0.5 Hz for breaths).
    :param linearAttenuation: Whether to apply linear attenuation across pyramid levels.
    :param chromAttenuation: Chrominance attenuation factor (not applied inside this function).
    :param fps: Frames per second of the video.
    :param width: Width of the video frames.
    :param height: Height of the video frames.
    :param time_window: Length of the analysed clip in seconds (kept for API compatibility).
    :return: Filtered video frames and estimated breathing rate in BPM.
""" | |||||
# Convert RGB to YIQ for luminance-based processing | |||||
yiq_video = video.rgb2yiq(video_frames) | |||||
# Determine pyramid levels for the video | |||||
levels = video.calculate_pyramid_levels(width, height) | |||||
# Build Laplacian pyramid for each video frame | |||||
lap_video_list = pyramid.laplacian_video_pyramid(yiq_video, levels) | |||||
# Apply Butterworth filter for breath frequencies (low=0.1 Hz, high=0.5 Hz) | |||||
filtered_video_list = apply_butter(lap_video_list, levels, alpha, cutoff, low, high, fps, width, height, linearAttenuation) | |||||
# Reconstruct the magnified video from the filtered pyramid | |||||
final_video = pyramid.reconstruct(filtered_video_list, levels) | |||||
filtered_video, fft_data, frequencies = fft_filter(final_video.mean(axis=(1, 2, 3)), low, high, fps) | |||||
bpm = find_breath_rate(fft_data, frequencies, low, high) | |||||
final_video += yiq_video | |||||
final_rgb = video.yiq2rgb(final_video) | |||||
final_rgb = np.clip(final_rgb, 0, 255).astype(np.uint8) | |||||
return final_rgb, bpm | |||||
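# End-to-end usage sketch for start(): the random frames are an assumption used
# only to exercise the pipeline (pyramid construction, Butterworth filtering,
# reconstruction); a meaningful BPM estimate requires real video of a breathing subject.
def _demo_butterworth_pipeline():
    fps, seconds, size = 30, 3, 64
    frames = np.random.randint(0, 256, (fps * seconds, size, size, 3)).astype(np.float64)
    magnified, bpm = start(frames, alpha=50, cutoff=16, low=0.1, high=0.5,
                           fps=fps, width=size, height=size)
    print('output shape:', magnified.shape, 'estimated bpm:', bpm)
    return magnified, bpm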
[Parameters]
alpha = 20
cutoff = 16
low = 60
high = 120
chromattenuation = 0.1
; 0 = Motion Magnification, 1 = Color Magnification
mode = 0

[Video]
width = 1280
height = 720
fps = 30.0
<?xml version="1.0" encoding="UTF-8"?> | |||||
<ui version="4.0"> | |||||
<class>Dialog</class> | |||||
<widget class="QDialog" name="Dialog"> | |||||
<property name="geometry"> | |||||
<rect> | |||||
<x>0</x> | |||||
<y>0</y> | |||||
<width>1151</width> | |||||
<height>676</height> | |||||
</rect> | |||||
</property> | |||||
<property name="windowTitle"> | |||||
<string>Dialog</string> | |||||
</property> | |||||
<widget class="QLabel" name="video"> | |||||
<property name="geometry"> | |||||
<rect> | |||||
<x>370</x> | |||||
<y>30</y> | |||||
<width>691</width> | |||||
<height>521</height> | |||||
</rect> | |||||
</property> | |||||
<property name="text"> | |||||
<string/> | |||||
</property> | |||||
</widget> | |||||
<widget class="QPushButton" name="playButton"> | |||||
<property name="geometry"> | |||||
<rect> | |||||
<x>340</x> | |||||
<y>610</y> | |||||
<width>85</width> | |||||
<height>27</height> | |||||
</rect> | |||||
</property> | |||||
<property name="text"> | |||||
<string>Play Video</string> | |||||
</property> | |||||
</widget> | |||||
<widget class="QWidget" name=""> | |||||
<property name="geometry"> | |||||
<rect> | |||||
<x>20</x> | |||||
<y>30</y> | |||||
<width>326</width> | |||||
<height>521</height> | |||||
</rect> | |||||
</property> | |||||
<layout class="QVBoxLayout" name="verticalLayout"> | |||||
<item> | |||||
<layout class="QHBoxLayout" name="horizontalLayout"> | |||||
<item> | |||||
<widget class="QPushButton" name="lButton"> | |||||
<property name="text"> | |||||
<string>Load Video</string> | |||||
</property> | |||||
</widget> | |||||
</item> | |||||
<item> | |||||
<widget class="QLabel" name="nameLabel"> | |||||
<property name="text"> | |||||
<string/> | |||||
</property> | |||||
</widget> | |||||
</item> | |||||
</layout> | |||||
</item> | |||||
<item> | |||||
<widget class="QComboBox" name="comboBox"> | |||||
<item> | |||||
<property name="text"> | |||||
<string>Motion Magnification</string> | |||||
</property> | |||||
</item> | |||||
<item> | |||||
<property name="text"> | |||||
         <string>Color Magnification</string>
</property> | |||||
</item> | |||||
</widget> | |||||
</item> | |||||
<item> | |||||
<layout class="QHBoxLayout" name="horizontalLayout_3"> | |||||
<item> | |||||
<widget class="QLabel" name="label"> | |||||
<property name="text"> | |||||
           <string>Magnification Factor</string>
</property> | |||||
</widget> | |||||
</item> | |||||
<item> | |||||
<widget class="QLineEdit" name="alpha"/> | |||||
</item> | |||||
</layout> | |||||
</item> | |||||
<item> | |||||
<layout class="QHBoxLayout" name="horizontalLayout_4"> | |||||
<item> | |||||
<widget class="QLabel" name="label_2"> | |||||
<property name="text"> | |||||
<string>Spatial Frequency Cutoff</string> | |||||
</property> | |||||
</widget> | |||||
</item> | |||||
<item> | |||||
<widget class="QLineEdit" name="cutoff"/> | |||||
</item> | |||||
</layout> | |||||
</item> | |||||
<item> | |||||
<layout class="QHBoxLayout" name="horizontalLayout_2"> | |||||
<item> | |||||
<widget class="QLabel" name="label_3"> | |||||
<property name="text"> | |||||
<string>Temporal Frequency Passband</string> | |||||
</property> | |||||
</widget> | |||||
</item> | |||||
<item> | |||||
<widget class="QLineEdit" name="low"> | |||||
<property name="text"> | |||||
<string/> | |||||
</property> | |||||
</widget> | |||||
</item> | |||||
<item> | |||||
<widget class="QLineEdit" name="high"> | |||||
<property name="text"> | |||||
<string/> | |||||
</property> | |||||
</widget> | |||||
</item> | |||||
</layout> | |||||
</item> | |||||
<item> | |||||
<layout class="QHBoxLayout" name="horizontalLayout_5"> | |||||
<item> | |||||
<widget class="QLabel" name="label_4"> | |||||
<property name="text"> | |||||
<string>Chromatic Attenuation</string> | |||||
</property> | |||||
</widget> | |||||
</item> | |||||
<item> | |||||
<widget class="QDoubleSpinBox" name="chromAtt"> | |||||
<property name="maximum"> | |||||
<double>1.000000000000000</double> | |||||
</property> | |||||
<property name="singleStep"> | |||||
<double>0.100000000000000</double> | |||||
</property> | |||||
</widget> | |||||
</item> | |||||
</layout> | |||||
</item> | |||||
<item> | |||||
<widget class="QCheckBox" name="linearAtt"> | |||||
<property name="text"> | |||||
<string>Linear Attenuation</string> | |||||
</property> | |||||
</widget> | |||||
</item> | |||||
<item> | |||||
<widget class="QPushButton" name="startButton"> | |||||
<property name="text"> | |||||
<string>Start</string> | |||||
</property> | |||||
</widget> | |||||
</item> | |||||
<item> | |||||
<widget class="QLabel" name="finished"> | |||||
<property name="text"> | |||||
<string/> | |||||
</property> | |||||
<property name="alignment"> | |||||
<set>Qt::AlignCenter</set> | |||||
</property> | |||||
</widget> | |||||
</item> | |||||
</layout> | |||||
</widget> | |||||
</widget> | |||||
<resources/> | |||||
<connections/> | |||||
</ui> |
import scipy.fftpack as fftpack | |||||
import numpy as np | |||||
import cv2 | |||||
from scipy.signal import find_peaks | |||||
import pyramid | |||||
import video | |||||
def start(vidFile, alpha, low, high, chromAttenuation, fps, width, height): | |||||
    '''
    Performs color magnification on the video by applying an ideal bandpass filter,
    i.e. applies a discrete Fourier transform to the Gaussian-downsampled video,
    cuts off the frequencies outside the passband, magnifies the result and
    adds it back to the original. Additionally, it detects the heartbeat and returns the BPM.
    :param vidFile: Video frames (sequence/array of RGB frames)
    :param alpha: Magnification factor
    :param low: Low frequency cut-off
    :param high: High frequency cut-off
    :param chromAttenuation: Chrominance attenuation factor
    :param fps: Frames per second of the video
    :param width: Width of the video frame
    :param height: Height of the video frame
    :return: final processed video, heart rate in BPM
    '''
# Convert from RGB to YIQ for better processing of chrominance information | |||||
t = video.rgb2yiq(vidFile) | |||||
levels = 4 | |||||
# Build Gaussian pyramid and use the highest level | |||||
gauss_video_list = pyramid.gaussian_video(t, levels) | |||||
print('Apply Ideal filter') | |||||
# Apply discrete Fourier transformation (real) | |||||
fft = fftpack.rfft(gauss_video_list, axis=0) | |||||
frequencies = fftpack.rfftfreq(fft.shape[0], d=1.0 / fps) # Sample frequencies | |||||
    mask = np.logical_and(frequencies > low, frequencies < high)  # Boolean mask for frequencies inside the passband
    fft[~mask] = 0  # Zero out frequencies outside the passband
    filtered = fftpack.irfft(fft, axis=0)  # Inverse Fourier transform
    filtered *= alpha  # Magnification
    # Attenuate the chrominance channels (I and Q)
    filtered[:, :, :, 1] *= chromAttenuation
    filtered[:, :, :, 2] *= chromAttenuation
# Resize last Gaussian level to the frame size | |||||
filtered_video_list = np.zeros(t.shape) | |||||
for i in range(t.shape[0]): | |||||
f = filtered[i] | |||||
filtered_video_list[i] = cv2.resize(f, (t.shape[2], t.shape[1])) | |||||
final = filtered_video_list | |||||
# Add to original | |||||
final += t | |||||
# Convert back from YIQ to RGB | |||||
final = video.yiq2rgb(final) | |||||
# Cutoff invalid values | |||||
final[final < 0] = 0 | |||||
final[final > 255] = 255 | |||||
# Detect heartbeat and return BPM | |||||
bpm = detect_heartbeat(filtered_video_list, fps) | |||||
return final, bpm | |||||
def detect_heartbeat(video_frames, fps): | |||||
''' | |||||
Detects heartbeat by analyzing pixel intensity variations in the filtered video over time. | |||||
:param video_frames: Processed video frames (filtered and magnified) | |||||
:param fps: Frames per second of the video | |||||
:return: Detected heart rate in BPM (beats per minute) | |||||
''' | |||||
# Focus on the green channel for heart rate detection (more sensitive to blood flow changes) | |||||
green_channel = video_frames[:, :, :, 1] # Extract green channel | |||||
# Calculate the average intensity of the green channel for each frame | |||||
avg_intensity = np.mean(green_channel, axis=(1, 2)) # Shape: (num_frames,) | |||||
# Normalize intensity values | |||||
avg_intensity -= np.mean(avg_intensity) | |||||
avg_intensity /= np.std(avg_intensity) | |||||
# Detect peaks in the intensity signal (peaks correspond to heartbeats) | |||||
peaks, _ = find_peaks(avg_intensity, distance=fps // 2) # Ensure at least half a second between peaks | |||||
# Calculate the time differences between peaks to compute the heart rate | |||||
peak_intervals = np.diff(peaks) / fps # Convert frame intervals to seconds | |||||
if len(peak_intervals) > 0: | |||||
avg_heartbeat_interval = np.mean(peak_intervals) | |||||
bpm = 60 / avg_heartbeat_interval # Convert to beats per minute | |||||
else: | |||||
bpm = 0 # No peaks detected | |||||
return bpm |
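# A minimal check sketch for detect_heartbeat: a synthetic clip whose green
# channel pulses at 1.2 Hz (an assumed stand-in for real skin-color variation)
# should yield roughly 72 BPM.
def _demo_detect_heartbeat(fps=30, seconds=10, size=32):
    n_frames = fps * seconds
    t = np.arange(n_frames) / fps
    frames = np.zeros((n_frames, size, size, 3))
    frames[:, :, :, 1] = 0.5 + 0.1 * np.sin(2 * np.pi * 1.2 * t)[:, None, None]
    bpm = detect_heartbeat(frames, fps)
    print(f'Estimated heart rate: {bpm:.1f} BPM (expected ~72)')
    return bpm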
#..................................
#........Visualization 2..........
#..................................
#...Eulerian Video Magnification...
#..................................
#.. Author: Galya Pavlova..........
#..................................
import os | |||||
import sys | |||||
import cv2 | |||||
from PyQt5.QtCore import QTimer | |||||
from PyQt5.QtGui import QPixmap, QImage | |||||
from PyQt5.QtWidgets import QApplication, QDialog, QFileDialog | |||||
from PyQt5.uic import loadUi | |||||
import butterworth_filter | |||||
import ideal_filter | |||||
class App(QDialog): | |||||
def __init__(self): | |||||
''' | |||||
Initializes and loads the GUI PyQt file | |||||
''' | |||||
super(App, self).__init__() | |||||
self.vid = None | |||||
self.name = None | |||||
self.capture = None | |||||
self.len = None | |||||
self.l = 0 | |||||
loadUi('gui.ui', self) | |||||
self.startButton.clicked.connect(self.on_start_clicked) | |||||
self.lButton.clicked.connect(self.open_file) | |||||
self.playButton.clicked.connect(self.play_video) | |||||
def play_video(self): | |||||
''' | |||||
A function to play a given video | |||||
''' | |||||
        self.capture = cv2.VideoCapture(self.videoOut)
        frame_rate = self.capture.get(cv2.CAP_PROP_FPS) or 30  # fall back to 30 fps if the container reports 0
        self.len = int(self.capture.get(cv2.CAP_PROP_FRAME_COUNT))
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.displayImage)
        self.timer.start(int(1000 / frame_rate))  # QTimer expects an interval in milliseconds, not a frame rate

    def displayImage(self):
''' | |||||
Each video frame is read and loaded | |||||
''' | |||||
self.l += 1 | |||||
if self.l >= self.len: | |||||
self.timer.stop() | |||||
self.timer.deleteLater() | |||||
self.l = 0 | |||||
        ret, img = self.capture.read()
        if not ret:  # stop cleanly if the frame could not be read
            self.timer.stop()
            return
        qformat = QImage.Format_RGB888
        outImage = QImage(img.data, img.shape[1], img.shape[0], img.strides[0], qformat)
        outImage = outImage.rgbSwapped()
        self.video.setPixmap(QPixmap.fromImage(outImage))
def open_file(self): | |||||
''' | |||||
Opens Files | |||||
''' | |||||
filename, _ = QFileDialog.getOpenFileName(self, 'Open Video File', '../', 'All Files(*)') | |||||
if filename: | |||||
self.vid = filename | |||||
base = os.path.basename(filename) | |||||
self.name = os.path.splitext(base)[0] | |||||
self.nameLabel.setText(base) | |||||
def on_start_clicked(self): | |||||
''' | |||||
Reads the input from the GUI and uses the parameters to start the program | |||||
''' | |||||
self.finished.clear() | |||||
QApplication.instance().processEvents() | |||||
alpha = float(self.alpha.text()) | |||||
cutoff = float(self.cutoff.text()) | |||||
low = float(self.low.text()) | |||||
high = float(self.high.text()) | |||||
        chromAttenuation = self.chromAtt.value()  # QDoubleSpinBox: read the numeric value directly
        linearAttenuation = self.linearAtt.isChecked()
        mode = self.comboBox.currentIndex()
        if mode == 0:
            butterworth_filter.start(self.vid, alpha, cutoff, low, high, linearAttenuation, chromAttenuation, self.name)
        elif mode == 1:
            ideal_filter.start(self.vid, alpha, low, high, chromAttenuation, self.name)
self.finished.setText('Done!') | |||||
self.videoOut = self.name+"Out.avi" | |||||
if __name__ == "__main__": | |||||
app = QApplication(sys.argv) | |||||
window = App() | |||||
window.setWindowTitle('Eulerian Video Magnification') | |||||
window.show() | |||||
sys.exit(app.exec_()) | |||||
import sys | |||||
import cv2 | |||||
import numpy as np | |||||
from PyQt5.QtWidgets import ( | |||||
QApplication, QWidget, QFormLayout, QPushButton, QLabel, QHBoxLayout, QVBoxLayout, QComboBox | |||||
) | |||||
from PyQt5.QtGui import QImage, QPixmap | |||||
from PyQt5.QtCore import QTimer, Qt, QThread, pyqtSignal, QElapsedTimer | |||||
import ideal_filter # Ensure this is properly implemented | |||||
import butterworth_filter # Ensure this is properly implemented | |||||
class CameraThread(QThread): | |||||
frame_ready = pyqtSignal(np.ndarray) | |||||
def __init__(self): | |||||
super().__init__() | |||||
self.is_running = True | |||||
self.cap = cv2.VideoCapture(0) | |||||
self.fps = self.cap.get(cv2.CAP_PROP_FPS) or 30 | |||||
def run(self): | |||||
while self.is_running: | |||||
ret, frame = self.cap.read() | |||||
if ret: | |||||
self.frame_ready.emit(frame) | |||||
else: | |||||
print("Error: Could not read frame from camera.") | |||||
self.msleep(int(1000 / self.fps)) | |||||
def stop(self): | |||||
self.is_running = False | |||||
self.cap.release() | |||||
class FilterWorker(QThread): | |||||
result_ready = pyqtSignal(np.ndarray, float) | |||||
def __init__(self, buffer, alpha, chromAttenuation, fps, filter_type="Ideal", width=512, height=512, time_window=5): | |||||
super().__init__() | |||||
self.buffer = buffer | |||||
self.alpha = alpha | |||||
self.chromAttenuation = chromAttenuation | |||||
self.fps = fps | |||||
self.width = width | |||||
self.height = height | |||||
self.time_window = time_window | |||||
self.is_running = True | |||||
self.filter_type = filter_type | |||||
self.low, self.high = (1, 2.5) if filter_type == "Ideal" else (0.1, 0.5) | |||||
def run(self): | |||||
if self.filter_type == "Ideal": | |||||
final_video, bpm = ideal_filter.start( | |||||
vidFile=self.buffer, | |||||
alpha=self.alpha, | |||||
low=self.low, | |||||
high=self.high, | |||||
chromAttenuation=self.chromAttenuation, | |||||
fps=self.fps, | |||||
width=self.width, height=self.height | |||||
) | |||||
elif self.filter_type == "Butterworth": | |||||
final_video, bpm = butterworth_filter.start( | |||||
video_frames=self.buffer, | |||||
alpha=self.alpha, | |||||
low=self.low, | |||||
high=self.high, | |||||
chromAttenuation=self.chromAttenuation, | |||||
fps=self.fps, | |||||
width=self.width, | |||||
height=self.height, | |||||
time_window=self.time_window | |||||
) | |||||
if self.is_running: | |||||
self.result_ready.emit(final_video, bpm) | |||||
def stop(self): | |||||
self.is_running = False | |||||
class ParameterGUI(QWidget): | |||||
def __init__(self): | |||||
super().__init__() | |||||
self.setWindowTitle('Video Filtering Display') | |||||
self.setFixedSize(1400, 800) | |||||
self.setup_ui() | |||||
modelFile = "res10_300x300_ssd_iter_140000_fp16.caffemodel" | |||||
configFile = "deploy.prototxt" | |||||
self.face_net = cv2.dnn.readNetFromCaffe(configFile, modelFile) | |||||
self.face_buffer = [] | |||||
self.video_buffer = [] | |||||
self.buffer_length = 0 | |||||
self.elapsed_timer = QElapsedTimer() | |||||
self.is_processing = False | |||||
self.worker = None | |||||
self.camera_thread = None | |||||
def setup_ui(self): | |||||
layout = QVBoxLayout() | |||||
# ComboBoxes for user parameters | |||||
self.alphaMenu = QComboBox(self) | |||||
alpha_values = [5, 10, 15, 20, 30, 40, 50, 60] | |||||
self.alphaMenu.addItems([str(value) for value in alpha_values]) | |||||
self.chromAtt = QComboBox(self) | |||||
        chrom_values = [0.0001, 0.001, 0.01, 0.1, 0.5]
self.chromAtt.addItems([str(value) for value in chrom_values]) | |||||
self.timeWindowMenu = QComboBox(self) | |||||
self.timeWindowMenu.addItems(["5", "10", "15", "20"]) | |||||
self.filterMenu = QComboBox(self) | |||||
self.filterMenu.addItems(["Ideal", "Butterworth"]) | |||||
# Form layout for parameters | |||||
form_layout = QFormLayout() | |||||
form_layout.addRow("Alpha:", self.alphaMenu) | |||||
form_layout.addRow("ChromAttenuation:", self.chromAtt) | |||||
form_layout.addRow("Filter:", self.filterMenu) | |||||
form_layout.addRow("Time Window (seconds):", self.timeWindowMenu) | |||||
self.submitButton = QPushButton('Start Camera') | |||||
self.submitButton.clicked.connect(self.start_camera) | |||||
form_layout.addRow(self.submitButton) | |||||
layout.addLayout(form_layout) | |||||
# Layout for displaying video | |||||
video_layout = QHBoxLayout() | |||||
self.liveVideoLabel = QLabel(self) | |||||
self.liveVideoLabel.setFixedSize(640, 480) | |||||
self.processedVideoLabel = QLabel(self) | |||||
self.processedVideoLabel.setFixedSize(640, 480) | |||||
video_layout.addWidget(self.liveVideoLabel, alignment=Qt.AlignCenter) | |||||
video_layout.addWidget(self.processedVideoLabel, alignment=Qt.AlignCenter) | |||||
layout.addLayout(video_layout) | |||||
# BPM and status labels | |||||
self.bpmLabel = QLabel('BPM: ', self) | |||||
layout.addWidget(self.bpmLabel) | |||||
self.bufferStatusLabel = QLabel('Buffer status: Waiting...', self) | |||||
layout.addWidget(self.bufferStatusLabel) | |||||
self.filterStatusLabel = QLabel('Filter status: Not running', self) | |||||
layout.addWidget(self.filterStatusLabel) | |||||
self.ParameterStatusLabel = QLabel('No parameters set', self) | |||||
layout.addWidget(self.ParameterStatusLabel) | |||||
self.setLayout(layout) | |||||
def start_camera(self): | |||||
# Stop existing camera thread if it's running | |||||
if self.camera_thread is not None: | |||||
self.camera_thread.stop() | |||||
self.camera_thread.wait() | |||||
# Stop existing worker thread if it's running | |||||
if self.worker is not None: | |||||
self.worker.stop() | |||||
self.worker.wait() | |||||
# Stop any existing timer for video display | |||||
if not hasattr(self, 'timer'): | |||||
self.timer = QTimer(self) | |||||
if self.timer.isActive(): | |||||
self.timer.stop() # Stop any running timer before starting new camera session | |||||
# Reset buffers and status labels | |||||
self.face_buffer.clear() | |||||
self.video_buffer.clear() | |||||
self.is_processing = False | |||||
self.bufferStatusLabel.setText('Buffer status: Waiting...') | |||||
self.filterStatusLabel.setText('Filter status: Not running') | |||||
self.bpmLabel.setText('BPM: ') | |||||
# Fetch parameters from UI | |||||
self.alpha = int(self.alphaMenu.currentText()) | |||||
self.chromAttenuation = float(self.chromAtt.currentText()) | |||||
self.filter = str(self.filterMenu.currentText()) | |||||
self.timeWindow = int(self.timeWindowMenu.currentText()) | |||||
# Update the parameter status label | |||||
self.ParameterStatusLabel.setText(f'Alpha: {self.alpha} ChromAttenuation: {self.chromAttenuation} TimeWindow: {self.timeWindow}') | |||||
# Start the camera thread | |||||
self.camera_thread = CameraThread() # Initialize the new camera thread | |||||
self.camera_thread.frame_ready.connect(self.update_frame) | |||||
self.camera_thread.start() | |||||
# Set FPS and buffer length based on the camera's FPS | |||||
self.fps = self.camera_thread.fps | |||||
self.buffer_length = int(self.camera_thread.fps * self.timeWindow) | |||||
# Start the elapsed timer to measure buffering time | |||||
self.elapsed_timer.start() | |||||
def update_frame(self, frame): | |||||
if not self.is_processing: | |||||
self.bufferStatusLabel.setText('Buffer status: Filling up') | |||||
if self.filter == "Butterworth": | |||||
upper_body_region, coords = self.get_upper_body(frame) | |||||
if upper_body_region is not None: | |||||
upper_body_resized = cv2.resize(upper_body_region, (512, 512)) | |||||
self.video_buffer.append(upper_body_resized) | |||||
startX, startY, endX, endY = coords | |||||
cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2) | |||||
if self.filter == "Ideal": | |||||
face_region = self.get_face(frame) | |||||
if face_region is not None: | |||||
face_region_resized = cv2.resize(face_region, (512, 512)) | |||||
                    # Smooth the face region to suppress sensor noise (Gaussian blur followed by a median blur)
                    face_region_resized = cv2.GaussianBlur(face_region_resized, (25, 25), 0)
                    face_region_resized = cv2.medianBlur(face_region_resized, 25)
self.face_buffer.append(face_region_resized) | |||||
if self.elapsed_timer.elapsed() >= self.timeWindow * 1000: | |||||
self.process_buffers() | |||||
self.elapsed_timer.restart() | |||||
# Display the live frame | |||||
frame_display = self.resize_frame(frame, self.liveVideoLabel) | |||||
frame_display = cv2.cvtColor(frame_display, cv2.COLOR_BGR2RGB) | |||||
height, width, channel = frame_display.shape | |||||
bytes_per_line = channel * width | |||||
q_img = QImage(frame_display.data, width, height, bytes_per_line, QImage.Format_RGB888) | |||||
self.liveVideoLabel.setPixmap(QPixmap.fromImage(q_img)) | |||||
def get_face(self, frame): | |||||
(h, w) = frame.shape[:2] | |||||
blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0)) | |||||
self.face_net.setInput(blob) | |||||
detections = self.face_net.forward() | |||||
for i in range(0, detections.shape[2]): | |||||
confidence = detections[0, 0, i, 2] | |||||
if confidence > 0.5: | |||||
box = detections[0, 0, i, 3:7] * np.array([w, h, w, h]) | |||||
(startX, startY, endX, endY) = box.astype("int") | |||||
face_region = frame[startY:endY, startX:endX] | |||||
cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2) | |||||
return face_region | |||||
return None | |||||
def get_upper_body(self, frame): | |||||
(h, w) = frame.shape[:2] | |||||
startY = int(h / 2) | |||||
endY = h | |||||
startX = int(w / 4) | |||||
endX = int(w * 3 / 4) | |||||
cropped_frame = frame[startY:endY, startX:endX] | |||||
return cropped_frame, (startX, startY, endX, endY) | |||||
def process_buffers(self): | |||||
if self.is_processing: | |||||
return | |||||
self.is_processing = True | |||||
self.bufferStatusLabel.setText('Buffer status: Completed') | |||||
self.filterStatusLabel.setText('Filter status: Running') | |||||
time_window = int(self.timeWindowMenu.currentText()) | |||||
if self.filter == "Ideal" and self.face_buffer: | |||||
self.worker = FilterWorker( | |||||
self.face_buffer.copy(), # Copy buffer before clearing | |||||
self.alpha, | |||||
self.chromAttenuation, | |||||
self.camera_thread.fps, | |||||
filter_type="Ideal", | |||||
time_window=time_window | |||||
) | |||||
self.worker.result_ready.connect(self.display_filtered_video) | |||||
self.worker.start() | |||||
self.face_buffer.clear() | |||||
elif self.filter == "Butterworth" and self.video_buffer: | |||||
self.worker = FilterWorker( | |||||
self.video_buffer.copy(), # Copy buffer before clearing | |||||
self.alpha, | |||||
self.chromAttenuation, | |||||
self.camera_thread.fps, | |||||
filter_type="Butterworth", | |||||
time_window=time_window | |||||
) | |||||
self.worker.result_ready.connect(self.display_filtered_video) | |||||
self.worker.start() | |||||
# Clear the buffer after starting the filter worker | |||||
self.video_buffer.clear() | |||||
def display_filtered_video(self, final_video, bpm): | |||||
self.bpmLabel.setText(f'BPM: {bpm:.2f}') | |||||
self.filterStatusLabel.setText('Filter status: Displaying video') | |||||
self.frame_index = 0 | |||||
self.final_video = final_video | |||||
# Stop the existing timer (if any) and set up a new timer for frame display | |||||
if hasattr(self, 'frame_timer'): | |||||
self.frame_timer.stop() | |||||
self.frame_timer = QTimer(self) | |||||
self.frame_timer.timeout.connect(lambda: self.show_filtered_frame(self.final_video)) | |||||
self.frame_timer.start(int(1000 / self.fps)) # Display frames based on FPS | |||||
print(self.fps) | |||||
def show_filtered_frame(self, final_video): | |||||
"""Displays each frame from the filtered video using a QTimer.""" | |||||
if self.frame_index < len(final_video): | |||||
frame = final_video[self.frame_index] | |||||
if frame.dtype == np.float64: | |||||
frame = cv2.normalize(frame, None, 0, 255, cv2.NORM_MINMAX) | |||||
frame = frame.astype(np.uint8) | |||||
# Resize and display the filtered frame | |||||
frame_resized = self.resize_frame(frame, self.processedVideoLabel) | |||||
frame_resized = cv2.cvtColor(frame_resized, cv2.COLOR_BGR2RGB) | |||||
height, width, channel = frame_resized.shape | |||||
bytes_per_line = channel * width | |||||
q_img = QImage(frame_resized.data, width, height, bytes_per_line, QImage.Format_RGB888) | |||||
self.processedVideoLabel.setPixmap(QPixmap.fromImage(q_img)) | |||||
QApplication.processEvents() | |||||
self.frame_index += 1 | |||||
else: | |||||
# Stop the filtered video display timer | |||||
self.frame_timer.stop() | |||||
# Restart the live video feed | |||||
if hasattr(self, 'timer'): | |||||
self.timer.start(int(1000 / self.fps)) # Restart the live feed timer | |||||
else: | |||||
print("Error: Timer for live video is not initialized.") | |||||
self.filterStatusLabel.setText('Filter status: Completed') | |||||
self.is_processing = False | |||||
self.bufferStatusLabel.setText('Buffer status: Blocked') | |||||
def resize_frame(self, frame, label): | |||||
size = label.size() | |||||
return cv2.resize(frame, (size.width(), size.height())) | |||||
    def closeEvent(self, event):
        if self.camera_thread:
            self.camera_thread.stop()
        if self.worker:
            self.worker.stop()
        if hasattr(self, 'frame_timer') and self.frame_timer:  # frame_timer only exists once a filtered clip has been shown
            self.frame_timer.stop()
        event.accept()
if __name__ == '__main__': | |||||
app = QApplication(sys.argv) | |||||
window = ParameterGUI() | |||||
window.show() | |||||
sys.exit(app.exec_()) | |||||
import cv2

# Capture-loop fragment: assumes `cap` (a cv2.VideoCapture), `frame_count`, and a
# preallocated `frames` array have been set up earlier in the script.
for i in range(frame_count):
    ret, frame = cap.read()
    if not ret:
        print(f"Frame {i+1} could not be read.")
        break
    frames[i] = frame
    # Optional: show a live preview
    cv2.imshow("Recording", frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):  # press 'q' to stop early
        break

# Release the camera and close the preview window
cap.release()
cv2.destroyAllWindows()
import configparser

# Parser class for INI configuration files
class ParameterParser: | |||||
def __init__(self, config_file='config.ini'): | |||||
self.config_file = config_file | |||||
self.config = configparser.ConfigParser() | |||||
self.config.read(self.config_file) | |||||
# Ensure 'Parameters' and 'Video' sections exist | |||||
if 'Parameters' not in self.config: | |||||
self.config['Parameters'] = {} | |||||
if 'Video' not in self.config: | |||||
self.config['Video'] = {} | |||||
    def get_parameters(self):
        # Read the parameters from the configuration file
try: | |||||
alpha = self.config.getfloat('Parameters', 'alpha') | |||||
cutoff = self.config.getfloat('Parameters', 'cutoff') | |||||
low = self.config.getfloat('Parameters', 'low') | |||||
high = self.config.getfloat('Parameters', 'high') | |||||
chromAttenuation = self.config.getfloat('Parameters', 'chromAttenuation') | |||||
mode = self.config.getint('Parameters', 'mode') | |||||
# Read video parameters | |||||
width = self.config.getint('Video', 'width', fallback=1280) | |||||
height = self.config.getint('Video', 'height', fallback=720) | |||||
            fps = self.config.getfloat('Video', 'fps', fallback=30)  # getfloat: the config may store fps as e.g. 30.0
return { | |||||
"alpha": alpha, | |||||
"cutoff": cutoff, | |||||
"low": low, | |||||
"high": high, | |||||
"chromAttenuation": chromAttenuation, | |||||
"mode": mode, | |||||
"width": width, | |||||
"height": height, | |||||
"fps": fps | |||||
} | |||||
except Exception as e: | |||||
print(f"Error reading config: {e}") | |||||
return None | |||||
    def set_parameters(self, alpha, cutoff, low, high, chromAttenuation, mode, width, height, fps):
        # Write the parameters to the configuration file
self.config['Parameters'] = { | |||||
'alpha': str(alpha), | |||||
'cutoff': str(cutoff), | |||||
'low': str(low), | |||||
'high': str(high), | |||||
'chromAttenuation': str(chromAttenuation), | |||||
'mode': str(mode) | |||||
} | |||||
# Save video parameters | |||||
self.config['Video'] = { | |||||
'width': str(width), | |||||
'height': str(height), | |||||
'fps': str(fps) | |||||
} | |||||
with open(self.config_file, 'w') as configfile: | |||||
self.config.write(configfile) | |||||
# Example: create and save parameters
if __name__ == "__main__":
    parser = ParameterParser()
    # Save example values
    parser.set_parameters(0.5, 1.0, 0.2, 1.5, 0.8, 1, 1280, 720, 30)
    # Read the parameters back
    parameters = parser.get_parameters()
print(parameters) |
#..................................
#........Visualization 2..........
#..................................
#...Eulerian Video Magnification...
#..................................
#.. Author: Galya Pavlova..........
#..................................
import cv2 | |||||
import numpy as np | |||||
def create_gaussian_pyramid(image, levels): | |||||
''' | |||||
Creates a Gaussian pyramid for each image. | |||||
:param image: An image, i.e video frame | |||||
:param levels: The Gaussian pyramid level | |||||
:return: Returns a pyramid of nr. levels images | |||||
''' | |||||
gauss = image.copy() | |||||
gauss_pyr = [gauss] | |||||
for level in range(1, levels): | |||||
gauss = cv2.pyrDown(gauss) | |||||
gauss_pyr.append(gauss) | |||||
return gauss_pyr | |||||
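# Quick usage sketch: build a 4-level Gaussian pyramid for a random test image
# (assumed data, illustration only) and print the per-level resolutions.
def _demo_gaussian_pyramid_shapes():
    img = np.random.rand(256, 256, 3).astype(np.float32)
    for level, layer in enumerate(create_gaussian_pyramid(img, 4)):
        print(f'level {level}: {layer.shape}')  # expected 256, 128, 64, 32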
def gaussian_video(video_tensor, levels): | |||||
    '''
    For a given video sequence, keeps only the top (smallest) Gaussian pyramid
    level of every frame, as specified by levels
    :param video_tensor: Video sequence
    :param levels: Number of Gaussian pyramid levels
    :return: A video sequence where each frame is the top Gaussian pyramid level of the original frame
    '''
for i in range(0, video_tensor.shape[0]): | |||||
frame = video_tensor[i] | |||||
pyr = create_gaussian_pyramid(frame, levels) | |||||
gaussian_frame = pyr[-1] # use only highest gaussian level | |||||
if i == 0: # initialize one time | |||||
vid_data = np.zeros((video_tensor.shape[0], gaussian_frame.shape[0], gaussian_frame.shape[1], 3)) | |||||
vid_data[i] = gaussian_frame | |||||
return vid_data | |||||
def create_laplacian_pyramid(image, levels): | |||||
''' | |||||
Builds a Laplace pyramid for an image, i.e. video frame | |||||
:param image: Image, i.e. single video frame | |||||
:param levels: Specifies the Laplace pyramid levels | |||||
:return: Returns a pyramid of nr. levels images | |||||
''' | |||||
gauss_pyramid = create_gaussian_pyramid(image, levels) | |||||
laplace_pyramid = [] | |||||
for i in range(levels-1): | |||||
size = (gauss_pyramid[i].shape[1], gauss_pyramid[i].shape[0]) # reshape | |||||
laplace_pyramid.append(gauss_pyramid[i]-cv2.pyrUp(gauss_pyramid[i+1], dstsize=size)) | |||||
laplace_pyramid.append(gauss_pyramid[-1]) # add last gauss pyramid level | |||||
return laplace_pyramid | |||||
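# Sanity-check sketch: collapsing the Laplacian pyramid (pyrUp + add, top to
# bottom) should reproduce the original image up to floating-point rounding.
# The random image is an assumption used only for the check.
def _demo_laplacian_roundtrip(levels=4):
    img = np.random.rand(256, 256, 3).astype(np.float32)
    lap = create_laplacian_pyramid(img, levels)
    rec = lap[-1]
    for k in range(levels - 1, 0, -1):
        size = (lap[k - 1].shape[1], lap[k - 1].shape[0])
        rec = cv2.pyrUp(rec, dstsize=size) + lap[k - 1]
    print('max reconstruction error:', float(np.abs(rec - img).max()))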
def laplacian_video_pyramid(video_stack, levels): | |||||
''' | |||||
Creates a Laplacian pyramid for the whole video sequence | |||||
:param video_stack: Video sequence | |||||
:param levels: Specifies the Laplace pyramid levels | |||||
:return: A two-dimensional array where the first index is used for the pyramid levels | |||||
and the second for each video frame | |||||
''' | |||||
print('Build laplace pyramid') | |||||
# "2 dimensional" array - first index for pyramid level, second for frames | |||||
laplace_video_pyramid = [[0 for x in range(video_stack.shape[0])] for x in range(levels)] | |||||
for i in range(video_stack.shape[0]): | |||||
frame = video_stack[i] | |||||
pyr = create_laplacian_pyramid(frame, levels) | |||||
for n in range(levels): | |||||
laplace_video_pyramid[n][i] = pyr[n] | |||||
return laplace_video_pyramid | |||||
def reconstruct(filtered_video, levels): | |||||
''' | |||||
Reconstructs a video sequence from the filtered Laplace video pyramid | |||||
:param filtered_video: 2 dimensional video sequence - 1st. index pyramid levels, 2nd. - video frames | |||||
:param levels: pyramid levels | |||||
:return: video sequence | |||||
''' | |||||
print('Reconstruct video') | |||||
final = np.empty(filtered_video[0].shape) | |||||
for i in range(filtered_video[0].shape[0]): # iterate through frames | |||||
up = filtered_video[-1][i] # highest level | |||||
for k in range(levels-1, 0, -1): # going down to lowest level | |||||
size = (filtered_video[k-1][i].shape[1], filtered_video[k-1][i].shape[0]) # reshape | |||||
up = cv2.pyrUp(up, dstsize=size) + filtered_video[k-1][i] | |||||
final[i] = up | |||||
return final |
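# Round-trip sketch for the video-level functions: building the Laplacian video
# pyramid and reconstructing it (without any temporal filtering) should return
# the original frames almost exactly. The tiny random clip is assumed test data.
def _demo_video_pyramid_roundtrip(levels=4):
    clip = np.random.rand(8, 64, 64, 3).astype(np.float32)
    pyr = laplacian_video_pyramid(clip, levels)
    # reconstruct() indexes each level as an array, so stack each level's frame list first
    pyr = [np.asarray(level) for level in pyr]
    restored = reconstruct(pyr, levels)
    print('max round-trip error:', float(np.abs(restored - clip).max()))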
import numpy as np | |||||
import cv2 | |||||
import platform | |||||
def calculate_pyramid_levels(vidWidth, vidHeight): | |||||
''' | |||||
Calculates the maximal pyramid levels for the Laplacian pyramid | |||||
:param vidWidth: video frames' width | |||||
:param vidHeight: video frames' height | |||||
''' | |||||
if vidWidth < vidHeight: | |||||
levels = int(np.log2(vidWidth)) | |||||
else: | |||||
levels = int(np.log2(vidHeight)) | |||||
return levels | |||||
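# Example sketch: the level count is floor(log2(min(width, height))), e.g. 9 for
# 1280x720 frames and 8 for 640x480 frames.
def _demo_pyramid_levels():
    print(calculate_pyramid_levels(1280, 720))  # -> 9
    print(calculate_pyramid_levels(640, 480))   # -> 8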
def rgb2yiq(video): | |||||
''' | |||||
Converts the video color from RGB to YIQ (NTSC) | |||||
:param video: RGB video sequence | |||||
:return: YIQ-color video sequence | |||||
''' | |||||
yiq_from_rgb = np.array([[0.299, 0.587, 0.114], | |||||
[0.596, -0.274, -0.322], | |||||
[0.211, -0.523, 0.312]]) | |||||
t = np.dot(video, yiq_from_rgb.T) | |||||
return t | |||||
def yiq2rgb(video): | |||||
''' | |||||
Converts the video color from YIQ (NTSC) to RGB | |||||
:param video: YIQ-color video sequence | |||||
:return: RGB video sequence | |||||
''' | |||||
rgb_from_yiq = np.array([[1, 0.956, 0.621], | |||||
[1, -0.272, -0.647], | |||||
[1, -1.106, 1.703]]) | |||||
t = np.dot(video, rgb_from_yiq.T) | |||||
return t |
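# Round-trip sketch: rgb2yiq followed by yiq2rgb should approximately recover the
# input; a small residual error remains because the published conversion
# coefficients are rounded. Random pixel data is assumed test input.
def _demo_color_roundtrip():
    rgb = np.random.rand(4, 8, 8, 3) * 255.0
    restored = yiq2rgb(rgb2yiq(rgb))
    print('max round-trip error:', float(np.abs(restored - rgb).max()))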