425 lines
18 KiB
Python
425 lines
18 KiB
Python
"""
|
|
This program has been developed by students from the bachelor Computer Science at Utrecht
|
|
University within the Software Project course.
|
|
© Copyright Utrecht University (Department of Information and Computing Sciences)
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
|
|
import zmq
|
|
from zmq.asyncio import Context
|
|
|
|
from control_backend.agents import BaseAgent
|
|
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
|
|
from control_backend.core.agent_system import InternalMessage
|
|
from control_backend.core.config import settings
|
|
from control_backend.schemas.belief_message import Belief, BeliefMessage
|
|
from control_backend.schemas.program import ConditionalNorm, Goal, Program
|
|
from control_backend.schemas.ri_message import (
|
|
GestureCommand,
|
|
RIEndpoint,
|
|
SpeechCommand,
|
|
)
|
|
|
|
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
|
|
|
|
|
|
class UserInterruptAgent(BaseAgent):
|
|
"""
|
|
User Interrupt Agent.
|
|
|
|
This agent receives button_pressed events from the external HTTP API
|
|
(via ZMQ) and uses the associated context to trigger one of the following actions:
|
|
|
|
- Send a prioritized message to the `RobotSpeechAgent`
|
|
- Send a prioritized gesture to the `RobotGestureAgent`
|
|
- Send a belief override to the `BDI Core` in order to activate a
|
|
trigger/conditional norm or complete a goal.
|
|
|
|
Prioritized actions clear the current RI queue before inserting the new item,
|
|
ensuring they are executed immediately after Pepper's current action has been fulfilled.
|
|
|
|
:ivar sub_socket: The ZMQ SUB socket used to receive user interrupts.
|
|
"""
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.sub_socket = None
|
|
self.pub_socket = None
|
|
self._trigger_map = {}
|
|
self._trigger_reverse_map = {}
|
|
|
|
self._goal_map = {} # id -> sluggified goal
|
|
self._goal_reverse_map = {} # sluggified goal -> id
|
|
|
|
self._cond_norm_map = {} # id -> sluggified cond norm
|
|
self._cond_norm_reverse_map = {} # sluggified cond norm -> id
|
|
|
|
async def setup(self):
|
|
"""
|
|
Initialize the agent by setting up ZMQ sockets for receiving button events and
|
|
publishing updates.
|
|
"""
|
|
context = Context.instance()
|
|
|
|
self.sub_socket = context.socket(zmq.SUB)
|
|
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
|
|
self.sub_socket.subscribe("button_pressed")
|
|
|
|
self.pub_socket = context.socket(zmq.PUB)
|
|
self.pub_socket.connect(settings.zmq_settings.internal_pub_address)
|
|
|
|
self.add_behavior(self._receive_button_event())
|
|
|
|
async def _receive_button_event(self):
|
|
"""
|
|
Main loop to receive and process button press events from the UI.
|
|
|
|
Handles different event types:
|
|
- `speech`: Triggers immediate robot speech.
|
|
- `gesture`: Triggers an immediate robot gesture.
|
|
- `override`: Forces a belief, trigger, or goal completion in the BDI core.
|
|
- `override_unachieve`: Removes a belief from the BDI core.
|
|
- `pause`: Toggles the system's pause state.
|
|
- `next_phase` / `reset_phase`: Controls experiment flow.
|
|
"""
|
|
while True:
|
|
topic, body = await self.sub_socket.recv_multipart()
|
|
|
|
try:
|
|
event_data = json.loads(body)
|
|
event_type = event_data.get("type") # e.g., "speech", "gesture"
|
|
event_context = event_data.get("context") # e.g., "Hello, I am Pepper!"
|
|
except json.JSONDecodeError:
|
|
self.logger.error("Received invalid JSON payload on topic %s", topic)
|
|
continue
|
|
|
|
self.logger.debug("Received event type %s", event_type)
|
|
|
|
match event_type:
|
|
case "speech":
|
|
await self._send_to_speech_agent(event_context)
|
|
self.logger.info(
|
|
"Forwarded button press (speech) with context '%s' to RobotSpeechAgent.",
|
|
event_context,
|
|
)
|
|
case "gesture":
|
|
await self._send_to_gesture_agent(event_context)
|
|
self.logger.info(
|
|
"Forwarded button press (gesture) with context '%s' to RobotGestureAgent.",
|
|
event_context,
|
|
)
|
|
case "override":
|
|
ui_id = str(event_context)
|
|
if asl_trigger := self._trigger_map.get(ui_id):
|
|
await self._send_to_bdi("force_trigger", asl_trigger)
|
|
self.logger.info(
|
|
"Forwarded button press (override) with context '%s' to BDI Core.",
|
|
event_context,
|
|
)
|
|
elif asl_cond_norm := self._cond_norm_map.get(ui_id):
|
|
await self._send_to_bdi_belief(asl_cond_norm, "cond_norm")
|
|
self.logger.info(
|
|
"Forwarded button press (override) with context '%s' to BDI Core.",
|
|
event_context,
|
|
)
|
|
elif asl_goal := self._goal_map.get(ui_id):
|
|
await self._send_to_bdi_belief(asl_goal, "goal")
|
|
self.logger.info(
|
|
"Forwarded button press (override) with context '%s' to BDI Core.",
|
|
event_context,
|
|
)
|
|
# Send achieve_goal to program manager to update semantic belief extractor
|
|
goal_achieve_msg = InternalMessage(
|
|
to=settings.agent_settings.bdi_program_manager_name,
|
|
thread="achieve_goal",
|
|
body=ui_id,
|
|
)
|
|
|
|
await self.send(goal_achieve_msg)
|
|
else:
|
|
self.logger.warning("Could not determine which element to override.")
|
|
case "override_unachieve":
|
|
ui_id = str(event_context)
|
|
if asl_cond_norm := self._cond_norm_map.get(ui_id):
|
|
await self._send_to_bdi_belief(asl_cond_norm, "cond_norm", True)
|
|
self.logger.info(
|
|
"Forwarded button press (override_unachieve)"
|
|
"with context '%s' to BDI Core.",
|
|
event_context,
|
|
)
|
|
else:
|
|
self.logger.warning(
|
|
"Could not determine which conditional norm to unachieve."
|
|
)
|
|
|
|
case "pause":
|
|
self.logger.debug(
|
|
"Received pause/resume button press with context '%s'.", event_context
|
|
)
|
|
await self._send_pause_command(event_context)
|
|
if event_context:
|
|
self.logger.info("Sent pause command.")
|
|
else:
|
|
self.logger.info("Sent resume command.")
|
|
|
|
case "next_phase" | "reset_phase":
|
|
await self._send_experiment_control_to_bdi_core(event_type)
|
|
case _:
|
|
self.logger.warning(
|
|
"Received button press with unknown type '%s' (context: '%s').",
|
|
event_type,
|
|
event_context,
|
|
)
|
|
|
|
async def handle_message(self, msg: InternalMessage):
|
|
"""
|
|
Handles internal messages from other agents, such as program updates or trigger
|
|
notifications.
|
|
|
|
:param msg: The incoming :class:`~control_backend.core.agent_system.InternalMessage`.
|
|
"""
|
|
match msg.thread:
|
|
case "new_program":
|
|
self._create_mapping(msg.body)
|
|
case "trigger_start":
|
|
# msg.body is the sluggified trigger
|
|
asl_slug = msg.body
|
|
ui_id = self._trigger_reverse_map.get(asl_slug)
|
|
|
|
if ui_id:
|
|
payload = {"type": "trigger_update", "id": ui_id, "achieved": True}
|
|
await self._send_experiment_update(payload)
|
|
self.logger.info(f"UI Update: Trigger {asl_slug} started (ID: {ui_id})")
|
|
case "trigger_end":
|
|
asl_slug = msg.body
|
|
ui_id = self._trigger_reverse_map.get(asl_slug)
|
|
if ui_id:
|
|
payload = {"type": "trigger_update", "id": ui_id, "achieved": False}
|
|
await self._send_experiment_update(payload)
|
|
self.logger.info(f"UI Update: Trigger {asl_slug} ended (ID: {ui_id})")
|
|
case "transition_phase":
|
|
new_phase_id = msg.body
|
|
self.logger.info(f"Phase transition detected: {new_phase_id}")
|
|
|
|
payload = {"type": "phase_update", "id": new_phase_id}
|
|
|
|
await self._send_experiment_update(payload)
|
|
case "goal_start":
|
|
goal_name = msg.body
|
|
ui_id = self._goal_reverse_map.get(goal_name)
|
|
if ui_id:
|
|
payload = {"type": "goal_update", "id": ui_id, "active": True}
|
|
await self._send_experiment_update(payload)
|
|
self.logger.info(f"UI Update: Goal {goal_name} started (ID: {ui_id})")
|
|
case "active_norms_update":
|
|
active_norms_asl = [
|
|
s.strip("() '\",") for s in msg.body.split(",") if s.strip("() '\",")
|
|
]
|
|
await self._broadcast_cond_norms(active_norms_asl)
|
|
case _:
|
|
self.logger.debug(f"Received internal message on unhandled thread: {msg.thread}")
|
|
|
|
async def _broadcast_cond_norms(self, active_slugs: list[str]):
|
|
"""
|
|
Broadcasts the current activation state of all conditional norms to the UI.
|
|
|
|
:param active_slugs: A list of sluggified norm names currently active in the BDI core.
|
|
"""
|
|
updates = []
|
|
for asl_slug, ui_id in self._cond_norm_reverse_map.items():
|
|
is_active = asl_slug in active_slugs
|
|
updates.append({"id": ui_id, "active": is_active})
|
|
|
|
payload = {"type": "cond_norms_state_update", "norms": updates}
|
|
|
|
if self.pub_socket:
|
|
topic = b"status"
|
|
body = json.dumps(payload).encode("utf-8")
|
|
await self.pub_socket.send_multipart([topic, body])
|
|
# self.logger.info(f"UI Update: Active norms {updates}")
|
|
|
|
def _create_mapping(self, program_json: str):
|
|
"""
|
|
Creates a bidirectional mapping between UI identifiers and AgentSpeak slugs.
|
|
|
|
:param program_json: The JSON representation of the behavioral program.
|
|
"""
|
|
try:
|
|
program = Program.model_validate_json(program_json)
|
|
self._trigger_map = {}
|
|
self._trigger_reverse_map = {}
|
|
self._goal_map = {}
|
|
self._cond_norm_map = {}
|
|
self._cond_norm_reverse_map = {}
|
|
|
|
def _register_goal(goal: Goal):
|
|
"""Recursively register goals and their subgoals."""
|
|
slug = AgentSpeakGenerator.slugify(goal)
|
|
self._goal_map[str(goal.id)] = slug
|
|
self._goal_reverse_map[slug] = str(goal.id)
|
|
|
|
for step in goal.plan.steps:
|
|
if isinstance(step, Goal):
|
|
_register_goal(step)
|
|
|
|
for phase in program.phases:
|
|
for trigger in phase.triggers:
|
|
slug = AgentSpeakGenerator.slugify(trigger)
|
|
self._trigger_map[str(trigger.id)] = slug
|
|
self._trigger_reverse_map[slug] = str(trigger.id)
|
|
|
|
for goal in phase.goals:
|
|
_register_goal(goal)
|
|
|
|
for goal, id in self._goal_reverse_map.items():
|
|
self.logger.debug(f"Goal mapping: UI ID {goal} -> {id}")
|
|
|
|
for norm in phase.norms:
|
|
if isinstance(norm, ConditionalNorm):
|
|
asl_slug = AgentSpeakGenerator.slugify(norm)
|
|
|
|
norm_id = str(norm.id)
|
|
|
|
self._cond_norm_map[norm_id] = asl_slug
|
|
self._cond_norm_reverse_map[norm.norm] = norm_id
|
|
self.logger.debug("Added conditional norm %s", asl_slug)
|
|
|
|
self.logger.info(
|
|
f"Mapped {len(self._trigger_map)} triggers and {len(self._goal_map)} goals "
|
|
f"and {len(self._cond_norm_map)} conditional norms for UserInterruptAgent."
|
|
)
|
|
except Exception as e:
|
|
self.logger.error(f"Mapping failed: {e}")
|
|
|
|
async def _send_experiment_update(self, data, should_log: bool = True):
|
|
"""
|
|
Publishes an experiment state update to the internal ZMQ bus for the UI.
|
|
|
|
:param data: The update payload.
|
|
:param should_log: Whether to log the update.
|
|
"""
|
|
if self.pub_socket:
|
|
topic = b"experiment"
|
|
body = json.dumps(data).encode("utf-8")
|
|
await self.pub_socket.send_multipart([topic, body])
|
|
if should_log:
|
|
self.logger.debug(f"Sent experiment update: {data}")
|
|
|
|
async def _send_to_speech_agent(self, text_to_say: str):
|
|
"""
|
|
method to send prioritized speech command to RobotSpeechAgent.
|
|
|
|
:param text_to_say: The string that the robot has to say.
|
|
"""
|
|
experiment_logger.chat(text_to_say, extra={"role": "assistant"})
|
|
cmd = SpeechCommand(data=text_to_say, is_priority=True)
|
|
out_msg = InternalMessage(
|
|
to=settings.agent_settings.robot_speech_name,
|
|
sender=self.name,
|
|
body=cmd.model_dump_json(),
|
|
)
|
|
await self.send(out_msg)
|
|
|
|
async def _send_to_gesture_agent(self, single_gesture_name: str):
|
|
"""
|
|
method to send prioritized gesture command to RobotGestureAgent.
|
|
|
|
:param single_gesture_name: The gesture tag that the robot has to perform.
|
|
"""
|
|
# the endpoint is set to always be GESTURE_SINGLE for user interrupts
|
|
cmd = GestureCommand(
|
|
endpoint=RIEndpoint.GESTURE_SINGLE, data=single_gesture_name, is_priority=True
|
|
)
|
|
out_msg = InternalMessage(
|
|
to=settings.agent_settings.robot_gesture_name,
|
|
sender=self.name,
|
|
body=cmd.model_dump_json(),
|
|
)
|
|
await self.send(out_msg)
|
|
|
|
async def _send_to_bdi(self, thread: str, body: str):
|
|
"""Send slug of trigger to BDI"""
|
|
msg = InternalMessage(to=settings.agent_settings.bdi_core_name, thread=thread, body=body)
|
|
await self.send(msg)
|
|
self.logger.info(f"Directly forced {thread} in BDI: {body}")
|
|
|
|
async def _send_to_bdi_belief(self, asl: str, asl_type: str, unachieve: bool = False):
|
|
"""Send belief to BDI Core"""
|
|
if asl_type == "goal":
|
|
belief_name = f"achieved_{asl}"
|
|
elif asl_type == "cond_norm":
|
|
belief_name = f"force_{asl}"
|
|
else:
|
|
self.logger.warning("Tried to send belief with unknown type")
|
|
return
|
|
belief = Belief(name=belief_name, arguments=None)
|
|
self.logger.debug(f"Sending belief to BDI Core: {belief_name}")
|
|
# Conditional norms are unachieved by removing the belief
|
|
belief_message = (
|
|
BeliefMessage(delete=[belief]) if unachieve else BeliefMessage(create=[belief])
|
|
)
|
|
msg = InternalMessage(
|
|
to=settings.agent_settings.bdi_core_name,
|
|
thread="beliefs",
|
|
body=belief_message.model_dump_json(),
|
|
)
|
|
await self.send(msg)
|
|
|
|
async def _send_experiment_control_to_bdi_core(self, type):
|
|
"""
|
|
method to send experiment control buttons to bdi core.
|
|
|
|
:param type: the type of control button we should send to the bdi core.
|
|
"""
|
|
# Switch which thread we should send to bdi core
|
|
thread = ""
|
|
match type:
|
|
case "next_phase":
|
|
thread = "force_next_phase"
|
|
case "reset_phase":
|
|
thread = "reset_current_phase"
|
|
case "reset_experiment":
|
|
thread = "reset_experiment"
|
|
case _:
|
|
self.logger.warning(
|
|
"Received unknown experiment control type '%s' to send to BDI Core.",
|
|
type,
|
|
)
|
|
|
|
out_msg = InternalMessage(
|
|
to=settings.agent_settings.bdi_core_name,
|
|
sender=self.name,
|
|
thread=thread,
|
|
body="",
|
|
)
|
|
self.logger.debug("Sending experiment control '%s' to BDI Core.", thread)
|
|
await self.send(out_msg)
|
|
|
|
async def _send_pause_command(self, pause: str):
|
|
"""
|
|
Send a pause command to the other internal agents; for now just VAD and VED agent.
|
|
"""
|
|
if pause == "true":
|
|
# Send pause to VAD and VED agent
|
|
vad_message = InternalMessage(
|
|
to=[settings.agent_settings.vad_name,
|
|
settings.agent_settings.visual_emotion_recognition_name],
|
|
sender=self.name,
|
|
body="PAUSE",
|
|
)
|
|
await self.send(vad_message)
|
|
# Voice Activity Detection and Visual Emotion Recognition agents
|
|
self.logger.info("Sent pause command to VAD and VED agents.")
|
|
else:
|
|
# Send resume to VAD and VED agents
|
|
vad_message = InternalMessage(
|
|
to=[settings.agent_settings.vad_name,
|
|
settings.agent_settings.visual_emotion_recognition_name],
|
|
sender=self.name,
|
|
body="RESUME",
|
|
)
|
|
await self.send(vad_message)
|
|
# Voice Activity Detection and Visual Emotion Recognition agents
|
|
self.logger.info("Sent resume command to VAD and VED agents.") |