From 04d19cee5cc6faaa17a5b9f51f7fbed1eb95ea2a Mon Sep 17 00:00:00 2001
From: Kasper Marinus
Date: Mon, 19 Jan 2026 14:08:26 +0100
Subject: [PATCH 1/7] feat: (maybe) stop response when new user message

If we get a new message before the LLM is done responding, interrupt it.

ref: N25B-452
---
 .../agents/bdi/bdi_core_agent.py              |  2 +-
 .../agents/bdi/text_belief_extractor_agent.py |  3 ++
 src/control_backend/agents/llm/llm_agent.py   | 45 ++++++++++++++-----
 src/control_backend/core/config.py            |  1 +
 4 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/src/control_backend/agents/bdi/bdi_core_agent.py b/src/control_backend/agents/bdi/bdi_core_agent.py
index 628bb53..685a3b6 100644
--- a/src/control_backend/agents/bdi/bdi_core_agent.py
+++ b/src/control_backend/agents/bdi/bdi_core_agent.py
@@ -338,7 +338,7 @@ class BDICoreAgent(BaseAgent):
             yield
 
         @self.actions.add(".reply_with_goal", 3)
-        def _reply_with_goal(agent: "BDICoreAgent", term, intention):
+        def _reply_with_goal(agent, term, intention):
             """
             Let the LLM generate a response to a user's utterance with the
             current norms and a specific goal.
diff --git a/src/control_backend/agents/bdi/text_belief_extractor_agent.py b/src/control_backend/agents/bdi/text_belief_extractor_agent.py
index 362dfbf..9ea6b9a 100644
--- a/src/control_backend/agents/bdi/text_belief_extractor_agent.py
+++ b/src/control_backend/agents/bdi/text_belief_extractor_agent.py
@@ -318,6 +318,9 @@ class TextBeliefExtractorAgent(BaseAgent):
         async with httpx.AsyncClient() as client:
             response = await client.post(
                 settings.llm_settings.local_llm_url,
+                headers={"Authorization": f"Bearer {settings.llm_settings.api_key}"}
+                if settings.llm_settings.api_key
+                else {},
                 json={
                     "model": settings.llm_settings.local_llm_model,
                     "messages": [{"role": "user", "content": prompt}],
diff --git a/src/control_backend/agents/llm/llm_agent.py b/src/control_backend/agents/llm/llm_agent.py
index 1c72dfc..7cac097 100644
--- a/src/control_backend/agents/llm/llm_agent.py
+++ b/src/control_backend/agents/llm/llm_agent.py
@@ -1,3 +1,4 @@
+import asyncio
 import json
 import re
 import uuid
@@ -32,6 +33,9 @@ class LLMAgent(BaseAgent):
     def __init__(self, name: str):
         super().__init__(name)
         self.history = []
+        self._querying = False
+        self._interrupted = False
+        self._go_ahead = asyncio.Event()
 
     async def setup(self):
         self.logger.info("Setting up %s.", self.name)
@@ -50,7 +54,7 @@ class LLMAgent(BaseAgent):
             case "prompt_message":
                 try:
                     prompt_message = LLMPromptMessage.model_validate_json(msg.body)
-                    await self._process_bdi_message(prompt_message)
+                    self.add_behavior(self._process_bdi_message(prompt_message))  # no block
                 except ValidationError:
                     self.logger.debug("Prompt message from BDI core is invalid.")
             case "assistant_message":
@@ -73,12 +77,35 @@ class LLMAgent(BaseAgent):
 
         :param message: The parsed prompt message containing text, norms, and goals.
         """
+        if self._querying:
+            self.logger.debug("Received another BDI prompt while processing previous message.")
+            self._interrupted = True  # interrupt the previous processing
+            await self._go_ahead.wait()  # wait until we get the go-ahead
+
+        self._go_ahead.clear()
+        self._querying = True
         full_message = ""
         async for chunk in self._query_llm(message.text, message.norms, message.goals):
