CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-deeplake

Database for AI powered by a storage format optimized for deep-learning applications.

75

1.59x

Evaluation: 75%

1.59x

Agent success when using this tile

Overview
Eval results
Files

docs/storage-system.md

Storage System

Multi-cloud storage abstraction supporting local filesystem, S3, GCS, and Azure with built-in compression, encryption, and performance optimization. Deep Lake's storage layer provides unified access patterns across different storage backends.

Capabilities

Storage Reader Operations

Read operations for accessing data from various storage backends with automatic optimization and caching.

class Reader:
    """Storage read operations.

    Attributes:
    - path: Resolved storage path this reader operates on
    - original_path: Path exactly as supplied by the caller
    - token: Optional authentication token for the backend

    NOTE(review): `Optional` and `List` are assumed to be imported from
    `typing` at module level — confirm against the full file.
    """

    path: str
    original_path: str
    token: Optional[str]

    def get(self, path: str) -> bytes:
        """
        Get data from storage path.

        Parameters:
        - path: Storage path to read from

        Returns:
        bytes: Raw data from storage
        """

    def length(self, path: str) -> int:
        """
        Get length of data at storage path.

        Parameters:
        - path: Storage path to check

        Returns:
        int: Data length in bytes
        """

    def list(self, path: str = "") -> List[str]:
        """
        List items at storage path.

        Parameters:
        - path: Storage path to list (empty for root)

        Returns:
        List[str]: List of item names at path
        """

    # Return annotation is quoted: `Reader` is not yet bound while the class
    # body executes, so an unquoted self-reference raises NameError on
    # Pythons with eager annotation evaluation.
    def subdir(self, path: str) -> "Reader":
        """
        Create reader for subdirectory.

        Parameters:
        - path: Subdirectory path

        Returns:
        Reader: Reader instance for subdirectory
        """

Storage Writer Operations

Write operations for storing data to various storage backends with automatic compression and optimization.

class Writer:
    """Storage write operations.

    Attributes:
    - path: Resolved storage path this writer operates on
    - original_path: Path exactly as supplied by the caller
    - token: Optional authentication token for the backend

    NOTE(review): `Optional` is assumed to be imported from `typing`
    at module level — confirm against the full file.
    """

    path: str
    original_path: str
    token: Optional[str]

    def set(self, path: str, data: bytes) -> None:
        """
        Store data at storage path.

        Parameters:
        - path: Storage path to write to
        - data: Raw data to store
        """

    def remove(self, path: str) -> None:
        """
        Remove item at storage path.

        Parameters:
        - path: Storage path to remove
        """

    def remove_directory(self, path: str) -> None:
        """
        Remove directory and all contents.

        Parameters:
        - path: Directory path to remove
        """

    # Return annotation is quoted: `Writer` is not yet bound while the class
    # body executes, so an unquoted self-reference raises NameError on
    # Pythons with eager annotation evaluation.
    def subdir(self, path: str) -> "Writer":
        """
        Create writer for subdirectory.

        Parameters:
        - path: Subdirectory path

        Returns:
        Writer: Writer instance for subdirectory
        """

Storage Metadata

Access metadata information for storage resources including size, timestamps, and ETags.

class ResourceMeta:
    """Storage resource metadata."""
    
    # Storage path identifying the resource.
    path: str
    # Resource size in bytes.
    size: int
    # Backend ETag; presumably None when the backend provides none — confirm.
    etag: Optional[str]
    # Last-modified timestamp as reported by the backend; None if unavailable.
    last_modified: Optional[str]

Storage Configuration

Global storage configuration for performance tuning and concurrency control.

def concurrency() -> int:
    """Report how many threads the storage layer currently uses.

    Returns:
        int: Number of concurrent storage threads.
    """

def set_concurrency(num_threads: int) -> None:
    """Configure how many threads the storage layer may use in parallel.

    Parameters:
        num_threads: Desired number of concurrent storage threads.
    """

Usage Examples

Basic Storage Operations

import deeplake

# Access storage directly (usually not needed for normal usage)
# Storage operations are typically handled automatically by datasets

# Read the current number of storage worker threads
current_threads = deeplake.storage.concurrency()
print(f"Current storage threads: {current_threads}")

# Raise the thread count for machines with spare cores/bandwidth;
# 8 is an example value — tune to your hardware
deeplake.storage.set_concurrency(8)
print("Increased storage concurrency for better performance")

