DataStream Integration (Bridge API)

The bridge API integrates Flink's Table API with the DataStream API, enabling conversion between Table and DataStream objects and providing streaming-specific table operations.

Core Classes

StreamTableEnvironment

The main entry point for DataStream-Table integration, providing methods to convert between DataStream and Table objects.

trait StreamTableEnvironment extends TableEnvironment {
  // DataStream to Table conversion
  def fromDataStream[T](dataStream: DataStream[T]): Table
  def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table  
  def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table
  
  // Table to DataStream conversion
  def toDataStream(table: Table): DataStream[Row]
  def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]
  def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T]
  
  // Changelog stream conversion
  def toChangelogStream(table: Table): DataStream[Row]
  def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row]
  def toChangelogStream(
    table: Table, 
    targetSchema: Schema, 
    changelogMode: ChangelogMode
  ): DataStream[Row]
}

Factory Methods

object StreamTableEnvironment {
  // Create from existing StreamExecutionEnvironment
  def create(
    executionEnvironment: StreamExecutionEnvironment
  ): StreamTableEnvironment
  
  def create(
    executionEnvironment: StreamExecutionEnvironment,
    settings: EnvironmentSettings
  ): StreamTableEnvironment
}
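
For example, streaming execution mode can be requested explicitly through the EnvironmentSettings builder when creating the environment (a minimal sketch; streaming mode is also the default):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Explicitly select streaming (as opposed to batch) execution mode
val settings = EnvironmentSettings.newInstance()
  .inStreamingMode()
  .build()

val tableEnv = StreamTableEnvironment.create(env, settings)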

Implicit Conversion Classes

TableConversions

Provides implicit conversion methods for Table objects to DataStream.

class TableConversions(table: Table) {
  /**
   * Converts the Table to a DataStream of Row objects.
   * Equivalent to StreamTableEnvironment.toDataStream(table)
   */
  def toDataStream(): DataStream[Row]
  
  /**
   * Converts the Table to a typed DataStream.
   * Equivalent to StreamTableEnvironment.toDataStream(table, targetClass)
   */
  def toDataStream[T](targetClass: Class[T]): DataStream[T]
  
  /**
   * Converts the Table to a typed DataStream with specified data type.
   * Equivalent to StreamTableEnvironment.toDataStream(table, targetDataType)
   */
  def toDataStream[T](targetDataType: AbstractDataType[_]): DataStream[T]
  
  /**
   * Converts the Table to a changelog DataStream.
   * Equivalent to StreamTableEnvironment.toChangelogStream(table)
   */
  def toChangelogStream(): DataStream[Row]
  
  /**
   * Converts the Table to a changelog DataStream with schema.
   * Equivalent to StreamTableEnvironment.toChangelogStream(table, targetSchema)
   */
  def toChangelogStream(targetSchema: Schema): DataStream[Row]
  
  /**
   * Converts the Table to a changelog DataStream with schema and changelog mode.
   * Equivalent to StreamTableEnvironment.toChangelogStream(table, targetSchema, changelogMode)
   */
  def toChangelogStream(
    targetSchema: Schema, 
    changelogMode: ChangelogMode
  ): DataStream[Row]
}

DataStreamConversions

Provides implicit conversion methods for DataStream objects to Table.

class DataStreamConversions[T](dataStream: DataStream[T]) {
  /**
   * Converts the DataStream to a Table.
   * Equivalent to StreamTableEnvironment.fromDataStream(dataStream)
   */
  def toTable(tableEnv: StreamTableEnvironment): Table
  
  /**
   * Converts the DataStream to a Table with schema.
   * Equivalent to StreamTableEnvironment.fromDataStream(dataStream, schema)
   */
  def toTable(tableEnv: StreamTableEnvironment, schema: Schema): Table
  
  /**
   * Converts the DataStream to a Table with field expressions.  
   * Equivalent to StreamTableEnvironment.fromDataStream(dataStream, fields)
   */
  def toTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table
}
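
As a sketch of the field-expression variant, a tuple stream can be converted with positional renaming (self-contained; the $"..." expression syntax comes from the org.apache.flink.table.api import):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

val words: DataStream[(String, Int)] = env.fromElements(("hello", 1), ("world", 2))

// Implicit conversion to DataStreamConversions; fields are renamed positionally
val wordsTable: Table = words.toTable(tableEnv, $"word", $"frequency")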

Package Object Implicits

The org.apache.flink.table.api.bridge.scala package object provides automatic implicit conversions:

package object scala {
  // Automatic Table to TableConversions
  implicit def tableConversions(table: Table): TableConversions
  
  // Automatic Table to changelog DataStream[Row] conversion
  implicit def tableToChangelogDataStream(table: Table): DataStream[Row]
  
  // Automatic DataStream to DataStreamConversions  
  implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T]
}
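
The tableToChangelogDataStream implicit means a Table can be assigned directly where a DataStream[Row] is expected. A sketch (resultTable stands in for any Table bound to a StreamTableEnvironment):

import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// The implicit conversion applies on assignment; equivalent to
// tableEnv.toChangelogStream(resultTable)
val changelog: DataStream[Row] = resultTable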

Usage Examples

Environment Setup

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

DataStream to Table Conversion

case class Order(id: Int, product: String, amount: Double)

val orders: DataStream[Order] = env.fromElements(
  Order(1, "laptop", 999.99),
  Order(2, "mouse", 29.99)
)

// Direct conversion
val ordersTable = tableEnv.fromDataStream(orders)

// With schema
val ordersTableWithSchema = tableEnv.fromDataStream(
  orders,
  Schema.newBuilder()
    .column("id", DataTypes.INT())
    .column("product", DataTypes.STRING())
    .column("amount", DataTypes.DOUBLE())
    .build()
)

// Using the implicit conversion (tableEnv passed explicitly)
val ordersTableImplicit = orders.toTable(tableEnv)
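
When the stream carries event-time information, the Schema variant can additionally expose the stream's timestamps and watermarks. The following sketch uses the rowtime metadata key and SOURCE_WATERMARK() to propagate the DataStream's watermarks into the table:

// Expose stream timestamps/watermarks as an event-time attribute
val ordersWithTime = tableEnv.fromDataStream(
  orders,
  Schema.newBuilder()
    .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
    .watermark("rowtime", "SOURCE_WATERMARK()")
    .build()
)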

Table to DataStream Conversion

val processedTable = ordersTable
  .select($"id", $"product", $"amount" * 1.1 as "amountWithTax")
  .where($"amount" > 50.0)

// Direct conversion
val resultStream: DataStream[Row] = tableEnv.toDataStream(processedTable)

// Typed conversion (column names and types must match the target class,
// so the unmodified ordersTable is used here)
val typedStream: DataStream[Order] = tableEnv.toDataStream(ordersTable, classOf[Order])

// Using implicit conversion
val implicitStream: DataStream[Row] = processedTable.toDataStream()

Changelog Streams

// For tables with updates/deletes
val changelogStream = tableEnv.toChangelogStream(processedTable)

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Process changelog entries; each Row carries its RowKind (change flag)
changelogStream.process(new ProcessFunction[Row, String] {
  override def processElement(
    value: Row, 
    ctx: ProcessFunction[Row, String]#Context, 
    out: Collector[String]
  ): Unit = {
    val rowKind = value.getKind
    val data = value.toString
    out.collect(s"$rowKind: $data")
  }
})
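
The schema-plus-mode overload constrains which row kinds the stream may emit. A sketch requesting upsert semantics (it assumes id can serve as the primary key of processedTable):

import org.apache.flink.table.connector.ChangelogMode

// Upsert mode: only INSERT, UPDATE_AFTER, and DELETE rows, keyed by the primary key
val upsertStream = tableEnv.toChangelogStream(
  processedTable,
  Schema.newBuilder()
    .column("id", DataTypes.INT().notNull())
    .column("product", DataTypes.STRING())
    .column("amountWithTax", DataTypes.DOUBLE())
    .primaryKey("id")
    .build(),
  ChangelogMode.upsert()
)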

Integration Patterns

Hybrid Processing Pipeline

// DataStream processing
val rawStream = env.addSource(new MySourceFunction())
val cleanedStream = rawStream.filter(_.isValid)

// Convert to Table for SQL processing  
val cleanedTable = tableEnv.fromDataStream(cleanedStream)
val aggregatedTable = tableEnv.sqlQuery(
  "SELECT category, COUNT(*) as cnt, AVG(amount) as avg_amount " +
  "FROM " + cleanedTable + " " +
  "GROUP BY category"
)

// Convert back to DataStream for further processing
val aggregatedStream = tableEnv.toDataStream(aggregatedTable)
aggregatedStream.addSink(new MySinkFunction())
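
The string concatenation above relies on the Table registering itself under a generated name; registering an explicit temporary view keeps the SQL readable. A sketch (createTemporaryView also accepts the DataStream directly):

// Register under an explicit name instead of concatenating the Table into SQL
tableEnv.createTemporaryView("cleaned", cleanedStream)

val aggregated = tableEnv.sqlQuery(
  """SELECT category, COUNT(*) AS cnt, AVG(amount) AS avg_amount
    |FROM cleaned
    |GROUP BY category""".stripMargin
)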

Notes

  • All Scala bridge API components are marked @deprecated as part of FLIP-265, which deprecates Flink's Scala APIs
  • Implicit conversions are automatically available when importing the bridge package
  • Schema inference works automatically for case classes and basic types
  • Changelog streams preserve Row-level change information (INSERT, UPDATE, DELETE)
  • The bridge package requires both table and streaming dependencies