or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

builtin-connectors.md, changelog-processing.md, datastream-connectors.md, index.md, procedures.md, statement-sets.md, stream-table-environment.md, watermark-strategies.md

docs/statement-sets.md

0

# Statement Sets

1

2

Batch execution optimization for multiple table operations with shared planning and execution. StreamStatementSet enables efficient execution of multiple INSERT statements and table pipelines by optimizing them together as a single job.

3

4

## Capabilities

5

6

### Statement Set Creation

7

8

Create StreamStatementSet instances for batch execution of multiple operations.

9

10

```java { .api }

11

/**

12

* StreamStatementSet that integrates with the Java DataStream API

13

* Accepts pipelines defined by DML statements or Table objects

14

* The planner can optimize all added statements together

15

*/

16

public interface StreamStatementSet extends StatementSet {

17

18

/**

19

* Add a table pipeline to the statement set

20

* @param tablePipeline Table pipeline to add

21

* @return This StreamStatementSet for method chaining

22

*/

23

StreamStatementSet add(TablePipeline tablePipeline);

24

25

/**

26

* Add an INSERT SQL statement to the statement set

27

* @param statement INSERT SQL statement string

28

* @return This StreamStatementSet for method chaining

29

*/

30

StreamStatementSet addInsertSql(String statement);

31

32

/**

33

* Add table insert operation to existing table

34

* @param targetPath Path to target table

35

* @param table Source table to insert

36

* @return This StreamStatementSet for method chaining

37

*/

38

StreamStatementSet addInsert(String targetPath, Table table);

39

40

/**

41

* Add table insert operation with overwrite option

42

* @param targetPath Path to target table

43

* @param table Source table to insert

44

* @param overwrite Whether to overwrite existing data

45

* @return This StreamStatementSet for method chaining

46

*/

47

StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);

48

49

/**

50

* Add table insert operation using table descriptor

51

* @param targetDescriptor Target table descriptor

52

* @param table Source table to insert

53

* @return This StreamStatementSet for method chaining

54

*/

55

StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);

56

57

/**

58

* Add table insert operation using table descriptor with overwrite

59

* @param targetDescriptor Target table descriptor

60

* @param table Source table to insert

61

* @param overwrite Whether to overwrite existing data

62

* @return This StreamStatementSet for method chaining

63

*/

64

StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);

65

66

// Inherited from StatementSet:

67

TableResult execute();

68

CompiledPlan compilePlan(); // @Experimental

69

String explain(ExplainDetail... extraDetails);

70

String explain(ExplainFormat format, ExplainDetail... extraDetails);

71

}

72

```

73

74

### Statement Set Execution

75

76

Execute statement sets with different execution strategies.

77

78

```java { .api }

79

/**

80

* Optimizes all statements as one entity and adds them as transformations

81

* to the underlying StreamExecutionEnvironment

82

* Use StreamExecutionEnvironment.execute() to execute them

83

* The added statements will be cleared after calling this method

84

*/

85

void attachAsDataStream();

86

87

/**

88

* Print execution plan explanation with optional details

89

* @param extraDetails Additional details to include in explanation

90

* @return This StreamStatementSet for method chaining

91

*/

92

StreamStatementSet printExplain(ExplainDetail... extraDetails);

93

```

94

95

**Usage Examples:**

96

97

```java

98

import org.apache.flink.table.api.bridge.java.StreamStatementSet;

99

import org.apache.flink.table.api.Table;

100

import org.apache.flink.table.api.TableDescriptor;

101

import org.apache.flink.table.api.ExplainDetail;

102

103

// Create statement set

104

StreamStatementSet statementSet = tableEnv.createStatementSet();

105

106

// Add multiple operations

107

statementSet

108

.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE status = 'ACTIVE'")

109

.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE status = 'INACTIVE'")

110

.addInsert("sink3", tableEnv.sqlQuery("SELECT COUNT(*) FROM source"));

111

112

// Execute all statements together

113

statementSet.execute();

114

115

// Alternative: Attach to DataStream environment

116

StreamStatementSet asyncSet = tableEnv.createStatementSet();

117

asyncSet

118

.addInsertSql("INSERT INTO async_sink SELECT * FROM streaming_source")

119

.attachAsDataStream();

120

121

// Execute the underlying StreamExecutionEnvironment

122

env.execute("Batch Statement Set Job");

123

```

124

125

### Advanced Statement Set Patterns

126

127

Complex usage patterns for optimized batch execution.

128

129

```java

130

// Multi-sink pattern with shared computation

131

StreamStatementSet multiSinkSet = tableEnv.createStatementSet();

132

133

// Shared intermediate computation
Table processedData = tableEnv.sqlQuery(
    "SELECT " +
    "  user_id, " +
    "  product_id, " +
    "  quantity, " +
    "  price, " +
    "  quantity * price as total_amount, " +
    "  EXTRACT(HOUR FROM order_time) as order_hour " +
    "FROM raw_orders " +
    "WHERE quantity > 0 AND price > 0"
);

// Register the shared computation as a temporary view so that SQL statements
// can reference it by name. (Concatenating a Table object directly into a SQL
// string does not work — Table.toString() is not a usable table reference.)
tableEnv.createTemporaryView("processed_data", processedData);

// Multiple sinks using the shared computation
multiSinkSet
    .addInsert("hourly_stats", tableEnv.sqlQuery(
        "SELECT order_hour, COUNT(*), SUM(total_amount) " +
        "FROM processed_data " +
        "GROUP BY order_hour"
    ))
    .addInsert("product_stats", tableEnv.sqlQuery(
        "SELECT product_id, COUNT(*), AVG(total_amount) " +
        "FROM processed_data " +
        "GROUP BY product_id"
    ))
    .addInsert("user_stats", tableEnv.sqlQuery(
        "SELECT user_id, COUNT(*), SUM(total_amount) " +
        "FROM processed_data " +
        "GROUP BY user_id"
    ));

163

164

// Execute with shared optimization

165

multiSinkSet.execute();

166

```

167

168

### Execution Plan Analysis

169

170

Analyze and debug statement set execution plans.

171

172

```java { .api }

173

import org.apache.flink.table.api.ExplainDetail;

174

175

// Create statement set with operations

176

StreamStatementSet debugSet = tableEnv.createStatementSet();

177

debugSet

178

.addInsertSql("INSERT INTO debug_sink1 SELECT * FROM source WHERE type = 'A'")

179

.addInsertSql("INSERT INTO debug_sink2 SELECT * FROM source WHERE type = 'B'");

180

181

// Print detailed execution plan

182

debugSet.printExplain(

183

ExplainDetail.CHANGELOG_MODE,

184

ExplainDetail.JSON_EXECUTION_PLAN,

185

ExplainDetail.ESTIMATED_COST

186

);

187

188

// This will output the optimized execution plan showing:

189

// - Shared operators

190

// - Data flow between operations

191

// - Cost estimates

192

// - Changelog mode information

193

```

194

195

## Performance Optimization

196

197

### Batch Execution Benefits

198

199

Understanding when statement sets provide performance improvements.

200

201

```java

202

// Without statement set - separate optimizations

203

tableEnv.executeSql("INSERT INTO sink1 SELECT * FROM source WHERE region = 'US'");

204

tableEnv.executeSql("INSERT INTO sink2 SELECT * FROM source WHERE region = 'EU'");

205

tableEnv.executeSql("INSERT INTO sink3 SELECT * FROM source WHERE region = 'ASIA'");

206

// Result: 3 separate jobs, source read 3 times

207

208

// With statement set - shared optimization

209

StreamStatementSet optimizedSet = tableEnv.createStatementSet();

210

optimizedSet

211

.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE region = 'US'")

212

.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE region = 'EU'")

213

.addInsertSql("INSERT INTO sink3 SELECT * FROM source WHERE region = 'ASIA'")

214

.execute();

215

// Result: 1 job, source read once, shared filtering and routing

216

```

217

218

### Resource Optimization

219

220

Configure statement sets for optimal resource usage.

221

222

```java

223

// Large-scale batch processing with statement sets

224

StreamStatementSet batchProcessingSet = tableEnv.createStatementSet();

225

226

// Configure environment parallelism (note: latency tracking below is a
// monitoring feature and adds runtime overhead — it is not an optimization)

227

env.setParallelism(16);

228

env.getConfig().setLatencyTrackingInterval(1000);

229

230

// Add multiple heavy computations

231

String[] regions = {"US", "EU", "ASIA", "AFRICA", "OCEANIA"};

232

for (String region : regions) {

233

batchProcessingSet.addInsertSql(

234

"INSERT INTO region_analytics_" + region.toLowerCase() + " " +

235

"SELECT " +

236

" date_trunc('day', order_time) as order_date, " +

237

" product_category, " +

238

" COUNT(*) as order_count, " +

239

" SUM(total_amount) as total_revenue, " +

240

" AVG(total_amount) as avg_order_value " +

241

"FROM orders " +

242

"WHERE region = '" + region + "' " +

243

"GROUP BY date_trunc('day', order_time), product_category"

244

);

245

}

246

247

// Execute with shared resource planning

248

batchProcessingSet.execute();

249

```

250

251

### Dynamic Statement Building

252

253

Build statement sets dynamically based on runtime conditions.

254

255

