From be6bbbb849a8dd4dd6ca02af7c298b38183d9b21 Mon Sep 17 00:00:00 2001 From: Pim Hutting Date: Wed, 7 Jan 2026 17:42:54 +0100 Subject: [PATCH] feat: added endpoint userinterrupt to userinterrupt ref: N25B-400 --- src/control_backend/agents/bdi/agentspeak.asl | 45 +++++++++++++ .../user_interrupt/user_interrupt_agent.py | 63 ++++++++++++----- .../api/v1/endpoints/button_pressed.py | 31 --------- .../api/v1/endpoints/user_interact.py | 67 +++++++++++++++++++ src/control_backend/api/v1/router.py | 4 +- 5 files changed, 162 insertions(+), 48 deletions(-) create mode 100644 src/control_backend/agents/bdi/agentspeak.asl delete mode 100644 src/control_backend/api/v1/endpoints/button_pressed.py create mode 100644 src/control_backend/api/v1/endpoints/user_interact.py diff --git a/src/control_backend/agents/bdi/agentspeak.asl b/src/control_backend/agents/bdi/agentspeak.asl new file mode 100644 index 0000000..7f71fbd --- /dev/null +++ b/src/control_backend/agents/bdi/agentspeak.asl @@ -0,0 +1,45 @@ +phase("9922935f-ec70-4792-9a61-37a129e1ec14"). +keyword_said(Keyword) :- (user_said(Message) & .substring(Keyword, Message, Pos)) & (Pos >= 0). + + ++!reply_with_goal(Goal) + : user_said(Message) + <- +responded_this_turn; + .findall(Norm, norm(Norm), Norms); + .reply_with_goal(Message, Norms, Goal). + ++!say(Text) + <- +responded_this_turn; + .say(Text). + ++!reply + : user_said(Message) + <- +responded_this_turn; + .findall(Norm, norm(Norm), Norms); + .reply(Message, Norms). + ++user_said(Message) + : phase("9922935f-ec70-4792-9a61-37a129e1ec14") + <- .notify_user_said(Message); + -responded_this_turn; + !check_triggers; + !transition_phase. + ++!transition_phase + : phase("9922935f-ec70-4792-9a61-37a129e1ec14") & + not responded_this_turn + <- -phase("9922935f-ec70-4792-9a61-37a129e1ec14"); + +phase("end"); + ?user_said(Message); + -+user_said(Message); + .notify_transition_phase("9922935f-ec70-4792-9a61-37a129e1ec14", "end"). + ++user_said(Message) + : phase("end") + <- !reply. + ++!check_triggers + <- true. + ++!transition_phase + <- true. diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py index b2efc41..af00a7b 100644 --- a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py +++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py @@ -1,3 +1,4 @@ +import asyncio import json import zmq @@ -30,6 +31,26 @@ class UserInterruptAgent(BaseAgent): def __init__(self, **kwargs): super().__init__(**kwargs) self.sub_socket = None + self.pub_socket = None + + async def setup(self): + """ + Initialize the agent. + + Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic. + Starts the background behavior to receive the user interrupts. + """ + context = Context.instance() + + self.sub_socket = context.socket(zmq.SUB) + self.sub_socket.connect(settings.zmq_settings.internal_sub_address) + self.sub_socket.subscribe("button_pressed") + + self.pub_socket = context.socket(zmq.PUB) + self.pub_socket.connect(settings.zmq_settings.internal_pub_address) + + self.add_behavior(self._receive_button_event()) + self.add_behavior(self.test_sending_behaviour()) async def _receive_button_event(self): """ @@ -78,6 +99,33 @@ class UserInterruptAgent(BaseAgent): event_context, ) + async def test_sending_behaviour(self): + self.logger.info("Starting simple test sending behaviour...") + + while True: + try: + test_data = {"type": "heartbeat", "status": "ok"} + + await self._send_experiment_update(test_data) + + except zmq.ZMQError as ze: + self.logger.error(f"ZMQ error: {ze}") + except Exception as e: + self.logger.error(f"Error: {e}") + + await asyncio.sleep(2) + + async def _send_experiment_update(self, data): + """ + Sends an update to the 'experiment' topic. + The SSE endpoint will pick this up and push it to the UI. + """ + if self.pub_socket: + topic = b"experiment" + body = json.dumps(data).encode("utf-8") + await self.pub_socket.send_multipart([topic, body]) + self.logger.debug(f"Sent experiment update: {data}") + async def _send_to_speech_agent(self, text_to_say: str): """ method to send prioritized speech command to RobotSpeechAgent. @@ -129,18 +177,3 @@ class UserInterruptAgent(BaseAgent): "Sent button_override belief with id '%s' to Program manager.", belief_id, ) - - async def setup(self): - """ - Initialize the agent. - - Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic. - Starts the background behavior to receive the user interrupts. - """ - context = Context.instance() - - self.sub_socket = context.socket(zmq.SUB) - self.sub_socket.connect(settings.zmq_settings.internal_sub_address) - self.sub_socket.subscribe("button_pressed") - - self.add_behavior(self._receive_button_event()) diff --git a/src/control_backend/api/v1/endpoints/button_pressed.py b/src/control_backend/api/v1/endpoints/button_pressed.py deleted file mode 100644 index 5a94a53..0000000 --- a/src/control_backend/api/v1/endpoints/button_pressed.py +++ /dev/null @@ -1,31 +0,0 @@ -import logging - -from fastapi import APIRouter, Request - -from control_backend.schemas.events import ButtonPressedEvent - -logger = logging.getLogger(__name__) -router = APIRouter() - - -@router.post("/button_pressed", status_code=202) -async def receive_button_event(event: ButtonPressedEvent, request: Request): - """ - Endpoint to handle external button press events. - - Validates the event payload and publishes it to the internal 'button_pressed' topic. - Subscribers (in this case user_interrupt_agent) will pick this up to trigger - specific behaviors or state changes. - - :param event: The parsed ButtonPressedEvent object. - :param request: The FastAPI request object. - """ - logger.debug("Received button event: %s | %s", event.type, event.context) - - topic = b"button_pressed" - body = event.model_dump_json().encode() - - pub_socket = request.app.state.endpoints_pub_socket - await pub_socket.send_multipart([topic, body]) - - return {"status": "Event received"} diff --git a/src/control_backend/api/v1/endpoints/user_interact.py b/src/control_backend/api/v1/endpoints/user_interact.py new file mode 100644 index 0000000..3d3406e --- /dev/null +++ b/src/control_backend/api/v1/endpoints/user_interact.py @@ -0,0 +1,67 @@ +import asyncio +import logging + +import zmq +import zmq.asyncio +from fastapi import APIRouter, Request +from fastapi.responses import StreamingResponse +from zmq.asyncio import Context + +from control_backend.core.config import settings +from control_backend.schemas.events import ButtonPressedEvent + +logger = logging.getLogger(__name__) +router = APIRouter() + + +@router.post("/button_pressed", status_code=202) +async def receive_button_event(event: ButtonPressedEvent, request: Request): + """ + Endpoint to handle external button press events. + + Validates the event payload and publishes it to the internal 'button_pressed' topic. + Subscribers (in this case user_interrupt_agent) will pick this up to trigger + specific behaviors or state changes. + + :param event: The parsed ButtonPressedEvent object. + :param request: The FastAPI request object. + """ + logger.debug("Received button event: %s | %s", event.type, event.context) + + topic = b"button_pressed" + body = event.model_dump_json().encode() + + pub_socket = request.app.state.endpoints_pub_socket + await pub_socket.send_multipart([topic, body]) + + return {"status": "Event received"} + + +@router.get("/experiment_stream") +async def experiment_stream(request: Request): + # Use the asyncio-compatible context + context = Context.instance() + socket = context.socket(zmq.SUB) + + # Connect and subscribe + socket.connect(settings.zmq_settings.internal_sub_address) + socket.subscribe(b"experiment") + + async def gen(): + try: + while True: + # Check if client closed the tab + if await request.is_disconnected(): + logger.info("Client disconnected from experiment stream.") + break + + try: + parts = await asyncio.wait_for(socket.recv_multipart(), timeout=1.0) + _, message = parts + yield f"data: {message.decode().strip()}\n\n" + except TimeoutError: + continue + finally: + socket.close() + + return StreamingResponse(gen(), media_type="text/event-stream") diff --git a/src/control_backend/api/v1/router.py b/src/control_backend/api/v1/router.py index ebba0db..c130ad3 100644 --- a/src/control_backend/api/v1/router.py +++ b/src/control_backend/api/v1/router.py @@ -1,6 +1,6 @@ from fastapi.routing import APIRouter -from control_backend.api.v1.endpoints import button_pressed, logs, message, program, robot, sse +from control_backend.api.v1.endpoints import logs, message, program, robot, sse, user_interact api_router = APIRouter() @@ -14,4 +14,4 @@ api_router.include_router(logs.router, tags=["Logs"]) api_router.include_router(program.router, tags=["Program"]) -api_router.include_router(button_pressed.router, tags=["Button Pressed Events"]) +api_router.include_router(user_interact.router, tags=["Button Pressed Events"])