Expanding abbreviations to remove ambiguity, simplifying agent names to reduce repetition. ref: N25B-257
250 lines
9.0 KiB
Python
250 lines
9.0 KiB
Python
import asyncio
|
|
import json
|
|
|
|
import zmq.asyncio
|
|
from spade.behaviour import CyclicBehaviour
|
|
from zmq.asyncio import Context
|
|
|
|
from control_backend.agents import BaseAgent
|
|
from control_backend.core.config import settings
|
|
|
|
from ..actuation.robot_speech_agent import RobotSpeechAgent
|
|
|
|
|
|
class RICommunicationAgent(BaseAgent):
|
|
req_socket: zmq.Socket
|
|
_address = ""
|
|
_bind = True
|
|
connected = False
|
|
|
|
def __init__(
|
|
self,
|
|
jid: str,
|
|
password: str,
|
|
port: int = 5222,
|
|
verify_security: bool = False,
|
|
address="tcp://localhost:0000",
|
|
bind=False,
|
|
):
|
|
super().__init__(jid, password, port, verify_security)
|
|
self._address = address
|
|
self._bind = bind
|
|
self._req_socket: zmq.asyncio.Socket | None = None
|
|
self.pub_socket: zmq.asyncio.Socket | None = None
|
|
|
|
class ListenBehaviour(CyclicBehaviour):
|
|
async def run(self):
|
|
"""
|
|
Run the listening (ping) loop indefinetely.
|
|
"""
|
|
assert self.agent is not None
|
|
|
|
if not self.agent.connected:
|
|
await asyncio.sleep(1)
|
|
return
|
|
|
|
# We need to listen and sent pings.
|
|
message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}}
|
|
seconds_to_wait_total = 1.0
|
|
try:
|
|
await asyncio.wait_for(
|
|
self.agent._req_socket.send_json(message), timeout=seconds_to_wait_total / 2
|
|
)
|
|
except TimeoutError:
|
|
self.agent.logger.debug(
|
|
"Waited too long to send message - "
|
|
"we probably dont have any receivers... but let's check!"
|
|
)
|
|
|
|
# Wait up to {seconds_to_wait_total/2} seconds for a reply
|
|
try:
|
|
message = await asyncio.wait_for(
|
|
self.agent._req_socket.recv_json(), timeout=seconds_to_wait_total / 2
|
|
)
|
|
|
|
# We didnt get a reply
|
|
except TimeoutError:
|
|
self.agent.logger.info(
|
|
f"No ping retrieved in {seconds_to_wait_total} seconds, "
|
|
"sending UI disconnection event and attempting to restart."
|
|
)
|
|
|
|
# Make sure we dont retry receiving messages untill we're setup.
|
|
self.agent.connected = False
|
|
self.agent.remove_behaviour(self)
|
|
|
|
# Tell UI we're disconnected.
|
|
topic = b"ping"
|
|
data = json.dumps(False).encode()
|
|
if self.agent.pub_socket is None:
|
|
self.agent.logger.warning(
|
|
"Communication agent pub socket not correctly initialized."
|
|
)
|
|
else:
|
|
try:
|
|
await asyncio.wait_for(
|
|
self.agent.pub_socket.send_multipart([topic, data]), 5
|
|
)
|
|
except TimeoutError:
|
|
self.agent.logger.warning(
|
|
"Initial connection ping for router timed out in com_ri_agent."
|
|
)
|
|
|
|
# Try to reboot.
|
|
self.agent.logger.debug("Restarting communication agent.")
|
|
await self.agent.setup()
|
|
|
|
self.agent.logger.debug(f'Received message "{message}" from RI.')
|
|
if "endpoint" not in message:
|
|
self.agent.logger.warning(
|
|
"No received endpoint in message, expected ping endpoint."
|
|
)
|
|
return
|
|
|
|
# See what endpoint we received
|
|
match message["endpoint"]:
|
|
case "ping":
|
|
topic = b"ping"
|
|
data = json.dumps(True).encode()
|
|
if self.agent.pub_socket is not None:
|
|
await self.agent.pub_socket.send_multipart([topic, data])
|
|
await asyncio.sleep(1)
|
|
case _:
|
|
self.agent.logger.debug(
|
|
"Received message with topic different than ping, while ping expected."
|
|
)
|
|
|
|
async def setup_sockets(self, force=False):
|
|
"""
|
|
Sets up request socket for communication agent.
|
|
"""
|
|
# Bind request socket
|
|
if self._req_socket is None or force:
|
|
self._req_socket = Context.instance().socket(zmq.REQ)
|
|
if self._bind:
|
|
self._req_socket.bind(self._address)
|
|
else:
|
|
self._req_socket.connect(self._address)
|
|
|
|
if self.pub_socket is None or force:
|
|
self.pub_socket = Context.instance().socket(zmq.PUB)
|
|
self.pub_socket.connect(settings.zmq_settings.internal_pub_address)
|
|
|
|
async def setup(self, max_retries: int = 100):
|
|
"""
|
|
Try to setup the communication agent, we have 5 retries in case we dont have a response yet.
|
|
"""
|
|
self.logger.info("Setting up %s", self.jid)
|
|
|
|
# Bind request socket
|
|
await self.setup_sockets()
|
|
|
|
retries = 0
|
|
# Let's try a certain amount of times before failing connection
|
|
while retries < max_retries:
|
|
# Make sure the socket is properly setup.
|
|
if self._req_socket is None:
|
|
continue
|
|
|
|
# Send our message and receive one back
|
|
message = {"endpoint": "negotiate/ports", "data": {}}
|
|
await self._req_socket.send_json(message)
|
|
|
|
retry_frequency = 1.0
|
|
try:
|
|
received_message = await asyncio.wait_for(
|
|
self._req_socket.recv_json(), timeout=retry_frequency
|
|
)
|
|
|
|
except TimeoutError:
|
|
self.logger.warning(
|
|
"No connection established in %d seconds (attempt %d/%d)",
|
|
retries * retry_frequency,
|
|
retries + 1,
|
|
max_retries,
|
|
)
|
|
retries += 1
|
|
continue
|
|
|
|
except Exception as e:
|
|
self.logger.warning("Unexpected error during negotiation: %s", e)
|
|
retries += 1
|
|
continue
|
|
|
|
# Validate endpoint
|
|
endpoint = received_message.get("endpoint")
|
|
if endpoint != "negotiate/ports":
|
|
self.logger.warning(
|
|
"Invalid endpoint '%s' received (attempt %d/%d)",
|
|
endpoint,
|
|
retries + 1,
|
|
max_retries,
|
|
)
|
|
retries += 1
|
|
await asyncio.sleep(1)
|
|
continue
|
|
|
|
# At this point, we have a valid response
|
|
try:
|
|
for port_data in received_message["data"]:
|
|
id = port_data["id"]
|
|
port = port_data["port"]
|
|
bind = port_data["bind"]
|
|
|
|
if not bind:
|
|
addr = f"tcp://localhost:{port}"
|
|
else:
|
|
addr = f"tcp://*:{port}"
|
|
|
|
match id:
|
|
case "main":
|
|
if addr != self._address:
|
|
if not bind:
|
|
self._req_socket.connect(addr)
|
|
else:
|
|
self._req_socket.bind(addr)
|
|
case "actuation":
|
|
ri_commands_agent = RobotSpeechAgent(
|
|
settings.agent_settings.robot_speech_name
|
|
+ "@"
|
|
+ settings.agent_settings.host,
|
|
settings.agent_settings.robot_speech_name,
|
|
address=addr,
|
|
bind=bind,
|
|
)
|
|
await ri_commands_agent.start()
|
|
case _:
|
|
self.logger.warning("Unhandled negotiation id: %s", id)
|
|
|
|
except Exception as e:
|
|
self.logger.warning("Error unpacking negotiation data: %s", e)
|
|
retries += 1
|
|
await asyncio.sleep(1)
|
|
continue
|
|
|
|
# setup succeeded
|
|
break
|
|
|
|
else:
|
|
self.logger.warning("Failed to set up %s after %d retries", self.name, max_retries)
|
|
return
|
|
|
|
# Set up ping behaviour
|
|
listen_behaviour = self.ListenBehaviour()
|
|
self.add_behaviour(listen_behaviour)
|
|
|
|
# Let UI know that we're connected
|
|
topic = b"ping"
|
|
data = json.dumps(True).encode()
|
|
if self.pub_socket is None:
|
|
self.logger.warning("Communication agent pub socket not correctly initialized.")
|
|
else:
|
|
try:
|
|
await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5)
|
|
except TimeoutError:
|
|
self.logger.warning("Initial connection ping for router timed out in com_ri_agent.")
|
|
|
|
# Make sure to start listening now that we're connected.
|
|
self.connected = True
|
|
self.logger.info("Finished setting up %s", self.jid)
|