tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.
Comprehensive patterns for handling failures and recovering gracefully in production KServe deployments.
import asyncio
import random
from kserve import InferenceRESTClient
from kserve.errors import InferenceError
import logging

logger = logging.getLogger(__name__)


async def infer_with_retry(
    client: InferenceRESTClient,
    base_url: str,
    model_name: str,
    data: dict,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retry_on: tuple = (Exception,),
):
    """Make an inference request, retrying failures with exponential backoff.

    Args:
        client: REST client whose ``infer`` coroutine performs the request.
        base_url: Base URL of the model server.
        model_name: Name of the model to invoke.
        data: Request body passed through unchanged to ``client.infer``.
        max_retries: Total number of attempts, including the first one.
            Must be >= 1.
        base_delay: Delay in seconds before the first retry. It doubles on
            each subsequent retry.
        max_delay: Upper bound in seconds on the exponential delay (jitter
            of up to 10% is added on top).
        retry_on: Exception types that trigger a retry. Any other exception
            propagates immediately without further attempts.

    Returns:
        The result of the first successful ``client.infer`` call.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The last error from ``client.infer`` once all attempts
            are exhausted, or the first error not matching ``retry_on``.
    """
    if max_retries < 1:
        # Without this guard the loop never runs and the caller silently gets None.
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    for attempt in range(max_retries):
        try:
            return await client.infer(
                base_url=base_url,
                model_name=model_name,
                data=data,
            )
        except retry_on as e:
            if attempt == max_retries - 1:
                logger.error(f"All retry attempts failed: {e}")
                raise
            # Exponential backoff, capped at max_delay.
            delay = min(base_delay * (2 ** attempt), max_delay)
            # Random jitter so many clients failing together don't retry in lockstep.
            jitter = random.uniform(0, delay * 0.1)
            wait_time = delay + jitter
            logger.warning(
                f"Inference failed (attempt {attempt + 1}/{max_retries}), "
                f"retrying in {wait_time:.2f}s: {e}"
            )
            await asyncio.sleep(wait_time)


import asyncio
import random
from typing import Callable, Any
import logging

logger = logging.getLogger(__name__)


