or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-json

JSON format support library for Apache Flink table ecosystem with comprehensive serialization, deserialization, and CDC format capabilities

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-json@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-json@2.1.0

0

# Flink JSON Format

1

2

Apache Flink JSON format library provides comprehensive JSON data processing capabilities within the Flink table ecosystem. It enables reading and writing JSON data with automatic schema derivation, supports both streaming and batch processing, and includes specialized Change Data Capture (CDC) format support for Canal, Debezium, Maxwell, and Oracle GoldenGate systems.

3

4

## Package Information

5

6

- **Package Name**: flink-json

7

- **Group ID**: org.apache.flink

8

- **Language**: Java

9

- **Installation**: Add Maven dependency:

10

```xml

11

<dependency>

12

<groupId>org.apache.flink</groupId>

13

<artifactId>flink-json</artifactId>

14

<version>2.1.0</version>

15

</dependency>

16

```

17

18

## Core Imports

19

20

```java

21

import org.apache.flink.formats.json.JsonDeserializationSchema;

22

import org.apache.flink.formats.json.JsonSerializationSchema;

23

import org.apache.flink.formats.json.JsonFormatOptions;

24

import org.apache.flink.formats.json.JsonRowSchemaConverter;

25

```

26

27

For CDC formats:

28

29

```java

30

import org.apache.flink.formats.json.canal.CanalJsonFormatOptions;

31

import org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions;

32

import org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions;

33

import org.apache.flink.formats.json.ogg.OggJsonFormatOptions;

34

```

35

36

## Basic Usage

37

38

```java

39

import org.apache.flink.formats.json.JsonDeserializationSchema;

40

import org.apache.flink.formats.json.JsonSerializationSchema;

41

import org.apache.flink.formats.json.JsonRowSchemaConverter;

42

import org.apache.flink.formats.json.JsonFormatOptions;

43

import org.apache.flink.api.common.typeinfo.TypeInformation;

44

import org.apache.flink.api.common.typeinfo.Types;

45

import org.apache.flink.types.Row;

46

import org.apache.flink.configuration.ConfigOption;

47

48

// Create deserialization schema for User objects

49

JsonDeserializationSchema<User> deserializer =

50

new JsonDeserializationSchema<>(User.class);

51

52

// Create serialization schema for User objects

53

JsonSerializationSchema<User> serializer =

54

new JsonSerializationSchema<>();

55

56

// Schema conversion from JSON schema string

57

TypeInformation<Row> typeInfo = JsonRowSchemaConverter.convert(jsonSchemaString);

58

59

// Configure format options

60

ConfigOption<Boolean> ignoreParseErrors = JsonFormatOptions.IGNORE_PARSE_ERRORS;

61

ConfigOption<String> timestampFormat = JsonFormatOptions.TIMESTAMP_FORMAT;

62

```

63

64

## Architecture

65

66

The Flink JSON format library is organized around several key architectural components:

67

68

- **Core Serialization/Deserialization**: Generic schemas for converting between Java objects and JSON

69

- **Schema Conversion**: Utilities for converting JSON schemas to Flink TypeInformation

70

- **Configuration System**: Extensive ConfigOption-based configuration for error handling, timestamp formats, and null value handling

71

- **CDC Format Specialization**: Dedicated format support for Change Data Capture systems with system-specific metadata handling

72

- **Table Ecosystem Integration**: Seamless integration with Flink's table connectors and SQL layer through factory pattern implementations

73

74

This design enables both programmatic usage through schemas and declarative usage through SQL DDL statements, supporting complex data pipeline scenarios including real-time CDC processing and ETL operations.

75

76

## Capabilities

77

78

### Core JSON Processing

79

80

Generic JSON serialization and deserialization capabilities for converting between Java objects and JSON data, with customizable ObjectMapper configuration and comprehensive error handling options.

81

82

