JSON format support library for Apache Flink table ecosystem with comprehensive serialization, deserialization, and CDC format capabilities
—
JSON format support for Debezium Change Data Capture system, enabling processing of database change events from various databases including MySQL, PostgreSQL, SQL Server, MongoDB, and Oracle. Debezium produces structured change events with comprehensive metadata and optional schema information.
Configuration options specific to Debezium CDC format, including schema inclusion and comprehensive error handling options.
/**
* Configuration options for Debezium JSON format
*/
public class DebeziumJsonFormatOptions {
/** Whether schema information is included in messages (default: false) */
public static final ConfigOption<Boolean> SCHEMA_INCLUDE;
/** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions) */
public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
/** Timestamp format pattern (inherited from JsonFormatOptions) */
public static final ConfigOption<String> TIMESTAMP_FORMAT;
/** How to handle null keys in maps (inherited from JsonFormatOptions) */
public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;
/** Literal string for null keys when mode is LITERAL (inherited from JsonFormatOptions) */
public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
Configuration Usage:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions;
// Configure Debezium format options
Configuration config = new Configuration();
config.set(DebeziumJsonFormatOptions.SCHEMA_INCLUDE, false);
config.set(DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
config.set(DebeziumJsonFormatOptions.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss");
Debezium produces change events with a standardized envelope structure:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{"field": "id", "type": "int32"},
{"field": "name", "type": "string"},
{"field": "email", "type": "string"}
],
"optional": false,
"name": "users.Value"
},
{
"type": "struct",
"fields": [
{"field": "version", "type": "string"},
{"field": "connector", "type": "string"},
{"field": "name", "type": "string"},
{"field": "ts_ms", "type": "int64"},
{"field": "snapshot", "type": "string"},
{"field": "db", "type": "string"},
{"field": "table", "type": "string"},
{"field": "server_id", "type": "int64"},
{"field": "gtid", "type": "string"},
{"field": "file", "type": "string"},
{"field": "pos", "type": "int64"},
{"field": "row", "type": "int32"},
{"field": "thread", "type": "int64"},
{"field": "query", "type": "string"}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source"
}
],
"optional": false,
"name": "users.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 1,
"name": "Alice",
"email": "alice@example.com"
},
"source": {
"version": "1.9.7.Final",
"connector": "mysql",
"name": "mysql-server",
"ts_ms": 1672574400000,
"snapshot": "false",
"db": "user_db",
"table": "users",
"server_id": 1,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 154,
"row": 0,
"thread": 7,
"query": null
},
"op": "c",
"ts_ms": 1672574400123,
"transaction": null
}
}
When schema inclusion is disabled (schema-include = false), messages contain only the payload:
{
"before": null,
"after": {
"id": 1,
"name": "Alice",
"email": "alice@example.com"
},
"source": {
"version": "1.9.7.Final",
"connector": "mysql",
"name": "mysql-server",
"ts_ms": 1672574400000,
"snapshot": "false",
"db": "user_db",
"table": "users",
"server_id": 1,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 154,
"row": 0,
"thread": 7,
"query": null
},
"op": "c",
"ts_ms": 1672574400123,
"transaction": null
}
Create tables using Debezium JSON format for change data capture processing:
CREATE TABLE debezium_source (
id BIGINT,
name STRING,
email STRING,
created_at TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'debezium-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'debezium-json',
'debezium-json.schema-include' = 'false',
'debezium-json.ignore-parse-errors' = 'true'
);
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
TableDescriptor debeziumTable = TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("email", DataTypes.STRING())
.column("created_at", DataTypes.TIMESTAMP(3))
.primaryKey("id")
.build())
.option("topic", "debezium-topic")
.option("properties.bootstrap.servers", "localhost:9092")
.format("debezium-json")
.option("debezium-json.schema-include", "false")
.option("debezium-json.ignore-parse-errors", "true")
.build();
Debezium format handles various types of change events with comprehensive metadata:
Create events (op: "c"): before is null; after contains the new row data.
Update events (op: "u"): before contains the previous row data; after contains the updated row data.
Delete events (op: "d"): before contains the deleted row data; after is null.
Read/snapshot events (op: "r"): before is null; after contains the current row data (from snapshot).
Debezium provides rich source metadata for tracking change origins:
| Field | Description | Example |
|---|---|---|
| version | Debezium version | "1.9.7.Final" |
| connector | Source connector type | "mysql", "postgresql" |
| name | Connector instance name | "mysql-server" |
| ts_ms | Event timestamp (milliseconds) | 1672574400000 |
| snapshot | Snapshot indicator | "true", "false", "last" |
| db | Source database name | "user_db" |
| table | Source table name | "users" |
| server_id | Database server ID | 1 |
| gtid | Global Transaction ID | MySQL GTID |
| file | Binlog file name | "mysql-bin.000001" |
| pos | Position in binlog | 154 |
| row | Row number in event | 0 |
| thread | Thread ID | 7 |
| query | SQL query (if available) | "INSERT INTO..." |
Access Debezium metadata through special metadata columns:
CREATE TABLE debezium_with_metadata (
-- Regular data columns
id BIGINT,
name STRING,
email STRING,
-- Metadata columns
debezium_op STRING METADATA FROM 'op',
debezium_source_ts TIMESTAMP_LTZ(3) METADATA FROM 'source.ts_ms',
debezium_source_db STRING METADATA FROM 'source.db',
debezium_source_table STRING METADATA FROM 'source.table',
debezium_source_connector STRING METADATA FROM 'source.connector',
debezium_source_snapshot STRING METADATA FROM 'source.snapshot'
) WITH (
'connector' = 'kafka',
'format' = 'debezium-json'
-- other connector options
);
When SCHEMA_INCLUDE is enabled, Debezium messages include complete schema information:
// Enable schema inclusion for schema evolution handling
config.set(DebeziumJsonFormatOptions.SCHEMA_INCLUDE, true);
Benefits of schema inclusion:
For production deployments, consider using Confluent Schema Registry with Debezium:
CREATE TABLE debezium_avro_source (
id BIGINT,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.url' = 'http://schema-registry:8081'
);
Debezium format supports multiple database systems with consistent envelope structure:
// MySQL-specific source fields
// server_id, gtid, file, pos, row, thread
// PostgreSQL-specific source fields
// lsn, txId, ts_usec
// SQL Server-specific source fields
// change_lsn, commit_lsn, event_serial_no
// MongoDB-specific source fields
// ord, h, tord
Debezium provides transaction boundary information:
{
"before": null,
"after": {...},
"source": {...},
"op": "c",
"ts_ms": 1672574400123,
"transaction": {
"id": "571",
"total_order": 1,
"data_collection_order": 1
}
}
Access transaction metadata:
CREATE TABLE debezium_with_transaction (
id BIGINT,
name STRING,
-- Transaction metadata
transaction_id STRING METADATA FROM 'transaction.id',
transaction_total_order BIGINT METADATA FROM 'transaction.total_order',
transaction_data_collection_order BIGINT METADATA FROM 'transaction.data_collection_order'
) WITH (
'connector' = 'kafka',
'format' = 'debezium-json'
);
Configure comprehensive error handling for production deployments:
// Ignore parsing errors for resilience
config.set(DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
// Handle timestamp parsing
config.set(DebeziumJsonFormatOptions.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss");
// Handle null keys in nested maps
config.set(DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "DROP");
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-json