```
tessl install tessl/pypi-kserve@0.16.1
```

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.
This section covers constants for Kubernetes resource types, API versions, server defaults, and protocol definitions, along with utility functions for environment detection and data type conversion.
Constants are located in the `kserve.constants.constants` module:

```python
from kserve.constants.constants import (
KSERVE_GROUP,
KSERVE_V1BETA1_VERSION,
DEFAULT_HTTP_PORT,
# ... other constants
)
```

Constants for KServe Kubernetes resources.

```python
# API Group and Versions
KSERVE_GROUP: str = "serving.kserve.io"
KSERVE_V1BETA1_VERSION: str = "v1beta1"
KSERVE_V1ALPHA1_VERSION: str = "v1alpha1"
# Resource Kinds
KSERVE_KIND_INFERENCESERVICE: str = "InferenceService"
KSERVE_PLURAL_INFERENCESERVICE: str = "inferenceservices"
KSERVE_KIND_TRAINEDMODEL: str = "TrainedModel"
KSERVE_PLURAL_TRAINEDMODEL: str = "trainedmodels"
KSERVE_KIND_INFERENCEGRAPH: str = "InferenceGraph"
KSERVE_PLURAL_INFERENCEGRAPH: str = "inferencegraphs"
KSERVE_KIND_SERVINGRUNTIME: str = "ServingRuntime"
KSERVE_PLURAL_SERVINGRUNTIME: str = "servingruntimes"
KSERVE_KIND_CLUSTERSERVINGRUNTIME: str = "ClusterServingRuntime"
KSERVE_PLURAL_CLUSTERSERVINGRUNTIME: str = "clusterservingruntimes"
```

Default server configuration values.

```python
# Port Configuration
DEFAULT_HTTP_PORT: int = 8080
DEFAULT_GRPC_PORT: int = 8081
# gRPC Configuration
MAX_GRPC_MESSAGE_LENGTH: int = 8388608 # 8MB
# Protocol Route Prefixes
V2_ROUTE_PREFIX: str = "/v2"
V1_ROUTE_PREFIX: str = "/v1"
```

Default names for storage credentials.

```python
# AWS S3 Credentials
S3_ACCESS_KEY_ID_DEFAULT_NAME: str = "AWS_ACCESS_KEY_ID"
S3_SECRET_ACCESS_KEY_DEFAULT_NAME: str = "AWS_SECRET_ACCESS_KEY"
# Google Cloud Storage Credentials
GCS_CREDS_FILE_DEFAULT_NAME: str = "gcs-credentials.json"
# Azure Credentials
AZ_DEFAULT_CREDS_FILE: str = "azure-credentials.json"
# Kubernetes Secret Names
DEFAULT_SECRET_NAME: str = "kserve-secret-"
DEFAULT_SA_NAME: str = "kserve-service-credentials"
```

Enumerations for protocol types, model types, and inference verbs.

```python
class PredictorProtocol(Enum):
"""
Predictor protocol types.
Values:
REST_V1: Legacy REST protocol ("v1")
REST_V2: Standard REST protocol ("v2")
GRPC_V2: gRPC protocol ("grpc-v2")
"""
REST_V1 = "v1"
REST_V2 = "v2"
GRPC_V2 = "grpc-v2"
class ModelType(Enum):
"""
Model component types.
Values:
EXPLAINER: Explainer component (1)
PREDICTOR: Predictor component (2)
"""
EXPLAINER = 1
PREDICTOR = 2
class InferenceVerb(Enum):
"""
Inference verb types for protocol operations.
Values:
EXPLAIN: Explanation request ("explain")
PREDICT: Prediction request ("predict")
GENERATE: Generation request for LLMs ("generate")
"""
EXPLAIN = "explain"
PREDICT = "predict"
GENERATE = "generate"
```

Mappings for gRPC data types.

```python
GRPC_CONTENT_DATATYPE_MAPPINGS: Dict[str, str]
"""
Maps gRPC data types to content field names.
Used for binary data encoding in gRPC protocol.
"""def is_running_in_k8s() -> bool:
"""
Check if code is running inside a Kubernetes cluster.
Returns:
bool: True if running in Kubernetes, False otherwise
"""
def get_default_target_namespace() -> str:
"""
Get the default target namespace.
Returns:
str: Namespace name. Returns current namespace if in cluster,
otherwise returns "default"
"""def cpu_count() -> int:
"""
Get the number of available CPUs.
Returns:
int: Number of CPUs available to the process
"""def set_predictor_config(predictor_config: PredictorConfig) -> None:
"""
Set global predictor configuration.
Args:
predictor_config (PredictorConfig): Predictor configuration object
"""
def get_predictor_config() -> Optional[PredictorConfig]:
"""
Get global predictor configuration.
Returns:
PredictorConfig or None: Current predictor configuration or None if not set
"""class LLMStats:
"""
Statistics for LLM inference operations.
Attributes:
prompt_tokens (int): Number of tokens in the prompt
completion_tokens (int): Number of tokens in the completion
total_tokens (int): Total tokens used (prompt + completion)
"""
def __init__(
self,
prompt_tokens: int = 0,
completion_tokens: int = 0,
total_tokens: int = 0
): ...
@property
def prompt_tokens(self) -> int:
"""Number of prompt tokens"""
@property
def completion_tokens(self) -> int:
"""Number of completion tokens"""
@property
def total_tokens(self) -> int:
"""Total tokens used"""def is_structured_cloudevent(headers: dict) -> bool:
"""
Check if request headers indicate structured CloudEvent format.
Args:
headers (dict): HTTP request headers
Returns:
bool: True if request is a structured CloudEvent
"""
def create_response_cloudevent(
request_id: str,
source: str,
data: dict,
event_type: str = "org.kserve.inference.response"
) -> dict:
"""
Create a CloudEvent response.
Args:
request_id (str): Request ID
source (str): Event source
data (dict): Response data
event_type (str): CloudEvent type (default: "org.kserve.inference.response")
Returns:
dict: CloudEvent formatted response
"""
def to_headers(cloudevent: dict) -> dict:
"""
Convert CloudEvent to HTTP headers.
Args:
cloudevent (dict): CloudEvent dictionary
Returns:
dict: HTTP headers representing the CloudEvent
"""def generate_uuid() -> str:
"""
Generate a UUID string.
Returns:
str: UUID string
"""
def get_predict_input(request_body: dict, default_input: str = "instances") -> List:
"""
Extract prediction input from request body.
Args:
request_body (dict): Request body dictionary
default_input (str): Default input field name (default: "instances")
Returns:
list: Input data for prediction
Raises:
InvalidInput: If input field is missing
"""
def get_predict_response(predictions: Any, model_name: str, response_id: Optional[str] = None) -> dict:
"""
Create prediction response dictionary.
Args:
predictions (any): Prediction results
model_name (str): Model name
response_id (str, optional): Response ID
Returns:
dict: Response dictionary with predictions
"""
def strtobool(val: str) -> bool:
"""
Convert string representation of truth to boolean.
Args:
val (str): String value ("yes", "true", "y", "1" for True; "no", "false", "n", "0" for False)
Returns:
bool: Boolean value
Raises:
ValueError: If val is not a valid truth value
"""
def is_v2(request_headers: dict) -> bool:
"""
Check if request uses v2 protocol based on content type.
Args:
request_headers (dict): Request headers
Returns:
bool: True if v2 protocol is used
"""
def is_v1(request_headers: dict) -> bool:
"""
Check if request uses v1 protocol.
Args:
request_headers (dict): Request headers
Returns:
bool: True if v1 protocol is used
"""def to_np_dtype(datatype: str) -> np.dtype:
"""
Convert KServe datatype to NumPy dtype.
Args:
datatype (str): KServe datatype string (e.g., "FP32", "INT64", "BOOL")
Returns:
np.dtype: NumPy data type
Raises:
InvalidInput: If datatype is not recognized
"""
def from_np_dtype(dtype: np.dtype) -> str:
"""
Convert NumPy dtype to KServe datatype string.
Args:
dtype (np.dtype): NumPy data type
Returns:
str: KServe datatype string (e.g., "FP32", "INT64", "BOOL")
Raises:
InvalidInput: If dtype cannot be mapped to KServe datatype
"""def serialize_byte_tensor(input_tensor: np.ndarray) -> bytes:
"""
Serialize a BYTES tensor to raw bytes format.
Args:
input_tensor (np.ndarray): NumPy array with object dtype containing bytes
Returns:
bytes: Serialized byte tensor
Raises:
InvalidInput: If tensor is not a valid BYTES tensor
"""
def deserialize_bytes_tensor(encoded_tensor: bytes) -> np.ndarray:
"""
Deserialize raw bytes to BYTES tensor.
Args:
encoded_tensor (bytes): Serialized byte tensor
Returns:
np.ndarray: NumPy array with object dtype containing bytes
Raises:
InvalidInput: If encoded_tensor cannot be deserialized
"""from kserve.constants import (
KSERVE_GROUP,
KSERVE_V1BETA1_VERSION,
KSERVE_KIND_INFERENCESERVICE,
KSERVE_PLURAL_INFERENCESERVICE
)
# Create InferenceService with constants
isvc = {
"apiVersion": f"{KSERVE_GROUP}/{KSERVE_V1BETA1_VERSION}",
"kind": KSERVE_KIND_INFERENCESERVICE,
"metadata": {
"name": "sklearn-iris"
},
"spec": {
"predictor": {
"sklearn": {
"storageUri": "gs://models/sklearn/iris"
}
}
}
}
# List InferenceServices using constants
from kubernetes import client
custom_api = client.CustomObjectsApi()
isvcs = custom_api.list_namespaced_custom_object(
group=KSERVE_GROUP,
version=KSERVE_V1BETA1_VERSION,
namespace="default",
plural=KSERVE_PLURAL_INFERENCESERVICE
)
```

Configuring server ports and protocol routes:

```python
from kserve.constants import (
DEFAULT_HTTP_PORT,
DEFAULT_GRPC_PORT,
V2_ROUTE_PREFIX,
MAX_GRPC_MESSAGE_LENGTH
)
# Configure server ports
http_port = DEFAULT_HTTP_PORT # 8080
grpc_port = DEFAULT_GRPC_PORT # 8081
# Build endpoint URL
endpoint = f"http://localhost:{http_port}{V2_ROUTE_PREFIX}/models/my-model/infer"
# Configure gRPC message size
grpc_options = [
('grpc.max_send_message_length', MAX_GRPC_MESSAGE_LENGTH),
('grpc.max_receive_message_length', MAX_GRPC_MESSAGE_LENGTH)
]
```

Dispatching on the protocol and model type enums (V1Handler, V2Handler, and GRPCHandler are illustrative placeholders):

```python
from kserve.constants import PredictorProtocol, ModelType
# Check protocol version
def get_protocol_handler(protocol: str):
if protocol == PredictorProtocol.REST_V1.value:
return V1Handler()
elif protocol == PredictorProtocol.REST_V2.value:
return V2Handler()
elif protocol == PredictorProtocol.GRPC_V2.value:
return GRPCHandler()
else:
raise ValueError(f"Unknown protocol: {protocol}")
# Use in configuration
predictor_protocol = PredictorProtocol.REST_V2.value # "v2"
# Check model type
if component_type == ModelType.PREDICTOR.value:
print("This is a predictor component")
elif component_type == ModelType.EXPLAINER.value:
print("This is an explainer component")from kserve.utils import is_running_in_k8s, get_default_target_namespace
# Check if running in Kubernetes
if is_running_in_k8s():
print("Running inside Kubernetes cluster")
# Use in-cluster configuration
from kubernetes import config
config.load_incluster_config()
else:
print("Running outside Kubernetes cluster")
# Use kubeconfig
from kubernetes import config
config.load_kube_config()
# Get target namespace
namespace = get_default_target_namespace()
print(f"Using namespace: {namespace}")from kserve.utils import cpu_count
# Determine number of workers based on CPUs
num_cpus = cpu_count()
num_workers = min(num_cpus, 4) # Cap at 4 workers
print(f"Available CPUs: {num_cpus}")
print(f"Starting {num_workers} workers")from kserve.utils import is_structured_cloudevent
from kserve import Model
class CloudEventAwareModel(Model):
def predict(self, payload, headers=None):
if headers and is_structured_cloudevent(headers):
# Handle CloudEvent format
print("Processing CloudEvent request")
# Extract data from CloudEvent
if "data" in payload:
instances = payload["data"]["instances"]
else:
instances = payload["instances"]
else:
# Handle regular request
print("Processing regular request")
instances = payload["instances"]
predictions = self.model.predict(instances)
return {"predictions": predictions.tolist()}from kserve.constants import (
S3_ACCESS_KEY_ID_DEFAULT_NAME,
S3_SECRET_ACCESS_KEY_DEFAULT_NAME,
GCS_CREDS_FILE_DEFAULT_NAME,
DEFAULT_SECRET_NAME,
DEFAULT_SA_NAME
)
import os
# Get S3 credentials from environment
s3_access_key = os.getenv(S3_ACCESS_KEY_ID_DEFAULT_NAME)
s3_secret_key = os.getenv(S3_SECRET_ACCESS_KEY_DEFAULT_NAME)
if s3_access_key and s3_secret_key:
print("S3 credentials found")
# Check for GCS credentials file
gcs_creds_path = f"/var/secrets/{GCS_CREDS_FILE_DEFAULT_NAME}"
if os.path.exists(gcs_creds_path):
print("GCS credentials found")
# Generate secret name
model_name = "sklearn-iris"
secret_name = f"{DEFAULT_SECRET_NAME}{model_name}"
print(f"Secret name: {secret_name}")from kserve.constants import (
KSERVE_GROUP,
KSERVE_V1BETA1_VERSION,
KSERVE_PLURAL_INFERENCESERVICE,
V2_ROUTE_PREFIX
)
# Build Kubernetes API URL
def get_isvc_url(namespace: str, name: str) -> str:
return (
f"/apis/{KSERVE_GROUP}/{KSERVE_V1BETA1_VERSION}"
f"/namespaces/{namespace}/{KSERVE_PLURAL_INFERENCESERVICE}/{name}"
)
url = get_isvc_url("default", "sklearn-iris")
print(f"InferenceService URL: {url}")
# Build inference endpoint URL
def get_inference_url(host: str, port: int, model_name: str) -> str:
return f"http://{host}:{port}{V2_ROUTE_PREFIX}/models/{model_name}/infer"
inference_url = get_inference_url("localhost", 8080, "my-model")
print(f"Inference URL: {inference_url}")from kserve.constants import DEFAULT_HTTP_PORT, DEFAULT_GRPC_PORT
import os
# Get ports from environment with defaults
http_port = int(os.getenv("HTTP_PORT", DEFAULT_HTTP_PORT))
grpc_port = int(os.getenv("GRPC_PORT", DEFAULT_GRPC_PORT))
print(f"HTTP server will listen on port {http_port}")
print(f"gRPC server will listen on port {grpc_port}")from kserve.constants import PredictorProtocol
class ProtocolAwareModel:
def __init__(self, protocol: str = None):
# Use default if not specified
if protocol is None:
protocol = PredictorProtocol.REST_V2.value
# Validate protocol
valid_protocols = [p.value for p in PredictorProtocol]
if protocol not in valid_protocols:
raise ValueError(
f"Invalid protocol '{protocol}'. "
f"Must be one of: {valid_protocols}"
)
self.protocol = protocol
def get_endpoint_prefix(self) -> str:
if self.protocol == PredictorProtocol.REST_V1.value:
return "/v1"
elif self.protocol == PredictorProtocol.REST_V2.value:
return "/v2"
else:
return ""from kserve.constants import (
KSERVE_GROUP,
KSERVE_V1BETA1_VERSION,
DEFAULT_HTTP_PORT,
DEFAULT_GRPC_PORT,
PredictorProtocol,
S3_ACCESS_KEY_ID_DEFAULT_NAME,
S3_SECRET_ACCESS_KEY_DEFAULT_NAME
)
from kserve.utils import is_running_in_k8s, get_default_target_namespace, cpu_count
import os
class KServeConfig:
"""Helper class for KServe configuration"""
def __init__(self):
# Detect environment
self.in_cluster = is_running_in_k8s()
self.namespace = get_default_target_namespace()
# Server configuration
self.http_port = int(os.getenv("HTTP_PORT", DEFAULT_HTTP_PORT))
self.grpc_port = int(os.getenv("GRPC_PORT", DEFAULT_GRPC_PORT))
# Worker configuration
self.num_cpus = cpu_count()
self.num_workers = min(self.num_cpus, 4)
# Protocol configuration
self.protocol = os.getenv("PROTOCOL", PredictorProtocol.REST_V2.value)
# Storage credentials
self.s3_access_key = os.getenv(S3_ACCESS_KEY_ID_DEFAULT_NAME)
self.s3_secret_key = os.getenv(S3_SECRET_ACCESS_KEY_DEFAULT_NAME)
def get_api_version(self) -> str:
"""Get full API version string"""
return f"{KSERVE_GROUP}/{KSERVE_V1BETA1_VERSION}"
def get_inference_url(self, host: str, model_name: str) -> str:
"""Build inference URL"""
if self.protocol == PredictorProtocol.REST_V1.value:
return f"http://{host}:{self.http_port}/v1/models/{model_name}:predict"
elif self.protocol == PredictorProtocol.REST_V2.value:
return f"http://{host}:{self.http_port}/v2/models/{model_name}/infer"
else:
raise ValueError(f"Unsupported protocol: {self.protocol}")
def has_s3_credentials(self) -> bool:
"""Check if S3 credentials are configured"""
return bool(self.s3_access_key and self.s3_secret_key)
def print_config(self):
"""Print current configuration"""
print("KServe Configuration:")
print(f" Environment: {'Kubernetes' if self.in_cluster else 'Local'}")
print(f" Namespace: {self.namespace}")
print(f" HTTP Port: {self.http_port}")
print(f" gRPC Port: {self.grpc_port}")
print(f" CPUs: {self.num_cpus}")
print(f" Workers: {self.num_workers}")
print(f" Protocol: {self.protocol}")
print(f" S3 Credentials: {'Configured' if self.has_s3_credentials() else 'Not configured'}")
# Usage
if __name__ == "__main__":
config = KServeConfig()
config.print_config()
# Get inference URL
url = config.get_inference_url("localhost", "my-model")
print(f"\nInference URL: {url}")from kserve.utils.numpy_codec import to_np_dtype, from_np_dtype
import numpy as np
# Convert KServe datatype to NumPy dtype
dtype = to_np_dtype("FP32")
print(f"NumPy dtype: {dtype}") # float32
# Convert NumPy dtype to KServe datatype
arr = np.array([[1.0, 2.0]], dtype=np.float32)
datatype = from_np_dtype(arr.dtype)
print(f"KServe datatype: {datatype}") # "FP32"
# Create array with converted dtype
int_dtype = to_np_dtype("INT64")
int_array = np.array([1, 2, 3], dtype=int_dtype)
```

Serializing and deserializing BYTES tensors:

```python
from kserve.utils import serialize_byte_tensor, deserialize_bytes_tensor
import numpy as np
# Create BYTES tensor
texts = [b"hello", b"world", b"test"]
tensor = np.array(texts, dtype=object)
# Serialize
serialized = serialize_byte_tensor(tensor)
print(f"Serialized: {len(serialized)} bytes")
# Deserialize
deserialized = deserialize_bytes_tensor(serialized)
print(f"Deserialized: {deserialized}") # [b'hello' b'world' b'test']# Bad: Magic strings
api_version = "serving.kserve.io/v1beta1"
kind = "InferenceService"
# Good: Use constants
from kserve.constants import KSERVE_GROUP, KSERVE_V1BETA1_VERSION, KSERVE_KIND_INFERENCESERVICE
api_version = f"{KSERVE_GROUP}/{KSERVE_V1BETA1_VERSION}"
kind = KSERVE_KIND_INFERENCESERVICE
```

Choose the Kubernetes client configuration based on the environment:

```python
from kserve.utils import is_running_in_k8s
if is_running_in_k8s():
# Production configuration
from kubernetes import config
config.load_incluster_config()
else:
# Development configuration
from kubernetes import config
    config.load_kube_config()
```

Compare protocols through the enum rather than raw strings:

```python
from kserve.constants import PredictorProtocol
# Bad: String comparison
if protocol == "v2":
handler = V2Handler()
# Good: Use enum
if protocol == PredictorProtocol.REST_V2.value:
    handler = V2Handler()
```

Centralize server configuration in one place (explicit imports shown rather than wildcard imports):

```python
from kserve.constants import DEFAULT_HTTP_PORT, DEFAULT_GRPC_PORT, PredictorProtocol
from kserve.utils import cpu_count, get_default_target_namespace, is_running_in_k8s
class ServerConfig:
"""Centralized server configuration"""
HTTP_PORT = DEFAULT_HTTP_PORT
GRPC_PORT = DEFAULT_GRPC_PORT
PROTOCOL = PredictorProtocol.REST_V2.value
WORKERS = cpu_count()
NAMESPACE = get_default_target_namespace()
    IN_CLUSTER = is_running_in_k8s()
```

Validate protocol values early:

```python
from kserve.constants import PredictorProtocol
def validate_protocol(protocol: str):
"""Validate protocol version"""
valid_protocols = [p.value for p in PredictorProtocol]
if protocol not in valid_protocols:
raise ValueError(
f"Invalid protocol '{protocol}'. "
f"Valid protocols: {', '.join(valid_protocols)}"
        )
```
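Usage follows directly from the function above:

```python
validate_protocol("v2")     # passes: matches PredictorProtocol.REST_V2
validate_protocol("http")   # raises ValueError listing the valid protocols
```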