Files
pepperplus-cb/src/control_backend/agents/bdi/bdi_program_manager.py
Twirre Meulenbelt 8cc177041a feat: add a second phase in test_program
ref: N25B-376
2025-12-16 15:12:22 +01:00

572 lines
19 KiB
Python

import zmq
from pydantic import ValidationError
from slugify import slugify
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.core.config import settings
from control_backend.schemas.program import (
Action,
BasicBelief,
BasicNorm,
Belief,
ConditionalNorm,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Phase,
Plan,
Program,
ProgramElement,
SemanticBelief,
SpeechAction,
Trigger,
)
test_program = Program(
phases=[
Phase(
norms=[
BasicNorm(norm="Talk like a pirate"),
ConditionalNorm(
condition=InferredBelief(
left=KeywordBelief(keyword="Arr"),
right=SemanticBelief(description="testing", name="semantic belief"),
operator=LogicalOperator.OR,
name="Talking to a pirate",
),
norm="Use nautical terms",
),
ConditionalNorm(
condition=SemanticBelief(
description="We are talking to a child", name="talking to child"
),
norm="Do not use cuss words",
),
],
triggers=[
Trigger(
condition=InferredBelief(
left=KeywordBelief(keyword="key"),
right=InferredBelief(
left=KeywordBelief(keyword="key2"),
right=SemanticBelief(
description="Decode this", name="semantic belief 2"
),
operator=LogicalOperator.OR,
name="test trigger inferred inner",
),
operator=LogicalOperator.OR,
name="test trigger inferred outer",
),
plan=Plan(
steps=[
SpeechAction(text="Testing trigger"),
Goal(
name="Testing trigger",
plan=Plan(steps=[LLMAction(goal="Do something")]),
),
]
),
)
],
goals=[
Goal(
name="Determine user age",
plan=Plan(steps=[LLMAction(goal="Determine the age of the user.")]),
),
Goal(
name="Find the user's name",
plan=Plan(
steps=[
Goal(
name="Greet the user",
plan=Plan(steps=[LLMAction(goal="Greet the user.")]),
can_fail=False,
),
Goal(
name="Ask for name",
plan=Plan(steps=[LLMAction(goal="Obtain the user's name.")]),
),
]
),
),
Goal(name="Tell a joke", plan=Plan(steps=[LLMAction(goal="Tell a joke.")])),
],
id=1,
),
Phase(
id=2,
norms=[
BasicNorm(norm="Use very gentle speech."),
ConditionalNorm(
condition=SemanticBelief(
description="We are talking to a child", name="talking to child"
),
norm="Do not use cuss words",
),
],
triggers=[
Trigger(
condition=InferredBelief(
left=KeywordBelief(keyword="help"),
right=SemanticBelief(description="User is stuck", name="stuck"),
operator=LogicalOperator.OR,
name="help_or_stuck",
),
plan=Plan(
steps=[
Goal(
name="Unblock user",
plan=Plan(
steps=[
LLMAction(
goal="Provide a step-by-step path to "
"resolve the user's issue."
)
]
),
),
]
),
),
],
goals=[
Goal(
name="Clarify intent",
plan=Plan(
steps=[
LLMAction(
goal="Ask 1-2 targeted questions to clarify the "
"user's intent, then proceed."
)
]
),
),
Goal(
name="Provide solution",
plan=Plan(
steps=[LLMAction(goal="Deliver a solution to complete the user's goal.")]
),
),
Goal(
name="Summarize next steps",
plan=Plan(steps=[LLMAction(goal="Summarize what the user should do next.")]),
),
],
),
]
)
def do_things():
print(AgentSpeakGenerator().generate(test_program))
class AgentSpeakGenerator:
"""
Converts Pydantic representation of behavior programs into AgentSpeak(L) code string.
"""
def generate(self, program: Program) -> str:
lines = []
lines.append("")
lines += self._generate_initial_beliefs(program)
lines += self._generate_norms(program)
lines += self._generate_belief_inference(program)
lines += self._generate_goals(program)
lines += self._generate_triggers(program)
return "\n".join(lines)
def _generate_initial_beliefs(self, program: Program) -> list[str]:
lines = []
lines.append("// --- Initial beliefs ---")
lines.append(f"phase({program.phases[0].id}).")
lines += ["", ""]
return lines
def _generate_norms(self, program: Program) -> list[str]:
lines = []
lines.append("// --- Norms ---")
for phase in program.phases:
for norm in phase.norms:
if type(norm) is BasicNorm:
lines.append(f"{self._slugify(norm)} :- phase({phase.id}).")
if type(norm) is ConditionalNorm:
lines.append(
f"{self._slugify(norm)} :- phase({phase.id}) & "
f"{self._slugify(norm.condition)}."
)
lines += ["", ""]
return lines
def _generate_belief_inference(self, program: Program) -> list[str]:
lines = []
lines.append("// --- Belief inference rules ---")
for phase in program.phases:
for norm in phase.norms:
if not isinstance(norm, ConditionalNorm):
continue
lines += self._belief_inference_recursive(norm.condition)
for trigger in phase.triggers:
lines += self._belief_inference_recursive(trigger.condition)
lines += ["", ""]
return lines
def _belief_inference_recursive(self, belief: Belief) -> list[str]:
lines = []
if type(belief) is KeywordBelief:
lines.append(
f"{self._slugify(belief)} :- user_said(Message) & "
f'.substring(Message, "{belief.keyword}", Pos) & Pos >= 0.'
)
if type(belief) is InferredBelief:
lines.append(
f"{self._slugify(belief)} :- {self._slugify(belief.left)} "
f"{'&' if belief.operator == LogicalOperator.AND else '|'} "
f"{self._slugify(belief.right)}."
)
lines += self._belief_inference_recursive(belief.left)
lines += self._belief_inference_recursive(belief.right)
return lines
def _generate_goals(self, program: Program) -> list[str]:
lines = []
lines.append("// --- Goals ---")
for phase in program.phases:
previous_goal: Goal | None = None
for goal in phase.goals:
lines += self._generate_goal_plan_recursive(goal, phase, previous_goal)
previous_goal = goal
lines += ["", ""]
return lines
def _generate_goal_plan_recursive(
self, goal: Goal, phase: Phase, previous_goal: Goal | None = None
) -> list[str]:
lines = []
lines.append(f"+{self._slugify(goal, include_prefix=True)}")
# Context
lines.append(f"{' ' * 2}:{' ' * 3}phase({phase.id}) &")
lines.append(f"{' ' * 6}not responded_this_turn &")
lines.append(f"{' ' * 6}not achieved_{self._slugify(goal)} &")
if previous_goal:
lines.append(f"{' ' * 6}achieved_{self._slugify(previous_goal)}")
else:
lines.append(f"{' ' * 6}true")
extra_goals_to_generate = []
steps = goal.plan.steps
if len(steps) == 0:
lines.append(f"{' ' * 2}<-{' ' * 2}true.")
return lines
first_step = steps[0]
lines.append(
f"{' ' * 2}<-{' ' * 2}{self._slugify(first_step, include_prefix=True)}"
f"{'.' if len(steps) == 1 and goal.can_fail else ';'}"
)
if isinstance(first_step, Goal):
extra_goals_to_generate.append(first_step)
for step in steps[1:-1]:
lines.append(f"{' ' * 6}{self._slugify(step, include_prefix=True)};")
if isinstance(step, Goal):
extra_goals_to_generate.append(step)
if len(steps) > 1:
last_step = steps[-1]
lines.append(
f"{' ' * 6}{self._slugify(last_step, include_prefix=True)}"
f"{'.' if goal.can_fail else ';'}"
)
if isinstance(last_step, Goal):
extra_goals_to_generate.append(last_step)
if not goal.can_fail:
lines.append(f"{' ' * 6}+achieved_{self._slugify(goal)}.")
lines.append("")
extra_previous_goal: Goal | None = None
for extra_goal in extra_goals_to_generate:
lines += self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal)
extra_previous_goal = extra_goal
return lines
def _generate_triggers(self, program: Program) -> list[str]:
lines = []
lines.append("// --- Triggers ---")
for phase in program.phases:
for trigger in phase.triggers:
lines += self._generate_trigger_plan(trigger, phase)
lines += ["", ""]
return lines
def _generate_trigger_plan(self, trigger: Trigger, phase: Phase) -> list[str]:
lines = []
belief_name = self._slugify(trigger.condition)
lines.append(f"+{belief_name}")
lines.append(f"{' ' * 2}:{' ' * 3}phase({phase.id})")
extra_goals_to_generate = []
steps = trigger.plan.steps
if len(steps) == 0:
lines.append(f"{' ' * 2}<-{' ' * 2}true.")
return lines
first_step = steps[0]
lines.append(
f"{' ' * 2}<-{' ' * 2}{self._slugify(first_step, include_prefix=True)}"
f"{'.' if len(steps) == 1 else ';'}"
)
if isinstance(first_step, Goal):
extra_goals_to_generate.append(first_step)
for step in steps[1:-1]:
lines.append(f"{' ' * 6}{self._slugify(step, include_prefix=True)};")
if isinstance(step, Goal):
extra_goals_to_generate.append(step)
if len(steps) > 1:
last_step = steps[-1]
lines.append(f"{' ' * 6}{self._slugify(last_step, include_prefix=True)}.")
if isinstance(last_step, Goal):
extra_goals_to_generate.append(last_step)
lines.append("")
extra_previous_goal: Goal | None = None
for extra_goal in extra_goals_to_generate:
lines += self._generate_trigger_plan_recursive(extra_goal, phase, extra_previous_goal)
extra_previous_goal = extra_goal
return lines
def _generate_trigger_plan_recursive(
self, goal: Goal, phase: Phase, previous_goal: Goal | None = None
) -> list[str]:
lines = []
lines.append(f"+{self._slugify(goal, include_prefix=True)}")
extra_goals_to_generate = []
steps = goal.plan.steps
if len(steps) == 0:
lines.append(f"{' ' * 2}<-{' ' * 2}true.")
return lines
first_step = steps[0]
lines.append(
f"{' ' * 2}<-{' ' * 2}{self._slugify(first_step, include_prefix=True)}"
f"{'.' if len(steps) == 1 and goal.can_fail else ';'}"
)
if isinstance(first_step, Goal):
extra_goals_to_generate.append(first_step)
for step in steps[1:-1]:
lines.append(f"{' ' * 6}{self._slugify(step, include_prefix=True)};")
if isinstance(step, Goal):
extra_goals_to_generate.append(step)
if len(steps) > 1:
last_step = steps[-1]
lines.append(
f"{' ' * 6}{self._slugify(last_step, include_prefix=True)}"
f"{'.' if goal.can_fail else ';'}"
)
if isinstance(last_step, Goal):
extra_goals_to_generate.append(last_step)
if not goal.can_fail:
lines.append(f"{' ' * 6}+achieved_{self._slugify(goal)}.")
lines.append("")
extra_previous_goal: Goal | None = None
for extra_goal in extra_goals_to_generate:
lines += self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal)
extra_previous_goal = extra_goal
return lines
def _slugify(self, element: ProgramElement, include_prefix: bool = False) -> str:
def base_slugify_call(text: str):
return slugify(text, separator="_", stopwords=["a", "the"])
if type(element) is KeywordBelief:
return f'keyword_said("{element.keyword}")'
if type(element) is SemanticBelief:
name = element.name
return f"semantic_{base_slugify_call(name if name else element.description)}"
if isinstance(element, BasicNorm):
return f'norm("{element.norm}")'
if isinstance(element, Goal):
return f"{'!' if include_prefix else ''}{base_slugify_call(element.name)}"
if isinstance(element, SpeechAction):
return f'.say("{element.text}")'
if isinstance(element, GestureAction):
return f'.gesture("{element.gesture}")'
if isinstance(element, LLMAction):
return f'!generate_response_with_goal("{element.goal}")'
if isinstance(element, Action.__value__):
raise NotImplementedError(
"Have not implemented an ASL string representation for this action."
)
if element.name == "":
raise ValueError("Name must be initialized for this type of ProgramElement.")
return base_slugify_call(element.name)
def _extract_basic_beliefs_from_program(self, program: Program) -> list[BasicBelief]:
beliefs = []
for phase in program.phases:
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += self._extract_basic_beliefs_from_belief(norm.condition)
for trigger in phase.triggers:
beliefs += self._extract_basic_beliefs_from_belief(trigger.condition)
return beliefs
def _extract_basic_beliefs_from_belief(self, belief: Belief) -> list[BasicBelief]:
if isinstance(belief, InferredBelief):
return self._extract_basic_beliefs_from_belief(
belief.left
) + self._extract_basic_beliefs_from_belief(belief.right)
return [belief]
class BDIProgramManager(BaseAgent):
"""
BDI Program Manager Agent.
This agent is responsible for receiving high-level programs (sequences of instructions/goals)
from the external HTTP API (via ZMQ) and translating them into core beliefs (norms and goals)
for the BDI Core Agent. In the future, it will be responsible for determining when goals are
met, and passing on new norms and goals accordingly.
:ivar sub_socket: The ZMQ SUB socket used to receive program updates.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sub_socket = None
# async def _send_to_bdi(self, program: Program):
# """
# Convert a received program into BDI beliefs and send them to the BDI Core Agent.
#
# Currently, it takes the **first phase** of the program and extracts:
# - **Norms**: Constraints or rules the agent must follow.
# - **Goals**: Objectives the agent must achieve.
#
# These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will
# overwrite any existing norms/goals of the same name in the BDI agent.
#
# :param program: The program object received from the API.
# """
# first_phase = program.phases[0]
# norms_belief = Belief(
# name="norms",
# arguments=[norm.norm for norm in first_phase.norms],
# replace=True,
# )
# goals_belief = Belief(
# name="goals",
# arguments=[goal.description for goal in first_phase.goals],
# replace=True,
# )
# program_beliefs = BeliefMessage(beliefs=[norms_belief, goals_belief])
#
# message = InternalMessage(
# to=settings.agent_settings.bdi_core_name,
# sender=self.name,
# body=program_beliefs.model_dump_json(),
# thread="beliefs",
# )
# await self.send(message)
# self.logger.debug("Sent new norms and goals to the BDI agent.")
async def _receive_programs(self):
"""
Continuous loop that receives program updates from the HTTP endpoint.
It listens to the ``program`` topic on the internal ZMQ SUB socket.
When a program is received, it is validated and forwarded to BDI via :meth:`_send_to_bdi`.
"""
while True:
topic, body = await self.sub_socket.recv_multipart()
try:
program = Program.model_validate_json(body)
except ValidationError:
self.logger.exception("Received an invalid program.")
continue
await self._send_to_bdi(program)
async def setup(self):
"""
Initialize the agent.
Connects the internal ZMQ SUB socket and subscribes to the 'program' topic.
Starts the background behavior to receive programs.
"""
context = Context.instance()
self.sub_socket = context.socket(zmq.SUB)
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
self.sub_socket.subscribe("program")
self.add_behavior(self._receive_programs())
if __name__ == "__main__":
do_things()