tessl install tessl/maven-org-apache-flink--flink-table-api-scala_2-12@1.20.0

Scala API for Apache Flink's Table/SQL ecosystem, providing idiomatic Scala interfaces for table operations.
⚠️ DEPRECATED: This Scala API is deprecated since Flink 1.18.0 and will be removed in a future Flink major version. Users should migrate to the Java Table API. See FLIP-265 for details.
Apache Flink Table API Scala provides idiomatic Scala interfaces for Flink's Table & SQL ecosystem. This library enables developers to write table programs using natural Scala syntax with implicit conversions, operator overloading, and type-safe expressions for both bounded and unbounded data streams.
pom.xml:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala_2.12</artifactId>
  <version>1.20.2</version>
</dependency>

For sbt:
libraryDependencies += "org.apache.flink" %% "flink-table-api-scala" % "1.20.2"

import org.apache.flink.table.api._

This single import provides access to all Scala Table API functionality.

import org.apache.flink.table.api._
// StreamTableEnvironment for Scala lives in the flink-table-api-scala-bridge module
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

// Create table environment (requires Flink streaming environment)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Build rows with row(...); plain Scala values become literals through implicit conversions
val table = tableEnv.fromValues(
  row("Alice", 25, true),
  row("Bob", 30, false),
  row("Charlie", 35, true)
).as("name", "age", "active")

// Filter and transform using Scala operators
val result = table
  .filter($"age" > 25 && $"active")
  .select($"name", ($"age" * 12).as("ageInMonths"))

// String interpolation for field references
val filtered = table.select($"name", $"age").where($"age" >= 30)

The Scala Table API is built around three core components: the ImplicitExpressionConversions trait, the ImplicitExpressionOperations trait, and the package object (org.apache.flink.table.api) that extends ImplicitExpressionConversions.

Core trait providing implicit conversions from Scala values to Flink expressions, enabling natural Scala syntax in table operations.
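
A minimal sketch of what these conversions allow, assuming the `flink-table-api-scala` dependency is on the classpath. The object name `ConversionSketch` is hypothetical and only there to make the snippet self-contained; the expressions are built and printed, not executed against a table.

```scala
import org.apache.flink.table.api._

// Hypothetical wrapper object, used only for illustration
object ConversionSketch {
  // Three ways to reference a column named "age"
  val byInterpolator: Expression = $"age"
  val byDollar: Expression = $("age")
  val byCol: Expression = col("age")

  // Explicit literals, with and without a data type
  val plainLiteral: Expression = lit(42)
  val typedLiteral: Expression = lit(42, DataTypes.INT().notNull())

  // Implicit conversion: a Scala Int used where an Expression is expected
  val implicitLiteral: Expression = 42

  val all: Seq[Expression] =
    Seq(byInterpolator, byDollar, byCol, plainLiteral, typedLiteral, implicitLiteral)

  def main(args: Array[String]): Unit =
    all.foreach(e => println(e.asSummaryString()))
}
```

The implicit form is what lets a comparison such as `$"age" > 25` compile: the `25` is turned into a literal expression before the operator is applied.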
trait ImplicitExpressionConversions {
  // Implicit constants for window operations
  implicit val UNBOUNDED_ROW: Expression
  implicit val UNBOUNDED_RANGE: Expression
  implicit val CURRENT_ROW: Expression
  implicit val CURRENT_RANGE: Expression

  // Field reference creation
  def $(name: String): Expression
  def col(name: String): Expression

  // Literal creation
  def lit(v: Any): Expression
  def lit(v: Any, dataType: DataType): Expression
}

Trait providing operator overloading and method extensions for expressions, enabling fluent Scala-style operations on table expressions.
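
As a hedged illustration of these operators in use, the sketch below builds a few expressions without running them. `OperatorSketch` and the column names are hypothetical. Parentheses are used throughout because `!==` ends in `=` and Scala therefore parses it with the lowest (assignment-level) precedence, which can surprise when it is combined with `&&` or `||`.

```scala
import org.apache.flink.table.api._

// Hypothetical wrapper object, used only for illustration
object OperatorSketch {
  // Comparison and boolean operators; equality is === and !== because
  // == and != are already defined on every Scala object
  val adultAndActive: Expression = ($"age" >= 18) && ($"active" === true)

  // Parenthesize !== when combining it with other operators
  val notBobAndActive: Expression = ($"name" !== "Bob") && $"active"

  // Arithmetic followed by an alias
  val ageInMonths: Expression = ($"age" * 12).as("ageInMonths")

  // Ternary: condition ? (ifTrue, ifFalse)
  val band: Expression = (($"age" > 30) ? ("senior", "junior")).as("band")

  val all: Seq[Expression] = Seq(adultAndActive, notBobAndActive, ageInMonths, band)

  def main(args: Array[String]): Unit =
    all.foreach(e => println(e.asSummaryString()))
}
```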
trait ImplicitExpressionOperations {
  // Comparison operators
  def >(other: Expression): Expression
  def >=(other: Expression): Expression
  def <(other: Expression): Expression
  def <=(other: Expression): Expression
  def ===(other: Expression): Expression
  def !==(other: Expression): Expression

  // Arithmetic operators
  def +(other: Expression): Expression
  def -(other: Expression): Expression
  def *(other: Expression): Expression
  def /(other: Expression): Expression
  def %(other: Expression): Expression

  // Boolean operators
  def &&(other: Expression): Expression
  def ||(other: Expression): Expression
  def unary_! : Expression

  // Special operations
  def as(name: Symbol, extraNames: Symbol*): Expression
  def ?(ifTrue: Expression, ifFalse: Expression): Expression
  def to(other: Expression): Expression
  def trim(removeLeading: Boolean = true, removeTrailing: Boolean = true, character: Expression = " "): Expression
}

Comprehensive collection of SQL functions accessible through the Scala API for date/time operations, string manipulation, mathematical calculations, and data transformations.
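
A short sketch of a few of these functions applied to a small in-memory table. It assumes a Flink planner and runtime are on the classpath in addition to this library (needed for `execute()`); the object name, column names, and sample rows are made up for illustration.

```scala
import org.apache.flink.table.api._

// Hypothetical wrapper object, used only for illustration
object BuiltInFunctionSketch {
  def main(args: Array[String]): Unit = {
    // A pure Table API environment in batch mode (no DataStream bridge needed)
    val tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())

    val people = tEnv
      .fromValues(row("Ada", "Lovelace"), row("Alan", "Turing"))
      .as("first", "last")

    people
      .select(
        concat($"first", " ", $"last").as("fullName"),      // string concatenation
        concatWs("-", $"first", $"last").as("joined"),      // concatenation with a separator
        array($"first", $"last").as("parts"),               // collection constructor
        pi().as("pi")                                       // math constant
      )
      .execute()
      .print()
  }
}
```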
// Date/time functions
def currentDate(): Expression
def currentTime(): Expression
def currentTimestamp(): Expression
def dateFormat(timestamp: Expression, format: Expression): Expression
def timestampDiff(timePointUnit: TimePointUnit, timePoint1: Expression, timePoint2: Expression): Expression
// String functions
def concat(string: Expression, strings: Expression*): Expression
def concatWs(separator: Expression, string: Expression, strings: Expression*): Expression
// Math functions
def pi(): Expression
def e(): Expression
def rand(): Expression
def atan2(y: Expression, x: Expression): Expression
// Collection functions
def array(head: Expression, tail: Expression*): Expression
def row(head: Expression, tail: Expression*): Expression
def map(key: Expression, value: Expression, tail: Expression*): Expression

Support for calling user-defined functions, system functions, and SQL expressions within Scala table programs.
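
The three `call` styles can be sketched as follows. `Shout` is a hypothetical scalar function written for this example, and the registration name `"shout"` and column name `word` are likewise assumptions; a planner and runtime must be on the classpath for `execute()` to work.

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical user-defined function, used only for illustration
class Shout extends ScalarFunction {
  def eval(s: String): String = s.toUpperCase + "!"
}

object CallSketch {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())
    val words = tEnv.fromValues(row("alice"), row("bob")).as("word")

    // Register the function so it can be called by name
    tEnv.createTemporarySystemFunction("shout", classOf[Shout])

    words
      .select(
        call("shout", $"word").as("byPath"),          // by registered name
        call(classOf[Shout], $"word").as("byClass"),  // by class, no registration needed
        callSql("UPPER(word)").as("bySql")            // embedded SQL expression
      )
      .execute()
      .print()
  }
}
```

Calling by class keeps the function out of the catalog, which is convenient for one-off use; calling by name suits functions shared between Table API and SQL queries.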
// Function calls
def call(path: String, params: Expression*): Expression
def call(function: UserDefinedFunction, params: Expression*): Expression
def call(function: Class[_ <: UserDefinedFunction], params: Expression*): Expression
def callSql(sqlExpression: String): Expression
// Function wrapper classes
implicit class ScalarFunctionCall(s: ScalarFunction) {
  def apply(params: Expression*): Expression
}
implicit class TableFunctionCall(t: TableFunction[_]) {
  def apply(params: Expression*): Expression
}
implicit class ImperativeAggregateFunctionCall[T, ACC](a: ImperativeAggregateFunction[T, ACC]) {
  def apply(params: Expression*): Expression
  def distinct(params: Expression*): Expression
}

Built-in support for JSON construction and manipulation operations with configurable null handling behavior.
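
A minimal sketch of the JSON constructors, under the same assumptions as any executed Table API program (planner and runtime available). The object name and sample data are hypothetical. The sample rows contain no nulls, so the `JsonOnNull` setting is shown but not exercised: `NULL` would emit a JSON null for a null value, while `ABSENT` would omit it.

```scala
import org.apache.flink.table.api._

// Hypothetical wrapper object, used only for illustration
object JsonSketch {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())
    val people = tEnv.fromValues(row("Alice", 25), row("Bob", 30)).as("name", "age")

    people
      .select(
        // Keys and values alternate; keys are string literals
        jsonObject(JsonOnNull.NULL, "name", $"name", "age", $"age").as("asObject"),
        jsonArray(JsonOnNull.ABSENT, $"name", $"age").as("asArray"),
        jsonString($"name").as("asString")
      )
      .execute()
      .print()
  }
}
```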
def jsonString(value: Expression): Expression
def jsonObject(onNull: JsonOnNull, keyValues: Expression*): Expression
def jsonObjectAgg(onNull: JsonOnNull, keyExpr: Expression, valueExpr: Expression): Expression
def jsonArray(onNull: JsonOnNull, values: Expression*): Expression
def jsonArrayAgg(onNull: JsonOnNull, itemExpr: Expression): Expression

Deprecation Status: All Scala APIs are deprecated as of Flink 1.18.0
Migration Path:
Move from org.apache.flink.table.api._ to Java equivalents
Timeline: Scala APIs will be removed in a future Flink major version