# Storage System

A multi-cloud storage abstraction supporting the local filesystem, S3, GCS, and Azure, with built-in compression, encryption, and performance optimization. Deep Lake's storage layer provides unified access patterns across these backends.
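Because the backend is selected by the path's URL scheme, the same `deeplake.create` / `deeplake.open` calls work unchanged across providers. The sketch below is a minimal illustration; the bucket and container names are placeholders, and the credential dictionaries for the cloud paths are shown in the backend-specific examples later in this document.

```python
import deeplake

# The URL scheme selects the storage backend; the call is otherwise identical.
# Cloud paths additionally take a `creds` dict (see the backend examples below).
local_ds = deeplake.create("./local_dataset")  # local filesystem
# s3_ds    = deeplake.create("s3://my-bucket/dataset", creds=s3_creds)           # Amazon S3
# gcs_ds   = deeplake.create("gcs://my-bucket/dataset", creds=gcs_creds)         # Google Cloud Storage
# azure_ds = deeplake.create("azure://my-container/dataset", creds=azure_creds)  # Azure Blob Storage
```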
## Capabilities

### Storage Reader Operations

Read operations for accessing data from various storage backends with automatic optimization and caching.

```python { .api }
class Reader:
    """Storage read operations."""

    path: str
    original_path: str
    token: Optional[str]

    def get(self, path: str) -> bytes:
        """
        Get data from storage path.

        Parameters:
        - path: Storage path to read from

        Returns:
        bytes: Raw data from storage
        """

    def length(self, path: str) -> int:
        """
        Get length of data at storage path.

        Parameters:
        - path: Storage path to check

        Returns:
        int: Data length in bytes
        """

    def list(self, path: str = "") -> List[str]:
        """
        List items at storage path.

        Parameters:
        - path: Storage path to list (empty for root)

        Returns:
        List[str]: List of item names at path
        """

    def subdir(self, path: str) -> Reader:
        """
        Create reader for subdirectory.

        Parameters:
        - path: Subdirectory path

        Returns:
        Reader: Reader instance for subdirectory
        """
```
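This section does not show how a `Reader` is constructed, so the sketch below assumes a `reader` instance has already been obtained from the storage layer (an assumption, not a documented call) and exercises only the methods listed above. The object keys are hypothetical.

```python
def inspect_storage(reader) -> None:
    # Assumption: `reader` is a deeplake storage Reader; how it is obtained
    # is not covered in this section.
    for name in reader.list():      # list items at the reader's root path
        size = reader.length(name)  # size in bytes, without downloading
        print(f"{name}: {size} bytes")

    # Scope a reader to a subdirectory and read raw bytes from it
    # ("chunks" and "0" are hypothetical key names).
    chunks = reader.subdir("chunks")
    raw: bytes = chunks.get("0")
    print(f"Read {len(raw)} bytes from chunks/0")
```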
### Storage Writer Operations

Write operations for storing data to various storage backends with automatic compression and optimization.

```python { .api }
class Writer:
    """Storage write operations."""

    path: str
    original_path: str
    token: Optional[str]

    def set(self, path: str, data: bytes) -> None:
        """
        Store data at storage path.

        Parameters:
        - path: Storage path to write to
        - data: Raw data to store
        """

    def remove(self, path: str) -> None:
        """
        Remove item at storage path.

        Parameters:
        - path: Storage path to remove
        """

    def remove_directory(self, path: str) -> None:
        """
        Remove directory and all contents.

        Parameters:
        - path: Directory path to remove
        """

    def subdir(self, path: str) -> Writer:
        """
        Create writer for subdirectory.

        Parameters:
        - path: Subdirectory path

        Returns:
        Writer: Writer instance for subdirectory
        """
```
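As with `Reader`, constructing a `Writer` is not shown here; the sketch below assumes a `writer` instance is already available (an assumption) and uses only the methods documented above, with hypothetical key names.

```python
def stage_payload(writer, payload: bytes) -> None:
    # Assumption: `writer` is a deeplake storage Writer; how it is obtained
    # is not covered in this section.
    scratch = writer.subdir("scratch")   # writer scoped to a sub-prefix
    scratch.set("payload.bin", payload)  # store raw bytes at scratch/payload.bin

    # Clean up: remove the single object, then the whole prefix.
    scratch.remove("payload.bin")
    writer.remove_directory("scratch")
```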
### Storage Metadata

Access metadata information for storage resources including size, timestamps, and ETags.

```python { .api }
class ResourceMeta:
    """Storage resource metadata."""

    path: str
    size: int
    etag: Optional[str]
    last_modified: Optional[str]
```
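The calls that return a `ResourceMeta` are not listed in this section, so the helper below simply formats the documented fields of an instance obtained elsewhere (an assumption).

```python
def describe(meta) -> str:
    # Assumption: `meta` is a ResourceMeta instance; only the fields
    # documented above are used.
    return (
        f"{meta.path}: {meta.size} bytes, "
        f"etag={meta.etag or 'n/a'}, last_modified={meta.last_modified or 'unknown'}"
    )
```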
### Storage Configuration

Global storage configuration for performance tuning and concurrency control.

```python { .api }
def concurrency() -> int:
    """
    Get current storage thread count.

    Returns:
    int: Number of concurrent storage threads
    """

def set_concurrency(num_threads: int) -> None:
    """
    Set storage thread count for parallel operations.

    Parameters:
    - num_threads: Number of concurrent threads for storage operations
    """
```
## Usage Examples

### Basic Storage Operations

```python
import deeplake

# Access storage directly (usually not needed for normal usage)
# Storage operations are typically handled automatically by datasets

# Get storage configuration
current_threads = deeplake.storage.concurrency()
print(f"Current storage threads: {current_threads}")

# Optimize for high-performance systems
deeplake.storage.set_concurrency(8)
print("Increased storage concurrency for better performance")
```
### Local Filesystem Storage

```python
# Create dataset on local filesystem
dataset = deeplake.create("./local_dataset")

# Deep Lake automatically handles local storage operations
dataset.add_column("data", deeplake.types.Text())
dataset.append({"data": "sample text"})
dataset.commit("Added sample data")

# Storage operations happen transparently
print(f"Dataset stored locally at: {dataset.path}")
```
### S3 Storage Integration

```python
# S3 credentials
s3_creds = {
    "aws_access_key_id": "your_access_key",
    "aws_secret_access_key": "your_secret_key",
    "aws_region": "us-east-1"
}

# Create dataset on S3
s3_dataset = deeplake.create("s3://my-bucket/my-dataset", creds=s3_creds)

# Storage operations work the same across backends
s3_dataset.add_column("images", deeplake.types.Image())
s3_dataset.add_column("labels", deeplake.types.Text())

# Batch upload to S3
batch_data = [
    {"images": f"s3://my-bucket/images/img_{i}.jpg", "labels": f"label_{i}"}
    for i in range(1000)
]

s3_dataset.extend(batch_data)
s3_dataset.commit("Uploaded batch to S3")

print(f"S3 dataset has {len(s3_dataset)} rows")
```
### Google Cloud Storage Integration

```python
# GCS credentials (using service account key)
gcs_creds = {
    "google_application_credentials": "/path/to/service-account-key.json"
}

# Alternative: using service account JSON content
gcs_creds_json = {
    "google_application_credentials_json": {
        "type": "service_account",
        "project_id": "your-project-id",
        "private_key_id": "key-id",
        "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
        "client_email": "service-account@project.iam.gserviceaccount.com",
        "client_id": "client-id",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token"
    }
}

# Create dataset on GCS
gcs_dataset = deeplake.create("gcs://my-bucket/my-dataset", creds=gcs_creds)

# Storage operations are identical across platforms
gcs_dataset.add_column("embeddings", deeplake.types.Embedding(size=768))
gcs_dataset.append({"embeddings": [0.1] * 768})
gcs_dataset.commit("Added embeddings to GCS")
```
### Azure Blob Storage Integration

```python
# Azure credentials
azure_creds = {
    "azure_storage_account": "mystorageaccount",
    "azure_storage_key": "your_storage_key"
}

# Alternative: using a connection string
azure_creds_conn = {
    "azure_storage_connection_string": "DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=your_key;EndpointSuffix=core.windows.net"
}

# Alternative: using a SAS token
azure_creds_sas = {
    "azure_storage_account": "mystorageaccount",
    "azure_storage_sas_token": "your_sas_token"
}

# Create dataset on Azure
azure_dataset = deeplake.create("azure://my-container/my-dataset", creds=azure_creds)

# Same operations across all cloud providers
azure_dataset.add_column("videos", deeplake.types.Video())
azure_dataset.append({"videos": "azure://my-container/videos/video1.mp4"})
azure_dataset.commit("Added video to Azure")
```
### Multi-Cloud Dataset Management

```python
# Create datasets across multiple cloud providers
datasets = {}

# Local for development
datasets["local"] = deeplake.create("./dev_dataset")

# S3 for production
datasets["s3"] = deeplake.create("s3://prod-bucket/dataset", creds=s3_creds)

# GCS for backup
datasets["gcs"] = deeplake.create("gcs://backup-bucket/dataset", creds=gcs_creds)

# Same schema across all datasets
for name, dataset in datasets.items():
    dataset.add_column("id", deeplake.types.Int64())
    dataset.add_column("data", deeplake.types.Text())
    dataset.add_column("timestamp", deeplake.types.Int64())

    # Add sample data
    dataset.append({
        "id": 1,
        "data": f"Sample data in {name}",
        "timestamp": 1640995200  # Unix timestamp
    })

    dataset.commit(f"Initial data in {name}")
    print(f"Created {name} dataset with {len(dataset)} rows")

# Copy data between cloud providers
deeplake.copy("./dev_dataset", "s3://prod-bucket/dev-copy", dst_creds=s3_creds)
print("Copied local dataset to S3")
```
### Storage Performance Optimization

```python
import time

# Measure storage performance
def benchmark_storage_operations(dataset, num_operations=100):
    start_time = time.time()

    # Batch operations for better performance
    batch_data = [
        {"data": f"sample_{i}", "value": i * 0.1}
        for i in range(num_operations)
    ]

    dataset.extend(batch_data)
    dataset.commit(f"Added {num_operations} rows")

    end_time = time.time()
    return end_time - start_time

# Test with different storage backends
s3_dataset = deeplake.create("s3://benchmark-bucket/s3-test", creds=s3_creds)
s3_dataset.add_column("data", deeplake.types.Text())
s3_dataset.add_column("value", deeplake.types.Float32())

# Optimize storage concurrency for benchmarking
original_concurrency = deeplake.storage.concurrency()
deeplake.storage.set_concurrency(16)  # Increase for high-throughput workloads

s3_time = benchmark_storage_operations(s3_dataset, 1000)
print(f"S3 operations took {s3_time:.2f} seconds")

# Restore original concurrency
deeplake.storage.set_concurrency(original_concurrency)
```
### Storage Error Handling

```python
# Robust storage operations with error handling
def safe_dataset_operation(dataset_url, creds, operation_func):
    try:
        dataset = deeplake.open(dataset_url, creds=creds)
        result = operation_func(dataset)
        return result

    except deeplake.StorageAccessDenied:
        print("Storage access denied - check credentials")
        return None

    except deeplake.StorageKeyNotFound:
        print("Dataset not found - check URL")
        return None

    except deeplake.StorageNetworkConnectionError:
        print("Network connection error - check connectivity")
        return None

    except deeplake.StorageInternalError:
        print("Storage internal error - try again later")
        return None

# Wrap a write operation in the safe helper
def add_data_safely(dataset_url, creds, data):
    def add_data_operation(dataset):
        dataset.extend(data)
        dataset.commit("Added data safely")
        return len(dataset)

    result = safe_dataset_operation(dataset_url, creds, add_data_operation)
    if result is not None:
        print(f"Successfully added data. Dataset now has {result} rows")
    else:
        print("Failed to add data")

# Example usage
sample_data = [{"text": f"sample_{i}"} for i in range(10)]
add_data_safely("s3://my-bucket/safe-dataset", s3_creds, sample_data)
```
### Storage Monitoring and Metrics

```python
import time

# Monitor storage performance and usage
class StorageMonitor:
    def __init__(self):
        self.operations = []

    def time_operation(self, operation_name, operation_func):
        start_time = time.time()
        try:
            result = operation_func()
            duration = time.time() - start_time

            self.operations.append({
                "operation": operation_name,
                "duration": duration,
                "success": True,
                "timestamp": start_time
            })

            return result

        except Exception as e:
            duration = time.time() - start_time

            self.operations.append({
                "operation": operation_name,
                "duration": duration,
                "success": False,
                "error": str(e),
                "timestamp": start_time
            })

            raise

    def get_stats(self):
        if not self.operations:
            return {"message": "No operations recorded"}

        successful_ops = [op for op in self.operations if op["success"]]
        failed_ops = [op for op in self.operations if not op["success"]]

        avg_duration = sum(op["duration"] for op in successful_ops) / len(successful_ops) if successful_ops else 0

        return {
            "total_operations": len(self.operations),
            "successful": len(successful_ops),
            "failed": len(failed_ops),
            "average_duration": avg_duration,
            "success_rate": len(successful_ops) / len(self.operations) * 100
        }

# Usage
monitor = StorageMonitor()

# Monitor dataset creation
dataset = monitor.time_operation(
    "create_dataset",
    lambda: deeplake.create("s3://monitor-bucket/test-dataset", creds=s3_creds)
)

# Monitor data operations
monitor.time_operation(
    "add_column",
    lambda: dataset.add_column("data", deeplake.types.Text())
)

monitor.time_operation(
    "append_data",
    lambda: dataset.append({"data": "test data"})
)

monitor.time_operation(
    "commit",
    lambda: dataset.commit("Test commit")
)

# Get performance statistics
stats = monitor.get_stats()
print(f"Storage operations statistics: {stats}")
```
### Advanced Storage Configuration

```python
# Configure storage for different use cases

# High-throughput configuration
def configure_for_high_throughput():
    # Increase concurrency for parallel operations
    deeplake.storage.set_concurrency(32)
    print("Configured for high-throughput operations")

# Memory-efficient configuration
def configure_for_memory_efficiency():
    # Reduce concurrency to save memory
    deeplake.storage.set_concurrency(2)
    print("Configured for memory efficiency")

# Balanced configuration
def configure_balanced():
    # Moderate concurrency for balanced performance
    deeplake.storage.set_concurrency(8)
    print("Configured for balanced performance")

# Apply configuration based on use case
import psutil

# Auto-configure based on system resources
available_cores = psutil.cpu_count()
available_memory_gb = psutil.virtual_memory().total / (1024**3)

if available_cores >= 16 and available_memory_gb >= 32:
    configure_for_high_throughput()
elif available_memory_gb < 8:
    configure_for_memory_efficiency()
else:
    configure_balanced()

print(f"System: {available_cores} cores, {available_memory_gb:.1f}GB RAM")
print(f"Storage concurrency: {deeplake.storage.concurrency()}")
```