or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/kserve@0.16.x

docs

index.md
tile.json

tessl/pypi-kserve

tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.

error-recovery.mddocs/reference/

Error Recovery Strategies

Comprehensive patterns for handling failures and recovering gracefully in production KServe deployments.

Retry Logic with Exponential Backoff

Basic Retry Pattern

import asyncio
import random
from kserve import InferenceRESTClient
from kserve.errors import InferenceError

async def infer_with_retry(
    client: InferenceRESTClient,
    base_url: str,
    model_name: str,
    data: dict,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0
):
    """Make inference request with exponential backoff"""
    for attempt in range(max_retries):
        try:
            return await client.infer(
                base_url=base_url,
                model_name=model_name,
                data=data
            )
        except Exception as e:
            if attempt == max_retries - 1:
                logger.error(f"All retry attempts failed: {e}")
                raise
            
            # Calculate backoff with jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            wait_time = delay + jitter
            
            logger.warning(
                f"Inference failed (attempt {attempt + 1}/{max_retries}), "
                f"retrying in {wait_time:.2f}s: {e}"
            )
            
            await asyncio.sleep(wait_time)

Retry with Backoff Class

import asyncio
import random
from typing import Callable, Any

class ExponentialBackoff:
    """Exponential backoff retry helper"""
    
    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
        jitter: bool = True
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter
    
    async def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with retry logic"""
        for attempt in range(self.max_retries):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise
                
                delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                if self.jitter:
                    delay += random.uniform(0, delay * 0.1)
                
                logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s")
                await asyncio.sleep(delay)

# Usage
backoff = ExponentialBackoff(max_retries=5, base_delay=1.0, max_delay=60.0)
result = await backoff.execute(
    client.infer,
    base_url="http://localhost:8080",
    model_name="my-model",
    data=data
)

Circuit Breaker Pattern

Basic Circuit Breaker

from kserve import Model
from kserve.errors import InferenceError
from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Circuit breaker for fault tolerance"""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout_seconds: int = 60,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timedelta(seconds=timeout_seconds)
        self.half_open_max_calls = half_open_max_calls
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self.half_open_calls = 0
    
    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection"""
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time > self.timeout:
                logger.info("Circuit breaker transitioning to half-open")
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise InferenceError("Circuit breaker is open")
        
        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.half_open_max_calls:
                raise InferenceError("Circuit breaker half-open limit reached")
            self.half_open_calls += 1
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
    
    def _on_success(self):
        """Handle successful call"""
        if self.state == CircuitState.HALF_OPEN:
            logger.info("Circuit breaker closing after successful calls")
            self.state = CircuitState.CLOSED
        
        self.failure_count = 0
        self.half_open_calls = 0
    
    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        
        if self.failure_count >= self.failure_threshold:
            logger.error(f"Circuit breaker opening after {self.failure_count} failures")
            self.state = CircuitState.OPEN

class CircuitBreakerModel(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            timeout_seconds=60
        )
    
    def predict(self, payload, headers=None):
        """Predict with circuit breaker protection"""
        return self.circuit_breaker.call(
            self._do_predict,
            payload
        )
    
    def _do_predict(self, payload):
        """Actual prediction logic"""
        instances = payload["instances"]
        predictions = self.model.predict(instances)
        return {"predictions": predictions.tolist()}

Bulkhead Pattern

Isolate Resources

from kserve import Model
from concurrent.futures import ThreadPoolExecutor
import asyncio

class BulkheadModel(Model):
    """Isolate resources using bulkhead pattern"""
    
    def __init__(self, name: str):
        super().__init__(name)
        # Separate thread pools for different operations
        self.preprocessing_pool = ThreadPoolExecutor(max_workers=2)
        self.inference_pool = ThreadPoolExecutor(max_workers=4)
        self.postprocessing_pool = ThreadPoolExecutor(max_workers=2)
    
    async def preprocess(self, body, headers=None):
        """Preprocess in isolated pool"""
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            self.preprocessing_pool,
            self._do_preprocess,
            body
        )
        return result
    
    async def predict(self, payload, headers=None):
        """Predict in isolated pool"""
        loop = asyncio.get_event_loop()
        instances = payload["instances"]
        
        predictions = await loop.run_in_executor(
            self.inference_pool,
            self.model.predict,
            instances
        )
        
        return {"predictions": predictions.tolist()}
    
    async def postprocess(self, response, headers=None):
        """Postprocess in isolated pool"""
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            self.postprocessing_pool,
            self._do_postprocess,
            response
        )
        return result
    
    def stop(self):
        """Shutdown all pools"""
        self.preprocessing_pool.shutdown(wait=True)
        self.inference_pool.shutdown(wait=True)
        self.postprocessing_pool.shutdown(wait=True)

Fallback Pattern

Primary-Fallback Model

from kserve import Model

class FallbackModel(Model):
    def __init__(self, name: str, fallback_model_path: str):
        super().__init__(name)
        self.fallback_model_path = fallback_model_path
        self.fallback_model = None
    
    def load(self):
        """Load primary and fallback models"""
        # Load primary model
        self.model = joblib.load("/mnt/models/primary_model.pkl")
        
        # Load fallback model (simpler, faster)
        self.fallback_model = joblib.load(self.fallback_model_path)
        
        self.ready = True
        logger.info("Primary and fallback models loaded")
    
    def predict(self, payload, headers=None):
        """Predict with fallback on failure"""
        instances = payload["instances"]
        
        try:
            # Try primary model
            predictions = self.model.predict(instances)
            return {
                "predictions": predictions.tolist(),
                "model_used": "primary"
            }
        except Exception as e:
            logger.warning(f"Primary model failed, using fallback: {e}")
            
            try:
                # Fallback to simpler model
                predictions = self.fallback_model.predict(instances)
                return {
                    "predictions": predictions.tolist(),
                    "model_used": "fallback"
                }
            except Exception as e2:
                logger.error(f"Fallback also failed: {e2}")
                raise InferenceError("Both primary and fallback models failed")

Timeout Pattern

Request Timeout Management

from kserve import Model
import asyncio
import signal

class TimeoutModel(Model):
    def __init__(self, name: str, prediction_timeout: float = 30.0):
        super().__init__(name)
        self.prediction_timeout = prediction_timeout
    
    async def predict(self, payload, headers=None):
        """Predict with timeout"""
        try:
            result = await asyncio.wait_for(
                self._do_predict(payload),
                timeout=self.prediction_timeout
            )
            return result
        except asyncio.TimeoutError:
            logger.error(f"Prediction timeout after {self.prediction_timeout}s")
            raise InferenceError(f"Prediction timeout after {self.prediction_timeout}s")
    
    async def _do_predict(self, payload):
        """Actual prediction logic"""
        instances = payload["instances"]
        
        # Run in thread pool to avoid blocking
        loop = asyncio.get_event_loop()
        predictions = await loop.run_in_executor(
            None,
            self.model.predict,
            instances
        )
        
        return {"predictions": predictions.tolist()}

Health Check Pattern

Advanced Health Checks

from kserve import Model
from datetime import datetime, timedelta

class AdvancedHealthModel(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.health_check_interval = timedelta(seconds=30)
        self.last_health_check = None
        self.health_check_failures = 0
        self.max_failures = 3
        self.is_healthy = True
    
    async def healthy(self) -> bool:
        """Comprehensive health check"""
        now = datetime.now()
        
        # Skip if recently checked
        if (self.last_health_check and 
            now - self.last_health_check < self.health_check_interval):
            return self.is_healthy
        
        try:
            # Check model is loaded
            if not self.ready or self.model is None:
                raise RuntimeError("Model not loaded")
            
            # Test inference
            test_input = {"instances": [[1.0, 2.0, 3.0, 4.0]]}
            result = self.predict(test_input)
            
            if "predictions" not in result:
                raise RuntimeError("Invalid prediction response")
            
            # Check memory
            import psutil
            memory = psutil.virtual_memory()
            if memory.percent > 95:
                raise RuntimeError(f"Memory usage critical: {memory.percent}%")
            
            # Health check passed
            self.health_check_failures = 0
            self.is_healthy = True
            self.last_health_check = now
            
            return True
            
        except Exception as e:
            self.health_check_failures += 1
            logger.error(f"Health check failed: {e}")
            
            if self.health_check_failures >= self.max_failures:
                logger.critical(f"Health check failed {self.health_check_failures} times")
                self.is_healthy = False
                return False
            
            # Still report healthy if under threshold
            return True

Next Steps

  • Design Patterns - More patterns
  • Troubleshooting - Debug issues
  • Edge Cases - Handle edge cases