import sys

import cv2
import numpy as np
from PyQt5.QtWidgets import (
    QApplication, QWidget, QFormLayout, QPushButton, QLabel,
    QHBoxLayout, QVBoxLayout, QComboBox
)
from PyQt5.QtGui import QImage, QPixmap
from PyQt5.QtCore import QTimer, Qt, QThread, pyqtSignal, QElapsedTimer

import ideal_filter        # Ensure this is properly implemented
import butterworth_filter  # Ensure this is properly implemented


class CameraThread(QThread):
    frame_ready = pyqtSignal(np.ndarray)

    def __init__(self):
        super().__init__()
        self.is_running = True
        self.cap = cv2.VideoCapture(0)
        self.fps = self.cap.get(cv2.CAP_PROP_FPS) or 30

    def run(self):
        while self.is_running:
            ret, frame = self.cap.read()
            if ret:
                self.frame_ready.emit(frame)
            else:
                print("Error: Could not read frame from camera.")
            self.msleep(int(1000 / self.fps))

    def stop(self):
        self.is_running = False
        self.cap.release()


class FilterWorker(QThread):
    result_ready = pyqtSignal(np.ndarray, float)

    def __init__(self, buffer, alpha, chromAttenuation, fps, filter_type="Ideal",
                 width=512, height=512, time_window=5):
        super().__init__()
        self.buffer = buffer
        self.alpha = alpha
        self.chromAttenuation = chromAttenuation
        self.fps = fps
        self.width = width
        self.height = height
        self.time_window = time_window
        self.is_running = True
        self.filter_type = filter_type
        self.low, self.high = (1, 2.5) if filter_type == "Ideal" else (0.1, 0.5)

    def run(self):
        if self.filter_type == "Ideal":
            final_video, bpm = ideal_filter.start(
                vidFile=self.buffer,
                alpha=self.alpha,
                low=self.low,
                high=self.high,
                chromAttenuation=self.chromAttenuation,
                fps=self.fps,
                width=self.width,
                height=self.height
            )
        elif self.filter_type == "Butterworth":
            final_video, bpm = butterworth_filter.start(
                video_frames=self.buffer,
                alpha=self.alpha,
                low=self.low,
                high=self.high,
                chromAttenuation=self.chromAttenuation,
                fps=self.fps,
                width=self.width,
                height=self.height,
                time_window=self.time_window
            )
        if self.is_running:
            self.result_ready.emit(final_video, bpm)

    def stop(self):
        self.is_running = False
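
# ---------------------------------------------------------------------------
# Note on the filter modules: this script only defines the GUI. Based on the
# calls in FilterWorker.run() above, ideal_filter and butterworth_filter are
# assumed to expose a start() function returning the magnified frames and an
# estimated pulse, roughly:
#
#     ideal_filter.start(vidFile, alpha, low, high, chromAttenuation,
#                        fps, width, height) -> (final_video, bpm)
#     butterworth_filter.start(video_frames, alpha, low, high,
#                              chromAttenuation, fps, width, height,
#                              time_window) -> (final_video, bpm)
#
# A runnable sketch of such a module is given at the end of this file.
# ---------------------------------------------------------------------------
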
class ParameterGUI(QWidget):
    def __init__(self):
        super().__init__()
        self.setWindowTitle('Video Filtering Display')
        self.setFixedSize(1400, 800)
        self.setup_ui()

        # OpenCV's SSD face detector (files from the opencv/samples/dnn
        # face_detector sample); readNetFromCaffe takes the prototxt first,
        # then the weights.
        modelFile = "res10_300x300_ssd_iter_140000_fp16.caffemodel"
        configFile = "deploy.prototxt"
        self.face_net = cv2.dnn.readNetFromCaffe(configFile, modelFile)

        self.face_buffer = []
        self.video_buffer = []
        self.buffer_length = 0
        self.elapsed_timer = QElapsedTimer()
        self.is_processing = False
        self.worker = None
        self.camera_thread = None

    def setup_ui(self):
        layout = QVBoxLayout()

        # ComboBoxes for user parameters
        self.alphaMenu = QComboBox(self)
        alpha_values = [5, 10, 15, 20, 30, 40, 50, 60]
        self.alphaMenu.addItems([str(value) for value in alpha_values])

        self.chromAtt = QComboBox(self)
        chrom_values = [0.0001, 0.001, 0.01, 0.1, 0.5]
        self.chromAtt.addItems([str(value) for value in chrom_values])

        self.timeWindowMenu = QComboBox(self)
        self.timeWindowMenu.addItems(["5", "10", "15", "20"])

        self.filterMenu = QComboBox(self)
        self.filterMenu.addItems(["Ideal", "Butterworth"])

        # Form layout for parameters
        form_layout = QFormLayout()
        form_layout.addRow("Alpha:", self.alphaMenu)
        form_layout.addRow("ChromAttenuation:", self.chromAtt)
        form_layout.addRow("Filter:", self.filterMenu)
        form_layout.addRow("Time Window (seconds):", self.timeWindowMenu)

        self.submitButton = QPushButton('Start Camera')
        self.submitButton.clicked.connect(self.start_camera)
        form_layout.addRow(self.submitButton)
        layout.addLayout(form_layout)

        # Layout for displaying video
        video_layout = QHBoxLayout()
        self.liveVideoLabel = QLabel(self)
        self.liveVideoLabel.setFixedSize(640, 480)
        self.processedVideoLabel = QLabel(self)
        self.processedVideoLabel.setFixedSize(640, 480)
        video_layout.addWidget(self.liveVideoLabel, alignment=Qt.AlignCenter)
        video_layout.addWidget(self.processedVideoLabel, alignment=Qt.AlignCenter)
        layout.addLayout(video_layout)

        # BPM and status labels
        self.bpmLabel = QLabel('BPM: ', self)
        layout.addWidget(self.bpmLabel)
        self.bufferStatusLabel = QLabel('Buffer status: Waiting...', self)
        layout.addWidget(self.bufferStatusLabel)
        self.filterStatusLabel = QLabel('Filter status: Not running', self)
        layout.addWidget(self.filterStatusLabel)
        self.ParameterStatusLabel = QLabel('No parameters set', self)
        layout.addWidget(self.ParameterStatusLabel)

        self.setLayout(layout)

    def start_camera(self):
        # Stop existing camera thread if it's running
        if self.camera_thread is not None:
            self.camera_thread.stop()
            self.camera_thread.wait()

        # Stop existing worker thread if it's running
        if self.worker is not None:
            self.worker.stop()
            self.worker.wait()

        # Stop any running live-display timer before starting a new session
        if not hasattr(self, 'timer'):
            self.timer = QTimer(self)
        if self.timer.isActive():
            self.timer.stop()

        # Reset buffers and status labels
        self.face_buffer.clear()
        self.video_buffer.clear()
        self.is_processing = False
        self.bufferStatusLabel.setText('Buffer status: Waiting...')
        self.filterStatusLabel.setText('Filter status: Not running')
        self.bpmLabel.setText('BPM: ')

        # Fetch parameters from the UI
        self.alpha = int(self.alphaMenu.currentText())
        self.chromAttenuation = float(self.chromAtt.currentText())
        self.filter = self.filterMenu.currentText()
        self.timeWindow = int(self.timeWindowMenu.currentText())

        # Update the parameter status label
        self.ParameterStatusLabel.setText(
            f'Alpha: {self.alpha}  ChromAttenuation: {self.chromAttenuation}  '
            f'Filter: {self.filter}  TimeWindow: {self.timeWindow}'
        )

        # Start the camera thread
        self.camera_thread = CameraThread()
        self.camera_thread.frame_ready.connect(self.update_frame)
        self.camera_thread.start()

        # Set FPS and buffer length based on the camera's FPS
        self.fps = self.camera_thread.fps
        self.buffer_length = int(self.camera_thread.fps * self.timeWindow)

        # Start the elapsed timer to measure buffering time
        self.elapsed_timer.start()

    def update_frame(self, frame):
        if not self.is_processing:
            self.bufferStatusLabel.setText('Buffer status: Filling up')

            if self.filter == "Butterworth":
                upper_body_region, coords = self.get_upper_body(frame)
                if upper_body_region is not None:
                    upper_body_resized = cv2.resize(upper_body_region, (512, 512))
                    self.video_buffer.append(upper_body_resized)
                    startX, startY, endX, endY = coords
                    cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2)

            if self.filter == "Ideal":
                face_region = self.get_face(frame)
                if face_region is not None:
                    face_region_resized = cv2.resize(face_region, (512, 512))
                    # Smooth the face region heavily to suppress noise
                    face_region_resized = cv2.GaussianBlur(face_region_resized, (25, 25), 0)
                    face_region_resized = cv2.medianBlur(face_region_resized, 25)
                    self.face_buffer.append(face_region_resized)

            if self.elapsed_timer.elapsed() >= self.timeWindow * 1000:
                self.process_buffers()
                self.elapsed_timer.restart()

        # Display the live frame
        frame_display = self.resize_frame(frame, self.liveVideoLabel)
        frame_display = cv2.cvtColor(frame_display, cv2.COLOR_BGR2RGB)
        height, width, channel = frame_display.shape
        bytes_per_line = channel * width
        q_img = QImage(frame_display.data, width, height, bytes_per_line, QImage.Format_RGB888)
        self.liveVideoLabel.setPixmap(QPixmap.fromImage(q_img))
    def get_face(self, frame):
        (h, w) = frame.shape[:2]
        # Mean BGR values the detector was trained with
        blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)), 1.0,
                                     (300, 300), (104.0, 177.0, 123.0))
        self.face_net.setInput(blob)
        detections = self.face_net.forward()
        for i in range(detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence > 0.5:
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                # Copy the crop so the rectangle drawn below cannot bleed
                # into the buffered face region
                face_region = frame[startY:endY, startX:endX].copy()
                cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 2)
                return face_region
        return None

    def get_upper_body(self, frame):
        (h, w) = frame.shape[:2]
        startY = int(h / 2)
        endY = h
        startX = int(w / 4)
        endX = int(w * 3 / 4)
        cropped_frame = frame[startY:endY, startX:endX]
        return cropped_frame, (startX, startY, endX, endY)

    def process_buffers(self):
        if self.is_processing:
            return
        self.is_processing = True
        self.bufferStatusLabel.setText('Buffer status: Completed')
        self.filterStatusLabel.setText('Filter status: Running')
        time_window = int(self.timeWindowMenu.currentText())

        if self.filter == "Ideal" and self.face_buffer:
            self.worker = FilterWorker(
                self.face_buffer.copy(),  # Copy buffer before clearing
                self.alpha,
                self.chromAttenuation,
                self.camera_thread.fps,
                filter_type="Ideal",
                time_window=time_window
            )
            self.worker.result_ready.connect(self.display_filtered_video)
            self.worker.start()
            self.face_buffer.clear()

        elif self.filter == "Butterworth" and self.video_buffer:
            self.worker = FilterWorker(
                self.video_buffer.copy(),  # Copy buffer before clearing
                self.alpha,
                self.chromAttenuation,
                self.camera_thread.fps,
                filter_type="Butterworth",
                time_window=time_window
            )
            self.worker.result_ready.connect(self.display_filtered_video)
            self.worker.start()
            # Clear the buffer after starting the filter worker
            self.video_buffer.clear()

    def display_filtered_video(self, final_video, bpm):
        self.bpmLabel.setText(f'BPM: {bpm:.2f}')
        self.filterStatusLabel.setText('Filter status: Displaying video')
        self.frame_index = 0
        self.final_video = final_video

        # Stop the existing timer (if any) and set up a new timer for frame display
        if hasattr(self, 'frame_timer'):
            self.frame_timer.stop()
        self.frame_timer = QTimer(self)
        self.frame_timer.timeout.connect(lambda: self.show_filtered_frame(self.final_video))
        self.frame_timer.start(int(1000 / self.fps))  # Display frames at the camera's FPS

    def show_filtered_frame(self, final_video):
        """Displays each frame from the filtered video using a QTimer."""
        if self.frame_index < len(final_video):
            frame = final_video[self.frame_index]
            if frame.dtype == np.float64:
                frame = cv2.normalize(frame, None, 0, 255, cv2.NORM_MINMAX)
                frame = frame.astype(np.uint8)

            # Resize and display the filtered frame
            frame_resized = self.resize_frame(frame, self.processedVideoLabel)
            frame_resized = cv2.cvtColor(frame_resized, cv2.COLOR_BGR2RGB)
            height, width, channel = frame_resized.shape
            bytes_per_line = channel * width
            q_img = QImage(frame_resized.data, width, height, bytes_per_line, QImage.Format_RGB888)
            self.processedVideoLabel.setPixmap(QPixmap.fromImage(q_img))
            QApplication.processEvents()
            self.frame_index += 1
        else:
            # Stop the filtered video display timer
            self.frame_timer.stop()

            # Restart the live video feed
            if hasattr(self, 'timer'):
                self.timer.start(int(1000 / self.fps))  # Restart the live feed timer
            else:
                print("Error: Timer for live video is not initialized.")

            self.filterStatusLabel.setText('Filter status: Completed')
            self.is_processing = False
            self.bufferStatusLabel.setText('Buffer status: Blocked')
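
    # Note: QImage(buffer, ...) wraps the NumPy array without copying it, so
    # the array must stay alive (and contiguous) while the QImage is in use.
    # Both display paths above convert to a QPixmap immediately, and
    # QPixmap.fromImage copies the pixel data, so the pattern is safe here.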
    def resize_frame(self, frame, label):
        size = label.size()
        return cv2.resize(frame, (size.width(), size.height()))

    def closeEvent(self, event):
        if self.camera_thread:
            self.camera_thread.stop()
            self.camera_thread.wait()
        if self.worker:
            self.worker.stop()
            self.worker.wait()
        if hasattr(self, 'frame_timer'):
            self.frame_timer.stop()
        event.accept()


def record_frames(frame_count):
    """Standalone capture demo (not used by the GUI): grabs frame_count frames
    from the default camera with a live preview."""
    cap = cv2.VideoCapture(0)
    frames = [None] * frame_count
    for i in range(frame_count):
        ret, frame = cap.read()
        if not ret:
            print(f"Frame {i + 1} could not be read.")
            break
        frames[i] = frame
        # Optional: show a preview
        cv2.imshow("Recording", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):  # quit by pressing 'q'
            break
    # Release the camera and close the preview window
    cap.release()
    cv2.destroyAllWindows()
    return frames


if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = ParameterGUI()
    window.show()
    sys.exit(app.exec_())
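

# ---------------------------------------------------------------------------
# Minimal sketch of the assumed filter-module interface (see the note above
# ParameterGUI). This is NOT the real ideal_filter implementation -- the real
# module likely adds a spatial (pyramid) decomposition, and chromAttenuation,
# width, and height are ignored here. It only illustrates an ideal temporal
# bandpass plus a crude FFT-based pulse estimate, and can stand in for
# ideal_filter.start() when smoke-testing the GUI.
# ---------------------------------------------------------------------------
def _example_ideal_start(vidFile, alpha, low, high, chromAttenuation,
                         fps, width, height):
    frames = np.asarray(vidFile, dtype=np.float32)  # (T, H, W, C), BGR
    # Ideal bandpass: zero every temporal frequency outside [low, high] Hz
    spectrum = np.fft.rfft(frames, axis=0)
    freqs = np.fft.rfftfreq(frames.shape[0], d=1.0 / fps)
    mask = (freqs >= low) & (freqs <= high)
    filtered = np.fft.irfft(spectrum * mask[:, None, None, None],
                            n=frames.shape[0], axis=0)
    # Eulerian magnification: add the amplified bandpassed signal back
    magnified = np.clip(frames + alpha * filtered, 0, 255).astype(np.uint8)
    # Crude pulse estimate: dominant in-band frequency of the mean green channel
    green = frames[:, :, :, 1].mean(axis=(1, 2))
    power = np.abs(np.fft.rfft(green - green.mean()))
    band = mask & (freqs > 0)
    bpm = float(freqs[band][np.argmax(power[band])] * 60) if band.any() else 0.0
    return magnified, bpm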