# Statement Set Operations

Batch execution of multiple table operations for optimized query planning and execution. `StreamStatementSet` enables adding multiple DML statements and executing them as a single optimized job.

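The add-then-execute contract can be sketched in plain Java (`MiniStatementSet` below is a hypothetical illustration, not a Flink API): statements accumulate through chained `add` calls, run together as one batch, and are cleared afterward, mirroring `StreamStatementSet` semantics.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini statement set illustrating the contract:
// add calls chain, executeAll() processes every statement at once and clears the set.
class MiniStatementSet {
    private final List<String> statements = new ArrayList<>();

    // Returns this so calls can be chained, like StreamStatementSet.addInsertSql(...)
    MiniStatementSet addInsertSql(String statement) {
        statements.add(statement);
        return this;
    }

    // "Executes" all collected statements as one unit, then clears them,
    // mirroring how execute() and attachAsDataStream() clear the set
    List<String> executeAll() {
        List<String> batch = new ArrayList<>(statements);
        statements.clear();
        return batch;
    }

    int size() {
        return statements.size();
    }
}
```

In the real API, `execute()` and `attachAsDataStream()` play the role of `executeAll()` here: both consume the accumulated statements and leave the set empty for reuse.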
## Capabilities

### Statement Set Creation

Creates a `StreamStatementSet` for collecting multiple table operations. The method is defined on `StreamTableEnvironment`.

```java { .api }
/**
 * Creates a statement set for batch execution of table operations.
 * @return a StreamStatementSet for adding multiple operations
 */
StreamStatementSet createStatementSet();
```

### Statement Set Interface

Main interface for building and executing multiple table operations together.

```java { .api }
/**
 * Statement set that integrates with the DataStream API.
 * All added statements are optimized together before execution.
 */
public interface StreamStatementSet extends StatementSet {

    /**
     * Adds a table pipeline to the statement set.
     * @param tablePipeline the table pipeline to add
     * @return this statement set for method chaining
     */
    StreamStatementSet add(TablePipeline tablePipeline);

    /**
     * Adds an INSERT SQL statement to the statement set.
     * @param statement the SQL INSERT statement string
     * @return this statement set for method chaining
     */
    StreamStatementSet addInsertSql(String statement);

    /**
     * Adds an INSERT operation to the statement set.
     * @param targetPath path of the target table for the insertion
     * @param table source table for the insert operation
     * @return this statement set for method chaining
     */
    StreamStatementSet addInsert(String targetPath, Table table);

    /**
     * Adds an INSERT operation with an overwrite option.
     * @param targetPath path of the target table for the insertion
     * @param table source table for the insert operation
     * @param overwrite whether to overwrite existing data
     * @return this statement set for method chaining
     */
    StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);

    /**
     * Adds an INSERT operation using a table descriptor.
     * @param targetDescriptor descriptor defining the target table
     * @param table source table for the insert operation
     * @return this statement set for method chaining
     */
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);

    /**
     * Adds an INSERT operation using a table descriptor, with an overwrite option.
     * @param targetDescriptor descriptor defining the target table
     * @param table source table for the insert operation
     * @param overwrite whether to overwrite existing data
     * @return this statement set for method chaining
     */
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);
}
```

### Execution Methods

Methods for executing the collected statements.

```java { .api }
/**
 * Optimizes all statements as one entity and adds them as transformations
 * to the underlying StreamExecutionEnvironment. Use StreamExecutionEnvironment.execute()
 * to actually execute them. The added statements will be cleared after calling this method.
 */
void attachAsDataStream();

/**
 * Optimizes and executes all added statements as one job.
 * @return TableResult containing execution results
 */
TableResult execute();
```

### Explain and Debug

Methods for analyzing the execution plan.

```java { .api }
/**
 * Prints the execution plan for all added statements.
 * @param extraDetails additional details to include in the explanation
 * @return this statement set for method chaining
 */
StreamStatementSet printExplain(ExplainDetail... extraDetails);

/**
 * Returns the execution plan as a string.
 * @param extraDetails additional details to include in the explanation
 * @return the execution plan string
 */
String explain(ExplainDetail... extraDetails);
```

## Usage Examples

### Basic Statement Set Usage

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;

// Create statement set
StreamStatementSet statementSet = tableEnv.createStatementSet();

// Read the source tables
Table salesData = tableEnv.from("sales_source");
Table customerData = tableEnv.from("customer_source");

// Process sales data
Table salesSummary = salesData
    .groupBy($("customer_id"))
    .select($("customer_id"), $("amount").sum().as("total_sales"));

// Join with customer data
Table customerSales = customerData
    .join(salesSummary, $("id").isEqual($("customer_id")))
    .select($("name"), $("total_sales"));

// Add insert operations to the statement set
statementSet.addInsert("sales_summary_sink", salesSummary);
statementSet.addInsert("customer_sales_sink", customerSales);

// Execute all operations together
statementSet.execute();
```

### SQL Statement Integration

```java
// Mix of programmatic and SQL operations
StreamStatementSet statementSet = tableEnv.createStatementSet();

// Add SQL INSERT statements
statementSet.addInsertSql(
    "INSERT INTO daily_summary " +
    "SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd') AS day, " +
    "       COUNT(*) AS order_count, " +
    "       SUM(amount) AS total_amount " +
    "FROM orders " +
    "GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd')"
);

statementSet.addInsertSql(
    "INSERT INTO product_stats " +
    "SELECT product_id, " +
    "       COUNT(*) AS sales_count, " +
    "       AVG(amount) AS avg_amount " +
    "FROM orders " +
    "GROUP BY product_id"
);

// Add programmatic operations
Table alertData = tableEnv.sqlQuery(
    "SELECT customer_id, amount FROM orders WHERE amount > 1000"
);
statementSet.addInsert("high_value_alerts", alertData);

// Execute all together
statementSet.execute();
```

### DataStream Integration

```java
// Attach to the DataStream environment for custom execution control
StreamStatementSet statementSet = tableEnv.createStatementSet();

// Add operations
statementSet.addInsert("sink1", table1);
statementSet.addInsert("sink2", table2);
statementSet.addInsert("sink3", table3);

// Attach to the StreamExecutionEnvironment instead of executing immediately
statementSet.attachAsDataStream();

// Add custom DataStream operations
DataStream<String> alertStream = env.socketTextStream("localhost", 9999);
alertStream.print("alerts");

// Execute the complete job (both table and stream operations)
env.execute("Combined Table and Stream Job");
```

### Table Descriptor Usage

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

// Create table descriptors for dynamic sink creation
TableDescriptor csvSinkDescriptor = TableDescriptor.forConnector("filesystem")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .build())
    .option("path", "/path/to/csv/output")
    .format("csv")
    .build();

TableDescriptor printSinkDescriptor = TableDescriptor.forConnector("print")
    .schema(Schema.newBuilder()
        .column("summary", DataTypes.STRING())
        .column("count", DataTypes.BIGINT())
        .build())
    .option("print-identifier", "summary")
    .build();

// Use the descriptors in a statement set
StreamStatementSet statementSet = tableEnv.createStatementSet();

Table processedData = tableEnv.sqlQuery("SELECT id, name, amount FROM source_table");
// Note: `count` must be backtick-escaped because COUNT is a reserved keyword
Table summaryData = tableEnv.sqlQuery(
    "SELECT 'Total processed' AS summary, COUNT(*) AS `count` FROM source_table");

statementSet.addInsert(csvSinkDescriptor, processedData);
statementSet.addInsert(printSinkDescriptor, summaryData);

statementSet.execute();
```

### Execution Plan Analysis

```java
import org.apache.flink.table.api.ExplainDetail;

// Create and populate the statement set
StreamStatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsert("sink1", complexQuery1);
statementSet.addInsert("sink2", complexQuery2);
statementSet.addInsert("sink3", complexQuery3);

// Print the execution plan with extra details
statementSet.printExplain(
    ExplainDetail.CHANGELOG_MODE,
    ExplainDetail.ESTIMATED_COST,
    ExplainDetail.JSON_EXECUTION_PLAN
);

// Get the execution plan as a string for logging
String executionPlan = statementSet.explain(ExplainDetail.ESTIMATED_COST);
logger.info("Execution plan: {}", executionPlan);

// Execute after analysis
statementSet.execute();
```

## Type Definitions

### Statement Set Hierarchy

```java { .api }
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TablePipeline;

// Interface hierarchy
interface StatementSet {
    StatementSet add(TablePipeline tablePipeline);
    StatementSet addInsertSql(String statement);
    // ... base methods
}

interface StreamStatementSet extends StatementSet {
    void attachAsDataStream();
    // ... streaming-specific methods
}
```

### Table Pipeline

```java { .api }
/**
 * Represents a complete table pipeline (source through sink) that can be
 * added to statement sets.
 */
interface TablePipeline {
    // Created via Table#insertInto(...); Table itself does not implement this interface
}

// Obtaining a TablePipeline from a Table and adding it to a statement set:
TablePipeline pipeline = sourceTable.insertInto("sink_table");
statementSet.add(pipeline);
```

### Explain Details

```java { .api }
import org.apache.flink.table.api.ExplainDetail;

// Available explanation details
ExplainDetail.CHANGELOG_MODE       // Show changelog mode information
ExplainDetail.ESTIMATED_COST       // Show cost estimation
ExplainDetail.JSON_EXECUTION_PLAN  // Show the execution plan in JSON format
```

### Execution Results

```java { .api }
import org.apache.flink.table.api.TableResult;

// Result of statement set execution
interface TableResult {
    Optional<JobClient> getJobClient();  // Access to the running job, if any
    ResolvedSchema getResolvedSchema();  // Result schema
    CloseableIterator<Row> collect();    // Collect results (blocking)
    void print();                        // Print results to the console
    void await();                        // Block until the job finishes
}
```

## Best Practices

### Optimization Benefits

Using statement sets provides several optimization opportunities:

- **Shared subquery elimination**: common subexpressions across statements are computed once
- **Operator fusion**: compatible operators are chained together to reduce serialization overhead
- **Resource planning**: global optimization considers all operations when allocating resources
- **Pipeline optimization**: cross-statement optimizations such as predicate pushdown

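The shared-subquery idea can be illustrated outside Flink with a small memoization sketch (`SubqueryCache` is a hypothetical illustration, not a Flink class): when two statements reference an identical subquery, a planner-style cache evaluates it once and shares the result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative cache: identical subqueries (keyed by their SQL text)
// are evaluated once, and later statements reuse the stored result.
class SubqueryCache {
    private final Map<String, Object> results = new HashMap<>();
    int evaluations = 0; // counts how often real work was performed

    @SuppressWarnings("unchecked")
    <T> T compute(String subquery, Supplier<T> evaluator) {
        return (T) results.computeIfAbsent(subquery, key -> {
            evaluations++;
            return evaluator.get();
        });
    }
}
```

Flink performs this kind of deduplication during joint planning of a statement set, which is why batching statements together can avoid redundant scans of the same source.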

### Performance Guidelines

```java
// Good: group related operations in a statement set
StreamStatementSet etlSet = tableEnv.createStatementSet();
etlSet.addInsert("cleaned_data", cleaningQuery);
etlSet.addInsert("enriched_data", enrichmentQuery);
etlSet.addInsert("aggregated_data", aggregationQuery);
etlSet.execute(); // Optimized together

// Good: use attachAsDataStream for mixed workloads
StreamStatementSet tableOps = tableEnv.createStatementSet();
tableOps.addInsert("table_sink", tableQuery);
tableOps.attachAsDataStream();

// Add DataStream operations
dataStream.addSink(customSink);
env.execute("Mixed workload");

// Avoid: executing statements individually when they could be batched;
// each call below submits a separate job
table1.executeInsert("sink1");
table2.executeInsert("sink2");
table3.executeInsert("sink3");
```

### Error Handling

```java
try {
    StreamStatementSet statementSet = tableEnv.createStatementSet();
    statementSet.addInsert("sink1", query1);
    statementSet.addInsert("sink2", query2);

    // Validate before execution
    String plan = statementSet.explain();
    logger.debug("Execution plan: {}", plan);

    TableResult result = statementSet.execute();
    result.await(); // Block until the job completes

} catch (Exception e) {
    logger.error("Statement set execution failed", e);
    // Handle the failure appropriately
}
```