Merged the latest changes from dev into this branch to stay up to date with current development. ref: N25B-236
165 lines
5.8 KiB
Python
165 lines
5.8 KiB
Python
import asyncio
|
|
import logging
|
|
|
|
import zmq
|
|
from spade.agent import Agent
|
|
from spade.behaviour import CyclicBehaviour
|
|
from zmq.asyncio import Context
|
|
|
|
from control_backend.agents.ri_command_agent import RICommandAgent
|
|
from control_backend.core.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RICommunicationAgent(Agent):
|
|
req_socket: zmq.Socket
|
|
_address = ""
|
|
_bind = True
|
|
|
|
def __init__(
|
|
self,
|
|
jid: str,
|
|
password: str,
|
|
port: int = settings.agent_settings.default_spade_port,
|
|
verify_security: bool = False,
|
|
address=settings.zmq_settings.ri_command_address,
|
|
bind=False,
|
|
):
|
|
super().__init__(jid, password, port, verify_security)
|
|
self._address = address
|
|
self._bind = bind
|
|
|
|
class ListenBehaviour(CyclicBehaviour):
|
|
async def run(self):
|
|
"""
|
|
Run the listening (ping) loop indefinetely.
|
|
"""
|
|
assert self.agent is not None
|
|
|
|
# We need to listen and sent pings.
|
|
message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}}
|
|
await self.agent.req_socket.send_json(message)
|
|
|
|
# Wait up to three seconds for a reply:)
|
|
try:
|
|
message = await asyncio.wait_for(self.agent.req_socket.recv_json(), timeout=3.0)
|
|
|
|
# We didnt get a reply :(
|
|
except TimeoutError:
|
|
logger.info("No ping retrieved in 3 seconds, killing myself.")
|
|
self.kill()
|
|
|
|
logger.debug('Received message "%s"', message)
|
|
if "endpoint" not in message:
|
|
logger.error("No received endpoint in message, excepted ping endpoint.")
|
|
return
|
|
|
|
# See what endpoint we received
|
|
match message["endpoint"]:
|
|
case "ping":
|
|
await asyncio.sleep(settings.agent_settings.behaviour_settings.ping_sleep_s)
|
|
case _:
|
|
logger.info(
|
|
"Received message with topic different than ping, while ping expected."
|
|
)
|
|
|
|
async def setup(self, max_retries: int = settings.behaviour_settings.comm_setup_max_retries):
|
|
"""
|
|
Try to setup the communication agent, we have 5 retries in case we dont have a response yet.
|
|
"""
|
|
logger.info("Setting up %s", self.jid)
|
|
retries = 0
|
|
|
|
# Let's try a certain amount of times before failing connection
|
|
while retries < max_retries:
|
|
# Bind request socket
|
|
self.req_socket = Context.instance().socket(zmq.REQ)
|
|
if self._bind:
|
|
self.req_socket.bind(self._address)
|
|
else:
|
|
self.req_socket.connect(self._address)
|
|
|
|
# Send our message and receive one back:)
|
|
message = {"endpoint": "negotiate/ports", "data": None}
|
|
await self.req_socket.send_json(message)
|
|
|
|
try:
|
|
received_message = await asyncio.wait_for(self.req_socket.recv_json(), timeout=20.0)
|
|
|
|
except TimeoutError:
|
|
logger.warning(
|
|
"No connection established in 20 seconds (attempt %d/%d)",
|
|
retries + 1,
|
|
max_retries,
|
|
)
|
|
retries += 1
|
|
continue
|
|
|
|
except Exception as e:
|
|
logger.error("Unexpected error during negotiation: %s", e)
|
|
retries += 1
|
|
continue
|
|
|
|
# Validate endpoint
|
|
endpoint = received_message.get("endpoint")
|
|
if endpoint != "negotiate/ports":
|
|
# TODO: Should this send a message back?
|
|
logger.error(
|
|
"Invalid endpoint '%s' received (attempt %d/%d)",
|
|
endpoint,
|
|
retries + 1,
|
|
max_retries,
|
|
)
|
|
retries += 1
|
|
continue
|
|
|
|
# At this point, we have a valid response
|
|
try:
|
|
for port_data in received_message["data"]:
|
|
id = port_data["id"]
|
|
port = port_data["port"]
|
|
bind = port_data["bind"]
|
|
|
|
if not bind:
|
|
addr = f"tcp://localhost:{port}"
|
|
else:
|
|
addr = f"tcp://*:{port}"
|
|
|
|
match id:
|
|
case "main":
|
|
if addr != self._address:
|
|
if not bind:
|
|
self.req_socket.connect(addr)
|
|
else:
|
|
self.req_socket.bind(addr)
|
|
case "actuation":
|
|
ri_commands_agent = RICommandAgent(
|
|
settings.agent_settings.ri_command_agent_name
|
|
+ "@"
|
|
+ settings.agent_settings.host,
|
|
settings.agent_settings.ri_command_agent_name,
|
|
address=addr,
|
|
bind=bind,
|
|
)
|
|
await ri_commands_agent.start()
|
|
case _:
|
|
logger.warning("Unhandled negotiation id: %s", id)
|
|
|
|
except Exception as e:
|
|
logger.error("Error unpacking negotiation data: %s", e)
|
|
retries += 1
|
|
continue
|
|
|
|
# setup succeeded
|
|
break
|
|
|
|
else:
|
|
logger.error("Failed to set up RICommunicationAgent after %d retries", max_retries)
|
|
return
|
|
|
|
# Set up ping behaviour
|
|
listen_behaviour = self.ListenBehaviour()
|
|
self.add_behaviour(listen_behaviour)
|
|
logger.info("Finished setting up %s", self.jid)
|