CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-daft

Distributed Dataframes for Multimodal Data with high-performance query engine and support for complex nested data structures, AI/ML operations, and seamless cloud storage integration.

Pending
Overview
Eval results
Files

docs/udf.md

User-Defined Functions

Custom Python functions that integrate seamlessly with Daft's distributed DataFrame operations. UDFs support three execution modes: row-wise (1-to-1), async row-wise (1-to-1, for I/O-bound work), and generator (1-to-many). All three modes come with automatic type inference and optimization.

Capabilities

Function Decorator

Modern decorator interface for creating user-defined functions.

# The func decorator is an alias for _DaftFuncDecorator
func = _DaftFuncDecorator

# The three stubs below show the function shapes the decorator accepts:
# a regular function, an async function, and a generator function.

@func
def custom_function(input_arg: InputType) -> OutputType:
    """Row-wise function: processes one row at a time (1 row in, 1 row out)."""

@func  
async def async_function(input_arg: InputType) -> OutputType:
    """Async row-wise function (1 row in, 1 row out) for I/O-bound operations."""

@func
def generator_function(input_arg: InputType) -> Iterator[OutputType]:
    """Generator function: may produce multiple outputs for a single input row."""

class _DaftFuncDecorator:
    """
    Decorator that converts Python functions into Daft user-defined functions.

    Supports three function variants:
    - Row-wise (1 row in, 1 row out) - default for regular functions
    - Async row-wise (1 row in, 1 row out) - for async functions
    - Generator (1 row in, N rows out) - for generator functions

    When decorated functions are called with Expressions, they return Expressions.
    When called with regular Python values, they execute immediately.

    The decorator can be applied bare (``@func``), with keyword options
    (``@func(return_dtype=...)``), or called directly on an existing
    function (``func(existing_function)``).
    """
    
    def __new__(
        cls, 
        fn: Optional[Callable] = None, 
        *, 
        return_dtype: Optional[DataTypeLike] = None
    ) -> Union[RowWiseUdf, GeneratorUdf, _PartialUdf]:
        """
        Create a UDF from ``fn``, or a partial decorator when ``fn`` is omitted.

        Parameters:
        - fn: Function to decorate (None for parameterized decorator)
        - return_dtype: Explicit return data type (inferred if None).
          Keyword-only.

        Returns:
        UDF instance or partial decorator for chaining.
        NOTE(review): presumably the partial decorator (_PartialUdf) is what
        comes back when fn is None - confirm against the implementation.
        """

Row-wise UDFs

Process one row at a time with 1-to-1 mapping.

class RowWiseUdf:
    """User-defined function processing individual rows (1 row in, 1 row out)."""
    
    def __init__(
        self, 
        func: Callable, 
        return_dtype: Optional[DataTypeLike] = None
    ):
        """
        Create a row-wise UDF wrapping ``func``.

        Parameters:
        - func: Python function to wrap
        - return_dtype: Return data type (inferred if None)
        """
    
    def __call__(self, *args: Expression) -> Expression:
        """
        Apply the UDF to column expressions.

        Parameters:
        - args: Column expressions as function arguments, passed positionally

        Returns:
        Expression: UDF expression for DataFrame operations
        """

Generator UDFs

Process one input to produce multiple outputs.

class GeneratorUdf:
    """User-defined function that generates multiple rows from one input (1 row in, N rows out)."""
    
    def __init__(
        self, 
        func: Callable, 
        return_dtype: Optional[DataTypeLike] = None
    ):
        """
        Create a generator UDF wrapping ``func``.

        Parameters:
        - func: Python generator function to wrap
        - return_dtype: Return data type (inferred if None)
        """
    
    def __call__(self, *args: Expression) -> Expression:
        """
        Apply the generator UDF to column expressions.

        Parameters:
        - args: Column expressions as function arguments, passed positionally

        Returns:
        Expression: Generator UDF expression
        """

Legacy UDF Interface

Backward compatibility interface for existing UDFs.

def udf(
    func: Callable,
    return_dtype: Optional[DataType] = None
) -> UDF:
    """
    Legacy UDF decorator, kept for backward compatibility with existing UDFs.

    Parameters:
    - func: Function to convert to UDF
    - return_dtype: Return data type (a DataType; defaults to None)

    Returns:
    UDF: Legacy UDF instance
    """

class UDF:
    """Legacy user-defined function class, as returned by ``udf``."""
    
    def __call__(self, *args: Expression) -> Expression:
        """Apply the UDF to positional column expressions and return an Expression."""

Usage Examples

Basic Row-wise UDF

