or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

canal-cdc.md, core-json.md, debezium-cdc.md, index.md, maxwell-cdc.md, ogg-cdc.md

docs/debezium-cdc.md

0

# Debezium CDC Format

1

2

JSON format support for Debezium Change Data Capture system, enabling processing of database change events from various databases including MySQL, PostgreSQL, SQL Server, MongoDB, and Oracle. Debezium produces structured change events with comprehensive metadata and optional schema information.

3

4

## Capabilities

5

6

### Debezium Format Configuration

7

8

Configuration options specific to Debezium CDC format, including schema inclusion and comprehensive error handling options.

9

10

```java { .api }

11

/**

12

* Configuration options for Debezium JSON format

13

*/

14

public class DebeziumJsonFormatOptions {

15

16

/** Whether schema information is included in messages (default: false) */

17

public static final ConfigOption<Boolean> SCHEMA_INCLUDE;

18

19

/** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions) */

20

public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

21

22

/** Timestamp format pattern (inherited from JsonFormatOptions) */

23

public static final ConfigOption<String> TIMESTAMP_FORMAT;

24

25

/** How to handle null keys in maps (inherited from JsonFormatOptions) */

26

public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;

27

28

/** Literal string for null keys when mode is LITERAL (inherited from JsonFormatOptions) */

29

public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;

30

}

31

```

32

33

**Configuration Usage:**

34

35

```java

36

import org.apache.flink.configuration.Configuration;

37

import org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions;

38

39

// Configure Debezium format options

40

Configuration config = new Configuration();

41

config.set(DebeziumJsonFormatOptions.SCHEMA_INCLUDE, false);

42

config.set(DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

43

config.set(DebeziumJsonFormatOptions.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss");

44

```

45

46

## Debezium JSON Structure

47

48

Debezium produces change events with a standardized envelope structure:

49

50

### With Schema (when SCHEMA_INCLUDE = true)

51

52

```json

53

{

54

"schema": {

55

"type": "struct",

56

"fields": [

57

{

58

"type": "struct",

59

"fields": [

60

{"field": "id", "type": "int32"},

61

{"field": "name", "type": "string"},

62

{"field": "email", "type": "string"}

63

],

64

"optional": false,

65

"name": "users.Value"

66

},

67

{

68

"type": "struct",

69

"fields": [

70

{"field": "version", "type": "string"},

71

{"field": "connector", "type": "string"},

72

{"field": "name", "type": "string"},

73

{"field": "ts_ms", "type": "int64"},

74

{"field": "snapshot", "type": "string"},

75

{"field": "db", "type": "string"},

76

{"field": "table", "type": "string"},

77

{"field": "server_id", "type": "int64"},

78

{"field": "gtid", "type": "string"},

79

{"field": "file", "type": "string"},

80

{"field": "pos", "type": "int64"},

81

{"field": "row", "type": "int32"},

82

{"field": "thread", "type": "int64"},

83

{"field": "query", "type": "string"}

84

],

85

"optional": false,

86

"name": "io.debezium.connector.mysql.Source"

87

}

88

],

89

"optional": false,

90

"name": "users.Envelope"

91

},

92

"payload": {

93

"before": null,

94

"after": {

95

"id": 1,

96

"name": "Alice",

97

"email": "alice@example.com"

98

},

99

"source": {

100

"version": "1.9.7.Final",

101

"connector": "mysql",

102

"name": "mysql-server",

103

"ts_ms": 1672574400000,

104

"snapshot": "false",

105

"db": "user_db",

106

"table": "users",

107

"server_id": 1,

108

"gtid": null,

109

"file": "mysql-bin.000001",

110

"pos": 154,

111

"row": 0,

112

"thread": 7,

113

"query": null

114

},

115

"op": "c",

116

"ts_ms": 1672574400123,

117

"transaction": null

118

}

119

}

120

```

121

122

### Without Schema (when SCHEMA_INCLUDE = false, default)

123

124

