or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

built-in-connectors.md connector-framework.md datastream-integration.md index.md procedure-context.md statement-set.md table-environment.md

docs/statement-set.md

0

# Statement Set Operations

1

2

Batch execution of multiple table operations for optimized query planning and execution. StreamStatementSet enables adding multiple DML statements and executing them as a single optimized job.

3

4

## Capabilities

5

6

### Statement Set Creation

7

8

Creates a StreamStatementSet for collecting multiple table operations.

9

10

```java { .api }

11

/**

12

* Creates a statement set for batch execution of table operations

13

* @return StreamStatementSet for adding multiple operations

14

*/

15

StreamStatementSet createStatementSet();

16

```

17

18

### Statement Set Interface

19

20

Main interface for building and executing multiple table operations together.

21

22

```java { .api }

23

/**

24

* Statement set that integrates with DataStream API for batch execution

25

* Optimizes all added statements together before execution

26

*/

27

public interface StreamStatementSet extends StatementSet {

28

29

/**

30

* Adds a table pipeline to the statement set

31

* @param tablePipeline The table pipeline to add

32

* @return This statement set for method chaining

33

*/

34

StreamStatementSet add(TablePipeline tablePipeline);

35

36

/**

37

* Adds an INSERT SQL statement to the statement set

38

* @param statement SQL INSERT statement string

39

* @return This statement set for method chaining

40

*/

41

StreamStatementSet addInsertSql(String statement);

42

43

/**

44

* Adds an INSERT operation to the statement set

45

* @param targetPath Path to target table for insertion

46

* @param table Source table for the insert operation

47

* @return This statement set for method chaining

48

*/

49

StreamStatementSet addInsert(String targetPath, Table table);

50

51

/**

52

* Adds an INSERT operation with overwrite option

53

* @param targetPath Path to target table for insertion

54

* @param table Source table for the insert operation

55

* @param overwrite Whether to overwrite existing data

56

* @return This statement set for method chaining

57

*/

58

StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);

59

60

/**

61

* Adds an INSERT operation using table descriptor

62

* @param targetDescriptor Descriptor defining the target table

63

* @param table Source table for the insert operation

64

* @return This statement set for method chaining

65

*/

66

StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);

67

68

/**

69

* Adds an INSERT operation using table descriptor with overwrite

70

* @param targetDescriptor Descriptor defining the target table

71

* @param table Source table for the insert operation

72

* @param overwrite Whether to overwrite existing data

73

* @return This statement set for method chaining

74

*/

75

StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);

76

}

77

```

78

79

### Execution Methods

80

81

Methods for executing the collected statements.

82

83

```java { .api }

84

/**

85

* Optimizes all statements as one entity and adds them as transformations

86

* to the underlying StreamExecutionEnvironment. Use StreamExecutionEnvironment.execute()

87

* to actually execute them. The added statements will be cleared after calling this method.

88

*/

89

void attachAsDataStream();

90

91

/**

92

* Optimizes and executes all added statements as one job

93

* @return TableResult containing execution results

94

*/

95

TableResult execute();

96

```

97

98

### Explain and Debug

99

100

Methods for analyzing the execution plan.

101

102

```java { .api }

103

/**

104

* Prints the execution plan for all added statements

105

* @param extraDetails Additional details to include in the explanation

106

* @return This statement set for method chaining

107

*/

108

StreamStatementSet printExplain(ExplainDetail... extraDetails);

109

110

/**

111

* Returns the execution plan as a string

112

* @param extraDetails Additional details to include in the explanation

113

* @return Execution plan string

114

*/

115

String explain(ExplainDetail... extraDetails);

116

```

117

118

## Usage Examples

119

120

### Basic Statement Set Usage

121

122

```java

123

import org.apache.flink.table.api.bridge.java.StreamStatementSet;

124

import org.apache.flink.table.api.Table;

125

126

// Create statement set

127

StreamStatementSet statementSet = tableEnv.createStatementSet();

128

129

// Add multiple operations

130

Table salesData = tableEnv.from("sales_source");

131

Table customerData = tableEnv.from("customer_source");

132

133

// Process sales data

134

Table salesSummary = salesData

135

.groupBy($("customer_id"))

136

.select($("customer_id"), $("amount").sum().as("total_sales"));

137

138

// Join with customer data

139

Table customerSales = customerData

140

.join(salesSummary, $("id").isEqual($("customer_id")))

141

.select($("name"), $("total_sales"));

142

143

// Add insert operations to statement set

144

statementSet.addInsert("sales_summary_sink", salesSummary);

145

statementSet.addInsert("customer_sales_sink", customerSales);

146

147

// Execute all operations together

148

statementSet.execute();

149

```

