123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- import os
- import asyncio
- import cozmo
- import PIL.Image
- import PIL.ImageFont
- import PIL.ImageTk
- import cv2
- import numpy
- import tkinter
- import tensorflow as tf
- import time
- from CozmoDriver import CozmoDriver
- from tensorflow.python.platform import gfile
-
# Set to True to call cozmo.setup_basic_logging() before connecting.
USE_LOGGING = False


class CozmoController:
    '''
    Steers a Cozmo robot from its camera feed using a frozen TensorFlow graph.

    Every camera frame is reduced to a 80x30 Canny edge image, shown in a
    small Tk window next to the annotated camera image, and fed to the model.
    The index of the largest value of the "action:0" output selects the
    driving command sent to the CozmoDriver.

    Note that constructing an instance immediately calls cozmo.connect(),
    which runs :meth:`run` on the SDK connection.
    '''

    # Only for testing
    def printTensors(self, pb_file):
        '''
        Prints the name of every operation stored in a frozen graph file.
        :param pb_file: path to a serialized GraphDef (.pb) file
        :return: None
        '''
        # read pb into graph_def
        with tf.io.gfile.GFile(pb_file, "rb") as f:
            graph_def = tf.compat.v1.GraphDef()
            graph_def.ParseFromString(f.read())

        # import graph_def into a fresh graph so the default graph stays untouched
        with tf.Graph().as_default() as graph:
            tf.import_graph_def(graph_def)

        # print operations
        for op in graph.get_operations():
            print(op.name)

    def __init__(self, sess, logits):
        '''
        Stores the model handles and connects to Cozmo.
        :param sess: TensorFlow session that holds the imported model graph
        :param logits: tensor used as feed key for the preprocessed camera
                       image (despite its name it is the model *input*)
        '''
        # Everything that is created later in set_up_cozmo()/run() starts as
        # None so the camera handler can tell whether set-up has finished.
        self._robot = None
        self._tk_root = None
        self._tk_label_input = None
        self._tk_label_output = None
        self.session = sess
        self.logits = logits
        self.cozmoDriver = None
        self._counter = 0
        # A new action is taken on every n-th frame (1 = every frame).
        self._frameDecisonModifier = 1
        self._startTime = time.time()
        # Testing
        # self.printTensors(GRAPH_PB_PATH)

        if USE_LOGGING:
            cozmo.setup_basic_logging()
        cozmo.connect(self.run)

    def _is_ready(self):
        '''
        Tells whether the driver and the Tk widgets have been created.
        :return: True once frames can be displayed and acted upon
        '''
        return (self.cozmoDriver is not None
                and self._tk_root is not None
                and self._tk_label_input is not None
                and self._tk_label_output is not None)

    def _update_preview(self, annotated_image, cv2_edges):
        '''
        Shows the annotated camera image and the edge image in the Tk window.
        :param annotated_image: PIL image of the camera frame
        :param cv2_edges: single channel OpenCV edge image
        :return: None
        '''
        # Convert output image back to PIL image
        pil_edges = PIL.Image.fromarray(cv2.cvtColor(cv2_edges, cv2.COLOR_GRAY2RGB))

        display_image_input = PIL.ImageTk.PhotoImage(image=annotated_image)
        display_image_output = PIL.ImageTk.PhotoImage(image=pil_edges)
        # The PhotoImage objects are also stored on the labels so that a
        # reference to them stays alive while they are displayed.
        self._tk_label_input.imgtk = display_image_input
        self._tk_label_input.configure(image=display_image_input)
        self._tk_label_output.imgtk = display_image_output
        self._tk_label_output.configure(image=display_image_output)
        self._tk_root.update()

    def on_new_camera_image(self, event, *, image: 'cozmo.world.CameraImage', **kw):
        '''
        Gets called on every new image that arrives from cozmos camera
        :param event: the dispatched event (unused)
        :param image: the new camera image
        :param kw: remaining event parameters (unused)
        :return: None
        '''
        # The handler is registered before the driver and the Tk widgets
        # exist; frames that arrive during set-up are dropped.
        if not self._is_ready():
            return

        self._startTime = time.time()
        self._counter += 1

        # Convert PIL Image to OpenCV Image
        cv2_image = cv2.cvtColor(numpy.array(image.raw_image), cv2.COLOR_RGB2BGR)

        # Apply edge filter
        cv2_edges = self.auto_canny(cv2_image)
        # Crop image to rows 60..179
        cv2_edges = cv2_edges[60:180, :]
        # resize image to width 80, height 30
        cv2_edges = cv2.resize(cv2_edges, (80, 30))

        # Display input and output feed
        self._update_preview(image.annotate_image(), cv2_edges)

        # Control on which frame a decision should produce a new action
        if self._counter % self._frameDecisonModifier != 0:
            return

        # get prediction from model; the feed key is the tensor handed to the
        # constructor, not a module level global
        model_input = numpy.array(cv2_edges).reshape(-1, 30, 80, 1)
        prediction = self.session.run(
            "action:0",
            {self.logits: model_input, "action_masks:0": [[1, 1, 1, 1]]},
        )
        action = int(numpy.argmax(prediction))

        # Action 0 is deliberately ignored: calling cozmoDriver.stop() here
        # behaved too strictly and the robot did not move any more.
        if action == 1:
            self.cozmoDriver.drive()
        elif action == 2:
            self.cozmoDriver.driveRight()
        elif action == 3:
            self.cozmoDriver.driveLeft()

    # Auto-parameter Canny edge detection adapted from:
    # http://www.pyimagesearch.com/2015/04/06/zero-parameter-automatic-canny-edge-detection-with-python-and-opencv/
    def auto_canny(self, img, sigma=0.33):
        '''
        Canny algorithm with automatic parameters in opencv
        :param img: OpenCV image to detect edges in
        :param sigma: relative width of the threshold band around the median
        :return: edge image produced by cv2.Canny
        '''
        blurred = cv2.GaussianBlur(img, (3, 3), 0)
        v = numpy.median(blurred)
        lower = int(max(0, (1.0 - sigma) * v))
        upper = int(min(255, (1.0 + sigma) * v))
        # The extra 33 is subtracted after clamping, so the lower threshold
        # handed to Canny can be negative.
        # NOTE(review): presumably tuned to match the training images - confirm.
        edged = cv2.Canny(blurred, lower - 33, upper)
        return edged

    async def set_up_cozmo(self, coz_conn):
        '''
        initialize cozmo to get images from camera and let him drive
        :param coz_conn: connection object handed in by cozmo.connect()
        :return: None
        '''
        asyncio.set_event_loop(coz_conn._loop)
        self._robot = await coz_conn.wait_for_robot()
        # enable camera stream
        self._robot.camera.image_stream_enabled = True
        # event handler for new images
        self._robot.add_event_handler(cozmo.world.EvtNewCameraImage, self.on_new_camera_image)
        # set cozmos head to lowest position
        await self._robot.set_head_angle(cozmo.robot.MIN_HEAD_ANGLE).wait_for_completed()
        # set cozmos lift to highest position
        await self._robot.set_lift_height(1).wait_for_completed()
        self.cozmoDriver = CozmoDriver(self._robot)

    async def run(self, coz_conn):
        '''
        Entry point passed to cozmo.connect(): sets up the robot and the
        preview window, then keeps the coroutine alive.
        :param coz_conn: connection object handed in by cozmo.connect()
        :return: never returns
        '''
        # Set up Cozmo
        await self.set_up_cozmo(coz_conn)
        # Kept for backward compatibility; refers to the driver created in
        # set_up_cozmo() instead of constructing a second one.
        self.cozmo = self.cozmoDriver
        self._tk_root = tkinter.Tk()
        # TODO: ESC to exit
        self._tk_label_input = tkinter.Label(self._tk_root)
        self._tk_label_output = tkinter.Label(self._tk_root)
        self._tk_label_input.pack()
        self._tk_label_output.pack()

        # Yield to the event loop forever so camera events keep arriving.
        # NOTE(review): sleep(0) never actually waits, so this is a busy loop.
        while True:
            await asyncio.sleep(0)
