# Flink JSON Format

The Apache Flink JSON format library provides JSON data processing within the Flink table ecosystem. It reads and writes JSON data with automatic schema derivation, supports both streaming and batch processing, and includes specialized Change Data Capture (CDC) formats for Canal, Debezium, Maxwell, and Oracle GoldenGate.

## Package Information

- **Package Name**: flink-json
- **Group ID**: org.apache.flink
- **Language**: Java
- **Installation**: Add Maven dependency:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonRowSchemaConverter;
```

For CDC formats:

```java
import org.apache.flink.formats.json.canal.CanalJsonFormatOptions;
import org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions;
import org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions;
import org.apache.flink.formats.json.ogg.OggJsonFormatOptions;
```

## Basic Usage

```java
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.configuration.ConfigOption;

// User is a placeholder for your own POJO class.

// Create deserialization schema for User objects
JsonDeserializationSchema<User> deserializer =
    new JsonDeserializationSchema<>(User.class);

// Create serialization schema for User objects
JsonSerializationSchema<User> serializer =
    new JsonSerializationSchema<>();

// Schema conversion from a JSON schema string (the schema below is only an illustration)
String jsonSchemaString =
    "{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"integer\"},\"name\":{\"type\":\"string\"}}}";
TypeInformation<Row> typeInfo = JsonRowSchemaConverter.convert(jsonSchemaString);

// Reference the ConfigOption constants that define the format options
ConfigOption<Boolean> ignoreParseErrors = JsonFormatOptions.IGNORE_PARSE_ERRORS;
ConfigOption<String> timestampFormat = JsonFormatOptions.TIMESTAMP_FORMAT;
```

## Architecture

The Flink JSON format library is organized around five components:

- **Core Serialization/Deserialization**: Generic schemas for converting between Java objects and JSON
- **Schema Conversion**: Utilities for converting JSON schemas to Flink TypeInformation
- **Configuration System**: ConfigOption-based configuration for error handling, timestamp formats, and null value handling
- **CDC Format Specialization**: Dedicated format support for Change Data Capture systems with system-specific metadata handling
- **Table Ecosystem Integration**: Integration with Flink's table connectors and SQL layer through factory pattern implementations

This design supports both programmatic usage through schemas and declarative usage through SQL DDL statements, covering data pipeline scenarios such as real-time CDC processing and ETL operations.
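The declarative route mentioned above can be sketched as a DDL string assembled in Java. This is a minimal sketch, not a definitive setup: the table name, columns, topic, and broker address are hypothetical, and the Kafka connector is only an assumed example of a connector that accepts a `format` option. The `json.*` keys are the DDL spellings of the `JsonFormatOptions` constants.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Builds a CREATE TABLE statement that reads JSON records (illustrative values only). */
final class JsonTableDdl {

    /** Renders a WITH clause from option key/value pairs, preserving insertion order. */
    static String withClause(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n", "WITH (\n", "\n)"));
    }

    /** Returns DDL for a hypothetical user_events table backed by Kafka. */
    static String userEventsDdl() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");                              // assumed connector
        options.put("topic", "user-events");                            // hypothetical topic
        options.put("properties.bootstrap.servers", "localhost:9092");  // hypothetical broker
        options.put("scan.startup.mode", "earliest-offset");            // read from the start
        options.put("format", "json");                                  // selects the JSON format
        options.put("json.ignore-parse-errors", "true");                // skip malformed records
        options.put("json.timestamp-format.standard", "ISO-8601");      // ISO-8601 timestamps

        return "CREATE TABLE user_events (\n"
                + "  user_id BIGINT,\n"
                + "  action STRING,\n"
                + "  event_time TIMESTAMP(3)\n"
                + ") " + withClause(options);
    }
}
```

In a Flink job, the resulting string would typically be passed to `TableEnvironment.executeSql(...)`, with the connector and `flink-json` on the classpath.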

## Capabilities

### Core JSON Processing

Generic serialization and deserialization schemas that convert between Java objects and JSON data, with an optional `ObjectMapper` factory for custom Jackson configuration and configurable error handling.

```java { .api }
public class JsonDeserializationSchema<T> {
    public JsonDeserializationSchema(Class<T> clazz);
    public JsonDeserializationSchema(TypeInformation<T> typeInformation);
    public JsonDeserializationSchema(Class<T> clazz, SerializableSupplier<ObjectMapper> mapperFactory);
    public T deserialize(byte[] message) throws IOException;
}

public class JsonSerializationSchema<T> {
    public JsonSerializationSchema();
    public JsonSerializationSchema(SerializableSupplier<ObjectMapper> mapperFactory);
    public byte[] serialize(T element);
}
```

[Core JSON Processing](./core-json.md)

### Canal CDC Format

JSON format support for the Canal Change Data Capture system. It processes MySQL binlog changes and can filter records by database and table.

```java { .api }
public class CanalJsonFormatOptions {
    public static final ConfigOption<String> DATABASE_INCLUDE;
    public static final ConfigOption<String> TABLE_INCLUDE;
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> TIMESTAMP_FORMAT;
}
```

[Canal CDC Format](./canal-cdc.md)
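The database and table filtering can be pictured with a small standalone sketch. It assumes the include options (`canal-json.database.include` and `canal-json.table.include` in DDL) are Java regular expressions matched against the whole `database` and `table` field of each Canal record; the class below is a hypothetical illustration, not Flink's implementation.

```java
import java.util.regex.Pattern;

/** Illustrates regex-based include filtering; not Flink's actual implementation. */
final class CanalIncludeFilter {
    private final Pattern databasePattern; // null means "accept every database"
    private final Pattern tablePattern;    // null means "accept every table"

    CanalIncludeFilter(String databaseInclude, String tableInclude) {
        this.databasePattern = databaseInclude == null ? null : Pattern.compile(databaseInclude);
        this.tablePattern = tableInclude == null ? null : Pattern.compile(tableInclude);
    }

    /** Returns true when a record's database and table both match the configured patterns. */
    boolean accepts(String database, String table) {
        // Assumption: the whole field must match, so "orders" does not accept "orders_archive".
        boolean dbOk = databasePattern == null || databasePattern.matcher(database).matches();
        boolean tableOk = tablePattern == null || tablePattern.matcher(table).matches();
        return dbOk && tableOk;
    }
}
```

For example, `new CanalIncludeFilter("shop_.*", "orders")` would accept a record from `shop_eu.orders` and reject one from `inventory.orders`.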

### Debezium CDC Format

JSON format support for the Debezium Change Data Capture system. It handles database change events, with optional schema inclusion and metadata processing.

```java { .api }
public class DebeziumJsonFormatOptions {
    public static final ConfigOption<Boolean> SCHEMA_INCLUDE;
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> TIMESTAMP_FORMAT;
}
```

[Debezium CDC Format](./debezium-cdc.md)

### Maxwell CDC Format

JSON format support for the Maxwell's Daemon CDC system. It processes MySQL binlog changes using the Maxwell-specific JSON structure and metadata.

```java { .api }
public class MaxwellJsonFormatOptions {
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> TIMESTAMP_FORMAT;
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;
}
```

[Maxwell CDC Format](./maxwell-cdc.md)

### Oracle GoldenGate CDC Format

JSON format support for the Oracle GoldenGate (OGG) Change Data Capture system. It processes Oracle database changes using the OGG-specific JSON structure.

```java { .api }
public class OggJsonFormatOptions {
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> TIMESTAMP_FORMAT;
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;
}
```

[Oracle GoldenGate CDC Format](./ogg-cdc.md)

## Configuration Options

Format behavior is controlled through the `ConfigOption` constants in `JsonFormatOptions`, which cover error handling, null map keys, timestamp parsing, and encoding and decoding details:

```java { .api }
public class JsonFormatOptions {
    public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD;
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> MAP_NULL_KEY_MODE;
    public static final ConfigOption<String> MAP_NULL_KEY_LITERAL;
    public static final ConfigOption<String> TIMESTAMP_FORMAT;
    public static final ConfigOption<Boolean> ENCODE_DECIMAL_AS_PLAIN_NUMBER;
    public static final ConfigOption<Boolean> ENCODE_IGNORE_NULL_FIELDS;
    public static final ConfigOption<Boolean> DECODE_JSON_PARSER_ENABLED;
}

public enum MapNullKeyMode {
    FAIL, DROP, LITERAL
}
```
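To make the three `MapNullKeyMode` values concrete, here is a standalone sketch of the intended semantics when a map with a null key is serialized: `FAIL` raises an error, `DROP` discards the entry, and `LITERAL` substitutes the string configured through `MAP_NULL_KEY_LITERAL`. The class, its `Mode` enum, and the exception type are hypothetical stand-ins written for illustration; this is not Flink's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative re-creation of the three null-key strategies; not Flink's implementation. */
final class NullKeyHandling {

    /** Mirrors the FAIL, DROP, LITERAL constants of MapNullKeyMode. */
    enum Mode { FAIL, DROP, LITERAL }

    /** Returns a copy of the input with null keys handled according to the mode. */
    static Map<String, Object> apply(Map<String, Object> input, Mode mode, String literal) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : input.entrySet()) {
            String key = entry.getKey();
            if (key == null) {
                switch (mode) {
                    case FAIL:
                        throw new IllegalStateException("Map contains a null key");
                    case DROP:
                        continue;      // skip the entry entirely
                    case LITERAL:
                        key = literal; // substitute the configured literal
                        break;
                }
            }
            result.put(key, entry.getValue());
        }
        return result;
    }
}
```

In DDL these settings would be spelled `json.map-null-key.mode` and `json.map-null-key.literal`.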

## Exception Handling

```java { .api }
public class JsonParseException extends RuntimeException {
    public JsonParseException(String message);
    public JsonParseException(String message, Throwable cause);
}
```

## Schema Conversion

```java { .api }
public final class JsonRowSchemaConverter {
    public static <T> TypeInformation<T> convert(String jsonSchema);
}
```