<!-- docs/storage.md -->

# Storage Integration

Unified storage interface supporting multiple cloud providers and local storage for model artifact management. The storage system abstracts different storage backends and provides seamless model downloading and uploading capabilities.

## Capabilities

### Storage Interface

Unified interface for accessing model artifacts from various storage backends.

```python { .api }
from kserve.storage import Storage

class Storage:
    @staticmethod
    def download(uri: str, dest: str = None) -> str:
        """
        Download model artifacts from storage URI.

        Args:
            uri (str): Storage URI (s3://, gs://, azure://, pvc://, file://)
            dest (str, optional): Local destination path

        Returns:
            str: Local path to downloaded artifacts

        Raises:
            StorageError: If download fails
        """

    @staticmethod
    def upload(src: str, uri: str) -> bool:
        """
        Upload local artifacts to storage URI.

        Args:
            src (str): Local source path
            uri (str): Destination storage URI

        Returns:
            bool: True if upload successful

        Raises:
            StorageError: If upload fails
        """

    @staticmethod
    def is_local(uri: str) -> bool:
        """
        Check if URI points to local storage.

        Args:
            uri (str): Storage URI to check

        Returns:
            bool: True if local storage
        """

    @staticmethod
    def get_storage_spec(uri: str) -> Dict[str, Any]:
        """
        Parse storage URI and return specification.

        Args:
            uri (str): Storage URI

        Returns:
            Dict[str, Any]: Storage specification with type, bucket, path, etc.
        """
```

### Storage URI Formats

Supported storage URI formats for different backends.

```python { .api }
# Amazon S3 URIs
S3_URI_FORMAT = "s3://bucket-name/path/to/model/"

# Google Cloud Storage URIs
GCS_URI_FORMAT = "gs://bucket-name/path/to/model/"

# Azure Blob Storage URIs
AZURE_URI_FORMAT = "azure://container-name/path/to/model/"

# Persistent Volume Claim URIs
PVC_URI_FORMAT = "pvc://pvc-name/path/to/model/"

# Local filesystem URIs
LOCAL_URI_FORMAT = "file:///path/to/model/"

# HTTP/HTTPS URIs
HTTP_URI_FORMAT = "https://example.com/model.tar.gz"

# HuggingFace Hub URIs
HF_URI_FORMAT = "hf://username/model-name"
```

### Storage Configuration

Configuration classes for different storage backends.

```python { .api }
class S3Config:
    def __init__(self,
                 access_key_id: str,
                 secret_access_key: str,
                 region: str = "us-east-1",
                 endpoint_url: Optional[str] = None,
                 use_ssl: bool = True):
        """
        S3 storage configuration.

        Args:
            access_key_id (str): AWS access key ID
            secret_access_key (str): AWS secret access key
            region (str): AWS region
            endpoint_url (str, optional): Custom S3 endpoint
            use_ssl (bool): Use SSL/TLS connection
        """

class GCSConfig:
    def __init__(self,
                 service_account_key: Optional[str] = None,
                 project_id: Optional[str] = None):
        """
        Google Cloud Storage configuration.

        Args:
            service_account_key (str, optional): Path to service account JSON
            project_id (str, optional): GCP project ID
        """

class AzureConfig:
    def __init__(self,
                 account_name: str,
                 account_key: Optional[str] = None,
                 sas_token: Optional[str] = None,
                 connection_string: Optional[str] = None):
        """
        Azure Blob Storage configuration.

        Args:
            account_name (str): Azure storage account name
            account_key (str, optional): Account access key
            sas_token (str, optional): SAS token for access
            connection_string (str, optional): Full connection string
        """
```

### Storage Operations

Low-level storage operations for specific backends.

```python { .api }
class StorageOperations:
    @staticmethod
    def list_objects(uri: str) -> List[str]:
        """
        List objects in storage location.

        Args:
            uri (str): Storage URI

        Returns:
            List[str]: List of object paths
        """

    @staticmethod
    def get_object_metadata(uri: str) -> Dict[str, Any]:
        """
        Get metadata for storage object.

        Args:
            uri (str): Object URI

        Returns:
            Dict[str, Any]: Object metadata (size, modified_time, etc.)
        """

    @staticmethod
    def delete_object(uri: str) -> bool:
        """
        Delete object from storage.

        Args:
            uri (str): Object URI to delete

        Returns:
            bool: True if deletion successful
        """

    @staticmethod
    def copy_object(src_uri: str, dest_uri: str) -> bool:
        """
        Copy object between storage locations.

        Args:
            src_uri (str): Source object URI
            dest_uri (str): Destination object URI

        Returns:
            bool: True if copy successful
        """
```

## Usage Examples

### Basic Model Download

