Files
pepperplus-cb/main.py
2025-10-07 18:30:39 +02:00

159 lines
4.5 KiB
Python

import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import datetime
import json;
# Use of Pydantic class for automatic request validation in FastAPI
class Message(BaseModel):
message: str
@asynccontextmanager
async def lifespan(app: FastAPI):
context = zmq.Context()
# Set up sub and pub
zmq_state["context"] = zmq.asyncio.Context()
subport = 5555
pubport = 5556
# asyncio.create_task(zmq_subscriber(subport))
# asyncio.create_task(zmq_publisher(pubport))
sse_queue = asyncio.Queue()
# TODO: figure out how this relates in more than one port used.
# zmq_state["socket"] = socket
print(f"ZeroMQ publisher bound to port {port}")
yield
# zmq_state["socket"].close()
zmq_state["context"].term()
async def zmq_subscriber(port):
"""
Set up the zmq subscriber to listen to the port given
"""
sub = zmq_state["context"].socket(zmq.SUB)
sub.connect(f"tcp://localhost:{port}")
sub.setsockopt_string(zmq.SUBSCRIBE, u"")
while True:
msg = await sub.recv_string()
print(f"Received from SUB: {msg}")
# We got a message, let's see what we want to do with it.
processMessage(msg)
async def zmq_publisher(port):
"""
Set up the zmq publisher to send to the port given
"""
pub = zmq_state["context"].socket(zmq.PUB)
pub.bind(f"tcp://*:{port}")
while True:
print()
async def processMessage(msg):
"""
Process a raw received message to handle it correctly.
"""
queue = app.lifespan.sse_queue
# string handling
if type(msg) is str:
return
# do shit
# json handling
else:
try:
data = json.loads(msg)
# Connection event
if (data["event"] is "robot_connected"):
if not data["id"]:
return
name = data.get("name", "no name")
port = data.get("port", "no port")
dataToSend = {
"event": "robot_connected",
"id": data["id"],
"name": name,
"port": port
}
queue.put(dataToSend)
return
# Disconnection event
if (data["event"] is "robot_disconnected"):
if not data["id"]:
return
name = data.get("name", "no name")
port = data.get("port", "no port")
dataToSend = {
"event": "robot_disconnected",
"id": data["id"],
"name": name,
"port": port
}
queue.put(dataToSend)
return
except:
print("message received from RI, however, not a str or json.")
return
# do shit
app = FastAPI()
# This middleware allows other origins to communicate with us
app.add_middleware(
CORSMiddleware, # https://developer.mozilla.org/en-US/docs/Web/HTTP/Guides/CORS
allow_origins=["http://localhost:5173"], # address of our UI application
allow_methods=["*"], # GET, POST, etc.
)
# Endpoint to receive messages from the UI
@app.post("/message")
async def receive_message(message: Message):
"""
Receives a message from the UI and prints it to the console.
"""
print(f"Received message: {message}")
return { "status": "Message received" }
# Endpoint for Server-Sent Events (SSE)
@app.get("/sse")
async def sse_endpoint(request: Request):
"""
Endpoint for Server-Sent Events.
"""
async def event_generator():
while True:
# If connection to client closes, stop sending events
if await request.is_disconnected():
break
# Let's check if we have to send any messages from our sse queue
if len(app.lifespan.sse_endpoint) is not 0:
yield app.life.see_endpoint.get()
# Send message containing current time every second
current_time = datetime.datetime.now().strftime("%H:%M:%S")
yield f"data: Server time: {current_time}\n\n" # \n\n is needed to separate events (SSE is text-based)
await asyncio.sleep(1)
return StreamingResponse(event_generator(), media_type="text/event-stream") # media_type specifies that this connection is for event streams