tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12

Table/SQL API bridge for Scala, enabling interaction between the Table API and the DataStream API in Apache Flink

Describes: pkg:maven/org.apache.flink/flink-table-api-scala-bridge_2.12@2.1.x (maven)

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12@2.1.0


Flink Table API Scala Bridge

The Flink Table API Scala Bridge provides seamless integration between Apache Flink's Table/SQL API and the DataStream API for Scala developers. The bridge enables bidirectional conversion between DataStreams and Tables, allowing developers to mix declarative Table/SQL operations with procedural stream processing in a single Scala application.

Package Information

  • Package Name: flink-table-api-scala-bridge_2.12
  • Package Type: maven
  • Language: Scala
  • Maven Coordinates: org.apache.flink:flink-table-api-scala-bridge_2.12:2.1.0
  • Installation: Add to your pom.xml dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>2.1.0</version>
</dependency>
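
For sbt-based builds, the equivalent dependency would be the following (assuming scalaVersion is set to a 2.12.x release so that %% resolves to the _2.12 artifact):

libraryDependencies += "org.apache.flink" %% "flink-table-api-scala-bridge" % "2.1.0"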

Core Imports

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

Basic Usage

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

// Create execution environment and table environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Create a DataStream
val dataStream = env.fromElements(
  ("Alice", 25, "Engineer"),
  ("Bob", 30, "Manager"),
  ("Charlie", 35, "Developer")
)

// Convert DataStream to Table
val table = tableEnv.fromDataStream(dataStream, $"name", $"age", $"role")

// Perform SQL operations
val filteredTable = table.filter($"age" > 28)

// Convert back to DataStream
val resultStream = tableEnv.toDataStream(filteredTable)

// Add a sink (print) so the pipeline has an operator to run, then execute
resultStream.print()
env.execute("Table Bridge Example")

Architecture

The Flink Table API Scala Bridge is built around several key components:

  • StreamTableEnvironment: Entry point for creating and managing table environments in streaming contexts
  • Conversion Classes: DataStreamConversions and TableConversions for seamless type-safe conversions
  • Statement Management: StreamStatementSet for batching multiple table operations for optimized execution
  • Implicit Conversions: Scala-idiomatic conversion utilities available through the package object
  • Schema Support: Flexible schema definition and type mapping between Scala types and table schemas

Capabilities

Stream Table Environment

Core table environment for streaming applications, providing the entry point for all Table/SQL API operations integrated with DataStream processing.

trait StreamTableEnvironment extends TableEnvironment

object StreamTableEnvironment {
  def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
  def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment
}
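
As a minimal sketch of both factory methods (the explicit settings shown here simply mirror the streaming-mode default of the single-argument overload):

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Default creation: settings are derived from the execution environment
val tableEnv = StreamTableEnvironment.create(env)

// Explicit creation with EnvironmentSettings
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val configuredTableEnv = StreamTableEnvironment.create(env, settings)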

See docs/stream-table-environment.md for details.

DataStream to Table Conversions

Convert DataStreams to Tables with automatic or custom schema derivation, supporting both regular streams and changelog streams.

class DataStreamConversions[T](dataStream: DataStream[T]) {
  def toTable(tableEnv: StreamTableEnvironment): Table
  def toTable(tableEnv: StreamTableEnvironment, schema: Schema): Table
  def toChangelogTable(tableEnv: StreamTableEnvironment): Table
}
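
In practice these methods are called through the implicit conversion on DataStream rather than by instantiating DataStreamConversions directly. A minimal sketch (the User case class is hypothetical, introduced for illustration):

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

case class User(name: String, age: Int)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
val users = env.fromElements(User("Alice", 25), User("Bob", 30))

// Automatic schema derivation: columns `name` and `age` come from the case class
val usersTable = users.toTable(tableEnv)

// Custom schema: physical columns are still derived automatically, and a
// computed processing-time column is appended
val enrichedTable = users.toTable(
  tableEnv,
  Schema.newBuilder()
    .columnByExpression("proc_time", "PROCTIME()")
    .build()
)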

See docs/datastream-conversions.md for details.

Table to DataStream Conversions

Convert Tables back to DataStreams with support for different output modes including insert-only and full changelog streams.

class TableConversions(table: Table) {
  def toDataStream: DataStream[Row]
  def toDataStream[T](targetClass: Class[T]): DataStream[T]
  def toChangelogStream: DataStream[Row]
  def toChangelogStream(targetSchema: Schema): DataStream[Row]
}
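
The distinction between the two modes matters as soon as a query produces updates. A minimal sketch (table and column names are illustrative):

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

case class Click(name: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

val clicks = env.fromElements(Click("Alice"), Click("Bob"), Click("Alice"))
tableEnv.createTemporaryView("clicks", clicks.toTable(tableEnv))

// Insert-only result: a simple projection never updates earlier rows
val allClicks: DataStream[Row] =
  tableEnv.sqlQuery("SELECT * FROM clicks").toDataStream

// Updating result: GROUP BY emits updates, so toDataStream would fail here
// and a changelog stream is required
val counts: DataStream[Row] = tableEnv
  .sqlQuery("SELECT name, COUNT(*) AS cnt FROM clicks GROUP BY name")
  .toChangelogStream

counts.print()
env.execute("Changelog Example")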

See docs/table-conversions.md for details.

Statement Set Operations

Batch multiple table operations together for optimized execution and resource management.

trait StreamStatementSet extends StatementSet {
  def add(tablePipeline: TablePipeline): StreamStatementSet
  def addInsertSql(statement: String): StreamStatementSet
  def addInsert(targetPath: String, table: Table): StreamStatementSet
  def attachAsDataStream(): Unit
}
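
A minimal sketch of batching two inserts into a single optimized job and attaching it to the surrounding DataStream program (the print-connector sinks and all table names are illustrative):

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

val words = env.fromElements("hello", "world")
tableEnv.createTemporaryView("words", words.toTable(tableEnv))

// Two sinks backed by the built-in print connector; an atomic String stream
// is exposed as a single column named f0
val sinkSchema = Schema.newBuilder().column("f0", DataTypes.STRING()).build()
tableEnv.createTemporaryTable("SinkA",
  TableDescriptor.forConnector("print").schema(sinkSchema).build())
tableEnv.createTemporaryTable("SinkB",
  TableDescriptor.forConnector("print").schema(sinkSchema).build())

// Batch both inserts so they are optimized and executed together
val statementSet = tableEnv.createStatementSet()
statementSet.addInsertSql("INSERT INTO SinkA SELECT * FROM words")
statementSet.addInsert("SinkB", tableEnv.from("words"))

// Attach the pipelines to the StreamExecutionEnvironment instead of running
// them immediately; they execute as part of env.execute()
statementSet.attachAsDataStream()
env.execute("Statement Set Example")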

See docs/statement-sets.md for details.

Implicit Conversions

Scala-idiomatic implicit conversions for seamless integration between DataStream and Table APIs.

// Available implicit conversions from package object
implicit def tableConversions(table: Table): TableConversions
implicit def tableToChangelogDataStream(table: Table): DataStream[Row]  
implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T]
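
With the bridge package object in scope, these implicits let DataStream and Table values be used interchangeably. A minimal sketch (the Sensor case class is hypothetical):

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

case class Sensor(id: String, temperature: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
val readings = env.fromElements(Sensor("s1", 20.5), Sensor("s2", 31.0))

// dataStreamConversions: Table API entry point directly on the stream
val hot: Table = readings.toTable(tableEnv).filter($"temperature" > 25.0)

// tableConversions: toDataStream / toChangelogStream directly on the Table
val hotStream: DataStream[Row] = hot.toDataStream

hotStream.print()
env.execute("Implicit Conversions Example")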

See docs/implicit-conversions.md for details.

Types

// Core Flink types used throughout the API
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.Schema
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.types.AbstractDataType
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableDescriptor, TablePipeline, ExplainDetail}
import org.apache.flink.types.RowKind

Deprecation Notice

Important: All APIs in this module are deprecated as of Flink 1.18.0 (FLIP-265) and will be removed in a future major version. Consider migrating to the Java-based Table API as documented in the Flink migration guide.