or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/kserve@0.16.x

docs

index.md
tile.json

tessl/pypi-kserve

tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.

testing-strategies.mddocs/guides/

Testing Strategies

Comprehensive testing strategies for KServe model servers including unit testing, integration testing, mocking, and continuous integration.

Unit Testing

Testing Model Logic

import pytest
from kserve import Model
from kserve.errors import InvalidInput, ModelNotReady
import numpy as np

class TestableModel(Model):
    def load(self):
        # Use mock model for testing
        self.model = MockModel()
        self.ready = True
    
    def predict(self, payload, headers=None):
        if not self.ready:
            raise ModelNotReady(self.name)
        
        instances = payload.get("instances")
        if instances is None:
            raise InvalidInput("Missing 'instances' in payload")
        
        predictions = self.model.predict(instances)
        return {"predictions": predictions.tolist()}

# Test cases
def test_model_load():
    """Test model loading"""
    model = TestableModel("test-model")
    model.load()
    
    assert model.ready is True
    assert model.model is not None

def test_predict_success():
    """Test successful prediction"""
    model = TestableModel("test-model")
    model.load()
    
    result = model.predict({"instances": [[1, 2, 3, 4]]})
    
    assert "predictions" in result
    assert isinstance(result["predictions"], list)
    assert len(result["predictions"]) == 1

def test_predict_invalid_input():
    """Test prediction with invalid input"""
    model = TestableModel("test-model")
    model.load()
    
    with pytest.raises(InvalidInput):
        model.predict({"invalid_key": [[1, 2, 3]]})

def test_predict_not_ready():
    """Test prediction when model not ready"""
    model = TestableModel("test-model")
    # Don't call load()
    
    with pytest.raises(ModelNotReady):
        model.predict({"instances": [[1, 2, 3, 4]]})

def test_predict_empty_instances():
    """Test prediction with empty instances"""
    model = TestableModel("test-model")
    model.load()
    
    with pytest.raises(InvalidInput):
        model.predict({"instances": []})

Testing Preprocessing

import pytest
import numpy as np
from kserve import Model

class PreprocessingModel(Model):
    def preprocess(self, body, headers=None):
        instances = np.array(body["instances"])
        # Normalize
        normalized = (instances - self.mean) / self.std
        return {"instances": normalized.tolist()}

def test_preprocess():
    """Test preprocessing logic"""
    model = PreprocessingModel("test-model")
    model.mean = np.array([5.0, 3.0, 3.5, 1.0])
    model.std = np.array([1.0, 0.5, 1.5, 0.5])
    
    input_data = {"instances": [[5.0, 3.0, 3.5, 1.0]]}
    result = model.preprocess(input_data)
    
    # Should be normalized to zeros
    assert np.allclose(result["instances"], [[0, 0, 0, 0]])

def test_preprocess_batch():
    """Test preprocessing with batch"""
    model = PreprocessingModel("test-model")
    model.mean = np.array([0.0])
    model.std = np.array([1.0])
    
    input_data = {"instances": [[1.0], [2.0], [3.0]]}
    result = model.preprocess(input_data)
    
    assert len(result["instances"]) == 3

Testing Postprocessing

import pytest
from kserve import Model

class PostprocessingModel(Model):
    def postprocess(self, response, headers=None):
        return {
            **response,
            "model_name": self.name,
            "model_version": "1.0.0"
        }

def test_postprocess():
    """Test postprocessing adds metadata"""
    model = PostprocessingModel("test-model")
    
    input_response = {"predictions": [[0.8, 0.2]]}
    result = model.postprocess(input_response)
    
    assert "predictions" in result
    assert result["model_name"] == "test-model"
    assert result["model_version"] == "1.0.0"

Integration Testing

Testing Model Server

import pytest
import asyncio
from kserve import ModelServer, InferenceRESTClient
from multiprocessing import Process
import time

def start_server(model):
    """Start model server in subprocess"""
    ModelServer().start([model])

@pytest.fixture
def model_server():
    """Fixture to start and stop model server"""
    model = TestableModel("test-model")
    model.load()
    
    # Start server in background process
    process = Process(target=start_server, args=(model,))
    process.start()
    
    # Wait for server to be ready
    time.sleep(2)
    
    yield "http://localhost:8080"
    
    # Cleanup
    process.terminate()
    process.join()

@pytest.mark.asyncio
async def test_server_health(model_server):
    """Test server health endpoints"""
    client = InferenceRESTClient()
    
    is_live = await client.is_server_live(base_url=model_server)
    assert is_live is True
    
    is_ready = await client.is_server_ready(base_url=model_server)
    assert is_ready is True
    
    await client.close()

