chore: cleanup for merging/ switching to later branches

This commit is contained in:
Björn Otgaar
2025-10-30 17:07:20 +01:00
parent 8990af88fb
commit 4688a8fe17
5 changed files with 0 additions and 289 deletions

View File

@@ -2,88 +2,6 @@
### Linux (or WSL)
Start off by installing [Pyenv](https://github.com/pyenv/pyenv?tab=readme-ov-file#installation) and walk through the steps outlined there (be sure to also add it to PATH). Also install the [Python build requirements](https://github.com/pyenv/pyenv/wiki#suggested-build-environment). Afterwards, install Python 2.7 and activate it for your current shell:
<<<<<<< HEAD
```bash
pyenv install 2.7
pyenv shell 2.7
```
You can check that this worked by typing
```bash
python -V
```
Which should return `Python 2.7.18`.
Next, `cd` into this repository and create (and activate) a virtual environment:
```bash
cd <path to project>/
python -m pip install virtualenv
python -m virtualenv .venv
source .venv/bin/activate
```
To be able to install the PyAudio Python package, you'll need to have the `portaudio` system package installed. On Debian or Ubuntu:
```shell
sudo apt install portaudio19-dev
```
Then you can install the required packages with
```bash
pip install -r requirements.txt
```
Now we need to install the NaoQi SDK into our virtual environment, which we need to do manually. Begin by downloading the SDK:
```bash
wget https://community-static.aldebaran.com/resources/2.5.10/Python%20SDK/pynaoqi-python2.7-2.5.7.1-linux64.tar.gz
```
Next, move into the `site-packages` directory and extract the file you just downloaded:
```bash
cd .venv/lib/python2.7/site-packages/
tar xvfz <path to SDK>/pynaoqi-python2.7-2.5.7.1-linux64.tar.gz
rm <path to SDK>/pynaoqi-python2.7-2.5.7.1-linux64.tar.gz
```
Lastly, we need to inform our virtual environment where to find our newly installed package:
```bash
echo <path to project>/.venv/lib/python2.7/site-packages/pynaoqi-python2.7-2.5.7.1-linux64/lib/python2.7/site-packages/ > pynaoqi-python2.7.pth
```
That's it! Verify that it works with
```bash
python -c "import qi; print(qi)"
```
You should now be able to run this project.
### MacOS
On ARM CPU's, pyenv doesn't want to install Python 2. You can download and install it from [the Python website](https://www.python.org/downloads/release/python-2718/).
Create a virtual environment as described in the Linux section.
Then build `portaudio` for x86_64 CPU's.
Then follow the remaining installation instructions in the Linux section.
## Running
Assuming you have the virtual environment activated (`source .venv/bin/activate` on Linux) and that you have a virtual robot running on localhost you should be able to run this project by typing
```bash
python main.py --qi-url tcp://localhost:<port>
```
where `<port>` is the port on which your robot is running.
=======
The robot interface is a high-level API for controlling the robot. It implements the API as designed: https://utrechtuniversity.youtrack.cloud/articles/N25B-A-14/RI-CB-Communication.
This is an implementation for the Pepper robot, using the Pepper SDK and Python 2.7 as required by the SDK.
@@ -216,4 +134,3 @@ If your commit fails its either:
branch name != <type>/description-of-branch ,
commit name != <type>: description of the commit.
<ref>: N25B-Num's
>>>>>>> origin/dev

149
main.py
View File

@@ -1,149 +0,0 @@
import sys
import logging
import argparse
import zmq
import json
import logging
import hashlib
import socket
import time
from src.audio_streaming import AudioStreaming
from state import state
from urlparse import urlparse
def say(session, message):
tts = session.service("ALTextToSpeech")
tts.say(message)
def listen_for_messages(session):
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.setsockopt_string(zmq.SUBSCRIBE, u"") # u because Python 2 shenanigans
# Let the CB know we're connected.
pub = context.socket(zmq.PUB)
pub.bind("tcp://*:5555")
time.sleep(1)
print("Now attempting to send connection data to CB.")
connection_data = {
"event": "robot_connected",
"id": state.__getattribute__("id"),
"name": state.__getattribute__("name"),
"port": state.__getattribute__("port")
}
connection_json = json.dumps(connection_data)
pub.send_string(connection_json)
print("Send data: ", connection_json)
listening_loop(socket, pub)
def listening_loop(socket, pub):
print("Entered listening loop.")
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
logging.info("Listening for messages")
while not state.exit_event.is_set():
if not poller.poll(200): continue # At most 200 ms delay after CTRL+C
# We now know there's a message waiting for us
message = socket.recv_string()
logging.debug("Received message: {}".format(message))
handle_message(message, pub)
def handle_message(msg, pub):
"""
Parse the message and act accordingly to conditional statements.
"""
msgs = msg.split(",") # Let's split the attributes
if msgs[0] == "ping":
# return ping
ping_data = {
"event": "ping",
"id": state.__getattribute__("id"),
}
ping_json = json.dumps(ping_data)
pub.send_string(ping_json)
logging.debug("Returned ping.")
return
# Other if statements such as `if msgs[0] == "say"`
return
def get_session():
if "--qi-url" not in sys.argv:
logging.info("No Qi URL argument given. Running in stand-alone mode.")
ip = resolve_local_ip()
port = -1
return None, ip, port
parser = argparse.ArgumentParser()
parser.add_argument("--qi-url", type=str, help="Qi URL argument")
parser.add_argument("--name", type=str, default="Robot without name", help="Optional robot name")
args = parser.parse_args()
name = args.name
parsed = urlparse(args.qi_url)
ip = parsed.hostname
port = parsed.port
# If URL uses localhost, get own ip instead
if ip in ("localhost", "127.0.0.1"):
ip = resolve_local_ip()
try:
import qi
except ImportError:
logging.info("Unable to import qi. Running in stand-alone mode.")
return None, ip, port, name
try:
app = qi.Application()
app.start()
return app.session, ip, port, name
except RuntimeError:
logging.info("Unable to connect to the robot. Running in stand-alone mode.")
return None, ip, port, name
def resolve_local_ip():
"""Return the actual local IP, not 127.0.0.1."""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80)) # Use a public IP just to resolve interface
ip = s.getsockname()[0]
s.close()
return ip
except Exception as e:
logging.warning("Could not resolve local IP: {}".format(e))
return "127.0.0.1"
def main():
session, ip, port, name = get_session()
# hash ip, port into id
id_source = "{}:{}".format(ip, port)
unique_id = hashlib.md5(id_source).hexdigest()
print("created unique id: ", unique_id)
state.id = unique_id
state.port = port
state.ip = ip
state.name = name
logging.info("Session ID: {} (from {})".format(unique_id, id_source))
listen_for_messages(session)
if __name__ == "__main__":
try:
state.initialize()
main()
finally:
state.deinitialize()