Local Filesystem Storage

# Create dataset on local filesystem (a plain/relative path selects
# the local backend)
dataset = deeplake.create("./local_dataset")

# Deep Lake automatically handles local storage operations
dataset.add_column("data", deeplake.types.Text())
dataset.append({"data": "sample text"})
dataset.commit("Added sample data")

# Storage operations happen transparently
print(f"Dataset stored locally at: {dataset.path}")

S3 Storage Integration

# S3 credentials — placeholder values; in real code load secrets from the
# environment or a secret manager, never hard-code them in source
s3_creds = {
    "aws_access_key_id": "your_access_key",
    "aws_secret_access_key": "your_secret_key",
    "aws_region": "us-east-1"
}

# Create dataset on S3 (the s3:// scheme selects the S3 backend)
s3_dataset = deeplake.create("s3://my-bucket/my-dataset", creds=s3_creds)

# Storage operations work the same across backends
s3_dataset.add_column("images", deeplake.types.Image())
s3_dataset.add_column("labels", deeplake.types.Text())

# Build 1000 rows that reference existing S3 objects by URL
batch_data = [
    {"images": f"s3://my-bucket/images/img_{i}.jpg", "labels": f"label_{i}"}
    for i in range(1000)
]

# One batched extend + commit is far cheaper than row-by-row appends
s3_dataset.extend(batch_data)
s3_dataset.commit("Uploaded batch to S3")

print(f"S3 dataset has {len(s3_dataset)} rows")

Google Cloud Storage Integration

# GCS credentials (using a service-account key file path) — placeholder
# values; keep real keys out of source control
gcs_creds = {
    "google_application_credentials": "/path/to/service-account-key.json"
}

# Alternative: inline service-account JSON content
# NOTE(review): gcs_creds_json is defined for illustration only and is
# not used below — the create() call uses gcs_creds
gcs_creds_json = {
    "google_application_credentials_json": {
        "type": "service_account",
        "project_id": "your-project-id",
        "private_key_id": "key-id",
        "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
        "client_email": "service-account@project.iam.gserviceaccount.com",
        "client_id": "client-id",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token"
    }
}

# Create dataset on GCS (the gcs:// scheme selects the GCS backend)
gcs_dataset = deeplake.create("gcs://my-bucket/my-dataset", creds=gcs_creds)

# Storage operations are identical across platforms
gcs_dataset.add_column("embeddings", deeplake.types.Embedding(size=768))
gcs_dataset.append({"embeddings": [0.1] * 768})
gcs_dataset.commit("Added embeddings to GCS")

Azure Blob Storage Integration

# Azure credentials (account name + key) — placeholder values; keep real
# keys out of source control
azure_creds = {
    "azure_storage_account": "mystorageaccount",
    "azure_storage_key": "your_storage_key"
}

# Alternative: using connection string
# NOTE(review): the two alternative credential dicts below are shown for
# illustration only; the create() call uses azure_creds
azure_creds_conn = {
    "azure_storage_connection_string": "DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=your_key;EndpointSuffix=core.windows.net"
}

# Alternative: using SAS token
azure_creds_sas = {
    "azure_storage_account": "mystorageaccount",
    "azure_storage_sas_token": "your_sas_token"
}

# Create dataset on Azure (the azure:// scheme selects the Azure backend)
azure_dataset = deeplake.create("azure://my-container/my-dataset", creds=azure_creds)

# Same operations across all cloud providers
azure_dataset.add_column("videos", deeplake.types.Video())
azure_dataset.append({"videos": "azure://my-container/videos/video1.mp4"})
azure_dataset.commit("Added video to Azure")

Multi-Cloud Dataset Management

# Create datasets across multiple cloud providers
# (relies on s3_creds / gcs_creds defined in the earlier examples)
datasets = {}

# Local for development
datasets["local"] = deeplake.create("./dev_dataset")

# S3 for production
datasets["s3"] = deeplake.create("s3://prod-bucket/dataset", creds=s3_creds)

# GCS for backup
datasets["gcs"] = deeplake.create("gcs://backup-bucket/dataset", creds=gcs_creds)

