# Changelog Processing

## Overview

Changelog processing enables handling of updating tables and streams that contain INSERT, UPDATE, and DELETE operations. This is essential for processing change data capture (CDC) streams and maintaining consistent state in streaming applications.

## Core API

### Changelog Stream to Table Conversion

```scala { .api }
trait StreamTableEnvironment {
  def fromChangelogStream(dataStream: DataStream[Row]): Table
  def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table
  def fromChangelogStream(dataStream: DataStream[Row], schema: Schema, changelogMode: ChangelogMode): Table
}
```

### Table to Changelog Stream Conversion

```scala { .api }
trait StreamTableEnvironment {
  def toChangelogStream(table: Table): DataStream[Row]
  def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row]
  def toChangelogStream(table: Table, targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row]
}
```

### DataStreamConversions for Changelog

```scala { .api }
class DataStreamConversions[T](dataStream: DataStream[T]) {
  def toChangelogTable(tableEnv: StreamTableEnvironment): Table
  def toChangelogTable(tableEnv: StreamTableEnvironment, schema: Schema): Table
  def toChangelogTable(tableEnv: StreamTableEnvironment, schema: Schema, changelogMode: ChangelogMode): Table
}
```

### TableConversions for Changelog

```scala { .api }
class TableConversions(table: Table) {
  def toChangelogStream: DataStream[Row]
  def toChangelogStream(targetSchema: Schema): DataStream[Row]
  def toChangelogStream(targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row]
}
```

## Changelog Concepts

### RowKind Operations

```scala
import org.apache.flink.types.{Row, RowKind}

// Row kinds for changelog operations
val insertRow = Row.ofKind(RowKind.INSERT, "Alice", 25)
val updateBeforeRow = Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 25)
val updateAfterRow = Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 26)
val deleteRow = Row.ofKind(RowKind.DELETE, "Alice", 26)
```

### ChangelogMode Types

```scala
import org.apache.flink.table.connector.ChangelogMode

// Insert-only mode (append streams)
val insertOnlyMode = ChangelogMode.insertOnly()

// Upsert mode (no UPDATE_BEFORE)
val upsertMode = ChangelogMode.upsert()

// Full changelog mode (all operations)
val allMode = ChangelogMode.all()

// Custom changelog mode
val customMode = ChangelogMode.newBuilder()
  .addContainedKind(RowKind.INSERT)
  .addContainedKind(RowKind.UPDATE_AFTER)
  .addContainedKind(RowKind.DELETE)
  .build()
```

## Converting Changelog Streams to Tables

### Basic Changelog Stream Processing

```scala
// Create a changelog DataStream
val changelogData = Seq(
  Row.ofKind(RowKind.INSERT, "Alice", 25),
  Row.ofKind(RowKind.INSERT, "Bob", 30),
  Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 25),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 26),
  Row.ofKind(RowKind.DELETE, "Bob", 30)
)

val changelogStream: DataStream[Row] = env.fromCollection(changelogData)

// Convert to table (by default, all changelog operations are assumed)
val changelogTable: Table = tableEnv.fromChangelogStream(changelogStream)
```

### Changelog with Custom Schema

```scala
import org.apache.flink.table.api.Schema
import org.apache.flink.table.api.DataTypes

val schema = Schema.newBuilder()
  .column("user_name", DataTypes.STRING())
  .column("user_age", DataTypes.INT())
  // Computed columns are declared only via columnByExpression;
  // declaring the same name as a physical column as well would be invalid.
  .columnByExpression("process_time", "PROCTIME()")
  .build()

val changelogTable: Table = tableEnv.fromChangelogStream(changelogStream, schema)
```

### Restricted Changelog Mode

```scala
// Upsert-only changelog (no UPDATE_BEFORE)
val upsertChangelogTable: Table = tableEnv.fromChangelogStream(
  upsertStream,
  schema,
  ChangelogMode.upsert()
)
```

### Using Implicit Conversions

```scala
import org.apache.flink.table.api.bridge.scala._

// Must be DataStream[Row] for changelog conversion
val changelogTable: Table = changelogStream.toChangelogTable(tableEnv)

// With schema and changelog mode
val changelogTable2: Table = changelogStream.toChangelogTable(tableEnv, schema, ChangelogMode.upsert())
```

## Converting Tables to Changelog Streams

### Basic Table to Changelog Conversion

