```
tessl install tessl/pypi-kserve@0.16.1
```

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.
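For orientation, a custom model is typically served by subclassing `Model` and passing an instance to `ModelServer`. The sketch below is a minimal, illustrative example; the `EchoModel` class, its trivial `predict` body, and the model name are placeholders, not part of the SDK:

```python
from kserve import Model, ModelServer

class EchoModel(Model):
    def load(self):
        # A real model would load weights here (e.g. from /mnt/models)
        self.ready = True

    def predict(self, payload, headers=None):
        # Echo the inputs back; a real model would run inference here
        return {"predictions": payload["instances"]}

if __name__ == "__main__":
    model = EchoModel("echo-model")
    model.load()
    ModelServer().start([model])
```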
Key design patterns for building robust and efficient KServe model servers.
KServe inspects whether a model's methods are coroutines and awaits them when they are, so `load` and `predict` can be written synchronously, asynchronously, or as a mix of the two.
```python
import joblib

from kserve import Model

class SyncModel(Model):
    def load(self):
        """Synchronous loading"""
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True

    def predict(self, payload, headers=None):
        """Synchronous prediction"""
        instances = payload["instances"]
        predictions = self.model.predict(instances)
        return {"predictions": predictions.tolist()}
```

The same model written asynchronously:

```python
from kserve import Model
import asyncio

class AsyncModel(Model):
    async def load(self):
        """Asynchronous loading"""
        # load_model_async and predict_async are placeholders for an async model API
        self.model = await load_model_async()
        self.ready = True

    async def predict(self, payload, headers=None):
        """Asynchronous prediction"""
        instances = payload["instances"]
        predictions = await self.model.predict_async(instances)
        return {"predictions": predictions}
```

Sync and async methods can also be mixed; CPU-bound work can be pushed to a thread pool from an async `predict`:

```python
import asyncio

import joblib

from kserve import Model

class MixedModel(Model):
    def load(self):
        """Sync load"""
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True

    async def predict(self, payload, headers=None):
        """Async predict for I/O operations"""
        instances = payload["instances"]
        # Run CPU-bound work in the default thread pool
        loop = asyncio.get_running_loop()
        predictions = await loop.run_in_executor(
            None,
            self.model.predict,
            instances,
        )
        return {"predictions": predictions.tolist()}
```

The inference clients support `async with` for automatic cleanup:

```python
from kserve import InferenceGRPCClient, InferenceRESTClient
# gRPC client with automatic cleanup
async with InferenceGRPCClient(url="localhost:8081") as client:
    response = await client.infer(model_name="my-model", inputs=[...])
# Client automatically closed

# REST client with automatic cleanup
async with InferenceRESTClient() as client:
    response = await client.infer(
        base_url="http://localhost:8080",
        model_name="my-model",
        data={...},
    )
# Client automatically closed
```

Models themselves can implement the context-manager protocol to tie resource cleanup to their scope:

```python
from kserve import Model
class ContextManagedModel(Model):
    def __enter__(self):
        """Setup resources"""
        self.load()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Cleanup resources"""
        self.stop()
        return False

    def stop(self):
        """Cleanup"""
        if hasattr(self, 'db_connection'):
            self.db_connection.close()
        if hasattr(self, 'cache_client'):
            self.cache_client.close()

# Usage
with ContextManagedModel("my-model") as model:
    result = model.predict({"instances": [[1, 2, 3, 4]]})
```

For large tensors, binary encoding of v2 inputs avoids JSON-encoding overhead:

```python
from kserve import InferInput, InferOutput
import numpy as np

# Create large tensor
data = np.random.rand(1000, 1000).astype(np.float32)

# Use binary encoding for efficiency
input_tensor = InferInput(
    name="input-0",
    shape=list(data.shape),
    datatype="FP32"
)
input_tensor.set_data_from_numpy(data, binary_data=True)
# Binary encoding reduces payload size by roughly half compared to JSON-encoded lists
```

Responses can be binary-encoded in the same way:

```python
from kserve import Model, InferRequest, InferResponse, InferOutput
import numpy as np

class BinaryResponseModel(Model):
    def predict(self, payload, headers=None):
        """Return binary-encoded responses"""
        if isinstance(payload, InferRequest):
            # Get input
            input_data = payload.inputs[0].as_numpy()
            # Run prediction
            predictions = self.model.predict(input_data)
            # Create output with binary encoding
            output = InferOutput(
                name="predictions",
                shape=list(predictions.shape),
                datatype="FP32"
            )
            output.set_data_from_numpy(predictions, binary_data=True)
            return InferResponse(
                response_id=payload.id,
                model_name=self.name,
                infer_outputs=[output],
                use_binary_outputs=True
            )
```

A factory keeps model classes behind a registry keyed by type:

```python
from kserve import Model
from typing import Dict, Type

import joblib
import torch

class ModelFactory:
    """Factory for creating model instances"""
    _registry: Dict[str, Type[Model]] = {}

    @classmethod
    def register(cls, model_type: str):
        """Decorator to register model classes"""
        def decorator(model_class):
            cls._registry[model_type] = model_class
            return model_class
        return decorator

    @classmethod
    def create(cls, model_type: str, name: str, **kwargs) -> Model:
        """Create model instance"""
        if model_type not in cls._registry:
            raise ValueError(f"Unknown model type: {model_type}")
        model_class = cls._registry[model_type]
        return model_class(name, **kwargs)

# Register models
@ModelFactory.register("sklearn")
class SKLearnModel(Model):
    def load(self):
        self.model = joblib.load("/mnt/models/sklearn_model.pkl")
        self.ready = True

    def predict(self, payload, headers=None):
        return {"predictions": self.model.predict(payload["instances"]).tolist()}

@ModelFactory.register("pytorch")
class PyTorchModel(Model):
    def load(self):
        self.model = torch.load("/mnt/models/pytorch_model.pth")
        self.ready = True

    def predict(self, payload, headers=None):
        tensor = torch.tensor(payload["instances"])
        with torch.no_grad():
            predictions = self.model(tensor)
        return {"predictions": predictions.numpy().tolist()}

# Usage
model = ModelFactory.create("sklearn", "iris-classifier")
model.load()
```

A strategy object lets the prediction policy vary independently of the model:

```python
from abc import ABC, abstractmethod
import numpy as np

from kserve import Model

class PredictionStrategy(ABC):
    """Abstract prediction strategy"""
    @abstractmethod
    def predict(self, model, instances):
        pass

class StandardPrediction(PredictionStrategy):
    """Standard prediction strategy"""
    def predict(self, model, instances):
        return model.predict(instances)

class EnsemblePrediction(PredictionStrategy):
    """Ensemble prediction strategy"""
    def predict(self, model, instances):
        # Run multiple predictions and average
        predictions = []
        for _ in range(5):
            pred = model.predict(instances)
            predictions.append(pred)
        return np.mean(predictions, axis=0)

class StrategyModel(Model):
    def __init__(self, name: str, strategy: PredictionStrategy):
        super().__init__(name)
        self.strategy = strategy

    def predict(self, payload, headers=None):
        instances = payload["instances"]
        predictions = self.strategy.predict(self.model, instances)
        return {"predictions": predictions.tolist()}
```

Observers can be notified of model lifecycle and prediction events:

```python
from kserve import Model
import logging
from typing import List, Callable

import joblib

logger = logging.getLogger(__name__)

class ObservableModel(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.observers: List[Callable] = []

    def attach(self, observer: Callable):
        """Attach observer"""
        self.observers.append(observer)

    def detach(self, observer: Callable):
        """Detach observer"""
        self.observers.remove(observer)

    def notify(self, event: str, data: dict):
        """Notify all observers"""
        for observer in self.observers:
            try:
                observer(event, data)
            except Exception as e:
                logger.error(f"Observer failed: {e}")

    def load(self):
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True
        self.notify("model_loaded", {"model": self.name})

    def predict(self, payload, headers=None):
        self.notify("prediction_started", {"batch_size": len(payload["instances"])})
        predictions = self.model.predict(payload["instances"])
        self.notify("prediction_completed", {
            "batch_size": len(payload["instances"]),
            "output_size": len(predictions)
        })
        return {"predictions": predictions.tolist()}

# Usage
def log_observer(event: str, data: dict):
    logger.info(f"Event: {event}, Data: {data}")

def metrics_observer(event: str, data: dict):
    if event == "prediction_completed":
        PREDICTION_COUNTER.inc()  # e.g. a Prometheus counter defined elsewhere

model = ObservableModel("observable-model")
model.attach(log_observer)
model.attach(metrics_observer)
```

Decorators add cross-cutting behavior such as timing and caching:

```python
from kserve import Model
import logging
import time
from functools import wraps

import joblib

logger = logging.getLogger(__name__)

def timing_decorator(func):
    """Decorator to time function execution"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start
        logger.info(f"{func.__name__} took {elapsed:.3f}s")
        return result
    return wrapper

def caching_decorator(cache_dict):
    """Decorator to cache results"""
    def decorator(func):
        @wraps(func)
        def wrapper(self, payload, *args, **kwargs):
            cache_key = str(payload)
            if cache_key in cache_dict:
                logger.debug("Cache hit")
                return cache_dict[cache_key]
            result = func(self, payload, *args, **kwargs)
            cache_dict[cache_key] = result
            return result
        return wrapper
    return decorator

class DecoratedModel(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.cache = {}

    @timing_decorator
    def load(self):
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True

    @timing_decorator
    @caching_decorator(cache_dict={})
    def predict(self, payload, headers=None):
        return {"predictions": self.model.predict(payload["instances"]).tolist()}
```

An adapter translates between the v1 and v2 inference protocols:

```python
from kserve import Model, InferInput, InferRequest, InferResponse
class ProtocolAdapter:
    """Adapt between v1 and v2 protocols"""
    @staticmethod
    def v1_to_v2(v1_payload: dict) -> InferRequest:
        """Convert v1 payload to v2 InferRequest"""
        instances = v1_payload["instances"]
        # Create InferInput
        input_tensor = InferInput(
            name="input-0",
            shape=[len(instances), len(instances[0])],
            datatype="FP32",
            data=instances
        )
        return InferRequest(
            model_name="model",
            infer_inputs=[input_tensor]
        )

    @staticmethod
    def v2_to_v1(v2_response: InferResponse) -> dict:
        """Convert v2 InferResponse to v1 dict"""
        output = v2_response.outputs[0]
        predictions = output.as_numpy()
        return {"predictions": predictions.tolist()}

class AdaptedModel(Model):
    def predict(self, payload, headers=None):
        """Handle both protocols"""
        if isinstance(payload, dict) and "instances" in payload:
            # Convert v1 to v2
            v2_request = ProtocolAdapter.v1_to_v2(payload)
            v2_response = self._predict_v2(v2_request)
            return ProtocolAdapter.v2_to_v1(v2_response)
        else:
            return self._predict_v2(payload)

    def _predict_v2(self, request: InferRequest) -> InferResponse:
        """Model-specific v2 inference (left to the concrete model)"""
        raise NotImplementedError
```

A thread-safe singleton exposes one shared `ModelRepository` per process:

```python
from kserve import ModelRepository
import threading

class ModelRegistrySingleton:
    """Singleton model registry"""
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance.repository = ModelRepository()
        return cls._instance

    def get_repository(self) -> ModelRepository:
        return self.repository

# Usage
registry = ModelRegistrySingleton()
repository = registry.get_repository()
```
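The shared repository can then be used anywhere models are registered or looked up. A rough sketch, assuming the `SyncModel` class defined earlier and the `update`/`get_model` methods that `ModelRepository` provides:

```python
# Register a loaded model in the process-wide repository and look it up later
model = SyncModel("iris-classifier")
model.load()

repository = ModelRegistrySingleton().get_repository()
repository.update(model)  # stored under model.name

assert repository.get_model("iris-classifier") is model
```

Because every call to `ModelRegistrySingleton()` returns the same instance, components constructed independently (request handlers, background loaders, health checks) all see the same set of registered models.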