@pytest.mark.asyncio
async def test_server_inference(model_server):
    """Test end-to-end inference"""
    client = InferenceRESTClient()
    
    response = await client.infer(
        base_url=model_server,
        model_name="test-model",
        data={
            "inputs": [{
                "name": "input-0",
                "shape": [1, 4],
                "datatype": "FP32",
                "data": [[1.0, 2.0, 3.0, 4.0]]
            }]
        }
    )
    
    assert "outputs" in response or "predictions" in response
    await client.close()

Testing gRPC Client

import pytest
import asyncio
from kserve import InferenceGRPCClient, InferInput, InferRequest
import numpy as np

@pytest.mark.asyncio
async def test_grpc_inference():
    """Test gRPC inference"""
    client = InferenceGRPCClient(url="localhost:8081")
    
    # Prepare input
    data = np.array([[1, 2, 3, 4]], dtype=np.float32)
    input_tensor = InferInput(
        name="input-0",
        shape=list(data.shape),
        datatype="FP32"
    )
    input_tensor.set_data_from_numpy(data)
    
    # Create request
    request = InferRequest(
        model_name="test-model",
        infer_inputs=[input_tensor]
    )
    
    # Make inference
    response = await client.infer(request)
    
    assert len(response.outputs) > 0
    assert response.model_name == "test-model"
    
    await client.close()

Testing Kubernetes Operations

import pytest
from kserve import KServeClient, V1beta1InferenceService
from kubernetes.client.rest import ApiException

def test_create_inferenceservice():
    """Test creating InferenceService"""
    client = KServeClient()
    
    isvc = V1beta1InferenceService(
        api_version="serving.kserve.io/v1beta1",
        kind="InferenceService",
        metadata={"name": "test-isvc", "namespace": "default"},
        spec={...}
    )
    
    try:
        result = client.create(isvc, namespace="default")
        assert result.metadata.name == "test-isvc"
    finally:
        # Cleanup
        client.delete("test-isvc", namespace="default")

def test_get_inferenceservice():
    """Test getting InferenceService"""
    client = KServeClient()
    
    isvc = client.get("sklearn-iris", namespace="default")
    assert isvc.metadata.name == "sklearn-iris"
    assert isvc.spec.predictor is not None

def test_inferenceservice_not_found():
    """Test getting non-existent InferenceService"""
    client = KServeClient()
    
    with pytest.raises(ApiException) as exc_info:
        client.get("non-existent", namespace="default")
    
    assert exc_info.value.status == 404

Mocking Strategies

Mocking Model Inference

from unittest.mock import Mock, patch
import pytest

def test_model_with_mock():
    """Test model using mock"""
    model = TestableModel("test-model")
    
    # Mock the underlying model
    model.model = Mock()
    model.model.predict.return_value = np.array([[0.8, 0.2]])
    model.ready = True
    
    # Test prediction
    result = model.predict({"instances": [[1, 2, 3, 4]]})
    
    # Verify mock was called
    model.model.predict.assert_called_once()
    assert result["predictions"] == [[0.8, 0.2]]

@patch('joblib.load')
def test_model_load_with_patch(mock_load):
    """Test model loading with patched joblib"""
    mock_model = Mock()
    mock_load.return_value = mock_model
    
    model = TestableModel("test-model")
    model.load()
    
    assert model.model == mock_model
    assert model.ready is True
    mock_load.assert_called_once()

Mocking HTTP Clients

import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
@patch('httpx.AsyncClient')
async def test_rest_client_with_mock(mock_client):
    """Test REST client with mocked HTTP"""
    # Setup mock
    mock_response = AsyncMock()
    mock_response.json.return_value = {"predictions": [[0.8, 0.2]]}
    mock_response.status_code = 200
    
    mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
    
    # Test client
    from kserve import InferenceRESTClient
    client = InferenceRESTClient()
    
    response = await client.infer(
        base_url="http://localhost:8080",
        model_name="test-model",
        data={"instances": [[1, 2, 3, 4]]}
    )
    
    assert "predictions" in response

Test Fixtures

Pytest Fixtures

import pytest
from kserve import Model, ModelServer
import numpy as np

@pytest.fixture
def mock_model():
    """Fixture for mock model"""
    model = Mock()
    model.predict.return_value = np.array([[0.8, 0.2]])
    return model

@pytest.fixture
def test_model(mock_model):
    """Fixture for test model instance"""
    model = TestableModel("test-model")
    model.model = mock_model
    model.ready = True
    return model

