43 Commits

Author SHA1 Message Date
JobvAlewijk
c86eda497c Merge branch 'feat/ci-cd' into 'dev'
Introduce CI/CD with tests

See merge request ics/sp/2025/n25b/pepperplus-ri!19
2025-12-03 15:23:37 +00:00
Twirre Meulenbelt
94b92b3e4a feat: re-introduce git hooks
Now using the standardized method from the CB.

ref: N25B-367
2025-12-02 22:04:46 +01:00
Twirre Meulenbelt
f469e4ce36 fix: install in a .venv artifact
This artifact can be reused in different stages.

ref: N25B-367
2025-12-02 21:46:24 +01:00
Twirre Meulenbelt
28a556becd feat: introduce CI/CD with tests
Using a custom base image installed on the runner, the installation and tests should work (fast).

ref: N25B-367
2025-12-02 21:12:15 +01:00
Twirre
89c9f2ebea Merge branch 'test/video-sender' into 'dev'
test: added full video sender coverage tests

See merge request ics/sp/2025/n25b/pepperplus-ri!18
2025-11-24 20:41:08 +00:00
JobvAlewijk
96f328d56c test: added full video sender coverage tests 2025-11-24 20:41:08 +00:00
Twirre
4d634a3b4e Merge branch 'test/main-start' into 'dev'
test: added main tests

See merge request ics/sp/2025/n25b/pepperplus-ri!17
2025-11-24 20:37:59 +00:00
JobvAlewijk
e2a71ad6c2 test: added main tests 2025-11-24 20:37:59 +00:00
2fcd885a00 Merge branch 'test/state' into 'dev'
test: added tests for full state coverage

See merge request ics/sp/2025/n25b/pepperplus-ri!16
2025-11-24 20:24:19 +00:00
JobvAlewijk
336acac440 test: added tests for full state coverage 2025-11-24 20:24:19 +00:00
Twirre
f4fbc69c7f Merge branch 'test/reciever-base' into 'dev'
test: added not overridden reciever base test

See merge request ics/sp/2025/n25b/pepperplus-ri!15
2025-11-24 20:06:58 +00:00
JobvAlewijk
fbe8f59c38 test: added not overridden reciever base test 2025-11-24 20:06:58 +00:00
Twirre
e99d7e8557 Merge branch 'test/audio-sender' into 'dev'
test: added init failure test in audio sender

See merge request ics/sp/2025/n25b/pepperplus-ri!14
2025-11-24 20:05:10 +00:00
JobvAlewijk
2350f6eec7 test: added init failure test in audio sender 2025-11-24 20:05:10 +00:00
Twirre
2852b714f5 Merge branch 'test/qi-utils' into 'dev'
test: added qi_utils test

See merge request ics/sp/2025/n25b/pepperplus-ri!12
2025-11-24 20:02:28 +00:00
JobvAlewijk
7628e47478 test: added qi_utils test 2025-11-24 20:02:28 +00:00
Twirre
36f5fae45c Merge branch 'test/socket-base' into 'dev'
test: added socket base tests

See merge request ics/sp/2025/n25b/pepperplus-ri!13
2025-11-24 13:32:31 +00:00
JobvAlewijk
6ea870623b test: added socket base tests 2025-11-24 13:32:31 +00:00
8d6dd23acb Merge branch 'chore/add-documentation' into 'dev'
chore: add documentation RI

See merge request ics/sp/2025/n25b/pepperplus-ri!11
2025-11-22 19:14:51 +00:00
Twirre Meulenbelt
a53871360e docs: remove duplicate and double space
ref: N25B-298
2025-11-22 19:32:50 +01:00
Pim Hutting
c1e92feba7 Apply 1 suggestion(s) to 1 file(s)
Co-authored-by: Kasper Marinus <k.marinus@students.uu.nl>
2025-11-22 12:37:39 +00:00
Pim Hutting
6859451bf9 Apply 1 suggestion(s) to 1 file(s)
Co-authored-by: Twirre <s.a.meulenbelt@students.uu.nl>
2025-11-22 12:36:34 +00:00
Twirre Meulenbelt
64c6f0addb docs: make doc generator understand multi line
ref: N25B-298
2025-11-22 12:44:13 +01:00
Pim Hutting
c53307530b chore: applied all feedback
close: N25B-298
2025-11-22 11:45:32 +01:00
Pim Hutting
051f904576 chore: add documentation RI
Code functionality left unchanged, only added docs where missing

close: N25B-298
2025-11-21 16:35:40 +01:00
Twirre
1e3531ac6e Merge branch 'docs/gen_documentation' into 'dev'
docs: added auto-generation of documentation

See merge request ics/sp/2025/n25b/pepperplus-ri!10
2025-11-19 17:14:36 +00:00
Storm
cec29f6206 chore: updated .gitignore
ref: N25B-270
2025-11-19 18:10:18 +01:00
Storm
a0a8ad2689 docs: changed readme
ref: N25B-270
2025-11-19 17:59:37 +01:00
JobvAlewijk
1c9467d03a fix: conf includes correct path
ref: N25B-270
2025-11-19 17:57:24 +01:00
Storm
9dd39d2048 docs: added auto-generation of documentation
ref: N25B-270
2025-11-19 13:49:50 +01:00
Twirre
b05aa5e834 Merge branch 'refactor/config-file' into 'dev'
refactor: added config file and moved constants

See merge request ics/sp/2025/n25b/pepperplus-ri!9
2025-11-14 14:15:06 +00:00
Twirre Meulenbelt
c691e279cd style: two lines between top level declarations
ref: N25B-236
2025-11-14 15:13:48 +01:00
Pim Hutting
16b64e41c8 style: applied style suggestions
close: N25B-236
2025-11-14 14:12:14 +00:00
Twirre Meulenbelt
03519e2a16 test: fix microphone interactive test
This was created with the assumption that all devices were choosable, but now only ones with input channels are.

ref: N25B-119
2025-11-14 13:08:31 +01:00
Pim Hutting
643d7b919c fix: made all tests pass
before some tests failed because of a faulty edit
to microphone util

ref: N25B-236
2025-11-09 16:00:36 +01:00
Pim Hutting
4402b21a73 refactor: added config file and moved constants
- Moved hardcoded configuration constants to a dedicated config.py file.
- Created VideoConfig, AudioConfig, MainConfig, and Settings classes in config.py

ref: N25B-236
2025-11-09 15:43:22 +01:00
Pim Hutting
c037eb7ec2 Merge branch 'feat/stream-audio' into 'dev'
Implement audio streaming

See merge request ics/sp/2025/n25b/pepperplus-ri!8
2025-11-05 12:08:28 +00:00
Twirre Meulenbelt
8a095323ec docs: describe extra WSL installation step
ref: N25B-119
2025-11-02 16:35:15 +01:00
Twirre Meulenbelt
854a14bf0c docs: describe --microphone program parameter
ref: N25B-119
2025-11-02 16:16:43 +01:00
Twirre Meulenbelt
fab5127cac feat: add application parameter to choose a custom microphone
ref: N25B-119
2025-11-02 16:12:56 +01:00
Twirre Meulenbelt
5912ac606a docs: add installation instructions for the portaudio dependency
ref: N25B-119
2025-11-02 15:01:18 +01:00
Twirre Meulenbelt
9ea446275e fix: allow speaking text with Unicode characters
When speaking, the actuation receiver logs the message to speak. If the message includes Unicode characters, it will now no longer crash.

ref: N25B-119
2025-11-02 14:59:16 +01:00
Twirre Meulenbelt
a6a12a5886 fix: remove unused qi import
It had already been made so that the VideoSender does not depend on `qi`, but the import was not yet removed.

ref: N25B-119
2025-11-02 14:58:32 +01:00
37 changed files with 1811 additions and 172 deletions

77
.githooks/check-branch-name.sh Executable file
View File

@@ -0,0 +1,77 @@
#!/usr/bin/env bash
# This script checks if the current branch name follows the specified format.
# It's designed to be used as a 'pre-commit' git hook.
# Format: <type>/<short-description>
# Example: feat/add-user-login
# --- Configuration ---
# An array of allowed commit types
ALLOWED_TYPES=(feat fix refactor perf style test docs build chore revert)
# An array of branches to ignore
IGNORED_BRANCHES=(main dev demo)
# --- Colors for Output ---
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# --- Helper Functions ---
error_exit() {
echo -e "${RED}ERROR: $1${NC}" >&2
echo -e "${YELLOW}Branch name format is incorrect. Aborting commit.${NC}" >&2
exit 1
}
# --- Main Logic ---
# 1. Get the current branch name
BRANCH_NAME=$(git symbolic-ref --short HEAD)
# 2. Check if the current branch is in the ignored list
for ignored_branch in "${IGNORED_BRANCHES[@]}"; do
if [ "$BRANCH_NAME" == "$ignored_branch" ]; then
echo -e "${GREEN}Branch check skipped for default branch: $BRANCH_NAME${NC}"
exit 0
fi
done
# 3. Validate the overall structure: <type>/<description>
if ! [[ "$BRANCH_NAME" =~ ^[a-z]+/.+$ ]]; then
error_exit "Branch name must be in the format: <type>/<short-description>\nExample: feat/add-user-login"
fi
# 4. Extract the type and description
TYPE=$(echo "$BRANCH_NAME" | cut -d'/' -f1)
DESCRIPTION=$(echo "$BRANCH_NAME" | cut -d'/' -f2-)
# 5. Validate the <type>
type_valid=false
for allowed_type in "${ALLOWED_TYPES[@]}"; do
if [ "$TYPE" == "$allowed_type" ]; then
type_valid=true
break
fi
done
if [ "$type_valid" == false ]; then
error_exit "Invalid type '$TYPE'.\nAllowed types are: ${ALLOWED_TYPES[*]}"
fi
# 6. Validate the <short-description>
# Regex breakdown:
# ^[a-z0-9]+ - Starts with one or more lowercase letters/numbers (the first word).
# (-[a-z0-9]+){0,5} - Followed by a group of (dash + word) 0 to 5 times.
# $ - End of the string.
# This entire pattern enforces 1 to 6 words total, separated by dashes.
DESCRIPTION_REGEX="^[a-z0-9]+(-[a-z0-9]+){0,5}$"
if ! [[ "$DESCRIPTION" =~ $DESCRIPTION_REGEX ]]; then
error_exit "Invalid short description '$DESCRIPTION'.\nIt must be a maximum of 6 words, all lowercase, separated by dashes.\nExample: add-new-user-authentication-feature"
fi
# If all checks pass, exit successfully
echo -e "${GREEN}Branch name '$BRANCH_NAME' is valid.${NC}"
exit 0

