# Core JSON Processing

Core JSON serialization and deserialization capabilities providing the foundation for all JSON data processing in Apache Flink. These components handle conversion between Java objects and JSON with extensive configuration options for error handling, null value processing, and timestamp formatting.

## Capabilities

### JSON Deserialization Schema

Generic deserialization schema that converts JSON byte arrays into Java objects with full type safety and configurable ObjectMapper support.

```java { .api }
/**
 * Generic deserialization schema for JSON data
 * @param <T> The target type for deserialization
 */
public class JsonDeserializationSchema<T> implements DeserializationSchema<T> {

    /**
     * Create deserialization schema for specific class
     * @param clazz Target class for deserialization
     */
    public JsonDeserializationSchema(Class<T> clazz);

    /**
     * Create deserialization schema with type information
     * @param typeInformation Flink TypeInformation for the target type
     */
    public JsonDeserializationSchema(TypeInformation<T> typeInformation);

    /**
     * Create deserialization schema with custom ObjectMapper
     * @param clazz Target class for deserialization
     * @param mapperFactory Supplier providing custom ObjectMapper configuration
     */
    public JsonDeserializationSchema(Class<T> clazz, SerializableSupplier<ObjectMapper> mapperFactory);

    /**
     * Create deserialization schema with type information and custom ObjectMapper
     * @param typeInformation Flink TypeInformation for the target type
     * @param mapperFactory Supplier providing custom ObjectMapper configuration
     */
    public JsonDeserializationSchema(TypeInformation<T> typeInformation, SerializableSupplier<ObjectMapper> mapperFactory);

    /**
     * Initialize the schema with runtime context
     * @param context Initialization context providing runtime information
     */
    public void open(InitializationContext context) throws Exception;

    /**
     * Deserialize JSON bytes into target object
     * @param message JSON data as byte array
     * @return Deserialized object of type T
     * @throws IOException When JSON parsing fails
     */
    public T deserialize(byte[] message) throws IOException;

    /**
     * Get the type information for the deserialized type (inherited from DeserializationSchema)
     * @return TypeInformation for type T
     */
    public TypeInformation<T> getProducedType();
}
```

**Usage Examples:**

```java
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;

// Basic deserialization for User class
JsonDeserializationSchema<User> userDeserializer =
    new JsonDeserializationSchema<>(User.class);

// Using TypeInformation
TypeInformation<User> userTypeInfo = TypeInformation.of(User.class);
JsonDeserializationSchema<User> typedDeserializer =
    new JsonDeserializationSchema<>(userTypeInfo);

// Custom ObjectMapper configuration
JsonDeserializationSchema<User> customDeserializer =
    new JsonDeserializationSchema<>(User.class, () -> {
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        return mapper;
    });

// Deserialize JSON data
byte[] jsonBytes = "{\"name\":\"Alice\",\"age\":30}".getBytes();
User user = userDeserializer.deserialize(jsonBytes);
```

### JSON Serialization Schema

Generic serialization schema that converts Java objects into JSON byte arrays with configurable ObjectMapper support for custom serialization behavior.

```java { .api }
/**
 * Generic serialization schema for JSON data
 * @param <T> The source type for serialization
 */
public class JsonSerializationSchema<T> implements SerializationSchema<T> {

    /**
     * Create serialization schema with default ObjectMapper
     */
    public JsonSerializationSchema();

    /**
     * Create serialization schema with custom ObjectMapper
     * @param mapperFactory Supplier providing custom ObjectMapper configuration
     */
    public JsonSerializationSchema(SerializableSupplier<ObjectMapper> mapperFactory);

    /**
     * Initialize the schema with runtime context
     * @param context Initialization context providing runtime information
     */
    public void open(InitializationContext context) throws Exception;

    /**
     * Serialize object to JSON bytes
     * @param element Object to serialize
     * @return JSON data as byte array
     */
    public byte[] serialize(T element);
}
```

**Usage Examples:**

```java
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;

// Basic serialization
JsonSerializationSchema<User> userSerializer = new JsonSerializationSchema<>();

// Custom ObjectMapper for pretty printing
JsonSerializationSchema<User> prettySerializer =
    new JsonSerializationSchema<>(() -> {
        ObjectMapper mapper = new ObjectMapper();
        mapper.enable(SerializationFeature.INDENT_OUTPUT);
        return mapper;
    });

// Serialize object to JSON
User user = new User("Alice", 30);
byte[] jsonBytes = userSerializer.serialize(user);
String jsonString = new String(jsonBytes); // {"name":"Alice","age":30}
```

### Schema Conversion Utilities

Utility for converting JSON schema strings into Flink TypeInformation, enabling automatic schema derivation for table ecosystem integration.