class ExponentialBackoff:
    """Exponential backoff retry helper for coroutine functions.

    ``execute`` awaits ``func`` up to ``max_retries`` times. Between attempts
    it sleeps ``base_delay * 2**attempt`` seconds, capped at ``max_delay``,
    plus up to 10% random jitter when ``jitter`` is enabled.
    """

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
        jitter: bool = True,
        retry_on: tuple = (Exception,),
    ):
        """
        Args:
            max_retries: Total number of attempts, including the first.
                Must be >= 1.
            base_delay: Delay in seconds before the first retry.
            max_delay: Upper bound in seconds on the exponential delay
                (before jitter).
            jitter: Whether to add up to 10% random jitter to each delay.
            retry_on: Exception types that trigger a retry. Other exceptions
                propagate immediately.

        Raises:
            ValueError: If ``max_retries`` is less than 1.
        """
        if max_retries < 1:
            # A zero-attempt policy would make execute() silently return None.
            raise ValueError(f"max_retries must be >= 1, got {max_retries}")
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter
        self.retry_on = retry_on

    async def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)``, retrying failures with backoff.

        Returns:
            The result of the first successful call.

        Raises:
            Exception: The last error once all attempts fail, or the first
                error not matching ``retry_on``.
        """
        for attempt in range(self.max_retries):
            try:
                return await func(*args, **kwargs)
            except self.retry_on as exc:
                if attempt == self.max_retries - 1:
                    raise
                delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                if self.jitter:
                    # Desynchronize clients that failed at the same moment.
                    delay += random.uniform(0, delay * 0.1)
                logger.warning(
                    "Attempt %d failed, retrying in %.2fs: %s", attempt + 1, delay, exc
                )
                await asyncio.sleep(delay)
# Usage: ``await`` is only legal inside a coroutine (a module-level ``await``
# is a SyntaxError), so the example call is wrapped in one. Run it with
# ``asyncio.run(run_backoff_example(client, data))``.
backoff = ExponentialBackoff(max_retries=5, base_delay=1.0, max_delay=60.0)


async def run_backoff_example(client, data):
    """Send one inference request to a local model server, retrying with backoff.

    Args:
        client: Object with an async ``infer(base_url=..., model_name=...,
            data=...)`` method, e.g. an ``InferenceRESTClient``.
        data: Request body forwarded unchanged to ``client.infer``.

    Returns:
        Whatever ``client.infer`` returns on the first successful attempt.
    """
    return await backoff.execute(
        client.infer,
        base_url="http://localhost:8080",
        model_name="my-model",
        data=data,
    )


from kserve import Model
from kserve.errors import InferenceError
from datetime import datetime, timedelta
from enum import Enum
# States of a circuit breaker, declared with the Enum functional API.
# Members, in order: CLOSED="closed", OPEN="open", HALF_OPEN="half_open".
CircuitState = Enum(
    "CircuitState",
    [
        ("CLOSED", "closed"),
        ("OPEN", "open"),
        ("HALF_OPEN", "half_open"),
    ],
    module=__name__,
)
import logging

logger = logging.getLogger(__name__)


class CircuitBreaker:
    """Circuit breaker for fault tolerance around synchronous calls.

    State machine:

    * CLOSED: calls pass through. After ``failure_threshold`` consecutive
      failures (any success resets the count) the circuit opens.
    * OPEN: calls are rejected with ``InferenceError`` until more than
      ``timeout_seconds`` have passed since the last failure. The next call
      then moves the breaker to HALF_OPEN.
    * HALF_OPEN: up to ``half_open_max_calls`` trial calls are admitted. A
      success closes the circuit; a failure re-opens it, because the failure
      count is still at or above the threshold.

    No locking is performed, so concurrent callers need external
    synchronization.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout_seconds: int = 60,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timedelta(seconds=timeout_seconds)
        self.half_open_max_calls = half_open_max_calls
        self.failure_count = 0  # consecutive failures; reset by any success
        self.last_failure_time = None  # datetime of the most recent failure
        self.state = CircuitState.CLOSED
        self.half_open_calls = 0  # trial calls admitted in the current HALF_OPEN window

    def call(self, func, *args, **kwargs):
        """Execute ``func(*args, **kwargs)`` with circuit breaker protection.

        Returns:
            The value returned by ``func``.

        Raises:
            InferenceError: If the circuit is open, or half-open with its
                trial-call budget used up.
            Exception: Whatever ``func`` raises; the failure is recorded
                first.
        """
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time > self.timeout:
                logger.info("Circuit breaker transitioning to half-open")
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise InferenceError("Circuit breaker is open")

        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.half_open_max_calls:
                raise InferenceError("Circuit breaker half-open limit reached")
            self.half_open_calls += 1

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        else:
            self._on_success()
            return result

    def _on_success(self):
        """Record a successful call: close the circuit and reset counters."""
        if self.state == CircuitState.HALF_OPEN:
            logger.info("Circuit breaker closing after successful calls")
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.half_open_calls = 0

    def _on_failure(self):
        """Record a failed call and open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            logger.error("Circuit breaker opening after %d failures", self.failure_count)
            self.state = CircuitState.OPEN
class CircuitBreakerModel(Model):
    """KServe model whose predictions run behind a ``CircuitBreaker``.

    NOTE(review): ``self.model`` is never assigned here; presumably a
    subclass or an external ``load()`` sets it to an estimator whose
    ``predict`` returns an array-like with ``tolist()``. Confirm.
    """

    def __init__(self, name: str):
        super().__init__(name)
        # Open after 5 consecutive failures; probe again once 60 s have elapsed.
        self.circuit_breaker = CircuitBreaker(timeout_seconds=60, failure_threshold=5)

    def predict(self, payload, headers=None):
        """Route the request through the breaker; ``headers`` is accepted but unused."""
        breaker = self.circuit_breaker
        return breaker.call(self._do_predict, payload)

    def _do_predict(self, payload):
        """Run the wrapped model on ``payload["instances"]``."""
        batch = payload["instances"]
        output = self.model.predict(batch)
        return {"predictions": output.tolist()}


from kserve import Model
from concurrent.futures import ThreadPoolExecutor
import asyncio
class BulkheadModel(Model):
    """Isolate resources using the bulkhead pattern.

    Preprocessing, inference and postprocessing each run on their own
    ``ThreadPoolExecutor``. A stage that stalls can therefore only occupy its
    own workers, not the other stages' workers.

    Subclasses override ``_do_preprocess`` / ``_do_postprocess``; by default
    they pass data through unchanged.
    """

    def __init__(
        self,
        name: str,
        preprocessing_workers: int = 2,
        inference_workers: int = 4,
        postprocessing_workers: int = 2,
    ):
        """
        Args:
            name: Model name registered with KServe.
            preprocessing_workers: Thread count for the preprocessing pool.
            inference_workers: Thread count for the inference pool.
            postprocessing_workers: Thread count for the postprocessing pool.
        """
        super().__init__(name)
        # Separate thread pools for different operations
        self.preprocessing_pool = ThreadPoolExecutor(max_workers=preprocessing_workers)
        self.inference_pool = ThreadPoolExecutor(max_workers=inference_workers)
        self.postprocessing_pool = ThreadPoolExecutor(max_workers=postprocessing_workers)

    async def preprocess(self, body, headers=None):
        """Run ``_do_preprocess(body)`` on the preprocessing pool."""
        # get_running_loop() is the supported way to get the loop inside a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.preprocessing_pool,
            self._do_preprocess,
            body,
        )

    async def predict(self, payload, headers=None):
        """Run ``self.model.predict`` on ``payload["instances"]`` in the inference pool."""
        loop = asyncio.get_running_loop()
        instances = payload["instances"]
        predictions = await loop.run_in_executor(
            self.inference_pool,
            self.model.predict,
            instances,
        )
        return {"predictions": predictions.tolist()}

    async def postprocess(self, response, headers=None):
        """Run ``_do_postprocess(response)`` on the postprocessing pool."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.postprocessing_pool,
            self._do_postprocess,
            response,
        )

    def _do_preprocess(self, body):
        """Synchronous preprocessing hook; identity by default, override in subclasses."""
        return body

    def _do_postprocess(self, response):
        """Synchronous postprocessing hook; identity by default, override in subclasses."""
        return response

    def stop(self):
        """Shut down all pools, waiting for in-flight work to finish."""
        for pool in (self.preprocessing_pool, self.inference_pool, self.postprocessing_pool):
            pool.shutdown(wait=True)


