Files
pepperplus-ri/main.py
2025-10-08 12:11:24 +02:00

131 lines
3.5 KiB
Python

import sys
import logging
import argparse
import zmq
import json
import logging
import hashlib
import socket
import time
from src.audio_streaming import AudioStreaming
from state import state
from urlparse import urlparse
def say(session, message):
    """Speak ``message`` aloud through the session's text-to-speech service.

    :param session: object exposing ``service(name)``; the "ALTextToSpeech"
        service is looked up on it for every call.
    :param message: text handed to the service's ``say`` method.
    """
    session.service("ALTextToSpeech").say(message)
def listen_for_messages(session):
    """Announce this robot to the CB, then speak every message that arrives.

    A SUB socket is connected to ``tcp://localhost:5556`` (subscribed to all
    topics) and a PUB socket is bound on ``tcp://*:5555``. One
    ``robot_connected`` event carrying ``state.id``, ``state.name`` and
    ``state.port`` is published, after which incoming messages are polled
    until ``state.exit_event`` is set. Both sockets and the context are
    released on the way out, whether the loop ended normally or raised.

    :param session: object accepted by ``say``; when falsy, received messages
        are only logged and not spoken.
    """
    context = zmq.Context()
    # Named "subscriber"/"publisher" so the imported ``socket`` module is not
    # shadowed inside this function.
    subscriber = None
    publisher = None
    try:
        subscriber = context.socket(zmq.SUB)
        subscriber.connect("tcp://localhost:5556")
        subscriber.setsockopt_string(zmq.SUBSCRIBE, u"")  # u because Python 2 shenanigans

        poller = zmq.Poller()
        poller.register(subscriber, zmq.POLLIN)
        logging.info("Listening for messages")

        # Let the CB know we're connected.
        publisher = context.socket(zmq.PUB)
        publisher.bind("tcp://*:5555")
        # NOTE(review): presumably this pause gives subscribers time to connect
        # before the one-shot announcement is published -- confirm.
        time.sleep(1)

        print("Now attempting to send connection data to CB.")
        connection_data = {
            "event": "robot_connected",
            "id": state.id,
            "name": state.name,
            "port": state.port,
        }
        connection_json = json.dumps(connection_data)
        publisher.send_string(connection_json)
        # Single formatted argument: with several arguments Python 2's print
        # statement would output a tuple instead of the text.
        print("Send data: {}".format(connection_json))

        while not state.exit_event.is_set():
            if not poller.poll(200):
                continue  # At most 200 ms delay after CTRL+C

            # We now know there's a message waiting for us
            message = subscriber.recv_string()
            logging.debug("Received message: {}".format(message))

            if session:
                say(session, message)
    finally:
        # linger=0 discards anything still queued so term() cannot block.
        for zmq_socket in (subscriber, publisher):
            if zmq_socket is not None:
                zmq_socket.close(linger=0)
        context.term()
def get_session():
    """Build the Qi session (if possible) and describe how to reach the robot.

    Command-line options read here:

    * ``--qi-url``: URL of the robot; without it the script runs stand-alone.
    * ``--name``: optional robot name (default "Robot without name").

    Options this parser does not know are ignored rather than treated as an
    error, so a stand-alone run never aborts on extra arguments.

    :returns: a 4-tuple ``(session, ip, port, name)`` on every path.
        ``session`` is ``None`` in stand-alone mode, i.e. when no URL was
        given, ``qi`` cannot be imported, or the application fails to start.
        ``port`` is ``-1`` when no URL was given; otherwise it is whatever
        ``urlparse`` reports (``None`` if the URL has no explicit port).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--qi-url", type=str, help="Qi URL argument")
    parser.add_argument("--name", type=str, default="Robot without name", help="Optional robot name")
    # Parsing (instead of scanning sys.argv for the literal flag) also
    # recognises the "--qi-url=<url>" spelling and honours --name in
    # stand-alone mode.
    args, _unknown = parser.parse_known_args()
    name = args.name

    if args.qi_url is None:
        logging.info("No Qi URL argument given. Running in stand-alone mode.")
        # Same tuple shape as every other return so callers can always
        # unpack four values.
        return None, resolve_local_ip(), -1, name

    parsed = urlparse(args.qi_url)
    ip = parsed.hostname
    port = parsed.port

    # If URL uses localhost, get own ip instead
    if ip in ("localhost", "127.0.0.1"):
        ip = resolve_local_ip()

    try:
        import qi
    except ImportError:
        logging.info("Unable to import qi. Running in stand-alone mode.")
        return None, ip, port, name

    try:
        app = qi.Application()
        app.start()
    except RuntimeError:
        logging.info("Unable to connect to the robot. Running in stand-alone mode.")
        return None, ip, port, name

    return app.session, ip, port, name
def resolve_local_ip():
    """Return the actual local IP, not 127.0.0.1.

    A UDP socket is connected towards a public address purely to find out
    which local interface address would be used; the address of that socket
    is returned.

    :returns: the local IPv4 address as a string, or "127.0.0.1" when it
        cannot be determined (a warning is logged in that case).
    """
    try:
        probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            probe.connect(("8.8.8.8", 80))  # Use a public IP just to resolve interface
            return probe.getsockname()[0]
        finally:
            # Close the probe even when connect()/getsockname() fails.
            probe.close()
    except socket.error as e:
        logging.warning("Could not resolve local IP: {}".format(e))
        return "127.0.0.1"
def main():
    """Derive the robot's identity, store it in ``state`` and start listening.

    The id is the MD5 hex digest of ``"<ip>:<port>"`` as returned by
    ``get_session``. ``state.id``, ``state.port``, ``state.ip`` and
    ``state.name`` are filled in before ``listen_for_messages`` takes over.
    """
    session, ip, port, name = get_session()

    # hash ip, port into id
    id_source = "{}:{}".format(ip, port)
    # Hash explicit bytes: Python 3's hashlib rejects text, and for the ASCII
    # "ip:port" string the digest is the same as hashing the str on Python 2.
    unique_id = hashlib.md5(id_source.encode("utf-8")).hexdigest()
    # Single formatted argument: with several arguments Python 2's print
    # statement would output a tuple instead of the text.
    print("created unique id: {}".format(unique_id))

    state.id = unique_id
    state.port = port
    state.ip = ip
    state.name = name

    logging.info("Session ID: {} (from {})".format(unique_id, id_source))

    listen_for_messages(session)
# Script entry point: initialise the shared state, run main(), and always
# deinitialise afterwards.
if __name__ == "__main__":
    try:
        # initialize() is inside the try, so deinitialize() below also runs
        # when initialisation itself raises.
        state.initialize()
        main()
    finally:
        state.deinitialize()