CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-daft

Distributed Dataframes for Multimodal Data with high-performance query engine and support for complex nested data structures, AI/ML operations, and seamless cloud storage integration.

Pending
Overview
Eval results
Files

docs/ai-ml.md

AI/ML Functions

Built-in functions for AI and machine learning workflows including text embeddings, large language model generation, and model inference operations optimized for distributed processing.

Capabilities

Text Embeddings

Generate vector embeddings from text data for semantic search and similarity operations.

def embed_text(
    text: Expression,
    *,
    provider: str | Provider | None = None,
    model: str | None = None,
    **options: str,
) -> Expression:
    """
    Generate text embeddings using the specified model and provider.

    Every argument after ``text`` is keyword-only (note the bare ``*`` in the
    signature), so ``provider`` and ``model`` must always be passed by name.

    Parameters:
    - text: Text expression to embed
    - provider: Provider to use for the embedding model, given either as a
      provider name (e.g., "sentence_transformers", "openai") or as a
      Provider instance. Defaults to None.
    - model: Model name/identifier for embedding generation. Defaults to None.
    - options: Additional string-valued keyword options to pass for the model

    NOTE(review): how ``provider=None`` / ``model=None`` are resolved to a
    concrete provider and model is not shown in this document - confirm
    against the Daft reference before relying on the defaults.

    Returns:
    Expression: Vector embedding expression

    Examples:
    >>> df.select(embed_text(col("description"), model="all-MiniLM-L6-v2"))
    >>> df.select(embed_text(col("query"), provider="openai", model="text-embedding-ada-002"))
    """

Image Embeddings

Generate vector embeddings from image data for visual similarity and search operations.

def embed_image(
    image: Expression,
    *,
    provider: str | Provider | None = None,
    model: str | None = None,
    **options: str,
) -> Expression:
    """
    Generate image embeddings using the specified model and provider.

    Every argument after ``image`` is keyword-only (note the bare ``*`` in the
    signature), so ``provider`` and ``model`` must always be passed by name.

    Parameters:
    - image: Image expression to embed
    - provider: Provider to use for the embedding model, given either as a
      provider name (e.g., "transformers", "openai") or as a Provider
      instance. Defaults to None.
    - model: Model name/identifier for image embedding generation.
      Defaults to None.
    - options: Additional string-valued keyword options to pass for the model

    NOTE(review): how ``provider=None`` / ``model=None`` are resolved to a
    concrete provider and model is not shown in this document - confirm
    against the Daft reference before relying on the defaults.

    Returns:
    Expression: Vector embedding expression

    Examples:
    >>> df.select(embed_image(col("image"), provider="transformers", model="clip-vit-base-patch32"))
    >>> df.select(embed_image(col("photo"), provider="openai", model="clip"))
    """

Large Language Model Generation

Generate text using large language models for various NLP tasks.

def llm_generate(
    input_column: Expression,
    model: str = "facebook/opt-125m",
    provider: Literal["vllm", "openai"] = "vllm",
    concurrency: int = 1,
    batch_size: int | None = None,
    num_cpus: int | None = None,
    num_gpus: int | None = None,
    **generation_config: dict[str, Any],
) -> Expression:
    """
    Generate text using a large language model with the specified provider.

    Unlike the embedding functions, the named parameters here are not
    keyword-only, so ``model`` may be passed as the second positional
    argument. The provider defaults to "vllm": pass ``provider="openai"``
    explicitly when ``model`` names an OpenAI model.

    Parameters:
    - input_column: Input text expression for generation
    - model: Model identifier (default: "facebook/opt-125m")
    - provider: LLM provider, either "vllm" or "openai" (default: "vllm")
    - concurrency: Number of concurrent model instances (default: 1)
    - batch_size: Batch size for processing (default: None)
    - num_cpus: CPU resources to allocate (default: None)
    - num_gpus: GPU resources to allocate (default: None)
    - generation_config: Model parameters passed as extra keyword arguments
      (temperature, max_tokens, etc.)

    NOTE(review): the ``dict[str, Any]`` annotation on ``**generation_config``
    types each individual keyword value as a dict, while the examples pass
    scalars such as ``temperature=0.7`` - confirm the intended annotation.

    Returns:
    Expression: Generated text expression

    Examples:
    >>> df.select(llm_generate(col("question"), model="gpt-4o", provider="openai"))
    >>> df.select(llm_generate(col("prompt"), model="facebook/opt-125m", provider="vllm", temperature=0.7))
    """

