Database for AI powered by a storage format optimized for deep-learning applications.
75
Evaluation — 75%
↑ 1.59x agent success when using this tile
Pre-defined schema templates for common ML use cases including text embeddings, COCO datasets, and custom schema creation patterns. Schema templates provide standardized dataset structures for specific domains and applications.
Ready-to-use schema templates for common machine learning scenarios with optimized column types and indexing.
class TextEmbeddings:
    """Schema template for text embeddings datasets.

    Defines a standardized dataset structure pairing raw text with
    fixed-dimension embedding vectors.
    """

    def __init__(self, embedding_size: int, quantize: bool = False):
        """
        Initialize text embeddings schema.

        Parameters:
        - embedding_size: Dimension of embedding vectors
        - quantize: Whether to use quantized embeddings for memory efficiency
        """
        # Persist configuration; the original stub discarded its arguments,
        # making the template impossible to introspect.
        self.embedding_size = embedding_size
        self.quantize = quantize
class COCOImages:
    """COCO dataset schema template.

    Defines a standardized dataset structure for COCO-style image data;
    annotation groups are toggled by constructor flags.
    """

    def __init__(self, embedding_size: int, quantize: bool = False,
                 objects: bool = True, keypoints: bool = False,
                 stuffs: bool = False):
        """
        Initialize COCO images schema.

        Parameters:
        - embedding_size: Dimension of embedding vectors for images
        - quantize: Whether to use quantized embeddings
        - objects: Include object detection annotations
        - keypoints: Include keypoint detection annotations
        - stuffs: Include stuff segmentation annotations
        """
        # Persist configuration; the original stub discarded its arguments.
        self.embedding_size = embedding_size
        self.quantize = quantize
        self.objects = objects
        self.keypoints = keypoints
        self.stuffs = stuffs
class SchemaTemplate:
    """Base class for schema templates."""

    pass


# NOTE(review): the scrape fused `pass` with the next example's import
# (`passimport deeplake`), a syntax error; they are separated here.
import deeplake
from deeplake.schemas import TextEmbeddings

# Hoisted from mid-script: imports belong at the top of the example.
import numpy as np

# Create dataset with text embeddings schema
schema = TextEmbeddings(embedding_size=768)
dataset = deeplake.create("./text_embeddings_dataset", schema=schema)

# Examine the generated schema
print("Text Embeddings Schema:")
for col in dataset.schema.columns:
    print(f" {col.name}: {type(col.dtype).__name__}")

# Add text data with embeddings
dataset.append({
    "text": "This is a sample text for embedding.",
    "embeddings": np.random.random(768).astype(np.float32)
})
dataset.append({
    "text": "Another example text with semantic meaning.",
    "embeddings": np.random.random(768).astype(np.float32)
})
dataset.commit("Added text embeddings data")

# Query similar texts (using embedding similarity)
target_embedding = np.random.random(768).astype(np.float32)
similar_texts = deeplake.query(f"""
SELECT text, COSINE_SIMILARITY(embeddings, {target_embedding.tolist()}) as similarity
FROM dataset
WHERE COSINE_SIMILARITY(embeddings, {target_embedding.tolist()}) > 0.5
ORDER BY similarity DESC
""")
print(f"Found {len(similar_texts)} similar texts")

# Create quantized embeddings dataset for memory efficiency
quantized_schema = TextEmbeddings(embedding_size=1024, quantize=True)
quantized_dataset = deeplake.create("./quantized_embeddings", schema=quantized_schema)

print("Quantized Embeddings Schema:")
for col in quantized_dataset.schema.columns:
    print(f" {col.name}: {type(col.dtype).__name__}")
    # Quantized columns expose their quantization mode; plain ones do not.
    if hasattr(col.dtype, 'quantization'):
        print(f" Quantization: {col.dtype.quantization}")

# Add quantized embedding data
large_embeddings = [
    np.random.random(1024).astype(np.float32) for _ in range(1000)
]
texts = [f"Document {i} content..." for i in range(1000)]
batch_data = [
    {"text": text, "embeddings": embedding}
    for text, embedding in zip(texts, large_embeddings)
]
quantized_dataset.extend(batch_data)
quantized_dataset.commit("Added quantized embeddings batch")
print(f"Quantized dataset size: {len(quantized_dataset)} documents")

from deeplake.schemas import COCOImages
# Create COCO dataset with object detection
coco_schema = COCOImages(embedding_size=512, objects=True, keypoints=False)
coco_dataset = deeplake.create("./coco_dataset", schema=coco_schema)

print("COCO Images Schema:")
for col in coco_dataset.schema.columns:
    print(f" {col.name}: {type(col.dtype).__name__}")

