import os
import asyncio
import cozmo
import PIL.Image
import PIL.ImageFont
import PIL.ImageTk
import cv2
import numpy
import tkinter
import tensorflow as tf
import time
from CozmoDriver import CozmoDriver
from tensorflow.python.platform import gfile

USE_LOGGING = False


class CozmoController:

    # Only for testing
    def printTensors(self, pb_file):
        # read pb into graph_def
        with tf.io.gfile.GFile(pb_file, "rb") as f:
            graph_def = tf.compat.v1.GraphDef()
            graph_def.ParseFromString(f.read())

        # import graph_def
        with tf.Graph().as_default() as graph:
            tf.import_graph_def(graph_def)

        # print operations
        for op in graph.get_operations():
            print(op.name)

    def __init__(self, sess, logits):
        self._robot = None
        self._tk_root = None
        self._tk_label_input = None
        self._tk_label_output = None
        self.session = sess
        self.logits = logits
        self.cozmoDriver = None
        self._counter = 0
        self._frameDecisionModifier = 1
        self._startTime = time.time()

        # Testing
        # self.printTensors(GRAPH_PB_PATH)

        if USE_LOGGING:
            cozmo.setup_basic_logging()
        cozmo.connect(self.run)

    def on_new_camera_image(self, event, *, image: cozmo.world.CameraImage, **kw):
        '''
        Gets called on every new image that arrives from Cozmo's camera
        :param event:
        :param image:
        :param kw:
        :return:
        '''
        elapsedTime = time.time() - self._startTime
        self._startTime = time.time()
        # print("Elapsed Time: {}".format(elapsedTime))

        self._counter = self._counter + 1
        raw_image = image.raw_image

        # Convert PIL image to OpenCV image
        cv2_image = cv2.cvtColor(numpy.array(raw_image), cv2.COLOR_RGB2BGR)

        # Apply edge filter
        cv2_edges = self.auto_canny(cv2_image)
        # print("Shape of the image {}".format(cv2_edges.shape))

        # Crop image
        cv2_edges = cv2_edges[60:180, :]

        # Resize image
        cv2_edges = cv2.resize(cv2_edges, (80, 30))

        # Convert output image back to PIL image
        pil_edges = PIL.Image.fromarray(cv2.cvtColor(cv2_edges, cv2.COLOR_GRAY2RGB))

        # Display input and output feed
        display_image_input = PIL.ImageTk.PhotoImage(image=image.annotate_image())
        display_image_output = PIL.ImageTk.PhotoImage(image=pil_edges)
        self._tk_label_input.imgtk = display_image_input
        self._tk_label_input.configure(image=display_image_input)
        self._tk_label_output.imgtk = display_image_output
        self._tk_label_output.configure(image=display_image_output)
        self._tk_root.update()

        cv2_edges = numpy.array(cv2_edges).reshape(-1, 30, 80, 1)

        # Get prediction from the model
        prediction = self.session.run(
            "action:0",
            {self.logits: cv2_edges, "action_masks:0": [[1, 1, 1, 1]]},
        )

        # Control on which frame a decision should produce a new action
        if self._counter % self._frameDecisionModifier == 0:
            if numpy.argmax(prediction) == 0:
                # behaves too strictly, no movement anymore
                # self.cozmoDriver.stop()
                pass
            elif numpy.argmax(prediction) == 1:
                self.cozmoDriver.drive()
            elif numpy.argmax(prediction) == 2:
                self.cozmoDriver.driveRight()
            elif numpy.argmax(prediction) == 3:
                self.cozmoDriver.driveLeft()

    # Auto-parameter Canny edge detection adapted from:
    # http://www.pyimagesearch.com/2015/04/06/zero-parameter-automatic-canny-edge-detection-with-python-and-opencv/
    def auto_canny(self, img, sigma=0.33):
        '''
        Canny algorithm with automatic parameters in OpenCV
        :param img:
        :param sigma:
        :return:
        '''
        blurred = cv2.GaussianBlur(img, (3, 3), 0)
        v = numpy.median(blurred)
        lower = int(max(0, (1.0 - sigma) * v))
        upper = int(min(255, (1.0 + sigma) * v))
        edged = cv2.Canny(blurred, lower - 33, upper)
        return edged

    async def set_up_cozmo(self, coz_conn):
        '''
        Initialize Cozmo to get images from the camera and let him drive
        :param coz_conn:
        :return:
        '''
        asyncio.set_event_loop(coz_conn._loop)
        self._robot = await coz_conn.wait_for_robot()

        # Enable camera stream
        self._robot.camera.image_stream_enabled = True

        # Event handler for new images
        self._robot.add_event_handler(cozmo.world.EvtNewCameraImage, self.on_new_camera_image)

        # Set Cozmo's head to the lowest position
        await self._robot.set_head_angle(cozmo.robot.MIN_HEAD_ANGLE).wait_for_completed()

        # Set Cozmo's lift to the highest position
        await self._robot.set_lift_height(1).wait_for_completed()

        self.cozmoDriver = CozmoDriver(self._robot)

    async def run(self, coz_conn):
        # Set up Cozmo
        await self.set_up_cozmo(coz_conn)
        self.cozmo = CozmoDriver(self._robot)

        self._tk_root = tkinter.Tk()
        # TODO: ESC to exit
        self._tk_label_input = tkinter.Label(self._tk_root)
        self._tk_label_output = tkinter.Label(self._tk_root)
        self._tk_label_input.pack()
        self._tk_label_output.pack()

        while True:
            await asyncio.sleep(0)


if __name__ == '__main__':
    # original trained brain
    GRAPH_PB_PATH = '..\\Models\\1_original_trained_brain\\CozmoLearningBrain\\frozen_graph_def.pb'
    # brain with stop movement allowed and slower turn speed in training
    # GRAPH_PB_PATH = '..\\Models\\2_modified_turnspeed_actionmask_brain\\CozmoLearningBrain\\frozen_graph_def.pb'
    # brain with a new action mask trained that remembers the 3 latest actions it took
    # GRAPH_PB_PATH = '..\\Models\\3_modified_actionmask_memory_3_brain\\CozmoLearningBrain\\frozen_graph_def.pb'
    # brain with a new action mask trained that remembers the 5 latest actions it took
    # GRAPH_PB_PATH = '..\\Models\\4_modified_actionmask_memory_5_brain\\CozmoLearningBrain\\frozen_graph_def.pb'

    with tf.compat.v1.Session() as sess:
        print("load model graph")
        with tf.io.gfile.GFile(GRAPH_PB_PATH, 'rb') as f:
            graph_def = tf.compat.v1.GraphDef()
            graph_def.ParseFromString(f.read())
            sess.graph.as_default()
            tf.import_graph_def(graph_def, name='')
            graph_nodes = [n for n in graph_def.node]
            logits = sess.graph.get_tensor_by_name("visual_observation_0:0")
        CozmoController(sess, logits)