Files
pepperplus-ri/main.py
Twirre Meulenbelt c634e4b516 chore: replace print with logging and make robot conditional
All print statements in the main program, and components used by the main program, have been replaced with appropriate logging statements. The connection to the robot now only gets made when it's possible, otherwise only the microphone will be run.

ref: N25B-119
2025-10-02 16:13:39 +02:00

70 lines
1.6 KiB
Python

import sys
import logging
logging.
import zmq
from src.audio_streaming import AudioStreaming
from state import state
def say(session, message):
    """Speak ``message`` through the robot's text-to-speech service.

    :param session: Object exposing ``service(name)``; used to look up the
        ``"ALTextToSpeech"`` service.
    :param message: Text handed to the service's ``say`` method.
    """
    session.service("ALTextToSpeech").say(message)
def listen_for_messages(session):
    """Receive text messages over ZeroMQ and speak them if a session is given.

    Subscribes to every topic on ``tcp://localhost:5556`` and keeps polling
    until ``state.exit_event`` is set. Each received message is logged at
    debug level and, when ``session`` is truthy, passed on to ``say``.

    The socket and its context are always released when the loop ends,
    whether it ends normally or through an exception.

    :param session: Session forwarded to ``say``, or a falsy value (such as
        ``None``) to only log incoming messages.
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    try:
        socket.connect("tcp://localhost:5556")
        socket.setsockopt_string(zmq.SUBSCRIBE, u"")  # u because Python 2 shenanigans

        poller = zmq.Poller()
        poller.register(socket, zmq.POLLIN)

        logging.info("Listening for messages")
        while not state.exit_event.is_set():
            # Poll with a timeout so the exit event is re-checked regularly:
            # at most 200 ms delay after CTRL+C.
            if not poller.poll(200):
                continue

            # We now know there's a message waiting for us
            message = socket.recv_string()
            # Lazy %-style arguments instead of str.format: on Python 2 (which
            # the u"" prefix above suggests is supported) formatting a
            # non-ASCII unicode message into a byte string raises
            # UnicodeEncodeError.
            logging.debug("Received message: %s", message)
            if session:
                say(session, message)
    finally:
        # Close without lingering so that terminating the context cannot block.
        socket.close(linger=0)
        context.term()
def get_session():
    """Return a started Qi session, or ``None`` to run in stand-alone mode.

    Stand-alone mode (``None``) is chosen, and the reason logged, when:

    * no ``--qi-url`` argument is present in ``sys.argv``, either as a
      separate ``--qi-url`` token or in the ``--qi-url=<url>`` form;
    * the ``qi`` module cannot be imported;
    * creating or starting the Qi application raises ``RuntimeError``.

    :return: The ``session`` attribute of the started ``qi.Application``, or
        ``None``.
    """
    # NOTE(review): assumes the Qi argument parser accepts both
    # "--qi-url <url>" and "--qi-url=<url>"; an exact-match test alone would
    # miss the second spelling.
    qi_url_given = any(
        arg == "--qi-url" or arg.startswith("--qi-url=") for arg in sys.argv
    )
    if not qi_url_given:
        logging.info("No Qi URL argument given. Running in stand-alone mode.")
        return None

    # Imported lazily: the module may not be installed on every machine.
    try:
        import qi
    except ImportError:
        logging.info("Unable to import qi. Running in stand-alone mode.")
        return None

    try:
        app = qi.Application()
        app.start()
        return app.session
    except RuntimeError:
        logging.info("Unable to connect to the robot. Running in stand-alone mode.")
        return None
def main():
    """Start audio streaming, then handle incoming messages until exit.

    The session obtained from ``get_session`` may be ``None``; it is passed
    through to ``listen_for_messages`` unchanged.
    """
    robot_session = get_session()

    AudioStreaming().run()

    # Runs indefinitely, until CTRL+C
    listen_for_messages(robot_session)
if __name__ == "__main__":
    # Script entry point. Initialisation happens inside the ``try`` so that
    # ``state.deinitialize()`` runs even if start-up or ``main()`` raises.
    try:
        state.initialize()
        main()
    finally:
        state.deinitialize()