JSON format support library for the Apache Flink Table ecosystem, with comprehensive serialization, deserialization, and CDC format capabilities.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-json@2.1.0

The Apache Flink JSON format library provides comprehensive JSON data processing capabilities within the Flink Table ecosystem. It enables reading and writing JSON data with automatic schema derivation, supports both streaming and batch processing, and includes specialized Change Data Capture (CDC) format support for Canal, Debezium, Maxwell, and Oracle GoldenGate (OGG) systems.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>2.1.0</version>
</dependency>

import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonRowSchemaConverter;

For CDC formats:
import org.apache.flink.formats.json.canal.CanalJsonFormatOptions;
import org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions;
import org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions;
import org.apache.flink.formats.json.ogg.OggJsonFormatOptions;import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.configuration.ConfigOption;
// Create a deserialization schema for User objects.
// NOTE: User stands for your own POJO type; it is a placeholder in this example.
JsonDeserializationSchema<User> deserializer =
        new JsonDeserializationSchema<>(User.class);

// Create a serialization schema for User objects.
JsonSerializationSchema<User> serializer =
        new JsonSerializationSchema<>();

// Convert a JSON schema string into Flink type information.
// The schema below is only an example; substitute the schema of your own data.
String jsonSchemaString =
        "{\"type\": \"object\", \"properties\": {"
                + "\"id\": {\"type\": \"integer\"}, "
                + "\"name\": {\"type\": \"string\"}}}";
TypeInformation<Row> typeInfo = JsonRowSchemaConverter.convert(jsonSchemaString);

// Reference the JSON format's configuration options.
ConfigOption<Boolean> ignoreParseErrors = JsonFormatOptions.IGNORE_PARSE_ERRORS;
ConfigOption<String> timestampFormat = JsonFormatOptions.TIMESTAMP_FORMAT;

The Flink JSON format library is organized around several key architectural components:
This design enables both programmatic usage through schemas and declarative usage through SQL DDL statements, supporting complex data pipeline scenarios including real-time CDC processing and ETL operations.
Generic JSON serialization and deserialization capabilities for converting between Java objects and JSON data, with customizable ObjectMapper configuration and comprehensive error handling options.
/**
 * Deserialization schema that turns JSON-encoded byte messages into instances of {@code T}.
 *
 * <p>The target type is given either as a {@link Class} or as a {@link TypeInformation}. The
 * optional {@code mapperFactory} supplies the {@code ObjectMapper} used for parsing, so callers
 * can customize the mapper configuration.
 */
public class JsonDeserializationSchema<T> extends AbstractDeserializationSchema<T> {
    public JsonDeserializationSchema(Class<T> clazz);

    public JsonDeserializationSchema(TypeInformation<T> typeInformation);

    public JsonDeserializationSchema(
            Class<T> clazz, SerializableSupplier<ObjectMapper> mapperFactory);

    // Same as the previous constructor, but for a TypeInformation target type.
    public JsonDeserializationSchema(
            TypeInformation<T> typeInformation,
            SerializableSupplier<ObjectMapper> mapperFactory);

    /**
     * Creates the {@code ObjectMapper} from the mapper factory. Flink invokes this before the
     * first {@link #deserialize} call; invoke it yourself when using the schema outside a Flink
     * job (for example in unit tests).
     */
    public void open(InitializationContext context);

    /**
     * Parses a single JSON message.
     *
     * @param message the JSON-encoded bytes
     * @return the decoded value
     * @throws IOException if the message cannot be read as JSON of type {@code T}
     */
    public T deserialize(byte[] message) throws IOException;
}
/**
 * Serialization schema that writes elements of type {@code T} as JSON-encoded bytes.
 *
 * <p>The optional {@code mapperFactory} supplies the {@code ObjectMapper} used for writing, so
 * callers can customize the mapper configuration.
 */
public class JsonSerializationSchema<T> implements SerializationSchema<T> {
    public JsonSerializationSchema();

    public JsonSerializationSchema(SerializableSupplier<ObjectMapper> mapperFactory);

    /**
     * Creates the {@code ObjectMapper} from the mapper factory. Flink invokes this before the
     * first {@link #serialize} call; invoke it yourself when using the schema standalone.
     */
    public void open(InitializationContext context);

