0
# Core Table Operations
1
2
This document covers table environment setup, table creation, and basic table operations in the Apache Flink Table Uber Blink module.
3
4
## Table Environment Creation
5
6
The TableEnvironment is the main entry point for all table operations.
7
8
### Pure Table Environment
9
10
```java { .api }
11
static TableEnvironment create(EnvironmentSettings settings);
12
static TableEnvironment create(Configuration configuration);
13
```
14
15
**Usage:**
16
17
```java
18
// Create with settings builder
19
EnvironmentSettings settings = EnvironmentSettings.newInstance()
20
.useBlinkPlanner()
21
.inStreamingMode()
22
.build();
23
TableEnvironment tEnv = TableEnvironment.create(settings);
24
25
// Create with configuration
26
Configuration config = new Configuration();
27
config.setString("table.sql-dialect", "default");
28
TableEnvironment tEnvFromConfig = TableEnvironment.create(config);
29
```
30
31
## Environment Settings
32
33
```java { .api }
34
class EnvironmentSettings {
35
static EnvironmentSettings.Builder newInstance();
36
37
interface Builder {
38
Builder useBlinkPlanner();
39
Builder inStreamingMode();
40
Builder inBatchMode();
41
Builder withBuiltInCatalogName(String catalogName);
42
Builder withBuiltInDatabaseName(String databaseName);
43
Builder withConfiguration(Configuration configuration);
44
EnvironmentSettings build();
45
}
46
}
47
```
48
49
## SQL Execution
50
51
### Direct SQL Execution
52
53
```java { .api }
54
TableResult executeSql(String statement);
55
Table sqlQuery(String query);
56
```
57
58
**Usage:**
59
60
```java
61
// Execute DDL/DML statements
62
TableResult result = tEnv.executeSql(
63
"CREATE TABLE users (id INT, name STRING, age INT) WITH ('connector' = 'filesystem', 'path' = '/data/users', 'format' = 'json')"
64
);
65
66
// Execute queries returning tables
67
Table userTable = tEnv.sqlQuery("SELECT * FROM users WHERE age > 18");
68
```
69
70
## Table Creation and Management
71
72
### Creating Tables
73
74
```java { .api }
75
void createTable(String path, TableDescriptor descriptor);
76
void createTemporaryTable(String path, TableDescriptor descriptor);
77
void createTemporaryView(String path, Table view);
78
void dropTable(String path);
79
void dropTemporaryTable(String path);
80
void dropTemporaryView(String path);
81
boolean tableExists(String path);
82
```
83
84
### Table Access
85
86
```java { .api }
87
Table from(String path);
88
Table scan(String... tablePath);
89
Table fromValues(Object... values);
90
Table fromValues(AbstractDataType<?> rowType, Object... values);
91
Table fromValues(DataType rowType, Object... values);
92
```
93
94
**Usage:**
95
96
```java
97
// Create table with descriptor
98
TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")
99
.schema(Schema.newBuilder()
100
.column("id", DataTypes.INT())
101
.column("name", DataTypes.STRING())
102
.column("age", DataTypes.INT())
103
.build())
104
.option("path", "/data/users")
105
.option("format", "json")
106
.build();
107
108
tEnv.createTable("users", descriptor);
109
110
// Access existing table
111
Table userTable = tEnv.from("users");
112
113
// Create table from values
114
Table valuesTable = tEnv.fromValues(
115
DataTypes.ROW(
116
DataTypes.FIELD("id", DataTypes.INT()),
117
DataTypes.FIELD("name", DataTypes.STRING())
118
),
119
Row.of(1, "Alice"),
120
Row.of(2, "Bob")
121
);
122
123
// Create temporary view
124
tEnv.createTemporaryView("user_view", userTable.where($("age").isGreater(18)));
125
```
126
127
## Basic Table Operations
128
129
The Table interface provides a fluent API for data transformations.
130
131
### Selection and Projection
132
133
```java { .api }
134
interface Table {
135
Table select(Expression... fields);
136
Table addColumns(Expression... fields);
137
Table addOrReplaceColumns(Expression... fields);
138
Table renameColumns(Expression... fields);
139
Table dropColumns(Expression... fields);
140
}
141
```
142
143
### Filtering and Sorting
144
145
```java { .api }
146
interface Table {
147
Table where(Expression predicate);
148
Table filter(Expression predicate);
149
Table distinct();
150
Table orderBy(Expression... fields);
151
Table offset(int offset);
152
Table fetch(int fetch);
153
Table limit(int fetch);
154
}
155
```
156
157
### Grouping and Aggregation
158
159
```java { .api }
160
interface Table {
161
GroupedTable groupBy(Expression... fields);
162
GroupWindowedTable window(GroupWindow groupWindow);
163
}
164
165
interface GroupedTable {
166
Table select(Expression... fields);
167
AggregatedTable aggregate(Expression aggregateFunction);
168
FlatAggregateTable flatAggregate(Expression tableAggregateFunction);
169
}
170
```
171
172
**Usage:**
173
174
```java
175
Table users = tEnv.from("users");
176
177
// Select specific columns
178
Table result = users.select($("name"), $("age"));
179
180
// Filter data
181
Table adults = users.where($("age").isGreaterOrEqual(18));
182
183
// Group and aggregate
184
Table ageGroups = users
185
.groupBy($("age"))
186
.select($("age"), $("age").count().as("count"));
187
```
188
189
## Joins
190
191
```java { .api }
192
interface Table {
193
Table join(Table right);
194
Table join(Table right, Expression joinPredicate);
195
Table leftOuterJoin(Table right);
196
Table leftOuterJoin(Table right, Expression joinPredicate);
197
Table rightOuterJoin(Table right);
198
Table rightOuterJoin(Table right, Expression joinPredicate);
199
Table fullOuterJoin(Table right);
200
Table fullOuterJoin(Table right, Expression joinPredicate);
201
}
202
```
203
204
**Usage:**
205
206
```java
207
Table users = tEnv.from("users");
208
Table orders = tEnv.from("orders");
209
210
// Inner join
211
// Table API expressions reference columns by plain name, so the joined tables must not share field names
Table userOrders = users.join(orders, $("id").isEqual($("user_id")));
212
213
// Left outer join
214
// Columns are referenced by plain name; rename clashing fields (e.g. with as(...)) before joining
Table usersWithOrders = users.leftOuterJoin(orders, $("id").isEqual($("user_id")));
215
```
216
217
## Set Operations
218
219
```java { .api }
220
interface Table {
221
Table unionAll(Table right);
222
Table union(Table right);
223
Table intersect(Table right);
224
Table intersectAll(Table right);
225
Table minus(Table right);
226
Table minusAll(Table right);
227
}
228
```
229
230
## Table Execution
231
232
```java { .api }
233
interface Table {
234
TableResult execute();
235
TableResult executeInsert(String tablePath);
236
TableResult executeInsert(String tablePath, boolean overwrite);
237
// Rows are read from the returned TableResult: CloseableIterator<Row> collect();
238
}
239
```
240
241
## Statement Sets (Multi-Sink Operations)
242
243
For executing multiple DML statements as a single job:
244
245
```java { .api }
246
interface TableEnvironment {
247
StatementSet createStatementSet();
248
}
249
250
interface StatementSet {
251
StatementSet addInsertSql(String statement);
252
StatementSet addInsert(String targetPath, Table table);
253
StatementSet addInsert(String targetPath, Table table, boolean overwrite);
254
String explain(ExplainDetail... extraDetails);
255
TableResult execute();
256
}
257
```
258
259
**Usage:**
260
261
```java
262
// Create statement set for multi-sink execution
263
StatementSet statementSet = tEnv.createStatementSet();
264
265
// Add multiple insert operations
266
statementSet.addInsert("sink_table_1", processedData);
267
statementSet.addInsertSql("INSERT INTO sink_table_2 SELECT * FROM source WHERE condition = true");
268
269
// Execute all operations as single job
270
TableResult result = statementSet.execute();
271
```
272
273
**Usage (executing a single table and reading its results):**

