Table/SQL API bridge for Scala, enabling interaction between Table API and DataStream/DataSet APIs in Apache Flink
—
The Flink Table API Scala Bridge provides Scala-idiomatic implicit conversions for seamless integration between DataStream and Table APIs. These conversions are available through the package object and enable fluent, type-safe operations.
All implicit conversions are provided through the org.apache.flink.table.api.bridge.scala package object and are automatically available when you import the package.
import org.apache.flink.table.api.bridge.scala._

This import makes all implicit conversions available in scope.
Implicit conversion from Table to TableConversions for fluent DataStream conversion methods.
/**
* Conversions from Table to DataStream
* @param table The table to provide conversion methods for
* @return TableConversions wrapper with conversion methods
*/
implicit def tableConversions(table: Table): TableConversions

Usage Example:
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
val table = tableEnv.fromDataStream(dataStream)
// These methods are available via implicit conversion:
val rowStream = table.toDataStream // Returns DataStream[Row]
val changelogStream = table.toChangelogStream // Returns DataStream[Row]
// Type-safe conversion
case class Person(name: String, age: Int)
val personStream = table.toDataStream(classOf[Person])

Direct implicit conversion from Table to changelog DataStream for the most common use case.
/**
* Conversions from Table to DataStream of changelog entries
* Provides direct conversion to changelog DataStream for convenience
* @param table The table to convert
* @return DataStream[Row] containing changelog entries
*/
implicit def tableToChangelogDataStream(table: Table): DataStream[Row]

Usage Example:
import org.apache.flink.table.api.bridge.scala._
val table = tableEnv.sqlQuery("SELECT name, COUNT(*) FROM events GROUP BY name")
// Direct implicit conversion to changelog stream
val changelogStream: DataStream[Row] = table
// Process the changelog stream
changelogStream.map { row =>
s"${row.getKind}: ${row.getField(0)} -> ${row.getField(1)}"
}

Important Note: This conversion only works with Tables that are part of a Scala StreamTableEnvironment. It will throw a ValidationException if used with tables from other environments.
Implicit conversion from DataStream to DataStreamConversions for fluent Table conversion methods.
/**
* Conversions from DataStream to Table
* @param set The DataStream to provide conversion methods for
* @return DataStreamConversions wrapper with conversion methods
*/
implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T]

Usage Example:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))
// These methods are available via implicit conversion:
val table = dataStream.toTable(tableEnv)
val tableWithSchema = dataStream.toTable(tableEnv, schema)
dataStream.createTemporaryView(tableEnv, "my_view")
// For changelog streams
val changelogStream = env.fromElements(
Row.of(RowKind.INSERT, "Alice", Integer.valueOf(25))
)
val changelogTable = changelogStream.toChangelogTable(tableEnv)

Complete round-trip example:

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// Create DataStream
val dataStream = env.fromElements(
("Alice", 25, "Engineer"),
("Bob", 30, "Manager"),
("Charlie", 35, "Developer")
)
// Convert to Table using implicit conversion
val table = dataStream.toTable(tableEnv)
// Apply table operations
val filteredTable = table.filter($"_2" > 28) // age > 28
// Convert back to DataStream using implicit conversion
val resultStream = filteredTable.toDataStream
// Or direct implicit conversion to changelog stream
val changelogResult: DataStream[Row] = filteredTable
resultStream.print("Results")
env.execute("Implicit Conversions Example")

Example with an explicit schema:

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
case class Employee(name: String, age: Int, department: String)
val dataStream = env.fromElements(
Employee("Alice", 25, "Engineering"),
Employee("Bob", 30, "Marketing"),
Employee("Charlie", 35, "Engineering")
)
// Define custom schema
val schema = Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("department", DataTypes.STRING())
.build()
// Convert with schema using implicit conversion
val table = dataStream.toTable(tableEnv, schema)
// Create temporary view using implicit conversion
dataStream.createTemporaryView(tableEnv, "employees", schema)
// Query the view
val engineeringTable = tableEnv.sqlQuery("""
SELECT name, age
FROM employees
WHERE department = 'Engineering' AND age > 30
""")
// Convert back to typed DataStream using implicit conversion
val engineeringStream = engineeringTable.toDataStream(classOf[Employee])
engineeringStream.print("Engineering Results")
env.execute("Schema Example")

Example with changelog streams:

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.{Row, RowKind}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// Create changelog stream
val changelogStream = env.fromElements(
Row.of(RowKind.INSERT, "Alice", Integer.valueOf(25)),
Row.of(RowKind.UPDATE_AFTER, "Alice", Integer.valueOf(26)),
Row.of(RowKind.DELETE, "Bob", Integer.valueOf(30)),
Row.of(RowKind.INSERT, "Charlie", Integer.valueOf(35))
)
// Convert to table using implicit conversion
val changelogTable = changelogStream.toChangelogTable(tableEnv)
// Apply aggregation (will produce more changelog events)
val aggregatedTable = changelogTable
.groupBy($"f1" >= 30) // group by age >= 30
.select($"f1" >= 30 as "age_group", $"f0".count() as "count")
// Convert back to changelog stream using direct implicit conversion
val resultChangelogStream: DataStream[Row] = aggregatedTable
// Process changelog events
resultChangelogStream.map { row =>
val kind = row.getKind
val ageGroup = row.getField(0).asInstanceOf[Boolean]
val count = row.getField(1).asInstanceOf[Long]
val group = if (ageGroup) "30+" else "under 30"
s"$kind: $group has $count people"
}.print("Changelog Results")
env.execute("Changelog Example")

The direct tableToChangelogDataStream implicit conversion performs validation:
try {
val invalidTable = batchTableEnv.fromValues(1, 2, 3) // Not from StreamTableEnvironment
val stream: DataStream[Row] = invalidTable // This will throw ValidationException
} catch {
case e: ValidationException =>
println(s"Cannot convert table: ${e.getMessage}")
}

Implicit conversions maintain type safety:
val dataStream: DataStream[(String, Int)] = env.fromElements(("Alice", 25))
val table = dataStream.toTable(tableEnv) // Type information preserved
// This will work:
case class Person(name: String, age: Int)
val personStream = table.toDataStream(classOf[Person])
// This will fail at runtime if types don't match:
case class WrongType(id: Int, value: String)
// val wrongStream = table.toDataStream(classOf[WrongType]) // Runtime error

To enable the conversions described above, add this import to your code:

import org.apache.flink.table.api.bridge.scala._

Install with Tessl CLI:

npx tessl i tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12