# Canal CDC Format

JSON format support for the Canal Change Data Capture (CDC) system, enabling Flink to process MySQL binlog changes encoded in Canal's JSON structure. Canal is a MySQL binlog incremental subscription and consumption service that emits each change as a JSON message carrying the row data together with database and table metadata.

## Capabilities

### Canal Format Configuration

Configuration options specific to the Canal CDC format, including database and table filtering for selective change data processing.

```java { .api }
/**
 * Configuration options for the Canal JSON format.
 */
public class CanalJsonFormatOptions {

    /** Regular expression used to filter databases (optional). */
    public static final ConfigOption<String> DATABASE_INCLUDE;

    /** Regular expression used to filter tables (optional). */
    public static final ConfigOption<String> TABLE_INCLUDE;

    /** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions). */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

    /** Timestamp format standard, "SQL" or "ISO-8601" (inherited from JsonFormatOptions). */
    public static final ConfigOption<String> TIMESTAMP_FORMAT;

    /** How to handle null keys in maps (inherited from JsonFormatOptions). */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;

    /** Literal string for null keys when mode is LITERAL (inherited from JsonFormatOptions). */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
```

**Configuration Usage:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.canal.CanalJsonFormatOptions;

// Configure Canal format options
Configuration config = new Configuration();
config.set(CanalJsonFormatOptions.DATABASE_INCLUDE, "user_db|order_db");
config.set(CanalJsonFormatOptions.TABLE_INCLUDE, "users|orders|products");
config.set(CanalJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
config.set(CanalJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");
```

## Canal JSON Structure

Canal produces JSON messages with the following structure for change events. The `es` field is the time the statement executed in MySQL and `ts` is the time Canal processed the event, both in epoch milliseconds; `sqlType` holds `java.sql.Types` codes for each column:

```json
{
  "data": [
    {
      "id": "1",
      "name": "Alice",
      "email": "alice@example.com",
      "created_at": "2023-01-01 10:00:00"
    }
  ],
  "database": "user_db",
  "es": 1672574400000,
  "id": 1,
  "isDdl": false,
  "mysqlType": {
    "id": "int(11)",
    "name": "varchar(100)",
    "email": "varchar(255)",
    "created_at": "datetime"
  },
  "old": null,
  "pkNames": ["id"],
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12,
    "email": 12,
    "created_at": 93
  },
  "table": "users",
  "ts": 1672574400123,
  "type": "INSERT"
}
```
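
To make the envelope concrete, here is a minimal Jackson sketch that pulls the row payload and source metadata out of a Canal message. This is purely illustrative and is not how the Flink format internally deserializes records:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CanalEnvelopeDemo {
    public static void main(String[] args) throws Exception {
        String message = "{\"data\":[{\"id\":\"1\",\"name\":\"Alice\"}],"
                + "\"database\":\"user_db\",\"table\":\"users\",\"type\":\"INSERT\"}";

        JsonNode envelope = new ObjectMapper().readTree(message);

        // Top-level metadata identifies the source of the change
        System.out.println(envelope.get("database").asText() + "." + envelope.get("table").asText());

        // "data" is always an array: one message may batch several changed rows
        for (JsonNode row : envelope.get("data")) {
            System.out.println(envelope.get("type").asText() + ": " + row);
        }
    }
}
```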

## Table API Integration

### SQL DDL Usage

Create tables using the Canal JSON format for change data capture processing:

```sql
CREATE TABLE canal_source (
    id BIGINT,
    name STRING,
    email STRING,
    created_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'canal-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'canal-json',
    'canal-json.ignore-parse-errors' = 'true',
    'canal-json.database.include' = 'user_db',
    'canal-json.table.include' = 'users'
);
```

### Programmatic Table Definition

```java
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

TableDescriptor canalTable = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("email", DataTypes.STRING())
        .column("created_at", DataTypes.TIMESTAMP(3))
        .primaryKey("id")
        .build())
    .option("topic", "canal-topic")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format("canal-json")
    .option("canal-json.ignore-parse-errors", "true")
    .option("canal-json.database.include", "user_db")
    .option("canal-json.table.include", "users")
    .build();
```
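
Once built, the descriptor can be registered with a `TableEnvironment` and queried like any other table (continuing from the snippet above; the table name `canal_source` is arbitrary):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Register the descriptor under a name and read the resulting changelog stream
tableEnv.createTemporaryTable("canal_source", canalTable);
tableEnv.sqlQuery("SELECT id, name FROM canal_source").execute().print();
```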

## Change Event Processing

The Canal format automatically handles change event metadata, extracting the actual row changes and exposing them through the table schema as a changelog stream (see the sketch after this list):

### Insert Events
- `type`: "INSERT"
- `data`: Array containing the new row data
- `old`: null

### Update Events
- `type`: "UPDATE"
- `data`: Array containing the updated row data
- `old`: Array containing the previous values of the changed fields

### Delete Events
- `type`: "DELETE"
- `data`: Array containing the deleted row data
- `old`: null
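
When Flink reads these events, an INSERT becomes a single insert row, an UPDATE is expanded into an update-before/update-after pair, and a DELETE becomes a single delete row. A minimal sketch of that expansion follows; the class and helper name are hypothetical, not part of the format's API:

```java
import org.apache.flink.types.RowKind;

import java.util.List;

public class CanalEventKinds {

    /**
     * Hypothetical helper illustrating how one Canal event expands into
     * Flink changelog row kinds: INSERT -> +I, UPDATE -> -U/+U, DELETE -> -D.
     */
    public static List<RowKind> toRowKinds(String canalType) {
        switch (canalType) {
            case "INSERT":
                return List.of(RowKind.INSERT);
            case "UPDATE":
                return List.of(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER);
            case "DELETE":
                return List.of(RowKind.DELETE);
            default:
                throw new IllegalArgumentException("Unsupported Canal type: " + canalType);
        }
    }
}
```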

## Filtering Capabilities

### Database Filtering

Use regular expressions to filter which databases are processed:

```java
// Include specific databases
config.set(CanalJsonFormatOptions.DATABASE_INCLUDE, "user_db|inventory_db");

// Exclude system databases (using a negative lookahead)
config.set(CanalJsonFormatOptions.DATABASE_INCLUDE, "^(?!mysql|information_schema|performance_schema).*");
```

### Table Filtering

Filter specific tables within the included databases. A quick way to sanity-check a pattern offline is shown after this example:

```java
// Include specific tables
config.set(CanalJsonFormatOptions.TABLE_INCLUDE, "users|orders|products");

// Include tables matching specific patterns
config.set(CanalJsonFormatOptions.TABLE_INCLUDE, "user_.*|order_.*");
```
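
Because both include options are ordinary Java regular expressions, a pattern can be tested before deploying. Whether the format anchors the match to the full name is not asserted here, so anchoring explicitly with `^...$` is the safe choice:

```java
import java.util.regex.Pattern;

public class IncludePatternCheck {
    public static void main(String[] args) {
        Pattern tableInclude = Pattern.compile("^(user_.*|order_.*)$");

        for (String table : new String[] {"user_profiles", "order_items", "audit_log"}) {
            // matches() tests the whole table name against the pattern
            System.out.println(table + " -> " + tableInclude.matcher(table).matches());
        }
        // user_profiles -> true, order_items -> true, audit_log -> false
    }
}
```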

## Error Handling

Configure error handling for malformed Canal JSON messages. When errors are ignored, rows that fail to parse are skipped rather than failing the job:

```java
// Ignore parsing errors and continue processing
config.set(CanalJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

// Fail on parsing errors (default behavior)
config.set(CanalJsonFormatOptions.IGNORE_PARSE_ERRORS, false);
```

## Timestamp Handling

Configure the timestamp format standard used when parsing and emitting temporal values. Only the "SQL" and "ISO-8601" standards are supported; arbitrary patterns are not accepted:

```java
// SQL standard timestamp format, e.g. "2023-01-01 10:00:00.123" (default)
config.set(CanalJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");

// ISO-8601 format, e.g. "2023-01-01T10:00:00.123"
config.set(CanalJsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601");
```
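
The difference between the two standards is essentially the separator between date and time. A plain-Java sketch of the two renderings (illustrative only, not the format's internal parsing code):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampStandardsDemo {
    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2023, 1, 1, 10, 0, 0, 123_000_000);

        // "SQL" style: space-separated date and time
        System.out.println(ts.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
        // -> 2023-01-01 10:00:00.123

        // "ISO-8601" style: 'T' separator
        System.out.println(ts.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        // -> 2023-01-01T10:00:00.123
    }
}
```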

## Data Type Mapping

Canal JSON format maps MySQL column types to Flink SQL types as follows:

| MySQL Type | Flink Type | Notes |
|------------|------------|-------|
| TINYINT | TINYINT | |
| SMALLINT | SMALLINT | |
| INT | INT | |
| BIGINT | BIGINT | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| DECIMAL | DECIMAL | Precision preserved |
| VARCHAR | STRING | |
| CHAR | STRING | |
| TEXT | STRING | |
| DATE | DATE | |
| TIME | TIME | |
| DATETIME | TIMESTAMP | |
| TIMESTAMP | TIMESTAMP_LTZ | With local time zone |
| JSON | STRING | As JSON string |
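
Relatedly, the numeric codes in the envelope's `sqlType` field are standard `java.sql.Types` constants, so they can be decoded with the JDK alone (4 = INTEGER, 12 = VARCHAR, 93 = TIMESTAMP in the example message above):

```java
import java.sql.JDBCType;

public class SqlTypeCodes {
    public static void main(String[] args) {
        // Codes as they appear in the example envelope's "sqlType" map
        for (int code : new int[] {4, 12, 93}) {
            System.out.println(code + " -> " + JDBCType.valueOf(code));
        }
        // 4 -> INTEGER, 12 -> VARCHAR, 93 -> TIMESTAMP
    }
}
```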

## Metadata Fields

Canal format exposes change event metadata through virtual metadata columns. The keys supported by the Flink canal-json format are `database`, `table`, `sql-type`, `pk-names`, and `ingestion-timestamp` (populated from the Canal `ts` field):

```sql
CREATE TABLE canal_with_metadata (
    -- Regular data columns
    id BIGINT,
    name STRING,
    email STRING,

    -- Metadata columns
    canal_database STRING METADATA FROM 'database' VIRTUAL,
    canal_table STRING METADATA FROM 'table' VIRTUAL,
    canal_sql_type MAP<STRING, INT> METADATA FROM 'sql-type' VIRTUAL,
    canal_pk_names ARRAY<STRING> METADATA FROM 'pk-names' VIRTUAL,
    canal_ingestion_ts TIMESTAMP_LTZ(3) METADATA FROM 'ingestion-timestamp' VIRTUAL
) WITH (
    'connector' = 'kafka',
    'format' = 'canal-json'
    -- other connector options
);
```

## Production Considerations

### Performance Optimization

- Use database and table filtering to reduce processing overhead
- Configure appropriate Kafka consumer settings for high-throughput scenarios
- Consider partitioning strategies based on database/table names

### Reliability

- Enable `ignore-parse-errors` for production resilience
- Implement dead letter queues for messages that repeatedly fail processing
- Monitor parsing behavior so that silently skipped records are noticed

### Monitoring

Useful signals to watch when processing Canal change data:
- Parse success/failure rates
- Filtered message counts
- Processing latency
- Data type conversion errors