# Statement Sets

Batch execution optimization for multiple table operations with shared planning and execution. StreamStatementSet enables efficient execution of multiple INSERT statements and table pipelines by optimizing them together as a single job.

## Capabilities

### Statement Set Creation

Create StreamStatementSet instances for batch execution of multiple operations.

```java { .api }
/**
 * StreamStatementSet that integrates with the Java DataStream API
 * Accepts pipelines defined by DML statements or Table objects
 * The planner can optimize all added statements together
 */
public interface StreamStatementSet extends StatementSet {

    /**
     * Add a table pipeline to the statement set
     * @param tablePipeline Table pipeline to add
     * @return This StreamStatementSet for method chaining
     */
    StreamStatementSet add(TablePipeline tablePipeline);

    /**
     * Add an INSERT SQL statement to the statement set
     * @param statement INSERT SQL statement string
     * @return This StreamStatementSet for method chaining
     */
    StreamStatementSet addInsertSql(String statement);

    /**
     * Add table insert operation to existing table
     * @param targetPath Path to target table
     * @param table Source table to insert
     * @return This StreamStatementSet for method chaining
     */
    StreamStatementSet addInsert(String targetPath, Table table);

    /**
     * Add table insert operation with overwrite option
     * @param targetPath Path to target table
     * @param table Source table to insert
     * @param overwrite Whether to overwrite existing data
     * @return This StreamStatementSet for method chaining
     */
    StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);

    /**
     * Add table insert operation using table descriptor
     * @param targetDescriptor Target table descriptor
     * @param table Source table to insert
     * @return This StreamStatementSet for method chaining
     */
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);

    /**
     * Add table insert operation using table descriptor with overwrite
     * @param targetDescriptor Target table descriptor
     * @param table Source table to insert
     * @param overwrite Whether to overwrite existing data
     * @return This StreamStatementSet for method chaining
     */
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);

    // Inherited from StatementSet:
    TableResult execute();
    CompiledPlan compilePlan(); // @Experimental
    String explain(ExplainDetail... extraDetails);
    String explain(ExplainFormat format, ExplainDetail... extraDetails);
}
```

### Statement Set Execution

Execute statement sets with different execution strategies.

```java { .api }
/**
 * Optimizes all statements as one entity and adds them as transformations
 * to the underlying StreamExecutionEnvironment
 * Use StreamExecutionEnvironment.execute() to execute them
 * The added statements will be cleared after calling this method
 */
void attachAsDataStream();

/**
 * Print execution plan explanation with optional details
 * @param extraDetails Additional details to include in explanation
 * @return This StreamStatementSet for method chaining
 */
StreamStatementSet printExplain(ExplainDetail... extraDetails);
```

**Usage Examples:**

```java
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.ExplainDetail;

// Create statement set
StreamStatementSet statementSet = tableEnv.createStatementSet();

// Add multiple operations
statementSet
    .addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE status = 'ACTIVE'")
    .addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE status = 'INACTIVE'")
    .addInsert("sink3", tableEnv.sqlQuery("SELECT COUNT(*) FROM source"));

// Execute all statements together
statementSet.execute();

// Alternative: Attach to DataStream environment
StreamStatementSet asyncSet = tableEnv.createStatementSet();
asyncSet
    .addInsertSql("INSERT INTO async_sink SELECT * FROM streaming_source")
    .attachAsDataStream();

// Execute the underlying StreamExecutionEnvironment
env.execute("Batch Statement Set Job");
```

### Advanced Statement Set Patterns

Complex usage patterns for optimized batch execution.

