chore: merged dev into branch and updated logging

This commit is contained in:
Björn Otgaar
2025-11-06 13:52:12 +01:00
34 changed files with 461 additions and 419 deletions

View File

@@ -1,19 +1,18 @@
import asyncio
import json
import logging
import zmq.asyncio
from spade.agent import Agent
from spade.behaviour import CyclicBehaviour
from zmq.asyncio import Context
from control_backend.agents.ri_command_agent import RICommandAgent
from control_backend.agents import BaseAgent
from control_backend.core.config import settings
logger = logging.getLogger(__name__)
from .ri_command_agent import RICommandAgent
class RICommunicationAgent(Agent):
class RICommunicationAgent(BaseAgent):
req_socket: zmq.Socket
_address = ""
_bind = True
connected = False
@@ -48,34 +47,38 @@ class RICommunicationAgent(Agent):
self.agent._req_socket.send_json(message), timeout=seconds_to_wait_total / 2
)
except TimeoutError:
logger.debug(
self.agent.logger.debug(
"Waited too long to send message - "
"we probably dont have any receivers... but let's check!"
)
# Wait up to three seconds for a reply:)
try:
logger.debug(f"waiting for message for {seconds_to_wait_total / 2} seconds.")
self.agent.logger.debug(
f"waiting for message for{seconds_to_wait_total / 2} seconds."
)
message = await asyncio.wait_for(
self.agent._req_socket.recv_json(), timeout=seconds_to_wait_total / 2
)
# We didnt get a reply :(
except TimeoutError:
logger.info("No ping retrieved in 3 seconds, killing myself.")
self.agent.logger.info("No ping retrieved in 3 seconds, killing myself.")
# Tell UI we're disconnected.
topic = b"ping"
data = json.dumps(False).encode()
if self.agent.pub_socket is None:
logger.error("communication agent pub socket not correctly initialized.")
self.agent.logger.error(
"communication agent pub socket not correctly initialized."
)
else:
try:
await asyncio.wait_for(
self.agent.pub_socket.send_multipart([topic, data]), 5
)
except TimeoutError:
logger.error(
self.agent.logger.error(
"Initial connection ping for router timed"
" out in ri_communication_agent."
)
@@ -83,9 +86,9 @@ class RICommunicationAgent(Agent):
# Try to reboot.
self.agent.setup()
logger.debug('Received message "%s"', message)
self.agent.logger.debug('Received message "%s"', message)
if "endpoint" not in message:
logger.error("No received endpoint in message, excepted ping endpoint.")
self.agent.logger.error("No received endpoint in message, excepted ping endpoint.")
return
# See what endpoint we received
@@ -97,7 +100,7 @@ class RICommunicationAgent(Agent):
await self.agent.pub_socket.send_multipart([topic, data])
await asyncio.sleep(1)
case _:
logger.info(
self.agent.logger.info(
"Received message with topic different than ping, while ping expected."
)
@@ -122,7 +125,7 @@ class RICommunicationAgent(Agent):
"""
Try to setup the communication agent, we have 5 retries in case we dont have a response yet.
"""
logger.info("Setting up %s", self.jid)
self.logger.info("Setting up %s", self.jid)
# Bind request socket
await self.setup_sockets()
@@ -142,7 +145,7 @@ class RICommunicationAgent(Agent):
received_message = await asyncio.wait_for(self._req_socket.recv_json(), timeout=1.0)
except TimeoutError:
logger.warning(
self.logger.warning(
"No connection established in 20 seconds (attempt %d/%d)",
retries + 1,
max_retries,
@@ -151,7 +154,7 @@ class RICommunicationAgent(Agent):
continue
except Exception as e:
logger.error("Unexpected error during negotiation: %s", e)
self.logger.error("Unexpected error during negotiation: %s", e)
retries += 1
continue
@@ -159,7 +162,7 @@ class RICommunicationAgent(Agent):
endpoint = received_message.get("endpoint")
if endpoint != "negotiate/ports":
# TODO: Should this send a message back?
logger.error(
self.logger.error(
"Invalid endpoint '%s' received (attempt %d/%d)",
endpoint,
retries + 1,
@@ -198,10 +201,10 @@ class RICommunicationAgent(Agent):
)
await ri_commands_agent.start()
case _:
logger.warning("Unhandled negotiation id: %s", id)
self.logger.warning("Unhandled negotiation id: %s", id)
except Exception as e:
logger.error("Error unpacking negotiation data: %s", e)
self.logger.error("Error unpacking negotiation data: %s", e)
retries += 1
continue
@@ -209,7 +212,7 @@ class RICommunicationAgent(Agent):
break
else:
logger.error("Failed to set up RICommunicationAgent after %d retries", max_retries)
self.logger.error("Failed to set up RICommunicationAgent after %d retries", max_retries)
return
# Set up ping behaviour
@@ -220,13 +223,13 @@ class RICommunicationAgent(Agent):
topic = b"ping"
data = json.dumps(True).encode()
if self.pub_socket is None:
logger.error("communication agent pub socket not correctly initialized.")
self.logger.error("communication agent pub socket not correctly initialized.")
else:
try:
await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5)
except TimeoutError:
logger.error(
self.logger.error(
"Initial connection ping for router timed out in ri_communication_agent."
)
self.connected = True
logger.info("Finished setting up %s", self.jid)
self.logger.info("Finished setting up %s", self.jid)