JSON format support library for Apache Flink table ecosystem with comprehensive serialization, deserialization, and CDC format capabilities
—
JSON format support for the Canal Change Data Capture system, enabling processing of MySQL binlog changes in Canal's JSON structure. Canal is a service for incremental subscription to and consumption of MySQL binlogs; it emits each change as a JSON envelope carrying database and table metadata.
Configuration options specific to Canal CDC format, including database and table filtering capabilities for selective change data processing.
```java
/**
 * Configuration options for the Canal JSON format.
 */
public class CanalJsonFormatOptions {
    /** Regular expression to filter databases (optional). */
    public static final ConfigOption<String> DATABASE_INCLUDE;
    /** Regular expression to filter tables (optional). */
    public static final ConfigOption<String> TABLE_INCLUDE;
    /** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions). */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    /** Timestamp format standard, "SQL" or "ISO-8601" (inherited from JsonFormatOptions). */
    public static final ConfigOption<String> TIMESTAMP_FORMAT;
    /** How to handle null keys in maps (inherited from JsonFormatOptions). */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;
    /** Literal string for null keys when the mode is LITERAL (inherited from JsonFormatOptions). */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
```

Configuration usage:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.canal.CanalJsonFormatOptions;

// Configure Canal format options
Configuration config = new Configuration();
config.set(CanalJsonFormatOptions.DATABASE_INCLUDE, "user_db|order_db");
config.set(CanalJsonFormatOptions.TABLE_INCLUDE, "users|orders|products");
config.set(CanalJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
config.set(CanalJsonFormatOptions.TIMESTAMP_FORMAT, "SQL"); // "SQL" or "ISO-8601"
```

Canal produces JSON messages with the following structure for change events:
```json
{
  "data": [
    {
      "id": "1",
      "name": "Alice",
      "email": "alice@example.com",
      "created_at": "2023-01-01 10:00:00"
    }
  ],
  "database": "user_db",
  "es": 1672574400000,
  "id": 1,
  "isDdl": false,
  "mysqlType": {
    "id": "int(11)",
    "name": "varchar(100)",
    "email": "varchar(255)",
    "created_at": "datetime"
  },
  "old": null,
  "pkNames": ["id"],
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12,
    "email": 12,
    "created_at": 93
  },
  "table": "users",
  "ts": 1672574400123,
  "type": "INSERT"
}
```

Create tables using the Canal JSON format for change data capture processing:
```sql
CREATE TABLE canal_source (
  id BIGINT,
  name STRING,
  email STRING,
  created_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'canal-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'canal-json',
  'canal-json.ignore-parse-errors' = 'true',
  'canal-json.database.include' = 'user_db',
  'canal-json.table.include' = 'users'
);
```

The same table can be declared programmatically with a `TableDescriptor`:

```java
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

TableDescriptor canalTable = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("email", DataTypes.STRING())
        .column("created_at", DataTypes.TIMESTAMP(3))
        .primaryKey("id")
        .build())
    .option("topic", "canal-topic")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format("canal-json")
    .option("canal-json.ignore-parse-errors", "true")
    .option("canal-json.database.include", "user_db")
    .option("canal-json.table.include", "users")
    .build();
```

The Canal format automatically handles change event metadata, extracting the actual data changes and making them available through the table schema:
- `type: "INSERT"`: `data` contains the new row; `old` is `null`.
- `type: "UPDATE"`: `data` contains the row after the update; `old` contains the previous row (before the update).
- `type: "DELETE"`: `data` contains the deleted row; `old` is `null`.

Use regular expressions to filter which databases are processed:
```java
// Include specific databases
config.set(CanalJsonFormatOptions.DATABASE_INCLUDE, "user_db|inventory_db");

// Exclude system databases (using a negative lookahead)
config.set(CanalJsonFormatOptions.DATABASE_INCLUDE, "^(?!mysql|information_schema|performance_schema).*");
```

Filter specific tables within the included databases:
```java
// Include specific tables
config.set(CanalJsonFormatOptions.TABLE_INCLUDE, "users|orders|products");

// Include tables matching prefix patterns
config.set(CanalJsonFormatOptions.TABLE_INCLUDE, "user_.*|order_.*");
```

Configure error handling for malformed Canal JSON messages:
```java
// Ignore parsing errors and continue processing
config.set(CanalJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

// Fail on parsing errors (the default behavior)
config.set(CanalJsonFormatOptions.IGNORE_PARSE_ERRORS, false);
```

Configure the timestamp format for proper temporal processing:
```java
// SQL standard timestamp format, e.g. "2023-01-01 10:00:00.000" (the default)
config.set(CanalJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");

// ISO-8601 format, e.g. "2023-01-01T10:00:00.000"
config.set(CanalJsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601");
```

Only the `SQL` and `ISO-8601` standards are accepted; arbitrary patterns such as `yyyy-MM-dd HH:mm:ss` are not supported.

The Canal JSON format automatically maps MySQL types to Flink types:
| MySQL Type | Flink Type | Notes |
|---|---|---|
| TINYINT | TINYINT | |
| SMALLINT | SMALLINT | |
| INT | INT | |
| BIGINT | BIGINT | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| DECIMAL | DECIMAL | Precision preserved |
| VARCHAR | STRING | |
| CHAR | STRING | |
| TEXT | STRING | |
| DATE | DATE | |
| TIME | TIME | |
| DATETIME | TIMESTAMP | |
| TIMESTAMP | TIMESTAMP_LTZ | With timezone |
| JSON | STRING | As JSON string |
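The numeric codes in the envelope's `sqlType` field (see the sample message above) are standard JDBC type codes from `java.sql.Types`, which is also how they relate to the MySQL-to-Flink mapping in this table. A stand-alone check of the codes from the sample, requiring no Flink dependencies:

```java
import java.sql.Types;
import java.util.LinkedHashMap;
import java.util.Map;

public class SqlTypeCodes {
    public static void main(String[] args) {
        // sqlType values copied from the sample Canal message above
        Map<String, Integer> sqlType = new LinkedHashMap<>();
        sqlType.put("id", 4);
        sqlType.put("name", 12);
        sqlType.put("email", 12);
        sqlType.put("created_at", 93);

        // Each code matches a java.sql.Types constant
        System.out.println(sqlType.get("id") == Types.INTEGER);           // int(11)      -> prints true
        System.out.println(sqlType.get("name") == Types.VARCHAR);         // varchar(100) -> prints true
        System.out.println(sqlType.get("created_at") == Types.TIMESTAMP); // datetime     -> prints true
    }
}
```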
Canal format provides access to change event metadata through metadata columns. When the format is used with the Kafka connector, the format's metadata keys are prefixed with `value.`:

```sql
CREATE TABLE canal_with_metadata (
  -- Regular data columns
  id BIGINT,
  name STRING,
  email STRING,
  -- Metadata columns
  canal_database STRING METADATA FROM 'value.database' VIRTUAL,
  canal_table STRING METADATA FROM 'value.table' VIRTUAL,
  canal_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
  canal_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
  canal_ingestion_ts TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'format' = 'canal-json'
  -- other connector options
);
```

The exposed metadata keys are `database`, `table`, `sql-type`, `pk-names`, and `ingestion-timestamp`; envelope fields such as `type` and `isDdl` are consumed internally and are not available as metadata.

Enable `canal-json.ignore-parse-errors` in production for resilience against occasional malformed messages. The Canal format also reports metrics for monitoring change data processing.
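For reference, the event-type semantics described earlier translate into Flink changelog rows: an INSERT becomes one `+I` row, an UPDATE becomes a `-U` row (built from `old`) followed by a `+U` row (built from `data`), and a DELETE becomes one `-D` row. A plain-Java sketch of that expansion; the `rowKinds` helper is illustrative only, not part of the Flink API:

```java
import java.util.List;

public class CanalChangelog {
    // Illustrative helper: map a Canal event type to the changelog row kinds
    // Flink emits for it (RowKind short strings: +I, -U, +U, -D).
    static List<String> rowKinds(String canalType) {
        switch (canalType) {
            case "INSERT":
                return List.of("+I");       // one insert row, built from "data"
            case "UPDATE":
                return List.of("-U", "+U"); // before-image from "old", after-image from "data"
            case "DELETE":
                return List.of("-D");       // one delete row, built from "data"
            default:
                throw new IllegalArgumentException("Unsupported Canal type: " + canalType);
        }
    }

    public static void main(String[] args) {
        System.out.println(rowKinds("INSERT")); // prints [+I]
        System.out.println(rowKinds("UPDATE")); // prints [-U, +U]
        System.out.println(rowKinds("DELETE")); // prints [-D]
    }
}
```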
Install with the Tessl CLI:

```shell
npx tessl i tessl/maven-org-apache-flink--flink-json
```