Daft provides distributed DataFrames for multimodal data, with a high-performance query engine and support for complex nested data structures, AI/ML operations, and seamless cloud storage integration.
—
Custom Python functions that integrate seamlessly with Daft's distributed DataFrame operations. UDFs support three execution modes: row-wise (1-to-1), async row-wise, and generator (1-to-many) with automatic type inference and optimization.
Modern decorator interface for creating user-defined functions.
# The func decorator is an alias for _DaftFuncDecorator.
func = _DaftFuncDecorator


@func
def custom_function(input_arg: InputType) -> OutputType:
    """Row-wise function processing one row at a time."""


@func
async def async_function(input_arg: InputType) -> OutputType:
    """Async row-wise function for I/O-bound operations."""


@func
def generator_function(input_arg: InputType) -> Iterator[OutputType]:
    """Generator function producing multiple outputs per input."""
class _DaftFuncDecorator:
    """Decorator to convert Python functions into Daft user-defined functions.

    Supports three function variants:

    - Row-wise (1 row in, 1 row out) - default for regular functions
    - Async row-wise (1 row in, 1 row out) - for async functions
    - Generator (1 row in, N rows out) - for generator functions

    When decorated functions are called with Expressions, they return
    Expressions. When called with regular Python values, they execute
    immediately.
    """

    def __new__(
        cls,
        fn: Optional[Callable] = None,
        *,
        return_dtype: Optional[DataTypeLike] = None,
    ) -> Union[RowWiseUdf, GeneratorUdf, _PartialUdf]:
        """Create UDF decorator.

        Parameters:
        - fn: Function to decorate (None for parameterized decorator)
        - return_dtype: Explicit return data type (inferred if None)

        Returns:
        UDF instance or partial decorator for chaining
        """


# Process one row at a time with 1-to-1 mapping.
class RowWiseUdf:
    """User-defined function processing individual rows."""

    def __init__(
        self,
        func: Callable,
        return_dtype: Optional[DataTypeLike] = None,
    ):
        """Create row-wise UDF.

        Parameters:
        - func: Python function to wrap
        - return_dtype: Return data type (inferred if None)
        """

    def __call__(self, *args: Expression) -> Expression:
        """Apply UDF to expressions.

        Parameters:
        - args: Column expressions as function arguments

        Returns:
        Expression: UDF expression for DataFrame operations
        """


# Process one input to produce multiple outputs.
class GeneratorUdf:
    """User-defined function that generates multiple rows from one input."""

    def __init__(
        self,
        func: Callable,
        return_dtype: Optional[DataTypeLike] = None,
    ):
        """Create generator UDF.

        Parameters:
        - func: Python generator function to wrap
        - return_dtype: Return data type (inferred if None)
        """

    def __call__(self, *args: Expression) -> Expression:
        """Apply generator UDF to expressions.

        Parameters:
        - args: Column expressions as function arguments

        Returns:
        Expression: Generator UDF expression
        """


# Backward compatibility interface for existing UDFs.
def udf(
    func: Callable,
    return_dtype: Optional[DataType] = None,
) -> UDF:
    """Legacy UDF decorator.

    Parameters:
    - func: Function to convert to UDF
    - return_dtype: Return data type

    Returns:
    UDF: Legacy UDF instance
    """
class UDF:
    """Legacy user-defined function class."""

    def __call__(self, *args: Expression) -> Expression:
        """Apply UDF to expressions."""


import daft
from daft import col


@daft.func
def double_value(x: int) -> int:
    """Double the input value."""
    return x * 2


@daft.func
def format_name(first: str, last: str) -> str:
    """Format full name."""
    return f"{first} {last}"


# Use in DataFrame operations
df = daft.from_pydict({
    "first_name": ["Alice", "Bob"],
    "last_name": ["Smith", "Jones"],
    "score": [85, 92]
})
result = df.select(
    format_name(col("first_name"), col("last_name")).alias("full_name"),
    double_value(col("score")).alias("double_score")
).collect()

import asyncio
import aiohttp


