10 Commits

Author SHA1 Message Date
JobvAlewijk
e129113c9e chore: forgot copyright 2026-01-29 17:53:57 +01:00
JobvAlewijk
18a4bde4ca test: added tests and docstrings
ref: N25B-397
2026-01-29 17:51:39 +01:00
Twirre Meulenbelt
815fc7bcde feat: publish face detection instead of req/res
ref: N25B-395
2026-01-29 17:18:39 +01:00
JobvAlewijk
3bbe97579d Merge branch 'main' of ssh://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-ri into feat/face-detection 2026-01-29 16:22:42 +01:00
JobvAlewijk
4afceccf46 feat: fixed connection !!! 2026-01-19 16:58:01 +01:00
JobvAlewijk
83099a2810 chore: modified into req reply socket on 5559 2026-01-17 14:01:32 +01:00
JobvAlewijk
4e9afbaaf5 Merge branch 'dev' of ssh://git.science.uu.nl/ics/sp/2025/n25b/pepperplus-ri into feat/face-detection 2026-01-16 16:48:51 +01:00
JobvAlewijk
49386ef8cd feat: communicate face to CB
Had to do some weird socket stuff

ref: N25B-397
2026-01-12 14:25:10 +01:00
JobvAlewijk
3b470c8f29 feat: fully working face detection
ref: N25B-397
2026-01-07 17:56:21 +01:00
JobvAlewijk
b8f71f6bee feat: base face detection
ref: N25B-397
2026-01-04 18:56:04 +01:00
41 changed files with 388 additions and 624 deletions

View File

@@ -6,9 +6,6 @@
# The hostname or IP address of the Control Backend. # The hostname or IP address of the Control Backend.
AGENT__CONTROL_BACKEND_HOST=localhost AGENT__CONTROL_BACKEND_HOST=localhost
# Whether to use Pepper's microphone when Pepper is connected.
AUDIO__USE_PEPPER_MICROPHONE=true
# Variables that are unlikely to be configured, you can probably ignore these: # Variables that are unlikely to be configured, you can probably ignore these:

View File

@@ -7,5 +7,3 @@ sphinx
sphinx_rtd_theme sphinx_rtd_theme
pre-commit pre-commit
python-dotenv python-dotenv
numpy<=1.16.6
enum34

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals from __future__ import unicode_literals
from robot_interface.utils.get_config import get_config from robot_interface.utils.get_config import get_config
@@ -24,6 +17,10 @@ class AgentSettings(object):
:vartype video_sender_port: int :vartype video_sender_port: int
:ivar audio_sender_port: Port used for sending audio data, defaults to 5558. :ivar audio_sender_port: Port used for sending audio data, defaults to 5558.
:vartype audio_sender_port: int :vartype audio_sender_port: int
:ivar face_detection_port: Port used for sending face detection events, defaults to 5559.
:vartype face_detection_port: int
:ivar face_detection_interval: Time between face detection events, defaults to 1000 ms.
:vartype face_detection_interval: int
""" """
def __init__( def __init__(
self, self,
@@ -32,12 +29,16 @@ class AgentSettings(object):
main_receiver_port=None, main_receiver_port=None,
video_sender_port=None, video_sender_port=None,
audio_sender_port=None, audio_sender_port=None,
face_detection_port=None,
face_detection_interval=None,
): ):
self.control_backend_host = get_config(control_backend_host, "AGENT__CONTROL_BACKEND_HOST", "localhost") self.control_backend_host = get_config(control_backend_host, "AGENT__CONTROL_BACKEND_HOST", "localhost")
self.actuation_receiver_port = get_config(actuation_receiver_port, "AGENT__ACTUATION_RECEIVER_PORT", 5557, int) self.actuation_receiver_port = get_config(actuation_receiver_port, "AGENT__ACTUATION_RECEIVER_PORT", 5557, int)
self.main_receiver_port = get_config(main_receiver_port, "AGENT__MAIN_RECEIVER_PORT", 5555, int) self.main_receiver_port = get_config(main_receiver_port, "AGENT__MAIN_RECEIVER_PORT", 5555, int)
self.video_sender_port = get_config(video_sender_port, "AGENT__VIDEO_SENDER_PORT", 5556, int) self.video_sender_port = get_config(video_sender_port, "AGENT__VIDEO_SENDER_PORT", 5556, int)
self.audio_sender_port = get_config(audio_sender_port, "AGENT__AUDIO_SENDER_PORT", 5558, int) self.audio_sender_port = get_config(audio_sender_port, "AGENT__AUDIO_SENDER_PORT", 5558, int)
self.face_detection_port = get_config(face_detection_port, "AGENT__FACE_DETECTION_PORT", 5559, int)
self.face_detection_interval = get_config(face_detection_interval, "AGENT__FACE_DETECTION_INTERVAL", 1000, int)
class VideoConfig(object): class VideoConfig(object):
@@ -68,7 +69,7 @@ class VideoConfig(object):
): ):
self.camera_index = get_config(camera_index, "VIDEO__CAMERA_INDEX", 0, int) self.camera_index = get_config(camera_index, "VIDEO__CAMERA_INDEX", 0, int)
self.resolution = get_config(resolution, "VIDEO__RESOLUTION", 2, int) self.resolution = get_config(resolution, "VIDEO__RESOLUTION", 2, int)
self.color_space = get_config(color_space, "VIDEO__COLOR_SPACE", 13, int) self.color_space = get_config(color_space, "VIDEO__COLOR_SPACE", 11, int)
self.fps = get_config(fps, "VIDEO__FPS", 15, int) self.fps = get_config(fps, "VIDEO__FPS", 15, int)
self.stream_name = get_config(stream_name, "VIDEO__STREAM_NAME", "Pepper Video") self.stream_name = get_config(stream_name, "VIDEO__STREAM_NAME", "Pepper Video")
self.image_buffer = get_config(image_buffer, "VIDEO__IMAGE_BUFFER", 6, int) self.image_buffer = get_config(image_buffer, "VIDEO__IMAGE_BUFFER", 6, int)
@@ -78,8 +79,6 @@ class AudioConfig(object):
""" """
Audio configuration constants. Audio configuration constants.
:ivar use_pepper_microphone: Whether to use Pepper's microphone or not, defaults to True.
:vartype use_pepper_microphone: bool
:ivar sample_rate: Audio sampling rate in Hz, defaults to 16000. :ivar sample_rate: Audio sampling rate in Hz, defaults to 16000.
:vartype sample_rate: int :vartype sample_rate: int
:ivar chunk_size: Size of audio chunks to capture/process, defaults to 512. :ivar chunk_size: Size of audio chunks to capture/process, defaults to 512.
@@ -87,14 +86,7 @@ class AudioConfig(object):
:ivar channels: Number of audio channels, defaults to 1. :ivar channels: Number of audio channels, defaults to 1.
:vartype channels: int :vartype channels: int
""" """
def __init__( def __init__(self, sample_rate=None, chunk_size=None, channels=None):
self,
use_pepper_microphone=None,
sample_rate=None,
chunk_size=None,
channels=None,
):
self.use_pepper_microphone = get_config(use_pepper_microphone, "AUDIO__USE_PEPPER_MICROPHONE", True, bool)
self.sample_rate = get_config(sample_rate, "AUDIO__SAMPLE_RATE", 16000, int) self.sample_rate = get_config(sample_rate, "AUDIO__SAMPLE_RATE", 16000, int)
self.chunk_size = get_config(chunk_size, "AUDIO__CHUNK_SIZE", 512, int) self.chunk_size = get_config(chunk_size, "AUDIO__CHUNK_SIZE", 512, int)
self.channels = get_config(channels, "AUDIO__CHANNELS", 1, int) self.channels = get_config(channels, "AUDIO__CHANNELS", 1, int)

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that we can log texts with Unicode characters from __future__ import unicode_literals # So that we can log texts with Unicode characters
import logging import logging
from threading import Thread from threading import Thread