```java { .api }
/**
 * Utility class for JSON schema conversion
 */
public final class JsonRowSchemaConverter {

    /**
     * Convert JSON schema string to Flink TypeInformation
     * @param <T> Target type for conversion
     * @param jsonSchema JSON schema as string
     * @return TypeInformation representing the schema structure
     * @throws IllegalArgumentException When schema is invalid
     */
    public static <T> TypeInformation<T> convert(String jsonSchema);
}
```

**Usage Examples:**

```java
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;

// JSON schema string
String jsonSchema = "{\n" +
    "  \"type\": \"object\",\n" +
    "  \"properties\": {\n" +
    "    \"name\": {\"type\": \"string\"},\n" +
    "    \"age\": {\"type\": \"integer\"},\n" +
    "    \"active\": {\"type\": \"boolean\"}\n" +
    "  }\n" +
    "}";

// Convert to TypeInformation
TypeInformation<Row> typeInfo = JsonRowSchemaConverter.convert(jsonSchema);

// Use with deserialization schema
JsonDeserializationSchema<Row> rowDeserializer =
    new JsonDeserializationSchema<>(typeInfo);
```

### Configuration Options

Comprehensive configuration options for controlling JSON processing behavior, error handling, and data formatting.

```java { .api }
/**
 * Configuration options for JSON format processing
 */
public class JsonFormatOptions {

    /** Whether to fail when a field is missing from JSON (default: false) */
    public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD;

    /** Whether to ignore JSON parsing errors (default: false) */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

    /** How to handle null keys in maps (default: "FAIL") */
    public static final ConfigOption<String> MAP_NULL_KEY_MODE;

    /** Literal string to use for null keys when mode is LITERAL (default: "null") */
    public static final ConfigOption<String> MAP_NULL_KEY_LITERAL;

    /** Timestamp format standard, either "SQL" or "ISO-8601" (default: "SQL") */
    public static final ConfigOption<String> TIMESTAMP_FORMAT;

    /** Whether to encode decimals as plain numbers (default: false) */
    public static final ConfigOption<Boolean> ENCODE_DECIMAL_AS_PLAIN_NUMBER;

    /** Whether to ignore null fields during encoding (default: false) */
    public static final ConfigOption<Boolean> ENCODE_IGNORE_NULL_FIELDS;

    /** Whether to enable JSON parser for decoding (default: true) */
    public static final ConfigOption<Boolean> DECODE_JSON_PARSER_ENABLED;
}

/**
 * Enum for null key handling modes in maps
 */
public enum MapNullKeyMode {
    /** Fail when encountering null keys */
    FAIL,
    /** Drop entries with null keys */
    DROP,
    /** Replace null keys with literal string */
    LITERAL
}
```

**Configuration Usage:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonFormatOptions;

// Configure JSON format options
Configuration config = new Configuration();
config.set(JsonFormatOptions.IGNORE_PARSE_ERRORS, true);
// TIMESTAMP_FORMAT accepts the standards "SQL" or "ISO-8601",
// not arbitrary date patterns
config.set(JsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601");
config.set(JsonFormatOptions.MAP_NULL_KEY_MODE, "DROP");
config.set(JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER, true);
```

### Exception Handling

Specialized exception for JSON parsing errors with detailed error information.

```java { .api }
/**
 * Exception thrown when JSON parsing fails
 */
public class JsonParseException extends RuntimeException {

    /**
     * Create exception with error message
     * @param message Description of the parsing error
     */
    public JsonParseException(String message);

    /**
     * Create exception with error message and cause
     * @param message Description of the parsing error
     * @param cause Underlying cause of the error
     */
    public JsonParseException(String message, Throwable cause);
}
```

**Error Handling Examples:**

```java
import org.apache.flink.formats.json.JsonParseException;

try {
    User user = deserializer.deserialize(malformedJsonBytes);
} catch (JsonParseException e) {
    // Handle JSON parsing error
    logger.error("Failed to parse JSON: " + e.getMessage(), e);
    // Optionally skip the record or apply fallback logic
}
```

## Integration Patterns

### Stream Processing Integration

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create Kafka consumer with JSON deserialization
FlinkKafkaConsumer<User> consumer = new FlinkKafkaConsumer<>(
    "user-topic",
    new JsonDeserializationSchema<>(User.class),
    kafkaProperties
);

DataStream<User> users = env.addSource(consumer);
```

Note: `FlinkKafkaConsumer` is deprecated in recent Flink releases in favor of `KafkaSource`; the same deserialization schema can be supplied to `KafkaSource` via its builder.

### Table API Integration

The core JSON schemas integrate seamlessly with Flink's Table API through format factories, enabling declarative JSON processing through SQL DDL statements and programmatic table definitions.