or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/kserve@0.16.x

docs

index.md
tile.json

tessl/pypi-kserve

tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.

docs/reference/model-repository.md

Model Repository

Dynamically manage models at runtime with support for loading, unloading, and querying model readiness.

Capabilities

ModelRepository Class

The ModelRepository manages a collection of models and provides dynamic loading/unloading capabilities.

class ModelRepository:
    """
    Model repository for dynamic model management.

    Args:
        models_dir (str): Directory containing model artifacts (default: "/mnt/models")
    """
    def __init__(self, models_dir: str = "/mnt/models"): ...

    # NOTE(review): load() and load_model() overlap; per the docstrings below,
    # load_model() raises ModelMissingError on a missing path while load()
    # returns False — confirm the intended distinction against the SDK source.
    def load(self, name: str) -> bool:
        """
        Load a specific model by name.

        Args:
            name (str): Model name to load

        Returns:
            bool: True if model loaded successfully, False otherwise
        """

    def load_model(self, name: str) -> bool:
        """
        Load a model from the models directory.

        Args:
            name (str): Model name to load

        Returns:
            bool: True if model loaded successfully, False otherwise

        Raises:
            ModelMissingError: If model path does not exist
        """

    def set_models_dir(self, models_dir: str) -> None:
        """
        Set the models directory path.

        Args:
            models_dir (str): Path to models directory
        """

    def load_models(self) -> None:
        """
        Load all models from the models directory.
        Scans models_dir and loads each model found.
        """

    def get_model(self, name: str) -> Model:
        """
        Get a model by name.

        Args:
            name (str): Model name

        Returns:
            Model: Model instance

        Raises:
            ModelMissingError: If model is not found
        """

    def get_models(self) -> Dict[str, Model]:
        """
        Get all loaded models.

        Returns:
            dict: Dictionary mapping model names to Model instances
        """

    def update(self, model: Model, name: str = None) -> None:
        """
        Add or update a model in the repository.

        Args:
            model (Model): Model instance to add
            name (str, optional): Model name (default: uses model.name)
        """

    def unload(self, name: str) -> None:
        """
        Unload a model from the repository.

        Args:
            name (str): Name of model to unload

        Raises:
            ModelMissingError: If model is not found
        """

    async def is_model_ready(self, name: str) -> bool:
        """
        Check if a model is ready (async).

        Note: this is a coroutine — it must be awaited, or driven with
        asyncio.run() from synchronous code. Calling it without awaiting
        returns a coroutine object, which is always truthy.

        Args:
            name (str): Model name

        Returns:
            bool: True if model is ready, False otherwise
        """

Usage Examples

Basic Repository Usage

import asyncio

from kserve import ModelRepository, Model

class MyModel(Model):
    """Minimal custom model: loads an artifact and serves predictions."""

    def load(self):
        # Load model artifacts (load_model() is a placeholder for your loader)
        self.model = load_model()
        self.ready = True

    def predict(self, payload, headers=None):
        return {"predictions": self.model.predict(payload["instances"])}

# Create repository
repository = ModelRepository(models_dir="/mnt/models")

# Create and add model
model = MyModel("my-model")
model.load()
repository.update(model)

# Get model
retrieved_model = repository.get_model("my-model")

# is_model_ready() is a coroutine: it must be awaited (or run with
# asyncio.run() from sync code). An un-awaited coroutine is always truthy,
# so the bare `if repository.is_model_ready(...)` form always passed.
if asyncio.run(repository.is_model_ready("my-model")):
    print("Model is ready")

Loading Models from Directory

from kserve import ModelRepository

# Repository rooted at the standard KServe mount point
repository = ModelRepository(models_dir="/mnt/models")

# Scan the directory tree and load every model found:
# /mnt/models/
#   ├── model-a/
#   ├── model-b/
#   └── model-c/
repository.load_models()

