import json from unittest.mock import AsyncMock, MagicMock import pytest import zmq from control_backend.agents.actuation.robot_speech_agent import RobotSpeechAgent from control_backend.core.agent_system import InternalMessage @pytest.fixture def zmq_context(mocker): mock_context = mocker.patch( "control_backend.agents.actuation.robot_speech_agent.azmq.Context.instance" ) mock_context.return_value = MagicMock() return mock_context @pytest.mark.asyncio async def test_setup_bind(zmq_context, mocker): """Setup binds and subscribes to internal commands.""" fake_socket = zmq_context.return_value.socket.return_value agent = RobotSpeechAgent("robot_speech", address="tcp://localhost:5555", bind=True) settings = mocker.patch("control_backend.agents.actuation.robot_speech_agent.settings") settings.zmq_settings.internal_sub_address = "tcp://internal:1234" agent.add_behavior = MagicMock() await agent.setup() fake_socket.bind.assert_any_call("tcp://localhost:5555") fake_socket.connect.assert_any_call("tcp://internal:1234") fake_socket.setsockopt.assert_any_call(zmq.SUBSCRIBE, b"command") agent.add_behavior.assert_called_once() @pytest.mark.asyncio async def test_setup_connect(zmq_context, mocker): """Setup connects when bind=False.""" fake_socket = zmq_context.return_value.socket.return_value agent = RobotSpeechAgent("robot_speech", address="tcp://localhost:5555", bind=False) settings = mocker.patch("control_backend.agents.actuation.robot_speech_agent.settings") settings.zmq_settings.internal_sub_address = "tcp://internal:1234" agent.add_behavior = MagicMock() await agent.setup() fake_socket.connect.assert_any_call("tcp://localhost:5555") fake_socket.connect.assert_any_call("tcp://internal:1234") agent.add_behavior.assert_called_once() @pytest.mark.asyncio async def test_handle_message_sends_command(): """Internal message is forwarded to robot pub socket as JSON.""" pubsocket = AsyncMock() agent = RobotSpeechAgent("robot_speech") agent.pubsocket = pubsocket payload = {"endpoint": "actuate/speech", "data": "hello"} msg = InternalMessage(to="robot", sender="tester", body=json.dumps(payload)) await agent.handle_message(msg) pubsocket.send_json.assert_awaited_once_with(payload) @pytest.mark.asyncio async def test_zmq_command_loop_valid_payload(zmq_context): """UI command is read from SUB and published.""" command = {"endpoint": "actuate/speech", "data": "hello"} fake_socket = AsyncMock() async def recv_once(): # stop after first iteration agent._running = False return (b"command", json.dumps(command).encode("utf-8")) fake_socket.recv_multipart = recv_once fake_socket.send_json = AsyncMock() agent = RobotSpeechAgent("robot_speech") agent.subsocket = fake_socket agent.pubsocket = fake_socket agent._running = True await agent._zmq_command_loop() fake_socket.send_json.assert_awaited_once_with(command) @pytest.mark.asyncio async def test_zmq_command_loop_invalid_json(): """Invalid JSON is ignored without sending.""" fake_socket = AsyncMock() async def recv_once(): agent._running = False return (b"command", b"{not_json}") fake_socket.recv_multipart = recv_once fake_socket.send_json = AsyncMock() agent = RobotSpeechAgent("robot_speech") agent.subsocket = fake_socket agent.pubsocket = fake_socket agent._running = True await agent._zmq_command_loop() fake_socket.send_json.assert_not_awaited() @pytest.mark.asyncio async def test_handle_message_invalid_payload(): """Invalid payload is caught and does not send.""" pubsocket = AsyncMock() agent = RobotSpeechAgent("robot_speech") agent.pubsocket = pubsocket msg = InternalMessage(to="robot", sender="tester", body=json.dumps({"bad": "data"})) await agent.handle_message(msg) pubsocket.send_json.assert_not_awaited() @pytest.mark.asyncio async def test_stop_closes_sockets(): pubsocket = MagicMock() subsocket = MagicMock() agent = RobotSpeechAgent("robot_speech") agent.pubsocket = pubsocket agent.subsocket = subsocket await agent.stop() pubsocket.close.assert_called_once() subsocket.close.assert_called_once()