
# Canal CDC Format

JSON format support for the Canal Change Data Capture system, enabling processing of MySQL binlog changes with Canal-specific JSON structure. Canal is a MySQL binlog incremental subscription and consumption service that outputs changes in a specific JSON format with database and table metadata.

## Capabilities

### Canal Format Configuration

Configuration options specific to the Canal CDC format, including database and table filtering capabilities for selective change data processing.

```java { .api }
/**
 * Configuration options for Canal JSON format.
 */
public class CanalJsonFormatOptions {

    /** Regular expression to filter databases (optional) */
    public static final ConfigOption<String> DATABASE_INCLUDE;

    /** Regular expression to filter tables (optional) */
    public static final ConfigOption<String> TABLE_INCLUDE;

    /** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions) */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

    /** Timestamp format pattern (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> TIMESTAMP_FORMAT;

    /** How to handle null keys in maps (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;

    /** Literal string for null keys when mode is LITERAL (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
```

**Configuration Usage:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.canal.CanalJsonFormatOptions;

// Configure Canal format options
Configuration config = new Configuration();
config.set(CanalJsonFormatOptions.DATABASE_INCLUDE, "user_db|order_db");
config.set(CanalJsonFormatOptions.TABLE_INCLUDE, "users|orders|products");
config.set(CanalJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
config.set(CanalJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");
```

## Canal JSON Structure

Canal produces JSON messages with the following structure for change events:

```json
{
  "data": [
    {
      "id": "1",
      "name": "Alice",
      "email": "alice@example.com",
      "created_at": "2023-01-01 10:00:00"
    }
  ],
  "database": "user_db",
  "es": 1672574400000,
  "id": 1,
  "isDdl": false,
  "mysqlType": {
    "id": "int(11)",
    "name": "varchar(100)",
    "email": "varchar(255)",
    "created_at": "datetime"
  },
  "old": null,
  "pkNames": ["id"],
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12,
    "email": 12,
    "created_at": 93
  },
  "table": "users",
  "ts": 1672574400123,
  "type": "INSERT"
}
```

## Table API Integration

### SQL DDL Usage

Create tables using the Canal JSON format for change data capture processing:

```sql
CREATE TABLE canal_source (
  id BIGINT,
  name STRING,
  email STRING,
  created_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'canal-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'canal-json',
  'canal-json.ignore-parse-errors' = 'true',
  'canal-json.database.include' = 'user_db',
  'canal-json.table.include' = 'users'
);
```

### Programmatic Table Definition

```java
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

TableDescriptor canalTable = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("email", DataTypes.STRING())
        .column("created_at", DataTypes.TIMESTAMP(3))
        .primaryKey("id")
        .build())
    .option("topic", "canal-topic")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format("canal-json")
    .option("canal-json.ignore-parse-errors", "true")
    .option("canal-json.database.include", "user_db")
    .option("canal-json.table.include", "users")
    .build();
```

## Change Event Processing

The Canal format automatically handles change event metadata, extracting the actual data changes and making them available through the table schema:

### Insert Events

- `type`: "INSERT"
- `data`: Array containing new row data
- `old`: null

### Update Events

- `type`: "UPDATE"
- `data`: Array containing updated row data
- `old`: Array containing previous row data (before update)

### Delete Events

- `type`: "DELETE"
- `data`: Array containing deleted row data
- `old`: null
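
Internally, a CDC format translates each Canal event into one or more changelog rows; in particular, an UPDATE yields an update-before/update-after pair. The mapping can be sketched as follows (a simplified illustration using plain strings rather than Flink's `RowKind`; the class and method names are hypothetical):

```java
import java.util.List;

public class CanalEventKinds {
    /** Maps a Canal "type" field to the changelog row kinds it produces. */
    public static List<String> toRowKinds(String canalType) {
        switch (canalType) {
            case "INSERT":
                return List.of("+I");        // one insert row
            case "UPDATE":
                return List.of("-U", "+U");  // update-before, then update-after
            case "DELETE":
                return List.of("-D");        // one delete row
            default:
                throw new IllegalArgumentException("Unknown Canal type: " + canalType);
        }
    }

    public static void main(String[] args) {
        // An UPDATE becomes a retract/add pair in the changelog
        System.out.println(toRowKinds("UPDATE"));
    }
}
```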

## Filtering Capabilities

### Database Filtering

Use regular expressions to filter which databases are processed:

```java
// Include specific databases
config.set(CanalJsonFormatOptions.DATABASE_INCLUDE, "user_db|inventory_db");

// Exclude system databases (using negative lookahead)
config.set(CanalJsonFormatOptions.DATABASE_INCLUDE, "^(?!mysql|information_schema|performance_schema).*");
```

### Table Filtering

Filter specific tables within included databases:

```java
// Include specific tables
config.set(CanalJsonFormatOptions.TABLE_INCLUDE, "users|orders|products");

// Include tables with specific patterns
config.set(CanalJsonFormatOptions.TABLE_INCLUDE, "user_.*|order_.*");
```
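
The include options are ordinary Java regular expressions matched against the database or table name. The patterns above behave as follows (a standalone `java.util.regex` sketch, independent of Flink):

```java
import java.util.regex.Pattern;

public class CanalFilterDemo {
    public static void main(String[] args) {
        // Alternation includes exactly the listed tables
        Pattern tables = Pattern.compile("users|orders|products");
        System.out.println(tables.matcher("users").matches());     // matches
        System.out.println(tables.matcher("audit_log").matches()); // does not match

        // Negative lookahead excludes system databases
        Pattern dbs = Pattern.compile("^(?!mysql|information_schema|performance_schema).*");
        System.out.println(dbs.matcher("user_db").matches()); // matches
        System.out.println(dbs.matcher("mysql").matches());   // does not match
    }
}
```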

## Error Handling

Configure error handling for malformed Canal JSON messages:

```java
// Ignore parsing errors and continue processing
config.set(CanalJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

// Fail on parsing errors (default behavior)
config.set(CanalJsonFormatOptions.IGNORE_PARSE_ERRORS, false);
```
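
The flag's effect can be sketched as a simple skip-or-fail loop (a standalone illustration; `parse` here is a stand-in for the real JSON deserializer, not Flink code):

```java
import java.util.ArrayList;
import java.util.List;

public class IgnoreParseErrorsDemo {
    /** Stand-in for the real deserializer: rejects anything that is not a JSON object. */
    static String parse(String raw) {
        if (!raw.startsWith("{")) {
            throw new IllegalArgumentException("Malformed message: " + raw);
        }
        return raw;
    }

    /** Processes messages, either skipping or failing on parse errors. */
    static List<String> process(List<String> messages, boolean ignoreParseErrors) {
        List<String> out = new ArrayList<>();
        for (String msg : messages) {
            try {
                out.add(parse(msg));
            } catch (IllegalArgumentException e) {
                if (!ignoreParseErrors) {
                    throw e; // default behavior: the job fails on the bad record
                }
                // ignore-parse-errors = true: drop the bad record and continue
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> msgs = List.of("{\"type\":\"INSERT\"}", "not-json", "{\"type\":\"DELETE\"}");
        // Only the two well-formed records survive when errors are ignored
        System.out.println(process(msgs, true).size());
    }
}
```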

## Timestamp Handling

Configure the timestamp format for proper temporal processing. The option accepts two values, `SQL` and `ISO-8601`:

```java
// SQL standard timestamp format, e.g. "2023-01-01 10:00:00" (the default)
config.set(CanalJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");

// ISO-8601 format, e.g. "2023-01-01T10:00:00"
config.set(CanalJsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601");
```
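
The two settings correspond to the following serialized shapes (a standalone `java.time` illustration of the patterns, not Flink code):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampFormatDemo {
    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2023, 1, 1, 10, 0, 0);

        // 'SQL' style: space-separated date and time
        DateTimeFormatter sql = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        System.out.println(sql.format(ts)); // 2023-01-01 10:00:00

        // 'ISO-8601' style: 'T' separator between date and time
        DateTimeFormatter iso = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");
        System.out.println(iso.format(ts)); // 2023-01-01T10:00:00
    }
}
```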

## Data Type Mapping

The Canal JSON format automatically maps MySQL types to Flink types:

| MySQL Type | Flink Type | Notes |
|------------|------------|-------|
| TINYINT | TINYINT | |
| SMALLINT | SMALLINT | |
| INT | INT | |
| BIGINT | BIGINT | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| DECIMAL | DECIMAL | Precision preserved |
| VARCHAR | STRING | |
| CHAR | STRING | |
| TEXT | STRING | |
| DATE | DATE | |
| TIME | TIME | |
| DATETIME | TIMESTAMP | |
| TIMESTAMP | TIMESTAMP_LTZ | With timezone |
| JSON | STRING | As JSON string |
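
The numeric codes in the `sqlType` section of a Canal message are standard JDBC type codes from `java.sql.Types`, which is how the sample message above maps `id` to 4, `name` and `email` to 12, and `created_at` to 93:

```java
import java.sql.Types;

public class SqlTypeCodes {
    public static void main(String[] args) {
        System.out.println(Types.INTEGER);   // 4  -> "id": int(11)
        System.out.println(Types.VARCHAR);   // 12 -> "name"/"email": varchar
        System.out.println(Types.TIMESTAMP); // 93 -> "created_at": datetime
    }
}
```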

## Metadata Fields

The Canal format exposes change event metadata through metadata columns. The available metadata keys in Flink's `canal-json` format are `database`, `table`, `sql-type`, `pk-names`, and `ingestion-timestamp`:

```sql
CREATE TABLE canal_with_metadata (
  -- Regular data columns
  id BIGINT,
  name STRING,
  email STRING,

  -- Metadata columns
  canal_database STRING METADATA FROM 'database',
  canal_table STRING METADATA FROM 'table',
  canal_sql_types MAP<STRING, INT> METADATA FROM 'sql-type',
  canal_pk_names ARRAY<STRING> METADATA FROM 'pk-names',
  canal_ingestion_ts TIMESTAMP_LTZ(3) METADATA FROM 'ingestion-timestamp'
) WITH (
  'connector' = 'kafka',
  'format' = 'canal-json'
  -- other connector options
);
```

## Production Considerations

### Performance Optimization

- Use database and table filtering to reduce processing overhead
- Configure appropriate Kafka consumer settings for high-throughput scenarios
- Consider partitioning strategies based on database/table names

### Reliability

- Enable `ignore-parse-errors` for production resilience
- Implement dead letter queues for failed messages
- Monitor Canal format parsing metrics

### Monitoring

Monitor change data processing through metrics such as:

- Parse success/failure rates
- Filtered message counts
- Processing latency
- Data type conversion errors