Compare commits
1 Commits
feat/ci-cd
...
feat/ignor
| Author | SHA1 | Date | |
|---|---|---|---|
| b4814d431f |
@@ -1,77 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# This script checks if the current branch name follows the specified format.
|
||||
# It's designed to be used as a 'pre-commit' git hook.
|
||||
|
||||
# Format: <type>/<short-description>
|
||||
# Example: feat/add-user-login
|
||||
|
||||
# --- Configuration ---
|
||||
# An array of allowed commit types
|
||||
ALLOWED_TYPES=(feat fix refactor perf style test docs build chore revert)
|
||||
# An array of branches to ignore
|
||||
IGNORED_BRANCHES=(main dev demo)
|
||||
|
||||
# --- Colors for Output ---
|
||||
RED='\033[0;31m'
|
||||
GREEN='\033[0;32m'
|
||||
YELLOW='\033[1;33m'
|
||||
NC='\033[0m' # No Color
|
||||
|
||||
# --- Helper Functions ---
|
||||
error_exit() {
|
||||
echo -e "${RED}ERROR: $1${NC}" >&2
|
||||
echo -e "${YELLOW}Branch name format is incorrect. Aborting commit.${NC}" >&2
|
||||
exit 1
|
||||
}
|
||||
|
||||
# --- Main Logic ---
|
||||
|
||||
# 1. Get the current branch name
|
||||
BRANCH_NAME=$(git symbolic-ref --short HEAD)
|
||||
|
||||
# 2. Check if the current branch is in the ignored list
|
||||
for ignored_branch in "${IGNORED_BRANCHES[@]}"; do
|
||||
if [ "$BRANCH_NAME" == "$ignored_branch" ]; then
|
||||
echo -e "${GREEN}Branch check skipped for default branch: $BRANCH_NAME${NC}"
|
||||
exit 0
|
||||
fi
|
||||
done
|
||||
|
||||
# 3. Validate the overall structure: <type>/<description>
|
||||
if ! [[ "$BRANCH_NAME" =~ ^[a-z]+/.+$ ]]; then
|
||||
error_exit "Branch name must be in the format: <type>/<short-description>\nExample: feat/add-user-login"
|
||||
fi
|
||||
|
||||
# 4. Extract the type and description
|
||||
TYPE=$(echo "$BRANCH_NAME" | cut -d'/' -f1)
|
||||
DESCRIPTION=$(echo "$BRANCH_NAME" | cut -d'/' -f2-)
|
||||
|
||||
# 5. Validate the <type>
|
||||
type_valid=false
|
||||
for allowed_type in "${ALLOWED_TYPES[@]}"; do
|
||||
if [ "$TYPE" == "$allowed_type" ]; then
|
||||
type_valid=true
|
||||
break
|
||||
fi
|
||||
done
|
||||
|
||||
if [ "$type_valid" == false ]; then
|
||||
error_exit "Invalid type '$TYPE'.\nAllowed types are: ${ALLOWED_TYPES[*]}"
|
||||
fi
|
||||
|
||||
# 6. Validate the <short-description>
|
||||
# Regex breakdown:
|
||||
# ^[a-z0-9]+ - Starts with one or more lowercase letters/numbers (the first word).
|
||||
# (-[a-z0-9]+){0,5} - Followed by a group of (dash + word) 0 to 5 times.
|
||||
# $ - End of the string.
|
||||
# This entire pattern enforces 1 to 6 words total, separated by dashes.
|
||||
DESCRIPTION_REGEX="^[a-z0-9]+(-[a-z0-9]+){0,5}$"
|
||||
|
||||
if ! [[ "$DESCRIPTION" =~ $DESCRIPTION_REGEX ]]; then
|
||||
error_exit "Invalid short description '$DESCRIPTION'.\nIt must be a maximum of 6 words, all lowercase, separated by dashes.\nExample: add-new-user-authentication-feature"
|
||||
fi
|
||||
|
||||
# If all checks pass, exit successfully
|
||||
echo -e "${GREEN}Branch name '$BRANCH_NAME' is valid.${NC}"
|
||||
exit 0
|
||||
@@ -1,135 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# This script checks if a commit message follows the specified format.
|
||||
# It's designed to be used as a 'commit-msg' git hook.
|
||||
|
||||
# Format:
|
||||
# <type>: <short description>
|
||||
#
|
||||
# [optional]<body>
|
||||
#
|
||||
# [ref/close]: <issue identifier>
|
||||
|
||||
# --- Configuration ---
|
||||
# An array of allowed commit types
|
||||
ALLOWED_TYPES=(feat fix refactor perf style test docs build chore revert)
|
||||
|
||||
# --- Colors for Output ---
|
||||
RED='\033[0;31m'
|
||||
GREEN='\033[0;32m'
|
||||
YELLOW='\033[1;33m'
|
||||
NC='\033[0m' # No Color
|
||||
|
||||
# The first argument to the hook is the path to the file containing the commit message
|
||||
COMMIT_MSG_FILE=$1
|
||||
|
||||
# --- Automated Commit Detection ---
|
||||
|
||||
# Read the first line (header) for initial checks
|
||||
HEADER=$(head -n 1 "$COMMIT_MSG_FILE")
|
||||
|
||||
# Check for Merge commits (covers 'git merge' and PR merges from GitHub/GitLab)
|
||||
# Examples: "Merge branch 'main' into ...", "Merge pull request #123 from ..."
|
||||
MERGE_PATTERN="^Merge (remote-tracking )?(branch|pull request|tag) .*"
|
||||
if [[ "$HEADER" =~ $MERGE_PATTERN ]]; then
|
||||
echo -e "${GREEN}Merge commit detected by message content. Skipping validation.${NC}"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# Check for Revert commits
|
||||
# Example: "Revert "feat: add new feature""
|
||||
REVERT_PATTERN="^Revert \".*\""
|
||||
if [[ "$HEADER" =~ $REVERT_PATTERN ]]; then
|
||||
echo -e "${GREEN}Revert commit detected by message content. Skipping validation.${NC}"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# Check for Cherry-pick commits (this pattern appears at the end of the message)
|
||||
# Example: "(cherry picked from commit deadbeef...)"
|
||||
# We use grep -q to search the whole file quietly.
|
||||
CHERRY_PICK_PATTERN="\(cherry picked from commit [a-f0-9]{7,40}\)"
|
||||
if grep -qE "$CHERRY_PICK_PATTERN" "$COMMIT_MSG_FILE"; then
|
||||
echo -e "${GREEN}Cherry-pick detected by message content. Skipping validation.${NC}"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# Check for Squash
|
||||
# Example: "Squash commits ..."
|
||||
SQUASH_PATTERN="^Squash .+"
|
||||
if [[ "$HEADER" =~ $SQUASH_PATTERN ]]; then
|
||||
echo -e "${GREEN}Squash commit detected by message content. Skipping validation.${NC}"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# --- Validation Functions ---
|
||||
|
||||
# Function to print an error message and exit
|
||||
# Usage: error_exit "Your error message here"
|
||||
error_exit() {
|
||||
# >&2 redirects echo to stderr
|
||||
echo -e "${RED}ERROR: $1${NC}" >&2
|
||||
echo -e "${YELLOW}Commit message format is incorrect. Aborting commit.${NC}" >&2
|
||||
exit 1
|
||||
}
|
||||
|
||||
# --- Main Logic ---
|
||||
|
||||
# 1. Read the header (first line) of the commit message
|
||||
HEADER=$(head -n 1 "$COMMIT_MSG_FILE")
|
||||
|
||||
# 2. Validate the header format: <type>: <description>
|
||||
# Regex breakdown:
|
||||
# ^(type1|type2|...) - Starts with one of the allowed types
|
||||
# : - Followed by a literal colon
|
||||
# \s - Followed by a single space
|
||||
# .+ - Followed by one or more characters for the description
|
||||
# $ - End of the line
|
||||
TYPES_REGEX=$(
|
||||
IFS="|"
|
||||
echo "${ALLOWED_TYPES[*]}"
|
||||
)
|
||||
HEADER_REGEX="^($TYPES_REGEX): .+$"
|
||||
|
||||
if ! [[ "$HEADER" =~ $HEADER_REGEX ]]; then
|
||||
error_exit "Invalid header format.\n\nHeader must be in the format: <type>: <short description>\nAllowed types: ${ALLOWED_TYPES[*]}\nExample: feat: add new user authentication feature"
|
||||
fi
|
||||
|
||||
# Only validate footer if commit type is not chore
|
||||
TYPE=$(echo "$HEADER" | cut -d':' -f1)
|
||||
if [ "$TYPE" != "chore" ]; then
|
||||
# 3. Validate the footer (last line) of the commit message
|
||||
FOOTER=$(tail -n 1 "$COMMIT_MSG_FILE")
|
||||
|
||||
# Regex breakdown:
|
||||
# ^(ref|close) - Starts with 'ref' or 'close'
|
||||
# : - Followed by a literal colon
|
||||
# \s - Followed by a single space
|
||||
# N25B- - Followed by the literal string 'N25B-'
|
||||
# [0-9]+ - Followed by one or more digits
|
||||
# $ - End of the line
|
||||
FOOTER_REGEX="^(ref|close): N25B-[0-9]+$"
|
||||
|
||||
if ! [[ "$FOOTER" =~ $FOOTER_REGEX ]]; then
|
||||
error_exit "Invalid footer format.\n\nFooter must be in the format: [ref/close]: <issue identifier>\nExample: ref: N25B-123"
|
||||
fi
|
||||
fi
|
||||
|
||||
# 4. If the message has more than 2 lines, validate the separator
|
||||
# A blank line must exist between the header and the body.
|
||||
LINE_COUNT=$(wc -l <"$COMMIT_MSG_FILE" | xargs) # xargs trims whitespace
|
||||
|
||||
# We only care if there is a body. Header + Footer = 2 lines.
|
||||
# Header + Blank Line + Body... + Footer > 2 lines.
|
||||
if [ "$LINE_COUNT" -gt 2 ]; then
|
||||
# Get the second line
|
||||
SECOND_LINE=$(sed -n '2p' "$COMMIT_MSG_FILE")
|
||||
|
||||
# Check if the second line is NOT empty. If it's not, it's an error.
|
||||
if [ -n "$SECOND_LINE" ]; then
|
||||
error_exit "Missing blank line between header and body.\n\nThe second line of your commit message must be empty if a body is present."
|
||||
fi
|
||||
fi
|
||||
|
||||
# If all checks pass, exit with success
|
||||
echo -e "${GREEN}Commit message is valid.${NC}"
|
||||
exit 0
|
||||
16
.githooks/commit-msg
Normal file
16
.githooks/commit-msg
Normal file
@@ -0,0 +1,16 @@
|
||||
#!/bin/sh
|
||||
|
||||
commit_msg_file=$1
|
||||
commit_msg=$(cat "$commit_msg_file")
|
||||
|
||||
if echo "$commit_msg" | grep -Eq "^(feat|fix|refactor|perf|style|test|docs|build|chore|revert): .+"; then
|
||||
if echo "$commit_msg" | grep -Eq "^(ref|close):\sN25B-.+"; then
|
||||
exit 0
|
||||
else
|
||||
echo "❌ Commit message invalid! Must end with [ref/close]: N25B-000"
|
||||
exit 1
|
||||
fi
|
||||
else
|
||||
echo "❌ Commit message invalid! Must start with <type>: <description>"
|
||||
exit 1
|
||||
fi
|
||||
17
.githooks/pre-commit
Normal file
17
.githooks/pre-commit
Normal file
@@ -0,0 +1,17 @@
|
||||
#!/bin/sh
|
||||
|
||||
# Get current branch
|
||||
branch=$(git rev-parse --abbrev-ref HEAD)
|
||||
|
||||
if echo "$branch" | grep -Eq "(dev|main)"; then
|
||||
echo 0
|
||||
fi
|
||||
|
||||
# allowed pattern <type/>
|
||||
if echo "$branch" | grep -Eq "^(feat|fix|refactor|perf|style|test|docs|build|chore|revert)\/\w+(-\w+){0,5}$"; then
|
||||
exit 0
|
||||
else
|
||||
echo "❌ Invalid branch name: $branch"
|
||||
echo "Branch must be named <type>/<description-of-branch> (must have one to six words separated by a dash)"
|
||||
exit 1
|
||||
fi
|
||||
9
.githooks/prepare-commit-msg
Normal file
9
.githooks/prepare-commit-msg
Normal file
@@ -0,0 +1,9 @@
|
||||
#!/bin/sh
|
||||
|
||||
echo "#<type>: <description>
|
||||
|
||||
#[optional body]
|
||||
|
||||
#[optional footer(s)]
|
||||
|
||||
#[ref/close]: <issue identifier>" > $1
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -217,6 +217,3 @@ __marimo__/
|
||||
|
||||
.DS_Store
|
||||
|
||||
# Docs
|
||||
docs/*
|
||||
!docs/conf.py
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
# ---------- GLOBAL SETUP ---------- #
|
||||
workflow:
|
||||
rules:
|
||||
- if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
|
||||
|
||||
stages:
|
||||
- install
|
||||
- test
|
||||
|
||||
default:
|
||||
image: qi-py-ri-base:latest
|
||||
cache:
|
||||
key: "${CI_COMMIT_REF_SLUG}"
|
||||
paths:
|
||||
- .venv/
|
||||
policy: pull-push
|
||||
|
||||
# --------- INSTALLING --------- #
|
||||
install:
|
||||
stage: install
|
||||
tags:
|
||||
- install
|
||||
script:
|
||||
- python -m virtualenv .venv
|
||||
- source .venv/bin/activate
|
||||
- echo /qi/pynaoqi-python2.7-2.5.7.1-linux64/lib/python2.7/site-packages/ > .venv/lib/python2.7/site-packages/pynaoqi-python2.7.pth
|
||||
- pip install -r requirements.txt
|
||||
artifacts:
|
||||
paths:
|
||||
- .venv/
|
||||
expire_in: 1h
|
||||
|
||||
# ---------- TESTING ---------- #
|
||||
test:
|
||||
stage: test
|
||||
needs:
|
||||
- install
|
||||
tags:
|
||||
- test
|
||||
script:
|
||||
- source .venv/bin/activate
|
||||
- PYTHONPATH=src pytest test/
|
||||
@@ -1,15 +0,0 @@
|
||||
repos:
|
||||
- repo: local
|
||||
hooks:
|
||||
- id: check-commit-msg
|
||||
name: Check commit message format
|
||||
entry: .githooks/check-commit-msg.sh
|
||||
language: script
|
||||
stages: [commit-msg]
|
||||
- id: check-branch-name
|
||||
name: Check branch name format
|
||||
entry: .githooks/check-branch-name.sh
|
||||
language: script
|
||||
stages: [commit]
|
||||
always_run: true
|
||||
pass_filenames: false
|
||||
61
README.md
61
README.md
@@ -34,18 +34,6 @@ python -m virtualenv .venv
|
||||
source .venv/bin/activate
|
||||
```
|
||||
|
||||
We depend on PortAudio for the `pyaudio` package, so install it with:
|
||||
|
||||
```bash
|
||||
sudo apt install -y portaudio19-dev
|
||||
```
|
||||
|
||||
On WSL, also install:
|
||||
|
||||
```bash
|
||||
sudo apt install -y libasound2-plugins
|
||||
```
|
||||
|
||||
Install the required packages with
|
||||
|
||||
```bash
|
||||
@@ -110,8 +98,6 @@ $env:PYTHONPATH="src"; python -m robot_interface.main
|
||||
|
||||
With both, if you want to connect to the actual robot (or simulator), pass the `--qi-url` argument.
|
||||
|
||||
There's also a `--microphone` argument that can be used to choose a microphone to use. If not given, the program will try the default microphone. If you don't know the name of the microphone, pass the argument with any value, and it will list the names of available microphones.
|
||||
|
||||
|
||||
|
||||
## Testing
|
||||
@@ -134,46 +120,15 @@ For coverage, add `--cov=robot_interface` as an argument to `pytest`.
|
||||
|
||||
|
||||
|
||||
## Git Hooks
|
||||
## GitHooks
|
||||
|
||||
To activate automatic linting, formatting, branch name checks and commit message checks, run (after installing requirements):
|
||||
To activate automatic commits/branch name checks run:
|
||||
|
||||
```bash
|
||||
pre-commit install
|
||||
pre-commit install --hook-type commit-msg
|
||||
```shell
|
||||
git config --local core.hooksPath .githooks
|
||||
```
|
||||
|
||||
You might get an error along the lines of `Can't install pre-commit with core.hooksPath` set. To fix this, simply unset the hooksPath by running:
|
||||
|
||||
```bash
|
||||
git config --local --unset core.hooksPath
|
||||
```
|
||||
|
||||
Then run the pre-commit install commands again.
|
||||
|
||||
## Documentation
|
||||
Generate documentation web pages using:
|
||||
|
||||
### Linux & macOS
|
||||
```bash
|
||||
PYTHONPATH=src sphinx-apidoc -F -o docs src/robot_interface
|
||||
```
|
||||
|
||||
### Windows
|
||||
```bash
|
||||
$env:PYTHONPATH="src"; sphinx-apidoc -F -o docs src/control_backend
|
||||
```
|
||||
|
||||
Optionally, in the `conf.py` file in the new `docs` folder, change preferences.
|
||||
|
||||
In the `docs` folder:
|
||||
|
||||
### Linux & macOS
|
||||
```bash
|
||||
make html
|
||||
```
|
||||
|
||||
### Windows
|
||||
```bash
|
||||
.\make.bat html
|
||||
```
|
||||
If your commit fails its either:
|
||||
branch name != <type>/description-of-branch ,
|
||||
commit name != <type>: description of the commit.
|
||||
<ref>: N25B-Num's
|
||||
|
||||
184
docs/conf.py
184
docs/conf.py
@@ -1,184 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Configuration file for the Sphinx documentation builder.
|
||||
#
|
||||
# This file does only contain a selection of the most common options. For a
|
||||
# full list see the documentation:
|
||||
# http://www.sphinx-doc.org/en/master/config
|
||||
|
||||
# -- Path setup --------------------------------------------------------------
|
||||
|
||||
# If extensions (or modules to document with autodoc) are in another directory,
|
||||
# add these directories to sys.path here. If the directory is relative to the
|
||||
# documentation root, use os.path.abspath to make it absolute, like shown here.
|
||||
#
|
||||
import os
|
||||
import sys
|
||||
sys.path.insert(0, os.path.abspath("../src"))
|
||||
|
||||
|
||||
# -- Project information -----------------------------------------------------
|
||||
|
||||
project = u'robot_interface'
|
||||
copyright = u'2025, Author'
|
||||
author = u'Author'
|
||||
|
||||
# The short X.Y version
|
||||
version = u''
|
||||
# The full version, including alpha/beta/rc tags
|
||||
release = u''
|
||||
|
||||
|
||||
# -- General configuration ---------------------------------------------------
|
||||
|
||||
# If your documentation needs a minimal Sphinx version, state it here.
|
||||
#
|
||||
# needs_sphinx = '1.0'
|
||||
|
||||
# Add any Sphinx extension module names here, as strings. They can be
|
||||
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
|
||||
# ones.
|
||||
extensions = [
|
||||
'sphinx.ext.autodoc',
|
||||
'sphinx.ext.viewcode',
|
||||
'sphinx.ext.todo',
|
||||
]
|
||||
|
||||
# Add any paths that contain templates here, relative to this directory.
|
||||
templates_path = ['_templates']
|
||||
|
||||
# The suffix(es) of source filenames.
|
||||
# You can specify multiple suffix as a list of string:
|
||||
#
|
||||
# source_suffix = ['.rst', '.md']
|
||||
source_suffix = '.rst'
|
||||
|
||||
# The master toctree document.
|
||||
master_doc = 'index'
|
||||
|
||||
# The language for content autogenerated by Sphinx. Refer to documentation
|
||||
# for a list of supported languages.
|
||||
#
|
||||
# This is also used if you do content translation via gettext catalogs.
|
||||
# Usually you set "language" from the command line for these cases.
|
||||
language = 'en'
|
||||
|
||||
# List of patterns, relative to source directory, that match files and
|
||||
# directories to ignore when looking for source files.
|
||||
# This pattern also affects html_static_path and html_extra_path.
|
||||
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
|
||||
|
||||
# The name of the Pygments (syntax highlighting) style to use.
|
||||
pygments_style = None
|
||||
|
||||
|
||||
# -- Options for HTML output -------------------------------------------------
|
||||
|
||||
# The theme to use for HTML and HTML Help pages. See the documentation for
|
||||
# a list of builtin themes.
|
||||
#
|
||||
html_theme = 'sphinx_rtd_theme'
|
||||
|
||||
# Theme options are theme-specific and customize the look and feel of a theme
|
||||
# further. For a list of options available for each theme, see the
|
||||
# documentation.
|
||||
#
|
||||
# html_theme_options = {}
|
||||
|
||||
# Add any paths that contain custom static files (such as style sheets) here,
|
||||
# relative to this directory. They are copied after the builtin static files,
|
||||
# so a file named "default.css" will overwrite the builtin "default.css".
|
||||
html_static_path = ['_static']
|
||||
|
||||
# Custom sidebar templates, must be a dictionary that maps document names
|
||||
# to template names.
|
||||
#
|
||||
# The default sidebars (for documents that don't match any pattern) are
|
||||
# defined by theme itself. Builtin themes are using these templates by
|
||||
# default: ``['localtoc.html', 'relations.html', 'sourcelink.html',
|
||||
# 'searchbox.html']``.
|
||||
#
|
||||
# html_sidebars = {}
|
||||
|
||||
|
||||
# -- Options for HTMLHelp output ---------------------------------------------
|
||||
|
||||
# Output file base name for HTML help builder.
|
||||
htmlhelp_basename = 'robot_interfacedoc'
|
||||
|
||||
|
||||
# -- Options for LaTeX output ------------------------------------------------
|
||||
|
||||
latex_elements = {
|
||||
# The paper size ('letterpaper' or 'a4paper').
|
||||
#
|
||||
# 'papersize': 'letterpaper',
|
||||
|
||||
# The font size ('10pt', '11pt' or '12pt').
|
||||
#
|
||||
# 'pointsize': '10pt',
|
||||
|
||||
# Additional stuff for the LaTeX preamble.
|
||||
#
|
||||
# 'preamble': '',
|
||||
|
||||
# Latex figure (float) alignment
|
||||
#
|
||||
# 'figure_align': 'htbp',
|
||||
}
|
||||
|
||||
# Grouping the document tree into LaTeX files. List of tuples
|
||||
# (source start file, target name, title,
|
||||
# author, documentclass [howto, manual, or own class]).
|
||||
latex_documents = [
|
||||
(master_doc, 'robot_interface.tex', u'robot\\_interface Documentation',
|
||||
u'Author', 'manual'),
|
||||
]
|
||||
|
||||
|
||||
# -- Options for manual page output ------------------------------------------
|
||||
|
||||
# One entry per manual page. List of tuples
|
||||
# (source start file, name, description, authors, manual section).
|
||||
man_pages = [
|
||||
(master_doc, 'robot_interface', u'robot_interface Documentation',
|
||||
[author], 1)
|
||||
]
|
||||
|
||||
|
||||
# -- Options for Texinfo output ----------------------------------------------
|
||||
|
||||
# Grouping the document tree into Texinfo files. List of tuples
|
||||
# (source start file, target name, title, author,
|
||||
# dir menu entry, description, category)
|
||||
texinfo_documents = [
|
||||
(master_doc, 'robot_interface', u'robot_interface Documentation',
|
||||
author, 'robot_interface', 'One line description of project.',
|
||||
'Miscellaneous'),
|
||||
]
|
||||
|
||||
|
||||
# -- Options for Epub output -------------------------------------------------
|
||||
|
||||
# Bibliographic Dublin Core info.
|
||||
epub_title = project
|
||||
|
||||
# The unique identifier of the text. This can be a ISBN number
|
||||
# or the project homepage.
|
||||
#
|
||||
# epub_identifier = ''
|
||||
|
||||
# A unique identification for the text.
|
||||
#
|
||||
# epub_uid = ''
|
||||
|
||||
# A list of files that should not be packed into the epub file.
|
||||
epub_exclude_files = ['search.html']
|
||||
|
||||
|
||||
# -- Extension configuration -------------------------------------------------
|
||||
|
||||
# -- Options for todo extension ----------------------------------------------
|
||||
|
||||
# If true, `todo` and `todoList` produce output, else they produce nothing.
|
||||
todo_include_todos = True
|
||||
@@ -3,6 +3,3 @@ pyaudio<=0.2.11
|
||||
pytest<5
|
||||
pytest-mock<3.0.0
|
||||
pytest-cov<3.0.0
|
||||
sphinx
|
||||
sphinx_rtd_theme
|
||||
pre-commit
|
||||
|
||||
@@ -1,115 +0,0 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
|
||||
class AgentSettings(object):
|
||||
"""
|
||||
Agent port configuration.
|
||||
|
||||
:ivar actuating_receiver_port: Port for receiving actuation commands.
|
||||
:vartype actuating_receiver_port: int
|
||||
:ivar main_receiver_port: Port for receiving main messages.
|
||||
:vartype main_receiver_port: int
|
||||
:ivar video_sender_port: Port used for sending video frames.
|
||||
:vartype video_sender_port: int
|
||||
:ivar audio_sender_port: Port used for sending audio data.
|
||||
:vartype audio_sender_port: int
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
actuating_receiver_port=5557,
|
||||
main_receiver_port=5555,
|
||||
video_sender_port=5556,
|
||||
audio_sender_port=5558,
|
||||
):
|
||||
self.actuating_receiver_port = actuating_receiver_port
|
||||
self.main_receiver_port = main_receiver_port
|
||||
self.video_sender_port = video_sender_port
|
||||
self.audio_sender_port = audio_sender_port
|
||||
|
||||
|
||||
class VideoConfig(object):
|
||||
"""
|
||||
Video configuration constants.
|
||||
|
||||
:ivar camera_index: Index of the camera used.
|
||||
:vartype camera_index: int
|
||||
:ivar resolution: Video resolution mode.
|
||||
:vartype resolution: int
|
||||
:ivar color_space: Color space identifier.
|
||||
:vartype color_space: int
|
||||
:ivar fps: Frames per second of the video stream.
|
||||
:vartype fps: int
|
||||
:ivar stream_name: Name of the video stream.
|
||||
:vartype stream_name: str
|
||||
:ivar image_buffer: Internal buffer size for video frames.
|
||||
:vartype image_buffer: int
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
camera_index=0,
|
||||
resolution=2,
|
||||
color_space=11,
|
||||
fps=15,
|
||||
stream_name="Pepper Video",
|
||||
image_buffer=6,
|
||||
):
|
||||
self.camera_index = camera_index
|
||||
self.resolution = resolution
|
||||
self.color_space = color_space
|
||||
self.fps = fps
|
||||
self.stream_name = stream_name
|
||||
self.image_buffer = image_buffer
|
||||
|
||||
|
||||
class AudioConfig(object):
|
||||
"""
|
||||
Audio configuration constants.
|
||||
|
||||
:ivar sample_rate: Audio sampling rate in Hz.
|
||||
:vartype sample_rate: int
|
||||
:ivar chunk_size: Size of audio chunks to capture/process.
|
||||
:vartype chunk_size: int
|
||||
:ivar channels: Number of audio channels.
|
||||
:vartype channels: int
|
||||
"""
|
||||
def __init__(self, sample_rate=16000, chunk_size=512, channels=1):
|
||||
self.sample_rate = sample_rate
|
||||
self.chunk_size = chunk_size
|
||||
self.channels = channels
|
||||
|
||||
|
||||
class MainConfig(object):
|
||||
"""
|
||||
Main system configuration.
|
||||
|
||||
:ivar poll_timeout_ms: Timeout for polling events, in milliseconds.
|
||||
:vartype poll_timeout_ms: int
|
||||
:ivar max_handler_time_ms: Maximum allowed handler time, in milliseconds.
|
||||
:vartype max_handler_time_ms: int
|
||||
"""
|
||||
def __init__(self, poll_timeout_ms=100, max_handler_time_ms=50):
|
||||
self.poll_timeout_ms = poll_timeout_ms
|
||||
self.max_handler_time_ms = max_handler_time_ms
|
||||
|
||||
|
||||
class Settings(object):
|
||||
"""
|
||||
Global settings container.
|
||||
|
||||
:ivar agent_settings: Agent-related port configuration.
|
||||
:vartype agent_settings: AgentSettings
|
||||
:ivar video_config: Video stream configuration.
|
||||
:vartype video_config: VideoConfig
|
||||
:ivar audio_config: Audio stream configuration.
|
||||
:vartype audio_config: AudioConfig
|
||||
:ivar main_config: Main system-level configuration.
|
||||
:vartype main_config: MainConfig
|
||||
"""
|
||||
def __init__(self, agent_settings=None, video_config=None, audio_config=None, main_config=None):
|
||||
self.agent_settings = agent_settings or AgentSettings()
|
||||
self.video_config = video_config or VideoConfig()
|
||||
self.audio_config = audio_config or AudioConfig()
|
||||
self.main_config = main_config or MainConfig()
|
||||
|
||||
|
||||
settings = Settings()
|
||||
@@ -1,4 +1,3 @@
|
||||
from __future__ import unicode_literals # So that we can log texts with Unicode characters
|
||||
import logging
|
||||
|
||||
import zmq
|
||||
@@ -6,35 +5,25 @@ import zmq
|
||||
from robot_interface.endpoints.receiver_base import ReceiverBase
|
||||
from robot_interface.state import state
|
||||
|
||||
from robot_interface.core.config import settings
|
||||
|
||||
|
||||
class ActuationReceiver(ReceiverBase):
|
||||
"""
|
||||
The actuation receiver endpoint, responsible for handling speech and gesture requests.
|
||||
def __init__(self, zmq_context, port=5557):
|
||||
"""
|
||||
The actuation receiver endpoint, responsible for handling speech and gesture requests.
|
||||
|
||||
:param zmq_context: The ZeroMQ context to use.
|
||||
:type zmq_context: zmq.Context
|
||||
:param zmq_context: The ZeroMQ context to use.
|
||||
:type zmq_context: zmq.Context
|
||||
|
||||
:param port: The port to use.
|
||||
:type port: int
|
||||
|
||||
:ivar _tts_service: The text-to-speech service object from the Qi session.
|
||||
:vartype _tts_service: qi.Session | None
|
||||
"""
|
||||
def __init__(self, zmq_context, port=settings.agent_settings.actuating_receiver_port):
|
||||
:param port: The port to use.
|
||||
:type port: int
|
||||
"""
|
||||
super(ActuationReceiver, self).__init__("actuation")
|
||||
self.create_socket(zmq_context, zmq.SUB, port)
|
||||
self.socket.setsockopt_string(zmq.SUBSCRIBE, u"") # Causes block if given in options
|
||||
self._tts_service = None
|
||||
self._al_memory = None
|
||||
|
||||
def _handle_speech(self, message):
|
||||
"""
|
||||
Handle a speech actuation request.
|
||||
|
||||
:param message: The message to handle, must contain properties "endpoint" and "data".
|
||||
:type message: dict
|
||||
"""
|
||||
text = message.get("data")
|
||||
if not text:
|
||||
logging.warn("Received message to speak, but it lacks data.")
|
||||
@@ -52,16 +41,26 @@ class ActuationReceiver(ReceiverBase):
|
||||
|
||||
if not self._tts_service:
|
||||
self._tts_service = state.qi_session.service("ALTextToSpeech")
|
||||
if not self._al_memory:
|
||||
self._al_memory = state.qi_session.service("ALMemory")
|
||||
|
||||
# Subscribe to speech end event
|
||||
self.status_subscriber = self._al_memory.subscriber("ALTextToSpeech/Status") # self because garbage collect
|
||||
self.status_subscriber.signal.connect(self._on_status_changed)
|
||||
|
||||
# Returns instantly. Messages received while speaking will be queued.
|
||||
qi.async(self._tts_service.say, text)
|
||||
|
||||
def handle_message(self, message):
|
||||
"""
|
||||
Handle an actuation/speech message with the receiver.
|
||||
@staticmethod
|
||||
def _on_status_changed(value): # value will contain either 'enqueued', 'started' or 'done' depending on the status
|
||||
"""Callback function for when the speaking status changes. Will change the is_speaking value of the state."""
|
||||
if "started" in value:
|
||||
logging.debug("Started speaking.")
|
||||
state.is_speaking = True
|
||||
if "done" in value:
|
||||
logging.debug("Done speaking.")
|
||||
state.is_speaking = False
|
||||
|
||||
:param message: The message to handle, must contain properties "endpoint" and "data".
|
||||
:type message: dict
|
||||
"""
|
||||
def handle_message(self, message):
|
||||
if message["endpoint"] == "actuate/speech":
|
||||
self._handle_speech(message)
|
||||
|
||||
@@ -7,49 +7,23 @@ import zmq
|
||||
|
||||
from robot_interface.endpoints.socket_base import SocketBase
|
||||
from robot_interface.state import state
|
||||
from robot_interface.utils.microphone import choose_mic
|
||||
from robot_interface.core.config import settings
|
||||
from robot_interface.utils.microphone import choose_mic_default
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AudioSender(SocketBase):
|
||||
"""
|
||||
Audio sender endpoint, responsible for sending microphone audio data.
|
||||
|
||||
:param zmq_context: The ZeroMQ context to use.
|
||||
:type zmq_context: zmq.Context
|
||||
|
||||
:param port: The port to use.
|
||||
:type port: int
|
||||
|
||||
:ivar thread: Thread used for sending audio.
|
||||
:vartype thread: threading.Thread | None
|
||||
|
||||
:ivar audio: PyAudio instance.
|
||||
:vartype audio: pyaudio.PyAudio | None
|
||||
|
||||
:ivar microphone: Selected microphone information.
|
||||
:vartype microphone: dict | None
|
||||
"""
|
||||
def __init__(self, zmq_context, port=settings.agent_settings.audio_sender_port):
|
||||
def __init__(self, zmq_context, port=5558):
|
||||
super(AudioSender, self).__init__(str("audio")) # Convert future's unicode_literal to str
|
||||
self.create_socket(zmq_context, zmq.PUB, port)
|
||||
self.audio = pyaudio.PyAudio()
|
||||
self.microphone = choose_mic_default(self.audio)
|
||||
self.thread = None
|
||||
|
||||
try:
|
||||
self.audio = pyaudio.PyAudio()
|
||||
self.microphone = choose_mic(self.audio)
|
||||
except IOError as e:
|
||||
logger.warning("PyAudio is not available.", exc_info=e)
|
||||
self.audio = None
|
||||
self.microphone = None
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start sending audio in a different thread.
|
||||
|
||||
Will not start if no microphone is available.
|
||||
"""
|
||||
if not self.microphone:
|
||||
logger.info("Not listening: no microphone available.")
|
||||
@@ -61,26 +35,21 @@ class AudioSender(SocketBase):
|
||||
|
||||
def wait_until_done(self):
|
||||
"""
|
||||
Wait until the audio thread is done.
|
||||
|
||||
Will block until `state.exit_event` is set. If the thread is not running, does nothing.
|
||||
Wait until the audio thread is done. Will only be done if `state.exit_event` is set, so
|
||||
make sure to set that before calling this method or it will block.
|
||||
"""
|
||||
if not self.thread: return
|
||||
self.thread.join()
|
||||
self.thread = None
|
||||
|
||||
def _stream(self):
|
||||
"""
|
||||
Internal method to continuously read audio from the microphone and send it over the socket.
|
||||
"""
|
||||
audio_settings = settings.audio_config
|
||||
chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD
|
||||
chunk = 512 # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD
|
||||
|
||||
# Docs say this only raises an error if neither `input` nor `output` is True
|
||||
stream = self.audio.open(
|
||||
format=pyaudio.paFloat32,
|
||||
channels=audio_settings.channels,
|
||||
rate=audio_settings.sample_rate,
|
||||
channels=1,
|
||||
rate=16000,
|
||||
input=True,
|
||||
input_device_index=self.microphone["index"],
|
||||
frames_per_buffer=chunk,
|
||||
@@ -88,6 +57,13 @@ class AudioSender(SocketBase):
|
||||
|
||||
try:
|
||||
while not state.exit_event.is_set():
|
||||
# Don't send audio if Pepper is speaking
|
||||
if state.is_speaking:
|
||||
if stream.is_active(): stream.stop_stream()
|
||||
continue
|
||||
|
||||
if stream.is_stopped(): stream.start_stream()
|
||||
|
||||
data = stream.read(chunk)
|
||||
self.socket.send(data)
|
||||
except IOError as e:
|
||||
|
||||
@@ -3,50 +3,28 @@ import zmq
|
||||
from robot_interface.endpoints.receiver_base import ReceiverBase
|
||||
from robot_interface.state import state
|
||||
|
||||
from robot_interface.core.config import settings
|
||||
|
||||
class MainReceiver(ReceiverBase):
|
||||
"""
|
||||
The main receiver endpoint, responsible for handling ping and negotiation requests.
|
||||
def __init__(self, zmq_context, port=5555):
|
||||
"""
|
||||
The main receiver endpoint, responsible for handling ping and negotiation requests.
|
||||
|
||||
:param zmq_context: The ZeroMQ context to use.
|
||||
:type zmq_context: zmq.Context
|
||||
:param zmq_context: The ZeroMQ context to use.
|
||||
:type zmq_context: zmq.Context
|
||||
|
||||
:param port: The port to use.
|
||||
:type port: int
|
||||
"""
|
||||
def __init__(self, zmq_context, port=settings.agent_settings.main_receiver_port):
|
||||
:param port: The port to use.
|
||||
:type port: int
|
||||
"""
|
||||
super(MainReceiver, self).__init__("main")
|
||||
self.create_socket(zmq_context, zmq.REP, port, bind=False)
|
||||
|
||||
@staticmethod
|
||||
def _handle_ping(message):
|
||||
"""
|
||||
Handle a ping request.
|
||||
|
||||
Returns the provided data in a standardized response dictionary.
|
||||
|
||||
:param message: The ping request message.
|
||||
:type message: dict
|
||||
|
||||
:return: A response dictionary containing the original data.
|
||||
:rtype: dict[str, str | list[dict]]
|
||||
"""
|
||||
"""A simple ping endpoint. Returns the provided data."""
|
||||
return {"endpoint": "ping", "data": message.get("data")}
|
||||
|
||||
@staticmethod
|
||||
def _handle_port_negotiation(message):
|
||||
"""
|
||||
Handle a port negotiation request.
|
||||
|
||||
Returns a list of all known endpoints and their descriptions.
|
||||
|
||||
:param message: The negotiation request message.
|
||||
:type message: dict
|
||||
|
||||
:return: A response dictionary with endpoint descriptions as data.
|
||||
:rtype: dict[str, list[dict]]
|
||||
"""
|
||||
endpoints = [socket.endpoint_description() for socket in state.sockets]
|
||||
|
||||
return {"endpoint": "negotiate/ports", "data": endpoints}
|
||||
@@ -54,13 +32,13 @@ class MainReceiver(ReceiverBase):
|
||||
@staticmethod
|
||||
def _handle_negotiation(message):
|
||||
"""
|
||||
Handle a negotiation request. Responds with ports that can be used to connect to the robot.
|
||||
Handle a negotiation request. Will respond with ports that can be used to connect to the robot.
|
||||
|
||||
:param message: The negotiation request message.
|
||||
:type message: dict
|
||||
|
||||
:return: A response dictionary with the negotiation result.
|
||||
:rtype: dict[str, str | list[dict]]
|
||||
:return: A response dictionary with a 'ports' key containing a list of ports and their function.
|
||||
:rtype: dict[str, list[dict]]
|
||||
"""
|
||||
# In the future, the sender could send information like the robot's IP address, etc.
|
||||
|
||||
@@ -70,17 +48,6 @@ class MainReceiver(ReceiverBase):
|
||||
return {"endpoint": "negotiate/error", "data": "The requested endpoint is not implemented."}
|
||||
|
||||
def handle_message(self, message):
|
||||
"""
|
||||
Main entry point for handling incoming messages.
|
||||
|
||||
Dispatches messages to the appropriate handler based on the endpoint.
|
||||
|
||||
:param message: The received message.
|
||||
:type message: dict
|
||||
|
||||
:return: A response dictionary based on the requested endpoint.
|
||||
:rtype: dict[str, str | list[dict]]
|
||||
"""
|
||||
if message["endpoint"] == "ping":
|
||||
return self._handle_ping(message)
|
||||
elif message["endpoint"].startswith("negotiate"):
|
||||
|
||||
@@ -4,7 +4,7 @@ from robot_interface.endpoints.socket_base import SocketBase
|
||||
|
||||
|
||||
class ReceiverBase(SocketBase, object):
|
||||
"""Base class for receivers associated with a ZeroMQ socket."""
|
||||
"""Associated with a ZeroMQ socket."""
|
||||
__metaclass__ = ABCMeta
|
||||
|
||||
@abstractmethod
|
||||
|
||||
@@ -4,27 +4,16 @@ import zmq
|
||||
|
||||
|
||||
class SocketBase(object):
|
||||
"""
|
||||
Base class for endpoints associated with a ZeroMQ socket.
|
||||
|
||||
:ivar identifier: The identifier of the endpoint.
|
||||
:vartype identifier: str
|
||||
|
||||
:ivar port: The port used by the socket, set by `create_socket`.
|
||||
:vartype port: int | None
|
||||
|
||||
:ivar socket: The ZeroMQ socket object, set by `create_socket`.
|
||||
:vartype socket: zmq.Socket | None
|
||||
|
||||
:ivar bound: Whether the socket is bound or connected, set by `create_socket`.
|
||||
:vartype bound: bool | None
|
||||
"""
|
||||
__metaclass__ = ABCMeta
|
||||
|
||||
name = None
|
||||
socket = None
|
||||
|
||||
def __init__(self, identifier):
|
||||
"""
|
||||
:param identifier: The identifier of the endpoint.
|
||||
:type identifier: str
|
||||
"""
|
||||
self.identifier = identifier
|
||||
self.port = None # Set later by `create_socket`
|
||||
self.socket = None # Set later by `create_socket`
|
||||
@@ -43,7 +32,8 @@ class SocketBase(object):
|
||||
:param port: The port to use.
|
||||
:type port: int
|
||||
|
||||
:param options: A list of tuples where the first element contains the option and the second the value.
|
||||
:param options: A list of options to be set on the socket. The list contains tuples where the first element contains the option
|
||||
and the second the value, for example (zmq.CONFLATE, 1).
|
||||
:type options: list[tuple[int, int]]
|
||||
|
||||
:param bind: Whether to bind the socket or connect to it.
|
||||
@@ -72,7 +62,7 @@ class SocketBase(object):
|
||||
Description of the endpoint. Used for negotiation.
|
||||
|
||||
:return: A dictionary with the following keys: id, port, bind. See API specification at:
|
||||
https://utrechtuniversity.youtrack.cloud/articles/N25B-A-14/RI-CB-Communication#negotiation
|
||||
https://utrechtuniversity.youtrack.cloud/articles/N25B-A-14/RI-CB-Communication#negotiation
|
||||
:rtype: dict
|
||||
"""
|
||||
return {
|
||||
|
||||
@@ -1,43 +1,32 @@
|
||||
import zmq
|
||||
import threading
|
||||
import qi
|
||||
import logging
|
||||
|
||||
from robot_interface.endpoints.socket_base import SocketBase
|
||||
from robot_interface.state import state
|
||||
from robot_interface.core.config import settings
|
||||
|
||||
|
||||
class VideoSender(SocketBase):
|
||||
"""
|
||||
Video sender endpoint, responsible for sending video frames.
|
||||
|
||||
:param zmq_context: The ZeroMQ context to use.
|
||||
:type zmq_context: zmq.Context
|
||||
|
||||
:param port: The port to use for sending video frames.
|
||||
:type port: int
|
||||
"""
|
||||
def __init__(self, zmq_context, port=settings.agent_settings.video_sender_port):
|
||||
def __init__(self, zmq_context, port=5556):
|
||||
super(VideoSender, self).__init__("video")
|
||||
self.create_socket(zmq_context, zmq.PUB, port, [(zmq.CONFLATE,1)])
|
||||
|
||||
def start_video_rcv(self):
|
||||
"""
|
||||
Prepares arguments for retrieving video images from Pepper and starts video loop on a separate thread.
|
||||
|
||||
Will not start if no qi session is available.
|
||||
"""
|
||||
if not state.qi_session:
|
||||
logging.info("No Qi session available. Not starting video loop.")
|
||||
return
|
||||
|
||||
video = state.qi_session.service("ALVideoDevice")
|
||||
video_settings = settings.video_config
|
||||
camera_index = video_settings.camera_index
|
||||
kQVGA = video_settings.resolution
|
||||
kRGB = video_settings.color_space
|
||||
FPS = video_settings.fps
|
||||
video_name = video_settings.stream_name
|
||||
vid_stream_name = video.subscribeCamera(video_name, camera_index, kQVGA, kRGB, FPS)
|
||||
|
||||
camera_index = 0
|
||||
kQVGA = 2
|
||||
kRGB = 11
|
||||
FPS = 15
|
||||
vid_stream_name = video.subscribeCamera("Pepper Video", camera_index, kQVGA, kRGB, FPS)
|
||||
thread = threading.Thread(target=self.video_rcv_loop, args=(video, vid_stream_name))
|
||||
thread.start()
|
||||
|
||||
@@ -49,12 +38,12 @@ class VideoSender(SocketBase):
|
||||
:type vid_service: Object (Qi service object)
|
||||
|
||||
:param vid_stream_name: The name of a camera subscription on the video service object vid_service
|
||||
:type vid_stream_name: str
|
||||
:type vid_stream_name: String
|
||||
"""
|
||||
while not state.exit_event.is_set():
|
||||
try:
|
||||
img = vid_service.getImageRemote(vid_stream_name)
|
||||
#Possibly limit images sent if queuing issues arise
|
||||
self.socket.send(img[settings.video_config.image_buffer])
|
||||
self.socket.send(img[6])
|
||||
except:
|
||||
logging.warn("Failed to retrieve video image from robot.")
|
||||
|
||||
@@ -10,7 +10,6 @@ from robot_interface.endpoints.actuation_receiver import ActuationReceiver
|
||||
from robot_interface.endpoints.main_receiver import MainReceiver
|
||||
from robot_interface.endpoints.video_sender import VideoSender
|
||||
from robot_interface.state import state
|
||||
from robot_interface.core.config import settings
|
||||
from robot_interface.utils.timeblock import TimeBlock
|
||||
|
||||
|
||||
@@ -44,9 +43,20 @@ def main_loop(context):
|
||||
|
||||
logging.debug("Starting main loop.")
|
||||
|
||||
import schedule
|
||||
test_speaking_message = {"data": "Hi, my name is Pepper, and this is quite a long message."}
|
||||
def test_speak():
|
||||
logging.debug("Testing speech.")
|
||||
actuation_receiver._handle_speech(test_speaking_message)
|
||||
|
||||
schedule.every(10).seconds.do(test_speak)
|
||||
|
||||
while True:
|
||||
if state.exit_event.is_set(): break
|
||||
socks = dict(poller.poll(settings.main_config.poll_timeout_ms))
|
||||
|
||||
schedule.run_pending()
|
||||
|
||||
socks = dict(poller.poll(100))
|
||||
|
||||
for receiver in receivers:
|
||||
if receiver.socket not in socks: continue
|
||||
@@ -57,17 +67,10 @@ def main_loop(context):
|
||||
continue
|
||||
|
||||
def overtime_callback(time_ms):
|
||||
"""
|
||||
A callback function executed by TimeBlock if the message handling
|
||||
exceeds the allowed time limit.
|
||||
|
||||
:param time_ms: The elapsed time, in milliseconds, that the block took.
|
||||
:type time_ms: float
|
||||
"""
|
||||
logging.warn("Endpoint \"%s\" took too long (%.2f ms) on the main thread.",
|
||||
message["endpoint"], time_ms)
|
||||
|
||||
with TimeBlock(overtime_callback, settings.main_config.max_handler_time_ms):
|
||||
with TimeBlock(overtime_callback, 50):
|
||||
response = receiver.handle_message(message)
|
||||
|
||||
if receiver.socket.getsockopt(zmq.TYPE) == zmq.REP:
|
||||
@@ -75,12 +78,6 @@ def main_loop(context):
|
||||
|
||||
|
||||
def main():
|
||||
"""
|
||||
Initializes the ZeroMQ context and the application state.
|
||||
It executes the main event loop (`main_loop`) and ensures that both the
|
||||
application state and the ZeroMQ context are properly cleaned up (deinitialized/terminated)
|
||||
upon exit, including handling a KeyboardInterrupt.
|
||||
"""
|
||||
context = zmq.Context()
|
||||
|
||||
state.initialize()
|
||||
|
||||
@@ -12,31 +12,15 @@ class State(object):
|
||||
|
||||
This class is used to share state between threads. For example, when the program is quit, that all threads can
|
||||
detect this via the `exit_event` property being set.
|
||||
|
||||
:ivar is_initialized: Flag indicating whether the state setup (exit handlers, QI session) has completed.
|
||||
:vartype is_initialized: bool
|
||||
|
||||
:ivar exit_event: A thread event used to signal all threads that the program is shutting down.
|
||||
:vartype exit_event: threading.Event | None
|
||||
|
||||
:ivar sockets: A list of ZeroMQ socket wrappers (`SocketBase`) that need to be closed during deinitialization.
|
||||
:vartype sockets: List[SocketBase]
|
||||
|
||||
:ivar qi_session: The QI session object used for interaction with the robot/platform services.
|
||||
:vartype qi_session: None | qi.Session
|
||||
"""
|
||||
def __init__(self):
|
||||
self.is_initialized = False
|
||||
self.exit_event = None
|
||||
self.sockets = []
|
||||
self.qi_session = None
|
||||
self.sockets = [] # type: List[SocketBase]
|
||||
self.qi_session = None # type: None | ssl.SSLSession
|
||||
self.is_speaking = False # type: Boolean
|
||||
|
||||
def initialize(self):
|
||||
"""
|
||||
Sets up the application state. Creates the thread exit event, registers
|
||||
signal handlers (`SIGINT`, `SIGTERM`) for graceful shutdown, and
|
||||
establishes the QI session.
|
||||
"""
|
||||
if self.is_initialized:
|
||||
logging.warn("Already initialized")
|
||||
return
|
||||
@@ -53,9 +37,6 @@ class State(object):
|
||||
self.is_initialized = True
|
||||
|
||||
def deinitialize(self):
|
||||
"""
|
||||
Closes all sockets stored in the `sockets` list.
|
||||
"""
|
||||
if not self.is_initialized: return
|
||||
|
||||
for socket in self.sockets:
|
||||
@@ -64,24 +45,8 @@ class State(object):
|
||||
self.is_initialized = False
|
||||
|
||||
def __getattribute__(self, name):
|
||||
"""
|
||||
Custom attribute access method that enforces a check: the state must be
|
||||
fully initialized before any non-setup attributes (like `sockets` or `qi_session`)
|
||||
can be accessed.
|
||||
|
||||
:param name: The name of the attribute being accessed.
|
||||
:type name: str
|
||||
|
||||
:return: The value of the requested attribute.
|
||||
:rtype: Any
|
||||
"""
|
||||
if name in (
|
||||
"initialize",
|
||||
"deinitialize",
|
||||
"is_initialized",
|
||||
"__dict__",
|
||||
"__class__",
|
||||
"__doc__"):
|
||||
# Enforce that the state is initialized before accessing any property (aside from the basic ones)
|
||||
if name in ("initialize", "deinitialize", "is_initialized", "__dict__", "__class__"):
|
||||
return object.__getattribute__(self, name)
|
||||
|
||||
if not object.__getattribute__(self, "is_initialized"):
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from __future__ import unicode_literals # So that `print` can print Unicode characters in names
|
||||
import logging
|
||||
import sys
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -29,7 +28,7 @@ def choose_mic_interactive(audio):
|
||||
:type audio: pyaudio.PyAudio
|
||||
|
||||
:return: A dictionary from PyAudio containing information about the microphone to use, or None
|
||||
if there is no microphone.
|
||||
if there is no microphone.
|
||||
:rtype: dict | None
|
||||
"""
|
||||
microphones = list(get_microphones(audio))
|
||||
@@ -61,61 +60,10 @@ def choose_mic_default(audio):
|
||||
:type audio: pyaudio.PyAudio
|
||||
|
||||
:return: A dictionary from PyAudio containing information about the microphone to use, or None
|
||||
if there is no microphone.
|
||||
if there is no microphone.
|
||||
:rtype: dict | None
|
||||
"""
|
||||
try:
|
||||
return audio.get_default_input_device_info()
|
||||
except IOError:
|
||||
return None
|
||||
|
||||
|
||||
def choose_mic_arguments(audio):
|
||||
"""
|
||||
Get a microphone to use from command line arguments.
|
||||
|
||||
:param audio: An instance of PyAudio to use.
|
||||
:type audio: pyaudio.PyAudio
|
||||
|
||||
:return: A dictionary from PyAudio containing information about the microphone to use, or None
|
||||
if there is no microphone satisfied by the arguments.
|
||||
:rtype: dict | None
|
||||
"""
|
||||
microphone_name = None
|
||||
for i, arg in enumerate(sys.argv):
|
||||
if arg == "--microphone" and len(sys.argv) > i+1:
|
||||
microphone_name = sys.argv[i+1].strip()
|
||||
if arg.startswith("--microphone="):
|
||||
pre_fix_len = len("--microphone=")
|
||||
microphone_name = arg[pre_fix_len:].strip()
|
||||
|
||||
if not microphone_name: return None
|
||||
|
||||
available_mics = list(get_microphones(audio))
|
||||
for mic in available_mics:
|
||||
if mic["name"] == microphone_name:
|
||||
return mic
|
||||
|
||||
available_mic_names = [mic["name"] for mic in available_mics]
|
||||
logger.warning("Microphone \"{}\" not found. Choose one of {}"
|
||||
.format(microphone_name, available_mic_names))
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def choose_mic(audio):
|
||||
"""
|
||||
Get a microphone to use. Firstly, tries to see if there's an application argument specifying the
|
||||
microphone to use. If not, get the default microphone.
|
||||
|
||||
:param audio: An instance of PyAudio to use.
|
||||
:type audio: pyaudio.PyAudio
|
||||
|
||||
:return: A dictionary from PyAudio containing information about the microphone to use, or None
|
||||
if there is no microphone.
|
||||
:rtype: dict | None
|
||||
"""
|
||||
chosen_mic = choose_mic_arguments(audio)
|
||||
if chosen_mic: return chosen_mic
|
||||
|
||||
return choose_mic_default(audio)
|
||||
|
||||
@@ -8,12 +8,6 @@ except ImportError:
|
||||
|
||||
|
||||
def get_qi_session():
|
||||
"""
|
||||
Create and return a Qi session if available.
|
||||
|
||||
:return: The active Qi session or ``None`` if unavailable.
|
||||
:rtype: qi.Session | None
|
||||
"""
|
||||
if qi is None:
|
||||
logging.info("Unable to import qi. Running in stand-alone mode.")
|
||||
return None
|
||||
|
||||
@@ -5,54 +5,27 @@ class TimeBlock(object):
|
||||
"""
|
||||
A context manager that times the execution of the block it contains. If execution exceeds the
|
||||
limit, or if no limit is given, the callback will be called with the time that the block took.
|
||||
|
||||
:param callback: The callback function that is called when the block of code is over,
|
||||
unless the code block did not exceed the time limit.
|
||||
:type callback: Callable[[float], None]
|
||||
|
||||
:param limit_ms: The number of milliseconds the block of code is allowed to take. If it
|
||||
exceeds this time, or if it's None, the callback function will be called with the time the
|
||||
block took.
|
||||
:type limit_ms: int | None
|
||||
|
||||
:ivar limit_ms: The number of milliseconds the block of code is allowed to take.
|
||||
:vartype limit_ms: float | None
|
||||
|
||||
:ivar callback: The callback function that is called when the block of code is over.
|
||||
:vartype callback: Callable[[float], None]
|
||||
|
||||
ivar start: The start time of the block, set when entering the context.
|
||||
:vartype start: float | None
|
||||
"""
|
||||
def __init__(self, callback, limit_ms=None):
|
||||
"""
|
||||
:param callback: The callback function that is called when the block of code is over,
|
||||
unless the code block did not exceed the time limit.
|
||||
:type callback: Callable[[float], None]
|
||||
|
||||
:param limit_ms: The number of milliseconds the block of code is allowed to take. If it
|
||||
exceeds this time, or if it's None, the callback function will be called with the time the
|
||||
block took.
|
||||
:type limit_ms: int | None
|
||||
"""
|
||||
self.limit_ms = float(limit_ms) if limit_ms is not None else None
|
||||
self.callback = callback
|
||||
self.start = None
|
||||
|
||||
def __enter__(self):
|
||||
"""
|
||||
Enter the context manager and record the start time.
|
||||
|
||||
:return: Returns itself so timing information can be accessed if needed.
|
||||
:rtype: TimeBlock
|
||||
"""
|
||||
self.start = time.time()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
"""
|
||||
Exit the context manager, calculate the elapsed time, and call the callback
|
||||
if the time limit was exceeded or not provided.
|
||||
|
||||
:param exc_type: The exception type, or None if no exception occurred.
|
||||
:type exc_type: Type[BaseException] | None
|
||||
|
||||
:param exc_value: The exception instance, or None if no exception occurred.
|
||||
:type exc_value: BaseException | None
|
||||
|
||||
:param traceback: The traceback object, or None if no exception occurred.
|
||||
:type traceback: TracebackType | None
|
||||
"""
|
||||
elapsed = (time.time() - self.start) * 1000.0 # ms
|
||||
if self.limit_ms is None or elapsed > self.limit_ms:
|
||||
self.callback(elapsed)
|
||||
|
||||
@@ -1,15 +1,8 @@
|
||||
from __future__ import unicode_literals # So that we can format strings with Unicode characters
|
||||
import random
|
||||
import sys
|
||||
from StringIO import StringIO
|
||||
|
||||
from robot_interface.utils.microphone import (
|
||||
choose_mic_default,
|
||||
choose_mic_interactive,
|
||||
choose_mic_arguments,
|
||||
choose_mic,
|
||||
get_microphones,
|
||||
)
|
||||
from robot_interface.utils.microphone import choose_mic_default, choose_mic_interactive, get_microphones
|
||||
|
||||
|
||||
class MicrophoneUtils(object):
|
||||
@@ -17,12 +10,10 @@ class MicrophoneUtils(object):
|
||||
|
||||
def test_choose_mic_default(self, pyaudio_instance):
|
||||
"""
|
||||
Tests that the default microphone selection function returns a valid
|
||||
microphone dictionary containing all necessary keys with correct types and values.
|
||||
|
||||
The result must contain at least "index", as this is used to identify the microphone,
|
||||
and "name" for logging. It must have one or more channels (`maxInputChannels`),
|
||||
and a default sample rate of at least 16000 Hz.
|
||||
The result must contain at least "index", as this is used to identify the microphone.
|
||||
The "name" is used for logging, so it should also exist.
|
||||
It must have one or more channels.
|
||||
Lastly it must be capable of sending at least 16000 samples per second.
|
||||
"""
|
||||
result = choose_mic_default(pyaudio_instance)
|
||||
assert "index" in result
|
||||
@@ -41,13 +32,8 @@ class MicrophoneUtils(object):
|
||||
|
||||
def test_choose_mic_interactive_input_not_int(self, pyaudio_instance, mocker):
|
||||
"""
|
||||
Tests the robustness of the interactive selection when the user first enters
|
||||
a non-integer value, ensuring the system prompts again without error and accepts
|
||||
a valid integer on the second attempt.
|
||||
First mock an input that's not an integer, then a valid integer. There should be no errors.
|
||||
"""
|
||||
microphones = get_microphones(pyaudio_instance)
|
||||
target_microphone = next(microphones)
|
||||
|
||||
mock_input = mocker.patch("__builtin__.raw_input", side_effect=["not an integer", "0"])
|
||||
fake_out = StringIO()
|
||||
mocker.patch.object(sys, "stdout", fake_out)
|
||||
@@ -55,7 +41,7 @@ class MicrophoneUtils(object):
|
||||
result = choose_mic_interactive(pyaudio_instance)
|
||||
assert "index" in result
|
||||
assert isinstance(result["index"], (int, long))
|
||||
assert result["index"] == target_microphone["index"]
|
||||
assert result["index"] == 0
|
||||
|
||||
assert mock_input.called
|
||||
|
||||
@@ -63,12 +49,8 @@ class MicrophoneUtils(object):
|
||||
|
||||
def test_choose_mic_interactive_negative_index(self, pyaudio_instance, mocker):
|
||||
"""
|
||||
Tests that the interactive selection method prevents the user from entering
|
||||
a negative integer as a microphone index.
|
||||
Make sure that the interactive method does not allow negative integers as input.
|
||||
"""
|
||||
microphones = get_microphones(pyaudio_instance)
|
||||
target_microphone = next(microphones)
|
||||
|
||||
mock_input = mocker.patch("__builtin__.raw_input", side_effect=["-1", "0"])
|
||||
fake_out = StringIO()
|
||||
mocker.patch.object(sys, "stdout", fake_out)
|
||||
@@ -76,7 +58,7 @@ class MicrophoneUtils(object):
|
||||
result = choose_mic_interactive(pyaudio_instance)
|
||||
assert "index" in result
|
||||
assert isinstance(result["index"], (int, long))
|
||||
assert result["index"] == target_microphone["index"]
|
||||
assert result["index"] == 0
|
||||
|
||||
assert mock_input.called
|
||||
|
||||
@@ -84,8 +66,7 @@ class MicrophoneUtils(object):
|
||||
|
||||
def test_choose_mic_interactive_index_too_high(self, pyaudio_instance, mocker):
|
||||
"""
|
||||
Tests that the interactive selection method prevents the user from entering
|
||||
an index that exceeds the total number of available microphones.
|
||||
Make sure that the interactive method does not allow indices higher than the highest mic index.
|
||||
"""
|
||||
real_count = len(list(get_microphones(pyaudio_instance)))
|
||||
mock_input = mocker.patch("__builtin__.raw_input", side_effect=[str(real_count), "0"])
|
||||
@@ -102,9 +83,7 @@ class MicrophoneUtils(object):
|
||||
|
||||
def test_choose_mic_interactive_random_index(self, pyaudio_instance, mocker):
|
||||
"""
|
||||
Tests the core interactive functionality by simulating the selection of a
|
||||
random valid microphone index and verifying that the correct microphone
|
||||
information is returned.
|
||||
Get a random index from the list of available mics, make sure it's correct.
|
||||
"""
|
||||
microphones = list(get_microphones(pyaudio_instance))
|
||||
random_index = random.randrange(len(microphones))
|
||||
@@ -114,77 +93,3 @@ class MicrophoneUtils(object):
|
||||
assert "index" in result
|
||||
assert isinstance(result["index"], (int, long))
|
||||
assert result["index"] == microphones[random_index]["index"]
|
||||
|
||||
def test_choose_mic_no_arguments(self, pyaudio_instance, mocker):
|
||||
"""
|
||||
Tests `choose_mic_arguments` when no command-line arguments are provided,
|
||||
"""
|
||||
mocker.patch.object(sys, "argv", [])
|
||||
|
||||
result = choose_mic_arguments(pyaudio_instance)
|
||||
|
||||
assert result is None
|
||||
|
||||
def test_choose_mic_arguments(self, pyaudio_instance, mocker):
|
||||
"""
|
||||
Tests `choose_mic_arguments` when the microphone name is passed as a separate
|
||||
argument.
|
||||
"""
|
||||
for mic in get_microphones(pyaudio_instance):
|
||||
mocker.patch.object(sys, "argv", ["--microphone", mic["name"]])
|
||||
|
||||
result = choose_mic_arguments(pyaudio_instance)
|
||||
|
||||
assert result is not None
|
||||
assert result == mic
|
||||
|
||||
def test_choose_mic_arguments_eq(self, pyaudio_instance, mocker):
|
||||
"""
|
||||
Tests `choose_mic_arguments` when the microphone name is passed using an
|
||||
equals sign (`--microphone=NAME`).
|
||||
"""
|
||||
for mic in get_microphones(pyaudio_instance):
|
||||
mocker.patch.object(sys, "argv", ["--microphone={}".format(mic["name"])])
|
||||
|
||||
result = choose_mic_arguments(pyaudio_instance)
|
||||
|
||||
assert result is not None
|
||||
assert result == mic
|
||||
|
||||
def test_choose_mic_arguments_not_exist(self, pyaudio_instance, mocker):
|
||||
"""
|
||||
Tests `choose_mic_arguments` when a non-existent microphone name is passed
|
||||
via command-line arguments, expecting the function to return None.
|
||||
"""
|
||||
mocker.patch.object(sys, "argv", ["--microphone", "Surely this microphone doesn't exist"])
|
||||
|
||||
result = choose_mic_arguments(pyaudio_instance)
|
||||
|
||||
assert result is None
|
||||
|
||||
def test_choose_mic_with_argument(self, pyaudio_instance, mocker):
|
||||
"""
|
||||
Tests `choose_mic` function when a valid microphone is
|
||||
specified via command-line arguments.
|
||||
"""
|
||||
mic = next(get_microphones(pyaudio_instance))
|
||||
mocker.patch.object(sys, "argv", ["--microphone", mic["name"]])
|
||||
|
||||
result = choose_mic(pyaudio_instance)
|
||||
|
||||
assert result is not None
|
||||
assert result == mic
|
||||
|
||||
def test_choose_mic_no_argument(self, pyaudio_instance, mocker):
|
||||
"""
|
||||
Tests `choose_mic` function when no command-line arguments
|
||||
are provided, verifying that the function falls back correctly to the
|
||||
system's default microphone selection.
|
||||
"""
|
||||
default_mic = choose_mic_default(pyaudio_instance)
|
||||
mocker.patch.object(sys, "argv", [])
|
||||
|
||||
result = choose_mic(pyaudio_instance)
|
||||
|
||||
assert result is not None
|
||||
assert result == default_mic
|
||||
|
||||
@@ -7,17 +7,6 @@ from common.microphone_utils import MicrophoneUtils
|
||||
|
||||
@pytest.fixture
|
||||
def pyaudio_instance():
|
||||
"""
|
||||
A pytest fixture that provides an initialized PyAudio instance for tests
|
||||
requiring microphone access.
|
||||
|
||||
It first initializes PyAudio. If a default input device (microphone) is not
|
||||
found, the test is skipped to avoid failures in environments
|
||||
without a mic.
|
||||
|
||||
:return: An initialized PyAudio instance.
|
||||
:rtype: pyaudio.PyAudio
|
||||
"""
|
||||
audio = pyaudio.PyAudio()
|
||||
try:
|
||||
audio.get_default_input_device_info()
|
||||
|
||||
@@ -9,21 +9,11 @@ from robot_interface.endpoints.actuation_receiver import ActuationReceiver
|
||||
|
||||
@pytest.fixture
|
||||
def zmq_context():
|
||||
"""
|
||||
A pytest fixture that creates and yields a ZMQ context.
|
||||
|
||||
:return: An initialized ZeroMQ context.
|
||||
:rtype: zmq.Context
|
||||
"""
|
||||
context = zmq.Context()
|
||||
yield context
|
||||
|
||||
|
||||
def test_handle_unimplemented_endpoint(zmq_context):
|
||||
"""
|
||||
Tests that the ``ActuationReceiver.handle_message`` method can
|
||||
handle an unknown or unimplemented endpoint without raising an error.
|
||||
"""
|
||||
receiver = ActuationReceiver(zmq_context)
|
||||
# Should not error
|
||||
receiver.handle_message({
|
||||
@@ -33,10 +23,6 @@ def test_handle_unimplemented_endpoint(zmq_context):
|
||||
|
||||
|
||||
def test_speech_message_no_data(zmq_context, mocker):
|
||||
"""
|
||||
Tests that the message handler logs a warning when a speech actuation
|
||||
request (`actuate/speech`) is received but contains empty string data.
|
||||
"""
|
||||
mock_warn = mocker.patch("logging.warn")
|
||||
|
||||
receiver = ActuationReceiver(zmq_context)
|
||||
@@ -46,10 +32,6 @@ def test_speech_message_no_data(zmq_context, mocker):
|
||||
|
||||
|
||||
def test_speech_message_invalid_data(zmq_context, mocker):
|
||||
"""
|
||||
Tests that the message handler logs a warning when a speech actuation
|
||||
request (`actuate/speech`) is received with data that is not a string (e.g., a boolean).
|
||||
"""
|
||||
mock_warn = mocker.patch("logging.warn")
|
||||
|
||||
receiver = ActuationReceiver(zmq_context)
|
||||
@@ -59,10 +41,6 @@ def test_speech_message_invalid_data(zmq_context, mocker):
|
||||
|
||||
|
||||
def test_speech_no_qi(zmq_context, mocker):
|
||||
"""
|
||||
Tests the actuation receiver's behavior when processing a speech request
|
||||
but the global state does not have an active QI session.
|
||||
"""
|
||||
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
|
||||
|
||||
mock_qi_session = mock.PropertyMock(return_value=None)
|
||||
@@ -75,10 +53,6 @@ def test_speech_no_qi(zmq_context, mocker):
|
||||
|
||||
|
||||
def test_speech(zmq_context, mocker):
|
||||
"""
|
||||
Tests the core speech actuation functionality by mocking the QI TextToSpeech
|
||||
service and verifying that it is called correctly.
|
||||
"""
|
||||
mock_state = mocker.patch("robot_interface.endpoints.actuation_receiver.state")
|
||||
|
||||
mock_qi = mock.Mock()
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# coding=utf-8
|
||||
import os
|
||||
import time
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
@@ -10,22 +11,13 @@ from robot_interface.endpoints.audio_sender import AudioSender
|
||||
|
||||
@pytest.fixture
|
||||
def zmq_context():
|
||||
"""
|
||||
A pytest fixture that creates and yields a ZMQ context.
|
||||
|
||||
:return: An initialized ZeroMQ context.
|
||||
:rtype: zmq.Context
|
||||
"""
|
||||
context = zmq.Context()
|
||||
yield context
|
||||
|
||||
|
||||
def test_no_microphone(zmq_context, mocker):
|
||||
"""
|
||||
Tests the scenario where no valid microphone can be chosen for recording.
|
||||
"""
|
||||
mock_info_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger.info")
|
||||
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
|
||||
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default")
|
||||
mock_choose_mic.return_value = None
|
||||
|
||||
sender = AudioSender(zmq_context)
|
||||
@@ -39,12 +31,8 @@ def test_no_microphone(zmq_context, mocker):
|
||||
|
||||
|
||||
def test_unicode_mic_name(zmq_context, mocker):
|
||||
"""
|
||||
Tests the robustness of the `AudioSender` when handling microphone names
|
||||
that contain Unicode characters.
|
||||
"""
|
||||
mocker.patch("robot_interface.endpoints.audio_sender.threading")
|
||||
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
|
||||
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default")
|
||||
mock_choose_mic.return_value = {"name": u"• Some Unicode name"}
|
||||
|
||||
sender = AudioSender(zmq_context)
|
||||
@@ -59,17 +47,11 @@ def test_unicode_mic_name(zmq_context, mocker):
|
||||
|
||||
|
||||
def _fake_read(num_frames):
|
||||
"""
|
||||
Helper function to simulate reading raw audio data from a microphone stream.
|
||||
"""
|
||||
return os.urandom(num_frames * 4)
|
||||
|
||||
|
||||
def test_sending_audio(mocker):
|
||||
"""
|
||||
Tests the successful sending of audio data over a ZeroMQ socket.
|
||||
"""
|
||||
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
|
||||
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default")
|
||||
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
|
||||
|
||||
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
|
||||
@@ -94,17 +76,11 @@ def test_sending_audio(mocker):
|
||||
|
||||
|
||||
def _fake_read_error(num_frames):
|
||||
"""
|
||||
Helper function to simulate an I/O error during microphone stream reading.
|
||||
"""
|
||||
raise IOError()
|
||||
|
||||
|
||||
def test_break_microphone(mocker):
|
||||
"""
|
||||
Tests the error handling when the microphone stream breaks (raises an IOError).
|
||||
"""
|
||||
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
|
||||
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic_default")
|
||||
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
|
||||
|
||||
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
|
||||
@@ -126,22 +102,3 @@ def test_break_microphone(mocker):
|
||||
sender.wait_until_done()
|
||||
|
||||
send_socket.assert_not_called()
|
||||
|
||||
|
||||
def test_pyaudio_init_failure(mocker, zmq_context):
|
||||
"""
|
||||
Tests the behavior when PyAudio initialization fails (raises an IOError).
|
||||
"""
|
||||
# Prevent binding the ZMQ socket
|
||||
mocker.patch("robot_interface.endpoints.audio_sender.AudioSender.create_socket")
|
||||
|
||||
# Simulate PyAudio() failing
|
||||
mocker.patch(
|
||||
"robot_interface.endpoints.audio_sender.pyaudio.PyAudio",
|
||||
side_effect=IOError("boom")
|
||||
)
|
||||
|
||||
sender = AudioSender(zmq_context)
|
||||
|
||||
assert sender.audio is None
|
||||
assert sender.microphone is None
|
||||
|
||||
@@ -1,222 +0,0 @@
|
||||
import pytest
|
||||
import threading
|
||||
import zmq
|
||||
|
||||
import robot_interface.main as main_mod
|
||||
from robot_interface.state import state
|
||||
|
||||
|
||||
class FakeSocket:
|
||||
"""Mock ZMQ socket for testing."""
|
||||
def __init__(self, socket_type, messages=None):
|
||||
self.socket_type = socket_type
|
||||
self.messages = messages or []
|
||||
self.sent = []
|
||||
self.closed = False
|
||||
|
||||
def recv_json(self):
|
||||
if not self.messages:
|
||||
raise RuntimeError("No more messages")
|
||||
return self.messages.pop(0)
|
||||
|
||||
def send_json(self, msg):
|
||||
self.sent.append(msg)
|
||||
|
||||
def getsockopt(self, opt):
|
||||
if opt == zmq.TYPE:
|
||||
return self.socket_type
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
|
||||
|
||||
class FakeReceiver:
|
||||
"""Base class for main/actuation receivers."""
|
||||
def __init__(self, socket):
|
||||
self.socket = socket
|
||||
self._called = []
|
||||
|
||||
def handle_message(self, msg):
|
||||
self._called.append(msg)
|
||||
return {"endpoint": "pong", "data": "ok"}
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
class DummySender:
|
||||
"""Mock sender to test start methods."""
|
||||
def __init__(self):
|
||||
self.called = False
|
||||
|
||||
def start_video_rcv(self):
|
||||
self.called = True
|
||||
|
||||
def start(self):
|
||||
self.called = True
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_sockets():
|
||||
"""Create default fake main and actuation sockets."""
|
||||
main_sock = FakeSocket(zmq.REP)
|
||||
act_sock = FakeSocket(zmq.SUB)
|
||||
return main_sock, act_sock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_poll(monkeypatch):
|
||||
"""Patch zmq.Poller to simulate a single polling cycle based on socket messages."""
|
||||
class FakePoller:
|
||||
def __init__(self):
|
||||
self.registered = {}
|
||||
self.used = False
|
||||
|
||||
def register(self, socket, flags):
|
||||
self.registered[socket] = flags
|
||||
|
||||
def poll(self, timeout):
|
||||
# Only return sockets that still have messages
|
||||
active_socks = {
|
||||
s: flags
|
||||
for s, flags
|
||||
in self.registered.items()
|
||||
if getattr(s, "messages", [])
|
||||
}
|
||||
if active_socks:
|
||||
return active_socks
|
||||
# No more messages, exit loop
|
||||
state.exit_event.set()
|
||||
return {}
|
||||
|
||||
poller_instance = FakePoller()
|
||||
monkeypatch.setattr(main_mod.zmq, "Poller", lambda: poller_instance)
|
||||
return poller_instance
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_main_components(monkeypatch, fake_sockets, fake_poll):
|
||||
"""
|
||||
Fixture to patch main receivers and senders with fakes.
|
||||
Returns the fake instances for inspection in tests.
|
||||
"""
|
||||
main_sock, act_sock = fake_sockets
|
||||
fake_main = FakeReceiver(main_sock)
|
||||
fake_act = FakeReceiver(act_sock)
|
||||
video_sender = DummySender()
|
||||
audio_sender = DummySender()
|
||||
|
||||
monkeypatch.setattr(main_mod, "MainReceiver", lambda ctx: fake_main)
|
||||
monkeypatch.setattr(main_mod, "ActuationReceiver", lambda ctx: fake_act)
|
||||
monkeypatch.setattr(main_mod, "VideoSender", lambda ctx: video_sender)
|
||||
monkeypatch.setattr(main_mod, "AudioSender", lambda ctx: audio_sender)
|
||||
|
||||
# Register sockets for the fake poller
|
||||
fake_poll.registered = {main_sock: zmq.POLLIN, act_sock: zmq.POLLIN}
|
||||
|
||||
return fake_main, fake_act, video_sender, audio_sender
|
||||
|
||||
|
||||
def test_main_loop_rep_response(patched_main_components):
|
||||
"""REP socket returns proper response and handlers are called."""
|
||||
state.initialize()
|
||||
fake_main, fake_act, video_sender, audio_sender = patched_main_components
|
||||
|
||||
fake_main.socket.messages = [{"endpoint": "ping", "data": "x"}]
|
||||
fake_act.socket.messages = [{"endpoint": "actuate/speech", "data": "hello"}]
|
||||
|
||||
main_mod.main_loop(object())
|
||||
|
||||
assert fake_main.socket.sent == [{"endpoint": "pong", "data": "ok"}]
|
||||
assert fake_main._called
|
||||
assert fake_act._called
|
||||
assert video_sender.called
|
||||
assert audio_sender.called
|
||||
state.deinitialize()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"messages",
|
||||
[
|
||||
[{"no_endpoint": True}], # Invalid dict
|
||||
[["not", "a", "dict"]] # Non-dict message
|
||||
]
|
||||
)
|
||||
def test_main_loop_invalid_or_non_dict_message(patched_main_components, messages):
|
||||
"""Invalid or non-dict messages are ignored."""
|
||||
state.initialize()
|
||||
fake_main, _, _, _ = patched_main_components
|
||||
|
||||
fake_main.socket.messages = messages
|
||||
main_mod.main_loop(object())
|
||||
assert fake_main.socket.sent == []
|
||||
state.deinitialize()
|
||||
|
||||
|
||||
def test_main_loop_handler_returns_none(patched_main_components, monkeypatch):
|
||||
"""Handler returning None still triggers send_json(None)."""
|
||||
state.initialize()
|
||||
fake_main, _, _, _ = patched_main_components
|
||||
|
||||
class NoneHandler(FakeReceiver):
|
||||
def handle_message(self, msg):
|
||||
self._called.append(msg)
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(main_mod, "MainReceiver", lambda ctx: NoneHandler(fake_main.socket))
|
||||
fake_main.socket.messages = [{"endpoint": "some", "data": None}]
|
||||
|
||||
main_mod.main_loop(object())
|
||||
assert fake_main.socket.sent == [None]
|
||||
state.deinitialize()
|
||||
|
||||
|
||||
def test_main_loop_overtime_callback(patched_main_components, monkeypatch):
|
||||
"""TimeBlock callback is triggered if handler takes too long."""
|
||||
state.initialize()
|
||||
fake_main, _, _, _ = patched_main_components
|
||||
fake_main.socket.messages = [{"endpoint": "ping", "data": "x"}]
|
||||
|
||||
class FakeTimeBlock:
|
||||
def __init__(self, callback, limit_ms):
|
||||
self.callback = callback
|
||||
def __enter__(self):
|
||||
return self
|
||||
def __exit__(self, *a):
|
||||
self.callback(999.0)
|
||||
|
||||
monkeypatch.setattr(main_mod, "TimeBlock", FakeTimeBlock)
|
||||
main_mod.main_loop(object())
|
||||
assert fake_main.socket.sent == [{"endpoint": "pong", "data": "ok"}]
|
||||
state.deinitialize()
|
||||
|
||||
|
||||
def test_main_keyboard_interrupt(monkeypatch):
|
||||
"""main() handles KeyboardInterrupt and cleans up."""
|
||||
called = {"deinitialized": False, "term_called": False}
|
||||
|
||||
class FakeContext:
|
||||
def term(self): called["term_called"] = True
|
||||
|
||||
monkeypatch.setattr(main_mod.zmq, "Context", lambda: FakeContext())
|
||||
|
||||
def raise_keyboard_interrupt(*_):
|
||||
raise KeyboardInterrupt()
|
||||
monkeypatch.setattr(main_mod, "main_loop", raise_keyboard_interrupt)
|
||||
|
||||
def fake_initialize():
|
||||
state.is_initialized = True
|
||||
state.exit_event = threading.Event()
|
||||
def fake_deinitialize():
|
||||
called["deinitialized"] = True
|
||||
state.is_initialized = False
|
||||
|
||||
monkeypatch.setattr(main_mod.state, "initialize", fake_initialize)
|
||||
monkeypatch.setattr(main_mod.state, "deinitialize", fake_deinitialize)
|
||||
|
||||
main_mod.main()
|
||||
assert called["term_called"] is True
|
||||
assert called["deinitialized"] is True
|
||||
@@ -7,20 +7,11 @@ from robot_interface.endpoints.main_receiver import MainReceiver
|
||||
|
||||
@pytest.fixture
|
||||
def zmq_context():
|
||||
"""
|
||||
A pytest fixture that creates and yields a ZMQ context.
|
||||
|
||||
:return: An initialized ZeroMQ context.
|
||||
:rtype: zmq.Context
|
||||
"""
|
||||
context = zmq.Context()
|
||||
yield context
|
||||
|
||||
|
||||
def test_handle_ping(zmq_context):
|
||||
"""
|
||||
Tests the receiver's ability to handle the "ping" endpoint with data.
|
||||
"""
|
||||
receiver = MainReceiver(zmq_context)
|
||||
response = receiver.handle_message({"endpoint": "ping", "data": "pong"})
|
||||
|
||||
@@ -31,10 +22,6 @@ def test_handle_ping(zmq_context):
|
||||
|
||||
|
||||
def test_handle_ping_none(zmq_context):
|
||||
"""
|
||||
Tests the receiver's ability to handle the ping endpoint when the
|
||||
data field is explicitly set to None.
|
||||
"""
|
||||
receiver = MainReceiver(zmq_context)
|
||||
response = receiver.handle_message({"endpoint": "ping", "data": None})
|
||||
|
||||
@@ -46,9 +33,6 @@ def test_handle_ping_none(zmq_context):
|
||||
|
||||
@mock.patch("robot_interface.endpoints.main_receiver.state")
|
||||
def test_handle_negotiate_ports(mock_state, zmq_context):
|
||||
"""
|
||||
Tests the handling of the "negotiate/ports" endpoint.
|
||||
"""
|
||||
receiver = MainReceiver(zmq_context)
|
||||
mock_state.sockets = [receiver]
|
||||
|
||||
@@ -70,10 +54,6 @@ def test_handle_negotiate_ports(mock_state, zmq_context):
|
||||
|
||||
|
||||
def test_handle_unimplemented_endpoint(zmq_context):
|
||||
"""
|
||||
Tests that the receiver correctly handles a request to a completely
|
||||
unknown or non-existent endpoint.
|
||||
"""
|
||||
receiver = MainReceiver(zmq_context)
|
||||
response = receiver.handle_message({
|
||||
"endpoint": "some_endpoint_that_definitely_does_not_exist",
|
||||
@@ -87,13 +67,6 @@ def test_handle_unimplemented_endpoint(zmq_context):
|
||||
|
||||
|
||||
def test_handle_unimplemented_negotiation_endpoint(zmq_context):
|
||||
"""
|
||||
Tests handling a request to an unknown sub-endpoint within a known
|
||||
group
|
||||
|
||||
The expected behavior is to return a specific "negotiate/error" response
|
||||
with a descriptive error string.
|
||||
"""
|
||||
receiver = MainReceiver(zmq_context)
|
||||
response = receiver.handle_message({
|
||||
"endpoint": "negotiate/but_some_subpath_that_definitely_does_not_exist",
|
||||
|
||||
@@ -7,16 +7,6 @@ from robot_interface.utils.microphone import choose_mic_default, choose_mic_inte
|
||||
|
||||
|
||||
class MockPyAudio:
|
||||
"""
|
||||
A mock implementation of the PyAudio library class, designed for testing
|
||||
microphone utility functions without requiring actual audio hardware.
|
||||
|
||||
It provides fake devices, including one input microphone, and implements
|
||||
the core PyAudio methods required for device enumeration.
|
||||
|
||||
:ivar devices: A list of dictionaries representing mock audio devices.
|
||||
:vartype devices: List[Dict[str, Any]]
|
||||
"""
|
||||
def __init__(self):
|
||||
# You can predefine fake device info here
|
||||
self.devices = [
|
||||
@@ -47,36 +37,18 @@ class MockPyAudio:
|
||||
]
|
||||
|
||||
def get_device_count(self):
|
||||
"""
|
||||
Returns the number of available mock devices.
|
||||
|
||||
:return: The total number of devices in the mock list.
|
||||
:rtype: int
|
||||
"""
|
||||
"""Return the number of available mock devices."""
|
||||
return len(self.devices)
|
||||
|
||||
def get_device_info_by_index(self, index):
|
||||
"""
|
||||
Returns information for a given mock device index.
|
||||
|
||||
:param index: The index of the device to retrieve.
|
||||
:type index: int
|
||||
|
||||
:return: A dictionary containing device information.
|
||||
:rtype: Dict[str, Any]
|
||||
"""
|
||||
"""Return information for a given mock device index."""
|
||||
if 0 <= index < len(self.devices):
|
||||
return self.devices[index]
|
||||
else:
|
||||
raise IOError("Invalid device index: {}".format(index))
|
||||
|
||||
def get_default_input_device_info(self):
|
||||
"""
|
||||
Returns information for the default mock input device.
|
||||
|
||||
:return: A dictionary containing the default input device information.
|
||||
:rtype: Dict[str, Any]
|
||||
"""
|
||||
"""Return info for a default mock input device."""
|
||||
for device in self.devices:
|
||||
if device.get("maxInputChannels", 0) > 0:
|
||||
return device
|
||||
@@ -85,32 +57,16 @@ class MockPyAudio:
|
||||
|
||||
@pytest.fixture
|
||||
def pyaudio_instance():
|
||||
"""
|
||||
A pytest fixture that returns an instance of the `MockPyAudio` class.
|
||||
|
||||
:return: An initialized instance of the mock PyAudio class.
|
||||
:rtype: MockPyAudio
|
||||
"""
|
||||
return MockPyAudio()
|
||||
|
||||
|
||||
def _raise_io_error():
|
||||
"""
|
||||
Helper function used to mock PyAudio methods that are expected to fail
|
||||
when no device is available.
|
||||
"""
|
||||
raise IOError()
|
||||
|
||||
|
||||
class TestAudioUnit(MicrophoneUtils):
|
||||
"""
|
||||
Runs the shared microphone behavior tests defined in `MicrophoneUtils` using
|
||||
the mock PyAudio implementation.
|
||||
"""
|
||||
"""Run shared audio behavior tests with the mock implementation."""
|
||||
def test_choose_mic_default_no_mic(self):
|
||||
"""
|
||||
Tests `choose_mic_default` when no microphones are available.
|
||||
"""
|
||||
mock_pyaudio = mock.Mock()
|
||||
mock_pyaudio.get_device_count = mock.Mock(return_value=0L)
|
||||
mock_pyaudio.get_default_input_device_info = _raise_io_error
|
||||
@@ -120,9 +76,6 @@ class TestAudioUnit(MicrophoneUtils):
|
||||
assert result is None
|
||||
|
||||
def test_choose_mic_interactive_no_mic(self):
|
||||
"""
|
||||
Tests `choose_mic_interactive` when no microphones are available.
|
||||
"""
|
||||
mock_pyaudio = mock.Mock()
|
||||
mock_pyaudio.get_device_count = mock.Mock(return_value=0L)
|
||||
mock_pyaudio.get_default_input_device_info = _raise_io_error
|
||||
|
||||
@@ -1,90 +0,0 @@
|
||||
import sys
|
||||
|
||||
# Import module under test
|
||||
import robot_interface.utils.qi_utils as qi_utils
|
||||
|
||||
|
||||
def reload_qi_utils_with(qi_module):
|
||||
"""
|
||||
Helper: reload qi_utils after injecting a fake qi module.
|
||||
Python 2 uses built-in reload().
|
||||
Just changing sys.modules[qi] won't affect the already imported module.
|
||||
"""
|
||||
if qi_module is None:
|
||||
if "qi" in sys.modules:
|
||||
del sys.modules["qi"]
|
||||
else:
|
||||
sys.modules["qi"] = qi_module
|
||||
|
||||
# Python 2 reload
|
||||
global qi_utils
|
||||
qi_utils = reload(qi_utils)
|
||||
|
||||
|
||||
def test_get_qi_session_no_qi_module():
|
||||
"""
|
||||
Tests the 'qi is None' path.
|
||||
"""
|
||||
reload_qi_utils_with(None)
|
||||
|
||||
session = qi_utils.get_qi_session()
|
||||
assert session is None
|
||||
|
||||
|
||||
def test_get_qi_session_no_qi_url_argument(monkeypatch):
|
||||
"""
|
||||
Tests the '--qi-url not in sys.argv' path.
|
||||
"""
|
||||
class FakeQi:
|
||||
pass
|
||||
|
||||
reload_qi_utils_with(FakeQi())
|
||||
|
||||
monkeypatch.setattr(sys, "argv", ["pytest"])
|
||||
|
||||
session = qi_utils.get_qi_session()
|
||||
assert session is None
|
||||
|
||||
|
||||
def test_get_qi_session_runtime_error(monkeypatch):
|
||||
"""
|
||||
Tests the 'exept RuntineError' path.
|
||||
"""
|
||||
class FakeApp:
|
||||
def start(self):
|
||||
raise RuntimeError("boom")
|
||||
|
||||
class FakeQi:
|
||||
Application = lambda self=None: FakeApp()
|
||||
|
||||
reload_qi_utils_with(FakeQi())
|
||||
|
||||
monkeypatch.setattr(sys, "argv", ["pytest", "--qi-url", "tcp://localhost"])
|
||||
|
||||
session = qi_utils.get_qi_session()
|
||||
assert session is None
|
||||
|
||||
|
||||
def test_get_qi_session_success(monkeypatch):
|
||||
"""
|
||||
Tests a valid path.
|
||||
"""
|
||||
class FakeSession:
|
||||
pass
|
||||
|
||||
class FakeApp:
|
||||
def __init__(self):
|
||||
self.session = FakeSession()
|
||||
|
||||
def start(self):
|
||||
return True
|
||||
|
||||
class FakeQi:
|
||||
Application = lambda self=None: FakeApp()
|
||||
|
||||
reload_qi_utils_with(FakeQi())
|
||||
|
||||
monkeypatch.setattr(sys, "argv", ["pytest", "--qi-url", "tcp://localhost"])
|
||||
|
||||
session = qi_utils.get_qi_session()
|
||||
assert isinstance(session, FakeSession)
|
||||
@@ -1,19 +0,0 @@
|
||||
import pytest
|
||||
from robot_interface.endpoints.receiver_base import ReceiverBase
|
||||
|
||||
|
||||
def test_receiver_base_not_implemented(monkeypatch):
|
||||
"""
|
||||
Ensure that the base ReceiverBase raises NotImplementedError when
|
||||
handle_message is called on a subclass that does not implement it.
|
||||
"""
|
||||
# Patch the __abstractmethods__ to allow instantiation
|
||||
monkeypatch.setattr(ReceiverBase, "__abstractmethods__", frozenset())
|
||||
|
||||
class DummyReceiver(ReceiverBase):
|
||||
pass
|
||||
|
||||
dummy = DummyReceiver("dummy") # Can now instantiate
|
||||
|
||||
with pytest.raises(NotImplementedError):
|
||||
dummy.handle_message({"endpoint": "dummy", "data": None})
|
||||
@@ -1,55 +0,0 @@
|
||||
import mock
|
||||
import zmq
|
||||
from robot_interface.endpoints.socket_base import SocketBase
|
||||
|
||||
|
||||
def test_close_covers_both_branches():
|
||||
"""
|
||||
Exercise both possible paths inside SocketBase.close():
|
||||
- when no socket exists (should just return),
|
||||
- when a socket object is present (its close() method should be called).
|
||||
"""
|
||||
sb = SocketBase("x")
|
||||
|
||||
# First check the case where socket is None. Nothing should happen here.
|
||||
sb.close()
|
||||
|
||||
# Now simulate a real socket so the close() call is triggered.
|
||||
fake_socket = mock.Mock()
|
||||
sb.socket = fake_socket
|
||||
sb.close()
|
||||
|
||||
fake_socket.close.assert_called_once()
|
||||
|
||||
|
||||
def test_create_socket_and_endpoint_description_full_coverage():
|
||||
"""
|
||||
Test the less-commonly used branch of create_socket() where bind=False.
|
||||
This covers:
|
||||
- the loop that sets socket options,
|
||||
- the connect() path,
|
||||
- the logic in endpoint_description() that inverts self.bound.
|
||||
"""
|
||||
fake_context = mock.Mock()
|
||||
fake_socket = mock.Mock()
|
||||
|
||||
# The context should hand back our fake socket object.
|
||||
fake_context.socket.return_value = fake_socket
|
||||
|
||||
sb = SocketBase("id")
|
||||
|
||||
# Calling create_socket with bind=False forces the connect() code path.
|
||||
sb.create_socket(
|
||||
zmq_context=fake_context,
|
||||
socket_type=zmq.SUB,
|
||||
port=9999,
|
||||
options=[(zmq.CONFLATE, 1)], # one option is enough to hit the loop
|
||||
bind=False,
|
||||
)
|
||||
|
||||
fake_socket.setsockopt.assert_called_once_with(zmq.CONFLATE, 1)
|
||||
fake_socket.connect.assert_called_once_with("tcp://localhost:9999")
|
||||
|
||||
# Check that endpoint_description reflects bound=False -> "bind": True
|
||||
desc = sb.endpoint_description()
|
||||
assert desc == {"id": "id", "port": 9999, "bind": True}
|
||||
@@ -1,108 +0,0 @@
|
||||
import threading
|
||||
import signal
|
||||
import pytest
|
||||
import mock
|
||||
|
||||
from robot_interface.state import State
|
||||
|
||||
|
||||
def test_initialize_does_not_reinitialize():
|
||||
"""
|
||||
Check that calling `initialize` on an already initialized state does not change existing
|
||||
attributes.
|
||||
"""
|
||||
state = State()
|
||||
|
||||
# Mock qi_session to avoid real session creation
|
||||
mock_session = mock.MagicMock()
|
||||
state.qi_session = mock_session
|
||||
|
||||
# Set state as already initialized
|
||||
state.is_initialized = True
|
||||
old_exit_event = state.exit_event
|
||||
|
||||
# Call initialize
|
||||
state.initialize()
|
||||
|
||||
# Ensure existing attributes were not overwritten
|
||||
assert state.exit_event == old_exit_event # exit_event should not be recreated
|
||||
assert state.qi_session == mock_session # qi_session should not be replaced
|
||||
assert state.is_initialized is True # is_initialized should remain True
|
||||
|
||||
|
||||
def test_deinitialize_behavior():
|
||||
"""Check that deinitialize closes sockets and updates the initialization state correctly."""
|
||||
state = State()
|
||||
|
||||
# Case 1: Initialized with sockets
|
||||
state.is_initialized = True
|
||||
mock_socket_1 = mock.Mock()
|
||||
mock_socket_2 = mock.Mock()
|
||||
state.sockets = [mock_socket_1, mock_socket_2]
|
||||
state.deinitialize()
|
||||
|
||||
# Sockets should be closed
|
||||
mock_socket_1.close.assert_called_once()
|
||||
mock_socket_2.close.assert_called_once()
|
||||
# State should be marked as not initialized
|
||||
assert not state.is_initialized
|
||||
|
||||
# Case 2: Not initialized, should not raise
|
||||
state.is_initialized = False
|
||||
state.sockets = []
|
||||
state.deinitialize()
|
||||
assert not state.is_initialized
|
||||
|
||||
|
||||
def test_access_control_before_initialization():
|
||||
"""Verify that accessing certain attributes before initialization raises RuntimeError."""
|
||||
state = State()
|
||||
|
||||
with pytest.raises(RuntimeError, match=".*sockets.*"):
|
||||
_ = state.sockets
|
||||
|
||||
with pytest.raises(RuntimeError, match=".*qi_session.*"):
|
||||
_ = state.qi_session
|
||||
|
||||
|
||||
def test_exit_event_before_initialized_returns_if_set():
|
||||
"""Check that exit_event can be accessed even if state is not initialized,
|
||||
but only if it is set."""
|
||||
state = State()
|
||||
|
||||
# Manually create and set the exit_event
|
||||
object.__setattr__(state, "exit_event", threading.Event())
|
||||
object.__getattribute__(state, "exit_event").set()
|
||||
|
||||
# Should return the event without raising
|
||||
assert state.exit_event.is_set()
|
||||
|
||||
|
||||
def test_getattribute_allowed_attributes_before_init():
|
||||
"""Ensure attributes allowed before initialization can be accessed without error."""
|
||||
state = State()
|
||||
|
||||
assert callable(state.initialize)
|
||||
assert callable(state.deinitialize)
|
||||
assert state.is_initialized is False
|
||||
assert state.__dict__ is not None
|
||||
assert state.__class__.__name__ == "State"
|
||||
assert state.__doc__ is not None
|
||||
|
||||
|
||||
def test_signal_handler_sets_exit_event(monkeypatch):
|
||||
"""Ensure SIGINT triggers the exit_event via signal handler."""
|
||||
state = State()
|
||||
|
||||
# Patch get_qi_session to prevent real session creation
|
||||
monkeypatch.setattr("robot_interface.state.get_qi_session", lambda: "dummy_session")
|
||||
|
||||
# Initialize state to set up signal handlers
|
||||
state.initialize()
|
||||
|
||||
# Simulate SIGINT
|
||||
signal_handler = signal.getsignal(signal.SIGINT)
|
||||
signal_handler(None, None)
|
||||
|
||||
# Exit event should be set
|
||||
assert state.exit_event.is_set()
|
||||
@@ -6,21 +6,11 @@ from robot_interface.utils.timeblock import TimeBlock
|
||||
|
||||
|
||||
class AnyFloat(object):
|
||||
"""
|
||||
A helper class used in tests to assert that a mock function was called
|
||||
with an argument that is specifically a float, regardless of its value.
|
||||
|
||||
It overrides the equality comparison (`__eq__`) to check only the type.
|
||||
"""
|
||||
def __eq__(self, other):
|
||||
return isinstance(other, float)
|
||||
|
||||
|
||||
def test_no_limit():
|
||||
"""
|
||||
Tests the scenario where the `TimeBlock` context manager is used without
|
||||
a time limit.
|
||||
"""
|
||||
callback = mock.Mock()
|
||||
|
||||
with TimeBlock(callback):
|
||||
@@ -30,10 +20,6 @@ def test_no_limit():
|
||||
|
||||
|
||||
def test_exceed_limit():
|
||||
"""
|
||||
Tests the scenario where the execution time within the `TimeBlock`
|
||||
exceeds the provided limit.
|
||||
"""
|
||||
callback = mock.Mock()
|
||||
|
||||
with TimeBlock(callback, 0):
|
||||
@@ -43,10 +29,6 @@ def test_exceed_limit():
|
||||
|
||||
|
||||
def test_within_limit():
|
||||
"""
|
||||
Tests the scenario where the execution time within the `TimeBlock`
|
||||
stays within the provided limit.
|
||||
"""
|
||||
callback = mock.Mock()
|
||||
|
||||
with TimeBlock(callback, 5):
|
||||
|
||||
@@ -1,99 +0,0 @@
|
||||
# coding=utf-8
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
import zmq
|
||||
|
||||
from robot_interface.endpoints.video_sender import VideoSender
|
||||
from robot_interface.state import state
|
||||
from robot_interface.core.config import settings
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def zmq_context():
|
||||
"""Provide a ZMQ context."""
|
||||
yield zmq.Context()
|
||||
|
||||
|
||||
def _patch_basics(mocker):
|
||||
"""Common patches: prevent real threads, port binds, and state errors."""
|
||||
mocker.patch("robot_interface.endpoints.socket_base.zmq.Socket.bind")
|
||||
mocker.patch("robot_interface.endpoints.video_sender.threading.Thread")
|
||||
mocker.patch.object(state, "is_initialized", True)
|
||||
|
||||
|
||||
def _patch_exit_event(mocker):
|
||||
"""Make exit_event stop the loop after one iteration."""
|
||||
fake_event = mock.Mock()
|
||||
fake_event.is_set.side_effect = [False, True]
|
||||
mocker.patch.object(state, "exit_event", fake_event)
|
||||
|
||||
|
||||
def test_no_qi_session(zmq_context, mocker):
|
||||
"""Video loop should not start without a qi_session."""
|
||||
_patch_basics(mocker)
|
||||
mocker.patch.object(state, "qi_session", None)
|
||||
|
||||
sender = VideoSender(zmq_context)
|
||||
sender.start_video_rcv()
|
||||
|
||||
assert not hasattr(sender, "thread")
|
||||
|
||||
|
||||
def test_video_streaming(zmq_context, mocker):
|
||||
"""VideoSender should send retrieved image data."""
|
||||
_patch_basics(mocker)
|
||||
_patch_exit_event(mocker)
|
||||
|
||||
# Pepper's image buffer lives at index 6
|
||||
mocker.patch.object(settings.video_config, "image_buffer", 6)
|
||||
|
||||
mock_video_service = mock.Mock()
|
||||
mock_video_service.getImageRemote.return_value = [None]*6 + ["fake_img"]
|
||||
|
||||
fake_session = mock.Mock()
|
||||
fake_session.service.return_value = mock_video_service
|
||||
mocker.patch.object(state, "qi_session", fake_session)
|
||||
|
||||
mocker.patch.object(
|
||||
fake_session.service("ALVideoDevice"),
|
||||
"subscribeCamera",
|
||||
return_value="stream_name"
|
||||
)
|
||||
|
||||
sender = VideoSender(zmq_context)
|
||||
send_socket = mock.Mock()
|
||||
sender.socket.send = send_socket
|
||||
|
||||
sender.start_video_rcv()
|
||||
sender.video_rcv_loop(mock_video_service, "stream_name")
|
||||
|
||||
send_socket.assert_called_with("fake_img")
|
||||
|
||||
|
||||
def test_video_receive_error(zmq_context, mocker):
|
||||
"""Errors retrieving images should not call send()."""
|
||||
_patch_basics(mocker)
|
||||
_patch_exit_event(mocker)
|
||||
|
||||
mock_video_service = mock.Mock()
|
||||
mock_video_service.getImageRemote.side_effect = Exception("boom")
|
||||
|
||||
fake_session = mock.Mock()
|
||||
fake_session.service.return_value = mock_video_service
|
||||
mocker.patch.object(state, "qi_session", fake_session)
|
||||
|
||||
mocker.patch.object(
|
||||
fake_session.service("ALVideoDevice"),
|
||||
"subscribeCamera",
|
||||
return_value="stream_name"
|
||||
)
|
||||
|
||||
sender = VideoSender(zmq_context)
|
||||
send_socket = mock.Mock()
|
||||
sender.socket.send = send_socket
|
||||
|
||||
sender.start_video_rcv()
|
||||
sender.video_rcv_loop(mock_video_service, "stream_name")
|
||||
|
||||
send_socket.assert_not_called()
|
||||
Reference in New Issue
Block a user