or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-system.md, datastream-integration.md, index.md, sql-execution.md, table-environment.md, table-operations.md, type-system.md, user-defined-functions.md

user-defined-functions.mddocs/

0

# User-Defined Functions

1

2

Flink's Table API provides a comprehensive framework for creating custom scalar, table, and aggregate functions. UDFs enable extending the built-in function library with domain-specific logic while maintaining type safety and performance optimization.

3

4

## Capabilities

5

6

### Scalar Functions

7

8

Functions that take zero, one, or more input values and return a single output value.

9

10

```java { .api }

11

/**

12

* Base class for user-defined scalar functions

13

* Users extend this class and implement eval() methods

14

*/

15

abstract class ScalarFunction extends UserDefinedFunction {

16

// Users implement one or more eval() methods with different signatures

17

// public ReturnType eval(InputType1 input1, InputType2 input2, ...);

18

19

/**

20

* Gets the function context for accessing runtime information

21

* @return FunctionContext providing runtime context

22

*/

23

FunctionContext getFunctionContext();

24

}

25

26

/**

27

* Base class for all user-defined functions

28

*/

29

abstract class UserDefinedFunction implements FunctionDefinition {

30

/**

31

* Optional method called when function is opened

32

* @param context Function context for initialization

33

*/

34

void open(FunctionContext context) throws Exception;

35

36

/**

37

* Optional method called when function is closed

38

*/

39

void close() throws Exception;

40

41

/**

42

* Indicates whether the function is deterministic

43

* @return true if function always returns same result for same inputs

44

*/

45

boolean isDeterministic();

46

47

/**

48

* Gets the type inference for this function

49

* @return TypeInference defining input/output types

50

*/

51

TypeInference getTypeInference();

52

}

53

```

54

55

**Usage Examples:**

56

57

```java

58

// Simple scalar function

59

public class UpperCaseFunction extends ScalarFunction {

60

public String eval(String input) {

61

return input != null ? input.toUpperCase() : null;

62

}

63

}

64

65

// Multiple eval signatures for overloading

66

public class AddFunction extends ScalarFunction {

67

public Integer eval(Integer a, Integer b) {

68

return (a != null && b != null) ? a + b : null;

69

}

70

71

public Double eval(Double a, Double b) {

72

return (a != null && b != null) ? a + b : null;

73

}

74

75

public String eval(String a, String b) {

76

return (a != null && b != null) ? a + b : null;

77

}

78

}

79

80

// Function with context and lifecycle

81

public class HashFunction extends ScalarFunction {

82

private MessageDigest md5;

83

84

@Override

85

public void open(FunctionContext context) throws Exception {

86

md5 = MessageDigest.getInstance("MD5");

87

}

88

89

public String eval(String input) {

90

if (input == null) return null;

91

92

byte[] hash = md5.digest(input.getBytes());

93

return DatatypeConverter.printHexBinary(hash);

94

}

95

}

96

97

// Registration and usage

98

tableEnv.createTemporarySystemFunction("my_upper", new UpperCaseFunction());

99

Table result = tableEnv.sqlQuery("SELECT my_upper(name) FROM users");

100

```

101

102

### Table Functions

103

104

Functions that take zero or more input values and return multiple rows (table-valued functions).

105

106

```java { .api }

107

/**

108

* Base class for user-defined table functions

109

* @param <T> Type of output rows

110

*/

111

abstract class TableFunction<T> extends UserDefinedFunction {

112

// Users implement one or more eval() methods that call collect()

113

// public void eval(InputType1 input1, InputType2 input2, ...);

114

115

/**

116

* Emits a result row from the table function

117

* @param result Result row to emit

118

*/

119

protected void collect(T result);

120

121

/**

122

* Gets the result type of this table function

123

* @return DataType representing the output row structure

124

*/

125

DataType getResultType();

126

}

127

```

128

129

**Usage Examples:**

130

131