# Apply the same schema to every dataset so data can move between them
for name, dataset in datasets.items():
    dataset.add_column("id", deeplake.types.Int64())
    dataset.add_column("data", deeplake.types.Text())
    dataset.add_column("timestamp", deeplake.types.Int64())
    
    # Add one sample row per backend
    dataset.append({
        "id": 1,
        "data": f"Sample data in {name}",
        "timestamp": 1640995200  # Unix timestamp (2022-01-01 00:00:00 UTC)
    })
    
    dataset.commit(f"Initial data in {name}")
    print(f"Created {name} dataset with {len(dataset)} rows")

# Copy data between cloud providers (dst_creds authenticates the destination)
deeplake.copy("./dev_dataset", "s3://prod-bucket/dev-copy", dst_creds=s3_creds)
print("Copied local dataset to S3")

Storage Performance Optimization

import time

# Measure storage performance
def benchmark_storage_operations(dataset, num_operations=100):
    """Time a single batched extend + commit against `dataset`.

    Parameters:
    - dataset: Dataset-like object exposing `extend(rows)` and `commit(msg)`
    - num_operations: Number of rows to write in one batch

    Returns:
    float: Elapsed seconds for the batched write and commit.
    """
    # perf_counter is monotonic and high-resolution, so the measurement
    # cannot go negative or jump if the system clock is adjusted mid-run
    # (time.time() offers neither guarantee).
    start_time = time.perf_counter()

    # One batched extend is far cheaper than num_operations single appends.
    batch_data = [
        {"data": f"sample_{i}", "value": i * 0.1}
        for i in range(num_operations)
    ]

    dataset.extend(batch_data)
    dataset.commit(f"Added {num_operations} rows")

    return time.perf_counter() - start_time

# Benchmark against an S3-backed dataset (reuses s3_creds from above)
s3_dataset = deeplake.create("s3://benchmark-bucket/s3-test", creds=s3_creds)
s3_dataset.add_column("data", deeplake.types.Text())
s3_dataset.add_column("value", deeplake.types.Float32())

# Temporarily raise storage concurrency for the benchmark
original_concurrency = deeplake.storage.concurrency()
deeplake.storage.set_concurrency(16)  # Increase for high-throughput

s3_time = benchmark_storage_operations(s3_dataset, 1000)
print(f"S3 operations took {s3_time:.2f} seconds")

# Restore original concurrency
# NOTE(review): a try/finally around the benchmark would guarantee this
# restore runs even if benchmark_storage_operations raises
deeplake.storage.set_concurrency(original_concurrency)

Storage Error Handling

# Robust storage operations with error handling
def safe_dataset_operation(dataset_url, creds, operation_func):
    """Open a dataset and run `operation_func` on it, converting known
    storage failures into a diagnostic message and a None result.

    Parameters:
    - dataset_url: URL of the dataset to open
    - creds: Credentials dict passed through to deeplake.open
    - operation_func: Callable invoked with the opened dataset

    Returns:
    Whatever `operation_func` returns, or None on a known storage error.
    """
    try:
        ds = deeplake.open(dataset_url, creds=creds)
        return operation_func(ds)

    except deeplake.StorageAccessDenied:
        print("Storage access denied - check credentials")

    except deeplake.StorageKeyNotFound:
        print("Dataset not found - check URL")

    except deeplake.StorageNetworkConnectionError:
        print("Network connection error - check connectivity")

    except deeplake.StorageInternalError:
        print("Storage internal error - try again later")

    # All handled failure paths fall through to a single None return.
    return None

# Safe operations with automatic retry
def add_data_safely(dataset_url, creds, data):
    """Append `data` to the dataset at `dataset_url` through the
    safe_dataset_operation error-handling wrapper.

    Parameters:
    - dataset_url: URL of the target dataset
    - creds: Credentials dict for the storage backend
    - data: List of row dicts to append
    """
    def add_data_operation(dataset):
        # Runs inside safe_dataset_operation with the opened dataset.
        dataset.extend(data)
        dataset.commit("Added data safely")
        return len(dataset)
    
    result = safe_dataset_operation(dataset_url, creds, add_data_operation)
    # BUG FIX: the original `if result:` treated a legitimate row count of 0
    # as a failure; only None (the wrapper's error sentinel) means failure.
    if result is not None:
        print(f"Successfully added data. Dataset now has {result} rows")
    else:
        print("Failed to add data")

