Scala API for Apache Flink's Table & SQL ecosystem with type-safe bindings and comprehensive support for Scala-specific types
npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-api-scala_2-12@2.1.0The Apache Flink Scala Table API provides Scala-specific bindings for Flink's Table & SQL ecosystem. It enables type-safe Scala programming with implicit conversions, operator overloading, and comprehensive support for Scala-specific types like case classes, Option, Either, and collections.
⚠️ Deprecation Notice: All Flink Scala APIs are deprecated as of Flink 1.18.0 (FLIP-265) and will be removed in a future major version. Users should migrate to the Java Table API while continuing to use Scala as their application language.
pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_2.12</artifactId>
<version>2.1.0</version>
</dependency>import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._The first import provides access to:
The bridge import adds:
import org.apache.flink.table.api._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
// Create execution environment and table environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
// Create table from data with case class
case class Order(id: Int, product: String, amount: Double)
val orders = env.fromElements(
Order(1, "laptop", 999.99),
Order(2, "mouse", 29.99)
)
val ordersTable = tEnv.fromDataStream(orders)
// Use Scala-specific syntax with implicit conversions
val result = ordersTable
.select($"id", $"product", $"amount" * 1.1 as "amountWithTax")
.where($"amount" > 50.0)
// Convert back to DataStream
val resultStream = tEnv.toDataStream(result)The Flink Scala Table API is built around several key components:
Core expression creation and implicit conversions that enable natural Scala syntax for table operations. Includes literal conversions, field references, and operator overloading.
trait ImplicitExpressionConversions {
// Constants for window operations
implicit val UNBOUNDED_ROW: Expression
implicit val UNBOUNDED_RANGE: Expression
implicit val CURRENT_ROW: Expression
implicit val CURRENT_RANGE: Expression
// Field reference creation
def $(name: String): Expression
def col(name: String): Expression
// Literal creation
def lit(v: Any): Expression
def lit(v: Any, dataType: DataType): Expression
// Function calls
def call(path: String, params: Expression*): Expression
def call(function: UserDefinedFunction, params: Expression*): Expression
def call(function: Class[_ <: UserDefinedFunction], params: Expression*): Expression
def callSql(sqlExpression: String): Expression
}
// Implicit classes for expression operations
implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations
implicit class UnresolvedFieldExpression(s: Symbol) extends ImplicitExpressionOperationsComprehensive type system providing TypeInformation for all Scala types including case classes, collections, Option, Either, and Try. Uses macros for automatic type inference.
object Types {
// Generic type creation
def of[T: TypeInformation]: TypeInformation[T]
// Scala-specific types
val UNIT: TypeInformation[Unit]
val NOTHING: TypeInformation[Nothing]
// Factory methods for complex types
def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
def TUPLE[T: TypeInformation]: TypeInformation[T]
def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]
def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
def TRY[A, T <: Try[A]](valueType: TypeInformation[A]): TypeInformation[T]
// Collection types
def TRAVERSABLE[T: TypeInformation]: TypeInformation[T]
def OBJECT_ARRAY[E <: AnyRef](elementType: TypeInformation[E]): TypeInformation[Array[E]]
}
trait ImplicitTypeConversions {
// Macro-based type inference
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
implicit val scalaNothingTypeInfo: TypeInformation[Nothing]
}Rich set of expression operations including arithmetic, comparison, logical operators, and specialized operations for table transformations. Provides natural Scala operator syntax.
trait ImplicitExpressionOperations {
// Field aliasing
def as(name: Symbol, extraNames: Symbol*): Expression
// Comparison operators
def >(other: Expression): Expression
def >=(other: Expression): Expression
def <(other: Expression): Expression
def <=(other: Expression): Expression
def ===(other: Expression): Expression
def !==(other: Expression): Expression
// Logical operators
def &&(other: Expression): Expression
def ||(other: Expression): Expression
def unary_!: Expression
// Arithmetic operators
def +(other: Expression): Expression
def -(other: Expression): Expression
def *(other: Expression): Expression
def /(other: Expression): Expression
def %(other: Expression): Expression
def unary_-: Expression
def unary_+: Expression
// Specialized operations
def to(other: Expression): Expression // Range for column selection
def ?(ifTrue: Expression, ifFalse: Expression): Expression // Ternary conditional
def rows: Expression // Row interval for windowing
}Extensive collection of built-in functions for date/time operations, mathematical calculations, string manipulation, JSON processing, and utility operations.
// Date and time functions
def currentDate(): Expression
def currentTime(): Expression
def currentTimestamp(): Expression
def localTime(): Expression
def localTimestamp(): Expression
// Mathematical functions
def pi(): Expression
def e(): Expression
def rand(): Expression
def randInteger(bound: Expression): Expression
def atan2(y: Expression, x: Expression): Expression
def log(base: Expression, antilogarithm: Expression): Expression
def exp(base: Expression): Expression
def power(base: Expression, exponent: Expression): Expression
def mod(numeric1: Expression, numeric2: Expression): Expression
// String functions
def concat(string: Expression*): Expression
def concatWs(separator: Expression, string: Expression*): Expression
def uuid(): Expression
def upper(string: Expression): Expression
def lower(string: Expression): Expression
def length(string: Expression): Expression
def position(string: Expression, substring: Expression): Expression
// JSON functions
def jsonString(string: Expression): Expression
def jsonObject(keyValue: Expression*): Expression
def jsonArray(element: Expression*): Expression
def jsonValue(jsonString: Expression, path: Expression): Expression
def jsonQuery(jsonString: Expression, path: Expression): Expression
// Utility functions
def nullOf(dataType: DataType): Expression
def ifThenElse(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression
def coalesce(expr: Expression*): Expression
def isnull(expr: Expression): Expression
def isNotNull(expr: Expression): ExpressionSpecialized TypeInformation implementations for Scala-specific types providing efficient serialization and type handling for case classes, Option, Either, Try, and collections.
// Case class type information
abstract class CaseClassTypeInfo[T](
clazz: Class[T],
typeParamTypeInfos: Array[TypeInformation[_]],
fieldTypes: Seq[TypeInformation[_]],
fieldNames: Seq[String]
) extends TypeInformation[T] {
def getFieldNames: Array[String]
def getFieldIndex(fieldName: String): Int
def getFieldIndices(fields: Array[String]): Array[Int]
def getTypeAt[X](fieldExpression: String): TypeInformation[X]
def getFlatFields(): List[FlatFieldDescriptor]
}
// Option type information
class OptionTypeInfo[A, T <: Option[A]](elemTypeInfo: TypeInformation[A])
extends TypeInformation[T]
// Either type information
class EitherTypeInfo[A, B, T <: Either[A, B]](
clazz: Class[T],
leftTypeInfo: TypeInformation[A],
rightTypeInfo: TypeInformation[B]
) extends TypeInformation[T]
// Try type information
class TryTypeInfo[A, T <: Try[A]](valueType: TypeInformation[A])
extends TypeInformation[T]
// Collection type information
abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
clazz: Class[T],
elementTypeInfo: TypeInformation[E]
) extends TypeInformation[T]
// Enumeration type information
class EnumValueTypeInfo[E <: Enumeration](enum: E, clazz: Class[E#Value])
extends TypeInformation[E#Value] with AtomicType[E#Value]Integration layer between Table API and DataStream API enabling seamless conversion between Table and DataStream objects with streaming-specific operations.
trait StreamTableEnvironment extends TableEnvironment {
// DataStream to Table conversion
def fromDataStream[T](dataStream: DataStream[T]): Table
def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table
// Table to DataStream conversion
def toDataStream(table: Table): DataStream[Row]
def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]
def toChangelogStream(table: Table): DataStream[Row]
}
// Implicit conversion classes
class TableConversions(table: Table)
class DataStreamConversions[T](dataStream: DataStream[T])Kryo-based serialization configuration and specialized serializers for efficient handling of all Scala types including collections, tuples, and special types in distributed Flink execution.
// Kryo configuration for Scala types
class FlinkScalaKryoInstantiator extends KryoInstantiator {
def newKryo: Kryo // Pre-configured with Scala serializers
}
// Specialized serializers for runtime efficiency
class CaseClassSerializer[T <: Product](/* parameters */) extends TypeSerializer[T]
class OptionSerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Option[A]]
class EitherSerializer[A, B](/* parameters */) extends TypeSerializer[Either[A, B]]
class TrySerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Try[A]]
class TraversableSerializer[T <: TraversableOnce[E], E](/* parameters */) extends TypeSerializer[T]
class EnumValueSerializer[E <: Enumeration](/* parameters */) extends TypeSerializer[E#Value]Since all Scala APIs are deprecated, consider these migration approaches:
The Java Table API provides equivalent functionality without Scala-specific syntax conveniences.
case class User(id: Int, name: String, email: Option[String])
val userTable = tEnv.fromDataStream(users) // Automatic type inferenceval result = table.select($"email".isNotNull ? $"email" : lit("N/A") as "emailDisplay")val processed = table
.select($"amount" * 1.1 + $"tax" as "total")
.where($"total" > 100.0)
.groupBy($"category")
.select($"category", $"total".sum as "categoryTotal")