Refactored ZMQ context implementation #16

Merged
k.marinus merged 7 commits from refactor/zmq-internal-socket-behaviour into dev 2025-11-05 11:35:27 +00:00
Showing only changes of commit 2c867adce2 - Show all commits

View File

@@ -1,4 +1,4 @@
from unittest.mock import MagicMock
from unittest.mock import AsyncMock, patch
import pytest
from fastapi import FastAPI
@@ -16,7 +16,6 @@ def app():
"""
app = FastAPI()
app.include_router(command.router)
app.state.internal_comm_socket = MagicMock() # mock ZMQ socket
return app
@@ -26,32 +25,30 @@ def client(app):
return TestClient(app)
def test_receive_command_endpoint(client, app):
def test_receive_command_success(client):
"""
Test that a POST to /command sends the right multipart message
and returns a 202 with the expected JSON body.
Test for successful reception of a command.
Ensures the status code is 202 and the response body is correct.
It also verifies that the ZeroMQ socket's send_multipart method is called with the expected data.
"""
mock_socket = app.state.internal_comm_socket
# Arrange
mock_pub_socket = AsyncMock()
client.app.state.endpoints_pub_socket = mock_pub_socket
# Prepare test payload that matches SpeechCommand
payload = {"endpoint": "actuate/speech", "data": "yooo"}
command_data = {"endpoint": "actuate/speech", "data": "This is a test"}
speech_command = SpeechCommand(**command_data)
# Send POST request
response = client.post("/command", json=payload)
# Act
response = client.post("/command", json=command_data)
# Check response
# Assert
assert response.status_code == 202
assert response.json() == {"status": "Command received"}
# Verify that the socket was called with the correct data
assert mock_socket.send_multipart.called, "Socket should be used to send data"
args, kwargs = mock_socket.send_multipart.call_args
sent_data = args[0]
assert sent_data[0] == b"command"
# Check JSON encoding roughly matches
assert isinstance(SpeechCommand.model_validate_json(sent_data[1].decode()), SpeechCommand)
# Verify that the ZMQ socket was used correctly
mock_pub_socket.send_multipart.assert_awaited_once_with(
[b"command", speech_command.model_dump_json().encode()]
)
def test_receive_command_invalid_payload(client):