or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pkg:pypi/kserve@0.16.x

docs

index.md
tile.json

tessl/pypi-kserve

tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.

docs/reference/constants-utils.md

Constants and Utilities

Constants for Kubernetes resource types, API versions, server defaults, and protocol definitions, plus utility functions for environment detection and data type conversions.

Import Paths

Constants are located in the kserve.constants.constants module:

from kserve.constants.constants import (
    KSERVE_GROUP,
    KSERVE_V1BETA1_VERSION,
    DEFAULT_HTTP_PORT,
    # ... other constants
)

Capabilities

Kubernetes Constants

Constants for KServe Kubernetes resources.

# API Group and Versions
# Joined as f"{KSERVE_GROUP}/{KSERVE_V1BETA1_VERSION}" to form the `apiVersion`
# of an InferenceService manifest.
KSERVE_GROUP: str = "serving.kserve.io"
KSERVE_V1BETA1_VERSION: str = "v1beta1"
KSERVE_V1ALPHA1_VERSION: str = "v1alpha1"

# Resource Kinds
# Each resource has a KIND (the manifest `kind` value) and a PLURAL (the
# lowercase plural passed as `plural` to the Kubernetes custom-objects API).
KSERVE_KIND_INFERENCESERVICE: str = "InferenceService"
KSERVE_PLURAL_INFERENCESERVICE: str = "inferenceservices"

KSERVE_KIND_TRAINEDMODEL: str = "TrainedModel"
KSERVE_PLURAL_TRAINEDMODEL: str = "trainedmodels"

KSERVE_KIND_INFERENCEGRAPH: str = "InferenceGraph"
KSERVE_PLURAL_INFERENCEGRAPH: str = "inferencegraphs"

KSERVE_KIND_SERVINGRUNTIME: str = "ServingRuntime"
KSERVE_PLURAL_SERVINGRUNTIME: str = "servingruntimes"

KSERVE_KIND_CLUSTERSERVINGRUNTIME: str = "ClusterServingRuntime"
KSERVE_PLURAL_CLUSTERSERVINGRUNTIME: str = "clusterservingruntimes"

Server Configuration Constants

Default server configuration values.

# Port Configuration
# Defaults used when no port is supplied explicitly (e.g. via environment).
DEFAULT_HTTP_PORT: int = 8080
DEFAULT_GRPC_PORT: int = 8081

# gRPC Configuration
# Suitable for both `grpc.max_send_message_length` and
# `grpc.max_receive_message_length` channel options.
MAX_GRPC_MESSAGE_LENGTH: int = 8388608  # 8 MiB (8 * 1024 * 1024 bytes)

# Protocol Route Prefixes
# Prepended to REST paths, e.g. f"{V2_ROUTE_PREFIX}/models/<name>/infer".
V2_ROUTE_PREFIX: str = "/v2"
V1_ROUTE_PREFIX: str = "/v1"

Storage Credential Constants

Default names for storage credentials.

# AWS S3 Credentials
# These are the *names* of the credential entries (usable as environment
# variable names with os.getenv), not the secret values themselves.
S3_ACCESS_KEY_ID_DEFAULT_NAME: str = "AWS_ACCESS_KEY_ID"
S3_SECRET_ACCESS_KEY_DEFAULT_NAME: str = "AWS_SECRET_ACCESS_KEY"

# Google Cloud Storage Credentials
# File name only; the directory it is mounted under is chosen by the caller.
GCS_CREDS_FILE_DEFAULT_NAME: str = "gcs-credentials.json"

# Azure Credentials
AZ_DEFAULT_CREDS_FILE: str = "azure-credentials.json"

# Kubernetes Secret Names
# DEFAULT_SECRET_NAME ends with "-": it is a prefix to which a suffix
# (for example a model name) is appended to form the full secret name.
DEFAULT_SECRET_NAME: str = "kserve-secret-"
DEFAULT_SA_NAME: str = "kserve-service-credentials"

Protocol Enums

