feat: pydantic models and inter-process messaging
Moved `InternalMessage` into schemas and created a `BeliefMessage` model. Also added the ability for agents to communicate via ZMQ to agents on another process. ref: N25B-316
This commit is contained in:
@@ -2,24 +2,17 @@ import asyncio
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Coroutine
|
||||
from dataclasses import dataclass
|
||||
|
||||
import zmq
|
||||
import zmq.asyncio as azmq
|
||||
|
||||
from control_backend.core.config import settings
|
||||
from control_backend.schemas.internal_message import InternalMessage
|
||||
|
||||
# Central directory to resolve agent names to instances
|
||||
_agent_directory: dict[str, "BaseAgent"] = {}
|
||||
|
||||
|
||||
@dataclass
|
||||
class InternalMessage:
|
||||
"""
|
||||
Represents a message to an agent.
|
||||
"""
|
||||
|
||||
to: str
|
||||
sender: str
|
||||
body: str
|
||||
thread: str | None = None
|
||||
|
||||
|
||||
class AgentDirectory:
|
||||
"""
|
||||
Helper class to keep track of which agents are registered.
|
||||
@@ -67,10 +60,23 @@ class BaseAgent(ABC):
|
||||
"""Starts the agent and its loops."""
|
||||
self.logger.info(f"Starting agent {self.name}")
|
||||
self._running = True
|
||||
|
||||
context = azmq.Context.instance()
|
||||
|
||||
# Setup the internal publishing socket
|
||||
self._internal_pub_socket = context.socket(zmq.PUB)
|
||||
self._internal_pub_socket.connect(settings.zmq_settings.internal_pub_address)
|
||||
|
||||
# Setup the internal receiving socket
|
||||
self._internal_sub_socket = context.socket(zmq.SUB)
|
||||
self._internal_sub_socket.connect(settings.zmq_settings.internal_sub_address)
|
||||
self._internal_sub_socket.subscribe(f"internal/{self.name}")
|
||||
|
||||
await self.setup()
|
||||
|
||||
# Start processing inbox
|
||||
# Start processing inbox and ZMQ messages
|
||||
await self.add_behavior(self._process_inbox())
|
||||
await self.add_behavior(self._receive_internal_zmq_loop())
|
||||
|
||||
async def stop(self):
|
||||
"""Stops the agent."""
|
||||
@@ -86,15 +92,38 @@ class BaseAgent(ABC):
|
||||
target = AgentDirectory.get(message.to)
|
||||
if target:
|
||||
await target.inbox.put(message)
|
||||
self.logger.debug(f"Sent message {message.body} to {message.to} via regular inbox.")
|
||||
else:
|
||||
self.logger.warning(f"Attempted to send message to unknown agent: {message.to}")
|
||||
# Apparently target agent is on a different process, send via ZMQ
|
||||
topic = f"internal/{message.to}".encode()
|
||||
body = message.model_dump_json().encode()
|
||||
await self._internal_pub_socket.send_multipart([topic, body])
|
||||
self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.")
|
||||
|
||||
async def _process_inbox(self):
|
||||
"""Default loop: equivalent to a CyclicBehaviour receiving messages."""
|
||||
while self._running:
|
||||
msg = await self.inbox.get()
|
||||
self.logger.debug(f"Received message from {msg.sender}.")
|
||||
await self.handle_message(msg)
|
||||
|
||||
async def _receive_internal_zmq_loop(self):
|
||||
"""
|
||||
Listens for internal messages sent from agents on another process via ZMQ
|
||||
and puts them into the normal inbox.
|
||||
"""
|
||||
while self._running:
|
||||
try:
|
||||
_, body = await self._internal_sub_socket.recv_multipart()
|
||||
|
||||
msg = InternalMessage.model_validate_json(body)
|
||||
|
||||
await self.inbox.put(msg)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception:
|
||||
self.logger.exception("Could not process ZMQ message.")
|
||||
|
||||
async def handle_message(self, msg: InternalMessage):
|
||||
"""Override this to handle incoming messages."""
|
||||
raise NotImplementedError
|
||||
|
||||
Reference in New Issue
Block a user