feat: started ping router and internal messaging for pings
ref: N25B-151
This commit is contained in:
@@ -5,11 +5,13 @@ from spade.agent import Agent
|
||||
from spade.behaviour import CyclicBehaviour
|
||||
import zmq
|
||||
|
||||
|
||||
from control_backend.core.config import settings
|
||||
from control_backend.core.zmq_context import context
|
||||
from control_backend.schemas.message import Message
|
||||
from control_backend.schemas.ri_message import RIMessage
|
||||
from control_backend.agents.ri_command_agent import RICommandAgent
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -63,6 +65,11 @@ class RICommunicationAgent(Agent):
|
||||
logger.info(f"No ping back retrieved in {seconds_to_wait_total/2} seconds totalling {seconds_to_wait_total} of time, killing myself.")
|
||||
self.agent.connected = False
|
||||
# TODO: Send event to UI letting know that we've lost connection
|
||||
topic = b"ping"
|
||||
data = json.dumps(False).encode()
|
||||
pub_socket = context.socket(zmq.PUB)
|
||||
pub_socket.connect(settings.zmq_settings.internal_comm_address)
|
||||
pub_socket.send_multipart([topic, data])
|
||||
|
||||
self.kill()
|
||||
|
||||
@@ -90,6 +97,7 @@ class RICommunicationAgent(Agent):
|
||||
logger.info("Setting up %s", self.jid)
|
||||
retries = 0
|
||||
|
||||
|
||||
# Let's try a certain amount of times before failing connection
|
||||
while retries < max_retries:
|
||||
# Bind request socket
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
from fastapi import APIRouter, Request
|
||||
import logging
|
||||
|
||||
from zmq import Socket
|
||||
|
||||
from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post("/command", status_code=202)
|
||||
async def receive_command(command: SpeechCommand, request: Request):
|
||||
# Validate and retrieve data.
|
||||
SpeechCommand.model_validate(command)
|
||||
topic = b"command"
|
||||
pub_socket: Socket = request.app.state.internal_comm_socket
|
||||
pub_socket.send_multipart([topic, command.model_dump_json().encode()])
|
||||
|
||||
return {"status": "Command received"}
|
||||
74
src/control_backend/api/v1/endpoints/robot.py
Normal file
74
src/control_backend/api/v1/endpoints/robot.py
Normal file
@@ -0,0 +1,74 @@
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
import logging
|
||||
import asyncio
|
||||
import zmq.asyncio
|
||||
import json
|
||||
import datetime
|
||||
|
||||
from zmq import Socket
|
||||
from control_backend.core.zmq_context import context
|
||||
from control_backend.core.config import settings
|
||||
from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint
|
||||
|
||||
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post("/command", status_code=202)
|
||||
async def receive_command(command: SpeechCommand, request: Request):
|
||||
# Validate and retrieve data.
|
||||
SpeechCommand.model_validate(command)
|
||||
topic = b"command"
|
||||
pub_socket: Socket = request.app.state.internal_comm_socket
|
||||
pub_socket.send_multipart([topic, command.model_dump_json().encode()])
|
||||
|
||||
return {"status": "Command received"}
|
||||
|
||||
|
||||
@router.get("/ping_check")
|
||||
async def ping(request: Request):
|
||||
pass
|
||||
|
||||
|
||||
@router.get("/ping_stream")
|
||||
async def ping_stream(request: Request):
|
||||
"""Stream live updates whenever the device state changes."""
|
||||
async def event_stream():
|
||||
# Set up internal socket to receive ping updates
|
||||
logger.debug("Ping stream router event stream entered.")
|
||||
sub_socket = zmq.asyncio.Context().socket(zmq.SUB)
|
||||
sub_socket.connect(settings.zmq_settings.internal_comm_address)
|
||||
sub_socket.setsockopt(zmq.SUBSCRIBE, b"ping")
|
||||
connected = True
|
||||
|
||||
ping_frequency = 1 # How many seconds between ping attempts
|
||||
|
||||
# Even though its most likely the updates should alternate
|
||||
# So, True - False - True - False for connectivity.
|
||||
# Let's still check:)
|
||||
while True:
|
||||
logger.debug("Ping stream entered listening ")
|
||||
try:
|
||||
topic, body = await asyncio.wait_for(sub_socket.recv_multipart(), timeout=ping_frequency)
|
||||
logger.debug("got ping change in ping_stream router")
|
||||
connected = json.loads(body)
|
||||
except TimeoutError as e:
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# Stop if client disconnected
|
||||
if await request.is_disconnected():
|
||||
print("Client disconnected from SSE")
|
||||
break
|
||||
|
||||
|
||||
logger.debug(f"Yielded new connection event in robot ping router: {str(connected)}")
|
||||
yield f"data: {str(connected)}, time:{str(datetime.datetime.now().strftime("%H:%M:%S"))}\n\n"
|
||||
|
||||
|
||||
|
||||
return StreamingResponse(event_stream(), media_type="text/event-stream")
|
||||
@@ -1,6 +1,6 @@
|
||||
from fastapi.routing import APIRouter
|
||||
|
||||
from control_backend.api.v1.endpoints import message, sse, command
|
||||
from control_backend.api.v1.endpoints import message, sse, robot
|
||||
|
||||
api_router = APIRouter()
|
||||
|
||||
@@ -8,4 +8,4 @@ api_router.include_router(message.router, tags=["Messages"])
|
||||
|
||||
api_router.include_router(sse.router, tags=["SSE"])
|
||||
|
||||
api_router.include_router(command.router, tags=["Commands"])
|
||||
api_router.include_router(robot.router, prefix="/robot", tags=["Pings", "Commands"])
|
||||
Reference in New Issue
Block a user