View File

@@ -1,21 +1,7 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that `logging` can use Unicode characters in names from __future__ import unicode_literals # So that `logging` can use Unicode characters in names
import audioop
import enum
from abc import ABCMeta, abstractmethod
import threading import threading
import logging import logging
import Queue
import numpy as np
import pyaudio import pyaudio
import zmq import zmq
@@ -27,219 +13,86 @@ from robot_interface.core.config import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class AudioCapturer(object): class AudioSender(SocketBase):
""" """
Interface for audio capturers. Audio sender endpoint, responsible for sending microphone audio data.
"""
__metaclass__ = ABCMeta
@abstractmethod :param zmq_context: The ZeroMQ context to use.
def setup(self): :type zmq_context: zmq.Context
raise NotImplementedError()
@abstractmethod :param port: The port to use.
def stop(self): :type port: int
raise NotImplementedError()
@abstractmethod :ivar thread: Thread used for sending audio.
def generate_chunk(self): :vartype thread: threading.Thread | None
raise NotImplementedError()
class SampleRate(enum.Enum):
"""
Sample rate to use in Hz.
"""
LOW = 16000
HIGH = 48000
class PepperMicrophone(enum.Enum):
"""
Which of Pepper's microphones to use. In our case, some of the mics were damages/didn't work
well, so we choose to only use the fron right. If you have a Pepper robot with all working mics,
you might wish to use all microphones, to improve overall audio quality.
"""
ALL = 0
LEFT = 1
RIGHT = 2
FRONT_LEFT = 3
FRONT_RIGHT = 4
class QiAudioCapturer(AudioCapturer):
# Some of this class' methods have docstrings as binary strings. Keep them that way, otherwise
# ``qi.Session.registerService`` will give RuntimeErrors.
def __init__(self, sample_rate=SampleRate.LOW, mic=PepperMicrophone.FRONT_RIGHT,
deinterleaved=0):
"""
:raises RuntimeError: If there is no Qi session available.
:raises ValueError: If the given arguments are not compatible.
"""
self.session = state.qi_session
if not self.session:
raise RuntimeError("Cannot capture from qi device, no qi session available.")
if sample_rate == SampleRate.HIGH and mic != PepperMicrophone.ALL:
raise RuntimeError("For 48000 Hz, you must select all microphones.")
if mic == PepperMicrophone.ALL and sample_rate != SampleRate.HIGH:
raise RuntimeError("For using all microphones, 48000 Hz is required.")
self.audio = self.session.service("ALAudioDevice")
self.service_name = "ZmqAudioStreamer"
self.sample_rate = sample_rate
self.mic = mic
self.deinterleaved = deinterleaved
self.overflow = np.empty(0, dtype=np.float32)
self.q = Queue.Queue()
self._rate_state = None
def setup(self):
b"""
:raises RuntimeError: If no Qi session is available or if the session is not compatible with audio streaming.
"""
assert self.session is not None
logger.info("Listening with Pepper's microphone.")
self.session.registerService(self.service_name, self)
self.audio.setClientPreferences(self.service_name, self.sample_rate.value, self.mic.value,
self.deinterleaved)
self.audio.subscribe(self.service_name)
def stop(self):
b"""
Stop the audio capturer.
"""
try:
self.audio.unsubscribe(self.service_name)
except:
pass
def generate_chunk(self):
try:
chunk = self.q.get(True, 0.1)
return chunk
except Queue.Empty:
return None
# Callback invoked by NAOqi
def processRemote(self, nbOfChannels, nbOfSamplesByChannel, timeStamp, inputBuffer):
raw_pcm = bytes(inputBuffer)
pcm_i16 = np.frombuffer(raw_pcm, dtype=np.int16)
# Make mono channel (if it was 4 channels)
pcm_i32_mono = self._make_mono(pcm_i16.astype(np.int32), nbOfChannels)
# Resample (if it was 48k)
pcm_i32_mono_16k, self._rate_state = audioop.ratecv(pcm_i32_mono.tobytes(), 4, 1,
self.sample_rate.value,
SampleRate.LOW.value, self._rate_state)
pcm_f32_mono_16k = (np.frombuffer(pcm_i32_mono_16k, dtype=np.int32).astype(np.float32) /
32768.0)
# Attach overflow
pcm_f32_mono_16k = np.append(self.overflow, pcm_f32_mono_16k)
for i in range(len(pcm_f32_mono_16k) // 512):
self.q.put_nowait(pcm_f32_mono_16k[i * 512 : (i + 1) * 512].tobytes())
self.overflow = pcm_f32_mono_16k[len(pcm_f32_mono_16k) // 512 * 512 :]
def _make_mono(self, frag, channels):
return frag.reshape(-1, channels).mean(axis=1, dtype=np.int32)
class StandaloneAudioCapturer(AudioCapturer):
"""
Audio capturer that uses a microphone from the local device, can be chosen with the
``--microphone`` program argument.
:ivar audio: PyAudio instance. :ivar audio: PyAudio instance.
:vartype audio: pyaudio.PyAudio | None :vartype audio: pyaudio.PyAudio | None
:ivar microphone: Selected microphone information. :ivar microphone: Selected microphone information.
:vartype microphone: dict | None :vartype microphone: dict | None
:ivar stream: PyAudio stream instance. None until ``setup()`` is called, remaining None if setup
fails for any reason.
:vartype stream: pyaudio.Stream | None
""" """
def __init__(self): def __init__(self, zmq_context, port=settings.agent_settings.audio_sender_port):
self.stream = None super(AudioSender, self).__init__(str("audio")) # Convert future's unicode_literal to str
self.create_socket(zmq_context, zmq.PUB, port)
self.thread = None
try: try:
self.audio = pyaudio.PyAudio() self.audio = pyaudio.PyAudio()
self.microphone = choose_mic(self.audio) self.microphone = choose_mic(self.audio)
except IOError as e: except IOError as e:
logger.warning("PyAudio is not available. Won't be able to send audio.", exc_info=True) logger.warning("PyAudio is not available.", exc_info=e)
self.audio = None self.audio = None
self.microphone = None self.microphone = None
def setup(self): def start(self):
""" """
Setup audio stream. Will not if no microphone is available. Start sending audio in a different thread.
Will not start if no microphone is available.
""" """
if not self.microphone: if not self.microphone:
logger.info("Not listening: no microphone available.") logger.info("Not listening: no microphone available.")
return return
logger.info("Listening with microphone \"{}\".".format(self.microphone["name"])) logger.info("Listening with microphone \"{}\".".format(self.microphone["name"]))
self.stream = self.audio.open( self.thread = threading.Thread(target=self._stream)
format=pyaudio.paFloat32,
channels=settings.audio_config.channels,
rate=settings.audio_config.sample_rate,
input=True,
input_device_index=self.microphone["index"],
frames_per_buffer=settings.audio_config.chunk_size,
)
def stop(self):
"""
Close the audio stream.
"""
if not self.stream: return
self.stream.stop_stream()
self.stream.close()
def generate_chunk(self):
"""
:return: Audio frames from the microphone of size ``settings.audio_config.chunk_size``.
:rtype: bytes.
:raises IOError: If reading from the audio stream fails.
"""
return self.stream.read(settings.audio_config.chunk_size)
class AudioSender(SocketBase):
def __init__(self, zmq_context, port=settings.agent_settings.audio_sender_port):
super(AudioSender, self).__init__(str("audio"))
self.create_socket(zmq_context, zmq.PUB, port)
self.thread = threading.Thread(target=self.stream)
self.capturer = self.choose_capturer()
def start(self):
self.capturer.setup()
self.thread.start() self.thread.start()
def close(self): def wait_until_done(self):
self.capturer.stop() """
super(AudioSender, self).close() Wait until the audio thread is done.
def stream(self): Will block until `state.exit_event` is set. If the thread is not running, does nothing.
while not state.exit_event.is_set(): """
chunk = self.capturer.generate_chunk() if not self.thread: return
self.thread.join()
self.thread = None
if chunk is None or state.is_speaking: def _stream(self):
continue """
Internal method to continuously read audio from the microphone and send it over the socket.
"""
audio_settings = settings.audio_config
chunk = audio_settings.chunk_size # 320 at 16000 Hz is 20ms, 512 is required for Silero-VAD
self.socket.send(chunk) # Docs say this only raises an error if neither `input` nor `output` is True
stream = self.audio.open(
format=pyaudio.paFloat32,
channels=audio_settings.channels,
rate=audio_settings.sample_rate,
input=True,
input_device_index=self.microphone["index"],
frames_per_buffer=chunk,
)
def choose_capturer(self): try:
if state.qi_session and settings.audio_config.use_pepper_microphone: while not state.exit_event.is_set():
return QiAudioCapturer() data = stream.read(chunk)
if (state.is_speaking): continue # Do not send audio while the robot is speaking
return StandaloneAudioCapturer() self.socket.send(data)
except IOError as e:
logger.error("Stopped listening: failed to get audio from microphone.", exc_info=e)
finally:
stream.stop_stream()
stream.close()

