feat: added endpoint userinterrupt to userinterrupt
ref: N25B-400
This commit is contained in:
45
src/control_backend/agents/bdi/agentspeak.asl
Normal file
45
src/control_backend/agents/bdi/agentspeak.asl
Normal file
@@ -0,0 +1,45 @@
|
||||
phase("9922935f-ec70-4792-9a61-37a129e1ec14").
|
||||
keyword_said(Keyword) :- (user_said(Message) & .substring(Keyword, Message, Pos)) & (Pos >= 0).
|
||||
|
||||
|
||||
+!reply_with_goal(Goal)
|
||||
: user_said(Message)
|
||||
<- +responded_this_turn;
|
||||
.findall(Norm, norm(Norm), Norms);
|
||||
.reply_with_goal(Message, Norms, Goal).
|
||||
|
||||
+!say(Text)
|
||||
<- +responded_this_turn;
|
||||
.say(Text).
|
||||
|
||||
+!reply
|
||||
: user_said(Message)
|
||||
<- +responded_this_turn;
|
||||
.findall(Norm, norm(Norm), Norms);
|
||||
.reply(Message, Norms).
|
||||
|
||||
+user_said(Message)
|
||||
: phase("9922935f-ec70-4792-9a61-37a129e1ec14")
|
||||
<- .notify_user_said(Message);
|
||||
-responded_this_turn;
|
||||
!check_triggers;
|
||||
!transition_phase.
|
||||
|
||||
+!transition_phase
|
||||
: phase("9922935f-ec70-4792-9a61-37a129e1ec14") &
|
||||
not responded_this_turn
|
||||
<- -phase("9922935f-ec70-4792-9a61-37a129e1ec14");
|
||||
+phase("end");
|
||||
?user_said(Message);
|
||||
-+user_said(Message);
|
||||
.notify_transition_phase("9922935f-ec70-4792-9a61-37a129e1ec14", "end").
|
||||
|
||||
+user_said(Message)
|
||||
: phase("end")
|
||||
<- !reply.
|
||||
|
||||
+!check_triggers
|
||||
<- true.
|
||||
|
||||
+!transition_phase
|
||||
<- true.
|
||||
@@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
import zmq
|
||||
@@ -30,6 +31,26 @@ class UserInterruptAgent(BaseAgent):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.sub_socket = None
|
||||
self.pub_socket = None
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
Initialize the agent.
|
||||
|
||||
Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic.
|
||||
Starts the background behavior to receive the user interrupts.
|
||||
"""
|
||||
context = Context.instance()
|
||||
|
||||
self.sub_socket = context.socket(zmq.SUB)
|
||||
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
|
||||
self.sub_socket.subscribe("button_pressed")
|
||||
|
||||
self.pub_socket = context.socket(zmq.PUB)
|
||||
self.pub_socket.connect(settings.zmq_settings.internal_pub_address)
|
||||
|
||||
self.add_behavior(self._receive_button_event())
|
||||
self.add_behavior(self.test_sending_behaviour())
|
||||
|
||||
async def _receive_button_event(self):
|
||||
"""
|
||||
@@ -78,6 +99,33 @@ class UserInterruptAgent(BaseAgent):
|
||||
event_context,
|
||||
)
|
||||
|
||||
async def test_sending_behaviour(self):
|
||||
self.logger.info("Starting simple test sending behaviour...")
|
||||
|
||||
while True:
|
||||
try:
|
||||
test_data = {"type": "heartbeat", "status": "ok"}
|
||||
|
||||
await self._send_experiment_update(test_data)
|
||||
|
||||
except zmq.ZMQError as ze:
|
||||
self.logger.error(f"ZMQ error: {ze}")
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error: {e}")
|
||||
|
||||
await asyncio.sleep(2)
|
||||
|
||||
async def _send_experiment_update(self, data):
|
||||
"""
|
||||
Sends an update to the 'experiment' topic.
|
||||
The SSE endpoint will pick this up and push it to the UI.
|
||||
"""
|
||||
if self.pub_socket:
|
||||
topic = b"experiment"
|
||||
body = json.dumps(data).encode("utf-8")
|
||||
await self.pub_socket.send_multipart([topic, body])
|
||||
self.logger.debug(f"Sent experiment update: {data}")
|
||||
|
||||
async def _send_to_speech_agent(self, text_to_say: str):
|
||||
"""
|
||||
method to send prioritized speech command to RobotSpeechAgent.
|
||||
@@ -129,18 +177,3 @@ class UserInterruptAgent(BaseAgent):
|
||||
"Sent button_override belief with id '%s' to Program manager.",
|
||||
belief_id,
|
||||
)
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
Initialize the agent.
|
||||
|
||||
Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic.
|
||||
Starts the background behavior to receive the user interrupts.
|
||||
"""
|
||||
context = Context.instance()
|
||||
|
||||
self.sub_socket = context.socket(zmq.SUB)
|
||||
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
|
||||
self.sub_socket.subscribe("button_pressed")
|
||||
|
||||
self.add_behavior(self._receive_button_event())
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter, Request
|
||||
|
||||
from control_backend.schemas.events import ButtonPressedEvent
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post("/button_pressed", status_code=202)
|
||||
async def receive_button_event(event: ButtonPressedEvent, request: Request):
|
||||
"""
|
||||
Endpoint to handle external button press events.
|
||||
|
||||
Validates the event payload and publishes it to the internal 'button_pressed' topic.
|
||||
Subscribers (in this case user_interrupt_agent) will pick this up to trigger
|
||||
specific behaviors or state changes.
|
||||
|
||||
:param event: The parsed ButtonPressedEvent object.
|
||||
:param request: The FastAPI request object.
|
||||
"""
|
||||
logger.debug("Received button event: %s | %s", event.type, event.context)
|
||||
|
||||
topic = b"button_pressed"
|
||||
body = event.model_dump_json().encode()
|
||||
|
||||
pub_socket = request.app.state.endpoints_pub_socket
|
||||
await pub_socket.send_multipart([topic, body])
|
||||
|
||||
return {"status": "Event received"}
|
||||
67
src/control_backend/api/v1/endpoints/user_interact.py
Normal file
67
src/control_backend/api/v1/endpoints/user_interact.py
Normal file
@@ -0,0 +1,67 @@
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
import zmq
|
||||
import zmq.asyncio
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.responses import StreamingResponse
|
||||
from zmq.asyncio import Context
|
||||
|
||||
from control_backend.core.config import settings
|
||||
from control_backend.schemas.events import ButtonPressedEvent
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post("/button_pressed", status_code=202)
|
||||
async def receive_button_event(event: ButtonPressedEvent, request: Request):
|
||||
"""
|
||||
Endpoint to handle external button press events.
|
||||
|
||||
Validates the event payload and publishes it to the internal 'button_pressed' topic.
|
||||
Subscribers (in this case user_interrupt_agent) will pick this up to trigger
|
||||
specific behaviors or state changes.
|
||||
|
||||
:param event: The parsed ButtonPressedEvent object.
|
||||
:param request: The FastAPI request object.
|
||||
"""
|
||||
logger.debug("Received button event: %s | %s", event.type, event.context)
|
||||
|
||||
topic = b"button_pressed"
|
||||
body = event.model_dump_json().encode()
|
||||
|
||||
pub_socket = request.app.state.endpoints_pub_socket
|
||||
await pub_socket.send_multipart([topic, body])
|
||||
|
||||
return {"status": "Event received"}
|
||||
|
||||
|
||||
@router.get("/experiment_stream")
|
||||
async def experiment_stream(request: Request):
|
||||
# Use the asyncio-compatible context
|
||||
context = Context.instance()
|
||||
socket = context.socket(zmq.SUB)
|
||||
|
||||
# Connect and subscribe
|
||||
socket.connect(settings.zmq_settings.internal_sub_address)
|
||||
socket.subscribe(b"experiment")
|
||||
|
||||
async def gen():
|
||||
try:
|
||||
while True:
|
||||
# Check if client closed the tab
|
||||
if await request.is_disconnected():
|
||||
logger.info("Client disconnected from experiment stream.")
|
||||
break
|
||||
|
||||
try:
|
||||
parts = await asyncio.wait_for(socket.recv_multipart(), timeout=1.0)
|
||||
_, message = parts
|
||||
yield f"data: {message.decode().strip()}\n\n"
|
||||
except TimeoutError:
|
||||
continue
|
||||
finally:
|
||||
socket.close()
|
||||
|
||||
return StreamingResponse(gen(), media_type="text/event-stream")
|
||||
@@ -1,6 +1,6 @@
|
||||
from fastapi.routing import APIRouter
|
||||
|
||||
from control_backend.api.v1.endpoints import button_pressed, logs, message, program, robot, sse
|
||||
from control_backend.api.v1.endpoints import logs, message, program, robot, sse, user_interact
|
||||
|
||||
api_router = APIRouter()
|
||||
|
||||
@@ -14,4 +14,4 @@ api_router.include_router(logs.router, tags=["Logs"])
|
||||
|
||||
api_router.include_router(program.router, tags=["Program"])
|
||||
|
||||
api_router.include_router(button_pressed.router, tags=["Button Pressed Events"])
|
||||
api_router.include_router(user_interact.router, tags=["Button Pressed Events"])
|
||||
|
||||
Reference in New Issue
Block a user