feat: send logs to UI
Added SSE endpoint `/logs/stream` for the UI to listen to logs. ref: N25B-242
This commit is contained in:
@@ -15,7 +15,7 @@ formatters:
|
||||
# User-facing UI (structured JSON)
|
||||
json_experiment:
|
||||
(): "pythonjsonlogger.jsonlogger.JsonFormatter"
|
||||
format: "{asctime} {name} {levelname} {message}"
|
||||
format: "{name} {levelname} {levelno} {message} {created} {relativeCreated}"
|
||||
style: "{"
|
||||
|
||||
handlers:
|
||||
@@ -28,7 +28,6 @@ handlers:
|
||||
class: zmq.log.handlers.PUBHandler
|
||||
level: DEBUG
|
||||
formatter: json_experiment
|
||||
interface_or_socket: "PLACEHOLDER"
|
||||
|
||||
# Level of external libraries
|
||||
root:
|
||||
@@ -37,5 +36,5 @@ root:
|
||||
|
||||
loggers:
|
||||
control_backend:
|
||||
level: INFO
|
||||
level: DEBUG
|
||||
handlers: [ui]
|
||||
|
||||
33
src/control_backend/api/v1/endpoints/logs.py
Normal file
33
src/control_backend/api/v1/endpoints/logs.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import logging
|
||||
|
||||
import zmq
|
||||
from fastapi import APIRouter
|
||||
from fastapi.responses import StreamingResponse
|
||||
from pyjabber.server_parameters import json
|
||||
from zmq.asyncio import Context
|
||||
|
||||
from control_backend.core.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("/logs/stream")
|
||||
async def log_stream():
|
||||
context = Context.instance()
|
||||
socket = context.socket(zmq.SUB)
|
||||
|
||||
for level in logging.getLevelNamesMapping():
|
||||
socket.subscribe(topic=level)
|
||||
|
||||
socket.connect(settings.zmq_settings.internal_sub_address)
|
||||
|
||||
async def gen():
|
||||
while True:
|
||||
_, message = await socket.recv_multipart()
|
||||
message = message.decode().strip()
|
||||
json_data = json.dumps(message)
|
||||
yield f"data: {json_data}\n\n"
|
||||
|
||||
return StreamingResponse(gen(), media_type="text/event-stream")
|
||||
@@ -1,6 +1,6 @@
|
||||
from fastapi.routing import APIRouter
|
||||
|
||||
from control_backend.api.v1.endpoints import command, message, sse
|
||||
from control_backend.api.v1.endpoints import command, logs, message, sse
|
||||
|
||||
api_router = APIRouter()
|
||||
|
||||
@@ -9,3 +9,5 @@ api_router.include_router(message.router, tags=["Messages"])
|
||||
api_router.include_router(sse.router, tags=["SSE"])
|
||||
|
||||
api_router.include_router(command.router, tags=["Commands"])
|
||||
|
||||
api_router.include_router(logs.router, tags=["Logs"])
|
||||
|
||||
@@ -3,6 +3,9 @@ import logging.config
|
||||
import os
|
||||
|
||||
import yaml
|
||||
import zmq
|
||||
|
||||
from control_backend.core.config import settings
|
||||
|
||||
|
||||
def add_logging_level(level_name: str, level_num: int, method_name: str | None = None) -> None:
|
||||
@@ -38,13 +41,19 @@ def setup_logging(path: str = ".logging_config.yaml") -> None:
|
||||
with open(path) as f:
|
||||
try:
|
||||
config = yaml.safe_load(f.read())
|
||||
|
||||
if "custom_levels" in config:
|
||||
for level_name, level_num in config["custom_levels"].items():
|
||||
add_logging_level(level_name, level_num)
|
||||
|
||||
logging.config.dictConfig(config)
|
||||
except (AttributeError, yaml.YAMLError) as e:
|
||||
logging.warning(f"Could not load logging configuration: {e}")
|
||||
config = {}
|
||||
|
||||
if "custom_levels" in config:
|
||||
for level_name, level_num in config["custom_levels"].items():
|
||||
add_logging_level(level_name, level_num)
|
||||
|
||||
if config.get("handlers") is not None and config.get("handlers").get("ui"):
|
||||
pub_socket = zmq.Context.instance().socket(zmq.PUB)
|
||||
pub_socket.connect(settings.zmq_settings.internal_pub_address)
|
||||
config["handlers"]["ui"]["interface_or_socket"] = pub_socket
|
||||
logging.config.dictConfig(config)
|
||||
|
||||
else:
|
||||
logging.warning("Logging config file not found. Using default logging configuration.")
|
||||
|
||||
@@ -48,6 +48,7 @@ async def lifespan(app: FastAPI):
|
||||
# --- APPLICATION STARTUP ---
|
||||
setup_logging()
|
||||
logger.info("%s is starting up.", app.title)
|
||||
logger.warning("testing extra", extra={"extra1": "one", "extra2": "two"})
|
||||
|
||||
# Initiate sockets
|
||||
proxy_thread = threading.Thread(target=setup_sockets)
|
||||
@@ -67,8 +68,8 @@ async def lifespan(app: FastAPI):
|
||||
RICommunicationAgent,
|
||||
{
|
||||
"name": settings.agent_settings.ri_communication_agent_name,
|
||||
"jid": f"{settings.agent_settings.ri_communication_agent_name}\
|
||||
@{settings.agent_settings.host}",
|
||||
"jid": f"{settings.agent_settings.ri_communication_agent_name}"
|
||||
f"@{settings.agent_settings.host}",
|
||||
"password": settings.agent_settings.ri_communication_agent_name,
|
||||
"address": "tcp://*:5555",
|
||||
"bind": True,
|
||||
@@ -86,8 +87,8 @@ async def lifespan(app: FastAPI):
|
||||
BDICoreAgent,
|
||||
{
|
||||
"name": settings.agent_settings.bdi_core_agent_name,
|
||||
"jid": f"{settings.agent_settings.bdi_core_agent_name}@\
|
||||
{settings.agent_settings.host}",
|
||||
"jid": f"{settings.agent_settings.bdi_core_agent_name}@"
|
||||
f"{settings.agent_settings.host}",
|
||||
"password": settings.agent_settings.bdi_core_agent_name,
|
||||
"asl": "src/control_backend/agents/bdi/rules.asl",
|
||||
},
|
||||
@@ -96,8 +97,8 @@ async def lifespan(app: FastAPI):
|
||||
BeliefCollectorAgent,
|
||||
{
|
||||
"name": settings.agent_settings.belief_collector_agent_name,
|
||||
"jid": f"{settings.agent_settings.belief_collector_agent_name}@\
|
||||
{settings.agent_settings.host}",
|
||||
"jid": f"{settings.agent_settings.belief_collector_agent_name}@"
|
||||
f"{settings.agent_settings.host}",
|
||||
"password": settings.agent_settings.belief_collector_agent_name,
|
||||
},
|
||||
),
|
||||
@@ -105,8 +106,8 @@ async def lifespan(app: FastAPI):
|
||||
TBeliefExtractorAgent,
|
||||
{
|
||||
"name": settings.agent_settings.text_belief_extractor_agent_name,
|
||||
"jid": f"{settings.agent_settings.text_belief_extractor_agent_name}@\
|
||||
{settings.agent_settings.host}",
|
||||
"jid": f"{settings.agent_settings.text_belief_extractor_agent_name}@"
|
||||
f"{settings.agent_settings.host}",
|
||||
"password": settings.agent_settings.text_belief_extractor_agent_name,
|
||||
},
|
||||
),
|
||||
|
||||
Reference in New Issue
Block a user