tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.
Dynamically manage models at runtime with support for loading, unloading, and querying model readiness.

The ModelRepository class manages a collection of models and provides dynamic loading and unloading capabilities.
class ModelRepository:
    """
    Model repository for dynamic model management.

    Args:
        models_dir (str): Directory containing model artifacts (default: "/mnt/models")
    """
    def __init__(self, models_dir: str = "/mnt/models"): ...

    def load(self, name: str) -> bool:
        """
        Load a specific model by name.

        Args:
            name (str): Model name to load

        Returns:
            bool: True if model loaded successfully, False otherwise
        """

    def load_model(self, name: str) -> bool:
        """
        Load a model from the models directory.

        Args:
            name (str): Model name to load

        Returns:
            bool: True if model loaded successfully, False otherwise

        Raises:
            ModelMissingError: If model path does not exist
        """

    def set_models_dir(self, models_dir: str) -> None:
        """
        Set the models directory path.

        Args:
            models_dir (str): Path to models directory
        """

    def load_models(self) -> None:
        """
        Load all models from the models directory.

        Scans models_dir and loads each model found.
        """

    def get_model(self, name: str) -> Model:
        """
        Get a model by name.

        Args:
            name (str): Model name

        Returns:
            Model: Model instance

        Raises:
            ModelMissingError: If model is not found
        """

    def get_models(self) -> Dict[str, Model]:
        """
        Get all loaded models.

        Returns:
            dict: Dictionary mapping model names to Model instances
        """

    def update(self, model: Model, name: str = None) -> None:
        """
        Add or update a model in the repository.

        Args:
            model (Model): Model instance to add
            name (str, optional): Model name (default: uses model.name)
        """

    def unload(self, name: str) -> None:
        """
        Unload a model from the repository.

        Args:
            name (str): Name of model to unload

        Raises:
            ModelMissingError: If model is not found
        """

    async def is_model_ready(self, name: str) -> bool:
        """
        Check if a model is ready (async).

        Note: this is a coroutine — callers must ``await`` it (or drive it
        with ``asyncio.run``); a bare call returns a coroutine object, which
        is always truthy.

        Args:
            name (str): Model name

        Returns:
            bool: True if model is ready, False otherwise
        """

from kserve import ModelRepository, Model
import asyncio


class MyModel(Model):
    def load(self):
        # Load model artifacts
        self.model = load_model()
        self.ready = True

    def predict(self, payload, headers=None):
        return {"predictions": self.model.predict(payload["instances"])}


# Create repository
repository = ModelRepository(models_dir="/mnt/models")

# Create and add model
model = MyModel("my-model")
model.load()
repository.update(model)

# Get model
retrieved_model = repository.get_model("my-model")

# Check if ready. is_model_ready() is a coroutine, so it must be driven
# with asyncio.run() (or awaited) — testing the bare coroutine object
# would always be truthy, making the original check meaningless.
if asyncio.run(repository.is_model_ready("my-model")):
    print("Model is ready")

from kserve import ModelRepository
# Build a repository rooted at the standard models mount point.
repository = ModelRepository(models_dir="/mnt/models")

# Load every model found under the models directory.
# Expected layout:
#   /mnt/models/
#   ├── model-a/
#   ├── model-b/
#   └── model-c/
repository.load_models()

# Report each model that was discovered and loaded.
for name, model in repository.get_models().items():
    print(f"Loaded model: {name}, ready: {model.ready}")

from kserve import ModelRepository, Model
# Start from an empty repository (default models directory).
repository = ModelRepository()


class DynamicModel(Model):
    """Model whose artifact location is supplied at construction time."""

    def __init__(self, name: str, model_path: str):
        super().__init__(name)
        self.model_path = model_path

    def load(self):
        import joblib

        self.model = joblib.load(self.model_path)
        self.ready = True

    def predict(self, payload, headers=None):
        # Run inference on the request instances and return a JSON-friendly list.
        return {"predictions": self.model.predict(payload["instances"]).tolist()}


# Register a freshly loaded model under a new name.
new_model = DynamicModel("sklearn-v2", "/models/sklearn/v2/model.pkl")
new_model.load()
repository.update(new_model)
print(f"Model added: {new_model.name}")

from kserve import ModelRepository, Model
repository = ModelRepository()

# Keep two versions of the same logical model side by side, distinguished
# by the version suffix embedded in the model name.
for version in ("v1", "v2"):
    versioned = MyModel(f"fraud-detector-{version}")
    versioned.load()
    repository.update(versioned)

