test: increased cb test coverage

This commit is contained in:
JobvAlewijk
2025-12-09 13:14:02 +00:00
committed by Luijkx,S.O.H. (Storm)
parent de2e56ffce
commit 7f7c658901
16 changed files with 941 additions and 7 deletions

View File

@@ -2,7 +2,7 @@
import asyncio
import logging
from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, MagicMock
import pytest
@@ -70,3 +70,142 @@ async def test_get_agent():
agent = ConcreteTestAgent("registrant")
assert AgentDirectory.get("registrant") == agent
assert AgentDirectory.get("non_existent") is None
class DummyAgent(BaseAgent):
async def setup(self):
pass # we will test this separately
async def handle_message(self, msg: InternalMessage):
self.last_handled = msg
@pytest.mark.asyncio
async def test_base_agent_setup_is_noop():
agent = DummyAgent("dummy")
# Should simply return without error
assert await agent.setup() is None
@pytest.mark.asyncio
async def test_send_to_local_agent(monkeypatch):
sender = DummyAgent("sender")
target = DummyAgent("receiver")
# Fake logger
sender.logger = MagicMock()
# Patch inbox.put
target.inbox.put = AsyncMock()
message = InternalMessage(to="receiver", sender="sender", body="hello")
await sender.send(message)
target.inbox.put.assert_awaited_once_with(message)
sender.logger.debug.assert_called_once()
@pytest.mark.asyncio
async def test_process_inbox_calls_handle_message(monkeypatch):
agent = DummyAgent("dummy")
agent.logger = MagicMock()
# Make agent running so loop triggers
agent._running = True
# Prepare inbox to give one message then stop
msg = InternalMessage(to="dummy", sender="x", body="test")
async def get_once():
agent._running = False # stop after first iteration
return msg
agent.inbox.get = AsyncMock(side_effect=get_once)
agent.handle_message = AsyncMock()
await agent._process_inbox()
agent.handle_message.assert_awaited_once_with(msg)
@pytest.mark.asyncio
async def test_receive_internal_zmq_loop_success(monkeypatch):
agent = DummyAgent("dummy")
agent.logger = MagicMock()
agent._running = True
mock_socket = MagicMock()
mock_socket.recv_multipart = AsyncMock(
side_effect=[
(
b"topic",
InternalMessage(to="dummy", sender="x", body="hi").model_dump_json().encode(),
),
asyncio.CancelledError(), # stop loop
]
)
agent._internal_sub_socket = mock_socket
agent.inbox.put = AsyncMock()
await agent._receive_internal_zmq_loop()
agent.inbox.put.assert_awaited() # message forwarded
@pytest.mark.asyncio
async def test_receive_internal_zmq_loop_exception_logs_error():
agent = DummyAgent("dummy")
agent.logger = MagicMock()
agent._running = True
mock_socket = MagicMock()
mock_socket.recv_multipart = AsyncMock(
side_effect=[Exception("boom"), asyncio.CancelledError()]
)
agent._internal_sub_socket = mock_socket
agent.inbox.put = AsyncMock()
await agent._receive_internal_zmq_loop()
agent.logger.exception.assert_called_once()
assert "Could not process ZMQ message." in agent.logger.exception.call_args[0][0]
@pytest.mark.asyncio
async def test_base_agent_handle_message_not_implemented():
class RawAgent(BaseAgent):
async def setup(self):
pass
agent = RawAgent("raw")
msg = InternalMessage(to="raw", sender="x", body="hi")
with pytest.raises(NotImplementedError):
await BaseAgent.handle_message(agent, msg)
@pytest.mark.asyncio
async def test_base_agent_setup_abstract_method_body_executes():
"""
Covers the 'pass' inside BaseAgent.setup().
Since BaseAgent is abstract, we do NOT instantiate it.
We call the coroutine function directly on BaseAgent and pass a dummy self.
"""
class Dummy:
"""Minimal stub to act as 'self'."""
pass
stub = Dummy()
# Call BaseAgent.setup() as an unbound coroutine, passing stub as 'self'
result = await BaseAgent.setup(stub)
# The method contains only 'pass', so it returns None
assert result is None

View File

@@ -86,3 +86,34 @@ def test_setup_logging_zmq_handler(mock_zmq_context):
args = mock_dict_config.call_args[0][0]
assert "interface_or_socket" in args["handlers"]["ui"]
def test_add_logging_level_method_name_exists_in_logging():
# method_name explicitly set to an existing logging method → triggers first hasattr branch
with pytest.raises(AttributeError) as exc:
add_logging_level("NEWDUPLEVEL", 37, method_name="info")
assert "info already defined in logging module" in str(exc.value)
def test_add_logging_level_method_name_exists_in_logger_class():
# 'makeRecord' exists on Logger class but not on the logging module
with pytest.raises(AttributeError) as exc:
add_logging_level("ANOTHERLEVEL", 38, method_name="makeRecord")
assert "makeRecord already defined in logger class" in str(exc.value)
def test_add_logging_level_log_to_root_path_executes_without_error():
# Verify log_to_root is installed and callable — without asserting logging output
level_name = "ROOTTEST"
level_num = 36
add_logging_level(level_name, level_num)
# Simply call the injected root logger method
# The line is executed even if we don't validate output
root_logging_method = getattr(logging, level_name.lower(), None)
assert callable(root_logging_method)
# Execute the method to hit log_to_root in coverage.
# No need to verify log output.
root_logging_method("some message")