or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md connectors.md event-time-watermarks.md execution-jobs.md functions-and-operators.md index.md state-management.md type-system-serialization.md utilities.md

docs/functions-and-operators.md

0

# Functions and Operators

1

2

Apache Flink Core provides a comprehensive set of user-defined function interfaces and operators for building data transformation pipelines. These APIs enable developers to implement custom business logic for stream and batch processing applications.

3

4

## Core Function Interfaces

5

6

### MapFunction

7

8

Transform elements one-to-one.

9

10

```java { .api }

11

import org.apache.flink.api.common.functions.MapFunction;

12

13

// Basic map function

14

public class StringLengthMapper implements MapFunction<String, Integer> {

15

@Override

16

public Integer map(String value) throws Exception {

17

return value.length();

18

}

19

}

20

21

// Rich map function with lifecycle methods

22

public class RichStringMapper extends RichMapFunction<String, String> {

23

private String prefix;

24

25

@Override

26

public void open(OpenContext openContext) throws Exception {

27

// Initialize resources, read configuration

28

prefix = getRuntimeContext().getExecutionConfig()

29

.getGlobalJobParameters().toMap().get("prefix");

30

}

31

32

@Override

33

public String map(String value) throws Exception {

34

return prefix + value;

35

}

36

37

@Override

38

public void close() throws Exception {

39

// Clean up resources

40

}

41

}

42

```

43

44

### FlatMapFunction

45

46

Transform elements one-to-many.

47

48

```java { .api }

49

import org.apache.flink.api.common.functions.FlatMapFunction;

50

import org.apache.flink.util.Collector;

51

52

// Split strings into words

53

public class TokenizerFunction implements FlatMapFunction<String, String> {

54

@Override

55

public void flatMap(String value, Collector<String> out) throws Exception {

56

for (String word : value.split("\\s+")) {

57

if (!word.isEmpty()) {

58

out.collect(word);

59

}

60

}

61

}

62

}

63

64

// Rich flat map function

65

public class RichTokenizerFunction extends RichFlatMapFunction<String, String> {

66

private Pattern pattern;

67

68

@Override

69

public void open(OpenContext openContext) throws Exception {

70

// Compile regex pattern once during initialization

71

pattern = Pattern.compile("\\s+");

72

}

73

74

@Override

75

public void flatMap(String value, Collector<String> out) throws Exception {

76

for (String word : pattern.split(value)) {

77

if (!word.isEmpty()) {

78

out.collect(word.toLowerCase());

79

}

80

}

81

}

82

}

83

```

84

85

### FilterFunction

86

87

Filter elements based on predicates.

88

89

```java { .api }

90

import org.apache.flink.api.common.functions.FilterFunction;

91

92

// Filter strings by length

93

public class LengthFilter implements FilterFunction<String> {

94

private final int minLength;

95

96

public LengthFilter(int minLength) {

97

this.minLength = minLength;

98

}

99

100

@Override

101

public boolean filter(String value) throws Exception {

102

return value.length() >= minLength;

103

}

104

}

105

106

// Rich filter with metrics

107

public class RichLengthFilter extends RichFilterFunction<String> {

108

private Counter filteredCounter;

109

110

@Override

111

public void open(OpenContext openContext) throws Exception {

112

filteredCounter = getRuntimeContext()

113

.getMetricGroup()

114

.counter("filtered_elements");

115

}

116

117

@Override

118

public boolean filter(String value) throws Exception {

119

boolean pass = value.length() >= 5;

120

if (!pass) {

121

filteredCounter.inc();

122

}

123

return pass;

124

}

125

}

126

```

127

128

### ReduceFunction

129

130

Combine elements of the same type.

131

132

```java { .api }

133

import org.apache.flink.api.common.functions.ReduceFunction;

134

135

// Sum integers

136

public class SumReduceFunction implements ReduceFunction<Integer> {

137

@Override

138

public Integer reduce(Integer value1, Integer value2) throws Exception {

139

return value1 + value2;

140

}

141

}

142

143

// Combine objects

144

public class WordCountReduceFunction implements ReduceFunction<Tuple2<String, Integer>> {

145

@Override

146

public Tuple2<String, Integer> reduce(

147

Tuple2<String, Integer> value1,

148

Tuple2<String, Integer> value2) throws Exception {

149

return new Tuple2<>(value1.f0, value1.f1 + value2.f1);

150

}

151

}

152

```

153

154

## Advanced Function Interfaces

155

156

