0
# Table Environment
1
2
TableEnvironment is the entry point and central context for creating Table and SQL API programs. It provides a unified interface for both streaming and batch processing, managing catalogs, databases, functions, and configuration.
3
4
## Capabilities
5
6
### Environment Creation
7
8
Creates table environments with specific settings for streaming or batch processing.
9
10
```java { .api }
11
/**
12
* Creates a table environment that is the entry point for Table and SQL API programs
13
* @param settings The environment settings used to instantiate the TableEnvironment
14
* @return TableEnvironment instance
15
*/
16
static TableEnvironment create(EnvironmentSettings settings);
17
18
/**
19
* Creates a table environment with configuration
20
* @param configuration The configuration for the table environment
21
* @return TableEnvironment instance
22
*/
23
static TableEnvironment create(Configuration configuration);
24
```
25
26
**Usage Examples:**
27
28
```java
29
// Streaming environment
30
EnvironmentSettings streamingSettings = EnvironmentSettings
31
.newInstance()
32
.inStreamingMode()
33
.build();
34
TableEnvironment streamingEnv = TableEnvironment.create(streamingSettings);
35
36
// Batch environment
37
EnvironmentSettings batchSettings = EnvironmentSettings
38
.newInstance()
39
.inBatchMode()
40
.build();
41
TableEnvironment batchEnv = TableEnvironment.create(batchSettings);
42
43
// With custom configuration
44
Configuration config = new Configuration();
45
config.setString("table.exec.mini-batch.enabled", "true");
46
TableEnvironment configEnv = TableEnvironment.create(config);
47
```
48
49
### SQL Query Execution
50
51
Executes SQL queries and statements with support for both queries and DDL/DML operations.
52
53
```java { .api }
54
/**
55
* Evaluates a SQL query on registered tables and returns the result as a Table
56
* @param query SQL query string
57
* @return Table representing the query result
58
*/
59
Table sqlQuery(String query);
60
61
/**
62
* Executes a single SQL statement and returns the execution result
63
* @param statement SQL statement (DDL, DML, or query)
64
* @return TableResult containing execution information and data
65
*/
66
TableResult executeSql(String statement);
67
```
68
69
**Usage Examples:**
70
71
```java
72
// Query execution
73
Table result = tableEnv.sqlQuery(
74
"SELECT customer_id, SUM(amount) as total " +
75
"FROM orders " +
76
"WHERE order_date >= '2024-01-01' " +
77
"GROUP BY customer_id"
78
);
79
80
// DDL execution
81
tableEnv.executeSql(
82
"CREATE TABLE orders (" +
83
" order_id BIGINT," +
84
" customer_id BIGINT," +
85
" amount DECIMAL(10,2)," +
86
" order_date DATE" +
87
") WITH (" +
88
" 'connector' = 'kafka'," +
89
" 'topic' = 'orders'" +
90
")"
91
);
92
93
// DML execution
94
TableResult insertResult = tableEnv.executeSql(
95
"INSERT INTO target_table SELECT * FROM source_table WHERE active = true"
96
);
97
```
98
99
### Table Registration and Access
100
101
Manages table creation, registration, and access within the table environment.
102
103
```java { .api }
104
/**
105
* Creates a Table from a registered table path
106
* @param path Table path in catalog.database.table format
107
* @return Table instance for the specified path
108
*/
109
Table from(String path);
110
111
/**
112
* Creates a Table from a table descriptor
113
* @param descriptor TableDescriptor defining the table structure and properties
114
* @return Table instance based on the descriptor
115
*/
116
Table from(TableDescriptor descriptor);
117
118
/**
119
* Creates a temporary table in the current catalog and database
120
* @param path Table path (can be simple name or catalog.database.table)
121
* @param descriptor TableDescriptor defining the table
122
*/
123
void createTemporaryTable(String path, TableDescriptor descriptor);
124
125
/**
126
* Creates a persistent table in the catalog
127
* @param path Table path in catalog.database.table format
128
* @param descriptor TableDescriptor defining the table
129
*/
130
void createTable(String path, TableDescriptor descriptor);
131
132
/**
133
* Creates a temporary view from a Table
134
* @param path View name or path
135
* @param table Table to register as a view
136
*/
137
void createTemporaryView(String path, Table table);
138
```
139
140
**Usage Examples:**
141
142
```java
143
// Access existing table
144
Table orders = tableEnv.from("default_catalog.orders_db.orders");
145
146
// Create table from descriptor
147
TableDescriptor descriptor = TableDescriptor
148
.forConnector("kafka")
149
.schema(Schema.newBuilder()
150
.column("id", DataTypes.BIGINT())
151
.column("name", DataTypes.STRING())
152
.build())
153
.option("topic", "users")
154
.build();
155
156
tableEnv.createTemporaryTable("users", descriptor);
157
Table users = tableEnv.from("users");
158
159
// Create view from existing table
160
Table activeUsers = users.filter($("active").isEqual(true));
161
tableEnv.createTemporaryView("active_users", activeUsers);
162
```
163
164
### Catalog and Database Management
165
166
Manages catalog and database contexts for table operations.
167
168
```java { .api }
169
/**
170
* Sets the current catalog for table operations
171
* @param catalogName Name of the catalog to use
172
*/
173
void useCatalog(String catalogName);
174
175
/**
176
* Sets the current database within the current catalog
177
* @param databaseName Name of the database to use
178
*/
179
void useDatabase(String databaseName);
180
181
/**
182
* Lists all available catalogs
183
* @return Array of catalog names
184
*/
185
String[] listCatalogs();
186
187
/**
188
* Lists all databases in the current catalog
189
* @return Array of database names
190
*/
191
String[] listDatabases();
192
193
/**
194
* Lists all tables in the current database
195
* @return Array of table names
196
*/
197
String[] listTables();
198
199
/**
200
* Lists all user-defined functions in the current database
201
* @return Array of function names
202
*/
203
String[] listUserDefinedFunctions();
204
205
/**
206
* Lists all temporary tables in the current session
207
* @return Array of temporary table names
208
*/
209
String[] listTemporaryTables();
210
211
/**
212
* Lists all temporary views in the current session
213
* @return Array of temporary view names
214
*/
215
String[] listTemporaryViews();
216
```
217
218
**Usage Examples:**
219
220
```java
221
// Catalog and database navigation
222
tableEnv.useCatalog("my_catalog");
223
tableEnv.useDatabase("analytics");
224
225
// List available resources
226
String[] catalogs = tableEnv.listCatalogs();
227
String[] databases = tableEnv.listDatabases();
228
String[] tables = tableEnv.listTables();
229
230
// Current context information
231
String currentCatalog = tableEnv.getCurrentCatalog();
232
String currentDatabase = tableEnv.getCurrentDatabase();
233
```
234
235
### Table Creation from Values
236
237
Creates tables directly from values without external data sources.
238
239
```java { .api }
240
/**
241
* Creates a table from a list of rows with automatic schema inference
242
* @param rows List of Row objects containing the data
243
* @return Table containing the specified data
244
*/
245
Table fromValues(Row... rows);
246
247
/**
248
* Creates a table from a list of rows with automatic schema inference
249
* @param rows Collection of Row objects containing the data
250
* @return Table containing the specified data
251
*/
252
Table fromValues(Collection<Row> rows);
253
254
/**
255
* Creates a table from values with explicit schema
256
* @param rowDataType DataType defining the schema for the rows
257
* @param rows List of Row objects containing the data
258
* @return Table with specified schema and data
259
*/
260
Table fromValues(AbstractDataType<?> rowDataType, Row... rows);
261
262
/**
263
* Creates a table from values with explicit schema
264
* @param rowDataType DataType defining the schema for the rows
265
* @param rows Collection of Row objects containing the data
266
* @return Table with specified schema and data
267
*/
268
Table fromValues(AbstractDataType<?> rowDataType, Collection<Row> rows);
269
270
/**
271
* Creates a table from expression values
272
* @param expressions List of expressions representing row values
273
* @return Table containing the expression values
274
*/
275
Table fromValues(Expression... expressions);
276
277
/**
278
* Creates a table from expression values
279
* @param expressions Collection of expressions representing row values
280
* @return Table containing the expression values
281
*/
282
Table fromValues(Collection<Expression> expressions);
283
284
/**
285
* Creates a table from expression values with explicit data type
286
* @param rowDataType DataType defining the schema for the expressions
287
* @param expressions List of expressions representing row values
288
* @return Table with specified schema and expression values
289
*/
290
Table fromValues(AbstractDataType<?> rowDataType, Expression... expressions);
291
292
/**
293
* Creates a table from expression values with explicit data type
294
* @param rowDataType DataType defining the schema for the expressions
295
* @param expressions Collection of expressions representing row values
296
* @return Table with specified schema and expression values
297
*/
298
Table fromValues(AbstractDataType<?> rowDataType, Collection<Expression> expressions);
299
```
300
301
**Usage Examples:**
302
303
```java
304
import static org.apache.flink.table.api.Expressions.*;
305
306
// Create table from Row objects
307
Row row1 = Row.of(1L, "Alice", 25);
308
Row row2 = Row.of(2L, "Bob", 30);
309
Row row3 = Row.of(3L, "Charlie", 35);
310
311
Table fromRowsTable = tableEnv.fromValues(row1, row2, row3);
312
313
// Create table with explicit schema
314
AbstractDataType<?> rowType = DataTypes.ROW(
315
DataTypes.FIELD("id", DataTypes.BIGINT()),
316
DataTypes.FIELD("name", DataTypes.STRING()),
317
DataTypes.FIELD("age", DataTypes.INT())
318
);
319
320
Table typedTable = tableEnv.fromValues(rowType, row1, row2, row3);
321
322
// Create table from expressions
323
Table expressionTable = tableEnv.fromValues(
324
row(1, "Alice", 25),
325
row(2, "Bob", 30),
326
row(3, "Charlie", 35)
327
);
328
329
// Create table with explicit data type for expressions
330
Table typedExpressionTable = tableEnv.fromValues(
331
rowType,
332
row(1, "Alice", 25),
333
row(2, "Bob", 30),
334
row(3, "Charlie", 35)
335
);
336
337
// Use in complex queries
338
Table result = expressionTable
339
.filter($("age").isGreater(25))
340
.select($("name"), $("age"));
341
```
342
343
### Function Management
344
345
Manages user-defined functions registration and lifecycle.
346
347
```java { .api }
348
/**
349
* Registers a temporary system function that can be used in SQL and Table API
350
* @param name Function name for SQL usage
351
* @param function UserDefinedFunction implementation
352
*/
353
void createTemporarySystemFunction(String name, UserDefinedFunction function);
354
355
/**
356
* Creates a persistent function in the catalog
357
* @param path Function path in catalog.database.function format
358
* @param functionClass Class implementing the function
359
*/
360
void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);
361
362
/**
363
* Drops a temporary system function
364
* @param name Function name to drop
365
* @return true if function existed and was dropped
366
*/
367
boolean dropTemporarySystemFunction(String name);
368
369
/**
370
* Drops a persistent function from the catalog
371
* @param path Function path in catalog.database.function format
372
*/
373
void dropFunction(String path);
374
```
375
376
**Usage Examples:**
377
378
```java
379
// Register scalar function
380
ScalarFunction myUpper = new ScalarFunction() {
381
public String eval(String input) {
382
return input != null ? input.toUpperCase() : null;
383
}
384
};
385
tableEnv.createTemporarySystemFunction("my_upper", myUpper);
386
387
// Use in SQL
388
Table result = tableEnv.sqlQuery("SELECT my_upper(name) FROM users");
389
390
// Register aggregate function
391
tableEnv.createTemporarySystemFunction("my_avg", new MyAverageFunction());
392
393
// Create persistent function
394
tableEnv.createFunction("my_catalog.my_db.my_function", MyFunction.class);
395
```
396
397
### Configuration and Context
398
399
Manages table environment configuration and context information.
400
401
```java { .api }
402
/**
403
* Gets the table configuration for this environment
404
* @return TableConfig instance for accessing and modifying settings
405
*/
406
TableConfig getConfig();
407
408
/**
409
* Gets the current catalog name
410
* @return Current catalog name
411
*/
412
String getCurrentCatalog();
413
414
/**
415
* Gets the current database name
416
* @return Current database name
417
*/
418
String getCurrentDatabase();
419
```
420
421
**Usage Examples:**
422
423
```java
424
// Access configuration
425
TableConfig config = tableEnv.getConfig();
426
config.getConfiguration().setString("table.exec.mini-batch.enabled", "true");
427
428
// Get current context
429
String catalog = tableEnv.getCurrentCatalog();
430
String database = tableEnv.getCurrentDatabase();
431
```
432
433
### Statement Set Operations
434
435
Creates and manages statement sets for batch execution of multiple operations.
436
437
```java { .api }
438
/**
439
* Creates a StatementSet for batch execution of multiple statements
440
* @return StatementSet instance for adding multiple operations
441
*/
442
StatementSet createStatementSet();
443
```
444
445
**Usage Examples:**
446
447
```java
448
// Batch multiple operations
449
StatementSet stmtSet = tableEnv.createStatementSet();
450
stmtSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE type = 'A'");
451
stmtSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE type = 'B'");
452
stmtSet.addInsert("sink3", processedTable);
453
454
// Execute all statements together
455
TableResult result = stmtSet.execute();
456
```
457
458
## Types
459
460
### Environment Settings
461
462
```java { .api }
463
class EnvironmentSettings {
464
static Builder newInstance();
465
466
interface Builder {
467
Builder useBlinkPlanner();
468
Builder useAnyPlanner();
469
Builder inStreamingMode();
470
Builder inBatchMode();
471
Builder withConfiguration(Configuration configuration);
472
EnvironmentSettings build();
473
}
474
}
475
```
476
477
### Table Configuration
478
479
```java { .api }
480
class TableConfig {
481
Configuration getConfiguration();
482
void setSqlDialect(SqlDialect sqlDialect);
483
SqlDialect getSqlDialect();
484
void setLocalTimeZone(ZoneId zoneId);
485
ZoneId getLocalTimeZone();
486
}
487
```
488
489
### Statement Set
490
491
```java { .api }
492
interface StatementSet {
493
StatementSet addInsertSql(String statement);
494
StatementSet addInsert(String targetPath, Table table);
495
StatementSet add(ModifyOperation modifyOperation);
496
String explain();
497
TableResult execute();
498
}
499
```