```java

256

public StreamStatementSet buildDynamicStatementSet(

257

StreamTableEnvironment tableEnv,

258

List<String> targetTables,

259

Map<String, String> filterConditions) {

260

261

StreamStatementSet dynamicSet = tableEnv.createStatementSet();

262

263

for (String targetTable : targetTables) {

264

String condition = filterConditions.get(targetTable);

265

String insertSql = String.format(

266

"INSERT INTO %s SELECT * FROM source_table WHERE %s",

267

targetTable, condition

268

);

269

dynamicSet.addInsertSql(insertSql);

270

}

271

272

return dynamicSet;

273

}

274

275

// Usage

276

List<String> tables = Arrays.asList("active_users", "inactive_users", "premium_users");

277

Map<String, String> conditions = Map.of(

278

"active_users", "last_login > CURRENT_TIMESTAMP - INTERVAL '30' DAY",

279

"inactive_users", "last_login <= CURRENT_TIMESTAMP - INTERVAL '30' DAY",

280

"premium_users", "subscription_type = 'PREMIUM'"

281

);

282

283

StreamStatementSet dynamicSet = buildDynamicStatementSet(tableEnv, tables, conditions);

284

dynamicSet.execute();

285

```

286

287

## Error Handling and Monitoring

288

289

### Statement Set Error Handling

290

291

Handle errors in batch statement execution.

292

293

```java

294

try {

295

StreamStatementSet statementSet = tableEnv.createStatementSet();

296

statementSet

297

.addInsertSql("INSERT INTO sink1 SELECT * FROM source1")

298

.addInsertSql("INSERT INTO sink2 SELECT * FROM source2")

299

.addInsertSql("INSERT INTO sink3 SELECT * FROM source3");

300

301

// Execute and handle potential failures

302

TableResult result = statementSet.execute();

303

304

// Monitor execution

305

result.await(); // Wait for completion

306

System.out.println("Statement set executed successfully");

307

308

} catch (Exception e) {

309

System.err.println("Statement set execution failed: " + e.getMessage());

310

// Handle partial completion, rollback, or retry logic

311

}

312

```

313

314

### Monitoring Statement Set Progress

315

316

Monitor the progress of long-running statement sets.

317

318

```java

319

StreamStatementSet monitoredSet = tableEnv.createStatementSet();

320

321

// Add operations

322

monitoredSet

323

.addInsertSql("INSERT INTO large_sink1 SELECT * FROM large_source")

324

.addInsertSql("INSERT INTO large_sink2 SELECT * FROM large_source");

325

326

// Execute asynchronously

327

CompletableFuture<TableResult> execution = CompletableFuture.supplyAsync(() -> {

328

try {

329

return monitoredSet.execute();

330

} catch (Exception e) {

331

throw new RuntimeException(e);

332

}

333

});

334

335

// Monitor progress

336

execution.thenAccept(result -> {

337

System.out.println("Statement set completed successfully");

338

}).exceptionally(throwable -> {

339

System.err.println("Statement set failed: " + throwable.getMessage());

340

return null;

341

});

342

```

343

344

## Integration with DataStream API

345

346

### Hybrid Processing Patterns

347

348

Combine statement sets with DataStream operations.

349

350

```java

351

// DataStream processing

352

DataStream<Row> preprocessedStream = env

353

.fromSource(kafkaSource, watermarkStrategy, "kafka-source")

354

.map(new PreprocessingFunction())

355

.filter(new QualityFilter());

356

357

// Convert to table for SQL processing

358

tableEnv.createTemporaryView("preprocessed_data", preprocessedStream);

359

360

// Use statement set for multiple SQL operations

361

StreamStatementSet hybridSet = tableEnv.createStatementSet();

362

hybridSet

363

.addInsertSql("INSERT INTO sql_sink1 SELECT * FROM preprocessed_data WHERE category = 'A'")

364

.addInsertSql("INSERT INTO sql_sink2 SELECT * FROM preprocessed_data WHERE category = 'B'")

365

.attachAsDataStream(); // Attach to existing StreamExecutionEnvironment

366

367

// Add additional DataStream operations

368

DataStream<String> postProcessed = tableEnv

369

.toDataStream(tableEnv.sqlQuery("SELECT * FROM preprocessed_data"))

370

.map(new PostProcessingFunction());

371

372

postProcessed.addSink(customSink);

373

374

// Execute entire pipeline

375

env.execute("Hybrid Stream-Table Processing");

376

```

377

378

## Types

379

380

### Statement Set Types

381

382

```java { .api }

383

import org.apache.flink.table.api.bridge.java.StreamStatementSet;

384

import org.apache.flink.table.api.StatementSet;

385

import org.apache.flink.table.api.TablePipeline;

386

import org.apache.flink.table.api.Table;

387

import org.apache.flink.table.api.TableDescriptor;

388

```

389

390

### Execution and Monitoring Types

391

392

```java { .api }

393

import org.apache.flink.table.api.TableResult;

394

import org.apache.flink.table.api.ExplainDetail;

395

import java.util.concurrent.CompletableFuture;

396

```

397

398

### Environment Integration Types

399

400

```java { .api }

401

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

402

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

403

```