View File

@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals
import json
import logging
import threading
import time
import zmq
from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state
from robot_interface.core.config import settings
class FaceDetectionSender(SocketBase):
"""
Face detection endpoint.
Subscribes to and polls ALMemory["FaceDetected"], sends events to CB.
"""
def __init__(self, zmq_context, port=settings.agent_settings.face_detection_port):
super(FaceDetectionSender, self).__init__("face")
self.create_socket(zmq_context, zmq.PUB, port)
self._face_service = None
self._memory_service = None
self._face_thread = None
def start_face_detection(self):
if not state.qi_session:
logging.warning("No Qi session available. Face detection not started.")
return
self._face_service = state.qi_session.service("ALFaceDetection")
self._memory_service = state.qi_session.service("ALMemory")
self._face_service.setTrackingEnabled(False)
self._face_service.setRecognitionEnabled(False)
self._face_service.subscribe(
"FaceDetectionSender",
settings.agent_settings.face_detection_interval,
0.0,
)
self._face_thread = threading.Thread(target=self._face_loop)
self._face_thread.start()
logging.info("Face detection started.")
def _face_loop(self):
"""
Continuously send face detected to the CB, at the interval set in the
``start_face_detection`` method.
"""
while not state.exit_event.is_set():
try:
value = self._memory_service.getData("FaceDetected", 0)
face_present = (
value
and len(value) > 1
and value[1]
and value[1][0]
and len(value[1][0]) > 0
)
self.socket.send(json.dumps({"face_detected": face_present}).encode("utf-8"))
except Exception:
logging.exception("Error reading FaceDetected")
time.sleep(settings.agent_settings.face_detection_interval / 1000.0)
def stop_face_detection(self):
try:
if self._face_service:
self._face_service.unsubscribe("FaceDetectionSender")
self._face_service.setTrackingEnabled(False)
logging.info("Face detection stopped.")
except Exception:
logging.warning("Error during face detection cleanup.")
def close(self):
super(FaceDetectionSender, self).close()
self.stop_face_detection()

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
class GestureTags: class GestureTags:
tags = ["above", "affirmative", "afford", "agitated", "all", "allright", "alright", "any", tags = ["above", "affirmative", "afford", "agitated", "all", "allright", "alright", "any",
"assuage", "assuage", "attemper", "back", "bashful", "beg", "beseech", "blank", "assuage", "assuage", "attemper", "back", "bashful", "beg", "beseech", "blank",

View File

@@ -1,16 +1,10 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import zmq import zmq
from robot_interface.endpoints.receiver_base import ReceiverBase from robot_interface.endpoints.receiver_base import ReceiverBase
from robot_interface.state import state from robot_interface.state import state
from robot_interface.core.config import settings from robot_interface.core.config import settings
from robot_interface.endpoints.face_detector import FaceDetectionSender
class MainReceiver(ReceiverBase): class MainReceiver(ReceiverBase):
@@ -44,6 +38,7 @@ class MainReceiver(ReceiverBase):
""" """
return {"endpoint": "ping", "data": message.get("data")} return {"endpoint": "ping", "data": message.get("data")}
@staticmethod @staticmethod
def _handle_port_negotiation(message): def _handle_port_negotiation(message):
""" """

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from robot_interface.endpoints.socket_base import SocketBase from robot_interface.endpoints.socket_base import SocketBase

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from abc import ABCMeta from abc import ABCMeta
import zmq import zmq

View File

@@ -1,16 +1,7 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import zmq import zmq
import threading import threading
import logging import logging
import struct
from robot_interface.endpoints.socket_base import SocketBase from robot_interface.endpoints.socket_base import SocketBase
from robot_interface.state import state from robot_interface.state import state
from robot_interface.core.config import settings from robot_interface.core.config import settings
@@ -28,7 +19,7 @@ class VideoSender(SocketBase):
""" """
def __init__(self, zmq_context, port=settings.agent_settings.video_sender_port): def __init__(self, zmq_context, port=settings.agent_settings.video_sender_port):
super(VideoSender, self).__init__("video") super(VideoSender, self).__init__("video")
self.create_socket(zmq_context, zmq.PUB, port, [(zmq.SNDHWM,3)]) self.create_socket(zmq_context, zmq.PUB, port, [(zmq.CONFLATE,1)])
def start_video_rcv(self): def start_video_rcv(self):
""" """
@@ -61,23 +52,10 @@ class VideoSender(SocketBase):
:param vid_stream_name: The name of a camera subscription on the video service object vid_service :param vid_stream_name: The name of a camera subscription on the video service object vid_service
:type vid_stream_name: str :type vid_stream_name: str
""" """
try: while not state.exit_event.is_set():
while not state.exit_event.is_set(): try:
try: img = vid_service.getImageRemote(vid_stream_name)
img = vid_service.getImageRemote(vid_stream_name) #Possibly limit images sent if queuing issues arise
if img is not None: self.socket.send(img[settings.video_config.image_buffer])
raw_data = img[6] except:
width = img[0] logging.warn("Failed to retrieve video image from robot.")
height = img[1]
width_bytes = struct.pack('<I', width)
height_bytes = struct.pack('<I', height)
self.socket.send_multipart([width_bytes, height_bytes, raw_data])
except KeyboardInterrupt:
logging.info("Video receiving loop interrupted by user.")
except:
logging.warn("Failed to retrieve video image from robot.")
finally:
vid_service.unsubscribe(vid_stream_name)
logging.info("Unsubscribed from video stream.")

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging import logging
from robot_interface.endpoints.audio_sender import AudioSender from robot_interface.endpoints.audio_sender import AudioSender
@@ -19,6 +12,8 @@ from robot_interface.endpoints.video_sender import VideoSender
from robot_interface.state import state from robot_interface.state import state
from robot_interface.core.config import settings from robot_interface.core.config import settings
from robot_interface.utils.timeblock import TimeBlock from robot_interface.utils.timeblock import TimeBlock
from robot_interface.endpoints.face_detector import FaceDetectionSender
def main_loop(context): def main_loop(context):
@@ -42,6 +37,12 @@ def main_loop(context):
video_sender.start_video_rcv() video_sender.start_video_rcv()
audio_sender.start() audio_sender.start()
# --- Face detection sender ---
face_sender = FaceDetectionSender(context)
state.sockets.append(face_sender)
face_sender.start_face_detection()
# Sockets that can run on the main thread. These sockets' endpoints should not block for long (say 50 ms at most). # Sockets that can run on the main thread. These sockets' endpoints should not block for long (say 50 ms at most).
receivers = [main_receiver, actuation_receiver] receivers = [main_receiver, actuation_receiver]

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging import logging
import signal import signal
import threading import threading

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -16,8 +9,6 @@ def get_config(value, env, default, cast=None):
Small utility to get a configuration value, returns `value` if it is not None, else it will try to get the Small utility to get a configuration value, returns `value` if it is not None, else it will try to get the
environment variable cast with `cast`. If the environment variable is not set, it will return `default`. environment variable cast with `cast`. If the environment variable is not set, it will return `default`.
Special handling for booleans, which are only true if the value of the variable is "true" or "yes", ignoring capitalization.
:param value: The value to check. :param value: The value to check.
:type value: Any :type value: Any
:param env: The environment variable to check. :param env: The environment variable to check.
@@ -35,14 +26,7 @@ def get_config(value, env, default, cast=None):
env = os.environ.get(env, default) env = os.environ.get(env, default)
if cast is None or env is None: if cast is None:
return env return env
if cast == bool:
if isinstance(env, bool):
return env
if not isinstance(default, bool):
raise ValueError("Default value must be a boolean if the cast type is a boolean.")
return env.lower() == "true" or env.lower() == "yes"
return cast(env) return cast(env)

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that `print` can print Unicode characters in names from __future__ import unicode_literals # So that `print` can print Unicode characters in names
import logging import logging
import sys import sys

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import logging import logging
import sys import sys
@@ -14,20 +7,6 @@ except ImportError:
qi = None qi = None
def _get_qi_url():
"""
Get the Qi URL from the command line arguments, or None if not given.
"""
if "--qi-url" in sys.argv:
return sys.argv[sys.argv.index("--qi-url") + 1]
for arg in sys.argv:
if arg.startswith("--qi-url="):
return arg[len("--qi-url="):]
return None
def get_qi_session(): def get_qi_session():
""" """
Create and return a Qi session if available. Create and return a Qi session if available.
@@ -39,13 +18,12 @@ def get_qi_session():
logging.info("Unable to import qi. Running in stand-alone mode.") logging.info("Unable to import qi. Running in stand-alone mode.")
return None return None
qi_url = _get_qi_url() if "--qi-url" not in sys.argv:
if qi_url is None:
logging.info("No Qi URL argument given. Running in stand-alone mode.") logging.info("No Qi URL argument given. Running in stand-alone mode.")
return None return None
try: try:
app = qi.Application(["--qi-url", qi_url, "--qi-listen-url", "tcp://0.0.0.0:0"]) app = qi.Application()
app.start() app.start()
return app.session return app.session
except RuntimeError: except RuntimeError:

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import time import time

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals # So that we can format strings with Unicode characters from __future__ import unicode_literals # So that we can format strings with Unicode characters
import random import random
import sys import sys

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from mock import patch, MagicMock from mock import patch, MagicMock
import pytest import pytest

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from mock import patch, mock from mock import patch, mock
from robot_interface.core.config import Settings from robot_interface.core.config import Settings

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pyaudio import pyaudio
import pytest import pytest

View File

@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import sys import sys
import mock import mock

View File

@@ -1,10 +1,4 @@
# -*- coding: utf-8 -*- # coding=utf-8
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import os import os
import mock import mock
@@ -33,33 +27,35 @@ def test_no_microphone(zmq_context, mocker):
mock_info_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger.info") mock_info_logger = mocker.patch("robot_interface.endpoints.audio_sender.logger.info")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = None mock_choose_mic.return_value = None
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
sender = AudioSender(zmq_context) sender = AudioSender(zmq_context)
assert sender.capturer.microphone is None assert sender.microphone is None
sender.capturer.setup() sender.start()
assert sender.thread is None
mock_info_logger.assert_called() mock_info_logger.assert_called()
sender.wait_until_done() # Should return early because we didn't start a thread
def test_unicode_mic_name(zmq_context, mocker): def test_unicode_mic_name(zmq_context, mocker):
""" """
Tests the robustness of the `AudioSender` when handling microphone names Tests the robustness of the `AudioSender` when handling microphone names
that contain Unicode characters. that contain Unicode characters.
""" """
mocker.patch("robot_interface.endpoints.audio_sender.threading")
mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic") mock_choose_mic = mocker.patch("robot_interface.endpoints.audio_sender.choose_mic")
mock_choose_mic.return_value = {"name": u"• Some Unicode name", "index": 0L} mock_choose_mic.return_value = {"name": u"• Some Unicode name"}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
sender = AudioSender(zmq_context) sender = AudioSender(zmq_context)
assert sender.capturer.microphone is not None assert sender.microphone is not None
sender.capturer.audio.open = mock.Mock(return_value=mock.Mock())
# `.setup()` logs the name of the microphone. It should not give an error if it contains Unicode # `.start()` logs the name of the microphone. It should not give an error if it contains Unicode
# symbols. # symbols.
sender.capturer.setup() sender.start()
assert sender.thread is not None
sender.wait_until_done() # Should return instantly because we didn't start a real thread
def _fake_read(num_frames): def _fake_read(num_frames):
@@ -77,7 +73,6 @@ def test_sending_audio(mocker):
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
mock_state.exit_event.is_set.side_effect = [False, True] mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock() mock_zmq_context = mock.Mock()
@@ -90,11 +85,11 @@ def test_sending_audio(mocker):
sender = AudioSender(mock_zmq_context) sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket sender.socket.send = send_socket
sender.capturer.audio.open = mock.Mock() sender.audio.open = mock.Mock()
sender.capturer.audio.open.return_value = stream sender.audio.open.return_value = stream
sender.start() sender.start()
sender.thread.join() sender.wait_until_done()
send_socket.assert_called() send_socket.assert_called()
@@ -107,7 +102,6 @@ def test_no_sending_if_speaking(mocker):
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
mock_state.exit_event.is_set.side_effect = [False, True] mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock() mock_zmq_context = mock.Mock()
@@ -121,11 +115,11 @@ def test_no_sending_if_speaking(mocker):
sender = AudioSender(mock_zmq_context) sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket sender.socket.send = send_socket
sender.capturer.audio.open = mock.Mock() sender.audio.open = mock.Mock()
sender.capturer.audio.open.return_value = stream sender.audio.open.return_value = stream
sender.start() sender.start()
sender.thread.join() sender.wait_until_done()
send_socket.assert_not_called() send_socket.assert_not_called()
@@ -145,7 +139,6 @@ def test_break_microphone(mocker):
mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L} mock_choose_mic.return_value = {"name": u"Some mic", "index": 0L}
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state") mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
mock_state.exit_event.is_set.side_effect = [False, True] mock_state.exit_event.is_set.side_effect = [False, True]
mock_zmq_context = mock.Mock() mock_zmq_context = mock.Mock()
@@ -157,11 +150,11 @@ def test_break_microphone(mocker):
sender = AudioSender(mock_zmq_context) sender = AudioSender(mock_zmq_context)
sender.socket.send = send_socket sender.socket.send = send_socket
sender.capturer.audio.open = mock.Mock() sender.audio.open = mock.Mock()
sender.capturer.audio.open.return_value = stream sender.audio.open.return_value = stream
sender.start() sender.start()
sender.thread.join() sender.wait_until_done()
send_socket.assert_not_called() send_socket.assert_not_called()
@@ -172,8 +165,6 @@ def test_pyaudio_init_failure(mocker, zmq_context):
""" """
# Prevent binding the ZMQ socket # Prevent binding the ZMQ socket
mocker.patch("robot_interface.endpoints.audio_sender.AudioSender.create_socket") mocker.patch("robot_interface.endpoints.audio_sender.AudioSender.create_socket")
mock_state = mocker.patch("robot_interface.endpoints.audio_sender.state")
mock_state.qi_session = None
# Simulate PyAudio() failing # Simulate PyAudio() failing
mocker.patch( mocker.patch(
@@ -183,5 +174,5 @@ def test_pyaudio_init_failure(mocker, zmq_context):
sender = AudioSender(zmq_context) sender = AudioSender(zmq_context)
assert sender.capturer.audio is None assert sender.audio is None
assert sender.capturer.microphone is None assert sender.microphone is None

View File

@@ -0,0 +1,175 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from __future__ import unicode_literals
import json
import mock
import pytest
from robot_interface.endpoints.face_detector import FaceDetectionSender
from robot_interface.state import state
@pytest.fixture(autouse=True)
def initialized_state(monkeypatch):
"""
Fully initialize global state so __getattribute__ allows access.
"""
# Bypass the initialization guard
monkeypatch.setattr(state, "is_initialized", True, raising=False)
# Install a controllable exit_event
exit_event = mock.Mock()
exit_event.is_set = mock.Mock(return_value=True)
monkeypatch.setattr(state, "exit_event", exit_event, raising=False)
# Default qi_session is None unless overridden
monkeypatch.setattr(state, "qi_session", None, raising=False)
yield
def test_start_face_detection_no_qi_session():
"""
Returns early when qi_session is None.
"""
sender = FaceDetectionSender(mock.Mock())
sender.start_face_detection()
assert sender._face_thread is None
assert sender._face_service is None
assert sender._memory_service is None
def test_start_face_detection_happy_path(mocker):
"""
Initializes services and starts background thread.
"""
mock_face = mock.Mock()
mock_memory = mock.Mock()
mock_qi = mock.Mock()
mock_qi.service.side_effect = lambda name: {
"ALFaceDetection": mock_face,
"ALMemory": mock_memory,
}[name]
state.qi_session = mock_qi
fake_thread = mock.Mock()
mocker.patch("threading.Thread", return_value=fake_thread)
sender = FaceDetectionSender(mock.Mock())
sender.start_face_detection()
mock_face.setTrackingEnabled.assert_called_with(False)
mock_face.setRecognitionEnabled.assert_called_with(False)
mock_face.subscribe.assert_called_once()
fake_thread.start.assert_called_once()
def test_face_loop_face_detected_true(mocker):
"""
Sends face_detected=True when face data exists.
"""
sender = FaceDetectionSender(mock.Mock())
sender._memory_service = mock.Mock()
sender._memory_service.getData.return_value = [0, [[1]]]
sender.socket = mock.Mock()
mocker.patch("time.sleep")
state.exit_event.is_set.side_effect = [False, True]
sender._face_loop()
sent = sender.socket.send.call_args[0][0]
payload = json.loads(sent.decode("utf-8"))
assert payload["face_detected"] is True
def test_face_loop_face_detected_false(mocker):
"""
Sends face_detected=False when no face data exists.
"""
sender = FaceDetectionSender(mock.Mock())
sender._memory_service = mock.Mock()
sender._memory_service.getData.return_value = []
sender.socket = mock.Mock()
mocker.patch("time.sleep")
state.exit_event.is_set.side_effect = [False, True]
sender._face_loop()
sent = sender.socket.send.call_args[0][0]
payload = json.loads(sent.decode("utf-8"))
assert not payload["face_detected"]
def test_face_loop_handles_exception(mocker):
"""
Exceptions inside loop are swallowed.
"""
sender = FaceDetectionSender(mock.Mock())
sender._memory_service = mock.Mock()
sender._memory_service.getData.side_effect = Exception("boom")
sender.socket = mock.Mock()
mocker.patch("time.sleep")
state.exit_event.is_set.side_effect = [False, True]
# Must not raise
sender._face_loop()
def test_stop_face_detection_happy_path():
"""
Unsubscribes and disables tracking.
"""
sender = FaceDetectionSender(mock.Mock())
mock_face = mock.Mock()
sender._face_service = mock_face
sender.stop_face_detection()
mock_face.unsubscribe.assert_called_once()
mock_face.setTrackingEnabled.assert_called_with(False)
def test_stop_face_detection_exception():
"""
stop_face_detection swallows service exceptions.
"""
sender = FaceDetectionSender(mock.Mock())
mock_face = mock.Mock()
mock_face.unsubscribe.side_effect = Exception("fail")
sender._face_service = mock_face
sender.stop_face_detection()
def test_close_calls_stop_face_detection(mocker):
"""
close() calls parent close and stop_face_detection().
"""
sender = FaceDetectionSender(mock.Mock())
mocker.patch.object(sender, "stop_face_detection")
mocker.patch(
"robot_interface.endpoints.face_detector.SocketBase.close"
)
sender.close()
sender.stop_face_detection.assert_called_once()

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
from robot_interface.utils.get_config import get_config from robot_interface.utils.get_config import get_config
@@ -50,58 +43,3 @@ def test_get_config_casts_default_when_env_missing(monkeypatch):
result = get_config(None, "GET_CONFIG_MISSING", "42", int) result = get_config(None, "GET_CONFIG_MISSING", "42", int)
assert result == 42 assert result == 42
def test_get_config_unset_boolean_default(monkeypatch):
"""
When the env var is a boolean, and it's not set, ensure it uses the default value.
"""
monkeypatch.delenv("SOME_BOOLEAN_VARIABLE", raising=False)
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == False
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == True
def test_get_config_true_boolean(monkeypatch):
"""
When the env var is a boolean, and its value is "true", "TRUE", "yes", etc., it should return true.
"""
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "TRUE")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "true")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "yes")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "YES")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "TrUE")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", False, bool)
assert result == True
def test_get_config_false_boolean(monkeypatch):
"""
When the env var is a boolean, and its value is not "true", "TRUE", "yes", etc., it should return False.
"""
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "FALSE")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == False
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "false")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == False
monkeypatch.setenv("SOME_BOOLEAN_VARIABLE", "anything, tbh")
result = get_config(None, "SOME_BOOLEAN_VARIABLE", True, bool)
assert result == False

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pytest import pytest
import threading import threading
import zmq import zmq
@@ -62,6 +55,9 @@ class DummySender:
def start(self): def start(self):
self.called = True self.called = True
def start_face_detection(self):
self.called = True
def close(self): def close(self):
pass pass
@@ -115,11 +111,13 @@ def patched_main_components(monkeypatch, fake_sockets, fake_poll):
fake_act = FakeReceiver(act_sock) fake_act = FakeReceiver(act_sock)
video_sender = DummySender() video_sender = DummySender()
audio_sender = DummySender() audio_sender = DummySender()
face_sender = DummySender()
monkeypatch.setattr(main_mod, "MainReceiver", lambda ctx: fake_main) monkeypatch.setattr(main_mod, "MainReceiver", lambda ctx: fake_main)
monkeypatch.setattr(main_mod, "ActuationReceiver", lambda ctx: fake_act) monkeypatch.setattr(main_mod, "ActuationReceiver", lambda ctx: fake_act)
monkeypatch.setattr(main_mod, "VideoSender", lambda ctx: video_sender) monkeypatch.setattr(main_mod, "VideoSender", lambda ctx: video_sender)
monkeypatch.setattr(main_mod, "AudioSender", lambda ctx: audio_sender) monkeypatch.setattr(main_mod, "AudioSender", lambda ctx: audio_sender)
monkeypatch.setattr(main_mod, "FaceDetectionSender", lambda ctx: face_sender)
# Register sockets for the fake poller # Register sockets for the fake poller
fake_poll.registered = {main_sock: zmq.POLLIN, act_sock: zmq.POLLIN} fake_poll.registered = {main_sock: zmq.POLLIN, act_sock: zmq.POLLIN}

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock import mock
import pytest import pytest
import zmq import zmq

