The bridge API integrates Flink's Table API with the DataStream API, enabling conversion between Table and DataStream objects as well as streaming-specific table operations.
StreamTableEnvironment is the main entry point for DataStream-Table integration, providing methods to convert between DataStream and Table objects:
trait StreamTableEnvironment extends TableEnvironment {

  // DataStream to Table conversion
  def fromDataStream[T](dataStream: DataStream[T]): Table
  def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table
  def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table

  // Table to DataStream conversion
  def toDataStream(table: Table): DataStream[Row]
  def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]
  def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T]

  // Changelog stream conversion
  def toChangelogStream(table: Table): DataStream[Row]
  def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row]
  def toChangelogStream(
      table: Table,
      targetSchema: Schema,
      changelogMode: ChangelogMode
  ): DataStream[Row]
}

The companion object provides factory methods for creating instances:

object StreamTableEnvironment {
  // Create from an existing StreamExecutionEnvironment
  def create(
      executionEnvironment: StreamExecutionEnvironment
  ): StreamTableEnvironment

  def create(
      executionEnvironment: StreamExecutionEnvironment,
      settings: EnvironmentSettings
  ): StreamTableEnvironment

  def create(settings: EnvironmentSettings): StreamTableEnvironment
}
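The create variants accept explicit EnvironmentSettings; a minimal sketch, assuming a standard streaming-mode setup:

val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, settings)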
TableConversions provides implicit conversion methods from a Table to a DataStream.

class TableConversions(table: Table) {
  /**
   * Converts the Table to a DataStream of Row objects.
   * Equivalent to StreamTableEnvironment.toDataStream(table).
   */
  def toDataStream(): DataStream[Row]

  /**
   * Converts the Table to a typed DataStream.
   * Equivalent to StreamTableEnvironment.toDataStream(table, targetClass).
   */
  def toDataStream[T](targetClass: Class[T]): DataStream[T]

  /**
   * Converts the Table to a typed DataStream with the specified data type.
   * Equivalent to StreamTableEnvironment.toDataStream(table, targetDataType).
   */
  def toDataStream[T](targetDataType: AbstractDataType[_]): DataStream[T]

  /**
   * Converts the Table to a changelog DataStream.
   * Equivalent to StreamTableEnvironment.toChangelogStream(table).
   */
  def toChangelogStream(): DataStream[Row]

  /**
   * Converts the Table to a changelog DataStream with the given schema.
   * Equivalent to StreamTableEnvironment.toChangelogStream(table, targetSchema).
   */
  def toChangelogStream(targetSchema: Schema): DataStream[Row]

  /**
   * Converts the Table to a changelog DataStream with the given schema and changelog mode.
   */
  def toChangelogStream(
      targetSchema: Schema,
      changelogMode: ChangelogMode
  ): DataStream[Row]
}
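The changelog-mode overload controls which row kinds the resulting stream may carry; a minimal sketch, assuming an updating table with a primary key column id is in scope:

import org.apache.flink.table.connector.ChangelogMode

// Emit only upsert and delete changes instead of the full changelog
val upsertStream: DataStream[Row] = table.toChangelogStream(
  Schema.newBuilder().primaryKey("id").build(),
  ChangelogMode.upsert()
)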
DataStreamConversions provides implicit conversion methods from a DataStream to a Table.

class DataStreamConversions[T](dataStream: DataStream[T]) {

  /**
   * Converts the DataStream to a Table.
   * Equivalent to StreamTableEnvironment.fromDataStream(dataStream).
   */
  def toTable()(implicit tEnv: StreamTableEnvironment): Table

  /**
   * Converts the DataStream to a Table with the given schema.
   * Equivalent to StreamTableEnvironment.fromDataStream(dataStream, schema).
   */
  def toTable(schema: Schema)(implicit tEnv: StreamTableEnvironment): Table

  /**
   * Converts the DataStream to a Table with field expressions.
   * Equivalent to StreamTableEnvironment.fromDataStream(dataStream, fields).
   */
  def toTable(fields: Expression*)(implicit tEnv: StreamTableEnvironment): Table
}

The org.apache.flink.table.api.bridge.scala package object provides automatic implicit conversions:
package object scala {

  // Automatic Table to TableConversions
  implicit def tableConversions(table: Table): TableConversions

  // Automatic Table to DataStream[Row] conversion
  implicit def tableToChangelogDataStream(table: Table): DataStream[Row]

  // Automatic DataStream to DataStreamConversions
  implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T]
}

A typical setup imports the bridge package and creates both environments:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Declared implicit so the toTable() conversions below can resolve it
implicit val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

To convert a DataStream to a Table, define an element type and use fromDataStream:

case class Order(id: Int, product: String, amount: Double)
val orders: DataStream[Order] = env.fromElements(
Order(1, "laptop", 999.99),
Order(2, "mouse", 29.99)
)
// Direct conversion
val ordersTable = tableEnv.fromDataStream(orders)
// With schema
val ordersTableWithSchema = tableEnv.fromDataStream(
orders,
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("product", DataTypes.STRING())
.column("amount", DataTypes.DOUBLE())
.build()
)
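The fields overload of fromDataStream, listed in the trait above, selects or reorders columns by expression; a brief sketch using a hypothetical result name:

// With field expressions
val ordersTableByFields = tableEnv.fromDataStream(orders, $"id", $"product", $"amount")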
// Using implicit conversion
val ordersTableImplicit = orders.toTable()

Tables can be transformed with the Table API before being converted back to a DataStream:

val processedTable = ordersTable
.select($"id", $"product", $"amount" * 1.1 as "amountWithTax")
.where($"amount" > 50.0)
// Direct conversion
val resultStream: DataStream[Row] = tableEnv.toDataStream(processedTable)
// Typed conversion (column names and types must match the target class)
val typedStream: DataStream[Order] = tableEnv.toDataStream(ordersTable, classOf[Order])
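The AbstractDataType overload achieves the same via a reflective type hint; a brief sketch, assuming the documented signature above:

// Data type-based conversion
val dataTypeStream: DataStream[Order] =
  tableEnv.toDataStream[Order](ordersTable, DataTypes.of(classOf[Order]))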
// Using implicit conversion
val implicitStream: DataStream[Row] = processedTable.toDataStream()
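The package object's tableToChangelogDataStream implicit also lets a Table be assigned directly where a DataStream[Row] is expected; a minimal sketch:

// The implicit conversion is applied at the expected type
val autoStream: DataStream[Row] = processedTable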
// For tables with updates/deletes
val changelogStream = tableEnv.toChangelogStream(processedTable)
// Process changelog entries; each Row carries a RowKind (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE)
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

changelogStream.process(new ProcessFunction[Row, String] {
  override def processElement(
      value: Row,
      ctx: ProcessFunction[Row, String]#Context,
      out: Collector[String]
  ): Unit = {
    val rowKind = value.getKind
    val data = value.toString
    out.collect(s"$rowKind: $data")
  }
})

A common hybrid pattern pre-processes data with the DataStream API, runs SQL over a converted Table, and converts the result back:

// DataStream processing
val rawStream = env.addSource(new MySourceFunction())
val cleanedStream = rawStream.filter(_.isValid)
// Convert to Table for SQL processing
val cleanedTable = tableEnv.fromDataStream(cleanedStream)
val aggregatedTable = tableEnv.sqlQuery(
  // Embedding the Table in the SQL string registers it under a generated name
  "SELECT category, COUNT(*) as cnt, AVG(amount) as avg_amount " +
  "FROM " + cleanedTable + " " +
  "GROUP BY category"
)
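An equivalent, more explicit approach registers the table as a named view before querying it; a sketch using a hypothetical view name:

tableEnv.createTemporaryView("cleaned_orders", cleanedTable)
val aggregatedViaView = tableEnv.sqlQuery(
  "SELECT category, COUNT(*) as cnt FROM cleaned_orders GROUP BY category"
)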
// Convert back to DataStream for further processing
val aggregatedStream = tableEnv.toDataStream(aggregatedTable)
aggregatedStream.addSink(new MySinkFunction())

Note: this Scala bridge API is deprecated as part of FLIP-265, which deprecates Flink's Scala API support.
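For new code, the Java bridge classes can be used directly from Scala; a minimal sketch, assuming the Java StreamExecutionEnvironment:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

val javaEnv = StreamExecutionEnvironment.getExecutionEnvironment
val javaTableEnv = StreamTableEnvironment.create(javaEnv)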