# Report each model together with its readiness flag
for name, model in repository.get_models().items():
    print(f"Loaded model: {name}, ready: {model.ready}")

Dynamic Model Loading

from kserve import ModelRepository, Model

# Repository with the default models directory
repository = ModelRepository()

class DynamicModel(Model):
    """Model whose artifact path is supplied at construction time."""

    def __init__(self, name: str, model_path: str):
        super().__init__(name)
        self.model_path = model_path

    def load(self):
        # joblib is imported lazily so the class can be defined without it
        import joblib
        self.model = joblib.load(self.model_path)
        self.ready = True

    def predict(self, payload, headers=None):
        # Serve predictions for the "instances" batch in the request payload
        return {"predictions": self.model.predict(payload["instances"]).tolist()}

# Build, load, and register a new model at runtime
new_model = DynamicModel("sklearn-v2", "/models/sklearn/v2/model.pkl")
new_model.load()
repository.update(new_model)

print(f"Model added: {new_model.name}")

Model Versioning

from kserve import ModelRepository, Model

repository = ModelRepository()

# Register two versions of the same logical model side by side,
# distinguished by a version suffix in the model name.
model_v1 = MyModel("fraud-detector-v1")
model_v2 = MyModel("fraud-detector-v2")
for versioned in (model_v1, model_v2):
    versioned.load()
    repository.update(versioned)

# Retrieve a specific version by its full name
v1 = repository.get_model("fraud-detector-v1")
v2 = repository.get_model("fraud-detector-v2")

Unloading Models

from kserve import ModelRepository
# ModelMissingError must be imported to be caught below; the original
# snippet referenced it without importing it, raising NameError instead.
from kserve.errors import ModelMissingError

repository = ModelRepository()

# Add models (model_a / model_b are Model instances created elsewhere)
repository.update(model_a)
repository.update(model_b)

# Unload model to free resources
repository.unload("model-a")

# get_model() raises ModelMissingError once the model is gone
try:
    model = repository.get_model("model-a")
except ModelMissingError:
    print("Model has been unloaded")

Model Updates

from kserve import ModelRepository, Model

repository = ModelRepository()

# Register the initial model
classifier = MyModel("classifier")
classifier.load()
repository.update(classifier)

# After retraining, load a replacement and overwrite the entry in place:
# update() with an existing name swaps the model without a restart.
retrained = MyModel("classifier")
retrained.load()
repository.update(retrained, name="classifier")

print("Model updated")

Checking Model Readiness

import asyncio
import time

from kserve import ModelRepository

repository = ModelRepository()
repository.update(my_model)

def _model_ready(name: str) -> bool:
    """Drive the async is_model_ready() check from synchronous code."""
    return asyncio.run(repository.is_model_ready(name))

# Poll until the model reports ready or the timeout expires.
# Note: is_model_ready() is a coroutine; without asyncio.run() the
# original `while not repository.is_model_ready(...)` never looped,
# because a coroutine object is always truthy.
timeout = 30
elapsed = 0
while not _model_ready("my-model") and elapsed < timeout:
    time.sleep(1)
    elapsed += 1

if _model_ready("my-model"):
    print("Model is ready")
else:
    print("Model failed to become ready")

Listing All Models

from kserve import ModelRepository

repository = ModelRepository()

# Register several models in one pass
for registered in (model1, model2, model3):
    repository.update(registered)

# Snapshot of everything currently held by the repository
all_models = repository.get_models()

print(f"Total models: {len(all_models)}")
for name, model in all_models.items():
    print(f"- {name}: ready={model.ready}")

Integration with ModelServer

Using Repository with ModelServer

from kserve import ModelServer, ModelRepository, Model

class SKLearnModel(Model):
    """Scikit-learn model loaded from the shared /mnt/models mount."""

    def load(self):
        import joblib
        # Artifact path is derived from the model's own name
        self.model = joblib.load(f"/mnt/models/{self.name}/model.pkl")
        self.ready = True

    def predict(self, payload, headers=None):
        predictions = self.model.predict(payload["instances"])
        return {"predictions": predictions.tolist()}

