tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.
Patterns for integrating KServe with other systems and services, including databases, message queues, feature stores, monitoring and tracing backends, service meshes, and cloud credential providers.

The first example is a custom model that loads its artifact and metadata from PostgreSQL at startup and logs every prediction back to the database:

import json
import logging

import joblib
import psycopg2
from psycopg2.pool import SimpleConnectionPool

from kserve import Model

logger = logging.getLogger(__name__)
class DatabaseIntegratedModel(Model):
def __init__(self, name: str, db_config: dict):
super().__init__(name)
self.db_pool = SimpleConnectionPool(
minconn=1,
maxconn=10,
**db_config
)
def load(self):
"""Load model and metadata from database"""
conn = self.db_pool.getconn()
try:
cursor = conn.cursor()
# Get model metadata
cursor.execute(
"SELECT model_path, version, config FROM models WHERE name = %s",
(self.name,)
)
model_path, version, config = cursor.fetchone()
# Load model
self.model = joblib.load(model_path)
self.version = version
self.config = json.loads(config)
self.ready = True
logger.info(f"Loaded {self.name} version {version}")
finally:
self.db_pool.putconn(conn)
def predict(self, payload, headers=None):
"""Predict and log to database"""
instances = payload["instances"]
predictions = self.model.predict(instances)
# Log prediction to database
conn = self.db_pool.getconn()
try:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO predictions (model_name, input, output, timestamp) "
"VALUES (%s, %s, %s, NOW())",
(self.name, json.dumps(instances), json.dumps(predictions.tolist()))
)
conn.commit()
        except Exception as e:
            conn.rollback()
            logger.error(f"Failed to log prediction: {e}")
finally:
self.db_pool.putconn(conn)
return {"predictions": predictions.tolist()}
def stop(self):
"""Close database connections"""
        self.db_pool.closeall()
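
A sketch of how this model might be started; the connection parameters and model name below are placeholders for illustration, not values defined by the package:

from kserve import ModelServer

if __name__ == "__main__":
    model = DatabaseIntegratedModel(
        "sklearn-iris",  # must match a row in the models table
        db_config={
            "host": "postgres.default.svc.cluster.local",  # placeholder host
            "dbname": "model_registry",                    # placeholder database
            "user": "kserve",
            "password": "changeme",
        },
    )
    model.load()
    ModelServer().start([model])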

Inference requests can also arrive through a message queue. The model below consumes requests from a Kafka topic in a background thread and publishes responses, while still serving synchronous HTTP predictions:

import json
import logging
import threading
from typing import List

import joblib
from kafka import KafkaConsumer, KafkaProducer

from kserve import Model, ModelServer

logger = logging.getLogger(__name__)
class KafkaModel(Model):
def __init__(self, name: str, kafka_brokers: List[str]):
super().__init__(name)
self.kafka_brokers = kafka_brokers
self.consumer = None
self.producer = None
self.processing_thread = None
def load(self):
"""Load model and setup Kafka"""
self.model = joblib.load("/mnt/models/model.pkl")
# Setup Kafka consumer
self.consumer = KafkaConsumer(
'inference-requests',
bootstrap_servers=self.kafka_brokers,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id=f'kserve-{self.name}'
)
# Setup Kafka producer
self.producer = KafkaProducer(
bootstrap_servers=self.kafka_brokers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Start processing thread
self.processing_thread = threading.Thread(
target=self._process_kafka_messages,
daemon=True
)
self.processing_thread.start()
self.ready = True
logger.info(f"Kafka integration ready for {self.name}")
def _process_kafka_messages(self):
"""Process messages from Kafka"""
for message in self.consumer:
try:
request = message.value
request_id = request.get("request_id")
instances = request.get("instances")
# Run prediction
predictions = self.model.predict(instances)
# Send response
response = {
"request_id": request_id,
"predictions": predictions.tolist(),
"model_name": self.name
}
self.producer.send('inference-responses', response)
logger.info(f"Processed request {request_id}")
except Exception as e:
logger.error(f"Failed to process message: {e}", exc_info=True)
def predict(self, payload, headers=None):
"""Synchronous prediction for HTTP requests"""
        predictions = self.model.predict(payload["instances"])
        return {"predictions": predictions.tolist()}
def stop(self):
"""Cleanup Kafka resources"""
if self.consumer:
self.consumer.close()
if self.producer:
            self.producer.close()
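
For illustration, a request in the format the background thread expects could be published like this; the broker address and feature values are placeholders:

import json
import uuid

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],  # placeholder broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
# Matches the fields read in _process_kafka_messages: request_id and instances
producer.send(
    "inference-requests",
    {"request_id": str(uuid.uuid4()), "instances": [[5.1, 3.5, 1.4, 0.2]]},
)
producer.flush()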

A model can also hydrate its inputs from a Feast feature store at request time, so clients only send entity IDs:

import joblib
import pandas as pd
from feast import FeatureStore

from kserve import Model
class FeastIntegratedModel(Model):
def __init__(self, name: str, feast_repo_path: str):
super().__init__(name)
self.feast_repo_path = feast_repo_path
self.feature_store = None
def load(self):
"""Load model and feature store"""
self.model = joblib.load("/mnt/models/model.pkl")
self.feature_store = FeatureStore(repo_path=self.feast_repo_path)
self.ready = True
async def preprocess(self, body, headers=None):
"""Fetch features from Feast"""
entity_ids = body["entity_ids"]
        # Build entity rows for the online store lookup
        entity_rows = [{"entity_id": entity_id} for entity_id in entity_ids]
        # Fetch the latest feature values from the online store
        features = self.feature_store.get_online_features(
            features=[
                "user_features:age",
                "user_features:income",
                "user_features:credit_score"
            ],
            entity_rows=entity_rows
        ).to_df()
# Convert to instances
instances = features[["age", "income", "credit_score"]].values.tolist()
return {"instances": instances}
def predict(self, payload, headers=None):
"""Predict using fetched features"""
instances = payload["instances"]
predictions = self.model.predict(instances)
        return {"predictions": predictions.tolist()}
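
Because preprocess expects entity IDs rather than raw features, a client call looks like the sketch below; the host, model name, and IDs are placeholders:

import requests

response = requests.post(
    "http://feast-model.default.example.com/v1/models/feast-model:predict",  # placeholder URL and model name
    json={"entity_ids": [1001, 1002, 1003]},
)
print(response.json())  # {"predictions": [...]}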

Operational metrics can be pushed to Datadog through DogStatsD:

import time

import joblib
from datadog import initialize, statsd

from kserve import Model
class DatadogModel(Model):
def __init__(self, name: str, datadog_config: dict):
super().__init__(name)
initialize(**datadog_config)
self.statsd = statsd
def load(self):
self.model = joblib.load("/mnt/models/model.pkl")
self.ready = True
# Send metric
self.statsd.increment('model.loaded', tags=[f"model:{self.name}"])
def predict(self, payload, headers=None):
"""Predict with Datadog metrics"""
start = time.time()
try:
instances = payload["instances"]
# Send batch size metric
self.statsd.histogram(
'model.batch_size',
len(instances),
tags=[f"model:{self.name}"]
)
# Run prediction
predictions = self.model.predict(instances)
# Send latency metric
latency = time.time() - start
self.statsd.histogram(
'model.prediction.latency',
latency,
tags=[f"model:{self.name}"]
)
# Send success metric
self.statsd.increment(
'model.prediction.success',
tags=[f"model:{self.name}"]
)
return {"predictions": predictions.tolist()}
except Exception as e:
# Send error metric
self.statsd.increment(
'model.prediction.error',
tags=[f"model:{self.name}", f"error_type:{type(e).__name__}"]
)
            raise
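
A sketch of serving the instrumented model; the DogStatsD host and port are assumptions and should point at your Datadog agent:

from kserve import ModelServer

if __name__ == "__main__":
    model = DatadogModel(
        "sklearn-iris",
        datadog_config={
            "statsd_host": "datadog-agent.monitoring.svc.cluster.local",  # assumed agent address
            "statsd_port": 8125,
        },
    )
    model.load()
    ModelServer().start([model])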

Distributed tracing can be layered on with OpenTelemetry so each prediction shows up as a span in the end-to-end request trace:

import joblib

from kserve import Model
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer(__name__)
class TracedModel(Model):
    def load(self):
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True

    def predict(self, payload, headers=None):
"""Predict with distributed tracing"""
with tracer.start_as_current_span("model.predict") as span:
span.set_attribute("model.name", self.name)
span.set_attribute("batch.size", len(payload.get("instances", [])))
try:
instances = payload["instances"]
with tracer.start_as_current_span("model.inference"):
predictions = self.model.predict(instances)
span.set_status(Status(StatusCode.OK))
return {"predictions": predictions.tolist()}
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
                raise
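
The spans above are only exported if a tracer provider is configured at startup. A minimal setup sketch, assuming an OTLP-capable collector is reachable in the cluster:

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

provider = TracerProvider(
    resource=Resource.create({"service.name": "kserve-model"})
)
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="otel-collector:4317", insecure=True))  # assumed collector address
)
trace.set_tracer_provider(provider)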

To secure traffic to the predictor, the Istio sidecar can be injected so that requests to the InferenceService flow over the mesh's mTLS:

# InferenceService with Istio mTLS
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: sklearn-iris
  namespace: default
  annotations:
    sidecar.istio.io/inject: "true"
    traffic.sidecar.istio.io/includeInboundPorts: "8080,8081"
spec:
  predictor:
    sklearn:
      storageUri: gs://models/sklearn/iris

The Python inference client can present the workload's certificates when it calls the mesh-protected service:

from kserve import InferenceRESTClient, RESTConfig

# Client configuration for Istio mTLS
config = RESTConfig(
protocol="v2",
verify="/etc/certs/root-cert.pem",
cert=("/etc/certs/cert-chain.pem", "/etc/certs/key.pem")
)
client = InferenceRESTClient(config=config)
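
A hedged usage sketch for this client; the service URL, model name, and input shape are placeholders, and the exact call signature may vary slightly between KServe releases:

import asyncio

from kserve import InferInput, InferRequest

async def main():
    infer_input = InferInput(
        name="input-0", shape=[1, 4], datatype="FP32",
        data=[[5.1, 3.5, 1.4, 0.2]],
    )
    request = InferRequest(model_name="sklearn-iris", infer_inputs=[infer_input])
    response = await client.infer(
        "https://sklearn-iris.default.example.com",  # placeholder service URL
        request,
        model_name="sklearn-iris",
    )
    print(response)

asyncio.run(main())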

Models stored in cloud object storage can be pulled using workload identity instead of static credentials. On AWS EKS, annotate a Kubernetes service account with an IAM role and reference it from the InferenceService:

from kserve import (
    KServeClient,
    V1beta1InferenceService,
    V1beta1InferenceServiceSpec,
    V1beta1PredictorSpec,
    V1beta1SKLearnSpec,
)

def deploy_with_s3_iam():
"""Deploy model using S3 with IAM role"""
client = KServeClient()
# Create service account with IAM role annotation
from kubernetes import client as k8s_client
sa = k8s_client.V1ServiceAccount(
metadata=k8s_client.V1ObjectMeta(
name="kserve-sa",
namespace="default",
annotations={
"eks.amazonaws.com/role-arn": "arn:aws:iam::123456789:role/kserve-s3-role"
}
)
    )
    # Apply the service account so the predictor pods can assume the IAM role
    k8s_client.CoreV1Api().create_namespaced_service_account(
        namespace="default", body=sa
    )
    # Create InferenceService using the service account
    isvc = V1beta1InferenceService(
        api_version="serving.kserve.io/v1beta1",
        kind="InferenceService",
        metadata={"name": "sklearn-s3", "namespace": "default"},
spec=V1beta1InferenceServiceSpec(
predictor=V1beta1PredictorSpec(
service_account_name="kserve-sa",
sklearn=V1beta1SKLearnSpec(
storage_uri="s3://my-bucket/models/sklearn/iris"
)
)
)
)
    client.create(isvc, namespace="default")

On GKE, the same pattern uses Workload Identity: the Kubernetes service account referenced by the predictor is bound to a GCP service account that can read the model bucket.

def deploy_with_workload_identity():
"""Deploy model using GCP Workload Identity"""
client = KServeClient()
    isvc = V1beta1InferenceService(
        api_version="serving.kserve.io/v1beta1",
        kind="InferenceService",
        metadata={
"name": "sklearn-gcs",
"namespace": "default"
},
spec=V1beta1InferenceServiceSpec(
predictor=V1beta1PredictorSpec(
service_account_name="kserve-gcs-sa", # Bound to GCP SA
sklearn=V1beta1SKLearnSpec(
storage_uri="gs://my-bucket/models/sklearn/iris"
)
)
)
)
client.create(isvc, namespace="default")