test: add unit tests for main and actuation receivers

Exhaustive test cases for both classes, with 100% coverage. Adds `mock` dependency. Tests for actuation receiver do not yet pass.

ref: N25B-168
This commit is contained in:
Twirre Meulenbelt
2025-10-16 21:43:24 +02:00
parent 55483808ff
commit 56c804b7eb
7 changed files with 178 additions and 4 deletions

View File

@@ -84,7 +84,32 @@ Then resume the steps from above.
## Usage ## Usage
On Linux and macOS:
```shell ```shell
cd src PYTHONPATH=src python -m robot_interface.main
python -m robot_interface.main ```
On Windows:
```shell
$env:PYTHONPATH="src"; python -m robot_interface.main
```
With both, if you want to connect to the actual robot (or simulator), pass the `--qi-url` argument.
## Testing
To run the unit tests, on Linux and macOS:
```shell
PYTHONPATH=src python -m unittest discover -s test -p "test_*.py" -v
```
On Windows:
```shell
$env:PYTHONPATH="src"; python -m unittest discover -s test -p "test_*.py" -v
``` ```

View File

@@ -1,2 +1,3 @@
pyzmq<16 pyzmq<16
pyaudio<=0.2.11 pyaudio<=0.2.11
mock~=3.0.5

View File

@@ -12,7 +12,7 @@ class ReceiverBase(SocketBase, object):
""" """
Handle a message with the receiver. Handle a message with the receiver.
:param message: The message to handle. :param message: The message to handle, must contain properties "endpoint" and "data".
:type message: dict :type message: dict
:return: A response message or None if this type of receiver doesn't publish. :return: A response message or None if this type of receiver doesn't publish.

View File

@@ -20,7 +20,7 @@ class VideoSender(SocketBase):
logging.info("No QI session available, not starting video loop") logging.info("No QI session available, not starting video loop")
return return
video = state.session.service("ALVideoDevice") video = state.qi_session.service("ALVideoDevice")
camera_index = 0 camera_index = 0
kQVGA = 2 kQVGA = 2

0
test/unit/__init__.py Normal file
View File

View File

@@ -0,0 +1,69 @@
import sys
import unittest
import mock
import zmq
from robot_interface.endpoints.actuation_receiver import ActuationReceiver
class TestActuationReceiver(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.context = zmq.Context()
def test_handle_unimplemented_endpoint(self):
receiver = ActuationReceiver(self.context)
# Should not error
receiver.handle_message({
"endpoint": "some_endpoint_that_definitely_does_not_exist",
"data": None,
})
@mock.patch("logging.warn")
def test_speech_message_no_data(self, mock_warn):
receiver = ActuationReceiver(self.context)
receiver.handle_message({"endpoint": "actuate/speech", "data": ""})
mock_warn.assert_called_with(mock.ANY)
@mock.patch("logging.warn")
def test_speech_message_invalid_data(self, mock_warn):
receiver = ActuationReceiver(self.context)
receiver.handle_message({"endpoint": "actuate/speech", "data": True})
mock_warn.assert_called_with(mock.ANY)
@mock.patch("robot_interface.endpoints.actuation_receiver.state")
def test_speech_no_qi(self, mock_state):
mock_qi_session = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_qi_session
receiver = ActuationReceiver(self.context)
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
mock_qi_session.assert_called()
@mock.patch("robot_interface.endpoints.actuation_receiver.state")
def test_speech(self, mock_state):
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tts_service = mock.Mock()
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
receiver = ActuationReceiver(self.context)
receiver._tts_service = None
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
mock_state.qi_session.service.assert_called_once_with("ALTextToSpeech")
mock_qi.async.assert_called_once()
call_args = mock_qi.async.call_args[0]
self.assertEqual(call_args[0], mock_tts_service.say)
self.assertEqual(call_args[1], "Some message to speak.")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,79 @@
import unittest
import mock
import zmq
from robot_interface.endpoints.main_receiver import MainReceiver
class TestMainReceiver(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.context = zmq.Context()
def test_handle_ping(self):
receiver = MainReceiver(self.context)
response = receiver.handle_message({"endpoint": "ping", "data": "pong"})
self.assertIn("endpoint", response)
self.assertEqual(response["endpoint"], "ping")
self.assertIn("data", response)
self.assertEqual(response["data"], "pong")
def test_handle_ping_none(self):
receiver = MainReceiver(self.context)
response = receiver.handle_message({"endpoint": "ping", "data": None})
self.assertIn("endpoint", response)
self.assertEqual(response["endpoint"], "ping")
self.assertIn("data", response)
self.assertEqual(response["data"], None)
@mock.patch("robot_interface.endpoints.main_receiver.state")
def test_handle_negotiate_ports(self, mock_state):
receiver = MainReceiver(self.context)
mock_state.sockets = [receiver]
response = receiver.handle_message({"endpoint": "negotiate/ports", "data": None})
self.assertIn("endpoint", response)
self.assertEqual(response["endpoint"], "negotiate/ports")
self.assertIn("data", response)
self.assertIsInstance(response["data"], list)
for port in response["data"]:
self.assertIn("id", port)
self.assertIsInstance(port["id"], str)
self.assertIn("port", port)
self.assertIsInstance(port["port"], int)
self.assertIn("bind", port)
self.assertIsInstance(port["bind"], bool)
self.assertTrue(any(port["id"] == "main" for port in response["data"]))
def test_handle_unimplemented_endpoint(self):
receiver = MainReceiver(self.context)
response = receiver.handle_message({
"endpoint": "some_endpoint_that_definitely_does_not_exist",
"data": None,
})
self.assertIn("endpoint", response)
self.assertEqual(response["endpoint"], "error")
self.assertIn("data", response)
self.assertIsInstance(response["data"], str)
def test_handle_unimplemented_negotiation_endpoint(self):
receiver = MainReceiver(self.context)
response = receiver.handle_message({
"endpoint": "negotiate/but_some_subpath_that_definitely_does_not_exist",
"data": None,
})
self.assertIn("endpoint", response)
self.assertEqual(response["endpoint"], "negotiate/error")
self.assertIn("data", response)
self.assertIsInstance(response["data"], str)
if __name__ == "__main__":
unittest.main()