From 4c3aa3a91102de83a866eb35a407a8dad64ee272 Mon Sep 17 00:00:00 2001 From: Twirre Meulenbelt <43213592+TwirreM@users.noreply.github.com> Date: Thu, 16 Oct 2025 21:46:46 +0200 Subject: [PATCH] feat: adapt actuation receiver to state's qi_session Makes actuation tests pass. In main, the timing of the socket no longer contains the time to receive and send data, but only the processing time of the message handler. ref: N25B-168 --- .../endpoints/actuation_receiver.py | 14 ++++--- .../endpoints/main_receiver.py | 3 -- src/robot_interface/main.py | 20 ++++++---- src/robot_interface/utils/timeblock.py | 31 ++++++++++++++ test/unit/test_time_block.py | 40 +++++++++++++++++++ 5 files changed, 91 insertions(+), 17 deletions(-) create mode 100644 src/robot_interface/utils/timeblock.py create mode 100644 test/unit/test_time_block.py diff --git a/src/robot_interface/endpoints/actuation_receiver.py b/src/robot_interface/endpoints/actuation_receiver.py index c7bfe91..0eb9077 100644 --- a/src/robot_interface/endpoints/actuation_receiver.py +++ b/src/robot_interface/endpoints/actuation_receiver.py @@ -28,20 +28,22 @@ class ActuationReceiver(ReceiverBase): logging.warn("Received message to speak, but it lacks data.") return + if not isinstance(text, (str, unicode)): + logging.warn("Received message to speak but it is not a string.") + return + logging.debug("Received message to speak: {}".format(text)) if not state.qi_session: return + # If state has a qi_session, we know that we can import qi + import qi # Takes a while only the first time it's imported if not self._tts_service: self._tts_service = state.qi_session.service("ALTextToSpeech") - self._tts_service.say(text) + # Returns instantly. Messages received while speaking will be queued. + qi.async(self._tts_service.say, text) def handle_message(self, message): - if "endpoint" not in message: - return {"endpoint": "error", "data": "No endpoint provided."} - if message["endpoint"] == "actuate/speech": self._handle_speech(message) - - return {"endpoint": "error", "data": "The requested endpoint is not supported."} diff --git a/src/robot_interface/endpoints/main_receiver.py b/src/robot_interface/endpoints/main_receiver.py index 4589aed..0ce9711 100644 --- a/src/robot_interface/endpoints/main_receiver.py +++ b/src/robot_interface/endpoints/main_receiver.py @@ -48,9 +48,6 @@ class MainReceiver(ReceiverBase): return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."} def handle_message(self, message): - if "endpoint" not in message: - return {"endpoint": "error", "data": "No endpoint provided."} - if message["endpoint"] == "ping": return self._handle_ping(message) elif message["endpoint"].startswith("negotiate"): diff --git a/src/robot_interface/main.py b/src/robot_interface/main.py index 8176740..934dfd3 100644 --- a/src/robot_interface/main.py +++ b/src/robot_interface/main.py @@ -1,6 +1,5 @@ import logging logging.basicConfig(level=logging.DEBUG) -import time import zmq @@ -8,6 +7,7 @@ from robot_interface.endpoints.actuation_receiver import ActuationReceiver from robot_interface.endpoints.main_receiver import MainReceiver from robot_interface.endpoints.video_sender import VideoSender from robot_interface.state import state +from robot_interface.utils.timeblock import TimeBlock def main_loop(context): @@ -44,17 +44,21 @@ def main_loop(context): for receiver in receivers: if receiver.socket not in socks: continue - start_time = time.time() - message = receiver.socket.recv_json() - response = receiver.handle_message(message) + if not isinstance(message, dict) or "endpoint" not in message or "data" not in message: + logging.error("Received message of unexpected format: {}".format(message)) + continue + + def overtime_callback(time_ms): + logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.", + message["endpoint"], time_ms) + + with TimeBlock(overtime_callback, 50): + response = receiver.handle_message(message) + if receiver.socket.getsockopt(zmq.TYPE) == zmq.REP: receiver.socket.send_json(response) - time_spent_ms = (time.time() - start_time) * 1000 - if time_spent_ms > 50: - logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.", receiver.name, time_spent_ms) - def main(): context = zmq.Context() diff --git a/src/robot_interface/utils/timeblock.py b/src/robot_interface/utils/timeblock.py new file mode 100644 index 0000000..23f1c85 --- /dev/null +++ b/src/robot_interface/utils/timeblock.py @@ -0,0 +1,31 @@ +import time + + +class TimeBlock(object): + """ + A context manager that times the execution of the block it contains. If execution exceeds the + limit, or if no limit is given, the callback will be called with the time that the block took. + """ + def __init__(self, callback, limit_ms=None): + """ + :param callback: The callback function that is called when the block of code is over, + unless the code block did not exceed the time limit. + :type callback: Callable[[float], None] + + :param limit_ms: The number of milliseconds the block of code is allowed to take. If it + exceeds this time, or if it's None, the callback function will be called with the time the + block took. + :type limit_ms: int | None + """ + self.limit_ms = float(limit_ms) if limit_ms is not None else None + self.callback = callback + self.start = None + + def __enter__(self): + self.start = time.time() + return self + + def __exit__(self, exc_type, exc_value, traceback): + elapsed = (time.time() - self.start) * 1000.0 # ms + if self.limit_ms is None or elapsed > self.limit_ms: + self.callback(elapsed) diff --git a/test/unit/test_time_block.py b/test/unit/test_time_block.py new file mode 100644 index 0000000..b6cabbc --- /dev/null +++ b/test/unit/test_time_block.py @@ -0,0 +1,40 @@ +import unittest + +import mock + +from robot_interface.utils.timeblock import TimeBlock + + +class AnyFloat(object): + def __eq__(self, other): + return isinstance(other, float) + + +class TestTimeBlock(unittest.TestCase): + def test_no_limit(self): + callback = mock.Mock() + + with TimeBlock(callback): + pass + + callback.assert_called_once_with(AnyFloat()) + + def test_exceed_limit(self): + callback = mock.Mock() + + with TimeBlock(callback, 0): + pass + + callback.assert_called_once_with(AnyFloat()) + + def test_within_limit(self): + callback = mock.Mock() + + with TimeBlock(callback, 5): + pass + + callback.assert_not_called() + + +if __name__ == '__main__': + unittest.main()