feat: basic implementation of standardized CB2RI communication API
Based on the N25B-A-14 article, this is a stub implementation of the RI2CB communication API. It implements the ping endpoint and provides a stub for the negotiation endpoint. ref: N25B-168
This commit is contained in:
67
src/robot_interface/main.py
Normal file
67
src/robot_interface/main.py
Normal file
@@ -0,0 +1,67 @@
|
||||
import zmq
|
||||
|
||||
|
||||
def handle_ping(message):
|
||||
"""A simple ping endpoint. Returns the provided data."""
|
||||
return {"endpoint": "ping", "data": message.get("data")}
|
||||
|
||||
|
||||
def handle_negotiation(message):
|
||||
"""
|
||||
Handle a negotiation request. Will respond with ports that can be used to connect to the robot.
|
||||
|
||||
:param message: The negotiation request message.
|
||||
:type message: dict
|
||||
|
||||
:return: A response dictionary with a 'ports' key containing a list of ports and their function.
|
||||
:rtype: dict[str, list[dict]]
|
||||
"""
|
||||
# TODO: .../error on all endpoints?
|
||||
return {"endpoint": "negotiation/error", "data": "The requested endpoint is not implemented."}
|
||||
|
||||
|
||||
def route_request(message):
|
||||
"""
|
||||
Handle a request message.
|
||||
|
||||
:param message: The request message.
|
||||
:type message: dict
|
||||
|
||||
:return: A response message.
|
||||
:rtype: dict
|
||||
"""
|
||||
print("Received request: {}".format(message))
|
||||
|
||||
if "endpoint" not in message:
|
||||
return {"endpoint": "error", "data": "No endpoint provided."}
|
||||
if message["endpoint"] == "ping":
|
||||
return handle_ping(message)
|
||||
elif message["endpoint"] == "negotiation":
|
||||
return handle_negotiation(message)
|
||||
|
||||
return {"endpoint": "error", "data": "The requested endpoint is not implemented."}
|
||||
|
||||
|
||||
def main_loop(socket):
|
||||
while True:
|
||||
request = socket.recv_json()
|
||||
response = route_request(request)
|
||||
socket.send_json(response)
|
||||
|
||||
|
||||
def main():
|
||||
context = zmq.Context()
|
||||
socket = context.socket(zmq.REP)
|
||||
socket.connect("tcp://localhost:5555")
|
||||
|
||||
try:
|
||||
main_loop(socket)
|
||||
except KeyboardInterrupt:
|
||||
print("User interrupted.")
|
||||
finally:
|
||||
socket.close()
|
||||
context.term()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user