```java

132

// Split string into multiple rows

133

@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))

134

public class SplitFunction extends TableFunction<Row> {

135

136

public void eval(String str) {

137

if (str != null) {

138

for (String word : str.split("\\s+")) {

139

collect(Row.of(word));

140

}

141

}

142

}

143

}

144

145

// Generate number sequence

146

@FunctionHint(output = @DataTypeHint("ROW<num INT>"))

147

public class RangeFunction extends TableFunction<Row> {

148

149

public void eval(Integer start, Integer end) {

150

if (start != null && end != null) {

151

for (int i = start; i <= end; i++) {

152

collect(Row.of(i));

153

}

154

}

155

}

156

}

157

158

// Registration and usage

159

tableEnv.createTemporarySystemFunction("split_words", new SplitFunction());

160

161

// SQL usage with LATERAL TABLE

162

Table result = tableEnv.sqlQuery(

163

"SELECT t.word, COUNT(*) as word_count " +

164

"FROM documents d, LATERAL TABLE(split_words(d.content)) AS t(word) " +

165

"GROUP BY t.word"

166

);

167

168

// Table API usage

169

Table documents = tableEnv.from("documents");

170

Table words = documents

171

.joinLateral(call("split_words", $("content")).as("word"))

172

.select($("doc_id"), $("word"));

173

```

174

175

### Aggregate Functions

176

177

Functions that aggregate multiple input rows into a single output value.

178

179

```java { .api }

180

/**

181

* Base class for user-defined aggregate functions

182

* @param <T> Type of the final result

183

* @param <ACC> Type of the accumulator

184

*/

185

abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {

186

/**

187

* Creates a new accumulator for aggregation

188

* @return New accumulator instance

189

*/

190

public abstract ACC createAccumulator();

191

192

/**

193

* Extracts the final result from the accumulator

194

* @param accumulator Final accumulator state

195

* @return Aggregation result

196

*/

197

public abstract T getValue(ACC accumulator);

198

199

// Users implement accumulate() method(s)

200

// public void accumulate(ACC accumulator, InputType1 input1, InputType2 input2, ...);

201

202

/**

203

* Optional: Retracts a value from the accumulator (for changelog streams)

204

* @param accumulator Accumulator to retract from

205

* @param input Input values to retract

206

*/

207

// public void retract(ACC accumulator, InputType1 input1, InputType2 input2, ...);

208

209

/**

210

* Optional: Merges two accumulators (for batch processing)

211

* @param accumulator Target accumulator

212

* @param accumulators Source accumulators to merge

213

*/

214

// public void merge(ACC accumulator, Iterable<ACC> accumulators);

215

}

216

```

217

218

**Usage Examples:**

219

220

```java

221

// Weighted average aggregate function

222

/**
 * Mutable accumulator state for the weighted average; both fields start at zero.
 */
public class WeightedAvgAccumulator {

    /** Running total of value * weight. */
    public double sum;

    /** Running total of the weights. */
    public double weightSum;
}

226

227

public class WeightedAverage extends AggregateFunction<Double, WeightedAvgAccumulator> {

228

229

@Override

230

public WeightedAvgAccumulator createAccumulator() {

231

return new WeightedAvgAccumulator();

232

}

233

234

public void accumulate(WeightedAvgAccumulator acc, Double value, Double weight) {

235

if (value != null && weight != null) {

236

acc.sum += value * weight;

237

acc.weightSum += weight;

238

}

239

}

240

241

@Override

242

public Double getValue(WeightedAvgAccumulator acc) {

243

return acc.weightSum != 0 ? acc.sum / acc.weightSum : null;

244

}

245

246

public void retract(WeightedAvgAccumulator acc, Double value, Double weight) {

247

if (value != null && weight != null) {

248

acc.sum -= value * weight;

249

acc.weightSum -= weight;

250

}

251

}

252

253

public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> others) {

254

for (WeightedAvgAccumulator other : others) {

255

acc.sum += other.sum;

256

acc.weightSum += other.weightSum;

257

}

258

}

259

}

260

261

// Registration and usage

262

tableEnv.createTemporarySystemFunction("weighted_avg", new WeightedAverage());

263

264

Table result = tableEnv.sqlQuery(

265

"SELECT product_category, weighted_avg(price, quantity) as avg_price " +

266

"FROM sales " +

267

"GROUP BY product_category"

268

);

269

```