if __name__ == "__main__":
    repository = ModelRepository(models_dir="/mnt/models")

    # Load each model and register it with the repository
    for model_name in ("iris", "wine"):
        loaded = SKLearnModel(model_name)
        loaded.load()
        repository.update(loaded)

    # Serve every model currently held by the repository
    ModelServer().start(list(repository.get_models().values()))

Dynamic Model Loading at Runtime

from kserve import ModelServer, ModelRepository, Model
from fastapi import FastAPI

# Shared repository backing the custom management endpoints below
repository = ModelRepository()

app = FastAPI()

@app.post("/v1/models/{model_name}/load")
async def load_model_endpoint(model_name: str, model_path: str):
    """Instantiate, load, and register a model on demand."""
    model = MyModel(model_name)
    model.model_path = model_path
    model.load()

    repository.update(model)
    return {"status": "loaded", "model": model_name}

@app.delete("/v1/models/{model_name}")
async def unload_model_endpoint(model_name: str):
    """Drop a model from the repository, freeing its resources."""
    repository.unload(model_name)
    return {"status": "unloaded", "model": model_name}

if __name__ == "__main__":
    # Server starts empty; models arrive via the endpoints above
    ModelServer().start([])

Model Repository Extension

The ModelRepository integrates with KServe's Model Repository Extension protocol for dynamic model management via API.

Model Repository Extension API

# Internal class (not directly instantiated by users)
class ModelRepositoryExtension:
    """
    Model repository extension for v2 protocol.
    Provides API endpoints for dynamic model management.
    (Backs the /v2/repository/models load/unload/list endpoints
    described below — presumably wired up by ModelServer; confirm.)
    """
    def __init__(self, model_repository: ModelRepository): ...

    async def load(self, model_name: str) -> Dict:
        """
        Load a model.

        Args:
            model_name (str): Name of model to load

        Returns:
            dict: Load status
        """

    async def unload(self, model_name: str) -> Dict:
        """
        Unload a model.

        Args:
            model_name (str): Name of model to unload

        Returns:
            dict: Unload status
        """

    async def list(self) -> List[str]:
        """
        List all models.

        Returns:
            list: List of model names
        """

Using Model Repository Extension API

When ModelServer is started with a repository, these endpoints are available:

# Load a model
POST /v2/repository/models/{model_name}/load

# Unload a model
POST /v2/repository/models/{model_name}/unload

# List all models
GET /v2/repository/models

Example:

# Load model
curl -X POST http://localhost:8080/v2/repository/models/sklearn-iris/load

# Unload model
curl -X POST http://localhost:8080/v2/repository/models/sklearn-iris/unload

# List models
curl http://localhost:8080/v2/repository/models

Error Handling

import asyncio

from kserve import ModelRepository
from kserve.errors import ModelMissingError

repository = ModelRepository()

# Handle missing model
try:
    model = repository.get_model("non-existent")
except ModelMissingError as e:
    print(f"Model not found: {e}")

# Check readiness before getting. is_model_ready() is a coroutine, so it
# must be driven with asyncio.run() from sync code; the bare call in the
# original returned a (truthy) coroutine object, not a bool.
if asyncio.run(repository.is_model_ready("my-model")):
    model = repository.get_model("my-model")
else:
    print("Model not ready")

Complete Example

import asyncio
import os

import joblib

from kserve import Model, ModelServer, ModelRepository

class SKLearnModel(Model):
    """Scikit-learn model whose artifact lives under a per-model directory."""

    def __init__(self, name: str, model_dir: str):
        super().__init__(name)
        self.model_dir = model_dir
        self.model = None

    def load(self):
        model_path = os.path.join(self.model_dir, "model.pkl")
        self.model = joblib.load(model_path)
        self.ready = True
        print(f"Model {self.name} loaded from {model_path}")

    def predict(self, payload, headers=None):
        instances = payload["instances"]
        predictions = self.model.predict(instances)
        return {"predictions": predictions.tolist()}

