import asyncio
import sys
import uuid
from unittest.mock import AsyncMock

import pytest

from control_backend.agents.bdi.bdi_program_manager import BDIProgramManager
from control_backend.core.agent_system import InternalMessage
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.program import BasicNorm, Goal, Phase, Plan, Program

# Fix Windows Proactor loop for zmq: on Windows, switch to the selector-based
# event loop policy before any test creates an event loop.
if sys.platform.startswith("win"):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())


def make_valid_program_json(norm="N1", goal="G1") -> str:
    """Serialize a minimal valid ``Program`` to a JSON string.

    The program holds a single phase named "Basic Phase" with one norm, one
    goal (whose plan has no steps) and no triggers.

    Args:
        norm: Used as both the ``name`` and the ``norm`` of the single norm.
        goal: Used as the ``name`` of the single goal.

    Returns:
        The JSON produced by ``Program.model_dump_json()``.
    """
    # Fresh random ids, generated in nesting order: phase, norm, goal, plan.
    phase_id, norm_id, goal_id, plan_id = (uuid.uuid4() for _ in range(4))

    only_norm = BasicNorm(id=norm_id, name=norm, norm=norm)
    empty_plan = Plan(id=plan_id, name="Goal Plan", steps=[])
    only_goal = Goal(id=goal_id, name=goal, plan=empty_plan, can_fail=False)
    basic_phase = Phase(
        id=phase_id,
        name="Basic Phase",
        norms=[only_norm],
        goals=[only_goal],
        triggers=[],
    )
    program = Program(phases=[basic_phase])
    return program.model_dump_json()


@pytest.mark.skip(reason="Functionality being rebuilt.")
@pytest.mark.asyncio
async def test_send_to_bdi():
    """A program is sent once, on the "beliefs" thread, with its norms and goals.

    Currently skipped (see the ``skip`` marker's reason).
    """
    manager = BDIProgramManager(name="program_manager_test")
    # Replace the outgoing channel so the sent message can be inspected.
    manager.send = AsyncMock()

    program_json = make_valid_program_json()
    program = Program.model_validate_json(program_json)
    await manager._create_agentspeak_and_send_to_bdi(program)

    assert manager.send.await_count == 1

    # First positional argument of the (single) awaited ``send`` call.
    msg: InternalMessage = manager.send.await_args.args[0]
    assert msg.thread == "beliefs"

    belief_message = BeliefMessage.model_validate_json(msg.body)
    arguments_by_name = {}
    for belief in belief_message.beliefs:
        arguments_by_name[belief.name] = belief.arguments

    assert "norms" in arguments_by_name
    assert arguments_by_name["norms"] == ["N1"]
    assert "goals" in arguments_by_name
    assert arguments_by_name["goals"] == ["G1"]


@pytest.mark.asyncio
async def test_receive_programs_valid_and_invalid():
    """Only the valid payload is passed to ``_create_agentspeak_and_send_to_bdi``.

    The mocked subscriber socket yields one malformed JSON payload followed by
    one valid program.
    """
    invalid_frames = (b"program", b"{bad json")
    valid_frames = (b"program", make_valid_program_json().encode())

    sub = AsyncMock()
    sub.recv_multipart.side_effect = [invalid_frames, valid_frames]

    manager = BDIProgramManager(name="program_manager_test")
    manager.sub_socket = sub
    manager._create_agentspeak_and_send_to_bdi = AsyncMock()
    forward_mock = manager._create_agentspeak_and_send_to_bdi

    try:
        # Will give StopAsyncIteration when the predefined `sub.recv_multipart`
        # side-effects run out
        await manager._receive_programs()
    except StopAsyncIteration:
        pass

    # Only the valid Program should have triggered
    # _create_agentspeak_and_send_to_bdi
    assert forward_mock.await_count == 1

    forwarded: Program = forward_mock.await_args.args[0]
    first_phase = forwarded.phases[0]
    assert first_phase.norms[0].name == "N1"
    assert first_phase.goals[0].name == "G1"