feat: abstract base classes for endpoints

Introduces EndpointBase and ReceiverBase abstract base classes. Implements a ReceiverBase with the MainReceiver.

ref: N25B-168
This commit is contained in:
Twirre Meulenbelt
2025-10-09 16:04:18 +02:00
parent c4530f0c3a
commit 23805812d5
6 changed files with 201 additions and 42 deletions

View File

@@ -0,0 +1,38 @@
from abc import ABCMeta
class EndpointBase(object):
__metaclass__ = ABCMeta
name = None
socket = None
def __init__(self, name):
"""
:param name: The name of the endpoint.
:type name: str
"""
self.name = name
self.socket = None
def create_socket(self, zmq_context, socket_type, port):
"""
Create a ZeroMQ socket.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param socket_type: The type of socket to create. Use zmq constants, e.g. zmq.SUB or zmq.REP.
:type socket_type: int
:param port: The port to use.
:type port: int
"""
self.socket = zmq_context.socket(socket_type)
self.socket.connect("tcp://localhost:{}".format(port))
def close(self):
"""Close the ZeroMQ socket."""
if not self.socket: return
self.socket.close()
self.socket = None

View File

@@ -0,0 +1,48 @@
import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
class MainReceiver(ReceiverBase):
def __init__(self, zmq_context, port=5555):
"""
The main receiver endpoint, responsible for handling ping and negotiation requests.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param port: The port to use.
:type port: int
"""
super(MainReceiver, self).__init__("main")
self.create_socket(zmq_context, zmq.REP, port)
@staticmethod
def _handle_ping(message):
"""A simple ping endpoint. Returns the provided data."""
return {"endpoint": "ping", "data": message.get("data")}
@staticmethod
def _handle_negotiation(message):
"""
Handle a negotiation request. Will respond with ports that can be used to connect to the robot.
:param message: The negotiation request message.
:type message: dict
:return: A response dictionary with a 'ports' key containing a list of ports and their function.
:rtype: dict[str, list[dict]]
"""
# TODO: .../error on all endpoints?
return {"endpoint": "negotiation/error", "data": "The requested endpoint is not implemented."}
def handle_message(self, message):
if "endpoint" not in message:
return {"endpoint": "error", "data": "No endpoint provided."}
if message["endpoint"] == "ping":
return self._handle_ping(message)
elif message["endpoint"] == "negotiation":
return self._handle_negotiation(message)
return {"endpoint": "error", "data": "The requested endpoint is not supported."}

View File

@@ -0,0 +1,21 @@
from abc import ABCMeta, abstractmethod
from robot_interface.endpoints.endpoint_base import EndpointBase
class ReceiverBase(EndpointBase, object):
"""Associated with a ZeroMQ socket."""
__metaclass__ = ABCMeta
@abstractmethod
def handle_message(self, message):
"""
Handle a message with the receiver.
:param message: The message to handle.
:type message: dict
:return: A response message.
:rtype: dict
"""
return {"endpoint": "error", "data": "The requested receiver is not implemented."}