0
# Stream Table Environment
1
2
The StreamTableEnvironment is the entry point and central context for creating Table and SQL API programs that integrate with the Java DataStream API. It provides unified processing for both bounded and unbounded data.
3
4
## Capabilities
5
6
### Environment Creation
7
8
Factory methods for creating StreamTableEnvironment instances with optional configuration.
9
10
```java { .api }
11
/**
12
* Creates a table environment with default settings
13
* @param executionEnvironment The StreamExecutionEnvironment for the TableEnvironment
14
* @return StreamTableEnvironment instance
15
*/
16
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
17
18
/**
19
* Creates a table environment with custom settings
20
* @param executionEnvironment The StreamExecutionEnvironment for the TableEnvironment
21
* @param settings The EnvironmentSettings for configuration
22
* @return StreamTableEnvironment instance
23
*/
24
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);
25
```
26
27
**Usage Examples:**
28
29
```java
30
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
31
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
32
import org.apache.flink.table.api.EnvironmentSettings;
33
34
// Basic environment creation
35
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
36
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
37
38
// Environment with custom settings
39
EnvironmentSettings settings = EnvironmentSettings.newInstance()
40
.inStreamingMode()
41
.build();
42
StreamTableEnvironment customTableEnv = StreamTableEnvironment.create(env, settings);
43
```
44
45
### DataStream to Table Conversion
46
47
Convert DataStreams into Tables with optional schema customization.
48
49
```java { .api }
50
/**
51
* Converts DataStream to Table with automatic schema derivation
52
* @param dataStream The DataStream to convert
53
* @return Table with derived schema
54
*/
55
<T> Table fromDataStream(DataStream<T> dataStream);
56
57
/**
58
* Converts DataStream to Table with custom schema
59
* @param dataStream The DataStream to convert
60
* @param schema Custom schema for the resulting table
61
* @return Table with specified schema
62
*/
63
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
64
```
65
66
**Usage Examples:**
67
68
```java
69
import org.apache.flink.streaming.api.datastream.DataStream;
70
import org.apache.flink.table.api.Table;
71
import org.apache.flink.table.api.Schema;
72
import org.apache.flink.types.Row;
73
74
// Automatic schema derivation
75
DataStream<Row> dataStream = env.fromElements(
76
Row.of("Alice", 25),
77
Row.of("Bob", 30)
78
);
79
Table table = tableEnv.fromDataStream(dataStream);
80
81
// Custom schema with renamed columns
82
Schema schema = Schema.newBuilder()
83
.column("name", "STRING")
84
.column("age", "INT")
85
.build();
86
Table customTable = tableEnv.fromDataStream(dataStream, schema);
87
88
// Schema with computed columns and watermarks
89
Schema advancedSchema = Schema.newBuilder()
90
.column("f0", "STRING")
91
.column("f1", "INT")
92
.columnByExpression("upper_name", "UPPER(f0)")
93
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
94
.watermark("rowtime", "SOURCE_WATERMARK()")
95
.build();
96
Table advancedTable = tableEnv.fromDataStream(dataStream, advancedSchema);
97
```
98
99
### Table to DataStream Conversion
100
101
Convert Tables back to DataStreams with type safety and optional type specification.
102
103
```java { .api }
104
/**
105
* Converts Table to DataStream<Row> for insert-only tables
106
* @param table The Table to convert (must be insert-only)
107
* @return DataStream of Row objects
108
*/
109
DataStream<Row> toDataStream(Table table);
110
111
/**
112
* Converts Table to typed DataStream for insert-only tables
113
* @param table The Table to convert (must be insert-only)
114
* @param targetClass Target class for type conversion
115
* @return Typed DataStream
116
*/
117
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
118
119
/**
120
* Converts Table to DataStream with specific data type
121
* @param table The Table to convert (must be insert-only)
122
* @param targetDataType Target data type specification
123
* @return Typed DataStream with specified data type
124
*/
125
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
126
```
127
128
**Usage Examples:**
129
130
```java
131
import org.apache.flink.streaming.api.datastream.DataStream;
132
import org.apache.flink.table.api.Table;
133
import org.apache.flink.table.api.DataTypes;
134
import org.apache.flink.types.Row;
135
136
// Convert to Row DataStream
137
Table sourceTable = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 21");
138
DataStream<Row> resultStream = tableEnv.toDataStream(sourceTable);
139
140
// Convert to POJO DataStream
141
public static class User {
142
public String name;
143
public Integer age;
144
// constructors, getters, setters...
145
}
146
147
DataStream<User> userStream = tableEnv.toDataStream(sourceTable, User.class);
148
149
// Convert with specific data type
150
DataStream<User> typedUserStream = tableEnv.toDataStream(
151
sourceTable,
152
DataTypes.of(User.class)
153
);
154
```
155
156
### Temporary View Creation
157
158
Create temporary views from DataStreams for SQL querying.
159
160
```java { .api }
161
/**
162
* Creates temporary view from DataStream with automatic schema
163
* @param path View path (catalog.database.view or database.view or view)
164
* @param dataStream The DataStream to create view from
165
*/
166
<T> void createTemporaryView(String path, DataStream<T> dataStream);
167
168
/**
169
* Creates temporary view from DataStream with custom schema
170
* @param path View path (catalog.database.view or database.view or view)
171
* @param dataStream The DataStream to create view from
172
* @param schema Custom schema for the view
173
*/
174
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
175
```
176
177
**Usage Examples:**
178
179
```java
180
// Create view with automatic schema
181
DataStream<Row> orderStream = env.fromElements(
182
Row.of("order1", "user1", 100.0),
183
Row.of("order2", "user2", 250.0)
184
);
185
tableEnv.createTemporaryView("orders", orderStream);
186
187
// Query the view with SQL
188
Table result = tableEnv.sqlQuery("SELECT * FROM orders WHERE f2 > 150");
189
190
// Create view with custom schema
191
Schema orderSchema = Schema.newBuilder()
192
.column("order_id", "STRING")
193
.column("user_id", "STRING")
194
.column("amount", "DOUBLE")
195
.build();
196
tableEnv.createTemporaryView("orders_named", orderStream, orderSchema);
197
198
// Query with named columns
199
Table namedResult = tableEnv.sqlQuery("SELECT user_id, amount FROM orders_named WHERE amount > 150");
200
```
201
202
### Statement Set Creation
203
204
Create a StreamStatementSet that groups multiple INSERT operations so the planner can optimize and execute them together as a single job.
205
206
```java { .api }
207
/**
208
* Creates StreamStatementSet for batch execution of multiple statements
209
* @return StreamStatementSet for adding multiple operations
210
*/
211
StreamStatementSet createStatementSet();
212
```
213
214
**Usage Examples:**
215
216
```java
217
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
218
219
// Create statement set for batch execution
220
StreamStatementSet statementSet = tableEnv.createStatementSet();
221
222
// Add multiple operations
223
statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE type = 'A'");
224
statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE type = 'B'");
225
statementSet.addInsert("sink3", tableEnv.sqlQuery("SELECT COUNT(*) FROM source"));
226
227
// Execute all statements together
228
statementSet.execute();
229
```
230
231
## Types
232
233
### Schema Configuration
234
235
```java { .api }
236
import org.apache.flink.table.api.Schema;
237
import org.apache.flink.table.api.EnvironmentSettings;
238
import org.apache.flink.table.expressions.Expression;
239
```
240
241
### DataStream Integration
242
243
```java { .api }
244
import org.apache.flink.streaming.api.datastream.DataStream;
245
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
246
import org.apache.flink.api.common.typeinfo.TypeInformation;
247
import org.apache.flink.table.types.AbstractDataType;
248
import org.apache.flink.types.Row;
249
```
250
251
### Deprecated Methods
252
253
The following methods are deprecated and should be replaced with newer Schema-based alternatives:
254
255
```java { .api }
256
// Deprecated - use fromDataStream(DataStream<T>, Schema) instead
257
@Deprecated
258
<T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);
259
260
// Deprecated - use createTemporaryView(String, DataStream<T>, Schema) instead
261
@Deprecated
262
<T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields);
263
264
// Deprecated - use toDataStream(Table, Class<T>) instead
265
@Deprecated
266
<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);
267
268
// Deprecated - use toDataStream(Table, Class<T>) instead
269
@Deprecated
270
<T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);
271
272
// Deprecated - use toChangelogStream(Table) instead
273
@Deprecated
274
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);
275
276
// Deprecated - use toChangelogStream(Table) instead
277
@Deprecated
278
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);
279
```