Enumerations for predictor protocol types, model component types, and inference verbs.

class PredictorProtocol(Enum):
    """Wire protocols a predictor can speak.

    Each member's value is the protocol identifier string:

    * ``REST_V1`` -- legacy REST protocol, ``"v1"``
    * ``REST_V2`` -- standard REST protocol, ``"v2"``
    * ``GRPC_V2`` -- gRPC protocol, ``"grpc-v2"``
    """

    REST_V1 = "v1"
    REST_V2 = "v2"
    GRPC_V2 = "grpc-v2"

class ModelType(Enum):
    """Kinds of model component.

    Each member's value is an integer tag:

    * ``EXPLAINER`` -- explainer component, ``1``
    * ``PREDICTOR`` -- predictor component, ``2``
    """

    EXPLAINER = 1
    PREDICTOR = 2

class InferenceVerb(Enum):
    """Verbs naming the operation an inference request asks for.

    Each member's value is the verb string:

    * ``EXPLAIN`` -- explanation request, ``"explain"``
    * ``PREDICT`` -- prediction request, ``"predict"``
    * ``GENERATE`` -- generation request for LLMs, ``"generate"``
    """

    EXPLAIN = "explain"
    PREDICT = "predict"
    GENERATE = "generate"

Data Type Mappings

Mappings for gRPC data types.

# Declaration only: this reference gives the mapping's type (str -> str);
# the concrete entries are not listed here.
GRPC_CONTENT_DATATYPE_MAPPINGS: Dict[str, str]
"""
Maps gRPC data types to content field names.
Used for binary data encoding in gRPC protocol.
"""

Utility Functions

Environment Detection

def is_running_in_k8s() -> bool:
    """Report whether the code is running inside a Kubernetes cluster.

    Returns:
        True if running in Kubernetes, False otherwise.
    """

def get_default_target_namespace() -> str:
    """Return the namespace to target by default.

    Returns:
        The current namespace when running in a cluster; otherwise the
        literal namespace "default".
    """

System Utilities

def cpu_count() -> int:
    """Return how many CPUs are available.

    Returns:
        The number of CPUs available to the process.
    """

Context Functions

def set_predictor_config(predictor_config: PredictorConfig) -> None:
    """Store the global predictor configuration.

    Args:
        predictor_config: The predictor configuration object to install.
    """

def get_predictor_config() -> Optional[PredictorConfig]:
    """Fetch the global predictor configuration.

    Returns:
        The current predictor configuration, or None if none has been set.
    """

LLM Statistics

class LLMStats:
    """Token-usage statistics for an LLM inference operation.

    Attributes:
        prompt_tokens: Number of tokens in the prompt.
        completion_tokens: Number of tokens in the completion.
        total_tokens: Total tokens used (prompt + completion).
    """

    def __init__(
        self,
        prompt_tokens: int = 0,
        completion_tokens: int = 0,
        total_tokens: int = 0,
    ): ...

    @property
    def prompt_tokens(self) -> int:
        """Return the prompt token count."""

    @property
    def completion_tokens(self) -> int:
        """Return the completion token count."""

    @property
    def total_tokens(self) -> int:
        """Return the total token count."""

CloudEvents Utilities

def is_structured_cloudevent(headers: dict) -> bool:
    """Tell whether request headers indicate the structured CloudEvent format.

    Args:
        headers: HTTP request headers.

    Returns:
        True if the request is a structured CloudEvent.
    """

def create_response_cloudevent(
    request_id: str,
    source: str,
    data: dict,
    event_type: str = "org.kserve.inference.response",
) -> dict:
    """Build a CloudEvent response.

    Args:
        request_id: Request ID.
        source: Event source.
        data: Response data.
        event_type: CloudEvent type; defaults to
            "org.kserve.inference.response".

    Returns:
        The response formatted as a CloudEvent.
    """

def to_headers(cloudevent: dict) -> dict:
    """Translate a CloudEvent into HTTP headers.

    Args:
        cloudevent: CloudEvent dictionary.

    Returns:
        HTTP headers representing the CloudEvent.
    """

