The portable Python dataframe library that provides a unified API for data analysis across 20+ different backends
—
Comprehensive UDF system supporting scalar, aggregate, and analytic functions with type safety and backend compatibility.
Create custom scalar functions that operate on individual values.
@ibis.udf.scalar(signature, **kwargs)
def function_name(arg1, arg2, ...):
    """
    Decorator for scalar UDFs.

    Parameters:
    - signature: function signature with input and output types
    - **kwargs: additional UDF configuration

    Returns:
    Decorated function that can be used in expressions
    """

Usage Examples:
import ibis
from ibis import udf
# Simple scalar UDF
@udf.scalar(int -> int)
def add_one(x):
    return x + 1
# Use in expressions
result = table.select(
    'value',
    incremented=add_one(table.value)
)
# UDF with multiple parameters
@udf.scalar((str, str) -> str)
def concat_with_separator(a, b):
    return f"{a}|{b}"

result = table.select(
    combined=concat_with_separator(table.first_name, table.last_name)
)

@udf.scalar(str -> str)
def reverse_string(s):
    return s[::-1] if s else None
# Mathematical UDF
@udf.scalar(float -> float)
def sigmoid(x):
    import math
    return 1 / (1 + math.exp(-x))

result = table.select(
    'text',
    reversed=reverse_string(table.text),
    probability=sigmoid(table.score)
)

Create custom aggregation functions.
@ibis.udf.aggregate(signature, **kwargs)
def aggregate_function_name():
    """
    Decorator for aggregate UDFs.

    Parameters:
    - signature: function signature with input and output types
    - **kwargs: additional UDF configuration

    Returns:
    Decorated aggregation function
    """

Usage Examples:
# Custom aggregation
@udf.aggregate([int] -> float)
def geometric_mean(values):
    import math
    if not values:
        return None
    product = 1
    for v in values:
        if v <= 0:
            return None
        product *= v
    return math.pow(product, 1/len(values))
