feat: add basic UI2CB and CB2UI communication

The Python application exposes an endpoint /message for the UI to send
messages to.

It also exposes an SSE endpoint /sse for the UI to listen to. Every
second, the CB sends the current time to UI.

ref: N25B-107
ref: N25B-110
This commit is contained in:
Kasper
2025-09-26 21:44:48 +02:00
parent cc9bfbb777
commit 349fcb5ac1

41
main.py
View File

@@ -1,6 +1,39 @@
def main(): import asyncio
print("Hello from pepperplus-cb!") from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import datetime
from sse_starlette import EventSourceResponse
if __name__ == "__main__": class Message(BaseModel):
main() message: str
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.post("/message")
async def receive_message(message: Message):
print(f"Received message: {message}")
return { "status": "Message received" }
@app.get("/sse")
async def sse_endpoint(request: Request):
async def event_generator():
while True:
if await request.is_disconnected():
break
current_time = datetime.datetime.now().strftime("%H:%M:%S")
yield f"data: Server time: {current_time}\n\n"
await asyncio.sleep(1)
return EventSourceResponse(event_generator())