# Maxwell CDC Format

JSON format support for Maxwell's daemon Change Data Capture (CDC) system, enabling processing of MySQL binlog changes with Maxwell's JSON structure. Maxwell is a MySQL CDC application that reads the binlog and outputs row changes as JSON.

## Capabilities

### Maxwell Format Configuration

Configuration options specific to the Maxwell CDC format, providing control over JSON processing and error handling.

```java { .api }
/**
 * Configuration options for the Maxwell JSON format.
 */
public class MaxwellJsonFormatOptions {

    /** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions). */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

    /** Timestamp format standard (inherited from JsonFormatOptions). */
    public static final ConfigOption<String> TIMESTAMP_FORMAT;

    /** How to handle null keys in maps (inherited from JsonFormatOptions). */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;

    /** Literal string for null keys when the mode is LITERAL (inherited from JsonFormatOptions). */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
```

**Configuration Usage:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions;

// Configure Maxwell format options
Configuration config = new Configuration();
config.set(MaxwellJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
config.set(MaxwellJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");
config.set(MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "DROP");
```

## Maxwell JSON Structure

Maxwell produces change events with a flat structure containing both data and metadata:

### Insert Event

```json
{
  "database": "user_db",
  "table": "users",
  "type": "insert",
  "ts": 1672574400,
  "xid": 1234,
  "xoffset": 0,
  "data": {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com",
    "created_at": "2023-01-01 10:00:00"
  }
}
```

### Update Event

```json
{
  "database": "user_db",
  "table": "users",
  "type": "update",
  "ts": 1672574401,
  "xid": 1235,
  "xoffset": 0,
  "data": {
    "id": 1,
    "name": "Alice Smith",
    "email": "alice.smith@example.com",
    "created_at": "2023-01-01 10:00:00"
  },
  "old": {
    "name": "Alice",
    "email": "alice@example.com"
  }
}
```

### Delete Event

```json
{
  "database": "user_db",
  "table": "users",
  "type": "delete",
  "ts": 1672574402,
  "xid": 1236,
  "xoffset": 0,
  "data": {
    "id": 1,
    "name": "Alice Smith",
    "email": "alice.smith@example.com",
    "created_at": "2023-01-01 10:00:00"
  }
}
```
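
A simplified Java model of this envelope can help when writing custom consumers. This is an illustrative sketch (class and field names mirror the JSON above, but this is not Flink's internal representation):

```java
import java.util.Map;

public class MaxwellEnvelope {
    /** Simplified Maxwell change event; "old" is null except for updates. */
    record ChangeEvent(
            String database,
            String table,
            String type,      // "insert", "update", or "delete"
            long ts,          // transaction timestamp, epoch seconds
            long xid,         // transaction id
            int xoffset,      // position within the transaction
            Map<String, Object> data,
            Map<String, Object> old) {}

    public static void main(String[] args) {
        ChangeEvent e = new ChangeEvent(
                "user_db", "users", "insert", 1672574400L, 1234L, 0,
                Map.of("id", 1, "name", "Alice"), null);
        System.out.println(e.type()); // insert
    }
}
```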

## Table API Integration

### SQL DDL Usage

Create tables using the Maxwell JSON format for change data capture processing:

```sql
CREATE TABLE maxwell_source (
  id BIGINT,
  name STRING,
  email STRING,
  created_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'maxwell-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'maxwell-json',
  'maxwell-json.ignore-parse-errors' = 'true',
  'maxwell-json.timestamp-format.standard' = 'SQL'
);
```

### Programmatic Table Definition

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

TableDescriptor maxwellTable = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("email", DataTypes.STRING())
        .column("created_at", DataTypes.TIMESTAMP(3))
        .primaryKey("id")
        .build())
    .option("topic", "maxwell-topic")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format("maxwell-json")
    .option("maxwell-json.ignore-parse-errors", "true")
    .option("maxwell-json.timestamp-format.standard", "SQL")
    .build();
```

## Change Event Processing

The Maxwell format handles three types of change events:

### Insert Events
- `type`: "insert"
- `data`: New row data
- `old`: Not present

### Update Events
- `type`: "update"
- `data`: Updated row data (complete row)
- `old`: Changed fields only (partial row)

### Delete Events
- `type`: "delete"
- `data`: Deleted row data (complete row)
- `old`: Not present
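
In Flink's changelog model, these event types translate to row kinds roughly as follows. A Flink-independent sketch (the enum mirrors Flink's `RowKind` names, but this is not the actual deserializer):

```java
import java.util.List;

public class MaxwellChangelog {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Returns the changelog row kinds emitted for one Maxwell event type. */
    static List<RowKind> kindsFor(String maxwellType) {
        switch (maxwellType) {
            case "insert":
                return List.of(RowKind.INSERT);
            case "update":
                // "old" supplies the before-image, "data" the after-image
                return List.of(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER);
            case "delete":
                return List.of(RowKind.DELETE);
            default:
                return List.of(); // bootstrap/DDL events carry no row change
        }
    }

    public static void main(String[] args) {
        System.out.println(kindsFor("update")); // [UPDATE_BEFORE, UPDATE_AFTER]
    }
}
```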

## Maxwell Metadata Fields

Maxwell provides rich metadata for each change event:

| Field | Description | Type | Example |
|-------|-------------|------|---------|
| database | Source database name | String | "user_db" |
| table | Source table name | String | "users" |
| type | Change operation type | String | "insert", "update", "delete" |
| ts | Transaction timestamp (epoch seconds) | Long | 1672574400 |
| xid | Transaction ID | Long | 1234 |
| xoffset | Position within transaction | Integer | 0 |
| commit | Commit flag (marks the end of a transaction) | Boolean | true |
| data | Row data after the change | Object | {...} |
| old | Previous values (updates only) | Object | {...} |

### Accessing Metadata in Tables

```sql
CREATE TABLE maxwell_with_metadata (
  -- Regular data columns
  id BIGINT,
  name STRING,
  email STRING,

  -- Metadata columns
  maxwell_database STRING METADATA FROM 'database',
  maxwell_table STRING METADATA FROM 'table',
  maxwell_type STRING METADATA FROM 'type',
  maxwell_ts TIMESTAMP_LTZ(3) METADATA FROM 'ts',
  maxwell_xid BIGINT METADATA FROM 'xid'
) WITH (
  'connector' = 'kafka',
  'format' = 'maxwell-json'
  -- other connector options
);
```

## Data Type Mapping

The Maxwell JSON format maps MySQL types to Flink types:

| MySQL Type | Maxwell JSON | Flink Type | Notes |
|------------|--------------|------------|-------|
| TINYINT | Number | TINYINT | |
| SMALLINT | Number | SMALLINT | |
| INT | Number | INT | |
| BIGINT | Number | BIGINT | |
| FLOAT | Number | FLOAT | |
| DOUBLE | Number | DOUBLE | |
| DECIMAL | String | DECIMAL | Exact precision |
| VARCHAR | String | STRING | |
| CHAR | String | STRING | |
| TEXT | String | STRING | |
| DATE | String | DATE | Format: YYYY-MM-DD |
| TIME | String | TIME | Format: HH:MM:SS |
| DATETIME | String | TIMESTAMP | Configurable format |
| TIMESTAMP | String | TIMESTAMP_LTZ | With timezone |
| JSON | String | STRING | As JSON string |
| BINARY | String | BYTES | Base64 encoded |
| BIT | Number | BOOLEAN | For BIT(1) |
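
Two of these mappings deserve care on the consumer side: DECIMAL and BINARY both arrive as JSON strings. A minimal, Flink-independent sketch of decoding them (class and method names are illustrative):

```java
import java.math.BigDecimal;
import java.util.Base64;

public class MaxwellValueDecoding {
    /** DECIMAL arrives as a string so exact precision survives JSON. */
    static BigDecimal decodeDecimal(String value) {
        return new BigDecimal(value);
    }

    /** BINARY columns arrive Base64-encoded. */
    static byte[] decodeBinary(String value) {
        return Base64.getDecoder().decode(value);
    }

    public static void main(String[] args) {
        System.out.println(decodeDecimal("12345.6789").scale());    // 4
        System.out.println(new String(decodeBinary("aGVsbG8="))); // hello
    }
}
```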

## Timestamp Handling

Configure timestamp parsing for proper temporal processing. The format option selects a standard rather than an arbitrary pattern:

```java
// Default SQL-style format, e.g. "2023-01-01 10:00:00"
config.set(MaxwellJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");

// ISO-8601 format, e.g. "2023-01-01T10:00:00"
config.set(MaxwellJsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601");
```

### Event Timestamp vs. Data Timestamps

Maxwell provides two kinds of timestamps:

1. **Event timestamp** (`ts`): when the transaction occurred
2. **Data timestamps**: timestamp columns in the row data itself

```sql
CREATE TABLE maxwell_timestamps (
  id BIGINT,
  name STRING,
  created_at TIMESTAMP(3), -- Data timestamp
  updated_at TIMESTAMP(3), -- Data timestamp

  -- Event timestamp from Maxwell
  event_time TIMESTAMP_LTZ(3) METADATA FROM 'ts'
) WITH (
  'connector' = 'kafka',
  'format' = 'maxwell-json'
);
```
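
Because `ts` is epoch seconds, converting it for logging or ad-hoc checks outside of Flink is a one-liner. A small sketch (class name is illustrative):

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

public class MaxwellTs {
    /** Converts Maxwell's epoch-seconds "ts" field to an ISO-8601 UTC string. */
    static String toIsoUtc(long ts) {
        return DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochSecond(ts));
    }

    public static void main(String[] args) {
        System.out.println(toIsoUtc(1672574400L)); // 2023-01-01T12:00:00Z
    }
}
```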

## Transaction Processing

Maxwell groups related changes by transaction:

### Transaction Boundaries

```json
{
  "database": "user_db",
  "table": "users",
  "type": "insert",
  "ts": 1672574400,
  "xid": 1234,
  "xoffset": 0,
  "data": {...}
}
```

```json
{
  "database": "user_db",
  "table": "orders",
  "type": "insert",
  "ts": 1672574400,
  "xid": 1234,
  "xoffset": 1,
  "data": {...}
}
```

```json
{
  "ts": 1672574400,
  "xid": 1234,
  "commit": true
}
```
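
Downstream consumers can use `xid` and the commit marker to reassemble complete transactions. A minimal, Flink-independent sketch following the event shape shown above (field names simplified, class name illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TxnAssembler {
    /** Simplified Maxwell event: transaction id, commit-marker flag, payload. */
    record Event(long xid, boolean commit, String payload) {}

    private final Map<Long, List<Event>> open = new HashMap<>();

    /** Buffers events per xid; returns the full transaction when its commit marker arrives. */
    List<Event> onEvent(Event e) {
        List<Event> buf = open.computeIfAbsent(e.xid(), k -> new ArrayList<>());
        if (e.commit()) {
            open.remove(e.xid());
            return buf; // complete transaction, ready for downstream processing
        }
        buf.add(e);
        return null; // transaction still open
    }
}
```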

### Transaction-Aware Processing

```sql
-- Group changes by transaction
SELECT
  maxwell_xid,
  maxwell_ts,
  COUNT(*) AS change_count,
  COLLECT(maxwell_type) AS change_types
FROM maxwell_with_metadata
GROUP BY maxwell_xid, maxwell_ts;
```

## Advanced Features

### Bootstrapping Support

Maxwell supports bootstrapping (snapshotting) existing data:

```json
{
  "database": "user_db",
  "table": "users",
  "type": "bootstrap-start",
  "ts": 1672574400,
  "data": {}
}
```

```json
{
  "database": "user_db",
  "table": "users",
  "type": "bootstrap-insert",
  "ts": 1672574400,
  "data": {
    "id": 1,
    "name": "Alice"
  }
}
```

```json
{
  "database": "user_db",
  "table": "users",
  "type": "bootstrap-complete",
  "ts": 1672574400,
  "data": {}
}
```
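
Consumers often need to tell snapshot rows apart from live changes, since only `bootstrap-insert` events between the start and complete markers carry row data. A small sketch of that classification (class and method names are illustrative):

```java
import java.util.Set;

public class BootstrapFilter {
    private static final Set<String> ROW_TYPES =
            Set.of("insert", "update", "delete", "bootstrap-insert");

    /** True if the event type carries row data (bootstrap markers do not). */
    static boolean carriesRowData(String type) {
        return ROW_TYPES.contains(type);
    }

    /** True for snapshot rows emitted during a bootstrap. */
    static boolean isBootstrapRow(String type) {
        return "bootstrap-insert".equals(type);
    }
}
```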

### DDL Events

Maxwell can capture DDL changes:

```json
{
  "database": "user_db",
  "table": "users",
  "type": "table-create",
  "ts": 1672574400,
  "sql": "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(100))"
}
```

## Error Handling

Configure robust error handling for production deployments:

```java
// Ignore parsing errors
config.set(MaxwellJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

// Handle null keys in maps
config.set(MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "LITERAL");
config.set(MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL, "__NULL__");
```

### Common Error Scenarios

1. **Malformed JSON**: Enable `IGNORE_PARSE_ERRORS`
2. **Timestamp parsing**: Configure an appropriate `TIMESTAMP_FORMAT`
3. **Large binary data**: Consider data size limits
4. **Schema evolution**: Handle new/removed columns gracefully
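
The effect of `IGNORE_PARSE_ERRORS` is skip-instead-of-fail. A Flink-independent sketch of that semantics, using integer parsing as a stand-in for JSON deserialization (names are illustrative):

```java
public class LenientParser {
    /** Mimics ignore-parse-errors: returns null instead of failing on bad input. */
    static Integer parseOrSkip(String raw, boolean ignoreParseErrors) {
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            if (ignoreParseErrors) {
                return null; // record is silently skipped
            }
            throw e; // fail the job, surfacing the bad record
        }
    }
}
```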

## Production Considerations

### Performance Optimization

```java
import java.util.Properties;

// Kafka consumer settings for high-throughput scenarios
Properties kafkaProps = new Properties();
kafkaProps.setProperty("max.poll.records", "10000");
kafkaProps.setProperty("fetch.min.bytes", "1048576");
kafkaProps.setProperty("fetch.max.wait.ms", "500");
```

### Reliability

- Enable `ignore-parse-errors` for production resilience
- Implement proper error handling and alerting
- Monitor Maxwell daemon health and connectivity
- Set up proper Kafka retention policies

### Monitoring

Key metrics to monitor:

- Maxwell lag behind the MySQL binlog
- Parse success/failure rates
- Event processing latency
- Transaction completion rates
- DDL event frequency
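
Binlog lag can be approximated from the event's `ts` field alone: wall-clock time minus the event timestamp. A trivial sketch (names are illustrative):

```java
import java.time.Instant;

public class LagMetric {
    /** Replication lag in seconds: wall-clock now minus the event's epoch-seconds ts. */
    static long lagSeconds(long eventTs, Instant now) {
        return now.getEpochSecond() - eventTs;
    }

    public static void main(String[] args) {
        System.out.println(lagSeconds(100L, Instant.ofEpochSecond(160L))); // 60
    }
}
```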

### Schema Evolution

Handle schema changes gracefully:

```sql
-- Use a flexible schema definition
CREATE TABLE maxwell_flexible (
  -- Known columns
  id BIGINT,
  name STRING,

  -- Catch-all for new columns
  row_data STRING, -- Full JSON for analysis

  -- Metadata for debugging
  maxwell_database STRING METADATA FROM 'database',
  maxwell_table STRING METADATA FROM 'table'
) WITH (
  'connector' = 'kafka',
  'format' = 'maxwell-json',
  'maxwell-json.ignore-parse-errors' = 'true'
);
```

## Integration Patterns

### Real-time Analytics

```sql
-- Real-time user activity tracking
SELECT
  name,
  COUNT(*) AS changes,
  MAX(maxwell_ts) AS last_change
FROM maxwell_with_metadata
WHERE maxwell_type IN ('insert', 'update')
GROUP BY TUMBLE(maxwell_ts, INTERVAL '1' MINUTE), name;
```

### Data Lake Integration

```sql
-- Partition by date for data lake storage
CREATE TABLE maxwell_data_lake (
  id BIGINT,
  name STRING,
  email STRING,
  change_type STRING,
  change_date DATE
) PARTITIONED BY (change_date)
WITH (
  'connector' = 'filesystem',
  'path' = 's3://my-bucket/maxwell/', -- example path
  'format' = 'parquet'
);

INSERT INTO maxwell_data_lake
SELECT id, name, email, maxwell_type, CAST(maxwell_ts AS DATE)
FROM maxwell_with_metadata;
```