or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/kserve@0.16.x

docs

examples

edge-cases.mdintegration-patterns.mdreal-world-scenarios.md
index.md
tile.json

tessl/pypi-kserve

tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.

integration-patterns.mddocs/examples/

Integration Patterns

Patterns for integrating KServe with other systems and services including databases, message queues, feature stores, and monitoring systems.

Database Integration

PostgreSQL for Model Metadata

from kserve import Model
import psycopg2
from psycopg2.pool import SimpleConnectionPool

class DatabaseIntegratedModel(Model):
    def __init__(self, name: str, db_config: dict):
        super().__init__(name)
        self.db_pool = SimpleConnectionPool(
            minconn=1,
            maxconn=10,
            **db_config
        )
    
    def load(self):
        """Load model and metadata from database"""
        conn = self.db_pool.getconn()
        try:
            cursor = conn.cursor()
            
            # Get model metadata
            cursor.execute(
                "SELECT model_path, version, config FROM models WHERE name = %s",
                (self.name,)
            )
            model_path, version, config = cursor.fetchone()
            
            # Load model
            self.model = joblib.load(model_path)
            self.version = version
            self.config = json.loads(config)
            self.ready = True
            
            logger.info(f"Loaded {self.name} version {version}")
            
        finally:
            self.db_pool.putconn(conn)
    
    def predict(self, payload, headers=None):
        """Predict and log to database"""
        instances = payload["instances"]
        predictions = self.model.predict(instances)
        
        # Log prediction to database
        conn = self.db_pool.getconn()
        try:
            cursor = conn.cursor()
            cursor.execute(
                "INSERT INTO predictions (model_name, input, output, timestamp) "
                "VALUES (%s, %s, %s, NOW())",
                (self.name, json.dumps(instances), json.dumps(predictions.tolist()))
            )
            conn.commit()
        except Exception as e:
            logger.error(f"Failed to log prediction: {e}")
        finally:
            self.db_pool.putconn(conn)
        
        return {"predictions": predictions.tolist()}
    
    def stop(self):
        """Close database connections"""
        self.db_pool.closeall()

Message Queue Integration

Kafka for Async Inference

from kserve import Model, ModelServer
from kafka import KafkaConsumer, KafkaProducer
import json
import threading

