tessl/pypi-kserve

tessl install tessl/pypi-kserve@0.16.1

The KServe Python SDK provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.


Design Patterns

Key design patterns for building robust and efficient KServe model servers.

Async vs Sync Methods

KServe automatically detects whether handler methods such as load and predict are coroutines and awaits them when needed, so you can implement each one as either a plain function or a coroutine.
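
Under the hood this amounts to a coroutine-function check before dispatch. A simplified sketch of the idea (not KServe's actual code):

import inspect

async def dispatch(handler, payload):
    # Await coroutine functions; call plain functions directly
    if inspect.iscoroutinefunction(handler):
        return await handler(payload)
    return handler(payload)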

Synchronous Implementation

import joblib

from kserve import Model

class SyncModel(Model):
    def load(self):
        """Synchronous loading"""
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True
    
    def predict(self, payload, headers=None):
        """Synchronous prediction"""
        instances = payload["instances"]
        predictions = self.model.predict(instances)
        return {"predictions": predictions.tolist()}

Asynchronous Implementation

from kserve import Model

class AsyncModel(Model):
    async def load(self):
        """Asynchronous loading"""
        # load_model_async() stands in for any awaitable model loader
        self.model = await load_model_async()
        self.ready = True
    
    async def predict(self, payload, headers=None):
        """Asynchronous prediction"""
        instances = payload["instances"]
        predictions = await self.model.predict_async(instances)
        return {"predictions": predictions}

Mixed Sync/Async

import asyncio

import joblib

from kserve import Model

class MixedModel(Model):
    def load(self):
        """Sync load"""
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True
    
    async def predict(self, payload, headers=None):
        """Async predict for I/O operations"""
        instances = payload["instances"]
        
        # Run CPU-bound work in a thread pool so the event loop stays responsive
        loop = asyncio.get_running_loop()
        predictions = await loop.run_in_executor(
            None,
            self.model.predict,
            instances
        )
        
        return {"predictions": predictions.tolist()}

Context Managers

Client Context Managers

from kserve import InferenceGRPCClient, InferenceRESTClient

# gRPC client with automatic cleanup
async with InferenceGRPCClient(url="localhost:8081") as client:
    response = await client.infer(model_name="my-model", inputs=[...])
    # Client automatically closed

# REST client with automatic cleanup
async with InferenceRESTClient() as client:
    response = await client.infer(
        base_url="http://localhost:8080",
        model_name="my-model",
        data={...}
    )
    # Client automatically closed

Custom Context Manager for Models

from kserve import Model

class ContextManagedModel(Model):
    def __enter__(self):
        """Setup resources"""
        self.load()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Cleanup resources"""
        self.stop()
        return False
    
    def stop(self):
        """Cleanup"""
        if hasattr(self, 'db_connection'):
            self.db_connection.close()
        if hasattr(self, 'cache_client'):
            self.cache_client.close()

# Usage
with ContextManagedModel("my-model") as model:
    result = model.predict({"instances": [[1, 2, 3, 4]]})

Binary Data Handling

Efficient Binary Encoding

from kserve import InferInput, InferOutput
import numpy as np

# Create large tensor
data = np.random.rand(1000, 1000).astype(np.float32)

# Use binary encoding for efficiency
input_tensor = InferInput(
    name="input-0",
    shape=list(data.shape),
    datatype="FP32"
)
input_tensor.set_data_from_numpy(data, binary_data=True)

# Binary encoding typically cuts payload size roughly in half versus JSON
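
To send the tensor, wrap it in an InferRequest, mirroring the request construction used in the adapter example below (the model name is illustrative):

from kserve import InferRequest

request = InferRequest(
    model_name="my-model",
    infer_inputs=[input_tensor]
)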

Handling Binary Responses

from kserve import Model, InferRequest, InferResponse, InferOutput
import numpy as np

class BinaryResponseModel(Model):
    def predict(self, payload, headers=None):
        """Return binary-encoded responses"""
        if isinstance(payload, InferRequest):
            # Get input
            input_data = payload.inputs[0].as_numpy()
            
            # Run prediction
            predictions = self.model.predict(input_data)
            
            # Create output with binary encoding
            output = InferOutput(
                name="predictions",
                shape=list(predictions.shape),
                datatype="FP32"
            )
            output.set_data_from_numpy(predictions, binary_data=True)
            
            return InferResponse(
                model_name=self.name,
                infer_outputs=[output],
                use_binary_outputs=True
            )
        raise ValueError("BinaryResponseModel expects a v2 InferRequest payload")

Factory Pattern

Model Factory

from typing import Dict, Type

import joblib
import torch

from kserve import Model

class ModelFactory:
    """Factory for creating model instances"""
    
    _registry: Dict[str, Type[Model]] = {}
    
    @classmethod
    def register(cls, model_type: str):
        """Decorator to register model classes"""
        def decorator(model_class):
            cls._registry[model_type] = model_class
            return model_class
        return decorator
    
    @classmethod
    def create(cls, model_type: str, name: str, **kwargs) -> Model:
        """Create model instance"""
        if model_type not in cls._registry:
            raise ValueError(f"Unknown model type: {model_type}")
        
        model_class = cls._registry[model_type]
        return model_class(name, **kwargs)

# Register models
@ModelFactory.register("sklearn")
class SKLearnModel(Model):
    def load(self):
        self.model = joblib.load("/mnt/models/sklearn_model.pkl")
        self.ready = True
    
    def predict(self, payload, headers=None):
        return {"predictions": self.model.predict(payload["instances"])}

@ModelFactory.register("pytorch")
class PyTorchModel(Model):
    def load(self):
        self.model = torch.load("/mnt/models/pytorch_model.pth")
        self.ready = True
    
    def predict(self, payload, headers=None):
        tensor = torch.tensor(payload["instances"])
        with torch.no_grad():
            predictions = self.model(tensor)
        return {"predictions": predictions.numpy().tolist()}

# Usage
model = ModelFactory.create("sklearn", "iris-classifier")
model.load()

Strategy Pattern

Prediction Strategy

from abc import ABC, abstractmethod

import joblib
import numpy as np

from kserve import Model

class PredictionStrategy(ABC):
    """Abstract prediction strategy"""
    
    @abstractmethod
    def predict(self, model, instances):
        pass

class StandardPrediction(PredictionStrategy):
    """Standard prediction strategy"""
    def predict(self, model, instances):
        return model.predict(instances)

class EnsemblePrediction(PredictionStrategy):
    """Ensemble prediction strategy (useful when the model is stochastic, e.g. MC dropout)"""
    def predict(self, model, instances):
        # Run multiple stochastic predictions and average them
        predictions = []
        for _ in range(5):
            pred = model.predict(instances)
            predictions.append(pred)
        return np.mean(predictions, axis=0)

class StrategyModel(Model):
    def __init__(self, name: str, strategy: PredictionStrategy):
        super().__init__(name)
        self.strategy = strategy
    
    def load(self):
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True
    
    def predict(self, payload, headers=None):
        instances = payload["instances"]
        predictions = self.strategy.predict(self.model, instances)
        return {"predictions": predictions.tolist()}

Observer Pattern

Model Event Observers

import logging
from typing import Callable, List

import joblib

from kserve import Model

logger = logging.getLogger(__name__)

class ObservableModel(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.observers: List[Callable] = []
    
    def attach(self, observer: Callable):
        """Attach observer"""
        self.observers.append(observer)
    
    def detach(self, observer: Callable):
        """Detach observer"""
        self.observers.remove(observer)
    
    def notify(self, event: str, data: dict):
        """Notify all observers"""
        for observer in self.observers:
            try:
                observer(event, data)
            except Exception as e:
                logger.error(f"Observer failed: {e}")
    
    def load(self):
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True
        self.notify("model_loaded", {"model": self.name})
    
    def predict(self, payload, headers=None):
        self.notify("prediction_started", {"batch_size": len(payload["instances"])})
        
        predictions = self.model.predict(payload["instances"])
        
        self.notify("prediction_completed", {
            "batch_size": len(payload["instances"]),
            "output_size": len(predictions)
        })
        
        return {"predictions": predictions.tolist()}

# Usage
def log_observer(event: str, data: dict):
    logger.info(f"Event: {event}, Data: {data}")

def metrics_observer(event: str, data: dict):
    # PREDICTION_COUNTER: see the sketch after this block
    if event == "prediction_completed":
        PREDICTION_COUNTER.inc()

model = ObservableModel("observable-model")
model.attach(log_observer)
model.attach(metrics_observer)
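
PREDICTION_COUNTER above can be any metrics primitive. One possibility, using prometheus_client (an assumption, not something KServe requires):

from prometheus_client import Counter

# Counts completed prediction batches across all observers
PREDICTION_COUNTER = Counter(
    "predictions_completed_total",
    "Number of completed prediction batches"
)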

Decorator Pattern

Adding Functionality with Decorators

import logging
import time
from functools import wraps

import joblib

from kserve import Model

logger = logging.getLogger(__name__)

def timing_decorator(func):
    """Decorator to time function execution"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start
        logger.info(f"{func.__name__} took {elapsed:.3f}s")
        return result
    return wrapper

def caching_decorator(cache_dict):
    """Decorator to cache results"""
    def decorator(func):
        @wraps(func)
        def wrapper(self, payload, *args, **kwargs):
            cache_key = str(payload)
            if cache_key in cache_dict:
                logger.debug("Cache hit")
                return cache_dict[cache_key]
            
            result = func(self, payload, *args, **kwargs)
            cache_dict[cache_key] = result
            return result
        return wrapper
    return decorator

class DecoratedModel(Model):
    @timing_decorator
    def load(self):
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True
    
    @timing_decorator
    @caching_decorator(cache_dict={})  # dict is bound at decoration time, shared by all instances
    def predict(self, payload, headers=None):
        return {"predictions": self.model.predict(payload["instances"]).tolist()}

Adapter Pattern

Protocol Adapter

from kserve import Model, InferInput, InferRequest, InferResponse

class ProtocolAdapter:
    """Adapt between v1 and v2 protocols"""
    
    @staticmethod
    def v1_to_v2(v1_payload: dict) -> InferRequest:
        """Convert v1 payload to v2 InferRequest"""
        instances = v1_payload["instances"]
        
        # Create InferInput
        input_tensor = InferInput(
            name="input-0",
            shape=[len(instances), len(instances[0])],
            datatype="FP32",
            data=instances
        )
        
        return InferRequest(
            model_name="model",
            infer_inputs=[input_tensor]
        )
    
    @staticmethod
    def v2_to_v1(v2_response: InferResponse) -> dict:
        """Convert v2 InferResponse to v1 dict"""
        output = v2_response.outputs[0]
        predictions = output.as_numpy()
        
        return {"predictions": predictions.tolist()}

class AdaptedModel(Model):
    def predict(self, payload, headers=None):
        """Handle both protocols"""
        if isinstance(payload, dict) and "instances" in payload:
            # Convert v1 to v2
            v2_request = ProtocolAdapter.v1_to_v2(payload)
            v2_response = self._predict_v2(v2_request)
            return ProtocolAdapter.v2_to_v1(v2_response)
        return self._predict_v2(payload)
    
    def _predict_v2(self, request: InferRequest) -> InferResponse:
        """Model-specific v2 inference goes here (placeholder)"""
        raise NotImplementedError

Singleton Pattern

Model Registry Singleton

import threading

from kserve import ModelRepository

class ModelRegistrySingleton:
    """Singleton model registry"""
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance.repository = ModelRepository()
        return cls._instance
    
    def get_repository(self) -> ModelRepository:
        return self.repository

# Usage
registry = ModelRegistrySingleton()
repository = registry.get_repository()
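
Repeated construction yields the same object, so every caller shares one repository:

registry_a = ModelRegistrySingleton()
registry_b = ModelRegistrySingleton()
assert registry_a is registry_b  # same instance, same shared repository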

Next Steps

  • Error Recovery - Resilience patterns
  • Troubleshooting - Common issues
  • Real-World Scenarios - Complete examples