tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12

Table/SQL API bridge for Scala, enabling interaction between Table API and DataStream/DataSet APIs in Apache Flink
Implicit Conversions

The Flink Table API Scala Bridge provides Scala-idiomatic implicit conversions for seamless integration between DataStream and Table APIs. These conversions are available through the package object and enable fluent, type-safe operations.

Available Implicit Conversions

All implicit conversions are provided through the org.apache.flink.table.api.bridge.scala package object and are automatically available when you import the package.

Import Statement

import org.apache.flink.table.api.bridge.scala._

This import makes all implicit conversions available in scope.
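Under the hood, the package object uses Scala's standard implicit-conversion pattern: an implicit def wraps a value in a class that adds the extra methods, and the compiler inserts the wrapping call whenever one of those methods is used. A minimal plain-Scala sketch of the same pattern, with no Flink dependency (the MiniTable/MiniTableOps names are hypothetical, for illustration only):

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for Table.
case class MiniTable(rows: List[String])

// Wrapper class adding fluent conversion methods, in the role of TableConversions.
class MiniTableOps(val table: MiniTable) {
  def toMiniStream: List[String] = table.rows
}

// The implicit def that the package-object import brings into scope.
implicit def miniTableOps(table: MiniTable): MiniTableOps =
  new MiniTableOps(table)

val t = MiniTable(List("Alice", "Bob"))

// toMiniStream is not a method on MiniTable; the compiler rewrites this
// to miniTableOps(t).toMiniStream automatically.
println(t.toMiniStream)  // prints List(Alice, Bob)
```

Importing `org.apache.flink.table.api.bridge.scala._` plays the same role as having `miniTableOps` in scope here: it is what makes the extra methods appear on Table and DataStream values.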

Capabilities

Table to TableConversions

Implicit conversion from Table to TableConversions for fluent DataStream conversion methods.

/**
 * Conversions from Table to DataStream
 * @param table The table to provide conversion methods for
 * @return TableConversions wrapper with conversion methods
 */
implicit def tableConversions(table: Table): TableConversions

Usage Example:

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val table = tableEnv.fromDataStream(dataStream)

// These methods are available via implicit conversion:
val rowStream = table.toDataStream        // Returns DataStream[Row]
val changelogStream = table.toChangelogStream  // Returns DataStream[Row]

// Type-safe conversion
case class Person(name: String, age: Int)
val personStream = table.toDataStream(classOf[Person])

Table to DataStream Direct Conversion

Direct implicit conversion from Table to changelog DataStream for the most common use case.

/**
 * Conversions from Table to DataStream of changelog entries
 * Provides direct conversion to changelog DataStream for convenience
 * @param table The table to convert
 * @return DataStream[Row] containing changelog entries
 */
implicit def tableToChangelogDataStream(table: Table): DataStream[Row]

Usage Example:

import org.apache.flink.table.api.bridge.scala._

val table = tableEnv.sqlQuery("SELECT name, COUNT(*) FROM events GROUP BY name")

// Direct implicit conversion to changelog stream
val changelogStream: DataStream[Row] = table

// Process the changelog stream
changelogStream.map { row =>
  s"${row.getKind}: ${row.getField(0)} -> ${row.getField(1)}"
}

Important Note: This conversion only works with Tables that are part of a Scala StreamTableEnvironment. It will throw a ValidationException if used with tables from other environments.
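The direct conversion works because Scala applies an in-scope implicit def from T to U wherever a T appears but a U is expected; the conversion function can also validate its input before converting, which is where the exception above comes from. A plain-Scala sketch of this shape, with no Flink dependency (all names hypothetical):

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for Table; `env` records which environment created it.
case class MiniTable(env: String, rows: List[String])

// Direct conversion: wherever a List[String] is expected, a MiniTable is
// accepted and converted on the fly. Like tableToChangelogDataStream, it
// validates the table's environment before converting.
implicit def miniTableToStream(table: MiniTable): List[String] = {
  require(table.env == "stream",
    s"table does not belong to a stream environment: ${table.env}")
  table.rows
}

val ok = MiniTable("stream", List("a", "b"))
val stream: List[String] = ok  // compiler inserts miniTableToStream(ok)

// MiniTable("batch", Nil) would fail the require check at conversion time,
// analogous to the ValidationException described above.
```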

DataStream to DataStreamConversions

Implicit conversion from DataStream to DataStreamConversions for fluent Table conversion methods.

/**
 * Conversions from DataStream to Table
 * @param set The DataStream to provide conversion methods for
 * @return DataStreamConversions wrapper with conversion methods
 */
implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T]

Usage Example:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.{Row, RowKind}

val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))

// These methods are available via implicit conversion:
val table = dataStream.toTable(tableEnv)
val tableWithSchema = dataStream.toTable(tableEnv, schema)
dataStream.createTemporaryView(tableEnv, "my_view")

// For changelog streams: Row.ofKind sets the change flag (RowKind)
// on the row instead of storing it as a field
val changelogStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Integer.valueOf(25))
)
val changelogTable = changelogStream.toChangelogTable(tableEnv)

Complete Usage Examples

Basic Conversion Example

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Create DataStream
val dataStream = env.fromElements(
  ("Alice", 25, "Engineer"),
  ("Bob", 30, "Manager"),
  ("Charlie", 35, "Developer")
)

// Convert to Table using implicit conversion
val table = dataStream.toTable(tableEnv)

// Apply table operations
val filteredTable = table.filter($"_2" > 28) // age > 28

// Convert back to DataStream using implicit conversion  
val resultStream = filteredTable.toDataStream

// Or direct implicit conversion to changelog stream
val changelogResult: DataStream[Row] = filteredTable

resultStream.print("Results")
env.execute("Implicit Conversions Example")

Advanced Schema Example

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

case class Employee(name: String, age: Int, department: String)

val dataStream = env.fromElements(
  Employee("Alice", 25, "Engineering"), 
  Employee("Bob", 30, "Marketing"),
  Employee("Charlie", 35, "Engineering")
)

// Define custom schema
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .column("department", DataTypes.STRING())
  .build()

// Convert with schema using implicit conversion
val table = dataStream.toTable(tableEnv, schema)

// Create temporary view using implicit conversion
dataStream.createTemporaryView(tableEnv, "employees", schema)

// Query the view
val engineeringTable = tableEnv.sqlQuery("""
  SELECT name, age 
  FROM employees 
  WHERE department = 'Engineering' AND age > 30
""")

// Convert back to typed DataStream using implicit conversion
val engineeringStream = engineeringTable.toDataStream(classOf[Employee])

engineeringStream.print("Engineering Results")
env.execute("Schema Example")

Changelog Processing Example

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.{Row, RowKind}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Create changelog stream
val changelogStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Integer.valueOf(25)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Integer.valueOf(26)),
  Row.ofKind(RowKind.DELETE, "Bob", Integer.valueOf(30)),
  Row.ofKind(RowKind.INSERT, "Charlie", Integer.valueOf(35))
)

// Convert to table using implicit conversion
val changelogTable = changelogStream.toChangelogTable(tableEnv)

// Apply aggregation (will produce more changelog events)
val aggregatedTable = changelogTable
  .select(($"f1" >= 30).as("age_group"), $"f0")  // derive the age >= 30 flag
  .groupBy($"age_group")
  .select($"age_group", $"f0".count.as("count"))

// Convert back to changelog stream using direct implicit conversion
val resultChangelogStream: DataStream[Row] = aggregatedTable

// Process changelog events  
resultChangelogStream.map { row =>
  val kind = row.getKind
  val ageGroup = row.getField(0).asInstanceOf[Boolean] 
  val count = row.getField(1).asInstanceOf[Long]
  
  val group = if (ageGroup) "30+" else "under 30"
  s"$kind: $group has $count people"
}.print("Changelog Results")

env.execute("Changelog Example")

Error Handling

ValidationException for Invalid Conversions

The direct tableToChangelogDataStream implicit conversion performs validation:

try {
  val invalidTable = batchTableEnv.fromValues(1, 2, 3) // Not from StreamTableEnvironment
  val stream: DataStream[Row] = invalidTable // This will throw ValidationException
} catch {
  case e: ValidationException =>
    println(s"Cannot convert table: ${e.getMessage}")
}

Type Safety

Implicit conversions maintain type safety:

val dataStream: DataStream[(String, Int)] = env.fromElements(("Alice", 25))
val table = dataStream.toTable(tableEnv) // Type information preserved

// This will work:
case class Person(name: String, age: Int)
val personStream = table.toDataStream(classOf[Person]) 

// This will fail at runtime if types don't match:
case class WrongType(id: Int, value: String)
// val wrongStream = table.toDataStream(classOf[WrongType]) // Runtime error

Best Practices

  1. Always import the package: import org.apache.flink.table.api.bridge.scala._
  2. Use type-safe conversions: Prefer strongly-typed conversions when possible
  3. Handle changelog appropriately: Use changelog streams for operations that produce updates
  4. Schema compatibility: Ensure target types match table schemas
  5. Environment consistency: Keep tables and streams within the same environment type

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12
