Describes pkg:pypi/kserve@0.16.x

tessl/pypi-kserve

tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.

docs/reference/model-server.md

Model Server

Start and manage a FastAPI-based model server with support for multiple models, workers, HTTP/gRPC protocols, and comprehensive configuration options.

Capabilities

ModelServer Class

The ModelServer orchestrates the inference server lifecycle, managing HTTP and gRPC servers, worker processes, and model registration.

class ModelServer:
    """
    Model server orchestrator for KServe inference servers.
    Manages FastAPI HTTP server and optionally gRPC server.

    Args:
        http_port (int): HTTP server port (default: 8080)
        grpc_port (int): gRPC server port (default: 8081)
        workers (int): Number of worker processes (default: 1)
        max_threads (int): Maximum threads per worker (default: None)
        max_asyncio_workers (int): Maximum asyncio workers (default: None)
        registered_models (ModelRepository, optional): Model repository instance
        enable_grpc (bool): Enable gRPC server (default: True)
        enable_docs_url (bool): Enable FastAPI docs at /docs (default: False)
        enable_latency_logging (bool): Enable latency logging (default: True)
        access_log_format (str, optional): Access log format string
        grace_period (int): Graceful shutdown period in seconds (default: 30)
        predictor_config (PredictorConfig, optional): Predictor configuration
    """
    def __init__(
        self,
        http_port: int = 8080,
        grpc_port: int = 8081,
        workers: int = 1,
        max_threads: int = None,
        max_asyncio_workers: int = None,
        registered_models: ModelRepository = None,
        enable_grpc: bool = True,
        enable_docs_url: bool = False,
        enable_latency_logging: bool = True,
        access_log_format: str = None,
        grace_period: int = 30,
        predictor_config = None
    ): ...

    def start(self, models: Union[List[Model], Dict[str, Model]]) -> None:
        """
        Start the model server with given models.

        Args:
            models (list or dict): Model instances to serve.
                Can be a list of Model objects or a dict mapping names to models.
        """

    def create_application(self) -> FastAPI:
        """
        Create and configure the FastAPI application.

        Returns:
            FastAPI: Configured FastAPI application instance
        """

    def register_model(self, model: Model) -> None:
        """
        Register a model with the server.

        Args:
            model (Model): Model instance to register
        """

    def register_exception_handler(
        self,
        exc_class: type,
        handler: Callable
    ) -> None:
        """
        Register a custom exception handler.

        Args:
            exc_class (type): Exception class to handle
            handler (Callable): Handler function for the exception
        """

    def default_exception_handler(
        self,
        request,
        exc: Exception
    ) -> JSONResponse:
        """
        Default exception handler for unhandled exceptions.

        Args:
            request: FastAPI request object
            exc (Exception): Raised exception

        Returns:
            JSONResponse: Error response with status 500
        """

    def setup_event_loop(self) -> None:
        """Setup asyncio event loop with thread pool executor."""

    def register_signal_handler(self) -> None:
        """Register signal handlers for graceful shutdown."""

    def stop(self) -> None:
        """
        Stop the model server gracefully.
        Completes in-flight requests and shuts down workers.
        """

Starting a Model Server

Basic Usage

from kserve import Model, ModelServer

class MyModel(Model):
    def load(self):
        self.model = load_my_model()
        self.ready = True

    def predict(self, payload, headers=None):
        return {"predictions": self.model.predict(payload["instances"])}

if __name__ == "__main__":
    model = MyModel("my-model")
    model.load()
    ModelServer().start([model])

Multiple Models

Serve multiple models from a single server:

if __name__ == "__main__":
    model1 = MyModel("model-a")
    model1.load()

    model2 = MyModel("model-b")
    model2.load()

    # Pass as list
    ModelServer().start([model1, model2])

    # Or pass as dictionary
    ModelServer().start({
        "model-a": model1,
        "model-b": model2
    })

Command-Line Arguments

The ModelServer accepts command-line arguments for configuration:

# HTTP Configuration
--http_port: int = 8080           # HTTP server port
--workers: int = 1                # Number of worker processes
--max_threads: int = 4            # Max threads per worker
--max_asyncio_workers: int = None # Max asyncio workers

# gRPC Configuration
--grpc_port: int = 8081           # gRPC server port
--enable_grpc: bool = True        # Enable gRPC server

# API Documentation
--enable_docs_url: bool = False   # Enable FastAPI docs at /docs

# Logging Configuration
--configure_logging: bool = True  # Configure KServe logging
--log_config_file: str = None     # Path to logging config file
--access_log_format: str = None   # Access log format string
--enable_latency_logging: bool = True  # Log inference latency

Usage Examples

# Start with custom HTTP port
python model.py --http_port 9000

# Start with multiple workers
python model.py --workers 4

# Disable gRPC server
python model.py --enable_grpc false

# Enable API documentation
python model.py --enable_docs_url true

# Custom log config
python model.py --log_config_file /path/to/log_config.yaml

Protocol Endpoints

REST v2 Endpoints (default)

  • GET /v2/health/live - Server liveness
  • GET /v2/health/ready - Server readiness
  • GET /v2/models - List all models
  • GET /v2/models/{model_name} - Model metadata
  • GET /v2/models/{model_name}/ready - Model readiness
  • POST /v2/models/{model_name}/infer - Inference request
  • POST /v2/models/{model_name}/generate - Generation request (LLM)

REST v1 Endpoints (legacy)

  • POST /v1/models/{model_name}:predict - Prediction
  • POST /v1/models/{model_name}:explain - Explanation
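
A minimal Python client sketch for these endpoints, assuming the Basic Usage server above is running on localhost:8080 with a model named my-model:

import requests

# v1 prediction request, matching the payload["instances"] handling in the model examples
payload = {"instances": [[1, 2, 3]]}
response = requests.post(
    "http://localhost:8080/v1/models/my-model:predict",
    json=payload,
)
response.raise_for_status()
print(response.json())  # e.g. {"predictions": [...]}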

OpenAPI Documentation

When the server is started with --enable_docs_url true, the following endpoints are available:

  • GET /docs - Swagger UI
  • GET /redoc - ReDoc UI
  • GET /openapi.json - OpenAPI specification

Metrics

  • GET /metrics - Prometheus metrics

Prometheus histograms:

  • request_preprocess_seconds - Preprocessing latency
  • request_predict_seconds - Prediction latency
  • request_postprocess_seconds - Postprocessing latency
  • request_explain_seconds - Explanation latency
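
To inspect these histograms locally, scrape the metrics endpoint and filter by metric name; a short sketch assuming the server runs on localhost:8080:

import requests

metrics_text = requests.get("http://localhost:8080/metrics").text

# Print only the prediction-latency histogram samples
for line in metrics_text.splitlines():
    if line.startswith("request_predict_seconds"):
        print(line)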

Server Configuration

Multi-Worker Mode

Run multiple worker processes for increased throughput:

# Start with 4 workers
python model.py --workers 4

Workers are managed using Uvicorn's multiprocess mode. Each worker is a separate process with its own model instance.

Thread Configuration

Configure threading for CPU-bound operations:

# Set max threads per worker
python model.py --max_threads 8

Asyncio Workers

Configure asyncio task workers:

# Set max asyncio workers
python model.py --max_asyncio_workers 100
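
Worker, thread, and asyncio settings can also be passed to the ModelServer constructor instead of the command line; a minimal sketch reusing the MyModel class from the Basic Usage example:

from kserve import ModelServer

if __name__ == "__main__":
    model = MyModel("my-model")
    model.load()

    # Equivalent to --workers 4 --max_threads 8 --max_asyncio_workers 100
    ModelServer(
        workers=4,
        max_threads=8,
        max_asyncio_workers=100,
    ).start([model])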

gRPC Server

KServe supports gRPC v2 protocol for high-performance inference:

# Enable/disable gRPC
python model.py --enable_grpc true

# Set gRPC port
python model.py --grpc_port 9000

gRPC endpoints:

  • ServerLive - Server liveness
  • ServerReady - Server readiness
  • ModelReady - Model readiness
  • ServerMetadata - Server metadata
  • ModelMetadata - Model metadata
  • ModelInfer - Inference request
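
The gRPC settings can likewise be set through the constructor arguments documented above; a short sketch:

from kserve import ModelServer

if __name__ == "__main__":
    model = MyModel("my-model")
    model.load()

    # Serve gRPC on a custom port; pass enable_grpc=False to disable it entirely
    ModelServer(grpc_port=9000, enable_grpc=True).start([model])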

Logging Configuration

Default Logging

KServe configures structured logging by default:

from kserve import logger

logger.info("Model loaded successfully")
logger.error("Prediction failed", exc_info=True)

Custom Log Configuration

Provide a custom logging configuration:

# YAML config file
python model.py --log_config_file /path/to/logging.yaml

# JSON config
python model.py --log_config_file /path/to/logging.json

Example YAML config:

version: 1
formatters:
  default:
    format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
handlers:
  console:
    class: logging.StreamHandler
    formatter: default
    level: INFO
loggers:
  kserve:
    level: INFO
    handlers: [console]
  kserve.trace:
    level: DEBUG
    handlers: [console]

Disable Logging Configuration

python model.py --configure_logging false

Latency Logging

KServe automatically logs inference latency:

# Enable/disable latency logging
python model.py --enable_latency_logging true

Logs include:

  • Preprocessing time
  • Prediction time
  • Postprocessing time
  • Total inference time

Access Log Format

Customize HTTP access log format:

python model.py --access_log_format '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'

Health Checks

Liveness Probe

Check if server process is alive:

curl http://localhost:8080/v2/health/live

Response:

{"live": true}

Readiness Probe

Check if server is ready to accept requests:

curl http://localhost:8080/v2/health/ready

Response:

{"ready": true}

Returns ready when at least one model is loaded and ready.

Model Readiness

Check if a specific model is ready:

curl http://localhost:8080/v2/models/my-model/ready

Response:

{"ready": true}

Creating Custom Applications

For advanced use cases, create a custom FastAPI application:

from kserve import ModelServer, Model

class MyModel(Model):
    def load(self):
        self.model = load_my_model()
        self.ready = True

    def predict(self, payload, headers=None):
        return {"predictions": self.model.predict(payload["instances"])}

if __name__ == "__main__":
    # Create model
    model = MyModel("my-model")
    model.load()

    # Create server and get FastAPI app
    server = ModelServer()
    app = server.create_application()

    # Register the model and start the server
    server.start([model])

Add custom routes:

app = server.create_application()

@app.get("/custom")
def custom_endpoint():
    return {"message": "Custom endpoint"}

server.start([model])

Server Metadata

Get server metadata:

curl http://localhost:8080/v2

Response:

{
  "name": "kserve",
  "version": "0.16.0",
  "extensions": []
}

Model Metadata

Get model metadata:

curl http://localhost:8080/v2/models/my-model

Response:

{
  "name": "my-model",
  "versions": ["1"],
  "platform": "kserve",
  "inputs": [],
  "outputs": []
}

List Models

List all loaded models:

curl http://localhost:8080/v2/models

Response:

{
  "models": ["my-model", "another-model"]
}

Error Responses

KServe returns structured error responses:

{
  "error": "Model my-model is not ready",
  "code": 503
}

Common HTTP status codes:

  • 400 - Invalid input
  • 404 - Model not found
  • 500 - Inference error
  • 503 - Model not ready / Server not ready
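
A client-side sketch for branching on these status codes, reusing the v1 predict call from the client example earlier:

import requests

response = requests.post(
    "http://localhost:8080/v1/models/my-model:predict",
    json={"instances": [[1, 2, 3]]},
)

if response.status_code == 200:
    print(response.json()["predictions"])
elif response.status_code == 503:
    # Model or server not ready yet; retry later
    print("Not ready:", response.json().get("error"))
elif response.status_code == 404:
    print("Model not found:", response.json().get("error"))
else:
    # 400 invalid input or 500 inference error
    print("Request failed:", response.status_code, response.text)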

Shutdown Handling

KServe handles graceful shutdown:

  • SIGTERM signal triggers shutdown
  • In-flight requests are completed
  • Models are unloaded
  • Server exits cleanly
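
The shutdown grace period is controlled by the grace_period constructor argument documented above; a short sketch:

from kserve import ModelServer

if __name__ == "__main__":
    model = MyModel("my-model")
    model.load()

    # Allow up to 60 seconds for in-flight requests to complete on SIGTERM
    ModelServer(grace_period=60).start([model])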

SSL/TLS Configuration

Configure SSL for HTTPS:

# Run the FastAPI app through Uvicorn with SSL enabled
import uvicorn

if __name__ == "__main__":
    model = MyModel("my-model")
    model.load()

    # Register the model, then serve the app manually over HTTPS.
    # (Calling server.start() here would run the plain HTTP server
    # and never reach the uvicorn.run call below.)
    server = ModelServer()
    server.register_model(model)

    uvicorn.run(
        server.create_application(),
        host="0.0.0.0",
        port=8443,
        ssl_keyfile="/path/to/key.pem",
        ssl_certfile="/path/to/cert.pem"
    )

Environment Variables

KServe respects environment variables:

  • KSERVE_LOGLEVEL - Log level (default: INFO)
  • MODELS_DIR - Models directory (default: /mnt/models)

CloudEvents Support

KServe supports CloudEvents for request/response:

# Structured CloudEvents
Content-Type: application/cloudevents+json

{
  "specversion": "1.0",
  "type": "org.kserve.inference.request",
  "source": "client",
  "id": "12345",
  "datacontenttype": "application/json",
  "data": {
    "instances": [[1, 2, 3]]
  }
}
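
A client sketch sending the structured CloudEvent above with Python, assuming it is accepted on the v1 predict endpoint shown earlier:

import requests

event = {
    "specversion": "1.0",
    "type": "org.kserve.inference.request",
    "source": "client",
    "id": "12345",
    "datacontenttype": "application/json",
    "data": {"instances": [[1, 2, 3]]},
}

response = requests.post(
    "http://localhost:8080/v1/models/my-model:predict",
    json=event,
    headers={"Content-Type": "application/cloudevents+json"},
)
print(response.json())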

Complete Server Example

from kserve import Model, ModelServer
import argparse

class SKLearnModel(Model):
    def load(self):
        import joblib
        self.model = joblib.load("/mnt/models/model.pkl")
        self.ready = True

    def predict(self, payload, headers=None):
        instances = payload["instances"]
        predictions = self.model.predict(instances)
        return {"predictions": predictions.tolist()}

if __name__ == "__main__":
    # Parse arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_name", default="sklearn-model")
    args, _ = parser.parse_known_args()

    # Create and load model
    model = SKLearnModel(args.model_name)
    model.load()

    # Start server
    # Additional args are parsed by ModelServer
    ModelServer().start([model])

Run with:

python model.py --model_name my-model --http_port 9000 --workers 2 --enable_grpc false