JSON format support library for Apache Flink table ecosystem with comprehensive serialization, deserialization, and CDC format capabilities
—
JSON format support for Maxwell's daemon Change Data Capture system, enabling processing of MySQL binlog changes with Maxwell-specific JSON structure. Maxwell is a MySQL change data capture application that reads binlog and outputs row updates as JSON.
Configuration options specific to Maxwell CDC format, providing comprehensive control over JSON processing and error handling.
/**
* Configuration options for Maxwell JSON format
*/
public class MaxwellJsonFormatOptions {
/** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions) */
public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
/** Timestamp format pattern (inherited from JsonFormatOptions) */
public static final ConfigOption<String> TIMESTAMP_FORMAT;
/** How to handle null keys in maps (inherited from JsonFormatOptions) */
public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;
/** Literal string for null keys when mode is LITERAL (inherited from JsonFormatOptions) */
public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}

Configuration Usage:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions;
// Configure Maxwell format options
Configuration config = new Configuration();
config.set(MaxwellJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
config.set(MaxwellJsonFormatOptions.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss");
config.set(MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "DROP");

Maxwell produces change events with a flat structure containing both data and metadata:
{
"database": "user_db",
"table": "users",
"type": "insert",
"ts": 1672574400,
"xid": 1234,
"xoffset": 0,
"data": {
"id": 1,
"name": "Alice",
"email": "alice@example.com",
"created_at": "2023-01-01 10:00:00"
}
}

{
"database": "user_db",
"table": "users",
"type": "update",
"ts": 1672574401,
"xid": 1235,
"xoffset": 0,
"data": {
"id": 1,
"name": "Alice Smith",
"email": "alice.smith@example.com",
"created_at": "2023-01-01 10:00:00"
},
"old": {
"name": "Alice",
"email": "alice@example.com"
}
}

{
"database": "user_db",
"table": "users",
"type": "delete",
"ts": 1672574402,
"xid": 1236,
"xoffset": 0,
"data": {
"id": 1,
"name": "Alice Smith",
"email": "alice.smith@example.com",
"created_at": "2023-01-01 10:00:00"
}
}

Create tables using Maxwell JSON format for change data capture processing:
CREATE TABLE maxwell_source (
id BIGINT,
name STRING,
email STRING,
created_at TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'maxwell-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'maxwell-json',
'maxwell-json.ignore-parse-errors' = 'true',
'maxwell-json.timestamp-format' = 'yyyy-MM-dd HH:mm:ss'
);

import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
TableDescriptor maxwellTable = TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("email", DataTypes.STRING())
.column("created_at", DataTypes.TIMESTAMP(3))
.primaryKey("id")
.build())
.option("topic", "maxwell-topic")
.option("properties.bootstrap.servers", "localhost:9092")
.format("maxwell-json")
.option("maxwell-json.ignore-parse-errors", "true")
.option("maxwell-json.timestamp-format", "yyyy-MM-dd HH:mm:ss")
.build();

Maxwell format handles three types of change events with comprehensive metadata:
- type: "insert" — data: New row data; old: Not present
- type: "update" — data: Updated row data (complete row); old: Changed fields only (partial row)
- type: "delete" — data: Deleted row data (complete row); old: Not present

Maxwell provides rich metadata for each change event:
| Field | Description | Type | Example |
|---|---|---|---|
| database | Source database name | String | "user_db" |
| table | Source table name | String | "users" |
| type | Change operation type | String | "insert", "update", "delete" |
| ts | Transaction timestamp (seconds) | Long | 1672574400 |
| xid | Transaction ID | Long | 1234 |
| xoffset | Position within transaction | Integer | 0 |
| commit | Commit flag (for transaction end) | Boolean | true |
| data | Row data after change | Object | {...} |
| old | Previous values (update only) | Object | {...} |
CREATE TABLE maxwell_with_metadata (
-- Regular data columns
id BIGINT,
name STRING,
email STRING,
-- Metadata columns
maxwell_database STRING METADATA FROM 'database',
maxwell_table STRING METADATA FROM 'table',
maxwell_type STRING METADATA FROM 'type',
maxwell_ts TIMESTAMP_LTZ(3) METADATA FROM 'ts',
maxwell_xid BIGINT METADATA FROM 'xid'
) WITH (
'connector' = 'kafka',
'format' = 'maxwell-json'
-- other connector options
);

Maxwell JSON format maps MySQL types to Flink types:
| MySQL Type | Maxwell JSON | Flink Type | Notes |
|---|---|---|---|
| TINYINT | Number | TINYINT | |
| SMALLINT | Number | SMALLINT | |
| INT | Number | INT | |
| BIGINT | Number | BIGINT | |
| FLOAT | Number | FLOAT | |
| DOUBLE | Number | DOUBLE | |
| DECIMAL | String | DECIMAL | Exact precision |
| VARCHAR | String | STRING | |
| CHAR | String | STRING | |
| TEXT | String | STRING | |
| DATE | String | DATE | Format: YYYY-MM-DD |
| TIME | String | TIME | Format: HH:MM:SS |
| DATETIME | String | TIMESTAMP | Configurable format |
| TIMESTAMP | String | TIMESTAMP_LTZ | With timezone |
| JSON | String | STRING | As JSON string |
| BINARY | String | BYTES | Base64 encoded |
| BIT | Number | BOOLEAN | For BIT(1) |
Configure timestamp parsing for proper temporal processing:
// Default MySQL datetime format
config.set(MaxwellJsonFormatOptions.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss");
// ISO-8601 format
config.set(MaxwellJsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601");
// Custom format
config.set(MaxwellJsonFormatOptions.TIMESTAMP_FORMAT, "MM/dd/yyyy HH:mm:ss");

Maxwell provides two types of timestamps:
- Event timestamp (ts): When the transaction occurred
- Data timestamps: temporal columns carried inside the row data itself

CREATE TABLE maxwell_timestamps (
id BIGINT,
name STRING,
created_at TIMESTAMP(3), -- Data timestamp
updated_at TIMESTAMP(3), -- Data timestamp
-- Event timestamp from Maxwell
event_time TIMESTAMP_LTZ(3) METADATA FROM 'ts'
) WITH (
'connector' = 'kafka',
'format' = 'maxwell-json'
');

Maxwell groups related changes by transaction:
{
"database": "user_db",
"table": "users",
"type": "insert",
"ts": 1672574400,
"xid": 1234,
"xoffset": 0,
"data": {...}
}

{
"database": "user_db",
"table": "orders",
"type": "insert",
"ts": 1672574400,
"xid": 1234,
"xoffset": 1,
"data": {...}
}

{
"ts": 1672574400,
"xid": 1234,
"commit": true
}

-- Group changes by transaction
SELECT
maxwell_xid,
maxwell_ts,
COUNT(*) as change_count,
COLLECT(maxwell_type) as change_types
FROM maxwell_with_metadata
GROUP BY maxwell_xid, maxwell_ts;

Maxwell supports bootstrapping existing data:
{
"database": "user_db",
"table": "users",
"type": "bootstrap-start",
"ts": 1672574400,
"data": {}
}

{
"database": "user_db",
"table": "users",
"type": "bootstrap-insert",
"ts": 1672574400,
"data": {
"id": 1,
"name": "Alice"
}
}

{
"database": "user_db",
"table": "users",
"type": "bootstrap-complete",
"ts": 1672574400,
"data": {}
}

Maxwell can capture DDL changes:
{
"database": "user_db",
"table": "users",
"type": "table-create",
"ts": 1672574400,
"sql": "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(100))"
}

Configure robust error handling for production deployments:
// Ignore parsing errors
config.set(MaxwellJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
// Handle null keys in maps
config.set(MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "LITERAL");
config.set(MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL, "__NULL__");

The key options for resilient processing are IGNORE_PARSE_ERRORS and TIMESTAMP_FORMAT.

// Optimize for high-throughput scenarios
Properties kafkaProps = new Properties();
kafkaProps.setProperty("max.poll.records", "10000");
kafkaProps.setProperty("fetch.min.bytes", "1048576");
kafkaProps.setProperty("fetch.max.wait.ms", "500");

Enable ignore-parse-errors for production resilience.

Key metrics to monitor:
Handle schema changes gracefully:
-- Use flexible schema definition
CREATE TABLE maxwell_flexible (
-- Known columns
id BIGINT,
name STRING,
-- Catch-all for new columns
row_data STRING, -- Full JSON for analysis
-- Metadata for debugging
maxwell_database STRING METADATA FROM 'database',
maxwell_table STRING METADATA FROM 'table'
) WITH (
'connector' = 'kafka',
'format' = 'maxwell-json',
'maxwell-json.ignore-parse-errors' = 'true'
);

-- Real-time user activity tracking
SELECT
name,
COUNT(*) as changes,
MAX(maxwell_ts) as last_change
FROM maxwell_source
WHERE maxwell_type IN ('insert', 'update')
GROUP BY TUMBLE(maxwell_ts, INTERVAL '1' MINUTE), name;

-- Partition by date for data lake storage
CREATE TABLE maxwell_data_lake (
id BIGINT,
name STRING,
email STRING,
change_type STRING,
change_date DATE
) PARTITIONED BY (change_date)
WITH (
'connector' = 's3',
'format' = 'parquet'
);
INSERT INTO maxwell_data_lake
SELECT id, name, email, maxwell_type, DATE(maxwell_ts)
FROM maxwell_with_metadata;

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-json