tessl/maven-org-apache-flink--flink-table-api-scala_2-12

Scala API for Apache Flink's Table & SQL ecosystem with type-safe bindings and comprehensive support for Scala-specific types

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-table-api-scala_2.12@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-api-scala_2-12@2.1.0


Apache Flink Scala Table API

The Apache Flink Scala Table API provides Scala-specific bindings for Flink's Table & SQL ecosystem. It enables type-safe Scala programming with implicit conversions, operator overloading, and comprehensive support for Scala-specific types like case classes, Option, Either, and collections.

⚠️ Deprecation Notice: All Flink Scala APIs are deprecated as of Flink 1.18.0 (FLIP-265) and will be removed in a future major version. Users should migrate to the Java Table API while continuing to use Scala as their application language.

Package Information

  • Package Name: flink-table-api-scala_2.12
  • Package Type: maven
  • Language: Scala (2.12)
  • Group ID: org.apache.flink
  • Installation: Add to pom.xml:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala_2.12</artifactId>
      <version>2.1.0</version>
    </dependency>
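
For sbt-based projects, an equivalent declaration would look roughly like the following (a sketch; it assumes scalaVersion is set to a 2.12.x release so that %% resolves to the _2.12 artifact):

libraryDependencies += "org.apache.flink" %% "flink-table-api-scala" % "2.1.0"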

Core Imports

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

The first import provides access to:

  • Implicit conversions from Scala literals to Expressions
  • Expression operations and operators
  • Type information creation macros
  • Comprehensive type system for Scala types

The bridge import adds:

  • DataStream integration capabilities
  • Conversion utilities between Table and DataStream
  • StreamTableEnvironment for streaming applications

Basic Usage

import org.apache.flink.table.api._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

// Create execution environment and table environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// Create table from data with case class
case class Order(id: Int, product: String, amount: Double)
val orders = env.fromElements(
  Order(1, "laptop", 999.99),
  Order(2, "mouse", 29.99)
)

val ordersTable = tEnv.fromDataStream(orders)

// Use Scala-specific syntax with implicit conversions
val result = ordersTable
  .select($"id", $"product", $"amount" * 1.1 as "amountWithTax")
  .where($"amount" > 50.0)

// Convert back to DataStream
val resultStream = tEnv.toDataStream(result)
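
To actually run the pipeline, the resulting stream still has to be attached to a sink and the job submitted. A minimal sketch (printing to stdout; the job name is arbitrary):

// print results and submit the job
resultStream.print()
env.execute("orders-example")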

Architecture

The Flink Scala Table API is built around several key components:

  • Implicit Conversions: Seamless conversion between Scala values and Flink expressions using trait mixins
  • Expression DSL: Rich domain-specific language with operator overloading for natural Scala syntax
  • Type System: Macro-based type information generation for compile-time type safety
  • Scala Type Support: Comprehensive support for Option, Either, Try, case classes, and collections
  • Serialization Layer: Efficient Kryo-based serialization for all Scala types
  • Field Access: Symbol-based ('field) and string-interpolation-based ($"field") field references (see the short example below)
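
For example, the same column of the ordersTable defined above can be referenced either way:

ordersTable.select('product)    // Symbol-based field reference
ordersTable.select($"product")  // String-interpolation-based field reference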

Capabilities

Expression and Conversion System

Core expression creation and implicit conversions that enable natural Scala syntax for table operations. Includes literal conversions, field references, and operator overloading.

trait ImplicitExpressionConversions {
  // Constants for window operations
  implicit val UNBOUNDED_ROW: Expression
  implicit val UNBOUNDED_RANGE: Expression  
  implicit val CURRENT_ROW: Expression
  implicit val CURRENT_RANGE: Expression
  
  // Field reference creation
  def $(name: String): Expression
  def col(name: String): Expression
  
  // Literal creation
  def lit(v: Any): Expression
  def lit(v: Any, dataType: DataType): Expression
  
  // Function calls
  def call(path: String, params: Expression*): Expression
  def call(function: UserDefinedFunction, params: Expression*): Expression
  def call(function: Class[_ <: UserDefinedFunction], params: Expression*): Expression
  def callSql(sqlExpression: String): Expression
}

// Implicit classes for expression operations
implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations
implicit class UnresolvedFieldExpression(s: Symbol) extends ImplicitExpressionOperations
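
A short sketch of these conversions applied to the ordersTable from the Basic Usage example (UPPER is a standard built-in function name used for illustration):

// field references, literals, and a call by function name
val withTax = ordersTable
  .select(col("id"), $"product", ($"amount" * lit(1.1)) as "amountWithTax")
val shouted = ordersTable
  .select(call("UPPER", $"product") as "productUpper")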

Expressions and Conversions

Type System and Type Information

Comprehensive type system providing TypeInformation for all Scala types including case classes, collections, Option, Either, and Try. Uses macros for automatic type inference.

object Types {
  // Generic type creation
  def of[T: TypeInformation]: TypeInformation[T]
  
  // Scala-specific types
  val UNIT: TypeInformation[Unit]
  val NOTHING: TypeInformation[Nothing]
  
  // Factory methods for complex types
  def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
  def TUPLE[T: TypeInformation]: TypeInformation[T]
  def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]
  def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
  def TRY[A, T <: Try[A]](valueType: TypeInformation[A]): TypeInformation[T]
  
  // Collection types
  def TRAVERSABLE[T: TypeInformation]: TypeInformation[T]
  def OBJECT_ARRAY[E <: AnyRef](elementType: TypeInformation[E]): TypeInformation[Array[E]]
}

trait ImplicitTypeConversions {
  // Macro-based type inference
  implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
  implicit val scalaNothingTypeInfo: TypeInformation[Nothing]
}
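
A sketch of how type information is usually obtained, reusing the Order case class from the Basic Usage example:

import org.apache.flink.api.common.typeinfo.TypeInformation

// Macro-based inference for a case class
val orderInfo: TypeInformation[Order] = createTypeInformation[Order]

// Scala wrapper types are inferred as well
val optionInfo: TypeInformation[Option[Double]] = createTypeInformation[Option[Double]]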

Type System

Expression Operations and Operators

Rich set of expression operations including arithmetic, comparison, logical operators, and specialized operations for table transformations. Provides natural Scala operator syntax.

trait ImplicitExpressionOperations {
  // Field aliasing
  def as(name: Symbol, extraNames: Symbol*): Expression
  
  // Comparison operators
  def >(other: Expression): Expression
  def >=(other: Expression): Expression
  def <(other: Expression): Expression
  def <=(other: Expression): Expression
  def ===(other: Expression): Expression
  def !==(other: Expression): Expression
  
  // Logical operators
  def &&(other: Expression): Expression
  def ||(other: Expression): Expression
  def unary_!: Expression
  
  // Arithmetic operators
  def +(other: Expression): Expression
  def -(other: Expression): Expression
  def *(other: Expression): Expression
  def /(other: Expression): Expression
  def %(other: Expression): Expression
  def unary_-: Expression
  def unary_+: Expression
  
  // Specialized operations
  def to(other: Expression): Expression  // Range for column selection
  def ?(ifTrue: Expression, ifFalse: Expression): Expression  // Ternary conditional
  def rows: Expression  // Row interval for windowing
}
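
Applied to the ordersTable from the Basic Usage example, these operators compose naturally (a sketch):

val flagged = ordersTable.select(
  $"id",
  ($"amount" * 1.1 + 5.0) as "amountWithFees",
  ($"amount" > 500.0 && $"product" === lit("laptop")) as "isBigLaptopOrder",
  (($"amount" > 100.0) ? (lit("large"), lit("small"))) as "sizeLabel"
)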

Expression Operations

Built-in Functions Library

Extensive collection of built-in functions for date/time operations, mathematical calculations, string manipulation, JSON processing, and utility operations.

// Date and time functions
def currentDate(): Expression
def currentTime(): Expression  
def currentTimestamp(): Expression
def localTime(): Expression
def localTimestamp(): Expression

// Mathematical functions
def pi(): Expression
def e(): Expression
def rand(): Expression
def randInteger(bound: Expression): Expression
def atan2(y: Expression, x: Expression): Expression
def log(base: Expression, antilogarithm: Expression): Expression
def exp(base: Expression): Expression
def power(base: Expression, exponent: Expression): Expression
def mod(numeric1: Expression, numeric2: Expression): Expression

// String functions  
def concat(string: Expression*): Expression
def concatWs(separator: Expression, string: Expression*): Expression
def uuid(): Expression
def upper(string: Expression): Expression
def lower(string: Expression): Expression
def length(string: Expression): Expression
def position(string: Expression, substring: Expression): Expression

// JSON functions
def jsonString(string: Expression): Expression
def jsonObject(keyValue: Expression*): Expression
def jsonArray(element: Expression*): Expression
def jsonValue(jsonString: Expression, path: Expression): Expression
def jsonQuery(jsonString: Expression, path: Expression): Expression

// Utility functions
def nullOf(dataType: DataType): Expression
def ifThenElse(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression
def coalesce(expr: Expression*): Expression
def isNull(expr: Expression): Expression
def isNotNull(expr: Expression): Expression
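
A sketch combining a few of these functions on the ordersTable from above:

val enriched = ordersTable.select(
  $"id",
  concat($"product", lit(" (order)")) as "label",
  currentTimestamp() as "processedAt",
  ifThenElse($"amount" > 100.0, lit("large"), lit("small")) as "sizeLabel",
  coalesce($"product", lit("unknown")) as "safeProduct"
)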

Built-in Functions

Scala Type Information Classes

Specialized TypeInformation implementations for Scala-specific types providing efficient serialization and type handling for case classes, Option, Either, Try, and collections.

// Case class type information
abstract class CaseClassTypeInfo[T](
  clazz: Class[T],
  typeParamTypeInfos: Array[TypeInformation[_]],
  fieldTypes: Seq[TypeInformation[_]],
  fieldNames: Seq[String]
) extends TypeInformation[T] {
  def getFieldNames: Array[String]
  def getFieldIndex(fieldName: String): Int
  def getFieldIndices(fields: Array[String]): Array[Int]
  def getTypeAt[X](fieldExpression: String): TypeInformation[X]
  def getFlatFields(): List[FlatFieldDescriptor]
}

// Option type information
class OptionTypeInfo[A, T <: Option[A]](elemTypeInfo: TypeInformation[A]) 
  extends TypeInformation[T]

// Either type information  
class EitherTypeInfo[A, B, T <: Either[A, B]](
  clazz: Class[T], 
  leftTypeInfo: TypeInformation[A], 
  rightTypeInfo: TypeInformation[B]
) extends TypeInformation[T]

// Try type information
class TryTypeInfo[A, T <: Try[A]](valueType: TypeInformation[A]) 
  extends TypeInformation[T]

// Collection type information
abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
  clazz: Class[T], 
  elementTypeInfo: TypeInformation[E]
) extends TypeInformation[T]

// Enumeration type information
class EnumValueTypeInfo[E <: Enumeration](enum: E, clazz: Class[E#Value]) 
  extends TypeInformation[E#Value] with AtomicType[E#Value]
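
As a sketch, the macro-derived type information for a case class can be inspected through CaseClassTypeInfo (the import path shown is the classic flink-scala location and may differ in this packaging; the pattern match is purely for illustration):

import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo

// Order is the case class from the Basic Usage example
createTypeInformation[Order] match {
  case cc: CaseClassTypeInfo[_] =>
    println(cc.getFieldNames.mkString(", "))  // id, product, amount
    println(cc.getFieldIndex("amount"))       // 2
  case other =>
    println(s"unexpected type information: $other")
}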

Type Information Classes

DataStream Integration (Bridge API)

Integration layer between Table API and DataStream API enabling seamless conversion between Table and DataStream objects with streaming-specific operations.

trait StreamTableEnvironment extends TableEnvironment {
  // DataStream to Table conversion
  def fromDataStream[T](dataStream: DataStream[T]): Table
  def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table
  
  // Table to DataStream conversion  
  def toDataStream(table: Table): DataStream[Row]
  def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]
  def toChangelogStream(table: Table): DataStream[Row]
}

// Implicit conversion classes
class TableConversions(table: Table)
class DataStreamConversions[T](dataStream: DataStream[T])
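
Aggregating queries produce updating results, so they are converted with toChangelogStream rather than toDataStream; a sketch building on the ordersTable from the Basic Usage example:

import org.apache.flink.types.Row

// per-product totals are continuously updated, hence a changelog stream
val totals = ordersTable
  .groupBy($"product")
  .select($"product", $"amount".sum as "totalAmount")

val changelog: DataStream[Row] = tEnv.toChangelogStream(totals)
changelog.print()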

DataStream Integration

Runtime Serialization Support

Kryo-based serialization configuration and specialized serializers for efficient handling of all Scala types including collections, tuples, and special types in distributed Flink execution.

// Kryo configuration for Scala types
class FlinkScalaKryoInstantiator extends KryoInstantiator {
  def newKryo: Kryo  // Pre-configured with Scala serializers
}

// Specialized serializers for runtime efficiency
class CaseClassSerializer[T <: Product](/* parameters */) extends TypeSerializer[T]
class OptionSerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Option[A]]
class EitherSerializer[A, B](/* parameters */) extends TypeSerializer[Either[A, B]]
class TrySerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Try[A]]
class TraversableSerializer[T <: TraversableOnce[E], E](/* parameters */) extends TypeSerializer[T]
class EnumValueSerializer[E <: Enumeration](/* parameters */) extends TypeSerializer[E#Value]

Runtime Serialization

Migration Guide

Since all Scala APIs are deprecated, consider these migration approaches:

  1. Immediate: Continue using Scala APIs with deprecation warnings
  2. Hybrid: Use Java Table API with Scala DataStream API
  3. Full Migration: Move to Java Table API entirely

The Java Table API provides equivalent functionality without Scala-specific syntax conveniences.
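
For the hybrid approach, the Java expression DSL can be called directly from Scala; a sketch (the statically imported $ is renamed to avoid clashing with the Scala string interpolator):

// Java Table API expressions used from Scala code
import org.apache.flink.table.api.Expressions.{$ => field, lit}

val taxed = ordersTable
  .select(field("id"), field("amount").times(lit(1.1)).as("amountWithTax"))
  .where(field("amount").isGreater(lit(50.0)))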

Common Patterns

Case Class Integration

case class User(id: Int, name: String, email: Option[String])
val users = env.fromElements(User(1, "alice", Some("alice@example.com")), User(2, "bob", None))
val userTable = tEnv.fromDataStream(users)  // Automatic type inference, including the Option field

Option Type Handling

val result = userTable.select(($"email".isNotNull ? ($"email", lit("N/A"))) as "emailDisplay")

Expression Chaining

// table is assumed to have category, amount, and tax columns
val processed = table
  .select($"category", $"amount" * 1.1 + $"tax" as "total")
  .where($"total" > 100.0)
  .groupBy($"category")
  .select($"category", $"total".sum as "categoryTotal")