from kserve import Model
import logging

from kserve.errors import InferenceError

logger = logging.getLogger(__name__)


class FallbackModel(Model):
    """KServe model that falls back to a secondary model when the primary fails.

    ``predict`` tries the primary model first. On any exception it retries
    the same instances on the fallback model. The response's ``model_used``
    field records which model answered.
    """

    def __init__(
        self,
        name: str,
        fallback_model_path: str,
        primary_model_path: str = "/mnt/models/primary_model.pkl",
    ):
        """
        Args:
            name: Model name registered with KServe.
            fallback_model_path: Path of the serialized fallback model.
            primary_model_path: Path of the serialized primary model.
        """
        super().__init__(name)
        self.primary_model_path = primary_model_path
        self.fallback_model_path = fallback_model_path
        self.fallback_model = None

    def load(self):
        """Load the primary and fallback models and mark the model ready.

        Returns:
            ``self.ready``, which is ``True`` after a successful load.
        """
        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code, so only point these paths at trusted model artifacts.
        # NOTE(review): ``joblib`` is not imported in this example; it must be
        # imported at module level for load() to work.
        self.model = joblib.load(self.primary_model_path)
        # Fallback model (presumably simpler and faster)
        self.fallback_model = joblib.load(self.fallback_model_path)
        self.ready = True
        logger.info("Primary and fallback models loaded")
        return self.ready

    def predict(self, payload, headers=None):
        """Predict with the primary model, falling back on failure.

        Raises:
            InferenceError: If both the primary and the fallback model fail.
        """
        instances = payload["instances"]
        try:
            predictions = self.model.predict(instances)
            return {
                "predictions": predictions.tolist(),
                "model_used": "primary"
            }
        except Exception as e:
            logger.warning("Primary model failed, using fallback: %s", e)

        try:
            predictions = self.fallback_model.predict(instances)
            return {
                "predictions": predictions.tolist(),
                "model_used": "fallback"
            }
        except Exception as e2:
            logger.error("Fallback also failed: %s", e2)
            raise InferenceError("Both primary and fallback models failed") from e2


from kserve import Model
import asyncio
import signal
import logging

from kserve.errors import InferenceError

logger = logging.getLogger(__name__)


