100 Commits

Author SHA1 Message Date
06e3dad25d Merge branch 'fix/send-video' into 'main'
fix: send video

See merge request ics/sp/2025/n25b/pepperplus-ri!28
2026-01-30 19:19:03 +00:00
Storm
fe8bad1f8c Merge branch 'main' into fix/send-video 2026-01-30 17:28:13 +01:00
5bb5d8a0cc Merge branch 'chore/copyright-all-files' into 'main'
chore: add copyright to all source files

See merge request ics/sp/2025/n25b/pepperplus-ri!29
2026-01-30 11:47:30 +00:00
Pim Hutting
ea208175de chore: add copyright to all source files 2026-01-29 15:57:22 +01:00
Storm
8333f2fc2a chore: removed numpy import 2026-01-29 13:09:25 +01:00
Storm
24c7fa216f test: 100% coverage
ref: N25B-393
2026-01-29 12:28:34 +01:00
Storm
56becd84ac test: fixed video_sender tests
ref: N25B-393
2026-01-29 12:16:48 +01:00
Storm
4a2cace1cf chore: changed socket option to set HWM to 3 (max 3 packets in queue 2026-01-29 12:02:28 +01:00
ad58b16559 Merge branch 'dev' into 'main'
Merge dev with main

See merge request ics/sp/2025/n25b/pepperplus-ri!27
2026-01-28 10:54:22 +00:00
fb0d7850cc Merge branch 'main' into dev 2026-01-28 11:53:23 +01:00
Storm
891ebf5e3f chore: changed video sending to work without cv2 2026-01-27 17:58:06 +01:00
Pim Hutting
da97eb8a1a Merge branch 'feat/robot-speech-agent-force-speech' into 'dev'
feat: implemented forced speech and speech queue

See merge request ics/sp/2025/n25b/pepperplus-ri!23
2026-01-14 14:26:39 +00:00
Luijkx,S.O.H. (Storm)
e51cf8fe65 feat: implemented forced speech and speech queue 2026-01-14 14:26:38 +00:00
Twirre
1e77548622 Merge branch 'feat/ri-gestures' into 'dev'
feat: gestures to ri

See merge request ics/sp/2025/n25b/pepperplus-ri!21
2025-12-16 08:35:26 +00:00
JobvAlewijk
a8fe887c48 feat: gestures to ri 2025-12-16 08:35:26 +00:00
JobvAlewijk
df702f1e44 Merge branch 'feat/environment-variables' into 'dev'
Add environment variables

See merge request ics/sp/2025/n25b/pepperplus-ri!22
2025-12-13 13:43:52 +00:00
JobvAlewijk
a2cb2ae90a Merge branch 'dev' of ssh://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-ri into feat/environment-variables 2025-12-13 14:43:02 +01:00
Luijkx,S.O.H. (Storm)
017dbfaa28 Merge branch 'docs/extract-installation-instructions' into 'dev'
Improve installation instructions

See merge request ics/sp/2025/n25b/pepperplus-ri!20
2025-12-11 10:58:56 +00:00
Twirre
9ff1d9a4d3 Improve installation instructions 2025-12-11 10:58:56 +00:00
Twirre Meulenbelt
3a259c1170 feat: add environment variables and docs
ref: N25B-352
2025-12-10 13:28:13 +01:00
JobvAlewijk
c86eda497c Merge branch 'feat/ci-cd' into 'dev'
Introduce CI/CD with tests

See merge request ics/sp/2025/n25b/pepperplus-ri!19
2025-12-03 15:23:37 +00:00
Twirre Meulenbelt
94b92b3e4a feat: re-introduce git hooks
Now using the standardized method from the CB.

ref: N25B-367
2025-12-02 22:04:46 +01:00
Twirre Meulenbelt
f469e4ce36 fix: install in a .venv artifact
This artifact can be reused in different stages.

ref: N25B-367
2025-12-02 21:46:24 +01:00
Twirre Meulenbelt
28a556becd feat: introduce CI/CD with tests
Using a custom base image installed on the runner, the installation and tests should work (fast).

ref: N25B-367
2025-12-02 21:12:15 +01:00
Twirre
89c9f2ebea Merge branch 'test/video-sender' into 'dev'
test: added full video sender coverage tests

See merge request ics/sp/2025/n25b/pepperplus-ri!18
2025-11-24 20:41:08 +00:00
JobvAlewijk
96f328d56c test: added full video sender coverage tests 2025-11-24 20:41:08 +00:00
Twirre
4d634a3b4e Merge branch 'test/main-start' into 'dev'
test: added main tests

See merge request ics/sp/2025/n25b/pepperplus-ri!17
2025-11-24 20:37:59 +00:00
JobvAlewijk
e2a71ad6c2 test: added main tests 2025-11-24 20:37:59 +00:00
2fcd885a00 Merge branch 'test/state' into 'dev'
test: added tests for full state coverage

See merge request ics/sp/2025/n25b/pepperplus-ri!16
2025-11-24 20:24:19 +00:00
JobvAlewijk
336acac440 test: added tests for full state coverage 2025-11-24 20:24:19 +00:00
Twirre
f4fbc69c7f Merge branch 'test/reciever-base' into 'dev'
test: added not overridden reciever base test

See merge request ics/sp/2025/n25b/pepperplus-ri!15
2025-11-24 20:06:58 +00:00
JobvAlewijk
fbe8f59c38 test: added not overridden reciever base test 2025-11-24 20:06:58 +00:00
Twirre
e99d7e8557 Merge branch 'test/audio-sender' into 'dev'
test: added init failure test in audio sender

See merge request ics/sp/2025/n25b/pepperplus-ri!14
2025-11-24 20:05:10 +00:00
JobvAlewijk
2350f6eec7 test: added init failure test in audio sender 2025-11-24 20:05:10 +00:00
Twirre
2852b714f5 Merge branch 'test/qi-utils' into 'dev'
test: added qi_utils test

See merge request ics/sp/2025/n25b/pepperplus-ri!12
2025-11-24 20:02:28 +00:00
JobvAlewijk
7628e47478 test: added qi_utils test 2025-11-24 20:02:28 +00:00
Twirre
36f5fae45c Merge branch 'test/socket-base' into 'dev'
test: added socket base tests

See merge request ics/sp/2025/n25b/pepperplus-ri!13
2025-11-24 13:32:31 +00:00
JobvAlewijk
6ea870623b test: added socket base tests 2025-11-24 13:32:31 +00:00
8d6dd23acb Merge branch 'chore/add-documentation' into 'dev'
chore: add documentation RI

See merge request ics/sp/2025/n25b/pepperplus-ri!11
2025-11-22 19:14:51 +00:00
Twirre Meulenbelt
a53871360e docs: remove duplicate and double space
ref: N25B-298
2025-11-22 19:32:50 +01:00
Pim Hutting
c1e92feba7 Apply 1 suggestion(s) to 1 file(s)
Co-authored-by: Kasper Marinus <k.marinus@students.uu.nl>
2025-11-22 12:37:39 +00:00
Pim Hutting
6859451bf9 Apply 1 suggestion(s) to 1 file(s)
Co-authored-by: Twirre <s.a.meulenbelt@students.uu.nl>
2025-11-22 12:36:34 +00:00
Twirre Meulenbelt
64c6f0addb docs: make doc generator understand multi line
ref: N25B-298
2025-11-22 12:44:13 +01:00
Pim Hutting
c53307530b chore: applied all feedback
close: N25B-298
2025-11-22 11:45:32 +01:00
Pim Hutting
051f904576 chore: add documentation RI
Code functionality left unchanged, only added docs where missing

close: N25B-298
2025-11-21 16:35:40 +01:00
Twirre
1e3531ac6e Merge branch 'docs/gen_documentation' into 'dev'
docs: added auto-generation of documentation

See merge request ics/sp/2025/n25b/pepperplus-ri!10
2025-11-19 17:14:36 +00:00
Storm
cec29f6206 chore: updated .gitignore
ref: N25B-270
2025-11-19 18:10:18 +01:00
Storm
a0a8ad2689 docs: changed readme
ref: N25B-270
2025-11-19 17:59:37 +01:00
JobvAlewijk
1c9467d03a fix: conf includes correct path
ref: N25B-270
2025-11-19 17:57:24 +01:00
Storm
9dd39d2048 docs: added auto-generation of documentation
ref: N25B-270
2025-11-19 13:49:50 +01:00
Twirre
b05aa5e834 Merge branch 'refactor/config-file' into 'dev'
refactor: added config file and moved constants

See merge request ics/sp/2025/n25b/pepperplus-ri!9
2025-11-14 14:15:06 +00:00
Twirre Meulenbelt
c691e279cd style: two lines between top level declarations
ref: N25B-236
2025-11-14 15:13:48 +01:00
Pim Hutting
16b64e41c8 style: applied style suggestions
close: N25B-236
2025-11-14 14:12:14 +00:00
Twirre Meulenbelt
03519e2a16 test: fix microphone interactive test
This was created with the assumption that all devices were choosable, but now only ones with input channels are.

ref: N25B-119
2025-11-14 13:08:31 +01:00
Pim Hutting
643d7b919c fix: made all tests pass
before some tests failed because of a faulty edit
to microphone util

ref: N25B-236
2025-11-09 16:00:36 +01:00
Pim Hutting
4402b21a73 refactor: added config file and moved constants
- Moved hardcoded configuration constants to a dedicated config.py file.
- Created VideoConfig, AudioConfig, MainConfig, and Settings classes in config.py

ref: N25B-236
2025-11-09 15:43:22 +01:00
Pim Hutting
c037eb7ec2 Merge branch 'feat/stream-audio' into 'dev'
Implement audio streaming

See merge request ics/sp/2025/n25b/pepperplus-ri!8
2025-11-05 12:08:28 +00:00
Twirre Meulenbelt
8a095323ec docs: describe extra WSL installation step
ref: N25B-119
2025-11-02 16:35:15 +01:00
Twirre Meulenbelt
854a14bf0c docs: describe --microphone program parameter
ref: N25B-119
2025-11-02 16:16:43 +01:00
Twirre Meulenbelt
fab5127cac feat: add application parameter to choose a custom microphone
ref: N25B-119
2025-11-02 16:12:56 +01:00
Twirre Meulenbelt
5912ac606a docs: add installation instructions for the portaudio dependency
ref: N25B-119
2025-11-02 15:01:18 +01:00
Twirre Meulenbelt
9ea446275e fix: allow speaking text with Unicode characters
When speaking, the actuation receiver logs the message to speak. If the message includes Unicode characters, it will now no longer crash.

ref: N25B-119
2025-11-02 14:59:16 +01:00
Twirre Meulenbelt
a6a12a5886 fix: remove unused qi import
It had already been made so that the VideoSender does not depend on `qi`, but the import was not yet removed.

ref: N25B-119
2025-11-02 14:58:32 +01:00
Twirre Meulenbelt
230ab5d5cc test: add case for microphone failure
When the microphone fails, it will raise an IOError during the `read`. This is simulated with a new test.

ref: N25B-119
2025-10-22 15:38:30 +02:00
Twirre Meulenbelt
0499cd8a24 feat: send audio
AudioSender runs in a separate thread to send audio from the microphone.

ref: N25B-119
2025-10-22 15:10:27 +02:00
Twirre Meulenbelt
f8db719bfa test: unit test mock PyAudio, integration test use real
Make unit tests use a mock version of PyAudio, while making integration tests using the real version. If no real microphone is available, these integration tests are skipped.

ref: N25B-119
2025-10-22 13:27:35 +02:00
Twirre Meulenbelt
1e3e077029 fix: disallow selecting non-microphone audio device
Previously any audio device was allowed to be selected as microphone. Now, only ones with at least one input channel can be selected.

ref: N25B-119
2025-10-22 13:24:46 +02:00
Twirre Meulenbelt
0f60f67ab9 feat: add microphone selection utils
Providing two functions, one to choose the default microphone, the other to choose a microphone interactively. With tests.

ref: N25B-119
2025-10-22 11:44:51 +02:00
Pim Hutting
4da83a0a7e Merge branch 'feat/actuation-receiver' into 'dev'
Implement negotiation and actuation endpoints

See merge request ics/sp/2025/n25b/pepperplus-ri!5
2025-10-22 08:49:03 +00:00
Twirre Meulenbelt
9d728f78fe Merge remote-tracking branch 'origin/dev' into feat/actuation-receiver
# Conflicts:
#	README.md
2025-10-21 13:56:57 +02:00
Twirre Meulenbelt
5631a55697 test: convert to pytest
Instead of built-in `unittest`, now use `pytest`. Find versions that work, convert tests.

ref: N25B-168
2025-10-21 13:55:06 +02:00
2584433
5dce0e3438 Merge branch 'fix/githook-mac' into 'dev'
fix: fixed githooks

See merge request ics/sp/2025/n25b/pepperplus-ri!7
2025-10-17 14:27:58 +00:00
2584433
670d1f0a6a fix: fixed githooks 2025-10-17 14:27:58 +00:00
Twirre Meulenbelt
45be0366ba style: correct and clarify docs and comments
ref: N25B-168
2025-10-16 22:03:50 +02:00
Twirre Meulenbelt
4c3aa3a911 feat: adapt actuation receiver to state's qi_session
Makes actuation tests pass. In main, the timing of the socket no longer contains the time to receive and send data, but only the processing time of the message handler.

ref: N25B-168
2025-10-16 21:46:46 +02:00
Twirre Meulenbelt
56c804b7eb test: add unit tests for main and actuation receivers
Exhaustive test cases for both classes, with 100% coverage. Adds `mock` dependency. Tests for actuation receiver do not yet pass.

ref: N25B-168
2025-10-16 21:43:24 +02:00
Twirre Meulenbelt
55483808ff fix: use qi session from state in actuation receiver
ref: N25B-168
2025-10-16 18:09:01 +02:00
Twirre Meulenbelt
c10fbc7c90 fix: use different port, fix endpoint name matching
ref: N25B-168
2025-10-16 17:37:01 +02:00
Twirre Meulenbelt
23c3379bfb refactor: use new port negotiation style
As changed in the API document, this now uses the new port negotiation style.

ref: N25B-168
2025-10-16 17:22:04 +02:00
Twirre Meulenbelt
e12d88726d Merge remote-tracking branch 'origin/dev' into feat/actuation-receiver
# Conflicts:
#	src/robot_interface/endpoints/socket_base.py
2025-10-16 17:01:16 +02:00
Twirre
785756683e Merge branch 'feat/ri-receive-video' into 'dev'
Implemented receiving video in RI from robot

See merge request ics/sp/2025/n25b/pepperplus-ri!4
2025-10-16 14:41:02 +00:00
Luijkx,S.O.H. (Storm)
0b55d5c221 style: fixed docstrings
close: N25B-171
2025-10-16 14:06:31 +00:00
Twirre Meulenbelt
308a19bff2 fix: correct negotiate endpoint name
Was previously "negotiation/", but the API document described it as "negotiate/". It is now "negotiate/" in the implementation as well.

ref: N25B-168
2025-10-16 15:02:01 +02:00
Storm
0c5b47ae16 refactor: removed hardcoded IP and port and moved video functions from main to the VideoSender class
ref: N25B-171
2025-10-16 14:57:53 +02:00
Storm
a408fafc7c docs: minor type correction in documentation start_video_rcv and video_rcv_loop 2025-10-15 17:55:29 +02:00
Storm
e3663e1327 feat: implemented receiving video image from robot
The functionality is implemented in main.py in the functions start_video_rcv and video_rcv_loop.

close: N25B-171
2025-10-15 17:52:59 +02:00
Twirre Meulenbelt
df985a8cbc fix: log speech commands even when Pepper SDK is not connected
Previously, the `_handle_speech` function had an early return when no Pepper session was available, causing incoming messages not to get logged. Now messages are logged even when there is no session with the Pepper SDK.

ref: N25B-168
2025-10-15 14:58:31 +02:00
Twirre Meulenbelt
ff6abbfea1 feat: implement actuation receiver
The ActuationReceiver connects to the Pepper robot using the Qi library. The endpoint is automatically negotiated.

ref: N25B-168
2025-10-13 22:08:43 +02:00
Twirre Meulenbelt
c6916470e9 feat: implement negotiation
By implementing SocketBase and adding the socket to the state, the negotiation will automatically give the right endpoints.

ref: N25B-168
2025-10-13 22:06:27 +02:00
Luijkx,S.O.H. (Storm)
828871a2ad Merge branch 'feat/comm-standardization' into 'dev'
Implementation of standardized CB<->RI communication API

See merge request ics/sp/2025/n25b/pepperplus-ri!3
2025-10-09 16:01:30 +00:00
Twirre Meulenbelt
7cfa6b44e8 chore: add usage instructions
Describes how to run the main program.

ref: N25B-168
2025-10-09 17:36:25 +02:00
Twirre Meulenbelt
c95d4abd77 chore: re-add the installation instructions
These installation instructions come from the feat/cb2ri-communication branch which has been replaced by this branch.

ref: N25B-168
2025-10-09 17:28:03 +02:00
Twirre Meulenbelt
e9c6b918e0 refactor: rename EndpointBase to SocketBase
Because 'endpoint' is also used in the messages, the name 'socket' is more descriptive.

ref: N25B-168
2025-10-09 16:24:31 +02:00
Twirre Meulenbelt
23805812d5 feat: abstract base classes for endpoints
Introduces EndpointBase and ReceiverBase abstract base classes. Implements a ReceiverBase with the MainReceiver.

ref: N25B-168
2025-10-09 16:04:18 +02:00
Twirre Meulenbelt
c4530f0c3a feat: basic implementation of standardized CB2RI communication API
Based on the N25B-A-14 article, this is a stub implementation of the RI2CB communication API. It implements the ping endpoint and provides a stub for the negotiation endpoint.

ref: N25B-168
2025-10-09 13:54:34 +02:00
2584433
bc26c76437 Merge branch 'chore/correct-branch-name-regex' into 'dev'
Correct branch hook regex

See merge request ics/sp/2025/n25b/pepperplus-ri!2
2025-10-08 15:06:00 +00:00
Twirre Meulenbelt
99776480e8 chore: correct commit hook regex
Previously all branch names had to have two dashes. Now it can have one to six words.

ref: N25B-89
2025-10-08 16:25:51 +02:00
2584433
b7c6269435 Merge branch 'feat/git-automatic-hooks' into 'dev'
Added githooks

See merge request ics/sp/2025/n25b/pepperplus-ri!1
2025-10-07 14:55:04 +00:00
2584433
eb091968a6 Added githooks 2025-10-07 14:55:04 +00:00
aad2044b6e chore: add .gitignore 2025-09-27 17:58:12 +02:00
49 changed files with 4664 additions and 77 deletions

.env.example (Normal file, 25 lines)

@@ -0,0 +1,25 @@
# Example .env file. To use, make a copy, call it ".env" (i.e. removing the ".example" suffix), then edit the values.
# To make a variable apply, uncomment it (remove the "#" in front of the line).
# First, some variables that are likely to be configured:
# The hostname or IP address of the Control Backend.
AGENT__CONTROL_BACKEND_HOST=localhost
# Variables that are unlikely to be configured, you can probably ignore these:
#AGENT__ACTUATION_RECEIVER_PORT=
#AGENT__MAIN_RECEIVER_PORT=
#AGENT__VIDEO_SENDER_PORT=
#AGENT__AUDIO_SENDER_PORT=
#VIDEO__CAMERA_INDEX=
#VIDEO__RESOLUTION=
#VIDEO__COLOR_SPACE=
#VIDEO__FPS=
#VIDEO__STREAM_NAME=
#VIDEO__IMAGE_BUFFER=
#AUDIO__SAMPLE_RATE=
#AUDIO__CHUNK_SIZE=
#AUDIO__CHANNELS=
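For reference, here is a minimal sketch of how such variables can be read from Python with python-dotenv (which is pinned in requirements.txt). The repository's own loader is `robot_interface.utils.get_config`, which is not shown on this page, so this is an illustration rather than the project's actual code.
```python
# Minimal sketch, not the project's own get_config helper: load ".env" with
# python-dotenv and read two of the variables listed above, with their documented
# defaults as fallbacks.
import os
from dotenv import load_dotenv

load_dotenv()  # picks up a ".env" file in the current working directory, if present
host = os.environ.get("AGENT__CONTROL_BACKEND_HOST", "localhost")
fps = int(os.environ.get("VIDEO__FPS", "15"))
print("Control backend host: {}".format(host))
print("Video FPS: {}".format(fps))
```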

.githooks/check-branch-name.sh (Executable file, 77 lines)

@@ -0,0 +1,77 @@
#!/usr/bin/env bash
# This script checks if the current branch name follows the specified format.
# It's designed to be used as a 'pre-commit' git hook.
# Format: <type>/<short-description>
# Example: feat/add-user-login
# --- Configuration ---
# An array of allowed branch types
ALLOWED_TYPES=(feat fix refactor perf style test docs build chore revert)
# An array of branches to ignore
IGNORED_BRANCHES=(main dev demo)
# --- Colors for Output ---
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# --- Helper Functions ---
error_exit() {
echo -e "${RED}ERROR: $1${NC}" >&2
echo -e "${YELLOW}Branch name format is incorrect. Aborting commit.${NC}" >&2
exit 1
}
# --- Main Logic ---
# 1. Get the current branch name
BRANCH_NAME=$(git symbolic-ref --short HEAD)
# 2. Check if the current branch is in the ignored list
for ignored_branch in "${IGNORED_BRANCHES[@]}"; do
if [ "$BRANCH_NAME" == "$ignored_branch" ]; then
echo -e "${GREEN}Branch check skipped for default branch: $BRANCH_NAME${NC}"
exit 0
fi
done
# 3. Validate the overall structure: <type>/<description>
if ! [[ "$BRANCH_NAME" =~ ^[a-z]+/.+$ ]]; then
error_exit "Branch name must be in the format: <type>/<short-description>\nExample: feat/add-user-login"
fi
# 4. Extract the type and description
TYPE=$(echo "$BRANCH_NAME" | cut -d'/' -f1)
DESCRIPTION=$(echo "$BRANCH_NAME" | cut -d'/' -f2-)
# 5. Validate the <type>
type_valid=false
for allowed_type in "${ALLOWED_TYPES[@]}"; do
if [ "$TYPE" == "$allowed_type" ]; then
type_valid=true
break
fi
done
if [ "$type_valid" == false ]; then
error_exit "Invalid type '$TYPE'.\nAllowed types are: ${ALLOWED_TYPES[*]}"
fi
# 6. Validate the <short-description>
# Regex breakdown:
# ^[a-z0-9]+ - Starts with one or more lowercase letters/numbers (the first word).
# (-[a-z0-9]+){0,5} - Followed by a group of (dash + word) 0 to 5 times.
# $ - End of the string.
# This entire pattern enforces 1 to 6 words total, separated by dashes.
DESCRIPTION_REGEX="^[a-z0-9]+(-[a-z0-9]+){0,5}$"
if ! [[ "$DESCRIPTION" =~ $DESCRIPTION_REGEX ]]; then
error_exit "Invalid short description '$DESCRIPTION'.\nIt must be a maximum of 6 words, all lowercase, separated by dashes.\nExample: add-new-user-authentication-feature"
fi
# If all checks pass, exit successfully
echo -e "${GREEN}Branch name '$BRANCH_NAME' is valid.${NC}"
exit 0
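As an aside, the branch-name rules above can be sanity-checked outside Git with a few lines of Python; this sketch simply re-states the script's regexes with Python's `re` module and is not part of the hook itself.
```python
# Re-statement of the branch-name rules from check-branch-name.sh, for quick testing.
import re

ALLOWED_TYPES = {"feat", "fix", "refactor", "perf", "style", "test", "docs", "build", "chore", "revert"}
DESCRIPTION_RE = re.compile(r"^[a-z0-9]+(-[a-z0-9]+){0,5}$")  # 1 to 6 dash-separated words


def branch_name_ok(name):
    parts = name.split("/", 1)
    if len(parts) != 2:
        return False
    branch_type, description = parts
    return branch_type in ALLOWED_TYPES and bool(DESCRIPTION_RE.match(description))


assert branch_name_ok("feat/add-user-login")
assert not branch_name_ok("feature/add-user-login")                 # type not in the allowed list
assert not branch_name_ok("fix/one-two-three-four-five-six-seven")  # more than six words
```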

.githooks/check-commit-msg.sh (Executable file, 135 lines)

@@ -0,0 +1,135 @@
#!/usr/bin/env bash
# This script checks if a commit message follows the specified format.
# It's designed to be used as a 'commit-msg' git hook.
# Format:
# <type>: <short description>
#
# [optional]<body>
#
# [ref/close]: <issue identifier>
# --- Configuration ---
# An array of allowed commit types
ALLOWED_TYPES=(feat fix refactor perf style test docs build chore revert)
# --- Colors for Output ---
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# The first argument to the hook is the path to the file containing the commit message
COMMIT_MSG_FILE=$1
# --- Automated Commit Detection ---
# Read the first line (header) for initial checks
HEADER=$(head -n 1 "$COMMIT_MSG_FILE")
# Check for Merge commits (covers 'git merge' and PR merges from GitHub/GitLab)
# Examples: "Merge branch 'main' into ...", "Merge pull request #123 from ..."
MERGE_PATTERN="^Merge (remote-tracking )?(branch|pull request|tag) .*"
if [[ "$HEADER" =~ $MERGE_PATTERN ]]; then
echo -e "${GREEN}Merge commit detected by message content. Skipping validation.${NC}"
exit 0
fi
# Check for Revert commits
# Example: "Revert "feat: add new feature""
REVERT_PATTERN="^Revert \".*\""
if [[ "$HEADER" =~ $REVERT_PATTERN ]]; then
echo -e "${GREEN}Revert commit detected by message content. Skipping validation.${NC}"
exit 0
fi
# Check for Cherry-pick commits (this pattern appears at the end of the message)
# Example: "(cherry picked from commit deadbeef...)"
# We use grep -q to search the whole file quietly.
CHERRY_PICK_PATTERN="\(cherry picked from commit [a-f0-9]{7,40}\)"
if grep -qE "$CHERRY_PICK_PATTERN" "$COMMIT_MSG_FILE"; then
echo -e "${GREEN}Cherry-pick detected by message content. Skipping validation.${NC}"
exit 0
fi
# Check for Squash
# Example: "Squash commits ..."
SQUASH_PATTERN="^Squash .+"
if [[ "$HEADER" =~ $SQUASH_PATTERN ]]; then
echo -e "${GREEN}Squash commit detected by message content. Skipping validation.${NC}"
exit 0
fi
# --- Validation Functions ---
# Function to print an error message and exit
# Usage: error_exit "Your error message here"
error_exit() {
# >&2 redirects echo to stderr
echo -e "${RED}ERROR: $1${NC}" >&2
echo -e "${YELLOW}Commit message format is incorrect. Aborting commit.${NC}" >&2
exit 1
}
# --- Main Logic ---
# 1. Read the header (first line) of the commit message
HEADER=$(head -n 1 "$COMMIT_MSG_FILE")
# 2. Validate the header format: <type>: <description>
# Regex breakdown:
# ^(type1|type2|...) - Starts with one of the allowed types
# : - Followed by a literal colon
# \s - Followed by a single space
# .+ - Followed by one or more characters for the description
# $ - End of the line
TYPES_REGEX=$(
IFS="|"
echo "${ALLOWED_TYPES[*]}"
)
HEADER_REGEX="^($TYPES_REGEX): .+$"
if ! [[ "$HEADER" =~ $HEADER_REGEX ]]; then
error_exit "Invalid header format.\n\nHeader must be in the format: <type>: <short description>\nAllowed types: ${ALLOWED_TYPES[*]}\nExample: feat: add new user authentication feature"
fi
# Only validate footer if commit type is not chore
TYPE=$(echo "$HEADER" | cut -d':' -f1)
if [ "$TYPE" != "chore" ]; then
# 3. Validate the footer (last line) of the commit message
FOOTER=$(tail -n 1 "$COMMIT_MSG_FILE")
# Regex breakdown:
# ^(ref|close) - Starts with 'ref' or 'close'
# : - Followed by a literal colon
# \s - Followed by a single space
# N25B- - Followed by the literal string 'N25B-'
# [0-9]+ - Followed by one or more digits
# $ - End of the line
FOOTER_REGEX="^(ref|close): N25B-[0-9]+$"
if ! [[ "$FOOTER" =~ $FOOTER_REGEX ]]; then
error_exit "Invalid footer format.\n\nFooter must be in the format: [ref/close]: <issue identifier>\nExample: ref: N25B-123"
fi
fi
# 4. If the message has more than 2 lines, validate the separator
# A blank line must exist between the header and the body.
LINE_COUNT=$(wc -l <"$COMMIT_MSG_FILE" | xargs) # xargs trims whitespace
# We only care if there is a body. Header + Footer = 2 lines.
# Header + Blank Line + Body... + Footer > 2 lines.
if [ "$LINE_COUNT" -gt 2 ]; then
# Get the second line
SECOND_LINE=$(sed -n '2p' "$COMMIT_MSG_FILE")
# Check if the second line is NOT empty. If it's not, it's an error.
if [ -n "$SECOND_LINE" ]; then
error_exit "Missing blank line between header and body.\n\nThe second line of your commit message must be empty if a body is present."
fi
fi
# If all checks pass, exit with success
echo -e "${GREEN}Commit message is valid.${NC}"
exit 0
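For a concrete feel of what passes, this small sketch re-states the header and footer patterns in Python and checks them against a commit message taken from the log above; the real validation is done by the bash hook, not this code.
```python
# Re-statement of the header/footer rules from check-commit-msg.sh, applied to a
# commit message that appears in the log above.
import re

ALLOWED_TYPES = ("feat", "fix", "refactor", "perf", "style", "test", "docs", "build", "chore", "revert")
HEADER_RE = re.compile(r"^(%s): .+$" % "|".join(ALLOWED_TYPES))
FOOTER_RE = re.compile(r"^(ref|close): N25B-[0-9]+$")

message = "feat: add application parameter to choose a custom microphone\n\nref: N25B-119"
lines = message.splitlines()
header_ok = bool(HEADER_RE.match(lines[0]))
# The footer check is skipped for "chore" commits, mirroring the hook above.
footer_ok = lines[0].startswith("chore:") or bool(FOOTER_RE.match(lines[-1]))
print("header ok: {}, footer ok: {}".format(header_ok, footer_ok))
```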

.gitignore (vendored, Normal file, 224 lines)

@@ -0,0 +1,224 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
# Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
# poetry.lock
# poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
# pdm.lock
# pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
# pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# Redis
*.rdb
*.aof
*.pid
# RabbitMQ
mnesia/
rabbitmq/
rabbitmq-data/
# ActiveMQ
activemq-data/
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# Streamlit
.streamlit/secrets.toml
.DS_Store
# Docs
docs/*
!docs/conf.py
!docs/installation/
!docs/installation/**

.gitlab-ci.yml (Normal file, 42 lines)

@@ -0,0 +1,42 @@
# ---------- GLOBAL SETUP ---------- #
workflow:
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
stages:
- install
- test
default:
image: qi-py-ri-base:latest
cache:
key: "${CI_COMMIT_REF_SLUG}"
paths:
- .venv/
policy: pull-push
# --------- INSTALLING --------- #
install:
stage: install
tags:
- install
script:
- python -m virtualenv .venv
- source .venv/bin/activate
- echo /qi/pynaoqi-python2.7-2.5.7.1-linux64/lib/python2.7/site-packages/ > .venv/lib/python2.7/site-packages/pynaoqi-python2.7.pth
- pip install -r requirements.txt
artifacts:
paths:
- .venv/
expire_in: 1h
# ---------- TESTING ---------- #
test:
stage: test
needs:
- install
tags:
- test
script:
- source .venv/bin/activate
- PYTHONPATH=src pytest test/

.pre-commit-config.yaml (Normal file, 15 lines)

@@ -0,0 +1,15 @@
repos:
- repo: local
hooks:
- id: check-commit-msg
name: Check commit message format
entry: .githooks/check-commit-msg.sh
language: script
stages: [commit-msg]
- id: check-branch-name
name: Check branch name format
entry: .githooks/check-branch-name.sh
language: script
stages: [commit]
always_run: true
pass_filenames: false

README.md (modified, 159 changed lines)

@@ -1,93 +1,98 @@
# PepperPlus-RI
The robot interface is a high-level API for controlling the robot. It implements the API as designed: https://utrechtuniversity.youtrack.cloud/articles/N25B-A-14/RI-CB-Communication.
This is an implementation for the Pepper robot, using the Pepper SDK and Python 2.7 as required by the SDK.
## Installation
- [Linux](./docs/installation/linux.md)
- [macOS](./docs/installation/macos.md)
- [Windows](./docs/installation/windows.md)
### Git Hooks
To activate automatic linting, formatting, branch name checks and commit message checks, run (after installing requirements):
```bash
pre-commit install
pre-commit install --hook-type commit-msg
```
## Usage
On Linux and macOS:
```shell
PYTHONPATH=src python -m robot_interface.main
```
On Windows:
```shell
$env:PYTHONPATH="src"; python -m robot_interface.main
```
### Program Arguments
If you want to connect to the actual robot (or simulator), pass the `--qi-url` argument.
There's also a `--microphone` argument that can be used to choose a microphone. If not given, the program will try the default microphone. If you don't know the name of the microphone, pass the argument with any value, and it will list the names of available microphones.
### Environment Variables
You may use environment variables to change settings. Make a copy of the [`.env.example`](.env.example) file, name it `.env` and put it in the root directory. The file itself describes how to do the configuration.
## Testing
To run the unit tests, on Linux and macOS:
```shell
PYTHONPATH=src pytest test/
```
On Windows:
```shell
$env:PYTHONPATH="src"; pytest test/
```
### Coverage
For coverage, add `--cov=robot_interface` as an argument to `pytest`.
## Documentation
Generate documentation web pages using:
### Linux & macOS
```bash
PYTHONPATH=src sphinx-apidoc -F -o docs src/robot_interface
```
### Windows
```bash
$env:PYTHONPATH="src"; sphinx-apidoc -F -o docs src/control_backend
```
Optionally, in the `conf.py` file in the new `docs` folder, change preferences.
In the `docs` folder:
### Linux & macOS
```bash
make html
```
### Windows
```bash
.\make.bat html
```

docs/conf.py (Normal file, 184 lines)

@@ -0,0 +1,184 @@
# -*- coding: utf-8 -*-
#
# Configuration file for the Sphinx documentation builder.
#
# This file does only contain a selection of the most common options. For a
# full list see the documentation:
# http://www.sphinx-doc.org/en/master/config
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath("../src"))
# -- Project information -----------------------------------------------------
project = u'robot_interface'
copyright = u'2025, Author'
author = u'Author'
# The short X.Y version
version = u''
# The full version, including alpha/beta/rc tags
release = u''
# -- General configuration ---------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.viewcode',
'sphinx.ext.todo',
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = 'en'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = None
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'sphinx_rtd_theme'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# Custom sidebar templates, must be a dictionary that maps document names
# to template names.
#
# The default sidebars (for documents that don't match any pattern) are
# defined by theme itself. Builtin themes are using these templates by
# default: ``['localtoc.html', 'relations.html', 'sourcelink.html',
# 'searchbox.html']``.
#
# html_sidebars = {}
# -- Options for HTMLHelp output ---------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = 'robot_interfacedoc'
# -- Options for LaTeX output ------------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'robot_interface.tex', u'robot\\_interface Documentation',
u'Author', 'manual'),
]
# -- Options for manual page output ------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'robot_interface', u'robot_interface Documentation',
[author], 1)
]
# -- Options for Texinfo output ----------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'robot_interface', u'robot_interface Documentation',
author, 'robot_interface', 'One line description of project.',
'Miscellaneous'),
]
# -- Options for Epub output -------------------------------------------------
# Bibliographic Dublin Core info.
epub_title = project
# The unique identifier of the text. This can be a ISBN number
# or the project homepage.
#
# epub_identifier = ''
# A unique identification for the text.
#
# epub_uid = ''
# A list of files that should not be packed into the epub file.
epub_exclude_files = ['search.html']
# -- Extension configuration -------------------------------------------------
# -- Options for todo extension ----------------------------------------------
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = True

docs/installation/linux.md (Normal file, 75 lines)

@@ -0,0 +1,75 @@
# Installation
Of the Pepper Robot Interface on Linux (or WSL).
Start off by installing [Pyenv](https://github.com/pyenv/pyenv?tab=readme-ov-file#installation), walking through the steps outlined there (be sure to also add it to PATH). Also install the [Python build requirements](https://github.com/pyenv/pyenv/wiki#suggested-build-environment). Afterwards, install Python 2.7 and activate it for your current shell:
```bash
pyenv install 2.7
pyenv shell 2.7
```
You can check that this worked by typing
```bash
python -V
```
Which should return `Python 2.7.18`.
Next, `cd` into this repository and create (and activate) a virtual environment:
```bash
cd <path to project>/
python -m pip install virtualenv
python -m virtualenv .venv
source .venv/bin/activate
```
We depend on PortAudio for the `pyaudio` package, so install it with:
```bash
sudo apt install -y portaudio19-dev
```
On WSL, also install:
```bash
sudo apt install -y libasound2-plugins
```
Install the required packages with
```bash
pip install -r requirements.txt
```
Now we need to install the NaoQi SDK into our virtual environment, which we need to do manually. Begin by downloading the SDK:
```bash
wget https://community-static.aldebaran.com/resources/2.5.10/Python%20SDK/pynaoqi-python2.7-2.5.7.1-linux64.tar.gz
```
Next, move into the `site-packages` directory and extract the file you just downloaded:
```bash
cd .venv/lib/python2.7/site-packages/
tar xvfz <path to SDK>/pynaoqi-python2.7-2.5.7.1-linux64.tar.gz
rm <path to SDK>/pynaoqi-python2.7-2.5.7.1-linux64.tar.gz
```
Lastly, we need to inform our virtual environment where to find our newly installed package:
```bash
echo <path to project>/.venv/lib/python2.7/site-packages/pynaoqi-python2.7-2.5.7.1-linux64/lib/python2.7/site-packages/ > pynaoqi-python2.7.pth
```
That's it! Verify that it works with
```bash
python -c "import qi; print(qi)"
```
You should now be able to run this project.
See the README for how to run.
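Since PortAudio and `pyaudio` are installed here mainly for microphone capture (the `--microphone` option described in the README), it can be handy to check which input devices PyAudio actually sees. The following sketch uses PyAudio's standard device-enumeration API and is independent of the project's own microphone utilities, which are not shown on this page.
```python
# List audio devices that have at least one input channel, i.e. the ones that can
# act as a microphone. Illustration only; not the robot_interface microphone utils.
import pyaudio

pa = pyaudio.PyAudio()
try:
    for index in range(pa.get_device_count()):
        info = pa.get_device_info_by_index(index)
        if info.get("maxInputChannels", 0) > 0:
            print("{}: {}".format(index, info.get("name")))
finally:
    pa.terminate()
```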

docs/installation/macos.md (Normal file, 106 lines)

@@ -0,0 +1,106 @@
# Installation
Of the Pepper Robot Interface on macOS.
## Python 2.7
Install Python 2.7.18 from the [Python website](https://www.python.org/downloads/release/python-2718/).
Check that it worked by executing
```shell
python2 -V
```
Which should return Python 2.7.18.
## Virtual Environment
Next, cd into this repository and create (and activate) a virtual environment:
```shell
cd /path/to/project/
python2 -m pip install virtualenv
python2 -m virtualenv .venv
source .venv/bin/activate
```
We depend on PortAudio for the `pyaudio` package. If on Intel, run `brew install portaudio`. If on Apple Silicon, compile manually using the steps described in [the YouTrack article](https://utrechtuniversity.youtrack.cloud/articles/N25B-A-22/Install-PyAudio-for-Python-2-on-Apple-Silicon).
Then install the required Python packages with
```shell
pip install -r requirements.txt
```
## NaoQi SDK
We need to manually install the NaoQi SDK into our virtual environment. There are two options:
1. Install a newer version (2.8) which will make running easier, but compatibility is uncertain.
2. Install the version expected by the robot (2.5). This will complicate running slightly.
### Option 1
Download the SDK from [twirre.io](https://twirre.io/files/pynaoqi-python2.7-2.8.6.23-mac64-20191127_144231.tar.gz), or find one on the Aldebaran website, or an archived version on Web Archive.
Extract it to `/path/to/project/.venv/lib/python2.7/site-packages/`.
We need to inform our virtual environment where to find our newly installed package:
```bash
echo "/path/to/project/.venv/lib/python2.7/site-packages/pynaoqi-python2.7-2.8.6.23-mac64-20191127_144231/lib/python2.7/site-packages/" > /path/to/project/.venv/lib/python2.7/site-packages/pynaoqi-python2.7.pth
```
Now continue with [verifying](#verifying).
### Option 2
This method of installation requires setting the `DYLD_LIBRARY_PATH` environment variable before running; how to do that is explained below.
Download the SDK from [twirre.io](https://twirre.io/files/pynaoqi-2.5.7.1-mac64-deps.tar.gz). This is a modified version of the one from Aldebaran that also includes the required Choregraphe dependencies.
Extract it to `/path/to/project/.venv/lib/python2.7/site-packages/`.
We need to inform our virtual environment where to find our newly installed package:
```shell
echo "/path/to/project/.venv/lib/python2.7/site-packages/pynaoqi-python2.7-2.5.7.1-mac64/lib/python2.7/site-packages/" > /path/to/project/.venv/lib/python2.7/site-packages/pynaoqi-python2.7.pth
```
Now, anytime before running you need to set the `DYLD_LIBRARY_PATH` environment variable.
```shell
export DYLD_LIBRARY_PATH="/path/to/project/.venv/lib/python2.7/site-packages/pynaoqi-python2.7-2.5.7.1-mac64/choregraphe_lib:${DYLD_LIBRARY_PATH}"
```
You may want to simplify environment activation with a script `activate.sh` like:
```shell
#!/bin/zsh
export DYLD_LIBRARY_PATH="/path/to/project/.venv/lib/python2.7/site-packages/pynaoqi-python2.7-2.5.7.1-mac64/choregraphe_lib:${DYLD_LIBRARY_PATH}"
source .venv/bin/activate
```
[Verify](#verifying) if it works.
## Verifying
Verify that the NaoQi SDK installation works with
```bash
python -c "import qi; print(qi)"
```
If so, you should now be able to run this project.
See the README for how to run.

docs/installation/windows.md (Normal file, 44 lines)

@@ -0,0 +1,44 @@
# Installation
Of the Pepper Robot Interface on Windows.
Install Python 2.7.18 from [the Python website](https://www.python.org/downloads/release/python-2718/) and choose the x86-64 installer (at the bottom of the page).
To see if it worked:
```shell
py -2 -V
```
Which should return `Python 2.7.18`.
Next, `cd` into this repository and create (and activate) a virtual environment:
```bash
cd <path to project>/
py -2 -m pip install virtualenv
py -2 -m virtualenv .venv
.\.venv\Scripts\activate
```
Install the required packages with
```bash
pip install -r requirements.txt
```
Now we need to install the NaoQi SDK into our virtual environment, which we need to do manually. Download the SDK from [Aldebaran](https://community-static.aldebaran.com/resources/2.5.5/sdk-python/pynaoqi-python2.7-2.5.5.5-win32-vs2013.zip), [Web Archive](https://web.archive.org/web/20240120111043/https://community-static.aldebaran.com/resources/2.5.5/sdk-python/pynaoqi-python2.7-2.5.5.5-win32-vs2013.zip) or [twirre.io](https://twirre.io/files/pynaoqi-python2.7-2.8.6.23-win64-vs2015-20191127_152649.zip).
Extract to `.\.venv\Lib\site-packages`.
Create a file `.venv\Lib\site-packages\pynaoqi-python2.7.pth` and put the full path of `pynaoqi-python2.7-2.8.6.23-win64-vs2015-20191127_152649\lib\python2.7\Lib\site-packages` in it.
Test if it worked by running:
```bash
python -c "import qi; print(qi)"
```
You should now be able to run this project.
See the README for how to run.

requirements.txt (Normal file, 9 lines)

@@ -0,0 +1,9 @@
pyzmq<16
pyaudio<=0.2.11
pytest<5
pytest-mock<3.0.0
pytest-cov<3.0.0
sphinx
sphinx_rtd_theme
pre-commit
python-dotenv


@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""


@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

src/robot_interface/core/config.py (Normal file, 128 lines)

@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals
from robot_interface.utils.get_config import get_config
class AgentSettings(object):
"""
Agent port configuration.
:ivar control_backend_host: Hostname of the control backend, defaults to "localhost".
:vartype control_backend_host: string
:ivar actuation_receiver_port: Port for receiving actuation commands, defaults to 5557.
:vartype actuation_receiver_port: int
:ivar main_receiver_port: Port for receiving main messages, defaults to 5555.
:vartype main_receiver_port: int
:ivar video_sender_port: Port used for sending video frames, defaults to 5556.
:vartype video_sender_port: int
:ivar audio_sender_port: Port used for sending audio data, defaults to 5558.
:vartype audio_sender_port: int
"""
def __init__(
self,
control_backend_host=None,
actuation_receiver_port=None,
main_receiver_port=None,
video_sender_port=None,
audio_sender_port=None,
):
self.control_backend_host = get_config(control_backend_host, "AGENT__CONTROL_BACKEND_HOST", "localhost")
self.actuation_receiver_port = get_config(actuation_receiver_port, "AGENT__ACTUATION_RECEIVER_PORT", 5557, int)
self.main_receiver_port = get_config(main_receiver_port, "AGENT__MAIN_RECEIVER_PORT", 5555, int)
self.video_sender_port = get_config(video_sender_port, "AGENT__VIDEO_SENDER_PORT", 5556, int)
self.audio_sender_port = get_config(audio_sender_port, "AGENT__AUDIO_SENDER_PORT", 5558, int)
class VideoConfig(object):
"""
Video configuration constants.
:ivar camera_index: Index of the camera used, defaults to 0.
:vartype camera_index: int
:ivar resolution: Video resolution mode, defaults to 2.
:vartype resolution: int
:ivar color_space: Color space identifier, defaults to 13.
:vartype color_space: int
:ivar fps: Frames per second of the video stream, defaults to 15.
:vartype fps: int
:ivar stream_name: Name of the video stream, defaults to "Pepper Video".
:vartype stream_name: str
:ivar image_buffer: Internal buffer size for video frames, defaults to 6.
:vartype image_buffer: int
"""
def __init__(
self,
camera_index=None,
resolution=None,
color_space=None,
fps=None,
stream_name=None,
image_buffer=None,
):
self.camera_index = get_config(camera_index, "VIDEO__CAMERA_INDEX", 0, int)
self.resolution = get_config(resolution, "VIDEO__RESOLUTION", 2, int)
self.color_space = get_config(color_space, "VIDEO__COLOR_SPACE", 13, int)
self.fps = get_config(fps, "VIDEO__FPS", 15, int)
self.stream_name = get_config(stream_name, "VIDEO__STREAM_NAME", "Pepper Video")
self.image_buffer = get_config(image_buffer, "VIDEO__IMAGE_BUFFER", 6, int)
class AudioConfig(object):
"""
Audio configuration constants.
:ivar sample_rate: Audio sampling rate in Hz, defaults to 16000.
:vartype sample_rate: int
:ivar chunk_size: Size of audio chunks to capture/process, defaults to 512.
:vartype chunk_size: int
:ivar channels: Number of audio channels, defaults to 1.
:vartype channels: int
"""
def __init__(self, sample_rate=None, chunk_size=None, channels=None):
self.sample_rate = get_config(sample_rate, "AUDIO__SAMPLE_RATE", 16000, int)
self.chunk_size = get_config(chunk_size, "AUDIO__CHUNK_SIZE", 512, int)
self.channels = get_config(channels, "AUDIO__CHANNELS", 1, int)
class MainConfig(object):
"""
Main system configuration.
:ivar poll_timeout_ms: Timeout for polling events, in milliseconds, defaults to 100.
:vartype poll_timeout_ms: int
:ivar max_handler_time_ms: Maximum allowed handler time, in milliseconds, defaults to 50.
:vartype max_handler_time_ms: int
"""
def __init__(self, poll_timeout_ms=None, max_handler_time_ms=None):
self.poll_timeout_ms = get_config(poll_timeout_ms, "MAIN__POLL_TIMEOUT_MS", 100, int)
self.max_handler_time_ms = get_config(max_handler_time_ms, "MAIN__MAX_HANDLER_TIME_MS", 50, int)
class Settings(object):
"""
Global settings container.
:ivar agent_settings: Agent-related port configuration.
:vartype agent_settings: AgentSettings
:ivar video_config: Video stream configuration.
:vartype video_config: VideoConfig
:ivar audio_config: Audio stream configuration.
:vartype audio_config: AudioConfig
:ivar main_config: Main system-level configuration.
:vartype main_config: MainConfig
"""
def __init__(self, agent_settings=None, video_config=None, audio_config=None, main_config=None):
self.agent_settings = agent_settings or AgentSettings()
self.video_config = video_config or VideoConfig()
self.audio_config = audio_config or AudioConfig()
self.main_config = main_config or MainConfig()
settings = Settings()
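A brief usage sketch for this module follows. It assumes the file is importable as `robot_interface.core.config` (the import used by the actuation receiver further down) and that `get_config`, whose implementation is not shown on this page, prefers an explicitly passed value over the environment variable and the built-in default.
```python
# Illustration of reading the configuration defined above. The module-level
# "settings" object is built once, falling back to the documented defaults unless
# environment variables such as AGENT__CONTROL_BACKEND_HOST or VIDEO__FPS are set.
from robot_interface.core.config import Settings, VideoConfig, settings

print(settings.agent_settings.control_backend_host)     # "localhost" by default
print(settings.agent_settings.actuation_receiver_port)  # 5557 by default
print(settings.video_config.fps)                        # 15 by default

# Assumed behaviour: an explicit constructor argument wins over the environment.
custom = Settings(video_config=VideoConfig(fps=30))
print(custom.video_config.fps)                          # 30
```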


@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""


@@ -0,0 +1,175 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that we can log texts with Unicode characters
import logging
from threading import Thread
import Queue
import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state
from robot_interface.core.config import settings
from robot_interface.endpoints.gesture_settings import GestureTags
class ActuationReceiver(ReceiverBase):
"""
The actuation receiver endpoint, responsible for handling speech and gesture requests.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param port: The port to use.
:type port: int
:ivar _tts_service: The text-to-speech service object from the Qi session.
:vartype _tts_service: qi.Session | None
:ivar _animation_service: The animation/gesture service object from the Qi session.
:vartype _animation_service: qi.Session | None
"""
def __init__(self, zmq_context, port=settings.agent_settings.actuation_receiver_port):
super(ActuationReceiver, self).__init__("actuation")
self.create_socket(zmq_context, zmq.SUB, port)
self.socket.setsockopt_string(zmq.SUBSCRIBE, u"")  # Subscribe to all topics; setting this via the options list caused blocking
self._tts_service = None
self._animation_service = None
self._message_queue = Queue.Queue()
self.message_thread = Thread(target=self._handle_messages)
self.message_thread.start()
def _handle_speech(self, message):
"""
Handle a speech actuation request.
:param message: The message to handle, must contain properties "endpoint" and "data".
:type message: dict
"""
text = message.get("data")
if not text:
logging.warn("Received message to speak, but it lacks data.")
return
if not isinstance(text, (str, unicode)):
logging.warn("Received message to speak but it is not a string.")
return
logging.debug("Received message to speak: {}".format(text))
if not state.qi_session: return
# If state has a qi_session, we know that we can import qi
import qi # Takes a while only the first time it's imported
if not self._tts_service:
self._tts_service = state.qi_session.service("ALTextToSpeech")
if message.get("is_priority"):
# Bypass queue and speak immediately
self.clear_queue()
self._message_queue.put(text)
logging.debug("Force speaking immediately: {}".format(text))
else:
self._message_queue.put(text)
def clear_queue(self):
"""
Safely drains all pending messages from the queue.
"""
logging.info("Message queue size: {}".format(self._message_queue.qsize()))
try:
while True:
# Remove items one by one without waiting
self._message_queue.get_nowait()
except Queue.Empty:
pass
logging.info("Message queue cleared.")
def _handle_gesture(self, message, is_single):
"""
Handle a gesture actuation request.
:param message: The gesture to do, must contain properties "endpoint" and "data".
:type message: dict
:param is_single: Whether it's a specific single gesture or a gesture tag.
:type is_single: bool
"""
gesture = message.get("data")
if not gesture:
logging.warn("Received gesture to do, but it lacks data.")
return
if not isinstance(gesture, (str, unicode)):
logging.warn("Received gesture to do but it is not a string.")
return
logging.debug("Received gesture to do: {}".format(gesture))
if is_single:
if gesture not in GestureTags.single_gestures:
logging.warn("Received single gesture to do, but it does not exist in settings")
return
else:
if gesture not in GestureTags.tags:
logging.warn("Received single tag to do, but it does not exist in settings")
return
if not state.qi_session: return
# If state has a qi_session, we know that we can import qi
import qi # Takes a while only the first time it's imported
if not self._animation_service:
self._animation_service = state.qi_session.service("ALAnimationPlayer")
# Play the gesture. Pepper comes with predefined animations like "Wave", "Greet", "Clap"
# You can also create custom animations using Choregraphe and upload them to the robot.
if is_single:
logging.debug("Playing single gesture: {}".format(gesture))
getattr(qi, "async")(self._animation_service.run, gesture)
else:
logging.debug("Playing tag gesture: {}".format(gesture))
getattr(qi, "async")(self._animation_service.runTag, gesture)
def handle_message(self, message):
"""
Handle an actuation/speech message with the receiver.
:param message: The message to handle, must contain properties "endpoint" and "data".
:type message: dict
"""
if message["endpoint"] == "actuate/speech":
self._handle_speech(message)
if message["endpoint"] == "actuate/gesture/tag":
self._handle_gesture(message, False)
if message["endpoint"] == "actuate/gesture/single":
self._handle_gesture(message, True)
def _handle_messages(self):
while not state.exit_event.is_set():
try:
text = self._message_queue.get(timeout=0.1)
state.is_speaking = True
self._tts_service.say(text)
except Queue.Empty:
state.is_speaking = False
except RuntimeError:
logging.error("Lost connection to Pepper. Please check if you're connected to the "
"local WiFi and restart this application.")
state.exit_event.set()
def endpoint_description(self):
"""
Extend the default endpoint description with gesture tags.
Returned during negotiate/ports so the CB knows available gestures.
"""
desc = super(ActuationReceiver, self).endpoint_description()
desc["gestures"] = GestureTags.tags
desc["single_gestures"] = GestureTags.single_gestures
return desc
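# --- Example (editor's sketch, not part of this file) ---
# A hypothetical control-backend-side publisher sending a speech request in the
# format `handle_message` expects ("endpoint" and "data", optionally
# "is_priority"). The robot address and port are assumptions; in practice the
# port comes from `settings.agent_settings.actuation_receiver_port`.
import time

import zmq

context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.connect("tcp://192.168.1.42:5556")        # assumed robot address and port
time.sleep(0.5)                                     # give the SUB socket time to connect
publisher.send_json({
    "endpoint": "actuate/speech",
    "data": "Hello, I am Pepper.",
    "is_priority": False,
})
# --- End example ---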

View File

@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that `logging` can use Unicode characters in names
import threading
import logging
import pyaudio
import zmq
from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state
from robot_interface.utils.microphone import choose_mic
from robot_interface.core.config import settings
logger = logging.getLogger(__name__)
class AudioSender(SocketBase):
"""
Audio sender endpoint, responsible for sending microphone audio data.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param port: The port to use.
:type port: int
:ivar thread: Thread used for sending audio.
:vartype thread: threading.Thread | None
:ivar audio: PyAudio instance.
:vartype audio: pyaudio.PyAudio | None
:ivar microphone: Selected microphone information.
:vartype microphone: dict | None
"""
def __init__(self, zmq_context, port=settings.agent_settings.audio_sender_port):
super(AudioSender, self).__init__(str("audio")) # Convert future's unicode_literal to str
self.create_socket(zmq_context, zmq.PUB, port)
self.thread = None
try:
self.audio = pyaudio.PyAudio()
self.microphone = choose_mic(self.audio)
except IOError as e:
logger.warning("PyAudio is not available.", exc_info=e)
self.audio = None
self.microphone = None
def start(self):
"""
Start sending audio in a different thread.
Will not start if no microphone is available.
"""
if not self.microphone:
logger.info("Not listening: no microphone available.")
return
logger.info("Listening with microphone \"{}\".".format(self.microphone["name"]))
self.thread = threading.Thread(target=self._stream)
self.thread.start()
def wait_until_done(self):
"""
Wait until the audio thread is done.
Will block until `state.exit_event` is set. If the thread is not running, does nothing.
"""
if not self.thread: return
self.thread.join()
self.thread = None
def _stream(self):
"""
Internal method to continuously read audio from the microphone and send it over the socket.
"""
audio_settings = settings.audio_config
chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD
# Docs say this only raises an error if neither `input` nor `output` is True
stream = self.audio.open(
format=pyaudio.paFloat32,
channels=audio_settings.channels,
rate=audio_settings.sample_rate,
input=True,
input_device_index=self.microphone["index"],
frames_per_buffer=chunk,
)
try:
while not state.exit_event.is_set():
data = stream.read(chunk)
if state.is_speaking: continue  # Do not send audio while the robot is speaking
self.socket.send(data)
except IOError as e:
logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e)
finally:
stream.stop_stream()
stream.close()
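# --- Example (editor's sketch, not part of this file) ---
# A hypothetical subscriber for the raw audio published above. Each message is
# one chunk of `chunk_size` frames of paFloat32 audio, i.e.
# chunk_size * channels * 4 bytes; little-endian floats are assumed here.
# The robot address and port are assumptions.
import struct

import zmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://192.168.1.42:5557")       # assumed robot address and port
subscriber.setsockopt_string(zmq.SUBSCRIBE, u"")    # receive everything
data = subscriber.recv()
samples = struct.unpack("<{}f".format(len(data) // 4), data)
print("Received a chunk of {} samples".format(len(samples)))
# --- End example ---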

View File

@@ -0,0 +1,419 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
class GestureTags:
tags = ["above", "affirmative", "afford", "agitated", "all", "allright", "alright", "any",
"assuage", "assuage", "attemper", "back", "bashful", "beg", "beseech", "blank",
"body language", "bored", "bow", "but", "call", "calm", "choose", "choice", "cloud",
"cogitate", "cool", "crazy", "disappointed", "down", "earth", "empty", "embarrassed",
"enthusiastic", "entire", "estimate", "except", "exalted", "excited", "explain", "far",
"field", "floor", "forlorn", "friendly", "front", "frustrated", "gentle", "gift",
"give", "ground", "happy", "hello", "her", "here", "hey", "hi", "him", "hopeless",
"hysterical", "I", "implore", "indicate", "joyful", "me", "meditate", "modest",
"negative", "nervous", "no", "not know", "nothing", "offer", "ok", "once upon a time",
"oppose", "or", "pacify", "pick", "placate", "please", "present", "proffer", "quiet",
"reason", "refute", "reject", "rousing", "sad", "select", "shamefaced", "show",
"show sky", "sky", "soothe", "sun", "supplicate", "tablet", "tall", "them", "there",
"think", "timid", "top", "unless", "up", "upstairs", "void", "warm", "winner", "yeah",
"yes", "yoo-hoo", "you", "your", "zero", "zestful"]
single_gestures = [
"animations/Stand/BodyTalk/Listening/Listening_1",
"animations/Stand/BodyTalk/Listening/Listening_2",
"animations/Stand/BodyTalk/Listening/Listening_3",
"animations/Stand/BodyTalk/Listening/Listening_4",
"animations/Stand/BodyTalk/Listening/Listening_5",
"animations/Stand/BodyTalk/Listening/Listening_6",
"animations/Stand/BodyTalk/Listening/Listening_7",
"animations/Stand/BodyTalk/Speaking/BodyTalk_1",
"animations/Stand/BodyTalk/Speaking/BodyTalk_10",
"animations/Stand/BodyTalk/Speaking/BodyTalk_11",
"animations/Stand/BodyTalk/Speaking/BodyTalk_12",
"animations/Stand/BodyTalk/Speaking/BodyTalk_13",
"animations/Stand/BodyTalk/Speaking/BodyTalk_14",
"animations/Stand/BodyTalk/Speaking/BodyTalk_15",
"animations/Stand/BodyTalk/Speaking/BodyTalk_16",
"animations/Stand/BodyTalk/Speaking/BodyTalk_2",
"animations/Stand/BodyTalk/Speaking/BodyTalk_3",
"animations/Stand/BodyTalk/Speaking/BodyTalk_4",
"animations/Stand/BodyTalk/Speaking/BodyTalk_5",
"animations/Stand/BodyTalk/Speaking/BodyTalk_6",
"animations/Stand/BodyTalk/Speaking/BodyTalk_7",
"animations/Stand/BodyTalk/Speaking/BodyTalk_8",
"animations/Stand/BodyTalk/Speaking/BodyTalk_9",
"animations/Stand/BodyTalk/Thinking/Remember_1",
"animations/Stand/BodyTalk/Thinking/Remember_2",
"animations/Stand/BodyTalk/Thinking/Remember_3",
"animations/Stand/BodyTalk/Thinking/ThinkingLoop_1",
"animations/Stand/BodyTalk/Thinking/ThinkingLoop_2",
"animations/Stand/Emotions/Negative/Angry_1",
"animations/Stand/Emotions/Negative/Angry_2",
"animations/Stand/Emotions/Negative/Angry_3",
"animations/Stand/Emotions/Negative/Angry_4",
"animations/Stand/Emotions/Negative/Anxious_1",
"animations/Stand/Emotions/Negative/Bored_1",
"animations/Stand/Emotions/Negative/Bored_2",
"animations/Stand/Emotions/Negative/Disappointed_1",
"animations/Stand/Emotions/Negative/Exhausted_1",
"animations/Stand/Emotions/Negative/Exhausted_2",
"animations/Stand/Emotions/Negative/Fear_1",
"animations/Stand/Emotions/Negative/Fear_2",
"animations/Stand/Emotions/Negative/Fearful_1",
"animations/Stand/Emotions/Negative/Frustrated_1",
"animations/Stand/Emotions/Negative/Humiliated_1",
"animations/Stand/Emotions/Negative/Hurt_1",
"animations/Stand/Emotions/Negative/Hurt_2",
"animations/Stand/Emotions/Negative/Late_1",
"animations/Stand/Emotions/Negative/Sad_1",
"animations/Stand/Emotions/Negative/Sad_2",
"animations/Stand/Emotions/Negative/Shocked_1",
"animations/Stand/Emotions/Negative/Sorry_1",
"animations/Stand/Emotions/Negative/Surprise_1",
"animations/Stand/Emotions/Negative/Surprise_2",
"animations/Stand/Emotions/Negative/Surprise_3",
"animations/Stand/Emotions/Neutral/Alienated_1",
"animations/Stand/Emotions/Neutral/AskForAttention_1",
"animations/Stand/Emotions/Neutral/AskForAttention_2",
"animations/Stand/Emotions/Neutral/AskForAttention_3",
"animations/Stand/Emotions/Neutral/Cautious_1",
"animations/Stand/Emotions/Neutral/Confused_1",
"animations/Stand/Emotions/Neutral/Determined_1",
"animations/Stand/Emotions/Neutral/Embarrassed_1",
"animations/Stand/Emotions/Neutral/Hesitation_1",
"animations/Stand/Emotions/Neutral/Innocent_1",
"animations/Stand/Emotions/Neutral/Lonely_1",
"animations/Stand/Emotions/Neutral/Mischievous_1",
"animations/Stand/Emotions/Neutral/Puzzled_1",
"animations/Stand/Emotions/Neutral/Sneeze",
"animations/Stand/Emotions/Neutral/Stubborn_1",
"animations/Stand/Emotions/Neutral/Suspicious_1",
"animations/Stand/Emotions/Positive/Amused_1",
"animations/Stand/Emotions/Positive/Confident_1",
"animations/Stand/Emotions/Positive/Ecstatic_1",
"animations/Stand/Emotions/Positive/Enthusiastic_1",
"animations/Stand/Emotions/Positive/Excited_1",
"animations/Stand/Emotions/Positive/Excited_2",
"animations/Stand/Emotions/Positive/Excited_3",
"animations/Stand/Emotions/Positive/Happy_1",
"animations/Stand/Emotions/Positive/Happy_2",
"animations/Stand/Emotions/Positive/Happy_3",
"animations/Stand/Emotions/Positive/Happy_4",
"animations/Stand/Emotions/Positive/Hungry_1",
"animations/Stand/Emotions/Positive/Hysterical_1",
"animations/Stand/Emotions/Positive/Interested_1",
"animations/Stand/Emotions/Positive/Interested_2",
"animations/Stand/Emotions/Positive/Laugh_1",
"animations/Stand/Emotions/Positive/Laugh_2",
"animations/Stand/Emotions/Positive/Laugh_3",
"animations/Stand/Emotions/Positive/Mocker_1",
"animations/Stand/Emotions/Positive/Optimistic_1",
"animations/Stand/Emotions/Positive/Peaceful_1",
"animations/Stand/Emotions/Positive/Proud_1",
"animations/Stand/Emotions/Positive/Proud_2",
"animations/Stand/Emotions/Positive/Proud_3",
"animations/Stand/Emotions/Positive/Relieved_1",
"animations/Stand/Emotions/Positive/Shy_1",
"animations/Stand/Emotions/Positive/Shy_2",
"animations/Stand/Emotions/Positive/Sure_1",
"animations/Stand/Emotions/Positive/Winner_1",
"animations/Stand/Emotions/Positive/Winner_2",
"animations/Stand/Gestures/Angry_1",
"animations/Stand/Gestures/Angry_2",
"animations/Stand/Gestures/Angry_3",
"animations/Stand/Gestures/BowShort_1",
"animations/Stand/Gestures/BowShort_2",
"animations/Stand/Gestures/BowShort_3",
"animations/Stand/Gestures/But_1",
"animations/Stand/Gestures/CalmDown_1",
"animations/Stand/Gestures/CalmDown_2",
"animations/Stand/Gestures/CalmDown_3",
"animations/Stand/Gestures/CalmDown_4",
"animations/Stand/Gestures/CalmDown_5",
"animations/Stand/Gestures/CalmDown_6",
"animations/Stand/Gestures/Choice_1",
"animations/Stand/Gestures/ComeOn_1",
"animations/Stand/Gestures/Confused_1",
"animations/Stand/Gestures/Confused_2",
"animations/Stand/Gestures/CountFive_1",
"animations/Stand/Gestures/CountFour_1",
"animations/Stand/Gestures/CountMore_1",
"animations/Stand/Gestures/CountOne_1",
"animations/Stand/Gestures/CountThree_1",
"animations/Stand/Gestures/CountTwo_1",
"animations/Stand/Gestures/Desperate_1",
"animations/Stand/Gestures/Desperate_2",
"animations/Stand/Gestures/Desperate_3",
"animations/Stand/Gestures/Desperate_4",
"animations/Stand/Gestures/Desperate_5",
"animations/Stand/Gestures/DontUnderstand_1",
"animations/Stand/Gestures/Enthusiastic_3",
"animations/Stand/Gestures/Enthusiastic_4",
"animations/Stand/Gestures/Enthusiastic_5",
"animations/Stand/Gestures/Everything_1",
"animations/Stand/Gestures/Everything_2",
"animations/Stand/Gestures/Everything_3",
"animations/Stand/Gestures/Everything_4",
"animations/Stand/Gestures/Everything_6",
"animations/Stand/Gestures/Excited_1",
"animations/Stand/Gestures/Explain_1",
"animations/Stand/Gestures/Explain_10",
"animations/Stand/Gestures/Explain_11",
"animations/Stand/Gestures/Explain_2",
"animations/Stand/Gestures/Explain_3",
"animations/Stand/Gestures/Explain_4",
"animations/Stand/Gestures/Explain_5",
"animations/Stand/Gestures/Explain_6",
"animations/Stand/Gestures/Explain_7",
"animations/Stand/Gestures/Explain_8",
"animations/Stand/Gestures/Far_1",
"animations/Stand/Gestures/Far_2",
"animations/Stand/Gestures/Far_3",
"animations/Stand/Gestures/Follow_1",
"animations/Stand/Gestures/Give_1",
"animations/Stand/Gestures/Give_2",
"animations/Stand/Gestures/Give_3",
"animations/Stand/Gestures/Give_4",
"animations/Stand/Gestures/Give_5",
"animations/Stand/Gestures/Give_6",
"animations/Stand/Gestures/Great_1",
"animations/Stand/Gestures/HeSays_1",
"animations/Stand/Gestures/HeSays_2",
"animations/Stand/Gestures/HeSays_3",
"animations/Stand/Gestures/Hey_1",
"animations/Stand/Gestures/Hey_10",
"animations/Stand/Gestures/Hey_2",
"animations/Stand/Gestures/Hey_3",
"animations/Stand/Gestures/Hey_4",
"animations/Stand/Gestures/Hey_6",
"animations/Stand/Gestures/Hey_7",
"animations/Stand/Gestures/Hey_8",
"animations/Stand/Gestures/Hey_9",
"animations/Stand/Gestures/Hide_1",
"animations/Stand/Gestures/Hot_1",
"animations/Stand/Gestures/Hot_2",
"animations/Stand/Gestures/IDontKnow_1",
"animations/Stand/Gestures/IDontKnow_2",
"animations/Stand/Gestures/IDontKnow_3",
"animations/Stand/Gestures/IDontKnow_4",
"animations/Stand/Gestures/IDontKnow_5",
"animations/Stand/Gestures/IDontKnow_6",
"animations/Stand/Gestures/Joy_1",
"animations/Stand/Gestures/Kisses_1",
"animations/Stand/Gestures/Look_1",
"animations/Stand/Gestures/Look_2",
"animations/Stand/Gestures/Maybe_1",
"animations/Stand/Gestures/Me_1",
"animations/Stand/Gestures/Me_2",
"animations/Stand/Gestures/Me_4",
"animations/Stand/Gestures/Me_7",
"animations/Stand/Gestures/Me_8",
"animations/Stand/Gestures/Mime_1",
"animations/Stand/Gestures/Mime_2",
"animations/Stand/Gestures/Next_1",
"animations/Stand/Gestures/No_1",
"animations/Stand/Gestures/No_2",
"animations/Stand/Gestures/No_3",
"animations/Stand/Gestures/No_4",
"animations/Stand/Gestures/No_5",
"animations/Stand/Gestures/No_6",
"animations/Stand/Gestures/No_7",
"animations/Stand/Gestures/No_8",
"animations/Stand/Gestures/No_9",
"animations/Stand/Gestures/Nothing_1",
"animations/Stand/Gestures/Nothing_2",
"animations/Stand/Gestures/OnTheEvening_1",
"animations/Stand/Gestures/OnTheEvening_2",
"animations/Stand/Gestures/OnTheEvening_3",
"animations/Stand/Gestures/OnTheEvening_4",
"animations/Stand/Gestures/OnTheEvening_5",
"animations/Stand/Gestures/Please_1",
"animations/Stand/Gestures/Please_2",
"animations/Stand/Gestures/Please_3",
"animations/Stand/Gestures/Reject_1",
"animations/Stand/Gestures/Reject_2",
"animations/Stand/Gestures/Reject_3",
"animations/Stand/Gestures/Reject_4",
"animations/Stand/Gestures/Reject_5",
"animations/Stand/Gestures/Reject_6",
"animations/Stand/Gestures/Salute_1",
"animations/Stand/Gestures/Salute_2",
"animations/Stand/Gestures/Salute_3",
"animations/Stand/Gestures/ShowFloor_1",
"animations/Stand/Gestures/ShowFloor_2",
"animations/Stand/Gestures/ShowFloor_3",
"animations/Stand/Gestures/ShowFloor_4",
"animations/Stand/Gestures/ShowFloor_5",
"animations/Stand/Gestures/ShowSky_1",
"animations/Stand/Gestures/ShowSky_10",
"animations/Stand/Gestures/ShowSky_11",
"animations/Stand/Gestures/ShowSky_12",
"animations/Stand/Gestures/ShowSky_2",
"animations/Stand/Gestures/ShowSky_3",
"animations/Stand/Gestures/ShowSky_4",
"animations/Stand/Gestures/ShowSky_5",
"animations/Stand/Gestures/ShowSky_6",
"animations/Stand/Gestures/ShowSky_7",
"animations/Stand/Gestures/ShowSky_8",
"animations/Stand/Gestures/ShowSky_9",
"animations/Stand/Gestures/ShowTablet_1",
"animations/Stand/Gestures/ShowTablet_2",
"animations/Stand/Gestures/ShowTablet_3",
"animations/Stand/Gestures/Shy_1",
"animations/Stand/Gestures/Stretch_1",
"animations/Stand/Gestures/Stretch_2",
"animations/Stand/Gestures/Surprised_1",
"animations/Stand/Gestures/TakePlace_1",
"animations/Stand/Gestures/TakePlace_2",
"animations/Stand/Gestures/Take_1",
"animations/Stand/Gestures/Thinking_1",
"animations/Stand/Gestures/Thinking_2",
"animations/Stand/Gestures/Thinking_3",
"animations/Stand/Gestures/Thinking_4",
"animations/Stand/Gestures/Thinking_5",
"animations/Stand/Gestures/Thinking_6",
"animations/Stand/Gestures/Thinking_7",
"animations/Stand/Gestures/Thinking_8",
"animations/Stand/Gestures/This_1",
"animations/Stand/Gestures/This_10",
"animations/Stand/Gestures/This_11",
"animations/Stand/Gestures/This_12",
"animations/Stand/Gestures/This_13",
"animations/Stand/Gestures/This_14",
"animations/Stand/Gestures/This_15",
"animations/Stand/Gestures/This_2",
"animations/Stand/Gestures/This_3",
"animations/Stand/Gestures/This_4",
"animations/Stand/Gestures/This_5",
"animations/Stand/Gestures/This_6",
"animations/Stand/Gestures/This_7",
"animations/Stand/Gestures/This_8",
"animations/Stand/Gestures/This_9",
"animations/Stand/Gestures/WhatSThis_1",
"animations/Stand/Gestures/WhatSThis_10",
"animations/Stand/Gestures/WhatSThis_11",
"animations/Stand/Gestures/WhatSThis_12",
"animations/Stand/Gestures/WhatSThis_13",
"animations/Stand/Gestures/WhatSThis_14",
"animations/Stand/Gestures/WhatSThis_15",
"animations/Stand/Gestures/WhatSThis_16",
"animations/Stand/Gestures/WhatSThis_2",
"animations/Stand/Gestures/WhatSThis_3",
"animations/Stand/Gestures/WhatSThis_4",
"animations/Stand/Gestures/WhatSThis_5",
"animations/Stand/Gestures/WhatSThis_6",
"animations/Stand/Gestures/WhatSThis_7",
"animations/Stand/Gestures/WhatSThis_8",
"animations/Stand/Gestures/WhatSThis_9",
"animations/Stand/Gestures/Whisper_1",
"animations/Stand/Gestures/Wings_1",
"animations/Stand/Gestures/Wings_2",
"animations/Stand/Gestures/Wings_3",
"animations/Stand/Gestures/Wings_4",
"animations/Stand/Gestures/Wings_5",
"animations/Stand/Gestures/Yes_1",
"animations/Stand/Gestures/Yes_2",
"animations/Stand/Gestures/Yes_3",
"animations/Stand/Gestures/YouKnowWhat_1",
"animations/Stand/Gestures/YouKnowWhat_2",
"animations/Stand/Gestures/YouKnowWhat_3",
"animations/Stand/Gestures/YouKnowWhat_4",
"animations/Stand/Gestures/YouKnowWhat_5",
"animations/Stand/Gestures/YouKnowWhat_6",
"animations/Stand/Gestures/You_1",
"animations/Stand/Gestures/You_2",
"animations/Stand/Gestures/You_3",
"animations/Stand/Gestures/You_4",
"animations/Stand/Gestures/You_5",
"animations/Stand/Gestures/Yum_1",
"animations/Stand/Reactions/EthernetOff_1",
"animations/Stand/Reactions/EthernetOn_1",
"animations/Stand/Reactions/Heat_1",
"animations/Stand/Reactions/Heat_2",
"animations/Stand/Reactions/LightShine_1",
"animations/Stand/Reactions/LightShine_2",
"animations/Stand/Reactions/LightShine_3",
"animations/Stand/Reactions/LightShine_4",
"animations/Stand/Reactions/SeeColor_1",
"animations/Stand/Reactions/SeeColor_2",
"animations/Stand/Reactions/SeeColor_3",
"animations/Stand/Reactions/SeeSomething_1",
"animations/Stand/Reactions/SeeSomething_3",
"animations/Stand/Reactions/SeeSomething_4",
"animations/Stand/Reactions/SeeSomething_5",
"animations/Stand/Reactions/SeeSomething_6",
"animations/Stand/Reactions/SeeSomething_7",
"animations/Stand/Reactions/SeeSomething_8",
"animations/Stand/Reactions/ShakeBody_1",
"animations/Stand/Reactions/ShakeBody_2",
"animations/Stand/Reactions/ShakeBody_3",
"animations/Stand/Reactions/TouchHead_1",
"animations/Stand/Reactions/TouchHead_2",
"animations/Stand/Reactions/TouchHead_3",
"animations/Stand/Reactions/TouchHead_4",
"animations/Stand/Waiting/AirGuitar_1",
"animations/Stand/Waiting/BackRubs_1",
"animations/Stand/Waiting/Bandmaster_1",
"animations/Stand/Waiting/Binoculars_1",
"animations/Stand/Waiting/BreathLoop_1",
"animations/Stand/Waiting/BreathLoop_2",
"animations/Stand/Waiting/BreathLoop_3",
"animations/Stand/Waiting/CallSomeone_1",
"animations/Stand/Waiting/Drink_1",
"animations/Stand/Waiting/DriveCar_1",
"animations/Stand/Waiting/Fitness_1",
"animations/Stand/Waiting/Fitness_2",
"animations/Stand/Waiting/Fitness_3",
"animations/Stand/Waiting/FunnyDancer_1",
"animations/Stand/Waiting/HappyBirthday_1",
"animations/Stand/Waiting/Helicopter_1",
"animations/Stand/Waiting/HideEyes_1",
"animations/Stand/Waiting/HideHands_1",
"animations/Stand/Waiting/Innocent_1",
"animations/Stand/Waiting/Knight_1",
"animations/Stand/Waiting/KnockEye_1",
"animations/Stand/Waiting/KungFu_1",
"animations/Stand/Waiting/LookHand_1",
"animations/Stand/Waiting/LookHand_2",
"animations/Stand/Waiting/LoveYou_1",
"animations/Stand/Waiting/Monster_1",
"animations/Stand/Waiting/MysticalPower_1",
"animations/Stand/Waiting/PlayHands_1",
"animations/Stand/Waiting/PlayHands_2",
"animations/Stand/Waiting/PlayHands_3",
"animations/Stand/Waiting/Relaxation_1",
"animations/Stand/Waiting/Relaxation_2",
"animations/Stand/Waiting/Relaxation_3",
"animations/Stand/Waiting/Relaxation_4",
"animations/Stand/Waiting/Rest_1",
"animations/Stand/Waiting/Robot_1",
"animations/Stand/Waiting/ScratchBack_1",
"animations/Stand/Waiting/ScratchBottom_1",
"animations/Stand/Waiting/ScratchEye_1",
"animations/Stand/Waiting/ScratchHand_1",
"animations/Stand/Waiting/ScratchHead_1",
"animations/Stand/Waiting/ScratchLeg_1",
"animations/Stand/Waiting/ScratchTorso_1",
"animations/Stand/Waiting/ShowMuscles_1",
"animations/Stand/Waiting/ShowMuscles_2",
"animations/Stand/Waiting/ShowMuscles_3",
"animations/Stand/Waiting/ShowMuscles_4",
"animations/Stand/Waiting/ShowMuscles_5",
"animations/Stand/Waiting/ShowSky_1",
"animations/Stand/Waiting/ShowSky_2",
"animations/Stand/Waiting/SpaceShuttle_1",
"animations/Stand/Waiting/Stretch_1",
"animations/Stand/Waiting/Stretch_2",
"animations/Stand/Waiting/TakePicture_1",
"animations/Stand/Waiting/Taxi_1",
"animations/Stand/Waiting/Think_1",
"animations/Stand/Waiting/Think_2",
"animations/Stand/Waiting/Think_3",
"animations/Stand/Waiting/Think_4",
"animations/Stand/Waiting/Waddle_1",
"animations/Stand/Waiting/Waddle_2",
"animations/Stand/Waiting/WakeUp_1",
"animations/Stand/Waiting/Zombie_1"]

View File

@@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state
from robot_interface.core.config import settings
class MainReceiver(ReceiverBase):
"""
The main receiver endpoint, responsible for handling ping and negotiation requests.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param port: The port to use, defaults to value in `settings.agent_settings.main_receiver_port`.
:type port: int
"""
def __init__(self, zmq_context, port=None):
if port is None:
port = settings.agent_settings.main_receiver_port
super(MainReceiver, self).__init__("main")
self.create_socket(zmq_context, zmq.REP, port, bind=False)
@staticmethod
def _handle_ping(message):
"""
Handle a ping request.
Returns the provided data in a standardized response dictionary.
:param message: The ping request message.
:type message: dict
:return: A response dictionary containing the original data.
:rtype: dict[str, str | list[dict]]
"""
return {"endpoint": "ping", "data": message.get("data")}
@staticmethod
def _handle_port_negotiation(message):
"""
Handle a port negotiation request.
Returns a list of all known endpoints and their descriptions.
:param message: The negotiation request message.
:type message: dict
:return: A response dictionary with endpoint descriptions as data.
:rtype: dict[str, list[dict]]
"""
endpoints = [socket.endpoint_description() for socket in state.sockets]
return {"endpoint": "negotiate/ports", "data": endpoints}
@staticmethod
def _handle_negotiation(message):
"""
Handle a negotiation request. Responds with ports that can be used to connect to the robot.
:param message: The negotiation request message.
:type message: dict
:return: A response dictionary with the negotiation result.
:rtype: dict[str, str | list[dict]]
"""
# In the future, the sender could send information like the robot's IP address, etc.
if message["endpoint"] == "negotiate/ports":
return MainReceiver._handle_port_negotiation(message)
return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."}
def handle_message(self, message):
"""
Main entry point for handling incoming messages.
Dispatches messages to the appropriate handler based on the endpoint.
:param message: The received message.
:type message: dict
:return: A response dictionary based on the requested endpoint.
:rtype: dict[str, str | list[dict]]
"""
if message["endpoint"] == "ping":
return self._handle_ping(message)
elif message["endpoint"].startswith("negotiate"):
return self._handle_negotiation(message)
return {"endpoint": "error", "data": "The requested endpoint is not supported."}

View File

@@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from abc import ABCMeta, abstractmethod
from robot_interface.endpoints.socket_base import SocketBase
class ReceiverBase(SocketBase, object):
"""Base class for receivers associated with a ZeroMQ socket."""
__metaclass__ = ABCMeta
@abstractmethod
def handle_message(self, message):
"""
Handle a message with the receiver.
:param message: The message to handle, must contain properties "endpoint" and "data".
:type message: dict
:return: A response message or None if this type of receiver doesn't publish.
:rtype: dict | None
"""
raise NotImplementedError()
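# --- Example (editor's sketch, not part of this file) ---
# A minimal concrete receiver, using a hypothetical "echo" endpoint and port, to
# show the contract subclasses implement: create a socket in __init__ and return
# a response dict (or None) from handle_message.
import zmq

from robot_interface.endpoints.receiver_base import ReceiverBase


class EchoReceiver(ReceiverBase):
    def __init__(self, zmq_context, port=5999):     # hypothetical port
        super(EchoReceiver, self).__init__("echo")
        self.create_socket(zmq_context, zmq.REP, port)

    def handle_message(self, message):
        return {"endpoint": "echo", "data": message.get("data")}
# --- End example ---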

View File

@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from abc import ABCMeta
import zmq
from robot_interface.core.config import settings
class SocketBase(object):
"""
Base class for endpoints associated with a ZeroMQ socket.
:ivar identifier: The identifier of the endpoint.
:vartype identifier: str
:ivar port: The port used by the socket, set by `create_socket`.
:vartype port: int | None
:ivar socket: The ZeroMQ socket object, set by `create_socket`.
:vartype socket: zmq.Socket | None
:ivar bound: Whether the socket is bound or connected, set by `create_socket`.
:vartype bound: bool | None
"""
__metaclass__ = ABCMeta
name = None
socket = None
def __init__(self, identifier):
self.identifier = identifier
self.port = None # Set later by `create_socket`
self.socket = None # Set later by `create_socket`
self.bound = None # Set later by `create_socket`
def create_socket(self, zmq_context, socket_type, port, options=[], bind=True):
"""
Create a ZeroMQ socket.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param socket_type: The type of socket to create. Use zmq constants, e.g. zmq.SUB or zmq.REP.
:type socket_type: int
:param port: The port to use.
:type port: int
:param options: A list of tuples where the first element contains the option and the second the value.
:type options: list[tuple[int, int]]
:param bind: Whether to bind the socket or connect to it.
:type bind: bool
"""
self.port = port
self.socket = zmq_context.socket(socket_type)
for option, arg in options:
self.socket.setsockopt(option, arg)
self.bound = bind
if bind:
self.socket.bind("tcp://*:{}".format(port))
else:
self.socket.connect("tcp://{}:{}".format(settings.agent_settings.control_backend_host, port))
def close(self):
"""Close the ZeroMQ socket."""
if not self.socket: return
self.socket.close()
self.socket = None
def endpoint_description(self):
"""
Description of the endpoint. Used for negotiation.
:return: A dictionary with the following keys: id, port, bind. See API specification at:
https://utrechtuniversity.youtrack.cloud/articles/N25B-A-14/RI-CB-Communication#negotiation
:rtype: dict
"""
return {
"id": self.identifier,
"port": self.port,
"bind": not self.bound
}

View File

@@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import zmq
import threading
import logging
import struct
from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state
from robot_interface.core.config import settings
class VideoSender(SocketBase):
"""
Video sender endpoint, responsible for sending video frames.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param port: The port to use for sending video frames.
:type port: int
"""
def __init__(self, zmq_context, port=settings.agent_settings.video_sender_port):
super(VideoSender, self).__init__("video")
self.create_socket(zmq_context, zmq.PUB, port, [(zmq.SNDHWM,3)])
def start_video_rcv(self):
"""
Prepares arguments for retrieving video images from Pepper and starts the video loop on a separate thread.
Will not start if no qi session is available.
"""
if not state.qi_session:
logging.info("No Qi session available. Not starting video loop.")
return
video = state.qi_session.service("ALVideoDevice")
video_settings = settings.video_config
camera_index = video_settings.camera_index
kQVGA = video_settings.resolution
kRGB = video_settings.color_space
FPS = video_settings.fps
video_name = video_settings.stream_name
vid_stream_name = video.subscribeCamera(video_name, camera_index, kQVGA, kRGB, FPS)
thread = threading.Thread(target=self.video_rcv_loop, args=(video, vid_stream_name))
thread.start()
def video_rcv_loop(self, vid_service, vid_stream_name):
"""
The main loop of retrieving video images from the robot.
:param vid_service: The video service object that the active Qi session is connected to.
:type vid_service: Object (Qi service object)
:param vid_stream_name: The name of a camera subscription on the video service object vid_service
:type vid_stream_name: str
"""
try:
while not state.exit_event.is_set():
try:
img = vid_service.getImageRemote(vid_stream_name)
if img is not None:
raw_data = img[6]
width = img[0]
height = img[1]
width_bytes = struct.pack('<I', width)
height_bytes = struct.pack('<I', height)
self.socket.send_multipart([width_bytes, height_bytes, raw_data])
except KeyboardInterrupt:
logging.info("Video receiving loop interrupted by user.")
except Exception:  # Avoid a bare except so SystemExit and GeneratorExit still propagate
logging.warn("Failed to retrieve video image from robot.")
finally:
vid_service.unsubscribe(vid_stream_name)
logging.info("Unsubscribed from video stream.")

105
src/robot_interface/main.py Normal file
View File

@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
from robot_interface.endpoints.audio_sender import AudioSender
logging.basicConfig(level=logging.DEBUG)
import zmq
from robot_interface.endpoints.actuation_receiver import ActuationReceiver
from robot_interface.endpoints.main_receiver import MainReceiver
from robot_interface.endpoints.video_sender import VideoSender
from robot_interface.state import state
from robot_interface.core.config import settings
from robot_interface.utils.timeblock import TimeBlock
def main_loop(context):
"""
Run the main loop, handling all incoming requests like pings, negotiation, actuation, etc.
:param context: The ZeroMQ context to use.
:type context: zmq.Context
"""
# When creating sockets, remember to add them to the `sockets` list of the state to ensure they're deinitialized
main_receiver = MainReceiver(context)
state.sockets.append(main_receiver)
actuation_receiver = ActuationReceiver(context)
state.sockets.append(actuation_receiver)
video_sender = VideoSender(context)
state.sockets.append(video_sender)
audio_sender = AudioSender(context)
state.sockets.append(audio_sender)
video_sender.start_video_rcv()
audio_sender.start()
# Sockets that can run on the main thread. These sockets' endpoints should not block for long (say 50 ms at most).
receivers = [main_receiver, actuation_receiver]
poller = zmq.Poller()
for receiver in receivers:
poller.register(receiver.socket, zmq.POLLIN)
logging.debug("Starting main loop.")
while True:
if state.exit_event.is_set(): break
socks = dict(poller.poll(settings.main_config.poll_timeout_ms))
for receiver in receivers:
if receiver.socket not in socks: continue
message = receiver.socket.recv_json()
if not isinstance(message, dict) or "endpoint" not in message or "data" not in message:
logging.error("Received message of unexpected format: {}".format(message))
continue
def overtime_callback(time_ms):
"""
A callback function executed by TimeBlock if the message handling
exceeds the allowed time limit.
:param time_ms: The elapsed time, in milliseconds, that the block took.
:type time_ms: float
"""
logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.",
message["endpoint"], time_ms)
with TimeBlock(overtime_callback, settings.main_config.max_handler_time_ms):
response = receiver.handle_message(message)
if receiver.socket.getsockopt(zmq.TYPE) == zmq.REP:
receiver.socket.send_json(response)
def main():
"""
Initializes the ZeroMQ context and the application state.
It executes the main event loop (`main_loop`) and ensures that both the
application state and the ZeroMQ context are properly cleaned up (deinitialized/terminated)
upon exit, including handling a KeyboardInterrupt.
"""
context = zmq.Context()
state.initialize()
try:
main_loop(context)
except KeyboardInterrupt:
logging.info("User interrupted.")
finally:
state.deinitialize()
context.term()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,107 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
import signal
import threading
from robot_interface.utils.qi_utils import get_qi_session
class State(object):
"""
Do not create an instance of this class directly: use the instance `state` below. This state must be initialized once,
typically when your program starts.
This class is used to share state between threads. For example, when the program quits, all threads can detect
this via the `exit_event` property being set.
:ivar is_initialized: Flag indicating whether the state setup (exit handlers, QI session) has completed.
:vartype is_initialized: bool
:ivar exit_event: A thread event used to signal all threads that the program is shutting down.
:vartype exit_event: threading.Event | None
:ivar sockets: A list of ZeroMQ socket wrappers (`SocketBase`) that need to be closed during deinitialization.
:vartype sockets: List[SocketBase]
:ivar qi_session: The QI session object used for interaction with the robot/platform services.
:vartype qi_session: None | qi.Session
"""
def __init__(self):
self.is_initialized = False
self.exit_event = None
self.sockets = []
self.qi_session = None
self.is_speaking = False
def initialize(self):
"""
Sets up the application state. Creates the thread exit event, registers
signal handlers (`SIGINT`, `SIGTERM`) for graceful shutdown, and
establishes the QI session.
"""
if self.is_initialized:
logging.warn("Already initialized")
return
self.exit_event = threading.Event()
def handle_exit(_, __):
logging.info("Exiting.")
self.exit_event.set()
signal.signal(signal.SIGINT, handle_exit)
signal.signal(signal.SIGTERM, handle_exit)
self.qi_session = get_qi_session()
self.is_initialized = True
def deinitialize(self):
"""
Closes all sockets stored in the `sockets` list.
"""
if not self.is_initialized: return
for socket in self.sockets:
socket.close()
self.is_initialized = False
def __getattribute__(self, name):
"""
Custom attribute access method that enforces a check: the state must be
fully initialized before any non-setup attributes (like `sockets` or `qi_session`)
can be accessed.
:param name: The name of the attribute being accessed.
:type name: str
:return: The value of the requested attribute.
:rtype: Any
"""
if name in (
"initialize",
"deinitialize",
"is_initialized",
"__dict__",
"__class__",
"__doc__"):
return object.__getattribute__(self, name)
if not object.__getattribute__(self, "is_initialized"):
# Special case for the exit_event: if the event is set, return it without an error
if name == "exit_event":
exit_event = object.__getattribute__(self, "exit_event")
if exit_event and exit_event.is_set(): return exit_event
raise RuntimeError("State must be initialized before accessing '%s'" % name)
return object.__getattribute__(self, name)
# Must call `.initialize` before use
state = State()
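# --- Example (editor's sketch, not part of this file) ---
# A small sketch of the initialization guard implemented by __getattribute__:
# most attributes raise a RuntimeError until `initialize()` has been called.
from robot_interface.state import state

try:
    _ = state.sockets
except RuntimeError as error:
    print(error)              # "State must be initialized before accessing 'sockets'"

state.initialize()            # registers exit handlers and tries to open a Qi session
print(state.sockets)          # []
state.deinitialize()
# --- End example ---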

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import os
from dotenv import load_dotenv
load_dotenv()
def get_config(value, env, default, cast=None):
"""
Small utility to get a configuration value. Returns `value` if it is not None; otherwise it reads the environment
variable `env`, casting it with `cast` if given. If the environment variable is not set, it returns `default`.
:param value: The value to check.
:type value: Any
:param env: The environment variable to check.
:type env: str
:param default: The default value to return if the environment variable is not set.
:type default: Any
:param cast: A function to use to cast the environment variable. Must support string input.
:type cast: Callable[[Any], Any], optional
:return: The value, the environment variable value, or the default.
:rtype: Any
"""
if value is not None:
return value
env = os.environ.get(env, default)
if cast is None:
return env
return cast(env)
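# --- Example (editor's sketch, not part of this file) ---
# A minimal sketch of the lookup order: explicit value, then environment
# variable (cast), then default. The variable name is only illustrative.
import os

os.environ["EXAMPLE__TIMEOUT_MS"] = "250"                  # hypothetical variable

print(get_config(100, "EXAMPLE__TIMEOUT_MS", 50, int))     # 100: explicit value wins
print(get_config(None, "EXAMPLE__TIMEOUT_MS", 50, int))    # 250: read from the environment
print(get_config(None, "EXAMPLE__MISSING", 50, int))       # 50: default
# --- End example ---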

View File

@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that `print` can print Unicode characters in names
import logging
import sys
logger = logging.getLogger(__name__)
def get_microphones(audio):
"""
Get audio devices which have input channels.
:param audio: An instance of PyAudio to use.
:type audio: pyaudio.PyAudio
:return: An iterator of PyAudio device-info dicts describing the available microphone devices.
:rtype: Iterator[dict]
"""
for i in range(audio.get_device_count()):
device = audio.get_device_info_by_index(i)
if device["maxInputChannels"] > 0:
yield device
def choose_mic_interactive(audio):
"""
Choose a microphone to use, interactively in the CLI.
:param audio: An instance of PyAudio to use.
:type audio: pyaudio.PyAudio
:return: A dictionary from PyAudio containing information about the microphone to use, or None
if there is no microphone.
:rtype: dict | None
"""
microphones = list(get_microphones(audio))
if len(microphones) == 0: return None
print("Found {} microphones:".format(len(microphones)))
for i, mic in enumerate(microphones):
print("- {}: {}".format(i, mic["name"]))
chosen_microphone = None
while chosen_microphone is None:
chosen = raw_input("Which device would you like to use?\n> ")
try:
chosen = int(chosen)
if chosen < 0 or chosen >= len(microphones): raise ValueError()
chosen_microphone = microphones[chosen]
except ValueError:
print("Please enter a number between 0 and {}".format(len(microphones)-1))
logger.info("Chose microphone \"{}\"".format(chosen_microphone["name"]))
return chosen_microphone
def choose_mic_default(audio):
"""
Get the system's default microphone to use.
:param audio: An instance of PyAudio to use.
:type audio: pyaudio.PyAudio
:return: A dictionary from PyAudio containing information about the microphone to use, or None
if there is no microphone.
:rtype: dict | None
"""
try:
return audio.get_default_input_device_info()
except IOError:
return None
def choose_mic_arguments(audio):
"""
Get a microphone to use from command line arguments.
:param audio: An instance of PyAudio to use.
:type audio: pyaudio.PyAudio
:return: A dictionary from PyAudio containing information about the microphone to use, or None
if there is no microphone satisfied by the arguments.
:rtype: dict | None
"""
microphone_name = None
for i, arg in enumerate(sys.argv):
if arg == "--microphone" and len(sys.argv) > i+1:
microphone_name = sys.argv[i+1].strip()
if arg.startswith("--microphone="):
pre_fix_len = len("--microphone=")
microphone_name = arg[pre_fix_len:].strip()
if not microphone_name: return None
available_mics = list(get_microphones(audio))
for mic in available_mics:
if mic["name"] == microphone_name:
return mic
available_mic_names = [mic["name"] for mic in available_mics]
logger.warning("Microphone \"{}\" not found. Choose one of {}"
.format(microphone_name, available_mic_names))
return None
def choose_mic(audio):
"""
Get a microphone to use. First checks whether a command-line argument specifies the microphone to use;
if not, falls back to the system's default microphone.
:param audio: An instance of PyAudio to use.
:type audio: pyaudio.PyAudio
:return: A dictionary from PyAudio containing information about the microphone to use, or None
if there is no microphone.
:rtype: dict | None
"""
chosen_mic = choose_mic_arguments(audio)
if chosen_mic: return chosen_mic
return choose_mic_default(audio)
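# --- Example (editor's sketch, not part of this file) ---
# A minimal usage sketch: pick a microphone (command-line argument first,
# otherwise the system default) and report it.
import pyaudio

from robot_interface.utils.microphone import choose_mic

audio = pyaudio.PyAudio()
microphone = choose_mic(audio)
if microphone is None:
    print("No microphone available.")
else:
    print("Using microphone \"{}\" (index {}).".format(microphone["name"], microphone["index"]))
audio.terminate()
# --- End example ---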

View File

@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging
import sys
try:
import qi
except ImportError:
qi = None
def get_qi_session():
"""
Create and return a Qi session if available.
:return: The active Qi session or ``None`` if unavailable.
:rtype: qi.Session | None
"""
if qi is None:
logging.info("Unable to import qi. Running in stand-alone mode.")
return None
if "--qi-url" not in sys.argv:
logging.info("No Qi URL argument given. Running in stand-alone mode.")
return None
try:
app = qi.Application()
app.start()
return app.session
except RuntimeError:
logging.info("Unable to connect to the robot. Running in stand-alone mode.")
return None
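# --- Example (editor's sketch, not part of this file) ---
# A small usage sketch. The robot address is an assumption; qi.Application()
# parses sys.argv itself, which is presumably why get_qi_session only checks
# for the flag's presence. An invocation might look like:
#   python main.py --qi-url tcp://192.168.1.42:9559
from robot_interface.utils.qi_utils import get_qi_session

session = get_qi_session()
if session is None:
    print("Running in stand-alone mode.")
else:
    session.service("ALTextToSpeech").say("Connected to Pepper.")
# --- End example ---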

View File

@@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import time
class TimeBlock(object):
"""
A context manager that times the execution of the block it contains. If execution exceeds the
limit, or if no limit is given, the callback will be called with the time that the block took.
:param callback: The callback function, called when the block finishes if it exceeded the time limit
(or if no limit was given).
:type callback: Callable[[float], None]
:param limit_ms: The number of milliseconds the block of code is allowed to take. If it
exceeds this time, or if it's None, the callback function will be called with the time the
block took.
:type limit_ms: int | None
:ivar limit_ms: The number of milliseconds the block of code is allowed to take.
:vartype limit_ms: float | None
:ivar callback: The callback function that is called when the block of code is over.
:vartype callback: Callable[[float], None]
:ivar start: The start time of the block, set when entering the context.
:vartype start: float | None
"""
def __init__(self, callback, limit_ms=None):
self.limit_ms = float(limit_ms) if limit_ms is not None else None
self.callback = callback
self.start = None
def __enter__(self):
"""
Enter the context manager and record the start time.
:return: Returns itself so timing information can be accessed if needed.
:rtype: TimeBlock
"""
self.start = time.time()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""
Exit the context manager, calculate the elapsed time, and call the callback
if the time limit was exceeded or not provided.
:param exc_type: The exception type, or None if no exception occurred.
:type exc_type: Type[BaseException] | None
:param exc_value: The exception instance, or None if no exception occurred.
:type exc_value: BaseException | None
:param traceback: The traceback object, or None if no exception occurred.
:type traceback: TracebackType | None
"""
elapsed = (time.time() - self.start) * 1000.0 # ms
if self.limit_ms is None or elapsed > self.limit_ms:
self.callback(elapsed)
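# --- Example (editor's sketch, not part of this file) ---
# A minimal usage sketch of the context manager, mirroring how the main loop
# wraps message handlers to warn when they run too long.
import time

from robot_interface.utils.timeblock import TimeBlock


def on_overtime(elapsed_ms):
    print("Block took {:.2f} ms, which exceeds the limit.".format(elapsed_ms))


with TimeBlock(on_overtime, limit_ms=50):
    time.sleep(0.1)            # ~100 ms, so the callback fires
# --- End example ---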

6
test/common/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -0,0 +1,197 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that we can format strings with Unicode characters
import random
import sys
from StringIO import StringIO
from robot_interface.utils.microphone import (
choose_mic_default,
choose_mic_interactive,
choose_mic_arguments,
choose_mic,
get_microphones,
)
class MicrophoneUtils(object):
"""Shared tests for any PyAudio-like implementation, e.g. mock and real."""
def test_choose_mic_default(self, pyaudio_instance):
"""
Tests that the default microphone selection function returns a valid
microphone dictionary containing all necessary keys with correct types and values.
The result must contain at least "index", as this is used to identify the microphone,
and "name" for logging. It must have one or more channels (`maxInputChannels`),
and a default sample rate of at least 16000 Hz.
"""
result = choose_mic_default(pyaudio_instance)
assert "index" in result
assert isinstance(result["index"], (int, long))
assert "name" in result
assert isinstance(result["name"], (str, unicode))
assert "maxInputChannels" in result
assert isinstance(result["maxInputChannels"], (int, long))
assert result["maxInputChannels"] > 0
assert "defaultSampleRate" in result
assert isinstance(result["defaultSampleRate"], float)
assert result["defaultSampleRate"] >= 16000
def test_choose_mic_interactive_input_not_int(self, pyaudio_instance, mocker):
"""
Tests the robustness of the interactive selection when the user first enters
a non-integer value, ensuring the system prompts again without error and accepts
a valid integer on the second attempt.
"""
microphones = get_microphones(pyaudio_instance)
target_microphone = next(microphones)
mock_input = mocker.patch("__builtin__.raw_input", side_effect=["not an integer", "0"])
fake_out = StringIO()
mocker.patch.object(sys, "stdout", fake_out)
result = choose_mic_interactive(pyaudio_instance)
assert "index" in result
assert isinstance(result["index"], (int, long))
assert result["index"] == target_microphone["index"]
assert mock_input.called
assert any(p.startswith("Please enter a number") for p in fake_out.getvalue().splitlines())
def test_choose_mic_interactive_negative_index(self, pyaudio_instance, mocker):
"""
Tests that the interactive selection method prevents the user from entering
a negative integer as a microphone index.
"""
microphones = get_microphones(pyaudio_instance)
target_microphone = next(microphones)
mock_input = mocker.patch("__builtin__.raw_input", side_effect=["-1", "0"])
fake_out = StringIO()
mocker.patch.object(sys, "stdout", fake_out)
result = choose_mic_interactive(pyaudio_instance)
assert "index" in result
assert isinstance(result["index"], (int, long))
assert result["index"] == target_microphone["index"]
assert mock_input.called
assert any(p.startswith("Please enter a number") for p in fake_out.getvalue().splitlines())
def test_choose_mic_interactive_index_too_high(self, pyaudio_instance, mocker):
"""
Tests that the interactive selection method prevents the user from entering
an index that exceeds the total number of available microphones.
"""
real_count = len(list(get_microphones(pyaudio_instance)))
mock_input = mocker.patch("__builtin__.raw_input", side_effect=[str(real_count), "0"])
fake_out = StringIO()
mocker.patch.object(sys, "stdout", fake_out)
result = choose_mic_interactive(pyaudio_instance)
assert "index" in result
assert isinstance(result["index"], (int, long))
assert mock_input.called
assert any(p.startswith("Please enter a number") for p in fake_out.getvalue().splitlines())
def test_choose_mic_interactive_random_index(self, pyaudio_instance, mocker):
"""
Tests the core interactive functionality by simulating the selection of a
random valid microphone index and verifying that the correct microphone
information is returned.
"""
microphones = list(get_microphones(pyaudio_instance))
random_index = random.randrange(len(microphones))
mocker.patch("__builtin__.raw_input", side_effect=[str(random_index)])
result = choose_mic_interactive(pyaudio_instance)
assert "index" in result
assert isinstance(result["index"], (int, long))
assert result["index"] == microphones[random_index]["index"]
def test_choose_mic_no_arguments(self, pyaudio_instance, mocker):
"""
Tests `choose_mic_arguments` when no command-line arguments are provided, expecting the function to return None.
"""
mocker.patch.object(sys, "argv", [])
result = choose_mic_arguments(pyaudio_instance)
assert result is None
def test_choose_mic_arguments(self, pyaudio_instance, mocker):
"""
Tests `choose_mic_arguments` when the microphone name is passed as a separate
argument.
"""
for mic in get_microphones(pyaudio_instance):
mocker.patch.object(sys, "argv", ["--microphone", mic["name"]])
result = choose_mic_arguments(pyaudio_instance)
assert result is not None
assert result == mic
def test_choose_mic_arguments_eq(self, pyaudio_instance, mocker):
"""
Tests `choose_mic_arguments` when the microphone name is passed using an
equals sign (`--microphone=NAME`).
"""
for mic in get_microphones(pyaudio_instance):
mocker.patch.object(sys, "argv", ["--microphone={}".format(mic["name"])])
result = choose_mic_arguments(pyaudio_instance)
assert result is not None
assert result == mic
def test_choose_mic_arguments_not_exist(self, pyaudio_instance, mocker):
"""
Tests `choose_mic_arguments` when a non-existent microphone name is passed
via command-line arguments, expecting the function to return None.
"""
mocker.patch.object(sys, "argv", ["--microphone", "Surely this microphone doesn't exist"])
result = choose_mic_arguments(pyaudio_instance)
assert result is None
def test_choose_mic_with_argument(self, pyaudio_instance, mocker):
"""
Tests `choose_mic` function when a valid microphone is
specified via command-line arguments.
"""
mic = next(get_microphones(pyaudio_instance))
mocker.patch.object(sys, "argv", ["--microphone", mic["name"]])
result = choose_mic(pyaudio_instance)
assert result is not None
assert result == mic
def test_choose_mic_no_argument(self, pyaudio_instance, mocker):
"""
Tests `choose_mic` function when no command-line arguments
are provided, verifying that the function falls back correctly to the
system's default microphone selection.
"""
default_mic = choose_mic_default(pyaudio_instance)
mocker.patch.object(sys, "argv", [])
result = choose_mic(pyaudio_instance)
assert result is not None
assert result == default_mic

17
test/conftest.py Normal file
View File

@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from mock import patch, MagicMock
import pytest
@pytest.fixture(autouse=True)
def mock_zmq_context():
with patch("zmq.Context") as mock:
mock.instance.return_value = MagicMock()
yield mock

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from mock import patch, mock
from robot_interface.core.config import Settings
from robot_interface.endpoints.main_receiver import MainReceiver
def test_environment_variables(monkeypatch):
"""
When environment variables are set, creating settings should use these.
"""
monkeypatch.setenv("AGENT__CONTROL_BACKEND_HOST", "some_value_that_should_be_different")
settings = Settings()
assert settings.agent_settings.control_backend_host == "some_value_that_should_be_different"
@patch("robot_interface.endpoints.main_receiver.settings")
@patch("robot_interface.endpoints.socket_base.settings")
def test_create_endpoint_custom_host(base_settings, main_settings):
"""
When a custom host is given in the settings, check that an endpoint's socket connects to it.
"""
fake_context = mock.Mock()
fake_socket = mock.Mock()
fake_context.socket.return_value = fake_socket
base_settings.agent_settings.control_backend_host = "not_localhost"
main_settings.agent_settings.main_receiver_port = 9999
_ = MainReceiver(fake_context)
fake_socket.connect.assert_called_once_with("tcp://not_localhost:9999")
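A hypothetical sketch of how such a nested setting could resolve its value from the environment; the real robot_interface.core.config.Settings is not shown in this diff, and the class layout and the "localhost" default below are assumptions.

    import os

    class AgentSettings(object):            # name assumed
        def __init__(self):
            # "AGENT__" prefix with a double-underscore separator, as in the test above.
            self.control_backend_host = os.environ.get(
                "AGENT__CONTROL_BACKEND_HOST", "localhost")  # default is assumed

    class Settings(object):
        def __init__(self):
            self.agent_settings = AgentSettings()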

View File

@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pyaudio
import pytest
from common.microphone_utils import MicrophoneUtils
@pytest.fixture
def pyaudio_instance():
"""
A pytest fixture that provides an initialized PyAudio instance for tests
requiring microphone access.
It first initializes PyAudio. If a default input device (microphone) is not
found, the test is skipped to avoid failures in environments
without a mic.
:return: An initialized PyAudio instance.
:rtype: pyaudio.PyAudio
"""
audio = pyaudio.PyAudio()
try:
audio.get_default_input_device_info()
return audio
except IOError:
pytest.skip("No microphone available to test with.")
class TestAudioIntegration(MicrophoneUtils):
"""Run shared audio behavior tests with the mock implementation."""
pass
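The conftest pairs a hardware-backed `pyaudio_instance` fixture with the shared `MicrophoneUtils` test mixin, so the same behaviour tests run against real hardware here and against the mock in the unit suite. A purely hypothetical illustration of that pattern (the class and test names below are invented, not the project's):

    class SharedChecks(object):
        # Shared behaviour tests; subclasses only swap the pyaudio_instance fixture.
        def test_default_device_accepts_input(self, pyaudio_instance):
            info = pyaudio_instance.get_default_input_device_info()
            assert info["maxInputChannels"] > 0

    class TestWithRealHardware(SharedChecks):
        """Runs the shared checks against the real PyAudio fixture above."""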

6
test/unit/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -0,0 +1,493 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import sys
import mock
import pytest
import zmq
from robot_interface.endpoints.actuation_receiver import ActuationReceiver
from robot_interface.endpoints.gesture_settings import GestureTags
@pytest.fixture
def zmq_context():
"""
A pytest fixture that creates and yields a ZMQ context.
:return: An initialized ZeroMQ context.
:rtype: zmq.Context
"""
context = zmq.Context()
yield context
def test_force_speech_clears_queue(mocker):
"""
Tests that a force speech message clears the existing queue
and places the high-priority message at the front.
"""
mocker.patch("threading.Thread")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tts_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._message_queue.put("old_message_1")
receiver._message_queue.put("old_message_2")
assert receiver._message_queue.qsize() == 2
force_msg = {
"endpoint": "actuate/speech",
"data": "Emergency Notification",
"is_priority": True,
}
receiver.handle_message(force_msg)
assert receiver._message_queue.qsize() == 1
queued_item = receiver._message_queue.get()
assert queued_item == "Emergency Notification"
def test_handle_unimplemented_endpoint(mocker):
"""
Tests handling of unknown endpoints.
"""
mocker.patch("threading.Thread")
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver.handle_message({
"endpoint": "some_endpoint_that_definitely_does_not_exist",
"data": None,
})
def test_speech_message_no_data(mocker):
"""
Tests that if the message data is empty, the receiver returns immediately
WITHOUT attempting to access the global robot state or session.
"""
# 1. Prevent background threads from running
mocker.patch("threading.Thread")
# 2. Mock the global state object
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# 3. Create a PropertyMock to track whenever 'qi_session' is accessed
# We attach it to the class type of the mock so it acts like a real property
mock_session_prop = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_session_prop
# 4. Initialize Receiver (Mocking the context to avoid ZMQ errors)
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# 5. Send empty data
receiver.handle_message({"endpoint": "actuate/speech", "data": ""})
# 6. Assertion:
# Because the code does `if not text: return` BEFORE `if not state.qi_session`,
# the state property should NEVER be read.
mock_session_prop.assert_not_called()
def test_speech_message_invalid_data(mocker):
"""
Tests that if the message data is not a string, the function returns.
:param mocker: The pytest-mock fixture used to patch dependencies.
"""
mocker.patch("threading.Thread")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_session_prop = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_session_prop
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver.handle_message({"endpoint": "actuate/speech", "data": True})
# Because the code does `if not text: return` BEFORE `if not state.qi_session`,
# the state property should NEVER be read.
mock_session_prop.assert_not_called()
def test_speech_no_qi(mocker):
"""
Tests the actuation receiver's behavior when processing a speech request
but the global state does not have an active QI session.
"""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi_session = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_qi_session
mock_tts_service = mock.Mock()
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._tts_service = mock_tts_service
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
receiver._tts_service.assert_not_called()
def test_speech(mocker):
"""
Tests the core speech actuation functionality by mocking the QI TextToSpeech
service and verifying that the received message is put into the queue.
"""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tts_service = mock.Mock()
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._tts_service = None
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
assert receiver._message_queue.qsize() == 1
queued_item = receiver._message_queue.get()
assert queued_item == "Some message to speak."
def test_speech_priority(mocker):
"""
Tests that a priority speech message is handled correctly by clearing the queue
and placing the priority message at the front.
"""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tts_service = mock.Mock()
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._message_queue.put("old_message_1")
receiver._message_queue.put("old_message_2")
assert receiver._message_queue.qsize() == 2
priority_msg = {
"endpoint": "actuate/speech",
"data": "Urgent Message",
"is_priority": True,
}
receiver._handle_speech(priority_msg)
assert receiver._message_queue.qsize() == 1
queued_item = receiver._message_queue.get()
assert queued_item == "Urgent Message"
def test_handle_messages_loop(mocker):
"""
Tests the background consumer loop (_handle_messages) processing an item.
Runs SYNCHRONOUSLY to ensure coverage tools pick up the lines.
"""
# Patch Thread so the real background thread NEVER starts automatically
mocker.patch("threading.Thread")
# Mock state
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Setup initial speaking state to False (covers "Started speaking" print)
mock_state.is_speaking = False
# Mock the TextToSpeech service
mock_tts_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
# Initialize receiver (Thread is patched, so no thread starts)
# Use Mock Context to avoid ZMQ errors
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Manually inject service (since lazy loading might handle it, but this is safer)
receiver._tts_service = mock_tts_service
# This ensures the while loop iterates exactly once
mock_state.exit_event.is_set.side_effect = [False, True]
# Put an item in the queue
receiver._message_queue.put("Hello World")
# RUN MANUALLY in the main thread
# This executes the code: while -> try -> get -> if print -> speaking=True -> say
receiver._handle_messages()
# Assertions
assert receiver._message_queue.empty()
mock_tts_service.say.assert_called_with("Hello World")
assert mock_state.is_speaking is True
def test_handle_messages_queue_empty(mocker):
"""
Tests the Queue.Empty exception handler in the consumer loop.
This covers the logic that resets 'state.is_speaking' to False.
"""
# Prevent the real background thread from starting
mocker.patch("threading.Thread")
# Mock the state object
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Pre-set 'is_speaking' to True on the mock's class so the consumer enters the
# 'if state.is_speaking:' block and then resets the instance attribute to False.
type(mock_state).is_speaking = True
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# This ensures the while loop body runs exactly once for our test
mock_state.exit_event.is_set.side_effect = [False, True]
# The queue is empty, so get() with a timeout raises Queue.Empty on its own;
# no patching of the queue is needed to reach the exception handler.
# Run the loop logic manually (synchronously)
receiver._handle_messages()
# Final assertion: the handler saw is_speaking as True, reported that speaking
# stopped, and set it back to False.
assert mock_state.is_speaking is False
def test_handle_messages_runtime_error(mocker):
"""
Tests the RuntimeError exception handler (e.g. lost WiFi connection).
Uses a Mock ZMQ context to avoid 'Address already in use' errors.
"""
# Patch Thread so we don't accidentally spawn real threads
mocker.patch("threading.Thread")
# Mock the state and logging
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Use a MOCK ZMQ context.
# This prevents the receiver from trying to bind to a real TCP port.
mock_zmq_ctx = mock.Mock()
# Initialize receiver with the mock context
receiver = ActuationReceiver(mock_zmq_ctx)
mock_state.exit_event.is_set.side_effect = [False, True]
receiver._message_queue.put("Test Message")
# Setup: ...BUT the service raises RuntimeError when asked to speak
mock_tts = mock.Mock()
mock_tts.say.side_effect = RuntimeError("Connection lost")
receiver._tts_service = mock_tts
# Run the loop logic manually
receiver._handle_messages()
# Assertions
assert mock_state.exit_event.is_set.called
def test_clear_queue(mocker):
"""
Tests that the clear_queue method properly drains all items from the message queue.
"""
mocker.patch("threading.Thread")
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Populate the queue with multiple items
receiver._message_queue.put("msg1")
receiver._message_queue.put("msg2")
receiver._message_queue.put("msg3")
assert receiver._message_queue.qsize() == 3
# Clear the queue
receiver.clear_queue()
# Assert the queue is empty
assert receiver._message_queue.qsize() == 0
def test_gesture_no_data(zmq_context, mocker):
"""Empty gesture data should be ignored without crashing."""
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": ""}, True)
# Just ensuring no crash
def test_gesture_invalid_data(zmq_context, mocker):
"""Non-string gesture data should be ignored without crashing."""
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": 123}, True)
# No crash expected
def test_gesture_single_not_found(zmq_context, mocker):
"""An unknown single gesture should be ignored without crashing."""
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["wave", "bow"] # allowed single gestures
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "unknown_gesture"}, True)
# No crash expected
def test_gesture_tag_not_found(zmq_context, mocker):
"""An unknown gesture tag should be ignored without crashing."""
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.tags = ["happy", "sad"]
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "not_a_tag"}, False)
# No crash expected
def test_gesture_no_qi_session(zmq_context, mocker):
"""Without an active qi session the gesture handler should return early."""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.qi_session = None
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["hello"]
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "hello"}, True)
# No crash, path returns early
def test_gesture_single_success(zmq_context, mocker):
"""A known single gesture should be run asynchronously via ALAnimationPlayer."""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
# Setup gesture settings
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["wave"]
mock_animation_service = mock.Mock()
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "wave"}, True)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run
assert getattr(mock_qi, "async").call_args[0][1] == "wave"
def test_gesture_tag_success(zmq_context, mocker):
"""A known gesture tag should be run asynchronously via ALAnimationPlayer."""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.tags = ["greeting"]
mock_animation_service = mock.Mock()
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "greeting"}, False)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.runTag
assert getattr(mock_qi, "async").call_args[0][1] == "greeting"
def test_handle_message_all_routes(zmq_context, mocker):
"""
Ensures all handle_message endpoint branches route correctly.
"""
receiver = ActuationReceiver(zmq_context)
mock_speech = mocker.patch.object(receiver, "_handle_speech")
mock_gesture = mocker.patch.object(receiver, "_handle_gesture")
receiver.handle_message({"endpoint": "actuate/speech", "data": "hi"})
receiver.handle_message({"endpoint": "actuate/gesture/tag", "data": "greeting"})
receiver.handle_message({"endpoint": "actuate/gesture/single", "data": "wave"})
mock_speech.assert_called_once()
assert mock_gesture.call_count == 2
def test_endpoint_description(zmq_context, mocker):
"""endpoint_description should expose the available gesture tags and single gestures."""
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.tags = ["happy"]
mock_tags.single_gestures = ["wave"]
receiver = ActuationReceiver(zmq_context)
desc = receiver.endpoint_description()
assert "gestures" in desc
assert desc["gestures"] == ["happy"]
assert "single_gestures" in desc
assert desc["single_gestures"] == ["wave"]
def test_gesture_single_real_gesturetags(zmq_context, mocker):
"""
Uses the real GestureTags (no mocking) to ensure the receiver
references GestureTags.single_gestures correctly.
"""
# Ensure qi session exists so we pass the early return
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.qi_session = mock.Mock()
# Mock qi.async to avoid real async calls
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
# Mock animation service
mock_animation_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context)
# Pick a real gesture from GestureTags.single_gestures
assert len(GestureTags.single_gestures) > 0, "GestureTags.single_gestures must not be empty"
gesture = GestureTags.single_gestures[0]
receiver._handle_gesture(
{"endpoint": "actuate/gesture/single", "data": gesture},
is_single=True,
)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run
assert getattr(mock_qi, "async").call_args[0][1] == gesture
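Taken together, the speech tests above constrain `_handle_speech` fairly tightly: invalid or empty data returns before the global state is touched, a missing qi session prevents any TTS call, and a priority message flushes the queue first. A minimal method sketch consistent with those assertions; the service name and the exact string-type check are assumptions, not taken from the source.

    def _handle_speech(self, message):
        # Method sketch for ActuationReceiver; "state" is the global robot state.
        text = message.get("data")
        # Validate the payload before touching the global robot state.
        if not text or not isinstance(text, str):
            return
        if not state.qi_session:
            return
        # Lazily create the TextToSpeech proxy on first use ("ALTextToSpeech" is assumed).
        if self._tts_service is None:
            self._tts_service = state.qi_session.service("ALTextToSpeech")
        # A priority message flushes everything queued before it.
        if message.get("is_priority", False):
            self.clear_queue()
        self._message_queue.put(text)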

View File

@@ -0,0 +1,184 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import os
import mock
import pytest
import zmq
from robot_interface.endpoints.audio_sender import AudioSender
@pytest.fixture
def zmq_context():
"""
A pytest fixture that creates and yields a ZMQ context.
:return: An initialized ZeroMQ context.
:rtype: zmq.Context
"""
context = zmq.Context()
yield context
def test_no_microphone(zmq_context, mocker):
"""
Tests the scenario where no valid microphone can be chosen for recording.
"""
mock_info_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger.info")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = None
sender = AudioSender(zmq_context)
assert sender.microphone is None
sender.start()
assert sender.thread is None
mock_info_logger.assert_called()
sender.wait_until_done() # Should return early because we didn't start a thread
def test_unicode_mic_name(zmq_context, mocker):
"""
Tests the robustness of the `AudioSender` when handling microphone names
that contain Unicode characters.
"""
mocker.patch("robot_interface.endpoints.audio_sender.threading")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"• Some Unicode name"}
sender = AudioSender(zmq_context)
assert sender.microphone is not None
# `.start()` logs the name of the microphone. It should not give an error if it contains Unicode
# symbols.
sender.start()
assert sender.thread is not None
sender.wait_until_done() # Should return instantly because we didn't start a real thread
def _fake_read(num_frames):
"""
Helper function to simulate reading raw audio data from a microphone stream.
"""
return os.urandom(num_frames * 4)
def test_sending_audio(mocker):
"""
Tests the successful sending of audio data over a ZeroMQ socket.
"""
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
send_socket = mock.Mock()
mock_state.is_speaking = False
# If there's something wrong with the microphone, it will raise an IOError when `read`ing.
stream = mock.Mock()
stream.read = _fake_read
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.audio.open = mock.Mock()
sender.audio.open.return_value = stream
sender.start()
sender.wait_until_done()
send_socket.assert_called()
def test_no_sending_if_speaking(mocker):
"""
Tests that no audio data is sent over the ZeroMQ socket while the robot is speaking.
"""
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
send_socket = mock.Mock()
mock_state.is_speaking = True
# If there's something wrong with the microphone, it will raise an IOError when `read`ing.
stream = mock.Mock()
stream.read = _fake_read
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.audio.open = mock.Mock()
sender.audio.open.return_value = stream
sender.start()
sender.wait_until_done()
send_socket.assert_not_called()
def _fake_read_error(num_frames):
"""
Helper function to simulate an I/O error during microphone stream reading.
"""
raise IOError()
def test_break_microphone(mocker):
"""
Tests the error handling when the microphone stream breaks (raises an IOError).
"""
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
send_socket = mock.Mock()
# If there's something wrong with the microphone, it will raise an IOError when `read`ing.
stream = mock.Mock()
stream.read = _fake_read_error
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.audio.open = mock.Mock()
sender.audio.open.return_value = stream
sender.start()
sender.wait_until_done()
send_socket.assert_not_called()
def test_pyaudio_init_failure(mocker, zmq_context):
"""
Tests the behavior when PyAudio initialization fails (raises an IOError).
"""
# Prevent binding the ZMQ socket
mocker.patch("robot_interface.endpoints.audio_sender.AudioSender.create_socket")
# Simulate PyAudio() failing
mocker.patch(
"robot_interface.endpoints.audio_sender.pyaudio.PyAudio",
side_effect=IOError("boom")
)
sender = AudioSender(zmq_context)
assert sender.audio is None
assert sender.microphone is None
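A minimal sketch of the capture loop these tests drive, assuming 16-bit stereo at 44.1 kHz and a hypothetical CHUNK size; the real AudioSender's stream parameters and method name are not shown in this diff.

    import pyaudio

    CHUNK = 1024  # frames per read; hypothetical value

    def _record_loop(self):
        # Method sketch for AudioSender; "state" is the global robot state.
        stream = self.audio.open(
            format=pyaudio.paInt16,              # 2 bytes per sample
            channels=2,                          # stereo -> 4 bytes per frame
            rate=44100,
            input=True,
            input_device_index=self.microphone["index"],
            frames_per_buffer=CHUNK,
        )
        while not state.exit_event.is_set():
            try:
                data = stream.read(CHUNK)
            except IOError:
                # Broken microphone stream: stop recording.
                break
            # Don't forward the robot's own speech back to the backend.
            if state.is_speaking:
                continue
            self.socket.send(data)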

View File

@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from robot_interface.utils.get_config import get_config
def test_get_config_prefers_explicit_value(monkeypatch):
"""
When a direct value is provided it should be returned without reading the environment.
"""
monkeypatch.setenv("GET_CONFIG_TEST", "from-env")
result = get_config("explicit", "GET_CONFIG_TEST", "default")
assert result == "explicit"
def test_get_config_returns_env_value(monkeypatch):
"""
If value is None the environment variable should be used.
"""
monkeypatch.setenv("GET_CONFIG_TEST", "from-env")
result = get_config(None, "GET_CONFIG_TEST", "default")
assert result == "from-env"
def test_get_config_casts_env_value(monkeypatch):
"""
The env value should be cast when a cast function is provided.
"""
monkeypatch.setenv("GET_CONFIG_PORT", "1234")
result = get_config(None, "GET_CONFIG_PORT", 0, int)
assert result == 1234
def test_get_config_casts_default_when_env_missing(monkeypatch):
"""
When the env var is missing it should fall back to the default and still apply the cast.
"""
monkeypatch.delenv("GET_CONFIG_MISSING", raising=False)
result = get_config(None, "GET_CONFIG_MISSING", "42", int)
assert result == 42
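These four tests fully describe the helper's precedence (explicit value, then environment, then default) and its optional cast. A minimal sketch consistent with them; the real robot_interface.utils.get_config may differ in details such as the parameter names.

    import os

    def get_config(value, env_name, default, cast=None):
        # An explicitly provided value always wins and is returned as-is.
        if value is not None:
            return value
        # Otherwise read the environment, falling back to the default.
        result = os.environ.get(env_name, default)
        # The cast applies to the env value and to the default alike.
        if cast is not None:
            return cast(result)
        return result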

229
test/unit/test_main.py Normal file
View File

@@ -0,0 +1,229 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pytest
import threading
import zmq
import robot_interface.main as main_mod
from robot_interface.state import state
class FakeSocket:
"""Mock ZMQ socket for testing."""
def __init__(self, socket_type, messages=None):
self.socket_type = socket_type
self.messages = messages or []
self.sent = []
self.closed = False
def recv_json(self):
if not self.messages:
raise RuntimeError("No more messages")
return self.messages.pop(0)
def send_json(self, msg):
self.sent.append(msg)
def getsockopt(self, opt):
if opt == zmq.TYPE:
return self.socket_type
def close(self):
self.closed = True
class FakeReceiver:
"""Base class for main/actuation receivers."""
def __init__(self, socket):
self.socket = socket
self._called = []
def handle_message(self, msg):
self._called.append(msg)
return {"endpoint": "pong", "data": "ok"}
def close(self):
pass
class DummySender:
"""Mock sender to test start methods."""
def __init__(self):
self.called = False
def start_video_rcv(self):
self.called = True
def start(self):
self.called = True
def close(self):
pass
@pytest.fixture
def fake_sockets():
"""Create default fake main and actuation sockets."""
main_sock = FakeSocket(zmq.REP)
act_sock = FakeSocket(zmq.SUB)
return main_sock, act_sock
@pytest.fixture
def fake_poll(monkeypatch):
"""Patch zmq.Poller to simulate a single polling cycle based on socket messages."""
class FakePoller:
def __init__(self):
self.registered = {}
self.used = False
def register(self, socket, flags):
self.registered[socket] = flags
def poll(self, timeout):
# Only return sockets that still have messages
active_socks = {
s: flags
for s, flags
in self.registered.items()
if getattr(s, "messages", [])
}
if active_socks:
return active_socks
# No more messages, exit loop
state.exit_event.set()
return {}
poller_instance = FakePoller()
monkeypatch.setattr(main_mod.zmq, "Poller", lambda: poller_instance)
return poller_instance
@pytest.fixture
def patched_main_components(monkeypatch, fake_sockets, fake_poll):
"""
Fixture to patch main receivers and senders with fakes.
Returns the fake instances for inspection in tests.
"""
main_sock, act_sock = fake_sockets
fake_main = FakeReceiver(main_sock)
fake_act = FakeReceiver(act_sock)
video_sender = DummySender()
audio_sender = DummySender()
monkeypatch.setattr(main_mod, "MainReceiver", lambda ctx: fake_main)
monkeypatch.setattr(main_mod, "ActuationReceiver", lambda ctx: fake_act)
monkeypatch.setattr(main_mod, "VideoSender", lambda ctx: video_sender)
monkeypatch.setattr(main_mod, "AudioSender", lambda ctx: audio_sender)
# Register sockets for the fake poller
fake_poll.registered = {main_sock: zmq.POLLIN, act_sock: zmq.POLLIN}
return fake_main, fake_act, video_sender, audio_sender
def test_main_loop_rep_response(patched_main_components):
"""REP socket returns proper response and handlers are called."""
state.initialize()
fake_main, fake_act, video_sender, audio_sender = patched_main_components
fake_main.socket.messages = [{"endpoint": "ping", "data": "x"}]
fake_act.socket.messages = [{"endpoint": "actuate/speech", "data": "hello"}]
main_mod.main_loop(object())
assert fake_main.socket.sent == [{"endpoint": "pong", "data": "ok"}]
assert fake_main._called
assert fake_act._called
assert video_sender.called
assert audio_sender.called
state.deinitialize()
@pytest.mark.parametrize(
"messages",
[
[{"no_endpoint": True}], # Invalid dict
[["not", "a", "dict"]] # Non-dict message
]
)
def test_main_loop_invalid_or_non_dict_message(patched_main_components, messages):
"""Invalid or non-dict messages are ignored."""
state.initialize()
fake_main, _, _, _ = patched_main_components
fake_main.socket.messages = messages
main_mod.main_loop(object())
assert fake_main.socket.sent == []
state.deinitialize()
def test_main_loop_handler_returns_none(patched_main_components, monkeypatch):
"""Handler returning None still triggers send_json(None)."""
state.initialize()
fake_main, _, _, _ = patched_main_components
class NoneHandler(FakeReceiver):
def handle_message(self, msg):
self._called.append(msg)
return None
monkeypatch.setattr(main_mod, "MainReceiver", lambda ctx: NoneHandler(fake_main.socket))
fake_main.socket.messages = [{"endpoint": "some", "data": None}]
main_mod.main_loop(object())
assert fake_main.socket.sent == [None]
state.deinitialize()
def test_main_loop_overtime_callback(patched_main_components, monkeypatch):
"""TimeBlock callback is triggered if handler takes too long."""
state.initialize()
fake_main, _, _, _ = patched_main_components
fake_main.socket.messages = [{"endpoint": "ping", "data": "x"}]
class FakeTimeBlock:
def __init__(self, callback, limit_ms):
self.callback = callback
def __enter__(self):
return self
def __exit__(self, *a):
self.callback(999.0)
monkeypatch.setattr(main_mod, "TimeBlock", FakeTimeBlock)
main_mod.main_loop(object())
assert fake_main.socket.sent == [{"endpoint": "pong", "data": "ok"}]
state.deinitialize()
def test_main_keyboard_interrupt(monkeypatch):
"""main() handles KeyboardInterrupt and cleans up."""
called = {"deinitialized": False, "term_called": False}
class FakeContext:
def term(self): called["term_called"] = True
monkeypatch.setattr(main_mod.zmq, "Context", lambda: FakeContext())
def raise_keyboard_interrupt(*_):
raise KeyboardInterrupt()
monkeypatch.setattr(main_mod, "main_loop", raise_keyboard_interrupt)
def fake_initialize():
state.is_initialized = True
state.exit_event = threading.Event()
def fake_deinitialize():
called["deinitialized"] = True
state.is_initialized = False
monkeypatch.setattr(main_mod.state, "initialize", fake_initialize)
monkeypatch.setattr(main_mod.state, "deinitialize", fake_deinitialize)
main_mod.main()
assert called["term_called"] is True
assert called["deinitialized"] is True

View File

@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock
import pytest
import zmq
from robot_interface.endpoints.main_receiver import MainReceiver
@pytest.fixture
def zmq_context():
"""
A pytest fixture that creates and yields a ZMQ context.
:return: An initialized ZeroMQ context.
:rtype: zmq.Context
"""
context = zmq.Context()
yield context
def test_handle_ping(zmq_context):
"""
Tests the receiver's ability to handle the "ping" endpoint with data.
"""
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({"endpoint": "ping", "data": "pong"})
assert "endpoint" in response
assert response["endpoint"] == "ping"
assert "data" in response
assert response["data"] == "pong"
def test_handle_ping_none(zmq_context):
"""
Tests the receiver's ability to handle the ping endpoint when the
data field is explicitly set to None.
"""
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({"endpoint": "ping", "data": None})
assert "endpoint" in response
assert response["endpoint"] == "ping"
assert "data" in response
assert response["data"] == None
@mock.patch("robot_interface.endpoints.main_receiver.state")
def test_handle_negotiate_ports(mock_state, zmq_context):
"""
Tests the handling of the "negotiate/ports" endpoint.
"""
receiver = MainReceiver(zmq_context)
mock_state.sockets = [receiver]
response = receiver.handle_message({"endpoint": "negotiate/ports", "data": None})
assert "endpoint" in response
assert response["endpoint"] == "negotiate/ports"
assert "data" in response
assert isinstance(response["data"], list)
for port in response["data"]:
assert "id" in port
assert isinstance(port["id"], str)
assert "port" in port
assert isinstance(port["port"], int)
assert "bind" in port
assert isinstance(port["bind"], bool)
assert any(port["id"] == "main" for port in response["data"])
def test_handle_unimplemented_endpoint(zmq_context):
"""
Tests that the receiver correctly handles a request to a completely
unknown or non-existent endpoint.
"""
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({
"endpoint": "some_endpoint_that_definitely_does_not_exist",
"data": None,
})
assert "endpoint" in response
assert response["endpoint"] == "error"
assert "data" in response
assert isinstance(response["data"], str)
def test_handle_unimplemented_negotiation_endpoint(zmq_context):
"""
Tests handling a request to an unknown sub-endpoint within a known
group. The expected behavior is to return a specific "negotiate/error"
response with a descriptive error string.
"""
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({
"endpoint": "negotiate/but_some_subpath_that_definitely_does_not_exist",
"data": None,
})
assert "endpoint" in response
assert response["endpoint"] == "negotiate/error"
assert "data" in response
assert isinstance(response["data"], str)
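A method sketch of the routing these tests describe: `ping` echoes its data, `negotiate/ports` lists every registered socket's endpoint description, unknown negotiation sub-endpoints get a dedicated error, and anything else falls through to a generic one. The error message wording is an assumption.

    def handle_message(self, message):
        # Method sketch for MainReceiver; "state" is the global robot state.
        endpoint = message.get("endpoint", "")
        if endpoint == "ping":
            # Echo the data back, including an explicit None.
            return {"endpoint": "ping", "data": message.get("data")}
        if endpoint.startswith("negotiate/"):
            if endpoint == "negotiate/ports":
                return {"endpoint": "negotiate/ports",
                        "data": [sock.endpoint_description() for sock in state.sockets]}
            return {"endpoint": "negotiate/error",
                    "data": "Unknown negotiation endpoint: {}".format(endpoint)}
        return {"endpoint": "error", "data": "Unknown endpoint: {}".format(endpoint)}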

View File

@@ -0,0 +1,138 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock
import pytest
from common.microphone_utils import MicrophoneUtils
from robot_interface.utils.microphone import choose_mic_default, choose_mic_interactive
class MockPyAudio:
"""
A mock implementation of the PyAudio library class, designed for testing
microphone utility functions without requiring actual audio hardware.
It provides fake devices, including one input microphone, and implements
the core PyAudio methods required for device enumeration.
:ivar devices: A list of dictionaries representing mock audio devices.
:vartype devices: List[Dict[str, Any]]
"""
def __init__(self):
# You can predefine fake device info here
self.devices = [
{
"index": 0,
"name": u"Someones Microphone", # Using a Unicode character
"maxInputChannels": 2,
"maxOutputChannels": 0,
"defaultSampleRate": 44100.0,
"defaultLowInputLatency": 0.01,
"defaultLowOutputLatency": 0.01,
"defaultHighInputLatency": 0.1,
"defaultHighOutputLatency": 0.1,
"hostApi": 0,
},
{
"index": 1,
"name": u"Mock Speaker 1",
"maxInputChannels": 0,
"maxOutputChannels": 2,
"defaultSampleRate": 48000.0,
"defaultLowInputLatency": 0.01,
"defaultLowOutputLatency": 0.01,
"defaultHighInputLatency": 0.1,
"defaultHighOutputLatency": 0.1,
"hostApi": 0,
},
]
def get_device_count(self):
"""
Returns the number of available mock devices.
:return: The total number of devices in the mock list.
:rtype: int
"""
return len(self.devices)
def get_device_info_by_index(self, index):
"""
Returns information for a given mock device index.
:param index: The index of the device to retrieve.
:type index: int
:return: A dictionary containing device information.
:rtype: Dict[str, Any]
"""
if 0 <= index < len(self.devices):
return self.devices[index]
else:
raise IOError("Invalid device index: {}".format(index))
def get_default_input_device_info(self):
"""
Returns information for the default mock input device.
:return: A dictionary containing the default input device information.
:rtype: Dict[str, Any]
"""
for device in self.devices:
if device.get("maxInputChannels", 0) > 0:
return device
raise IOError("No default input device found")
@pytest.fixture
def pyaudio_instance():
"""
A pytest fixture that returns an instance of the `MockPyAudio` class.
:return: An initialized instance of the mock PyAudio class.
:rtype: MockPyAudio
"""
return MockPyAudio()
def _raise_io_error():
"""
Helper function used to mock PyAudio methods that are expected to fail
when no device is available.
"""
raise IOError()
class TestAudioUnit(MicrophoneUtils):
"""
Runs the shared microphone behavior tests defined in `MicrophoneUtils` using
the mock PyAudio implementation.
"""
def test_choose_mic_default_no_mic(self):
"""
Tests `choose_mic_default` when no microphones are available.
"""
mock_pyaudio = mock.Mock()
mock_pyaudio.get_device_count = mock.Mock(return_value=0L)
mock_pyaudio.get_default_input_device_info = _raise_io_error
result = choose_mic_default(mock_pyaudio)
assert result is None
def test_choose_mic_interactive_no_mic(self):
"""
Tests `choose_mic_interactive` when no microphones are available.
"""
mock_pyaudio = mock.Mock()
mock_pyaudio.get_device_count = mock.Mock(return_value=0L)
mock_pyaudio.get_default_input_device_info = _raise_io_error
result = choose_mic_interactive(mock_pyaudio)
assert result is None

View File

@@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import sys
# Import module under test
import robot_interface.utils.qi_utils as qi_utils
def reload_qi_utils_with(qi_module):
"""
Helper: reload qi_utils after injecting a fake qi module.
Python 2 uses the built-in reload().
Just changing sys.modules["qi"] won't affect the already-imported module.
"""
if qi_module is None:
if "qi" in sys.modules:
del sys.modules["qi"]
else:
sys.modules["qi"] = qi_module
# Python 2 reload
global qi_utils
qi_utils = reload(qi_utils)
def test_get_qi_session_no_qi_module():
"""
Tests the 'qi is None' path.
"""
reload_qi_utils_with(None)
session = qi_utils.get_qi_session()
assert session is None
def test_get_qi_session_no_qi_url_argument(monkeypatch):
"""
Tests the '--qi-url not in sys.argv' path.
"""
class FakeQi:
pass
reload_qi_utils_with(FakeQi())
monkeypatch.setattr(sys, "argv", ["pytest"])
session = qi_utils.get_qi_session()
assert session is None
def test_get_qi_session_runtime_error(monkeypatch):
"""
Tests the 'except RuntimeError' path.
"""
class FakeApp:
def start(self):
raise RuntimeError("boom")
class FakeQi:
Application = lambda self=None: FakeApp()
reload_qi_utils_with(FakeQi())
monkeypatch.setattr(sys, "argv", ["pytest", "--qi-url", "tcp://localhost"])
session = qi_utils.get_qi_session()
assert session is None
def test_get_qi_session_success(monkeypatch):
"""
Tests a valid path.
"""
class FakeSession:
pass
class FakeApp:
def __init__(self):
self.session = FakeSession()
def start(self):
return True
class FakeQi:
Application = lambda self=None: FakeApp()
reload_qi_utils_with(FakeQi())
monkeypatch.setattr(sys, "argv", ["pytest", "--qi-url", "tcp://localhost"])
session = qi_utils.get_qi_session()
assert isinstance(session, FakeSession)
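The four tests enumerate every branch of `get_qi_session`; a sketch consistent with them. The real module probably also passes the connection URL into qi.Application, which is omitted here.

    import sys

    try:
        import qi
    except ImportError:
        qi = None

    def get_qi_session():
        # No NAOqi bindings available at all.
        if qi is None:
            return None
        # Only connect when a robot URL was supplied on the command line.
        if "--qi-url" not in sys.argv:
            return None
        try:
            app = qi.Application()
            app.start()
        except RuntimeError:
            # The robot is unreachable or the connection failed.
            return None
        return app.session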

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pytest
from robot_interface.endpoints.receiver_base import ReceiverBase
def test_receiver_base_not_implemented(monkeypatch):
"""
Ensure that the base ReceiverBase raises NotImplementedError when
handle_message is called on a subclass that does not implement it.
"""
# Patch the __abstractmethods__ to allow instantiation
monkeypatch.setattr(ReceiverBase, "__abstractmethods__", frozenset())
class DummyReceiver(ReceiverBase):
pass
dummy = DummyReceiver("dummy") # Can now instantiate
with pytest.raises(NotImplementedError):
dummy.handle_message({"endpoint": "dummy", "data": None})

View File

@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock
import zmq
from robot_interface.endpoints.socket_base import SocketBase
def test_close_covers_both_branches():
"""
Exercise both possible paths inside SocketBase.close():
- when no socket exists (should just return),
- when a socket object is present (its close() method should be called).
"""
sb = SocketBase("x")
# First check the case where socket is None. Nothing should happen here.
sb.close()
# Now simulate a real socket so the close() call is triggered.
fake_socket = mock.Mock()
sb.socket = fake_socket
sb.close()
fake_socket.close.assert_called_once()
def test_create_socket_and_endpoint_description_full_coverage():
"""
Test the less-commonly used branch of create_socket() where bind=False.
This covers:
- the loop that sets socket options,
- the connect() path,
- the logic in endpoint_description() that inverts self.bound.
"""
fake_context = mock.Mock()
fake_socket = mock.Mock()
# The context should hand back our fake socket object.
fake_context.socket.return_value = fake_socket
sb = SocketBase("id")
# Calling create_socket with bind=False forces the connect() code path.
sb.create_socket(
zmq_context=fake_context,
socket_type=zmq.SUB,
port=9999,
options=[(zmq.CONFLATE, 1)], # one option is enough to hit the loop
bind=False,
)
fake_socket.setsockopt.assert_called_once_with(zmq.CONFLATE, 1)
fake_socket.connect.assert_called_once_with("tcp://localhost:9999")
# Check that endpoint_description reflects bound=False -> "bind": True
desc = sb.endpoint_description()
assert desc == {"id": "id", "port": 9999, "bind": True}
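Between this file and the earlier test_create_endpoint_custom_host, the expected SocketBase behaviour is: hand out a socket from the context, apply each option, bind or connect on the given port (with the connect host taken from settings.agent_settings.control_backend_host), and report the inverse of `bound` in the endpoint description. A sketch under those assumptions; attribute names are guesses where the tests do not show them.

    class SocketBase(object):
        def __init__(self, socket_id):
            self.id = socket_id
            self.socket = None
            self.port = None
            self.bound = False

        def create_socket(self, zmq_context, socket_type, port, options=None, bind=True):
            self.socket = zmq_context.socket(socket_type)
            for option, value in (options or []):
                self.socket.setsockopt(option, value)
            self.port = port
            self.bound = bind
            if bind:
                self.socket.bind("tcp://*:{}".format(port))
            else:
                # The real module reads the host from settings (default "localhost").
                host = settings.agent_settings.control_backend_host
                self.socket.connect("tcp://{}:{}".format(host, port))

        def endpoint_description(self):
            # "bind" tells the peer what *it* should do, hence the inversion.
            return {"id": self.id, "port": self.port, "bind": not self.bound}

        def close(self):
            if self.socket is not None:
                self.socket.close()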

115
test/unit/test_state.py Normal file
View File

@@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import threading
import signal
import pytest
import mock
from robot_interface.state import State
def test_initialize_does_not_reinitialize():
"""
Check that calling `initialize` on an already initialized state does not change existing
attributes.
"""
state = State()
# Mock qi_session to avoid real session creation
mock_session = mock.MagicMock()
state.qi_session = mock_session
# Set state as already initialized
state.is_initialized = True
old_exit_event = state.exit_event
# Call initialize
state.initialize()
# Ensure existing attributes were not overwritten
assert state.exit_event == old_exit_event # exit_event should not be recreated
assert state.qi_session == mock_session # qi_session should not be replaced
assert state.is_initialized is True # is_initialized should remain True
def test_deinitialize_behavior():
"""Check that deinitialize closes sockets and updates the initialization state correctly."""
state = State()
# Case 1: Initialized with sockets
state.is_initialized = True
mock_socket_1 = mock.Mock()
mock_socket_2 = mock.Mock()
state.sockets = [mock_socket_1, mock_socket_2]
state.deinitialize()
# Sockets should be closed
mock_socket_1.close.assert_called_once()
mock_socket_2.close.assert_called_once()
# State should be marked as not initialized
assert not state.is_initialized
# Case 2: Not initialized, should not raise
state.is_initialized = False
state.sockets = []
state.deinitialize()
assert not state.is_initialized
def test_access_control_before_initialization():
"""Verify that accessing certain attributes before initialization raises RuntimeError."""
state = State()
with pytest.raises(RuntimeError, match=".*sockets.*"):
_ = state.sockets
with pytest.raises(RuntimeError, match=".*qi_session.*"):
_ = state.qi_session
def test_exit_event_before_initialized_returns_if_set():
"""Check that exit_event can be accessed even if state is not initialized,
but only if it is set."""
state = State()
# Manually create and set the exit_event
object.__setattr__(state, "exit_event", threading.Event())
object.__getattribute__(state, "exit_event").set()
# Should return the event without raising
assert state.exit_event.is_set()
def test_getattribute_allowed_attributes_before_init():
"""Ensure attributes allowed before initialization can be accessed without error."""
state = State()
assert callable(state.initialize)
assert callable(state.deinitialize)
assert state.is_initialized is False
assert state.__dict__ is not None
assert state.__class__.__name__ == "State"
assert state.__doc__ is not None
def test_signal_handler_sets_exit_event(monkeypatch):
"""Ensure SIGINT triggers the exit_event via signal handler."""
state = State()
# Patch get_qi_session to prevent real session creation
monkeypatch.setattr("robot_interface.state.get_qi_session", lambda: "dummy_session")
# Initialize state to set up signal handlers
state.initialize()
# Simulate SIGINT
signal_handler = signal.getsignal(signal.SIGINT)
signal_handler(None, None)
# Exit event should be set
assert state.exit_event.is_set()

View File

@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import time
import mock
from robot_interface.utils.timeblock import TimeBlock
class AnyFloat(object):
"""
A helper class used in tests to assert that a mock function was called
with an argument that is specifically a float, regardless of its value.
It overrides the equality comparison (`__eq__`) to check only the type.
"""
def __eq__(self, other):
return isinstance(other, float)
def test_no_limit():
"""
Tests the scenario where the `TimeBlock` context manager is used without
a time limit.
"""
callback = mock.Mock()
with TimeBlock(callback):
pass
callback.assert_called_once_with(AnyFloat())
def test_exceed_limit():
"""
Tests the scenario where the execution time within the `TimeBlock`
exceeds the provided limit.
"""
callback = mock.Mock()
with TimeBlock(callback, 0):
time.sleep(0.001)
callback.assert_called_once_with(AnyFloat())
def test_within_limit():
"""
Tests the scenario where the execution time within the `TimeBlock`
stays within the provided limit.
"""
callback = mock.Mock()
with TimeBlock(callback, 5):
pass
callback.assert_not_called()
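A sketch of a TimeBlock consistent with these tests and with the FakeTimeBlock used in test_main, which passes the limit in milliseconds: with no limit the callback always fires with the elapsed time, with a limit it only fires when that limit is exceeded.

    import time

    class TimeBlock(object):
        """Times its body and reports the elapsed milliseconds via a callback."""

        def __init__(self, callback, limit_ms=None):
            self.callback = callback
            self.limit_ms = limit_ms

        def __enter__(self):
            self.start = time.time()
            return self

        def __exit__(self, *exc_info):
            elapsed_ms = (time.time() - self.start) * 1000.0
            # No limit: always report. With a limit: only report overtime.
            if self.limit_ms is None or elapsed_ms > self.limit_ms:
                self.callback(elapsed_ms)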

View File

@@ -0,0 +1,132 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import struct
import mock
import pytest
import zmq
from robot_interface.endpoints.video_sender import VideoSender
from robot_interface.state import state
from robot_interface.core.config import settings
@pytest.fixture
def zmq_context():
"""Provide a ZMQ context."""
yield zmq.Context()
def _patch_basics(mocker):
"""Common patches: prevent real threads, port binds, and state errors."""
mocker.patch("robot_interface.endpoints.socket_base.zmq.Socket.bind")
mocker.patch("robot_interface.endpoints.video_sender.threading.Thread")
mocker.patch.object(state, "is_initialized", True)
def _patch_exit_event(mocker):
"""Make exit_event stop the loop after one iteration."""
fake_event = mock.Mock()
fake_event.is_set.side_effect = [False, True]
mocker.patch.object(state, "exit_event", fake_event)
def test_no_qi_session(zmq_context, mocker):
"""Video loop should not start without a qi_session."""
_patch_basics(mocker)
mocker.patch.object(state, "qi_session", None)
sender = VideoSender(zmq_context)
sender.start_video_rcv()
assert not hasattr(sender, "thread")
def test_video_streaming(zmq_context, mocker):
"""VideoSender should send retrieved image data."""
_patch_basics(mocker)
_patch_exit_event(mocker)
# Pepper's image buffer lives at index 6
mocker.patch.object(settings.video_config, "image_buffer", 6)
test_width = 320
test_height = 240
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.return_value = [test_width, test_height, None, None, None, None, b"fake_img"]
fake_session = mock.Mock()
fake_session.service.return_value = mock_video_service
mocker.patch.object(state, "qi_session", fake_session)
mocker.patch.object(
fake_session.service("ALVideoDevice"),
"subscribeCamera",
return_value="stream_name"
)
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send_multipart = send_socket
sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_called_with([
struct.pack('<I', 320),
struct.pack('<I', 240),
b"fake_img"
])
def test_video_receive_error(zmq_context, mocker):
"""Errors retrieving images should not call send()."""
_patch_basics(mocker)
_patch_exit_event(mocker)
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.side_effect = Exception("boom")
fake_session = mock.Mock()
fake_session.service.return_value = mock_video_service
mocker.patch.object(state, "qi_session", fake_session)
mocker.patch.object(
fake_session.service("ALVideoDevice"),
"subscribeCamera",
return_value="stream_name"
)
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send_multipart = send_socket
sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_not_called()
def test_video_loop_keyboard_interrupt(zmq_context, mocker):
"""Video loop should handle KeyboardInterrupt gracefully and unsubscribe."""
_patch_basics(mocker)
_patch_exit_event(mocker)
# We mock the video service to raise KeyboardInterrupt when accessed
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.side_effect = KeyboardInterrupt
# Mock logging to verify the specific interrupt message is logged
mock_logger = mocker.patch("robot_interface.endpoints.video_sender.logging")
sender = VideoSender(zmq_context)
# Execute the loop
sender.video_rcv_loop(mock_video_service, "stream_name")
# Verify the 'finally' block executed (unsubscribe)
mock_video_service.unsubscribe.assert_called_with("stream_name")
mock_logger.info.assert_any_call("Unsubscribed from video stream.")
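A method sketch of `video_rcv_loop` consistent with the three tests above: frames come from getImageRemote, width and height are packed as little-endian uint32 alongside the raw buffer (whose index comes from settings.video_config.image_buffer), failed frames are skipped, and the finally block always unsubscribes. Log messages other than the asserted one are assumptions.

    import logging
    import struct

    def video_rcv_loop(self, video_service, stream_name):
        # Method sketch for VideoSender; "state" is the global robot state.
        try:
            while not state.exit_event.is_set():
                try:
                    image = video_service.getImageRemote(stream_name)
                except Exception:
                    # A failed frame is skipped; the loop simply tries again.
                    continue
                width, height = image[0], image[1]
                frame = image[settings.video_config.image_buffer]
                self.socket.send_multipart([
                    struct.pack('<I', width),
                    struct.pack('<I', height),
                    frame,
                ])
        except KeyboardInterrupt:
            logging.info("Video loop interrupted by user.")
        finally:
            video_service.unsubscribe(stream_name)
            logging.info("Unsubscribed from video stream.")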