diff --git a/.gitlab/merge_request_templates/default.md b/.gitlab/merge_request_templates/default.md new file mode 100644 index 0000000..7a76ac5 --- /dev/null +++ b/.gitlab/merge_request_templates/default.md @@ -0,0 +1,9 @@ +%{first_multiline_commit_description} + +To verify: + +- [ ] Style checks pass +- [ ] Pipeline (tests) pass +- [ ] Documentation is up to date +- [ ] Tests are up to date (new code is covered) +- [ ] ... diff --git a/src/control_backend/agents/actuation/robot_gesture_agent.py b/src/control_backend/agents/actuation/robot_gesture_agent.py index e641eba..4f5dd79 100644 --- a/src/control_backend/agents/actuation/robot_gesture_agent.py +++ b/src/control_backend/agents/actuation/robot_gesture_agent.py @@ -28,6 +28,7 @@ class RobotGestureAgent(BaseAgent): address = "" bind = False gesture_data = [] + single_gesture_data = [] def __init__( self, @@ -35,8 +36,10 @@ class RobotGestureAgent(BaseAgent): address=settings.zmq_settings.ri_command_address, bind=False, gesture_data=None, + single_gesture_data=None, ): self.gesture_data = gesture_data or [] + self.single_gesture_data = single_gesture_data or [] super().__init__(name) self.address = address self.bind = bind @@ -99,7 +102,13 @@ class RobotGestureAgent(BaseAgent): gesture_command.data, ) return - + elif gesture_command.endpoint == RIEndpoint.GESTURE_SINGLE: + if gesture_command.data not in self.single_gesture_data: + self.logger.warning( + "Received gesture '%s' which is not in available gestures. 
Early returning", + gesture_command.data, + ) + return await self.pubsocket.send_json(gesture_command.model_dump()) except Exception: self.logger.exception("Error processing internal message.") diff --git a/src/control_backend/agents/communication/ri_communication_agent.py b/src/control_backend/agents/communication/ri_communication_agent.py index a50892c..34e5b25 100644 --- a/src/control_backend/agents/communication/ri_communication_agent.py +++ b/src/control_backend/agents/communication/ri_communication_agent.py @@ -182,6 +182,7 @@ class RICommunicationAgent(BaseAgent): self._req_socket.bind(addr) case "actuation": gesture_data = port_data.get("gestures", []) + single_gesture_data = port_data.get("single_gestures", []) robot_speech_agent = RobotSpeechAgent( settings.agent_settings.robot_speech_name, address=addr, @@ -192,6 +193,7 @@ class RICommunicationAgent(BaseAgent): address=addr, bind=bind, gesture_data=gesture_data, + single_gesture_data=single_gesture_data, ) await robot_speech_agent.start() await asyncio.sleep(0.1) # Small delay diff --git a/src/control_backend/agents/user_interrupt/__init__.py b/src/control_backend/agents/user_interrupt/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/control_backend/agents/user_interrupt/user_interrupt_agent.py b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py new file mode 100644 index 0000000..b2efc41 --- /dev/null +++ b/src/control_backend/agents/user_interrupt/user_interrupt_agent.py @@ -0,0 +1,146 @@ +import json + +import zmq +from zmq.asyncio import Context + +from control_backend.agents import BaseAgent +from control_backend.core.agent_system import InternalMessage +from control_backend.core.config import settings +from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand + + +class UserInterruptAgent(BaseAgent): + """ + User Interrupt Agent. 
+ + This agent receives button_pressed events from the external HTTP API + (via ZMQ) and uses the associated context to trigger one of the following actions: + + - Send a prioritized message to the `RobotSpeechAgent` + - Send a prioritized gesture to the `RobotGestureAgent` + - Send a belief override to the `BDIProgramManager` in order to activate a + trigger/conditional norm or complete a goal. + + Prioritized actions clear the current RI queue before inserting the new item, + ensuring they are executed immediately after Pepper's current action has been fulfilled. + + :ivar sub_socket: The ZMQ SUB socket used to receive user interrupts. + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.sub_socket = None + + async def _receive_button_event(self): + """ + The behaviour of the UserInterruptAgent. + Continuous loop that receives button_pressed events from the button_pressed HTTP endpoint. + These events contain a type and a context. + + These are the different types and contexts: + - type: "speech", context: string that the robot has to say. + - type: "gesture", context: single gesture name that the robot has to perform. + - type: "override", context: belief_id that overrides the goal/trigger/conditional norm. + """ + while True: + topic, body = await self.sub_socket.recv_multipart() + + try: + event_data = json.loads(body) + event_type = event_data.get("type") # e.g., "speech", "gesture" + event_context = event_data.get("context") # e.g., "Hello, I am Pepper!"
+ except (ValueError, AttributeError): # bad UTF-8, malformed JSON, or JSON that is not an object + self.logger.error("Received invalid JSON payload on topic %s", topic) + continue + + if event_type == "speech": + await self._send_to_speech_agent(event_context) + self.logger.info( + "Forwarded button press (speech) with context '%s' to RobotSpeechAgent.", + event_context, + ) + elif event_type == "gesture": + await self._send_to_gesture_agent(event_context) + self.logger.info( + "Forwarded button press (gesture) with context '%s' to RobotGestureAgent.", + event_context, + ) + elif event_type == "override": + await self._send_to_program_manager(event_context) + self.logger.info( + "Forwarded button press (override) with context '%s' to BDIProgramManager.", + event_context, + ) + else: + self.logger.warning( + "Received button press with unknown type '%s' (context: '%s').", + event_type, + event_context, + ) + + async def _send_to_speech_agent(self, text_to_say: str): + """ + Send a prioritized speech command to the RobotSpeechAgent. + + :param text_to_say: The string that the robot has to say. + """ + cmd = SpeechCommand(data=text_to_say, is_priority=True) + out_msg = InternalMessage( + to=settings.agent_settings.robot_speech_name, + sender=self.name, + body=cmd.model_dump_json(), + ) + await self.send(out_msg) + + async def _send_to_gesture_agent(self, single_gesture_name: str): + """ + Send a prioritized gesture command to the RobotGestureAgent. + + :param single_gesture_name: The gesture tag that the robot has to perform. + """ + # the endpoint is set to always be GESTURE_SINGLE for user interrupts + cmd = GestureCommand( + endpoint=RIEndpoint.GESTURE_SINGLE, data=single_gesture_name, is_priority=True + ) + out_msg = InternalMessage( + to=settings.agent_settings.robot_gesture_name, + sender=self.name, + body=cmd.model_dump_json(), + ) + await self.send(out_msg) + + async def _send_to_program_manager(self, belief_id: str): + """ + Send a button_override belief to the BDIProgramManager.
+ + :param belief_id: The belief_id that overrides the goal/trigger/conditional norm. + this id can belong to a basic belief or an inferred belief. + See also: https://utrechtuniversity.youtrack.cloud/articles/N25B-A-27/UI-components + """ + data = {"belief": belief_id} + message = InternalMessage( + to=settings.agent_settings.bdi_program_manager_name, + sender=self.name, + body=json.dumps(data), + thread="belief_override_id", + ) + await self.send(message) + self.logger.info( + "Sent button_override belief with id '%s' to Program manager.", + belief_id, + ) + + async def setup(self): + """ + Initialize the agent. + + Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic. + Starts the background behavior to receive the user interrupts. + """ + context = Context.instance() + + self.sub_socket = context.socket(zmq.SUB) + self.sub_socket.connect(settings.zmq_settings.internal_sub_address) + self.sub_socket.subscribe("button_pressed") + + self.add_behavior(self._receive_button_event()) diff --git a/src/control_backend/api/v1/endpoints/button_pressed.py b/src/control_backend/api/v1/endpoints/button_pressed.py new file mode 100644 index 0000000..5a94a53 --- /dev/null +++ b/src/control_backend/api/v1/endpoints/button_pressed.py @@ -0,0 +1,31 @@ +import logging + +from fastapi import APIRouter, Request + +from control_backend.schemas.events import ButtonPressedEvent + +logger = logging.getLogger(__name__) +router = APIRouter() + + +@router.post("/button_pressed", status_code=202) +async def receive_button_event(event: ButtonPressedEvent, request: Request): + """ + Endpoint to handle external button press events. + + Validates the event payload and publishes it to the internal 'button_pressed' topic. + Subscribers (in this case user_interrupt_agent) will pick this up to trigger + specific behaviors or state changes. + + :param event: The parsed ButtonPressedEvent object. + :param request: The FastAPI request object. 
+ """ + logger.debug("Received button event: %s | %s", event.type, event.context) + + topic = b"button_pressed" + body = event.model_dump_json().encode() + + pub_socket = request.app.state.endpoints_pub_socket + await pub_socket.send_multipart([topic, body]) + + return {"status": "Event received"} diff --git a/src/control_backend/api/v1/router.py b/src/control_backend/api/v1/router.py index ce5a70b..ebba0db 100644 --- a/src/control_backend/api/v1/router.py +++ b/src/control_backend/api/v1/router.py @@ -1,6 +1,6 @@ from fastapi.routing import APIRouter -from control_backend.api.v1.endpoints import logs, message, program, robot, sse +from control_backend.api.v1.endpoints import button_pressed, logs, message, program, robot, sse api_router = APIRouter() @@ -13,3 +13,5 @@ api_router.include_router(robot.router, prefix="/robot", tags=["Pings", "Command api_router.include_router(logs.router, tags=["Logs"]) api_router.include_router(program.router, tags=["Program"]) + +api_router.include_router(button_pressed.router, tags=["Button Pressed Events"]) diff --git a/src/control_backend/core/config.py b/src/control_backend/core/config.py index 2712d8a..927985b 100644 --- a/src/control_backend/core/config.py +++ b/src/control_backend/core/config.py @@ -48,6 +48,7 @@ class AgentSettings(BaseModel): ri_communication_name: str = "ri_communication_agent" robot_speech_name: str = "robot_speech_agent" robot_gesture_name: str = "robot_gesture_agent" + user_interrupt_name: str = "user_interrupt_agent" class BehaviourSettings(BaseModel): diff --git a/src/control_backend/main.py b/src/control_backend/main.py index 2c8b766..3509cbc 100644 --- a/src/control_backend/main.py +++ b/src/control_backend/main.py @@ -39,6 +39,9 @@ from control_backend.agents.communication import RICommunicationAgent # LLM Agents from control_backend.agents.llm import LLMAgent +# User Interrupt Agent +from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent + # Other backend imports 
from control_backend.api.v1.router import api_router from control_backend.core.config import settings @@ -138,6 +141,12 @@ async def lifespan(app: FastAPI): "name": settings.agent_settings.bdi_program_manager_name, }, ), + "UserInterruptAgent": ( + UserInterruptAgent, + { + "name": settings.agent_settings.user_interrupt_name, + }, + ), } agents = [] diff --git a/src/control_backend/schemas/events.py b/src/control_backend/schemas/events.py new file mode 100644 index 0000000..46967f7 --- /dev/null +++ b/src/control_backend/schemas/events.py @@ -0,0 +1,6 @@ +from pydantic import BaseModel + + +class ButtonPressedEvent(BaseModel): + type: str + context: str diff --git a/src/control_backend/schemas/ri_message.py b/src/control_backend/schemas/ri_message.py index 3f3abea..a48dec6 100644 --- a/src/control_backend/schemas/ri_message.py +++ b/src/control_backend/schemas/ri_message.py @@ -38,6 +38,7 @@ class SpeechCommand(RIMessage): endpoint: RIEndpoint = RIEndpoint(RIEndpoint.SPEECH) data: str + is_priority: bool = False class GestureCommand(RIMessage): @@ -52,6 +53,7 @@ class GestureCommand(RIMessage): RIEndpoint.GESTURE_SINGLE, RIEndpoint.GESTURE_TAG ] data: str + is_priority: bool = False @model_validator(mode="after") def check_endpoint(self): diff --git a/test/unit/agents/actuation/test_robot_speech_agent.py b/test/unit/agents/actuation/test_robot_speech_agent.py index 3cd8fbf..d95f66a 100644 --- a/test/unit/agents/actuation/test_robot_speech_agent.py +++ b/test/unit/agents/actuation/test_robot_speech_agent.py @@ -64,7 +64,7 @@ async def test_handle_message_sends_command(): agent = mock_speech_agent() agent.pubsocket = pubsocket - payload = {"endpoint": "actuate/speech", "data": "hello"} + payload = {"endpoint": "actuate/speech", "data": "hello", "is_priority": False} msg = InternalMessage(to="robot", sender="tester", body=json.dumps(payload)) await agent.handle_message(msg) @@ -75,7 +75,7 @@ async def test_handle_message_sends_command(): @pytest.mark.asyncio async def 
test_zmq_command_loop_valid_payload(zmq_context): """UI command is read from SUB and published.""" - command = {"endpoint": "actuate/speech", "data": "hello"} + command = {"endpoint": "actuate/speech", "data": "hello", "is_priority": False} fake_socket = AsyncMock() async def recv_once(): diff --git a/test/unit/agents/communication/test_ri_communication_agent.py b/test/unit/agents/communication/test_ri_communication_agent.py index 018b19d..06d8766 100644 --- a/test/unit/agents/communication/test_ri_communication_agent.py +++ b/test/unit/agents/communication/test_ri_communication_agent.py @@ -67,6 +67,7 @@ async def test_setup_success_connects_and_starts_robot(zmq_context): address="tcp://localhost:5556", bind=False, gesture_data=[], + single_gesture_data=[], ) agent.add_behavior.assert_called_once() diff --git a/test/unit/agents/llm/test_llm_agent.py b/test/unit/agents/llm/test_llm_agent.py index 62c189e..5e84d8d 100644 --- a/test/unit/agents/llm/test_llm_agent.py +++ b/test/unit/agents/llm/test_llm_agent.py @@ -197,6 +197,9 @@ async def test_query_llm_yields_final_tail_chunk(mock_settings): agent = LLMAgent("llm_agent") agent.send = AsyncMock() + agent.logger = MagicMock() + agent.logger.llm = MagicMock() + # Patch _stream_query_llm to yield tokens that do NOT end with punctuation async def fake_stream(messages): yield "Hello" diff --git a/test/unit/agents/user_interrupt/test_user_interrupt.py b/test/unit/agents/user_interrupt/test_user_interrupt.py new file mode 100644 index 0000000..7e3e700 --- /dev/null +++ b/test/unit/agents/user_interrupt/test_user_interrupt.py @@ -0,0 +1,146 @@ +import asyncio +import json +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent +from control_backend.core.agent_system import InternalMessage +from control_backend.core.config import settings +from control_backend.schemas.ri_message import RIEndpoint + + +@pytest.fixture +def 
agent(): + agent = UserInterruptAgent(name="user_interrupt_agent") + agent.send = AsyncMock() + agent.logger = MagicMock() + agent.sub_socket = AsyncMock() + return agent + + +@pytest.mark.asyncio +async def test_send_to_speech_agent(agent): + """Verify speech command format.""" + await agent._send_to_speech_agent("Hello World") + + agent.send.assert_awaited_once() + sent_msg: InternalMessage = agent.send.call_args.args[0] + + assert sent_msg.to == settings.agent_settings.robot_speech_name + body = json.loads(sent_msg.body) + assert body["data"] == "Hello World" + assert body["is_priority"] is True + + +@pytest.mark.asyncio +async def test_send_to_gesture_agent(agent): + """Verify gesture command format.""" + await agent._send_to_gesture_agent("wave_hand") + + agent.send.assert_awaited_once() + sent_msg: InternalMessage = agent.send.call_args.args[0] + + assert sent_msg.to == settings.agent_settings.robot_gesture_name + body = json.loads(sent_msg.body) + assert body["data"] == "wave_hand" + assert body["is_priority"] is True + assert body["endpoint"] == RIEndpoint.GESTURE_SINGLE.value + + +@pytest.mark.asyncio +async def test_send_to_program_manager(agent): + """Verify belief update format.""" + context_str = "2" + + await agent._send_to_program_manager(context_str) + + agent.send.assert_awaited_once() + sent_msg: InternalMessage = agent.send.call_args.args[0] + + assert sent_msg.to == settings.agent_settings.bdi_program_manager_name + assert sent_msg.thread == "belief_override_id" + + body = json.loads(sent_msg.body) + + assert body["belief"] == context_str + + +@pytest.mark.asyncio +async def test_receive_loop_routing_success(agent): + """ + Test that the loop correctly: + 1. Receives 'button_pressed' topic from ZMQ + 2. Parses the JSON payload to find 'type' and 'context' + 3. 
Calls the correct handler method based on 'type' + """ + # Prepare JSON payloads as bytes + payload_speech = json.dumps({"type": "speech", "context": "Hello Speech"}).encode() + payload_gesture = json.dumps({"type": "gesture", "context": "Hello Gesture"}).encode() + payload_override = json.dumps({"type": "override", "context": "Hello Override"}).encode() + + agent.sub_socket.recv_multipart.side_effect = [ + (b"button_pressed", payload_speech), + (b"button_pressed", payload_gesture), + (b"button_pressed", payload_override), + asyncio.CancelledError, # Stop the infinite loop + ] + + agent._send_to_speech_agent = AsyncMock() + agent._send_to_gesture_agent = AsyncMock() + agent._send_to_program_manager = AsyncMock() + + try: + await agent._receive_button_event() + except asyncio.CancelledError: + pass + + await asyncio.sleep(0) + + # Speech + agent._send_to_speech_agent.assert_awaited_once_with("Hello Speech") + + # Gesture + agent._send_to_gesture_agent.assert_awaited_once_with("Hello Gesture") + + # Override + agent._send_to_program_manager.assert_awaited_once_with("Hello Override") + + assert agent._send_to_speech_agent.await_count == 1 + assert agent._send_to_gesture_agent.await_count == 1 + assert agent._send_to_program_manager.await_count == 1 + + +@pytest.mark.asyncio +async def test_receive_loop_unknown_type(agent): + """Test that unknown 'type' values in the JSON log a warning and do not crash.""" + + # Prepare a payload with an unknown type + payload_unknown = json.dumps({"type": "unknown_thing", "context": "some_data"}).encode() + + agent.sub_socket.recv_multipart.side_effect = [ + (b"button_pressed", payload_unknown), + asyncio.CancelledError, + ] + + agent._send_to_speech_agent = AsyncMock() + agent._send_to_gesture_agent = AsyncMock() + agent._send_to_program_manager = AsyncMock() + + try: + await agent._receive_button_event() + except asyncio.CancelledError: + pass + + await asyncio.sleep(0) + + # Ensure no handlers were called +
agent._send_to_speech_agent.assert_not_called() + agent._send_to_gesture_agent.assert_not_called() + agent._send_to_program_manager.assert_not_called() + + agent.logger.warning.assert_called_with( + "Received button press with unknown type '%s' (context: '%s').", + "unknown_thing", + "some_data", + )