270

271

### Table Aggregate Functions

272

273

Functions that aggregate multiple input rows into multiple output rows.

274

275

```java { .api }

276

/**

277

* Base class for user-defined table aggregate functions

278

* @param <T> Type of the output rows

279

* @param <ACC> Type of the accumulator

280

*/

281

abstract class TableAggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {

282

/**

283

* Creates a new accumulator for aggregation

284

* @return New accumulator instance

285

*/

286

public abstract ACC createAccumulator();

287

288

// Users implement accumulate() method(s)

289

// public void accumulate(ACC accumulator, InputType1 input1, InputType2 input2, ...);

290

291

/**

292

* Emits the final result from the accumulator

293

* @param accumulator Final accumulator state

294

* @param out Collector for emitting results

295

*/

296

public abstract void emitValue(ACC accumulator, Collector<T> out);

297

298

/**

299

* Optional: Emits incremental updates with retraction

300

* @param accumulator Current accumulator state

301

* @param out Collector for emitting results with retraction

302

*/

303

// public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out);

304

}

305

```

306

307

**Usage Examples:**

308

309

```java

310

// Top N aggregate function

311

public class TopNAccumulator {

312

public List<Tuple2<Integer, String>> topN = new ArrayList<>();

313

public int n;

314

}

315

316

public class TopN extends TableAggregateFunction<Tuple2<Integer, String>, TopNAccumulator> {

317

private int n;

318

319

public TopN(int n) {

320

this.n = n;

321

}

322

323

@Override

324

public TopNAccumulator createAccumulator() {

325

TopNAccumulator acc = new TopNAccumulator();

326

acc.n = n;

327

return acc;

328

}

329

330

public void accumulate(TopNAccumulator acc, Integer score, String name) {

331

if (score != null && name != null) {

332

acc.topN.add(Tuple2.of(score, name));

333

acc.topN.sort((a, b) -> b.f0.compareTo(a.f0)); // Sort descending

334

if (acc.topN.size() > acc.n) {

335

acc.topN.remove(acc.topN.size() - 1);

336

}

337

}

338

}

339

340

@Override

341

public void emitValue(TopNAccumulator acc, Collector<Tuple2<Integer, String>> out) {

342

for (Tuple2<Integer, String> item : acc.topN) {

343

out.collect(item);

344

}

345

}

346

}

347

348

// Registration and usage

349

tableEnv.createTemporarySystemFunction("top3", new TopN(3));

350

351

Table result = tableEnv.sqlQuery(

352

"SELECT score, name " +

353

"FROM (SELECT score, name FROM players) " +

354

"GROUP BY () " +

355

"FLAT_AGGREGATE(top3(score, name)) AS (score, name)"

356

);

357

```

358

359

### Async Functions

360

361

Functions that perform asynchronous operations for I/O bound tasks.

362

363

```java { .api }

364

/**

365

* Async scalar function for I/O bound operations

366

*/

367

class AsyncScalarFunction extends UserDefinedFunction {

368

// Users implement evalAsync() methods that return CompletableFuture

369

// public CompletableFuture<ReturnType> evalAsync(InputType1 input1, InputType2 input2, ...);

370

}

371

372

/**

373

* Base class for async table functions

374

* @param <T> Type of output rows

375

*/

376

abstract class AsyncTableFunction<T> extends UserDefinedFunction {

377

// Users implement evalAsync() methods that use AsyncCollector

378

// public void evalAsync(InputType1 input1, AsyncCollector<T> collector);

379

}

380

```

381

382

**Usage Examples:**

383

384

```java

385

// Async HTTP lookup function

386

public class HttpLookupFunction extends AsyncScalarFunction {

387

private transient AsyncHttpClient httpClient;

388

389

@Override

390

public void open(FunctionContext context) throws Exception {

391

httpClient = Dsl.asyncHttpClient();

392

}

393

394

public CompletableFuture<String> evalAsync(String url) {

395

if (url == null) {

396

return CompletableFuture.completedFuture(null);

397

}

398

399

return httpClient

400

.prepareGet(url)

401

.execute()

402

.toCompletableFuture()

403

.thenApply(response -> response.getResponseBody());

404

}

405

406

@Override

407

public void close() throws Exception {

408

if (httpClient != null) {

409

httpClient.close();

410

}

411

}

412

}

413

414

// Registration and usage

415

tableEnv.createTemporarySystemFunction("http_get", new HttpLookupFunction());

416

```

