Table/SQL API bridge for Scala, enabling interaction between Table API and DataStream/DataSet APIs in Apache Flink
—
The DataStreamConversions class provides utilities for converting DataStreams to Tables with various schema and conversion options. It serves as a fluent wrapper around DataStream instances to enable easy table conversion.
Wrapper class for DataStream conversion operations.
/**
* Creates conversion utilities for a DataStream
* @param dataStream The DataStream to provide conversion methods for
*/
class DataStreamConversions[T](dataStream: DataStream[T])

This class is typically accessed through implicit conversions from the package object:
import org.apache.flink.table.api.bridge.scala._
val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))
// dataStream now has conversion methods available via implicit conversion
val table = dataStream.toTable(tableEnv)

Convert DataStreams to Tables with automatic or custom schema derivation.
/**
* Convert to Table with auto-derived schema
* @param tableEnv The StreamTableEnvironment to use for conversion
* @return Table representation of the DataStream
*/
def toTable(tableEnv: StreamTableEnvironment): Table
/**
* Convert to Table with custom schema
* @param tableEnv The StreamTableEnvironment to use for conversion
* @param schema Custom schema definition
* @return Table representation of the DataStream
*/
def toTable(tableEnv: StreamTableEnvironment, schema: Schema): Table

Usage Examples:
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// Auto-derived schema
val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))
val table = dataStream.toTable(tableEnv)
// Custom schema
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val tableWithSchema = dataStream.toTable(tableEnv, schema)

Convert changelog DataStreams to Tables for handling streams with insert/update/delete operations.
/**
* Convert changelog DataStream to Table
* @param tableEnv The StreamTableEnvironment to use for conversion
* @return Table representation of the changelog stream
*/
def toChangelogTable(tableEnv: StreamTableEnvironment): Table
/**
* Convert changelog DataStream to Table with custom schema
* @param tableEnv The StreamTableEnvironment to use for conversion
* @param schema Custom schema definition
* @return Table representation of the changelog stream
*/
def toChangelogTable(tableEnv: StreamTableEnvironment, schema: Schema): Table
/**
* Convert changelog DataStream to Table with custom schema and changelog mode
* @param tableEnv The StreamTableEnvironment to use for conversion
* @param schema Custom schema definition
* @param changelogMode Changelog mode configuration
* @return Table representation of the changelog stream
*/
def toChangelogTable(tableEnv: StreamTableEnvironment, schema: Schema, changelogMode: ChangelogMode): Table

Usage Examples:
import org.apache.flink.types.{Row, RowKind}
// Changelog stream with Row elements
val changelogStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Integer.valueOf(25)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Integer.valueOf(26)),
  Row.ofKind(RowKind.DELETE, "Bob", Integer.valueOf(30))
)
// Convert to changelog table
val changelogTable = changelogStream.toChangelogTable(tableEnv)
// With custom schema
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val changelogTableWithSchema = changelogStream.toChangelogTable(tableEnv, schema)
// With a custom changelog mode; all() accepts every row kind, which this stream needs
// since it carries UPDATE_AFTER and DELETE rows (upsert() and insertOnly() also exist)
val changelogMode = ChangelogMode.all()
val changelogTableWithMode = changelogStream.toChangelogTable(tableEnv, schema, changelogMode)

Create temporary views from DataStreams for use in SQL queries.
/**
* Create temporary view from the DataStream
* @param tableEnv The StreamTableEnvironment to use for view creation
* @param path The view path/name
*/
def createTemporaryView(tableEnv: StreamTableEnvironment, path: String): Unit
/**
* Create temporary view from the DataStream with custom schema
* @param tableEnv The StreamTableEnvironment to use for view creation
* @param path The view path/name
* @param schema Custom schema definition
*/
def createTemporaryView(tableEnv: StreamTableEnvironment, path: String, schema: Schema): Unit

Usage Examples:
val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))
// Create temporary view
dataStream.createTemporaryView(tableEnv, "users")
// Create temporary view with custom schema
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
dataStream.createTemporaryView(tableEnv, "users_with_schema", schema)
// Now you can query the views with SQL
val result = tableEnv.sqlQuery("SELECT * FROM users WHERE age > 25")

Deprecated methods that should not be used in new code:
/**
* Convert with field expressions (legacy)
* @deprecated Use toTable with Schema instead
*/
@deprecated("Use toTable with Schema", "1.18.0")
def toTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table

The legacy method allows specifying field expressions directly, but the new approach using Schema is preferred:
// Legacy approach (deprecated)
val legacyTable = dataStream.toTable(tableEnv, $"name", $"age")
// Preferred approach
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val preferredTable = dataStream.toTable(tableEnv, schema)

For DataStreamConversions to work properly, the DataStream element type T must be a type Flink can analyze, such as a case class, a tuple, a POJO, or Row.
The type information is automatically derived using Flink's TypeInformation system for Scala types.
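For example, a case class stream derives its column names from the constructor parameters. A minimal sketch of this (the `User` case class and its values are hypothetical; requires a Flink runtime on the classpath):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

// Hypothetical element type: a plain Scala case class.
case class User(name: String, age: Int)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// TypeInformation for User is derived implicitly by the Scala API.
val users = env.fromElements(User("Alice", 25), User("Bob", 30))

// Columns "name" and "age" come from the case class fields; no Schema needed.
val userTable = users.toTable(tableEnv)
```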
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12