# Gesture/touch input bridge: reads two cameras, detects fingertip "touches"
# and two-hand claps with MediaPipe Hands, and forwards events over OSC.
import cv2
|
||
import mediapipe as mp
|
||
import numpy as np
|
||
import math
|
||
import time
|
||
import json
|
||
import threading
|
||
from queue import Queue, Empty
|
||
from pythonosc import udp_client
|
||
|
||
# -------------------------------
# SETTINGS (tune for a 16 GB laptop)
# -------------------------------
TOUCH_CAM_INDEX = 1        # OS camera index used for touch detection
GESTURE_CAM_INDEX = 0      # OS camera index used for clap/gesture detection

# Logical game-surface size; touch positions are mapped into this space.
GAME_SCREEN_WIDTH = 900
GAME_SCREEN_HEIGHT = 600  # 600

STILL_REQUIRED = 1.0  # seconds the fingertip must stay still to register a touch
MOVE_TOLERANCE = 25   # pixels of jitter still counted as "holding still"

# Camera / performance
MODEL_COMPLEXITY = 1  # 0 = fast, 1 = balanced, 2 = accurate
CAMERA_FPS = 15  # 30  # target FPS (best effort)
DISPLAY_WIDTH = 360   # 1280
DISPLAY_HEIGHT = 240  # 720

# Robustness parameters
CAM_RECONNECT_DELAY = 2.0  # seconds: cap on the wait between reconnect attempts
MAX_FRAME_AGE = 0.5        # seconds: maximum age a frame may have before it is skipped
CAM_BUFFER_SIZE = 1        # tries to cap the capture buffer
CLAP_COOLDOWN = 1.5        # seconds between two accepted clap events

# OSC client: all events (/touch, /clap) are sent to this local endpoint.
client = udp_client.SimpleUDPClient("127.0.0.1", 5005)
|
||
# -------------------------------
# Calibration + homography
# -------------------------------
# calibration.json is expected to hold the four corner points of the touch
# surface as seen by the touch camera, in the same order as `dst` below
# (top-left, top-right, bottom-right, bottom-left).
try:
    with open("calibration.json", "r") as f:
        CALIB_POINTS = json.load(f)
    print("📐 Kalibrierung geladen:", CALIB_POINTS)
except (OSError, json.JSONDecodeError):
    # Missing or unreadable/invalid calibration file -> raw-coordinate mode.
    # (Narrowed from a bare `except Exception` so programming errors surface.)
    CALIB_POINTS = None
    print("⚠️ Keine Kalibrierung gefunden – benutze Rohkoordinaten!")

# Homography mapping camera pixels -> game-screen coordinates.
# H stays None (identity mapping) when no usable calibration exists.
H = None
if CALIB_POINTS is not None:
    try:
        src = np.array(CALIB_POINTS, dtype=np.float32)
        dst = np.array([
            [0, 0],
            [GAME_SCREEN_WIDTH, 0],
            [GAME_SCREEN_WIDTH, GAME_SCREEN_HEIGHT],
            [0, GAME_SCREEN_HEIGHT]
        ], dtype=np.float32)
        H, _ = cv2.findHomography(src, dst)
        print("📐 Homographie-Matrix berechnet!")
    except Exception as e:
        # Bad point data (wrong shape, degenerate quad, ...) -> raw mode.
        print("⚠️ Homographie fehlgeschlagen:", e)
        H = None
|
||
def map_point_homography(x, y):
    """Map a raw camera pixel (x, y) into game-screen coordinates.

    Applies the module-level homography ``H`` when one was computed;
    otherwise the coordinates pass through unchanged (identity mapping).
    Returns an ``(int, int)`` tuple either way.
    """
    if H is None:
        return int(x), int(y)
    point = np.float32([[[x, y]]])
    tx, ty = cv2.perspectiveTransform(point, H)[0][0]
    return int(tx), int(ty)
