or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

canal-cdc.mdcore-json.mddebezium-cdc.mdindex.mdmaxwell-cdc.mdogg-cdc.md

core-json.mddocs/

0

# Core JSON Processing

1

2

Core JSON serialization and deserialization capabilities providing the foundation for all JSON data processing in Apache Flink. These components handle conversion between Java objects and JSON with extensive configuration options for error handling, null value processing, and timestamp formatting.

3

4

## Capabilities

5

6

### JSON Deserialization Schema

7

8

Generic deserialization schema that converts JSON byte arrays into Java objects with full type safety and configurable ObjectMapper support.

9

10

```java { .api }

11

/**

12

* Generic deserialization schema for JSON data

13

* @param <T> The target type for deserialization

14

*/

15

public class JsonDeserializationSchema<T> implements DeserializationSchema<T> {

16

17

/**

18

* Create deserialization schema for specific class

19

* @param clazz Target class for deserialization

20

*/

21

public JsonDeserializationSchema(Class<T> clazz);

22

23

/**

24

* Create deserialization schema with type information

25

* @param typeInformation Flink TypeInformation for the target type

26

*/

27

public JsonDeserializationSchema(TypeInformation<T> typeInformation);

28

29

/**

30

* Create deserialization schema with custom ObjectMapper

31

* @param clazz Target class for deserialization

32

* @param mapperFactory Supplier providing custom ObjectMapper configuration

33

*/

34

public JsonDeserializationSchema(Class<T> clazz, SerializableSupplier<ObjectMapper> mapperFactory);

35

36

/**

37

* Create deserialization schema with type information and custom ObjectMapper

38

* @param typeInformation Flink TypeInformation for the target type

39

* @param mapperFactory Supplier providing custom ObjectMapper configuration

40

*/

41

public JsonDeserializationSchema(TypeInformation<T> typeInformation, SerializableSupplier<ObjectMapper> mapperFactory);

42

43

/**

44

* Initialize the schema with runtime context

45

* @param context Initialization context providing runtime information

46

*/

47

public void open(InitializationContext context) throws Exception;

48

49

/**

50

* Deserialize JSON bytes into target object

51

* @param message JSON data as byte array

52

* @return Deserialized object of type T

53

* @throws IOException When JSON parsing fails

54

*/

55

public T deserialize(byte[] message) throws IOException;

56

57

/**

58

* Get the type information for the deserialized type (inherited from DeserializationSchema)

59

* @return TypeInformation for type T

60

*/

61

public TypeInformation<T> getProducedType();

62

}

63

```

64

65

**Usage Examples:**

66

67

```java

68

import org.apache.flink.formats.json.JsonDeserializationSchema;

69

import org.apache.flink.api.common.typeinfo.TypeInformation;

70

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

71

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;

72

73

// Basic deserialization for User class

74

JsonDeserializationSchema<User> userDeserializer =

75

new JsonDeserializationSchema<>(User.class);

76

77

// Using TypeInformation

78

TypeInformation<User> userTypeInfo = TypeInformation.of(User.class);

79

JsonDeserializationSchema<User> typedDeserializer =

80

new JsonDeserializationSchema<>(userTypeInfo);

81

82

// Custom ObjectMapper configuration

83

JsonDeserializationSchema<User> customDeserializer =

84

new JsonDeserializationSchema<>(User.class, () -> {

85

ObjectMapper mapper = new ObjectMapper();

86

mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

87

return mapper;

88

});

89

90

// Deserialize JSON data

91

byte[] jsonBytes = "{\"name\":\"Alice\",\"age\":30}".getBytes();

92

User user = userDeserializer.deserialize(jsonBytes);

93

```

94

95

### JSON Serialization Schema

96

97

Generic serialization schema that converts Java objects into JSON byte arrays with configurable ObjectMapper support for custom serialization behavior.

98

99

```java { .api }

100

/**

101

* Generic serialization schema for JSON data

102

* @param <T> The source type for serialization

103

*/

104

public class JsonSerializationSchema<T> implements SerializationSchema<T> {

105

106

/**

107

* Create serialization schema with default ObjectMapper

108

*/

109

public JsonSerializationSchema();

110

111

/**

112

* Create serialization schema with custom ObjectMapper

113

* @param mapperFactory Supplier providing custom ObjectMapper configuration

114

*/

115

public JsonSerializationSchema(SerializableSupplier<ObjectMapper> mapperFactory);

116

117

/**

118

* Initialize the schema with runtime context

119

* @param context Initialization context providing runtime information

120

*/

121

public void open(InitializationContext context) throws Exception;

122

123

/**

124

* Serialize object to JSON bytes

125

* @param element Object to serialize

126

* @return JSON data as byte array

127

*/

128

public byte[] serialize(T element);

129

}

130

```

131

132

**Usage Examples:**

133

134

```java

135

import org.apache.flink.formats.json.JsonSerializationSchema;

136

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

137

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;

138

139

// Basic serialization

140

JsonSerializationSchema<User> userSerializer = new JsonSerializationSchema<>();

141

142

// Custom ObjectMapper for pretty printing

143

JsonSerializationSchema<User> prettySerializer =

144

new JsonSerializationSchema<>(() -> {

145

ObjectMapper mapper = new ObjectMapper();

146

mapper.enable(SerializationFeature.INDENT_OUTPUT);

147

return mapper;

148

});

149

150

// Serialize object to JSON

151

User user = new User("Alice", 30);

152

byte[] jsonBytes = userSerializer.serialize(user);

153

String jsonString = new String(jsonBytes); // {"name":"Alice","age":30}

154

```