@pytest.fixture
def sample_payload():
    """Fixture for sample input payload"""
    return {"instances": [[5.1, 3.5, 1.4, 0.2]]}

def test_with_fixtures(test_model, sample_payload):
    """Test using fixtures"""
    result = test_model.predict(sample_payload)
    assert "predictions" in result

Continuous Integration

GitHub Actions Example

name: Test KServe Model

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    
    steps:
      - uses: actions/checkout@v2
      
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.10'
      
      - name: Install dependencies
        run: |
          pip install kserve pytest pytest-asyncio
          pip install -r requirements.txt
      
      - name: Run unit tests
        run: pytest tests/unit -v
      
      - name: Run integration tests
        run: pytest tests/integration -v
      
      - name: Generate coverage report
        run: |
          pip install pytest-cov
          pytest --cov=. --cov-report=xml
      
      - name: Upload coverage
        uses: codecov/codecov-action@v2

Performance Testing

Load Testing

import asyncio
import time
from kserve import InferenceRESTClient

async def benchmark_inference(num_requests: int = 1000):
    """Benchmark inference performance"""
    client = InferenceRESTClient()
    
    start_time = time.time()
    tasks = []
    
    for i in range(num_requests):
        task = client.infer(
            base_url="http://localhost:8080",
            model_name="test-model",
            data={"instances": [[1, 2, 3, 4]]}
        )
        tasks.append(task)
    
    # Execute all requests
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    elapsed = time.time() - start_time
    successful = sum(1 for r in results if not isinstance(r, Exception))
    failed = len(results) - successful
    
    print(f"Total requests: {num_requests}")
    print(f"Successful: {successful}")
    print(f"Failed: {failed}")
    print(f"Total time: {elapsed:.2f}s")
    print(f"Requests/sec: {num_requests/elapsed:.2f}")
    print(f"Avg latency: {elapsed/num_requests*1000:.2f}ms")
    
    await client.close()

asyncio.run(benchmark_inference())

Stress Testing

import asyncio
from kserve import InferenceRESTClient

async def stress_test(duration_seconds: int = 60, concurrent_requests: int = 100):
    """Stress test with concurrent requests"""
    client = InferenceRESTClient()
    
    async def make_request():
        try:
            await client.infer(
                base_url="http://localhost:8080",
                model_name="test-model",
                data={"instances": [[1, 2, 3, 4]]}
            )
            return "success"
        except Exception as e:
            return f"error: {e}"
    
    start_time = time.time()
    results = {"success": 0, "error": 0}
    
    while time.time() - start_time < duration_seconds:
        # Launch concurrent requests
        tasks = [make_request() for _ in range(concurrent_requests)]
        batch_results = await asyncio.gather(*tasks)
        
        # Count results
        for r in batch_results:
            if r == "success":
                results["success"] += 1
            else:
                results["error"] += 1
    
    print(f"Stress test completed:")
    print(f"  Successful: {results['success']}")
    print(f"  Failed: {results['error']}")
    print(f"  Success rate: {results['success']/(results['success']+results['error'])*100:.2f}%")
    
    await client.close()

asyncio.run(stress_test())

Test Organization

Directory Structure

tests/
├── unit/
│   ├── test_model.py
│   ├── test_preprocessing.py
│   ├── test_postprocessing.py
│   └── test_validation.py
├── integration/
│   ├── test_server.py
│   ├── test_clients.py
│   └── test_kubernetes.py
├── performance/
│   ├── test_load.py
│   └── test_stress.py
├── fixtures/
│   ├── models.py
│   └── data.py
└── conftest.py

conftest.py Example

import pytest
from kserve import Model
import numpy as np

@pytest.fixture(scope="session")
def test_data():
    """Generate test data"""
    return {
        "instances": np.random.rand(10, 4).tolist()
    }

@pytest.fixture
def mock_model():
    """Mock model for testing"""
    from unittest.mock import Mock
    model = Mock()
    model.predict.return_value = np.array([[0.8, 0.15, 0.05]])
    return model

@pytest.fixture
async def rest_client():
    """REST client fixture with cleanup"""
    from kserve import InferenceRESTClient
    client = InferenceRESTClient()
    yield client
    await client.close()

@pytest.fixture
async def grpc_client():
    """gRPC client fixture with cleanup"""
    from kserve import InferenceGRPCClient
    client = InferenceGRPCClient(url="localhost:8081")
    yield client
    await client.close()

Testing Async Code

Async Test Examples

import pytest
import asyncio
from kserve import Model

