95 lines
3.4 KiB
Python
95 lines
3.4 KiB
Python
import zmq
|
|
from pydantic import ValidationError
|
|
from zmq.asyncio import Context
|
|
|
|
from control_backend.agents import BaseAgent
|
|
from control_backend.core.agent_system import InternalMessage
|
|
from control_backend.core.config import settings
|
|
from control_backend.schemas.belief_message import Belief, BeliefMessage
|
|
from control_backend.schemas.program import Program
|
|
|
|
|
|
class BDIProgramManager(BaseAgent):
    """
    BDI Program Manager Agent.

    This agent is responsible for receiving high-level programs (sequences of instructions/goals)
    from the external HTTP API (via ZMQ) and translating them into core beliefs (norms and goals)
    for the BDI Core Agent. In the future, it will be responsible for determining when goals are
    met, and passing on new norms and goals accordingly.

    :ivar sub_socket: The ZMQ SUB socket used to receive program updates. It is ``None``
        until :meth:`setup` has created and connected it.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Created in ``setup``; kept as ``None`` until then.
        self.sub_socket = None

    async def _send_to_bdi(self, program: Program) -> None:
        """
        Convert a received program into BDI beliefs and send them to the BDI Core Agent.

        Currently, it takes the **first phase** of the program and extracts:

        - **Norms**: Constraints or rules the agent must follow.
        - **Goals**: Objectives the agent must achieve.

        These are sent as a ``BeliefMessage`` with ``replace=True``, meaning they will
        overwrite any existing norms/goals of the same name in the BDI agent.

        A program without any phases carries nothing to forward; it is logged and
        skipped instead of raising ``IndexError``.

        :param program: The program object received from the API.
        """
        if not program.phases:
            # Indexing an empty phase list would raise and terminate the receive
            # loop that awaits this method, so bail out gracefully instead.
            self.logger.warning("Received a program without phases; nothing sent to the BDI agent.")
            return

        first_phase = program.phases[0]

        norms_belief = Belief(
            name="norms",
            arguments=[norm.norm for norm in first_phase.norms],
            replace=True,
        )
        goals_belief = Belief(
            name="goals",
            arguments=[goal.description for goal in first_phase.goals],
            replace=True,
        )
        program_beliefs = BeliefMessage(beliefs=[norms_belief, goals_belief])

        message = InternalMessage(
            to=settings.agent_settings.bdi_core_name,
            sender=self.name,
            body=program_beliefs.model_dump_json(),
            thread="beliefs",
        )
        await self.send(message)
        self.logger.debug("Sent new norms and goals to the BDI agent.")

    async def _receive_programs(self) -> None:
        """
        Continuous loop that receives program updates from the HTTP endpoint.

        It listens to the ``program`` topic on the internal ZMQ SUB socket.
        When a program is received, it is validated and forwarded to BDI via :meth:`_send_to_bdi`.
        Messages whose body fails validation are logged and skipped; the loop keeps running.
        """
        while True:
            # Each message is expected to consist of two frames: topic and JSON body.
            # The topic frame is not inspected here.
            _topic, body = await self.sub_socket.recv_multipart()

            try:
                program = Program.model_validate_json(body)
            except ValidationError:
                self.logger.exception("Received an invalid program.")
                continue

            await self._send_to_bdi(program)

    async def setup(self) -> None:
        """
        Initialize the agent.

        Connects the internal ZMQ SUB socket and subscribes to the 'program' topic.
        Starts the background behavior to receive programs.
        """
        context = Context.instance()

        self.sub_socket = context.socket(zmq.SUB)
        self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
        self.sub_socket.subscribe("program")

        self.add_behavior(self._receive_programs())
|