135
.githooks/check-commit-msg.sh Executable file
View File

@@ -0,0 +1,135 @@
#!/usr/bin/env bash
# This script checks if a commit message follows the specified format.
# It's designed to be used as a 'commit-msg' git hook.
# Format:
# <type>: <short description>
#
# [optional]<body>
#
# [ref/close]: <issue identifier>
# --- Configuration ---
# An array of allowed commit types
ALLOWED_TYPES=(feat fix refactor perf style test docs build chore revert)
# --- Colors for Output ---
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# The first argument to the hook is the path to the file containing the commit message
COMMIT_MSG_FILE=$1
# --- Automated Commit Detection ---
# Read the first line (header) for initial checks
HEADER=$(head -n 1 "$COMMIT_MSG_FILE")
# Check for Merge commits (covers 'git merge' and PR merges from GitHub/GitLab)
# Examples: "Merge branch 'main' into ...", "Merge pull request #123 from ..."
MERGE_PATTERN="^Merge (remote-tracking )?(branch|pull request|tag) .*"
if [[ "$HEADER" =~ $MERGE_PATTERN ]]; then
echo -e "${GREEN}Merge commit detected by message content. Skipping validation.${NC}"
exit 0
fi
# Check for Revert commits
# Example: "Revert "feat: add new feature""
REVERT_PATTERN="^Revert \".*\""
if [[ "$HEADER" =~ $REVERT_PATTERN ]]; then
echo -e "${GREEN}Revert commit detected by message content. Skipping validation.${NC}"
exit 0
fi
# Check for Cherry-pick commits (this pattern appears at the end of the message)
# Example: "(cherry picked from commit deadbeef...)"
# We use grep -q to search the whole file quietly.
CHERRY_PICK_PATTERN="\(cherry picked from commit [a-f0-9]{7,40}\)"
if grep -qE "$CHERRY_PICK_PATTERN" "$COMMIT_MSG_FILE"; then
echo -e "${GREEN}Cherry-pick detected by message content. Skipping validation.${NC}"
exit 0
fi
# Check for Squash
# Example: "Squash commits ..."
SQUASH_PATTERN="^Squash .+"
if [[ "$HEADER" =~ $SQUASH_PATTERN ]]; then
echo -e "${GREEN}Squash commit detected by message content. Skipping validation.${NC}"
exit 0
fi
# --- Validation Functions ---
# Function to print an error message and exit
# Usage: error_exit "Your error message here"
error_exit() {
# >&2 redirects echo to stderr
echo -e "${RED}ERROR: $1${NC}" >&2
echo -e "${YELLOW}Commit message format is incorrect. Aborting commit.${NC}" >&2
exit 1
}
# --- Main Logic ---
# 1. Read the header (first line) of the commit message
HEADER=$(head -n 1 "$COMMIT_MSG_FILE")
# 2. Validate the header format: <type>: <description>
# Regex breakdown:
# ^(type1|type2|...) - Starts with one of the allowed types
# : - Followed by a literal colon
# \s - Followed by a single space
# .+ - Followed by one or more characters for the description
# $ - End of the line
TYPES_REGEX=$(
IFS="|"
echo "${ALLOWED_TYPES[*]}"
)
HEADER_REGEX="^($TYPES_REGEX): .+$"
if ! [[ "$HEADER" =~ $HEADER_REGEX ]]; then
error_exit "Invalid header format.\n\nHeader must be in the format: <type>: <short description>\nAllowed types: ${ALLOWED_TYPES[*]}\nExample: feat: add new user authentication feature"
fi
# Only validate footer if commit type is not chore
TYPE=$(echo "$HEADER" | cut -d':' -f1)
if [ "$TYPE" != "chore" ]; then
# 3. Validate the footer (last line) of the commit message
FOOTER=$(tail -n 1 "$COMMIT_MSG_FILE")
# Regex breakdown:
# ^(ref|close) - Starts with 'ref' or 'close'
# : - Followed by a literal colon
# \s - Followed by a single space
# N25B- - Followed by the literal string 'N25B-'
# [0-9]+ - Followed by one or more digits
# $ - End of the line
FOOTER_REGEX="^(ref|close): N25B-[0-9]+$"
if ! [[ "$FOOTER" =~ $FOOTER_REGEX ]]; then
error_exit "Invalid footer format.\n\nFooter must be in the format: [ref/close]: <issue identifier>\nExample: ref: N25B-123"
fi
fi
# 4. If the message has more than 2 lines, validate the separator
# A blank line must exist between the header and the body.
LINE_COUNT=$(wc -l <"$COMMIT_MSG_FILE" | xargs) # xargs trims whitespace
# We only care if there is a body. Header + Footer = 2 lines.
# Header + Blank Line + Body... + Footer > 2 lines.
if [ "$LINE_COUNT" -gt 2 ]; then
# Get the second line
SECOND_LINE=$(sed -n '2p' "$COMMIT_MSG_FILE")
# Check if the second line is NOT empty. If it's not, it's an error.
if [ -n "$SECOND_LINE" ]; then
error_exit "Missing blank line between header and body.\n\nThe second line of your commit message must be empty if a body is present."
fi
fi
# If all checks pass, exit with success
echo -e "${GREEN}Commit message is valid.${NC}"
exit 0

View File

@@ -1,16 +0,0 @@
#!/bin/sh
commit_msg_file=$1
commit_msg=$(cat "$commit_msg_file")
if echo "$commit_msg" | grep -Eq "^(feat|fix|refactor|perf|style|test|docs|build|chore|revert): .+"; then
if echo "$commit_msg" | grep -Eq "^(ref|close):\sN25B-.+"; then
exit 0
else
echo "❌ Commit message invalid! Must end with [ref/close]: N25B-000"
exit 1
fi
else
echo "❌ Commit message invalid! Must start with <type>: <description>"
exit 1
fi

View File

@@ -1,17 +0,0 @@
#!/bin/sh
# Get current branch
branch=$(git rev-parse --abbrev-ref HEAD)
if echo "$branch" | grep -Eq "(dev|main)"; then
echo 0
fi
# allowed pattern <type/>
if echo "$branch" | grep -Eq "^(feat|fix|refactor|perf|style|test|docs|build|chore|revert)\/\w+(-\w+){0,5}$"; then
exit 0
else
echo "❌ Invalid branch name: $branch"
echo "Branch must be named <type>/<description-of-branch> (must have one to six words separated by a dash)"
exit 1
fi

View File

@@ -1,9 +0,0 @@
#!/bin/sh
echo "#<type>: <description>
#[optional body]
#[optional footer(s)]
#[ref/close]: <issue identifier>" > $1

3
.gitignore vendored
View File