417

418

### Function Type Inference

419

420

Advanced type inference for complex function signatures.

421

422

```java { .api }

423

/**

424

* Annotation for providing type hints to functions

425

*/

426

@Target({ElementType.TYPE, ElementType.METHOD})

427

@Retention(RetentionPolicy.RUNTIME)

428

@interface FunctionHint {

429

DataTypeHint[] input() default {};

430

DataTypeHint output() default @DataTypeHint();

431

boolean isDeterministic() default true;

432

}

433

434

/**

435

* Annotation for specifying data types

436

*/

437

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})

438

@Retention(RetentionPolicy.RUNTIME)

439

@interface DataTypeHint {

440

String value() default "";

441

Class<?> bridgedTo() default void.class;

442

boolean allowRawGlobally() default false;

443

}

444

```

445

446

**Usage Examples:**

447

448

```java

449

// Function with explicit type hints

450

@FunctionHint(

451

input = {@DataTypeHint("STRING"), @DataTypeHint("INT")},

452

output = @DataTypeHint("ARRAY<STRING>")

453

)

454

public class RepeatFunction extends TableFunction<String[]> {

455

456

public void eval(String str, Integer count) {

457

if (str != null && count != null && count > 0) {

458

String[] result = new String[count];

459

Arrays.fill(result, str);

460

collect(result);

461

}

462

}

463

}

464

465

// Complex return type with row structure

466

@FunctionHint(output = @DataTypeHint("ROW<id BIGINT, name STRING, score DOUBLE>"))

467

public class ParseResultFunction extends TableFunction<Row> {

468

469

public void eval(String jsonString) {

470

// Parse JSON and emit structured rows

471

JsonObject json = JsonParser.parseString(jsonString).getAsJsonObject();

472

collect(Row.of(

473

json.get("id").getAsLong(),

474

json.get("name").getAsString(),

475

json.get("score").getAsDouble()

476

));

477

}

478

}

479

```

480

481

## Types

482

483

### Function Base Classes

484

485

```java { .api }

486

abstract class UserDefinedFunction implements FunctionDefinition {

487

void open(FunctionContext context) throws Exception;

488

void close() throws Exception;

489

boolean isDeterministic();

490

TypeInference getTypeInference();

491

}

492

493

abstract class ScalarFunction extends UserDefinedFunction {

494

// Implementation-specific eval() methods

495

}

496

497

abstract class TableFunction<T> extends UserDefinedFunction {

498

protected void collect(T result);

499

DataType getResultType();

500

}

501

502

abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {

503

public abstract ACC createAccumulator();

504

public abstract T getValue(ACC accumulator);

505

// Optional: public void retract(ACC accumulator, ...);

506

// Optional: public void merge(ACC accumulator, Iterable<ACC> others);

507

}

508

509

abstract class TableAggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {

510

public abstract ACC createAccumulator();

511

public abstract void emitValue(ACC accumulator, Collector<T> out);

512

// Optional: public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out);

513

}

514

```

515

516

### Function Context

517

518

```java { .api }

519

interface FunctionContext {

520

MetricGroup getMetricGroup();

521

int getIndexOfThisSubtask();

522

int getNumberOfParallelSubtasks();

523

String getJobParameter(String key, String defaultValue);

524

525

// Access to distributed cache

526

File getCachedFile(String name);

527

}

528

```

529

530

### Collectors and Async Support

531

532

```java { .api }

533

interface Collector<T> {

534

void collect(T record);

535

}

536

537

interface RetractableCollector<T> extends Collector<T> {

538

void retract(T record);

539

}

540

541

interface AsyncCollector<T> {

542

void collect(Collection<T> result);

543

void complete(Collection<T> result);

544

}

545

```