class TimeoutModel(Model):
    """KServe model that fails a prediction taking longer than ``prediction_timeout`` seconds.

    The blocking ``self.model.predict`` call runs in the loop's default
    executor, so the event loop stays responsive. On timeout only the await
    is abandoned: the executor thread is not interrupted and keeps running
    ``model.predict`` until it returns.
    """

    def __init__(self, name: str, prediction_timeout: float = 30.0):
        super().__init__(name)
        self.prediction_timeout = prediction_timeout

    async def predict(self, payload, headers=None):
        """Predict with a timeout.

        Raises:
            InferenceError: If the prediction does not finish within
                ``prediction_timeout`` seconds.
        """
        try:
            return await asyncio.wait_for(
                self._do_predict(payload),
                timeout=self.prediction_timeout,
            )
        except asyncio.TimeoutError as exc:
            logger.error("Prediction timeout after %ss", self.prediction_timeout)
            raise InferenceError(f"Prediction timeout after {self.prediction_timeout}s") from exc

    async def _do_predict(self, payload):
        """Run ``self.model.predict`` on ``payload["instances"]`` in the default executor."""
        instances = payload["instances"]
        # Run in a worker thread so the blocking predict doesn't stall the event loop.
        loop = asyncio.get_running_loop()
        predictions = await loop.run_in_executor(
            None,
            self.model.predict,
            instances,
        )
        return {"predictions": predictions.tolist()}


from kserve import Model
from datetime import datetime, timedelta
import copy
import inspect
import logging

logger = logging.getLogger(__name__)


class AdvancedHealthModel(Model):
    """KServe model with a self-testing, failure-tolerant health check.

    ``healthy()`` sends a probe request through ``predict`` and checks
    system memory. After a successful probe the verdict is cached for
    ``health_check_interval``; failed probes are not cached. The model is
    only reported unhealthy after ``max_failures`` consecutive failed probes.
    """

    def __init__(self, name: str, health_check_input=None, max_memory_percent: float = 95.0):
        """
        Args:
            name: Model name registered with KServe.
            health_check_input: Probe payload passed to ``predict``. Defaults
                to one 4-feature row; presumably this must match the deployed
                model's input width (TODO confirm).
            max_memory_percent: System memory usage (percent) above which the
                probe fails.
        """
        super().__init__(name)
        self.health_check_interval = timedelta(seconds=30)
        self.last_health_check = None  # time of the last *successful* probe
        self.health_check_failures = 0  # consecutive failed probes
        self.max_failures = 3
        self.is_healthy = True
        if health_check_input is None:
            health_check_input = {"instances": [[1.0, 2.0, 3.0, 4.0]]}
        self.health_check_input = health_check_input
        self.max_memory_percent = max_memory_percent

    async def healthy(self) -> bool:
        """Comprehensive health check.

        Returns:
            ``False`` only once ``max_failures`` consecutive probes have
            failed; otherwise ``True`` (or the cached verdict).
        """
        now = datetime.now()
        # Skip if recently checked successfully
        if (self.last_health_check is not None and
                now - self.last_health_check < self.health_check_interval):
            return self.is_healthy

        try:
            # Check model is loaded
            if not self.ready or getattr(self, "model", None) is None:
                raise RuntimeError("Model not loaded")

            # Test inference. Deep-copy so a predict() that mutates its payload
            # cannot corrupt the stored probe.
            result = self.predict(copy.deepcopy(self.health_check_input))
            # predict() may be sync or async; await it when it returns a coroutine.
            if inspect.isawaitable(result):
                result = await result
            if "predictions" not in result:
                raise RuntimeError("Invalid prediction response")

            # Check memory.
            # NOTE(review): if psutil is not installed, the ImportError counts
            # as a health-check failure.
            import psutil
            memory = psutil.virtual_memory()
            if memory.percent > self.max_memory_percent:
                raise RuntimeError(f"Memory usage critical: {memory.percent}%")
        except Exception as e:
            self.health_check_failures += 1
            logger.error(f"Health check failed: {e}")
            if self.health_check_failures >= self.max_failures:
                logger.critical(f"Health check failed {self.health_check_failures} times")
                self.is_healthy = False
                return False
            # Still report healthy while under the failure threshold
            return True

        # Health check passed
        self.health_check_failures = 0
        self.is_healthy = True
        self.last_health_check = now
        return True