class AsyncModel(Model):
    async def load(self):
        """Async model loading"""
        await asyncio.sleep(0.1)  # Simulate async operation
        self.model = MockModel()
        self.ready = True
    
    async def predict(self, payload, headers=None):
        """Async prediction"""
        instances = payload["instances"]
        result = await self.async_predict(instances)
        return {"predictions": result}

@pytest.mark.asyncio
async def test_async_load():
    """Test async model loading"""
    model = AsyncModel("async-model")
    await model.load()
    
    assert model.ready is True

@pytest.mark.asyncio
async def test_async_predict():
    """Test async prediction"""
    model = AsyncModel("async-model")
    await model.load()
    
    result = await model.predict({"instances": [[1, 2, 3, 4]]})
    assert "predictions" in result

@pytest.mark.asyncio
async def test_concurrent_predictions():
    """Test concurrent async predictions"""
    model = AsyncModel("async-model")
    await model.load()
    
    # Make 10 concurrent predictions
    tasks = [
        model.predict({"instances": [[i, i+1, i+2, i+3]]})
        for i in range(10)
    ]
    
    results = await asyncio.gather(*tasks)
    assert len(results) == 10

Testing Error Scenarios

Error Handling Tests

import pytest
from kserve import Model
from kserve.errors import InvalidInput, InferenceError

def test_invalid_input_type():
    """Test error on invalid input type"""
    model = TestableModel("test-model")
    model.load()
    
    with pytest.raises(InvalidInput) as exc_info:
        model.predict({"instances": "not a list"})
    
    assert "must be a list" in str(exc_info.value)

def test_inference_error():
    """Test inference error handling"""
    model = TestableModel("test-model")
    model.load()
    
    # Mock model to raise exception
    model.model.predict.side_effect = ValueError("Invalid value")
    
    with pytest.raises(InferenceError) as exc_info:
        model.predict({"instances": [[1, 2, 3, 4]]})
    
    assert "Prediction failed" in str(exc_info.value)

def test_batch_size_limit():
    """Test batch size validation"""
    model = TestableModel("test-model")
    model.load()
    
    # Create oversized batch
    large_batch = {"instances": [[1, 2, 3, 4]] * 100}
    
    with pytest.raises(InvalidInput) as exc_info:
        model.predict(large_batch)
    
    assert "exceeds maximum" in str(exc_info.value)

Test Coverage

Measuring Coverage

# Install coverage tools
pip install pytest-cov

# Run tests with coverage
pytest --cov=. --cov-report=html --cov-report=term

# View HTML report
open htmlcov/index.html

Coverage Configuration

# .coveragerc
[run]
source = .
omit =
    tests/*
    setup.py
    */__pycache__/*

[report]
exclude_lines =
    pragma: no cover
    def __repr__
    raise AssertionError
    raise NotImplementedError
    if __name__ == .__main__.:

Best Practices

1. Test Isolation

import pytest

@pytest.fixture(autouse=True)
def reset_state():
    """Reset global state between tests"""
    # Setup
    yield
    # Teardown
    # Reset any global state

2. Parameterized Tests

import pytest

@pytest.mark.parametrize("input_data,expected", [
    ([[1, 2, 3, 4]], [[0.8, 0.15, 0.05]]),
    ([[5, 6, 7, 8]], [[0.1, 0.2, 0.7]]),
    ([[2, 3, 4, 5]], [[0.5, 0.3, 0.2]]),
])
def test_predictions_parametrized(input_data, expected):
    """Test with multiple input/output pairs"""
    model = TestableModel("test-model")
    model.load()
    
    result = model.predict({"instances": input_data})
    assert result["predictions"] == expected

3. Test Data Fixtures

import pytest
import numpy as np

@pytest.fixture
def iris_data():
    """Iris dataset for testing"""
    return {
        "instances": [
            [5.1, 3.5, 1.4, 0.2],
            [4.9, 3.0, 1.4, 0.2],
            [6.2, 2.9, 4.3, 1.3]
        ],
        "expected": [0, 0, 1]
    }

def test_with_iris_data(iris_data):
    """Test using iris data fixture"""
    model = TestableModel("test-model")
    model.load()
    
    result = model.predict({"instances": iris_data["instances"]})
    # Verify predictions

4. Async Test Utilities

import pytest
import asyncio

@pytest.fixture
def event_loop():
    """Create event loop for async tests"""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

@pytest.mark.asyncio
async def test_with_timeout():
    """Test with timeout"""
    async with asyncio.timeout(5):  # 5 second timeout
        result = await long_running_operation()
        assert result is not None

Next Steps

  • Performance Optimization - Optimize for production
  • Edge Cases - Advanced error handling
  • Troubleshooting - Common issues