feat: fixed socket typing for communication agent and ping router- automatically try to reconnect with robot.

ref: N25B-151
This commit is contained in:
Björn Otgaar
2025-10-29 21:55:23 +01:00
parent 669d0190d6
commit 4f2d45fb44
3 changed files with 47 additions and 28 deletions

View File

@@ -4,6 +4,7 @@ import logging
from spade.agent import Agent from spade.agent import Agent
from spade.behaviour import CyclicBehaviour from spade.behaviour import CyclicBehaviour
import zmq import zmq
import zmq.asyncio
from control_backend.core.config import settings from control_backend.core.config import settings
@@ -16,7 +17,8 @@ logger = logging.getLogger(__name__)
class RICommunicationAgent(Agent): class RICommunicationAgent(Agent):
req_socket: zmq.Socket _pub_socket: zmq.asyncio.Socket
req_socket: zmq.asyncio.Socket | None
_address = "" _address = ""
_bind = True _bind = True
connected = False connected = False
@@ -25,14 +27,18 @@ class RICommunicationAgent(Agent):
self, self,
jid: str, jid: str,
password: str, password: str,
pub_socket: zmq.asyncio.Socket,
port: int = 5222, port: int = 5222,
verify_security: bool = False, verify_security: bool = False,
address="tcp://localhost:0000", address="tcp://localhost:0000",
bind=False, bind=False,
): ):
super().__init__(jid, password, port, verify_security) super().__init__(jid, password, port, verify_security)
self._address = address self._address = address
self._bind = bind self._bind = bind
self.req_socket = None
self._pub_socket = pub_socket
class ListenBehaviour(CyclicBehaviour): class ListenBehaviour(CyclicBehaviour):
async def run(self): async def run(self):
@@ -43,7 +49,7 @@ class RICommunicationAgent(Agent):
# We need to listen and sent pings. # We need to listen and sent pings.
message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}} message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}}
seconds_to_wait_total = 4.0 seconds_to_wait_total = 1.0
try: try:
await asyncio.wait_for( await asyncio.wait_for(
self.agent.req_socket.send_json(message), timeout=seconds_to_wait_total / 2 self.agent.req_socket.send_json(message), timeout=seconds_to_wait_total / 2
@@ -62,16 +68,12 @@ class RICommunicationAgent(Agent):
# We didnt get a reply :( # We didnt get a reply :(
except TimeoutError as e: except TimeoutError as e:
logger.info(f"No ping back retrieved in {seconds_to_wait_total/2} seconds totalling {seconds_to_wait_total} of time, killing myself.") logger.info(f"No ping back retrieved in {seconds_to_wait_total/2} seconds totalling {seconds_to_wait_total} of time, killing myself (or maybe just laying low).")
self.agent.connected = False
# TODO: Send event to UI letting know that we've lost connection # TODO: Send event to UI letting know that we've lost connection
topic = b"ping" topic = b"ping"
data = json.dumps(False).encode() data = json.dumps(False).encode()
pub_socket = context.socket(zmq.PUB) self.agent._pub_socket.send_multipart([topic, data])
pub_socket.connect(settings.zmq_settings.internal_comm_address) await self.agent.setup()
pub_socket.send_multipart([topic, data])
self.kill()
except Exception as e: except Exception as e:
logger.debug(f"Differennt exception: {e}") logger.debug(f"Differennt exception: {e}")
@@ -90,25 +92,38 @@ class RICommunicationAgent(Agent):
"Received message with topic different than ping, while ping expected." "Received message with topic different than ping, while ping expected."
) )
async def setup(self, max_retries: int = 5):
"""
Try to setup the communication agent, we have 5 retries in case we dont have a response yet.
"""
logger.info("Setting up %s", self.jid)
retries = 0
async def setup_req_socket(self, force = False):
# Let's try a certain amount of times before failing connection """
while retries < max_retries: Sets up request socket for communication agent.
# Bind request socket """
if self.req_socket is None or force:
self.req_socket = context.socket(zmq.REQ) self.req_socket = context.socket(zmq.REQ)
if self._bind: if self._bind:
self.req_socket.bind(self._address) self.req_socket.bind(self._address)
else: else:
self.req_socket.connect(self._address) self.req_socket.connect(self._address)
async def setup(self, max_retries: int = 5):
"""
Try to setup the communication agent, we have 5 retries in case we dont have a response yet.
"""
logger.info("Setting up %s", self.jid)
# Bind request socket
await self.setup_req_socket()
retries = 0
# Let's try a certain amount of times before failing connection
while retries < max_retries:
# Make sure the socket is properly setup.
if self.req_socket is None:
continue
# Send our message and receive one back:) # Send our message and receive one back:)
message = {"endpoint": "negotiate/ports", "data": None} message = {"endpoint": "negotiate/ports", "data": {}}
await self.req_socket.send_json(message) await self.req_socket.send_json(message)
try: try:
@@ -190,5 +205,8 @@ class RICommunicationAgent(Agent):
self.add_behaviour(listen_behaviour) self.add_behaviour(listen_behaviour)
# TODO: Let UI know that we're connected >:) # TODO: Let UI know that we're connected >:)
topic = b"ping"
data = json.dumps(True).encode()
await self._pub_socket.send_multipart([topic, data])
self.connected = True self.connected = True
logger.info("Finished setting up %s", self.jid) logger.info("Finished setting up %s", self.jid)