150

151

### SQL Statement Integration

152

153

```java

154

// Mix of programmatic and SQL operations

155

StreamStatementSet statementSet = tableEnv.createStatementSet();

156

157

// Add SQL INSERT statements

158

statementSet.addInsertSql(

159

"INSERT INTO daily_summary " +

160

"SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd') as day, " +

161

" COUNT(*) as order_count, " +

162

" SUM(amount) as total_amount " +

163

"FROM orders " +

164

"GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd')"

165

);

166

167

statementSet.addInsertSql(

168

"INSERT INTO product_stats " +

169

"SELECT product_id, " +

170

" COUNT(*) as sales_count, " +

171

" AVG(amount) as avg_amount " +

172

"FROM orders " +

173

"GROUP BY product_id"

174

);

175

176

// Add programmatic operations

177

Table alertData = tableEnv.sqlQuery(

178

"SELECT customer_id, amount FROM orders WHERE amount > 1000"

179

);

180

statementSet.addInsert("high_value_alerts", alertData);

181

182

// Execute all together

183

statementSet.execute();

184

```

185

186

### DataStream Integration

187

188

```java

189

// Attach to DataStream environment for custom execution control

190

StreamStatementSet statementSet = tableEnv.createStatementSet();

191

192

// Add operations

193

statementSet.addInsert("sink1", table1);

194

statementSet.addInsert("sink2", table2);

195

statementSet.addInsert("sink3", table3);

196

197

// Attach to StreamExecutionEnvironment instead of immediate execution

198

statementSet.attachAsDataStream();

199

200

// Add custom DataStream operations

201

DataStream<String> alertStream = env.socketTextStream("localhost", 9999);

202

alertStream.print("alerts");

203

204

// Execute the complete job (both table and stream operations)

205

env.execute("Combined Table and Stream Job");

206

```

207

208

### Table Descriptor Usage

209

210

```java

211

import org.apache.flink.table.api.TableDescriptor;

212

import org.apache.flink.table.api.Schema;

213

import org.apache.flink.table.api.DataTypes;

214

215

// Create table descriptors for dynamic sink creation

216

TableDescriptor csvSinkDescriptor = TableDescriptor.forConnector("filesystem")

217

.schema(Schema.newBuilder()

218

.column("id", DataTypes.BIGINT())

219

.column("name", DataTypes.STRING())

220

.column("amount", DataTypes.DECIMAL(10, 2))

221

.build())

222

.option("path", "/path/to/csv/output")

223

.format("csv")

224

.build();

225

226

TableDescriptor printSinkDescriptor = TableDescriptor.forConnector("print")

227

.schema(Schema.newBuilder()

228

.column("summary", DataTypes.STRING())

229

.column("count", DataTypes.BIGINT())

230

.build())

231

.option("print-identifier", "summary")

232

.build();

233

234

// Use descriptors in statement set

235

StreamStatementSet statementSet = tableEnv.createStatementSet();

236

237

Table processedData = tableEnv.sqlQuery("SELECT id, name, amount FROM source_table");

238

Table summaryData = tableEnv.sqlQuery("SELECT 'Total processed' as summary, COUNT(*) as count FROM source_table");

239

240

statementSet.addInsert(csvSinkDescriptor, processedData);

241

statementSet.addInsert(printSinkDescriptor, summaryData);

242

243

statementSet.execute();

244

```

245

246

### Execution Plan Analysis

247

248

```java

249

import org.apache.flink.table.api.ExplainDetail;

250

251

// Create and populate statement set

252

StreamStatementSet statementSet = tableEnv.createStatementSet();

253

statementSet.addInsert("sink1", complexQuery1);

254

statementSet.addInsert("sink2", complexQuery2);

255

statementSet.addInsert("sink3", complexQuery3);

256

257

// Print execution plan with details

258

statementSet.printExplain(

259

ExplainDetail.CHANGELOG_MODE,

260

ExplainDetail.ESTIMATED_COST,

261

ExplainDetail.JSON_EXECUTION_PLAN

262

);

263

264

// Get execution plan as string for logging

265

String executionPlan = statementSet.explain(ExplainDetail.ESTIMATED_COST);

266

logger.info("Execution plan: {}", executionPlan);

267

268

// Execute after analysis

269

statementSet.execute();

270

```