+            if self._interrupted:
+                self.logger.debug("Interrupted processing of previous message.")
+                break
             await self._send_reply(chunk)
             full_message += chunk
-        self.logger.debug("Finished processing BDI message. Response sent in chunks to BDI core.")
-        await self._send_full_reply(full_message)
+        else:
+            self._querying = False
+
+            self.history.append(
+                {
+                    "role": "assistant",
+                    "content": full_message,
+                }
+            )
+            self.logger.debug(
+                "Finished processing BDI message. Response sent in chunks to BDI core."
+            )
+            await self._send_full_reply(full_message)
+
+        self._interrupted = False
 
     async def _send_reply(self, msg: str):
         """
         Sends a response message (chunk) back to the BDI Core Agent.
@@ -141,7 +168,7 @@ class LLMAgent(BaseAgent):
                     full_message += token
                     current_chunk += token
 
-                    self.logger.llm(
+                    self.logger.debug(
                         "Received token: %s",
                         full_message,
                         extra={"reference": message_id},  # Used in the UI to update old logs
@@ -159,13 +186,6 @@ class LLMAgent(BaseAgent):
             # Yield any remaining tail
             if current_chunk:
                 yield current_chunk
-
-            self.history.append(
-                {
-                    "role": "assistant",
-                    "content": full_message,
-                }
-            )
         except httpx.HTTPError as err:
             self.logger.error("HTTP error.", exc_info=err)
             yield "LLM service unavailable."
@@ -185,6 +205,9 @@ class LLMAgent(BaseAgent):
             async with client.stream(
                 "POST",
                 settings.llm_settings.local_llm_url,
+                headers={"Authorization": f"Bearer {settings.llm_settings.api_key}"}
+                if settings.llm_settings.api_key
+                else {},
                 json={
                     "model": settings.llm_settings.local_llm_model,
                     "messages": messages,
diff --git a/src/control_backend/core/config.py b/src/control_backend/core/config.py
index 329a246..82b9ede 100644
--- a/src/control_backend/core/config.py
+++ b/src/control_backend/core/config.py
@@ -117,6 +117,7 @@ class LLMSettings(BaseModel):
 
     local_llm_url: str = "http://localhost:1234/v1/chat/completions"
     local_llm_model: str = "gpt-oss"
+    api_key: str = ""
     chat_temperature: float = 1.0
     code_temperature: float = 0.3
     n_parallel: int = 4

From c0789e82a985efe6867dc7df029d170594d0de97 Mon Sep 17 00:00:00 2001
From: Kasper Marinus
Date: Mon, 19 Jan 2026 14:47:11 +0100
Subject: [PATCH 2/7] feat: add previously interrupted message to current

ref: N25B-452
---
 src/control_backend/agents/llm/llm_agent.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/control_backend/agents/llm/llm_agent.py b/src/control_backend/agents/llm/llm_agent.py
index 7cac097..ca0cd78 100644
--- a/src/control_backend/agents/llm/llm_agent.py
+++ b/src/control_backend/agents/llm/llm_agent.py
@@ -35,6 +35,7 @@ class LLMAgent(BaseAgent):
         self.history = []
         self._querying = False
         self._interrupted = False
+        self._interrupted_message = ""
         self._go_ahead = asyncio.Event()
 
     async def setup(self):
@@ -82,11 +83,14 @@ class LLMAgent(BaseAgent):
             self._interrupted = True  # interrupt the previous processing
             await self._go_ahead.wait()  # wait until we get the go-ahead
 
+        message.text = f"{self._interrupted_message} {message.text}"
+
         self._go_ahead.clear()
         self._querying = True
         full_message = ""
         async for chunk in self._query_llm(message.text, message.norms, message.goals):
             if self._interrupted:
+                self._interrupted_message = message
                 self.logger.debug("Interrupted processing of previous message.")
                 break
             await self._send_reply(chunk)
@@ -105,6 +109,7 @@ class LLMAgent(BaseAgent):
             )
             await self._send_full_reply(full_message)
 
+        self._go_ahead.set()
         self._interrupted = False
 
     async def _send_reply(self, msg: str):

From 1cd5b46f9743c823c40b7f2d8e484b03d4e4f33e Mon Sep 17 00:00:00 2001
From: Kasper Marinus
Date: Mon, 19 Jan 2026 15:03:59 +0100
Subject: [PATCH 3/7] fix: should work now

Also added trimming to Windows transcription.

ref: N25B-452
---
 src/control_backend/agents/llm/llm_agent.py              | 4 ++--
 .../perception/transcription_agent/speech_recognizer.py  | 4 +++-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/control_backend/agents/llm/llm_agent.py b/src/control_backend/agents/llm/llm_agent.py
index ca0cd78..db7e363 100644
--- a/src/control_backend/agents/llm/llm_agent.py
+++ b/src/control_backend/agents/llm/llm_agent.py
@@ -90,7 +90,7 @@ class LLMAgent(BaseAgent):
         full_message = ""
         async for chunk in self._query_llm(message.text, message.norms, message.goals):
             if self._interrupted:
-                self._interrupted_message = message
+                self._interrupted_message = message.text
                 self.logger.debug("Interrupted processing of previous message.")
                 break
             await self._send_reply(chunk)
@@ -173,7 +173,7 @@ class LLMAgent(BaseAgent):
                     full_message += token
                     current_chunk += token
 
-                    self.logger.debug(
+                    self.logger.llm(
                         "Received token: %s",
                         full_message,
                         extra={"reference": message_id},  # Used in the UI to update old logs
diff --git a/src/control_backend/agents/perception/transcription_agent/speech_recognizer.py b/src/control_backend/agents/perception/transcription_agent/speech_recognizer.py
index 9fae676..1fe7e3f 100644
--- a/src/control_backend/agents/perception/transcription_agent/speech_recognizer.py
+++ b/src/control_backend/agents/perception/transcription_agent/speech_recognizer.py
@@ -145,4 +145,6 @@ class OpenAIWhisperSpeechRecognizer(SpeechRecognizer):
 
     def recognize_speech(self, audio: np.ndarray) -> str:
         self.load_model()
-        return whisper.transcribe(self.model, audio, **self._get_decode_options(audio))["text"]
+        return whisper.transcribe(self.model, audio, **self._get_decode_options(audio))[
+            "text"
+        ].strip()

From 230afef16fe5630086aa9cfc609064dec38f7464 Mon Sep 17 00:00:00 2001
From: Kasper Marinus
Date: Mon, 19 Jan 2026 16:06:17 +0100
Subject: [PATCH 4/7] test: fix tests

ref: N25B-452
---
 .../agents/bdi/bdi_core_agent.py            |  4 --
 src/control_backend/agents/llm/llm_agent.py | 12 +++-
 test/unit/agents/llm/test_llm_agent.py      | 66 +++++++++++++------
 3 files changed, 55 insertions(+), 27 deletions(-)

diff --git a/src/control_backend/agents/bdi/bdi_core_agent.py b/src/control_backend/agents/bdi/bdi_core_agent.py
index 685a3b6..54b5149 100644
--- a/src/control_backend/agents/bdi/bdi_core_agent.py
+++ b/src/control_backend/agents/bdi/bdi_core_agent.py
@@ -512,10 +512,6 @@ class BDICoreAgent(BaseAgent):
 
             yield
 
-        @self.actions.add(".notify_ui", 0)
-        def _notify_ui(agent, term, intention):
-            pass
-
     async def _send_to_llm(self, text: str, norms: str, goals: str):
         """
         Sends a text query to the LLM agent asynchronously.