```python
from kserve.storage import Storage
import os

# Download model from S3
def download_model():
    model_uri = "s3://ml-models/sklearn/iris-classifier/model.joblib"
    local_path = Storage.download(model_uri, dest="/tmp/models/")

    print(f"Model downloaded to: {local_path}")

    # Verify download
    if os.path.exists(local_path):
        print("Model file exists locally")
        return local_path
    else:
        raise FileNotFoundError("Model download failed")

# Use in model server
from kserve import Model
import joblib

class MyModel(Model):
    def load(self):
        # Download model automatically
        model_path = Storage.download(
            "s3://my-bucket/models/latest/model.joblib"
        )

        # Load the downloaded model
        self.model = joblib.load(model_path)
        self.ready = True
```

### Multi-Backend Storage

```python
from kserve.storage import Storage

def download_from_multiple_sources():
    """Download models from different storage backends."""

    sources = [
        "s3://aws-bucket/tensorflow-model/",
        "gs://gcp-bucket/pytorch-model/",
        "azure://azure-container/sklearn-model/",
        "pvc://model-pvc/xgboost-model/",
        "file:///local/models/lightgbm-model/"
    ]

    local_paths = []
    for uri in sources:
        try:
            path = Storage.download(uri)
            local_paths.append(path)
            print(f"Downloaded: {uri} -> {path}")
        except Exception as e:
            print(f"Failed to download {uri}: {e}")

    return local_paths
```

### Conditional Model Loading

```python
from kserve.storage import Storage
import os

class ConditionalModel(Model):
    def __init__(self, name: str, primary_uri: str, fallback_uri: str):
        super().__init__(name)
        self.primary_uri = primary_uri
        self.fallback_uri = fallback_uri

    def load(self):
        """Load model with fallback option."""
        model_path = None

        # Try primary source first
        try:
            model_path = Storage.download(self.primary_uri)
            print(f"Loaded from primary source: {self.primary_uri}")
        except Exception as e:
            print(f"Primary source failed: {e}")

            # Fallback to secondary source
            try:
                model_path = Storage.download(self.fallback_uri)
                print(f"Loaded from fallback source: {self.fallback_uri}")
            except Exception as e2:
                raise RuntimeError(f"Both sources failed: {e}, {e2}")

        # Load the model
        self.model = self._load_model_file(model_path)
        self.ready = True

    def _load_model_file(self, path: str):
        """Load model based on file extension."""
        if path.endswith('.joblib'):
            import joblib
            return joblib.load(path)
        elif path.endswith('.pkl'):
            import pickle
            with open(path, 'rb') as f:
                return pickle.load(f)
        else:
            raise ValueError(f"Unsupported model format: {path}")
```

### Model Versioning and Caching

```python
from kserve.storage import Storage
import hashlib
import os
import json

class VersionedModelLoader:
    def __init__(self, cache_dir: str = "/tmp/model_cache"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)

    def get_model_hash(self, uri: str) -> str:
        """Generate hash for model URI."""
        return hashlib.md5(uri.encode()).hexdigest()

    def is_cached(self, uri: str) -> bool:
        """Check if model is already cached."""
        model_hash = self.get_model_hash(uri)
        cache_path = os.path.join(self.cache_dir, model_hash)
        return os.path.exists(cache_path)

    def load_cached_model(self, uri: str) -> str:
        """Load model from cache."""
        model_hash = self.get_model_hash(uri)
        cache_path = os.path.join(self.cache_dir, model_hash)

        if self.is_cached(uri):
            print(f"Loading cached model: {cache_path}")
            return cache_path
        else:
            # Download and cache
            print(f"Downloading and caching model: {uri}")
            downloaded_path = Storage.download(uri, dest=cache_path)

            # Store metadata
            metadata = {
                "uri": uri,
                "downloaded_at": str(datetime.now()),
                "local_path": downloaded_path
            }

            with open(f"{cache_path}.meta", 'w') as f:
                json.dump(metadata, f)

            return downloaded_path

# Usage in model server
class CachedModel(Model):
    def __init__(self, name: str, model_uri: str):
        super().__init__(name)
        self.model_uri = model_uri
        self.loader = VersionedModelLoader()

    def load(self):
        model_path = self.loader.load_cached_model(self.model_uri)
        self.model = joblib.load(model_path)
        self.ready = True
```

### Storage with Authentication

```python
from kserve.storage import Storage, S3Config, GCSConfig
import os

def setup_authenticated_storage():
    """Set up storage with authentication."""

    # Configure S3 with credentials
    s3_config = S3Config(
        access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
        region=os.getenv("AWS_DEFAULT_REGION", "us-east-1")
    )

    # Configure GCS with service account
    gcs_config = GCSConfig(
        service_account_key="/path/to/service-account.json",
        project_id=os.getenv("GCP_PROJECT_ID")
    )

    # Download from authenticated sources
    s3_model = Storage.download(
        "s3://private-bucket/confidential-model/",
        config=s3_config
    )

    gcs_model = Storage.download(
        "gs://private-bucket/secure-model/",
        config=gcs_config
    )

    return s3_model, gcs_model
```

### Batch Model Management

