
# Storage System

Multi-cloud storage abstraction supporting local filesystem, S3, GCS, and Azure with built-in compression, encryption, and performance optimization. Deep Lake's storage layer provides unified access patterns across different storage backends.

## Capabilities

### Storage Reader Operations

Read operations for accessing data from various storage backends with automatic optimization and caching.

```python { .api }
class Reader:
    """Storage read operations."""

    path: str
    original_path: str
    token: Optional[str]

    def get(self, path: str) -> bytes:
        """
        Get data from storage path.

        Parameters:
        - path: Storage path to read from

        Returns:
        bytes: Raw data from storage
        """

    def length(self, path: str) -> int:
        """
        Get length of data at storage path.

        Parameters:
        - path: Storage path to check

        Returns:
        int: Data length in bytes
        """

    def list(self, path: str = "") -> List[str]:
        """
        List items at storage path.

        Parameters:
        - path: Storage path to list (empty for root)

        Returns:
        List[str]: List of item names at path
        """

    def subdir(self, path: str) -> Reader:
        """
        Create reader for subdirectory.

        Parameters:
        - path: Subdirectory path

        Returns:
        Reader: Reader instance for subdirectory
        """
```
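
The reader behaves like a simple key-value interface over the backend. As a minimal sketch, the snippet below assumes a `Reader` instance is already available (how one is obtained is not covered by this reference), and the `chunks` subdirectory name is purely illustrative.

```python
# Minimal sketch of read-side usage. `reader` is assumed to be an existing
# Reader instance for an already-open storage location; how to obtain one
# depends on your Deep Lake setup and is not covered here.

def dump_listing(reader) -> None:
    # Enumerate items at the storage root and report their sizes.
    for name in reader.list():
        size = reader.length(name)  # size in bytes, without fetching the data
        print(f"{name}: {size} bytes")

def read_first_chunk(reader) -> bytes:
    # Scope reads to a subdirectory, then fetch one object as raw bytes.
    chunks = reader.subdir("chunks")
    names = chunks.list()
    return chunks.get(names[0]) if names else b""
```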

### Storage Writer Operations

Write operations for storing data to various storage backends with automatic compression and optimization.

```python { .api }
class Writer:
    """Storage write operations."""

    path: str
    original_path: str
    token: Optional[str]

    def set(self, path: str, data: bytes) -> None:
        """
        Store data at storage path.

        Parameters:
        - path: Storage path to write to
        - data: Raw data to store
        """

    def remove(self, path: str) -> None:
        """
        Remove item at storage path.

        Parameters:
        - path: Storage path to remove
        """

    def remove_directory(self, path: str) -> None:
        """
        Remove directory and all contents.

        Parameters:
        - path: Directory path to remove
        """

    def subdir(self, path: str) -> Writer:
        """
        Create writer for subdirectory.

        Parameters:
        - path: Subdirectory path

        Returns:
        Writer: Writer instance for subdirectory
        """
```
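
Write operations follow the same pattern. The sketch below assumes a `Writer` instance is already in hand and uses a hypothetical `staging` subdirectory to show storing and cleaning up raw objects.

```python
# Minimal sketch of write-side usage. `writer` is assumed to be an existing
# Writer instance for the target storage location; the "staging" path is
# purely illustrative.

def write_staging_files(writer, payloads: dict) -> None:
    # Scope writes to a staging subdirectory and store each payload as raw bytes.
    staging = writer.subdir("staging")
    for name, data in payloads.items():
        staging.set(name, data)

def clear_staging(writer) -> None:
    # Remove the staging area and everything under it.
    writer.remove_directory("staging")
```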

### Storage Metadata

Access metadata information for storage resources including size, timestamps, and ETags.

```python { .api }
class ResourceMeta:
    """Storage resource metadata."""

    path: str
    size: int
    etag: Optional[str]
    last_modified: Optional[str]
```
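
These fields are plain attributes; a typical use is checking whether a remote object has changed since it was last seen. The sketch below only reads the documented attributes and assumes a `ResourceMeta` instance (`meta`) is already available from the storage layer.

```python
# Minimal sketch. `meta` is assumed to be a ResourceMeta instance returned by
# the storage layer for some object; this example only reads its fields.

def describe(meta) -> str:
    parts = [f"{meta.path}: {meta.size} bytes"]
    if meta.etag is not None:
        parts.append(f"etag={meta.etag}")
    if meta.last_modified is not None:
        parts.append(f"modified={meta.last_modified}")
    return ", ".join(parts)

def changed_since(meta, known_etag: str) -> bool:
    # Treat a missing ETag as "may have changed".
    return meta.etag is None or meta.etag != known_etag
```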

### Storage Configuration

Global storage configuration for performance tuning and concurrency control.

```python { .api }
def concurrency() -> int:
    """
    Get current storage thread count.

    Returns:
    int: Number of concurrent storage threads
    """

def set_concurrency(num_threads: int) -> None:
    """
    Set storage thread count for parallel operations.

    Parameters:
    - num_threads: Number of concurrent threads for storage operations
    """
```

## Usage Examples

### Basic Storage Operations

```python
import deeplake

# Access storage directly (usually not needed for normal usage)
# Storage operations are typically handled automatically by datasets

# Get storage configuration
current_threads = deeplake.storage.concurrency()
print(f"Current storage threads: {current_threads}")

# Optimize for high-performance systems
deeplake.storage.set_concurrency(8)
print("Increased storage concurrency for better performance")
```

### Local Filesystem Storage

```python
# Create dataset on local filesystem
dataset = deeplake.create("./local_dataset")

# Deep Lake automatically handles local storage operations
dataset.add_column("data", deeplake.types.Text())
dataset.append({"data": "sample text"})
dataset.commit("Added sample data")

# Storage operations happen transparently
print(f"Dataset stored locally at: {dataset.path}")
```

### S3 Storage Integration

```python
# S3 credentials
s3_creds = {
    "aws_access_key_id": "your_access_key",
    "aws_secret_access_key": "your_secret_key",
    "aws_region": "us-east-1"
}

# Create dataset on S3
s3_dataset = deeplake.create("s3://my-bucket/my-dataset", creds=s3_creds)

# Storage operations work the same across backends
s3_dataset.add_column("images", deeplake.types.Image())
s3_dataset.add_column("labels", deeplake.types.Text())

# Batch upload to S3
batch_data = [
    {"images": f"s3://my-bucket/images/img_{i}.jpg", "labels": f"label_{i}"}
    for i in range(1000)
]

s3_dataset.extend(batch_data)
s3_dataset.commit("Uploaded batch to S3")

print(f"S3 dataset has {len(s3_dataset)} rows")
```

### Google Cloud Storage Integration

```python
# GCS credentials (using service account key)
gcs_creds = {
    "google_application_credentials": "/path/to/service-account-key.json"
}

# Alternative: using service account JSON content
gcs_creds_json = {
    "google_application_credentials_json": {
        "type": "service_account",
        "project_id": "your-project-id",
        "private_key_id": "key-id",
        "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
        "client_email": "service-account@project.iam.gserviceaccount.com",
        "client_id": "client-id",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token"
    }
}

# Create dataset on GCS
gcs_dataset = deeplake.create("gcs://my-bucket/my-dataset", creds=gcs_creds)

# Storage operations are identical across platforms
gcs_dataset.add_column("embeddings", deeplake.types.Embedding(size=768))
gcs_dataset.append({"embeddings": [0.1] * 768})
gcs_dataset.commit("Added embeddings to GCS")
```

### Azure Blob Storage Integration

```python
# Azure credentials
azure_creds = {
    "azure_storage_account": "mystorageaccount",
    "azure_storage_key": "your_storage_key"
}

# Alternative: using connection string
azure_creds_conn = {
    "azure_storage_connection_string": "DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=your_key;EndpointSuffix=core.windows.net"
}

# Alternative: using SAS token
azure_creds_sas = {
    "azure_storage_account": "mystorageaccount",
    "azure_storage_sas_token": "your_sas_token"
}

# Create dataset on Azure
azure_dataset = deeplake.create("azure://my-container/my-dataset", creds=azure_creds)

# Same operations across all cloud providers
azure_dataset.add_column("videos", deeplake.types.Video())
azure_dataset.append({"videos": "azure://my-container/videos/video1.mp4"})
azure_dataset.commit("Added video to Azure")
```

### Multi-Cloud Dataset Management

```python
# Create datasets across multiple cloud providers
datasets = {}

# Local for development
datasets["local"] = deeplake.create("./dev_dataset")

# S3 for production
datasets["s3"] = deeplake.create("s3://prod-bucket/dataset", creds=s3_creds)

# GCS for backup
datasets["gcs"] = deeplake.create("gcs://backup-bucket/dataset", creds=gcs_creds)

# Same schema across all datasets
for name, dataset in datasets.items():
    dataset.add_column("id", deeplake.types.Int64())
    dataset.add_column("data", deeplake.types.Text())
    dataset.add_column("timestamp", deeplake.types.Int64())

    # Add sample data
    dataset.append({
        "id": 1,
        "data": f"Sample data in {name}",
        "timestamp": 1640995200  # Unix timestamp
    })

    dataset.commit(f"Initial data in {name}")
    print(f"Created {name} dataset with {len(dataset)} rows")

# Copy data between cloud providers
deeplake.copy("./dev_dataset", "s3://prod-bucket/dev-copy", dst_creds=s3_creds)
print("Copied local dataset to S3")
```

### Storage Performance Optimization

```python
import time

# Measure storage performance
def benchmark_storage_operations(dataset, num_operations=100):
    start_time = time.time()

    # Batch operations for better performance
    batch_data = [
        {"data": f"sample_{i}", "value": i * 0.1}
        for i in range(num_operations)
    ]

    dataset.extend(batch_data)
    dataset.commit(f"Added {num_operations} rows")

    end_time = time.time()
    return end_time - start_time

# Test with different storage backends
s3_dataset = deeplake.create("s3://benchmark-bucket/s3-test", creds=s3_creds)
s3_dataset.add_column("data", deeplake.types.Text())
s3_dataset.add_column("value", deeplake.types.Float32())

# Optimize storage concurrency for benchmarking
original_concurrency = deeplake.storage.concurrency()
deeplake.storage.set_concurrency(16)  # Increase for high-throughput

s3_time = benchmark_storage_operations(s3_dataset, 1000)
print(f"S3 operations took {s3_time:.2f} seconds")

# Restore original concurrency
deeplake.storage.set_concurrency(original_concurrency)
```

### Storage Error Handling

```python
# Robust storage operations with error handling
def safe_dataset_operation(dataset_url, creds, operation_func):
    try:
        dataset = deeplake.open(dataset_url, creds=creds)
        result = operation_func(dataset)
        return result

    except deeplake.StorageAccessDenied:
        print("Storage access denied - check credentials")
        return None

    except deeplake.StorageKeyNotFound:
        print("Dataset not found - check URL")
        return None

    except deeplake.StorageNetworkConnectionError:
        print("Network connection error - check connectivity")
        return None

    except deeplake.StorageInternalError:
        print("Storage internal error - try again later")
        return None

# Safe wrapper for adding data
def add_data_safely(dataset_url, creds, data):
    def add_data_operation(dataset):
        dataset.extend(data)
        dataset.commit("Added data safely")
        return len(dataset)

    result = safe_dataset_operation(dataset_url, creds, add_data_operation)
    if result is not None:
        print(f"Successfully added data. Dataset now has {result} rows")
    else:
        print("Failed to add data")

# Example usage
sample_data = [{"text": f"sample_{i}"} for i in range(10)]
add_data_safely("s3://my-bucket/safe-dataset", s3_creds, sample_data)
```

### Storage Monitoring and Metrics

```python
import time

# Monitor storage performance and usage
class StorageMonitor:
    def __init__(self):
        self.operations = []

    def time_operation(self, operation_name, operation_func):
        start_time = time.time()
        try:
            result = operation_func()
            end_time = time.time()
            duration = end_time - start_time

            self.operations.append({
                "operation": operation_name,
                "duration": duration,
                "success": True,
                "timestamp": start_time
            })

            return result

        except Exception as e:
            end_time = time.time()
            duration = end_time - start_time

            self.operations.append({
                "operation": operation_name,
                "duration": duration,
                "success": False,
                "error": str(e),
                "timestamp": start_time
            })

            raise

    def get_stats(self):
        if not self.operations:
            return {"message": "No operations recorded"}

        successful_ops = [op for op in self.operations if op["success"]]
        failed_ops = [op for op in self.operations if not op["success"]]

        avg_duration = sum(op["duration"] for op in successful_ops) / len(successful_ops) if successful_ops else 0

        return {
            "total_operations": len(self.operations),
            "successful": len(successful_ops),
            "failed": len(failed_ops),
            "average_duration": avg_duration,
            "success_rate": len(successful_ops) / len(self.operations) * 100
        }

# Usage
monitor = StorageMonitor()

# Monitor dataset creation
dataset = monitor.time_operation(
    "create_dataset",
    lambda: deeplake.create("s3://monitor-bucket/test-dataset", creds=s3_creds)
)

# Monitor data operations
monitor.time_operation(
    "add_column",
    lambda: dataset.add_column("data", deeplake.types.Text())
)

monitor.time_operation(
    "append_data",
    lambda: dataset.append({"data": "test data"})
)

monitor.time_operation(
    "commit",
    lambda: dataset.commit("Test commit")
)

# Get performance statistics
stats = monitor.get_stats()
print(f"Storage operations statistics: {stats}")
```

### Advanced Storage Configuration

```python
# Configure storage for different use cases

# High-throughput configuration
def configure_for_high_throughput():
    # Increase concurrency for parallel operations
    deeplake.storage.set_concurrency(32)
    print("Configured for high-throughput operations")

# Memory-efficient configuration
def configure_for_memory_efficiency():
    # Reduce concurrency to save memory
    deeplake.storage.set_concurrency(2)
    print("Configured for memory efficiency")

# Balanced configuration
def configure_balanced():
    # Moderate concurrency for balanced performance
    deeplake.storage.set_concurrency(8)
    print("Configured for balanced performance")

# Apply configuration based on use case
import psutil

# Auto-configure based on system resources
available_cores = psutil.cpu_count()
available_memory_gb = psutil.virtual_memory().total / (1024**3)

if available_cores >= 16 and available_memory_gb >= 32:
    configure_for_high_throughput()
elif available_memory_gb < 8:
    configure_for_memory_efficiency()
else:
    configure_balanced()

print(f"System: {available_cores} cores, {available_memory_gb:.1f}GB RAM")
print(f"Storage concurrency: {deeplake.storage.concurrency()}")
```