PyTorch Integration

Convert DataFrames to PyTorch datasets for deep learning workflows.

class DataFrame:
    # Excerpt: only the PyTorch conversion methods are listed in this section.

    def to_torch_map_dataset(
        self,
        shard_strategy: Optional[Literal["file"]] = None,
        world_size: Optional[int] = None,
        rank: Optional[int] = None,
    ) -> "torch.utils.data.Dataset":
        """
        Convert DataFrame to PyTorch map-style dataset.

        All three parameters are optional and default to None.

        Parameters:
        - shard_strategy: Strategy for distributed sharding; the only
          accepted non-None value is "file"
        - world_size: Total number of processes for distributed training
        - rank: Current process rank for distributed training

        Returns:
        torch.utils.data.Dataset: PyTorch map-style dataset
        """

    def to_torch_iter_dataset(
        self,
        shard_strategy: Optional[Literal["file"]] = None,
        world_size: Optional[int] = None,
        rank: Optional[int] = None,
    ) -> "torch.utils.data.IterableDataset":
        """
        Convert DataFrame to PyTorch iterable dataset.

        All three parameters are optional and default to None.

        Parameters:
        - shard_strategy: Strategy for distributed sharding; the only
          accepted non-None value is "file"
        - world_size: Total number of processes for distributed training
        - rank: Current process rank for distributed training

        Returns:
        torch.utils.data.IterableDataset: PyTorch iterable dataset
        """

Device Management

Utility functions for optimal device selection in AI/ML workflows.

def get_device() -> "torch.device":
    """
    Get optimal PyTorch device (CUDA GPU if available, otherwise CPU).

    Takes no arguments.

    NOTE(review): whether other accelerator backends are also probed is not
    stated here - confirm against the Daft reference.

    Returns:
    torch.device: Optimal device for computation
    """

Provider Management

Functions for managing AI/ML providers and models.

def load_openai(name: str | None = None, **options) -> Provider:
    """Load OpenAI provider with configuration.

    Parameters:
    - name: Optional provider name (default: None); presumably the name the
      provider is registered under - TODO confirm
    - options: Provider configuration passed as keyword arguments

    Returns:
    Provider: The loaded OpenAI provider
    """

def load_sentence_transformers(name: str | None = None, **options) -> Provider:
    """Load Sentence Transformers provider with configuration.

    Parameters:
    - name: Optional provider name (default: None); presumably the name the
      provider is registered under - TODO confirm
    - options: Provider configuration passed as keyword arguments

    Returns:
    Provider: The loaded Sentence Transformers provider
    """

def load_transformers(name: str | None = None, **options) -> Provider:
    """Load Hugging Face Transformers provider with configuration.

    Parameters:
    - name: Optional provider name (default: None); presumably the name the
      provider is registered under - TODO confirm
    - options: Provider configuration passed as keyword arguments

    Returns:
    Provider: The loaded Hugging Face Transformers provider
    """

def load_lm_studio(name: str | None = None, **options) -> Provider:
    """Load LM Studio provider with configuration.

    Parameters:
    - name: Optional provider name (default: None); presumably the name the
      provider is registered under - TODO confirm
    - options: Provider configuration passed as keyword arguments

    Returns:
    Provider: The loaded LM Studio provider
    """

def load_provider(provider: str, name: str | None = None, **options) -> Provider:
    """Load provider by name with configuration.

    Parameters:
    - provider: Identifier of the provider to load (required)
    - name: Optional provider name (default: None); presumably the name the
      provider is registered under - TODO confirm
    - options: Provider configuration passed as keyword arguments

    Returns:
    Provider: The loaded provider
    """