diff --git a/src/control_backend/agents/llm/llm_agent.py b/src/control_backend/agents/llm/llm_agent.py
index db7e363..8d81249 100644
--- a/src/control_backend/agents/llm/llm_agent.py
+++ b/src/control_backend/agents/llm/llm_agent.py
@@ -59,9 +59,9 @@ class LLMAgent(BaseAgent):
                 except ValidationError:
                     self.logger.debug("Prompt message from BDI core is invalid.")
             case "assistant_message":
-                self.history.append({"role": "assistant", "content": msg.body})
+                self._apply_conversation_message({"role": "assistant", "content": msg.body})
             case "user_message":
-                self.history.append({"role": "user", "content": msg.body})
+                self._apply_conversation_message({"role": "user", "content": msg.body})
         elif msg.sender == settings.agent_settings.bdi_program_manager_name:
             if msg.body == "clear_history":
                 self.logger.debug("Clearing conversation history.")
@@ -98,7 +98,7 @@ class LLMAgent(BaseAgent):
         else:
             self._querying = False
 
-            self.history.append(
+            self._apply_conversation_message(
                 {
                     "role": "assistant",
                     "content": full_message,
@@ -112,6 +112,12 @@ class LLMAgent(BaseAgent):
         self._go_ahead.set()
         self._interrupted = False
 
+    def _apply_conversation_message(self, message: dict[str, str]):
+        if len(self.history) > 0 and message["role"] == self.history[-1]["role"]:
+            self.history[-1]["content"] += " " + message["content"]
+            return
+        self.history.append(message)
+
     async def _send_reply(self, msg: str):
         """
         Sends a response message (chunk) back to the BDI Core Agent.
diff --git a/test/unit/agents/llm/test_llm_agent.py b/test/unit/agents/llm/test_llm_agent.py
index a1cc297..bd407cc 100644
--- a/test/unit/agents/llm/test_llm_agent.py
+++ b/test/unit/agents/llm/test_llm_agent.py
@@ -61,8 +61,52 @@ async def test_llm_processing_success(mock_httpx_client, mock_settings):
         thread="prompt_message",  # REQUIRED: thread must match handle_message logic
     )
 
+    agent._process_bdi_message = AsyncMock()
+
     await agent.handle_message(msg)
 
+    agent._process_bdi_message.assert_called()
+
+
+@pytest.mark.asyncio
+async def test_process_bdi_message_success(mock_httpx_client, mock_settings):
+    # Setup the mock response for the stream
+    mock_response = MagicMock()
+    mock_response.raise_for_status = MagicMock()
+
+    # Simulate stream lines
+    lines = [
+        b'data: {"choices": [{"delta": {"content": "Hello"}}]}',
+        b'data: {"choices": [{"delta": {"content": " world"}}]}',
+        b'data: {"choices": [{"delta": {"content": "."}}]}',
+        b"data: [DONE]",
+    ]
+
+    async def aiter_lines_gen():
+        for line in lines:
+            yield line.decode()
+
+    mock_response.aiter_lines.side_effect = aiter_lines_gen
+
+    mock_stream_context = MagicMock()
+    mock_stream_context.__aenter__ = AsyncMock(return_value=mock_response)
+    mock_stream_context.__aexit__ = AsyncMock(return_value=None)
+
+    # Configure the client
+    mock_httpx_client.stream = MagicMock(return_value=mock_stream_context)
+
+    # Setup Agent
+    agent = LLMAgent("llm_agent")
+    agent.send = AsyncMock()  # Mock the send method to verify replies
+
+    mock_logger = MagicMock()
+    agent.logger = mock_logger
+
+    # Simulate receiving a message from BDI
+    prompt = LLMPromptMessage(text="Hi", norms=[], goals=[])
+
+    await agent._process_bdi_message(prompt)
+
     # Verification
     # "Hello world." constitutes one sentence/chunk based on punctuation split
     # The agent should call send once with the full sentence, PLUS once more for full reply
@@ -79,28 +123,16 @@ async def test_llm_processing_errors(mock_httpx_client, mock_settings):
     agent = LLMAgent("llm_agent")
     agent.send = AsyncMock()
     prompt = LLMPromptMessage(text="Hi", norms=[], goals=[])
-    msg = InternalMessage(
-        to="llm",
-        sender=mock_settings.agent_settings.bdi_core_name,
-        body=prompt.model_dump_json(),
-        thread="prompt_message",
-    )
 
     # HTTP Error: stream method RAISES exception immediately
     mock_httpx_client.stream = MagicMock(side_effect=httpx.HTTPError("Fail"))
-    await agent.handle_message(msg)
+    await agent._process_bdi_message(prompt)
 
     # Check that error message was sent
     assert agent.send.called
     assert "LLM service unavailable." in agent.send.call_args_list[0][0][0].body
 
-    # General Exception
-    agent.send.reset_mock()
-    mock_httpx_client.stream = MagicMock(side_effect=Exception("Boom"))
-    await agent.handle_message(msg)
-    assert "Error processing the request." in agent.send.call_args_list[0][0][0].body
-
 
 @pytest.mark.asyncio
 async def test_llm_json_error(mock_httpx_client, mock_settings):
@@ -125,13 +157,7 @@ async def test_llm_json_error(mock_httpx_client, mock_settings):
     agent.logger = MagicMock()
 
     prompt = LLMPromptMessage(text="Hi", norms=[], goals=[])
-    msg = InternalMessage(
-        to="llm",
-        sender=mock_settings.agent_settings.bdi_core_name,
-        body=prompt.model_dump_json(),
-        thread="prompt_message",
-    )
-    await agent.handle_message(msg)
+    await agent._process_bdi_message(prompt)
 
     agent.logger.error.assert_called()  # Should log JSONDecodeError

From 2404c847aec1f104268cc0abc0adaad8e4728583 Mon Sep 17 00:00:00 2001
From: Pim Hutting
Date: Tue, 27 Jan 2026 11:25:25 +0100
Subject: [PATCH 5/7] feat: added recursive goal mapping and tests

ref: N25B-400
---
 .../user_interrupt/user_interrupt_agent.py | 17 +++++--
 .../user_interrupt/test_user_interrupt.py  | 50 +++++++++++++++++++
 2 files changed, 64 insertions(+), 3 deletions(-)

diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py
index 117f83c..2046564 100644
--- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py
+++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py
@@ -8,7 +8,7 @@ from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
 from control_backend.core.agent_system import InternalMessage
 from control_backend.core.config import settings
 from control_backend.schemas.belief_message import Belief, BeliefMessage
-from control_backend.schemas.program import ConditionalNorm, Program
+from control_backend.schemas.program import ConditionalNorm, Goal, Program
 from control_backend.schemas.ri_message import (
     GestureCommand,
     PauseCommand,
@@ -246,6 +246,18 @@ class UserInterruptAgent(BaseAgent):
         self._cond_norm_map = {}
         self._cond_norm_reverse_map = {}
 
+        def _register_goal(goal: Goal):
+            """Recursively register goals and their subgoals."""
+            slug = AgentSpeakGenerator.slugify(goal)
+            self._goal_map[str(goal.id)] = slug
+            self._goal_reverse_map[slug] = str(goal.id)
+
+            # Recursively check steps for subgoals
+            if goal.plan and goal.plan.steps:
+                for step in goal.plan.steps:
+                    if isinstance(step, Goal):
+                        _register_goal(step)
+
         for phase in program.phases:
             for trigger in phase.triggers:
                 slug = AgentSpeakGenerator.slugify(trigger)
@@ -253,8 +265,7 @@ class UserInterruptAgent(BaseAgent):
                 self._trigger_reverse_map[slug] = str(trigger.id)
 
             for goal in phase.goals:
-                self._goal_map[str(goal.id)] = AgentSpeakGenerator.slugify(goal)
-                self._goal_reverse_map[AgentSpeakGenerator.slugify(goal)] = str(goal.id)
+                _register_goal(goal)
 
         for goal, id in self._goal_reverse_map.items():
             self.logger.debug(f"Goal mapping: UI ID {goal} -> {id}")
diff --git a/test/unit/agents/user_interrupt/test_user_interrupt.py b/test/unit/agents/user_interrupt/test_user_interrupt.py
index a69a830..9f325f3 100644
--- a/test/unit/agents/user_interrupt/test_user_interrupt.py
+++ b/test/unit/agents/user_interrupt/test_user_interrupt.py
@@ -527,3 +527,53 @@ async def test_send_experiment_control_unknown(agent):
     agent.send.assert_awaited()
     msg = agent.send.call_args[0][0]
     assert msg.thread == ""
+
+
+@pytest.mark.asyncio
+async def test_create_mapping_recursive_goals(agent):
+    """Verify that nested subgoals are correctly registered in the mapping."""
+    import uuid
+
+    # 1. Setup IDs
+    parent_goal_id = uuid.uuid4()
+    child_goal_id = uuid.uuid4()
+
+    # 2. Create the child goal
+    child_goal = Goal(
+        id=child_goal_id,
+        name="child_goal",
+        description="I am a subgoal",
+        plan=Plan(id=uuid.uuid4(), name="p_child", steps=[]),
+    )
+
+    # 3. Create the parent goal and put the child goal inside its plan steps
+    parent_goal = Goal(
+        id=parent_goal_id,
+        name="parent_goal",
+        description="I am a parent",
+        plan=Plan(id=uuid.uuid4(), name="p_parent", steps=[child_goal]),  # Nested here
+    )
+
+    # 4. Build the program
+    phase = Phase(
+        id=uuid.uuid4(),
+        name="phase1",
+        norms=[],
+        goals=[parent_goal],  # Only the parent is top-level
+        triggers=[],
+    )
+    prog = Program(phases=[phase])
+
+    # 5. Execute mapping
+    msg = InternalMessage(to="me", thread="new_program", body=prog.model_dump_json())
+    await agent.handle_message(msg)
+
+    # 6. Assertions
+    # Check parent
+    assert str(parent_goal_id) in agent._goal_map
+    assert agent._goal_map[str(parent_goal_id)] == "parent_goal"
+
+    # Check child (This confirms the recursion worked)
+    assert str(child_goal_id) in agent._goal_map
+    assert agent._goal_map[str(child_goal_id)] == "child_goal"
+    assert agent._goal_reverse_map["child_goal"] == str(child_goal_id)

From 1e7c2ba229d008ee390d5aa6aa7688635a6658a2 Mon Sep 17 00:00:00 2001
From: Pim Hutting
Date: Mon, 19 Jan 2026 16:01:59 +0100
Subject: [PATCH 6/7] chore: added missing tests

---
 .../user_interrupt/test_user_interrupt.py | 189 ++++++++++++++++++
 1 file changed, 189 insertions(+)

diff --git a/test/unit/agents/user_interrupt/test_user_interrupt.py b/test/unit/agents/user_interrupt/test_user_interrupt.py
index 9f325f3..3786f8d 100644
--- a/test/unit/agents/user_interrupt/test_user_interrupt.py
+++ b/test/unit/agents/user_interrupt/test_user_interrupt.py
@@ -577,3 +577,192 @@ async def test_create_mapping_recursive_goals(agent):
     assert str(child_goal_id) in agent._goal_map
     assert agent._goal_map[str(child_goal_id)] == "child_goal"
     assert agent._goal_reverse_map["child_goal"] == str(child_goal_id)
+
+
+@pytest.mark.asyncio
+async def test_setup(agent):
+    """Test the setup method initializes sockets correctly."""
+    with patch("control_backend.agents.user_interrupt.user_interrupt_agent.Context") as MockContext:
+        mock_ctx_instance = MagicMock()
+        MockContext.instance.return_value = mock_ctx_instance
+
+        mock_sub = MagicMock()
+        mock_pub = MagicMock()
+        mock_ctx_instance.socket.side_effect = [mock_sub, mock_pub]
+
+        # MOCK add_behavior so we don't rely on internal attributes
+        agent.add_behavior = MagicMock()
+
+        await agent.setup()
+
+        # Check sockets
+        mock_sub.connect.assert_called_with(settings.zmq_settings.internal_sub_address)
+        mock_pub.connect.assert_called_with(settings.zmq_settings.internal_pub_address)
+
+        # Verify add_behavior was called
+        agent.add_behavior.assert_called_once()
+
+
+@pytest.mark.asyncio
+async def test_receive_loop_advanced_scenarios(agent):
+    """
+    Covers:
+    - JSONDecodeError (lines 86-88)
+    - Override: Trigger found (lines 108-109)
+    - Override: Norm found (lines 114-115)
+    - Override: Nothing found (line 134)
+    - Override Unachieve: Success & Fail (lines 136-145)
+    - Pause: Context true/false logs (lines 150-157)
+    - Next Phase (line 160)
+    """
+    # 1. Setup Data Maps
+    agent._trigger_map["101"] = "trigger_slug"
+    agent._cond_norm_map["202"] = "norm_slug"
+
+    # 2. Define Payloads
+    # A. Invalid JSON
+    bad_json = b"INVALID{JSON"
+
+    # B. Override -> Trigger
+    override_trigger = json.dumps({"type": "override", "context": "101"}).encode()
+
+    # C. Override -> Norm
+    override_norm = json.dumps({"type": "override", "context": "202"}).encode()
+
+    # D. Override -> Unknown
+    override_fail = json.dumps({"type": "override", "context": "999"}).encode()
+
+    # E. Unachieve -> Success
+    unachieve_success = json.dumps({"type": "override_unachieve", "context": "202"}).encode()
+
+    # F. Unachieve -> Fail
+    unachieve_fail = json.dumps({"type": "override_unachieve", "context": "999"}).encode()
+
+    # G. Pause (True)
+    pause_true = json.dumps({"type": "pause", "context": "true"}).encode()
+
+    # H. Pause (False/Resume)
+    pause_false = json.dumps({"type": "pause", "context": ""}).encode()
+
+    # I. Next Phase
+    next_phase = json.dumps({"type": "next_phase", "context": ""}).encode()
+
+    # 3. Setup Socket
+    agent.sub_socket.recv_multipart.side_effect = [
+        (b"topic", bad_json),
+        (b"topic", override_trigger),
+        (b"topic", override_norm),
+        (b"topic", override_fail),
+        (b"topic", unachieve_success),
+        (b"topic", unachieve_fail),
+        (b"topic", pause_true),
+        (b"topic", pause_false),
+        (b"topic", next_phase),
+        asyncio.CancelledError,  # End loop
+    ]
+
+    # Mock internal helpers to verify calls
+    agent._send_to_bdi = AsyncMock()
+    agent._send_to_bdi_belief = AsyncMock()
+    agent._send_pause_command = AsyncMock()
+    agent._send_experiment_control_to_bdi_core = AsyncMock()
+
+    # 4. Run Loop
+    try:
+        await agent._receive_button_event()
+    except asyncio.CancelledError:
+        pass
+
+    # 5. Assertions
+
+    # JSON Error
+    agent.logger.error.assert_called_with("Received invalid JSON payload on topic %s", b"topic")
+
+    # Override Trigger
+    agent._send_to_bdi.assert_awaited_with("force_trigger", "trigger_slug")
+
+    # Override Norm
+    # We expect _send_to_bdi_belief to be called for the norm
+    # Note: The loop calls _send_to_bdi_belief(asl_cond_norm, "cond_norm")
+    agent._send_to_bdi_belief.assert_any_call("norm_slug", "cond_norm")
+
+    # Override Fail (Warning log)
+    agent.logger.warning.assert_any_call("Could not determine which element to override.")
+
+    # Unachieve Success
+    # Loop calls _send_to_bdi_belief(asl_cond_norm, "cond_norm", True)
+    agent._send_to_bdi_belief.assert_any_call("norm_slug", "cond_norm", True)
+
+    # Unachieve Fail
+    agent.logger.warning.assert_any_call("Could not determine which conditional norm to unachieve.")
+
+    # Pause Logic
+    agent._send_pause_command.assert_any_call("true")
+    agent.logger.info.assert_any_call("Sent pause command.")
+
+    # Resume Logic
+    agent._send_pause_command.assert_any_call("")
+    agent.logger.info.assert_any_call("Sent resume command.")
+
+    # Next Phase
+    agent._send_experiment_control_to_bdi_core.assert_awaited_with("next_phase")
+
+
+@pytest.mark.asyncio
+async def test_handle_message_unknown_thread(agent):
+    """Test handling of an unknown message thread (lines 213-214)."""
+    msg = InternalMessage(to="me", thread="unknown_thread", body="test")
+    await agent.handle_message(msg)
+
+    agent.logger.debug.assert_called_with(
+        "Received internal message on unhandled thread: unknown_thread"
+    )
+
+
+@pytest.mark.asyncio
+async def test_send_to_bdi_belief_edge_cases(agent):
+    """
+    Covers:
+    - Unknown asl_type warning (lines 326-328)
+    - unachieve=True logic (lines 334-337)
+    """
+    # 1. Unknown Type
+    await agent._send_to_bdi_belief("slug", "unknown_type")
+
+    agent.logger.warning.assert_called_with("Tried to send belief with unknown type")
+    agent.send.assert_not_called()
+
+    # Reset mock for part 2
+    agent.send.reset_mock()
+
+    # 2. Unachieve = True
+    await agent._send_to_bdi_belief("slug", "cond_norm", unachieve=True)
+
+    agent.send.assert_awaited()
+    sent_msg = agent.send.call_args.args[0]
+
+    # Verify it is a delete operation
+    body_obj = BeliefMessage.model_validate_json(sent_msg.body)
+
+    # Verify 'delete' has content
+    assert body_obj.delete is not None
+    assert len(body_obj.delete) == 1
+    assert body_obj.delete[0].name == "force_slug"
+
+    # Verify 'create' is empty (handling both None and [])
+    assert not body_obj.create
+
+
+@pytest.mark.asyncio
+async def test_send_experiment_control_unknown(agent):
+    """Test sending an unknown experiment control type (lines 366-367)."""
+    await agent._send_experiment_control_to_bdi_core("invalid_command")
+
+    agent.logger.warning.assert_called_with(
+        "Received unknown experiment control type '%s' to send to BDI Core.", "invalid_command"
+    )
+
+    # Ensure it still sends an empty message (as per code logic, though thread is empty)
+    agent.send.assert_awaited()
+    msg = agent.send.call_args[0][0]
+    assert msg.thread == ""

From bc9045c977d5b4a69e1a4fe1de5af9f7a3555c31 Mon Sep 17 00:00:00 2001
From: Pim Hutting
Date: Tue, 27 Jan 2026 17:03:36 +0100
Subject: [PATCH 7/7] chore: applied feedback

---
 .../agents/user_interrupt/user_interrupt_agent.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py
index 2046564..25f24af 100644
--- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py
+++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py
@@ -252,11 +252,9 @@ class UserInterruptAgent(BaseAgent):
             self._goal_map[str(goal.id)] = slug
             self._goal_reverse_map[slug] = str(goal.id)
 
-            # Recursively check steps for subgoals
-            if goal.plan and goal.plan.steps:
-                for step in goal.plan.steps:
-                    if isinstance(step, Goal):
-                        _register_goal(step)
+            for step in goal.plan.steps:
+                if isinstance(step, Goal):
+                    _register_goal(step)
 
         for phase in program.phases:
             for trigger in phase.triggers: