or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

changelog-processing.md · datastream-integration.md · environment-setup.md · implicit-conversions.md · index.md · statement-sets.md · table-operations.md

docs/statement-sets.md

0

# Statement Sets

1

2

## Overview

3

4

StreamStatementSet provides a way to batch multiple table operations together for optimized execution planning. It allows you to add multiple INSERT statements or table pipelines and execute them as a single optimized job, improving performance and resource utilization.

5

6

## Core API

7

8

### StreamStatementSet Creation

9

10

```scala { .api }
trait StreamTableEnvironment {
  def createStatementSet(): StreamStatementSet
}
```

15

16

### Statement Management

17

18

```scala { .api }
trait StreamStatementSet {
  def add(tablePipeline: TablePipeline): StreamStatementSet
  def addInsertSql(statement: String): StreamStatementSet
  def addInsert(targetPath: String, table: Table): StreamStatementSet
  def addInsert(targetPath: String, table: Table, overwrite: Boolean): StreamStatementSet
  def addInsert(targetDescriptor: TableDescriptor, table: Table): StreamStatementSet
  def addInsert(targetDescriptor: TableDescriptor, table: Table, overwrite: Boolean): StreamStatementSet
}
```

28

29

### Execution and Debugging

30

31

```scala { .api }
trait StreamStatementSet {
  def printExplain(extraDetails: ExplainDetail*): StreamStatementSet
  def attachAsDataStream(): Unit
  // execute() is inherited from the parent StatementSet trait
  // and is used throughout the examples below
  def execute(): TableResult
}
```

37

38

## Basic Usage

39

40

### Creating and Using Statement Sets

41

42

```scala
// Create statement set
val statementSet: StreamStatementSet = tableEnv.createStatementSet()

// Create source tables
val orderStream: DataStream[Order] = // ... source
val customerStream: DataStream[Customer] = // ... source

tableEnv.createTemporaryView("orders", orderStream)
tableEnv.createTemporaryView("customers", customerStream)

// Add multiple insert operations
statementSet
  .addInsertSql("""
    INSERT INTO daily_sales
    SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd') as sale_date, SUM(amount)
    FROM orders
    GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd')
  """)
  .addInsertSql("""
    INSERT INTO customer_summary
    SELECT c.id, c.name, COUNT(o.orderId) as order_count, SUM(o.amount) as total_spent
    FROM customers c
    LEFT JOIN orders o ON c.id = o.userId
    GROUP BY c.id, c.name
  """)

// Execute all statements together
statementSet.execute()
```

72

73

### Adding Table Objects

74

75

```scala

76

// Create processed tables

77

val dailySales: Table = tableEnv.sqlQuery("""

78

SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd') as sale_date, SUM(amount) as total_amount

79

FROM orders

80

GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd')

81

""")

82

83

val customerMetrics: Table = tableEnv.sqlQuery("""

84

SELECT userId, COUNT(*) as order_count, AVG(amount) as avg_amount

85

FROM orders

86

GROUP BY userId

87

""")

88

89

// Add to statement set

90

statementSet

91

.addInsert("sales_summary", dailySales)

92

.addInsert("customer_metrics", customerMetrics, overwrite = true)

93

.execute()

94

```

95

96

## Advanced Features

97

98

### Using TableDescriptor

99

100

```scala

101

import org.apache.flink.table.api.TableDescriptor

102

import org.apache.flink.table.api.Schema

103

104

// Define sink table descriptor

105

val salesSinkDescriptor = TableDescriptor.forConnector("kafka")

106

.schema(Schema.newBuilder()

107

.column("sale_date", DataTypes.STRING())

108

.column("total_amount", DataTypes.DECIMAL(10, 2))

109

.build())

110

.option("topic", "daily-sales")

111

.option("properties.bootstrap.servers", "localhost:9092")

112

.format("json")

113

.build()

114

115

// Add insert with descriptor

116

statementSet.addInsert(salesSinkDescriptor, dailySales)

117

```

118

119

### Execution Planning and Optimization

120

121

```scala

122

// View execution plan before running

123

statementSet

124

.addInsert("sales_summary", dailySales)

125

.addInsert("customer_metrics", customerMetrics)

126

.printExplain() // Prints the optimized execution plan

127

128

// Execute with optimization

129

statementSet.execute()

130

```

131

132

### Overwrite Operations

133

134

```scala

135

// Insert with overwrite (for batch scenarios)

136

statementSet

137

.addInsert("historical_sales", salesTable, overwrite = true)

138

.addInsert("historical_customers", customerTable, overwrite = false)

139

.execute()

140

```

141

142

## StreamExecutionEnvironment Integration

143

144

### Attaching to DataStream Job

145

146

```scala

147

// Create Flink streaming job components

148

val env = StreamExecutionEnvironment.getExecutionEnvironment

149

val tableEnv = StreamTableEnvironment.create(env)

150

151

// Setup tables and statement set

152

val statementSet = tableEnv.createStatementSet()

153

statementSet

154

.addInsert("sink1", table1)

155

.addInsert("sink2", table2)

156

157

// Attach to streaming environment instead of executing immediately

158

statementSet.attachAsDataStream()

159

160

// Add additional DataStream operations

161

val alertStream: DataStream[Alert] = // ... create alert stream

162

alertStream.addSink(new AlertSink())

163

164

// Execute the complete job

165

env.execute("Combined Table and DataStream Job")

166

```

167

168