```json

125

{

126

"before": null,

127

"after": {

128

"id": 1,

129

"name": "Alice",

130

"email": "alice@example.com"

131

},

132

"source": {

133

"version": "1.9.7.Final",

134

"connector": "mysql",

135

"name": "mysql-server",

136

"ts_ms": 1672574400000,

137

"snapshot": "false",

138

"db": "user_db",

139

"table": "users",

140

"server_id": 1,

141

"gtid": null,

142

"file": "mysql-bin.000001",

143

"pos": 154,

144

"row": 0,

145

"thread": 7,

146

"query": null

147

},

148

"op": "c",

149

"ts_ms": 1672574400123,

150

"transaction": null

151

}

152

```

153

154

## Table API Integration

155

156

### SQL DDL Usage

157

158

Create tables using Debezium JSON format for change data capture processing:

159

160

```sql

161

CREATE TABLE debezium_source (

162

id BIGINT,

163

name STRING,

164

email STRING,

165

created_at TIMESTAMP(3),

166

PRIMARY KEY (id) NOT ENFORCED

167

) WITH (

168

'connector' = 'kafka',

169

'topic' = 'debezium-topic',

170

'properties.bootstrap.servers' = 'localhost:9092',

171

'format' = 'debezium-json',

172

'debezium-json.schema-include' = 'false',

173

'debezium-json.ignore-parse-errors' = 'true'

174

);

175

```

176

177

### Programmatic Table Definition

178

179

```java

180

import org.apache.flink.table.api.TableDescriptor;

181

import org.apache.flink.table.api.DataTypes;

182

import org.apache.flink.table.api.Schema;

183

184

TableDescriptor debeziumTable = TableDescriptor.forConnector("kafka")

185

.schema(Schema.newBuilder()

186

.column("id", DataTypes.BIGINT())

187

.column("name", DataTypes.STRING())

188

.column("email", DataTypes.STRING())

189

.column("created_at", DataTypes.TIMESTAMP(3))

190

.primaryKey("id")

191

.build())

192

.option("topic", "debezium-topic")

193

.option("properties.bootstrap.servers", "localhost:9092")

194

.format("debezium-json")

195

.option("debezium-json.schema-include", "false")

196

.option("debezium-json.ignore-parse-errors", "true")

197

.build();

198

```

199

200

## Change Event Processing

201

202

Debezium format handles various types of change events with comprehensive metadata:

203

204

### Insert Events (op: "c" for create)

205

- `before`: null

206

- `after`: New row data

207

- `op`: "c"

208

209

### Update Events (op: "u" for update)

210

- `before`: Previous row data

211

- `after`: Updated row data

212

- `op`: "u"

213

214

### Delete Events (op: "d" for delete)

215

- `before`: Deleted row data

216

- `after`: null

217

- `op`: "d"

218

219

### Read Events (op: "r" for read/snapshot)

220

- `before`: null

221

- `after`: Current row data (from snapshot)

222

- `op`: "r"

223

224

## Source Metadata

225

226

Debezium provides rich source metadata for tracking change origins:

227

228

| Field | Description | Example |

229

|-------|-------------|---------|

230

| version | Debezium version | "1.9.7.Final" |

231

| connector | Source connector type | "mysql", "postgresql" |

232

| name | Connector instance name | "mysql-server" |

233

| ts_ms | Event timestamp (milliseconds) | 1672574400000 |

234

| snapshot | Snapshot indicator | "true", "false", "last" |

235

| db | Source database name | "user_db" |

236

| table | Source table name | "users" |

237

| server_id | Database server ID | 1 |

238

| gtid | Global Transaction ID | MySQL GTID |

239

| file | Binlog file name | "mysql-bin.000001" |

240

| pos | Position in binlog | 154 |

241

| row | Row number in event | 0 |

242

| thread | Thread ID | 7 |

243

| query | SQL query (if available) | "INSERT INTO..." |

244

245

## Metadata Access in Tables

246

247

Access Debezium metadata through special metadata columns:

248

249

