CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-deeplake

Database for AI powered by a storage format optimized for deep-learning applications.

75

1.59x

Evaluation 75%

1.59x

Agent success when using this tile

Overview
Eval results
Files

docs/data-import-export.md

Data Import and Export

Comprehensive data import/export capabilities supporting various formats including Parquet, CSV, COCO datasets, and custom data ingestion pipelines. Deep Lake provides seamless data migration and integration with existing data workflows.

Capabilities

Data Import Functions

Import data from various formats with automatic schema detection and type conversion.

def from_parquet(url_or_bytes: Union[str, bytes]) -> ReadOnlyDataset:
    """
    Build a read-only dataset from Parquet content.

    Parameters:
    - url_or_bytes: path/URL of a Parquet file, or the raw Parquet bytes

    Returns:
    ReadOnlyDataset: immutable dataset backed by the Parquet data
    """

def from_csv(url_or_bytes: Union[str, bytes]) -> ReadOnlyDataset:
    """
    Build a read-only dataset from CSV content.

    Parameters:
    - url_or_bytes: path/URL of a CSV file, or the raw CSV bytes

    Returns:
    ReadOnlyDataset: immutable dataset backed by the CSV data
    """

def from_coco(images_directory: Union[str, pathlib.Path], annotation_files: Dict[str, Union[str, pathlib.Path]], dest: Union[str, pathlib.Path], dest_creds: Optional[Dict[str, str]] = None, key_to_column_mapping: Optional[Dict] = None, file_to_group_mapping: Optional[Dict] = None) -> Dataset:
    """
    Ingest a COCO-format dataset into Deep Lake.

    Parameters:
    - images_directory: directory holding the COCO images
    - annotation_files: maps annotation kind ('instances', 'keypoints',
      'stuff') to the corresponding JSON annotation file
    - dest: destination path for the resulting Deep Lake dataset
    - dest_creds: storage credentials for the destination, if any
    - key_to_column_mapping: optional COCO key -> column name overrides
    - file_to_group_mapping: optional file type -> group name overrides

    Returns:
    Dataset: the newly created Deep Lake dataset
    """

Data Export Functions

Export datasets to various formats for integration with other tools and workflows.

class DatasetView:
    """Export helpers available on dataset views."""

    def to_csv(self, path: str) -> None:
        """
        Write the rows of this view to a CSV file.

        Parameters:
        - path: destination CSV file path
        """

Legacy Data Conversion

Convert datasets between Deep Lake versions with data preservation and format migration.

def convert(src: str, dst: str, dst_creds: Optional[Dict[str, str]] = None, token: Optional[str] = None) -> None:
    """
    Migrate a Deep Lake v3 dataset to the v4 storage format.

    Parameters:
    - src: path of the existing v3 dataset
    - dst: path where the v4 copy is written
    - dst_creds: credentials for the destination storage, if any
    - token: Activeloop authentication token, if required
    """

Usage Examples

Parquet Import

import deeplake

# Import from local Parquet file (from_parquet also accepts URLs or raw bytes)
dataset = deeplake.from_parquet("./data/my_data.parquet")
print(f"Imported {len(dataset)} rows from Parquet")
print(f"Columns: {[col.name for col in dataset.schema.columns]}")

# Access imported data (at most the first five rows)
for i in range(min(5, len(dataset))):
    row = dataset[i]
    print(f"Row {i}: {row.to_dict()}")

# Import from remote Parquet file (S3 credentials taken from the environment here)
s3_dataset = deeplake.from_parquet("s3://my-bucket/data.parquet")
print(f"Imported {len(s3_dataset)} rows from S3 Parquet")

# Convert to mutable dataset if needed — from_parquet yields a read-only dataset.
# NOTE(review): like() appears to create a writable copy — confirm whether it
# copies the data or only the schema.
mutable_dataset = deeplake.like(dataset, "./mutable_from_parquet")
print("Created mutable copy of Parquet data")

CSV Import

# Import from local CSV file
csv_dataset = deeplake.from_csv("./data/dataset.csv")
print(f"Imported {len(csv_dataset)} rows from CSV")

# Examine schema (automatically inferred from the CSV contents)
schema = csv_dataset.schema
for col in schema.columns:
    print(f"Column '{col.name}': {type(col.dtype)}")