### Mixed Table and DataStream Processing

169

170

```scala

171

// Process some data with Table API

172

val processedTable: Table = tableEnv.sqlQuery("""

173

SELECT userId, COUNT(*) as event_count

174

FROM user_events

175

WHERE event_type = 'click'

176

GROUP BY userId

177

""")

178

179

// Convert back to DataStream for custom processing

180

val processedStream: DataStream[Row] = tableEnv.toDataStream(processedTable)

181

182

// Apply DataStream transformations

183

val enrichedStream = processedStream

184

.keyBy(_.getField(0).toString)

185

.map(row => enrichRow(row))

186

187

// Add DataStream sink

188

enrichedStream.addSink(new CustomSink())

189

190

// Also insert table results to external system

191

val statementSet = tableEnv.createStatementSet()

192

statementSet.addInsert("user_metrics_sink", processedTable)

193

statementSet.attachAsDataStream()

194

195

// Execute combined job

196

env.execute("Mixed Processing Job")

197

```

198

199

## Error Handling

200

201

```scala

202

try {

203

val statementSet = tableEnv.createStatementSet()

204

statementSet

205

.addInsertSql("INSERT INTO invalid_sink SELECT * FROM non_existent_table")

206

.execute()

207

} catch {

208

case e: ValidationException =>

209

// SQL validation failed or table doesn't exist

210

case e: TableException =>

211

// Execution planning or runtime error

212

case e: JobExecutionException =>

213

// Job execution failed

214

}

215

```

216

217

## Performance Benefits

218

219

### Query Optimization

220

221

```scala

222

// Without statement set - separate optimizations

223

tableEnv.executeSql("INSERT INTO sink1 SELECT * FROM complex_query1")

224

tableEnv.executeSql("INSERT INTO sink2 SELECT * FROM complex_query2")

225

tableEnv.executeSql("INSERT INTO sink3 SELECT * FROM complex_query3")

226

227

// With statement set - global optimization

228

val statementSet = tableEnv.createStatementSet()

229

statementSet

230

.addInsertSql("INSERT INTO sink1 SELECT * FROM complex_query1")

231

.addInsertSql("INSERT INTO sink2 SELECT * FROM complex_query2")

232

.addInsertSql("INSERT INTO sink3 SELECT * FROM complex_query3")

233

.execute() // All queries optimized together

234

```

235

236

### Resource Sharing

237

238

```scala

239

// Multiple queries that can share intermediate results

240

val commonSubquery = """

241

SELECT userId, DATE_FORMAT(event_time, 'yyyy-MM-dd') as date, amount

242

FROM transactions

243

WHERE amount > 100

244

"""

245

246

val statementSet = tableEnv.createStatementSet()

247

statementSet

248

.addInsertSql(s"""

249

INSERT INTO daily_high_value_sales

250

SELECT date, SUM(amount) as total

251

FROM ($commonSubquery)

252

GROUP BY date

253

""")

254

.addInsertSql(s"""

255

INSERT INTO user_high_value_summary

256

SELECT userId, COUNT(*) as transaction_count, AVG(amount) as avg_amount

257

FROM ($commonSubquery)

258

GROUP BY userId

259

""")

260

.execute() // Optimizer can share computation of common subquery

261

```

262

263

## Best Practices

264

265

1. **Batch Related Operations**: Group related INSERT operations in the same statement set

266

2. **Use Descriptors**: Prefer TableDescriptor for sink configuration over string-based DDL

267

3. **Explain Plans**: Always review execution plans for complex statement sets

268

4. **Resource Management**: Monitor resource usage when combining many operations

269

5. **Error Isolation**: Consider breaking up very large statement sets for better error isolation

270

6. **Testing**: Test statement sets thoroughly as they execute as atomic units

271

7. **Documentation**: Document the purpose and dependencies of complex statement sets

272

273

## Common Patterns

274

275

### ETL Pipeline Pattern

276

277

```scala

278

val statementSet = tableEnv.createStatementSet()

279

280

// Extract and transform in multiple stages

281

statementSet

282

.addInsertSql("INSERT INTO staging_orders SELECT * FROM raw_orders WHERE status = 'valid'")

283

.addInsertSql("INSERT INTO staging_customers SELECT * FROM raw_customers WHERE email IS NOT NULL")

284

.addInsertSql("""

285

INSERT INTO enriched_orders

286

SELECT o.*, c.name, c.segment

287

FROM staging_orders o

288

JOIN staging_customers c ON o.customer_id = c.id

289

""")

290

.addInsertSql("""

291

INSERT INTO order_summary

292

SELECT customer_id, DATE_FORMAT(order_date, 'yyyy-MM') as month,

293

COUNT(*) as order_count, SUM(amount) as total_amount

294

FROM enriched_orders

295

GROUP BY customer_id, DATE_FORMAT(order_date, 'yyyy-MM')

296

""")

297

.execute()

298

```

299

300

### Multi-Sink Pattern

301

302

```scala

303

// Send processed data to multiple destinations

304

val processedOrders: Table = tableEnv.sqlQuery("SELECT * FROM processed_orders")

305

306

val statementSet = tableEnv.createStatementSet()

307

statementSet

308

.addInsert("kafka_sink", processedOrders) // Real-time notifications

309

.addInsert("elasticsearch_sink", processedOrders) // Search indexing

310

.addInsert("hdfs_sink", processedOrders) // Long-term storage

311

.execute()

312

```