Compare commits

..

120 Commits

Author SHA1 Message Date
Storm
ce99dc5ec3 Merge branch 'main' into feat/face-recognition 2026-01-30 17:23:01 +01:00
68f445c8bc Revert "build: remove test group usage"
This reverts commit 53ba8bb16b
2026-01-30 16:03:51 +00:00
53ba8bb16b build: remove test group usage 2026-01-30 15:57:47 +00:00
516fda4694 build: add cv2 dependencies to CI/CD 2026-01-30 15:50:28 +00:00
JobvAlewijk
12b905ff22 Merge branch 'main' of https://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-cb into feat/face-recognition 2026-01-30 16:26:30 +01:00
JobvAlewijk
11d7c1409e chore: remove pipeline stuff 2026-01-30 16:02:40 +01:00
JobvAlewijk
f89fb2266a feat: added face recognition and tests
ref: N25B-397
2026-01-30 15:59:27 +01:00
f9d477a6c8 Merge branch 'chore/copyright-all-files' into 'main'
chore: add copyright to all source file

See merge request ics/sp/2025/n25b/pepperplus-cb!51
2026-01-30 11:47:53 +00:00
Twirre Meulenbelt
0f964795f3 feat: subscribe instead of req/res for face detection
ref: N25B-395
2026-01-29 17:16:38 +01:00
JobvAlewijk
00a13426a0 Merge branch 'main' of https://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-cb into feat/face-recognition 2026-01-29 16:21:55 +01:00
JobvAlewijk
f6477e5325 chore: blabla 2026-01-29 16:20:51 +01:00
Pim Hutting
35d99e539a chore: add copyright to all source file 2026-01-29 15:36:28 +01:00
d9e5de6e51 Merge branch 'dev' into 'main'
merge dev into main

See merge request ics/sp/2025/n25b/pepperplus-cb!49
2026-01-28 10:49:14 +00:00
Pim Hutting
f2953fb1de Merge branch 'feat/use-experiment-logs' into 'dev'
Add useful experiment logs

See merge request ics/sp/2025/n25b/pepperplus-cb!48
2026-01-27 17:45:41 +00:00
Twirre Meulenbelt
4b6980a26e Merge remote-tracking branch 'origin/dev' into feat/use-experiment-logs
# Conflicts:
#	src/control_backend/agents/llm/llm_agent.py
2026-01-27 18:42:18 +01:00
Twirre
0413e0f710 Merge branch 'feat/longer-pauses-possible' into 'dev'
Stop LLM response when user adds something to their message

See merge request ics/sp/2025/n25b/pepperplus-cb!47
2026-01-27 17:34:28 +00:00
Twirre Meulenbelt
941aa00b7b chore: re-addd more silence before speech audio 2026-01-27 18:19:20 +01:00
Twirre
cc09c1b289 Merge branch 'feat/recursive-goals-override' into 'dev'
feat: add recursive goal mapping to UserInterruptAgent

See merge request ics/sp/2025/n25b/pepperplus-cb!46
2026-01-27 17:04:48 +00:00
Twirre Meulenbelt
82aa7c76df test: fix tests
ref: N25B-401
2026-01-27 17:06:13 +01:00
Pim Hutting
bc9045c977 chore: applied feedback 2026-01-27 17:03:36 +01:00
Twirre Meulenbelt
43d81002ec feat: add useful experiment logs
ref: N25B-401
2026-01-27 16:52:18 +01:00
Pim Hutting
1e7c2ba229 chore: added missing tests 2026-01-27 12:05:55 +01:00
Pim Hutting
2404c847ae feat: added recursive goal mapping and tests
ref: N25B-400
2026-01-27 12:05:27 +01:00
Björn Otgaar
7c8a56dfcc Merge branch 'chore/add-missing-tests' into 'dev'
chore: add missing tests

See merge request ics/sp/2025/n25b/pepperplus-cb!45
2026-01-27 10:58:11 +00:00
Pim Hutting
9b040ffc62 chore: applied feedback 2026-01-27 11:51:40 +01:00
Pim Hutting
27f91150e1 fix: look for goals in steps rather than plans
small bugfix, we used to look for goals in plans, but they are part of
a plan.

ref: N25B-400
2026-01-27 11:51:40 +01:00
Pim Hutting
215bafe27f chore: added missing tests 2026-01-27 11:50:26 +01:00
da0f48e96d Merge branch 'feat/reset-experiment-and-phase' into 'dev'
The Big One

See merge request ics/sp/2025/n25b/pepperplus-cb!43
2026-01-26 19:20:44 +00:00
Twirre Meulenbelt
7dd47c9de8 Merge remote-tracking branch 'origin/feat/reset-experiment-and-phase' into feat/reset-experiment-and-phase 2026-01-26 19:55:24 +01:00
Twirre Meulenbelt
4f927bc025 fix: make DOS from other agents impossible
There were some missing value checks. Other agents could cause errors in the User Interrupt agent or the Program Manager agent by sending malformed messages.

ref: N25B-453
2026-01-26 19:51:14 +01:00
bc1fa2ea35 Merge branch 'dev' into feat/reset-experiment-and-phase 2026-01-26 19:44:46 +01:00
650050fa0f chore: move magic numbers to env and cleanup 2026-01-26 19:28:16 +01:00
Twirre Meulenbelt
d8dc558d3e docs: update existing docstrings and add new docs
ref: N25B-453
2026-01-26 16:04:01 +01:00
Björn Otgaar
e7e305c4a3 Merge branch 'feat/experiment-logging' into 'dev'
Experiment log stream, to console, file and UI

See merge request ics/sp/2025/n25b/pepperplus-cb!44
2026-01-26 13:39:58 +00:00
b9df47b7d1 docs: add docstrings to AgentSpeak stuff
ref: N25B-449
2026-01-26 12:21:04 +01:00
Twirre Meulenbelt
a74ecc6c45 docs: add docstrings to dated file handler
ref: N25B-401
2026-01-22 11:48:02 +01:00
JobvAlewijk
09d8cca309 Merge branch 'feat/visual-emotion-recognition' of https://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-cb into feat/face-recognition 2026-01-21 17:57:23 +01:00
Storm
0b1c2ce20a feat: implemented pausing, implemented graceful stopping, removed old RI pausing code
ref: N25B-393
2026-01-20 18:53:24 +01:00
Storm
f9b807fc97 chore: quick push before demo; fixed image receiving from RI 2026-01-20 12:46:30 +01:00
Storm
424294b0a3 Merged feat/longer-pauses-possible into feat/visual-emotion-recognition 2026-01-19 18:35:07 +01:00
Pim Hutting
bc0947fac1 chore: added a dot 2026-01-19 18:26:15 +01:00
Storm
cd80cdf93b Merge branch 'feat/longer-pauses-possible' into feat/visual-emotion-recognition 2026-01-19 18:24:31 +01:00
230afef16f test: fix tests
ref: N25B-452
2026-01-19 16:06:17 +01:00
1cd5b46f97 fix: should work now
Also added trimming to Windows transcription.

ref: N25B-452
2026-01-19 15:03:59 +01:00
c0789e82a9 feat: add previously interrupted message to current
ref: N25B-452
2026-01-19 14:47:11 +01:00
04d19cee5c feat: (maybe) stop response when new user message
If we get a new message before the LLM is done responding, interrupt it.

ref: N25B-452
2026-01-19 14:08:26 +01:00
Storm
985327de70 docs: updated docstrings and fixed styling
ref: N25B-393
2026-01-19 12:52:00 +01:00
Twirre Meulenbelt
58881b5914 test: add test cases
ref: N25B-401
2026-01-19 12:47:59 +01:00
Storm
302c50934e feat: implemented emotion recognition functionality in AgentSpeak
ref: N25B-393
2026-01-19 12:10:58 +01:00
Storm
f9c69cafb3 Merge branch 'feat/reset-experiment-and-phase' into feat/visual-emotion-recognition 2026-01-19 11:45:31 +01:00
JobvAlewijk
37da9992ba feat: added face detection and communication with
RI
ref: N25B-397
2026-01-17 14:02:34 +01:00
JobvAlewijk
f41201dd8e chore: revert 2026-01-16 16:44:49 +01:00
JobvAlewijk
2033e02116 Merge branch 'dev' of https://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-cb into feat/face-recognition 2026-01-16 16:38:59 +01:00
Twirre Meulenbelt
ba79d09c5d feat: log download endpoints
ref: N25B-401
2026-01-16 16:32:51 +01:00
db64eaeb0b fix: failing tests and warnings
ref: N25B-449
2026-01-16 16:18:36 +01:00
7f7e0c542e docs: add missing docs
ref: N25B-115
2026-01-16 15:35:41 +01:00
Storm
1b0b72d63a chore: fixed broken uv.lock file 2026-01-16 15:10:55 +01:00
41bd3ffc50 Merge branch 'test/increase-coverage' into feat/reset-experiment-and-phase 2026-01-16 15:08:34 +01:00
8506c0d9ef chore: remove belief collector and small tweaks 2026-01-16 15:07:44 +01:00
Storm
0941b26703 refactor: updated how changes are passed to bdi_core_agent after merge
ref: N25B-393
2026-01-16 15:05:13 +01:00
Storm
48ae0c7a12 Merge remote-tracking branch 'origin/feat/reset-experiment-and-phase' into feat/visual-emotion-recognition 2026-01-16 14:45:16 +01:00
Storm
a09d8b3d9a chore: small changes 2026-01-16 14:40:59 +01:00
Pim Hutting
7c10c50336 chore: removed resetExperiment from backened
now it happens in UI

ref: N25B-400
2026-01-16 14:29:46 +01:00
Pim Hutting
6d03ba8a41 feat: added extra endpoint for norm pings
also made sure that you cannot skip phase on end phase

ref: N25B-400
2026-01-16 14:28:27 +01:00
Storm
ac20048f02 Merge branch 'dev' into feat/visual-emotion-recognition 2026-01-16 14:16:28 +01:00
Storm
05804c158d feat: fully implemented visual emotion recognition agent in pipeline
ref: N25B-393
2026-01-16 13:26:53 +01:00
b1c18abffd test: bunch of tests
Written with AI, still need to check them

ref: N25B-449
2026-01-16 13:11:41 +01:00
Storm
0771b0d607 feat: implemented visual emotion recogntion agent
ref: N25B-393
2026-01-16 09:50:59 +01:00
Twirre Meulenbelt
4cda4e5e70 feat: experiment log stream, to file and UI
Adds a few new logging utility classes. One to save to files with a date, one to support optional fields in formats, last to filter partial log messages.

ref: N25B-401
2026-01-15 17:07:49 +01:00
Luijkx,S.O.H. (Storm)
a9df9208bc Merge branch 'feat/multiple-receivers' into 'dev'
feat: able to send to multiple receivers

See merge request ics/sp/2025/n25b/pepperplus-cb!42
2026-01-15 09:26:12 +00:00
Pim Hutting
041fc4ab6e chore: cond_norms unachieve and via belief msg 2026-01-15 09:02:52 +01:00
39e1bb1ead fix: sync issues
ref: N25B-447
2026-01-14 15:28:29 +01:00
8f6662e64a feat: phase transitions
ref: N25B-446
2026-01-14 13:22:51 +01:00
0794c549a8 chore: remove agentspeak file from tracking 2026-01-14 11:27:29 +01:00
ff24ab7a27 fix: default behavior and end phase
ref: N25B-448
2026-01-14 11:24:19 +01:00
43ac8ad69f chore: delete outdated files
ref: N25B-446
2026-01-14 10:58:41 +01:00
Twirre Meulenbelt
d7d697b293 docs: update to docstring
ref: N25B-441
2026-01-13 17:09:26 +01:00
Twirre Meulenbelt
9a55067a13 fix: set sender for internal messages
ref: N25B-441
2026-01-13 17:07:17 +01:00
Twirre Meulenbelt
f7669c021b feat: support force completed goals in semantic belief agent
ref: N25B-427
2026-01-13 17:04:44 +01:00
Björn Otgaar
8f52f8bf0c Merge branch 'feat/monitoringpage-cb' of git.science.uu.nl:ics/sp/2025/n25b/pepperplus-cb into feat/monitoringpage-cb 2026-01-13 14:03:40 +01:00
Björn Otgaar
2a94a45b34 chore: adjust 'phase_id' to 'id' for correct payload 2026-01-13 14:03:37 +01:00
Storm
1c88ae6078 feat: visual emotion recognition agent
ref: N25B-393
2026-01-13 12:41:18 +01:00
f87651f691 fix: achieved goal in bdi core
ref: N25B-400
2026-01-13 12:26:18 +01:00
Pim Hutting
65e0b2d250 feat: added correct message
ref: N25B-400
2026-01-13 12:05:38 +01:00
177e844349 feat: send achieved goal from interrupt->manager->semantic
ref: N25B-400
2026-01-13 11:46:17 +01:00
Pim Hutting
0df6040444 feat: added sending goal overwrites in Userinter.
ref: N25B-400
2026-01-13 11:26:03 +01:00
Twirre Meulenbelt
af81bd8620 Merge branch 'feat/multiple-receivers' into feat/monitoringpage-cb
# Conflicts:
#	src/control_backend/core/agent_system.py
#	src/control_backend/schemas/internal_message.py
2026-01-13 11:14:18 +01:00
Twirre Meulenbelt
70e05b6c92 test: sending to multiple agents, including remote
ref: N25B-441
2026-01-13 11:10:35 +01:00
c0b8fb8612 feat: able to send to multiple receivers
ref: N25B-441
2026-01-13 11:06:42 +01:00
Pim Hutting
d499111ea4 feat: added pause functionality
Storms code wasnt fully included in Bjorns branch

ref: N25B-400
2026-01-13 00:52:04 +01:00
Pim Hutting
72c2c57f26 chore: merged button functionality and fix bug
merged björns branch that has the following button functionality
-Pause/resume
-Next phase
-Restart phase
-reset experiment
fix bug where norms where not properly sent to the user interrupt agent

ref: N25B-400
2026-01-12 19:31:50 +01:00
Pim Hutting
4a014b577a Merge remote-tracking branch 'origin/feat/reset-skip-buttons' into feat/monitoringpage-cb 2026-01-12 19:19:31 +01:00
Pim Hutting
c45a258b22 fix: fixed a bug where norms where not updated
Now in UserInterruptAgent we store the norm.norm and not the slugified norm

ref: N25B-400
2026-01-12 19:07:05 +01:00
0f09276477 fix: send norms back to UI
ref: N25B-400
2026-01-12 17:02:39 +01:00
4e113c2d5c fix: default plan and norm force
ref: N25B-400
2026-01-12 16:20:24 +01:00
Pim Hutting
54c835cc0f feat: added force_norm handling in BDI core agent
ref: N25B-400
2026-01-12 15:37:04 +01:00
Pim Hutting
c4ccbcd354 Merge remote-tracking branch 'origin/feat/extra-agentspeak-functionality' into feat/monitoringpage-cb 2026-01-12 15:24:48 +01:00
JobvAlewijk
6b790de53a Merge branch 'dev' of https://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-cb into feat/face-recognition 2026-01-12 14:23:44 +01:00
JobvAlewijk
1932ac959b feat: connected with RI properly
ref: N25B-397
2026-01-12 14:23:00 +01:00
Pim Hutting
d202abcd1b fix: phases update correctly
there was a bug where phases would not update without restarting cb

ref: N25B-400
2026-01-12 12:51:24 +01:00
Björn Otgaar
c91b999104 chore: fix bugs and make sure connected robots work 2026-01-08 15:31:44 +01:00
Pim Hutting
5e2126fc21 chore: code cleanup
ref: N25B-400
2026-01-08 15:05:43 +01:00
Pim Hutting
500bbc2d82 feat: added goal start sending functionality
ref: N25B-400
2026-01-08 14:52:55 +01:00
JobvAlewijk
bb0c1bd383 feat: cb can communicate face with ri
ref: N25B-397
2026-01-08 13:02:32 +01:00
Björn Otgaar
1360567820 chore: indenting 2026-01-08 13:01:38 +01:00
Björn Otgaar
cc0d5af28c chore: fixing bugs 2026-01-08 12:56:22 +01:00
Pim Hutting
3a8d1730a1 fix: made mapping for conditional norms only
ref: N25B-400
2026-01-08 12:29:16 +01:00
Pim Hutting
b27e5180c4 feat: small implementation change
ref: N25B-400
2026-01-08 11:25:53 +01:00
Pim Hutting
6b34f4b82c fix: small bugfix
ref: N25B-400
2026-01-08 10:59:24 +01:00
Pim Hutting
4bf2be6359 feat: added a functionality for monitoring page
ref: N25B-400
2026-01-08 09:56:10 +01:00
Pim Hutting
20e5e46639 Merge remote-tracking branch 'origin/feat/extra-agentspeak-functionality' into feat/monitoringpage-cb 2026-01-07 22:42:40 +01:00
Pim Hutting
365d449666 feat: commit before I can merge new changes
ref: N25B-400
2026-01-07 22:41:59 +01:00
Björn Otgaar
be88323cf7 chore: add one endpoint fo avoid errors 2026-01-07 18:24:35 +01:00
Pim Hutting
be6bbbb849 feat: added endpoint userinterrupt to userinterrupt
ref: N25B-400
2026-01-07 17:42:54 +01:00
Storm
76dfcb23ef feat: added pause functionality
ref: N25B-350
2026-01-07 16:03:49 +01:00
Björn Otgaar
34afca6652 chore: automatically send the experiment controls to the bdi core in the user interupt agent. 2026-01-07 15:07:33 +01:00
Björn Otgaar
324a63e5cc chore: add styles to user_interrupt_agent 2026-01-07 14:45:42 +01:00
JobvAlewijk
03954bef54 feat: face recognition agent
ref: N25B-397
2026-01-04 19:47:18 +01:00
8c36dd0805 Merge branch 'chore/merge-request-template' into 'main'
Add default merge request template

See merge request ics/sp/2025/n25b/pepperplus-cb!35
2025-12-03 15:43:47 +00:00
Twirre Meulenbelt
f881d4d99b chore: add default merge request template 2025-12-03 16:01:52 +01:00
105 changed files with 6938 additions and 1399 deletions

3
.gitignore vendored
View File

