
# User-Defined Functions

Function interfaces for implementing custom transformation logic. These abstract base classes define the contracts for various processing patterns, enabling users to implement custom business logic while leveraging Flink's distributed execution capabilities.

## Capabilities

### Base Function Interface

#### Base Function Class

Abstract base class for all user-defined functions providing common infrastructure.

```python { .api }
class Function:
    """
    Abstract base class for all user-defined functions.

    Provides common functionality for function execution including
    configuration, lifecycle management, and error handling.
    """

    def _run(self):
        """Abstract method implemented by subclasses for function execution."""

    def _configure(self, input_file, output_file, mmap_size, port, env, info, subtask_index):
        """Sets up function execution context with runtime parameters."""

    def _close(self):
        """Cleanup method called after function execution."""
```

### Transformation Functions

#### Map Function

Transforms each input element to exactly one output element.

```python { .api }
class MapFunction(Function):
    def map(self, value):
        """
        Transforms single input element to single output element.

        Parameters:
            value: Input element of any type

        Returns:
            Transformed element (can be different type)
        """

    def collect(self, value):
        """
        Internal method for collecting transformed values.

        Parameters:
            value: Input value to transform and collect
        """
```

#### Flat Map Function

Transforms each input element to zero or more output elements.

```python { .api }
class FlatMapFunction(Function):
    def flat_map(self, value, collector):
        """
        Transforms single input to zero or more outputs.

        Parameters:
            value: Input element
            collector: Output collector - call collector.collect(output) for each result
        """

    def collect(self, value):
        """
        Internal method for collecting values using the flat_map transformation.

        Parameters:
            value: Input value to transform via flat_map
        """
```

#### Filter Function

Determines whether elements should be included in the result.

```python { .api }
class FilterFunction(Function):
    def filter(self, value):
        """
        Predicate function to include/exclude elements.

        Parameters:
            value: Input element to test

        Returns:
            bool: True to include element, False to exclude
        """
```

#### Map Partition Function

Processes entire partitions of data rather than individual elements.

```python { .api }
class MapPartitionFunction(Function):
    def map_partition(self, iterator, collector):
        """
        Processes entire partition of elements.

        Allows for more efficient processing when setup/cleanup costs are high
        or when processing requires access to multiple elements.

        Parameters:
            iterator: Iterator over all elements in the partition
            collector: Output collector - call collector.collect(output) for each result
        """
```

### Reduction Functions

#### Reduce Function

Combines two elements into one of the same type.

```python { .api }
class ReduceFunction(Function):
    def reduce(self, value1, value2):
        """
        Combines two elements into one of the same type.

        This function is applied associatively to reduce a set of elements
        down to a single element.

        Parameters:
            value1: First element to combine
            value2: Second element to combine

        Returns:
            Combined element of the same type as inputs
        """

    def combine(self, value1, value2):
        """
        Optional combiner function for partial aggregation.

        Used for optimization - should have same semantics as reduce().

        Parameters:
            value1: First element to combine
            value2: Second element to combine

        Returns:
            Combined element of the same type as inputs
        """
```
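The reduce contract mirrors Python's own `functools.reduce`: because the function is associative, the runtime is free to pre-combine partitions and then reduce the partial results. A minimal plain-Python sketch of these semantics (the `SumReduce` name and partition split here are illustrative, not part of the API):

```python
from functools import reduce

class SumReduce:
    """Stand-in for a ReduceFunction: combines two values of the same type."""
    def reduce(self, value1, value2):
        return value1 + value2

fn = SumReduce()
elements = [1, 2, 3, 4, 5]

# Sequential pairwise application, as functools.reduce would do it
total = reduce(fn.reduce, elements)  # ((((1 + 2) + 3) + 4) + 5) = 15

# Associativity lets a runtime reduce each partition first,
# then reduce the partial results to the same answer
partials = [reduce(fn.reduce, part) for part in ([1, 2], [3, 4, 5])]
assert reduce(fn.reduce, partials) == total
```

This is why `combine()` must share `reduce()`'s semantics: either order of combination must yield the same result.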

#### Group Reduce Function

Processes groups of elements with the same key.

```python { .api }
class GroupReduceFunction(Function):
    def reduce(self, iterator, collector):
        """
        Processes group of elements, emitting zero or more results.

        Called once per group (or once for entire DataSet if not grouped).
        Can iterate over all elements in the group and emit any number of results.

        Parameters:
            iterator: Iterator over all elements in the group
            collector: Output collector - call collector.collect(output) for each result
        """

    def combine(self, iterator, collector):
        """
        Optional combiner for partial aggregation within partitions.

        Used for optimization - should produce partial results that can be
        further reduced by the reduce() method.

        Parameters:
            iterator: Iterator over elements in the partition
            collector: Output collector for partial results
        """
```
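The interplay between `combine()` and `reduce()` can be sketched in plain Python: `combine()` emits a partial result per partition, and `reduce()` folds those partials into the final answer. `ListCollector`, `Counter`, and the partition layout below are illustrative stand-ins, not Flink API:

```python
class ListCollector:
    """Minimal stand-in for Flink's output collector."""
    def __init__(self):
        self.out = []
    def collect(self, value):
        self.out.append(value)

class Counter:
    """Stand-in for a GroupReduceFunction whose combine() pre-counts per partition."""
    def combine(self, iterator, collector):
        collector.collect(sum(1 for _ in iterator))  # partial count for one partition
    def reduce(self, iterator, collector):
        collector.collect(sum(iterator))             # total over the partial counts

fn = Counter()
partitions = [["a", "a", "a"], ["a", "a"]]  # one key's elements, split across partitions

partials = ListCollector()
for part in partitions:
    fn.combine(iter(part), partials)        # partials.out becomes [3, 2]

result = ListCollector()
fn.reduce(iter(partials.out), result)       # result.out becomes [5]
assert result.out == [5]
```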

### Multi-Input Functions

#### Join Function

Combines matching elements from two DataSets.

```python { .api }
class JoinFunction(Function):
    def join(self, value1, value2):
        """
        Combines matching elements from two DataSets.

        Called for each pair of elements with matching keys.

        Parameters:
            value1: Element from first DataSet
            value2: Element from second DataSet

        Returns:
            Combined element (can be any type)
        """
```

#### CoGroup Function

Processes groups from two DataSets with matching keys.

```python { .api }
class CoGroupFunction(Function):
    def co_group(self, iterator1, iterator2, collector):
        """
        Processes groups from two DataSets with the same key.

        Called once per key, even if one or both groups are empty.
        Useful for implementing outer joins and complex multi-input operations.

        Parameters:
            iterator1: Iterator over elements from first DataSet with this key
            iterator2: Iterator over elements from second DataSet with this key
            collector: Output collector for results
        """
```

#### Cross Function

Combines elements in a cross product (Cartesian product).

```python { .api }
class CrossFunction(Function):
    def cross(self, value1, value2):
        """
        Combines elements in cross product.

        Called for every combination of elements from two DataSets.

        Parameters:
            value1: Element from first DataSet
            value2: Element from second DataSet

        Returns:
            Combined element (can be any type)
        """
```
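The cross semantics are those of `itertools.product`: the function is invoked once for every pair in the Cartesian product of the two inputs. A plain-Python sketch (the `PairCross` name is illustrative):

```python
from itertools import product

class PairCross:
    """Stand-in for a CrossFunction: combines each pair of the Cartesian product."""
    def cross(self, value1, value2):
        return (value1, value2)

fn = PairCross()
left, right = [1, 2], ["a", "b"]

# One call per combination: 2 x 2 = 4 results
pairs = [fn.cross(x, y) for x, y in product(left, right)]
assert pairs == [(1, "a"), (1, "b"), (2, "a"), (2, "b")]
```

Note the output size is the product of the input sizes, so cross should be reserved for small DataSets or heavily filtered inputs.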

### Key Selection Functions

#### Key Selector Function

Extracts keys from elements for grouping and joining operations.

```python { .api }
class KeySelectorFunction(Function):
    def get_key(self, value):
        """
        Extracts key from element for grouping/joining.

        Parameters:
            value: Input element

        Returns:
            Key value used for grouping/joining
        """
```
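Conceptually, a key selector lets the runtime bucket elements by a derived value rather than by a tuple field. A plain-Python sketch of that bucketing (the `FirstLetterKey` name is illustrative):

```python
from collections import defaultdict

class FirstLetterKey:
    """Stand-in for a KeySelectorFunction: derives the grouping key from an element."""
    def get_key(self, value):
        return value[0]

fn = FirstLetterKey()
groups = defaultdict(list)
for word in ["apple", "avocado", "banana"]:
    groups[fn.get_key(word)].append(word)  # bucket each element under its key

assert groups == {"a": ["apple", "avocado"], "b": ["banana"]}
```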

### Runtime Context

Provides runtime information and services to functions.

```python { .api }
class RuntimeContext:
    def get_broadcast_variable(self, name):
        """
        Accesses broadcast variable by name.

        Parameters:
            name (str): Name of the broadcast variable

        Returns:
            Broadcast variable value
        """

    def get_index_of_this_subtask(self):
        """
        Gets index of current parallel subtask.

        Returns:
            int: Zero-based subtask index
        """
```

## Aggregation Functions

### Built-in Aggregation Types

```python { .api }
class Sum:
    """Aggregation for summing numeric values."""

class Min:
    """Aggregation for finding minimum values."""

class Max:
    """Aggregation for finding maximum values."""
```

### Aggregation Function Builder

```python { .api }
class AggregationFunction:
    """Combines multiple aggregations on different fields."""

    def add_aggregation(self, aggregation, field):
        """
        Adds additional aggregation to different field.

        Parameters:
            aggregation (Aggregation): Aggregation type (Sum, Min, Max)
            field (int): Field index to aggregate

        Returns:
            AggregationFunction: Self for method chaining
        """
```
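The builder effectively accumulates a list of (aggregation, field index) pairs and applies each to its own tuple field. A plain-Python sketch of that multi-field aggregation (the sample rows and the use of `sum`/`min` as stand-ins for `Sum`/`Min` are illustrative):

```python
rows = [("a", 3, 0.5), ("b", 1, 2.0), ("c", 7, 1.5)]

# (aggregation, field index) pairs, as chained add_aggregation() calls would collect
field_aggs = [(sum, 1), (min, 2)]

# Apply each aggregation to its own field across all rows
results = tuple(agg(row[field] for row in rows) for agg, field in field_aggs)
assert results == (11, 0.5)  # sum of field 1, min of field 2
```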

## Usage Examples

### Simple Map Function

```python
from flink.functions.MapFunction import MapFunction

class DoubleValue(MapFunction):
    def map(self, value):
        return value * 2

# Usage
data = env.from_elements(1, 2, 3, 4, 5)
doubled = data.map(DoubleValue())

# Or using lambda
doubled = data.map(lambda x: x * 2)
```

### Flat Map for Tokenization

```python
from flink.functions.FlatMapFunction import FlatMapFunction

class Tokenizer(FlatMapFunction):
    def flat_map(self, line, collector):
        words = line.lower().split()
        for word in words:
            collector.collect(word)

# Usage
text = env.from_elements("hello world", "flink python", "data processing")
words = text.flat_map(Tokenizer())
```

### Custom Filter Function

```python
from flink.functions.FilterFunction import FilterFunction

class EvenNumberFilter(FilterFunction):
    def filter(self, value):
        return value % 2 == 0

# Usage
numbers = env.from_elements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
evens = numbers.filter(EvenNumberFilter())
```

### Group Reduce for Word Counting

Because `group_by(0)` keys on a tuple field, the tokenizer here emits `(word, 1)` pairs rather than bare words:

```python
from flink.functions.FlatMapFunction import FlatMapFunction
from flink.functions.GroupReduceFunction import GroupReduceFunction

class PairTokenizer(FlatMapFunction):
    def flat_map(self, line, collector):
        for word in line.lower().split():
            collector.collect((word, 1))

class WordCounter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        word = None
        count = 0
        for element in iterator:
            word = element[0]
            count += element[1]
        collector.collect((word, count))

# Usage
pairs = text.flat_map(PairTokenizer())
word_counts = pairs.group_by(0).reduce_group(WordCounter())
```

### Custom Reduce Function

```python
from flink.functions.ReduceFunction import ReduceFunction

class SumReduce(ReduceFunction):
    def reduce(self, value1, value2):
        return value1 + value2

    def combine(self, value1, value2):
        # Same implementation for this simple case
        return value1 + value2

# Usage
numbers = env.from_elements(1, 2, 3, 4, 5)
total = numbers.reduce(SumReduce())
```

### Map Partition for Batch Processing

```python
from flink.functions.MapPartitionFunction import MapPartitionFunction

class BatchProcessor(MapPartitionFunction):
    def map_partition(self, iterator, collector):
        # Setup expensive resources once per partition
        # (ExpensiveProcessor is a placeholder for your own resource-heavy helper)
        processor = ExpensiveProcessor()

        batch = []
        for element in iterator:
            batch.append(element)

            # Process in batches of 100
            if len(batch) >= 100:
                for result in processor.process_batch(batch):
                    collector.collect(result)
                batch = []

        # Process remaining elements
        if batch:
            for result in processor.process_batch(batch):
                collector.collect(result)

        # Cleanup
        processor.close()

# Usage
large_dataset = env.read_csv("large_file.csv", [str, int, float])
processed = large_dataset.map_partition(BatchProcessor())
```

### Custom Join Function

```python
from flink.functions.JoinFunction import JoinFunction

class CustomerOrderJoin(JoinFunction):
    def join(self, customer, order):
        return {
            'customer_id': customer[0],
            'customer_name': customer[1],
            'order_id': order[0],
            'order_amount': order[2]
        }

# Usage
customers = env.read_csv("customers.csv", [str, str])
orders = env.read_csv("orders.csv", [int, str, float])

result = customers.join(orders) \
                  .where(0) \
                  .equal_to(1) \
                  .using(CustomerOrderJoin())
```

### CoGroup for Outer Join

```python
from flink.functions.CoGroupFunction import CoGroupFunction

class LeftOuterJoin(CoGroupFunction):
    def co_group(self, iterator1, iterator2, collector):
        left_items = list(iterator1)
        right_items = list(iterator2)

        if not left_items:
            return  # No items in left dataset for this key

        if not right_items:
            # Left outer join - emit left items with null right side
            for left_item in left_items:
                collector.collect((left_item, None))
        else:
            # Inner join - emit all combinations
            for left_item in left_items:
                for right_item in right_items:
                    collector.collect((left_item, right_item))

# Usage
result = dataset1.co_group(dataset2) \
                 .where(0) \
                 .equal_to(0) \
                 .using(LeftOuterJoin())
```