import asyncio import json import zmq.asyncio from spade.behaviour import CyclicBehaviour from zmq.asyncio import Context from control_backend.agents import BaseAgent from control_backend.core.config import settings from .ri_command_agent import RICommandAgent class RICommunicationAgent(BaseAgent): req_socket: zmq.Socket _address = "" _bind = True connected = False def __init__( self, jid: str, password: str, port: int = 5222, verify_security: bool = False, address="tcp://localhost:0000", bind=False, ): super().__init__(jid, password, port, verify_security) self._address = address self._bind = bind self._req_socket: zmq.asyncio.Socket | None = None self.pub_socket: zmq.asyncio.Socket | None = None class ListenBehaviour(CyclicBehaviour): async def run(self): """ Run the listening (ping) loop indefinetely. """ assert self.agent is not None if not self.agent.connected: await asyncio.sleep(1) return # We need to listen and sent pings. message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}} seconds_to_wait_total = 1.0 try: await asyncio.wait_for( self.agent._req_socket.send_json(message), timeout=seconds_to_wait_total / 2 ) except TimeoutError: self.agent.logger.debug( "Waited too long to send message - " "we probably dont have any receivers... but let's check!" ) # Wait up to {seconds_to_wait_total/2} seconds for a reply try: message = await asyncio.wait_for( self.agent._req_socket.recv_json(), timeout=seconds_to_wait_total / 2 ) # We didnt get a reply except TimeoutError: self.agent.logger.info( f"No ping retrieved in {seconds_to_wait_total} seconds, " "sending UI disconnection event and attempting to restart." ) # Make sure we dont retry receiving messages untill we're setup. self.agent.connected = False self.agent.remove_behaviour(self) # Tell UI we're disconnected. topic = b"ping" data = json.dumps(False).encode() if self.agent.pub_socket is None: self.agent.logger.warning( "Communication agent pub socket not correctly initialized." ) else: try: await asyncio.wait_for( self.agent.pub_socket.send_multipart([topic, data]), 5 ) except TimeoutError: self.agent.logger.warning( "Initial connection ping for router timed" " out in ri_communication_agent." ) # Try to reboot. self.agent.logger.debug("Restarting communication agent.") await self.agent.setup() self.agent.logger.debug(f'Received message "{message}" from RI.') if "endpoint" not in message: self.agent.logger.warning( "No received endpoint in message, expected ping endpoint." ) return # See what endpoint we received match message["endpoint"]: case "ping": topic = b"ping" data = json.dumps(True).encode() if self.agent.pub_socket is not None: await self.agent.pub_socket.send_multipart([topic, data]) await asyncio.sleep(1) case _: self.agent.logger.debug( "Received message with topic different than ping, while ping expected." ) async def setup_sockets(self, force=False): """ Sets up request socket for communication agent. """ # Bind request socket if self._req_socket is None or force: self._req_socket = Context.instance().socket(zmq.REQ) if self._bind: self._req_socket.bind(self._address) else: self._req_socket.connect(self._address) if self.pub_socket is None or force: self.pub_socket = Context.instance().socket(zmq.PUB) self.pub_socket.connect(settings.zmq_settings.internal_pub_address) async def setup(self, max_retries: int = 100): """ Try to setup the communication agent, we have 5 retries in case we dont have a response yet. """ self.logger.info("Setting up %s", self.jid) # Bind request socket await self.setup_sockets() retries = 0 # Let's try a certain amount of times before failing connection while retries < max_retries: # Make sure the socket is properly setup. if self._req_socket is None: continue # Send our message and receive one back message = {"endpoint": "negotiate/ports", "data": {}} await self._req_socket.send_json(message) retry_frequency = 1.0 try: received_message = await asyncio.wait_for( self._req_socket.recv_json(), timeout=retry_frequency ) except TimeoutError: self.logger.warning( "No connection established in %d seconds (attempt %d/%d)", retries * retry_frequency, retries + 1, max_retries, ) retries += 1 continue except Exception as e: self.logger.warning("Unexpected error during negotiation: %s", e) retries += 1 continue # Validate endpoint endpoint = received_message.get("endpoint") if endpoint != "negotiate/ports": self.logger.warning( "Invalid endpoint '%s' received (attempt %d/%d)", endpoint, retries + 1, max_retries, ) retries += 1 await asyncio.sleep(1) continue # At this point, we have a valid response try: for port_data in received_message["data"]: id = port_data["id"] port = port_data["port"] bind = port_data["bind"] if not bind: addr = f"tcp://localhost:{port}" else: addr = f"tcp://*:{port}" match id: case "main": if addr != self._address: if not bind: self._req_socket.connect(addr) else: self._req_socket.bind(addr) case "actuation": ri_commands_agent = RICommandAgent( settings.agent_settings.ri_command_agent_name + "@" + settings.agent_settings.host, settings.agent_settings.ri_command_agent_name, address=addr, bind=bind, ) await ri_commands_agent.start() case _: self.logger.warning("Unhandled negotiation id: %s", id) except Exception as e: self.logger.warning("Error unpacking negotiation data: %s", e) retries += 1 await asyncio.sleep(1) continue # setup succeeded break else: self.logger.warning("Failed to set up %s after %d retries", self.name, max_retries) return # Set up ping behaviour listen_behaviour = self.ListenBehaviour() self.add_behaviour(listen_behaviour) # Let UI know that we're connected topic = b"ping" data = json.dumps(True).encode() if self.pub_socket is None: self.logger.warning("Communication agent pub socket not correctly initialized.") else: try: await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5) except TimeoutError: self.logger.warning( "Initial connection ping for router timed out in ri_communication_agent." ) # Make sure to start listening now that we're connected. self.connected = True self.logger.info("Finished setting up %s", self.jid)