
# User-Defined Functions

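User-defined functions (UDFs) let you run custom Python functions inside Daft's distributed DataFrame operations. They support three execution modes:

- row-wise (one row in, one row out)
- async row-wise (one row in, one row out, for I/O-bound work)
- generator (one row in, many rows out)

Return types are inferred from type annotations unless you specify them explicitly.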
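
As a mental model only, the three modes differ in two ways: how many outputs each input row produces, and how calls are scheduled. This is plain Python, not Daft's implementation. The helpers `run_row_wise`, `run_async_row_wise`, and `run_generator` are hypothetical names used for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterator, List

# Hypothetical helpers modelling each execution mode over a list of input rows.

def run_row_wise(fn: Callable[[Any], Any], rows: List[Any]) -> List[Any]:
    # One call per row, exactly one output per row
    return [fn(row) for row in rows]

def run_async_row_wise(fn: Callable[[Any], Awaitable[Any]], rows: List[Any]) -> List[Any]:
    # Calls run concurrently on an event loop; gather keeps the input order
    async def gather_all() -> List[Any]:
        return list(await asyncio.gather(*(fn(row) for row in rows)))
    return asyncio.run(gather_all())

def run_generator(fn: Callable[[Any], Iterator[Any]], rows: List[Any]) -> List[Any]:
    # Every yielded value becomes a separate output row
    return [value for row in rows for value in fn(row)]

def double(x: int) -> int:
    return x * 2

async def slow_double(x: int) -> int:
    await asyncio.sleep(0.01)  # stands in for network latency
    return x * 2

def count_up(n: int) -> Iterator[int]:
    yield from range(n)

print(run_row_wise(double, [1, 2, 3]))             # [2, 4, 6]
print(run_async_row_wise(slow_double, [1, 2, 3]))  # [2, 4, 6]
print(run_generator(count_up, [2, 3]))             # [0, 1, 0, 1, 2]
```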

## Capabilities

### Function Decorator

The `func` decorator, exposed as `daft.func`, is the newer interface for turning Python functions into user-defined functions.

```python { .api }
class _DaftFuncDecorator:
    """
    Decorator to convert Python functions into Daft user-defined functions.

    Supports three function variants:
    - Row-wise (1 row in, 1 row out): default for regular functions
    - Async row-wise (1 row in, 1 row out): for async functions
    - Generator (1 row in, N rows out): for generator functions

    When decorated functions are called with Expressions, they return Expressions.
    When called with regular Python values, they execute immediately.
    """

    def __new__(
        cls,
        fn: Optional[Callable] = None,
        *,
        return_dtype: Optional[DataTypeLike] = None,
    ) -> Union[RowWiseUdf, GeneratorUdf, _PartialUdf]:
        """
        Create a UDF decorator.

        Parameters:
        - fn: Function to decorate (None when used as a parameterized decorator)
        - return_dtype: Explicit return data type (inferred if None)

        Returns:
        A UDF instance, or a partial decorator for chaining when fn is None
        """

# The func decorator is an alias for _DaftFuncDecorator
func = _DaftFuncDecorator

@func
def custom_function(input_arg: InputType) -> OutputType:
    """Row-wise function processing one row at a time."""

@func
async def async_function(input_arg: InputType) -> OutputType:
    """Async row-wise function for I/O-bound operations."""

@func
def generator_function(input_arg: InputType) -> Iterator[OutputType]:
    """Generator function producing multiple outputs per input."""
```

### Row-wise UDFs

Row-wise UDFs process one row at a time, producing exactly one output value per input row.

```python { .api }
class RowWiseUdf:
    """User-defined function processing individual rows."""

    def __init__(
        self,
        func: Callable,
        return_dtype: Optional[DataTypeLike] = None,
    ):
        """
        Create a row-wise UDF.

        Parameters:
        - func: Python function to wrap
        - return_dtype: Return data type (inferred if None)
        """

    def __call__(self, *args: Expression) -> Expression:
        """
        Apply the UDF to expressions.

        Parameters:
        - args: Column expressions passed as function arguments

        Returns:
        Expression: UDF expression for use in DataFrame operations
        """
```
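
The `_DaftFuncDecorator` docstring says a UDF returns an Expression when called with Expressions, and runs immediately when called with plain values. The toy model below mimics that dual behavior for a row-wise UDF. `Col`, `Deferred`, and `ToyRowWiseUdf` are hypothetical stand-ins, not Daft classes. Evaluating over a dict of lists only approximates how Daft evaluates expressions against its data.

```python
from typing import Any, Callable, Dict, List

class Col:
    """Hypothetical column reference standing in for a Daft Expression."""

    def __init__(self, name: str) -> None:
        self.name = name

class Deferred:
    """Hypothetical deferred call, evaluated later against a table of columns."""

    def __init__(self, fn: Callable[..., Any], args: tuple) -> None:
        self.fn = fn
        self.args = args

    def evaluate(self, table: Dict[str, List[Any]]) -> List[Any]:
        num_rows = len(next(iter(table.values())))
        # Column references resolve to their values; literals repeat for every row
        columns = [table[a.name] if isinstance(a, Col) else [a] * num_rows for a in self.args]
        # Row-wise: one call per row, one output per call
        return [self.fn(*row) for row in zip(*columns)]

class ToyRowWiseUdf:
    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn

    def __call__(self, *args: Any) -> Any:
        if any(isinstance(a, Col) for a in args):
            return Deferred(self.fn, args)  # build an expression; nothing runs yet
        return self.fn(*args)  # plain values: execute immediately

double_value = ToyRowWiseUdf(lambda x: x * 2)
add = ToyRowWiseUdf(lambda a, b: a + b)

print(double_value(21))                                          # 42
print(double_value(Col("score")).evaluate({"score": [85, 92]}))  # [170, 184]
print(add(Col("x"), 10).evaluate({"x": [1, 2]}))                 # [11, 12]
```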

### Generator UDFs

Generator UDFs take one input row and yield multiple output values. Each yielded value becomes its own row.

```python { .api }
class GeneratorUdf:
    """User-defined function that generates multiple rows from one input."""

    def __init__(
        self,
        func: Callable,
        return_dtype: Optional[DataTypeLike] = None,
    ):
        """
        Create a generator UDF.

        Parameters:
        - func: Python generator function to wrap
        - return_dtype: Return data type (inferred if None)
        """

    def __call__(self, *args: Expression) -> Expression:
        """
        Apply the generator UDF to expressions.

        Parameters:
        - args: Column expressions passed as function arguments

        Returns:
        Expression: Generator UDF expression
        """
```
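
To picture the "one row in, N rows out" behavior, the plain-Python sketch below expands a small column table with a generator. It assumes that any other selected column is repeated once per generated row, as the `ranges` example in the usage section suggests. It does not model what Daft does when a generator yields nothing: this toy simply emits no row. `explode_with_generator` is a hypothetical helper, not part of Daft.

```python
from typing import Any, Callable, Dict, Iterator, List

def explode_with_generator(
    table: Dict[str, List[Any]],
    gen: Callable[[Any], Iterator[Any]],
    input_col: str,
    output_col: str,
    keep: List[str],
) -> Dict[str, List[Any]]:
    """Hypothetical model: one output row per yielded value, kept columns repeated."""
    out: Dict[str, List[Any]] = {name: [] for name in keep + [output_col]}
    for i, value in enumerate(table[input_col]):
        for produced in gen(value):
            # Repeat the kept columns from the source row for every yielded value
            for name in keep:
                out[name].append(table[name][i])
            out[output_col].append(produced)
    return out

def expand_range(n: int) -> Iterator[int]:
    yield from range(n)

def tokenize(text: str) -> Iterator[str]:
    yield from text.split()

print(explode_with_generator({"count": [3, 2]}, expand_range, "count", "number", keep=["count"]))
# {'count': [3, 3, 3, 2, 2], 'number': [0, 1, 2, 0, 1]}
print(explode_with_generator({"sentence": ["hello world", "daft is fast"]}, tokenize, "sentence", "token", keep=[]))
# {'token': ['hello', 'world', 'daft', 'is', 'fast']}
```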

### Legacy UDF Interface

The `udf` interface remains available for backward compatibility with existing UDFs.

```python { .api }
def udf(
    func: Callable,
    return_dtype: Optional[DataType] = None,
) -> UDF:
    """
    Legacy UDF decorator.

    Parameters:
    - func: Function to convert to a UDF
    - return_dtype: Return data type

    Returns:
    UDF: Legacy UDF instance
    """

class UDF:
    """Legacy user-defined function class."""

    def __call__(self, *args: Expression) -> Expression:
        """Apply the UDF to expressions."""
```

## Usage Examples

### Basic Row-wise UDF

```python
import daft
from daft import col

@daft.func
def double_value(x: int) -> int:
    """Double the input value."""
    return x * 2

@daft.func
def format_name(first: str, last: str) -> str:
    """Format a full name."""
    return f"{first} {last}"

# Use in DataFrame operations
df = daft.from_pydict({
    "first_name": ["Alice", "Bob"],
    "last_name": ["Smith", "Jones"],
    "score": [85, 92],
})

result = df.select(
    format_name(col("first_name"), col("last_name")).alias("full_name"),
    double_value(col("score")).alias("double_score"),
).collect()
```

### Async UDF for I/O Operations

```python
import aiohttp
import daft
from daft import col

@daft.func
async def fetch_data(url: str) -> str:
    """Fetch data from a URL asynchronously."""
    # A new HTTP session is opened for each row
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# Use with URLs in a DataFrame
urls_df = daft.from_pydict({
    "url": ["https://api.example.com/1", "https://api.example.com/2"]
})

results = urls_df.select(
    col("url"),
    fetch_data(col("url")).alias("response"),
).collect()
```
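
Async UDFs pay off because many requests can be in flight at once, instead of each response being awaited in turn. The plain-`asyncio` sketch below shows that effect with a bounded number of concurrent requests. `fake_fetch` is a hypothetical function that simulates network latency with `asyncio.sleep`, and the limit of 2 is an arbitrary illustration. The sketch does not describe how Daft schedules async UDFs internally.

```python
import asyncio
import time
from typing import List

async def fake_fetch(url: str) -> str:
    # Simulated I/O: waits instead of making a real HTTP request
    await asyncio.sleep(0.1)
    return f"response from {url}"

async def fetch_all(urls: List[str], limit: int = 2) -> List[str]:
    semaphore = asyncio.Semaphore(limit)  # at most `limit` requests in flight

    async def bounded(url: str) -> str:
        async with semaphore:
            return await fake_fetch(url)

    # gather returns results in the same order as the inputs
    return await asyncio.gather(*(bounded(u) for u in urls))

urls = [f"https://api.example.com/{i}" for i in range(4)]
start = time.perf_counter()
responses = asyncio.run(fetch_all(urls))
elapsed = time.perf_counter() - start
print(responses[0])  # response from https://api.example.com/0
print(f"elapsed: {elapsed:.2f}s")
```

With four URLs and a limit of 2, the requests run in two waves. Elapsed time should therefore be close to 0.2 s, compared with roughly 0.4 s for a sequential loop.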

### Generator UDF for One-to-Many

```python
from typing import Iterator

import daft
from daft import col

@daft.func
def tokenize(text: str) -> Iterator[str]:
    """Split text into individual tokens."""
    for word in text.split():
        yield word

@daft.func
def expand_range(n: int) -> Iterator[int]:
    """Generate the integers from 0 to n - 1."""
    for i in range(n):
        yield i

# Use generator UDFs
text_df = daft.from_pydict({
    "sentence": ["hello world", "daft is fast"],
    "count": [3, 2],
})

# Tokenize sentences: each token becomes its own row
tokens = text_df.select(
    tokenize(col("sentence")).alias("token")
).collect()

# Generate number ranges alongside the original count
ranges = text_df.select(
    col("count"),
    expand_range(col("count")).alias("number"),
).collect()
```

### UDF with Explicit Return Type

```python
import daft
from daft import col

# return_dtype overrides the type that would be inferred from `-> float`
@daft.func(return_dtype=daft.DataType.float32())
def calculate_ratio(numerator: int, denominator: int) -> float:
    """Calculate a ratio, returning 0.0 when the denominator is zero."""
    if denominator == 0:
        return 0.0
    return numerator / denominator

df = daft.from_pydict({
    "num": [10, 20, 30],
    "den": [2, 4, 0],
})

# The "ratio" column uses float32 instead of the inferred type
result = df.select(
    calculate_ratio(col("num"), col("den")).alias("ratio")
).collect()
```
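
Choosing `float32` halves the storage per value, but it rounds each result to about seven significant digits. The standard-library sketch below emulates that rounding by packing a value into 4 bytes with `struct`. `to_float32` is a hypothetical helper used only to show the precision trade-off.

```python
import struct

def to_float32(x: float) -> float:
    """Round a Python float (64-bit) to the nearest 32-bit float."""
    return struct.unpack("f", struct.pack("f", x))[0]

# Ratios from the example table above: 10/2, 20/4, and the zero-denominator case
for value in [10 / 2, 20 / 4, 0.0]:
    print(value, to_float32(value))  # exactly representable, so unchanged

# A ratio such as 1/3 is not exactly representable and loses precision
third = 1 / 3
print(third == to_float32(third))      # False
print(abs(third - to_float32(third)))  # a small error, below 1e-7
```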

### Complex Data Processing UDF

```python
import json
from typing import List

import daft
from daft import col

# Declare the struct fields explicitly; a Dict[str, Any] annotation
# does not describe the field types
features_dtype = daft.DataType.struct({
    "feature_count": daft.DataType.int64(),
    "has_metadata": daft.DataType.bool(),
    "total_size": daft.DataType.int64(),
})

@daft.func(return_dtype=features_dtype)
def extract_features(data: str) -> dict:
    """Extract summary features from a JSON string."""
    try:
        parsed = json.loads(data)
        return {
            "feature_count": len(parsed.get("features", [])),
            "has_metadata": "metadata" in parsed,
            "total_size": sum(len(str(v)) for v in parsed.values()),
        }
    except (json.JSONDecodeError, AttributeError, TypeError):
        # Malformed JSON, a non-object payload, or a None input
        return {"feature_count": 0, "has_metadata": False, "total_size": 0}

@daft.func
def process_list(items: List[str]) -> str:
    """Join list items in sorted order."""
    return ", ".join(sorted(items))

# Use with complex types
json_df = daft.from_pydict({
    "json_data": ['{"features": ["a", "b"], "metadata": {}}', '{"other": "value"}'],
    "tags": [["python", "data"], ["machine", "learning"]],
})

processed = json_df.select(
    extract_features(col("json_data")).alias("features"),
    process_list(col("tags")).alias("tag_string"),
).collect()
```

### Direct Function Application

```python
import daft
from daft import col

# An existing, undecorated function
def existing_function(x: float) -> float:
    return x ** 2 + 1

# Create a UDF by calling daft.func directly instead of using decorator syntax
square_plus_one = daft.func(existing_function)

# Apply to a DataFrame
df = daft.from_pydict({"values": [1.0, 2.0, 3.0, 4.0]})
result = df.select(
    square_plus_one(col("values")).alias("transformed")
).collect()
```

### Error Handling in UDFs

```python
import daft
from daft import col

@daft.func
def safe_divide(a: float, b: float) -> float:
    """Divide a by b, returning infinity when b is zero."""
    try:
        if b == 0:
            return float("inf")
        return a / b
    except TypeError:
        # For example, a None operand
        return float("nan")

@daft.func
def validate_email(email: str) -> bool:
    """Check for an '@' followed by a domain containing a '.'."""
    try:
        return "@" in email and "." in email.split("@")[1]
    except TypeError:
        # Raised when email is None
        return False

# Use with error handling
data_df = daft.from_pydict({
    "numerator": [10.0, 20.0, 30.0],
    "denominator": [2.0, 0.0, 5.0],
    "email": ["user@domain.com", "invalid", "test@example.org"],
})

safe_result = data_df.select(
    safe_divide(col("numerator"), col("denominator")).alias("safe_ratio"),
    validate_email(col("email")).alias("valid_email"),
).collect()
```
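
Sentinel values such as infinity are easy to mistake for real results. One alternative is to return `None` for invalid inputs and declare an `Optional` return type. This assumes Daft stores a returned `None` as a null, which is worth confirming for your version. The hypothetical `divide_or_none` below is plain Python so its logic can be checked on its own. In a DataFrame you would decorate it with `@daft.func` as in the examples above.

```python
from typing import Optional

def divide_or_none(a: Optional[float], b: Optional[float]) -> Optional[float]:
    # Missing operands or a zero denominator yield None (intended to surface as null)
    if a is None or b is None or b == 0:
        return None
    return a / b

print([divide_or_none(a, b) for a, b in [(10.0, 2.0), (20.0, 0.0), (None, 5.0)]])
# [5.0, None, None]
```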

### Performance Considerations

```python
from typing import List

import daft
import numpy as np
from daft import col

# Use NumPy on a whole list per call instead of one Python call per value
@daft.func
def batch_process(values: List[float]) -> List[float]:
    """Process a list of values with a single vectorized NumPy operation."""
    arr = np.asarray(values, dtype=np.float64)
    return (arr * 2 + 1).tolist()

df = daft.from_pydict({
    "category": ["a", "a", "b", "b", "b"],
    "value": [1.0, 2.0, 3.0, 4.0, 5.0],
})

# Collect each group's values into a list so the UDF runs once per group
grouped_df = df.groupby("category").agg(
    col("value").agg_list().alias("value_list")
)

processed = grouped_df.select(
    col("category"),
    batch_process(col("value_list")).alias("processed_values"),
).collect()
```
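
Grouping helps because the Python call overhead is paid once per list rather than once per value. The plain-Python sketch below counts calls to make that concrete. `CountingUdf` is a hypothetical wrapper, and the counts describe this toy, not measured Daft behavior.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

class CountingUdf:
    """Hypothetical wrapper that counts how often the wrapped function is called."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn
        self.calls = 0

    def __call__(self, *args: Any) -> Any:
        self.calls += 1
        return self.fn(*args)

rows = [("a", 1.0), ("a", 2.0), ("b", 3.0), ("b", 4.0), ("b", 5.0)]

# Row-wise: one call per value
per_value = CountingUdf(lambda v: v * 2 + 1)
row_results = [per_value(v) for _, v in rows]

# Grouped: collect each category's values, then one call per list
groups: Dict[str, List[float]] = defaultdict(list)
for category, v in rows:
    groups[category].append(v)
per_list = CountingUdf(lambda vs: [v * 2 + 1 for v in vs])
grouped_results = {c: per_list(vs) for c, vs in groups.items()}

print(per_value.calls, per_list.calls)  # 5 2
print(grouped_results)  # {'a': [3.0, 5.0], 'b': [7.0, 9.0, 11.0]}
```

The saving only matters when the per-list work is itself vectorized, as with NumPy in the example above.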

## Integration with DataFrame Operations

Calling a UDF on column expressions returns an Expression, so UDFs compose with other DataFrame operations such as `filter`, `select`, and `groupby`:

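```python
import daft
from daft import col

# Reuses validate_email, format_name, and double_value from the examples above
df = daft.from_pydict({
    "first": ["Alice", "Bob", "Carol"],
    "last": ["Smith", "Jones", "Lee"],
    "email": ["alice@example.com", "bob-at-example", "carol@example.org"],
    "department": ["eng", "eng", "sales"],
    "score": [85, 92, 78],
    "age": [34, 17, 45],
})

# Chaining UDFs with other operations; "department" must be selected
# so that the later groupby can use it
result = (
    df
    .filter(validate_email(col("email")))
    .select(
        col("department"),
        format_name(col("first"), col("last")).alias("full_name"),
        double_value(col("score")).alias("bonus_score"),
    )
    .groupby("department")
    .agg(col("bonus_score").mean().alias("avg_bonus"))
    .collect()
)

# Using UDFs in filters and conditions
filtered = df.filter(
    (col("age") > 18) & validate_email(col("email"))
).collect()
```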

## Type System Integration

```python { .api }
DataTypeLike = Union[DataType, str, type]
```

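UDFs infer their return type from the function's return annotation. Pass `return_dtype` to override the inferred type in two cases: when the annotation is not specific enough (for example `Dict[str, Any]`), or when you want a different storage type, such as `float32` instead of a 64-bit float.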
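
To illustrate what inferring a type from an annotation involves, the sketch below reads a function's return annotation with `typing.get_type_hints` and maps it to a type name. The mapping table and the `infer_return_type` helper are hypothetical simplifications, not Daft's actual rules. Note that a generator's `Iterator[T]` annotation unwraps to its element type `T`, because each yielded value is one row.

```python
import collections.abc
from typing import Any, Callable, Iterator, List, get_args, get_origin, get_type_hints

# Illustrative mapping only; Daft's real inference covers many more types
SIMPLE_TYPES = {int: "int64", float: "float64", str: "string", bool: "bool"}

def describe(tp: Any) -> str:
    if tp in SIMPLE_TYPES:
        return SIMPLE_TYPES[tp]
    if get_origin(tp) is list:
        (inner,) = get_args(tp)
        return f"list[{describe(inner)}]"
    raise TypeError(f"no mapping for {tp!r}; pass return_dtype explicitly")

def infer_return_type(fn: Callable[..., Any]) -> str:
    hint = get_type_hints(fn)["return"]
    # Generators produce one row per yielded value, so unwrap Iterator[T] to T
    if get_origin(hint) is collections.abc.Iterator:
        (hint,) = get_args(hint)
    return describe(hint)

def double_value(x: int) -> int:
    return x * 2

def tokenize(text: str) -> Iterator[str]:
    yield from text.split()

def process(values: List[float]) -> List[float]:
    return values

print(infer_return_type(double_value))  # int64
print(infer_return_type(tokenize))      # string
print(infer_return_type(process))       # list[float64]
```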