View File

@@ -1,10 +1,4 @@
# -*- coding: utf-8 -*- # coding=utf-8
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock import mock
import pytest import pytest

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import sys import sys
# Import module under test # Import module under test
@@ -62,7 +55,7 @@ def test_get_qi_session_runtime_error(monkeypatch):
raise RuntimeError("boom") raise RuntimeError("boom")
class FakeQi: class FakeQi:
Application = lambda *args, **kwargs: FakeApp() Application = lambda self=None: FakeApp()
reload_qi_utils_with(FakeQi()) reload_qi_utils_with(FakeQi())
@@ -87,7 +80,7 @@ def test_get_qi_session_success(monkeypatch):
return True return True
class FakeQi: class FakeQi:
Application = lambda *args, **kwargs: FakeApp() Application = lambda self=None: FakeApp()
reload_qi_utils_with(FakeQi()) reload_qi_utils_with(FakeQi())

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import pytest import pytest
from robot_interface.endpoints.receiver_base import ReceiverBase from robot_interface.endpoints.receiver_base import ReceiverBase

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import mock import mock
import zmq import zmq
from robot_interface.endpoints.socket_base import SocketBase from robot_interface.endpoints.socket_base import SocketBase

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import threading import threading
import signal import signal
import pytest import pytest

View File

