tessl/maven-org-apache-flink--flink-json

JSON format support library for the Apache Flink Table ecosystem, providing serialization, deserialization, and CDC format capabilities


Debezium CDC Format

JSON format support for Debezium, a Change Data Capture (CDC) system, enabling Flink to process database change events from MySQL, PostgreSQL, SQL Server, MongoDB, Oracle, and other databases. Debezium produces structured change events with rich source metadata and optional inline schema information.

Capabilities

Debezium Format Configuration

Configuration options specific to the Debezium CDC format, including schema inclusion as well as error handling options inherited from the base JSON format.

/**
 * Configuration options for Debezium JSON format
 */
public class DebeziumJsonFormatOptions {
    
    /** Whether schema information is included in messages (default: false) */
    public static final ConfigOption<Boolean> SCHEMA_INCLUDE;
    
    /** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions) */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    
    /** Timestamp format standard, either "SQL" or "ISO-8601" (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> TIMESTAMP_FORMAT;
    
    /** How to handle null keys in maps (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;
    
    /** Literal string for null keys when mode is LITERAL (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}

Configuration Usage:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions;

// Configure Debezium format options
Configuration config = new Configuration();
config.set(DebeziumJsonFormatOptions.SCHEMA_INCLUDE, false);
config.set(DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
config.set(DebeziumJsonFormatOptions.TIMESTAMP_FORMAT, "SQL"); // "SQL" or "ISO-8601"

Debezium JSON Structure

Debezium produces change events with a standardized envelope structure:

With Schema (when SCHEMA_INCLUDE = true)

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {"field": "id", "type": "int32"},
          {"field": "name", "type": "string"},
          {"field": "email", "type": "string"}
        ],
        "optional": false,
        "name": "users.Value"
      },
      {
        "type": "struct",
        "fields": [
          {"field": "version", "type": "string"},
          {"field": "connector", "type": "string"},
          {"field": "name", "type": "string"},
          {"field": "ts_ms", "type": "int64"},
          {"field": "snapshot", "type": "string"},
          {"field": "db", "type": "string"},
          {"field": "table", "type": "string"},
          {"field": "server_id", "type": "int64"},
          {"field": "gtid", "type": "string"},
          {"field": "file", "type": "string"},
          {"field": "pos", "type": "int64"},
          {"field": "row", "type": "int32"},
          {"field": "thread", "type": "int64"},
          {"field": "query", "type": "string"}
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source"
      }
    ],
    "optional": false,
    "name": "users.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "name": "Alice",
      "email": "alice@example.com"
    },
    "source": {
      "version": "1.9.7.Final",
      "connector": "mysql",
      "name": "mysql-server",
      "ts_ms": 1672574400000,
      "snapshot": "false",
      "db": "user_db",
      "table": "users",
      "server_id": 1,
      "gtid": null,
      "file": "mysql-bin.000001",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": null
    },
    "op": "c",
    "ts_ms": 1672574400123,
    "transaction": null
  }
}

Without Schema (when SCHEMA_INCLUDE = false, default)

{
  "before": null,
  "after": {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com"
  },
  "source": {
    "version": "1.9.7.Final",
    "connector": "mysql",
    "name": "mysql-server",
    "ts_ms": 1672574400000,
    "snapshot": "false",
    "db": "user_db",
    "table": "users",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 154,
    "row": 0,
    "thread": 7,
    "query": null
  },
  "op": "c",
  "ts_ms": 1672574400123,
  "transaction": null
}

Table API Integration

SQL DDL Usage

Create tables using Debezium JSON format for change data capture processing:

CREATE TABLE debezium_source (
  id BIGINT,
  name STRING,
  email STRING,
  created_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'debezium-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'false',
  'debezium-json.ignore-parse-errors' = 'true'
);

Programmatic Table Definition

import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

TableDescriptor debeziumTable = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("email", DataTypes.STRING())
        .column("created_at", DataTypes.TIMESTAMP(3))
        .primaryKey("id")
        .build())
    .option("topic", "debezium-topic")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format("debezium-json")
    .option("debezium-json.schema-include", "false")
    .option("debezium-json.ignore-parse-errors", "true")
    .build();

Change Event Processing

Debezium format handles various types of change events with comprehensive metadata:

Insert Events (op: "c" for create)

  • before: null
  • after: New row data
  • op: "c"

Update Events (op: "u" for update)

  • before: Previous row data
  • after: Updated row data
  • op: "u"

Delete Events (op: "d" for delete)

  • before: Deleted row data
  • after: null
  • op: "d"

Read Events (op: "r" for read/snapshot)

  • before: null
  • after: Current row data (from snapshot)
  • op: "r"
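In Flink, these operations become changelog row kinds: "c" and "r" produce inserts, "u" produces an update-before plus an update-after row, and "d" produces a delete. A minimal plain-Java sketch of that mapping (no Flink dependency; `DebeziumOpMapping` and `ChangeKind` are illustrative names mirroring Flink's `RowKind`):

```java
import java.util.List;

public class DebeziumOpMapping {
    /** Illustrative stand-in for Flink's RowKind change flags. */
    enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Maps a Debezium op code to the changelog rows it produces. */
    static List<ChangeKind> rowsFor(String op) {
        switch (op) {
            case "c": // create
            case "r": // read (snapshot)
                return List.of(ChangeKind.INSERT);
            case "u": // update emits both the old and the new row image
                return List.of(ChangeKind.UPDATE_BEFORE, ChangeKind.UPDATE_AFTER);
            case "d": // delete
                return List.of(ChangeKind.DELETE);
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    public static void main(String[] args) {
        System.out.println(rowsFor("u")); // [UPDATE_BEFORE, UPDATE_AFTER]
    }
}
```

Downstream operators therefore see a single update event as two rows, which is why primary keys matter for stateful operations on CDC sources.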

Source Metadata

Debezium provides rich source metadata for tracking change origins:

| Field | Description | Example |
|-----------|--------------------------------|--------------------------|
| `version` | Debezium version | `"1.9.7.Final"` |
| `connector` | Source connector type | `"mysql"`, `"postgresql"` |
| `name` | Connector instance name | `"mysql-server"` |
| `ts_ms` | Event timestamp (milliseconds) | `1672574400000` |
| `snapshot` | Snapshot indicator | `"true"`, `"false"`, `"last"` |
| `db` | Source database name | `"user_db"` |
| `table` | Source table name | `"users"` |
| `server_id` | Database server ID | `1` |
| `gtid` | Global transaction ID | a MySQL GTID string |
| `file` | Binlog file name | `"mysql-bin.000001"` |
| `pos` | Position in binlog | `154` |
| `row` | Row number in event | `0` |
| `thread` | Thread ID | `7` |
| `query` | SQL query (if available) | `"INSERT INTO..."` |
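The two timestamps can be combined to estimate capture latency: the payload-level ts_ms (when Debezium processed the event) minus source.ts_ms (when the database change occurred). A plain-Java sketch using the sample values from the event above; `CaptureLatency` is an illustrative helper, not part of the library:

```java
import java.time.Duration;
import java.time.Instant;

public class CaptureLatency {
    /** Latency between the database change (source.ts_ms) and Debezium processing it (payload ts_ms). */
    static Duration captureLatency(long sourceTsMs, long payloadTsMs) {
        return Duration.between(Instant.ofEpochMilli(sourceTsMs), Instant.ofEpochMilli(payloadTsMs));
    }

    public static void main(String[] args) {
        // Values from the sample event above
        System.out.println(captureLatency(1672574400000L, 1672574400123L).toMillis()); // 123
    }
}
```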

Metadata Access in Tables

Access Debezium metadata through the metadata columns exposed by the debezium-json format. Note that the change operation itself is not a metadata column; it is conveyed as the row kind (insert, update, delete) of the emitted changelog rows:

CREATE TABLE debezium_with_metadata (
  -- Regular data columns
  id BIGINT,
  name STRING,
  email STRING,
  
  -- Metadata columns exposed by debezium-json
  origin_ts TIMESTAMP_LTZ(3) METADATA FROM 'ingestion-timestamp' VIRTUAL,
  source_ts TIMESTAMP_LTZ(3) METADATA FROM 'source.timestamp' VIRTUAL,
  source_database STRING METADATA FROM 'source.database' VIRTUAL,
  source_schema STRING METADATA FROM 'source.schema' VIRTUAL,
  source_table STRING METADATA FROM 'source.table' VIRTUAL,
  source_properties MAP<STRING, STRING> METADATA FROM 'source.properties' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'format' = 'debezium-json'
  -- other connector options
);

Schema Handling

Schema Inclusion

When SCHEMA_INCLUDE is enabled, Debezium messages include complete schema information:

// Enable schema inclusion for schema evolution handling
config.set(DebeziumJsonFormatOptions.SCHEMA_INCLUDE, true);

Benefits of schema inclusion:

  • Schema evolution detection
  • Data type validation
  • Field mapping verification
  • Better error diagnostics
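Since enabling schema inclusion changes the message shape (a schema/payload envelope instead of a flat event), a quick structural check can help diagnose a mismatched schema-include setting between the connector and the table definition. A rough plain-Java heuristic for illustration only; `SchemaIncludeProbe` is a hypothetical name, and real code should parse the JSON rather than match strings:

```java
public class SchemaIncludeProbe {
    /**
     * Heuristic: messages written with schema-include = true wrap the event in a
     * {"schema": ..., "payload": ...} envelope. This is a rough string check for
     * illustration, not a substitute for real JSON parsing.
     */
    static boolean looksSchemaIncluded(String rawJson) {
        String trimmed = rawJson.trim();
        return trimmed.startsWith("{")
                && trimmed.contains("\"schema\"")
                && trimmed.contains("\"payload\"");
    }

    public static void main(String[] args) {
        System.out.println(looksSchemaIncluded("{\"schema\":{},\"payload\":{\"op\":\"c\"}}")); // true
        System.out.println(looksSchemaIncluded("{\"before\":null,\"after\":{},\"op\":\"c\"}")); // false
    }
}
```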

Schema Registry Integration

For production deployments, consider using Confluent Schema Registry with Debezium:

CREATE TABLE debezium_avro_source (
  id BIGINT,
  name STRING,
  email STRING
) WITH (
  'connector' = 'kafka',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://schema-registry:8081'
);

Multi-Database Support

Debezium format supports multiple database systems with consistent envelope structure:

MySQL Connector

// MySQL-specific source fields
// server_id, gtid, file, pos, row, thread

PostgreSQL Connector

// PostgreSQL-specific source fields
// lsn, txId, ts_usec

SQL Server Connector

// SQL Server-specific source fields
// change_lsn, commit_lsn, event_serial_no

MongoDB Connector

// MongoDB-specific source fields
// ord, h, tord

Transaction Handling

Debezium can attach transaction boundary information to each event when the connector's provide.transaction.metadata option is enabled:

{
  "before": null,
  "after": {...},
  "source": {...},
  "op": "c",
  "ts_ms": 1672574400123,
  "transaction": {
    "id": "571",
    "total_order": 1,
    "data_collection_order": 1
  }
}

The transaction block is part of the raw envelope, but the debezium-json format does not expose it as table metadata columns. To inspect transaction boundaries, read the topic with the plain json format and declare the fields explicitly:

CREATE TABLE debezium_raw (
  `before` ROW<id BIGINT, name STRING>,
  `after` ROW<id BIGINT, name STRING>,
  `op` STRING,
  -- Transaction block, present when the connector emits transaction metadata
  `transaction` ROW<id STRING, total_order BIGINT, data_collection_order BIGINT>
) WITH (
  'connector' = 'kafka',
  'format' = 'json'
);

Error Handling

Configure comprehensive error handling for production deployments:

// Ignore parsing errors for resilience
config.set(DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

// Use the SQL timestamp format (alternative: "ISO-8601")
config.set(DebeziumJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");

// Handle null keys in nested maps
config.set(DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "DROP");
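To make the map-null-key modes concrete, here is a plain-Java simulation of the three behaviors ('FAIL', 'DROP', 'LITERAL') applied to a map about to be serialized. `MapNullKeyModes.applyMode` is an illustrative helper, not part of the library; the real handling lives inside the JSON serializer:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MapNullKeyModes {
    /** Simulates the 'FAIL', 'DROP', and 'LITERAL' null-key modes on a map's entries. */
    static Map<String, String> applyMode(Map<String, String> in, String mode, String literal) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : in.entrySet()) {
            if (e.getKey() == null) {
                switch (mode) {
                    case "FAIL":
                        throw new RuntimeException("null key in map");
                    case "DROP":
                        continue; // silently drop the entry
                    case "LITERAL":
                        out.put(literal, e.getValue()); // substitute the configured literal
                        continue;
                    default:
                        throw new IllegalArgumentException("Unknown mode: " + mode);
                }
            }
            out.put(e.getKey(), e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("a", "1");
        m.put(null, "2");
        System.out.println(applyMode(m, "DROP", null));          // {a=1}
        System.out.println(applyMode(m, "LITERAL", "null-key")); // {a=1, null-key=2}
    }
}
```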

Production Considerations

Performance Optimization

  • Disable schema inclusion unless needed for schema evolution
  • Use appropriate Kafka consumer configurations
  • Consider message compression and batching

Reliability

  • Enable parse error ignoring for production resilience
  • Implement dead letter queues for failed messages
  • Monitor Debezium connector health

Schema Evolution

  • Use schema-include for environments with frequent schema changes
  • Implement schema compatibility checks
  • Plan for graceful handling of schema evolution events

Monitoring

  • Track parse success/failure rates
  • Monitor event lag and processing latency
  • Set up alerts for connector disconnections
  • Monitor data type conversion errors

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-json
