WIP: Draft: feat: implemented video test function #26

Closed
s.o.h.luijkx wants to merge 1 commits from feat/video-test-function into dev
s.o.h.luijkx commented 2026-01-22 10:59:32 +00:00 (Migrated from git.science.uu.nl)

--- ON HOLD DUE TO PIPELINE ISSUES ---

If there is no qi session, the webcam of the device is used to send video.

Verify by:

  • Read new code
  • Run tests
  • Verify receiving correct video data by running script below
  1. Make a new file in CB src/control_backend with the content below.
  2. Run the RI.
  3. In the CB terminal, run 'PYTHONPATH=src uv run -m src.control_backend.<name_of_new_file>' (no .py extension).
    A window should open showing your webcam video.
import zmq
import cv2
import numpy as np

# --- CONFIGURATION ---
PORT = 5556
# ---------------------

def receive_video_stream():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    
    # Connect to the sender
    # If the sender is on another machine, change 'localhost' to its IP address
    socket.connect("tcp://localhost:{}".format(PORT))
    
    # Important: You must subscribe to the topic (empty string = all topics)
    socket.setsockopt(zmq.SUBSCRIBE, b"")
    
    # Optional: Keep only the last message to prevent lag (matches sender's CONFLATE)
    socket.setsockopt(zmq.CONFLATE, 1)

    print("Listening for video stream on port {}... Press 'q' to exit.".format(PORT))

    while True:
        try:
            # 1. Receive the raw bytes (JPEG data)
            jpg_buffer = socket.recv()

            # 2. Convert raw string/bytes to numpy array
            np_arr = np.frombuffer(jpg_buffer, dtype=np.uint8)

            # 3. Decode the JPEG image back to a frame
            frame = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)

            if frame is None:
                print("Received data, but could not decode image.")
                continue

            # 4. Display the frame
            cv2.imshow("Receiver Test", frame)

            # Exit if 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        except KeyboardInterrupt:
            print("\nStopping manually...")
            break
        except Exception as e:
            print("Error: {}".format(e))
            break

    # Cleanup
    socket.close()
    context.term()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    receive_video_stream()
--- ON HOLD DUE TO PIPELINE ISSUES --- If there is no qi session, the webcam of the device is used to send video. Verify by: - [ ] Read new code - [ ] Run tests - [ ] Verify receiving correct video data by running script below 1. Make a new file in CB src/control_backend with the content below. 2. Run the RI. 3. In the CB terminal, run 'PYTHONPATH=src uv run -m src.control_backend.<name_of_new_file>' (no .py extension). A window should open showing your webcam video. ``` import zmq import cv2 import numpy as np # --- CONFIGURATION --- PORT = 5556 # --------------------- def receive_video_stream(): context = zmq.Context() socket = context.socket(zmq.SUB) # Connect to the sender # If the sender is on another machine, change 'localhost' to its IP address socket.connect("tcp://localhost:{}".format(PORT)) # Important: You must subscribe to the topic (empty string = all topics) socket.setsockopt(zmq.SUBSCRIBE, b"") # Optional: Keep only the last message to prevent lag (matches sender's CONFLATE) socket.setsockopt(zmq.CONFLATE, 1) print("Listening for video stream on port {}... Press 'q' to exit.".format(PORT)) while True: try: # 1. Receive the raw bytes (JPEG data) jpg_buffer = socket.recv() # 2. Convert raw string/bytes to numpy array np_arr = np.frombuffer(jpg_buffer, dtype=np.uint8) # 3. Decode the JPEG image back to a frame frame = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) if frame is None: print("Received data, but could not decode image.") continue # 4. Display the frame cv2.imshow("Receiver Test", frame) # Exit if 'q' is pressed if cv2.waitKey(1) & 0xFF == ord('q'): break except KeyboardInterrupt: print("\nStopping manually...") break except Exception as e: print("Error: {}".format(e)) break # Cleanup socket.close() context.term() cv2.destroyAllWindows() if __name__ == "__main__": receive_video_stream() ```
s.o.h.luijkx commented 2026-01-22 11:15:47 +00:00 (Migrated from git.science.uu.nl)

changed the description

changed the description
0950726 commented 2026-01-22 11:59:32 +00:00 (Migrated from git.science.uu.nl)

marked this merge request as draft

marked this merge request as **draft**
k.marinus (Migrated from git.science.uu.nl) closed this pull request 2026-01-29 13:21:03 +00:00
k.marinus commented 2026-01-29 13:21:32 +00:00 (Migrated from git.science.uu.nl)

Will not be implemented since no time for cv2 support on RI

Will not be implemented since no time for cv2 support on RI

Pull request closed

Sign in to join this conversation.
No Reviewers
No Label
1 Participants
Notifications
Due Date
No due date set.
Dependencies

No dependencies set.

Reference: pepperplus/pepperplus-ri#26