chore: added missing tests

This commit is contained in:
Pim Hutting
2026-01-19 16:01:59 +01:00
parent da0f48e96d
commit 215bafe27f
3 changed files with 349 additions and 2 deletions

View File

@@ -8,7 +8,17 @@ import pytest
from control_backend.agents.bdi.bdi_program_manager import BDIProgramManager
from control_backend.core.agent_system import InternalMessage
from control_backend.schemas.program import BasicNorm, Goal, Phase, Plan, Program
from control_backend.schemas.program import (
BasicNorm,
ConditionalNorm,
Goal,
InferredBelief,
KeywordBelief,
Phase,
Plan,
Program,
Trigger,
)
# Fix Windows Proactor loop for zmq
if sys.platform.startswith("win"):
@@ -295,3 +305,98 @@ async def test_setup(mock_settings):
# 3. Adds behavior
manager.add_behavior.assert_called()
@pytest.mark.asyncio
async def test_send_program_to_user_interrupt(mock_settings):
"""Test directly sending the program to the user interrupt agent."""
mock_settings.agent_settings.user_interrupt_name = "user_interrupt_agent"
manager = BDIProgramManager(name="program_manager_test")
manager.send = AsyncMock()
program = Program.model_validate_json(make_valid_program_json())
await manager._send_program_to_user_interrupt(program)
assert manager.send.await_count == 1
msg = manager.send.await_args[0][0]
assert msg.to == "user_interrupt_agent"
assert msg.thread == "new_program"
assert "Basic Phase" in msg.body
@pytest.mark.asyncio
async def test_complex_program_extraction():
manager = BDIProgramManager(name="program_manager_test")
# 1. Create Complex Components
# Inferred Belief (A & B)
belief_left = KeywordBelief(id=uuid.uuid4(), name="b1", keyword="hot")
belief_right = KeywordBelief(id=uuid.uuid4(), name="b2", keyword="sunny")
inferred_belief = InferredBelief(
id=uuid.uuid4(), name="b_inf", operator="AND", left=belief_left, right=belief_right
)
# Conditional Norm
cond_norm = ConditionalNorm(
id=uuid.uuid4(), name="norm_cond", norm="wear_hat", condition=inferred_belief
)
# Trigger with Inferred Belief condition
dummy_plan = Plan(id=uuid.uuid4(), name="dummy_plan", steps=[])
trigger = Trigger(id=uuid.uuid4(), name="trigger_1", condition=inferred_belief, plan=dummy_plan)
# Nested Goal
sub_goal = Goal(
id=uuid.uuid4(),
name="sub_goal",
description="desc",
plan=Plan(id=uuid.uuid4(), name="empty", steps=[]),
can_fail=True,
)
parent_goal = Goal(
id=uuid.uuid4(),
name="parent_goal",
description="desc",
# The plan contains the sub_goal as a step
plan=Plan(id=uuid.uuid4(), name="parent_plan", steps=[sub_goal]),
can_fail=False,
)
# 2. Assemble Program
phase = Phase(
id=uuid.uuid4(),
name="Complex Phase",
norms=[cond_norm],
goals=[parent_goal],
triggers=[trigger],
)
program = Program(phases=[phase])
# 3. Initialize Internal State (Triggers _populate_goal_mapping -> Nested Goal logic)
manager._initialize_internal_state(program)
# Assertion for Line 53-54 (Mapping population)
# Both parent and sub-goal should be mapped
assert str(parent_goal.id) in manager._goal_mapping
assert str(sub_goal.id) in manager._goal_mapping
# 4. Test Belief Extraction (Triggers lines 132-133, 142-146)
beliefs = manager._extract_current_beliefs()
# Should extract recursive beliefs from cond_norm and trigger
# Inferred belief splits into Left + Right. Since we use it twice, we get duplicates
# checking existence is enough.
belief_names = [b.name for b in beliefs]
assert "b1" in belief_names
assert "b2" in belief_names
# 5. Test Goal Extraction (Triggers lines 173, 185)
goals = manager._extract_current_goals()
goal_names = [g.name for g in goals]
assert "parent_goal" in goal_names
assert "sub_goal" in goal_names

View File

