fix: fix the tests

ref: N25B-334
This commit is contained in:
Björn Otgaar
2025-12-09 15:37:00 +01:00
parent 3d62e7fc0c
commit 7f34fede81
2 changed files with 249 additions and 244 deletions

View File

@@ -34,14 +34,16 @@ async def test_setup_bind(zmq_context, mocker):
# Check PUB socket binding
fake_socket.bind.assert_any_call("tcp://localhost:5556")
# Check REP socket binding
fake_socket.bind.assert_any_call("tcp://localhost:7788")
# Check SUB socket connection and subscriptions
fake_socket.connect.assert_any_call("tcp://internal:1234")
fake_socket.setsockopt.assert_any_call(zmq.SUBSCRIBE, b"command")
fake_socket.setsockopt.assert_any_call(zmq.SUBSCRIBE, b"send_gestures")
# Check behavior was added
agent.add_behavior.assert_called() # Twice, even.
# Check behavior was added (twice: once for command loop, once for fetch gestures loop)
assert agent.add_behavior.call_count == 2
@pytest.mark.asyncio
@@ -60,21 +62,23 @@ async def test_setup_connect(zmq_context, mocker):
# Check PUB socket connection (not binding)
fake_socket.connect.assert_any_call("tcp://localhost:5556")
fake_socket.connect.assert_any_call("tcp://internal:1234")
# Check REP socket binding (always binds)
fake_socket.bind.assert_any_call("tcp://localhost:7788")
# Check behavior was added
agent.add_behavior.assert_called() # Twice, actually.
# Check behavior was added (twice)
assert agent.add_behavior.call_count == 2
@pytest.mark.asyncio
async def test_handle_message_sends_valid_gesture_command():
"""Internal message with valid gesture tag is forwarded to robot pub socket."""
pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture")
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent.pubsocket = pubsocket
payload = {
"endpoint": RIEndpoint.GESTURE_TAG,
"data": "hello", # "hello" is in availableTags
"data": "hello", # "hello" is in gesture_data
}
msg = InternalMessage(to="robot", sender="tester", body=json.dumps(payload))
@@ -85,9 +89,9 @@ async def test_handle_message_sends_valid_gesture_command():
@pytest.mark.asyncio
async def test_handle_message_sends_non_gesture_command():
"""Internal message with non-gesture endpoint is not handled by this agent."""
"""Internal message with non-gesture endpoint is not forwarded by this agent."""
pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture")
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent.pubsocket = pubsocket
payload = {"endpoint": "some_other_endpoint", "data": "invalid_tag_not_in_list"}
@@ -95,6 +99,7 @@ async def test_handle_message_sends_non_gesture_command():
await agent.handle_message(msg)
# Non-gesture endpoints should not be forwarded by this agent
pubsocket.send_json.assert_not_awaited()
@@ -102,10 +107,10 @@ async def test_handle_message_sends_non_gesture_command():
async def test_handle_message_rejects_invalid_gesture_tag():
"""Internal message with invalid gesture tag is not forwarded."""
pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture")
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent.pubsocket = pubsocket
# Use a tag that's not in availableTags
# Use a tag that's not in gesture_data
payload = {"endpoint": RIEndpoint.GESTURE_TAG, "data": "invalid_tag_not_in_list"}
msg = InternalMessage(to="robot", sender="tester", body=json.dumps(payload))
@@ -118,7 +123,7 @@ async def test_handle_message_rejects_invalid_gesture_tag():
async def test_handle_message_invalid_payload():
"""Invalid payload is caught and does not send."""
pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture")
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent.pubsocket = pubsocket
msg = InternalMessage(to="robot", sender="tester", body=json.dumps({"bad": "data"}))
@@ -142,7 +147,7 @@ async def test_zmq_command_loop_valid_gesture_payload():
fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture")
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent._running = True
@@ -154,7 +159,7 @@ async def test_zmq_command_loop_valid_gesture_payload():
@pytest.mark.asyncio
async def test_zmq_command_loop_valid_non_gesture_payload():
"""UI command with non-gesture endpoint is not handled by this agent."""
"""UI command with non-gesture endpoint is not forwarded by this agent."""
command = {"endpoint": "some_other_endpoint", "data": "anything"}
fake_socket = AsyncMock()
@@ -165,7 +170,7 @@ async def test_zmq_command_loop_valid_non_gesture_payload():
fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture")
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent._running = True
@@ -188,7 +193,7 @@ async def test_zmq_command_loop_invalid_gesture_tag():
fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture")
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent._running = True
@@ -210,7 +215,7 @@ async def test_zmq_command_loop_invalid_json():
fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture")
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent._running = True
@@ -232,7 +237,7 @@ async def test_zmq_command_loop_ignores_send_gestures_topic():
fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture")
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent._running = True
@@ -245,139 +250,165 @@ async def test_zmq_command_loop_ignores_send_gestures_topic():
@pytest.mark.asyncio
async def test_fetch_gestures_loop_without_amount():
"""Fetch gestures request without amount returns all tags."""
fake_socket = AsyncMock()
fake_repsocket = AsyncMock()
async def recv_once():
agent._running = False
return (b"send_gestures", b"{}")
return b"{}" # Empty JSON request
fake_socket.recv_multipart = recv_once
fake_socket.send_multipart = AsyncMock()
fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture")
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no", "wave", "point"])
agent.repsocket = fake_repsocket
agent._running = True
await agent._fetch_gestures_loop()
fake_socket.send_multipart.assert_awaited_once()
fake_repsocket.send.assert_awaited_once()
# Check the response contains all tags
args, kwargs = fake_socket.send_multipart.call_args
assert args[0][0] == b"get_gestures"
response = json.loads(args[0][1])
args, kwargs = fake_repsocket.send.call_args
response = json.loads(args[0])
assert "tags" in response
assert len(response["tags"]) > 0
# Check it includes some expected tags
assert "hello" in response["tags"]
assert "yes" in response["tags"]
assert response["tags"] == ["hello", "yes", "no", "wave", "point"]
@pytest.mark.asyncio
async def test_fetch_gestures_loop_with_amount():
"""Fetch gestures request with amount returns limited tags."""
fake_socket = AsyncMock()
amount = 5
fake_repsocket = AsyncMock()
amount = 3
async def recv_once():
agent._running = False
return (b"send_gestures", json.dumps(amount).encode())
return json.dumps(amount).encode()
fake_socket.recv_multipart = recv_once
fake_socket.send_multipart = AsyncMock()
fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture")
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no", "wave", "point"])
agent.repsocket = fake_repsocket
agent._running = True
await agent._fetch_gestures_loop()
fake_socket.send_multipart.assert_awaited_once()
fake_repsocket.send.assert_awaited_once()
args, kwargs = fake_socket.send_multipart.call_args
assert args[0][0] == b"get_gestures"
response = json.loads(args[0][1])
args, kwargs = fake_repsocket.send.call_args
response = json.loads(args[0])
assert "tags" in response
assert len(response["tags"]) == amount
assert response["tags"] == ["hello", "yes", "no"]
@pytest.mark.asyncio
async def test_fetch_gestures_loop_ignores_command_topic():
"""Command topic is ignored in fetch gestures loop."""
fake_socket = AsyncMock()
async def test_fetch_gestures_loop_with_integer_request():
"""Fetch gestures request with integer amount."""
fake_repsocket = AsyncMock()
amount = 2
async def recv_once():
agent._running = False
return (b"command", b"{}")
return json.dumps(amount).encode()
fake_socket.recv_multipart = recv_once
fake_socket.send_multipart = AsyncMock()
fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture")
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent.repsocket = fake_repsocket
agent._running = True
await agent._fetch_gestures_loop()
fake_socket.send_multipart.assert_not_awaited()
fake_repsocket.send.assert_awaited_once()
args, kwargs = fake_repsocket.send.call_args
response = json.loads(args[0])
assert response["tags"] == ["hello", "yes"]
@pytest.mark.asyncio
async def test_fetch_gestures_loop_invalid_request():
"""Invalid request body is handled gracefully."""
fake_socket = AsyncMock()
async def test_fetch_gestures_loop_with_invalid_json():
"""Invalid JSON request returns all tags."""
fake_repsocket = AsyncMock()
async def recv_once():
agent._running = False
# Send a non-integer, non-JSON body
return (b"send_gestures", b"not_json")
return b"not_json"
fake_socket.recv_multipart = recv_once
fake_socket.send_multipart = AsyncMock()
fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture")
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent.repsocket = fake_repsocket
agent._running = True
await agent._fetch_gestures_loop()
# Should still send a response (all tags)
fake_socket.send_multipart.assert_awaited_once()
fake_repsocket.send.assert_awaited_once()
args, kwargs = fake_repsocket.send.call_args
response = json.loads(args[0])
assert response["tags"] == ["hello", "yes", "no"]
def test_available_tags():
"""Test that availableTags returns the expected list."""
agent = RobotGestureAgent("robot_gesture")
@pytest.mark.asyncio
async def test_fetch_gestures_loop_with_non_integer_json():
"""Non-integer JSON request returns all tags."""
fake_repsocket = AsyncMock()
tags = agent.availableTags()
async def recv_once():
agent._running = False
return json.dumps({"not": "an_integer"}).encode()
assert isinstance(tags, list)
assert len(tags) > 0
# Check some expected tags are present
assert "hello" in tags
assert "yes" in tags
assert "no" in tags
# Check a non-existent tag is not present
assert "invalid_tag_not_in_list" not in tags
fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent.repsocket = fake_repsocket
agent._running = True
await agent._fetch_gestures_loop()
fake_repsocket.send.assert_awaited_once()
args, kwargs = fake_repsocket.send.call_args
response = json.loads(args[0])
assert response["tags"] == ["hello", "yes", "no"]
def test_gesture_data_attribute():
"""Test that gesture_data returns the expected list."""
gesture_data = ["hello", "yes", "no", "wave"]
agent = RobotGestureAgent("robot_gesture", gesture_data=gesture_data)
assert agent.gesture_data == gesture_data
assert isinstance(agent.gesture_data, list)
assert len(agent.gesture_data) == 4
assert "hello" in agent.gesture_data
assert "yes" in agent.gesture_data
assert "no" in agent.gesture_data
assert "invalid_tag_not_in_list" not in agent.gesture_data
@pytest.mark.asyncio
async def test_stop_closes_sockets():
"""Stop method closes both sockets."""
"""Stop method closes all sockets."""
pubsocket = MagicMock()
subsocket = MagicMock()
repsocket = MagicMock()
agent = RobotGestureAgent("robot_gesture")
agent.pubsocket = pubsocket
agent.subsocket = subsocket
agent.repsocket = repsocket
await agent.stop()
pubsocket.close.assert_called_once()
subsocket.close.assert_called_once()
# Note: repsocket is not closed in stop() method, but you might want to add it
# repsocket.close.assert_called_once()
@pytest.mark.asyncio
@@ -386,7 +417,28 @@ async def test_initialization_with_custom_gesture_data():
custom_gestures = ["custom1", "custom2", "custom3"]
agent = RobotGestureAgent("robot_gesture", gesture_data=custom_gestures)
# Note: The current implementation doesn't use the gesture_data parameter
# in availableTags(). This test documents that behavior.
# If you update the agent to use gesture_data, update this test accordingly.
assert agent.gesture_data == custom_gestures
@pytest.mark.asyncio
async def test_fetch_gestures_loop_handles_exception():
"""Exception in fetch gestures loop is caught and logged."""
fake_repsocket = AsyncMock()
async def recv_once():
agent._running = False
raise Exception("Test exception")
fake_repsocket.recv = recv_once
fake_repsocket.send = AsyncMock()
agent = RobotGestureAgent("robot_gesture", gesture_data=["hello", "yes", "no"])
agent.repsocket = fake_repsocket
agent.logger = MagicMock()
agent._running = True
# Should not raise exception
await agent._fetch_gestures_loop()
# Exception should be logged
agent.logger.exception.assert_called_once()

