CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12

Table/SQL API bridge for Scala, enabling interaction between Table API and DataStream/DataSet APIs in Apache Flink

Pending
Overview
Eval results
Files

docs/stream-table-environment.md

Stream Table Environment

The StreamTableEnvironment is the central entry point for creating Table and SQL API programs that integrate with Flink's DataStream API in Scala. It provides unified processing for both bounded and unbounded data streams.

Capabilities

Environment Creation

Factory methods for creating StreamTableEnvironment instances with default or custom settings.

/**
 * Factory methods for creating StreamTableEnvironment instances with default or
 * custom settings. Both overloads take the StreamExecutionEnvironment to use.
 */
object StreamTableEnvironment {
  /**
   * Creates a StreamTableEnvironment with default settings.
   *
   * @param executionEnvironment The StreamExecutionEnvironment to use
   * @return A new StreamTableEnvironment instance
   */
  def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
  
  /**
   * Creates a StreamTableEnvironment with custom settings.
   *
   * @param executionEnvironment The StreamExecutionEnvironment to use
   * @param settings Custom environment settings
   * @return A new StreamTableEnvironment instance
   */
  def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment
}

Usage Example:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// Default settings: obtain the execution environment and wrap it in a table environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// With custom settings
// No planner selection is needed: useBlinkPlanner() was deprecated as a no-op once Blink
// became the only planner, and recent EnvironmentSettings builders no longer offer it.
val settings = EnvironmentSettings.newInstance()
  .inStreamingMode()
  .build()
val customTableEnv = StreamTableEnvironment.create(env, settings)

DataStream to Table Conversion

Convert DataStreams to Tables with automatic schema derivation or custom schemas.

/**
 * Convert a DataStream to a Table with an auto-derived schema.
 *
 * @tparam T Element type of the DataStream
 * @param dataStream The DataStream to convert
 * @return Table representation of the DataStream
 */
def fromDataStream[T](dataStream: DataStream[T]): Table

/**
 * Convert a DataStream to a Table with a custom schema.
 *
 * @tparam T Element type of the DataStream
 * @param dataStream The DataStream to convert
 * @param schema Custom schema definition
 * @return Table representation of the DataStream
 */
def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table

/**
 * Convert a changelog DataStream to a Table.
 *
 * Unlike fromDataStream, the changelog overloads are not generic: the stream
 * elements must be Row.
 *
 * @param dataStream Changelog DataStream with Row elements
 * @return Table representation of the changelog stream
 */
def fromChangelogStream(dataStream: DataStream[Row]): Table

/**
 * Convert a changelog DataStream to a Table with a custom schema.
 *
 * @param dataStream Changelog DataStream with Row elements
 * @param schema Custom schema definition
 * @return Table representation of the changelog stream
 */
def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table

/**
 * Convert a changelog DataStream to a Table with a custom schema and changelog mode.
 *
 * @param dataStream Changelog DataStream with Row elements
 * @param schema Custom schema definition
 * @param changelogMode Changelog mode configuration
 * @return Table representation of the changelog stream
 */
def fromChangelogStream(dataStream: DataStream[Row], schema: Schema, changelogMode: ChangelogMode): Table

Usage Examples:

// Auto-derived schema
// A case class gives the derived columns their field names (name, age);
// a plain tuple stream would expose its fields as _1 and _2 instead.
case class User(name: String, age: Int)
val dataStream = env.fromElements(User("Alice", 25), User("Bob", 30))
val table = tableEnv.fromDataStream(dataStream)

// Custom schema
// The declared columns refer to fields of the stream's element type by name.
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val tableWithSchema = tableEnv.fromDataStream(dataStream, schema)

// Changelog stream
// Local import so this Types is not confused with the Types object in org.apache.flink.table.api.
import org.apache.flink.api.common.typeinfo.Types

// Row.ofKind sets the row's change flag; Row.of would instead store the RowKind value
// as an ordinary first field. An update is expressed as UPDATE_BEFORE + UPDATE_AFTER.
// The explicit row type information names the two field types (STRING, INT) for the stream.
val changelogStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Integer.valueOf(25)),
  Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", Integer.valueOf(25)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Integer.valueOf(26))
)(Types.ROW(Types.STRING, Types.INT))
val changelogTable = tableEnv.fromChangelogStream(changelogStream)

Table to DataStream Conversion

Convert Tables back to DataStreams with different output modes and type specifications.

/**
 * Convert an insert-only Table to a DataStream of Row.
 *
 * @param table The Table to convert
 * @return DataStream containing Row elements
 */
def toDataStream(table: Table): DataStream[Row]

/**
 * Convert an insert-only Table to a DataStream of the specified class.
 *
 * @tparam T Element type of the resulting DataStream, fixed by targetClass
 * @param table The Table to convert
 * @param targetClass Target class for the DataStream elements
 * @return DataStream containing elements of the specified class
 */
