Database for AI powered by a storage format optimized for deep-learning applications.
75
Evaluation — 75%
↑ 1.59x agent success when using this tile
Multi-cloud storage abstraction supporting local filesystem, S3, GCS, and Azure with built-in compression, encryption, and performance optimization. Deep Lake's storage layer provides unified access patterns across different storage backends.
Read operations for accessing data from various storage backends with automatic optimization and caching.
class Reader:
    """Storage read operations.

    Provides uniform read access to a storage location; the concrete
    backend (local, S3, GCS, Azure) is determined by `path`.
    """

    # Resolved storage path this reader operates on.
    path: str
    # Path as supplied by the caller, before resolution — presumably may
    # differ from `path` after normalization; confirm against the backend.
    original_path: str
    # Optional access token for the storage backend.
    token: Optional[str]

    def get(self, path: str) -> bytes:
        """
        Get data from storage path.

        Parameters:
        - path: Storage path to read from

        Returns:
        bytes: Raw data from storage
        """

    def length(self, path: str) -> int:
        """
        Get length of data at storage path.

        Parameters:
        - path: Storage path to check

        Returns:
        int: Data length in bytes
        """

    def list(self, path: str = "") -> List[str]:
        """
        List items at storage path.

        Parameters:
        - path: Storage path to list (empty for root)

        Returns:
        List[str]: List of item names at path
        """

    # Annotation is quoted: `Reader` is not bound yet while the class body
    # executes, so an unquoted forward reference raises NameError.
    def subdir(self, path: str) -> "Reader":
        """
        Create reader for subdirectory.

        Parameters:
        - path: Subdirectory path

        Returns:
        Reader: Reader instance for subdirectory
        """

# Write operations for storing data to various storage backends with
# automatic compression and optimization.
class Writer:
    """Storage write operations.

    Provides uniform write access to a storage location; the concrete
    backend (local, S3, GCS, Azure) is determined by `path`.
    """

    # Resolved storage path this writer operates on.
    path: str
    # Path as supplied by the caller, before resolution — presumably may
    # differ from `path` after normalization; confirm against the backend.
    original_path: str
    # Optional access token for the storage backend.
    token: Optional[str]

    def set(self, path: str, data: bytes) -> None:
        """
        Store data at storage path.

        Parameters:
        - path: Storage path to write to
        - data: Raw data to store
        """

    def remove(self, path: str) -> None:
        """
        Remove item at storage path.

        Parameters:
        - path: Storage path to remove
        """

    def remove_directory(self, path: str) -> None:
        """
        Remove directory and all contents.

        Parameters:
        - path: Directory path to remove
        """

    # Annotation is quoted: `Writer` is not bound yet while the class body
    # executes, so an unquoted forward reference raises NameError.
    def subdir(self, path: str) -> "Writer":
        """
        Create writer for subdirectory.

        Parameters:
        - path: Subdirectory path

        Returns:
        Writer: Writer instance for subdirectory
        """

# Access metadata information for storage resources including size,
# timestamps, and ETags.
class ResourceMeta:
    """Storage resource metadata."""

    # Full storage path of the resource.
    path: str
    # Resource size in bytes.
    size: int
    # Backend-provided ETag, if the backend supplies one.
    etag: Optional[str]
    # Last-modified timestamp as a string — exact format is
    # backend-dependent; TODO confirm.
    last_modified: Optional[str]

# Global storage configuration for performance tuning and concurrency control.
def concurrency() -> int:
    """
    Get current storage thread count.

    Returns:
    int: Number of concurrent storage threads
    """

def set_concurrency(num_threads: int) -> None:
    """
    Set storage thread count for parallel operations.

    Parameters:
    - num_threads: Number of concurrent threads for storage operations
    """

# Usage examples follow; storage configuration is reached through the
# deeplake.storage namespace.
import deeplake
# Example: inspecting and tuning global storage concurrency.
# Access storage directly (usually not needed for normal usage)
# Storage operations are typically handled automatically by datasets
# Get storage configuration
current_threads = deeplake.storage.concurrency()
print(f"Current storage threads: {current_threads}")
# Optimize for high-performance systems
deeplake.storage.set_concurrency(8)
print("Increased storage concurrency for better performance")
# Create dataset on local filesystem
# Example: dataset on the local filesystem.
dataset = deeplake.create("./local_dataset")
# Deep Lake automatically handles local storage operations
dataset.add_column("data", deeplake.types.Text())
dataset.append({"data": "sample text"})
dataset.commit("Added sample data")
# Storage operations happen transparently
print(f"Dataset stored locally at: {dataset.path}")
# S3 credentials
# Example: dataset on Amazon S3 — same API as local storage.
s3_creds = {
    "aws_access_key_id": "your_access_key",
    "aws_secret_access_key": "your_secret_key",
    "aws_region": "us-east-1"
}
# Create dataset on S3
s3_dataset = deeplake.create("s3://my-bucket/my-dataset", creds=s3_creds)
# Storage operations work the same across backends
s3_dataset.add_column("images", deeplake.types.Image())
s3_dataset.add_column("labels", deeplake.types.Text())
# Batch upload to S3: one extend call instead of 1000 single appends
batch_data = [
    {"images": f"s3://my-bucket/images/img_{i}.jpg", "labels": f"label_{i}"}
    for i in range(1000)
]
s3_dataset.extend(batch_data)
s3_dataset.commit("Uploaded batch to S3")
print(f"S3 dataset has {len(s3_dataset)} rows")
# GCS credentials (using service account key)
# Example: dataset on Google Cloud Storage.
gcs_creds = {
    "google_application_credentials": "/path/to/service-account-key.json"
}
# Alternative: using service account JSON content inline
gcs_creds_json = {
    "google_application_credentials_json": {
        "type": "service_account",
        "project_id": "your-project-id",
        "private_key_id": "key-id",
        "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
        "client_email": "service-account@project.iam.gserviceaccount.com",
        "client_id": "client-id",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token"
    }
}
# Create dataset on GCS
gcs_dataset = deeplake.create("gcs://my-bucket/my-dataset", creds=gcs_creds)
# Storage operations are identical across platforms
gcs_dataset.add_column("embeddings", deeplake.types.Embedding(size=768))
gcs_dataset.append({"embeddings": [0.1] * 768})
gcs_dataset.commit("Added embeddings to GCS")
# Azure credentials
# Example: dataset on Azure Blob Storage (three credential styles).
azure_creds = {
    "azure_storage_account": "mystorageaccount",
    "azure_storage_key": "your_storage_key"
}
# Alternative: using connection string
azure_creds_conn = {
    "azure_storage_connection_string": "DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=your_key;EndpointSuffix=core.windows.net"
}
# Alternative: using SAS token
azure_creds_sas = {
    "azure_storage_account": "mystorageaccount",
    "azure_storage_sas_token": "your_sas_token"
}
# Create dataset on Azure
azure_dataset = deeplake.create("azure://my-container/my-dataset", creds=azure_creds)
# Same operations across all cloud providers
azure_dataset.add_column("videos", deeplake.types.Video())
azure_dataset.append({"videos": "azure://my-container/videos/video1.mp4"})
azure_dataset.commit("Added video to Azure")
# Create datasets across multiple cloud providers
datasets = {}
# Local for development
datasets["local"] = deeplake.create("./dev_dataset")
# S3 for production
datasets["s3"] = deeplake.create("s3://prod-bucket/dataset", creds=s3_creds)
# GCS for backup
datasets["gcs"] = deeplake.create("gcs://backup-bucket/dataset", creds=gcs_creds)
# Same schema across all datasets
for name, dataset in datasets.items():
    dataset.add_column("id", deeplake.types.Int64())
    dataset.add_column("data", deeplake.types.Text())
    dataset.add_column("timestamp", deeplake.types.Int64())
    # Add one sample row per backend
    dataset.append({
        "id": 1,
        "data": f"Sample data in {name}",
        "timestamp": 1640995200  # Unix timestamp
    })
    dataset.commit(f"Initial data in {name}")
    print(f"Created {name} dataset with {len(dataset)} rows")
