From 9c7e3cd0dcfc2ca090c6307e664356fd4939ce22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Otgaar?= Date: Tue, 7 Oct 2025 16:23:37 +0200 Subject: [PATCH] feat: initial setup of SUB/PUB ports with json handling of sub messages and message queue to UI ref: N25B-151. --- main.py | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/main.py b/main.py index d067140..9f6fc4c 100644 --- a/main.py +++ b/main.py @@ -1,15 +1,121 @@ import asyncio +from contextlib import asynccontextmanager from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from pydantic import BaseModel import datetime +import json; # Use of Pydantic class for automatic request validation in FastAPI class Message(BaseModel): message: str +@asynccontextmanager +async def lifespan(app: FastAPI): + context = zmq.Context() + + # Set up sub and pub + zmq_state["context"] = zmq.asyncio.Context() + + subport = 5555 + pubport = 5556 + asyncio.create_task(zmq_subscriber(subport)) + asyncio.create_task(zmq_publisher(pubport)) + + sse_queue = asyncio.Queue() + + # TODO: figure out how this relates in more than one port used. + # zmq_state["socket"] = socket + + print(f"ZeroMQ publisher bound to port {port}") + + yield + + # zmq_state["socket"].close() + zmq_state["context"].term() + + + +async def zmq_subscriber(port): + """ + Set up the zmq subscriber to listen to the port given + """ + sub = zmq_state["context"].socket(zmq.SUB) + sub.connect(f"tcp://localhost:{port}") + sub.setsockopt_string(zmq.SUBSCRIBE, u"") + while True: + msg = await sub.recv_string() + print(f"Received from SUB: {msg}") + + # We got a message, let's see what we want to do with it. + processMessage(msg) + +async def zmq_publisher(port): + """ + Set up the zmq publisher to send to the port given + """ + pub = zmq_state["context"].socket(zmq.PUB) + pub.bind(f"tcp://*:{port}") + while True: + print() + +async def processMessage(msg): + """ + Process a raw received message to handle it correctly. + """ + queue = app.lifespan.sse_queue + # string handling + if type(msg) is str: + return + # do shit + + # json handling + else: + try: + data = json.loads(msg) + + # Connection event + if (data["event"] is "robot_connected"): + if not data["id"]: + return + name = data.get("name", "no name") + port = data.get("port", "no port") + + dataToSend = { + "event": "robot_connected", + "id": data["id"], + "name": name, + "port": port + } + + queue.put(dataToSend) + return + + # Disconnection event + if (data["event"] is "robot_disconnected"): + if not data["id"]: + return + name = data.get("name", "no name") + port = data.get("port", "no port") + + dataToSend = { + "event": "robot_disconnected", + "id": data["id"], + "name": name, + "port": port + } + + queue.put(dataToSend) + return + + except: + print("message received from RI, however, not a str or json.") + return + + # do shit + app = FastAPI() # This middleware allows other origins to communicate with us @@ -40,6 +146,10 @@ async def sse_endpoint(request: Request): if await request.is_disconnected(): break + # Let's check if we have to send any messages from our sse queue + if len(app.lifespan.sse_endpoint) is not 0: + yield app.life.see_endpoint.get() + # Send message containing current time every second current_time = datetime.datetime.now().strftime("%H:%M:%S") yield f"data: Server time: {current_time}\n\n" # \n\n is needed to separate events (SSE is text-based)