### JoinFunction and FlatJoinFunction

157

158

Join elements from two data streams.

159

160

```java { .api }

161

import org.apache.flink.api.common.functions.JoinFunction;

162

import org.apache.flink.api.common.functions.FlatJoinFunction;

163

164

// Simple join function

165

public class UserOrderJoinFunction implements JoinFunction<User, Order, UserOrder> {

166

@Override

167

public UserOrder join(User user, Order order) throws Exception {

168

return new UserOrder(user.getId(), user.getName(), order.getAmount());

169

}

170

}

171

172

// Flat join function producing multiple results

173

public class UserOrderFlatJoinFunction implements FlatJoinFunction<User, Order, String> {

174

@Override

175

public void join(User user, Order order, Collector<String> out) throws Exception {

176

// Output multiple formats for each join

177

out.collect("User: " + user.getName() + " - Order: " + order.getId());

178

out.collect("Amount: " + order.getAmount() + " for " + user.getName());

179

}

180

}

181

```

182

183

### CoGroupFunction

184

185

Group and process elements from two data streams.

186

187

```java { .api }

188

import org.apache.flink.api.common.functions.CoGroupFunction;

189

190

public class UserOrderCoGroupFunction implements

191

CoGroupFunction<User, Order, UserOrderSummary> {

192

193

@Override

194

public void coGroup(Iterable<User> users, Iterable<Order> orders,

195

Collector<UserOrderSummary> out) throws Exception {

196

197

java.util.Iterator<User> userIter = users.iterator(); User user = userIter.hasNext() ? userIter.next() : null;

198

199

if (user != null) {

200

int totalAmount = 0;

201

int orderCount = 0;

202

203

for (Order order : orders) {

204

totalAmount += order.getAmount();

205

orderCount++;

206

}

207

208

out.collect(new UserOrderSummary(user.getId(), orderCount, totalAmount));

209

}

210

}

211

}

212

```

213

214

### GroupReduceFunction

215

216

Reduce groups of elements.

217

218

```java { .api }

219

import org.apache.flink.api.common.functions.GroupReduceFunction;

220

221

public class WordCountGroupReduceFunction implements

222

GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

223

224

@Override

225

public void reduce(Iterable<Tuple2<String, Integer>> values,

226

Collector<Tuple2<String, Integer>> out) throws Exception {

227

228

String word = null;

229

int count = 0;

230

231

for (Tuple2<String, Integer> value : values) {

232

word = value.f0;

233

count += value.f1;

234

}

235

236

out.collect(new Tuple2<>(word, count));

237

}

238

}

239

```

240

241

### MapPartitionFunction

242

243

Process entire partitions.

244

245

```java { .api }

246

import org.apache.flink.api.common.functions.MapPartitionFunction;

247

248

public class StatisticsMapPartitionFunction implements

249

MapPartitionFunction<Integer, PartitionStatistics> {

250

251

@Override

252

public void mapPartition(Iterable<Integer> values,

253

Collector<PartitionStatistics> out) throws Exception {

254

255

int count = 0;

256

int sum = 0;

257

int min = Integer.MAX_VALUE;

258

int max = Integer.MIN_VALUE;

259

260

for (Integer value : values) {

261

count++;

262

sum += value;

263

min = Math.min(min, value);

264

max = Math.max(max, value);

265

}

266

267

if (count > 0) {

268

double avg = (double) sum / count;

269

out.collect(new PartitionStatistics(count, sum, avg, min, max));

270

}

271

}

272

}

273

```

274

275

## Rich Functions

276

277

Rich functions provide additional lifecycle methods and access to runtime context.

278

279

```java { .api }

280

import org.apache.flink.api.common.functions.RichFunction;

281

import org.apache.flink.api.common.functions.OpenContext;

282

import org.apache.flink.configuration.Configuration;

283

284

public abstract class AbstractRichFunction implements RichFunction {

285

private RuntimeContext runtimeContext;

286

287

@Override

288

public void setRuntimeContext(RuntimeContext runtimeContext) {

289

this.runtimeContext = runtimeContext;

290

}

291

292

@Override

293

public RuntimeContext getRuntimeContext() {

294

return runtimeContext;

295

}

296

297

@Override

298

public void open(OpenContext openContext) throws Exception {

299

// Override in subclasses for initialization

300

}

301

302

@Override

303

public void close() throws Exception {

304

// Override in subclasses for cleanup

305

}

306

}

307

308

// Example rich function implementation

309

public class DatabaseLookupFunction extends RichMapFunction<String, UserProfile> {

310

private DatabaseConnection connection;

311

312

@Override

313

public void open(OpenContext openContext) throws Exception {

314

// Initialize database connection

315

Configuration config = (Configuration) getRuntimeContext()

316

.getExecutionConfig().getGlobalJobParameters();

317

318

String dbUrl = config.getString("db.url", "localhost:5432");

319

connection = new DatabaseConnection(dbUrl);

320

}

321

322

@Override

323

public UserProfile map(String userId) throws Exception {

324

return connection.getUserProfile(userId);

325

}

326

327

@Override

328

public void close() throws Exception {

329

if (connection != null) {

330

connection.close();

331

}

332

}

333

}

334

```