Usage Examples

Image Similarity and Visual Search

import daft
from daft import col, lit  # lit() wraps the query vector as a literal expression
from daft.functions import embed_image

# The row with this id is treated as the query image; the rest are candidates.
QUERY_IMAGE_ID = 4

# Dataset with image paths or URLs
images_df = daft.from_pydict({
    "image_id": [1, 2, 3, QUERY_IMAGE_ID],
    "image_path": [
        "s3://bucket/images/product1.jpg",
        "s3://bucket/images/product2.jpg",
        "s3://bucket/images/product3.jpg",
        "s3://bucket/images/query.jpg",
    ],
})

# Load and decode images
decoded_images = images_df.select(
    col("image_id"),
    col("image_path").url.download().image.decode().alias("image"),
)

# Generate image embeddings for visual similarity.
# collect() materializes the result so the two queries below reuse the same
# embeddings instead of running the model twice.
embedded_images = decoded_images.select(
    col("image_id"),
    col("image"),
    embed_image(col("image"), provider="transformers", model="clip-vit-base-patch32").alias("image_embedding"),
).collect()

# Pull the query image's embedding out as a plain Python value.
# to_pydict() returns {column_name: [values...]}, so [0] is the single match.
query_embedding = (
    embedded_images
    .filter(col("image_id") == QUERY_IMAGE_ID)
    .to_pydict()["image_embedding"][0]
)

# Compute visual similarity (using cosine distance).
# The "similarity" column holds a *distance*: smaller means more similar, so
# the default ascending sort lists the closest images first.
similarities = embedded_images.select(
    col("image_id"),
    col("image_embedding").embedding.cosine_distance(lit(query_embedding)).alias("similarity"),
).filter(col("image_id") != QUERY_IMAGE_ID).sort("similarity").collect()

PyTorch Deep Learning Integration

import daft
import torch
from daft import col
from torch.utils.data import DataLoader

# Create DataFrame with image data and labels
training_df = daft.from_pydict({
    "image_path": ["train/cat1.jpg", "train/dog1.jpg", "train/cat2.jpg"],
    "label": [0, 1, 0],  # 0=cat, 1=dog
})

# Load and preprocess images; every image is resized to 224x224 so that the
# DataLoader can stack them into uniform batches.
processed_df = training_df.select(
    col("image_path").url.download().image.decode().image.resize(224, 224).alias("image"),
    col("label"),
)

# Convert to PyTorch dataset for training
torch_dataset = processed_df.to_torch_map_dataset()

# Create DataLoader for training
dataloader = DataLoader(torch_dataset, batch_size=32, shuffle=True)

# Use in PyTorch training loop; each batch is keyed by column name
for batch in dataloader:
    images = batch["image"]
    labels = batch["label"]
    # Training code here...

Text Similarity and Semantic Search

import daft
from daft import col
from daft.functions import embed_text

# One model name shared by documents and query: embeddings are only
# comparable when both sides come from the same model.
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

# Create dataset with text descriptions
documents = daft.from_pydict({
    "doc_id": [1, 2, 3, 4],
    "title": ["Machine Learning", "Deep Learning", "Data Science", "Statistics"],
    "description": [
        "Algorithms that learn from data",
        "Neural networks with multiple layers",
        "Extracting insights from data",
        "Mathematical analysis of data",
    ],
})

# Generate embeddings for semantic search.
# `model` is keyword-only in embed_text, so it must be passed by name.
embedded_docs = documents.select(
    col("doc_id"),
    col("title"),
    col("description"),
    embed_text(col("description"), model=EMBEDDING_MODEL).alias("embedding"),
)

# Create query embedding
query_df = daft.from_pydict({"query": ["artificial intelligence algorithms"]})
query_embedded = query_df.select(
    embed_text(col("query"), model=EMBEDDING_MODEL).alias("query_embedding")
)