```sql

250

CREATE TABLE debezium_with_metadata (

251

-- Regular data columns

252

id BIGINT,

253

name STRING,

254

email STRING,

255

256

-- Metadata columns

257

debezium_ingestion_ts TIMESTAMP_LTZ(3) METADATA FROM 'ingestion-timestamp',

258

debezium_source_ts TIMESTAMP_LTZ(3) METADATA FROM 'source.timestamp',

259

debezium_source_db STRING METADATA FROM 'source.database',

260

debezium_source_table STRING METADATA FROM 'source.table',

261

debezium_source_schema STRING METADATA FROM 'source.schema',

262

debezium_source_props MAP&lt;STRING, STRING&gt; METADATA FROM 'source.properties'

263

) WITH (

264

'connector' = 'kafka',

265

'format' = 'debezium-json'

266

-- other connector options

267

);

268

```

269

270

## Schema Handling

271

272

### Schema Inclusion

273

274

When `SCHEMA_INCLUDE` is enabled, Debezium messages include complete schema information:

275

276

```java

277

// Enable schema inclusion for schema evolution handling

278

config.set(DebeziumJsonFormatOptions.SCHEMA_INCLUDE, true);

279

```

280

281

Benefits of schema inclusion:

282

- Schema evolution detection

283

- Data type validation

284

- Field mapping verification

285

- Better error diagnostics

286

287

### Schema Registry Integration

288

289

For production deployments, consider using Confluent Schema Registry with Debezium:

290

291

```sql

292

CREATE TABLE debezium_avro_source (

293

id BIGINT,

294

name STRING,

295

email STRING

296

) WITH (

297

'connector' = 'kafka',

298

'format' = 'debezium-avro-confluent',

299

'debezium-avro-confluent.url' = 'http://schema-registry:8081'

300

);

301

```

302

303

## Multi-Database Support

304

305

Debezium format supports multiple database systems with consistent envelope structure:

306

307

### MySQL Connector

308

```java

309

// MySQL-specific source fields

310

// server_id, gtid, file, pos, row, thread

311

```

312

313

### PostgreSQL Connector

314

```java

315

// PostgreSQL-specific source fields

316

// lsn, txId, ts_usec

317

```

318

319

### SQL Server Connector

320

```java

321

// SQL Server-specific source fields

322

// change_lsn, commit_lsn, event_serial_no

323

```

324

325

### MongoDB Connector

326

```java

327

// MongoDB-specific source fields

328

// ord, h, tord

329

```

330

331

## Transaction Handling

332

333

Debezium provides transaction boundary information:

334

335

```json

336

{

337

"before": null,

338

"after": {...},

339

"source": {...},

340

"op": "c",

341

"ts_ms": 1672574400123,

342

"transaction": {

343

"id": "571",

344

"total_order": 1,

345

"data_collection_order": 1

346

}

347

}

348

```

349

350

Access transaction metadata:

351

352

```sql

353

CREATE TABLE debezium_with_transaction (

354

id BIGINT,

355

name STRING,

356

-- Transaction metadata

357

transaction_id STRING METADATA FROM 'transaction.id',

358

transaction_total_order BIGINT METADATA FROM 'transaction.total_order',

359

transaction_data_collection_order BIGINT METADATA FROM 'transaction.data_collection_order'

360

) WITH (

361

'connector' = 'kafka',

362

'format' = 'debezium-json'

363

);

364

```

365

366

## Error Handling

367

368

Configure comprehensive error handling for production deployments:

369

370

```java

371

// Ignore parsing errors for resilience

372

config.set(DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

373

374

// Handle timestamp parsing

375

config.set(DebeziumJsonFormatOptions.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss");

376

377

// Handle null keys in nested maps

378

config.set(DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "DROP");

379

```

380

381

## Production Considerations

382

383

### Performance Optimization

384

- Disable schema inclusion unless needed for schema evolution

385

- Use appropriate Kafka consumer configurations

386

- Consider message compression and batching

387

388

### Reliability

389

- Enable parse error ignoring for production resilience

390

- Implement dead letter queues for failed messages

391

- Monitor Debezium connector health

392

393

### Schema Evolution

394

- Use schema-include for environments with frequent schema changes

395

- Implement schema compatibility checks

396

- Plan for graceful handling of schema evolution events

397

398

### Monitoring

399

- Track parse success/failure rates

400

- Monitor event lag and processing latency

401

- Set up alerts for connector disconnections

402

- Monitor data type conversion errors