# Statement Sets

The `StreamStatementSet` batches multiple table operations together for optimized execution and resource management. It extends the base `StatementSet` with streaming-specific capabilities.
## Capabilities

### Statement Set Creation

Create a statement set from a `StreamTableEnvironment` to begin batching operations.

```scala { .api }
// Created via StreamTableEnvironment
val statementSet = tableEnv.createStatementSet()
```
### Table Pipeline Management

Add table processing pipelines to the statement set for batched execution.

```scala { .api }
/**
 * Add a table pipeline to the statement set
 * @param tablePipeline The table pipeline to add
 * @return This StreamStatementSet for method chaining
 */
def add(tablePipeline: TablePipeline): StreamStatementSet
```

**Usage Example:**
```scala
import org.apache.flink.table.api._

val statementSet = tableEnv.createStatementSet()

// Create table pipelines
val pipeline1 = tableEnv.from("source_table_1").insertInto("sink_table_1")
val pipeline2 = tableEnv.from("source_table_2").insertInto("sink_table_2")

statementSet
  .add(pipeline1)
  .add(pipeline2)
  .attachAsDataStream()
```
### SQL Statement Management

Add SQL `INSERT` statements to the statement set.

```scala { .api }
/**
 * Add an INSERT SQL statement to the statement set
 * @param statement The SQL insert statement
 * @return This StreamStatementSet for method chaining
 */
def addInsertSql(statement: String): StreamStatementSet
```
**Usage Example:**

```scala
val statementSet = tableEnv.createStatementSet()

statementSet
  .addInsertSql("INSERT INTO sink_table_1 SELECT * FROM source_table_1 WHERE age > 25")
  .addInsertSql("INSERT INTO sink_table_2 SELECT name, COUNT(*) FROM source_table_2 GROUP BY name")
  .attachAsDataStream()
```
### Table Insert Operations

Add table insert operations with various configuration options.

```scala { .api }
/**
 * Add an insert operation with a target path
 * @param targetPath The target table path
 * @param table The table to insert
 * @return This StreamStatementSet for method chaining
 */
def addInsert(targetPath: String, table: Table): StreamStatementSet

/**
 * Add an insert operation with a target path and overwrite option
 * @param targetPath The target table path
 * @param table The table to insert
 * @param overwrite Whether to overwrite existing data
 * @return This StreamStatementSet for method chaining
 */
def addInsert(targetPath: String, table: Table, overwrite: Boolean): StreamStatementSet

/**
 * Add an insert operation with a table descriptor
 * @param targetDescriptor The target table descriptor
 * @param table The table to insert
 * @return This StreamStatementSet for method chaining
 */
def addInsert(targetDescriptor: TableDescriptor, table: Table): StreamStatementSet

/**
 * Add an insert operation with a table descriptor and overwrite option
 * @param targetDescriptor The target table descriptor
 * @param table The table to insert
 * @param overwrite Whether to overwrite existing data
 * @return This StreamStatementSet for method chaining
 */
def addInsert(targetDescriptor: TableDescriptor, table: Table, overwrite: Boolean): StreamStatementSet
```
**Usage Examples:**

```scala
import org.apache.flink.table.api._

val statementSet = tableEnv.createStatementSet()
val processedTable1 = tableEnv.from("source_table_1").filter($"age" > 25)
val processedTable2 = tableEnv.from("source_table_2").groupBy($"name").select($"name", $"age".count)

// Insert with target path
statementSet
  .addInsert("sink_table_1", processedTable1)
  .addInsert("sink_table_2", processedTable2, true) // with overwrite

// Insert with table descriptor
val sinkDescriptor = TableDescriptor.forConnector("filesystem")
  .schema(Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build())
  .option("path", "/path/to/sink")
  .format("csv")
  .build()

statementSet
  .addInsert(sinkDescriptor, processedTable1)
  .attachAsDataStream()
```
### Execution and Debugging

Methods for explaining execution plans and attaching the statements to the streaming environment.

```scala { .api }
/**
 * Print the execution plan, with optional extra details
 * @param extraDetails Additional details to include in the explanation
 * @return This StreamStatementSet for method chaining
 */
def printExplain(extraDetails: ExplainDetail*): StreamStatementSet

/**
 * Attach all statements as one pipeline to the underlying StreamExecutionEnvironment.
 * Execution is triggered later, when the environment is executed (e.g. via env.execute()).
 */
def attachAsDataStream(): Unit
```
**Usage Examples:**

```scala
val statementSet = tableEnv.createStatementSet()

statementSet
  .addInsertSql("INSERT INTO sink SELECT * FROM source WHERE age > 25")
  .printExplain(ExplainDetail.CHANGELOG_MODE, ExplainDetail.ESTIMATED_COST)
  .attachAsDataStream()

// The execution plan is printed before the statements are attached
```
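
If you need the plan as a value rather than printed to stdout (for logging or plan-based tests), the inherited `explain` method returns it as a `String`. A minimal sketch, assuming the same `tableEnv` with `source` and `sink` tables registered as above:

```scala
val statementSet = tableEnv.createStatementSet()
statementSet.addInsertSql("INSERT INTO sink SELECT * FROM source WHERE age > 25")

// explain() returns the plan as a String instead of printing it,
// so it can be routed to a logging framework or asserted on in tests
val plan: String = statementSet.explain(ExplainDetail.CHANGELOG_MODE)
println(plan)
```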
## Complete Usage Example

Here's a comprehensive example showing how to use StreamStatementSet for complex multi-table operations:
```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Register source tables
tableEnv.executeSql("""
  CREATE TABLE users (
    id INT,
    name STRING,
    age INT,
    city STRING
  ) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/users.csv',
    'format' = 'csv'
  )
""")

tableEnv.executeSql("""
  CREATE TABLE orders (
    order_id INT,
    user_id INT,
    amount DECIMAL(10,2),
    order_time TIMESTAMP(3)
  ) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/orders.csv',
    'format' = 'csv'
  )
""")

// Create a statement set to batch the operations
// (the sink tables referenced below must be registered the same way)
val statementSet = tableEnv.createStatementSet()

// Define multiple queries
val youngUsers = tableEnv.sqlQuery("SELECT * FROM users WHERE age < 30")

val userStats = tableEnv.sqlQuery("""
  SELECT
    u.city,
    COUNT(*) AS user_count,
    AVG(u.age) AS avg_age
  FROM users u
  GROUP BY u.city
""")

val orderSummary = tableEnv.sqlQuery("""
  SELECT
    u.city,
    COUNT(o.order_id) AS order_count,
    SUM(o.amount) AS total_amount
  FROM users u
  JOIN orders o ON u.id = o.user_id
  GROUP BY u.city
""")

statementSet
  .addInsert("young_users_sink", youngUsers)
  .addInsert("user_stats_sink", userStats)
  .addInsert("order_summary_sink", orderSummary)
  .addInsertSql("INSERT INTO audit_log SELECT 'batch_job' AS job_type, CURRENT_TIMESTAMP AS run_time")
  .printExplain() // Print the execution plan
  .attachAsDataStream() // Attach all operations as one pipeline

env.execute("Multi-table ETL Job")
```
## Benefits of Statement Sets

### Performance Optimization
- **Shared computation**: Common subqueries are computed once and reused
- **Optimized execution plans**: The planner can optimize across all statements in the set
- **Resource sharing**: Efficient use of parallelism and memory across the batched statements
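
The shared-computation point can be made concrete: when two sinks consume the same intermediate table, the planner can scan and filter the source once and fan the result out. A minimal sketch, assuming `tableEnv` is available and that `transactions`, `alerts_sink`, and `archive_sink` are hypothetical registered tables:

```scala
import org.apache.flink.table.api._

val statementSet = tableEnv.createStatementSet()

// Both inserts reuse the same filtered intermediate table
// (table names here are illustrative, not part of the API)
val largeTx = tableEnv.from("transactions").filter($"amount" > 10000)

statementSet
  .addInsert("alerts_sink", largeTx.select($"tx_id", $"amount"))
  .addInsert("archive_sink", largeTx)
  .printExplain() // inspect the plan; subject to planner decisions, the
                  // source scan and filter should feed both sinks once
  .attachAsDataStream()
```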
### Operational Benefits
- **Atomic submission**: All statements run as a single job, so they succeed or fail together
- **Simplified debugging**: A single execution plan covers all operations
- **Better resource management**: Coordinated scheduling of related operations

### Best Practices
- **Group related operations**: Batch operations that read from or write to related data
- **Use for ETL pipelines**: Ideal for multi-sink data processing jobs
- **Consider data freshness**: All statements run in the same job and observe data at the same point in time
- **Monitor resource usage**: Large statement sets produce larger plans and may require more memory
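
When the job is pure Table API/SQL with no DataStream parts, the `execute()` method inherited from `StatementSet` can submit the batched statements directly, without attaching them to the DataStream environment. A sketch, assuming `tableEnv` is available and `source`, `sink_a`, and `sink_b` are hypothetical registered tables:

```scala
// Pure Table API job: submit the statement set as one job immediately;
// no env.execute() call is needed in this case
val result = tableEnv.createStatementSet()
  .addInsertSql("INSERT INTO sink_a SELECT * FROM source WHERE age > 25")
  .addInsertSql("INSERT INTO sink_b SELECT name FROM source")
  .execute()

// attachAsDataStream() is the right choice instead when the same pipeline
// also contains DataStream transformations that env.execute() must run
```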