# Add COCO-style data
coco_sample = {
    "images": "./images/sample_image.jpg",
    "embeddings": np.random.random(512).astype(np.float32),
    # Add other COCO-specific fields based on schema
}
# The schema defines the expected structure for COCO data
coco_dataset.append(coco_sample)
coco_dataset.commit("Added COCO sample")

# COCO schema with keypoint detection
coco_keypoints_schema = COCOImages(
    embedding_size=256,
    objects=True,
    keypoints=True,
    stuffs=False
)
coco_keypoints_dataset = deeplake.create("./coco_keypoints", schema=coco_keypoints_schema)

print("COCO Keypoints Schema:")
for col in coco_keypoints_dataset.schema.columns:
    print(f" {col.name}: {type(col.dtype).__name__}")

# Add keypoint data
keypoint_sample = {
    "images": "./images/person_image.jpg",
    "embeddings": np.random.random(256).astype(np.float32),
    # Keypoint-specific fields would be defined by the schema
}
coco_keypoints_dataset.append(keypoint_sample)
coco_keypoints_dataset.commit("Added keypoint sample")

# Create custom schema templates for specific domains
class VideoAnalysisSchema:
    """Custom schema template for video analysis datasets.

    Holds the embedding dimensions used when materializing the schema.
    """

    def __init__(self, frame_embedding_size=512, audio_embedding_size=128):
        # Dimensions for per-frame and per-audio-segment embedding vectors.
        self.frame_embedding_size = frame_embedding_size
        self.audio_embedding_size = audio_embedding_size

    def create_schema(self):
        """Create the actual schema definition.

        Placeholder: a concrete implementation would return a schema
        specification, or create the columns directly.
        """
        return None
class MedicalImagingSchema:
    """Custom schema template for medical imaging datasets.

    Holds the configuration used when materializing the schema.
    """

    def __init__(self, include_dicom_metadata=True, embedding_size=1024):
        # Whether DICOM metadata columns should be included, and the
        # dimension of the image embedding vectors.
        self.include_dicom_metadata = include_dicom_metadata
        self.embedding_size = embedding_size

    def create_schema(self):
        """Create medical imaging schema (placeholder)."""
        return None
# Implement custom video analysis dataset
def create_video_analysis_dataset(path, frame_emb_size=512, audio_emb_size=128):
    """Create dataset optimized for video analysis.

    Parameters:
    - path: Location of the new dataset.
    - frame_emb_size: Dimension of per-frame embedding vectors.
    - audio_emb_size: Dimension of per-audio-segment embedding vectors.

    Returns the created dataset with all video-analysis columns added.
    """
    dataset = deeplake.create(path)
    # (name, type) pairs, grouped by purpose; order matches the schema
    # layout so the resulting dataset is identical to the original.
    column_specs = [
        # Video-specific columns
        ("video_path", deeplake.types.Text()),
        ("video_metadata", deeplake.types.Dict()),
        ("duration", deeplake.types.Float32()),
        ("fps", deeplake.types.Float32()),
        # Frame analysis
        ("frame_embeddings", deeplake.types.Sequence(
            deeplake.types.Embedding(size=frame_emb_size))),
        ("frame_timestamps", deeplake.types.Sequence(deeplake.types.Float32())),
        # Audio analysis
        ("audio_embeddings", deeplake.types.Sequence(
            deeplake.types.Embedding(size=audio_emb_size))),
        ("audio_segments", deeplake.types.Sequence(deeplake.types.Float32())),
        # Analysis results
        ("scene_labels", deeplake.types.Sequence(deeplake.types.Text())),
        ("object_detections", deeplake.types.Sequence(deeplake.types.Dict())),
        ("transcript", deeplake.types.Text()),
    ]
    for name, column_type in column_specs:
        dataset.add_column(name, column_type)
    return dataset
# Use custom schema
video_dataset = create_video_analysis_dataset("./video_analysis")

# Add video analysis data
video_sample = {
    "video_path": "./videos/sample_video.mp4",
    "video_metadata": {"resolution": "1920x1080", "codec": "h264"},
    "duration": 120.5,
    "fps": 30.0,
    "frame_embeddings": [np.random.random(512).astype(np.float32) for _ in range(10)],
    "frame_timestamps": [i * 0.033 for i in range(10)],  # 30fps intervals
    "audio_embeddings": [np.random.random(128).astype(np.float32) for _ in range(5)],
    "audio_segments": [i * 24.1 for i in range(5)],  # 5 audio segments
    "scene_labels": ["indoor", "person", "conversation"],
    "object_detections": [
        {"bbox": [100, 100, 200, 200], "class": "person", "confidence": 0.95},
        {"bbox": [300, 150, 400, 250], "class": "chair", "confidence": 0.87}
    ],
    "transcript": "This is a sample video transcript..."
}
video_dataset.append(video_sample)
video_dataset.commit("Added video analysis sample")

