# Changelog Processing

Advanced stream processing with support for changelog semantics including inserts, updates, and deletes. This enables handling of updating tables and complex event-driven scenarios with full CRUD operation support.

## Capabilities

### Changelog Stream to Table Conversion

Convert changelog DataStreams containing Row objects with RowKind flags into Tables.

```java { .api }
/**
 * Converts a changelog stream to a Table with automatic schema derivation.
 * Supports all RowKind changes (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).
 *
 * @param dataStream Changelog stream of Row objects with RowKind flags
 * @return Table supporting changelog operations
 */
Table fromChangelogStream(DataStream<Row> dataStream);

/**
 * Converts a changelog stream to a Table with a custom schema.
 *
 * @param dataStream Changelog stream of Row objects with RowKind flags
 * @param schema Custom schema for the resulting table
 * @return Table with the specified schema supporting changelog operations
 */
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);

/**
 * Converts a changelog stream to a Table with a specific changelog mode.
 *
 * @param dataStream Changelog stream of Row objects with RowKind flags
 * @param schema Custom schema for the resulting table
 * @param changelogMode Expected kinds of changes in the changelog
 * @return Table with the specified schema and changelog mode
 */
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
```
**Usage Examples:**

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Create a changelog stream with various row kinds
DataStream<Row> changelogStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, "Alice", 25),
    Row.ofKind(RowKind.INSERT, "Bob", 30),
    Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 25),
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 26),
    Row.ofKind(RowKind.DELETE, "Bob", 30)
);

// Convert with automatic schema derivation
Table changelogTable = tableEnv.fromChangelogStream(changelogStream);

// Convert with a custom schema
Schema schema = Schema.newBuilder()
    .column("name", "STRING")
    .column("age", "INT")
    .primaryKey("name")
    .build();
Table customChangelogTable = tableEnv.fromChangelogStream(changelogStream, schema);

// Convert with a specific changelog mode (upsert mode - no UPDATE_BEFORE).
// Note: upsert mode requires a primary key and expects the input stream to
// contain no UPDATE_BEFORE rows; UPDATE_AFTER rows are interpreted as
// upserts by key. A stream like the one above, which includes UPDATE_BEFORE
// rows, is only fully represented by the default (retract) mode.
ChangelogMode upsertMode = ChangelogMode.upsert();
Table upsertTable = tableEnv.fromChangelogStream(changelogStream, schema, upsertMode);
```
### Table to Changelog Stream Conversion

Convert Tables to changelog DataStreams with RowKind flags for downstream processing.

```java { .api }
/**
 * Converts a Table to a changelog stream with all supported row kinds.
 *
 * @param table The Table to convert (can be updating or insert-only)
 * @return Changelog stream of Row objects with RowKind flags
 */
DataStream<Row> toChangelogStream(Table table);

/**
 * Converts a Table to a changelog stream with a custom target schema.
 *
 * @param table The Table to convert (can be updating or insert-only)
 * @param targetSchema Schema for the output stream
 * @return Changelog stream with the specified schema
 */
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

/**
 * Converts a Table to a changelog stream with a specific changelog mode.
 *
 * @param table The Table to convert (can be updating or insert-only)
 * @param targetSchema Schema for the output stream
 * @param changelogMode Required kinds of changes in the resulting changelog
 * @return Changelog stream with the specified mode
 * @throws TableException if the table cannot be represented in the specified mode
 */
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
```
**Usage Examples:**

```java
// Create an updating table
Table updatingTable = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as order_count FROM orders GROUP BY user_id"
);

// Convert to a changelog stream (supports all row kinds)
DataStream<Row> changelogStream = tableEnv.toChangelogStream(updatingTable);

// Convert with a custom schema
Schema outputSchema = Schema.newBuilder()
    .column("user_id", "STRING")
    .column("order_count", "BIGINT")
    .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
    .build();
DataStream<Row> customChangelogStream = tableEnv.toChangelogStream(updatingTable, outputSchema);

// Convert to an upsert stream (no UPDATE_BEFORE)
ChangelogMode upsertMode = ChangelogMode.upsert();
DataStream<Row> upsertStream = tableEnv.toChangelogStream(updatingTable, outputSchema, upsertMode);

