117 lines
3.8 KiB
Python
117 lines
3.8 KiB
Python
import asyncio
|
|
import sys
|
|
import uuid
|
|
from unittest.mock import AsyncMock
|
|
|
|
import pytest
|
|
|
|
from control_backend.agents.bdi.bdi_program_manager import BDIProgramManager
|
|
from control_backend.core.agent_system import InternalMessage
|
|
from control_backend.schemas.belief_message import BeliefMessage
|
|
from control_backend.schemas.program import BasicNorm, Goal, Phase, Plan, Program
|
|
|
|
# zmq does not work with the Proactor event loop that Windows uses by default,
# so switch asyncio to the selector-based policy on that platform only.
if sys.platform.startswith("win"):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
|
|
|
|
|
|
def make_valid_program_json(norm="N1", goal="G1") -> str:
    """
    Build a minimal single-phase ``Program`` and return it serialised as JSON.

    The program contains one phase named "Basic Phase" with exactly one norm, one
    goal (whose plan has no steps and which cannot fail) and no triggers. Every
    id is a freshly generated ``uuid.uuid4()``.

    :param norm: Used as both the ``name`` and the ``norm`` of the single ``BasicNorm``.
    :param goal: Used as the ``name`` of the single ``Goal``.
    :return: The result of ``Program.model_dump_json()`` for the program described above.
    """
    # Ids are drawn outermost-first: phase, norm, goal, then the goal's plan.
    phase_id = uuid.uuid4()
    only_norm = BasicNorm(id=uuid.uuid4(), name=norm, norm=norm)
    goal_id = uuid.uuid4()
    empty_plan = Plan(id=uuid.uuid4(), name="Goal Plan", steps=[])

    only_goal = Goal(
        id=goal_id,
        name=goal,
        description="This description can be used to determine whether the goal "
        "has been achieved.",
        plan=empty_plan,
        can_fail=False,
    )
    basic_phase = Phase(
        id=phase_id,
        name="Basic Phase",
        norms=[only_norm],
        goals=[only_goal],
        triggers=[],
    )

    return Program(phases=[basic_phase]).model_dump_json()
|
|
|
|
|
|
@pytest.mark.skip(reason="Functionality being rebuilt.")
@pytest.mark.asyncio
async def test_send_to_bdi():
    """
    Passing a program to ``_create_agentspeak_and_send_to_bdi`` should result in
    exactly one awaited ``send`` whose message is on the "beliefs" thread and whose
    body lists the program's norms and goals.

    Currently skipped (see the skip reason on the decorator).
    """
    program = Program.model_validate_json(make_valid_program_json())

    manager = BDIProgramManager(name="program_manager_test")
    # Replace the real send so the outgoing message can be inspected.
    manager.send = AsyncMock()

    await manager._create_agentspeak_and_send_to_bdi(program)

    assert manager.send.await_count == 1

    # First positional argument of the single awaited call.
    msg: InternalMessage = manager.send.await_args.args[0]
    assert msg.thread == "beliefs"

    parsed = BeliefMessage.model_validate_json(msg.body)
    arguments_by_name = {belief.name: belief.arguments for belief in parsed.beliefs}

    assert "norms" in arguments_by_name
    assert arguments_by_name["norms"] == ["N1"]
    assert "goals" in arguments_by_name
    assert arguments_by_name["goals"] == ["G1"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_receive_programs_valid_and_invalid():
    """
    Feed ``_receive_programs`` one malformed and one well-formed payload.

    Only the well-formed program should be forwarded to
    ``_create_agentspeak_and_send_to_bdi``, and ``_send_clear_llm_history`` should
    have been awaited once.
    """
    incoming_frames = [
        (b"program", b"{bad json"),
        (b"program", make_valid_program_json().encode()),
    ]
    fake_sub_socket = AsyncMock()
    fake_sub_socket.recv_multipart.side_effect = incoming_frames

    manager = BDIProgramManager(name="program_manager_test")
    manager.sub_socket = fake_sub_socket
    manager._internal_pub_socket = AsyncMock()
    # Stub out the downstream calls so they can be counted and inspected.
    manager._send_clear_llm_history = AsyncMock()
    manager._create_agentspeak_and_send_to_bdi = AsyncMock()

    try:
        await manager._receive_programs()
    except StopAsyncIteration:
        # Raised by the mock once the predefined `recv_multipart` side-effects run
        # out; this is how the receive loop is ended in this test.
        pass

    forward_mock = manager._create_agentspeak_and_send_to_bdi

    # Only the valid Program should have triggered _create_agentspeak_and_send_to_bdi
    assert forward_mock.await_count == 1

    forwarded: Program = forward_mock.await_args.args[0]
    first_phase = forwarded.phases[0]
    assert first_phase.norms[0].name == "N1"
    assert first_phase.goals[0].name == "G1"

    # Verify history clear was triggered
    assert manager._send_clear_llm_history.await_count == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_send_clear_llm_history(mock_settings):
    """
    ``_send_clear_llm_history`` should await ``send`` twice, and the first message
    should be a "clear_history" request addressed to the configured LLM agent.

    :param mock_settings: Settings fixture; its LLM agent name is overridden here.
    """
    # The agent name must be a plain string so it can be compared against `msg.to`.
    mock_settings.agent_settings.llm_agent_name = "llm_agent"

    manager = BDIProgramManager(name="program_manager_test")
    manager.send = AsyncMock()

    await manager._send_clear_llm_history()

    assert manager.send.await_count == 2

    # Only the first of the two sends is inspected.
    first_send = manager.send.await_args_list[0]
    msg: InternalMessage = first_send.args[0]

    # Verify the content and recipient
    assert msg.body == "clear_history"
    assert msg.to == "llm_agent"
|