tessl install tessl/pypi-kserve@0.16.1
KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.
Build custom model servers by subclassing the Model class and implementing inference logic with support for preprocessing, prediction, postprocessing, and explanation.
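For example, a minimal custom server can subclass Model, implement load() and predict(), and hand the model to ModelServer. The sketch below assumes a scikit-learn model serialized as model.pkl; a fuller, annotated version appears later in this section.
from kserve import Model, ModelServer
import joblib

class MyModel(Model):
    def load(self):
        # Load the serialized model; the path is illustrative
        self.model = joblib.load("model.pkl")
        self.ready = True

    def predict(self, payload, headers=None):
        # V1-style dictionary payload: {"instances": [...]}
        predictions = self.model.predict(payload["instances"])
        return {"predictions": predictions.tolist()}

if __name__ == "__main__":
    model = MyModel("my-model")
    model.load()
    ModelServer().start([model])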
KServe provides a hierarchy of model classes that enable flexible model serving implementations. The class hierarchy is:
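In outline, as documented in the sections below:
BaseKServeModel        - abstract root: lifecycle management, health checks, engine integration
└── InferenceModel     - abstract inference interface: __call__, input/output type metadata
    ├── Model          - standard preprocess/predict/postprocess pipeline for custom servers
    └── RayModel       - Ray Serve integration for distributed serving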
The root abstract base class for all KServe models, providing core lifecycle management, health checks, and engine integration.
class BaseKServeModel:
"""
Abstract base class for all KServe models.
Provides core lifecycle management, health checking, and engine integration.
All custom models inherit from this base class (typically through Model or InferenceModel).
Args:
name (str): Model name identifier
"""
def __init__(self, name: str): ...
async def healthy(self) -> bool:
"""
Check if model is healthy and operational.
Returns:
bool: True if model is healthy, False otherwise
"""
...
def load(self) -> bool:
"""
Load model artifacts and initialize model.
Must be implemented by subclasses.
Returns:
bool: True if load successful, False otherwise
"""
...
def start(self):
"""
Start the model server (synchronous startup).
Used for models that need to run startup tasks.
"""
...
async def start_engine(self):
"""
Start the model engine asynchronously.
Used for async initialization tasks.
"""
...
def stop(self):
"""
Stop the model server gracefully.
"""
...
def stop_engine(self):
"""
Stop the model engine and cleanup resources.
"""
...
@property
def name(self) -> str:
"""Model name identifier."""
...
@property
def ready(self) -> bool:
"""Whether model is ready to serve requests."""
...
@ready.setter
def ready(self, value: bool):
"""Set model ready state."""
...
@property
def engine(self):
"""Model engine instance."""
...
Abstract class extending BaseKServeModel with inference-specific methods. Defines the interface for inference operations.
class InferenceModel(BaseKServeModel):
"""
Abstract inference model class.
Extends BaseKServeModel with inference-specific abstract methods.
Subclasses must implement __call__ for handling inference requests.
Args:
name (str): Model name identifier
"""
def __call__(
self,
body: Dict,
headers: Dict = None,
verb: InferenceVerb = InferenceVerb.PREDICT
) -> Union[Dict, InferResponse, AsyncIterator]:
"""
Handle inference request (abstract method).
Args:
body (Dict): Request body
headers (Dict, optional): Request headers
verb (InferenceVerb): Inference verb (PREDICT or EXPLAIN)
Returns:
Union[Dict, InferResponse, AsyncIterator]: Inference result
"""
...
def get_input_types(self) -> List[Dict]:
"""
Get input type definitions for the model.
Returns:
List[Dict]: List of input type specifications
"""
...
def get_output_types(self) -> List[Dict]:
"""
Get output type definitions for the model.
Returns:
List[Dict]: List of output type specifications
"""
...
The Model class provides the foundation for building custom inference servers with lifecycle hooks and a request processing pipeline.
class Model(InferenceModel):
"""
Base class for custom model implementations.
Args:
name (str): Model name
return_response_headers (bool): Whether to return response headers (default: False)
"""
def __init__(self, name: str, return_response_headers: bool = False): ...
# Properties
@property
def ready(self) -> bool: ...
@property
def name(self) -> str: ...
@property
def predictor_config(self) -> PredictorConfig: ...
Load model artifacts and initialize the model. Called once during model server startup.
def load(self) -> bool:
"""
Load the model from storage and initialize.
Override this method to implement custom loading logic.
Set self.ready = True when loading is complete.
"""Usage:
from kserve import Model
class MyModel(Model):
def load(self):
# Load model from disk or storage
self.model = joblib.load("model.pkl")
self.ready = True
Transform and validate input data before prediction. Supports both dictionary and InferRequest formats.
def preprocess(self, body: Dict, headers: Dict = None) -> Union[Dict, InferRequest]:
"""
Preprocess the input request.
Args:
body (dict): Input request body
headers (dict, optional): Request headers
Returns:
dict or InferRequest: Preprocessed data for prediction
"""Usage:
class MyModel(Model):
def preprocess(self, body, headers=None):
# Extract and transform input data
instances = body["instances"]
# Normalize data
normalized = [(x - self.mean) / self.std for x in instances]
return {"instances": normalized}Execute model inference. This is the core method where predictions are made. Supports both sync and async implementations.
def predict(
self,
payload: Union[Dict, InferRequest],
headers: Dict = None,
response_headers: Dict = None
) -> Union[Dict, InferResponse, List[str], Awaitable[Union[Dict, InferResponse, List[str]]]]:
"""
Run model inference.
Args:
payload (dict or InferRequest): Preprocessed input data
headers (dict, optional): Request headers
response_headers (dict, optional): Response headers to populate
Returns:
dict, InferResponse, or List[str]: Prediction results
"""Usage:
class MyModel(Model):
def predict(self, payload, headers=None):
# Extract input
instances = payload["instances"]
# Run inference
predictions = self.model.predict(instances)
# Return results
return {"predictions": predictions.tolist()}Async example:
class AsyncModel(Model):
async def predict(self, payload, headers=None):
instances = payload["instances"]
# Async inference
predictions = await self.async_model.predict(instances)
return {"predictions": predictions}Transform prediction outputs. Use this to format results or add metadata.
def postprocess(
self,
response: Union[Dict, InferResponse],
headers: Dict = None,
response_headers: Dict = None
) -> Union[Dict, InferResponse, Awaitable[Union[Dict, InferResponse]]]:
"""
Postprocess the prediction response.
Args:
response (dict or InferResponse): Prediction results
headers (dict, optional): Request headers
response_headers (dict, optional): Response headers to populate
Returns:
dict or InferResponse: Formatted response
"""Usage:
class MyModel(Model):
def postprocess(self, response, headers=None):
# Add metadata to response
predictions = response["predictions"]
return {
"predictions": predictions,
"model_name": self.name,
"version": "1.0.0"
}
Generate explanations for model predictions. Used for model interpretability.
def explain(self, payload: Dict, headers: Dict = None) -> Union[Dict, Awaitable[Dict]]:
"""
Generate explanations for predictions.
Args:
payload (dict): Input data to explain
headers (dict, optional): Request headers
Returns:
dict: Explanation results
"""Usage:
from lime.lime_tabular import LimeTabularExplainer
class ExplainableModel(Model):
def load(self):
self.model = joblib.load("model.pkl")
self.explainer = LimeTabularExplainer(
training_data=self.training_data,
mode='classification'
)
self.ready = True
def explain(self, payload, headers=None):
instance = payload["instances"][0]
explanation = self.explainer.explain_instance(
instance,
self.model.predict_proba
)
return {"explanations": explanation.as_list()}The Model class implements an automatic inference pipeline:
1. preprocess() transforms input data
2. predict() executes model inference
3. postprocess() formats output
Each stage is optional and can be overridden. By default, data passes through unchanged.
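Conceptually, the server chains the stages roughly as follows (a simplified sketch, not the actual KServe dispatcher, which also handles async methods, headers, and the InferRequest/InferResponse types):
def handle_predict(model, body, headers=None):
    payload = model.preprocess(body, headers)   # default: pass-through
    result = model.predict(payload, headers)    # custom inference logic
    return model.postprocess(result, headers)   # default: pass-through
The request body entering this pipeline can be a plain dictionary (V1 protocol), for example: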
{
"instances": [
[1, 2, 3, 4],
[5, 6, 7, 8]
]
}
Or build an InferRequest (open inference / V2 protocol) programmatically:
from kserve import InferRequest, InferInput
import numpy as np
# Create input
input_data = InferInput(
name="input-0",
shape=[2, 4],
datatype="FP32",
data=[[1, 2, 3, 4], [5, 6, 7, 8]]
)
# Create request
request = InferRequest(
model_name="my-model",
infer_inputs=[input_data]
)
A complete example: a scikit-learn model server implementing the full pipeline:
from kserve import Model, ModelServer
import numpy as np
import joblib
class SKLearnModel(Model):
def __init__(self, name: str):
super().__init__(name)
self.model = None
self.ready = False
def load(self):
"""Load scikit-learn model"""
self.model = joblib.load("/mnt/models/model.pkl")
self.ready = True
def preprocess(self, body, headers=None):
"""Convert input to numpy array"""
instances = body["instances"]
return {"instances": np.array(instances)}
def predict(self, payload, headers=None):
"""Run prediction"""
instances = payload["instances"]
predictions = self.model.predict(instances)
probabilities = self.model.predict_proba(instances)
return {
"predictions": predictions.tolist(),
"probabilities": probabilities.tolist()
}
def postprocess(self, response, headers=None):
"""Add metadata"""
return {
**response,
"model_name": self.name,
"model_version": "1.0.0"
}
if __name__ == "__main__":
model = SKLearnModel("sklearn-model")
model.load()
ModelServer().start([model])
All lifecycle methods support async implementations. KServe automatically detects coroutine functions and awaits them:
class AsyncModel(Model):
async def load(self):
"""Async model loading"""
self.model = await load_model_async()
self.ready = True
async def predict(self, payload, headers=None):
"""Async prediction"""
instances = payload["instances"]
predictions = await self.model.predict_async(instances)
return {"predictions": predictions}Model servers can host multiple models:
if __name__ == "__main__":
model1 = SKLearnModel("model-a")
model1.load()
model2 = SKLearnModel("model-b")
model2.load()
ModelServer().start([model1, model2])
Access models via:
/v1/models/model-a:predict
/v1/models/model-b:predict
Raise appropriate exceptions for error conditions:
from kserve.errors import InvalidInput, ModelNotReady, InferenceError
class MyModel(Model):
def predict(self, payload, headers=None):
if not self.ready:
raise ModelNotReady(self.name)
instances = payload.get("instances")
if instances is None:
raise InvalidInput("Missing 'instances' in request")
try:
predictions = self.model.predict(instances)
return {"predictions": predictions.tolist()}
except Exception as e:
raise InferenceError(f"Prediction failed: {e}")Return custom response headers:
class MyModel(Model):
def __init__(self, name: str):
super().__init__(name, return_response_headers=True)
def predict(self, payload, headers=None, response_headers=None):
# Set custom header
if response_headers is not None:
response_headers["X-Model-Version"] = "1.0.0"
predictions = self.model.predict(payload["instances"])
return {"predictions": predictions.tolist()}For advanced use cases, inherit from BaseKServeModel directly:
class BaseKServeModel(ABC):
"""
Abstract base class for all KServe models.
Attributes:
name (str): Model name
ready (bool): Model readiness status
engine (bool): Whether model uses inference engine
"""
name: str
ready: bool
engine: bool
@abstractmethod
def load(self) -> bool: ...
def start(self) -> None:
"""Start the model server"""
def stop(self) -> None:
"""Stop the model server"""
async def healthy(self) -> bool:
"""Health check"""
return self.ready
InferenceModel extends BaseKServeModel with inference capabilities:
class InferenceModel(BaseKServeModel):
"""Abstract inference model with call interface"""
@abstractmethod
def __call__(
self,
body: Union[Dict, InferRequest],
headers: Dict = None,
verb: InferenceVerb = InferenceVerb.PREDICT
) -> Union[Dict, InferResponse, Awaitable]:
"""
Handle inference requests.
Args:
body: Request body
headers: Request headers
verb: Inference verb (PREDICT or EXPLAIN)
Returns:
Response data
"""
def get_input_types(self) -> List[Dict]:
"""Get model input types"""
return []
def get_output_types(self) -> List[Dict]:
"""Get model output types"""
return []
Enum for inference operation types:
class InferenceVerb(Enum):
"""Inference operation types"""
PREDICT = 2
EXPLAIN = 1
Model class for Ray Serve integration, enabling distributed model serving with Ray.
class RayModel(InferenceModel):
"""
Model implementation for Ray Serve integration.
Wraps a Ray Serve deployment handle to enable distributed inference
with Ray's scalable serving framework.
Args:
name (str): Model name identifier
handle (ray.serve.DeploymentHandle): Ray Serve deployment handle
"""
def __init__(self, name: str, handle): ...
def __call__(
self,
body: Dict,
headers: Dict = None,
verb: InferenceVerb = InferenceVerb.PREDICT
) -> Union[Dict, InferResponse, AsyncIterator]:
"""
Handle inference request via Ray Serve deployment.
Args:
body (Dict): Request body
headers (Dict, optional): Request headers
verb (InferenceVerb): Inference verb (PREDICT or EXPLAIN)
Returns:
Union[Dict, InferResponse, AsyncIterator]: Inference result from Ray
"""
...
def load(self) -> bool:
"""
Load method for Ray model (typically no-op as Ray manages loading).
Returns:
bool: Always returns True
"""
...
async def get_input_types(self) -> List[Dict]:
"""
Get input type definitions from Ray deployment.
Returns:
List[Dict]: List of input type specifications
"""
...
async def get_output_types(self) -> List[Dict]:
"""
Get output type definitions from Ray deployment.
Returns:
List[Dict]: List of output type specifications
"""
...
Usage Example:
import ray
from ray import serve
from kserve import ModelServer
from kserve.ray import RayModel
# Initialize Ray
ray.init()
serve.start()
# Define Ray Serve deployment
@serve.deployment
class MyRayModel:
def __init__(self):
self.model = load_model()
async def __call__(self, request):
instances = request["instances"]
predictions = self.model.predict(instances)
return {"predictions": predictions.tolist()}
# Deploy to Ray
deployment = MyRayModel.bind()
handle = serve.run(deployment)
# Create KServe RayModel wrapper
ray_model = RayModel("ray-model", handle)
# Start KServe model server
ModelServer().start([ray_model])
Additional methods available on the Model class:
class Model(InferenceModel):
# ... (other methods shown above)
def validate(self, payload: Dict):
"""
Validate input payload before processing.
Override this method to implement custom input validation logic.
Raise InvalidInput exception if validation fails.
Args:
payload (Dict): Input payload to validate
Raises:
InvalidInput: If payload validation fails
"""
...
def get_input_types(self) -> List[Dict]:
"""
Get input type definitions for the model.
Returns metadata about expected input tensors including name, shape, and datatype.
Used for API documentation and client code generation.
Returns:
List[Dict]: List of input type specifications, each containing:
- name (str): Input tensor name
- shape (List[int]): Expected shape
- datatype (str): Data type (e.g., "FP32", "INT64")
Example:
return [{
"name": "input-0",
"shape": [-1, 784],
"datatype": "FP32"
}]
"""
...
def get_output_types(self) -> List[Dict]:
"""
Get output type definitions for the model.
Returns metadata about model output tensors including name, shape, and datatype.
Used for API documentation and client code generation.
Returns:
List[Dict]: List of output type specifications, each containing:
- name (str): Output tensor name
- shape (List[int]): Expected shape
- datatype (str): Data type (e.g., "FP32", "INT64")
Example:
return [{
"name": "output-0",
"shape": [-1, 10],
"datatype": "FP32"
}]
"""
...
Usage Example:
from kserve import Model
from kserve.errors import InvalidInput
class ValidatedModel(Model):
def validate(self, payload):
"""Validate input format and content"""
if "instances" not in payload:
raise InvalidInput("Missing 'instances' key in payload")
instances = payload["instances"]
if not isinstance(instances, list):
raise InvalidInput("'instances' must be a list")
if len(instances) == 0:
raise InvalidInput("'instances' cannot be empty")
# Validate each instance
for i, instance in enumerate(instances):
if len(instance) != 4:
raise InvalidInput(f"Instance {i} must have exactly 4 features")
def get_input_types(self):
"""Define expected input types"""
return [{
"name": "input-0",
"shape": [-1, 4],
"datatype": "FP32"
}]
def get_output_types(self):
"""Define output types"""
return [{
"name": "output-0",
"shape": [-1, 3],
"datatype": "FP32"
}]
def predict(self, payload, headers=None):
# validation already performed by framework
instances = payload["instances"]
predictions = self.model.predict(instances)
return {"predictions": predictions.tolist()}