chore: applied feedback

This commit is contained in:
Pim Hutting
2026-01-27 11:46:24 +01:00
parent 27f91150e1
commit 9b040ffc62

View File

@@ -337,108 +337,136 @@ async def test_setup(agent):
@pytest.mark.asyncio
async def test_receive_loop_advanced_scenarios(agent):
"""
Covers:
- JSONDecodeError (lines 86-88)
- Override: Trigger found (lines 108-109)
- Override: Norm found (lines 114-115)
- Override: Nothing found (line 134)
- Override Unachieve: Success & Fail (lines 136-145)
- Pause: Context true/false logs (lines 150-157)
- Next Phase (line 160)
"""
# 1. Setup Data Maps
agent._trigger_map["101"] = "trigger_slug"
agent._cond_norm_map["202"] = "norm_slug"
# 2. Define Payloads
# A. Invalid JSON
bad_json = b"INVALID{JSON"
# B. Override -> Trigger
override_trigger = json.dumps({"type": "override", "context": "101"}).encode()
# C. Override -> Norm
override_norm = json.dumps({"type": "override", "context": "202"}).encode()
# D. Override -> Unknown
override_fail = json.dumps({"type": "override", "context": "999"}).encode()
# E. Unachieve -> Success
unachieve_success = json.dumps({"type": "override_unachieve", "context": "202"}).encode()
# F. Unachieve -> Fail
unachieve_fail = json.dumps({"type": "override_unachieve", "context": "999"}).encode()
# G. Pause (True)
pause_true = json.dumps({"type": "pause", "context": "true"}).encode()
# H. Pause (False/Resume)
pause_false = json.dumps({"type": "pause", "context": ""}).encode()
# I. Next Phase
next_phase = json.dumps({"type": "next_phase", "context": ""}).encode()
# 3. Setup Socket
async def test_receive_loop_json_error(agent):
"""Verify that malformed JSON is caught and logged without crashing the loop."""
agent.sub_socket.recv_multipart.side_effect = [
(b"topic", bad_json),
(b"topic", override_trigger),
(b"topic", override_norm),
(b"topic", override_fail),
(b"topic", unachieve_success),
(b"topic", unachieve_fail),
(b"topic", pause_true),
(b"topic", pause_false),
(b"topic", next_phase),
asyncio.CancelledError, # End loop
(b"topic", b"INVALID{JSON"),
asyncio.CancelledError,
]
# Mock internal helpers to verify calls
agent._send_to_bdi = AsyncMock()
agent._send_to_bdi_belief = AsyncMock()
agent._send_pause_command = AsyncMock()
agent._send_experiment_control_to_bdi_core = AsyncMock()
# 4. Run Loop
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
# 5. Assertions
# JSON Error
agent.logger.error.assert_called_with("Received invalid JSON payload on topic %s", b"topic")
# Override Trigger
agent._send_to_bdi.assert_awaited_with("force_trigger", "trigger_slug")
# Override Norm
# We expect _send_to_bdi_belief to be called for the norm
# Note: The loop calls _send_to_bdi_belief(asl_cond_norm, "cond_norm")
agent._send_to_bdi_belief.assert_any_call("norm_slug", "cond_norm")
@pytest.mark.asyncio
async def test_receive_loop_override_trigger(agent):
"""Verify routing 'override' to a Trigger."""
agent._trigger_map["101"] = "trigger_slug"
payload = json.dumps({"type": "override", "context": "101"}).encode()
# Override Fail (Warning log)
agent.logger.warning.assert_any_call("Could not determine which element to override.")
agent.sub_socket.recv_multipart.side_effect = [(b"topic", payload), asyncio.CancelledError]
agent._send_to_bdi = AsyncMock()
# Unachieve Success
# Loop calls _send_to_bdi_belief(asl_cond_norm, "cond_norm", True)
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
agent._send_to_bdi.assert_awaited_once_with("force_trigger", "trigger_slug")
@pytest.mark.asyncio
async def test_receive_loop_override_norm(agent):
"""Verify routing 'override' to a Conditional Norm."""
agent._cond_norm_map["202"] = "norm_slug"
payload = json.dumps({"type": "override", "context": "202"}).encode()
agent.sub_socket.recv_multipart.side_effect = [(b"topic", payload), asyncio.CancelledError]
agent._send_to_bdi_belief = AsyncMock()
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
agent._send_to_bdi_belief.assert_awaited_once_with("norm_slug", "cond_norm")
@pytest.mark.asyncio
async def test_receive_loop_override_missing(agent):
"""Verify warning log when an override ID is not found in any map."""
payload = json.dumps({"type": "override", "context": "999"}).encode()
agent.sub_socket.recv_multipart.side_effect = [(b"topic", payload), asyncio.CancelledError]
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
agent.logger.warning.assert_called_with("Could not determine which element to override.")
@pytest.mark.asyncio
async def test_receive_loop_unachieve_logic(agent):
"""Verify success and failure paths for override_unachieve."""
agent._cond_norm_map["202"] = "norm_slug"
success_payload = json.dumps({"type": "override_unachieve", "context": "202"}).encode()
fail_payload = json.dumps({"type": "override_unachieve", "context": "999"}).encode()
agent.sub_socket.recv_multipart.side_effect = [
(b"topic", success_payload),
(b"topic", fail_payload),
asyncio.CancelledError,
]
agent._send_to_bdi_belief = AsyncMock()
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
# Assert success call (True flag for unachieve)
agent._send_to_bdi_belief.assert_any_call("norm_slug", "cond_norm", True)
# Assert failure log
agent.logger.warning.assert_called_with(
"Could not determine which conditional norm to unachieve."
)
# Unachieve Fail
agent.logger.warning.assert_any_call("Could not determine which conditional norm to unachieve.")
# Pause Logic
@pytest.mark.asyncio
async def test_receive_loop_pause_resume(agent):
"""Verify pause and resume toggle logic and logging."""
pause_payload = json.dumps({"type": "pause", "context": "true"}).encode()
resume_payload = json.dumps({"type": "pause", "context": ""}).encode()
agent.sub_socket.recv_multipart.side_effect = [
(b"topic", pause_payload),
(b"topic", resume_payload),
asyncio.CancelledError,
]
agent._send_pause_command = AsyncMock()
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
agent._send_pause_command.assert_any_call("true")
agent.logger.info.assert_any_call("Sent pause command.")
# Resume Logic
agent._send_pause_command.assert_any_call("")
agent.logger.info.assert_any_call("Sent pause command.")
agent.logger.info.assert_any_call("Sent resume command.")
# Next Phase
agent._send_experiment_control_to_bdi_core.assert_awaited_with("next_phase")
@pytest.mark.asyncio
async def test_receive_loop_phase_control(agent):
"""Verify experiment flow control (next_phase)."""
payload = json.dumps({"type": "next_phase", "context": ""}).encode()
agent.sub_socket.recv_multipart.side_effect = [(b"topic", payload), asyncio.CancelledError]
agent._send_experiment_control_to_bdi_core = AsyncMock()
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
agent._send_experiment_control_to_bdi_core.assert_awaited_once_with("next_phase")
@pytest.mark.asyncio