Files

docs/
  index.md
  sources-sinks.md
  sql-integration.md
  table-environment.md
  table-operations.md
  type-system.md
  user-defined-functions.md
  window-operations.md
tile.json

tessl/maven-org-apache-flink--flink-table-2-11

Apache Flink Table API for SQL-like operations on streaming and batch data

Describes: pkg:maven/org.apache.flink/flink-table_2.11@1.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-2-11@1.5.0


Flink Table API

Apache Flink Table API provides a high-level declarative API for both stream and batch processing that supports SQL-like queries and operations. It offers a unified programming model allowing developers to write queries using either the Table API (language-embedded query API for Scala and Java) or SQL, enabling operations like filtering, joining, aggregating, and windowing on structured data streams and datasets.

Package Information

  • Package Name: flink-table_2.11
  • Package Type: maven
  • Language: Scala
  • Installation: Maven coordinate org.apache.flink:flink-table_2.11:1.5.1
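
For sbt-based Scala projects, the same coordinate can be declared as follows (a minimal sketch; the flink-scala and flink-streaming-scala artifacts are assumed here only to provide the DataSet/DataStream APIs used in the examples below):

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-table_2.11" % "1.5.1",
  "org.apache.flink" % "flink-scala_2.11" % "1.5.1",           // batch (DataSet) API
  "org.apache.flink" % "flink-streaming-scala_2.11" % "1.5.1"  // streaming (DataStream) API
)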

Core Imports

Scala:

import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._

Java:

import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;

Basic Usage

Scala Batch Example:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val input: DataSet[(String, Int)] = env.fromElements(
  ("Hello", 2), ("Hello", 5), ("Ciao", 3)
)

val result = input
  .toTable(tEnv, 'word, 'count)
  .groupBy('word)
  .select('word, 'count.avg)

result.toDataSet[Row].print() // Table has no print(); convert to a DataSet first

Scala Streaming Example:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val input: DataStream[(String, Int)] = env.fromElements(
  ("Hello", 2), ("Hello", 5), ("Ciao", 3)
)

val result = input
  .toTable(tEnv, 'word, 'count)
  .select('word, 'count * 2)

tEnv.toAppendStream[Row](result).print()

env.execute() // streaming jobs only run once the environment is executed

Architecture

The Flink Table API is built around several key components:

  • TableEnvironment: Main entry point providing table registration, SQL execution, and configuration
  • Table: Core abstraction representing relational data with fluent query operations
  • Type System: Rich type definitions supporting primitive, complex, and temporal types
  • Expression System: Type-safe expression building for queries and transformations
  • Source/Sink Integration: Pluggable connectors for external data systems
  • Function Framework: User-defined scalar, table, and aggregate functions
  • SQL Integration: Apache Calcite-based SQL parser and optimizer

Capabilities

Table Environment Management

Central management for table operations, SQL execution, and resource configuration. Essential for initializing both batch and streaming table environments.

abstract class TableEnvironment {
  def getConfig: TableConfig
  def scan(tablePath: String*): Table
  def fromTableSource(source: TableSource[_]): Table
  def registerTable(name: String, table: Table): Unit
  def registerTableSource(name: String, tableSource: TableSource[_]): Unit
  def registerFunction(name: String, function: ScalarFunction): Unit
  def sqlQuery(query: String): Table
  def sqlUpdate(stmt: String): Unit
  def listTables(): Array[String]
  def explain(table: Table): String
}

object TableEnvironment {
  def getTableEnvironment(executionEnvironment: ExecutionEnvironment): BatchTableEnvironment
  def getTableEnvironment(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
}
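
A minimal sketch of the typical lifecycle, assuming a streaming job and an already-built wordsTable with illustrative fields word and cnt:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// Register the table under a name so SQL queries can reference it
tEnv.registerTable("Words", wordsTable)

// Run a SQL query against the registered table and inspect the plan
val frequent = tEnv.sqlQuery("SELECT word FROM Words WHERE cnt > 10")
println(tEnv.explain(frequent))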

Table Environment

Table Operations and Queries

Core table abstraction providing SQL-like operations for data transformation, filtering, aggregation, and joining.

class Table {
  def select(fields: Expression*): Table
  def select(fields: String): Table
  def filter(predicate: Expression): Table
  def where(predicate: Expression): Table
  def groupBy(fields: Expression*): GroupedTable
  def orderBy(fields: Expression*): Table
  def distinct(): Table
  def join(right: Table): Table
  def join(right: Table, joinPredicate: Expression): Table
  def leftOuterJoin(right: Table, joinPredicate: Expression): Table
  def union(right: Table): Table
  def window(window: Window): WindowedTable
  def as(fields: Expression*): Table
  def getSchema: TableSchema
  def insertInto(tableName: String): Unit
}
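
A sketch of chaining these operations, assuming orders and customers tables with the fields used below (all names are illustrative):

val result = orders
  .join(customers, 'customerId === 'id)    // inner join on a predicate
  .filter('amount > 100)                   // keep only large orders
  .groupBy('country)
  .select('country, 'amount.sum as 'total) // aggregate per group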

Table Operations

Type System and Schema Management

Rich type system supporting primitive, complex, and temporal types with schema definition and validation.

object Types {
  val STRING: TypeInformation[String]
  val BOOLEAN: TypeInformation[java.lang.Boolean] 
  val INT: TypeInformation[java.lang.Integer]
  val LONG: TypeInformation[java.lang.Long]
  val DOUBLE: TypeInformation[java.lang.Double]
  val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
  def ROW(types: TypeInformation[_]*): TypeInformation[Row]
  def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row]
  def MAP[K, V](keyType: TypeInformation[K], valueType: TypeInformation[V]): TypeInformation[java.util.Map[K, V]]
}

class TableSchema {
  def getFieldNames: Array[String]
  def getFieldTypes: Array[TypeInformation[_]]
}
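
For example, a row schema for (name, age) records can be declared with the ROW constructor shown above (the field names are illustrative):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.types.Row

val personType: TypeInformation[Row] = Types.ROW(
  Array("name", "age"),
  Array[TypeInformation[_]](Types.STRING, Types.INT)
)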

Type System

User-Defined Functions

Framework for creating custom scalar, table, and aggregate functions with lifecycle management and context access.

abstract class UserDefinedFunction {
  def open(context: FunctionContext): Unit
  def close(): Unit
  def isDeterministic: Boolean
}

abstract class ScalarFunction extends UserDefinedFunction {
  def getResultType(signature: Array[Class[_]]): TypeInformation[_]
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]]
}

abstract class TableFunction[T] extends UserDefinedFunction {
  protected def collect(result: T): Unit
}

abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
  def createAccumulator(): ACC
  def getValue(accumulator: ACC): T
}
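
A minimal scalar function sketch; Flink discovers the evaluation method by the conventional name eval (the class and registered name here are illustrative):

import org.apache.flink.table.functions.ScalarFunction

class HashCode extends ScalarFunction {
  // eval is resolved by reflection; add one method per supported signature
  def eval(s: String): Int = s.hashCode
}

tEnv.registerFunction("hashCode", new HashCode)
val hashed = tEnv.sqlQuery("SELECT word, hashCode(word) FROM Words")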

User-Defined Functions

Data Sources and Sinks

Pluggable interfaces for integrating external data systems with support for projection and filter pushdown.

trait TableSource[T] {
  def getReturnType: TypeInformation[T]
  def getTableSchema: TableSchema
  def explainSource(): String
}

trait BatchTableSource[T] extends TableSource[T] {
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}

trait StreamTableSource[T] extends TableSource[T] {
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}

trait TableSink[T] {
  def getOutputType: TypeInformation[T]
  def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}
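
As a sketch, the bundled CsvTableSource can be registered like any custom source (the path and field names are illustrative; builder options may differ slightly across 1.5.x releases):

import org.apache.flink.table.api.Types
import org.apache.flink.table.sources.CsvTableSource

val source = CsvTableSource.builder()
  .path("/data/words.csv")
  .field("word", Types.STRING)
  .field("cnt", Types.INT)
  .build()

tEnv.registerTableSource("Words", source)
val words = tEnv.scan("Words")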

Sources and Sinks

Window Operations

Time and count-based windowing operations for stream processing with tumbling, sliding, and session window support.

sealed trait Window

case class TumbleWithSize(size: Expression) extends Window
case class SlideWithSize(size: Expression) extends Window  
case class SessionWithGap(gap: Expression) extends Window

class WindowedTable {
  def groupBy(fields: Expression*): WindowGroupedTable
}

case class OverWindow(
  partitionBy: Seq[Expression],
  orderBy: Expression,
  preceding: Expression,
  following: Expression
)
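
A sketch of a tumbling event-time window, assuming the input table exposes a rowtime attribute named 'rowtime (table and field names are illustrative):

import org.apache.flink.table.api.Tumble
import org.apache.flink.table.api.scala._

val counts = words
  .window(Tumble over 10.minutes on 'rowtime as 'w) // 10-minute tumbling window
  .groupBy('w, 'word)
  .select('word, 'w.start, 'cnt.sum)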

Window Operations

SQL Integration

Direct SQL query execution backed by the Apache Calcite parser and optimizer. sqlQuery evaluates SELECT statements; in 1.5.x, sqlUpdate supports INSERT INTO statements (broader DDL support arrived in later Flink releases).

// Available on TableEnvironment
def sqlQuery(query: String): Table
def sqlUpdate(stmt: String): Unit

Usage Examples:

// Query execution
val result = tEnv.sqlQuery("SELECT word, COUNT(*) FROM WordTable GROUP BY word")

// DML operations (1.5.x supports INSERT INTO; the target table must be registered)
tEnv.sqlUpdate("INSERT INTO MyTable SELECT name, age FROM SourceTable")

SQL Integration

Types

class Row(arity: Int) {
  def getField(pos: Int): Any
  def setField(pos: Int, value: Any): Unit
}

class TableConfig {
  def getTimeZone: TimeZone
  def setTimeZone(timeZone: TimeZone): Unit
}
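
For instance, the session time zone used when interpreting timestamps can be adjusted through TableConfig (a small sketch):

import java.util.TimeZone

val config = tEnv.getConfig
config.setTimeZone(TimeZone.getTimeZone("UTC")) // interpret timestamps in UTC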

trait FunctionContext {
  def getMetricGroup: MetricGroup
  def getCachedFile(name: String): File
}

abstract class QueryConfig
abstract class BatchQueryConfig extends QueryConfig
abstract class StreamQueryConfig extends QueryConfig

trait ExternalCatalog {
  def getTable(tablePath: String*): Table
  def listTables(): java.util.List[String]
  def getDatabase(databaseName: String): ExternalCatalogDatabase
}

trait ExternalCatalogDatabase {
  def getTable(tableName: String): Table
  def listTables(): java.util.List[String]
}

object ExpressionParser {
  def parseExpression(expression: String): Expression
  def parseExpressionList(expression: String): Seq[Expression]
}

class ValidationException(message: String) extends TableException(message)
class TableException(message: String) extends RuntimeException(message)