155

156

### Schema Conversion Utilities

157

158

Utility for converting JSON schema strings into Flink TypeInformation, enabling automatic schema derivation for table ecosystem integration.

159

160

```java { .api }

161

/**

162

* Utility class for JSON schema conversion

163

*/

164

public final class JsonRowSchemaConverter {

165

166

/**

167

* Convert JSON schema string to Flink TypeInformation

168

* @param <T> Target type for conversion

169

* @param jsonSchema JSON schema as string

170

* @return TypeInformation representing the schema structure

171

* @throws IllegalArgumentException When schema is invalid

172

*/

173

public static <T> TypeInformation<T> convert(String jsonSchema);

174

}

175

```

176

177

**Usage Examples:**

178

179

```java

180

import org.apache.flink.formats.json.JsonRowSchemaConverter;

181

import org.apache.flink.api.common.typeinfo.TypeInformation;

182

import org.apache.flink.types.Row;

183

184

// JSON schema string

185

String jsonSchema = "{\n" +

186

" \"type\": \"object\",\n" +

187

" \"properties\": {\n" +

188

" \"name\": {\"type\": \"string\"},\n" +

189

" \"age\": {\"type\": \"integer\"},\n" +

190

" \"active\": {\"type\": \"boolean\"}\n" +

191

" }\n" +

192

"}";

193

194

// Convert to TypeInformation

195

TypeInformation<Row> typeInfo = JsonRowSchemaConverter.convert(jsonSchema);

196

197

// Use with deserialization schema

198

JsonDeserializationSchema<Row> rowDeserializer =

199

new JsonDeserializationSchema<>(typeInfo);

200

```

201

202

### Configuration Options

203

204

Comprehensive configuration options for controlling JSON processing behavior, error handling, and data formatting.

205

206

```java { .api }

207

/**

208

* Configuration options for JSON format processing

209

*/

210

public class JsonFormatOptions {

211

212

/** Whether to fail when a field is missing from JSON (default: false) */

213

public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD;

214

215

/** Whether to ignore JSON parsing errors (default: false) */

216

public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

217

218

/** How to handle null keys in maps (default: "FAIL") */

219

public static final ConfigOption<String> MAP_NULL_KEY_MODE;

220

221

/** Literal string to use for null keys when mode is LITERAL (default: "null") */

222

public static final ConfigOption<String> MAP_NULL_KEY_LITERAL;

223

224

/** Timestamp format pattern (default: "SQL") */

225

public static final ConfigOption<String> TIMESTAMP_FORMAT;

226

227

/** Whether to encode decimals as plain numbers (default: false) */

228

public static final ConfigOption<Boolean> ENCODE_DECIMAL_AS_PLAIN_NUMBER;

229

230

/** Whether to ignore null fields during encoding (default: false) */

231

public static final ConfigOption<Boolean> ENCODE_IGNORE_NULL_FIELDS;

232

233

/** Whether to enable JSON parser for decoding (default: true) */

234

public static final ConfigOption<Boolean> DECODE_JSON_PARSER_ENABLED;

235

}

236

237

/**

238

* Enum for null key handling modes in maps

239

*/

240

public enum MapNullKeyMode {

241

/** Fail when encountering null keys */

242

FAIL,

243

/** Drop entries with null keys */

244

DROP,

245

/** Replace null keys with literal string */

246

LITERAL

247

}

248

```

249

250

**Configuration Usage:**

251

252

```java

253

import org.apache.flink.configuration.Configuration;

254

import org.apache.flink.formats.json.JsonFormatOptions;

255

256

// Configure JSON format options

257

Configuration config = new Configuration();

258

config.set(JsonFormatOptions.IGNORE_PARSE_ERRORS, true);

259

config.set(JsonFormatOptions.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss");

260

config.set(JsonFormatOptions.MAP_NULL_KEY_MODE, "DROP");

261

config.set(JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER, true);

262

```

263

264

### Exception Handling

265

266

Specialized exception for JSON parsing errors with detailed error information.

267

268

```java { .api }

269

/**

270

* Exception thrown when JSON parsing fails

271

*/

272

public class JsonParseException extends RuntimeException {

273

274

/**

275

* Create exception with error message

276

* @param message Description of the parsing error

277

*/

278

public JsonParseException(String message);

279

280

/**

281

* Create exception with error message and cause

282

* @param message Description of the parsing error

283

* @param cause Underlying cause of the error

284

*/

285

public JsonParseException(String message, Throwable cause);

286

}

287

```

288

289

**Error Handling Examples:**

290

291

```java

292

import org.apache.flink.formats.json.JsonParseException;

293

294

try {

295

User user = deserializer.deserialize(malformedJsonBytes);

296

} catch (JsonParseException e) {

297

// Handle JSON parsing error

298

logger.error("Failed to parse JSON: " + e.getMessage(), e);

299

// Optionally skip the record or apply fallback logic

300

}

301

```

302

303

## Integration Patterns

304

305

### Stream Processing Integration

306

307

```java

308

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

309

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

310

311

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

312

313

// Create Kafka consumer with JSON deserialization

314

FlinkKafkaConsumer<User> consumer = new FlinkKafkaConsumer<>(

315

"user-topic",

316

new JsonDeserializationSchema<>(User.class),

317

kafkaProperties

318

);

319

320

DataStream<User> users = env.addSource(consumer);

321

```

322

323

### Table API Integration

324

325

The core JSON schemas integrate seamlessly with Flink's Table API through format factories, enabling declarative JSON processing through SQL DDL statements and programmatic table definitions.