
tessl/maven-org-apache-flink--flink-json

JSON format support library for Apache Flink table ecosystem with comprehensive serialization, deserialization, and CDC format capabilities

Maxwell CDC Format

JSON format support for Maxwell's daemon, a MySQL Change Data Capture (CDC) application that reads the binlog and outputs row changes as JSON. This format lets Flink process those MySQL binlog changes using their Maxwell-specific JSON structure.

Capabilities

Maxwell Format Configuration

Configuration options specific to the Maxwell CDC format, covering JSON parsing, error handling, timestamp formats, and null-key treatment in maps.

/**
 * Configuration options for Maxwell JSON format
 */
public class MaxwellJsonFormatOptions {
    
    /** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions) */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    
    /** Timestamp format standard, "SQL" or "ISO-8601" (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> TIMESTAMP_FORMAT;
    
    /** How to handle null keys in maps (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;
    
    /** Literal string for null keys when mode is LITERAL (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}

Configuration Usage:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions;

// Configure Maxwell format options
Configuration config = new Configuration();
config.set(MaxwellJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
config.set(MaxwellJsonFormatOptions.TIMESTAMP_FORMAT, "SQL"); // or "ISO-8601"
config.set(MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "DROP");

Maxwell JSON Structure

Maxwell produces change events with a flat structure containing both data and metadata:

Insert Event

{
  "database": "user_db",
  "table": "users",
  "type": "insert",
  "ts": 1672574400,
  "xid": 1234,
  "xoffset": 0,
  "data": {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com",
    "created_at": "2023-01-01 10:00:00"
  }
}

Update Event

{
  "database": "user_db",
  "table": "users",
  "type": "update",
  "ts": 1672574401,
  "xid": 1235,
  "xoffset": 0,
  "data": {
    "id": 1,
    "name": "Alice Smith",
    "email": "alice.smith@example.com",
    "created_at": "2023-01-01 10:00:00"
  },
  "old": {
    "name": "Alice",
    "email": "alice@example.com"
  }
}

Delete Event

{
  "database": "user_db",
  "table": "users",
  "type": "delete",
  "ts": 1672574402,
  "xid": 1236,
  "xoffset": 0,
  "data": {
    "id": 1,
    "name": "Alice Smith",
    "email": "alice.smith@example.com",
    "created_at": "2023-01-01 10:00:00"
  }
}

Table API Integration

SQL DDL Usage

Create tables using Maxwell JSON format for change data capture processing:

CREATE TABLE maxwell_source (
  id BIGINT,
  name STRING,
  email STRING,
  created_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'maxwell-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'maxwell-json',
  'maxwell-json.ignore-parse-errors' = 'true',
  'maxwell-json.timestamp-format.standard' = 'SQL'
);

Programmatic Table Definition

import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

TableDescriptor maxwellTable = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("email", DataTypes.STRING())
        .column("created_at", DataTypes.TIMESTAMP(3))
        .primaryKey("id")
        .build())
    .option("topic", "maxwell-topic")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format("maxwell-json")
    .option("maxwell-json.ignore-parse-errors", "true")
    .option("maxwell-json.timestamp-format.standard", "SQL")
    .build();

Change Event Processing

The Maxwell format maps the three change event types onto Flink's changelog:

Insert Events

  • type: "insert"
  • data: new row data
  • old: not present
  • Emitted as a single INSERT (+I) row

Update Events

  • type: "update"
  • data: complete row after the update
  • old: changed fields only (partial row)
  • Emitted as an UPDATE_BEFORE (-U) / UPDATE_AFTER (+U) pair

Delete Events

  • type: "delete"
  • data: complete row that was deleted
  • old: not present
  • Emitted as a single DELETE (-D) row
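The update case is worth spelling out: because old holds only the changed fields, the before-image is the data object with the old values written over it. A plain-Java sketch of that merge (illustrative only, not Flink's internal code):

```java
import java.util.HashMap;
import java.util.Map;

public class MaxwellChangelog {

    /** Reconstruct the row as it looked before the update. */
    static Map<String, Object> beforeImage(Map<String, Object> data,
                                           Map<String, Object> old) {
        Map<String, Object> before = new HashMap<>(data);
        before.putAll(old); // changed fields revert to their old values
        return before;
    }

    public static void main(String[] args) {
        // The "data" and "old" objects from the update event example above
        Map<String, Object> data = new HashMap<>();
        data.put("id", 1);
        data.put("name", "Alice Smith");
        data.put("email", "alice.smith@example.com");

        Map<String, Object> old = new HashMap<>();
        old.put("name", "Alice");
        old.put("email", "alice@example.com");

        Map<String, Object> before = beforeImage(data, old);
        System.out.println("-U " + before.get("name")); // -U Alice
        System.out.println("+U " + data.get("name"));   // +U Alice Smith
    }
}
```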

Maxwell Metadata Fields

Maxwell provides rich metadata for each change event:

| Field | Description | Type | Example |
|-------|-------------|------|---------|
| database | Source database name | String | "user_db" |
| table | Source table name | String | "users" |
| type | Change operation type | String | "insert", "update", "delete" |
| ts | Transaction timestamp (seconds) | Long | 1672574400 |
| xid | Transaction ID | Long | 1234 |
| xoffset | Position within transaction | Integer | 0 |
| commit | Commit flag (marks transaction end) | Boolean | true |
| data | Row data after the change | Object | {...} |
| old | Previous values (updates only) | Object | {...} |

Accessing Metadata in Tables

CREATE TABLE maxwell_with_metadata (
  -- Regular data columns
  id BIGINT,
  name STRING,
  email STRING,
  
  -- Metadata columns
  maxwell_database STRING METADATA FROM 'database',
  maxwell_table STRING METADATA FROM 'table',
  maxwell_type STRING METADATA FROM 'type',
  maxwell_ts TIMESTAMP_LTZ(3) METADATA FROM 'ts',
  maxwell_xid BIGINT METADATA FROM 'xid'
) WITH (
  'connector' = 'kafka',
  'format' = 'maxwell-json'
  -- other connector options
);

Data Type Mapping

Maxwell JSON format maps MySQL types to Flink types:

| MySQL Type | Maxwell JSON | Flink Type | Notes |
|------------|--------------|------------|-------|
| TINYINT | Number | TINYINT | |
| SMALLINT | Number | SMALLINT | |
| INT | Number | INT | |
| BIGINT | Number | BIGINT | |
| FLOAT | Number | FLOAT | |
| DOUBLE | Number | DOUBLE | |
| DECIMAL | String | DECIMAL | Exact precision preserved |
| VARCHAR | String | STRING | |
| CHAR | String | STRING | |
| TEXT | String | STRING | |
| DATE | String | DATE | Format: YYYY-MM-DD |
| TIME | String | TIME | Format: HH:MM:SS |
| DATETIME | String | TIMESTAMP | Configurable format |
| TIMESTAMP | String | TIMESTAMP_LTZ | With local time zone |
| JSON | String | STRING | Kept as a JSON string |
| BINARY | String | BYTES | Base64 encoded |
| BIT | Number | BOOLEAN | For BIT(1) |
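The string encoding for DECIMAL is what makes exact precision possible: routing the value through a double would silently round it, while BigDecimal keeps every digit, matching the semantics of a DECIMAL-typed Flink column. A standalone illustration:

```java
import java.math.BigDecimal;

public class DecimalPrecision {
    public static void main(String[] args) {
        // A DECIMAL(19,2) value as Maxwell emits it: a JSON string
        String raw = "12345678901234567.89";

        // Parsing through double rounds to the nearest representable value
        double lossy = Double.parseDouble(raw);
        System.out.println(lossy);              // 1.2345678901234568E16

        // BigDecimal preserves every digit
        BigDecimal exact = new BigDecimal(raw);
        System.out.println(exact);              // 12345678901234567.89
    }
}
```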

Timestamp Handling

Configure timestamp parsing for proper temporal processing. The TIMESTAMP_FORMAT option accepts one of two standards rather than arbitrary patterns:

// SQL standard (default): "yyyy-MM-dd HH:mm:ss", matching MySQL DATETIME output
config.set(MaxwellJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");

// ISO-8601 standard: "yyyy-MM-ddTHH:mm:ss"
config.set(MaxwellJsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601");

Event Timestamp vs Data Timestamps

Maxwell provides two types of timestamps:

  1. Event Timestamp (ts): When the transaction occurred
  2. Data Timestamps: Timestamp columns in the actual data

CREATE TABLE maxwell_timestamps (
  id BIGINT,
  name STRING,
  created_at TIMESTAMP(3),  -- Data timestamp
  updated_at TIMESTAMP(3),  -- Data timestamp
  
  -- Event timestamp from Maxwell
  event_time TIMESTAMP_LTZ(3) METADATA FROM 'ts'
) WITH (
  'connector' = 'kafka',
  'format' = 'maxwell-json'
);

Transaction Processing

Maxwell groups related changes by transaction: events in the same transaction share an xid, are ordered by xoffset, and the final event of the transaction carries "commit": true:

Transaction Boundaries

{
  "database": "user_db",
  "table": "users",
  "type": "insert",
  "ts": 1672574400,
  "xid": 1234,
  "xoffset": 0,
  "data": {...}
}
{
  "database": "user_db",
  "table": "orders",
  "type": "insert",
  "ts": 1672574400,
  "xid": 1234,
  "xoffset": 1,
  "commit": true,
  "data": {...}
}

Transaction-Aware Processing

-- Group changes by transaction
SELECT 
  maxwell_xid,
  maxwell_ts,
  COUNT(*) as change_count,
  COLLECT(maxwell_type) as change_types
FROM maxwell_with_metadata
GROUP BY maxwell_xid, maxwell_ts;

Advanced Features

Bootstrapping Support

Maxwell supports bootstrapping existing data:

{
  "database": "user_db",
  "table": "users",
  "type": "bootstrap-start",
  "ts": 1672574400,
  "data": {}
}
{
  "database": "user_db",
  "table": "users",
  "type": "bootstrap-insert",
  "ts": 1672574400,
  "data": {
    "id": 1,
    "name": "Alice"
  }
}
{
  "database": "user_db",
  "table": "users",
  "type": "bootstrap-complete",
  "ts": 1672574400,
  "data": {}
}

DDL Events

Maxwell can capture DDL changes:

{
  "database": "user_db",
  "table": "users",
  "type": "table-create",
  "ts": 1672574400,
  "sql": "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(100))"
}

Error Handling

Configure robust error handling for production deployments:

// Ignore parsing errors
config.set(MaxwellJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

// Handle null keys in maps
config.set(MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "LITERAL");
config.set(MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL, "__NULL__");

Common Error Scenarios

  1. Malformed JSON: Enable IGNORE_PARSE_ERRORS
  2. Timestamp Parsing: Configure appropriate TIMESTAMP_FORMAT
  3. Large Binary Data: Consider data size limits
  4. Schema Evolution: Handle new/removed columns gracefully
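The effect of IGNORE_PARSE_ERRORS can be pictured as: with it enabled, an unparsable record produces no row instead of failing the job. A toy illustration of that contract (hypothetical parser, not Flink code):

```java
import java.util.Optional;

public class ParseErrorHandling {
    /** Toy parser: returns empty instead of throwing when errors are ignored. */
    static Optional<String> parse(String json, boolean ignoreParseErrors) {
        String trimmed = json.trim();
        boolean looksValid = trimmed.startsWith("{") && trimmed.endsWith("}");
        if (looksValid) {
            return Optional.of(trimmed);
        }
        if (ignoreParseErrors) {
            return Optional.empty(); // record is silently dropped
        }
        throw new IllegalArgumentException("Malformed JSON record: " + json);
    }

    public static void main(String[] args) {
        System.out.println(parse("{\"type\":\"insert\"}", true).isPresent()); // true
        System.out.println(parse("not-json", true).isPresent());              // false
    }
}
```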

Production Considerations

Performance Optimization

// Optimize the Kafka consumer for high-throughput scenarios
// (with the Table API, pass these via the connector's 'properties.*' options)
Properties kafkaProps = new Properties();
kafkaProps.setProperty("max.poll.records", "10000");
kafkaProps.setProperty("fetch.min.bytes", "1048576"); // 1 MiB
kafkaProps.setProperty("fetch.max.wait.ms", "500");

Reliability

  • Enable ignore-parse-errors for production resilience
  • Implement proper error handling and alerting
  • Monitor Maxwell daemon health and connectivity
  • Set up proper Kafka retention policies

Monitoring

Key metrics to monitor:

  • Maxwell lag behind MySQL binlog
  • Parse success/failure rates
  • Event processing latency
  • Transaction completion rates
  • DDL event frequency

Schema Evolution

Handle schema changes gracefully:

-- Use flexible schema definition
CREATE TABLE maxwell_flexible (
  -- Known columns
  id BIGINT,
  name STRING,
  
  -- Catch-all for new columns
  row_data STRING,  -- Full JSON for analysis
  
  -- Metadata for debugging
  maxwell_database STRING METADATA FROM 'database',
  maxwell_table STRING METADATA FROM 'table'
) WITH (
  'connector' = 'kafka',
  'format' = 'maxwell-json',
  'maxwell-json.ignore-parse-errors' = 'true'
);

Integration Patterns

Real-time Analytics

-- Real-time user activity tracking (uses the metadata columns defined above)
SELECT 
  TUMBLE_START(maxwell_ts, INTERVAL '1' MINUTE) AS window_start,
  name,
  COUNT(*) AS changes,
  MAX(maxwell_ts) AS last_change
FROM maxwell_with_metadata
WHERE maxwell_type IN ('insert', 'update')
GROUP BY TUMBLE(maxwell_ts, INTERVAL '1' MINUTE), name;

Data Lake Integration

-- Partition by date for data lake storage
CREATE TABLE maxwell_data_lake (
  id BIGINT,
  name STRING,
  email STRING,
  change_type STRING,
  change_date DATE
) PARTITIONED BY (change_date)
WITH (
  'connector' = 'filesystem',
  'path' = 's3://my-bucket/maxwell/',  -- placeholder path
  'format' = 'parquet'
);

INSERT INTO maxwell_data_lake
SELECT id, name, email, maxwell_type, CAST(maxwell_ts AS DATE)
FROM maxwell_with_metadata;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-json