# Retrieve each version independently.
v1 = repository.get_model("fraud-detector-v1")
v2 = repository.get_model("fraud-detector-v2")

from kserve import ModelRepository
from kserve.errors import ModelMissingError

repository = ModelRepository()

# Add models (model_a / model_b are assumed to be loaded Model instances).
repository.update(model_a)
repository.update(model_b)

# Unload model to free resources
repository.unload("model-a")

# Check if model exists.
# NOTE: ModelMissingError must be imported from kserve.errors — the
# original snippet referenced it in the except clause without importing
# it, which would raise NameError instead of handling the miss.
try:
    model = repository.get_model("model-a")
except ModelMissingError:
    print("Model has been unloaded")

from kserve import ModelRepository, Model
repository = ModelRepository()

# Register the first version of the classifier.
model = MyModel("classifier")
model.load()
repository.update(model)

# After retraining, load a replacement and register it under the same
# name; update() overwrites the existing entry in place.
updated_model = MyModel("classifier")
updated_model.load()  # Loads new version
repository.update(updated_model, name="classifier")  # Replaces existing
print("Model updated")

from kserve import ModelRepository
import asyncio
import time

repository = ModelRepository()
repository.update(my_model)

# Wait for the model to become ready, polling once per second.
# is_model_ready() is a coroutine: it must be driven with asyncio.run()
# (or awaited). In the original, the bare coroutine object was tested
# directly — it is always truthy, so the wait loop never ran and the
# final check always reported "ready".
timeout = 30
elapsed = 0
while not asyncio.run(repository.is_model_ready("my-model")) and elapsed < timeout:
    time.sleep(1)
    elapsed += 1

if asyncio.run(repository.is_model_ready("my-model")):
    print("Model is ready")
else:
    print("Model failed to become ready")

from kserve import ModelRepository
repository = ModelRepository()

# Register several models in one pass.
for registered in (model1, model2, model3):
    repository.update(registered)

# Summarize everything currently held by the repository.
all_models = repository.get_models()
print(f"Total models: {len(all_models)}")
for name, model in all_models.items():
    print(f"- {name}: ready={model.ready}")

from kserve import ModelServer, ModelRepository, Model
class SKLearnModel(Model):
    """Scikit-learn model whose pickle lives under the shared models mount."""

    def load(self):
        import joblib

        self.model = joblib.load(f"/mnt/models/{self.name}/model.pkl")
        self.ready = True

    def predict(self, payload, headers=None):
        # Predict on the request instances and return a JSON-friendly list.
        return {"predictions": self.model.predict(payload["instances"]).tolist()}


if __name__ == "__main__":
    repository = ModelRepository(models_dir="/mnt/models")

    # Load one model per dataset and register each with the repository.
    for dataset in ("iris", "wine"):
        candidate = SKLearnModel(dataset)
        candidate.load()
        repository.update(candidate)

    # Serve every model currently held by the repository.
    ModelServer().start(list(repository.get_models().values()))

from kserve import ModelServer, ModelRepository, Model
from fastapi import FastAPI
# Create repository
repository = ModelRepository()
# Create FastAPI app
app = FastAPI()
# Custom endpoint to load model
@app.post("/v1/models/{model_name}/load")
async def load_model_endpoint(model_name: str, model_path: str):
# Create and load model
model = MyModel(model_name)
model.model_path = model_path
model.load()
# Add to repository
repository.update(model)
return {"status": "loaded", "model": model_name}
# Custom endpoint to unload model
@app.delete("/v1/models/{model_name}")
async def unload_model_endpoint(model_name: str):
repository.unload(model_name)
return {"status": "unloaded", "model": model_name}
# Start server
if __name__ == "__main__":
ModelServer().start([])The ModelRepository integrates with KServe's Model Repository Extension protocol for dynamic model management via API.
# Internal class (not directly instantiated by users)
class ModelRepositoryExtension:
    """
    Model repository extension for v2 protocol.

    Provides API endpoints for dynamic model management.
    """
    def __init__(self, model_repository: ModelRepository): ...

    async def load(self, model_name: str) -> Dict:
        """
        Load a model.

        Args:
            model_name (str): Name of model to load

        Returns:
            dict: Load status
        """

    async def unload(self, model_name: str) -> Dict:
        """
        Unload a model.

        Args:
            model_name (str): Name of model to unload

        Returns:
            dict: Unload status
        """

    async def list(self) -> List[str]:
        """
        List all models.

        Returns:
            list: List of model names
        """

