diff --git a/main.py b/main.py index 1b53eac..874dfc4 100644 --- a/main.py +++ b/main.py @@ -3,11 +3,16 @@ from contextlib import asynccontextmanager from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware +import zmq +import zmq.asyncio from fastapi.responses import StreamingResponse from pydantic import BaseModel import datetime import json; + +zmq_state = {} # Contains our sockets and context + # Use of Pydantic class for automatic request validation in FastAPI class Message(BaseModel): message: str @@ -21,80 +26,118 @@ async def lifespan(app: FastAPI): subport = 5555 pubport = 5556 - # asyncio.create_task(zmq_subscriber(subport)) - # asyncio.create_task(zmq_publisher(pubport)) + asyncio.create_task(zmq_subscriber(subport, app)) + asyncio.create_task(zmq_publisher(pubport, app)) + print("Set up both sub and pub") - sse_queue = asyncio.Queue() + app.state.sse_queue = asyncio.Queue() # Messages to send to UI + app.state.ri_queue = asyncio.Queue() # Messages to send to RI - # TODO: figure out how this relates in more than one port used. - # zmq_state["socket"] = socket - - print(f"ZeroMQ publisher bound to port {port}") + # Handle pings + app.state.received_ping = False + app.state.connected = False + app.state.connected_id = "" yield + - # zmq_state["socket"].close() - zmq_state["context"].term() - - - -async def zmq_subscriber(port): +async def zmq_subscriber(port, app): """ Set up the zmq subscriber to listen to the port given """ + print(f"Setting up ZMQ subscriber on port {port}") sub = zmq_state["context"].socket(zmq.SUB) sub.connect(f"tcp://localhost:{port}") sub.setsockopt_string(zmq.SUBSCRIBE, u"") + zmq_state["subsocket"] = sub + + print(f"Subscriber connected to localhost:{port}, waiting for messages...") + while True: + print(f"Listening for message from zmq sub on port {port}.") msg = await sub.recv_string() print(f"Received from SUB: {msg}") - + # We got a message, let's see what we want to do with it. - processMessage(msg) + await process_message_received(msg, app) -async def zmq_publisher(port): +async def zmq_publisher(port, app): """ Set up the zmq publisher to send to the port given """ + queue = app.state.ri_queue pub = zmq_state["context"].socket(zmq.PUB) pub.bind(f"tcp://*:{port}") + zmq_state["pubsocket"] = pub while True: - print() + if not queue.empty(): + # send different message to RI + return + if app.state.connected == True: + # (In case we have nothing else to send:) + # Let's ping our RI to see if they're still listening! + app.state.received_ping = False + zmq_state["pubsocket"].send_string("ping") + await asyncio.sleep(5) + # Let's see if we haven't returned a ping in the last 5 seconds... + if not app.state.received_ping == True: + # Let's send our UI a message the robot disappeared. + dataToSend = { + "event": "robot_disconnected", + "id": app.state.connected_id + } + # Reset connection details + app.state.connected = False + app.state.connectedID = "" + await put_message_in_ui_queue(dataToSend, app) + await asyncio.sleep(1) -async def processMessage(msg): + +async def put_message_in_ui_queue(data, app): + queue = app.state.sse_queue + await queue.put(data) + +async def put_message_in_ri_queue(data, app): + queue = app.state.ri_queue + await queue.put(data) + +async def process_message_received(msg, app): """ Process a raw received message to handle it correctly. """ - queue = app.lifespan.sse_queue + queue = app.state.sse_queue # string handling if type(msg) is str: - return - # do shit - - # json handling - else: try: + print("converting received data into json.") data = json.loads(msg) + print("converted data: ", data) + # Connection event - if (data["event"] is "robot_connected"): + if (data['event'] == 'robot_connected'): + print("robot connection event received.") if not data["id"]: return + + # Let our app know we're connected >:) + app.state.connected_id = data["id"] + app.state.connected = True + app.state.received_ping = True + + # Send data to UI name = data.get("name", "no name") port = data.get("port", "no port") - dataToSend = { "event": "robot_connected", "id": data["id"], "name": name, "port": port } - - queue.put(dataToSend) - return - + await queue.put(dataToSend) + # Disconnection event - if (data["event"] is "robot_disconnected"): + if (data['event'] == 'robot_disconnected'): if not data["id"]: return name = data.get("name", "no name") @@ -107,16 +150,26 @@ async def processMessage(msg): "port": port } - queue.put(dataToSend) + await queue.put(dataToSend) + + # Ping event + if (data['event'] == 'ping'): + print("ping received") + if not data["id"]: + print("no id given in ping event.") + return + + # TODO: You can add some logic here if the ID doens't match (so we switched robot at the same frame lol) + app.state.received_ping = True return except: - print("message received from RI, however, not a str or json.") + print("message received from RI, however, not a str or json, or another error has occured.") return - # do shit -app = FastAPI() + +app = FastAPI(lifespan=lifespan) # This middleware allows other origins to communicate with us app.add_middleware( @@ -140,6 +193,7 @@ async def sse_endpoint(request: Request): """ Endpoint for Server-Sent Events. """ + async def event_generator(): while True: # If connection to client closes, stop sending events @@ -147,12 +201,19 @@ async def sse_endpoint(request: Request): break # Let's check if we have to send any messages from our sse queue - if len(app.lifespan.sse_endpoint) is not 0: - yield app.life.see_endpoint.get() + queue = app.state.sse_queue + if not queue.empty(): + print("message queue not empty, fetching data.") + data = await queue.get() + data_json = json.dumps(data) + print(f"queue not empty. yielding msg to event_generator, msg: {data_json}\n\n") + yield f"data: {data_json}\n\n" + await asyncio.sleep(1) - # Send message containing current time every second - current_time = datetime.datetime.now().strftime("%H:%M:%S") - yield f"data: Server time: {current_time}\n\n" # \n\n is needed to separate events (SSE is text-based) - await asyncio.sleep(1) + else: + # Send message containing current time every second + current_time = datetime.datetime.now().strftime("%H:%M:%S") + yield f"data: Server time: {current_time}\n\n" # \n\n is needed to separate events (SSE is text-based) + await asyncio.sleep(1) return StreamingResponse(event_generator(), media_type="text/event-stream") # media_type specifies that this connection is for event streams