from unittest.mock import AsyncMock, MagicMock import pytest import zmq import control_backend.agents.perception.face_rec_agent as face_module from control_backend.agents.perception.face_rec_agent import FacePerceptionAgent from control_backend.core.agent_system import InternalMessage from control_backend.schemas.belief_message import BeliefMessage # ------------------------- # Fixtures # ------------------------- @pytest.fixture def agent(): """Return a FacePerceptionAgent instance for testing.""" return FacePerceptionAgent( name="face_agent", zmq_address="inproc://test", zmq_bind=False, ) @pytest.fixture def socket(): """Return a mocked ZMQ socket.""" sock = AsyncMock() sock.setsockopt_string = MagicMock() sock.connect = MagicMock() sock.bind = MagicMock() return sock # ------------------------- # Socket setup tests # ------------------------- def test_connect_socket_connect(agent, socket, monkeypatch): """Test that _connect_socket properly connects when zmq_bind=False.""" ctx = MagicMock() ctx.socket.return_value = socket monkeypatch.setattr(face_module.azmq, "Context", MagicMock(instance=lambda: ctx)) agent._connect_socket() socket.setsockopt_string.assert_called_once_with(zmq.SUBSCRIBE, "") socket.connect.assert_called_once_with(agent._zmq_address) socket.bind.assert_not_called() def test_connect_socket_bind(agent, socket, monkeypatch): """Test that _connect_socket properly binds when zmq_bind=True.""" agent._zmq_bind = True ctx = MagicMock() ctx.socket.return_value = socket monkeypatch.setattr(face_module.azmq, "Context", MagicMock(instance=lambda: ctx)) agent._connect_socket() socket.bind.assert_called_once_with(agent._zmq_address) socket.connect.assert_not_called() def test_connect_socket_twice_is_noop(agent, socket): """Test that calling _connect_socket twice does not overwrite an existing socket.""" agent._socket = socket agent._connect_socket() assert agent._socket is socket # ------------------------- # Belief update tests # ------------------------- @pytest.mark.asyncio async def test_update_face_belief_present(agent): """Test that _update_face_belief(True) creates the 'face_present' belief.""" agent.send = AsyncMock() await agent._update_face_belief(True) msg = agent.send.await_args.args[0] payload = BeliefMessage.model_validate_json(msg.body) assert payload.create[0].name == "face_present" @pytest.mark.asyncio async def test_update_face_belief_absent(agent): """Test that _update_face_belief(False) deletes the 'face_present' belief.""" agent.send = AsyncMock() await agent._update_face_belief(False) msg = agent.send.await_args.args[0] payload = BeliefMessage.model_validate_json(msg.body) assert payload.delete[0].name == "face_present" @pytest.mark.asyncio async def test_post_face_belief_present(agent): """Test that _post_face_belief(True) sends a belief creation message.""" agent.send = AsyncMock() await agent._post_face_belief(True) msg = agent.send.await_args.args[0] assert '"create"' in msg.body and '"face_present"' in msg.body @pytest.mark.asyncio async def test_post_face_belief_absent(agent): """Test that _post_face_belief(False) sends a belief deletion message.""" agent.send = AsyncMock() await agent._post_face_belief(False) msg = agent.send.await_args.args[0] assert '"delete"' in msg.body and '"face_present"' in msg.body # ------------------------- # Message handling tests # ------------------------- @pytest.mark.asyncio async def test_handle_pause(agent): """Test that a 'PAUSE' message clears _paused and resets _last_face_state.""" agent._paused.set() agent._last_face_state = True msg = InternalMessage( to=agent.name, sender=face_module.settings.agent_settings.user_interrupt_name, thread="cmd", body="PAUSE", ) await agent.handle_message(msg) assert not agent._paused.is_set() assert agent._last_face_state is None @pytest.mark.asyncio async def test_handle_resume(agent): """Test that a 'RESUME' message sets _paused.""" agent._paused.clear() msg = InternalMessage( to=agent.name, sender=face_module.settings.agent_settings.user_interrupt_name, thread="cmd", body="RESUME", ) await agent.handle_message(msg) assert agent._paused.is_set() @pytest.mark.asyncio async def test_handle_unknown_command(agent): """Test that an unknown command from UserInterruptAgent is ignored (logs a warning).""" msg = InternalMessage( to=agent.name, sender=face_module.settings.agent_settings.user_interrupt_name, thread="cmd", body="???", ) await agent.handle_message(msg) @pytest.mark.asyncio async def test_handle_unknown_sender(agent): """Test that messages from unknown senders are ignored.""" msg = InternalMessage( to=agent.name, sender="someone_else", thread="cmd", body="PAUSE", ) await agent.handle_message(msg)