0
# Table Environment and Setup
1
2
The TableEnvironment is the main entry point and central context for all Table API operations. It handles the creation of tables, execution of SQL statements, catalog management, and configuration of the execution environment.
3
4
## Capabilities
5
6
### TableEnvironment Creation
7
8
Creates table environments configured for specific execution modes and settings.
9
10
```java { .api }
11
/**
12
* Creates a table environment based on the provided settings
13
* @param settings Configuration settings for the table environment
14
* @return Configured TableEnvironment instance
15
*/
16
public static TableEnvironment create(EnvironmentSettings settings);
17
```
18
19
**Usage Examples:**
20
21
```java
22
// Streaming mode environment
23
EnvironmentSettings streamSettings = EnvironmentSettings
24
.newInstance()
25
.inStreamingMode()
26
.build();
27
TableEnvironment streamEnv = TableEnvironment.create(streamSettings);
28
29
// Batch mode environment
30
EnvironmentSettings batchSettings = EnvironmentSettings
31
.newInstance()
32
.inBatchMode()
33
.build();
34
TableEnvironment batchEnv = TableEnvironment.create(batchSettings);
35
```
36
37
### Table Access and Creation
38
39
Methods for accessing existing tables and creating new table definitions.
40
41
```java { .api }
42
/**
43
* Gets a table from the catalog by path
44
* @param path Table path in format [[catalog.]database.]table
45
* @return Table instance for the specified path
46
*/
47
public Table from(String path);
48
49
/**
50
* Creates a table in the catalog with the specified descriptor
51
* @param path Table path where to create the table
52
* @param descriptor Table descriptor defining schema and properties
53
*/
54
public void createTable(String path, TableDescriptor descriptor);
55
56
/**
57
* Creates a temporary view from a Table object
58
* @param path View name/path
59
* @param view Table object to create view from
60
*/
61
public void createTemporaryView(String path, Table view);
62
63
/**
64
* Creates a temporary table from a descriptor
65
* @param path Table path
66
* @param descriptor Table descriptor
67
*/
68
public void createTemporaryTable(String path, TableDescriptor descriptor);
69
```
70
71
**Usage Examples:**
72
73
```java
74
// Access existing table
75
Table orders = tableEnv.from("catalog1.database1.orders");
76
Table customers = tableEnv.from("customers"); // uses current catalog/database
77
78
// Create table with descriptor
79
TableDescriptor descriptor = TableDescriptor
80
.forConnector("filesystem")
81
.schema(Schema.newBuilder()
82
.column("id", DataTypes.BIGINT())
83
.column("name", DataTypes.STRING())
84
.column("price", DataTypes.DECIMAL(10, 2))
85
.build())
86
.option("path", "/path/to/data")
87
.option("format", "csv")
88
.build();
89
90
tableEnv.createTable("my_table", descriptor);
91
92
// Create temporary view
93
Table filteredOrders = orders.filter($("amount").isGreater(100));
94
tableEnv.createTemporaryView("large_orders", filteredOrders);
95
```
96
97
### SQL Execution
98
99
Execute SQL statements and queries with full SQL support.
100
101
```java { .api }
102
/**
103
* Executes a SQL query and returns the result as a Table
104
* @param query SQL SELECT query
105
* @return Table containing query results
106
*/
107
public Table sqlQuery(String query);
108
109
/**
110
* Executes a SQL statement (DDL, DML, or query)
111
* @param statement SQL statement to execute
112
* @return TableResult containing execution results
113
*/
114
public TableResult executeSql(String statement);
115
```
116
117
**Usage Examples:**
118
119
```java
120
// SQL query
121
Table result = tableEnv.sqlQuery(
122
"SELECT customer_id, SUM(amount) as total " +
123
"FROM orders " +
124
"WHERE order_date >= '2023-01-01' " +
125
"GROUP BY customer_id"
126
);
127
128
// SQL DDL statement
129
tableEnv.executeSql(
130
"CREATE TABLE user_behavior (" +
131
" user_id BIGINT," +
132
" item_id BIGINT," +
133
" category_id BIGINT," +
134
" behavior STRING," +
135
" ts TIMESTAMP(3)" +
136
") WITH (" +
137
" 'connector' = 'kafka'," +
138
" 'topic' = 'user_behavior'," +
139
" 'properties.bootstrap.servers' = 'localhost:9092'" +
140
")"
141
);
142
143
// SQL DML statement
144
tableEnv.executeSql(
145
"INSERT INTO result_table " +
146
"SELECT user_id, COUNT(*) as behavior_cnt " +
147
"FROM user_behavior " +
148
"GROUP BY user_id"
149
);
150
```
151
152
### Catalog Operations
153
154
Manage catalogs, databases, and metadata contexts.
155
156
```java { .api }
157
/**
158
* Register a catalog with the given name
159
* @param catalogName Name to register the catalog under
160
* @param catalog Catalog instance to register
161
*/
162
public void registerCatalog(String catalogName, Catalog catalog);
163
164
/**
165
* Set the current catalog for resolving unqualified table references
166
* @param catalogName Name of catalog to use as current
167
*/
168
public void useCatalog(String catalogName);
169
170
/**
171
* Set the current database within the current catalog
172
* @param databaseName Name of database to use as current
173
*/
174
public void useDatabase(String databaseName);
175
176
/**
177
* Get the current catalog name
178
* @return Name of the current catalog
179
*/
180
public String getCurrentCatalog();
181
182
/**
183
* Get the current database name
184
* @return Name of the current database
185
*/
186
public String getCurrentDatabase();
187
188
/**
189
* List all registered catalog names
190
* @return Array of catalog names
191
*/
192
public String[] listCatalogs();
193
194
/**
195
* List all databases in the current catalog
196
* @return Array of database names
197
*/
198
public String[] listDatabases();
199
200
/**
201
* List all tables in the current database
202
* @return Array of table names
203
*/
204
public String[] listTables();
205
```
206
207
### Function Registration
208
209
Register user-defined functions for use in Table API and SQL.
210
211
```java { .api }
212
/**
213
* Register a scalar function under the given name
214
* @param name Function name to register under
215
* @param function ScalarFunction instance
216
*/
217
public void registerFunction(String name, ScalarFunction function);
218
219
/**
220
* Create a temporary system function from a class name
221
* @param name Function name
222
* @param functionClass Function class name
223
*/
224
public void createTemporarySystemFunction(String name, String functionClass);
225
226
/**
227
* Create a temporary function from a function instance
228
* @param path Function path/name
229
* @param function Function instance
230
*/
231
public void createTemporaryFunction(String path, UserDefinedFunction function);
232
```
233
234
### Configuration Management
235
236
Access and modify table environment configuration.
237
238
```java { .api }
239
/**
240
* Get the table configuration for this environment
241
* @return TableConfig instance
242
*/
243
public TableConfig getConfig();
244
```
245
246
### Statement Sets
247
248
Create statement sets for batching multiple DML operations.
249
250
```java { .api }
251
/**
252
* Create a StatementSet for executing multiple statements together
253
* @return StatementSet instance
254
*/
255
public StatementSet createStatementSet();
256
```
257
258
## Environment Settings
259
260
Configuration class for creating table environments with specific settings.
261
262
```java { .api }
263
/**
264
* Creates a new EnvironmentSettings builder
265
* @return Builder for configuring environment settings
266
*/
267
public static EnvironmentSettings.Builder newInstance();
268
269
public static class Builder {
270
/**
271
* Sets the planner to Blink planner (default and only option in newer versions)
272
* @return Builder instance for chaining
273
*/
274
public Builder useBlinkPlanner();
275
276
/**
277
* Configures environment for streaming mode execution
278
* @return Builder instance for chaining
279
*/
280
public Builder inStreamingMode();
281
282
/**
283
* Configures environment for batch mode execution
284
* @return Builder instance for chaining
285
*/
286
public Builder inBatchMode();
287
288
/**
289
* Sets a custom class loader for the environment
290
* @param classLoader Custom class loader
291
* @return Builder instance for chaining
292
*/
293
public Builder withClassLoader(ClassLoader classLoader);
294
295
/**
296
* Builds the EnvironmentSettings with configured options
297
* @return EnvironmentSettings instance
298
*/
299
public EnvironmentSettings build();
300
}
301
```
302
303
**Usage Examples:**
304
305
```java
306
// Default streaming environment
307
EnvironmentSettings settings = EnvironmentSettings
308
.newInstance()
309
.inStreamingMode()
310
.build();
311
312
// Batch environment with custom class loader
313
ClassLoader customClassLoader = // ... custom class loader
314
EnvironmentSettings batchSettings = EnvironmentSettings
315
.newInstance()
316
.inBatchMode()
317
.withClassLoader(customClassLoader)
318
.build();
319
```
320
321
## Configuration Types
322
323
```java { .api }
324
public final class TableConfig implements WritableConfig, ReadableConfig {
325
/**
326
* Get the underlying configuration object
327
* @return Configuration instance
328
*/
329
public Configuration getConfiguration();
330
331
/**
332
* Set the SQL dialect for parsing SQL statements
333
* @param dialect SQL dialect to use
334
*/
335
public void setSqlDialect(SqlDialect dialect);
336
337
/**
338
* Set the parallelism for table operations
339
* @param parallelism Parallelism level
340
*/
341
public void setParallelism(int parallelism);
342
343
/**
344
* Get the current parallelism setting
345
* @return Current parallelism level
346
*/
347
public int getParallelism();
348
}
349
350
public enum SqlDialect {
351
/** Default Flink SQL dialect */
352
DEFAULT,
353
/** Hive-compatible SQL dialect */
354
HIVE
355
}
356
```