# Compute similarity (cosine similarity would be implemented as UDF)
@daft.func
def cosine_similarity(vec1: list, vec2: list) -> float:
    """Return the cosine similarity of two vectors (0.0 if either is all zeros)."""
    import numpy as np

    v1, v2 = np.asarray(vec1, dtype=float), np.asarray(vec2, dtype=float)
    denominator = np.linalg.norm(v1) * np.linalg.norm(v2)
    if denominator == 0:
        # A zero vector has no direction; avoid dividing by zero / returning NaN.
        return 0.0
    # Convert the NumPy scalar to a builtin float to match the declared return type.
    return float(np.dot(v1, v2) / denominator)

# Materialize the single query embedding once, as a plain Python value.
# to_pydict() returns {column_name: [values...]}.
query_embedding = query_embedded.to_pydict()["query_embedding"][0]

# Find most similar documents
similarities = embedded_docs.select(
    col("title"),
    cosine_similarity(col("embedding"), query_embedding).alias("similarity"),
).collect()

Content Generation and Augmentation

import daft
from daft import col
from daft.functions import llm_generate

# Dataset with partial content to augment
content_df = daft.from_pydict({
    "topic": ["Climate Change", "Space Exploration", "Renewable Energy"],
    "outline": [
        "Impact on global temperatures",
        "Mars colonization plans",
        "Solar and wind power efficiency",
    ],
})

# Generate detailed content.
# provider must be set explicitly: llm_generate defaults to "vllm", which
# does not match an OpenAI model name such as "gpt-3.5-turbo".
augmented_content = content_df.select(
    col("topic"),
    col("outline"),
    llm_generate(
        daft.sql_expr("'Write a detailed paragraph about: ' || outline"),
        model="gpt-3.5-turbo",
        provider="openai",
        max_tokens=200,
        temperature=0.7,
    ).alias("detailed_content"),
)

# Generate summaries
summaries = content_df.select(
    col("topic"),
    llm_generate(
        daft.sql_expr("'Summarize in one sentence: ' || outline"),
        model="gpt-3.5-turbo",
        provider="openai",
        max_tokens=50,
    ).alias("summary"),
)

Question Answering System

# FAQ dataset
faq_df = daft.from_pydict({
    "category": ["Technical", "Billing", "Support", "General"],
    "question": [
        "How do I install the software?",
        "What payment methods are accepted?",
        "How to contact customer support?",
        "What is your company mission?",
    ],
    "context": [
        "Installation guide and system requirements",
        "Payment processing and billing information",
        "Support channels and contact information",
        "Company background and mission statement",
    ],
})

# Generate comprehensive answers.
# provider must be set explicitly: llm_generate defaults to "vllm", which
# does not match an OpenAI model name such as "gpt-4".
qa_responses = faq_df.select(
    col("category"),
    col("question"),
    llm_generate(
        daft.sql_expr("'Context: ' || context || '\nQuestion: ' || question || '\nAnswer:'"),
        model="gpt-4",
        provider="openai",
        max_tokens=150,
        temperature=0.3,  # low temperature keeps answers focused
    ).alias("answer"),
)

Text Classification and Analysis

# Customer feedback dataset
feedback_df = daft.from_pydict({
    "customer_id": [1, 2, 3, 4, 5],
    "feedback": [
        "Great product, very satisfied!",
        "Terrible service, not recommended",
        "Average experience, could be better",
        "Excellent customer support team",
        "Product quality is disappointing",
    ],
})

# Classify sentiment.
# - llm_generate defaults to provider="vllm", so OpenAI models need
#   provider="openai" spelled out.
# - embed_text takes `model` as a keyword-only argument.
sentiment_analysis = feedback_df.select(
    col("customer_id"),
    col("feedback"),
    llm_generate(
        daft.sql_expr("'Classify the sentiment of this feedback as Positive, Negative, or Neutral: ' || feedback"),
        model="gpt-3.5-turbo",
        provider="openai",
        max_tokens=10,
        temperature=0.1,
    ).alias("sentiment"),
    embed_text(col("feedback"), model="sentence-transformers/all-MiniLM-L6-v2").alias("feedback_embedding"),
)

