From b83a362abe7c7247a1be2a4b3135b0f219237d6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Wed, 29 Oct 2025 13:31:24 +0100 Subject: [PATCH 01/15] fix: wait for req socket send to make sure we dont stay stuck - if there's no REP this would be awaited forever. ref: N25B-205 --- .../agents/ri_communication_agent.py | 29 ++++++++++++++++--- .../api/v1/endpoints/command.py | 1 - 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/control_backend/agents/ri_communication_agent.py b/src/control_backend/agents/ri_communication_agent.py index 504c707..2b92989 100644 --- a/src/control_backend/agents/ri_communication_agent.py +++ b/src/control_backend/agents/ri_communication_agent.py @@ -17,6 +17,7 @@ class RICommunicationAgent(Agent): req_socket: zmq.Socket _address = "" _bind = True + connected = False def __init__( self, @@ -40,17 +41,34 @@ class RICommunicationAgent(Agent): # We need to listen and sent pings. message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}} - await self.agent.req_socket.send_json(message) + seconds_to_wait_total = 4.0 + try: + await asyncio.wait_for( + self.agent.req_socket.send_json(message), timeout=seconds_to_wait_total / 2 + ) + except TimeoutError as e: + logger.debug( + f"Waited too long to send message - we probably dont have any receivers... but let's check!" 
+ ) # Wait up to three seconds for a reply:) try: - message = await asyncio.wait_for(self.agent.req_socket.recv_json(), timeout=3.0) + logger.debug(f"waiting for message for {seconds_to_wait_total / 2} seconds.") + message = await asyncio.wait_for( + self.agent.req_socket.recv_json(), timeout=seconds_to_wait_total / 2 + ) # We didnt get a reply :( - except asyncio.TimeoutError as e: - logger.info("No ping retrieved in 3 seconds, killing myself.") + except TimeoutError as e: + logger.info(f"No ping back retrieved in {seconds_to_wait_total/2} seconds totalling {seconds_to_wait_total} of time, killing myself.") + self.agent.connected = False + # TODO: Send event to UI letting know that we've lost connection + self.kill() + except Exception as e: + logger.debug(f"Differennt exception: {e}") + logger.debug('Received message "%s"', message) if "endpoint" not in message: logger.error("No received endpoint in message, excepted ping endpoint.") @@ -162,4 +180,7 @@ class RICommunicationAgent(Agent): # Set up ping behaviour listen_behaviour = self.ListenBehaviour() self.add_behaviour(listen_behaviour) + + # TODO: Let UI know that we're connected >:) + self.connected = True logger.info("Finished setting up %s", self.jid) diff --git a/src/control_backend/api/v1/endpoints/command.py b/src/control_backend/api/v1/endpoints/command.py index badaf90..e7fef60 100644 --- a/src/control_backend/api/v1/endpoints/command.py +++ b/src/control_backend/api/v1/endpoints/command.py @@ -17,6 +17,5 @@ async def receive_command(command: SpeechCommand, request: Request): topic = b"command" pub_socket: Socket = request.app.state.internal_comm_socket pub_socket.send_multipart([topic, command.model_dump_json().encode()]) - return {"status": "Command received"} From 59c2edc3c6d676441fed7ff212aa840430037fe4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Wed, 29 Oct 2025 13:33:01 +0100 Subject: [PATCH 02/15] fix: small fix for testing ping timeouts ref: N25B-205 --- 
test/integration/agents/test_ri_communication_agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/agents/test_ri_communication_agent.py b/test/integration/agents/test_ri_communication_agent.py index 3e4a056..e8643c8 100644 --- a/test/integration/agents/test_ri_communication_agent.py +++ b/test/integration/agents/test_ri_communication_agent.py @@ -493,7 +493,7 @@ async def test_listen_behaviour_timeout(caplog): with caplog.at_level("INFO"): await behaviour.run() - assert "No ping retrieved in 3 seconds" in caplog.text + assert "No ping" in caplog.text @pytest.mark.asyncio From 669d0190d6b5f9564d6941883b557fdc7d4d4d7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Wed, 29 Oct 2025 19:22:06 +0100 Subject: [PATCH 03/15] feat: started ping router and internal messaging for pings ref: N25B-151 --- .../agents/ri_communication_agent.py | 10 ++- .../api/v1/endpoints/command.py | 21 ------ src/control_backend/api/v1/endpoints/robot.py | 74 +++++++++++++++++++ src/control_backend/api/v1/router.py | 4 +- ...and_endpoint.py => test_robot_endpoint.py} | 4 +- 5 files changed, 87 insertions(+), 26 deletions(-) delete mode 100644 src/control_backend/api/v1/endpoints/command.py create mode 100644 src/control_backend/api/v1/endpoints/robot.py rename test/integration/api/endpoints/{test_command_endpoint.py => test_robot_endpoint.py} (95%) diff --git a/src/control_backend/agents/ri_communication_agent.py b/src/control_backend/agents/ri_communication_agent.py index 2b92989..4da2c69 100644 --- a/src/control_backend/agents/ri_communication_agent.py +++ b/src/control_backend/agents/ri_communication_agent.py @@ -5,11 +5,13 @@ from spade.agent import Agent from spade.behaviour import CyclicBehaviour import zmq + from control_backend.core.config import settings from control_backend.core.zmq_context import context -from control_backend.schemas.message import Message +from control_backend.schemas.ri_message import RIMessage from 
control_backend.agents.ri_command_agent import RICommandAgent + logger = logging.getLogger(__name__) @@ -63,6 +65,11 @@ class RICommunicationAgent(Agent): logger.info(f"No ping back retrieved in {seconds_to_wait_total/2} seconds totalling {seconds_to_wait_total} of time, killing myself.") self.agent.connected = False # TODO: Send event to UI letting know that we've lost connection + topic = b"ping" + data = json.dumps(False).encode() + pub_socket = context.socket(zmq.PUB) + pub_socket.connect(settings.zmq_settings.internal_comm_address) + pub_socket.send_multipart([topic, data]) self.kill() @@ -90,6 +97,7 @@ class RICommunicationAgent(Agent): logger.info("Setting up %s", self.jid) retries = 0 + # Let's try a certain amount of times before failing connection while retries < max_retries: # Bind request socket diff --git a/src/control_backend/api/v1/endpoints/command.py b/src/control_backend/api/v1/endpoints/command.py deleted file mode 100644 index e7fef60..0000000 --- a/src/control_backend/api/v1/endpoints/command.py +++ /dev/null @@ -1,21 +0,0 @@ -from fastapi import APIRouter, Request -import logging - -from zmq import Socket - -from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint - -logger = logging.getLogger(__name__) - -router = APIRouter() - - -@router.post("/command", status_code=202) -async def receive_command(command: SpeechCommand, request: Request): - # Validate and retrieve data. 
- SpeechCommand.model_validate(command) - topic = b"command" - pub_socket: Socket = request.app.state.internal_comm_socket - pub_socket.send_multipart([topic, command.model_dump_json().encode()]) - - return {"status": "Command received"} diff --git a/src/control_backend/api/v1/endpoints/robot.py b/src/control_backend/api/v1/endpoints/robot.py new file mode 100644 index 0000000..1d0da9c --- /dev/null +++ b/src/control_backend/api/v1/endpoints/robot.py @@ -0,0 +1,74 @@ +from fastapi import APIRouter, Request +from fastapi.responses import JSONResponse, StreamingResponse +import logging +import asyncio +import zmq.asyncio +import json +import datetime + +from zmq import Socket +from control_backend.core.zmq_context import context +from control_backend.core.config import settings +from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint + + + + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +@router.post("/command", status_code=202) +async def receive_command(command: SpeechCommand, request: Request): + # Validate and retrieve data. 
+ SpeechCommand.model_validate(command) + topic = b"command" + pub_socket: Socket = request.app.state.internal_comm_socket + pub_socket.send_multipart([topic, command.model_dump_json().encode()]) + + return {"status": "Command received"} + + +@router.get("/ping_check") +async def ping(request: Request): + pass + + +@router.get("/ping_stream") +async def ping_stream(request: Request): + """Stream live updates whenever the device state changes.""" + async def event_stream(): + # Set up internal socket to receive ping updates + logger.debug("Ping stream router event stream entered.") + sub_socket = zmq.asyncio.Context().socket(zmq.SUB) + sub_socket.connect(settings.zmq_settings.internal_comm_address) + sub_socket.setsockopt(zmq.SUBSCRIBE, b"ping") + connected = True + + ping_frequency = 1 # How many seconds between ping attempts + + # Even though its most likely the updates should alternate + # So, True - False - True - False for connectivity. + # Let's still check:) + while True: + logger.debug("Ping stream entered listening ") + try: + topic, body = await asyncio.wait_for(sub_socket.recv_multipart(), timeout=ping_frequency) + logger.debug("got ping change in ping_stream router") + connected = json.loads(body) + except TimeoutError as e: + await asyncio.sleep(0.1) + + # Stop if client disconnected + if await request.is_disconnected(): + print("Client disconnected from SSE") + break + + + logger.debug(f"Yielded new connection event in robot ping router: {str(connected)}") + yield f"data: {str(connected)}, time:{str(datetime.datetime.now().strftime("%H:%M:%S"))}\n\n" + + + + return StreamingResponse(event_stream(), media_type="text/event-stream") \ No newline at end of file diff --git a/src/control_backend/api/v1/router.py b/src/control_backend/api/v1/router.py index dc7aea9..dca7e27 100644 --- a/src/control_backend/api/v1/router.py +++ b/src/control_backend/api/v1/router.py @@ -1,6 +1,6 @@ from fastapi.routing import APIRouter -from control_backend.api.v1.endpoints 
import message, sse, command +from control_backend.api.v1.endpoints import message, sse, robot api_router = APIRouter() @@ -8,4 +8,4 @@ api_router.include_router(message.router, tags=["Messages"]) api_router.include_router(sse.router, tags=["SSE"]) -api_router.include_router(command.router, tags=["Commands"]) +api_router.include_router(robot.router, prefix="/robot", tags=["Pings", "Commands"]) \ No newline at end of file diff --git a/test/integration/api/endpoints/test_command_endpoint.py b/test/integration/api/endpoints/test_robot_endpoint.py similarity index 95% rename from test/integration/api/endpoints/test_command_endpoint.py rename to test/integration/api/endpoints/test_robot_endpoint.py index 07bd866..827fb17 100644 --- a/test/integration/api/endpoints/test_command_endpoint.py +++ b/test/integration/api/endpoints/test_robot_endpoint.py @@ -3,7 +3,7 @@ from fastapi import FastAPI from fastapi.testclient import TestClient from unittest.mock import MagicMock -from control_backend.api.v1.endpoints import command +from control_backend.api.v1.endpoints import robot from control_backend.schemas.ri_message import SpeechCommand @@ -14,7 +14,7 @@ def app(): Also sets up a mock internal_comm_socket. """ app = FastAPI() - app.include_router(command.router) + app.include_router(robot.router) app.state.internal_comm_socket = MagicMock() # mock ZMQ socket return app From 4f2d45fb44c17fe4575a10261f605c55c314c36c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Wed, 29 Oct 2025 21:55:23 +0100 Subject: [PATCH 04/15] feat: fixed socket typing for communication agent and ping router- automatically try to reconnect with robot. 
ref: N25B-151 --- .../agents/ri_communication_agent.py | 58 ++++++++++++------- src/control_backend/api/v1/endpoints/robot.py | 10 ++-- src/control_backend/main.py | 7 ++- 3 files changed, 47 insertions(+), 28 deletions(-) diff --git a/src/control_backend/agents/ri_communication_agent.py b/src/control_backend/agents/ri_communication_agent.py index 4da2c69..f46c623 100644 --- a/src/control_backend/agents/ri_communication_agent.py +++ b/src/control_backend/agents/ri_communication_agent.py @@ -4,6 +4,7 @@ import logging from spade.agent import Agent from spade.behaviour import CyclicBehaviour import zmq +import zmq.asyncio from control_backend.core.config import settings @@ -16,7 +17,8 @@ logger = logging.getLogger(__name__) class RICommunicationAgent(Agent): - req_socket: zmq.Socket + _pub_socket: zmq.asyncio.Socket + req_socket: zmq.asyncio.Socket | None _address = "" _bind = True connected = False @@ -25,14 +27,18 @@ class RICommunicationAgent(Agent): self, jid: str, password: str, + pub_socket: zmq.asyncio.Socket, port: int = 5222, verify_security: bool = False, address="tcp://localhost:0000", bind=False, + ): super().__init__(jid, password, port, verify_security) self._address = address self._bind = bind + self.req_socket = None + self._pub_socket = pub_socket class ListenBehaviour(CyclicBehaviour): async def run(self): @@ -43,7 +49,7 @@ class RICommunicationAgent(Agent): # We need to listen and sent pings. message = {"endpoint": "ping", "data": {"id": "e.g. 
some reference id"}} - seconds_to_wait_total = 4.0 + seconds_to_wait_total = 1.0 try: await asyncio.wait_for( self.agent.req_socket.send_json(message), timeout=seconds_to_wait_total / 2 @@ -62,16 +68,12 @@ class RICommunicationAgent(Agent): # We didnt get a reply :( except TimeoutError as e: - logger.info(f"No ping back retrieved in {seconds_to_wait_total/2} seconds totalling {seconds_to_wait_total} of time, killing myself.") - self.agent.connected = False + logger.info(f"No ping back retrieved in {seconds_to_wait_total/2} seconds totalling {seconds_to_wait_total} of time, killing myself (or maybe just laying low).") # TODO: Send event to UI letting know that we've lost connection topic = b"ping" data = json.dumps(False).encode() - pub_socket = context.socket(zmq.PUB) - pub_socket.connect(settings.zmq_settings.internal_comm_address) - pub_socket.send_multipart([topic, data]) - - self.kill() + self.agent._pub_socket.send_multipart([topic, data]) + await self.agent.setup() except Exception as e: logger.debug(f"Differennt exception: {e}") @@ -90,25 +92,38 @@ class RICommunicationAgent(Agent): "Received message with topic different than ping, while ping expected." ) - async def setup(self, max_retries: int = 5): - """ - Try to setup the communication agent, we have 5 retries in case we dont have a response yet. - """ - logger.info("Setting up %s", self.jid) - retries = 0 - - # Let's try a certain amount of times before failing connection - while retries < max_retries: - # Bind request socket + async def setup_req_socket(self, force = False): + """ + Sets up request socket for communication agent. + """ + if self.req_socket is None or force: self.req_socket = context.socket(zmq.REQ) if self._bind: self.req_socket.bind(self._address) else: self.req_socket.connect(self._address) + + async def setup(self, max_retries: int = 5): + """ + Try to setup the communication agent, we have 5 retries in case we dont have a response yet. 
+ """ + logger.info("Setting up %s", self.jid) + + # Bind request socket + await self.setup_req_socket() + + retries = 0 + # Let's try a certain amount of times before failing connection + while retries < max_retries: + + # Make sure the socket is properly setup. + if self.req_socket is None: + continue + # Send our message and receive one back:) - message = {"endpoint": "negotiate/ports", "data": None} + message = {"endpoint": "negotiate/ports", "data": {}} await self.req_socket.send_json(message) try: @@ -190,5 +205,8 @@ class RICommunicationAgent(Agent): self.add_behaviour(listen_behaviour) # TODO: Let UI know that we're connected >:) + topic = b"ping" + data = json.dumps(True).encode() + await self._pub_socket.send_multipart([topic, data]) self.connected = True logger.info("Finished setting up %s", self.jid) diff --git a/src/control_backend/api/v1/endpoints/robot.py b/src/control_backend/api/v1/endpoints/robot.py index 1d0da9c..aa1b532 100644 --- a/src/control_backend/api/v1/endpoints/robot.py +++ b/src/control_backend/api/v1/endpoints/robot.py @@ -6,7 +6,7 @@ import zmq.asyncio import json import datetime -from zmq import Socket +from zmq.asyncio import Socket from control_backend.core.zmq_context import context from control_backend.core.config import settings from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint @@ -24,7 +24,7 @@ async def receive_command(command: SpeechCommand, request: Request): # Validate and retrieve data. 
SpeechCommand.model_validate(command) topic = b"command" - pub_socket: Socket = request.app.state.internal_comm_socket + pub_socket : Socket = request.app.state.internal_comm_socket pub_socket.send_multipart([topic, command.model_dump_json().encode()]) return {"status": "Command received"} @@ -41,7 +41,8 @@ async def ping_stream(request: Request): async def event_stream(): # Set up internal socket to receive ping updates logger.debug("Ping stream router event stream entered.") - sub_socket = zmq.asyncio.Context().socket(zmq.SUB) + + sub_socket = context.socket(zmq.SUB) sub_socket.connect(settings.zmq_settings.internal_comm_address) sub_socket.setsockopt(zmq.SUBSCRIBE, b"ping") connected = True @@ -69,6 +70,5 @@ async def ping_stream(request: Request): logger.debug(f"Yielded new connection event in robot ping router: {str(connected)}") yield f"data: {str(connected)}, time:{str(datetime.datetime.now().strftime("%H:%M:%S"))}\n\n" - - + return StreamingResponse(event_stream(), media_type="text/event-stream") \ No newline at end of file diff --git a/src/control_backend/main.py b/src/control_backend/main.py index e398552..bd0cc74 100644 --- a/src/control_backend/main.py +++ b/src/control_backend/main.py @@ -32,11 +32,12 @@ async def lifespan(app: FastAPI): # Initiate agents ri_communication_agent = RICommunicationAgent( - settings.agent_settings.ri_communication_agent_name + "@" + settings.agent_settings.host, - settings.agent_settings.ri_communication_agent_name, + jid=settings.agent_settings.ri_communication_agent_name + "@" + settings.agent_settings.host, + password=settings.agent_settings.ri_communication_agent_name, + pub_socket=internal_comm_socket, address="tcp://*:5555", bind=True, - ) + ) await ri_communication_agent.start() bdi_core = BDICoreAgent( From df6a39866bbc3a87d85dae95fac7e4d1ae01180e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Wed, 29 Oct 2025 22:05:13 +0100 Subject: [PATCH 05/15] fix: fix unit tests with new feat ref: 
N25B-151 --- .../agents/test_ri_communication_agent.py | 61 +++++++++++-------- 1 file changed, 37 insertions(+), 24 deletions(-) diff --git a/test/integration/agents/test_ri_communication_agent.py b/test/integration/agents/test_ri_communication_agent.py index e8643c8..9a7cb41 100644 --- a/test/integration/agents/test_ri_communication_agent.py +++ b/test/integration/agents/test_ri_communication_agent.py @@ -92,6 +92,8 @@ async def test_setup_creates_socket_and_negotiate_1(monkeypatch): fake_socket.send_json = AsyncMock() fake_socket.recv_json = fake_json_correct_negototiate_1() + fake_pub_socket = AsyncMock() + # Mock context.socket to return our fake socket monkeypatch.setattr( "control_backend.agents.ri_communication_agent.context.socket", lambda _: fake_socket @@ -103,16 +105,17 @@ async def test_setup_creates_socket_and_negotiate_1(monkeypatch): ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() + fake_pub_socket = AsyncMock() # --- Act --- agent = RICommunicationAgent( - "test@server", "password", address="tcp://localhost:5555", bind=False + "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) await agent.setup() # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") - fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": None}) + fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}}) fake_socket.recv_json.assert_awaited() fake_agent_instance.start.assert_awaited() MockCommandAgent.assert_called_once_with( @@ -146,16 +149,17 @@ async def test_setup_creates_socket_and_negotiate_2(monkeypatch): ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() + fake_pub_socket = AsyncMock() # --- Act --- agent = RICommunicationAgent( - "test@server", "password", address="tcp://localhost:5555", bind=False + "test@server", "password", 
pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) await agent.setup() # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") - fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": None}) + fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}}) fake_socket.recv_json.assert_awaited() fake_agent_instance.start.assert_awaited() MockCommandAgent.assert_called_once_with( @@ -192,11 +196,11 @@ async def test_setup_creates_socket_and_negotiate_3(monkeypatch, caplog): ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() - + fake_pub_socket = AsyncMock() # --- Act --- with caplog.at_level("ERROR"): agent = RICommunicationAgent( - "test@server", "password", address="tcp://localhost:5555", bind=False + "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) await agent.setup(max_retries=1) @@ -233,16 +237,16 @@ async def test_setup_creates_socket_and_negotiate_4(monkeypatch): ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() - + fake_pub_socket = AsyncMock() # --- Act --- agent = RICommunicationAgent( - "test@server", "password", address="tcp://localhost:5555", bind=True + "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=True ) await agent.setup() # --- Assert --- fake_socket.bind.assert_any_call("tcp://localhost:5555") - fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": None}) + fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}}) fake_socket.recv_json.assert_awaited() fake_agent_instance.start.assert_awaited() MockCommandAgent.assert_called_once_with( @@ -276,16 +280,16 @@ async def test_setup_creates_socket_and_negotiate_5(monkeypatch): ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value 
fake_agent_instance.start = AsyncMock() - + fake_pub_socket = AsyncMock() # --- Act --- agent = RICommunicationAgent( - "test@server", "password", address="tcp://localhost:5555", bind=False + "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) await agent.setup() # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") - fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": None}) + fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}}) fake_socket.recv_json.assert_awaited() fake_agent_instance.start.assert_awaited() MockCommandAgent.assert_called_once_with( @@ -319,16 +323,16 @@ async def test_setup_creates_socket_and_negotiate_6(monkeypatch): ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() - + fake_pub_socket = AsyncMock() # --- Act --- agent = RICommunicationAgent( - "test@server", "password", address="tcp://localhost:5555", bind=False + "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) await agent.setup() # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") - fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": None}) + fake_socket.send_json.assert_any_call({"endpoint": "negotiate/ports", "data": {}}) fake_socket.recv_json.assert_awaited() fake_agent_instance.start.assert_awaited() MockCommandAgent.assert_called_once_with( @@ -365,11 +369,12 @@ async def test_setup_creates_socket_and_negotiate_7(monkeypatch, caplog): ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() + fake_pub_socket = AsyncMock() # --- Act --- with caplog.at_level("WARNING"): agent = RICommunicationAgent( - "test@server", "password", address="tcp://localhost:5555", bind=False + "test@server", "password", pub_socket=fake_pub_socket, 
address="tcp://localhost:5555", bind=False ) await agent.setup(max_retries=1) @@ -402,11 +407,12 @@ async def test_setup_creates_socket_and_negotiate_timeout(monkeypatch, caplog): ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() - + fake_pub_socket = AsyncMock() + # --- Act --- with caplog.at_level("WARNING"): agent = RICommunicationAgent( - "test@server", "password", address="tcp://localhost:5555", bind=False + "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) await agent.setup(max_retries=1) @@ -426,9 +432,10 @@ async def test_listen_behaviour_ping_correct(caplog): fake_socket = AsyncMock() fake_socket.send_json = AsyncMock() fake_socket.recv_json = AsyncMock(return_value={"endpoint": "ping", "data": {}}) + fake_pub_socket = AsyncMock() # TODO: Integration test between actual server and password needed for spade agents - agent = RICommunicationAgent("test@server", "password") + agent = RICommunicationAgent("test@server", "password", fake_pub_socket) agent.req_socket = fake_socket behaviour = agent.ListenBehaviour() @@ -461,8 +468,9 @@ async def test_listen_behaviour_ping_wrong_endpoint(caplog): ], } ) + fake_pub_socket = AsyncMock() - agent = RICommunicationAgent("test@server", "password") + agent = RICommunicationAgent("test@server", "password", fake_pub_socket) agent.req_socket = fake_socket behaviour = agent.ListenBehaviour() @@ -483,8 +491,9 @@ async def test_listen_behaviour_timeout(caplog): fake_socket.send_json = AsyncMock() # recv_json will never resolve, simulate timeout fake_socket.recv_json = AsyncMock(side_effect=asyncio.TimeoutError) + fake_pub_socket = AsyncMock() - agent = RICommunicationAgent("test@server", "password") + agent = RICommunicationAgent("test@server", "password", fake_pub_socket) agent.req_socket = fake_socket behaviour = agent.ListenBehaviour() @@ -510,8 +519,9 @@ async def 
test_listen_behaviour_ping_no_endpoint(caplog): "data": "I dont have an endpoint >:)", } ) + fake_pub_socket = AsyncMock() - agent = RICommunicationAgent("test@server", "password") + agent = RICommunicationAgent("test@server", "password", fake_pub_socket) agent.req_socket = fake_socket behaviour = agent.ListenBehaviour() @@ -530,15 +540,17 @@ async def test_listen_behaviour_ping_no_endpoint(caplog): async def test_setup_unexpected_exception(monkeypatch, caplog): fake_socket = MagicMock() fake_socket.send_json = AsyncMock() + fake_pub_socket = AsyncMock() # Simulate unexpected exception during recv_json() fake_socket.recv_json = AsyncMock(side_effect=Exception("boom!")) + monkeypatch.setattr( "control_backend.agents.ri_communication_agent.context.socket", lambda _: fake_socket ) agent = RICommunicationAgent( - "test@server", "password", address="tcp://localhost:5555", bind=False + "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) with caplog.at_level("ERROR"): @@ -572,9 +584,10 @@ async def test_setup_unpacking_exception(monkeypatch, caplog): ) as MockCommandAgent: fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() + fake_pub_socket = AsyncMock() agent = RICommunicationAgent( - "test@server", "password", address="tcp://localhost:5555", bind=False + "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False ) # --- Act & Assert --- From af3e4ae56a49d1b60287fa49de000b7ac08bfc7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Thu, 30 Oct 2025 13:07:01 +0100 Subject: [PATCH 06/15] fix: adjusted ping data on ping_stream, and made it so that communication agent is more robust and quick in ping communication. 
ref: N25B-142 --- src/control_backend/agents/ri_communication_agent.py | 3 +++ src/control_backend/api/v1/endpoints/robot.py | 5 +++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/control_backend/agents/ri_communication_agent.py b/src/control_backend/agents/ri_communication_agent.py index f46c623..79e44ed 100644 --- a/src/control_backend/agents/ri_communication_agent.py +++ b/src/control_backend/agents/ri_communication_agent.py @@ -86,6 +86,9 @@ class RICommunicationAgent(Agent): # See what endpoint we received match message["endpoint"]: case "ping": + topic = b"ping" + data = json.dumps(True).encode() + await self.agent._pub_socket.send_multipart([topic, data]) await asyncio.sleep(1) case _: logger.info( diff --git a/src/control_backend/api/v1/endpoints/robot.py b/src/control_backend/api/v1/endpoints/robot.py index aa1b532..b8291ac 100644 --- a/src/control_backend/api/v1/endpoints/robot.py +++ b/src/control_backend/api/v1/endpoints/robot.py @@ -45,7 +45,7 @@ async def ping_stream(request: Request): sub_socket = context.socket(zmq.SUB) sub_socket.connect(settings.zmq_settings.internal_comm_address) sub_socket.setsockopt(zmq.SUBSCRIBE, b"ping") - connected = True + connected = False ping_frequency = 1 # How many seconds between ping attempts @@ -68,7 +68,8 @@ async def ping_stream(request: Request): logger.debug(f"Yielded new connection event in robot ping router: {str(connected)}") - yield f"data: {str(connected)}, time:{str(datetime.datetime.now().strftime("%H:%M:%S"))}\n\n" + falseJson = json.dumps(connected) + yield (f"data: {falseJson}\n\n") return StreamingResponse(event_stream(), media_type="text/event-stream") \ No newline at end of file From 30453be4b20ae910a98a3d467bc68d4ba2ce63b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Thu, 30 Oct 2025 16:41:35 +0100 Subject: [PATCH 07/15] fix: ruff checks is now in order:) ref: N25B-205 --- .../agents/ri_command_agent.py | 3 +- .../agents/ri_communication_agent.py | 36 
+++++---- src/control_backend/api/v1/endpoints/robot.py | 34 ++++----- src/control_backend/api/v1/router.py | 4 +- src/control_backend/main.py | 9 ++- src/control_backend/schemas/ri_message.py | 4 +- .../agents/test_ri_commands_agent.py | 8 +- .../agents/test_ri_communication_agent.py | 75 ++++++++++++++----- .../api/endpoints/test_robot_endpoint.py | 3 +- test/integration/schemas/test_ri_message.py | 25 ++----- 10 files changed, 117 insertions(+), 84 deletions(-) diff --git a/src/control_backend/agents/ri_command_agent.py b/src/control_backend/agents/ri_command_agent.py index 01fc824..51b8064 100644 --- a/src/control_backend/agents/ri_command_agent.py +++ b/src/control_backend/agents/ri_command_agent.py @@ -1,8 +1,9 @@ import json import logging + +import zmq from spade.agent import Agent from spade.behaviour import CyclicBehaviour -import zmq from control_backend.core.config import settings from control_backend.core.zmq_context import context diff --git a/src/control_backend/agents/ri_communication_agent.py b/src/control_backend/agents/ri_communication_agent.py index 79e44ed..0bb369d 100644 --- a/src/control_backend/agents/ri_communication_agent.py +++ b/src/control_backend/agents/ri_communication_agent.py @@ -1,23 +1,21 @@ import asyncio import json import logging -from spade.agent import Agent -from spade.behaviour import CyclicBehaviour + import zmq import zmq.asyncio +from spade.agent import Agent +from spade.behaviour import CyclicBehaviour - +from control_backend.agents.ri_command_agent import RICommandAgent from control_backend.core.config import settings from control_backend.core.zmq_context import context -from control_backend.schemas.ri_message import RIMessage -from control_backend.agents.ri_command_agent import RICommandAgent - logger = logging.getLogger(__name__) class RICommunicationAgent(Agent): - _pub_socket: zmq.asyncio.Socket + _pub_socket: zmq.asyncio.Socket req_socket: zmq.asyncio.Socket | None _address = "" _bind = True @@ -32,7 +30,6 @@ 
class RICommunicationAgent(Agent): verify_security: bool = False, address="tcp://localhost:0000", bind=False, - ): super().__init__(jid, password, port, verify_security) self._address = address @@ -54,9 +51,10 @@ class RICommunicationAgent(Agent): await asyncio.wait_for( self.agent.req_socket.send_json(message), timeout=seconds_to_wait_total / 2 ) - except TimeoutError as e: + except TimeoutError: logger.debug( - f"Waited too long to send message - we probably dont have any receivers... but let's check!" + "Waited too long to send message - " + "we probably dont have any receivers... but let's check!" ) # Wait up to three seconds for a reply:) @@ -67,8 +65,11 @@ class RICommunicationAgent(Agent): ) # We didnt get a reply :( - except TimeoutError as e: - logger.info(f"No ping back retrieved in {seconds_to_wait_total/2} seconds totalling {seconds_to_wait_total} of time, killing myself (or maybe just laying low).") + except TimeoutError: + logger.info( + f"No ping back retrieved in {seconds_to_wait_total / 2} seconds totalling" + f"{seconds_to_wait_total} of time, killing myself (or maybe just laying low)." + ) # TODO: Send event to UI letting know that we've lost connection topic = b"ping" data = json.dumps(False).encode() @@ -95,8 +96,7 @@ class RICommunicationAgent(Agent): "Received message with topic different than ping, while ping expected." ) - - async def setup_req_socket(self, force = False): + async def setup_req_socket(self, force=False): """ Sets up request socket for communication agent. """ @@ -107,7 +107,6 @@ class RICommunicationAgent(Agent): else: self.req_socket.connect(self._address) - async def setup(self, max_retries: int = 5): """ Try to setup the communication agent, we have 5 retries in case we dont have a response yet. 
@@ -116,15 +115,14 @@ class RICommunicationAgent(Agent): # Bind request socket await self.setup_req_socket() - + retries = 0 # Let's try a certain amount of times before failing connection while retries < max_retries: - # Make sure the socket is properly setup. if self.req_socket is None: continue - + # Send our message and receive one back:) message = {"endpoint": "negotiate/ports", "data": {}} await self.req_socket.send_json(message) @@ -132,7 +130,7 @@ class RICommunicationAgent(Agent): try: received_message = await asyncio.wait_for(self.req_socket.recv_json(), timeout=20.0) - except asyncio.TimeoutError: + except TimeoutError: logger.warning( "No connection established in 20 seconds (attempt %d/%d)", retries + 1, diff --git a/src/control_backend/api/v1/endpoints/robot.py b/src/control_backend/api/v1/endpoints/robot.py index b8291ac..e114757 100644 --- a/src/control_backend/api/v1/endpoints/robot.py +++ b/src/control_backend/api/v1/endpoints/robot.py @@ -1,18 +1,15 @@ -from fastapi import APIRouter, Request -from fastapi.responses import JSONResponse, StreamingResponse -import logging import asyncio -import zmq.asyncio import json -import datetime +import logging +import zmq.asyncio +from fastapi import APIRouter, Request +from fastapi.responses import StreamingResponse from zmq.asyncio import Socket -from control_backend.core.zmq_context import context + from control_backend.core.config import settings -from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint - - - +from control_backend.core.zmq_context import context +from control_backend.schemas.ri_message import SpeechCommand logger = logging.getLogger(__name__) @@ -24,7 +21,7 @@ async def receive_command(command: SpeechCommand, request: Request): # Validate and retrieve data. 
SpeechCommand.model_validate(command) topic = b"command" - pub_socket : Socket = request.app.state.internal_comm_socket + pub_socket: Socket = request.app.state.internal_comm_socket pub_socket.send_multipart([topic, command.model_dump_json().encode()]) return {"status": "Command received"} @@ -38,6 +35,7 @@ async def ping(request: Request): @router.get("/ping_stream") async def ping_stream(request: Request): """Stream live updates whenever the device state changes.""" + async def event_stream(): # Set up internal socket to receive ping updates logger.debug("Ping stream router event stream entered.") @@ -47,7 +45,7 @@ async def ping_stream(request: Request): sub_socket.setsockopt(zmq.SUBSCRIBE, b"ping") connected = False - ping_frequency = 1 # How many seconds between ping attempts + ping_frequency = 1 # How many seconds between ping attempts # Even though its most likely the updates should alternate # So, True - False - True - False for connectivity. @@ -55,21 +53,21 @@ async def ping_stream(request: Request): while True: logger.debug("Ping stream entered listening ") try: - topic, body = await asyncio.wait_for(sub_socket.recv_multipart(), timeout=ping_frequency) + topic, body = await asyncio.wait_for( + sub_socket.recv_multipart(), timeout=ping_frequency + ) logger.debug("got ping change in ping_stream router") connected = json.loads(body) - except TimeoutError as e: + except TimeoutError: await asyncio.sleep(0.1) - + # Stop if client disconnected if await request.is_disconnected(): print("Client disconnected from SSE") break - logger.debug(f"Yielded new connection event in robot ping router: {str(connected)}") falseJson = json.dumps(connected) yield (f"data: {falseJson}\n\n") - - return StreamingResponse(event_stream(), media_type="text/event-stream") \ No newline at end of file + return StreamingResponse(event_stream(), media_type="text/event-stream") diff --git a/src/control_backend/api/v1/router.py b/src/control_backend/api/v1/router.py index dca7e27..5c48872 
100644 --- a/src/control_backend/api/v1/router.py +++ b/src/control_backend/api/v1/router.py @@ -1,6 +1,6 @@ from fastapi.routing import APIRouter -from control_backend.api.v1.endpoints import message, sse, robot +from control_backend.api.v1.endpoints import message, robot, sse api_router = APIRouter() @@ -8,4 +8,4 @@ api_router.include_router(message.router, tags=["Messages"]) api_router.include_router(sse.router, tags=["SSE"]) -api_router.include_router(robot.router, prefix="/robot", tags=["Pings", "Commands"]) \ No newline at end of file +api_router.include_router(robot.router, prefix="/robot", tags=["Pings", "Commands"]) diff --git a/src/control_backend/main.py b/src/control_backend/main.py index bd0cc74..a824ab1 100644 --- a/src/control_backend/main.py +++ b/src/control_backend/main.py @@ -8,9 +8,10 @@ import zmq from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware +from control_backend.agents.bdi.bdi_core import BDICoreAgent + # Internal imports from control_backend.agents.ri_communication_agent import RICommunicationAgent -from control_backend.agents.bdi.bdi_core import BDICoreAgent from control_backend.api.v1.router import api_router from control_backend.core.config import settings from control_backend.core.zmq_context import context @@ -32,12 +33,14 @@ async def lifespan(app: FastAPI): # Initiate agents ri_communication_agent = RICommunicationAgent( - jid=settings.agent_settings.ri_communication_agent_name + "@" + settings.agent_settings.host, + jid=settings.agent_settings.ri_communication_agent_name + + "@" + + settings.agent_settings.host, password=settings.agent_settings.ri_communication_agent_name, pub_socket=internal_comm_socket, address="tcp://*:5555", bind=True, - ) + ) await ri_communication_agent.start() bdi_core = BDICoreAgent( diff --git a/src/control_backend/schemas/ri_message.py b/src/control_backend/schemas/ri_message.py index 97b7930..488b823 100644 --- a/src/control_backend/schemas/ri_message.py +++ 
b/src/control_backend/schemas/ri_message.py @@ -1,7 +1,7 @@ from enum import Enum -from typing import Any, Literal +from typing import Any -from pydantic import BaseModel, Field, ValidationError +from pydantic import BaseModel class RIEndpoint(str, Enum): diff --git a/test/integration/agents/test_ri_commands_agent.py b/test/integration/agents/test_ri_commands_agent.py index 219d682..4249401 100644 --- a/test/integration/agents/test_ri_commands_agent.py +++ b/test/integration/agents/test_ri_commands_agent.py @@ -1,10 +1,10 @@ -import asyncio -import zmq import json -import pytest from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import zmq + from control_backend.agents.ri_command_agent import RICommandAgent -from control_backend.schemas.ri_message import SpeechCommand @pytest.mark.asyncio diff --git a/test/integration/agents/test_ri_communication_agent.py b/test/integration/agents/test_ri_communication_agent.py index 9a7cb41..baeb717 100644 --- a/test/integration/agents/test_ri_communication_agent.py +++ b/test/integration/agents/test_ri_communication_agent.py @@ -1,6 +1,8 @@ import asyncio +from unittest.mock import ANY, AsyncMock, MagicMock, patch + import pytest -from unittest.mock import AsyncMock, MagicMock, patch, ANY + from control_backend.agents.ri_communication_agent import RICommunicationAgent @@ -109,7 +111,11 @@ async def test_setup_creates_socket_and_negotiate_1(monkeypatch): # --- Act --- agent = RICommunicationAgent( - "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False + "test@server", + "password", + pub_socket=fake_pub_socket, + address="tcp://localhost:5555", + bind=False, ) await agent.setup() @@ -153,7 +159,11 @@ async def test_setup_creates_socket_and_negotiate_2(monkeypatch): # --- Act --- agent = RICommunicationAgent( - "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False + "test@server", + "password", + pub_socket=fake_pub_socket, 
+ address="tcp://localhost:5555", + bind=False, ) await agent.setup() @@ -189,8 +199,8 @@ async def test_setup_creates_socket_and_negotiate_3(monkeypatch, caplog): # Mock RICommandAgent agent startup - # We are sending wrong negotiation info to the communication agent, so we should retry and expect a - # better response, within a limited time. + # We are sending wrong negotiation info to the communication agent, + # so we should retry and expect a better response, within a limited time. with patch( "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True ) as MockCommandAgent: @@ -200,7 +210,11 @@ async def test_setup_creates_socket_and_negotiate_3(monkeypatch, caplog): # --- Act --- with caplog.at_level("ERROR"): agent = RICommunicationAgent( - "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False + "test@server", + "password", + pub_socket=fake_pub_socket, + address="tcp://localhost:5555", + bind=False, ) await agent.setup(max_retries=1) @@ -240,7 +254,11 @@ async def test_setup_creates_socket_and_negotiate_4(monkeypatch): fake_pub_socket = AsyncMock() # --- Act --- agent = RICommunicationAgent( - "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=True + "test@server", + "password", + pub_socket=fake_pub_socket, + address="tcp://localhost:5555", + bind=True, ) await agent.setup() @@ -283,7 +301,11 @@ async def test_setup_creates_socket_and_negotiate_5(monkeypatch): fake_pub_socket = AsyncMock() # --- Act --- agent = RICommunicationAgent( - "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False + "test@server", + "password", + pub_socket=fake_pub_socket, + address="tcp://localhost:5555", + bind=False, ) await agent.setup() @@ -326,7 +348,11 @@ async def test_setup_creates_socket_and_negotiate_6(monkeypatch): fake_pub_socket = AsyncMock() # --- Act --- agent = RICommunicationAgent( - "test@server", "password", 
pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False + "test@server", + "password", + pub_socket=fake_pub_socket, + address="tcp://localhost:5555", + bind=False, ) await agent.setup() @@ -362,8 +388,8 @@ async def test_setup_creates_socket_and_negotiate_7(monkeypatch, caplog): # Mock RICommandAgent agent startup - # We are sending wrong negotiation info to the communication agent, so we should retry and expect a - # better response, within a limited time. + # We are sending wrong negotiation info to the communication agent, + # so we should retry and expect a etter response, within a limited time. with patch( "control_backend.agents.ri_communication_agent.RICommandAgent", autospec=True ) as MockCommandAgent: @@ -374,7 +400,11 @@ async def test_setup_creates_socket_and_negotiate_7(monkeypatch, caplog): # --- Act --- with caplog.at_level("WARNING"): agent = RICommunicationAgent( - "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False + "test@server", + "password", + pub_socket=fake_pub_socket, + address="tcp://localhost:5555", + bind=False, ) await agent.setup(max_retries=1) @@ -408,11 +438,15 @@ async def test_setup_creates_socket_and_negotiate_timeout(monkeypatch, caplog): fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() fake_pub_socket = AsyncMock() - + # --- Act --- with caplog.at_level("WARNING"): agent = RICommunicationAgent( - "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False + "test@server", + "password", + pub_socket=fake_pub_socket, + address="tcp://localhost:5555", + bind=False, ) await agent.setup(max_retries=1) @@ -544,13 +578,16 @@ async def test_setup_unexpected_exception(monkeypatch, caplog): # Simulate unexpected exception during recv_json() fake_socket.recv_json = AsyncMock(side_effect=Exception("boom!")) - monkeypatch.setattr( "control_backend.agents.ri_communication_agent.context.socket", lambda 
_: fake_socket ) agent = RICommunicationAgent( - "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False + "test@server", + "password", + pub_socket=fake_pub_socket, + address="tcp://localhost:5555", + bind=False, ) with caplog.at_level("ERROR"): @@ -587,7 +624,11 @@ async def test_setup_unpacking_exception(monkeypatch, caplog): fake_pub_socket = AsyncMock() agent = RICommunicationAgent( - "test@server", "password", pub_socket=fake_pub_socket, address="tcp://localhost:5555", bind=False + "test@server", + "password", + pub_socket=fake_pub_socket, + address="tcp://localhost:5555", + bind=False, ) # --- Act & Assert --- diff --git a/test/integration/api/endpoints/test_robot_endpoint.py b/test/integration/api/endpoints/test_robot_endpoint.py index 827fb17..3fd175f 100644 --- a/test/integration/api/endpoints/test_robot_endpoint.py +++ b/test/integration/api/endpoints/test_robot_endpoint.py @@ -1,7 +1,8 @@ +from unittest.mock import MagicMock + import pytest from fastapi import FastAPI from fastapi.testclient import TestClient -from unittest.mock import MagicMock from control_backend.api.v1.endpoints import robot from control_backend.schemas.ri_message import SpeechCommand diff --git a/test/integration/schemas/test_ri_message.py b/test/integration/schemas/test_ri_message.py index aef9ae6..966b582 100644 --- a/test/integration/schemas/test_ri_message.py +++ b/test/integration/schemas/test_ri_message.py @@ -1,7 +1,8 @@ import pytest -from control_backend.schemas.ri_message import RIMessage, RIEndpoint, SpeechCommand from pydantic import ValidationError +from control_backend.schemas.ri_message import RIEndpoint, RIMessage, SpeechCommand + def valid_command_1(): return SpeechCommand(data="Hallo?") @@ -13,24 +14,14 @@ def invalid_command_1(): def test_valid_speech_command_1(): command = valid_command_1() - try: - RIMessage.model_validate(command) - SpeechCommand.model_validate(command) - assert True - except ValidationError: - assert 
False + RIMessage.model_validate(command) + SpeechCommand.model_validate(command) + assert True def test_invalid_speech_command_1(): command = invalid_command_1() - passed_ri_message_validation = False - try: - # Should succeed, still. - RIMessage.model_validate(command) - passed_ri_message_validation = True - - # Should fail. + RIMessage.model_validate(command) + with pytest.raises(ValidationError): SpeechCommand.model_validate(command) - assert False - except ValidationError: - assert passed_ri_message_validation + assert True From ca8b57fec51396a63505faafc76beb777bbd8660 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Wed, 5 Nov 2025 16:59:36 +0100 Subject: [PATCH 08/15] fix: robot pings to router ref: N25B-256 --- src/control_backend/api/v1/endpoints/robot.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/control_backend/api/v1/endpoints/robot.py b/src/control_backend/api/v1/endpoints/robot.py index b2ca053..96e32ac 100644 --- a/src/control_backend/api/v1/endpoints/robot.py +++ b/src/control_backend/api/v1/endpoints/robot.py @@ -47,7 +47,7 @@ async def ping_stream(request: Request): sub_socket.setsockopt(zmq.SUBSCRIBE, b"ping") connected = False - ping_frequency = 1 # How many seconds between ping attempts + ping_frequency = 2 # Even though its most likely the updates should alternate # So, True - False - True - False for connectivity. 
@@ -58,9 +58,10 @@ async def ping_stream(request: Request): topic, body = await asyncio.wait_for( sub_socket.recv_multipart(), timeout=ping_frequency ) - logger.debug("got ping change in ping_stream router") + logger.debug(f"got ping change in ping_stream router: {body}") connected = json.loads(body) except TimeoutError: + logger.debug("got timeout error in ping loop in ping router") await asyncio.sleep(0.1) # Stop if client disconnected @@ -69,7 +70,7 @@ async def ping_stream(request: Request): break logger.debug(f"Yielded new connection event in robot ping router: {str(connected)}") - falseJson = json.dumps(connected) - yield (f"data: {falseJson}\n\n") + connectedJson = json.dumps(connected) + yield (f"data: {connectedJson}\n\n") return StreamingResponse(event_stream(), media_type="text/event-stream") From feb6875a4c3f9a9ef133495a630cf46443785d1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Thu, 6 Nov 2025 14:16:55 +0100 Subject: [PATCH 09/15] fix: make sure that the communication agent reboots properly. ref: N25B-256 --- .../agents/ri_communication_agent.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/control_backend/agents/ri_communication_agent.py b/src/control_backend/agents/ri_communication_agent.py index fe99ad4..8d72c8a 100644 --- a/src/control_backend/agents/ri_communication_agent.py +++ b/src/control_backend/agents/ri_communication_agent.py @@ -39,6 +39,10 @@ class RICommunicationAgent(BaseAgent): """ assert self.agent is not None + if not self.agent.connected: + await asyncio.sleep(1) + return + # We need to listen and sent pings. message = {"endpoint": "ping", "data": {"id": "e.g. 
some reference id"}} seconds_to_wait_total = 1.0 @@ -63,7 +67,13 @@ class RICommunicationAgent(BaseAgent): # We didnt get a reply :( except TimeoutError: - self.agent.logger.info("No ping retrieved in 3 seconds, killing myself.") + self.agent.logger.info( + f"No ping retrieved in {seconds_to_wait_total} seconds, " + "sending UI disconnection event and soft killing myself." + ) + + # Make sure we dont retry receiving messages untill we're setup. + self.agent.connected = False # Tell UI we're disconnected. topic = b"ping" @@ -84,7 +94,7 @@ class RICommunicationAgent(BaseAgent): ) # Try to reboot. - self.agent.setup() + await self.agent.setup() self.agent.logger.debug('Received message "%s"', message) if "endpoint" not in message: @@ -111,12 +121,11 @@ class RICommunicationAgent(BaseAgent): # Bind request socket if self._req_socket is None or force: self._req_socket = Context.instance().socket(zmq.REQ) - if self._bind: # TODO: Should this ever be the case with new architecture? + if self._bind: self._req_socket.bind(self._address) else: self._req_socket.connect(self._address) - # TODO: Check with Kasper if self.pub_socket is None or force: self.pub_socket = Context.instance().socket(zmq.PUB) self.pub_socket.connect(settings.zmq_settings.internal_pub_address) @@ -231,5 +240,7 @@ class RICommunicationAgent(BaseAgent): self.logger.error( "Initial connection ping for router timed out in ri_communication_agent." ) + + # Make sure to start listening now that we're connected. 
self.connected = True self.logger.info("Finished setting up %s", self.jid) From be5dc7f04b2e036a3c9ea8e250153a0de039d8ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Thu, 6 Nov 2025 14:26:02 +0100 Subject: [PATCH 10/15] fix: fixed integration tests due to new change ref: N25B-256 --- test/integration/agents/test_ri_communication_agent.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/integration/agents/test_ri_communication_agent.py b/test/integration/agents/test_ri_communication_agent.py index 33051c8..a82a837 100644 --- a/test/integration/agents/test_ri_communication_agent.py +++ b/test/integration/agents/test_ri_communication_agent.py @@ -428,6 +428,7 @@ async def test_listen_behaviour_ping_correct(caplog): # TODO: Integration test between actual server and password needed for spade agents agent = RICommunicationAgent("test@server", "password") agent._req_socket = fake_socket + agent.connected = True behaviour = agent.ListenBehaviour() agent.add_behaviour(behaviour) @@ -463,6 +464,7 @@ async def test_listen_behaviour_ping_wrong_endpoint(caplog): agent = RICommunicationAgent("test@server", "password", fake_pub_socket) agent._req_socket = fake_socket + agent.connected = True behaviour = agent.ListenBehaviour() agent.add_behaviour(behaviour) @@ -486,6 +488,7 @@ async def test_listen_behaviour_timeout(zmq_context, caplog): agent = RICommunicationAgent("test@server", "password") agent._req_socket = fake_socket + agent.connected = True behaviour = agent.ListenBehaviour() agent.add_behaviour(behaviour) @@ -514,6 +517,7 @@ async def test_listen_behaviour_ping_no_endpoint(caplog): agent = RICommunicationAgent("test@server", "password") agent._req_socket = fake_socket + agent.connected = True behaviour = agent.ListenBehaviour() agent.add_behaviour(behaviour) From 6cc03efdaf911b39763f1e28698fff55a26bf21d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Thu, 6 Nov 2025 14:42:02 +0100 Subject: [PATCH 11/15] feat: new 
integration tests for robot, making sure to get 100% code coverage ref: N25B-256 --- src/control_backend/api/v1/endpoints/robot.py | 1 - .../api/endpoints/test_robot_endpoint.py | 97 ++++++++++++++++++- 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/src/control_backend/api/v1/endpoints/robot.py b/src/control_backend/api/v1/endpoints/robot.py index 96e32ac..dfa7332 100644 --- a/src/control_backend/api/v1/endpoints/robot.py +++ b/src/control_backend/api/v1/endpoints/robot.py @@ -41,7 +41,6 @@ async def ping_stream(request: Request): # Set up internal socket to receive ping updates logger.debug("Ping stream router event stream entered.") - # TODO: Check with Kasper sub_socket = Context.instance().socket(zmq.SUB) sub_socket.connect(settings.zmq_settings.internal_sub_address) sub_socket.setsockopt(zmq.SUBSCRIBE, b"ping") diff --git a/test/integration/api/endpoints/test_robot_endpoint.py b/test/integration/api/endpoints/test_robot_endpoint.py index 3a2df88..0f71951 100644 --- a/test/integration/api/endpoints/test_robot_endpoint.py +++ b/test/integration/api/endpoints/test_robot_endpoint.py @@ -1,4 +1,5 @@ -from unittest.mock import AsyncMock +import json +from unittest.mock import AsyncMock, MagicMock import pytest from fastapi import FastAPI @@ -59,3 +60,97 @@ def test_receive_command_invalid_payload(client): bad_payload = {"invalid": "data"} response = client.post("/command", json=bad_payload) assert response.status_code == 422 # validation error + + +def test_ping_check_returns_none(client): + """Ensure /ping_check returns 200 and None (currently unimplemented).""" + response = client.get("/ping_check") + assert response.status_code == 200 + assert response.json() is None + + +@pytest.mark.asyncio +async def test_ping_stream_yields_ping_event(monkeypatch): + """Test that ping_stream yields a proper SSE message when a ping is received.""" + mock_sub_socket = AsyncMock() + mock_sub_socket.connect = MagicMock() + mock_sub_socket.setsockopt = MagicMock() + 
mock_sub_socket.recv_multipart = AsyncMock(return_value=[b"ping", b"true"]) + + mock_context = MagicMock() + mock_context.socket.return_value = mock_sub_socket + monkeypatch.setattr(robot.Context, "instance", lambda: mock_context) + + mock_request = AsyncMock() + mock_request.is_disconnected = AsyncMock(side_effect=[False, True]) + + response = await robot.ping_stream(mock_request) + generator = aiter(response.body_iterator) + + event = await anext(generator) + event_text = event.decode() if isinstance(event, bytes) else str(event) + assert event_text.strip() == "data: true" + + with pytest.raises(StopAsyncIteration): + await anext(generator) + + mock_sub_socket.connect.assert_called_once() + mock_sub_socket.setsockopt.assert_called_once_with(robot.zmq.SUBSCRIBE, b"ping") + mock_sub_socket.recv_multipart.assert_awaited() + + +@pytest.mark.asyncio +async def test_ping_stream_handles_timeout(monkeypatch): + """Test that ping_stream continues looping on TimeoutError.""" + mock_sub_socket = AsyncMock() + mock_sub_socket.connect = MagicMock() + mock_sub_socket.setsockopt = MagicMock() + mock_sub_socket.recv_multipart.side_effect = TimeoutError() + + mock_context = MagicMock() + mock_context.socket.return_value = mock_sub_socket + monkeypatch.setattr(robot.Context, "instance", lambda: mock_context) + + mock_request = AsyncMock() + mock_request.is_disconnected = AsyncMock(return_value=True) + + response = await robot.ping_stream(mock_request) + generator = aiter(response.body_iterator) + + with pytest.raises(StopAsyncIteration): + await anext(generator) + + mock_sub_socket.connect.assert_called_once() + mock_sub_socket.setsockopt.assert_called_once_with(robot.zmq.SUBSCRIBE, b"ping") + mock_sub_socket.recv_multipart.assert_awaited() + + +@pytest.mark.asyncio +async def test_ping_stream_yields_json_values(monkeypatch): + """Ensure ping_stream correctly parses and yields JSON body values.""" + mock_sub_socket = AsyncMock() + mock_sub_socket.connect = MagicMock() + 
mock_sub_socket.setsockopt = MagicMock() + mock_sub_socket.recv_multipart = AsyncMock( + return_value=[b"ping", json.dumps({"connected": True}).encode()] + ) + + mock_context = MagicMock() + mock_context.socket.return_value = mock_sub_socket + monkeypatch.setattr(robot.Context, "instance", lambda: mock_context) + + mock_request = AsyncMock() + mock_request.is_disconnected = AsyncMock(side_effect=[False, True]) + + response = await robot.ping_stream(mock_request) + generator = aiter(response.body_iterator) + + event = await anext(generator) + event_text = event.decode() if isinstance(event, bytes) else str(event) + + assert "connected" in event_text + assert "true" in event_text + + mock_sub_socket.connect.assert_called_once() + mock_sub_socket.setsockopt.assert_called_once_with(robot.zmq.SUBSCRIBE, b"ping") + mock_sub_socket.recv_multipart.assert_awaited() From 2d1a25e4ae4f97e262006c418cc9a594e47ed516 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Thu, 6 Nov 2025 14:49:54 +0100 Subject: [PATCH 12/15] chore: fixing up logging messages --- src/control_backend/agents/ri_communication_agent.py | 7 ++----- src/control_backend/api/v1/endpoints/robot.py | 3 --- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/src/control_backend/agents/ri_communication_agent.py b/src/control_backend/agents/ri_communication_agent.py index 8d72c8a..960a4b8 100644 --- a/src/control_backend/agents/ri_communication_agent.py +++ b/src/control_backend/agents/ri_communication_agent.py @@ -56,11 +56,8 @@ class RICommunicationAgent(BaseAgent): "we probably dont have any receivers... but let's check!" ) - # Wait up to three seconds for a reply:) + # Wait up to {seconds_to_wait_total/2} seconds for a reply:) try: - self.agent.logger.debug( - f"waiting for message for{seconds_to_wait_total / 2} seconds." 
- ) message = await asyncio.wait_for( self.agent._req_socket.recv_json(), timeout=seconds_to_wait_total / 2 ) @@ -96,7 +93,7 @@ class RICommunicationAgent(BaseAgent): # Try to reboot. await self.agent.setup() - self.agent.logger.debug('Received message "%s"', message) + self.agent.logger.debug(f'Received message "{message}" from RI.') if "endpoint" not in message: self.agent.logger.error("No received endpoint in message, excepted ping endpoint.") return diff --git a/src/control_backend/api/v1/endpoints/robot.py b/src/control_backend/api/v1/endpoints/robot.py index dfa7332..ccc6bd6 100644 --- a/src/control_backend/api/v1/endpoints/robot.py +++ b/src/control_backend/api/v1/endpoints/robot.py @@ -39,7 +39,6 @@ async def ping_stream(request: Request): async def event_stream(): # Set up internal socket to receive ping updates - logger.debug("Ping stream router event stream entered.") sub_socket = Context.instance().socket(zmq.SUB) sub_socket.connect(settings.zmq_settings.internal_sub_address) @@ -52,12 +51,10 @@ async def ping_stream(request: Request): # So, True - False - True - False for connectivity. # Let's still check:) while True: - logger.debug("Ping stream entered listening ") try: topic, body = await asyncio.wait_for( sub_socket.recv_multipart(), timeout=ping_frequency ) - logger.debug(f"got ping change in ping_stream router: {body}") connected = json.loads(body) except TimeoutError: logger.debug("got timeout error in ping loop in ping router") From debc87c0bb4491a1d984fd476e86cb68e655325f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Tue, 11 Nov 2025 10:18:43 +0100 Subject: [PATCH 13/15] fix: Fix up merging request changes and make sure that there are no race condition errors, and UI always gets correct information. 
ref: N25B-256 --- .../agents/ri_communication_agent.py | 47 ++++++---- src/control_backend/api/v1/endpoints/robot.py | 9 +- .../agents/test_ri_communication_agent.py | 86 ++++++++----------- 3 files changed, 67 insertions(+), 75 deletions(-) diff --git a/src/control_backend/agents/ri_communication_agent.py b/src/control_backend/agents/ri_communication_agent.py index 960a4b8..b489338 100644 --- a/src/control_backend/agents/ri_communication_agent.py +++ b/src/control_backend/agents/ri_communication_agent.py @@ -56,28 +56,29 @@ class RICommunicationAgent(BaseAgent): "we probably dont have any receivers... but let's check!" ) - # Wait up to {seconds_to_wait_total/2} seconds for a reply:) + # Wait up to {seconds_to_wait_total/2} seconds for a reply try: message = await asyncio.wait_for( self.agent._req_socket.recv_json(), timeout=seconds_to_wait_total / 2 ) - # We didnt get a reply :( + # We didnt get a reply except TimeoutError: self.agent.logger.info( f"No ping retrieved in {seconds_to_wait_total} seconds, " - "sending UI disconnection event and soft killing myself." + "sending UI disconnection event and attempting to restart." ) # Make sure we dont retry receiving messages untill we're setup. self.agent.connected = False + self.agent.remove_behaviour(self) # Tell UI we're disconnected. topic = b"ping" data = json.dumps(False).encode() if self.agent.pub_socket is None: self.agent.logger.error( - "communication agent pub socket not correctly initialized." + "Communication agent pub socket not correctly initialized." ) else: try: @@ -85,17 +86,20 @@ class RICommunicationAgent(BaseAgent): self.agent.pub_socket.send_multipart([topic, data]), 5 ) except TimeoutError: - self.agent.logger.error( + self.agent.logger.warning( "Initial connection ping for router timed" " out in ri_communication_agent." ) # Try to reboot. 
+ self.agent.logger.debug("Restarting communication agent.") await self.agent.setup() self.agent.logger.debug(f'Received message "{message}" from RI.') if "endpoint" not in message: - self.agent.logger.error("No received endpoint in message, excepted ping endpoint.") + self.agent.logger.warning( + "No received endpoint in message, expected ping endpoint." + ) return # See what endpoint we received @@ -107,7 +111,7 @@ class RICommunicationAgent(BaseAgent): await self.agent.pub_socket.send_multipart([topic, data]) await asyncio.sleep(1) case _: - self.agent.logger.info( + self.agent.logger.debug( "Received message with topic different than ping, while ping expected." ) @@ -143,16 +147,20 @@ class RICommunicationAgent(BaseAgent): if self._req_socket is None: continue - # Send our message and receive one back:) + # Send our message and receive one back message = {"endpoint": "negotiate/ports", "data": {}} await self._req_socket.send_json(message) + retry_frequency = 1.0 try: - received_message = await asyncio.wait_for(self._req_socket.recv_json(), timeout=1.0) + received_message = await asyncio.wait_for( + self._req_socket.recv_json(), timeout=retry_frequency + ) except TimeoutError: self.logger.warning( - "No connection established in 20 seconds (attempt %d/%d)", + "No connection established in %d seconds (attempt %d/%d)", + retries * retry_frequency, retries + 1, max_retries, ) @@ -160,21 +168,21 @@ class RICommunicationAgent(BaseAgent): continue except Exception as e: - self.logger.error("Unexpected error during negotiation: %s", e) + self.logger.warning("Unexpected error during negotiation: %s", e) retries += 1 continue # Validate endpoint endpoint = received_message.get("endpoint") if endpoint != "negotiate/ports": - # TODO: Should this send a message back? 
- self.logger.error( + self.logger.warning( "Invalid endpoint '%s' received (attempt %d/%d)", endpoint, retries + 1, max_retries, ) retries += 1 + await asyncio.sleep(1) continue # At this point, we have a valid response @@ -194,7 +202,7 @@ class RICommunicationAgent(BaseAgent): if addr != self._address: if not bind: self._req_socket.connect(addr) - else: # TODO: Should this ever be the case? + else: self._req_socket.bind(addr) case "actuation": ri_commands_agent = RICommandAgent( @@ -210,31 +218,32 @@ class RICommunicationAgent(BaseAgent): self.logger.warning("Unhandled negotiation id: %s", id) except Exception as e: - self.logger.error("Error unpacking negotiation data: %s", e) + self.logger.warning("Error unpacking negotiation data: %s", e) retries += 1 + await asyncio.sleep(1) continue # setup succeeded break else: - self.logger.error("Failed to set up RICommunicationAgent after %d retries", max_retries) + self.logger.error("Failed to set up %s after %d retries", self.name, max_retries) return # Set up ping behaviour listen_behaviour = self.ListenBehaviour() self.add_behaviour(listen_behaviour) - # Let UI know that we're connected >:) + # Let UI know that we're connected topic = b"ping" data = json.dumps(True).encode() if self.pub_socket is None: - self.logger.error("communication agent pub socket not correctly initialized.") + self.logger.error("Communication agent pub socket not correctly initialized.") else: try: await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5) except TimeoutError: - self.logger.error( + self.logger.warning( "Initial connection ping for router timed out in ri_communication_agent." 
) diff --git a/src/control_backend/api/v1/endpoints/robot.py b/src/control_backend/api/v1/endpoints/robot.py index ccc6bd6..eb67b0e 100644 --- a/src/control_backend/api/v1/endpoints/robot.py +++ b/src/control_backend/api/v1/endpoints/robot.py @@ -21,7 +21,6 @@ async def receive_command(command: SpeechCommand, request: Request): SpeechCommand.model_validate(command) topic = b"command" - # TODO: Check with Kasper pub_socket: Socket = request.app.state.endpoints_pub_socket await pub_socket.send_multipart([topic, command.model_dump_json().encode()]) @@ -48,8 +47,8 @@ async def ping_stream(request: Request): ping_frequency = 2 # Even though its most likely the updates should alternate - # So, True - False - True - False for connectivity. - # Let's still check:) + # (So, True - False - True - False for connectivity), + # let's still check. while True: try: topic, body = await asyncio.wait_for( @@ -58,11 +57,11 @@ async def ping_stream(request: Request): connected = json.loads(body) except TimeoutError: logger.debug("got timeout error in ping loop in ping router") - await asyncio.sleep(0.1) + connected = False # Stop if client disconnected if await request.is_disconnected(): - print("Client disconnected from SSE") + logger.info("Client disconnected from SSE") break logger.debug(f"Yielded new connection event in robot ping router: {str(connected)}") diff --git a/test/integration/agents/test_ri_communication_agent.py b/test/integration/agents/test_ri_communication_agent.py index a82a837..1925afa 100644 --- a/test/integration/agents/test_ri_communication_agent.py +++ b/test/integration/agents/test_ri_communication_agent.py @@ -196,14 +196,14 @@ async def test_setup_creates_socket_and_negotiate_3(zmq_context, caplog): fake_agent_instance = MockCommandAgent.return_value fake_agent_instance.start = AsyncMock() # --- Act --- - with caplog.at_level("ERROR"): - agent = RICommunicationAgent( - "test@server", - "password", - address="tcp://localhost:5555", - bind=False, - ) - await 
agent.setup(max_retries=1) + + agent = RICommunicationAgent( + "test@server", + "password", + address="tcp://localhost:5555", + bind=False, + ) + await agent.setup(max_retries=1) # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") @@ -211,7 +211,6 @@ async def test_setup_creates_socket_and_negotiate_3(zmq_context, caplog): # Since it failed, there should not be any command agent. fake_agent_instance.start.assert_not_awaited() - assert "Failed to set up RICommunicationAgent" in caplog.text # Ensure the agent did not attach a ListenBehaviour assert not any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours) @@ -362,14 +361,14 @@ async def test_setup_creates_socket_and_negotiate_7(zmq_context, caplog): fake_agent_instance.start = AsyncMock() # --- Act --- - with caplog.at_level("WARNING"): - agent = RICommunicationAgent( - "test@server", - "password", - address="tcp://localhost:5555", - bind=False, - ) - await agent.setup(max_retries=1) + + agent = RICommunicationAgent( + "test@server", + "password", + address="tcp://localhost:5555", + bind=False, + ) + await agent.setup(max_retries=1) # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") @@ -377,7 +376,6 @@ async def test_setup_creates_socket_and_negotiate_7(zmq_context, caplog): # Since it failed, there should not be any command agent. 
fake_agent_instance.start.assert_not_awaited() - assert "Unhandled negotiation id:" in caplog.text @pytest.mark.asyncio @@ -398,21 +396,20 @@ async def test_setup_creates_socket_and_negotiate_timeout(zmq_context, caplog): fake_agent_instance.start = AsyncMock() # --- Act --- - with caplog.at_level("WARNING"): - agent = RICommunicationAgent( - "test@server", - "password", - address="tcp://localhost:5555", - bind=False, - ) - await agent.setup(max_retries=1) + + agent = RICommunicationAgent( + "test@server", + "password", + address="tcp://localhost:5555", + bind=False, + ) + await agent.setup(max_retries=1) # --- Assert --- fake_socket.connect.assert_any_call("tcp://localhost:5555") # Since it failed, there should not be any command agent. fake_agent_instance.start.assert_not_awaited() - assert "No connection established in 20 seconds" in caplog.text # Ensure the agent did not attach a ListenBehaviour assert not any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours) @@ -425,7 +422,6 @@ async def test_listen_behaviour_ping_correct(caplog): fake_socket.recv_json = AsyncMock(return_value={"endpoint": "ping", "data": {}}) fake_socket.send_multipart = AsyncMock() - # TODO: Integration test between actual server and password needed for spade agents agent = RICommunicationAgent("test@server", "password") agent._req_socket = fake_socket agent.connected = True @@ -433,13 +429,10 @@ async def test_listen_behaviour_ping_correct(caplog): behaviour = agent.ListenBehaviour() agent.add_behaviour(behaviour) - # Run once (CyclicBehaviour normally loops) - with caplog.at_level("DEBUG"): - await behaviour.run() + await behaviour.run() fake_socket.send_json.assert_awaited() fake_socket.recv_json.assert_awaited() - assert "Received message" in caplog.text @pytest.mark.asyncio @@ -470,10 +463,9 @@ async def test_listen_behaviour_ping_wrong_endpoint(caplog): agent.add_behaviour(behaviour) # Run once (CyclicBehaviour normally loops) - with caplog.at_level("INFO"): - await 
behaviour.run() - assert "Received message with topic different than ping, while ping expected." in caplog.text + await behaviour.run() + fake_socket.send_json.assert_awaited() fake_socket.recv_json.assert_awaited() @@ -493,10 +485,9 @@ async def test_listen_behaviour_timeout(zmq_context, caplog): behaviour = agent.ListenBehaviour() agent.add_behaviour(behaviour) - with caplog.at_level("INFO"): - await behaviour.run() - - assert "No ping" in caplog.text + await behaviour.run() + assert not any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours) + assert not agent.connected @pytest.mark.asyncio @@ -522,11 +513,8 @@ async def test_listen_behaviour_ping_no_endpoint(caplog): behaviour = agent.ListenBehaviour() agent.add_behaviour(behaviour) - # Run once (CyclicBehaviour normally loops) - with caplog.at_level("ERROR"): - await behaviour.run() + await behaviour.run() - assert "No received endpoint in message, excepted ping endpoint." in caplog.text fake_socket.send_json.assert_awaited() fake_socket.recv_json.assert_awaited() @@ -546,11 +534,10 @@ async def test_setup_unexpected_exception(zmq_context, caplog): bind=False, ) - with caplog.at_level("ERROR"): - await agent.setup(max_retries=1) + await agent.setup(max_retries=1) - # Ensure that the error was logged - assert "Unexpected error during negotiation: boom!" 
in caplog.text + assert not any(isinstance(b, agent.ListenBehaviour) for b in agent.behaviours) + assert not agent.connected @pytest.mark.asyncio @@ -582,11 +569,8 @@ async def test_setup_unpacking_exception(zmq_context, caplog): ) # --- Act & Assert --- - with caplog.at_level("ERROR"): - await agent.setup(max_retries=1) - # Ensure the unpacking exception was logged - assert "Error unpacking negotiation data" in caplog.text + await agent.setup(max_retries=1) # Ensure no command agent was started fake_agent_instance.start.assert_not_awaited() From 41993a902b19dea4e899f6b6194f31b4cf02d4f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Mon, 17 Nov 2025 16:04:54 +0100 Subject: [PATCH 14/15] chore: remove caplog from test cases --- .../agents/test_ri_commands_agent.py | 6 ++---- .../agents/test_ri_communication_agent.py | 18 +++++++++--------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/test/integration/agents/test_ri_commands_agent.py b/test/integration/agents/test_ri_commands_agent.py index ea9fca9..477ab78 100644 --- a/test/integration/agents/test_ri_commands_agent.py +++ b/test/integration/agents/test_ri_commands_agent.py @@ -73,7 +73,7 @@ async def test_send_commands_behaviour_valid_message(): @pytest.mark.asyncio -async def test_send_commands_behaviour_invalid_message(caplog): +async def test_send_commands_behaviour_invalid_message(): """Test behaviour with invalid JSON message triggers error logging""" fake_socket = AsyncMock() fake_socket.recv_multipart = AsyncMock(return_value=(b"command", b"{invalid_json}")) @@ -86,9 +86,7 @@ async def test_send_commands_behaviour_invalid_message(caplog): behaviour = agent.SendCommandsBehaviour() behaviour.agent = agent - with caplog.at_level("ERROR"): - await behaviour.run() + await behaviour.run() fake_socket.recv_multipart.assert_awaited() fake_socket.send_json.assert_not_awaited() - assert "Error processing message" in caplog.text diff --git 
a/test/integration/agents/test_ri_communication_agent.py b/test/integration/agents/test_ri_communication_agent.py index 1925afa..6e29340 100644 --- a/test/integration/agents/test_ri_communication_agent.py +++ b/test/integration/agents/test_ri_communication_agent.py @@ -176,7 +176,7 @@ async def test_setup_creates_socket_and_negotiate_2(zmq_context): @pytest.mark.asyncio -async def test_setup_creates_socket_and_negotiate_3(zmq_context, caplog): +async def test_setup_creates_socket_and_negotiate_3(zmq_context): """ Test the functionality of setup with incorrect negotiation message """ @@ -340,7 +340,7 @@ async def test_setup_creates_socket_and_negotiate_6(zmq_context): @pytest.mark.asyncio -async def test_setup_creates_socket_and_negotiate_7(zmq_context, caplog): +async def test_setup_creates_socket_and_negotiate_7(zmq_context): """ Test the functionality of setup with incorrect id """ @@ -379,7 +379,7 @@ async def test_setup_creates_socket_and_negotiate_7(zmq_context, caplog): @pytest.mark.asyncio -async def test_setup_creates_socket_and_negotiate_timeout(zmq_context, caplog): +async def test_setup_creates_socket_and_negotiate_timeout(zmq_context): """ Test the functionality of setup with incorrect negotiation message """ @@ -416,7 +416,7 @@ async def test_setup_creates_socket_and_negotiate_timeout(zmq_context, caplog): @pytest.mark.asyncio -async def test_listen_behaviour_ping_correct(caplog): +async def test_listen_behaviour_ping_correct(): fake_socket = AsyncMock() fake_socket.send_json = AsyncMock() fake_socket.recv_json = AsyncMock(return_value={"endpoint": "ping", "data": {}}) @@ -436,7 +436,7 @@ async def test_listen_behaviour_ping_correct(caplog): @pytest.mark.asyncio -async def test_listen_behaviour_ping_wrong_endpoint(caplog): +async def test_listen_behaviour_ping_wrong_endpoint(): """ Test if our listen behaviour can work with wrong messages (wrong endpoint) """ @@ -471,7 +471,7 @@ async def test_listen_behaviour_ping_wrong_endpoint(caplog): 
@pytest.mark.asyncio -async def test_listen_behaviour_timeout(zmq_context, caplog): +async def test_listen_behaviour_timeout(zmq_context): fake_socket = zmq_context.return_value.socket.return_value fake_socket.send_json = AsyncMock() # recv_json will never resolve, simulate timeout @@ -491,7 +491,7 @@ async def test_listen_behaviour_timeout(zmq_context, caplog): @pytest.mark.asyncio -async def test_listen_behaviour_ping_no_endpoint(caplog): +async def test_listen_behaviour_ping_no_endpoint(): """ Test if our listen behaviour can work with wrong messages (wrong endpoint) """ @@ -520,7 +520,7 @@ async def test_listen_behaviour_ping_no_endpoint(caplog): @pytest.mark.asyncio -async def test_setup_unexpected_exception(zmq_context, caplog): +async def test_setup_unexpected_exception(zmq_context): fake_socket = zmq_context.return_value.socket.return_value fake_socket.send_json = AsyncMock() # Simulate unexpected exception during recv_json() @@ -541,7 +541,7 @@ async def test_setup_unexpected_exception(zmq_context, caplog): @pytest.mark.asyncio -async def test_setup_unpacking_exception(zmq_context, caplog): +async def test_setup_unpacking_exception(zmq_context): # --- Arrange --- fake_socket = zmq_context.return_value.socket.return_value fake_socket.send_json = AsyncMock() From 2eefcc4553a73a5c8cd8c6772441d13e73571488 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Mon, 17 Nov 2025 16:09:02 +0100 Subject: [PATCH 15/15] chore: fix error messages to be warnings. 
--- src/control_backend/agents/ri_communication_agent.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/control_backend/agents/ri_communication_agent.py b/src/control_backend/agents/ri_communication_agent.py index b489338..93fbf6c 100644 --- a/src/control_backend/agents/ri_communication_agent.py +++ b/src/control_backend/agents/ri_communication_agent.py @@ -77,7 +77,7 @@ class RICommunicationAgent(BaseAgent): topic = b"ping" data = json.dumps(False).encode() if self.agent.pub_socket is None: - self.agent.logger.error( + self.agent.logger.warning( "Communication agent pub socket not correctly initialized." ) else: @@ -227,7 +227,7 @@ class RICommunicationAgent(BaseAgent): break else: - self.logger.error("Failed to set up %s after %d retries", self.name, max_retries) + self.logger.warning("Failed to set up %s after %d retries", self.name, max_retries) return # Set up ping behaviour @@ -238,7 +238,7 @@ class RICommunicationAgent(BaseAgent): topic = b"ping" data = json.dumps(True).encode() if self.pub_socket is None: - self.logger.error("Communication agent pub socket not correctly initialized.") + self.logger.warning("Communication agent pub socket not correctly initialized.") else: try: await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5)