chore: add unit test for router and implement command router

ref: N25B-205
This commit is contained in:
Björn Otgaar
2025-10-23 16:45:41 +02:00
parent 1f8d769762
commit a2a04740e5
5 changed files with 79 additions and 52 deletions

View File

@@ -3,21 +3,18 @@ import logging
from zmq import Socket
from control_backend.schemas.message import Message
from control_backend.schemas.ri_message import SpeechCommand, RIEndpoint
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/message", status_code=202)
async def receive_message(message: Message, request: Request):
logger.info("Received message: %s", message.message)
topic = b"message"
body = message.model_dump_json().encode("utf-8")
@router.post("/command", status_code=202)
async def receive_command(command: SpeechCommand, request: Request):
# Validate and retrieve data.
SpeechCommand.model_validate(command)
topic = b"command"
pub_socket: Socket = request.app.state.internal_comm_socket
pub_socket.send_multipart([topic, command])
pub_socket.send_multipart([topic, body])
return {"status": "Message received"}
return {"status": "Command received"}

View File

@@ -1,6 +1,6 @@
from fastapi.routing import APIRouter
from control_backend.api.v1.endpoints import message, sse
from control_backend.api.v1.endpoints import message, sse, command
api_router = APIRouter()
@@ -12,4 +12,9 @@ api_router.include_router(
api_router.include_router(
sse.router,
tags=["SSE"]
)
api_router.include_router(
command.router,
tags=["Commands"]
)