```java
// Multi-sink pattern with shared computation
StreamStatementSet multiSinkSet = tableEnv.createStatementSet();

// Shared intermediate computation
Table processedData = tableEnv.sqlQuery(
    "SELECT " +
    " user_id, " +
    " product_id, " +
    " quantity, " +
    " price, " +
    " quantity * price as total_amount, " +
    " EXTRACT(HOUR FROM order_time) as order_hour " +
    "FROM raw_orders " +
    "WHERE quantity > 0 AND price > 0"
);

// Multiple sinks using shared computation
multiSinkSet
    .addInsert("hourly_stats", tableEnv.sqlQuery(
        "SELECT order_hour, COUNT(*), SUM(total_amount) " +
        "FROM " + processedData + " " +
        "GROUP BY order_hour"
    ))
    .addInsert("product_stats", tableEnv.sqlQuery(
        "SELECT product_id, COUNT(*), AVG(total_amount) " +
        "FROM " + processedData + " " +
        "GROUP BY product_id"
    ))
    .addInsert("user_stats", tableEnv.sqlQuery(
        "SELECT user_id, COUNT(*), SUM(total_amount) " +
        "FROM " + processedData + " " +
        "GROUP BY user_id"
    ));

// Execute with shared optimization
multiSinkSet.execute();
```

### Execution Plan Analysis

Analyze and debug statement set execution plans.

```java
import org.apache.flink.table.api.ExplainDetail;

// Create statement set with operations
StreamStatementSet debugSet = tableEnv.createStatementSet();
debugSet
    .addInsertSql("INSERT INTO debug_sink1 SELECT * FROM source WHERE type = 'A'")
    .addInsertSql("INSERT INTO debug_sink2 SELECT * FROM source WHERE type = 'B'");

// Print detailed execution plan
debugSet.printExplain(
    ExplainDetail.CHANGELOG_MODE,
    ExplainDetail.ESTIMATED_COST
);

// This will output the optimized execution plan showing:
// - Shared operators
// - Data flow between operations
// - Cost estimates
// - Changelog mode information
```

## Performance Optimization

### Batch Execution Benefits

Understanding when statement sets provide performance improvements.

```java
// Without statement set - separate optimizations
tableEnv.executeSql("INSERT INTO sink1 SELECT * FROM source WHERE region = 'US'");
tableEnv.executeSql("INSERT INTO sink2 SELECT * FROM source WHERE region = 'EU'");
tableEnv.executeSql("INSERT INTO sink3 SELECT * FROM source WHERE region = 'ASIA'");
// Result: 3 separate jobs, source read 3 times

// With statement set - shared optimization
StreamStatementSet optimizedSet = tableEnv.createStatementSet();
optimizedSet
    .addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE region = 'US'")
    .addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE region = 'EU'")
    .addInsertSql("INSERT INTO sink3 SELECT * FROM source WHERE region = 'ASIA'")
    .execute();
// Result: 1 job, source read once, shared filtering and routing
```

### Resource Optimization

Configure statement sets for optimal resource usage.

```java
// Large-scale batch processing with statement sets
StreamStatementSet batchProcessingSet = tableEnv.createStatementSet();

// Configure environment parallelism and latency monitoring
env.setParallelism(16);
env.getConfig().setLatencyTrackingInterval(1000);

// Add multiple heavy computations
String[] regions = {"US", "EU", "ASIA", "AFRICA", "OCEANIA"};
for (String region : regions) {
    batchProcessingSet.addInsertSql(
        "INSERT INTO region_analytics_" + region.toLowerCase() + " " +
        "SELECT " +
        " date_trunc('day', order_time) as order_date, " +
        " product_category, " +
        " COUNT(*) as order_count, " +
        " SUM(total_amount) as total_revenue, " +
        " AVG(total_amount) as avg_order_value " +
        "FROM orders " +
        "WHERE region = '" + region + "' " +
        "GROUP BY date_trunc('day', order_time), product_category"
    );
}

// Execute with shared resource planning
batchProcessingSet.execute();
```

### Dynamic Statement Building

Build statement sets dynamically based on runtime conditions.