||
|
||
# -------------------------------
|
||
# Kamerathread (liest non-stop, hält nur das letzte Frame)
|
||
# -------------------------------
|
||
class CameraReader(threading.Thread):
    """Background thread that reads one camera non-stop and keeps only the
    most recent frame (plus its capture timestamp).

    On open/read failure it reconnects automatically with exponential
    backoff, so the main loop never blocks on a flaky device.
    """

    def __init__(self, index, width, height, fps, name="Cam"):
        super().__init__(daemon=True)
        self.index = index           # OS camera index for cv2.VideoCapture
        self.width = width           # requested capture width (best effort)
        self.height = height         # requested capture height (best effort)
        self.fps = fps               # requested capture FPS (best effort)
        self.name = f"{name}-{index}"  # also becomes the thread name
        self.cap = None              # cv2.VideoCapture handle (None until connected)
        self.latest_frame = None     # newest frame; guarded by self.lock
        self.latest_ts = 0.0         # time.time() when latest_frame was read
        self.lock = threading.Lock()
        self.stop_event = threading.Event()
        self.connected = False

    def run(self):
        """Thread body: (re)connect, then read frames until stop() is called."""
        backoff = 0.5
        while not self.stop_event.is_set():
            if not self.connected:
                try:
                    # Prefer CAP_DSHOW when the cv2 build exposes it (Windows).
                    self.cap = cv2.VideoCapture(self.index, cv2.CAP_DSHOW) if hasattr(cv2, 'CAP_DSHOW') else cv2.VideoCapture(self.index)
                    # Try to apply capture settings (best effort).
                    self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
                    self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
                    self.cap.set(cv2.CAP_PROP_FPS, self.fps)
                    try:
                        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, CAM_BUFFER_SIZE)
                    except Exception:
                        pass  # not every backend supports BUFFERSIZE
                    time.sleep(0.2)  # brief pause so the camera can initialize
                    if self.cap.isOpened():
                        self.connected = True
                        backoff = 0.5  # reset backoff after a successful connect
                        print(f"▶ {self.name} verbunden (Index {self.index})")
                    else:
                        self.cap.release()
                        raise RuntimeError("Cannot open camera")
                except Exception as e:
                    print(f"⚠️ {self.name} Verbindungsfehler: {e} — retry in {backoff:.1f}s")
                    time.sleep(backoff)
                    # Exponential backoff, capped at CAM_RECONNECT_DELAY.
                    backoff = min(backoff * 2, CAM_RECONNECT_DELAY)
                    continue

            # Connected: read frames (non-blocking best effort).
            try:
                ok, frame = self.cap.read()
                if not ok or frame is None:
                    # Camera hiccup -> reconnect.
                    print(f"⚠️ {self.name} read failed, reconnecting...")
                    self._reconnect()
                    continue

                ts = time.time()
                # Optional flipping depends on camera usage. Keep the raw
                # frame here; the main thread decides how to flip it.
                with self.lock:
                    self.latest_frame = frame
                    self.latest_ts = ts

                # No long sleep here — a fast reader delivers the freshest
                # frame, but a tiny sleep reduces the threads' CPU load.
                time.sleep(0.001)
            except Exception as e:
                print(f"⚠️ {self.name} Laufzeitfehler: {e}")
                self._reconnect()

        # cleanup
        if self.cap and self.cap.isOpened():
            self.cap.release()
        print(f"■ {self.name} gestoppt")

    def _reconnect(self):
        """Release the capture handle and mark the reader as disconnected.

        The run() loop notices ``connected == False`` and re-opens the camera.
        """
        try:
            if self.cap and self.cap.isOpened():
                self.cap.release()
        except Exception:
            pass
        self.connected = False
        time.sleep(0.5)

    def read_latest(self):
        """Return ``(frame_copy_or_None, timestamp)`` thread-safely.

        The copy decouples the caller from the reader thread's buffer.
        """
        with self.lock:
            return self.latest_frame.copy() if self.latest_frame is not None else None, self.latest_ts

    def stop(self):
        """Ask the reader thread to exit its run() loop."""
        self.stop_event.set()