Protocol Utilities

def generate_uuid() -> str:
    """Produce a UUID as a string.

    Returns:
        The UUID string.
    """

def get_predict_input(request_body: dict, default_input: str = "instances") -> List:
    """Pull the prediction input out of a request body.

    Args:
        request_body: Request body dictionary.
        default_input: Name of the input field; defaults to "instances".

    Returns:
        The input data for prediction.

    Raises:
        InvalidInput: If the input field is missing.
    """

def get_predict_response(predictions: Any, model_name: str, response_id: str = None) -> dict:
    """Assemble the prediction response dictionary.

    Args:
        predictions: Prediction results.
        model_name: Model name.
        response_id: Response ID; optional.

    Returns:
        A response dictionary carrying the predictions.
    """

def strtobool(val: str) -> bool:
    """Turn a string spelling of a truth value into a boolean.

    Args:
        val: String value; "yes", "true", "y", "1" mean True and
            "no", "false", "n", "0" mean False.

    Returns:
        The corresponding boolean.

    Raises:
        ValueError: If ``val`` is not a valid truth value.
    """

def is_v2(request_headers: dict) -> bool:
    """Tell, from the content type, whether a request uses the v2 protocol.

    Args:
        request_headers: Request headers.

    Returns:
        True if the v2 protocol is used.
    """

def is_v1(request_headers: dict) -> bool:
    """Tell whether a request uses the v1 protocol.

    Args:
        request_headers: Request headers.

    Returns:
        True if the v1 protocol is used.
    """

NumPy Codec

def to_np_dtype(datatype: str) -> np.dtype:
    """Map a KServe datatype name to the matching NumPy dtype.

    Args:
        datatype: KServe datatype string, e.g. "FP32", "INT64", "BOOL".

    Returns:
        The NumPy data type.

    Raises:
        InvalidInput: If ``datatype`` is not recognized.
    """

def from_np_dtype(dtype: np.dtype) -> str:
    """Map a NumPy dtype to the matching KServe datatype name.

    Args:
        dtype: NumPy data type.

    Returns:
        KServe datatype string, e.g. "FP32", "INT64", "BOOL".

    Raises:
        InvalidInput: If ``dtype`` cannot be mapped to a KServe datatype.
    """

Serialization Utilities

def serialize_byte_tensor(input_tensor: np.ndarray) -> bytes:
    """Encode a BYTES tensor into its raw bytes format.

    Args:
        input_tensor: NumPy array with object dtype containing bytes.

    Returns:
        The serialized byte tensor.

    Raises:
        InvalidInput: If the tensor is not a valid BYTES tensor.
    """

def deserialize_bytes_tensor(encoded_tensor: bytes) -> np.ndarray:
    """Decode raw bytes back into a BYTES tensor.

    Args:
        encoded_tensor: Serialized byte tensor.

    Returns:
        NumPy array with object dtype containing bytes.

    Raises:
        InvalidInput: If ``encoded_tensor`` cannot be deserialized.
    """

Usage Examples

Using Kubernetes Constants

from kserve.constants.constants import (
    KSERVE_GROUP,
    KSERVE_V1BETA1_VERSION,
    KSERVE_KIND_INFERENCESERVICE,
    KSERVE_PLURAL_INFERENCESERVICE,
)
from kubernetes import client, config

# Create InferenceService with constants
isvc = {
    "apiVersion": f"{KSERVE_GROUP}/{KSERVE_V1BETA1_VERSION}",
    "kind": KSERVE_KIND_INFERENCESERVICE,
    "metadata": {
        "name": "sklearn-iris"
    },
    "spec": {
        "predictor": {
            "sklearn": {
                "storageUri": "gs://models/sklearn/iris"
            }
        }
    }
}

# List InferenceServices using constants.
# The Kubernetes client needs a loaded configuration before any API object is
# used; call config.load_incluster_config() instead when running in a pod.
config.load_kube_config()

