or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

`connectors.md` · `data-types.md` · `datastream-bridge.md` · `expressions.md` · `functions.md` · `index.md` · `sql-gateway.md` · `table-operations.md`

docs/functions.md

# User-Defined Functions

Framework for extending Apache Flink's Table API and SQL with custom scalar, table, aggregate, asynchronous, and process table functions, for processing logic that the built-in functions do not cover.

## Capabilities

### ScalarFunction

Base class for custom scalar functions, which map zero or more input values to exactly one result value per call.

```java { .api }

11

/**

12

* Base class for user-defined scalar functions

13

*/

14

public abstract class ScalarFunction extends UserDefinedFunction {

15

/**

16

* Evaluation method that must be implemented for the function logic.

17

* Method can be overloaded for different parameter combinations.

18

* @param args Input arguments (types must match function signature)

19

* @return Computed result value

20

*/

21

public abstract Object eval(Object... args);

22

23

/**

24

* Optional method to specify the result type when it cannot be inferred

25

* @param signature Array of argument types

26

* @return DataType of the function result

27

*/

28

public DataType getResultType(DataType[] signature);

29

30

/**

31

* Optional method to specify type inference for parameters

32

* @return TypeInference specification

33

*/

34

public TypeInference getTypeInference();

35

}

36

```

**ScalarFunction Example:**

```java

41

// Custom hash function

42

public class HashFunction extends ScalarFunction {

43

public String eval(String input) {

44

if (input == null) {

45

return null;

46

}

47

return Integer.toHexString(input.hashCode());

48

}

49

50

// Overloaded version for multiple inputs

51

public String eval(String input1, String input2) {

52

if (input1 == null || input2 == null) {

53

return null;

54

}

55

return Integer.toHexString((input1 + input2).hashCode());

56

}

57

}

58

59

// Register and use the function

60

tEnv.createTemporarySystemFunction("hash", HashFunction.class);

61

62

Table result = orders

63

.select($("order_id"),

64

call("hash", $("customer_email")).as("customer_hash"),

65

call("hash", $("order_id").cast(DataTypes.STRING()),

66

$("customer_email")).as("order_hash"));

67

```

### TableFunction

Base class for custom table functions, which map zero or more input values to zero, one, or many output rows (a one-to-many transformation).

```java { .api }

74

/**

75

* Base class for user-defined table functions

76

* @param <T> Type of the output rows

77

*/

78

public abstract class TableFunction<T> extends UserDefinedFunction {

79

/**

80

* Evaluation method that must be implemented for the function logic.

81

* Use collect() method to emit output rows.

82

* @param args Input arguments (types must match function signature)

83

*/

84

public abstract void eval(Object... args);

85

86

/**

87

* Emit an output row from the table function

88

* @param result Row to emit

89

*/

90

protected void collect(T result);

91

92

/**

93

* Optional method to specify the result type when it cannot be inferred

94

* @param signature Array of argument types

95

* @return DataType of the function result rows

96

*/

97

public DataType getResultType(DataType[] signature);

98

}

99

```

**TableFunction Example:**

```java

104

// Function to split comma-separated values into rows

105

@FunctionHint(output = @DataTypeHint("STRING"))

106

public class SplitFunction extends TableFunction<String> {

107

public void eval(String str) {

108

if (str != null) {

109

for (String s : str.split(",")) {

110

collect(s.trim());

111

}

112

}

113

}

114

}

115

116

// Function returning structured rows

117

@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))

118

public class WordAnalyzer extends TableFunction<Row> {

119

public void eval(String sentence) {

120

if (sentence != null) {

121

for (String word : sentence.split("\\s+")) {

122

collect(Row.of(word, word.length()));

123

}

124

}

125

}

126

}

127

128

// Register and use table functions

129

tEnv.createTemporarySystemFunction("split", SplitFunction.class);

130

tEnv.createTemporarySystemFunction("analyze_words", WordAnalyzer.class);

131

132

// Use with LATERAL JOIN

133

Table result = orders

134

.joinLateral(call("split", $("product_tags")).as("tag"))

135

.select($("order_id"), $("tag"));

136

137

// Use with LEFT JOIN LATERAL for optional results

138

Table analysis = documents

139

.leftOuterJoinLateral(call("analyze_words", $("title")).as("word", "length"))

140

.select($("document_id"), $("word"), $("length"));

141

```

### AggregateFunction

Base class for custom aggregate functions, which fold the values of multiple rows into an accumulator and return a single result per group.

