feat: initial setup of SUB/PUB ports with json

handling of sub messages and message queue to UI

ref: N25B-151.
This commit is contained in:
Björn Otgaar
2025-10-07 16:23:37 +02:00
parent 6e7c78e888
commit 9c7e3cd0dc

110
main.py
View File

@@ -1,15 +1,121 @@
import asyncio import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse
from pydantic import BaseModel from pydantic import BaseModel
import datetime import datetime
import json;
# Use of Pydantic class for automatic request validation in FastAPI # Use of Pydantic class for automatic request validation in FastAPI
class Message(BaseModel): class Message(BaseModel):
message: str message: str
@asynccontextmanager
async def lifespan(app: FastAPI):
context = zmq.Context()
# Set up sub and pub
zmq_state["context"] = zmq.asyncio.Context()
subport = 5555
pubport = 5556
asyncio.create_task(zmq_subscriber(subport))
asyncio.create_task(zmq_publisher(pubport))
sse_queue = asyncio.Queue()
# TODO: figure out how this relates in more than one port used.
# zmq_state["socket"] = socket
print(f"ZeroMQ publisher bound to port {port}")
yield
# zmq_state["socket"].close()
zmq_state["context"].term()
async def zmq_subscriber(port):
"""
Set up the zmq subscriber to listen to the port given
"""
sub = zmq_state["context"].socket(zmq.SUB)
sub.connect(f"tcp://localhost:{port}")
sub.setsockopt_string(zmq.SUBSCRIBE, u"")
while True:
msg = await sub.recv_string()
print(f"Received from SUB: {msg}")
# We got a message, let's see what we want to do with it.
processMessage(msg)
async def zmq_publisher(port):
"""
Set up the zmq publisher to send to the port given
"""
pub = zmq_state["context"].socket(zmq.PUB)
pub.bind(f"tcp://*:{port}")
while True:
print()
async def processMessage(msg):
"""
Process a raw received message to handle it correctly.
"""
queue = app.lifespan.sse_queue
# string handling
if type(msg) is str:
return
# do shit
# json handling
else:
try:
data = json.loads(msg)
# Connection event
if (data["event"] is "robot_connected"):
if not data["id"]:
return
name = data.get("name", "no name")
port = data.get("port", "no port")
dataToSend = {
"event": "robot_connected",
"id": data["id"],
"name": name,
"port": port
}
queue.put(dataToSend)
return
# Disconnection event
if (data["event"] is "robot_disconnected"):
if not data["id"]:
return
name = data.get("name", "no name")
port = data.get("port", "no port")
dataToSend = {
"event": "robot_disconnected",
"id": data["id"],
"name": name,
"port": port
}
queue.put(dataToSend)
return
except:
print("message received from RI, however, not a str or json.")
return
# do shit
app = FastAPI() app = FastAPI()
# This middleware allows other origins to communicate with us # This middleware allows other origins to communicate with us
@@ -40,6 +146,10 @@ async def sse_endpoint(request: Request):
if await request.is_disconnected(): if await request.is_disconnected():
break break
# Let's check if we have to send any messages from our sse queue
if len(app.lifespan.sse_endpoint) is not 0:
yield app.life.see_endpoint.get()
# Send message containing current time every second # Send message containing current time every second
current_time = datetime.datetime.now().strftime("%H:%M:%S") current_time = datetime.datetime.now().strftime("%H:%M:%S")
yield f"data: Server time: {current_time}\n\n" # \n\n is needed to separate events (SSE is text-based) yield f"data: Server time: {current_time}\n\n" # \n\n is needed to separate events (SSE is text-based)