diff --git a/pyproject.toml b/pyproject.toml index e57a03c..cdc2ce3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "pydantic>=2.12.0", "pydantic-settings>=2.11.0", "python-json-logger>=4.0.0", + "python-slugify>=8.0.4", "pyyaml>=6.0.3", "pyzmq>=27.1.0", "silero-vad>=6.0.0", diff --git a/src/control_backend/agents/bdi/asl_ast.py b/src/control_backend/agents/bdi/asl_ast.py new file mode 100644 index 0000000..104570b --- /dev/null +++ b/src/control_backend/agents/bdi/asl_ast.py @@ -0,0 +1,203 @@ +import typing +from dataclasses import dataclass, field + +# --- Types --- + + +@dataclass +class BeliefLiteral: + """ + Represents a literal or atom. + Example: phase(1), user_said("hello"), ~started + """ + + functor: str + args: list[str] = field(default_factory=list) + negated: bool = False + + def __str__(self): + # In ASL, 'not' is usually for closed-world assumption (prolog style), + # '~' is for explicit negation in beliefs. + # For simplicity in behavior trees, we often use 'not' for conditions. + prefix = "not " if self.negated else "" + if not self.args: + return f"{prefix}{self.functor}" + + # Clean args to ensure strings are quoted if they look like strings, + # but usually the converter handles the quoting of string literals. + args_str = ", ".join(self.args) + return f"{prefix}{self.functor}({args_str})" + + +@dataclass +class GoalLiteral: + name: str + + def __str__(self): + return f"!{self.name}" + + +@dataclass +class ActionLiteral: + """ + Represents a step in a plan body. + Example: .say("Hello") or !achieve_goal + """ + + code: str + + def __str__(self): + return self.code + + +@dataclass +class BinaryOp: + """ + Represents logical operations. + Example: (A & B) | C + """ + + left: "Expression | str" + operator: typing.Literal["&", "|"] + right: "Expression | str" + + def __str__(self): + l_str = str(self.left) + r_str = str(self.right) + + if isinstance(self.left, BinaryOp): + l_str = f"({l_str})" + if isinstance(self.right, BinaryOp): + r_str = f"({r_str})" + + return f"{l_str} {self.operator} {r_str}" + + +Literal = BeliefLiteral | GoalLiteral | ActionLiteral +Expression = Literal | BinaryOp | str + + +@dataclass +class Rule: + """ + Represents an inference rule. + Example: head :- body. + """ + + head: Expression + body: Expression | None = None + + def __str__(self): + if not self.body: + return f"{self.head}." + return f"{self.head} :- {self.body}." + + +@dataclass +class PersistentRule: + """ + Represents an inference rule, where the inferred belief is persistent when formed. + """ + + head: Expression + body: Expression + + def __str__(self): + if not self.body: + raise Exception("Rule without body should not be persistent.") + + lines = [] + + if isinstance(self.body, BinaryOp): + lines.append(f"+{self.body.left}") + if self.body.operator == "&": + lines.append(f" : {self.body.right}") + lines.append(f" <- +{self.head}.") + if self.body.operator == "|": + lines.append(f"+{self.body.right}") + lines.append(f" <- +{self.head}.") + + return "\n".join(lines) + + +@dataclass +class Plan: + """ + Represents a plan. + Syntax: +trigger : context <- body. + """ + + trigger: BeliefLiteral | GoalLiteral + context: list[Expression] = field(default_factory=list) + body: list[ActionLiteral] = field(default_factory=list) + + def __str__(self): + # Indentation settings + INDENT = " " + ARROW = "\n <- " + COLON = "\n : " + + # Build Header + header = f"+{self.trigger}" + if self.context: + ctx_str = f" &\n{INDENT}".join(str(c) for c in self.context) + header += f"{COLON}{ctx_str}" + + # Case 1: Empty body + if not self.body: + return f"{header}." + + # Case 2: Short body (optional optimization, keeping it uniform usually better) + header += ARROW + + lines = [] + # We start the first action on the same line or next line. + # Let's put it on the next line for readability if there are multiple. + + if len(self.body) == 1: + return f"{header}{self.body[0]}." + + # First item + lines.append(f"{header}{self.body[0]};") + # Middle items + for item in self.body[1:-1]: + lines.append(f"{INDENT}{item};") + # Last item + lines.append(f"{INDENT}{self.body[-1]}.") + + return "\n".join(lines) + + +@dataclass +class AgentSpeakFile: + """ + Root element representing the entire generated file. + """ + + initial_beliefs: list[Rule] = field(default_factory=list) + inference_rules: list[Rule | PersistentRule] = field(default_factory=list) + plans: list[Plan] = field(default_factory=list) + + def __str__(self): + sections = [] + + if self.initial_beliefs: + sections.append("// --- Initial Beliefs & Facts ---") + sections.extend(str(rule) for rule in self.initial_beliefs) + sections.append("") + + if self.inference_rules: + sections.append("// --- Inference Rules ---") + sections.extend(str(rule) for rule in self.inference_rules if isinstance(rule, Rule)) + sections.append("") + sections.extend( + str(rule) for rule in self.inference_rules if isinstance(rule, PersistentRule) + ) + sections.append("") + + if self.plans: + sections.append("// --- Plans ---") + # Separate plans by a newline for readability + sections.extend(str(plan) + "\n" for plan in self.plans) + + return "\n".join(sections) diff --git a/src/control_backend/agents/bdi/asl_gen.py b/src/control_backend/agents/bdi/asl_gen.py new file mode 100644 index 0000000..8233a36 --- /dev/null +++ b/src/control_backend/agents/bdi/asl_gen.py @@ -0,0 +1,425 @@ +import asyncio +import time +from functools import singledispatchmethod + +from slugify import slugify + +from control_backend.agents.bdi import BDICoreAgent +from control_backend.agents.bdi.asl_ast import ( + ActionLiteral, + AgentSpeakFile, + BeliefLiteral, + BinaryOp, + Expression, + GoalLiteral, + PersistentRule, + Plan, + Rule, +) +from control_backend.agents.bdi.bdi_program_manager import test_program +from control_backend.schemas.program import ( + BasicBelief, + Belief, + ConditionalNorm, + GestureAction, + Goal, + InferredBelief, + KeywordBelief, + LLMAction, + LogicalOperator, + Phase, + Program, + ProgramElement, + SemanticBelief, + SpeechAction, +) + + +async def do_things(): + res = input("Wanna generate") + if res == "y": + program = AgentSpeakGenerator().generate(test_program) + filename = f"{int(time.time())}.asl" + with open(filename, "w") as f: + f.write(program) + else: + # filename = "0test.asl" + filename = "1766062491.asl" + bdi_agent = BDICoreAgent("BDICoreAgent", filename) + flag = asyncio.Event() + await bdi_agent.start() + await flag.wait() + + +def do_other_things(): + print(AgentSpeakGenerator().generate(test_program)) + + +class AgentSpeakGenerator: + """ + Converts a Pydantic Program behavior model into an AgentSpeak(L) AST, + then renders it to a string. + """ + + def generate(self, program: Program) -> str: + asl = AgentSpeakFile() + + self._generate_startup(program, asl) + + for i, phase in enumerate(program.phases): + next_phase = program.phases[i + 1] if i < len(program.phases) - 1 else None + + self._generate_phase_flow(phase, next_phase, asl) + + self._generate_norms(phase, asl) + + self._generate_goals(phase, asl) + + self._generate_triggers(phase, asl) + + self._generate_fallbacks(program, asl) + + return str(asl) + + # --- Section: Startup & Phase Management --- + + def _generate_startup(self, program: Program, asl: AgentSpeakFile): + if not program.phases: + return + + # Initial belief: phase(start). + asl.initial_beliefs.append(Rule(head=BeliefLiteral("phase", ['"start"']))) + + # Startup plan: +started : phase(start) <- -phase(start); +phase(first_id). + asl.plans.append( + Plan( + trigger=BeliefLiteral("started"), + context=[BeliefLiteral("phase", ['"start"'])], + body=[ + ActionLiteral('-phase("start")'), + ActionLiteral(f'+phase("{program.phases[0].id}")'), + ], + ) + ) + + # Initial plans: + asl.plans.append( + Plan( + trigger=GoalLiteral("generate_response_with_goal(Goal)"), + context=[BeliefLiteral("user_said", ["Message"])], + body=[ + ActionLiteral("+responded_this_turn"), + ActionLiteral(".findall(Norm, norm(Norm), Norms)"), + ActionLiteral(".reply_with_goal(Message, Norms, Goal)"), + ], + ) + ) + + def _generate_phase_flow(self, phase: Phase, next_phase: Phase | None, asl: AgentSpeakFile): + """Generates the main loop listener and the transition logic for this phase.""" + + # +user_said(Message) : phase(ID) <- !goal1; !goal2; !transition_phase. + goal_actions = [ActionLiteral("-responded_this_turn")] + goal_actions += [ + ActionLiteral(f"!check_{self._slugify_str(keyword)}") + for keyword in self._get_keyword_conditionals(phase) + ] + goal_actions += [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals] + goal_actions.append(ActionLiteral("!transition_phase")) + + asl.plans.append( + Plan( + trigger=BeliefLiteral("user_said", ["Message"]), + context=[BeliefLiteral("phase", [f'"{phase.id}"'])], + body=goal_actions, + ) + ) + + # +!transition_phase : phase(ID) <- -phase(ID); +(NEXT_ID). + next_id = str(next_phase.id) if next_phase else "end" + + transition_context = [BeliefLiteral("phase", [f'"{phase.id}"'])] + if phase.goals: + transition_context.append(BeliefLiteral(f"achieved_{self._slugify(phase.goals[-1])}")) + + asl.plans.append( + Plan( + trigger=GoalLiteral("transition_phase"), + context=transition_context, + body=[ + ActionLiteral(f'-phase("{phase.id}")'), + ActionLiteral(f'+phase("{next_id}")'), + ActionLiteral("user_said(Anything)"), + ActionLiteral("-+user_said(Anything)"), + ], + ) + ) + + def _get_keyword_conditionals(self, phase: Phase) -> list[str]: + res = [] + for belief in self._extract_basic_beliefs_from_phase(phase): + if isinstance(belief, KeywordBelief): + res.append(belief.keyword) + + return res + + # --- Section: Norms & Beliefs --- + + def _generate_norms(self, phase: Phase, asl: AgentSpeakFile): + for norm in phase.norms: + norm_slug = f'"{norm.norm}"' + head = BeliefLiteral("norm", [norm_slug]) + + # Base context is the phase + phase_lit = BeliefLiteral("phase", [f'"{phase.id}"']) + + if isinstance(norm, ConditionalNorm): + self._ensure_belief_inference(norm.condition, asl) + + condition_expr = self._belief_to_expr(norm.condition) + body = BinaryOp(phase_lit, "&", condition_expr) + else: + body = phase_lit + + asl.inference_rules.append(Rule(head=head, body=body)) + + def _ensure_belief_inference(self, belief: Belief, asl: AgentSpeakFile): + """ + Recursively adds rules to infer beliefs. + Checks strictly to avoid duplicates if necessary, + though ASL engines often handle redefinition or we can use a set to track processed IDs. + """ + if isinstance(belief, KeywordBelief): + pass + # # Rule: keyword_said("word") :- user_said(M) & .substring("word", M, P) & P >= 0. + # kwd_slug = f'"{belief.keyword}"' + # head = BeliefLiteral("keyword_said", [kwd_slug]) + # + # # Avoid duplicates + # if any(str(r.head) == str(head) for r in asl.inference_rules): + # return + # + # body = BinaryOp( + # BeliefLiteral("user_said", ["Message"]), + # "&", + # BinaryOp(f".substring({kwd_slug}, Message, Pos)", "&", "Pos >= 0"), + # ) + # + # asl.inference_rules.append(Rule(head=head, body=body)) + + elif isinstance(belief, InferredBelief): + self._ensure_belief_inference(belief.left, asl) + self._ensure_belief_inference(belief.right, asl) + + slug = self._slugify(belief) + head = BeliefLiteral(slug) + + if any(str(r.head) == str(head) for r in asl.inference_rules): + return + + op_char = "&" if belief.operator == LogicalOperator.AND else "|" + body = BinaryOp( + self._belief_to_expr(belief.left), op_char, self._belief_to_expr(belief.right) + ) + asl.inference_rules.append(PersistentRule(head=head, body=body)) + + def _belief_to_expr(self, belief: Belief) -> Expression: + if isinstance(belief, KeywordBelief): + return BeliefLiteral("keyword_said", [f'"{belief.keyword}"']) + else: + return BeliefLiteral(self._slugify(belief)) + + # --- Section: Goals --- + + def _generate_goals(self, phase: Phase, asl: AgentSpeakFile): + previous_goal: Goal | None = None + for goal in phase.goals: + self._generate_goal_plan_recursive(goal, str(phase.id), previous_goal, asl) + previous_goal = goal + + def _generate_goal_plan_recursive( + self, + goal: Goal, + phase_id: str, + previous_goal: Goal | None, + asl: AgentSpeakFile, + responded_needed: bool = True, + can_fail: bool = True, + ): + goal_slug = self._slugify(goal) + + # phase(ID) & not responded_this_turn & not achieved_goal + context = [ + BeliefLiteral("phase", [f'"{phase_id}"']), + ] + + if responded_needed: + context.append(BeliefLiteral("responded_this_turn", negated=True)) + if can_fail: + context.append(BeliefLiteral(f"achieved_{goal_slug}", negated=True)) + + if previous_goal: + prev_slug = self._slugify(previous_goal) + context.append(BeliefLiteral(f"achieved_{prev_slug}")) + + body_actions = [] + sub_goals_to_process = [] + + for step in goal.plan.steps: + if isinstance(step, Goal): + sub_slug = self._slugify(step) + body_actions.append(ActionLiteral(f"!{sub_slug}")) + sub_goals_to_process.append(step) + elif isinstance(step, SpeechAction): + body_actions.append(ActionLiteral(f'.say("{step.text}")')) + elif isinstance(step, GestureAction): + body_actions.append(ActionLiteral(f'.gesture("{step.gesture}")')) + elif isinstance(step, LLMAction): + body_actions.append(ActionLiteral(f'!generate_response_with_goal("{step.goal}")')) + + # Mark achievement + if not goal.can_fail: + body_actions.append(ActionLiteral(f"+achieved_{goal_slug}")) + + asl.plans.append(Plan(trigger=GoalLiteral(goal_slug), context=context, body=body_actions)) + asl.plans.append( + Plan(trigger=GoalLiteral(goal_slug), context=[], body=[ActionLiteral("true")]) + ) + + prev_sub = None + for sub_goal in sub_goals_to_process: + self._generate_goal_plan_recursive(sub_goal, phase_id, prev_sub, asl) + prev_sub = sub_goal + + # --- Section: Triggers --- + + def _generate_triggers(self, phase: Phase, asl: AgentSpeakFile): + for keyword in self._get_keyword_conditionals(phase): + asl.plans.append( + Plan( + trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"), + context=[ + ActionLiteral( + f'user_said(Message) & .substring("{keyword}", Message, Pos) & Pos >= 0' + ) + ], + body=[ + ActionLiteral(f'+keyword_said("{keyword}")'), + ActionLiteral(f'-keyword_said("{keyword}")'), + ], + ) + ) + asl.plans.append( + Plan( + trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"), + body=[ActionLiteral("true")], + ) + ) + + for trigger in phase.triggers: + self._ensure_belief_inference(trigger.condition, asl) + + trigger_belief_slug = self._belief_to_expr(trigger.condition) + + body_actions = [] + sub_goals = [] + + for step in trigger.plan.steps: + if isinstance(step, Goal): + sub_slug = self._slugify(step) + body_actions.append(ActionLiteral(f"!{sub_slug}")) + sub_goals.append(step) + elif isinstance(step, SpeechAction): + body_actions.append(ActionLiteral(f'.say("{step.text}")')) + elif isinstance(step, GestureAction): + body_actions.append( + ActionLiteral(f'.gesture("{step.gesture.type}", "{step.gesture.name}")') + ) + elif isinstance(step, LLMAction): + body_actions.append( + ActionLiteral(f'!generate_response_with_goal("{step.goal}")') + ) + + asl.plans.append( + Plan( + trigger=BeliefLiteral(trigger_belief_slug), + context=[BeliefLiteral("phase", [f'"{phase.id}"'])], + body=body_actions, + ) + ) + + # Recurse for triggered goals + prev_sub = None + for sub_goal in sub_goals: + self._generate_goal_plan_recursive( + sub_goal, str(phase.id), prev_sub, asl, False, False + ) + prev_sub = sub_goal + + # --- Section: Fallbacks --- + + def _generate_fallbacks(self, program: Program, asl: AgentSpeakFile): + asl.plans.append( + Plan(trigger=GoalLiteral("transition_phase"), context=[], body=[ActionLiteral("true")]) + ) + + # --- Helpers --- + + @singledispatchmethod + def _slugify(self, element: ProgramElement) -> str: + if element.name: + raise NotImplementedError("Cannot slugify this element.") + return self._slugify_str(element.name) + + @_slugify.register + def _(self, goal: Goal) -> str: + if goal.name: + return self._slugify_str(goal.name) + return f"goal_{goal.id.hex}" + + @_slugify.register + def _(self, kwb: KeywordBelief) -> str: + return f"keyword_said({kwb.keyword})" + + @_slugify.register + def _(self, sb: SemanticBelief) -> str: + return self._slugify_str(sb.description) + + @_slugify.register + def _(self, ib: InferredBelief) -> str: + return self._slugify_str(ib.name) + + def _slugify_str(self, text: str) -> str: + return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"]) + + def _extract_basic_beliefs_from_program(self, program: Program) -> list[BasicBelief]: + beliefs = [] + + for phase in program.phases: + beliefs.extend(self._extract_basic_beliefs_from_phase(phase)) + + return beliefs + + def _extract_basic_beliefs_from_phase(self, phase: Phase) -> list[BasicBelief]: + beliefs = [] + + for norm in phase.norms: + if isinstance(norm, ConditionalNorm): + beliefs += self._extract_basic_beliefs_from_belief(norm.condition) + + for trigger in phase.triggers: + beliefs += self._extract_basic_beliefs_from_belief(trigger.condition) + + return beliefs + + def _extract_basic_beliefs_from_belief(self, belief: Belief) -> list[BasicBelief]: + if isinstance(belief, InferredBelief): + return self._extract_basic_beliefs_from_belief( + belief.left + ) + self._extract_basic_beliefs_from_belief(belief.right) + return [belief] + + +if __name__ == "__main__": + asyncio.run(do_things()) + # do_other_things()y diff --git a/src/control_backend/agents/bdi/astv2.py b/src/control_backend/agents/bdi/astv2.py new file mode 100644 index 0000000..f88fb6a --- /dev/null +++ b/src/control_backend/agents/bdi/astv2.py @@ -0,0 +1,272 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from enum import StrEnum + + +class AstNode(ABC): + """ + Abstract base class for all elements of an AgentSpeak program. + """ + + @abstractmethod + def _to_agentspeak(self) -> str: + """ + Generates the AgentSpeak code string. + """ + pass + + def __str__(self) -> str: + return self._to_agentspeak() + + +class AstExpression(AstNode, ABC): + """ + Intermediate class for anything that can be used in a logical expression. + """ + + def __and__(self, other: ExprCoalescible) -> AstBinaryOp: + return AstBinaryOp(self, BinaryOperatorType.AND, _coalesce_expr(other)) + + def __or__(self, other: ExprCoalescible) -> AstBinaryOp: + return AstBinaryOp(self, BinaryOperatorType.OR, _coalesce_expr(other)) + + def __invert__(self) -> AstLogicalExpression: + if isinstance(self, AstLogicalExpression): + self.negated = not self.negated + return self + return AstLogicalExpression(self, negated=True) + + +type ExprCoalescible = AstExpression | str | int | float + + +def _coalesce_expr(value: ExprCoalescible) -> AstExpression: + if isinstance(value, AstExpression): + return value + if isinstance(value, str): + return AstString(value) + if isinstance(value, (int, float)): + return AstNumber(value) + raise TypeError(f"Cannot coalesce type {type(value)} into an AstTerm.") + + +@dataclass +class AstTerm(AstExpression, ABC): + """ + Base class for terms appearing inside literals. + """ + + def __ge__(self, other: ExprCoalescible) -> AstBinaryOp: + return AstBinaryOp(self, BinaryOperatorType.GREATER_EQUALS, _coalesce_expr(other)) + + def __gt__(self, other: ExprCoalescible) -> AstBinaryOp: + return AstBinaryOp(self, BinaryOperatorType.GREATER_THAN, _coalesce_expr(other)) + + def __le__(self, other: ExprCoalescible) -> AstBinaryOp: + return AstBinaryOp(self, BinaryOperatorType.LESS_EQUALS, _coalesce_expr(other)) + + def __lt__(self, other: ExprCoalescible) -> AstBinaryOp: + return AstBinaryOp(self, BinaryOperatorType.LESS_THAN, _coalesce_expr(other)) + + def __eq__(self, other: ExprCoalescible) -> AstBinaryOp: + return AstBinaryOp(self, BinaryOperatorType.EQUALS, _coalesce_expr(other)) + + def __ne__(self, other: ExprCoalescible) -> AstBinaryOp: + return AstBinaryOp(self, BinaryOperatorType.NOT_EQUALS, _coalesce_expr(other)) + + +@dataclass +class AstAtom(AstTerm): + """ + Grounded expression in all lowercase. + """ + + value: str + + def _to_agentspeak(self) -> str: + return self.value.lower() + + +@dataclass +class AstVar(AstTerm): + """ + Ungrounded variable expression. First letter capitalized. + """ + + name: str + + def _to_agentspeak(self) -> str: + return self.name.capitalize() + + +@dataclass +class AstNumber(AstTerm): + value: int | float + + def _to_agentspeak(self) -> str: + return str(self.value) + + +@dataclass +class AstString(AstTerm): + value: str + + def _to_agentspeak(self) -> str: + return f'"{self.value}"' + + +@dataclass +class AstLiteral(AstTerm): + functor: str + terms: list[AstTerm] = field(default_factory=list) + + def _to_agentspeak(self) -> str: + if not self.terms: + return self.functor + args = ", ".join(map(str, self.terms)) + return f"{self.functor}({args})" + + +class BinaryOperatorType(StrEnum): + AND = "&" + OR = "|" + GREATER_THAN = ">" + LESS_THAN = "<" + EQUALS = "==" + NOT_EQUALS = "\\==" + GREATER_EQUALS = ">=" + LESS_EQUALS = "<=" + + +@dataclass +class AstBinaryOp(AstExpression): + left: AstExpression + operator: BinaryOperatorType + right: AstExpression + + def __post_init__(self): + self.left = _as_logical(self.left) + self.right = _as_logical(self.right) + + def _to_agentspeak(self) -> str: + l_str = str(self.left) + r_str = str(self.right) + + assert isinstance(self.left, AstLogicalExpression) + assert isinstance(self.right, AstLogicalExpression) + + if isinstance(self.left.expression, AstBinaryOp) or self.left.negated: + l_str = f"({l_str})" + if isinstance(self.right.expression, AstBinaryOp) or self.right.negated: + r_str = f"({r_str})" + + return f"{l_str} {self.operator.value} {r_str}" + + +@dataclass +class AstLogicalExpression(AstExpression): + expression: AstExpression + negated: bool = False + + def _to_agentspeak(self) -> str: + expr_str = str(self.expression) + if isinstance(self.expression, AstBinaryOp) and self.negated: + expr_str = f"({expr_str})" + return f"{'not ' if self.negated else ''}{expr_str}" + + +def _as_logical(expr: AstExpression) -> AstLogicalExpression: + if isinstance(expr, AstLogicalExpression): + return expr + return AstLogicalExpression(expr) + + +class StatementType(StrEnum): + EMPTY = "" + DO_ACTION = "." + ACHIEVE_GOAL = "!" + # TEST_GOAL = "?" # TODO + ADD_BELIEF = "+" + REMOVE_BELIEF = "-" + + +@dataclass +class AstStatement(AstNode): + """ + A statement that can appear inside a plan. + """ + + type: StatementType + expression: AstExpression + + def _to_agentspeak(self) -> str: + return f"{self.type.value}{self.expression}" + + +@dataclass +class AstRule(AstNode): + result: AstExpression + condition: AstExpression | None = None + + def __post_init__(self): + if self.condition is not None: + self.condition = _as_logical(self.condition) + + def _to_agentspeak(self) -> str: + if not self.condition: + return f"{self.result}." + return f"{self.result} :- {self.condition}." + + +class TriggerType(StrEnum): + ADDED_BELIEF = "+" + # REMOVED_BELIEF = "-" # TODO + # MODIFIED_BELIEF = "^" # TODO + ADDED_GOAL = "+!" + # REMOVED_GOAL = "-!" # TODO + + +@dataclass +class AstPlan(AstNode): + type: TriggerType + trigger_literal: AstExpression + context: list[AstExpression] + body: list[AstStatement] + + def _to_agentspeak(self) -> str: + assert isinstance(self.trigger_literal, AstLiteral) + + indent = " " * 6 + colon = " : " + arrow = " <- " + + lines = [] + + lines.append(f"{self.type.value}{self.trigger_literal}") + + if self.context: + lines.append(colon + f" &\n{indent}".join(str(c) for c in self.context)) + + if self.body: + lines.append(arrow + f";\n{indent}".join(str(s) for s in self.body) + ".") + + lines.append("") + + return "\n".join(lines) + + +@dataclass +class AstProgram(AstNode): + rules: list[AstRule] = field(default_factory=list) + plans: list[AstPlan] = field(default_factory=list) + + def _to_agentspeak(self) -> str: + lines = [] + lines.extend(map(str, self.rules)) + + lines.extend(["", ""]) + lines.extend(map(str, self.plans)) + + return "\n".join(lines) diff --git a/src/control_backend/agents/bdi/bdi_core_agent.py b/src/control_backend/agents/bdi/bdi_core_agent.py index f056e09..8ff271c 100644 --- a/src/control_backend/agents/bdi/bdi_core_agent.py +++ b/src/control_backend/agents/bdi/bdi_core_agent.py @@ -89,9 +89,9 @@ class BDICoreAgent(BaseAgent): the agent has deferred intentions (deadlines). """ while self._running: - await ( - self._wake_bdi_loop.wait() - ) # gets set whenever there's an update to the belief base + # await ( + # self._wake_bdi_loop.wait() + # ) # gets set whenever there's an update to the belief base # Agent knows when it's expected to have to do its next thing maybe_more_work = True @@ -160,7 +160,7 @@ class BDICoreAgent(BaseAgent): self._remove_all_with_name(belief.name) self._add_belief(belief.name, belief.arguments) - def _add_belief(self, name: str, args: Iterable[str] = []): + def _add_belief(self, name: str, args: list[str] = None): """ Add a single belief to the BDI agent. @@ -168,9 +168,13 @@ class BDICoreAgent(BaseAgent): :param args: Arguments for the belief. """ # new_args = (agentspeak.Literal(arg) for arg in args) # TODO: Eventually support multiple - merged_args = DELIMITER.join(arg for arg in args) - new_args = (agentspeak.Literal(merged_args),) - term = agentspeak.Literal(name, new_args) + args = args or [] + if args: + merged_args = DELIMITER.join(arg for arg in args) + new_args = (agentspeak.Literal(merged_args),) + term = agentspeak.Literal(name, new_args) + else: + term = agentspeak.Literal(name) self.bdi_agent.call( agentspeak.Trigger.addition, @@ -238,8 +242,7 @@ class BDICoreAgent(BaseAgent): @self.actions.add(".reply", 3) def _reply(agent: "BDICoreAgent", term, intention): """ - Sends text to the LLM (AgentSpeak action). - Example: .reply("Hello LLM!", "Some norm", "Some goal") + Let the LLM generate a response to a user's utterance with the current norms and goals. """ message_text = agentspeak.grounded(term.args[0], intention.scope) norms = agentspeak.grounded(term.args[1], intention.scope) @@ -252,15 +255,71 @@ class BDICoreAgent(BaseAgent): asyncio.create_task(self._send_to_llm(str(message_text), str(norms), str(goals))) yield - async def _send_to_llm(self, text: str, norms: str = None, goals: str = None): + @self.actions.add(".reply_with_goal", 3) + def _reply_with_goal(agent: "BDICoreAgent", term, intention): + """ + Let the LLM generate a response to a user's utterance with the current norms and a + specific goal. + """ + message_text = agentspeak.grounded(term.args[0], intention.scope) + norms = agentspeak.grounded(term.args[1], intention.scope) + goal = agentspeak.grounded(term.args[2], intention.scope) + + self.logger.debug( + '"reply_with_goal" action called with message=%s, norms=%s, goal=%s', + message_text, + norms, + goal, + ) + # asyncio.create_task(self._send_to_llm(str(message_text), norms, str(goal))) + yield + + @self.actions.add(".say", 1) + def _say(agent: "BDICoreAgent", term, intention): + """ + Make the robot say the given text instantly. + """ + message_text = agentspeak.grounded(term.args[0], intention.scope) + + self.logger.debug('"say" action called with text=%s', message_text) + + # speech_command = SpeechCommand(data=message_text) + # speech_message = InternalMessage( + # to=settings.agent_settings.robot_speech_name, + # sender=settings.agent_settings.bdi_core_name, + # body=speech_command.model_dump_json(), + # ) + # asyncio.create_task(agent.send(speech_message)) + yield + + @self.actions.add(".gesture", 2) + def _gesture(agent: "BDICoreAgent", term, intention): + """ + Make the robot perform the given gesture instantly. + """ + gesture_type = agentspeak.grounded(term.args[0], intention.scope) + gesture_name = agentspeak.grounded(term.args[1], intention.scope) + + self.logger.debug( + '"gesture" action called with type=%s, name=%s', + gesture_type, + gesture_name, + ) + + # gesture = Gesture(type=gesture_type, name=gesture_name) + # gesture_message = InternalMessage( + # to=settings.agent_settings.robot_gesture_name, + # sender=settings.agent_settings.bdi_core_name, + # body=gesture.model_dump_json(), + # ) + # asyncio.create_task(agent.send(gesture_message)) + yield + + async def _send_to_llm(self, text: str, norms: str, goals: str): """ Sends a text query to the LLM agent asynchronously. """ - prompt = LLMPromptMessage( - text=text, - norms=norms.split("\n") if norms else [], - goals=goals.split("\n") if norms else [], - ) + prompt = LLMPromptMessage(text=text, norms=norms.split("\n"), goals=goals.split("\n")) msg = InternalMessage( to=settings.agent_settings.llm_name, sender=self.name, diff --git a/src/control_backend/agents/bdi/bdi_program_manager.py b/src/control_backend/agents/bdi/bdi_program_manager.py index 83dea93..9925cfb 100644 --- a/src/control_backend/agents/bdi/bdi_program_manager.py +++ b/src/control_backend/agents/bdi/bdi_program_manager.py @@ -1,12 +1,598 @@ +import uuid +from collections.abc import Iterable + import zmq from pydantic import ValidationError +from slugify import slugify from zmq.asyncio import Context from control_backend.agents import BaseAgent -from control_backend.core.agent_system import InternalMessage from control_backend.core.config import settings -from control_backend.schemas.belief_message import Belief, BeliefMessage -from control_backend.schemas.program import Program +from control_backend.schemas.program import ( + Action, + BasicBelief, + BasicNorm, + Belief, + ConditionalNorm, + GestureAction, + Goal, + InferredBelief, + KeywordBelief, + LLMAction, + LogicalOperator, + Phase, + Plan, + Program, + ProgramElement, + SemanticBelief, + SpeechAction, + Trigger, +) + +test_program = Program( + phases=[ + Phase( + norms=[ + BasicNorm(norm="Talk like a pirate", id=uuid.uuid4()), + ConditionalNorm( + condition=InferredBelief( + left=KeywordBelief(keyword="Arr", id=uuid.uuid4()), + right=SemanticBelief( + description="testing", name="semantic belief", id=uuid.uuid4() + ), + operator=LogicalOperator.OR, + name="Talking to a pirate", + id=uuid.uuid4(), + ), + norm="Use nautical terms", + id=uuid.uuid4(), + ), + ConditionalNorm( + condition=SemanticBelief( + description="We are talking to a child", + name="talking to child", + id=uuid.uuid4(), + ), + norm="Do not use cuss words", + id=uuid.uuid4(), + ), + ], + triggers=[ + Trigger( + condition=InferredBelief( + left=KeywordBelief(keyword="key", id=uuid.uuid4()), + right=InferredBelief( + left=KeywordBelief(keyword="key2", id=uuid.uuid4()), + right=SemanticBelief( + description="Decode this", name="semantic belief 2", id=uuid.uuid4() + ), + operator=LogicalOperator.OR, + name="test trigger inferred inner", + id=uuid.uuid4(), + ), + operator=LogicalOperator.OR, + name="test trigger inferred outer", + id=uuid.uuid4(), + ), + plan=Plan( + steps=[ + SpeechAction(text="Testing trigger", id=uuid.uuid4()), + Goal( + name="Testing trigger", + plan=Plan( + steps=[LLMAction(goal="Do something", id=uuid.uuid4())], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), + ), + ], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), + ) + ], + goals=[ + Goal( + name="Determine user age", + plan=Plan( + steps=[LLMAction(goal="Determine the age of the user.", id=uuid.uuid4())], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), + ), + Goal( + name="Find the user's name", + plan=Plan( + steps=[ + Goal( + name="Greet the user", + plan=Plan( + steps=[LLMAction(goal="Greet the user.", id=uuid.uuid4())], + id=uuid.uuid4(), + ), + can_fail=False, + id=uuid.uuid4(), + ), + Goal( + name="Ask for name", + plan=Plan( + steps=[ + LLMAction(goal="Obtain the user's name.", id=uuid.uuid4()) + ], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), + ), + ], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), + ), + Goal( + name="Tell a joke", + plan=Plan( + steps=[LLMAction(goal="Tell a joke.", id=uuid.uuid4())], id=uuid.uuid4() + ), + id=uuid.uuid4(), + ), + ], + id=uuid.uuid4(), + ), + Phase( + id=uuid.uuid4(), + norms=[ + BasicNorm(norm="Use very gentle speech.", id=uuid.uuid4()), + ConditionalNorm( + condition=SemanticBelief( + description="We are talking to a child", + name="talking to child", + id=uuid.uuid4(), + ), + norm="Do not use cuss words", + id=uuid.uuid4(), + ), + ], + triggers=[ + Trigger( + condition=InferredBelief( + left=KeywordBelief(keyword="help", id=uuid.uuid4()), + right=SemanticBelief( + description="User is stuck", name="stuck", id=uuid.uuid4() + ), + operator=LogicalOperator.OR, + name="help_or_stuck", + id=uuid.uuid4(), + ), + plan=Plan( + steps=[ + Goal( + name="Unblock user", + plan=Plan( + steps=[ + LLMAction( + goal="Provide a step-by-step path to " + "resolve the user's issue.", + id=uuid.uuid4(), + ) + ], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), + ), + ], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), + ), + ], + goals=[ + Goal( + name="Clarify intent", + plan=Plan( + steps=[ + LLMAction( + goal="Ask 1-2 targeted questions to clarify the " + "user's intent, then proceed.", + id=uuid.uuid4(), + ) + ], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), + ), + Goal( + name="Provide solution", + plan=Plan( + steps=[ + LLMAction( + goal="Deliver a solution to complete the user's goal.", + id=uuid.uuid4(), + ) + ], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), + ), + Goal( + name="Summarize next steps", + plan=Plan( + steps=[ + LLMAction( + goal="Summarize what the user should do next.", id=uuid.uuid4() + ) + ], + id=uuid.uuid4(), + ), + id=uuid.uuid4(), + ), + ], + ), + ] +) + + +def do_things(): + print(AgentSpeakGenerator().generate(test_program)) + + +class AgentSpeakGenerator: + """ + Converts Pydantic representation of behavior programs into AgentSpeak(L) code string. + """ + + arrow_prefix = f"{' ' * 2}<-{' ' * 2}" + colon_prefix = f"{' ' * 2}:{' ' * 3}" + indent_prefix = " " * 6 + + def generate(self, program: Program) -> str: + lines = [] + lines.append("") + + lines += self._generate_initial_beliefs(program) + + lines += self._generate_basic_flow(program) + + lines += self._generate_phase_transitions(program) + + lines += self._generate_norms(program) + + lines += self._generate_belief_inference(program) + + lines += self._generate_goals(program) + + lines += self._generate_triggers(program) + + return "\n".join(lines) + + def _generate_initial_beliefs(self, program: Program) -> Iterable[str]: + yield "// --- Initial beliefs and agent startup ---" + + yield "phase(start)." + + yield "" + + yield "+started" + yield f"{self.colon_prefix}phase(start)" + yield f"{self.arrow_prefix}phase({program.phases[0].id if program.phases else 'end'})." + + yield from ["", ""] + + def _generate_basic_flow(self, program: Program) -> Iterable[str]: + yield "// --- Basic flow ---" + + for phase in program.phases: + yield from self._generate_basic_flow_per_phase(phase) + + yield from ["", ""] + + def _generate_basic_flow_per_phase(self, phase: Phase) -> Iterable[str]: + yield "+user_said(Message)" + yield f"{self.colon_prefix}phase({phase.id})" + + goals = phase.goals + if goals: + yield f"{self.arrow_prefix}{self._slugify(goals[0], include_prefix=True)}" + for goal in goals[1:]: + yield f"{self.indent_prefix}{self._slugify(goal, include_prefix=True)}" + + yield f"{self.indent_prefix if goals else self.arrow_prefix}!transition_phase." + + def _generate_phase_transitions(self, program: Program) -> Iterable[str]: + yield "// --- Phase transitions ---" + + if len(program.phases) == 0: + yield from ["", ""] + return + + # TODO: remove outdated things + + for i in range(-1, len(program.phases)): + predecessor = program.phases[i] if i >= 0 else None + successor = program.phases[i + 1] if i < len(program.phases) - 1 else None + yield from self._generate_phase_transition(predecessor, successor) + + yield from self._generate_phase_transition(None, None) # to avoid failing plan + + yield from ["", ""] + + def _generate_phase_transition( + self, phase: Phase | None = None, next_phase: Phase | None = None + ) -> Iterable[str]: + yield "+!transition_phase" + + if phase is None and next_phase is None: # base case true to avoid failing plan + yield f"{self.arrow_prefix}true." + return + + yield f"{self.colon_prefix}phase({phase.id if phase else 'start'})" + yield f"{self.arrow_prefix}-+phase({next_phase.id if next_phase else 'end'})." + + def _generate_norms(self, program: Program) -> Iterable[str]: + yield "// --- Norms ---" + + for phase in program.phases: + for norm in phase.norms: + if type(norm) is BasicNorm: + yield f"{self._slugify(norm)} :- phase({phase.id})." + if type(norm) is ConditionalNorm: + yield ( + f"{self._slugify(norm)} :- phase({phase.id}) & " + f"{self._slugify(norm.condition)}." + ) + + yield from ["", ""] + + def _generate_belief_inference(self, program: Program) -> Iterable[str]: + yield "// --- Belief inference rules ---" + + for phase in program.phases: + for norm in phase.norms: + if not isinstance(norm, ConditionalNorm): + continue + + yield from self._belief_inference_recursive(norm.condition) + + for trigger in phase.triggers: + yield from self._belief_inference_recursive(trigger.condition) + + yield from ["", ""] + + def _belief_inference_recursive(self, belief: Belief) -> Iterable[str]: + if type(belief) is KeywordBelief: + yield ( + f"{self._slugify(belief)} :- user_said(Message) & " + f'.substring(Message, "{belief.keyword}", Pos) & Pos >= 0.' + ) + if type(belief) is InferredBelief: + yield ( + f"{self._slugify(belief)} :- {self._slugify(belief.left)} " + f"{'&' if belief.operator == LogicalOperator.AND else '|'} " + f"{self._slugify(belief.right)}." + ) + + yield from self._belief_inference_recursive(belief.left) + yield from self._belief_inference_recursive(belief.right) + + def _generate_goals(self, program: Program) -> Iterable[str]: + yield "// --- Goals ---" + + for phase in program.phases: + previous_goal: Goal | None = None + for goal in phase.goals: + yield from self._generate_goal_plan_recursive(goal, phase, previous_goal) + previous_goal = goal + + yield from ["", ""] + + def _generate_goal_plan_recursive( + self, goal: Goal, phase: Phase, previous_goal: Goal | None = None + ) -> Iterable[str]: + yield f"+{self._slugify(goal, include_prefix=True)}" + + # Context + yield f"{self.colon_prefix}phase({phase.id}) &" + yield f"{self.indent_prefix}not responded_this_turn &" + yield f"{self.indent_prefix}not achieved_{self._slugify(goal)} &" + if previous_goal: + yield f"{self.indent_prefix}achieved_{self._slugify(previous_goal)}" + else: + yield f"{self.indent_prefix}true" + + extra_goals_to_generate = [] + + steps = goal.plan.steps + + if len(steps) == 0: + yield f"{self.arrow_prefix}true." + return + + first_step = steps[0] + yield ( + f"{self.arrow_prefix}{self._slugify(first_step, include_prefix=True)}" + f"{'.' if len(steps) == 1 and goal.can_fail else ';'}" + ) + if isinstance(first_step, Goal): + extra_goals_to_generate.append(first_step) + + for step in steps[1:-1]: + yield f"{self.indent_prefix}{self._slugify(step, include_prefix=True)};" + if isinstance(step, Goal): + extra_goals_to_generate.append(step) + + if len(steps) > 1: + last_step = steps[-1] + yield ( + f"{self.indent_prefix}{self._slugify(last_step, include_prefix=True)}" + f"{'.' if goal.can_fail else ';'}" + ) + if isinstance(last_step, Goal): + extra_goals_to_generate.append(last_step) + + if not goal.can_fail: + yield f"{self.indent_prefix}+achieved_{self._slugify(goal)}." + + yield f"+{self._slugify(goal, include_prefix=True)}" + yield f"{self.arrow_prefix}true." + + yield "" + + extra_previous_goal: Goal | None = None + for extra_goal in extra_goals_to_generate: + yield from self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal) + extra_previous_goal = extra_goal + + def _generate_triggers(self, program: Program) -> Iterable[str]: + yield "// --- Triggers ---" + + for phase in program.phases: + for trigger in phase.triggers: + yield from self._generate_trigger_plan(trigger, phase) + + yield from ["", ""] + + def _generate_trigger_plan(self, trigger: Trigger, phase: Phase) -> Iterable[str]: + belief_name = self._slugify(trigger.condition) + + yield f"+{belief_name}" + yield f"{self.colon_prefix}phase({phase.id})" + + extra_goals_to_generate = [] + + steps = trigger.plan.steps + + if len(steps) == 0: + yield f"{self.arrow_prefix}true." + return + + first_step = steps[0] + yield ( + f"{self.arrow_prefix}{self._slugify(first_step, include_prefix=True)}" + f"{'.' if len(steps) == 1 else ';'}" + ) + if isinstance(first_step, Goal): + extra_goals_to_generate.append(first_step) + + for step in steps[1:-1]: + yield f"{self.indent_prefix}{self._slugify(step, include_prefix=True)};" + if isinstance(step, Goal): + extra_goals_to_generate.append(step) + + if len(steps) > 1: + last_step = steps[-1] + yield f"{self.indent_prefix}{self._slugify(last_step, include_prefix=True)}." + if isinstance(last_step, Goal): + extra_goals_to_generate.append(last_step) + + yield "" + + extra_previous_goal: Goal | None = None + for extra_goal in extra_goals_to_generate: + yield from self._generate_trigger_plan_recursive(extra_goal, phase, extra_previous_goal) + extra_previous_goal = extra_goal + + def _generate_trigger_plan_recursive( + self, goal: Goal, phase: Phase, previous_goal: Goal | None = None + ) -> Iterable[str]: + yield f"+{self._slugify(goal, include_prefix=True)}" + + extra_goals_to_generate = [] + + steps = goal.plan.steps + + if len(steps) == 0: + yield f"{self.arrow_prefix}true." + return + + first_step = steps[0] + yield ( + f"{self.arrow_prefix}{self._slugify(first_step, include_prefix=True)}" + f"{'.' if len(steps) == 1 and goal.can_fail else ';'}" + ) + if isinstance(first_step, Goal): + extra_goals_to_generate.append(first_step) + + for step in steps[1:-1]: + yield f"{self.indent_prefix}{self._slugify(step, include_prefix=True)};" + if isinstance(step, Goal): + extra_goals_to_generate.append(step) + + if len(steps) > 1: + last_step = steps[-1] + yield ( + f"{self.indent_prefix}{self._slugify(last_step, include_prefix=True)}" + f"{'.' if goal.can_fail else ';'}" + ) + if isinstance(last_step, Goal): + extra_goals_to_generate.append(last_step) + + if not goal.can_fail: + yield f"{self.indent_prefix}+achieved_{self._slugify(goal)}." + + yield f"+{self._slugify(goal, include_prefix=True)}" + yield f"{self.arrow_prefix}true." + + yield "" + + extra_previous_goal: Goal | None = None + for extra_goal in extra_goals_to_generate: + yield from self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal) + extra_previous_goal = extra_goal + + def _slugify(self, element: ProgramElement, include_prefix: bool = False) -> str: + def base_slugify_call(text: str): + return slugify(text, separator="_", stopwords=["a", "the"]) + + if type(element) is KeywordBelief: + return f'keyword_said("{element.keyword}")' + + if type(element) is SemanticBelief: + name = element.name + return f"semantic_{base_slugify_call(name if name else element.description)}" + + if isinstance(element, BasicNorm): + return f'norm("{element.norm}")' + + if isinstance(element, Goal): + return f"{'!' if include_prefix else ''}{base_slugify_call(element.name)}" + + if isinstance(element, SpeechAction): + return f'.say("{element.text}")' + + if isinstance(element, GestureAction): + return f'.gesture("{element.gesture}")' + + if isinstance(element, LLMAction): + return f'!generate_response_with_goal("{element.goal}")' + + if isinstance(element, Action.__value__): + raise NotImplementedError( + "Have not implemented an ASL string representation for this action." + ) + + if element.name == "": + raise ValueError("Name must be initialized for this type of ProgramElement.") + + return base_slugify_call(element.name) + + def _extract_basic_beliefs_from_program(self, program: Program) -> list[BasicBelief]: + beliefs = [] + + for phase in program.phases: + for norm in phase.norms: + if isinstance(norm, ConditionalNorm): + beliefs += self._extract_basic_beliefs_from_belief(norm.condition) + + for trigger in phase.triggers: + beliefs += self._extract_basic_beliefs_from_belief(trigger.condition) + + return beliefs + + def _extract_basic_beliefs_from_belief(self, belief: Belief) -> list[BasicBelief]: + if isinstance(belief, InferredBelief): + return self._extract_basic_beliefs_from_belief( + belief.left + ) + self._extract_basic_beliefs_from_belief(belief.right) + return [belief] class BDIProgramManager(BaseAgent): @@ -25,40 +611,40 @@ class BDIProgramManager(BaseAgent): super().__init__(**kwargs) self.sub_socket = None - async def _send_to_bdi(self, program: Program): - """ - Convert a received program into BDI beliefs and send them to the BDI Core Agent. - - Currently, it takes the **first phase** of the program and extracts: - - **Norms**: Constraints or rules the agent must follow. - - **Goals**: Objectives the agent must achieve. - - These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will - overwrite any existing norms/goals of the same name in the BDI agent. - - :param program: The program object received from the API. - """ - first_phase = program.phases[0] - norms_belief = Belief( - name="norms", - arguments=[norm.norm for norm in first_phase.norms], - replace=True, - ) - goals_belief = Belief( - name="goals", - arguments=[goal.description for goal in first_phase.goals], - replace=True, - ) - program_beliefs = BeliefMessage(beliefs=[norms_belief, goals_belief]) - - message = InternalMessage( - to=settings.agent_settings.bdi_core_name, - sender=self.name, - body=program_beliefs.model_dump_json(), - thread="beliefs", - ) - await self.send(message) - self.logger.debug("Sent new norms and goals to the BDI agent.") + # async def _send_to_bdi(self, program: Program): + # """ + # Convert a received program into BDI beliefs and send them to the BDI Core Agent. + # + # Currently, it takes the **first phase** of the program and extracts: + # - **Norms**: Constraints or rules the agent must follow. + # - **Goals**: Objectives the agent must achieve. + # + # These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will + # overwrite any existing norms/goals of the same name in the BDI agent. + # + # :param program: The program object received from the API. + # """ + # first_phase = program.phases[0] + # norms_belief = Belief( + # name="norms", + # arguments=[norm.norm for norm in first_phase.norms], + # replace=True, + # ) + # goals_belief = Belief( + # name="goals", + # arguments=[goal.description for goal in first_phase.goals], + # replace=True, + # ) + # program_beliefs = BeliefMessage(beliefs=[norms_belief, goals_belief]) + # + # message = InternalMessage( + # to=settings.agent_settings.bdi_core_name, + # sender=self.name, + # body=program_beliefs.model_dump_json(), + # thread="beliefs", + # ) + # await self.send(message) + # self.logger.debug("Sent new norms and goals to the BDI agent.") async def _receive_programs(self): """ @@ -92,3 +678,7 @@ class BDIProgramManager(BaseAgent): self.sub_socket.subscribe("program") self.add_behavior(self._receive_programs()) + + +if __name__ == "__main__": + do_things() diff --git a/src/control_backend/agents/bdi/gen.py b/src/control_backend/agents/bdi/gen.py new file mode 100644 index 0000000..e69de29 diff --git a/src/control_backend/agents/bdi/test.asl b/src/control_backend/agents/bdi/test.asl new file mode 100644 index 0000000..e69de29 diff --git a/src/control_backend/schemas/program.py b/src/control_backend/schemas/program.py index 529a23d..5a8caa9 100644 --- a/src/control_backend/schemas/program.py +++ b/src/control_backend/schemas/program.py @@ -64,10 +64,13 @@ class InferredBelief(ProgramElement): right: Belief -type Norm = BasicNorm | ConditionalNorm +class Norm(ProgramElement): + name: str = "" + norm: str + critical: bool = False -class BasicNorm(ProgramElement): +class BasicNorm(Norm): """ Represents a behavioral norm. @@ -75,12 +78,10 @@ class BasicNorm(ProgramElement): :ivar critical: When true, this norm should absolutely not be violated (checked separately). """ - name: str = "" - norm: str - critical: bool = False + pass -class ConditionalNorm(BasicNorm): +class ConditionalNorm(Norm): """ Represents a norm that is only active when a condition is met (i.e., a certain belief holds). diff --git a/uv.lock b/uv.lock index ff4b8a7..ce46ceb 100644 --- a/uv.lock +++ b/uv.lock @@ -997,6 +997,7 @@ dependencies = [ { name = "pydantic" }, { name = "pydantic-settings" }, { name = "python-json-logger" }, + { name = "python-slugify" }, { name = "pyyaml" }, { name = "pyzmq" }, { name = "silero-vad" }, @@ -1046,6 +1047,7 @@ requires-dist = [ { name = "pydantic", specifier = ">=2.12.0" }, { name = "pydantic-settings", specifier = ">=2.11.0" }, { name = "python-json-logger", specifier = ">=4.0.0" }, + { name = "python-slugify", specifier = ">=8.0.4" }, { name = "pyyaml", specifier = ">=6.0.3" }, { name = "pyzmq", specifier = ">=27.1.0" }, { name = "silero-vad", specifier = ">=6.0.0" }, @@ -1341,6 +1343,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/45/58/38b5afbc1a800eeea951b9285d3912613f2603bdf897a4ab0f4bd7f405fc/python_multipart-0.0.20-py3-none-any.whl", hash = "sha256:8a62d3a8335e06589fe01f2a3e178cdcc632f3fbe0d492ad9ee0ec35aab1f104", size = 24546, upload-time = "2024-12-16T19:45:44.423Z" }, ] +[[package]] +name = "python-slugify" +version = "8.0.4" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "text-unidecode" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/87/c7/5e1547c44e31da50a460df93af11a535ace568ef89d7a811069ead340c4a/python-slugify-8.0.4.tar.gz", hash = "sha256:59202371d1d05b54a9e7720c5e038f928f45daaffe41dd10822f3907b937c856", size = 10921, upload-time = "2024-02-08T18:32:45.488Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a4/62/02da182e544a51a5c3ccf4b03ab79df279f9c60c5e82d5e8bec7ca26ac11/python_slugify-8.0.4-py2.py3-none-any.whl", hash = "sha256:276540b79961052b66b7d116620b36518847f52d5fd9e3a70164fc8c50faa6b8", size = 10051, upload-time = "2024-02-08T18:32:43.911Z" }, +] + [[package]] name = "pyyaml" version = "6.0.3" @@ -1864,6 +1878,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a2/09/77d55d46fd61b4a135c444fc97158ef34a095e5681d0a6c10b75bf356191/sympy-1.14.0-py3-none-any.whl", hash = "sha256:e091cc3e99d2141a0ba2847328f5479b05d94a6635cb96148ccb3f34671bd8f5", size = 6299353, upload-time = "2025-04-27T18:04:59.103Z" }, ] +[[package]] +name = "text-unidecode" +version = "1.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ab/e2/e9a00f0ccb71718418230718b3d900e71a5d16e701a3dae079a21e9cd8f8/text-unidecode-1.3.tar.gz", hash = "sha256:bad6603bb14d279193107714b288be206cac565dfa49aa5b105294dd5c4aab93", size = 76885, upload-time = "2019-08-30T21:36:45.405Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a6/a5/c0b6468d3824fe3fde30dbb5e1f687b291608f9473681bbf7dabbf5a87d7/text_unidecode-1.3-py2.py3-none-any.whl", hash = "sha256:1311f10e8b895935241623731c2ba64f4c455287888b18189350b67134a822e8", size = 78154, upload-time = "2019-08-30T21:37:03.543Z" }, +] + [[package]] name = "tiktoken" version = "0.12.0"