@@ -1,12 +1,13 @@
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.program import (
ConditionalNorm,
Goal,
@@ -309,3 +310,192 @@ async def test_send_pause_command(agent):
m for m in agent.send.call_args_list if m.args[0].to == settings.agent_settings.vad_name
).args[0]
assert vad_msg.body == "RESUME"
@pytest.mark.asyncio
async def test_setup(agent):
"""Test the setup method initializes sockets correctly."""
with patch("control_backend.agents.user_interrupt.user_interrupt_agent.Context") as MockContext:
mock_ctx_instance = MagicMock()
MockContext.instance.return_value = mock_ctx_instance
mock_sub = MagicMock()
mock_pub = MagicMock()
mock_ctx_instance.socket.side_effect = [mock_sub, mock_pub]
# MOCK add_behavior so we don't rely on internal attributes
agent.add_behavior = MagicMock()
await agent.setup()
# Check sockets
mock_sub.connect.assert_called_with(settings.zmq_settings.internal_sub_address)
mock_pub.connect.assert_called_with(settings.zmq_settings.internal_pub_address)
# Verify add_behavior was called
agent.add_behavior.assert_called_once()
@pytest.mark.asyncio
async def test_receive_loop_advanced_scenarios(agent):
"""
Covers:
- JSONDecodeError (lines 86-88)
- Override: Trigger found (lines 108-109)
- Override: Norm found (lines 114-115)
- Override: Nothing found (line 134)
- Override Unachieve: Success & Fail (lines 136-145)
- Pause: Context true/false logs (lines 150-157)
- Next Phase (line 160)
"""
# 1. Setup Data Maps
agent._trigger_map["101"] = "trigger_slug"
agent._cond_norm_map["202"] = "norm_slug"
# 2. Define Payloads
# A. Invalid JSON
bad_json = b"INVALID{JSON"
# B. Override -> Trigger
override_trigger = json.dumps({"type": "override", "context": "101"}).encode()
# C. Override -> Norm
override_norm = json.dumps({"type": "override", "context": "202"}).encode()
# D. Override -> Unknown
override_fail = json.dumps({"type": "override", "context": "999"}).encode()
# E. Unachieve -> Success
unachieve_success = json.dumps({"type": "override_unachieve", "context": "202"}).encode()
# F. Unachieve -> Fail
unachieve_fail = json.dumps({"type": "override_unachieve", "context": "999"}).encode()
# G. Pause (True)
pause_true = json.dumps({"type": "pause", "context": "true"}).encode()
# H. Pause (False/Resume)
pause_false = json.dumps({"type": "pause", "context": ""}).encode()
# I. Next Phase
next_phase = json.dumps({"type": "next_phase", "context": ""}).encode()
# 3. Setup Socket
agent.sub_socket.recv_multipart.side_effect = [
(b"topic", bad_json),
(b"topic", override_trigger),
(b"topic", override_norm),
(b"topic", override_fail),
(b"topic", unachieve_success),
(b"topic", unachieve_fail),
(b"topic", pause_true),
(b"topic", pause_false),
(b"topic", next_phase),
asyncio.CancelledError, # End loop
]
# Mock internal helpers to verify calls
agent._send_to_bdi = AsyncMock()
agent._send_to_bdi_belief = AsyncMock()
agent._send_pause_command = AsyncMock()
agent._send_experiment_control_to_bdi_core = AsyncMock()
# 4. Run Loop
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
# 5. Assertions
# JSON Error
agent.logger.error.assert_called_with("Received invalid JSON payload on topic %s", b"topic")
# Override Trigger
agent._send_to_bdi.assert_awaited_with("force_trigger", "trigger_slug")
# Override Norm
# We expect _send_to_bdi_belief to be called for the norm
# Note: The loop calls _send_to_bdi_belief(asl_cond_norm, "cond_norm")
agent._send_to_bdi_belief.assert_any_call("norm_slug", "cond_norm")
# Override Fail (Warning log)
agent.logger.warning.assert_any_call("Could not determine which element to override.")
# Unachieve Success
# Loop calls _send_to_bdi_belief(asl_cond_norm, "cond_norm", True)
agent._send_to_bdi_belief.assert_any_call("norm_slug", "cond_norm", True)
# Unachieve Fail
agent.logger.warning.assert_any_call("Could not determine which conditional norm to unachieve.")
# Pause Logic
agent._send_pause_command.assert_any_call("true")
agent.logger.info.assert_any_call("Sent pause command.")
# Resume Logic
agent._send_pause_command.assert_any_call("")
agent.logger.info.assert_any_call("Sent resume command.")
# Next Phase
agent._send_experiment_control_to_bdi_core.assert_awaited_with("next_phase")
@pytest.mark.asyncio
async def test_handle_message_unknown_thread(agent):
"""Test handling of an unknown message thread (lines 213-214)."""
msg = InternalMessage(to="me", thread="unknown_thread", body="test")
await agent.handle_message(msg)
agent.logger.debug.assert_called_with(
"Received internal message on unhandled thread: unknown_thread"
)
@pytest.mark.asyncio
async def test_send_to_bdi_belief_edge_cases(agent):
"""
Covers:
- Unknown asl_type warning (lines 326-328)
- unachieve=True logic (lines 334-337)
"""
# 1. Unknown Type
await agent._send_to_bdi_belief("slug", "unknown_type")
agent.logger.warning.assert_called_with("Tried to send belief with unknown type")
agent.send.assert_not_called()
# Reset mock for part 2
agent.send.reset_mock()
# 2. Unachieve = True
await agent._send_to_bdi_belief("slug", "cond_norm", unachieve=True)
agent.send.assert_awaited()
sent_msg = agent.send.call_args.args[0]
# Verify it is a delete operation
body_obj = BeliefMessage.model_validate_json(sent_msg.body)
# Verify 'delete' has content
assert body_obj.delete is not None
assert len(body_obj.delete) == 1
assert body_obj.delete[0].name == "force_slug"
# Verify 'create' is empty (handling both None and [])
assert not body_obj.create
@pytest.mark.asyncio
async def test_send_experiment_control_unknown(agent):
"""Test sending an unknown experiment control type (lines 366-367)."""
await agent._send_experiment_control_to_bdi_core("invalid_command")
agent.logger.warning.assert_called_with(
"Received unknown experiment control type '%s' to send to BDI Core.", "invalid_command"
)
# Ensure it still sends an empty message (as per code logic, though thread is empty)
agent.send.assert_awaited()
msg = agent.send.call_args[0][0]
assert msg.thread == ""