||
|
||
# -------------------------------
|
||
# Hauptprogramm (MediaPipe im Main-Thread)
|
||
# -------------------------------
|
||
def run_gesture_input():
    """Main loop: detect fingertip touches and two-hand claps, send OSC.

    Starts one CameraReader per camera, runs MediaPipe Hands in the main
    thread on the freshest frames, and emits ``/touch [sx, sy]`` and
    ``/clap 1`` messages via the module-level OSC ``client``. Runs until
    ESC is pressed or KeyboardInterrupt; always cleans up in ``finally``.
    """
    # Start the camera reader threads.
    cam_touch = CameraReader(TOUCH_CAM_INDEX, DISPLAY_WIDTH, DISPLAY_HEIGHT, CAMERA_FPS, name="TouchCam")
    cam_gest = CameraReader(GESTURE_CAM_INDEX, DISPLAY_WIDTH, DISPLAY_HEIGHT, CAMERA_FPS, name="GestCam")
    cam_touch.start()
    cam_gest.start()

    # MediaPipe setup in the main thread.
    mp_hands = mp.solutions.hands
    mp_draw = mp.solutions.drawing_utils

    # One detector per camera: touch tracks a single hand, gesture needs two.
    hands_touch = mp_hands.Hands(
        max_num_hands=1,
        model_complexity=MODEL_COMPLEXITY,
        min_detection_confidence=0.6,
        min_tracking_confidence=0.6
    )
    hands_gesture = mp_hands.Hands(
        max_num_hands=2,
        model_complexity=MODEL_COMPLEXITY,
        min_detection_confidence=0.6,
        min_tracking_confidence=0.6
    )

    # Dwell-detection state for the touch camera.
    last_finger_pos = None      # last observed fingertip (fx, fy) or None
    finger_still_start = None   # time.time() when the fingertip stopped moving
    prev_touch_time = 0.0       # last time a /touch message was sent
    prev_clap_time = 0.0        # last time a /clap message was sent

    frame_duration = 1.0 / CAMERA_FPS
    last_frame_time = time.time()

    try:
        while True:
            loop_start = time.time()

            # 1) Fetch the freshest frames (stale frames get skipped below).
            frame_t, ts_t = cam_touch.read_latest()
            frame_g, ts_g = cam_gest.read_latest()

            now = time.time()

            # If no frame is available, just keep going (do not break).
            if frame_t is None or (now - ts_t) > MAX_FRAME_AGE:
                # No valid touch frame; show a placeholder instead.
                # We simply continue (no freeze).
                frame_t = None

            if frame_g is None or (now - ts_g) > MAX_FRAME_AGE:
                frame_g = None

            # 2) Process touch (if a frame is available).
            if frame_t is not None:
                # Flip & convert. flip(-1) mirrors both axes.
                frame_touch = cv2.flip(frame_t, -1)
                th, tw = frame_touch.shape[:2]

                rgb_t = cv2.cvtColor(frame_touch, cv2.COLOR_BGR2RGB)
                res_t = hands_touch.process(rgb_t)

                if res_t.multi_hand_landmarks:
                    lm = res_t.multi_hand_landmarks[0]
                    mp_draw.draw_landmarks(frame_touch, lm, mp_hands.HAND_CONNECTIONS)

                    # Fingertip (landmark 8) above the knuckle (landmark 5)
                    # means the finger is not pointing down -> reset dwell.
                    if lm.landmark[8].y < lm.landmark[5].y:
                        last_finger_pos = None
                        finger_still_start = None
                    else:
                        # Fingertip position in pixels.
                        fx = int(lm.landmark[8].x * tw)
                        fy = int(lm.landmark[8].y * th)
                        # Mapped into game-screen coordinates.
                        sx, sy = map_point_homography(fx, fy)

                        current_pos = (fx, fy)
                        if last_finger_pos is None:
                            last_finger_pos = current_pos
                            finger_still_start = time.time()
                        else:
                            dist = math.hypot(current_pos[0] - last_finger_pos[0],
                                              current_pos[1] - last_finger_pos[1])
                            if dist < MOVE_TOLERANCE:
                                # Still enough long enough -> fire a touch
                                # (rate-limited to one per 0.5 s).
                                if finger_still_start and (time.time() - finger_still_start) >= STILL_REQUIRED:
                                    if time.time() - prev_touch_time > 0.5:
                                        client.send_message("/touch", [sx, sy])
                                        print(f"👉 TOUCH bei {sx},{sy}")
                                        prev_touch_time = time.time()
                                    finger_still_start = None
                            else:
                                # Moved too much -> restart the dwell timer.
                                finger_still_start = time.time()
                            last_finger_pos = current_pos

                        cv2.circle(frame_touch, (fx, fy), 10, (0, 255, 0), -1)
                        cv2.putText(frame_touch, f"{sx},{sy}", (fx+10, fy-10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0), 2)
                else:
                    # No hand visible -> reset dwell state.
                    last_finger_pos = None
                    finger_still_start = None

                # Debug overlay.
                cv2.putText(frame_touch, f"FPS:{CAMERA_FPS} MC:{MODEL_COMPLEXITY}", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,0), 2)
            else:
                # Frame missing: show a black placeholder.
                frame_touch = np.zeros((DISPLAY_HEIGHT, DISPLAY_WIDTH, 3), dtype=np.uint8)
                cv2.putText(frame_touch, "No Touch Frame", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,0,255), 2)

            # 3) Process gesture (clap) if a frame is available.
            if frame_g is not None:
                frame_gest = cv2.flip(frame_g, 1)
                gh, gw = frame_gest.shape[:2]

                rgb_g = cv2.cvtColor(frame_gest, cv2.COLOR_BGR2RGB)
                res_g = hands_gesture.process(rgb_g)

                # A clap needs exactly two detected hands close together.
                if res_g.multi_hand_landmarks and len(res_g.multi_hand_landmarks) == 2:
                    h1, h2 = res_g.multi_hand_landmarks
                    # Hand centers = mean of all landmarks, in pixels.
                    x1 = np.mean([p.x for p in h1.landmark]) * gw
                    y1 = np.mean([p.y for p in h1.landmark]) * gh
                    x2 = np.mean([p.x for p in h2.landmark]) * gw
                    y2 = np.mean([p.y for p in h2.landmark]) * gh
                    dist = math.hypot(x2 - x1, y2 - y1)
                    # Closer than 100 px and past the cooldown -> clap event.
                    if dist < 100 and (time.time() - prev_clap_time) > CLAP_COOLDOWN:
                        prev_clap_time = time.time()
                        client.send_message("/clap", 1)
                        print("👏 SEND /clap")
                        cv2.putText(frame_gest, "👏", (int(gw/2)-20, 80),
                                    cv2.FONT_HERSHEY_SIMPLEX, 2, (0,255,255), 3)
                # If no hands: nothing to overlay / nothing to do.
            else:
                frame_gest = np.zeros((DISPLAY_HEIGHT, DISPLAY_WIDTH, 3), dtype=np.uint8)
                cv2.putText(frame_gest, "No Gesture Frame", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,0,255), 2)

            # 4) Show (non-blocking).
            cv2.imshow("Touch-Cam", frame_touch)
            cv2.imshow("Gesture-Cam", frame_gest)

            # 5) Input handling (very short wait so window events get processed).
            key = cv2.waitKey(1)
            if key == 27:
                print("⏹ ESC gedrückt - Beenden")
                break

            # 6) FPS limiter (best effort, no long sleep).
            elapsed = time.time() - loop_start
            remaining = frame_duration - elapsed
            if remaining > 0:
                # Short sleep to spare the CPU; never block for long.
                time.sleep(min(remaining, 0.01))

    except KeyboardInterrupt:
        print("■ KeyboardInterrupt erhalten")
    finally:
        # Shut everything down cleanly.
        try:
            hands_touch.close()
            hands_gesture.close()
        except Exception:
            pass

        cam_touch.stop()
        cam_gest.stop()
        # The reader threads may need a moment to exit.
        cam_touch.join(timeout=2.0)
        cam_gest.join(timeout=2.0)

        cv2.destroyAllWindows()
        print("✔ Programm beendet")
|
||
|
||
if __name__ == "__main__":
    # Script entry point: run the camera/gesture loop until ESC or Ctrl+C.
    run_gesture_input()
|