# Flink Table API

The Apache Flink Table API is a high-level, declarative API for both stream and batch processing. It offers a unified programming model: queries can be written either with the Table API (a language-embedded query DSL for Scala and Java) or with SQL, and support operations such as filtering, joining, aggregating, and windowing on structured data streams and datasets.

## Package Information

- **Package Name**: flink-table_2.11
- **Package Type**: maven
- **Language**: Scala
- **Installation**: `maven: org.apache.flink/flink-table_2.11/1.5.1`

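For an sbt build, the coordinates above translate to roughly the following (a sketch; the companion `flink-scala` and `flink-streaming-scala` artifacts are assumed because the examples below use both the batch and streaming Scala APIs):

```scala
// build.sbt (sketch): Flink 1.5.1 built for Scala 2.11
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-table_2.11"           % "1.5.1",
  "org.apache.flink" % "flink-scala_2.11"           % "1.5.1",
  "org.apache.flink" % "flink-streaming-scala_2.11" % "1.5.1"
)
```
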
## Core Imports

**Scala:**

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
```

**Java:**

```java
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
```

Use the batch or the streaming import set that matches your program; wildcard-importing both `org.apache.flink.api.scala._` and `org.apache.flink.streaming.api.scala._` in one file can lead to ambiguous implicits.

## Basic Usage

**Scala Batch Example:**

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val input: DataSet[(String, Int)] = env.fromElements(
  ("Hello", 2), ("Hello", 5), ("Ciao", 3)
)

val result = input
  .toTable(tEnv, 'word, 'count)
  .groupBy('word)
  .select('word, 'count.avg)

// A Table has no print(); convert back to a DataSet first
result.toDataSet[Row].print()
```

**Scala Streaming Example:**

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val input: DataStream[(String, Int)] = env.fromElements(
  ("Hello", 2), ("Hello", 5), ("Ciao", 3)
)

val result = input
  .toTable(tEnv, 'word, 'count)
  .select('word, 'count * 2)

tEnv.toAppendStream[Row](result).print()
env.execute()  // streaming jobs only run once execute() is called
```

## Architecture

The Flink Table API is built around several key components:

- **TableEnvironment**: Main entry point providing table registration, SQL execution, and configuration
- **Table**: Core abstraction representing relational data with fluent query operations
- **Type System**: Rich type definitions supporting primitive, complex, and temporal types
- **Expression System**: Type-safe expression building for queries and transformations
- **Source/Sink Integration**: Pluggable connectors for external data systems
- **Function Framework**: User-defined scalar, table, and aggregate functions
- **SQL Integration**: Apache Calcite-based SQL parser and optimizer

## Capabilities

### Table Environment Management

Central management for table operations, SQL execution, and resource configuration. Essential for initializing both batch and streaming table environments.

```scala { .api }
abstract class TableEnvironment {
  def getConfig: TableConfig
  def scan(tablePath: String*): Table
  def fromTableSource(source: TableSource[_]): Table
  def registerTable(name: String, table: Table): Unit
  def registerTableSource(name: String, tableSource: TableSource[_]): Unit
  def registerFunction(name: String, function: ScalarFunction): Unit
  def sqlQuery(query: String): Table
  def sqlUpdate(stmt: String): Unit
  def listTables(): Array[String]
  def explain(table: Table): String
}

object TableEnvironment {
  def getTableEnvironment(executionEnvironment: ExecutionEnvironment): BatchTableEnvironment
  def getTableEnvironment(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
}
```

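A minimal sketch of environment setup and registration, assuming the batch environment; the `Words` table name and its fields are illustrative:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

// Create a batch execution environment and its table environment
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// Register a DataSet-backed table under the name "Words"
val words: DataSet[(String, Int)] = env.fromElements(("Hello", 1), ("World", 2))
tEnv.registerTable("Words", words.toTable(tEnv, 'word, 'num))

// scan() looks a registered table up by name; explain() shows its plan
val table = tEnv.scan("Words")
println(tEnv.explain(table))
```
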
[Table Environment](./table-environment.md)

### Table Operations and Queries

Core table abstraction providing SQL-like operations for data transformation, filtering, aggregation, and joining.

```scala { .api }
class Table {
  def select(fields: Expression*): Table
  def select(fields: String): Table
  def filter(predicate: Expression): Table
  def where(predicate: Expression): Table
  def groupBy(fields: Expression*): GroupedTable
  def orderBy(fields: Expression*): Table
  def distinct(): Table
  def join(right: Table): Table
  def join(right: Table, joinPredicate: Expression): Table
  def leftOuterJoin(right: Table, joinPredicate: Expression): Table
  def union(right: Table): Table
  def window(window: Window): WindowedTable
  def as(fields: Expression*): Table
  def getSchema: TableSchema
  def insertInto(tableName: String): Unit
}
```

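These operations chain fluently. A sketch with a hypothetical orders table (batch environment assumed; names are illustrative):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// Hypothetical orders: (id, customer, amount)
val orders = env.fromElements(("o1", "alice", 30), ("o2", "bob", 10), ("o3", "alice", 25))

// Filter, group, and aggregate with the fluent API
val totals = orders
  .toTable(tEnv, 'id, 'customer, 'amount)
  .filter('amount > 20)
  .groupBy('customer)
  .select('customer, 'amount.sum as 'total)

totals.toDataSet[Row].print()
```
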
[Table Operations](./table-operations.md)

### Type System and Schema Management

Rich type system supporting primitive, complex, and temporal types with schema definition and validation.

```scala { .api }
object Types {
  val STRING: TypeInformation[String]
  val BOOLEAN: TypeInformation[java.lang.Boolean]
  val INT: TypeInformation[java.lang.Integer]
  val LONG: TypeInformation[java.lang.Long]
  val DOUBLE: TypeInformation[java.lang.Double]
  val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
  def ROW(types: TypeInformation[_]*): TypeInformation[Row]
  def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row]
  def MAP[K, V](keyType: TypeInformation[K], valueType: TypeInformation[V]): TypeInformation[java.util.Map[K, V]]
}

class TableSchema {
  def getFieldNames: Array[String]
  def getFieldTypes: Array[TypeInformation[_]]
}
```

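For instance, a two-field row type can be described with the named `ROW` variant (a sketch; the field names are illustrative):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types

// A named ROW type for records like ("alice", 30)
val personType = Types.ROW(
  Array("name", "age"),
  Array[TypeInformation[_]](Types.STRING, Types.INT)
)

println(personType)
```
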
[Type System](./type-system.md)

### User-Defined Functions

Framework for creating custom scalar, table, and aggregate functions with lifecycle management and context access.

```scala { .api }
abstract class UserDefinedFunction {
  def open(context: FunctionContext): Unit
  def close(): Unit
  def isDeterministic: Boolean
}

abstract class ScalarFunction extends UserDefinedFunction {
  def getResultType(signature: Array[Class[_]]): TypeInformation[_]
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]]
}

abstract class TableFunction[T] extends UserDefinedFunction {
  protected def collect(result: T): Unit
}

abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
  def createAccumulator(): ACC
  def getValue(accumulator: ACC): T
}
```

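A scalar function is defined by subclassing `ScalarFunction` and providing one or more public `eval` methods. A minimal sketch (the `hashCode` function name and its factor are illustrative):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction

// Multiplies a string's hash code by a configurable factor
class HashCode(factor: Int) extends ScalarFunction {
  def eval(s: String): Int = s.hashCode * factor
}

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// Register the function so it is usable from both Table API and SQL
tEnv.registerFunction("hashCode", new HashCode(12))

val words = env.fromElements(("Hello", 1)).toTable(tEnv, 'word, 'num)
tEnv.registerTable("Words", words)
val hashed = tEnv.sqlQuery("SELECT word, hashCode(word) FROM Words")
```
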
[User-Defined Functions](./user-defined-functions.md)

### Data Sources and Sinks

Pluggable interfaces for integrating external data systems with support for projection and filter pushdown.

```scala { .api }
trait TableSource[T] {
  def getReturnType: TypeInformation[T]
  def getTableSchema: TableSchema
  def explainSource(): String
}

trait BatchTableSource[T] extends TableSource[T] {
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}

trait StreamTableSource[T] extends TableSource[T] {
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}

trait TableSink[T] {
  def getOutputType: TypeInformation[T]
  def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}
```

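A batch source can be sketched by implementing `BatchTableSource`. Note that the source interfaces work against the Java environments (`org.apache.flink.api.java`); the class and field names below are illustrative:

```scala
import java.util.{Arrays => JArrays}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.BatchTableSource
import org.apache.flink.types.Row

// Serves a fixed two-column table from memory
class InMemoryPersonSource extends BatchTableSource[Row] {
  private val fieldNames = Array("name", "age")
  private val fieldTypes = Array[TypeInformation[_]](Types.STRING, Types.INT)

  override def getReturnType: TypeInformation[Row] =
    Types.ROW(fieldNames, fieldTypes)

  override def getTableSchema: TableSchema =
    new TableSchema(fieldNames, fieldTypes)

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    val row = new Row(2)
    row.setField(0, "alice")
    row.setField(1, 30)
    execEnv.fromCollection(JArrays.asList(row), getReturnType)
  }
}
```

Such a source is then registered with `tEnv.registerTableSource("People", new InMemoryPersonSource)` and queried like any other table.
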
[Sources and Sinks](./sources-sinks.md)

### Window Operations

Time- and count-based windowing operations for stream processing, with tumbling, sliding, and session window support.

```scala { .api }
sealed trait Window

case class TumbleWithSize(size: Expression) extends Window
case class SlideWithSize(size: Expression) extends Window
case class SessionWithGap(gap: Expression) extends Window

class WindowedTable {
  def groupBy(fields: Expression*): WindowGroupedTable
}

case class OverWindow(
  partitionBy: Seq[Expression],
  orderBy: Expression,
  preceding: Expression,
  following: Expression
)
```

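A window is attached with `Table.window(...)` and must be referenced in the subsequent `groupBy`. A streaming sketch using a processing-time tumbling window (table and field names are illustrative):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val readings: DataStream[(String, Double)] = env.fromElements(("s1", 20.5), ("s1", 21.0))

// Append a processing-time attribute 'proctime, then window on it
val result = readings
  .toTable(tEnv, 'id, 'temp, 'proctime.proctime)
  .window(Tumble over 10.seconds on 'proctime as 'w)
  .groupBy('w, 'id)
  .select('id, 'temp.avg)
```

Event-time windows work the same way, but require a rowtime attribute and watermark assignment on the input stream.
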
[Window Operations](./window-operations.md)

### SQL Integration

Direct SQL execution leveraging Apache Calcite for parsing and optimization. In this release, `sqlQuery` evaluates SELECT statements and `sqlUpdate` supports INSERT INTO statements.

```scala { .api }
// Available on TableEnvironment
def sqlQuery(query: String): Table
def sqlUpdate(stmt: String): Unit
```

**Usage Examples:**

```scala
// Query execution
val result = tEnv.sqlQuery("SELECT word, COUNT(*) FROM WordTable GROUP BY word")

// DML: insert the result of a query into a registered table sink
tEnv.sqlUpdate("INSERT INTO MyTable SELECT name, age FROM SourceTable")
```

[SQL Integration](./sql-integration.md)

## Types

```scala { .api }
case class Row(values: Any*)

class TableConfig {
  def getTimeZone: TimeZone
  def setTimeZone(timeZone: TimeZone): Unit
}

trait FunctionContext {
  def getMetricGroup: MetricGroup
  def getCachedFile(name: String): File
}

abstract class QueryConfig
abstract class BatchQueryConfig extends QueryConfig
abstract class StreamQueryConfig extends QueryConfig

trait ExternalCatalog {
  def getTable(tablePath: String*): Table
  def listTables(): java.util.List[String]
  def getDatabase(databaseName: String): ExternalCatalogDatabase
}

trait ExternalCatalogDatabase {
  def getTable(tableName: String): Table
  def listTables(): java.util.List[String]
}

object ExpressionParser {
  def parseExpression(expression: String): Expression
  def parseExpressionList(expression: String): Seq[Expression]
}

class TableException(message: String) extends RuntimeException(message)
class ValidationException(message: String) extends TableException(message)
```