# Debezium Change Data Capture Format

Debezium Avro format support for change data capture (CDC) scenarios, with Confluent Schema Registry integration. The format handles INSERT, UPDATE, and DELETE operations and carries before/after record states for real-time data synchronization.

## Capabilities

### Debezium Format Factory

Factory for creating Debezium Avro format instances with full change data capture support.

```java { .api }
/**
 * Format identifier for SQL DDL.
 */
String IDENTIFIER = "debezium-avro-confluent";

/**
 * Creates a decoding format for change data capture sources.
 *
 * @param context table factory context
 * @param formatOptions configuration options
 * @return decoding format supporting CDC operations
 */
DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions);

/**
 * Creates an encoding format for change data capture sinks.
 *
 * @param context table factory context
 * @param formatOptions configuration options
 * @return encoding format supporting CDC operations
 */
EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions);

/**
 * Returns the changelog mode supporting all CDC operations.
 *
 * @return ChangelogMode with INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE
 */
ChangelogMode getChangelogMode();
```

### Debezium Deserialization Schema

Deserializes Debezium change event records from Avro format, extracting the row kind and data changes.

```java { .api }
/**
 * Deserialization schema for Debezium Avro change events.
 * Handles before/after states and operation types.
 */
@Internal
class DebeziumAvroDeserializationSchema implements DeserializationSchema<RowData> {

    /**
     * Creates a Debezium deserializer.
     *
     * @param rowType expected row type for the output
     * @param producedTypeInfo type information for the output
     * @param schemaRegistryURL Schema Registry URL
     * @param schema optional explicit schema string
     * @param registryConfigs additional registry configurations
     */
    DebeziumAvroDeserializationSchema(
            RowType rowType,
            TypeInformation<RowData> producedTypeInfo,
            String schemaRegistryURL,
            @Nullable String schema,
            @Nullable Map<String, ?> registryConfigs);
}
```

### Debezium Serialization Schema

Serializes Flink RowData to Debezium Avro format with the proper change event structure.

```java { .api }
/**
 * Serialization schema for Debezium Avro change events.
 * Produces the proper before/after/op envelope structure.
 */
@Internal
class DebeziumAvroSerializationSchema implements SerializationSchema<RowData> {

    /**
     * Creates a Debezium serializer.
     *
     * @param rowType input row type
     * @param schemaRegistryURL Schema Registry URL
     * @param subject Schema Registry subject
     * @param schema optional explicit schema string
     * @param registryConfigs additional registry configurations
     */
    DebeziumAvroSerializationSchema(
            RowType rowType,
            String schemaRegistryURL,
            String subject,
            @Nullable String schema,
            @Nullable Map<String, ?> registryConfigs);
}
```

## SQL Table Integration

### Source Table for Change Data Capture

```sql
CREATE TABLE debezium_source (
  id BIGINT,
  name STRING,
  email STRING,
  updated_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mysql.inventory.users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);
```

### Sink Table for Change Data Capture

```sql
CREATE TABLE debezium_sink (
  id BIGINT,
  name STRING,
  email STRING,
  updated_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'processed.users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://localhost:8081',
  'debezium-avro-confluent.subject' = 'processed.users-value'
);
```

## Debezium Schema Structure

The Debezium format consumes and produces Avro records with the following envelope structure:

```json
{
  "type": "record",
  "name": "Envelope",
  "fields": [
    {
      "name": "before",
      "type": ["null", {
        "type": "record",
        "name": "User",
        "fields": [
          {"name": "id", "type": "long"},
          {"name": "name", "type": ["null", "string"]},
          {"name": "email", "type": ["null", "string"]}
        ]
      }],
      "default": null
    },
    {
      "name": "after",
      "type": ["null", "User"],
      "default": null
    },
    {
      "name": "op",
      "type": "string"
    }
  ]
}
```
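
The envelope above can be modeled as a plain Java sketch. The `UserRow` and `Envelope` types here are hypothetical illustrations of the structural invariants; the actual format operates on Avro `GenericRecord`s, not POJOs:

```java
// Hypothetical POJOs mirroring the Avro envelope above (illustration only).
record UserRow(long id, String name, String email) {}

record Envelope(UserRow before, UserRow after, String op) {
    // "c"/"r" carry only `after`; "d" carries only `before`; "u" carries both.
    boolean isConsistent() {
        return switch (op) {
            case "c", "r" -> before == null && after != null;
            case "u"      -> before != null && after != null;
            case "d"      -> before != null && after == null;
            default       -> false;
        };
    }
}
```

Note that `after` reuses the named `User` record type declared inside `before`, which is valid Avro: a named type may be referenced by name once defined.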

## Change Event Operations

The format handles the following Debezium operation types and their Flink RowKind mappings:

- **"c" (CREATE/INSERT)**: `after` contains the new record, `before` is null → produces `RowKind.INSERT`
- **"u" (UPDATE)**: both `before` and `after` contain record states → produces two records: `RowKind.UPDATE_BEFORE` (from `before`) and `RowKind.UPDATE_AFTER` (from `after`)
- **"d" (DELETE)**: `before` contains the deleted record, `after` is null → produces `RowKind.DELETE` (from `before`)
- **"r" (READ)**: initial snapshot record, `after` contains the data → produces `RowKind.INSERT`

**Important:** For UPDATE and DELETE operations, the `before` field must not be null. If it is null, the deserializer throws an `IllegalStateException` with a message about PostgreSQL REPLICA IDENTITY settings.

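The dispatch above can be sketched as a standalone method. The enum and helper names here are hypothetical; the real deserializer works on Avro `GenericRecord`s and emits Flink `RowData`, but the op-code logic is the same:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the op-code dispatch; RowKind mirrors Flink's enum.
enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

class DebeziumOpMapper {
    /** Maps one Debezium change event to the RowKinds it emits, in order. */
    static List<RowKind> emittedKinds(String op, boolean beforePresent) {
        List<RowKind> out = new ArrayList<>();
        switch (op) {
            case "c", "r" -> out.add(RowKind.INSERT);   // insert / snapshot read
            case "u" -> {                               // update emits two rows
                requireBefore(beforePresent, op);
                out.add(RowKind.UPDATE_BEFORE);
                out.add(RowKind.UPDATE_AFTER);
            }
            case "d" -> {                               // delete emits the old row
                requireBefore(beforePresent, op);
                out.add(RowKind.DELETE);
            }
            default -> throw new IllegalArgumentException("Unknown op: " + op);
        }
        return out;
    }

    private static void requireBefore(boolean present, String op) {
        if (!present) {
            // Mirrors the real deserializer's REPLICA IDENTITY complaint
            throw new IllegalStateException(
                "'before' is null for op '" + op + "'; check REPLICA IDENTITY settings");
        }
    }
}
```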
### Serialization RowKind Mappings

When serializing from Flink RowData to Debezium format, the following RowKind mappings apply:

- **`RowKind.INSERT`** → `before` = null, `after` = rowData, `op` = "c"
- **`RowKind.UPDATE_AFTER`** → `before` = null, `after` = rowData, `op` = "c"
- **`RowKind.UPDATE_BEFORE`** → `before` = rowData, `after` = null, `op` = "d"
- **`RowKind.DELETE`** → `before` = rowData, `after` = null, `op` = "d"

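The serialize-side table collapses to two cases, which can be sketched as follows (hypothetical names again; the real schema writes Avro `GenericRecord`s):

```java
// Hypothetical sketch of RowKind → (before, after, op); mirrors the table above.
class DebeziumEnvelopeWriter {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Returns {before, after, op} slots for a row, using null for absent sides. */
    static Object[] toEnvelope(RowKind kind, Object rowData) {
        return switch (kind) {
            // INSERT and UPDATE_AFTER both serialize as a create event
            case INSERT, UPDATE_AFTER -> new Object[] {null, rowData, "c"};
            // UPDATE_BEFORE and DELETE both serialize as a delete event
            case UPDATE_BEFORE, DELETE -> new Object[] {rowData, null, "d"};
        };
    }
}
```

A consequence of this collapsing is that a round trip is lossy: an UPDATE read back from the sink topic appears as a delete followed by a create, not as a single "u" event.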
## Usage Example

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create source table for Debezium CDC
tableEnv.executeSql(
    "CREATE TABLE user_changes (" +
    "  id BIGINT," +
    "  name STRING," +
    "  email STRING," +
    "  updated_at TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'mysql.inventory.users'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'debezium-avro-confluent'," +
    "  'debezium-avro-confluent.url' = 'http://localhost:8081'" +
    ")"
);

// Process change events
tableEnv.executeSql(
    "INSERT INTO processed_users " +
    "SELECT id, UPPER(name) AS name, email, updated_at " +
    "FROM user_changes " +
    "WHERE name IS NOT NULL"
);
```

## Configuration Notes

- The `debezium-avro-confluent.subject` option is required for serialization so that schemas can be registered
- Schema Registry authentication and SSL options are inherited from `AvroConfluentFormatOptions`
- The format handles schema evolution automatically through the Schema Registry
- All Debezium metadata fields (`source`, `ts_ms`, etc.) are available if included in the schema
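
Authentication settings can also be passed programmatically through the `registryConfigs` map accepted by the constructors above. A minimal sketch, assuming the standard Confluent Schema Registry client property names (verify these against the client version you deploy):

```java
import java.util.HashMap;
import java.util.Map;

// Builds a registryConfigs map for the (de)serialization schema constructors.
// Property keys are assumed to be the standard Confluent client settings;
// confirm them against your Schema Registry client documentation.
class RegistryConfigExample {
    static Map<String, Object> basicAuthConfigs(String userInfo) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("basic.auth.credentials.source", "USER_INFO");
        configs.put("basic.auth.user.info", userInfo); // "user:password"
        return configs;
    }
}
```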