Files
pepperplus-ri/src/robot_interface/utils/timeblock.py
Twirre Meulenbelt 4c3aa3a911 feat: adapt actuation receiver to state's qi_session
Makes actuation tests pass. In main, the timing of the socket no longer contains the time to receive and send data, but only the processing time of the message handler.

ref: N25B-168
2025-10-16 21:46:46 +02:00

32 lines
1.2 KiB
Python

import time
class TimeBlock(object):
"""
A context manager that times the execution of the block it contains. If execution exceeds the
limit, or if no limit is given, the callback will be called with the time that the block took.
"""
def __init__(self, callback, limit_ms=None):
"""
:param callback: The callback function that is called when the block of code is over,
unless the code block did not exceed the time limit.
:type callback: Callable[[float], None]
:param limit_ms: The number of milliseconds the block of code is allowed to take. If it
exceeds this time, or if it's None, the callback function will be called with the time the
block took.
:type limit_ms: int | None
"""
self.limit_ms = float(limit_ms) if limit_ms is not None else None
self.callback = callback
self.start = None
def __enter__(self):
self.start = time.time()
return self
def __exit__(self, exc_type, exc_value, traceback):
elapsed = (time.time() - self.start) * 1000.0 # ms
if self.limit_ms is None or elapsed > self.limit_ms:
self.callback(elapsed)