refactor: program restructure

Also includes some AgentSpeak generation.

ref: N25B-376
This commit is contained in:
2025-12-16 10:21:50 +01:00
parent de2e56ffce
commit d043c54336
4 changed files with 532 additions and 69 deletions

View File

@@ -15,6 +15,7 @@ dependencies = [
"pydantic>=2.12.0",
"pydantic-settings>=2.11.0",
"python-json-logger>=4.0.0",
"python-slugify>=8.0.4",
"pyyaml>=6.0.3",
"pyzmq>=27.1.0",
"silero-vad>=6.0.0",

View File

@@ -1,12 +1,311 @@
import zmq
from pydantic import ValidationError
from slugify import slugify
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage
from control_backend.schemas.program import Program
from control_backend.schemas.program import (
Action,
BasicBelief,
BasicNorm,
Belief,
ConditionalNorm,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Phase,
Plan,
Program,
ProgramElement,
SemanticBelief,
SpeechAction,
)
test_program = Program(
phases=[
Phase(
norms=[
BasicNorm(norm="Talk like a pirate"),
ConditionalNorm(
condition=InferredBelief(
left=KeywordBelief(keyword="Arr"),
right=SemanticBelief(description="testing", name="semantic belief"),
operator=LogicalOperator.OR,
name="Talking to a pirate",
),
norm="Use nautical terms",
),
ConditionalNorm(
condition=SemanticBelief(
description="We are talking to a child", name="talking to child"
),
norm="Do not use cuss words",
),
],
triggers=[
# Trigger(
# condition=InferredBelief(
# left=KeywordBelief(keyword="key"),
# right=InferredBelief(
# left=KeywordBelief(keyword="key2"),
# right=SemanticBelief(
# description="Decode this", name="semantic belief 2"
# ),
# operator=LogicalOperator.OR,
# name="test trigger inferred inner",
# ),
# operator=LogicalOperator.OR,
# name="test trigger inferred outer",
# ),
# plan=Plan(steps=[]),
# )
],
goals=[
Goal(
name="Determine user age",
plan=Plan(steps=[LLMAction(goal="Determine the age of the user.")]),
),
Goal(
name="Find the user's name",
plan=Plan(
steps=[
Goal(
name="Greet the user",
plan=Plan(steps=[LLMAction(goal="Greet the user.")]),
can_fail=False,
),
Goal(
name="Ask for name",
plan=Plan(steps=[LLMAction(goal="Obtain the user's name.")]),
),
]
),
),
Goal(name="Tell a joke", plan=Plan(steps=[LLMAction(goal="Tell a joke.")])),
],
id=1,
)
]
)
class AgentSpeakGenerator:
"""
Converts Pydantic representation of behavior programs into AgentSpeak(L) code string.
"""
def generate(self, program: Program) -> str:
lines = []
lines.append("")
lines += self._generate_initial_beliefs(program)
lines += self._generate_norms(program)
lines += self._generate_belief_inference(program)
lines += self._generate_goals(program)
lines += self._generate_triggers(program)
return "\n".join(lines)
def _generate_initial_beliefs(self, program: Program) -> list[str]:
lines = []
lines.append("// --- Initial beliefs ---")
lines.append(f"phase({program.phases[0].id}).")
lines += ["", ""]
return lines
def _generate_norms(self, program: Program) -> list[str]:
lines = []
lines.append("// --- Norms ---")
for phase in program.phases:
for norm in phase.norms:
if type(norm) is BasicNorm:
lines.append(f"{self._slugify(norm)} :- phase({phase.id}).")
if type(norm) is ConditionalNorm:
lines.append(
f"{self._slugify(norm)} :- phase({phase.id}) & "
f"{self._slugify(norm.condition)}."
)
lines += ["", ""]
return lines
def _generate_belief_inference(self, program: Program) -> list[str]:
lines = []
lines.append("// --- Belief inference rules ---")
for phase in program.phases:
for norm in phase.norms:
if not isinstance(norm, ConditionalNorm):
continue
lines += self._belief_inference_recursive(norm.condition)
for trigger in phase.triggers:
lines += self._belief_inference_recursive(trigger.condition)
lines += ["", ""]
return lines
def _belief_inference_recursive(self, belief: Belief) -> list[str]:
lines = []
if type(belief) is KeywordBelief:
lines.append(
f"{self._slugify(belief)} :- user_said(Message) & "
f'.substring(Message, "{belief.keyword}", Pos) & Pos >= 0.'
)
if type(belief) is InferredBelief:
lines.append(
f"{self._slugify(belief)} :- {self._slugify(belief.left)} "
f"{'&' if belief.operator == LogicalOperator.AND else '|'} "
f"{self._slugify(belief.right)}."
)
lines += self._belief_inference_recursive(belief.left)
lines += self._belief_inference_recursive(belief.right)
return lines
def _generate_goals(self, program: Program) -> list[str]:
lines = []
lines.append("// --- Goals ---")
for phase in program.phases:
previous_goal: Goal | None = None
for goal in phase.goals:
lines += self._generate_plan_recursive(goal, phase, previous_goal)
previous_goal = goal
lines += ["", ""]
return lines
def _generate_plan_recursive(
self, goal: Goal, phase: Phase, previous_goal: Goal | None = None
) -> list[str]:
lines = []
lines.append(f"+{self._slugify(goal, include_prefix=True)}")
# Context
lines.append(f"{' ' * 2}:{' ' * 3}phase({phase.id}) &")
lines.append(f"{' ' * 6}not responded_this_turn &")
lines.append(f"{' ' * 6}not achieved_{self._slugify(goal)} &")
if previous_goal:
lines.append(f"{' ' * 6}achieved_{self._slugify(previous_goal)}")
else:
lines.append(f"{' ' * 6}true")
extra_goals_to_generate = []
steps = goal.plan.steps
first_step = steps[0]
lines.append(
f"{' ' * 2}<-{' ' * 2}{self._slugify(first_step, include_prefix=True)}"
f"{'.' if len(steps) == 1 and goal.can_fail else ';'}"
)
if isinstance(first_step, Goal):
extra_goals_to_generate.append(first_step)
for step in steps[1:-1]:
lines.append(f"{' ' * 6}{self._slugify(step, include_prefix=True)};")
if isinstance(step, Goal):
extra_goals_to_generate.append(step)
if len(steps) > 1:
last_step = steps[-1]
lines.append(
f"{' ' * 6}{self._slugify(last_step, include_prefix=True)}"
f"{'.' if goal.can_fail else ';'}"
)
if isinstance(last_step, Goal):
extra_goals_to_generate.append(last_step)
if not goal.can_fail:
lines.append(f"{' ' * 6}+achieved_{self._slugify(goal)}.")
lines.append("")
extra_previous_goal: Goal | None = None
for extra_goal in extra_goals_to_generate:
lines += self._generate_plan_recursive(extra_goal, phase, extra_previous_goal)
extra_previous_goal = extra_goal
return lines
def _generate_triggers(self, program: Program) -> list[str]:
lines = []
lines.append("// --- Triggers ---")
lines += ["", ""]
return lines
def _slugify(self, element: ProgramElement, include_prefix: bool = False) -> str:
def base_slugify_call(text: str):
return slugify(text, separator="_", stopwords=["a", "the"])
if type(element) is KeywordBelief:
return f'keyword_said("{element.keyword}")'
if type(element) is SemanticBelief:
name = element.name
return f"semantic_{base_slugify_call(name if name else element.description)}"
if isinstance(element, BasicNorm):
return f'norm("{element.norm}")'
if isinstance(element, Goal):
return f"{'!' if include_prefix else ''}{base_slugify_call(element.name)}"
if isinstance(element, SpeechAction):
return f'.say("{element.text}")'
if isinstance(element, GestureAction):
return f'.gesture("{element.gesture}")'
if isinstance(element, LLMAction):
return f'!generate_response_with_goal("{element.goal}")'
if isinstance(element, Action.__value__):
raise NotImplementedError(
"Have not implemented an ASL string representation for this action."
)
if element.name == "":
raise ValueError("Name must be initialized for this type of ProgramElement.")
return base_slugify_call(element.name)
def _extract_basic_beliefs_from_program(self, program: Program) -> list[BasicBelief]:
beliefs = []
for phase in program.phases:
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += self._extract_basic_beliefs_from_belief(norm.condition)
for trigger in phase.triggers:
beliefs += self._extract_basic_beliefs_from_belief(trigger.condition)
return beliefs
def _extract_basic_beliefs_from_belief(self, belief: Belief) -> list[BasicBelief]:
if isinstance(belief, InferredBelief):
return self._extract_basic_beliefs_from_belief(
belief.left
) + self._extract_basic_beliefs_from_belief(belief.right)
return [belief]
class BDIProgramManager(BaseAgent):
@@ -25,40 +324,40 @@ class BDIProgramManager(BaseAgent):
super().__init__(**kwargs)
self.sub_socket = None
async def _send_to_bdi(self, program: Program):
"""
Convert a received program into BDI beliefs and send them to the BDI Core Agent.
Currently, it takes the **first phase** of the program and extracts:
- **Norms**: Constraints or rules the agent must follow.
- **Goals**: Objectives the agent must achieve.
These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will
overwrite any existing norms/goals of the same name in the BDI agent.
:param program: The program object received from the API.
"""
first_phase = program.phases[0]
norms_belief = Belief(
name="norms",
arguments=[norm.norm for norm in first_phase.norms],
replace=True,
)
goals_belief = Belief(
name="goals",
arguments=[goal.description for goal in first_phase.goals],
replace=True,
)
program_beliefs = BeliefMessage(beliefs=[norms_belief, goals_belief])
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=program_beliefs.model_dump_json(),
thread="beliefs",
)
await self.send(message)
self.logger.debug("Sent new norms and goals to the BDI agent.")
# async def _send_to_bdi(self, program: Program):
# """
# Convert a received program into BDI beliefs and send them to the BDI Core Agent.
#
# Currently, it takes the **first phase** of the program and extracts:
# - **Norms**: Constraints or rules the agent must follow.
# - **Goals**: Objectives the agent must achieve.
#
# These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will
# overwrite any existing norms/goals of the same name in the BDI agent.
#
# :param program: The program object received from the API.
# """
# first_phase = program.phases[0]
# norms_belief = Belief(
# name="norms",
# arguments=[norm.norm for norm in first_phase.norms],
# replace=True,
# )
# goals_belief = Belief(
# name="goals",
# arguments=[goal.description for goal in first_phase.goals],
# replace=True,
# )
# program_beliefs = BeliefMessage(beliefs=[norms_belief, goals_belief])
#
# message = InternalMessage(
# to=settings.agent_settings.bdi_core_name,
# sender=self.name,
# body=program_beliefs.model_dump_json(),
# thread="beliefs",
# )
# await self.send(message)
# self.logger.debug("Sent new norms and goals to the BDI agent.")
async def _receive_programs(self):
"""

View File

@@ -1,64 +1,204 @@
from enum import Enum
from pydantic import BaseModel
class Norm(BaseModel):
class ProgramElement(BaseModel):
"""
Represents a behavioral norm.
Represents a basic element of our behavior program.
:ivar name: The researcher-assigned name of the element.
:ivar id: Unique identifier.
:ivar label: Human-readable label.
:ivar norm: The actual norm text describing the behavior.
"""
id: str
label: str
norm: str
name: str
id: int
class Goal(BaseModel):
class LogicalOperator(Enum):
AND = "AND"
OR = "OR"
type Belief = KeywordBelief | SemanticBelief | InferredBelief
type BasicBelief = KeywordBelief | SemanticBelief
class KeywordBelief(ProgramElement):
"""
Represents an objective to be achieved.
Represents a belief that is set when the user spoken text contains a certain keyword.
:ivar id: Unique identifier.
:ivar label: Human-readable label.
:ivar description: Detailed description of the goal.
:ivar achieved: Status flag indicating if the goal has been met.
:ivar keyword: The keyword on which this belief gets set.
"""
id: str
label: str
description: str
achieved: bool
class TriggerKeyword(BaseModel):
id: str
name: str = ""
id: int = -1
keyword: str
class KeywordTrigger(BaseModel):
id: str
label: str
type: str
keywords: list[TriggerKeyword]
class SemanticBelief(ProgramElement):
"""
Represents a belief that is set by semantic LLM validation.
:ivar description: Description of how to form the belief, used by the LLM.
"""
name: str = ""
id: int = -1
description: str
class Phase(BaseModel):
class InferredBelief(ProgramElement):
"""
Represents a belief that gets formed by combining two beliefs with a logical AND or OR.
These beliefs can also be :class:`InferredBelief`, leading to arbitrarily deep nesting.
:ivar operator: The logical operator to apply.
:ivar left: The left part of the logical expression.
:ivar right: The right part of the logical expression.
"""
name: str = ""
id: int = -1
operator: LogicalOperator
left: Belief
right: Belief
type Norm = BasicNorm | ConditionalNorm
class BasicNorm(ProgramElement):
"""
Represents a behavioral norm.
:ivar norm: The actual norm text describing the behavior.
:ivar critical: When true, this norm should absolutely not be violated (checked separately).
"""
name: str = ""
id: int = -1
norm: str
critical: bool = False
class ConditionalNorm(BasicNorm):
"""
Represents a norm that is only active when a condition is met (i.e., a certain belief holds).
:ivar condition: When to activate this norm.
"""
name: str = ""
id: int = -1
condition: Belief
type PlanElement = Goal | Action
class Plan(ProgramElement):
"""
Represents a list of steps to execute. Each of these steps can be a goal (with its own plan)
or a simple action.
:ivar steps: The actions or subgoals to execute, in order.
"""
name: str = ""
id: int = -1
steps: list[PlanElement]
class Goal(ProgramElement):
"""
Represents an objective to be achieved. To reach the goal, we should execute
the corresponding plan. If we can fail to achieve a goal after executing the plan,
for example when the achieving of the goal is dependent on the user's reply, this means
that the achieved status will be set from somewhere else in the program.
:ivar plan: The plan to execute.
:ivar can_fail: Whether we can fail to achieve the goal after executing the plan.
"""
id: int = -1
plan: Plan
can_fail: bool = True
type Action = SpeechAction | GestureAction | LLMAction
class SpeechAction(ProgramElement):
"""
Represents the action of the robot speaking a literal text.
:ivar text: The text to speak.
"""
name: str = ""
id: int = -1
text: str
# TODO: gestures
class Gesture(Enum):
RAISE_HAND = "RAISE_HAND"
class GestureAction(ProgramElement):
"""
Represents the action of the robot performing a gesture.
:ivar gesture: The gesture to perform.
"""
name: str = ""
id: int = -1
gesture: Gesture
class LLMAction(ProgramElement):
"""
Represents the action of letting an LLM generate a reply based on its chat history
and an additional goal added in the prompt.
:ivar goal: The extra (temporary) goal to add to the LLM.
"""
name: str = ""
id: int = -1
goal: str
class Trigger(ProgramElement):
"""
Represents a belief-based trigger. When a belief is set, the corresponding plan is executed.
:ivar condition: When to activate the trigger.
:ivar plan: The plan to execute.
"""
name: str = ""
id: int = -1
condition: Belief
plan: Plan
class Phase(ProgramElement):
"""
A distinct phase within a program, containing norms, goals, and triggers.
:ivar id: Unique identifier.
:ivar label: Human-readable label.
:ivar norms: List of norms active in this phase.
:ivar goals: List of goals to pursue in this phase.
:ivar triggers: List of triggers that define transitions out of this phase.
"""
id: str
label: str
name: str = ""
norms: list[Norm]
goals: list[Goal]
triggers: list[KeywordTrigger]
triggers: list[Trigger]
class Program(BaseModel):

23
uv.lock generated
View File

@@ -997,6 +997,7 @@ dependencies = [
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "python-json-logger" },
{ name = "python-slugify" },
{ name = "pyyaml" },
{ name = "pyzmq" },
{ name = "silero-vad" },
@@ -1046,6 +1047,7 @@ requires-dist = [
{ name = "pydantic", specifier = ">=2.12.0" },
{ name = "pydantic-settings", specifier = ">=2.11.0" },
{ name = "python-json-logger", specifier = ">=4.0.0" },
{ name = "python-slugify", specifier = ">=8.0.4" },
{ name = "pyyaml", specifier = ">=6.0.3" },
{ name = "pyzmq", specifier = ">=27.1.0" },
{ name = "silero-vad", specifier = ">=6.0.0" },
@@ -1341,6 +1343,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/45/58/38b5afbc1a800eeea951b9285d3912613f2603bdf897a4ab0f4bd7f405fc/python_multipart-0.0.20-py3-none-any.whl", hash = "sha256:8a62d3a8335e06589fe01f2a3e178cdcc632f3fbe0d492ad9ee0ec35aab1f104", size = 24546, upload-time = "2024-12-16T19:45:44.423Z" },
]
[[package]]
name = "python-slugify"
version = "8.0.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "text-unidecode" },
]
sdist = { url = "https://files.pythonhosted.org/packages/87/c7/5e1547c44e31da50a460df93af11a535ace568ef89d7a811069ead340c4a/python-slugify-8.0.4.tar.gz", hash = "sha256:59202371d1d05b54a9e7720c5e038f928f45daaffe41dd10822f3907b937c856", size = 10921, upload-time = "2024-02-08T18:32:45.488Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a4/62/02da182e544a51a5c3ccf4b03ab79df279f9c60c5e82d5e8bec7ca26ac11/python_slugify-8.0.4-py2.py3-none-any.whl", hash = "sha256:276540b79961052b66b7d116620b36518847f52d5fd9e3a70164fc8c50faa6b8", size = 10051, upload-time = "2024-02-08T18:32:43.911Z" },
]
[[package]]
name = "pyyaml"
version = "6.0.3"
@@ -1864,6 +1878,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a2/09/77d55d46fd61b4a135c444fc97158ef34a095e5681d0a6c10b75bf356191/sympy-1.14.0-py3-none-any.whl", hash = "sha256:e091cc3e99d2141a0ba2847328f5479b05d94a6635cb96148ccb3f34671bd8f5", size = 6299353, upload-time = "2025-04-27T18:04:59.103Z" },
]
[[package]]
name = "text-unidecode"
version = "1.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ab/e2/e9a00f0ccb71718418230718b3d900e71a5d16e701a3dae079a21e9cd8f8/text-unidecode-1.3.tar.gz", hash = "sha256:bad6603bb14d279193107714b288be206cac565dfa49aa5b105294dd5c4aab93", size = 76885, upload-time = "2019-08-30T21:36:45.405Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a6/a5/c0b6468d3824fe3fde30dbb5e1f687b291608f9473681bbf7dabbf5a87d7/text_unidecode-1.3-py2.py3-none-any.whl", hash = "sha256:1311f10e8b895935241623731c2ba64f4c455287888b18189350b67134a822e8", size = 78154, upload-time = "2019-08-30T21:37:03.543Z" },
]
[[package]]
name = "tiktoken"
version = "0.12.0"