```java { .api }

148

/**

149

* Base class for user-defined aggregate functions

150

* @param <T> Type of the aggregation result

151

* @param <ACC> Type of the accumulator used during aggregation

152

*/

153

public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {

154

/**

155

* Create and initialize a new accumulator

156

* @return New accumulator instance

157

*/

158

public abstract ACC createAccumulator();

159

160

/**

161

* Accumulate input values into the accumulator

162

* @param accumulator Current accumulator state

163

* @param args Input values to accumulate

164

*/

165

public abstract void accumulate(ACC accumulator, Object... args);

166

167

/**

168

* Extract the final result from the accumulator

169

* @param accumulator Final accumulator state

170

* @return Aggregated result

171

*/

172

public abstract T getValue(ACC accumulator);

173

174

/**

175

* Retract input values from the accumulator (for streaming updates)

176

* @param accumulator Current accumulator state

177

* @param args Input values to retract

178

*/

179

public void retract(ACC accumulator, Object... args) {

180

// Optional: implement for streaming scenarios with retractions

181

}

182

183

/**

184

* Merge two accumulators (for distributed aggregation)

185

* @param accumulator Target accumulator

186

* @param others Accumulators to merge from

187

*/

188

public void merge(ACC accumulator, Iterable<ACC> others) {

189

// Optional: implement for distributed scenarios

190

}

191

192

/**

193

* Reset the accumulator to initial state

194

* @param accumulator Accumulator to reset

195

*/

196

public void resetAccumulator(ACC accumulator) {

197

// Optional: implement for reusing accumulators

198

}

199

}

200

```

**AggregateFunction Example:**

```java

205

// Custom weighted average function

206

public class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccumulator> {

207

208

// Accumulator class

209

public static class WeightedAvgAccumulator {

210

public double sum = 0;

211

public double weightSum = 0;

212

}

213

214

@Override

215

public WeightedAvgAccumulator createAccumulator() {

216

return new WeightedAvgAccumulator();

217

}

218

219

@Override

220

public void accumulate(WeightedAvgAccumulator acc, Double value, Double weight) {

221

if (value != null && weight != null) {

222

acc.sum += value * weight;

223

acc.weightSum += weight;

224

}

225

}

226

227

@Override

228

public Double getValue(WeightedAvgAccumulator acc) {

229

if (acc.weightSum == 0) {

230

return null;

231

}

232

return acc.sum / acc.weightSum;

233

}

234

235

@Override

236

public void retract(WeightedAvgAccumulator acc, Double value, Double weight) {

237

if (value != null && weight != null) {

238

acc.sum -= value * weight;

239

acc.weightSum -= weight;

240

}

241

}

242

243

@Override

244

public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> others) {

245

for (WeightedAvgAccumulator other : others) {

246

acc.sum += other.sum;

247

acc.weightSum += other.weightSum;

248

}

249

}

250

}

251

252

// Register and use aggregate function

253

tEnv.createTemporarySystemFunction("weighted_avg", WeightedAvg.class);

254

255

Table result = sales

256

.groupBy($("product_category"))

257

.select($("product_category"),

258

call("weighted_avg", $("price"), $("quantity")).as("weighted_avg_price"));

259

```

### AsyncScalarFunction

Base class for asynchronous scalar functions. They deliver their single result through a `CompletableFuture`, so I/O such as calls to external services does not block processing.

```java { .api }

266

/**

267

* Base class for asynchronous user-defined scalar functions

268

*/

269

public abstract class AsyncScalarFunction extends UserDefinedFunction {

270

/**

271

* Asynchronous evaluation method

272

* @param resultFuture CompletableFuture to complete with the result

273

* @param args Input arguments

274

*/

275

public abstract void eval(CompletableFuture<Object> resultFuture, Object... args);

276

277

/**

278

* Optional method to specify timeout for async operations

279

* @return Timeout duration in milliseconds

280

*/

281

public long getTimeout() {

282

return 60000; // Default 60 seconds

283

}

284

}

285

```

**AsyncScalarFunction Example:**

```java

290

// Async function for external service lookup

291

public class AsyncEnrichFunction extends AsyncScalarFunction {

292

private transient HttpClient httpClient;

293

294

@Override

295

public void open(FunctionContext context) throws Exception {

296

httpClient = HttpClient.newHttpClient();

297

}

298

299

public void eval(CompletableFuture<String> resultFuture, String userId) {

300

if (userId == null) {

301

resultFuture.complete(null);

302

return;

303

}

304

305

HttpRequest request = HttpRequest.newBuilder()

306

.uri(URI.create("https://api.example.com/users/" + userId))

307

.build();

308

309

httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())

310

.thenApply(HttpResponse::body)

311

.thenAccept(resultFuture::complete)

312

.exceptionally(throwable -> {

313

resultFuture.complete(null); // Handle errors gracefully

314

return null;

315

});

316

}

317

318

@Override

319

public void close() throws Exception {

320

if (httpClient != null) {

321

// Cleanup resources

322

}

323

}

324

}

325

```

### AsyncTableFunction

Base class for asynchronous table functions: one-to-many transformations whose result rows are delivered through a `CompletableFuture` without blocking.

```java { .api }

332

/**

333

* Base class for asynchronous user-defined table functions

334

* @param <T> Type of the output rows

335

*/

336

public abstract class AsyncTableFunction<T> extends UserDefinedFunction {

337

/**

338

* Asynchronous evaluation method

339

* @param resultFuture CompletableFuture to complete with collection of results

340

* @param args Input arguments

341

*/

342

public abstract void eval(CompletableFuture<Collection<T>> resultFuture, Object... args);

343

}

344

```

### ProcessTableFunction

Advanced table function for complex transformations that can consume table arguments and keep state and timing information across calls.

```java { .api }

351

/**

352

* Base class for process table functions with advanced capabilities

353

* @param <T> Type of the output rows

354

*/

355

public abstract class ProcessTableFunction<T> extends UserDefinedFunction {

356

/**

357

* Process method with access to context

358

* @param ctx Processing context with state and timer access

359

* @param args Input arguments

360

*/

361

public abstract void eval(ProcessContext ctx, Object... args) throws Exception;

362

363

/**

364

* Processing context interface

365

*/

366

public interface ProcessContext {

367

/**

368

* Get keyed state for maintaining function state

369

* @param stateDescriptor State descriptor

370

* @return State instance

371

*/

372

<S extends State> S getState(StateDescriptor<S, ?> stateDescriptor);

373

374

/**

375

* Emit an output row

376

* @param result Row to emit

377

*/

378

void collect(T result);

379

380

/**

381

* Get current processing time

382

* @return Current processing time timestamp

383

*/

384

long currentProcessingTime();

385

386

/**

387

* Get current watermark

388

* @return Current event time watermark

389

*/

390

long currentWatermark();

391

}

392

}

393

```

### Function Registration

Methods of `TableEnvironment` for registering user-defined functions so they can be called from the Table API (via `call(...)`) and from SQL.

```java { .api }

400

/**

401

* Register a function class as a temporary system function

402

* @param name Function name for SQL usage

403

* @param functionClass Function implementation class

404

*/

405

public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);

406

407

/**

408

* Register a function instance as a temporary system function

409

* @param name Function name for SQL usage

410

* @param functionInstance Function implementation instance

411

*/

412

public void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);

413

414

/**

415

* Register a function in a specific catalog and database

416

* @param path Full path to the function (catalog.database.function)

417

* @param functionClass Function implementation class

418

*/

419

public void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);

420

421

/**

422

* Drop a temporary system function

423

* @param name Function name to drop

424

* @return true if function existed and was dropped

425

*/

426

public boolean dropTemporarySystemFunction(String name);

427

```

### Type Hints and Annotations

Annotations that give Flink's type inference explicit argument and result data types when these cannot be derived from the Java method signatures alone.

```java { .api }

434

/**

435

* Annotation for providing function-level type hints

436

*/

437

@Target(ElementType.TYPE)

438

@Retention(RetentionPolicy.RUNTIME)

439

public @interface FunctionHint {

440

/**

441

* Hint for function input parameters

442

*/

443

DataTypeHint[] input() default {};

444

445

/**

446

* Hint for function output type

447

*/

448

DataTypeHint output() default @DataTypeHint();

449

450

/**

451

* Whether function is deterministic

452

*/

453

boolean isDeterministic() default true;

454

}

455

456

/**

457

* Annotation for providing data type hints

458

*/

459

@Target({ElementType.PARAMETER, ElementType.METHOD})

460

@Retention(RetentionPolicy.RUNTIME)

461

public @interface DataTypeHint {

462

/**

463

* Data type specification string

464

*/

465

String value() default "";

466

467

/**

468

* Whether the type is nullable

469

*/

470

DefaultBoolean allowRawGlobally() default DefaultBoolean.TRUE;

471

}

472

```

