```bash
tessl install tessl/pypi-kserve@0.16.1
```

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes. This guide covers testing strategies for KServe model servers: unit testing, integration testing, mocking, performance testing, and continuous integration.
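Several of the snippets below use a `MockModel` stand-in that is never defined in the examples themselves. A minimal sketch (the class body and its fixed probability output are placeholders; adjust the output shape to match your real model) could look like this:

```python
import numpy as np


class MockModel:
    """Hypothetical stand-in for a real model, used by the test examples below."""

    def predict(self, instances):
        # Return one fixed probability row per input instance
        return np.array([[0.8, 0.15, 0.05]] * len(instances))
```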
## Unit Testing

Subclass `Model` and swap in a mock backend so `load` and `predict` can be exercised without loading real weights:

```python
import pytest
import numpy as np
from kserve import Model
from kserve.errors import InferenceError, InvalidInput, ModelNotReady


class TestableModel(Model):
    MAX_BATCH_SIZE = 32

    def load(self):
        # Use a mock model for testing (see the MockModel sketch above)
        self.model = MockModel()
        self.ready = True

    def predict(self, payload, headers=None):
        if not self.ready:
            raise ModelNotReady(self.name)
        # Input checks below are also exercised by the error-handling tests later in this guide
        instances = payload.get("instances")
        if not instances:
            raise InvalidInput("Missing or empty 'instances' in payload")
        if not isinstance(instances, list):
            raise InvalidInput("'instances' must be a list")
        if len(instances) > self.MAX_BATCH_SIZE:
            raise InvalidInput(
                f"Batch size {len(instances)} exceeds maximum {self.MAX_BATCH_SIZE}"
            )
        try:
            predictions = self.model.predict(instances)
        except Exception as e:
            raise InferenceError(f"Prediction failed: {e}") from e
        return {"predictions": predictions.tolist()}


# Test cases
def test_model_load():
    """Test model loading"""
    model = TestableModel("test-model")
    model.load()
    assert model.ready is True
    assert model.model is not None


def test_predict_success():
    """Test successful prediction"""
    model = TestableModel("test-model")
    model.load()
    result = model.predict({"instances": [[1, 2, 3, 4]]})
    assert "predictions" in result
    assert isinstance(result["predictions"], list)
    assert len(result["predictions"]) == 1


def test_predict_invalid_input():
    """Test prediction with invalid input"""
    model = TestableModel("test-model")
    model.load()
    with pytest.raises(InvalidInput):
        model.predict({"invalid_key": [[1, 2, 3]]})


def test_predict_not_ready():
    """Test prediction when the model is not ready"""
    model = TestableModel("test-model")
    # Don't call load()
    with pytest.raises(ModelNotReady):
        model.predict({"instances": [[1, 2, 3, 4]]})


def test_predict_empty_instances():
    """Test prediction with empty instances"""
    model = TestableModel("test-model")
    model.load()
    with pytest.raises(InvalidInput):
        model.predict({"instances": []})
```
### Testing Preprocessing

```python
import numpy as np
import pytest
from kserve import Model


class PreprocessingModel(Model):
    def preprocess(self, body, headers=None):
        instances = np.array(body["instances"])
        # Normalize with per-feature mean and standard deviation
        normalized = (instances - self.mean) / self.std
        return {"instances": normalized.tolist()}


def test_preprocess():
    """Test preprocessing logic"""
    model = PreprocessingModel("test-model")
    model.mean = np.array([5.0, 3.0, 3.5, 1.0])
    model.std = np.array([1.0, 0.5, 1.5, 0.5])
    input_data = {"instances": [[5.0, 3.0, 3.5, 1.0]]}
    result = model.preprocess(input_data)
    # Inputs equal to the mean should normalize to zeros
    assert np.allclose(result["instances"], [[0, 0, 0, 0]])


def test_preprocess_batch():
    """Test preprocessing with a batch"""
    model = PreprocessingModel("test-model")
    model.mean = np.array([0.0])
    model.std = np.array([1.0])
    input_data = {"instances": [[1.0], [2.0], [3.0]]}
    result = model.preprocess(input_data)
    assert len(result["instances"]) == 3
```
### Testing Postprocessing

```python
import pytest
from kserve import Model


class PostprocessingModel(Model):
    def postprocess(self, response, headers=None):
        # Attach model metadata to every response
        return {
            **response,
            "model_name": self.name,
            "model_version": "1.0.0",
        }


def test_postprocess():
    """Test that postprocessing adds metadata"""
    model = PostprocessingModel("test-model")
    input_response = {"predictions": [[0.8, 0.2]]}
    result = model.postprocess(input_response)
    assert "predictions" in result
    assert result["model_name"] == "test-model"
    assert result["model_version"] == "1.0.0"
```
## Integration Testing

Start a real `ModelServer` in a subprocess and exercise it over HTTP:

```python
import time
from multiprocessing import Process

import pytest
from kserve import ModelServer, InferenceRESTClient


def start_server(model):
    """Start the model server in a subprocess"""
    ModelServer().start([model])


@pytest.fixture
def model_server():
    """Fixture that starts and stops a model server"""
    # TestableModel as defined in the unit-testing example above
    model = TestableModel("test-model")
    model.load()
    # Start the server in a background process
    process = Process(target=start_server, args=(model,))
    process.start()
    # Wait for the server to come up
    time.sleep(2)
    yield "http://localhost:8080"
    # Cleanup
    process.terminate()
    process.join()


@pytest.mark.asyncio
async def test_server_health(model_server):
    """Test server health endpoints"""
    client = InferenceRESTClient()
    is_live = await client.is_server_live(base_url=model_server)
    assert is_live is True
    is_ready = await client.is_server_ready(base_url=model_server)
    assert is_ready is True
    await client.close()


@pytest.mark.asyncio
async def test_server_inference(model_server):
    """Test end-to-end inference"""
    client = InferenceRESTClient()
    response = await client.infer(
        base_url=model_server,
        model_name="test-model",
        data={
            "inputs": [{
                "name": "input-0",
                "shape": [1, 4],
                "datatype": "FP32",
                "data": [[1.0, 2.0, 3.0, 4.0]]
            }]
        }
    )
    assert "outputs" in response or "predictions" in response
    await client.close()
```
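The fixture above waits a fixed `time.sleep(2)` for the server to come up, which can be flaky on slow machines. A sketch of a polling alternative built on the same `is_server_ready` call (the `wait_for_server` helper name and timeout are our own):

```python
import asyncio
import time

from kserve import InferenceRESTClient


async def wait_for_server(base_url: str, timeout: float = 30.0) -> bool:
    """Poll the readiness endpoint until the server responds or the timeout expires."""
    client = InferenceRESTClient()
    deadline = time.monotonic() + timeout
    try:
        while time.monotonic() < deadline:
            try:
                if await client.is_server_ready(base_url=base_url):
                    return True
            except Exception:
                pass  # server not accepting connections yet
            await asyncio.sleep(0.5)
        return False
    finally:
        await client.close()
```

The `model_server` fixture could then call `asyncio.run(wait_for_server("http://localhost:8080"))` in place of the fixed sleep.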
### gRPC Inference Testing

```python
import numpy as np
import pytest
from kserve import InferenceGRPCClient, InferInput, InferRequest


@pytest.mark.asyncio
async def test_grpc_inference():
    """Test gRPC inference"""
    client = InferenceGRPCClient(url="localhost:8081")
    # Prepare the input tensor
    data = np.array([[1, 2, 3, 4]], dtype=np.float32)
    input_tensor = InferInput(
        name="input-0",
        shape=list(data.shape),
        datatype="FP32"
    )
    input_tensor.set_data_from_numpy(data)
    # Create the request
    request = InferRequest(
        model_name="test-model",
        infer_inputs=[input_tensor]
    )
    # Make the inference call
    response = await client.infer(request)
    assert len(response.outputs) > 0
    assert response.model_name == "test-model"
    await client.close()
```
### Kubernetes InferenceService Testing

```python
import pytest
from kserve import KServeClient, V1beta1InferenceService
from kubernetes.client.rest import ApiException


def test_create_inferenceservice():
    """Test creating an InferenceService"""
    client = KServeClient()
    isvc = V1beta1InferenceService(
        api_version="serving.kserve.io/v1beta1",
        kind="InferenceService",
        metadata={"name": "test-isvc", "namespace": "default"},
        spec={...}  # predictor spec elided
    )
    try:
        # KServeClient returns the custom resource as a dict
        result = client.create(isvc, namespace="default")
        assert result["metadata"]["name"] == "test-isvc"
    finally:
        # Cleanup
        client.delete("test-isvc", namespace="default")


def test_get_inferenceservice():
    """Test getting an InferenceService"""
    client = KServeClient()
    isvc = client.get("sklearn-iris", namespace="default")
    assert isvc["metadata"]["name"] == "sklearn-iris"
    assert isvc["spec"]["predictor"] is not None


def test_inferenceservice_not_found():
    """Test getting a non-existent InferenceService"""
    client = KServeClient()
    with pytest.raises(ApiException) as exc_info:
        client.get("non-existent", namespace="default")
    assert exc_info.value.status == 404
```
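These Kubernetes tests talk to a real cluster and assume an existing `sklearn-iris` service. One way to keep them from failing on machines without a cluster is a skip marker; a sketch (the `requires_cluster` name and the kubeconfig check are our convention, not a KServe API):

```python
import os

import pytest

# Skip cluster-dependent tests when neither ~/.kube/config nor KUBECONFIG is present
requires_cluster = pytest.mark.skipif(
    not (os.path.exists(os.path.expanduser("~/.kube/config")) or os.getenv("KUBECONFIG")),
    reason="no Kubernetes cluster configured",
)


@requires_cluster
def test_get_inferenceservice_live_cluster():
    """Same lookup as above, skipped automatically when no cluster is reachable."""
    from kserve import KServeClient

    client = KServeClient()
    isvc = client.get("sklearn-iris", namespace="default")
    assert isvc["metadata"]["name"] == "sklearn-iris"
```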
## Mocking

Mock the underlying model so prediction logic can be verified without real weights:

```python
import numpy as np
import pytest
from unittest.mock import Mock, patch

from kserve import Model


def test_model_with_mock():
    """Test a model using a mocked backend"""
    model = TestableModel("test-model")
    # Replace the underlying model with a mock
    model.model = Mock()
    model.model.predict.return_value = np.array([[0.8, 0.2]])
    model.ready = True
    # Run a prediction against the mock
    result = model.predict({"instances": [[1, 2, 3, 4]]})
    # Verify the mock was called and its output passed through
    model.model.predict.assert_called_once()
    assert result["predictions"] == [[0.8, 0.2]]


@patch("joblib.load")
def test_model_load_with_patch(mock_load):
    """Test model loading with a patched joblib.load"""
    mock_model = Mock()
    mock_load.return_value = mock_model

    # Minimal model whose load() reads weights with joblib (illustrative)
    class JoblibModel(Model):
        def load(self):
            import joblib
            self.model = joblib.load("model.joblib")
            self.ready = True

    model = JoblibModel("test-model")
    model.load()
    assert model.model is mock_model
    assert model.ready is True
    mock_load.assert_called_once()
```
### Mocking HTTP Calls in the REST Client

```python
import pytest
from unittest.mock import AsyncMock, Mock, patch


@pytest.mark.asyncio
@patch("httpx.AsyncClient")
async def test_rest_client_with_mock(mock_client):
    """Test the REST client with mocked HTTP.

    This wiring assumes InferenceRESTClient drives requests through
    httpx.AsyncClient; adjust it to the internals of the KServe version you use.
    """
    # httpx.Response.json() is synchronous, so use a plain Mock for the response
    mock_response = Mock()
    mock_response.json.return_value = {"predictions": [[0.8, 0.2]]}
    mock_response.status_code = 200
    # post() is awaited by the client, so it must be an AsyncMock
    mock_client.return_value.__aenter__.return_value.post = AsyncMock(return_value=mock_response)

    # Test the client against the mocked transport
    from kserve import InferenceRESTClient
    client = InferenceRESTClient()
    response = await client.infer(
        base_url="http://localhost:8080",
        model_name="test-model",
        data={"instances": [[1, 2, 3, 4]]}
    )
    assert "predictions" in response
```
## Reusable Test Fixtures

```python
import numpy as np
import pytest
from unittest.mock import Mock


@pytest.fixture
def mock_model():
    """Fixture for a mocked backend model"""
    model = Mock()
    model.predict.return_value = np.array([[0.8, 0.2]])
    return model


@pytest.fixture
def test_model(mock_model):
    """Fixture for a ready-to-use test model instance"""
    model = TestableModel("test-model")
    model.model = mock_model
    model.ready = True
    return model


@pytest.fixture
def sample_payload():
    """Fixture for a sample input payload"""
    return {"instances": [[5.1, 3.5, 1.4, 0.2]]}


def test_with_fixtures(test_model, sample_payload):
    """Test using fixtures"""
    result = test_model.predict(sample_payload)
    assert "predictions" in result
```
## Continuous Integration

A GitHub Actions workflow that runs the unit and integration suites and uploads coverage:

```yaml
name: Test KServe Model

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: |
          pip install kserve pytest pytest-asyncio
          pip install -r requirements.txt
      - name: Run unit tests
        run: pytest tests/unit -v
      - name: Run integration tests
        run: pytest tests/integration -v
      - name: Generate coverage report
        run: |
          pip install pytest-cov
          pytest --cov=. --cov-report=xml
      - name: Upload coverage
        uses: codecov/codecov-action@v2
```
## Performance Testing

### Load Benchmarking

```python
import asyncio
import time

from kserve import InferenceRESTClient


async def benchmark_inference(num_requests: int = 1000):
    """Benchmark inference throughput and latency"""
    client = InferenceRESTClient()
    start_time = time.time()
    tasks = []
    for _ in range(num_requests):
        task = client.infer(
            base_url="http://localhost:8080",
            model_name="test-model",
            data={"instances": [[1, 2, 3, 4]]}
        )
        tasks.append(task)
    # Execute all requests concurrently
    results = await asyncio.gather(*tasks, return_exceptions=True)
    elapsed = time.time() - start_time

    successful = sum(1 for r in results if not isinstance(r, Exception))
    failed = len(results) - successful
    print(f"Total requests: {num_requests}")
    print(f"Successful: {successful}")
    print(f"Failed: {failed}")
    print(f"Total time: {elapsed:.2f}s")
    print(f"Requests/sec: {num_requests / elapsed:.2f}")
    print(f"Avg latency: {elapsed / num_requests * 1000:.2f}ms")
    await client.close()


if __name__ == "__main__":
    asyncio.run(benchmark_inference())
```
### Stress Testing

```python
import asyncio
import time

from kserve import InferenceRESTClient


async def stress_test(duration_seconds: int = 60, concurrent_requests: int = 100):
    """Stress test the server with batches of concurrent requests"""
    client = InferenceRESTClient()

    async def make_request():
        try:
            await client.infer(
                base_url="http://localhost:8080",
                model_name="test-model",
                data={"instances": [[1, 2, 3, 4]]}
            )
            return "success"
        except Exception as e:
            return f"error: {e}"

    start_time = time.time()
    results = {"success": 0, "error": 0}
    while time.time() - start_time < duration_seconds:
        # Launch a batch of concurrent requests
        tasks = [make_request() for _ in range(concurrent_requests)]
        batch_results = await asyncio.gather(*tasks)
        # Tally the results
        for r in batch_results:
            if r == "success":
                results["success"] += 1
            else:
                results["error"] += 1

    total = results["success"] + results["error"]
    print("Stress test completed:")
    print(f"  Successful: {results['success']}")
    print(f"  Failed: {results['error']}")
    print(f"  Success rate: {results['success'] / total * 100:.2f}%")
    await client.close()


if __name__ == "__main__":
    asyncio.run(stress_test())
```
## Test Organization

A suggested layout separating unit, integration, and performance suites:

```text
tests/
├── unit/
│   ├── test_model.py
│   ├── test_preprocessing.py
│   ├── test_postprocessing.py
│   └── test_validation.py
├── integration/
│   ├── test_server.py
│   ├── test_clients.py
│   └── test_kubernetes.py
├── performance/
│   ├── test_load.py
│   └── test_stress.py
├── fixtures/
│   ├── models.py
│   └── data.py
└── conftest.py
```
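The CI workflow above runs only `tests/unit` and `tests/integration`, leaving the performance suite to run on demand. A sketch of registering a custom marker in `conftest.py` so slow suites can be selected or excluded explicitly (the `performance` marker name is our convention):

```python
# conftest.py (addition)
def pytest_configure(config):
    # Register the marker so `pytest -m "not performance"` can exclude slow suites
    config.addinivalue_line(
        "markers", "performance: long-running load and stress tests"
    )
```

Tests under `tests/performance/` can then be decorated with `@pytest.mark.performance`.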
### Shared Fixtures in conftest.py

```python
# tests/conftest.py
import numpy as np
import pytest
import pytest_asyncio
from unittest.mock import Mock

from kserve import InferenceRESTClient, InferenceGRPCClient


@pytest.fixture(scope="session")
def test_data():
    """Generate random test data once per session"""
    return {
        "instances": np.random.rand(10, 4).tolist()
    }


@pytest.fixture
def mock_model():
    """Mock model for testing"""
    model = Mock()
    model.predict.return_value = np.array([[0.8, 0.15, 0.05]])
    return model


@pytest_asyncio.fixture
async def rest_client():
    """REST client fixture with cleanup"""
    client = InferenceRESTClient()
    yield client
    await client.close()


@pytest_asyncio.fixture
async def grpc_client():
    """gRPC client fixture with cleanup"""
    client = InferenceGRPCClient(url="localhost:8081")
    yield client
    await client.close()
```
## Testing Async Models

```python
import asyncio

import pytest
from kserve import Model


class AsyncModel(Model):
    async def load(self):
        """Async model loading"""
        await asyncio.sleep(0.1)  # Simulate an async operation
        self.model = MockModel()
        self.ready = True

    async def predict(self, payload, headers=None):
        """Async prediction"""
        instances = payload["instances"]
        result = await self.async_predict(instances)
        return {"predictions": result}

    async def async_predict(self, instances):
        # Simulate an async backend call wrapping the mock model
        await asyncio.sleep(0.01)
        return self.model.predict(instances).tolist()


@pytest.mark.asyncio
async def test_async_load():
    """Test async model loading"""
    model = AsyncModel("async-model")
    await model.load()
    assert model.ready is True


@pytest.mark.asyncio
async def test_async_predict():
    """Test async prediction"""
    model = AsyncModel("async-model")
    await model.load()
    result = await model.predict({"instances": [[1, 2, 3, 4]]})
    assert "predictions" in result


@pytest.mark.asyncio
async def test_concurrent_predictions():
    """Test concurrent async predictions"""
    model = AsyncModel("async-model")
    await model.load()
    # Make 10 concurrent predictions
    tasks = [
        model.predict({"instances": [[i, i + 1, i + 2, i + 3]]})
        for i in range(10)
    ]
    results = await asyncio.gather(*tasks)
    assert len(results) == 10
```
## Testing Error Handling

```python
import pytest
from unittest.mock import Mock

from kserve.errors import InferenceError, InvalidInput


def test_invalid_input_type():
    """Test error on invalid input type"""
    model = TestableModel("test-model")
    model.load()
    with pytest.raises(InvalidInput) as exc_info:
        model.predict({"instances": "not a list"})
    assert "must be a list" in str(exc_info.value)


def test_inference_error():
    """Test inference error handling"""
    model = TestableModel("test-model")
    model.load()
    # Swap in a mock backend that raises during prediction
    model.model = Mock()
    model.model.predict.side_effect = ValueError("Invalid value")
    with pytest.raises(InferenceError) as exc_info:
        model.predict({"instances": [[1, 2, 3, 4]]})
    assert "Prediction failed" in str(exc_info.value)


def test_batch_size_limit():
    """Test batch size validation"""
    model = TestableModel("test-model")
    model.load()
    # 100 instances exceeds the model's MAX_BATCH_SIZE
    large_batch = {"instances": [[1, 2, 3, 4]] * 100}
    with pytest.raises(InvalidInput) as exc_info:
        model.predict(large_batch)
    assert "exceeds maximum" in str(exc_info.value)
```
## Code Coverage

```bash
# Install coverage tooling
pip install pytest-cov

# Run tests with coverage
pytest --cov=. --cov-report=html --cov-report=term

# View the HTML report
open htmlcov/index.html
```
### Coverage Configuration

```ini
# .coveragerc
[run]
source = .
omit =
    tests/*
    setup.py
    */__pycache__/*

[report]
exclude_lines =
    pragma: no cover
    def __repr__
    raise AssertionError
    raise NotImplementedError
    if __name__ == .__main__.:
```
## Additional Testing Patterns

### Isolating Test State

```python
import pytest


@pytest.fixture(autouse=True)
def reset_state():
    """Reset global state between tests"""
    # Setup (runs before each test)
    yield
    # Teardown (runs after each test): reset any global state here
```
### Parametrized Tests

```python
import pytest


# Expected outputs are illustrative; replace them with your model's known outputs
@pytest.mark.parametrize("input_data,expected", [
    ([[1, 2, 3, 4]], [[0.8, 0.15, 0.05]]),
    ([[5, 6, 7, 8]], [[0.1, 0.2, 0.7]]),
    ([[2, 3, 4, 5]], [[0.5, 0.3, 0.2]]),
])
def test_predictions_parametrized(input_data, expected):
    """Test with multiple input/output pairs"""
    model = TestableModel("test-model")
    model.load()
    result = model.predict({"instances": input_data})
    assert result["predictions"] == expected
```
### Realistic Test Data

```python
import pytest


@pytest.fixture
def iris_data():
    """Iris samples with known labels for testing"""
    return {
        "instances": [
            [5.1, 3.5, 1.4, 0.2],
            [4.9, 3.0, 1.4, 0.2],
            [6.2, 2.9, 4.3, 1.3]
        ],
        "expected": [0, 0, 1]
    }


def test_with_iris_data(iris_data):
    """Test using the iris data fixture"""
    model = TestableModel("test-model")
    model.load()
    result = model.predict({"instances": iris_data["instances"]})
    # One prediction per instance; compare against iris_data["expected"]
    # when running against the real model
    assert len(result["predictions"]) == len(iris_data["instances"])
```
### Async Timeouts

```python
import asyncio

import pytest


@pytest.fixture
def event_loop():
    """Provide a fresh event loop for async tests"""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


@pytest.mark.asyncio
async def test_with_timeout():
    """Fail the test if the operation takes too long"""
    # asyncio.timeout requires Python 3.11+; use asyncio.wait_for on older versions.
    # long_running_operation is a placeholder for the coroutine under test.
    async with asyncio.timeout(5):  # 5 second timeout
        result = await long_running_operation()
    assert result is not None
```