From 4f2d45fb44c17fe4575a10261f605c55c314c36c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Wed, 29 Oct 2025 21:55:23 +0100 Subject: [PATCH] feat: fixed socket typing for communication agent and ping router- automatically try to reconnect with robot. ref: N25B-151 --- .../agents/ri_communication_agent.py | 58 ++++++++++++------- src/control_backend/api/v1/endpoints/robot.py | 10 ++-- src/control_backend/main.py | 7 ++- 3 files changed, 47 insertions(+), 28 deletions(-) diff --git a/src/control_backend/agents/ri_communication_agent.py b/src/control_backend/agents/ri_communication_agent.py index 4da2c69..f46c623 100644 --- a/src/control_backend/agents/ri_communication_agent.py +++ b/src/control_backend/agents/ri_communication_agent.py @@ -4,6 +4,7 @@ import logging from spade.agent import Agent from spade.behaviour import CyclicBehaviour import zmq +import zmq.asyncio from control_backend.core.config import settings @@ -16,7 +17,8 @@ logger = logging.getLogger(__name__) class RICommunicationAgent(Agent): - req_socket: zmq.Socket + _pub_socket: zmq.asyncio.Socket + req_socket: zmq.asyncio.Socket | None _address = "" _bind = True connected = False @@ -25,14 +27,18 @@ class RICommunicationAgent(Agent): self, jid: str, password: str, + pub_socket: zmq.asyncio.Socket, port: int = 5222, verify_security: bool = False, address="tcp://localhost:0000", bind=False, + ): super().__init__(jid, password, port, verify_security) self._address = address self._bind = bind + self.req_socket = None + self._pub_socket = pub_socket class ListenBehaviour(CyclicBehaviour): async def run(self): @@ -43,7 +49,7 @@ class RICommunicationAgent(Agent): # We need to listen and sent pings. message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}} - seconds_to_wait_total = 4.0 + seconds_to_wait_total = 1.0 try: await asyncio.wait_for( self.agent.req_socket.send_json(message), timeout=seconds_to_wait_total / 2 @@ -62,16 +68,12 @@ class RICommunicationAgent(Agent): # We didnt get a reply :( except TimeoutError as e: - logger.info(f"No ping back retrieved in {seconds_to_wait_total/2} seconds totalling {seconds_to_wait_total} of time, killing myself.") - self.agent.connected = False + logger.info(f"No ping back retrieved in {seconds_to_wait_total/2} seconds totalling {seconds_to_wait_total} of time, killing myself (or maybe just laying low).") # TODO: Send event to UI letting know that we've lost connection topic = b"ping" data = json.dumps(False).encode() - pub_socket = context.socket(zmq.PUB) - pub_socket.connect(settings.zmq_settings.internal_comm_address) - pub_socket.send_multipart([topic, data]) - - self.kill() + self.agent._pub_socket.send_multipart([topic, data]) + await self.agent.setup() except Exception as e: logger.debug(f"Differennt exception: {e}") @@ -90,25 +92,38 @@ class RICommunicationAgent(Agent): "Received message with topic different than ping, while ping expected." ) - async def setup(self, max_retries: int = 5): - """ - Try to setup the communication agent, we have 5 retries in case we dont have a response yet. - """ - logger.info("Setting up %s", self.jid) - retries = 0 - - # Let's try a certain amount of times before failing connection - while retries < max_retries: - # Bind request socket + async def setup_req_socket(self, force = False): + """ + Sets up request socket for communication agent. + """ + if self.req_socket is None or force: self.req_socket = context.socket(zmq.REQ) if self._bind: self.req_socket.bind(self._address) else: self.req_socket.connect(self._address) + + async def setup(self, max_retries: int = 5): + """ + Try to setup the communication agent, we have 5 retries in case we dont have a response yet. + """ + logger.info("Setting up %s", self.jid) + + # Bind request socket + await self.setup_req_socket() + + retries = 0 + # Let's try a certain amount of times before failing connection + while retries < max_retries: + + # Make sure the socket is properly setup. + if self.req_socket is None: + continue + # Send our message and receive one back:) - message = {"endpoint": "negotiate/ports", "data": None} + message = {"endpoint": "negotiate/ports", "data": {}} await self.req_socket.send_json(message) try: @@ -190,5 +205,8 @@ class RICommunicationAgent(Agent): self.add_behaviour(listen_behaviour) # TODO: Let UI know that we're connected >:) + topic = b"ping" + data = json.dumps(True).encode() + await self._pub_socket.send_multipart([topic, data]) self.connected = True logger.info("Finished setting up %s", self.jid) diff --git a/src/control_backend/api/v1/endpoints/robot.py b/src/control_backend/api/v1/endpoints/robot.py index 1d0da9c..aa1b532 100644 --- a/src/control_backend/api/v1/endpoints/robot.py +++ b/src/control_backend/api/v1/endpoints/robot.py @@ -6,7 +6,7 @@ import zmq.asyncio import json import datetime -from zmq import Socket +from zmq.asyncio import Socket from control_backend.core.zmq_context import context from control_backend.core.config import settings from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint @@ -24,7 +24,7 @@ async def receive_command(command: SpeechCommand, request: Request): # Validate and retrieve data. SpeechCommand.model_validate(command) topic = b"command" - pub_socket: Socket = request.app.state.internal_comm_socket + pub_socket : Socket = request.app.state.internal_comm_socket pub_socket.send_multipart([topic, command.model_dump_json().encode()]) return {"status": "Command received"} @@ -41,7 +41,8 @@ async def ping_stream(request: Request): async def event_stream(): # Set up internal socket to receive ping updates logger.debug("Ping stream router event stream entered.") - sub_socket = zmq.asyncio.Context().socket(zmq.SUB) + + sub_socket = context.socket(zmq.SUB) sub_socket.connect(settings.zmq_settings.internal_comm_address) sub_socket.setsockopt(zmq.SUBSCRIBE, b"ping") connected = True @@ -69,6 +70,5 @@ async def ping_stream(request: Request): logger.debug(f"Yielded new connection event in robot ping router: {str(connected)}") yield f"data: {str(connected)}, time:{str(datetime.datetime.now().strftime("%H:%M:%S"))}\n\n" - - + return StreamingResponse(event_stream(), media_type="text/event-stream") \ No newline at end of file diff --git a/src/control_backend/main.py b/src/control_backend/main.py index e398552..bd0cc74 100644 --- a/src/control_backend/main.py +++ b/src/control_backend/main.py @@ -32,11 +32,12 @@ async def lifespan(app: FastAPI): # Initiate agents ri_communication_agent = RICommunicationAgent( - settings.agent_settings.ri_communication_agent_name + "@" + settings.agent_settings.host, - settings.agent_settings.ri_communication_agent_name, + jid=settings.agent_settings.ri_communication_agent_name + "@" + settings.agent_settings.host, + password=settings.agent_settings.ri_communication_agent_name, + pub_socket=internal_comm_socket, address="tcp://*:5555", bind=True, - ) + ) await ri_communication_agent.start() bdi_core = BDICoreAgent(