custom_api = client.CustomObjectsApi()
isvcs = custom_api.list_namespaced_custom_object(
    group=KSERVE_GROUP,
    version=KSERVE_V1BETA1_VERSION,
    namespace="default",
    plural=KSERVE_PLURAL_INFERENCESERVICE,
)

Using Server Constants

from kserve.constants.constants import (
    DEFAULT_HTTP_PORT,
    DEFAULT_GRPC_PORT,
    V2_ROUTE_PREFIX,
    MAX_GRPC_MESSAGE_LENGTH,
)

# Configure server ports
http_port = DEFAULT_HTTP_PORT  # 8080
grpc_port = DEFAULT_GRPC_PORT  # 8081

# Build endpoint URL
endpoint = f"http://localhost:{http_port}{V2_ROUTE_PREFIX}/models/my-model/infer"

# Configure gRPC message size: the same limit applies to sending and receiving
grpc_options = [
    ('grpc.max_send_message_length', MAX_GRPC_MESSAGE_LENGTH),
    ('grpc.max_receive_message_length', MAX_GRPC_MESSAGE_LENGTH),
]

Using Protocol Enums

from kserve.constants import PredictorProtocol, ModelType

# Check protocol version
def get_protocol_handler(protocol: str):
    """Instantiate the handler matching a protocol identifier string.

    Args:
        protocol: A PredictorProtocol value string ("v1", "v2" or "grpc-v2").

    Returns:
        A new V1Handler, V2Handler or GRPCHandler instance.

    Raises:
        ValueError: If ``protocol`` matches no PredictorProtocol value.
    """
    if protocol == PredictorProtocol.REST_V1.value:
        return V1Handler()
    if protocol == PredictorProtocol.REST_V2.value:
        return V2Handler()
    if protocol == PredictorProtocol.GRPC_V2.value:
        return GRPCHandler()
    raise ValueError(f"Unknown protocol: {protocol}")

# Use in configuration
predictor_protocol = PredictorProtocol.REST_V2.value  # "v2"

# Check model type
# `component_type` is assigned here so the snippet runs on its own; in real
# code it would come from wherever the component is described.
component_type = ModelType.PREDICTOR.value
if component_type == ModelType.PREDICTOR.value:
    print("This is a predictor component")
elif component_type == ModelType.EXPLAINER.value:
    print("This is an explainer component")

Environment Detection

from kserve.utils import is_running_in_k8s, get_default_target_namespace
from kubernetes import config

# Pick the Kubernetes client configuration that matches where the code runs
if is_running_in_k8s():
    print("Running inside Kubernetes cluster")
    # Use in-cluster configuration
    config.load_incluster_config()
else:
    print("Running outside Kubernetes cluster")
    # Use kubeconfig
    config.load_kube_config()

# Get target namespace
namespace = get_default_target_namespace()
print(f"Using namespace: {namespace}")

CPU Count for Worker Configuration

from kserve.utils import cpu_count

# One worker per available CPU, but never more than 4
num_cpus = cpu_count()
num_workers = num_cpus if num_cpus < 4 else 4

print(f"Available CPUs: {num_cpus}")
print(f"Starting {num_workers} workers")

CloudEvents Detection

from kserve.utils import is_structured_cloudevent
from kserve import Model

class CloudEventAwareModel(Model):
    """Model whose predict() accepts both CloudEvent and plain payloads."""

    def predict(self, payload, headers=None):
        """Run prediction on the instances carried by ``payload``.

        For a structured CloudEvent request the instances are read from
        ``payload["data"]`` when that key exists; in every other case they
        are read from ``payload`` directly.

        Returns:
            A dict with a "predictions" list.
        """
        body = payload
        if headers and is_structured_cloudevent(headers):
            # Handle CloudEvent format
            print("Processing CloudEvent request")
            # A CloudEvent may wrap the request under "data"
            if "data" in payload:
                body = payload["data"]
        else:
            # Handle regular request
            print("Processing regular request")
        instances = body["instances"]

        predictions = self.model.predict(instances)
        return {"predictions": predictions.tolist()}

