Database for AI powered by a storage format optimized for deep-learning applications.
Evaluation — 75% · ↑ 1.59x agent success when using this tile
Comprehensive data import/export capabilities supporting Parquet, CSV, and COCO formats, plus custom data ingestion pipelines. Deep Lake provides seamless data migration and integration with existing data workflows.
Import data from various formats with automatic schema detection and type conversion.
def from_parquet(url_or_bytes: Union[str, bytes]) -> ReadOnlyDataset:
"""
Create dataset from Parquet file or bytes.
Parameters:
- url_or_bytes: Parquet file path/URL or raw bytes
Returns:
ReadOnlyDataset: Read-only dataset with Parquet data
"""
def from_csv(url_or_bytes: Union[str, bytes]) -> ReadOnlyDataset:
"""
Create dataset from CSV file or bytes.
Parameters:
- url_or_bytes: CSV file path/URL or raw bytes
Returns:
ReadOnlyDataset: Read-only dataset with CSV data
"""
def from_coco(images_directory: Union[str, pathlib.Path], annotation_files: Dict[str, Union[str, pathlib.Path]], dest: Union[str, pathlib.Path], dest_creds: Optional[Dict[str, str]] = None, key_to_column_mapping: Optional[Dict] = None, file_to_group_mapping: Optional[Dict] = None) -> Dataset:
"""
Import COCO format dataset.
Parameters:
- images_directory: Directory containing COCO images
- annotation_files: Dictionary mapping annotation type to JSON file path (keys: 'instances', 'keypoints', 'stuff')
- dest: Destination path for Deep Lake dataset
- dest_creds: Storage credentials for destination
- key_to_column_mapping: Optional mapping of COCO keys to column names
- file_to_group_mapping: Optional mapping of file types to group names
Returns:
Dataset: Deep Lake dataset with COCO data
"""Export datasets to various formats for integration with other tools and workflows.
class DatasetView:
"""Export capabilities for dataset views."""
def to_csv(self, path: str) -> None:
"""
Export dataset view to CSV format.
Parameters:
- path: Output CSV file path
"""Convert datasets between Deep Lake versions with data preservation and format migration.
def convert(src: str, dst: str, dst_creds: Optional[Dict[str, str]] = None, token: Optional[str] = None) -> None:
"""
Convert v3 dataset to v4 format.
Parameters:
- src: Source v3 dataset path
- dst: Destination v4 dataset path
- dst_creds: Destination storage credentials
- token: Activeloop authentication token
"""import deeplake
# Import from local Parquet file
dataset = deeplake.from_parquet("./data/my_data.parquet")
print(f"Imported {len(dataset)} rows from Parquet")
print(f"Columns: {[col.name for col in dataset.schema.columns]}")
# Access imported data
for i in range(min(5, len(dataset))):
row = dataset[i]
print(f"Row {i}: {row.to_dict()}")
# Import from remote Parquet file
s3_dataset = deeplake.from_parquet("s3://my-bucket/data.parquet")
print(f"Imported {len(s3_dataset)} rows from S3 Parquet")
# Convert to mutable dataset if needed
mutable_dataset = deeplake.like(dataset, "./mutable_from_parquet")
print("Created mutable copy of Parquet data")# Import from local CSV file
csv_dataset = deeplake.from_csv("./data/dataset.csv")
print(f"Imported {len(csv_dataset)} rows from CSV")
# Examine schema (automatically inferred)
schema = csv_dataset.schema
for col in schema.columns:
print(f"Column '{col.name}': {type(col.dtype)}")
# Import from URL
url_dataset = deeplake.from_csv("https://example.com/data.csv")
print(f"Imported {len(url_dataset)} rows from URL")
# Import from bytes (useful for processing in-memory CSV)
import io
csv_content = """name,age,score
Alice,25,0.95
Bob,30,0.88
Charlie,35,0.92"""
csv_bytes = csv_content.encode('utf-8')
bytes_dataset = deeplake.from_csv(csv_bytes)
print(f"Imported {len(bytes_dataset)} rows from bytes")
# Access CSV data
for row in bytes_dataset:
print(f"Name: {row['name']}, Age: {row['age']}, Score: {row['score']}")import pathlib
# Import COCO dataset with instances annotations
coco_dataset = deeplake.from_coco(
images_directory="./coco_data/images",
annotation_files={"instances": "./coco_data/annotations/instances_train2017.json"},
dest="./coco_deep_lake"
)
print(f"Imported COCO dataset with {len(coco_dataset)} samples")
# Examine COCO schema
for col in coco_dataset.schema.columns:
print(f"COCO column: {col.name} ({type(col.dtype)})")
# Access COCO data
sample = coco_dataset[0]
print(f"Image: {sample['images']}")
# Import with multiple annotation types
full_coco_dataset = deeplake.from_coco(
images_directory=pathlib.Path("./coco_data/images"),
annotation_files={
"instances": "./coco_data/annotations/instances_train2017.json",
"keypoints": "./coco_data/annotations/person_keypoints_train2017.json"
},
dest="s3://my-bucket/full_coco_dataset",
dest_creds={"aws_access_key_id": "...", "aws_secret_access_key": "..."}
)
print(f"Full COCO dataset: {len(full_coco_dataset)} samples")# Create sample dataset
dataset = deeplake.create("./export_dataset")
dataset.add_column("id", deeplake.types.Int64())
dataset.add_column("name", deeplake.types.Text())
dataset.add_column("score", deeplake.types.Float32())
# Add sample data
for i in range(100):
dataset.append({
"id": i,
"name": f"item_{i}",
"score": i * 0.01
})
dataset.commit("Added sample data for export")
# Export to CSV
dataset.to_csv("./exported_data.csv")
print("Exported dataset to CSV")
# Export filtered data
high_scores = deeplake.query("SELECT * FROM dataset WHERE score > 0.5")
high_scores.to_csv("./high_scores.csv")
print("Exported filtered data to CSV")
# Export specific columns
columns_subset = deeplake.query("SELECT name, score FROM dataset")
columns_subset.to_csv("./subset_data.csv")
print("Exported subset of columns to CSV")# Convert Deep Lake v3 dataset to v4 format
deeplake.convert(
src="./old_v3_dataset",
dst="./new_v4_dataset"
)
print("Converted v3 dataset to v4 format")
# Convert with cloud storage
deeplake.convert(
src="s3://old-bucket/v3_dataset",
dst="s3://new-bucket/v4_dataset",
dst_creds={"aws_access_key_id": "...", "aws_secret_access_key": "..."}
)
print("Converted cloud v3 dataset to v4 format")
# Open converted dataset
converted_dataset = deeplake.open("./new_v4_dataset")
print(f"Converted dataset has {len(converted_dataset)} rows")
print(f"Schema: {[col.name for col in converted_dataset.schema.columns]}")import pandas as pd
import numpy as np
from pathlib import Path
def ingest_custom_format(data_dir: str, dest_path: str):
"""Custom ingestion pipeline for proprietary format."""
# Create target dataset
dataset = deeplake.create(dest_path)
# Define schema based on source format
dataset.add_column("file_id", deeplake.types.Text())
dataset.add_column("image", deeplake.types.Image())
dataset.add_column("metadata", deeplake.types.Dict())
dataset.add_column("features", deeplake.types.Array(deeplake.types.Float32(), shape=[512]))
# Process source files
data_path = Path(data_dir)
batch_data = []
for file_path in data_path.glob("*.json"):
# Read custom metadata format
with open(file_path, 'r') as f:
metadata = json.load(f)
# Find corresponding image
image_path = data_path / f"{file_path.stem}.jpg"
if not image_path.exists():
continue
# Extract features (example: using pre-computed features)
features_path = data_path / f"{file_path.stem}_features.npy"
if features_path.exists():
features = np.load(features_path).astype(np.float32)
else:
features = np.zeros(512, dtype=np.float32)
# Prepare batch entry
batch_data.append({
"file_id": file_path.stem,
"image": str(image_path),
"metadata": metadata,
"features": features
})
# Batch commit for performance
if len(batch_data) >= 100:
dataset.extend(batch_data)
dataset.commit(f"Ingested batch of {len(batch_data)} items")
batch_data = []
# Final commit
if batch_data:
dataset.extend(batch_data)
dataset.commit(f"Final batch of {len(batch_data)} items")
print(f"Ingestion complete. Dataset has {len(dataset)} items")
return dataset
# Use custom ingestion pipeline
custom_dataset = ingest_custom_format("./custom_data", "./ingested_dataset")

from typing import Dict, List

def process_multiple_sources(sources: List[Dict], output_path: str):
"""Process multiple data sources into unified dataset."""
# Create unified dataset
unified_dataset = deeplake.create(output_path)
# Define common schema
unified_dataset.add_column("source", deeplake.types.Text())
unified_dataset.add_column("id", deeplake.types.Text())
unified_dataset.add_column("content", deeplake.types.Text())
unified_dataset.add_column("timestamp", deeplake.types.Int64())
unified_dataset.add_column("metadata", deeplake.types.Dict())
for source_config in sources:
source_type = source_config["type"]
source_path = source_config["path"]
source_name = source_config["name"]
print(f"Processing {source_name} ({source_type})...")
if source_type == "csv":
# Import CSV and transform
csv_data = deeplake.from_csv(source_path)
for row in csv_data:
unified_dataset.append({
"source": source_name,
"id": f"{source_name}_{row['id']}",
"content": row.get("text", ""),
"timestamp": int(row.get("timestamp", 0)),
"metadata": {"original_source": source_type}
})
elif source_type == "parquet":
# Import Parquet and transform
parquet_data = deeplake.from_parquet(source_path)
for row in parquet_data:
unified_dataset.append({
"source": source_name,
"id": f"{source_name}_{row['identifier']}",
"content": row.get("content", ""),
"timestamp": int(row.get("created_at", 0)),
"metadata": {"original_source": source_type, "extra": row.get("extra", {})}
})
# Commit after each source
unified_dataset.commit(f"Added data from {source_name}")
print(f"Unified dataset created with {len(unified_dataset)} total records")
return unified_dataset
# Example usage
sources = [
{"type": "csv", "path": "./data/source1.csv", "name": "dataset_a"},
{"type": "parquet", "path": "./data/source2.parquet", "name": "dataset_b"},
{"type": "csv", "path": "s3://bucket/source3.csv", "name": "dataset_c"}
]
unified = process_multiple_sources(sources, "./unified_dataset")

# Create complex dataset for export examples
dataset = deeplake.create("./complex_export_dataset")
dataset.add_column("id", deeplake.types.Int64())
dataset.add_column("category", deeplake.types.Text())
dataset.add_column("embeddings", deeplake.types.Embedding(size=128))
dataset.add_column("image_path", deeplake.types.Text())
dataset.add_column("metadata", deeplake.types.Dict())
dataset.add_column("active", deeplake.types.Bool())
# Add sample data
for i in range(1000):
dataset.append({
"id": i,
"category": f"category_{i % 10}",
"embeddings": np.random.random(128).astype(np.float32),
"image_path": f"images/img_{i}.jpg",
"metadata": {"score": np.random.random(), "tags": [f"tag_{j}" for j in range(3)]},
"active": i % 2 == 0
})
dataset.commit("Added complex sample data")
# Export with filtering
active_records = deeplake.query("SELECT * FROM dataset WHERE active == true")
active_records.to_csv("./active_records.csv")
# Export specific categories
category_5 = deeplake.query("SELECT * FROM dataset WHERE category == 'category_5'")
category_5.to_csv("./category_5_data.csv")
# Export aggregated data
category_stats = deeplake.query("""
SELECT category, COUNT(*) as count, AVG(metadata['score']) as avg_score
FROM dataset
GROUP BY category
""")
category_stats.to_csv("./category_statistics.csv")
print("Exported multiple views of complex dataset")import pandas as pd
# Export Deep Lake data for Pandas processing
def export_for_pandas(dataset_view, include_embeddings=False):
"""Export dataset to format suitable for Pandas."""
# Create temporary CSV (excluding complex types)
if include_embeddings:
# For datasets with embeddings, we need special handling
data_rows = []
for row in dataset_view:
row_dict = row.to_dict()
# Convert embeddings to string representation
if "embeddings" in row_dict:
row_dict["embeddings"] = str(row_dict["embeddings"].tolist())
data_rows.append(row_dict)
return pd.DataFrame(data_rows)
else:
# Export to CSV and read with Pandas
temp_csv = "./temp_export.csv"
dataset_view.to_csv(temp_csv)
df = pd.read_csv(temp_csv)
Path(temp_csv).unlink() # Clean up
return df
# Use with Pandas
df = export_for_pandas(dataset[0:100]) # First 100 rows
print(f"Pandas DataFrame shape: {df.shape}")
print(df.head())
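
The include_embeddings branch defined above keeps vectors by stringifying them instead of round-tripping through CSV; calling it looks like this:

# Keep embeddings as stringified lists (uses the branch defined above)
df_with_embeddings = export_for_pandas(dataset[0:100], include_embeddings=True)
print(f"DataFrame with embeddings: {df_with_embeddings.shape}")
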
# Process with Pandas and re-import
processed_df = df.groupby('category').agg({
'id': 'count',
'active': 'sum'
}).rename(columns={'id': 'total_count', 'active': 'active_count'})
# Convert processed results back to Deep Lake
processed_dataset = deeplake.create("./processed_results")
processed_dataset.add_column("category", deeplake.types.Text())
processed_dataset.add_column("total_count", deeplake.types.Int64())
processed_dataset.add_column("active_count", deeplake.types.Int64())
for category, row in processed_df.iterrows():
processed_dataset.append({
"category": category,
"total_count": int(row['total_count']),
"active_count": int(row['active_count'])
})
processed_dataset.commit("Imported processed Pandas results")
print(f"Processed dataset has {len(processed_dataset)} category summaries")Install with Tessl CLI
npx tessl i tessl/pypi-deeplakedocs