```scala
// Table with updates (e.g., from aggregation)
val aggregatedTable: Table = tableEnv.sqlQuery("""
  SELECT user_name, COUNT(*) as event_count
  FROM user_events
  GROUP BY user_name
""")

// Convert to changelog stream
val changelogStream: DataStream[Row] = tableEnv.toChangelogStream(aggregatedTable)

// Using implicit conversion
val changelogStream2: DataStream[Row] = aggregatedTable.toChangelogStream
```

### Changelog with Custom Schema

```scala
val outputSchema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("count", DataTypes.BIGINT())
  // Computed column: declared only via columnByExpression, not also as a physical column.
  .columnByExpression("last_updated", "CURRENT_TIMESTAMP")
  .build()

val changelogStream: DataStream[Row] = tableEnv.toChangelogStream(aggregatedTable, outputSchema)
```

### Restricted Output Changelog Mode

```scala
// Force upsert mode output (fails if table produces UPDATE_BEFORE)
val upsertStream: DataStream[Row] = tableEnv.toChangelogStream(
  aggregatedTable,
  outputSchema,
  ChangelogMode.upsert()
)
```

## Advanced Changelog Processing

### CDC Integration

```scala
// Typical CDC stream from Kafka
val cdcStream: DataStream[Row] = env
  .addSource(new FlinkKafkaConsumer("cdc-topic", new RowDeserializer(), properties))

// Define schema matching source table
val cdcSchema = Schema.newBuilder()
  .column("id", DataTypes.BIGINT())
  .column("name", DataTypes.STRING())
  .column("email", DataTypes.STRING())
  .column("updated_at", DataTypes.TIMESTAMP_LTZ(3))
  .primaryKey("id")
  .build()

// Create table from CDC stream
val cdcTable: Table = tableEnv.fromChangelogStream(cdcStream, cdcSchema)
```

### Stateful Stream Processing

```scala
// Process changelog stream with state
val processedTable: Table = tableEnv.sqlQuery("""
  SELECT
    user_id,
    LAST_VALUE(user_name) as current_name,
    COUNT(*) as update_count
  FROM cdc_users
  GROUP BY user_id
""")

// Convert back to changelog for downstream processing
val processedChangelog: DataStream[Row] = tableEnv.toChangelogStream(processedTable)
```

### Deduplication

```scala
// Deduplicate changelog stream by keeping latest version
val deduplicatedTable: Table = tableEnv.sqlQuery("""
  SELECT user_id, user_name, email, updated_at
  FROM (
    SELECT *,
      ROW_NUMBER() OVER (
        PARTITION BY user_id
        ORDER BY updated_at DESC
      ) as rn
    FROM cdc_users
  )
  WHERE rn = 1
""")
```

## Event-Time Processing with Changelog

### Watermarks in Changelog Streams

```scala
val schema = Schema.newBuilder()
  .column("id", DataTypes.BIGINT())
  .column("data", DataTypes.STRING())
  .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
  .watermark("event_time", "SOURCE_WATERMARK()")
  .build()

val eventTimeChangelogTable: Table = tableEnv.fromChangelogStream(
  changelogStream,
  schema,
  ChangelogMode.all()
)
```

### Temporal Operations

```scala
// Temporal join with changelog table
val enrichedTable: Table = tableEnv.sqlQuery("""
  SELECT o.order_id, o.product_id, u.user_name, u.email
  FROM orders o
  JOIN users FOR SYSTEM_TIME AS OF o.order_time AS u
    ON o.user_id = u.user_id
""")
```

## Error Handling

```scala
try {
  val changelogTable = tableEnv.fromChangelogStream(changelogStream, schema, changelogMode)
} catch {
  case e: ValidationException =>
    // Invalid changelog mode or schema mismatch
  case e: TableException =>
    // Changelog processing error
}
```

## Performance Considerations

1. **State Size**: Changelog processing maintains state; monitor state size
2. **Changelog Mode**: Use restrictive modes when possible (e.g., upsert vs. full changelog)
3. **Watermarks**: Proper watermark configuration is crucial for event-time processing
4. **Primary Keys**: Define primary keys for efficient updates and deduplication

## Best Practices

1. **Define Primary Keys**: Always define primary keys for changelog tables
2. **Use Appropriate Modes**: Choose the most restrictive changelog mode that meets requirements
3. **Monitor State**: Monitor state size in production changelog processing jobs
4. **Handle Late Data**: Configure appropriate watermark strategies for late arriving updates
5. **Schema Evolution**: Plan for schema changes in CDC scenarios
6. **Deduplication**: Implement deduplication logic for CDC streams with potential duplicates