test: convert to pytest

Instead of built-in `unittest`, now use `pytest`. Find versions that work, convert tests.

ref: N25B-168
This commit is contained in:
Twirre Meulenbelt
2025-10-21 13:55:06 +02:00
parent 45be0366ba
commit 5631a55697
5 changed files with 166 additions and 158 deletions

View File

@@ -105,11 +105,15 @@ With both, if you want to connect to the actual robot (or simulator), pass the `
To run the unit tests, on Linux and macOS: To run the unit tests, on Linux and macOS:
```shell ```shell
PYTHONPATH=src python -m unittest discover -s test -p "test_*.py" -v PYTHONPATH=src pytest test/
``` ```
On Windows: On Windows:
```shell ```shell
$env:PYTHONPATH="src"; python -m unittest discover -s test -p "test_*.py" -v $env:PYTHONPATH="src"; pytest test/
``` ```
### Coverage
For coverage, add `--cov=robot_interface` as an argument to `pytest`.

View File

@@ -1,3 +1,5 @@
pyzmq<16 pyzmq<16
pyaudio<=0.2.11 pyaudio<=0.2.11
mock~=3.0.5 pytest<5
pytest-mock<3.0.0
pytest-cov<3.0.0

View File

@@ -1,51 +1,60 @@
import sys import sys
import unittest
import mock import mock
import pytest
import zmq import zmq
from robot_interface.endpoints.actuation_receiver import ActuationReceiver from robot_interface.endpoints.actuation_receiver import ActuationReceiver
class TestActuationReceiver(unittest.TestCase): @pytest.fixture
@classmethod def zmq_context():
def setUpClass(cls): context = zmq.Context()
cls.context = zmq.Context() yield context
def test_handle_unimplemented_endpoint(self):
receiver = ActuationReceiver(self.context) def test_handle_unimplemented_endpoint(zmq_context):
receiver = ActuationReceiver(zmq_context)
# Should not error # Should not error
receiver.handle_message({ receiver.handle_message({
"endpoint": "some_endpoint_that_definitely_does_not_exist", "endpoint": "some_endpoint_that_definitely_does_not_exist",
"data": None, "data": None,
}) })
@mock.patch("logging.warn")
def test_speech_message_no_data(self, mock_warn): def test_speech_message_no_data(zmq_context, mocker):
receiver = ActuationReceiver(self.context) mock_warn = mocker.patch("logging.warn")
receiver = ActuationReceiver(zmq_context)
receiver.handle_message({"endpoint": "actuate/speech", "data": ""}) receiver.handle_message({"endpoint": "actuate/speech", "data": ""})
mock_warn.assert_called_with(mock.ANY) mock_warn.assert_called_with(mock.ANY)
@mock.patch("logging.warn")
def test_speech_message_invalid_data(self, mock_warn): def test_speech_message_invalid_data(zmq_context, mocker):
receiver = ActuationReceiver(self.context) mock_warn = mocker.patch("logging.warn")
receiver = ActuationReceiver(zmq_context)
receiver.handle_message({"endpoint": "actuate/speech", "data": True}) receiver.handle_message({"endpoint": "actuate/speech", "data": True})
mock_warn.assert_called_with(mock.ANY) mock_warn.assert_called_with(mock.ANY)
@mock.patch("robot_interface.endpoints.actuation_receiver.state")
def test_speech_no_qi(self, mock_state): def test_speech_no_qi(zmq_context, mocker):
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi_session = mock.PropertyMock(return_value=None) mock_qi_session = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_qi_session type(mock_state).qi_session = mock_qi_session
receiver = ActuationReceiver(self.context) receiver = ActuationReceiver(zmq_context)
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
mock_qi_session.assert_called() mock_qi_session.assert_called()
@mock.patch("robot_interface.endpoints.actuation_receiver.state")
def test_speech(self, mock_state): def test_speech(zmq_context, mocker):
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock() mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi sys.modules["qi"] = mock_qi
@@ -53,7 +62,7 @@ class TestActuationReceiver(unittest.TestCase):
mock_state.qi_session = mock.Mock() mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service mock_state.qi_session.service.return_value = mock_tts_service
receiver = ActuationReceiver(self.context) receiver = ActuationReceiver(zmq_context)
receiver._tts_service = None receiver._tts_service = None
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."}) receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
@@ -61,9 +70,5 @@ class TestActuationReceiver(unittest.TestCase):
mock_qi.async.assert_called_once() mock_qi.async.assert_called_once()
call_args = mock_qi.async.call_args[0] call_args = mock_qi.async.call_args[0]
self.assertEqual(call_args[0], mock_tts_service.say) assert call_args[0] == mock_tts_service.say
self.assertEqual(call_args[1], "Some message to speak.") assert call_args[1] == "Some message to speak."
if __name__ == "__main__":
unittest.main()

View File

@@ -1,79 +1,79 @@
import unittest
import mock import mock
import pytest
import zmq import zmq
from robot_interface.endpoints.main_receiver import MainReceiver from robot_interface.endpoints.main_receiver import MainReceiver
class TestMainReceiver(unittest.TestCase): @pytest.fixture
@classmethod def zmq_context():
def setUpClass(cls): context = zmq.Context()
cls.context = zmq.Context() yield context
def test_handle_ping(self):
receiver = MainReceiver(self.context) def test_handle_ping(zmq_context):
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({"endpoint": "ping", "data": "pong"}) response = receiver.handle_message({"endpoint": "ping", "data": "pong"})
self.assertIn("endpoint", response) assert "endpoint" in response
self.assertEqual(response["endpoint"], "ping") assert response["endpoint"] == "ping"
self.assertIn("data", response) assert "data" in response
self.assertEqual(response["data"], "pong") assert response["data"] == "pong"
def test_handle_ping_none(self):
receiver = MainReceiver(self.context) def test_handle_ping_none(zmq_context):
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({"endpoint": "ping", "data": None}) response = receiver.handle_message({"endpoint": "ping", "data": None})
self.assertIn("endpoint", response) assert "endpoint" in response
self.assertEqual(response["endpoint"], "ping") assert response["endpoint"] == "ping"
self.assertIn("data", response) assert "data" in response
self.assertEqual(response["data"], None) assert response["data"] == None
@mock.patch("robot_interface.endpoints.main_receiver.state")
def test_handle_negotiate_ports(self, mock_state): @mock.patch("robot_interface.endpoints.main_receiver.state")
receiver = MainReceiver(self.context) def test_handle_negotiate_ports(mock_state, zmq_context):
receiver = MainReceiver(zmq_context)
mock_state.sockets = [receiver] mock_state.sockets = [receiver]
response = receiver.handle_message({"endpoint": "negotiate/ports", "data": None}) response = receiver.handle_message({"endpoint": "negotiate/ports", "data": None})
self.assertIn("endpoint", response) assert "endpoint" in response
self.assertEqual(response["endpoint"], "negotiate/ports") assert response["endpoint"] == "negotiate/ports"
self.assertIn("data", response) assert "data" in response
self.assertIsInstance(response["data"], list) assert isinstance(response["data"], list)
for port in response["data"]: for port in response["data"]:
self.assertIn("id", port) assert "id" in port
self.assertIsInstance(port["id"], str) assert isinstance(port["id"], str)
self.assertIn("port", port) assert "port" in port
self.assertIsInstance(port["port"], int) assert isinstance(port["port"], int)
self.assertIn("bind", port) assert "bind" in port
self.assertIsInstance(port["bind"], bool) assert isinstance(port["bind"], bool)
self.assertTrue(any(port["id"] == "main" for port in response["data"])) assert any(port["id"] == "main" for port in response["data"])
def test_handle_unimplemented_endpoint(self):
receiver = MainReceiver(self.context) def test_handle_unimplemented_endpoint(zmq_context):
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({ response = receiver.handle_message({
"endpoint": "some_endpoint_that_definitely_does_not_exist", "endpoint": "some_endpoint_that_definitely_does_not_exist",
"data": None, "data": None,
}) })
self.assertIn("endpoint", response) assert "endpoint" in response
self.assertEqual(response["endpoint"], "error") assert response["endpoint"] == "error"
self.assertIn("data", response) assert "data" in response
self.assertIsInstance(response["data"], str) assert isinstance(response["data"], str)
def test_handle_unimplemented_negotiation_endpoint(self):
receiver = MainReceiver(self.context) def test_handle_unimplemented_negotiation_endpoint(zmq_context):
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({ response = receiver.handle_message({
"endpoint": "negotiate/but_some_subpath_that_definitely_does_not_exist", "endpoint": "negotiate/but_some_subpath_that_definitely_does_not_exist",
"data": None, "data": None,
}) })
self.assertIn("endpoint", response) assert "endpoint" in response
self.assertEqual(response["endpoint"], "negotiate/error") assert response["endpoint"] == "negotiate/error"
self.assertIn("data", response) assert "data" in response
self.assertIsInstance(response["data"], str) assert isinstance(response["data"], str)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,4 +1,4 @@
import unittest import time
import mock import mock
@@ -10,8 +10,7 @@ class AnyFloat(object):
return isinstance(other, float) return isinstance(other, float)
class TestTimeBlock(unittest.TestCase): def test_no_limit():
def test_no_limit(self):
callback = mock.Mock() callback = mock.Mock()
with TimeBlock(callback): with TimeBlock(callback):
@@ -19,22 +18,20 @@ class TestTimeBlock(unittest.TestCase):
callback.assert_called_once_with(AnyFloat()) callback.assert_called_once_with(AnyFloat())
def test_exceed_limit(self):
def test_exceed_limit():
callback = mock.Mock() callback = mock.Mock()
with TimeBlock(callback, 0): with TimeBlock(callback, 0):
pass time.sleep(0.001)
callback.assert_called_once_with(AnyFloat()) callback.assert_called_once_with(AnyFloat())
def test_within_limit(self):
def test_within_limit():
callback = mock.Mock() callback = mock.Mock()
with TimeBlock(callback, 5): with TimeBlock(callback, 5):
pass pass
callback.assert_not_called() callback.assert_not_called()
if __name__ == '__main__':
unittest.main()