import asyncio from contextlib import asynccontextmanager from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware import zmq import zmq.asyncio from fastapi.responses import StreamingResponse from pydantic import BaseModel import datetime import json; zmq_state = {} # Contains our sockets and context # Use of Pydantic class for automatic request validation in FastAPI class Message(BaseModel): message: str @asynccontextmanager async def lifespan(app: FastAPI): context = zmq.Context() # Set up sub and pub zmq_state["context"] = zmq.asyncio.Context() subport = 5555 pubport = 5556 asyncio.create_task(zmq_subscriber(subport, app)) asyncio.create_task(zmq_publisher(pubport, app)) print("Set up both sub and pub") app.state.sse_queue = asyncio.Queue() # Messages to send to UI app.state.ri_queue = asyncio.Queue() # Messages to send to RI # Handle pings app.state.received_ping = False app.state.connected = False app.state.connected_id = "" yield async def zmq_subscriber(port, app): """ Set up the zmq subscriber to listen to the port given """ print(f"Setting up ZMQ subscriber on port {port}") sub = zmq_state["context"].socket(zmq.SUB) sub.connect(f"tcp://localhost:{port}") sub.setsockopt_string(zmq.SUBSCRIBE, u"") zmq_state["subsocket"] = sub print(f"Subscriber connected to localhost:{port}, waiting for messages...") while True: print(f"Listening for message from zmq sub on port {port}.") msg = await sub.recv_string() print(f"Received from SUB: {msg}") # We got a message, let's see what we want to do with it. await process_message_received(msg, app) async def zmq_publisher(port, app): """ Set up the zmq publisher to send to the port given """ queue = app.state.ri_queue pub = zmq_state["context"].socket(zmq.PUB) pub.bind(f"tcp://*:{port}") zmq_state["pubsocket"] = pub while True: if not queue.empty(): # send different message to RI return if app.state.connected == True: # (In case we have nothing else to send:) # Let's ping our RI to see if they're still listening! app.state.received_ping = False zmq_state["pubsocket"].send_string("ping") await asyncio.sleep(5) # Let's see if we haven't returned a ping in the last 5 seconds... if not app.state.received_ping == True: # Let's send our UI a message the robot disappeared. dataToSend = { "event": "robot_disconnected", "id": app.state.connected_id } # Reset connection details app.state.connected = False app.state.connectedID = "" await put_message_in_ui_queue(dataToSend, app) await asyncio.sleep(1) async def put_message_in_ui_queue(data, app): queue = app.state.sse_queue await queue.put(data) async def put_message_in_ri_queue(data, app): queue = app.state.ri_queue await queue.put(data) async def process_message_received(msg, app): """ Process a raw received message to handle it correctly. """ queue = app.state.sse_queue # string handling if type(msg) is str: try: print("converting received data into json.") data = json.loads(msg) print("converted data: ", data) # Connection event if (data['event'] == 'robot_connected'): print("robot connection event received.") if not data["id"]: return # Let our app know we're connected >:) app.state.connected_id = data["id"] app.state.connected = True app.state.received_ping = True # Send data to UI name = data.get("name", "no name") port = data.get("port", "no port") dataToSend = { "event": "robot_connected", "id": data["id"], "name": name, "port": port } await queue.put(dataToSend) # Disconnection event if (data['event'] == 'robot_disconnected'): if not data["id"]: return name = data.get("name", "no name") port = data.get("port", "no port") dataToSend = { "event": "robot_disconnected", "id": data["id"], "name": name, "port": port } await queue.put(dataToSend) # Ping event if (data['event'] == 'ping'): print("ping received") if not data["id"]: print("no id given in ping event.") return # TODO: You can add some logic here if the ID doens't match (so we switched robot at the same frame lol) app.state.received_ping = True return except: print("message received from RI, however, not a str or json, or another error has occured.") return # do shit app = FastAPI(lifespan=lifespan) # This middleware allows other origins to communicate with us app.add_middleware( CORSMiddleware, # https://developer.mozilla.org/en-US/docs/Web/HTTP/Guides/CORS allow_origins=["http://localhost:5173"], # address of our UI application allow_methods=["*"], # GET, POST, etc. ) # Endpoint to receive messages from the UI @app.post("/message") async def receive_message(message: Message): """ Receives a message from the UI and prints it to the console. """ print(f"Received message: {message}") return { "status": "Message received" } # Endpoint for Server-Sent Events (SSE) @app.get("/sse") async def sse_endpoint(request: Request): """ Endpoint for Server-Sent Events. """ async def event_generator(): while True: # If connection to client closes, stop sending events if await request.is_disconnected(): break # Let's check if we have to send any messages from our sse queue queue = app.state.sse_queue if not queue.empty(): print("message queue not empty, fetching data.") data = await queue.get() data_json = json.dumps(data) print(f"queue not empty. yielding msg to event_generator, msg: {data_json}\n\n") yield f"data: {data_json}\n\n" await asyncio.sleep(1) else: # Send message containing current time every second current_time = datetime.datetime.now().strftime("%H:%M:%S") yield f"data: Server time: {current_time}\n\n" # \n\n is needed to separate events (SSE is text-based) await asyncio.sleep(1) return StreamingResponse(event_generator(), media_type="text/event-stream") # media_type specifies that this connection is for event streams