Files
pepperplus-ri/src/robot_interface/endpoints/socket_base.py
Twirre Meulenbelt 3a259c1170 feat: add environment variables and docs
ref: N25B-352
2025-12-10 13:28:13 +01:00

85 lines
2.5 KiB
Python

from abc import ABCMeta
import zmq
from robot_interface.core.config import settings
class SocketBase(object):
    """
    Base class for endpoints associated with a ZeroMQ socket.

    :ivar identifier: The identifier of the endpoint.
    :vartype identifier: str

    :ivar port: The port used by the socket, set by `create_socket`.
    :vartype port: int | None

    :ivar socket: The ZeroMQ socket object, set by `create_socket`.
    :vartype socket: zmq.Socket | None

    :ivar bound: Whether the socket is bound or connected, set by `create_socket`.
    :vartype bound: bool | None
    """
    __metaclass__ = ABCMeta

    name = None
    socket = None

    def __init__(self, identifier):
        """
        Store the identifier; the socket itself is only created by `create_socket`.

        :param identifier: The identifier of the endpoint.
        :type identifier: str
        """
        self.identifier = identifier
        self.port = None  # Set later by `create_socket`
        self.socket = None  # Set later by `create_socket`
        self.bound = None  # Set later by `create_socket`

    def create_socket(self, zmq_context, socket_type, port, options=None, bind=True):
        """
        Create a ZeroMQ socket and either bind it or connect it.

        If this endpoint already holds a socket from an earlier call, that socket is closed
        first so it is not leaked and does not keep its port occupied.

        :param zmq_context: The ZeroMQ context to use.
        :type zmq_context: zmq.Context

        :param socket_type: The type of socket to create. Use zmq constants, e.g. zmq.SUB or zmq.REP.
        :type socket_type: int

        :param port: The port to use.
        :type port: int

        :param options: A list of tuples where the first element contains the option and the second
            the value. ``None`` (the default) means no options are set.
        :type options: list[tuple[int, int]] | None

        :param bind: Whether to bind the socket (on all interfaces) or connect it to the control
            backend host taken from the settings.
        :type bind: bool

        :raises: Whatever the socket's `setsockopt`, `bind` or `connect` raises. In that case
            `self.socket` still references the new socket, so `close` can release it.
        """
        # Close the socket object directly rather than via `self.close()`, so that a subclass
        # override of `close` is not triggered as a side effect of (re)creating the socket.
        if self.socket is not None:
            self.socket.close()
            self.socket = None

        self.port = port
        self.socket = zmq_context.socket(socket_type)

        # Options are applied before bind/connect, in the order given.
        for option, arg in options or ():
            self.socket.setsockopt(option, arg)

        self.bound = bind
        if bind:
            self.socket.bind("tcp://*:{}".format(port))
        else:
            host = settings.agent_settings.control_backend_host
            self.socket.connect("tcp://{}:{}".format(host, port))

    def close(self):
        """
        Close the ZeroMQ socket.

        Does nothing when no socket has been created or when it was already closed, so calling
        this more than once is safe.
        """
        # Identity check: do not depend on the truthiness of the socket object.
        if self.socket is None:
            return
        self.socket.close()
        self.socket = None

    def endpoint_description(self):
        """
        Description of the endpoint. Used for negotiation.

        Note that the ``bind`` value is the inverse of `self.bound`; before `create_socket` has
        been called (`self.bound` is ``None``) it is ``True``.
        NOTE(review): presumably ``bind`` states what the other party should do (bind when this
        side connects, and vice versa) -- confirm against the API specification linked below.

        :return: A dictionary with the following keys: id, port, bind. See API specification at:
            https://utrechtuniversity.youtrack.cloud/articles/N25B-A-14/RI-CB-Communication#negotiation
        :rtype: dict
        """
        return {
            "id": self.identifier,
            "port": self.port,
            "bind": not self.bound,
        }