Refactored ZMQ context implementation #16
@@ -3,6 +3,7 @@
|
|||||||
# External imports
|
# External imports
|
||||||
import contextlib
|
import contextlib
|
||||||
import logging
|
import logging
|
||||||
|
import threading
|
||||||
|
|
||||||
import zmq
|
import zmq
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
@@ -23,12 +24,7 @@ from control_backend.core.config import settings
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
|
||||||
|
def setup_sockets():
|
||||||
@contextlib.asynccontextmanager
|
|
||||||
async def lifespan(app: FastAPI):
|
|
||||||
logger.info("%s starting up.", app.title)
|
|
||||||
|
|
||||||
# Initiate sockets
|
|
||||||
context = Context.instance()
|
context = Context.instance()
|
||||||
|
|
||||||
internal_pub_socket = context.socket(zmq.XPUB)
|
internal_pub_socket = context.socket(zmq.XPUB)
|
||||||
@@ -38,8 +34,22 @@ async def lifespan(app: FastAPI):
|
|||||||
internal_sub_socket = context.socket(zmq.XSUB)
|
internal_sub_socket = context.socket(zmq.XSUB)
|
||||||
internal_sub_socket.bind(settings.zmq_settings.internal_sub_address)
|
internal_sub_socket.bind(settings.zmq_settings.internal_sub_address)
|
||||||
logger.debug("Internal subscribing socket bound to %s", internal_sub_socket)
|
logger.debug("Internal subscribing socket bound to %s", internal_sub_socket)
|
||||||
|
try:
|
||||||
|
zmq.proxy(internal_pub_socket, internal_sub_socket)
|
||||||
|
except zmq.ZMQError:
|
||||||
|
logger.warning("Error while handling PUB/SUB proxy. Closing sockets.")
|
||||||
|
finally:
|
||||||
|
internal_pub_socket.close()
|
||||||
|
internal_sub_socket.close()
|
||||||
|
|
||||||
zmq.proxy(internal_pub_socket, internal_sub_socket)
|
@contextlib.asynccontextmanager
|
||||||
|
async def lifespan(app: FastAPI):
|
||||||
|
logger.info("%s starting up.", app.title)
|
||||||
|
|
||||||
|
# Initiate sockets
|
||||||
|
proxy_thread = threading.Thread(target=setup_sockets)
|
||||||
|
proxy_thread.daemon = True
|
||||||
|
proxy_thread.start()
|
||||||
|
|
||||||
# Initiate agents
|
# Initiate agents
|
||||||
ri_communication_agent = RICommunicationAgent(
|
ri_communication_agent = RICommunicationAgent(
|
||||||
|
|||||||
Reference in New Issue
Block a user