Storage Credential Configuration

import os

from kserve.constants.constants import (
    S3_ACCESS_KEY_ID_DEFAULT_NAME,
    S3_SECRET_ACCESS_KEY_DEFAULT_NAME,
    GCS_CREDS_FILE_DEFAULT_NAME,
    DEFAULT_SECRET_NAME,
    DEFAULT_SA_NAME,
)

# Get S3 credentials from environment; the constants hold the variable names
s3_access_key = os.getenv(S3_ACCESS_KEY_ID_DEFAULT_NAME)
s3_secret_key = os.getenv(S3_SECRET_ACCESS_KEY_DEFAULT_NAME)

if s3_access_key and s3_secret_key:
    print("S3 credentials found")

# Check for GCS credentials file
gcs_creds_path = f"/var/secrets/{GCS_CREDS_FILE_DEFAULT_NAME}"
if os.path.exists(gcs_creds_path):
    print("GCS credentials found")

# Generate secret name: DEFAULT_SECRET_NAME is a prefix ending in "-"
model_name = "sklearn-iris"
secret_name = f"{DEFAULT_SECRET_NAME}{model_name}"
print(f"Secret name: {secret_name}")

Building Resource URLs

from kserve.constants.constants import (
    DEFAULT_HTTP_PORT,
    KSERVE_GROUP,
    KSERVE_V1BETA1_VERSION,
    KSERVE_PLURAL_INFERENCESERVICE,
    V2_ROUTE_PREFIX,
)


# Build Kubernetes API URL
def get_isvc_url(namespace: str, name: str) -> str:
    """Return the Kubernetes API path of one InferenceService."""
    return (
        f"/apis/{KSERVE_GROUP}/{KSERVE_V1BETA1_VERSION}"
        f"/namespaces/{namespace}/{KSERVE_PLURAL_INFERENCESERVICE}/{name}"
    )


url = get_isvc_url("default", "sklearn-iris")
print(f"InferenceService URL: {url}")


# Build inference endpoint URL
def get_inference_url(host: str, port: int, model_name: str) -> str:
    """Return the v2 REST inference URL for ``model_name`` on ``host:port``."""
    return f"http://{host}:{port}{V2_ROUTE_PREFIX}/models/{model_name}/infer"


# Use the port constant rather than a literal 8080
inference_url = get_inference_url("localhost", DEFAULT_HTTP_PORT, "my-model")
print(f"Inference URL: {inference_url}")

Dynamic Port Configuration

import os

from kserve.constants.constants import DEFAULT_HTTP_PORT, DEFAULT_GRPC_PORT

# Get ports from environment with defaults.
# int() normalises both cases: a str from the environment or the int default.
http_port = int(os.getenv("HTTP_PORT", DEFAULT_HTTP_PORT))
grpc_port = int(os.getenv("GRPC_PORT", DEFAULT_GRPC_PORT))

print(f"HTTP server will listen on port {http_port}")
print(f"gRPC server will listen on port {grpc_port}")

Protocol Version Handling

from typing import Optional

from kserve.constants.constants import (
    PredictorProtocol,
    V1_ROUTE_PREFIX,
    V2_ROUTE_PREFIX,
)


class ProtocolAwareModel:
    """Model wrapper that validates and remembers its serving protocol."""

    def __init__(self, protocol: Optional[str] = None):
        """Store ``protocol`` after validating it.

        Args:
            protocol: A PredictorProtocol value string; when None, the
                REST v2 protocol is used.

        Raises:
            ValueError: If ``protocol`` is not a PredictorProtocol value.
        """
        # Use default if not specified
        if protocol is None:
            protocol = PredictorProtocol.REST_V2.value

        # Validate protocol
        valid_protocols = [p.value for p in PredictorProtocol]
        if protocol not in valid_protocols:
            raise ValueError(
                f"Invalid protocol '{protocol}'. "
                f"Must be one of: {valid_protocols}"
            )

        self.protocol = protocol

    def get_endpoint_prefix(self) -> str:
        """Return the REST route prefix for the protocol.

        Returns:
            V1_ROUTE_PREFIX or V2_ROUTE_PREFIX for the REST protocols, and
            an empty string for any other protocol.
        """
        if self.protocol == PredictorProtocol.REST_V1.value:
            return V1_ROUTE_PREFIX
        if self.protocol == PredictorProtocol.REST_V2.value:
            return V2_ROUTE_PREFIX
        return ""