# Use in aggregations
result = table.group_by('category').aggregate(
    count=table.count(),
    avg=table.value.mean(),
    geom_mean=geometric_mean(table.value)
)
# Custom median implementation
@udf.aggregate([float] -> float)
def custom_median(values):
    if not values:
        return None
    sorted_vals = sorted(values)
    n = len(sorted_vals)
    if n % 2 == 0:
        return (sorted_vals[n//2 - 1] + sorted_vals[n//2]) / 2
    else:
        return sorted_vals[n//2]

Apply functions element-wise to array or column values.
@ibis.udf.elementwise(signature, **kwargs)
def elementwise_function():
    """
    Decorator for elementwise UDFs.

    Parameters:
    - signature: function signature
    - **kwargs: additional configuration

    Returns:
    Decorated elementwise function
    """

Usage Examples:
# Array transformation
@udf.elementwise([int] -> [int])
def double_array_elements(arr):
    return [x * 2 for x in arr] if arr else None

result = table.select(
    'numbers',
    doubled=double_array_elements(table.numbers)
)
# String array processing
@udf.elementwise([str] -> [str])
def uppercase_strings(strings):
    return [s.upper() for s in strings] if strings else None

Create custom reduction operations.
@ibis.udf.reduction(signature, **kwargs)
def reduction_function():
    """
    Decorator for reduction UDFs.

    Parameters:
    - signature: function signature
    - **kwargs: additional configuration

    Returns:
    Decorated reduction function
    """

Usage Examples:
# Custom reduction
@udf.reduction([float] -> dict)
def statistics_summary(values):
    if not values:
        return None
    import statistics
    return {
        'mean': statistics.mean(values),
        'median': statistics.median(values),
        'mode': statistics.mode(values) if len(set(values)) < len(values) else None,
        'std': statistics.stdev(values) if len(values) > 1 else 0
    }

result = table.aggregate(
    stats=statistics_summary(table.scores)
)

Create custom analytic/window functions.
@ibis.udf.analytic(signature, **kwargs)
def analytic_function():
    """
    Decorator for analytic UDFs.

    Parameters:
    - signature: function signature
    - **kwargs: additional configuration

    Returns:
    Decorated analytic function
    """

Usage Examples:
# Custom ranking function
@udf.analytic([float] -> [int])
def custom_rank(values):
    """Custom ranking with specific tie-breaking logic."""
    sorted_pairs = sorted(enumerate(values), key=lambda x: (-x[1], x[0]))
    ranks = [0] * len(values)
    for rank, (orig_idx, _) in enumerate(sorted_pairs, 1):
        ranks[orig_idx] = rank
    return ranks

# Use with window
result = table.select(
    'name', 'score',
    custom_ranking=custom_rank(table.score).over(
        group_by='category',
        order_by='score'
    )
)

Specify precise types for UDF inputs and outputs.
# Type specification examples (schematic notation; `->` is not valid inside a Python expression)
str -> str              # String input, string output
(int, float) -> bool    # Two inputs, boolean output
[int] -> float          # Array input, scalar output
{str: int} -> [str]     # Map input, array output
Optional[str] -> str    # Nullable input

Usage Examples:
# Complex type signatures
@udf.scalar((Optional[str], int) -> Optional[str])
def truncate_string(s, max_len):
    if s is None:
        return None
    return s[:max_len] if len(s) > max_len else s

# Array operations
@udf.scalar([float] -> float)
def array_variance(arr):
    if not arr or len(arr) < 2:
        return None
    mean = sum(arr) / len(arr)
    return sum((x - mean) ** 2 for x in arr) / (len(arr) - 1)

# Map operations
@udf.scalar({str: int} -> int)
def map_sum(mapping):
    return sum(mapping.values()) if mapping else 0

Create UDFs optimized for specific backends.
Usage Examples:
# Backend-specific implementation
@udf.scalar(str -> str, backend='postgres')
def postgres_upper(s):
    # PostgreSQL-specific implementation
    return s.upper() if s else None

@udf.scalar(str -> str, backend='bigquery')
def bigquery_upper(s):
    # BigQuery-specific implementation
    return s.upper() if s else None

# SQL-based UDF (for SQL backends)
@udf.scalar(int -> int, sql="CASE WHEN {0} > 0 THEN {0} ELSE 0 END")
def positive_only(x):
    pass # Implementation provided via SQL

result = table.select(
    positive_value=positive_only(table.value)
)

Use external libraries in UDFs.
Usage Examples:
# UDF with external dependencies
@udf.scalar(str -> float)
def sentiment_score(text):
    # This would require textblob to be available
    try:
        from textblob import TextBlob
        if not text:
            return 0.0
        blob = TextBlob(text)
        return blob.sentiment.polarity
    except ImportError:
        return 0.0 # Fallback if library not available

# JSON processing UDF
@udf.scalar(str -> dict)
def parse_json_safe(json_str):
    import json
    try:
        return json.loads(json_str) if json_str else {}
    except json.JSONDecodeError:
        return {}

result = table.select(
    'review_text',
    sentiment=sentiment_score(table.review_text),
    metadata=parse_json_safe(table.metadata_json)
)

Handle errors gracefully in UDFs.
Usage Examples:
# Safe division UDF
@udf.scalar((float, float) -> Optional[float])
def safe_divide(a, b):
    try:
        return a / b if b != 0 else None
    except (TypeError, ZeroDivisionError):
        return None

# String processing with error handling
@udf.scalar(str -> Optional[str])
def extract_domain(email):
    try:
        if email and '@' in email:
            return email.split('@')[1].lower()
        return None
    except (AttributeError, IndexError):
        return None

result = table.select(
    'email',
    domain=extract_domain(table.email),
    ratio=safe_divide(table.numerator, table.denominator)
)

Tips for efficient UDF implementation.
Usage Examples:
# Vectorized operations when possible
@udf.scalar([float] -> float)
def efficient_mean(values):
    # Use numpy for better performance if available
    try:
        import numpy as np
        return float(np.mean(values)) if values else None
    except ImportError:
        return sum(values) / len(values) if values else None

# Caching expensive computations
@udf.scalar(str -> str)
def expensive_transformation(text):
    # Cache results for repeated calls
    if not hasattr(expensive_transformation, 'cache'):
        expensive_transformation.cache = {}
    if text in expensive_transformation.cache:
        return expensive_transformation.cache[text]
    # Expensive computation here
    result = text.upper() # Simplified example
    expensive_transformation.cache[text] = result
    return result

Install with Tessl CLI
npx tessl i tessl/pypi-ibis-framework