tessl install tessl/pypi-kserve@0.16.1
KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.
This section covers structured logging with configurable log levels and Prometheus metrics for monitoring inference latency across the preprocessing, prediction, postprocessing, and explanation stages.
KServe provides structured logging with two logger instances for application and trace logging.
Import Path:
from kserve.logging import logger, trace_logger
# Logger instances
logger: logging.Logger
"""
Main KServe logger instance.
Name: "kserve"
Default level: INFO (configurable via KSERVE_LOGLEVEL env var)
"""
trace_logger: logging.Logger
"""
Trace logger for detailed request/response logging.
Name: "kserve.trace"
Default level: INFO
"""
# Logger name constants
KSERVE_LOGGER_NAME: str = "kserve"
KSERVE_TRACE_LOGGER_NAME: str = "kserve.trace"
KSERVE_LOGLEVEL: str # From environment variable, default: "INFO"
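These logger names can also be used with the standard logging module to adjust the KServe loggers programmatically. A minimal sketch, assuming the constants are importable from kserve.logging alongside the logger instances:
import logging
from kserve.logging import KSERVE_LOGGER_NAME, KSERVE_TRACE_LOGGER_NAME

# Attach an extra handler to the main KServe logger by name
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
logging.getLogger(KSERVE_LOGGER_NAME).addHandler(handler)

# Raise the trace logger to DEBUG at runtime
logging.getLogger(KSERVE_TRACE_LOGGER_NAME).setLevel(logging.DEBUG)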
def configure_logging(log_config: Optional[Union[Dict, str]] = None) -> None:
"""
Configure logging with custom configuration.
Args:
log_config (dict or str, optional): Logging configuration.
Can be:
- dict: Python logging configuration dictionary
- str: Path to JSON or YAML config file
- str: JSON string
- None: Use default configuration
"""KServe exports Prometheus histograms for monitoring inference latency.
# Prometheus histogram metrics
PRE_HIST_TIME: prometheus_client.Histogram
"""
Preprocessing latency histogram.
Metric name: request_preprocess_seconds
Buckets: 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0
"""
POST_HIST_TIME: prometheus_client.Histogram
"""
Postprocessing latency histogram.
Metric name: request_postprocess_seconds
"""
PREDICT_HIST_TIME: prometheus_client.Histogram
"""
Prediction latency histogram.
Metric name: request_predict_seconds
"""
EXPLAIN_HIST_TIME: prometheus_client.Histogram
"""
Explanation latency histogram.
Metric name: request_explain_seconds
"""
def get_labels(model_name: str) -> Dict[str, str]:
"""
Create Prometheus labels dictionary for a model.
Args:
model_name (str): Name of the model
Returns:
dict: Labels dictionary with model_name
"""Data model for LLM token statistics.
Data model for LLM token statistics.
class LLMStats:
"""
LLM token statistics (Pydantic BaseModel).
Attributes:
num_prompt_tokens (int): Number of tokens in prompt
num_generation_tokens (int): Number of tokens generated
"""
num_prompt_tokens: int
    num_generation_tokens: int
from kserve import logger
# Info logging
logger.info("Model server starting")
logger.info(f"Loading model from {storage_uri}")
# Warning logging
logger.warning("Model took longer than expected to load")
# Error logging
try:
result = model.predict(data)
except Exception as e:
logger.error(f"Prediction failed: {e}", exc_info=True)
# Debug logging
logger.debug(f"Input shape: {input_data.shape}")
logger.debug(f"Model config: {config}")from kserve import Model, logger
class MyModel(Model):
def load(self):
logger.info(f"Loading model {self.name}")
self.model = load_model()
logger.info(f"Model {self.name} loaded successfully")
self.ready = True
def predict(self, payload, headers=None):
logger.debug(f"Received prediction request for {self.name}")
logger.debug(f"Payload: {payload}")
instances = payload["instances"]
predictions = self.model.predict(instances)
logger.info(f"Prediction completed for {self.name}")
return {"predictions": predictions.tolist()}from kserve import trace_logger
import json
class TracedModel(Model):
def predict(self, payload, headers=None):
# Log request
trace_logger.info(f"Request: {json.dumps(payload)}")
# Run prediction
instances = payload["instances"]
predictions = self.model.predict(instances)
response = {"predictions": predictions.tolist()}
# Log response
trace_logger.info(f"Response: {json.dumps(response)}")
        return response
from kserve import configure_logging
# Configure with dictionary
log_config = {
"version": 1,
"formatters": {
"default": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "default",
"level": "INFO"
},
"file": {
"class": "logging.FileHandler",
"filename": "/var/log/kserve/model.log",
"formatter": "default",
"level": "DEBUG"
}
},
"loggers": {
"kserve": {
"level": "INFO",
"handlers": ["console", "file"],
"propagate": False
},
"kserve.trace": {
"level": "DEBUG",
"handlers": ["file"],
"propagate": False
}
}
}
configure_logging(log_config)
from kserve import configure_logging
# From YAML file
configure_logging("/etc/kserve/logging.yaml")
# From JSON file
configure_logging("/etc/kserve/logging.json")Example YAML configuration file:
Example YAML configuration file:
version: 1
formatters:
json:
class: pythonjsonlogger.jsonlogger.JsonFormatter
format: "%(asctime)s %(name)s %(levelname)s %(message)s"
default:
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
handlers:
console:
class: logging.StreamHandler
formatter: default
level: INFO
file:
class: logging.handlers.RotatingFileHandler
filename: /var/log/kserve/model.log
formatter: json
level: DEBUG
maxBytes: 10485760
backupCount: 5
loggers:
kserve:
level: INFO
handlers: [console, file]
propagate: false
kserve.trace:
level: DEBUG
handlers: [file]
    propagate: false
import os
from kserve import logger
# Log level is read from KSERVE_LOGLEVEL environment variable
# Set via environment: export KSERVE_LOGLEVEL=DEBUG
logger.info("This will always show")
logger.debug("This only shows if KSERVE_LOGLEVEL=DEBUG")
# Check current log level
print(f"Current log level: {logger.level}")from kserve import logger
import json
class StructuredLoggingModel(Model):
def predict(self, payload, headers=None):
# Log with structured data
log_data = {
"model": self.name,
"operation": "predict",
"input_size": len(payload.get("instances", [])),
"timestamp": time.time()
}
logger.info(f"Prediction started: {json.dumps(log_data)}")
# Run prediction
instances = payload["instances"]
predictions = self.model.predict(instances)
# Log result
log_data["output_size"] = len(predictions)
log_data["status"] = "success"
logger.info(f"Prediction completed: {json.dumps(log_data)}")
return {"predictions": predictions.tolist()}from kserve import Model, get_labels
from kserve.metrics import PRE_HIST_TIME, PREDICT_HIST_TIME, POST_HIST_TIME
import time
class MetricsModel(Model):
def preprocess(self, body, headers=None):
labels = get_labels(self.name)
start_time = time.time()
# Preprocess
processed = self._transform_input(body)
# Record preprocessing time
elapsed = time.time() - start_time
PRE_HIST_TIME.labels(**labels).observe(elapsed)
return processed
def predict(self, payload, headers=None):
labels = get_labels(self.name)
start_time = time.time()
# Predict
instances = payload["instances"]
predictions = self.model.predict(instances)
# Record prediction time
elapsed = time.time() - start_time
PREDICT_HIST_TIME.labels(**labels).observe(elapsed)
return {"predictions": predictions.tolist()}
def postprocess(self, response, headers=None):
labels = get_labels(self.name)
start_time = time.time()
# Postprocess
processed = self._add_metadata(response)
# Record postprocessing time
elapsed = time.time() - start_time
POST_HIST_TIME.labels(**labels).observe(elapsed)
        return processed
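The explanation stage has its own histogram but is not covered above. A sketch of an explain() method in the same manual-timing style, assuming EXPLAIN_HIST_TIME is also imported from kserve.metrics and that self.explainer is a hypothetical attribute set up in load():
    def explain(self, payload, headers=None):
        labels = get_labels(self.name)
        start_time = time.time()
        # Explain (self.explainer is a hypothetical attribute set in load())
        instances = payload["instances"]
        explanations = self.explainer.explain(instances)
        # Record explanation time
        elapsed = time.time() - start_time
        EXPLAIN_HIST_TIME.labels(**labels).observe(elapsed)
        return {"explanations": explanations}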
KServe automatically records metrics for all model lifecycle methods:
from kserve import Model
class AutoMetricsModel(Model):
def preprocess(self, body, headers=None):
# Preprocessing time automatically recorded
return self._transform(body)
def predict(self, payload, headers=None):
# Prediction time automatically recorded
return {"predictions": self.model.predict(payload["instances"])}
def postprocess(self, response, headers=None):
# Postprocessing time automatically recorded
        return self._add_metadata(response)
# Get Prometheus metrics
curl http://localhost:8080/metrics
# Example output:
# request_preprocess_seconds_bucket{le="0.005",model_name="sklearn-iris"} 45.0
# request_preprocess_seconds_bucket{le="0.01",model_name="sklearn-iris"} 89.0
# request_preprocess_seconds_sum{model_name="sklearn-iris"} 8.234
# request_preprocess_seconds_count{model_name="sklearn-iris"} 100.0
#
# request_predict_seconds_bucket{le="0.01",model_name="sklearn-iris"} 12.0
# request_predict_seconds_bucket{le="0.025",model_name="sklearn-iris"} 78.0
# request_predict_seconds_sum{model_name="sklearn-iris"} 15.678
# request_predict_seconds_count{model_name="sklearn-iris"} 100.0
from kserve import Model
from kserve.metrics import LLMStats
class LLMModel(Model):
def predict(self, payload, headers=None):
prompt = payload["prompt"]
max_tokens = payload.get("max_tokens", 100)
# Generate response
response = self.model.generate(prompt, max_tokens=max_tokens)
# Create stats
stats = LLMStats(
num_prompt_tokens=len(self.tokenizer.encode(prompt)),
num_generation_tokens=len(self.tokenizer.encode(response))
)
return {
"generated_text": response,
"stats": stats.dict()
        }
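Token counts from LLMStats can also feed custom Prometheus counters. A minimal sketch with hypothetical metric names and a hypothetical record_token_stats helper, following the custom-metrics pattern shown next:
from prometheus_client import Counter
from kserve.metrics import LLMStats

# Hypothetical counters for cumulative token counts per model
PROMPT_TOKENS = Counter('llm_prompt_tokens_total', 'Total prompt tokens', ['model_name'])
GENERATION_TOKENS = Counter('llm_generation_tokens_total', 'Total generated tokens', ['model_name'])

def record_token_stats(model_name: str, stats: LLMStats) -> None:
    # Accumulate per-request token counts into the counters
    PROMPT_TOKENS.labels(model_name=model_name).inc(stats.num_prompt_tokens)
    GENERATION_TOKENS.labels(model_name=model_name).inc(stats.num_generation_tokens)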
from prometheus_client import Counter, Gauge, Histogram
from kserve import Model
# Define custom metrics
PREDICTION_COUNTER = Counter(
'model_predictions_total',
'Total number of predictions',
['model_name', 'status']
)
BATCH_SIZE_GAUGE = Gauge(
'model_batch_size',
'Current batch size',
['model_name']
)
INFERENCE_LATENCY = Histogram(
'model_inference_seconds',
'Model inference latency',
['model_name'],
buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.5, 1.0]
)
class CustomMetricsModel(Model):
def predict(self, payload, headers=None):
import time
instances = payload["instances"]
batch_size = len(instances)
# Update batch size gauge
BATCH_SIZE_GAUGE.labels(model_name=self.name).set(batch_size)
# Time inference
start = time.time()
try:
predictions = self.model.predict(instances)
# Record success
PREDICTION_COUNTER.labels(
model_name=self.name,
status='success'
).inc()
# Record latency
latency = time.time() - start
INFERENCE_LATENCY.labels(model_name=self.name).observe(latency)
return {"predictions": predictions.tolist()}
except Exception as e:
# Record failure
PREDICTION_COUNTER.labels(
model_name=self.name,
status='error'
).inc()
            raise
# prometheus.yml
scrape_configs:
- job_name: 'kserve'
scrape_interval: 15s
static_configs:
- targets: ['localhost:8080']
    metrics_path: '/metrics'
# Average prediction latency
rate(request_predict_seconds_sum[5m]) / rate(request_predict_seconds_count[5m])
# 95th percentile prediction latency
histogram_quantile(0.95, rate(request_predict_seconds_bucket[5m]))
# 99th percentile prediction latency
histogram_quantile(0.99, rate(request_predict_seconds_bucket[5m]))
# Request rate per model
rate(request_predict_seconds_count[5m])
# Total preprocessing time
rate(request_preprocess_seconds_sum[5m])
# Total requests by model
sum by (model_name) (request_predict_seconds_count)
Example Grafana dashboard queries:
{
"panels": [
{
"title": "Prediction Latency (p95)",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(request_predict_seconds_bucket[5m]))"
}
]
},
{
"title": "Request Rate",
"targets": [
{
"expr": "rate(request_predict_seconds_count[1m])"
}
]
},
{
"title": "Preprocessing Time",
"targets": [
{
"expr": "rate(request_preprocess_seconds_sum[5m])"
}
]
}
]
}
from kserve import logger
# DEBUG: Detailed debugging information
logger.debug(f"Input tensor shape: {tensor.shape}")
# INFO: General informational messages
logger.info("Model loaded successfully")
# WARNING: Warning messages
logger.warning("Model load time exceeded 10 seconds")
# ERROR: Error messages
logger.error(f"Failed to load model: {e}", exc_info=True)from kserve import logger
# Bad: Not enough context
logger.info("Prediction completed")
# Good: Include relevant context
logger.info(f"Prediction completed for model {self.name}, batch_size={batch_size}, latency={latency:.3f}s")from kserve import logger, trace_logger
class LoggingModel(Model):
def predict(self, payload, headers=None):
# Use trace logger for detailed request/response
trace_logger.debug(f"Request payload: {payload}")
# Use regular logger for application events
logger.info(f"Processing prediction for {self.name}")
result = self.model.predict(payload["instances"])
trace_logger.debug(f"Response: {result}")
logger.info(f"Prediction completed")
return {"predictions": result.tolist()}from kserve import Model, get_labels
from kserve.metrics import PREDICT_HIST_TIME
from prometheus_client import Counter
# Track prediction counts
PREDICTION_COUNT = Counter(
'predictions_total',
'Total predictions',
['model_name', 'batch_size_bucket']
)
class MonitoredModel(Model):
def predict(self, payload, headers=None):
instances = payload["instances"]
batch_size = len(instances)
# Bucket batch sizes
if batch_size <= 1:
bucket = "small"
elif batch_size <= 10:
bucket = "medium"
else:
bucket = "large"
PREDICTION_COUNT.labels(
model_name=self.name,
batch_size_bucket=bucket
).inc()
predictions = self.model.predict(instances)
return {"predictions": predictions.tolist()}# production_logging.yaml
version: 1
disable_existing_loggers: false
formatters:
json:
class: pythonjsonlogger.jsonlogger.JsonFormatter
format: "%(asctime)s %(name)s %(levelname)s %(message)s"
handlers:
console:
class: logging.StreamHandler
formatter: json
level: INFO
file:
class: logging.handlers.RotatingFileHandler
filename: /var/log/kserve/production.log
formatter: json
level: INFO
maxBytes: 104857600 # 100MB
backupCount: 10
loggers:
kserve:
level: INFO
handlers: [console, file]
propagate: false
kserve.trace:
level: WARNING # Less verbose in production
handlers: [file]
    propagate: false
# Prometheus alerting rules
groups:
- name: kserve_alerts
rules:
- alert: HighLatency
expr: histogram_quantile(0.95, rate(request_predict_seconds_bucket[5m])) > 1.0
for: 5m
labels:
severity: warning
annotations:
summary: "High prediction latency detected"
- alert: HighErrorRate
expr: rate(model_predictions_total{status="error"}[5m]) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "High error rate detected"