def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]

/**
 * Convert an insert-only Table to a DataStream of the specified data type.
 *
 * The signature does not tie T to targetDataType (it is AbstractDataType[_]),
 * so the caller chooses T and must keep it consistent with the data type passed.
 *
 * @tparam T Element type of the resulting DataStream
 * @param table The Table to convert
 * @param targetDataType Target data type specification
 * @return DataStream containing elements of the specified data type
 */
def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T]

/**
 * Convert a Table to a changelog DataStream.
 *
 * @param table The Table to convert
 * @return DataStream containing Row elements with changelog information
 */
def toChangelogStream(table: Table): DataStream[Row]

/**
 * Convert a Table to a changelog DataStream with a custom schema.
 *
 * @param table The Table to convert
 * @param targetSchema Custom schema for the output stream
 * @return DataStream containing Row elements with changelog information
 */
def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row]

/**
 * Convert a Table to a changelog DataStream with a custom schema and changelog mode.
 *
 * @param table The Table to convert
 * @param targetSchema Custom schema for the output stream
 * @param changelogMode Changelog mode configuration
 * @return DataStream containing Row elements with changelog information
 */
def toChangelogStream(table: Table, targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row]

Usage Examples:

// Convert to Row DataStream (toDataStream expects an insert-only table)
val rowStream = tableEnv.toDataStream(table)

// Convert to typed DataStream
// NOTE(review): presumably the table's columns must line up with Person's fields
// (name, age) for this conversion to succeed — confirm against the table's schema.
case class Person(name: String, age: Int)
val personStream = tableEnv.toDataStream(table, classOf[Person])

// Convert to changelog stream (Row elements carrying changelog information)
val changelogStream = tableEnv.toChangelogStream(table)

View Creation

Create temporary views from DataStreams for use in SQL queries.

/**
 * Create a temporary view from a DataStream so it can be used in SQL queries.
 *
 * @tparam T Element type of the DataStream
 * @param path The view path/name
 * @param dataStream The DataStream to create the view from
 */
def createTemporaryView[T](path: String, dataStream: DataStream[T]): Unit

/**
 * Create a temporary view from a DataStream with a custom schema.
 *
 * @tparam T Element type of the DataStream
 * @param path The view path/name
 * @param dataStream The DataStream to create the view from
 * @param schema Custom schema definition
 */
def createTemporaryView[T](path: String, dataStream: DataStream[T], schema: Schema): Unit

Usage Example:

// A case class gives the view named columns (name, age). A tuple stream would expose
// only _1 and _2, so the `age` reference in the query below would not resolve.
case class UserRecord(name: String, age: Int)
val dataStream = env.fromElements(UserRecord("Alice", 25), UserRecord("Bob", 30))
tableEnv.createTemporaryView("users", dataStream)

// Now you can query the view with SQL
val result = tableEnv.sqlQuery("SELECT * FROM users WHERE age > 25")

Statement Set Creation

Create statement sets for batching multiple table operations.

/**
 * Create a statement set for grouping multiple table operations.
 *
 * @return A new StreamStatementSet instance
 */
def createStatementSet(): StreamStatementSet

Usage Example:

// Group two inserts into a single statement set
// (table1 and table2 are Tables defined elsewhere in the program).
val statementSet = tableEnv.createStatementSet()
statementSet
  .addInsert("sink_table_1", table1)
  .addInsert("sink_table_2", table2)
  // NOTE(review): attachAsDataStream presumably adds the statements to the DataStream
  // program so they run with env.execute() rather than executing immediately — confirm.
  .attachAsDataStream()

Legacy Deprecated Methods

These methods are deprecated and should not be used in new code:

/**
 * Deprecated expression-based variant of fromDataStream.
 * Use fromDataStream with a Schema instead.
 */
@deprecated("Use fromDataStream with Schema", "1.18.0")
def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table

/**
 * Deprecated expression-based variant of createTemporaryView.
 * Use createTemporaryView with a Schema instead.
 */
@deprecated("Use createTemporaryView with Schema", "1.18.0") 
def createTemporaryView[T](path: String, dataStream: DataStream[T], fields: Expression*): Unit

/**
 * Deprecated; use toDataStream instead.
 * Requires an implicit TypeInformation for T (context bound).
 */
@deprecated("Use toDataStream", "1.18.0")
def toAppendStream[T: TypeInformation](table: Table): DataStream[T]

/**
 * Deprecated; use toChangelogStream instead.
 * Requires an implicit TypeInformation for T (context bound).
 * NOTE(review): the Boolean in each (Boolean, T) pair presumably distinguishes
 * added records from retracted ones — confirm.
 */
@deprecated("Use toChangelogStream", "1.18.0")
def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)]

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12

docs

datastream-conversions.md

implicit-conversions.md

index.md

statement-sets.md

stream-table-environment.md

table-conversions.md

tile.json