import asyncio
import json
import logging

import zmq.asyncio
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
from zmq.asyncio import Context, Socket

from control_backend.core.config import settings
from control_backend.schemas.ri_message import SpeechCommand

logger = logging.getLogger(__name__)

router = APIRouter()


@router.post("/command", status_code=202)
async def receive_command(command: SpeechCommand, request: Request):
    """Publish a received command on the application's PUB socket.

    The command is serialized to JSON and sent as a two-frame multipart
    message ``[b"command", <json bytes>]`` on the socket stored at
    ``request.app.state.endpoints_pub_socket``.

    Args:
        command: The command parsed from the request body.
        request: The incoming request, used to reach the application state.

    Returns:
        A small status dictionary acknowledging receipt.
    """
    # Validate and retrieve data.
    # NOTE(review): FastAPI presumably already validated ``command`` while
    # parsing the body, so this call looks redundant - confirm before removing.
    SpeechCommand.model_validate(command)

    topic = b"command"
    pub_socket: Socket = request.app.state.endpoints_pub_socket
    await pub_socket.send_multipart([topic, command.model_dump_json().encode()])

    return {"status": "Command received"}


@router.get("/ping_check")
async def ping(request: Request):
    """Placeholder endpoint; it has no implementation yet and returns ``None``."""
    pass


@router.get("/ping_stream")
async def ping_stream(request: Request):
    """Stream live updates whenever the device state changes.

    Opens a server-sent-events stream. Each event carries the JSON-encoded
    payload of the latest message received on the ``ping`` topic, or ``false``
    when no message arrived within the timeout window.

    Args:
        request: The incoming request, polled to detect client disconnects.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """

    async def event_stream():
        """Yield one SSE ``data:`` event per received ping or per timeout."""
        # Set up internal socket to receive ping updates
        sub_socket = Context.instance().socket(zmq.SUB)
        try:
            sub_socket.connect(settings.zmq_settings.internal_sub_address)
            sub_socket.setsockopt(zmq.SUBSCRIBE, b"ping")

            connected = False
            # Seconds to wait for a ping message before reporting "not connected".
            ping_frequency = 2

            # Even though it's most likely the updates should alternate
            # (So, True - False - True - False for connectivity),
            # let's still check.
            while True:
                try:
                    _topic, body = await asyncio.wait_for(
                        sub_socket.recv_multipart(), timeout=ping_frequency
                    )
                    connected = json.loads(body)
                # ``asyncio.TimeoutError`` is what ``wait_for`` raises; it is an
                # alias of the builtin ``TimeoutError`` only from Python 3.11 on.
                except asyncio.TimeoutError:
                    logger.debug("got timeout error in ping loop in ping router")
                    connected = False

                # Stop if client disconnected
                if await request.is_disconnected():
                    logger.info("Client disconnected from SSE")
                    break

                logger.debug(
                    "Yielded new connection event in robot ping router: %s",
                    connected,
                )
                connected_json = json.dumps(connected)
                yield f"data: {connected_json}\n\n"
        finally:
            # Always release the per-request socket, including when the
            # generator is cancelled or closed because the client went away.
            sub_socket.close()

    return StreamingResponse(event_stream(), media_type="text/event-stream")