When ModelServer is started with a repository, these endpoints are available:
# Load a model
POST /v2/repository/models/{model_name}/load

# Unload a model
POST /v2/repository/models/{model_name}/unload

# List all models
GET /v2/repository/models

Example:
# Load model
curl -X POST http://localhost:8080/v2/repository/models/sklearn-iris/load

# Unload model
curl -X POST http://localhost:8080/v2/repository/models/sklearn-iris/unload

# List models
curl http://localhost:8080/v2/repository/models

from kserve import ModelRepository
import asyncio

from kserve.errors import ModelMissingError

repository = ModelRepository()

# Handle missing model
try:
    model = repository.get_model("non-existent")
except ModelMissingError as e:
    print(f"Model not found: {e}")

# Check before getting. is_model_ready() is a coroutine and must be driven
# with asyncio.run() (or awaited) — the bare coroutine object is always
# truthy, so the original check could never take the else branch.
if asyncio.run(repository.is_model_ready("my-model")):
    model = repository.get_model("my-model")
else:
    print("Model not ready")

from kserve import Model, ModelServer, ModelRepository
import joblib
import os
class SKLearnModel(Model):
def __init__(self, name: str, model_dir: str):
super().__init__(name)
self.model_dir = model_dir
self.model = None
def load(self):
model_path = os.path.join(self.model_dir, "model.pkl")
self.model = joblib.load(model_path)
self.ready = True
print(f"Model {self.name} loaded from {model_path}")
def predict(self, payload, headers=None):
instances = payload["instances"]
predictions = self.model.predict(instances)
return {"predictions": predictions.tolist()}
if __name__ == "__main__":
# Create repository
repository = ModelRepository(models_dir="/mnt/models")
# Discover and load models from directory
models_dir = "/mnt/models"
model_names = ["iris", "wine", "digits"]
for model_name in model_names:
model_dir = os.path.join(models_dir, model_name)
if os.path.exists(model_dir):
# Create and load model
model = SKLearnModel(model_name, model_dir)
try:
model.load()
repository.update(model)
print(f"Added {model_name} to repository")
except Exception as e:
print(f"Failed to load {model_name}: {e}")
# Check readiness
print(f"\nLoaded {len(repository.get_models())} models")
for name, model in repository.get_models().items():
print(f"- {name}: ready={repository.is_model_ready(name)}")
# Start server with all models
print("\nStarting model server...")
ModelServer().start(list(repository.get_models().values()))Load models only when needed:
class LazyModel(Model):
def __init__(self, name: str, model_path: str):
super().__init__(name)
self.model_path = model_path
self.model = None
self.ready = False
def load(self):
if not self.model:
self.model = load_model(self.model_path)
self.ready = True
# Add to repository without loading
repository.update(LazyModel("model-1", "/path/to/model"))
# Model loads on first predictionUnload models to free memory:
# Monitor memory usage
import psutil

def manage_models(repository: ModelRepository, max_memory_percent: float = 80.0):
    """Unload one model when system memory use exceeds max_memory_percent.

    Args:
        repository (ModelRepository): Repository whose models may be unloaded.
        max_memory_percent (float): Memory-usage threshold (percent) that
            triggers an unload.
    """
    memory = psutil.virtual_memory()
    if memory.percent > max_memory_percent:
        # Evict the alphabetically-first model name. NOTE(review): despite
        # the variable name, min() over names is NOT least-recently-used —
        # real LRU would require tracking access times per model.
        models = repository.get_models()
        if models:
            lru_model = min(models.keys())
            repository.unload(lru_model)
            print(f"Unloaded {lru_model} to free memory")

Validate models before adding to repository:
def validate_and_add_model(repository: ModelRepository, model: Model):
try:
# Validate model can load
model.load()
# Validate model can predict
test_input = {"instances": [[1, 2, 3, 4]]}
result = model.predict(test_input)
# Add to repository if valid
repository.update(model)
print(f"Model {model.name} validated and added")
except Exception as e:
print(f"Model validation failed: {e}")Use locks for thread-safe operations:
import threading


class ThreadSafeRepository:
    """Wrapper that serializes repository access behind a single lock."""

    def __init__(self, repository: ModelRepository):
        self.repository = repository
        self.lock = threading.Lock()

    def get_model(self, name: str) -> Model:
        # Hold the lock for the duration of the lookup.
        with self.lock:
            return self.repository.get_model(name)

    def update(self, model: Model, name: str = None):
        # Hold the lock while mutating the underlying repository.
        with self.lock:
            self.repository.update(model, name)