diff --git a/src/robot_interface/endpoints/__init__.py b/src/robot_interface/endpoints/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/robot_interface/endpoints/endpoint_base.py b/src/robot_interface/endpoints/endpoint_base.py new file mode 100644 index 0000000..54ebf1c --- /dev/null +++ b/src/robot_interface/endpoints/endpoint_base.py @@ -0,0 +1,38 @@ +from abc import ABCMeta + + +class EndpointBase(object): + __metaclass__ = ABCMeta + + name = None + socket = None + + def __init__(self, name): + """ + :param name: The name of the endpoint. + :type name: str + """ + self.name = name + self.socket = None + + def create_socket(self, zmq_context, socket_type, port): + """ + Create a ZeroMQ socket. + + :param zmq_context: The ZeroMQ context to use. + :type zmq_context: zmq.Context + + :param socket_type: The type of socket to create. Use zmq constants, e.g. zmq.SUB or zmq.REP. + :type socket_type: int + + :param port: The port to use. + :type port: int + """ + self.socket = zmq_context.socket(socket_type) + self.socket.connect("tcp://localhost:{}".format(port)) + + def close(self): + """Close the ZeroMQ socket.""" + if not self.socket: return + self.socket.close() + self.socket = None diff --git a/src/robot_interface/endpoints/main_receiver.py b/src/robot_interface/endpoints/main_receiver.py new file mode 100644 index 0000000..4ad7846 --- /dev/null +++ b/src/robot_interface/endpoints/main_receiver.py @@ -0,0 +1,48 @@ +import zmq + +from robot_interface.endpoints.receiver_base import ReceiverBase + + +class MainReceiver(ReceiverBase): + def __init__(self, zmq_context, port=5555): + """ + The main receiver endpoint, responsible for handling ping and negotiation requests. + + :param zmq_context: The ZeroMQ context to use. + :type zmq_context: zmq.Context + + :param port: The port to use. + :type port: int + """ + super(MainReceiver, self).__init__("main") + self.create_socket(zmq_context, zmq.REP, port) + + @staticmethod + def _handle_ping(message): + """A simple ping endpoint. Returns the provided data.""" + return {"endpoint": "ping", "data": message.get("data")} + + @staticmethod + def _handle_negotiation(message): + """ + Handle a negotiation request. Will respond with ports that can be used to connect to the robot. + + :param message: The negotiation request message. + :type message: dict + + :return: A response dictionary with a 'ports' key containing a list of ports and their function. + :rtype: dict[str, list[dict]] + """ + # TODO: .../error on all endpoints? + return {"endpoint": "negotiation/error", "data": "The requested endpoint is not implemented."} + + def handle_message(self, message): + if "endpoint" not in message: + return {"endpoint": "error", "data": "No endpoint provided."} + + if message["endpoint"] == "ping": + return self._handle_ping(message) + elif message["endpoint"] == "negotiation": + return self._handle_negotiation(message) + + return {"endpoint": "error", "data": "The requested endpoint is not supported."} diff --git a/src/robot_interface/endpoints/receiver_base.py b/src/robot_interface/endpoints/receiver_base.py new file mode 100644 index 0000000..35f933e --- /dev/null +++ b/src/robot_interface/endpoints/receiver_base.py @@ -0,0 +1,21 @@ +from abc import ABCMeta, abstractmethod + +from robot_interface.endpoints.endpoint_base import EndpointBase + + +class ReceiverBase(EndpointBase, object): + """Associated with a ZeroMQ socket.""" + __metaclass__ = ABCMeta + + @abstractmethod + def handle_message(self, message): + """ + Handle a message with the receiver. + + :param message: The message to handle. + :type message: dict + + :return: A response message. + :rtype: dict + """ + return {"endpoint": "error", "data": "The requested receiver is not implemented."} diff --git a/src/robot_interface/main.py b/src/robot_interface/main.py index 9ea734f..11cb761 100644 --- a/src/robot_interface/main.py +++ b/src/robot_interface/main.py @@ -1,65 +1,59 @@ +import logging +import time + import zmq - -def handle_ping(message): - """A simple ping endpoint. Returns the provided data.""" - return {"endpoint": "ping", "data": message.get("data")} +from robot_interface.endpoints.main_receiver import MainReceiver +from robot_interface.state import state -def handle_negotiation(message): +def main_loop(context): """ - Handle a negotiation request. Will respond with ports that can be used to connect to the robot. + Run the main loop, handling all incoming requests like pings, negotiation, actuation, etc. - :param message: The negotiation request message. - :type message: dict - - :return: A response dictionary with a 'ports' key containing a list of ports and their function. - :rtype: dict[str, list[dict]] + :param context: The ZeroMQ context to use. + :type context: zmq.Context """ - # TODO: .../error on all endpoints? - return {"endpoint": "negotiation/error", "data": "The requested endpoint is not implemented."} + # When creating endpoints, remember to add them to the endpoint list of the state to ensure they're deinitialized + main_receiver = MainReceiver(context) + state.endpoints.append(main_receiver) + # Define endpoints that can run on the main thread. These endpoints should not block for long (say 50 ms at most). + receivers = [main_receiver] -def route_request(message): - """ - Handle a request message. + poller = zmq.Poller() + for receiver in receivers: + poller.register(receiver.socket, zmq.POLLIN) - :param message: The request message. - :type message: dict - - :return: A response message. - :rtype: dict - """ - print("Received request: {}".format(message)) - - if "endpoint" not in message: - return {"endpoint": "error", "data": "No endpoint provided."} - if message["endpoint"] == "ping": - return handle_ping(message) - elif message["endpoint"] == "negotiation": - return handle_negotiation(message) - - return {"endpoint": "error", "data": "The requested endpoint is not implemented."} - - -def main_loop(socket): while True: - request = socket.recv_json() - response = route_request(request) - socket.send_json(response) + if state.exit_event.is_set(): break + socks = dict(poller.poll(100)) + + for receiver in receivers: + if receiver.socket not in socks: continue + + start_time = time.time() + + message = receiver.socket.recv_json() + response = receiver.handle_message(message) + receiver.socket.send_json(response) + + time_spent_ms = (time.time() - start_time) * 1000 + if time_spent_ms > 50: + logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.", receiver.name, time_spent_ms) def main(): context = zmq.Context() - socket = context.socket(zmq.REP) - socket.connect("tcp://localhost:5555") + + state.initialize() try: - main_loop(socket) + main_loop(context) except KeyboardInterrupt: print("User interrupted.") finally: - socket.close() + state.deinitialize() context.term() diff --git a/src/robot_interface/state.py b/src/robot_interface/state.py new file mode 100644 index 0000000..30d6a0f --- /dev/null +++ b/src/robot_interface/state.py @@ -0,0 +1,58 @@ +import logging +import signal +import threading + + +class State(object): + """ + Do not create an instance of this class directly: use the instance `state` below. This state must be initiated once, + probably when your program starts. + + This class is used to share state between threads. For example, when the program is quit, that all threads can + detect this via the `exit_event` property being set. + """ + def __init__(self): + self.is_initialized = False + self.exit_event = None + self.endpoints = [] # type: List[EndpointBase] + + def initialize(self): + if self.is_initialized: + logging.warn("Already initialized") + return + + self.exit_event = threading.Event() + def handle_exit(_, __): + logging.info("Exiting.") + self.exit_event.set() + signal.signal(signal.SIGINT, handle_exit) + signal.signal(signal.SIGTERM, handle_exit) + + self.is_initialized = True + + def deinitialize(self): + if not self.is_initialized: return + + for endpoint in self.endpoints: + endpoint.close() + + self.is_initialized = False + + def __getattribute__(self, name): + # Enforce that the state is initialized before accessing any property (aside from the basic ones) + if name in ("initialize", "deinitialize", "is_initialized", "__dict__", "__class__"): + return object.__getattribute__(self, name) + + if not object.__getattribute__(self, "is_initialized"): + # Special case for the exit_event: if the event is set, return it without an error + if name == "exit_event": + exit_event = object.__getattribute__(self, "exit_event") + if exit_event and exit_event.is_set(): return exit_event + + raise RuntimeError("State must be initialized before accessing '%s'" % name) + + return object.__getattribute__(self, name) + + +# Must call `.initialize` before use +state = State()