View File

@@ -1,3 +1,4 @@
# tests/test_robot_endpoints.py
import json
from unittest.mock import AsyncMock, MagicMock, patch
@@ -29,7 +30,7 @@ def client(app):
@pytest.fixture
def mock_zmq_context():
"""Mock the ZMQ context."""
"""Mock the ZMQ context used by the endpoint module."""
with patch("control_backend.api.v1.endpoints.robot.Context.instance") as mock_context:
context_instance = MagicMock()
mock_context.return_value = context_instance
@@ -38,13 +39,13 @@ def mock_zmq_context():
@pytest.fixture
def mock_sockets(mock_zmq_context):
"""Mock ZMQ sockets."""
"""Optional helper if you want both a sub and req/push socket available."""
mock_sub_socket = AsyncMock(spec=zmq.asyncio.Socket)
mock_pub_socket = AsyncMock(spec=zmq.asyncio.Socket)
mock_req_socket = AsyncMock(spec=zmq.asyncio.Socket)
mock_zmq_context.socket.return_value = mock_sub_socket
return {"sub": mock_sub_socket, "pub": mock_pub_socket}
return {"sub": mock_sub_socket, "req": mock_req_socket}
def test_receive_speech_command_success(client):
@@ -75,9 +76,8 @@ def test_receive_speech_command_success(client):
def test_receive_gesture_command_success(client):
"""
Test for successful reception of a command. Ensures the status code is 202 and the response body
is correct. It also verifies that the ZeroMQ socket's send_multipart method is called with the
expected data.
Test for successful reception of a command that is a gesture command.
Ensures the status code is 202 and the response body is correct.
"""
# Arrange
mock_pub_socket = AsyncMock()
@@ -116,7 +116,9 @@ def test_ping_check_returns_none(client):
assert response.json() is None
# TODO: Convert these mock sockets to the fixture.
# ----------------------------
# ping_stream tests (unchanged behavior)
# ----------------------------
@pytest.mark.asyncio
async def test_ping_stream_yields_ping_event(monkeypatch):
"""Test that ping_stream yields a proper SSE message when a ping is received."""
@@ -129,6 +131,11 @@ async def test_ping_stream_yields_ping_event(monkeypatch):
mock_context.socket.return_value = mock_sub_socket
monkeypatch.setattr(robot.Context, "instance", lambda: mock_context)
# patch settings address used by ping_stream
mock_settings = MagicMock()
mock_settings.zmq_settings.internal_sub_address = "tcp://localhost:5555"
monkeypatch.setattr(robot, "settings", mock_settings)
mock_request = AsyncMock()
mock_request.is_disconnected = AsyncMock(side_effect=[False, True])
@@ -142,7 +149,7 @@ async def test_ping_stream_yields_ping_event(monkeypatch):
with pytest.raises(StopAsyncIteration):
await anext(generator)
mock_sub_socket.connect.assert_called_once()
mock_sub_socket.connect.assert_called_once_with("tcp://localhost:5555")
mock_sub_socket.setsockopt.assert_called_once_with(robot.zmq.SUBSCRIBE, b"ping")
mock_sub_socket.recv_multipart.assert_awaited()
@@ -159,6 +166,10 @@ async def test_ping_stream_handles_timeout(monkeypatch):
mock_context.socket.return_value = mock_sub_socket
monkeypatch.setattr(robot.Context, "instance", lambda: mock_context)
mock_settings = MagicMock()
mock_settings.zmq_settings.internal_sub_address = "tcp://localhost:5555"
monkeypatch.setattr(robot, "settings", mock_settings)
mock_request = AsyncMock()
mock_request.is_disconnected = AsyncMock(return_value=True)
@@ -168,7 +179,7 @@ async def test_ping_stream_handles_timeout(monkeypatch):
with pytest.raises(StopAsyncIteration):
await anext(generator)
mock_sub_socket.connect.assert_called_once()
mock_sub_socket.connect.assert_called_once_with("tcp://localhost:5555")
mock_sub_socket.setsockopt.assert_called_once_with(robot.zmq.SUBSCRIBE, b"ping")
mock_sub_socket.recv_multipart.assert_awaited()
@@ -187,6 +198,10 @@ async def test_ping_stream_yields_json_values(monkeypatch):
mock_context.socket.return_value = mock_sub_socket
monkeypatch.setattr(robot.Context, "instance", lambda: mock_context)
mock_settings = MagicMock()
mock_settings.zmq_settings.internal_sub_address = "tcp://localhost:5555"
monkeypatch.setattr(robot, "settings", mock_settings)
mock_request = AsyncMock()
mock_request.is_disconnected = AsyncMock(side_effect=[False, True])
@@ -199,43 +214,33 @@ async def test_ping_stream_yields_json_values(monkeypatch):
assert "connected" in event_text
assert "true" in event_text
mock_sub_socket.connect.assert_called_once()
mock_sub_socket.connect.assert_called_once_with("tcp://localhost:5555")
mock_sub_socket.setsockopt.assert_called_once_with(robot.zmq.SUBSCRIBE, b"ping")
mock_sub_socket.recv_multipart.assert_awaited()
# New tests for get_available_gesture_tags endpoint
# ----------------------------
# Updated get_available_gesture_tags tests (REQ socket on tcp://localhost:7788)
# ----------------------------
@pytest.mark.asyncio
async def test_get_available_gesture_tags_success(client, monkeypatch):
"""
Test successful retrieval of available gesture tags.
Test successful retrieval of available gesture tags using a REQ socket.
"""
# Arrange
mock_sub_socket = AsyncMock()
mock_sub_socket.connect = MagicMock()
mock_sub_socket.setsockopt = MagicMock()
# Simulate a response with gesture tags
mock_req_socket = AsyncMock(spec=zmq.asyncio.Socket)
mock_req_socket.connect = MagicMock()
mock_req_socket.send = AsyncMock()
response_data = {"tags": ["wave", "nod", "point", "dance"]}
mock_sub_socket.recv_multipart = AsyncMock(
return_value=[b"get_gestures", json.dumps(response_data).encode()]
)
mock_req_socket.recv = AsyncMock(return_value=json.dumps(response_data).encode())
mock_context = MagicMock()
mock_context.socket.return_value = mock_sub_socket
mock_context.socket.return_value = mock_req_socket
monkeypatch.setattr(robot.Context, "instance", lambda: mock_context)
mock_pub_socket = AsyncMock()
client.app.state.endpoints_pub_socket = mock_pub_socket
# Mock settings
mock_settings = MagicMock()
mock_settings.zmq_settings.internal_sub_address = "tcp://localhost:5555"
monkeypatch.setattr(robot, "settings", mock_settings)
# Mock logger to avoid actual logging
mock_logger = MagicMock()
monkeypatch.setattr(robot.logger, "debug", mock_logger)
# Replace logger methods to avoid noisy logs in tests
monkeypatch.setattr(robot.logger, "debug", MagicMock())
monkeypatch.setattr(robot.logger, "error", MagicMock())
# Act
response = client.get("/get_available_gesture_tags")
@@ -244,135 +249,97 @@ async def test_get_available_gesture_tags_success(client, monkeypatch):
assert response.status_code == 200
assert response.json() == {"available_gesture_tags": ["wave", "nod", "point", "dance"]}
# Verify ZeroMQ interactions
mock_sub_socket.connect.assert_called_once_with("tcp://localhost:5555")
mock_sub_socket.setsockopt.assert_called_once_with(robot.zmq.SUBSCRIBE, b"get_gestures")
mock_pub_socket.send_multipart.assert_awaited_once_with([b"send_gestures", b""])
mock_sub_socket.recv_multipart.assert_awaited_once()
# Verify ZeroMQ REQ interactions
mock_req_socket.connect.assert_called_once_with("tcp://localhost:7788")
mock_req_socket.send.assert_awaited_once_with(b"None")
mock_req_socket.recv.assert_awaited_once()
@pytest.mark.asyncio
async def test_get_available_gesture_tags_with_amount(client, monkeypatch):
"""
Test retrieval of gesture tags with a specific amount parameter.
This tests the TODO in the endpoint about getting a certain amount from the UI.
The endpoint currently ignores the 'amount' TODO, so behavior is the same as 'success'.
This test asserts that the endpoint still sends b"None" and returns the tags.
"""
# Arrange
mock_sub_socket = AsyncMock()
mock_sub_socket.connect = MagicMock()
mock_sub_socket.setsockopt = MagicMock()
# Simulate a response with gesture tags
mock_req_socket = AsyncMock(spec=zmq.asyncio.Socket)
mock_req_socket.connect = MagicMock()
mock_req_socket.send = AsyncMock()
response_data = {"tags": ["wave", "nod"]}
mock_sub_socket.recv_multipart = AsyncMock(
return_value=[b"get_gestures", json.dumps(response_data).encode()]
)
mock_req_socket.recv = AsyncMock(return_value=json.dumps(response_data).encode())
mock_context = MagicMock()
mock_context.socket.return_value = mock_sub_socket
mock_context.socket.return_value = mock_req_socket
monkeypatch.setattr(robot.Context, "instance", lambda: mock_context)
mock_pub_socket = AsyncMock()
client.app.state.endpoints_pub_socket = mock_pub_socket
# Mock settings
mock_settings = MagicMock()
mock_settings.zmq_settings.internal_sub_address = "tcp://localhost:5555"
monkeypatch.setattr(robot, "settings", mock_settings)
# Mock logger
mock_logger = MagicMock()
monkeypatch.setattr(robot.logger, "debug", mock_logger)
# Act - Note: The endpoint currently doesn't support query parameters for amount,
# but we're testing what happens if the UI sends an amount (the TODO in the code)
# For now, we test the current behavior
response = client.get("/get_available_gesture_tags")
# Assert
assert response.status_code == 200
assert response.json() == {"available_gesture_tags": ["wave", "nod"]}
# The endpoint currently doesn't use the amount parameter, so it should send empty bytes
mock_pub_socket.send_multipart.assert_awaited_once_with([b"send_gestures", b""])
@pytest.mark.asyncio
async def test_get_available_gesture_tags_timeout(client, monkeypatch):
"""
Test timeout scenario when fetching gesture tags.
"""
# Arrange
mock_sub_socket = AsyncMock()
mock_sub_socket.connect = MagicMock()
mock_sub_socket.setsockopt = MagicMock()
# Simulate a timeout
mock_sub_socket.recv_multipart = AsyncMock(side_effect=TimeoutError)
mock_context = MagicMock()
mock_context.socket.return_value = mock_sub_socket
monkeypatch.setattr(robot.Context, "instance", lambda: mock_context)
mock_pub_socket = AsyncMock()
client.app.state.endpoints_pub_socket = mock_pub_socket
# Mock settings
mock_settings = MagicMock()
mock_settings.zmq_settings.internal_sub_address = "tcp://localhost:5555"
monkeypatch.setattr(robot, "settings", mock_settings)
# Mock logger to verify debug message is logged
mock_logger = MagicMock()
monkeypatch.setattr(robot.logger, "debug", mock_logger)
monkeypatch.setattr(robot.logger, "debug", MagicMock())
monkeypatch.setattr(robot.logger, "error", MagicMock())
# Act
response = client.get("/get_available_gesture_tags")
# Assert
assert response.status_code == 200
assert response.json() == {"available_gesture_tags": ["wave", "nod"]}
mock_req_socket.connect.assert_called_once_with("tcp://localhost:7788")
mock_req_socket.send.assert_awaited_once_with(b"None")
@pytest.mark.asyncio
async def test_get_available_gesture_tags_timeout(client, monkeypatch):
"""
Test timeout scenario when fetching gesture tags. Endpoint should handle TimeoutError
and return an empty list while logging the timeout.
"""
# Arrange
mock_req_socket = AsyncMock(spec=zmq.asyncio.Socket)
mock_req_socket.connect = MagicMock()
mock_req_socket.send = AsyncMock()
mock_req_socket.recv = AsyncMock(side_effect=TimeoutError)
mock_context = MagicMock()
mock_context.socket.return_value = mock_req_socket
monkeypatch.setattr(robot.Context, "instance", lambda: mock_context)
# Patch logger.debug so we can assert it was called with the expected message
mock_debug = MagicMock()
monkeypatch.setattr(robot.logger, "debug", mock_debug)
monkeypatch.setattr(robot.logger, "error", MagicMock())
# Act
response = client.get("/get_available_gesture_tags")
# Assert
assert response.status_code == 200
# On timeout, body becomes b"" and json.loads(b"") raises JSONDecodeError
# But looking at the endpoint code, it will try to parse empty bytes which will fail
# Let's check what actually happens
assert response.json() == {"available_gesture_tags": []}
# Verify the timeout was logged
mock_logger.assert_called_once_with("got timeout error fetching gestures")
# Verify the timeout was logged using the exact string from the endpoint code
mock_debug.assert_called_once_with("Got timeout error fetching gestures.")
# Verify ZeroMQ interactions
mock_sub_socket.connect.assert_called_once_with("tcp://localhost:5555")
mock_sub_socket.setsockopt.assert_called_once_with(robot.zmq.SUBSCRIBE, b"get_gestures")
mock_pub_socket.send_multipart.assert_awaited_once_with([b"send_gestures", b""])
mock_sub_socket.recv_multipart.assert_awaited_once()
mock_req_socket.connect.assert_called_once_with("tcp://localhost:7788")
mock_req_socket.send.assert_awaited_once_with(b"None")
mock_req_socket.recv.assert_awaited_once()
@pytest.mark.asyncio
async def test_get_available_gesture_tags_empty_response(client, monkeypatch):
"""
Test scenario when response contains no tags.
Test scenario when response contains an empty 'tags' list.
"""
# Arrange
mock_sub_socket = AsyncMock()
mock_sub_socket.connect = MagicMock()
mock_sub_socket.setsockopt = MagicMock()
# Simulate a response with empty tags
mock_req_socket = AsyncMock(spec=zmq.asyncio.Socket)
mock_req_socket.connect = MagicMock()
mock_req_socket.send = AsyncMock()
response_data = {"tags": []}
mock_sub_socket.recv_multipart = AsyncMock(
return_value=[b"get_gestures", json.dumps(response_data).encode()]
)
mock_req_socket.recv = AsyncMock(return_value=json.dumps(response_data).encode())
mock_context = MagicMock()
mock_context.socket.return_value = mock_sub_socket
mock_context.socket.return_value = mock_req_socket
monkeypatch.setattr(robot.Context, "instance", lambda: mock_context)
mock_pub_socket = AsyncMock()
client.app.state.endpoints_pub_socket = mock_pub_socket
# Mock settings
mock_settings = MagicMock()
mock_settings.zmq_settings.internal_sub_address = "tcp://localhost:5555"
monkeypatch.setattr(robot, "settings", mock_settings)
monkeypatch.setattr(robot.logger, "debug", MagicMock())
monkeypatch.setattr(robot.logger, "error", MagicMock())
# Act
response = client.get("/get_available_gesture_tags")
@@ -388,65 +355,51 @@ async def test_get_available_gesture_tags_missing_tags_key(client, monkeypatch):
Test scenario when response JSON doesn't contain 'tags' key.
"""
# Arrange
mock_sub_socket = AsyncMock()
mock_sub_socket.connect = MagicMock()
mock_sub_socket.setsockopt = MagicMock()
# Simulate a response without 'tags' key
mock_req_socket = AsyncMock(spec=zmq.asyncio.Socket)
mock_req_socket.connect = MagicMock()
mock_req_socket.send = AsyncMock()
response_data = {"some_other_key": "value"}
mock_sub_socket.recv_multipart = AsyncMock(
return_value=[b"get_gestures", json.dumps(response_data).encode()]
)
mock_req_socket.recv = AsyncMock(return_value=json.dumps(response_data).encode())
mock_context = MagicMock()
mock_context.socket.return_value = mock_sub_socket
mock_context.socket.return_value = mock_req_socket
monkeypatch.setattr(robot.Context, "instance", lambda: mock_context)
mock_pub_socket = AsyncMock()
client.app.state.endpoints_pub_socket = mock_pub_socket
# Mock settings
mock_settings = MagicMock()
mock_settings.zmq_settings.internal_sub_address = "tcp://localhost:5555"
monkeypatch.setattr(robot, "settings", mock_settings)
monkeypatch.setattr(robot.logger, "debug", MagicMock())
monkeypatch.setattr(robot.logger, "error", MagicMock())
# Act
response = client.get("/get_available_gesture_tags")
# Assert
assert response.status_code == 200
# .get("tags", []) should return empty list if 'tags' key is missing
assert response.json() == {"available_gesture_tags": []}
@pytest.mark.asyncio
async def test_get_available_gesture_tags_invalid_json(client, monkeypatch):
"""
Test scenario when response contains invalid JSON.
Test scenario when response contains invalid JSON. Endpoint should log the error
and return an empty list.
"""
# Arrange
mock_sub_socket = AsyncMock()
mock_sub_socket.connect = MagicMock()
mock_sub_socket.setsockopt = MagicMock()
# Simulate a response with invalid JSON
mock_sub_socket.recv_multipart = AsyncMock(return_value=[b"get_gestures", b"invalid json"])
mock_req_socket = AsyncMock(spec=zmq.asyncio.Socket)
mock_req_socket.connect = MagicMock()
mock_req_socket.send = AsyncMock()
mock_req_socket.recv = AsyncMock(return_value=b"invalid json")
mock_context = MagicMock()
mock_context.socket.return_value = mock_sub_socket
mock_context.socket.return_value = mock_req_socket
monkeypatch.setattr(robot.Context, "instance", lambda: mock_context)
mock_pub_socket = AsyncMock()
client.app.state.endpoints_pub_socket = mock_pub_socket
# Mock settings
mock_settings = MagicMock()
mock_settings.zmq_settings.internal_sub_address = "tcp://localhost:5555"
monkeypatch.setattr(robot, "settings", mock_settings)
mock_error = MagicMock()
monkeypatch.setattr(robot.logger, "error", mock_error)
monkeypatch.setattr(robot.logger, "debug", MagicMock())
# Act
response = client.get("/get_available_gesture_tags")
# Assert - invalid JSON should raise an exception
# Assert - invalid JSON should lead to empty list and error log invocation
assert response.status_code == 200
assert response.json() == {"available_gesture_tags": []}
assert mock_error.call_count == 1