feat: added a functionality for monitoring page
ref: N25B-400
This commit is contained in:
@@ -5,8 +5,10 @@ import zmq
|
||||
from zmq.asyncio import Context
|
||||
|
||||
from control_backend.agents import BaseAgent
|
||||
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
|
||||
from control_backend.core.agent_system import InternalMessage
|
||||
from control_backend.core.config import settings
|
||||
from control_backend.schemas.program import Program
|
||||
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand
|
||||
|
||||
|
||||
@@ -32,6 +34,16 @@ class UserInterruptAgent(BaseAgent):
|
||||
super().__init__(**kwargs)
|
||||
self.sub_socket = None
|
||||
self.pub_socket = None
|
||||
self._trigger_map = {}
|
||||
self._trigger_reverse_map = {}
|
||||
|
||||
self._goal_map = {}
|
||||
self._goal_reverse_map = {}
|
||||
|
||||
self._cond_norm_map = {}
|
||||
self._cond_norm_reverse_map = {}
|
||||
|
||||
self._belief_condition_map = {}
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
@@ -87,11 +99,19 @@ class UserInterruptAgent(BaseAgent):
|
||||
event_context,
|
||||
)
|
||||
elif event_type == "override":
|
||||
await self._send_to_program_manager(event_context)
|
||||
self.logger.info(
|
||||
"Forwarded button press (override) with context '%s' to BDIProgramManager.",
|
||||
event_context,
|
||||
)
|
||||
ui_id = str(event_context)
|
||||
if asl_trigger := self._trigger_map.get(ui_id):
|
||||
await self._send_to_bdi("force_trigger", asl_trigger)
|
||||
self.logger.info(
|
||||
"Forwarded button press (override) with context '%s' to BDI Core.",
|
||||
event_context,
|
||||
)
|
||||
else:
|
||||
await self._send_to_program_manager(event_context)
|
||||
self.logger.info(
|
||||
"Forwarded button press (override) with context '%s' to BDIProgramManager.",
|
||||
event_context,
|
||||
)
|
||||
else:
|
||||
self.logger.warning(
|
||||
"Received button press with unknown type '%s' (context: '%s').",
|
||||
@@ -104,6 +124,26 @@ class UserInterruptAgent(BaseAgent):
|
||||
Handle commands received from other internal Python agents.
|
||||
"""
|
||||
match msg.thread:
|
||||
case "new_program":
|
||||
self._create_mapping(msg.body)
|
||||
case "trigger_start":
|
||||
# msg.body is the sluggified trigger
|
||||
asl_slug = msg.body
|
||||
ui_id = self._trigger_reverse_map.get(asl_slug)
|
||||
|
||||
if ui_id:
|
||||
payload = {"type": "trigger_update", "id": ui_id, "achieved": True}
|
||||
await self._send_experiment_update(payload)
|
||||
self.logger.info(f"UI Update: Trigger {asl_slug} started (ID: {ui_id})")
|
||||
|
||||
case "trigger_end":
|
||||
asl_slug = msg.body
|
||||
ui_id = self._trigger_reverse_map.get(asl_slug)
|
||||
|
||||
if ui_id:
|
||||
payload = {"type": "trigger_update", "id": ui_id, "achieved": False}
|
||||
await self._send_experiment_update(payload)
|
||||
self.logger.info(f"UI Update: Trigger {asl_slug} ended (ID: {ui_id})")
|
||||
case "transition_phase":
|
||||
new_phase_id = msg.body
|
||||
self.logger.info(f"Phase transition detected: {new_phase_id}")
|
||||
@@ -111,6 +151,10 @@ class UserInterruptAgent(BaseAgent):
|
||||
payload = {"type": "phase_update", "phase_id": new_phase_id}
|
||||
|
||||
await self._send_experiment_update(payload)
|
||||
case "active_norms_update":
|
||||
asl_slugs = [s.strip() for s in msg.body.split(";")]
|
||||
|
||||
await self._broadcast_cond_norms(asl_slugs)
|
||||
|
||||
case _:
|
||||
self.logger.debug(f"Received internal message on unhandled thread: {msg.thread}")
|
||||
@@ -132,6 +176,54 @@ class UserInterruptAgent(BaseAgent):
|
||||
|
||||
await asyncio.sleep(2)
|
||||
|
||||
async def _broadcast_cond_norms(self, active_slugs: list[str]):
|
||||
"""
|
||||
Sends the current state of all conditional norms to the UI.
|
||||
:param active_slugs: A list of slugs (strings) currently active in the BDI core.
|
||||
"""
|
||||
updates = []
|
||||
|
||||
for asl_slug, ui_id in self._cond_norm_reverse_map.items():
|
||||
is_active = asl_slug in active_slugs
|
||||
updates.append({"id": ui_id, "name": asl_slug, "active": is_active})
|
||||
|
||||
payload = {"type": "cond_norms_state_update", "norms": updates}
|
||||
|
||||
await self._send_experiment_update(payload)
|
||||
self.logger.debug(f"Broadcasted state for {len(updates)} conditional norms.")
|
||||
|
||||
def _create_mapping(self, program_json: str):
|
||||
try:
|
||||
program = Program.model_validate_json(program_json)
|
||||
self._trigger_map = {}
|
||||
self._trigger_reverse_map = {}
|
||||
self._goal_map = {}
|
||||
self._cond_norm_map = {}
|
||||
self._cond_norm_reverse_map = {}
|
||||
|
||||
for phase in program.phases:
|
||||
for trigger in phase.triggers:
|
||||
slug = AgentSpeakGenerator.slugify(trigger)
|
||||
self._trigger_map[str(trigger.id)] = slug
|
||||
self._trigger_reverse_map[slug] = str(trigger.id)
|
||||
|
||||
for goal in phase.goals:
|
||||
self._goal_map[str(goal.id)] = AgentSpeakGenerator.slugify(goal)
|
||||
|
||||
for norm in phase.conditional_norms:
|
||||
if norm.condition:
|
||||
asl_slug = AgentSpeakGenerator.slugify(norm)
|
||||
belief_id = str(norm.condition)
|
||||
|
||||
self._cond_norm_map[belief_id] = asl_slug
|
||||
self._cond_norm_reverse_map[asl_slug] = belief_id
|
||||
|
||||
self.logger.info(
|
||||
f"Mapped {len(self._trigger_map)} triggers and {len(self._goal_map)} goals."
|
||||
)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Mapping failed: {e}")
|
||||
|
||||
async def _send_experiment_update(self, data):
|
||||
"""
|
||||
Sends an update to the 'experiment' topic.
|
||||
@@ -194,3 +286,11 @@ class UserInterruptAgent(BaseAgent):
|
||||
"Sent button_override belief with id '%s' to Program manager.",
|
||||
belief_id,
|
||||
)
|
||||
|
||||
async def _send_to_bdi(self, thread: str, body: str):
|
||||
"""Send slug of trigger to BDI"""
|
||||
msg = InternalMessage(
|
||||
to=settings.agent_settings.bdi_core_name, sender=self.name, thread=thread, body=body
|
||||
)
|
||||
await self.send(msg)
|
||||
self.logger.info(f"Directly forced {thread} in BDI: {body}")
|
||||
|
||||
Reference in New Issue
Block a user