# Model Serving Framework

Core classes and interfaces for implementing custom model servers with lifecycle management, health checking, and protocol support. This framework provides the foundation for building production-ready ML serving applications.

## Capabilities

### Model Base Class

Abstract base class for implementing custom model serving logic with standardized lifecycle management and protocol support.

```python { .api }
class Model:
    def __init__(self,
                 name: str,
                 predictor_config: Optional[PredictorConfig] = None,
                 return_response_headers: bool = False):
        """
        Initialize a model instance.

        Args:
            name (str): Unique name for the model
            predictor_config (PredictorConfig, optional): Configuration for predictor HTTP calls
            return_response_headers (bool): Whether to return response headers
        """

    def load(self) -> bool:
        """
        Load the model from storage. Called once during server startup.
        Override this method to implement model loading logic.

        Returns:
            bool: True if the model is ready, False otherwise
        """

    def start(self):
        """
        Start handler; can be overridden to perform model setup.
        """

    def stop(self):
        """
        Stop handler; can be overridden to perform model teardown.
        """

    async def start_engine(self):
        """
        Start the engine for models that require one before use.
        """

    def stop_engine(self):
        """
        Stop handler for engine shutdown.
        """

    async def healthy(self) -> bool:
        """
        Check the health of this model. Returns self.ready by default.

        Returns:
            bool: True if healthy, False otherwise
        """

    async def predict(self,
                      payload: Union[Dict, InferRequest, ModelInferRequest],
                      headers: Dict[str, str] = None,
                      response_headers: Dict[str, str] = None) -> Union[Dict, InferResponse, AsyncIterator[Any]]:
        """
        Main prediction method. By default makes a call to the predictor.

        Args:
            payload: Model inputs passed from the preprocess handler
            headers: Request headers
            response_headers: Response headers

        Returns:
            Inference result or response from the predictor

        Raises:
            HTTPStatusError: When the predictor returns an error response
        """

    async def preprocess(self,
                         payload: Union[Dict, InferRequest],
                         headers: Dict[str, str] = None) -> Union[Dict, InferRequest]:
        """
        Preprocess handler; can be overridden for data or feature transformation.

        Args:
            payload: Payload of the request (Dict for v1, InferRequest for v2)
            headers: Request headers

        Returns:
            Dict or InferRequest: Transformed data for the predictor or tensors for the predict handler
        """

    async def postprocess(self,
                          result: Union[Dict, InferResponse],
                          headers: Dict[str, str] = None,
                          response_headers: Dict[str, str] = None) -> Union[Dict, InferResponse]:
        """
        Postprocess handler; can be overridden to transform the inference result.

        Args:
            result: Inference result from the predict handler (Dict for v1, InferResponse for v2)
            headers: Request headers
            response_headers: Response headers

        Returns:
            Dict or InferResponse: Post-processed result to return to the client
        """

    async def explain(self, payload: Dict, headers: Dict[str, str] = None) -> Dict:
        """
        Explain handler; can be overridden to implement model explanation.

        Args:
            payload: Explainer model inputs passed from the preprocess handler
            headers: Request headers

        Returns:
            Dict: Explanation for the inference result

        Raises:
            HTTPStatusError: When the explainer returns an error response
        """

    def get_input_types(self) -> List[Dict]:
        """
        Get expected input types for the model.

        Returns:
            List[Dict]: List of input type specifications
        """

    def get_output_types(self) -> List[Dict]:
        """
        Get output types produced by the model.

        Returns:
            List[Dict]: List of output type specifications
        """

    # Properties
    name: str            # Model name
    ready: bool          # Whether model is ready to serve
    protocol: str        # Protocol version (v1 or v2)
    predictor_host: str  # Host for predictor service
    timeout: int         # Request timeout in seconds
    use_ssl: bool        # Whether to use SSL for requests
    retries: int         # Number of retries for failed requests
```
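Taken together, the hooks above run in a fixed order: `load()` once at startup, `start()`/`stop()` around serving, and `healthy()` during liveness probes. A minimal sketch of that flow, using a local `StubModel` stand-in (an assumption introduced here so the example runs without the framework installed):

```python
import asyncio

class StubModel:
    """Local stand-in mirroring the documented lifecycle hooks."""

    def __init__(self, name: str):
        self.name = name
        self.ready = False
        self.events = []

    def load(self) -> bool:
        # Called once at startup; flip ready once weights are in memory.
        self.events.append("load")
        self.ready = True
        return self.ready

    def start(self):
        self.events.append("start")

    async def healthy(self) -> bool:
        # Default behavior in the base class: health mirrors readiness.
        self.events.append("healthy")
        return self.ready

    def stop(self):
        self.events.append("stop")
        self.ready = False

model = StubModel("demo")
model.load()
model.start()
print(asyncio.run(model.healthy()))  # True once load() has run
model.stop()
print(model.events)  # ['load', 'start', 'healthy', 'stop']
```

The real base class adds the predictor call in `predict()` and the engine hooks, but the readiness contract (`ready` set by `load()`, checked by `healthy()`) is the part custom models most often rely on.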

### Model Server

Production-ready HTTP and gRPC server for hosting one or more models with multi-processing, health checks, and metrics collection.

```python { .api }
class ModelServer:
    def __init__(self,
                 http_port: int = 8080,
                 grpc_port: int = 8081,
                 workers: int = 1,
                 max_threads: int = None,
                 max_asyncio_workers: int = None):
        """
        Initialize the model server.

        Args:
            http_port (int): HTTP server port (default: 8080)
            grpc_port (int): gRPC server port (default: 8081)
            workers (int): Number of worker processes (default: 1)
            max_threads (int): Maximum threads per worker
            max_asyncio_workers (int): Maximum async workers
        """

    def start(self, models: List[Model]):
        """
        Start the server with the given models.

        Args:
            models (List[Model]): List of models to serve
        """

    def register_model(self, model: Model):
        """
        Register a model with the server.

        Args:
            model (Model): Model instance to register
        """

    def register_exception_handler(self, exception_type, handler):
        """
        Register a custom exception handler.

        Args:
            exception_type: Exception type to handle
            handler: Handler function
        """

    # Properties
    model_registry: ModelRepository  # Registry of loaded models
    dataplane: str                   # Data plane protocol
    protocol_version: str            # Protocol version
```
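`register_exception_handler` maps an exception type to a handler that turns it into a response. The dispatch pattern this implies can be sketched standalone with a plain dict (the registry, the MRO walk, and the handler signature here are assumptions, not the framework's internals):

```python
# Sketch of exception-to-handler dispatch, runnable without the framework.
handlers = {}

def register_exception_handler(exception_type, handler):
    handlers[exception_type] = handler

def dispatch(exc):
    # Walk the MRO so a subclass falls back to its base class's handler.
    for cls in type(exc).__mro__:
        if cls in handlers:
            return handlers[cls](exc)
    raise exc  # no handler registered: propagate

register_exception_handler(
    ValueError,
    lambda e: {"error": str(e), "status": 400},
)

print(dispatch(ValueError("bad input")))  # {'error': 'bad input', 'status': 400}
```

Registering handlers for broad types (e.g. `Exception`) gives a catch-all, while more specific types win because the MRO is searched from most derived to least.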

### Model Repository

Registry for managing multiple models with dynamic loading, unloading, and health monitoring capabilities.

```python { .api }
class ModelRepository:
    def __init__(self):
        """Initialize an empty model repository."""

    def get_model(self, name: str) -> Model:
        """
        Get a model by name.

        Args:
            name (str): Model name

        Returns:
            Model: Model instance

        Raises:
            KeyError: If the model is not found
        """

    def get_models(self) -> Dict[str, Model]:
        """
        Get all registered models.

        Returns:
            Dict[str, Model]: Dictionary of model name to model instance
        """

    def update(self, model: Model):
        """
        Update or add a model to the repository.

        Args:
            model (Model): Model instance to update/add
        """

    def load(self, name: str) -> bool:
        """
        Load a model by name.

        Args:
            name (str): Model name to load

        Returns:
            bool: True if loaded successfully
        """

    def unload(self, name: str) -> bool:
        """
        Unload a model by name.

        Args:
            name (str): Model name to unload

        Returns:
            bool: True if unloaded successfully
        """

    def is_model_ready(self, name: str) -> bool:
        """
        Check if a model is ready to serve.

        Args:
            name (str): Model name

        Returns:
            bool: True if the model is ready
        """
```
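The repository contract above can be exercised end to end with an in-memory sketch. `StubRepository` and `StubModel` are local stand-ins (assumptions so the example runs without the framework); the method names and return types mirror the API block:

```python
class StubModel:
    """Minimal model with the ready/load contract from the base class."""
    def __init__(self, name):
        self.name = name
        self.ready = False

    def load(self):
        self.ready = True
        return True

class StubRepository:
    """In-memory sketch of the ModelRepository contract."""
    def __init__(self):
        self._models = {}

    def update(self, model):
        self._models[model.name] = model

    def get_model(self, name):
        return self._models[name]  # raises KeyError if not found

    def get_models(self):
        return dict(self._models)

    def load(self, name):
        return self.get_model(name).load()

    def unload(self, name):
        self._models.pop(name)
        return True

    def is_model_ready(self, name):
        return name in self._models and self._models[name].ready

repo = StubRepository()
repo.update(StubModel("iris"))
print(repo.is_model_ready("iris"))  # False: registered but not loaded
repo.load("iris")
print(repo.is_model_ready("iris"))  # True
```

Note the two-phase pattern: `update()` registers a model without making it servable; only `load()` flips readiness, which is what the health endpoints report.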

## Usage Examples

### Basic Custom Model

```python
from kserve import Model, ModelServer
import joblib

class SkLearnModel(Model):
    def __init__(self, name: str, model_path: str):
        super().__init__(name)
        self.model_path = model_path
        self.model = None

    def load(self) -> bool:
        self.model = joblib.load(self.model_path)
        self.ready = True
        return self.ready

    async def predict(self, payload, headers=None, response_headers=None):
        # Extract data from the payload
        instances = payload["instances"]

        # Make predictions
        predictions = self.model.predict(instances)

        return {"predictions": predictions.tolist()}

# Start server
if __name__ == "__main__":
    model = SkLearnModel("iris-classifier", "model.joblib")
    ModelServer().start([model])
```

### Advanced Model with Preprocessing

```python
from kserve import Model
import pandas as pd

class AdvancedModel(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.scaler = None
        self.model = None

    def load(self) -> bool:
        # Load model and preprocessor (placeholder helpers, defined elsewhere)
        self.model = load_model("model.pkl")
        self.scaler = load_scaler("scaler.pkl")
        self.ready = True
        return self.ready

    async def preprocess(self, payload, headers=None):
        # Convert to DataFrame and apply scaling; the server passes the
        # result on to the predict handler
        df = pd.DataFrame(payload["instances"])
        return self.scaler.transform(df)

    async def predict(self, payload, headers=None, response_headers=None):
        # payload is the scaled array returned by preprocess
        return self.model.predict(payload)

    async def postprocess(self, result, headers=None, response_headers=None):
        return {
            "predictions": result.tolist(),
            "model_name": self.name,
            "version": "1.0",
        }
```

### Multi-Model Server

```python
from kserve import ModelServer

# Create multiple models (MyModel is a custom Model subclass
# like the ones in the examples above)
model1 = MyModel("model-v1")
model2 = MyModel("model-v2")
model3 = MyModel("champion-model")

# Start server with multiple models
server = ModelServer(
    http_port=8080,
    grpc_port=8081,
    workers=4
)
server.start([model1, model2, model3])
```

### Predictor Configuration

Configuration class for HTTP calls to predictors.

```python { .api }
class PredictorConfig:
    def __init__(self,
                 predictor_host: str,
                 predictor_protocol: str = "v1",
                 predictor_use_ssl: bool = False,
                 predictor_request_timeout_seconds: int = 600,
                 predictor_request_retries: int = 0,
                 predictor_health_check: bool = False):
        """
        Configuration for HTTP calls to the predictor.

        Args:
            predictor_host (str): Host name of the predictor
            predictor_protocol (str): Inference protocol ("v1", "v2", "grpc-v2")
            predictor_use_ssl (bool): Enable SSL for the HTTP connection
            predictor_request_timeout_seconds (int): Request timeout (default: 600)
            predictor_request_retries (int): Number of retries (default: 0)
            predictor_health_check (bool): Enable predictor health check
        """

    @property
    def predictor_base_url(self) -> str:
        """
        Get the base URL for the predictor.

        Returns:
            str: Base URL for the predictor
        """
```
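The `predictor_base_url` property derives a URL from the host and SSL flag. A plausible sketch of that derivation as a local dataclass (the `PredictorConfigSketch` name and the scheme-from-SSL rule are assumptions, not the framework's exact implementation):

```python
from dataclasses import dataclass

@dataclass
class PredictorConfigSketch:
    """Local sketch mirroring the documented PredictorConfig fields."""
    predictor_host: str
    predictor_protocol: str = "v1"
    predictor_use_ssl: bool = False
    predictor_request_timeout_seconds: int = 600
    predictor_request_retries: int = 0

    @property
    def predictor_base_url(self) -> str:
        # Assumption: the scheme follows the SSL flag.
        scheme = "https" if self.predictor_use_ssl else "http"
        return f"{scheme}://{self.predictor_host}"

cfg = PredictorConfigSketch(
    "sklearn-iris.default.svc.cluster.local",
    predictor_use_ssl=True,
)
print(cfg.predictor_base_url)  # https://sklearn-iris.default.svc.cluster.local
```

In practice the config is passed to `Model(name, predictor_config=...)`, and the model's default `predict()` sends requests against this base URL using the configured protocol, timeout, and retry count.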

## Types

```python { .api }
from typing import List, Dict, Any, Awaitable, Optional, Union, AsyncIterator
from cloudevents.http import CloudEvent
from .protocol.infer_type import InferRequest, InferResponse
from .protocol.grpc.grpc_predict_v2_pb2 import ModelInferRequest

ModelName = str
ModelDict = Dict[str, Model]
Payload = Dict[str, Any]
InferReturnValueTypes = Union[Dict, InferResponse, List[str]]
InferReturnType = Union[InferReturnValueTypes, Awaitable[InferReturnValueTypes]]
```
```