""" This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences) """ import asyncio import logging import zmq import zmq.asyncio from fastapi import APIRouter, Request from fastapi.responses import StreamingResponse from zmq.asyncio import Context from control_backend.core.config import settings from control_backend.schemas.events import ButtonPressedEvent logger = logging.getLogger(__name__) router = APIRouter() @router.post("/button_pressed", status_code=202) async def receive_button_event(event: ButtonPressedEvent, request: Request): """ Endpoint to handle external button press events. Validates the event payload and publishes it to the internal 'button_pressed' topic. Subscribers (in this case user_interrupt_agent) will pick this up to trigger specific behaviors or state changes. :param event: The parsed ButtonPressedEvent object. :param request: The FastAPI request object. """ logger.debug("Received button event: %s | %s", event.type, event.context) topic = b"button_pressed" body = event.model_dump_json().encode() pub_socket = request.app.state.endpoints_pub_socket await pub_socket.send_multipart([topic, body]) return {"status": "Event received"} @router.get("/experiment_stream") async def experiment_stream(request: Request): # Use the asyncio-compatible context context = Context.instance() socket = context.socket(zmq.SUB) # Connect and subscribe socket.connect(settings.zmq_settings.internal_sub_address) socket.subscribe(b"experiment") async def gen(): try: while True: # Check if client closed the tab if await request.is_disconnected(): logger.error("Client disconnected from experiment stream.") break try: parts = await asyncio.wait_for(socket.recv_multipart(), timeout=10.0) _, message = parts yield f"data: {message.decode().strip()}\n\n" except TimeoutError: continue finally: socket.close() return StreamingResponse(gen(), media_type="text/event-stream") @router.get("/status_stream") async def status_stream(request: Request): context = Context.instance() socket = context.socket(zmq.SUB) socket.connect(settings.zmq_settings.internal_sub_address) socket.subscribe(b"status") async def gen(): try: while True: if await request.is_disconnected(): break try: # Shorter timeout since this is frequent parts = await asyncio.wait_for(socket.recv_multipart(), timeout=0.5) _, message = parts yield f"data: {message.decode().strip()}\n\n" except TimeoutError: yield ": ping\n\n" # Keep the connection alive continue finally: socket.close() return StreamingResponse(gen(), media_type="text/event-stream")