View File

@@ -1,9 +1,5 @@
pyzmq<16
<<<<<<< HEAD
pyaudio<=0.2.11
=======
pyaudio<=0.2.11
pytest<5
pytest-mock<3.0.0
pytest-cov<3.0.0
>>>>>>> origin/dev

View File

@@ -1,53 +0,0 @@
import logging
import signal
import threading
class State(object):
"""
Do not create an instance of this class directly: use the instance `state` below. This state must be initiated once,
probably when your program starts.
This class is used to share state between threads. For example, when the program is quit, that all threads can
detect this via the `exit_event` property being set.
"""
def __init__(self):
self.is_initialized = False
self.exit_event = None
def initialize(self):
if self.is_initialized:
logging.warn("Already initialized")
return
self.exit_event = threading.Event()
def handle_exit(_, __):
logging.info("Exiting.")
self.exit_event.set()
signal.signal(signal.SIGINT, handle_exit)
signal.signal(signal.SIGTERM, handle_exit)
self.is_initialized = True
def deinitialize(self):
if not self.is_initialized: return
self.is_initialized = False
def __getattribute__(self, name):
# Enforce that the state is initialized before accessing any property (aside from the basic ones)
if name in ("initialize", "deinitialize", "is_initialized", "__dict__", "__class__", "id", "name", "ip", "port"):
return object.__getattribute__(self, name)
if not object.__getattribute__(self, "is_initialized"):
# Special case for the exit_event: if the event is set, return it without an error
if name == "exit_event":
exit_event = object.__getattribute__(self, "exit_event")
if exit_event and exit_event.is_set(): return exit_event
raise RuntimeError("State must be initialized before accessing '%s'" % name)
return object.__getattribute__(self, name)
# Must call `.initialize` before use
state = State()

BIN
state.pyc

Binary file not shown.