JSON format support library for Apache Flink table ecosystem with comprehensive serialization, deserialization, and CDC format capabilities
—
Core JSON serialization and deserialization capabilities providing the foundation for all JSON data processing in Apache Flink. These components handle conversion between Java objects and JSON with extensive configuration options for error handling, null value processing, and timestamp formatting.
Generic deserialization schema that converts JSON byte arrays into Java objects with full type safety and configurable ObjectMapper support.
/**
* Generic deserialization schema for JSON data
* @param <T> The target type for deserialization
*/
public class JsonDeserializationSchema<T> implements DeserializationSchema<T> {
/**
* Create deserialization schema for specific class
* @param clazz Target class for deserialization
*/
public JsonDeserializationSchema(Class<T> clazz);
/**
* Create deserialization schema with type information
* @param typeInformation Flink TypeInformation for the target type
*/
public JsonDeserializationSchema(TypeInformation<T> typeInformation);
/**
* Create deserialization schema with custom ObjectMapper
* @param clazz Target class for deserialization
* @param mapperFactory Supplier providing custom ObjectMapper configuration
*/
public JsonDeserializationSchema(Class<T> clazz, SerializableSupplier<ObjectMapper> mapperFactory);
/**
* Create deserialization schema with type information and custom ObjectMapper
* @param typeInformation Flink TypeInformation for the target type
* @param mapperFactory Supplier providing custom ObjectMapper configuration
*/
public JsonDeserializationSchema(TypeInformation<T> typeInformation, SerializableSupplier<ObjectMapper> mapperFactory);
/**
* Initialize the schema with runtime context
* @param context Initialization context providing runtime information
*/
public void open(InitializationContext context) throws Exception;
/**
* Deserialize JSON bytes into target object
* @param message JSON data as byte array
* @return Deserialized object of type T
* @throws IOException When JSON parsing fails
*/
public T deserialize(byte[] message) throws IOException;
/**
* Get the type information for the deserialized type (inherited from DeserializationSchema)
* @return TypeInformation for type T
*/
public TypeInformation<T> getProducedType();
}

Usage Examples:
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
// Basic deserialization for User class
JsonDeserializationSchema<User> userDeserializer =
new JsonDeserializationSchema<>(User.class);
// Using TypeInformation
TypeInformation<User> userTypeInfo = TypeInformation.of(User.class);
JsonDeserializationSchema<User> typedDeserializer =
new JsonDeserializationSchema<>(userTypeInfo);
// Custom ObjectMapper configuration
JsonDeserializationSchema<User> customDeserializer =
new JsonDeserializationSchema<>(User.class, () -> {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return mapper;
});
// Deserialize JSON data
byte[] jsonBytes = "{\"name\":\"Alice\",\"age\":30}".getBytes();
User user = userDeserializer.deserialize(jsonBytes);

Generic serialization schema that converts Java objects into JSON byte arrays with configurable ObjectMapper support for custom serialization behavior.
/**
* Generic serialization schema for JSON data
* @param <T> The source type for serialization
*/
public class JsonSerializationSchema<T> implements SerializationSchema<T> {
/**
* Create serialization schema with default ObjectMapper
*/
public JsonSerializationSchema();
/**
* Create serialization schema with custom ObjectMapper
* @param mapperFactory Supplier providing custom ObjectMapper configuration
*/
public JsonSerializationSchema(SerializableSupplier<ObjectMapper> mapperFactory);
/**
* Initialize the schema with runtime context
* @param context Initialization context providing runtime information
*/
public void open(InitializationContext context) throws Exception;
/**
* Serialize object to JSON bytes
* @param element Object to serialize
* @return JSON data as byte array
*/
public byte[] serialize(T element);
}

Usage Examples:
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
// Basic serialization
JsonSerializationSchema<User> userSerializer = new JsonSerializationSchema<>();
// Custom ObjectMapper for pretty printing
JsonSerializationSchema<User> prettySerializer =
new JsonSerializationSchema<>(() -> {
ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT);
return mapper;
});
// Serialize object to JSON
User user = new User("Alice", 30);
byte[] jsonBytes = userSerializer.serialize(user);
String jsonString = new String(jsonBytes); // {"name":"Alice","age":30}

