or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregation-grouping.md, catalog-management.md, expressions.md, index.md, sql-integration.md, table-environment.md, table-operations.md, user-defined-functions.md, window-operations.md

docs/user-defined-functions.md

0

# User-Defined Functions

1

2

Flink Table API supports custom functions to extend the built-in function library. You can create scalar functions, table functions, aggregate functions, and table aggregate functions to implement domain-specific logic.

3

4

## Capabilities

5

6

### Scalar Functions

7

8

User-defined scalar functions take zero, one, or multiple scalar values and return a single scalar value.

9

10

```java { .api }

11

/**

12

* Base class for scalar user-defined functions

13

* Users must extend this class and implement eval() methods

14

*/

15

public abstract class ScalarFunction extends UserDefinedFunction {

16

/**

17

* Users implement one or more eval methods with different signatures

18

* The eval method name is fixed - Flink uses reflection to find matching methods

19

*

20

* Example signatures:

21

* public String eval(String input);

22

* public Integer eval(Integer a, Integer b);

23

* public Double eval(Double... values);

24

*/

25

}

26

```

27

28

**Usage Examples:**

29

30

```java

31

// Custom string manipulation function

32

public class StringHashFunction extends ScalarFunction {

33

public String eval(String input) {

34

if (input == null) {

35

return null;

36

}

37

return "hash_" + Math.abs(input.hashCode());

38

}

39

40

public String eval(String input, String prefix) {

41

if (input == null) {

42

return null;

43

}

44

return prefix + "_" + Math.abs(input.hashCode());

45

}

46

}

47

48

// Register and use the function

49

StringHashFunction hashFunc = new StringHashFunction();

50

tableEnv.createTemporaryFunction("string_hash", hashFunc);

51

52

// Use in Table API

53

Table result = sourceTable.select(

54

$("id"),

55

$("name"),

56

call("string_hash", $("name")).as("name_hash"),

57

call("string_hash", $("name"), lit("user")).as("prefixed_hash")

58

);

59

60

// Use in SQL

61

Table sqlResult = tableEnv.sqlQuery(

62

"SELECT id, name, string_hash(name) as name_hash " +

63

"FROM source_table"

64

);

65

66

// Mathematical function example

67

public class PowerFunction extends ScalarFunction {

68

public Double eval(Double base, Double exponent) {

69

if (base == null || exponent == null) {

70

return null;

71

}

72

return Math.pow(base, exponent);

73

}

74

75

public Long eval(Long base, Long exponent) {

76

if (base == null || exponent == null) {

77

return null;

78

}

79

return (long) Math.pow(base, exponent);

80

}

81

}

82

```

83

84

### Table Functions

85

86

User-defined table functions take zero, one, or multiple scalar values and return zero, one, or multiple rows, each of which may have one or more columns.

87

88

```java { .api }

89

/**

90

* Base class for table user-defined functions

91

* Users must extend this class and implement eval() methods

92

* Use collect() to emit output rows

93

*/

94

public abstract class TableFunction<T> extends UserDefinedFunction {

95

/**

96

* Emits a result row to the output table

97

* @param result Row data to emit

98

*/

99

protected void collect(T result);

100

101

/**

102

* Users implement eval methods that call collect() for each output row

103

* The eval method name is fixed - Flink uses reflection to find matching methods

104

*/

105

}

106

```

107

108

**Usage Examples:**

109

110

```java

111

// Split string into multiple rows

112

public class SplitFunction extends TableFunction<Row> {

113

public void eval(String str, String separator) {

114

if (str == null || separator == null) {

115

return;

116

}

117

118

String[] parts = str.split(separator);

119

for (int i = 0; i < parts.length; i++) {

120

collect(Row.of(parts[i].trim(), i));

121

}

122

}

123

}

124

125

// Register and use table function

126

SplitFunction splitFunc = new SplitFunction();

127

tableEnv.createTemporaryFunction("split_string", splitFunc);

128

129

// Use with LATERAL TABLE in SQL

130

Table result = tableEnv.sqlQuery(

131

"SELECT t.id, t.name, s.word, s.position " +

132

"FROM source_table t, " +

133

"LATERAL TABLE(split_string(t.tags, ',')) AS s(word, position)"

134

);

135

136

// Use in Table API with joinLateral

137

Table lateralResult = sourceTable

138

.joinLateral(call("split_string", $("tags"), lit(",")))

139

.select($("id"), $("name"), $("f0").as("word"), $("f1").as("position"));

140

141

// Generate series function

142

public class GenerateSeriesFunction extends TableFunction<Integer> {

143

public void eval(Integer start, Integer end) {

144

if (start == null || end == null) {

145

return;

146

}

147

148

for (int i = start; i <= end; i++) {

149

collect(i);

150

}

151

}

152

153

public void eval(Integer start, Integer end, Integer step) {

154

if (start == null || end == null || step == null || step == 0) {

155

return;

156

}

157

158

if (step > 0) {

159

for (int i = start; i <= end; i += step) {

160

collect(i);

161

}

162

} else {

163

for (int i = start; i >= end; i += step) {

164

collect(i);

165

}

166

}

167

}

168

}

169

```

170

171

### Aggregate Functions

172

173

User-defined aggregate functions take multiple rows and compute a single aggregate result.

174

175

```java { .api }

176

/**

177

* Base class for aggregate user-defined functions

178

* Users must implement accumulator management methods

179

*/

180

public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {

181

/**

182

* Creates a new accumulator for aggregation

183

* @return New accumulator instance

184

*/

185

public abstract ACC createAccumulator();

186

187

/**

188

* Accumulates input values into the accumulator

189

* @param accumulator Current accumulator state

190

* @param input Input values to accumulate (one or more parameters)

191

*/

192

public abstract void accumulate(ACC accumulator, Object... input);

193

194

/**

195

* Extracts the final result from the accumulator

196

* @param accumulator Final accumulator state

197

* @return Aggregate result

198

*/

199

public abstract T getValue(ACC accumulator);

200

201

/**

202

* Retracts input values from the accumulator (optional)

203

* Only needed for streaming scenarios with retractions

204

* @param accumulator Current accumulator state

205

* @param input Input values to retract

206

*/

207

public void retract(ACC accumulator, Object... input) {

208

// Optional - implement if retraction is needed

209

}

210

211

/**

212

* Merges two accumulators (optional)

213

* Needed for session windows and some optimization scenarios

214

* @param accumulator Target accumulator

215

* @param other Source accumulator to merge from

216

*/

217

public void merge(ACC accumulator, Iterable<ACC> other) {

218

// Optional - implement if merging is needed

219

}

220

}

221

```

222

223

**Usage Examples:**

224

225

```java

226

// Custom average function with accumulator

227

public class WeightedAverageFunction extends AggregateFunction<Double, WeightedAverageAccumulator> {

228

229

// Accumulator class

230

public static class WeightedAverageAccumulator {

231

public double sum = 0.0;

232

public double weightSum = 0.0;

233

}

234

235

@Override

236

public WeightedAverageAccumulator createAccumulator() {

237

return new WeightedAverageAccumulator();

238

}

239

240

@Override

241

public void accumulate(WeightedAverageAccumulator acc, Double value, Double weight) {

242

if (value != null && weight != null) {

243

acc.sum += value * weight;

244

acc.weightSum += weight;

245

}

246

}

247

248

@Override

249

public Double getValue(WeightedAverageAccumulator acc) {

250

if (acc.weightSum == 0.0) {

251

return null;

252

}

253

return acc.sum / acc.weightSum;

254

}

255

256

@Override

257

public void retract(WeightedAverageAccumulator acc, Double value, Double weight) {

258

if (value != null && weight != null) {

259

acc.sum -= value * weight;

260

acc.weightSum -= weight;

261

}

262

}

263

264

@Override

265

public void merge(WeightedAverageAccumulator acc, Iterable<WeightedAverageAccumulator> others) {

266

for (WeightedAverageAccumulator other : others) {

267

acc.sum += other.sum;

268

acc.weightSum += other.weightSum;

269

}

270

}

271

}

272

273

// Register and use aggregate function

274

WeightedAverageFunction weightedAvg = new WeightedAverageFunction();

275

tableEnv.createTemporaryFunction("weighted_avg", weightedAvg);

276

277

// Use in Table API

278

Table result = sourceTable

279

.groupBy($("category"))

280

.select(

281

$("category"),

282

call("weighted_avg", $("price"), $("quantity")).as("weighted_avg_price")

283

);

284

285

// Use in SQL

286

Table sqlResult = tableEnv.sqlQuery(

287

"SELECT category, weighted_avg(price, quantity) as weighted_avg_price " +

288

"FROM source_table " +

289

"GROUP BY category"

290

);

291

292

// Custom string concatenation aggregate

293

public class StringConcatFunction extends AggregateFunction<String, StringBuilder> {

294

295

@Override

296

public StringBuilder createAccumulator() {

297

return new StringBuilder();

298

}

299

300

@Override

301

public void accumulate(StringBuilder acc, String value, String separator) {

302

if (value != null) {

303

if (acc.length() > 0 && separator != null) {

304

acc.append(separator);

305

}

306

acc.append(value);

307

}

308

}

309

310

@Override

311

public String getValue(StringBuilder acc) {

312

return acc.toString();

313

}

314

}

315

```

316

317

### Table Aggregate Functions

318

319

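Table aggregate functions aggregate the rows of a group into zero, one, or multiple output rows (like table functions, but with aggregation semantics). They are invoked from the Table API via `flatAggregate`; Flink SQL has no syntax for calling them.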

320

321

```java { .api }

322

/**

323

* Base class for table aggregate functions

324

* Combines aspects of both table functions and aggregate functions

325

*/

326

public abstract class TableAggregateFunction<T, ACC> extends UserDefinedFunction {

327

/**

328

* Creates a new accumulator

329

* @return New accumulator instance

330

*/

331

public abstract ACC createAccumulator();

332

333

/**

334

* Accumulates input values

335

* @param accumulator Current accumulator state

336

* @param input Input values

337

*/

338

public abstract void accumulate(ACC accumulator, Object... input);

339

340

/**

341

* Emits result rows from the final accumulator state

342

* @param accumulator Final accumulator state

343

* @param out Collector for emitting output rows

344

*/

345

public abstract void emitValue(ACC accumulator, Collector<T> out);

346

347

/**

348

* Emits updated result rows during streaming processing

349

* @param accumulator Current accumulator state

350

* @param out Collector for emitting output rows

351

*/

352

public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out) {

353

// Optional - implement for streaming scenarios with retractions

354

}

355

}

356

```

357

358

**Usage Examples:**

359

360

```java

361

// Top-N function returning multiple rows per group

362

public class TopNFunction extends TableAggregateFunction<Row, TopNAccumulator> {

363

364

private int n;

365

366

public TopNFunction(int n) {

367

this.n = n;

368

}

369

370

public static class TopNAccumulator {

371

public List<Double> topValues = new ArrayList<>();

372

}

373

374

@Override

375

public TopNAccumulator createAccumulator() {

376

return new TopNAccumulator();

377

}

378

379

@Override

380

public void accumulate(TopNAccumulator acc, Double value) {

381

if (value != null) {

382

acc.topValues.add(value);

383

acc.topValues.sort((a, b) -> Double.compare(b, a)); // Descending order

384

385

// Keep only top N values

386

if (acc.topValues.size() > n) {

387

acc.topValues = acc.topValues.subList(0, n);

388

}

389

}

390

}

391

392

@Override

393

public void emitValue(TopNAccumulator acc, Collector<Row> out) {

394

for (int i = 0; i < acc.topValues.size(); i++) {

395

out.collect(Row.of(acc.topValues.get(i), i + 1));

396

}

397

}

398

}

399

400

// Register and use table aggregate function

401

TopNFunction topN = new TopNFunction(3);

402

tableEnv.createTemporaryFunction("top_n", topN);

403

404

// Use in Table API

405

Table topResults = sourceTable

406

.groupBy($("category"))

407

.flatAggregate(call("top_n", $("score")))

408

.select($("category"), $("f0").as("score"), $("f1").as("rank"));

409

```

410

411

### Function Registration Methods

412

413

Various ways to register functions in the table environment.

414

415

```java { .api }

416

/**

417

* Register a function instance with the given name

418

* @param name Function name for SQL and Table API usage

419

* @param function Function instance

420

*/

421

public void createTemporaryFunction(String name, UserDefinedFunction function);

422

423

/**

424

* Register a function class by name

425

* @param name Function name

426

* @param functionClass Function class

427

*/

428

public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);

429

430

/**

431

* Register function in specific catalog and database

432

* @param path Catalog path (catalog.database.function)

433

* @param function Function instance

434

*/

435

public void createFunction(String path, UserDefinedFunction function);

436

437

/**

438

* Drop a temporary function

439

* @param name Function name to drop

440

* @return true if function was dropped

441

*/

442

public boolean dropTemporaryFunction(String name);

443

```

444

445

**Usage Examples:**

446

447

```java

448

// Register with instance

449

MyCustomFunction customFunc = new MyCustomFunction();

450

tableEnv.createTemporaryFunction("my_func", customFunc);

451

452

// Register with class

453

tableEnv.createTemporarySystemFunction("power_func", PowerFunction.class);

454

455

// Register in specific catalog

456

tableEnv.createFunction("my_catalog.my_db.custom_func", customFunc);

457

458

// Register via SQL DDL

459

tableEnv.executeSql(

460

"CREATE TEMPORARY FUNCTION my_hash AS 'com.example.StringHashFunction'"

461

);

462

463

// Drop function

464

boolean dropped = tableEnv.dropTemporaryFunction("my_func");

465

466

// Drop via SQL

467

tableEnv.executeSql("DROP TEMPORARY FUNCTION my_hash");

468

```

469

470

### Advanced Function Features

471

472

Advanced patterns for function development and optimization.

473

474

```java { .api }

475

/**

476

* Function with type inference - override getResultType for complex return types

477

*/

478

public abstract class UserDefinedFunction {

479

/**

480

* Provides type information for the function result

481

* @param signature Method signature information

482

* @return TypeInformation for the result type

483

*/

484

public TypeInformation<?> getResultType(Class<?>[] signature) {

485

// Override to provide custom type information

486

return null;

487

}

488

489

/**

490

* Provides parameter type information

491

* @param signature Method signature information

492

* @return Array of TypeInformation for parameters

493

*/

494

public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {

495

// Override to provide custom parameter type information

496

return null;

497

}

498

}

499

```

500

501

**Usage Examples:**

502

503

```java

504

// Function with custom type handling

505

public class ComplexReturnFunction extends ScalarFunction {

506

507

// Return a complex type (Row with multiple fields)

508

public Row eval(String input) {

509

if (input == null) {

510

return null;

511

}

512

513

String[] parts = input.split(":");

514

return Row.of(parts[0], Integer.parseInt(parts[1]), Double.parseDouble(parts[2]));

515

}

516

517

@Override

518

public TypeInformation<?> getResultType(Class<?>[] signature) {

519

return Types.ROW(

520

Types.STRING, // field 0

521

Types.INT, // field 1

522

Types.DOUBLE // field 2

523

);

524

}

525

}

526

527

// Function with configuration

528

public class ConfigurableFunction extends ScalarFunction {

529

private String prefix;

530

531

public ConfigurableFunction(String prefix) {

532

this.prefix = prefix;

533

}

534

535

public String eval(String input) {

536

return prefix + "_" + input;

537

}

538

}

539

540

// Stateful function with open/close lifecycle

541

public class ResourceFunction extends ScalarFunction {

542

private transient SomeResource resource;

543

544

@Override

545

public void open(FunctionContext context) throws Exception {

546

super.open(context);

547

// Initialize resources (database connections, etc.)

548

this.resource = new SomeResource();

549

}

550

551

@Override

552

public void close() throws Exception {

553

super.close();

554

// Clean up resources

555

if (resource != null) {

556

resource.close();

557

}

558

}

559

560

public String eval(String input) {

561

return resource.process(input);

562

}

563

}

564

```

565

566

### Best Practices and Performance

567

568

Guidelines for efficient function implementation.

569

570

**Usage Examples:**

571

572

```java

573

// Efficient function with null handling

574

public class EfficientStringFunction extends ScalarFunction {

575

576

// Handle null inputs early

577

public String eval(String input) {

578

if (input == null) {

579

return null;

580

}

581

582

// Avoid creating unnecessary objects

583

if (input.isEmpty()) {

584

return "";

585

}

586

587

// Use StringBuilder for string concatenation

588

StringBuilder sb = new StringBuilder(input.length() + 10);

589

sb.append("processed_").append(input);

590

return sb.toString();

591

}

592

593

// Provide overloaded methods for different input types

594

public String eval(Integer input) {

595

if (input == null) {

596

return null;

597

}

598

return "processed_" + input;

599

}

600

}

601

602

// Reusable accumulator for better performance

603

public class EfficientAggregateFunction extends AggregateFunction<Double, EfficientAggregateFunction.Acc> {

604

605

public static class Acc {

606

public double sum = 0.0;

607

public long count = 0L;

608

609

// Reset for reuse

610

public void reset() {

611

sum = 0.0;

612

count = 0L;

613

}

614

}

615

616

@Override

617

public Acc createAccumulator() {

618

return new Acc();

619

}

620

621

@Override

622

public void accumulate(Acc acc, Double value) {

623

if (value != null) {

624

acc.sum += value;

625

acc.count++;

626

}

627

}

628

629

@Override

630

public Double getValue(Acc acc) {

631

return acc.count > 0 ? acc.sum / acc.count : null;

632

}

633

}

634

```