Complete Example: Configuration Helper

import os

from kserve.constants.constants import (
    KSERVE_GROUP,
    KSERVE_V1BETA1_VERSION,
    DEFAULT_HTTP_PORT,
    DEFAULT_GRPC_PORT,
    PredictorProtocol,
    S3_ACCESS_KEY_ID_DEFAULT_NAME,
    S3_SECRET_ACCESS_KEY_DEFAULT_NAME,
    V1_ROUTE_PREFIX,
    V2_ROUTE_PREFIX,
)
from kserve.utils import is_running_in_k8s, get_default_target_namespace, cpu_count


class KServeConfig:
    """Helper class for KServe configuration"""

    def __init__(self):
        """Collect settings from the environment and the KServe defaults."""
        # Detect environment
        self.in_cluster = is_running_in_k8s()
        self.namespace = get_default_target_namespace()

        # Server configuration: environment variables override the defaults
        self.http_port = int(os.getenv("HTTP_PORT", DEFAULT_HTTP_PORT))
        self.grpc_port = int(os.getenv("GRPC_PORT", DEFAULT_GRPC_PORT))

        # Worker configuration: one worker per CPU, capped at 4
        self.num_cpus = cpu_count()
        self.num_workers = min(self.num_cpus, 4)

        # Protocol configuration
        self.protocol = os.getenv("PROTOCOL", PredictorProtocol.REST_V2.value)

        # Storage credentials
        self.s3_access_key = os.getenv(S3_ACCESS_KEY_ID_DEFAULT_NAME)
        self.s3_secret_key = os.getenv(S3_SECRET_ACCESS_KEY_DEFAULT_NAME)

    def get_api_version(self) -> str:
        """Get full API version string"""
        return f"{KSERVE_GROUP}/{KSERVE_V1BETA1_VERSION}"

    def get_inference_url(self, host: str, model_name: str) -> str:
        """Build the REST inference URL for ``model_name`` on ``host``.

        Raises:
            ValueError: If the configured protocol is not REST v1 or REST v2
                (this includes "grpc-v2").
        """
        base = f"http://{host}:{self.http_port}"
        if self.protocol == PredictorProtocol.REST_V1.value:
            return f"{base}{V1_ROUTE_PREFIX}/models/{model_name}:predict"
        if self.protocol == PredictorProtocol.REST_V2.value:
            return f"{base}{V2_ROUTE_PREFIX}/models/{model_name}/infer"
        raise ValueError(f"Unsupported protocol: {self.protocol}")

    def has_s3_credentials(self) -> bool:
        """Check if S3 credentials are configured"""
        return bool(self.s3_access_key and self.s3_secret_key)

    def print_config(self) -> None:
        """Print current configuration"""
        print("KServe Configuration:")
        print(f"  Environment: {'Kubernetes' if self.in_cluster else 'Local'}")
        print(f"  Namespace: {self.namespace}")
        print(f"  HTTP Port: {self.http_port}")
        print(f"  gRPC Port: {self.grpc_port}")
        print(f"  CPUs: {self.num_cpus}")
        print(f"  Workers: {self.num_workers}")
        print(f"  Protocol: {self.protocol}")
        print(f"  S3 Credentials: {'Configured' if self.has_s3_credentials() else 'Not configured'}")