# E-commerce product schema
def create_ecommerce_schema(path):
    """Schema for e-commerce product datasets.

    Column groups: product text (with inverted / BM25 search indexes),
    visual content with a clustered embedding index, categorical fields,
    numeric attributes, and rich attribute/tag fields.
    """
    dataset = deeplake.create(path)
    # (name, type) pairs in the exact order the columns are created.
    column_specs = [
        # Product information
        ("product_id", deeplake.types.Text()),
        ("title", deeplake.types.Text(
            index_type=deeplake.types.TextIndex(deeplake.types.Inverted))),
        ("description", deeplake.types.Text(
            index_type=deeplake.types.TextIndex(deeplake.types.BM25))),
        # Visual content
        ("product_images", deeplake.types.Sequence(deeplake.types.Image())),
        ("image_embeddings", deeplake.types.Sequence(
            deeplake.types.Embedding(
                size=512,
                index_type=deeplake.types.EmbeddingIndex(deeplake.types.Clustered)))),
        # Categorical data
        ("category", deeplake.types.Text()),
        ("subcategory", deeplake.types.Text()),
        ("brand", deeplake.types.Text()),
        # Numerical attributes
        ("price", deeplake.types.Float32()),
        ("rating", deeplake.types.Float32()),
        ("review_count", deeplake.types.Int32()),
        # Rich attributes (color, size, material, etc.)
        ("attributes", deeplake.types.Dict()),
        ("tags", deeplake.types.Sequence(deeplake.types.Text())),
    ]
    for name, column_type in column_specs:
        dataset.add_column(name, column_type)
    return dataset
# Genomics data schema
def create_genomics_schema(path):
    """Schema for genomics datasets.

    Column groups: sample identification, raw sequence data, genomic
    coordinates, variant information, expression data, and an embedding
    column for ML workloads.
    """
    dataset = deeplake.create(path)
    # (name, type) pairs in the exact order the columns are created.
    column_specs = [
        # Sample identification
        ("sample_id", deeplake.types.Text()),
        ("patient_id", deeplake.types.Text()),
        ("tissue_type", deeplake.types.Text()),
        # Sequence data
        ("sequence", deeplake.types.Text()),
        ("quality_scores", deeplake.types.Sequence(deeplake.types.Int8())),
        # Genomic coordinates
        ("chromosome", deeplake.types.Text()),
        ("start_position", deeplake.types.Int64()),
        ("end_position", deeplake.types.Int64()),
        # Variant information
        ("variants", deeplake.types.Sequence(deeplake.types.Dict())),
        ("annotations", deeplake.types.Dict()),
        # Expression data (~20k genes)
        ("expression_values", deeplake.types.Array(
            deeplake.types.Float32(), shape=[20000])),
        # Embeddings for ML
        ("sequence_embeddings", deeplake.types.Embedding(size=256)),
    ]
    for name, column_type in column_specs:
        dataset.add_column(name, column_type)
    return dataset
# Time series schema
def create_timeseries_schema(path, num_features=10):
    """Schema for time series datasets.

    Parameters:
    - path: Location of the new dataset.
    - num_features: Width of each per-timestamp value vector.
    """
    dataset = deeplake.create(path)
    # (name, type) pairs in the exact order the columns are created.
    column_specs = [
        # Time series identification
        ("series_id", deeplake.types.Text()),
        ("start_time", deeplake.types.Int64()),  # Unix timestamp
        ("end_time", deeplake.types.Int64()),
        ("frequency", deeplake.types.Text()),  # 'daily', 'hourly', etc.
        # Time series data
        ("timestamps", deeplake.types.Sequence(deeplake.types.Int64())),
        ("values", deeplake.types.Sequence(
            deeplake.types.Array(deeplake.types.Float32(), shape=[num_features]))),
        # Metadata
        ("source", deeplake.types.Text()),
        ("tags", deeplake.types.Sequence(deeplake.types.Text())),
        ("metadata", deeplake.types.Dict()),
        # Derived features (pre-computed statistics)
        ("statistical_features", deeplake.types.Array(
            deeplake.types.Float32(), shape=[50])),
        ("embeddings", deeplake.types.Embedding(size=128)),
    ]
    for name, column_type in column_specs:
        dataset.add_column(name, column_type)
    return dataset
# Use domain-specific schemas
ecommerce_dataset = create_ecommerce_schema("./ecommerce_products")
genomics_dataset = create_genomics_schema("./genomics_samples")
timeseries_dataset = create_timeseries_schema("./time_series_data", num_features=15)