# Extract key themes
theme_extraction = feedback_df.select(
    col("customer_id"),
    col("feedback"),
    llm_generate(
        daft.sql_expr("'Extract 3 key themes from this feedback: ' || feedback"),
        model="gpt-3.5-turbo",
        provider="openai",
        max_tokens=50,
    ).alias("themes"),
)

Multi-modal Content Processing

# Dataset with mixed content types
content_df = daft.from_pydict({
    "content_id": [1, 2, 3],
    "content_type": ["article", "social_post", "review"],
    "text": [
        "Long-form article about technology trends",
        "Short social media post #trending",
        "Detailed product review with pros and cons",
    ],
})

# One LLM expression per content type.
# provider is explicit because llm_generate defaults to "vllm".
headline = llm_generate(
    daft.sql_expr("'Create a compelling headline for this article: ' || text"),
    model="gpt-3.5-turbo",
    provider="openai",
)
hashtags = llm_generate(
    daft.sql_expr("'Suggest 3 relevant hashtags for: ' || text"),
    model="gpt-3.5-turbo",
    provider="openai",
)
review_rating = llm_generate(
    daft.sql_expr("'Rate this review from 1-5 stars and explain: ' || text"),
    model="gpt-3.5-turbo",
    provider="openai",
)

# Generate type-specific processing
processed_content = content_df.select(
    col("content_id"),
    col("content_type"),
    col("text"),
    # Conditional processing based on content type, expressed with nested
    # Expression.if_else(if_true, if_false): article -> headline,
    # social_post -> hashtags, anything else -> review rating.
    (col("content_type") == "article")
    .if_else(
        headline,
        (col("content_type") == "social_post").if_else(hashtags, review_rating),
    )
    .alias("processed_output"),
)

Batch Processing with AI Functions

# Large dataset for batch AI processing
large_dataset = daft.read_parquet("s3://bucket/customer-reviews/*.parquet")

# Reviews longer than this many characters get an LLM-written summary.
LONG_REVIEW_CHARS = 500

# Efficient batch processing
batch_ai_results = (large_dataset
    .filter(col("review_text").not_null())
    # Repartition *before* the AI calls so the expensive work is what gets
    # spread across partitions.
    .repartition(10)
    .select(
        col("review_id"),
        col("product_id"),
        col("review_text"),
        # Generate embeddings for similarity search (`model` is keyword-only)
        embed_text(col("review_text"), model="all-MiniLM-L6-v2").alias("review_embedding"),
        # Extract sentiment (provider is explicit; the default is "vllm")
        llm_generate(
            daft.sql_expr("'Sentiment (Positive/Negative/Neutral):' || review_text"),
            model="gpt-3.5-turbo",
            provider="openai",
            max_tokens=10,
        ).alias("sentiment"),
        # Generate summary for long reviews; short reviews pass through as-is
        (col("review_text").str.length() > LONG_REVIEW_CHARS)
        .if_else(
            llm_generate(
                daft.sql_expr("'Summarize this review in 2 sentences: ' || review_text"),
                model="gpt-3.5-turbo",
                provider="openai",
                max_tokens=100,
            ),
            col("review_text"),
        )
        .alias("summary"),
    )
    .collect()
)

Custom Model Integration

# Using custom or specialized models
specialized_df = daft.from_pydict({
    "medical_text": [
        "Patient presents with fever and cough",
        "Symptoms include headache and fatigue",
        "No significant medical history reported",
    ],
})

# Medical text analysis with domain-specific model.
# "clinical-bert-base" and "biomedical-llm" are placeholder model names;
# substitute identifiers that your chosen provider can actually load.
medical_analysis = specialized_df.select(
    col("medical_text"),
    # `model` is keyword-only in embed_text, so it must be passed by name.
    embed_text(col("medical_text"), model="clinical-bert-base").alias("clinical_embedding"),
    llm_generate(
        daft.sql_expr("'Extract medical entities from: ' || medical_text"),
        model="biomedical-llm",
        temperature=0.1,
    ).alias("medical_entities"),
)

