import logging
import threading

import zmq

from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state
from robot_interface.core.config import settings


class VideoSender(SocketBase):
    """
    Video sender endpoint, responsible for sending video frames.

    The endpoint publishes frames on a ZeroMQ PUB socket created with the
    ``zmq.CONFLATE`` option set to 1.

    :param zmq_context: The ZeroMQ context to use.
    :type zmq_context: zmq.Context

    :param port: The port to use for sending video frames.
    :type port: int
    """

    def __init__(self, zmq_context, port=settings.agent_settings.video_sender_port):
        super(VideoSender, self).__init__("video")
        self.create_socket(zmq_context, zmq.PUB, port, [(zmq.CONFLATE, 1)])

    def start_video_rcv(self):
        """
        Subscribe to the robot camera and start the video loop on a separate thread.

        The subscription parameters (camera index, resolution, color space,
        fps and stream name) are read from ``settings.video_config``.
        Does nothing, apart from logging, when no Qi session is available.
        """
        if not state.qi_session:
            logging.info("No Qi session available. Not starting video loop.")
            return

        video = state.qi_session.service("ALVideoDevice")
        video_settings = settings.video_config

        vid_stream_name = video.subscribeCamera(
            video_settings.stream_name,
            video_settings.camera_index,
            video_settings.resolution,
            video_settings.color_space,
            video_settings.fps,
        )

        thread = threading.Thread(
            target=self.video_rcv_loop,
            args=(video, vid_stream_name),
        )
        thread.start()

    def video_rcv_loop(self, vid_service, vid_stream_name):
        """
        The main loop of retrieving video images from the robot.

        Runs until ``state.exit_event`` is set. Each iteration fetches an image
        container from the video service and sends the element at index
        ``settings.video_config.image_buffer`` over the socket. A failure in a
        single iteration is logged and the loop carries on. When the loop ends,
        the camera subscription is released on a best-effort basis.

        :param vid_service: The video service object that the active Qi session is connected to.
        :type vid_service: Object (Qi service object)

        :param vid_stream_name: The name of a camera subscription on the video service object vid_service
        :type vid_stream_name: str
        """
        try:
            while not state.exit_event.is_set():
                try:
                    img = vid_service.getImageRemote(vid_stream_name)
                    # Possibly limit images sent if queuing issues arise
                    self.socket.send(img[settings.video_config.image_buffer])
                except Exception:
                    # Catch Exception rather than everything so SystemExit and
                    # KeyboardInterrupt are not swallowed; keep the traceback so
                    # the cause of a failed frame is visible in the log.
                    logging.warning(
                        "Failed to retrieve video image from robot.",
                        exc_info=True,
                    )
        finally:
            # Release the camera subscription so repeated runs do not leave
            # stale subscribers behind. Best effort: the session may already
            # be gone at shutdown, so a failure here is only logged.
            try:
                vid_service.unsubscribe(vid_stream_name)
            except Exception:
                logging.warning(
                    "Failed to unsubscribe from video stream.",
                    exc_info=True,
                )