From 6d03ba8a4153015cda8c8ec4ba1372233c7084af Mon Sep 17 00:00:00 2001
From: Pim Hutting
Date: Fri, 16 Jan 2026 14:28:27 +0100
Subject: [PATCH] feat: added extra endpoint for norm pings
also made sure that you cannot skip phase on end phase
ref: N25B-400
---
.../agents/bdi/agentspeak_generator.py | 10 ++++++
.../user_interrupt/user_interrupt_agent.py | 29 ++++++++++-------
.../api/v1/endpoints/user_interact.py | 31 +++++++++++++++++--
3 files changed, 56 insertions(+), 14 deletions(-)
diff --git a/src/control_backend/agents/bdi/agentspeak_generator.py b/src/control_backend/agents/bdi/agentspeak_generator.py
index ed6f787..524f980 100644
--- a/src/control_backend/agents/bdi/agentspeak_generator.py
+++ b/src/control_backend/agents/bdi/agentspeak_generator.py
@@ -424,6 +424,16 @@ class AgentSpeakGenerator:
)
)
+ # Force phase transition fallback
+ self._asp.plans.append(
+ AstPlan(
+ TriggerType.ADDED_GOAL,
+ AstLiteral("force_transition_phase"),
+ [],
+ [AstStatement(StatementType.EMPTY, AstLiteral("true"))],
+ )
+ )
+
@singledispatchmethod
def _astify(self, element: ProgramElement) -> AstExpression:
raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.")
diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py
index 0bde563..9ba8409 100644
--- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py
+++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py
@@ -117,13 +117,13 @@ class UserInterruptAgent(BaseAgent):
event_context,
)
elif asl_cond_norm := self._cond_norm_map.get(ui_id):
- await self._send_to_bdi_belief(asl_cond_norm)
+ await self._send_to_bdi_belief(asl_cond_norm, "cond_norm")
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
)
elif asl_goal := self._goal_map.get(ui_id):
- await self._send_to_bdi_belief(asl_goal)
+ await self._send_to_bdi_belief(asl_goal, "goal")
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
@@ -141,7 +141,7 @@ class UserInterruptAgent(BaseAgent):
case "override_unachieve":
ui_id = str(event_context)
if asl_cond_norm := self._cond_norm_map.get(ui_id):
- await self._send_to_bdi_belief(asl_cond_norm, True)
+ await self._send_to_bdi_belief(asl_cond_norm, "cond_norm", True)
self.logger.info(
"Forwarded button press (override_unachieve)"
"with context '%s' to BDI Core.",
@@ -187,11 +187,9 @@ class UserInterruptAgent(BaseAgent):
payload = {"type": "trigger_update", "id": ui_id, "achieved": True}
await self._send_experiment_update(payload)
self.logger.info(f"UI Update: Trigger {asl_slug} started (ID: {ui_id})")
-
case "trigger_end":
asl_slug = msg.body
ui_id = self._trigger_reverse_map.get(asl_slug)
-
if ui_id:
payload = {"type": "trigger_update", "id": ui_id, "achieved": False}
await self._send_experiment_update(payload)
@@ -207,7 +205,7 @@ class UserInterruptAgent(BaseAgent):
goal_name = msg.body
ui_id = self._goal_reverse_map.get(goal_name)
if ui_id:
- payload = {"type": "goal_update", "id": ui_id, "active": True}
+ payload = {"type": "goal_update", "id": ui_id}
await self._send_experiment_update(payload)
self.logger.info(f"UI Update: Goal {goal_name} started (ID: {ui_id})")
case "active_norms_update":
@@ -224,15 +222,17 @@ class UserInterruptAgent(BaseAgent):
:param active_slugs: A list of slugs (strings) currently active in the BDI core.
"""
updates = []
-
for asl_slug, ui_id in self._cond_norm_reverse_map.items():
is_active = asl_slug in active_slugs
- updates.append({"id": ui_id, "name": asl_slug, "active": is_active})
+ updates.append({"id": ui_id, "active": is_active})
payload = {"type": "cond_norms_state_update", "norms": updates}
- await self._send_experiment_update(payload, should_log=False)
- # self.logger.debug(f"Broadcasted state for {len(updates)} conditional norms.")
+ if self.pub_socket:
+ topic = b"status"
+ body = json.dumps(payload).encode("utf-8")
+ await self.pub_socket.send_multipart([topic, body])
+ # self.logger.info(f"UI Update: Active norms {updates}")
def _create_mapping(self, program_json: str):
"""
@@ -325,9 +325,14 @@ class UserInterruptAgent(BaseAgent):
await self.send(msg)
self.logger.info(f"Directly forced {thread} in BDI: {body}")
- async def _send_to_bdi_belief(self, asl_goal: str, unachieve: bool = False):
+ async def _send_to_bdi_belief(self, asl: str, asl_type: str, unachieve: bool = False):
"""Send belief to BDI Core"""
- belief_name = f"achieved_{asl_goal}"
+ if asl_type == "goal":
+ belief_name = f"achieved_{asl}"
+ elif asl_type == "cond_norm":
+ belief_name = f"force_{asl}"
+ else:
+ self.logger.warning("Tried to send belief with unknown type")
belief = Belief(name=belief_name, arguments=None)
self.logger.debug(f"Sending belief to BDI Core: {belief_name}")
# Conditional norms are unachieved by removing the belief
diff --git a/src/control_backend/api/v1/endpoints/user_interact.py b/src/control_backend/api/v1/endpoints/user_interact.py
index 3d3406e..eb70f35 100644
--- a/src/control_backend/api/v1/endpoints/user_interact.py
+++ b/src/control_backend/api/v1/endpoints/user_interact.py
@@ -52,11 +52,11 @@ async def experiment_stream(request: Request):
while True:
# Check if client closed the tab
if await request.is_disconnected():
- logger.info("Client disconnected from experiment stream.")
+ logger.error("Client disconnected from experiment stream.")
break
try:
- parts = await asyncio.wait_for(socket.recv_multipart(), timeout=1.0)
+ parts = await asyncio.wait_for(socket.recv_multipart(), timeout=10.0)
_, message = parts
yield f"data: {message.decode().strip()}\n\n"
except TimeoutError:
@@ -65,3 +65,30 @@ async def experiment_stream(request: Request):
socket.close()
return StreamingResponse(gen(), media_type="text/event-stream")
+
+
+@router.get("/status_stream")
+async def status_stream(request: Request):
+ context = Context.instance()
+ socket = context.socket(zmq.SUB)
+ socket.connect(settings.zmq_settings.internal_sub_address)
+
+ socket.subscribe(b"status")
+
+ async def gen():
+ try:
+ while True:
+ if await request.is_disconnected():
+ break
+ try:
+ # Shorter timeout since this is frequent
+ parts = await asyncio.wait_for(socket.recv_multipart(), timeout=0.5)
+ _, message = parts
+ yield f"data: {message.decode().strip()}\n\n"
+ except TimeoutError:
+ yield ": ping\n\n" # Keep the connection alive
+ continue
+ finally:
+ socket.close()
+
+ return StreamingResponse(gen(), media_type="text/event-stream")