# Copy data between cloud providers
deeplake.copy("./dev_dataset", "s3://prod-bucket/dev-copy", dst_creds=s3_creds)
print("Copied local dataset to S3")
import time
# Measure storage performance
def benchmark_storage_operations(dataset, num_operations=100):
    """
    Append num_operations generated rows to dataset in a single batch and
    return the elapsed time.

    Parameters:
    - dataset: Open dataset with "data" (text) and "value" (float) columns
    - num_operations: Number of rows to generate and append

    Returns:
    float: Seconds taken by the extend + commit round trip
    """
    # perf_counter is a monotonic clock: unlike time.time it cannot jump
    # with system clock adjustments, so it is the right tool for durations.
    start_time = time.perf_counter()
    # Batch operations for better performance: one extend + one commit
    # instead of num_operations individual appends.
    batch_data = [
        {"data": f"sample_{i}", "value": i * 0.1}
        for i in range(num_operations)
    ]
    dataset.extend(batch_data)
    dataset.commit(f"Added {num_operations} rows")
    return time.perf_counter() - start_time
# Test with different storage backends
s3_dataset = deeplake.create("s3://benchmark-bucket/s3-test", creds=s3_creds)
s3_dataset.add_column("data", deeplake.types.Text())
s3_dataset.add_column("value", deeplake.types.Float32())
# Optimize storage concurrency for benchmarking; remember the previous
# value so it can be restored afterwards.
original_concurrency = deeplake.storage.concurrency()
deeplake.storage.set_concurrency(16)  # Increase for high-throughput
s3_time = benchmark_storage_operations(s3_dataset, 1000)
print(f"S3 operations took {s3_time:.2f} seconds")
# Restore original concurrency
deeplake.storage.set_concurrency(original_concurrency)
# Robust storage operations with error handling
def safe_dataset_operation(dataset_url, creds, operation_func):
    """
    Open the dataset at dataset_url and apply operation_func to it.

    Parameters:
    - dataset_url: Dataset location (any supported backend)
    - creds: Credentials dict for the backend
    - operation_func: Callable taking the opened dataset

    Returns:
    Whatever operation_func returns, or None when a known storage error
    was caught (a diagnostic message is printed in that case).
    """
    try:
        # Both the open and the operation can raise the storage errors
        # handled below, so they share one try block.
        return operation_func(deeplake.open(dataset_url, creds=creds))
    except deeplake.StorageAccessDenied:
        print("Storage access denied - check credentials")
    except deeplake.StorageKeyNotFound:
        print("Dataset not found - check URL")
    except deeplake.StorageNetworkConnectionError:
        print("Network connection error - check connectivity")
    except deeplake.StorageInternalError:
        print("Storage internal error - try again later")
    return None
# Safe single-attempt write helper built on the guarded open/operate pattern
# (note: there is no retry here — failures are reported, not retried).
def add_data_safely(dataset_url, creds, data):
    """
    Append data to the dataset at dataset_url and report the outcome.

    Parameters:
    - dataset_url: Dataset location (any supported backend)
    - creds: Credentials dict for the backend
    - data: List of row dicts to append
    """
    def add_data_operation(dataset):
        dataset.extend(data)
        dataset.commit("Added data safely")
        return len(dataset)
    result = safe_dataset_operation(dataset_url, creds, add_data_operation)
    # Compare with None explicitly: safe_dataset_operation signals failure
    # with None, while a dataset length of 0 is a legitimate success value
    # that is falsy — `if result:` would misreport it as a failure.
    if result is not None:
        print(f"Successfully added data. Dataset now has {result} rows")
    else:
        print("Failed to add data")
