# Function Integration

The function integration system supports user-defined functions (UDFs) of three kinds: scalar functions, aggregate functions, and table functions. It covers function registration, validation, SQL integration, and code generation.

## Capabilities

### UserDefinedFunctionUtils - UDF Utilities

Core utilities for user-defined function handling, validation, and integration with the Flink Table API.

```scala { .api }
/**
 * Utilities for user-defined function management and validation
 */
object UserDefinedFunctionUtils {

  /**
   * Validates that a class can be instantiated for UDF usage
   * @param clazz Class to check for instantiation capability
   * @throws ValidationException if class cannot be instantiated
   */
  def checkForInstantiation(clazz: Class[_]): Unit

  /**
   * Checks whether the given class is not a Scala singleton object
   * Prevents concurrent risks with TableFunction implementations
   * @param clazz Class to check for singleton pattern
   * @throws ValidationException if class is a Scala object
   */
  def checkNotSingleton(clazz: Class[_]): Unit

  /**
   * Gets the eval method signature for a user-defined function
   * @param function User-defined function instance
   * @param expectedTypes Expected parameter types
   * @return Method signature for eval method
   */
  def getEvalMethodSignature(
    function: UserDefinedFunction,
    expectedTypes: Array[LogicalType]
  ): Method

  /**
   * Checks if a specified method exists in the function
   * @param method Method name to check
   * @param function User-defined function instance
   * @return true if method exists, false otherwise
   */
  def ifMethodExistInFunction(method: String, function: UserDefinedFunction): Boolean

  /**
   * Extracts method signatures from UDF class for function inference
   * @param clazz UDF class to analyze
   * @param methodName Name of the method to extract (e.g., "eval", "accumulate")
   * @return Array of method signatures found
   */
  def getMethodSignatures(clazz: Class[_], methodName: String): Array[MethodSignature]

  /**
   * Determines result type information from UDF method signatures
   * @param signatures Array of method signatures
   * @param inputTypes Input argument types
   * @return Inferred result type information
   */
  def getResultTypeFromSignatures(
    signatures: Array[MethodSignature],
    inputTypes: Array[DataType]
  ): TypeInformation[_]
}
```

**Usage Example:**

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils

// Custom scalar function
class MyUpperFunction extends ScalarFunction {
  // SQL NULL arrives as null, so guard against it
  def eval(str: String): String = if (str == null) null else str.toUpperCase
}

// Validate function before registration
val myFunction = new MyUpperFunction()
UserDefinedFunctionUtils.validateScalarFunction(myFunction)
UserDefinedFunctionUtils.checkForInstantiation(myFunction.getClass)

// Function can now be registered with table environment
val tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build())
tableEnv.createTemporarySystemFunction("MY_UPPER", myFunction)
```
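
The singleton rule behind `checkNotSingleton` can be illustrated with plain JDK reflection. A Scala `object` compiles to a class that exposes its single instance through a static `MODULE$` field, so looking for that field is one way to detect it. The sketch below is an assumption about how such a check could work, not Flink's implementation: `SingletonCheck`, `isScalaSingleton`, and both demo classes are hypothetical names, and `IllegalArgumentException` stands in for `ValidationException`.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

final class SingletonCheck {

    /** Returns true if the class exposes a public static field named MODULE$, as Scala objects do. */
    static boolean isScalaSingleton(Class<?> clazz) {
        for (Field field : clazz.getFields()) {
            if (field.getName().equals("MODULE$") && Modifier.isStatic(field.getModifiers())) {
                return true;
            }
        }
        return false;
    }

    /** Rejects singleton classes; IllegalArgumentException stands in for ValidationException. */
    static void checkNotSingleton(Class<?> clazz) {
        if (isScalaSingleton(clazz)) {
            throw new IllegalArgumentException(
                clazz.getName() + " is a singleton object and would be shared across calls");
        }
    }

    /** Hypothetical demo class that mimics the shape of a compiled Scala object. */
    static final class FakeScalaObject {
        public static final FakeScalaObject MODULE$ = new FakeScalaObject();
    }

    /** Hypothetical demo class with no singleton field. */
    static final class PlainFunction {
    }

    public static void main(String[] args) {
        System.out.println(isScalaSingleton(FakeScalaObject.class));
        System.out.println(isScalaSingleton(PlainFunction.class));
    }
}
```

A shared singleton matters most for table functions, because a single instance holding per-call state would be visible to every caller at once.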

### Scalar Function SQL Integration

SQL integration layer for scalar functions. It wraps a user-defined `ScalarFunction` as a SQL function so that it can be called from SQL queries.

```scala { .api }
/**
 * SQL integration for scalar functions
 */
class ScalarSqlFunction(
  identifier: String,
  displayName: String,
  scalarFunction: ScalarFunction,
  typeFactory: FlinkTypeFactory
) extends SqlFunction {

  /**
   * Returns function kind for SQL integration
   * @return SqlKind.OTHER_FUNCTION for scalar functions
   */
  def getKind: SqlKind = SqlKind.OTHER_FUNCTION

  /**
   * Returns SQL identifier for this function
   * @return Function identifier used in SQL
   */
  def getName: String = identifier

  /**
   * Infers return type based on operand types
   * @param opBinding Operand binding with argument types
   * @return Inferred return type for SQL query
   */
  def inferReturnType(opBinding: SqlOperatorBinding): RelDataType

  /**
   * Validates function call with given operands
   * @param callBinding Call binding with argument information
   * @return True if call is valid, false otherwise
   */
  def checkOperandTypes(callBinding: SqlCallBinding): Boolean
}
```
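
Operand checking and return type inference both come down to finding an `eval` overload that accepts the operand types. The following is a minimal sketch of that idea using only JDK reflection. It works on Java classes instead of SQL types, ignores primitive boxing, and is not the planner's actual resolution logic; `EvalResolver`, `findEval`, and `Upper` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Optional;

final class EvalResolver {

    /** Finds a public eval method whose parameters accept the given argument classes. */
    static Optional<Method> findEval(Class<?> functionClass, Class<?>... argTypes) {
        for (Method method : functionClass.getMethods()) {
            if (!method.getName().equals("eval")) {
                continue;
            }
            Class<?>[] params = method.getParameterTypes();
            if (params.length != argTypes.length) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < params.length; i++) {
                if (!params[i].isAssignableFrom(argTypes[i])) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                return Optional.of(method);
            }
        }
        return Optional.empty();
    }

    /** Hypothetical function class with two eval overloads. */
    public static final class Upper {
        public String eval(String s) {
            return s == null ? null : s.toUpperCase();
        }

        public Integer eval(Integer a, Integer b) {
            return a + b;
        }
    }

    public static void main(String[] args) {
        // A matching overload yields its return type; no match yields an empty Optional.
        System.out.println(findEval(Upper.class, String.class).map(Method::getReturnType));
        System.out.println(findEval(Upper.class, Double.class).isPresent());
    }
}
```

In this sketch an empty result corresponds to a failed operand check, and the return type of the matched method corresponds to the inferred return type.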

### Aggregate Function SQL Integration

SQL integration for aggregate functions, handling accumulator management and result computation.

```scala { .api }
/**
 * SQL integration for aggregate functions
 */
class AggSqlFunction(
  identifier: String,
  displayName: String,
  aggregateFunction: AggregateFunction[_, _],
  resultType: RelDataType,
  accType: RelDataType,
  typeFactory: FlinkTypeFactory
) extends SqlAggFunction {

  /**
   * Returns function kind for SQL integration
   * @return SqlKind.OTHER_FUNCTION for aggregate functions
   */
  def getKind: SqlKind = SqlKind.OTHER_FUNCTION

  /**
   * Returns aggregate function name
   * @return Function identifier used in SQL
   */
  def getName: String = identifier

  /**
   * Infers return type for aggregate result
   * @param opBinding Operand binding information
   * @return Return type of aggregation result
   */
  def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = resultType

  /**
   * Returns accumulator type information
   * @param typeFactory Type factory for type creation
   * @return Accumulator type information
   */
  def getAccumulatorType(typeFactory: RelDataTypeFactory): RelDataType = accType

  /**
   * Validates aggregate function call
   * @param callBinding Call binding with operand information
   * @return True if call is valid
   */
  def checkOperandTypes(callBinding: SqlCallBinding): Boolean
}
```
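
The separate result and accumulator types reflect the lifecycle of an aggregate function: create an accumulator, fold each input row into it, then derive the result. The sketch below models that lifecycle in plain Java for an average. The method names follow the `createAccumulator` / `accumulate` / `getValue` convention of Flink's `AggregateFunction`, but the class does not extend it; `AverageSketch` and `AvgAccumulator` are hypothetical names.

```java
final class AverageSketch {

    /** Mutable accumulator holding the running state of the aggregation. */
    static final class AvgAccumulator {
        long sum = 0;
        long count = 0;
    }

    /** Creates an empty accumulator for a new group. */
    static AvgAccumulator createAccumulator() {
        return new AvgAccumulator();
    }

    /** Folds one input value into the accumulator; null inputs are skipped. */
    static void accumulate(AvgAccumulator acc, Long value) {
        if (value != null) {
            acc.sum += value;
            acc.count++;
        }
    }

    /** Produces the final result, or null when no non-null rows were seen. */
    static Double getValue(AvgAccumulator acc) {
        if (acc.count == 0) {
            return null;
        }
        return (double) acc.sum / acc.count;
    }

    public static void main(String[] args) {
        AvgAccumulator acc = createAccumulator();
        accumulate(acc, 1L);
        accumulate(acc, 2L);
        accumulate(acc, 3L);
        System.out.println(getValue(acc));
    }
}
```

Here the accumulator type is `AvgAccumulator` and the result type is `Double`, which is the distinction that `accType` and `resultType` capture on the SQL side.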

### Table Function SQL Integration

SQL integration for table functions (UDTFs), enabling table-valued function calls in SQL queries.

```scala { .api }
/**
 * SQL integration for table functions (UDTFs)
 */
class TableSqlFunction(
  identifier: String,
  displayName: String,
  tableFunction: TableFunction[_],
  resultType: RelDataType,
  typeFactory: FlinkTypeFactory
) extends SqlFunction {

  /**
   * Returns function kind for table functions
   * @return SqlKind.OTHER_FUNCTION for table functions
   */
  def getKind: SqlKind = SqlKind.OTHER_FUNCTION

  /**
   * Returns table function name
   * @return Function identifier used in SQL
   */
  def getName: String = identifier

  /**
   * Infers return type for table function
   * @param opBinding Operand binding information
   * @return Row type returned by table function
   */
  def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = resultType

  /**
   * Validates table function call
   * @param callBinding Call binding with argument types
   * @return True if call is valid
   */
  def checkOperandTypes(callBinding: SqlCallBinding): Boolean

  /**
   * Returns table function definition for query planning
   * @return TableFunction instance
   */
  def getTableFunction: TableFunction[_] = tableFunction
}
```
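
Unlike a scalar function, a table function may emit zero, one, or many rows per input row. Flink's `TableFunction` does this by calling `collect(...)` from inside `eval`. The sketch below imitates that contract in plain Java, with a `Consumer` playing the role of the collector; `SplitSketch` and `apply` are hypothetical names and nothing here depends on Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class SplitSketch {

    /** Emits one row per whitespace-separated token; null or blank input emits nothing. */
    static void eval(String line, Consumer<String> collector) {
        if (line == null || line.trim().isEmpty()) {
            return;
        }
        for (String token : line.trim().split("\\s+")) {
            collector.accept(token);
        }
    }

    /** Runs the function for every input row and gathers all emitted rows in order. */
    static List<String> apply(List<String> inputRows) {
        List<String> out = new ArrayList<>();
        for (String row : inputRows) {
            eval(row, out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(apply(Arrays.asList("a b", "", "c")));
    }
}
```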

## Function Call Code Generation

Integration with the code generation system for optimized function execution:

```scala { .api }
/**
 * Code generation for function calls
 */
object FunctionCallCodeGenerator {

  /**
   * Generates code for scalar function calls
   * @param scalarFunction Scalar function to generate code for
   * @param operands Operand expressions
   * @param resultType Expected result type
   * @param config Table configuration
   * @return Generated code expression
   */
  def generateScalarFunctionCall(
    scalarFunction: ScalarFunction,
    operands: Seq[GeneratedExpression],
    resultType: DataType,
    config: TableConfig
  ): GeneratedExpression

  /**
   * Generates code for aggregate function calls
   * @param aggregateFunction Aggregate function
   * @param operands Input operand expressions
   * @param accType Accumulator type
   * @param resultType Result type
   * @return Generated aggregate handler code
   */
  def generateAggregateFunctionCall(
    aggregateFunction: AggregateFunction[_, _],
    operands: Seq[GeneratedExpression],
    accType: DataType,
    resultType: DataType
  ): GeneratedAggregateHandler
}
```

## Built-in Function Integration

Support for built-in SQL functions and their integration with the planning system:

```scala { .api }
/**
 * Built-in function definitions and utilities
 */
object BuiltInFunctionDefinitions {

  // String functions
  val UPPER: FunctionDefinition
  val LOWER: FunctionDefinition
  val SUBSTRING: FunctionDefinition
  val TRIM: FunctionDefinition
  val CONCAT: FunctionDefinition

  // Mathematical functions
  val ABS: FunctionDefinition
  val CEIL: FunctionDefinition
  val FLOOR: FunctionDefinition
  val ROUND: FunctionDefinition
  val SIN: FunctionDefinition
  val COS: FunctionDefinition

  // Date/time functions
  val CURRENT_TIMESTAMP: FunctionDefinition
  val DATE_FORMAT: FunctionDefinition
  val EXTRACT: FunctionDefinition

  // Aggregate functions
  val COUNT: FunctionDefinition
  val SUM: FunctionDefinition
  val AVG: FunctionDefinition
  val MIN: FunctionDefinition
  val MAX: FunctionDefinition

  // Window functions
  val ROW_NUMBER: FunctionDefinition
  val RANK: FunctionDefinition
  val DENSE_RANK: FunctionDefinition
  val LAG: FunctionDefinition
  val LEAD: FunctionDefinition
}
```

## Function Catalog Integration

Integration with Flink's function catalog system for function discovery and resolution:

```java { .api }
/**
 * Function catalog integration (from flink-table-api)
 * Functions are registered and resolved through FunctionCatalog
 */
public interface FunctionCatalog {
    /**
     * Registers a temporary system function
     * @param name Function name
     * @param functionDefinition Function definition
     */
    void registerTemporarySystemFunction(String name, FunctionDefinition functionDefinition);

    /**
     * Registers a temporary catalog function
     * @param objectIdentifier Function identifier with catalog/database/name
     * @param functionDefinition Function definition
     * @param ignoreIfExists Whether to ignore if function already exists
     */
    void registerTemporaryCatalogFunction(
        ObjectIdentifier objectIdentifier,
        FunctionDefinition functionDefinition,
        boolean ignoreIfExists
    );

    /**
     * Looks up function by identifier
     * @param objectIdentifier Function identifier
     * @return Optional function lookup result
     */
    Optional<FunctionLookup.Result> lookupFunction(ObjectIdentifier objectIdentifier);
}
```
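
The register and lookup contract above, including the `ignoreIfExists` flag, can be modelled with a small in-memory map. This is a sketch under stated assumptions, not Flink's `FunctionCatalog`: `CatalogSketch` is a hypothetical class, identifiers are plain strings in place of `ObjectIdentifier`, definitions are plain `Object` values, and the case-insensitive name handling is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

final class CatalogSketch {

    /** Temporary catalog functions keyed by "catalog.database.name". */
    private final Map<String, Object> temporaryCatalogFunctions = new HashMap<>();

    /** Builds the lookup key; lower-casing the function name is an assumption of this sketch. */
    private static String key(String catalog, String database, String name) {
        return catalog + "." + database + "." + name.toLowerCase(Locale.ROOT);
    }

    /** Registers a function; an existing entry is kept if ignoreIfExists is true, else it is an error. */
    void registerTemporaryCatalogFunction(
            String catalog, String database, String name, Object definition, boolean ignoreIfExists) {
        String k = key(catalog, database, name);
        if (temporaryCatalogFunctions.containsKey(k)) {
            if (ignoreIfExists) {
                return;
            }
            throw new IllegalStateException("Function already exists: " + k);
        }
        temporaryCatalogFunctions.put(k, definition);
    }

    /** Looks up a function by its full identifier. */
    Optional<Object> lookupFunction(String catalog, String database, String name) {
        return Optional.ofNullable(temporaryCatalogFunctions.get(key(catalog, database, name)));
    }

    public static void main(String[] args) {
        CatalogSketch catalog = new CatalogSketch();
        catalog.registerTemporaryCatalogFunction("cat", "db", "MY_UPPER", "v1", false);
        catalog.registerTemporaryCatalogFunction("cat", "db", "MY_UPPER", "v2", true);
        System.out.println(catalog.lookupFunction("cat", "db", "my_upper"));
        System.out.println(catalog.lookupFunction("cat", "db", "missing").isPresent());
    }
}
```

Returning `Optional` from the lookup, as the interface above does, lets callers distinguish "not registered" from a failure without relying on exceptions.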

## Error Handling and Validation

Function validation ensures proper UDF implementation:

```scala
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils

// myFunction is the MyUpperFunction instance from the usage example
val functionClass = myFunction.getClass

// Common validation scenarios
try {
  UserDefinedFunctionUtils.validateScalarFunction(myFunction)
} catch {
  case e: ValidationException =>
    // Handle validation errors:
    // - Missing eval() method
    // - Invalid method signatures
    // - Unsupported parameter types
    // - Missing result type information
}

// Function instantiation validation
try {
  UserDefinedFunctionUtils.checkForInstantiation(functionClass)
} catch {
  case e: ValidationException =>
    // Handle instantiation errors:
    // - No default constructor
    // - Constructor throws exceptions
    // - Class is abstract or interface
}
```
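
Most of the instantiation errors listed in the comments above can be detected with JDK reflection before any instance is created. The sketch below shows one way to do that. It is an illustration, not the implementation behind `checkForInstantiation`: `InstantiationCheck`, `findProblem`, and the demo classes are hypothetical, and the sketch does not call the constructor, so it cannot detect constructors that throw.

```java
import java.lang.reflect.Modifier;
import java.util.Optional;

final class InstantiationCheck {

    /** Returns a description of the first problem found, or empty if the class looks instantiable. */
    static Optional<String> findProblem(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (clazz.isInterface()) {
            return Optional.of("class is an interface");
        }
        if (Modifier.isAbstract(mods)) {
            return Optional.of("class is abstract");
        }
        if (!Modifier.isPublic(mods)) {
            return Optional.of("class is not public");
        }
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) {
            return Optional.of("class is a non-static inner class");
        }
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return Optional.of("class has no public default constructor");
        }
        return Optional.empty();
    }

    // Hypothetical demo classes, one per outcome.
    public static class Good {
        public Good() {
        }
    }

    public abstract static class AbstractFn {
    }

    public interface Iface {
    }

    public static class NoDefaultCtor {
        public NoDefaultCtor(int x) {
        }
    }

    public static void main(String[] args) {
        System.out.println(findProblem(Good.class));
        System.out.println(findProblem(AbstractFn.class));
        System.out.println(findProblem(NoDefaultCtor.class));
    }
}
```

The interface check comes before the abstract check because interfaces also carry the abstract modifier, and the more specific message is the more useful one.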

## Advanced Function Features

### Generic Function Support

Support for generic functions with type parameter resolution:

```scala
// Generic function example (handled automatically by utilities)
class GenericFunction[T] extends ScalarFunction {
  def eval(input: T): T = input
}

// Type resolution is handled during function registration
val genericFunc = new GenericFunction[String]()
UserDefinedFunctionUtils.validateScalarFunction(genericFunc)
```
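
One caveat when relying on reflection for generic functions: the JVM erases type parameters, so an instance created with `String` as its type argument carries no runtime record of that choice, and its `eval` method appears to take and return `Object`. A concrete subclass that binds the type parameter does expose a `String`-typed `eval`. The sketch below demonstrates this with plain JDK reflection; `ErasureSketch`, `StringFunction`, and `evalParameterType` are hypothetical names, and how the planner obtains the missing type information is outside the scope of this example.

```java
import java.lang.reflect.Method;

final class ErasureSketch {

    /** Generic function: T is erased at runtime, so reflection only sees Object. */
    public static class GenericFunction<T> {
        public T eval(T input) {
            return input;
        }
    }

    /** Concrete subclass: binding T to String yields a String-typed eval method. */
    public static class StringFunction extends GenericFunction<String> {
        @Override
        public String eval(String input) {
            return input;
        }
    }

    /** Returns the parameter type of the non-bridge eval method declared by the class. */
    static Class<?> evalParameterType(Class<?> clazz) {
        for (Method m : clazz.getDeclaredMethods()) {
            if (m.getName().equals("eval") && !m.isBridge()) {
                return m.getParameterTypes()[0];
            }
        }
        throw new IllegalArgumentException("no eval method declared in " + clazz.getName());
    }

    public static void main(String[] args) {
        System.out.println(evalParameterType(GenericFunction.class));
        System.out.println(evalParameterType(StringFunction.class));
    }
}
```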

### Deterministic Function Optimization

Functions marked as deterministic can be optimized during planning:

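```java
import org.apache.flink.table.functions.ScalarFunction;

public class MyDeterministicFunction extends ScalarFunction {
    @Override
    public boolean isDeterministic() {
        // true is already the default; it enables constant folding and other optimizations.
        // Return false for functions whose result can change between calls.
        return true;
    }

    public String eval(String input) {
        // SQL NULL arrives as null, so guard against it
        return input == null ? null : input.toUpperCase();
    }
}
```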
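
Constant folding is the easiest of these optimizations to picture: when a deterministic function is called with literal arguments, its result can be computed once and reused for every row. The sketch below imitates that effect in plain Java with a call counter. It is only an illustration of the idea, not how the planner rewrites expressions; `FoldingSketch`, `project`, and `evalCalls` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class FoldingSketch {

    /** Counts how often the function body actually runs. */
    static int evalCalls = 0;

    /** Stand-in for a UDF eval method that records each invocation. */
    static String upper(String s) {
        evalCalls++;
        return s.toUpperCase();
    }

    /**
     * Produces one output value per row for a call with a literal argument.
     * A deterministic function is evaluated once up front and the result is reused;
     * a non-deterministic one is evaluated again for every row.
     */
    static List<String> project(
            int rowCount, String literal, boolean deterministic, Function<String, String> fn) {
        List<String> out = new ArrayList<>();
        String folded = deterministic ? fn.apply(literal) : null;
        for (int i = 0; i < rowCount; i++) {
            out.add(deterministic ? folded : fn.apply(literal));
        }
        return out;
    }

    public static void main(String[] args) {
        project(3, "a", true, FoldingSketch::upper);
        System.out.println("deterministic calls: " + evalCalls);
        evalCalls = 0;
        project(3, "a", false, FoldingSketch::upper);
        System.out.println("non-deterministic calls: " + evalCalls);
    }
}
```

This is also why a function that reads the clock or a random source should report itself as non-deterministic: folding it would freeze a single value into every row.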