class KafkaModel(Model):
    def __init__(self, name: str, kafka_brokers: List[str]):
        super().__init__(name)
        self.kafka_brokers = kafka_brokers
        self.consumer = None
        self.producer = None
        self.processing_thread = None
    
    def load(self):
        """Load model and setup Kafka"""
        self.model = joblib.load("/mnt/models/model.pkl")
        
        # Setup Kafka consumer
        self.consumer = KafkaConsumer(
            'inference-requests',
            bootstrap_servers=self.kafka_brokers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id=f'kserve-{self.name}'
        )
        
        # Setup Kafka producer
        self.producer = KafkaProducer(
            bootstrap_servers=self.kafka_brokers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        # Start processing thread
        self.processing_thread = threading.Thread(
            target=self._process_kafka_messages,
            daemon=True
        )
        self.processing_thread.start()
        
        self.ready = True
        logger.info(f"Kafka integration ready for {self.name}")
    
    def _process_kafka_messages(self):
        """Process messages from Kafka"""
        for message in self.consumer:
            try:
                request = message.value
                request_id = request.get("request_id")
                instances = request.get("instances")
                
                # Run prediction
                predictions = self.model.predict(instances)
                
                # Send response
                response = {
                    "request_id": request_id,
                    "predictions": predictions.tolist(),
                    "model_name": self.name
                }
                
                self.producer.send('inference-responses', response)
                logger.info(f"Processed request {request_id}")
                
            except Exception as e:
                logger.error(f"Failed to process message: {e}", exc_info=True)
    
    def predict(self, payload, headers=None):
        """Synchronous prediction for HTTP requests"""
        return {"predictions": self.model.predict(payload["instances"])}
    
    def stop(self):
        """Cleanup Kafka resources"""
        if self.consumer:
            self.consumer.close()
        if self.producer:
            self.producer.close()

Feature Store Integration

Feast Feature Store

from kserve import Model
from feast import FeatureStore
import pandas as pd

class FeastIntegratedModel(Model):
    def __init__(self, name: str, feast_repo_path: str):
        super().__init__(name)
        self.feast_repo_path = feast_repo_path
        self.feature_store = None
    
    def load(self):
        """Load model and feature store"""
        self.model = joblib.load("/mnt/models/model.pkl")
        self.feature_store = FeatureStore(repo_path=self.feast_repo_path)
        self.ready = True
    
    async def preprocess(self, body, headers=None):
        """Fetch features from Feast"""
        entity_ids = body["entity_ids"]
        
        # Create entity dataframe
        entity_df = pd.DataFrame({
            "entity_id": entity_ids,
            "event_timestamp": [pd.Timestamp.now()] * len(entity_ids)
        })
        
        # Get features
        features = self.feature_store.get_online_features(
            features=[
                "user_features:age",
                "user_features:income",
                "user_features:credit_score"
            ],
            entity_df=entity_df
        ).to_df()
        
        # Convert to instances
        instances = features[["age", "income", "credit_score"]].values.tolist()
        
        return {"instances": instances}
    
    def predict(self, payload, headers=None):
        """Predict using fetched features"""
        instances = payload["instances"]
        predictions = self.model.predict(instances)
        return {"predictions": predictions.tolist()}

Monitoring Integration

Datadog Integration

from kserve import Model, logger
from datadog import initialize, statsd
import time

class DatadogModel(Model):
    def __init__(self, name: str, datadog_config: dict):
        super().__init__(name)
        initialize(**datadog_config)
        self.statsd = statsd
    
    def load(self):
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True
        
        # Send metric
        self.statsd.increment('model.loaded', tags=[f"model:{self.name}"])
    
    def predict(self, payload, headers=None):
        """Predict with Datadog metrics"""
        start = time.time()
        
        try:
            instances = payload["instances"]
            
            # Send batch size metric
            self.statsd.histogram(
                'model.batch_size',
                len(instances),
                tags=[f"model:{self.name}"]
            )
            
            # Run prediction
            predictions = self.model.predict(instances)
            
            # Send latency metric
            latency = time.time() - start
            self.statsd.histogram(
                'model.prediction.latency',
                latency,
                tags=[f"model:{self.name}"]
            )
            
            # Send success metric
            self.statsd.increment(
                'model.prediction.success',
                tags=[f"model:{self.name}"]
            )
            
            return {"predictions": predictions.tolist()}
            
        except Exception as e:
            # Send error metric
            self.statsd.increment(
                'model.prediction.error',
                tags=[f"model:{self.name}", f"error_type:{type(e).__name__}"]
            )
            raise

OpenTelemetry Integration

from kserve import Model
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

class TracedModel(Model):
    def predict(self, payload, headers=None):
        """Predict with distributed tracing"""
        with tracer.start_as_current_span("model.predict") as span:
            span.set_attribute("model.name", self.name)
            span.set_attribute("batch.size", len(payload.get("instances", [])))
            
            try:
                instances = payload["instances"]
                
                with tracer.start_as_current_span("model.inference"):
                    predictions = self.model.predict(instances)
                
                span.set_status(Status(StatusCode.OK))
                return {"predictions": predictions.tolist()}
                
            except Exception as e:
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                raise

Service Mesh Integration

Istio with mTLS

# InferenceService with Istio mTLS
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: sklearn-iris
  namespace: default
  annotations:
    sidecar.istio.io/inject: "true"
    traffic.sidecar.istio.io/includeInboundPorts: "8080,8081"
spec:
  predictor:
    sklearn:
      storageUri: gs://models/sklearn/iris
from kserve import InferenceRESTClient, RESTConfig

# Client configuration for Istio mTLS
config = RESTConfig(
    protocol="v2",
    verify="/etc/certs/root-cert.pem",
    cert=("/etc/certs/cert-chain.pem", "/etc/certs/key.pem")
)

client = InferenceRESTClient(config=config)

Cloud Provider Integration

AWS S3 with IAM Roles

from kserve import KServeClient

def deploy_with_s3_iam():
    """Deploy model using S3 with IAM role"""
    client = KServeClient()
    
    # Create service account with IAM role annotation
    from kubernetes import client as k8s_client
    
    sa = k8s_client.V1ServiceAccount(
        metadata=k8s_client.V1ObjectMeta(
            name="kserve-sa",
            namespace="default",
            annotations={
                "eks.amazonaws.com/role-arn": "arn:aws:iam::123456789:role/kserve-s3-role"
            }
        )
    )
    
    # Create InferenceService using service account
    isvc = V1beta1InferenceService(
        metadata={"name": "sklearn-s3", "namespace": "default"},
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(
                service_account_name="kserve-sa",
                sklearn=V1beta1SKLearnSpec(
                    storage_uri="s3://my-bucket/models/sklearn/iris"
                )
            )
        )
    )
    
    client.create(isvc, namespace="default")

GCP with Workload Identity

def deploy_with_workload_identity():
    """Deploy model using GCP Workload Identity"""
    client = KServeClient()
    
    isvc = V1beta1InferenceService(
        metadata={
            "name": "sklearn-gcs",
            "namespace": "default"
        },
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(
                service_account_name="kserve-gcs-sa",  # Bound to GCP SA
                sklearn=V1beta1SKLearnSpec(
                    storage_uri="gs://my-bucket/models/sklearn/iris"
                )
            )
        )
    )
    
    client.create(isvc, namespace="default")

Next Steps

  • Real-World Scenarios - Complete implementations
  • Edge Cases - More edge case handling
  • Production Deployment - Production best practices