diff --git a/src/control_backend/main.py b/src/control_backend/main.py index 1543882..ff63e1f 100644 --- a/src/control_backend/main.py +++ b/src/control_backend/main.py @@ -3,6 +3,7 @@ # External imports import contextlib import logging +import threading import zmq from fastapi import FastAPI @@ -23,12 +24,7 @@ from control_backend.core.config import settings logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) - -@contextlib.asynccontextmanager -async def lifespan(app: FastAPI): - logger.info("%s starting up.", app.title) - - # Initiate sockets +def setup_sockets(): context = Context.instance() internal_pub_socket = context.socket(zmq.XPUB) @@ -38,8 +34,22 @@ async def lifespan(app: FastAPI): internal_sub_socket = context.socket(zmq.XSUB) internal_sub_socket.bind(settings.zmq_settings.internal_sub_address) logger.debug("Internal subscribing socket bound to %s", internal_sub_socket) + try: + zmq.proxy(internal_pub_socket, internal_sub_socket) + except zmq.ZMQError: + logger.warning("Error while handling PUB/SUB proxy. Closing sockets.") + finally: + internal_pub_socket.close() + internal_sub_socket.close() - zmq.proxy(internal_pub_socket, internal_sub_socket) +@contextlib.asynccontextmanager +async def lifespan(app: FastAPI): + logger.info("%s starting up.", app.title) + + # Initiate sockets + proxy_thread = threading.Thread(target=setup_sockets) + proxy_thread.daemon = True + proxy_thread.start() # Initiate agents ri_communication_agent = RICommunicationAgent(