feat: adapt actuation receiver to state's qi_session

Makes actuation tests pass. In main, the timing of the socket no longer contains the time to receive and send data, but only the processing time of the message handler.

ref: N25B-168
This commit is contained in:
Twirre Meulenbelt
2025-10-16 21:46:46 +02:00
parent 56c804b7eb
commit 4c3aa3a911
5 changed files with 91 additions and 17 deletions

View File

@@ -28,20 +28,22 @@ class ActuationReceiver(ReceiverBase):
logging.warn("Received message to speak, but it lacks data.") logging.warn("Received message to speak, but it lacks data.")
return return
if not isinstance(text, (str, unicode)):
logging.warn("Received message to speak but it is not a string.")
return
logging.debug("Received message to speak: {}".format(text)) logging.debug("Received message to speak: {}".format(text))
if not state.qi_session: return if not state.qi_session: return
# If state has a qi_session, we know that we can import qi
import qi # Takes a while only the first time it's imported
if not self._tts_service: if not self._tts_service:
self._tts_service = state.qi_session.service("ALTextToSpeech") self._tts_service = state.qi_session.service("ALTextToSpeech")
self._tts_service.say(text) # Returns instantly. Messages received while speaking will be queued.
qi.async(self._tts_service.say, text)
def handle_message(self, message): def handle_message(self, message):
if "endpoint" not in message:
return {"endpoint": "error", "data": "No endpoint provided."}
if message["endpoint"] == "actuate/speech": if message["endpoint"] == "actuate/speech":
self._handle_speech(message) self._handle_speech(message)
return {"endpoint": "error", "data": "The requested endpoint is not supported."}

View File

@@ -48,9 +48,6 @@ class MainReceiver(ReceiverBase):
return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."} return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."}
def handle_message(self, message): def handle_message(self, message):
if "endpoint" not in message:
return {"endpoint": "error", "data": "No endpoint provided."}
if message["endpoint"] == "ping": if message["endpoint"] == "ping":
return self._handle_ping(message) return self._handle_ping(message)
elif message["endpoint"].startswith("negotiate"): elif message["endpoint"].startswith("negotiate"):

View File

@@ -1,6 +1,5 @@
import logging import logging
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
import time
import zmq import zmq
@@ -8,6 +7,7 @@ from robot_interface.endpoints.actuation_receiver import ActuationReceiver
from robot_interface.endpoints.main_receiver import MainReceiver from robot_interface.endpoints.main_receiver import MainReceiver
from robot_interface.endpoints.video_sender import VideoSender from robot_interface.endpoints.video_sender import VideoSender
from robot_interface.state import state from robot_interface.state import state
from robot_interface.utils.timeblock import TimeBlock
def main_loop(context): def main_loop(context):
@@ -44,17 +44,21 @@ def main_loop(context):
for receiver in receivers: for receiver in receivers:
if receiver.socket not in socks: continue if receiver.socket not in socks: continue
start_time = time.time()
message = receiver.socket.recv_json() message = receiver.socket.recv_json()
response = receiver.handle_message(message) if not isinstance(message, dict) or "endpoint" not in message or "data" not in message:
logging.error("Received message of unexpected format: {}".format(message))
continue
def overtime_callback(time_ms):
logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.",
message["endpoint"], time_ms)
with TimeBlock(overtime_callback, 50):
response = receiver.handle_message(message)
if receiver.socket.getsockopt(zmq.TYPE) == zmq.REP: if receiver.socket.getsockopt(zmq.TYPE) == zmq.REP:
receiver.socket.send_json(response) receiver.socket.send_json(response)
time_spent_ms = (time.time() - start_time) * 1000
if time_spent_ms > 50:
logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.", receiver.name, time_spent_ms)
def main(): def main():
context = zmq.Context() context = zmq.Context()

View File

@@ -0,0 +1,31 @@
import time
class TimeBlock(object):
"""
A context manager that times the execution of the block it contains. If execution exceeds the
limit, or if no limit is given, the callback will be called with the time that the block took.
"""
def __init__(self, callback, limit_ms=None):
"""
:param callback: The callback function that is called when the block of code is over,
unless the code block did not exceed the time limit.
:type callback: Callable[[float], None]
:param limit_ms: The number of milliseconds the block of code is allowed to take. If it
exceeds this time, or if it's None, the callback function will be called with the time the
block took.
:type limit_ms: int | None
"""
self.limit_ms = float(limit_ms) if limit_ms is not None else None
self.callback = callback
self.start = None
def __enter__(self):
self.start = time.time()
return self
def __exit__(self, exc_type, exc_value, traceback):
elapsed = (time.time() - self.start) * 1000.0 # ms
if self.limit_ms is None or elapsed > self.limit_ms:
self.callback(elapsed)

View File

@@ -0,0 +1,40 @@
import unittest
import mock
from robot_interface.utils.timeblock import TimeBlock
class AnyFloat(object):
def __eq__(self, other):
return isinstance(other, float)
class TestTimeBlock(unittest.TestCase):
def test_no_limit(self):
callback = mock.Mock()
with TimeBlock(callback):
pass
callback.assert_called_once_with(AnyFloat())
def test_exceed_limit(self):
callback = mock.Mock()
with TimeBlock(callback, 0):
pass
callback.assert_called_once_with(AnyFloat())
def test_within_limit(self):
callback = mock.Mock()
with TimeBlock(callback, 5):
pass
callback.assert_not_called()
if __name__ == '__main__':
unittest.main()