From 39e1bb1ead28c7acfa870b53fa3bbf1725400f2a Mon Sep 17 00:00:00 2001 From: Kasper Marinus Date: Wed, 14 Jan 2026 15:28:29 +0100 Subject: [PATCH] fix: sync issues ref: N25B-447 --- .../agents/bdi/agentspeak_generator.py | 20 +++++++++++++++++-- .../agents/bdi/bdi_core_agent.py | 20 ++++++++----------- .../agents/bdi/bdi_program_manager.py | 9 +++++++++ .../user_interrupt/user_interrupt_agent.py | 9 +++++---- src/control_backend/core/agent_system.py | 11 ++++++---- 5 files changed, 47 insertions(+), 22 deletions(-) diff --git a/src/control_backend/agents/bdi/agentspeak_generator.py b/src/control_backend/agents/bdi/agentspeak_generator.py index 11bb2c8..ed6f787 100644 --- a/src/control_backend/agents/bdi/agentspeak_generator.py +++ b/src/control_backend/agents/bdi/agentspeak_generator.py @@ -50,6 +50,8 @@ class AgentSpeakGenerator: else: self._asp.rules.append(AstRule(AstLiteral("phase", [AstString("end")]))) + self._asp.rules.append(AstRule(AstLiteral("!notify_cycle"))) + self._add_keyword_inference() self._add_default_plans() @@ -147,8 +149,18 @@ class AgentSpeakGenerator: AstLiteral("notify_cycle"), [], [ - AstStatement(StatementType.DO_ACTION, AstLiteral("notify_ui")), - AstStatement(StatementType.DO_ACTION, AstLiteral("wait", [AstNumber(1)])), + AstStatement( + StatementType.DO_ACTION, + AstLiteral( + "findall", + [AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")], + ), + ), + AstStatement( + StatementType.DO_ACTION, AstLiteral("notify_norms", [AstVar("Norms")]) + ), + AstStatement(StatementType.DO_ACTION, AstLiteral("wait", [AstNumber(100)])), + AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("notify_cycle")), ], ) ) @@ -365,6 +377,10 @@ class AgentSpeakGenerator: if isinstance(step, Goal): step.can_fail = False # triggers are continuous sequence subgoals.append(step) + + # Arbitrary wait for UI to display nicely + body.append(AstStatement(StatementType.DO_ACTION, AstLiteral("wait", [AstNumber(2000)]))) + body.append( AstStatement( StatementType.DO_ACTION, diff --git a/src/control_backend/agents/bdi/bdi_core_agent.py b/src/control_backend/agents/bdi/bdi_core_agent.py index 8eb4d23..0c217dc 100644 --- a/src/control_backend/agents/bdi/bdi_core_agent.py +++ b/src/control_backend/agents/bdi/bdi_core_agent.py @@ -107,7 +107,6 @@ class BDICoreAgent(BaseAgent): if not maybe_more_work: deadline = self.bdi_agent.shortest_deadline() if deadline: - self.logger.debug("Sleeping until %s", deadline) await asyncio.sleep(deadline - time.time()) maybe_more_work = True else: @@ -335,14 +334,6 @@ class BDICoreAgent(BaseAgent): message_text = agentspeak.grounded(term.args[0], intention.scope) norms = agentspeak.grounded(term.args[1], intention.scope) - norm_update_message = InternalMessage( - to=settings.agent_settings.user_interrupt_name, - thread="active_norms_update", - body=str(norms), - ) - - self.add_behavior(self.send(norm_update_message)) - self.add_behavior(self._send_to_llm(str(message_text), str(norms), "")) yield @@ -355,14 +346,20 @@ class BDICoreAgent(BaseAgent): message_text = agentspeak.grounded(term.args[0], intention.scope) norms = agentspeak.grounded(term.args[1], intention.scope) goal = agentspeak.grounded(term.args[2], intention.scope) + self.add_behavior(self._send_to_llm(str(message_text), str(norms), str(goal))) + yield + + @self.actions.add(".notify_norms", 1) + def _notify_norms(agent, term, intention): + norms = agentspeak.grounded(term.args[0], intention.scope) + norm_update_message = InternalMessage( to=settings.agent_settings.user_interrupt_name, thread="active_norms_update", body=str(norms), ) - self.add_behavior(self.send(norm_update_message)) - self.add_behavior(self._send_to_llm(str(message_text), str(norms), str(goal))) + self.add_behavior(self.send(norm_update_message, should_log=False)) yield @self.actions.add(".say", 1) @@ -473,7 +470,6 @@ class BDICoreAgent(BaseAgent): body=str(trigger_name), ) - # TODO: check with Pim self.add_behavior(self.send(msg)) yield diff --git a/src/control_backend/agents/bdi/bdi_program_manager.py b/src/control_backend/agents/bdi/bdi_program_manager.py index 75ea757..730c8e5 100644 --- a/src/control_backend/agents/bdi/bdi_program_manager.py +++ b/src/control_backend/agents/bdi/bdi_program_manager.py @@ -97,6 +97,15 @@ class BDIProgramManager(BaseAgent): if new == "end": self._phase = None + # Notify user interaction agent + msg = InternalMessage( + to=settings.agent_settings.user_interrupt_name, + thread="transition_phase", + body="end", + ) + self.logger.info("Transitioned to end phase, notifying UserInterruptAgent.") + + self.add_behavior(self.send(msg)) return for phase in self._program.phases: diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py index 4f12b34..deddbba 100644 --- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py +++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py @@ -214,8 +214,8 @@ class UserInterruptAgent(BaseAgent): payload = {"type": "cond_norms_state_update", "norms": updates} - await self._send_experiment_update(payload) - self.logger.debug(f"Broadcasted state for {len(updates)} conditional norms.") + await self._send_experiment_update(payload, should_log=False) + # self.logger.debug(f"Broadcasted state for {len(updates)} conditional norms.") def _create_mapping(self, program_json: str): """ @@ -259,7 +259,7 @@ class UserInterruptAgent(BaseAgent): except Exception as e: self.logger.error(f"Mapping failed: {e}") - async def _send_experiment_update(self, data): + async def _send_experiment_update(self, data, should_log: bool = True): """ Sends an update to the 'experiment' topic. The SSE endpoint will pick this up and push it to the UI. @@ -268,7 +268,8 @@ class UserInterruptAgent(BaseAgent): topic = b"experiment" body = json.dumps(data).encode("utf-8") await self.pub_socket.send_multipart([topic, body]) - self.logger.debug(f"Sent experiment update: {data}") + if should_log: + self.logger.debug(f"Sent experiment update: {data}") async def _send_to_speech_agent(self, text_to_say: str): """ diff --git a/src/control_backend/core/agent_system.py b/src/control_backend/core/agent_system.py index 2d8492a..e3c8dc4 100644 --- a/src/control_backend/core/agent_system.py +++ b/src/control_backend/core/agent_system.py @@ -120,7 +120,7 @@ class BaseAgent(ABC): task.cancel() self.logger.info(f"Agent {self.name} stopped") - async def send(self, message: InternalMessage): + async def send(self, message: InternalMessage, should_log: bool = True): """ Send a message to another agent. @@ -142,13 +142,17 @@ class BaseAgent(ABC): if target: await target.inbox.put(message) - self.logger.debug(f"Sent message {message.body} to {message.to} via regular inbox.") + if should_log: + self.logger.debug( + f"Sent message {message.body} to {message.to} via regular inbox." + ) else: # Apparently target agent is on a different process, send via ZMQ topic = f"internal/{receiver}".encode() body = message.model_dump_json().encode() await self._internal_pub_socket.send_multipart([topic, body]) - self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.") + if should_log: + self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.") async def _process_inbox(self): """ @@ -158,7 +162,6 @@ class BaseAgent(ABC): """ while self._running: msg = await self.inbox.get() - self.logger.debug(f"Received message from {msg.sender}.") await self.handle_message(msg) async def _receive_internal_zmq_loop(self):