# Statement Sets

## Overview

StreamStatementSet provides a way to batch multiple table operations together for optimized execution planning. It allows you to add multiple INSERT statements or table pipelines and execute them as a single optimized job, improving performance and resource utilization.
## Core API

### StreamStatementSet Creation

```scala { .api }
trait StreamTableEnvironment {
  def createStatementSet(): StreamStatementSet
}
```
### Statement Management

```scala { .api }
trait StreamStatementSet {
  def add(tablePipeline: TablePipeline): StreamStatementSet
  def addInsertSql(statement: String): StreamStatementSet
  def addInsert(targetPath: String, table: Table): StreamStatementSet
  def addInsert(targetPath: String, table: Table, overwrite: Boolean): StreamStatementSet
  def addInsert(targetDescriptor: TableDescriptor, table: Table): StreamStatementSet
  def addInsert(targetDescriptor: TableDescriptor, table: Table, overwrite: Boolean): StreamStatementSet
}
```
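The `add` variant takes a `TablePipeline`, which is produced by `Table.insertInto`. A minimal sketch, assuming a `dailySales` table and a registered sink named `daily_sales` (both hypothetical names from the examples below):

```scala
// Table.insertInto(...) yields a TablePipeline describing "write this table
// to that sink"; add(...) enqueues it alongside the set's other statements.
// `dailySales` and the sink name "daily_sales" are assumed for illustration.
val statementSet = tableEnv.createStatementSet()
statementSet.add(dailySales.insertInto("daily_sales"))
```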
### Execution and Debugging

```scala { .api }
trait StreamStatementSet {
  def execute(): TableResult
  def printExplain(extraDetails: ExplainDetail*): StreamStatementSet
  def attachAsDataStream(): Unit
}
```
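`printExplain` accepts optional `ExplainDetail` flags for a more verbose plan. A sketch, assuming a `statementSet` that already has statements added:

```scala
import org.apache.flink.table.api.ExplainDetail

// Include changelog modes and estimated costs in the printed plan.
statementSet.printExplain(ExplainDetail.CHANGELOG_MODE, ExplainDetail.ESTIMATED_COST)
```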
## Basic Usage

### Creating and Using Statement Sets

```scala
// Create a statement set
val statementSet: StreamStatementSet = tableEnv.createStatementSet()

// Create source tables
val orderStream: DataStream[Order] = // ... source
val customerStream: DataStream[Customer] = // ... source

tableEnv.createTemporaryView("orders", orderStream)
tableEnv.createTemporaryView("customers", customerStream)

// Add multiple insert operations
statementSet
  .addInsertSql("""
    INSERT INTO daily_sales
    SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd') as sale_date, SUM(amount)
    FROM orders
    GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd')
  """)
  .addInsertSql("""
    INSERT INTO customer_summary
    SELECT c.id, c.name, COUNT(o.orderId) as order_count, SUM(o.amount) as total_spent
    FROM customers c
    LEFT JOIN orders o ON c.id = o.userId
    GROUP BY c.id, c.name
  """)

// Execute all statements together as a single job
statementSet.execute()
```
### Adding Table Objects

```scala
// Create processed tables
val dailySales: Table = tableEnv.sqlQuery("""
  SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd') as sale_date, SUM(amount) as total_amount
  FROM orders
  GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd')
""")

val customerMetrics: Table = tableEnv.sqlQuery("""
  SELECT userId, COUNT(*) as order_count, AVG(amount) as avg_amount
  FROM orders
  GROUP BY userId
""")

// Add to the statement set
statementSet
  .addInsert("sales_summary", dailySales)
  .addInsert("customer_metrics", customerMetrics, overwrite = true)
  .execute()
```
## Advanced Features

### Using TableDescriptor

```scala
import org.apache.flink.table.api.{DataTypes, Schema, TableDescriptor}

// Define a sink table descriptor
val salesSinkDescriptor = TableDescriptor.forConnector("kafka")
  .schema(Schema.newBuilder()
    .column("sale_date", DataTypes.STRING())
    .column("total_amount", DataTypes.DECIMAL(10, 2))
    .build())
  .option("topic", "daily-sales")
  .option("properties.bootstrap.servers", "localhost:9092")
  .format("json")
  .build()

// Add an insert targeting the descriptor
statementSet.addInsert(salesSinkDescriptor, dailySales)
```
### Execution Planning and Optimization

```scala
// View the execution plan before running
statementSet
  .addInsert("sales_summary", dailySales)
  .addInsert("customer_metrics", customerMetrics)
  .printExplain() // Prints the optimized execution plan

// Execute the optimized job
statementSet.execute()
```
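If you want the plan as a value rather than printed to stdout, `explain` (inherited from the `Explainable` interface) returns it as a `String`; a sketch:

```scala
import org.apache.flink.table.api.ExplainDetail

// explain(...) returns the plan instead of printing it, which is
// convenient for logging or storing alongside job metadata.
val plan: String = statementSet.explain(ExplainDetail.ESTIMATED_COST)
println(plan)
```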
### Overwrite Operations

```scala
// Insert with overwrite (for batch scenarios)
statementSet
  .addInsert("historical_sales", salesTable, overwrite = true)
  .addInsert("historical_customers", customerTable, overwrite = false)
  .execute()
```
## StreamExecutionEnvironment Integration

### Attaching to a DataStream Job

```scala
// Create Flink streaming job components
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Set up tables and the statement set
val statementSet = tableEnv.createStatementSet()
statementSet
  .addInsert("sink1", table1)
  .addInsert("sink2", table2)

// Attach to the streaming environment instead of executing immediately
statementSet.attachAsDataStream()

// Add additional DataStream operations
val alertStream: DataStream[Alert] = // ... create alert stream
alertStream.addSink(new AlertSink())

// Execute the complete job
env.execute("Combined Table and DataStream Job")
```
### Mixed Table and DataStream Processing

```scala
// Process some data with the Table API
val processedTable: Table = tableEnv.sqlQuery("""
  SELECT userId, COUNT(*) as event_count
  FROM user_events
  WHERE event_type = 'click'
  GROUP BY userId
""")

// Convert back to a DataStream for custom processing. The aggregation
// produces an updating result, so use toChangelogStream rather than
// toDataStream (which only supports insert-only tables).
val processedStream: DataStream[Row] = tableEnv.toChangelogStream(processedTable)

// Apply DataStream transformations
val enrichedStream = processedStream
  .keyBy(_.getField(0).toString)
  .map(row => enrichRow(row))

// Add a DataStream sink
enrichedStream.addSink(new CustomSink())

// Also insert the table results into an external system
val statementSet = tableEnv.createStatementSet()
statementSet.addInsert("user_metrics_sink", processedTable)
statementSet.attachAsDataStream()

// Execute the combined job
env.execute("Mixed Processing Job")
```
## Error Handling

```scala
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.runtime.client.JobExecutionException

try {
  val statementSet = tableEnv.createStatementSet()
  statementSet
    .addInsertSql("INSERT INTO invalid_sink SELECT * FROM non_existent_table")
    .execute()
} catch {
  case e: ValidationException =>
    // SQL validation failed or a referenced table doesn't exist
  case e: TableException =>
    // Execution planning or runtime error
  case e: JobExecutionException =>
    // Job execution failed
}
```
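`execute()` returns a `TableResult`, and for insert jobs the submission is asynchronous, so failures that occur after submission are only observed by waiting on the result. A sketch (note that an unbounded streaming job never completes):

```scala
import org.apache.flink.table.api.TableResult

val result: TableResult = statementSet.execute()

// Optionally record the job's ID for monitoring.
result.getJobClient.ifPresent(client => println(s"Submitted job ${client.getJobID}"))

// Block until the job finishes; useful for bounded inputs or tests.
// This rethrows any failure that occurred during execution.
result.await()
```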
## Performance Benefits

### Query Optimization

```scala
// Without a statement set: each call is planned and submitted as a separate job
tableEnv.executeSql("INSERT INTO sink1 SELECT * FROM complex_query1")
tableEnv.executeSql("INSERT INTO sink2 SELECT * FROM complex_query2")
tableEnv.executeSql("INSERT INTO sink3 SELECT * FROM complex_query3")

// With a statement set: all statements are optimized globally
val statementSet = tableEnv.createStatementSet()
statementSet
  .addInsertSql("INSERT INTO sink1 SELECT * FROM complex_query1")
  .addInsertSql("INSERT INTO sink2 SELECT * FROM complex_query2")
  .addInsertSql("INSERT INTO sink3 SELECT * FROM complex_query3")
  .execute() // All queries optimized together and run as one job
```
### Resource Sharing

```scala
// Multiple queries that can share intermediate results
val commonSubquery = """
  SELECT userId, DATE_FORMAT(event_time, 'yyyy-MM-dd') as date, amount
  FROM transactions
  WHERE amount > 100
"""

val statementSet = tableEnv.createStatementSet()
statementSet
  .addInsertSql(s"""
    INSERT INTO daily_high_value_sales
    SELECT date, SUM(amount) as total
    FROM ($commonSubquery)
    GROUP BY date
  """)
  .addInsertSql(s"""
    INSERT INTO user_high_value_summary
    SELECT userId, COUNT(*) as transaction_count, AVG(amount) as avg_amount
    FROM ($commonSubquery)
    GROUP BY userId
  """)
  .execute() // The optimizer can share computation of the common subquery
```
## Best Practices

1. **Batch Related Operations**: Group related INSERT operations in the same statement set
2. **Use Descriptors**: Prefer TableDescriptor for sink configuration over string-based DDL
3. **Explain Plans**: Always review execution plans for complex statement sets
4. **Resource Management**: Monitor resource usage when combining many operations
5. **Error Isolation**: Consider breaking up very large statement sets for better error isolation
6. **Testing**: Test statement sets thoroughly, since all statements run as a single job and fail together
7. **Documentation**: Document the purpose and dependencies of complex statement sets
## Common Patterns

### ETL Pipeline Pattern

```scala
val statementSet = tableEnv.createStatementSet()

// Extract and transform in multiple stages
statementSet
  .addInsertSql("INSERT INTO staging_orders SELECT * FROM raw_orders WHERE status = 'valid'")
  .addInsertSql("INSERT INTO staging_customers SELECT * FROM raw_customers WHERE email IS NOT NULL")
  .addInsertSql("""
    INSERT INTO enriched_orders
    SELECT o.*, c.name, c.segment
    FROM staging_orders o
    JOIN staging_customers c ON o.customer_id = c.id
  """)
  .addInsertSql("""
    INSERT INTO order_summary
    SELECT customer_id, DATE_FORMAT(order_date, 'yyyy-MM') as month,
           COUNT(*) as order_count, SUM(amount) as total_amount
    FROM enriched_orders
    GROUP BY customer_id, DATE_FORMAT(order_date, 'yyyy-MM')
  """)
  .execute()
```
### Multi-Sink Pattern

```scala
// Send processed data to multiple destinations
val processedOrders: Table = tableEnv.sqlQuery("SELECT * FROM processed_orders")

val statementSet = tableEnv.createStatementSet()
statementSet
  .addInsert("kafka_sink", processedOrders)         // Real-time notifications
  .addInsert("elasticsearch_sink", processedOrders) // Search indexing
  .addInsert("hdfs_sink", processedOrders)          // Long-term storage
  .execute()
```