# Import from URL
url_dataset = deeplake.from_csv("https://example.com/data.csv")
print(f"Imported {len(url_dataset)} rows from URL")

# Import from bytes (useful for processing in-memory CSV)
# NOTE(review): io is imported but never used in this example
import io
csv_content = """name,age,score
Alice,25,0.95
Bob,30,0.88
Charlie,35,0.92"""

# from_csv accepts raw bytes as well as paths/URLs
csv_bytes = csv_content.encode('utf-8')
bytes_dataset = deeplake.from_csv(csv_bytes)
print(f"Imported {len(bytes_dataset)} rows from bytes")

# Access CSV data row by row
for row in bytes_dataset:
    print(f"Name: {row['name']}, Age: {row['age']}, Score: {row['score']}")

COCO Dataset Import

import pathlib

# Import COCO dataset with instances annotations
coco_dataset = deeplake.from_coco(
    images_directory="./coco_data/images",
    annotation_files={"instances": "./coco_data/annotations/instances_train2017.json"},
    dest="./coco_deep_lake"
)

print(f"Imported COCO dataset with {len(coco_dataset)} samples")

# Examine COCO schema
for col in coco_dataset.schema.columns:
    print(f"COCO column: {col.name} ({type(col.dtype)})")

# Access COCO data
sample = coco_dataset[0]
print(f"Image: {sample['images']}")

# Import with multiple annotation types; annotation_files maps the annotation
# kind ('instances', 'keypoints', 'stuff') to its JSON file.
full_coco_dataset = deeplake.from_coco(
    images_directory=pathlib.Path("./coco_data/images"),
    annotation_files={
        "instances": "./coco_data/annotations/instances_train2017.json",
        "keypoints": "./coco_data/annotations/person_keypoints_train2017.json"
    },  # fixed: this dict literal was previously closed with ']'
    dest="s3://my-bucket/full_coco_dataset",
    dest_creds={"aws_access_key_id": "...", "aws_secret_access_key": "..."}
)

print(f"Full COCO dataset: {len(full_coco_dataset)} samples")

CSV Export

# Create sample dataset
dataset = deeplake.create("./export_dataset")
dataset.add_column("id", deeplake.types.Int64())
dataset.add_column("name", deeplake.types.Text())
dataset.add_column("score", deeplake.types.Float32())

# Add sample data: 100 rows with deterministic values
for i in range(100):
    dataset.append({
        "id": i,
        "name": f"item_{i}",
        "score": i * 0.01
    })

dataset.commit("Added sample data for export")

# Export the full dataset to CSV
dataset.to_csv("./exported_data.csv")
print("Exported dataset to CSV")

# Export filtered data (query returns a view that also supports to_csv)
high_scores = deeplake.query("SELECT * FROM dataset WHERE score > 0.5")
high_scores.to_csv("./high_scores.csv")
print("Exported filtered data to CSV")

# Export specific columns only
columns_subset = deeplake.query("SELECT name, score FROM dataset")
columns_subset.to_csv("./subset_data.csv")
print("Exported subset of columns to CSV")

Legacy Dataset Conversion

# Convert Deep Lake v3 dataset to v4 format
deeplake.convert(
    src="./old_v3_dataset",
    dst="./new_v4_dataset"
)
print("Converted v3 dataset to v4 format")

# Convert with cloud storage
deeplake.convert(
    src="s3://old-bucket/v3_dataset",
    dst="s3://new-bucket/v4_dataset",
    dst_creds={"aws_access_key_id": "...", "aws_secret_access_key": "..."}
)
print("Converted cloud v3 dataset to v4 format")

# Open converted dataset
converted_dataset = deeplake.open("./new_v4_dataset")
print(f"Converted dataset has {len(converted_dataset)} rows")
print(f"Schema: {[col.name for col in converted_dataset.schema.columns]}")

Custom Data Ingestion Pipeline

import pandas as pd
import numpy as np
from pathlib import Path

