From 56c804b7eb909ce1df36768e91f7b8c5e8339b68 Mon Sep 17 00:00:00 2001 From: Twirre Meulenbelt <43213592+TwirreM@users.noreply.github.com> Date: Thu, 16 Oct 2025 21:43:24 +0200 Subject: [PATCH] test: add unit tests for main and actuation receivers Exhaustive test cases for both classes, with 100% coverage. Adds `mock` dependency. Tests for actuation receiver do not yet pass. ref: N25B-168 --- README.md | 29 ++++++- requirements.txt | 1 + .../endpoints/receiver_base.py | 2 +- src/robot_interface/endpoints/video_sender.py | 2 +- test/unit/__init__.py | 0 test/unit/test_actuation_receiver.py | 69 ++++++++++++++++ test/unit/test_main_receiver.py | 79 +++++++++++++++++++ 7 files changed, 178 insertions(+), 4 deletions(-) create mode 100644 test/unit/__init__.py create mode 100644 test/unit/test_actuation_receiver.py create mode 100644 test/unit/test_main_receiver.py diff --git a/README.md b/README.md index e7cfd21..6fffab0 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,32 @@ Then resume the steps from above. ## Usage +On Linux and macOS: + ```shell -cd src -python -m robot_interface.main +PYTHONPATH=src python -m robot_interface.main +``` + +On Windows: + +```shell +$env:PYTHONPATH="src"; python -m robot_interface.main +``` + +With both, if you want to connect to the actual robot (or simulator), pass the `--qi-url` argument. + + + +## Testing + +To run the unit tests, on Linux and macOS: + +```shell +PYTHONPATH=src python -m unittest discover -s test -p "test_*.py" -v +``` + +On Windows: + +```shell +$env:PYTHONPATH="src"; python -m unittest discover -s test -p "test_*.py" -v ``` diff --git a/requirements.txt b/requirements.txt index aee002a..84b4d20 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ pyzmq<16 pyaudio<=0.2.11 +mock~=3.0.5 diff --git a/src/robot_interface/endpoints/receiver_base.py b/src/robot_interface/endpoints/receiver_base.py index 3d42ec8..498d38b 100644 --- a/src/robot_interface/endpoints/receiver_base.py +++ b/src/robot_interface/endpoints/receiver_base.py @@ -12,7 +12,7 @@ class ReceiverBase(SocketBase, object): """ Handle a message with the receiver. - :param message: The message to handle. + :param message: The message to handle, must contain properties "endpoint" and "data". :type message: dict :return: A response message or None if this type of receiver doesn't publish. diff --git a/src/robot_interface/endpoints/video_sender.py b/src/robot_interface/endpoints/video_sender.py index 41a9e6e..19d58a1 100644 --- a/src/robot_interface/endpoints/video_sender.py +++ b/src/robot_interface/endpoints/video_sender.py @@ -20,7 +20,7 @@ class VideoSender(SocketBase): logging.info("No QI session available, not starting video loop") return - video = state.session.service("ALVideoDevice") + video = state.qi_session.service("ALVideoDevice") camera_index = 0 kQVGA = 2 diff --git a/test/unit/__init__.py b/test/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/unit/test_actuation_receiver.py b/test/unit/test_actuation_receiver.py new file mode 100644 index 0000000..11039e6 --- /dev/null +++ b/test/unit/test_actuation_receiver.py @@ -0,0 +1,69 @@ +import sys +import unittest + +import mock +import zmq + +from robot_interface.endpoints.actuation_receiver import ActuationReceiver + + +class TestActuationReceiver(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.context = zmq.Context() + + def test_handle_unimplemented_endpoint(self): + receiver = ActuationReceiver(self.context) + # Should not error + receiver.handle_message({ + "endpoint": "some_endpoint_that_definitely_does_not_exist", + "data": None, + }) + + @mock.patch("logging.warn") + def test_speech_message_no_data(self, mock_warn): + receiver = ActuationReceiver(self.context) + receiver.handle_message({"endpoint": "actuate/speech", "data": ""}) + + mock_warn.assert_called_with(mock.ANY) + + @mock.patch("logging.warn") + def test_speech_message_invalid_data(self, mock_warn): + receiver = ActuationReceiver(self.context) + receiver.handle_message({"endpoint": "actuate/speech", "data": True}) + + mock_warn.assert_called_with(mock.ANY) + + @mock.patch("robot_interface.endpoints.actuation_receiver.state") + def test_speech_no_qi(self, mock_state): + mock_qi_session = mock.PropertyMock(return_value=None) + type(mock_state).qi_session = mock_qi_session + + receiver = ActuationReceiver(self.context) + receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) + + mock_qi_session.assert_called() + + @mock.patch("robot_interface.endpoints.actuation_receiver.state") + def test_speech(self, mock_state): + mock_qi = mock.Mock() + sys.modules["qi"] = mock_qi + + mock_tts_service = mock.Mock() + mock_state.qi_session = mock.Mock() + mock_state.qi_session.service.return_value = mock_tts_service + + receiver = ActuationReceiver(self.context) + receiver._tts_service = None + receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) + + mock_state.qi_session.service.assert_called_once_with("ALTextToSpeech") + + mock_qi.async.assert_called_once() + call_args = mock_qi.async.call_args[0] + self.assertEqual(call_args[0], mock_tts_service.say) + self.assertEqual(call_args[1], "Some message to speak.") + + +if __name__ == "__main__": + unittest.main() diff --git a/test/unit/test_main_receiver.py b/test/unit/test_main_receiver.py new file mode 100644 index 0000000..31c34cc --- /dev/null +++ b/test/unit/test_main_receiver.py @@ -0,0 +1,79 @@ +import unittest + +import mock +import zmq + +from robot_interface.endpoints.main_receiver import MainReceiver + + +class TestMainReceiver(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.context = zmq.Context() + + def test_handle_ping(self): + receiver = MainReceiver(self.context) + response = receiver.handle_message({"endpoint": "ping", "data": "pong"}) + + self.assertIn("endpoint", response) + self.assertEqual(response["endpoint"], "ping") + self.assertIn("data", response) + self.assertEqual(response["data"], "pong") + + def test_handle_ping_none(self): + receiver = MainReceiver(self.context) + response = receiver.handle_message({"endpoint": "ping", "data": None}) + + self.assertIn("endpoint", response) + self.assertEqual(response["endpoint"], "ping") + self.assertIn("data", response) + self.assertEqual(response["data"], None) + + @mock.patch("robot_interface.endpoints.main_receiver.state") + def test_handle_negotiate_ports(self, mock_state): + receiver = MainReceiver(self.context) + mock_state.sockets = [receiver] + + response = receiver.handle_message({"endpoint": "negotiate/ports", "data": None}) + + self.assertIn("endpoint", response) + self.assertEqual(response["endpoint"], "negotiate/ports") + self.assertIn("data", response) + self.assertIsInstance(response["data"], list) + for port in response["data"]: + self.assertIn("id", port) + self.assertIsInstance(port["id"], str) + self.assertIn("port", port) + self.assertIsInstance(port["port"], int) + self.assertIn("bind", port) + self.assertIsInstance(port["bind"], bool) + + self.assertTrue(any(port["id"] == "main" for port in response["data"])) + + def test_handle_unimplemented_endpoint(self): + receiver = MainReceiver(self.context) + response = receiver.handle_message({ + "endpoint": "some_endpoint_that_definitely_does_not_exist", + "data": None, + }) + + self.assertIn("endpoint", response) + self.assertEqual(response["endpoint"], "error") + self.assertIn("data", response) + self.assertIsInstance(response["data"], str) + + def test_handle_unimplemented_negotiation_endpoint(self): + receiver = MainReceiver(self.context) + response = receiver.handle_message({ + "endpoint": "negotiate/but_some_subpath_that_definitely_does_not_exist", + "data": None, + }) + + self.assertIn("endpoint", response) + self.assertEqual(response["endpoint"], "negotiate/error") + self.assertIn("data", response) + self.assertIsInstance(response["data"], str) + + +if __name__ == "__main__": + unittest.main()