# Statement Sets


The StreamStatementSet batches multiple table operations together for optimized execution and resource management. It extends the base StatementSet with streaming-specific capabilities.
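To see what batching buys, compare submitting inserts one at a time with grouping them in a statement set. This is a sketch; `source`, `sink_a`, and `sink_b` are hypothetical tables assumed to be already registered:

```scala
// Each standalone executeSql submits its own Flink job
tableEnv.executeSql("INSERT INTO sink_a SELECT * FROM source")
tableEnv.executeSql("INSERT INTO sink_b SELECT * FROM source")

// A statement set plans and submits both inserts together,
// letting the planner share the scan of `source`
val set = tableEnv.createStatementSet()
set
  .addInsertSql("INSERT INTO sink_a SELECT * FROM source")
  .addInsertSql("INSERT INTO sink_b SELECT * FROM source")
  .attachAsDataStream()
```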


## Capabilities


### Statement Set Creation


Create a statement set from a StreamTableEnvironment for batching operations.

```scala { .api }
// Created via StreamTableEnvironment
val statementSet = tableEnv.createStatementSet()
```


### Table Pipeline Management


Add table processing pipelines to the statement set for batch execution.

```scala { .api }
/**
 * Add table pipeline to statement set
 * @param tablePipeline The table pipeline to add
 * @return This StreamStatementSet for method chaining
 */
def add(tablePipeline: TablePipeline): StreamStatementSet
```


**Usage Example:**

```scala
import org.apache.flink.table.api._

val statementSet = tableEnv.createStatementSet()

// Create table pipelines
val pipeline1 = tableEnv.from("source_table_1").insertInto("sink_table_1")
val pipeline2 = tableEnv.from("source_table_2").insertInto("sink_table_2")

statementSet
  .add(pipeline1)
  .add(pipeline2)
  .attachAsDataStream()
```


### SQL Statement Management


Add SQL insert statements to the statement set.

```scala { .api }
/**
 * Add insert SQL statement to statement set
 * @param statement The SQL insert statement
 * @return This StreamStatementSet for method chaining
 */
def addInsertSql(statement: String): StreamStatementSet
```


**Usage Example:**

```scala
val statementSet = tableEnv.createStatementSet()

statementSet
  .addInsertSql("INSERT INTO sink_table_1 SELECT * FROM source_table_1 WHERE age > 25")
  .addInsertSql("INSERT INTO sink_table_2 SELECT name, COUNT(*) FROM source_table_2 GROUP BY name")
  .attachAsDataStream()
```


### Table Insert Operations


Add table insert operations with various configuration options.

```scala { .api }
/**
 * Add insert operation with target path
 * @param targetPath The target table path
 * @param table The table to insert
 * @return This StreamStatementSet for method chaining
 */
def addInsert(targetPath: String, table: Table): StreamStatementSet

/**
 * Add insert operation with target path and overwrite option
 * @param targetPath The target table path
 * @param table The table to insert
 * @param overwrite Whether to overwrite existing data
 * @return This StreamStatementSet for method chaining
 */
def addInsert(targetPath: String, table: Table, overwrite: Boolean): StreamStatementSet

/**
 * Add insert operation with table descriptor
 * @param targetDescriptor The target table descriptor
 * @param table The table to insert
 * @return This StreamStatementSet for method chaining
 */
def addInsert(targetDescriptor: TableDescriptor, table: Table): StreamStatementSet

/**
 * Add insert operation with table descriptor and overwrite option
 * @param targetDescriptor The target table descriptor
 * @param table The table to insert
 * @param overwrite Whether to overwrite existing data
 * @return This StreamStatementSet for method chaining
 */
def addInsert(targetDescriptor: TableDescriptor, table: Table, overwrite: Boolean): StreamStatementSet
```


**Usage Examples:**

```scala
val statementSet = tableEnv.createStatementSet()
val processedTable1 = tableEnv.from("source_table_1").filter($"age" > 25)
val processedTable2 = tableEnv.from("source_table_2").groupBy($"name").select($"name", $"age".count())

// Insert with target path
statementSet
  .addInsert("sink_table_1", processedTable1)
  .addInsert("sink_table_2", processedTable2, true) // with overwrite

// Insert with table descriptor
val sinkDescriptor = TableDescriptor.forConnector("filesystem")
  .schema(Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build())
  .option("path", "/path/to/sink")
  .format("csv")
  .build()

statementSet
  .addInsert(sinkDescriptor, processedTable1)
  .attachAsDataStream()
```


### Execution and Debugging


Methods for explaining execution plans and attaching to the streaming environment.

```scala { .api }
/**
 * Print execution plan with optional extra details
 * @param extraDetails Additional details to include in explanation
 * @return This StreamStatementSet for method chaining
 */
def printExplain(extraDetails: ExplainDetail*): StreamStatementSet

/**
 * Attach statements to underlying StreamExecutionEnvironment.
 * This triggers the actual execution setup.
 */
def attachAsDataStream(): Unit
```


**Usage Example:**

```scala
val statementSet = tableEnv.createStatementSet()

statementSet
  .addInsertSql("INSERT INTO sink SELECT * FROM source WHERE age > 25")
  .printExplain(ExplainDetail.CHANGELOG_MODE, ExplainDetail.COST)
  .attachAsDataStream()

// The execution plan will be printed before attachment
```


## Complete Usage Example


Here's a comprehensive example showing how to use StreamStatementSet for complex multi-table operations:

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Register source tables
tableEnv.executeSql("""
  CREATE TABLE users (
    id INT,
    name STRING,
    age INT,
    city STRING
  ) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/users.csv',
    'format' = 'csv'
  )
""")

tableEnv.executeSql("""
  CREATE TABLE orders (
    order_id INT,
    user_id INT,
    amount DECIMAL(10,2),
    order_time TIMESTAMP(3)
  ) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/orders.csv',
    'format' = 'csv'
  )
""")

// Create statement set for batch operations
val statementSet = tableEnv.createStatementSet()

// Add multiple insert operations
val youngUsers = tableEnv.sqlQuery("SELECT * FROM users WHERE age < 30")
val userStats = tableEnv.sqlQuery("""
  SELECT
    u.city,
    COUNT(*) as user_count,
    AVG(u.age) as avg_age
  FROM users u
  GROUP BY u.city
""")

val orderSummary = tableEnv.sqlQuery("""
  SELECT
    u.city,
    COUNT(o.order_id) as order_count,
    SUM(o.amount) as total_amount
  FROM users u
  JOIN orders o ON u.id = o.user_id
  GROUP BY u.city
""")

statementSet
  .addInsert("young_users_sink", youngUsers)
  .addInsert("user_stats_sink", userStats)
  .addInsert("order_summary_sink", orderSummary)
  .addInsertSql("INSERT INTO audit_log SELECT 'batch_job' as job_type, CURRENT_TIMESTAMP as run_time")
  .printExplain() // Print execution plan
  .attachAsDataStream() // Attach all operations together

env.execute("Multi-table ETL Job")
```


## Benefits of Statement Sets


### Performance Optimization

- **Shared computation**: Common subqueries are computed once and reused
- **Optimized execution plans**: The planner can optimize across multiple statements
- **Resource sharing**: Efficient use of parallelism and memory
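Shared computation can be sketched as follows (the view and sink names here are hypothetical, and a registered `users` table is assumed): both inserts read the same temporary view, and because they are planned together the view's filter can be evaluated once rather than per sink.

```scala
// Hypothetical: an expensive common subquery registered as a view
val adults = tableEnv.sqlQuery("SELECT id, name, age FROM users WHERE age > 25")
tableEnv.createTemporaryView("adults", adults)

val set = tableEnv.createStatementSet()
set
  .addInsertSql("INSERT INTO names_sink SELECT id, name FROM adults")
  .addInsertSql("INSERT INTO ages_sink SELECT id, age FROM adults")
  .attachAsDataStream() // one job; the filter over `users` can run once
```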


### Operational Benefits

- **Atomic execution**: All statements succeed or fail together
- **Simplified debugging**: Single execution plan for multiple operations
- **Better resource management**: Coordinated scheduling of related operations

### Best Practices

- **Group related operations**: Batch operations that work on similar data
- **Use for ETL pipelines**: Ideal for multi-sink data processing jobs
- **Consider data freshness**: All statements run in the same job and start from the same point in time
- **Monitor resource usage**: Large statement sets may require more memory