tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12

Table/SQL API bridge for Scala, enabling interaction between Table API and DataStream/DataSet APIs in Apache Flink


DataStream Conversions

The DataStreamConversions class provides utilities for converting DataStreams to Tables with various schema and conversion options. It is a fluent wrapper around a DataStream instance that makes table conversion available directly on the stream.

Capabilities

DataStreamConversions Construction

Wrapper class for DataStream conversion operations.

/**
 * Creates conversion utilities for a DataStream
 * @param dataStream The DataStream to provide conversion methods for
 */
class DataStreamConversions[T](dataStream: DataStream[T])

This class is typically accessed through implicit conversions from the package object:

import org.apache.flink.table.api.bridge.scala._

val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))
// dataStream now has conversion methods available via implicit conversion
val table = dataStream.toTable(tableEnv)

Table Conversion Methods

Convert DataStreams to Tables with automatic or custom schema derivation.

/**
 * Convert to Table with auto-derived schema
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @return Table representation of the DataStream
 */
def toTable(tableEnv: StreamTableEnvironment): Table

/**
 * Convert to Table with custom schema
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @param schema Custom schema definition
 * @return Table representation of the DataStream
 */
def toTable(tableEnv: StreamTableEnvironment, schema: Schema): Table

Usage Examples:

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Auto-derived schema
val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))
val table = dataStream.toTable(tableEnv)

// Custom schema
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val tableWithSchema = dataStream.toTable(tableEnv, schema)

Changelog Table Conversion

Convert changelog DataStreams to Tables for handling streams with insert/update/delete operations.

/**
 * Convert changelog DataStream to Table
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @return Table representation of the changelog stream
 */
def toChangelogTable(tableEnv: StreamTableEnvironment): Table

/**
 * Convert changelog DataStream to Table with custom schema
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @param schema Custom schema definition
 * @return Table representation of the changelog stream
 */
def toChangelogTable(tableEnv: StreamTableEnvironment, schema: Schema): Table

/**
 * Convert changelog DataStream to Table with custom schema and changelog mode
 * @param tableEnv The StreamTableEnvironment to use for conversion  
 * @param schema Custom schema definition
 * @param changelogMode Changelog mode configuration
 * @return Table representation of the changelog stream
 */
def toChangelogTable(tableEnv: StreamTableEnvironment, schema: Schema, changelogMode: ChangelogMode): Table

Usage Examples:

import org.apache.flink.types.{Row, RowKind}

// Changelog stream with Row elements
val changelogStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Integer.valueOf(25)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Integer.valueOf(26)),
  Row.ofKind(RowKind.DELETE, "Bob", Integer.valueOf(30))
)

// Convert to changelog table
val changelogTable = changelogStream.toChangelogTable(tableEnv)

// With custom schema
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val changelogTableWithSchema = changelogStream.toChangelogTable(tableEnv, schema)

// With a custom changelog mode; ChangelogMode.all() accepts inserts, updates,
// and deletes (insertOnly() would reject the UPDATE_AFTER/DELETE rows above)
val changelogMode = ChangelogMode.all()
val changelogTableWithMode = changelogStream.toChangelogTable(tableEnv, schema, changelogMode)

Temporary View Creation

Create temporary views from DataStreams for use in SQL queries.

/**
 * Create temporary view from the DataStream
 * @param tableEnv The StreamTableEnvironment to use for view creation
 * @param path The view path/name
 */
def createTemporaryView(tableEnv: StreamTableEnvironment, path: String): Unit

/**
 * Create temporary view from the DataStream with custom schema
 * @param tableEnv The StreamTableEnvironment to use for view creation
 * @param path The view path/name
 * @param schema Custom schema definition
 */
def createTemporaryView(tableEnv: StreamTableEnvironment, path: String, schema: Schema): Unit

Usage Examples:

val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))

// Create temporary view
dataStream.createTemporaryView(tableEnv, "users")

// Create temporary view with custom schema
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
dataStream.createTemporaryView(tableEnv, "users_with_schema", schema)

// Now you can query the views with SQL
val result = tableEnv.sqlQuery("SELECT * FROM users WHERE age > 25")
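Registered views participate in SQL like any other table. As a minimal sketch (assuming the `tableEnv` and the `users` and `users_with_schema` views created above), the query result can be executed and printed, and views can be joined with each other:

```scala
// Execute the query against the temporary view and print the result rows
val result = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 25")
result.execute().print()

// Temporary views can also be combined in a single SQL statement
val joined = tableEnv.sqlQuery(
  "SELECT u.name FROM users u JOIN users_with_schema s ON u.name = s.name")
```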

Legacy Methods

Deprecated methods that should not be used in new code:

/**
 * Convert with field expressions (legacy)
 * @deprecated Use toTable with Schema instead
 */
@deprecated("Use toTable with Schema instead", "1.18.0")
def toTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table

The legacy method allows specifying field expressions directly, but the new approach using Schema is preferred:

// Legacy approach (deprecated)
val legacyTable = dataStream.toTable(tableEnv, $"name", $"age")

// Preferred approach
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val preferredTable = dataStream.toTable(tableEnv, schema)

Type Requirements

For DataStreamConversions to work properly, the DataStream element type T must be one of:

  • Scala case classes (fields are public by default)
  • Scala Tuples (up to Tuple22)
  • Row types for changelog streams
  • POJOs with public fields and default constructor
  • Basic types (String, Int, Long, etc.)

The type information is automatically derived using Flink's TypeInformation system for Scala types.
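As an illustrative sketch of how the derived type information shows up in the resulting table (the case class `User` is hypothetical), a case class stream typically yields columns named after its fields, while a tuple stream falls back to positional names such as `_1` and `_2`:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

// Hypothetical element type for illustration
case class User(name: String, age: Int)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Case class: columns are derived from the field names ("name", "age")
val userTable = env.fromElements(User("Alice", 25), User("Bob", 30)).toTable(tableEnv)

// Tuple: columns default to positional names ("_1", "_2")
val tupleTable = env.fromElements(("Alice", 25), ("Bob", 30)).toTable(tableEnv)
```

Use a custom `Schema` (as shown earlier) when the auto-derived names or types are not what you want.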

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12
