Files
pepperplus-cb/test/unit/agents/bdi/test_bdi_program_manager.py
2026-01-07 11:44:48 +01:00

95 lines
3.1 KiB
Python

import asyncio
import sys
import uuid
from unittest.mock import AsyncMock
import pytest
from control_backend.agents.bdi.bdi_program_manager import BDIProgramManager
from control_backend.core.agent_system import InternalMessage
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.program import BasicNorm, Goal, Phase, Plan, Program
# Fix Windows Proactor loop for zmq
if sys.platform.startswith("win"):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
def make_valid_program_json(norm="N1", goal="G1") -> str:
return Program(
phases=[
Phase(
id=uuid.uuid4(),
name="Basic Phase",
norms=[
BasicNorm(
id=uuid.uuid4(),
name=norm,
norm=norm,
),
],
goals=[
Goal(
id=uuid.uuid4(),
name=goal,
description="This description can be used to determine whether the goal "
"has been achieved.",
plan=Plan(
id=uuid.uuid4(),
name="Goal Plan",
steps=[],
),
can_fail=False,
),
],
triggers=[],
),
],
).model_dump_json()
@pytest.mark.skip(reason="Functionality being rebuilt.")
@pytest.mark.asyncio
async def test_send_to_bdi():
manager = BDIProgramManager(name="program_manager_test")
manager.send = AsyncMock()
program = Program.model_validate_json(make_valid_program_json())
await manager._create_agentspeak_and_send_to_bdi(program)
assert manager.send.await_count == 1
msg: InternalMessage = manager.send.await_args[0][0]
assert msg.thread == "beliefs"
beliefs = BeliefMessage.model_validate_json(msg.body)
names = {b.name: b.arguments for b in beliefs.beliefs}
assert "norms" in names and names["norms"] == ["N1"]
assert "goals" in names and names["goals"] == ["G1"]
@pytest.mark.asyncio
async def test_receive_programs_valid_and_invalid():
sub = AsyncMock()
sub.recv_multipart.side_effect = [
(b"program", b"{bad json"),
(b"program", make_valid_program_json().encode()),
]
manager = BDIProgramManager(name="program_manager_test")
manager._internal_pub_socket = AsyncMock()
manager.sub_socket = sub
manager._create_agentspeak_and_send_to_bdi = AsyncMock()
try:
# Will give StopAsyncIteration when the predefined `sub.recv_multipart` side-effects run out
await manager._receive_programs()
except StopAsyncIteration:
pass
# Only valid Program should have triggered _send_to_bdi
assert manager._create_agentspeak_and_send_to_bdi.await_count == 1
forwarded: Program = manager._create_agentspeak_and_send_to_bdi.await_args[0][0]
assert forwarded.phases[0].norms[0].name == "N1"
assert forwarded.phases[0].goals[0].name == "G1"