tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.
Common issues and solutions for KServe model servers.
Debug model loading with detailed logging:

import os
import joblib
from kserve import Model, logger

class DebuggableModel(Model):
    def __init__(self, name: str, model_path: str = "/mnt/models/model.pkl"):
        super().__init__(name)
        self.model_path = model_path
        self.model = None

    def load(self):
        """Load with detailed logging"""
        try:
            logger.info(f"Starting model load for {self.name}")
            logger.info(f"Model path: {self.model_path}")
            # Check file exists
            if not os.path.exists(self.model_path):
                logger.error(f"Model file not found: {self.model_path}")
                raise FileNotFoundError(self.model_path)
            # Check file size
            size_mb = os.path.getsize(self.model_path) / (1024 * 1024)
            logger.info(f"Model file size: {size_mb:.2f}MB")
            # Load model
            logger.info("Loading model...")
            self.model = joblib.load(self.model_path)
            logger.info("Model loaded successfully")
            # Test prediction
            logger.info("Testing model...")
            test_result = self.model.predict([[1, 2, 3, 4]])
            logger.info(f"Test prediction successful: {test_result}")
            self.ready = True
            logger.info(f"Model {self.name} is ready")
        except Exception as e:
            logger.error(f"Failed to load model {self.name}: {e}", exc_info=True)
            self.ready = False
            raise

Check server logs:

# Kubernetes
kubectl logs <pod-name> -n <namespace>
# Docker
docker logs <container-id>

Check the model file on disk:

# Check file exists
ls -lh /mnt/models/model.pkl
# Check permissions
ls -la /mnt/models/
# Check disk space
df -h /mnt/models/

Test loading the model manually:

import joblib

try:
    model = joblib.load("/mnt/models/model.pkl")
    print("Model loaded successfully")
    print(f"Model type: {type(model)}")
    # Test prediction
    result = model.predict([[1, 2, 3, 4]])
    print(f"Test prediction: {result}")
except Exception as e:
    print(f"Failed to load: {e}")

Monitor memory during load and predict:

from kserve import Model, logger
import psutil
import gc
import joblib

class MemoryMonitoredModel(Model):
    def load(self):
        """Load with memory monitoring"""
        # Check memory before load
        mem_before = psutil.virtual_memory()
        logger.info(f"Memory before load: {mem_before.percent}% used")
        self.model = joblib.load("/mnt/models/model.pkl")
        # Check memory after load
        mem_after = psutil.virtual_memory()
        logger.info(f"Memory after load: {mem_after.percent}% used")
        logger.info(f"Model memory: {mem_after.used - mem_before.used} bytes")
        self.ready = True

    def predict(self, payload, headers=None):
        """Predict with memory monitoring"""
        mem_before = psutil.virtual_memory()
        predictions = self.model.predict(payload["instances"])
        mem_after = psutil.virtual_memory()
        logger.debug(f"Prediction memory delta: {mem_after.used - mem_before.used} bytes")
        return {"predictions": predictions.tolist()}

Raise the container memory limits:

resources:
  limits:
    memory: "8Gi"  # Increase from 4Gi
  requests:
    memory: "4Gi"
Force garbage collection after each request:

import gc
from kserve import Model
class GCModel(Model):
    def predict(self, payload, headers=None):
        try:
            result = self.model.predict(payload["instances"])
            return {"predictions": result.tolist()}
        finally:
            gc.collect()

# Quantize model to reduce memory
import torch
model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8
)

Load the model lazily on first use:

class LazyModel(Model):
    def load(self):
        # Don't load the model yet; mark the server ready immediately
        self.model = None
        self.ready = True

    def predict(self, payload, headers=None):
        # Load on first use
        if self.model is None:
            self.model = joblib.load("/mnt/models/model.pkl")
        return {"predictions": self.model.predict(payload["instances"]).tolist()}
Diagnose client-side connection timeouts:

from kserve import InferenceRESTClient, RESTConfig
import time
async def diagnose_timeout():
    """Diagnose connection timeout issues"""
    client = InferenceRESTClient()
    # Test with increasing timeouts
    for timeout in [5, 10, 30, 60]:
        try:
            start = time.time()
            response = await client.infer(
                base_url="http://localhost:8080",
                model_name="my-model",
                data={"instances": [[1, 2, 3, 4]]},
                timeout=timeout
            )
            elapsed = time.time() - start
            print(f"Success with {timeout}s timeout (took {elapsed:.2f}s)")
            break
        except Exception as e:
            print(f"Failed with {timeout}s timeout: {e}")
    await client.close()

Increase the client timeout:

config = RESTConfig(
protocol="v2",
timeout=120, # Increase to 2 minutes
retries=1
)
client = InferenceRESTClient(config=config)# Use smaller batch sizes
# Enable GPU acceleration
# Optimize preprocessing# Test connection
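One way to keep individual calls under the timeout is to split a large payload into smaller batches on the client. A minimal sketch reusing the infer() call shape from the timeout example above; the model name, URL, and batch size are placeholders:

import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def infer_in_batches(instances, batch_size=32):
    """Send a large request as several smaller ones so each call stays fast."""
    client = InferenceRESTClient(config=RESTConfig(protocol="v1", timeout=120))
    results = []
    try:
        for i in range(0, len(instances), batch_size):
            chunk = instances[i:i + batch_size]
            response = await client.infer(
                base_url="http://localhost:8080",
                model_name="my-model",
                data={"instances": chunk},
            )
            results.append(response)
        return results
    finally:
        await client.close()

# Example: asyncio.run(infer_in_batches([[1, 2, 3, 4]] * 256))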
Test connectivity from the client side:

# Test connection
curl -v http://localhost:8080/v2/health/live
# Check DNS resolution
nslookup model-service.default.svc.cluster.local
# Test latency
ping model-service.default.svc.cluster.local

Configure a resilient gRPC client:

from kserve import InferenceGRPCClient
import grpc
import asyncio
async def resilient_grpc_client():
    """Create resilient gRPC client"""
    # Configure channel options
    channel_args = [
        ('grpc.keepalive_time_ms', 30000),
        ('grpc.keepalive_timeout_ms', 10000),
        ('grpc.keepalive_permit_without_calls', True),
        ('grpc.http2.max_pings_without_data', 0),
        ('grpc.max_connection_idle_ms', 60000),
        ('grpc.max_connection_age_ms', 300000)
    ]
    client = InferenceGRPCClient(
        url="localhost:8081",
        channel_args=channel_args,
        timeout=60,
        retries=3
    )
    return client

Verify storage credentials:

from kserve import KServeClient
import os
# Check S3 credentials
client = KServeClient()
client.set_credentials(
storage_type="S3",
namespace="default",
service_account="kserve-sa",
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
)

# Test S3 access
aws s3 ls s3://my-bucket/models/
# Test GCS access
gsutil ls gs://my-bucket/models/
# Test Azure access
az storage blob list --account-name myaccount --container-name models
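The same S3 check can be run from Python with boto3, assuming credentials come from the environment or the mounted service account; the bucket name and prefix are placeholders:

import boto3

# List the model artifacts the server should be able to see
s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket="my-bucket", Prefix="models/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])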
Check the service account and RBAC:

# Kubernetes
kubectl get serviceaccount kserve-sa -n default -o yaml
# Check role bindings
kubectl get rolebindings -n default | grep kserve

from kserve import InferenceRESTClient, RESTConfig
# Ensure protocol matches server
# For v1 protocol
v1_config = RESTConfig(protocol="v1")
v1_client = InferenceRESTClient(config=v1_config)
# For v2 protocol
v2_config = RESTConfig(protocol="v2")
v2_client = InferenceRESTClient(config=v2_config)
# Check server protocol
response = await v2_client.get_server_metadata(base_url="http://localhost:8080")
print(f"Server version: {response}")import cProfile
import pstats
from kserve import Model
class ProfiledModel(Model):
    def predict(self, payload, headers=None):
        """Profile prediction performance"""
        profiler = cProfile.Profile()
        profiler.enable()
        result = self.model.predict(payload["instances"])
        profiler.disable()
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')
        stats.print_stats(20)  # Top 20 functions
        return {"predictions": result.tolist()}

Cache repeated predictions:

from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_predict(input_tuple):
    return model.predict([list(input_tuple)])
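The cache key must be hashable, so callers convert each instance to a tuple before the lookup. A minimal sketch of wiring this into a predict method, assuming the module-level model and cached_predict from the snippet above:

from kserve import Model

class CachedModel(Model):
    def predict(self, payload, headers=None):
        predictions = []
        for instance in payload["instances"]:
            # Tuples are hashable, so repeated instances hit the lru_cache
            value = cached_predict(tuple(instance))[0]
            predictions.append(value.item() if hasattr(value, "item") else value)
        return {"predictions": predictions}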
# Increase batch size
from kserve import ModelServer

ModelServer(max_batch_size=64).start([model])

# Enable GPU acceleration
import torch
model = model.to('cuda')

# Use more workers and threads
python model.py --workers 4 --max_threads 8
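The same scaling knobs can be set when the server is started programmatically; a minimal sketch, assuming the ModelServer constructor accepts the workers and max_threads keyword arguments that back the CLI flags above:

from kserve import ModelServer

if __name__ == "__main__":
    model = LazyModel("my-model")  # any Model subclass from this guide
    model.load()
    # Mirror the CLI flags: more worker processes and handler threads
    ModelServer(workers=4, max_threads=8).start([model])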
Check events:

kubectl describe pod <pod-name> -n <namespace>
kubectl get events -n <namespace> --sort-by='.lastTimestamp'

Common causes: image pull failures, insufficient CPU or memory, model download errors from the storage URI, and failed readiness probes while the model is still loading.
Check status:
kubectl get inferenceservice sklearn-iris -n default -o yaml
kubectl describe inferenceservice sklearn-iris -n default

Check underlying resources:
kubectl get pods -n default -l serving.kserve.io/inferenceservice=sklearn-iris
kubectl get services -n default -l serving.kserve.io/inferenceservice=sklearn-iris
kubectl get virtualservices -n default
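The same status information is available from Python through KServeClient; a minimal sketch, assuming the sklearn-iris service in the default namespace:

from kserve import KServeClient

client = KServeClient()

# Fetch the InferenceService and inspect its status conditions
isvc = client.get("sklearn-iris", namespace="default")
for condition in isvc.get("status", {}).get("conditions", []):
    print(condition.get("type"), condition.get("status"), condition.get("reason"))

# Or simply check overall readiness
print("Ready:", client.is_isvc_ready("sklearn-iris", namespace="default"))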
os.environ["KSERVE_LOGLEVEL"] = "DEBUG"
from kserve import logger
logger.setLevel("DEBUG")python model.py --enable_docs_url true
# Access at http://localhost:8080/docs

# Get Prometheus metrics
curl http://localhost:8080/metrics
# Check specific metric
curl http://localhost:8080/metrics | grep request_predict_seconds
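Custom metrics defined with prometheus_client appear alongside KServe's built-in metrics, assuming the server exposes the default prometheus_client registry on its /metrics endpoint (the case in recent releases); a minimal sketch with a hypothetical histogram:

import time
from prometheus_client import Histogram
from kserve import Model

# Hypothetical custom metric; shows up on the same /metrics endpoint
PREPROCESS_SECONDS = Histogram(
    "custom_preprocess_seconds",
    "Time spent preprocessing each request",
)

class InstrumentedModel(Model):
    def predict(self, payload, headers=None):
        start = time.time()
        instances = payload["instances"]  # preprocessing would go here
        PREPROCESS_SECONDS.observe(time.time() - start)
        return {"predictions": self.model.predict(instances).tolist()}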