**Type Hint Examples:**

```java

477

@FunctionHint(

478

input = {@DataTypeHint("STRING"), @DataTypeHint("INT")},

479

output = @DataTypeHint("ARRAY<STRING>")

480

)

481

public class RepeatString extends ScalarFunction {

482

public String[] eval(String str, Integer count) {

483

if (str == null || count == null || count <= 0) {

484

return new String[0];

485

}

486

String[] result = new String[count];

487

Arrays.fill(result, str);

488

return result;

489

}

490

}

491

492

// Complex type hint for nested structure

493

@FunctionHint(output = @DataTypeHint("ROW<name STRING, stats ROW<count BIGINT, avg DOUBLE>>"))

494

public class AnalyzeData extends TableFunction<Row> {

495

public void eval(String data) {

496

// Implementation that emits Row objects matching the hint

497

}

498

}

499

```

### Function Lifecycle

Methods for acquiring and releasing a function's resources and for declaring whether it is deterministic.

```java { .api }

/** Common base class of all user-defined functions, providing the lifecycle hooks. */
public abstract class UserDefinedFunction {

    /**
     * Called before the first evaluation call; override to set up resources such as connections
     * or metrics.
     *
     * @param context Function context with metrics and job parameters
     */
    public void open(FunctionContext context) throws Exception {
        // No-op by default.
    }

    /**
     * Called when the function is no longer needed; override to release resources acquired in
     * {@link #open(FunctionContext)}.
     */
    public void close() throws Exception {
        // No-op by default.
    }

    /**
     * Whether the function always returns the same result for the same arguments (same input =
     * same output).
     *
     * <p>Return false when results depend on external state (e.g. lookups against a database or
     * service), time, or randomness.
     *
     * @return true by default
     */
    public boolean isDeterministic() {
        return true;
    }
}

/** Context passed to {@code UserDefinedFunction#open}, giving access to metrics and job parameters. */
public interface FunctionContext {

    /**
     * Gets the metric group for function metrics.
     *
     * @return MetricGroup for registering custom metrics
     */
    MetricGroup getMetricGroup();

    /**
     * Gets the value of a global job parameter.
     *
     * @param key Job parameter key
     * @param defaultValue Value returned when the key is not set
     * @return The configured value, or {@code defaultValue}
     */
    String getJobParameter(String key, String defaultValue);
}
```

**Function Lifecycle Example:**

```java
public class DatabaseLookupFunction extends AsyncScalarFunction {
    private static final String QUERY = "SELECT value FROM lookup WHERE key = ?";

    private transient Connection connection;
    private transient Counter lookupCounter;
    // JDBC connections are not guaranteed to be thread-safe, so every statement on `connection`
    // runs on this one dedicated thread. This also keeps blocking JDBC calls off
    // ForkJoinPool.commonPool(), which supplyAsync would use without an explicit executor.
    private transient ExecutorService jdbcExecutor;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Initialize database connection
        String jdbcUrl = context.getJobParameter("database.url", null);
        if (jdbcUrl == null) {
            throw new IllegalStateException("Missing job parameter 'database.url'");
        }
        connection = DriverManager.getConnection(jdbcUrl);
        jdbcExecutor = Executors.newSingleThreadExecutor();

        // Register metrics
        lookupCounter = context.getMetricGroup().counter("lookup_count");
    }

    /** Completes {@code resultFuture} with the stored value for {@code key}, or null if absent. */
    public void eval(CompletableFuture<String> resultFuture, String key) {
        if (key == null) {
            // `key = NULL` never matches in SQL, so skip the round trip.
            resultFuture.complete(null);
            return;
        }
        lookupCounter.inc();

        CompletableFuture.supplyAsync(() -> lookup(key), jdbcExecutor)
            // Propagate failures as well. Completing only on success would leave the result
            // future pending until the async timeout fires.
            .whenComplete((value, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error);
                } else {
                    resultFuture.complete(value);
                }
            });
    }

    // Runs the blocking query; both the statement and the result set are closed afterwards.
    private String lookup(String key) {
        try (PreparedStatement stmt = connection.prepareStatement(QUERY)) {
            stmt.setString(1, key);
            try (ResultSet rs = stmt.executeQuery()) {
                return rs.next() ? rs.getString("value") : null;
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isDeterministic() {
        // Table contents can change between calls.
        return false;
    }

    @Override
    public void close() throws Exception {
        if (jdbcExecutor != null) {
            jdbcExecutor.shutdown();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

```