```java
274
Table result = users.select($("name"), $("age")).where($("age").isGreater(21));
275
276
// Execute and print results
277
result.execute().print();
278
279
// Collect results as iterator
280
try (CloseableIterator<Row> iterator = result.execute().collect()) {
281
while (iterator.hasNext()) {
282
Row row = iterator.next();
283
System.out.println(row);
284
}
285
}
286
287
// Insert into another table
288
result.executeInsert("young_adults");
289
```
290
291
## Schema Information
292
293
```java { .api }
294
interface Table {
295
ResolvedSchema getResolvedSchema();
296
TableSchema getSchema(); // Deprecated: use getResolvedSchema() instead
297
String explain();
298
String explain(ExplainDetail... extraDetails);
299
}
300
```
301
302
## Table Configuration
303
304
```java { .api }
305
interface TableEnvironment {
306
TableConfig getConfig();
307
Configuration getConfiguration();
308
}
309
310
class TableConfig {
311
Configuration getConfiguration();
312
ZoneId getLocalTimeZone();
313
void setLocalTimeZone(ZoneId zoneId);
314
}
315
```
316
317
## Error Handling
318
319
Common exceptions when working with table operations:
320
321
```java { .api }
322
class TableException extends RuntimeException;
323
class SqlParserException extends TableException;
324
class ValidationException extends TableException;
325
class CodeGenException extends TableException;
326
```
327
328
## Types
329
330
```java { .api }
331
interface Table extends Serializable;
332
333
class ResolvedSchema {
334
List<Column> getColumns();
335
List<String> getColumnNames();
336
List<DataType> getColumnDataTypes();
337
Optional<UniqueConstraint> getPrimaryKey();
338
List<UniqueConstraint> getUniqueConstraints();
339
}
340
341
enum ExplainDetail {
342
CHANGELOG_MODE,
343
ESTIMATED_COST,
344
JSON_EXECUTION_PLAN
345
}
346
347
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable;
348
```