Apache Flink Table API for SQL-like operations on streaming and batch data
npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-2-11@1.5.0
Apache Flink Table API provides a high-level declarative API for both stream and batch processing that supports SQL-like queries and operations. It offers a unified programming model: developers write queries either with the Table API (a language-embedded query API for Scala and Java) or with SQL. Both support operations such as filtering, joining, aggregating, and windowing on structured data streams and datasets.
maven: org.apache.flink/flink-table_2.11/1.5.1
Scala:
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
Java:
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
Scala Batch Example:
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Batch example: average `count` per `word` using the Table API.
val env = ExecutionEnvironment.getExecutionEnvironment
// With a (DataSet) ExecutionEnvironment this overload yields a BatchTableEnvironment.
val tEnv = TableEnvironment.getTableEnvironment(env)
val input: DataSet[(String, Int)] = env.fromElements(
  ("Hello", 2), ("Hello", 5), ("Ciao", 3)
)
val result = input
  .toTable(tEnv, 'word, 'count)
  .groupBy('word)
  .select('word, 'count.avg)
// A Table has no print(); convert it back to a DataSet of Rows first.
// DataSet.print() triggers execution of the batch job, so no env.execute() is needed.
tEnv.toDataSet[Row](result).print()
Scala Streaming Example:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Streaming example: double the `count` of every incoming record.
val env = StreamExecutionEnvironment.getExecutionEnvironment
// With a StreamExecutionEnvironment this overload yields a StreamTableEnvironment.
val tEnv = TableEnvironment.getTableEnvironment(env)
val input: DataStream[(String, Int)] = env.fromElements(
  ("Hello", 2), ("Hello", 5), ("Ciao", 3)
)
val result = input
  .toTable(tEnv, 'word, 'count)
  .select('word, 'count * 2)
// The query is a plain projection (no aggregation), so it can be emitted as an append stream.
tEnv.toAppendStream[Row](result).print()
// DataStream.print() only attaches a sink; the streaming job runs when execute() is called.
env.execute()
The Flink Table API is built around several key components:
Central entry point for registering tables, table sources, and functions; running SQL queries and updates; and accessing the table configuration (TableConfig). Every batch or streaming Table API program starts by obtaining a TableEnvironment.
/** Entry point of the Table API: registers tables, table sources and functions,
  * runs SQL, and exposes the [[TableConfig]].
  * Instances are obtained via `TableEnvironment.getTableEnvironment(...)`.
  */
abstract class TableEnvironment {
  /** Configuration used by this environment. */
  def getConfig: TableConfig
  /** Returns the registered table addressed by the given (possibly multi-part) path. */
  def scan(tablePath: String*): Table
  /** Creates a [[Table]] backed by the given [[TableSource]]. */
  def fromTableSource(source: TableSource[_]): Table
  /** Registers `table` under `name` so it can be referenced by name (e.g. from SQL). */
  def registerTable(name: String, table: Table): Unit
  /** Registers an external [[TableSource]] under `name`. */
  def registerTableSource(name: String, tableSource: TableSource[_]): Unit
  /** Registers a scalar user-defined function under `name`. */
  def registerFunction(name: String, function: ScalarFunction): Unit
  /** Runs a SQL query and returns its result as a [[Table]]. */
  def sqlQuery(query: String): Table
  /** Executes a SQL statement that does not return a table. */
  def sqlUpdate(stmt: String): Unit
  /** Names of the tables registered in this environment. */
  def listTables(): Array[String]
  /** Returns a textual explanation of the plan for `table`. */
  def explain(table: Table): String
}
/** Factory for table environments; the overload is selected by the execution environment's type. */
object TableEnvironment {
  /** Table environment for batch (DataSet) programs. */
  def getTableEnvironment(executionEnvironment: ExecutionEnvironment): BatchTableEnvironment
  /** Table environment for streaming (DataStream) programs. */
  def getTableEnvironment(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
}
Core table abstraction providing SQL-like operations for data transformation, filtering, aggregation, and joining.
/** A relational table; each operation returns a new table (or a grouped/windowed view). */
class Table {
  /** Projection from expressions. */
  def select(fields: Expression*): Table
  /** Projection from a string of field expressions. */
  def select(fields: String): Table
  /** Keeps rows satisfying `predicate`. */
  def filter(predicate: Expression): Table
  /** Same signature as `filter` (presumably an alias — confirm). */
  def where(predicate: Expression): Table
  /** Groups rows by the given keys; aggregate via the returned [[GroupedTable]]. */
  def groupBy(fields: Expression*): GroupedTable
  def orderBy(fields: Expression*): Table
  /** Removes duplicate rows. */
  def distinct(): Table
  /** Join without a predicate (a predicate can be added afterwards, e.g. with `where`). */
  def join(right: Table): Table
  /** Inner join on `joinPredicate`. */
  def join(right: Table, joinPredicate: Expression): Table
  /** Left outer join on `joinPredicate`. */
  def leftOuterJoin(right: Table, joinPredicate: Expression): Table
  def union(right: Table): Table
  /** Applies a group window; grouping continues on the returned [[WindowedTable]]. */
  def window(window: Window): WindowedTable
  /** Renames the fields of this table. */
  def as(fields: Expression*): Table
  /** Field names and types of this table. */
  def getSchema: TableSchema
  /** Writes this table into the table registered as `tableName`. */
  def insertInto(tableName: String): Unit
}
Rich type system supporting primitive, complex, and temporal types with schema definition and validation.
/** Type constants and type factories used to declare Table API field types. */
object Types {
  val STRING: TypeInformation[String]
  // Primitive-like types are exposed as boxed java.lang types.
  val BOOLEAN: TypeInformation[java.lang.Boolean]
  val INT: TypeInformation[java.lang.Integer]
  val LONG: TypeInformation[java.lang.Long]
  val DOUBLE: TypeInformation[java.lang.Double]
  val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
  /** Row type with positional fields of the given types. */
  def ROW(types: TypeInformation[_]*): TypeInformation[Row]
  /** Row type with named fields; presumably `fieldNames` and `types` must have equal length — confirm. */
  def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row]
  /** Map type with the given key and value types (java.util.Map at runtime). */
  def MAP[K, V](keyType: TypeInformation[K], valueType: TypeInformation[V]): TypeInformation[java.util.Map[K, V]]
}
/** Names and types of a table's fields. */
class TableSchema {
  def getFieldNames: Array[String]
  def getFieldTypes: Array[TypeInformation[_]]
}
Framework for creating custom scalar, table, and aggregate functions with lifecycle management and context access.
/** Common base of scalar, table, and aggregate functions. */
abstract class UserDefinedFunction {
  /** Lifecycle hook that receives a [[FunctionContext]] (metrics, cached files). */
  def open(context: FunctionContext): Unit
  /** Lifecycle hook for releasing resources acquired in `open`. */
  def close(): Unit
  /** Reports whether the function is deterministic. */
  def isDeterministic: Boolean
}
/** Maps input values to one output value.
  * NOTE(review): the evaluation methods themselves are user-defined and not shown in this summary.
  */
abstract class ScalarFunction extends UserDefinedFunction {
  /** Result type for the given argument classes. */
  def getResultType(signature: Array[Class[_]]): TypeInformation[_]
  /** Parameter types for the given argument classes. */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]]
}
/** Function that may emit zero or more rows of type `T` per invocation. */
abstract class TableFunction[T] extends UserDefinedFunction {
  /** Emits one result row; called from the (user-defined) evaluation methods. */
  protected def collect(result: T): Unit
}
/** Aggregates many rows into one value of type `T`, using an accumulator of type `ACC`. */
abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
  /** Creates a fresh, empty accumulator. */
  def createAccumulator(): ACC
  /** Computes the final result from an accumulator. */
  def getValue(accumulator: ACC): T
}
Pluggable interfaces for integrating external data systems with support for projection and filter pushdown.
/** Provides external data of type `T` as a table. */
trait TableSource[T] {
  /** Type of the records produced by this source. */
  def getReturnType: TypeInformation[T]
  /** Schema of the table exposed by this source. */
  def getTableSchema: TableSchema
  /** Human-readable description of the source. */
  def explainSource(): String
}
/** [[TableSource]] for batch programs.
  * NOTE(review): in Flink this uses the Java ExecutionEnvironment/DataSet types — confirm.
  */