# Example usage: push 10 small rows through the safe wrapper
# (s3_creds comes from the earlier S3 example)
sample_data = [{"text": f"sample_{i}"} for i in range(10)]
add_data_safely("s3://my-bucket/safe-dataset", s3_creds, sample_data)

Storage Monitoring and Metrics

# Monitor storage performance and usage
class StorageMonitor:
    """Records wall-clock duration and success/failure of storage operations.

    NOTE(review): relies on the module-level `import time` from an earlier
    example block.
    """

    def __init__(self):
        # One record dict per timed operation, in call order.
        self.operations = []

    def _record(self, name, started, elapsed, ok, error=None):
        # Internal: append a single operation record. Key order matches the
        # original layout (error, when present, precedes timestamp).
        entry = {
            "operation": name,
            "duration": elapsed,
            "success": ok,
        }
        if error is not None:
            entry["error"] = error
        entry["timestamp"] = started
        self.operations.append(entry)

    def time_operation(self, operation_name, operation_func):
        """Run `operation_func`, record its duration and outcome, then
        return its result — or re-raise its exception after recording it.
        """
        started = time.time()
        try:
            value = operation_func()
        except Exception as exc:
            self._record(operation_name, started, time.time() - started,
                         False, error=str(exc))
            raise
        self._record(operation_name, started, time.time() - started, True)
        return value

    def get_stats(self):
        """Summarize all recorded operations.

        Returns:
        dict: Totals, counts, mean successful duration, and success rate
        (percent); or a message dict when nothing has been recorded.
        """
        total = len(self.operations)
        if total == 0:
            return {"message": "No operations recorded"}

        wins = [op for op in self.operations if op["success"]]
        mean_duration = (
            sum(op["duration"] for op in wins) / len(wins) if wins else 0
        )

        return {
            "total_operations": total,
            "successful": len(wins),
            "failed": total - len(wins),
            "average_duration": mean_duration,
            "success_rate": len(wins) / total * 100,
        }

# Usage: wrap each dataset call in time_operation so durations and
# failures are captured in one place
monitor = StorageMonitor()

# Monitor dataset creation (reuses s3_creds from the earlier example)
dataset = monitor.time_operation(
    "create_dataset",
    lambda: deeplake.create("s3://monitor-bucket/test-dataset", creds=s3_creds)
)

# Monitor data operations
monitor.time_operation(
    "add_column",
    lambda: dataset.add_column("data", deeplake.types.Text())
)

monitor.time_operation(
    "append_data",
    lambda: dataset.append({"data": "test data"})
)

monitor.time_operation(
    "commit",
    lambda: dataset.commit("Test commit")
)

# Get aggregated performance statistics for all four operations
stats = monitor.get_stats()
print(f"Storage operations statistics: {stats}")

Advanced Storage Configuration

# Configure storage for different use cases

# High-throughput configuration
def configure_for_high_throughput():
    """Raise storage concurrency to 32 threads for bulk parallel I/O."""
    deeplake.storage.set_concurrency(32)
    print("Configured for high-throughput operations")

# Memory-efficient configuration  
def configure_for_memory_efficiency():
    """Drop storage concurrency to 2 threads to minimize memory pressure."""
    deeplake.storage.set_concurrency(2)
    print("Configured for memory efficiency")

# Balanced configuration
def configure_balanced():
    """Set a middle-ground storage concurrency of 8 threads."""
    deeplake.storage.set_concurrency(8)
    print("Configured for balanced performance")

# Apply configuration based on use case
import psutil  # third-party dependency: pip install psutil

# Auto-configure based on system resources
available_cores = psutil.cpu_count()
available_memory_gb = psutil.virtual_memory().total / (1024**3)

# Thresholds are heuristics: big machines get max throughput, small ones
# prioritize memory, everything else takes the balanced profile
if available_cores >= 16 and available_memory_gb >= 32:
    configure_for_high_throughput()
elif available_memory_gb < 8:
    configure_for_memory_efficiency()
else:
    configure_balanced()

print(f"System: {available_cores} cores, {available_memory_gb:.1f}GB RAM")
print(f"Storage concurrency: {deeplake.storage.concurrency()}")

Install with Tessl CLI

npx tessl i tessl/pypi-deeplake

docs

data-access.md

data-import-export.md

dataset-management.md

error-handling.md

framework-integration.md

index.md

query-system.md

schema-templates.md

storage-system.md

type-system.md

version-control.md

tile.json