Files
pepperplus-cb/src/control_backend/main.py
2026-01-29 17:16:38 +01:00

190 lines
5.9 KiB
Python

"""
Control Backend Main Application.
This module defines the FastAPI application that serves as the entry point for the
Control Backend. It manages the lifecycle of the entire system, including:
1. **Socket Initialization**: Setting up the internal ZeroMQ PUB/SUB proxy for agent communication.
2. **Agent Management**: Instantiating and starting all agents.
3. **API Routing**: Exposing REST endpoints for external interaction.
Lifecycle Manager
-----------------
The :func:`lifespan` context manager handles the startup and shutdown sequences:
- **Startup**: Configures logging, starts the ZMQ proxy, connects sockets, and launches agents.
- **Shutdown**: Handles graceful cleanup (though currently minimal).
"""
import contextlib
import logging
import threading
import zmq
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from zmq.asyncio import Context
# BDI agents
from control_backend.agents.bdi import (
BDICoreAgent,
TextBeliefExtractorAgent,
)
from control_backend.agents.bdi.bdi_program_manager import BDIProgramManager
# Communication agents
from control_backend.agents.communication import RICommunicationAgent
# Emotional Agents
# LLM Agents
from control_backend.agents.llm import LLMAgent
# User Interrupt Agent
from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent
# Other backend imports
from control_backend.api.v1.router import api_router
from control_backend.core.config import settings
from control_backend.logging import setup_logging
from control_backend.schemas.program_status import PROGRAM_STATUS, ProgramStatus
logger = logging.getLogger(__name__)
def setup_sockets():
"""
Initialize and run the internal ZeroMQ Proxy (XPUB/XSUB).
This proxy acts as the central message bus, forwarding messages published on the
internal PUB address to all subscribers on the internal SUB address.
"""
context = Context.instance()
internal_pub_socket = context.socket(zmq.XPUB)
internal_pub_socket.bind(settings.zmq_settings.internal_sub_address)
logger.debug("Internal publishing socket bound to %s", internal_pub_socket)
internal_sub_socket = context.socket(zmq.XSUB)
internal_sub_socket.bind(settings.zmq_settings.internal_pub_address)
logger.debug("Internal subscribing socket bound to %s", internal_sub_socket)
try:
zmq.proxy(internal_sub_socket, internal_pub_socket)
except zmq.ZMQError:
logger.warning("Error while handling PUB/SUB proxy. Closing sockets.")
finally:
internal_pub_socket.close()
internal_sub_socket.close()
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
"""
Application lifespan context manager to handle startup and shutdown events.
"""
# --- APPLICATION STARTUP ---
setup_logging()
logger.info("%s is starting up.", app.title)
# Initiate sockets
proxy_thread = threading.Thread(target=setup_sockets)
proxy_thread.daemon = True
proxy_thread.start()
context = Context.instance()
endpoints_pub_socket = context.socket(zmq.PUB)
endpoints_pub_socket.connect(settings.zmq_settings.internal_pub_address)
app.state.endpoints_pub_socket = endpoints_pub_socket
await endpoints_pub_socket.send_multipart([PROGRAM_STATUS, ProgramStatus.STARTING.value])
# --- Initialize Agents ---
logger.info("Initializing and starting agents.")
agents_to_start = {
"RICommunicationAgent": (
RICommunicationAgent,
{
"name": settings.agent_settings.ri_communication_name,
"address": settings.zmq_settings.ri_communication_address,
"bind": True,
},
),
"LLMAgent": (
LLMAgent,
{
"name": settings.agent_settings.llm_name,
},
),
"BDICoreAgent": (
BDICoreAgent,
{
"name": settings.agent_settings.bdi_core_name,
},
),
"TextBeliefExtractorAgent": (
TextBeliefExtractorAgent,
{
"name": settings.agent_settings.text_belief_extractor_name,
},
),
"ProgramManagerAgent": (
BDIProgramManager,
{
"name": settings.agent_settings.bdi_program_manager_name,
},
),
"UserInterruptAgent": (
UserInterruptAgent,
{
"name": settings.agent_settings.user_interrupt_name,
},
),
}
agents = []
for name, (agent_class, kwargs) in agents_to_start.items():
try:
logger.debug("Starting agent: %s", name)
agent_instance = agent_class(**kwargs)
await agent_instance.start()
agents.append(agent_instance)
logger.info("Agent '%s' started successfully.", name)
except Exception as e:
logger.error("Failed to start agent '%s': %s", name, e, exc_info=True)
raise
logger.info("Application startup complete.")
await endpoints_pub_socket.send_multipart([PROGRAM_STATUS, ProgramStatus.RUNNING.value])
yield
# --- APPLICATION SHUTDOWN ---
logger.info("%s is shutting down.", app.title)
await endpoints_pub_socket.send_multipart([PROGRAM_STATUS, ProgramStatus.STOPPING.value])
# Additional shutdown logic goes here
for agent in agents:
await agent.stop()
logger.info("Application shutdown complete.")
# if __name__ == "__main__":
app = FastAPI(title=settings.app_title, lifespan=lifespan)
# This middleware allows other origins to communicate with us
app.add_middleware(
CORSMiddleware, # https://developer.mozilla.org/en-US/docs/Web/HTTP/Guides/CORS
allow_origins=[settings.ui_url], # address of our UI application
allow_methods=["*"], # GET, POST, etc.
)
app.include_router(api_router, prefix="") # TODO: make prefix /api/v1
@app.get("/")
async def root():
return {"status": "ok"}