```java
public StreamStatementSet buildDynamicStatementSet(
        StreamTableEnvironment tableEnv,
        List<String> targetTables,
        Map<String, String> filterConditions) {

    StreamStatementSet dynamicSet = tableEnv.createStatementSet();

    for (String targetTable : targetTables) {
        String condition = filterConditions.get(targetTable);
        String insertSql = String.format(
            "INSERT INTO %s SELECT * FROM source_table WHERE %s",
            targetTable, condition
        );
        dynamicSet.addInsertSql(insertSql);
    }

    return dynamicSet;
}

// Usage
List<String> tables = Arrays.asList("active_users", "inactive_users", "premium_users");
Map<String, String> conditions = Map.of(
    "active_users", "last_login > CURRENT_TIMESTAMP - INTERVAL '30' DAY",
    "inactive_users", "last_login <= CURRENT_TIMESTAMP - INTERVAL '30' DAY",
    "premium_users", "subscription_type = 'PREMIUM'"
);

StreamStatementSet dynamicSet = buildDynamicStatementSet(tableEnv, tables, conditions);
dynamicSet.execute();
```

## Error Handling and Monitoring

### Statement Set Error Handling

Handle errors in batch statement execution.

```java
try {
    StreamStatementSet statementSet = tableEnv.createStatementSet();
    statementSet
        .addInsertSql("INSERT INTO sink1 SELECT * FROM source1")
        .addInsertSql("INSERT INTO sink2 SELECT * FROM source2")
        .addInsertSql("INSERT INTO sink3 SELECT * FROM source3");

    // Execute and handle potential failures
    TableResult result = statementSet.execute();

    // Monitor execution
    result.await(); // Wait for completion
    System.out.println("Statement set executed successfully");

} catch (Exception e) {
    System.err.println("Statement set execution failed: " + e.getMessage());
    // Handle partial completion, rollback, or retry logic
}
```

### Monitoring Statement Set Progress

Monitor the progress of long-running statement sets.

```java
StreamStatementSet monitoredSet = tableEnv.createStatementSet();

// Add operations
monitoredSet
    .addInsertSql("INSERT INTO large_sink1 SELECT * FROM large_source")
    .addInsertSql("INSERT INTO large_sink2 SELECT * FROM large_source");

// Execute asynchronously
CompletableFuture<TableResult> execution = CompletableFuture.supplyAsync(() -> {
    try {
        return monitoredSet.execute();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});

// Monitor progress
execution.thenAccept(result -> {
    System.out.println("Statement set completed successfully");
}).exceptionally(throwable -> {
    System.err.println("Statement set failed: " + throwable.getMessage());
    return null;
});
```

## Integration with DataStream API

### Hybrid Processing Patterns

Combine statement sets with DataStream operations.

```java
// DataStream processing
DataStream<Row> preprocessedStream = env
    .fromSource(kafkaSource, watermarkStrategy, "kafka-source")
    .map(new PreprocessingFunction())
    .filter(new QualityFilter());

// Convert to table for SQL processing
tableEnv.createTemporaryView("preprocessed_data", preprocessedStream);

// Use statement set for multiple SQL operations
StreamStatementSet hybridSet = tableEnv.createStatementSet();
hybridSet
    .addInsertSql("INSERT INTO sql_sink1 SELECT * FROM preprocessed_data WHERE category = 'A'")
    .addInsertSql("INSERT INTO sql_sink2 SELECT * FROM preprocessed_data WHERE category = 'B'")
    .attachAsDataStream(); // Attach to existing StreamExecutionEnvironment

// Add additional DataStream operations
DataStream<String> postProcessed = tableEnv
    .toDataStream(tableEnv.sqlQuery("SELECT * FROM preprocessed_data"))
    .map(new PostProcessingFunction());

postProcessed.addSink(customSink);

// Execute entire pipeline
env.execute("Hybrid Stream-Table Processing");
```

## Types

### Statement Set Types

```java { .api }
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TablePipeline;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
```

### Execution and Monitoring Types

```java { .api }
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ExplainDetail;
import java.util.concurrent.CompletableFuture;
```

### Environment Integration Types

```java { .api }
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
```