Compare commits
3 Commits
test/incre
...
feat/monit
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7c10c50336 | ||
|
|
6d03ba8a41 | ||
|
|
041fc4ab6e |
@@ -424,6 +424,16 @@ class AgentSpeakGenerator:
|
||||
)
|
||||
)
|
||||
|
||||
# Force phase transition fallback
|
||||
self._asp.plans.append(
|
||||
AstPlan(
|
||||
TriggerType.ADDED_GOAL,
|
||||
AstLiteral("force_transition_phase"),
|
||||
[],
|
||||
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
|
||||
)
|
||||
)
|
||||
|
||||
@singledispatchmethod
|
||||
def _astify(self, element: ProgramElement) -> AstExpression:
|
||||
raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.")
|
||||
|
||||
@@ -26,7 +26,7 @@ class UserInterruptAgent(BaseAgent):
|
||||
|
||||
- Send a prioritized message to the `RobotSpeechAgent`
|
||||
- Send a prioritized gesture to the `RobotGestureAgent`
|
||||
- Send a belief override to the `BDIProgramManager`in order to activate a
|
||||
- Send a belief override to the `BDI Core` in order to activate a
|
||||
trigger/conditional norm or complete a goal.
|
||||
|
||||
Prioritized actions clear the current RI queue before inserting the new item,
|
||||
@@ -75,10 +75,11 @@ class UserInterruptAgent(BaseAgent):
|
||||
These are the different types and contexts:
|
||||
- type: "speech", context: string that the robot has to say.
|
||||
- type: "gesture", context: single gesture name that the robot has to perform.
|
||||
- type: "override", context: belief_id that overrides the goal/trigger/conditional norm.
|
||||
- type: "override", context: id that belongs to the goal/trigger/conditional norm.
|
||||
- type: "override_unachieve", context: id that belongs to the conditional norm to unachieve.
|
||||
- type: "next_phase", context: None, indicates to the BDI Core to
|
||||
- type: "pause", context: boolean indicating whether to pause
|
||||
- type: "reset_phase", context: None, indicates to the BDI Core to
|
||||
- type: "reset_experiment", context: None, indicates to the BDI Core to
|
||||
"""
|
||||
while True:
|
||||
topic, body = await self.sub_socket.recv_multipart()
|
||||
@@ -93,19 +94,20 @@ class UserInterruptAgent(BaseAgent):
|
||||
|
||||
self.logger.debug("Received event type %s", event_type)
|
||||
|
||||
if event_type == "speech":
|
||||
match event_type:
|
||||
case "speech":
|
||||
await self._send_to_speech_agent(event_context)
|
||||
self.logger.info(
|
||||
"Forwarded button press (speech) with context '%s' to RobotSpeechAgent.",
|
||||
event_context,
|
||||
)
|
||||
elif event_type == "gesture":
|
||||
case "gesture":
|
||||
await self._send_to_gesture_agent(event_context)
|
||||
self.logger.info(
|
||||
"Forwarded button press (gesture) with context '%s' to RobotGestureAgent.",
|
||||
event_context,
|
||||
)
|
||||
elif event_type == "override":
|
||||
case "override":
|
||||
ui_id = str(event_context)
|
||||
if asl_trigger := self._trigger_map.get(ui_id):
|
||||
await self._send_to_bdi("force_trigger", asl_trigger)
|
||||
@@ -114,18 +116,18 @@ class UserInterruptAgent(BaseAgent):
|
||||
event_context,
|
||||
)
|
||||
elif asl_cond_norm := self._cond_norm_map.get(ui_id):
|
||||
await self._send_to_bdi("force_norm", asl_cond_norm)
|
||||
self.logger.info(
|
||||
"Forwarded button press (override) with context '%s' to BDIProgramManager.",
|
||||
event_context,
|
||||
)
|
||||
elif asl_goal := self._goal_map.get(ui_id):
|
||||
await self._send_to_bdi_belief(asl_goal)
|
||||
await self._send_to_bdi_belief(asl_cond_norm, "cond_norm")
|
||||
self.logger.info(
|
||||
"Forwarded button press (override) with context '%s' to BDI Core.",
|
||||
event_context,
|
||||
)
|
||||
|
||||
elif asl_goal := self._goal_map.get(ui_id):
|
||||
await self._send_to_bdi_belief(asl_goal, "goal")
|
||||
self.logger.info(
|
||||
"Forwarded button press (override) with context '%s' to BDI Core.",
|
||||
event_context,
|
||||
)
|
||||
# Send achieve_goal to program manager to update semantic belief extractor
|
||||
goal_achieve_msg = InternalMessage(
|
||||
to=settings.agent_settings.bdi_program_manager_name,
|
||||
thread="achieve_goal",
|
||||
@@ -135,8 +137,21 @@ class UserInterruptAgent(BaseAgent):
|
||||
await self.send(goal_achieve_msg)
|
||||
else:
|
||||
self.logger.warning("Could not determine which element to override.")
|
||||
case "override_unachieve":
|
||||
ui_id = str(event_context)
|
||||
if asl_cond_norm := self._cond_norm_map.get(ui_id):
|
||||
await self._send_to_bdi_belief(asl_cond_norm, "cond_norm", True)
|
||||
self.logger.info(
|
||||
"Forwarded button press (override_unachieve)"
|
||||
"with context '%s' to BDI Core.",
|
||||
event_context,
|
||||
)
|
||||
else:
|
||||
self.logger.warning(
|
||||
"Could not determine which conditional norm to unachieve."
|
||||
)
|
||||
|
||||
elif event_type == "pause":
|
||||
case "pause":
|
||||
self.logger.debug(
|
||||
"Received pause/resume button press with context '%s'.", event_context
|
||||
)
|
||||
@@ -146,9 +161,9 @@ class UserInterruptAgent(BaseAgent):
|
||||
else:
|
||||
self.logger.info("Sent resume command.")
|
||||
|
||||
elif event_type in ["next_phase", "reset_phase", "reset_experiment"]:
|
||||
case "next_phase" | "reset_phase":
|
||||
await self._send_experiment_control_to_bdi_core(event_type)
|
||||
else:
|
||||
case _:
|
||||
self.logger.warning(
|
||||
"Received button press with unknown type '%s' (context: '%s').",
|
||||
event_type,
|
||||
@@ -171,11 +186,9 @@ class UserInterruptAgent(BaseAgent):
|
||||
payload = {"type": "trigger_update", "id": ui_id, "achieved": True}
|
||||
await self._send_experiment_update(payload)
|
||||
self.logger.info(f"UI Update: Trigger {asl_slug} started (ID: {ui_id})")
|
||||
|
||||
case "trigger_end":
|
||||
asl_slug = msg.body
|
||||
ui_id = self._trigger_reverse_map.get(asl_slug)
|
||||
|
||||
if ui_id:
|
||||
payload = {"type": "trigger_update", "id": ui_id, "achieved": False}
|
||||
await self._send_experiment_update(payload)
|
||||
@@ -191,13 +204,14 @@ class UserInterruptAgent(BaseAgent):
|
||||
goal_name = msg.body
|
||||
ui_id = self._goal_reverse_map.get(goal_name)
|
||||
if ui_id:
|
||||
payload = {"type": "goal_update", "id": ui_id, "active": True}
|
||||
payload = {"type": "goal_update", "id": ui_id}
|
||||
await self._send_experiment_update(payload)
|
||||
self.logger.info(f"UI Update: Goal {goal_name} started (ID: {ui_id})")
|
||||
case "active_norms_update":
|
||||
norm_list = [s.strip("() '\",") for s in msg.body.split(",") if s.strip("() '\",")]
|
||||
|
||||
await self._broadcast_cond_norms(norm_list)
|
||||
active_norms_asl = [
|
||||
s.strip("() '\",") for s in msg.body.split(",") if s.strip("() '\",")
|
||||
]
|
||||
await self._broadcast_cond_norms(active_norms_asl)
|
||||
case _:
|
||||
self.logger.debug(f"Received internal message on unhandled thread: {msg.thread}")
|
||||
|
||||
@@ -207,15 +221,17 @@ class UserInterruptAgent(BaseAgent):
|
||||
:param active_slugs: A list of slugs (strings) currently active in the BDI core.
|
||||
"""
|
||||
updates = []
|
||||
|
||||
for asl_slug, ui_id in self._cond_norm_reverse_map.items():
|
||||
is_active = asl_slug in active_slugs
|
||||
updates.append({"id": ui_id, "name": asl_slug, "active": is_active})
|
||||
updates.append({"id": ui_id, "active": is_active})
|
||||
|
||||
payload = {"type": "cond_norms_state_update", "norms": updates}
|
||||
|
||||
await self._send_experiment_update(payload, should_log=False)
|
||||
# self.logger.debug(f"Broadcasted state for {len(updates)} conditional norms.")
|
||||
if self.pub_socket:
|
||||
topic = b"status"
|
||||
body = json.dumps(payload).encode("utf-8")
|
||||
await self.pub_socket.send_multipart([topic, body])
|
||||
# self.logger.info(f"UI Update: Active norms {updates}")
|
||||
|
||||
def _create_mapping(self, program_json: str):
|
||||
"""
|
||||
@@ -308,12 +324,20 @@ class UserInterruptAgent(BaseAgent):
|
||||
await self.send(msg)
|
||||
self.logger.info(f"Directly forced {thread} in BDI: {body}")
|
||||
|
||||
async def _send_to_bdi_belief(self, asl_goal: str):
|
||||
async def _send_to_bdi_belief(self, asl: str, asl_type: str, unachieve: bool = False):
|
||||
"""Send belief to BDI Core"""
|
||||
belief_name = f"achieved_{asl_goal}"
|
||||
if asl_type == "goal":
|
||||
belief_name = f"achieved_{asl}"
|
||||
elif asl_type == "cond_norm":
|
||||
belief_name = f"force_{asl}"
|
||||
else:
|
||||
self.logger.warning("Tried to send belief with unknown type")
|
||||
belief = Belief(name=belief_name, arguments=None)
|
||||
self.logger.debug(f"Sending belief to BDI Core: {belief_name}")
|
||||
belief_message = BeliefMessage(create=[belief])
|
||||
# Conditional norms are unachieved by removing the belief
|
||||
belief_message = (
|
||||
BeliefMessage(delete=[belief]) if unachieve else BeliefMessage(create=[belief])
|
||||
)
|
||||
msg = InternalMessage(
|
||||
to=settings.agent_settings.bdi_core_name,
|
||||
thread="beliefs",
|
||||
@@ -334,8 +358,6 @@ class UserInterruptAgent(BaseAgent):
|
||||
thread = "force_next_phase"
|
||||
case "reset_phase":
|
||||
thread = "reset_current_phase"
|
||||
case "reset_experiment":
|
||||
thread = "reset_experiment"
|
||||
case _:
|
||||
self.logger.warning(
|
||||
"Received unknown experiment control type '%s' to send to BDI Core.",
|
||||
|
||||
@@ -52,11 +52,11 @@ async def experiment_stream(request: Request):
|
||||
while True:
|
||||
# Check if client closed the tab
|
||||
if await request.is_disconnected():
|
||||
logger.info("Client disconnected from experiment stream.")
|
||||
logger.error("Client disconnected from experiment stream.")
|
||||
break
|
||||
|
||||
try:
|
||||
parts = await asyncio.wait_for(socket.recv_multipart(), timeout=1.0)
|
||||
parts = await asyncio.wait_for(socket.recv_multipart(), timeout=10.0)
|
||||
_, message = parts
|
||||
yield f"data: {message.decode().strip()}\n\n"
|
||||
except TimeoutError:
|
||||
@@ -65,3 +65,30 @@ async def experiment_stream(request: Request):
|
||||
socket.close()
|
||||
|
||||
return StreamingResponse(gen(), media_type="text/event-stream")
|
||||
|
||||
|
||||
@router.get("/status_stream")
|
||||
async def status_stream(request: Request):
|
||||
context = Context.instance()
|
||||
socket = context.socket(zmq.SUB)
|
||||
socket.connect(settings.zmq_settings.internal_sub_address)
|
||||
|
||||
socket.subscribe(b"status")
|
||||
|
||||
async def gen():
|
||||
try:
|
||||
while True:
|
||||
if await request.is_disconnected():
|
||||
break
|
||||
try:
|
||||
# Shorter timeout since this is frequent
|
||||
parts = await asyncio.wait_for(socket.recv_multipart(), timeout=0.5)
|
||||
_, message = parts
|
||||
yield f"data: {message.decode().strip()}\n\n"
|
||||
except TimeoutError:
|
||||
yield ": ping\n\n" # Keep the connection alive
|
||||
continue
|
||||
finally:
|
||||
socket.close()
|
||||
|
||||
return StreamingResponse(gen(), media_type="text/event-stream")
|
||||
|
||||
Reference in New Issue
Block a user