tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.
Start and manage a FastAPI-based model server with support for multiple models, workers, HTTP/gRPC protocols, and comprehensive configuration options.
The ModelServer orchestrates the inference server lifecycle, managing HTTP and gRPC servers, worker processes, and model registration.
class ModelServer:
    """
    Model server orchestrator for KServe inference servers.
    Manages the FastAPI HTTP server and, optionally, a gRPC server.

    Args:
        http_port (int): HTTP server port (default: 8080)
        grpc_port (int): gRPC server port (default: 8081)
        workers (int): Number of worker processes (default: 1)
        max_threads (int): Maximum threads per worker (default: None)
        max_asyncio_workers (int): Maximum asyncio workers (default: None)
        registered_models (ModelRepository, optional): Model repository instance
        enable_grpc (bool): Enable gRPC server (default: True)
        enable_docs_url (bool): Enable FastAPI docs at /docs (default: False)
        enable_latency_logging (bool): Enable latency logging (default: True)
        access_log_format (str, optional): Access log format string
        grace_period (int): Graceful shutdown period in seconds (default: 30)
        predictor_config (PredictorConfig, optional): Predictor configuration
    """

    def __init__(
        self,
        http_port: int = 8080,
        grpc_port: int = 8081,
        workers: int = 1,
        max_threads: int = None,
        max_asyncio_workers: int = None,
        registered_models: ModelRepository = None,
        enable_grpc: bool = True,
        enable_docs_url: bool = False,
        enable_latency_logging: bool = True,
        access_log_format: str = None,
        grace_period: int = 30,
        predictor_config=None
    ): ...
    def start(self, models: Union[List[Model], Dict[str, Model]]) -> None:
        """
        Start the model server with the given models.

        Args:
            models (list or dict): Model instances to serve.
                Can be a list of Model objects or a dict mapping names to models.
        """

    def create_application(self) -> FastAPI:
        """
        Create and configure the FastAPI application.

        Returns:
            FastAPI: Configured FastAPI application instance
        """

    def register_model(self, model: Model) -> None:
        """
        Register a model with the server.

        Args:
            model (Model): Model instance to register
        """

    def register_exception_handler(
        self,
        exc_class: type,
        handler: Callable
    ) -> None:
        """
        Register a custom exception handler.

        Args:
            exc_class (type): Exception class to handle
            handler (Callable): Handler function for the exception
        """

    def default_exception_handler(
        self,
        request,
        exc: Exception
    ) -> JSONResponse:
        """
        Default exception handler for unhandled exceptions.

        Args:
            request: FastAPI request object
            exc (Exception): Raised exception

        Returns:
            JSONResponse: Error response with status 500
        """

    def setup_event_loop(self) -> None:
        """Set up the asyncio event loop with a thread pool executor."""

    def register_signal_handler(self) -> None:
        """Register signal handlers for graceful shutdown."""

    def stop(self) -> None:
        """
        Stop the model server gracefully.
        Completes in-flight requests and shuts down workers.
        """
A minimal custom model served with ModelServer:

from kserve import Model, ModelServer

class MyModel(Model):
    def load(self):
        self.model = load_my_model()
        self.ready = True

    def predict(self, payload, headers=None):
        return {"predictions": self.model.predict(payload["instances"])}

if __name__ == "__main__":
    model = MyModel("my-model")
    model.load()
    ModelServer().start([model])

Serve multiple models from a single server:
if __name__ == "__main__":
    model1 = MyModel("model-a")
    model1.load()
    model2 = MyModel("model-b")
    model2.load()

    # Pass models as a list
    ModelServer().start([model1, model2])

    # Or, equivalently, pass them as a dictionary mapping names to models
    # (start() blocks, so use one of the two calls):
    # ModelServer().start({
    #     "model-a": model1,
    #     "model-b": model2
    # })

The ModelServer accepts command-line arguments for configuration:
# HTTP Configuration
--http_port: int = 8080 # HTTP server port
--workers: int = 1 # Number of worker processes
--max_threads: int = 4 # Max threads per worker
--max_asyncio_workers: int = None # Max asyncio workers
# gRPC Configuration
--grpc_port: int = 8081 # gRPC server port
--enable_grpc: bool = True # Enable gRPC server
# API Documentation
--enable_docs_url: bool = False # Enable FastAPI docs at /docs
# Logging Configuration
--configure_logging: bool = True # Configure KServe logging
--log_config_file: str = None # Path to logging config file
--access_log_format: str = None # Access log format string
--enable_latency_logging: bool = True # Log inference latency

# Start with custom HTTP port
python model.py --http_port 9000
# Start with multiple workers
python model.py --workers 4
# Disable gRPC server
python model.py --enable_grpc false
# Enable API documentation
python model.py --enable_docs_url true
# Custom log config
python model.py --log_config_file /path/to/log_config.yaml

The server exposes the following HTTP endpoints:

GET /v2/health/live - Server liveness
GET /v2/health/ready - Server readiness
GET /v2/models - List all models
GET /v2/models/{model_name} - Model metadata
GET /v2/models/{model_name}/ready - Model readiness
POST /v2/models/{model_name}/infer - Inference request
POST /v2/models/{model_name}/generate - Generation request (LLM)
POST /v1/models/{model_name}:predict - Prediction
POST /v1/models/{model_name}:explain - Explanation
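As a sketch of the request formats, the snippet below calls the V1 predict and V2 infer endpoints with the requests package. The host, model name, and payload shapes are illustrative; the actual payload must match what your model's predict() expects:

import requests

# V1 protocol: POST /v1/models/{model_name}:predict
v1_payload = {"instances": [[1.0, 2.0, 3.0]]}
resp = requests.post("http://localhost:8080/v1/models/my-model:predict", json=v1_payload)
print(resp.json())

# V2 Open Inference Protocol: POST /v2/models/{model_name}/infer
v2_payload = {
    "inputs": [
        {"name": "input-0", "shape": [1, 3], "datatype": "FP32", "data": [1.0, 2.0, 3.0]}
    ]
}
resp = requests.post("http://localhost:8080/v2/models/my-model/infer", json=v2_payload)
print(resp.json())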
When --enable_docs_url true is set:

GET /docs - Swagger UI
GET /redoc - ReDoc UI
GET /openapi.json - OpenAPI specification

GET /metrics - Prometheus metrics

Prometheus histograms:
request_preprocess_seconds - Preprocessing latency
request_predict_seconds - Prediction latency
request_postprocess_seconds - Postprocessing latency
request_explain_seconds - Explanation latency

Run multiple worker processes for increased throughput:
# Start with 4 workers
python model.py --workers 4

Workers are managed using Uvicorn's multiprocess mode. Each worker is a separate process with its own model instance.
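The same setting can be passed through the workers constructor argument documented above. A minimal sketch reusing the MyModel class from the earlier example:

from kserve import ModelServer

if __name__ == "__main__":
    model = MyModel("my-model")  # MyModel as defined in the earlier example
    model.load()
    # Each of the 4 worker processes serves its own copy of the model.
    ModelServer(workers=4).start([model])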
Configure threading for CPU-bound operations:
# Set max threads per worker
python model.py --max_threads 8

Configure asyncio task workers:
# Set max asyncio workers
python model.py --max_asyncio_workers 100

KServe supports the gRPC v2 protocol for high-performance inference:
# Enable/disable gRPC
python model.py --enable_grpc true
# Set gRPC port
python model.py --grpc_port 9000
gRPC endpoints:

ServerLive - Server liveness
ServerReady - Server readiness
ModelReady - Model readiness
ServerMetadata - Server metadata
ModelMetadata - Model metadata
ModelInfer - Inference request

KServe configures structured logging by default:
from kserve import logger
logger.info("Model loaded successfully")
logger.error("Prediction failed", exc_info=True)

Provide a custom logging configuration:
# YAML config file
python model.py --log_config_file /path/to/logging.yaml
# JSON config
python model.py --log_config_file /path/to/logging.json

Example YAML config:
version: 1
formatters:
  default:
    format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
handlers:
  console:
    class: logging.StreamHandler
    formatter: default
    level: INFO
loggers:
  kserve:
    level: INFO
    handlers: [console]
  kserve.trace:
    level: DEBUG
    handlers: [console]

Disable KServe's logging configuration:

python model.py --configure_logging false

KServe automatically logs inference latency:
# Enable/disable latency logging
python model.py --enable_latency_logging true

Latency is logged per request for the preprocess, predict, postprocess, and explain stages, matching the Prometheus histograms above.
Customize HTTP access log format:
python model.py --access_log_format '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'

Check if server process is alive:
curl http://localhost:8080/v2/health/live

Response:
{"live": true}Check if server is ready to accept requests:
curl http://localhost:8080/v2/health/ready

Response:
{"ready": true}Returns ready when at least one model is loaded and ready.
Check if a specific model is ready:
curl http://localhost:8080/v2/models/my-model/ready

Response:
{"ready": true}For advanced use cases, create a custom FastAPI application:
from kserve import ModelServer, Model

class MyModel(Model):
    def load(self):
        self.model = load_my_model()
        self.ready = True

    def predict(self, payload, headers=None):
        return {"predictions": self.model.predict(payload["instances"])}

if __name__ == "__main__":
    # Create model
    model = MyModel("my-model")
    model.load()

    # Create server and get FastAPI app
    server = ModelServer()
    app = server.create_application()

    # Register models and start serving
    server.start([model])

Add custom routes:
from fastapi import FastAPI

app = server.create_application()

@app.get("/custom")
def custom_endpoint():
    return {"message": "Custom endpoint"}

server.start([model])

Get server metadata:
curl http://localhost:8080/v2

Response:
{
  "name": "kserve",
  "version": "0.16.0",
  "extensions": []
}

Get model metadata:
curl http://localhost:8080/v2/models/my-model

Response:
{
  "name": "my-model",
  "versions": ["1"],
  "platform": "kserve",
  "inputs": [],
  "outputs": []
}

List all loaded models:
curl http://localhost:8080/v2/models

Response:
{
  "models": ["my-model", "another-model"]
}

KServe returns structured error responses:
{
  "error": "Model my-model is not ready",
  "code": 503
}

Common HTTP status codes:
400 - Invalid input
404 - Model not found
500 - Inference error
503 - Model not ready / Server not ready
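Custom exceptions can be mapped to specific responses with register_exception_handler (documented above). A minimal sketch; ModelInputError and its handler are illustrative and not part of KServe:

from fastapi.responses import JSONResponse
from kserve import ModelServer

class ModelInputError(Exception):
    """Hypothetical application-specific error raised from predict()."""

async def model_input_error_handler(request, exc: ModelInputError) -> JSONResponse:
    # Return a structured 400 response instead of the default 500.
    return JSONResponse(status_code=400, content={"error": str(exc), "code": 400})

server = ModelServer()
server.register_exception_handler(ModelInputError, model_input_error_handler)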
KServe handles graceful shutdown: termination signals trigger stop(), in-flight requests are allowed to complete within the configured grace_period (default 30 seconds), and worker processes are then shut down.

Configure SSL for HTTPS:
# Via Uvicorn SSL configuration
import uvicorn
from kserve import ModelServer

if __name__ == "__main__":
    model = MyModel("my-model")  # MyModel as defined in the earlier examples
    model.load()
    server = ModelServer()

    # Option 1: standard start without SSL (blocking)
    # server.start([model])

    # Option 2: run the FastAPI application with SSL via Uvicorn
    server.register_model(model)
    uvicorn.run(
        server.create_application(),
        host="0.0.0.0",
        port=8443,
        ssl_keyfile="/path/to/key.pem",
        ssl_certfile="/path/to/cert.pem"
    )

KServe respects environment variables:
KSERVE_LOGLEVEL - Log level (default: INFO)
MODELS_DIR - Models directory (default: /mnt/models)
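For example, a custom model can resolve its artifact path from MODELS_DIR. A minimal sketch; EnvAwareModel and load_my_model are illustrative placeholders following the earlier examples:

import os
from kserve import Model

class EnvAwareModel(Model):
    def load(self):
        # Fall back to the documented default when MODELS_DIR is unset.
        model_dir = os.environ.get("MODELS_DIR", "/mnt/models")
        self.model = load_my_model(os.path.join(model_dir, "model.pkl"))
        self.ready = True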
KServe supports CloudEvents for request/response:

# Structured CloudEvents
Content-Type: application/cloudevents+json

{
  "specversion": "1.0",
  "type": "org.kserve.inference.request",
  "source": "client",
  "id": "12345",
  "datacontenttype": "application/json",
  "data": {
    "instances": [[1, 2, 3]]
  }
}
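A sketch of sending that structured CloudEvent over HTTP with the requests package; posting it to the V1 predict endpoint of a model named "my-model" is an illustrative assumption:

import json
import requests

event = {
    "specversion": "1.0",
    "type": "org.kserve.inference.request",
    "source": "client",
    "id": "12345",
    "datacontenttype": "application/json",
    "data": {"instances": [[1, 2, 3]]},
}
resp = requests.post(
    "http://localhost:8080/v1/models/my-model:predict",
    data=json.dumps(event),
    headers={"Content-Type": "application/cloudevents+json"},
)
print(resp.json())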
A complete example serving a scikit-learn model:

from kserve import Model, ModelServer
import argparse

class SKLearnModel(Model):
    def load(self):
        import joblib
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True

    def predict(self, payload, headers=None):
        instances = payload["instances"]
        predictions = self.model.predict(instances)
        return {"predictions": predictions.tolist()}

if __name__ == "__main__":
    # Parse arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_name", default="sklearn-model")
    args, _ = parser.parse_known_args()

    # Create and load model
    model = SKLearnModel(args.model_name)
    model.load()

    # Start server
    # Additional args are parsed by ModelServer
    ModelServer().start([model])

Run with:
python model.py --model_name my-model --http_port 9000 --workers 2 --enable_grpc false