tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.
This guide collects best practices for deploying KServe model servers in production environments, covering error handling, monitoring, resource management, and high availability.
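Every pattern below builds on the same skeleton: subclass Model, implement load() and predict(), and serve the instance with ModelServer. A minimal sketch of that baseline, assuming a scikit-learn artifact at a hypothetical /mnt/models/model.joblib path:

import joblib
from kserve import Model, ModelServer

class IrisModel(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.model = None

    def load(self):
        # Path is an assumption; point this at wherever your artifact is mounted
        self.model = joblib.load("/mnt/models/model.joblib")
        self.ready = True

    def predict(self, payload, headers=None):
        # v1 protocol: {"instances": [...]} in, {"predictions": [...]} out
        predictions = self.model.predict(payload["instances"])
        return {"predictions": predictions.tolist()}

if __name__ == "__main__":
    model = IrisModel("sklearn-iris")
    model.load()
    ModelServer().start([model])
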
from kserve import Model
from kserve.errors import InvalidInput, InferenceError, ModelNotReady
class ProductionModel(Model):
def predict(self, payload, headers=None):
# Check model readiness
if not self.ready:
raise ModelNotReady(self.name, "Model is still loading")
# Validate input structure
if "instances" not in payload:
raise InvalidInput("Missing 'instances' field in request")
instances = payload["instances"]
# Validate input type
if not isinstance(instances, list):
raise InvalidInput("'instances' must be a list")
# Validate not empty
if len(instances) == 0:
raise InvalidInput("'instances' cannot be empty")
# Validate batch size
if len(instances) > 32:
raise InvalidInput(f"Batch size {len(instances)} exceeds maximum of 32")
# Validate instance shape
for idx, instance in enumerate(instances):
if len(instance) != 4:
raise InvalidInput(
f"Instance {idx} has {len(instance)} features, expected 4"
)
# Run inference with error handling
try:
predictions = self.model.predict(instances)
return {"predictions": predictions.tolist()}
except ValueError as e:
raise InvalidInput(f"Invalid input values: {e}")
except Exception as e:
raise InferenceError(f"Prediction failed: {e}")from kserve import (
V1beta1InferenceService,
V1beta1InferenceServiceSpec,
V1beta1PredictorSpec,
V1beta1SKLearnSpec
)
isvc = V1beta1InferenceService(
api_version="serving.kserve.io/v1beta1",
kind="InferenceService",
metadata={"name": "sklearn-iris", "namespace": "default"},
spec=V1beta1InferenceServiceSpec(
predictor=V1beta1PredictorSpec(
min_replicas=2, # High availability
max_replicas=10, # Auto-scaling
sklearn=V1beta1SKLearnSpec(
storage_uri="gs://models/sklearn/iris",
resources={
"limits": {
"cpu": "2",
"memory": "4Gi",
"nvidia.com/gpu": "1" # GPU if needed
},
"requests": {
"cpu": "1",
"memory": "2Gi"
}
}
)
)
)
)
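To actually submit this spec to the cluster, create it through KServeClient; a short sketch, assuming kubeconfig or in-cluster credentials are already configured:

from kserve import KServeClient

kserve_client = KServeClient()
kserve_client.create(isvc, namespace="default", watch=True)
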
from kserve import Model, logger
import gc
class MemoryEfficientModel(Model):
def load(self):
"""Load model with memory optimization"""
try:
            # load_model() stands in for your framework-specific loading routine
            self.model = load_model()
# Force garbage collection
gc.collect()
self.ready = True
logger.info(f"Model {self.name} loaded, memory optimized")
except MemoryError as e:
logger.error(f"Out of memory loading model: {e}")
gc.collect()
raise
def predict(self, payload, headers=None):
try:
result = self.model.predict(payload["instances"])
return {"predictions": result.tolist()}
finally:
# Cleanup after large predictions
            gc.collect()

from datetime import datetime

from kserve import Model, logger
class HealthCheckedModel(Model):
def __init__(self, name: str):
super().__init__(name)
self.health_check_failures = 0
self.max_health_failures = 3
self.last_health_check = None
async def healthy(self) -> bool:
"""Custom health check with test inference"""
try:
# Perform test inference
test_input = {"instances": [[1.0, 2.0, 3.0, 4.0]]}
result = self.predict(test_input)
# Reset failure count on success
self.health_check_failures = 0
self.last_health_check = datetime.now()
return True
except Exception as e:
self.health_check_failures += 1
logger.error(f"Health check failed: {e}")
# Fail health check if too many failures
if self.health_check_failures >= self.max_health_failures:
logger.critical(f"Health check failed {self.health_check_failures} times")
return False
# Still healthy if under threshold
        return True

from kserve import Model, logger, configure_logging
# Configure structured logging
log_config = {
"version": 1,
"formatters": {
"json": {
"class": "pythonjsonlogger.jsonlogger.JsonFormatter",
"format": "%(asctime)s %(name)s %(levelname)s %(message)s"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "json",
"level": "INFO"
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"filename": "/var/log/kserve/model.log",
"formatter": "json",
"level": "INFO",
"maxBytes": 104857600, # 100MB
"backupCount": 10
}
},
"loggers": {
"kserve": {
"level": "INFO",
"handlers": ["console", "file"]
}
}
}
configure_logging(log_config)
class LoggedModel(Model):
def predict(self, payload, headers=None):
logger.info(f"Prediction request", extra={
"model": self.name,
"batch_size": len(payload.get("instances", [])),
"request_id": headers.get("x-request-id") if headers else None
})
try:
result = self.model.predict(payload["instances"])
logger.info(f"Prediction success", extra={
"model": self.name,
"output_size": len(result)
})
return {"predictions": result.tolist()}
except Exception as e:
logger.error(f"Prediction failed", extra={
"model": self.name,
"error": str(e)
}, exc_info=True)
            raise

from kserve import Model
from kserve.metrics import PREDICT_HIST_TIME, get_labels
from prometheus_client import Counter, Gauge
# Custom metrics
PREDICTION_ERRORS = Counter(
'prediction_errors_total',
'Total prediction errors',
['model_name', 'error_type']
)
ACTIVE_REQUESTS = Gauge(
'active_requests',
'Number of active requests',
['model_name']
)
class MetricsModel(Model):
def predict(self, payload, headers=None):
labels = get_labels(self.name)
# Track active requests
ACTIVE_REQUESTS.labels(model_name=self.name).inc()
try:
result = self.model.predict(payload["instances"])
return {"predictions": result.tolist()}
except InvalidInput as e:
PREDICTION_ERRORS.labels(
model_name=self.name,
error_type="invalid_input"
).inc()
raise
except Exception as e:
PREDICTION_ERRORS.labels(
model_name=self.name,
error_type="inference_error"
).inc()
raise
finally:
            ACTIVE_REQUESTS.labels(model_name=self.name).dec()

import asyncio
import random

from kserve import InferenceRESTClient, logger
async def infer_with_retry(
client: InferenceRESTClient,
base_url: str,
model_name: str,
data: dict,
max_retries: int = 3
):
"""Make inference request with exponential backoff"""
for attempt in range(max_retries):
try:
return await client.infer(
base_url=base_url,
model_name=model_name,
data=data
)
except Exception as e:
if attempt == max_retries - 1:
raise
# Exponential backoff with jitter
wait_time = (2 ** attempt) + random.uniform(0, 1)
logger.warning(
f"Inference failed (attempt {attempt + 1}/{max_retries}), "
f"retrying in {wait_time:.2f}s: {e}"
)
            await asyncio.sleep(wait_time)
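A usage sketch for the retry helper above; the endpoint URL, model name, and payload shape are placeholder assumptions:

import asyncio

from kserve import InferenceRESTClient, RESTConfig

async def main():
    client = InferenceRESTClient(config=RESTConfig(protocol="v2"))
    # Hypothetical endpoint and payload; substitute your service's URL and schema
    response = await infer_with_retry(
        client,
        base_url="http://sklearn-iris.default.example.com",
        model_name="sklearn-iris",
        data={
            "inputs": [
                {"name": "input-0", "shape": [1, 4], "datatype": "FP32",
                 "data": [5.1, 3.5, 1.4, 0.2]}
            ]
        },
    )
    print(response)

asyncio.run(main())
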
from kserve import Model, logger
from kserve.errors import InferenceError
from datetime import datetime, timedelta
class CircuitBreakerModel(Model):
def __init__(self, name: str, failure_threshold: int = 5, timeout: int = 60):
super().__init__(name)
self.failure_threshold = failure_threshold
self.timeout = timedelta(seconds=timeout)
self.failure_count = 0
self.last_failure_time = None
self.circuit_open = False
def predict(self, payload, headers=None):
# Check circuit breaker state
if self.circuit_open:
if datetime.now() - self.last_failure_time > self.timeout:
# Try to close circuit
logger.info(f"Circuit breaker timeout expired, attempting to close")
self.circuit_open = False
self.failure_count = 0
else:
raise InferenceError("Circuit breaker is open, service unavailable")
try:
result = self.model.predict(payload["instances"])
self.failure_count = 0 # Reset on success
return {"predictions": result.tolist()}
except Exception as e:
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.circuit_open = True
logger.error(
f"Circuit breaker opened after {self.failure_count} failures"
)
raise InferenceError(f"Prediction failed: {e}")from kserve import Model, logger
class ResilientModel(Model):
def __init__(self, name: str):
super().__init__(name)
self.fallback_enabled = True
self.cache = {}
    def predict(self, payload, headers=None):
        # Compute the cache key up front so it is available in the fallback branch
        cache_key = str(payload["instances"])
        try:
            # Primary inference path
            result = self.model.predict(payload["instances"])
            # Cache successful results
            self.cache[cache_key] = result
            return {"predictions": result.tolist()}
        except Exception as e:
            if self.fallback_enabled and cache_key in self.cache:
logger.warning(
f"Primary inference failed, using cached result: {e}"
)
return {"predictions": self.cache[cache_key].tolist()}
else:
raise InferenceError(f"Inference failed and no fallback available: {e}")from kserve import Model, ModelServer
import signal
import sys
class GracefulModel(Model):
def __init__(self, name: str):
super().__init__(name)
self.shutting_down = False
# Register signal handlers
signal.signal(signal.SIGTERM, self._handle_shutdown)
signal.signal(signal.SIGINT, self._handle_shutdown)
def _handle_shutdown(self, signum, frame):
"""Handle shutdown signal"""
logger.info(f"Received shutdown signal for {self.name}")
self.shutting_down = True
self.stop()
sys.exit(0)
def predict(self, payload, headers=None):
# Reject new requests during shutdown
if self.shutting_down:
raise ModelNotReady(self.name, "Model is shutting down")
return {"predictions": self.model.predict(payload["instances"])}
def stop(self):
"""Cleanup resources"""
logger.info(f"Stopping model {self.name}")
self.ready = False
# Close connections, save state, etc.
if hasattr(self.model, 'close'):
            self.model.close()

import uvicorn
from kserve import ModelServer
if __name__ == "__main__":
model = MyModel("my-model")
model.load()
server = ModelServer()
app = server.create_application()
server.register_model(model)
# Run with SSL
uvicorn.run(
app,
host="0.0.0.0",
port=8443,
ssl_keyfile="/path/to/key.pem",
ssl_certfile="/path/to/cert.pem",
ssl_ca_certs="/path/to/ca.pem"
    )

from kserve import InferenceRESTClient, RESTConfig
# Configure SSL verification
config = RESTConfig(
protocol="v2",
verify=True, # Verify SSL certificates
cert=("/path/to/client-cert.pem", "/path/to/client-key.pem")
)
client = InferenceRESTClient(config=config)

from kserve import ModelServer
# Configure server timeouts
server = ModelServer(
http_port=8080,
workers=4,
grace_period=30 # Graceful shutdown period
)

from kserve import InferenceRESTClient, RESTConfig
# Configure client timeouts
config = RESTConfig(
protocol="v2",
timeout=30, # 30 second default timeout
retries=3
)
client = InferenceRESTClient(config=config)
# Override per request
response = await client.infer(
base_url="http://localhost:8080",
model_name="slow-model",
data={...},
timeout=120 # 2 minute timeout for this request
)

from kserve import V1beta1PredictorSpec, V1beta1SKLearnSpec
predictor = V1beta1PredictorSpec(
min_replicas=3, # Always run 3 replicas
max_replicas=10, # Scale up to 10
scale_target=100, # Target 100 concurrent requests
scale_metric="concurrency",
sklearn=V1beta1SKLearnSpec(
storage_uri="gs://models/sklearn/iris",
resources={
"limits": {"cpu": "2", "memory": "4Gi"},
"requests": {"cpu": "1", "memory": "2Gi"}
}
)
)

apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: sklearn-iris-pdb
spec:
minAvailable: 2
selector:
matchLabels:
      serving.kserve.io/inferenceservice: sklearn-iris

# prometheus.yml
scrape_configs:
- job_name: 'kserve'
scrape_interval: 15s
kubernetes_sd_configs:
- role: pod
namespaces:
names: ['default']
relabel_configs:
- source_labels: [__meta_kubernetes_pod_label_serving_kserve_io_inferenceservice]
action: keep
regex: .+
- source_labels: [__meta_kubernetes_pod_container_port_number]
action: keep
regex: "8080"
    metrics_path: '/metrics'

{
"dashboard": {
"title": "KServe Model Metrics",
"panels": [
{
"title": "Prediction Latency (p95)",
"targets": [{
"expr": "histogram_quantile(0.95, rate(request_predict_seconds_bucket[5m]))"
}]
},
{
"title": "Request Rate",
"targets": [{
"expr": "rate(request_predict_seconds_count[1m])"
}]
},
{
"title": "Error Rate",
"targets": [{
"expr": "rate(prediction_errors_total[5m])"
}]
}
]
}
}

groups:
- name: kserve_alerts
rules:
- alert: HighLatency
expr: histogram_quantile(0.95, rate(request_predict_seconds_bucket[5m])) > 1.0
for: 5m
labels:
severity: warning
annotations:
summary: "High prediction latency detected"
description: "95th percentile latency is {{ $value }}s"
- alert: HighErrorRate
expr: rate(prediction_errors_total[5m]) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value }} errors/sec"
- alert: ModelNotReady
expr: up{job="kserve"} == 0
for: 2m
labels:
severity: critical
annotations:
summary: "Model server is down"from locust import HttpUser, task, between
class KServeUser(HttpUser):
wait_time = between(0.1, 0.5)
@task
def predict(self):
self.client.post(
"/v2/models/sklearn-iris/infer",
json={
"inputs": [{
"name": "input-0",
"shape": [1, 4],
"datatype": "FP32",
"data": [[5.1, 3.5, 1.4, 0.2]]
}]
},
headers={"Content-Type": "application/json"}
        )

Run the load test:

locust -f load_test.py --host http://localhost:8080 --users 100 --spawn-rate 10

import os

from kserve import Configuration, KServeClient
# Configure with API key
config = Configuration(
host="https://k8s-api.example.com",
api_key={"authorization": os.getenv("K8S_API_TOKEN")},
api_key_prefix={"authorization": "Bearer"},
verify_ssl=True,
ssl_ca_cert="/etc/ssl/certs/ca.crt"
)
client = KServeClient(client_configuration=config)
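A usage sketch for the authenticated client, assuming an InferenceService named sklearn-iris already exists in the default namespace:

# Hypothetical service name and namespace; adjust to your deployment
client.wait_isvc_ready("sklearn-iris", namespace="default")
isvc = client.get("sklearn-iris", namespace="default")
print(isvc["status"]["url"])
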
from kserve import Model
from kserve.errors import InvalidInput
import re
class SecureModel(Model):
def predict(self, payload, headers=None):
instances = payload.get("instances", [])
# Validate input size
if len(instances) > 1000:
raise InvalidInput("Batch size too large, maximum 1000")
# Sanitize string inputs
for instance in instances:
if isinstance(instance, str):
# Remove potentially dangerous characters
if re.search(r'[<>\"\'&]', instance):
raise InvalidInput("Invalid characters in input")
return {"predictions": self.model.predict(instances)}# Deploy new version
isvc_v2 = V1beta1InferenceService(
    api_version="serving.kserve.io/v1beta1",
    kind="InferenceService",
    metadata={"name": "sklearn-iris-v2"},
spec=V1beta1InferenceServiceSpec(
predictor=V1beta1PredictorSpec(
sklearn=V1beta1SKLearnSpec(
storage_uri="gs://models/sklearn/iris/v2"
)
)
)
)
kserve_client.create(isvc_v2, namespace="default", watch=True)
# Test new version
# ... validation ...
# Switch traffic (update ingress or service mesh)
# Delete old version
kserve_client.delete("sklearn-iris-v1", namespace="default")from kserve import V1beta1InferenceService, V1beta1InferenceServiceSpec
# Deploy canary with traffic split
isvc = V1beta1InferenceService(
    api_version="serving.kserve.io/v1beta1",
    kind="InferenceService",
    metadata={
"name": "sklearn-iris",
"annotations": {
"serving.kserve.io/traffic": "v1=90,v2=10" # 90% v1, 10% v2
}
},
spec=V1beta1InferenceServiceSpec(
predictor=V1beta1PredictorSpec(
canary_traffic_percent=10, # 10% to canary
sklearn=V1beta1SKLearnSpec(
storage_uri="gs://models/sklearn/iris/v2"
)
)
)
)

FROM python:3.10-slim
# Install curl for the HEALTHCHECK below (not included in the slim base image)
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy model code
COPY model.py /app/
COPY model.pkl /mnt/models/
# Set working directory
WORKDIR /app
# Non-root user
RUN useradd -m -u 1000 kserve
USER kserve
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=60s --retries=3 \
CMD curl -f http://localhost:8080/v2/health/live || exit 1
# Start server
CMD ["python", "model.py"]# Build stage
FROM python:3.10 as builder
WORKDIR /build
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt
# Runtime stage
FROM python:3.10-slim
COPY --from=builder /root/.local /root/.local
COPY model.py /app/
COPY model.pkl /mnt/models/
WORKDIR /app
ENV PATH=/root/.local/bin:$PATH
CMD ["python", "model.py"]import os
from kserve import Model, ModelServer
class ConfigurableModel(Model):
def __init__(self, name: str):
super().__init__(name)
# Load from environment
self.model_path = os.getenv("MODEL_PATH", "/mnt/models/model.pkl")
self.batch_size = int(os.getenv("MAX_BATCH_SIZE", "32"))
self.timeout = int(os.getenv("INFERENCE_TIMEOUT", "30"))
def load(self):
self.model = joblib.load(self.model_path)
self.ready = True
if __name__ == "__main__":
model = ConfigurableModel(os.getenv("MODEL_NAME", "my-model"))
model.load()
ModelServer(
http_port=int(os.getenv("HTTP_PORT", "8080")),
grpc_port=int(os.getenv("GRPC_PORT", "8081")),
workers=int(os.getenv("WORKERS", "1"))
).start([model])