
tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12

Table/SQL API bridge for Scala, enabling interaction between Table API and DataStream/DataSet APIs in Apache Flink


Table Conversions

The TableConversions class wraps a Table instance and provides fluent methods for converting it back to a DataStream, with support for different output modes, target type specifications, and changelog handling.

Capabilities

TableConversions Construction

Wrapper class for Table conversion operations.

/**
 * Creates conversion utilities for a Table
 * @param table The Table to provide conversion methods for
 */
class TableConversions(table: Table)

This class is typically accessed through implicit conversions from the package object:

import org.apache.flink.table.api.bridge.scala._

val table = tableEnv.fromDataStream(dataStream)
// table now has conversion methods available via implicit conversion
val resultStream = table.toDataStream

DataStream Conversion Methods

Convert an insert-only Table to a DataStream, with several ways of specifying the target element type.

/**
 * Convert insert-only Table to DataStream of Row
 * @return DataStream containing Row elements
 */
def toDataStream: DataStream[Row]

/**
 * Convert insert-only Table to DataStream of specified class
 * @param targetClass Target class for the DataStream elements
 * @return DataStream containing elements of the specified class
 */
def toDataStream[T](targetClass: Class[T]): DataStream[T]

/**
 * Convert insert-only Table to DataStream of specified data type
 * @param targetDataType Target data type specification
 * @return DataStream containing elements of the specified data type
 */
def toDataStream[T](targetDataType: AbstractDataType[_]): DataStream[T]

Usage Examples:

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Create a table
val table = tableEnv.fromDataStream(env.fromElements(("Alice", 25), ("Bob", 30)))

// Convert to Row DataStream
val rowStream = table.toDataStream

// Convert to typed DataStream using case class
case class Person(name: String, age: Int)
val personStream = table.toDataStream(classOf[Person])

// Convert using a data type specification; DataTypes.of reflectively
// extracts the structured type from the given class
val typedStream = table.toDataStream[Person](DataTypes.of(classOf[Person]))

Changelog Stream Conversion

Convert Tables to changelog DataStreams for handling streams with insert/update/delete operations.

/**
 * Convert Table to changelog DataStream
 * @return DataStream containing Row elements with changelog information
 */
def toChangelogStream: DataStream[Row]

/**
 * Convert Table to changelog DataStream with custom schema
 * @param targetSchema Custom schema for the output stream
 * @return DataStream containing Row elements with changelog information
 */
def toChangelogStream(targetSchema: Schema): DataStream[Row]

/**
 * Convert Table to changelog DataStream with custom schema and changelog mode
 * @param targetSchema Custom schema for the output stream
 * @param changelogMode Changelog mode configuration
 * @return DataStream containing Row elements with changelog information
 */
def toChangelogStream(targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row]

Usage Examples:

import org.apache.flink.types.{Row, RowKind}

// Convert to basic changelog stream
val changelogStream = table.toChangelogStream

// Process changelog stream
changelogStream.map { row =>
  val kind = row.getKind
  val name = row.getField(0).toString
  val age = row.getField(1).asInstanceOf[Int]
  
  kind match {
    case RowKind.INSERT => s"Added: $name, $age"
    case RowKind.UPDATE_AFTER => s"Updated: $name, $age"  
    case RowKind.DELETE => s"Deleted: $name, $age"
    case _ => s"Other: $name, $age"
  }
}

// Convert with custom schema
val targetSchema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val changelogStreamWithSchema = table.toChangelogStream(targetSchema)

// Convert with custom changelog mode
import org.apache.flink.table.connector.ChangelogMode

val changelogMode = ChangelogMode.insertOnly()
val changelogStreamWithMode = table.toChangelogStream(targetSchema, changelogMode)
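
ChangelogMode offers a small set of factory methods for the common modes; a quick sketch of the built-in ones (note that upsert() additionally expects the schema to declare a primary key):

```scala
import org.apache.flink.table.connector.ChangelogMode

val insertOnly = ChangelogMode.insertOnly() // INSERT only
val upsert     = ChangelogMode.upsert()     // INSERT, UPDATE_AFTER, DELETE (no UPDATE_BEFORE)
val all        = ChangelogMode.all()        // all four row kinds
```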

Legacy Methods

Deprecated methods that should not be used in new code:

/**
 * Convert to append-only stream (deprecated)
 * @deprecated Use toDataStream instead
 */
@deprecated("Use toDataStream", "1.18.0")
def toAppendStream[T: TypeInformation]: DataStream[T]

/**
 * Convert to retract stream (deprecated)  
 * @deprecated Use toChangelogStream instead
 */
@deprecated("Use toChangelogStream", "1.18.0")
def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)]

Migration Examples:

// Legacy approach (deprecated)
val appendStream = table.toAppendStream[Person]
val retractStream = table.toRetractStream[Person]

// Preferred approach
val insertOnlyStream = table.toDataStream(classOf[Person])
val changelogStream = table.toChangelogStream

// Process changelog stream to get retract-style tuples if needed
val retractStyleStream = changelogStream.map { row =>
  val isInsert = row.getKind == RowKind.INSERT || row.getKind == RowKind.UPDATE_AFTER
  val person = Person(row.getField(0).toString, row.getField(1).asInstanceOf[Int])
  (isInsert, person)
}

Insert-Only vs Changelog Streams

Insert-Only Streams

Use toDataStream methods when:

  • Your table only contains INSERT operations
  • You're working with bounded data or append-only streams
  • You don't need to handle updates or deletes

// Good for append-only scenarios
val insertOnlyTable = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 18")
val resultStream = insertOnlyTable.toDataStream(classOf[Person])

Changelog Streams

Use toChangelogStream methods when:

  • Your table may contain UPDATE or DELETE operations
  • You're working with aggregations or joins that produce updates
  • You need to handle the full lifecycle of data changes

// Good for aggregation scenarios
val aggregatedTable = tableEnv.sqlQuery("SELECT name, COUNT(*) as count FROM events GROUP BY name")
val changelogStream = aggregatedTable.toChangelogStream

changelogStream.map { row =>
  row.getKind match {
    case RowKind.INSERT => s"New group: ${row.getField(0)}"
    case RowKind.UPDATE_AFTER => s"Updated count for: ${row.getField(0)}"
    case _ => s"Other change for: ${row.getField(0)}"
  }
}
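
Nothing runs until the job is executed; a minimal sketch to observe the changelog entries, assuming the env and changelogStream from the examples above:

```scala
// Rows print with their kind as a prefix (e.g. +I[Alice, 1])
changelogStream.print()

// Conversions are lazy; execute() triggers the pipeline
env.execute("changelog-demo")
```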

Type Conversion Requirements

For successful Table to DataStream conversion:

  • Target types must be supported by Flink's type system
  • Schema compatibility between table and target type is required
  • Changelog mode compatibility must match the table's characteristics
  • Null handling should be considered for nullable fields
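
Before picking a target type, it can help to inspect the table's resolved schema; a minimal sketch using Table.getResolvedSchema:

```scala
// Prints column names, data types, and nullability, which can be
// checked against the intended target type before converting
println(table.getResolvedSchema)
```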

Common target types include:

  • Row: Universal type that can represent any table schema
  • Case classes: Type-safe conversion for structured data
  • Tuples: Simple conversion for tuple-like data
  • POJOs: Java bean-style classes with getters/setters
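
A short sketch of these target types side by side, assuming the table and the Person case class from the earlier examples:

```scala
// Row: universal, works for any schema
val rows: DataStream[Row] = table.toDataStream

// Case class: field names and types must match the table schema
val people: DataStream[Person] = table.toDataStream(classOf[Person])

// Tuple-style access by mapping over Rows (positional)
val pairs: DataStream[(String, Int)] =
  rows.map(r => (r.getField(0).toString, r.getField(1).asInstanceOf[Int]))
```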
