diff --git a/README.md b/README.md index eff576a..ae5e2b3 100644 --- a/README.md +++ b/README.md @@ -84,11 +84,42 @@ Then resume the steps from above. ## Usage +On Linux and macOS: + ```shell -cd src -python -m robot_interface.main +PYTHONPATH=src python -m robot_interface.main ``` +On Windows: + +```shell +$env:PYTHONPATH="src"; python -m robot_interface.main +``` + +With both, if you want to connect to the actual robot (or simulator), pass the `--qi-url` argument. + + + +## Testing + +To run the unit tests, on Linux and macOS: + +```shell +PYTHONPATH=src pytest test/ +``` + +On Windows: + +```shell +$env:PYTHONPATH="src"; pytest test/ +``` + +### Coverage + +For coverage, add `--cov=robot_interface` as an argument to `pytest`. + + + ## GitHooks To activate automatic commits/branch name checks run: diff --git a/requirements.txt b/requirements.txt index aee002a..f93c70d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,5 @@ pyzmq<16 pyaudio<=0.2.11 +pytest<5 +pytest-mock<3.0.0 +pytest-cov<3.0.0 diff --git a/src/robot_interface/endpoints/actuation_receiver.py b/src/robot_interface/endpoints/actuation_receiver.py new file mode 100644 index 0000000..7fe16b7 --- /dev/null +++ b/src/robot_interface/endpoints/actuation_receiver.py @@ -0,0 +1,49 @@ +import logging + +import zmq + +from robot_interface.endpoints.receiver_base import ReceiverBase +from robot_interface.state import state + + +class ActuationReceiver(ReceiverBase): + def __init__(self, zmq_context, port=5557): + """ + The actuation receiver endpoint, responsible for handling speech and gesture requests. + + :param zmq_context: The ZeroMQ context to use. + :type zmq_context: zmq.Context + + :param port: The port to use. + :type port: int + """ + super(ActuationReceiver, self).__init__("actuation") + self.create_socket(zmq_context, zmq.SUB, port) + self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options + self._tts_service = None + + def _handle_speech(self, message): + text = message.get("data") + if not text: + logging.warn("Received message to speak, but it lacks data.") + return + + if not isinstance(text, (str, unicode)): + logging.warn("Received message to speak but it is not a string.") + return + + logging.debug("Received message to speak: {}".format(text)) + + if not state.qi_session: return + # If state has a qi_session, we know that we can import qi + import qi # Takes a while only the first time it's imported + + if not self._tts_service: + self._tts_service = state.qi_session.service("ALTextToSpeech") + + # Returns instantly. Messages received while speaking will be queued. + qi.async(self._tts_service.say, text) + + def handle_message(self, message): + if message["endpoint"] == "actuate/speech": + self._handle_speech(message) diff --git a/src/robot_interface/endpoints/main_receiver.py b/src/robot_interface/endpoints/main_receiver.py index 4ad7846..0ce9711 100644 --- a/src/robot_interface/endpoints/main_receiver.py +++ b/src/robot_interface/endpoints/main_receiver.py @@ -1,6 +1,7 @@ import zmq from robot_interface.endpoints.receiver_base import ReceiverBase +from robot_interface.state import state class MainReceiver(ReceiverBase): @@ -15,13 +16,19 @@ class MainReceiver(ReceiverBase): :type port: int """ super(MainReceiver, self).__init__("main") - self.create_socket(zmq_context, zmq.REP, port) + self.create_socket(zmq_context, zmq.REP, port, bind=False) @staticmethod def _handle_ping(message): """A simple ping endpoint. Returns the provided data.""" return {"endpoint": "ping", "data": message.get("data")} + @staticmethod + def _handle_port_negotiation(message): + endpoints = [socket.endpoint_description() for socket in state.sockets] + + return {"endpoint": "negotiate/ports", "data": endpoints} + @staticmethod def _handle_negotiation(message): """ @@ -33,16 +40,17 @@ class MainReceiver(ReceiverBase): :return: A response dictionary with a 'ports' key containing a list of ports and their function. :rtype: dict[str, list[dict]] """ - # TODO: .../error on all endpoints? - return {"endpoint": "negotiation/error", "data": "The requested endpoint is not implemented."} + # In the future, the sender could send information like the robot's IP address, etc. + + if message["endpoint"] == "negotiate/ports": + return MainReceiver._handle_port_negotiation(message) + + return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."} def handle_message(self, message): - if "endpoint" not in message: - return {"endpoint": "error", "data": "No endpoint provided."} - if message["endpoint"] == "ping": return self._handle_ping(message) - elif message["endpoint"] == "negotiation": + elif message["endpoint"].startswith("negotiate"): return self._handle_negotiation(message) return {"endpoint": "error", "data": "The requested endpoint is not supported."} diff --git a/src/robot_interface/endpoints/receiver_base.py b/src/robot_interface/endpoints/receiver_base.py index b3183f7..498d38b 100644 --- a/src/robot_interface/endpoints/receiver_base.py +++ b/src/robot_interface/endpoints/receiver_base.py @@ -12,10 +12,10 @@ class ReceiverBase(SocketBase, object): """ Handle a message with the receiver. - :param message: The message to handle. + :param message: The message to handle, must contain properties "endpoint" and "data". :type message: dict - :return: A response message. - :rtype: dict + :return: A response message or None if this type of receiver doesn't publish. + :rtype: dict | None """ - return {"endpoint": "error", "data": "The requested receiver is not implemented."} + raise NotImplementedError() diff --git a/src/robot_interface/endpoints/socket_base.py b/src/robot_interface/endpoints/socket_base.py index f86b3ec..d08c360 100644 --- a/src/robot_interface/endpoints/socket_base.py +++ b/src/robot_interface/endpoints/socket_base.py @@ -1,5 +1,7 @@ from abc import ABCMeta +import zmq + class SocketBase(object): __metaclass__ = ABCMeta @@ -7,15 +9,17 @@ class SocketBase(object): name = None socket = None - def __init__(self, name): + def __init__(self, identifier): """ - :param name: The name of the endpoint. - :type name: str + :param identifier: The identifier of the endpoint. + :type identifier: str """ - self.name = name - self.socket = None + self.identifier = identifier + self.port = None # Set later by `create_socket` + self.socket = None # Set later by `create_socket` + self.bound = None # Set later by `create_socket` - def create_socket(self, zmq_context, socket_type, port, options=[]): + def create_socket(self, zmq_context, socket_type, port, options=[], bind=True): """ Create a ZeroMQ socket. @@ -31,16 +35,38 @@ class SocketBase(object): :param options: A list of options to be set on the socket. The list contains tuples where the first element contains the option and the second the value, for example (zmq.CONFLATE, 1). :type options: list[tuple[int, int]] + + :param bind: Whether to bind the socket or connect to it. + :type bind: bool """ + self.port = port self.socket = zmq_context.socket(socket_type) for option, arg in options: self.socket.setsockopt(option,arg) - self.socket.connect("tcp://localhost:{}".format(port)) + self.bound = bind + if bind: + self.socket.bind("tcp://*:{}".format(port)) + else: + self.socket.connect("tcp://localhost:{}".format(port)) def close(self): """Close the ZeroMQ socket.""" if not self.socket: return self.socket.close() self.socket = None + + def endpoint_description(self): + """ + Description of the endpoint. Used for negotiation. + + :return: A dictionary with the following keys: id, port, bind. See API specification at: + https://utrechtuniversity.youtrack.cloud/articles/N25B-A-14/RI-CB-Communication#negotiation + :rtype: dict + """ + return { + "id": self.identifier, + "port": self.port, + "bind": not self.bound + } diff --git a/src/robot_interface/endpoints/video_sender.py b/src/robot_interface/endpoints/video_sender.py index 793385b..c46b768 100644 --- a/src/robot_interface/endpoints/video_sender.py +++ b/src/robot_interface/endpoints/video_sender.py @@ -15,12 +15,12 @@ class VideoSender(SocketBase): def start_video_rcv(self): """ Prepares arguments for retrieving video images from Pepper and starts video loop on a separate thread. - """ - app = qi.Application() - app.start() - session = app.session + """ + if not state.qi_session: + logging.info("No Qi session available. Not starting video loop.") + return - video = session.service("ALVideoDevice") + video = state.qi_session.service("ALVideoDevice") camera_index = 0 kQVGA = 2 diff --git a/src/robot_interface/main.py b/src/robot_interface/main.py index fb48040..934dfd3 100644 --- a/src/robot_interface/main.py +++ b/src/robot_interface/main.py @@ -1,11 +1,13 @@ import logging -import time +logging.basicConfig(level=logging.DEBUG) import zmq +from robot_interface.endpoints.actuation_receiver import ActuationReceiver from robot_interface.endpoints.main_receiver import MainReceiver from robot_interface.endpoints.video_sender import VideoSender from robot_interface.state import state +from robot_interface.utils.timeblock import TimeBlock def main_loop(context): @@ -18,6 +20,8 @@ def main_loop(context): # When creating sockets, remember to add them to the `sockets` list of the state to ensure they're deinitialized main_receiver = MainReceiver(context) state.sockets.append(main_receiver) + actuation_receiver = ActuationReceiver(context) + state.sockets.append(actuation_receiver) video_sender = VideoSender(context) state.sockets.append(video_sender) @@ -25,12 +29,14 @@ def main_loop(context): video_sender.start_video_rcv() # Sockets that can run on the main thread. These sockets' endpoints should not block for long (say 50 ms at most). - receivers = [main_receiver] + receivers = [main_receiver, actuation_receiver] poller = zmq.Poller() for receiver in receivers: poller.register(receiver.socket, zmq.POLLIN) + logging.debug("Starting main loop.") + while True: if state.exit_event.is_set(): break socks = dict(poller.poll(100)) @@ -38,15 +44,20 @@ def main_loop(context): for receiver in receivers: if receiver.socket not in socks: continue - start_time = time.time() - message = receiver.socket.recv_json() - response = receiver.handle_message(message) - receiver.socket.send_json(response) + if not isinstance(message, dict) or "endpoint" not in message or "data" not in message: + logging.error("Received message of unexpected format: {}".format(message)) + continue - time_spent_ms = (time.time() - start_time) * 1000 - if time_spent_ms > 50: - logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.", receiver.name, time_spent_ms) + def overtime_callback(time_ms): + logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.", + message["endpoint"], time_ms) + + with TimeBlock(overtime_callback, 50): + response = receiver.handle_message(message) + + if receiver.socket.getsockopt(zmq.TYPE) == zmq.REP: + receiver.socket.send_json(response) def main(): @@ -57,7 +68,7 @@ def main(): try: main_loop(context) except KeyboardInterrupt: - print("User interrupted.") + logging.info("User interrupted.") finally: state.deinitialize() context.term() diff --git a/src/robot_interface/state.py b/src/robot_interface/state.py index d10cf77..b6f8ce1 100644 --- a/src/robot_interface/state.py +++ b/src/robot_interface/state.py @@ -2,6 +2,8 @@ import logging import signal import threading +from robot_interface.utils.qi_utils import get_qi_session + class State(object): """ @@ -15,6 +17,7 @@ class State(object): self.is_initialized = False self.exit_event = None self.sockets = [] # type: List[SocketBase] + self.qi_session = None # type: None | ssl.SSLSession def initialize(self): if self.is_initialized: @@ -28,6 +31,8 @@ class State(object): signal.signal(signal.SIGINT, handle_exit) signal.signal(signal.SIGTERM, handle_exit) + self.qi_session = get_qi_session() + self.is_initialized = True def deinitialize(self): diff --git a/src/robot_interface/utils/__init__.py b/src/robot_interface/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/robot_interface/utils/qi_utils.py b/src/robot_interface/utils/qi_utils.py new file mode 100644 index 0000000..fc7640b --- /dev/null +++ b/src/robot_interface/utils/qi_utils.py @@ -0,0 +1,25 @@ +import logging +import sys + +try: + import qi +except ImportError: + qi = None + + +def get_qi_session(): + if qi is None: + logging.info("Unable to import qi. Running in stand-alone mode.") + return None + + if "--qi-url" not in sys.argv: + logging.info("No Qi URL argument given. Running in stand-alone mode.") + return None + + try: + app = qi.Application() + app.start() + return app.session + except RuntimeError: + logging.info("Unable to connect to the robot. Running in stand-alone mode.") + return None diff --git a/src/robot_interface/utils/timeblock.py b/src/robot_interface/utils/timeblock.py new file mode 100644 index 0000000..23f1c85 --- /dev/null +++ b/src/robot_interface/utils/timeblock.py @@ -0,0 +1,31 @@ +import time + + +class TimeBlock(object): + """ + A context manager that times the execution of the block it contains. If execution exceeds the + limit, or if no limit is given, the callback will be called with the time that the block took. + """ + def __init__(self, callback, limit_ms=None): + """ + :param callback: The callback function that is called when the block of code is over, + unless the code block did not exceed the time limit. + :type callback: Callable[[float], None] + + :param limit_ms: The number of milliseconds the block of code is allowed to take. If it + exceeds this time, or if it's None, the callback function will be called with the time the + block took. + :type limit_ms: int | None + """ + self.limit_ms = float(limit_ms) if limit_ms is not None else None + self.callback = callback + self.start = None + + def __enter__(self): + self.start = time.time() + return self + + def __exit__(self, exc_type, exc_value, traceback): + elapsed = (time.time() - self.start) * 1000.0 # ms + if self.limit_ms is None or elapsed > self.limit_ms: + self.callback(elapsed) diff --git a/test/unit/__init__.py b/test/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/unit/test_actuation_receiver.py b/test/unit/test_actuation_receiver.py new file mode 100644 index 0000000..da70964 --- /dev/null +++ b/test/unit/test_actuation_receiver.py @@ -0,0 +1,74 @@ +import sys + +import mock +import pytest +import zmq + +from robot_interface.endpoints.actuation_receiver import ActuationReceiver + + +@pytest.fixture +def zmq_context(): + context = zmq.Context() + yield context + + +def test_handle_unimplemented_endpoint(zmq_context): + receiver = ActuationReceiver(zmq_context) + # Should not error + receiver.handle_message({ + "endpoint": "some_endpoint_that_definitely_does_not_exist", + "data": None, + }) + + +def test_speech_message_no_data(zmq_context, mocker): + mock_warn = mocker.patch("logging.warn") + + receiver = ActuationReceiver(zmq_context) + receiver.handle_message({"endpoint": "actuate/speech", "data": ""}) + + mock_warn.assert_called_with(mock.ANY) + + +def test_speech_message_invalid_data(zmq_context, mocker): + mock_warn = mocker.patch("logging.warn") + + receiver = ActuationReceiver(zmq_context) + receiver.handle_message({"endpoint": "actuate/speech", "data": True}) + + mock_warn.assert_called_with(mock.ANY) + + +def test_speech_no_qi(zmq_context, mocker): + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + mock_qi_session = mock.PropertyMock(return_value=None) + type(mock_state).qi_session = mock_qi_session + + receiver = ActuationReceiver(zmq_context) + receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) + + mock_qi_session.assert_called() + + +def test_speech(zmq_context, mocker): + mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state") + + mock_qi = mock.Mock() + sys.modules["qi"] = mock_qi + + mock_tts_service = mock.Mock() + mock_state.qi_session = mock.Mock() + mock_state.qi_session.service.return_value = mock_tts_service + + receiver = ActuationReceiver(zmq_context) + receiver._tts_service = None + receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) + + mock_state.qi_session.service.assert_called_once_with("ALTextToSpeech") + + mock_qi.async.assert_called_once() + call_args = mock_qi.async.call_args[0] + assert call_args[0] == mock_tts_service.say + assert call_args[1] == "Some message to speak." diff --git a/test/unit/test_main_receiver.py b/test/unit/test_main_receiver.py new file mode 100644 index 0000000..4ded502 --- /dev/null +++ b/test/unit/test_main_receiver.py @@ -0,0 +1,79 @@ +import mock +import pytest +import zmq + +from robot_interface.endpoints.main_receiver import MainReceiver + + +@pytest.fixture +def zmq_context(): + context = zmq.Context() + yield context + + +def test_handle_ping(zmq_context): + receiver = MainReceiver(zmq_context) + response = receiver.handle_message({"endpoint": "ping", "data": "pong"}) + + assert "endpoint" in response + assert response["endpoint"] == "ping" + assert "data" in response + assert response["data"] == "pong" + + +def test_handle_ping_none(zmq_context): + receiver = MainReceiver(zmq_context) + response = receiver.handle_message({"endpoint": "ping", "data": None}) + + assert "endpoint" in response + assert response["endpoint"] == "ping" + assert "data" in response + assert response["data"] == None + + +@mock.patch("robot_interface.endpoints.main_receiver.state") +def test_handle_negotiate_ports(mock_state, zmq_context): + receiver = MainReceiver(zmq_context) + mock_state.sockets = [receiver] + + response = receiver.handle_message({"endpoint": "negotiate/ports", "data": None}) + + assert "endpoint" in response + assert response["endpoint"] == "negotiate/ports" + assert "data" in response + assert isinstance(response["data"], list) + for port in response["data"]: + assert "id" in port + assert isinstance(port["id"], str) + assert "port" in port + assert isinstance(port["port"], int) + assert "bind" in port + assert isinstance(port["bind"], bool) + + assert any(port["id"] == "main" for port in response["data"]) + + +def test_handle_unimplemented_endpoint(zmq_context): + receiver = MainReceiver(zmq_context) + response = receiver.handle_message({ + "endpoint": "some_endpoint_that_definitely_does_not_exist", + "data": None, + }) + + assert "endpoint" in response + assert response["endpoint"] == "error" + assert "data" in response + assert isinstance(response["data"], str) + + +def test_handle_unimplemented_negotiation_endpoint(zmq_context): + receiver = MainReceiver(zmq_context) + response = receiver.handle_message({ + "endpoint": "negotiate/but_some_subpath_that_definitely_does_not_exist", + "data": None, + }) + + assert "endpoint" in response + assert response["endpoint"] == "negotiate/error" + assert "data" in response + assert isinstance(response["data"], str) diff --git a/test/unit/test_time_block.py b/test/unit/test_time_block.py new file mode 100644 index 0000000..eabc91b --- /dev/null +++ b/test/unit/test_time_block.py @@ -0,0 +1,37 @@ +import time + +import mock + +from robot_interface.utils.timeblock import TimeBlock + + +class AnyFloat(object): + def __eq__(self, other): + return isinstance(other, float) + + +def test_no_limit(): + callback = mock.Mock() + + with TimeBlock(callback): + pass + + callback.assert_called_once_with(AnyFloat()) + + +def test_exceed_limit(): + callback = mock.Mock() + + with TimeBlock(callback, 0): + time.sleep(0.001) + + callback.assert_called_once_with(AnyFloat()) + + +def test_within_limit(): + callback = mock.Mock() + + with TimeBlock(callback, 5): + pass + + callback.assert_not_called()