Files
pepperplus-cb/src/control_backend/api/v1/endpoints/sse_ping.py
Björn Otgaar d71cb60523 fix: gitignore + testing map structure
ref: N25B-205
2025-10-22 12:41:47 +02:00

27 lines
893 B
Python

from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
import datetime
import asyncio
# Router for this module; the /sse_ping route defined below is registered on it.
router = APIRouter()
@router.get("/sse_ping")
async def sse_ping(request: Request):
    """Stream the server's clock to the client as Server-Sent Events.

    Emits one ``data:`` event roughly every second carrying the current
    time formatted as ``HH:MM:SS`` (taken from ``datetime.datetime.now()``),
    and keeps doing so until the client is reported as disconnected.

    Args:
        request: The incoming request; polled before each event to detect
            that the client has gone away.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """

    async def clock_events():
        # Poll the connection before every event so the loop ends as soon
        # as the client is gone instead of streaming into the void.
        while not await request.is_disconnected():
            now = datetime.datetime.now()
            current_time = now.strftime("%H:%M:%S")
            # SSE is a text protocol: a blank line ("\n\n") terminates an event.
            yield f"data: Server time: {current_time}\n\n"
            # One-second cadence between events.
            await asyncio.sleep(1)

    # The text/event-stream media type marks this response as an event stream.
    return StreamingResponse(clock_events(), media_type="text/event-stream")