68 lines
2.2 KiB
Python
68 lines
2.2 KiB
Python
import zmq
|
|
from pydantic import ValidationError
|
|
from zmq.asyncio import Context
|
|
|
|
from control_backend.agents import BaseAgent
|
|
from control_backend.core.agent_system import InternalMessage
|
|
from control_backend.core.config import settings
|
|
from control_backend.schemas.belief_message import Belief, BeliefMessage
|
|
from control_backend.schemas.program import Program
|
|
|
|
|
|
class BDIProgramManager(BaseAgent):
|
|
"""
|
|
Will interpret programs received from the HTTP endpoint. Extracts norms, goals, triggers and
|
|
forwards them to the BDI as beliefs.
|
|
"""
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.sub_socket = None
|
|
|
|
async def _send_to_bdi(self, program: Program):
|
|
first_phase = program.phases[0]
|
|
norms_belief = Belief(
|
|
name="norms",
|
|
arguments=[norm.norm for norm in first_phase.norms],
|
|
replace=True,
|
|
)
|
|
goals_belief = Belief(
|
|
name="goals",
|
|
arguments=[goal.description for goal in first_phase.goals],
|
|
replace=True,
|
|
)
|
|
program_beliefs = BeliefMessage(beliefs=[norms_belief, goals_belief])
|
|
|
|
message = InternalMessage(
|
|
to=settings.agent_settings.bdi_core_name,
|
|
sender=self.name,
|
|
body=program_beliefs.model_dump_json(),
|
|
thread="beliefs",
|
|
)
|
|
await self.send(message)
|
|
self.logger.debug("Sent new norms and goals to the BDI agent.")
|
|
|
|
async def _receive_programs(self):
|
|
"""
|
|
Continuously receive programs from the HTTP endpoint, sent to us over ZMQ.
|
|
"""
|
|
while True:
|
|
topic, body = await self.sub_socket.recv_multipart()
|
|
|
|
try:
|
|
program = Program.model_validate_json(body)
|
|
except ValidationError:
|
|
self.logger.exception("Received an invalid program.")
|
|
continue
|
|
|
|
await self._send_to_bdi(program)
|
|
|
|
async def setup(self):
|
|
context = Context.instance()
|
|
|
|
self.sub_socket = context.socket(zmq.SUB)
|
|
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
|
|
self.sub_socket.subscribe("program")
|
|
|
|
self.add_behavior(self._receive_programs())
|