tessl install tessl/pypi-kserve@0.16.1

KServe is a comprehensive Python SDK that provides standardized interfaces for building and deploying machine learning model serving infrastructure on Kubernetes.
Make inference requests to remote KServe model servers using REST or gRPC protocols with built-in retry logic and SSL support.
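A minimal quick-start sketch using the InferenceClientFactory documented below; the import path and the localhost:8081 address are assumptions for illustration.

import asyncio
from kserve.inference_client import InferenceClientFactory  # import path assumed for illustration

async def main():
    # gRPC client for a predictor assumed to be listening on localhost:8081
    client = InferenceClientFactory.get_grpc_client(url="localhost:8081")
    print("Server live:", await client.is_server_live())
    # Close every client created through the factory
    InferenceClientFactory.close()

asyncio.run(main())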
Factory for creating inference clients with automatic protocol detection and configuration.
class InferenceClientFactory:
"""
Factory for creating REST or gRPC inference clients.
Static methods for client creation with automatic resource management.
"""
@staticmethod
def get_grpc_client(
url: str,
use_ssl: bool = False,
root_certificates: str = None,
private_key: str = None,
certificate_chain: str = None,
timeout: float = 60,
verbose: bool = False
) -> InferenceGRPCClient:
"""
Create a gRPC inference client.
Args:
url (str): Server URL (e.g., "localhost:8081")
use_ssl (bool): Use SSL/TLS (default: False)
root_certificates (str): Path to PEM-encoded root certificates file
private_key (str): Path to PEM-encoded private key file
certificate_chain (str): Path to PEM-encoded certificate chain file
timeout (float): Request timeout in seconds (default: 60)
verbose (bool): Enable verbose logging (default: False)
Returns:
InferenceGRPCClient: Configured gRPC client instance
"""
@staticmethod
def get_rest_client(
url: str,
protocol: str = "v2",
use_ssl: bool = True,
timeout: float = 60,
verbose: bool = False,
**kwargs
) -> InferenceRESTClient:
"""
Create a REST inference client.
Args:
url (str): Server URL (e.g., "http://localhost:8080")
protocol (str): Protocol version ("v1" or "v2", default: "v2")
use_ssl (bool): Verify SSL certificates (default: True)
timeout (float): Request timeout in seconds (default: 60)
verbose (bool): Enable verbose logging (default: False)
**kwargs: Additional configuration options for RESTConfig
Returns:
InferenceRESTClient: Configured REST client instance
"""
@staticmethod
def close():
"""Close all open clients and cleanup resources."""

Asynchronous gRPC client for high-performance inference with v2 protocol support.
class InferenceGRPCClient:
"""
gRPC client for KServe inference.
Args:
url (str): Server URL (e.g., "localhost:8081")
verbose (bool): Enable verbose logging (default: False)
use_ssl (bool): Use SSL/TLS (default: False)
root_certificates (str): Path to PEM-encoded root certificates file
private_key (str): Path to PEM-encoded private key file
certificate_chain (str): Path to PEM-encoded certificate chain file
creds (grpc.ChannelCredentials): gRPC channel credentials
channel_args (list): Additional channel arguments as (key, value) tuples
timeout (float): Request timeout in seconds (default: 60)
retries (int): Number of retry attempts (default: 3)
"""
def __init__(
self,
url: str,
verbose: bool = False,
use_ssl: bool = False,
root_certificates: str = None,
private_key: str = None,
certificate_chain: str = None,
creds: grpc.ChannelCredentials = None,
channel_args: List[Tuple[str, Any]] = None,
timeout: Optional[float] = 60,
retries: Optional[int] = 3
): ...
async def infer(
self,
infer_request: InferRequest,
timeout: Union[Optional[float], _UseClientDefault] = USE_CLIENT_DEFAULT,
headers: Union[grpc.aio.Metadata, Sequence[Tuple[str, str]], None] = None
) -> InferResponse:
"""
Make an inference request.
Args:
infer_request (InferRequest): Inference request object
timeout (float, optional): Request timeout (overrides client default)
headers (Metadata or list, optional): Request metadata headers as grpc.aio.Metadata
or sequence of (key, value) tuples
Returns:
InferResponse: Inference response with outputs
Raises:
grpc.RpcError: For non-OK-status response
"""
async def is_server_live(
self,
timeout: Union[Optional[float], _UseClientDefault] = USE_CLIENT_DEFAULT,
headers: Union[grpc.aio.Metadata, Sequence[Tuple[str, str]], None] = None
) -> bool:
"""
Check if server is live.
Args:
timeout (float, optional): Request timeout
headers (Metadata or list, optional): Request metadata headers
Returns:
bool: True if server is live
Raises:
grpc.RpcError: For non-OK-status response
"""
async def is_server_ready(
self,
timeout: Union[Optional[float], _UseClientDefault] = USE_CLIENT_DEFAULT,
headers: Union[grpc.aio.Metadata, Sequence[Tuple[str, str]], None] = None
) -> bool:
"""
Check if server is ready.
Args:
timeout (float, optional): Request timeout
headers (Metadata or list, optional): Request metadata headers
Returns:
bool: True if server is ready
Raises:
grpc.RpcError: For non-OK-status response
"""
async def is_model_ready(
self,
model_name: str,
timeout: Union[Optional[float], _UseClientDefault] = USE_CLIENT_DEFAULT,
headers: Union[grpc.aio.Metadata, Sequence[Tuple[str, str]], None] = None
) -> bool:
"""
Check if model is ready.
Args:
model_name (str): Name of the model
timeout (float, optional): Request timeout
headers (Metadata or list, optional): Request metadata headers
Returns:
bool: True if model is ready
Raises:
grpc.RpcError: For non-OK-status response or if model not found
"""
async def close(self) -> None:
"""Close the gRPC channel and cleanup resources"""
async def __aenter__(self):
"""Async context manager entry"""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
await self.close()

Asynchronous REST client for KServe inference supporting both v1 and v2 protocols.
class InferenceRESTClient:
"""
REST client for KServe inference.
Args:
config (RESTConfig, optional): Client configuration
"""
def __init__(self, config: RESTConfig = None): ...
async def infer(
self,
base_url: Union[httpx.URL, str],
data: Union[InferRequest, dict],
model_name: Optional[str] = None,
headers: Optional[Mapping[str, str]] = None,
response_headers: Dict[str, str] = None,
is_graph_endpoint: bool = False,
timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
) -> Union[InferResponse, Dict]:
"""
Make an inference request.
Args:
base_url (str or httpx.URL): Base URL of inference server (e.g., "http://localhost:8080")
data (InferRequest or dict): Request data
model_name (str, optional): Model name (required unless is_graph_endpoint=True)
headers (dict, optional): HTTP headers
response_headers (dict, optional): Dict to populate with response headers
is_graph_endpoint (bool): If True, use base_url as-is for inference graph (default: False)
timeout (float or httpx.Timeout, optional): Request timeout
Returns:
InferResponse or dict: Response data (InferResponse for v2, dict for v1 or graph)
Raises:
HTTPStatusError: For non-2xx response codes
UnsupportedProtocol: If protocol version not supported
"""
async def explain(
self,
base_url: Union[httpx.URL, str],
model_name: str,
data: Dict,
headers: Optional[Mapping[str, str]] = None,
timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
) -> Dict:
"""
Make an explanation request (v1 protocol only).
Args:
base_url (str or httpx.URL): Base URL of inference server
model_name (str): Model name
data (dict): Request data
headers (dict, optional): HTTP headers
timeout (float or httpx.Timeout, optional): Request timeout
Returns:
dict: Explanation response
Raises:
HTTPStatusError: For non-2xx response codes
UnsupportedProtocol: If protocol is not v1
"""
async def is_server_live(
self,
base_url: Union[str, httpx.URL],
headers: Optional[Mapping[str, str]] = None,
timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
) -> bool:
"""
Check if server is live.
Args:
base_url (str or httpx.URL): Base URL of inference server
headers (dict, optional): HTTP headers
timeout (float or httpx.Timeout, optional): Request timeout
Returns:
bool: True if server is live
Raises:
HTTPStatusError: For non-2xx response codes
UnsupportedProtocol: If protocol not supported
"""
async def is_server_ready(
self,
base_url: Union[httpx.URL, str],
headers: Optional[Mapping[str, str]] = None,
timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
) -> bool:
"""
Check if server is ready.
Args:
base_url (str or httpx.URL): Base URL of inference server
headers (dict, optional): HTTP headers
timeout (float or httpx.Timeout, optional): Request timeout
Returns:
bool: True if server is ready
Raises:
HTTPStatusError: For non-2xx response codes
UnsupportedProtocol: If protocol not supported
"""
async def is_model_ready(
self,
base_url: Union[httpx.URL, str],
model_name: str,
headers: Optional[Mapping[str, str]] = None,
timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
) -> bool:
"""
Check if model is ready.
Args:
base_url (str or httpx.URL): Base URL of inference server
model_name (str): Model name
headers (dict, optional): HTTP headers
timeout (float or httpx.Timeout, optional): Request timeout
Returns:
bool: True if model is ready
Raises:
HTTPStatusError: For non-2xx response codes in v1 (v2 returns success status)
UnsupportedProtocol: If protocol not supported
"""
async def get_model_metadata(
self,
base_url: Union[httpx.URL, str],
model_name: str,
headers: Optional[Mapping[str, str]] = None,
timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
) -> Dict:
"""
Get model metadata.
Args:
base_url (str or httpx.URL): Base URL of inference server
model_name (str): Model name
headers (dict, optional): HTTP headers
timeout (float or httpx.Timeout, optional): Request timeout
Returns:
dict: Model metadata including name, versions, platform, inputs, outputs
Raises:
HTTPStatusError: For non-2xx response codes
UnsupportedProtocol: If protocol not supported
"""
async def get_server_metadata(
self,
base_url: Union[httpx.URL, str],
headers: Optional[Mapping[str, str]] = None,
timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
) -> Dict:
"""
Get server metadata.
Args:
base_url (str or httpx.URL): Base URL of inference server
headers (dict, optional): HTTP headers
timeout (float or httpx.Timeout, optional): Request timeout
Returns:
dict: Server metadata including name, version, extensions
Raises:
HTTPStatusError: For non-2xx response codes
UnsupportedProtocol: If protocol not supported
"""
async def list_models(
self,
base_url: Union[httpx.URL, str],
headers: Optional[Mapping[str, str]] = None,
timeout: Union[float, None, tuple, httpx.Timeout] = httpx.USE_CLIENT_DEFAULT
) -> List[str]:
"""
List all models.
Args:
base_url (str or httpx.URL): Base URL of inference server
headers (dict, optional): HTTP headers
timeout (float or httpx.Timeout, optional): Request timeout
Returns:
list: List of model names
Raises:
HTTPStatusError: For non-2xx response codes
UnsupportedProtocol: If protocol not supported
"""
async def close(self) -> None:
"""Close the client, transport and proxies"""
async def __aenter__(self):
"""Async context manager entry"""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
await self.close()

Configuration for REST client.
class RESTConfig:
"""
Configuration for InferenceRESTClient.
Args:
transport (httpx.AsyncBaseTransport, optional): Asynchronous transport class
protocol (str or PredictorProtocol): Protocol version ("v1" or "v2", default: "v1")
retries (int): Number of retry attempts (default: 3)
http2 (bool): Enable HTTP/2 support (default: False)
timeout (float, None, tuple, or httpx.Timeout): Request timeout (default: 60 seconds)
cert (str or tuple): SSL client certificate - path to file, (cert_file, key_file) tuple,
or (cert_file, key_file, password) tuple
verify (str, bool, or ssl.SSLContext): SSL verification - True (default CA bundle),
path to CA bundle file, ssl.SSLContext, or False (disable verification)
auth (httpx.Auth, optional): Authentication handler
verbose (bool): Enable verbose logging (default: False)
"""
def __init__(
self,
transport: httpx.AsyncBaseTransport = None,
protocol: Union[str, PredictorProtocol] = "v1",
retries: int = 3,
http2: bool = False,
timeout: Union[float, None, tuple, httpx.Timeout] = 60,
cert = None,
verify: Union[str, bool, ssl.SSLContext] = True,
auth = None,
verbose: bool = False
): ...
# Properties
@property
def transport(self) -> Optional[httpx.AsyncBaseTransport]:
"""Asynchronous transport instance"""
@property
def http2(self) -> bool:
"""HTTP/2 support enabled"""
@property
def cert(self):
"""SSL client certificate configuration"""
@property
def verify(self) -> Union[str, bool, ssl.SSLContext]:
"""SSL verification configuration"""
@property
def auth(self):
"""Authentication handler"""
@property
def verbose(self) -> bool:
"""Verbose logging enabled"""

import asyncio
from kserve import InferenceGRPCClient, InferRequest, InferInput

async def main():
    # Create client
    client = InferenceGRPCClient(url="localhost:8081")
    # Prepare input data
    input_data = InferInput(
        name="input-0",
        shape=[1, 4],
        datatype="FP32",
        data=[[5.1, 3.5, 1.4, 0.2]]
    )
    # Wrap the input in an InferRequest and make the inference request
    request = InferRequest(model_name="sklearn-iris", infer_inputs=[input_data])
    response = await client.infer(request)
    # Extract output
    output = response.outputs[0]
    predictions = output.data
    print(f"Predictions: {predictions}")
    await client.close()

asyncio.run(main())
import asyncio
import numpy as np
from kserve import InferenceGRPCClient, InferRequest, InferInput

async def main():
    client = InferenceGRPCClient(url="localhost:8081")
    # Create NumPy array
    data = np.array([[1, 2, 3, 4]], dtype=np.float32)
    # Create input and attach the NumPy data
    input_tensor = InferInput(name="input-0", shape=list(data.shape), datatype="FP32")
    input_tensor.set_data_from_numpy(data)
    # Infer
    request = InferRequest(model_name="my-model", infer_inputs=[input_tensor])
    response = await client.infer(request)
    # Convert output to NumPy
    output = response.outputs[0]
    result = output.as_numpy()
    print(f"Result shape: {result.shape}")
    print(f"Result: {result}")
    await client.close()

asyncio.run(main())
import asyncio
from kserve import InferenceGRPCClient, InferRequest, InferInput

async def main():
    # Create client with SSL; the certificate arguments are paths to PEM-encoded files
    client = InferenceGRPCClient(
        url="secure.example.com:8081",
        use_ssl=True,
        root_certificates="/path/to/ca.pem",
        private_key="/path/to/client-key.pem",
        certificate_chain="/path/to/client-cert.pem"
    )
    # Make request
    input_data = InferInput(name="input", shape=[1, 4], datatype="FP32", data=[[1, 2, 3, 4]])
    request = InferRequest(model_name="my-model", infer_inputs=[input_data])
    response = await client.infer(request)
    await client.close()

asyncio.run(main())
import asyncio
from kserve import InferenceGRPCClient

async def main():
    client = InferenceGRPCClient(url="localhost:8081")
    # Check server liveness
    is_live = await client.is_server_live()
    print(f"Server live: {is_live}")
    # Check server readiness
    is_ready = await client.is_server_ready()
    print(f"Server ready: {is_ready}")
    # Check model readiness
    model_ready = await client.is_model_ready("sklearn-iris")
    print(f"Model ready: {model_ready}")
    # Server and model metadata are exposed by the REST client's
    # get_server_metadata() and get_model_metadata() methods
    await client.close()

asyncio.run(main())
import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    # Create client (v2 protocol)
    client = InferenceRESTClient(config=RESTConfig(protocol="v2"))
    # Make inference request (v2 protocol)
    response = await client.infer(
        "http://localhost:8080",
        model_name="sklearn-iris",
        data={
            "inputs": [
                {
                    "name": "input-0",
                    "shape": [1, 4],
                    "datatype": "FP32",
                    "data": [[5.1, 3.5, 1.4, 0.2]]
                }
            ]
        }
    )
    print(response)
    await client.close()

asyncio.run(main())
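The infer method's is_graph_endpoint flag sends the request to an InferenceGraph URL as-is instead of a single model endpoint. A hedged sketch, assuming a graph reachable at a hypothetical URL whose first node accepts a v2-style payload:

import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    client = InferenceRESTClient(config=RESTConfig(protocol="v2"))
    # With is_graph_endpoint=True the base_url is used as-is and model_name may be omitted
    response = await client.infer(
        "http://my-inference-graph.example.com",  # hypothetical InferenceGraph URL
        data={
            "inputs": [
                {"name": "input-0", "shape": [1, 4], "datatype": "FP32", "data": [[5.1, 3.5, 1.4, 0.2]]}
            ]
        },
        is_graph_endpoint=True
    )
    print(response)  # graph responses are returned as a dict
    await client.close()

asyncio.run(main())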
import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    # Create client with v1 protocol
    config = RESTConfig(protocol="v1")
    client = InferenceRESTClient(config=config)
    # Make prediction (v1 protocol)
    response = await client.infer(
        "http://localhost:8080",
        model_name="sklearn-iris",
        data={"instances": [[5.1, 3.5, 1.4, 0.2]]}
    )
    print(response["predictions"])
    # Make explanation request
    explanation = await client.explain(
        "http://localhost:8080",
        model_name="sklearn-iris",
        data={"instances": [[5.1, 3.5, 1.4, 0.2]]}
    )
    print(explanation)
    await client.close()

asyncio.run(main())
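The REST infer method can also capture the HTTP response headers through its response_headers argument. A short sketch, assuming the same local v2 predictor as above:

import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    client = InferenceRESTClient(config=RESTConfig(protocol="v2"))
    response_headers = {}
    response = await client.infer(
        "http://localhost:8080",
        model_name="sklearn-iris",
        data={"inputs": [{"name": "input-0", "shape": [1, 4], "datatype": "FP32", "data": [[5.1, 3.5, 1.4, 0.2]]}]},
        response_headers=response_headers  # populated with the server's response headers
    )
    print(response_headers)
    await client.close()

asyncio.run(main())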
import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    # Configure with timeout and retries
    config = RESTConfig(
        protocol="v2",
        timeout=30,
        retries=3
    )
    client = InferenceRESTClient(config=config)
    # Request with custom timeout
    response = await client.infer(
        "http://localhost:8080",
        model_name="slow-model",
        data={"inputs": [...]},  # payload elided
        timeout=60  # Override default
    )
    await client.close()

asyncio.run(main())
import asyncio
from kserve import InferenceRESTClient, RESTConfig

async def main():
    # Configure SSL verification and a client certificate/key pair
    config = RESTConfig(verify=True, cert=("/path/to/client-cert.pem", "/path/to/client-key.pem"))
    client = InferenceRESTClient(config=config)
    response = await client.infer("https://secure.example.com:8080", model_name="my-model", data={"inputs": [...]})
    await client.close()

asyncio.run(main())
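RESTConfig also accepts an httpx authentication handler through its auth parameter. A minimal sketch using basic auth; the credentials and server address are placeholders:

import asyncio
import httpx
from kserve import InferenceRESTClient, RESTConfig

async def main():
    # Attach basic-auth credentials to every request made by the client
    config = RESTConfig(protocol="v2", auth=httpx.BasicAuth("user", "pass"))
    client = InferenceRESTClient(config=config)
    live = await client.is_server_live("https://secure.example.com:8080")
    print(f"Server live: {live}")
    await client.close()

asyncio.run(main())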
import asyncio
from kserve import InferenceRESTClient

async def main():
    client = InferenceRESTClient()
    base_url = "http://localhost:8080"
    # Check server liveness
    if await client.is_server_live(base_url):
        print("Server is live")
    # Check server readiness
    if await client.is_server_ready(base_url):
        print("Server is ready")
    # Check model readiness
    if await client.is_model_ready(base_url, "sklearn-iris"):
        print("Model is ready")
    # List all models
    models = await client.list_models(base_url)
    print(f"Available models: {models}")
    # Get metadata
    server_meta = await client.get_server_metadata(base_url)
    model_meta = await client.get_model_metadata(base_url, "sklearn-iris")
    await client.close()

asyncio.run(main())

The gRPC client includes a default retry policy (the retries argument defaults to 3).
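For illustration, a comparable retry policy can also be supplied explicitly through the documented channel_args parameter using gRPC's standard service-config channel option; the backoff values and status codes below are illustrative examples, not the library's built-in defaults.

import json
from kserve import InferenceGRPCClient

# Illustrative retry policy expressed as a gRPC service config (values are examples only)
service_config = json.dumps({
    "methodConfig": [{
        "name": [{}],  # apply to all services and methods
        "retryPolicy": {
            "maxAttempts": 3,
            "initialBackoff": "0.1s",
            "maxBackoff": "1s",
            "backoffMultiplier": 2,
            "retryableStatusCodes": ["UNAVAILABLE"]
        }
    }]
})

client = InferenceGRPCClient(
    url="localhost:8081",
    channel_args=[
        ("grpc.enable_retries", 1),
        ("grpc.service_config", service_config)
    ]
)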
Customize retries:
# Disable retries
client = InferenceGRPCClient(url="localhost:8081", retries=0)
# Custom retry count
client = InferenceGRPCClient(url="localhost:8081", retries=5)

Both clients support request timeouts:
# Set default timeout
client = InferenceGRPCClient(url="localhost:8081", timeout=120)
# Override per request
await client.infer(infer_request, timeout=60)  # infer_request built as in the examples above; overrides the client default

Handle client errors:
import asyncio
from kserve import InferenceGRPCClient, InferRequest, InferInput
from grpc import RpcError

async def main():
    client = InferenceGRPCClient(url="localhost:8081")
    try:
        input_data = InferInput(name="input", shape=[1, 4], datatype="FP32", data=[[1, 2, 3, 4]])
        request = InferRequest(model_name="my-model", infer_inputs=[input_data])
        response = await client.infer(request)
    except RpcError as e:
        print(f"gRPC error: {e.code()} - {e.details()}")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        await client.close()

asyncio.run(main())

Handle models with multiple outputs:
import asyncio
from kserve import InferenceGRPCClient, InferRequest, InferInput

async def main():
    client = InferenceGRPCClient(url="localhost:8081")
    input_data = InferInput(name="input", shape=[1, 4], datatype="FP32", data=[[1, 2, 3, 4]])
    request = InferRequest(model_name="multi-output-model", infer_inputs=[input_data])
    response = await client.infer(request)
    # Access multiple outputs
    for output in response.outputs:
        print(f"Output: {output.name}")
        print(f"Shape: {output.shape}")
        print(f"Data: {output.data}")
    await client.close()

asyncio.run(main())

Pass custom headers to requests:
import asyncio
from kserve import InferenceGRPCClient, InferRequest, InferInput

async def main():
    client = InferenceGRPCClient(url="localhost:8081")
    input_data = InferInput(name="input", shape=[1, 4], datatype="FP32", data=[[1, 2, 3, 4]])
    request = InferRequest(model_name="my-model", infer_inputs=[input_data])
    # Pass custom headers as metadata key/value pairs
    response = await client.infer(
        request,
        headers=[
            ("x-request-id", "12345"),
            ("x-user-id", "user-123")
        ]
    )
    await client.close()

asyncio.run(main())

Pass additional parameters:
import asyncio
from kserve import InferenceGRPCClient, InferRequest, InferInput

async def main():
    client = InferenceGRPCClient(url="localhost:8081")
    input_data = InferInput(name="input", shape=[1, 4], datatype="FP32", data=[[1, 2, 3, 4]])
    # Pass request-level parameters on the InferRequest
    request = InferRequest(
        model_name="my-model",
        infer_inputs=[input_data],
        parameters={
            "batch_size": 1,
            "priority": "high"
        }
    )
    response = await client.infer(request)
    await client.close()

asyncio.run(main())

Make generation requests for LLM models:
import asyncio
from kserve import InferenceGRPCClient, InferInput

async def main():
    client = InferenceGRPCClient(url="localhost:8081")
    # Text prompt
    prompt = InferInput(
        name="prompt",
        shape=[1],
        datatype="BYTES",
        data=["What is machine learning?"]
    )
    # Generation parameters
    response = await client.generate(
        model_name="llama-7b",
        inputs=[prompt],
        parameters={
            "max_tokens": 100,
            "temperature": 0.7,
            "top_p": 0.9
        }
    )
    # Extract generated text
    output = response.outputs[0]
    generated_text = output.data[0]
    print(f"Generated: {generated_text}")
    await client.close()

asyncio.run(main())

Sentinel value for using client defaults:
USE_CLIENT_DEFAULT: object

Used to distinguish between explicitly passing None and using the client's default value.
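A short sketch of the distinction, assuming an InferenceGRPCClient named client and a prepared InferRequest:

# Omitting timeout uses the timeout configured on the client (USE_CLIENT_DEFAULT is the implicit default)
response = await client.infer(infer_request)
# Explicitly passing None is distinct from the default and typically disables the timeout for this call
response = await client.infer(infer_request, timeout=None)
# A float overrides the client default for this call only
response = await client.infer(infer_request, timeout=5.0)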