# Stream Table Environment

The `StreamTableEnvironment` is the entry point for Table API and SQL programs that integrate with Flink's DataStream API. It provides factory methods for creating a table environment on top of a `StreamExecutionEnvironment`, and methods for converting between `DataStream` and `Table` representations.

## Core Interface

```java { .api }
@PublicEvolving
public interface StreamTableEnvironment extends TableEnvironment {
    // Factory methods
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

    // DataStream to Table conversions
    <T> Table fromDataStream(DataStream<T> dataStream);
    <T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

    // Table to DataStream conversions
    DataStream<Row> toDataStream(Table table);
    <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
    <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
    DataStream<Row> toChangelogStream(Table table);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

    // Temporary view creation
    <T> void createTemporaryView(String path, DataStream<T> dataStream);
    <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

    // Statement set creation
    StreamStatementSet createStatementSet();
}
```

## Factory Methods

### Creating Basic Environment

Create a `StreamTableEnvironment` from an existing `StreamExecutionEnvironment`:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
```

### Creating with Custom Settings

Create a `StreamTableEnvironment` with specific configuration settings:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
```

## DataStream to Table Conversion

### Basic Conversion

Convert a `DataStream` to a `Table` using automatic schema inference:

```java
DataStream<MyPojo> dataStream = env.fromElements(
    new MyPojo("Alice", 25),
    new MyPojo("Bob", 30)
);
Table table = tableEnv.fromDataStream(dataStream);
```
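The examples on this page use a `MyPojo` type without defining it. The following is a minimal sketch of what such a class might look like, assuming Flink's usual POJO rules: a public class with a public no-argument constructor and fields that are either public or exposed through getters and setters. The field names `name` and `age` are inferred from the queries used later on this page, and the wrapper class `PojoSketch` is hypothetical and only keeps the sketch in a single file.

```java
// Hypothetical holder class; in a real project MyPojo would usually be
// a top-level public class in its own file.
final class PojoSketch {

    // The field names are what schema inference would use as column names.
    public static class MyPojo {
        public String name;
        public int age;

        // Public no-argument constructor, needed for POJO-style handling.
        public MyPojo() {}

        // Convenience constructor used by the examples on this page.
        public MyPojo(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }
}
```

With a class of this shape, `fromDataStream` should be able to derive columns named `name` and `age` from the fields instead of falling back to a single opaque column.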

### Conversion with Custom Schema

Convert a `DataStream` to a `Table` with a custom schema definition:

```java
// Name the Row fields explicitly; without this they default to f0, f1
DataStream<Row> dataStream = env.fromElements(
        Row.of("Alice", 25),
        Row.of("Bob", 30))
    .returns(Types.ROW_NAMED(
        new String[] {"name", "age"}, Types.STRING, Types.INT));

// Schema columns refer to the stream's fields by name
Schema schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build();

Table table = tableEnv.fromDataStream(dataStream, schema);
```

### Changelog Stream Conversion

Interpret a `DataStream<Row>` of changelog entries as a `Table`. Each `Row` carries a `RowKind` flag that marks it as an insert, update, or delete:

```java
// Rows in a changelog stream carry a RowKind flag (+I, -U, +U, -D)
DataStream<Row> changelogStream = ...; // source of changelog data
Table table = tableEnv.fromChangelogStream(changelogStream);

// With custom schema (columns refer to the stream's row fields by name)
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("value", DataTypes.DOUBLE())
    .build();

Table tableWithSchema = tableEnv.fromChangelogStream(changelogStream, schema);

// With changelog mode specification (insertOnly: the stream contains only +I rows)
ChangelogMode changelogMode = ChangelogMode.insertOnly();
Table tableWithMode = tableEnv.fromChangelogStream(changelogStream, schema, changelogMode);
```
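To make the meaning of a changelog stream concrete, the sketch below replays a list of changes into the table contents they describe. It uses only the Java standard library and is a conceptual model, not Flink's implementation: `ChangelogSketch`, `Kind`, `Change`, and `materialize` are hypothetical names, with `Kind` mirroring the four values of Flink's `RowKind`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual model only; none of these names are part of Flink.
final class ChangelogSketch {

    // Mirrors the four kinds of org.apache.flink.types.RowKind.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical stand-in for a Row together with its RowKind.
    static final class Change {
        final Kind kind;
        final List<Object> fields;

        Change(Kind kind, Object... fields) {
            this.kind = kind;
            this.fields = Arrays.asList(fields);
        }
    }

    // Applies the changes in order and returns the resulting table rows.
    static List<List<Object>> materialize(List<Change> changelog) {
        List<List<Object>> table = new ArrayList<>();
        for (Change change : changelog) {
            switch (change.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.add(change.fields);    // a new row version appears
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    table.remove(change.fields); // the first equal row is retracted
                    break;
            }
        }
        return table;
    }
}
```

Replaying an insert of `("Alice", 12)`, an insert of `("Bob", 5)`, an `UPDATE_BEFORE` of `("Alice", 12)`, and an `UPDATE_AFTER` of `("Alice", 100)` leaves two rows in this model: `("Bob", 5)` and `("Alice", 100)`.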

## Table to DataStream Conversion

### Basic Conversion to Row

Convert a `Table` to a `DataStream<Row>`:

```java
Table table = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 21");
DataStream<Row> resultStream = tableEnv.toDataStream(table);
```

### Conversion to Specific Type

Convert a `Table` to a `DataStream` of a specific class:

```java
Table table = tableEnv.sqlQuery("SELECT name, age FROM users");
DataStream<MyPojo> pojoStream = tableEnv.toDataStream(table, MyPojo.class);
```

### Conversion with Data Type

Convert a `Table` to a `DataStream` using a specific data type:

```java
Table table = tableEnv.sqlQuery("SELECT name, age FROM users");
AbstractDataType<?> dataType = DataTypes.ROW(
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("age", DataTypes.INT())
);
DataStream<Row> typedStream = tableEnv.toDataStream(table, dataType);
```

### Changelog Stream Conversion

Convert a `Table` to a changelog `DataStream`. Use this for tables that produce updates, such as the result of a non-windowed aggregation, because `toDataStream` only supports insert-only tables:

```java
Table table = tableEnv.sqlQuery("SELECT id, name, COUNT(*) as cnt FROM events GROUP BY id, name");
DataStream<Row> changelogStream = tableEnv.toChangelogStream(table);

// With target schema
Schema targetSchema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("cnt", DataTypes.BIGINT())
    .build();

DataStream<Row> changelogWithSchema = tableEnv.toChangelogStream(table, targetSchema);

// With changelog mode (upsert: updates are emitted as +U without a preceding -U)
ChangelogMode mode = ChangelogMode.upsert();
DataStream<Row> upsertChangelog = tableEnv.toChangelogStream(table, targetSchema, mode);
```
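The difference between a retract-style changelog and an upsert changelog for a grouped count can be sketched in plain Java. This is a simplified model under stated assumptions, not Flink's operator logic: `CountChangelogSketch` and `countChangelog` are hypothetical names, each input element stands for the grouping key of one incoming event, and the output strings imitate the `+I`, `-U`, `+U` prefixes used when rows are printed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model only; none of these names are part of Flink.
final class CountChangelogSketch {

    // Emits the changes a running COUNT(*) per key would produce.
    // upsert == false: every update is a -U (old value) followed by a +U (new value).
    // upsert == true:  every update is a single +U that replaces the row for the key.
    static List<String> countChangelog(List<String> keys, boolean upsert) {
        Map<String, Long> counts = new HashMap<>();
        List<String> changes = new ArrayList<>();
        for (String key : keys) {
            Long previous = counts.get(key);
            long current = (previous == null ? 0L : previous) + 1;
            counts.put(key, current);
            if (previous == null) {
                // First event for this key: a new row appears.
                changes.add("+I[" + key + ", " + current + "]");
            } else {
                if (!upsert) {
                    // Retract the previous count before emitting the new one.
                    changes.add("-U[" + key + ", " + previous + "]");
                }
                changes.add("+U[" + key + ", " + current + "]");
            }
        }
        return changes;
    }
}
```

For the keys `a`, `b`, `a`, the model produces `+I[a, 1]`, `+I[b, 1]`, `-U[a, 1]`, `+U[a, 2]` without upsert, and the same sequence minus the `-U[a, 1]` entry with upsert. A consumer of the upsert form needs a key to know which row each `+U` replaces.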

## Temporary View Creation

### Basic View Creation

Create a temporary view from a `DataStream`:

```java
DataStream<MyPojo> dataStream = env.fromElements(
    new MyPojo("Alice", 25),
    new MyPojo("Bob", 30)
);

tableEnv.createTemporaryView("users", dataStream);

// Now you can query the view
Table result = tableEnv.sqlQuery("SELECT * FROM users WHERE age > 25");
```

### View Creation with Schema

Create a temporary view with a custom schema:

```java
// Name the Row fields explicitly; without this they default to f0, f1
DataStream<Row> dataStream = env.fromElements(
        Row.of("Alice", 25),
        Row.of("Bob", 30))
    .returns(Types.ROW_NAMED(
        new String[] {"name", "age"}, Types.STRING, Types.INT));

// Schema columns refer to the stream's fields by name
Schema schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build();

tableEnv.createTemporaryView("users_with_schema", dataStream, schema);
Table result = tableEnv.sqlQuery("SELECT name FROM users_with_schema WHERE age > 25");
```

## Statement Set Creation

Create a `StreamStatementSet` to group several `INSERT` statements so that they are optimized and submitted together:

```java
StreamStatementSet statementSet = tableEnv.createStatementSet();

// Add multiple insert operations
statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE condition1");
statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE condition2");

// Attach the statements to the StreamExecutionEnvironment; this does not run them
statementSet.attachAsDataStream();

// Submit the job, together with any other DataStream operators defined on env
env.execute();
```

## Deprecated Methods

The following methods are deprecated and should be avoided in new code:

- `registerFunction()` - Use `createTemporarySystemFunction()` instead
- `fromDataStream(DataStream, String)` - Use `fromDataStream(DataStream, Schema)` instead
- `registerDataStream()` - Use `createTemporaryView()` instead
- `toAppendStream()` - Use `toDataStream()` instead
- `toRetractStream()` - Use `toChangelogStream()` instead
- `execute()` - Use `StreamExecutionEnvironment.execute()` instead

## Error Handling

Common exceptions that may be thrown:

- `ValidationException` - When schema validation fails during conversion
- `TableException` - When table operations fail
- `IllegalArgumentException` - When invalid parameters are provided

Catch these exceptions around conversions where a failure should be handled rather than propagated:

```java
try {
    Table table = tableEnv.fromDataStream(dataStream, schema);
    DataStream<Row> result = tableEnv.toDataStream(table);
} catch (ValidationException e) {
    // Handle schema validation errors
    // (log is assumed to be an SLF4J-style logger; passing e keeps the stack trace)
    log.error("Schema validation failed", e);
} catch (TableException e) {
    // Handle table operation errors
    log.error("Table operation failed", e);
}
```