271

272

## Type Definitions

273

274

### Statement Set Hierarchy

275

276

```java { .api }

277

import org.apache.flink.table.api.StatementSet;

278

import org.apache.flink.table.api.TablePipeline;

279

import org.apache.flink.table.api.TableDescriptor;

280

281

// Interface hierarchy

282

interface StatementSet {

283

StatementSet add(TablePipeline tablePipeline);

284

StatementSet addInsertSql(String statement);

285

// ... base methods

286

}

287

288

interface StreamStatementSet extends StatementSet {

289

void attachAsDataStream();

290

// ... streaming-specific methods

291

}

292

```

293

294

### Table Pipeline

295

296

```java { .api }

297

/**

298

* Represents a table operation pipeline that can be added to statement sets

299

*/

300

interface TablePipeline {

301

// Marker interface for table operations

302

}

303

304

// Table implements TablePipeline

305

class Table implements TablePipeline {

306

// Table operations can be added to statement sets

307

}

308

```

309

310

### Explain Details

311

312

```java { .api }

313

import org.apache.flink.table.api.ExplainDetail;

314

315

// Available explanation details

316

ExplainDetail.CHANGELOG_MODE // Show changelog mode information

317

ExplainDetail.ESTIMATED_COST // Show cost estimation

318

ExplainDetail.JSON_EXECUTION_PLAN // Show JSON format execution plan

319

```

320

321

### Execution Results

322

323

```java { .api }

324

import org.apache.flink.table.api.TableResult;

325

326

// Result of statement set execution

327

interface TableResult {

328

Optional<JobClient> getJobClient(); // Access to job execution (empty if no job was submitted)

329

ResolvedSchema getResolvedSchema(); // Result schema

330

CloseableIterator<Row> collect(); // Collect results (blocking)

331

void print(); // Print results to console

332

void await() throws InterruptedException, ExecutionException; // Block until completion

333

}

334

```

335

336

## Best Practices

337

338

### Optimization Benefits

339

340

Using statement sets provides several optimization opportunities:

341

342

- **Shared subquery elimination**: Common subexpressions across statements are computed once

343

- **Operator fusion**: Compatible operators are fused to reduce serialization overhead

344

- **Resource planning**: Global optimization considers all operations for resource allocation

345

- **Pipeline optimization**: Cross-statement optimizations like predicate pushdown

346

347

### Performance Guidelines

348

349

```java

350

// Good: Group related operations in statement sets

351

StreamStatementSet etlSet = tableEnv.createStatementSet();

352

etlSet.addInsert("cleaned_data", cleaningQuery);

353

etlSet.addInsert("enriched_data", enrichmentQuery);

354

etlSet.addInsert("aggregated_data", aggregationQuery);

355

etlSet.execute(); // Optimized together

356

357

// Good: Use attachAsDataStream for mixed workloads

358

StreamStatementSet tableOps = tableEnv.createStatementSet();

359

tableOps.addInsert("table_sink", tableQuery);

360

tableOps.attachAsDataStream();

361

362

// Add DataStream operations

363

dataStream.addSink(customSink);

364

env.execute("Mixed workload");

365

366

// Avoid: Executing statements individually when they could be batched

367

table1.executeInsert("sink1"); // Individual execution

368

table2.executeInsert("sink2"); // Individual execution

369

table3.executeInsert("sink3"); // Individual execution

370

```

371

372

### Error Handling

373

374

```java

375

try {

376

StreamStatementSet statementSet = tableEnv.createStatementSet();

377

statementSet.addInsert("sink1", query1);

378

statementSet.addInsert("sink2", query2);

379

380

// Validate before execution

381

String plan = statementSet.explain();

382

logger.debug("Execution plan: {}", plan);

383

384

TableResult result = statementSet.execute();

385

result.await(); // Wait for completion

386

387

} catch (Exception e) {

388

logger.error("Statement set execution failed", e);

389

// Handle failure appropriately

390

}

391

```