tessl/pypi-kserve

tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.

docs/guides/production-deployment.md

Production Deployment Guide

Best practices for deploying KServe model servers in production environments with proper error handling, monitoring, resource management, and high availability.

Production Checklist

  • Error handling and validation
  • Resource limits configured
  • Health checks implemented
  • Logging and metrics enabled
  • Graceful shutdown handling
  • Retry logic with backoff
  • Circuit breaker for resilience
  • SSL/TLS configured
  • Monitoring and alerting setup
  • Load testing completed

Error Handling

Comprehensive Input Validation

from kserve import Model
from kserve.errors import InvalidInput, InferenceError, ModelNotReady

class ProductionModel(Model):
    def predict(self, payload, headers=None):
        # Check model readiness
        if not self.ready:
            raise ModelNotReady(self.name, "Model is still loading")
        
        # Validate input structure
        if "instances" not in payload:
            raise InvalidInput("Missing 'instances' field in request")
        
        instances = payload["instances"]
        
        # Validate input type
        if not isinstance(instances, list):
            raise InvalidInput("'instances' must be a list")
        
        # Validate not empty
        if len(instances) == 0:
            raise InvalidInput("'instances' cannot be empty")
        
        # Validate batch size
        if len(instances) > 32:
            raise InvalidInput(f"Batch size {len(instances)} exceeds maximum of 32")
        
        # Validate instance shape
        for idx, instance in enumerate(instances):
            if len(instance) != 4:
                raise InvalidInput(
                    f"Instance {idx} has {len(instance)} features, expected 4"
                )
        
        # Run inference with error handling
        try:
            predictions = self.model.predict(instances)
            return {"predictions": predictions.tolist()}
        except ValueError as e:
            raise InvalidInput(f"Invalid input values: {e}")
        except Exception as e:
            raise InferenceError(f"Prediction failed: {e}")
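
With the server running, the validation above surfaces to callers as standard HTTP errors. A minimal smoke test is sketched below; the host, port, and model name are assumptions for a local deployment, and the request path follows the KServe v1 data-plane protocol.

import requests

url = "http://localhost:8080/v1/models/my-model:predict"  # adjust to your deployment

# Well-formed request: one instance with four features
ok = requests.post(url, json={"instances": [[5.1, 3.5, 1.4, 0.2]]})
print(ok.status_code, ok.json())   # expected: 200 with a predictions list

# Malformed request: missing 'instances' -> InvalidInput -> 4xx response
bad = requests.post(url, json={"inputs": [[5.1, 3.5, 1.4, 0.2]]})
print(bad.status_code)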

Resource Management

Configure Resource Limits

from kserve import (
    V1beta1InferenceService,
    V1beta1InferenceServiceSpec,
    V1beta1PredictorSpec,
    V1beta1SKLearnSpec
)

isvc = V1beta1InferenceService(
    api_version="serving.kserve.io/v1beta1",
    kind="InferenceService",
    metadata={"name": "sklearn-iris", "namespace": "default"},
    spec=V1beta1InferenceServiceSpec(
        predictor=V1beta1PredictorSpec(
            min_replicas=2,  # High availability
            max_replicas=10,  # Auto-scaling
            sklearn=V1beta1SKLearnSpec(
                storage_uri="gs://models/sklearn/iris",
                resources={
                    "limits": {
                        "cpu": "2",
                        "memory": "4Gi",
                        "nvidia.com/gpu": "1"  # GPU if needed
                    },
                    "requests": {
                        "cpu": "1",
                        "memory": "2Gi"
                    }
                }
            )
        )
    )
)
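
To submit this spec to the cluster, use KServeClient, the same client shown in the deployment-strategy examples later in this guide; a sketch:

from kserve import KServeClient

kserve_client = KServeClient()

# Create the InferenceService defined above and wait for it to become ready
kserve_client.create(isvc, namespace="default", watch=True)

# Inspect the deployed service
print(kserve_client.get("sklearn-iris", namespace="default"))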

Memory-Efficient Model Loading

from kserve import Model, logger
import gc

class MemoryEfficientModel(Model):
    def load(self):
        """Load model with memory optimization"""
        try:
            # Load model (load_model() is a placeholder for your framework-specific loader)
            self.model = load_model()
            
            # Force garbage collection
            gc.collect()
            
            self.ready = True
            logger.info(f"Model {self.name} loaded, memory optimized")
        except MemoryError as e:
            logger.error(f"Out of memory loading model: {e}")
            gc.collect()
            raise
    
    def predict(self, payload, headers=None):
        try:
            result = self.model.predict(payload["instances"])
            return {"predictions": result.tolist()}
        finally:
            # Cleanup after large predictions
            gc.collect()

Health Checks

Implement Custom Health Checks

from kserve import Model, logger
from datetime import datetime

class HealthCheckedModel(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.health_check_failures = 0
        self.max_health_failures = 3
        self.last_health_check = None
    
    async def healthy(self) -> bool:
        """Custom health check with test inference"""
        try:
            # Perform test inference
            test_input = {"instances": [[1.0, 2.0, 3.0, 4.0]]}
            result = self.predict(test_input)
            
            # Reset failure count on success
            self.health_check_failures = 0
            self.last_health_check = datetime.now()
            return True
            
        except Exception as e:
            self.health_check_failures += 1
            logger.error(f"Health check failed: {e}")
            
            # Fail health check if too many failures
            if self.health_check_failures >= self.max_health_failures:
                logger.critical(f"Health check failed {self.health_check_failures} times")
                return False
            
            # Still healthy if under threshold
            return True
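
From outside the pod, the same liveness and readiness information is available over the open inference protocol endpoints the server exposes (the Docker HEALTHCHECK later in this guide probes /v2/health/live). A sketch, assuming the server is reachable on localhost:8080:

import requests

base_url = "http://localhost:8080"  # adjust to your deployment

# Server-level liveness and readiness
live = requests.get(f"{base_url}/v2/health/live")
ready = requests.get(f"{base_url}/v2/health/ready")

# Per-model readiness
model_ready = requests.get(f"{base_url}/v2/models/sklearn-iris/ready")

print(live.status_code, ready.status_code, model_ready.status_code)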

Logging and Monitoring

Production Logging Setup

from kserve import Model, logger, configure_logging

# Configure structured logging
log_config = {
    "version": 1,
    "formatters": {
        "json": {
            "class": "pythonjsonlogger.jsonlogger.JsonFormatter",
            "format": "%(asctime)s %(name)s %(levelname)s %(message)s"
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "json",
            "level": "INFO"
        },
        "file": {
            "class": "logging.handlers.RotatingFileHandler",
            "filename": "/var/log/kserve/model.log",
            "formatter": "json",
            "level": "INFO",
            "maxBytes": 104857600,  # 100MB
            "backupCount": 10
        }
    },
    "loggers": {
        "kserve": {
            "level": "INFO",
            "handlers": ["console", "file"]
        }
    }
}

configure_logging(log_config)

class LoggedModel(Model):
    def predict(self, payload, headers=None):
        logger.info("Prediction request", extra={
            "model": self.name,
            "batch_size": len(payload.get("instances", [])),
            "request_id": headers.get("x-request-id") if headers else None
        })
        
        try:
            result = self.model.predict(payload["instances"])
            logger.info("Prediction success", extra={
                "model": self.name,
                "output_size": len(result)
            })
            return {"predictions": result.tolist()}
        except Exception as e:
            logger.error("Prediction failed", extra={
                "model": self.name,
                "error": str(e)
            }, exc_info=True)
            raise
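
Clients can supply the x-request-id header referenced above so that log lines can be correlated across services. A sketch, with the endpoint and model name assumed for a local v1-protocol deployment:

import uuid
import requests

response = requests.post(
    "http://localhost:8080/v1/models/my-model:predict",
    json={"instances": [[5.1, 3.5, 1.4, 0.2]]},
    headers={"x-request-id": str(uuid.uuid4())},  # echoed into the structured logs
)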

Prometheus Metrics

from kserve import Model
from kserve.errors import InvalidInput
from prometheus_client import Counter, Gauge

# Custom metrics
PREDICTION_ERRORS = Counter(
    'prediction_errors_total',
    'Total prediction errors',
    ['model_name', 'error_type']
)

ACTIVE_REQUESTS = Gauge(
    'active_requests',
    'Number of active requests',
    ['model_name']
)

class MetricsModel(Model):
    def predict(self, payload, headers=None):
        # Track active requests
        ACTIVE_REQUESTS.labels(model_name=self.name).inc()
        
        try:
            result = self.model.predict(payload["instances"])
            return {"predictions": result.tolist()}
        except InvalidInput as e:
            PREDICTION_ERRORS.labels(
                model_name=self.name,
                error_type="invalid_input"
            ).inc()
            raise
        except Exception as e:
            PREDICTION_ERRORS.labels(
                model_name=self.name,
                error_type="inference_error"
            ).inc()
            raise
        finally:
            ACTIVE_REQUESTS.labels(model_name=self.name).dec()

Resilience Patterns

Retry Logic with Exponential Backoff

import asyncio
import random
from kserve import InferenceRESTClient, logger

async def infer_with_retry(
    client: InferenceRESTClient,
    base_url: str,
    model_name: str,
    data: dict,
    max_retries: int = 3
):
    """Make inference request with exponential backoff"""
    for attempt in range(max_retries):
        try:
            return await client.infer(
                base_url=base_url,
                model_name=model_name,
                data=data
            )
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            
            # Exponential backoff with jitter
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            logger.warning(
                f"Inference failed (attempt {attempt + 1}/{max_retries}), "
                f"retrying in {wait_time:.2f}s: {e}"
            )
            await asyncio.sleep(wait_time)
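
Usage follows the async client pattern used elsewhere in this guide; a minimal sketch, assuming a model named sklearn-iris served locally and a request body shaped like the one in the load-testing example:

import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    client = InferenceRESTClient(config=RESTConfig(protocol="v2"))
    result = await infer_with_retry(
        client,
        base_url="http://localhost:8080",
        model_name="sklearn-iris",
        data={"inputs": [{"name": "input-0", "shape": [1, 4],
                          "datatype": "FP32", "data": [[5.1, 3.5, 1.4, 0.2]]}]},
    )
    print(result)

asyncio.run(main())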

Circuit Breaker Pattern

from kserve import Model, logger
from kserve.errors import InferenceError
from datetime import datetime, timedelta

class CircuitBreakerModel(Model):
    def __init__(self, name: str, failure_threshold: int = 5, timeout: int = 60):
        super().__init__(name)
        self.failure_threshold = failure_threshold
        self.timeout = timedelta(seconds=timeout)
        self.failure_count = 0
        self.last_failure_time = None
        self.circuit_open = False
    
    def predict(self, payload, headers=None):
        # Check circuit breaker state
        if self.circuit_open:
            if datetime.now() - self.last_failure_time > self.timeout:
                # Try to close circuit
                logger.info(f"Circuit breaker timeout expired, attempting to close")
                self.circuit_open = False
                self.failure_count = 0
            else:
                raise InferenceError("Circuit breaker is open, service unavailable")
        
        try:
            result = self.model.predict(payload["instances"])
            self.failure_count = 0  # Reset on success
            return {"predictions": result.tolist()}
            
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            if self.failure_count >= self.failure_threshold:
                self.circuit_open = True
                logger.error(
                    f"Circuit breaker opened after {self.failure_count} failures"
                )
            
            raise InferenceError(f"Prediction failed: {e}")

Graceful Degradation

from kserve import Model, logger
from kserve.errors import InferenceError

class ResilientModel(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.fallback_enabled = True
        self.cache = {}
    
    def predict(self, payload, headers=None):
        # Compute the cache key up front so the fallback path can reference it
        cache_key = str(payload["instances"])
        
        try:
            # Primary inference path
            result = self.model.predict(payload["instances"])
            
            # Cache successful results
            self.cache[cache_key] = result
            
            return {"predictions": result.tolist()}
            
        except Exception as e:
            if self.fallback_enabled and cache_key in self.cache:
                logger.warning(
                    f"Primary inference failed, using cached result: {e}"
                )
                return {"predictions": self.cache[cache_key].tolist()}
            else:
                raise InferenceError(f"Inference failed and no fallback available: {e}")

Graceful Shutdown

from kserve import Model, logger
from kserve.errors import ModelNotReady
import signal
import sys

class GracefulModel(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.shutting_down = False
        
        # Register signal handlers
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)
    
    def _handle_shutdown(self, signum, frame):
        """Handle shutdown signal"""
        logger.info(f"Received shutdown signal for {self.name}")
        self.shutting_down = True
        self.stop()
        sys.exit(0)
    
    def predict(self, payload, headers=None):
        # Reject new requests during shutdown
        if self.shutting_down:
            raise ModelNotReady(self.name, "Model is shutting down")
        
        return {"predictions": self.model.predict(payload["instances"])}
    
    def stop(self):
        """Cleanup resources"""
        logger.info(f"Stopping model {self.name}")
        self.ready = False
        # Close connections, save state, etc.
        if hasattr(self.model, 'close'):
            self.model.close()

SSL/TLS Configuration

HTTPS Server

import uvicorn
from kserve import ModelServer

if __name__ == "__main__":
    model = MyModel("my-model")  # MyModel is your custom kserve.Model subclass
    model.load()
    
    server = ModelServer()
    app = server.create_application()
    server.register_model(model)
    
    # Run with SSL
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8443,
        ssl_keyfile="/path/to/key.pem",
        ssl_certfile="/path/to/cert.pem",
        ssl_ca_certs="/path/to/ca.pem"
    )

Client with SSL

from kserve import InferenceRESTClient, RESTConfig

# Configure SSL verification
config = RESTConfig(
    protocol="v2",
    verify=True,  # Verify SSL certificates
    cert=("/path/to/client-cert.pem", "/path/to/client-key.pem")
)

client = InferenceRESTClient(config=config)

Timeout Configuration

Server-Side Timeouts

from kserve import ModelServer

# Configure server timeouts
server = ModelServer(
    http_port=8080,
    workers=4,
    grace_period=30  # Graceful shutdown period
)

Client-Side Timeouts

from kserve import InferenceRESTClient, RESTConfig

# Configure client timeouts
config = RESTConfig(
    protocol="v2",
    timeout=30,  # 30 second default timeout
    retries=3
)

client = InferenceRESTClient(config=config)

# Override per request (run inside an async function, e.g. via asyncio.run)
response = await client.infer(
    base_url="http://localhost:8080",
    model_name="slow-model",
    data={...},
    timeout=120  # 2 minute timeout for this request
)

High Availability

Multi-Replica Deployment

from kserve import V1beta1PredictorSpec, V1beta1SKLearnSpec

predictor = V1beta1PredictorSpec(
    min_replicas=3,  # Always run 3 replicas
    max_replicas=10,  # Scale up to 10
    scale_target=100,  # Target 100 concurrent requests
    scale_metric="concurrency",
    sklearn=V1beta1SKLearnSpec(
        storage_uri="gs://models/sklearn/iris",
        resources={
            "limits": {"cpu": "2", "memory": "4Gi"},
            "requests": {"cpu": "1", "memory": "2Gi"}
        }
    )
)

Pod Disruption Budget

apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: sklearn-iris-pdb
spec:
  minAvailable: 2
  selector:
    matchLabels:
      serving.kserve.io/inferenceservice: sklearn-iris

Monitoring Setup

Prometheus Configuration

# prometheus.yml
scrape_configs:
  - job_name: 'kserve'
    scrape_interval: 15s
    kubernetes_sd_configs:
      - role: pod
        namespaces:
          names: ['default']
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_serving_kserve_io_inferenceservice]
        action: keep
        regex: .+
      - source_labels: [__meta_kubernetes_pod_container_port_number]
        action: keep
        regex: "8080"
    metrics_path: '/metrics'

Grafana Dashboard

{
  "dashboard": {
    "title": "KServe Model Metrics",
    "panels": [
      {
        "title": "Prediction Latency (p95)",
        "targets": [{
          "expr": "histogram_quantile(0.95, rate(request_predict_seconds_bucket[5m]))"
        }]
      },
      {
        "title": "Request Rate",
        "targets": [{
          "expr": "rate(request_predict_seconds_count[1m])"
        }]
      },
      {
        "title": "Error Rate",
        "targets": [{
          "expr": "rate(prediction_errors_total[5m])"
        }]
      }
    ]
  }
}

Alerting Rules

groups:
  - name: kserve_alerts
    rules:
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(request_predict_seconds_bucket[5m])) > 1.0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High prediction latency detected"
          description: "95th percentile latency is {{ $value }}s"
      
      - alert: HighErrorRate
        expr: rate(prediction_errors_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors/sec"
      
      - alert: ModelNotReady
        expr: up{job="kserve"} == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Model server is down"

Load Testing

Using Locust

from locust import HttpUser, task, between

class KServeUser(HttpUser):
    wait_time = between(0.1, 0.5)
    
    @task
    def predict(self):
        self.client.post(
            "/v2/models/sklearn-iris/infer",
            json={
                "inputs": [{
                    "name": "input-0",
                    "shape": [1, 4],
                    "datatype": "FP32",
                    "data": [[5.1, 3.5, 1.4, 0.2]]
                }]
            },
            headers={"Content-Type": "application/json"}
        )

Run load test:

locust -f load_test.py --host http://localhost:8080 --users 100 --spawn-rate 10

Security Best Practices

Authentication

import os
from kserve import Configuration, KServeClient

# Configure with API key
config = Configuration(
    host="https://k8s-api.example.com",
    api_key={"authorization": os.getenv("K8S_API_TOKEN")},
    api_key_prefix={"authorization": "Bearer"},
    verify_ssl=True,
    ssl_ca_cert="/etc/ssl/certs/ca.crt"
)

client = KServeClient(client_configuration=config)

Input Sanitization

from kserve import Model
from kserve.errors import InvalidInput
import re

class SecureModel(Model):
    def predict(self, payload, headers=None):
        instances = payload.get("instances", [])
        
        # Validate input size
        if len(instances) > 1000:
            raise InvalidInput("Batch size too large, maximum 1000")
        
        # Sanitize string inputs
        for instance in instances:
            if isinstance(instance, str):
                # Remove potentially dangerous characters
                if re.search(r'[<>\"\'&]', instance):
                    raise InvalidInput("Invalid characters in input")
        
        return {"predictions": self.model.predict(instances)}

Deployment Strategies

Blue-Green Deployment

# Deploy new version
from kserve import (
    KServeClient,
    V1beta1InferenceService,
    V1beta1InferenceServiceSpec,
    V1beta1PredictorSpec,
    V1beta1SKLearnSpec
)

kserve_client = KServeClient()

isvc_v2 = V1beta1InferenceService(
    api_version="serving.kserve.io/v1beta1",
    kind="InferenceService",
    metadata={"name": "sklearn-iris-v2", "namespace": "default"},
    spec=V1beta1InferenceServiceSpec(
        predictor=V1beta1PredictorSpec(
            sklearn=V1beta1SKLearnSpec(
                storage_uri="gs://models/sklearn/iris/v2"
            )
        )
    )
)

kserve_client.create(isvc_v2, namespace="default", watch=True)

# Test new version (see the validation sketch below)
# ... validation ...

# Switch traffic (update ingress or service mesh)
# Delete old version
kserve_client.delete("sklearn-iris-v1", namespace="default")
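
The validation step elided above can be as simple as sending a known-good request to the new service and checking the response before any traffic is switched. A sketch using the REST client from earlier sections; the base URL and expected output are assumptions for your environment:

import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def validate_v2():
    client = InferenceRESTClient(config=RESTConfig(protocol="v2", timeout=30))
    response = await client.infer(
        base_url="http://sklearn-iris-v2.default.example.com",  # assumed ingress URL
        model_name="sklearn-iris-v2",
        data={"inputs": [{"name": "input-0", "shape": [1, 4],
                          "datatype": "FP32", "data": [[5.1, 3.5, 1.4, 0.2]]}]},
    )
    assert response is not None  # replace with real checks against expected outputs

asyncio.run(validate_v2())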

Canary Deployment

from kserve import (
    V1beta1InferenceService,
    V1beta1InferenceServiceSpec,
    V1beta1PredictorSpec,
    V1beta1SKLearnSpec
)

# Deploy canary with traffic split
isvc = V1beta1InferenceService(
    api_version="serving.kserve.io/v1beta1",
    kind="InferenceService",
    metadata={"name": "sklearn-iris", "namespace": "default"},
    spec=V1beta1InferenceServiceSpec(
        predictor=V1beta1PredictorSpec(
            canary_traffic_percent=10,  # 10% to canary
            sklearn=V1beta1SKLearnSpec(
                storage_uri="gs://models/sklearn/iris/v2"
            )
        )
    )
)

Container Image Best Practices

Dockerfile Example

FROM python:3.10-slim

# Install curl for the HEALTHCHECK below
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model code
COPY model.py /app/
COPY model.pkl /mnt/models/

# Set working directory
WORKDIR /app

# Non-root user
RUN useradd -m -u 1000 kserve
USER kserve

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=60s --retries=3 \
  CMD curl -f http://localhost:8080/v2/health/live || exit 1

# Start server
CMD ["python", "model.py"]

Multi-Stage Build

# Build stage
FROM python:3.10 as builder
WORKDIR /build
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt

# Runtime stage
FROM python:3.10-slim
COPY --from=builder /root/.local /root/.local
COPY model.py /app/
COPY model.pkl /mnt/models/
WORKDIR /app
ENV PATH=/root/.local/bin:$PATH
CMD ["python", "model.py"]

Environment Configuration

import os
import joblib
from kserve import Model, ModelServer

class ConfigurableModel(Model):
    def __init__(self, name: str):
        super().__init__(name)
        # Load from environment
        self.model_path = os.getenv("MODEL_PATH", "/mnt/models/model.pkl")
        self.batch_size = int(os.getenv("MAX_BATCH_SIZE", "32"))
        self.timeout = int(os.getenv("INFERENCE_TIMEOUT", "30"))
    
    def load(self):
        self.model = joblib.load(self.model_path)
        self.ready = True

if __name__ == "__main__":
    model = ConfigurableModel(os.getenv("MODEL_NAME", "my-model"))
    model.load()
    
    ModelServer(
        http_port=int(os.getenv("HTTP_PORT", "8080")),
        grpc_port=int(os.getenv("GRPC_PORT", "8081")),
        workers=int(os.getenv("WORKERS", "1"))
    ).start([model])

Next Steps

  • Testing Strategies - Unit and integration testing
  • Performance Optimization - Caching and async patterns
  • Edge Cases - Advanced error handling
  • Troubleshooting - Common issues and solutions