tessl/pypi-kserve

tessl install tessl/pypi-kserve@0.16.1

Describes pypipkg:pypi/kserve@0.16.x

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.

docs/reference/inference-clients.md

Inference Clients

Make inference requests to remote KServe model servers using REST or gRPC protocols with built-in retry logic and SSL support.

Capabilities

InferenceClientFactory

Factory for creating inference clients with automatic protocol detection and configuration.

class InferenceClientFactory:
    """
    Factory for creating REST or gRPC inference clients.

    Static methods for client creation with automatic resource management.
    """
    @staticmethod
    def get_grpc_client(
        url: str,
        use_ssl: bool = False,
        root_certificates: str = None,
        private_key: str = None,
        certificate_chain: str = None,
        timeout: float = 60,
        verbose: bool = False
    ) -> InferenceGRPCClient:
        """
        Create a gRPC inference client.

        Args:
            url (str): Server URL (e.g., "localhost:8081")
            use_ssl (bool): Use SSL/TLS (default: False)
            root_certificates (str): Path to PEM-encoded root certificates file
            private_key (str): Path to PEM-encoded private key file
            certificate_chain (str): Path to PEM-encoded certificate chain file
            timeout (float): Request timeout in seconds (default: 60)
            verbose (bool): Enable verbose logging (default: False)

        Returns:
            InferenceGRPCClient: Configured gRPC client instance
        """

    @staticmethod
    def get_rest_client(
        url: str,
        protocol: str = "v2",
        use_ssl: bool = True,
        timeout: float = 60,
        verbose: bool = False,
        **kwargs
    ) -> InferenceRESTClient:
        """
        Create a REST inference client.

        Args:
            url (str): Server URL (e.g., "http://localhost:8080")
            protocol (str): Protocol version ("v1" or "v2", default: "v2")
            use_ssl (bool): Verify SSL certificates (default: True)
            timeout (float): Request timeout in seconds (default: 60)
            verbose (bool): Enable verbose logging (default: False)
            **kwargs: Additional configuration options for RESTConfig

        Returns:
            InferenceRESTClient: Configured REST client instance
        """

    @staticmethod
    def close():
        """Close all open clients and cleanup resources."""

InferenceGRPCClient

Asynchronous gRPC client for high-performance inference with v2 protocol support.

class InferenceGRPCClient:
    """
    gRPC client for KServe inference.

    Args:
        url (str): Server URL (e.g., "localhost:8081")
        verbose (bool): Enable verbose logging (default: False)
        use_ssl (bool): Use SSL/TLS (default: False)
        root_certificates (str): Path to PEM-encoded root certificates file
        private_key (str): Path to PEM-encoded private key file
        certificate_chain (str): Path to PEM-encoded certificate chain file
        creds (grpc.ChannelCredentials): gRPC channel credentials
        channel_args (list): Additional channel arguments as (key, value) tuples
        timeout (float): Request timeout in seconds (default: 60)
        retries (int): Number of retry attempts (default: 3)
    """
    def __init__(
        self,
        url: str,
        verbose: bool = False,
        use_ssl: bool = False,
        root_certificates: str = None,
        private_key: str = None,
        certificate_chain: str = None,
        creds: grpc.ChannelCredentials = None,
        channel_args: List[Tuple[str, Any]] = None,
        timeout: Optional[float] = 60,
        retries: Optional[int] = 3
    ): ...

    async def infer(
        self,
        infer_request: InferRequest,
        timeout: Union[Optional[float], _UseClientDefault] = USE_CLIENT_DEFAULT,
        headers: Union[grpc.aio.Metadata, Sequence[Tuple[str, str]], None] = None
    ) -> InferResponse:
        """
        Make an inference request.

        Args:
            infer_request (InferRequest): Inference request object
            timeout (float, optional): Request timeout (overrides client default)
            headers (Metadata or list, optional): Request metadata headers as grpc.aio.Metadata
                or sequence of (key, value) tuples

        Returns:
            InferResponse: Inference response with outputs

        Raises:
            grpc.RpcError: For non-OK-status response
        """

    async def is_server_live(
        self,
        timeout: Union[Optional[float], _UseClientDefault] = USE_CLIENT_DEFAULT,
        headers: Union[grpc.aio.Metadata, Sequence[Tuple[str, str]], None] = None
    ) -> bool:
        """
        Check if server is live.

        Args:
            timeout (float, optional): Request timeout
            headers (Metadata or list, optional): Request metadata headers

        Returns:
            bool: True if server is live

        Raises:
            grpc.RpcError: For non-OK-status response
        """

    async def is_server_ready(
        self,
        timeout: Union[Optional[float], _UseClientDefault] = USE_CLIENT_DEFAULT,
        headers: Union[grpc.aio.Metadata, Sequence[Tuple[str, str]], None] = None
    ) -> bool:
        """
        Check if server is ready.

        Args:
            timeout (float, optional): Request timeout
            headers (Metadata or list, optional): Request metadata headers

        Returns:
            bool: True if server is ready

        Raises:
            grpc.RpcError: For non-OK-status response
        """

    async def is_model_ready(
        self,
        model_name: str,
        timeout: Union[Optional[float], _UseClientDefault] = USE_CLIENT_DEFAULT,
        headers: Union[grpc.aio.Metadata, Sequence[Tuple[str, str]], None] = None
    ) -> bool:
        """
        Check if model is ready.

        Args:
            model_name (str): Name of the model
            timeout (float, optional): Request timeout
            headers (Metadata or list, optional): Request metadata headers

        Returns:
            bool: True if model is ready

        Raises:
            grpc.RpcError: For non-OK-status response or if model not found
        """

    async def close(self) -> None:
        """Close the gRPC channel and cleanup resources"""

    async def __aenter__(self):
        """Async context manager entry"""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        await self.close()
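
Because the client implements the async context manager protocol, it can be used with async with so the channel is closed automatically; a minimal sketch (the server address, model name, and channel option are illustrative):

import asyncio
from kserve import InferenceGRPCClient, InferInput, InferRequest

async def main():
    # Channel is closed automatically on exit; the channel_args tuning is illustrative
    async with InferenceGRPCClient(
        url="localhost:8081",
        timeout=30,
        channel_args=[("grpc.max_receive_message_length", 16 * 1024 * 1024)]
    ) as client:
        input_data = InferInput(name="input-0", shape=[1, 4], datatype="FP32", data=[[5.1, 3.5, 1.4, 0.2]])
        request = InferRequest(model_name="sklearn-iris", infer_inputs=[input_data])
        response = await client.infer(request)
        print(response.outputs[0].data)

asyncio.run(main())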

InferenceRESTClient

Asynchronous REST client for KServe inference supporting both v1 and v2 protocols.

class InferenceRESTClient:
    """
    REST client for KServe inference.

    Args:
        config (RESTConfig, optional): Client configuration
    """
    def __init__(self, config: RESTConfig = None): ...

    async def infer(
        self,
        base_url: Union[httpx.URL, str],
        data: Union[InferRequest, dict],
        model_name: Optional[str] = None,
        headers: Optional[Mapping[str, str]] = None,
        response_headers: Dict[str, str] = None,
        is_graph_endpoint: bool = False,
        timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
    ) -> Union[InferResponse, Dict]:
        """
        Make an inference request.

        Args:
            base_url (str or httpx.URL): Base URL of inference server (e.g., "http://localhost:8080")
            data (InferRequest or dict): Request data
            model_name (str, optional): Model name (required unless is_graph_endpoint=True)
            headers (dict, optional): HTTP headers
            response_headers (dict, optional): Dict to populate with response headers
            is_graph_endpoint (bool): If True, use base_url as-is for inference graph (default: False)
            timeout (float or httpx.Timeout, optional): Request timeout

        Returns:
            InferResponse or dict: Response data (InferResponse for v2, dict for v1 or graph)

        Raises:
            HTTPStatusError: For non-2xx response codes
            UnsupportedProtocol: If protocol version not supported
        """

    async def explain(
        self,
        base_url: Union[httpx.URL, str],
        model_name: str,
        data: Dict,
        headers: Optional[Mapping[str, str]] = None,
        timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
    ) -> Dict:
        """
        Make an explanation request (v1 protocol only).

        Args:
            base_url (str or httpx.URL): Base URL of inference server
            model_name (str): Model name
            data (dict): Request data
            headers (dict, optional): HTTP headers
            timeout (float or httpx.Timeout, optional): Request timeout

        Returns:
            dict: Explanation response

        Raises:
            HTTPStatusError: For non-2xx response codes
            UnsupportedProtocol: If protocol is not v1
        """

    async def is_server_live(
        self,
        base_url: Union[str, httpx.URL],
        headers: Optional[Mapping[str, str]] = None,
        timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
    ) -> bool:
        """
        Check if server is live.

        Args:
            base_url (str or httpx.URL): Base URL of inference server
            headers (dict, optional): HTTP headers
            timeout (float or httpx.Timeout, optional): Request timeout

        Returns:
            bool: True if server is live

        Raises:
            HTTPStatusError: For non-2xx response codes
            UnsupportedProtocol: If protocol not supported
        """

    async def is_server_ready(
        self,
        base_url: Union[httpx.URL, str],
        headers: Optional[Mapping[str, str]] = None,
        timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
    ) -> bool:
        """
        Check if server is ready.

        Args:
            base_url (str or httpx.URL): Base URL of inference server
            headers (dict, optional): HTTP headers
            timeout (float or httpx.Timeout, optional): Request timeout

        Returns:
            bool: True if server is ready

        Raises:
            HTTPStatusError: For non-2xx response codes
            UnsupportedProtocol: If protocol not supported
        """

    async def is_model_ready(
        self,
        base_url: Union[httpx.URL, str],
        model_name: str,
        headers: Optional[Mapping[str, str]] = None,
        timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
    ) -> bool:
        """
        Check if model is ready.

        Args:
            base_url (str or httpx.URL): Base URL of inference server
            model_name (str): Model name
            headers (dict, optional): HTTP headers
            timeout (float or httpx.Timeout, optional): Request timeout

        Returns:
            bool: True if model is ready

        Raises:
            HTTPStatusError: For non-2xx response codes in v1 (v2 returns success status)
            UnsupportedProtocol: If protocol not supported
        """

    async def get_model_metadata(
        self,
        base_url: Union[httpx.URL, str],
        model_name: str,
        headers: Optional[Mapping[str, str]] = None,
        timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
    ) -> Dict:
        """
        Get model metadata.

        Args:
            base_url (str or httpx.URL): Base URL of inference server
            model_name (str): Model name
            headers (dict, optional): HTTP headers
            timeout (float or httpx.Timeout, optional): Request timeout

        Returns:
            dict: Model metadata including name, versions, platform, inputs, outputs

        Raises:
            HTTPStatusError: For non-2xx response codes
            UnsupportedProtocol: If protocol not supported
        """

    async def get_server_metadata(
        self,
        base_url: Union[httpx.URL, str],
        headers: Optional[Mapping[str, str]] = None,
        timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
    ) -> Dict:
        """
        Get server metadata.

        Args:
            base_url (str or httpx.URL): Base URL of inference server
            headers (dict, optional): HTTP headers
            timeout (float or httpx.Timeout, optional): Request timeout

        Returns:
            dict: Server metadata including name, version, extensions

        Raises:
            HTTPStatusError: For non-2xx response codes
            UnsupportedProtocol: If protocol not supported
        """

    async def list_models(
        self,
        base_url: Union[httpx.URL, str],
        headers: Optional[Mapping[str, str]] = None,
        timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
    ) -> List[str]:
        """
        List all models.

        Args:
            base_url (str or httpx.URL): Base URL of inference server
            headers (dict, optional): HTTP headers
            timeout (float or httpx.Timeout, optional): Request timeout

        Returns:
            list: List of model names

        Raises:
            HTTPStatusError: For non-2xx response codes
            UnsupportedProtocol: If protocol not supported
        """

    async def close(self) -> None:
        """Close the client, transport and proxies"""

    async def __aenter__(self):
        """Async context manager entry"""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        await self.close()
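
Like the gRPC client, the REST client can be used as an async context manager, which closes the underlying transport on exit; a minimal sketch (the server URL is illustrative):

import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    # Transport is closed automatically on exit
    async with InferenceRESTClient(config=RESTConfig(protocol="v2")) as client:
        live = await client.is_server_live("http://localhost:8080")
        print(f"Server live: {live}")

asyncio.run(main())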

RESTConfig

Configuration for REST client.

class RESTConfig:
    """
    Configuration for InferenceRESTClient.

    Args:
        transport (httpx.AsyncBaseTransport, optional): Asynchronous transport class
        protocol (str or PredictorProtocol): Protocol version ("v1" or "v2", default: "v1")
        retries (int): Number of retry attempts (default: 3)
        http2 (bool): Enable HTTP/2 support (default: False)
        timeout (float, None, tuple, or httpx.Timeout): Request timeout (default: 60 seconds)
        cert (str or tuple): SSL client certificate - path to file, (cert_file, key_file) tuple,
            or (cert_file, key_file, password) tuple
        verify (str, bool, or ssl.SSLContext): SSL verification - True (default CA bundle),
            path to CA bundle file, ssl.SSLContext, or False (disable verification)
        auth (httpx.Auth, optional): Authentication handler
        verbose (bool): Enable verbose logging (default: False)
    """
    def __init__(
        self,
        transport: httpx.AsyncBaseTransport = None,
        protocol: Union[str, PredictorProtocol] = "v1",
        retries: int = 3,
        http2: bool = False,
        timeout: Union[float, None, tuple, httpx.Timeout] = 60,
        cert = None,
        verify: Union[str, bool, ssl.SSLContext] = True,
        auth = None,
        verbose: bool = False
    ): ...

    # Properties
    @property
    def transport(self) -> Optional[httpx.AsyncBaseTransport]:
        """Asynchronous transport instance"""

    @property
    def http2(self) -> bool:
        """HTTP/2 support enabled"""

    @property
    def cert(self):
        """SSL client certificate configuration"""

    @property
    def verify(self) -> Union[str, bool, ssl.SSLContext]:
        """SSL verification configuration"""

    @property
    def auth(self):
        """Authentication handler"""

    @property
    def verbose(self) -> bool:
        """Verbose logging enabled"""

Usage Examples

gRPC Client Basic Usage

import asyncio
from kserve import InferenceGRPCClient, InferInput, InferRequest

async def main():
    # Create client
    client = InferenceGRPCClient(url="localhost:8081")

    # Prepare input data
    input_data = InferInput(
        name="input-0",
        shape=[1, 4],
        datatype="FP32",
        data=[[5.1, 3.5, 1.4, 0.2]]
    )

    # Build the inference request
    request = InferRequest(model_name="sklearn-iris", infer_inputs=[input_data])

    # Make inference request
    response = await client.infer(request)

    # Extract output
    output = response.outputs[0]
    predictions = output.data
    print(f"Predictions: {predictions}")

    await client.close()

asyncio.run(main())

gRPC Client with NumPy

import asyncio
import numpy as np
from kserve import InferenceGRPCClient, InferInput, InferRequest

async def main():
    client = InferenceGRPCClient(url="localhost:8081")

    # Create NumPy array
    data = np.array([[1, 2, 3, 4]], dtype=np.float32)

    # Create input and fill it from the NumPy array
    input_tensor = InferInput(name="input-0", shape=list(data.shape), datatype="FP32")
    input_tensor.set_data_from_numpy(data)

    # Infer
    request = InferRequest(model_name="my-model", infer_inputs=[input_tensor])
    response = await client.infer(request)

    # Convert output to NumPy
    output = response.outputs[0]
    result = output.as_numpy()
    print(f"Result shape: {result.shape}")
    print(f"Result: {result}")

    await client.close()

asyncio.run(main())

gRPC Client with SSL

import asyncio
from kserve import InferenceGRPCClient, InferInput, InferRequest

async def main():
    # Create client with SSL; certificate arguments are paths to PEM-encoded files
    client = InferenceGRPCClient(
        url="secure.example.com:8081",
        use_ssl=True,
        root_certificates="/path/to/ca.pem",
        private_key="/path/to/client-key.pem",
        certificate_chain="/path/to/client-cert.pem"
    )

    # Make request
    input_data = InferInput(name="input", shape=[1, 4], datatype="FP32", data=[[1, 2, 3, 4]])
    request = InferRequest(model_name="my-model", infer_inputs=[input_data])
    response = await client.infer(request)

    await client.close()

asyncio.run(main())

gRPC Client Health Checks

import asyncio
from kserve import InferenceGRPCClient

async def main():
    client = InferenceGRPCClient(url="localhost:8081")

    # Check server liveness
    is_live = await client.is_server_live()
    print(f"Server live: {is_live}")

    # Check server readiness
    is_ready = await client.is_server_ready()
    print(f"Server ready: {is_ready}")

    # Check model readiness
    model_ready = await client.is_model_ready("sklearn-iris")
    print(f"Model ready: {model_ready}")

    await client.close()

asyncio.run(main())

REST Client Basic Usage

import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    # Create client (v2 protocol)
    client = InferenceRESTClient(config=RESTConfig(protocol="v2"))

    # Make inference request (v2 protocol)
    response = await client.infer(
        "http://localhost:8080",
        model_name="sklearn-iris",
        data={
            "inputs": [
                {
                    "name": "input-0",
                    "shape": [1, 4],
                    "datatype": "FP32",
                    "data": [[5.1, 3.5, 1.4, 0.2]]
                }
            ]
        }
    )

    print(response)

    await client.close()

asyncio.run(main())

REST Client v1 Protocol

import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    # Create client with v1 protocol
    config = RESTConfig(protocol="v1")
    client = InferenceRESTClient(config=config)

    # Make prediction (v1 protocol)
    response = await client.infer(
        "http://localhost:8080",
        model_name="sklearn-iris",
        data={
            "instances": [
                [5.1, 3.5, 1.4, 0.2]
            ]
        }
    )

    print(response["predictions"])

    # Make explanation request (v1 protocol only)
    explanation = await client.explain(
        "http://localhost:8080",
        model_name="sklearn-iris",
        data={
            "instances": [
                [5.1, 3.5, 1.4, 0.2]
            ]
        }
    )

    print(explanation)

    await client.close()

asyncio.run(main())

REST Client with Timeout and Retries

import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    # Configure with timeout and retries
    config = RESTConfig(
        protocol="v2",
        timeout=30,
        retries=3
    )

    client = InferenceRESTClient(config=config)

    # Request with custom timeout
    response = await client.infer(
        "http://localhost:8080",
        model_name="slow-model",
        data={"inputs": [...]},
        timeout=60  # Override default
    )

    await client.close()

asyncio.run(main())

REST Client with SSL

import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    # Configure SSL verification and a client certificate
    config = RESTConfig(
        verify=True,
        cert=("/path/to/client-cert.pem", "/path/to/client-key.pem")
    )

    client = InferenceRESTClient(config=config)

    response = await client.infer(
        "https://secure.example.com:8080",
        model_name="my-model",
        data={"inputs": [...]}
    )

    await client.close()

asyncio.run(main())
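
REST Client with an Inference Graph

When the target is a KServe inference graph rather than a single model, pass is_graph_endpoint=True so the URL is used as-is and no model name is required; a sketch (the graph URL and payload are illustrative):

import asyncio
from kserve import InferenceRESTClient

async def main():
    client = InferenceRESTClient()

    # The graph URL is used as-is and the response is returned as a dict
    response = await client.infer(
        "http://my-graph.example.com",
        data={"instances": [[5.1, 3.5, 1.4, 0.2]]},
        is_graph_endpoint=True
    )
    print(response)

    await client.close()

asyncio.run(main())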

REST Client Health Checks

import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    client = InferenceRESTClient(config=RESTConfig(protocol="v2"))
    base_url = "http://localhost:8080"

    # Check server liveness
    if await client.is_server_live(base_url):
        print("Server is live")

    # Check server readiness
    if await client.is_server_ready(base_url):
        print("Server is ready")

    # Check model readiness
    if await client.is_model_ready(base_url, "sklearn-iris"):
        print("Model is ready")

    # List all models
    models = await client.list_models(base_url)
    print(f"Available models: {models}")

    # Get metadata
    server_meta = await client.get_server_metadata(base_url)
    model_meta = await client.get_model_metadata(base_url, "sklearn-iris")

    await client.close()

asyncio.run(main())
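
REST Client Response Headers

To capture the HTTP response headers of an inference call, pass a dictionary via the response_headers argument documented above; a sketch (the URL and model name are illustrative):

import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    client = InferenceRESTClient(config=RESTConfig(protocol="v2"))

    # The dict is populated in place with the HTTP response headers
    response_headers = {}
    response = await client.infer(
        "http://localhost:8080",
        model_name="sklearn-iris",
        data={"inputs": [{"name": "input-0", "shape": [1, 4], "datatype": "FP32",
                          "data": [[5.1, 3.5, 1.4, 0.2]]}]},
        response_headers=response_headers
    )
    print(response_headers)

    await client.close()

asyncio.run(main())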

Built-in Retry Policy

The gRPC client includes a default retry policy:

  • Retry attempts: 3
  • Backoff strategy: Exponential with jitter
  • Retryable status codes: UNAVAILABLE, DEADLINE_EXCEEDED
  • Max backoff: 30 seconds

Customize retries:

# Disable retries
client = InferenceGRPCClient(url="localhost:8081", retries=0)

# Custom retry count
client = InferenceGRPCClient(url="localhost:8081", retries=5)

Request Timeout

Both clients support request timeouts:

# Set default timeout
client = InferenceGRPCClient(url="localhost:8081", timeout=120)

# Override per request
await client.infer(infer_request, timeout=60)

Error Handling

Handle client errors:

import asyncio
from kserve import InferenceGRPCClient, InferInput, InferRequest
from grpc import RpcError

async def main():
    client = InferenceGRPCClient(url="localhost:8081")

    try:
        input_data = InferInput(name="input", shape=[1, 4], datatype="FP32", data=[[1, 2, 3, 4]])
        request = InferRequest(model_name="my-model", infer_inputs=[input_data])
        response = await client.infer(request)
    except RpcError as e:
        print(f"gRPC error: {e.code()} - {e.details()}")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        await client.close()

asyncio.run(main())

Multiple Outputs

Handle models with multiple outputs:

import asyncio
from kserve import InferenceGRPCClient, InferInput, InferRequest

async def main():
    client = InferenceGRPCClient(url="localhost:8081")

    input_data = InferInput(name="input", shape=[1, 4], datatype="FP32", data=[[1, 2, 3, 4]])
    request = InferRequest(model_name="multi-output-model", infer_inputs=[input_data])

    response = await client.infer(request)

    # Access multiple outputs
    for output in response.outputs:
        print(f"Output: {output.name}")
        print(f"Shape: {output.shape}")
        print(f"Data: {output.data}")

    await client.close()

asyncio.run(main())

Custom Headers

Pass custom headers to requests:

import asyncio
from kserve import InferenceGRPCClient, InferInput, InferRequest

async def main():
    client = InferenceGRPCClient(url="localhost:8081")

    input_data = InferInput(name="input", shape=[1, 4], datatype="FP32", data=[[1, 2, 3, 4]])
    request = InferRequest(model_name="my-model", infer_inputs=[input_data])

    # Pass custom metadata headers as (key, value) tuples
    response = await client.infer(
        request,
        headers=[
            ("x-request-id", "12345"),
            ("x-user-id", "user-123")
        ]
    )

    await client.close()

asyncio.run(main())

Request Parameters

Pass additional parameters:

import asyncio
from kserve import InferenceGRPCClient, InferInput, InferRequest

async def main():
    client = InferenceGRPCClient(url="localhost:8081")

    input_data = InferInput(name="input", shape=[1, 4], datatype="FP32", data=[[1, 2, 3, 4]])

    # Attach request-level parameters via InferRequest
    request = InferRequest(
        model_name="my-model",
        infer_inputs=[input_data],
        parameters={
            "batch_size": 1,
            "priority": "high"
        }
    )

    response = await client.infer(request)

    await client.close()

asyncio.run(main())

Generation Requests (LLMs)

Make generation requests to LLM model servers that expose a generate endpoint:

import asyncio
from kserve import InferenceGRPCClient, InferInput

async def main():
    client = InferenceGRPCClient(url="localhost:8081")

    # Text prompt
    prompt = InferInput(
        name="prompt",
        shape=[1],
        datatype="BYTES",
        data=["What is machine learning?"]
    )

    # Generation parameters
    response = await client.generate(
        model_name="llama-7b",
        inputs=[prompt],
        parameters={
            "max_tokens": 100,
            "temperature": 0.7,
            "top_p": 0.9
        }
    )

    # Extract generated text
    output = response.outputs[0]
    generated_text = output.data[0]
    print(f"Generated: {generated_text}")

    await client.close()

asyncio.run(main())

USE_CLIENT_DEFAULT

Sentinel value for using client defaults:

USE_CLIENT_DEFAULT: object

Used to distinguish between explicitly passing None vs using default values.
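
For example, with the gRPC client and infer_request from the examples above, leaving the argument untouched keeps the client-level timeout, while passing None is treated as an explicit value; a sketch (the import path for USE_CLIENT_DEFAULT is an assumption):

from kserve.inference_client import USE_CLIENT_DEFAULT  # import path assumed

# Uses the timeout configured on the client (same as omitting the argument)
await client.infer(infer_request, timeout=USE_CLIENT_DEFAULT)

# Passes None explicitly, which is treated differently from the client default
await client.infer(infer_request, timeout=None)

# Overrides the client default for this request only
await client.infer(infer_request, timeout=5)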