Merge remote-tracking branch 'origin/dev' into refactor/config-file
# Conflicts: # src/control_backend/agents/ri_communication_agent.py # src/control_backend/core/config.py # src/control_backend/main.py
This commit is contained in:
1
src/control_backend/agents/communication/__init__.py
Normal file
1
src/control_backend/agents/communication/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .ri_communication_agent import RICommunicationAgent as RICommunicationAgent
|
||||
@@ -0,0 +1,250 @@
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
import zmq.asyncio
|
||||
from spade.behaviour import CyclicBehaviour
|
||||
from zmq.asyncio import Context
|
||||
|
||||
from control_backend.agents import BaseAgent
|
||||
from control_backend.core.config import settings
|
||||
|
||||
from ..actuation.robot_speech_agent import RobotSpeechAgent
|
||||
|
||||
|
||||
class RICommunicationAgent(BaseAgent):
|
||||
req_socket: zmq.Socket
|
||||
_address = ""
|
||||
_bind = True
|
||||
connected = False
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
jid: str,
|
||||
password: str,
|
||||
port: int = settings.agent_settings.default_spade_port,
|
||||
verify_security: bool = False,
|
||||
address=settings.zmq_settings.ri_command_address,
|
||||
bind=False,
|
||||
):
|
||||
super().__init__(jid, password, port, verify_security)
|
||||
self._address = address
|
||||
self._bind = bind
|
||||
self._req_socket: zmq.asyncio.Socket | None = None
|
||||
self.pub_socket: zmq.asyncio.Socket | None = None
|
||||
|
||||
class ListenBehaviour(CyclicBehaviour):
|
||||
async def run(self):
|
||||
"""
|
||||
Run the listening (ping) loop indefinetely.
|
||||
"""
|
||||
assert self.agent is not None
|
||||
|
||||
if not self.agent.connected:
|
||||
await asyncio.sleep(settings.behaviour_settings.sleep_s)
|
||||
return
|
||||
|
||||
# We need to listen and sent pings.
|
||||
message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}}
|
||||
seconds_to_wait_total = settings.behaviour_settings.sleep_s
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
self.agent._req_socket.send_json(message), timeout=seconds_to_wait_total / 2
|
||||
)
|
||||
except TimeoutError:
|
||||
self.agent.logger.debug(
|
||||
"Waited too long to send message - "
|
||||
"we probably dont have any receivers... but let's check!"
|
||||
)
|
||||
|
||||
# Wait up to {seconds_to_wait_total/2} seconds for a reply
|
||||
try:
|
||||
message = await asyncio.wait_for(
|
||||
self.agent._req_socket.recv_json(), timeout=seconds_to_wait_total / 2
|
||||
)
|
||||
|
||||
# We didnt get a reply
|
||||
except TimeoutError:
|
||||
self.agent.logger.info(
|
||||
f"No ping retrieved in {seconds_to_wait_total} seconds, "
|
||||
"sending UI disconnection event and attempting to restart."
|
||||
)
|
||||
|
||||
# Make sure we dont retry receiving messages untill we're setup.
|
||||
self.agent.connected = False
|
||||
self.agent.remove_behaviour(self)
|
||||
|
||||
# Tell UI we're disconnected.
|
||||
topic = b"ping"
|
||||
data = json.dumps(False).encode()
|
||||
if self.agent.pub_socket is None:
|
||||
self.agent.logger.warning(
|
||||
"Communication agent pub socket not correctly initialized."
|
||||
)
|
||||
else:
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
self.agent.pub_socket.send_multipart([topic, data]), 5
|
||||
)
|
||||
except TimeoutError:
|
||||
self.agent.logger.warning(
|
||||
f"Initial connection ping for router timed out in {self.agent.name}."
|
||||
)
|
||||
|
||||
# Try to reboot.
|
||||
self.agent.logger.debug("Restarting communication agent.")
|
||||
await self.agent.setup()
|
||||
|
||||
self.agent.logger.debug(f'Received message "{message}" from RI.')
|
||||
if "endpoint" not in message:
|
||||
self.agent.logger.warning(
|
||||
"No received endpoint in message, expected ping endpoint."
|
||||
)
|
||||
return
|
||||
|
||||
# See what endpoint we received
|
||||
match message["endpoint"]:
|
||||
case "ping":
|
||||
topic = b"ping"
|
||||
data = json.dumps(True).encode()
|
||||
if self.agent.pub_socket is not None:
|
||||
await self.agent.pub_socket.send_multipart([topic, data])
|
||||
await asyncio.sleep(settings.behaviour_settings.sleep_s)
|
||||
case _:
|
||||
self.agent.logger.debug(
|
||||
"Received message with topic different than ping, while ping expected."
|
||||
)
|
||||
|
||||
async def setup_sockets(self, force=False):
|
||||
"""
|
||||
Sets up request socket for communication agent.
|
||||
"""
|
||||
# Bind request socket
|
||||
if self._req_socket is None or force:
|
||||
self._req_socket = Context.instance().socket(zmq.REQ)
|
||||
if self._bind:
|
||||
self._req_socket.bind(self._address)
|
||||
else:
|
||||
self._req_socket.connect(self._address)
|
||||
|
||||
if self.pub_socket is None or force:
|
||||
self.pub_socket = Context.instance().socket(zmq.PUB)
|
||||
self.pub_socket.connect(settings.zmq_settings.internal_pub_address)
|
||||
|
||||
async def setup(self, max_retries: int = settings.behaviour_settings.comm_setup_max_retries):
|
||||
"""
|
||||
Try to set up the communication agent, we have `behaviour_settings.comm_setup_max_retries`
|
||||
retries in case we don't have a response yet.
|
||||
"""
|
||||
self.logger.info("Setting up %s", self.jid)
|
||||
|
||||
# Bind request socket
|
||||
await self.setup_sockets()
|
||||
|
||||
retries = 0
|
||||
# Let's try a certain amount of times before failing connection
|
||||
while retries < max_retries:
|
||||
# Make sure the socket is properly setup.
|
||||
if self._req_socket is None:
|
||||
continue
|
||||
|
||||
# Send our message and receive one back
|
||||
message = {"endpoint": "negotiate/ports", "data": {}}
|
||||
await self._req_socket.send_json(message)
|
||||
|
||||
retry_frequency = 1.0
|
||||
try:
|
||||
received_message = await asyncio.wait_for(
|
||||
self._req_socket.recv_json(), timeout=retry_frequency
|
||||
)
|
||||
|
||||
except TimeoutError:
|
||||
self.logger.warning(
|
||||
"No connection established in %d seconds (attempt %d/%d)",
|
||||
retries * retry_frequency,
|
||||
retries + 1,
|
||||
max_retries,
|
||||
)
|
||||
retries += 1
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
self.logger.warning("Unexpected error during negotiation: %s", e)
|
||||
retries += 1
|
||||
continue
|
||||
|
||||
# Validate endpoint
|
||||
endpoint = received_message.get("endpoint")
|
||||
if endpoint != "negotiate/ports":
|
||||
self.logger.warning(
|
||||
"Invalid endpoint '%s' received (attempt %d/%d)",
|
||||
endpoint,
|
||||
retries + 1,
|
||||
max_retries,
|
||||
)
|
||||
retries += 1
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
|
||||
# At this point, we have a valid response
|
||||
try:
|
||||
for port_data in received_message["data"]:
|
||||
id = port_data["id"]
|
||||
port = port_data["port"]
|
||||
bind = port_data["bind"]
|
||||
|
||||
if not bind:
|
||||
addr = f"tcp://localhost:{port}"
|
||||
else:
|
||||
addr = f"tcp://*:{port}"
|
||||
|
||||
match id:
|
||||
case "main":
|
||||
if addr != self._address:
|
||||
if not bind:
|
||||
self._req_socket.connect(addr)
|
||||
else:
|
||||
self._req_socket.bind(addr)
|
||||
case "actuation":
|
||||
ri_commands_agent = RobotSpeechAgent(
|
||||
settings.agent_settings.robot_speech_name
|
||||
+ "@"
|
||||
+ settings.agent_settings.host,
|
||||
settings.agent_settings.robot_speech_name,
|
||||
address=addr,
|
||||
bind=bind,
|
||||
)
|
||||
await ri_commands_agent.start()
|
||||
case _:
|
||||
self.logger.warning("Unhandled negotiation id: %s", id)
|
||||
|
||||
except Exception as e:
|
||||
self.logger.warning("Error unpacking negotiation data: %s", e)
|
||||
retries += 1
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
|
||||
# setup succeeded
|
||||
break
|
||||
|
||||
else:
|
||||
self.logger.warning("Failed to set up %s after %d retries", self.name, max_retries)
|
||||
return
|
||||
|
||||
# Set up ping behaviour
|
||||
listen_behaviour = self.ListenBehaviour()
|
||||
self.add_behaviour(listen_behaviour)
|
||||
|
||||
# Let UI know that we're connected
|
||||
topic = b"ping"
|
||||
data = json.dumps(True).encode()
|
||||
if self.pub_socket is None:
|
||||
self.logger.warning("Communication agent pub socket not correctly initialized.")
|
||||
else:
|
||||
try:
|
||||
await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5)
|
||||
except TimeoutError:
|
||||
self.logger.warning("Initial connection ping for router timed out in com_ri_agent.")
|
||||
|
||||
# Make sure to start listening now that we're connected.
|
||||
self.connected = True
|
||||
self.logger.info("Finished setting up %s", self.jid)
|
||||
Reference in New Issue
Block a user