@daft.func
async def fetch_data(url: str) -> str:
    """Fetch data from URL asynchronously."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()


# Use with URLs in DataFrame
urls_df = daft.from_pydict({
    "url": ["https://api.example.com/1", "https://api.example.com/2"]
})
results = urls_df.select(
    col("url"),
    fetch_data(col("url")).alias("response")
).collect()

from typing import Iterator
@daft.func
def tokenize(text: str) -> Iterator[str]:
"""Split text into individual tokens."""
for word in text.split():
yield word
@daft.func
def expand_range(n: int) -> Iterator[int]:
"""Generate range of numbers."""
for i in range(n):
yield i
# Use generator UDFs
text_df = daft.from_pydict({
"sentence": ["hello world", "daft is fast"],
"count": [3, 2]
})
# Tokenize sentences (explodes rows)
tokens = text_df.select(
tokenize(col("sentence")).alias("token")
).collect()
# Generate number ranges
ranges = text_df.select(
col("count"),
expand_range(col("count")).alias("number")
).collect()@daft.func(return_dtype=daft.DataType.float32())
def calculate_ratio(numerator: int, denominator: int) -> float:
    """Calculate ratio with specific return type."""
    # Guard against division by zero rather than raising.
    if denominator == 0:
        return 0.0
    return float(numerator) / float(denominator)


# Use with type specification
df = daft.from_pydict({
    "num": [10, 20, 30],
    "den": [2, 4, 0]
})
result = df.select(
    calculate_ratio(col("num"), col("den")).alias("ratio")
).collect()

from typing import Dict, List, Any
import json


@daft.func
def extract_features(data: str) -> Dict[str, Any]:
    """Extract features from JSON string."""
    try:
        parsed = json.loads(data)
        return {
            "feature_count": len(parsed.get("features", [])),
            "has_metadata": "metadata" in parsed,
            "total_size": sum(len(str(v)) for v in parsed.values())
        }
    except (json.JSONDecodeError, AttributeError):
        # Malformed JSON, or a top-level value that is not a dict
        # (no .get/.values): fall back to an empty feature set instead
        # of a bare except that would also swallow unrelated errors.
        return {"feature_count": 0, "has_metadata": False, "total_size": 0}


@daft.func
def process_list(items: List[str]) -> str:
    """Process list of items."""
    return ", ".join(sorted(items))


# Use with complex types
json_df = daft.from_pydict({
    "json_data": ['{"features": ["a", "b"], "metadata": {}}', '{"other": "value"}'],
    "tags": [["python", "data"], ["machine", "learning"]]
})
processed = json_df.select(
    extract_features(col("json_data")).alias("features"),
    process_list(col("tags")).alias("tag_string")
).collect()

# Use UDF decorator on existing function
def existing_function(x: float) -> float:
return x ** 2 + 1
# Create UDF from existing function
square_plus_one = daft.func(existing_function)
# Apply to DataFrame
df = daft.from_pydict({"values": [1.0, 2.0, 3.0, 4.0]})
result = df.select(
square_plus_one(col("values")).alias("transformed")
).collect()@daft.func
def safe_divide(a: float, b: float) -> float:
    """Safely divide two numbers."""
    try:
        if b == 0:
            return float('inf')
        return a / b
    except Exception:
        return float('nan')


@daft.func
def validate_email(email: str) -> bool:
    """Validate email format."""
    try:
        return "@" in email and "." in email.split("@")[1]
    except (TypeError, AttributeError):
        # Non-string input has no `in` / .split support: treat as invalid
        # rather than swallowing every exception with a bare except.
        return False


# Use with error handling
data_df = daft.from_pydict({
    "numerator": [10.0, 20.0, 30.0],
    "denominator": [2.0, 0.0, 5.0],
    "email": ["user@domain.com", "invalid", "test@example.org"]
})
safe_result = data_df.select(
    safe_divide(col("numerator"), col("denominator")).alias("safe_ratio"),
    validate_email(col("email")).alias("valid_email")
).collect()

# Vectorized operations when possible
@daft.func
def batch_process(values: List[float]) -> List[float]:
    """Process batch of values efficiently."""
    import numpy as np
    arr = np.array(values)
    return (arr * 2 + 1).tolist()


# Use with grouped data for better performance
grouped_df = df.groupby("category").agg(
    col("value").list().alias("value_list")
)
processed = grouped_df.select(
    col("category"),
    batch_process(col("value_list")).alias("processed_values")
).collect()

# UDFs work seamlessly with all DataFrame operations:
# Chaining UDFs with other operations
result = (df
    .filter(validate_email(col("email")))
    .select(
        col("name"),
        format_name(col("first"), col("last")).alias("full_name"),
        double_value(col("score")).alias("bonus_score")
    )
    .groupby("department")
    .agg(col("bonus_score").mean().alias("avg_bonus"))
    .collect()
)

# Using UDFs in filters and conditions
filtered = df.filter(
    (col("age") > 18) & validate_email(col("email"))
).collect()

# Alias for the forms accepted wherever a return dtype is expected.
DataTypeLike = Union[DataType, str, type]

# UDFs automatically infer return types from function annotations, but
# explicit types can be specified for better control and performance
# optimization.
Install with Tessl CLI
npx tessl i tessl/pypi-daft