diff --git a/src/control_backend/agents/bdi/agentspeak_generator.py b/src/control_backend/agents/bdi/agentspeak_generator.py index ed6f787..524f980 100644 --- a/src/control_backend/agents/bdi/agentspeak_generator.py +++ b/src/control_backend/agents/bdi/agentspeak_generator.py @@ -424,6 +424,16 @@ class AgentSpeakGenerator: ) ) + # Force phase transition fallback + self._asp.plans.append( + AstPlan( + TriggerType.ADDED_GOAL, + AstLiteral("force_transition_phase"), + [], + [AstStatement(StatementType.EMPTY, AstLiteral("true"))], + ) + ) + @singledispatchmethod def _astify(self, element: ProgramElement) -> AstExpression: raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.") diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py index 0bde563..9ba8409 100644 --- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py +++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py @@ -117,13 +117,13 @@ class UserInterruptAgent(BaseAgent): event_context, ) elif asl_cond_norm := self._cond_norm_map.get(ui_id): - await self._send_to_bdi_belief(asl_cond_norm) + await self._send_to_bdi_belief(asl_cond_norm, "cond_norm") self.logger.info( "Forwarded button press (override) with context '%s' to BDI Core.", event_context, ) elif asl_goal := self._goal_map.get(ui_id): - await self._send_to_bdi_belief(asl_goal) + await self._send_to_bdi_belief(asl_goal, "goal") self.logger.info( "Forwarded button press (override) with context '%s' to BDI Core.", event_context, @@ -141,7 +141,7 @@ class UserInterruptAgent(BaseAgent): case "override_unachieve": ui_id = str(event_context) if asl_cond_norm := self._cond_norm_map.get(ui_id): - await self._send_to_bdi_belief(asl_cond_norm, True) + await self._send_to_bdi_belief(asl_cond_norm, "cond_norm", True) self.logger.info( "Forwarded button press (override_unachieve)" "with context '%s' to BDI Core.", @@ -187,11 +187,9 @@ class UserInterruptAgent(BaseAgent): payload = {"type": "trigger_update", "id": ui_id, "achieved": True} await self._send_experiment_update(payload) self.logger.info(f"UI Update: Trigger {asl_slug} started (ID: {ui_id})") - case "trigger_end": asl_slug = msg.body ui_id = self._trigger_reverse_map.get(asl_slug) - if ui_id: payload = {"type": "trigger_update", "id": ui_id, "achieved": False} await self._send_experiment_update(payload) @@ -207,7 +205,7 @@ class UserInterruptAgent(BaseAgent): goal_name = msg.body ui_id = self._goal_reverse_map.get(goal_name) if ui_id: - payload = {"type": "goal_update", "id": ui_id, "active": True} + payload = {"type": "goal_update", "id": ui_id} await self._send_experiment_update(payload) self.logger.info(f"UI Update: Goal {goal_name} started (ID: {ui_id})") case "active_norms_update": @@ -224,15 +222,17 @@ class UserInterruptAgent(BaseAgent): :param active_slugs: A list of slugs (strings) currently active in the BDI core. """ updates = [] - for asl_slug, ui_id in self._cond_norm_reverse_map.items(): is_active = asl_slug in active_slugs - updates.append({"id": ui_id, "name": asl_slug, "active": is_active}) + updates.append({"id": ui_id, "active": is_active}) payload = {"type": "cond_norms_state_update", "norms": updates} - await self._send_experiment_update(payload, should_log=False) - # self.logger.debug(f"Broadcasted state for {len(updates)} conditional norms.") + if self.pub_socket: + topic = b"status" + body = json.dumps(payload).encode("utf-8") + await self.pub_socket.send_multipart([topic, body]) + # self.logger.info(f"UI Update: Active norms {updates}") def _create_mapping(self, program_json: str): """ @@ -325,9 +325,14 @@ class UserInterruptAgent(BaseAgent): await self.send(msg) self.logger.info(f"Directly forced {thread} in BDI: {body}") - async def _send_to_bdi_belief(self, asl_goal: str, unachieve: bool = False): + async def _send_to_bdi_belief(self, asl: str, asl_type: str, unachieve: bool = False): """Send belief to BDI Core""" - belief_name = f"achieved_{asl_goal}" + if asl_type == "goal": + belief_name = f"achieved_{asl}" + elif asl_type == "cond_norm": + belief_name = f"force_{asl}" + else: + self.logger.warning("Tried to send belief with unknown type") belief = Belief(name=belief_name, arguments=None) self.logger.debug(f"Sending belief to BDI Core: {belief_name}") # Conditional norms are unachieved by removing the belief diff --git a/src/control_backend/api/v1/endpoints/user_interact.py b/src/control_backend/api/v1/endpoints/user_interact.py index 3d3406e..eb70f35 100644 --- a/src/control_backend/api/v1/endpoints/user_interact.py +++ b/src/control_backend/api/v1/endpoints/user_interact.py @@ -52,11 +52,11 @@ async def experiment_stream(request: Request): while True: # Check if client closed the tab if await request.is_disconnected(): - logger.info("Client disconnected from experiment stream.") + logger.error("Client disconnected from experiment stream.") break try: - parts = await asyncio.wait_for(socket.recv_multipart(), timeout=1.0) + parts = await asyncio.wait_for(socket.recv_multipart(), timeout=10.0) _, message = parts yield f"data: {message.decode().strip()}\n\n" except TimeoutError: @@ -65,3 +65,30 @@ async def experiment_stream(request: Request): socket.close() return StreamingResponse(gen(), media_type="text/event-stream") + + +@router.get("/status_stream") +async def status_stream(request: Request): + context = Context.instance() + socket = context.socket(zmq.SUB) + socket.connect(settings.zmq_settings.internal_sub_address) + + socket.subscribe(b"status") + + async def gen(): + try: + while True: + if await request.is_disconnected(): + break + try: + # Shorter timeout since this is frequent + parts = await asyncio.wait_for(socket.recv_multipart(), timeout=0.5) + _, message = parts + yield f"data: {message.decode().strip()}\n\n" + except TimeoutError: + yield ": ping\n\n" # Keep the connection alive + continue + finally: + socket.close() + + return StreamingResponse(gen(), media_type="text/event-stream")