335

336

## Function Utilities

337

338

### BroadcastVariableInitializer

339

340

Transform broadcast variables during initialization.

341

342

```java { .api }

343

import org.apache.flink.api.common.functions.BroadcastVariableInitializer;

344

345

public class MapBroadcastInitializer implements

346

BroadcastVariableInitializer<Tuple2<String, Integer>, Map<String, Integer>> {

347

348

@Override

349

public Map<String, Integer> initializeBroadcastVariable(

350

Iterable<Tuple2<String, Integer>> data) {

351

352

Map<String, Integer> map = new HashMap<>();

353

for (Tuple2<String, Integer> tuple : data) {

354

map.put(tuple.f0, tuple.f1);

355

}

356

return map;

357

}

358

}

359

360

// Using broadcast variable in rich function

361

public class EnrichWithBroadcastFunction extends RichMapFunction<String, EnrichedData> {

362

private Map<String, Integer> broadcastMap;

363

364

@Override

365

public void open(OpenContext openContext) throws Exception {

366

// Access broadcast variable

367

broadcastMap = getRuntimeContext()

368

.getBroadcastVariableWithInitializer("config-map", new MapBroadcastInitializer());

369

}

370

371

@Override

372

public EnrichedData map(String value) throws Exception {

373

Integer config = broadcastMap.get(value);

374

return new EnrichedData(value, config != null ? config : 0);

375

}

376

}

377

```

378

379

### Partitioner

380

381

Custom partitioning logic.

382

383

```java { .api }

384

import org.apache.flink.api.common.functions.Partitioner;

385

386

public class CustomPartitioner implements Partitioner<String> {

387

@Override

388

public int partition(String key, int numPartitions) {

389

// Custom partitioning logic

390

return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;

391

}

392

}

393

394

// Hash-based partitioner for specific business logic

395

public class UserIdPartitioner implements Partitioner<String> {

396

@Override

397

public int partition(String userId, int numPartitions) {

398

// Ensure users with similar IDs go to same partition

399

return (userId.hashCode() & Integer.MAX_VALUE) % numPartitions;

400

}

401

}

402

```

403

404

## Runtime Context

405

406

Access runtime information and services within functions.

407

408

```java { .api }

409

import org.apache.flink.api.common.functions.RuntimeContext;

410

import org.apache.flink.metrics.Counter;

411

import org.apache.flink.metrics.MetricGroup;

412

413

public class MetricsAwareFunction extends RichMapFunction<String, String> {

414

private Counter processedCounter;

415

private Counter errorCounter;

416

417

@Override

418

public void open(OpenContext openContext) throws Exception {

419

RuntimeContext ctx = getRuntimeContext();

420

421

// Access task information

422

String taskName = ctx.getTaskName();

423

int subtaskIndex = ctx.getIndexOfThisSubtask();

424

int parallelism = ctx.getNumberOfParallelSubtasks();

425

426

// Create metrics

427

MetricGroup metricGroup = ctx.getMetricGroup();

428

processedCounter = metricGroup.counter("processed");

429

errorCounter = metricGroup.counter("errors");

430

431

// Access state (in keyed operations)

432

ValueStateDescriptor<Integer> descriptor =

433

new ValueStateDescriptor<>("count", Integer.class);

434

ValueState<Integer> countState = ctx.getState(descriptor);

435

}

436

437

@Override

438

public String map(String value) throws Exception {

439

try {

440

processedCounter.inc();

441

// Process value

442

return value.toUpperCase();

443

} catch (Exception e) {

444

errorCounter.inc();

445

throw e;

446

}

447

}

448

}

449

```

450

451

## Aggregate Functions

452

453

For advanced aggregation operations.

454

455

