import zmq

from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state
from robot_interface.core.config import settings
# NOTE(review): FaceDetectionSender is not referenced in this module; kept in case the import is
# relied on elsewhere (side effects or re-export) -- confirm before removing.
from robot_interface.endpoints.face_detector import FaceDetectionSender


class MainReceiver(ReceiverBase):
    """
    The main receiver endpoint, responsible for handling ping and negotiation requests.

    :param zmq_context: The ZeroMQ context to use.
    :type zmq_context: zmq.Context

    :param port: The port to use, defaults to value in `settings.agent_settings.main_receiver_port`.
    :type port: int
    """

    def __init__(self, zmq_context, port=None):
        if port is None:
            port = settings.agent_settings.main_receiver_port
        super(MainReceiver, self).__init__("main")
        # A REP socket is requested with ``bind=False``; socket creation itself is delegated to
        # ``create_socket``, which is not defined in this class.
        self.create_socket(zmq_context, zmq.REP, port, bind=False)

    @staticmethod
    def _requested_endpoint(message):
        """
        Read the endpoint name from a message without raising.

        :param message: The received message.
        :type message: dict

        :return: The value stored under ``"endpoint"``, or ``None`` when the key is missing or the
            message does not support key lookup.
        """
        try:
            return message["endpoint"]
        except (KeyError, TypeError):
            # KeyError: no "endpoint" key; TypeError: message is not subscriptable by string.
            return None

    @staticmethod
    def _handle_ping(message):
        """
        Handle a ping request.

        Returns the provided data in a standardized response dictionary.

        :param message: The ping request message.
        :type message: dict

        :return: A response dictionary containing the original data.
        :rtype: dict[str, str | list[dict]]
        """
        return {"endpoint": "ping", "data": message.get("data")}

    @staticmethod
    def _handle_port_negotiation(message):
        """
        Handle a port negotiation request.

        Returns the endpoint description of every socket in ``state.sockets``.
        The request message itself is not inspected.

        :param message: The negotiation request message.
        :type message: dict

        :return: A response dictionary with endpoint descriptions as data.
        :rtype: dict[str, list[dict]]
        """
        descriptions = [known_socket.endpoint_description() for known_socket in state.sockets]
        return {"endpoint": "negotiate/ports", "data": descriptions}

    @staticmethod
    def _handle_negotiation(message):
        """
        Handle a negotiation request.

        Responds with ports that can be used to connect to the robot. Only the
        ``negotiate/ports`` endpoint is implemented; any other request (including one
        without an endpoint) receives a ``negotiate/error`` response.

        :param message: The negotiation request message.
        :type message: dict

        :return: A response dictionary with the negotiation result.
        :rtype: dict[str, str | list[dict]]
        """
        # In the future, the sender could send information like the robot's IP address, etc.
        if MainReceiver._requested_endpoint(message) == "negotiate/ports":
            return MainReceiver._handle_port_negotiation(message)

        return {
            "endpoint": "negotiate/error",
            "data": "The requested endpoint is not implemented.",
        }

    def handle_message(self, message):
        """
        Main entry point for handling incoming messages.

        Dispatches messages to the appropriate handler based on the endpoint. Messages
        without a usable endpoint are answered with the generic error response instead
        of raising.

        :param message: The received message.
        :type message: dict

        :return: A response dictionary based on the requested endpoint.
        :rtype: dict[str, str | list[dict]]
        """
        endpoint = self._requested_endpoint(message)

        if endpoint == "ping":
            return self._handle_ping(message)

        try:
            is_negotiation = endpoint.startswith("negotiate")
        except (AttributeError, TypeError):
            # A missing or non-text endpoint (None, a number, ...) cannot name a negotiation.
            is_negotiation = False

        if is_negotiation:
            return self._handle_negotiation(message)

        return {"endpoint": "error", "data": "The requested endpoint is not supported."}