65 Commits

Author SHA1 Message Date
JobvAlewijk
e129113c9e chore: forgot copyright 2026-01-29 17:53:57 +01:00
JobvAlewijk
18a4bde4ca test: added tests and docstrings
ref: N25B-397
2026-01-29 17:51:39 +01:00
Twirre Meulenbelt
815fc7bcde feat: publish face detection instead of req/res
ref: N25B-395
2026-01-29 17:18:39 +01:00
JobvAlewijk
3bbe97579d Merge branch 'main' of ssh://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-ri into feat/face-detection 2026-01-29 16:22:42 +01:00
ad58b16559 Merge branch 'dev' into 'main'
Merge dev with main

See merge request ics/sp/2025/n25b/pepperplus-ri!27
2026-01-28 10:54:22 +00:00
fb0d7850cc Merge branch 'main' into dev 2026-01-28 11:53:23 +01:00
JobvAlewijk
4afceccf46 feat: fixed connection !!! 2026-01-19 16:58:01 +01:00
JobvAlewijk
83099a2810 chore: modified into req reply socket on 5559 2026-01-17 14:01:32 +01:00
JobvAlewijk
4e9afbaaf5 Merge branch 'dev' of ssh://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-ri into feat/face-detection 2026-01-16 16:48:51 +01:00
Pim Hutting
da97eb8a1a Merge branch 'feat/robot-speech-agent-force-speech' into 'dev'
feat: implemented forced speech and speech queue

See merge request ics/sp/2025/n25b/pepperplus-ri!23
2026-01-14 14:26:39 +00:00
Luijkx,S.O.H. (Storm)
e51cf8fe65 feat: implemented forced speech and speech queue 2026-01-14 14:26:38 +00:00
JobvAlewijk
49386ef8cd feat: communicate face to CB
Had to do some weird socket stuff

ref: N25B-397
2026-01-12 14:25:10 +01:00
JobvAlewijk
3b470c8f29 feat: fully working face detection
ref: N25B-397
2026-01-07 17:56:21 +01:00
JobvAlewijk
b8f71f6bee feat: base face detection
ref: N25B-397
2026-01-04 18:56:04 +01:00
Twirre
1e77548622 Merge branch 'feat/ri-gestures' into 'dev'
feat: gestures to ri

See merge request ics/sp/2025/n25b/pepperplus-ri!21
2025-12-16 08:35:26 +00:00
JobvAlewijk
a8fe887c48 feat: gestures to ri 2025-12-16 08:35:26 +00:00
JobvAlewijk
df702f1e44 Merge branch 'feat/environment-variables' into 'dev'
Add environment variables

See merge request ics/sp/2025/n25b/pepperplus-ri!22
2025-12-13 13:43:52 +00:00
JobvAlewijk
a2cb2ae90a Merge branch 'dev' of ssh://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-ri into feat/environment-variables 2025-12-13 14:43:02 +01:00
Luijkx,S.O.H. (Storm)
017dbfaa28 Merge branch 'docs/extract-installation-instructions' into 'dev'
Improve installation instructions

See merge request ics/sp/2025/n25b/pepperplus-ri!20
2025-12-11 10:58:56 +00:00
Twirre
9ff1d9a4d3 Improve installation instructions 2025-12-11 10:58:56 +00:00
Twirre Meulenbelt
3a259c1170 feat: add environment variables and docs
ref: N25B-352
2025-12-10 13:28:13 +01:00
JobvAlewijk
c86eda497c Merge branch 'feat/ci-cd' into 'dev'
Introduce CI/CD with tests

See merge request ics/sp/2025/n25b/pepperplus-ri!19
2025-12-03 15:23:37 +00:00
Twirre Meulenbelt
94b92b3e4a feat: re-introduce git hooks
Now using the standardized method from the CB.

ref: N25B-367
2025-12-02 22:04:46 +01:00
Twirre Meulenbelt
f469e4ce36 fix: install in a .venv artifact
This artifact can be reused in different stages.

ref: N25B-367
2025-12-02 21:46:24 +01:00
Twirre Meulenbelt
28a556becd feat: introduce CI/CD with tests
Using a custom base image installed on the runner, the installation and tests should work (fast).

ref: N25B-367
2025-12-02 21:12:15 +01:00
Twirre
89c9f2ebea Merge branch 'test/video-sender' into 'dev'
test: added full video sender coverage tests

See merge request ics/sp/2025/n25b/pepperplus-ri!18
2025-11-24 20:41:08 +00:00
JobvAlewijk
96f328d56c test: added full video sender coverage tests 2025-11-24 20:41:08 +00:00
Twirre
4d634a3b4e Merge branch 'test/main-start' into 'dev'
test: added main tests

See merge request ics/sp/2025/n25b/pepperplus-ri!17
2025-11-24 20:37:59 +00:00
JobvAlewijk
e2a71ad6c2 test: added main tests 2025-11-24 20:37:59 +00:00
2fcd885a00 Merge branch 'test/state' into 'dev'
test: added tests for full state coverage

See merge request ics/sp/2025/n25b/pepperplus-ri!16
2025-11-24 20:24:19 +00:00
JobvAlewijk
336acac440 test: added tests for full state coverage 2025-11-24 20:24:19 +00:00
Twirre
f4fbc69c7f Merge branch 'test/reciever-base' into 'dev'
test: added not overridden reciever base test

See merge request ics/sp/2025/n25b/pepperplus-ri!15
2025-11-24 20:06:58 +00:00
JobvAlewijk
fbe8f59c38 test: added not overridden reciever base test 2025-11-24 20:06:58 +00:00
Twirre
e99d7e8557 Merge branch 'test/audio-sender' into 'dev'
test: added init failure test in audio sender

See merge request ics/sp/2025/n25b/pepperplus-ri!14
2025-11-24 20:05:10 +00:00
JobvAlewijk
2350f6eec7 test: added init failure test in audio sender 2025-11-24 20:05:10 +00:00
Twirre
2852b714f5 Merge branch 'test/qi-utils' into 'dev'
test: added qi_utils test

See merge request ics/sp/2025/n25b/pepperplus-ri!12
2025-11-24 20:02:28 +00:00
JobvAlewijk
7628e47478 test: added qi_utils test 2025-11-24 20:02:28 +00:00
Twirre
36f5fae45c Merge branch 'test/socket-base' into 'dev'
test: added socket base tests

See merge request ics/sp/2025/n25b/pepperplus-ri!13
2025-11-24 13:32:31 +00:00
JobvAlewijk
6ea870623b test: added socket base tests 2025-11-24 13:32:31 +00:00
8d6dd23acb Merge branch 'chore/add-documentation' into 'dev'
chore: add documentation RI

See merge request ics/sp/2025/n25b/pepperplus-ri!11
2025-11-22 19:14:51 +00:00
Twirre Meulenbelt
a53871360e docs: remove duplicate and double space
ref: N25B-298
2025-11-22 19:32:50 +01:00
Pim Hutting
c1e92feba7 Apply 1 suggestion(s) to 1 file(s)
Co-authored-by: Kasper Marinus <k.marinus@students.uu.nl>
2025-11-22 12:37:39 +00:00
Pim Hutting
6859451bf9 Apply 1 suggestion(s) to 1 file(s)
Co-authored-by: Twirre <s.a.meulenbelt@students.uu.nl>
2025-11-22 12:36:34 +00:00
Twirre Meulenbelt
64c6f0addb docs: make doc generator understand multi line
ref: N25B-298
2025-11-22 12:44:13 +01:00
Pim Hutting
c53307530b chore: applied all feedback
close: N25B-298
2025-11-22 11:45:32 +01:00
Pim Hutting
051f904576 chore: add documentation RI
Code functionality left unchanged, only added docs where missing

close: N25B-298
2025-11-21 16:35:40 +01:00
Twirre
1e3531ac6e Merge branch 'docs/gen_documentation' into 'dev'
docs: added auto-generation of documentation

See merge request ics/sp/2025/n25b/pepperplus-ri!10
2025-11-19 17:14:36 +00:00
Storm
cec29f6206 chore: updated .gitignore
ref: N25B-270
2025-11-19 18:10:18 +01:00
Storm
a0a8ad2689 docs: changed readme
ref: N25B-270
2025-11-19 17:59:37 +01:00
JobvAlewijk
1c9467d03a fix: conf includes correct path
ref: N25B-270
2025-11-19 17:57:24 +01:00
Storm
9dd39d2048 docs: added auto-generation of documentation
ref: N25B-270
2025-11-19 13:49:50 +01:00
Twirre
b05aa5e834 Merge branch 'refactor/config-file' into 'dev'
refactor: added config file and moved constants

See merge request ics/sp/2025/n25b/pepperplus-ri!9
2025-11-14 14:15:06 +00:00
Twirre Meulenbelt
c691e279cd style: two lines between top level declarations
ref: N25B-236
2025-11-14 15:13:48 +01:00
Pim Hutting
16b64e41c8 style: applied style suggestions
close: N25B-236
2025-11-14 14:12:14 +00:00
Twirre Meulenbelt
03519e2a16 test: fix microphone interactive test
This was created with the assumption that all devices were choosable, but now only ones with input channels are.

ref: N25B-119
2025-11-14 13:08:31 +01:00
Pim Hutting
643d7b919c fix: made all tests pass
before some tests failed because of a faulty edit
to microphone util

ref: N25B-236
2025-11-09 16:00:36 +01:00
Pim Hutting
4402b21a73 refactor: added config file and moved constants
- Moved hardcoded configuration constants to a dedicated config.py file.
- Created VideoConfig, AudioConfig, MainConfig, and Settings classes in config.py

ref: N25B-236
2025-11-09 15:43:22 +01:00
Pim Hutting
c037eb7ec2 Merge branch 'feat/stream-audio' into 'dev'
Implement audio streaming

See merge request ics/sp/2025/n25b/pepperplus-ri!8
2025-11-05 12:08:28 +00:00
Twirre Meulenbelt
8a095323ec docs: describe extra WSL installation step
ref: N25B-119
2025-11-02 16:35:15 +01:00
Twirre Meulenbelt
854a14bf0c docs: describe --microphone program parameter
ref: N25B-119
2025-11-02 16:16:43 +01:00
Twirre Meulenbelt
fab5127cac feat: add application parameter to choose a custom microphone
ref: N25B-119
2025-11-02 16:12:56 +01:00
Twirre Meulenbelt
5912ac606a docs: add installation instructions for the portaudio dependency
ref: N25B-119
2025-11-02 15:01:18 +01:00
Twirre Meulenbelt
9ea446275e fix: allow speaking text with Unicode characters
When speaking, the actuation receiver logs the message to speak. If the message includes Unicode characters, it will now no longer crash.

ref: N25B-119
2025-11-02 14:59:16 +01:00
Twirre Meulenbelt
a6a12a5886 fix: remove unused qi import
It had already been made so that the VideoSender does not depend on `qi`, but the import was not yet removed.

ref: N25B-119
2025-11-02 14:58:32 +01:00
aad2044b6e chore: add .gitignore 2025-09-27 17:58:12 +02:00
48 changed files with 3429 additions and 264 deletions

.env.example Normal file (25 lines)

@@ -0,0 +1,25 @@
# Example .env file. To use, make a copy named ".env" (i.e. remove the ".example" suffix), then edit the values.
# To make a variable apply, uncomment it (remove the "#" in front of the line).
# First, some variables that are likely to be configured:
# The hostname or IP address of the Control Backend.
AGENT__CONTROL_BACKEND_HOST=localhost
# The following variables are unlikely to need configuring; you can probably ignore them:
#AGENT__ACTUATION_RECEIVER_PORT=
#AGENT__MAIN_RECEIVER_PORT=
#AGENT__VIDEO_SENDER_PORT=
#AGENT__AUDIO_SENDER_PORT=
#VIDEO__CAMERA_INDEX=
#VIDEO__RESOLUTION=
#VIDEO__COLOR_SPACE=
#VIDEO__FPS=
#VIDEO__STREAM_NAME=
#VIDEO__IMAGE_BUFFER=
#AUDIO__SAMPLE_RATE=
#AUDIO__CHUNK_SIZE=
#AUDIO__CHANNELS=
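The comments above describe the intended workflow: copy the example file, rename it to `.env`, then uncomment a variable to set it. A minimal sketch of that workflow (the port value is made up for illustration):

```shell
# Copy the example, then uncomment a variable by removing the leading "#".
cp .env.example .env
# Hypothetical value; any AGENT__*/VIDEO__*/AUDIO__* line works the same way.
sed -i 's|^#AGENT__VIDEO_SENDER_PORT=.*|AGENT__VIDEO_SENDER_PORT=5557|' .env
grep '^AGENT__' .env
```

(On macOS, `sed -i` needs an explicit backup suffix, e.g. `sed -i ''`.)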

.githooks/check-branch-name.sh Executable file (77 lines)

@@ -0,0 +1,77 @@
#!/usr/bin/env bash
# This script checks if the current branch name follows the specified format.
# It's designed to be used as a 'pre-commit' git hook.
# Format: <type>/<short-description>
# Example: feat/add-user-login
# --- Configuration ---
# An array of allowed commit types
ALLOWED_TYPES=(feat fix refactor perf style test docs build chore revert)
# An array of branches to ignore
IGNORED_BRANCHES=(main dev demo)
# --- Colors for Output ---
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# --- Helper Functions ---
error_exit() {
echo -e "${RED}ERROR: $1${NC}" >&2
echo -e "${YELLOW}Branch name format is incorrect. Aborting commit.${NC}" >&2
exit 1
}
# --- Main Logic ---
# 1. Get the current branch name
BRANCH_NAME=$(git symbolic-ref --short HEAD)
# 2. Check if the current branch is in the ignored list
for ignored_branch in "${IGNORED_BRANCHES[@]}"; do
if [ "$BRANCH_NAME" == "$ignored_branch" ]; then
echo -e "${GREEN}Branch check skipped for default branch: $BRANCH_NAME${NC}"
exit 0
fi
done
# 3. Validate the overall structure: <type>/<description>
if ! [[ "$BRANCH_NAME" =~ ^[a-z]+/.+$ ]]; then
error_exit "Branch name must be in the format: <type>/<short-description>\nExample: feat/add-user-login"
fi
# 4. Extract the type and description
TYPE=$(echo "$BRANCH_NAME" | cut -d'/' -f1)
DESCRIPTION=$(echo "$BRANCH_NAME" | cut -d'/' -f2-)
# 5. Validate the <type>
type_valid=false
for allowed_type in "${ALLOWED_TYPES[@]}"; do
if [ "$TYPE" == "$allowed_type" ]; then
type_valid=true
break
fi
done
if [ "$type_valid" == false ]; then
error_exit "Invalid type '$TYPE'.\nAllowed types are: ${ALLOWED_TYPES[*]}"
fi
# 6. Validate the <short-description>
# Regex breakdown:
# ^[a-z0-9]+ - Starts with one or more lowercase letters/numbers (the first word).
# (-[a-z0-9]+){0,5} - Followed by a group of (dash + word) 0 to 5 times.
# $ - End of the string.
# This entire pattern enforces 1 to 6 words total, separated by dashes.
DESCRIPTION_REGEX="^[a-z0-9]+(-[a-z0-9]+){0,5}$"
if ! [[ "$DESCRIPTION" =~ $DESCRIPTION_REGEX ]]; then
error_exit "Invalid short description '$DESCRIPTION'.\nIt must be a maximum of 6 words, all lowercase, separated by dashes.\nExample: add-new-user-authentication-feature"
fi
# If all checks pass, exit successfully
echo -e "${GREEN}Branch name '$BRANCH_NAME' is valid.${NC}"
exit 0
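The description regex used by this hook can be exercised on its own; a quick sketch (the branch names are made up):

```shell
#!/usr/bin/env bash
# Same regex as the hook: 1 to 6 lowercase words separated by dashes.
DESCRIPTION_REGEX="^[a-z0-9]+(-[a-z0-9]+){0,5}$"
check() {
  if [[ "$1" =~ $DESCRIPTION_REGEX ]]; then echo "valid: $1"; else echo "invalid: $1"; fi
}
check "add-user-login"                     # 3 words, lowercase
check "one-two-three-four-five-six-seven"  # 7 words, one too many
check "Add-User-Login"                     # uppercase is not allowed
```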

.githooks/check-commit-msg.sh Executable file (135 lines)

@@ -0,0 +1,135 @@
#!/usr/bin/env bash
# This script checks if a commit message follows the specified format.
# It's designed to be used as a 'commit-msg' git hook.
# Format:
# <type>: <short description>
#
# [optional]<body>
#
# [ref/close]: <issue identifier>
# --- Configuration ---
# An array of allowed commit types
ALLOWED_TYPES=(feat fix refactor perf style test docs build chore revert)
# --- Colors for Output ---
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# The first argument to the hook is the path to the file containing the commit message
COMMIT_MSG_FILE=$1
# --- Automated Commit Detection ---
# Read the first line (header) for initial checks
HEADER=$(head -n 1 "$COMMIT_MSG_FILE")
# Check for Merge commits (covers 'git merge' and PR merges from GitHub/GitLab)
# Examples: "Merge branch 'main' into ...", "Merge pull request #123 from ..."
MERGE_PATTERN="^Merge (remote-tracking )?(branch|pull request|tag) .*"
if [[ "$HEADER" =~ $MERGE_PATTERN ]]; then
echo -e "${GREEN}Merge commit detected by message content. Skipping validation.${NC}"
exit 0
fi
# Check for Revert commits
# Example: "Revert "feat: add new feature""
REVERT_PATTERN="^Revert \".*\""
if [[ "$HEADER" =~ $REVERT_PATTERN ]]; then
echo -e "${GREEN}Revert commit detected by message content. Skipping validation.${NC}"
exit 0
fi
# Check for Cherry-pick commits (this pattern appears at the end of the message)
# Example: "(cherry picked from commit deadbeef...)"
# We use grep -q to search the whole file quietly.
CHERRY_PICK_PATTERN="\(cherry picked from commit [a-f0-9]{7,40}\)"
if grep -qE "$CHERRY_PICK_PATTERN" "$COMMIT_MSG_FILE"; then
echo -e "${GREEN}Cherry-pick detected by message content. Skipping validation.${NC}"
exit 0
fi
# Check for Squash
# Example: "Squash commits ..."
SQUASH_PATTERN="^Squash .+"
if [[ "$HEADER" =~ $SQUASH_PATTERN ]]; then
echo -e "${GREEN}Squash commit detected by message content. Skipping validation.${NC}"
exit 0
fi
# --- Validation Functions ---
# Function to print an error message and exit
# Usage: error_exit "Your error message here"
error_exit() {
# >&2 redirects echo to stderr
echo -e "${RED}ERROR: $1${NC}" >&2
echo -e "${YELLOW}Commit message format is incorrect. Aborting commit.${NC}" >&2
exit 1
}
# --- Main Logic ---
# 1. Read the header (first line) of the commit message
HEADER=$(head -n 1 "$COMMIT_MSG_FILE")
# 2. Validate the header format: <type>: <description>
# Regex breakdown:
# ^(type1|type2|...) - Starts with one of the allowed types
# : - Followed by a literal colon
# \s - Followed by a single space
# .+ - Followed by one or more characters for the description
# $ - End of the line
TYPES_REGEX=$(
IFS="|"
echo "${ALLOWED_TYPES[*]}"
)
HEADER_REGEX="^($TYPES_REGEX): .+$"
if ! [[ "$HEADER" =~ $HEADER_REGEX ]]; then
error_exit "Invalid header format.\n\nHeader must be in the format: <type>: <short description>\nAllowed types: ${ALLOWED_TYPES[*]}\nExample: feat: add new user authentication feature"
fi
# Only validate footer if commit type is not chore
TYPE=$(echo "$HEADER" | cut -d':' -f1)
if [ "$TYPE" != "chore" ]; then
# 3. Validate the footer (last line) of the commit message
FOOTER=$(tail -n 1 "$COMMIT_MSG_FILE")
# Regex breakdown:
# ^(ref|close) - Starts with 'ref' or 'close'
# : - Followed by a literal colon
# \s - Followed by a single space
# N25B- - Followed by the literal string 'N25B-'
# [0-9]+ - Followed by one or more digits
# $ - End of the line
FOOTER_REGEX="^(ref|close): N25B-[0-9]+$"
if ! [[ "$FOOTER" =~ $FOOTER_REGEX ]]; then
error_exit "Invalid footer format.\n\nFooter must be in the format: [ref/close]: <issue identifier>\nExample: ref: N25B-123"
fi
fi
# 4. If the message has more than 2 lines, validate the separator
# A blank line must exist between the header and the body.
LINE_COUNT=$(wc -l <"$COMMIT_MSG_FILE" | xargs) # xargs trims whitespace
# We only care if there is a body. Header + Footer = 2 lines.
# Header + Blank Line + Body... + Footer > 2 lines.
if [ "$LINE_COUNT" -gt 2 ]; then
# Get the second line
SECOND_LINE=$(sed -n '2p' "$COMMIT_MSG_FILE")
# Check if the second line is NOT empty. If it's not, it's an error.
if [ -n "$SECOND_LINE" ]; then
error_exit "Missing blank line between header and body.\n\nThe second line of your commit message must be empty if a body is present."
fi
fi
# If all checks pass, exit with success
echo -e "${GREEN}Commit message is valid.${NC}"
exit 0
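The header and footer patterns built in this hook can be sanity-checked the same way; a short sketch with made-up commit messages:

```shell
#!/usr/bin/env bash
# Rebuild the same regexes as the hook.
ALLOWED_TYPES=(feat fix refactor perf style test docs build chore revert)
TYPES_REGEX=$(
  IFS="|"
  echo "${ALLOWED_TYPES[*]}"
)
HEADER_REGEX="^($TYPES_REGEX): .+$"
FOOTER_REGEX="^(ref|close): N25B-[0-9]+$"
[[ "feat: publish face detection" =~ $HEADER_REGEX ]] && echo "header ok"
[[ "feature: wrong type" =~ $HEADER_REGEX ]] || echo "header rejected"
[[ "ref: N25B-397" =~ $FOOTER_REGEX ]] && echo "footer ok"
[[ "ref: N25B-" =~ $FOOTER_REGEX ]] || echo "footer rejected"
```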


@@ -1,16 +0,0 @@
#!/bin/sh
commit_msg_file=$1
commit_msg=$(cat "$commit_msg_file")
if echo "$commit_msg" | grep -Eq "^(feat|fix|refactor|perf|style|test|docs|build|chore|revert): .+"; then
if echo "$commit_msg" | grep -Eq "^(ref|close):\sN25B-.+"; then
exit 0
else
echo "❌ Commit message invalid! Must end with [ref/close]: N25B-000"
exit 1
fi
else
echo "❌ Commit message invalid! Must start with <type>: <description>"
exit 1
fi


@@ -1,17 +0,0 @@
#!/bin/sh
# Get current branch
branch=$(git rev-parse --abbrev-ref HEAD)
if echo "$branch" | grep -Eq "(dev|main)"; then
echo 0
fi
# allowed pattern <type/>
if echo "$branch" | grep -Eq "^(feat|fix|refactor|perf|style|test|docs|build|chore|revert)\/\w+(-\w+){0,5}$"; then
exit 0
else
echo "❌ Invalid branch name: $branch"
echo "Branch must be named <type>/<description-of-branch> (must have one to six words separated by a dash)"
exit 1
fi


@@ -1,9 +0,0 @@
#!/bin/sh
echo "#<type>: <description>
#[optional body]
#[optional footer(s)]
#[ref/close]: <issue identifier>" > $1

.gitignore vendored (5 lines)

@@ -217,3 +217,8 @@ __marimo__/
 .DS_Store
+# Docs
+docs/*
+!docs/conf.py
+!docs/installation/
+!docs/installation/**
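These negation patterns work around git's rule that files inside an excluded directory cannot be re-included unless the directory itself is re-included first, which is why `!docs/installation/` appears in addition to `!docs/installation/**`. A sketch in a throwaway repo (file names are hypothetical):

```shell
#!/usr/bin/env bash
# Reproduce the .gitignore rules above and probe them with check-ignore.
cd "$(mktemp -d)" && git init -q
printf 'docs/*\n!docs/conf.py\n!docs/installation/\n!docs/installation/**\n' > .gitignore
mkdir -p docs/installation
touch docs/conf.py docs/generated.html docs/installation/linux.md
git check-ignore -q docs/generated.html && echo "generated output ignored"
git check-ignore -q docs/conf.py || echo "conf.py tracked"
git check-ignore -q docs/installation/linux.md || echo "installation docs tracked"
```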

.gitlab-ci.yml Normal file (42 lines)

@@ -0,0 +1,42 @@
# ---------- GLOBAL SETUP ---------- #
workflow:
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
stages:
- install
- test
default:
image: qi-py-ri-base:latest
cache:
key: "${CI_COMMIT_REF_SLUG}"
paths:
- .venv/
policy: pull-push
# --------- INSTALLING --------- #
install:
stage: install
tags:
- install
script:
- python -m virtualenv .venv
- source .venv/bin/activate
- echo /qi/pynaoqi-python2.7-2.5.7.1-linux64/lib/python2.7/site-packages/ > .venv/lib/python2.7/site-packages/pynaoqi-python2.7.pth
- pip install -r requirements.txt
artifacts:
paths:
- .venv/
expire_in: 1h
# ---------- TESTING ---------- #
test:
stage: test
needs:
- install
tags:
- test
script:
- source .venv/bin/activate
- PYTHONPATH=src pytest test/

.pre-commit-config.yaml Normal file (15 lines)

@@ -0,0 +1,15 @@
repos:
- repo: local
hooks:
- id: check-commit-msg
name: Check commit message format
entry: .githooks/check-commit-msg.sh
language: script
stages: [commit-msg]
- id: check-branch-name
name: Check branch name format
entry: .githooks/check-branch-name.sh
language: script
stages: [commit]
always_run: true
pass_filenames: false

README.md (118 lines)

@@ -8,78 +8,21 @@ This is an implementation for the Pepper robot, using the Pepper SDK and Python .
 ## Installation
-### Linux (or WSL)
+- [Linux](./docs/installation/linux.md)
+- [macOS](./docs/installation/macos.md)
+- [Windows](./docs/installation/windows.md)
-Start off by installing [Pyenv](https://github.com/pyenv/pyenv?tab=readme-ov-file#installation) and walk through the steps outlined there (be sure to also add it to PATH). Also install the [Python build requirements](https://github.com/pyenv/pyenv/wiki#suggested-build-environment). Afterwards, install Python 2.7 and activate it for your current shell:
+### Git Hooks
+To activate automatic linting, formatting, branch name checks and commit message checks, run (after installing requirements):
 ```bash
-pyenv install 2.7
-pyenv shell 2.7
+pre-commit install
+pre-commit install --hook-type commit-msg
 ```
-You can check that this worked by typing
-```bash
-python -V
-```
-Which should return `Python 2.7.18`.
-Next, `cd` into this repository and create (and activate) a virtual environment:
-```bash
-cd <path to project>/
-python -m pip install virtualenv
-python -m virtualenv .venv
-source .venv/bin/activate
-```
-Install the required packages with
-```bash
-pip install -r requirements.txt
-```
-Now we need to install the NaoQi SDK into our virtual environment, which we need to do manually. Begin by downloading the SDK:
-```bash
-wget https://community-static.aldebaran.com/resources/2.5.10/Python%20SDK/pynaoqi-python2.7-2.5.7.1-linux64.tar.gz
-```
-Next, move into the `site-packages` directory and extract the file you just downloaded:
-```bash
-cd .venv/lib/python2.7/site-packages/
-tar xvfz <path to SDK>/pynaoqi-python2.7-2.5.7.1-linux64.tar.gz
-rm <path to SDK>/pynaoqi-python2.7-2.5.7.1-linux64.tar.gz
-```
-Lastly, we need to inform our virtual environment where to find our newly installed package:
-```bash
-echo <path to project>/.venv/lib/python2.7/site-packages/pynaoqi-python2.7-2.5.7.1-linux64/lib/python2.7/site-packages/ > pynaoqi-python2.7.pth
-```
-That's it! Verify that it works with
-```bash
-python -c "import qi; print(qi)"
-```
-You should now be able to run this project.
-### macOS
-Similar to Linux, but don't bother installing `pyenv` as it won't be able to install Python 2 on Apple Silicon. Instead, install Python 2.7.18 from the [Python website](https://www.python.org/downloads/release/python-2718/).
-Create the virtual environment as described above in the Linux section. Stop at the point where it shows you how to download the NaoQi SDK. Instead, use:
-```shell
-curl -OL https://community-static.aldebaran.com/resources/2.5.10/Python%20SDK/pynaoqi-python2.7-2.5.7.1-mac64.tar.gz
-```
-Then resume the steps from above.
 ## Usage
@@ -96,8 +39,15 @@ On Windows:
 $env:PYTHONPATH="src"; python -m robot_interface.main
 ```
-With both, if you want to connect to the actual robot (or simulator), pass the `--qi-url` argument.
+### Program Arguments
+If you want to connect to the actual robot (or simulator), pass the `--qi-url` argument.
+There's also a `--microphone` argument that can be used to choose a microphone to use. If not given, the program will try the default microphone. If you don't know the name of the microphone, pass the argument with any value, and it will list the names of available microphones.
+### Environment Variables
+You may use environment variables to change settings. Make a copy of the [`.env.example`](.env.example) file, name it `.env` and put it in the root directory. The file itself describes how to do the configuration.
 ## Testing
@@ -120,15 +70,29 @@ For coverage, add `--cov=robot_interface` as an argument to `pytest`.
-## GitHooks
+## Documentation
+Generate documentation web pages using:
-To activate automatic commits/branch name checks run:
-```shell
-git config --local core.hooksPath .githooks
+### Linux & macOS
+```bash
+PYTHONPATH=src sphinx-apidoc -F -o docs src/robot_interface
 ```
-If your commit fails its either:
-branch name != <type>/description-of-branch ,
-commit name != <type>: description of the commit.
-<ref>: N25B-Num's
+### Windows
+```bash
+$env:PYTHONPATH="src"; sphinx-apidoc -F -o docs src/robot_interface
+```
+Optionally, in the `conf.py` file in the new `docs` folder, change preferences.
+In the `docs` folder:
+### Linux & macOS
+```bash
+make html
+```
+### Windows
+```bash
+.\make.bat html
+```

docs/conf.py Normal file (184 lines)

@@ -0,0 +1,184 @@
# -*- coding: utf-8 -*-
#
# Configuration file for the Sphinx documentation builder.
#
# This file does only contain a selection of the most common options. For a
# full list see the documentation:
# http://www.sphinx-doc.org/en/master/config
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath("../src"))
# -- Project information -----------------------------------------------------
project = u'robot_interface'
copyright = u'2025, Author'
author = u'Author'
# The short X.Y version
version = u''
# The full version, including alpha/beta/rc tags
release = u''
# -- General configuration ---------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.viewcode',
'sphinx.ext.todo',
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = 'en'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = None
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'sphinx_rtd_theme'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# Custom sidebar templates, must be a dictionary that maps document names
# to template names.
#
# The default sidebars (for documents that don't match any pattern) are
# defined by theme itself. Builtin themes are using these templates by
# default: ``['localtoc.html', 'relations.html', 'sourcelink.html',
# 'searchbox.html']``.
#
# html_sidebars = {}
# -- Options for HTMLHelp output ---------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = 'robot_interfacedoc'
# -- Options for LaTeX output ------------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'robot_interface.tex', u'robot\\_interface Documentation',
u'Author', 'manual'),
]
# -- Options for manual page output ------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'robot_interface', u'robot_interface Documentation',
[author], 1)
]
# -- Options for Texinfo output ----------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'robot_interface', u'robot_interface Documentation',
author, 'robot_interface', 'One line description of project.',
'Miscellaneous'),
]
# -- Options for Epub output -------------------------------------------------
# Bibliographic Dublin Core info.
epub_title = project
# The unique identifier of the text. This can be a ISBN number
# or the project homepage.
#
# epub_identifier = ''
# A unique identification for the text.
#
# epub_uid = ''
# A list of files that should not be packed into the epub file.
epub_exclude_files = ['search.html']
# -- Extension configuration -------------------------------------------------
# -- Options for todo extension ----------------------------------------------
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = True

docs/installation/linux.md Normal file (75 lines)

@@ -0,0 +1,75 @@
# Installation
Of the Pepper Robot Interface on Linux (or WSL).
Start off by installing [Pyenv](https://github.com/pyenv/pyenv?tab=readme-ov-file#installation) and walk through the steps outlined there (be sure to also add it to PATH). Also install the [Python build requirements](https://github.com/pyenv/pyenv/wiki#suggested-build-environment). Afterwards, install Python 2.7 and activate it for your current shell:
```bash
pyenv install 2.7
pyenv shell 2.7
```
You can check that this worked by typing
```bash
python -V
```
Which should return `Python 2.7.18`.
Next, `cd` into this repository and create (and activate) a virtual environment:
```bash
cd <path to project>/
python -m pip install virtualenv
python -m virtualenv .venv
source .venv/bin/activate
```
We depend on PortAudio for the `pyaudio` package, so install it with:
```bash
sudo apt install -y portaudio19-dev
```
On WSL, also install:
```bash
sudo apt install -y libasound2-plugins
```
Install the required packages with
```bash
pip install -r requirements.txt
```
Now we need to install the NaoQi SDK into our virtual environment, which we need to do manually. Begin by downloading the SDK:
```bash
wget https://community-static.aldebaran.com/resources/2.5.10/Python%20SDK/pynaoqi-python2.7-2.5.7.1-linux64.tar.gz
```
Next, move into the `site-packages` directory and extract the file you just downloaded:
```bash
cd .venv/lib/python2.7/site-packages/
tar xvfz <path to SDK>/pynaoqi-python2.7-2.5.7.1-linux64.tar.gz
rm <path to SDK>/pynaoqi-python2.7-2.5.7.1-linux64.tar.gz
```
Lastly, we need to inform our virtual environment where to find our newly installed package:
```bash
echo <path to project>/.venv/lib/python2.7/site-packages/pynaoqi-python2.7-2.5.7.1-linux64/lib/python2.7/site-packages/ > pynaoqi-python2.7.pth
```
That's it! Verify that it works with
```bash
python -c "import qi; print(qi)"
```
You should now be able to run this project.
See the README for how to run.
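The `echo ... > pynaoqi-python2.7.pth` step above works because Python treats each line of a `.pth` file in `site-packages` as a path to append to `sys.path`. A self-contained sketch of that mechanism (the stub names are hypothetical, and Python 3 is used only for the demo; the project itself runs on Python 2.7):

```shell
#!/usr/bin/env bash
# Build a fake "SDK" directory and expose it through a .pth file.
cd "$(mktemp -d)"
mkdir -p sdk/lib site
printf 'print("stub qi loaded")\n' > sdk/lib/qi_stub.py
echo "$PWD/sdk/lib" > site/pynaoqi-demo.pth
# site.addsitedir() processes .pth files the same way an activated venv does.
python3 -c "import site; site.addsitedir('site'); import qi_stub"
```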

docs/installation/macos.md Normal file (106 lines)

@@ -0,0 +1,106 @@
# Installation
Of the Pepper Robot Interface on macOS.
## Python 2.7
Install Python 2.7.18 from the [Python website](https://www.python.org/downloads/release/python-2718/).
Check that it worked by executing
```shell
python2 -V
```
Which should return Python 2.7.18.
## Virtual Environment
Next, cd into this repository and create (and activate) a virtual environment:
```shell
cd /path/to/project/
python2 -m pip install virtualenv
python2 -m virtualenv .venv
source .venv/bin/activate
```
We depend on PortAudio for the `pyaudio` package. If on Intel, run `brew install portaudio`. If on Apple Silicon, compile manually using the steps described in [the YouTrack article](https://utrechtuniversity.youtrack.cloud/articles/N25B-A-22/Install-PyAudio-for-Python-2-on-Apple-Silicon).
Then install the required Python packages with
```shell
pip install -r requirements.txt
```
## NaoQi SDK
We need to manually install the NaoQi SDK into our virtual environment. There are two options:
1. Install a newer version (2.8) which will make running easier, but compatibility is uncertain.
2. Install the version expected by the robot (2.5). This will complicate running slightly.
### Option 1
Download the SDK from [twirre.io](https://twirre.io/files/pynaoqi-python2.7-2.8.6.23-mac64-20191127_144231.tar.gz), from the Aldebaran website, or from an archived version on the Web Archive.
Extract it to `/path/to/project/.venv/lib/python2.7/site-packages/`.
We need to inform our virtual environment where to find our newly installed package:
```bash
echo "/path/to/project/.venv/lib/python2.7/site-packages/pynaoqi-python2.7-2.8.6.23-mac64-20191127_144231/lib/python2.7/site-packages/" > /path/to/project/.venv/lib/python2.7/site-packages/pynaoqi-python2.7.pth
```
Now continue with [verifying](#verifying).
### Option 2
This method of installation requires setting the `DYLD_LIBRARY_PATH` environment variable before running; how to do so is explained below.
Download the SDK from [twirre.io](https://twirre.io/files/pynaoqi-2.5.7.1-mac64-deps.tar.gz). This is a modified version of the Aldebaran SDK that includes the required Choregraphe dependencies.
Extract it to `/path/to/project/.venv/lib/python2.7/site-packages/`.
We need to inform our virtual environment where to find our newly installed package:
```shell
echo "/path/to/project/.venv/lib/python2.7/site-packages/pynaoqi-python2.7-2.5.7.1-mac64/lib/python2.7/site-packages/" > /path/to/project/.venv/lib/python2.7/site-packages/pynaoqi-python2.7.pth
```
Now, anytime before running you need to set the `DYLD_LIBRARY_PATH` environment variable.
```shell
export DYLD_LIBRARY_PATH="/path/to/project/.venv/lib/python2.7/site-packages/pynaoqi-python2.7-2.5.7.1-mac64/choregraphe_lib:${DYLD_LIBRARY_PATH}"
```
You may want to simplify environment activation with a script `activate.sh` like:
```shell
#!/bin/zsh
export DYLD_LIBRARY_PATH="/path/to/project/.venv/lib/python2.7/site-packages/pynaoqi-python2.7-2.5.7.1-mac64/choregraphe_lib:${DYLD_LIBRARY_PATH}"
source .venv/bin/activate
```
Now [verify](#verifying) that it works.
## Verifying
Verify that the NaoQi SDK installation works with
```bash
python -c "import qi; print(qi)"
```
If so, you should now be able to run this project.
See the README for how to run.


@@ -0,0 +1,44 @@
# Installation
Of the Pepper Robot Interface on Windows.
Install Python 2.7.18 from [the Python website](https://www.python.org/downloads/release/python-2718/); choose the x86-64 installer (at the bottom of the page).
To see if it worked:
```shell
py -2 -V
```
Which should return `Python 2.7.18`.
Next, `cd` into this repository and create (and activate) a virtual environment:
```bash
cd <path to project>/
py -2 -m pip install virtualenv
py -2 -m virtualenv .venv
.\.venv\Scripts\activate
```
Install the required packages with
```bash
pip install -r requirements.txt
```
Next, we need to manually install the NaoQi SDK into our virtual environment. Download the SDK from [Aldebaran](https://community-static.aldebaran.com/resources/2.5.5/sdk-python/pynaoqi-python2.7-2.5.5.5-win32-vs2013.zip), [Web Archive](https://web.archive.org/web/20240120111043/https://community-static.aldebaran.com/resources/2.5.5/sdk-python/pynaoqi-python2.7-2.5.5.5-win32-vs2013.zip) or [twirre.io](https://twirre.io/files/pynaoqi-python2.7-2.8.6.23-win64-vs2015-20191127_152649.zip).
Extract to `.\.venv\Lib\site-packages`.
Create a file `.venv\Lib\site-packages\pynaoqi-python2.7.pth` and put in it the full path of the extracted SDK's `lib\python2.7\Lib\site-packages` directory (for the twirre.io download, that is `pynaoqi-python2.7-2.8.6.23-win64-vs2015-20191127_152649\lib\python2.7\Lib\site-packages`).
Test if it worked by running:
```bash
python -c "import qi; print(qi)"
```
You should now be able to run this project.
See the README for how to run.


@@ -3,3 +3,7 @@ pyaudio<=0.2.11
pytest<5
pytest-mock<3.0.0
pytest-cov<3.0.0
sphinx
sphinx_rtd_theme
pre-commit
python-dotenv


@@ -0,0 +1,129 @@
from __future__ import unicode_literals
from robot_interface.utils.get_config import get_config
class AgentSettings(object):
"""
Agent port configuration.
:ivar control_backend_host: Hostname of the control backend, defaults to "localhost".
:vartype control_backend_host: str
:ivar actuation_receiver_port: Port for receiving actuation commands, defaults to 5557.
:vartype actuation_receiver_port: int
:ivar main_receiver_port: Port for receiving main messages, defaults to 5555.
:vartype main_receiver_port: int
:ivar video_sender_port: Port used for sending video frames, defaults to 5556.
:vartype video_sender_port: int
:ivar audio_sender_port: Port used for sending audio data, defaults to 5558.
:vartype audio_sender_port: int
:ivar face_detection_port: Port used for sending face detection events, defaults to 5559.
:vartype face_detection_port: int
:ivar face_detection_interval: Time between face detection events, defaults to 1000 ms.
:vartype face_detection_interval: int
"""
def __init__(
self,
control_backend_host=None,
actuation_receiver_port=None,
main_receiver_port=None,
video_sender_port=None,
audio_sender_port=None,
face_detection_port=None,
face_detection_interval=None,
):
self.control_backend_host = get_config(control_backend_host, "AGENT__CONTROL_BACKEND_HOST", "localhost")
self.actuation_receiver_port = get_config(actuation_receiver_port, "AGENT__ACTUATION_RECEIVER_PORT", 5557, int)
self.main_receiver_port = get_config(main_receiver_port, "AGENT__MAIN_RECEIVER_PORT", 5555, int)
self.video_sender_port = get_config(video_sender_port, "AGENT__VIDEO_SENDER_PORT", 5556, int)
self.audio_sender_port = get_config(audio_sender_port, "AGENT__AUDIO_SENDER_PORT", 5558, int)
self.face_detection_port = get_config(face_detection_port, "AGENT__FACE_DETECTION_PORT", 5559, int)
self.face_detection_interval = get_config(face_detection_interval, "AGENT__FACE_DETECTION_INTERVAL", 1000, int)
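The implementation of `get_config` is not shown in this diff; a minimal equivalent consistent with how it is called above might look like the sketch below. The precedence order (explicit argument beats environment variable beats default, with an optional cast) is an assumption based on the call sites:

```python
import os

def get_config(explicit, env_var, default, cast=None):
    # Assumed precedence: explicit argument > environment variable > default.
    if explicit is not None:
        return explicit
    value = os.environ.get(env_var)
    if value is None:
        return default
    return cast(value) if cast else value

os.environ["AGENT__MAIN_RECEIVER_PORT"] = "6000"
print(get_config(None, "AGENT__MAIN_RECEIVER_PORT", 5555, int))  # 6000 (env var wins over default)
print(get_config(7777, "AGENT__MAIN_RECEIVER_PORT", 5555, int))  # 7777 (explicit argument wins)
print(get_config(None, "AGENT__UNSET_VAR", 5555, int))           # 5555 (default)
```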
class VideoConfig(object):
"""
Video configuration constants.
:ivar camera_index: Index of the camera used, defaults to 0.
:vartype camera_index: int
:ivar resolution: Video resolution mode, defaults to 2.
:vartype resolution: int
:ivar color_space: Color space identifier, defaults to 11.
:vartype color_space: int
:ivar fps: Frames per second of the video stream, defaults to 15.
:vartype fps: int
:ivar stream_name: Name of the video stream, defaults to "Pepper Video".
:vartype stream_name: str
:ivar image_buffer: Internal buffer size for video frames, defaults to 6.
:vartype image_buffer: int
"""
def __init__(
self,
camera_index=None,
resolution=None,
color_space=None,
fps=None,
stream_name=None,
image_buffer=None,
):
self.camera_index = get_config(camera_index, "VIDEO__CAMERA_INDEX", 0, int)
self.resolution = get_config(resolution, "VIDEO__RESOLUTION", 2, int)
self.color_space = get_config(color_space, "VIDEO__COLOR_SPACE", 11, int)
self.fps = get_config(fps, "VIDEO__FPS", 15, int)
self.stream_name = get_config(stream_name, "VIDEO__STREAM_NAME", "Pepper Video")
self.image_buffer = get_config(image_buffer, "VIDEO__IMAGE_BUFFER", 6, int)
class AudioConfig(object):
"""
Audio configuration constants.
:ivar sample_rate: Audio sampling rate in Hz, defaults to 16000.
:vartype sample_rate: int
:ivar chunk_size: Size of audio chunks to capture/process, defaults to 512.
:vartype chunk_size: int
:ivar channels: Number of audio channels, defaults to 1.
:vartype channels: int
"""
def __init__(self, sample_rate=None, chunk_size=None, channels=None):
self.sample_rate = get_config(sample_rate, "AUDIO__SAMPLE_RATE", 16000, int)
self.chunk_size = get_config(chunk_size, "AUDIO__CHUNK_SIZE", 512, int)
self.channels = get_config(channels, "AUDIO__CHANNELS", 1, int)
class MainConfig(object):
"""
Main system configuration.
:ivar poll_timeout_ms: Timeout for polling events, in milliseconds, defaults to 100.
:vartype poll_timeout_ms: int
:ivar max_handler_time_ms: Maximum allowed handler time, in milliseconds, defaults to 50.
:vartype max_handler_time_ms: int
"""
def __init__(self, poll_timeout_ms=None, max_handler_time_ms=None):
self.poll_timeout_ms = get_config(poll_timeout_ms, "MAIN__POLL_TIMEOUT_MS", 100, int)
self.max_handler_time_ms = get_config(max_handler_time_ms, "MAIN__MAX_HANDLER_TIME_MS", 50, int)
class Settings(object):
"""
Global settings container.
:ivar agent_settings: Agent-related port configuration.
:vartype agent_settings: AgentSettings
:ivar video_config: Video stream configuration.
:vartype video_config: VideoConfig
:ivar audio_config: Audio stream configuration.
:vartype audio_config: AudioConfig
:ivar main_config: Main system-level configuration.
:vartype main_config: MainConfig
"""
def __init__(self, agent_settings=None, video_config=None, audio_config=None, main_config=None):
self.agent_settings = agent_settings or AgentSettings()
self.video_config = video_config or VideoConfig()
self.audio_config = audio_config or AudioConfig()
self.main_config = main_config or MainConfig()
settings = Settings()


@@ -1,29 +1,49 @@
from __future__ import unicode_literals # So that we can log texts with Unicode characters
import logging
from threading import Thread
import Queue
import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state
from robot_interface.core.config import settings
from robot_interface.endpoints.gesture_settings import GestureTags
class ActuationReceiver(ReceiverBase):
"""
The actuation receiver endpoint, responsible for handling speech and gesture requests.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param port: The port to use.
:type port: int
:ivar _tts_service: The text-to-speech service object from the Qi session.
:vartype _tts_service: qi.Session | None
:ivar _animation_service: The animation/gesture service object from the Qi session.
:vartype _animation_service: qi.Session | None
"""
def __init__(self, zmq_context, port=settings.agent_settings.actuation_receiver_port):
super(ActuationReceiver, self).__init__("actuation")
self.create_socket(zmq_context, zmq.SUB, port)
self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options
self._tts_service = None
self._al_memory = None
self._animation_service = None
self._message_queue = Queue.Queue()
self.message_thread = Thread(target=self._handle_messages)
self.message_thread.start()
def _handle_speech(self, message):
"""
Handle a speech actuation request.
:param message: The message to handle, must contain properties "endpoint" and "data".
:type message: dict
"""
text = message.get("data")
if not text:
logging.warning("Received message to speak, but it lacks data.")
@@ -41,26 +61,108 @@ class ActuationReceiver(ReceiverBase):
if not self._tts_service:
self._tts_service = state.qi_session.service("ALTextToSpeech")
if not self._al_memory:
self._al_memory = state.qi_session.service("ALMemory")
# Subscribe to speech end event
self.status_subscriber = self._al_memory.subscriber("ALTextToSpeech/Status") # self because garbage collect
self.status_subscriber.signal.connect(self._on_status_changed)
if message.get("is_priority"):
# Bypass queue and speak immediately
self.clear_queue()
self._message_queue.put(text)
logging.debug("Force speaking immediately: {}".format(text))
else:
self._message_queue.put(text)
def clear_queue(self):
"""
Safely drains all pending messages from the queue.
"""
logging.info("Message queue size: {}".format(self._message_queue.qsize()))
try:
while True:
# Remove items one by one without waiting
self._message_queue.get_nowait()
except Queue.Empty:
pass
logging.info("Message queue cleared.")
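The drain loop above can be exercised standalone; this sketch uses the same `get_nowait`/`Empty` pattern (the stdlib module is named `Queue` on the Python 2 this project targets, `queue` on Python 3):

```python
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

q = queue.Queue()
for text in ["hello", "world", "urgent"]:
    q.put(text)

# Remove items one by one without blocking, until the queue is empty.
try:
    while True:
        q.get_nowait()
except queue.Empty:
    pass

print(q.qsize())  # 0
```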
@staticmethod
def _on_status_changed(value): # value will contain either 'enqueued', 'started' or 'done' depending on the status
"""Callback function for when the speaking status changes. Will change the is_speaking value of the state."""
if "started" in value:
logging.debug("Started speaking.")
state.is_speaking = True
if "done" in value:
logging.debug("Done speaking.")
state.is_speaking = False
def _handle_gesture(self, message, is_single):
"""
Handle a gesture actuation request.
:param message: The gesture to do, must contain properties "endpoint" and "data".
:type message: dict
:param is_single: Whether it's a specific single gesture or a gesture tag.
:type is_single: bool
"""
gesture = message.get("data")
if not gesture:
logging.warning("Received gesture to do, but it lacks data.")
return
if not isinstance(gesture, (str, unicode)):
logging.warning("Received gesture to do, but it is not a string.")
return
logging.debug("Received gesture to do: {}".format(gesture))
if is_single:
if gesture not in GestureTags.single_gestures:
logging.warning("Received single gesture to do, but it does not exist in settings.")
return
else:
if gesture not in GestureTags.tags:
logging.warning("Received gesture tag to do, but it does not exist in settings.")
return
if not state.qi_session: return
# If state has a qi_session, we know that we can import qi
import qi # Takes a while only the first time it's imported
if not self._animation_service:
self._animation_service = state.qi_session.service("ALAnimationPlayer")
# Play the gesture. Pepper comes with predefined animations like "Wave", "Greet", "Clap"
# You can also create custom animations using Choregraphe and upload them to the robot.
if is_single:
logging.debug("Playing single gesture: {}".format(gesture))
getattr(qi, "async")(self._animation_service.run, gesture)
else:
logging.debug("Playing tag gesture: {}".format(gesture))
getattr(qi, "async")(self._animation_service.runTag, gesture)
def handle_message(self, message):
"""
Handle an actuation/speech message with the receiver.
:param message: The message to handle, must contain properties "endpoint" and "data".
:type message: dict
"""
if message["endpoint"] == "actuate/speech":
self._handle_speech(message)
elif message["endpoint"] == "actuate/gesture/tag":
self._handle_gesture(message, False)
elif message["endpoint"] == "actuate/gesture/single":
self._handle_gesture(message, True)
def _handle_messages(self):
while not state.exit_event.is_set():
try:
text = self._message_queue.get(timeout=0.1)
state.is_speaking = True
self._tts_service.say(text)
except Queue.Empty:
state.is_speaking = False
except RuntimeError:
logging.error("Lost connection to Pepper. Please check if you're connected to the "
"local WiFi and restart this application.")
state.exit_event.set()
def endpoint_description(self):
"""
Extend the default endpoint description with gesture tags.
Returned during negotiate/ports so the CB knows available gestures.
"""
desc = super(ActuationReceiver, self).endpoint_description()
desc["gestures"] = GestureTags.tags
desc["single_gestures"] = GestureTags.single_gestures
return desc


@@ -7,23 +7,49 @@ import zmq
from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state
from robot_interface.utils.microphone import choose_mic
from robot_interface.core.config import settings
logger = logging.getLogger(__name__)
class AudioSender(SocketBase):
def __init__(self, zmq_context, port=5558):
"""
Audio sender endpoint, responsible for sending microphone audio data.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param port: The port to use.
:type port: int
:ivar thread: Thread used for sending audio.
:vartype thread: threading.Thread | None
:ivar audio: PyAudio instance.
:vartype audio: pyaudio.PyAudio | None
:ivar microphone: Selected microphone information.
:vartype microphone: dict | None
"""
def __init__(self, zmq_context, port=settings.agent_settings.audio_sender_port):
super(AudioSender, self).__init__(str("audio")) # Convert future's unicode_literal to str
self.create_socket(zmq_context, zmq.PUB, port)
self.thread = None
try:
self.audio = pyaudio.PyAudio()
self.microphone = choose_mic(self.audio)
except IOError as e:
logger.warning("PyAudio is not available.", exc_info=e)
self.audio = None
self.microphone = None
def start(self):
"""
Start sending audio in a different thread.
Will not start if no microphone is available.
"""
if not self.microphone:
logger.info("Not listening: no microphone available.")
@@ -35,21 +61,26 @@ class AudioSender(SocketBase):
def wait_until_done(self):
"""
Wait until the audio thread is done.
Will block until `state.exit_event` is set. If the thread is not running, does nothing.
"""
if not self.thread: return
self.thread.join()
self.thread = None
def _stream(self):
"""
Internal method to continuously read audio from the microphone and send it over the socket.
"""
audio_settings = settings.audio_config
chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD
# Docs say this only raises an error if neither `input` nor `output` is True
stream = self.audio.open(
format=pyaudio.paFloat32,
channels=audio_settings.channels,
rate=audio_settings.sample_rate,
input=True,
input_device_index=self.microphone["index"],
frames_per_buffer=chunk,
@@ -57,14 +88,8 @@ class AudioSender(SocketBase):
try:
while not state.exit_event.is_set():
data = stream.read(chunk)
if state.is_speaking: continue  # Do not send audio while the robot is speaking
self.socket.send(data)
except IOError as e:
logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e)
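The chunk-size comment above can be checked with a quick calculation: a chunk of `n` frames at `sample_rate` Hz lasts `n / sample_rate` seconds, so 320 frames at 16000 Hz is 20 ms and the default 512-frame chunks are 32 ms:

```python
# Duration of one audio chunk in milliseconds: frames / sample_rate * 1000.
sample_rate = 16000  # Hz, the AUDIO__SAMPLE_RATE default
for frames in (320, 512):
    duration_ms = frames * 1000.0 / sample_rate
    print(frames, duration_ms)  # 320 -> 20.0, 512 -> 32.0
```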


@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals
import json
import logging
import threading
import time
import zmq
from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state
from robot_interface.core.config import settings
class FaceDetectionSender(SocketBase):
"""
Face detection endpoint.
Subscribes to and polls ALMemory["FaceDetected"], sends events to CB.
"""
def __init__(self, zmq_context, port=settings.agent_settings.face_detection_port):
super(FaceDetectionSender, self).__init__("face")
self.create_socket(zmq_context, zmq.PUB, port)
self._face_service = None
self._memory_service = None
self._face_thread = None
def start_face_detection(self):
if not state.qi_session:
logging.warning("No Qi session available. Face detection not started.")
return
self._face_service = state.qi_session.service("ALFaceDetection")
self._memory_service = state.qi_session.service("ALMemory")
self._face_service.setTrackingEnabled(False)
self._face_service.setRecognitionEnabled(False)
self._face_service.subscribe(
"FaceDetectionSender",
settings.agent_settings.face_detection_interval,
0.0,
)
self._face_thread = threading.Thread(target=self._face_loop)
self._face_thread.start()
logging.info("Face detection started.")
def _face_loop(self):
"""
Continuously send face detected to the CB, at the interval set in the
``start_face_detection`` method.
"""
while not state.exit_event.is_set():
try:
value = self._memory_service.getData("FaceDetected", 0)
face_present = bool(
value
and len(value) > 1
and value[1]
and value[1][0]
and len(value[1][0]) > 0
)
self.socket.send(json.dumps({"face_detected": face_present}).encode("utf-8"))
except Exception:
logging.exception("Error reading FaceDetected")
time.sleep(settings.agent_settings.face_detection_interval / 1000.0)
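The nested truthiness check in `_face_loop` can be illustrated with representative values. The sample structures below are hypothetical stand-ins for ALMemory's `FaceDetected` value, which is empty when no face is visible and otherwise nests per-face data under index 1:

```python
def face_present(value):
    # Same chain of checks as in _face_loop above, wrapped in bool().
    return bool(value and len(value) > 1 and value[1] and value[1][0] and len(value[1][0]) > 0)

no_face = []  # ALMemory returns an empty value when no face is seen
one_face = [[12345, 67890], [[["face-info"], ["extra"]], []]]  # hypothetical sample shape
print(face_present(no_face))   # False
print(face_present(one_face))  # True
```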
def stop_face_detection(self):
try:
if self._face_service:
self._face_service.unsubscribe("FaceDetectionSender")
self._face_service.setTrackingEnabled(False)
logging.info("Face detection stopped.")
except Exception:
logging.warning("Error during face detection cleanup.")
def close(self):
super(FaceDetectionSender, self).close()
self.stop_face_detection()


@@ -0,0 +1,412 @@
class GestureTags:
tags = ["above", "affirmative", "afford", "agitated", "all", "allright", "alright", "any",
"assuage", "attemper", "back", "bashful", "beg", "beseech", "blank",
"body language", "bored", "bow", "but", "call", "calm", "choose", "choice", "cloud",
"cogitate", "cool", "crazy", "disappointed", "down", "earth", "empty", "embarrassed",
"enthusiastic", "entire", "estimate", "except", "exalted", "excited", "explain", "far",
"field", "floor", "forlorn", "friendly", "front", "frustrated", "gentle", "gift",
"give", "ground", "happy", "hello", "her", "here", "hey", "hi", "him", "hopeless",
"hysterical", "I", "implore", "indicate", "joyful", "me", "meditate", "modest",
"negative", "nervous", "no", "not know", "nothing", "offer", "ok", "once upon a time",
"oppose", "or", "pacify", "pick", "placate", "please", "present", "proffer", "quiet",
"reason", "refute", "reject", "rousing", "sad", "select", "shamefaced", "show",
"show sky", "sky", "soothe", "sun", "supplicate", "tablet", "tall", "them", "there",
"think", "timid", "top", "unless", "up", "upstairs", "void", "warm", "winner", "yeah",
"yes", "yoo-hoo", "you", "your", "zero", "zestful"]
single_gestures = [
"animations/Stand/BodyTalk/Listening/Listening_1",
"animations/Stand/BodyTalk/Listening/Listening_2",
"animations/Stand/BodyTalk/Listening/Listening_3",
"animations/Stand/BodyTalk/Listening/Listening_4",
"animations/Stand/BodyTalk/Listening/Listening_5",
"animations/Stand/BodyTalk/Listening/Listening_6",
"animations/Stand/BodyTalk/Listening/Listening_7",
"animations/Stand/BodyTalk/Speaking/BodyTalk_1",
"animations/Stand/BodyTalk/Speaking/BodyTalk_10",
"animations/Stand/BodyTalk/Speaking/BodyTalk_11",
"animations/Stand/BodyTalk/Speaking/BodyTalk_12",
"animations/Stand/BodyTalk/Speaking/BodyTalk_13",
"animations/Stand/BodyTalk/Speaking/BodyTalk_14",
"animations/Stand/BodyTalk/Speaking/BodyTalk_15",
"animations/Stand/BodyTalk/Speaking/BodyTalk_16",
"animations/Stand/BodyTalk/Speaking/BodyTalk_2",
"animations/Stand/BodyTalk/Speaking/BodyTalk_3",
"animations/Stand/BodyTalk/Speaking/BodyTalk_4",
"animations/Stand/BodyTalk/Speaking/BodyTalk_5",
"animations/Stand/BodyTalk/Speaking/BodyTalk_6",
"animations/Stand/BodyTalk/Speaking/BodyTalk_7",
"animations/Stand/BodyTalk/Speaking/BodyTalk_8",
"animations/Stand/BodyTalk/Speaking/BodyTalk_9",
"animations/Stand/BodyTalk/Thinking/Remember_1",
"animations/Stand/BodyTalk/Thinking/Remember_2",
"animations/Stand/BodyTalk/Thinking/Remember_3",
"animations/Stand/BodyTalk/Thinking/ThinkingLoop_1",
"animations/Stand/BodyTalk/Thinking/ThinkingLoop_2",
"animations/Stand/Emotions/Negative/Angry_1",
"animations/Stand/Emotions/Negative/Angry_2",
"animations/Stand/Emotions/Negative/Angry_3",
"animations/Stand/Emotions/Negative/Angry_4",
"animations/Stand/Emotions/Negative/Anxious_1",
"animations/Stand/Emotions/Negative/Bored_1",
"animations/Stand/Emotions/Negative/Bored_2",
"animations/Stand/Emotions/Negative/Disappointed_1",
"animations/Stand/Emotions/Negative/Exhausted_1",
"animations/Stand/Emotions/Negative/Exhausted_2",
"animations/Stand/Emotions/Negative/Fear_1",
"animations/Stand/Emotions/Negative/Fear_2",
"animations/Stand/Emotions/Negative/Fearful_1",
"animations/Stand/Emotions/Negative/Frustrated_1",
"animations/Stand/Emotions/Negative/Humiliated_1",
"animations/Stand/Emotions/Negative/Hurt_1",
"animations/Stand/Emotions/Negative/Hurt_2",
"animations/Stand/Emotions/Negative/Late_1",
"animations/Stand/Emotions/Negative/Sad_1",
"animations/Stand/Emotions/Negative/Sad_2",
"animations/Stand/Emotions/Negative/Shocked_1",
"animations/Stand/Emotions/Negative/Sorry_1",
"animations/Stand/Emotions/Negative/Surprise_1",
"animations/Stand/Emotions/Negative/Surprise_2",
"animations/Stand/Emotions/Negative/Surprise_3",
"animations/Stand/Emotions/Neutral/Alienated_1",
"animations/Stand/Emotions/Neutral/AskForAttention_1",
"animations/Stand/Emotions/Neutral/AskForAttention_2",
"animations/Stand/Emotions/Neutral/AskForAttention_3",
"animations/Stand/Emotions/Neutral/Cautious_1",
"animations/Stand/Emotions/Neutral/Confused_1",
"animations/Stand/Emotions/Neutral/Determined_1",
"animations/Stand/Emotions/Neutral/Embarrassed_1",
"animations/Stand/Emotions/Neutral/Hesitation_1",
"animations/Stand/Emotions/Neutral/Innocent_1",
"animations/Stand/Emotions/Neutral/Lonely_1",
"animations/Stand/Emotions/Neutral/Mischievous_1",
"animations/Stand/Emotions/Neutral/Puzzled_1",
"animations/Stand/Emotions/Neutral/Sneeze",
"animations/Stand/Emotions/Neutral/Stubborn_1",
"animations/Stand/Emotions/Neutral/Suspicious_1",
"animations/Stand/Emotions/Positive/Amused_1",
"animations/Stand/Emotions/Positive/Confident_1",
"animations/Stand/Emotions/Positive/Ecstatic_1",
"animations/Stand/Emotions/Positive/Enthusiastic_1",
"animations/Stand/Emotions/Positive/Excited_1",
"animations/Stand/Emotions/Positive/Excited_2",
"animations/Stand/Emotions/Positive/Excited_3",
"animations/Stand/Emotions/Positive/Happy_1",
"animations/Stand/Emotions/Positive/Happy_2",
"animations/Stand/Emotions/Positive/Happy_3",
"animations/Stand/Emotions/Positive/Happy_4",
"animations/Stand/Emotions/Positive/Hungry_1",
"animations/Stand/Emotions/Positive/Hysterical_1",
"animations/Stand/Emotions/Positive/Interested_1",
"animations/Stand/Emotions/Positive/Interested_2",
"animations/Stand/Emotions/Positive/Laugh_1",
"animations/Stand/Emotions/Positive/Laugh_2",
"animations/Stand/Emotions/Positive/Laugh_3",
"animations/Stand/Emotions/Positive/Mocker_1",
"animations/Stand/Emotions/Positive/Optimistic_1",
"animations/Stand/Emotions/Positive/Peaceful_1",
"animations/Stand/Emotions/Positive/Proud_1",
"animations/Stand/Emotions/Positive/Proud_2",
"animations/Stand/Emotions/Positive/Proud_3",
"animations/Stand/Emotions/Positive/Relieved_1",
"animations/Stand/Emotions/Positive/Shy_1",
"animations/Stand/Emotions/Positive/Shy_2",
"animations/Stand/Emotions/Positive/Sure_1",
"animations/Stand/Emotions/Positive/Winner_1",
"animations/Stand/Emotions/Positive/Winner_2",
"animations/Stand/Gestures/Angry_1",
"animations/Stand/Gestures/Angry_2",
"animations/Stand/Gestures/Angry_3",
"animations/Stand/Gestures/BowShort_1",
"animations/Stand/Gestures/BowShort_2",
"animations/Stand/Gestures/BowShort_3",
"animations/Stand/Gestures/But_1",
"animations/Stand/Gestures/CalmDown_1",
"animations/Stand/Gestures/CalmDown_2",
"animations/Stand/Gestures/CalmDown_3",
"animations/Stand/Gestures/CalmDown_4",
"animations/Stand/Gestures/CalmDown_5",
"animations/Stand/Gestures/CalmDown_6",
"animations/Stand/Gestures/Choice_1",
"animations/Stand/Gestures/ComeOn_1",
"animations/Stand/Gestures/Confused_1",
"animations/Stand/Gestures/Confused_2",
"animations/Stand/Gestures/CountFive_1",
"animations/Stand/Gestures/CountFour_1",
"animations/Stand/Gestures/CountMore_1",
"animations/Stand/Gestures/CountOne_1",
"animations/Stand/Gestures/CountThree_1",
"animations/Stand/Gestures/CountTwo_1",
"animations/Stand/Gestures/Desperate_1",
"animations/Stand/Gestures/Desperate_2",
"animations/Stand/Gestures/Desperate_3",
"animations/Stand/Gestures/Desperate_4",
"animations/Stand/Gestures/Desperate_5",
"animations/Stand/Gestures/DontUnderstand_1",
"animations/Stand/Gestures/Enthusiastic_3",
"animations/Stand/Gestures/Enthusiastic_4",
"animations/Stand/Gestures/Enthusiastic_5",
"animations/Stand/Gestures/Everything_1",
"animations/Stand/Gestures/Everything_2",
"animations/Stand/Gestures/Everything_3",
"animations/Stand/Gestures/Everything_4",
"animations/Stand/Gestures/Everything_6",
"animations/Stand/Gestures/Excited_1",
"animations/Stand/Gestures/Explain_1",
"animations/Stand/Gestures/Explain_10",
"animations/Stand/Gestures/Explain_11",
"animations/Stand/Gestures/Explain_2",
"animations/Stand/Gestures/Explain_3",
"animations/Stand/Gestures/Explain_4",
"animations/Stand/Gestures/Explain_5",
"animations/Stand/Gestures/Explain_6",
"animations/Stand/Gestures/Explain_7",
"animations/Stand/Gestures/Explain_8",
"animations/Stand/Gestures/Far_1",
"animations/Stand/Gestures/Far_2",
"animations/Stand/Gestures/Far_3",
"animations/Stand/Gestures/Follow_1",
"animations/Stand/Gestures/Give_1",
"animations/Stand/Gestures/Give_2",
"animations/Stand/Gestures/Give_3",
"animations/Stand/Gestures/Give_4",
"animations/Stand/Gestures/Give_5",
"animations/Stand/Gestures/Give_6",
"animations/Stand/Gestures/Great_1",
"animations/Stand/Gestures/HeSays_1",
"animations/Stand/Gestures/HeSays_2",
"animations/Stand/Gestures/HeSays_3",
"animations/Stand/Gestures/Hey_1",
"animations/Stand/Gestures/Hey_10",
"animations/Stand/Gestures/Hey_2",
"animations/Stand/Gestures/Hey_3",
"animations/Stand/Gestures/Hey_4",
"animations/Stand/Gestures/Hey_6",
"animations/Stand/Gestures/Hey_7",
"animations/Stand/Gestures/Hey_8",
"animations/Stand/Gestures/Hey_9",
"animations/Stand/Gestures/Hide_1",
"animations/Stand/Gestures/Hot_1",
"animations/Stand/Gestures/Hot_2",
"animations/Stand/Gestures/IDontKnow_1",
"animations/Stand/Gestures/IDontKnow_2",
"animations/Stand/Gestures/IDontKnow_3",
"animations/Stand/Gestures/IDontKnow_4",
"animations/Stand/Gestures/IDontKnow_5",
"animations/Stand/Gestures/IDontKnow_6",
"animations/Stand/Gestures/Joy_1",
"animations/Stand/Gestures/Kisses_1",
"animations/Stand/Gestures/Look_1",
"animations/Stand/Gestures/Look_2",
"animations/Stand/Gestures/Maybe_1",
"animations/Stand/Gestures/Me_1",
"animations/Stand/Gestures/Me_2",
"animations/Stand/Gestures/Me_4",
"animations/Stand/Gestures/Me_7",
"animations/Stand/Gestures/Me_8",
"animations/Stand/Gestures/Mime_1",
"animations/Stand/Gestures/Mime_2",
"animations/Stand/Gestures/Next_1",
"animations/Stand/Gestures/No_1",
"animations/Stand/Gestures/No_2",
"animations/Stand/Gestures/No_3",
"animations/Stand/Gestures/No_4",
"animations/Stand/Gestures/No_5",
"animations/Stand/Gestures/No_6",
"animations/Stand/Gestures/No_7",
"animations/Stand/Gestures/No_8",
"animations/Stand/Gestures/No_9",
"animations/Stand/Gestures/Nothing_1",
"animations/Stand/Gestures/Nothing_2",
"animations/Stand/Gestures/OnTheEvening_1",
"animations/Stand/Gestures/OnTheEvening_2",
"animations/Stand/Gestures/OnTheEvening_3",
"animations/Stand/Gestures/OnTheEvening_4",
"animations/Stand/Gestures/OnTheEvening_5",
"animations/Stand/Gestures/Please_1",
"animations/Stand/Gestures/Please_2",
"animations/Stand/Gestures/Please_3",
"animations/Stand/Gestures/Reject_1",
"animations/Stand/Gestures/Reject_2",
"animations/Stand/Gestures/Reject_3",
"animations/Stand/Gestures/Reject_4",
"animations/Stand/Gestures/Reject_5",
"animations/Stand/Gestures/Reject_6",
"animations/Stand/Gestures/Salute_1",
"animations/Stand/Gestures/Salute_2",
"animations/Stand/Gestures/Salute_3",
"animations/Stand/Gestures/ShowFloor_1",
"animations/Stand/Gestures/ShowFloor_2",
"animations/Stand/Gestures/ShowFloor_3",
"animations/Stand/Gestures/ShowFloor_4",
"animations/Stand/Gestures/ShowFloor_5",
"animations/Stand/Gestures/ShowSky_1",
"animations/Stand/Gestures/ShowSky_10",
"animations/Stand/Gestures/ShowSky_11",
"animations/Stand/Gestures/ShowSky_12",
"animations/Stand/Gestures/ShowSky_2",
"animations/Stand/Gestures/ShowSky_3",
"animations/Stand/Gestures/ShowSky_4",
"animations/Stand/Gestures/ShowSky_5",
"animations/Stand/Gestures/ShowSky_6",
"animations/Stand/Gestures/ShowSky_7",
"animations/Stand/Gestures/ShowSky_8",
"animations/Stand/Gestures/ShowSky_9",
"animations/Stand/Gestures/ShowTablet_1",
"animations/Stand/Gestures/ShowTablet_2",
"animations/Stand/Gestures/ShowTablet_3",
"animations/Stand/Gestures/Shy_1",
"animations/Stand/Gestures/Stretch_1",
"animations/Stand/Gestures/Stretch_2",
"animations/Stand/Gestures/Surprised_1",
"animations/Stand/Gestures/TakePlace_1",
"animations/Stand/Gestures/TakePlace_2",
"animations/Stand/Gestures/Take_1",
"animations/Stand/Gestures/Thinking_1",
"animations/Stand/Gestures/Thinking_2",
"animations/Stand/Gestures/Thinking_3",
"animations/Stand/Gestures/Thinking_4",
"animations/Stand/Gestures/Thinking_5",
"animations/Stand/Gestures/Thinking_6",
"animations/Stand/Gestures/Thinking_7",
"animations/Stand/Gestures/Thinking_8",
"animations/Stand/Gestures/This_1",
"animations/Stand/Gestures/This_10",
"animations/Stand/Gestures/This_11",
"animations/Stand/Gestures/This_12",
"animations/Stand/Gestures/This_13",
"animations/Stand/Gestures/This_14",
"animations/Stand/Gestures/This_15",
"animations/Stand/Gestures/This_2",
"animations/Stand/Gestures/This_3",
"animations/Stand/Gestures/This_4",
"animations/Stand/Gestures/This_5",
"animations/Stand/Gestures/This_6",
"animations/Stand/Gestures/This_7",
"animations/Stand/Gestures/This_8",
"animations/Stand/Gestures/This_9",
"animations/Stand/Gestures/WhatSThis_1",
"animations/Stand/Gestures/WhatSThis_10",
"animations/Stand/Gestures/WhatSThis_11",
"animations/Stand/Gestures/WhatSThis_12",
"animations/Stand/Gestures/WhatSThis_13",
"animations/Stand/Gestures/WhatSThis_14",
"animations/Stand/Gestures/WhatSThis_15",
"animations/Stand/Gestures/WhatSThis_16",
"animations/Stand/Gestures/WhatSThis_2",
"animations/Stand/Gestures/WhatSThis_3",
"animations/Stand/Gestures/WhatSThis_4",
"animations/Stand/Gestures/WhatSThis_5",
"animations/Stand/Gestures/WhatSThis_6",
"animations/Stand/Gestures/WhatSThis_7",
"animations/Stand/Gestures/WhatSThis_8",
"animations/Stand/Gestures/WhatSThis_9",
"animations/Stand/Gestures/Whisper_1",
"animations/Stand/Gestures/Wings_1",
"animations/Stand/Gestures/Wings_2",
"animations/Stand/Gestures/Wings_3",
"animations/Stand/Gestures/Wings_4",
"animations/Stand/Gestures/Wings_5",
"animations/Stand/Gestures/Yes_1",
"animations/Stand/Gestures/Yes_2",
"animations/Stand/Gestures/Yes_3",
"animations/Stand/Gestures/YouKnowWhat_1",
"animations/Stand/Gestures/YouKnowWhat_2",
"animations/Stand/Gestures/YouKnowWhat_3",
"animations/Stand/Gestures/YouKnowWhat_4",
"animations/Stand/Gestures/YouKnowWhat_5",
"animations/Stand/Gestures/YouKnowWhat_6",
"animations/Stand/Gestures/You_1",
"animations/Stand/Gestures/You_2",
"animations/Stand/Gestures/You_3",
"animations/Stand/Gestures/You_4",
"animations/Stand/Gestures/You_5",
"animations/Stand/Gestures/Yum_1",
"animations/Stand/Reactions/EthernetOff_1",
"animations/Stand/Reactions/EthernetOn_1",
"animations/Stand/Reactions/Heat_1",
"animations/Stand/Reactions/Heat_2",
"animations/Stand/Reactions/LightShine_1",
"animations/Stand/Reactions/LightShine_2",
"animations/Stand/Reactions/LightShine_3",
"animations/Stand/Reactions/LightShine_4",
"animations/Stand/Reactions/SeeColor_1",
"animations/Stand/Reactions/SeeColor_2",
"animations/Stand/Reactions/SeeColor_3",
"animations/Stand/Reactions/SeeSomething_1",
"animations/Stand/Reactions/SeeSomething_3",
"animations/Stand/Reactions/SeeSomething_4",
"animations/Stand/Reactions/SeeSomething_5",
"animations/Stand/Reactions/SeeSomething_6",
"animations/Stand/Reactions/SeeSomething_7",
"animations/Stand/Reactions/SeeSomething_8",
"animations/Stand/Reactions/ShakeBody_1",
"animations/Stand/Reactions/ShakeBody_2",
"animations/Stand/Reactions/ShakeBody_3",
"animations/Stand/Reactions/TouchHead_1",
"animations/Stand/Reactions/TouchHead_2",
"animations/Stand/Reactions/TouchHead_3",
"animations/Stand/Reactions/TouchHead_4",
"animations/Stand/Waiting/AirGuitar_1",
"animations/Stand/Waiting/BackRubs_1",
"animations/Stand/Waiting/Bandmaster_1",
"animations/Stand/Waiting/Binoculars_1",
"animations/Stand/Waiting/BreathLoop_1",
"animations/Stand/Waiting/BreathLoop_2",
"animations/Stand/Waiting/BreathLoop_3",
"animations/Stand/Waiting/CallSomeone_1",
"animations/Stand/Waiting/Drink_1",
"animations/Stand/Waiting/DriveCar_1",
"animations/Stand/Waiting/Fitness_1",
"animations/Stand/Waiting/Fitness_2",
"animations/Stand/Waiting/Fitness_3",
"animations/Stand/Waiting/FunnyDancer_1",
"animations/Stand/Waiting/HappyBirthday_1",
"animations/Stand/Waiting/Helicopter_1",
"animations/Stand/Waiting/HideEyes_1",
"animations/Stand/Waiting/HideHands_1",
"animations/Stand/Waiting/Innocent_1",
"animations/Stand/Waiting/Knight_1",
"animations/Stand/Waiting/KnockEye_1",
"animations/Stand/Waiting/KungFu_1",
"animations/Stand/Waiting/LookHand_1",
"animations/Stand/Waiting/LookHand_2",
"animations/Stand/Waiting/LoveYou_1",
"animations/Stand/Waiting/Monster_1",
"animations/Stand/Waiting/MysticalPower_1",
"animations/Stand/Waiting/PlayHands_1",
"animations/Stand/Waiting/PlayHands_2",
"animations/Stand/Waiting/PlayHands_3",
"animations/Stand/Waiting/Relaxation_1",
"animations/Stand/Waiting/Relaxation_2",
"animations/Stand/Waiting/Relaxation_3",
"animations/Stand/Waiting/Relaxation_4",
"animations/Stand/Waiting/Rest_1",
"animations/Stand/Waiting/Robot_1",
"animations/Stand/Waiting/ScratchBack_1",
"animations/Stand/Waiting/ScratchBottom_1",
"animations/Stand/Waiting/ScratchEye_1",
"animations/Stand/Waiting/ScratchHand_1",
"animations/Stand/Waiting/ScratchHead_1",
"animations/Stand/Waiting/ScratchLeg_1",
"animations/Stand/Waiting/ScratchTorso_1",
"animations/Stand/Waiting/ShowMuscles_1",
"animations/Stand/Waiting/ShowMuscles_2",
"animations/Stand/Waiting/ShowMuscles_3",
"animations/Stand/Waiting/ShowMuscles_4",
"animations/Stand/Waiting/ShowMuscles_5",
"animations/Stand/Waiting/ShowSky_1",
"animations/Stand/Waiting/ShowSky_2",
"animations/Stand/Waiting/SpaceShuttle_1",
"animations/Stand/Waiting/Stretch_1",
"animations/Stand/Waiting/Stretch_2",
"animations/Stand/Waiting/TakePicture_1",
"animations/Stand/Waiting/Taxi_1",
"animations/Stand/Waiting/Think_1",
"animations/Stand/Waiting/Think_2",
"animations/Stand/Waiting/Think_3",
"animations/Stand/Waiting/Think_4",
"animations/Stand/Waiting/Waddle_1",
"animations/Stand/Waiting/Waddle_2",
"animations/Stand/Waiting/WakeUp_1",
"animations/Stand/Waiting/Zombie_1"]

View File

@@ -3,28 +3,55 @@ import zmq
 from robot_interface.endpoints.receiver_base import ReceiverBase
 from robot_interface.state import state
 from robot_interface.core.config import settings
+from robot_interface.endpoints.face_detector import FaceDetectionSender

 class MainReceiver(ReceiverBase):
-    def __init__(self, zmq_context, port=5555):
-        """
-        The main receiver endpoint, responsible for handling ping and negotiation requests.
+    """
+    The main receiver endpoint, responsible for handling ping and negotiation requests.

-        :param zmq_context: The ZeroMQ context to use.
-        :type zmq_context: zmq.Context
-        :param port: The port to use.
-        :type port: int
-        """
+    :param zmq_context: The ZeroMQ context to use.
+    :type zmq_context: zmq.Context
+    :param port: The port to use, defaults to value in `settings.agent_settings.main_receiver_port`.
+    :type port: int
+    """
+    def __init__(self, zmq_context, port=None):
+        if port is None:
+            port = settings.agent_settings.main_receiver_port
         super(MainReceiver, self).__init__("main")
         self.create_socket(zmq_context, zmq.REP, port, bind=False)
     @staticmethod
     def _handle_ping(message):
-        """A simple ping endpoint. Returns the provided data."""
+        """
+        Handle a ping request.
+        Returns the provided data in a standardized response dictionary.
+
+        :param message: The ping request message.
+        :type message: dict
+        :return: A response dictionary containing the original data.
+        :rtype: dict[str, str | list[dict]]
+        """
         return {"endpoint": "ping", "data": message.get("data")}
     @staticmethod
     def _handle_port_negotiation(message):
+        """
+        Handle a port negotiation request.
+        Returns a list of all known endpoints and their descriptions.
+
+        :param message: The negotiation request message.
+        :type message: dict
+        :return: A response dictionary with endpoint descriptions as data.
+        :rtype: dict[str, list[dict]]
+        """
         endpoints = [socket.endpoint_description() for socket in state.sockets]
         return {"endpoint": "negotiate/ports", "data": endpoints}
@@ -32,13 +59,13 @@
     @staticmethod
     def _handle_negotiation(message):
         """
-        Handle a negotiation request. Will respond with ports that can be used to connect to the robot.
+        Handle a negotiation request. Responds with ports that can be used to connect to the robot.

         :param message: The negotiation request message.
         :type message: dict
-        :return: A response dictionary with a 'ports' key containing a list of ports and their function.
-        :rtype: dict[str, list[dict]]
+        :return: A response dictionary with the negotiation result.
+        :rtype: dict[str, str | list[dict]]
         """
         # In the future, the sender could send information like the robot's IP address, etc.
@@ -48,6 +75,17 @@
         return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."}

     def handle_message(self, message):
+        """
+        Main entry point for handling incoming messages.
+        Dispatches messages to the appropriate handler based on the endpoint.
+
+        :param message: The received message.
+        :type message: dict
+        :return: A response dictionary based on the requested endpoint.
+        :rtype: dict[str, str | list[dict]]
+        """
         if message["endpoint"] == "ping":
             return self._handle_ping(message)
         elif message["endpoint"].startswith("negotiate"):
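The dispatch pattern in `MainReceiver.handle_message` can be sketched as a standalone snippet. This is an illustrative reconstruction from the hunks above, not the project's actual class: the handlers are plain functions here, and the error response reuses the `negotiate/error` shape shown in the diff.

```python
# Minimal sketch of the request dispatch used by MainReceiver (illustrative).
def handle_ping(message):
    # Echo the request's data back in a standardized response dictionary.
    return {"endpoint": "ping", "data": message.get("data")}

def handle_port_negotiation(message, endpoints):
    # Respond with the known endpoint descriptions.
    return {"endpoint": "negotiate/ports", "data": list(endpoints)}

def handle_message(message, endpoints=()):
    # Dispatch on the "endpoint" field of the incoming message.
    if message["endpoint"] == "ping":
        return handle_ping(message)
    elif message["endpoint"].startswith("negotiate"):
        return handle_port_negotiation(message, endpoints)
    return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."}
```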

View File

@@ -4,7 +4,7 @@ from robot_interface.endpoints.socket_base import SocketBase

 class ReceiverBase(SocketBase, object):
-    """Associated with a ZeroMQ socket."""
+    """Base class for receivers associated with a ZeroMQ socket."""

     __metaclass__ = ABCMeta

     @abstractmethod

View File

@@ -2,18 +2,31 @@ from abc import ABCMeta
 import zmq
 from robot_interface.core.config import settings

 class SocketBase(object):
+    """
+    Base class for endpoints associated with a ZeroMQ socket.
+
+    :ivar identifier: The identifier of the endpoint.
+    :vartype identifier: str
+    :ivar port: The port used by the socket, set by `create_socket`.
+    :vartype port: int | None
+    :ivar socket: The ZeroMQ socket object, set by `create_socket`.
+    :vartype socket: zmq.Socket | None
+    :ivar bound: Whether the socket is bound or connected, set by `create_socket`.
+    :vartype bound: bool | None
+    """
     __metaclass__ = ABCMeta

-    name = None
-    socket = None
-
     def __init__(self, identifier):
+        """
+        :param identifier: The identifier of the endpoint.
+        :type identifier: str
+        """
         self.identifier = identifier
+        self.port = None # Set later by `create_socket`
+        self.socket = None # Set later by `create_socket`
@@ -32,8 +45,7 @@
         :param port: The port to use.
         :type port: int
-        :param options: A list of options to be set on the socket. The list contains tuples where the first element contains the option
-        and the second the value, for example (zmq.CONFLATE, 1).
+        :param options: A list of tuples where the first element contains the option and the second the value.
         :type options: list[tuple[int, int]]
         :param bind: Whether to bind the socket or connect to it.
@@ -49,7 +61,7 @@
         if bind:
             self.socket.bind("tcp://*:{}".format(port))
         else:
-            self.socket.connect("tcp://localhost:{}".format(port))
+            self.socket.connect("tcp://{}:{}".format(settings.agent_settings.control_backend_host, port))

     def close(self):
         """Close the ZeroMQ socket."""
@@ -62,7 +74,7 @@
         Description of the endpoint. Used for negotiation.

         :return: A dictionary with the following keys: id, port, bind. See API specification at:
-        https://utrechtuniversity.youtrack.cloud/articles/N25B-A-14/RI-CB-Communication#negotiation
+            https://utrechtuniversity.youtrack.cloud/articles/N25B-A-14/RI-CB-Communication#negotiation
         :rtype: dict
         """
         return {
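The bind/connect split changed by this diff can be exercised without a real broker by passing a mocked context, much like the repository's own `test_create_endpoint_custom_host` does. This is a sketch under the assumption that the context exposes the pyzmq-style `socket()` factory; the `host` parameter stands in for the configured `control_backend_host`.

```python
# Illustrative sketch of SocketBase.create_socket's bind/connect logic,
# driven with a mocked ZeroMQ context so pyzmq itself is not required here.
from unittest import mock

def create_socket(ctx, sock_type, port, options=None, bind=True, host="localhost"):
    sock = ctx.socket(sock_type)
    # `options` is a list of (option, value) tuples, e.g. (zmq.CONFLATE, 1).
    for option, value in options or []:
        sock.setsockopt(option, value)
    if bind:
        sock.bind("tcp://*:{}".format(port))
    else:
        # The diff replaces the hardcoded "localhost" with a configured host.
        sock.connect("tcp://{}:{}".format(host, port))
    return sock

fake_ctx = mock.Mock()
sock = create_socket(fake_ctx, sock_type=4, port=5555, bind=False, host="not_localhost")
sock.connect.assert_called_once_with("tcp://not_localhost:5555")
```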

View File

@@ -1,32 +1,44 @@
 import zmq
 import threading
 import qi
 import logging
 from robot_interface.endpoints.socket_base import SocketBase
 from robot_interface.state import state
 from robot_interface.core.config import settings

 class VideoSender(SocketBase):
-    def __init__(self, zmq_context, port=5556):
+    """
+    Video sender endpoint, responsible for sending video frames.
+
+    :param zmq_context: The ZeroMQ context to use.
+    :type zmq_context: zmq.Context
+    :param port: The port to use for sending video frames.
+    :type port: int
+    """
+    def __init__(self, zmq_context, port=settings.agent_settings.video_sender_port):
         super(VideoSender, self).__init__("video")
         self.create_socket(zmq_context, zmq.PUB, port, [(zmq.CONFLATE,1)])

     def start_video_rcv(self):
+        """
+        Prepares arguments for retrieving video images from Pepper and starts video loop on a separate thread.
+        Will not start if no qi session is available.
+        """
         if not state.qi_session:
             logging.info("No Qi session available. Not starting video loop.")
             return

         video = state.qi_session.service("ALVideoDevice")
-        camera_index = 0
-        kQVGA = 2
-        kRGB = 11
-        FPS = 15
-        vid_stream_name = video.subscribeCamera("Pepper Video", camera_index, kQVGA, kRGB, FPS)
+        video_settings = settings.video_config
+        camera_index = video_settings.camera_index
+        kQVGA = video_settings.resolution
+        kRGB = video_settings.color_space
+        FPS = video_settings.fps
+        video_name = video_settings.stream_name
+        vid_stream_name = video.subscribeCamera(video_name, camera_index, kQVGA, kRGB, FPS)

         thread = threading.Thread(target=self.video_rcv_loop, args=(video, vid_stream_name))
         thread.start()
@@ -38,12 +50,12 @@ class VideoSender(SocketBase):
         :type vid_service: Object (Qi service object)
         :param vid_stream_name: The name of a camera subscription on the video service object vid_service
-        :type vid_stream_name: String
+        :type vid_stream_name: str
         """
         while not state.exit_event.is_set():
             try:
                 img = vid_service.getImageRemote(vid_stream_name)
                 #Possibly limit images sent if queuing issues arise
-                self.socket.send(img[6])
+                self.socket.send(img[settings.video_config.image_buffer])
             except:
                 logging.warn("Failed to retrieve video image from robot.")
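The publish loop in `video_rcv_loop` follows a simple pattern: poll a frame source until the shared exit event is set, sending one raw buffer per iteration. A standalone sketch, with a hypothetical stub service and an in-memory `send` standing in for the Qi service and the PUB socket:

```python
# Sketch of the poll-until-exit loop pattern in video_rcv_loop (illustrative).
import threading

class StubVideoService(object):
    def getImageRemote(self, stream_name):
        # ALVideoDevice returns a list in which one element (index 6 in the old
        # code, configurable in the new one) is the raw image buffer.
        return [None] * 6 + [b"frame-bytes"]

def video_loop(service, stream_name, send, exit_event, image_buffer=6):
    while not exit_event.is_set():
        try:
            img = service.getImageRemote(stream_name)
            send(img[image_buffer])
        except Exception:
            pass  # The real code logs a warning and keeps polling.

exit_event = threading.Event()
frames = []
def send(buf):
    frames.append(buf)
    if len(frames) >= 3:
        exit_event.set()  # Stop the loop after three frames for this demo.

thread = threading.Thread(target=video_loop,
                          args=(StubVideoService(), "Pepper Video", send, exit_event))
thread.start()
thread.join(timeout=5)
```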

View File

@@ -10,7 +10,10 @@ from robot_interface.endpoints.actuation_receiver import ActuationReceiver
 from robot_interface.endpoints.main_receiver import MainReceiver
 from robot_interface.endpoints.video_sender import VideoSender
 from robot_interface.state import state
+from robot_interface.core.config import settings
 from robot_interface.utils.timeblock import TimeBlock
+from robot_interface.endpoints.face_detector import FaceDetectionSender

 def main_loop(context):
@@ -34,6 +37,12 @@ def main_loop(context):
     video_sender.start_video_rcv()
     audio_sender.start()

+    # --- Face detection sender ---
+    face_sender = FaceDetectionSender(context)
+    state.sockets.append(face_sender)
+    face_sender.start_face_detection()
+
     # Sockets that can run on the main thread. These sockets' endpoints should not block for long (say 50 ms at most).
     receivers = [main_receiver, actuation_receiver]
@@ -43,20 +52,9 @@ def main_loop(context):
     logging.debug("Starting main loop.")

-    import schedule
-    test_speaking_message = {"data": "Hi, my name is Pepper, and this is quite a long message."}
-    def test_speak():
-        logging.debug("Testing speech.")
-        actuation_receiver._handle_speech(test_speaking_message)
-    schedule.every(10).seconds.do(test_speak)
-
     while True:
         if state.exit_event.is_set(): break
-        schedule.run_pending()

-        socks = dict(poller.poll(100))
+        socks = dict(poller.poll(settings.main_config.poll_timeout_ms))

         for receiver in receivers:
             if receiver.socket not in socks: continue
@@ -67,10 +65,17 @@ def main_loop(context):
                 continue

             def overtime_callback(time_ms):
+                """
+                A callback function executed by TimeBlock if the message handling
+                exceeds the allowed time limit.
+
+                :param time_ms: The elapsed time, in milliseconds, that the block took.
+                :type time_ms: float
+                """
                 logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.",
                     message["endpoint"], time_ms)

-            with TimeBlock(overtime_callback, 50):
+            with TimeBlock(overtime_callback, settings.main_config.max_handler_time_ms):
                 response = receiver.handle_message(message)

             if receiver.socket.getsockopt(zmq.TYPE) == zmq.REP:
@@ -78,6 +83,12 @@ def main_loop(context):
 def main():
     """
     Initializes the ZeroMQ context and the application state.
+
+    It executes the main event loop (`main_loop`) and ensures that both the
+    application state and the ZeroMQ context are properly cleaned up (deinitialized/terminated)
+    upon exit, including handling a KeyboardInterrupt.
     """
     context = zmq.Context()
     state.initialize()
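The startup/cleanup contract that `main()`'s new docstring describes (initialize, run the loop, always deinitialize, even on Ctrl+C) boils down to a `try`/`finally` around the loop. A simplified sketch with stand-in state and loop objects, not the real ones:

```python
# Sketch of the init/run/cleanup contract described in main() (illustrative).
class FakeState(object):
    def __init__(self):
        self.initialized = False
    def initialize(self):
        self.initialized = True
    def deinitialize(self):
        self.initialized = False

def main(state, loop):
    state.initialize()
    try:
        loop()
    except KeyboardInterrupt:
        pass  # Ctrl+C is an expected way to stop; fall through to cleanup.
    finally:
        state.deinitialize()  # Always runs, whatever stopped the loop.

state = FakeState()
def interrupted_loop():
    raise KeyboardInterrupt
main(state, interrupted_loop)
```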

View File

@@ -12,15 +12,32 @@ class State(object):
     This class is used to share state between threads. For example, when the program is quit, that all threads can
     detect this via the `exit_event` property being set.
+
+    :ivar is_initialized: Flag indicating whether the state setup (exit handlers, QI session) has completed.
+    :vartype is_initialized: bool
+    :ivar exit_event: A thread event used to signal all threads that the program is shutting down.
+    :vartype exit_event: threading.Event | None
+    :ivar sockets: A list of ZeroMQ socket wrappers (`SocketBase`) that need to be closed during deinitialization.
+    :vartype sockets: List[SocketBase]
+    :ivar qi_session: The QI session object used for interaction with the robot/platform services.
+    :vartype qi_session: None | qi.Session
     """
     def __init__(self):
         self.is_initialized = False
         self.exit_event = None
-        self.sockets = [] # type: List[SocketBase]
-        self.qi_session = None # type: None | ssl.SSLSession
-        self.is_speaking = False # type: Boolean
+        self.sockets = []
+        self.qi_session = None
+        self.is_speaking = False

     def initialize(self):
+        """
+        Sets up the application state. Creates the thread exit event, registers
+        signal handlers (`SIGINT`, `SIGTERM`) for graceful shutdown, and
+        establishes the QI session.
+        """
         if self.is_initialized:
             logging.warn("Already initialized")
             return
@@ -37,6 +54,9 @@ class State(object):
         self.is_initialized = True

     def deinitialize(self):
+        """
+        Closes all sockets stored in the `sockets` list.
+        """
         if not self.is_initialized: return

         for socket in self.sockets:
@@ -45,8 +65,24 @@ class State(object):
         self.is_initialized = False

     def __getattribute__(self, name):
-        # Enforce that the state is initialized before accessing any property (aside from the basic ones)
-        if name in ("initialize", "deinitialize", "is_initialized", "__dict__", "__class__"):
+        """
+        Custom attribute access method that enforces a check: the state must be
+        fully initialized before any non-setup attributes (like `sockets` or `qi_session`)
+        can be accessed.
+
+        :param name: The name of the attribute being accessed.
+        :type name: str
+        :return: The value of the requested attribute.
+        :rtype: Any
+        """
+        if name in (
+                "initialize",
+                "deinitialize",
+                "is_initialized",
+                "__dict__",
+                "__class__",
+                "__doc__"):
             return object.__getattribute__(self, name)
         if not object.__getattribute__(self, "is_initialized"):
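The access guard in `State.__getattribute__` can be demonstrated in isolation: attributes other than an allow-listed set of setup names raise until `initialize()` has run. A standalone sketch with a hypothetical `GuardedState` class (the attribute names are illustrative, not the project's):

```python
# Sketch of the "must be initialized before use" guard in State (illustrative).
class GuardedState(object):
    _SETUP_NAMES = ("__init__", "initialize", "is_initialized",
                    "__dict__", "__class__", "__doc__")

    def __init__(self):
        self.is_initialized = False
        self.value = 42  # Stands in for real state like `sockets` or `qi_session`.

    def initialize(self):
        self.is_initialized = True

    def __getattribute__(self, name):
        # Setup attributes are always reachable; everything else requires init.
        if name in GuardedState._SETUP_NAMES:
            return object.__getattribute__(self, name)
        if not object.__getattribute__(self, "is_initialized"):
            raise RuntimeError("State accessed before initialization")
        return object.__getattribute__(self, name)
```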

View File

@@ -0,0 +1,32 @@
+import os
+from dotenv import load_dotenv
+
+load_dotenv()
+
+def get_config(value, env, default, cast=None):
+    """
+    Small utility to get a configuration value, returns `value` if it is not None, else it will try to get the
+    environment variable cast with `cast`. If the environment variable is not set, it will return `default`.
+
+    :param value: The value to check.
+    :type value: Any
+    :param env: The environment variable to check.
+    :type env: string
+    :param default: The default value to return if the environment variable is not set.
+    :type default: Any
+    :param cast: A function to use to cast the environment variable. Must support string input.
+    :type cast: Callable[[Any], Any], optional
+    :return: The value, the environment variable value, or the default.
+    :rtype: Any
+    """
+    if value is not None:
+        return value
+    env = os.environ.get(env, default)
+    if cast is None:
+        return env
+    return cast(env)
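The precedence in `get_config` is: explicit value first, then the environment variable (optionally cast), then the default. A usage sketch that repeats the function body from the new file above so it runs standalone (the variable name `EXAMPLE_PORT` is made up for this demo; note that when the variable is unset, the default is still passed through `cast`):

```python
# Usage sketch for the get_config helper (function body mirrors the diff above).
import os

def get_config(value, env, default, cast=None):
    if value is not None:
        return value
    env = os.environ.get(env, default)
    if cast is None:
        return env
    return cast(env)

os.environ["EXAMPLE_PORT"] = "5559"  # Hypothetical variable for this demo.

assert get_config(8080, "EXAMPLE_PORT", 5555, cast=int) == 8080  # explicit value wins
assert get_config(None, "EXAMPLE_PORT", 5555, cast=int) == 5559  # env var, cast to int
assert get_config(None, "MISSING_VAR", 5555) == 5555             # falls back to default
```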

View File

@@ -1,5 +1,6 @@
 from __future__ import unicode_literals # So that `print` can print Unicode characters in names
 import logging
+import sys

 logger = logging.getLogger(__name__)
@@ -28,7 +29,7 @@ def choose_mic_interactive(audio):
     :type audio: pyaudio.PyAudio
     :return: A dictionary from PyAudio containing information about the microphone to use, or None
-    if there is no microphone.
+        if there is no microphone.
     :rtype: dict | None
     """
     microphones = list(get_microphones(audio))
@@ -60,10 +61,61 @@ def choose_mic_default(audio):
     :type audio: pyaudio.PyAudio
     :return: A dictionary from PyAudio containing information about the microphone to use, or None
-    if there is no microphone.
+        if there is no microphone.
     :rtype: dict | None
     """
     try:
         return audio.get_default_input_device_info()
     except IOError:
         return None
+
+def choose_mic_arguments(audio):
+    """
+    Get a microphone to use from command line arguments.
+
+    :param audio: An instance of PyAudio to use.
+    :type audio: pyaudio.PyAudio
+    :return: A dictionary from PyAudio containing information about the microphone to use, or None
+        if there is no microphone satisfied by the arguments.
+    :rtype: dict | None
+    """
+    microphone_name = None
+    for i, arg in enumerate(sys.argv):
+        if arg == "--microphone" and len(sys.argv) > i+1:
+            microphone_name = sys.argv[i+1].strip()
+        if arg.startswith("--microphone="):
+            pre_fix_len = len("--microphone=")
+            microphone_name = arg[pre_fix_len:].strip()
+    if not microphone_name: return None
+
+    available_mics = list(get_microphones(audio))
+    for mic in available_mics:
+        if mic["name"] == microphone_name:
+            return mic
+
+    available_mic_names = [mic["name"] for mic in available_mics]
+    logger.warning("Microphone \"{}\" not found. Choose one of {}"
+        .format(microphone_name, available_mic_names))
+    return None
+
+def choose_mic(audio):
+    """
+    Get a microphone to use. Firstly, tries to see if there's an application argument specifying the
+    microphone to use. If not, get the default microphone.
+
+    :param audio: An instance of PyAudio to use.
+    :type audio: pyaudio.PyAudio
+    :return: A dictionary from PyAudio containing information about the microphone to use, or None
+        if there is no microphone.
+    :rtype: dict | None
+    """
+    chosen_mic = choose_mic_arguments(audio)
+    if chosen_mic: return chosen_mic
+    return choose_mic_default(audio)
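The argument parsing added in `choose_mic_arguments` accepts both the `--microphone NAME` and `--microphone=NAME` forms, with later occurrences overriding earlier ones. The name-extraction part can be sketched on its own, without PyAudio; the helper name here is illustrative:

```python
# Standalone sketch of the --microphone argv parsing (illustrative helper name).
def parse_microphone_argument(argv):
    microphone_name = None
    for i, arg in enumerate(argv):
        # Space-separated form: --microphone NAME
        if arg == "--microphone" and len(argv) > i + 1:
            microphone_name = argv[i + 1].strip()
        # Equals form: --microphone=NAME
        if arg.startswith("--microphone="):
            microphone_name = arg[len("--microphone="):].strip()
    # An empty or absent name means "no microphone requested".
    return microphone_name or None
```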

View File

@@ -8,6 +8,12 @@ except ImportError:

 def get_qi_session():
     """
     Create and return a Qi session if available.
+
+    :return: The active Qi session or ``None`` if unavailable.
+    :rtype: qi.Session | None
     """
     if qi is None:
         logging.info("Unable to import qi. Running in stand-alone mode.")
         return None

View File

@@ -5,27 +5,54 @@ class TimeBlock(object):
     """
     A context manager that times the execution of the block it contains. If execution exceeds the
     limit, or if no limit is given, the callback will be called with the time that the block took.
-    """
-    def __init__(self, callback, limit_ms=None):
-        """
-        :param callback: The callback function that is called when the block of code is over,
-        unless the code block did not exceed the time limit.
-        :type callback: Callable[[float], None]
-        :param limit_ms: The number of milliseconds the block of code is allowed to take. If it
-        exceeds this time, or if it's None, the callback function will be called with the time the
-        block took.
-        :type limit_ms: int | None
-        """
+
+    :param callback: The callback function that is called when the block of code is over,
+        unless the code block did not exceed the time limit.
+    :type callback: Callable[[float], None]
+    :param limit_ms: The number of milliseconds the block of code is allowed to take. If it
+        exceeds this time, or if it's None, the callback function will be called with the time the
+        block took.
+    :type limit_ms: int | None
+    :ivar limit_ms: The number of milliseconds the block of code is allowed to take.
+    :vartype limit_ms: float | None
+    :ivar callback: The callback function that is called when the block of code is over.
+    :vartype callback: Callable[[float], None]
+    :ivar start: The start time of the block, set when entering the context.
+    :vartype start: float | None
+    """
+    def __init__(self, callback, limit_ms=None):
         self.limit_ms = float(limit_ms) if limit_ms is not None else None
         self.callback = callback
         self.start = None

     def __enter__(self):
+        """
+        Enter the context manager and record the start time.
+
+        :return: Returns itself so timing information can be accessed if needed.
+        :rtype: TimeBlock
+        """
         self.start = time.time()
         return self

     def __exit__(self, exc_type, exc_value, traceback):
+        """
+        Exit the context manager, calculate the elapsed time, and call the callback
+        if the time limit was exceeded or not provided.
+
+        :param exc_type: The exception type, or None if no exception occurred.
+        :type exc_type: Type[BaseException] | None
+        :param exc_value: The exception instance, or None if no exception occurred.
+        :type exc_value: BaseException | None
+        :param traceback: The traceback object, or None if no exception occurred.
+        :type traceback: TracebackType | None
+        """
         elapsed = (time.time() - self.start) * 1000.0 # ms
         if self.limit_ms is None or elapsed > self.limit_ms:
             self.callback(elapsed)
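The `TimeBlock` contract documented above can be verified with a short runnable sketch. The class body here is reassembled from the diff; the two `with` blocks show the silent fast path and the overrun path:

```python
# Runnable sketch of the TimeBlock context manager (reassembled from the diff).
import time

class TimeBlock(object):
    def __init__(self, callback, limit_ms=None):
        self.limit_ms = float(limit_ms) if limit_ms is not None else None
        self.callback = callback
        self.start = None

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        elapsed = (time.time() - self.start) * 1000.0  # ms
        # Fire the callback on overrun, or always if no limit was given.
        if self.limit_ms is None or elapsed > self.limit_ms:
            self.callback(elapsed)

calls = []
with TimeBlock(calls.append, limit_ms=10000):
    pass  # Well under the limit: no callback.
with TimeBlock(calls.append, limit_ms=1):
    time.sleep(0.01)  # ~10 ms, over the 1 ms limit: callback fires.
```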

View File

@@ -1,8 +1,15 @@
 from __future__ import unicode_literals # So that we can format strings with Unicode characters
 import random
 import sys
 from StringIO import StringIO

-from robot_interface.utils.microphone import choose_mic_default, choose_mic_interactive, get_microphones
+from robot_interface.utils.microphone import (
+    choose_mic_default,
+    choose_mic_interactive,
+    choose_mic_arguments,
+    choose_mic,
+    get_microphones,
+)

 class MicrophoneUtils(object):
@@ -10,10 +17,12 @@ class MicrophoneUtils(object):
     def test_choose_mic_default(self, pyaudio_instance):
         """
-        The result must contain at least "index", as this is used to identify the microphone.
-        The "name" is used for logging, so it should also exist.
-        It must have one or more channels.
-        Lastly it must be capable of sending at least 16000 samples per second.
+        Tests that the default microphone selection function returns a valid
+        microphone dictionary containing all necessary keys with correct types and values.
+
+        The result must contain at least "index", as this is used to identify the microphone,
+        and "name" for logging. It must have one or more channels (`maxInputChannels`),
+        and a default sample rate of at least 16000 Hz.
         """
         result = choose_mic_default(pyaudio_instance)
         assert "index" in result
@@ -32,8 +41,13 @@ class MicrophoneUtils(object):
     def test_choose_mic_interactive_input_not_int(self, pyaudio_instance, mocker):
         """
-        First mock an input that's not an integer, then a valid integer. There should be no errors.
+        Tests the robustness of the interactive selection when the user first enters
+        a non-integer value, ensuring the system prompts again without error and accepts
+        a valid integer on the second attempt.
         """
+        microphones = get_microphones(pyaudio_instance)
+        target_microphone = next(microphones)
         mock_input = mocker.patch("__builtin__.raw_input", side_effect=["not an integer", "0"])
         fake_out = StringIO()
         mocker.patch.object(sys, "stdout", fake_out)
@@ -41,7 +55,7 @@
         result = choose_mic_interactive(pyaudio_instance)

         assert "index" in result
         assert isinstance(result["index"], (int, long))
-        assert result["index"] == 0
+        assert result["index"] == target_microphone["index"]
         assert mock_input.called
@@ -49,8 +63,12 @@ class MicrophoneUtils(object):
     def test_choose_mic_interactive_negative_index(self, pyaudio_instance, mocker):
         """
-        Make sure that the interactive method does not allow negative integers as input.
+        Tests that the interactive selection method prevents the user from entering
+        a negative integer as a microphone index.
         """
+        microphones = get_microphones(pyaudio_instance)
+        target_microphone = next(microphones)
         mock_input = mocker.patch("__builtin__.raw_input", side_effect=["-1", "0"])
         fake_out = StringIO()
         mocker.patch.object(sys, "stdout", fake_out)
@@ -58,7 +76,7 @@
         result = choose_mic_interactive(pyaudio_instance)

         assert "index" in result
         assert isinstance(result["index"], (int, long))
-        assert result["index"] == 0
+        assert result["index"] == target_microphone["index"]
         assert mock_input.called
@@ -66,7 +84,8 @@ class MicrophoneUtils(object):
     def test_choose_mic_interactive_index_too_high(self, pyaudio_instance, mocker):
         """
-        Make sure that the interactive method does not allow indices higher than the highest mic index.
+        Tests that the interactive selection method prevents the user from entering
+        an index that exceeds the total number of available microphones.
         """
         real_count = len(list(get_microphones(pyaudio_instance)))
         mock_input = mocker.patch("__builtin__.raw_input", side_effect=[str(real_count), "0"])
@@ -83,7 +102,9 @@ class MicrophoneUtils(object):
     def test_choose_mic_interactive_random_index(self, pyaudio_instance, mocker):
         """
-        Get a random index from the list of available mics, make sure it's correct.
+        Tests the core interactive functionality by simulating the selection of a
+        random valid microphone index and verifying that the correct microphone
+        information is returned.
         """
         microphones = list(get_microphones(pyaudio_instance))
         random_index = random.randrange(len(microphones))
@@ -93,3 +114,77 @@
         assert "index" in result
         assert isinstance(result["index"], (int, long))
         assert result["index"] == microphones[random_index]["index"]
+    def test_choose_mic_no_arguments(self, pyaudio_instance, mocker):
+        """
+        Tests `choose_mic_arguments` when no command-line arguments are provided.
+        """
+        mocker.patch.object(sys, "argv", [])
+        result = choose_mic_arguments(pyaudio_instance)
+        assert result is None
+
+    def test_choose_mic_arguments(self, pyaudio_instance, mocker):
+        """
+        Tests `choose_mic_arguments` when the microphone name is passed as a separate
+        argument.
+        """
+        for mic in get_microphones(pyaudio_instance):
+            mocker.patch.object(sys, "argv", ["--microphone", mic["name"]])
+            result = choose_mic_arguments(pyaudio_instance)
+            assert result is not None
+            assert result == mic
+
+    def test_choose_mic_arguments_eq(self, pyaudio_instance, mocker):
+        """
+        Tests `choose_mic_arguments` when the microphone name is passed using an
+        equals sign (`--microphone=NAME`).
+        """
+        for mic in get_microphones(pyaudio_instance):
+            mocker.patch.object(sys, "argv", ["--microphone={}".format(mic["name"])])
+            result = choose_mic_arguments(pyaudio_instance)
+            assert result is not None
+            assert result == mic
+
+    def test_choose_mic_arguments_not_exist(self, pyaudio_instance, mocker):
+        """
+        Tests `choose_mic_arguments` when a non-existent microphone name is passed
+        via command-line arguments, expecting the function to return None.
+        """
+        mocker.patch.object(sys, "argv", ["--microphone", "Surely this microphone doesn't exist"])
+        result = choose_mic_arguments(pyaudio_instance)
+        assert result is None
+
+    def test_choose_mic_with_argument(self, pyaudio_instance, mocker):
+        """
+        Tests the `choose_mic` function when a valid microphone is
+        specified via command-line arguments.
+        """
+        mic = next(get_microphones(pyaudio_instance))
+        mocker.patch.object(sys, "argv", ["--microphone", mic["name"]])
+        result = choose_mic(pyaudio_instance)
+        assert result is not None
+        assert result == mic
+
+    def test_choose_mic_no_argument(self, pyaudio_instance, mocker):
+        """
+        Tests the `choose_mic` function when no command-line arguments
+        are provided, verifying that the function falls back correctly to the
+        system's default microphone selection.
+        """
+        default_mic = choose_mic_default(pyaudio_instance)
+        mocker.patch.object(sys, "argv", [])
+        result = choose_mic(pyaudio_instance)
+        assert result is not None
+        assert result == default_mic

10
test/conftest.py Normal file
View File

@@ -0,0 +1,10 @@
+from mock import patch, MagicMock
+import pytest
+
+@pytest.fixture(autouse=True)
+def mock_zmq_context():
+    with patch("zmq.Context") as mock:
+        mock.instance.return_value = MagicMock()
+        yield mock

View File

@@ -0,0 +1,32 @@
+from mock import patch, mock
+
+from robot_interface.core.config import Settings
+from robot_interface.endpoints.main_receiver import MainReceiver
+
+def test_environment_variables(monkeypatch):
+    """
+    When environment variables are set, creating settings should use these.
+    """
+    monkeypatch.setenv("AGENT__CONTROL_BACKEND_HOST", "some_value_that_should_be_different")
+    settings = Settings()
+    assert settings.agent_settings.control_backend_host == "some_value_that_should_be_different"
+
+@patch("robot_interface.endpoints.main_receiver.settings")
+@patch("robot_interface.endpoints.socket_base.settings")
+def test_create_endpoint_custom_host(base_settings, main_settings):
+    """
+    When a custom host is given in the settings, check that an endpoint's socket connects to it.
+    """
+    fake_context = mock.Mock()
+    fake_socket = mock.Mock()
+    fake_context.socket.return_value = fake_socket
+
+    base_settings.agent_settings.control_backend_host = "not_localhost"
+    main_settings.agent_settings.main_receiver_port = 9999
+
+    _ = MainReceiver(fake_context)
+
+    fake_socket.connect.assert_called_once_with("tcp://not_localhost:9999")

View File

@@ -7,6 +7,17 @@ from common.microphone_utils import MicrophoneUtils

 @pytest.fixture
 def pyaudio_instance():
+    """
+    A pytest fixture that provides an initialized PyAudio instance for tests
+    requiring microphone access.
+
+    It first initializes PyAudio. If a default input device (microphone) is not
+    found, the test is skipped to avoid failures in environments
+    without a mic.
+
+    :return: An initialized PyAudio instance.
+    :rtype: pyaudio.PyAudio
+    """
     audio = pyaudio.PyAudio()
     try:
         audio.get_default_input_device_info()

View File

@@ -5,54 +5,149 @@ import pytest
import zmq
from robot_interface.endpoints.actuation_receiver import ActuationReceiver
from robot_interface.endpoints.gesture_settings import GestureTags
@pytest.fixture
def zmq_context():
"""
A pytest fixture that creates and yields a ZMQ context.
:return: An initialized ZeroMQ context.
:rtype: zmq.Context
"""
context = zmq.Context()
yield context
def test_handle_unimplemented_endpoint(zmq_context):
receiver = ActuationReceiver(zmq_context)
# Should not error
def test_force_speech_clears_queue(mocker):
"""
Tests that a force speech message clears the existing queue
and places the high-priority message at the front.
"""
mocker.patch("threading.Thread")
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tts_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._message_queue.put("old_message_1")
receiver._message_queue.put("old_message_2")
assert receiver._message_queue.qsize() == 2
force_msg = {
"endpoint": "actuate/speech",
"data": "Emergency Notification",
"is_priority": True,
}
receiver.handle_message(force_msg)
assert receiver._message_queue.qsize() == 1
queued_item = receiver._message_queue.get()
assert queued_item == "Emergency Notification"
def test_handle_unimplemented_endpoint(mocker):
"""
Tests handling of unknown endpoints.
"""
mocker.patch("threading.Thread")
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver.handle_message({
"endpoint": "some_endpoint_that_definitely_does_not_exist",
"data": None,
})
def test_speech_message_no_data(zmq_context, mocker):
mock_warn = mocker.patch("logging.warn")
def test_speech_message_no_data(mocker):
"""
Tests that if the message data is empty, the receiver returns immediately
WITHOUT attempting to access the global robot state or session.
"""
# 1. Prevent background threads from running
mocker.patch("threading.Thread")
# 2. Mock the global state object
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
receiver = ActuationReceiver(zmq_context)
# 3. Create a PropertyMock to track whenever 'qi_session' is accessed
# We attach it to the class type of the mock so it acts like a real property
mock_session_prop = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_session_prop
# 4. Initialize Receiver (Mocking the context to avoid ZMQ errors)
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# 5. Send empty data
receiver.handle_message({"endpoint": "actuate/speech", "data": ""})
mock_warn.assert_called_with(mock.ANY)
# 6. Assertion:
# Because the code does `if not text: return` BEFORE `if not state.qi_session`,
# the state property should NEVER be read.
mock_session_prop.assert_not_called()
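The PropertyMock trick used here is worth isolating: attached to an instance it would be an inert stored attribute, but attached to the class it acts as a descriptor, so every *read* of the attribute is recorded. A minimal sketch with a hypothetical `StateStub`:

```python
from unittest import mock  # the repo uses the py2 `mock` backport

class StateStub:
    """Hypothetical stand-in for the global robot state."""

session_prop = mock.PropertyMock(return_value=None)
# Must go on the class, not the instance -- hence `type(mock_state)` above.
StateStub.qi_session = session_prop

state = StateStub()
assert state.qi_session is None          # the read routes through the mock
session_prop.assert_called_once_with()   # and was recorded as one call
```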
def test_speech_message_invalid_data(zmq_context, mocker):
mock_warn = mocker.patch("logging.warn")
def test_speech_message_invalid_data(mocker):
"""
Tests that if the message data is not a string, the function returns.
:param mocker: The pytest-mock fixture used to patch global state.
"""
mocker.patch("threading.Thread")
receiver = ActuationReceiver(zmq_context)
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_session_prop = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_session_prop
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver.handle_message({"endpoint": "actuate/speech", "data": True})
mock_warn.assert_called_with(mock.ANY)
# Because the code does `if not text: return` BEFORE `if not state.qi_session`,
# the state property should NEVER be read.
mock_session_prop.assert_not_called()
def test_speech_no_qi(zmq_context, mocker):
def test_speech_no_qi(mocker):
"""
Tests the actuation receiver's behavior when processing a speech request
but the global state does not have an active QI session.
"""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi_session = mock.PropertyMock(return_value=None)
type(mock_state).qi_session = mock_qi_session
receiver = ActuationReceiver(zmq_context)
mock_tts_service = mock.Mock()
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._tts_service = mock_tts_service
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
mock_qi_session.assert_called()
receiver._tts_service.assert_not_called()
def test_speech(zmq_context, mocker):
def test_speech(mocker):
"""
Tests the core speech actuation functionality by mocking the QI TextToSpeech
service and verifying that the received message is put into the queue.
"""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
@@ -62,13 +157,330 @@ def test_speech(zmq_context, mocker):
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
receiver = ActuationReceiver(zmq_context)
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._tts_service = None
receiver._handle_speech({"endpoint": "actuate/speech", "data": "Some message to speak."})
mock_state.qi_session.service.assert_called_once_with("ALTextToSpeech")
assert receiver._message_queue.qsize() == 1
queued_item = receiver._message_queue.get()
assert queued_item == "Some message to speak."
def test_speech_priority(mocker):
"""
Tests that a priority speech message is handled correctly by clearing the queue
and placing the priority message at the front.
"""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tts_service = mock.Mock()
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
receiver._message_queue.put("old_message_1")
receiver._message_queue.put("old_message_2")
assert receiver._message_queue.qsize() == 2
priority_msg = {
"endpoint": "actuate/speech",
"data": "Urgent Message",
"is_priority": True,
}
receiver._handle_speech(priority_msg)
assert receiver._message_queue.qsize() == 1
queued_item = receiver._message_queue.get()
assert queued_item == "Urgent Message"
def test_handle_messages_loop(mocker):
"""
Tests the background consumer loop (_handle_messages) processing an item.
Runs SYNCHRONOUSLY to ensure coverage tools pick up the lines.
"""
# Patch Thread so the real background thread NEVER starts automatically
mocker.patch("threading.Thread")
# Mock state
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Setup initial speaking state to False (covers "Started speaking" print)
mock_state.is_speaking = False
# Mock the TextToSpeech service
mock_tts_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_tts_service
# Initialize receiver (Thread is patched, so no thread starts)
# Use Mock Context to avoid ZMQ errors
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Manually inject the TTS service so the test does not depend on lazy loading
receiver._tts_service = mock_tts_service
# This ensures the while loop iterates exactly once
mock_state.exit_event.is_set.side_effect = [False, True]
# Put an item in the queue
receiver._message_queue.put("Hello World")
# RUN MANUALLY in the main thread
# This executes the code: while -> try -> get -> if print -> speaking=True -> say
receiver._handle_messages()
# Assertions
assert receiver._message_queue.empty()
mock_tts_service.say.assert_called_with("Hello World")
assert mock_state.is_speaking is True
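The `is_set.side_effect = [False, True]` idiom above is what lets a potentially infinite consumer loop run synchronously in the test's own thread: each call pops the next value, so the loop body executes exactly once. In isolation:

```python
from unittest import mock

exit_event = mock.Mock()
# First is_set() returns False (enter the loop), second True (leave it).
exit_event.is_set.side_effect = [False, True]

iterations = 0
while not exit_event.is_set():
    iterations += 1

assert iterations == 1
assert exit_event.is_set.call_count == 2
```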
def test_handle_messages_queue_empty(mocker):
"""
Tests the Queue.Empty exception handler in the consumer loop.
This covers the logic that resets 'state.is_speaking' to False.
"""
# Prevent the real background thread from starting
mocker.patch("threading.Thread")
# Mock the state object
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Make the loop enter the 'if state.is_speaking:' block by setting the
# attribute on the mock's class; _handle_messages should reset it to
# False once the queue turns out to be empty.
type(mock_state).is_speaking = True
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# This ensures the while loop body runs exactly once for our test
mock_state.exit_event.is_set.side_effect = [False, True]
# The queue is empty, so get() with a timeout raises Queue.Empty on its
# own; no extra patching of the queue instance is needed.
# Run the loop logic manually (synchronously)
receiver._handle_messages()
# Final assertion: the Queue.Empty handler must reset is_speaking.
# Execution order in the loop: read (returns True) -> print -> set False.
assert mock_state.is_speaking is False
def test_handle_messages_runtime_error(mocker):
"""
Tests the RuntimeError exception handler (e.g. lost WiFi connection).
Uses a Mock ZMQ context to avoid 'Address already in use' errors.
"""
# Patch Thread so we don't accidentally spawn real threads
mocker.patch("threading.Thread")
# Mock the state and logging
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
# Use a MOCK ZMQ context.
# This prevents the receiver from trying to bind to a real TCP port.
mock_zmq_ctx = mock.Mock()
# Initialize receiver with the mock context
receiver = ActuationReceiver(mock_zmq_ctx)
mock_state.exit_event.is_set.side_effect = [False, True]
receiver._message_queue.put("Test Message")
# Set up a TTS service that raises RuntimeError when asked to speak
mock_tts = mock.Mock()
mock_tts.say.side_effect = RuntimeError("Connection lost")
receiver._tts_service = mock_tts
# Run the loop logic manually
receiver._handle_messages()
# Assertions
assert mock_state.exit_event.is_set.called
def test_clear_queue(mocker):
"""
Tests that the clear_queue method properly drains all items from the message queue.
"""
mocker.patch("threading.Thread")
# Use Mock Context
mock_zmq_ctx = mock.Mock()
receiver = ActuationReceiver(mock_zmq_ctx)
# Populate the queue with multiple items
receiver._message_queue.put("msg1")
receiver._message_queue.put("msg2")
receiver._message_queue.put("msg3")
assert receiver._message_queue.qsize() == 3
# Clear the queue
receiver.clear_queue()
# Assert the queue is empty
assert receiver._message_queue.qsize() == 0
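Both priority tests and `test_clear_queue` rely on the same drain-then-enqueue behaviour, which can be sketched with a plain `queue.Queue` (the real `clear_queue` presumably loops like this):

```python
import queue  # the `Queue` module in the Python 2 code under test

q = queue.Queue()
q.put("old_message_1")
q.put("old_message_2")

# Drain whatever is pending, then put the priority message at the front.
while not q.empty():
    q.get_nowait()
q.put("Urgent Message")

assert q.qsize() == 1
assert q.get() == "Urgent Message"
```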
def test_gesture_no_data(zmq_context, mocker):
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": ""}, True)
# Just ensuring no crash
def test_gesture_invalid_data(zmq_context, mocker):
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": 123}, True)
# No crash expected
def test_gesture_single_not_found(zmq_context, mocker):
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["wave", "bow"] # allowed single gestures
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "unknown_gesture"}, True)
# No crash expected
def test_gesture_tag_not_found(zmq_context, mocker):
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.tags = ["happy", "sad"]
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "not_a_tag"}, False)
# No crash expected
def test_gesture_no_qi_session(zmq_context, mocker):
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.qi_session = None
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["hello"]
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "hello"}, True)
# No crash, path returns early
def test_gesture_single_success(zmq_context, mocker):
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
# Setup gesture settings
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.single_gestures = ["wave"]
mock_animation_service = mock.Mock()
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/single", "data": "wave"}, True)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run
assert getattr(mock_qi, "async").call_args[0][1] == "wave"
def test_gesture_tag_success(zmq_context, mocker):
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.tags = ["greeting"]
mock_animation_service = mock.Mock()
mock_state.qi_session = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context)
receiver._handle_gesture({"endpoint": "actuate/gesture/tag", "data": "greeting"}, False)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.runTag
assert getattr(mock_qi, "async").call_args[0][1] == "greeting"
def test_handle_message_all_routes(zmq_context, mocker):
"""
Ensures all handle_message endpoint branches route correctly.
"""
receiver = ActuationReceiver(zmq_context)
mock_speech = mocker.patch.object(receiver, "_handle_speech")
mock_gesture = mocker.patch.object(receiver, "_handle_gesture")
receiver.handle_message({"endpoint": "actuate/speech", "data": "hi"})
receiver.handle_message({"endpoint": "actuate/gesture/tag", "data": "greeting"})
receiver.handle_message({"endpoint": "actuate/gesture/single", "data": "wave"})
mock_speech.assert_called_once()
assert mock_gesture.call_count == 2
def test_endpoint_description(zmq_context, mocker):
mock_tags = mocker.patch("robot_interface.endpoints.actuation_receiver.GestureTags")
mock_tags.tags = ["happy"]
mock_tags.single_gestures = ["wave"]
receiver = ActuationReceiver(zmq_context)
desc = receiver.endpoint_description()
assert "gestures" in desc
assert desc["gestures"] == ["happy"]
assert "single_gestures" in desc
assert desc["single_gestures"] == ["wave"]
def test_gesture_single_real_gesturetags(zmq_context, mocker):
"""
Uses the real GestureTags (no mocking) to ensure the receiver
references GestureTags.single_gestures correctly.
"""
# Ensure qi session exists so we pass the early return
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_state.qi_session = mock.Mock()
# Mock qi.async to avoid real async calls
mock_qi = mock.Mock()
sys.modules["qi"] = mock_qi
# Mock animation service
mock_animation_service = mock.Mock()
mock_state.qi_session.service.return_value = mock_animation_service
receiver = ActuationReceiver(zmq_context)
# Pick a real gesture from GestureTags.single_gestures
assert len(GestureTags.single_gestures) > 0, "GestureTags.single_gestures must not be empty"
gesture = GestureTags.single_gestures[0]
receiver._handle_gesture(
{"endpoint": "actuate/gesture/single", "data": gesture},
is_single=True,
)
mock_state.qi_session.service.assert_called_once_with("ALAnimationPlayer")
getattr(mock_qi, "async").assert_called_once()
assert getattr(mock_qi, "async").call_args[0][0] == mock_animation_service.run
assert getattr(mock_qi, "async").call_args[0][1] == gesture
mock_qi.async.assert_called_once()
call_args = mock_qi.async.call_args[0]
assert call_args[0] == mock_tts_service.say
assert call_args[1] == "Some message to speak."

View File

@@ -1,6 +1,5 @@
# coding=utf-8
import os
import time
import mock
import pytest
@@ -11,13 +10,22 @@ from robot_interface.endpoints.audio_sender import AudioSender
@pytest.fixture
def zmq_context():
"""
A pytest fixture that creates and yields a ZMQ context.
:return: An initialized ZeroMQ context.
:rtype: zmq.Context
"""
context = zmq.Context()
yield context
def test_no_microphone(zmq_context, mocker):
"""
Tests the scenario where no valid microphone can be chosen for recording.
"""
mock_info_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger.info")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = None
sender = AudioSender(zmq_context)
@@ -31,8 +39,12 @@ def test_no_microphone(zmq_context, mocker):
def test_unicode_mic_name(zmq_context, mocker):
"""
Tests the robustness of the `AudioSender` when handling microphone names
that contain Unicode characters.
"""
mocker.patch("robot_interface.endpoints.audio_sender.threading")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"• Some Unicode name"}
sender = AudioSender(zmq_context)
@@ -47,11 +59,17 @@ def test_unicode_mic_name(zmq_context, mocker):
def _fake_read(num_frames):
"""
Helper function to simulate reading raw audio data from a microphone stream.
"""
return os.urandom(num_frames * 4)
def test_sending_audio(mocker):
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default")
"""
Tests the successful sending of audio data over a ZeroMQ socket.
"""
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
@@ -59,7 +77,8 @@ def test_sending_audio(mocker):
mock_zmq_context = mock.Mock()
send_socket = mock.Mock()
mock_state.is_speaking = False
# If there's something wrong with the microphone, it will raise an IOError when `read`ing.
stream = mock.Mock()
stream.read = _fake_read
@@ -75,12 +94,48 @@ def test_sending_audio(mocker):
send_socket.assert_called()
def test_no_sending_if_speaking(mocker):
"""
Tests that no audio is sent while the robot is speaking.
"""
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock()
send_socket = mock.Mock()
mock_state.is_speaking = True
# If there's something wrong with the microphone, it will raise an IOError when `read`ing.
stream = mock.Mock()
stream.read = _fake_read
sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket
sender.audio.open = mock.Mock()
sender.audio.open.return_value = stream
sender.start()
sender.wait_until_done()
send_socket.assert_not_called()
def _fake_read_error(num_frames):
"""
Helper function to simulate an I/O error during microphone stream reading.
"""
raise IOError()
def test_break_microphone(mocker):
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default")
"""
Tests the error handling when the microphone stream breaks (raises an IOError).
"""
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
@@ -102,3 +157,22 @@ def test_break_microphone(mocker):
sender.wait_until_done()
send_socket.assert_not_called()
def test_pyaudio_init_failure(mocker, zmq_context):
"""
Tests the behavior when PyAudio initialization fails (raises an IOError).
"""
# Prevent binding the ZMQ socket
mocker.patch("robot_interface.endpoints.audio_sender.AudioSender.create_socket")
# Simulate PyAudio() failing
mocker.patch(
"robot_interface.endpoints.audio_sender.pyaudio.PyAudio",
side_effect=IOError("boom")
)
sender = AudioSender(zmq_context)
assert sender.audio is None
assert sender.microphone is None
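The `_fake_read` helpers capture the two microphone behaviours the sender must survive: a healthy stream returning `num_frames * 4` bytes (the sample width is an assumption mirrored from the helper) and a broken one raising IOError. Stand-alone:

```python
import os
from unittest import mock

def fake_read(num_frames):
    # 4 bytes per frame, matching _fake_read above (sample width assumed).
    return os.urandom(num_frames * 4)

healthy = mock.Mock()
healthy.read = fake_read
assert len(healthy.read(256)) == 1024

broken = mock.Mock()
broken.read = mock.Mock(side_effect=IOError("mic unplugged"))
failed = False
try:
    broken.read(256)
except IOError:
    failed = True  # the sender's loop catches this and stops sending
assert failed
```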

View File

@@ -0,0 +1,175 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals
import json
import mock
import pytest
from robot_interface.endpoints.face_detector import FaceDetectionSender
from robot_interface.state import state
@pytest.fixture(autouse=True)
def initialized_state(monkeypatch):
"""
Fully initialize global state so __getattribute__ allows access.
"""
# Bypass the initialization guard
monkeypatch.setattr(state, "is_initialized", True, raising=False)
# Install a controllable exit_event
exit_event = mock.Mock()
exit_event.is_set = mock.Mock(return_value=True)
monkeypatch.setattr(state, "exit_event", exit_event, raising=False)
# Default qi_session is None unless overridden
monkeypatch.setattr(state, "qi_session", None, raising=False)
yield
def test_start_face_detection_no_qi_session():
"""
Returns early when qi_session is None.
"""
sender = FaceDetectionSender(mock.Mock())
sender.start_face_detection()
assert sender._face_thread is None
assert sender._face_service is None
assert sender._memory_service is None
def test_start_face_detection_happy_path(mocker):
"""
Initializes services and starts background thread.
"""
mock_face = mock.Mock()
mock_memory = mock.Mock()
mock_qi = mock.Mock()
mock_qi.service.side_effect = lambda name: {
"ALFaceDetection": mock_face,
"ALMemory": mock_memory,
}[name]
state.qi_session = mock_qi
fake_thread = mock.Mock()
mocker.patch("threading.Thread", return_value=fake_thread)
sender = FaceDetectionSender(mock.Mock())
sender.start_face_detection()
mock_face.setTrackingEnabled.assert_called_with(False)
mock_face.setRecognitionEnabled.assert_called_with(False)
mock_face.subscribe.assert_called_once()
fake_thread.start.assert_called_once()
def test_face_loop_face_detected_true(mocker):
"""
Sends face_detected=True when face data exists.
"""
sender = FaceDetectionSender(mock.Mock())
sender._memory_service = mock.Mock()
sender._memory_service.getData.return_value = [0, [[1]]]
sender.socket = mock.Mock()
mocker.patch("time.sleep")
state.exit_event.is_set.side_effect = [False, True]
sender._face_loop()
sent = sender.socket.send.call_args[0][0]
payload = json.loads(sent.decode("utf-8"))
assert payload["face_detected"] is True
def test_face_loop_face_detected_false(mocker):
"""
Sends face_detected=False when no face data exists.
"""
sender = FaceDetectionSender(mock.Mock())
sender._memory_service = mock.Mock()
sender._memory_service.getData.return_value = []
sender.socket = mock.Mock()
mocker.patch("time.sleep")
state.exit_event.is_set.side_effect = [False, True]
sender._face_loop()
sent = sender.socket.send.call_args[0][0]
payload = json.loads(sent.decode("utf-8"))
assert not payload["face_detected"]
def test_face_loop_handles_exception(mocker):
"""
Exceptions inside loop are swallowed.
"""
sender = FaceDetectionSender(mock.Mock())
sender._memory_service = mock.Mock()
sender._memory_service.getData.side_effect = Exception("boom")
sender.socket = mock.Mock()
mocker.patch("time.sleep")
state.exit_event.is_set.side_effect = [False, True]
# Must not raise
sender._face_loop()
def test_stop_face_detection_happy_path():
"""
Unsubscribes and disables tracking.
"""
sender = FaceDetectionSender(mock.Mock())
mock_face = mock.Mock()
sender._face_service = mock_face
sender.stop_face_detection()
mock_face.unsubscribe.assert_called_once()
mock_face.setTrackingEnabled.assert_called_with(False)
def test_stop_face_detection_exception():
"""
stop_face_detection swallows service exceptions.
"""
sender = FaceDetectionSender(mock.Mock())
mock_face = mock.Mock()
mock_face.unsubscribe.side_effect = Exception("fail")
sender._face_service = mock_face
sender.stop_face_detection()
def test_close_calls_stop_face_detection(mocker):
"""
close() calls parent close and stop_face_detection().
"""
sender = FaceDetectionSender(mock.Mock())
mocker.patch.object(sender, "stop_face_detection")
mocker.patch(
"robot_interface.endpoints.face_detector.SocketBase.close"
)
sender.close()
sender.stop_face_detection.assert_called_once()
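The payload checks in the `_face_loop` tests all follow one pattern: grab the bytes handed to the mocked socket and decode them back to JSON. Reduced to its essentials (payload shape taken from the tests above):

```python
import json
from unittest import mock

socket = mock.Mock()
payload = {"face_detected": True}
socket.send(json.dumps(payload).encode("utf-8"))

# call_args[0] is the positional-args tuple of the last send() call.
sent = socket.send.call_args[0][0]
assert json.loads(sent.decode("utf-8"))["face_detected"] is True
```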

View File

@@ -0,0 +1,45 @@
from robot_interface.utils.get_config import get_config
def test_get_config_prefers_explicit_value(monkeypatch):
"""
When a direct value is provided it should be returned without reading the environment.
"""
monkeypatch.setenv("GET_CONFIG_TEST", "from-env")
result = get_config("explicit", "GET_CONFIG_TEST", "default")
assert result == "explicit"
def test_get_config_returns_env_value(monkeypatch):
"""
If value is None the environment variable should be used.
"""
monkeypatch.setenv("GET_CONFIG_TEST", "from-env")
result = get_config(None, "GET_CONFIG_TEST", "default")
assert result == "from-env"
def test_get_config_casts_env_value(monkeypatch):
"""
The env value should be cast when a cast function is provided.
"""
monkeypatch.setenv("GET_CONFIG_PORT", "1234")
result = get_config(None, "GET_CONFIG_PORT", 0, int)
assert result == 1234
def test_get_config_casts_default_when_env_missing(monkeypatch):
"""
When the env var is missing it should fall back to the default and still apply the cast.
"""
monkeypatch.delenv("GET_CONFIG_MISSING", raising=False)
result = get_config(None, "GET_CONFIG_MISSING", "42", int)
assert result == 42
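Taken together, the four tests pin down `get_config`'s contract: an explicit value wins untouched, otherwise the environment variable is read, otherwise the default, and the cast (when supplied) is applied to whichever env/default string is used. A sketch consistent with those tests (not the repo's actual implementation):

```python
import os

def get_config(value, env_name, default, cast=None):
    """Explicit value > environment variable > default; the cast applies
    to the env/default path only (sketch consistent with the tests)."""
    if value is not None:
        return value
    raw = os.environ.get(env_name, default)
    return cast(raw) if cast else raw

os.environ["GET_CONFIG_PORT"] = "1234"
assert get_config("explicit", "GET_CONFIG_PORT", 0) == "explicit"
assert get_config(None, "GET_CONFIG_PORT", 0, int) == 1234
del os.environ["GET_CONFIG_PORT"]
assert get_config(None, "GET_CONFIG_PORT", "42", int) == 42
```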

227
test/unit/test_main.py Normal file
View File

@@ -0,0 +1,227 @@
import pytest
import threading
import zmq
import robot_interface.main as main_mod
from robot_interface.state import state
class FakeSocket:
"""Mock ZMQ socket for testing."""
def __init__(self, socket_type, messages=None):
self.socket_type = socket_type
self.messages = messages or []
self.sent = []
self.closed = False
def recv_json(self):
if not self.messages:
raise RuntimeError("No more messages")
return self.messages.pop(0)
def send_json(self, msg):
self.sent.append(msg)
def getsockopt(self, opt):
if opt == zmq.TYPE:
return self.socket_type
def close(self):
self.closed = True
class FakeReceiver:
"""Base class for main/actuation receivers."""
def __init__(self, socket):
self.socket = socket
self._called = []
def handle_message(self, msg):
self._called.append(msg)
return {"endpoint": "pong", "data": "ok"}
def close(self):
pass
class DummySender:
"""Mock sender to test start methods."""
def __init__(self):
self.called = False
def start_video_rcv(self):
self.called = True
def start(self):
self.called = True
def start_face_detection(self):
self.called = True
def close(self):
pass
@pytest.fixture
def fake_sockets():
"""Create default fake main and actuation sockets."""
main_sock = FakeSocket(zmq.REP)
act_sock = FakeSocket(zmq.SUB)
return main_sock, act_sock
@pytest.fixture
def fake_poll(monkeypatch):
"""Patch zmq.Poller to simulate a single polling cycle based on socket messages."""
class FakePoller:
def __init__(self):
self.registered = {}
self.used = False
def register(self, socket, flags):
self.registered[socket] = flags
def poll(self, timeout):
# Only return sockets that still have messages
active_socks = {
s: flags
for s, flags
in self.registered.items()
if getattr(s, "messages", [])
}
if active_socks:
return active_socks
# No more messages, exit loop
state.exit_event.set()
return {}
poller_instance = FakePoller()
monkeypatch.setattr(main_mod.zmq, "Poller", lambda: poller_instance)
return poller_instance
@pytest.fixture
def patched_main_components(monkeypatch, fake_sockets, fake_poll):
"""
Fixture to patch main receivers and senders with fakes.
Returns the fake instances for inspection in tests.
"""
main_sock, act_sock = fake_sockets
fake_main = FakeReceiver(main_sock)
fake_act = FakeReceiver(act_sock)
video_sender = DummySender()
audio_sender = DummySender()
face_sender = DummySender()
monkeypatch.setattr(main_mod, "MainReceiver", lambda ctx: fake_main)
monkeypatch.setattr(main_mod, "ActuationReceiver", lambda ctx: fake_act)
monkeypatch.setattr(main_mod, "VideoSender", lambda ctx: video_sender)
monkeypatch.setattr(main_mod, "AudioSender", lambda ctx: audio_sender)
monkeypatch.setattr(main_mod, "FaceDetectionSender", lambda ctx: face_sender)
# Register sockets for the fake poller
fake_poll.registered = {main_sock: zmq.POLLIN, act_sock: zmq.POLLIN}
return fake_main, fake_act, video_sender, audio_sender
def test_main_loop_rep_response(patched_main_components):
"""REP socket returns proper response and handlers are called."""
state.initialize()
fake_main, fake_act, video_sender, audio_sender = patched_main_components
fake_main.socket.messages = [{"endpoint": "ping", "data": "x"}]
fake_act.socket.messages = [{"endpoint": "actuate/speech", "data": "hello"}]
main_mod.main_loop(object())
assert fake_main.socket.sent == [{"endpoint": "pong", "data": "ok"}]
assert fake_main._called
assert fake_act._called
assert video_sender.called
assert audio_sender.called
state.deinitialize()
@pytest.mark.parametrize(
"messages",
[
[{"no_endpoint": True}], # Invalid dict
[["not", "a", "dict"]] # Non-dict message
]
)
def test_main_loop_invalid_or_non_dict_message(patched_main_components, messages):
"""Invalid or non-dict messages are ignored."""
state.initialize()
fake_main, _, _, _ = patched_main_components
fake_main.socket.messages = messages
main_mod.main_loop(object())
assert fake_main.socket.sent == []
state.deinitialize()
def test_main_loop_handler_returns_none(patched_main_components, monkeypatch):
"""Handler returning None still triggers send_json(None)."""
state.initialize()
fake_main, _, _, _ = patched_main_components
class NoneHandler(FakeReceiver):
def handle_message(self, msg):
self._called.append(msg)
return None
monkeypatch.setattr(main_mod, "MainReceiver", lambda ctx: NoneHandler(fake_main.socket))
fake_main.socket.messages = [{"endpoint": "some", "data": None}]
main_mod.main_loop(object())
assert fake_main.socket.sent == [None]
state.deinitialize()
def test_main_loop_overtime_callback(patched_main_components, monkeypatch):
"""TimeBlock callback is triggered if handler takes too long."""
state.initialize()
fake_main, _, _, _ = patched_main_components
fake_main.socket.messages = [{"endpoint": "ping", "data": "x"}]
class FakeTimeBlock:
def __init__(self, callback, limit_ms):
self.callback = callback
def __enter__(self):
return self
def __exit__(self, *a):
self.callback(999.0)
monkeypatch.setattr(main_mod, "TimeBlock", FakeTimeBlock)
main_mod.main_loop(object())
assert fake_main.socket.sent == [{"endpoint": "pong", "data": "ok"}]
state.deinitialize()
def test_main_keyboard_interrupt(monkeypatch):
"""main() handles KeyboardInterrupt and cleans up."""
called = {"deinitialized": False, "term_called": False}
class FakeContext:
def term(self): called["term_called"] = True
monkeypatch.setattr(main_mod.zmq, "Context", lambda: FakeContext())
def raise_keyboard_interrupt(*_):
raise KeyboardInterrupt()
monkeypatch.setattr(main_mod, "main_loop", raise_keyboard_interrupt)
def fake_initialize():
state.is_initialized = True
state.exit_event = threading.Event()
def fake_deinitialize():
called["deinitialized"] = True
state.is_initialized = False
monkeypatch.setattr(main_mod.state, "initialize", fake_initialize)
monkeypatch.setattr(main_mod.state, "deinitialize", fake_deinitialize)
main_mod.main()
assert called["term_called"] is True
assert called["deinitialized"] is True


@@ -7,11 +7,20 @@ from robot_interface.endpoints.main_receiver import MainReceiver
@pytest.fixture
def zmq_context():
"""
A pytest fixture that creates and yields a ZMQ context.
:return: An initialized ZeroMQ context.
:rtype: zmq.Context
"""
context = zmq.Context()
yield context
context.term()
def test_handle_ping(zmq_context):
"""
Tests the receiver's ability to handle the "ping" endpoint with data.
"""
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({"endpoint": "ping", "data": "pong"})
@@ -22,6 +31,10 @@ def test_handle_ping(zmq_context):
def test_handle_ping_none(zmq_context):
"""
Tests the receiver's ability to handle the ping endpoint when the
data field is explicitly set to None.
"""
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({"endpoint": "ping", "data": None})
@@ -33,6 +46,9 @@ def test_handle_ping_none(zmq_context):
@mock.patch("robot_interface.endpoints.main_receiver.state")
def test_handle_negotiate_ports(mock_state, zmq_context):
"""
Tests the handling of the "negotiate/ports" endpoint.
"""
receiver = MainReceiver(zmq_context)
mock_state.sockets = [receiver]
@@ -54,6 +70,10 @@ def test_handle_negotiate_ports(mock_state, zmq_context):
def test_handle_unimplemented_endpoint(zmq_context):
"""
Tests that the receiver correctly handles a request to a completely
unknown or non-existent endpoint.
"""
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({
"endpoint": "some_endpoint_that_definitely_does_not_exist",
@@ -67,6 +87,13 @@ def test_handle_unimplemented_endpoint(zmq_context):
def test_handle_unimplemented_negotiation_endpoint(zmq_context):
"""
Tests handling a request to an unknown sub-endpoint within a known
group.
The expected behavior is to return a specific "negotiate/error" response
with a descriptive error string.
"""
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({
"endpoint": "negotiate/but_some_subpath_that_definitely_does_not_exist",


@@ -7,6 +7,16 @@ from robot_interface.utils.microphone import choose_mic_default, choose_mic_inte
class MockPyAudio:
"""
A mock implementation of the PyAudio library class, designed for testing
microphone utility functions without requiring actual audio hardware.
It provides fake devices, including one input microphone, and implements
the core PyAudio methods required for device enumeration.
:ivar devices: A list of dictionaries representing mock audio devices.
:vartype devices: List[Dict[str, Any]]
"""
def __init__(self):
# You can predefine fake device info here
self.devices = [
@@ -37,18 +47,36 @@ class MockPyAudio:
]
def get_device_count(self):
"""Return the number of available mock devices."""
"""
Returns the number of available mock devices.
:return: The total number of devices in the mock list.
:rtype: int
"""
return len(self.devices)
def get_device_info_by_index(self, index):
"""Return information for a given mock device index."""
"""
Returns information for a given mock device index.
:param index: The index of the device to retrieve.
:type index: int
:return: A dictionary containing device information.
:rtype: Dict[str, Any]
"""
if 0 <= index < len(self.devices):
return self.devices[index]
else:
raise IOError("Invalid device index: {}".format(index))
def get_default_input_device_info(self):
"""Return info for a default mock input device."""
"""
Returns information for the default mock input device.
:return: A dictionary containing the default input device information.
:rtype: Dict[str, Any]
"""
for device in self.devices:
if device.get("maxInputChannels", 0) > 0:
return device
@@ -57,16 +85,32 @@ class MockPyAudio:
@pytest.fixture
def pyaudio_instance():
"""
A pytest fixture that returns an instance of the `MockPyAudio` class.
:return: An initialized instance of the mock PyAudio class.
:rtype: MockPyAudio
"""
return MockPyAudio()
def _raise_io_error():
"""
Helper function used to mock PyAudio methods that are expected to fail
when no device is available.
"""
raise IOError()
class TestAudioUnit(MicrophoneUtils):
"""Run shared audio behavior tests with the mock implementation."""
"""
Runs the shared microphone behavior tests defined in `MicrophoneUtils` using
the mock PyAudio implementation.
"""
def test_choose_mic_default_no_mic(self):
"""
Tests `choose_mic_default` when no microphones are available.
"""
mock_pyaudio = mock.Mock()
mock_pyaudio.get_device_count = mock.Mock(return_value=0L)
mock_pyaudio.get_default_input_device_info = _raise_io_error
@@ -76,6 +120,9 @@ class TestAudioUnit(MicrophoneUtils):
assert result is None
def test_choose_mic_interactive_no_mic(self):
"""
Tests `choose_mic_interactive` when no microphones are available.
"""
mock_pyaudio = mock.Mock()
mock_pyaudio.get_device_count = mock.Mock(return_value=0L)
mock_pyaudio.get_default_input_device_info = _raise_io_error


@@ -0,0 +1,90 @@
import sys
# Import module under test
import robot_interface.utils.qi_utils as qi_utils
def reload_qi_utils_with(qi_module):
"""
Helper: reload qi_utils after injecting a fake qi module.
Python 2 uses the built-in reload().
Just changing sys.modules["qi"] won't affect the already-imported module.
"""
if qi_module is None:
if "qi" in sys.modules:
del sys.modules["qi"]
else:
sys.modules["qi"] = qi_module
# Python 2 reload
global qi_utils
qi_utils = reload(qi_utils)
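The helper above relies on the import system honoring sys.modules: a stub registered there is what any subsequent `import` resolves to. A minimal standalone illustration of the trick (written in Python 3-compatible syntax; the project itself targets Python 2, where `reload()` is a builtin):

```python
import sys
import types

# Register a stub module named "qi" before importing it, so no real NAOqi
# installation is needed. Attributes here are purely illustrative.
fake_qi = types.ModuleType("qi")
fake_qi.Application = lambda: "fake application"
sys.modules["qi"] = fake_qi

import qi  # resolves to the injected stub, not an installed package
```

Modules that already imported `qi` keep their old reference, which is exactly why the helper reloads qi_utils after swapping the entry.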
def test_get_qi_session_no_qi_module():
"""
Tests the 'qi is None' path.
"""
reload_qi_utils_with(None)
session = qi_utils.get_qi_session()
assert session is None
def test_get_qi_session_no_qi_url_argument(monkeypatch):
"""
Tests the '--qi-url not in sys.argv' path.
"""
class FakeQi:
pass
reload_qi_utils_with(FakeQi())
monkeypatch.setattr(sys, "argv", ["pytest"])
session = qi_utils.get_qi_session()
assert session is None
def test_get_qi_session_runtime_error(monkeypatch):
"""
Tests the 'except RuntimeError' path.
"""
class FakeApp:
def start(self):
raise RuntimeError("boom")
class FakeQi:
Application = lambda self=None: FakeApp()
reload_qi_utils_with(FakeQi())
monkeypatch.setattr(sys, "argv", ["pytest", "--qi-url", "tcp://localhost"])
session = qi_utils.get_qi_session()
assert session is None
def test_get_qi_session_success(monkeypatch):
"""
Tests the success path where a session is created.
"""
class FakeSession:
pass
class FakeApp:
def __init__(self):
self.session = FakeSession()
def start(self):
return True
class FakeQi:
Application = lambda self=None: FakeApp()
reload_qi_utils_with(FakeQi())
monkeypatch.setattr(sys, "argv", ["pytest", "--qi-url", "tcp://localhost"])
session = qi_utils.get_qi_session()
assert isinstance(session, FakeSession)


@@ -0,0 +1,19 @@
import pytest
from robot_interface.endpoints.receiver_base import ReceiverBase
def test_receiver_base_not_implemented(monkeypatch):
"""
Ensure that the base ReceiverBase raises NotImplementedError when
handle_message is called on a subclass that does not implement it.
"""
# Patch the __abstractmethods__ to allow instantiation
monkeypatch.setattr(ReceiverBase, "__abstractmethods__", frozenset())
class DummyReceiver(ReceiverBase):
pass
dummy = DummyReceiver("dummy") # Can now instantiate
with pytest.raises(NotImplementedError):
dummy.handle_message({"endpoint": "dummy", "data": None})
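The `__abstractmethods__` patch in this test is a general trick: ABCMeta blocks instantiation only while that frozenset is non-empty, so emptying it lets the abstract base's own method body be exercised. A self-contained sketch (Python 3 `abc.ABC` syntax; the class and message below are illustrative, not the project's ReceiverBase):

```python
import abc

class Base(abc.ABC):
    @abc.abstractmethod
    def handle_message(self, msg):
        raise NotImplementedError("handle_message must be overridden")

# Instantiation normally fails because of the abstract method.
try:
    Base()
    blocked = False
except TypeError:
    blocked = True

# Clearing __abstractmethods__ (what monkeypatch.setattr does in the test)
# makes the class instantiable, so the base body itself can run.
Base.__abstractmethods__ = frozenset()
instance = Base()
try:
    instance.handle_message({"endpoint": "dummy", "data": None})
    raised = False
except NotImplementedError:
    raised = True
```

Using monkeypatch for this, as the test does, restores the original frozenset afterwards so other tests still see the class as abstract.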


@@ -0,0 +1,55 @@
import mock
import zmq
from robot_interface.endpoints.socket_base import SocketBase
def test_close_covers_both_branches():
"""
Exercise both possible paths inside SocketBase.close():
- when no socket exists (should just return),
- when a socket object is present (its close() method should be called).
"""
sb = SocketBase("x")
# First check the case where socket is None. Nothing should happen here.
sb.close()
# Now simulate a real socket so the close() call is triggered.
fake_socket = mock.Mock()
sb.socket = fake_socket
sb.close()
fake_socket.close.assert_called_once()
def test_create_socket_and_endpoint_description_full_coverage():
"""
Test the less-commonly used branch of create_socket() where bind=False.
This covers:
- the loop that sets socket options,
- the connect() path,
- the logic in endpoint_description() that inverts self.bound.
"""
fake_context = mock.Mock()
fake_socket = mock.Mock()
# The context should hand back our fake socket object.
fake_context.socket.return_value = fake_socket
sb = SocketBase("id")
# Calling create_socket with bind=False forces the connect() code path.
sb.create_socket(
zmq_context=fake_context,
socket_type=zmq.SUB,
port=9999,
options=[(zmq.CONFLATE, 1)], # one option is enough to hit the loop
bind=False,
)
fake_socket.setsockopt.assert_called_once_with(zmq.CONFLATE, 1)
fake_socket.connect.assert_called_once_with("tcp://localhost:9999")
# Check that endpoint_description reflects bound=False -> "bind": True
desc = sb.endpoint_description()
assert desc == {"id": "id", "port": 9999, "bind": True}

test/unit/test_state.py Normal file

@@ -0,0 +1,108 @@
import threading
import signal
import pytest
import mock
from robot_interface.state import State
def test_initialize_does_not_reinitialize():
"""
Check that calling `initialize` on an already initialized state does not change existing
attributes.
"""
state = State()
# Mock qi_session to avoid real session creation
mock_session = mock.MagicMock()
state.qi_session = mock_session
# Set state as already initialized
state.is_initialized = True
old_exit_event = state.exit_event
# Call initialize
state.initialize()
# Ensure existing attributes were not overwritten
assert state.exit_event == old_exit_event # exit_event should not be recreated
assert state.qi_session == mock_session # qi_session should not be replaced
assert state.is_initialized is True # is_initialized should remain True
def test_deinitialize_behavior():
"""Check that deinitialize closes sockets and updates the initialization state correctly."""
state = State()
# Case 1: Initialized with sockets
state.is_initialized = True
mock_socket_1 = mock.Mock()
mock_socket_2 = mock.Mock()
state.sockets = [mock_socket_1, mock_socket_2]
state.deinitialize()
# Sockets should be closed
mock_socket_1.close.assert_called_once()
mock_socket_2.close.assert_called_once()
# State should be marked as not initialized
assert not state.is_initialized
# Case 2: Not initialized, should not raise
state.is_initialized = False
state.sockets = []
state.deinitialize()
assert not state.is_initialized
def test_access_control_before_initialization():
"""Verify that accessing certain attributes before initialization raises RuntimeError."""
state = State()
with pytest.raises(RuntimeError, match=".*sockets.*"):
_ = state.sockets
with pytest.raises(RuntimeError, match=".*qi_session.*"):
_ = state.qi_session
def test_exit_event_before_initialized_returns_if_set():
"""Check that exit_event can be accessed even if state is not initialized,
but only if it is set."""
state = State()
# Manually create and set the exit_event
object.__setattr__(state, "exit_event", threading.Event())
object.__getattribute__(state, "exit_event").set()
# Should return the event without raising
assert state.exit_event.is_set()
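The `object.__setattr__` / `object.__getattribute__` calls above bypass State's attribute guards by going straight to the base implementation. A minimal sketch of the same bypass against a hypothetical guarded class (not the project's State):

```python
class Guarded(object):
    """Blocks normal attribute writes, loosely mimicking State's access control."""
    def __setattr__(self, name, value):
        raise RuntimeError("attribute writes are blocked")

g = Guarded()
try:
    g.flag = True  # routed through Guarded.__setattr__, which raises
    blocked = False
except RuntimeError:
    blocked = True

# object.__setattr__ writes the instance dict directly, skipping the guard,
# which is the same escape hatch the test uses to seed exit_event.
object.__setattr__(g, "flag", True)
```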
def test_getattribute_allowed_attributes_before_init():
"""Ensure attributes allowed before initialization can be accessed without error."""
state = State()
assert callable(state.initialize)
assert callable(state.deinitialize)
assert state.is_initialized is False
assert state.__dict__ is not None
assert state.__class__.__name__ == "State"
assert state.__doc__ is not None
def test_signal_handler_sets_exit_event(monkeypatch):
"""Ensure SIGINT triggers the exit_event via signal handler."""
state = State()
# Patch get_qi_session to prevent real session creation
monkeypatch.setattr("robot_interface.state.get_qi_session", lambda: "dummy_session")
# Initialize state to set up signal handlers
state.initialize()
# Simulate SIGINT
signal_handler = signal.getsignal(signal.SIGINT)
signal_handler(None, None)
# Exit event should be set
assert state.exit_event.is_set()
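The pattern in this test, fetching the installed handler with `signal.getsignal` and invoking it directly, exercises signal handling without delivering a real signal. A standalone sketch (handler and event names are illustrative):

```python
import signal
import threading

exit_event = threading.Event()

def _handler(signum, frame):
    exit_event.set()

signal.signal(signal.SIGINT, _handler)

# Fetch whichever handler is currently installed and call it directly,
# as the test above does; no actual SIGINT needs to be raised.
handler = signal.getsignal(signal.SIGINT)
handler(signal.SIGINT, None)
```

Calling the handler synchronously keeps the test deterministic and avoids platform differences in signal delivery.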


@@ -6,11 +6,21 @@ from robot_interface.utils.timeblock import TimeBlock
class AnyFloat(object):
"""
A helper class used in tests to assert that a mock function was called
with an argument that is specifically a float, regardless of its value.
It overrides the equality comparison (`__eq__`) to check only the type.
"""
def __eq__(self, other):
return isinstance(other, float)
def test_no_limit():
"""
Tests the scenario where the `TimeBlock` context manager is used without
a time limit.
"""
callback = mock.Mock()
with TimeBlock(callback):
@@ -20,6 +30,10 @@ def test_no_limit():
def test_exceed_limit():
"""
Tests the scenario where the execution time within the `TimeBlock`
exceeds the provided limit.
"""
callback = mock.Mock()
with TimeBlock(callback, 0):
@@ -29,6 +43,10 @@ def test_exceed_limit():
def test_within_limit():
"""
Tests the scenario where the execution time within the `TimeBlock`
stays within the provided limit.
"""
callback = mock.Mock()
with TimeBlock(callback, 5):


@@ -0,0 +1,99 @@
# coding=utf-8
import mock
import pytest
import zmq
from robot_interface.endpoints.video_sender import VideoSender
from robot_interface.state import state
from robot_interface.core.config import settings
@pytest.fixture
def zmq_context():
"""Provide a ZMQ context."""
context = zmq.Context()
yield context
context.term()
def _patch_basics(mocker):
"""Common patches: prevent real threads, port binds, and state errors."""
mocker.patch("robot_interface.endpoints.socket_base.zmq.Socket.bind")
mocker.patch("robot_interface.endpoints.video_sender.threading.Thread")
mocker.patch.object(state, "is_initialized", True)
def _patch_exit_event(mocker):
"""Make exit_event stop the loop after one iteration."""
fake_event = mock.Mock()
fake_event.is_set.side_effect = [False, True]
mocker.patch.object(state, "exit_event", fake_event)
def test_no_qi_session(zmq_context, mocker):
"""Video loop should not start without a qi_session."""
_patch_basics(mocker)
mocker.patch.object(state, "qi_session", None)
sender = VideoSender(zmq_context)
sender.start_video_rcv()
assert not hasattr(sender, "thread")
def test_video_streaming(zmq_context, mocker):
"""VideoSender should send retrieved image data."""
_patch_basics(mocker)
_patch_exit_event(mocker)
# Pepper's image buffer lives at index 6
mocker.patch.object(settings.video_config, "image_buffer", 6)
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.return_value = [None]*6 + ["fake_img"]
fake_session = mock.Mock()
fake_session.service.return_value = mock_video_service
mocker.patch.object(state, "qi_session", fake_session)
mocker.patch.object(
fake_session.service("ALVideoDevice"),
"subscribeCamera",
return_value="stream_name"
)
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send = send_socket
sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_called_with("fake_img")
def test_video_receive_error(zmq_context, mocker):
"""Errors retrieving images should not call send()."""
_patch_basics(mocker)
_patch_exit_event(mocker)
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.side_effect = Exception("boom")
fake_session = mock.Mock()
fake_session.service.return_value = mock_video_service
mocker.patch.object(state, "qi_session", fake_session)
mocker.patch.object(
fake_session.service("ALVideoDevice"),
"subscribeCamera",
return_value="stream_name"
)
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send = send_socket
sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_not_called()