print("Created domain-specific datasets:")
print(f"E-commerce columns: {len(ecommerce_dataset.schema.columns)}")
print(f"Genomics columns: {len(genomics_dataset.schema.columns)}")
print(f"Time series columns: {len(timeseries_dataset.schema.columns)}")

# Best practices for creating reusable schema templates
class FlexibleImageDatasetSchema:
    """Flexible schema template for image datasets.

    Feature groups (embeddings, annotations, text search, metadata) are
    switched on or off at construction time; ``create_dataset`` builds a
    dataset containing the core image columns plus the enabled groups.
    """

    def __init__(self,
                 include_embeddings=True,
                 embedding_size=512,
                 include_annotations=True,
                 include_metadata=True,
                 enable_text_search=False,
                 enable_similarity_search=True):
        self.include_embeddings = include_embeddings
        self.embedding_size = embedding_size
        self.include_annotations = include_annotations
        self.include_metadata = include_metadata
        self.enable_text_search = enable_text_search
        self.enable_similarity_search = enable_similarity_search

    def create_dataset(self, path):
        """Create dataset with flexible schema."""
        ds = deeplake.create(path)

        # Core image columns (always present).
        ds.add_column("image_id", deeplake.types.Text())
        ds.add_column("image", deeplake.types.Image())
        ds.add_column("width", deeplake.types.Int32())
        ds.add_column("height", deeplake.types.Int32())

        # Optional embeddings; a clustered index is attached only when
        # similarity search is requested.
        if self.include_embeddings:
            emb_index = (
                deeplake.types.EmbeddingIndex(deeplake.types.Clustered)
                if self.enable_similarity_search else None
            )
            ds.add_column(
                "embeddings",
                deeplake.types.Embedding(size=self.embedding_size,
                                         index_type=emb_index))

        # Optional annotation columns.
        if self.include_annotations:
            ds.add_column("labels", deeplake.types.Sequence(deeplake.types.Text()))
            ds.add_column("bboxes", deeplake.types.Sequence(deeplake.types.BoundingBox()))
            ds.add_column("masks", deeplake.types.Sequence(deeplake.types.SegmentMask()))

        # Optional text fields with search indexes.
        if self.enable_text_search:
            ds.add_column(
                "caption",
                deeplake.types.Text(
                    index_type=deeplake.types.TextIndex(deeplake.types.BM25)))
            ds.add_column(
                "description",
                deeplake.types.Text(
                    index_type=deeplake.types.TextIndex(deeplake.types.Inverted)))

        # Optional metadata columns.
        if self.include_metadata:
            ds.add_column("metadata", deeplake.types.Dict())
            ds.add_column("source", deeplake.types.Text())
            ds.add_column("created_at", deeplake.types.Int64())

        return ds
# Usage examples of flexible schema

# Minimal image dataset: every optional feature group disabled.
minimal_options = dict(
    include_embeddings=False,
    include_annotations=False,
    include_metadata=False,
)
minimal_schema = FlexibleImageDatasetSchema(**minimal_options)
minimal_dataset = minimal_schema.create_dataset("./minimal_images")

# Full-featured image dataset: everything enabled.
full_options = dict(
    include_embeddings=True,
    embedding_size=768,
    include_annotations=True,
    include_metadata=True,
    enable_text_search=True,
    enable_similarity_search=True,
)
full_schema = FlexibleImageDatasetSchema(**full_options)
full_dataset = full_schema.create_dataset("./full_featured_images")

print(f"Minimal schema columns: {len(minimal_dataset.schema.columns)}")
print(f"Full schema columns: {len(full_dataset.schema.columns)}")
# Demonstrate schema validation
def validate_schema_compatibility(dataset1, dataset2):
    """Check if two datasets have compatible schemas.

    Columns are compared by name; a shared column is compatible when the
    dtypes of both sides are instances of the same class.

    Returns a dict with an overall ``compatible`` flag plus the common,
    compatible, and one-sided column-name lists.
    """
    cols_a = {col.name: type(col.dtype) for col in dataset1.schema.columns}
    cols_b = {col.name: type(col.dtype) for col in dataset2.schema.columns}

    shared = cols_a.keys() & cols_b.keys()
    matching = [name for name in shared if cols_a[name] == cols_b[name]]

    return {
        "compatible": len(matching) == len(shared),
        "common_columns": list(shared),
        "compatible_columns": matching,
        "schema1_only": list(cols_a.keys() - cols_b.keys()),
        "schema2_only": list(cols_b.keys() - cols_a.keys()),
    }
# Test schema compatibility
compatibility = validate_schema_compatibility(minimal_dataset, full_dataset)
print(f"Schema compatibility: {compatibility}")

# Install with Tessl CLI
npx tessl i tessl/pypi-deeplakedocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10