# Table Environment

The StreamTableEnvironment is the central entry point for integrating the Table/SQL API with the DataStream API. It provides factory methods, configuration options, and lifecycle management for table-stream operations.
## Capabilities

### Environment Creation

Creates table environments with StreamExecutionEnvironment integration.

```java { .api }
/**
 * Creates a table environment with default settings
 * @param executionEnvironment The StreamExecutionEnvironment to integrate with
 * @return StreamTableEnvironment instance
 */
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);

/**
 * Creates a table environment with custom settings
 * @param executionEnvironment The StreamExecutionEnvironment to integrate with
 * @param settings Custom environment settings for configuration
 * @return StreamTableEnvironment instance
 */
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);
```
**Usage Examples:**

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;

// Basic environment creation
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Environment with custom settings. The Blink planner has been the default
// since Flink 1.11, so the deprecated useBlinkPlanner() call can be omitted.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .build();
StreamTableEnvironment customTableEnv = StreamTableEnvironment.create(env, settings);
```
### View Management

Creates temporary views from DataStream sources that can be referenced in SQL queries.

```java { .api }
/**
 * Creates a temporary view from DataStream with automatic schema derivation
 * @param path The catalog path for the view
 * @param dataStream The DataStream to create view from
 */
<T> void createTemporaryView(String path, DataStream<T> dataStream);

/**
 * Creates a temporary view from DataStream with custom schema
 * @param path The catalog path for the view
 * @param dataStream The DataStream to create view from
 * @param schema Custom schema definition for the view
 */
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
```
**Usage Examples:**

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// Create view with automatic schema
DataStream<Row> orderStream = ...; // your data stream
tableEnv.createTemporaryView("orders", orderStream);

// Query the view
Table result = tableEnv.sqlQuery("SELECT * FROM orders WHERE amount > 100");

// Create view with custom schema
Schema customSchema = Schema.newBuilder()
    .column("order_id", DataTypes.BIGINT())
    .column("customer_name", DataTypes.STRING())
    .column("amount", DataTypes.DECIMAL(10, 2))
    .column("order_time", DataTypes.TIMESTAMP_LTZ(3))
    .build();

tableEnv.createTemporaryView("custom_orders", orderStream, customSchema);
```
### Function Registration (Deprecated)

Legacy methods for registering user-defined functions. These are deprecated in favor of `createTemporarySystemFunction`.

```java { .api }
/**
 * @deprecated Use createTemporarySystemFunction instead
 */
@Deprecated
<T> void registerFunction(String name, TableFunction<T> tableFunction);

/**
 * @deprecated Use createTemporarySystemFunction instead
 */
@Deprecated
<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);

/**
 * @deprecated Use createTemporarySystemFunction instead
 */
@Deprecated
<T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction);
```
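As a sketch of the migration path, the snippet below registers a hypothetical `SplitFunction` table function (not part of the source) the old way and the new way. `createTemporarySystemFunction` accepts either a function class or an instance:

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Hypothetical table function that splits a comma-separated string into rows
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public class SplitFunction extends TableFunction<Row> {
    public void eval(String str) {
        for (String s : str.split(",")) {
            collect(Row.of(s));
        }
    }
}

// Deprecated registration:
// tableEnv.registerFunction("split", new SplitFunction());

// Current replacement:
tableEnv.createTemporarySystemFunction("split", SplitFunction.class);
```

The registered name can then be used directly in SQL, e.g. `SELECT word FROM orders, LATERAL TABLE(split(tags))`.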
### Statement Set Creation

Creates StreamStatementSet for batch execution of multiple operations.

```java { .api }
/**
 * Creates a statement set for batch execution of table operations
 * @return StreamStatementSet for adding multiple operations
 */
StreamStatementSet createStatementSet();
```
**Usage Examples:**

```java
import org.apache.flink.table.api.bridge.java.StreamStatementSet;

// Create statement set for batch operations
StreamStatementSet statementSet = tableEnv.createStatementSet();

// Add multiple operations
statementSet.addInsert("sink_table1", table1);
statementSet.addInsert("sink_table2", table2);
statementSet.addInsertSql("INSERT INTO sink_table3 SELECT * FROM source_table");

// Execute all operations together as a single job
statementSet.execute();
```
## Type Definitions

### Environment Settings

```java { .api }
import org.apache.flink.table.api.EnvironmentSettings;

// Environment settings builder. The Blink planner is the default;
// the deprecated useBlinkPlanner() call is no longer needed.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode() // or .inBatchMode()
    .build();
```
### Catalog Paths

View paths follow Flink's three-part naming convention:

- **Simple name**: `"my_view"` (uses default catalog and database)
- **Database qualified**: `"my_db.my_view"` (uses default catalog)
- **Fully qualified**: `"my_catalog.my_db.my_view"`
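A minimal sketch of how path resolution interacts with the current catalog and database (the names `my_catalog` and `my_db` are illustrative, not from the source):

```java
// A simple name resolves against the current catalog and database
tableEnv.createTemporaryView("my_view", orderStream);

// Change the defaults that simple names resolve against
tableEnv.useCatalog("my_catalog");
tableEnv.useDatabase("my_db");

// A fully qualified name bypasses the current defaults entirely
Table t = tableEnv.sqlQuery("SELECT * FROM my_catalog.my_db.my_view");
```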
### Schema Definition

```java { .api }
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.DataTypes;

// Schema building patterns. Columns referenced by watermark() and
// primaryKey() must be declared; primary key columns must be non-null.
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT().notNull())
    .column("physical_col", DataTypes.STRING())
    .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
    .columnByExpression("computed_col", "UPPER(physical_col)")
    .columnByMetadata("metadata_col", DataTypes.TIMESTAMP_LTZ(3))
    .watermark("event_time", "event_time - INTERVAL '5' SECOND")
    .primaryKey("id")
    .build();
```