View File

@@ -94,3 +94,55 @@ async def test_experiment_stream_direct_call():
mock_socket.connect.assert_called()
mock_socket.subscribe.assert_called_with(b"experiment")
mock_socket.close.assert_called()
@pytest.mark.asyncio
async def test_status_stream_direct_call():
"""
Test the status stream, ensuring it handles messages and sends pings on timeout.
"""
mock_socket = AsyncMock()
# Define the sequence of events for the socket:
# 1. Successfully receive a message
# 2. Timeout (which should trigger the ': ping' yield)
# 3. Another message (which won't be reached because we'll simulate disconnect)
mock_socket.recv_multipart.side_effect = [
(b"topic", b"status_update"),
TimeoutError(),
(b"topic", b"ignored_msg"),
]
mock_socket.close = MagicMock()
mock_socket.connect = MagicMock()
mock_socket.subscribe = MagicMock()
mock_context = MagicMock()
mock_context.socket.return_value = mock_socket
# Mock the ZMQ Context to return our mock_socket
with patch(
"control_backend.api.v1.endpoints.user_interact.Context.instance", return_value=mock_context
):
mock_request = AsyncMock()
# is_disconnected sequence:
# 1. False -> Process "status_update"
# 2. False -> Process TimeoutError (yield ping)
# 3. True -> Break loop (client disconnected)
mock_request.is_disconnected.side_effect = [False, False, True]
# Call the status_stream function explicitly
response = await user_interact.status_stream(mock_request)
lines = []
async for line in response.body_iterator:
lines.append(line)
# Assertions
assert "data: status_update\n\n" in lines
assert ": ping\n\n" in lines # Verify lines 91-92 (ping logic)
mock_socket.connect.assert_called()
mock_socket.subscribe.assert_called_with(b"status")
mock_socket.close.assert_called()