Files
pepperplus-ri/src/robot_interface/endpoints/main_receiver.py
Twirre Meulenbelt 3a259c1170 feat: add environment variables and docs
ref: N25B-352
2025-12-10 13:28:13 +01:00

93 lines
3.1 KiB
Python

import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state
from robot_interface.core.config import settings
class MainReceiver(ReceiverBase):
    """
    The main receiver endpoint, responsible for handling ping and negotiation requests.

    Requests are dictionaries carrying an ``"endpoint"`` entry; every request is answered
    with a response dictionary of the form ``{"endpoint": ..., "data": ...}``. Requests
    without a usable ``"endpoint"`` entry are answered with an error response.

    :param zmq_context: The ZeroMQ context to use.
    :type zmq_context: zmq.Context

    :param port: The port to use, defaults to value in `settings.agent_settings.main_receiver_port`.
    :type port: int
    """

    def __init__(self, zmq_context, port=None):
        if port is None:
            port = settings.agent_settings.main_receiver_port
        super(MainReceiver, self).__init__("main")
        # NOTE(review): ``bind=False`` presumably makes the REP socket connect to the
        # remote side rather than bind locally -- confirm in ``ReceiverBase.create_socket``.
        self.create_socket(zmq_context, zmq.REP, port, bind=False)

    @staticmethod
    def _get_endpoint(message):
        """
        Read the requested endpoint from a message without raising.

        :param message: The received message.
        :type message: dict

        :return: The value stored under ``"endpoint"``, or ``None`` when the message has no
            such entry or cannot be indexed by key at all.
        """
        try:
            return message["endpoint"]
        except (KeyError, TypeError):
            # KeyError: the entry is missing; TypeError: the message is not a mapping
            # (e.g. ``None`` or a list).
            return None

    @staticmethod
    def _is_negotiation_endpoint(endpoint):
        """
        Tell whether an endpoint value addresses the negotiation handlers.

        :param endpoint: The endpoint value taken from a message, possibly not a string.

        :return: ``True`` when the endpoint is a string starting with ``"negotiate"``.
        :rtype: bool
        """
        try:
            return endpoint.startswith("negotiate")
        except (AttributeError, TypeError):
            # Non-string endpoints (``None``, numbers, ...) never match a negotiation route.
            return False

    @staticmethod
    def _handle_ping(message):
        """
        Handle a ping request.

        Returns the provided data in a standardized response dictionary.

        :param message: The ping request message.
        :type message: dict

        :return: A response dictionary containing the original data (``None`` when the
            request carried no ``"data"`` entry).
        :rtype: dict[str, str | list[dict]]
        """
        return {"endpoint": "ping", "data": message.get("data")}

    @staticmethod
    def _handle_port_negotiation(message):
        """
        Handle a port negotiation request.

        Returns the endpoint description of every socket registered in ``state.sockets``.

        :param message: The negotiation request message (its contents are not inspected).
        :type message: dict

        :return: A response dictionary with endpoint descriptions as data.
        :rtype: dict[str, list[dict]]
        """
        endpoints = [socket.endpoint_description() for socket in state.sockets]
        return {"endpoint": "negotiate/ports", "data": endpoints}

    @staticmethod
    def _handle_negotiation(message):
        """
        Handle a negotiation request. Responds with ports that can be used to connect to the robot.

        Only ``"negotiate/ports"`` is implemented; any other (or missing) endpoint yields a
        ``"negotiate/error"`` response.

        :param message: The negotiation request message.
        :type message: dict

        :return: A response dictionary with the negotiation result.
        :rtype: dict[str, str | list[dict]]
        """
        # In the future, the sender could send information like the robot's IP address, etc.
        if MainReceiver._get_endpoint(message) == "negotiate/ports":
            return MainReceiver._handle_port_negotiation(message)

        return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."}

    def handle_message(self, message):
        """
        Main entry point for handling incoming messages.

        Dispatches messages to the appropriate handler based on the endpoint. Malformed
        requests (no ``"endpoint"`` entry, or an endpoint that is not a string) do not raise;
        they receive the generic error response, so this method always produces a reply.

        :param message: The received message.
        :type message: dict

        :return: A response dictionary based on the requested endpoint.
        :rtype: dict[str, str | list[dict]]
        """
        endpoint = self._get_endpoint(message)

        if endpoint == "ping":
            return self._handle_ping(message)
        if self._is_negotiation_endpoint(endpoint):
            return self._handle_negotiation(message)

        return {"endpoint": "error", "data": "The requested endpoint is not supported."}