<!-- docs/changelog-processing.md -->
# Changelog Processing

## Overview

Changelog processing enables handling of updating tables and streams that contain INSERT, UPDATE, and DELETE operations. This is essential for processing change data capture (CDC) streams and maintaining consistent state in streaming applications.

## Core API

### Changelog Stream to Table Conversion

```scala { .api }
trait StreamTableEnvironment {
  def fromChangelogStream(dataStream: DataStream[Row]): Table
  def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table
  def fromChangelogStream(dataStream: DataStream[Row], schema: Schema, changelogMode: ChangelogMode): Table
}
```

### Table to Changelog Stream Conversion

```scala { .api }
trait StreamTableEnvironment {
  def toChangelogStream(table: Table): DataStream[Row]
  def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row]
  def toChangelogStream(table: Table, targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row]
}
```

### DataStreamConversions for Changelog

```scala { .api }
class DataStreamConversions[T](dataStream: DataStream[T]) {
  def toChangelogTable(tableEnv: StreamTableEnvironment): Table
  def toChangelogTable(tableEnv: StreamTableEnvironment, schema: Schema): Table
  def toChangelogTable(tableEnv: StreamTableEnvironment, schema: Schema, changelogMode: ChangelogMode): Table
}
```

### TableConversions for Changelog

```scala { .api }
class TableConversions(table: Table) {
  def toChangelogStream: DataStream[Row]
  def toChangelogStream(targetSchema: Schema): DataStream[Row]
  def toChangelogStream(targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row]
}
```

## Changelog Concepts

### RowKind Operations

```scala
import org.apache.flink.types.{Row, RowKind}

// Row kinds for changelog operations
val insertRow = Row.ofKind(RowKind.INSERT, "Alice", 25)
val updateBeforeRow = Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 25)
val updateAfterRow = Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 26)
val deleteRow = Row.ofKind(RowKind.DELETE, "Alice", 26)
```

### ChangelogMode Types

```scala
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.RowKind

// Insert-only mode (append streams)
val insertOnlyMode = ChangelogMode.insertOnly()

// Upsert mode (no UPDATE_BEFORE)
val upsertMode = ChangelogMode.upsert()

// Full changelog mode (all operations)
val allMode = ChangelogMode.all()

// Custom changelog mode
val customMode = ChangelogMode.newBuilder()
  .addContainedKind(RowKind.INSERT)
  .addContainedKind(RowKind.UPDATE_AFTER)
  .addContainedKind(RowKind.DELETE)
  .build()
```

## Converting Changelog Streams to Tables

### Basic Changelog Stream Processing

```scala
// Create a changelog DataStream
val changelogData = Seq(
  Row.ofKind(RowKind.INSERT, "Alice", 25),
  Row.ofKind(RowKind.INSERT, "Bob", 30),
  Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 25),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 26),
  Row.ofKind(RowKind.DELETE, "Bob", 30)
)

val changelogStream: DataStream[Row] = env.fromCollection(changelogData)

// Convert to table (assumes all changelog operations)
val changelogTable: Table = tableEnv.fromChangelogStream(changelogStream)
```

### Changelog with Custom Schema

```scala
import org.apache.flink.table.api.{DataTypes, Schema}

val schema = Schema.newBuilder()
  .column("user_name", DataTypes.STRING())
  .column("user_age", DataTypes.INT())
  // Computed column: declared once via columnByExpression (a physical
  // column with the same name must not also be declared).
  .columnByExpression("process_time", "PROCTIME()")
  .build()

val changelogTable: Table = tableEnv.fromChangelogStream(changelogStream, schema)
```

### Restricted Changelog Mode

```scala
// Upsert-only changelog (no UPDATE_BEFORE)
val upsertChangelogTable: Table = tableEnv.fromChangelogStream(
  upsertStream,
  schema,
  ChangelogMode.upsert()
)
```

130

131

### Using Implicit Conversions

132

133

```scala

134

import org.apache.flink.table.api.bridge.scala._

135

136

// Must be DataStream[Row] for changelog conversion

137

val changelogTable: Table = changelogStream.toChangelogTable(tableEnv)

138

139

// With schema and changelog mode

140

val changelogTable2: Table = changelogStream.toChangelogTable(tableEnv, schema, ChangelogMode.upsert())

141

```

## Converting Tables to Changelog Streams

### Basic Table to Changelog Conversion

