Table/SQL API bridge for Scala, enabling interaction between Table API and DataStream/DataSet APIs in Apache Flink
npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12@2.1.0

The Flink Table API Scala Bridge provides seamless integration between Apache Flink's Table/SQL API and DataStream API for Scala developers. This bridge enables bidirectional conversion between DataStreams and Tables, allowing mixed declarative SQL operations with procedural stream processing in unified Scala applications.
org.apache.flink:flink-table-api-scala-bridge_2.12:2.1.0

pom.xml dependencies:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>2.1.0</version>
</dependency>

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
// Create execution environment and table environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// Create a DataStream
val dataStream = env.fromElements(
("Alice", 25, "Engineer"),
("Bob", 30, "Manager"),
("Charlie", 35, "Developer")
)
// Convert DataStream to Table
val table = tableEnv.fromDataStream(dataStream, $"name", $"age", $"role")
// Perform SQL operations
val filteredTable = table.filter($"age" > 28)
// Convert back to DataStream
val resultStream = tableEnv.toDataStream(filteredTable)
// Execute the job
env.execute("Table Bridge Example")

The Flink Table API Scala Bridge is built around several key components:
- DataStreamConversions and TableConversions for seamless type-safe conversions
- StreamStatementSet for batching multiple table operations for optimized execution

StreamTableEnvironment is the core table environment for streaming applications, providing the entry point for all Table/SQL API operations integrated with DataStream processing.
trait StreamTableEnvironment extends TableEnvironment
object StreamTableEnvironment {
def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment
}

DataStreamConversions converts DataStreams to Tables with automatic or custom schema derivation, supporting both regular streams and changelog streams.
class DataStreamConversions[T](dataStream: DataStream[T]) {
def toTable(tableEnv: StreamTableEnvironment): Table
def toTable(tableEnv: StreamTableEnvironment, schema: Schema): Table
def toChangelogTable(tableEnv: StreamTableEnvironment): Table
}

TableConversions converts Tables back to DataStreams with support for different output modes, including insert-only and full changelog streams.
class TableConversions(table: Table) {
def toDataStream: DataStream[Row]
def toDataStream[T](targetClass: Class[T]): DataStream[T]
def toChangelogStream: DataStream[Row]
def toChangelogStream(targetSchema: Schema): DataStream[Row]
}

StreamStatementSet batches multiple table operations together for optimized execution and resource management.
trait StreamStatementSet extends StatementSet {
def add(tablePipeline: TablePipeline): StreamStatementSet
def addInsertSql(statement: String): StreamStatementSet
def addInsert(targetPath: String, table: Table): StreamStatementSet
def attachAsDataStream(): Unit
}

The package object provides Scala-idiomatic implicit conversions for seamless integration between the DataStream and Table APIs.
// Available implicit conversions from package object
implicit def tableConversions(table: Table): TableConversions
implicit def tableToChangelogDataStream(table: Table): DataStream[Row]
implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T]

// Core Flink types used throughout the API
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.Schema
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.types.AbstractDataType
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableDescriptor, TablePipeline, ExplainDetail}
import org.apache.flink.types.RowKind

Important: All APIs in this module are deprecated as of Flink 1.18.0 (FLIP-265) and will be removed in a future major version. Consider migrating to the new unified Table API approach as documented in the Flink migration guide.