0
# Stream Table Environment
1
2
The StreamTableEnvironment is the central entry point for creating Table and SQL API programs that integrate with Flink's DataStream API in Scala. It provides unified processing for both bounded and unbounded data streams.
3
4
## Capabilities
5
6
### Environment Creation
7
8
Factory methods for creating StreamTableEnvironment instances with default or custom settings.
9
10
```scala { .api }
11
object StreamTableEnvironment {
12
/**
13
* Creates a StreamTableEnvironment with default settings
14
* @param executionEnvironment The StreamExecutionEnvironment to use
15
* @return A new StreamTableEnvironment instance
16
*/
17
def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
18
19
/**
20
* Creates a StreamTableEnvironment with custom settings
21
* @param executionEnvironment The StreamExecutionEnvironment to use
22
* @param settings Custom environment settings
23
* @return A new StreamTableEnvironment instance
24
*/
25
def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment
26
}
27
```
28
29
**Usage Example:**
30
31
```scala
32
import org.apache.flink.streaming.api.scala._
33
import org.apache.flink.table.api._
34
import org.apache.flink.table.api.bridge.scala._
35
36
val env = StreamExecutionEnvironment.getExecutionEnvironment
37
val tableEnv = StreamTableEnvironment.create(env)
38
39
// With custom settings
40
val settings = EnvironmentSettings.newInstance()
41
.useBlinkPlanner()
42
.inStreamingMode()
43
.build()
44
val customTableEnv = StreamTableEnvironment.create(env, settings)
45
```
46
47
### DataStream to Table Conversion
48
49
Convert DataStreams to Tables with automatic schema derivation or custom schemas.
50
51
```scala { .api }
52
/**
53
* Convert DataStream to Table with auto-derived schema
54
* @param dataStream The DataStream to convert
55
* @return Table representation of the DataStream
56
*/
57
def fromDataStream[T](dataStream: DataStream[T]): Table
58
59
/**
60
* Convert DataStream to Table with custom schema
61
* @param dataStream The DataStream to convert
62
* @param schema Custom schema definition
63
* @return Table representation of the DataStream
64
*/
65
def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table
66
67
/**
68
* Convert changelog DataStream to Table
69
* @param dataStream Changelog DataStream with Row elements
70
* @return Table representation of the changelog stream
71
*/
72
def fromChangelogStream(dataStream: DataStream[Row]): Table
73
74
/**
75
* Convert changelog DataStream to Table with custom schema
76
* @param dataStream Changelog DataStream with Row elements
77
* @param schema Custom schema definition
78
* @return Table representation of the changelog stream
79
*/
80
def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table
81
82
/**
83
* Convert changelog DataStream to Table with custom schema and changelog mode
84
* @param dataStream Changelog DataStream with Row elements
85
* @param schema Custom schema definition
86
* @param changelogMode Changelog mode configuration
87
* @return Table representation of the changelog stream
88
*/
89
def fromChangelogStream(dataStream: DataStream[Row], schema: Schema, changelogMode: ChangelogMode): Table
90
```
91
92
**Usage Examples:**
93
94
```scala
95
// Auto-derived schema
96
val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))
97
val table = tableEnv.fromDataStream(dataStream)
98
99
// Custom schema
100
val schema = Schema.newBuilder()
101
.column("name", DataTypes.STRING())
102
.column("age", DataTypes.INT())
103
.build()
104
val tableWithSchema = tableEnv.fromDataStream(dataStream, schema)
105
106
// Changelog stream
107
val changelogStream = env.fromElements(
108
Row.of(RowKind.INSERT, "Alice", Integer.valueOf(25)),
109
Row.of(RowKind.UPDATE_AFTER, "Alice", Integer.valueOf(26))
110
)
111
val changelogTable = tableEnv.fromChangelogStream(changelogStream)
112
```
113
114
### Table to DataStream Conversion
115
116
Convert Tables back to DataStreams with different output modes and type specifications.
117
118
```scala { .api }
119
/**
120
* Convert insert-only Table to DataStream of Row
121
* @param table The Table to convert
122
* @return DataStream containing Row elements
123
*/
124
def toDataStream(table: Table): DataStream[Row]
125
126
/**
127
* Convert insert-only Table to DataStream of specified class
128
* @param table The Table to convert
129
* @param targetClass Target class for the DataStream elements
130
* @return DataStream containing elements of the specified class
131
*/
132
def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]
133
134
/**
135
* Convert insert-only Table to DataStream of specified data type
136
* @param table The Table to convert
137
* @param targetDataType Target data type specification
138
* @return DataStream containing elements of the specified data type
139
*/
140
def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T]
141
142
/**
143
* Convert Table to changelog DataStream
144
* @param table The Table to convert
145
* @return DataStream containing Row elements with changelog information
146
*/
147
def toChangelogStream(table: Table): DataStream[Row]
148
149
/**
150
* Convert Table to changelog DataStream with custom schema
151
* @param table The Table to convert
152
* @param targetSchema Custom schema for the output stream
153
* @return DataStream containing Row elements with changelog information
154
*/
155
def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row]
156
157
/**
158
* Convert Table to changelog DataStream with custom schema and changelog mode
159
* @param table The Table to convert
160
* @param targetSchema Custom schema for the output stream
161
* @param changelogMode Changelog mode configuration
162
* @return DataStream containing Row elements with changelog information
163
*/
164
def toChangelogStream(table: Table, targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row]
165
```
166
167
**Usage Examples:**
168
169
```scala
170
// Convert to Row DataStream
171
val rowStream = tableEnv.toDataStream(table)
172
173
// Convert to typed DataStream
174
case class Person(name: String, age: Int)
175
val personStream = tableEnv.toDataStream(table, classOf[Person])
176
177
// Convert to changelog stream
178
val changelogStream = tableEnv.toChangelogStream(table)
179
```
180
181
### View Creation
182
183
Create temporary views from DataStreams for use in SQL queries.
184
185
```scala { .api }
186
/**
187
* Create temporary view from DataStream
188
* @param path The view path/name
189
* @param dataStream The DataStream to create view from
190
*/
191
def createTemporaryView[T](path: String, dataStream: DataStream[T]): Unit
192
193
/**
194
* Create temporary view from DataStream with custom schema
195
* @param path The view path/name
196
* @param dataStream The DataStream to create view from
197
* @param schema Custom schema definition
198
*/
199
def createTemporaryView[T](path: String, dataStream: DataStream[T], schema: Schema): Unit
200
```
201
202
**Usage Example:**
203
204
```scala
205
val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))
206
tableEnv.createTemporaryView("users", dataStream)
207
208
// Now you can query the view with SQL
209
val result = tableEnv.sqlQuery("SELECT * FROM users WHERE age > 25")
210
```
211
212
### Statement Set Creation
213
214
Create statement sets for batching multiple table operations.
215
216
```scala { .api }
217
/**
218
* Create statement set for batch operations
219
* @return A new StreamStatementSet instance
220
*/
221
def createStatementSet(): StreamStatementSet
222
```
223
224
**Usage Example:**
225
226
```scala
227
val statementSet = tableEnv.createStatementSet()
228
statementSet
229
.addInsert("sink_table_1", table1)
230
.addInsert("sink_table_2", table2)
231
.attachAsDataStream()
232
```
233
234
### Legacy Deprecated Methods
235
236
These methods are deprecated and should not be used in new code:
237
238
```scala { .api }
239
// Deprecated - use fromDataStream with Schema instead
240
@deprecated("Use fromDataStream with Schema", "1.18.0")
241
def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table
242
243
// Deprecated - use createTemporaryView with Schema instead
244
@deprecated("Use createTemporaryView with Schema", "1.18.0")
245
def createTemporaryView[T](path: String, dataStream: DataStream[T], fields: Expression*): Unit
246
247
// Deprecated - use toDataStream instead
248
@deprecated("Use toDataStream", "1.18.0")
249
def toAppendStream[T: TypeInformation](table: Table): DataStream[T]
250
251
// Deprecated - use toChangelogStream instead
252
@deprecated("Use toChangelogStream", "1.18.0")
253
def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)]
254
```