tessl install tessl/maven-org-apache-flink--flink-table-api-scala_2-12@1.20.0Scala API for Apache Flink's Table/SQL ecosystem providing idiomatic Scala interfaces for table operations
Support for calling user-defined functions, system functions, and SQL expressions within Scala table programs. This enables seamless integration of custom logic and external functions into table operations.
Core methods for calling different types of functions.
/**
* Calls a function by catalog path with lookup resolution order:
* 1. Temporary system function
* 2. System function
* 3. Temporary catalog function
* 4. Catalog function
* @param path Function name or catalog.database.function path
* @param params Function parameters
* @return Function call expression
*/
def call(path: String, params: Expression*): Expression
/**
* Calls an unregistered inline user-defined function
* @param function UserDefinedFunction instance
* @param params Function parameters
* @return Function call expression
*/
def call(function: UserDefinedFunction, params: Expression*): Expression
/**
* Calls an unregistered function by class
* @param function UserDefinedFunction class
* @param params Function parameters
* @return Function call expression
*/
def call(function: Class[_ <: UserDefinedFunction], params: Expression*): Expression
/**
* Calls a SQL expression (limited to scalar expressions)
* @param sqlExpression SQL expression string
* @return Parsed and translated expression
*/
def callSql(sqlExpression: String): ExpressionUsage:
// System function calls
table.select(
call("UPPER", $"name") as "upperName",
call("SUBSTRING", $"text", 1, 10) as "prefix"
)
// Custom function calls
val myFunction = new MyScalarFunction()
table.select(call(myFunction, $"input") as "result")
// SQL expression calls
table.select(
callSql("CASE WHEN age >= 18 THEN 'adult' ELSE 'minor' END") as "ageGroup"
)Wrapper class for scalar functions providing apply method syntax.
implicit class ScalarFunctionCall(val s: ScalarFunction) {
/**
* Calls scalar function with parameters using apply syntax
* @param params Function parameters
* @return Function call expression
*/
def apply(params: Expression*): Expression
}Usage:
// Define and use scalar function
class UpperCaseFunction extends ScalarFunction {
def eval(str: String): String = str.toUpperCase
}
val upperCase = new UpperCaseFunction()
// Apply syntax usage
table.select(
$"name",
upperCase($"name") as "upperName"
)
// Equivalent to call(upperCase, $"name")Wrapper class for table functions enabling apply method syntax for UDTF calls.
implicit class TableFunctionCall(val t: TableFunction[_]) {
/**
* Calls table function with parameters using apply syntax
* @param params Function parameters
* @return Table function call expression
*/
def apply(params: Expression*): Expression
}Usage:
// Define table function
class SplitFunction extends TableFunction[Row] {
def eval(str: String): Unit = {
str.split(",").foreach(s => collect(Row.of(s.trim())))
}
}
val splitFunc = new SplitFunction()
// Use in lateral join
table
.joinLateral(splitFunc($"csvData") as ("item"))
.select($"id", $"item")Wrapper class for aggregate functions with support for both regular and distinct aggregations.
implicit class ImperativeAggregateFunctionCall[T: TypeInformation, ACC: TypeInformation](
val a: ImperativeAggregateFunction[T, ACC]
) {
/**
* Calls aggregate function with parameters
* @param params Function parameters
* @return Aggregate function call expression
*/
def apply(params: Expression*): Expression
/**
* Calls aggregate function with DISTINCT modifier
* @param params Function parameters
* @return DISTINCT aggregate function call expression
*/
def distinct(params: Expression*): Expression
}Usage:
// Define aggregate function
class AvgFunction extends AggregateFunction[Double, AvgAccumulator] {
override def createAccumulator(): AvgAccumulator = new AvgAccumulator()
override def getValue(accumulator: AvgAccumulator): Double =
accumulator.sum / accumulator.count
def accumulate(acc: AvgAccumulator, value: Double): Unit = {
acc.sum += value
acc.count += 1
}
}
val avgFunc = new AvgFunction()
// Regular aggregation
table
.groupBy($"category")
.select($"category", avgFunc($"value") as "avgValue")
// Distinct aggregation
table
.groupBy($"category")
.select($"category", avgFunc.distinct($"value") as "distinctAvg")Methods for registering functions in the table environment for reuse.
Usage with TableEnvironment:
// Register functions for reuse
tableEnv.createTemporarySystemFunction("myUpper", classOf[UpperCaseFunction])
tableEnv.createTemporaryFunction("mySplit", classOf[SplitFunction])
// Use registered functions
table.select(call("myUpper", $"name") as "upperName")
table.joinLateral(call("mySplit", $"csvData") as ("item"))Working with function contexts and complex parameter types.
Usage:
// Functions with multiple parameter types
class ComplexFunction extends ScalarFunction {
def eval(str: String, num: Integer, flag: Boolean): String = {
if (flag) s"${str}_${num}" else str
}
}
val complexFunc = new ComplexFunction()
table.select(
complexFunc($"name", $"id", $"active") as "result"
)
// Functions with optional parameters
class OptionalParamFunction extends ScalarFunction {
def eval(required: String): String = eval(required, "default")
def eval(required: String, optional: String): String = s"${required}_${optional}"
}
val optFunc = new OptionalParamFunction()
table.select(
optFunc($"name") as "result1",
optFunc($"name", $"suffix") as "result2"
)Support for generic function definitions with type preservation.
Usage:
// Generic function with type information
class GenericMapFunction[T: TypeInformation] extends ScalarFunction {
def eval(input: T, mapper: Function[T, String]): String = mapper(input)
}
// Type-safe usage
implicit val stringTypeInfo = createTypeInformation[String]
val mapFunc = new GenericMapFunction[String]()
table.select(mapFunc($"stringField", lit("prefix_")) as "mapped")Best practices for error handling in user-defined functions.
Usage:
class SafeDivideFunction extends ScalarFunction {
def eval(numerator: Double, denominator: Double): Double = {
if (denominator == 0.0) Double.NaN
else numerator / denominator
}
}
val safeDivide = new SafeDivideFunction()
table.select(
safeDivide($"revenue", $"units") as "pricePerUnit"
)
// Handle NaN results
table
.select(safeDivide($"revenue", $"units") as "price")
.filter(!$"price".isNaN)Combining multiple function calls for complex operations.
Usage:
// Chaining function calls
val processText = new TextProcessor()
val extractKeywords = new KeywordExtractor()
table.select(
extractKeywords(processText($"rawText")) as "keywords"
)
// Complex expressions with functions
table.select(
$"name",
call("UPPER", call("TRIM", $"name")) as "cleanUpperName",
ifThenElse(
call("LENGTH", $"description") > 100,
call("SUBSTRING", $"description", 1, 100) + "...",
$"description"
) as "shortDescription"
)Optimizing function calls for better performance.
Usage:
// Deterministic functions can be optimized
class DeterministicFunction extends ScalarFunction {
override def isDeterministic: Boolean = true
def eval(input: String): String = input.toUpperCase
}
// Non-deterministic functions
class RandomFunction extends ScalarFunction {
override def isDeterministic: Boolean = false
def eval(): Double = scala.util.Random.nextDouble()
}
// Prefer built-in functions when available
table.select(
// Instead of custom function:
// call("myUpper", $"name")
// Use built-in:
call("UPPER", $"name") as "upperName"
)