def ingest_custom_format(data_dir: str, dest_path: str):
    """Custom ingestion pipeline for a proprietary JSON + JPEG + .npy layout.

    Each sample is a trio of files sharing a stem: <stem>.json (metadata),
    <stem>.jpg (image) and optionally <stem>_features.npy (512-d features).
    Samples with no matching image are skipped; missing feature files fall
    back to a zero vector.

    Parameters:
    - data_dir: directory containing the source files
    - dest_path: destination path for the new Deep Lake dataset

    Returns:
    The created Deep Lake dataset.
    """
    import json  # fixed: json.load was called but json was never imported

    # Create target dataset
    dataset = deeplake.create(dest_path)

    # Define schema based on source format
    dataset.add_column("file_id", deeplake.types.Text())
    dataset.add_column("image", deeplake.types.Image())
    dataset.add_column("metadata", deeplake.types.Dict())
    dataset.add_column("features", deeplake.types.Array(deeplake.types.Float32(), shape=[512]))

    # Process source files
    data_path = Path(data_dir)
    batch_data = []

    for file_path in data_path.glob("*.json"):
        # Read custom metadata format
        with open(file_path, 'r') as f:
            metadata = json.load(f)

        # Find corresponding image; skip samples without one
        image_path = data_path / f"{file_path.stem}.jpg"
        if not image_path.exists():
            continue

        # Extract features (example: using pre-computed features)
        features_path = data_path / f"{file_path.stem}_features.npy"
        if features_path.exists():
            features = np.load(features_path).astype(np.float32)
        else:
            features = np.zeros(512, dtype=np.float32)

        # Prepare batch entry
        batch_data.append({
            "file_id": file_path.stem,
            "image": str(image_path),
            "metadata": metadata,
            "features": features
        })

        # Batch commit for performance
        if len(batch_data) >= 100:
            dataset.extend(batch_data)
            dataset.commit(f"Ingested batch of {len(batch_data)} items")
            batch_data = []

    # Final commit for any remaining partial batch
    if batch_data:
        dataset.extend(batch_data)
        dataset.commit(f"Final batch of {len(batch_data)} items")

    print(f"Ingestion complete. Dataset has {len(dataset)} items")
    return dataset

# Use custom ingestion pipeline
custom_dataset = ingest_custom_format("./custom_data", "./ingested_dataset")

Batch Data Processing Pipeline

def process_multiple_sources(sources: List[Dict], output_path: str):
    """Process multiple data sources into unified dataset.

    Parameters:
    - sources: list of configs, each with keys 'type' ('csv' or 'parquet'),
      'path' (source location) and 'name' (label stored on every record)
    - output_path: destination path for the unified Deep Lake dataset

    Returns:
    The unified dataset with columns source/id/content/timestamp/metadata.
    """

    # Create unified dataset
    unified_dataset = deeplake.create(output_path)

    # Define common schema shared by every source
    unified_dataset.add_column("source", deeplake.types.Text())
    unified_dataset.add_column("id", deeplake.types.Text())
    unified_dataset.add_column("content", deeplake.types.Text())
    unified_dataset.add_column("timestamp", deeplake.types.Int64())
    unified_dataset.add_column("metadata", deeplake.types.Dict())

    for source_config in sources:
        source_type = source_config["type"]
        source_path = source_config["path"]
        source_name = source_config["name"]

        print(f"Processing {source_name} ({source_type})...")

        if source_type == "csv":
            # Import CSV and transform
            # assumes CSV columns 'id', 'text', 'timestamp' — TODO confirm
            csv_data = deeplake.from_csv(source_path)

            for row in csv_data:
                unified_dataset.append({
                    "source": source_name,
                    "id": f"{source_name}_{row['id']}",
                    "content": row.get("text", ""),
                    "timestamp": int(row.get("timestamp", 0)),
                    "metadata": {"original_source": source_type}
                })

        elif source_type == "parquet":
            # Import Parquet and transform
            # assumes Parquet columns 'identifier', 'content', 'created_at' — TODO confirm
            parquet_data = deeplake.from_parquet(source_path)

            for row in parquet_data:
                unified_dataset.append({
                    "source": source_name,
                    "id": f"{source_name}_{row['identifier']}",
                    "content": row.get("content", ""),
                    "timestamp": int(row.get("created_at", 0)),
                    "metadata": {"original_source": source_type, "extra": row.get("extra", {})}
                })

        # Commit after each source; note that unknown source types are silently skipped
        unified_dataset.commit(f"Added data from {source_name}")

    print(f"Unified dataset created with {len(unified_dataset)} total records")
    return unified_dataset