```java { .api }

456

import org.apache.flink.api.common.functions.AggregateFunction;

457

458

// Average aggregate function

459

public class AverageAggregateFunction implements

460

AggregateFunction<Integer, Tuple2<Integer, Integer>, Double> {

461

462

@Override

463

public Tuple2<Integer, Integer> createAccumulator() {

464

return new Tuple2<>(0, 0); // (sum, count)

465

}

466

467

@Override

468

public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {

469

return new Tuple2<>(accumulator.f0 + value, accumulator.f1 + 1);

470

}

471

472

@Override

473

public Double getResult(Tuple2<Integer, Integer> accumulator) {

474

return accumulator.f1 == 0 ? 0.0 : (double) accumulator.f0 / accumulator.f1;

475

}

476

477

@Override

478

public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {

479

return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);

480

}

481

}

482

483

// Rich aggregate function with metrics

484

public class RichAverageAggregateFunction extends RichAggregateFunction<Integer, Tuple2<Integer, Integer>, Double> {

485

private Counter aggregationCounter;

486

487

@Override

488

public void open(OpenContext openContext) throws Exception {

489

aggregationCounter = getRuntimeContext()

490

.getMetricGroup()

491

.counter("aggregations");

492

}

493

494

@Override

495

public Tuple2<Integer, Integer> createAccumulator() {

496

return new Tuple2<>(0, 0);

497

}

498

499

@Override

500

public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {

501

aggregationCounter.inc();

502

return new Tuple2<>(accumulator.f0 + value, accumulator.f1 + 1);

503

}

504

505

@Override

506

public Double getResult(Tuple2<Integer, Integer> accumulator) {

507

return accumulator.f1 == 0 ? 0.0 : (double) accumulator.f0 / accumulator.f1;

508

}

509

510

@Override

511

public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {

512

return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);

513

}

514

}

515

```

516

517

## Cross Functions

518

519

For Cartesian product operations.

520

521

```java { .api }

522

import org.apache.flink.api.common.functions.CrossFunction;

523

524

public class UserProductCrossFunction implements CrossFunction<User, Product, UserProductPair> {

525

@Override

526

public UserProductPair cross(User user, Product product) throws Exception {

527

return new UserProductPair(

528

user.getId(),

529

product.getId(),

530

calculateCompatibility(user, product)

531

);

532

}

533

534

private double calculateCompatibility(User user, Product product) {

535

// Custom compatibility calculation

536

return Math.random(); // Simplified example

537

}

538

}

539

```

540

541

## Best Practices

542

543

### Function Design

544

545

```java { .api }

546

// Prefer stateless functions when possible

547

public class StatelessTransformFunction implements MapFunction<InputType, OutputType> {

548

@Override

549

public OutputType map(InputType input) throws Exception {

550

// Pure transformation logic without side effects

551

return transform(input);

552

}

553

554

private OutputType transform(InputType input) {

555

// Deterministic transformation

556

return new OutputType(input.getValue() * 2);

557

}

558

}

559

560

// Use rich functions when you need lifecycle management

561

public class ResourceManagedFunction extends RichMapFunction<InputType, OutputType> {

562

private transient ExpensiveResource resource;

563

564

@Override

565

public void open(OpenContext openContext) throws Exception {

566

// Initialize expensive resources once per task

567

resource = new ExpensiveResource();

568

}

569

570

@Override

571

public OutputType map(InputType input) throws Exception {

572

return resource.process(input);

573

}

574

575

@Override

576

public void close() throws Exception {

577

if (resource != null) {

578

resource.cleanup();

579

}

580

}

581

}

582

```

583

584

### Exception Handling

585

586

```java { .api }

587

public class RobustMapFunction implements MapFunction<String, Result> {

588

@Override

589

public Result map(String value) throws Exception {

590

try {

591

return processValue(value);

592

} catch (IllegalArgumentException e) {

593

// Handle known exceptions gracefully

594

return Result.createErrorResult("Invalid input: " + value);

595

} catch (Exception e) {

596

// Re-throw unexpected exceptions to trigger Flink's fault tolerance

597

throw new Exception("Processing failed for value: " + value, e);

598

}

599

}

600

601

private Result processValue(String value) {

602

// Business logic

603

return new Result(value);

604

}

605

}

606

```

607

608

Apache Flink's function interfaces provide a powerful foundation for implementing custom data processing logic. By choosing the appropriate function type and following best practices, you can build efficient, maintainable, and fault-tolerant data processing applications.