Files
pepperplus-cb/main.py
2025-09-27 10:36:59 +02:00

49 lines
1.6 KiB
Python

import asyncio
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import datetime
# Use of Pydantic class for automatic request validation in FastAPI
class Message(BaseModel):
message: str
app = FastAPI()
# This middleware allows other origins to communicate with us
app.add_middleware(
CORSMiddleware, # https://developer.mozilla.org/en-US/docs/Web/HTTP/Guides/CORS
allow_origins=["http://localhost:5173"], # address of our UI application
allow_methods=["*"], # GET, POST, etc.
)
# Endpoint to receive messages from the UI
@app.post("/message")
async def receive_message(message: Message):
"""
Receives a message from the UI and prints it to the console.
"""
print(f"Received message: {message}")
return { "status": "Message received" }
# Endpoint for Server-Sent Events (SSE)
@app.get("/sse")
async def sse_endpoint(request: Request):
"""
Endpoint for Server-Sent Events.
"""
async def event_generator():
while True:
# If connection to client closes, stop sending events
if await request.is_disconnected():
break
# Send message containing current time every second
current_time = datetime.datetime.now().strftime("%H:%M:%S")
yield f"data: Server time: {current_time}\n\n" # \n\n is needed to separate events (SSE is text-based)
await asyncio.sleep(1)
return StreamingResponse(event_generator(), media_type="text/event-stream") # media_type specifies that this connection is for event streams