32 lines
993 B
Python
32 lines
993 B
Python
import logging
|
|
|
|
from fastapi import APIRouter, Request
|
|
|
|
from control_backend.schemas.events import ButtonPressedEvent
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter()
|
|
|
|
|
|
@router.post("/button_pressed", status_code=202)
|
|
async def receive_button_event(event: ButtonPressedEvent, request: Request):
|
|
"""
|
|
Endpoint to handle external button press events.
|
|
|
|
Validates the event payload and publishes it to the internal 'button_pressed' topic.
|
|
Subscribers (in this case user_interrupt_agent) will pick this up to trigger
|
|
specific behaviors or state changes.
|
|
|
|
:param event: The parsed ButtonPressedEvent object.
|
|
:param request: The FastAPI request object.
|
|
"""
|
|
logger.debug("Received button event: %s | %s", event.type, event.context)
|
|
|
|
topic = b"button_pressed"
|
|
body = event.model_dump_json().encode()
|
|
|
|
pub_socket = request.app.state.endpoints_pub_socket
|
|
await pub_socket.send_multipart([topic, body])
|
|
|
|
return {"status": "Event received"}
|