Performance Optimization

# Optimize AI function calls for large datasets
optimized_processing = (large_dataset
    .filter(col("text_length") > 10)  # Pre-filter short texts
    .repartition(20)  # Distribute load
    .select(
        col("id"),
        # Embed title and content together in a single call per row
        # (`model` is keyword-only in embed_text).
        embed_text(
            col("title") + " " + col("content"),
            model="sentence-transformers/all-MiniLM-L6-v2",
        ).alias("combined_embedding"),
    )
    # collect() executes the plan and keeps the materialized result, so the
    # embeddings are computed once and reused by every downstream query.
    .collect()
)

# Use the materialized embeddings for multiple downstream tasks
similarity_results = optimized_processing.select(
    col("id"),
    col("combined_embedding"),
    # Additional similarity computations would reuse the stored embeddings
)

AI Provider Configuration

class Provider:
    """AI model provider interface for custom model integration.

    Instances are what the ``load_*`` functions return and what the
    ``provider`` argument of ``embed_text`` / ``embed_image`` accepts.
    """

    def configure(self, **kwargs) -> None:
        """Configure provider with authentication and settings.

        Parameters:
        - kwargs: Configuration values passed as keyword arguments; the
          accepted keys are not listed here - TODO confirm per provider

        Returns:
        None
        """

class Embedding:
    """Embedding type for vector representations."""

    # values: the vector components, as a list of floats.
    def __init__(self, values: List[float]): ...

    def similarity(self, other: "Embedding") -> float:
        """Compute similarity with another embedding.

        Parameters:
        - other: The embedding to compare against

        Returns:
        float: Similarity score. NOTE(review): the metric (e.g. cosine) and
        its value range are not stated here - confirm before comparing scores.
        """

Integration Patterns

Combining AI Functions with DataFrames

# Chain AI operations with standard DataFrame operations.
# `df` is a placeholder for a DataFrame with "id", "category" and "text" columns.
ai_pipeline = (df
    .filter(col("text").not_null())
    .select(
        col("id"),
        # Keep the grouping key: select() drops every column it does not
        # list, and groupby("category") below needs it.
        col("category"),
        # `model` is keyword-only in embed_text, so it must be passed by name.
        embed_text(col("text"), model="all-MiniLM-L6-v2").alias("embedding"),
    )
    .groupby("category")
    .agg(
        # agg_list() gathers each group's embeddings into one list value
        col("embedding").agg_list().alias("category_embeddings"),
        col("id").count().alias("doc_count"),
    )
    .filter(col("doc_count") > 5)  # keep only categories with more than 5 docs
    .collect()
)

Error Handling and Fallbacks

# Robust AI processing with fallbacks
@daft.func
def safe_llm_generate(text: str, model: str) -> str:
    """Return generated content for ``text``, or a fixed fallback string on error.

    This is a placeholder: no LLM is actually called, and ``model`` is
    accepted only to show the intended call shape.

    Parameters:
    - text: Input text; only its first 50 characters appear in the output
    - model: Model identifier (unused by this placeholder)

    Returns:
    str: The generated text, or "Generation failed" if any exception is raised
    """
    try:
        # This would integrate with actual LLM API
        return f"Generated content for: {text[:50]}..."
    except Exception:
        # Deliberate best-effort: one bad row yields a fallback value instead
        # of failing the whole query.
        return "Generation failed"

# `df` is a placeholder for a DataFrame with an "input_text" column.
robust_generation = df.select(
    col("input_text"),
    safe_llm_generate(col("input_text"), "gpt-3.5-turbo").alias("generated"),
)

AI/ML functions in Daft are designed for scalable, distributed processing of AI workloads with built-in optimizations for batch processing and resource management.

Install with Tessl CLI

npx tessl i tessl/pypi-daft

docs

ai-ml.md

catalog.md

data-io.md

dataframe-operations.md

expressions.md

index.md

session.md

sql.md

udf.md

tile.json