# Usage: print the detected configuration, then a sample inference URL
if __name__ == "__main__":
    config = KServeConfig()
    config.print_config()

    # Get inference URL
    url = config.get_inference_url(host="localhost", model_name="my-model")
    print(f"\nInference URL: {url}")

NumPy Codec Examples

import numpy as np
from kserve.utils.numpy_codec import from_np_dtype, to_np_dtype

# KServe datatype name -> NumPy dtype
dtype = to_np_dtype("FP32")
print(f"NumPy dtype: {dtype}")  # float32

# NumPy dtype -> KServe datatype name
arr = np.array([[1.0, 2.0]], dtype=np.float32)
datatype = from_np_dtype(arr.dtype)
print(f"KServe datatype: {datatype}")  # "FP32"

# A converted dtype can be passed straight to NumPy
int_dtype = to_np_dtype("INT64")
int_array = np.array([1, 2, 3], dtype=int_dtype)

Serialization Examples

import numpy as np
from kserve.utils import deserialize_bytes_tensor, serialize_byte_tensor

# A BYTES tensor is an object-dtype array whose elements are bytes
texts = [b"hello", b"world", b"test"]
tensor = np.array(texts, dtype=object)

# Round trip, step 1: tensor -> raw bytes
serialized = serialize_byte_tensor(tensor)
print(f"Serialized: {len(serialized)} bytes")

# Round trip, step 2: raw bytes -> tensor
deserialized = deserialize_bytes_tensor(serialized)
print(f"Deserialized: {deserialized}")  # [b'hello' b'world' b'test']

Best Practices

1. Use Constants Instead of Magic Strings

# Bad: Magic strings
api_version = "serving.kserve.io/v1beta1"
kind = "InferenceService"

# Good: Use constants
from kserve.constants.constants import (
    KSERVE_GROUP,
    KSERVE_V1BETA1_VERSION,
    KSERVE_KIND_INFERENCESERVICE,
)

api_version = f"{KSERVE_GROUP}/{KSERVE_V1BETA1_VERSION}"
kind = KSERVE_KIND_INFERENCESERVICE

2. Detect Environment Dynamically

from kserve.utils import is_running_in_k8s
from kubernetes import config

if is_running_in_k8s():
    # Production configuration
    config.load_incluster_config()
else:
    # Development configuration
    config.load_kube_config()

3. Use Protocol Enums for Type Safety

from kserve.constants.constants import PredictorProtocol

# Bad: String comparison
if protocol == "v2":
    handler = V2Handler()

# Good: Use enum (compare against the member's value, since `protocol` is a str)
if protocol == PredictorProtocol.REST_V2.value:
    handler = V2Handler()

4. Centralize Configuration

# Import names explicitly: wildcard imports hide where each name comes from.
from kserve.constants.constants import (
    DEFAULT_GRPC_PORT,
    DEFAULT_HTTP_PORT,
    PredictorProtocol,
)
from kserve.utils import cpu_count, get_default_target_namespace, is_running_in_k8s


class ServerConfig:
    """Centralized server configuration"""
    # Class attributes are evaluated once, when the class body is executed,
    # so the function calls below run at import time.
    HTTP_PORT = DEFAULT_HTTP_PORT
    GRPC_PORT = DEFAULT_GRPC_PORT
    PROTOCOL = PredictorProtocol.REST_V2.value
    WORKERS = cpu_count()
    NAMESPACE = get_default_target_namespace()
    IN_CLUSTER = is_running_in_k8s()

5. Validate Configuration

from kserve.constants.constants import PredictorProtocol


def validate_protocol(protocol: str) -> None:
    """Validate protocol version.

    Args:
        protocol: Protocol identifier string to check.

    Raises:
        ValueError: If ``protocol`` is not one of the PredictorProtocol values.
    """
    valid_protocols = [p.value for p in PredictorProtocol]
    if protocol not in valid_protocols:
        raise ValueError(
            f"Invalid protocol '{protocol}'. "
            f"Valid protocols: {', '.join(valid_protocols)}"
        )