Files
pepperplus-cb/main.py
Björn Otgaar 6ebdde3836 feat: automatically pings RI for disconnection,
handles disconnection events and sends
disconenction messages to UI.

ref: N25B-151
2025-10-08 14:33:23 +02:00

220 lines
7.3 KiB
Python

import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
import zmq
import zmq.asyncio
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import datetime
import json;
zmq_state = {} # Contains our sockets and context
# Use of Pydantic class for automatic request validation in FastAPI
class Message(BaseModel):
message: str
@asynccontextmanager
async def lifespan(app: FastAPI):
context = zmq.Context()
# Set up sub and pub
zmq_state["context"] = zmq.asyncio.Context()
subport = 5555
pubport = 5556
asyncio.create_task(zmq_subscriber(subport, app))
asyncio.create_task(zmq_publisher(pubport, app))
print("Set up both sub and pub")
app.state.sse_queue = asyncio.Queue() # Messages to send to UI
app.state.ri_queue = asyncio.Queue() # Messages to send to RI
# Handle pings
app.state.received_ping = False
app.state.connected = False
app.state.connected_id = ""
yield
async def zmq_subscriber(port, app):
"""
Set up the zmq subscriber to listen to the port given
"""
print(f"Setting up ZMQ subscriber on port {port}")
sub = zmq_state["context"].socket(zmq.SUB)
sub.connect(f"tcp://localhost:{port}")
sub.setsockopt_string(zmq.SUBSCRIBE, u"")
zmq_state["subsocket"] = sub
print(f"Subscriber connected to localhost:{port}, waiting for messages...")
while True:
print(f"Listening for message from zmq sub on port {port}.")
msg = await sub.recv_string()
print(f"Received from SUB: {msg}")
# We got a message, let's see what we want to do with it.
await process_message_received(msg, app)
async def zmq_publisher(port, app):
"""
Set up the zmq publisher to send to the port given
"""
queue = app.state.ri_queue
pub = zmq_state["context"].socket(zmq.PUB)
pub.bind(f"tcp://*:{port}")
zmq_state["pubsocket"] = pub
while True:
if not queue.empty():
# send different message to RI
return
if app.state.connected == True:
# (In case we have nothing else to send:)
# Let's ping our RI to see if they're still listening!
app.state.received_ping = False
zmq_state["pubsocket"].send_string("ping")
await asyncio.sleep(5)
# Let's see if we haven't returned a ping in the last 5 seconds...
if not app.state.received_ping == True:
# Let's send our UI a message the robot disappeared.
dataToSend = {
"event": "robot_disconnected",
"id": app.state.connected_id
}
# Reset connection details
app.state.connected = False
app.state.connectedID = ""
await put_message_in_ui_queue(dataToSend, app)
await asyncio.sleep(1)
async def put_message_in_ui_queue(data, app):
queue = app.state.sse_queue
await queue.put(data)
async def put_message_in_ri_queue(data, app):
queue = app.state.ri_queue
await queue.put(data)
async def process_message_received(msg, app):
"""
Process a raw received message to handle it correctly.
"""
queue = app.state.sse_queue
# string handling
if type(msg) is str:
try:
print("converting received data into json.")
data = json.loads(msg)
print("converted data: ", data)
# Connection event
if (data['event'] == 'robot_connected'):
print("robot connection event received.")
if not data["id"]:
return
# Let our app know we're connected >:)
app.state.connected_id = data["id"]
app.state.connected = True
app.state.received_ping = True
# Send data to UI
name = data.get("name", "no name")
port = data.get("port", "no port")
dataToSend = {
"event": "robot_connected",
"id": data["id"],
"name": name,
"port": port
}
await queue.put(dataToSend)
# Disconnection event
if (data['event'] == 'robot_disconnected'):
if not data["id"]:
return
name = data.get("name", "no name")
port = data.get("port", "no port")
dataToSend = {
"event": "robot_disconnected",
"id": data["id"],
"name": name,
"port": port
}
await queue.put(dataToSend)
# Ping event
if (data['event'] == 'ping'):
print("ping received")
if not data["id"]:
print("no id given in ping event.")
return
# TODO: You can add some logic here if the ID doens't match (so we switched robot at the same frame lol)
app.state.received_ping = True
return
except:
print("message received from RI, however, not a str or json, or another error has occured.")
return
# do shit
app = FastAPI(lifespan=lifespan)
# This middleware allows other origins to communicate with us
app.add_middleware(
CORSMiddleware, # https://developer.mozilla.org/en-US/docs/Web/HTTP/Guides/CORS
allow_origins=["http://localhost:5173"], # address of our UI application
allow_methods=["*"], # GET, POST, etc.
)
# Endpoint to receive messages from the UI
@app.post("/message")
async def receive_message(message: Message):
"""
Receives a message from the UI and prints it to the console.
"""
print(f"Received message: {message}")
return { "status": "Message received" }
# Endpoint for Server-Sent Events (SSE)
@app.get("/sse")
async def sse_endpoint(request: Request):
"""
Endpoint for Server-Sent Events.
"""
async def event_generator():
while True:
# If connection to client closes, stop sending events
if await request.is_disconnected():
break
# Let's check if we have to send any messages from our sse queue
queue = app.state.sse_queue
if not queue.empty():
print("message queue not empty, fetching data.")
data = await queue.get()
data_json = json.dumps(data)
print(f"queue not empty. yielding msg to event_generator, msg: {data_json}\n\n")
yield f"data: {data_json}\n\n"
await asyncio.sleep(1)
else:
# Send message containing current time every second
current_time = datetime.datetime.now().strftime("%H:%M:%S")
yield f"data: Server time: {current_time}\n\n" # \n\n is needed to separate events (SSE is text-based)
await asyncio.sleep(1)
return StreamingResponse(event_generator(), media_type="text/event-stream") # media_type specifies that this connection is for event streams