Apache Flink Table API Scala Bridge provides seamless integration between Flink's Table/SQL API and Scala-specific DataStream operations for stream processing applications.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-api-scala-bridge_2-12@1.20.2

The Flink Table API Scala Bridge provides seamless integration between Apache Flink's Table/SQL API and the Scala-specific DataStream API. This bridge library enables developers to convert DataStreams to Tables and vice versa while leveraging Scala's type system and functional programming paradigms.
Key Features:
⚠️ Deprecation Notice: All Flink Scala APIs are deprecated as of version 1.18.0 and will be removed in a future major version. Users should migrate to the Java APIs. See FLIP-265 for details.
Maven coordinates: org.apache.flink:flink-table-api-scala-bridge_2.12

Add to your build.sbt:

libraryDependencies += "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.20.2"

Or in Maven pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.20.2</version>
</dependency>

// Essential imports for Table API and DataStream integration
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
// Common type imports
import org.apache.flink.types.Row
import org.apache.flink.table.types.DataType
import org.apache.flink.table.connector.ChangelogMode

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
// Create execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

case class User(name: String, age: Int)
val users: DataStream[User] = env.fromCollection(Seq(
User("Alice", 25),
User("Bob", 30)
))
// Convert DataStream to Table (automatic schema derivation)
val userTable: Table = tableEnv.fromDataStream(users)
// Or use implicit conversion
import org.apache.flink.table.api.bridge.scala._
val userTable2: Table = users.toTable(tableEnv)

// Convert Table back to DataStream
val resultStream: DataStream[Row] = tableEnv.toDataStream(userTable)
// Or use implicit conversion
val resultStream2: DataStream[Row] = userTable.toDataStream

// Register table for SQL queries
tableEnv.createTemporaryView("users", userTable)
// Execute SQL query
val sqlResult: Table = tableEnv.sqlQuery(
"SELECT name, age FROM users WHERE age > 25"
)
val filteredStream: DataStream[Row] = tableEnv.toDataStream(sqlResult)

The Flink Table API Scala Bridge consists of several key components:
Configure and create StreamTableEnvironment instances with various settings and execution modes.
object StreamTableEnvironment {
def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment
}

Convert between DataStreams and Tables with automatic schema derivation and custom schema definitions.
trait StreamTableEnvironment {
def fromDataStream[T](dataStream: DataStream[T]): Table
def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table
def toDataStream(table: Table): DataStream[Row]
def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]
}

Handle updating tables and changelog streams for complex event processing scenarios.
trait StreamTableEnvironment {
def fromChangelogStream(dataStream: DataStream[Row]): Table
def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table
def toChangelogStream(table: Table): DataStream[Row]
def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row]
}

Create, register, and manage tables and views within the table environment.
trait StreamTableEnvironment {
def createTemporaryView[T](path: String, dataStream: DataStream[T]): Unit
def createTemporaryView[T](path: String, dataStream: DataStream[T], schema: Schema): Unit
}

Batch multiple table operations together for optimized execution planning.
trait StreamTableEnvironment {
def createStatementSet(): StreamStatementSet
}
trait StreamStatementSet {
def addInsert(targetPath: String, table: Table): StreamStatementSet
def addInsertSql(statement: String): StreamStatementSet
def execute(): TableResult
}

Package-level implicit conversions for seamless integration between DataStream and Table APIs.
package object scala {
implicit def tableConversions(table: Table): TableConversions
implicit def dataStreamConversions[T](dataStream: DataStream[T]): DataStreamConversions[T]
implicit def tableToChangelogDataStream(table: Table): DataStream[Row]
}

// Core Flink types
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.Table
import org.apache.flink.types.Row
// Schema and type system
import org.apache.flink.table.api.Schema
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.AbstractDataType
// Execution environment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
// Changelog processing
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.RowKind

The bridge API throws several types of exceptions:
try {
val table = tableEnv.fromDataStream(dataStream)
val result = tableEnv.toDataStream(table)
} catch {
case e: ValidationException => // Handle validation errors
case e: TableException => // Handle table processing errors
}

Since this API is deprecated, consider migrating to the Java Table API:
// Java equivalent
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);