@@ -217,3 +217,6 @@ __marimo__/
.DS_Store
# Docs
docs/*
!docs/conf.py

42
.gitlab-ci.yml Normal file
View File

@@ -0,0 +1,42 @@
# ---------- GLOBAL SETUP ---------- #
workflow:
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
stages:
- install
- test
default:
image: qi-py-ri-base:latest
cache:
key: "${CI_COMMIT_REF_SLUG}"
paths:
- .venv/
policy: pull-push
# --------- INSTALLING --------- #
install:
stage: install
tags:
- install
script:
- python -m virtualenv .venv
- source .venv/bin/activate
- echo /qi/pynaoqi-python2.7-2.5.7.1-linux64/lib/python2.7/site-packages/ > .venv/lib/python2.7/site-packages/pynaoqi-python2.7.pth
- pip install -r requirements.txt
artifacts:
paths:
- .venv/
expire_in: 1h
# ---------- TESTING ---------- #
test:
stage: test
needs:
- install
tags:
- test
script:
- source .venv/bin/activate
- PYTHONPATH=src pytest test/

15
.pre-commit-config.yaml Normal file
View File

@@ -0,0 +1,15 @@
repos:
- repo: local
hooks:
- id: check-commit-msg
name: Check commit message format
entry: .githooks/check-commit-msg.sh
language: script
stages: [commit-msg]
- id: check-branch-name
name: Check branch name format
entry: .githooks/check-branch-name.sh
language: script
stages: [commit]
always_run: true
pass_filenames: false

View File

@@ -34,6 +34,18 @@ python -m virtualenv .venv
source .venv/bin/activate
```
We depend on PortAudio for the `pyaudio` package, so install it with:
```bash
sudo apt install -y portaudio19-dev
```
On WSL, also install:
```bash
sudo apt install -y libasound2-plugins
```
Install the required packages with
```bash
@@ -98,6 +110,8 @@ $env:PYTHONPATH="src"; python -m robot_interface.main
With both, if you want to connect to the actual robot (or simulator), pass the `--qi-url` argument.
There's also a `--microphone` argument that can be used to choose a microphone to use. If not given, the program will try the default microphone. If you don't know the name of the microphone, pass the argument with any value, and it will list the names of available microphones.
## Testing
@@ -120,15 +134,46 @@ For coverage, add `--cov=robot_interface` as an argument to `pytest`.
## GitHooks
## Git Hooks
To activate automatic commits/branch name checks run:
To activate automatic linting, formatting, branch name checks and commit message checks, run (after installing requirements):
```shell
git config --local core.hooksPath .githooks
```bash
pre-commit install
pre-commit install --hook-type commit-msg
```
If your commit fails its either:
branch name != <type>/description-of-branch ,
commit name != <type>: description of the commit.
<ref>: N25B-Num's
You might get an error along the lines of `Can't install pre-commit with core.hooksPath` set. To fix this, simply unset the hooksPath by running:
```bash
git config --local --unset core.hooksPath
```
Then run the pre-commit install commands again.
## Documentation
Generate documentation web pages using:
### Linux & macOS
```bash
PYTHONPATH=src sphinx-apidoc -F -o docs src/robot_interface
```
### Windows
```bash
$env:PYTHONPATH="src"; sphinx-apidoc -F -o docs src/control_backend
```
Optionally, in the `conf.py` file in the new `docs` folder, change preferences.
In the `docs` folder:
### Linux & macOS
```bash
make html
```
### Windows
```bash
.\make.bat html
```

184
docs/conf.py Normal file
View File

@@ -0,0 +1,184 @@
# -*- coding: utf-8 -*-
#
# Configuration file for the Sphinx documentation builder.
#
# This file does only contain a selection of the most common options. For a
# full list see the documentation:
# http://www.sphinx-doc.org/en/master/config
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath("../src"))
# -- Project information -----------------------------------------------------
project = u'robot_interface'
copyright = u'2025, Author'
author = u'Author'
# The short X.Y version
version = u''
# The full version, including alpha/beta/rc tags
release = u''
# -- General configuration ---------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.viewcode',
'sphinx.ext.todo',
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = 'en'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = None
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'sphinx_rtd_theme'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# Custom sidebar templates, must be a dictionary that maps document names
# to template names.
#
# The default sidebars (for documents that don't match any pattern) are
# defined by theme itself. Builtin themes are using these templates by
# default: ``['localtoc.html', 'relations.html', 'sourcelink.html',
# 'searchbox.html']``.
#
# html_sidebars = {}
# -- Options for HTMLHelp output ---------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = 'robot_interfacedoc'
# -- Options for LaTeX output ------------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'robot_interface.tex', u'robot\\_interface Documentation',
u'Author', 'manual'),
]
# -- Options for manual page output ------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'robot_interface', u'robot_interface Documentation',
[author], 1)
]
# -- Options for Texinfo output ----------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'robot_interface', u'robot_interface Documentation',
author, 'robot_interface', 'One line description of project.',
'Miscellaneous'),
]
# -- Options for Epub output -------------------------------------------------
# Bibliographic Dublin Core info.
epub_title = project
# The unique identifier of the text. This can be a ISBN number
# or the project homepage.
#
# epub_identifier = ''
# A unique identification for the text.
#
# epub_uid = ''
# A list of files that should not be packed into the epub file.
epub_exclude_files = ['search.html']
# -- Extension configuration -------------------------------------------------
# -- Options for todo extension ----------------------------------------------
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = True

View File

@@ -3,3 +3,6 @@ pyaudio<=0.2.11
pytest<5
pytest-mock<3.0.0
pytest-cov<3.0.0
sphinx
sphinx_rtd_theme
pre-commit

View File

View File

@@ -0,0 +1,115 @@
from __future__ import unicode_literals
class AgentSettings(object):
"""
Agent port configuration.
:ivar actuating_receiver_port: Port for receiving actuation commands.
:vartype actuating_receiver_port: int
:ivar main_receiver_port: Port for receiving main messages.
:vartype main_receiver_port: int
:ivar video_sender_port: Port used for sending video frames.
:vartype video_sender_port: int
:ivar audio_sender_port: Port used for sending audio data.
:vartype audio_sender_port: int
"""
def __init__(
self,
actuating_receiver_port=5557,
main_receiver_port=5555,
video_sender_port=5556,
audio_sender_port=5558,
):
self.actuating_receiver_port = actuating_receiver_port
self.main_receiver_port = main_receiver_port
self.video_sender_port = video_sender_port
self.audio_sender_port = audio_sender_port
class VideoConfig(object):
"""
Video configuration constants.
:ivar camera_index: Index of the camera used.
:vartype camera_index: int
:ivar resolution: Video resolution mode.
:vartype resolution: int
:ivar color_space: Color space identifier.
:vartype color_space: int
:ivar fps: Frames per second of the video stream.
:vartype fps: int
:ivar stream_name: Name of the video stream.
:vartype stream_name: str
:ivar image_buffer: Internal buffer size for video frames.
:vartype image_buffer: int
"""
def __init__(
self,
camera_index=0,
resolution=2,
color_space=11,
fps=15,
stream_name="Pepper Video",
image_buffer=6,
):
self.camera_index = camera_index
self.resolution = resolution
self.color_space = color_space
self.fps = fps
self.stream_name = stream_name
self.image_buffer = image_buffer
class AudioConfig(object):
"""
Audio configuration constants.
:ivar sample_rate: Audio sampling rate in Hz.
:vartype sample_rate: int
:ivar chunk_size: Size of audio chunks to capture/process.
:vartype chunk_size: int
:ivar channels: Number of audio channels.
:vartype channels: int
"""
def __init__(self, sample_rate=16000, chunk_size=512, channels=1):
self.sample_rate = sample_rate
self.chunk_size = chunk_size
self.channels = channels
class MainConfig(object):
"""
Main system configuration.
:ivar poll_timeout_ms: Timeout for polling events, in milliseconds.
:vartype poll_timeout_ms: int
:ivar max_handler_time_ms: Maximum allowed handler time, in milliseconds.
:vartype max_handler_time_ms: int
"""
def __init__(self, poll_timeout_ms=100, max_handler_time_ms=50):
self.poll_timeout_ms = poll_timeout_ms
self.max_handler_time_ms = max_handler_time_ms
class Settings(object):
"""
Global settings container.
:ivar agent_settings: Agent-related port configuration.
:vartype agent_settings: AgentSettings
:ivar video_config: Video stream configuration.
:vartype video_config: VideoConfig
:ivar audio_config: Audio stream configuration.
:vartype audio_config: AudioConfig
:ivar main_config: Main system-level configuration.
:vartype main_config: MainConfig
"""
def __init__(self, agent_settings=None, video_config=None, audio_config=None, main_config=None):
self.agent_settings = agent_settings or AgentSettings()
self.video_config = video_config or VideoConfig()
self.audio_config = audio_config or AudioConfig()
self.main_config = main_config or MainConfig()
settings = Settings()

View File

@@ -1,3 +1,4 @@
from __future__ import unicode_literals # So that we can log texts with Unicode characters
import logging
import zmq
@@ -5,25 +6,35 @@ import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state
from robot_interface.core.config import settings
class ActuationReceiver(ReceiverBase):
def __init__(self, zmq_context, port=5557):
"""
The actuation receiver endpoint, responsible for handling speech and gesture requests.
"""
The actuation receiver endpoint, responsible for handling speech and gesture requests.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param port: The port to use.
:type port: int
"""
:param port: The port to use.
:type port: int
:ivar _tts_service: The text-to-speech service object from the Qi session.
:vartype _tts_service: qi.Session | None
"""
def __init__(self, zmq_context, port=settings.agent_settings.actuating_receiver_port):
super(ActuationReceiver, self).__init__("actuation")
self.create_socket(zmq_context, zmq.SUB, port)
self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options
self._tts_service = None
self._al_memory = None
def _handle_speech(self, message):
"""
Handle a speech actuation request.
:param message: The message to handle, must contain properties "endpoint" and "data".
:type message: dict
"""
text = message.get("data")
if not text:
logging.warn("Received message to speak, but it lacks data.")
@@ -41,26 +52,16 @@ class ActuationReceiver(ReceiverBase):
if not self._tts_service:
self._tts_service = state.qi_session.service("ALTextToSpeech")
if not self._al_memory:
self._al_memory = state.qi_session.service("ALMemory")
# Subscribe to speech end event
self.status_subscriber = self._al_memory.subscriber("ALTextToSpeech/Status") # self because garbage collect
self.status_subscriber.signal.connect(self._on_status_changed)
# Returns instantly. Messages received while speaking will be queued.
qi.async(self._tts_service.say, text)
@staticmethod
def _on_status_changed(value): # value will contain either 'enqueued', 'started' or 'done' depending on the status
"""Callback function for when the speaking status changes. Will change the is_speaking value of the state."""
if "started" in value:
logging.debug("Started speaking.")
state.is_speaking = True
if "done" in value:
logging.debug("Done speaking.")
state.is_speaking = False
def handle_message(self, message):
"""
Handle an actuation/speech message with the receiver.
:param message: The message to handle, must contain properties "endpoint" and "data".
:type message: dict
"""
if message["endpoint"] == "actuate/speech":
self._handle_speech(message)

View File

@@ -7,23 +7,49 @@ import zmq
from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state
from robot_interface.utils.microphone import choose_mic_default
from robot_interface.utils.microphone import choose_mic
from robot_interface.core.config import settings
logger = logging.getLogger(__name__)
class AudioSender(SocketBase):
def __init__(self, zmq_context, port=5558):
"""
Audio sender endpoint, responsible for sending microphone audio data.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param port: The port to use.
:type port: int
:ivar thread: Thread used for sending audio.
:vartype thread: threading.Thread | None
:ivar audio: PyAudio instance.
:vartype audio: pyaudio.PyAudio | None
:ivar microphone: Selected microphone information.
:vartype microphone: dict | None
"""
def __init__(self, zmq_context, port=settings.agent_settings.audio_sender_port):
super(AudioSender, self).__init__(str("audio")) # Convert future's unicode_literal to str
self.create_socket(zmq_context, zmq.PUB, port)
self.audio = pyaudio.PyAudio()
self.microphone = choose_mic_default(self.audio)
self.thread = None
try:
self.audio = pyaudio.PyAudio()
self.microphone = choose_mic(self.audio)
except IOError as e:
logger.warning("PyAudio is not available.", exc_info=e)
self.audio = None
self.microphone = None
def start(self):
"""
Start sending audio in a different thread.
Will not start if no microphone is available.
"""
if not self.microphone:
logger.info("Not listening: no microphone available.")
@@ -35,21 +61,26 @@ class AudioSender(SocketBase):
def wait_until_done(self):
"""
Wait until the audio thread is done. Will only be done if `state.exit_event` is set, so
make sure to set that before calling this method or it will block.
Wait until the audio thread is done.
Will block until `state.exit_event` is set. If the thread is not running, does nothing.
"""
if not self.thread: return
self.thread.join()
self.thread = None
def _stream(self):
chunk = 512 # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD
"""
Internal method to continuously read audio from the microphone and send it over the socket.
"""
audio_settings = settings.audio_config
chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD
# Docs say this only raises an error if neither `input` nor `output` is True
stream = self.audio.open(
format=pyaudio.paFloat32,
channels=1,
rate=16000,
channels=audio_settings.channels,
rate=audio_settings.sample_rate,
input=True,
input_device_index=self.microphone["index"],
frames_per_buffer=chunk,
@@ -57,13 +88,6 @@ class AudioSender(SocketBase):
try:
while not state.exit_event.is_set():
# Don't send audio if Pepper is speaking
if state.is_speaking:
if stream.is_active(): stream.stop_stream()
continue
if stream.is_stopped(): stream.start_stream()
data = stream.read(chunk)
self.socket.send(data)
except IOError as e:

View File

@@ -3,28 +3,50 @@ import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state
from robot_interface.core.config import settings
class MainReceiver(ReceiverBase):
def __init__(self, zmq_context, port=5555):
"""
The main receiver endpoint, responsible for handling ping and negotiation requests.
"""
The main receiver endpoint, responsible for handling ping and negotiation requests.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param port: The port to use.
:type port: int
"""
:param port: The port to use.
:type port: int
"""
def __init__(self, zmq_context, port=settings.agent_settings.main_receiver_port):
super(MainReceiver, self).__init__("main")
self.create_socket(zmq_context, zmq.REP, port, bind=False)
@staticmethod
def _handle_ping(message):
"""A simple ping endpoint. Returns the provided data."""
"""
Handle a ping request.
Returns the provided data in a standardized response dictionary.
:param message: The ping request message.
:type message: dict
:return: A response dictionary containing the original data.
:rtype: dict[str, str | list[dict]]
"""
return {"endpoint": "ping", "data": message.get("data")}
@staticmethod
def _handle_port_negotiation(message):
"""
Handle a port negotiation request.
Returns a list of all known endpoints and their descriptions.
:param message: The negotiation request message.
:type message: dict
:return: A response dictionary with endpoint descriptions as data.
:rtype: dict[str, list[dict]]
"""
endpoints = [socket.endpoint_description() for socket in state.sockets]
return {"endpoint": "negotiate/ports", "data": endpoints}
@@ -32,13 +54,13 @@ class MainReceiver(ReceiverBase):
@staticmethod
def _handle_negotiation(message):
"""
Handle a negotiation request. Will respond with ports that can be used to connect to the robot.
Handle a negotiation request. Responds with ports that can be used to connect to the robot.
:param message: The negotiation request message.
:type message: dict
:return: A response dictionary with a 'ports' key containing a list of ports and their function.
:rtype: dict[str, list[dict]]
:return: A response dictionary with the negotiation result.
:rtype: dict[str, str | list[dict]]
"""
# In the future, the sender could send information like the robot's IP address, etc.
@@ -48,6 +70,17 @@ class MainReceiver(ReceiverBase):
return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."}
def handle_message(self, message):
"""
Main entry point for handling incoming messages.
Dispatches messages to the appropriate handler based on the endpoint.
:param message: The received message.
:type message: dict
:return: A response dictionary based on the requested endpoint.
:rtype: dict[str, str | list[dict]]
"""
if message["endpoint"] == "ping":
return self._handle_ping(message)
elif message["endpoint"].startswith("negotiate"):

View File

@@ -4,7 +4,7 @@ from robot_interface.endpoints.socket_base import SocketBase
class ReceiverBase(SocketBase, object):
"""Associated with a ZeroMQ socket."""
"""Base class for receivers associated with a ZeroMQ socket."""
__metaclass__ = ABCMeta
@abstractmethod

View File

@@ -4,16 +4,27 @@ import zmq
class SocketBase(object):
"""
Base class for endpoints associated with a ZeroMQ socket.
:ivar identifier: The identifier of the endpoint.
:vartype identifier: str
:ivar port: The port used by the socket, set by `create_socket`.
:vartype port: int | None
:ivar socket: The ZeroMQ socket object, set by `create_socket`.
:vartype socket: zmq.Socket | None
:ivar bound: Whether the socket is bound or connected, set by `create_socket`.
:vartype bound: bool | None
"""
__metaclass__ = ABCMeta
name = None
socket = None
def __init__(self, identifier):
"""
:param identifier: The identifier of the endpoint.
:type identifier: str
"""
self.identifier = identifier
self.port = None # Set later by `create_socket`
self.socket = None # Set later by `create_socket`
@@ -32,8 +43,7 @@ class SocketBase(object):
:param port: The port to use.
:type port: int
:param options: A list of options to be set on the socket. The list contains tuples where the first element contains the option
and the second the value, for example (zmq.CONFLATE, 1).
:param options: A list of tuples where the first element contains the option and the second the value.
:type options: list[tuple[int, int]]
:param bind: Whether to bind the socket or connect to it.
@@ -62,7 +72,7 @@ class SocketBase(object):
Description of the endpoint. Used for negotiation.
:return: A dictionary with the following keys: id, port, bind. See API specification at:
https://utrechtuniversity.youtrack.cloud/articles/N25B-A-14/RI-CB-Communication#negotiation
https://utrechtuniversity.youtrack.cloud/articles/N25B-A-14/RI-CB-Communication#negotiation
:rtype: dict
"""
return {

View File

@@ -1,32 +1,43 @@
import zmq
import threading
import qi
import logging
from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state
from robot_interface.core.config import settings
class VideoSender(SocketBase):
def __init__(self, zmq_context, port=5556):
"""
Video sender endpoint, responsible for sending video frames.
:param zmq_context: The ZeroMQ context to use.
:type zmq_context: zmq.Context
:param port: The port to use for sending video frames.
:type port: int
"""
def __init__(self, zmq_context, port=settings.agent_settings.video_sender_port):
super(VideoSender, self).__init__("video")
self.create_socket(zmq_context, zmq.PUB, port, [(zmq.CONFLATE,1)])
def start_video_rcv(self):
"""
Prepares arguments for retrieving video images from Pepper and starts video loop on a separate thread.
Will not start if no qi session is available.
"""
if not state.qi_session:
logging.info("No Qi session available. Not starting video loop.")
return
video = state.qi_session.service("ALVideoDevice")
camera_index = 0
kQVGA = 2
kRGB = 11
FPS = 15
vid_stream_name = video.subscribeCamera("Pepper Video", camera_index, kQVGA, kRGB, FPS)
video_settings = settings.video_config
camera_index = video_settings.camera_index
kQVGA = video_settings.resolution
kRGB = video_settings.color_space
FPS = video_settings.fps
video_name = video_settings.stream_name
vid_stream_name = video.subscribeCamera(video_name, camera_index, kQVGA, kRGB, FPS)
thread = threading.Thread(target=self.video_rcv_loop, args=(video, vid_stream_name))
thread.start()
@@ -38,12 +49,12 @@ class VideoSender(SocketBase):
:type vid_service: Object (Qi service object)
:param vid_stream_name: The name of a camera subscription on the video service object vid_service
:type vid_stream_name: String
:type vid_stream_name: str
"""
while not state.exit_event.is_set():
try:
img = vid_service.getImageRemote(vid_stream_name)
#Possibly limit images sent if queuing issues arise
self.socket.send(img[6])
self.socket.send(img[settings.video_config.image_buffer])
except:
logging.warn("Failed to retrieve video image from robot.")

View File

@@ -10,6 +10,7 @@ from robot_interface.endpoints.actuation_receiver import ActuationReceiver
from robot_interface.endpoints.main_receiver import MainReceiver
from robot_interface.endpoints.video_sender import VideoSender
from robot_interface.state import state
from robot_interface.core.config import settings
from robot_interface.utils.timeblock import TimeBlock
@@ -43,20 +44,9 @@ def main_loop(context):
logging.debug("Starting main loop.")
import schedule
test_speaking_message = {"data": "Hi, my name is Pepper, and this is quite a long message."}
def test_speak():
logging.debug("Testing speech.")
actuation_receiver._handle_speech(test_speaking_message)
schedule.every(10).seconds.do(test_speak)
while True:
if state.exit_event.is_set(): break
schedule.run_pending()
socks = dict(poller.poll(100))
socks = dict(poller.poll(settings.main_config.poll_timeout_ms))
for receiver in receivers:
if receiver.socket not in socks: continue
@@ -67,10 +57,17 @@ def main_loop(context):
continue
def overtime_callback(time_ms):
"""
A callback function executed by TimeBlock if the message handling
exceeds the allowed time limit.
:param time_ms: The elapsed time, in milliseconds, that the block took.
:type time_ms: float
"""
logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.",
message["endpoint"], time_ms)
with TimeBlock(overtime_callback, 50):
with TimeBlock(overtime_callback, settings.main_config.max_handler_time_ms):
response = receiver.handle_message(message)
if receiver.socket.getsockopt(zmq.TYPE) == zmq.REP:
@@ -78,6 +75,12 @@ def main_loop(context):
def main():
"""
Initializes the ZeroMQ context and the application state.
It executes the main event loop (`main_loop`) and ensures that both the
application state and the ZeroMQ context are properly cleaned up (deinitialized/terminated)
upon exit, including handling a KeyboardInterrupt.
"""
context = zmq.Context()
state.initialize()

View File

@@ -12,15 +12,31 @@ class State(object):
This class is used to share state between threads. For example, when the program is quit, that all threads can
detect this via the `exit_event` property being set.
:ivar is_initialized: Flag indicating whether the state setup (exit handlers, QI session) has completed.
:vartype is_initialized: bool
:ivar exit_event: A thread event used to signal all threads that the program is shutting down.
:vartype exit_event: threading.Event | None
:ivar sockets: A list of ZeroMQ socket wrappers (`SocketBase`) that need to be closed during deinitialization.
:vartype sockets: List[SocketBase]
:ivar qi_session: The QI session object used for interaction with the robot/platform services.
:vartype qi_session: None | qi.Session
"""
def __init__(self):
self.is_initialized = False
self.exit_event = None
self.sockets = [] # type: List[SocketBase]
self.qi_session = None # type: None | ssl.SSLSession
self.is_speaking = False # type: Boolean
self.sockets = []
self.qi_session = None
def initialize(self):
"""
Sets up the application state. Creates the thread exit event, registers
signal handlers (`SIGINT`, `SIGTERM`) for graceful shutdown, and
establishes the QI session.
"""
if self.is_initialized:
logging.warn("Already initialized")
return
@@ -37,6 +53,9 @@ class State(object):
self.is_initialized = True
def deinitialize(self):
"""
Closes all sockets stored in the `sockets` list.
"""
if not self.is_initialized: return
for socket in self.sockets:
@@ -45,8 +64,24 @@ class State(object):
self.is_initialized = False
def __getattribute__(self, name):
# Enforce that the state is initialized before accessing any property (aside from the basic ones)
if name in ("initialize", "deinitialize", "is_initialized", "__dict__", "__class__"):
"""
Custom attribute access method that enforces a check: the state must be
fully initialized before any non-setup attributes (like `sockets` or `qi_session`)
can be accessed.
:param name: The name of the attribute being accessed.
:type name: str
:return: The value of the requested attribute.
:rtype: Any
"""
if name in (
"initialize",
"deinitialize",
"is_initialized",
"__dict__",
"__class__",
"__doc__"):
return object.__getattribute__(self, name)
if not object.__getattribute__(self, "is_initialized"):

View File

@@ -1,5 +1,6 @@
from __future__ import unicode_literals # So that `print` can print Unicode characters in names
import logging
import sys
logger = logging.getLogger(__name__)
@@ -28,7 +29,7 @@ def choose_mic_interactive(audio):
:type audio: pyaudio.PyAudio
:return: A dictionary from PyAudio containing information about the microphone to use, or None
if there is no microphone.
if there is no microphone.
:rtype: dict | None
"""
microphones = list(get_microphones(audio))
@@ -60,10 +61,61 @@ def choose_mic_default(audio):
:type audio: pyaudio.PyAudio
:return: A dictionary from PyAudio containing information about the microphone to use, or None
if there is no microphone.
if there is no microphone.
:rtype: dict | None
"""
try:
return audio.get_default_input_device_info()
except IOError:
return None
def choose_mic_arguments(audio):
"""
Get a microphone to use from command line arguments.
:param audio: An instance of PyAudio to use.
:type audio: pyaudio.PyAudio
:return: A dictionary from PyAudio containing information about the microphone to use, or None
if there is no microphone satisfied by the arguments.
:rtype: dict | None
"""
microphone_name = None
for i, arg in enumerate(sys.argv):
if arg == "--microphone" and len(sys.argv) > i+1:
microphone_name = sys.argv[i+1].strip()
if arg.startswith("--microphone="):
pre_fix_len = len("--microphone=")
microphone_name = arg[pre_fix_len:].strip()
if not microphone_name: return None
available_mics = list(get_microphones(audio))
for mic in available_mics:
if mic["name"] == microphone_name:
return mic
available_mic_names = [mic["name"] for mic in available_mics]
logger.warning("Microphone \"{}\" not found. Choose one of {}"
.format(microphone_name, available_mic_names))
return None
def choose_mic(audio):
"""
Get a microphone to use. Firstly, tries to see if there's an application argument specifying the
microphone to use. If not, get the default microphone.
:param audio: An instance of PyAudio to use.
:type audio: pyaudio.PyAudio
:return: A dictionary from PyAudio containing information about the microphone to use, or None
if there is no microphone.
:rtype: dict | None
"""
chosen_mic = choose_mic_arguments(audio)
if chosen_mic: return chosen_mic
return choose_mic_default(audio)

View File

@@ -8,6 +8,12 @@ except ImportError:
def get_qi_session():
"""
Create and return a Qi session if available.
:return: The active Qi session or ``None`` if unavailable.
:rtype: qi.Session | None
"""
if qi is None:
logging.info("Unable to import qi. Running in stand-alone mode.")
return None

View File

@@ -5,27 +5,54 @@ class TimeBlock(object):
"""
A context manager that times the execution of the block it contains. If execution exceeds the
limit, or if no limit is given, the callback will be called with the time that the block took.
"""
def __init__(self, callback, limit_ms=None):
"""
:param callback: The callback function that is called when the block of code is over,
unless the code block did not exceed the time limit.
:type callback: Callable[[float], None]
:param limit_ms: The number of milliseconds the block of code is allowed to take. If it
:param callback: The callback function that is called when the block of code is over,
unless the code block did not exceed the time limit.
:type callback: Callable[[float], None]
:param limit_ms: The number of milliseconds the block of code is allowed to take. If it
exceeds this time, or if it's None, the callback function will be called with the time the
block took.
:type limit_ms: int | None
"""
:type limit_ms: int | None
:ivar limit_ms: The number of milliseconds the block of code is allowed to take.
:vartype limit_ms: float | None
:ivar callback: The callback function that is called when the block of code is over.
:vartype callback: Callable[[float], None]
ivar start: The start time of the block, set when entering the context.
:vartype start: float | None
"""
def __init__(self, callback, limit_ms=None):
self.limit_ms = float(limit_ms) if limit_ms is not None else None
self.callback = callback
self.start = None
def __enter__(self):
"""
Enter the context manager and record the start time.
:return: Returns itself so timing information can be accessed if needed.
:rtype: TimeBlock
"""
self.start = time.time()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""
Exit the context manager, calculate the elapsed time, and call the callback
if the time limit was exceeded or not provided.
:param exc_type: The exception type, or None if no exception occurred.
:type exc_type: Type[BaseException] | None
:param exc_value: The exception instance, or None if no exception occurred.
:type exc_value: BaseException | None
:param traceback: The traceback object, or None if no exception occurred.
:type traceback: TracebackType | None
"""
elapsed = (time.time() - self.start) * 1000.0 # ms
if self.limit_ms is None or elapsed > self.limit_ms:
self.callback(elapsed)

View File

@@ -1,8 +1,15 @@
from __future__ import unicode_literals # So that we can format strings with Unicode characters
import random
import sys
from StringIO import StringIO
from robot_interface.utils.microphone import choose_mic_default, choose_mic_interactive, get_microphones
from robot_interface.utils.microphone import (
choose_mic_default,
choose_mic_interactive,
choose_mic_arguments,
choose_mic,
get_microphones,
)
class MicrophoneUtils(object):
@@ -10,10 +17,12 @@ class MicrophoneUtils(object):
def test_choose_mic_default(self, pyaudio_instance):
"""
The result must contain at least "index", as this is used to identify the microphone.
The "name" is used for logging, so it should also exist.
It must have one or more channels.
Lastly it must be capable of sending at least 16000 samples per second.
Tests that the default microphone selection function returns a valid
microphone dictionary containing all necessary keys with correct types and values.
The result must contain at least "index", as this is used to identify the microphone,
and "name" for logging. It must have one or more channels (`maxInputChannels`),
and a default sample rate of at least 16000 Hz.
"""
result = choose_mic_default(pyaudio_instance)
assert "index" in result
@@ -32,8 +41,13 @@ class MicrophoneUtils(object):
def test_choose_mic_interactive_input_not_int(self, pyaudio_instance, mocker):
"""
First mock an input that's not an integer, then a valid integer. There should be no errors.
Tests the robustness of the interactive selection when the user first enters
a non-integer value, ensuring the system prompts again without error and accepts
a valid integer on the second attempt.
"""
microphones = get_microphones(pyaudio_instance)
target_microphone = next(microphones)
mock_input = mocker.patch("__builtin__.raw_input", side_effect=["not an integer", "0"])
fake_out = StringIO()
mocker.patch.object(sys, "stdout", fake_out)
@@ -41,7 +55,7 @@ class MicrophoneUtils(object):
result = choose_mic_interactive(pyaudio_instance)
assert "index" in result
assert isinstance(result["index"], (int, long))
assert result["index"] == 0
assert result["index"] == target_microphone["index"]
assert mock_input.called
@@ -49,8 +63,12 @@ class MicrophoneUtils(object):
def test_choose_mic_interactive_negative_index(self, pyaudio_instance, mocker):
"""
Make sure that the interactive method does not allow negative integers as input.
Tests that the interactive selection method prevents the user from entering
a negative integer as a microphone index.
"""
microphones = get_microphones(pyaudio_instance)
target_microphone = next(microphones)
mock_input = mocker.patch("__builtin__.raw_input", side_effect=["-1", "0"])
fake_out = StringIO()
mocker.patch.object(sys, "stdout", fake_out)
@@ -58,7 +76,7 @@ class MicrophoneUtils(object):
result = choose_mic_interactive(pyaudio_instance)
assert "index" in result
assert isinstance(result["index"], (int, long))
assert result["index"] == 0
assert result["index"] == target_microphone["index"]
assert mock_input.called
@@ -66,7 +84,8 @@ class MicrophoneUtils(object):
def test_choose_mic_interactive_index_too_high(self, pyaudio_instance, mocker):
"""
Make sure that the interactive method does not allow indices higher than the highest mic index.
Tests that the interactive selection method prevents the user from entering
an index that exceeds the total number of available microphones.
"""
real_count = len(list(get_microphones(pyaudio_instance)))
mock_input = mocker.patch("__builtin__.raw_input", side_effect=[str(real_count), "0"])
@@ -83,7 +102,9 @@ class MicrophoneUtils(object):
def test_choose_mic_interactive_random_index(self, pyaudio_instance, mocker):
"""
Get a random index from the list of available mics, make sure it's correct.
Tests the core interactive functionality by simulating the selection of a
random valid microphone index and verifying that the correct microphone
information is returned.
"""
microphones = list(get_microphones(pyaudio_instance))
random_index = random.randrange(len(microphones))
@@ -93,3 +114,77 @@ class MicrophoneUtils(object):
assert "index" in result
assert isinstance(result["index"], (int, long))
assert result["index"] == microphones[random_index]["index"]
def test_choose_mic_no_arguments(self, pyaudio_instance, mocker):
"""
Tests `choose_mic_arguments` when no command-line arguments are provided,
"""
mocker.patch.object(sys, "argv", [])
result = choose_mic_arguments(pyaudio_instance)
assert result is None
def test_choose_mic_arguments(self, pyaudio_instance, mocker):
"""
Tests `choose_mic_arguments` when the microphone name is passed as a separate
argument.
"""
for mic in get_microphones(pyaudio_instance):
mocker.patch.object(sys, "argv", ["--microphone", mic["name"]])
result = choose_mic_arguments(pyaudio_instance)
assert result is not None
assert result == mic
def test_choose_mic_arguments_eq(self, pyaudio_instance, mocker):
"""
Tests `choose_mic_arguments` when the microphone name is passed using an
equals sign (`--microphone=NAME`).
"""
for mic in get_microphones(pyaudio_instance):
mocker.patch.object(sys, "argv", ["--microphone={}".format(mic["name"])])
result = choose_mic_arguments(pyaudio_instance)
assert result is not None
assert result == mic
def test_choose_mic_arguments_not_exist(self, pyaudio_instance, mocker):
"""
Tests `choose_mic_arguments` when a non-existent microphone name is passed
via command-line arguments, expecting the function to return None.
"""
mocker.patch.object(sys, "argv", ["--microphone", "Surely this microphone doesn't exist"])
result = choose_mic_arguments(pyaudio_instance)
assert result is None
def test_choose_mic_with_argument(self, pyaudio_instance, mocker):
"""
Tests `choose_mic` function when a valid microphone is
specified via command-line arguments.
"""
mic = next(get_microphones(pyaudio_instance))
mocker.patch.object(sys, "argv", ["--microphone", mic["name"]])
result = choose_mic(pyaudio_instance)
assert result is not None
assert result == mic
def test_choose_mic_no_argument(self, pyaudio_instance, mocker):
"""
Tests `choose_mic` function when no command-line arguments
are provided, verifying that the function falls back correctly to the
system's default microphone selection.
"""
default_mic = choose_mic_default(pyaudio_instance)
mocker.patch.object(sys, "argv", [])
result = choose_mic(pyaudio_instance)
assert result is not None
assert result == default_mic

View File

@@ -7,6 +7,17 @@ from common.microphone_utils import MicrophoneUtils
@pytest.fixture
def pyaudio_instance():
"""
A pytest fixture that provides an initialized PyAudio instance for tests
requiring microphone access.
It first initializes PyAudio. If a default input device (microphone) is not
found, the test is skipped to avoid failures in environments
without a mic.
:return: An initialized PyAudio instance.
:rtype: pyaudio.PyAudio
"""
audio = pyaudio.PyAudio()
try:
audio.get_default_input_device_info()

View File

@@ -9,11 +9,21 @@ from robot_interface.endpoints.actuation_receiver import ActuationReceiver
@pytest.fixture
def zmq_context():
"""
A pytest fixture that creates and yields a ZMQ context.
:return: An initialized ZeroMQ context.
:rtype: zmq.Context
"""
context = zmq.Context()
yield context
def test_handle_unimplemented_endpoint(zmq_context):
"""
Tests that the ``ActuationReceiver.handle_message`` method can
handle an unknown or unimplemented endpoint without raising an error.
"""
receiver = ActuationReceiver(zmq_context)
# Should not error
receiver.handle_message({
@@ -23,6 +33,10 @@ def test_handle_unimplemented_endpoint(zmq_context):
def test_speech_message_no_data(zmq_context, mocker):
"""
Tests that the message handler logs a warning when a speech actuation
request (`actuate/speech`) is received but contains empty string data.
"""
mock_warn = mocker.patch("logging.warn")
receiver = ActuationReceiver(zmq_context)
@@ -32,6 +46,10 @@ def test_speech_message_no_data(zmq_context, mocker):
def test_speech_message_invalid_data(zmq_context, mocker):
"""
Tests that the message handler logs a warning when a speech actuation
request (`actuate/speech`) is received with data that is not a string (e.g., a boolean).
"""
mock_warn = mocker.patch("logging.warn")
receiver = ActuationReceiver(zmq_context)
@@ -41,6 +59,10 @@ def test_speech_message_invalid_data(zmq_context, mocker):
def test_speech_no_qi(zmq_context, mocker):
"""
Tests the actuation receiver's behavior when processing a speech request
but the global state does not have an active QI session.
"""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi_session = mock.PropertyMock(return_value=None)
@@ -53,6 +75,10 @@ def test_speech_no_qi(zmq_context, mocker):
def test_speech(zmq_context, mocker):
"""
Tests the core speech actuation functionality by mocking the QI TextToSpeech
service and verifying that it is called correctly.
"""
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
mock_qi = mock.Mock()

View File

@@ -1,6 +1,5 @@
# coding=utf-8
import os
import time
import mock
import pytest
@@ -11,13 +10,22 @@ from robot_interface.endpoints.audio_sender import AudioSender
@pytest.fixture
def zmq_context():
"""
A pytest fixture that creates and yields a ZMQ context.
:return: An initialized ZeroMQ context.
:rtype: zmq.Context
"""
context = zmq.Context()
yield context
def test_no_microphone(zmq_context, mocker):
"""
Tests the scenario where no valid microphone can be chosen for recording.
"""
mock_info_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger.info")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = None
sender = AudioSender(zmq_context)
@@ -31,8 +39,12 @@ def test_no_microphone(zmq_context, mocker):
def test_unicode_mic_name(zmq_context, mocker):
"""
Tests the robustness of the `AudioSender` when handling microphone names
that contain Unicode characters.
"""
mocker.patch("robot_interface.endpoints.audio_sender.threading")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"• Some Unicode name"}
sender = AudioSender(zmq_context)
@@ -47,11 +59,17 @@ def test_unicode_mic_name(zmq_context, mocker):
def _fake_read(num_frames):
"""
Helper function to simulate reading raw audio data from a microphone stream.
"""
return os.urandom(num_frames * 4)
def test_sending_audio(mocker):
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default")
"""
Tests the successful sending of audio data over a ZeroMQ socket.
"""
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
@@ -76,11 +94,17 @@ def test_sending_audio(mocker):
def _fake_read_error(num_frames):
"""
Helper function to simulate an I/O error during microphone stream reading.
"""
raise IOError()
def test_break_microphone(mocker):
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default")
"""
Tests the error handling when the microphone stream breaks (raises an IOError).
"""
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
@@ -102,3 +126,22 @@ def test_break_microphone(mocker):
sender.wait_until_done()
send_socket.assert_not_called()
def test_pyaudio_init_failure(mocker, zmq_context):
"""
Tests the behavior when PyAudio initialization fails (raises an IOError).
"""
# Prevent binding the ZMQ socket
mocker.patch("robot_interface.endpoints.audio_sender.AudioSender.create_socket")
# Simulate PyAudio() failing
mocker.patch(
"robot_interface.endpoints.audio_sender.pyaudio.PyAudio",
side_effect=IOError("boom")
)
sender = AudioSender(zmq_context)
assert sender.audio is None
assert sender.microphone is None

222
test/unit/test_main.py Normal file
View File

@@ -0,0 +1,222 @@
import pytest
import threading
import zmq
import robot_interface.main as main_mod
from robot_interface.state import state
class FakeSocket:
"""Mock ZMQ socket for testing."""
def __init__(self, socket_type, messages=None):
self.socket_type = socket_type
self.messages = messages or []
self.sent = []
self.closed = False
def recv_json(self):
if not self.messages:
raise RuntimeError("No more messages")
return self.messages.pop(0)
def send_json(self, msg):
self.sent.append(msg)
def getsockopt(self, opt):
if opt == zmq.TYPE:
return self.socket_type
def close(self):
self.closed = True
class FakeReceiver:
"""Base class for main/actuation receivers."""
def __init__(self, socket):
self.socket = socket
self._called = []
def handle_message(self, msg):
self._called.append(msg)
return {"endpoint": "pong", "data": "ok"}
def close(self):
pass
class DummySender:
"""Mock sender to test start methods."""
def __init__(self):
self.called = False
def start_video_rcv(self):
self.called = True
def start(self):
self.called = True
def close(self):
pass
@pytest.fixture
def fake_sockets():
"""Create default fake main and actuation sockets."""
main_sock = FakeSocket(zmq.REP)
act_sock = FakeSocket(zmq.SUB)
return main_sock, act_sock
@pytest.fixture
def fake_poll(monkeypatch):
"""Patch zmq.Poller to simulate a single polling cycle based on socket messages."""
class FakePoller:
def __init__(self):
self.registered = {}
self.used = False
def register(self, socket, flags):
self.registered[socket] = flags
def poll(self, timeout):
# Only return sockets that still have messages
active_socks = {
s: flags
for s, flags
in self.registered.items()
if getattr(s, "messages", [])
}
if active_socks:
return active_socks
# No more messages, exit loop
state.exit_event.set()
return {}
poller_instance = FakePoller()
monkeypatch.setattr(main_mod.zmq, "Poller", lambda: poller_instance)
return poller_instance
@pytest.fixture
def patched_main_components(monkeypatch, fake_sockets, fake_poll):
"""
Fixture to patch main receivers and senders with fakes.
Returns the fake instances for inspection in tests.
"""
main_sock, act_sock = fake_sockets
fake_main = FakeReceiver(main_sock)
fake_act = FakeReceiver(act_sock)
video_sender = DummySender()
audio_sender = DummySender()
monkeypatch.setattr(main_mod, "MainReceiver", lambda ctx: fake_main)
monkeypatch.setattr(main_mod, "ActuationReceiver", lambda ctx: fake_act)
monkeypatch.setattr(main_mod, "VideoSender", lambda ctx: video_sender)
monkeypatch.setattr(main_mod, "AudioSender", lambda ctx: audio_sender)
# Register sockets for the fake poller
fake_poll.registered = {main_sock: zmq.POLLIN, act_sock: zmq.POLLIN}
return fake_main, fake_act, video_sender, audio_sender
def test_main_loop_rep_response(patched_main_components):
"""REP socket returns proper response and handlers are called."""
state.initialize()
fake_main, fake_act, video_sender, audio_sender = patched_main_components
fake_main.socket.messages = [{"endpoint": "ping", "data": "x"}]
fake_act.socket.messages = [{"endpoint": "actuate/speech", "data": "hello"}]
main_mod.main_loop(object())
assert fake_main.socket.sent == [{"endpoint": "pong", "data": "ok"}]
assert fake_main._called
assert fake_act._called
assert video_sender.called
assert audio_sender.called
state.deinitialize()
@pytest.mark.parametrize(
"messages",
[
[{"no_endpoint": True}], # Invalid dict
[["not", "a", "dict"]] # Non-dict message
]
)
def test_main_loop_invalid_or_non_dict_message(patched_main_components, messages):
"""Invalid or non-dict messages are ignored."""
state.initialize()
fake_main, _, _, _ = patched_main_components
fake_main.socket.messages = messages
main_mod.main_loop(object())
assert fake_main.socket.sent == []
state.deinitialize()
def test_main_loop_handler_returns_none(patched_main_components, monkeypatch):
"""Handler returning None still triggers send_json(None)."""
state.initialize()
fake_main, _, _, _ = patched_main_components
class NoneHandler(FakeReceiver):
def handle_message(self, msg):
self._called.append(msg)
return None
monkeypatch.setattr(main_mod, "MainReceiver", lambda ctx: NoneHandler(fake_main.socket))
fake_main.socket.messages = [{"endpoint": "some", "data": None}]
main_mod.main_loop(object())
assert fake_main.socket.sent == [None]
state.deinitialize()
def test_main_loop_overtime_callback(patched_main_components, monkeypatch):
"""TimeBlock callback is triggered if handler takes too long."""
state.initialize()
fake_main, _, _, _ = patched_main_components
fake_main.socket.messages = [{"endpoint": "ping", "data": "x"}]
class FakeTimeBlock:
def __init__(self, callback, limit_ms):
self.callback = callback
def __enter__(self):
return self
def __exit__(self, *a):
self.callback(999.0)
monkeypatch.setattr(main_mod, "TimeBlock", FakeTimeBlock)
main_mod.main_loop(object())
assert fake_main.socket.sent == [{"endpoint": "pong", "data": "ok"}]
state.deinitialize()
def test_main_keyboard_interrupt(monkeypatch):
"""main() handles KeyboardInterrupt and cleans up."""
called = {"deinitialized": False, "term_called": False}
class FakeContext:
def term(self): called["term_called"] = True
monkeypatch.setattr(main_mod.zmq, "Context", lambda: FakeContext())
def raise_keyboard_interrupt(*_):
raise KeyboardInterrupt()
monkeypatch.setattr(main_mod, "main_loop", raise_keyboard_interrupt)
def fake_initialize():
state.is_initialized = True
state.exit_event = threading.Event()
def fake_deinitialize():
called["deinitialized"] = True
state.is_initialized = False
monkeypatch.setattr(main_mod.state, "initialize", fake_initialize)
monkeypatch.setattr(main_mod.state, "deinitialize", fake_deinitialize)
main_mod.main()
assert called["term_called"] is True
assert called["deinitialized"] is True

View File

@@ -7,11 +7,20 @@ from robot_interface.endpoints.main_receiver import MainReceiver
@pytest.fixture
def zmq_context():
"""
A pytest fixture that creates and yields a ZMQ context.
:return: An initialized ZeroMQ context.
:rtype: zmq.Context
"""
context = zmq.Context()
yield context
def test_handle_ping(zmq_context):
"""
Tests the receiver's ability to handle the "ping" endpoint with data.
"""
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({"endpoint": "ping", "data": "pong"})
@@ -22,6 +31,10 @@ def test_handle_ping(zmq_context):
def test_handle_ping_none(zmq_context):
"""
Tests the receiver's ability to handle the ping endpoint when the
data field is explicitly set to None.
"""
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({"endpoint": "ping", "data": None})
@@ -33,6 +46,9 @@ def test_handle_ping_none(zmq_context):
@mock.patch("robot_interface.endpoints.main_receiver.state")
def test_handle_negotiate_ports(mock_state, zmq_context):
"""
Tests the handling of the "negotiate/ports" endpoint.
"""
receiver = MainReceiver(zmq_context)
mock_state.sockets = [receiver]
@@ -54,6 +70,10 @@ def test_handle_negotiate_ports(mock_state, zmq_context):
def test_handle_unimplemented_endpoint(zmq_context):
"""
Tests that the receiver correctly handles a request to a completely
unknown or non-existent endpoint.
"""
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({
"endpoint": "some_endpoint_that_definitely_does_not_exist",
@@ -67,6 +87,13 @@ def test_handle_unimplemented_endpoint(zmq_context):
def test_handle_unimplemented_negotiation_endpoint(zmq_context):
"""
Tests handling a request to an unknown sub-endpoint within a known
group
The expected behavior is to return a specific "negotiate/error" response
with a descriptive error string.
"""
receiver = MainReceiver(zmq_context)
response = receiver.handle_message({
"endpoint": "negotiate/but_some_subpath_that_definitely_does_not_exist",

View File

@@ -7,6 +7,16 @@ from robot_interface.utils.microphone import choose_mic_default, choose_mic_inte
class MockPyAudio:
"""
A mock implementation of the PyAudio library class, designed for testing
microphone utility functions without requiring actual audio hardware.
It provides fake devices, including one input microphone, and implements
the core PyAudio methods required for device enumeration.
:ivar devices: A list of dictionaries representing mock audio devices.
:vartype devices: List[Dict[str, Any]]
"""
def __init__(self):
# You can predefine fake device info here
self.devices = [
@@ -37,18 +47,36 @@ class MockPyAudio:
]
def get_device_count(self):
"""Return the number of available mock devices."""
"""
Returns the number of available mock devices.
:return: The total number of devices in the mock list.
:rtype: int
"""
return len(self.devices)
def get_device_info_by_index(self, index):
"""Return information for a given mock device index."""
"""
Returns information for a given mock device index.
:param index: The index of the device to retrieve.
:type index: int
:return: A dictionary containing device information.
:rtype: Dict[str, Any]
"""
if 0 <= index < len(self.devices):
return self.devices[index]
else:
raise IOError("Invalid device index: {}".format(index))
def get_default_input_device_info(self):
"""Return info for a default mock input device."""
"""
Returns information for the default mock input device.
:return: A dictionary containing the default input device information.
:rtype: Dict[str, Any]
"""
for device in self.devices:
if device.get("maxInputChannels", 0) > 0:
return device
@@ -57,16 +85,32 @@ class MockPyAudio:
@pytest.fixture
def pyaudio_instance():
"""
A pytest fixture that returns an instance of the `MockPyAudio` class.
:return: An initialized instance of the mock PyAudio class.
:rtype: MockPyAudio
"""
return MockPyAudio()
def _raise_io_error():
"""
Helper function used to mock PyAudio methods that are expected to fail
when no device is available.
"""
raise IOError()
class TestAudioUnit(MicrophoneUtils):
"""Run shared audio behavior tests with the mock implementation."""
"""
Runs the shared microphone behavior tests defined in `MicrophoneUtils` using
the mock PyAudio implementation.
"""
def test_choose_mic_default_no_mic(self):
"""
Tests `choose_mic_default` when no microphones are available.
"""
mock_pyaudio = mock.Mock()
mock_pyaudio.get_device_count = mock.Mock(return_value=0L)
mock_pyaudio.get_default_input_device_info = _raise_io_error
@@ -76,6 +120,9 @@ class TestAudioUnit(MicrophoneUtils):
assert result is None
def test_choose_mic_interactive_no_mic(self):
"""
Tests `choose_mic_interactive` when no microphones are available.
"""
mock_pyaudio = mock.Mock()
mock_pyaudio.get_device_count = mock.Mock(return_value=0L)
mock_pyaudio.get_default_input_device_info = _raise_io_error

View File

@@ -0,0 +1,90 @@
import sys
# Import module under test
import robot_interface.utils.qi_utils as qi_utils
def reload_qi_utils_with(qi_module):
"""
Helper: reload qi_utils after injecting a fake qi module.
Python 2 uses built-in reload().
Just changing sys.modules[qi] won't affect the already imported module.
"""
if qi_module is None:
if "qi" in sys.modules:
del sys.modules["qi"]
else:
sys.modules["qi"] = qi_module
# Python 2 reload
global qi_utils
qi_utils = reload(qi_utils)
def test_get_qi_session_no_qi_module():
"""
Tests the 'qi is None' path.
"""
reload_qi_utils_with(None)
session = qi_utils.get_qi_session()
assert session is None
def test_get_qi_session_no_qi_url_argument(monkeypatch):
"""
Tests the '--qi-url not in sys.argv' path.
"""
class FakeQi:
pass
reload_qi_utils_with(FakeQi())
monkeypatch.setattr(sys, "argv", ["pytest"])
session = qi_utils.get_qi_session()
assert session is None
def test_get_qi_session_runtime_error(monkeypatch):
"""
Tests the 'exept RuntineError' path.
"""
class FakeApp:
def start(self):
raise RuntimeError("boom")
class FakeQi:
Application = lambda self=None: FakeApp()
reload_qi_utils_with(FakeQi())
monkeypatch.setattr(sys, "argv", ["pytest", "--qi-url", "tcp://localhost"])
session = qi_utils.get_qi_session()
assert session is None
def test_get_qi_session_success(monkeypatch):
"""
Tests a valid path.
"""
class FakeSession:
pass
class FakeApp:
def __init__(self):
self.session = FakeSession()
def start(self):
return True
class FakeQi:
Application = lambda self=None: FakeApp()
reload_qi_utils_with(FakeQi())
monkeypatch.setattr(sys, "argv", ["pytest", "--qi-url", "tcp://localhost"])
session = qi_utils.get_qi_session()
assert isinstance(session, FakeSession)

View File

@@ -0,0 +1,19 @@
import pytest
from robot_interface.endpoints.receiver_base import ReceiverBase
def test_receiver_base_not_implemented(monkeypatch):
"""
Ensure that the base ReceiverBase raises NotImplementedError when
handle_message is called on a subclass that does not implement it.
"""
# Patch the __abstractmethods__ to allow instantiation
monkeypatch.setattr(ReceiverBase, "__abstractmethods__", frozenset())
class DummyReceiver(ReceiverBase):
pass
dummy = DummyReceiver("dummy") # Can now instantiate
with pytest.raises(NotImplementedError):
dummy.handle_message({"endpoint": "dummy", "data": None})

View File

@@ -0,0 +1,55 @@
import mock
import zmq
from robot_interface.endpoints.socket_base import SocketBase
def test_close_covers_both_branches():
"""
Exercise both possible paths inside SocketBase.close():
- when no socket exists (should just return),
- when a socket object is present (its close() method should be called).
"""
sb = SocketBase("x")
# First check the case where socket is None. Nothing should happen here.
sb.close()
# Now simulate a real socket so the close() call is triggered.
fake_socket = mock.Mock()
sb.socket = fake_socket
sb.close()
fake_socket.close.assert_called_once()
def test_create_socket_and_endpoint_description_full_coverage():
"""
Test the less-commonly used branch of create_socket() where bind=False.
This covers:
- the loop that sets socket options,
- the connect() path,
- the logic in endpoint_description() that inverts self.bound.
"""
fake_context = mock.Mock()
fake_socket = mock.Mock()
# The context should hand back our fake socket object.
fake_context.socket.return_value = fake_socket
sb = SocketBase("id")
# Calling create_socket with bind=False forces the connect() code path.
sb.create_socket(
zmq_context=fake_context,
socket_type=zmq.SUB,
port=9999,
options=[(zmq.CONFLATE, 1)], # one option is enough to hit the loop
bind=False,
)
fake_socket.setsockopt.assert_called_once_with(zmq.CONFLATE, 1)
fake_socket.connect.assert_called_once_with("tcp://localhost:9999")
# Check that endpoint_description reflects bound=False -> "bind": True
desc = sb.endpoint_description()
assert desc == {"id": "id", "port": 9999, "bind": True}

108
test/unit/test_state.py Normal file
View File

@@ -0,0 +1,108 @@
import threading
import signal
import pytest
import mock
from robot_interface.state import State
def test_initialize_does_not_reinitialize():
"""
Check that calling `initialize` on an already initialized state does not change existing
attributes.
"""
state = State()
# Mock qi_session to avoid real session creation
mock_session = mock.MagicMock()
state.qi_session = mock_session
# Set state as already initialized
state.is_initialized = True
old_exit_event = state.exit_event
# Call initialize
state.initialize()
# Ensure existing attributes were not overwritten
assert state.exit_event == old_exit_event # exit_event should not be recreated
assert state.qi_session == mock_session # qi_session should not be replaced
assert state.is_initialized is True # is_initialized should remain True
def test_deinitialize_behavior():
"""Check that deinitialize closes sockets and updates the initialization state correctly."""
state = State()
# Case 1: Initialized with sockets
state.is_initialized = True
mock_socket_1 = mock.Mock()
mock_socket_2 = mock.Mock()
state.sockets = [mock_socket_1, mock_socket_2]
state.deinitialize()
# Sockets should be closed
mock_socket_1.close.assert_called_once()
mock_socket_2.close.assert_called_once()
# State should be marked as not initialized
assert not state.is_initialized
# Case 2: Not initialized, should not raise
state.is_initialized = False
state.sockets = []
state.deinitialize()
assert not state.is_initialized
def test_access_control_before_initialization():
"""Verify that accessing certain attributes before initialization raises RuntimeError."""
state = State()
with pytest.raises(RuntimeError, match=".*sockets.*"):
_ = state.sockets
with pytest.raises(RuntimeError, match=".*qi_session.*"):
_ = state.qi_session
def test_exit_event_before_initialized_returns_if_set():
"""Check that exit_event can be accessed even if state is not initialized,
but only if it is set."""
state = State()
# Manually create and set the exit_event
object.__setattr__(state, "exit_event", threading.Event())
object.__getattribute__(state, "exit_event").set()
# Should return the event without raising
assert state.exit_event.is_set()
def test_getattribute_allowed_attributes_before_init():
"""Ensure attributes allowed before initialization can be accessed without error."""
state = State()
assert callable(state.initialize)
assert callable(state.deinitialize)
assert state.is_initialized is False
assert state.__dict__ is not None
assert state.__class__.__name__ == "State"
assert state.__doc__ is not None
def test_signal_handler_sets_exit_event(monkeypatch):
"""Ensure SIGINT triggers the exit_event via signal handler."""
state = State()
# Patch get_qi_session to prevent real session creation
monkeypatch.setattr("robot_interface.state.get_qi_session", lambda: "dummy_session")
# Initialize state to set up signal handlers
state.initialize()
# Simulate SIGINT
signal_handler = signal.getsignal(signal.SIGINT)
signal_handler(None, None)
# Exit event should be set
assert state.exit_event.is_set()

View File

@@ -6,11 +6,21 @@ from robot_interface.utils.timeblock import TimeBlock
class AnyFloat(object):
"""
A helper class used in tests to assert that a mock function was called
with an argument that is specifically a float, regardless of its value.
It overrides the equality comparison (`__eq__`) to check only the type.
"""
def __eq__(self, other):
return isinstance(other, float)
def test_no_limit():
"""
Tests the scenario where the `TimeBlock` context manager is used without
a time limit.
"""
callback = mock.Mock()
with TimeBlock(callback):
@@ -20,6 +30,10 @@ def test_no_limit():
def test_exceed_limit():
"""
Tests the scenario where the execution time within the `TimeBlock`
exceeds the provided limit.
"""
callback = mock.Mock()
with TimeBlock(callback, 0):
@@ -29,6 +43,10 @@ def test_exceed_limit():
def test_within_limit():
"""
Tests the scenario where the execution time within the `TimeBlock`
stays within the provided limit.
"""
callback = mock.Mock()
with TimeBlock(callback, 5):

View File

@@ -0,0 +1,99 @@
# coding=utf-8
import mock
import pytest
import zmq
from robot_interface.endpoints.video_sender import VideoSender
from robot_interface.state import state
from robot_interface.core.config import settings
@pytest.fixture
def zmq_context():
"""Provide a ZMQ context."""
yield zmq.Context()
def _patch_basics(mocker):
"""Common patches: prevent real threads, port binds, and state errors."""
mocker.patch("robot_interface.endpoints.socket_base.zmq.Socket.bind")
mocker.patch("robot_interface.endpoints.video_sender.threading.Thread")
mocker.patch.object(state, "is_initialized", True)
def _patch_exit_event(mocker):
"""Make exit_event stop the loop after one iteration."""
fake_event = mock.Mock()
fake_event.is_set.side_effect = [False, True]
mocker.patch.object(state, "exit_event", fake_event)
def test_no_qi_session(zmq_context, mocker):
"""Video loop should not start without a qi_session."""
_patch_basics(mocker)
mocker.patch.object(state, "qi_session", None)
sender = VideoSender(zmq_context)
sender.start_video_rcv()
assert not hasattr(sender, "thread")
def test_video_streaming(zmq_context, mocker):
"""VideoSender should send retrieved image data."""
_patch_basics(mocker)
_patch_exit_event(mocker)
# Pepper's image buffer lives at index 6
mocker.patch.object(settings.video_config, "image_buffer", 6)
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.return_value = [None]*6 + ["fake_img"]
fake_session = mock.Mock()
fake_session.service.return_value = mock_video_service
mocker.patch.object(state, "qi_session", fake_session)
mocker.patch.object(
fake_session.service("ALVideoDevice"),
"subscribeCamera",
return_value="stream_name"
)
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send = send_socket
sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_called_with("fake_img")
def test_video_receive_error(zmq_context, mocker):
"""Errors retrieving images should not call send()."""
_patch_basics(mocker)
_patch_exit_event(mocker)
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.side_effect = Exception("boom")
fake_session = mock.Mock()
fake_session.service.return_value = mock_video_service
mocker.patch.object(state, "qi_session", fake_session)
mocker.patch.object(
fake_session.service("ALVideoDevice"),
"subscribeCamera",
return_value="stream_name"
)
sender = VideoSender(zmq_context)
send_socket = mock.Mock()
sender.socket.send = send_socket
sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_not_called()