From 669d0190d6b5f9564d6941883b557fdc7d4d4d7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Wed, 29 Oct 2025 19:22:06 +0100 Subject: [PATCH] feat: started ping router and internal messaging for pings ref: N25B-151 --- .../agents/ri_communication_agent.py | 10 ++- .../api/v1/endpoints/command.py | 21 ------ src/control_backend/api/v1/endpoints/robot.py | 74 +++++++++++++++++++ src/control_backend/api/v1/router.py | 4 +- ...and_endpoint.py => test_robot_endpoint.py} | 4 +- 5 files changed, 87 insertions(+), 26 deletions(-) delete mode 100644 src/control_backend/api/v1/endpoints/command.py create mode 100644 src/control_backend/api/v1/endpoints/robot.py rename test/integration/api/endpoints/{test_command_endpoint.py => test_robot_endpoint.py} (95%) diff --git a/src/control_backend/agents/ri_communication_agent.py b/src/control_backend/agents/ri_communication_agent.py index 2b92989..4da2c69 100644 --- a/src/control_backend/agents/ri_communication_agent.py +++ b/src/control_backend/agents/ri_communication_agent.py @@ -5,11 +5,13 @@ from spade.agent import Agent from spade.behaviour import CyclicBehaviour import zmq + from control_backend.core.config import settings from control_backend.core.zmq_context import context -from control_backend.schemas.message import Message +from control_backend.schemas.ri_message import RIMessage from control_backend.agents.ri_command_agent import RICommandAgent + logger = logging.getLogger(__name__) @@ -63,6 +65,11 @@ class RICommunicationAgent(Agent): logger.info(f"No ping back retrieved in {seconds_to_wait_total/2} seconds totalling {seconds_to_wait_total} of time, killing myself.") self.agent.connected = False # TODO: Send event to UI letting know that we've lost connection + topic = b"ping" + data = json.dumps(False).encode() + pub_socket = context.socket(zmq.PUB) + pub_socket.connect(settings.zmq_settings.internal_comm_address) + pub_socket.send_multipart([topic, data]) self.kill() @@ -90,6 +97,7 @@ class RICommunicationAgent(Agent): logger.info("Setting up %s", self.jid) retries = 0 + # Let's try a certain amount of times before failing connection while retries < max_retries: # Bind request socket diff --git a/src/control_backend/api/v1/endpoints/command.py b/src/control_backend/api/v1/endpoints/command.py deleted file mode 100644 index e7fef60..0000000 --- a/src/control_backend/api/v1/endpoints/command.py +++ /dev/null @@ -1,21 +0,0 @@ -from fastapi import APIRouter, Request -import logging - -from zmq import Socket - -from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint - -logger = logging.getLogger(__name__) - -router = APIRouter() - - -@router.post("/command", status_code=202) -async def receive_command(command: SpeechCommand, request: Request): - # Validate and retrieve data. - SpeechCommand.model_validate(command) - topic = b"command" - pub_socket: Socket = request.app.state.internal_comm_socket - pub_socket.send_multipart([topic, command.model_dump_json().encode()]) - - return {"status": "Command received"} diff --git a/src/control_backend/api/v1/endpoints/robot.py b/src/control_backend/api/v1/endpoints/robot.py new file mode 100644 index 0000000..1d0da9c --- /dev/null +++ b/src/control_backend/api/v1/endpoints/robot.py @@ -0,0 +1,74 @@ +from fastapi import APIRouter, Request +from fastapi.responses import JSONResponse, StreamingResponse +import logging +import asyncio +import zmq.asyncio +import json +import datetime + +from zmq import Socket +from control_backend.core.zmq_context import context +from control_backend.core.config import settings +from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint + + + + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +@router.post("/command", status_code=202) +async def receive_command(command: SpeechCommand, request: Request): + # Validate and retrieve data. + SpeechCommand.model_validate(command) + topic = b"command" + pub_socket: Socket = request.app.state.internal_comm_socket + pub_socket.send_multipart([topic, command.model_dump_json().encode()]) + + return {"status": "Command received"} + + +@router.get("/ping_check") +async def ping(request: Request): + pass + + +@router.get("/ping_stream") +async def ping_stream(request: Request): + """Stream live updates whenever the device state changes.""" + async def event_stream(): + # Set up internal socket to receive ping updates + logger.debug("Ping stream router event stream entered.") + sub_socket = zmq.asyncio.Context().socket(zmq.SUB) + sub_socket.connect(settings.zmq_settings.internal_comm_address) + sub_socket.setsockopt(zmq.SUBSCRIBE, b"ping") + connected = True + + ping_frequency = 1 # How many seconds between ping attempts + + # Even though its most likely the updates should alternate + # So, True - False - True - False for connectivity. + # Let's still check:) + while True: + logger.debug("Ping stream entered listening ") + try: + topic, body = await asyncio.wait_for(sub_socket.recv_multipart(), timeout=ping_frequency) + logger.debug("got ping change in ping_stream router") + connected = json.loads(body) + except TimeoutError as e: + await asyncio.sleep(0.1) + + # Stop if client disconnected + if await request.is_disconnected(): + print("Client disconnected from SSE") + break + + + logger.debug(f"Yielded new connection event in robot ping router: {str(connected)}") + yield f"data: {str(connected)}, time:{str(datetime.datetime.now().strftime("%H:%M:%S"))}\n\n" + + + + return StreamingResponse(event_stream(), media_type="text/event-stream") \ No newline at end of file diff --git a/src/control_backend/api/v1/router.py b/src/control_backend/api/v1/router.py index dc7aea9..dca7e27 100644 --- a/src/control_backend/api/v1/router.py +++ b/src/control_backend/api/v1/router.py @@ -1,6 +1,6 @@ from fastapi.routing import APIRouter -from control_backend.api.v1.endpoints import message, sse, command +from control_backend.api.v1.endpoints import message, sse, robot api_router = APIRouter() @@ -8,4 +8,4 @@ api_router.include_router(message.router, tags=["Messages"]) api_router.include_router(sse.router, tags=["SSE"]) -api_router.include_router(command.router, tags=["Commands"]) +api_router.include_router(robot.router, prefix="/robot", tags=["Pings", "Commands"]) \ No newline at end of file diff --git a/test/integration/api/endpoints/test_command_endpoint.py b/test/integration/api/endpoints/test_robot_endpoint.py similarity index 95% rename from test/integration/api/endpoints/test_command_endpoint.py rename to test/integration/api/endpoints/test_robot_endpoint.py index 07bd866..827fb17 100644 --- a/test/integration/api/endpoints/test_command_endpoint.py +++ b/test/integration/api/endpoints/test_robot_endpoint.py @@ -3,7 +3,7 @@ from fastapi import FastAPI from fastapi.testclient import TestClient from unittest.mock import MagicMock -from control_backend.api.v1.endpoints import command +from control_backend.api.v1.endpoints import robot from control_backend.schemas.ri_message import SpeechCommand @@ -14,7 +14,7 @@ def app(): Also sets up a mock internal_comm_socket. """ app = FastAPI() - app.include_router(command.router) + app.include_router(robot.router) app.state.internal_comm_socket = MagicMock() # mock ZMQ socket return app