# Example usage
sample_data = [{"text": f"sample_{i}"} for i in range(10)]
add_data_safely("s3://my-bucket/safe-dataset", s3_creds, sample_data)
# Monitor storage performance and usage
class StorageMonitor:
    """Records the duration and outcome of storage operations and reports
    aggregate statistics over everything recorded so far."""

    def __init__(self):
        # One record dict per timed call, in execution order.
        self.operations = []

    def time_operation(self, operation_name, operation_func):
        """Run operation_func, record how long it took, and return its
        result. On failure the exception text is recorded and the
        exception is re-raised to the caller."""
        started = time.time()
        try:
            outcome = operation_func()
        except Exception as exc:
            self.operations.append({
                "operation": operation_name,
                "duration": time.time() - started,
                "success": False,
                "error": str(exc),
                "timestamp": started
            })
            raise
        self.operations.append({
            "operation": operation_name,
            "duration": time.time() - started,
            "success": True,
            "timestamp": started
        })
        return outcome

    def get_stats(self):
        """Summarize recorded operations: counts, mean duration of the
        successful ones, and success rate as a percentage."""
        if not self.operations:
            return {"message": "No operations recorded"}
        succeeded = [entry for entry in self.operations if entry["success"]]
        mean_duration = (
            sum(entry["duration"] for entry in succeeded) / len(succeeded)
            if succeeded else 0
        )
        return {
            "total_operations": len(self.operations),
            "successful": len(succeeded),
            "failed": len(self.operations) - len(succeeded),
            "average_duration": mean_duration,
            "success_rate": len(succeeded) / len(self.operations) * 100
        }
# Usage: wrap each dataset call in time_operation to collect timings.
monitor = StorageMonitor()
# Monitor dataset creation
dataset = monitor.time_operation(
    "create_dataset",
    lambda: deeplake.create("s3://monitor-bucket/test-dataset", creds=s3_creds)
)
# Monitor data operations
monitor.time_operation(
    "add_column",
    lambda: dataset.add_column("data", deeplake.types.Text())
)
monitor.time_operation(
    "append_data",
    lambda: dataset.append({"data": "test data"})
)
monitor.time_operation(
    "commit",
    lambda: dataset.commit("Test commit")
)
# Get performance statistics
stats = monitor.get_stats()
print(f"Storage operations statistics: {stats}")
# Configure storage for different use cases
# Concurrency presets for different workload profiles.
# High-throughput configuration
def configure_for_high_throughput():
    """Raise storage concurrency to 32 threads for bulk parallel workloads."""
    # Increase concurrency for parallel operations
    deeplake.storage.set_concurrency(32)
    print("Configured for high-throughput operations")

# Memory-efficient configuration
def configure_for_memory_efficiency():
    """Drop storage concurrency to 2 threads to minimize memory pressure."""
    # Reduce concurrency to save memory
    deeplake.storage.set_concurrency(2)
    print("Configured for memory efficiency")

# Balanced configuration
def configure_balanced():
    """Set a moderate 8-thread concurrency as a general-purpose default."""
    # Moderate concurrency for balanced performance
    deeplake.storage.set_concurrency(8)
    print("Configured for balanced performance")
# Apply configuration based on use case
import psutil
# Auto-configure based on system resources.
# psutil.cpu_count() can return None when the core count is undeterminable;
# fall back to 1 so the comparison below cannot raise TypeError.
available_cores = psutil.cpu_count() or 1
available_memory_gb = psutil.virtual_memory().total / (1024**3)
if available_cores >= 16 and available_memory_gb >= 32:
    configure_for_high_throughput()
elif available_memory_gb < 8:
    configure_for_memory_efficiency()
else:
    configure_balanced()
print(f"System: {available_cores} cores, {available_memory_gb:.1f}GB RAM")
print(f"Storage concurrency: {deeplake.storage.concurrency()}")
# Install with Tessl CLI
npx tessl i tessl/pypi-deeplakedocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10