feat: added extra endpoint for norm pings

also made sure that you cannot skip phase on end phase

ref: N25B-400
This commit is contained in:
Pim Hutting
2026-01-16 14:28:27 +01:00
parent 041fc4ab6e
commit 6d03ba8a41
3 changed files with 56 additions and 14 deletions

View File

@@ -52,11 +52,11 @@ async def experiment_stream(request: Request):
while True:
# Check if client closed the tab
if await request.is_disconnected():
logger.info("Client disconnected from experiment stream.")
logger.error("Client disconnected from experiment stream.")
break
try:
parts = await asyncio.wait_for(socket.recv_multipart(), timeout=1.0)
parts = await asyncio.wait_for(socket.recv_multipart(), timeout=10.0)
_, message = parts
yield f"data: {message.decode().strip()}\n\n"
except TimeoutError:
@@ -65,3 +65,30 @@ async def experiment_stream(request: Request):
socket.close()
return StreamingResponse(gen(), media_type="text/event-stream")
@router.get("/status_stream")
async def status_stream(request: Request):
context = Context.instance()
socket = context.socket(zmq.SUB)
socket.connect(settings.zmq_settings.internal_sub_address)
socket.subscribe(b"status")
async def gen():
try:
while True:
if await request.is_disconnected():
break
try:
# Shorter timeout since this is frequent
parts = await asyncio.wait_for(socket.recv_multipart(), timeout=0.5)
_, message = parts
yield f"data: {message.decode().strip()}\n\n"
except TimeoutError:
yield ": ping\n\n" # Keep the connection alive
continue
finally:
socket.close()
return StreamingResponse(gen(), media_type="text/event-stream")