import daft
from daft import col

@daft.func
def double_value(x: int) -> int:
    """Return twice the input value."""
    doubled = x * 2
    return doubled

@daft.func
def format_name(first: str, last: str) -> str:
    """Join a first and last name into one space-separated full name."""
    full_name = f"{first} {last}"
    return full_name

# Use in DataFrame operations: build a two-row DataFrame from a dict of columns
df = daft.from_pydict({
    "first_name": ["Alice", "Bob"],
    "last_name": ["Smith", "Jones"], 
    "score": [85, 92]
})

# Calling the decorated functions with column expressions returns expressions,
# so the results can be passed to select() and renamed with alias().
result = df.select(
    format_name(col("first_name"), col("last_name")).alias("full_name"),
    double_value(col("score")).alias("double_score")
).collect()

Async UDF for I/O Operations

import asyncio
import aiohttp

@daft.func
async def fetch_data(url: str) -> str:
    """Asynchronously fetch ``url`` and return the response body as text."""
    # A single combined ``async with`` opens the session first, then the
    # request; both are released in reverse order when the block exits.
    async with aiohttp.ClientSession() as session, session.get(url) as response:
        body = await response.text()
        return body

# Use with URLs in DataFrame: one row per URL to fetch
urls_df = daft.from_pydict({
    "url": ["https://api.example.com/1", "https://api.example.com/2"]
})

# Keep the original URL next to the text fetched for it by the async UDF.
results = urls_df.select(
    col("url"),
    fetch_data(col("url")).alias("response")
).collect()

Generator UDF for One-to-Many

from typing import Iterator

@daft.func
def tokenize(text: str) -> Iterator[str]:
    """Yield each whitespace-separated token of ``text`` in order."""
    yield from text.split()

@daft.func
def expand_range(n: int) -> Iterator[int]:
    """Yield the integers 0, 1, ..., n - 1."""
    yield from range(n)

# Use generator UDFs: "sentence" feeds tokenize, "count" feeds expand_range
text_df = daft.from_pydict({
    "sentence": ["hello world", "daft is fast"],
    "count": [3, 2]
})

# Tokenize sentences (explodes rows): one output row per yielded token
tokens = text_df.select(
    tokenize(col("sentence")).alias("token")
).collect()

# Generate number ranges: the source "count" is kept alongside each
# generated number
ranges = text_df.select(
    col("count"),
    expand_range(col("count")).alias("number")
).collect()

UDF with Explicit Return Type

@daft.func(return_dtype=daft.DataType.float32())
def calculate_ratio(numerator: int, denominator: int) -> float:
    """Return numerator / denominator as a float, or 0.0 for a zero denominator."""
    if denominator != 0:
        return float(numerator) / float(denominator)
    # A zero denominator yields 0.0 rather than raising.
    return 0.0

# Use with type specification
# The last "den" value is 0, which exercises calculate_ratio's
# zero-denominator branch (it returns 0.0 for that row).
df = daft.from_pydict({
    "num": [10, 20, 30],
    "den": [2, 4, 0]
})

result = df.select(
    calculate_ratio(col("num"), col("den")).alias("ratio")
).collect()

Complex Data Processing UDF

from typing import Dict, List, Any
import json

@daft.func
def extract_features(data: str) -> Dict[str, Any]:
    """Extract summary features from a JSON string.

    Parameters:
    - data: JSON text whose top-level value is expected to be an object

    Returns:
    A dict with three keys:
    - "feature_count": length of the "features" entry (0 when absent)
    - "has_metadata": whether a "metadata" key is present
    - "total_size": sum of the string lengths of all top-level values
    Input that cannot be processed produces the all-zero/False defaults
    instead of raising.
    """
    try:
        parsed = json.loads(data)
        return {
            "feature_count": len(parsed.get("features", [])),
            "has_metadata": "metadata" in parsed,
            "total_size": sum(len(str(v)) for v in parsed.values())
        }
    except (ValueError, TypeError, AttributeError):
        # Only bad-input failures are caught, so KeyboardInterrupt and
        # SystemExit are no longer swallowed as a bare ``except:`` would:
        # - ValueError: malformed JSON (json.JSONDecodeError subclasses it)
        # - TypeError: data is not str/bytes (e.g. None), or "features" has no len()
        # - AttributeError: the top-level JSON value is not an object
        return {"feature_count": 0, "has_metadata": False, "total_size": 0}

@daft.func
def process_list(items: List[str]) -> str:
    """Return the items sorted and joined into one comma-separated string."""
    ordered = sorted(items)
    return ", ".join(ordered)

