367 lines
14 KiB
Python
Executable File
367 lines
14 KiB
Python
Executable File
#!/usr/bin/python
|
|
#
|
|
# Software License Agreement (BSD License)
|
|
#
|
|
# Copyright (c) 2009, Willow Garage, Inc.
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions
|
|
# are met:
|
|
#
|
|
# * Redistributions of source code must retain the above copyright
|
|
# notice, this list of conditions and the following disclaimer.
|
|
# * Redistributions in binary form must reproduce the above
|
|
# copyright notice, this list of conditions and the following
|
|
# disclaimer in the documentation and/or other materials provided
|
|
# with the distribution.
|
|
# * Neither the name of the Willow Garage nor the names of its
|
|
# contributors may be used to endorse or promote products derived
|
|
# from this software without specific prior written permission.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
|
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
|
|
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
|
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
|
|
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
|
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
|
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
|
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
import cv2
|
|
import message_filters
|
|
import numpy
|
|
import os
|
|
import rclpy
|
|
from rclpy.node import Node
|
|
import sensor_msgs.msg
|
|
import sensor_msgs.srv
|
|
import threading
|
|
import time
|
|
from camera_calibration.calibrator import MonoCalibrator, StereoCalibrator, ChessboardInfo, Patterns
|
|
from collections import deque
|
|
from message_filters import ApproximateTimeSynchronizer
|
|
from std_msgs.msg import String
|
|
from std_srvs.srv import Empty
|
|
|
|
class SpinThread(threading.Thread):
|
|
"""
|
|
Thread that spins the ros node, while imshow runs in the main thread
|
|
"""
|
|
|
|
def __init__(self, node):
|
|
threading.Thread.__init__(self)
|
|
self.node = node
|
|
|
|
def run(self):
|
|
rclpy.spin(self.node)
|
|
|
|
|
|
class ConsumerThread(threading.Thread):
|
|
def __init__(self, queue, function):
|
|
threading.Thread.__init__(self)
|
|
self.queue = queue
|
|
self.function = function
|
|
|
|
def run(self):
|
|
while True:
|
|
# wait for an image (could happen at the very beginning when the queue is still empty)
|
|
while len(self.queue) == 0:
|
|
time.sleep(0.1)
|
|
self.function(self.queue[0])
|
|
|
|
|
|
class CalibrationNode(Node):
|
|
def __init__(self, name, boards, service_check = True, synchronizer = message_filters.TimeSynchronizer, flags = 0,
|
|
pattern=Patterns.Chessboard, camera_name='', checkerboard_flags = 0):
|
|
super().__init__(name)
|
|
|
|
self.set_camera_info_service = self.create_client(sensor_msgs.srv.SetCameraInfo,
|
|
"camera/set_camera_info")
|
|
self.set_left_camera_info_service = self.create_client(sensor_msgs.srv.SetCameraInfo,
|
|
"left_camera/set_camera_info")
|
|
self.set_right_camera_info_service = self.create_client(sensor_msgs.srv.SetCameraInfo,
|
|
"right_camera/set_camera_info")
|
|
|
|
if service_check:
|
|
# assume any non-default service names have been set. Wait for the service to become ready
|
|
for cli in [self.set_camera_info_service, self.set_left_camera_info_service, self.set_right_camera_info_service]:
|
|
#remapped = rclpy.remap_name(svcname)
|
|
#if remapped != svcname:
|
|
#fullservicename = "%s/set_camera_info" % remapped
|
|
print("Waiting for service", cli.srv_name, "...")
|
|
# check all services so they are ready.
|
|
try:
|
|
cli.wait_for_service(timeout_sec=5)
|
|
print("OK")
|
|
except Exception as e:
|
|
print("Service not found: %s".format(e))
|
|
rclpy.shutdown()
|
|
|
|
self._boards = boards
|
|
self._calib_flags = flags
|
|
self._checkerboard_flags = checkerboard_flags
|
|
self._pattern = pattern
|
|
self._camera_name = camera_name
|
|
lsub = message_filters.Subscriber(self, sensor_msgs.msg.Image, 'left')
|
|
rsub = message_filters.Subscriber(self, sensor_msgs.msg.Image, 'right')
|
|
ts = synchronizer([lsub, rsub], 4)
|
|
ts.registerCallback(self.queue_stereo)
|
|
|
|
msub = message_filters.Subscriber(self, sensor_msgs.msg.Image, 'image')
|
|
msub.registerCallback(self.queue_monocular)
|
|
|
|
self.q_mono = deque([], 1)
|
|
self.q_stereo = deque([], 1)
|
|
|
|
self.c = None
|
|
|
|
mth = ConsumerThread(self.q_mono, self.handle_monocular)
|
|
mth.setDaemon(True)
|
|
mth.start()
|
|
|
|
sth = ConsumerThread(self.q_stereo, self.handle_stereo)
|
|
sth.setDaemon(True)
|
|
sth.start()
|
|
|
|
def redraw_stereo(self, *args):
|
|
pass
|
|
def redraw_monocular(self, *args):
|
|
pass
|
|
|
|
def queue_monocular(self, msg):
|
|
self.q_mono.append(msg)
|
|
|
|
def queue_stereo(self, lmsg, rmsg):
|
|
self.q_stereo.append((lmsg, rmsg))
|
|
|
|
def handle_monocular(self, msg):
|
|
if self.c == None:
|
|
if self._camera_name:
|
|
self.c = MonoCalibrator(self._boards, self._calib_flags, self._pattern, name=self._camera_name,
|
|
checkerboard_flags=self._checkerboard_flags)
|
|
else:
|
|
self.c = MonoCalibrator(self._boards, self._calib_flags, self._pattern,
|
|
checkerboard_flags=self.checkerboard_flags)
|
|
|
|
# This should just call the MonoCalibrator
|
|
drawable = self.c.handle_msg(msg)
|
|
self.displaywidth = drawable.scrib.shape[1]
|
|
self.redraw_monocular(drawable)
|
|
|
|
def handle_stereo(self, msg):
|
|
if self.c == None:
|
|
if self._camera_name:
|
|
self.c = StereoCalibrator(self._boards, self._calib_flags, self._pattern, name=self._camera_name,
|
|
checkerboard_flags=self._checkerboard_flags)
|
|
else:
|
|
self.c = StereoCalibrator(self._boards, self._calib_flags, self._pattern,
|
|
checkerboard_flags=self._checkerboard_flags)
|
|
|
|
drawable = self.c.handle_msg(msg)
|
|
self.displaywidth = drawable.lscrib.shape[1] + drawable.rscrib.shape[1]
|
|
self.redraw_stereo(drawable)
|
|
|
|
|
|
def check_set_camera_info(self, response):
|
|
if response.done():
|
|
if response.result() is not None:
|
|
if response.result().success:
|
|
return True
|
|
|
|
for i in range(10):
|
|
print("!" * 80)
|
|
print()
|
|
print("Attempt to set camera info failed: " + response.result() if response.result() is not None else "Not available")
|
|
print()
|
|
for i in range(10):
|
|
print("!" * 80)
|
|
print()
|
|
self.get_logger().error('Unable to set camera info for calibration. Failure message: %s' % response.result() if response.result() is not None else "Not available")
|
|
return False
|
|
|
|
def do_upload(self):
|
|
self.c.report()
|
|
print(self.c.ost())
|
|
info = self.c.as_message()
|
|
|
|
req = sensor_msgs.srv.SetCameraInfo.Request()
|
|
rv = True
|
|
if self.c.is_mono:
|
|
req.camera_info = info
|
|
response = self.set_camera_info_service.call_async(req)
|
|
rv = self.check_set_camera_info(response)
|
|
else:
|
|
req.camera_info = info[0]
|
|
response = self.set_left_camera_info_service.call_async(req)
|
|
rv = rv and self.check_set_camera_info(response)
|
|
req.camera_info = info[1]
|
|
response = self.set_right_camera_info_service.call_async(req)
|
|
rv = rv and self.check_set_camera_info(response)
|
|
return rv
|
|
|
|
|
|
class OpenCVCalibrationNode(CalibrationNode):
|
|
""" Calibration node with an OpenCV Gui """
|
|
FONT_FACE = cv2.FONT_HERSHEY_SIMPLEX
|
|
FONT_SCALE = 0.6
|
|
FONT_THICKNESS = 2
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
|
CalibrationNode.__init__(self, *args, **kwargs)
|
|
|
|
self.queue_display = deque([], 1)
|
|
self.initWindow()
|
|
|
|
def spin(self):
|
|
sth = SpinThread(self)
|
|
sth.setDaemon(True)
|
|
sth.start()
|
|
|
|
while True:
|
|
# wait for an image (could happen at the very beginning when the queue is still empty)
|
|
while len(self.queue_display) == 0:
|
|
time.sleep(0.1)
|
|
im = self.queue_display[0]
|
|
cv2.imshow("display", im)
|
|
k = cv2.waitKey(6) & 0xFF
|
|
if k in [27, ord('q')]:
|
|
rclpy.shutdown()
|
|
elif k == ord('s'):
|
|
self.screendump(im)
|
|
|
|
def initWindow(self):
|
|
cv2.namedWindow("display", cv2.WINDOW_NORMAL)
|
|
cv2.setMouseCallback("display", self.on_mouse)
|
|
cv2.createTrackbar("scale", "display", 0, 100, self.on_scale)
|
|
|
|
@classmethod
|
|
def putText(cls, img, text, org, color = (0,0,0)):
|
|
cv2.putText(img, text, org, cls.FONT_FACE, cls.FONT_SCALE, color, thickness = cls.FONT_THICKNESS)
|
|
|
|
@classmethod
|
|
def getTextSize(cls, text):
|
|
return cv2.getTextSize(text, cls.FONT_FACE, cls.FONT_SCALE, cls.FONT_THICKNESS)[0]
|
|
|
|
def on_mouse(self, event, x, y, flags, param):
|
|
if event == cv2.EVENT_LBUTTONDOWN and self.displaywidth < x:
|
|
if self.c.goodenough:
|
|
if 180 <= y < 280:
|
|
self.c.do_calibration()
|
|
if self.c.calibrated:
|
|
if 280 <= y < 380:
|
|
self.c.do_save()
|
|
elif 380 <= y < 480:
|
|
# Only shut down if we set camera info correctly, #3993
|
|
if self.do_upload():
|
|
rclpy.shutdown()
|
|
|
|
def on_scale(self, scalevalue):
|
|
if self.c.calibrated:
|
|
self.c.set_alpha(scalevalue / 100.0)
|
|
|
|
def button(self, dst, label, enable):
|
|
dst.fill(255)
|
|
size = (dst.shape[1], dst.shape[0])
|
|
if enable:
|
|
color = (155, 155, 80)
|
|
else:
|
|
color = (224, 224, 224)
|
|
cv2.circle(dst, (size[0] // 2, size[1] // 2), min(size) // 2, color, -1)
|
|
(w, h) = self.getTextSize(label)
|
|
self.putText(dst, label, ((size[0] - w) // 2, (size[1] + h) // 2), (255,255,255))
|
|
|
|
def buttons(self, display):
|
|
x = self.displaywidth
|
|
self.button(display[180:280,x:x+100], "CALIBRATE", self.c.goodenough)
|
|
self.button(display[280:380,x:x+100], "SAVE", self.c.calibrated)
|
|
self.button(display[380:480,x:x+100], "COMMIT", self.c.calibrated)
|
|
|
|
def y(self, i):
|
|
"""Set up right-size images"""
|
|
return 30 + 40 * i
|
|
|
|
def screendump(self, im):
|
|
i = 0
|
|
while os.access("/tmp/dump%d.png" % i, os.R_OK):
|
|
i += 1
|
|
cv2.imwrite("/tmp/dump%d.png" % i, im)
|
|
|
|
def redraw_monocular(self, drawable):
|
|
height = drawable.scrib.shape[0]
|
|
width = drawable.scrib.shape[1]
|
|
|
|
display = numpy.zeros((max(480, height), width + 100, 3), dtype=numpy.uint8)
|
|
display[0:height, 0:width,:] = drawable.scrib
|
|
display[0:height, width:width+100,:].fill(255)
|
|
|
|
|
|
self.buttons(display)
|
|
if not self.c.calibrated:
|
|
if drawable.params:
|
|
for i, (label, lo, hi, progress) in enumerate(drawable.params):
|
|
(w,_) = self.getTextSize(label)
|
|
self.putText(display, label, (width + (100 - w) // 2, self.y(i)))
|
|
color = (0,255,0)
|
|
if progress < 1.0:
|
|
color = (0, int(progress*255.), 255)
|
|
cv2.line(display,
|
|
(int(width + lo * 100), self.y(i) + 20),
|
|
(int(width + hi * 100), self.y(i) + 20),
|
|
color, 4)
|
|
|
|
else:
|
|
self.putText(display, "lin.", (width, self.y(0)))
|
|
linerror = drawable.linear_error
|
|
if linerror < 0:
|
|
msg = "?"
|
|
else:
|
|
msg = "%.2f" % linerror
|
|
#print "linear", linerror
|
|
self.putText(display, msg, (width, self.y(1)))
|
|
|
|
self.queue_display.append(display)
|
|
|
|
def redraw_stereo(self, drawable):
|
|
height = drawable.lscrib.shape[0]
|
|
width = drawable.lscrib.shape[1]
|
|
|
|
display = numpy.zeros((max(480, height), 2 * width + 100, 3), dtype=numpy.uint8)
|
|
display[0:height, 0:width,:] = drawable.lscrib
|
|
display[0:height, width:2*width,:] = drawable.rscrib
|
|
display[0:height, 2*width:2*width+100,:].fill(255)
|
|
|
|
self.buttons(display)
|
|
|
|
if not self.c.calibrated:
|
|
if drawable.params:
|
|
for i, (label, lo, hi, progress) in enumerate(drawable.params):
|
|
(w,_) = self.getTextSize(label)
|
|
self.putText(display, label, (2 * width + (100 - w) // 2, self.y(i)))
|
|
color = (0,255,0)
|
|
if progress < 1.0:
|
|
color = (0, int(progress*255.), 255)
|
|
cv2.line(display,
|
|
(int(2 * width + lo * 100), self.y(i) + 20),
|
|
(int(2 * width + hi * 100), self.y(i) + 20),
|
|
color, 4)
|
|
|
|
else:
|
|
self.putText(display, "epi.", (2 * width, self.y(0)))
|
|
if drawable.epierror == -1:
|
|
msg = "?"
|
|
else:
|
|
msg = "%.2f" % drawable.epierror
|
|
self.putText(display, msg, (2 * width, self.y(1)))
|
|
# TODO dim is never set anywhere. Supposed to be observed chessboard size?
|
|
if drawable.dim != -1:
|
|
self.putText(display, "dim", (2 * width, self.y(2)))
|
|
self.putText(display, "%.3f" % drawable.dim, (2 * width, self.y(3)))
|
|
|
|
self.queue_display.append(display)
|