Flink Table API Scala Bridge

Overview

The Flink Table API Scala Bridge provides seamless integration between Apache Flink's Table/SQL API and the Scala-specific DataStream API. The bridge lets developers convert DataStreams to Tables and vice versa while leveraging Scala's type system and functional programming idioms.

Key Features:

  • Bidirectional conversion between DataStream and Table APIs
  • Support for both bounded and unbounded data processing
  • Integration with Flink's SQL engine for complex queries
  • Scala-idiomatic APIs with implicit conversions
  • Event-time processing and watermark propagation
  • Changelog stream processing for updating tables

⚠️ Deprecation Notice: All Flink Scala APIs are deprecated as of version 1.18.0 and will be removed in a future major version. Users should migrate to the Java APIs. See FLIP-265 for details.

Package Information

  • Package: org.apache.flink:flink-table-api-scala-bridge_2.12
  • Version: 1.20.2
  • Language: Scala 2.12
  • License: Apache-2.0

Installation

Add to your build.sbt:

// %% appends the Scala binary version suffix (_2.12) to the artifact name
libraryDependencies += "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.20.2"

Or in Maven pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>1.20.2</version>
</dependency>

Core Imports

// Essential imports for Table API and DataStream integration
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

// Common type imports
import org.apache.flink.types.Row
import org.apache.flink.table.types.DataType
import org.apache.flink.table.connector.ChangelogMode

Basic Usage

Environment Setup

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

// Create execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

DataStream to Table Conversion

case class User(name: String, age: Int)

val users: DataStream[User] = env.fromCollection(Seq(
  User("Alice", 25),
  User("Bob", 30)
))

// Convert DataStream to Table (automatic schema derivation)
val userTable: Table = tableEnv.fromDataStream(users)

// Or use implicit conversion
import org.apache.flink.table.api.bridge.scala._
val userTable2: Table = users.toTable(tableEnv)

Table to DataStream Conversion

// Convert Table back to DataStream
val resultStream: DataStream[Row] = tableEnv.toDataStream(userTable)

// Or use implicit conversion
val resultStream2: DataStream[Row] = userTable.toDataStream

SQL Operations

// Register table for SQL queries
tableEnv.createTemporaryView("users", userTable)

// Execute SQL query
val sqlResult: Table = tableEnv.sqlQuery(
  "SELECT name, age FROM users WHERE age > 25"
)

val sqlResultStream: DataStream[Row] = tableEnv.toDataStream(sqlResult)

Architecture

The Flink Table API Scala Bridge consists of several key components:

  • StreamTableEnvironment: Main entry point for table operations with DataStream integration
  • Conversion Utilities: Classes providing DataStream ↔ Table conversion methods
  • Implicit Conversions: Package-level implicits for seamless API integration
  • Statement Sets: Batch execution of multiple table operations
  • Schema System: Type-safe schema definitions and transformations

Capabilities

Environment and Setup

Configure and create StreamTableEnvironment instances with various settings and execution modes.

object StreamTableEnvironment {
  def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
  def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment
}
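
For example, a streaming-mode environment can be created with explicit settings. A minimal sketch, reusing the env value from Basic Usage:

import org.apache.flink.table.api.EnvironmentSettings

// Build settings explicitly; streaming mode is the default for this bridge
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val streamingTableEnv = StreamTableEnvironment.create(env, settings)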

DataStream Integration

Convert between DataStreams and Tables with automatic schema derivation and custom schema definitions.

trait StreamTableEnvironment {
  def fromDataStream[T](dataStream: DataStream[T]): Table
  def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table
  def toDataStream(table: Table): DataStream[Row]
  def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]
}
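
A sketch of conversion with an explicit schema, assuming the users stream and User case class from Basic Usage are in scope:

import org.apache.flink.table.api.{DataTypes, Schema}

// Explicit schema definition instead of automatic derivation
// (column names mirror the case class fields)
val usersWithSchema: Table = tableEnv.fromDataStream(
  users,
  Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build()
)

// Convert back into a typed stream of the original case class
val typedUsers: DataStream[User] = tableEnv.toDataStream(usersWithSchema, classOf[User])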

Changelog Processing

Handle updating tables and changelog streams for complex event processing scenarios.

trait StreamTableEnvironment {
  def fromChangelogStream(dataStream: DataStream[Row]): Table
  def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table
  def toChangelogStream(table: Table): DataStream[Row]
  def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row]
}
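
A minimal sketch that round-trips a hand-built changelog; Row.ofKind and RowKind come from org.apache.flink.types, and the row type information is passed explicitly because Row carries no compile-time field types:

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.types.{Row, RowKind}

// Hand-built changelog: two inserts followed by an update for "Alice"
val changelog: DataStream[Row] = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Int.box(25)),
  Row.ofKind(RowKind.INSERT, "Bob", Int.box(30)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(26))
)(Types.ROW(Types.STRING, Types.INT))

// Interpret the stream as an updating table
val changelogTable = tableEnv.fromChangelogStream(changelog)

// Emit it again as a changelog stream; each Row carries its RowKind
tableEnv.toChangelogStream(changelogTable).print()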

Table Operations

Create, register, and manage tables and views within the table environment.

trait StreamTableEnvironment {
  def createTemporaryView[T](path: String, dataStream: DataStream[T]): Unit
  def createTemporaryView[T](path: String, dataStream: DataStream[T], schema: Schema): Unit
}
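
For instance, reusing the users stream from Basic Usage:

// Register the DataStream directly as a temporary view ...
tableEnv.createTemporaryView("Users", users)

// ... which is then addressable from SQL
val adults: Table = tableEnv.sqlQuery("SELECT name FROM Users WHERE age >= 18")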

Statement Sets

Batch multiple table operations together for optimized execution planning.

trait StreamTableEnvironment {
  def createStatementSet(): StreamStatementSet
}

trait StreamStatementSet {
  def addInsert(targetPath: String, table: Table): StreamStatementSet
  def addInsertSql(statement: String): StreamStatementSet
  def execute(): TableResult
}
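
A sketch of batching two inserts into a single job; the sink tables OutputA and OutputB are assumed to have been created elsewhere (e.g. via executeSql with CREATE TABLE statements):

val stmtSet = tableEnv.createStatementSet()

// Both inserts are planned and submitted together as one Flink job
stmtSet.addInsert("OutputA", userTable)
stmtSet.addInsertSql("INSERT INTO OutputB SELECT name FROM Users WHERE age > 25")

stmtSet.execute()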

Implicit Conversions

Package-level implicit conversions for seamless integration between DataStream and Table APIs.

package object scala {
  implicit def tableConversions(table: Table): TableConversions
  implicit def dataStreamConversions[T](dataStream: DataStream[T]): DataStreamConversions[T]
  implicit def tableToChangelogDataStream(table: Table): DataStream[Row]
}
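
With the package object in scope, conversions read fluently. A sketch, assuming the TableConversions wrapper exposes toChangelogStream alongside toDataStream (mirroring tableToChangelogDataStream above):

import org.apache.flink.table.api.bridge.scala._

// DataStream -> Table via DataStreamConversions
val convertedTable: Table = users.toTable(tableEnv)

// Table -> DataStream[Row] via TableConversions
val rowStream: DataStream[Row] = convertedTable.toDataStream

// Table -> changelog stream, each Row tagged with its RowKind
val changeStream: DataStream[Row] = convertedTable.toChangelogStream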

Common Types

// Core Flink types
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.Table
import org.apache.flink.types.Row

// Schema and type system
import org.apache.flink.table.api.Schema
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.AbstractDataType

// Execution environment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings

// Changelog processing
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.RowKind

Error Handling

The bridge API throws several types of exceptions:

  • ValidationException: Invalid operations or incompatible types
  • TableException: General table processing errors
  • UnsupportedOperationException: Operations not supported in streaming mode

import org.apache.flink.table.api.{TableException, ValidationException}

try {
  val table = tableEnv.fromDataStream(users)
  val result = tableEnv.toDataStream(table)
} catch {
  case e: ValidationException => // Handle invalid operations or incompatible types
  case e: TableException => // Handle general table processing errors
}

Migration Guide

Since the Scala bridge is deprecated, consider migrating to the Java bridge API:

// Java equivalent
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);