# Use with complex types
# The second JSON document has neither "features" nor "metadata", so
# extract_features reports feature_count 0 and has_metadata False for it.
json_df = daft.from_pydict({
    "json_data": ['{"features": ["a", "b"], "metadata": {}}', '{"other": "value"}'],
    "tags": [["python", "data"], ["machine", "learning"]]
})

processed = json_df.select(
    extract_features(col("json_data")).alias("features"),
    process_list(col("tags")).alias("tag_string")
).collect()

Direct Function Application

# Use UDF decorator on existing function
def existing_function(x: float) -> float:
    """Return x squared plus one."""
    squared = x ** 2
    return squared + 1

# Create UDF from existing function by calling the decorator directly
# instead of using @ syntax
square_plus_one = daft.func(existing_function)

# Apply to DataFrame
df = daft.from_pydict({"values": [1.0, 2.0, 3.0, 4.0]})
result = df.select(
    square_plus_one(col("values")).alias("transformed")
).collect()

Error Handling in UDFs

@daft.func
def safe_divide(a: float, b: float) -> float:
    """Divide a by b without raising.

    Returns infinity when b equals zero and NaN when the computation
    raises any other error.
    """
    try:
        # A zero divisor maps to +inf instead of raising ZeroDivisionError.
        return float('inf') if b == 0 else a / b
    except Exception:
        return float('nan')

@daft.func
def validate_email(email: str) -> bool:
    """Check for a minimal email shape.

    Parameters:
    - email: Candidate address

    Returns:
    True when the value contains "@" and the segment following the first
    "@" contains a "."; False otherwise, including when the value is not
    a string (for example None).
    """
    try:
        return "@" in email and "." in email.split("@")[1]
    except (AttributeError, TypeError):
        # Only non-string input is caught, so KeyboardInterrupt and
        # SystemExit are no longer swallowed as a bare ``except:`` would:
        # - TypeError: the membership test is unsupported (e.g. None, bytes)
        # - AttributeError: the value has no .split()
        return False

# Use with error handling
# Row 2 has a 0.0 denominator (safe_divide returns inf for it) and an
# email without "@" (validate_email returns False for it).
data_df = daft.from_pydict({
    "numerator": [10.0, 20.0, 30.0],
    "denominator": [2.0, 0.0, 5.0],
    "email": ["user@domain.com", "invalid", "test@example.org"]
})

safe_result = data_df.select(
    safe_divide(col("numerator"), col("denominator")).alias("safe_ratio"),
    validate_email(col("email")).alias("valid_email")
).collect()

Performance Considerations

# Vectorized operations when possible
@daft.func
def batch_process(values: List[float]) -> List[float]:
    """Apply ``2 * v + 1`` to every value of the list in one NumPy operation."""
    import numpy as np
    scaled = np.array(values) * 2 + 1
    return scaled.tolist()

# Use with grouped data for better performance
# agg_list() is the aggregation that gathers each group's values into a
# single list; col(...).list is the list-function namespace and cannot be
# called as an aggregation.
grouped_df = df.groupby("category").agg(
    col("value").agg_list().alias("value_list")
)

# Each row now holds one whole list, so batch_process runs once per group.
processed = grouped_df.select(
    col("category"),
    batch_process(col("value_list")).alias("processed_values")
).collect()

Integration with DataFrame Operations

UDFs work seamlessly with all DataFrame operations:

# Chaining UDFs with other operations
result = (df
    .filter(validate_email(col("email")))
    .select(
        col("name"),
        # Carry the grouping key through: select() keeps only the listed
        # columns, and the groupby below needs "department".
        col("department"),
        format_name(col("first"), col("last")).alias("full_name"),
        double_value(col("score")).alias("bonus_score")
    )
    .groupby("department")
    .agg(col("bonus_score").mean().alias("avg_bonus"))
    .collect()
)

# Using UDFs in filters and conditions
# A UDF call on a column can be combined with an ordinary column
# comparison using &.
filtered = df.filter(
    (col("age") > 18) & validate_email(col("email"))
).collect()

Type System Integration

# Accepted forms for a return_dtype argument: a DataType, a str, or a Python type.
DataTypeLike = Union[DataType, str, type]

UDFs automatically infer return types from function annotations, but explicit types can be specified for better control and performance optimization.

Install with Tessl CLI

npx tessl i tessl/pypi-daft

docs

ai-ml.md

catalog.md

data-io.md

dataframe-operations.md

expressions.md

index.md

session.md

sql.md

udf.md

tile.json