diff --git a/src/control_backend/agents/bdi/bdi_program_manager.py b/src/control_backend/agents/bdi/bdi_program_manager.py index 2353fcb..0eae52a 100644 --- a/src/control_backend/agents/bdi/bdi_program_manager.py +++ b/src/control_backend/agents/bdi/bdi_program_manager.py @@ -1,3 +1,5 @@ +from collections.abc import Iterable + import zmq from pydantic import ValidationError from slugify import slugify @@ -187,109 +189,94 @@ class AgentSpeakGenerator: return "\n".join(lines) - def _generate_initial_beliefs(self, program: Program) -> list[str]: - lines = [] - lines.append("// --- Initial beliefs ---") + def _generate_initial_beliefs(self, program: Program) -> Iterable[str]: + yield "// --- Initial beliefs ---" - lines.append(f"phase({program.phases[0].id}).") + yield f"phase({program.phases[0].id})." - lines += ["", ""] + yield from ["", ""] - return lines - - def _generate_norms(self, program: Program) -> list[str]: - lines = [] - lines.append("// --- Norms ---") + def _generate_norms(self, program: Program) -> Iterable[str]: + yield "// --- Norms ---" for phase in program.phases: for norm in phase.norms: if type(norm) is BasicNorm: - lines.append(f"{self._slugify(norm)} :- phase({phase.id}).") + yield f"{self._slugify(norm)} :- phase({phase.id})." if type(norm) is ConditionalNorm: - lines.append( + yield ( f"{self._slugify(norm)} :- phase({phase.id}) & " f"{self._slugify(norm.condition)}." ) - lines += ["", ""] + yield from ["", ""] - return lines - - def _generate_belief_inference(self, program: Program) -> list[str]: - lines = [] - lines.append("// --- Belief inference rules ---") + def _generate_belief_inference(self, program: Program) -> Iterable[str]: + yield "// --- Belief inference rules ---" for phase in program.phases: for norm in phase.norms: if not isinstance(norm, ConditionalNorm): continue - lines += self._belief_inference_recursive(norm.condition) + yield from self._belief_inference_recursive(norm.condition) for trigger in phase.triggers: - lines += self._belief_inference_recursive(trigger.condition) + yield from self._belief_inference_recursive(trigger.condition) - lines += ["", ""] - - return lines - - def _belief_inference_recursive(self, belief: Belief) -> list[str]: - lines = [] + yield from ["", ""] + def _belief_inference_recursive(self, belief: Belief) -> Iterable[str]: if type(belief) is KeywordBelief: - lines.append( + yield ( f"{self._slugify(belief)} :- user_said(Message) & " f'.substring(Message, "{belief.keyword}", Pos) & Pos >= 0.' ) if type(belief) is InferredBelief: - lines.append( + yield ( f"{self._slugify(belief)} :- {self._slugify(belief.left)} " f"{'&' if belief.operator == LogicalOperator.AND else '|'} " f"{self._slugify(belief.right)}." ) - lines += self._belief_inference_recursive(belief.left) - lines += self._belief_inference_recursive(belief.right) - return lines + yield from self._belief_inference_recursive(belief.left) + yield from self._belief_inference_recursive(belief.right) - def _generate_goals(self, program: Program) -> list[str]: - lines = [] - lines.append("// --- Goals ---") + def _generate_goals(self, program: Program) -> Iterable[str]: + yield "// --- Goals ---" for phase in program.phases: previous_goal: Goal | None = None for goal in phase.goals: - lines += self._generate_goal_plan_recursive(goal, phase, previous_goal) + yield from self._generate_goal_plan_recursive(goal, phase, previous_goal) previous_goal = goal - lines += ["", ""] - return lines + yield from ["", ""] def _generate_goal_plan_recursive( self, goal: Goal, phase: Phase, previous_goal: Goal | None = None - ) -> list[str]: - lines = [] - lines.append(f"+{self._slugify(goal, include_prefix=True)}") + ) -> Iterable[str]: + yield f"+{self._slugify(goal, include_prefix=True)}" # Context - lines.append(f"{' ' * 2}:{' ' * 3}phase({phase.id}) &") - lines.append(f"{' ' * 6}not responded_this_turn &") - lines.append(f"{' ' * 6}not achieved_{self._slugify(goal)} &") + yield f"{' ' * 2}:{' ' * 3}phase({phase.id}) &" + yield f"{' ' * 6}not responded_this_turn &" + yield f"{' ' * 6}not achieved_{self._slugify(goal)} &" if previous_goal: - lines.append(f"{' ' * 6}achieved_{self._slugify(previous_goal)}") + yield f"{' ' * 6}achieved_{self._slugify(previous_goal)}" else: - lines.append(f"{' ' * 6}true") + yield f"{' ' * 6}true" extra_goals_to_generate = [] steps = goal.plan.steps if len(steps) == 0: - lines.append(f"{' ' * 2}<-{' ' * 2}true.") - return lines + yield f"{' ' * 2}<-{' ' * 2}true." + return first_step = steps[0] - lines.append( + yield ( f"{' ' * 2}<-{' ' * 2}{self._slugify(first_step, include_prefix=True)}" f"{'.' if len(steps) == 1 and goal.can_fail else ';'}" ) @@ -297,13 +284,13 @@ class AgentSpeakGenerator: extra_goals_to_generate.append(first_step) for step in steps[1:-1]: - lines.append(f"{' ' * 6}{self._slugify(step, include_prefix=True)};") + yield f"{' ' * 6}{self._slugify(step, include_prefix=True)};" if isinstance(step, Goal): extra_goals_to_generate.append(step) if len(steps) > 1: last_step = steps[-1] - lines.append( + yield ( f"{' ' * 6}{self._slugify(last_step, include_prefix=True)}" f"{'.' if goal.can_fail else ';'}" ) @@ -311,46 +298,40 @@ class AgentSpeakGenerator: extra_goals_to_generate.append(last_step) if not goal.can_fail: - lines.append(f"{' ' * 6}+achieved_{self._slugify(goal)}.") + yield f"{' ' * 6}+achieved_{self._slugify(goal)}." - lines.append("") + yield "" extra_previous_goal: Goal | None = None for extra_goal in extra_goals_to_generate: - lines += self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal) + yield from self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal) extra_previous_goal = extra_goal - return lines - - def _generate_triggers(self, program: Program) -> list[str]: - lines = [] - lines.append("// --- Triggers ---") + def _generate_triggers(self, program: Program) -> Iterable[str]: + yield "// --- Triggers ---" for phase in program.phases: for trigger in phase.triggers: - lines += self._generate_trigger_plan(trigger, phase) + yield from self._generate_trigger_plan(trigger, phase) - lines += ["", ""] - return lines - - def _generate_trigger_plan(self, trigger: Trigger, phase: Phase) -> list[str]: - lines = [] + yield from ["", ""] + def _generate_trigger_plan(self, trigger: Trigger, phase: Phase) -> Iterable[str]: belief_name = self._slugify(trigger.condition) - lines.append(f"+{belief_name}") - lines.append(f"{' ' * 2}:{' ' * 3}phase({phase.id})") + yield f"+{belief_name}" + yield f"{' ' * 2}:{' ' * 3}phase({phase.id})" extra_goals_to_generate = [] steps = trigger.plan.steps if len(steps) == 0: - lines.append(f"{' ' * 2}<-{' ' * 2}true.") - return lines + yield f"{' ' * 2}<-{' ' * 2}true." + return first_step = steps[0] - lines.append( + yield ( f"{' ' * 2}<-{' ' * 2}{self._slugify(first_step, include_prefix=True)}" f"{'.' if len(steps) == 1 else ';'}" ) @@ -358,41 +339,38 @@ class AgentSpeakGenerator: extra_goals_to_generate.append(first_step) for step in steps[1:-1]: - lines.append(f"{' ' * 6}{self._slugify(step, include_prefix=True)};") + yield f"{' ' * 6}{self._slugify(step, include_prefix=True)};" if isinstance(step, Goal): extra_goals_to_generate.append(step) if len(steps) > 1: last_step = steps[-1] - lines.append(f"{' ' * 6}{self._slugify(last_step, include_prefix=True)}.") + yield f"{' ' * 6}{self._slugify(last_step, include_prefix=True)}." if isinstance(last_step, Goal): extra_goals_to_generate.append(last_step) - lines.append("") + yield "" extra_previous_goal: Goal | None = None for extra_goal in extra_goals_to_generate: - lines += self._generate_trigger_plan_recursive(extra_goal, phase, extra_previous_goal) + yield from self._generate_trigger_plan_recursive(extra_goal, phase, extra_previous_goal) extra_previous_goal = extra_goal - return lines - def _generate_trigger_plan_recursive( self, goal: Goal, phase: Phase, previous_goal: Goal | None = None - ) -> list[str]: - lines = [] - lines.append(f"+{self._slugify(goal, include_prefix=True)}") + ) -> Iterable[str]: + yield f"+{self._slugify(goal, include_prefix=True)}" extra_goals_to_generate = [] steps = goal.plan.steps if len(steps) == 0: - lines.append(f"{' ' * 2}<-{' ' * 2}true.") - return lines + yield f"{' ' * 2}<-{' ' * 2}true." + return first_step = steps[0] - lines.append( + yield ( f"{' ' * 2}<-{' ' * 2}{self._slugify(first_step, include_prefix=True)}" f"{'.' if len(steps) == 1 and goal.can_fail else ';'}" ) @@ -400,13 +378,13 @@ class AgentSpeakGenerator: extra_goals_to_generate.append(first_step) for step in steps[1:-1]: - lines.append(f"{' ' * 6}{self._slugify(step, include_prefix=True)};") + yield f"{' ' * 6}{self._slugify(step, include_prefix=True)};" if isinstance(step, Goal): extra_goals_to_generate.append(step) if len(steps) > 1: last_step = steps[-1] - lines.append( + yield ( f"{' ' * 6}{self._slugify(last_step, include_prefix=True)}" f"{'.' if goal.can_fail else ';'}" ) @@ -414,17 +392,15 @@ class AgentSpeakGenerator: extra_goals_to_generate.append(last_step) if not goal.can_fail: - lines.append(f"{' ' * 6}+achieved_{self._slugify(goal)}.") + yield f"{' ' * 6}+achieved_{self._slugify(goal)}." - lines.append("") + yield "" extra_previous_goal: Goal | None = None for extra_goal in extra_goals_to_generate: - lines += self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal) + yield from self._generate_goal_plan_recursive(extra_goal, phase, extra_previous_goal) extra_previous_goal = extra_goal - return lines - def _slugify(self, element: ProgramElement, include_prefix: bool = False) -> str: def base_slugify_call(text: str): return slugify(text, separator="_", stopwords=["a", "the"])