// Process the changelog stream
changelogStream.process(new ProcessFunction<Row, String>() {
    @Override
    public void processElement(Row row, Context ctx, Collector<String> out) {
        RowKind kind = row.getKind();
        switch (kind) {
            case INSERT:
                out.collect("New record: " + row);
                break;
            case UPDATE_AFTER:
                out.collect("Updated record: " + row);
                break;
            case DELETE:
                out.collect("Deleted record: " + row);
                break;
            // Handle other row kinds (e.g. UPDATE_BEFORE)...
        }
    }
});
```
### Schema Configuration for Changelog Streams

Advanced schema configuration including metadata columns and watermark propagation.

```java { .api }
// Schema with a metadata column for timestamp propagation
Schema timestampSchema = Schema.newBuilder()
    .column("id", "STRING")
    .column("name", "STRING")
    .column("age", "INT")
    .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
    .watermark("rowtime", "SOURCE_WATERMARK()")
    .build();

// Schema with computed columns
Schema computedSchema = Schema.newBuilder()
    .column("user_id", "STRING")
    .column("score", "INT")
    .columnByExpression("score_category",
        "CASE WHEN score >= 90 THEN 'A' " +
        "WHEN score >= 80 THEN 'B' " +
        "ELSE 'C' END")
    .build();

// Schema with a primary key for upsert semantics
Schema upsertSchema = Schema.newBuilder()
    .column("product_id", "STRING")
    .column("product_name", "STRING")
    .column("price", "DECIMAL(10, 2)")
    .primaryKey("product_id")
    .build();
```
## Changelog Modes

### Standard Changelog Mode

Supports all row kinds for complete CRUD operations.

```java { .api }
// Default changelog mode - supports all row kinds
ChangelogMode standardMode = ChangelogMode.all();

// Manually specify the supported row kinds
ChangelogMode customMode = ChangelogMode.newBuilder()
    .addContainedKind(RowKind.INSERT)
    .addContainedKind(RowKind.UPDATE_AFTER)
    .addContainedKind(RowKind.DELETE)
    .build();
```

### Upsert Mode

Optimized mode without UPDATE_BEFORE for key-based updates.

```java { .api }
// Upsert mode - no UPDATE_BEFORE; only INSERT, UPDATE_AFTER, and DELETE
ChangelogMode upsertMode = ChangelogMode.upsert();
```

### Insert-Only Mode

For append-only streams without updates or deletes.

```java { .api }
// Insert-only mode - only the INSERT row kind
ChangelogMode insertOnlyMode = ChangelogMode.insertOnly();
```
## Row Kind Processing

### Working with RowKind

Understanding and processing the different types of changelog events.

```java { .api }
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Create rows with specific kinds
Row insertRow = Row.ofKind(RowKind.INSERT, "Alice", 25);
Row updateBeforeRow = Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 25);
Row updateAfterRow = Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 26);
Row deleteRow = Row.ofKind(RowKind.DELETE, "Bob", 30);

// Check the kind of an incoming row
RowKind kind = row.getKind();
if (kind == RowKind.INSERT) {
    // Handle insert
} else if (kind == RowKind.UPDATE_AFTER) {
    // Handle update
} else if (kind == RowKind.DELETE) {
    // Handle delete
}
```
### Processing Changelog Streams

Common patterns for processing changelog data.

```java
// Process a changelog stream with stateful operations
changelogStream
    .keyBy(row -> row.getField(0)) // Key by the first field
    .process(new KeyedProcessFunction<String, Row, String>() {
        private ValueState<String> currentValue;

        @Override
        public void open(Configuration parameters) {
            currentValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("current", String.class));
        }

        @Override
        public void processElement(Row row, Context ctx, Collector<String> out)
                throws Exception {
            RowKind kind = row.getKind();
            String key = (String) row.getField(0);

            switch (kind) {
                case INSERT:
                case UPDATE_AFTER:
                    currentValue.update(row.toString());
                    out.collect("Current state for " + key + ": " + row);
                    break;
                case DELETE:
                    currentValue.clear();
                    out.collect("Deleted state for " + key);
                    break;
            }
        }
    });
```
## Types

### Changelog Processing Types

```java { .api }
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.table.api.Schema;
```

### Row Creation Utilities

```java { .api }
// Row creation with an explicit RowKind
Row insertRow = Row.ofKind(RowKind.INSERT, field1, field2, field3);
Row updateRow = Row.ofKind(RowKind.UPDATE_AFTER, field1, field2, field3);
Row deleteRow = Row.ofKind(RowKind.DELETE, field1, field2, field3);

// Row kind manipulation (Row.of defaults to RowKind.INSERT)
Row row = Row.of(field1, field2);
row.setKind(RowKind.UPDATE_AFTER);
RowKind kind = row.getKind();
```