
# Maxwell CDC Format

JSON format support for Maxwell's Daemon, a MySQL change data capture (CDC) application that reads the binlog and emits row changes as JSON. This format lets Flink interpret the Maxwell-specific JSON structure as a changelog of inserts, updates, and deletes.

## Capabilities

### Maxwell Format Configuration

Configuration options specific to the Maxwell CDC format. They control JSON parsing, timestamp parsing, and error handling.

```java { .api }
/**
 * Configuration options for Maxwell JSON format
 */
public class MaxwellJsonFormatOptions {

    /** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions) */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

    /** Timestamp format pattern (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> TIMESTAMP_FORMAT;

    /** How to handle null keys in maps (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;

    /** Literal string for null keys when mode is LITERAL (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
```

**Configuration Usage:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions;

// Configure Maxwell format options
Configuration config = new Configuration();
config.set(MaxwellJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
// Accepted values are "SQL" (the default) and "ISO-8601"; arbitrary patterns are rejected
config.set(MaxwellJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");
config.set(MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "DROP");
```

## Maxwell JSON Structure

Maxwell emits one JSON object per changed row. Metadata fields sit at the top level, and the row values are nested under `data` (plus `old` for updates):

### Insert Event

```json
{
  "database": "user_db",
  "table": "users",
  "type": "insert",
  "ts": 1672574400,
  "xid": 1234,
  "xoffset": 0,
  "data": {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com",
    "created_at": "2023-01-01 10:00:00"
  }
}
```

### Update Event

```json
{
  "database": "user_db",
  "table": "users",
  "type": "update",
  "ts": 1672574401,
  "xid": 1235,
  "xoffset": 0,
  "data": {
    "id": 1,
    "name": "Alice Smith",
    "email": "alice.smith@example.com",
    "created_at": "2023-01-01 10:00:00"
  },
  "old": {
    "name": "Alice",
    "email": "alice@example.com"
  }
}
```

### Delete Event

```json
{
  "database": "user_db",
  "table": "users",
  "type": "delete",
  "ts": 1672574402,
  "xid": 1236,
  "xoffset": 0,
  "data": {
    "id": 1,
    "name": "Alice Smith",
    "email": "alice.smith@example.com",
    "created_at": "2023-01-01 10:00:00"
  }
}
```

## Table API Integration

### SQL DDL Usage

Create tables that use the Maxwell JSON format for change data capture processing:

```sql
CREATE TABLE maxwell_source (
  id BIGINT,
  name STRING,
  email STRING,
  created_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'maxwell-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'maxwell-json',
  'maxwell-json.ignore-parse-errors' = 'true',
  -- Accepts 'SQL' (default) or 'ISO-8601', not an arbitrary pattern
  'maxwell-json.timestamp-format.standard' = 'SQL'
);
```

### Programmatic Table Definition

```java
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

TableDescriptor maxwellTable = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        // Primary key columns must be declared NOT NULL in the Schema builder
        .column("id", DataTypes.BIGINT().notNull())
        .column("name", DataTypes.STRING())
        .column("email", DataTypes.STRING())
        .column("created_at", DataTypes.TIMESTAMP(3))
        .primaryKey("id")
        .build())
    .option("topic", "maxwell-topic")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format("maxwell-json")
    .option("maxwell-json.ignore-parse-errors", "true")
    // Accepts "SQL" (default) or "ISO-8601", not an arbitrary pattern
    .option("maxwell-json.timestamp-format.standard", "SQL")
    .build();
```

## Change Event Processing

The Maxwell format handles three types of change events. The event type determines which of `data` and `old` are present:

### Insert Events

- `type`: "insert"
- `data`: New row data
- `old`: Not present

### Update Events

- `type`: "update"
- `data`: Updated row data (complete row)
- `old`: Changed fields only (partial row)

### Delete Events

- `type`: "delete"
- `data`: Deleted row data (complete row)
- `old`: Not present
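
The three event types above translate naturally into changelog rows. The following is a minimal sketch of that translation, not Flink's actual implementation: `MaxwellChangelogSketch`, `Kind`, and `ChangeRow` are hypothetical names, and rows are modeled as plain maps. The one non-obvious step is the update case: because `old` holds only the changed fields, the before-image is rebuilt by copying `data` and overwriting it with `old`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Flink): expands one Maxwell event into changelog rows. */
final class MaxwellChangelogSketch {

    /** Kinds of changelog rows produced by the sketch. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One changelog row: the kind of change plus a full row image. */
    static final class ChangeRow {
        final Kind kind;
        final Map<String, Object> row;

        ChangeRow(Kind kind, Map<String, Object> row) {
            this.kind = kind;
            this.row = row;
        }
    }

    /** Maps the event's `type`, `data`, and optional `old` to one or two changelog rows. */
    static List<ChangeRow> toChangelog(String type, Map<String, Object> data, Map<String, Object> old) {
        List<ChangeRow> out = new ArrayList<>();
        switch (type) {
            case "insert":
                out.add(new ChangeRow(Kind.INSERT, data));
                break;
            case "delete":
                out.add(new ChangeRow(Kind.DELETE, data));
                break;
            case "update": {
                // `old` is partial, so start from the after-image and restore the changed fields.
                Map<String, Object> before = new HashMap<>(data);
                if (old != null) {
                    before.putAll(old);
                }
                out.add(new ChangeRow(Kind.UPDATE_BEFORE, before));
                out.add(new ChangeRow(Kind.UPDATE_AFTER, data));
                break;
            }
            default:
                throw new IllegalArgumentException("Unknown Maxwell type: " + type);
        }
        return out;
    }
}
```

For the update event shown earlier, this yields a before-row with `name` "Alice" and an after-row with `name` "Alice Smith", both carrying the unchanged `id` and `created_at`.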

## Maxwell Metadata Fields

Each Maxwell change event carries the following top-level fields:

| Field | Description | Type | Example |
|-------|-------------|------|---------|
| database | Source database name | String | "user_db" |
| table | Source table name | String | "users" |
| type | Change operation type | String | "insert", "update", "delete" |
| ts | Transaction timestamp (seconds) | Long | 1672574400 |
| xid | Transaction ID | Long | 1234 |
| xoffset | Position within transaction | Integer | 0 |
| commit | Commit flag (for transaction end) | Boolean | true |
| data | Row data after change | Object | {...} |
| old | Previous values (update only) | Object | {...} |

### Accessing Metadata in Tables

```sql
CREATE TABLE maxwell_with_metadata (
  -- Regular data columns
  id BIGINT,
  name STRING,
  email STRING,

  -- Metadata columns (read-only). With the Kafka connector, format metadata
  -- keys carry the 'value.' prefix.
  maxwell_database STRING METADATA FROM 'value.database' VIRTUAL,
  maxwell_table STRING METADATA FROM 'value.table' VIRTUAL,
  maxwell_ts TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
  -- Flink documents only database, table, primary-key-columns, and
  -- ingestion-timestamp (the Maxwell ts field) as maxwell-json metadata keys;
  -- type and xid are not exposed as metadata columns.
) WITH (
  'connector' = 'kafka',
  'format' = 'maxwell-json'
  -- other connector options
);
```

## Data Type Mapping

The Maxwell JSON format maps MySQL types to Flink types as follows:

| MySQL Type | Maxwell JSON | Flink Type | Notes |
|------------|--------------|------------|-------|
| TINYINT | Number | TINYINT | |
| SMALLINT | Number | SMALLINT | |
| INT | Number | INT | |
| BIGINT | Number | BIGINT | |
| FLOAT | Number | FLOAT | |
| DOUBLE | Number | DOUBLE | |
| DECIMAL | String | DECIMAL | Exact precision |
| VARCHAR | String | STRING | |
| CHAR | String | STRING | |
| TEXT | String | STRING | |
| DATE | String | DATE | Format: YYYY-MM-DD |
| TIME | String | TIME | Format: HH:MM:SS |
| DATETIME | String | TIMESTAMP | Configurable format |
| TIMESTAMP | String | TIMESTAMP_LTZ | With timezone |
| JSON | String | STRING | As JSON string |
| BINARY | String | BYTES | Base64 encoded |
| BIT | Number | BOOLEAN | For BIT(1) |
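
To make a few rows of the table concrete, here is a small sketch of how string-encoded values could be decoded with the JDK alone. `MaxwellValueDecodingSketch` and its methods are hypothetical and only illustrate the encodings listed above (decimal strings, Base64 binary, `YYYY-MM-DD` dates, numeric `BIT(1)`); the format's own conversion code is not shown here.

```java
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.Base64;

/** Hypothetical decoders for some of the encodings in the mapping table. */
final class MaxwellValueDecodingSketch {

    /** DECIMAL arrives as a string, so it can be parsed without floating-point rounding. */
    static BigDecimal decimal(String text) {
        return new BigDecimal(text);
    }

    /** BINARY arrives as a Base64 string. */
    static byte[] binary(String base64) {
        return Base64.getDecoder().decode(base64);
    }

    /** DATE arrives as YYYY-MM-DD, which is the ISO format LocalDate parses by default. */
    static LocalDate date(String text) {
        return LocalDate.parse(text);
    }

    /** BIT(1) arrives as a number; any non-zero value is treated as true in this sketch. */
    static boolean bit1(long value) {
        return value != 0;
    }
}
```

Parsing `"19.90"` this way keeps the scale of 2, which is the practical meaning of "exact precision" in the DECIMAL row.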

## Timestamp Handling

Configure how timestamp strings in the row data are parsed. The option accepts one of two standard names rather than an arbitrary pattern:

```java
// SQL standard (default): "yyyy-MM-dd HH:mm:ss.s{precision}", e.g. "2020-12-30 12:13:14.123"
config.set(MaxwellJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");

// ISO-8601: "yyyy-MM-ddTHH:mm:ss.s{precision}", e.g. "2020-12-30T12:13:14.123"
config.set(MaxwellJsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601");

// Custom patterns such as "MM/dd/yyyy HH:mm:ss" are not accepted by this option;
// read such columns as STRING and convert them in a query instead.
```

### Event Timestamp vs Data Timestamps

Maxwell provides two types of timestamps:

1. **Event Timestamp** (`ts`): When the transaction occurred, in seconds since the Unix epoch
2. **Data Timestamps**: Timestamp columns in the actual data

```sql
CREATE TABLE maxwell_timestamps (
  id BIGINT,
  name STRING,
  created_at TIMESTAMP(3),  -- Data timestamp
  updated_at TIMESTAMP(3),  -- Data timestamp

  -- Event timestamp from Maxwell (the ts field), exposed by the format
  -- as the ingestion-timestamp metadata key
  event_time TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'format' = 'maxwell-json'
);
```
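
The difference between the two kinds of timestamp is easy to see in plain Java. The sketch below is illustrative only (`MaxwellTimestampSketch` is a hypothetical name): `ts` is an absolute point in time expressed in epoch seconds, while a data timestamp such as `created_at` is a local date-time string with no zone attached.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helpers contrasting the event timestamp with a data timestamp. */
final class MaxwellTimestampSketch {

    // Matches strings such as "2023-01-01 10:00:00" from the example events.
    private static final DateTimeFormatter SQL_STYLE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Converts Maxwell's `ts` (seconds since the Unix epoch) to an Instant. */
    static Instant eventTime(long tsSeconds) {
        return Instant.ofEpochSecond(tsSeconds);
    }

    /** Parses a data timestamp column value; the result carries no time zone. */
    static LocalDateTime dataTimestamp(String value) {
        return LocalDateTime.parse(value, SQL_STYLE);
    }
}
```

With the insert event shown earlier, `eventTime(1672574400L)` is `2023-01-01T12:00:00Z`, while `dataTimestamp("2023-01-01 10:00:00")` stays a zone-less local value.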

## Transaction Processing

Maxwell tags each row event with the transaction it belongs to. Rows from the same transaction share an `xid`, `xoffset` gives a row's position within the transaction, and `commit` marks the transaction's end.

### Transaction Boundaries

```json
{
  "database": "user_db",
  "table": "users",
  "type": "insert",
  "ts": 1672574400,
  "xid": 1234,
  "xoffset": 0,
  "data": {...}
}
```

```json
{
  "database": "user_db",
  "table": "orders",
  "type": "insert",
  "ts": 1672574400,
  "xid": 1234,
  "xoffset": 1,
  "data": {...}
}
```

```json
{
  "ts": 1672574400,
  "xid": 1234,
  "commit": true
}
```
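
A consumer that wants to act on whole transactions can buffer events by `xid` until it sees the commit flag. The sketch below is one way to do that under simplifying assumptions: `MaxwellTransactionBufferSketch` and `Event` are hypothetical names, events are assumed to arrive in order, and only the fields needed for grouping are modeled. An event with no table is treated as a pure marker and is not added to the group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical buffer that groups Maxwell events by transaction ID. */
final class MaxwellTransactionBufferSketch {

    /** Minimal view of a Maxwell event: only the fields needed for grouping. */
    static final class Event {
        final long xid;
        final String table;   // null when the event carries no row
        final boolean commit;

        Event(long xid, String table, boolean commit) {
            this.xid = xid;
            this.table = table;
            this.commit = commit;
        }
    }

    // Row events of transactions that have not been committed yet, keyed by xid.
    private final Map<Long, List<Event>> open = new HashMap<>();

    /** Buffers the event; returns the transaction's rows once its commit flag is seen. */
    Optional<List<Event>> add(Event event) {
        List<Event> rows = open.computeIfAbsent(event.xid, k -> new ArrayList<>());
        if (event.table != null) {
            rows.add(event);
        }
        if (!event.commit) {
            return Optional.empty();
        }
        open.remove(event.xid);
        return Optional.of(rows);
    }

    /** Number of transactions still waiting for their commit flag. */
    int openTransactions() {
        return open.size();
    }
}
```

Feeding the three events above in order returns an empty result twice and then a group of two rows (`users`, `orders`) for `xid` 1234. A production version would also need a timeout or size limit so that transactions whose commit never arrives do not accumulate.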

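### Transaction-Aware Processing

```sql
-- Group changes by transaction.
-- Assumes a hypothetical table maxwell_raw_events that exposes the Maxwell
-- envelope fields as columns (maxwell_xid, maxwell_ts, maxwell_type), for
-- example by reading the topic with the plain 'json' format. The maxwell-json
-- format does not expose xid or type as metadata.
SELECT
  maxwell_xid,
  maxwell_ts,
  COUNT(*) AS change_count,
  COLLECT(maxwell_type) AS change_types
FROM maxwell_raw_events
GROUP BY maxwell_xid, maxwell_ts;
```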

## Advanced Features

### Bootstrapping Support

Maxwell can bootstrap existing table data, replaying current rows as `bootstrap-insert` events bracketed by start and complete markers:

```json
{
  "database": "user_db",
  "table": "users",
  "type": "bootstrap-start",
  "ts": 1672574400,
  "data": {}
}
```

```json
{
  "database": "user_db",
  "table": "users",
  "type": "bootstrap-insert",
  "ts": 1672574400,
  "data": {
    "id": 1,
    "name": "Alice"
  }
}
```

```json
{
  "database": "user_db",
  "table": "users",
  "type": "bootstrap-complete",
  "ts": 1672574400,
  "data": {}
}
```
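
A consumer that only understands insert, update, and delete has to decide what to do with these three bootstrap types. One possible policy, sketched below with the hypothetical class `MaxwellBootstrapSketch`, is to drop the two markers (their `data` is empty) and treat `bootstrap-insert` like a regular insert. This is an illustration of a preprocessing step, not a description of how the format itself treats bootstrap events.

```java
import java.util.Optional;

/** Hypothetical preprocessing policy for Maxwell bootstrap event types. */
final class MaxwellBootstrapSketch {

    /**
     * Returns the type to process the event as, or empty when the event
     * is a marker that carries no row and can be skipped.
     */
    static Optional<String> effectiveType(String type) {
        switch (type) {
            case "bootstrap-start":
            case "bootstrap-complete":
                return Optional.empty();        // markers only, nothing to emit
            case "bootstrap-insert":
                return Optional.of("insert");   // snapshot row, handled like an insert
            default:
                return Optional.of(type);       // insert, update, delete pass through
        }
    }
}
```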

### DDL Events

Maxwell can capture DDL changes:

```json
{
  "database": "user_db",
  "table": "users",
  "type": "table-create",
  "ts": 1672574400,
  "sql": "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(100))"
}
```

## Error Handling

Configure how the format reacts to malformed input and to null map keys:

```java
// Skip records that cannot be parsed instead of failing the job
config.set(MaxwellJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

// Replace null map keys with a literal string
config.set(MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "LITERAL");
config.set(MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL, "__NULL__");
```
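
The effect of the null-key mode is easiest to see on a plain map. The sketch below imitates the three modes (`FAIL`, `DROP`, `LITERAL`) on a `java.util.Map`; `MapNullKeySketch` is a hypothetical class that mirrors the option's described behavior and is not the format's own code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of the three null-key handling modes. */
final class MapNullKeySketch {

    enum Mode { FAIL, DROP, LITERAL }

    /** Returns a copy of the map with its null key handled according to the mode. */
    static Map<String, Object> apply(Map<String, Object> input, Mode mode, String literal) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : input.entrySet()) {
            if (entry.getKey() != null) {
                out.put(entry.getKey(), entry.getValue());
                continue;
            }
            switch (mode) {
                case FAIL:
                    throw new IllegalStateException("Map contains a null key");
                case DROP:
                    break;                                // omit the entry
                case LITERAL:
                    out.put(literal, entry.getValue());   // keep it under the literal key
                    break;
            }
        }
        return out;
    }
}
```

With `LITERAL` and `"__NULL__"`, an entry stored under a null key survives as `"__NULL__"`; with `DROP` it disappears, and with `FAIL` the whole record is rejected.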

### Common Error Scenarios

1. **Malformed JSON**: Enable `IGNORE_PARSE_ERRORS` to skip records that cannot be parsed
2. **Timestamp Parsing**: Set `TIMESTAMP_FORMAT` to match the timestamp strings in the data
3. **Large Binary Data**: Consider data size limits
4. **Schema Evolution**: Handle new/removed columns gracefully
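
For the first scenario, the trade-off behind `IGNORE_PARSE_ERRORS` can be sketched in a few lines: either a bad record fails the whole batch, or it is skipped and the rest continue. `IgnoreParseErrorsSketch` is a hypothetical helper, and the parser is passed in as a function so the example does not depend on any JSON library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical illustration of skip-versus-fail handling for unparsable records. */
final class IgnoreParseErrorsSketch {

    /** Parses every record; bad records are skipped only when ignoreParseErrors is true. */
    static <T> List<T> parseAll(List<String> records, Function<String, T> parser, boolean ignoreParseErrors) {
        List<T> parsed = new ArrayList<>();
        for (String record : records) {
            try {
                parsed.add(parser.apply(record));
            } catch (RuntimeException e) {
                if (!ignoreParseErrors) {
                    throw new IllegalStateException("Failed to parse record: " + record, e);
                }
                // Record is dropped; a real pipeline would count it for monitoring.
            }
        }
        return parsed;
    }
}
```

Skipping keeps the job running but loses data silently, which is why parse failure rates are worth monitoring when the option is enabled.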

## Production Considerations

### Performance Optimization

```java
import java.util.Properties;

// Kafka consumer settings that favor throughput over latency
Properties kafkaProps = new Properties();
kafkaProps.setProperty("max.poll.records", "10000");   // records returned per poll
kafkaProps.setProperty("fetch.min.bytes", "1048576");  // wait for at least 1 MiB per fetch
kafkaProps.setProperty("fetch.max.wait.ms", "500");    // but no longer than 500 ms
```

### Reliability

- Enable `ignore-parse-errors` so that a single malformed record does not fail the job; skipped records are dropped
- Implement proper error handling and alerting
- Monitor Maxwell daemon health and connectivity
- Set up proper Kafka retention policies

### Monitoring

Key metrics to monitor:

- Maxwell lag behind MySQL binlog
- Parse success/failure rates
- Event processing latency
- Transaction completion rates
- DDL event frequency

### Schema Evolution

Handle schema changes gracefully:

```sql
-- Use flexible schema definition
CREATE TABLE maxwell_flexible (
  -- Known columns
  id BIGINT,
  name STRING,

  -- Catch-all for new columns
  row_data STRING,  -- Full JSON for analysis

  -- Metadata for debugging (the Kafka connector prefixes format metadata keys with 'value.')
  maxwell_database STRING METADATA FROM 'value.database' VIRTUAL,
  maxwell_table STRING METADATA FROM 'value.table' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'format' = 'maxwell-json',
  'maxwell-json.ignore-parse-errors' = 'true'
);
```

## Integration Patterns

### Real-time Analytics

```sql
-- Real-time user activity tracking.
-- Assumes a hypothetical table maxwell_raw_events that exposes maxwell_type as a
-- column and declares maxwell_ts as an event-time attribute; maxwell_source
-- defines neither column.
SELECT
  name,
  COUNT(*) AS changes,
  MAX(maxwell_ts) AS last_change
FROM maxwell_raw_events
WHERE maxwell_type IN ('insert', 'update')
GROUP BY TUMBLE(maxwell_ts, INTERVAL '1' MINUTE), name;
```

### Data Lake Integration

```sql
-- Partition by date for data lake storage
CREATE TABLE maxwell_data_lake (
  id BIGINT,
  name STRING,
  email STRING,
  change_type STRING,
  change_date DATE
) PARTITIONED BY (change_date)
WITH (
  -- S3 is addressed through the filesystem connector; the path is a placeholder
  'connector' = 'filesystem',
  'path' = 's3://<bucket>/<prefix>',
  'format' = 'parquet'
);

-- Assumes a hypothetical source maxwell_raw_events that exposes
-- maxwell_type and maxwell_ts as regular columns
INSERT INTO maxwell_data_lake
SELECT id, name, email, maxwell_type, CAST(maxwell_ts AS DATE)
FROM maxwell_raw_events;
```