```java { .api }

83

public class JsonDeserializationSchema<T> {

84

public JsonDeserializationSchema(Class<T> clazz);

85

public JsonDeserializationSchema(TypeInformation<T> typeInformation);

86

public JsonDeserializationSchema(Class<T> clazz, SerializableSupplier<ObjectMapper> mapperFactory);

87

public T deserialize(byte[] message) throws IOException;

88

}

89

90

public class JsonSerializationSchema<T> {

91

public JsonSerializationSchema();

92

public JsonSerializationSchema(SerializableSupplier<ObjectMapper> mapperFactory);

93

public byte[] serialize(T element);

94

}

95

```

96

97

[Core JSON Processing](./core-json.md)

98

99

### Canal CDC Format

100

101

Specialized JSON format support for Canal Change Data Capture system, enabling processing of MySQL binlog changes with database and table filtering capabilities.

102

103

```java { .api }

104

public class CanalJsonFormatOptions {

105

public static final ConfigOption<String> DATABASE_INCLUDE;

106

public static final ConfigOption<String> TABLE_INCLUDE;

107

public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

108

public static final ConfigOption<String> TIMESTAMP_FORMAT;

109

}

110

```

111

112

[Canal CDC Format](./canal-cdc.md)

113

114

### Debezium CDC Format

115

116

JSON format support for Debezium Change Data Capture system, handling database change events with optional schema inclusion and comprehensive metadata processing.

117

118

```java { .api }

119

public class DebeziumJsonFormatOptions {

120

public static final ConfigOption<Boolean> SCHEMA_INCLUDE;

121

public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

122

public static final ConfigOption<String> TIMESTAMP_FORMAT;

123

}

124

```

125

126

[Debezium CDC Format](./debezium-cdc.md)

127

128

### Maxwell CDC Format

129

130

JSON format support for Maxwell's daemon CDC system, processing MySQL binlog changes with Maxwell-specific JSON structure and metadata handling.

131

132

```java { .api }

133

public class MaxwellJsonFormatOptions {

134

public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

135

public static final ConfigOption<String> TIMESTAMP_FORMAT;

136

public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;

137

}

138

```

139

140

[Maxwell CDC Format](./maxwell-cdc.md)

141

142

### Oracle GoldenGate CDC Format

143

144

JSON format support for Oracle GoldenGate (OGG) Change Data Capture system, enabling processing of Oracle database changes with OGG-specific JSON formatting.

145

146

```java { .api }

147

public class OggJsonFormatOptions {

148

public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

149

public static final ConfigOption<String> TIMESTAMP_FORMAT;

150

public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;

151

}

152

```

153

154

[Oracle GoldenGate CDC Format](./ogg-cdc.md)

155

156

## Configuration Options

157

158

All JSON formats support comprehensive configuration options for robust production deployment:

159

160

```java { .api }

161

public class JsonFormatOptions {

162

public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD;

163

public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

164

public static final ConfigOption<String> MAP_NULL_KEY_MODE;

165

public static final ConfigOption<String> MAP_NULL_KEY_LITERAL;

166

public static final ConfigOption<String> TIMESTAMP_FORMAT;

167

public static final ConfigOption<Boolean> ENCODE_DECIMAL_AS_PLAIN_NUMBER;

168

public static final ConfigOption<Boolean> ENCODE_IGNORE_NULL_FIELDS;

169

public static final ConfigOption<Boolean> DECODE_JSON_PARSER_ENABLED;

170

}

171

172

public enum MapNullKeyMode {

173

FAIL, DROP, LITERAL

174

}

175

```

176

177

## Exception Handling

178

179

```java { .api }

180

public class JsonParseException extends RuntimeException {

181

public JsonParseException(String message);

182

public JsonParseException(String message, Throwable cause);

183

}

184

```

185

186

## Schema Conversion

187

188

```java { .api }

189

public final class JsonRowSchemaConverter {

190

public static <T> TypeInformation<T> convert(String jsonSchema);

191

}

192

```