-
if __name__ == '__main__':
    # Directory name of the trained brain to load. Exactly one line is active.

    # original trained brain
    BRAIN_DIR = '1_original_trained_brain'

    # brain with stop movement allowed and slower turnspeed in training
    # BRAIN_DIR = '2_modified_turnspeed_actionmask_brain'

    # brain with a new actionmask trained that remembers the 3 latest actions it took
    # BRAIN_DIR = '3_modified_actionmask_memory_3_brain'

    # brain with a new actionmask trained that remembers the 5 latest actions it took
    # BRAIN_DIR = '4_modified_actionmask_memory_5_brain'

    # Built with os.path.join so the path uses the separator of the current
    # platform (on Windows this is the same string as before).
    GRAPH_PB_PATH = os.path.join(
        '..', 'Models', BRAIN_DIR, 'CozmoLearningBrain', 'frozen_graph_def.pb')

    with tf.compat.v1.Session() as sess:
        print("load model graph")
        # Read the frozen graph definition from disk
        with tf.io.gfile.GFile(GRAPH_PB_PATH, 'rb') as f:
            graph_def = tf.compat.v1.GraphDef()
            graph_def.ParseFromString(f.read())

        # Import the nodes into the graph owned by the session. name='' keeps
        # the tensor names free of an "import/" prefix.
        with sess.graph.as_default():
            tf.import_graph_def(graph_def, name='')

        # Input placeholder for the preprocessed camera image
        logits = sess.graph.get_tensor_by_name("visual_observation_0:0")

        # Constructing the controller connects to Cozmo and starts driving;
        # it has to happen while the session is still open.
        CozmoController(sess, logits)
|