@@ -1,10 +1,3 @@
# -*- coding: utf-8 -*-
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import time import time
import mock import mock

View File

@@ -1,11 +1,5 @@
# -*- coding: utf-8 -*- # coding=utf-8
"""
This program has been developed by students from the bachelor Computer Science at Utrecht
University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""
import struct
import mock import mock
import pytest import pytest
import zmq import zmq
@@ -54,10 +48,8 @@ def test_video_streaming(zmq_context, mocker):
# Pepper's image buffer lives at index 6 # Pepper's image buffer lives at index 6
mocker.patch.object(settings.video_config, "image_buffer", 6) mocker.patch.object(settings.video_config, "image_buffer", 6)
test_width = 320
test_height = 240
mock_video_service = mock.Mock() mock_video_service = mock.Mock()
mock_video_service.getImageRemote.return_value = [test_width, test_height, None, None, None, None, b"fake_img"] mock_video_service.getImageRemote.return_value = [None]*6 + ["fake_img"]
fake_session = mock.Mock() fake_session = mock.Mock()
fake_session.service.return_value = mock_video_service fake_session.service.return_value = mock_video_service
@@ -71,16 +63,12 @@ def test_video_streaming(zmq_context, mocker):
sender = VideoSender(zmq_context) sender = VideoSender(zmq_context)
send_socket = mock.Mock() send_socket = mock.Mock()
sender.socket.send_multipart = send_socket sender.socket.send = send_socket
sender.start_video_rcv() sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name") sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_called_with([ send_socket.assert_called_with("fake_img")
struct.pack('<I', 320),
struct.pack('<I', 240),
b"fake_img"
])
def test_video_receive_error(zmq_context, mocker): def test_video_receive_error(zmq_context, mocker):
@@ -103,30 +91,9 @@ def test_video_receive_error(zmq_context, mocker):
sender = VideoSender(zmq_context) sender = VideoSender(zmq_context)
send_socket = mock.Mock() send_socket = mock.Mock()
sender.socket.send_multipart = send_socket sender.socket.send = send_socket
sender.start_video_rcv() sender.start_video_rcv()
sender.video_rcv_loop(mock_video_service, "stream_name") sender.video_rcv_loop(mock_video_service, "stream_name")
send_socket.assert_not_called() send_socket.assert_not_called()
def test_video_loop_keyboard_interrupt(zmq_context, mocker):
"""Video loop should handle KeyboardInterrupt gracefully and unsubscribe."""
_patch_basics(mocker)
_patch_exit_event(mocker)
# We mock the video service to raise KeyboardInterrupt when accessed
mock_video_service = mock.Mock()
mock_video_service.getImageRemote.side_effect = KeyboardInterrupt
# Mock logging to verify the specific interrupt message is logged
mock_logger = mocker.patch("robot_interface.endpoints.video_sender.logging")
sender = VideoSender(zmq_context)
# Execute the loop
sender.video_rcv_loop(mock_video_service, "stream_name")
# Verify the 'finally' block executed (unsubscribe)
mock_video_service.unsubscribe.assert_called_with("stream_name")
mock_logger.info.assert_any_call("Unsubscribed from video stream.")