fix: wait for req socket send to make sure we dont stay stuck - if there's no... #23
@@ -39,6 +39,10 @@ class RICommunicationAgent(BaseAgent):
|
|||||||
"""
|
"""
|
||||||
assert self.agent is not None
|
assert self.agent is not None
|
||||||
|
|
||||||
|
if not self.agent.connected:
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
return
|
||||||
|
|
||||||
# We need to listen and sent pings.
|
# We need to listen and sent pings.
|
||||||
message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}}
|
message = {"endpoint": "ping", "data": {"id": "e.g. some reference id"}}
|
||||||
seconds_to_wait_total = 1.0
|
seconds_to_wait_total = 1.0
|
||||||
@@ -63,7 +67,13 @@ class RICommunicationAgent(BaseAgent):
|
|||||||
|
|
||||||
# We didnt get a reply :(
|
# We didnt get a reply :(
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
self.agent.logger.info("No ping retrieved in 3 seconds, killing myself.")
|
self.agent.logger.info(
|
||||||
|
f"No ping retrieved in {seconds_to_wait_total} seconds, "
|
||||||
|
"sending UI disconnection event and soft killing myself."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make sure we dont retry receiving messages untill we're setup.
|
||||||
|
self.agent.connected = False
|
||||||
|
|
||||||
# Tell UI we're disconnected.
|
# Tell UI we're disconnected.
|
||||||
topic = b"ping"
|
topic = b"ping"
|
||||||
@@ -84,7 +94,7 @@ class RICommunicationAgent(BaseAgent):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Try to reboot.
|
# Try to reboot.
|
||||||
self.agent.setup()
|
await self.agent.setup()
|
||||||
|
|
||||||
self.agent.logger.debug('Received message "%s"', message)
|
self.agent.logger.debug('Received message "%s"', message)
|
||||||
if "endpoint" not in message:
|
if "endpoint" not in message:
|
||||||
@@ -111,12 +121,11 @@ class RICommunicationAgent(BaseAgent):
|
|||||||
# Bind request socket
|
# Bind request socket
|
||||||
if self._req_socket is None or force:
|
if self._req_socket is None or force:
|
||||||
self._req_socket = Context.instance().socket(zmq.REQ)
|
self._req_socket = Context.instance().socket(zmq.REQ)
|
||||||
if self._bind: # TODO: Should this ever be the case with new architecture?
|
if self._bind:
|
||||||
self._req_socket.bind(self._address)
|
self._req_socket.bind(self._address)
|
||||||
else:
|
else:
|
||||||
self._req_socket.connect(self._address)
|
self._req_socket.connect(self._address)
|
||||||
|
|
||||||
# TODO: Check with Kasper
|
|
||||||
if self.pub_socket is None or force:
|
if self.pub_socket is None or force:
|
||||||
self.pub_socket = Context.instance().socket(zmq.PUB)
|
self.pub_socket = Context.instance().socket(zmq.PUB)
|
||||||
self.pub_socket.connect(settings.zmq_settings.internal_pub_address)
|
self.pub_socket.connect(settings.zmq_settings.internal_pub_address)
|
||||||
@@ -231,5 +240,7 @@ class RICommunicationAgent(BaseAgent):
|
|||||||
self.logger.error(
|
self.logger.error(
|
||||||
"Initial connection ping for router timed out in ri_communication_agent."
|
"Initial connection ping for router timed out in ri_communication_agent."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Make sure to start listening now that we're connected.
|
||||||
self.connected = True
|
self.connected = True
|
||||||
self.logger.info("Finished setting up %s", self.jid)
|
self.logger.info("Finished setting up %s", self.jid)
|
||||||
|
|||||||
Reference in New Issue
Block a user