fix: wait for req socket send to make sure we dont stay stuck - if there's no... #23
@@ -56,11 +56,8 @@ class RICommunicationAgent(BaseAgent):
|
|||||||
"we probably dont have any receivers... but let's check!"
|
"we probably dont have any receivers... but let's check!"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Wait up to three seconds for a reply:)
|
# Wait up to {seconds_to_wait_total/2} seconds for a reply:)
|
||||||
try:
|
try:
|
||||||
self.agent.logger.debug(
|
|
||||||
f"waiting for message for{seconds_to_wait_total / 2} seconds."
|
|
||||||
)
|
|
||||||
message = await asyncio.wait_for(
|
message = await asyncio.wait_for(
|
||||||
self.agent._req_socket.recv_json(), timeout=seconds_to_wait_total / 2
|
self.agent._req_socket.recv_json(), timeout=seconds_to_wait_total / 2
|
||||||
)
|
)
|
||||||
@@ -96,7 +93,7 @@ class RICommunicationAgent(BaseAgent):
|
|||||||
# Try to reboot.
|
# Try to reboot.
|
||||||
await self.agent.setup()
|
await self.agent.setup()
|
||||||
|
|
||||||
self.agent.logger.debug('Received message "%s"', message)
|
self.agent.logger.debug(f'Received message "{message}" from RI.')
|
||||||
if "endpoint" not in message:
|
if "endpoint" not in message:
|
||||||
self.agent.logger.error("No received endpoint in message, excepted ping endpoint.")
|
self.agent.logger.error("No received endpoint in message, excepted ping endpoint.")
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -39,7 +39,6 @@ async def ping_stream(request: Request):
|
|||||||
|
|
||||||
async def event_stream():
|
async def event_stream():
|
||||||
# Set up internal socket to receive ping updates
|
# Set up internal socket to receive ping updates
|
||||||
logger.debug("Ping stream router event stream entered.")
|
|
||||||
|
|
||||||
sub_socket = Context.instance().socket(zmq.SUB)
|
sub_socket = Context.instance().socket(zmq.SUB)
|
||||||
sub_socket.connect(settings.zmq_settings.internal_sub_address)
|
sub_socket.connect(settings.zmq_settings.internal_sub_address)
|
||||||
@@ -52,12 +51,10 @@ async def ping_stream(request: Request):
|
|||||||
# So, True - False - True - False for connectivity.
|
# So, True - False - True - False for connectivity.
|
||||||
# Let's still check:)
|
# Let's still check:)
|
||||||
while True:
|
while True:
|
||||||
logger.debug("Ping stream entered listening ")
|
|
||||||
try:
|
try:
|
||||||
topic, body = await asyncio.wait_for(
|
topic, body = await asyncio.wait_for(
|
||||||
sub_socket.recv_multipart(), timeout=ping_frequency
|
sub_socket.recv_multipart(), timeout=ping_frequency
|
||||||
)
|
)
|
||||||
logger.debug(f"got ping change in ping_stream router: {body}")
|
|
||||||
connected = json.loads(body)
|
connected = json.loads(body)
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
logger.debug("got timeout error in ping loop in ping router")
|
logger.debug("got timeout error in ping loop in ping router")
|
||||||
|
|||||||
Reference in New Issue
Block a user