Utility for converting JSON schema strings into Flink TypeInformation, enabling automatic schema derivation for table ecosystem integration.
/**
* Utility class for JSON schema conversion
*/
public final class JsonRowSchemaConverter {
/**
* Convert JSON schema string to Flink TypeInformation
* @param <T> Target type for conversion
* @param jsonSchema JSON schema as string
* @return TypeInformation representing the schema structure
* @throws IllegalArgumentException When schema is invalid
*/
public static <T> TypeInformation<T> convert(String jsonSchema);
}

Usage Examples:
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
// JSON schema string
String jsonSchema = "{\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
" \"name\": {\"type\": \"string\"},\n" +
" \"age\": {\"type\": \"integer\"},\n" +
" \"active\": {\"type\": \"boolean\"}\n" +
" }\n" +
"}";
// Convert to TypeInformation
TypeInformation<Row> typeInfo = JsonRowSchemaConverter.convert(jsonSchema);
// Use with deserialization schema
JsonDeserializationSchema<Row> rowDeserializer =
new JsonDeserializationSchema<>(typeInfo);

Comprehensive configuration options for controlling JSON processing behavior, error handling, and data formatting.
/**
* Configuration options for JSON format processing
*/
public class JsonFormatOptions {
/** Whether to fail when a field is missing from JSON (default: false) */
public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD;
/** Whether to ignore JSON parsing errors (default: false) */
public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
/** How to handle null keys in maps (default: "FAIL") */
public static final ConfigOption<String> MAP_NULL_KEY_MODE;
/** Literal string to use for null keys when mode is LITERAL (default: "null") */
public static final ConfigOption<String> MAP_NULL_KEY_LITERAL;
/** Timestamp format pattern (default: "SQL") */
public static final ConfigOption<String> TIMESTAMP_FORMAT;
/** Whether to encode decimals as plain numbers (default: false) */
public static final ConfigOption<Boolean> ENCODE_DECIMAL_AS_PLAIN_NUMBER;
/** Whether to ignore null fields during encoding (default: false) */
public static final ConfigOption<Boolean> ENCODE_IGNORE_NULL_FIELDS;
/** Whether to enable JSON parser for decoding (default: true) */
public static final ConfigOption<Boolean> DECODE_JSON_PARSER_ENABLED;
}
/**
* Enum for null key handling modes in maps
*/
public enum MapNullKeyMode {
/** Fail when encountering null keys */
FAIL,
/** Drop entries with null keys */
DROP,
/** Replace null keys with literal string */
LITERAL
}Configuration Usage:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonFormatOptions;
// Configure JSON format options
Configuration config = new Configuration();
config.set(JsonFormatOptions.IGNORE_PARSE_ERRORS, true);
config.set(JsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601"); // valid values: "SQL" (default) or "ISO-8601"
config.set(JsonFormatOptions.MAP_NULL_KEY_MODE, "DROP");
config.set(JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER, true);

Specialized exception for JSON parsing errors with detailed error information.
/**
* Exception thrown when JSON parsing fails
*/
public class JsonParseException extends RuntimeException {
/**
* Create exception with error message
* @param message Description of the parsing error
*/
public JsonParseException(String message);
/**
* Create exception with error message and cause
* @param message Description of the parsing error
* @param cause Underlying cause of the error
*/
public JsonParseException(String message, Throwable cause);
}

Error Handling Examples:
import org.apache.flink.formats.json.JsonParseException;
try {
User user = deserializer.deserialize(malformedJsonBytes);
} catch (JsonParseException e) {
// Handle JSON parsing error
logger.error("Failed to parse JSON: " + e.getMessage(), e);
// Optionally skip the record or apply fallback logic
}import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create Kafka consumer with JSON deserialization
FlinkKafkaConsumer<User> consumer = new FlinkKafkaConsumer<>(
"user-topic",
new JsonDeserializationSchema<>(User.class),
kafkaProperties
);
DataStream<User> users = env.addSource(consumer);

The core JSON schemas integrate seamlessly with Flink's Table API through format factories, enabling declarative JSON processing through SQL DDL statements and programmatic table definitions.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-json