trait BatchTableSource[T] extends TableSource[T] {
  /** Produces the source data as a DataSet in `execEnv`. */
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
/** [[TableSource]] for streaming programs.
  * NOTE(review): in Flink this uses the Java StreamExecutionEnvironment/DataStream types — confirm.
  */
trait StreamTableSource[T] extends TableSource[T] {
  /** Produces the source data as a DataStream in `execEnv`. */
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
/** Writes table rows of type `T` to an external system. */
trait TableSink[T] {
  /** Type of the records this sink consumes. */
  def getOutputType: TypeInformation[T]
  /** Returns a sink configured for the given field names and types. */
  def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}
Time and count-based windowing operations for stream processing with tumbling, sliding, and session window support.
/** Group window specifications (simplified summary).
  * NOTE(review): in Flink the size/gap builders are completed with `.on(timeField).as(alias)`
  * before they become usable windows — confirm against the Table API docs.
  */
sealed trait Window
/** Tumbling (non-overlapping) window of the given size. */
case class TumbleWithSize(size: Expression) extends Window
/** Sliding window of the given size. */
case class SlideWithSize(size: Expression) extends Window
/** Session window closed after the given gap of inactivity. */
case class SessionWithGap(gap: Expression) extends Window
/** Result of `Table.window(...)`; must be grouped (by the window alias and optional keys) before selecting. */
class WindowedTable {
  def groupBy(fields: Expression*): WindowGroupedTable
}
/** Over-window specification: rows are partitioned by `partitionBy`, ordered by `orderBy`,
  * and each row's window spans from `preceding` to `following` relative to that row.
  */
case class OverWindow(
  partitionBy: Seq[Expression],
  orderBy: Expression,
  preceding: Expression,
  following: Expression
)
Direct SQL execution of queries (SELECT) and INSERT INTO statements, leveraging Apache Calcite for parsing and optimization. In Flink 1.5, DDL statements such as CREATE TABLE are not supported; tables and sinks are registered through the TableEnvironment API instead.
// Available on TableEnvironment
/** Runs a SQL query (e.g. a SELECT) and returns its result as a [[Table]]. */
def sqlQuery(query: String): Table
/** Executes a SQL statement that does not return a table.
  * NOTE(review): Flink 1.5 accepts only INSERT INTO statements here — confirm before documenting DDL.
  */
def sqlUpdate(stmt: String): Unit
Usage Examples:
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.sinks.CsvTableSink

// Query execution: `WordTable` must already be registered with tEnv.
val result = tEnv.sqlQuery("SELECT word, COUNT(*) FROM WordTable GROUP BY word")
// Defining the target table: Flink 1.5's sqlUpdate accepts only INSERT statements,
// so "CREATE TABLE ..." is rejected. Register the target as a TableSink instead
// (a CSV sink here; any TableSink works).
tEnv.registerTableSink(
  "MyTable",
  Array("name", "age"),
  Array[TypeInformation[_]](Types.STRING, Types.INT),
  new CsvTableSink("/tmp/my-table", ","))
// DML operations: INSERT INTO a registered sink; `SourceTable` must already be registered.
tEnv.sqlUpdate("INSERT INTO MyTable SELECT name, age FROM SourceTable")
// NOTE(review): summary stub only; Flink's actual row type is the Java class org.apache.flink.types.Row.
case class Row(values: Any*)
/** Configuration of a [[TableEnvironment]] (shown here: the session time zone). */
class TableConfig {
  /** Time zone used by this configuration. */
  def getTimeZone: TimeZone
  /** Sets the time zone used by this configuration. */
  def setTimeZone(timeZone: TimeZone): Unit
}
/** Runtime context handed to `UserDefinedFunction.open`.
  * NOTE(review): Flink 1.5 declares this as a class, not a trait — confirm.
  */
trait FunctionContext {
  /** Metric group for registering user metrics. */
  def getMetricGroup: MetricGroup
  /** Local copy of the distributed-cache file registered under `name`. */
  def getCachedFile(name: String): File
}
/** Base type of per-query configuration objects. */
abstract class QueryConfig
/** Query configuration for batch queries. */
abstract class BatchQueryConfig extends QueryConfig
/** Query configuration for streaming queries. */
abstract class StreamQueryConfig extends QueryConfig
/** Catalog of tables that live outside the TableEnvironment.
  * NOTE(review): in Flink 1.5 getTable appears to return ExternalCatalogTable and nested
  * databases are exposed via getSubCatalog — verify these summary signatures.
  */
trait ExternalCatalog {
  /** Table addressed by the given path. */
  def getTable(tablePath: String*): Table
  /** Names of the tables in this catalog. */
  def listTables(): java.util.List[String]
  /** Database with the given name. */
  def getDatabase(databaseName: String): ExternalCatalogDatabase
}
/** A database within an [[ExternalCatalog]]. */
trait ExternalCatalogDatabase {
  /** Table with the given name in this database. */
  def getTable(tableName: String): Table
  /** Names of the tables in this database. */
  def listTables(): java.util.List[String]
}
/** Parses string-form Table API expressions (as used by `Table.select(fields: String)`). */
object ExpressionParser {
  /** Parses a single expression. */
  def parseExpression(expression: String): Expression
  /** Parses a list of expressions. */
  def parseExpressionList(expression: String): Seq[Expression]
}
/** Signals an invalid Table API or SQL program detected during validation.
  * In Flink this is a sibling of [[TableException]], not a subclass: code that
  * catches only TableException will not catch validation errors.
  */
class ValidationException(message: String) extends RuntimeException(message)
/** Signals a general failure of the Table API. */
class TableException(message: String) extends RuntimeException(message)