# global_match_memory/gesture_input_osc_re2.py
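"""Dual-camera gesture input bridge (MediaPipe Hands -> OSC).

One camera watches the play surface: when the index fingertip dwells in
place for STILL_REQUIRED seconds, its position is mapped through an
optional homography (loaded from calibration.json) into game-screen
coordinates and sent as an OSC "/touch" message. A second camera watches
the player: when two detected hands come close together, an OSC "/clap"
message is sent. Both cameras are read in background threads so a stalled
device cannot freeze the main loop.
"""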

import cv2
import mediapipe as mp
import numpy as np
import math
import time
import json
import threading
from queue import Queue, Empty
from pythonosc import udp_client
# -------------------------------
# SETTINGS (tune for a 16 GB laptop)
# -------------------------------
TOUCH_CAM_INDEX = 1
GESTURE_CAM_INDEX = 0
GAME_SCREEN_WIDTH = 900
GAME_SCREEN_HEIGHT = 600
STILL_REQUIRED = 1.0        # seconds the finger must hold still
MOVE_TOLERANCE = 25         # pixels
# Camera / performance
MODEL_COMPLEXITY = 1        # 0 = fast, 1 = balanced, 2 = accurate
CAMERA_FPS = 15             # target FPS (best effort)
DISPLAY_WIDTH = 360
DISPLAY_HEIGHT = 240
# Robustness parameters
CAM_RECONNECT_DELAY = 2.0   # seconds to wait between reconnect attempts
MAX_FRAME_AGE = 0.5         # seconds: maximum age of a usable frame
CAM_BUFFER_SIZE = 1         # tries to cap the capture buffer
CLAP_COOLDOWN = 1.5         # seconds between /clap messages
client = udp_client.SimpleUDPClient("127.0.0.1", 5005)
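# The matching receiver is not part of this file. A minimal sketch of a
# listener on the game side, assuming pythonosc is used there too (the
# handler names are illustrative, not from this project):
#
#     from pythonosc import dispatcher, osc_server
#
#     def on_touch(address, x, y):
#         print("touch at", x, y)
#
#     d = dispatcher.Dispatcher()
#     d.map("/touch", on_touch)
#     d.map("/clap", lambda address, v: print("clap"))
#     osc_server.BlockingOSCUDPServer(("127.0.0.1", 5005), d).serve_forever()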
# -------------------------------
# Calibration + homography
# -------------------------------
try:
    with open("calibration.json", "r") as f:
        CALIB_POINTS = json.load(f)
    print("📐 Calibration loaded:", CALIB_POINTS)
except Exception:
    CALIB_POINTS = None
    print("⚠️ No calibration found, using raw coordinates!")
H = None
if CALIB_POINTS is not None:
    try:
        src = np.array(CALIB_POINTS, dtype=np.float32)
        dst = np.array([
            [0, 0],
            [GAME_SCREEN_WIDTH, 0],
            [GAME_SCREEN_WIDTH, GAME_SCREEN_HEIGHT],
            [0, GAME_SCREEN_HEIGHT]
        ], dtype=np.float32)
        H, _ = cv2.findHomography(src, dst)
        print("📐 Homography matrix computed!")
    except Exception as e:
        print("⚠️ Homography failed:", e)
        H = None
def map_point_homography(x, y):
    if H is None:
        return int(x), int(y)
    p = np.array([[[x, y]]], dtype=np.float32)
    mapped = cv2.perspectiveTransform(p, H)[0][0]
    return int(mapped[0]), int(mapped[1])
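# cv2.perspectiveTransform applies the homogeneous mapping p' ~ H * p and
# dehomogenizes the result, so camera pixels land in the 900x600 game
# coordinate system spanned by the four calibration corners above. Without
# a calibration file the raw camera coordinates pass through unchanged.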
# -------------------------------
# Camera thread (reads non-stop, keeps only the latest frame)
# -------------------------------
class CameraReader(threading.Thread):
    def __init__(self, index, width, height, fps, name="Cam"):
        super().__init__(daemon=True)
        self.index = index
        self.width = width
        self.height = height
        self.fps = fps
        self.name = f"{name}-{index}"
        self.cap = None
        self.latest_frame = None
        self.latest_ts = 0.0
        self.lock = threading.Lock()
        self.stop_event = threading.Event()
        self.connected = False
    def run(self):
        backoff = 0.5
        while not self.stop_event.is_set():
            if not self.connected:
                try:
                    self.cap = cv2.VideoCapture(self.index, cv2.CAP_DSHOW) if hasattr(cv2, 'CAP_DSHOW') else cv2.VideoCapture(self.index)
                    # Try to apply settings (best effort)
                    self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
                    self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
                    self.cap.set(cv2.CAP_PROP_FPS, self.fps)
                    try:
                        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, CAM_BUFFER_SIZE)
                    except Exception:
                        pass
                    time.sleep(0.2)  # give the camera a moment to initialize
                    if self.cap.isOpened():
                        self.connected = True
                        backoff = 0.5
                        print(f"{self.name} connected (index {self.index})")
                    else:
                        self.cap.release()
                        raise RuntimeError("Cannot open camera")
                except Exception as e:
                    print(f"⚠️ {self.name} connection error: {e} - retry in {backoff:.1f}s")
                    time.sleep(backoff)
                    backoff = min(backoff * 2, CAM_RECONNECT_DELAY)
                    continue
            # Connected: read frames (non-blocking best effort)
            try:
                ok, frame = self.cap.read()
                if not ok or frame is None:
                    # Camera hiccup -> reconnect
                    print(f"⚠️ {self.name} read failed, reconnecting...")
                    self._reconnect()
                    continue
                ts = time.time()
                # Optional: flip depending on camera mounting. Kept raw here;
                # the main thread decides.
                with self.lock:
                    self.latest_frame = frame
                    self.latest_ts = ts
                # No long sleep here - a fast reader keeps the frame fresh,
                # but a tiny sleep reduces the reader thread's CPU load.
                time.sleep(0.001)
            except Exception as e:
                print(f"⚠️ {self.name} runtime error: {e}")
                self._reconnect()
        # cleanup
        if self.cap and self.cap.isOpened():
            self.cap.release()
        print(f"{self.name} stopped")
    def _reconnect(self):
        try:
            if self.cap and self.cap.isOpened():
                self.cap.release()
        except Exception:
            pass
        self.connected = False
        time.sleep(0.5)

    def read_latest(self):
        with self.lock:
            return self.latest_frame.copy() if self.latest_frame is not None else None, self.latest_ts

    def stop(self):
        self.stop_event.set()
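# CameraReader can be exercised on its own; a minimal sketch (camera index
# 0 here is an assumption for illustration):
#
#     reader = CameraReader(0, 640, 480, 30, name="TestCam")
#     reader.start()
#     time.sleep(1.0)
#     frame, ts = reader.read_latest()  # newest frame, or (None, 0.0)
#     reader.stop()
#     reader.join(timeout=2.0)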
# -------------------------------
# Main program (MediaPipe in the main thread)
# -------------------------------
def run_gesture_input():
    # Start the camera threads
    cam_touch = CameraReader(TOUCH_CAM_INDEX, DISPLAY_WIDTH, DISPLAY_HEIGHT, CAMERA_FPS, name="TouchCam")
    cam_gest = CameraReader(GESTURE_CAM_INDEX, DISPLAY_WIDTH, DISPLAY_HEIGHT, CAMERA_FPS, name="GestCam")
    cam_touch.start()
    cam_gest.start()
    # MediaPipe setup in the main thread
    mp_hands = mp.solutions.hands
    mp_draw = mp.solutions.drawing_utils
    hands_touch = mp_hands.Hands(
        max_num_hands=1,
        model_complexity=MODEL_COMPLEXITY,
        min_detection_confidence=0.6,
        min_tracking_confidence=0.6
    )
    hands_gesture = mp_hands.Hands(
        max_num_hands=2,
        model_complexity=MODEL_COMPLEXITY,
        min_detection_confidence=0.6,
        min_tracking_confidence=0.6
    )
    last_finger_pos = None
    finger_still_start = None
    prev_touch_time = 0.0
    prev_clap_time = 0.0
    frame_duration = 1.0 / CAMERA_FPS
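    # Touch detection below is a small dwell state machine: the index
    # fingertip must stay within MOVE_TOLERANCE pixels of its last position
    # for STILL_REQUIRED seconds before a /touch is sent; a larger move or a
    # hand pointing upwards resets the timer.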
    try:
        while True:
            loop_start = time.time()
            # 1) Fetch the most recent frames (skip them if too old)
            frame_t, ts_t = cam_touch.read_latest()
            frame_g, ts_g = cam_gest.read_latest()
            now = time.time()
            # If a frame is missing or stale, carry on without it (no freeze,
            # no break) - a placeholder image is shown instead
            if frame_t is None or (now - ts_t) > MAX_FRAME_AGE:
                frame_t = None
            if frame_g is None or (now - ts_g) > MAX_FRAME_AGE:
                frame_g = None
            # 2) Process touch (if a frame is available)
            if frame_t is not None:
                # Flip & convert
                frame_touch = cv2.flip(frame_t, -1)
                th, tw = frame_touch.shape[:2]
                rgb_t = cv2.cvtColor(frame_touch, cv2.COLOR_BGR2RGB)
                res_t = hands_touch.process(rgb_t)
                if res_t.multi_hand_landmarks:
                    lm = res_t.multi_hand_landmarks[0]
                    mp_draw.draw_landmarks(frame_touch, lm, mp_hands.HAND_CONNECTIONS)
                    # Index fingertip (landmark 8) above its knuckle (landmark 5):
                    # finger is not pointing down, so reset the dwell timer
                    if lm.landmark[8].y < lm.landmark[5].y:
                        last_finger_pos = None
                        finger_still_start = None
                    else:
                        fx = int(lm.landmark[8].x * tw)
                        fy = int(lm.landmark[8].y * th)
                        sx, sy = map_point_homography(fx, fy)
                        current_pos = (fx, fy)
                        if last_finger_pos is None:
                            last_finger_pos = current_pos
                            finger_still_start = time.time()
                        else:
                            dist = math.hypot(current_pos[0] - last_finger_pos[0],
                                              current_pos[1] - last_finger_pos[1])
                            if dist < MOVE_TOLERANCE:
                                if finger_still_start and (time.time() - finger_still_start) >= STILL_REQUIRED:
                                    if time.time() - prev_touch_time > 0.5:
                                        client.send_message("/touch", [sx, sy])
                                        print(f"👉 TOUCH at {sx},{sy}")
                                        prev_touch_time = time.time()
                                    finger_still_start = None
                            else:
                                finger_still_start = time.time()
                            last_finger_pos = current_pos
                        cv2.circle(frame_touch, (fx, fy), 10, (0, 255, 0), -1)
                        cv2.putText(frame_touch, f"{sx},{sy}", (fx + 10, fy - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
                else:
                    last_finger_pos = None
                    finger_still_start = None
                # Debug info
                cv2.putText(frame_touch, f"FPS:{CAMERA_FPS} MC:{MODEL_COMPLEXITY}", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
            else:
                # Frame missing: show a black placeholder
                frame_touch = np.zeros((DISPLAY_HEIGHT, DISPLAY_WIDTH, 3), dtype=np.uint8)
                cv2.putText(frame_touch, "No Touch Frame", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
            # 3) Process gesture (clap) if a frame is available
            if frame_g is not None:
                frame_gest = cv2.flip(frame_g, 1)
                gh, gw = frame_gest.shape[:2]
                rgb_g = cv2.cvtColor(frame_gest, cv2.COLOR_BGR2RGB)
                res_g = hands_gesture.process(rgb_g)
                if res_g.multi_hand_landmarks and len(res_g.multi_hand_landmarks) == 2:
                    h1, h2 = res_g.multi_hand_landmarks
                    x1 = np.mean([p.x for p in h1.landmark]) * gw
                    y1 = np.mean([p.y for p in h1.landmark]) * gh
                    x2 = np.mean([p.x for p in h2.landmark]) * gw
                    y2 = np.mean([p.y for p in h2.landmark]) * gh
                    # Clap = the two hand centroids come closer than 100 px
                    dist = math.hypot(x2 - x1, y2 - y1)
                    if dist < 100 and (time.time() - prev_clap_time) > CLAP_COOLDOWN:
                        prev_clap_time = time.time()
                        client.send_message("/clap", 1)
                        print("👏 SEND /clap")
                        # cv2.putText cannot render emoji with Hershey fonts,
                        # so draw a plain label instead
                        cv2.putText(frame_gest, "CLAP", (int(gw / 2) - 20, 80),
                                    cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 255, 255), 3)
                # If no hands are visible, there is nothing to overlay
            else:
                frame_gest = np.zeros((DISPLAY_HEIGHT, DISPLAY_WIDTH, 3), dtype=np.uint8)
                cv2.putText(frame_gest, "No Gesture Frame", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
            # 4) Show (non-blocking)
            cv2.imshow("Touch-Cam", frame_touch)
            cv2.imshow("Gesture-Cam", frame_gest)
            # 5) Input handling (very short wait so window events get processed)
            key = cv2.waitKey(1)
            if key == 27:
                print("⏹ ESC pressed - quitting")
                break
            # 6) FPS limiter (best effort, no long sleeps)
            elapsed = time.time() - loop_start
            remaining = frame_duration - elapsed
            if remaining > 0:
                # short sleep to spare the CPU; never block for long
                time.sleep(min(remaining, 0.01))
    except KeyboardInterrupt:
        print("■ KeyboardInterrupt received")
    finally:
        # Clean shutdown
        try:
            hands_touch.close()
            hands_gesture.close()
        except Exception:
            pass
        cam_touch.stop()
        cam_gest.stop()
        # The threads may need a moment to exit
        cam_touch.join(timeout=2.0)
        cam_gest.join(timeout=2.0)
        cv2.destroyAllWindows()
        print("✔ Program finished")
if __name__ == "__main__":
    run_gesture_input()