View File

@@ -6,7 +6,7 @@ import zmq.asyncio
import json import json
import datetime import datetime
from zmq import Socket from zmq.asyncio import Socket
from control_backend.core.zmq_context import context from control_backend.core.zmq_context import context
from control_backend.core.config import settings from control_backend.core.config import settings
from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint
@@ -24,7 +24,7 @@ async def receive_command(command: SpeechCommand, request: Request):
# Validate and retrieve data. # Validate and retrieve data.
SpeechCommand.model_validate(command) SpeechCommand.model_validate(command)
topic = b"command" topic = b"command"
pub_socket: Socket = request.app.state.internal_comm_socket pub_socket : Socket = request.app.state.internal_comm_socket
pub_socket.send_multipart([topic, command.model_dump_json().encode()]) pub_socket.send_multipart([topic, command.model_dump_json().encode()])
return {"status": "Command received"} return {"status": "Command received"}
@@ -41,7 +41,8 @@ async def ping_stream(request: Request):
async def event_stream(): async def event_stream():
# Set up internal socket to receive ping updates # Set up internal socket to receive ping updates
logger.debug("Ping stream router event stream entered.") logger.debug("Ping stream router event stream entered.")
sub_socket = zmq.asyncio.Context().socket(zmq.SUB)
sub_socket = context.socket(zmq.SUB)
sub_socket.connect(settings.zmq_settings.internal_comm_address) sub_socket.connect(settings.zmq_settings.internal_comm_address)
sub_socket.setsockopt(zmq.SUBSCRIBE, b"ping") sub_socket.setsockopt(zmq.SUBSCRIBE, b"ping")
connected = True connected = True
@@ -70,5 +71,4 @@ async def ping_stream(request: Request):
yield f"data: {str(connected)}, time:{str(datetime.datetime.now().strftime("%H:%M:%S"))}\n\n" yield f"data: {str(connected)}, time:{str(datetime.datetime.now().strftime("%H:%M:%S"))}\n\n"
return StreamingResponse(event_stream(), media_type="text/event-stream") return StreamingResponse(event_stream(), media_type="text/event-stream")

View File

@@ -32,8 +32,9 @@ async def lifespan(app: FastAPI):
# Initiate agents # Initiate agents
ri_communication_agent = RICommunicationAgent( ri_communication_agent = RICommunicationAgent(
settings.agent_settings.ri_communication_agent_name + "@" + settings.agent_settings.host, jid=settings.agent_settings.ri_communication_agent_name + "@" + settings.agent_settings.host,
settings.agent_settings.ri_communication_agent_name, password=settings.agent_settings.ri_communication_agent_name,
pub_socket=internal_comm_socket,
address="tcp://*:5555", address="tcp://*:5555",
bind=True, bind=True,
) )