```python
from kserve.storage import Storage, StorageOperations
from concurrent.futures import ThreadPoolExecutor
import threading

class BatchModelManager:
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers

    def download_models_batch(self, model_uris: List[str]) -> Dict[str, str]:
        """Download multiple models concurrently."""

        def download_single(uri):
            try:
                path = Storage.download(uri)
                return uri, path
            except Exception as e:
                return uri, f"Error: {e}"

        results = {}
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(download_single, uri): uri for uri in model_uris}

            for future in futures:
                uri, result = future.result()
                results[uri] = result

        return results

    def sync_model_versions(self, base_uri: str, local_dir: str):
        """Sync all model versions from storage."""

        # List all objects in storage
        objects = StorageOperations.list_objects(base_uri)

        # Filter for model files
        model_files = [obj for obj in objects if obj.endswith(('.joblib', '.pkl', '.pt'))]

        # Download each model version
        for model_file in model_files:
            full_uri = f"{base_uri.rstrip('/')}/{model_file}"
            local_path = os.path.join(local_dir, model_file)

            try:
                Storage.download(full_uri, dest=local_path)
                print(f"Synced: {model_file}")
            except Exception as e:
                print(f"Failed to sync {model_file}: {e}")

# Usage
manager = BatchModelManager()

# Download multiple models
model_uris = [
    "s3://models/v1/sklearn-model.joblib",
    "s3://models/v2/xgboost-model.bst",
    "gs://models/v3/pytorch-model.pt"
]

results = manager.download_models_batch(model_uris)
for uri, path in results.items():
    print(f"{uri} -> {path}")
```

### Storage Error Handling

```python
from kserve.storage import Storage
from kserve.errors import StorageError
import logging
import time

class RobustStorageHandler:
    def __init__(self, max_retries: int = 3, retry_delay: float = 1.0):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.logger = logging.getLogger(__name__)

    def download_with_retry(self, uri: str, dest: str = None) -> str:
        """Download with automatic retry logic."""

        for attempt in range(self.max_retries):
            try:
                path = Storage.download(uri, dest=dest)
                self.logger.info(f"Successfully downloaded {uri} on attempt {attempt + 1}")
                return path

            except StorageError as e:
                self.logger.warning(f"Attempt {attempt + 1} failed for {uri}: {e}")

                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay * (2 ** attempt))  # Exponential backoff
                else:
                    self.logger.error(f"All {self.max_retries} attempts failed for {uri}")
                    raise

            except Exception as e:
                self.logger.error(f"Unexpected error downloading {uri}: {e}")
                raise

    def validate_storage_access(self, uri: str) -> bool:
        """Validate that storage URI is accessible."""
        try:
            metadata = StorageOperations.get_object_metadata(uri)
            return metadata is not None
        except Exception:
            return False

# Usage in model server
class RobustModel(Model):
    def __init__(self, name: str, model_uri: str):
        super().__init__(name)
        self.model_uri = model_uri
        self.storage_handler = RobustStorageHandler()

    def load(self):
        # Validate storage access first
        if not self.storage_handler.validate_storage_access(self.model_uri):
            raise RuntimeError(f"Cannot access storage: {self.model_uri}")

        # Download with retry
        model_path = self.storage_handler.download_with_retry(self.model_uri)

        # Load model
        self.model = joblib.load(model_path)
        self.ready = True
```

## Storage Backend Support

### Supported Protocols

```python { .api }
SUPPORTED_PROTOCOLS = {
    "s3": "Amazon S3 and S3-compatible storage",
    "gs": "Google Cloud Storage",
    "azure": "Azure Blob Storage",
    "pvc": "Kubernetes Persistent Volume Claims",
    "file": "Local filesystem",
    "http": "HTTP/HTTPS downloads",
    "https": "Secure HTTP downloads",
    "hf": "HuggingFace Hub models"
}
```

### Authentication Methods

```python { .api }
AUTHENTICATION_METHODS = {
    "s3": ["access_key", "iam_role", "instance_profile"],
    "gs": ["service_account", "default_credentials", "workload_identity"],
    "azure": ["account_key", "sas_token", "managed_identity"],
    "hf": ["access_token", "login"]
}
```

## Exception Classes

Storage-related exceptions that may be raised during storage operations.

```python { .api }
class ModelMissingError(Exception):
    def __init__(self, path: str):
        """
        Exception raised when model file is missing from storage.

        Args:
            path (str): Path to missing model file
        """

class ModelNotFound(Exception):
    def __init__(self, model_name: str = None):
        """
        Exception raised when requested model does not exist.

        Args:
            model_name (str, optional): Name of missing model
        """

class InferenceError(RuntimeError):
    def __init__(self, reason: str, status: str = None, debug_details: str = None):
        """
        Exception raised during inference operations.

        Args:
            reason (str): Error reason
            status (str, optional): Error status code
            debug_details (str, optional): Additional debug information
        """

class InvalidInput(ValueError):
    def __init__(self, reason: str):
        """
        Exception raised for invalid input arguments.

        Args:
            reason (str): Validation error reason
        """
```

## Types

```python { .api }
from typing import Dict, Any, List, Optional, Union

StorageURI = str
LocalPath = str
StorageConfig = Union[S3Config, GCSConfig, AzureConfig]
StorageMetadata = Dict[str, Any]
ObjectList = List[str]
```