@@ -222,6 +222,9 @@ __marimo__/
docs/*
!docs/conf.py
# Generated files
*.asl
experiment-*.log

View File

@@ -22,4 +22,5 @@ test:
tags:
- test
script:
- apt-get update && apt-get install ffmpeg libsm6 libxext6 -y
- uv run --only-group test pytest test

View File

@@ -1,36 +1,57 @@
version: 1
custom_levels:
OBSERVATION: 25
ACTION: 26
OBSERVATION: 24
ACTION: 25
CHAT: 26
LLM: 9
formatters:
# Console output
colored:
(): "colorlog.ColoredFormatter"
class: colorlog.ColoredFormatter
format: "{log_color}{asctime}.{msecs:03.0f} | {levelname:11} | {name:70} | {message}"
style: "{"
datefmt: "%H:%M:%S"
# User-facing UI (structured JSON)
json_experiment:
(): "pythonjsonlogger.jsonlogger.JsonFormatter"
json:
class: pythonjsonlogger.jsonlogger.JsonFormatter
format: "{name} {levelname} {levelno} {message} {created} {relativeCreated}"
style: "{"
# Experiment stream for console and file output, with optional `role` field
experiment:
class: control_backend.logging.OptionalFieldFormatter
format: "%(asctime)s %(levelname)s %(role?)s %(message)s"
defaults:
role: "-"
filters:
# Filter out any log records that have the extra field "partial" set to True, indicating that they
# will be replaced later.
partial:
(): control_backend.logging.PartialFilter
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: colored
filters: [partial]
stream: ext://sys.stdout
ui:
class: zmq.log.handlers.PUBHandler
level: LLM
formatter: json_experiment
formatter: json
file:
class: control_backend.logging.DatedFileHandler
formatter: experiment
filters: [partial]
# Directory must match config.logging_settings.experiment_log_directory
file_prefix: experiment_logs/experiment
# Level of external libraries
# Level for external libraries
root:
level: WARN
handlers: [console]
@@ -39,3 +60,6 @@ loggers:
control_backend:
level: LLM
handlers: [ui]
experiment: # This name must match config.logging_settings.experiment_logger_name
level: DEBUG
handlers: [ui, file]

View File

@@ -7,6 +7,7 @@ requires-python = ">=3.13"
dependencies = [
"agentspeak>=0.2.2",
"colorlog>=6.10.1",
"deepface>=0.0.96",
"fastapi[all]>=0.115.6",
"mlx-whisper>=0.4.3 ; sys_platform == 'darwin'",
"numpy>=2.3.3",
@@ -21,6 +22,7 @@ dependencies = [
"silero-vad>=6.0.0",
"sphinx>=7.3.7",
"sphinx-rtd-theme>=3.0.2",
"tf-keras>=2.20.1",
"torch>=2.8.0",
"uvicorn>=0.37.0",
]
@@ -48,6 +50,7 @@ test = [
"pytest-asyncio>=1.2.0",
"pytest-cov>=7.0.0",
"pytest-mock>=3.15.1",
"python-slugify>=8.0.4",
"pyyaml>=6.0.3",
"pyzmq>=27.1.0",
"soundfile>=0.13.1",

View File

@@ -0,0 +1,5 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
-------------------------------------------------------------------------------
This package contains all agent implementations for the PepperPlus Control Backend.
"""
from .base import BaseAgent as BaseAgent

View File

@@ -1,2 +1,10 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
--------------------------------------------------------------------------------
Agents responsible for controlling the robot's physical actions, such as speech and gestures.
"""
from .robot_gesture_agent import RobotGestureAgent as RobotGestureAgent
from .robot_speech_agent import RobotSpeechAgent as RobotSpeechAgent

View File

@@ -1,4 +1,11 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import json
import logging
import zmq
import zmq.asyncio as azmq
@@ -8,6 +15,8 @@ from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class RobotGestureAgent(BaseAgent):
"""
@@ -83,6 +92,8 @@ class RobotGestureAgent(BaseAgent):
self.subsocket.close()
if self.pubsocket:
self.pubsocket.close()
if self.repsocket:
self.repsocket.close()
await super().stop()
async def handle_message(self, msg: InternalMessage):
@@ -109,6 +120,7 @@ class RobotGestureAgent(BaseAgent):
gesture_command.data,
)
return
experiment_logger.action("Gesture: %s", gesture_command.data)
await self.pubsocket.send_json(gesture_command.model_dump())
except Exception:
self.logger.exception("Error processing internal message.")

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import json
import zmq

View File

@@ -1,9 +1,16 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
from abc import ABC
from control_backend.core.agent_system import BaseAgent as CoreBaseAgent
class BaseAgent(CoreBaseAgent):
class BaseAgent(CoreBaseAgent, ABC):
"""
The primary base class for all implementation agents.

View File

@@ -1,8 +1,14 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
--------------------------------------------------------------------------------
Agents and utilities for the BDI (Belief-Desire-Intention) reasoning system,
implementing AgentSpeak(L) logic.
"""
from control_backend.agents.bdi.bdi_core_agent import BDICoreAgent as BDICoreAgent
from .belief_collector_agent import (
BDIBeliefCollectorAgent as BDIBeliefCollectorAgent,
)
from .text_belief_extractor_agent import (
TextBeliefExtractorAgent as TextBeliefExtractorAgent,
)

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import annotations
from abc import ABC, abstractmethod
@@ -8,31 +14,78 @@ from enum import StrEnum
class AstNode(ABC):
"""
Abstract base class for all elements of an AgentSpeak program.
This class serves as the foundation for all AgentSpeak abstract syntax tree (AST) nodes.
It defines the core interface that all AST nodes must implement to generate AgentSpeak code.
"""
@abstractmethod
def _to_agentspeak(self) -> str:
"""
Generates the AgentSpeak code string.
This method converts the AST node into its corresponding
AgentSpeak source code representation.
:return: The AgentSpeak code string representation of this node.
"""
pass
def __str__(self) -> str:
"""
Returns the string representation of this AST node.
This method provides a convenient way to get the AgentSpeak code representation
by delegating to the _to_agentspeak method.
:return: The AgentSpeak code string representation of this node.
"""
return self._to_agentspeak()
class AstExpression(AstNode, ABC):
"""
Intermediate class for anything that can be used in a logical expression.
This class extends AstNode to provide common functionality for all expressions
that can be used in logical operations within AgentSpeak programs.
"""
def __and__(self, other: ExprCoalescible) -> AstBinaryOp:
"""
Creates a logical AND operation between this expression and another.
This method allows for operator overloading of the & operator to create
binary logical operations in a more intuitive syntax.
:param other: The right-hand side expression to combine with this one.
:return: A new AstBinaryOp representing the logical AND operation.
"""
return AstBinaryOp(self, BinaryOperatorType.AND, _coalesce_expr(other))
def __or__(self, other: ExprCoalescible) -> AstBinaryOp:
"""
Creates a logical OR operation between this expression and another.
This method allows for operator overloading of the | operator to create
binary logical operations in a more intuitive syntax.
:param other: The right-hand side expression to combine with this one.
:return: A new AstBinaryOp representing the logical OR operation.
"""
return AstBinaryOp(self, BinaryOperatorType.OR, _coalesce_expr(other))
def __invert__(self) -> AstLogicalExpression:
"""
Creates a logical negation of this expression.
This method allows for operator overloading of the ~ operator to create
negated expressions. If the expression is already a logical expression,
it toggles the negation flag. Otherwise, it wraps the expression in a
new AstLogicalExpression with negation set to True.
:return: An AstLogicalExpression representing the negated form of this expression.
"""
if isinstance(self, AstLogicalExpression):
self.negated = not self.negated
return self
@@ -77,52 +130,128 @@ class AstTerm(AstExpression, ABC):
return AstBinaryOp(self, BinaryOperatorType.NOT_EQUALS, _coalesce_expr(other))
@dataclass
@dataclass(eq=False)
class AstAtom(AstTerm):
"""
Grounded expression in all lowercase.
Represents a grounded atom in AgentSpeak (e.g., lowercase constants).
Atoms are the simplest form of terms in AgentSpeak, representing concrete,
unchanging values. They are typically used as constants in logical expressions.
:ivar value: The string value of this atom, which will be converted to lowercase
in the AgentSpeak representation.
"""
value: str
def _to_agentspeak(self) -> str:
"""
Converts this atom to its AgentSpeak string representation.
Atoms are represented in lowercase in AgentSpeak to distinguish them
from variables (which are capitalized).
:return: The lowercase string representation of this atom.
"""
return self.value.lower()
@dataclass
@dataclass(eq=False)
class AstVar(AstTerm):
"""
Ungrounded variable expression. First letter capitalized.
Represents an ungrounded variable in AgentSpeak (e.g., capitalized names).
Variables in AgentSpeak are placeholders that can be bound to specific values
during execution. They are distinguished from atoms by their capitalization.
:ivar name: The name of this variable, which will be capitalized in the
AgentSpeak representation.
"""
name: str
def _to_agentspeak(self) -> str:
"""
Converts this variable to its AgentSpeak string representation.
Variables are represented with capitalized names in AgentSpeak to distinguish
them from atoms (which are lowercase).
:return: The capitalized string representation of this variable.
"""
return self.name.capitalize()
@dataclass
@dataclass(eq=False)
class AstNumber(AstTerm):
"""
Represents a numeric constant in AgentSpeak.
Numeric constants can be either integers or floating-point numbers and are
used in logical expressions and comparisons.
:ivar value: The numeric value of this constant (can be int or float).
"""
value: int | float
def _to_agentspeak(self) -> str:
"""
Converts this numeric constant to its AgentSpeak string representation.
:return: The string representation of the numeric value.
"""
return str(self.value)
@dataclass
@dataclass(eq=False)
class AstString(AstTerm):
"""
Represents a string literal in AgentSpeak.
String literals are used to represent textual data and are enclosed in
double quotes in the AgentSpeak representation.
:ivar value: The string content of this literal.
"""
value: str
def _to_agentspeak(self) -> str:
"""
Converts this string literal to its AgentSpeak string representation.
String literals are enclosed in double quotes in AgentSpeak.
:return: The string literal enclosed in double quotes.
"""
return f'"{self.value}"'
@dataclass
@dataclass(eq=False)
class AstLiteral(AstTerm):
"""
Represents a literal (functor and terms) in AgentSpeak.
Literals are the fundamental building blocks of AgentSpeak programs, consisting
of a functor (predicate name) and an optional list of terms (arguments).
:ivar functor: The name of the predicate or function.
:ivar terms: A list of terms (arguments) for this literal. Defaults to an empty list.
"""
functor: str
terms: list[AstTerm] = field(default_factory=list)
def _to_agentspeak(self) -> str:
"""
Converts this literal to its AgentSpeak string representation.
If the literal has no terms, it returns just the functor name.
Otherwise, it returns the functor followed by the terms in parentheses.
:return: The AgentSpeak string representation of this literal.
"""
if not self.terms:
return self.functor
args = ", ".join(map(str, self.terms))
@@ -130,6 +259,13 @@ class AstLiteral(AstTerm):
class BinaryOperatorType(StrEnum):
"""
Enumeration of binary operator types used in AgentSpeak expressions.
These operators are used to create binary operations between expressions,
including logical operations (AND, OR) and comparison operations.
"""
AND = "&"
OR = "|"
GREATER_THAN = ">"
@@ -142,15 +278,41 @@ class BinaryOperatorType(StrEnum):
@dataclass
class AstBinaryOp(AstExpression):
"""
Represents a binary logical or relational operation in AgentSpeak.
Binary operations combine two expressions using a logical or comparison operator.
They are used to create complex logical conditions in AgentSpeak programs.
:ivar left: The left-hand side expression of the operation.
:ivar operator: The binary operator type (AND, OR, comparison operators, etc.).
:ivar right: The right-hand side expression of the operation.
"""
left: AstExpression
operator: BinaryOperatorType
right: AstExpression
def __post_init__(self):
"""
Post-initialization processing to ensure proper expression types.
This method wraps the left and right expressions in AstLogicalExpression
instances if they aren't already, ensuring consistent handling throughout
the AST.
"""
self.left = _as_logical(self.left)
self.right = _as_logical(self.right)
def _to_agentspeak(self) -> str:
"""
Converts this binary operation to its AgentSpeak string representation.
The method handles proper parenthesization of sub-expressions to maintain
correct operator precedence and readability.
:return: The AgentSpeak string representation of this binary operation.
"""
l_str = str(self.left)
r_str = str(self.right)
@@ -167,10 +329,29 @@ class AstBinaryOp(AstExpression):
@dataclass
class AstLogicalExpression(AstExpression):
"""
Represents a logical expression, potentially negated, in AgentSpeak.
Logical expressions can be either positive or negated and form the basis
of conditions and beliefs in AgentSpeak programs.
:ivar expression: The underlying expression being evaluated.
:ivar negated: Boolean flag indicating whether this expression is negated.
"""
expression: AstExpression
negated: bool = False
def _to_agentspeak(self) -> str:
"""
Converts this logical expression to its AgentSpeak string representation.
If the expression is negated, it prepends 'not ' to the expression string.
For complex expressions (binary operations), it adds parentheses when negated
to maintain correct logical interpretation.
:return: The AgentSpeak string representation of this logical expression.
"""
expr_str = str(self.expression)
if isinstance(self.expression, AstBinaryOp) and self.negated:
expr_str = f"({expr_str})"
@@ -178,65 +359,169 @@ class AstLogicalExpression(AstExpression):
def _as_logical(expr: AstExpression) -> AstLogicalExpression:
"""
Converts an expression to a logical expression if it isn't already.
This helper function ensures that expressions are properly wrapped in
AstLogicalExpression instances, which is necessary for consistent handling
of logical operations in the AST.
:param expr: The expression to convert.
:return: The expression wrapped in an AstLogicalExpression if it wasn't already.
"""
if isinstance(expr, AstLogicalExpression):
return expr
return AstLogicalExpression(expr)
class StatementType(StrEnum):
"""
Enumeration of statement types that can appear in AgentSpeak plans.
These statement types define the different kinds of actions and operations
that can be performed within the body of an AgentSpeak plan.
"""
EMPTY = ""
"""Empty statement (no operation, used when evaluating a plan to true)."""
DO_ACTION = "."
"""Execute an action defined in Python."""
ACHIEVE_GOAL = "!"
"""Achieve a goal (add a goal to be accomplished)."""
TEST_GOAL = "?"
"""Test a goal (check if a goal can be achieved)."""
ADD_BELIEF = "+"
"""Add a belief to the belief base."""
REMOVE_BELIEF = "-"
"""Remove a belief from the belief base."""
REPLACE_BELIEF = "-+"
"""Replace a belief in the belief base."""
@dataclass
class AstStatement(AstNode):
"""
A statement that can appear inside a plan.
Statements are the executable units within AgentSpeak plans. They consist
of a statement type (defining the operation) and an expression (defining
what to operate on).
:ivar type: The type of statement (action, goal, belief operation, etc.).
:ivar expression: The expression that this statement operates on.
"""
type: StatementType
expression: AstExpression
def _to_agentspeak(self) -> str:
"""
Converts this statement to its AgentSpeak string representation.
The representation consists of the statement type prefix followed by
the expression.
:return: The AgentSpeak string representation of this statement.
"""
return f"{self.type.value}{self.expression}"
@dataclass
class AstRule(AstNode):
"""
Represents an inference rule in AgentSpeak. If there is no condition, it always holds.
Rules define logical implications in AgentSpeak programs. They consist of a
result (conclusion) and an optional condition (premise). When the condition
holds, the result is inferred to be true.
:ivar result: The conclusion or result of this rule.
:ivar condition: The premise or condition for this rule (optional).
"""
result: AstExpression
condition: AstExpression | None = None
def __post_init__(self):
"""
Post-initialization processing to ensure proper expression types.
If a condition is provided, this method wraps it in an AstLogicalExpression
to ensure consistent handling throughout the AST.
"""
if self.condition is not None:
self.condition = _as_logical(self.condition)
def _to_agentspeak(self) -> str:
"""
Converts this rule to its AgentSpeak string representation.
If no condition is specified, the rule is represented as a simple fact.
If a condition is specified, it's represented as an implication (result :- condition).
:return: The AgentSpeak string representation of this rule.
"""
if not self.condition:
return f"{self.result}."
return f"{self.result} :- {self.condition}."
class TriggerType(StrEnum):
"""
Enumeration of trigger types for AgentSpeak plans.
Trigger types define what kind of events can activate an AgentSpeak plan.
Currently, the system supports triggers for added beliefs and added goals.
"""
ADDED_BELIEF = "+"
"""Trigger when a belief is added to the belief base."""
# REMOVED_BELIEF = "-" # TODO
# MODIFIED_BELIEF = "^" # TODO
ADDED_GOAL = "+!"
"""Trigger when a goal is added to be achieved."""
# REMOVED_GOAL = "-!" # TODO
@dataclass
class AstPlan(AstNode):
"""
Represents a plan in AgentSpeak, consisting of a trigger, context, and body.
Plans define the reactive behavior of agents in AgentSpeak. They specify what
actions to take when certain conditions are met (trigger and context).
:ivar type: The type of trigger that activates this plan.
:ivar trigger_literal: The specific event or condition that triggers this plan.
:ivar context: A list of conditions that must hold for this plan to be applicable.
:ivar body: A list of statements to execute when this plan is triggered.
"""
type: TriggerType
trigger_literal: AstExpression
context: list[AstExpression]
body: list[AstStatement]
def _to_agentspeak(self) -> str:
"""
Converts this plan to its AgentSpeak string representation.
The representation follows the standard AgentSpeak plan format:
trigger_type + trigger_literal
: context_conditions
<- body_statements.
:return: The AgentSpeak string representation of this plan.
"""
assert isinstance(self.trigger_literal, AstLiteral)
indent = " " * 6
@@ -260,10 +545,28 @@ class AstPlan(AstNode):
@dataclass
class AstProgram(AstNode):
"""
Represents a full AgentSpeak program, consisting of rules and plans.
This is the root node of the AgentSpeak AST, containing all the rules
and plans that define the agent's behavior.
:ivar rules: A list of inference rules in this program.
:ivar plans: A list of reactive plans in this program.
"""
rules: list[AstRule] = field(default_factory=list)
plans: list[AstPlan] = field(default_factory=list)
def _to_agentspeak(self) -> str:
"""
Converts this program to its AgentSpeak string representation.
The representation consists of all rules followed by all plans,
separated by blank lines for readability.
:return: The complete AgentSpeak source code for this program.
"""
lines = []
lines.extend(map(str, self.rules))

View File

@@ -1,11 +1,19 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from functools import singledispatchmethod
from slugify import slugify
from control_backend.agents.bdi.agentspeak_ast import (
AstAtom,
AstBinaryOp,
AstExpression,
AstLiteral,
AstNumber,
AstPlan,
AstProgram,
AstRule,
@@ -16,9 +24,13 @@ from control_backend.agents.bdi.agentspeak_ast import (
StatementType,
TriggerType,
)
from control_backend.core.config import settings
from control_backend.schemas.program import (
BaseGoal,
BasicNorm,
ConditionalNorm,
EmotionBelief,
FaceBelief,
GestureAction,
Goal,
InferredBelief,
@@ -37,12 +49,45 @@ from control_backend.schemas.program import (
class AgentSpeakGenerator:
"""
Generator class that translates a high-level :class:`~control_backend.schemas.program.Program`
into AgentSpeak(L) source code.
It handles the conversion of phases, norms, goals, and triggers into AgentSpeak rules and plans,
ensuring the robot follows the defined behavioral logic.
The generator follows a systematic approach:
1. Sets up initial phase and cycle notification rules
2. Adds keyword inference capabilities for natural language processing
3. Creates default plans for common operations
4. Processes each phase with its norms, goals, and triggers
5. Adds fallback plans for robust execution
:ivar _asp: The internal AgentSpeak program representation being built.
"""
_asp: AstProgram
def generate(self, program: Program) -> str:
"""
Translates a Program object into an AgentSpeak source string.
This is the main entry point for the code generation process. It initializes
the AgentSpeak program structure and orchestrates the conversion of all
program elements into their AgentSpeak representations.
:param program: The behavior program to translate.
:return: The generated AgentSpeak code as a string.
"""
self._asp = AstProgram()
self._asp.rules.append(AstRule(self._astify(program.phases[0])))
if program.phases:
self._asp.rules.append(AstRule(self._astify(program.phases[0])))
else:
self._asp.rules.append(AstRule(AstLiteral("phase", [AstString("end")])))
self._asp.rules.append(AstRule(AstLiteral("!notify_cycle")))
self._add_keyword_inference()
self._add_default_plans()
@@ -53,6 +98,18 @@ class AgentSpeakGenerator:
return str(self._asp)
def _add_keyword_inference(self) -> None:
"""
Adds inference rules for keyword detection in user messages.
This method creates rules that allow the system to detect when specific
keywords are mentioned in user messages. It uses string operations to
check if a keyword is a substring of the user's message.
The generated rule has the form:
keyword_said(Keyword) :- user_said(Message) & .substring(Keyword, Message, Pos) & Pos >= 0
This enables the system to trigger behaviors based on keyword detection.
"""
keyword = AstVar("Keyword")
message = AstVar("Message")
position = AstVar("Pos")
@@ -67,11 +124,32 @@ class AgentSpeakGenerator:
)
def _add_default_plans(self):
"""
Adds default plans for common operations.
This method sets up the standard plans that handle fundamental operations
like replying with goals, simple speech actions, general replies, and
cycle notifications. These plans provide the basic infrastructure for
the agent's reactive behavior.
"""
self._add_reply_with_goal_plan()
self._add_say_plan()
self._add_reply_plan()
self._add_notify_cycle_plan()
def _add_reply_with_goal_plan(self):
"""
Adds a plan for replying with a specific conversational goal.
This plan handles the case where the agent needs to respond to user input
while pursuing a specific conversational goal. It:
1. Marks that the agent has responded this turn
2. Gathers all active norms
3. Generates a reply that considers both the user message and the goal
Trigger: +!reply_with_goal(Goal)
Context: user_said(Message)
"""
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
@@ -97,6 +175,17 @@ class AgentSpeakGenerator:
)
def _add_say_plan(self):
"""
Adds a plan for simple speech actions.
This plan handles direct speech actions where the agent needs to say
a specific text. It:
1. Marks that the agent has responded this turn
2. Executes the speech action
Trigger: +!say(Text)
Context: None (can be executed anytime)
"""
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
@@ -110,6 +199,18 @@ class AgentSpeakGenerator:
)
def _add_reply_plan(self):
"""
Adds a plan for general reply actions.
This plan handles general reply actions where the agent needs to respond
to user input without a specific conversational goal. It:
1. Marks that the agent has responded this turn
2. Gathers all active norms
3. Generates a reply based on the user message and norms
Trigger: +!reply
Context: user_said(Message)
"""
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
@@ -132,7 +233,53 @@ class AgentSpeakGenerator:
)
)
def _add_notify_cycle_plan(self):
"""
Adds a plan for cycle notification.
This plan handles the periodic notification cycle that allows the system
to monitor and report on the current state. It:
1. Gathers all active norms
2. Notifies the system about the current norms
3. Waits briefly to allow processing
4. Recursively triggers the next cycle
Trigger: +!notify_cycle
Context: None (can be executed anytime)
"""
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("notify_cycle"),
[],
[
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
"findall",
[AstVar("Norm"), AstLiteral("norm", [AstVar("Norm")]), AstVar("Norms")],
),
),
AstStatement(
StatementType.DO_ACTION, AstLiteral("notify_norms", [AstVar("Norms")])
),
AstStatement(StatementType.DO_ACTION, AstLiteral("wait", [AstNumber(100)])),
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("notify_cycle")),
],
)
)
def _process_phases(self, phases: list[Phase]) -> None:
"""
Processes all phases in the program and their transitions.
This method iterates through each phase and:
1. Processes the current phase (norms, goals, triggers)
2. Sets up transitions between phases
3. Adds special handling for the end phase
:param phases: The list of phases to process.
"""
for curr_phase, next_phase in zip([None] + phases, phases + [None], strict=True):
if curr_phase:
self._process_phase(curr_phase)
@@ -146,13 +293,26 @@ class AgentSpeakGenerator:
trigger_literal=AstLiteral("user_said", [AstVar("Message")]),
context=[AstLiteral("phase", [AstString("end")])],
body=[
AstStatement(StatementType.DO_ACTION, AstLiteral("notify_user_said")),
AstStatement(
StatementType.DO_ACTION, AstLiteral("notify_user_said", [AstVar("Message")])
),
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("reply")),
],
)
)
def _process_phase(self, phase: Phase) -> None:
"""
Processes a single phase, including its norms, goals, and triggers.
This method handles the complete processing of a phase by:
1. Processing all norms in the phase
2. Setting up the default execution loop for the phase
3. Processing all goals in sequence
4. Processing all triggers for reactive behavior
:param phase: The phase to process.
"""
for norm in phase.norms:
self._process_norm(norm, phase)
@@ -167,6 +327,21 @@ class AgentSpeakGenerator:
self._process_trigger(trigger, phase)
def _add_phase_transition(self, from_phase: Phase | None, to_phase: Phase | None) -> None:
"""
Adds plans for transitioning between phases.
This method creates two plans for each phase transition:
1. A check plan that verifies if transition conditions are met
2. A force plan that actually performs the transition (can be forced externally)
The transition involves:
- Notifying the system about the phase change
- Removing the current phase belief
- Adding the next phase belief
:param from_phase: The phase being transitioned from (or None for initial setup).
:param to_phase: The phase being transitioned to (or None for end phase).
"""
if from_phase is None:
return
from_phase_ast = self._astify(from_phase)
@@ -174,30 +349,14 @@ class AgentSpeakGenerator:
self._astify(to_phase) if to_phase else AstLiteral("phase", [AstString("end")])
)
context = [from_phase_ast]
check_context = [from_phase_ast]
if from_phase:
for goal in from_phase.goals:
context.append(self._astify(goal, achieved=True))
check_context.append(self._astify(goal, achieved=True))
force_context = [from_phase_ast]
body = [
AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast),
AstStatement(StatementType.ADD_BELIEF, to_phase_ast),
]
# if from_phase:
# body.extend(
# [
# AstStatement(
# StatementType.TEST_GOAL, AstLiteral("user_said", [AstVar("Message")])
# ),
# AstStatement(
# StatementType.REPLACE_BELIEF, AstLiteral("user_said", [AstVar("Message")])
# ),
# ]
# )
# Notify outside world about transition
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral(
@@ -207,19 +366,51 @@ class AgentSpeakGenerator:
AstString(str(to_phase.id) if to_phase else "end"),
],
),
),
AstStatement(StatementType.REMOVE_BELIEF, from_phase_ast),
AstStatement(StatementType.ADD_BELIEF, to_phase_ast),
]
# Check
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("transition_phase"),
check_context,
[
AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("force_transition_phase")),
],
)
)
# Force
self._asp.plans.append(
AstPlan(TriggerType.ADDED_GOAL, AstLiteral("transition_phase"), context, body)
AstPlan(
TriggerType.ADDED_GOAL, AstLiteral("force_transition_phase"), force_context, body
)
)
def _process_norm(self, norm: Norm, phase: Phase) -> None:
"""
Processes a norm and adds it as an inference rule.
This method converts norms into AgentSpeak rules that define when
the norm should be active. It handles both basic norms (always active
in their phase) and conditional norms (active only when their condition
is met).
:param norm: The norm to process.
:param phase: The phase this norm belongs to.
"""
rule: AstRule | None = None
match norm:
case ConditionalNorm(condition=cond):
rule = AstRule(self._astify(norm), self._astify(phase) & self._astify(cond))
rule = AstRule(
self._astify(norm),
self._astify(phase) & self._astify(cond)
| AstAtom(f"force_{self.slugify(norm)}"),
)
case BasicNorm():
rule = AstRule(self._astify(norm), self._astify(phase))
@@ -229,6 +420,18 @@ class AgentSpeakGenerator:
self._asp.rules.append(rule)
def _add_default_loop(self, phase: Phase) -> None:
"""
Adds the default execution loop for a phase.
This method creates the main reactive loop that runs when the agent
receives user input during a phase. The loop:
1. Notifies the system about the user input
2. Resets the response tracking
3. Executes all phase goals
4. Attempts phase transition
:param phase: The phase to create the loop for.
"""
actions = []
actions.append(
@@ -237,7 +440,6 @@ class AgentSpeakGenerator:
)
)
actions.append(AstStatement(StatementType.REMOVE_BELIEF, AstLiteral("responded_this_turn")))
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, AstLiteral("check_triggers")))
for goal in phase.goals:
actions.append(AstStatement(StatementType.ACHIEVE_GOAL, self._astify(goal)))
@@ -261,6 +463,22 @@ class AgentSpeakGenerator:
continues_response: bool = False,
main_goal: bool = False,
) -> None:
"""
Processes a goal and creates plans for achieving it.
This method creates two plans for each goal:
1. A main plan that executes the goal's steps when conditions are met
2. A fallback plan that provides a default empty implementation (prevents crashes)
The method also recursively processes any subgoals contained within
the goal's plan.
:param goal: The goal to process.
:param phase: The phase this goal belongs to.
:param previous_goal: The previous goal in sequence (for dependency tracking).
:param continues_response: Whether this goal continues an existing response.
:param main_goal: Whether this is a main goal (for UI notification purposes).
"""
context: list[AstExpression] = [self._astify(phase)]
context.append(~self._astify(goal, achieved=True))
if previous_goal and previous_goal.can_fail:
@@ -303,14 +521,39 @@ class AgentSpeakGenerator:
prev_goal = subgoal
def _step_to_statement(self, step: PlanElement) -> AstStatement:
"""
Converts a plan step to an AgentSpeak statement.
This method transforms different types of plan elements into their
corresponding AgentSpeak statements. Goals and speech-related actions
become achieve-goal statements, while gesture actions become do-action
statements.
:param step: The plan element to convert.
:return: The corresponding AgentSpeak statement.
"""
match step:
# Note that SpeechAction gets included in the ACHIEVE_GOAL, since it's a goal internally
case Goal() | SpeechAction() | LLMAction() as a:
return AstStatement(StatementType.ACHIEVE_GOAL, self._astify(a))
case GestureAction() as a:
return AstStatement(StatementType.DO_ACTION, self._astify(a))
# TODO: separate handling of keyword and others
def _process_trigger(self, trigger: Trigger, phase: Phase) -> None:
"""
Processes a trigger and creates plans for its execution.
This method creates plans that execute when trigger conditions are met.
It handles both automatic triggering (when conditions are detected) and
manual forcing (from UI). The trigger execution includes:
1. Notifying the system about trigger start
2. Executing all trigger steps
3. Waiting briefly for UI display
4. Notifying the system about trigger end
:param trigger: The trigger to process.
:param phase: The phase this trigger belongs to.
"""
body = []
subgoals = []
@@ -325,6 +568,15 @@ class AgentSpeakGenerator:
if isinstance(step, Goal):
step.can_fail = False # triggers are continuous sequence
subgoals.append(step)
# Arbitrary wait for UI to display nicely
body.append(
AstStatement(
StatementType.DO_ACTION,
AstLiteral("wait", [AstNumber(settings.behaviour_settings.trigger_time_to_wait)]),
)
)
body.append(
AstStatement(
StatementType.DO_ACTION,
@@ -348,6 +600,18 @@ class AgentSpeakGenerator:
self._process_goal(subgoal, phase, continues_response=True)
def _add_fallbacks(self):
"""
Adds fallback plans for robust execution, preventing crashes.
This method creates fallback plans that provide default empty implementations
for key goals. These fallbacks ensure that the system can continue execution
even when no specific plans are applicable, preventing crashes.
The fallbacks are created for:
- check_triggers: When no triggers are applicable
- transition_phase: When phase transition conditions aren't met
- force_transition_phase: When forced transitions aren't possible
"""
# Trigger fallback
self._asp.plans.append(
AstPlan(
@@ -368,20 +632,77 @@ class AgentSpeakGenerator:
)
)
# Force phase transition fallback
self._asp.plans.append(
AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("force_transition_phase"),
[],
[AstStatement(StatementType.EMPTY, AstLiteral("true"))],
)
)
@singledispatchmethod
def _astify(self, element: ProgramElement) -> AstExpression:
"""
Converts program elements to AgentSpeak expressions (base method).
This is the base method for the singledispatch mechanism that handles
conversion of different program element types to their AgentSpeak
representations. Specific implementations are provided for each
element type through registered methods.
:param element: The program element to convert.
:return: The corresponding AgentSpeak expression.
:raises NotImplementedError: If no specific implementation exists for the element type.
"""
raise NotImplementedError(f"Cannot convert element {element} to an AgentSpeak expression.")
@_astify.register
def _(self, kwb: KeywordBelief) -> AstExpression:
"""
Converts a KeywordBelief to an AgentSpeak expression.
Keyword beliefs are converted to keyword_said literals that check
if the keyword was mentioned in user input.
:param kwb: The KeywordBelief to convert.
:return: An AstLiteral representing the keyword detection.
"""
return AstLiteral("keyword_said", [AstString(kwb.keyword)])
@_astify.register
def _(self, sb: SemanticBelief) -> AstExpression:
"""
Converts a SemanticBelief to an AgentSpeak expression.
Semantic beliefs are converted to literals using their slugified names,
which are used for LLM-based belief evaluation.
:param sb: The SemanticBelief to convert.
:return: An AstLiteral representing the semantic belief.
"""
return AstLiteral(self.slugify(sb))
@_astify.register
def _(self, eb: EmotionBelief) -> AstExpression:
return AstLiteral("emotion_detected", [AstAtom(eb.emotion)])
@_astify.register
def _(self, eb: FaceBelief) -> AstExpression:
return AstLiteral("face_present")
@_astify.register
def _(self, ib: InferredBelief) -> AstExpression:
"""
Converts an InferredBelief to an AgentSpeak expression.
Inferred beliefs are converted to binary operations that combine
their left and right operands using the appropriate logical operator.
:param ib: The InferredBelief to convert.
:return: An AstBinaryOp representing the logical combination.
"""
return AstBinaryOp(
self._astify(ib.left),
BinaryOperatorType.AND if ib.operator == LogicalOperator.AND else BinaryOperatorType.OR,
@@ -390,54 +711,187 @@ class AgentSpeakGenerator:
@_astify.register
def _(self, norm: Norm) -> AstExpression:
"""
Converts a Norm to an AgentSpeak expression.
Norms are converted to literals with either 'norm' or 'critical_norm'
functors depending on their critical flag, with the norm text as an argument.
Note that currently, critical norms are not yet functionally supported. They are possible
to astify for future use.
:param norm: The Norm to convert.
:return: An AstLiteral representing the norm.
"""
functor = "critical_norm" if norm.critical else "norm"
return AstLiteral(functor, [AstString(norm.norm)])
@_astify.register
def _(self, phase: Phase) -> AstExpression:
"""
Converts a Phase to an AgentSpeak expression.
Phases are converted to phase literals with their unique identifier
as an argument, which is used for phase tracking and transitions.
:param phase: The Phase to convert.
:return: An AstLiteral representing the phase.
"""
return AstLiteral("phase", [AstString(str(phase.id))])
@_astify.register
def _(self, goal: Goal, achieved: bool = False) -> AstExpression:
"""
Converts a Goal to an AgentSpeak expression.
Goals are converted to literals using their slugified names. If the
achieved parameter is True, the literal is prefixed with 'achieved_'.
:param goal: The Goal to convert.
:param achieved: Whether to represent this as an achieved goal.
:return: An AstLiteral representing the goal.
"""
return AstLiteral(f"{'achieved_' if achieved else ''}{self._slugify_str(goal.name)}")
@_astify.register
def _(self, trigger: Trigger) -> AstExpression:
"""
Converts a Trigger to an AgentSpeak expression.
Triggers are converted to literals using their slugified names,
which are used to identify and execute trigger plans.
:param trigger: The Trigger to convert.
:return: An AstLiteral representing the trigger.
"""
return AstLiteral(self.slugify(trigger))
@_astify.register
def _(self, sa: SpeechAction) -> AstExpression:
"""
Converts a SpeechAction to an AgentSpeak expression.
Speech actions are converted to say literals with the text content
as an argument, which are used for direct speech output.
:param sa: The SpeechAction to convert.
:return: An AstLiteral representing the speech action.
"""
return AstLiteral("say", [AstString(sa.text)])
@_astify.register
def _(self, ga: GestureAction) -> AstExpression:
"""
Converts a GestureAction to an AgentSpeak expression.
Gesture actions are converted to gesture literals with the gesture
type and name as arguments, which are used for physical robot gestures.
:param ga: The GestureAction to convert.
:return: An AstLiteral representing the gesture action.
"""
gesture = ga.gesture
return AstLiteral("gesture", [AstString(gesture.type), AstString(gesture.name)])
@_astify.register
def _(self, la: LLMAction) -> AstExpression:
"""
Converts an LLMAction to an AgentSpeak expression.
LLM actions are converted to reply_with_goal literals with the
conversational goal as an argument, which are used for LLM-generated
responses guided by specific goals.
:param la: The LLMAction to convert.
:return: An AstLiteral representing the LLM action.
"""
return AstLiteral("reply_with_goal", [AstString(la.goal)])
@singledispatchmethod
@staticmethod
def slugify(element: ProgramElement) -> str:
"""
Converts program elements to slugs (base method).
This is the base method for the singledispatch mechanism that handles
conversion of different program element types to their slug representations.
Specific implementations are provided for each element type through
registered methods.
Slugs are used outside of AgentSpeak, mostly for identifying what to send to the AgentSpeak
program as beliefs.
:param element: The program element to convert to a slug.
:return: The slug string representation.
:raises NotImplementedError: If no specific implementation exists for the element type.
"""
raise NotImplementedError(f"Cannot convert element {element} to a slug.")
@slugify.register
@staticmethod
def _(n: Norm) -> str:
"""
Converts a Norm to a slug.
Norms are converted to slugs with the 'norm_' prefix followed by
the slugified norm text.
:param n: The Norm to convert.
:return: The slug string representation.
"""
return f"norm_{AgentSpeakGenerator._slugify_str(n.norm)}"
@slugify.register
@staticmethod
def _(sb: SemanticBelief) -> str:
"""
Converts a SemanticBelief to a slug.
Semantic beliefs are converted to slugs with the 'semantic_' prefix
followed by the slugified belief name.
:param sb: The SemanticBelief to convert.
:return: The slug string representation.
"""
return f"semantic_{AgentSpeakGenerator._slugify_str(sb.name)}"
@slugify.register
@staticmethod
def _(g: Goal) -> str:
def _(g: BaseGoal) -> str:
"""
Converts a BaseGoal to a slug.
Goals are converted to slugs using their slugified names directly.
:param g: The BaseGoal to convert.
:return: The slug string representation.
"""
return AgentSpeakGenerator._slugify_str(g.name)
@slugify.register
@staticmethod
def _(t: Trigger):
def _(t: Trigger) -> str:
"""
Converts a Trigger to a slug.
Triggers are converted to slugs with the 'trigger_' prefix followed by
the slugified trigger name.
:param t: The Trigger to convert.
:return: The slug string representation.
"""
return f"trigger_{AgentSpeakGenerator._slugify_str(t.name)}"
@staticmethod
def _slugify_str(text: str) -> str:
"""
Converts a text string to a slug.
This helper method converts arbitrary text to a URL-friendly slug format
by converting to lowercase, removing special characters, and replacing
spaces with underscores. It also removes common stopwords.
:param text: The text string to convert.
:return: The slugified string.
"""
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])

View File

@@ -1,203 +0,0 @@
import typing
from dataclasses import dataclass, field
# --- Types ---
@dataclass
class BeliefLiteral:
"""
Represents a literal or atom.
Example: phase(1), user_said("hello"), ~started
"""
functor: str
args: list[str] = field(default_factory=list)
negated: bool = False
def __str__(self):
# In ASL, 'not' is usually for closed-world assumption (prolog style),
# '~' is for explicit negation in beliefs.
# For simplicity in behavior trees, we often use 'not' for conditions.
prefix = "not " if self.negated else ""
if not self.args:
return f"{prefix}{self.functor}"
# Clean args to ensure strings are quoted if they look like strings,
# but usually the converter handles the quoting of string literals.
args_str = ", ".join(self.args)
return f"{prefix}{self.functor}({args_str})"
@dataclass
class GoalLiteral:
name: str
def __str__(self):
return f"!{self.name}"
@dataclass
class ActionLiteral:
"""
Represents a step in a plan body.
Example: .say("Hello") or !achieve_goal
"""
code: str
def __str__(self):
return self.code
@dataclass
class BinaryOp:
"""
Represents logical operations.
Example: (A & B) | C
"""
left: "Expression | str"
operator: typing.Literal["&", "|"]
right: "Expression | str"
def __str__(self):
l_str = str(self.left)
r_str = str(self.right)
if isinstance(self.left, BinaryOp):
l_str = f"({l_str})"
if isinstance(self.right, BinaryOp):
r_str = f"({r_str})"
return f"{l_str} {self.operator} {r_str}"
Literal = BeliefLiteral | GoalLiteral | ActionLiteral
Expression = Literal | BinaryOp | str
@dataclass
class Rule:
"""
Represents an inference rule.
Example: head :- body.
"""
head: Expression
body: Expression | None = None
def __str__(self):
if not self.body:
return f"{self.head}."
return f"{self.head} :- {self.body}."
@dataclass
class PersistentRule:
"""
Represents an inference rule, where the inferred belief is persistent when formed.
"""
head: Expression
body: Expression
def __str__(self):
if not self.body:
raise Exception("Rule without body should not be persistent.")
lines = []
if isinstance(self.body, BinaryOp):
lines.append(f"+{self.body.left}")
if self.body.operator == "&":
lines.append(f" : {self.body.right}")
lines.append(f" <- +{self.head}.")
if self.body.operator == "|":
lines.append(f"+{self.body.right}")
lines.append(f" <- +{self.head}.")
return "\n".join(lines)
@dataclass
class Plan:
"""
Represents a plan.
Syntax: +trigger : context <- body.
"""
trigger: BeliefLiteral | GoalLiteral
context: list[Expression] = field(default_factory=list)
body: list[ActionLiteral] = field(default_factory=list)
def __str__(self):
# Indentation settings
INDENT = " "
ARROW = "\n <- "
COLON = "\n : "
# Build Header
header = f"+{self.trigger}"
if self.context:
ctx_str = f" &\n{INDENT}".join(str(c) for c in self.context)
header += f"{COLON}{ctx_str}"
# Case 1: Empty body
if not self.body:
return f"{header}."
# Case 2: Short body (optional optimization, keeping it uniform usually better)
header += ARROW
lines = []
# We start the first action on the same line or next line.
# Let's put it on the next line for readability if there are multiple.
if len(self.body) == 1:
return f"{header}{self.body[0]}."
# First item
lines.append(f"{header}{self.body[0]};")
# Middle items
for item in self.body[1:-1]:
lines.append(f"{INDENT}{item};")
# Last item
lines.append(f"{INDENT}{self.body[-1]}.")
return "\n".join(lines)
@dataclass
class AgentSpeakFile:
"""
Root element representing the entire generated file.
"""
initial_beliefs: list[Rule] = field(default_factory=list)
inference_rules: list[Rule | PersistentRule] = field(default_factory=list)
plans: list[Plan] = field(default_factory=list)
def __str__(self):
sections = []
if self.initial_beliefs:
sections.append("// --- Initial Beliefs & Facts ---")
sections.extend(str(rule) for rule in self.initial_beliefs)
sections.append("")
if self.inference_rules:
sections.append("// --- Inference Rules ---")
sections.extend(str(rule) for rule in self.inference_rules if isinstance(rule, Rule))
sections.append("")
sections.extend(
str(rule) for rule in self.inference_rules if isinstance(rule, PersistentRule)
)
sections.append("")
if self.plans:
sections.append("// --- Plans ---")
# Separate plans by a newline for readability
sections.extend(str(plan) + "\n" for plan in self.plans)
return "\n".join(sections)

View File

@@ -1,425 +0,0 @@
import asyncio
import time
from functools import singledispatchmethod
from slugify import slugify
from control_backend.agents.bdi import BDICoreAgent
from control_backend.agents.bdi.asl_ast import (
ActionLiteral,
AgentSpeakFile,
BeliefLiteral,
BinaryOp,
Expression,
GoalLiteral,
PersistentRule,
Plan,
Rule,
)
from control_backend.agents.bdi.bdi_program_manager import test_program
from control_backend.schemas.program import (
BasicBelief,
Belief,
ConditionalNorm,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Phase,
Program,
ProgramElement,
SemanticBelief,
SpeechAction,
)
async def do_things():
res = input("Wanna generate")
if res == "y":
program = AgentSpeakGenerator().generate(test_program)
filename = f"{int(time.time())}.asl"
with open(filename, "w") as f:
f.write(program)
else:
# filename = "0test.asl"
filename = "1766062491.asl"
bdi_agent = BDICoreAgent("BDICoreAgent", filename)
flag = asyncio.Event()
await bdi_agent.start()
await flag.wait()
def do_other_things():
print(AgentSpeakGenerator().generate(test_program))
class AgentSpeakGenerator:
"""
Converts a Pydantic Program behavior model into an AgentSpeak(L) AST,
then renders it to a string.
"""
def generate(self, program: Program) -> str:
asl = AgentSpeakFile()
self._generate_startup(program, asl)
for i, phase in enumerate(program.phases):
next_phase = program.phases[i + 1] if i < len(program.phases) - 1 else None
self._generate_phase_flow(phase, next_phase, asl)
self._generate_norms(phase, asl)
self._generate_goals(phase, asl)
self._generate_triggers(phase, asl)
self._generate_fallbacks(program, asl)
return str(asl)
# --- Section: Startup & Phase Management ---
def _generate_startup(self, program: Program, asl: AgentSpeakFile):
if not program.phases:
return
# Initial belief: phase(start).
asl.initial_beliefs.append(Rule(head=BeliefLiteral("phase", ['"start"'])))
# Startup plan: +started : phase(start) <- -phase(start); +phase(first_id).
asl.plans.append(
Plan(
trigger=BeliefLiteral("started"),
context=[BeliefLiteral("phase", ['"start"'])],
body=[
ActionLiteral('-phase("start")'),
ActionLiteral(f'+phase("{program.phases[0].id}")'),
],
)
)
# Initial plans:
asl.plans.append(
Plan(
trigger=GoalLiteral("generate_response_with_goal(Goal)"),
context=[BeliefLiteral("user_said", ["Message"])],
body=[
ActionLiteral("+responded_this_turn"),
ActionLiteral(".findall(Norm, norm(Norm), Norms)"),
ActionLiteral(".reply_with_goal(Message, Norms, Goal)"),
],
)
)
def _generate_phase_flow(self, phase: Phase, next_phase: Phase | None, asl: AgentSpeakFile):
"""Generates the main loop listener and the transition logic for this phase."""
# +user_said(Message) : phase(ID) <- !goal1; !goal2; !transition_phase.
goal_actions = [ActionLiteral("-responded_this_turn")]
goal_actions += [
ActionLiteral(f"!check_{self._slugify_str(keyword)}")
for keyword in self._get_keyword_conditionals(phase)
]
goal_actions += [ActionLiteral(f"!{self._slugify(g)}") for g in phase.goals]
goal_actions.append(ActionLiteral("!transition_phase"))
asl.plans.append(
Plan(
trigger=BeliefLiteral("user_said", ["Message"]),
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
body=goal_actions,
)
)
# +!transition_phase : phase(ID) <- -phase(ID); +(NEXT_ID).
next_id = str(next_phase.id) if next_phase else "end"
transition_context = [BeliefLiteral("phase", [f'"{phase.id}"'])]
if phase.goals:
transition_context.append(BeliefLiteral(f"achieved_{self._slugify(phase.goals[-1])}"))
asl.plans.append(
Plan(
trigger=GoalLiteral("transition_phase"),
context=transition_context,
body=[
ActionLiteral(f'-phase("{phase.id}")'),
ActionLiteral(f'+phase("{next_id}")'),
ActionLiteral("user_said(Anything)"),
ActionLiteral("-+user_said(Anything)"),
],
)
)
def _get_keyword_conditionals(self, phase: Phase) -> list[str]:
res = []
for belief in self._extract_basic_beliefs_from_phase(phase):
if isinstance(belief, KeywordBelief):
res.append(belief.keyword)
return res
# --- Section: Norms & Beliefs ---
def _generate_norms(self, phase: Phase, asl: AgentSpeakFile):
for norm in phase.norms:
norm_slug = f'"{norm.norm}"'
head = BeliefLiteral("norm", [norm_slug])
# Base context is the phase
phase_lit = BeliefLiteral("phase", [f'"{phase.id}"'])
if isinstance(norm, ConditionalNorm):
self._ensure_belief_inference(norm.condition, asl)
condition_expr = self._belief_to_expr(norm.condition)
body = BinaryOp(phase_lit, "&", condition_expr)
else:
body = phase_lit
asl.inference_rules.append(Rule(head=head, body=body))
def _ensure_belief_inference(self, belief: Belief, asl: AgentSpeakFile):
"""
Recursively adds rules to infer beliefs.
Checks strictly to avoid duplicates if necessary,
though ASL engines often handle redefinition or we can use a set to track processed IDs.
"""
if isinstance(belief, KeywordBelief):
pass
# # Rule: keyword_said("word") :- user_said(M) & .substring("word", M, P) & P >= 0.
# kwd_slug = f'"{belief.keyword}"'
# head = BeliefLiteral("keyword_said", [kwd_slug])
#
# # Avoid duplicates
# if any(str(r.head) == str(head) for r in asl.inference_rules):
# return
#
# body = BinaryOp(
# BeliefLiteral("user_said", ["Message"]),
# "&",
# BinaryOp(f".substring({kwd_slug}, Message, Pos)", "&", "Pos >= 0"),
# )
#
# asl.inference_rules.append(Rule(head=head, body=body))
elif isinstance(belief, InferredBelief):
self._ensure_belief_inference(belief.left, asl)
self._ensure_belief_inference(belief.right, asl)
slug = self._slugify(belief)
head = BeliefLiteral(slug)
if any(str(r.head) == str(head) for r in asl.inference_rules):
return
op_char = "&" if belief.operator == LogicalOperator.AND else "|"
body = BinaryOp(
self._belief_to_expr(belief.left), op_char, self._belief_to_expr(belief.right)
)
asl.inference_rules.append(PersistentRule(head=head, body=body))
def _belief_to_expr(self, belief: Belief) -> Expression:
if isinstance(belief, KeywordBelief):
return BeliefLiteral("keyword_said", [f'"{belief.keyword}"'])
else:
return BeliefLiteral(self._slugify(belief))
# --- Section: Goals ---
def _generate_goals(self, phase: Phase, asl: AgentSpeakFile):
previous_goal: Goal | None = None
for goal in phase.goals:
self._generate_goal_plan_recursive(goal, str(phase.id), previous_goal, asl)
previous_goal = goal
def _generate_goal_plan_recursive(
self,
goal: Goal,
phase_id: str,
previous_goal: Goal | None,
asl: AgentSpeakFile,
responded_needed: bool = True,
can_fail: bool = True,
):
goal_slug = self._slugify(goal)
# phase(ID) & not responded_this_turn & not achieved_goal
context = [
BeliefLiteral("phase", [f'"{phase_id}"']),
]
if responded_needed:
context.append(BeliefLiteral("responded_this_turn", negated=True))
if can_fail:
context.append(BeliefLiteral(f"achieved_{goal_slug}", negated=True))
if previous_goal:
prev_slug = self._slugify(previous_goal)
context.append(BeliefLiteral(f"achieved_{prev_slug}"))
body_actions = []
sub_goals_to_process = []
for step in goal.plan.steps:
if isinstance(step, Goal):
sub_slug = self._slugify(step)
body_actions.append(ActionLiteral(f"!{sub_slug}"))
sub_goals_to_process.append(step)
elif isinstance(step, SpeechAction):
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
elif isinstance(step, GestureAction):
body_actions.append(ActionLiteral(f'.gesture("{step.gesture}")'))
elif isinstance(step, LLMAction):
body_actions.append(ActionLiteral(f'!generate_response_with_goal("{step.goal}")'))
# Mark achievement
if not goal.can_fail:
body_actions.append(ActionLiteral(f"+achieved_{goal_slug}"))
asl.plans.append(Plan(trigger=GoalLiteral(goal_slug), context=context, body=body_actions))
asl.plans.append(
Plan(trigger=GoalLiteral(goal_slug), context=[], body=[ActionLiteral("true")])
)
prev_sub = None
for sub_goal in sub_goals_to_process:
self._generate_goal_plan_recursive(sub_goal, phase_id, prev_sub, asl)
prev_sub = sub_goal
# --- Section: Triggers ---
def _generate_triggers(self, phase: Phase, asl: AgentSpeakFile):
for keyword in self._get_keyword_conditionals(phase):
asl.plans.append(
Plan(
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
context=[
ActionLiteral(
f'user_said(Message) & .substring("{keyword}", Message, Pos) & Pos >= 0'
)
],
body=[
ActionLiteral(f'+keyword_said("{keyword}")'),
ActionLiteral(f'-keyword_said("{keyword}")'),
],
)
)
asl.plans.append(
Plan(
trigger=GoalLiteral(f"check_{self._slugify_str(keyword)}"),
body=[ActionLiteral("true")],
)
)
for trigger in phase.triggers:
self._ensure_belief_inference(trigger.condition, asl)
trigger_belief_slug = self._belief_to_expr(trigger.condition)
body_actions = []
sub_goals = []
for step in trigger.plan.steps:
if isinstance(step, Goal):
sub_slug = self._slugify(step)
body_actions.append(ActionLiteral(f"!{sub_slug}"))
sub_goals.append(step)
elif isinstance(step, SpeechAction):
body_actions.append(ActionLiteral(f'.say("{step.text}")'))
elif isinstance(step, GestureAction):
body_actions.append(
ActionLiteral(f'.gesture("{step.gesture.type}", "{step.gesture.name}")')
)
elif isinstance(step, LLMAction):
body_actions.append(
ActionLiteral(f'!generate_response_with_goal("{step.goal}")')
)
asl.plans.append(
Plan(
trigger=BeliefLiteral(trigger_belief_slug),
context=[BeliefLiteral("phase", [f'"{phase.id}"'])],
body=body_actions,
)
)
# Recurse for triggered goals
prev_sub = None
for sub_goal in sub_goals:
self._generate_goal_plan_recursive(
sub_goal, str(phase.id), prev_sub, asl, False, False
)
prev_sub = sub_goal
# --- Section: Fallbacks ---
def _generate_fallbacks(self, program: Program, asl: AgentSpeakFile):
asl.plans.append(
Plan(trigger=GoalLiteral("transition_phase"), context=[], body=[ActionLiteral("true")])
)
# --- Helpers ---
@singledispatchmethod
def _slugify(self, element: ProgramElement) -> str:
if element.name:
raise NotImplementedError("Cannot slugify this element.")
return self._slugify_str(element.name)
@_slugify.register
def _(self, goal: Goal) -> str:
if goal.name:
return self._slugify_str(goal.name)
return f"goal_{goal.id.hex}"
@_slugify.register
def _(self, kwb: KeywordBelief) -> str:
return f"keyword_said({kwb.keyword})"
@_slugify.register
def _(self, sb: SemanticBelief) -> str:
return self._slugify_str(sb.description)
@_slugify.register
def _(self, ib: InferredBelief) -> str:
return self._slugify_str(ib.name)
def _slugify_str(self, text: str) -> str:
return slugify(text, separator="_", stopwords=["a", "an", "the", "we", "you", "I"])
def _extract_basic_beliefs_from_program(self, program: Program) -> list[BasicBelief]:
beliefs = []
for phase in program.phases:
beliefs.extend(self._extract_basic_beliefs_from_phase(phase))
return beliefs
def _extract_basic_beliefs_from_phase(self, phase: Phase) -> list[BasicBelief]:
beliefs = []
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
beliefs += self._extract_basic_beliefs_from_belief(norm.condition)
for trigger in phase.triggers:
beliefs += self._extract_basic_beliefs_from_belief(trigger.condition)
return beliefs
def _extract_basic_beliefs_from_belief(self, belief: Belief) -> list[BasicBelief]:
if isinstance(belief, InferredBelief):
return self._extract_basic_beliefs_from_belief(
belief.left
) + self._extract_basic_beliefs_from_belief(belief.right)
return [belief]
if __name__ == "__main__":
asyncio.run(do_things())
# do_other_things()y

View File

@@ -1,6 +1,13 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import copy
import json
import logging
import time
from collections.abc import Iterable
@@ -19,6 +26,9 @@ from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, Speec
DELIMITER = ";\n" # TODO: temporary until we support lists in AgentSpeak
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class BDICoreAgent(BaseAgent):
"""
BDI Core Agent.
@@ -107,7 +117,6 @@ class BDICoreAgent(BaseAgent):
if not maybe_more_work:
deadline = self.bdi_agent.shortest_deadline()
if deadline:
self.logger.debug("Sleeping until %s", deadline)
await asyncio.sleep(deadline - time.time())
maybe_more_work = True
else:
@@ -156,16 +165,19 @@ class BDICoreAgent(BaseAgent):
)
await self.send(out_msg)
case settings.agent_settings.user_interrupt_name:
content = msg.body
self.logger.debug("Received user interruption: %s", content)
self.logger.debug("Received user interruption: %s", msg)
match msg.thread:
case "force_phase_transition":
self._set_goal("transition_phase")
case "force_trigger":
self._force_trigger(msg.body)
case "force_norm":
self._force_norm(msg.body)
case "force_next_phase":
self._force_next_phase()
case _:
self.logger.warning("Received unknow user interruption: %s", msg)
self.logger.warning("Received unknown user interruption: %s", msg)
def _apply_belief_changes(self, belief_changes: BeliefMessage):
"""
@@ -205,6 +217,9 @@ class BDICoreAgent(BaseAgent):
else:
term = agentspeak.Literal(name)
if name != "user_said":
experiment_logger.observation(f"Formed new belief: {name}{f'={args}' if args else ''}")
self.bdi_agent.call(
agentspeak.Trigger.addition,
agentspeak.GoalType.belief,
@@ -242,6 +257,9 @@ class BDICoreAgent(BaseAgent):
new_args = (agentspeak.Literal(arg) for arg in args)
term = agentspeak.Literal(name, new_args)
if name != "user_said":
experiment_logger.observation(f"Removed belief: {name}{f'={args}' if args else ''}")
result = self.bdi_agent.call(
agentspeak.Trigger.removal,
agentspeak.GoalType.belief,
@@ -302,15 +320,21 @@ class BDICoreAgent(BaseAgent):
self.logger.debug(f"Set goal !{self.format_belief_string(name, args)}.")
def _force_trigger(self, name: str):
self.bdi_agent.call(
agentspeak.Trigger.addition,
agentspeak.GoalType.achievement,
agentspeak.Literal(name),
agentspeak.runtime.Intention(),
)
self._set_goal(name)
self.logger.info("Manually forced trigger %s.", name)
# TODO: make this compatible for critical norms
def _force_norm(self, name: str):
self._add_belief(f"force_{name}")
self.logger.info("Manually forced norm %s.", name)
def _force_next_phase(self):
self._set_goal("force_transition_phase")
self.logger.info("Manually forced phase transition.")
def _add_custom_actions(self) -> None:
"""
Add any custom actions here. Inside `@self.actions.add()`, the first argument is
@@ -326,14 +350,11 @@ class BDICoreAgent(BaseAgent):
message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope)
self.logger.debug("Norms: %s", norms)
self.logger.debug("User text: %s", message_text)
self.add_behavior(self._send_to_llm(str(message_text), str(norms), ""))
yield
@self.actions.add(".reply_with_goal", 3)
def _reply_with_goal(agent: "BDICoreAgent", term, intention):
def _reply_with_goal(agent, term, intention):
"""
Let the LLM generate a response to a user's utterance with the current norms and a
specific goal.
@@ -341,16 +362,22 @@ class BDICoreAgent(BaseAgent):
message_text = agentspeak.grounded(term.args[0], intention.scope)
norms = agentspeak.grounded(term.args[1], intention.scope)
goal = agentspeak.grounded(term.args[2], intention.scope)
self.logger.debug(
'"reply_with_goal" action called with message=%s, norms=%s, goal=%s',
message_text,
norms,
goal,
)
self.add_behavior(self._send_to_llm(str(message_text), str(norms), str(goal)))
yield
@self.actions.add(".notify_norms", 1)
def _notify_norms(agent, term, intention):
norms = agentspeak.grounded(term.args[0], intention.scope)
norm_update_message = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
thread="active_norms_update",
body=str(norms),
)
self.add_behavior(self.send(norm_update_message, should_log=False))
yield
@self.actions.add(".say", 1)
def _say(agent, term, intention):
"""
@@ -375,6 +402,8 @@ class BDICoreAgent(BaseAgent):
body=str(message_text),
)
experiment_logger.chat(str(message_text), extra={"role": "assistant"})
self.add_behavior(self.send(chat_history_message))
yield
@@ -459,7 +488,6 @@ class BDICoreAgent(BaseAgent):
body=str(trigger_name),
)
# TODO: check with Pim
self.add_behavior(self.send(msg))
yield

View File

@@ -1,10 +1,18 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import json
import logging
import zmq
from pydantic import ValidationError
from zmq.asyncio import Context
import control_backend
from control_backend.agents import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.config import settings
@@ -19,17 +27,21 @@ from control_backend.schemas.program import (
Program,
)
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class BDIProgramManager(BaseAgent):
"""
BDI Program Manager Agent.
This agent is responsible for receiving high-level programs (sequences of instructions/goals)
from the external HTTP API (via ZMQ) and translating them into core beliefs (norms and goals)
for the BDI Core Agent. In the future, it will be responsible for determining when goals are
met, and passing on new norms and goals accordingly.
from the external HTTP API (via ZMQ), transforming it into an AgentSpeak program, sharing the
program and its components to other agents, and keeping agents informed of the current state.
:ivar sub_socket: The ZMQ SUB socket used to receive program updates.
:ivar _program: The current Program.
:ivar _phase: The current Phase.
:ivar _goal_mapping: A mapping of goal IDs to goals.
"""
_program: Program
@@ -38,10 +50,32 @@ class BDIProgramManager(BaseAgent):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sub_socket = None
self._goal_mapping: dict[str, Goal] = {}
def _initialize_internal_state(self, program: Program):
"""
Initialize the state of the program manager given a new Program. Reset the tracking of the
current phase to the first phase, make a mapping of goal IDs to goals, used during the life
of the program.
:param program: The new program.
"""
self._program = program
self._phase = program.phases[0] # start in first phase
self._goal_mapping = {}
for phase in program.phases:
for goal in phase.goals:
self._populate_goal_mapping_with_goal(goal)
def _populate_goal_mapping_with_goal(self, goal: Goal):
"""
Recurse through the given goal and its subgoals and add all goals found to the
``self._goal_mapping``.
:param goal: The goal to add to the ``self._goal_mapping``, including subgoals.
"""
self._goal_mapping[str(goal.id)] = goal
for step in goal.plan.steps:
if isinstance(step, Goal):
self._populate_goal_mapping_with_goal(step)
async def _create_agentspeak_and_send_to_bdi(self, program: Program):
"""
@@ -53,7 +87,7 @@ class BDIProgramManager(BaseAgent):
asl_str = asg.generate(program)
file_name = "src/control_backend/agents/bdi/agentspeak.asl"
file_name = settings.behaviour_settings.agentspeak_file
with open(file_name, "w") as f:
f.write(asl_str)
@@ -73,12 +107,39 @@ class BDIProgramManager(BaseAgent):
phases = json.loads(msg.body)
await self._transition_phase(phases["old"], phases["new"])
case "achieve_goal":
goal_id = msg.body
await self._send_achieved_goal_to_semantic_belief_extractor(goal_id)
async def _transition_phase(self, old: str, new: str):
assert old == str(self._phase.id)
"""
When receiving a signal from the BDI core that the phase has changed, apply this change to
the current state and inform other agents about the change.
:param old: The ID of the old phase.
:param new: The ID of the new phase.
"""
if self._phase is None:
return
if old != str(self._phase.id):
self.logger.warning(
f"Phase transition desync detected! ASL requested move from '{old}', "
f"but Python is currently in '{self._phase.id}'. Request ignored."
)
return
if new == "end":
self._phase = None
# Notify user interaction agent
msg = InternalMessage(
to=settings.agent_settings.user_interrupt_name,
thread="transition_phase",
body="end",
)
self.logger.info("Transitioned to end phase, notifying UserInterruptAgent.")
self.add_behavior(self.send(msg))
return
for phase in self._program.phases:
@@ -94,10 +155,18 @@ class BDIProgramManager(BaseAgent):
thread="transition_phase",
body=str(self._phase.id),
)
self.logger.info(f"Transitioned to phase {new}, notifying UserInterruptAgent.")
self.add_behavior(self.send(msg))
def _extract_current_beliefs(self) -> list[Belief]:
"""Extract beliefs from the current phase."""
assert self._phase is not None, (
"Invalid state, no phase set. Call this method only when "
"a program has been received and the end-phase has not "
"been reached."
)
beliefs: list[Belief] = []
for norm in self._phase.norms:
@@ -111,6 +180,7 @@ class BDIProgramManager(BaseAgent):
@staticmethod
def _extract_beliefs_from_belief(belief: Belief) -> list[Belief]:
"""Recursively extract beliefs from the given belief."""
if isinstance(belief, InferredBelief):
return BDIProgramManager._extract_beliefs_from_belief(
belief.left
@@ -118,9 +188,7 @@ class BDIProgramManager(BaseAgent):
return [belief]
async def _send_beliefs_to_semantic_belief_extractor(self):
"""
Extract beliefs from the program and send them to the Semantic Belief Extractor Agent.
"""
"""Extract beliefs from the program and send them to the Semantic Belief Extractor Agent."""
beliefs = BeliefList(beliefs=self._extract_current_beliefs())
message = InternalMessage(
@@ -132,23 +200,35 @@ class BDIProgramManager(BaseAgent):
await self.send(message)
@staticmethod
def _extract_goals_from_goal(goal: Goal) -> list[Goal]:
"""
Extract all goals from a given goal, that is: the goal itself and any subgoals.
:return: All goals within and including the given goal.
"""
goals: list[Goal] = [goal]
for step in goal.plan.steps:
if isinstance(step, Goal):
goals.extend(BDIProgramManager._extract_goals_from_goal(step))
return goals
def _extract_current_goals(self) -> list[Goal]:
"""
Extract all goals from the program, including subgoals.
:return: A list of Goal objects.
"""
assert self._phase is not None, (
"Invalid state, no phase set. Call this method only when "
"a program has been received and the end-phase has not "
"been reached."
)
goals: list[Goal] = []
def extract_goals_from_goal(goal_: Goal) -> list[Goal]:
goals_: list[Goal] = [goal]
for plan in goal_.plan:
if isinstance(plan, Goal):
goals_.extend(extract_goals_from_goal(plan))
return goals_
for goal in self._phase.goals:
goals.extend(extract_goals_from_goal(goal))
goals.extend(self._extract_goals_from_goal(goal))
return goals
@@ -167,6 +247,25 @@ class BDIProgramManager(BaseAgent):
await self.send(message)
async def _send_achieved_goal_to_semantic_belief_extractor(self, achieved_goal_id: str):
"""
Inform the semantic belief extractor when a goal is marked achieved.
:param achieved_goal_id: The id of the achieved goal.
"""
goal = self._goal_mapping.get(achieved_goal_id)
if goal is None:
self.logger.debug(f"Goal with ID {achieved_goal_id} marked achieved but was not found.")
return
goals = self._extract_goals_from_goal(goal)
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
body=GoalList(goals=goals).model_dump_json(),
thread="achieved_goals",
)
await self.send(message)
async def _send_clear_llm_history(self):
"""
Clear the LLM Agent's conversation history.
@@ -188,6 +287,18 @@ class BDIProgramManager(BaseAgent):
await self.send(extractor_msg)
self.logger.debug("Sent message to extractor agent to clear history.")
@staticmethod
def _rollover_experiment_logs():
"""
A new experiment program started; make a new experiment log file.
"""
handlers = logging.getLogger(settings.logging_settings.experiment_logger_name).handlers
for handler in handlers:
if isinstance(handler, control_backend.logging.DatedFileHandler):
experiment_logger.action("Doing rollover...")
handler.do_rollover()
experiment_logger.debug("Finished rollover.")
async def _receive_programs(self):
"""
Continuous loop that receives program updates from the HTTP endpoint.
@@ -206,8 +317,9 @@ class BDIProgramManager(BaseAgent):
continue
self._initialize_internal_state(program)
await self._send_program_to_user_interrupt(program)
await self._send_clear_llm_history()
self._rollover_experiment_logs()
await asyncio.gather(
self._create_agentspeak_and_send_to_bdi(program),
@@ -215,13 +327,30 @@ class BDIProgramManager(BaseAgent):
self._send_goals_to_semantic_belief_extractor(),
)
async def _send_program_to_user_interrupt(self, program: Program):
"""
Send the received program to the User Interrupt Agent.
:param program: The program object received from the API.
"""
msg = InternalMessage(
sender=self.name,
to=settings.agent_settings.user_interrupt_name,
body=program.model_dump_json(),
thread="new_program",
)
await self.send(msg)
async def setup(self):
"""
Initialize the agent.
Connects the internal ZMQ SUB socket and subscribes to the 'program' topic.
Starts the background behavior to receive programs.
Starts the background behavior to receive programs. Initializes a default program.
"""
await self._create_agentspeak_and_send_to_bdi(Program(phases=[]))
context = Context.instance()
self.sub_socket = context.socket(zmq.SUB)

View File

@@ -1,152 +0,0 @@
import json
from pydantic import ValidationError
from control_backend.agents.base import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage
class BDIBeliefCollectorAgent(BaseAgent):
"""
BDI Belief Collector Agent.
This agent acts as a central aggregator for beliefs derived from various sources (e.g., text,
emotion, vision). It receives raw extracted data from other agents,
normalizes them into valid :class:`Belief` objects, and forwards them as a unified packet to the
BDI Core Agent.
It serves as a funnel to ensure the BDI agent receives a consistent stream of beliefs.
"""
async def setup(self):
"""
Initialize the agent.
"""
self.logger.info("Setting up %s", self.name)
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages from other extractor agents.
Routes the message to specific handlers based on the 'type' field in the JSON body.
Supported types:
- ``belief_extraction_text``: Handled by :meth:`_handle_belief_text`
- ``emotion_extraction_text``: Handled by :meth:`_handle_emo_text`
:param msg: The received internal message.
"""
sender_node = msg.sender
# Parse JSON payload
try:
payload = json.loads(msg.body)
except Exception as e:
self.logger.warning(
"BeliefCollector: failed to parse JSON from %s. Body=%r Error=%s",
sender_node,
msg.body,
e,
)
return
msg_type = payload.get("type")
# Prefer explicit 'type' field
if msg_type == "belief_extraction_text":
self.logger.debug("Message routed to _handle_belief_text (sender=%s)", sender_node)
await self._handle_belief_text(payload, sender_node)
# This is not implemented yet, but we keep the structure for future use
elif msg_type == "emotion_extraction_text":
self.logger.debug("Message routed to _handle_emo_text (sender=%s)", sender_node)
await self._handle_emo_text(payload, sender_node)
else:
self.logger.warning(
"Unrecognized message (sender=%s, type=%r). Ignoring.", sender_node, msg_type
)
async def _handle_belief_text(self, payload: dict, origin: str):
"""
Process text-based belief extraction payloads.
Expected payload format::
{
"type": "belief_extraction_text",
"beliefs": {
"user_said": ["Can you help me?"],
"intention": ["ask_help"]
}
}
Validates and converts the dictionary items into :class:`Belief` objects.
:param payload: The dictionary payload containing belief data.
:param origin: The name of the sender agent.
"""
beliefs = payload.get("beliefs", {})
if not beliefs:
self.logger.debug("Received empty beliefs set.")
return
def try_create_belief(name, arguments) -> Belief | None:
"""
Create a belief object from name and arguments, or return None silently if the input is
not correct.
:param name: The name of the belief.
:param arguments: The arguments of the belief.
:return: A Belief object if the input is valid or None.
"""
try:
return Belief(name=name, arguments=arguments)
except ValidationError:
return None
beliefs = [
belief
for name, arguments in beliefs.items()
if (belief := try_create_belief(name, arguments)) is not None
]
self.logger.debug("Forwarding %d beliefs.", len(beliefs))
for belief in beliefs:
for argument in belief.arguments:
self.logger.debug(" - %s %s", belief.name, argument)
await self._send_beliefs_to_bdi(beliefs, origin=origin)
async def _handle_emo_text(self, payload: dict, origin: str):
"""
Process emotion extraction payloads.
**TODO**: Implement this method once emotion recognition is integrated.
:param payload: The dictionary payload containing emotion data.
:param origin: The name of the sender agent.
"""
pass
async def _send_beliefs_to_bdi(self, beliefs: list[Belief], origin: str | None = None):
"""
Send a list of aggregated beliefs to the BDI Core Agent.
Wraps the beliefs in a :class:`BeliefMessage` and sends it via the 'beliefs' thread.
:param beliefs: The list of Belief objects to send.
:param origin: (Optional) The original source of the beliefs (unused currently).
"""
if not beliefs:
return
msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=BeliefMessage(create=beliefs).model_dump_json(),
thread="beliefs",
)
await self.send(msg)
self.logger.info("Sent %d belief(s) to BDI core.", len(beliefs))

View File

@@ -1,6 +1,38 @@
norms("").
//This program has been developed by students from the bachelor Computer Science at Utrecht
//University within the Software Project course.
//© Copyright Utrecht University (Department of Information and Computing Sciences)
+user_said(Message) : norms(Norms) <-
.notify_user_said(Message);
-user_said(Message);
.reply(Message, Norms).
phase("end").
keyword_said(Keyword) :- (user_said(Message) & .substring(Keyword, Message, Pos)) & (Pos >= 0).
+!reply_with_goal(Goal)
: user_said(Message)
<- +responded_this_turn;
.findall(Norm, norm(Norm), Norms);
.reply_with_goal(Message, Norms, Goal).
+!say(Text)
<- +responded_this_turn;
.say(Text).
+!reply
: user_said(Message)
<- +responded_this_turn;
.findall(Norm, norm(Norm), Norms);
.reply(Message, Norms).
+!notify_cycle
<- .notify_ui;
.wait(1).
+user_said(Message)
: phase("end")
<- .notify_user_said(Message);
!reply.
+!check_triggers
<- true.
+!transition_phase
<- true.

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import json
@@ -12,12 +18,18 @@ from control_backend.schemas.belief_list import BeliefList, GoalList
from control_backend.schemas.belief_message import Belief as InternalBelief
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.chat_history import ChatHistory, ChatMessage
from control_backend.schemas.program import Goal, SemanticBelief
from control_backend.schemas.program import BaseGoal, SemanticBelief
type JSONLike = None | bool | int | float | str | list["JSONLike"] | dict[str, "JSONLike"]
class BeliefState(BaseModel):
"""
Represents the state of inferred semantic beliefs.
Maintains sets of beliefs that are currently considered true or false.
"""
true: set[InternalBelief] = set()
false: set[InternalBelief] = set()
@@ -62,6 +74,7 @@ class TextBeliefExtractorAgent(BaseAgent):
self.goal_inferrer = GoalAchievementInferrer(self._llm)
self._current_beliefs = BeliefState()
self._current_goal_completions: dict[str, bool] = {}
self._force_completed_goals: set[BaseGoal] = set()
self.conversation = ChatHistory(messages=[])
async def setup(self):
@@ -118,13 +131,19 @@ class TextBeliefExtractorAgent(BaseAgent):
case "goals":
self._handle_goals_message(msg)
await self._infer_goal_completions()
case "achieved_goals":
self._handle_goal_achieved_message(msg)
case "conversation_history":
if msg.body == "reset":
self._reset()
self._reset_phase()
case _:
self.logger.warning("Received unexpected message from %s", msg.sender)
def _reset(self):
def _reset_phase(self):
"""
Delete all state about the current phase, such as what beliefs exist and which ones are
true.
"""
self.conversation = ChatHistory(messages=[])
self.belief_inferrer.available_beliefs.clear()
self._current_beliefs = BeliefState()
@@ -132,6 +151,11 @@ class TextBeliefExtractorAgent(BaseAgent):
self._current_goal_completions = {}
def _handle_beliefs_message(self, msg: InternalMessage):
"""
Handle the message from the Program Manager agent containing the beliefs that exist for this
phase.
:param msg: A list of beliefs.
"""
try:
belief_list = BeliefList.model_validate_json(msg.body)
except ValidationError:
@@ -149,6 +173,11 @@ class TextBeliefExtractorAgent(BaseAgent):
)
def _handle_goals_message(self, msg: InternalMessage):
"""
Handle the message from the Program Manager agent containing the goals that exist for this
phase.
:param msg: A list of goals.
"""
try:
goals_list = GoalList.model_validate_json(msg.body)
except ValidationError:
@@ -158,7 +187,8 @@ class TextBeliefExtractorAgent(BaseAgent):
return
# Use only goals that can fail, as the others are always assumed to be completed
available_goals = [g for g in goals_list.goals if g.can_fail]
available_goals = {g for g in goals_list.goals if g.can_fail}
available_goals -= self._force_completed_goals
self.goal_inferrer.goals = available_goals
self.logger.debug(
"Received %d failable goals from the program manager: %s",
@@ -166,6 +196,28 @@ class TextBeliefExtractorAgent(BaseAgent):
", ".join(g.name for g in available_goals),
)
def _handle_goal_achieved_message(self, msg: InternalMessage):
"""
Handle message that gets sent when goals are marked achieved from a user interrupt. This
goal should then not be changed by this agent anymore.
:param msg: List of goals that are marked achieved.
"""
# NOTE: When goals can be marked unachieved, remember to re-add them to the goal_inferrer
try:
goals_list = GoalList.model_validate_json(msg.body)
except ValidationError:
self.logger.warning(
"Received goal achieved message from the program manager, "
"but it is not a valid list of goals."
)
return
for goal in goals_list.goals:
self._force_completed_goals.add(goal)
self._current_goal_completions[f"achieved_{AgentSpeakGenerator.slugify(goal)}"] = True
self.goal_inferrer.goals -= self._force_completed_goals
async def _user_said(self, text: str):
"""
Create a belief for the user's full speech.
@@ -183,6 +235,10 @@ class TextBeliefExtractorAgent(BaseAgent):
await self.send(belief_msg)
async def _infer_new_beliefs(self):
"""
Determine which beliefs hold and do not hold for the current conversation state. When
beliefs change, a message is sent to the BDI core.
"""
conversation_beliefs = await self.belief_inferrer.infer_from_conversation(self.conversation)
new_beliefs = conversation_beliefs - self._current_beliefs
@@ -206,6 +262,10 @@ class TextBeliefExtractorAgent(BaseAgent):
await self.send(message)
async def _infer_goal_completions(self):
"""
Determine which goals have been achieved given the current conversation state. When
a goal's achieved state changes, a message is sent to the BDI core.
"""
goal_completions = await self.goal_inferrer.infer_from_conversation(self.conversation)
new_achieved = [
@@ -291,6 +351,9 @@ class TextBeliefExtractorAgent(BaseAgent):
async with httpx.AsyncClient() as client:
response = await client.post(
settings.llm_settings.local_llm_url,
headers={"Authorization": f"Bearer {settings.llm_settings.api_key}"}
if settings.llm_settings.api_key
else {},
json={
"model": settings.llm_settings.local_llm_model,
"messages": [{"role": "user", "content": prompt}],
@@ -317,7 +380,7 @@ class TextBeliefExtractorAgent(BaseAgent):
class SemanticBeliefInferrer:
"""
Class that handles only prompting an LLM for semantic beliefs.
Infers semantic beliefs from conversation history using an LLM.
"""
def __init__(
@@ -347,19 +410,22 @@ class SemanticBeliefInferrer:
for beliefs in self._split_into_chunks(self.available_beliefs, n_parallel)
]
)
retval = BeliefState()
new_beliefs = BeliefState()
# Collect beliefs from all parallel calls
for beliefs in all_beliefs:
if beliefs is None:
continue
# For each, convert them to InternalBeliefs
for belief_name, belief_holds in beliefs.items():
# Skip beliefs that were marked not possible to determine
if belief_holds is None:
continue
belief = InternalBelief(name=belief_name, arguments=None)
if belief_holds:
retval.true.add(belief)
new_beliefs.true.add(belief)
else:
retval.false.add(belief)
return retval
new_beliefs.false.add(belief)
return new_beliefs
@staticmethod
def _split_into_chunks[T](items: list[T], n: int) -> list[list[T]]:
@@ -443,9 +509,13 @@ Respond with a JSON similar to the following, but with the property names as giv
class GoalAchievementInferrer(SemanticBeliefInferrer):
"""
Infers whether specific conversational goals have been achieved using an LLM.
"""
def __init__(self, llm: TextBeliefExtractorAgent.LLM):
super().__init__(llm)
self.goals = []
self.goals: set[BaseGoal] = set()
async def infer_from_conversation(self, conversation: ChatHistory) -> dict[str, bool]:
"""
@@ -465,7 +535,7 @@ class GoalAchievementInferrer(SemanticBeliefInferrer):
for goal, achieved in zip(self.goals, goals_achieved, strict=True)
}
async def _infer_goal(self, conversation: ChatHistory, goal: Goal) -> bool:
async def _infer_goal(self, conversation: ChatHistory, goal: BaseGoal) -> bool:
prompt = f"""{self._format_conversation(conversation)}
Given the above conversation, what has the following goal been achieved?

View File

@@ -1 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
--------------------------------------------------------------------------------
Agents responsible for external communication and service discovery.
"""
from .ri_communication_agent import RICommunicationAgent as RICommunicationAgent

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import json
@@ -7,10 +13,13 @@ from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.agents.actuation.robot_gesture_agent import RobotGestureAgent
from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognition_agent import ( # noqa
VisualEmotionRecognitionAgent,
)
from control_backend.core.config import settings
from ..actuation.robot_speech_agent import RobotSpeechAgent
from ..perception import VADAgent
from ..perception import FacePerceptionAgent, VADAgent
class RICommunicationAgent(BaseAgent):
@@ -47,6 +56,9 @@ class RICommunicationAgent(BaseAgent):
self._req_socket: azmq.Socket | None = None
self.pub_socket: azmq.Socket | None = None
self.connected = False
self.gesture_agent: RobotGestureAgent | None = None
self.speech_agent: RobotSpeechAgent | None = None
self.visual_emotion_recognition_agent: VisualEmotionRecognitionAgent | None = None
async def setup(self):
"""
@@ -140,6 +152,7 @@ class RICommunicationAgent(BaseAgent):
# At this point, we have a valid response
try:
self.logger.debug("Negotiation successful.")
await self._handle_negotiation_response(received_message)
# Let UI know that we're connected
topic = b"ping"
@@ -168,7 +181,7 @@ class RICommunicationAgent(BaseAgent):
bind = port_data["bind"]
if not bind:
addr = f"tcp://{settings.ri_host}:{port}"
addr = f"tcp://localhost:{port}"
else:
addr = f"tcp://*:{port}"
@@ -188,6 +201,7 @@ class RICommunicationAgent(BaseAgent):
address=addr,
bind=bind,
)
self.speech_agent = robot_speech_agent
robot_gesture_agent = RobotGestureAgent(
settings.agent_settings.robot_gesture_name,
address=addr,
@@ -195,12 +209,28 @@ class RICommunicationAgent(BaseAgent):
gesture_data=gesture_data,
single_gesture_data=single_gesture_data,
)
self.gesture_agent = robot_gesture_agent
await robot_speech_agent.start()
await asyncio.sleep(0.1) # Small delay
await robot_gesture_agent.start()
case "audio":
vad_agent = VADAgent(audio_in_address=addr, audio_in_bind=bind)
await vad_agent.start()
case "video":
visual_emotion_agent = VisualEmotionRecognitionAgent(
settings.agent_settings.visual_emotion_recognition_name,
socket_address=addr,
bind=bind,
)
self.visual_emotion_recognition_agent = visual_emotion_agent
await visual_emotion_agent.start()
case "face":
face_agent = FacePerceptionAgent(
settings.agent_settings.face_agent_name,
zmq_address=addr,
zmq_bind=bind,
)
await face_agent.start()
case _:
self.logger.warning("Unhandled negotiation id: %s", id)
@@ -225,6 +255,7 @@ class RICommunicationAgent(BaseAgent):
while self._running:
if not self.connected:
await asyncio.sleep(settings.behaviour_settings.sleep_s)
self.logger.debug("Not connected, skipping ping loop iteration.")
continue
# We need to listen and send pings.
@@ -289,13 +320,28 @@ class RICommunicationAgent(BaseAgent):
# Tell UI we're disconnected.
topic = b"ping"
data = json.dumps(False).encode()
self.logger.debug("1")
if self.pub_socket:
try:
self.logger.debug("2")
await asyncio.wait_for(self.pub_socket.send_multipart([topic, data]), 5)
except TimeoutError:
self.logger.debug("3")
self.logger.warning("Connection ping for router timed out.")
# Try to reboot/renegotiate
if self.gesture_agent is not None:
await self.gesture_agent.stop()
if self.speech_agent is not None:
await self.speech_agent.stop()
if self.visual_emotion_recognition_agent is not None:
await self.visual_emotion_recognition_agent.stop()
if self.pub_socket is not None:
self.pub_socket.close()
self.logger.debug("Restarting communication negotiation.")
if await self._negotiate_connection(max_retries=1):
if await self._negotiate_connection(max_retries=2):
self.connected = True

View File

@@ -1 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
--------------------------------------------------------------------------------
Agents that interface with Large Language Models for natural language processing and generation.
"""
from .llm_agent import LLMAgent as LLMAgent

View File

@@ -1,4 +1,12 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import json
import logging
import re
import uuid
from collections.abc import AsyncGenerator
@@ -13,6 +21,8 @@ from control_backend.core.config import settings
from ...schemas.llm_prompt_message import LLMPromptMessage
from .llm_instructions import LLMInstructions
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class LLMAgent(BaseAgent):
"""
@@ -32,6 +42,10 @@ class LLMAgent(BaseAgent):
def __init__(self, name: str):
super().__init__(name)
self.history = []
self._querying = False
self._interrupted = False
self._interrupted_message = ""
self._go_ahead = asyncio.Event()
async def setup(self):
self.logger.info("Setting up %s.", self.name)
@@ -50,13 +64,13 @@ class LLMAgent(BaseAgent):
case "prompt_message":
try:
prompt_message = LLMPromptMessage.model_validate_json(msg.body)
await self._process_bdi_message(prompt_message)
self.add_behavior(self._process_bdi_message(prompt_message)) # no block
except ValidationError:
self.logger.debug("Prompt message from BDI core is invalid.")
case "assistant_message":
self.history.append({"role": "assistant", "content": msg.body})
self._apply_conversation_message({"role": "assistant", "content": msg.body})
case "user_message":
self.history.append({"role": "user", "content": msg.body})
self._apply_conversation_message({"role": "user", "content": msg.body})
elif msg.sender == settings.agent_settings.bdi_program_manager_name:
if msg.body == "clear_history":
self.logger.debug("Clearing conversation history.")
@@ -73,12 +87,45 @@ class LLMAgent(BaseAgent):
:param message: The parsed prompt message containing text, norms, and goals.
"""
if self._querying:
self.logger.debug("Received another BDI prompt while processing previous message.")
self._interrupted = True # interrupt the previous processing
await self._go_ahead.wait() # wait until we get the go-ahead
message.text = f"{self._interrupted_message} {message.text}"
self._go_ahead.clear()
self._querying = True
full_message = ""
async for chunk in self._query_llm(message.text, message.norms, message.goals):
if self._interrupted:
self._interrupted_message = message.text
self.logger.debug("Interrupted processing of previous message.")
break
await self._send_reply(chunk)
full_message += chunk
self.logger.debug("Finished processing BDI message. Response sent in chunks to BDI core.")
await self._send_full_reply(full_message)
else:
self._querying = False
self._apply_conversation_message(
{
"role": "assistant",
"content": full_message,
}
)
self.logger.debug(
"Finished processing BDI message. Response sent in chunks to BDI core."
)
await self._send_full_reply(full_message)
self._go_ahead.set()
self._interrupted = False
def _apply_conversation_message(self, message: dict[str, str]):
if len(self.history) > 0 and message["role"] == self.history[-1]["role"]:
self.history[-1]["content"] += " " + message["content"]
return
self.history.append(message)
async def _send_reply(self, msg: str):
"""
@@ -132,7 +179,7 @@ class LLMAgent(BaseAgent):
*self.history,
]
message_id = str(uuid.uuid4()) # noqa
message_id = str(uuid.uuid4())
try:
full_message = ""
@@ -141,10 +188,9 @@ class LLMAgent(BaseAgent):
full_message += token
current_chunk += token
self.logger.llm(
"Received token: %s",
experiment_logger.chat(
full_message,
extra={"reference": message_id}, # Used in the UI to update old logs
extra={"role": "assistant", "reference": message_id, "partial": True},
)
# Stream the message in chunks separated by punctuation.
@@ -160,11 +206,9 @@ class LLMAgent(BaseAgent):
if current_chunk:
yield current_chunk
self.history.append(
{
"role": "assistant",
"content": full_message,
}
experiment_logger.chat(
full_message,
extra={"role": "assistant", "reference": message_id, "partial": False},
)
except httpx.HTTPError as err:
self.logger.error("HTTP error.", exc_info=err)
@@ -185,6 +229,9 @@ class LLMAgent(BaseAgent):
async with client.stream(
"POST",
settings.llm_settings.local_llm_url,
headers={"Authorization": f"Bearer {settings.llm_settings.api_key}"}
if settings.llm_settings.api_key
else {},
json={
"model": settings.llm_settings.local_llm_model,
"messages": messages,

View File

@@ -1,3 +1,10 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
class LLMInstructions:
"""
Helper class to construct the system instructions for the LLM.

View File

@@ -1,3 +1,13 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
--------------------------------------------------------------------------------
Agents responsible for processing sensory input, such as audio transcription and voice activity
detection.
"""
from .face_rec_agent import FacePerceptionAgent as FacePerceptionAgent
from .transcription_agent.transcription_agent import (
TranscriptionAgent as TranscriptionAgent,
)

View File

@@ -0,0 +1,144 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import zmq
import zmq.asyncio as azmq
from control_backend.agents import BaseAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief, BeliefMessage
class FacePerceptionAgent(BaseAgent):
"""
Receives face presence updates from the RICommunicationAgent
via the internal PUB/SUB bus.
"""
def __init__(self, name: str, zmq_address: str, zmq_bind: bool):
"""
:param name: The name of the agent.
:param zmq_address: The ZMQ address to subscribe to, an endpoint which sends face presence
updates.
:param zmq_bind: Whether to connect to the ZMQ endpoint, or to bind.
"""
super().__init__(name)
self._zmq_address = zmq_address
self._zmq_bind = zmq_bind
self._socket: azmq.Socket | None = None
self._last_face_state: bool | None = None
# Pause functionality
# NOTE: flag is set when running, cleared when paused
self._paused = asyncio.Event()
self._paused.set()
async def setup(self):
self.logger.info("Starting FacePerceptionAgent")
if self._socket is None:
self._connect_socket()
self.add_behavior(self._poll_loop())
self.logger.info("Finished setting up %s", self.name)
def _connect_socket(self):
if self._socket is not None:
self.logger.warning("ZMQ socket already initialized. Did you call setup() twice?")
return
self._socket = azmq.Context.instance().socket(zmq.SUB)
self._socket.setsockopt_string(zmq.SUBSCRIBE, "")
if self._zmq_bind:
self._socket.bind(self._zmq_address)
else:
self._socket.connect(self._zmq_address)
async def _poll_loop(self):
if self._socket is None:
self.logger.warning("Connection not initialized before poll loop. Call setup() first.")
return
while self._running:
try:
await self._paused.wait()
response = await asyncio.wait_for(
self._socket.recv_json(), timeout=settings.behaviour_settings.sleep_s
)
face_present = response.get("face_detected", False)
if self._last_face_state is None:
self._last_face_state = face_present
continue
if face_present != self._last_face_state:
self._last_face_state = face_present
self.logger.debug("Face detected" if face_present else "Face lost")
await self._update_face_belief(face_present)
except TimeoutError:
pass
except Exception as e:
self.logger.error("Face polling failed", exc_info=e)
async def _post_face_belief(self, present: bool):
"""
Send a face_present belief update to the BDI Core Agent.
"""
if present:
belief_msg = BeliefMessage(create=[{"name": "face_present", "arguments": []}])
else:
belief_msg = BeliefMessage(delete=[{"name": "face_present", "arguments": []}])
msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
thread="beliefs",
body=belief_msg.model_dump_json(),
)
await self.send(msg)
async def _update_face_belief(self, present: bool):
"""
Add or remove the `face_present` belief in the BDI Core Agent.
"""
if present:
payload = BeliefMessage(create=[Belief(name="face_present").model_dump()])
else:
payload = BeliefMessage(delete=[Belief(name="face_present").model_dump()])
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
thread="beliefs",
body=payload.model_dump_json(),
)
await self.send(message)
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming pause/resume commands from User Interrupt Agent.
"""
sender = msg.sender
if sender == settings.agent_settings.user_interrupt_name:
if msg.body == "PAUSE":
self.logger.info("Pausing Face Perception processing.")
self._paused.clear()
self._last_face_state = None
elif msg.body == "RESUME":
self.logger.info("Resuming Face Perception processing.")
self._paused.set()
else:
self.logger.warning("Unknown command from User Interrupt Agent: %s", msg.body)
else:
self.logger.debug("Ignoring message from unknown sender: %s", sender)

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import abc
import sys
@@ -145,4 +151,6 @@ class OpenAIWhisperSpeechRecognizer(SpeechRecognizer):
def recognize_speech(self, audio: np.ndarray) -> str:
self.load_model()
return whisper.transcribe(self.model, audio, **self._get_decode_options(audio))["text"]
return whisper.transcribe(self.model, audio, **self._get_decode_options(audio))[
"text"
].strip()

View File

@@ -1,4 +1,11 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import logging
import numpy as np
import zmq
@@ -10,6 +17,8 @@ from control_backend.core.config import settings
from .speech_recognizer import SpeechRecognizer
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class TranscriptionAgent(BaseAgent):
"""
@@ -25,6 +34,8 @@ class TranscriptionAgent(BaseAgent):
:ivar audio_in_socket: The ZMQ SUB socket instance.
:ivar speech_recognizer: The speech recognition engine instance.
:ivar _concurrency: Semaphore to limit concurrent transcriptions.
:ivar _current_speech_reference: The reference of the current user utterance, for synchronising
experiment logs.
"""
def __init__(self, audio_in_address: str):
@@ -39,6 +50,7 @@ class TranscriptionAgent(BaseAgent):
self.audio_in_socket: azmq.Socket | None = None
self.speech_recognizer = None
self._concurrency = None
self._current_speech_reference: str | None = None
async def setup(self):
"""
@@ -63,6 +75,10 @@ class TranscriptionAgent(BaseAgent):
self.logger.info("Finished setting up %s", self.name)
async def handle_message(self, msg: InternalMessage):
if msg.thread == "voice_activity":
self._current_speech_reference = msg.body
async def stop(self):
"""
Stop the agent and close sockets.
@@ -74,7 +90,7 @@ class TranscriptionAgent(BaseAgent):
def _connect_audio_in_socket(self):
"""
Helper to connect the ZMQ SUB socket for audio input.
Connects the ZMQ SUB socket for receiving audio data.
"""
self.audio_in_socket = azmq.Context.instance().socket(zmq.SUB)
self.audio_in_socket.setsockopt_string(zmq.SUBSCRIBE, "")
@@ -96,24 +112,25 @@ class TranscriptionAgent(BaseAgent):
async def _share_transcription(self, transcription: str):
"""
Share a transcription to the other agents that depend on it.
Share a transcription to the other agents that depend on it, and to experiment logs.
Currently sends to:
- :attr:`settings.agent_settings.text_belief_extractor_name`
- The UI via the experiment logger
:param transcription: The transcribed text.
"""
receiver_names = [
settings.agent_settings.text_belief_extractor_name,
]
experiment_logger.chat(
transcription,
extra={"role": "user", "reference": self._current_speech_reference, "partial": False},
)
for receiver_name in receiver_names:
message = InternalMessage(
to=receiver_name,
sender=self.name,
body=transcription,
)
await self.send(message)
message = InternalMessage(
to=settings.agent_settings.text_belief_extractor_name,
sender=self.name,
body=transcription,
)
await self.send(message)
async def _transcribing_loop(self) -> None:
"""
@@ -129,10 +146,9 @@ class TranscriptionAgent(BaseAgent):
audio = np.frombuffer(audio_data, dtype=np.float32)
speech = await self._transcribe(audio)
if not speech:
self.logger.info("Nothing transcribed.")
self.logger.debug("Nothing transcribed.")
continue
self.logger.info("Transcribed speech: %s", speech)
await self._share_transcription(speech)
except Exception as e:
self.logger.error(f"Error in transcription loop: {e}")

View File

@@ -1,4 +1,12 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import logging
import uuid
import numpy as np
import torch
@@ -7,10 +15,13 @@ import zmq.asyncio as azmq
from control_backend.agents import BaseAgent
from control_backend.core.config import settings
from control_backend.schemas.internal_message import InternalMessage
from ...schemas.program_status import PROGRAM_STATUS, ProgramStatus
from .transcription_agent.transcription_agent import TranscriptionAgent
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class SocketPoller[T]:
"""
@@ -86,6 +97,12 @@ class VADAgent(BaseAgent):
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._ready = asyncio.Event()
# Pause control
self._reset_needed = False
self._paused = asyncio.Event()
self._paused.set() # Not paused at start
self.model = None
async def setup(self):
@@ -213,6 +230,16 @@ class VADAgent(BaseAgent):
"""
await self._ready.wait()
while self._running:
await self._paused.wait()
# After being unpaused, reset stream and buffers
if self._reset_needed:
self.logger.debug("Resuming: resetting stream and buffers.")
await self._reset_stream()
self.audio_buffer = np.array([], dtype=np.float32)
self.i_since_speech = settings.behaviour_settings.vad_initial_since_speech
self._reset_needed = False
assert self.audio_in_poller is not None
data = await self.audio_in_poller.poll()
if data is None:
@@ -235,6 +262,18 @@ class VADAgent(BaseAgent):
if prob > prob_threshold:
if self.i_since_speech > non_speech_patience + begin_silence_length:
self.logger.debug("Speech started.")
reference = str(uuid.uuid4())
experiment_logger.chat(
"...",
extra={"role": "user", "reference": reference, "partial": True},
)
await self.send(
InternalMessage(
to=settings.agent_settings.transcription_name,
body=reference,
thread="voice_activity",
)
)
self.audio_buffer = np.append(self.audio_buffer, chunk)
self.i_since_speech = 0
continue
@@ -256,3 +295,27 @@ class VADAgent(BaseAgent):
# Prepend the last few chunks that had no speech, for a more fluent boundary.
self.audio_buffer = np.append(self.audio_buffer, chunk)
self.audio_buffer = self.audio_buffer[-begin_silence_length * len(chunk) :]
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages.
Expects messages to pause or resume the VAD processing from User Interrupt Agent.
:param msg: The received internal message.
"""
sender = msg.sender
if sender == settings.agent_settings.user_interrupt_name:
if msg.body == "PAUSE":
self.logger.info("Pausing VAD processing.")
self._paused.clear()
# If the robot needs to pick up speaking where it left off, do not set _reset_needed
self._reset_needed = True
elif msg.body == "RESUME":
self.logger.info("Resuming VAD processing.")
self._paused.set()
else:
self.logger.warning(f"Unknown command from User Interrupt Agent: {msg.body}")
else:
self.logger.debug(f"Ignoring message from unknown sender: {sender}")

View File

@@ -0,0 +1,207 @@
import asyncio
import json
import time
from collections import Counter, defaultdict
import cv2
import numpy as np
import zmq
import zmq.asyncio as azmq
from pydantic_core import ValidationError
from control_backend.agents import BaseAgent
from control_backend.agents.perception.visual_emotion_recognition_agent.visual_emotion_recognizer import ( # noqa
DeepFaceEmotionRecognizer,
)
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief
class VisualEmotionRecognitionAgent(BaseAgent):
def __init__(
self,
name: str,
socket_address: str,
bind: bool = False,
timeout_ms: int = 1000,
window_duration: int = settings.behaviour_settings.visual_emotion_recognition_window_duration_s, # noqa
min_frames_required: int = settings.behaviour_settings.visual_emotion_recognition_min_frames_per_face, # noqa
):
"""
Initialize the Visual Emotion Recognition Agent.
:param name: Name of the agent
:param socket_address: Address of the socket to connect or bind to
:param bind: Whether to bind to the socket address (True) or connect (False)
:param timeout_ms: Timeout for socket receive operations in milliseconds
:param window_duration: Duration in seconds over which to aggregate emotions
:param min_frames_required: Minimum number of frames per face required to consider a face
valid
"""
super().__init__(name)
self.socket_address = socket_address
self.socket_bind = bind
self.timeout_ms = timeout_ms
self.window_duration = window_duration
self.min_frames_required = min_frames_required
# Pause functionality
# NOTE: flag is set when running, cleared when paused
self._paused = asyncio.Event()
self._paused.set()
async def setup(self):
"""
Initialize the agent resources.
1. Initializes the :class:`VisualEmotionRecognizer`.
2. Connects to the video input ZMQ socket.
3. Starts the background emotion recognition loop.
"""
self.logger.info("Setting up %s.", self.name)
self.emotion_recognizer = DeepFaceEmotionRecognizer()
self.video_in_socket = azmq.Context.instance().socket(zmq.SUB)
if self.socket_bind:
self.video_in_socket.bind(self.socket_address)
else:
self.video_in_socket.connect(self.socket_address)
self.video_in_socket.setsockopt_string(zmq.SUBSCRIBE, "")
self.video_in_socket.setsockopt(zmq.RCVTIMEO, self.timeout_ms)
self.video_in_socket.setsockopt(zmq.CONFLATE, 1)
self.add_behavior(self.emotion_update_loop())
self.logger.info("Finished setting up %s", self.name)
async def emotion_update_loop(self):
"""
Background loop to receive video frames, recognize emotions, and update beliefs.
1. Receives video frames from the ZMQ socket.
2. Uses the :class:`VisualEmotionRecognizer` to detect emotions.
3. Aggregates emotions over a time window.
4. Sends updates to the BDI Core Agent about detected emotions.
"""
# Next time to process the window and update emotions
next_window_time = time.time() + self.window_duration
# Tracks counts of detected emotions per face index
face_stats = defaultdict(Counter)
prev_dominant_emotions = set()
while self._running:
try:
await self._paused.wait()
frame_bytes = await self.video_in_socket.recv()
# Convert bytes to a numpy buffer
nparr = np.frombuffer(frame_bytes, np.uint8)
# Decode image into the generic Numpy Array DeepFace expects
frame_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame_image is None:
# Could not decode image, skip this frame
self.logger.warning("Received invalid video frame, skipping.")
continue
# Get the dominant emotion from each face
current_emotions = self.emotion_recognizer.sorted_dominant_emotions(frame_image)
# Update emotion counts for each detected face
for i, emotion in enumerate(current_emotions):
face_stats[i][emotion] += 1
# If window duration has passed, process the collected stats
if time.time() >= next_window_time:
window_dominant_emotions = set()
# Determine dominant emotion for each face in the window
for _, counter in face_stats.items():
total_detections = sum(counter.values())
if total_detections >= self.min_frames_required:
dominant_emotion = counter.most_common(1)[0][0]
window_dominant_emotions.add(dominant_emotion)
await self.update_emotions(prev_dominant_emotions, window_dominant_emotions)
prev_dominant_emotions = window_dominant_emotions
face_stats.clear()
next_window_time = time.time() + self.window_duration
except zmq.Again:
self.logger.warning("No video frame received within timeout.")
async def update_emotions(self, prev_emotions: set[str], emotions: set[str]):
"""
Compare emotions from previous window and current emotions,
send updates to BDI Core Agent.
"""
emotions_to_remove = prev_emotions - emotions
emotions_to_add = emotions - prev_emotions
if not emotions_to_add and not emotions_to_remove:
return
emotion_beliefs_remove = []
for emotion in emotions_to_remove:
self.logger.info(f"Emotion '{emotion}' has disappeared.")
try:
emotion_beliefs_remove.append(
Belief(name="emotion_detected", arguments=[emotion], remove=True)
)
except ValidationError:
self.logger.warning("Invalid belief for emotion removal: %s", emotion)
emotion_beliefs_add = []
for emotion in emotions_to_add:
self.logger.info(f"New emotion detected: '{emotion}'")
try:
emotion_beliefs_add.append(Belief(name="emotion_detected", arguments=[emotion]))
except ValidationError:
self.logger.warning("Invalid belief for new emotion: %s", emotion)
beliefs_list_add = [b.model_dump() for b in emotion_beliefs_add]
beliefs_list_remove = [b.model_dump() for b in emotion_beliefs_remove]
payload = {"create": beliefs_list_add, "delete": beliefs_list_remove}
message = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=json.dumps(payload),
thread="beliefs",
)
await self.send(message)
async def handle_message(self, msg: InternalMessage):
"""
Handle incoming messages.
Expects messages to pause or resume the Visual Emotion Recognition
processing from User Interrupt Agent.
:param msg: The received internal message.
"""
sender = msg.sender
if sender == settings.agent_settings.user_interrupt_name:
if msg.body == "PAUSE":
self.logger.info("Pausing Visual Emotion Recognition processing.")
self._paused.clear()
elif msg.body == "RESUME":
self.logger.info("Resuming Visual Emotion Recognition processing.")
self._paused.set()
else:
self.logger.warning(f"Unknown command from User Interrupt Agent: {msg.body}")
else:
self.logger.debug(f"Ignoring message from unknown sender: {sender}")
async def stop(self):
"""
Clean up resources used by the agent.
"""
self.video_in_socket.close()
await super().stop()

View File

@@ -0,0 +1,54 @@
import abc
import numpy as np
from deepface import DeepFace
class VisualEmotionRecognizer(abc.ABC):
@abc.abstractmethod
def load_model(self):
"""Load the visual emotion recognition model into memory."""
pass
@abc.abstractmethod
def sorted_dominant_emotions(self, image) -> list[str]:
"""
Recognize dominant emotions from faces in the given image.
Emotions can be one of ['angry', 'disgust', 'fear', 'happy', 'sad', 'surprise', 'neutral'].
To minimize false positives, consider filtering faces with low confidence.
:param image: The input image for emotion recognition.
:return: List of dominant emotion detected for each face in the image,
sorted per face.
"""
pass
class DeepFaceEmotionRecognizer(VisualEmotionRecognizer):
"""
DeepFace-based implementation of VisualEmotionRecognizer.
DeepFape has proven to be quite a pessimistic model, so expect sad, fear and neutral
emotions to be over-represented.
"""
def __init__(self):
self.load_model()
def load_model(self):
print("Loading Deepface Emotion Model...")
dummy_img = np.zeros((224, 224, 3), dtype=np.uint8)
# analyze does not take a model as an argument, calling it once on a dummy image to load
# the model
DeepFace.analyze(dummy_img, actions=["emotion"], enforce_detection=False)
print("Deepface Emotion Model loaded.")
def sorted_dominant_emotions(self, image) -> list[str]:
analysis = DeepFace.analyze(image, actions=["emotion"], enforce_detection=False)
# Sort faces by x coordinate to maintain left-to-right order
analysis.sort(key=lambda face: face["region"]["x"])
analysis = [face for face in analysis if face["face_confidence"] >= 0.90]
dominant_emotions = [face["dominant_emotion"] for face in analysis]
return dominant_emotions

View File

@@ -0,0 +1,5 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,12 +1,28 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import json
import logging
import zmq
from zmq.asyncio import Context
from control_backend.agents import BaseAgent
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.ri_message import GestureCommand, RIEndpoint, SpeechCommand
from control_backend.schemas.belief_message import Belief, BeliefMessage
from control_backend.schemas.program import ConditionalNorm, Goal, Program
from control_backend.schemas.ri_message import (
GestureCommand,
RIEndpoint,
SpeechCommand,
)
experiment_logger = logging.getLogger(settings.logging_settings.experiment_logger_name)
class UserInterruptAgent(BaseAgent):
@@ -18,29 +34,55 @@ class UserInterruptAgent(BaseAgent):
- Send a prioritized message to the `RobotSpeechAgent`
- Send a prioritized gesture to the `RobotGestureAgent`
- Send a belief override to the `BDIProgramManager`in order to activate a
- Send a belief override to the `BDI Core` in order to activate a
trigger/conditional norm or complete a goal.
Prioritized actions clear the current RI queue before inserting the new item,
ensuring they are executed immediately after Pepper's current action has been fulfilled.
:ivar sub_socket: The ZMQ SUB socket used to receive user intterupts.
:ivar sub_socket: The ZMQ SUB socket used to receive user interrupts.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sub_socket = None
self.pub_socket = None
self._trigger_map = {}
self._trigger_reverse_map = {}
self._goal_map = {} # id -> sluggified goal
self._goal_reverse_map = {} # sluggified goal -> id
self._cond_norm_map = {} # id -> sluggified cond norm
self._cond_norm_reverse_map = {} # sluggified cond norm -> id
async def setup(self):
"""
Initialize the agent by setting up ZMQ sockets for receiving button events and
publishing updates.
"""
context = Context.instance()
self.sub_socket = context.socket(zmq.SUB)
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
self.sub_socket.subscribe("button_pressed")
self.pub_socket = context.socket(zmq.PUB)
self.pub_socket.connect(settings.zmq_settings.internal_pub_address)
self.add_behavior(self._receive_button_event())
async def _receive_button_event(self):
"""
The behaviour of the UserInterruptAgent.
Continuous loop that receives button_pressed events from the button_pressed HTTP endpoint.
These events contain a type and a context.
Main loop to receive and process button press events from the UI.
These are the different types and contexts:
- type: "speech", context: string that the robot has to say.
- type: "gesture", context: single gesture name that the robot has to perform.
- type: "override", context: belief_id that overrides the goal/trigger/conditional norm.
Handles different event types:
- `speech`: Triggers immediate robot speech.
- `gesture`: Triggers an immediate robot gesture.
- `override`: Forces a belief, trigger, or goal completion in the BDI core.
- `override_unachieve`: Removes a belief from the BDI core.
- `pause`: Toggles the system's pause state.
- `next_phase` / `reset_phase`: Controls experiment flow.
"""
while True:
topic, body = await self.sub_socket.recv_multipart()
@@ -53,30 +95,217 @@ class UserInterruptAgent(BaseAgent):
self.logger.error("Received invalid JSON payload on topic %s", topic)
continue
if event_type == "speech":
await self._send_to_speech_agent(event_context)
self.logger.info(
"Forwarded button press (speech) with context '%s' to RobotSpeechAgent.",
event_context,
)
elif event_type == "gesture":
await self._send_to_gesture_agent(event_context)
self.logger.info(
"Forwarded button press (gesture) with context '%s' to RobotGestureAgent.",
event_context,
)
elif event_type == "override":
await self._send_to_program_manager(event_context)
self.logger.info(
"Forwarded button press (override) with context '%s' to BDIProgramManager.",
event_context,
)
else:
self.logger.warning(
"Received button press with unknown type '%s' (context: '%s').",
event_type,
event_context,
)
self.logger.debug("Received event type %s", event_type)
match event_type:
case "speech":
await self._send_to_speech_agent(event_context)
self.logger.info(
"Forwarded button press (speech) with context '%s' to RobotSpeechAgent.",
event_context,
)
case "gesture":
await self._send_to_gesture_agent(event_context)
self.logger.info(
"Forwarded button press (gesture) with context '%s' to RobotGestureAgent.",
event_context,
)
case "override":
ui_id = str(event_context)
if asl_trigger := self._trigger_map.get(ui_id):
await self._send_to_bdi("force_trigger", asl_trigger)
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
)
elif asl_cond_norm := self._cond_norm_map.get(ui_id):
await self._send_to_bdi_belief(asl_cond_norm, "cond_norm")
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
)
elif asl_goal := self._goal_map.get(ui_id):
await self._send_to_bdi_belief(asl_goal, "goal")
self.logger.info(
"Forwarded button press (override) with context '%s' to BDI Core.",
event_context,
)
# Send achieve_goal to program manager to update semantic belief extractor
goal_achieve_msg = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name,
thread="achieve_goal",
body=ui_id,
)
await self.send(goal_achieve_msg)
else:
self.logger.warning("Could not determine which element to override.")
case "override_unachieve":
ui_id = str(event_context)
if asl_cond_norm := self._cond_norm_map.get(ui_id):
await self._send_to_bdi_belief(asl_cond_norm, "cond_norm", True)
self.logger.info(
"Forwarded button press (override_unachieve)"
"with context '%s' to BDI Core.",
event_context,
)
else:
self.logger.warning(
"Could not determine which conditional norm to unachieve."
)
case "pause":
self.logger.debug(
"Received pause/resume button press with context '%s'.", event_context
)
await self._send_pause_command(event_context)
if event_context:
self.logger.info("Sent pause command.")
else:
self.logger.info("Sent resume command.")
case "next_phase" | "reset_phase":
await self._send_experiment_control_to_bdi_core(event_type)
case _:
self.logger.warning(
"Received button press with unknown type '%s' (context: '%s').",
event_type,
event_context,
)
async def handle_message(self, msg: InternalMessage):
"""
Handles internal messages from other agents, such as program updates or trigger
notifications.
:param msg: The incoming :class:`~control_backend.core.agent_system.InternalMessage`.
"""
match msg.thread:
case "new_program":
self._create_mapping(msg.body)
case "trigger_start":
# msg.body is the sluggified trigger
asl_slug = msg.body
ui_id = self._trigger_reverse_map.get(asl_slug)
if ui_id:
payload = {"type": "trigger_update", "id": ui_id, "achieved": True}
await self._send_experiment_update(payload)
self.logger.info(f"UI Update: Trigger {asl_slug} started (ID: {ui_id})")
case "trigger_end":
asl_slug = msg.body
ui_id = self._trigger_reverse_map.get(asl_slug)
if ui_id:
payload = {"type": "trigger_update", "id": ui_id, "achieved": False}
await self._send_experiment_update(payload)
self.logger.info(f"UI Update: Trigger {asl_slug} ended (ID: {ui_id})")
case "transition_phase":
new_phase_id = msg.body
self.logger.info(f"Phase transition detected: {new_phase_id}")
payload = {"type": "phase_update", "id": new_phase_id}
await self._send_experiment_update(payload)
case "goal_start":
goal_name = msg.body
ui_id = self._goal_reverse_map.get(goal_name)
if ui_id:
payload = {"type": "goal_update", "id": ui_id, "active": True}
await self._send_experiment_update(payload)
self.logger.info(f"UI Update: Goal {goal_name} started (ID: {ui_id})")
case "active_norms_update":
active_norms_asl = [
s.strip("() '\",") for s in msg.body.split(",") if s.strip("() '\",")
]
await self._broadcast_cond_norms(active_norms_asl)
case _:
self.logger.debug(f"Received internal message on unhandled thread: {msg.thread}")
async def _broadcast_cond_norms(self, active_slugs: list[str]):
"""
Broadcasts the current activation state of all conditional norms to the UI.
:param active_slugs: A list of sluggified norm names currently active in the BDI core.
"""
updates = []
for asl_slug, ui_id in self._cond_norm_reverse_map.items():
is_active = asl_slug in active_slugs
updates.append({"id": ui_id, "active": is_active})
payload = {"type": "cond_norms_state_update", "norms": updates}
if self.pub_socket:
topic = b"status"
body = json.dumps(payload).encode("utf-8")
await self.pub_socket.send_multipart([topic, body])
# self.logger.info(f"UI Update: Active norms {updates}")
def _create_mapping(self, program_json: str):
"""
Creates a bidirectional mapping between UI identifiers and AgentSpeak slugs.
:param program_json: The JSON representation of the behavioral program.
"""
try:
program = Program.model_validate_json(program_json)
self._trigger_map = {}
self._trigger_reverse_map = {}
self._goal_map = {}
self._cond_norm_map = {}
self._cond_norm_reverse_map = {}
def _register_goal(goal: Goal):
"""Recursively register goals and their subgoals."""
slug = AgentSpeakGenerator.slugify(goal)
self._goal_map[str(goal.id)] = slug
self._goal_reverse_map[slug] = str(goal.id)
for step in goal.plan.steps:
if isinstance(step, Goal):
_register_goal(step)
for phase in program.phases:
for trigger in phase.triggers:
slug = AgentSpeakGenerator.slugify(trigger)
self._trigger_map[str(trigger.id)] = slug
self._trigger_reverse_map[slug] = str(trigger.id)
for goal in phase.goals:
_register_goal(goal)
for goal, id in self._goal_reverse_map.items():
self.logger.debug(f"Goal mapping: UI ID {goal} -> {id}")
for norm in phase.norms:
if isinstance(norm, ConditionalNorm):
asl_slug = AgentSpeakGenerator.slugify(norm)
norm_id = str(norm.id)
self._cond_norm_map[norm_id] = asl_slug
self._cond_norm_reverse_map[norm.norm] = norm_id
self.logger.debug("Added conditional norm %s", asl_slug)
self.logger.info(
f"Mapped {len(self._trigger_map)} triggers and {len(self._goal_map)} goals "
f"and {len(self._cond_norm_map)} conditional norms for UserInterruptAgent."
)
except Exception as e:
self.logger.error(f"Mapping failed: {e}")
async def _send_experiment_update(self, data, should_log: bool = True):
"""
Publishes an experiment state update to the internal ZMQ bus for the UI.
:param data: The update payload.
:param should_log: Whether to log the update.
"""
if self.pub_socket:
topic = b"experiment"
body = json.dumps(data).encode("utf-8")
await self.pub_socket.send_multipart([topic, body])
if should_log:
self.logger.debug(f"Sent experiment update: {data}")
async def _send_to_speech_agent(self, text_to_say: str):
"""
@@ -84,6 +313,7 @@ class UserInterruptAgent(BaseAgent):
:param text_to_say: The string that the robot has to say.
"""
experiment_logger.chat(text_to_say, extra={"role": "assistant"})
cmd = SpeechCommand(data=text_to_say, is_priority=True)
out_msg = InternalMessage(
to=settings.agent_settings.robot_speech_name,
@@ -109,38 +339,93 @@ class UserInterruptAgent(BaseAgent):
)
await self.send(out_msg)
async def _send_to_program_manager(self, belief_id: str):
"""
Send a button_override belief to the BDIProgramManager.
async def _send_to_bdi(self, thread: str, body: str):
"""Send slug of trigger to BDI"""
msg = InternalMessage(to=settings.agent_settings.bdi_core_name, thread=thread, body=body)
await self.send(msg)
self.logger.info(f"Directly forced {thread} in BDI: {body}")
:param belief_id: The belief_id that overrides the goal/trigger/conditional norm.
this id can belong to a basic belief or an inferred belief.
See also: https://utrechtuniversity.youtrack.cloud/articles/N25B-A-27/UI-components
async def _send_to_bdi_belief(self, asl: str, asl_type: str, unachieve: bool = False):
"""Send belief to BDI Core"""
if asl_type == "goal":
belief_name = f"achieved_{asl}"
elif asl_type == "cond_norm":
belief_name = f"force_{asl}"
else:
self.logger.warning("Tried to send belief with unknown type")
return
belief = Belief(name=belief_name, arguments=None)
self.logger.debug(f"Sending belief to BDI Core: {belief_name}")
# Conditional norms are unachieved by removing the belief
belief_message = (
BeliefMessage(delete=[belief]) if unachieve else BeliefMessage(create=[belief])
)
msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
thread="beliefs",
body=belief_message.model_dump_json(),
)
await self.send(msg)
async def _send_experiment_control_to_bdi_core(self, type):
"""
data = {"belief": belief_id}
message = InternalMessage(
to=settings.agent_settings.bdi_program_manager_name,
method to send experiment control buttons to bdi core.
:param type: the type of control button we should send to the bdi core.
"""
# Switch which thread we should send to bdi core
thread = ""
match type:
case "next_phase":
thread = "force_next_phase"
case "reset_phase":
thread = "reset_current_phase"
case "reset_experiment":
thread = "reset_experiment"
case _:
self.logger.warning(
"Received unknown experiment control type '%s' to send to BDI Core.",
type,
)
out_msg = InternalMessage(
to=settings.agent_settings.bdi_core_name,
sender=self.name,
body=json.dumps(data),
thread="belief_override_id",
)
await self.send(message)
self.logger.info(
"Sent button_override belief with id '%s' to Program manager.",
belief_id,
thread=thread,
body="",
)
self.logger.debug("Sending experiment control '%s' to BDI Core.", thread)
await self.send(out_msg)
async def setup(self):
async def _send_pause_command(self, pause: str):
"""
Initialize the agent.
Connects the internal ZMQ SUB socket and subscribes to the 'button_pressed' topic.
Starts the background behavior to receive the user interrupts.
Send a pause command to the other internal agents; for now just VAD and VED agent.
"""
context = Context.instance()
self.sub_socket = context.socket(zmq.SUB)
self.sub_socket.connect(settings.zmq_settings.internal_sub_address)
self.sub_socket.subscribe("button_pressed")
self.add_behavior(self._receive_button_event())
if pause == "true":
# Send pause to VAD and VED agent
vad_message = InternalMessage(
to=[
settings.agent_settings.vad_name,
settings.agent_settings.visual_emotion_recognition_name,
settings.agent_settings.face_agent_name,
],
sender=self.name,
body="PAUSE",
)
await self.send(vad_message)
# Voice Activity Detection and Visual Emotion Recognition agents
self.logger.info("Sent pause command to perception agents.")
else:
# Send resume to VAD and VED agents
vad_message = InternalMessage(
to=[
settings.agent_settings.vad_name,
settings.agent_settings.visual_emotion_recognition_name,
settings.agent_settings.face_agent_name,
],
sender=self.name,
body="RESUME",
)
await self.send(vad_message)
# Voice Activity Detection and Visual Emotion Recognition agents
self.logger.info("Sent resume command to perception agents.")

View File

@@ -0,0 +1,5 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -0,0 +1,5 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -0,0 +1,5 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,31 +0,0 @@
import logging
from fastapi import APIRouter, Request
from control_backend.schemas.events import ButtonPressedEvent
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/button_pressed", status_code=202)
async def receive_button_event(event: ButtonPressedEvent, request: Request):
"""
Endpoint to handle external button press events.
Validates the event payload and publishes it to the internal 'button_pressed' topic.
Subscribers (in this case user_interrupt_agent) will pick this up to trigger
specific behaviors or state changes.
:param event: The parsed ButtonPressedEvent object.
:param request: The FastAPI request object.
"""
logger.debug("Received button event: %s | %s", event.type, event.context)
topic = b"button_pressed"
body = event.model_dump_json().encode()
pub_socket = request.app.state.endpoints_pub_socket
await pub_socket.send_multipart([topic, body])
return {"status": "Event received"}

View File

@@ -1,8 +1,15 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
from pathlib import Path
import zmq
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from fastapi import APIRouter, HTTPException
from fastapi.responses import FileResponse, StreamingResponse
from zmq.asyncio import Context
from control_backend.core.config import settings
@@ -38,3 +45,29 @@ async def log_stream():
yield f"data: {message}\n\n"
return StreamingResponse(gen(), media_type="text/event-stream")
LOGGING_DIR = Path(settings.logging_settings.experiment_log_directory).resolve()
@router.get("/logs/files")
@router.get("/api/logs/files")
async def log_directory():
"""
Get a list of all log files stored in the experiment log file directory.
"""
return [f.name for f in LOGGING_DIR.glob("*.log")]
@router.get("/logs/files/{filename}")
@router.get("/api/logs/files/{filename}")
async def log_file(filename: str):
# Prevent path-traversal
file_path = (LOGGING_DIR / filename).resolve() # This .resolve() is important
if not file_path.is_relative_to(LOGGING_DIR):
raise HTTPException(status_code=400, detail="Invalid filename.")
if not file_path.is_file():
raise HTTPException(status_code=404, detail="File not found.")
return FileResponse(file_path, filename=file_path.name)

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
from fastapi import APIRouter, Request

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
from fastapi import APIRouter, Request

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import json
import logging
@@ -137,7 +143,6 @@ async def ping_stream(request: Request):
logger.info("Client disconnected from SSE")
break
logger.debug(f"Yielded new connection event in robot ping router: {str(connected)}")
connectedJson = json.dumps(connected)
yield (f"data: {connectedJson}\n\n")

View File

@@ -1,12 +0,0 @@
from fastapi import APIRouter, Request
router = APIRouter()
# TODO: implement
@router.get("/sse")
async def sse(request: Request):
"""
Placeholder for future Server-Sent Events endpoint.
"""
pass

View File

@@ -0,0 +1,100 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import logging
import zmq
import zmq.asyncio
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
from zmq.asyncio import Context
from control_backend.core.config import settings
from control_backend.schemas.events import ButtonPressedEvent
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/button_pressed", status_code=202)
async def receive_button_event(event: ButtonPressedEvent, request: Request):
"""
Endpoint to handle external button press events.
Validates the event payload and publishes it to the internal 'button_pressed' topic.
Subscribers (in this case user_interrupt_agent) will pick this up to trigger
specific behaviors or state changes.
:param event: The parsed ButtonPressedEvent object.
:param request: The FastAPI request object.
"""
logger.debug("Received button event: %s | %s", event.type, event.context)
topic = b"button_pressed"
body = event.model_dump_json().encode()
pub_socket = request.app.state.endpoints_pub_socket
await pub_socket.send_multipart([topic, body])
return {"status": "Event received"}
@router.get("/experiment_stream")
async def experiment_stream(request: Request):
# Use the asyncio-compatible context
context = Context.instance()
socket = context.socket(zmq.SUB)
# Connect and subscribe
socket.connect(settings.zmq_settings.internal_sub_address)
socket.subscribe(b"experiment")
async def gen():
try:
while True:
# Check if client closed the tab
if await request.is_disconnected():
logger.error("Client disconnected from experiment stream.")
break
try:
parts = await asyncio.wait_for(socket.recv_multipart(), timeout=10.0)
_, message = parts
yield f"data: {message.decode().strip()}\n\n"
except TimeoutError:
continue
finally:
socket.close()
return StreamingResponse(gen(), media_type="text/event-stream")
@router.get("/status_stream")
async def status_stream(request: Request):
context = Context.instance()
socket = context.socket(zmq.SUB)
socket.connect(settings.zmq_settings.internal_sub_address)
socket.subscribe(b"status")
async def gen():
try:
while True:
if await request.is_disconnected():
break
try:
# Shorter timeout since this is frequent
parts = await asyncio.wait_for(socket.recv_multipart(), timeout=0.5)
_, message = parts
yield f"data: {message.decode().strip()}\n\n"
except TimeoutError:
yield ": ping\n\n" # Keep the connection alive
continue
finally:
socket.close()
return StreamingResponse(gen(), media_type="text/event-stream")

View File

@@ -1,17 +1,21 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from fastapi.routing import APIRouter
from control_backend.api.v1.endpoints import button_pressed, logs, message, program, robot, sse
from control_backend.api.v1.endpoints import logs, message, program, robot, user_interact
api_router = APIRouter()
api_router.include_router(message.router, tags=["Messages"])
api_router.include_router(sse.router, tags=["SSE"])
api_router.include_router(robot.router, prefix="/robot", tags=["Pings", "Commands"])
api_router.include_router(robot.router, prefix="/robot", tags=["Pings", "Commands", "Face"])
api_router.include_router(logs.router, tags=["Logs"])
api_router.include_router(program.router, tags=["Program"])
api_router.include_router(button_pressed.router, tags=["Button Pressed Events"])
api_router.include_router(user_interact.router, tags=["Button Pressed Events"])

View File

@@ -0,0 +1,5 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import logging
from abc import ABC, abstractmethod
@@ -22,10 +28,22 @@ class AgentDirectory:
@staticmethod
def register(name: str, agent: "BaseAgent"):
"""
Registers an agent instance with a unique name.
:param name: The name of the agent.
:param agent: The :class:`BaseAgent` instance.
"""
_agent_directory[name] = agent
@staticmethod
def get(name: str) -> "BaseAgent | None":
"""
Retrieves a registered agent instance by name.
:param name: The name of the agent to retrieve.
:return: The :class:`BaseAgent` instance, or None if not found.
"""
return _agent_directory.get(name)
@@ -60,6 +78,9 @@ class BaseAgent(ABC):
self._tasks: set[asyncio.Task] = set()
self._running = False
self._internal_pub_socket: None | azmq.Socket = None
self._internal_sub_socket: None | azmq.Socket = None
# Register immediately
AgentDirectory.register(name, self)
@@ -117,7 +138,7 @@ class BaseAgent(ABC):
task.cancel()
self.logger.info(f"Agent {self.name} stopped")
async def send(self, message: InternalMessage):
async def send(self, message: InternalMessage, should_log: bool = True):
"""
Send a message to another agent.
@@ -130,17 +151,26 @@ class BaseAgent(ABC):
:param message: The message to send.
"""
target = AgentDirectory.get(message.to)
message.sender = self.name
if target:
await target.inbox.put(message)
self.logger.debug(f"Sent message {message.body} to {message.to} via regular inbox.")
else:
# Apparently target agent is on a different process, send via ZMQ
topic = f"internal/{message.to}".encode()
body = message.model_dump_json().encode()
await self._internal_pub_socket.send_multipart([topic, body])
self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.")
to = message.to
receivers = [to] if isinstance(to, str) else to
for receiver in receivers:
target = AgentDirectory.get(receiver)
if target:
await target.inbox.put(message)
if should_log:
self.logger.debug(
f"Sent message {message.body} to {message.to} via regular inbox."
)
else:
# Apparently target agent is on a different process, send via ZMQ
topic = f"internal/{receiver}".encode()
body = message.model_dump_json().encode()
await self._internal_pub_socket.send_multipart([topic, body])
if should_log:
self.logger.debug(f"Sent message {message.body} to {message.to} via ZMQ.")
async def _process_inbox(self):
"""
@@ -150,7 +180,6 @@ class BaseAgent(ABC):
"""
while self._running:
msg = await self.inbox.get()
self.logger.debug(f"Received message from {msg.sender}.")
await self.handle_message(msg)
async def _receive_internal_zmq_loop(self):

View File

@@ -1,4 +1,8 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
--------------------------------------------------------------------------------
An exhaustive overview of configurable options. All of these can be set using environment variables
by nesting with double underscores (__). Start from the ``Settings`` class.
@@ -35,7 +39,6 @@ class AgentSettings(BaseModel):
Names of the various agents in the system. These names are used for routing messages.
:ivar bdi_core_name: Name of the BDI Core Agent.
:ivar bdi_belief_collector_name: Name of the Belief Collector Agent.
:ivar bdi_program_manager_name: Name of the BDI Program Manager Agent.
:ivar text_belief_extractor_name: Name of the Text Belief Extractor Agent.
:ivar vad_name: Name of the Voice Activity Detection (VAD) Agent.
@@ -50,8 +53,8 @@ class AgentSettings(BaseModel):
# agent names
bdi_core_name: str = "bdi_core_agent"
bdi_belief_collector_name: str = "belief_collector_agent"
bdi_program_manager_name: str = "bdi_program_manager_agent"
visual_emotion_recognition_name: str = "visual_emotion_recognition_agent"
text_belief_extractor_name: str = "text_belief_extractor_agent"
vad_name: str = "vad_agent"
llm_name: str = "llm_agent"
@@ -61,6 +64,7 @@ class AgentSettings(BaseModel):
robot_speech_name: str = "robot_speech_agent"
robot_gesture_name: str = "robot_gesture_agent"
user_interrupt_name: str = "user_interrupt_agent"
face_agent_name: str = "face_detection_agent"
class BehaviourSettings(BaseModel):
@@ -79,6 +83,12 @@ class BehaviourSettings(BaseModel):
:ivar transcription_words_per_token: Estimated words per token for transcription timing.
:ivar transcription_token_buffer: Buffer for transcription tokens.
:ivar conversation_history_length_limit: The maximum amount of messages to extract beliefs from.
:ivar trigger_time_to_wait: Amount of milliseconds to wait before informing the UI about trigger
completion.
:ivar visual_emotion_recognition_window_duration_s: Duration in seconds over which to aggregate
emotions and update emotion beliefs.
:ivar visual_emotion_recognition_min_frames_per_face: Minimum number of frames per face required
to consider a face valid.
"""
# ATTENTION: When adding/removing settings, make sure to update the .env.example file
@@ -102,6 +112,14 @@ class BehaviourSettings(BaseModel):
# Text belief extractor settings
conversation_history_length_limit: int = 10
# AgentSpeak related settings
trigger_time_to_wait: int = 2000
agentspeak_file: str = "src/control_backend/agents/bdi/agentspeak.asl"
# Visual Emotion Recognition settings
visual_emotion_recognition_window_duration_s: int = 5
visual_emotion_recognition_min_frames_per_face: int = 3
class LLMSettings(BaseModel):
"""
@@ -119,6 +137,7 @@ class LLMSettings(BaseModel):
local_llm_url: str = "http://localhost:1234/v1/chat/completions"
local_llm_model: str = "gpt-oss"
api_key: str = ""
chat_temperature: float = 1.0
code_temperature: float = 0.3
n_parallel: int = 4
@@ -155,6 +174,20 @@ class SpeechModelSettings(BaseModel):
openai_model_name: str = "small.en"
class LoggingSettings(BaseModel):
"""
Configuration for logging.
:ivar logging_config_file: Path to the logging configuration file.
:ivar experiment_log_directory: Location of the experiment logs. Must match the logging config.
:ivar experiment_logger_name: Name of the experiment logger. Must match the logging config.
"""
logging_config_file: str = ".logging_config.yaml"
experiment_log_directory: str = "experiment_logs"
experiment_logger_name: str = "experiment"
class Settings(BaseSettings):
"""
Global application settings.
@@ -176,6 +209,8 @@ class Settings(BaseSettings):
ri_host: str = "localhost"
logging_settings: LoggingSettings = LoggingSettings()
zmq_settings: ZMQSettings = ZMQSettings()
agent_settings: AgentSettings = AgentSettings()

View File

@@ -1 +1,10 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from .dated_file_handler import DatedFileHandler as DatedFileHandler
from .optional_field_formatter import OptionalFieldFormatter as OptionalFieldFormatter
from .partial_filter import PartialFilter as PartialFilter
from .setup_logging import setup_logging as setup_logging

View File

@@ -0,0 +1,44 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from datetime import datetime
from logging import FileHandler
from pathlib import Path
class DatedFileHandler(FileHandler):
def __init__(self, file_prefix: str, **kwargs):
if not file_prefix:
raise ValueError("`file_prefix` argument cannot be empty.")
self._file_prefix = file_prefix
kwargs["filename"] = self._make_filename()
super().__init__(**kwargs)
def _make_filename(self) -> str:
"""
Create the filename for the current logfile, using the configured file prefix and the
current date and time. If the directory does not exist, it gets created.
:return: A filepath.
"""
filepath = Path(f"{self._file_prefix}-{datetime.now():%Y%m%d-%H%M%S}.log")
if not filepath.parent.is_dir():
filepath.parent.mkdir(parents=True, exist_ok=True)
return str(filepath)
def do_rollover(self):
"""
Close the current logfile and create a new one with the current date and time.
"""
self.acquire()
try:
if self.stream:
self.stream.close()
self.baseFilename = self._make_filename()
self.stream = self._open()
finally:
self.release()

View File

@@ -0,0 +1,73 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
import re
class OptionalFieldFormatter(logging.Formatter):
"""
Logging formatter that supports optional fields marked by `?`.
Optional fields are denoted by placing a `?` after the field name inside
the parentheses, e.g., `%(role?)s`. If the field is not provided in the
log call's `extra` dict, it will use the default value from `defaults`
or `None` if no default is specified.
:param fmt: Format string with optional `%(name?)s` style fields.
:type fmt: str or None
:param datefmt: Date format string, passed to parent :class:`logging.Formatter`.
:type datefmt: str or None
:param style: Formatting style, must be '%'. Passed to parent.
:type style: str
:param defaults: Default values for optional fields when not provided.
:type defaults: dict or None
:example:
>>> formatter = OptionalFieldFormatter(
... fmt="%(asctime)s %(levelname)s %(role?)s %(message)s",
... defaults={"role": ""-""}
... )
>>> handler = logging.StreamHandler()
>>> handler.setFormatter(formatter)
>>> logger = logging.getLogger(__name__)
>>> logger.addHandler(handler)
>>>
>>> logger.chat("Hello there!", extra={"role": "USER"})
2025-01-15 10:30:00 CHAT USER Hello there!
>>>
>>> logger.info("A logging message")
2025-01-15 10:30:01 INFO - A logging message
.. note::
Only `%`-style formatting is supported. The `{` and `$` styles are not
compatible with this formatter.
.. seealso::
:class:`logging.Formatter` for base formatter documentation.
"""
# Match %(name?)s or %(name?)d etc.
OPTIONAL_PATTERN = re.compile(r"%\((\w+)\?\)([sdifFeEgGxXocrba%])")
def __init__(self, fmt=None, datefmt=None, style="%", defaults=None):
self.defaults = defaults or {}
self.optional_fields = set(self.OPTIONAL_PATTERN.findall(fmt or ""))
# Convert %(name?)s to %(name)s for standard formatting
normalized_fmt = self.OPTIONAL_PATTERN.sub(r"%(\1)\2", fmt or "")
super().__init__(normalized_fmt, datefmt, style)
def format(self, record):
for field, _ in self.optional_fields:
if not hasattr(record, field):
default = self.defaults.get(field, None)
setattr(record, field, default)
return super().format(record)

View File

@@ -0,0 +1,16 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
class PartialFilter(logging.Filter):
"""
Class to filter any log records that have the "partial" attribute set to ``True``.
"""
def filter(self, record):
return getattr(record, "partial", False) is not True

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
import logging.config
import os
@@ -37,7 +43,7 @@ def add_logging_level(level_name: str, level_num: int, method_name: str | None =
setattr(logging, method_name, log_to_root)
def setup_logging(path: str = ".logging_config.yaml") -> None:
def setup_logging(path: str = settings.logging_settings.logging_config_file) -> None:
"""
Setup logging configuration of the CB. Tries to load the logging configuration from a file,
in which we specify custom loggers, formatters, handlers, etc.
@@ -65,7 +71,7 @@ def setup_logging(path: str = ".logging_config.yaml") -> None:
# Patch ZMQ PUBHandler to know about custom levels
if custom_levels:
for logger_name in ("control_backend",):
for logger_name in config.get("loggers", {}):
logger = logging.getLogger(logger_name)
for handler in logger.handlers:
if isinstance(handler, PUBHandler):

View File

@@ -1,4 +1,8 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
--------------------------------------------------------------------------------
Control Backend Main Application.
This module defines the FastAPI application that serves as the entry point for the
@@ -26,7 +30,6 @@ from zmq.asyncio import Context
# BDI agents
from control_backend.agents.bdi import (
BDIBeliefCollectorAgent,
BDICoreAgent,
TextBeliefExtractorAgent,
)
@@ -122,12 +125,6 @@ async def lifespan(app: FastAPI):
"name": settings.agent_settings.bdi_core_name,
},
),
"BeliefCollectorAgent": (
BDIBeliefCollectorAgent,
{
"name": settings.agent_settings.bdi_belief_collector_name,
},
),
"TextBeliefExtractorAgent": (
TextBeliefExtractorAgent,
{
@@ -172,6 +169,8 @@ async def lifespan(app: FastAPI):
await endpoints_pub_socket.send_multipart([PROGRAM_STATUS, ProgramStatus.STOPPING.value])
# Additional shutdown logic goes here
for agent in agents:
await agent.stop()
logger.info("Application shutdown complete.")

View File

@@ -0,0 +1,5 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,7 +1,13 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from pydantic import BaseModel
from control_backend.schemas.program import BaseGoal
from control_backend.schemas.program import Belief as ProgramBelief
from control_backend.schemas.program import Goal
class BeliefList(BaseModel):
@@ -16,4 +22,10 @@ class BeliefList(BaseModel):
class GoalList(BaseModel):
goals: list[Goal]
"""
Represents a list of goals, used for communicating multiple goals between agents.
:ivar goals: The list of goals.
"""
goals: list[BaseGoal]

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from pydantic import BaseModel
@@ -11,7 +17,7 @@ class Belief(BaseModel):
"""
name: str
arguments: list[str] | None
arguments: list[str] | None = None
# To make it hashable
model_config = {"frozen": True}

View File

@@ -1,10 +1,29 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from pydantic import BaseModel
class ChatMessage(BaseModel):
"""
Represents a single message in a conversation.
:ivar role: The role of the speaker (e.g., 'user', 'assistant').
:ivar content: The text content of the message.
"""
role: str
content: str
class ChatHistory(BaseModel):
"""
Represents a sequence of chat messages, forming a conversation history.
:ivar messages: An ordered list of :class:`ChatMessage` objects.
"""
messages: list[ChatMessage]

View File

@@ -1,6 +1,20 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from pydantic import BaseModel
class ButtonPressedEvent(BaseModel):
"""
Represents a button press event from the UI.
:ivar type: The type of event (e.g., 'speech', 'gesture', 'override').
:ivar context: Additional data associated with the event (e.g., speech text, gesture name,
or ID).
"""
type: str
context: str

View File

@@ -1,3 +1,11 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from collections.abc import Iterable
from pydantic import BaseModel
@@ -5,13 +13,13 @@ class InternalMessage(BaseModel):
"""
Standard message envelope for communication between agents within the Control Backend.
:ivar to: The name of the destination agent.
:ivar to: The name(s) of the destination agent(s).
:ivar sender: The name of the sending agent.
:ivar body: The string payload (often a JSON-serialized model).
:ivar thread: An optional thread identifier/topic to categorize the message (e.g., 'beliefs').
"""
to: str
to: str | Iterable[str]
sender: str | None = None
body: str
thread: str | None = None

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from pydantic import BaseModel

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from pydantic import BaseModel

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from enum import Enum
from typing import Literal
@@ -15,21 +21,43 @@ class ProgramElement(BaseModel):
name: str
id: UUID4
# To make program elements hashable
model_config = {"frozen": True}
class LogicalOperator(Enum):
"""
Logical operators for combining beliefs.
These operators define how beliefs can be combined to form more complex
logical conditions. They are used in inferred beliefs to create compound
beliefs from simpler ones.
AND: Both operands must be true for the result to be true.
OR: At least one operand must be true for the result to be true.
"""
AND = "AND"
OR = "OR"
type Belief = KeywordBelief | SemanticBelief | InferredBelief
type BasicBelief = KeywordBelief | SemanticBelief
type Belief = KeywordBelief | SemanticBelief | InferredBelief | EmotionBelief | FaceBelief
type BasicBelief = KeywordBelief | SemanticBelief | EmotionBelief | FaceBelief
class KeywordBelief(ProgramElement):
"""
Represents a belief that is set when the user spoken text contains a certain keyword.
Represents a belief that is activated when a specific keyword is detected in the user's speech.
:ivar keyword: The keyword on which this belief gets set.
Keyword beliefs provide a simple but effective way to detect specific topics
or intentions in user speech. They are triggered when the exact keyword
string appears in the transcribed user input.
:ivar keyword: The string to look for in the transcription.
Example:
A keyword belief with keyword="robot" would be activated when the user
says "I like the robot" or "Tell me about robots".
"""
name: str = ""
@@ -38,9 +66,21 @@ class KeywordBelief(ProgramElement):
class SemanticBelief(ProgramElement):
"""
Represents a belief that is set by semantic LLM validation.
Represents a belief whose truth value is determined by an LLM analyzing the conversation
context.
:ivar description: Description of how to form the belief, used by the LLM.
Semantic beliefs provide more sophisticated belief detection by using
an LLM to analyze the conversation context and determine
if the belief should be considered true. This allows for more nuanced
and context-aware belief evaluation.
:ivar description: A natural language description of what this belief represents,
used as a prompt for the LLM.
Example:
A semantic belief with description="The user is expressing frustration"
would be activated when the LLM determines that the user's statements
indicate frustration, even if no specific keywords are used.
"""
description: str
@@ -48,13 +88,16 @@ class SemanticBelief(ProgramElement):
class InferredBelief(ProgramElement):
"""
Represents a belief that gets formed by combining two beliefs with a logical AND or OR.
Represents a belief derived from other beliefs using logical operators.
These beliefs can also be :class:`InferredBelief`, leading to arbitrarily deep nesting.
Inferred beliefs allow for the creation of complex belief structures by
combining simpler beliefs using logical operators. This enables the
representation of sophisticated conditions and relationships between
different aspects of the conversation or context.
:ivar operator: The logical operator to apply.
:ivar left: The left part of the logical expression.
:ivar right: The right part of the logical expression.
:ivar operator: The :class:`LogicalOperator` (AND/OR) to apply.
:ivar left: The left operand (another belief).
:ivar right: The right operand (another belief).
"""
name: str = ""
@@ -63,7 +106,43 @@ class InferredBelief(ProgramElement):
right: Belief
class EmotionBelief(ProgramElement):
"""
Represents a belief that is set when a certain emotion is detected.
:ivar emotion: The emotion on which this belief gets set.
"""
name: str = ""
emotion: str
class FaceBelief(ProgramElement):
"""
Represents the belief that at least one face is currently detected.
This belief is maintained by a perception agent (not inferred).
"""
face_present: bool
name: str = ""
class Norm(ProgramElement):
"""
Base class for behavioral norms that guide the robot's interactions.
Norms represent guidelines, principles, or rules that should govern the
robot's behavior during interactions. They can be either basic (always
active in their phase) or conditional (active only when specific beliefs
are true).
:ivar norm: The textual description of the norm.
:ivar critical: Whether this norm is considered critical and should be strictly enforced.
Critical norms are currently not supported yet, but are intended for norms that should
ABSOLUTELY NOT be violated, possible cheched by additional validator agents.
"""
name: str = ""
norm: str
critical: bool = False
@@ -71,10 +150,14 @@ class Norm(ProgramElement):
class BasicNorm(Norm):
"""
Represents a behavioral norm.
A simple behavioral norm that is always considered for activation when its phase is active.
:ivar norm: The actual norm text describing the behavior.
:ivar critical: When true, this norm should absolutely not be violated (checked separately).
Basic norms are the most straightforward type of norms. They are active
throughout their assigned phase and provide consistent behavioral guidance
without any additional conditions.
These norms are suitable for general principles that should always apply
during a particular interaction phase.
"""
pass
@@ -82,9 +165,22 @@ class BasicNorm(Norm):
class ConditionalNorm(Norm):
"""
Represents a norm that is only active when a condition is met (i.e., a certain belief holds).
A behavioral norm that is only active when a specific condition (belief) is met.
:ivar condition: When to activate this norm.
Conditional norms provide context-sensitive behavioral guidance. They are
only active and considered for activation when their associated condition
(belief) is true. This allows for more nuanced and adaptive behavior that
responds to the specific context of the interaction.
An important note, is that the current implementation of these norms for keyword-based beliefs
is that they only hold for 1 turn, as keyword-based conditions often express temporary
conditions.
:ivar condition: The :class:`Belief` that must hold for this norm to be active.
Example:
A conditional norm with the condition "user is frustrated" might specify
that the robot should use more empathetic language and avoid complex topics.
"""
condition: Belief
@@ -96,7 +192,12 @@ type PlanElement = Goal | Action
class Plan(ProgramElement):
"""
Represents a list of steps to execute. Each of these steps can be a goal (with its own plan)
or a simple action.
or a simple action.
Plans define sequences of actions and subgoals that the robot should execute
to achieve a particular objective. They form the procedural knowledge of
the behavior program, specifying what the robot should do in different
situations.
:ivar steps: The actions or subgoals to execute, in order.
"""
@@ -105,31 +206,49 @@ class Plan(ProgramElement):
steps: list[PlanElement]
class Goal(ProgramElement):
class BaseGoal(ProgramElement):
"""
Represents an objective to be achieved. To reach the goal, we should execute
the corresponding plan. If we can fail to achieve a goal after executing the plan,
for example when the achieving of the goal is dependent on the user's reply, this means
that the achieved status will be set from somewhere else in the program.
Represents an objective to be achieved. This base version does not include a plan to achieve
this goal, and is used in semantic belief extraction.
:ivar description: A description of the goal, used to determine if it has been achieved.
:ivar plan: The plan to execute.
:ivar can_fail: Whether we can fail to achieve the goal after executing the plan.
The can_fail attribute determines whether goal achievement is binary
(success/failure) or whether it can be determined through conversation
analysis.
"""
description: str = ""
plan: Plan
can_fail: bool = True
class Goal(BaseGoal):
"""
Represents an objective to be achieved. To reach the goal, we should execute the corresponding
plan. It inherits from the BaseGoal a variable `can_fail`, which, if true, will cause the
completion to be determined based on the conversation.
Goals extend base goals by including a specific plan to achieve the objective.
They form the core of the robot's proactive behavior, defining both what
should be accomplished and how to accomplish it.
Instances of this goal are not hashable because a plan is not hashable.
:ivar plan: The plan to execute.
"""
plan: Plan
type Action = SpeechAction | GestureAction | LLMAction
class SpeechAction(ProgramElement):
"""
Represents the action of the robot speaking a literal text.
An action where the robot speaks a predefined literal text.
:ivar text: The text to speak.
:ivar text: The text content to be spoken.
"""
name: str = ""
@@ -138,11 +257,14 @@ class SpeechAction(ProgramElement):
class Gesture(BaseModel):
"""
Represents a gesture to be performed. Can be either a single gesture,
or a random gesture from a category (tag).
Defines a physical gesture for the robot to perform.
:ivar type: The type of the gesture, "tag" or "single".
:ivar name: The name of the single gesture or tag.
:ivar type: Whether to use a specific "single" gesture or a random one from a "tag" category.
:ivar name: The identifier for the gesture or tag.
The type field determines how the gesture is selected:
- "single": Use the specific gesture identified by name
- "tag": Select a random gesture from the category identified by name
"""
type: Literal["tag", "single"]
@@ -151,9 +273,9 @@ class Gesture(BaseModel):
class GestureAction(ProgramElement):
"""
Represents the action of the robot performing a gesture.
An action where the robot performs a physical gesture.
:ivar gesture: The gesture to perform.
:ivar gesture: The :class:`Gesture` definition.
"""
name: str = ""
@@ -162,10 +284,13 @@ class GestureAction(ProgramElement):
class LLMAction(ProgramElement):
"""
Represents the action of letting an LLM generate a reply based on its chat history
and an additional goal added in the prompt.
An action that triggers an LLM-generated conversational response.
:ivar goal: The extra (temporary) goal to add to the LLM.
:ivar goal: A temporary conversational goal to guide the LLM's response generation.
The goal parameter provides high-level guidance to the LLM about what
the response should aim to achieve, while allowing the LLM flexibility
in how to express it.
"""
name: str = ""
@@ -174,10 +299,10 @@ class LLMAction(ProgramElement):
class Trigger(ProgramElement):
"""
Represents a belief-based trigger. When a belief is set, the corresponding plan is executed.
Defines a reactive behavior: when the condition (belief) is met, the plan is executed.
:ivar condition: When to activate the trigger.
:ivar plan: The plan to execute.
:ivar condition: The :class:`Belief` that triggers this behavior.
:ivar plan: The :class:`Plan` to execute upon activation.
"""
condition: Belief
@@ -186,11 +311,11 @@ class Trigger(ProgramElement):
class Phase(ProgramElement):
"""
A distinct phase within a program, containing norms, goals, and triggers.
A logical stage in the interaction program, grouping norms, goals, and triggers.
:ivar norms: List of norms active in this phase.
:ivar goals: List of goals to pursue in this phase.
:ivar triggers: List of triggers that define transitions out of this phase.
:ivar norms: List of norms active during this phase.
:ivar goals: List of goals the robot pursues in this phase.
:ivar triggers: List of reactive behaviors defined for this phase.
"""
name: str = ""
@@ -201,9 +326,19 @@ class Phase(ProgramElement):
class Program(BaseModel):
"""
Represents a complete interaction program, consisting of a sequence or set of phases.
The top-level container for a complete robot behavior definition.
:ivar phases: The list of phases that make up the program.
The Program class represents the complete specification of a robot's
behavioral logic. It contains all the phases, norms, goals, triggers,
and actions that define how the robot should behave during interactions.
:ivar phases: An ordered list of :class:`Phase` objects defining the interaction flow.
"""
phases: list[Phase]
if __name__ == "__main__":
input = input("Enter program JSON: ")
program = Program.model_validate_json(input)
print(program)

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from enum import Enum
PROGRAM_STATUS = b"internal/program_status"

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from enum import Enum
from typing import Any, Literal
@@ -14,6 +20,7 @@ class RIEndpoint(str, Enum):
GESTURE_TAG = "actuate/gesture/tag"
PING = "ping"
NEGOTIATE_PORTS = "negotiate/ports"
PAUSE = ""
class RIMessage(BaseModel):
@@ -64,3 +71,15 @@ class GestureCommand(RIMessage):
if self.endpoint not in allowed:
raise ValueError("endpoint must be GESTURE_SINGLE or GESTURE_TAG")
return self
class PauseCommand(RIMessage):
"""
A specific command to pause or unpause the robot's actions.
:ivar endpoint: Fixed to ``RIEndpoint.PAUSE``.
:ivar data: A boolean indicating whether to pause (True) or unpause (False).
"""
endpoint: RIEndpoint = RIEndpoint(RIEndpoint.PAUSE)
data: bool

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import random
from unittest.mock import AsyncMock, MagicMock
@@ -40,7 +46,7 @@ async def test_normal_setup(per_transcription_agent):
per_vad_agent = VADAgent("tcp://localhost:12345", False)
per_vad_agent._streaming_loop = AsyncMock()
async def swallow_background_task(coro):
def swallow_background_task(coro):
coro.close()
per_vad_agent.add_behavior = swallow_background_task
@@ -106,7 +112,7 @@ async def test_out_socket_creation_failure(zmq_context):
per_vad_agent._streaming_loop = AsyncMock()
per_vad_agent._connect_audio_out_socket = MagicMock(return_value=None)
async def swallow_background_task(coro):
def swallow_background_task(coro):
coro.close()
per_vad_agent.add_behavior = swallow_background_task
@@ -126,7 +132,7 @@ async def test_stop(zmq_context, per_transcription_agent):
per_vad_agent._reset_stream = AsyncMock()
per_vad_agent._streaming_loop = AsyncMock()
async def swallow_background_task(coro):
def swallow_background_task(coro):
coro.close()
per_vad_agent.add_behavior = swallow_background_task
@@ -150,6 +156,7 @@ async def test_application_startup_complete(zmq_context):
vad_agent._running = True
vad_agent._reset_stream = AsyncMock()
vad_agent.program_sub_socket = AsyncMock()
vad_agent.program_sub_socket.close = MagicMock()
vad_agent.program_sub_socket.recv_multipart.side_effect = [
(PROGRAM_STATUS, ProgramStatus.RUNNING.value),
]

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import os
from unittest.mock import AsyncMock, MagicMock

View File

@@ -1,5 +1,11 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import json
from unittest.mock import AsyncMock, MagicMock
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import zmq
@@ -19,6 +25,12 @@ def zmq_context(mocker):
return mock_context
@pytest.fixture(autouse=True)
def mock_experiment_logger():
with patch("control_backend.agents.actuation.robot_gesture_agent.experiment_logger") as logger:
yield logger
@pytest.mark.asyncio
async def test_setup_bind(zmq_context, mocker):
"""Setup binds and subscribes to internal commands."""
@@ -28,7 +40,11 @@ async def test_setup_bind(zmq_context, mocker):
settings = mocker.patch("control_backend.agents.actuation.robot_gesture_agent.settings")
settings.zmq_settings.internal_sub_address = "tcp://internal:1234"
agent.add_behavior = MagicMock()
def close_coro(coro):
coro.close()
return MagicMock()
agent.add_behavior = MagicMock(side_effect=close_coro)
await agent.setup()
@@ -55,7 +71,11 @@ async def test_setup_connect(zmq_context, mocker):
settings = mocker.patch("control_backend.agents.actuation.robot_gesture_agent.settings")
settings.zmq_settings.internal_sub_address = "tcp://internal:1234"
agent.add_behavior = MagicMock()
def close_coro(coro):
coro.close()
return MagicMock()
agent.add_behavior = MagicMock(side_effect=close_coro)
await agent.setup()
@@ -119,6 +139,65 @@ async def test_handle_message_rejects_invalid_gesture_tag():
pubsocket.send_json.assert_not_awaited()
@pytest.mark.asyncio
async def test_handle_message_sends_valid_single_gesture_command():
"""Internal message with valid single gesture is forwarded."""
pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture", single_gesture_data=["wave", "point"], address="")
agent.pubsocket = pubsocket
payload = {
"endpoint": RIEndpoint.GESTURE_SINGLE,
"data": "wave",
}
msg = InternalMessage(to="robot", sender="tester", body=json.dumps(payload))
await agent.handle_message(msg)
pubsocket.send_json.assert_awaited_once()
@pytest.mark.asyncio
async def test_handle_message_rejects_invalid_single_gesture():
"""Internal message with invalid single gesture is not forwarded."""
pubsocket = AsyncMock()
agent = RobotGestureAgent("robot_gesture", single_gesture_data=["wave", "point"], address="")
agent.pubsocket = pubsocket
payload = {
"endpoint": RIEndpoint.GESTURE_SINGLE,
"data": "dance",
}
msg = InternalMessage(to="robot", sender="tester", body=json.dumps(payload))
await agent.handle_message(msg)
pubsocket.send_json.assert_not_awaited()
@pytest.mark.asyncio
async def test_zmq_command_loop_valid_single_gesture_payload():
"""UI command with valid single gesture is read from SUB and published."""
command = {"endpoint": RIEndpoint.GESTURE_SINGLE, "data": "wave"}
fake_socket = AsyncMock()
async def recv_once():
agent._running = False
return b"command", json.dumps(command).encode("utf-8")
fake_socket.recv_multipart = recv_once
fake_socket.send_json = AsyncMock()
agent = RobotGestureAgent("robot_gesture", single_gesture_data=["wave", "point"], address="")
agent.subsocket = fake_socket
agent.pubsocket = fake_socket
agent._running = True
await agent._zmq_command_loop()
fake_socket.send_json.assert_awaited_once()
@pytest.mark.asyncio
async def test_handle_message_invalid_payload():
"""Invalid payload is caught and does not send."""
@@ -411,8 +490,7 @@ async def test_stop_closes_sockets():
pubsocket.close.assert_called_once()
subsocket.close.assert_called_once()
# Note: repsocket is not closed in stop() method, but you might want to add it
# repsocket.close.assert_called_once()
repsocket.close.assert_called_once()
@pytest.mark.asyncio

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import json
from unittest.mock import AsyncMock, MagicMock
@@ -30,7 +36,11 @@ async def test_setup_bind(zmq_context, mocker):
settings = mocker.patch("control_backend.agents.actuation.robot_speech_agent.settings")
settings.zmq_settings.internal_sub_address = "tcp://internal:1234"
agent.add_behavior = MagicMock()
def close_coro(coro):
coro.close()
return MagicMock()
agent.add_behavior = MagicMock(side_effect=close_coro)
await agent.setup()
@@ -48,7 +58,11 @@ async def test_setup_connect(zmq_context, mocker):
settings = mocker.patch("control_backend.agents.actuation.robot_speech_agent.settings")
settings.zmq_settings.internal_sub_address = "tcp://internal:1234"
agent.add_behavior = MagicMock()
def close_coro(coro):
coro.close()
return MagicMock()
agent.add_behavior = MagicMock(side_effect=close_coro)
await agent.setup()

View File

@@ -0,0 +1,192 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pytest
from control_backend.agents.bdi.agentspeak_ast import (
AstAtom,
AstBinaryOp,
AstLiteral,
AstLogicalExpression,
AstNumber,
AstPlan,
AstProgram,
AstRule,
AstStatement,
AstString,
AstVar,
BinaryOperatorType,
StatementType,
TriggerType,
_coalesce_expr,
)
def test_ast_atom():
atom = AstAtom("test")
assert str(atom) == "test"
assert atom._to_agentspeak() == "test"
def test_ast_var():
var = AstVar("Variable")
assert str(var) == "Variable"
assert var._to_agentspeak() == "Variable"
def test_ast_number():
num = AstNumber(42)
assert str(num) == "42"
num_float = AstNumber(3.14)
assert str(num_float) == "3.14"
def test_ast_string():
s = AstString("hello")
assert str(s) == '"hello"'
def test_ast_literal():
lit = AstLiteral("functor", [AstAtom("atom"), AstNumber(1)])
assert str(lit) == "functor(atom, 1)"
lit_empty = AstLiteral("functor")
assert str(lit_empty) == "functor"
def test_ast_binary_op():
left = AstNumber(1)
right = AstNumber(2)
op = AstBinaryOp(left, BinaryOperatorType.GREATER_THAN, right)
assert str(op) == "1 > 2"
# Test logical wrapper
assert isinstance(op.left, AstLogicalExpression)
assert isinstance(op.right, AstLogicalExpression)
def test_ast_binary_op_parens():
# 1 > 2
inner = AstBinaryOp(AstNumber(1), BinaryOperatorType.GREATER_THAN, AstNumber(2))
# (1 > 2) & 3
outer = AstBinaryOp(inner, BinaryOperatorType.AND, AstNumber(3))
assert str(outer) == "(1 > 2) & 3"
# 3 & (1 > 2)
outer_right = AstBinaryOp(AstNumber(3), BinaryOperatorType.AND, inner)
assert str(outer_right) == "3 & (1 > 2)"
def test_ast_binary_op_parens_negated():
inner = AstLogicalExpression(AstAtom("foo"), negated=True)
outer = AstBinaryOp(inner, BinaryOperatorType.AND, AstAtom("bar"))
# The current implementation checks `if self.left.negated: l_str = f"({l_str})"`
# str(inner) is "not foo"
# so we expect "(not foo) & bar"
assert str(outer) == "(not foo) & bar"
outer_right = AstBinaryOp(AstAtom("bar"), BinaryOperatorType.AND, inner)
assert str(outer_right) == "bar & (not foo)"
def test_ast_logical_expression_negation():
expr = AstLogicalExpression(AstAtom("true"), negated=True)
assert str(expr) == "not true"
expr_neg_neg = ~expr
assert str(expr_neg_neg) == "true"
assert not expr_neg_neg.negated
# Invert a non-logical expression (wraps it)
term = AstAtom("true")
inverted = ~term
assert isinstance(inverted, AstLogicalExpression)
assert inverted.negated
assert str(inverted) == "not true"
def test_ast_logical_expression_no_negation():
# _as_logical on already logical expression
expr = AstLogicalExpression(AstAtom("x"))
# Doing binary op will call _as_logical
op = AstBinaryOp(expr, BinaryOperatorType.AND, AstAtom("y"))
assert isinstance(op.left, AstLogicalExpression)
assert op.left is expr # Should reuse instance
def test_ast_operators():
t1 = AstAtom("a")
t2 = AstAtom("b")
assert str(t1 & t2) == "a & b"
assert str(t1 | t2) == "a | b"
assert str(t1 >= t2) == "a >= b"
assert str(t1 > t2) == "a > b"
assert str(t1 <= t2) == "a <= b"
assert str(t1 < t2) == "a < b"
assert str(t1 == t2) == "a == b"
assert str(t1 != t2) == r"a \== b"
def test_coalesce_expr():
t = AstAtom("a")
assert str(t & "b") == 'a & "b"'
assert str(t & 1) == "a & 1"
assert str(t & 1.5) == "a & 1.5"
with pytest.raises(TypeError):
_coalesce_expr(None)
def test_ast_statement():
stmt = AstStatement(StatementType.DO_ACTION, AstLiteral("action"))
assert str(stmt) == ".action"
def test_ast_rule():
# Rule with condition
rule = AstRule(AstLiteral("head"), AstLiteral("body"))
assert str(rule) == "head :- body."
# Rule without condition
rule_simple = AstRule(AstLiteral("fact"))
assert str(rule_simple) == "fact."
def test_ast_plan():
plan = AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("goal"),
[AstLiteral("context")],
[AstStatement(StatementType.DO_ACTION, AstLiteral("action"))],
)
output = str(plan)
# verify parts exist
assert "+!goal" in output
assert ": context" in output
assert "<- .action." in output
def test_ast_plan_no_context():
plan = AstPlan(
TriggerType.ADDED_GOAL,
AstLiteral("goal"),
[],
[AstStatement(StatementType.DO_ACTION, AstLiteral("action"))],
)
output = str(plan)
assert "+!goal" in output
assert ": " not in output
assert "<- .action." in output
def test_ast_program():
prog = AstProgram(
rules=[AstRule(AstLiteral("fact"))],
plans=[AstPlan(TriggerType.ADDED_BELIEF, AstLiteral("b"), [], [])],
)
output = str(prog)
assert "fact." in output
assert "+b" in output

View File

@@ -0,0 +1,193 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import uuid
import pytest
from control_backend.agents.bdi.agentspeak_ast import AstProgram
from control_backend.agents.bdi.agentspeak_generator import AgentSpeakGenerator
from control_backend.schemas.program import (
BasicNorm,
ConditionalNorm,
Gesture,
GestureAction,
Goal,
InferredBelief,
KeywordBelief,
LLMAction,
LogicalOperator,
Phase,
Plan,
Program,
SemanticBelief,
SpeechAction,
Trigger,
)
@pytest.fixture
def generator():
return AgentSpeakGenerator()
def test_generate_empty_program(generator):
prog = Program(phases=[])
code = generator.generate(prog)
assert 'phase("end").' in code
assert "!notify_cycle" in code
def test_generate_basic_norm(generator):
norm = BasicNorm(id=uuid.uuid4(), name="n1", norm="be nice")
phase = Phase(id=uuid.uuid4(), norms=[norm], goals=[], triggers=[])
prog = Program(phases=[phase])
code = generator.generate(prog)
assert f'norm("be nice") :- phase("{phase.id}").' in code
def test_generate_critical_norm(generator):
norm = BasicNorm(id=uuid.uuid4(), name="n1", norm="safety", critical=True)
phase = Phase(id=uuid.uuid4(), norms=[norm], goals=[], triggers=[])
prog = Program(phases=[phase])
code = generator.generate(prog)
assert f'critical_norm("safety") :- phase("{phase.id}").' in code
def test_generate_conditional_norm(generator):
cond = KeywordBelief(id=uuid.uuid4(), name="k1", keyword="please")
norm = ConditionalNorm(id=uuid.uuid4(), name="n1", norm="help", condition=cond)
phase = Phase(id=uuid.uuid4(), norms=[norm], goals=[], triggers=[])
prog = Program(phases=[phase])
code = generator.generate(prog)
assert 'norm("help")' in code
assert 'keyword_said("please")' in code
assert f"force_norm_{generator._slugify_str(norm.norm)}" in code
def test_generate_goal_and_plan(generator):
action = SpeechAction(id=uuid.uuid4(), name="s1", text="hello")
plan = Plan(id=uuid.uuid4(), name="p1", steps=[action])
# IMPORTANT: can_fail must be False for +achieved_ belief to be added
goal = Goal(id=uuid.uuid4(), name="g1", description="desc", plan=plan, can_fail=False)
phase = Phase(id=uuid.uuid4(), norms=[], goals=[goal], triggers=[])
prog = Program(phases=[phase])
code = generator.generate(prog)
# Check trigger for goal
goal_slug = generator._slugify_str(goal.name)
assert f"+!{goal_slug}" in code
assert f'phase("{phase.id}")' in code
assert '!say("hello")' in code
# Check success belief addition
assert f"+achieved_{goal_slug}" in code
def test_generate_subgoal(generator):
subplan = Plan(id=uuid.uuid4(), name="p2", steps=[])
subgoal = Goal(id=uuid.uuid4(), name="sub1", description="sub", plan=subplan)
plan = Plan(id=uuid.uuid4(), name="p1", steps=[subgoal])
goal = Goal(id=uuid.uuid4(), name="g1", description="main", plan=plan)
phase = Phase(id=uuid.uuid4(), norms=[], goals=[goal], triggers=[])
prog = Program(phases=[phase])
code = generator.generate(prog)
subgoal_slug = generator._slugify_str(subgoal.name)
# Main goal calls subgoal
assert f"!{subgoal_slug}" in code
# Subgoal plan exists
assert f"+!{subgoal_slug}" in code
def test_generate_trigger(generator):
cond = SemanticBelief(id=uuid.uuid4(), name="s1", description="desc")
plan = Plan(id=uuid.uuid4(), name="p1", steps=[])
trigger = Trigger(id=uuid.uuid4(), name="t1", condition=cond, plan=plan)
phase = Phase(id=uuid.uuid4(), norms=[], goals=[], triggers=[trigger])
prog = Program(phases=[phase])
code = generator.generate(prog)
# Trigger logic is added to check_triggers
assert f"{generator.slugify(cond)}" in code
assert f'notify_trigger_start("{generator.slugify(trigger)}")' in code
assert f'notify_trigger_end("{generator.slugify(trigger)}")' in code
def test_phase_transition(generator):
phase1 = Phase(id=uuid.uuid4(), name="p1", norms=[], goals=[], triggers=[])
phase2 = Phase(id=uuid.uuid4(), name="p2", norms=[], goals=[], triggers=[])
prog = Program(phases=[phase1, phase2])
code = generator.generate(prog)
assert "transition_phase" in code
assert f'phase("{phase1.id}")' in code
assert f'phase("{phase2.id}")' in code
assert "force_transition_phase" in code
def test_astify_gesture(generator):
gesture = Gesture(type="single", name="wave")
action = GestureAction(id=uuid.uuid4(), name="g1", gesture=gesture)
ast = generator._astify(action)
assert str(ast) == 'gesture("single", "wave")'
def test_astify_llm_action(generator):
action = LLMAction(id=uuid.uuid4(), name="l1", goal="be funny")
ast = generator._astify(action)
assert str(ast) == 'reply_with_goal("be funny")'
def test_astify_inferred_belief_and(generator):
left = KeywordBelief(id=uuid.uuid4(), name="k1", keyword="a")
right = KeywordBelief(id=uuid.uuid4(), name="k2", keyword="b")
inf = InferredBelief(
id=uuid.uuid4(), name="i1", operator=LogicalOperator.AND, left=left, right=right
)
ast = generator._astify(inf)
assert 'keyword_said("a") & keyword_said("b")' == str(ast)
def test_astify_inferred_belief_or(generator):
left = KeywordBelief(id=uuid.uuid4(), name="k1", keyword="a")
right = KeywordBelief(id=uuid.uuid4(), name="k2", keyword="b")
inf = InferredBelief(
id=uuid.uuid4(), name="i1", operator=LogicalOperator.OR, left=left, right=right
)
ast = generator._astify(inf)
assert 'keyword_said("a") | keyword_said("b")' == str(ast)
def test_astify_semantic_belief(generator):
sb = SemanticBelief(id=uuid.uuid4(), name="s1", description="desc")
ast = generator._astify(sb)
assert str(ast) == f"semantic_{generator._slugify_str(sb.name)}"
def test_slugify_not_implemented(generator):
with pytest.raises(NotImplementedError):
generator.slugify("not a program element")
def test_astify_not_implemented(generator):
with pytest.raises(NotImplementedError):
generator._astify("not a program element")
def test_process_phase_transition_from_none(generator):
# Initialize AstProgram manually as we are bypassing generate()
generator._asp = AstProgram()
# Should safely return doing nothing
generator._add_phase_transition(None, None)
assert len(generator._asp.plans) == 0

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import json
import time
@@ -26,6 +32,12 @@ def agent():
return agent
@pytest.fixture(autouse=True)
def mock_experiment_logger():
with patch("control_backend.agents.bdi.bdi_core_agent.experiment_logger") as logger:
yield logger
@pytest.mark.asyncio
async def test_setup_loads_asl(mock_agentspeak_env, agent):
# Mock file opening
@@ -45,23 +57,34 @@ async def test_setup_no_asl(mock_agentspeak_env, agent):
@pytest.mark.asyncio
async def test_handle_belief_collector_message(agent, mock_settings):
async def test_handle_belief_message(agent, mock_settings):
"""Test that incoming beliefs are added to the BDI agent"""
beliefs = [Belief(name="user_said", arguments=["Hello"])]
msg = InternalMessage(
to="bdi_agent",
sender=mock_settings.agent_settings.bdi_belief_collector_name,
sender=mock_settings.agent_settings.text_belief_extractor_name,
body=BeliefMessage(create=beliefs).model_dump_json(),
thread="beliefs",
)
await agent.handle_message(msg)
# Expect bdi_agent.call to be triggered to add belief
args = agent.bdi_agent.call.call_args.args
assert args[0] == agentspeak.Trigger.addition
assert args[1] == agentspeak.GoalType.belief
assert args[2] == agentspeak.Literal("user_said", (agentspeak.Literal("Hello"),))
# Check for the specific call we expect among all calls
# bdi_agent.call is called multiple times (for transition_phase, check_triggers)
# We want to confirm the belief addition call exists
found_call = False
for call in agent.bdi_agent.call.call_args_list:
args = call.args
if (
args[0] == agentspeak.Trigger.addition
and args[1] == agentspeak.GoalType.belief
and args[2].functor == "user_said"
and args[2].args[0].functor == "Hello"
):
found_call = True
break
assert found_call, "Expected belief addition call not found in bdi_agent.call history"
@pytest.mark.asyncio
@@ -71,25 +94,33 @@ async def test_handle_delete_belief_message(agent, mock_settings):
msg = InternalMessage(
to="bdi_agent",
sender=mock_settings.agent_settings.bdi_belief_collector_name,
sender=mock_settings.agent_settings.text_belief_extractor_name,
body=BeliefMessage(delete=beliefs).model_dump_json(),
thread="beliefs",
)
await agent.handle_message(msg)
# Expect bdi_agent.call to be triggered to remove belief
args = agent.bdi_agent.call.call_args.args
assert args[0] == agentspeak.Trigger.removal
assert args[1] == agentspeak.GoalType.belief
assert args[2] == agentspeak.Literal("user_said", (agentspeak.Literal("Hello"),))
found_call = False
for call in agent.bdi_agent.call.call_args_list:
args = call.args
if (
args[0] == agentspeak.Trigger.removal
and args[1] == agentspeak.GoalType.belief
and args[2].functor == "user_said"
and args[2].args[0].functor == "Hello"
):
found_call = True
break
assert found_call
@pytest.mark.asyncio
async def test_incorrect_belief_collector_message(agent, mock_settings):
async def test_incorrect_belief_message(agent, mock_settings):
"""Test that incorrect message format triggers an exception."""
msg = InternalMessage(
to="bdi_agent",
sender=mock_settings.agent_settings.bdi_belief_collector_name,
sender=mock_settings.agent_settings.text_belief_extractor_name,
body=json.dumps({"bad_format": "bad_format"}),
thread="beliefs",
)
@@ -171,7 +202,11 @@ def test_remove_belief_success_wakes_loop(agent):
agent._remove_belief("remove_me", ["x"])
assert agent.bdi_agent.call.called
trigger, goaltype, literal, *_ = agent.bdi_agent.call.call_args.args
call_args = agent.bdi_agent.call.call_args.args
trigger = call_args[0]
goaltype = call_args[1]
literal = call_args[2]
assert trigger == agentspeak.Trigger.removal
assert goaltype == agentspeak.GoalType.belief
@@ -288,3 +323,216 @@ async def test_deadline_sleep_branch(agent):
duration = time.time() - start_time
assert duration >= 0.004 # loop slept until deadline
@pytest.mark.asyncio
async def test_handle_new_program(agent):
agent._load_asl = AsyncMock()
agent.add_behavior = MagicMock()
# Mock existing loop task so it can be cancelled
mock_task = MagicMock()
mock_task.cancel = MagicMock()
agent._bdi_loop_task = mock_task
def close_coro(coro):
coro.close()
return MagicMock()
agent.add_behavior = MagicMock(side_effect=close_coro)
msg = InternalMessage(to="bdi_agent", thread="new_program", body="path/to/asl.asl")
await agent.handle_message(msg)
mock_task.cancel.assert_called_once()
agent._load_asl.assert_awaited_once_with("path/to/asl.asl")
agent.add_behavior.assert_called()
@pytest.mark.asyncio
async def test_handle_user_interrupts(agent, mock_settings):
mock_settings.agent_settings.user_interrupt_name = "user_interrupt_agent"
# force_phase_transition
agent._set_goal = MagicMock()
msg = InternalMessage(
to="bdi_agent",
sender=mock_settings.agent_settings.user_interrupt_name,
thread="force_phase_transition",
body="",
)
await agent.handle_message(msg)
agent._set_goal.assert_called_with("transition_phase")
# force_trigger
agent._force_trigger = MagicMock()
msg.thread = "force_trigger"
msg.body = "trigger_x"
await agent.handle_message(msg)
agent._force_trigger.assert_called_with("trigger_x")
# force_norm
agent._force_norm = MagicMock()
msg.thread = "force_norm"
msg.body = "norm_y"
await agent.handle_message(msg)
agent._force_norm.assert_called_with("norm_y")
# force_next_phase
agent._force_next_phase = MagicMock()
msg.thread = "force_next_phase"
msg.body = ""
await agent.handle_message(msg)
agent._force_next_phase.assert_called_once()
# unknown interrupt
agent.logger = MagicMock()
msg.thread = "unknown_thing"
await agent.handle_message(msg)
agent.logger.warning.assert_called()
@pytest.mark.asyncio
async def test_custom_action_reply_with_goal(agent):
agent._send_to_llm = MagicMock(side_effect=agent.send)
agent._add_custom_actions()
action_fn = agent.actions.actions[(".reply_with_goal", 3)]
mock_term = MagicMock(args=["msg", "norms", "goal"])
gen = action_fn(agent, mock_term, MagicMock())
next(gen)
agent._send_to_llm.assert_called_with("msg", "norms", "goal")
@pytest.mark.asyncio
async def test_custom_action_notify_norms(agent):
agent._add_custom_actions()
action_fn = agent.actions.actions[(".notify_norms", 1)]
mock_term = MagicMock(args=["norms_list"])
gen = action_fn(agent, mock_term, MagicMock())
next(gen)
agent.send.assert_called()
msg = agent.send.call_args[0][0]
assert msg.thread == "active_norms_update"
assert msg.body == "norms_list"
@pytest.mark.asyncio
async def test_custom_action_say(agent):
agent._add_custom_actions()
action_fn = agent.actions.actions[(".say", 1)]
mock_term = MagicMock(args=["hello"])
gen = action_fn(agent, mock_term, MagicMock())
next(gen)
assert agent.send.call_count == 2
msgs = [c[0][0] for c in agent.send.call_args_list]
assert any(m.to == settings.agent_settings.robot_speech_name for m in msgs)
assert any(
m.to == settings.agent_settings.llm_name and m.thread == "assistant_message" for m in msgs
)
@pytest.mark.asyncio
async def test_custom_action_gesture(agent):
agent._add_custom_actions()
# Test single
action_fn = agent.actions.actions[(".gesture", 2)]
mock_term = MagicMock(args=["single", "wave"])
gen = action_fn(agent, mock_term, MagicMock())
next(gen)
msg = agent.send.call_args[0][0]
assert "actuate/gesture/single" in msg.body
# Test tag
mock_term.args = ["tag", "happy"]
gen = action_fn(agent, mock_term, MagicMock())
next(gen)
msg = agent.send.call_args[0][0]
assert "actuate/gesture/tag" in msg.body
@pytest.mark.asyncio
async def test_custom_action_notify_user_said(agent):
agent._add_custom_actions()
action_fn = agent.actions.actions[(".notify_user_said", 1)]
mock_term = MagicMock(args=["hello"])
gen = action_fn(agent, mock_term, MagicMock())
next(gen)
msg = agent.send.call_args[0][0]
assert msg.to == settings.agent_settings.llm_name
assert msg.thread == "user_message"
@pytest.mark.asyncio
async def test_custom_action_notify_trigger_start_end(agent):
agent._add_custom_actions()
# Start
action_fn = agent.actions.actions[(".notify_trigger_start", 1)]
gen = action_fn(agent, MagicMock(args=["t1"]), MagicMock())
next(gen)
assert agent.send.call_args[0][0].thread == "trigger_start"
# End
action_fn = agent.actions.actions[(".notify_trigger_end", 1)]
gen = action_fn(agent, MagicMock(args=["t1"]), MagicMock())
next(gen)
assert agent.send.call_args[0][0].thread == "trigger_end"
@pytest.mark.asyncio
async def test_custom_action_notify_goal_start(agent):
agent._add_custom_actions()
action_fn = agent.actions.actions[(".notify_goal_start", 1)]
gen = action_fn(agent, MagicMock(args=["g1"]), MagicMock())
next(gen)
assert agent.send.call_args[0][0].thread == "goal_start"
@pytest.mark.asyncio
async def test_custom_action_notify_transition_phase(agent):
agent._add_custom_actions()
action_fn = agent.actions.actions[(".notify_transition_phase", 2)]
gen = action_fn(agent, MagicMock(args=["old", "new"]), MagicMock())
next(gen)
msg = agent.send.call_args[0][0]
assert msg.thread == "transition_phase"
assert "old" in msg.body and "new" in msg.body
def test_remove_belief_no_args(agent):
agent._wake_bdi_loop = MagicMock()
agent.bdi_agent.call.return_value = True
agent._remove_belief("fact", None)
assert agent.bdi_agent.call.called
def test_set_goal_with_args(agent):
agent._wake_bdi_loop = MagicMock()
agent._set_goal("goal", ["arg1", "arg2"])
assert agent.bdi_agent.call.called
def test_format_belief_string():
assert BDICoreAgent.format_belief_string("b") == "b"
assert BDICoreAgent.format_belief_string("b", ["a1", "a2"]) == "b(a1,a2)"
def test_force_norm(agent):
agent._add_belief = MagicMock()
agent._force_norm("be_polite")
agent._add_belief.assert_called_with("force_be_polite")
def test_force_trigger(agent):
agent._set_goal = MagicMock()
agent._force_trigger("trig")
agent._set_goal.assert_called_with("trig")
def test_force_next_phase(agent):
agent._set_goal = MagicMock()
agent._force_next_phase()
agent._set_goal.assert_called_with("force_transition_phase")

View File

@@ -1,14 +1,30 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import json
import sys
import uuid
from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, MagicMock, mock_open, patch
import pytest
from control_backend.agents.bdi.bdi_program_manager import BDIProgramManager
from control_backend.core.agent_system import InternalMessage
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.program import BasicNorm, Goal, Phase, Plan, Program
from control_backend.schemas.program import (
BasicNorm,
ConditionalNorm,
Goal,
InferredBelief,
KeywordBelief,
Phase,
Plan,
Program,
Trigger,
)
# Fix Windows Proactor loop for zmq
if sys.platform.startswith("win"):
@@ -48,24 +64,26 @@ def make_valid_program_json(norm="N1", goal="G1") -> str:
).model_dump_json()
@pytest.mark.skip(reason="Functionality being rebuilt.")
@pytest.mark.asyncio
async def test_send_to_bdi():
async def test_create_agentspeak_and_send_to_bdi(mock_settings):
manager = BDIProgramManager(name="program_manager_test")
manager.send = AsyncMock()
program = Program.model_validate_json(make_valid_program_json())
await manager._create_agentspeak_and_send_to_bdi(program)
with patch("builtins.open", mock_open()) as mock_file:
await manager._create_agentspeak_and_send_to_bdi(program)
# Check file writing
mock_file.assert_called_with(mock_settings.behaviour_settings.agentspeak_file, "w")
handle = mock_file()
handle.write.assert_called()
assert manager.send.await_count == 1
msg: InternalMessage = manager.send.await_args[0][0]
assert msg.thread == "beliefs"
beliefs = BeliefMessage.model_validate_json(msg.body)
names = {b.name: b.arguments for b in beliefs.beliefs}
assert "norms" in names and names["norms"] == ["N1"]
assert "goals" in names and names["goals"] == ["G1"]
assert msg.thread == "new_program"
assert msg.to == mock_settings.agent_settings.bdi_core_name
assert msg.body == mock_settings.behaviour_settings.agentspeak_file
@pytest.mark.asyncio
@@ -81,6 +99,9 @@ async def test_receive_programs_valid_and_invalid():
manager.sub_socket = sub
manager._create_agentspeak_and_send_to_bdi = AsyncMock()
manager._send_clear_llm_history = AsyncMock()
manager._send_program_to_user_interrupt = AsyncMock()
manager._send_beliefs_to_semantic_belief_extractor = AsyncMock()
manager._send_goals_to_semantic_belief_extractor = AsyncMock()
try:
# Will give StopAsyncIteration when the predefined `sub.recv_multipart` side-effects run out
@@ -94,7 +115,8 @@ async def test_receive_programs_valid_and_invalid():
assert forwarded.phases[0].norms[0].name == "N1"
assert forwarded.phases[0].goals[0].name == "G1"
# Verify history clear was triggered
# Verify history clear was triggered exactly once (for the valid program)
# The invalid program loop `continue`s before calling _send_clear_llm_history
assert manager._send_clear_llm_history.await_count == 1
@@ -113,4 +135,274 @@ async def test_send_clear_llm_history(mock_settings):
# Verify the content and recipient
assert msg.body == "clear_history"
assert msg.to == "llm_agent"
@pytest.mark.asyncio
async def test_handle_message_transition_phase(mock_settings):
mock_settings.agent_settings.user_interrupt_name = "user_interrupt_agent"
manager = BDIProgramManager(name="program_manager_test")
manager.send = AsyncMock()
# Setup state
prog = Program.model_validate_json(make_valid_program_json(norm="N1", goal="G1"))
manager._initialize_internal_state(prog)
# Test valid transition (to same phase for simplicity, or we need 2 phases)
# Let's create a program with 2 phases
phase2_id = uuid.uuid4()
phase2 = Phase(id=phase2_id, name="Phase 2", norms=[], goals=[], triggers=[])
prog.phases.append(phase2)
manager._initialize_internal_state(prog)
current_phase_id = str(prog.phases[0].id)
next_phase_id = str(phase2_id)
payload = json.dumps({"old": current_phase_id, "new": next_phase_id})
msg = InternalMessage(to="me", sender="bdi", body=payload, thread="transition_phase")
await manager.handle_message(msg)
assert str(manager._phase.id) == next_phase_id
# Allow background tasks to run (add_behavior)
await asyncio.sleep(0)
# Check notifications sent
# 1. beliefs to extractor
# 2. goals to extractor
# 3. notification to user interrupt
assert manager.send.await_count >= 3
# Verify user interrupt notification
calls = manager.send.await_args_list
ui_msgs = [
c[0][0] for c in calls if c[0][0].to == mock_settings.agent_settings.user_interrupt_name
]
assert len(ui_msgs) > 0
assert ui_msgs[-1].body == next_phase_id
@pytest.mark.asyncio
async def test_handle_message_transition_phase_desync():
manager = BDIProgramManager(name="program_manager_test")
manager.logger = MagicMock()
prog = Program.model_validate_json(make_valid_program_json())
manager._initialize_internal_state(prog)
current_phase_id = str(prog.phases[0].id)
# Request transition from WRONG old phase
payload = json.dumps({"old": "wrong_id", "new": "some_new_id"})
msg = InternalMessage(to="me", sender="bdi", body=payload, thread="transition_phase")
await manager.handle_message(msg)
# Should warn and do nothing
manager.logger.warning.assert_called_once()
assert "Phase transition desync detected" in manager.logger.warning.call_args[0][0]
assert str(manager._phase.id) == current_phase_id
@pytest.mark.asyncio
async def test_handle_message_transition_phase_end(mock_settings):
mock_settings.agent_settings.user_interrupt_name = "user_interrupt_agent"
manager = BDIProgramManager(name="program_manager_test")
manager.send = AsyncMock()
prog = Program.model_validate_json(make_valid_program_json())
manager._initialize_internal_state(prog)
current_phase_id = str(prog.phases[0].id)
payload = json.dumps({"old": current_phase_id, "new": "end"})
msg = InternalMessage(to="me", sender="bdi", body=payload, thread="transition_phase")
await manager.handle_message(msg)
assert manager._phase is None
# Allow background tasks to run (add_behavior)
await asyncio.sleep(0)
# Verify notification to user interrupt
assert manager.send.await_count == 1
msg_sent = manager.send.await_args[0][0]
assert msg_sent.to == mock_settings.agent_settings.user_interrupt_name
assert msg_sent.body == "end"
@pytest.mark.asyncio
async def test_handle_message_achieve_goal(mock_settings):
mock_settings.agent_settings.text_belief_extractor_name = "text_belief_extractor_agent"
manager = BDIProgramManager(name="program_manager_test")
manager.send = AsyncMock()
prog = Program.model_validate_json(make_valid_program_json(goal="TargetGoal"))
manager._initialize_internal_state(prog)
goal_id = str(prog.phases[0].goals[0].id)
msg = InternalMessage(to="me", sender="ui", body=goal_id, thread="achieve_goal")
await manager.handle_message(msg)
# Should send achieved goals to text extractor
assert manager.send.await_count == 1
msg_sent = manager.send.await_args[0][0]
assert msg_sent.to == mock_settings.agent_settings.text_belief_extractor_name
assert msg_sent.thread == "achieved_goals"
# Verify body
from control_backend.schemas.belief_list import GoalList
gl = GoalList.model_validate_json(msg_sent.body)
assert len(gl.goals) == 1
assert gl.goals[0].name == "TargetGoal"
@pytest.mark.asyncio
async def test_handle_message_achieve_goal_not_found():
manager = BDIProgramManager(name="program_manager_test")
manager.send = AsyncMock()
manager.logger = MagicMock()
prog = Program.model_validate_json(make_valid_program_json())
manager._initialize_internal_state(prog)
msg = InternalMessage(to="me", sender="ui", body="non_existent_id", thread="achieve_goal")
await manager.handle_message(msg)
manager.send.assert_not_called()
manager.logger.debug.assert_called()
@pytest.mark.asyncio
async def test_setup(mock_settings):
manager = BDIProgramManager(name="program_manager_test")
manager.send = AsyncMock()
def close_coro(coro):
coro.close()
return MagicMock()
manager.add_behavior = MagicMock(side_effect=close_coro)
mock_context = MagicMock()
mock_sub = MagicMock()
mock_context.socket.return_value = mock_sub
with patch(
"control_backend.agents.bdi.bdi_program_manager.Context.instance", return_value=mock_context
):
# We also need to mock file writing in _create_agentspeak_and_send_to_bdi
with patch("builtins.open", new_callable=MagicMock):
await manager.setup()
# Check logic
# 1. Sends default empty program to BDI
assert manager.send.await_count == 1
assert manager.send.await_args[0][0].to == mock_settings.agent_settings.bdi_core_name
# 2. Connects SUB socket
mock_sub.connect.assert_called_with(mock_settings.zmq_settings.internal_sub_address)
mock_sub.subscribe.assert_called_with("program")
# 3. Adds behavior
manager.add_behavior.assert_called()
@pytest.mark.asyncio
async def test_send_program_to_user_interrupt(mock_settings):
"""Test directly sending the program to the user interrupt agent."""
mock_settings.agent_settings.user_interrupt_name = "user_interrupt_agent"
manager = BDIProgramManager(name="program_manager_test")
manager.send = AsyncMock()
program = Program.model_validate_json(make_valid_program_json())
await manager._send_program_to_user_interrupt(program)
assert manager.send.await_count == 1
msg = manager.send.await_args[0][0]
assert msg.to == "user_interrupt_agent"
assert msg.thread == "new_program"
assert "Basic Phase" in msg.body
@pytest.mark.asyncio
async def test_complex_program_extraction():
manager = BDIProgramManager(name="program_manager_test")
# 1. Create Complex Components
# Inferred Belief (A & B)
belief_left = KeywordBelief(id=uuid.uuid4(), name="b1", keyword="hot")
belief_right = KeywordBelief(id=uuid.uuid4(), name="b2", keyword="sunny")
inferred_belief = InferredBelief(
id=uuid.uuid4(), name="b_inf", operator="AND", left=belief_left, right=belief_right
)
# Conditional Norm
cond_norm = ConditionalNorm(
id=uuid.uuid4(), name="norm_cond", norm="wear_hat", condition=inferred_belief
)
# Trigger with Inferred Belief condition
dummy_plan = Plan(id=uuid.uuid4(), name="dummy_plan", steps=[])
trigger = Trigger(id=uuid.uuid4(), name="trigger_1", condition=inferred_belief, plan=dummy_plan)
# Nested Goal
sub_goal = Goal(
id=uuid.uuid4(),
name="sub_goal",
description="desc",
plan=Plan(id=uuid.uuid4(), name="empty", steps=[]),
can_fail=True,
)
parent_goal = Goal(
id=uuid.uuid4(),
name="parent_goal",
description="desc",
# The plan contains the sub_goal as a step
plan=Plan(id=uuid.uuid4(), name="parent_plan", steps=[sub_goal]),
can_fail=False,
)
# 2. Assemble Program
phase = Phase(
id=uuid.uuid4(),
name="Complex Phase",
norms=[cond_norm],
goals=[parent_goal],
triggers=[trigger],
)
program = Program(phases=[phase])
# 3. Initialize Internal State (Triggers _populate_goal_mapping -> Nested Goal logic)
manager._initialize_internal_state(program)
# Assertion for Line 53-54 (Mapping population)
# Both parent and sub-goal should be mapped
assert str(parent_goal.id) in manager._goal_mapping
assert str(sub_goal.id) in manager._goal_mapping
# 4. Test Belief Extraction (Triggers lines 132-133, 142-146)
beliefs = manager._extract_current_beliefs()
# Should extract recursive beliefs from cond_norm and trigger
# Inferred belief splits into Left + Right. Since we use it twice, we get duplicates
# checking existence is enough.
belief_names = [b.name for b in beliefs]
assert "b1" in belief_names
assert "b2" in belief_names
# 5. Test Goal Extraction (Triggers lines 173, 185)
goals = manager._extract_current_goals()
goal_names = [g.name for g in goals]
assert "parent_goal" in goal_names
assert "sub_goal" in goal_names

View File

@@ -1,135 +0,0 @@
import json
from unittest.mock import AsyncMock
import pytest
from control_backend.agents.bdi import (
BDIBeliefCollectorAgent,
)
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import Belief
@pytest.fixture
def agent():
agent = BDIBeliefCollectorAgent("belief_collector_agent")
return agent
def make_msg(body: dict, sender: str = "sender"):
return InternalMessage(to="collector", sender=sender, body=json.dumps(body))
@pytest.mark.asyncio
async def test_handle_message_routes_belief_text(agent, mocker):
"""
Test that when a message is received, _handle_belief_text is called with that message.
"""
payload = {"type": "belief_extraction_text", "beliefs": {"user_said": [["hi"]]}}
spy = mocker.patch.object(agent, "_handle_belief_text", new_callable=AsyncMock)
await agent.handle_message(make_msg(payload))
spy.assert_awaited_once_with(payload, "sender")
@pytest.mark.asyncio
async def test_handle_message_routes_emotion(agent, mocker):
payload = {"type": "emotion_extraction_text"}
spy = mocker.patch.object(agent, "_handle_emo_text", new_callable=AsyncMock)
await agent.handle_message(make_msg(payload))
spy.assert_awaited_once_with(payload, "sender")
@pytest.mark.asyncio
async def test_handle_message_bad_json(agent, mocker):
agent._handle_belief_text = AsyncMock()
bad_msg = InternalMessage(to="collector", sender="sender", body="not json")
await agent.handle_message(bad_msg)
agent._handle_belief_text.assert_not_awaited()
@pytest.mark.asyncio
async def test_handle_belief_text_sends_when_beliefs_exist(agent, mocker):
payload = {"type": "belief_extraction_text", "beliefs": {"user_said": ["hello"]}}
spy = mocker.patch.object(agent, "_send_beliefs_to_bdi", new_callable=AsyncMock)
expected = [Belief(name="user_said", arguments=["hello"])]
await agent._handle_belief_text(payload, "origin")
spy.assert_awaited_once_with(expected, origin="origin")
@pytest.mark.asyncio
async def test_handle_belief_text_no_send_when_empty(agent, mocker):
payload = {"type": "belief_extraction_text", "beliefs": {}}
spy = mocker.patch.object(agent, "_send_beliefs_to_bdi", new_callable=AsyncMock)
await agent._handle_belief_text(payload, "origin")
spy.assert_not_awaited()
@pytest.mark.asyncio
async def test_send_beliefs_to_bdi(agent):
agent.send = AsyncMock()
beliefs = [Belief(name="user_said", arguments=["hello", "world"])]
await agent._send_beliefs_to_bdi(beliefs, origin="origin")
agent.send.assert_awaited_once()
sent: InternalMessage = agent.send.call_args.args[0]
assert sent.to == settings.agent_settings.bdi_core_name
assert sent.thread == "beliefs"
assert json.loads(sent.body)["create"] == [belief.model_dump() for belief in beliefs]
@pytest.mark.asyncio
async def test_setup_executes(agent):
"""Covers setup and asserts the agent has a name."""
await agent.setup()
assert agent.name == "belief_collector_agent" # simple property assertion
@pytest.mark.asyncio
async def test_handle_message_unrecognized_type_executes(agent):
"""Covers the else branch for unrecognized message type."""
payload = {"type": "unknown_type"}
msg = make_msg(payload, sender="tester")
# Wrap send to ensure nothing is sent
agent.send = AsyncMock()
await agent.handle_message(msg)
# Assert no messages were sent
agent.send.assert_not_awaited()
@pytest.mark.asyncio
async def test_handle_emo_text_executes(agent):
"""Covers the _handle_emo_text method."""
# The method does nothing, but we can assert it returns None
result = await agent._handle_emo_text({}, "origin")
assert result is None
@pytest.mark.asyncio
async def test_send_beliefs_to_bdi_empty_executes(agent):
"""Covers early return when beliefs are empty."""
agent.send = AsyncMock()
await agent._send_beliefs_to_bdi({})
# Assert that nothing was sent
agent.send.assert_not_awaited()
@pytest.mark.asyncio
async def test_handle_belief_text_invalid_returns_none(agent, mocker):
payload = {"type": "belief_extraction_text", "beliefs": {"user_said": "invalid-argument"}}
result = await agent._handle_belief_text(payload, "origin")
# The method itself returns None
assert result is None

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import json
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
@@ -14,6 +20,7 @@ from control_backend.schemas.belief_message import Belief as InternalBelief
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.chat_history import ChatHistory, ChatMessage
from control_backend.schemas.program import (
BaseGoal, # Changed from Goal
ConditionalNorm,
KeywordBelief,
LLMAction,
@@ -28,7 +35,8 @@ from control_backend.schemas.program import (
@pytest.fixture
def llm():
llm = TextBeliefExtractorAgent.LLM(MagicMock(), 4)
llm._query_llm = AsyncMock()
# We must ensure _query_llm returns a dictionary so iterating it doesn't fail
llm._query_llm = AsyncMock(return_value={})
return llm
@@ -357,6 +365,30 @@ async def test_simulated_real_turn_remove_belief(agent, llm, sample_program):
assert any(b.name == "no_more_booze" for b in agent._current_beliefs.false)
@pytest.mark.asyncio
async def test_infer_goal_completions_sends_beliefs(agent, llm):
"""Test that inferred goal completions are sent to the BDI core."""
goal = BaseGoal(
id=uuid.uuid4(), name="Say Hello", description="The user said hello", can_fail=True
)
agent.goal_inferrer.goals = {goal}
# Mock goal inference: goal is achieved
llm.query = AsyncMock(return_value=True)
await agent._infer_goal_completions()
# Should send belief change to BDI core
agent.send.assert_awaited_once()
sent: InternalMessage = agent.send.call_args.args[0]
assert sent.to == settings.agent_settings.bdi_core_name
assert sent.thread == "beliefs"
parsed = BeliefMessage.model_validate_json(sent.body)
assert len(parsed.create) == 1
assert parsed.create[0].name == "achieved_say_hello"
@pytest.mark.asyncio
async def test_llm_failure_handling(agent, llm, sample_program):
"""
@@ -374,3 +406,155 @@ async def test_llm_failure_handling(agent, llm, sample_program):
assert len(belief_changes.true) == 0
assert len(belief_changes.false) == 0
def test_belief_state_bool():
# Empty
bs = BeliefState()
assert not bs
# True set
bs_true = BeliefState(true={InternalBelief(name="a", arguments=None)})
assert bs_true
# False set
bs_false = BeliefState(false={InternalBelief(name="a", arguments=None)})
assert bs_false
@pytest.mark.asyncio
async def test_handle_beliefs_message_validation_error(agent, mock_settings):
# Invalid JSON
mock_settings.agent_settings.bdi_program_manager_name = "bdi_program_manager_agent"
msg = InternalMessage(
to="me",
sender=mock_settings.agent_settings.bdi_program_manager_name,
thread="beliefs",
body="invalid json",
)
# Should log warning and return
agent.logger = MagicMock()
await agent.handle_message(msg)
agent.logger.warning.assert_called()
# Invalid Model
msg.body = json.dumps({"beliefs": [{"invalid": "obj"}]})
await agent.handle_message(msg)
agent.logger.warning.assert_called()
@pytest.mark.asyncio
async def test_handle_goals_message_validation_error(agent, mock_settings):
mock_settings.agent_settings.bdi_program_manager_name = "bdi_program_manager_agent"
msg = InternalMessage(
to="me",
sender=mock_settings.agent_settings.bdi_program_manager_name,
thread="goals",
body="invalid json",
)
agent.logger = MagicMock()
await agent.handle_message(msg)
agent.logger.warning.assert_called()
@pytest.mark.asyncio
async def test_handle_goal_achieved_message_validation_error(agent, mock_settings):
mock_settings.agent_settings.bdi_program_manager_name = "bdi_program_manager_agent"
msg = InternalMessage(
to="me",
sender=mock_settings.agent_settings.bdi_program_manager_name,
thread="achieved_goals",
body="invalid json",
)
agent.logger = MagicMock()
await agent.handle_message(msg)
agent.logger.warning.assert_called()
@pytest.mark.asyncio
async def test_goal_inferrer_infer_from_conversation(agent, llm):
# Setup goals
# Use BaseGoal object as typically received by the extractor
g1 = BaseGoal(id=uuid.uuid4(), name="g1", description="desc", can_fail=True)
# Use real GoalAchievementInferrer
from control_backend.agents.bdi.text_belief_extractor_agent import GoalAchievementInferrer
inferrer = GoalAchievementInferrer(llm)
inferrer.goals = {g1}
# Mock LLM response
llm._query_llm.return_value = True
completions = await inferrer.infer_from_conversation(ChatHistory(messages=[]))
assert completions
# slugify uses slugify library, hard to predict exact string without it,
# but we can check values
assert list(completions.values())[0] is True
def test_apply_conversation_message_limit(agent):
with patch("control_backend.agents.bdi.text_belief_extractor_agent.settings") as mock_s:
mock_s.behaviour_settings.conversation_history_length_limit = 2
agent.conversation.messages = []
agent._apply_conversation_message(ChatMessage(role="user", content="1"))
agent._apply_conversation_message(ChatMessage(role="assistant", content="2"))
agent._apply_conversation_message(ChatMessage(role="user", content="3"))
assert len(agent.conversation.messages) == 2
assert agent.conversation.messages[0].content == "2"
assert agent.conversation.messages[1].content == "3"
@pytest.mark.asyncio
async def test_handle_program_manager_reset(agent):
with patch("control_backend.agents.bdi.text_belief_extractor_agent.settings") as mock_s:
mock_s.agent_settings.bdi_program_manager_name = "pm"
agent.conversation.messages = [ChatMessage(role="user", content="hi")]
agent.belief_inferrer.available_beliefs = [
SemanticBelief(id=uuid.uuid4(), name="b", description="d")
]
msg = InternalMessage(to="me", sender="pm", thread="conversation_history", body="reset")
await agent.handle_message(msg)
assert len(agent.conversation.messages) == 0
assert len(agent.belief_inferrer.available_beliefs) == 0
def test_split_into_chunks():
from control_backend.agents.bdi.text_belief_extractor_agent import SemanticBeliefInferrer
items = [1, 2, 3, 4, 5]
chunks = SemanticBeliefInferrer._split_into_chunks(items, 2)
assert len(chunks) == 2
assert len(chunks[0]) + len(chunks[1]) == 5
@pytest.mark.asyncio
async def test_infer_beliefs_call(agent, llm):
from control_backend.agents.bdi.text_belief_extractor_agent import SemanticBeliefInferrer
inferrer = SemanticBeliefInferrer(llm)
sb = SemanticBelief(id=uuid.uuid4(), name="is_happy", description="User is happy")
llm.query = AsyncMock(return_value={"is_happy": True})
res = await inferrer._infer_beliefs(ChatHistory(messages=[]), [sb])
assert res == {"is_happy": True}
llm.query.assert_called_once()
@pytest.mark.asyncio
async def test_infer_goal_call(agent, llm):
from control_backend.agents.bdi.text_belief_extractor_agent import GoalAchievementInferrer
inferrer = GoalAchievementInferrer(llm)
goal = BaseGoal(id=uuid.uuid4(), name="g1", description="d")
llm.query = AsyncMock(return_value=True)
res = await inferrer._infer_goal(ChatHistory(messages=[]), goal)
assert res is True
llm.query.assert_called_once()

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
from unittest.mock import ANY, AsyncMock, MagicMock, patch
@@ -53,7 +59,11 @@ async def test_setup_success_connects_and_starts_robot(zmq_context):
MockGesture.return_value.start = AsyncMock()
agent = RICommunicationAgent("ri_comm", address="tcp://localhost:5555", bind=False)
agent.add_behavior = MagicMock()
def close_coro(coro):
coro.close()
return MagicMock()
agent.add_behavior = MagicMock(side_effect=close_coro)
await agent.setup()
@@ -83,7 +93,11 @@ async def test_setup_binds_when_requested(zmq_context):
agent = RICommunicationAgent("ri_comm", address="tcp://localhost:5555", bind=True)
agent.add_behavior = MagicMock()
def close_coro(coro):
coro.close()
return MagicMock()
agent.add_behavior = MagicMock(side_effect=close_coro)
with (
patch(speech_agent_path(), autospec=True) as MockSpeech,
@@ -151,6 +165,7 @@ async def test_handle_negotiation_response_updates_req_socket(zmq_context):
@pytest.mark.asyncio
async def test_handle_disconnection_publishes_and_reconnects():
pub_socket = AsyncMock()
pub_socket.close = MagicMock()
agent = RICommunicationAgent("ri_comm")
agent.pub_socket = pub_socket
agent.connected = True
@@ -233,6 +248,25 @@ async def test_handle_negotiation_response_unhandled_id():
)
@pytest.mark.asyncio
async def test_handle_negotiation_response_audio(zmq_context):
agent = RICommunicationAgent("ri_comm")
with patch(
"control_backend.agents.communication.ri_communication_agent.VADAgent", autospec=True
) as MockVAD:
MockVAD.return_value.start = AsyncMock()
await agent._handle_negotiation_response(
{"data": [{"id": "audio", "port": 7000, "bind": False}]}
)
MockVAD.assert_called_once_with(
audio_in_address="tcp://localhost:7000", audio_in_bind=False
)
MockVAD.return_value.start.assert_awaited_once()
@pytest.mark.asyncio
async def test_stop_closes_sockets():
req = MagicMock()
@@ -323,6 +357,7 @@ async def test_listen_loop_generic_exception():
@pytest.mark.asyncio
async def test_handle_disconnection_timeout(monkeypatch):
pub = AsyncMock()
pub.close = MagicMock()
pub.send_multipart = AsyncMock(side_effect=TimeoutError)
agent = RICommunicationAgent("ri_comm")

View File

@@ -1,4 +1,10 @@
"""Mocks `httpx` and tests chunking logic."""
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
--------------------------------------------------------------------------------
Mocks `httpx` and tests chunking logic.
"""
from unittest.mock import AsyncMock, MagicMock, patch
@@ -18,6 +24,12 @@ def mock_httpx_client():
yield mock_client
@pytest.fixture(autouse=True)
def mock_experiment_logger():
with patch("control_backend.agents.llm.llm_agent.experiment_logger") as logger:
yield logger
@pytest.mark.asyncio
async def test_llm_processing_success(mock_httpx_client, mock_settings):
# Setup the mock response for the stream
@@ -58,17 +70,64 @@ async def test_llm_processing_success(mock_httpx_client, mock_settings):
to="llm_agent",
sender=mock_settings.agent_settings.bdi_core_name,
body=prompt.model_dump_json(),
thread="prompt_message", # REQUIRED: thread must match handle_message logic
)
agent._process_bdi_message = AsyncMock()
await agent.handle_message(msg)
agent._process_bdi_message.assert_called()
@pytest.mark.asyncio
async def test_process_bdi_message_success(mock_httpx_client, mock_settings):
# Setup the mock response for the stream
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
# Simulate stream lines
lines = [
b'data: {"choices": [{"delta": {"content": "Hello"}}]}',
b'data: {"choices": [{"delta": {"content": " world"}}]}',
b'data: {"choices": [{"delta": {"content": "."}}]}',
b"data: [DONE]",
]
async def aiter_lines_gen():
for line in lines:
yield line.decode()
mock_response.aiter_lines.side_effect = aiter_lines_gen
mock_stream_context = MagicMock()
mock_stream_context.__aenter__ = AsyncMock(return_value=mock_response)
mock_stream_context.__aexit__ = AsyncMock(return_value=None)
# Configure the client
mock_httpx_client.stream = MagicMock(return_value=mock_stream_context)
# Setup Agent
agent = LLMAgent("llm_agent")
agent.send = AsyncMock() # Mock the send method to verify replies
mock_logger = MagicMock()
agent.logger = mock_logger
# Simulate receiving a message from BDI
prompt = LLMPromptMessage(text="Hi", norms=[], goals=[])
await agent._process_bdi_message(prompt)
# Verification
# "Hello world." constitutes one sentence/chunk based on punctuation split
# The agent should call send once with the full sentence
# The agent should call send once with the full sentence, PLUS once more for full reply
assert agent.send.called
args = agent.send.call_args_list[0][0][0]
assert args.to == mock_settings.agent_settings.bdi_core_name
assert "Hello world." in args.body
# Check args. We expect at least one call sending "Hello world."
calls = agent.send.call_args_list
bodies = [c[0][0].body for c in calls]
assert any("Hello world." in b for b in bodies)
@pytest.mark.asyncio
@@ -76,22 +135,15 @@ async def test_llm_processing_errors(mock_httpx_client, mock_settings):
agent = LLMAgent("llm_agent")
agent.send = AsyncMock()
prompt = LLMPromptMessage(text="Hi", norms=[], goals=[])
msg = InternalMessage(
to="llm",
sender=mock_settings.agent_settings.bdi_core_name,
body=prompt.model_dump_json(),
)
# HTTP Error
# HTTP Error: stream method RAISES exception immediately
mock_httpx_client.stream = MagicMock(side_effect=httpx.HTTPError("Fail"))
await agent.handle_message(msg)
assert "LLM service unavailable." in agent.send.call_args[0][0].body
# General Exception
agent.send.reset_mock()
mock_httpx_client.stream = MagicMock(side_effect=Exception("Boom"))
await agent.handle_message(msg)
assert "Error processing the request." in agent.send.call_args[0][0].body
await agent._process_bdi_message(prompt)
# Check that error message was sent
assert agent.send.called
assert "LLM service unavailable." in agent.send.call_args_list[0][0][0].body
@pytest.mark.asyncio
@@ -113,16 +165,13 @@ async def test_llm_json_error(mock_httpx_client, mock_settings):
agent = LLMAgent("llm_agent")
agent.send = AsyncMock()
# Ensure logger is mocked
agent.logger = MagicMock()
with patch.object(agent.logger, "error") as log:
prompt = LLMPromptMessage(text="Hi", norms=[], goals=[])
msg = InternalMessage(
to="llm",
sender=mock_settings.agent_settings.bdi_core_name,
body=prompt.model_dump_json(),
)
await agent.handle_message(msg)
log.assert_called() # Should log JSONDecodeError
prompt = LLMPromptMessage(text="Hi", norms=[], goals=[])
await agent._process_bdi_message(prompt)
agent.logger.error.assert_called() # Should log JSONDecodeError
def test_llm_instructions():
@@ -157,6 +206,7 @@ async def test_handle_message_validation_error_branch_no_send(mock_httpx_client,
to="llm_agent",
sender=mock_settings.agent_settings.bdi_core_name,
body=invalid_json,
thread="prompt_message",
)
await agent.handle_message(msg)
@@ -285,3 +335,28 @@ async def test_clear_history_command(mock_settings):
)
await agent.handle_message(msg)
assert len(agent.history) == 0
@pytest.mark.asyncio
async def test_handle_assistant_and_user_messages(mock_settings):
agent = LLMAgent("llm_agent")
# Assistant message
msg_ast = InternalMessage(
to="llm_agent",
sender=mock_settings.agent_settings.bdi_core_name,
thread="assistant_message",
body="I said this",
)
await agent.handle_message(msg_ast)
assert agent.history[-1] == {"role": "assistant", "content": "I said this"}
# User message
msg_usr = InternalMessage(
to="llm_agent",
sender=mock_settings.agent_settings.bdi_core_name,
thread="user_message",
body="User said this",
)
await agent.handle_message(msg_usr)
assert agent.history[-1] == {"role": "user", "content": "User said this"}

View File

@@ -0,0 +1,152 @@
from unittest.mock import AsyncMock, MagicMock
import pytest
import zmq
import control_backend.agents.perception.face_rec_agent as face_module
from control_backend.agents.perception.face_rec_agent import FacePerceptionAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.schemas.belief_message import BeliefMessage
@pytest.fixture
def agent():
"""Return a FacePerceptionAgent instance for testing."""
return FacePerceptionAgent(
name="face_agent",
zmq_address="inproc://test",
zmq_bind=False,
)
@pytest.fixture
def socket():
"""Return a mocked ZMQ socket."""
sock = AsyncMock()
sock.setsockopt_string = MagicMock()
sock.connect = MagicMock()
sock.bind = MagicMock()
return sock
def test_connect_socket_connect(agent, socket, monkeypatch):
"""Test that _connect_socket properly connects when zmq_bind=False."""
ctx = MagicMock()
ctx.socket.return_value = socket
monkeypatch.setattr(face_module.azmq, "Context", MagicMock(instance=lambda: ctx))
agent._connect_socket()
socket.setsockopt_string.assert_called_once_with(zmq.SUBSCRIBE, "")
socket.connect.assert_called_once_with(agent._zmq_address)
socket.bind.assert_not_called()
def test_connect_socket_bind(agent, socket, monkeypatch):
"""Test that _connect_socket properly binds when zmq_bind=True."""
agent._zmq_bind = True
ctx = MagicMock()
ctx.socket.return_value = socket
monkeypatch.setattr(face_module.azmq, "Context", MagicMock(instance=lambda: ctx))
agent._connect_socket()
socket.bind.assert_called_once_with(agent._zmq_address)
socket.connect.assert_not_called()
def test_connect_socket_twice_is_noop(agent, socket):
"""Test that calling _connect_socket twice does not overwrite an existing socket."""
agent._socket = socket
agent._connect_socket()
assert agent._socket is socket
@pytest.mark.asyncio
async def test_update_face_belief_present(agent):
"""Test that _update_face_belief(True) creates the 'face_present' belief."""
agent.send = AsyncMock()
await agent._update_face_belief(True)
msg = agent.send.await_args.args[0]
payload = BeliefMessage.model_validate_json(msg.body)
assert payload.create[0].name == "face_present"
@pytest.mark.asyncio
async def test_update_face_belief_absent(agent):
"""Test that _update_face_belief(False) deletes the 'face_present' belief."""
agent.send = AsyncMock()
await agent._update_face_belief(False)
msg = agent.send.await_args.args[0]
payload = BeliefMessage.model_validate_json(msg.body)
assert payload.delete[0].name == "face_present"
@pytest.mark.asyncio
async def test_post_face_belief_present(agent):
"""Test that _post_face_belief(True) sends a belief creation message."""
agent.send = AsyncMock()
await agent._post_face_belief(True)
msg = agent.send.await_args.args[0]
assert '"create"' in msg.body and '"face_present"' in msg.body
@pytest.mark.asyncio
async def test_post_face_belief_absent(agent):
"""Test that _post_face_belief(False) sends a belief deletion message."""
agent.send = AsyncMock()
await agent._post_face_belief(False)
msg = agent.send.await_args.args[0]
assert '"delete"' in msg.body and '"face_present"' in msg.body
@pytest.mark.asyncio
async def test_handle_pause(agent):
"""Test that a 'PAUSE' message clears _paused and resets _last_face_state."""
agent._paused.set()
agent._last_face_state = True
msg = InternalMessage(
to=agent.name,
sender=face_module.settings.agent_settings.user_interrupt_name,
thread="cmd",
body="PAUSE",
)
await agent.handle_message(msg)
assert not agent._paused.is_set()
assert agent._last_face_state is None
@pytest.mark.asyncio
async def test_handle_resume(agent):
"""Test that a 'RESUME' message sets _paused."""
agent._paused.clear()
msg = InternalMessage(
to=agent.name,
sender=face_module.settings.agent_settings.user_interrupt_name,
thread="cmd",
body="RESUME",
)
await agent.handle_message(msg)
assert agent._paused.is_set()
@pytest.mark.asyncio
async def test_handle_unknown_command(agent):
"""Test that an unknown command from UserInterruptAgent is ignored (logs a warning)."""
msg = InternalMessage(
to=agent.name,
sender=face_module.settings.agent_settings.user_interrupt_name,
thread="cmd",
body="???",
)
await agent.handle_message(msg)
@pytest.mark.asyncio
async def test_handle_unknown_sender(agent):
"""Test that messages from unknown senders are ignored."""
msg = InternalMessage(
to=agent.name,
sender="someone_else",
thread="cmd",
body="PAUSE",
)
await agent.handle_message(msg)

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import numpy as np
import pytest
@@ -55,4 +61,6 @@ def test_get_decode_options():
assert isinstance(options["sample_len"], int)
# When disabled, it should not limit output length based on input size
assert "sample_rate" not in options
recognizer = OpenAIWhisperSpeechRecognizer(limit_output_length=False)
options = recognizer._get_decode_options(audio)
assert "sample_len" not in options

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
@@ -14,6 +20,15 @@ from control_backend.agents.perception.transcription_agent.transcription_agent i
)
@pytest.fixture(autouse=True)
def mock_experiment_logger():
with patch(
"control_backend.agents.perception"
".transcription_agent.transcription_agent.experiment_logger"
) as logger:
yield logger
@pytest.mark.asyncio
async def test_transcription_agent_flow(mock_zmq_context):
mock_sub = MagicMock()
@@ -36,7 +51,12 @@ async def test_transcription_agent_flow(mock_zmq_context):
agent.send = AsyncMock()
agent._running = True
agent.add_behavior = AsyncMock()
def close_coro(coro):
coro.close()
return MagicMock()
agent.add_behavior = MagicMock(side_effect=close_coro)
await agent.setup()
@@ -143,7 +163,12 @@ async def test_transcription_loop_continues_after_error(mock_zmq_context):
agent = TranscriptionAgent("tcp://in")
agent._running = True # ← REQUIRED to enter the loop
agent.send = AsyncMock() # should never be called
agent.add_behavior = AsyncMock() # match other tests
def close_coro(coro):
coro.close()
return MagicMock()
agent.add_behavior = MagicMock(side_effect=close_coro) # match other tests
await agent.setup()
@@ -180,7 +205,12 @@ async def test_transcription_continue_branch_when_empty(mock_zmq_context):
# Make loop runnable
agent._running = True
agent.send = AsyncMock()
agent.add_behavior = AsyncMock()
def close_coro(coro):
coro.close()
return MagicMock()
agent.add_behavior = MagicMock(side_effect=close_coro)
await agent.setup()

View File

@@ -0,0 +1,158 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from control_backend.agents.perception.vad_agent import VADAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.schemas.program_status import PROGRAM_STATUS, ProgramStatus
@pytest.fixture(autouse=True)
def mock_zmq():
with patch("zmq.asyncio.Context") as mock:
mock.instance.return_value = MagicMock()
yield mock
@pytest.fixture
def agent():
return VADAgent("tcp://localhost:5555", False)
@pytest.mark.asyncio
async def test_handle_message_pause(agent):
agent._paused = MagicMock()
# It starts set (not paused)
msg = InternalMessage(to="vad", sender="user_interrupt_agent", body="PAUSE")
# We need to mock settings to match sender name
with patch("control_backend.agents.perception.vad_agent.settings") as mock_settings:
mock_settings.agent_settings.user_interrupt_name = "user_interrupt_agent"
await agent.handle_message(msg)
agent._paused.clear.assert_called_once()
assert agent._reset_needed is True
@pytest.mark.asyncio
async def test_handle_message_resume(agent):
agent._paused = MagicMock()
msg = InternalMessage(to="vad", sender="user_interrupt_agent", body="RESUME")
with patch("control_backend.agents.perception.vad_agent.settings") as mock_settings:
mock_settings.agent_settings.user_interrupt_name = "user_interrupt_agent"
await agent.handle_message(msg)
agent._paused.set.assert_called_once()
@pytest.mark.asyncio
async def test_handle_message_unknown_command(agent):
agent._paused = MagicMock()
msg = InternalMessage(to="vad", sender="user_interrupt_agent", body="UNKNOWN")
with patch("control_backend.agents.perception.vad_agent.settings") as mock_settings:
mock_settings.agent_settings.user_interrupt_name = "user_interrupt_agent"
agent.logger = MagicMock()
await agent.handle_message(msg)
agent._paused.clear.assert_not_called()
agent._paused.set.assert_not_called()
@pytest.mark.asyncio
async def test_handle_message_unknown_sender(agent):
agent._paused = MagicMock()
msg = InternalMessage(to="vad", sender="other_agent", body="PAUSE")
with patch("control_backend.agents.perception.vad_agent.settings") as mock_settings:
mock_settings.agent_settings.user_interrupt_name = "user_interrupt_agent"
await agent.handle_message(msg)
agent._paused.clear.assert_not_called()
@pytest.mark.asyncio
async def test_status_loop_waits_for_running(agent):
agent._running = True
agent.program_sub_socket = AsyncMock()
agent.program_sub_socket.close = MagicMock()
agent._reset_stream = AsyncMock()
# Sequence of messages:
# 1. Wrong topic
# 2. Right topic, wrong status (STARTING)
# 3. Right topic, RUNNING -> Should break loop
agent.program_sub_socket.recv_multipart.side_effect = [
(b"wrong_topic", b"whatever"),
(PROGRAM_STATUS, ProgramStatus.STARTING.value),
(PROGRAM_STATUS, ProgramStatus.RUNNING.value),
]
await agent._status_loop()
assert agent._reset_stream.await_count == 1
agent.program_sub_socket.close.assert_called_once()
@pytest.mark.asyncio
async def test_setup_success(agent, mock_zmq):
def close_coro(coro):
coro.close()
return MagicMock()
agent.add_behavior = MagicMock(side_effect=close_coro)
mock_context = mock_zmq.instance.return_value
mock_sub = MagicMock()
mock_pub = MagicMock()
# We expect multiple socket calls:
# 1. audio_in (SUB)
# 2. audio_out (PUB)
# 3. program_sub (SUB)
mock_context.socket.side_effect = [mock_sub, mock_pub, mock_sub]
with patch("control_backend.agents.perception.vad_agent.torch.hub.load") as mock_load:
mock_load.return_value = (MagicMock(), None)
with patch("control_backend.agents.perception.vad_agent.TranscriptionAgent") as MockTrans:
mock_trans_instance = MockTrans.return_value
mock_trans_instance.start = AsyncMock()
await agent.setup()
mock_trans_instance.start.assert_awaited_once()
assert agent.add_behavior.call_count == 2 # streaming_loop + status_loop
assert agent.audio_in_socket is not None
assert agent.audio_out_socket is not None
assert agent.program_sub_socket is not None
@pytest.mark.asyncio
async def test_reset_stream(agent):
mock_poller = MagicMock()
agent.audio_in_poller = mock_poller
# poll(1) returns not None twice, then None
mock_poller.poll = AsyncMock(side_effect=[b"data", b"data", None])
agent._ready = MagicMock()
await agent._reset_stream()
assert mock_poller.poll.await_count == 3
agent._ready.set.assert_called_once()

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from unittest.mock import AsyncMock, MagicMock
import pytest

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from unittest.mock import AsyncMock, MagicMock, patch
import numpy as np
@@ -5,6 +11,7 @@ import pytest
import zmq
from control_backend.agents.perception.vad_agent import VADAgent
from control_backend.core.config import settings
# We don't want to use real ZMQ in unit tests, for example because it can give errors when sockets
@@ -23,7 +30,9 @@ def audio_out_socket():
@pytest.fixture
def vad_agent(audio_out_socket):
return VADAgent("tcp://localhost:5555", False)
agent = VADAgent("tcp://localhost:5555", False)
agent._internal_pub_socket = AsyncMock()
return agent
@pytest.fixture(autouse=True)
@@ -43,6 +52,12 @@ def patch_settings(monkeypatch):
monkeypatch.setattr(vad_agent.settings.vad_settings, "sample_rate_hz", 16_000, raising=False)
@pytest.fixture(autouse=True)
def mock_experiment_logger():
with patch("control_backend.agents.perception.vad_agent.experiment_logger") as logger:
yield logger
async def simulate_streaming_with_probabilities(streaming, probabilities: list[float]):
"""
Simulates a streaming scenario with given VAD model probabilities for testing purposes.
@@ -83,14 +98,15 @@ async def test_voice_activity_detected(audio_out_socket, vad_agent):
Test a scenario where there is voice activity detected between silences.
"""
speech_chunk_count = 5
probabilities = [0.0] * 5 + [1.0] * speech_chunk_count + [0.0] * 5
begin_silence_chunks = settings.behaviour_settings.vad_begin_silence_chunks
probabilities = [0.0] * 15 + [1.0] * speech_chunk_count + [0.0] * 5
vad_agent.audio_out_socket = audio_out_socket
await simulate_streaming_with_probabilities(vad_agent, probabilities)
audio_out_socket.send.assert_called_once()
data = audio_out_socket.send.call_args[0][0]
assert isinstance(data, bytes)
assert len(data) == 512 * 4 * (speech_chunk_count + 1)
assert len(data) == 512 * 4 * (begin_silence_chunks + speech_chunk_count)
@pytest.mark.asyncio
@@ -100,8 +116,9 @@ async def test_voice_activity_short_pause(audio_out_socket, vad_agent):
short pause.
"""
speech_chunk_count = 5
begin_silence_chunks = settings.behaviour_settings.vad_begin_silence_chunks
probabilities = (
[0.0] * 5 + [1.0] * speech_chunk_count + [0.0] + [1.0] * speech_chunk_count + [0.0] * 5
[0.0] * 15 + [1.0] * speech_chunk_count + [0.0] + [1.0] * speech_chunk_count + [0.0] * 5
)
vad_agent.audio_out_socket = audio_out_socket
await simulate_streaming_with_probabilities(vad_agent, probabilities)
@@ -109,8 +126,8 @@ async def test_voice_activity_short_pause(audio_out_socket, vad_agent):
audio_out_socket.send.assert_called_once()
data = audio_out_socket.send.call_args[0][0]
assert isinstance(data, bytes)
# Expecting 13 chunks (2*5 with speech, 1 pause between, 1 as padding)
assert len(data) == 512 * 4 * (speech_chunk_count * 2 + 1 + 1)
# Expecting 13 chunks (2*5 with speech, 1 pause between, begin_silence_chunks as padding)
assert len(data) == 512 * 4 * (speech_chunk_count * 2 + 1 + begin_silence_chunks)
@pytest.mark.asyncio
@@ -135,6 +152,54 @@ async def test_no_data(audio_out_socket, vad_agent):
assert len(vad_agent.audio_buffer) == 0
@pytest.mark.asyncio
async def test_streaming_loop_reset_needed(audio_out_socket, vad_agent):
"""Test that _reset_needed branch works as expected."""
vad_agent._reset_needed = True
vad_agent._ready.set()
vad_agent._paused.set()
vad_agent._running = True
vad_agent.audio_buffer = np.array([1.0], dtype=np.float32)
vad_agent.i_since_speech = 0
# Mock _reset_stream to stop the loop by setting _running=False
async def mock_reset():
vad_agent._running = False
vad_agent._reset_stream = mock_reset
# Needs a poller to avoid AssertionError
vad_agent.audio_in_poller = AsyncMock()
vad_agent.audio_in_poller.poll.return_value = None
await vad_agent._streaming_loop()
assert vad_agent._reset_needed is False
assert len(vad_agent.audio_buffer) == 0
assert vad_agent.i_since_speech == settings.behaviour_settings.vad_initial_since_speech
@pytest.mark.asyncio
async def test_streaming_loop_no_data_clears_buffer(audio_out_socket, vad_agent):
"""Test that if poll returns None, buffer is cleared if not empty."""
vad_agent.audio_buffer = np.array([1.0], dtype=np.float32)
vad_agent._ready.set()
vad_agent._paused.set()
vad_agent._running = True
class MockPoller:
async def poll(self, timeout_ms=None):
vad_agent._running = False # stop after one poll
return None
vad_agent.audio_in_poller = MockPoller()
await vad_agent._streaming_loop()
assert len(vad_agent.audio_buffer) == 0
assert vad_agent.i_since_speech == settings.behaviour_settings.vad_initial_since_speech
@pytest.mark.asyncio
async def test_vad_model_load_failure_stops_agent(vad_agent):
"""

View File

@@ -0,0 +1,30 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
from control_backend.agents.base import BaseAgent
class MyAgent(BaseAgent):
async def setup(self):
pass
async def handle_message(self, msg):
pass
def test_base_agent_logger_init():
# When defining a subclass, __init_subclass__ runs
# The BaseAgent in agents/base.py sets the logger
assert hasattr(MyAgent, "logger")
assert isinstance(MyAgent.logger, logging.Logger)
# The logger name depends on the package.
# Since this test file is running as a module, __package__ might be None or the test package.
# In 'src/control_backend/agents/base.py', it uses __package__ of base.py which is
# 'control_backend.agents'.
# So logger name should be control_backend.agents.MyAgent
assert MyAgent.logger.name == "control_backend.agents.MyAgent"

View File

@@ -1,12 +1,28 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from control_backend.agents.user_interrupt.user_interrupt_agent import UserInterruptAgent
from control_backend.core.agent_system import InternalMessage
from control_backend.core.config import settings
from control_backend.schemas.belief_message import BeliefMessage
from control_backend.schemas.program import (
ConditionalNorm,
Goal,
KeywordBelief,
Phase,
Plan,
Program,
Trigger,
)
from control_backend.schemas.ri_message import RIEndpoint
@@ -16,9 +32,18 @@ def agent():
agent.send = AsyncMock()
agent.logger = MagicMock()
agent.sub_socket = AsyncMock()
agent.pub_socket = AsyncMock()
return agent
@pytest.fixture(autouse=True)
def mock_experiment_logger():
with patch(
"control_backend.agents.user_interrupt.user_interrupt_agent.experiment_logger"
) as logger:
yield logger
@pytest.mark.asyncio
async def test_send_to_speech_agent(agent):
"""Verify speech command format."""
@@ -49,21 +74,18 @@ async def test_send_to_gesture_agent(agent):
@pytest.mark.asyncio
async def test_send_to_program_manager(agent):
async def test_send_to_bdi_belief(agent):
"""Verify belief update format."""
context_str = "2"
context_str = "some_goal"
await agent._send_to_program_manager(context_str)
await agent._send_to_bdi_belief(context_str, "goal")
agent.send.assert_awaited_once()
sent_msg: InternalMessage = agent.send.call_args.args[0]
assert agent.send.await_count == 1
sent_msg = agent.send.call_args.args[0]
assert sent_msg.to == settings.agent_settings.bdi_program_manager_name
assert sent_msg.thread == "belief_override_id"
body = json.loads(sent_msg.body)
assert body["belief"] == context_str
assert sent_msg.to == settings.agent_settings.bdi_core_name
assert sent_msg.thread == "beliefs"
assert "achieved_some_goal" in sent_msg.body
@pytest.mark.asyncio
@@ -77,6 +99,10 @@ async def test_receive_loop_routing_success(agent):
# Prepare JSON payloads as bytes
payload_speech = json.dumps({"type": "speech", "context": "Hello Speech"}).encode()
payload_gesture = json.dumps({"type": "gesture", "context": "Hello Gesture"}).encode()
# override calls _send_to_bdi (for trigger/norm) OR _send_to_bdi_belief (for goal).
# To test routing, we need to populate the maps
agent._goal_map["Hello Override"] = "some_goal_slug"
payload_override = json.dumps({"type": "override", "context": "Hello Override"}).encode()
agent.sub_socket.recv_multipart.side_effect = [
@@ -88,7 +114,7 @@ async def test_receive_loop_routing_success(agent):
agent._send_to_speech_agent = AsyncMock()
agent._send_to_gesture_agent = AsyncMock()
agent._send_to_program_manager = AsyncMock()
agent._send_to_bdi_belief = AsyncMock()
try:
await agent._receive_button_event()
@@ -103,12 +129,12 @@ async def test_receive_loop_routing_success(agent):
# Gesture
agent._send_to_gesture_agent.assert_awaited_once_with("Hello Gesture")
# Override
agent._send_to_program_manager.assert_awaited_once_with("Hello Override")
# Override (since we mapped it to a goal)
agent._send_to_bdi_belief.assert_awaited_once_with("some_goal_slug", "goal")
assert agent._send_to_speech_agent.await_count == 1
assert agent._send_to_gesture_agent.await_count == 1
assert agent._send_to_program_manager.await_count == 1
assert agent._send_to_bdi_belief.await_count == 1
@pytest.mark.asyncio
@@ -125,7 +151,6 @@ async def test_receive_loop_unknown_type(agent):
agent._send_to_speech_agent = AsyncMock()
agent._send_to_gesture_agent = AsyncMock()
agent._send_to_belief_collector = AsyncMock()
try:
await agent._receive_button_event()
@@ -137,10 +162,514 @@ async def test_receive_loop_unknown_type(agent):
# Ensure no handlers were called
agent._send_to_speech_agent.assert_not_called()
agent._send_to_gesture_agent.assert_not_called()
agent._send_to_belief_collector.assert_not_called()
agent.logger.warning.assert_called()
@pytest.mark.asyncio
async def test_create_mapping(agent):
# Create a program with a trigger, goal, and conditional norm
import uuid
trigger_id = uuid.uuid4()
goal_id = uuid.uuid4()
norm_id = uuid.uuid4()
cond = KeywordBelief(id=uuid.uuid4(), name="k1", keyword="key")
plan = Plan(id=uuid.uuid4(), name="p1", steps=[])
trigger = Trigger(id=trigger_id, name="my_trigger", condition=cond, plan=plan)
goal = Goal(id=goal_id, name="my_goal", description="desc", plan=plan)
cn = ConditionalNorm(id=norm_id, name="my_norm", norm="be polite", condition=cond)
phase = Phase(id=uuid.uuid4(), name="phase1", norms=[cn], goals=[goal], triggers=[trigger])
prog = Program(phases=[phase])
# Call create_mapping via handle_message
msg = InternalMessage(to="me", thread="new_program", body=prog.model_dump_json())
await agent.handle_message(msg)
# Check maps
assert str(trigger_id) in agent._trigger_map
assert agent._trigger_map[str(trigger_id)] == "trigger_my_trigger"
assert str(goal_id) in agent._goal_map
assert agent._goal_map[str(goal_id)] == "my_goal"
assert str(norm_id) in agent._cond_norm_map
assert agent._cond_norm_map[str(norm_id)] == "norm_be_polite"
@pytest.mark.asyncio
async def test_create_mapping_invalid_json(agent):
# Pass invalid json to handle_message thread "new_program"
msg = InternalMessage(to="me", thread="new_program", body="invalid json")
await agent.handle_message(msg)
# Should log error and maps should remain empty or cleared
agent.logger.error.assert_called()
@pytest.mark.asyncio
async def test_handle_message_trigger_start(agent):
# Setup reverse map manually
agent._trigger_reverse_map["trigger_slug"] = "ui_id_123"
msg = InternalMessage(to="me", thread="trigger_start", body="trigger_slug")
await agent.handle_message(msg)
agent.pub_socket.send_multipart.assert_awaited_once()
args = agent.pub_socket.send_multipart.call_args[0][0]
assert args[0] == b"experiment"
payload = json.loads(args[1])
assert payload["type"] == "trigger_update"
assert payload["id"] == "ui_id_123"
assert payload["achieved"] is True
@pytest.mark.asyncio
async def test_handle_message_trigger_end(agent):
agent._trigger_reverse_map["trigger_slug"] = "ui_id_123"
msg = InternalMessage(to="me", thread="trigger_end", body="trigger_slug")
await agent.handle_message(msg)
agent.pub_socket.send_multipart.assert_awaited_once()
payload = json.loads(agent.pub_socket.send_multipart.call_args[0][0][1])
assert payload["type"] == "trigger_update"
assert payload["achieved"] is False
@pytest.mark.asyncio
async def test_handle_message_transition_phase(agent):
msg = InternalMessage(to="me", thread="transition_phase", body="phase_id_123")
await agent.handle_message(msg)
agent.pub_socket.send_multipart.assert_awaited_once()
payload = json.loads(agent.pub_socket.send_multipart.call_args[0][0][1])
assert payload["type"] == "phase_update"
assert payload["id"] == "phase_id_123"
@pytest.mark.asyncio
async def test_handle_message_goal_start(agent):
agent._goal_reverse_map["goal_slug"] = "goal_id_123"
msg = InternalMessage(to="me", thread="goal_start", body="goal_slug")
await agent.handle_message(msg)
agent.pub_socket.send_multipart.assert_awaited_once()
payload = json.loads(agent.pub_socket.send_multipart.call_args[0][0][1])
assert payload["type"] == "goal_update"
assert payload["id"] == "goal_id_123"
assert payload["active"] is True
@pytest.mark.asyncio
async def test_handle_message_active_norms_update(agent):
agent._cond_norm_reverse_map["norm_active"] = "id_1"
agent._cond_norm_reverse_map["norm_inactive"] = "id_2"
# Body is like: "('norm_active', 'other')"
# The split logic handles quotes etc.
msg = InternalMessage(to="me", thread="active_norms_update", body="'norm_active', 'other'")
await agent.handle_message(msg)
agent.pub_socket.send_multipart.assert_awaited_once()
payload = json.loads(agent.pub_socket.send_multipart.call_args[0][0][1])
assert payload["type"] == "cond_norms_state_update"
norms = {n["id"]: n["active"] for n in payload["norms"]}
assert norms["id_1"] is True
assert norms["id_2"] is False
@pytest.mark.asyncio
async def test_send_experiment_control(agent):
# Test next_phase
await agent._send_experiment_control_to_bdi_core("next_phase")
agent.send.assert_awaited()
msg = agent.send.call_args[0][0]
assert msg.thread == "force_next_phase"
# Test reset_phase
await agent._send_experiment_control_to_bdi_core("reset_phase")
msg = agent.send.call_args[0][0]
assert msg.thread == "reset_current_phase"
# Test reset_experiment
await agent._send_experiment_control_to_bdi_core("reset_experiment")
msg = agent.send.call_args[0][0]
assert msg.thread == "reset_experiment"
@pytest.mark.asyncio
async def test_setup(agent):
"""Test the setup method initializes sockets correctly."""
with patch("control_backend.agents.user_interrupt.user_interrupt_agent.Context") as MockContext:
mock_ctx_instance = MagicMock()
MockContext.instance.return_value = mock_ctx_instance
mock_sub = MagicMock()
mock_pub = MagicMock()
mock_ctx_instance.socket.side_effect = [mock_sub, mock_pub]
# MOCK add_behavior so we don't rely on internal attributes
agent.add_behavior = MagicMock()
await agent.setup()
# Check sockets
mock_sub.connect.assert_called_with(settings.zmq_settings.internal_sub_address)
mock_pub.connect.assert_called_with(settings.zmq_settings.internal_pub_address)
# Verify add_behavior was called
agent.add_behavior.assert_called_once()
@pytest.mark.asyncio
async def test_receive_loop_json_error(agent):
"""Verify that malformed JSON is caught and logged without crashing the loop."""
agent.sub_socket.recv_multipart.side_effect = [
(b"topic", b"INVALID{JSON"),
asyncio.CancelledError,
]
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
agent.logger.error.assert_called_with("Received invalid JSON payload on topic %s", b"topic")
@pytest.mark.asyncio
async def test_receive_loop_override_trigger(agent):
"""Verify routing 'override' to a Trigger."""
agent._trigger_map["101"] = "trigger_slug"
payload = json.dumps({"type": "override", "context": "101"}).encode()
agent.sub_socket.recv_multipart.side_effect = [(b"topic", payload), asyncio.CancelledError]
agent._send_to_bdi = AsyncMock()
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
agent._send_to_bdi.assert_awaited_once_with("force_trigger", "trigger_slug")
@pytest.mark.asyncio
async def test_receive_loop_override_norm(agent):
"""Verify routing 'override' to a Conditional Norm."""
agent._cond_norm_map["202"] = "norm_slug"
payload = json.dumps({"type": "override", "context": "202"}).encode()
agent.sub_socket.recv_multipart.side_effect = [(b"topic", payload), asyncio.CancelledError]
agent._send_to_bdi_belief = AsyncMock()
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
agent._send_to_bdi_belief.assert_awaited_once_with("norm_slug", "cond_norm")
@pytest.mark.asyncio
async def test_receive_loop_override_missing(agent):
"""Verify warning log when an override ID is not found in any map."""
payload = json.dumps({"type": "override", "context": "999"}).encode()
agent.sub_socket.recv_multipart.side_effect = [(b"topic", payload), asyncio.CancelledError]
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
agent.logger.warning.assert_called_with("Could not determine which element to override.")
@pytest.mark.asyncio
async def test_receive_loop_unachieve_logic(agent):
"""Verify success and failure paths for override_unachieve."""
agent._cond_norm_map["202"] = "norm_slug"
success_payload = json.dumps({"type": "override_unachieve", "context": "202"}).encode()
fail_payload = json.dumps({"type": "override_unachieve", "context": "999"}).encode()
agent.sub_socket.recv_multipart.side_effect = [
(b"topic", success_payload),
(b"topic", fail_payload),
asyncio.CancelledError,
]
agent._send_to_bdi_belief = AsyncMock()
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
# Assert success call (True flag for unachieve)
agent._send_to_bdi_belief.assert_any_call("norm_slug", "cond_norm", True)
# Assert failure log
agent.logger.warning.assert_called_with(
"Could not determine which conditional norm to unachieve."
)
@pytest.mark.asyncio
async def test_receive_loop_pause_resume(agent):
"""Verify pause and resume toggle logic and logging."""
pause_payload = json.dumps({"type": "pause", "context": "true"}).encode()
resume_payload = json.dumps({"type": "pause", "context": ""}).encode()
agent.sub_socket.recv_multipart.side_effect = [
(b"topic", pause_payload),
(b"topic", resume_payload),
asyncio.CancelledError,
]
agent._send_pause_command = AsyncMock()
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
agent._send_pause_command.assert_any_call("true")
agent._send_pause_command.assert_any_call("")
agent.logger.info.assert_any_call("Sent pause command.")
agent.logger.info.assert_any_call("Sent resume command.")
@pytest.mark.asyncio
async def test_receive_loop_phase_control(agent):
"""Verify experiment flow control (next_phase)."""
payload = json.dumps({"type": "next_phase", "context": ""}).encode()
agent.sub_socket.recv_multipart.side_effect = [(b"topic", payload), asyncio.CancelledError]
agent._send_experiment_control_to_bdi_core = AsyncMock()
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
agent._send_experiment_control_to_bdi_core.assert_awaited_once_with("next_phase")
@pytest.mark.asyncio
async def test_handle_message_unknown_thread(agent):
"""Test handling of an unknown message thread (lines 213-214)."""
msg = InternalMessage(to="me", thread="unknown_thread", body="test")
await agent.handle_message(msg)
agent.logger.debug.assert_called_with(
"Received internal message on unhandled thread: unknown_thread"
)
@pytest.mark.asyncio
async def test_send_to_bdi_belief_edge_cases(agent):
"""
Covers:
- Unknown asl_type warning (lines 326-328)
- unachieve=True logic (lines 334-337)
"""
# 1. Unknown Type
await agent._send_to_bdi_belief("slug", "unknown_type")
agent.logger.warning.assert_called_with("Tried to send belief with unknown type")
agent.send.assert_not_called()
# Reset mock for part 2
agent.send.reset_mock()
# 2. Unachieve = True
await agent._send_to_bdi_belief("slug", "cond_norm", unachieve=True)
agent.send.assert_awaited()
sent_msg = agent.send.call_args.args[0]
# Verify it is a delete operation
body_obj = BeliefMessage.model_validate_json(sent_msg.body)
# Verify 'delete' has content
assert body_obj.delete is not None
assert len(body_obj.delete) == 1
assert body_obj.delete[0].name == "force_slug"
# Verify 'create' is empty (handling both None and [])
assert not body_obj.create
@pytest.mark.asyncio
async def test_send_experiment_control_unknown(agent):
"""Test sending an unknown experiment control type (lines 366-367)."""
await agent._send_experiment_control_to_bdi_core("invalid_command")
agent.logger.warning.assert_called_with(
"Received button press with unknown type '%s' (context: '%s').",
"unknown_thing",
"some_data",
"Received unknown experiment control type '%s' to send to BDI Core.", "invalid_command"
)
# Ensure it still sends an empty message (as per code logic, though thread is empty)
agent.send.assert_awaited()
msg = agent.send.call_args[0][0]
assert msg.thread == ""
@pytest.mark.asyncio
async def test_create_mapping_recursive_goals(agent):
"""Verify that nested subgoals are correctly registered in the mapping."""
import uuid
# 1. Setup IDs
parent_goal_id = uuid.uuid4()
child_goal_id = uuid.uuid4()
# 2. Create the child goal
child_goal = Goal(
id=child_goal_id,
name="child_goal",
description="I am a subgoal",
plan=Plan(id=uuid.uuid4(), name="p_child", steps=[]),
)
# 3. Create the parent goal and put the child goal inside its plan steps
parent_goal = Goal(
id=parent_goal_id,
name="parent_goal",
description="I am a parent",
plan=Plan(id=uuid.uuid4(), name="p_parent", steps=[child_goal]), # Nested here
)
# 4. Build the program
phase = Phase(
id=uuid.uuid4(),
name="phase1",
norms=[],
goals=[parent_goal], # Only the parent is top-level
triggers=[],
)
prog = Program(phases=[phase])
# 5. Execute mapping
msg = InternalMessage(to="me", thread="new_program", body=prog.model_dump_json())
await agent.handle_message(msg)
# 6. Assertions
# Check parent
assert str(parent_goal_id) in agent._goal_map
assert agent._goal_map[str(parent_goal_id)] == "parent_goal"
# Check child (This confirms the recursion worked)
assert str(child_goal_id) in agent._goal_map
assert agent._goal_map[str(child_goal_id)] == "child_goal"
assert agent._goal_reverse_map["child_goal"] == str(child_goal_id)
@pytest.mark.asyncio
async def test_receive_loop_advanced_scenarios(agent):
"""
Covers:
- JSONDecodeError (lines 86-88)
- Override: Trigger found (lines 108-109)
- Override: Norm found (lines 114-115)
- Override: Nothing found (line 134)
- Override Unachieve: Success & Fail (lines 136-145)
- Pause: Context true/false logs (lines 150-157)
- Next Phase (line 160)
"""
# 1. Setup Data Maps
agent._trigger_map["101"] = "trigger_slug"
agent._cond_norm_map["202"] = "norm_slug"
# 2. Define Payloads
# A. Invalid JSON
bad_json = b"INVALID{JSON"
# B. Override -> Trigger
override_trigger = json.dumps({"type": "override", "context": "101"}).encode()
# C. Override -> Norm
override_norm = json.dumps({"type": "override", "context": "202"}).encode()
# D. Override -> Unknown
override_fail = json.dumps({"type": "override", "context": "999"}).encode()
# E. Unachieve -> Success
unachieve_success = json.dumps({"type": "override_unachieve", "context": "202"}).encode()
# F. Unachieve -> Fail
unachieve_fail = json.dumps({"type": "override_unachieve", "context": "999"}).encode()
# G. Pause (True)
pause_true = json.dumps({"type": "pause", "context": "true"}).encode()
# H. Pause (False/Resume)
pause_false = json.dumps({"type": "pause", "context": ""}).encode()
# I. Next Phase
next_phase = json.dumps({"type": "next_phase", "context": ""}).encode()
# 3. Setup Socket
agent.sub_socket.recv_multipart.side_effect = [
(b"topic", bad_json),
(b"topic", override_trigger),
(b"topic", override_norm),
(b"topic", override_fail),
(b"topic", unachieve_success),
(b"topic", unachieve_fail),
(b"topic", pause_true),
(b"topic", pause_false),
(b"topic", next_phase),
asyncio.CancelledError, # End loop
]
# Mock internal helpers to verify calls
agent._send_to_bdi = AsyncMock()
agent._send_to_bdi_belief = AsyncMock()
agent._send_pause_command = AsyncMock()
agent._send_experiment_control_to_bdi_core = AsyncMock()
# 4. Run Loop
try:
await agent._receive_button_event()
except asyncio.CancelledError:
pass
# 5. Assertions
# JSON Error
agent.logger.error.assert_called_with("Received invalid JSON payload on topic %s", b"topic")
# Override Trigger
agent._send_to_bdi.assert_awaited_with("force_trigger", "trigger_slug")
# Override Norm
# We expect _send_to_bdi_belief to be called for the norm
# Note: The loop calls _send_to_bdi_belief(asl_cond_norm, "cond_norm")
agent._send_to_bdi_belief.assert_any_call("norm_slug", "cond_norm")
# Override Fail (Warning log)
agent.logger.warning.assert_any_call("Could not determine which element to override.")
# Unachieve Success
# Loop calls _send_to_bdi_belief(asl_cond_norm, "cond_norm", True)
agent._send_to_bdi_belief.assert_any_call("norm_slug", "cond_norm", True)
# Unachieve Fail
agent.logger.warning.assert_any_call("Could not determine which conditional norm to unachieve.")
# Pause Logic
agent._send_pause_command.assert_any_call("true")
agent.logger.info.assert_any_call("Sent pause command.")
# Resume Logic
agent._send_pause_command.assert_any_call("")
agent.logger.info.assert_any_call("Sent resume command.")
# Next Phase
agent._send_experiment_control_to_bdi_core.assert_awaited_with("next_phase")

View File

@@ -1,7 +1,13 @@
from unittest.mock import patch
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from unittest.mock import MagicMock, patch
import pytest
from fastapi import FastAPI
from fastapi import FastAPI, HTTPException
from fastapi.testclient import TestClient
from starlette.responses import StreamingResponse
@@ -61,3 +67,67 @@ async def test_log_stream_endpoint_lines(client):
# Optional: assert subscribe/connect were called
assert dummy_socket.subscribed # at least some log levels subscribed
assert dummy_socket.connected # connect was called
@patch("control_backend.api.v1.endpoints.logs.LOGGING_DIR")
def test_files_endpoint(LOGGING_DIR, client):
file_1, file_2 = MagicMock(), MagicMock()
file_1.name = "file_1"
file_2.name = "file_2"
LOGGING_DIR.glob.return_value = [file_1, file_2]
result = client.get("/api/logs/files")
assert result.status_code == 200
assert result.json() == ["file_1", "file_2"]
@patch("control_backend.api.v1.endpoints.logs.FileResponse")
@patch("control_backend.api.v1.endpoints.logs.LOGGING_DIR")
def test_log_file_endpoint_success(LOGGING_DIR, MockFileResponse, client):
mock_file_path = MagicMock()
mock_file_path.is_relative_to.return_value = True
mock_file_path.is_file.return_value = True
mock_file_path.name = "test.log"
LOGGING_DIR.__truediv__ = MagicMock(return_value=mock_file_path)
mock_file_path.resolve.return_value = mock_file_path
MockFileResponse.return_value = MagicMock()
result = client.get("/api/logs/files/test.log")
assert result.status_code == 200
MockFileResponse.assert_called_once_with(mock_file_path, filename="test.log")
@pytest.mark.asyncio
@patch("control_backend.api.v1.endpoints.logs.LOGGING_DIR")
async def test_log_file_endpoint_path_traversal(LOGGING_DIR):
from control_backend.api.v1.endpoints.logs import log_file
mock_file_path = MagicMock()
mock_file_path.is_relative_to.return_value = False
LOGGING_DIR.__truediv__ = MagicMock(return_value=mock_file_path)
mock_file_path.resolve.return_value = mock_file_path
with pytest.raises(HTTPException) as exc_info:
await log_file("../secret.txt")
assert exc_info.value.status_code == 400
assert exc_info.value.detail == "Invalid filename."
@patch("control_backend.api.v1.endpoints.logs.LOGGING_DIR")
def test_log_file_endpoint_file_not_found(LOGGING_DIR, client):
mock_file_path = MagicMock()
mock_file_path.is_relative_to.return_value = True
mock_file_path.is_file.return_value = False
LOGGING_DIR.__truediv__ = MagicMock(return_value=mock_file_path)
mock_file_path.resolve.return_value = mock_file_path
result = client.get("/api/logs/files/nonexistent.log")
assert result.status_code == 404
assert result.json()["detail"] == "File not found."

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import json
import pytest

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import json
import uuid
from unittest.mock import AsyncMock

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
# tests/test_robot_endpoints.py
import json
from unittest.mock import AsyncMock, MagicMock, patch

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from fastapi.routing import APIRoute
from control_backend.api.v1.router import api_router # <--- corrected import
@@ -11,6 +17,5 @@ def test_router_includes_expected_paths():
# Ensure at least one route under each prefix exists
assert any(p.startswith("/robot") for p in paths)
assert any(p.startswith("/message") for p in paths)
assert any(p.startswith("/sse") for p in paths)
assert any(p.startswith("/logs") for p in paths)
assert any(p.startswith("/program") for p in paths)

View File

@@ -1,24 +0,0 @@
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from control_backend.api.v1.endpoints import sse
@pytest.fixture
def app():
app = FastAPI()
app.include_router(sse.router)
return app
@pytest.fixture
def client(app):
return TestClient(app)
def test_sse_route_exists(client):
"""Minimal smoke test to ensure /sse route exists and responds."""
response = client.get("/sse")
# Since implementation is not done, we only assert it doesn't crash
assert response.status_code == 200

View File

@@ -0,0 +1,154 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from control_backend.api.v1.endpoints import user_interact
@pytest.fixture
def app():
app = FastAPI()
app.include_router(user_interact.router)
return app
@pytest.fixture
def client(app):
return TestClient(app)
@pytest.mark.asyncio
async def test_receive_button_event(client):
mock_pub_socket = AsyncMock()
client.app.state.endpoints_pub_socket = mock_pub_socket
payload = {"type": "speech", "context": "hello"}
response = client.post("/button_pressed", json=payload)
assert response.status_code == 202
assert response.json() == {"status": "Event received"}
mock_pub_socket.send_multipart.assert_awaited_once()
args = mock_pub_socket.send_multipart.call_args[0][0]
assert args[0] == b"button_pressed"
assert "speech" in args[1].decode()
@pytest.mark.asyncio
async def test_receive_button_event_invalid_payload(client):
mock_pub_socket = AsyncMock()
client.app.state.endpoints_pub_socket = mock_pub_socket
# Missing context
payload = {"type": "speech"}
response = client.post("/button_pressed", json=payload)
assert response.status_code == 422
mock_pub_socket.send_multipart.assert_not_called()
@pytest.mark.asyncio
async def test_experiment_stream_direct_call():
"""
Directly calling the endpoint function to test the streaming logic
without dealing with TestClient streaming limitations.
"""
mock_socket = AsyncMock()
# 1. recv data
# 2. recv timeout
# 3. disconnect (request.is_disconnected returns True)
mock_socket.recv_multipart.side_effect = [
(b"topic", b"message1"),
TimeoutError(),
(b"topic", b"message2"), # Should not be reached if disconnect checks work
]
mock_socket.close = MagicMock()
mock_socket.connect = MagicMock()
mock_socket.subscribe = MagicMock()
mock_context = MagicMock()
mock_context.socket.return_value = mock_socket
with patch(
"control_backend.api.v1.endpoints.user_interact.Context.instance", return_value=mock_context
):
mock_request = AsyncMock()
# is_disconnected sequence:
# 1. False (before first recv) -> reads message1
# 2. False (before second recv) -> triggers TimeoutError, continues
# 3. True (before third recv) -> break loop
mock_request.is_disconnected.side_effect = [False, False, True]
response = await user_interact.experiment_stream(mock_request)
lines = []
# Consume the generator
async for line in response.body_iterator:
lines.append(line)
assert "data: message1\n\n" in lines
assert len(lines) == 1
mock_socket.connect.assert_called()
mock_socket.subscribe.assert_called_with(b"experiment")
mock_socket.close.assert_called()
@pytest.mark.asyncio
async def test_status_stream_direct_call():
"""
Test the status stream, ensuring it handles messages and sends pings on timeout.
"""
mock_socket = AsyncMock()
# Define the sequence of events for the socket:
# 1. Successfully receive a message
# 2. Timeout (which should trigger the ': ping' yield)
# 3. Another message (which won't be reached because we'll simulate disconnect)
mock_socket.recv_multipart.side_effect = [
(b"topic", b"status_update"),
TimeoutError(),
(b"topic", b"ignored_msg"),
]
mock_socket.close = MagicMock()
mock_socket.connect = MagicMock()
mock_socket.subscribe = MagicMock()
mock_context = MagicMock()
mock_context.socket.return_value = mock_socket
# Mock the ZMQ Context to return our mock_socket
with patch(
"control_backend.api.v1.endpoints.user_interact.Context.instance", return_value=mock_context
):
mock_request = AsyncMock()
# is_disconnected sequence:
# 1. False -> Process "status_update"
# 2. False -> Process TimeoutError (yield ping)
# 3. True -> Break loop (client disconnected)
mock_request.is_disconnected.side_effect = [False, False, True]
# Call the status_stream function explicitly
response = await user_interact.status_stream(mock_request)
lines = []
async for line in response.body_iterator:
lines.append(line)
# Assertions
assert "data: status_update\n\n" in lines
assert ": ping\n\n" in lines # Verify lines 91-92 (ping logic)
mock_socket.connect.assert_called()
mock_socket.subscribe.assert_called_with(b"status")
mock_socket.close.assert_called()

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from unittest.mock import MagicMock, patch
import pytest
@@ -25,7 +31,6 @@ def mock_settings():
mock.zmq_settings.internal_sub_address = "tcp://localhost:5561"
mock.zmq_settings.ri_command_address = "tcp://localhost:0000"
mock.agent_settings.bdi_core_name = "bdi_core_agent"
mock.agent_settings.bdi_belief_collector_name = "belief_collector_agent"
mock.agent_settings.llm_name = "llm_agent"
mock.agent_settings.robot_speech_name = "robot_speech_agent"
mock.agent_settings.transcription_name = "transcription_agent"
@@ -33,6 +38,7 @@ def mock_settings():
mock.agent_settings.vad_name = "vad_agent"
mock.behaviour_settings.sleep_s = 0.01 # Speed up tests
mock.behaviour_settings.comm_setup_max_retries = 1
mock.behaviour_settings.agentspeak_file = "src/control_backend/agents/bdi/agentspeak.asl"
yield mock

View File

@@ -1,4 +1,10 @@
"""Test the base class logic, message passing and background task handling."""
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
--------------------------------------------------------------------------------
Test the base class logic, message passing and background task handling.
"""
import asyncio
import logging
@@ -99,12 +105,75 @@ async def test_send_to_local_agent(monkeypatch):
# Patch inbox.put
target.inbox.put = AsyncMock()
message = InternalMessage(to="receiver", sender="sender", body="hello")
message = InternalMessage(to=target.name, sender=sender.name, body="hello")
await sender.send(message)
target.inbox.put.assert_awaited_once_with(message)
sender.logger.debug.assert_called_once()
@pytest.mark.asyncio
async def test_send_to_zmq_agent(monkeypatch):
sender = DummyAgent("sender")
target = "remote_receiver"
# Fake logger
sender.logger = MagicMock()
# Fake zmq
sender._internal_pub_socket = AsyncMock()
message = InternalMessage(to=target, sender=sender.name, body="hello")
await sender.send(message)
zmq_calls = sender._internal_pub_socket.send_multipart.call_args[0][0]
assert zmq_calls[0] == f"internal/{target}".encode()
@pytest.mark.asyncio
async def test_send_to_multiple_local_agents(monkeypatch):
sender = DummyAgent("sender")
target1 = DummyAgent("receiver1")
target2 = DummyAgent("receiver2")
# Fake logger
sender.logger = MagicMock()
# Patch inbox.put
target1.inbox.put = AsyncMock()
target2.inbox.put = AsyncMock()
message = InternalMessage(to=[target1.name, target2.name], sender=sender.name, body="hello")
await sender.send(message)
target1.inbox.put.assert_awaited_once_with(message)
target2.inbox.put.assert_awaited_once_with(message)
@pytest.mark.asyncio
async def test_send_to_multiple_agents(monkeypatch):
sender = DummyAgent("sender")
target1 = DummyAgent("receiver1")
target2 = "remote_receiver"
# Fake logger
sender.logger = MagicMock()
# Fake zmq
sender._internal_pub_socket = AsyncMock()
# Patch inbox.put
target1.inbox.put = AsyncMock()
message = InternalMessage(to=[target1.name, target2], sender=sender.name, body="hello")
await sender.send(message)
target1.inbox.put.assert_awaited_once_with(message)
zmq_calls = sender._internal_pub_socket.send_multipart.call_args[0][0]
assert zmq_calls[0] == f"internal/{target2}".encode()
@pytest.mark.asyncio

View File

@@ -1,4 +1,10 @@
"""Test if settings load correctly and environment variables override defaults."""
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
--------------------------------------------------------------------------------
Test if settings load correctly and environment variables override defaults.
"""
from control_backend.core.config import Settings

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
from unittest.mock import mock_open, patch

View File

@@ -0,0 +1,51 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from unittest.mock import MagicMock, patch
import pytest
from control_backend.logging.dated_file_handler import DatedFileHandler
@patch("control_backend.logging.dated_file_handler.DatedFileHandler._open")
def test_reset(open_):
stream = MagicMock()
open_.return_value = stream
# A file should be opened when the logger is created
handler = DatedFileHandler(file_prefix="anything")
assert open_.call_count == 1
# Upon reset, the current file should be closed, and a new one should be opened
handler.do_rollover()
assert stream.close.call_count == 1
assert open_.call_count == 2
@patch("control_backend.logging.dated_file_handler.Path")
@patch("control_backend.logging.dated_file_handler.DatedFileHandler._open")
def test_creates_dir(open_, Path_):
stream = MagicMock()
open_.return_value = stream
test_path = MagicMock()
test_path.parent.is_dir.return_value = False
Path_.return_value = test_path
DatedFileHandler(file_prefix="anything")
# The directory should've been created
test_path.parent.mkdir.assert_called_once()
@patch("control_backend.logging.dated_file_handler.DatedFileHandler._open")
def test_invalid_constructor(_):
with pytest.raises(ValueError):
DatedFileHandler(file_prefix=None)
with pytest.raises(ValueError):
DatedFileHandler(file_prefix="")

View File

@@ -0,0 +1,224 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
import pytest
from control_backend.logging.optional_field_formatter import OptionalFieldFormatter
@pytest.fixture
def logger():
"""Create a fresh logger for each test."""
logger = logging.getLogger(f"test_{id(object())}")
logger.setLevel(logging.DEBUG)
logger.handlers = []
return logger
@pytest.fixture
def log_output(logger):
"""Capture log output and return a function to get it."""
class ListHandler(logging.Handler):
def __init__(self):
super().__init__()
self.records = []
def emit(self, record):
self.records.append(self.format(record))
handler = ListHandler()
logger.addHandler(handler)
def get_output():
return handler.records
return get_output
def test_optional_field_present(logger, log_output):
"""Optional field should appear when provided in extra."""
formatter = OptionalFieldFormatter("%(levelname)s - %(role?)s - %(message)s")
logger.handlers[0].setFormatter(formatter)
logger.info("test message", extra={"role": "user"})
assert log_output() == ["INFO - user - test message"]
def test_optional_field_missing_no_default(logger, log_output):
"""Missing optional field with no default should be None."""
formatter = OptionalFieldFormatter("%(levelname)s - %(role?)s - %(message)s")
logger.handlers[0].setFormatter(formatter)
logger.info("test message")
assert log_output() == ["INFO - None - test message"]
def test_optional_field_missing_with_default(logger, log_output):
"""Missing optional field should use provided default."""
formatter = OptionalFieldFormatter(
"%(levelname)s - %(role?)s - %(message)s", defaults={"role": "assistant"}
)
logger.handlers[0].setFormatter(formatter)
logger.info("test message")
assert log_output() == ["INFO - assistant - test message"]
def test_optional_field_overrides_default(logger, log_output):
"""Provided extra value should override default."""
formatter = OptionalFieldFormatter(
"%(levelname)s - %(role?)s - %(message)s", defaults={"role": "assistant"}
)
logger.handlers[0].setFormatter(formatter)
logger.info("test message", extra={"role": "user"})
assert log_output() == ["INFO - user - test message"]
def test_multiple_optional_fields(logger, log_output):
"""Multiple optional fields should work independently."""
formatter = OptionalFieldFormatter(
"%(levelname)s - %(role?)s - %(request_id?)s - %(message)s", defaults={"role": "assistant"}
)
logger.handlers[0].setFormatter(formatter)
logger.info("test", extra={"request_id": "123"})
assert log_output() == ["INFO - assistant - 123 - test"]
def test_mixed_optional_and_required_fields(logger, log_output):
"""Standard fields should work alongside optional fields."""
formatter = OptionalFieldFormatter("%(levelname)s %(name)s %(role?)s %(message)s")
logger.handlers[0].setFormatter(formatter)
logger.info("test", extra={"role": "user"})
output = log_output()[0]
assert "INFO" in output
assert "user" in output
assert "test" in output
def test_no_optional_fields(logger, log_output):
"""Formatter should work normally with no optional fields."""
formatter = OptionalFieldFormatter("%(levelname)s %(message)s")
logger.handlers[0].setFormatter(formatter)
logger.info("test message")
assert log_output() == ["INFO test message"]
def test_integer_format_specifier(logger, log_output):
"""Optional fields with %d specifier should work."""
formatter = OptionalFieldFormatter(
"%(levelname)s %(count?)d %(message)s", defaults={"count": 0}
)
logger.handlers[0].setFormatter(formatter)
logger.info("test", extra={"count": 42})
assert log_output() == ["INFO 42 test"]
def test_float_format_specifier(logger, log_output):
"""Optional fields with %f specifier should work."""
formatter = OptionalFieldFormatter(
"%(levelname)s %(duration?)f %(message)s", defaults={"duration": 0.0}
)
logger.handlers[0].setFormatter(formatter)
logger.info("test", extra={"duration": 1.5})
assert "1.5" in log_output()[0]
def test_empty_string_default(logger, log_output):
"""Empty string default should work."""
formatter = OptionalFieldFormatter("%(levelname)s %(role?)s %(message)s", defaults={"role": ""})
logger.handlers[0].setFormatter(formatter)
logger.info("test")
assert log_output() == ["INFO test"]
def test_none_format_string():
"""None format string should not raise."""
formatter = OptionalFieldFormatter(fmt=None)
assert formatter.optional_fields == set()
def test_optional_fields_parsed_correctly():
"""Check that optional fields are correctly identified."""
formatter = OptionalFieldFormatter("%(asctime)s %(role?)s %(level?)d %(name)s")
assert formatter.optional_fields == {("role", "s"), ("level", "d")}
def test_format_string_normalized():
"""Check that ? is removed from format string."""
formatter = OptionalFieldFormatter("%(role?)s %(message)s")
assert "?" not in formatter._style._fmt
assert "%(role)s" in formatter._style._fmt
def test_field_with_underscore(logger, log_output):
"""Field names with underscores should work."""
formatter = OptionalFieldFormatter("%(levelname)s %(user_id?)s %(message)s")
logger.handlers[0].setFormatter(formatter)
logger.info("test", extra={"user_id": "abc123"})
assert log_output() == ["INFO abc123 test"]
def test_field_with_numbers(logger, log_output):
"""Field names with numbers should work."""
formatter = OptionalFieldFormatter("%(levelname)s %(field2?)s %(message)s")
logger.handlers[0].setFormatter(formatter)
logger.info("test", extra={"field2": "value"})
assert log_output() == ["INFO value test"]
def test_multiple_log_calls(logger, log_output):
"""Formatter should work correctly across multiple log calls."""
formatter = OptionalFieldFormatter(
"%(levelname)s %(role?)s %(message)s", defaults={"role": "other"}
)
logger.handlers[0].setFormatter(formatter)
logger.info("first", extra={"role": "assistant"})
logger.info("second")
logger.info("third", extra={"role": "user"})
assert log_output() == [
"INFO assistant first",
"INFO other second",
"INFO user third",
]
def test_default_not_mutated(logger, log_output):
"""Original defaults dict should not be mutated."""
defaults = {"role": "other"}
formatter = OptionalFieldFormatter("%(levelname)s %(role?)s %(message)s", defaults=defaults)
logger.handlers[0].setFormatter(formatter)
logger.info("test")
assert defaults == {"role": "other"}

View File

@@ -0,0 +1,89 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
import pytest
from control_backend.logging import PartialFilter
@pytest.fixture
def logger():
"""Create a fresh logger for each test."""
logger = logging.getLogger(f"test_{id(object())}")
logger.setLevel(logging.DEBUG)
logger.handlers = []
return logger
@pytest.fixture
def log_output(logger):
"""Capture log output and return a function to get it."""
class ListHandler(logging.Handler):
def __init__(self):
super().__init__()
self.records = []
def emit(self, record):
self.records.append(self.format(record))
handler = ListHandler()
handler.addFilter(PartialFilter())
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)
return lambda: handler.records
def test_no_partial_attribute(logger, log_output):
"""Records without partial attribute should pass through."""
logger.info("normal message")
assert log_output() == ["normal message"]
def test_partial_true_filtered(logger, log_output):
"""Records with partial=True should be filtered out."""
logger.info("partial message", extra={"partial": True})
assert log_output() == []
def test_partial_false_passes(logger, log_output):
"""Records with partial=False should pass through."""
logger.info("complete message", extra={"partial": False})
assert log_output() == ["complete message"]
def test_partial_none_passes(logger, log_output):
"""Records with partial=None should pass through."""
logger.info("message", extra={"partial": None})
assert log_output() == ["message"]
def test_partial_truthy_value_passes(logger, log_output):
"""
Records with truthy but non-True partial should pass through, that is, only when it's exactly
``True`` should it pass.
"""
logger.info("message", extra={"partial": "yes"})
assert log_output() == ["message"]
def test_multiple_records_mixed(logger, log_output):
"""Filter should handle mixed records correctly."""
logger.info("first")
logger.info("second", extra={"partial": True})
logger.info("third", extra={"partial": False})
logger.info("fourth", extra={"partial": True})
logger.info("fifth")
assert log_output() == ["first", "third", "fifth"]

View File

@@ -1,3 +1,9 @@
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from control_backend.schemas.message import Message

Some files were not shown because too many files have changed in this diff Show More