# Example usage: two local files plus one remote CSV
sources = [
    {"type": "csv", "path": "./data/source1.csv", "name": "dataset_a"},
    {"type": "parquet", "path": "./data/source2.parquet", "name": "dataset_b"},
    {"type": "csv", "path": "s3://bucket/source3.csv", "name": "dataset_c"}
]

unified = process_multiple_sources(sources, "./unified_dataset")

Advanced Export Options

# Create complex dataset for export examples
dataset = deeplake.create("./complex_export_dataset")

dataset.add_column("id", deeplake.types.Int64())
dataset.add_column("category", deeplake.types.Text())
dataset.add_column("embeddings", deeplake.types.Embedding(size=128))
dataset.add_column("image_path", deeplake.types.Text())
dataset.add_column("metadata", deeplake.types.Dict())
dataset.add_column("active", deeplake.types.Bool())

# Add sample data
for i in range(1000):
    dataset.append({
        "id": i,
        "category": f"category_{i % 10}",
        "embeddings": np.random.random(128).astype(np.float32),
        "image_path": f"images/img_{i}.jpg",
        "metadata": {"score": np.random.random(), "tags": [f"tag_{j}" for j in range(3)]},
        "active": i % 2 == 0
    })

dataset.commit("Added complex sample data")

# Export with filtering
active_records = deeplake.query("SELECT * FROM dataset WHERE active == true")
active_records.to_csv("./active_records.csv")

# Export specific categories
category_5 = deeplake.query("SELECT * FROM dataset WHERE category == 'category_5'")
category_5.to_csv("./category_5_data.csv")

# Export aggregated data
category_stats = deeplake.query("""
    SELECT category, COUNT(*) as count, AVG(metadata['score']) as avg_score
    FROM dataset 
    GROUP BY category
""")
category_stats.to_csv("./category_statistics.csv")

print("Exported multiple views of complex dataset")

Integration with Pandas

import pandas as pd

# Export Deep Lake data for Pandas processing
def export_for_pandas(dataset_view, include_embeddings=False):
    """Export a dataset view to a pandas DataFrame.

    Parameters:
    - dataset_view: Deep Lake dataset view to export (must support either
      iteration over rows or to_csv, depending on include_embeddings)
    - include_embeddings: when True, embedding arrays are kept and
      serialized to their string list representation

    Returns:
    pandas.DataFrame containing the view's rows.
    """
    if include_embeddings:
        # Embeddings are arrays, which a CSV round-trip would mangle,
        # so build the frame row by row instead.
        data_rows = []
        for row in dataset_view:
            row_dict = row.to_dict()
            # Convert embeddings to a string representation
            if "embeddings" in row_dict:
                row_dict["embeddings"] = str(row_dict["embeddings"].tolist())
            data_rows.append(row_dict)
        return pd.DataFrame(data_rows)

    # Simple (scalar) columns: round-trip through a temporary CSV.
    temp_csv = "./temp_export.csv"
    dataset_view.to_csv(temp_csv)
    try:
        return pd.read_csv(temp_csv)
    finally:
        # fixed: clean up the temp file even if read_csv raises
        Path(temp_csv).unlink()

# Use with Pandas
df = export_for_pandas(dataset[0:100])  # First 100 rows
print(f"Pandas DataFrame shape: {df.shape}")
print(df.head())

# Process with Pandas and re-import: per-category row counts and active counts
processed_df = df.groupby('category').agg({
    'id': 'count',
    'active': 'sum'
}).rename(columns={'id': 'total_count', 'active': 'active_count'})

# Convert processed results back to Deep Lake
processed_dataset = deeplake.create("./processed_results")
processed_dataset.add_column("category", deeplake.types.Text())
processed_dataset.add_column("total_count", deeplake.types.Int64())
processed_dataset.add_column("active_count", deeplake.types.Int64())

# iterrows yields (index, row); the groupby index carries the category label
for category, row in processed_df.iterrows():
    processed_dataset.append({
        "category": category,
        "total_count": int(row['total_count']),
        "active_count": int(row['active_count'])
    })

processed_dataset.commit("Imported processed Pandas results")
print(f"Processed dataset has {len(processed_dataset)} category summaries")

Install with Tessl CLI

npx tessl i tessl/pypi-deeplake

docs

data-access.md

data-import-export.md

dataset-management.md

error-handling.md

framework-integration.md

index.md

query-system.md

schema-templates.md

storage-system.md

type-system.md

version-control.md

tile.json