or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-table-api-scala_2.12@1.20.x

docs

builtin-functions.mdexpression-operations.mdfunction-integration.mdimplicit-conversions.mdindex.mdjson-operations.md
tile.json

tessl/maven-org-apache-flink--flink-table-api-scala_2-12

tessl install tessl/maven-org-apache-flink--flink-table-api-scala_2-12@1.20.0

Scala API for Apache Flink's Table/SQL ecosystem providing idiomatic Scala interfaces for table operations

function-integration.mddocs/

Function Integration

Support for calling user-defined functions, system functions, and SQL expressions within Scala table programs. This enables seamless integration of custom logic and external functions into table operations.

Capabilities

Function Call Methods

Core methods for calling different types of functions.

/**
 * Calls a function by catalog path with lookup resolution order:
 * 1. Temporary system function
 * 2. System function  
 * 3. Temporary catalog function
 * 4. Catalog function
 * @param path Function name or catalog.database.function path
 * @param params Function parameters
 * @return Function call expression
 */
def call(path: String, params: Expression*): Expression

/**
 * Calls an unregistered inline user-defined function
 * @param function UserDefinedFunction instance
 * @param params Function parameters
 * @return Function call expression
 */
def call(function: UserDefinedFunction, params: Expression*): Expression

/**
 * Calls an unregistered function by class
 * @param function UserDefinedFunction class
 * @param params Function parameters  
 * @return Function call expression
 */
def call(function: Class[_ <: UserDefinedFunction], params: Expression*): Expression

/**
 * Calls a SQL expression (limited to scalar expressions)
 * @param sqlExpression SQL expression string
 * @return Parsed and translated expression
 */
def callSql(sqlExpression: String): Expression

Usage:

// System function calls
table.select(
  call("UPPER", $"name") as "upperName",
  call("SUBSTRING", $"text", 1, 10) as "prefix"
)

// Custom function calls
val myFunction = new MyScalarFunction()
table.select(call(myFunction, $"input") as "result")

// SQL expression calls
table.select(
  callSql("CASE WHEN age >= 18 THEN 'adult' ELSE 'minor' END") as "ageGroup"
)

Scalar Function Integration

Wrapper class for scalar functions providing apply method syntax.

implicit class ScalarFunctionCall(val s: ScalarFunction) {
  /**
   * Calls scalar function with parameters using apply syntax
   * @param params Function parameters
   * @return Function call expression
   */
  def apply(params: Expression*): Expression
}

Usage:

// Define and use scalar function
class UpperCaseFunction extends ScalarFunction {
  def eval(str: String): String = str.toUpperCase
}

val upperCase = new UpperCaseFunction()

// Apply syntax usage
table.select(
  $"name",
  upperCase($"name") as "upperName"
)

// Equivalent to call(upperCase, $"name")

Table Function Integration

Wrapper class for table functions enabling apply method syntax for UDTF calls.

implicit class TableFunctionCall(val t: TableFunction[_]) {
  /**
   * Calls table function with parameters using apply syntax
   * @param params Function parameters
   * @return Table function call expression
   */
  def apply(params: Expression*): Expression
}

Usage:

// Define table function
class SplitFunction extends TableFunction[Row] {
  def eval(str: String): Unit = {
    str.split(",").foreach(s => collect(Row.of(s.trim())))
  }
}

val splitFunc = new SplitFunction()

// Use in lateral join
table
  .joinLateral(splitFunc($"csvData") as ("item"))
  .select($"id", $"item")

Aggregate Function Integration

Wrapper class for aggregate functions with support for both regular and distinct aggregations.

implicit class ImperativeAggregateFunctionCall[T: TypeInformation, ACC: TypeInformation](
  val a: ImperativeAggregateFunction[T, ACC]
) {
  /**
   * Calls aggregate function with parameters
   * @param params Function parameters
   * @return Aggregate function call expression
   */
  def apply(params: Expression*): Expression

  /**
   * Calls aggregate function with DISTINCT modifier
   * @param params Function parameters
   * @return DISTINCT aggregate function call expression  
   */
  def distinct(params: Expression*): Expression
}

Usage:

// Define aggregate function
class AvgFunction extends AggregateFunction[Double, AvgAccumulator] {
  override def createAccumulator(): AvgAccumulator = new AvgAccumulator()
  
  override def getValue(accumulator: AvgAccumulator): Double = 
    accumulator.sum / accumulator.count
    
  def accumulate(acc: AvgAccumulator, value: Double): Unit = {
    acc.sum += value
    acc.count += 1
  }
}

val avgFunc = new AvgFunction()

// Regular aggregation
table
  .groupBy($"category")
  .select($"category", avgFunc($"value") as "avgValue")

// Distinct aggregation  
table
  .groupBy($"category")
  .select($"category", avgFunc.distinct($"value") as "distinctAvg")

Function Registration

Methods for registering functions in the table environment for reuse.

Usage with TableEnvironment:

// Register functions for reuse
tableEnv.createTemporarySystemFunction("myUpper", classOf[UpperCaseFunction])
tableEnv.createTemporaryFunction("mySplit", classOf[SplitFunction])

// Use registered functions
table.select(call("myUpper", $"name") as "upperName")
table.joinLateral(call("mySplit", $"csvData") as ("item"))

Function Context and Parameters

Working with function contexts and complex parameter types.

Usage:

// Functions with multiple parameter types
class ComplexFunction extends ScalarFunction {
  def eval(str: String, num: Integer, flag: Boolean): String = {
    if (flag) s"${str}_${num}" else str
  }
}

val complexFunc = new ComplexFunction()

table.select(
  complexFunc($"name", $"id", $"active") as "result"
)

// Functions with optional parameters
class OptionalParamFunction extends ScalarFunction {
  def eval(required: String): String = eval(required, "default")
  def eval(required: String, optional: String): String = s"${required}_${optional}"
}

val optFunc = new OptionalParamFunction()

table.select(
  optFunc($"name") as "result1",
  optFunc($"name", $"suffix") as "result2"  
)

Generic Function Types

Support for generic function definitions with type preservation.

Usage:

// Generic function with type information
class GenericMapFunction[T: TypeInformation] extends ScalarFunction {
  def eval(input: T, mapper: Function[T, String]): String = mapper(input)
}

// Type-safe usage
implicit val stringTypeInfo = createTypeInformation[String]
val mapFunc = new GenericMapFunction[String]()

table.select(mapFunc($"stringField", lit("prefix_")) as "mapped")

Error Handling in Functions

Best practices for error handling in user-defined functions.

Usage:

class SafeDivideFunction extends ScalarFunction {
  def eval(numerator: Double, denominator: Double): Double = {
    if (denominator == 0.0) Double.NaN
    else numerator / denominator
  }
}

val safeDivide = new SafeDivideFunction()

table.select(
  safeDivide($"revenue", $"units") as "pricePerUnit"
)

// Handle NaN results
table
  .select(safeDivide($"revenue", $"units") as "price")
  .filter(!$"price".isNaN)

Function Composition

Combining multiple function calls for complex operations.

Usage:

// Chaining function calls
val processText = new TextProcessor()
val extractKeywords = new KeywordExtractor()

table.select(
  extractKeywords(processText($"rawText")) as "keywords"
)

// Complex expressions with functions
table.select(
  $"name",
  call("UPPER", call("TRIM", $"name")) as "cleanUpperName",
  ifThenElse(
    call("LENGTH", $"description") > 100,
    call("SUBSTRING", $"description", 1, 100) + "...",
    $"description"
  ) as "shortDescription"
)

Performance Considerations

Optimizing function calls for better performance.

Usage:

// Deterministic functions can be optimized
class DeterministicFunction extends ScalarFunction {
  override def isDeterministic: Boolean = true
  
  def eval(input: String): String = input.toUpperCase
}

// Non-deterministic functions
class RandomFunction extends ScalarFunction {
  override def isDeterministic: Boolean = false
  
  def eval(): Double = scala.util.Random.nextDouble()
}

// Prefer built-in functions when available
table.select(
  // Instead of custom function:
  // call("myUpper", $"name")
  // Use built-in:
  call("UPPER", $"name") as "upperName"
)