# Debezium CDC Format

JSON format support for the Debezium Change Data Capture system, enabling processing of database change events from MySQL, PostgreSQL, SQL Server, MongoDB, Oracle, and other databases. Debezium produces structured change events with rich source metadata and optional schema information.

## Capabilities

### Debezium Format Configuration

Configuration options specific to the Debezium JSON format, including schema inclusion and error-handling options.
```java { .api }
/**
 * Configuration options for the Debezium JSON format.
 */
public class DebeziumJsonFormatOptions {

    /** Whether schema information is included in messages (default: false). */
    public static final ConfigOption<Boolean> SCHEMA_INCLUDE;

    /** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions). */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

    /** Timestamp format pattern (inherited from JsonFormatOptions). */
    public static final ConfigOption<String> TIMESTAMP_FORMAT;

    /** How to handle null keys in maps (inherited from JsonFormatOptions). */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;

    /** Literal string for null keys when the mode is LITERAL (inherited from JsonFormatOptions). */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
```
**Configuration Usage:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions;

// Configure Debezium format options
Configuration config = new Configuration();
config.set(DebeziumJsonFormatOptions.SCHEMA_INCLUDE, false);
config.set(DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
config.set(DebeziumJsonFormatOptions.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss");
```
## Debezium JSON Structure

Debezium produces change events with a standardized envelope structure:

### With Schema (when SCHEMA_INCLUDE = true)
```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {"field": "id", "type": "int32"},
          {"field": "name", "type": "string"},
          {"field": "email", "type": "string"}
        ],
        "optional": false,
        "name": "users.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {"field": "version", "type": "string"},
          {"field": "connector", "type": "string"},
          {"field": "name", "type": "string"},
          {"field": "ts_ms", "type": "int64"},
          {"field": "snapshot", "type": "string"},
          {"field": "db", "type": "string"},
          {"field": "table", "type": "string"},
          {"field": "server_id", "type": "int64"},
          {"field": "gtid", "type": "string"},
          {"field": "file", "type": "string"},
          {"field": "pos", "type": "int64"},
          {"field": "row", "type": "int32"},
          {"field": "thread", "type": "int64"},
          {"field": "query", "type": "string"}
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      }
    ],
    "optional": false,
    "name": "users.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "name": "Alice",
      "email": "alice@example.com"
    },
    "source": {
      "version": "1.9.7.Final",
      "connector": "mysql",
      "name": "mysql-server",
      "ts_ms": 1672574400000,
      "snapshot": "false",
      "db": "user_db",
      "table": "users",
      "server_id": 1,
      "gtid": null,
      "file": "mysql-bin.000001",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": null
    },
    "op": "c",
    "ts_ms": 1672574400123,
    "transaction": null
  }
}
```
### Without Schema (when SCHEMA_INCLUDE = false, default)

```json
{
  "before": null,
  "after": {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com"
  },
  "source": {
    "version": "1.9.7.Final",
    "connector": "mysql",
    "name": "mysql-server",
    "ts_ms": 1672574400000,
    "snapshot": "false",
    "db": "user_db",
    "table": "users",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 154,
    "row": 0,
    "thread": 7,
    "query": null
  },
  "op": "c",
  "ts_ms": 1672574400123,
  "transaction": null
}
```
## Table API Integration

### SQL DDL Usage

Create tables using the Debezium JSON format for change data capture processing:

```sql
CREATE TABLE debezium_source (
  id BIGINT,
  name STRING,
  email STRING,
  created_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'debezium-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'false',
  'debezium-json.ignore-parse-errors' = 'true'
);
```
### Programmatic Table Definition

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

TableDescriptor debeziumTable = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("email", DataTypes.STRING())
        .column("created_at", DataTypes.TIMESTAMP(3))
        .primaryKey("id")
        .build())
    .option("topic", "debezium-topic")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format("debezium-json")
    .option("debezium-json.schema-include", "false")
    .option("debezium-json.ignore-parse-errors", "true")
    .build();
```
## Change Event Processing

The Debezium format distinguishes four change event types via the `op` field:

### Insert Events (op: "c" for create)
- `before`: null
- `after`: new row data
- `op`: "c"

### Update Events (op: "u" for update)
- `before`: previous row data
- `after`: updated row data
- `op`: "u"

### Delete Events (op: "d" for delete)
- `before`: deleted row data
- `after`: null
- `op`: "d"

### Read Events (op: "r" for read/snapshot)
- `before`: null
- `after`: current row data (from the snapshot)
- `op`: "r"
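When Flink consumes these events, each op code is translated into one or more changelog rows. A minimal sketch of that mapping (illustrative class and enum names, not Flink's actual API; Flink's `org.apache.flink.types.RowKind` has the same four kinds):

```java
// Sketch of how Debezium op codes map to changelog row kinds.
public class DebeziumOpMapping {

    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Changelog rows emitted for one Debezium change event, by op code. */
    static RowKind[] rowKindsFor(String op) {
        switch (op) {
            case "c": // create   -> one INSERT built from "after"
            case "r": // snapshot -> snapshot rows are also emitted as INSERT
                return new RowKind[] { RowKind.INSERT };
            case "u": // update   -> UPDATE_BEFORE from "before", UPDATE_AFTER from "after"
                return new RowKind[] { RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER };
            case "d": // delete   -> one DELETE built from "before"
                return new RowKind[] { RowKind.DELETE };
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    public static void main(String[] args) {
        // prints [UPDATE_BEFORE, UPDATE_AFTER]
        System.out.println(java.util.Arrays.toString(rowKindsFor("u")));
    }
}
```

Note that an update produces two rows, which is why tables backed by this format should declare a primary key.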
## Source Metadata

Debezium provides rich source metadata for tracking change origins. For the MySQL connector, the `source` block contains:

| Field | Description | Example |
|-------|-------------|---------|
| version | Debezium version | "1.9.7.Final" |
| connector | Source connector type | "mysql", "postgresql" |
| name | Connector instance name | "mysql-server" |
| ts_ms | Event timestamp (milliseconds) | 1672574400000 |
| snapshot | Snapshot indicator | "true", "false", "last" |
| db | Source database name | "user_db" |
| table | Source table name | "users" |
| server_id | Database server ID | 1 |
| gtid | Global transaction ID | null when GTIDs are disabled |
| file | Binlog file name | "mysql-bin.000001" |
| pos | Position in the binlog | 154 |
| row | Row number within the event | 0 |
| thread | Thread ID | 7 |
| query | Originating SQL query, if captured | "INSERT INTO..." |
## Metadata Access in Tables

Access Debezium metadata through metadata columns. The `debezium-json` format exposes the keys `schema`, `ingestion-timestamp`, `source.timestamp`, `source.database`, `source.schema`, `source.table`, and `source.properties`; the operation type itself is mapped to the row's changelog kind rather than exposed as metadata:

```sql
CREATE TABLE debezium_with_metadata (
  -- Regular data columns
  id BIGINT,
  name STRING,
  email STRING,

  -- Metadata columns
  source_ts TIMESTAMP_LTZ(3) METADATA FROM 'source.timestamp' VIRTUAL,
  source_db STRING METADATA FROM 'source.database' VIRTUAL,
  source_table STRING METADATA FROM 'source.table' VIRTUAL,
  source_props MAP<STRING, STRING> METADATA FROM 'source.properties' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'format' = 'debezium-json'
  -- other connector options
);
```
## Schema Handling

### Schema Inclusion

When `SCHEMA_INCLUDE` is enabled, Debezium messages include complete schema information:

```java
// Enable schema inclusion for schema evolution handling
config.set(DebeziumJsonFormatOptions.SCHEMA_INCLUDE, true);
```

Benefits of schema inclusion:
- Schema evolution detection
- Data type validation
- Field mapping verification
- Better error diagnostics
### Schema Registry Integration

For production deployments, consider using Confluent Schema Registry with Debezium:

```sql
CREATE TABLE debezium_avro_source (
  id BIGINT,
  name STRING,
  email STRING
) WITH (
  'connector' = 'kafka',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://schema-registry:8081'
);
```
## Multi-Database Support

The Debezium format supports multiple database systems with a consistent envelope structure; only the connector-specific fields inside `source` differ:

### MySQL Connector
MySQL-specific source fields: `server_id`, `gtid`, `file`, `pos`, `row`, `thread`

### PostgreSQL Connector
PostgreSQL-specific source fields: `lsn`, `txId`, `ts_usec`

### SQL Server Connector
SQL Server-specific source fields: `change_lsn`, `commit_lsn`, `event_serial_no`

### MongoDB Connector
MongoDB-specific source fields: `ord`, `h`, `tord`
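For example, a PostgreSQL change event carries a `source` block roughly like the following (illustrative values; `pg-server` and the `public` schema are assumptions, and the exact field set varies by Debezium version):

```json
"source": {
  "version": "1.9.7.Final",
  "connector": "postgresql",
  "name": "pg-server",
  "ts_ms": 1672574400000,
  "snapshot": "false",
  "db": "user_db",
  "schema": "public",
  "table": "users",
  "lsn": 24023128,
  "txId": 555
}
```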
## Transaction Handling

When transaction metadata is enabled on the connector, Debezium includes transaction boundary information in each event:

```json
{
  "before": null,
  "after": {...},
  "source": {...},
  "op": "c",
  "ts_ms": 1672574400123,
  "transaction": {
    "id": "571",
    "total_order": 1,
    "data_collection_order": 1
  }
}
```
Access transaction metadata:

```sql
CREATE TABLE debezium_with_transaction (
  id BIGINT,
  name STRING,
  -- Transaction metadata
  transaction_id STRING METADATA FROM 'transaction.id',
  transaction_total_order BIGINT METADATA FROM 'transaction.total_order',
  transaction_data_collection_order BIGINT METADATA FROM 'transaction.data_collection_order'
) WITH (
  'connector' = 'kafka',
  'format' = 'debezium-json'
);
```
## Error Handling

Configure error handling for production deployments:

```java
// Ignore parsing errors for resilience (corrupt records are skipped)
config.set(DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

// Control timestamp parsing
config.set(DebeziumJsonFormatOptions.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss");

// Handle null keys in nested maps (FAIL, DROP, or LITERAL)
config.set(DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "DROP");
```
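The three null-key modes can be illustrated with a standalone sketch (this re-implements the behavior for demonstration; it is not Flink's actual serializer, and the class name is hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NullKeyModes {

    /** Serialize a map to JSON, handling null keys per the given mode. */
    static String serialize(Map<String, Integer> map, String mode, String literal) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, Integer> e : map.entrySet()) {
            String key = e.getKey();
            if (key == null) {
                if ("FAIL".equals(mode)) throw new IllegalStateException("null key in map");
                if ("DROP".equals(mode)) continue;   // skip the entry entirely
                key = literal;                        // LITERAL: substitute a fixed string
            }
            if (sb.length() > 1) sb.append(',');
            sb.append('"').append(key).append("\":").append(e.getValue());
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, Integer> m = new LinkedHashMap<>();
        m.put("a", 1);
        m.put(null, 2);
        System.out.println(serialize(m, "DROP", null));      // prints {"a":1}
        System.out.println(serialize(m, "LITERAL", "null")); // prints {"a":1,"null":2}
    }
}
```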
## Production Considerations

### Performance Optimization
- Disable schema inclusion unless it is needed for schema evolution
- Use appropriate Kafka consumer configurations
- Consider message compression and batching

### Reliability
- Enable `ignore-parse-errors` for resilience against malformed records
- Implement dead-letter queues for failed messages
- Monitor Debezium connector health

### Schema Evolution
- Use `schema-include` for environments with frequent schema changes
- Implement schema compatibility checks
- Plan for graceful handling of schema evolution events

### Monitoring
- Track parse success/failure rates
- Monitor event lag and processing latency
- Set up alerts for connector disconnections
- Monitor data type conversion errors