from numpy.lib.function_base import _angle_dispatcher import rclpy from rclpy.node import Node from rclpy.qos import qos_profile_sensor_data from cv_bridge import CvBridge import numpy as np import cv2 from transforms3d.quaternions import mat2quat from transforms3d.euler import mat2euler from aruco_detector import yaml_handling from sensor_msgs.msg import CompressedImage, Image from geometry_msgs.msg import Pose from aruco_interfaces.msg import ArucoMarkerPose class ArucoDetectorNode(rclpy.node.Node): def __init__(self): super().__init__('aruco_detector_node') # Declare parameters self.declare_parameter("marker_size", .1) self.declare_parameter("aruco_dictionary_id", "DICT_6X6_100") self.declare_parameter("input_image_topic", "/image_raw/compressed") self.declare_parameter("url_yaml_file", "file:///home/ros2/dev2_ws/src/aruco_detector/hd_pro_webcam_c920.yaml") # url to yaml file (calibration file) # Read parameters self.marker_size = self.get_parameter("marker_size").get_parameter_value().double_value dictionary_id_name = self.get_parameter("aruco_dictionary_id").get_parameter_value().string_value input_image_topic = self.get_parameter("input_image_topic").get_parameter_value().string_value url_yaml_file = self.get_parameter("url_yaml_file").get_parameter_value().string_value # check dictionary_id_name try: dictionary_id = cv2.aruco.__getattribute__(dictionary_id_name) if type(dictionary_id) != type(cv2.aruco.DICT_5X5_100): raise AttributeError except AttributeError: self.get_logger().error("bad aruco_dictionary_id: {}".format(dictionary_id_name)) options = "\n".join([s for s in dir(cv2.aruco) if s.startswith("DICT")]) self.get_logger().error("valid options: {}".format(options)) # Set up subscriber self.create_subscription(CompressedImage, input_image_topic, self.image_callback, qos_profile_sensor_data) # subsribes the images from the camera # Set up publishers self.publisher_marker_poses = self.create_publisher(ArucoMarkerPose, '/aruco/aruco_poses', 10) # publishes the current poses self.publisher_output_images = self.create_publisher(CompressedImage, '/aruco/aruco_output_images/compressed', 10) # publishes the images with the detected aruco-tags # Load the camera parameters from yaml-file camera_info_msg = yaml_handling.yaml_to_CameraInfo("/home/ros2/dev2_ws/src/aruco_detector/hd_pro_webcam_c920.yaml")#url_yaml_file) if camera_info_msg is None: self.get_logger().warn("No camera info has been received!") return self.intrinsic_mat = np.reshape(np.array(camera_info_msg.k), (3, 3)) self.distortion = np.array(camera_info_msg.d) self.aruco_dictionary = cv2.aruco.Dictionary_get(dictionary_id) self.aruco_parameters = cv2.aruco.DetectorParameters_create() self.bridge = CvBridge() def image_callback(self, img_msg): cv_image = self.bridge.compressed_imgmsg_to_cv2(img_msg,desired_encoding='passthrough') corners, marker_ids, rejected = cv2.aruco.detectMarkers(cv_image, self.aruco_dictionary, parameters=self.aruco_parameters) #check if marker is detected, then estimate pose an publish if marker_ids is not None: rvecs, tvecs, _objPoints = cv2.aruco.estimatePoseSingleMarkers(corners, self.marker_size, self.intrinsic_mat, self.distortion) #self.get_logger().error("rvecs: {}".format(rvecs)) pose_array = ArucoMarkerPose() for i, marker_id in enumerate(marker_ids): pose = Pose() pose.position.x = tvecs[i][0][0] pose.position.y = tvecs[i][0][1] pose.position.z = tvecs[i][0][2] rot_matrix = np.eye(4) rot_matrix[0:3, 0:3] = cv2.Rodrigues(np.array(rvecs[i][0]))[0] import math angle = math.acos(rot_matrix[0][0])*(180/math.pi) angle_psi, angle_theta, angle_phi = mat2euler(rot_matrix) #quat = mat2quat(rot_matrix[0:3, 0:3]) self.get_logger().error("angle_theta: \n{}\n\n".format(angle_theta*(180/math.pi))) self.get_logger().error("angle_phi: \n{}\n\n".format(angle_phi*(180/math.pi))) self.get_logger().error("angle_psi: \n{}\n\n".format(angle_psi*(180/math.pi))) # parameter einfügen für unterscheidung zwischen 2D und 3D # einfache if abfrage pose.orientation.x = 0.0 #quat[0] pose.orientation.y = 0.0 #quat[1] pose.orientation.z = math.sin((angle_phi) / 2) #quat[2] pose.orientation.w = math.cos((angle_phi) / 2) #quat[3] pose_array.poses.append(pose) pose_array.marker_ids.append(marker_id[0]) self.publisher_marker_poses.publish(pose_array) output_img = cv2.aruco.drawAxis(cv_image, self.intrinsic_mat, self.distortion, rvecs, tvecs, 0.5 ) output_img = cv2.aruco.drawDetectedMarkers(cv_image, corners, marker_ids) output_img_compressed = self.bridge.cv2_to_compressed_imgmsg(output_img, dst_format='jpg') self.publisher_output_images.publish(output_img_compressed) def main(args=None): rclpy.init(args=args) minimal_publisher = ArucoDetectorNode() rclpy.spin(minimal_publisher) minimal_publisher.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()