    /**
     * Serializes one element.
     *
     * @param element the value to write
     * @return the JSON-encoded bytes of {@code element}
     */
    public byte[] serialize(T element);
}
Specialized JSON format support for Canal Change Data Capture system, enabling processing of MySQL binlog changes with database and table filtering capabilities.
/** Configuration options for the Canal JSON CDC format. */
public class CanalJsonFormatOptions {
    // Database / table filters applied to Canal change events.
    public static final ConfigOption<String> DATABASE_INCLUDE;
    public static final ConfigOption<String> TABLE_INCLUDE;

    // Counterparts of the same-named generic options in JsonFormatOptions.
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> TIMESTAMP_FORMAT;

    // Null map key handling; the literal accompanies the LITERAL mode.
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
JSON format support for Debezium Change Data Capture system, handling database change events with optional schema inclusion and comprehensive metadata processing.
/** Configuration options for the Debezium JSON CDC format. */
public class DebeziumJsonFormatOptions {
    // Controls the optional schema inclusion in Debezium JSON messages.
    public static final ConfigOption<Boolean> SCHEMA_INCLUDE;

    // Counterparts of the same-named generic options in JsonFormatOptions.
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> TIMESTAMP_FORMAT;

    // Null map key handling; the literal accompanies the LITERAL mode.
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
JSON format support for Maxwell's daemon CDC system, processing MySQL binlog changes with Maxwell-specific JSON structure and metadata handling.
/** Configuration options for the Maxwell JSON CDC format. */
public class MaxwellJsonFormatOptions {
    // Counterparts of the same-named generic options in JsonFormatOptions.
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> TIMESTAMP_FORMAT;

    // Null map key handling; the literal accompanies the LITERAL mode.
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
JSON format support for Oracle GoldenGate (OGG) Change Data Capture system, enabling processing of Oracle database changes with OGG-specific JSON formatting.
/** Configuration options for the Oracle GoldenGate (OGG) JSON CDC format. */
public class OggJsonFormatOptions {
    // Counterparts of the same-named generic options in JsonFormatOptions.
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> TIMESTAMP_FORMAT;

    // Null map key handling; the literal accompanies the LITERAL mode.
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
All JSON formats support comprehensive configuration options for robust production deployment:
/**
 * Generic configuration options of the JSON format. The CDC option classes expose counterparts
 * of several of these under the same or a {@code JSON_}-prefixed name.
 */
public class JsonFormatOptions {
    // Reading / parsing behavior.
    public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD;
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<Boolean> DECODE_JSON_PARSER_ENABLED;

    // Null keys in map data. The mode presumably takes a MapNullKeyMode name
    // (FAIL, DROP, LITERAL); the literal pairs with the LITERAL mode — confirm in the Flink docs.
    public static final ConfigOption<String> MAP_NULL_KEY_MODE;
    public static final ConfigOption<String> MAP_NULL_KEY_LITERAL;

    // Timestamp text format.
    public static final ConfigOption<String> TIMESTAMP_FORMAT;

    // Writing / encoding behavior.
    public static final ConfigOption<Boolean> ENCODE_DECIMAL_AS_PLAIN_NUMBER;
    public static final ConfigOption<Boolean> ENCODE_IGNORE_NULL_FIELDS;
}
/**
 * Handling mode for null keys in map data: FAIL, DROP, or LITERAL (the latter presumably paired
 * with the MAP_NULL_KEY_LITERAL option).
 *
 * <p>NOTE: this enum is nested inside {@code JsonFormatOptions}; reference it as {@code
 * JsonFormatOptions.MapNullKeyMode}. There is no top-level {@code MapNullKeyMode} type to import.
 */
public enum MapNullKeyMode {
    FAIL,
    DROP,
    LITERAL
}

/** Unchecked exception (a {@link RuntimeException}) signaling a JSON parse error. */
public class JsonParseException extends RuntimeException {
    public JsonParseException(String message);

    public JsonParseException(String message, Throwable cause);
}

/** Converts a JSON schema string into Flink {@link TypeInformation}. */
public final class JsonRowSchemaConverter {
    /**
     * Derives type information from a JSON schema.
     *
     * @param jsonSchema the JSON schema document as a string
     * @return the derived type information (callers typically assign it to
     *     {@code TypeInformation<Row>})
     */
    public static <T> TypeInformation<T> convert(String jsonSchema);
}