101 lines
3.2 KiB
Python
101 lines
3.2 KiB
Python
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
|
|
import zmq
|
|
import zmq.asyncio
|
|
from fastapi import APIRouter, Request
|
|
from fastapi.responses import StreamingResponse
|
|
from zmq.asyncio import Context
|
|
|
|
from control_backend.core.config import settings
|
|
from control_backend.schemas.events import ButtonPressedEvent
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter()
|
|
|
|
|
|
@router.post("/button_pressed", status_code=202)
async def receive_button_event(event: ButtonPressedEvent, request: Request):
    """
    Accept a button press event and forward it onto the internal message bus.

    The event is serialized to JSON and sent as a two-frame message
    (topic ``button_pressed``, then the JSON body) on the publisher socket stored at
    ``request.app.state.endpoints_pub_socket``. Whoever subscribes to that topic
    (the user_interrupt_agent, according to the original notes) reacts to it.

    :param event: The parsed ButtonPressedEvent object.
    :param request: The FastAPI request object, used to reach the shared publisher socket.
    :return: A small acknowledgement dict; the route responds with HTTP 202.
    """
    logger.debug("Received button event: %s | %s", event.type, event.context)

    # Serialize before touching the socket so a serialization failure sends nothing.
    payload = event.model_dump_json().encode()
    publisher = request.app.state.endpoints_pub_socket

    # Frame 0 is the topic subscribers filter on, frame 1 is the JSON payload.
    await publisher.send_multipart([b"button_pressed", payload])

    return {"status": "Event received"}
|
|
|
|
|
|
@router.get("/experiment_stream")
async def experiment_stream(request: Request):
    """
    Stream messages from the internal 'experiment' topic as Server-Sent Events.

    A ZeroMQ SUB socket is connected to ``settings.zmq_settings.internal_sub_address``
    and subscribed to the ``experiment`` topic. Each received two-frame message
    (topic, payload) is forwarded to the client as one ``data:`` event. The stream
    stops once the client is detected as disconnected.

    :param request: The FastAPI request object, used to detect client disconnects.
    :return: A StreamingResponse with media type ``text/event-stream``.
    """

    async def gen():
        # The socket is opened inside the generator so it always pairs with the
        # ``finally`` below. A generator that is never started never runs its
        # ``finally``, so a socket opened outside of it could be leaked.
        # Context is the asyncio-compatible ZeroMQ context.
        socket = Context.instance().socket(zmq.SUB)
        try:
            # Connect and subscribe
            socket.connect(settings.zmq_settings.internal_sub_address)
            socket.subscribe(b"experiment")

            while True:
                # Check if client closed the tab
                if await request.is_disconnected():
                    # A client leaving is expected behaviour, not an error condition.
                    logger.info("Client disconnected from experiment stream.")
                    break

                try:
                    parts = await asyncio.wait_for(socket.recv_multipart(), timeout=10.0)
                except TimeoutError:
                    # Nothing was published in time; loop to re-check the connection.
                    # NOTE: relies on Python 3.11+, where asyncio.wait_for raises the
                    # builtin TimeoutError.
                    continue

                # First frame is the topic, second frame is the payload.
                _, message = parts
                yield f"data: {message.decode().strip()}\n\n"
        finally:
            socket.close()

    return StreamingResponse(gen(), media_type="text/event-stream")
|
|
|
|
|
|
@router.get("/status_stream")
async def status_stream(request: Request):
    """
    Stream messages from the internal 'status' topic as Server-Sent Events.

    A ZeroMQ SUB socket is connected to ``settings.zmq_settings.internal_sub_address``
    and subscribed to the ``status`` topic. Each received two-frame message
    (topic, payload) is forwarded to the client as one ``data:`` event. When nothing
    arrives within the timeout, an SSE comment line (``: ping``) is sent instead.

    :param request: The FastAPI request object, used to detect client disconnects.
    :return: A StreamingResponse with media type ``text/event-stream``.
    """

    async def gen():
        # The socket is opened inside the generator so it always pairs with the
        # ``finally`` below. A generator that is never started never runs its
        # ``finally``, so a socket opened outside of it could be leaked.
        socket = Context.instance().socket(zmq.SUB)
        try:
            socket.connect(settings.zmq_settings.internal_sub_address)
            socket.subscribe(b"status")

            # Stop streaming as soon as the client is seen to have gone away.
            while not await request.is_disconnected():
                try:
                    # Shorter timeout since this is frequent
                    parts = await asyncio.wait_for(socket.recv_multipart(), timeout=0.5)
                except TimeoutError:
                    # NOTE: relies on Python 3.11+, where asyncio.wait_for raises the
                    # builtin TimeoutError.
                    yield ": ping\n\n"  # Keep the connection alive
                    continue

                # First frame is the topic, second frame is the payload.
                _, message = parts
                yield f"data: {message.decode().strip()}\n\n"
        finally:
            socket.close()

    return StreamingResponse(gen(), media_type="text/event-stream")
|