# Apache Flink Table API

Apache Flink's Table API and SQL module provides unified stream and batch processing through two interfaces: the Table API and SQL. The Table API is a language-integrated query API for Java, Scala, and Python that lets you compose queries from relational operators such as selection, filter, and join.

The Table API is built around the core concept of a `Table`: a pipeline description, not materialized data, that can be optimized and executed on either bounded or unbounded data streams. The same unified API therefore serves both real-time streaming analytics and traditional batch processing.
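
## Package Information

- **Package Name**: org.apache.flink:flink-table
- **Package Type**: maven
- **Language**: Java (with Scala support)
- **Version**: 1.20.2
- **Installation**: Add the Java Table API to your Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>1.20.2</version>
</dependency>
```

For DataStream integration (`StreamTableEnvironment`), depend on `flink-table-api-java-bridge` instead. To run programs locally, for example from an IDE, you also need the planner module (`flink-table-planner-loader`) and the runtime module (`flink-table-runtime`) on the classpath.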

## Core Imports

```java
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
```

For DataStream integration:

```java
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
```

## Basic Usage

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import static org.apache.flink.table.api.Expressions.*;

// Create a table environment in streaming mode
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Execute SQL statements (the connector options in WITH (...) are elided)
tableEnv.executeSql("CREATE TABLE source_table (id INT, name STRING) WITH (...)");
Table result = tableEnv.sqlQuery("SELECT id, UPPER(name) AS name FROM source_table WHERE id > 10");

// Continue the pipeline with Table API operations
Table filtered = result
    .select($("id"), $("name"))
    .where($("id").isGreater(5));

// Submit the job and print the results
filtered.execute().print();
```

## Architecture

The Flink Table API is built around several key components:

- **Table Environment**: Central context for creating Table API and SQL programs. It manages catalogs and configuration
- **Table Abstraction**: Core abstraction that represents a data transformation pipeline, not the data itself
- **Type System**: Data type system covering primitive, complex, and user-defined types
- **Expression API**: Type-safe expressions for table transformations and computations
- **Catalog System**: Pluggable metadata management for databases, tables, and functions
- **Connector Framework**: Extensible source/sink architecture for data integration
- **Function Framework**: Support for scalar, table, and aggregate user-defined functions
- **Query Planning**: Query optimization and execution planning built on Apache Calcite
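
You can observe the "Table Abstraction" point directly. Building a `Table` only records a plan, and nothing runs until `execute()` is called. The sketch below assumes `flink-table-api-java` plus the planner and runtime modules are on the classpath. The class, column names, and values are hypothetical. It builds a small filtered pipeline over inline values and calls `explain()`, which asks the planner for the plan as text without submitting a job.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

public class LazyPipelineExample {

    /** Builds a small pipeline and returns its plan as text; no job is submitted. */
    public static String explainPipeline() {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // fromValues() describes an inline, bounded input; nothing is read yet.
        Table orders =
                tableEnv.fromValues(row(1, "apple", 3), row(2, "pear", 7))
                        .as("id", "product", "amount");

        // Each operation returns a new Table that extends the pipeline description.
        Table bigOrders = orders.filter($("amount").isGreater(5)).select($("product"));

        // explain() translates and optimizes the plan, then renders it as a string.
        return bigOrders.explain();
    }

    public static void main(String[] args) {
        System.out.println(explainPipeline());
    }
}
```

Calling `bigOrders.execute().print()` instead would submit a job and print the single matching row.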

## Capabilities

### Table Environment

Core entry point and central context for creating Table API and SQL programs. It handles catalog management, SQL execution, and table operations.

```java { .api }
interface TableEnvironment {
    // Factory methods
    static TableEnvironment create(EnvironmentSettings settings);
    static TableEnvironment create(Configuration configuration);

    // SQL execution
    Table sqlQuery(String query);
    TableResult executeSql(String statement);

    // Table creation and access
    Table from(String path);
    Table from(TableDescriptor descriptor);
    void createTemporaryTable(String path, TableDescriptor descriptor);
    void createTable(String path, TableDescriptor descriptor);

    // Catalog and database management
    void useCatalog(String catalogName);
    void useDatabase(String databaseName);
    String[] listTables();
    String[] listCatalogs();
    String[] listDatabases();
}
```

[Table Environment](./table-environment.md)
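
Here is a minimal sketch of the catalog and lookup methods above. It assumes a local batch environment with the planner on the classpath. It registers an inline table as a temporary view named `fruits` (a hypothetical name), resolves it with `from()`, and lists the tables in the current database. `fromValues`, `createTemporaryView(String, Table)`, and `getResolvedSchema()` are further Table API methods not shown in the summary above.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.row;

public class TableEnvironmentExample {

    /** Registers a temporary view and returns the table names visible in the current database. */
    public static String[] registerAndList() {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // A fresh environment starts in the built-in catalog and database.
        tableEnv.useCatalog("default_catalog");
        tableEnv.useDatabase("default_database");

        // Register inline data under a path that both SQL and from() can resolve.
        Table fruits = tableEnv.fromValues(row(1, "apple"), row(2, "pear")).as("id", "name");
        tableEnv.createTemporaryView("fruits", fruits);

        // from() resolves the path against the current catalog and database.
        System.out.println(tableEnv.from("fruits").getResolvedSchema());

        // listTables() returns both temporary and permanent tables and views.
        return tableEnv.listTables();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(registerAndList()));
    }
}
```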

### Table Operations

Core table abstraction that provides a fluent API for data transformations, joins, aggregations, and window operations.

```java { .api }
interface Table extends Explainable<Table>, Executable {
    // Schema access
    ResolvedSchema getResolvedSchema();

    // Basic transformations
    Table select(Expression... fields);
    Table filter(Expression predicate);
    Table where(Expression predicate);
    Table as(Expression... fields);
    Table as(String field, String... fields);
    Table distinct();

    // Column operations
    Table addColumns(Expression... fields);
    Table addOrReplaceColumns(Expression... fields);
    Table renameColumns(Expression... fields);
    Table dropColumns(Expression... fields);

    // Aggregations
    GroupedTable groupBy(Expression... fields);
    AggregatedTable aggregate(Expression aggregateFunction);
    FlatAggregateTable flatAggregate(Expression tableAggregateFunction);

    // Joins (right and full outer joins always take a join predicate)
    Table join(Table right);
    Table join(Table right, Expression joinPredicate);
    Table leftOuterJoin(Table right);
    Table leftOuterJoin(Table right, Expression joinPredicate);
    Table rightOuterJoin(Table right, Expression joinPredicate);
    Table fullOuterJoin(Table right, Expression joinPredicate);

    // Lateral joins
    Table joinLateral(Expression tableFunctionCall);
    Table joinLateral(Expression tableFunctionCall, Expression joinPredicate);
    Table leftOuterJoinLateral(Expression tableFunctionCall);
    Table leftOuterJoinLateral(Expression tableFunctionCall, Expression joinPredicate);

    // Set operations
    Table union(Table right);
    Table unionAll(Table right);
    Table intersect(Table right);
    Table intersectAll(Table right);
    Table minus(Table right);
    Table minusAll(Table right);

    // Function operations
    Table map(Expression mapFunction);
    Table flatMap(Expression tableFunction);

    // Ordering and limiting
    Table orderBy(Expression... fields);
    Table offset(int offset);
    Table fetch(int fetch);
    Table limit(int fetch);
    Table limit(int offset, int fetch);

    // Window operations
    GroupWindowedTable window(GroupWindow groupWindow);
    OverWindowedTable window(OverWindow... overWindows);

    // Temporal operations
    TemporalTableFunction createTemporalTableFunction(Expression timeAttribute, Expression primaryKey);

    // Insert operations
    TablePipeline insertInto(String tablePath);
    TablePipeline insertInto(String tablePath, boolean overwrite);
    TablePipeline insertInto(TableDescriptor descriptor);
    TablePipeline insertInto(TableDescriptor descriptor, boolean overwrite);
    TableResult executeInsert(String tablePath);
    TableResult executeInsert(String tablePath, boolean overwrite);
    TableResult executeInsert(TableDescriptor descriptor);
    TableResult executeInsert(TableDescriptor descriptor, boolean overwrite);

    // Execution and plan inspection (explain methods are inherited from Explainable)
    TableResult execute();
    String explain(ExplainDetail... extraDetails);
    String explain(ExplainFormat format, ExplainDetail... extraDetails);
}
```

[Table Operations](./table-operations.md)
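
The sketch below chains a join, a grouped aggregation, and an ordering. It assumes a local batch environment with the planner on the classpath. The tables, column names, and values are hypothetical. The column names on the two join inputs are kept distinct because Table API joins require non-overlapping field names.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

public class TableOperationsExample {

    /** Joins customers with orders and returns (name, total) rows sorted by name. */
    public static List<Row> totalsPerCustomer() {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        Table customers = tableEnv.fromValues(row(1, "Alice"), row(2, "Bob")).as("c_id", "c_name");
        Table orders = tableEnv.fromValues(row(1, 10), row(1, 5), row(2, 7)).as("o_cid", "amount");

        Table totals =
                customers
                        // Inner join on the customer id; field names on both sides differ.
                        .join(orders, $("c_id").isEqual($("o_cid")))
                        // groupBy() returns a GroupedTable; select() applies the aggregate.
                        .groupBy($("c_name"))
                        .select($("c_name"), $("amount").sum().as("total"))
                        .orderBy($("c_name").asc());

        // execute() submits the job; collect() streams the result rows back to the client.
        List<Row> rows = new ArrayList<>();
        try (CloseableIterator<Row> it = totals.execute().collect()) {
            it.forEachRemaining(rows::add);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return rows;
    }

    public static void main(String[] args) {
        totalsPerCustomer().forEach(System.out::println);
    }
}
```

Swapping `join` for `leftOuterJoin` with the same predicate would keep customers without orders and fill the order columns with `NULL`.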

### DataStream Integration

Integration layer between the Table API and Flink's DataStream API. It converts between tables and data streams so that both APIs can be combined in complex pipelines.

```java { .api }
interface StreamTableEnvironment extends TableEnvironment {
    // Factory methods
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

    // DataStream conversion
    <T> Table fromDataStream(DataStream<T> dataStream);
    <T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
    DataStream<Row> toDataStream(Table table);
    <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
    <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);

    // Changelog conversion
    Table fromChangelogStream(DataStream<Row> dataStream);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
    DataStream<Row> toChangelogStream(Table table);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

    // Temporary view creation
    <T> void createTemporaryView(String path, DataStream<T> dataStream);
    <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

    // Statement set creation
    StreamStatementSet createStatementSet();
}
```

[DataStream Integration](./datastream-integration.md)
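
Here is a minimal round trip between the two APIs. It assumes `flink-table-api-java-bridge` and the planner are on the classpath, and the stream contents are hypothetical. A bounded `DataStream<String>` becomes a `Table`, is transformed with a Table API expression, and is converted back with `toDataStream`. The result is insert-only, so `toDataStream` is enough. An updating result, such as a non-windowed aggregation, would need `toChangelogStream` instead.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.table.api.Expressions.$;

public class DataStreamIntegrationExample {

    /** Converts a DataStream to a Table, upper-cases a column, and converts it back. */
    public static List<String> upperCaseNames() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // a single task keeps the output order stable for this demo
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // An atomic DataStream<String> maps to a single column named "f0"; rename it.
        DataStream<String> names = env.fromElements("alice", "bob");
        Table table = tableEnv.fromDataStream(names).as("name");

        // Insert-only transformation, so toDataStream() is sufficient.
        Table upper = table.select($("name").upperCase().as("name"));
        DataStream<Row> rows = tableEnv.toDataStream(upper);

        // executeAndCollect() runs the DataStream job and iterates over its output.
        List<String> out = new ArrayList<>();
        try (CloseableIterator<Row> it = rows.executeAndCollect()) {
            it.forEachRemaining(r -> out.add((String) r.getField(0)));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(upperCaseNames());
    }
}
```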

### Type System

Comprehensive type system that supports primitive types, nested complex structures, and user-defined types, with full serialization support.

```java { .api }
class DataTypes {
    // Primitive types
    static DataType BOOLEAN();
    static DataType TINYINT();
    static DataType SMALLINT();
    static DataType INT();
    static DataType BIGINT();
    static DataType FLOAT();
    static DataType DOUBLE();
    static DataType DECIMAL(int precision, int scale);

    // String and binary types
    static DataType CHAR(int length);
    static DataType VARCHAR(int length);
    static DataType STRING();
    static DataType BINARY(int length);
    static DataType VARBINARY(int length);
    static DataType BYTES();

    // Temporal types
    static DataType DATE();
    static DataType TIME();
    static DataType TIMESTAMP();
    static DataType TIMESTAMP_WITH_TIME_ZONE();
    static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE();

    // Complex types
    static DataType ARRAY(DataType elementType);
    static DataType MAP(DataType keyType, DataType valueType);
    static DataType ROW(Field... fields);
    static Field FIELD(String name, DataType dataType);
}

class Schema {
    // A Schema is unresolved: columns and constraints are validated when it is used
    static Schema.Builder newBuilder();
    List<UnresolvedColumn> getColumns();
    Optional<UnresolvedPrimaryKey> getPrimaryKey();
    List<UnresolvedWatermarkSpec> getWatermarkSpecs();
}
```

[Type System](./type-system.md)
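
A short sketch of composing nested types and declaring a schema with its builder. It assumes only `flink-table-api-java` on the classpath, and no job is executed. The column names, the watermark delay, and the primary key are hypothetical. The schema stays unresolved at this point: column references and the watermark expression are validated only when the schema is used in a table definition.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.types.DataType;

public class TypeSystemExample {

    /** Nested row type: ROW<id INT NOT NULL, tags ARRAY<STRING>, attrs MAP<STRING, DOUBLE>>. */
    public static final DataType EVENT_TYPE =
            DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.INT().notNull()),
                    DataTypes.FIELD("tags", DataTypes.ARRAY(DataTypes.STRING())),
                    DataTypes.FIELD("attrs", DataTypes.MAP(DataTypes.STRING(), DataTypes.DOUBLE())));

    /** Unresolved schema with a primary key and an event-time watermark on "ts". */
    public static Schema orderSchema() {
        return Schema.newBuilder()
                .column("id", DataTypes.INT().notNull())
                .column("price", DataTypes.DECIMAL(10, 2))
                .column("ts", DataTypes.TIMESTAMP(3))
                // Watermark expressions are SQL strings; this one tolerates 5 s of out-of-orderness.
                .watermark("ts", "ts - INTERVAL '5' SECOND")
                .primaryKey("id")
                .build();
    }

    public static void main(String[] args) {
        System.out.println(EVENT_TYPE);
        System.out.println(orderSchema());
    }
}
```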

### User-Defined Functions

Framework for creating custom scalar, table, and aggregate functions, with support for multiple programming languages.

```java { .api }
abstract class UserDefinedFunction implements FunctionDefinition, Serializable {
    // Lifecycle hooks; FunctionContext gives access to metrics and job parameters
    void open(FunctionContext context) throws Exception;
    void close() throws Exception;
}

abstract class ScalarFunction extends UserDefinedFunction {
    // Implemented by the user and discovered by reflection:
    // public ReturnType eval(InputType1 input1, InputType2 input2, ...);
}

abstract class TableFunction<T> extends UserDefinedFunction {
    // Implemented by the user; emits zero or more rows per call via collect():
    // public void eval(InputType1 input1, InputType2 input2, ...);
    protected final void collect(T row);
}

abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
    public abstract ACC createAccumulator();
    public abstract T getValue(ACC accumulator);
    // Implemented by the user and discovered by reflection:
    // public void accumulate(ACC accumulator, InputType1 input1, InputType2 input2, ...);
}
```

[User-Defined Functions](./user-defined-functions.md)
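
Below is one sketch per function kind. It assumes `flink-table-api-java` on the classpath, and the function names and logic are hypothetical. Flink finds `eval` and `accumulate` by reflection, so they are ordinary public methods rather than overrides. The table function declares its output row type with `@FunctionHint`, because `Row` by itself carries no field types.

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class UdfExamples {

    /** Scalar function: one value in, one value out. */
    public static class Greet extends ScalarFunction {
        public String eval(String name) {
            return name == null ? null : "Hello, " + name;
        }
    }

    /** Table function: emits one (word, length) row per whitespace-separated word. */
    @FunctionHint(output = @DataTypeHint("ROW<word STRING, len INT>"))
    public static class SplitWords extends TableFunction<Row> {
        public void eval(String text) {
            if (text == null) {
                return;
            }
            for (String w : text.split("\\s+")) {
                if (!w.isEmpty()) {
                    collect(Row.of(w, w.length()));
                }
            }
        }
    }

    /** Mutable accumulator for the average; a public class with public fields. */
    public static class AvgAcc {
        public long sum = 0L;
        public long count = 0L;
    }

    /** Aggregate function: folds many input values into one average via the accumulator. */
    public static class LongAvg extends AggregateFunction<Double, AvgAcc> {
        @Override
        public AvgAcc createAccumulator() {
            return new AvgAcc();
        }

        public void accumulate(AvgAcc acc, Long value) {
            if (value != null) {
                acc.sum += value;
                acc.count++;
            }
        }

        @Override
        public Double getValue(AvgAcc acc) {
            return acc.count == 0 ? null : (double) acc.sum / acc.count;
        }
    }
}
```

You might register the scalar function with `tableEnv.createTemporarySystemFunction("GREET", UdfExamples.Greet.class)` and then call it in SQL as `GREET(name)`. The table function is used through `LATERAL TABLE(...)` in SQL, or through `joinLateral(call(...))` in the Table API.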

### SQL Execution

Direct SQL execution for DDL, DML, and queries, including statement batching and result handling.

```java { .api }
interface TableEnvironment {
    // SQL execution
    Table sqlQuery(String query);
    TableResult executeSql(String statement);
    StatementSet createStatementSet();

    // Function registration
    void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);
    void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);
    void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);
}

interface TableResult {
    ResultKind getResultKind();
    ResolvedSchema getResolvedSchema();
    CloseableIterator<Row> collect();
    void print();
}

interface StatementSet {
    StatementSet addInsertSql(String statement);
    StatementSet addInsert(String targetPath, Table table);
    TableResult execute();
}
```

[SQL Execution](./sql-execution.md)
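
The sketch below runs one DDL statement and one query through `executeSql`, and registers a scalar function for use in SQL. It assumes a local batch environment with the planner on the classpath. The `people` view, the `SHOUT` function, and the data are hypothetical. DDL completes immediately, while a `SELECT` submits a job whose rows are read through `collect()`.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.ArrayList;
import java.util.List;

public class SqlExecutionExample {

    /** Hypothetical UDF: upper-cases its input and appends "!". */
    public static class Shout extends ScalarFunction {
        public String eval(String s) {
            return s == null ? null : s.toUpperCase() + "!";
        }
    }

    /** Registers a view and a function, then runs a query and collects its rows. */
    public static List<Row> run() {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // DDL: executed eagerly, no job is submitted.
        tableEnv.executeSql(
                "CREATE TEMPORARY VIEW people AS "
                        + "SELECT * FROM (VALUES (1, 'ann'), (2, 'bob'), (3, 'cat')) AS t(id, name)");

        // Make the UDF callable from SQL under the name SHOUT.
        tableEnv.createTemporarySystemFunction("SHOUT", Shout.class);

        // Query: submits a job; rows are streamed back through collect().
        TableResult result =
                tableEnv.executeSql(
                        "SELECT id, SHOUT(name) AS loud FROM people WHERE id >= 2 ORDER BY id");

        List<Row> rows = new ArrayList<>();
        try (CloseableIterator<Row> it = result.collect()) {
            it.forEachRemaining(rows::add);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return rows;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

To write several `INSERT` statements as a single job, add them to a `StatementSet` from `createStatementSet()` with `addInsertSql(...)`, then call `execute()` once.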

### Catalog System

Pluggable metadata management for databases, tables, functions, and user-defined catalogs. Catalogs can be in-memory or backed by persistent storage.

```java { .api }
interface Catalog {
    // Most methods also declare checked exceptions (e.g. DatabaseNotExistException), omitted here

    // Database operations
    void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);
    void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);
    List<String> listDatabases();
    CatalogDatabase getDatabase(String databaseName);

    // Table operations (CatalogBaseTable covers both tables and views)
    void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
    List<String> listTables(String databaseName);
    CatalogBaseTable getTable(ObjectPath tablePath);

    // Function operations
    void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists);
    void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists);
    List<String> listFunctions(String databaseName);
}

class ObjectIdentifier {
    static ObjectIdentifier of(String catalogName, String databaseName, String objectName);
    String getCatalogName();
    String getDatabaseName();
    String getObjectName();
}
```

[Catalog System](./catalog-system.md)
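
Here is a minimal sketch that creates an in-memory catalog and a database through SQL DDL, then reads them back through the `Catalog` interface. It assumes the built-in `generic_in_memory` catalog type and a local environment with the planner on the classpath. The catalog, database, and table names are hypothetical, and nothing is persisted beyond the session.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectIdentifier;

import java.util.List;

public class CatalogExample {

    /** Creates a catalog and database, switches to them, and returns the catalog's databases. */
    public static List<String> createAndListDatabases() {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Register an in-memory catalog and a database inside it.
        tableEnv.executeSql("CREATE CATALOG my_catalog WITH ('type' = 'generic_in_memory')");
        tableEnv.executeSql("CREATE DATABASE my_catalog.sales");

        // Unqualified names now resolve against my_catalog.sales.
        tableEnv.useCatalog("my_catalog");
        tableEnv.useDatabase("sales");

        // Fully qualified identifier for a (not yet created) table called "orders".
        ObjectIdentifier orders =
                ObjectIdentifier.of(
                        tableEnv.getCurrentCatalog(), tableEnv.getCurrentDatabase(), "orders");
        System.out.println(orders.asSummaryString());

        // The Catalog instance exposes the same metadata programmatically.
        Catalog catalog = tableEnv.getCatalog("my_catalog").orElseThrow(IllegalStateException::new);
        return catalog.listDatabases();
    }

    public static void main(String[] args) {
        System.out.println(createAndListDatabases());
    }
}
```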

## Types

### Core Data Types

```java { .api }
abstract class DataType {
    LogicalType getLogicalType();
    Class<?> getConversionClass();
    DataType notNull();
    DataType nullable();
    DataType bridgedTo(Class<?> newConversionClass);
}

// Concrete DataType subclasses
class AtomicDataType extends DataType { }      // non-nested types such as INT or STRING
class CollectionDataType extends DataType { }  // ARRAY and MULTISET
class FieldsDataType extends DataType { }      // ROW and structured types
class KeyValueDataType extends DataType { }    // MAP
```
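
The sketch below shows how the concrete subclasses map to kinds of types, and how `bridgedTo` changes a conversion class. It assumes only `flink-table-api-java` (with `flink-table-common`) on the classpath; `kindOf` is a hypothetical helper. By default `TIMESTAMP(3)` converts to `java.time.LocalDateTime`. Bridging it to `java.sql.Timestamp` keeps the logical type and changes only the Java class used at the boundary.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.KeyValueDataType;

public class DataTypeExample {

    /** Hypothetical helper: names the DataType subclass that backs a given type. */
    public static String kindOf(DataType type) {
        if (type instanceof AtomicDataType) {
            return "atomic"; // e.g. INT, STRING, TIMESTAMP
        } else if (type instanceof CollectionDataType) {
            return "collection"; // ARRAY, MULTISET
        } else if (type instanceof KeyValueDataType) {
            return "key-value"; // MAP
        } else if (type instanceof FieldsDataType) {
            return "fields"; // ROW and structured types
        }
        return "other";
    }

    public static void main(String[] args) {
        DataType ts = DataTypes.TIMESTAMP(3);
        // Default conversion class for TIMESTAMP.
        System.out.println(ts.getConversionClass());
        // bridgedTo() keeps the logical type but changes the Java class.
        System.out.println(ts.bridgedTo(java.sql.Timestamp.class).getConversionClass());
        // notNull() returns a new DataType; the original INT stays nullable.
        System.out.println(DataTypes.INT().notNull());
        System.out.println(kindOf(DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())));
    }
}
```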

### Configuration and Settings

```java { .api }
class EnvironmentSettings {
    static EnvironmentSettings.Builder newInstance();
    static EnvironmentSettings inStreamingMode();
    static EnvironmentSettings inBatchMode();
    boolean isStreamingMode();

    static class Builder {
        Builder inStreamingMode();
        Builder inBatchMode();
        Builder withConfiguration(Configuration configuration);
        EnvironmentSettings build();
    }
}

class TableConfig {
    Configuration getConfiguration();
    SqlDialect getSqlDialect();
    void setSqlDialect(SqlDialect sqlDialect);
    ZoneId getLocalTimeZone();
    void setLocalTimeZone(ZoneId zoneId);
}
```
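
Here is a minimal sketch of choosing the execution mode through the builder and adjusting session settings on `TableConfig`. It assumes a local environment with the planner on the classpath. The time zone and dialect values are illustrative only.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;

import java.time.ZoneId;

public class ConfigurationExample {

    /** Creates a batch-mode environment and returns its adjusted TableConfig. */
    public static TableConfig configure() {
        // The execution mode is chosen on the builder before the environment is created.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        TableConfig config = tableEnv.getConfig();
        // Session time zone, used for TIMESTAMP_LTZ values and time functions.
        config.setLocalTimeZone(ZoneId.of("UTC"));
        // DEFAULT is Flink's own SQL dialect (HIVE is the alternative).
        config.setSqlDialect(SqlDialect.DEFAULT);
        return config;
    }

    public static void main(String[] args) {
        TableConfig config = configure();
        System.out.println(config.getLocalTimeZone() + " / " + config.getSqlDialect());
    }
}
```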

### Result and Execution Types

```java { .api }
enum ResultKind {
    SUCCESS,
    SUCCESS_WITH_CONTENT
}

enum SqlDialect {
    DEFAULT,
    HIVE
}

enum ExplainFormat {
    TEXT,
    JSON
}

class Row {
    Object getField(int pos);
    Object getField(String name);
    int getArity();
    RowKind getKind();
}
```
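
A sketch of the result-side types, assuming a local batch environment with the planner on the classpath. DDL statements report `ResultKind.SUCCESS` and queries report `SUCCESS_WITH_CONTENT`. Every `Row` carries a `RowKind` that describes its role in a changelog. The view name and row values are hypothetical.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;

public class ResultTypesExample {

    /** Returns the ResultKind of a DDL statement and of a query, in that order. */
    public static ResultKind[] resultKinds() {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        ResultKind ddl =
                tableEnv.executeSql("CREATE TEMPORARY VIEW numbers AS SELECT 1 AS x").getResultKind();

        TableResult query = tableEnv.executeSql("SELECT x FROM numbers");
        // Drain and close the iterator so the query job finishes cleanly.
        try (CloseableIterator<Row> it = query.collect()) {
            it.forEachRemaining(row -> { });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return new ResultKind[] {ddl, query.getResultKind()};
    }

    /** A position-based row marked as the "before" image of an update. */
    public static Row updateBefore() {
        return Row.ofKind(RowKind.UPDATE_BEFORE, "alice", 42);
    }

    /** A name-based row; its fields are addressed by name instead of position. */
    public static Row namedRow() {
        Row row = Row.withNames();
        row.setField("name", "alice");
        row.setField("age", 42);
        return row;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(resultKinds()));
        System.out.println(updateBefore().getKind() + " " + namedRow().getField("name"));
    }
}
```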

### Exception Types

```java { .api }
class TableException extends RuntimeException { }
class TableRuntimeException extends RuntimeException { }
class ValidationException extends RuntimeException { }
class TableNotExistException extends TableException { }
class AmbiguousTableFactoryException extends TableException { }
```
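
Validation errors surface while a program is being built, before any job runs. The sketch below assumes a local batch environment and a deliberately unregistered, hypothetical table name. It shows that calling `from()` on a missing path raises `ValidationException`, which you can catch like any other unchecked exception.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;

public class ExceptionExample {

    /** Returns the validation error message for a lookup of a table that was never registered. */
    public static String lookupMissingTable() {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        try {
            tableEnv.from("no_such_table");
            return null; // not reached if the path cannot be resolved
        } catch (ValidationException e) {
            // Raised while the pipeline is being defined; no job has been submitted.
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupMissingTable());
    }
}
```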