import logging from robot_interface.endpoints.audio_sender import AudioSender logging.basicConfig(level=logging.DEBUG) import zmq from robot_interface.endpoints.actuation_receiver import ActuationReceiver from robot_interface.endpoints.main_receiver import MainReceiver from robot_interface.endpoints.video_sender import VideoSender from robot_interface.state import state from robot_interface.core.config import settings from robot_interface.utils.timeblock import TimeBlock def main_loop(context): """ Run the main loop, handling all incoming requests like pings, negotiation, actuation, etc. :param context: The ZeroMQ context to use. :type context: zmq.Context """ # When creating sockets, remember to add them to the `sockets` list of the state to ensure they're deinitialized main_receiver = MainReceiver(context) state.sockets.append(main_receiver) actuation_receiver = ActuationReceiver(context) state.sockets.append(actuation_receiver) video_sender = VideoSender(context) state.sockets.append(video_sender) audio_sender = AudioSender(context) state.sockets.append(audio_sender) video_sender.start_video_rcv() audio_sender.start() # Sockets that can run on the main thread. These sockets' endpoints should not block for long (say 50 ms at most). receivers = [main_receiver, actuation_receiver] poller = zmq.Poller() for receiver in receivers: poller.register(receiver.socket, zmq.POLLIN) logging.debug("Starting main loop.") while True: if state.exit_event.is_set(): break socks = dict(poller.poll(settings.main_config.poll_timeout_ms)) for receiver in receivers: if receiver.socket not in socks: continue message = receiver.socket.recv_json() if not isinstance(message, dict) or "endpoint" not in message or "data" not in message: logging.error("Received message of unexpected format: {}".format(message)) continue def overtime_callback(time_ms): """ A callback function executed by TimeBlock if the message handling exceeds the allowed time limit. :param time_ms: The elapsed time, in milliseconds, that the block took. :type time_ms: float """ logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.", message["endpoint"], time_ms) with TimeBlock(overtime_callback, settings.main_config.max_handler_time_ms): response = receiver.handle_message(message) if receiver.socket.getsockopt(zmq.TYPE) == zmq.REP: receiver.socket.send_json(response) def main(): """ Initializes the ZeroMQ context and the application state. It executes the main event loop (`main_loop`) and ensures that both the application state and the ZeroMQ context are properly cleaned up (deinitialized/terminated) upon exit, including handling a KeyboardInterrupt. """ context = zmq.Context() state.initialize() try: main_loop(context) except KeyboardInterrupt: logging.info("User interrupted.") finally: state.deinitialize() context.term() if __name__ == "__main__": main()