Table/SQL API bridge for Scala, enabling interaction between Table API and DataStream/DataSet APIs in Apache Flink
—
The TableConversions class provides utilities for converting Tables back to DataStreams with support for different output modes, type specifications, and changelog handling. It serves as a fluent wrapper around Table instances.
Wrapper class for Table conversion operations.
/**
* Creates conversion utilities for a Table
* @param table The Table to provide conversion methods for
*/
class TableConversions(table: Table)

This class is typically accessed through implicit conversions from the package object:
import org.apache.flink.table.api.bridge.scala._
val table = tableEnv.fromDataStream(dataStream)
// table now has conversion methods available via implicit conversion
val resultStream = table.toDataStream

Convert Tables to DataStreams for insert-only operations with different type specifications.
/**
* Convert insert-only Table to DataStream of Row
* @return DataStream containing Row elements
*/
def toDataStream: DataStream[Row]
/**
* Convert insert-only Table to DataStream of specified class
* @param targetClass Target class for the DataStream elements
* @return DataStream containing elements of the specified class
*/
def toDataStream[T](targetClass: Class[T]): DataStream[T]
/**
* Convert insert-only Table to DataStream of specified data type
* @param targetDataType Target data type specification
* @return DataStream containing elements of the specified data type
*/
def toDataStream[T](targetDataType: AbstractDataType[_]): DataStream[T]

Usage Examples:
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// Create a table
val table = tableEnv.fromDataStream(env.fromElements(("Alice", 25), ("Bob", 30)))
// Convert to Row DataStream
val rowStream = table.toDataStream
// Convert to typed DataStream using case class
case class Person(name: String, age: Int)
val personStream = table.toDataStream(classOf[Person])
// Convert using data type specification
val typedStream = table.toDataStream[Person](DataTypes.STRUCTURED(classOf[Person]))

Convert Tables to changelog DataStreams for handling streams with insert/update/delete operations.
/**
* Convert Table to changelog DataStream
* @return DataStream containing Row elements with changelog information
*/
def toChangelogStream: DataStream[Row]
/**
* Convert Table to changelog DataStream with custom schema
* @param targetSchema Custom schema for the output stream
* @return DataStream containing Row elements with changelog information
*/
def toChangelogStream(targetSchema: Schema): DataStream[Row]
/**
* Convert Table to changelog DataStream with custom schema and changelog mode
* @param targetSchema Custom schema for the output stream
* @param changelogMode Changelog mode configuration
* @return DataStream containing Row elements with changelog information
*/
def toChangelogStream(targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row]

Usage Examples:
import org.apache.flink.types.{Row, RowKind}
// Convert to basic changelog stream
val changelogStream = table.toChangelogStream
// Process changelog stream
changelogStream.map { row =>
  val kind = row.getKind
  val name = row.getField(0).toString
  val age = row.getField(1).asInstanceOf[Int]
  kind match {
    case RowKind.INSERT => s"Added: $name, $age"
    case RowKind.UPDATE_AFTER => s"Updated: $name, $age"
    case RowKind.DELETE => s"Deleted: $name, $age"
    case _ => s"Other: $name, $age"
  }
}
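The RowKind dispatch above can be taken one step further: replaying a keyed changelog yields the current state of the table it describes. A plain-Scala sketch of this idea (RowKind and the event shape are modeled locally as simplified stand-ins, not Flink's actual classes, so the snippet runs without a Flink dependency):

```scala
// Local stand-ins for RowKind and changelog events (assumption: simplified
// model for illustration, not Flink's org.apache.flink.types.RowKind)
sealed trait Kind
case object Insert extends Kind
case object UpdateBefore extends Kind
case object UpdateAfter extends Kind
case object Delete extends Kind

case class Change(kind: Kind, key: String, value: Int)

// Fold a changelog into the state it describes: inserts and update-afters
// set the key, update-befores and deletes retract it
def materialize(changes: Seq[Change]): Map[String, Int] =
  changes.foldLeft(Map.empty[String, Int]) { (state, c) =>
    c.kind match {
      case Insert | UpdateAfter  => state + (c.key -> c.value)
      case UpdateBefore | Delete => state - c.key
    }
  }

val log = Seq(
  Change(Insert, "Alice", 1),
  Change(UpdateBefore, "Alice", 1),
  Change(UpdateAfter, "Alice", 2),
  Change(Insert, "Bob", 1),
  Change(Delete, "Bob", 1)
)
// materialize(log) == Map("Alice" -> 2)
```

This is the semantics a downstream consumer of toChangelogStream has to implement if it wants a materialized view rather than the raw change events.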
// Convert with custom schema
val targetSchema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val changelogStreamWithSchema = table.toChangelogStream(targetSchema)
// Convert with a custom changelog mode (insertOnly here; ChangelogMode.upsert(),
// ChangelogMode.all(), and ChangelogMode.newBuilder() are also available)
val changelogMode = ChangelogMode.insertOnly()
val changelogStreamWithMode = table.toChangelogStream(targetSchema, changelogMode)

Deprecated methods that should not be used in new code:
/**
* Convert to append-only stream (deprecated)
* @deprecated Use toDataStream instead
*/
@deprecated("Use toDataStream", "1.18.0")
def toAppendStream[T: TypeInformation]: DataStream[T]
/**
* Convert to retract stream (deprecated)
* @deprecated Use toChangelogStream instead
*/
@deprecated("Use toChangelogStream", "1.18.0")
def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)]

Migration Examples:
// Legacy approach (deprecated)
val appendStream = table.toAppendStream[Person]
val retractStream = table.toRetractStream[Person]
// Preferred approach
val insertOnlyStream = table.toDataStream(classOf[Person])
val changelogStream = table.toChangelogStream
// Process changelog stream to get retract-style tuples if needed
val retractStyleStream = changelogStream.map { row =>
  val isInsert = row.getKind == RowKind.INSERT || row.getKind == RowKind.UPDATE_AFTER
  val person = Person(row.getField(0).toString, row.getField(1).asInstanceOf[Int])
  (isInsert, person)
}

Use toDataStream methods when the Table is insert-only (append-only), i.e. it never produces updates or deletes:
// Good for append-only scenarios
val insertOnlyTable = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 18")
val resultStream = insertOnlyTable.toDataStream(classOf[Person])

Use toChangelogStream methods when the Table may produce updates or deletes, e.g. the result of a non-windowed aggregation:
// Good for aggregation scenarios
val aggregatedTable = tableEnv.sqlQuery("SELECT name, COUNT(*) as count FROM events GROUP BY name")
val changelogStream = aggregatedTable.toChangelogStream
changelogStream.map { row =>
  row.getKind match {
    case RowKind.INSERT => s"New group: ${row.getField(0)}"
    case RowKind.UPDATE_AFTER => s"Updated count for: ${row.getField(0)}"
    case _ => s"Other change for: ${row.getField(0)}"
  }
}
}

For successful Table to DataStream conversion:
- toDataStream requires an insert-only Table; an updating Table (such as a non-windowed aggregation result) fails at runtime and must be converted with toChangelogStream instead.
- When a targetClass or targetDataType is given, the Table's column names and types must be mappable to the fields of the target type.
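The retract-style (Boolean, record) pairs shown in the migration examples encode the same information as a changelog, with a single add/retract flag. A plain-Scala sketch of replaying such pairs into current state (simplified model for illustration, no Flink dependency; records are compared by value, as with case classes):

```scala
// A record type comparable by value, like the Person case class above
case class Person(name: String, age: Int)

// Replay (isAdd, record) pairs: true adds the record, false retracts it
def replay(pairs: Seq[(Boolean, Person)]): Set[Person] =
  pairs.foldLeft(Set.empty[Person]) {
    case (state, (true, p))  => state + p // add
    case (state, (false, p)) => state - p // retract
  }

val pairs = Seq(
  (true, Person("Alice", 25)),
  (false, Person("Alice", 25)), // retract the old value
  (true, Person("Alice", 26))   // add the updated value
)
// replay(pairs) == Set(Person("Alice", 26))
```

Note that the retract encoding cannot distinguish an update from a delete-plus-insert, which is one reason the RowKind-based changelog model replaced it.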
Common target types include Row (the default), Scala case classes, POJOs, and tuple types, with structured types resolvable via DataTypes.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12