Use ZMQ's global context instance and set up an XPUB/XSUB proxy intermediary to allow for easier multi-pubs. close: N25B-217
94 lines
2.9 KiB
Python
94 lines
2.9 KiB
Python
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
from control_backend.api.v1.endpoints import command
|
|
from control_backend.schemas.ri_message import SpeechCommand
|
|
|
|
|
|
@pytest.fixture
def app():
    """
    Build a minimal FastAPI application that serves only the command router.

    No ZeroMQ socket is mocked by this fixture; a test that needs the socket
    mocked has to patch it itself.
    """
    # Named ``test_app`` so the local does not shadow this fixture function.
    test_app = FastAPI()
    test_app.include_router(command.router)
    return test_app
|
|
|
|
|
|
@pytest.fixture
def client(app):
    """Return a ``TestClient`` wrapping the application from the ``app`` fixture."""
    test_client = TestClient(app)
    return test_client
|
|
|
|
|
|
@pytest.mark.asyncio
@patch("control_backend.api.v1.endpoints.command.Context.instance")
async def test_receive_command_success(mock_context_instance, async_client):
    """
    Test for successful reception of a command.

    Ensures the status code is 202 and the response body is correct.
    It also verifies that the ZeroMQ socket's send_multipart method is awaited
    with the expected frames.

    The patch target uses the same module path this file imports ``command``
    from (``control_backend.api.v1.endpoints``), so the name that is replaced
    is the one the endpoint module looks up.

    NOTE(review): ``async_client`` is not defined in this module; presumably it
    is provided by a shared conftest.py -- confirm.
    """
    # Arrange: any socket created from the patched global context is this mock.
    mock_pub_socket = AsyncMock()
    mock_context_instance.return_value.socket.return_value = mock_pub_socket

    # Uses the payload shape that the sibling endpoint test documents as
    # matching SpeechCommand ("endpoint" + "data").
    # NOTE(review): verify the field names against the SpeechCommand schema.
    command_data = {"endpoint": "actuate/speech", "data": "This is a test"}
    speech_command = SpeechCommand(**command_data)

    # Act
    response = await async_client.post("/command", json=command_data)

    # Assert
    assert response.status_code == 202
    assert response.json() == {"status": "Command received"}

    # Verify that the ZMQ socket was used correctly
    zmq_pub_socket_type = 1  # zmq.PUB is 1
    mock_context_instance.return_value.socket.assert_called_once_with(zmq_pub_socket_type)
    mock_pub_socket.connect.assert_called_once()
    mock_pub_socket.send_multipart.assert_awaited_once_with(
        [b"command", speech_command.model_dump_json().encode()]
    )
|
|
|
|
|
|
def test_receive_command_endpoint(client, app, mocker):
    """
    Test that a POST to /command sends the right multipart message
    and returns a 202 with the expected JSON body.

    The ZeroMQ context used by the endpoint module is patched so that the
    socket it hands out is a mock whose calls can be inspected.
    """
    # ``mocker.patch.object`` needs a target and an attribute name; patch the
    # context accessor on the endpoint module's ``Context`` and make every
    # socket it creates an AsyncMock so ``send_multipart`` can be awaited.
    # NOTE(review): assumes the endpoint obtains its socket via
    # ``Context.instance().socket(...)`` at request time -- confirm.
    mock_socket = AsyncMock()
    mock_context_instance = mocker.patch.object(command.Context, "instance")
    mock_context_instance.return_value.socket.return_value = mock_socket

    # Prepare test payload that matches SpeechCommand
    payload = {"endpoint": "actuate/speech", "data": "yooo"}

    # Send POST request
    response = client.post("/command", json=payload)

    # Check response
    assert response.status_code == 202
    assert response.json() == {"status": "Command received"}

    # Verify that the socket was called with the correct data
    assert mock_socket.send_multipart.called, "Socket should be used to send data"

    # The first positional argument is the list of message frames.
    sent_data = mock_socket.send_multipart.call_args.args[0]

    assert sent_data[0] == b"command"
    # Check JSON encoding roughly matches
    assert isinstance(SpeechCommand.model_validate_json(sent_data[1].decode()), SpeechCommand)
|
|
|
|
|
|
def test_receive_command_invalid_payload(client):
    """
    Verify that a request body failing schema validation is rejected.
    """
    # The body lacks the required field(s), so it should fail validation.
    response = client.post("/command", json={"invalid": "data"})

    expected_status = 422  # validation error
    assert response.status_code == expected_status
|