0
# Table Environment
1
2
The TableEnvironment is the main entry point of the Table API, providing methods for registering tables, executing SQL queries, and configuring the table program.
3
4
## Capabilities
5
6
### Environment Creation
7
8
Factory methods for creating batch and streaming table environments.
9
10
```scala { .api }
11
object TableEnvironment {
12
/**
13
* Creates a batch table environment with an ExecutionEnvironment
14
* @param executionEnvironment The batch execution environment
15
* @returns BatchTableEnvironment for batch processing
16
*/
17
def getTableEnvironment(executionEnvironment: ExecutionEnvironment): BatchTableEnvironment
18
19
/**
20
* Creates a streaming table environment with a StreamExecutionEnvironment
21
* @param executionEnvironment The stream execution environment
22
* @returns StreamTableEnvironment for stream processing
23
*/
24
def getTableEnvironment(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
25
}
26
```
27
28
**Usage Examples:**
29
30
```scala
31
import org.apache.flink.api.scala._
32
import org.apache.flink.streaming.api.scala._
33
import org.apache.flink.table.api.scala._
34
35
// Batch environment
36
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
37
val batchTableEnv = TableEnvironment.getTableEnvironment(batchEnv)
38
39
// Streaming environment
40
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
41
val streamTableEnv = TableEnvironment.getTableEnvironment(streamEnv)
42
```
43
44
### Table Registration and Management
45
46
Register and manage tables within the table environment.
47
48
```scala { .api }
49
/**
50
* Registers a table in the catalog for later reference
51
* @param name Name to register the table under
52
* @param table Table instance to register
53
*/
54
def registerTable(name: String, table: Table): Unit
55
56
/**
57
* Registers a table source in the catalog
58
* @param name Name to register the source under
59
* @param tableSource The table source to register
60
*/
61
def registerTableSource(name: String, tableSource: TableSource[_]): Unit
62
63
/**
64
* Registers a table sink in the catalog
65
* @param name Name to register the sink under
66
* @param fieldNames Field names of the sink
67
* @param fieldTypes Field types of the sink
68
* @param tableSink The table sink to register
69
*/
70
def registerTableSink(
71
name: String,
72
fieldNames: Array[String],
73
fieldTypes: Array[TypeInformation[_]],
74
tableSink: TableSink[_]
75
): Unit
76
77
/**
78
* Scans a registered table by name
79
* @param tablePath Table name or path components
80
* @returns Table instance for the registered table
81
*/
82
def scan(tablePath: String*): Table
83
84
/**
85
* Creates a table from a table source
86
* @param source The table source to create table from
87
* @returns Table instance wrapping the source
88
*/
89
def fromTableSource(source: TableSource[_]): Table
90
91
/**
92
* Lists all registered tables in the environment
93
* @returns Array of table names
94
*/
95
def listTables(): Array[String]
96
```
97
98
**Usage Examples:**
99
100
```scala
101
// Register a table
102
val dataSet = env.fromElements((1, "Alice"), (2, "Bob"))
103
val table = dataSet.toTable(tEnv, 'id, 'name)
104
tEnv.registerTable("Users", table)
105
106
// Scan registered table
107
val users = tEnv.scan("Users")
108
109
// Register and use table source
110
val csvSource = new CsvTableSource("/path/to/file.csv", Array("id", "name"), Array(Types.INT, Types.STRING))
111
tEnv.registerTableSource("CsvUsers", csvSource)
112
val csvTable = tEnv.scan("CsvUsers")
113
```
114
115
### SQL Execution
116
117
Execute SQL queries and statements directly on the table environment.
118
119
```scala { .api }
120
/**
121
* Executes a SQL query and returns the result as a Table
122
* @param query The SQL query string
123
* @returns Table containing query results
124
*/
125
def sqlQuery(query: String): Table
126
127
/**
128
* Executes a SQL statement (DDL/DML)
129
* @param stmt The SQL statement string
130
*/
131
def sqlUpdate(stmt: String): Unit
132
```
133
134
**Usage Examples:**
135
136
```scala
137
// SQL queries
138
val result = tEnv.sqlQuery("SELECT id, name FROM Users WHERE id > 1")
139
val aggregated = tEnv.sqlQuery("SELECT COUNT(*) as user_count FROM Users")
140
141
// SQL DDL/DML
142
tEnv.sqlUpdate("CREATE TABLE OutputTable (id INT, name STRING)")
143
tEnv.sqlUpdate("INSERT INTO OutputTable SELECT * FROM Users")
144
```
145
146
### Function Registration
147
148
Register user-defined functions for use in queries.
149
150
```scala { .api }
151
/**
152
* Registers a scalar function
153
* @param name Function name for SQL usage
154
* @param function ScalarFunction implementation
155
*/
156
def registerFunction(name: String, function: ScalarFunction): Unit
157
158
/**
159
* Lists all registered user-defined functions
160
* @returns Array of function names
161
*/
162
def listUserDefinedFunctions(): Array[String]
163
```
164
165
**Usage Examples:**
166
167
```scala
168
// Register custom function
169
class AddOne extends ScalarFunction {
170
def eval(x: Int): Int = x + 1
171
}
172
173
tEnv.registerFunction("addOne", new AddOne())
174
175
// Use in SQL
176
val result = tEnv.sqlQuery("SELECT addOne(id) FROM Users")
177
```
178
179
### External Catalog Management
180
181
Register external catalogs for accessing external metadata stores.
182
183
```scala { .api }
184
/**
185
* Registers an external catalog
186
* @param name Catalog name
187
* @param externalCatalog External catalog implementation
188
*/
189
def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit
190
191
/**
192
* Gets the default catalog
193
* @returns The default external catalog
194
*/
195
def getDefaultCatalog: ExternalCatalog
196
```
197
198
### Configuration and Utilities
199
200
Access the table configuration and inspect the execution plan of a table.
201
202
```scala { .api }
203
/**
204
* Gets the table configuration
205
* @returns TableConfig instance
206
*/
207
def getConfig: TableConfig
208
209
/**
210
* Explains the execution plan for a table
211
* @param table Table to explain
212
* @returns String representation of execution plan
213
*/
214
def explain(table: Table): String
215
```
216
217
**Usage Examples:**
218
219
```scala
220
// Configure timezone
221
val config = tEnv.getConfig
222
config.setTimeZone(TimeZone.getTimeZone("UTC"))
223
224
// Explain query plan
225
val table = tEnv.scan("Users").select('name)
226
println(tEnv.explain(table))
227
```
228
229
## Specialized Environments
230
231
### BatchTableEnvironment
232
233
Batch-specific table environment with DataSet conversion capabilities.
234
235
```scala { .api }
236
abstract class BatchTableEnvironment extends TableEnvironment {
237
/**
238
* Converts a Table to a DataSet
239
* @param table Table to convert
240
* @returns DataSet of the requested element type T
241
*/
242
def toDataSet[T](table: Table): DataSet[T]
243
244
/**
245
* Converts a Table to a DataSet with specific type
246
* @param table Table to convert
247
* @param typeInfo Type information for conversion
248
* @returns Typed DataSet
249
*/
250
def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T]
251
}
252
```
253
254
### StreamTableEnvironment
255
256
Stream-specific table environment with DataStream conversion capabilities.
257
258
```scala { .api }
259
abstract class StreamTableEnvironment extends TableEnvironment {
260
/**
261
* Converts a Table to an append-only DataStream
262
* @param table Table to convert
263
* @returns DataStream of the requested element type T
264
*/
265
def toAppendStream[T](table: Table): DataStream[T]
266
267
/**
268
* Converts a Table to a retract DataStream
269
* @param table Table to convert
270
* @returns DataStream of (Boolean, T) where true marks an add message and false marks a retract message
271
*/
272
def toRetractStream[T](table: Table): DataStream[(Boolean, T)]
273
}
274
```
275
276
**Usage Examples:**
277
278
```scala
279
// Batch conversion
280
val batchResult: DataSet[Row] = batchTableEnv.toDataSet(table)
281
282
// Stream conversions
283
val appendStream: DataStream[Row] = streamTableEnv.toAppendStream(table)
284
val retractStream: DataStream[(Boolean, Row)] = streamTableEnv.toRetractStream(aggregatedTable)
285
```
286
287
## Types
288
289
```scala { .api }
290
abstract class TableEnvironment
291
abstract class BatchTableEnvironment extends TableEnvironment
292
abstract class StreamTableEnvironment extends TableEnvironment
293
294
trait ExternalCatalog {
295
/**
296
* Gets a table from the external catalog
297
* @param tablePath Table path components
298
* @returns Table from the external catalog
299
*/
300
def getTable(tablePath: String*): Table
301
302
/**
303
* Lists all tables in the external catalog
304
* @returns List of table names
305
*/
306
def listTables(): java.util.List[String]
307
308
/**
309
* Gets a database from the external catalog
310
* @param databaseName Name of the database
311
* @returns External catalog database
312
*/
313
def getDatabase(databaseName: String): ExternalCatalogDatabase
314
}
315
316
trait ExternalCatalogDatabase {
317
/**
318
* Gets a table from the database
319
* @param tableName Name of the table
320
* @returns Table from the database
321
*/
322
def getTable(tableName: String): Table
323
324
/**
325
* Lists all tables in the database
326
* @returns List of table names in the database
327
*/
328
def listTables(): java.util.List[String]
329
}
330
```