if __name__ == "__main__":
    # Create repository
    repository = ModelRepository(models_dir="/mnt/models")

    # Discover and load models from directory
    models_dir = "/mnt/models"
    model_names = ["iris", "wine", "digits"]

    for model_name in model_names:
        model_dir = os.path.join(models_dir, model_name)
        if os.path.exists(model_dir):
            # Create and load model; a failure for one model does not
            # prevent the remaining models from being registered.
            model = SKLearnModel(model_name, model_dir)
            try:
                model.load()
                repository.update(model)
                print(f"Added {model_name} to repository")
            except Exception as e:
                print(f"Failed to load {model_name}: {e}")

    # is_model_ready() is a coroutine: run it to get the actual bool.
    # The original printed the coroutine object itself, e.g.
    # "<coroutine object ...>" instead of True/False.
    print(f"\nLoaded {len(repository.get_models())} models")
    for name, model in repository.get_models().items():
        print(f"- {name}: ready={asyncio.run(repository.is_model_ready(name))}")

    # Start server with all models
    print("\nStarting model server...")
    ModelServer().start(list(repository.get_models().values()))

Best Practices

1. Lazy Loading

Load models only when needed:

class LazyModel(Model):
    """Model that defers reading its artifact until load() is first called."""

    def __init__(self, name: str, model_path: str):
        super().__init__(name)
        self.model_path = model_path
        self.model = None
        self.ready = False

    def load(self):
        # Idempotent: the artifact is only read the first time through
        if not self.model:
            self.model = load_model(self.model_path)
            self.ready = True

# Register without loading; nothing is read until load() first runs
repository.update(LazyModel("model-1", "/path/to/model"))

2. Resource Management

Unload models to free memory:

# Monitor memory usage
import psutil

def manage_models(repository: ModelRepository, max_memory_percent: float = 80.0):
    """
    Evict one model when system memory pressure exceeds a threshold.

    Args:
        repository (ModelRepository): Repository to evict from.
        max_memory_percent (float): Memory usage percentage above which
            a model is unloaded (default: 80.0).
    """
    memory = psutil.virtual_memory()
    if memory.percent > max_memory_percent:
        models = repository.get_models()
        if models:
            # Evict the oldest-registered model (dicts preserve insertion
            # order). The original min(models.keys()) picked the
            # alphabetically-first name while claiming LRU; true LRU
            # would require tracking access times per model.
            evicted = next(iter(models))
            repository.unload(evicted)
            print(f"Unloaded {evicted} to free memory")

3. Model Validation

Validate models before adding to repository:

def validate_and_add_model(repository: ModelRepository, model: Model):
    """
    Smoke-test a model before registering it in the repository.

    The model must load and answer one sample prediction; only then is it
    added. Any failure is reported to stdout rather than raised.
    """
    try:
        model.load()  # must load without raising

        # ...and must serve a prediction for a representative input
        test_input = {"instances": [[1, 2, 3, 4]]}
        result = model.predict(test_input)

        # Both checks passed: safe to register
        repository.update(model)
        print(f"Model {model.name} validated and added")
    except Exception as e:
        print(f"Model validation failed: {e}")

4. Concurrent Access

Use locks for thread-safe operations:

import threading

class ThreadSafeRepository:
    """Wrapper that serializes all repository access behind one lock."""

    def __init__(self, repository: ModelRepository):
        self.repository = repository
        self.lock = threading.Lock()

    def get_model(self, name: str) -> Model:
        """Thread-safe lookup, delegating to the wrapped repository."""
        with self.lock:
            return self.repository.get_model(name)

    def update(self, model: Model, name: str = None):
        """Thread-safe add/update, delegating to the wrapped repository."""
        with self.lock:
            self.repository.update(model, name)