```scala
// Table with updates (e.g., from aggregation)
val aggregatedTable: Table = tableEnv.sqlQuery("""
  SELECT user_name, COUNT(*) as event_count
  FROM user_events
  GROUP BY user_name
""")

// Convert to changelog stream
val changelogStream: DataStream[Row] = tableEnv.toChangelogStream(aggregatedTable)

// Using implicit conversion
val changelogStream2: DataStream[Row] = aggregatedTable.toChangelogStream
```

### Changelog with Custom Schema

```scala
val outputSchema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("count", DataTypes.BIGINT())
  // Computed column: declared once via columnByExpression (a physical
  // column with the same name must not also be declared).
  .columnByExpression("last_updated", "CURRENT_TIMESTAMP")
  .build()

val changelogStream: DataStream[Row] = tableEnv.toChangelogStream(aggregatedTable, outputSchema)
```

### Restricted Output Changelog Mode

```scala
// Force upsert mode output (fails if table produces UPDATE_BEFORE)
val upsertStream: DataStream[Row] = tableEnv.toChangelogStream(
  aggregatedTable,
  outputSchema,
  ChangelogMode.upsert()
)
```

## Advanced Changelog Processing

### CDC Integration

```scala
// Typical CDC stream from Kafka
val cdcStream: DataStream[Row] = env
  .addSource(new FlinkKafkaConsumer("cdc-topic", new RowDeserializer(), properties))

// Define schema matching source table
val cdcSchema = Schema.newBuilder()
  .column("id", DataTypes.BIGINT())
  .column("name", DataTypes.STRING())
  .column("email", DataTypes.STRING())
  .column("updated_at", DataTypes.TIMESTAMP_LTZ(3))
  .primaryKey("id")
  .build()

// Create table from CDC stream
val cdcTable: Table = tableEnv.fromChangelogStream(cdcStream, cdcSchema)
```

### Stateful Stream Processing

```scala
// Process changelog stream with state
val processedTable: Table = tableEnv.sqlQuery("""
  SELECT
    user_id,
    LAST_VALUE(user_name) as current_name,
    COUNT(*) as update_count
  FROM cdc_users
  GROUP BY user_id
""")

// Convert back to changelog for downstream processing
val processedChangelog: DataStream[Row] = tableEnv.toChangelogStream(processedTable)
```

### Deduplication

```scala
// Deduplicate changelog stream by keeping latest version
val deduplicatedTable: Table = tableEnv.sqlQuery("""
  SELECT user_id, user_name, email, updated_at
  FROM (
    SELECT *,
      ROW_NUMBER() OVER (
        PARTITION BY user_id
        ORDER BY updated_at DESC
      ) as rn
    FROM cdc_users
  )
  WHERE rn = 1
""")
```

## Event-Time Processing with Changelog

### Watermarks in Changelog Streams

```scala
val schema = Schema.newBuilder()
  .column("id", DataTypes.BIGINT())
  .column("data", DataTypes.STRING())
  .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
  .watermark("event_time", "SOURCE_WATERMARK()")
  .build()

val eventTimeChangelogTable: Table = tableEnv.fromChangelogStream(
  changelogStream,
  schema,
  ChangelogMode.all()
)
```

### Temporal Operations

```scala
// Temporal join with changelog table
val enrichedTable: Table = tableEnv.sqlQuery("""
  SELECT o.order_id, o.product_id, u.user_name, u.email
  FROM orders o
  JOIN users FOR SYSTEM_TIME AS OF o.order_time AS u
    ON o.user_id = u.user_id
""")
```

## Error Handling

```scala
try {
  val changelogTable = tableEnv.fromChangelogStream(changelogStream, schema, changelogMode)
} catch {
  case e: ValidationException =>
    // Invalid changelog mode or schema mismatch
  case e: TableException =>
    // Changelog processing error
}
```

## Performance Considerations

1. **State Size**: Changelog processing maintains state; monitor state size
2. **Changelog Mode**: Use restrictive modes when possible (e.g., upsert vs. full changelog)
3. **Watermarks**: Proper watermark configuration is crucial for event-time processing
4. **Primary Keys**: Define primary keys for efficient updates and deduplication

## Best Practices

1. **Define Primary Keys**: Always define primary keys for changelog tables
2. **Use Appropriate Modes**: Choose the most restrictive changelog mode that meets requirements
3. **Monitor State**: Monitor state size in production changelog processing jobs
4. **Handle Late Data**: Configure appropriate watermark strategies for late arriving updates
5. **Schema Evolution**: Plan for schema changes in CDC scenarios
6. **Deduplication**: Implement deduplication logic for CDC streams with potential duplicates