Describes pypipkg:pypi/kserve@0.16.x


tessl/pypi-kserve

tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.

docs/reference/custom-models.md

Custom Model Implementation

Build custom model servers by subclassing the Model class and implementing inference logic with support for preprocessing, prediction, postprocessing, and explanation.

Overview

KServe provides a hierarchy of model classes that enable flexible model serving implementations. The class hierarchy is as follows (a minimal subclass sketch appears after the list):

  • BaseKServeModel - Abstract base providing core lifecycle and health management
  • InferenceModel - Abstract class adding inference-specific methods
  • Model - Concrete implementation with full inference pipeline support
  • RayModel - Specialized implementation for Ray Serve integration
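
In practice, most custom servers subclass Model directly. The sketch below shows the smallest useful subclass; the EchoModel name and its pass-through logic are illustrative, while the Model and ModelServer imports match the complete example later in this document.

from kserve import Model, ModelServer

class EchoModel(Model):
    """Smallest useful custom model: echoes its input back as predictions."""

    def load(self):
        # Nothing to load for this sketch; just mark the model ready.
        self.ready = True

    def predict(self, payload, headers=None):
        # Return the request instances unchanged.
        return {"predictions": payload["instances"]}

if __name__ == "__main__":
    model = EchoModel("echo-model")
    model.load()
    ModelServer().start([model])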

Capabilities

BaseKServeModel (Abstract Base)

The root abstract base class for all KServe models, providing core lifecycle management, health checks, and engine integration.

class BaseKServeModel:
    """
    Abstract base class for all KServe models.

    Provides core lifecycle management, health checking, and engine integration.
    All custom models inherit from this base class (typically through Model or InferenceModel).

    Args:
        name (str): Model name identifier
    """
    def __init__(self, name: str): ...

    async def healthy(self) -> bool:
        """
        Check if model is healthy and operational.

        Returns:
            bool: True if model is healthy, False otherwise
        """
        ...

    def load(self) -> bool:
        """
        Load model artifacts and initialize model.
        Must be implemented by subclasses.

        Returns:
            bool: True if load successful, False otherwise
        """
        ...

    def start(self):
        """
        Start the model server (synchronous startup).
        Used for models that need to run startup tasks.
        """
        ...

    async def start_engine(self):
        """
        Start the model engine asynchronously.
        Used for async initialization tasks.
        """
        ...

    def stop(self):
        """
        Stop the model server gracefully.
        """
        ...

    def stop_engine(self):
        """
        Stop the model engine and cleanup resources.
        """
        ...

    @property
    def name(self) -> str:
        """Model name identifier."""
        ...

    @property
    def ready(self) -> bool:
        """Whether model is ready to serve requests."""
        ...

    @ready.setter
    def ready(self, value: bool):
        """Set model ready state."""
        ...

    @property
    def engine(self):
        """Model engine instance."""
        ...

InferenceModel (Abstract)

Abstract class extending BaseKServeModel with inference-specific methods. Defines the interface for inference operations.

class InferenceModel(BaseKServeModel):
    """
    Abstract inference model class.

    Extends BaseKServeModel with inference-specific abstract methods.
    Subclasses must implement __call__ for handling inference requests.

    Args:
        name (str): Model name identifier
    """
    def __call__(
        self,
        body: Dict,
        headers: Dict = None,
        verb: InferenceVerb = InferenceVerb.PREDICT
    ) -> Union[Dict, InferResponse, AsyncIterator]:
        """
        Handle inference request (abstract method).

        Args:
            body (Dict): Request body
            headers (Dict, optional): Request headers
            verb (InferenceVerb): Inference verb (PREDICT or EXPLAIN)

        Returns:
            Union[Dict, InferResponse, AsyncIterator]: Inference result
        """
        ...

    def get_input_types(self) -> List[Dict]:
        """
        Get input type definitions for the model.

        Returns:
            List[Dict]: List of input type specifications
        """
        ...

    def get_output_types(self) -> List[Dict]:
        """
        Get output type definitions for the model.

        Returns:
            List[Dict]: List of output type specifications
        """
        ...

Model Base Class

The Model class provides the foundation for building custom inference servers, with lifecycle hooks and a request processing pipeline.

class Model(InferenceModel):
    """
    Base class for custom model implementations.

    Args:
        name (str): Model name
        return_response_headers (bool): Whether to return response headers (default: False)
    """
    def __init__(self, name: str, return_response_headers: bool = False): ...

    # Properties
    @property
    def ready(self) -> bool: ...

    @property
    def name(self) -> str: ...

    @property
    def predictor_config(self) -> PredictorConfig: ...

Load Method

Load model artifacts and initialize the model. Called once during model server startup.

def load(self) -> None:
    """
    Load the model from storage and initialize.
    Override this method to implement custom loading logic.
    Set self.ready = True when loading is complete.
    """

Usage:

import joblib
from kserve import Model

class MyModel(Model):
    def load(self):
        # Load model from disk or storage
        self.model = joblib.load("model.pkl")
        self.ready = True

Preprocess Method

Transform and validate input data before prediction. Supports both dictionary and InferRequest formats.

def preprocess(self, body: Dict, headers: Dict = None) -> Union[Dict, InferRequest]:
    """
    Preprocess the input request.

    Args:
        body (dict): Input request body
        headers (dict, optional): Request headers

    Returns:
        dict or InferRequest: Preprocessed data for prediction
    """

Usage:

class MyModel(Model):
    def preprocess(self, body, headers=None):
        # Extract and transform input data
        instances = body["instances"]
        # Normalize data
        normalized = [(x - self.mean) / self.std for x in instances]
        return {"instances": normalized}

Predict Method

Execute model inference. This is the core method where predictions are made. Supports both sync and async implementations.

def predict(
    self,
    payload: Union[Dict, InferRequest],
    headers: Dict = None,
    response_headers: Dict = None
) -> Union[Dict, InferResponse, List[str], Awaitable[Union[Dict, InferResponse, List[str]]]]:
    """
    Run model inference.

    Args:
        payload (dict or InferRequest): Preprocessed input data
        headers (dict, optional): Request headers
        response_headers (dict, optional): Response headers to populate

    Returns:
        dict, InferResponse, or List[str]: Prediction results
    """

Usage:

class MyModel(Model):
    def predict(self, payload, headers=None):
        # Extract input
        instances = payload["instances"]
        # Run inference
        predictions = self.model.predict(instances)
        # Return results
        return {"predictions": predictions.tolist()}

Async example:

class AsyncModel(Model):
    async def predict(self, payload, headers=None):
        instances = payload["instances"]
        # Async inference
        predictions = await self.async_model.predict(instances)
        return {"predictions": predictions}

Postprocess Method

Transform prediction outputs. Use this to format results or add metadata.

def postprocess(
    self,
    response: Union[Dict, InferResponse],
    headers: Dict = None,
    response_headers: Dict = None
) -> Union[Dict, InferResponse, Awaitable[Union[Dict, InferResponse]]]:
    """
    Postprocess the prediction response.

    Args:
        response (dict or InferResponse): Prediction results
        headers (dict, optional): Request headers
        response_headers (dict, optional): Response headers to populate

    Returns:
        dict or InferResponse: Formatted response
    """

Usage:

class MyModel(Model):
    def postprocess(self, response, headers=None):
        # Add metadata to response
        predictions = response["predictions"]
        return {
            "predictions": predictions,
            "model_name": self.name,
            "version": "1.0.0"
        }

Explain Method

Generate explanations for model predictions. Used for model interpretability.

def explain(self, payload: Dict, headers: Dict = None) -> Union[Dict, Awaitable[Dict]]:
    """
    Generate explanations for predictions.

    Args:
        payload (dict): Input data to explain
        headers (dict, optional): Request headers

    Returns:
        dict: Explanation results
    """

Usage:

import joblib
from kserve import Model
from lime.lime_tabular import LimeTabularExplainer

class ExplainableModel(Model):
    def load(self):
        self.model = joblib.load("model.pkl")
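        # self.training_data is assumed to be available to the model
        # (e.g., loaded alongside the model artifact); its loading is not shown here.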
        self.explainer = LimeTabularExplainer(
            training_data=self.training_data,
            mode='classification'
        )
        self.ready = True

    def explain(self, payload, headers=None):
        instance = payload["instances"][0]
        explanation = self.explainer.explain_instance(
            instance,
            self.model.predict_proba
        )
        return {"explanations": explanation.as_list()}

Inference Pipeline

The Model class implements an automatic inference pipeline:

  1. Receive Request - FastAPI endpoint receives HTTP request
  2. Preprocess - preprocess() transforms input data
  3. Predict - predict() executes model inference
  4. Postprocess - postprocess() formats output
  5. Return Response - JSON response sent to client

Each stage is optional and can be overridden. By default, data passes through unchanged.
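
For example, a model that overrides only predict still works end to end, because the default preprocess and postprocess stages pass data through unchanged. PredictOnlyModel below is an illustrative sketch:

from kserve import Model

class PredictOnlyModel(Model):
    """Relies on the default pass-through preprocess/postprocess stages."""

    def load(self):
        self.ready = True

    def predict(self, payload, headers=None):
        # The payload arrives exactly as sent by the client, e.g. {"instances": [...]}.
        instances = payload["instances"]
        return {"predictions": [sum(row) for row in instances]}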

Request Formats

Dictionary Format (REST v1)

{
    "instances": [
        [1, 2, 3, 4],
        [5, 6, 7, 8]
    ]
}

InferRequest Format (REST v2/gRPC)

from kserve import InferRequest, InferInput
import numpy as np

# Create input
input_data = InferInput(
    name="input-0",
    shape=[2, 4],
    datatype="FP32",
    data=[[1, 2, 3, 4], [5, 6, 7, 8]]
)

# Create request
request = InferRequest(
    model_name="my-model",
    infer_inputs=[input_data]
)

Complete Example

from kserve import Model, ModelServer
import numpy as np
import joblib

class SKLearnModel(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.model = None
        self.ready = False

    def load(self):
        """Load scikit-learn model"""
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True

    def preprocess(self, body, headers=None):
        """Convert input to numpy array"""
        instances = body["instances"]
        return {"instances": np.array(instances)}

    def predict(self, payload, headers=None):
        """Run prediction"""
        instances = payload["instances"]
        predictions = self.model.predict(instances)
        probabilities = self.model.predict_proba(instances)

        return {
            "predictions": predictions.tolist(),
            "probabilities": probabilities.tolist()
        }

    def postprocess(self, response, headers=None):
        """Add metadata"""
        return {
            **response,
            "model_name": self.name,
            "model_version": "1.0.0"
        }

if __name__ == "__main__":
    model = SKLearnModel("sklearn-model")
    model.load()
    ModelServer().start([model])

Async Support

All lifecycle methods support async implementations. KServe automatically detects coroutine functions and awaits them:

class AsyncModel(Model):
    async def load(self):
        """Async model loading"""
        self.model = await load_model_async()
        self.ready = True

    async def predict(self, payload, headers=None):
        """Async prediction"""
        instances = payload["instances"]
        predictions = await self.model.predict_async(instances)
        return {"predictions": predictions}

Multi-Model Support

Model servers can host multiple models:

if __name__ == "__main__":
    model1 = SKLearnModel("model-a")
    model1.load()

    model2 = SKLearnModel("model-b")
    model2.load()

    ModelServer().start([model1, model2])

Access each model via its own endpoint (a sample client request follows the list):

  • /v1/models/model-a:predict
  • /v1/models/model-b:predict
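
A client-side sketch using the requests library against the endpoints above; the host and port are assumptions (port 8080 is a common default for the HTTP server) and the payload is illustrative:

import requests

# Send a v1 predict request to one of the hosted models.
payload = {"instances": [[1, 2, 3, 4]]}
response = requests.post(
    "http://localhost:8080/v1/models/model-a:predict",  # host/port assumed
    json=payload,
)
print(response.json())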

Error Handling

Raise appropriate exceptions for error conditions:

from kserve.errors import InvalidInput, ModelNotReady, InferenceError

class MyModel(Model):
    def predict(self, payload, headers=None):
        if not self.ready:
            raise ModelNotReady(self.name)

        instances = payload.get("instances")
        if instances is None:
            raise InvalidInput("Missing 'instances' in request")

        try:
            predictions = self.model.predict(instances)
            return {"predictions": predictions.tolist()}
        except Exception as e:
            raise InferenceError(f"Prediction failed: {e}")

Response Headers

Return custom response headers:

class MyModel(Model):
    def __init__(self, name: str):
        super().__init__(name, return_response_headers=True)

    def predict(self, payload, headers=None, response_headers=None):
        # Set custom header
        if response_headers is not None:
            response_headers["X-Model-Version"] = "1.0.0"

        predictions = self.model.predict(payload["instances"])
        return {"predictions": predictions.tolist()}

BaseKServeModel

For advanced use cases, inherit from BaseKServeModel directly:

class BaseKServeModel(ABC):
    """
    Abstract base class for all KServe models.

    Attributes:
        name (str): Model name
        ready (bool): Model readiness status
        engine (bool): Whether model uses inference engine
    """
    name: str
    ready: bool
    engine: bool

    @abstractmethod
    def load(self) -> None: ...

    def start(self) -> None:
        """Start the model server"""

    def stop(self) -> None:
        """Stop the model server"""

    async def healthy(self) -> bool:
        """Health check"""
        return self.ready
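
A minimal sketch of inheriting BaseKServeModel directly, for a component that needs lifecycle hooks but not the standard inference pipeline. The WarmupComponent name and its cache logic are illustrative, and the kserve.model import path for the abstract base is an assumption:

from kserve.model import BaseKServeModel  # assumed import path for the abstract base

class WarmupComponent(BaseKServeModel):
    """Illustrative lifecycle-only component: no inference pipeline, just load/stop hooks."""

    def __init__(self, name: str):
        super().__init__(name)
        self.cache = None

    def load(self) -> None:
        # Warm an in-memory cache (illustrative) and mark the component ready.
        self.cache = {"warmed": True}
        self.ready = True

    def stop(self) -> None:
        # Release resources on shutdown.
        self.cache = None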

InferenceModel

InferenceModel extends BaseKServeModel with inference capabilities:

class InferenceModel(BaseKServeModel):
    """Abstract inference model with call interface"""

    @abstractmethod
    def __call__(
        self,
        body: Union[Dict, InferRequest],
        headers: Dict = None,
        verb: InferenceVerb = InferenceVerb.PREDICT
    ) -> Union[Dict, InferResponse, Awaitable]:
        """
        Handle inference requests.

        Args:
            body: Request body
            headers: Request headers
            verb: Inference verb (PREDICT or EXPLAIN)

        Returns:
            Response data
        """

    def get_input_types(self) -> List[Dict]:
        """Get model input types"""
        return []

    def get_output_types(self) -> List[Dict]:
        """Get model output types"""
        return []

InferenceVerb

Enum for inference operation types:

class InferenceVerb(Enum):
    """Inference operation types"""
    EXPLAIN = 1
    PREDICT = 2
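
A short sketch of branching on the verb. The dispatch helper is hypothetical and the kserve.model import path is an assumption; it simply mirrors how EXPLAIN requests are routed to explain() and PREDICT requests to the predict pipeline:

from kserve.model import InferenceVerb  # assumed import path

def dispatch(model, body, verb=InferenceVerb.PREDICT):
    # Hypothetical helper: EXPLAIN routes to explain(), everything else to the predict path.
    if verb == InferenceVerb.EXPLAIN:
        return model.explain(body)
    return model.predict(model.preprocess(body))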

RayModel

Model class for Ray Serve integration, enabling distributed model serving with Ray.

class RayModel(InferenceModel):
    """
    Model implementation for Ray Serve integration.

    Wraps a Ray Serve deployment handle to enable distributed inference
    with Ray's scalable serving framework.

    Args:
        name (str): Model name identifier
        handle (ray.serve.DeploymentHandle): Ray Serve deployment handle
    """
    def __init__(self, name: str, handle): ...

    def __call__(
        self,
        body: Dict,
        headers: Dict = None,
        verb: InferenceVerb = InferenceVerb.PREDICT
    ) -> Union[Dict, InferResponse, AsyncIterator]:
        """
        Handle inference request via Ray Serve deployment.

        Args:
            body (Dict): Request body
            headers (Dict, optional): Request headers
            verb (InferenceVerb): Inference verb (PREDICT or EXPLAIN)

        Returns:
            Union[Dict, InferResponse, AsyncIterator]: Inference result from Ray
        """
        ...

    def load(self) -> bool:
        """
        Load method for the Ray model (typically a no-op, as Ray manages loading).

        Returns:
            bool: Always returns True
        """
        ...

    async def get_input_types(self) -> List[Dict]:
        """
        Get input type definitions from Ray deployment.

        Returns:
            List[Dict]: List of input type specifications
        """
        ...

    async def get_output_types(self) -> List[Dict]:
        """
        Get output type definitions from Ray deployment.

        Returns:
            List[Dict]: List of output type specifications
        """
        ...

Usage Example:

import ray
from ray import serve
from kserve import ModelServer
from kserve.ray import RayModel

# Initialize Ray
ray.init()
serve.start()

# Define Ray Serve deployment
@serve.deployment
class MyRayModel:
    def __init__(self):
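        # load_model() is a placeholder for your framework-specific model loader.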
        self.model = load_model()

    async def __call__(self, request):
        instances = request["instances"]
        predictions = self.model.predict(instances)
        return {"predictions": predictions.tolist()}

# Deploy to Ray
deployment = MyRayModel.bind()
handle = serve.run(deployment)

# Create KServe RayModel wrapper
ray_model = RayModel("ray-model", handle)

# Start KServe model server
ModelServer().start([ray_model])

Model Methods: validate, get_input_types, get_output_types

Additional methods available on the Model class:

class Model(InferenceModel):
    # ... (other methods shown above)

    def validate(self, payload: Dict):
        """
        Validate input payload before processing.

        Override this method to implement custom input validation logic.
        Raise InvalidInput exception if validation fails.

        Args:
            payload (Dict): Input payload to validate

        Raises:
            InvalidInput: If payload validation fails
        """
        ...

    def get_input_types(self) -> List[Dict]:
        """
        Get input type definitions for the model.

        Returns metadata about expected input tensors including name, shape, and datatype.
        Used for API documentation and client code generation.

        Returns:
            List[Dict]: List of input type specifications, each containing:
                - name (str): Input tensor name
                - shape (List[int]): Expected shape
                - datatype (str): Data type (e.g., "FP32", "INT64")

        Example:
            return [{
                "name": "input-0",
                "shape": [-1, 784],
                "datatype": "FP32"
            }]
        """
        ...

    def get_output_types(self) -> List[Dict]:
        """
        Get output type definitions for the model.

        Returns metadata about model output tensors including name, shape, and datatype.
        Used for API documentation and client code generation.

        Returns:
            List[Dict]: List of output type specifications, each containing:
                - name (str): Output tensor name
                - shape (List[int]): Expected shape
                - datatype (str): Data type (e.g., "FP32", "INT64")

        Example:
            return [{
                "name": "output-0",
                "shape": [-1, 10],
                "datatype": "FP32"
            }]
        """
        ...

Usage Example:

from kserve import Model
from kserve.errors import InvalidInput

class ValidatedModel(Model):
    def validate(self, payload):
        """Validate input format and content"""
        if "instances" not in payload:
            raise InvalidInput("Missing 'instances' key in payload")

        instances = payload["instances"]
        if not isinstance(instances, list):
            raise InvalidInput("'instances' must be a list")

        if len(instances) == 0:
            raise InvalidInput("'instances' cannot be empty")

        # Validate each instance
        for i, instance in enumerate(instances):
            if len(instance) != 4:
                raise InvalidInput(f"Instance {i} must have exactly 4 features")

    def get_input_types(self):
        """Define expected input types"""
        return [{
            "name": "input-0",
            "shape": [-1, 4],
            "datatype": "FP32"
        }]

    def get_output_types(self):
        """Define output types"""
        return [{
            "name": "output-0",
            "shape": [-1, 3],
            "datatype": "FP32"
        }]

    def predict(self, payload, headers=None):
        # validation already performed by framework
        instances = payload["instances"]
        predictions = self.model.predict(instances)
        return {"predictions": predictions.tolist()}