diff --git a/camera/video_stream/imagezmq.py b/camera/video_stream/imagezmq.py new file mode 100644 index 0000000..f05e6db --- /dev/null +++ b/camera/video_stream/imagezmq.py @@ -0,0 +1,232 @@ +""" imagezmq: Transport OpenCV images via ZMQ. + +Classes that transport OpenCV images from one computer to another. For example, +OpenCV images gathered by a Raspberry Pi camera could be sent to another +computer for displaying the images using cv2.imshow() or for further image +processing. See API and Usage Examples for details. + +Copyright (c) 2017 by Jeff Bass. +License: MIT, see LICENSE for more details. +""" + +import zmq +import numpy as np +import cv2 + + +class ImageSender(): + """Opens zmq REQ socket and sends images. + + Opens a zmq REQ socket on the image sending computer, often a + Raspberry Pi, that will be sending OpenCV images and + related text messages to the hub computer. Provides methods to + send images or send jpg compressed images. + + Arguments: + connect_to: the tcp address:port of the hub computer. + """ + + def __init__(self, connect_to='tcp://127.0.0.1:5555'): + """Initializes zmq socket for sending images to the hub. + + Expects an open socket at the connect_to tcp address; it will + connect to that remote socket after setting up the REQ + socket on this computer. + """ + + self.zmq_context = SerializingContext() + self.zmq_socket = self.zmq_context.socket(zmq.REQ) + self.zmq_socket.connect(connect_to) + + def send_image(self, msg, image): + """Sends OpenCV image and msg to hub computer. + + Arguments: + msg: text message or image name. + image: OpenCV image to send to hub. + + Returns: + A text reply from hub. + """ + + if image.flags['C_CONTIGUOUS']: + # if image is already contiguous in memory just send it + self.zmq_socket.send_array(image, msg, copy=False) + else: + # else make it contiguous before sending + image = np.ascontiguousarray(image) + self.zmq_socket.send_array(image, msg, copy=False) + hub_reply = self.zmq_socket.recv() # receive the reply message + return hub_reply + + def send_jpg(self, msg, jpg_buffer): + """Sends msg text and jpg buffer to hub computer. + + Arguments: + msg: image name or message text. + jpg_buffer: bytestring containing the jpg image to send to hub. + Returns: + A text reply from hub. + """ + + self.zmq_socket.send_jpg(msg, jpg_buffer, copy=False) + hub_reply = self.zmq_socket.recv() # receive the reply message + return hub_reply + + +class ImageHub(): + """Opens zmq REP socket and receives images. + + Opens a zmq REP socket on the hub compuer, for example, + a Mac, that will be receiving and displaying or processing OpenCV images + and related text messages. Provides methods to receive images or receive + jpg compressed images. + + Arguments: + open_port: (optional) the socket to open for receiving REQ requests. + """ + + def __init__(self, open_port='tcp://*:5555'): + """Initializes zmq REP socket to receive images and text. + """ + + self.zmq_context = SerializingContext() + self.zmq_socket = self.zmq_context.socket(zmq.REP) + self.zmq_socket.bind(open_port) + + def recv_image(self, copy=False): + """Receives OpenCV image and text msg. + + Arguments: + copy: (optional) zmq copy flag. + + Returns: + msg: text msg, often the image name. + image: OpenCV image. + """ + + msg, image = self.zmq_socket.recv_array(copy=False) + return msg, image + + def recv_jpg(self, copy=False): + """Receives text msg, jpg buffer. + + Arguments: + copy: (optional) zmq copy flag + Returns: + msg: text message, often image name + jpg_buffer: bytestring jpg compressed image + """ + + msg, jpg_buffer = self.zmq_socket.recv_jpg(copy=False) + return msg, jpg_buffer + + def send_reply(self, reply_message=b'OK'): + """Sends the zmq REP reply message. + + Arguments: + reply_message: reply message text, often just string 'OK' + """ + self.zmq_socket.send(reply_message) + + +class SerializingSocket(zmq.Socket): + """Numpy array serialization methods. + + Modelled on PyZMQ serialization examples. + + Used for sending / receiving OpenCV images, which are Numpy arrays. + Also used for sending / receiving jpg compressed OpenCV images. + """ + + def send_array(self, A, msg='NoName', flags=0, copy=True, track=False): + """Sends a numpy array with metadata and text message. + + Sends a numpy array with the metadata necessary for reconstructing + the array (dtype,shape). Also sends a text msg, often the array or + image name. + + Arguments: + A: numpy array or OpenCV image. + msg: (optional) array name, image name or text message. + flags: (optional) zmq flags. + copy: (optional) zmq copy flag. + track: (optional) zmq track flag. + """ + + md = dict( + msg=msg, + dtype=str(A.dtype), + shape=A.shape, + ) + self.send_json(md, flags | zmq.SNDMORE) + return self.send(A, flags, copy=copy, track=track) + + def send_jpg(self, + msg='NoName', + jpg_buffer=b'00', + flags=0, + copy=True, + track=False): + """Send a jpg buffer with a text message. + + Sends a jpg bytestring of an OpenCV image. + Also sends text msg, often the image name. + + Arguments: + msg: image name or text message. + jpg_buffer: jpg buffer of compressed image to be sent. + flags: (optional) zmq flags. + copy: (optional) zmq copy flag. + track: (optional) zmq track flag. + """ + + md = dict(msg=msg, ) + self.send_json(md, flags | zmq.SNDMORE) + return self.send(jpg_buffer, flags, copy=copy, track=track) + + def recv_array(self, flags=0, copy=True, track=False): + """Receives a numpy array with metadata and text message. + + Receives a numpy array with the metadata necessary + for reconstructing the array (dtype,shape). + Returns the array and a text msg, often the array or image name. + + Arguments: + flags: (optional) zmq flags. + copy: (optional) zmq copy flag. + track: (optional) zmq track flag. + + Returns: + msg: image name or text message. + A: numpy array or OpenCV image reconstructed with dtype and shape. + """ + + md = self.recv_json(flags=flags) + msg = self.recv(flags=flags, copy=copy, track=track) + A = np.frombuffer(msg, dtype=md['dtype']) + return (md['msg'], A.reshape(md['shape'])) + + def recv_jpg(self, flags=0, copy=True, track=False): + """Receives a jpg buffer and a text msg. + + Receives a jpg bytestring of an OpenCV image. + Also receives a text msg, often the image name. + + Arguments: + flags: (optional) zmq flags. + copy: (optional) zmq copy flag. + track: (optional) zmq track flag. + + Returns: + msg: image name or text message. + jpg_buffer: bytestring, containing jpg image. + """ + + md = self.recv_json(flags=flags) # metadata text + jpg_buffer = self.recv(flags=flags, copy=copy, track=track) + return (md['msg'], jpg_buffer) + + +class SerializingContext(zmq.Context): + _socket_class = SerializingSocket