import numpy as np | |||||
import scipy.fftpack as fftpack | |||||
# Temporal bandpass filter with Fast-Fourier Transform | |||||
def fft_filter(video, freq_min, freq_max, fps): | |||||
fft = fftpack.fft(video, axis=0) | |||||
frequencies = fftpack.fftfreq(video.shape[0], d=1.0 / fps) | |||||
bound_low = (np.abs(frequencies - freq_min)).argmin() | |||||
bound_high = (np.abs(frequencies - freq_max)).argmin() | |||||
fft[:bound_low] = 0 | |||||
fft[bound_high:-bound_high] = 0 | |||||
fft[-bound_low:] = 0 | |||||
iff = fftpack.ifft(fft, axis=0) | |||||
result = np.abs(iff) | |||||
result *= 100 # Amplification factor | |||||
return result, fft, frequencies |
from scipy import signal | |||||
# Calculate heart rate from FFT peaks | |||||
def find_heart_rate(fft, freqs, freq_min, freq_max): | |||||
fft_maximums = [] | |||||
for i in range(fft.shape[0]): | |||||
if freq_min <= freqs[i] <= freq_max: | |||||
fftMap = abs(fft[i]) | |||||
fft_maximums.append(fftMap.max()) | |||||
else: | |||||
fft_maximums.append(0) | |||||
peaks, properties = signal.find_peaks(fft_maximums) | |||||
max_peak = -1 | |||||
max_freq = 0 | |||||
# Find frequency with max amplitude in peaks | |||||
for peak in peaks: | |||||
if fft_maximums[peak] > max_freq: | |||||
max_freq = fft_maximums[peak] | |||||
max_peak = peak | |||||
return freqs[max_peak] * 60 |
from collections import deque | |||||
import threading | |||||
import time | |||||
import cv2 | |||||
import pyramids | |||||
import heartrate | |||||
import preprocessing | |||||
import eulerian | |||||
import numpy as np | |||||
class main(): | |||||
def __init__(self): | |||||
# Frequency range for Fast-Fourier Transform | |||||
self.freq_min = 1 | |||||
self.freq_max = 5 | |||||
self.BUFFER_LEN = 10 | |||||
self.BUFFER = deque(maxlen=self.BUFFER_LEN) | |||||
self.FPS_BUFFER = deque(maxlen=self.BUFFER_LEN) | |||||
self.buffer_lock = threading.Lock() | |||||
self.FPS = [] | |||||
def video(self): | |||||
cap = cv2.VideoCapture(0) | |||||
while len(self.BUFFER) < self.BUFFER_LEN: | |||||
start_time = time.time() | |||||
ret, frame = cap.read() | |||||
frame = cv2.resize(frame, (500, 500)) | |||||
self.BUFFER.append(frame) | |||||
stop_time = time.time() | |||||
self.FPS_BUFFER.append(stop_time-start_time) | |||||
self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER))) | |||||
print("Buffer ready") | |||||
while True: | |||||
start_time = time.time() | |||||
ret, frame = cap.read() | |||||
frame = cv2.resize(frame, (500, 500)) | |||||
self.BUFFER.append(frame) | |||||
stop_time = time.time() | |||||
self.FPS_BUFFER.append(stop_time-start_time) | |||||
#threading.Event().wait(0.02) | |||||
self.FPS = round(1 / np.mean(np.array(self.FPS_BUFFER))) | |||||
def processing(self): | |||||
# Build Laplacian video pyramid | |||||
while True: | |||||
with self.buffer_lock: | |||||
PROCESS_BUFFER = np.array(self.BUFFER) | |||||
lap_video = pyramids.build_video_pyramid(PROCESS_BUFFER) | |||||
amplified_video_pyramid = [] | |||||
for i, video in enumerate(lap_video): | |||||
if i == 0 or i == len(lap_video)-1: | |||||
continue | |||||
# Eulerian magnification with temporal FFT filtering | |||||
result, fft, frequencies = eulerian.fft_filter(video, self.freq_min, self.freq_max, self.FPS) | |||||
lap_video[i] += result | |||||
# Calculate heart rate | |||||
heart_rate = heartrate.find_heart_rate(fft, frequencies, self.freq_min, self.freq_max) | |||||
# Collapse laplacian pyramid to generate final video | |||||
#amplified_frames = pyramids.collapse_laplacian_video_pyramid(lap_video, len(self.BUFFER)) | |||||
# Output heart rate and final video | |||||
print("Heart rate: ", heart_rate, "bpm") | |||||
threading.Event().wait(2) | |||||
if __name__ == '__main__': | |||||
MAIN = main() | |||||
video_thread = threading.Thread(target=MAIN.video) | |||||
processing_thread = threading.Thread(target=MAIN.processing) | |||||
# Starte die Threads | |||||
video_thread.start() | |||||
time.sleep(2) | |||||
print("__SYNCING___") | |||||
processing_thread.start() | |||||
import cv2 | |||||
import numpy as np | |||||
faceCascade = cv2.CascadeClassifier("haarcascades/haarcascade_frontalface_alt0.xml") | |||||
# Read in and simultaneously preprocess video | |||||
def read_video(path): | |||||
cap = cv2.VideoCapture(path) | |||||
fps = int(cap.get(cv2.CAP_PROP_FPS)) | |||||
video_frames = [] | |||||
face_rects = () | |||||
while cap.isOpened(): | |||||
ret, img = cap.read() | |||||
if not ret: | |||||
break | |||||
gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) | |||||
roi_frame = img | |||||
# Detect face | |||||
if len(video_frames) == 0: | |||||
face_rects = faceCascade.detectMultiScale(gray, 1.3, 5) | |||||
# Select ROI | |||||
if len(face_rects) > 0: | |||||
for (x, y, w, h) in face_rects: | |||||
roi_frame = img[y:y + h, x:x + w] | |||||
if roi_frame.size != img.size: | |||||
roi_frame = cv2.resize(roi_frame, (500, 500)) | |||||
frame = np.ndarray(shape=roi_frame.shape, dtype="float") | |||||
frame[:] = roi_frame * (1. / 255) | |||||
video_frames.append(frame) | |||||
frame_ct = len(video_frames) | |||||
cap.release() | |||||
return video_frames, frame_ct, fps |
import cv2 | |||||
import numpy as np | |||||
# Build Gaussian image pyramid | |||||
def build_gaussian_pyramid(img, levels): | |||||
float_img = np.ndarray(shape=img.shape, dtype="float") | |||||
float_img[:] = img | |||||
pyramid = [float_img] | |||||
for i in range(levels-1): | |||||
float_img = cv2.pyrDown(float_img) | |||||
pyramid.append(float_img) | |||||
return pyramid | |||||
# Build Laplacian image pyramid from Gaussian pyramid | |||||
def build_laplacian_pyramid(img, levels): | |||||
gaussian_pyramid = build_gaussian_pyramid(img, levels) | |||||
laplacian_pyramid = [] | |||||
for i in range(levels-1): | |||||
upsampled = cv2.pyrUp(gaussian_pyramid[i+1]) | |||||
(height, width, depth) = upsampled.shape | |||||
gaussian_pyramid[i] = cv2.resize(gaussian_pyramid[i], (height, width)) | |||||
diff = cv2.subtract(gaussian_pyramid[i],upsampled) | |||||
laplacian_pyramid.append(diff) | |||||
laplacian_pyramid.append(gaussian_pyramid[-1]) | |||||
return laplacian_pyramid | |||||
# Build video pyramid by building Laplacian pyramid for each frame | |||||
def build_video_pyramid(frames): | |||||
lap_video = [] | |||||
for i, frame in enumerate(frames): | |||||
pyramid = build_laplacian_pyramid(frame, 3) | |||||
for j in range(3): | |||||
if i == 0: | |||||
lap_video.append(np.zeros((len(frames), pyramid[j].shape[0], pyramid[j].shape[1], 3))) | |||||
lap_video[j][i] = pyramid[j] | |||||
return lap_video | |||||
# Collapse video pyramid by collapsing each frame's Laplacian pyramid | |||||
def collapse_laplacian_video_pyramid(video, frame_ct): | |||||
collapsed_video = [] | |||||
for i in range(frame_ct): | |||||
prev_frame = video[-1][i] | |||||
for level in range(len(video) - 1, 0, -1): | |||||
pyr_up_frame = cv2.pyrUp(prev_frame) | |||||
(height, width, depth) = pyr_up_frame.shape | |||||
prev_level_frame = video[level - 1][i] | |||||
prev_level_frame = cv2.resize(prev_level_frame, (height, width)) | |||||
prev_frame = pyr_up_frame + prev_level_frame | |||||
# Normalize pixel values | |||||
min_val = min(0.0, prev_frame.min()) | |||||
prev_frame = prev_frame + min_val | |||||
max_val = max(1.0, prev_frame.max()) | |||||
prev_frame = prev_frame / max_val | |||||
prev_frame = prev_frame * 255 | |||||
prev_frame = cv2.convertScaleAbs(prev_frame) | |||||
collapsed_video.append(prev_frame) | |||||
return collapsed_video |
# EVM | |||||
# Driving Simulator Team Video | |||||
## Projektbeschreibung | |||||
ToDo | |||||
## Installation | |||||
pip install -r requirements.txt | |||||
## Verwendung | |||||
ToDo | |||||
## Beiträge | |||||
ToDo | |||||
## Lizenz | |||||
ToDo | |||||
## Danksagungen | |||||
ToDo |
# Author Contribution Team Video | |||||
## Authors | |||||
### Roberto Gelsinger | |||||
- Contribution: Algorithm development, Python Code | |||||
- Email: gelsingerro81137@th-nuernberg.de | |||||
## Contribution | |||||
### Roberto Gelsinger | |||||
#### General Contributen | |||||
- **Creating README.md** | |||||
- **Creating requiremenents.txt** | |||||
- **Creating author_contribution.md structure** | |||||
- **Added Docstrings to the code** | |||||
- **Creating the Trello Agile Kanban Board and updating Team Video Tasks** | |||||
#### Code Contribution | |||||
- **facedetection.py** | |||||
- Developed face and forehead detection using OpenCV. | |||||
- **heartrate.py** | |||||
- Implemented and optimized the heart rate calculation using SciPy | |||||
- **main.py** | |||||
- Created user interface using Tkinter for recording, processing and testing porpuses. | |||||
- Connected all functionalities of the code to create a effective testing environment | |||||
- **processing.py** | |||||
- Handled video data processing and applied image processing algorithms. | |||||
- Collaborated in the development of video manipulation techniques for analysis. | |||||
- **pyramids.py** | |||||
- Constructed image pyramids and performed image analysis. | |||||
- Employed computer vision techniques for feature extraction and manipulation. | |||||
- **recording.py** | |||||
- Implemented video recording functionalities. | |||||
- Developed modules for media file management and real-time video capture using threading. | |||||
- **constants.py** | |||||
- Established global constants and configuration parameters. | |||||
- Defined critical frequency and alpha value parameters for system-wide use. | |||||
- **eulerian.py** | |||||
- Applies the maginfication alpha to the signal | |||||
- **excel_processing.py** | |||||
- Is used to create test cases and prcoess the value for the test case | |||||
- Values and input is saved to a excel | |||||
- **excel_update.py** | |||||
- Creates entry in testrun excel file | |||||
- Calculates the deviation and colors the excel cells. | |||||
- Calculates the deviation for each test case and adds a overall deviation for the test run | |||||
#### Testing Contribution | |||||
- **Design and implement solution for test automation** | |||||
- **Create testcase sample for test automation** | |||||
- **Testing and optimizing the code** | |||||
--- | |||||
<div style="display: flex; justify-content: space-around; align-items: center;"> | |||||
<div> | |||||
<hr style="width: 200px;"> | |||||
<p style="text-align: center;">Roberto Gelsinger</p> | |||||
</div> | |||||
<div> | |||||
<hr style="width: 200px;"> | |||||
<p style="text-align: center;">Author 2</p> | |||||
</div> | |||||
<div> | |||||
<hr style="width: 200px;"> | |||||
<p style="text-align: center;">Author 3</p> | |||||
</div> | |||||
</div> |
""" | |||||
Parameter: | |||||
-minimale und maximale Frequenz | |||||
-Alpha-Wert | |||||
Autor: Roberto Gelsinger | |||||
Datum: 07.12.2023 | |||||
Version: Modulversion | |||||
""" | |||||
freq_min = 1 # Minimale Frequenzgrenze | |||||
freq_max = 3 # Maximale Frequenzgrenze | |||||
alpha = 100 # Alpha-Wert für die Analyse |
""" | |||||
Abhängigkeiten: | |||||
- numpy | |||||
- scipy.signal (butter, lfilter) | |||||
- constants (für die Verwendung von 'alpha') | |||||
Autor: Roberto Gelsinger | |||||
Datum: 07.12.2023 | |||||
Version: Modulversion | |||||
""" | |||||
import numpy as np | |||||
from scipy.signal import butter, lfilter | |||||
from constants import alpha | |||||
def fft_filter(video, freq_min, freq_max, fps): | |||||
""" | |||||
Diese Funktion nimmt Videodaten, eine Frequenzbandbreite und die Bildrate (FPS) des Videos entgegen. | |||||
Sie filtert das Video, um nur Frequenzen im angegebenen Band zu verstärken. Das verstärkte Video, die FFT-Daten | |||||
und die Frequenzen werden zurückgegeben. | |||||
Args: | |||||
video (ndarray): Die Videodaten als ndarray. | |||||
freq_min (float): Die untere Grenzfrequenz des zu verstärkenden Frequenzbands. | |||||
freq_max (float): Die obere Grenzfrequenz des zu verstärkenden Frequenzbands. | |||||
fps (int): Die Bildrate (Frames pro Sekunde) des Videos. | |||||
Returns: | |||||
tuple: Ein Tupel, bestehend aus: | |||||
- amplified_video (ndarray): Das verstärkte Videodaten als ndarray. | |||||
- fft (ndarray): Die FFT-Daten des verstärkten Videos. | |||||
- frequencies (ndarray): Die zugehörigen Frequenzen der FFT. | |||||
""" | |||||
nyquist = 0.5 * fps | |||||
low = freq_min / nyquist | |||||
high = freq_max / nyquist | |||||
# Min-Max-Frequenzen filtern | |||||
b, a = butter(4, [low, high], btype='band') | |||||
filtered_video = np.zeros_like(video) | |||||
for i in range(video.shape[2]): | |||||
filtered_video[:, :, i] = lfilter(b, a, video[:, :, i]) | |||||
# Verstärkung | |||||
amplified_video = np.abs(filtered_video) * alpha | |||||
fft = np.fft.fft(amplified_video, axis=0) | |||||
frequencies = np.fft.fftfreq(amplified_video.shape[0], d=1.0 / fps) | |||||
return amplified_video, fft, frequencies |
""" | |||||
Abhängigkeiten: | |||||
- pyramids (für den Aufbau der Bildpyramiden) | |||||
- heartrate (zur Berechnung der Herzfrequenz) | |||||
- preprocessing (für die Video-Vorverarbeitung) | |||||
- eulerian (für die Euler'sche Video-Magnifikation) | |||||
- tkinter und constants (für die GUI und Konstantenverwaltung) | |||||
Autor: Roberto Gelsinger | |||||
Datum: 07.12.2023 | |||||
Version: Modulversion | |||||
""" | |||||
import pyramids | |||||
import heartrate | |||||
import facedetection | |||||
import eulerian | |||||
from constants import freq_max, freq_min | |||||
import pandas as pd | |||||
from excel_update import color_cells_based_on_deviation | |||||
def process_video_for_excel(selected_video_name): | |||||
""" | |||||
Verarbeitet ein ausgewähltes Video, um die Herzfrequenz der abgebildeten Person zu ermitteln. | |||||
Dieser Prozess umfasst die Vorverarbeitung des Videos, den Aufbau einer Laplace-Pyramide, | |||||
die Anwendung von FFT-Filterung und Euler'scher Magnifikation, und schließlich die Berechnung | |||||
der Herzfrequenz aus den Video-Daten. | |||||
Args: | |||||
selected_video_name (str): Der Name des zu verarbeitenden Videos. | |||||
Returns: | |||||
None: Die Funktion gibt direkt die berechnete Herzfrequenz auf der Konsole aus. | |||||
""" | |||||
print("Reading + preprocessing video...") | |||||
video_frames, frame_ct, fps = facedetection.read_video("videos/"+selected_video_name) | |||||
print("Building Laplacian video pyramid...") | |||||
lap_video = pyramids.build_video_pyramid(video_frames) | |||||
for i, video in enumerate(lap_video): | |||||
if i == 0 or i == len(lap_video)-1: | |||||
continue | |||||
print("Running FFT and Eulerian magnification...") | |||||
result, fft, frequencies = eulerian.fft_filter(video, freq_min, freq_max, fps) | |||||
lap_video[i] += result | |||||
print("Calculating heart rate...") | |||||
heart_rate = heartrate.find_heart_rate(fft, frequencies, freq_min, freq_max) | |||||
print("Heart rate: ", heart_rate*0.7, "bpm") | |||||
return heart_rate *0.7 | |||||
def process_all_videos_and_save_results(testcase_excel_file_path, testruns_excel_file_path, code_version, kommentar): | |||||
try: | |||||
df_testruns = pd.read_excel(testruns_excel_file_path) | |||||
except FileNotFoundError: | |||||
df_testruns = pd.DataFrame() | |||||
df_testcases = pd.read_excel(testcase_excel_file_path) | |||||
existing_testcases = [col for col in df_testruns.columns if col.startswith('Testcase_')] | |||||
new_testcases = [f'Testcase_{tc}' for tc in df_testcases['Testcase'] if f'Testcase_{tc}' not in existing_testcases] | |||||
if df_testruns.empty: | |||||
df_testruns = pd.DataFrame(columns=['Testnummer', 'Codeversion', 'Kommentar', 'Abweichung']) | |||||
for col in new_testcases: | |||||
df_testruns[col] = None | |||||
df_testruns.to_excel(testruns_excel_file_path, index=False) | |||||
if new_testcases: | |||||
print(f"Folgende neue Testcases wurden hinzugefügt: {new_testcases}") | |||||
else: | |||||
print("Keine neuen Testcases zum Hinzufügen gefunden.") | |||||
next_testcase_index = len(df_testruns) + 1 | |||||
new_run = { | |||||
'Testnummer': next_testcase_index, | |||||
'Codeversion': code_version, | |||||
'Kommentar': kommentar, | |||||
'Abweichung': 'Wert_für_Abweichung' | |||||
} | |||||
for index, row in df_testcases.iterrows(): | |||||
video_name = row['VideoName'] | |||||
heart_rate = process_video_for_excel(video_name) | |||||
testcase_column_name = f'Testcase_{row["Testcase"]}' | |||||
new_run[testcase_column_name] = heart_rate | |||||
try: | |||||
df_testruns = df_testruns._append(new_run, ignore_index=True) | |||||
except TypeError: | |||||
pass | |||||
df_testruns.to_excel(testruns_excel_file_path, index=False) | |||||
print("Testrun wurde verarbeitet und das Ergebnis in der Testruns-Excel-Datei gespeichert.") | |||||
color_cells_based_on_deviation(testruns_excel_file_path, testcase_excel_file_path) | |||||
print("Zellen gefärbt") |
import openpyxl | |||||
from openpyxl.styles import PatternFill | |||||
import pandas as pd | |||||
def fill_cell(ws, cell, color): | |||||
fill = PatternFill(start_color=color, end_color=color, fill_type='solid') | |||||
cell.fill = fill | |||||
def calculate_and_fill_deviation(ws, row, absolute_deviations): | |||||
if absolute_deviations: | |||||
average_deviation = sum(absolute_deviations) / len(absolute_deviations) | |||||
deviation_cell = ws.cell(row=row[0].row, column=4) # Angenommen, die 'Abweichung'-Spalte ist Spalte D | |||||
deviation_cell.value = average_deviation | |||||
# Färbe die Zelle basierend auf der durchschnittlichen Abweichung | |||||
if average_deviation < 5: | |||||
fill_color = 'FF00FF00' # Grün | |||||
elif 5 <= average_deviation < 10: | |||||
fill_color = 'FFFFFF00' # Gelb | |||||
else: | |||||
fill_color = 'FFFF0000' # Rot | |||||
fill_cell(ws, deviation_cell, fill_color) | |||||
def color_cells_based_on_deviation(testruns_excel_file_path, testcases_excel_file_path): | |||||
wb_testruns = openpyxl.load_workbook(testruns_excel_file_path) | |||||
ws_testruns = wb_testruns.active | |||||
df_testcases = pd.read_excel(testcases_excel_file_path) | |||||
for row in ws_testruns.iter_rows(min_row=2, max_row=ws_testruns.max_row): | |||||
deviations = [] | |||||
absolute_deviations = [] | |||||
for cell in row[4:]: | |||||
header_cell_value = ws_testruns.cell(row=1, column=cell.column).value | |||||
if header_cell_value and "Testcase" in header_cell_value: | |||||
testcase_num = int(header_cell_value.split('_')[1]) | |||||
expected_pulse_row = df_testcases[df_testcases['Testcase'] == testcase_num] | |||||
if not expected_pulse_row.empty: | |||||
expected_pulse = expected_pulse_row.iloc[0]['Puls'] | |||||
actual_pulse = cell.value | |||||
if actual_pulse is not None and expected_pulse is not None: | |||||
relative_deviation = (actual_pulse - expected_pulse) / expected_pulse * 100 | |||||
absolute_deviation = abs(relative_deviation) | |||||
deviations.append(relative_deviation) | |||||
absolute_deviations.append(absolute_deviation) | |||||
if absolute_deviation < 5: | |||||
fill_color = 'FF00FF00' # Grün | |||||
elif 5 <= absolute_deviation < 10: | |||||
fill_color = 'FFFFA500' if relative_deviation < 0 else 'FFFFFF00' # Orange für niedriger, Gelb für höher | |||||
else: | |||||
fill_color = 'FFC0CB' if relative_deviation < 0 else 'FFFF0000' # Rosa für niedriger, Rot für höher | |||||
fill_cell(ws_testruns, cell, fill_color) | |||||
calculate_and_fill_deviation(ws_testruns, row, absolute_deviations) | |||||
wb_testruns.save(testruns_excel_file_path) |
""" | |||||
Abhängigkeiten: | |||||
- cv2 (OpenCV-Paket) | |||||
- numpy | |||||
Autor: Ihr Name | |||||
Datum: Erstellungs- oder Änderungsdatum | |||||
Version: Modulversion | |||||
""" | |||||
import cv2 | |||||
import numpy as np | |||||
faceCascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_alt2.xml") | |||||
eyeCascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_eye.xml") | |||||
def read_video(path): | |||||
""" | |||||
Liest ein Video, erkennt Gesichter und extrahiert Regionen von Interesse (ROIs). | |||||
Diese Funktion nimmt einen Pfad zu einer Videodatei und liest das Video. Während des Lesens erkennt sie | |||||
Gesichter im Video und extrahiert die ROIs (Gesichtsbereiche), die anschließend in einer Liste von Frames | |||||
gespeichert werden. Die Frames werden für spätere Verarbeitungsschritte skaliert. | |||||
Args: | |||||
path (str): Der Pfad zur Videodatei. | |||||
Returns: | |||||
tuple: Ein Tupel, bestehend aus: | |||||
- video_frames (list): Eine Liste von Frames, die die ROIs (Gesichtsbereiche) darstellen. | |||||
- frame_ct (int): Die Anzahl der extrahierten Frames. | |||||
- fps (int): Die Bildrate (Frames pro Sekunde) des Videos. | |||||
""" | |||||
cap = cv2.VideoCapture(path) | |||||
fps = int(cap.get(cv2.CAP_PROP_FPS)) | |||||
video_frames = [] | |||||
while cap.isOpened(): | |||||
ret, img = cap.read() | |||||
if not ret: | |||||
break | |||||
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |||||
faces = faceCascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)) | |||||
for (x, y, w, h) in faces: | |||||
face_roi_gray = gray[y:y+h, x:x+w] | |||||
face_roi_color = img[y:y+h, x:x+w] | |||||
eyes = eyeCascade.detectMultiScale(face_roi_gray) | |||||
# Annahme: Wir brauchen mindestens zwei Augen für die Berechnung | |||||
if len(eyes) == 2: | |||||
# Berechne die Position und Größe des Stirnbereichs | |||||
eye1_x, eye1_y, eye1_w, eye1_h = eyes[0] | |||||
eye2_x, eye2_y, eye2_w, eye2_h = eyes[1] | |||||
# Bestimme die horizontale Position und Breite des Stirnbereichs | |||||
forehead_x = min(eye1_x, eye2_x) | |||||
forehead_w = max(eye1_x + eye1_w, eye2_x + eye2_w) - forehead_x | |||||
# Bestimme die vertikale Position und Höhe des Stirnbereichs | |||||
forehead_y = 0 | |||||
forehead_h = int((min(eye1_y, eye2_y) - forehead_y) / 3) | |||||
# Extrahiere und skaliere den Stirnbereich | |||||
forehead_roi = face_roi_color[forehead_y:forehead_y + forehead_h, forehead_x:forehead_x + forehead_w] | |||||
forehead_resized = cv2.resize(forehead_roi, (500, 500)) | |||||
video_frames.append(forehead_resized.astype("float") / 255.0) | |||||
cap.release() | |||||
for frame in video_frames: | |||||
cv2.imshow("frame", frame) | |||||
cv2.waitKey(20) | |||||
cv2.destroyAllWindows() | |||||
return video_frames, len(video_frames), fps |
""" | |||||
Abhängigkeiten: | |||||
- scipy.signal (find_peaks) | |||||
Autor: Ihr Name | |||||
Datum: Erstellungs- oder Änderungsdatum | |||||
Version: Modulversion | |||||
""" | |||||
from scipy import signal | |||||
def find_heart_rate(fft, freqs, freq_min, freq_max): | |||||
""" | |||||
Berechnet die Herzfrequenz aus den FFT-Spitzen. | |||||
Diese Funktion nimmt FFT-Daten, Frequenzen, sowie minimale und maximale Frequenzgrenzen entgegen. Sie identifiziert die | |||||
Spitzen im FFT-Spektrum, findet die Spitze mit der höchsten Amplitude in einem bestimmten Frequenzband und berechnet | |||||
die Herzfrequenz basierend auf dieser Spitze. | |||||
Args: | |||||
fft (ndarray): Die FFT-Daten des Videos. | |||||
freqs (ndarray): Die Frequenzen, die den FFT-Daten zugeordnet sind. | |||||
freq_min (float): Die untere Grenzfrequenz des zu berücksichtigenden Frequenzbands. | |||||
freq_max (float): Die obere Grenzfrequenz des zu berücksichtigenden Frequenzbands. | |||||
Returns: | |||||
float: Die berechnete Herzfrequenz in Schlägen pro Minute (bpm). | |||||
""" | |||||
fft_maximums = [] | |||||
# Bestimme die Amplitude an jedem Frequenzpunkt | |||||
for i in range(fft.shape[0]): | |||||
if freq_min <= freqs[i] <= freq_max: | |||||
fftMap = abs(fft[i]) | |||||
fft_maximums.append(fftMap.max()) | |||||
else: | |||||
fft_maximums.append(0) | |||||
print("fft_maximums: "+str(len(fft_maximums))) | |||||
peaks, properties = signal.find_peaks(fft_maximums) | |||||
print("peaks: "+str(len(peaks))) | |||||
# Liste zur Speicherung der Top-10-Peak-Frequenzen | |||||
top_peak_freqs = [] | |||||
# Sortiere die Peaks nach ihrer Amplitude | |||||
sorted_peaks = sorted(peaks, key=lambda x: fft_maximums[x], reverse=True) | |||||
print("sorted_peaks: "+str(len(sorted_peaks))) | |||||
# Wähle die Top-10-Peaks aus | |||||
for peak in sorted_peaks[:100]: | |||||
top_peak_freqs.append(freqs[peak]) | |||||
# Berechne den Durchschnitt der Frequenzen der Top-10-Peaks | |||||
if top_peak_freqs: | |||||
average_freq = sum(top_peak_freqs) / len(top_peak_freqs) | |||||
return average_freq * 60# Umrechnung von Hz in BPM | |||||
else: | |||||
return None | |||||
# Beispielaufruf der Funktion | |||||
# heart_rate = find_heart_rate(fft_data, frequency_data, freq_min, freq_max) |
""" | |||||
Abhängigkeiten: | |||||
- tkinter (Tkinter-Bibliothek) | |||||
- recording (Modul für die Videoaufnahme) | |||||
- processing (Modul für die Videoverarbeitung) | |||||
- tkinter.filedialog (Dateiauswahldialog) | |||||
- os | |||||
Autor: Roberto Gelsinger | |||||
Datum: 07.12.2023 | |||||
Version: Modulversion | |||||
""" | |||||
import tkinter as tk | |||||
from recording import start_recording, recording_finished_event | |||||
from recording import start_recording,start_normal_recording,stop_normal_recording | |||||
from processing import process_video | |||||
from tkinter import filedialog | |||||
from tkinter import simpledialog | |||||
import os | |||||
from PIL import ImageTk, Image | |||||
import pandas as pd | |||||
from excel_processing import process_all_videos_and_save_results | |||||
from datetime import datetime | |||||
recording_finished = False | |||||
code_version= "etwasausdenken" | |||||
current_dir = os.getcwd() | |||||
testcase_excel_file_path = os.path.join(current_dir, 'testing/excel/Testcase_excel_dataset.xlsx') | |||||
testruns_excel_file_path = os.path.join(current_dir, 'testing/excel/Testruns.xlsx') | |||||
class VideoProcessingApp(tk.Tk): | |||||
def __init__(self): | |||||
super().__init__() | |||||
self.title("Driving Simulator EVM") | |||||
self.geometry('530x380') | |||||
#self.resizable(False, False) | |||||
self.frames = {} | |||||
#init user interface() | |||||
self.initialize_header_frame() | |||||
self.initialize_toolbar() | |||||
self.initialize_icon() | |||||
self.setup_recording_controls() | |||||
self.setup_testcase_controls() | |||||
self.setup_testing_controls() | |||||
self.setup_video_processing_controls() | |||||
self.center_window() | |||||
self.selected_button = None | |||||
self.check_recording_status() | |||||
self.mainloop() | |||||
def open_testcase_excel_file(self): | |||||
os.startfile(testcase_excel_file_path) | |||||
def open_testrun_excel_file(self): | |||||
os.startfile(testruns_excel_file_path) | |||||
def test_data_set(self): | |||||
kommentar = self.testrun_kommentar_entry.get("1.0", "end-1c") | |||||
process_all_videos_and_save_results(testcase_excel_file_path,testruns_excel_file_path,code_version,kommentar) | |||||
def start_normal_recording_with_input(self): | |||||
""" | |||||
Startet die Videoaufnahme mit dem eingegebenen Videonamen. | |||||
Der Name wird aus dem Tkinter Entry-Widget gelesen. | |||||
""" | |||||
video_name = self.video_name_entry.get() # Holt den Text aus dem Textfeld | |||||
video_resolution1 = int(self.aufnahme_aufloesung1_entry.get()) | |||||
video_resolution2 = int(self.aufnahme_aufloesung2_entry.get()) | |||||
fps = int(self.aufnahme_fps_entry.get()) | |||||
start_normal_recording(video_name,video_resolution1,video_resolution2,fps) | |||||
def write_to_excel(self, video_name, excel_path): | |||||
# Datenerfassung | |||||
date = datetime.now().strftime("%Y-%m-%d") | |||||
time = datetime.now().strftime("%H:%M:%S") | |||||
licht = self.testcase_licht_entry.get() | |||||
webcam_name = self.testcase_kamera_entry.get() | |||||
testperson_name = self.testcase_testperson_entry.get() | |||||
abstand = int(self.testcase_abstand_entry.get()) | |||||
winkel = self.testcase_winkel_entry.get() | |||||
hintergrund = self.testcase_hintergrund_entry.get() | |||||
video_length = int(self.video_length_entry.get()) | |||||
auflösung = f"{int(self.testcase_resolution1_entry.get())}x{int(self.testcase_resolution2_entry.get())}" | |||||
fps = int(self.testcase_fps_entry.get()) | |||||
kommentar = self.testcase_kommentar_entry.get("1.0", "end-1c") | |||||
# Entferne die Dateiendung (z.B. '.avi') und extrahiere dann den Puls | |||||
video_name_without_extension = video_name.split('.')[0] | |||||
puls_part = video_name_without_extension.split('_')[-1] | |||||
try: | |||||
puls = int(puls_part) | |||||
except ValueError: | |||||
puls = '' # Setze Puls auf einen leeren String, falls keine gültige Zahl gefunden wird | |||||
# Versuche, die vorhandene Datei zu lesen, erstelle eine neue, wenn sie nicht existiert | |||||
try: | |||||
existing_df = pd.read_excel(excel_path) | |||||
except FileNotFoundError: | |||||
existing_df = pd.DataFrame(columns=['Testcase','Date', 'Time', 'VideoName', 'CodeVersion', 'Testperson', | |||||
'Abstand', 'Videolänge', 'Webcam', 'Auflösung', 'FPS', | |||||
'Winkel', 'Hintergrund', 'Licht', 'Puls', 'Kommentar']) | |||||
# Neue Datenreihe erstellen | |||||
next_testcase_index = len(existing_df) + 1 | |||||
new_data = pd.DataFrame({'Testcase': [next_testcase_index],'Date': [date], 'Time': [time], 'VideoName': [video_name], | |||||
'CodeVersion': [code_version], 'Testperson': [testperson_name], | |||||
'Abstand': [abstand], 'Videolänge': [video_length], 'Webcam': [webcam_name], | |||||
'Auflösung': [auflösung], 'FPS': [fps], 'Winkel': [winkel], | |||||
'Hintergrund': [hintergrund], 'Licht': [licht], 'Puls': [puls], | |||||
'Kommentar': [kommentar]}) | |||||
# Daten zur existierenden DataFrame hinzufügen | |||||
updated_df = existing_df._append(new_data, ignore_index=True) | |||||
# DataFrame in Excel schreiben | |||||
if not existing_df.empty: | |||||
# Modus 'a' (Anhängen) verwenden, wenn die DataFrame nicht leer ist | |||||
with pd.ExcelWriter(excel_path, engine='openpyxl', mode='a', if_sheet_exists='replace') as writer: | |||||
updated_df.to_excel(writer, index=False, sheet_name='Sheet1') | |||||
else: | |||||
# Modus 'w' (Schreiben) verwenden, wenn die DataFrame leer ist oder die Datei nicht existiert | |||||
with pd.ExcelWriter(excel_path, engine='openpyxl', mode='w') as writer: | |||||
updated_df.to_excel(writer, index=False, sheet_name='Sheet1') | |||||
def start_recording_with_input(self): | |||||
""" | |||||
Startet die Videoaufnahme mit dem eingegebenen Videonamen. | |||||
Der Name wird aus dem Tkinter Entry-Widget gelesen. | |||||
""" | |||||
video_name = self.testcase_name_entry.get() | |||||
video_length = int(self.video_length_entry.get()) # Hole die Länge des Videos | |||||
testcase_resolution1 = int(self.testcase_resolution1_entry.get()) | |||||
testcase_resolution2 = int(self.testcase_resolution2_entry.get()) | |||||
testcase_fps=int(self.testcase_fps_entry.get()) | |||||
start_recording(video_name, video_length,testcase_resolution1,testcase_resolution2,testcase_fps) | |||||
def select_video(self): | |||||
""" | |||||
Öffnet einen Dateidialog zum Auswählen eines Videos und setzt den Namen des ausgewählten Videos | |||||
in das Tkinter Entry-Widget für die Videoverarbeitung. | |||||
""" | |||||
selected_video_path = filedialog.askopenfilename() # Den ausgewählten Videopfad abfragen | |||||
if selected_video_path: | |||||
selected_video_name = os.path.basename(selected_video_path) # Extrahieren Sie den Videonamen | |||||
self.videoprocessing_name_entry.delete(0, tk.END) # Löschen Sie den aktuellen Text im Textfeld | |||||
self.videoprocessing_name_entry.insert(0, selected_video_name) # Fügen Sie den Videonamen ein | |||||
def process_selected_video(self): | |||||
""" | |||||
Verarbeitet das ausgewählte Video, dessen Name aus dem Tkinter Entry-Widget gelesen wird. | |||||
""" | |||||
selected_video_name = self.videoprocessing_name_entry.get() | |||||
# Den ausgewählten Videonamen abfragen | |||||
if selected_video_name: | |||||
process_video(selected_video_name) | |||||
def check_recording_status(self): | |||||
excel_file_path = 'testing/excel/Testcase_excel_dataset.xlsx' | |||||
global recording_finished # Deklarieren Sie die Verwendung der globalen Variable | |||||
if recording_finished_event.is_set(): | |||||
recording_finished_event.clear() | |||||
video_name = self.testcase_name_entry.get() | |||||
length = int(self.video_length_entry.get()) # Hole die Länge des Videos | |||||
pulse = simpledialog.askinteger("Puls", "Bitte geben Sie Ihren Puls ein:") | |||||
if pulse is not None: | |||||
new_video_name = f"{video_name}_{length}_{pulse}.avi" | |||||
original_video_path = os.path.join('videos', f"{video_name}.avi") | |||||
new_video_path = os.path.join('videos', new_video_name) | |||||
os.rename(original_video_path, new_video_path) | |||||
print(f"Video umbenannt zu {new_video_name}") | |||||
self.write_to_excel(new_video_name, excel_file_path) | |||||
else: | |||||
print("recording_finished ist False, warte auf Aufnahmeende") | |||||
print("Kein Puls eingegeben.") | |||||
# Planen Sie die nächste Überprüfung | |||||
self.after(100, self.check_recording_status) | |||||
#ui relateted methods | |||||
def center_window(self): | |||||
# Aktualisieren der "idle" Aufgaben um die Größe korrekt zu erhalten | |||||
self.update_idletasks() | |||||
# Berechnen der Breite und Höhe für das Zentrieren des Fensters | |||||
window_width = self.winfo_width() | |||||
window_height = self.winfo_height() | |||||
# Finden der Mitte des Bildschirms | |||||
screen_width = self.winfo_screenwidth() | |||||
screen_height = self.winfo_screenheight() | |||||
# Berechnen der x und y Koordinaten, um das Fenster in der Mitte des Bildschirms zu positionieren | |||||
x_coordinate = int((screen_width / 2) - (window_width / 2)) | |||||
y_coordinate = int((screen_height / 2) - (window_height / 2)) | |||||
self.geometry(f"{window_width}x{window_height}+{x_coordinate}+{y_coordinate}") | |||||
#displaying selected frame | |||||
def show_frame(self, frame_name): | |||||
# Verstecke alle Frames und setze die Button-Farben zurück Create Testcase | |||||
for name, fr in self.frames.items(): | |||||
fr.pack_forget() | |||||
if name == "Recording": | |||||
self.btn_recording.configure(bg='white',fg='black', relief=tk.RAISED) | |||||
elif name == "Processing": | |||||
self.btn_processing.configure(bg='white',fg='black', relief=tk.RAISED) | |||||
elif name == "Testing": | |||||
self.btn_testing.configure(bg='white',fg='black', relief=tk.RAISED) | |||||
elif name == "Create Testcase": | |||||
self.btn_testcase.configure(bg='white',fg='black', relief=tk.RAISED) | |||||
# Zeige den ausgewählten Frame | |||||
frame = self.frames[frame_name] | |||||
frame.pack(fill="both", expand=True) | |||||
# Hebe den entsprechenden Button hervor | |||||
if frame_name == "Recording": | |||||
self.btn_recording.configure(bg='#c82423',fg='white', relief=tk.SUNKEN) | |||||
elif frame_name == "Processing": | |||||
self.btn_processing.configure(bg='#c82423',fg='white', relief=tk.SUNKEN) | |||||
elif frame_name == "Testing": | |||||
self.btn_testing.configure(bg='#c82423',fg='white', relief=tk.SUNKEN) | |||||
elif frame_name == "Create Testcase": | |||||
self.btn_testcase.configure(bg='#c82423',fg='white', relief=tk.SUNKEN) | |||||
def initialize_header_frame(self): | |||||
# Header-Frame für App-Name und Icon | |||||
header_frame = tk.Frame(self, bd=1, relief=tk.RAISED, bg='white') | |||||
header_frame.pack(side=tk.TOP, fill=tk.X) | |||||
# App-Name Label | |||||
self.app_name_label = tk.Label(header_frame, text="Driving Simulator-EVM", font=('Helvetica', 28, 'bold'), bg='white', fg='red') | |||||
self.app_name_label.pack(side=tk.LEFT, padx=10) | |||||
self.app_name_label.config(fg="#c82423", bg="#FFFFFF") | |||||
# Laden Sie das Bild mit PIL und konvertieren Sie es in ein Format, das Tkinter verwenden kann | |||||
self.image = Image.open("interface/ohmbild2.png") | |||||
self.resized_image = self.image.resize((50, 30), Image.LANCZOS) | |||||
self.photo = ImageTk.PhotoImage(self.resized_image) | |||||
# Erstellen Sie ein Label-Widget, um das Bild anzuzeigen | |||||
self.picture = tk.Label(self, image=self.photo) | |||||
self.picture.place(x=445, y=0) | |||||
self.picture.config(bg="#FFFFFF") | |||||
def initialize_toolbar(self): | |||||
toolbar = tk.Frame(self, bd=1, relief=tk.RAISED, bg='white') | |||||
toolbar.pack(side=tk.TOP, fill=tk.X) | |||||
self.btn_recording = tk.Button(toolbar, text="Recording", bg='white', command=lambda: self.show_frame("Recording")) | |||||
self.btn_recording.pack(side=tk.LEFT, padx=2, pady=2) | |||||
self.btn_processing = tk.Button(toolbar, text="Processing", bg='white', command=lambda: self.show_frame("Processing")) | |||||
self.btn_processing.pack(side=tk.LEFT, padx=2, pady=2) | |||||
self.btn_testcase = tk.Button(toolbar, text="Create Testcase", bg='white', command=lambda: self.show_frame("Create Testcase")) | |||||
self.btn_testcase.pack(side=tk.LEFT, padx=3, pady=3) | |||||
self.btn_testing = tk.Button(toolbar, text="Testing", bg='white', command=lambda: self.show_frame("Testing")) | |||||
self.btn_testing.pack(side=tk.LEFT, padx=3, pady=3) | |||||
def setup_recording_controls(self): | |||||
self.recording_frame = tk.Frame(self) | |||||
self.recording_frame.configure(bg='white') | |||||
self.frames["Recording"] = self.recording_frame | |||||
# mainlabel for recording | |||||
self.recording_main_label = tk.Label(self.recording_frame, text="Recording", font=("Helvetica", 20)) | |||||
self.recording_main_label.place(x=25, y=10) | |||||
self.recording_main_label.config(bg="#FFFFFF") | |||||
self.video_name_entry_label = tk.Label(self.recording_frame, text="Videoname(Output)", font=("Helvetica", 10)) | |||||
self.video_name_entry_label.place(x=25, y=60) | |||||
self.video_name_entry_label.config(bg="#FFFFFF") | |||||
self.video_name_entry = tk.Entry(self.recording_frame) | |||||
self.video_name_entry.place(x=25, y=85) | |||||
self.video_name_entry.config(bg="#FFFFFF") | |||||
self.aufnahme_entry_label = tk.Label(self.recording_frame, text="Auflösung,FPS", font=("Helvetica", 10)) | |||||
self.aufnahme_entry_label.place(x=25, y=110) | |||||
self.aufnahme_entry_label.config(bg="#FFFFFF") | |||||
self.aufnahme_aufloesung1_entry = tk.Entry(self.recording_frame) | |||||
self.aufnahme_aufloesung1_entry.place(x=25, y=140) | |||||
self.aufnahme_aufloesung1_entry.config(bg="#FFFFFF", width=5) | |||||
self.aufnahme_aufloesung1_entry.insert(0, 2560) | |||||
self.aufnahme_aufloesung_x_entry_label = tk.Label(self.recording_frame, text="x", font=("Helvetica", 8)) | |||||
self.aufnahme_aufloesung_x_entry_label.place(x=60, y=140) | |||||
self.aufnahme_aufloesung_x_entry_label.config(bg="#FFFFFF") | |||||
self.aufnahme_aufloesung2_entry = tk.Entry(self.recording_frame) | |||||
self.aufnahme_aufloesung2_entry.place(x=72, y=140) | |||||
self.aufnahme_aufloesung2_entry.config(bg="#FFFFFF", width=5) | |||||
self.aufnahme_aufloesung2_entry.insert(0, 1440) | |||||
self.aufnahme_aufloesung_komma_entry_label = tk.Label(self.recording_frame, text=",", font=("Helvetica", 8)) | |||||
self.aufnahme_aufloesung_komma_entry_label.place(x=105, y=140) | |||||
self.aufnahme_aufloesung_komma_entry_label.config(bg="#FFFFFF") | |||||
self.aufnahme_fps_entry = tk.Entry(self.recording_frame) | |||||
self.aufnahme_fps_entry.place(x=115, y=140) | |||||
self.aufnahme_fps_entry.config(bg="#FFFFFF", width=4) | |||||
self.aufnahme_fps_entry.insert(0, 20) | |||||
# Buttons | |||||
self.start_button = tk.Button(self.recording_frame, text="Aufnahme starten", command=self.start_normal_recording_with_input) | |||||
self.start_button.place(x=25, y=175) | |||||
self.start_button.config(bg="#c82423", fg="#FFFFFF") | |||||
self.stop_button = tk.Button(self.recording_frame, text="Aufnahme stoppen", command=stop_normal_recording) | |||||
self.stop_button.place(x=25, y=210) | |||||
self.stop_button.config(bg="#c82423", fg="#FFFFFF") | |||||
def setup_video_processing_controls(self): | |||||
self.processing_frame = tk.Frame(self) | |||||
self.processing_frame.configure(bg='white') | |||||
self.frames["Processing"] = self.processing_frame | |||||
# mainlabel for processing | |||||
self.processing_main_label = tk.Label(self.processing_frame, text="Processing", font=("Helvetica", 20)) | |||||
self.processing_main_label.place(x=10, y=10) | |||||
self.processing_main_label.config(bg="#FFFFFF") | |||||
self.videoprocessing_name_entry_label = tk.Label(self.processing_frame, text="Videoname(Loaded)", font=("Helvetica", 10)) | |||||
self.videoprocessing_name_entry_label.place(x=10, y=60) | |||||
self.videoprocessing_name_entry_label.config(bg="#FFFFFF") | |||||
self.videoprocessing_name_entry = tk.Entry(self.processing_frame) | |||||
self.videoprocessing_name_entry.place(x=10, y=85) | |||||
self.videoprocessing_name_entry.config(bg="#FFFFFF") | |||||
# Button to select video for processing | |||||
self.select_video_button = tk.Button(self.processing_frame, text="Video auswählen", command=self.select_video) | |||||
self.select_video_button.place(x=10, y=120) | |||||
self.select_video_button.config(bg="#c82423", fg="#FFFFFF") | |||||
# Button to start processing | |||||
self.processing_button = tk.Button(self.processing_frame, text="Verarbeiten", command=self.process_selected_video) | |||||
self.processing_button.place(x=10, y=160) | |||||
self.processing_button.config(bg="#c82423", fg="#FFFFFF") | |||||
def setup_testcase_controls(self): | |||||
self.testcase_frame = tk.Frame(self, bg='white') | |||||
self.frames["Create Testcase"] = self.testcase_frame | |||||
# mainlabel for Recording(Testcase) | |||||
self.recording_testcase_label = tk.Label(self.testcase_frame, text="Record Testcase", font=("Helvetica", 20)) | |||||
self.recording_testcase_label.place(x=10, y=10) | |||||
self.recording_testcase_label.config(bg="#FFFFFF") | |||||
#kommentar | |||||
self.testcase_kommentar_entry_label = tk.Label(self.testcase_frame, text="Kommentar", font=("Helvetica", 10)) | |||||
self.testcase_kommentar_entry_label.place(x=320, y=60) | |||||
self.testcase_kommentar_entry_label.config(bg="#FFFFFF") | |||||
self.testcase_kommentar_entry = tk.Text(self.testcase_frame, height=4.5, width=20) | |||||
self.testcase_kommentar_entry.place(x=320, y=85) | |||||
self.testcase_kommentar_entry.config(bg="#FFFFFF") | |||||
#code version | |||||
self.testcase_version_entry_label = tk.Label(self.testcase_frame, text="Version: "+code_version, font=("Helvetica", 10)) | |||||
self.testcase_version_entry_label.place(x=240, y=20) | |||||
self.testcase_version_entry_label.config(bg="#FFFFFF") | |||||
#licht | |||||
self.testcase_licht_entry_label = tk.Label(self.testcase_frame, text="Licht", font=("Helvetica", 10)) | |||||
self.testcase_licht_entry_label.place(x=10, y=180) | |||||
self.testcase_licht_entry_label.config(bg="#FFFFFF") | |||||
self.testcase_licht_entry = tk.Entry(self.testcase_frame) | |||||
self.testcase_licht_entry.place(x=10, y=205) | |||||
self.testcase_licht_entry.config(bg="#FFFFFF") | |||||
#kamera | |||||
self.testcase_kamera_entry_label = tk.Label(self.testcase_frame, text="Webcam(Name)", font=("Helvetica", 10)) | |||||
self.testcase_kamera_entry_label.place(x=10, y=240) | |||||
self.testcase_kamera_entry_label.config(bg="#FFFFFF") | |||||
self.testcase_kamera_entry = tk.Entry(self.testcase_frame) | |||||
self.testcase_kamera_entry.place(x=10, y=265) | |||||
self.testcase_kamera_entry.config(bg="#FFFFFF") | |||||
#testperson | |||||
self.testcase_testperson_entry_label = tk.Label(self.testcase_frame, text="Testperson(Name)", font=("Helvetica", 10)) | |||||
self.testcase_testperson_entry_label.place(x=160, y=60) | |||||
self.testcase_testperson_entry_label.config(bg="#FFFFFF") | |||||
self.testcase_testperson_entry = tk.Entry(self.testcase_frame) | |||||
self.testcase_testperson_entry.place(x=160, y=85) | |||||
self.testcase_testperson_entry.config(bg="#FFFFFF") | |||||
#abstand | |||||
self.testcase_abstand_entry_label = tk.Label(self.testcase_frame, text="Abstand zur Kamera", font=("Helvetica", 10)) | |||||
self.testcase_abstand_entry_label.place(x=160, y=120) | |||||
self.testcase_abstand_entry_label.config(bg="#FFFFFF") | |||||
self.testcase_abstand_entry = tk.Entry(self.testcase_frame) | |||||
self.testcase_abstand_entry.place(x=160, y=145) | |||||
self.testcase_abstand_entry.config(bg="#FFFFFF") | |||||
#Winkel | |||||
self.testcase_winkel_entry_label = tk.Label(self.testcase_frame, text="Kamerawinkel", font=("Helvetica", 10)) | |||||
self.testcase_winkel_entry_label.place(x=160, y=180) | |||||
self.testcase_winkel_entry_label.config(bg="#FFFFFF") | |||||
self.testcase_winkel_entry = tk.Entry(self.testcase_frame) | |||||
self.testcase_winkel_entry.place(x=160, y=205) | |||||
self.testcase_winkel_entry.config(bg="#FFFFFF") | |||||
#Hintergrund | |||||
self.testcase_hintergrund_entry_label = tk.Label(self.testcase_frame, text="Hintergrund", font=("Helvetica", 10)) | |||||
self.testcase_hintergrund_entry_label.place(x=160, y=240) | |||||
self.testcase_hintergrund_entry_label.config(bg="#FFFFFF") | |||||
self.testcase_hintergrund_entry = tk.Entry(self.testcase_frame) | |||||
self.testcase_hintergrund_entry.place(x=160, y=265) | |||||
self.testcase_hintergrund_entry.config(bg="#FFFFFF") | |||||
#videoname | |||||
self.testcase_name_entry_label = tk.Label(self.testcase_frame, text="Videoname(Output)", font=("Helvetica", 10)) | |||||
self.testcase_name_entry_label.place(x=10, y=60) | |||||
self.testcase_name_entry_label.config(bg="#FFFFFF") | |||||
self.testcase_name_entry = tk.Entry(self.testcase_frame) | |||||
self.testcase_name_entry.place(x=10, y=85) | |||||
self.testcase_name_entry.config(bg="#FFFFFF") | |||||
#videolänge | |||||
self.video_length_entry_label = tk.Label(self.testcase_frame, text="Videolänge (Sek.)", font=("Helvetica", 10)) | |||||
self.video_length_entry_label.place(x=10, y=120) | |||||
self.video_length_entry_label.config(bg="#FFFFFF") | |||||
self.video_length_entry = tk.Entry(self.testcase_frame) | |||||
self.video_length_entry.place(x=10, y=145) | |||||
self.video_length_entry.config(bg="#FFFFFF") | |||||
#auflösung und fps | |||||
self.testcase_resolution_label = tk.Label(self.testcase_frame, text="Auflösung,FPS", font=("Helvetica", 10)) | |||||
self.testcase_resolution_label.place(x=320, y=180) | |||||
self.testcase_resolution_label.config(bg="#FFFFFF") | |||||
self.testcase_resolution1_entry = tk.Entry(self.testcase_frame) | |||||
self.testcase_resolution1_entry.place(x=320, y=205) | |||||
self.testcase_resolution1_entry.config(bg="#FFFFFF", width=5) | |||||
self.testcase_resolution1_entry.insert(0, 2560) | |||||
self.resolution_x_label = tk.Label(self.testcase_frame, text="x", font=("Helvetica", 8)) | |||||
self.resolution_x_label.place(x=365, y=205) | |||||
self.resolution_x_label.config(bg="#FFFFFF") | |||||
self.testcase_resolution2_entry = tk.Entry(self.testcase_frame) | |||||
self.testcase_resolution2_entry.place(x=377, y=205) | |||||
self.testcase_resolution2_entry.config(bg="#FFFFFF", width=5) | |||||
self.testcase_resolution2_entry.insert(0, 1440) | |||||
self.resolution_comma_label = tk.Label(self.testcase_frame, text=",", font=("Helvetica", 8)) | |||||
self.resolution_comma_label.place(x=410, y=205) | |||||
self.resolution_comma_label.config(bg="#FFFFFF") | |||||
self.testcase_fps_entry = tk.Entry(self.testcase_frame) | |||||
self.testcase_fps_entry.place(x=420, y=205) | |||||
self.testcase_fps_entry.config(bg="#FFFFFF", width=4) | |||||
self.testcase_fps_entry.insert(0, 20) | |||||
# Button to start testcase recording | |||||
self.create_testcase_button = tk.Button(self.testcase_frame, text="Testcase aufnehmen", command=self.start_recording_with_input) | |||||
self.create_testcase_button.place(x=320, y=240) | |||||
self.create_testcase_button.config(bg="#c82423", fg="#FFFFFF") | |||||
def setup_testing_controls(self): | |||||
self.testing_frame = tk.Frame(self, bg='white') | |||||
self.frames["Testing"] = self.testing_frame | |||||
#kommentar | |||||
self.testrun_kommentar_entry_label = tk.Label(self.testing_frame, text="Kommentar", font=("Helvetica", 10)) | |||||
self.testrun_kommentar_entry_label.place(x=10, y=60) | |||||
self.testrun_kommentar_entry_label.config(bg="#FFFFFF") | |||||
self.testrun_kommentar_entry = tk.Text(self.testing_frame, height=4.5, width=20) | |||||
self.testrun_kommentar_entry.place(x=10, y=85) | |||||
self.testrun_kommentar_entry.config(bg="#FFFFFF") | |||||
# mainlabel for testing | |||||
self.testing_main_label = tk.Label(self.testing_frame, text="Testing", font=("Helvetica", 20)) | |||||
self.testing_main_label.place(x=10, y=10) | |||||
self.testing_main_label.config(bg="#FFFFFF") | |||||
# Button to start test | |||||
self.test_button = tk.Button(self.testing_frame, text="Test durchführen", command=self.test_data_set) | |||||
self.test_button.place(x=350, y=60) | |||||
self.test_button.config(bg="#c82423", fg="#FFFFFF") | |||||
# Button open testcase excel | |||||
self.open_testcase_button = tk.Button(self.testing_frame, text="Open Testcase Excel", command=self.open_testcase_excel_file) | |||||
self.open_testcase_button.place(x=10, y=200) | |||||
self.open_testcase_button.config(bg="#c82423", fg="#FFFFFF") | |||||
# Button open testrun excel | |||||
self.open_testrun_button = tk.Button(self.testing_frame, text="Open Testrun Excel", command=self.open_testrun_excel_file) | |||||
self.open_testrun_button.place(x=10, y=235) | |||||
self.open_testrun_button.config(bg="#c82423", fg="#FFFFFF") | |||||
def initialize_icon(self): | |||||
# Icon ändern | |||||
self.iconbitmap('Interface/ohm.ico') | |||||
# Ändert die Hintergrundfarbe | |||||
self.configure(bg="#FFFFFF") | |||||
def main(): | |||||
app = VideoProcessingApp() | |||||
app.mainloop() | |||||
if __name__ == "__main__": | |||||
main() | |||||
""" | |||||
Abhängigkeiten: | |||||
- pyramids (für den Aufbau der Bildpyramiden) | |||||
- heartrate (zur Berechnung der Herzfrequenz) | |||||
- preprocessing (für die Video-Vorverarbeitung) | |||||
- eulerian (für die Euler'sche Video-Magnifikation) | |||||
- tkinter und constants (für die GUI und Konstantenverwaltung) | |||||
Autor: Roberto Gelsinger | |||||
Datum: 07.12.2023 | |||||
Version: Modulversion | |||||
""" | |||||
import pyramids | |||||
import heartrate | |||||
import facedetection | |||||
import eulerian | |||||
import tkinter as tk | |||||
from constants import freq_max, freq_min | |||||
def process_video(selected_video_name): | |||||
""" | |||||
Verarbeitet ein ausgewähltes Video, um die Herzfrequenz der abgebildeten Person zu ermitteln. | |||||
Dieser Prozess umfasst die Vorverarbeitung des Videos, den Aufbau einer Laplace-Pyramide, | |||||
die Anwendung von FFT-Filterung und Euler'scher Magnifikation, und schließlich die Berechnung | |||||
der Herzfrequenz aus den Video-Daten. | |||||
Args: | |||||
selected_video_name (str): Der Name des zu verarbeitenden Videos. | |||||
Returns: | |||||
None: Die Funktion gibt direkt die berechnete Herzfrequenz auf der Konsole aus. | |||||
""" | |||||
# Hier folgt Ihr bisheriger Code für die process_video Funktion | |||||
# Preprocessing phase | |||||
print("Reading + preprocessing video...") | |||||
video_frames, frame_ct, fps = facedetection.read_video("videos/"+selected_video_name) | |||||
# Build Laplacian video pyramid | |||||
print("Building Laplacian video pyramid...") | |||||
lap_video = pyramids.build_video_pyramid(video_frames) | |||||
for i, video in enumerate(lap_video): | |||||
if i == 0 or i == len(lap_video)-1: | |||||
continue | |||||
# Eulerian magnification with temporal FFT filtering | |||||
print("Running FFT and Eulerian magnification...") | |||||
result, fft, frequencies = eulerian.fft_filter(video, freq_min, freq_max, fps) | |||||
lap_video[i] += result | |||||
# Calculate heart rate | |||||
print("Calculating heart rate...") | |||||
heart_rate = heartrate.find_heart_rate(fft, frequencies, freq_min, freq_max) | |||||
# Output heart rate and final video | |||||
print("Heart rate: ", heart_rate, "bpm") |
""" | |||||
Abhängigkeiten: | |||||
- cv2 (OpenCV-Paket) | |||||
- numpy | |||||
Autor: Roberto Gelsinger | |||||
Datum: 07.12.2023 | |||||
Version: Modulversion | |||||
""" | |||||
import cv2 | |||||
import numpy as np | |||||
def build_gaussian_pyramid(img, levels): | |||||
""" | |||||
Erstellt eine Gaußsche Pyramide für ein gegebenes Bild. | |||||
Diese Funktion nimmt ein Bild und die gewünschte Anzahl von Ebenen und erstellt eine Gaußsche Pyramide. | |||||
Eine Gaußsche Pyramide ist eine Sammlung von Bildern, die bei jeder Ebene halbiert werden. | |||||
Args: | |||||
img (ndarray): Das Eingabebild. | |||||
levels (int): Die Anzahl der Ebenen in der Pyramide. | |||||
Returns: | |||||
list: Eine Liste von Bildern, die die Ebenen der Gaußschen Pyramide darstellen. | |||||
""" | |||||
float_img = np.ndarray(shape=img.shape, dtype="float") | |||||
float_img[:] = img | |||||
pyramid = [float_img] | |||||
for i in range(levels-1): | |||||
float_img = cv2.pyrDown(float_img) | |||||
pyramid.append(float_img) | |||||
return pyramid | |||||
def build_video_pyramid(frames): | |||||
""" | |||||
Erstellt eine Video-Pyramide, indem für jeden Frame eine Laplace-Pyramide erstellt wird. | |||||
Für jeden Frame des Eingabevideos wird eine Gaußsche Pyramide erstellt, und diese Pyramiden werden | |||||
zu einer Video-Pyramide zusammengesetzt. | |||||
Args: | |||||
frames (list of ndarray): Eine Liste von Frames, die das Video darstellen. | |||||
Returns: | |||||
list: Eine Liste von Pyramiden, jede repräsentiert einen Level der Video-Pyramide. | |||||
""" | |||||
lap_video = [] | |||||
for i, frame in enumerate(frames): | |||||
pyramid = build_gaussian_pyramid(frame, 3) | |||||
for j in range(3): | |||||
if i == 0: | |||||
lap_video.append(np.zeros((len(frames), pyramid[j].shape[0], pyramid[j].shape[1], 3))) | |||||
lap_video[j][i] = pyramid[j] | |||||
return lap_video |
""" | |||||
Abhängigkeiten: | |||||
- cv2 (OpenCV-Paket) | |||||
- threading | |||||
- os | |||||
Autor: Roberto Gelsinger | |||||
Datum: 07.12.2023 | |||||
Version: Modulversion | |||||
""" | |||||
import cv2 | |||||
import threading | |||||
import os | |||||
from tkinter import simpledialog | |||||
recording_normal = False | |||||
recording = False # Globale Variable, um den Aufnahmestatus zu verfolgen | |||||
recording_finished_event = threading.Event() | |||||
def start_recording(video_name="aufgenommenes_video", length=5,testcase_resolution1=2560,testcase_resolution2=1440,testcase_fps=20): | |||||
""" | |||||
Startet die Videoaufnahme in einem separaten Thread. | |||||
Args: | |||||
video_name (str): Der Basisname der Videodatei (Standard ist "aufgenommenes_video"). | |||||
""" | |||||
global recording | |||||
recording = True | |||||
thread = threading.Thread(target=record_video, args=(video_name, length,testcase_resolution1,testcase_resolution2,testcase_fps)) | |||||
thread.start() | |||||
def stop_recording(): | |||||
""" | |||||
Beendet die Videoaufnahme, indem der globale 'recording'-Status auf False gesetzt wird. | |||||
""" | |||||
global recording | |||||
recording = False | |||||
def record_video(video_name="aufgenommenes_video", length=5,testcase_resolution1=2560,testcase_resolution2=1440,testcase_fps=20): | |||||
""" | |||||
Nimmt ein Video auf und speichert es im AVI-Format. | |||||
Die Funktion initialisiert eine Videoaufnahme über die Webcam und speichert das Video in einem vordefinierten Ordner. | |||||
Die Aufnahme läuft, solange die globale Variable 'recording' auf True gesetzt ist. | |||||
Args: | |||||
video_name (str): Der Basisname der Videodatei (Standard ist "aufgenommenes_video"). | |||||
""" | |||||
output_folder = "videos" | |||||
output_file = os.path.join(output_folder, video_name + ".avi") | |||||
frame_rate = testcase_fps | |||||
cap = cv2.VideoCapture(0) | |||||
cap.set(cv2.CAP_PROP_FRAME_WIDTH, testcase_resolution1) | |||||
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, testcase_resolution2) | |||||
cap.set(cv2.CAP_PROP_FPS, testcase_fps) | |||||
if not cap.isOpened(): | |||||
print("Fehler beim Öffnen der Kamera.") | |||||
return | |||||
fourcc = cv2.VideoWriter_fourcc(*'XVID') | |||||
out = cv2.VideoWriter(output_file, fourcc,testcase_fps,(testcase_resolution1, testcase_resolution2) ) | |||||
total_frames = int(frame_rate * length) # Gesamtzahl der aufzunehmenden Frames | |||||
frame_count = 0 # Frame-Zähler | |||||
while frame_count < total_frames: | |||||
ret, frame = cap.read() | |||||
if not ret: | |||||
break | |||||
out.write(frame) | |||||
frame_count += 1 | |||||
cv2.imshow('Recording', frame) | |||||
if cv2.waitKey(1) & 0xFF == ord('q'): | |||||
break | |||||
recording_finished_event.set() | |||||
recording = False | |||||
cap.release() | |||||
out.release() | |||||
cv2.destroyAllWindows() | |||||
def stop_normal_recording(): | |||||
""" | |||||
Beendet die Videoaufnahme, indem der globale 'recording'-Status auf False gesetzt wird. | |||||
""" | |||||
global recording_normal | |||||
recording_normal = False | |||||
def start_normal_recording(video_name="aufgenommenes_video",video_resolution1=2560,video_resolution2=1440, fps=20): | |||||
""" | |||||
Startet die Videoaufnahme in einem separaten Thread. | |||||
Args: | |||||
video_name (str): Der Basisname der Videodatei (Standard ist "aufgenommenes_video"). | |||||
""" | |||||
global recording_normal | |||||
recording_normal = True | |||||
thread = threading.Thread(target=record_normal_video, args=(video_name,video_resolution1,video_resolution2,fps)) | |||||
thread.start() | |||||
def record_normal_video(video_name="aufgenommenes_video",video_resolution1=2560,video_resolution2=1440, fps=20): | |||||
""" | |||||
Nimmt ein Video auf und speichert es im AVI-Format. | |||||
Die Funktion initialisiert eine Videoaufnahme über die Webcam und speichert das Video in einem vordefinierten Ordner. | |||||
Die Aufnahme läuft, solange die globale Variable 'recording' auf True gesetzt ist. | |||||
Args: | |||||
video_name (str): Der Basisname der Videodatei (Standard ist "aufgenommenes_video"). | |||||
""" | |||||
output_folder = "videos" | |||||
output_file = os.path.join(output_folder, video_name + ".avi") | |||||
cap = cv2.VideoCapture(0) | |||||
cap.set(cv2.CAP_PROP_FRAME_WIDTH, video_resolution1) | |||||
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, video_resolution2) | |||||
cap.set(cv2.CAP_PROP_FPS, fps) | |||||
if not cap.isOpened(): | |||||
print("Fehler beim Öffnen der Kamera.") | |||||
return | |||||
#usefull if you have problems with cam resolutions , for manual debugging | |||||
#print("video_resolution1:", video_resolution1, "type:", type(video_resolution1)) | |||||
#print("video_resolution2:", video_resolution2, "type:", type(video_resolution2)) | |||||
#print("fps:", fps, "type:", type(fps)) | |||||
#actual_width = cap.get(cv2.CAP_PROP_FRAME_WIDTH) | |||||
#actual_height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT) | |||||
#actual_fps = cap.get(cv2.CAP_PROP_FPS) | |||||
#print("Actual width:", actual_width) | |||||
#print("Actual height:", actual_height) | |||||
#print("Actual FPS:", actual_fps) | |||||
fourcc = cv2.VideoWriter_fourcc(*'XVID') | |||||
out = cv2.VideoWriter(output_file, fourcc, fps, (video_resolution1, video_resolution2)) | |||||
if not out.isOpened(): | |||||
print("Fehler beim Öffnen der Videoausgabedatei.") | |||||
cap.release() | |||||
return | |||||
while recording_normal: | |||||
ret, frame = cap.read() | |||||
if not ret: | |||||
break | |||||
cv2.imshow('Recording', frame) | |||||
out.write(frame) | |||||
if cv2.waitKey(1) & 0xFF == ord('q'): | |||||
break | |||||
cap.release() | |||||
out.release() | |||||
cv2.destroyAllWindows() |
tkinter | |||||
numpy | |||||
openpyxl | |||||
pandas | |||||
scipy | |||||
opencv-python | |||||
Pillow |
print("Hallo, Welt!") | |||||
print("Hallo, Welt 2 !") |