diff --git a/src/robot_interface/endpoints/actuation_receiver.py b/src/robot_interface/endpoints/actuation_receiver.py index c7bfe91..0eb9077 100644 --- a/src/robot_interface/endpoints/actuation_receiver.py +++ b/src/robot_interface/endpoints/actuation_receiver.py @@ -28,20 +28,22 @@ class ActuationReceiver(ReceiverBase): logging.warn("Received message to speak, but it lacks data.") return + if not isinstance(text, (str, unicode)): + logging.warn("Received message to speak but it is not a string.") + return + logging.debug("Received message to speak: {}".format(text)) if not state.qi_session: return + # If state has a qi_session, we know that we can import qi + import qi # Takes a while only the first time it's imported if not self._tts_service: self._tts_service = state.qi_session.service("ALTextToSpeech") - self._tts_service.say(text) + # Returns instantly. Messages received while speaking will be queued. + qi.async(self._tts_service.say, text) def handle_message(self, message): - if "endpoint" not in message: - return {"endpoint": "error", "data": "No endpoint provided."} - if message["endpoint"] == "actuate/speech": self._handle_speech(message) - - return {"endpoint": "error", "data": "The requested endpoint is not supported."} diff --git a/src/robot_interface/endpoints/main_receiver.py b/src/robot_interface/endpoints/main_receiver.py index 4589aed..0ce9711 100644 --- a/src/robot_interface/endpoints/main_receiver.py +++ b/src/robot_interface/endpoints/main_receiver.py @@ -48,9 +48,6 @@ class MainReceiver(ReceiverBase): return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."} def handle_message(self, message): - if "endpoint" not in message: - return {"endpoint": "error", "data": "No endpoint provided."} - if message["endpoint"] == "ping": return self._handle_ping(message) elif message["endpoint"].startswith("negotiate"): diff --git a/src/robot_interface/main.py b/src/robot_interface/main.py index 8176740..934dfd3 100644 --- a/src/robot_interface/main.py +++ b/src/robot_interface/main.py @@ -1,6 +1,5 @@ import logging logging.basicConfig(level=logging.DEBUG) -import time import zmq @@ -8,6 +7,7 @@ from robot_interface.endpoints.actuation_receiver import ActuationReceiver from robot_interface.endpoints.main_receiver import MainReceiver from robot_interface.endpoints.video_sender import VideoSender from robot_interface.state import state +from robot_interface.utils.timeblock import TimeBlock def main_loop(context): @@ -44,17 +44,21 @@ def main_loop(context): for receiver in receivers: if receiver.socket not in socks: continue - start_time = time.time() - message = receiver.socket.recv_json() - response = receiver.handle_message(message) + if not isinstance(message, dict) or "endpoint" not in message or "data" not in message: + logging.error("Received message of unexpected format: {}".format(message)) + continue + + def overtime_callback(time_ms): + logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.", + message["endpoint"], time_ms) + + with TimeBlock(overtime_callback, 50): + response = receiver.handle_message(message) + if receiver.socket.getsockopt(zmq.TYPE) == zmq.REP: receiver.socket.send_json(response) - time_spent_ms = (time.time() - start_time) * 1000 - if time_spent_ms > 50: - logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.", receiver.name, time_spent_ms) - def main(): context = zmq.Context() diff --git a/src/robot_interface/utils/timeblock.py b/src/robot_interface/utils/timeblock.py new file mode 100644 index 0000000..23f1c85 --- /dev/null +++ b/src/robot_interface/utils/timeblock.py @@ -0,0 +1,31 @@ +import time + + +class TimeBlock(object): + """ + A context manager that times the execution of the block it contains. If execution exceeds the + limit, or if no limit is given, the callback will be called with the time that the block took. + """ + def __init__(self, callback, limit_ms=None): + """ + :param callback: The callback function that is called when the block of code is over, + unless the code block did not exceed the time limit. + :type callback: Callable[[float], None] + + :param limit_ms: The number of milliseconds the block of code is allowed to take. If it + exceeds this time, or if it's None, the callback function will be called with the time the + block took. + :type limit_ms: int | None + """ + self.limit_ms = float(limit_ms) if limit_ms is not None else None + self.callback = callback + self.start = None + + def __enter__(self): + self.start = time.time() + return self + + def __exit__(self, exc_type, exc_value, traceback): + elapsed = (time.time() - self.start) * 1000.0 # ms + if self.limit_ms is None or elapsed > self.limit_ms: + self.callback(elapsed) diff --git a/test/unit/test_time_block.py b/test/unit/test_time_block.py new file mode 100644 index 0000000..b6cabbc --- /dev/null +++ b/test/unit/test_time_block.py @@ -0,0 +1,40 @@ +import unittest + +import mock + +from robot_interface.utils.timeblock import TimeBlock + + +class AnyFloat(object): + def __eq__(self, other): + return isinstance(other, float) + + +class TestTimeBlock(unittest.TestCase): + def test_no_limit(self): + callback = mock.Mock() + + with TimeBlock(callback): + pass + + callback.assert_called_once_with(AnyFloat()) + + def test_exceed_limit(self): + callback = mock.Mock() + + with TimeBlock(callback, 0): + pass + + callback.assert_called_once_with(AnyFloat()) + + def test_within_limit(self): + callback = mock.Mock() + + with TimeBlock(callback, 5): + pass + + callback.assert_not_called() + + +if __name__ == '__main__': + unittest.main()