tessl/maven-org-apache-flink--flink-json

JSON format support library for Apache Flink table ecosystem with comprehensive serialization, deserialization, and CDC format capabilities


Canal CDC Format

JSON format support for the Canal Change Data Capture (CDC) system, enabling processing of MySQL binlog changes delivered in Canal's JSON envelope. Canal is a MySQL binlog incremental subscription and consumption service that emits each change as a JSON message annotated with database and table metadata.

Capabilities

Canal Format Configuration

Configuration options specific to Canal CDC format, including database and table filtering capabilities for selective change data processing.

/**
 * Configuration options for Canal JSON format
 */
public class CanalJsonFormatOptions {
    
    /** Regular expression to filter databases (optional) */
    public static final ConfigOption<String> DATABASE_INCLUDE;
    
    /** Regular expression to filter tables (optional) */
    public static final ConfigOption<String> TABLE_INCLUDE;
    
    /** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions) */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    
    /** Timestamp format standard, either "SQL" or "ISO-8601" (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> TIMESTAMP_FORMAT;
    
    /** How to handle null keys in maps (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;
    
    /** Literal string for null keys when mode is LITERAL (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}

Configuration Usage:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.canal.CanalJsonFormatOptions;

// Configure Canal format options
Configuration config = new Configuration();
config.set(CanalJsonFormatOptions.DATABASE_INCLUDE, "user_db|order_db");
config.set(CanalJsonFormatOptions.TABLE_INCLUDE, "users|orders|products");
config.set(CanalJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
config.set(CanalJsonFormatOptions.TIMESTAMP_FORMAT, "SQL"); // only "SQL" or "ISO-8601" are supported

Canal JSON Structure

Canal produces JSON messages with the following structure for change events:

{
  "data": [
    {
      "id": "1",
      "name": "Alice",
      "email": "alice@example.com",
      "created_at": "2023-01-01 10:00:00"
    }
  ],
  "database": "user_db",
  "es": 1672574400000,
  "id": 1,
  "isDdl": false,
  "mysqlType": {
    "id": "int(11)",
    "name": "varchar(100)",
    "email": "varchar(255)",
    "created_at": "datetime"
  },
  "old": null,
  "pkNames": ["id"],
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12,
    "email": 12,
    "created_at": 93
  },
  "table": "users",
  "ts": 1672574400123,
  "type": "INSERT"
}
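For comparison, an UPDATE event populates old with the prior values of the columns that changed. The following envelope is illustrative (abbreviated to the relevant fields; values are hypothetical):

```json
{
  "data": [
    {
      "id": "1",
      "name": "Alice Smith",
      "email": "alice@example.com",
      "created_at": "2023-01-01 10:00:00"
    }
  ],
  "old": [
    {
      "name": "Alice"
    }
  ],
  "database": "user_db",
  "table": "users",
  "isDdl": false,
  "type": "UPDATE"
}
```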

Table API Integration

SQL DDL Usage

Create tables using Canal JSON format for change data capture processing:

CREATE TABLE canal_source (
  id BIGINT,
  name STRING,
  email STRING,
  created_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'canal-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'canal-json',
  'canal-json.ignore-parse-errors' = 'true',
  'canal-json.database.include' = 'user_db',
  'canal-json.table.include' = 'users'
);

Programmatic Table Definition

import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

TableDescriptor canalTable = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("email", DataTypes.STRING())
        .column("created_at", DataTypes.TIMESTAMP(3))
        .primaryKey("id")
        .build())
    .option("topic", "canal-topic")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format("canal-json")
    .option("canal-json.ignore-parse-errors", "true")
    .option("canal-json.database.include", "user_db")
    .option("canal-json.table.include", "users")
    .build();

Change Event Processing

Canal format automatically handles change event metadata, extracting the actual data changes and making them available through the table schema:

Insert Events

  • type: "INSERT"
  • data: Array containing new row data
  • old: null

Update Events

  • type: "UPDATE"
  • data: Array containing updated row data
  • old: Array containing the previous values of the changed columns (Canal emits only the fields that changed, not the full prior row)

Delete Events

  • type: "DELETE"
  • data: Array containing deleted row data
  • old: null
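The event-to-changelog translation above can be sketched in plain Java. This is a simplified stand-in for the library's decoder, not its actual implementation: each Canal UPDATE row expands into a before/after pair, mirroring how CDC formats feed Flink's changelog:

```java
import java.util.ArrayList;
import java.util.List;

public class CanalChangelogSketch {
    // Simplified stand-in for Flink's RowKind changelog flags.
    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Maps a Canal event type plus its data/old payloads to the changelog
     * rows a CDC-aware sink would receive. UPDATE rows expand into an
     * UPDATE_BEFORE (old values) and UPDATE_AFTER (new values) pair.
     */
    public static List<String> toChangelog(String type, List<String> data, List<String> old) {
        List<String> out = new ArrayList<>();
        switch (type) {
            case "INSERT":
                for (String row : data) out.add(RowKind.INSERT + ":" + row);
                break;
            case "UPDATE":
                for (int i = 0; i < data.size(); i++) {
                    out.add(RowKind.UPDATE_BEFORE + ":" + old.get(i));
                    out.add(RowKind.UPDATE_AFTER + ":" + data.get(i));
                }
                break;
            case "DELETE":
                for (String row : data) out.add(RowKind.DELETE + ":" + row);
                break;
            default:
                throw new IllegalArgumentException("Unsupported Canal type: " + type);
        }
        return out;
    }

    public static void main(String[] args) {
        // An UPDATE of one row produces a before/after pair.
        System.out.println(toChangelog("UPDATE", List.of("Alice Smith"), List.of("Alice")));
    }
}
```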

Filtering Capabilities

Database Filtering

Use regular expressions to filter which databases are processed:

// Include specific databases
config.set(CanalJsonFormatOptions.DATABASE_INCLUDE, "user_db|inventory_db");

// Exclude system databases (using negative lookahead)
config.set(CanalJsonFormatOptions.DATABASE_INCLUDE, "^(?!mysql|information_schema|performance_schema).*");

Table Filtering

Filter specific tables within included databases:

// Include specific tables
config.set(CanalJsonFormatOptions.TABLE_INCLUDE, "users|orders|products");

// Include tables with specific patterns
config.set(CanalJsonFormatOptions.TABLE_INCLUDE, "user_.*|order_.*");
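Both include options are ordinary Java regular expressions matched against the database / table fields of each envelope. A quick stdlib sketch of the matching semantics (illustrative only; the format applies its own matching internally):

```java
import java.util.regex.Pattern;

public class CanalIncludeFilterSketch {
    /** Returns true when the name matches the include pattern (null means include all). */
    public static boolean included(String includeRegex, String name) {
        // Pattern.matches requires the regex to match the entire name.
        return includeRegex == null || Pattern.matches(includeRegex, name);
    }

    public static void main(String[] args) {
        // Exclude system databases via negative lookahead.
        String dbInclude = "^(?!mysql|information_schema|performance_schema).*";
        System.out.println(included(dbInclude, "user_db"));                 // true
        System.out.println(included(dbInclude, "mysql"));                   // false
        System.out.println(included("user_.*|order_.*", "user_profiles")); // true
    }
}
```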

Error Handling

Configure error handling for malformed Canal JSON messages:

// Ignore parsing errors and continue processing
config.set(CanalJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

// Fail on parsing errors (default behavior)
config.set(CanalJsonFormatOptions.IGNORE_PARSE_ERRORS, false);
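The flag's effect amounts to the following control flow when decoding a stream of messages. This is a toy sketch with a hypothetical parse step, not the real deserializer, which works record-by-record inside Flink:

```java
import java.util.ArrayList;
import java.util.List;

public class IgnoreParseErrorsSketch {
    /** Toy parser: treats anything not brace-delimited as corrupt. */
    static String parse(String raw) {
        if (!raw.startsWith("{") || !raw.endsWith("}")) {
            throw new IllegalArgumentException("Corrupt message: " + raw);
        }
        return raw;
    }

    /** With ignoreParseErrors=true, bad messages are dropped; otherwise the failure propagates. */
    public static List<String> decodeAll(List<String> messages, boolean ignoreParseErrors) {
        List<String> out = new ArrayList<>();
        for (String msg : messages) {
            try {
                out.add(parse(msg));
            } catch (IllegalArgumentException e) {
                if (!ignoreParseErrors) throw e; // default: fail the pipeline
                // ignore-parse-errors=true: skip the record and continue
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The corrupt middle message is skipped; two records survive.
        System.out.println(decodeAll(List.of("{ok}", "broken", "{also-ok}"), true).size());
    }
}
```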

Timestamp Handling

Configure the timestamp standard used when parsing and writing temporal values; only 'SQL' and 'ISO-8601' are supported:

// SQL standard timestamp format, e.g. "2023-01-01 10:00:00" (default)
config.set(CanalJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");

// ISO-8601 timestamp format, e.g. "2023-01-01T10:00:00"
config.set(CanalJsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601");
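The two standards differ only in the literal layout of the timestamp string. A stdlib illustration of the two shapes (not the format's actual parser):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampStandardSketch {
    // 'SQL' style: space-separated date and time.
    public static final DateTimeFormatter SQL_STYLE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    // 'ISO-8601' style: 'T'-separated date and time.
    public static final DateTimeFormatter ISO_STYLE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2023, 1, 1, 10, 0, 0);
        System.out.println(SQL_STYLE.format(ts)); // 2023-01-01 10:00:00
        System.out.println(ISO_STYLE.format(ts)); // 2023-01-01T10:00:00
    }
}
```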

Data Type Mapping

Canal JSON format automatically maps MySQL types to Flink types:

| MySQL Type | Flink Type | Notes |
| --- | --- | --- |
| TINYINT | TINYINT | |
| SMALLINT | SMALLINT | |
| INT | INT | |
| BIGINT | BIGINT | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| DECIMAL | DECIMAL | Precision preserved |
| VARCHAR | STRING | |
| CHAR | STRING | |
| TEXT | STRING | |
| DATE | DATE | |
| TIME | TIME | |
| DATETIME | TIMESTAMP | |
| TIMESTAMP | TIMESTAMP_LTZ | With timezone |
| JSON | STRING | As JSON string |
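Separately, the numeric codes in the envelope's sqlType map are standard JDBC type constants, which can be decoded with java.sql.JDBCType. A stdlib illustration using the codes from the sample message above:

```java
import java.sql.JDBCType;

public class SqlTypeCodeSketch {
    public static void main(String[] args) {
        // Codes taken from the sample Canal envelope's "sqlType" map.
        System.out.println(JDBCType.valueOf(4));  // INTEGER   (id: int(11))
        System.out.println(JDBCType.valueOf(12)); // VARCHAR   (name, email)
        System.out.println(JDBCType.valueOf(93)); // TIMESTAMP (created_at: datetime)
    }
}
```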

Metadata Fields

Canal format provides access to change event metadata through METADATA columns; the available keys are database, table, sql-type, pk-names, and ingestion-timestamp:

CREATE TABLE canal_with_metadata (
  -- Regular data columns
  id BIGINT,
  name STRING,
  email STRING,

  -- Metadata columns (read-only, hence VIRTUAL)
  canal_database STRING METADATA FROM 'database' VIRTUAL,
  canal_table STRING METADATA FROM 'table' VIRTUAL,
  canal_sql_type MAP<STRING, INT> METADATA FROM 'sql-type' VIRTUAL,
  canal_pk_names ARRAY<STRING> METADATA FROM 'pk-names' VIRTUAL,
  canal_ingestion_ts TIMESTAMP_LTZ(3) METADATA FROM 'ingestion-timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'format' = 'canal-json'
  -- other connector options
);

Production Considerations

Performance Optimization

  • Use database and table filtering to reduce processing overhead
  • Configure appropriate Kafka consumer settings for high-throughput scenarios
  • Consider partitioning strategies based on database/table names

Reliability

  • Enable ignore-parse-errors for production resilience
  • Implement dead letter queues for failed messages
  • Monitor Canal format parsing metrics

Monitoring

Canal format provides metrics for monitoring change data processing:

  • Parse success/failure rates
  • Filtered message counts
  • Processing latency
  • Data type conversion errors

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-json
