# User-Defined Functions

User-defined functions (UDFs) are custom Python functions that run inside Daft's distributed DataFrame operations. UDFs support three execution modes: row-wise (1-to-1), async row-wise, and generator (1-to-many). Return types are inferred automatically from type annotations.

## Capabilities

### Function Decorator

Modern decorator interface for creating user-defined functions.

```python { .api }
# The func decorator is an alias for _DaftFuncDecorator
func = _DaftFuncDecorator

@func
def custom_function(input_arg: InputType) -> OutputType:
    """Row-wise function processing one row at a time."""

@func
async def async_function(input_arg: InputType) -> OutputType:
    """Async row-wise function for I/O-bound operations."""

@func
def generator_function(input_arg: InputType) -> Iterator[OutputType]:
    """Generator function producing multiple outputs per input."""

class _DaftFuncDecorator:
    """
    Decorator to convert Python functions into Daft user-defined functions.

    Supports three function variants:
    - Row-wise (1 row in, 1 row out) - default for regular functions
    - Async row-wise (1 row in, 1 row out) - for async functions
    - Generator (1 row in, N rows out) - for generator functions

    When decorated functions are called with Expressions, they return Expressions.
    When called with regular Python values, they execute immediately.
    """

    def __new__(
        cls,
        fn: Optional[Callable] = None,
        *,
        return_dtype: Optional[DataTypeLike] = None
    ) -> Union[RowWiseUdf, GeneratorUdf, _PartialUdf]:
        """
        Create UDF decorator.

        Parameters:
        - fn: Function to decorate (None for parameterized decorator)
        - return_dtype: Explicit return data type (inferred if None)

        Returns:
            UDF instance or partial decorator for chaining
        """
```
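The dual behavior described in the docstring (Expressions in, Expression out; plain values in, immediate result) can be modeled in a few lines of plain Python. This is only a sketch of the dispatch idea: `Expr` and `sketch_func` are hypothetical names invented for illustration and are not part of Daft, whose real implementation differs.

```python
from functools import wraps
from typing import Any, Callable


class Expr:
    """Hypothetical stand-in for a lazy column expression (not daft.Expression)."""

    def __init__(self, description: str) -> None:
        self.description = description


def sketch_func(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator that dispatches on the argument types."""

    @wraps(fn)
    def wrapper(*args: Any) -> Any:
        if any(isinstance(a, Expr) for a in args):
            # Lazy path: record the call as a new expression instead of running fn.
            rendered = ", ".join(
                a.description if isinstance(a, Expr) else repr(a) for a in args
            )
            return Expr(f"{fn.__name__}({rendered})")
        # Eager path: plain Python values run the function immediately.
        return fn(*args)

    return wrapper


@sketch_func
def double_value(x: int) -> int:
    return x * 2


eager = double_value(21)            # plain value in, result out
lazy = double_value(Expr("score"))  # expression in, expression out
```

Here `eager` is an ordinary integer, while `lazy` is a deferred description of the call that a query engine could evaluate later.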

### Row-wise UDFs

Process one row at a time with 1-to-1 mapping.

```python { .api }
class RowWiseUdf:
    """User-defined function processing individual rows."""

    def __init__(
        self,
        func: Callable,
        return_dtype: Optional[DataTypeLike] = None
    ):
        """
        Create row-wise UDF.

        Parameters:
        - func: Python function to wrap
        - return_dtype: Return data type (inferred if None)
        """

    def __call__(self, *args: Expression) -> Expression:
        """
        Apply UDF to expressions.

        Parameters:
        - args: Column expressions as function arguments

        Returns:
            Expression: UDF expression for DataFrame operations
        """
```

### Generator UDFs

Process one input to produce multiple outputs.

```python { .api }
class GeneratorUdf:
    """User-defined function that generates multiple rows from one input."""

    def __init__(
        self,
        func: Callable,
        return_dtype: Optional[DataTypeLike] = None
    ):
        """
        Create generator UDF.

        Parameters:
        - func: Python generator function to wrap
        - return_dtype: Return data type (inferred if None)
        """

    def __call__(self, *args: Expression) -> Expression:
        """
        Apply generator UDF to expressions.

        Parameters:
        - args: Column expressions as function arguments

        Returns:
            Expression: Generator UDF expression
        """
```

### Legacy UDF Interface

Backward compatibility interface for existing UDFs.

```python { .api }
def udf(
    func: Callable,
    return_dtype: Optional[DataType] = None
) -> UDF:
    """
    Legacy UDF decorator.

    Parameters:
    - func: Function to convert to UDF
    - return_dtype: Return data type

    Returns:
        UDF: Legacy UDF instance
    """

class UDF:
    """Legacy user-defined function class."""

    def __call__(self, *args: Expression) -> Expression:
        """Apply UDF to expressions."""
```

## Usage Examples

### Basic Row-wise UDF
```python
import daft
from daft import col

@daft.func
def double_value(x: int) -> int:
    """Double the input value."""
    return x * 2

@daft.func
def format_name(first: str, last: str) -> str:
    """Format full name."""
    return f"{first} {last}"

# Use in DataFrame operations
df = daft.from_pydict({
    "first_name": ["Alice", "Bob"],
    "last_name": ["Smith", "Jones"],
    "score": [85, 92]
})

result = df.select(
    format_name(col("first_name"), col("last_name")).alias("full_name"),
    double_value(col("score")).alias("double_score")
).collect()
```

### Async UDF for I/O Operations
```python
import aiohttp
import daft
from daft import col

@daft.func
async def fetch_data(url: str) -> str:
    """Fetch data from URL asynchronously."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# Use with URLs in DataFrame
urls_df = daft.from_pydict({
    "url": ["https://api.example.com/1", "https://api.example.com/2"]
})

results = urls_df.select(
    col("url"),
    fetch_data(col("url")).alias("response")
).collect()
```
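Async UDFs pay off when each row spends most of its time waiting on I/O, because many awaits can be in flight at once. The sketch below illustrates that with plain `asyncio` and a simulated request. `fake_fetch` and `fetch_all` are hypothetical helpers, and how Daft actually schedules async UDF calls is not shown here.

```python
import asyncio
from typing import List


async def fake_fetch(url: str) -> str:
    """Simulate an I/O-bound call without touching the network."""
    await asyncio.sleep(0.01)
    return f"response for {url}"


async def fetch_all(urls: List[str]) -> List[str]:
    """Run one coroutine per row concurrently; gather keeps input order."""
    return list(await asyncio.gather(*(fake_fetch(u) for u in urls)))


urls = ["https://api.example.com/1", "https://api.example.com/2"]
responses = asyncio.run(fetch_all(urls))
```

Because the simulated waits overlap, the total time stays close to a single wait rather than growing with the number of rows.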

### Generator UDF for One-to-Many
```python
from typing import Iterator

import daft
from daft import col

@daft.func
def tokenize(text: str) -> Iterator[str]:
    """Split text into individual tokens."""
    for word in text.split():
        yield word

@daft.func
def expand_range(n: int) -> Iterator[int]:
    """Generate range of numbers."""
    for i in range(n):
        yield i

# Use generator UDFs
text_df = daft.from_pydict({
    "sentence": ["hello world", "daft is fast"],
    "count": [3, 2]
})

# Tokenize sentences (explodes rows)
tokens = text_df.select(
    tokenize(col("sentence")).alias("token")
).collect()

# Generate number ranges
ranges = text_df.select(
    col("count"),
    expand_range(col("count")).alias("number")
).collect()
```
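The 1-to-many row expansion can be pictured with plain Python. The sketch below is a model of the semantics, not Daft code: `explode_rows` is a hypothetical helper, and it assumes sibling columns are repeated for every yielded value and that an input yielding nothing produces no rows, which may differ from Daft's actual handling.

```python
from typing import Any, Callable, Dict, Iterator, List


def explode_rows(
    rows: List[Dict[str, Any]],
    column: str,
    gen: Callable[[Any], Iterator[Any]],
    out_name: str,
) -> List[Dict[str, Any]]:
    """Apply gen to row[column] and emit one output row per yielded value."""
    out: List[Dict[str, Any]] = []
    for row in rows:
        for value in gen(row[column]):
            new_row = dict(row)        # sibling columns are repeated (assumption)
            new_row[out_name] = value  # one yielded value per output row
            out.append(new_row)
    return out


def expand_range(n: int) -> Iterator[int]:
    """Plain (undecorated) version of the generator from the example above."""
    for i in range(n):
        yield i


rows = [{"count": 3}, {"count": 2}]
expanded = explode_rows(rows, "count", expand_range, "number")
```

With these two input rows, `expanded` holds five rows: three for `count == 3` followed by two for `count == 2`.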

### UDF with Explicit Return Type
```python
import daft
from daft import col

@daft.func(return_dtype=daft.DataType.float32())
def calculate_ratio(numerator: int, denominator: int) -> float:
    """Calculate ratio with specific return type."""
    if denominator == 0:
        return 0.0
    return float(numerator) / float(denominator)

# Use with type specification
df = daft.from_pydict({
    "num": [10, 20, 30],
    "den": [2, 4, 0]
})

result = df.select(
    calculate_ratio(col("num"), col("den")).alias("ratio")
).collect()
```

### Complex Data Processing UDF
```python
from typing import Dict, List, Any
import json

import daft
from daft import col

@daft.func
def extract_features(data: str) -> Dict[str, Any]:
    """Extract features from JSON string."""
    try:
        parsed = json.loads(data)
        return {
            "feature_count": len(parsed.get("features", [])),
            "has_metadata": "metadata" in parsed,
            "total_size": sum(len(str(v)) for v in parsed.values())
        }
    except (ValueError, TypeError, AttributeError):
        # Invalid JSON, a null input, or a non-object JSON value
        return {"feature_count": 0, "has_metadata": False, "total_size": 0}

@daft.func
def process_list(items: List[str]) -> str:
    """Process list of items."""
    return ", ".join(sorted(items))

# Use with complex types
json_df = daft.from_pydict({
    "json_data": ['{"features": ["a", "b"], "metadata": {}}', '{"other": "value"}'],
    "tags": [["python", "data"], ["machine", "learning"]]
})

processed = json_df.select(
    extract_features(col("json_data")).alias("features"),
    process_list(col("tags")).alias("tag_string")
).collect()
```

### Direct Function Application
```python
import daft
from daft import col

# Use UDF decorator on existing function
def existing_function(x: float) -> float:
    return x ** 2 + 1

# Create UDF from existing function
square_plus_one = daft.func(existing_function)

# Apply to DataFrame
df = daft.from_pydict({"values": [1.0, 2.0, 3.0, 4.0]})
result = df.select(
    square_plus_one(col("values")).alias("transformed")
).collect()
```

### Error Handling in UDFs
```python
import daft
from daft import col

@daft.func
def safe_divide(a: float, b: float) -> float:
    """Safely divide two numbers."""
    try:
        if b == 0:
            return float('inf')
        return a / b
    except Exception:
        return float('nan')

@daft.func
def validate_email(email: str) -> bool:
    """Validate email format."""
    try:
        return "@" in email and "." in email.split("@")[1]
    except (TypeError, AttributeError):
        # Null or non-string input
        return False

# Use with error handling
data_df = daft.from_pydict({
    "numerator": [10.0, 20.0, 30.0],
    "denominator": [2.0, 0.0, 5.0],
    "email": ["user@domain.com", "invalid", "test@example.org"]
})

safe_result = data_df.select(
    safe_divide(col("numerator"), col("denominator")).alias("safe_ratio"),
    validate_email(col("email")).alias("valid_email")
).collect()
```

### Performance Considerations
```python
from typing import List

import daft
from daft import col

# Sample data for illustration
df = daft.from_pydict({
    "category": ["a", "a", "b"],
    "value": [1.0, 2.0, 3.0]
})

# Vectorized operations when possible
@daft.func
def batch_process(values: List[float]) -> List[float]:
    """Process batch of values efficiently."""
    import numpy as np
    arr = np.array(values)
    return (arr * 2 + 1).tolist()

# Use with grouped data for better performance
# agg_list() collects each group's values into a single list
grouped_df = df.groupby("category").agg(
    col("value").agg_list().alias("value_list")
)

processed = grouped_df.select(
    col("category"),
    batch_process(col("value_list")).alias("processed_values")
).collect()
```

## Integration with DataFrame Operations

Because calling a UDF with column expressions returns an Expression, UDFs can be used wherever an expression is accepted, such as in `select` and `filter`, and their results can feed later aggregations:

```python
# Assumes df has the columns email, name, first, last, score, department, and age,
# and that validate_email, format_name, and double_value are defined as above.

# Chaining UDFs with other operations
result = (df
    .filter(validate_email(col("email")))
    .select(
        col("name"),
        col("department"),  # keep the grouping key for the groupby below
        format_name(col("first"), col("last")).alias("full_name"),
        double_value(col("score")).alias("bonus_score")
    )
    .groupby("department")
    .agg(col("bonus_score").mean().alias("avg_bonus"))
    .collect()
)

# Using UDFs in filters and conditions
filtered = df.filter(
    (col("age") > 18) & validate_email(col("email"))
).collect()
```

## Type System Integration

```python { .api }
DataTypeLike = Union[DataType, str, type]
```

UDFs infer their return type from the function's return annotation. Pass `return_dtype` explicitly when the annotation is missing or when a specific type is needed, such as `float32` in the `calculate_ratio` example.
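
The inference step can be sketched with the standard `typing` module. This is an illustration of the general idea, not Daft's implementation: `infer_return_dtype` and the `_DTYPE_NAMES` mapping are hypothetical, and the dtype names in the mapping are assumptions made for this example.

```python
import collections.abc
from typing import Callable, Iterator, Optional, get_args, get_origin, get_type_hints

# Hypothetical mapping from Python annotations to dtype names (an assumption
# for illustration; consult Daft's documentation for the real rules).
_DTYPE_NAMES = {int: "int64", float: "float64", str: "string", bool: "bool"}


def infer_return_dtype(fn: Callable, return_dtype: Optional[str] = None) -> str:
    """Return the explicit dtype if given, else derive one from the annotation."""
    if return_dtype is not None:
        return return_dtype  # an explicit type always wins over inference
    hints = get_type_hints(fn)
    if "return" not in hints:
        raise TypeError(f"{fn.__name__} needs a return annotation or return_dtype")
    annotation = hints["return"]
    # Generator functions are annotated Iterator[T]; the element type T is used.
    if get_origin(annotation) is collections.abc.Iterator:
        (annotation,) = get_args(annotation)
    if annotation not in _DTYPE_NAMES:
        raise TypeError(f"no dtype known for annotation {annotation!r}")
    return _DTYPE_NAMES[annotation]


def ratio(numerator: int, denominator: int) -> float:
    return numerator / denominator


def tokenize(text: str) -> Iterator[str]:
    yield from text.split()


inferred = infer_return_dtype(ratio)              # taken from "-> float"
overridden = infer_return_dtype(ratio, "float32")  # explicit type wins
element = infer_return_dtype(tokenize)            # element type of Iterator[str]
```

The explicit argument short-circuits inference, which mirrors why passing `return_dtype` gives tighter control over the resulting column type.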