import asyncio import json import logging from spade.agent import Agent from spade.behaviour import CyclicBehaviour import zmq from control_backend.core.config import settings from control_backend.core.zmq_context import context from control_backend.schemas.message import Message from control_backend.agents.ri_command_agent import RICommandAgent logger = logging.getLogger(__name__) class RICommunicationAgent(Agent): req_socket: zmq.Socket _address = "" _bind = True def __init__( self, jid: str, password: str, port: int = 5222, verify_security: bool = False, address="tcp://localhost:0000", bind=False, ): super().__init__(jid, password, port, verify_security) self._address = address self._bind = bind class ListenBehaviour(CyclicBehaviour): async def run(self): """ Run the listening (ping) loop indefinetely. """ assert self.agent is not None # We need to listen and sent pings. message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}} await self.agent.req_socket.send_json(message) # Wait up to three seconds for a reply:) try: message = await asyncio.wait_for(self.agent.req_socket.recv_json(), timeout=3.0) # We didnt get a reply :( except asyncio.TimeoutError as e: logger.info("No ping retrieved in 3 seconds, killing myself.") self.kill() logger.debug('Received message "%s"', message) if "endpoint" not in message: logger.error("No received endpoint in message, excepted ping endpoint.") return # See what endpoint we received match message["endpoint"]: case "ping": await asyncio.sleep(1) case _: logger.info( "Received message with topic different than ping, while ping expected." ) async def setup(self, max_retries: int = 5): """ Try to setup the communication agent, we have 5 retries in case we dont have a response yet. """ logger.info("Setting up %s", self.jid) retries = 0 # Let's try a certain amount of times before failing connection while retries < max_retries: # Bind request socket self.req_socket = context.socket(zmq.REQ) if self._bind: self.req_socket.bind(self._address) else: self.req_socket.connect(self._address) # Send our message and receive one back:) message = {"endpoint": "negotiate/ports", "data": None} await self.req_socket.send_json(message) try: received_message = await asyncio.wait_for(self.req_socket.recv_json(), timeout=20.0) except asyncio.TimeoutError: logger.warning( "No connection established in 20 seconds (attempt %d/%d)", retries + 1, max_retries, ) retries += 1 continue except Exception as e: logger.error("Unexpected error during negotiation: %s", e) retries += 1 continue # Validate endpoint endpoint = received_message.get("endpoint") if endpoint != "negotiate/ports": # TODO: Should this send a message back? logger.error( "Invalid endpoint '%s' received (attempt %d/%d)", endpoint, retries + 1, max_retries, ) retries += 1 continue # At this point, we have a valid response try: for port_data in received_message["data"]: id = port_data["id"] port = port_data["port"] bind = port_data["bind"] if not bind: addr = f"tcp://localhost:{port}" else: addr = f"tcp://*:{port}" match id: case "main": if addr != self._address: if not bind: self.req_socket.connect(addr) else: self.req_socket.bind(addr) case "actuation": ri_commands_agent = RICommandAgent( settings.agent_settings.ri_command_agent_name + "@" + settings.agent_settings.host, settings.agent_settings.ri_command_agent_name, address=addr, bind=bind, ) await ri_commands_agent.start() case _: logger.warning("Unhandled negotiation id: %s", id) except Exception as e: logger.error("Error unpacking negotiation data: %s", e) retries += 1 continue # setup succeeded break else: logger.error("Failed to set up RICommunicationAgent after %d retries", max_retries) return # Set up ping behaviour listen_behaviour = self.ListenBehaviour() self.add_behaviour(listen_behaviour) logger.info("Finished setting up %s", self.jid)