# Storage Integration

Unified storage interface supporting multiple cloud providers and local storage for model artifact management. The storage system abstracts the differences between storage backends and provides seamless download and upload of model artifacts.

## Capabilities

### Storage Interface

Unified interface for accessing model artifacts from various storage backends.

```python { .api }
from typing import Any, Dict, Optional

from kserve.storage import Storage

class Storage:
    @staticmethod
    def download(uri: str, dest: Optional[str] = None) -> str:
        """
        Download model artifacts from a storage URI.

        Args:
            uri (str): Storage URI (s3://, gs://, azure://, pvc://, file://)
            dest (str, optional): Local destination path

        Returns:
            str: Local path to the downloaded artifacts

        Raises:
            StorageError: If the download fails
        """

    @staticmethod
    def upload(src: str, uri: str) -> bool:
        """
        Upload local artifacts to a storage URI.

        Args:
            src (str): Local source path
            uri (str): Destination storage URI

        Returns:
            bool: True if the upload succeeded

        Raises:
            StorageError: If the upload fails
        """

    @staticmethod
    def is_local(uri: str) -> bool:
        """
        Check whether a URI points to local storage.

        Args:
            uri (str): Storage URI to check

        Returns:
            bool: True if the URI is local
        """

    @staticmethod
    def get_storage_spec(uri: str) -> Dict[str, Any]:
        """
        Parse a storage URI and return its specification.

        Args:
            uri (str): Storage URI

        Returns:
            Dict[str, Any]: Storage specification with type, bucket, path, etc.
        """
```
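
The local-vs-remote distinction underpins most of this interface. A minimal sketch of what an `is_local` check can look like, assuming a URI with a `file://` scheme or no scheme at all counts as local (`is_local` here is an illustrative stand-in, not the library's implementation):

```python
from urllib.parse import urlparse

# Illustrative stand-in: treat a URI as local when it uses the file://
# scheme or is a bare path with no scheme at all.
def is_local(uri: str) -> bool:
    scheme = urlparse(uri).scheme
    return scheme in ("", "file")

print(is_local("file:///models/iris/"))   # file:// scheme -> local
print(is_local("/models/iris"))           # bare path -> local
print(is_local("s3://bucket/models/"))    # object store -> remote
```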

### Storage URI Formats

Supported storage URI formats for the different backends.

```python { .api }
# Amazon S3 URIs
S3_URI_FORMAT = "s3://bucket-name/path/to/model/"

# Google Cloud Storage URIs
GCS_URI_FORMAT = "gs://bucket-name/path/to/model/"

# Azure Blob Storage URIs
AZURE_URI_FORMAT = "azure://container-name/path/to/model/"

# Persistent Volume Claim URIs
PVC_URI_FORMAT = "pvc://pvc-name/path/to/model/"

# Local filesystem URIs
LOCAL_URI_FORMAT = "file:///path/to/model/"

# HTTP/HTTPS URIs
HTTP_URI_FORMAT = "https://example.com/model.tar.gz"

# HuggingFace Hub URIs
HF_URI_FORMAT = "hf://username/model-name"
```
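
All of these formats share a scheme/bucket/path shape, so a spec like the one `get_storage_spec` returns can be derived with standard URL parsing. A sketch, with `parse_storage_uri` as a hypothetical helper:

```python
from urllib.parse import urlparse

# Hypothetical helper: split a storage URI into the pieces a backend needs.
def parse_storage_uri(uri: str) -> dict:
    parsed = urlparse(uri)
    return {
        "type": parsed.scheme or "file",  # bare paths default to local
        "bucket": parsed.netloc,          # bucket / container / PVC name
        "path": parsed.path.lstrip("/"),  # object key or sub-path
    }

spec = parse_storage_uri("s3://bucket-name/path/to/model/")
print(spec)  # {'type': 's3', 'bucket': 'bucket-name', 'path': 'path/to/model/'}
```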
97
98
### Storage Configuration
99
100
Configuration classes for different storage backends.
101
102
```python { .api }
103
class S3Config:
104
def __init__(self,
105
access_key_id: str,
106
secret_access_key: str,
107
region: str = "us-east-1",
108
endpoint_url: Optional[str] = None,
109
use_ssl: bool = True):
110
"""
111
S3 storage configuration.
112
113
Args:
114
access_key_id (str): AWS access key ID
115
secret_access_key (str): AWS secret access key
116
region (str): AWS region
117
endpoint_url (str, optional): Custom S3 endpoint
118
use_ssl (bool): Use SSL/TLS connection
119
"""
120
121
class GCSConfig:
122
def __init__(self,
123
service_account_key: Optional[str] = None,
124
project_id: Optional[str] = None):
125
"""
126
Google Cloud Storage configuration.
127
128
Args:
129
service_account_key (str, optional): Path to service account JSON
130
project_id (str, optional): GCP project ID
131
"""
132
133
class AzureConfig:
134
def __init__(self,
135
account_name: str,
136
account_key: Optional[str] = None,
137
sas_token: Optional[str] = None,
138
connection_string: Optional[str] = None):
139
"""
140
Azure Blob Storage configuration.
141
142
Args:
143
account_name (str): Azure storage account name
144
account_key (str, optional): Account access key
145
sas_token (str, optional): SAS token for access
146
connection_string (str, optional): Full connection string
147
"""
148
```
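
In practice these settings usually come from the environment. A minimal sketch using a stand-in dataclass — `S3Settings` and `s3_settings_from_env` are illustrative names mirroring `S3Config` above, not part of kserve:

```python
import os
from dataclasses import dataclass
from typing import Optional

# Illustrative stand-in with the same fields as S3Config.
@dataclass
class S3Settings:
    access_key_id: str
    secret_access_key: str
    region: str = "us-east-1"
    endpoint_url: Optional[str] = None
    use_ssl: bool = True

def s3_settings_from_env() -> S3Settings:
    # Read the conventional AWS environment variables, with sane defaults.
    return S3Settings(
        access_key_id=os.getenv("AWS_ACCESS_KEY_ID", ""),
        secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", ""),
        region=os.getenv("AWS_DEFAULT_REGION", "us-east-1"),
        endpoint_url=os.getenv("AWS_ENDPOINT_URL"),
    )
```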

### Storage Operations

Low-level storage operations for specific backends.

```python { .api }
from typing import Any, Dict, List

class StorageOperations:
    @staticmethod
    def list_objects(uri: str) -> List[str]:
        """
        List the objects at a storage location.

        Args:
            uri (str): Storage URI

        Returns:
            List[str]: List of object paths
        """

    @staticmethod
    def get_object_metadata(uri: str) -> Dict[str, Any]:
        """
        Get metadata for a storage object.

        Args:
            uri (str): Object URI

        Returns:
            Dict[str, Any]: Object metadata (size, modified_time, etc.)
        """

    @staticmethod
    def delete_object(uri: str) -> bool:
        """
        Delete an object from storage.

        Args:
            uri (str): URI of the object to delete

        Returns:
            bool: True if the deletion succeeded
        """

    @staticmethod
    def copy_object(src_uri: str, dest_uri: str) -> bool:
        """
        Copy an object between storage locations.

        Args:
            src_uri (str): Source object URI
            dest_uri (str): Destination object URI

        Returns:
            bool: True if the copy succeeded
        """
```
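
For the `file://` backend, `list_objects` amounts to a directory walk that returns paths relative to the URI root — the same shape a cloud backend returns as object keys. An illustrative stand-in (`list_local_objects` is a hypothetical name, not the library's function):

```python
import os
import tempfile
from urllib.parse import urlparse

# Illustrative file:// listing: walk the directory tree and return
# paths relative to the URI root, sorted for deterministic output.
def list_local_objects(uri: str) -> list:
    root = urlparse(uri).path
    objects = []
    for dirpath, _, filenames in os.walk(root):
        for name in filenames:
            full = os.path.join(dirpath, name)
            objects.append(os.path.relpath(full, root))
    return sorted(objects)

with tempfile.TemporaryDirectory() as d:
    open(os.path.join(d, "model.joblib"), "w").close()
    os.makedirs(os.path.join(d, "v2"))
    open(os.path.join(d, "v2", "model.joblib"), "w").close()
    print(list_local_objects(f"file://{d}"))
    # ['model.joblib', 'v2/model.joblib']
```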

## Usage Examples

### Basic Model Download

```python
from kserve.storage import Storage
import os

# Download a model from S3
def download_model():
    model_uri = "s3://ml-models/sklearn/iris-classifier/model.joblib"
    local_path = Storage.download(model_uri, dest="/tmp/models/")

    print(f"Model downloaded to: {local_path}")

    # Verify the download
    if os.path.exists(local_path):
        print("Model file exists locally")
        return local_path
    else:
        raise FileNotFoundError("Model download failed")

# Use in a model server
from kserve import Model
import joblib

class MyModel(Model):
    def load(self):
        # Download the model artifacts
        model_path = Storage.download(
            "s3://my-bucket/models/latest/model.joblib"
        )

        # Load the downloaded model
        self.model = joblib.load(model_path)
        self.ready = True
```

### Multi-Backend Storage

```python
from kserve.storage import Storage

def download_from_multiple_sources():
    """Download models from different storage backends."""

    sources = [
        "s3://aws-bucket/tensorflow-model/",
        "gs://gcp-bucket/pytorch-model/",
        "azure://azure-container/sklearn-model/",
        "pvc://model-pvc/xgboost-model/",
        "file:///local/models/lightgbm-model/"
    ]

    local_paths = []
    for uri in sources:
        try:
            path = Storage.download(uri)
            local_paths.append(path)
            print(f"Downloaded: {uri} -> {path}")
        except Exception as e:
            print(f"Failed to download {uri}: {e}")

    return local_paths
```

### Conditional Model Loading

```python
from kserve import Model
from kserve.storage import Storage

class ConditionalModel(Model):
    def __init__(self, name: str, primary_uri: str, fallback_uri: str):
        super().__init__(name)
        self.primary_uri = primary_uri
        self.fallback_uri = fallback_uri

    def load(self):
        """Load the model, falling back to a secondary source if needed."""
        model_path = None

        # Try the primary source first
        try:
            model_path = Storage.download(self.primary_uri)
            print(f"Loaded from primary source: {self.primary_uri}")
        except Exception as e:
            print(f"Primary source failed: {e}")

            # Fall back to the secondary source
            try:
                model_path = Storage.download(self.fallback_uri)
                print(f"Loaded from fallback source: {self.fallback_uri}")
            except Exception as e2:
                raise RuntimeError(f"Both sources failed: {e}, {e2}")

        # Load the model
        self.model = self._load_model_file(model_path)
        self.ready = True

    def _load_model_file(self, path: str):
        """Load the model based on its file extension."""
        if path.endswith('.joblib'):
            import joblib
            return joblib.load(path)
        elif path.endswith('.pkl'):
            import pickle
            with open(path, 'rb') as f:
                return pickle.load(f)
        else:
            raise ValueError(f"Unsupported model format: {path}")
```

### Model Versioning and Caching

```python
from datetime import datetime
import hashlib
import json
import os

import joblib
from kserve import Model
from kserve.storage import Storage

class VersionedModelLoader:
    def __init__(self, cache_dir: str = "/tmp/model_cache"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)

    def get_model_hash(self, uri: str) -> str:
        """Generate a cache key for a model URI."""
        return hashlib.md5(uri.encode()).hexdigest()

    def is_cached(self, uri: str) -> bool:
        """Check whether the model is already cached."""
        cache_path = os.path.join(self.cache_dir, self.get_model_hash(uri))
        return os.path.exists(cache_path)

    def load_cached_model(self, uri: str) -> str:
        """Return the cached model path, downloading on a cache miss."""
        cache_path = os.path.join(self.cache_dir, self.get_model_hash(uri))

        if self.is_cached(uri):
            print(f"Loading cached model: {cache_path}")
            return cache_path
        else:
            # Download and cache
            print(f"Downloading and caching model: {uri}")
            downloaded_path = Storage.download(uri, dest=cache_path)

            # Store metadata alongside the cached artifact
            metadata = {
                "uri": uri,
                "downloaded_at": str(datetime.now()),
                "local_path": downloaded_path
            }

            with open(f"{cache_path}.meta", 'w') as f:
                json.dump(metadata, f)

            return downloaded_path

# Usage in a model server
class CachedModel(Model):
    def __init__(self, name: str, model_uri: str):
        super().__init__(name)
        self.model_uri = model_uri
        self.loader = VersionedModelLoader()

    def load(self):
        model_path = self.loader.load_cached_model(self.model_uri)
        self.model = joblib.load(model_path)
        self.ready = True
```

### Storage with Authentication

```python
from kserve.storage import Storage, S3Config, GCSConfig
import os

def setup_authenticated_storage():
    """Set up storage with authentication."""

    # Configure S3 with credentials
    s3_config = S3Config(
        access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
        region=os.getenv("AWS_DEFAULT_REGION", "us-east-1")
    )

    # Configure GCS with a service account
    gcs_config = GCSConfig(
        service_account_key="/path/to/service-account.json",
        project_id=os.getenv("GCP_PROJECT_ID")
    )

    # Download from authenticated sources
    s3_model = Storage.download(
        "s3://private-bucket/confidential-model/",
        config=s3_config
    )

    gcs_model = Storage.download(
        "gs://private-bucket/secure-model/",
        config=gcs_config
    )

    return s3_model, gcs_model
```

### Batch Model Management

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List
import os

from kserve.storage import Storage, StorageOperations

class BatchModelManager:
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers

    def download_models_batch(self, model_uris: List[str]) -> Dict[str, str]:
        """Download multiple models concurrently."""

        def download_single(uri):
            try:
                path = Storage.download(uri)
                return uri, path
            except Exception as e:
                return uri, f"Error: {e}"

        results = {}
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(download_single, uri) for uri in model_uris]

            for future in as_completed(futures):
                uri, result = future.result()
                results[uri] = result

        return results

    def sync_model_versions(self, base_uri: str, local_dir: str):
        """Sync all model versions from storage."""

        # List all objects at the storage location
        objects = StorageOperations.list_objects(base_uri)

        # Filter for model files
        model_files = [obj for obj in objects if obj.endswith(('.joblib', '.pkl', '.pt'))]

        # Download each model version
        for model_file in model_files:
            full_uri = f"{base_uri.rstrip('/')}/{model_file}"
            local_path = os.path.join(local_dir, model_file)

            try:
                Storage.download(full_uri, dest=local_path)
                print(f"Synced: {model_file}")
            except Exception as e:
                print(f"Failed to sync {model_file}: {e}")

# Usage
manager = BatchModelManager()

# Download multiple models
model_uris = [
    "s3://models/v1/sklearn-model.joblib",
    "s3://models/v2/xgboost-model.bst",
    "gs://models/v3/pytorch-model.pt"
]

results = manager.download_models_batch(model_uris)
for uri, path in results.items():
    print(f"{uri} -> {path}")
```

### Storage Error Handling

```python
from typing import Optional
import logging
import time

import joblib
from kserve import Model
from kserve.storage import Storage, StorageOperations
from kserve.errors import StorageError

class RobustStorageHandler:
    def __init__(self, max_retries: int = 3, retry_delay: float = 1.0):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.logger = logging.getLogger(__name__)

    def download_with_retry(self, uri: str, dest: Optional[str] = None) -> str:
        """Download with automatic retry logic."""

        for attempt in range(self.max_retries):
            try:
                path = Storage.download(uri, dest=dest)
                self.logger.info(f"Successfully downloaded {uri} on attempt {attempt + 1}")
                return path

            except StorageError as e:
                self.logger.warning(f"Attempt {attempt + 1} failed for {uri}: {e}")

                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay * (2 ** attempt))  # Exponential backoff
                else:
                    self.logger.error(f"All {self.max_retries} attempts failed for {uri}")
                    raise

            except Exception as e:
                self.logger.error(f"Unexpected error downloading {uri}: {e}")
                raise

    def validate_storage_access(self, uri: str) -> bool:
        """Validate that a storage URI is accessible."""
        try:
            metadata = StorageOperations.get_object_metadata(uri)
            return metadata is not None
        except Exception:
            return False

# Usage in a model server
class RobustModel(Model):
    def __init__(self, name: str, model_uri: str):
        super().__init__(name)
        self.model_uri = model_uri
        self.storage_handler = RobustStorageHandler()

    def load(self):
        # Validate storage access first
        if not self.storage_handler.validate_storage_access(self.model_uri):
            raise RuntimeError(f"Cannot access storage: {self.model_uri}")

        # Download with retry
        model_path = self.storage_handler.download_with_retry(self.model_uri)

        # Load the model
        self.model = joblib.load(model_path)
        self.ready = True
```

## Storage Backend Support

### Supported Protocols

```python { .api }
SUPPORTED_PROTOCOLS = {
    "s3": "Amazon S3 and S3-compatible storage",
    "gs": "Google Cloud Storage",
    "azure": "Azure Blob Storage",
    "pvc": "Kubernetes Persistent Volume Claims",
    "file": "Local filesystem",
    "http": "HTTP/HTTPS downloads",
    "https": "Secure HTTP downloads",
    "hf": "HuggingFace Hub models"
}
```
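
Dispatching on the URI scheme against this table is straightforward. A sketch with `describe_backend` as a hypothetical helper that validates a URI's protocol before any download is attempted (the table is restated so the snippet runs standalone):

```python
from urllib.parse import urlparse

SUPPORTED_PROTOCOLS = {
    "s3": "Amazon S3 and S3-compatible storage",
    "gs": "Google Cloud Storage",
    "azure": "Azure Blob Storage",
    "pvc": "Kubernetes Persistent Volume Claims",
    "file": "Local filesystem",
    "http": "HTTP/HTTPS downloads",
    "https": "Secure HTTP downloads",
    "hf": "HuggingFace Hub models",
}

def describe_backend(uri: str) -> str:
    # Bare paths without a scheme are treated as local files.
    scheme = urlparse(uri).scheme or "file"
    if scheme not in SUPPORTED_PROTOCOLS:
        raise ValueError(f"Unsupported storage protocol: {scheme}")
    return SUPPORTED_PROTOCOLS[scheme]

print(describe_backend("gs://bucket/model/"))  # Google Cloud Storage
```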

### Authentication Methods

```python { .api }
AUTHENTICATION_METHODS = {
    "s3": ["access_key", "iam_role", "instance_profile"],
    "gs": ["service_account", "default_credentials", "workload_identity"],
    "azure": ["account_key", "sas_token", "managed_identity"],
    "hf": ["access_token", "login"]
}
```
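
A credential resolver typically prefers explicit keys and falls back to ambient credentials. The ordering and environment-variable names below are assumptions for illustration, not documented kserve behavior:

```python
# Illustrative resolution order for S3-style backends: explicit access
# keys win, then an assumed IAM role, then the instance profile.
# The variable names and precedence here are assumptions.
def resolve_s3_auth(env: dict) -> str:
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        return "access_key"
    if env.get("AWS_ROLE_ARN"):
        return "iam_role"
    return "instance_profile"

print(resolve_s3_auth({"AWS_ACCESS_KEY_ID": "AKIA...", "AWS_SECRET_ACCESS_KEY": "x"}))
# access_key
```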

## Exception Classes

Storage-related exceptions that may be raised during storage operations.

```python { .api }
from typing import Optional

class ModelMissingError(Exception):
    def __init__(self, path: str):
        """
        Exception raised when a model file is missing from storage.

        Args:
            path (str): Path to the missing model file
        """

class ModelNotFound(Exception):
    def __init__(self, model_name: Optional[str] = None):
        """
        Exception raised when the requested model does not exist.

        Args:
            model_name (str, optional): Name of the missing model
        """

class InferenceError(RuntimeError):
    def __init__(self, reason: str, status: Optional[str] = None, debug_details: Optional[str] = None):
        """
        Exception raised during inference operations.

        Args:
            reason (str): Error reason
            status (str, optional): Error status code
            debug_details (str, optional): Additional debug information
        """

class InvalidInput(ValueError):
    def __init__(self, reason: str):
        """
        Exception raised for invalid input arguments.

        Args:
            reason (str): Validation error reason
        """
```
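
Catching these exceptions lets a server degrade gracefully instead of crashing on startup. A minimal sketch, with `ModelMissingError` redefined locally so the snippet runs standalone:

```python
import os

# Local redefinition so this sketch runs without kserve installed;
# a real server would import the exception from the library.
class ModelMissingError(Exception):
    def __init__(self, path: str):
        super().__init__(f"Model file is missing: {path}")
        self.path = path

def load_model(path: str) -> str:
    if not os.path.exists(path):
        raise ModelMissingError(path)
    return path

try:
    load_model("/nonexistent/model.joblib")
except ModelMissingError as e:
    print(f"Model missing, falling back: {e.path}")
```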

## Types

```python { .api }
from typing import Any, Dict, List, Optional, Union

StorageURI = str
LocalPath = str
StorageConfig = Union[S3Config, GCSConfig, AzureConfig]
StorageMetadata = Dict[str, Any]
ObjectList = List[str]
```