# Serialization Schemas

Interfaces and implementations for serializing and deserializing Kafka messages with key-value semantics, metadata access, and type safety. These schemas provide the bridge between Flink's data types and Kafka's byte array format.
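To make that bridge concrete, here is a framework-free sketch of the round trip these schemas formalize. `StringBridge`, `toBytes`, and `fromBytes` are illustrative names, not part of the connector API; the null handling mirrors the tombstone and skip semantics described below.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: the value half of a serialization schema, reduced to
// a pair of functions between a typed object (String) and Kafka's byte[].
class StringBridge {

    // Plays the role of serializeValue(): object -> bytes
    static byte[] toBytes(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Plays the role of deserialize(): bytes -> object; null bytes
    // (a tombstone record) map to null, meaning "skip this message"
    static String fromBytes(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The real interfaces below add metadata (topic, partition, offset), key handling, and Flink type information on top of this core idea.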

## Capabilities

### KeyedDeserializationSchema

Interface for deserializing Kafka messages with access to key, value, topic, partition, and offset information.

```java { .api }
public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}
```

**Methods:**

- `deserialize()` - Convert Kafka message bytes to typed objects
  - `messageKey` - Message key as byte array (null if no key)
  - `message` - Message value as byte array (null if tombstone/deleted)
  - `topic` - Topic name where message originated
  - `partition` - Partition number within the topic
  - `offset` - Message offset within the partition
  - **Returns:** Deserialized object (null to skip this message)
  - **Throws:** IOException if deserialization fails
- `isEndOfStream()` - Check if element signals end of stream
  - `nextElement` - The deserialized element to test
  - **Returns:** true if this element should terminate the stream
- `getProducedType()` - Provide type information for Flink's type system

**Usage Example:**

```java
public class MyEventDeserializationSchema implements KeyedDeserializationSchema<MyEvent> {

    @Override
    public MyEvent deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        if (message == null) {
            return null; // Skip tombstone messages
        }

        String keyStr = messageKey != null ? new String(messageKey, StandardCharsets.UTF_8) : null;
        String valueStr = new String(message, StandardCharsets.UTF_8);

        return new MyEvent(keyStr, valueStr, topic, partition, offset, System.currentTimeMillis());
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return nextElement != null && "END_MARKER".equals(nextElement.getType());
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
}
```

### KeyedSerializationSchema

Interface for serializing elements to Kafka messages, with separate key and value handling and optional per-element target topic selection.

```java { .api }
public interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}
```

**Methods:**

- `serializeKey()` - Extract and serialize the message key
  - `element` - The element to serialize
  - **Returns:** Key as byte array (null if no key)
- `serializeValue()` - Serialize the message value
  - `element` - The element to serialize
  - **Returns:** Value as byte array
- `getTargetTopic()` - Determine target topic for this element
  - `element` - The element being sent
  - **Returns:** Topic name (null to use producer's default topic)

**Usage Example:**

```java
public class MyEventSerializationSchema implements KeyedSerializationSchema<MyEvent> {

    // ObjectMapper is costly to create and thread-safe; reuse a single instance
    // (static, so it is not dragged into Java serialization of the schema)
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public byte[] serializeKey(MyEvent element) {
        return element.getUserId() != null
                ? element.getUserId().getBytes(StandardCharsets.UTF_8)
                : null;
    }

    @Override
    public byte[] serializeValue(MyEvent element) {
        // Use JSON serialization for the value
        try {
            return MAPPER.writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize event", e);
        }
    }

    @Override
    public String getTargetTopic(MyEvent element) {
        // Route to different topics based on event type
        switch (element.getType()) {
            case "USER_ACTION":
                return "user-actions";
            case "SYSTEM_EVENT":
                return "system-events";
            default:
                return null; // Use default topic
        }
    }
}
```

### Schema Wrappers

Utility classes for adapting between simple and keyed serialization interfaces.

#### KeyedDeserializationSchemaWrapper

Wraps a simple DeserializationSchema to work with the keyed interface.

```java { .api }
public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
    public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema);
}
```

**Usage Example:**

```java
// Wrap a simple string deserializer
DeserializationSchema<String> simpleSchema = new SimpleStringSchema();
KeyedDeserializationSchema<String> keyedSchema = new KeyedDeserializationSchemaWrapper<>(simpleSchema);
```

#### KeyedSerializationSchemaWrapper

Wraps a simple SerializationSchema to work with the keyed interface.

```java { .api }
public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
    public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema);
}
```

**Usage Example:**

```java
// Wrap a simple string serializer
SerializationSchema<String> simpleSchema = new SimpleStringSchema();
KeyedSerializationSchema<String> keyedSchema = new KeyedSerializationSchemaWrapper<>(simpleSchema);
```

### Built-in Schema Implementations

#### JSONKeyValueDeserializationSchema

Deserializes JSON key-value messages into a Jackson `ObjectNode`, with optional metadata inclusion.

```java { .api }
public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
    public JSONKeyValueDeserializationSchema(boolean includeMetadata);
}
```

**Parameters:**

- `includeMetadata` - Whether to include topic, partition, offset in the output

**Usage Example:**

```java
// Include metadata in the deserialized JSON
KeyedDeserializationSchema<ObjectNode> schema = new JSONKeyValueDeserializationSchema(true);

// The resulting ObjectNode will have the structure:
// {
//   "key": { ... },      // Original message key as JSON
//   "value": { ... },    // Original message value as JSON
//   "metadata": {
//     "topic": "my-topic",
//     "partition": 0,
//     "offset": 12345
//   }
// }
```

#### JSONDeserializationSchema

Simple JSON deserialization extending JsonNodeDeserializationSchema.

```java { .api }
public class JSONDeserializationSchema extends JsonNodeDeserializationSchema {
    public JSONDeserializationSchema();
}
```

#### TypeInformationKeyValueSerializationSchema

Type-safe serialization for key-value pairs using Flink's type information system.

```java { .api }
public class TypeInformationKeyValueSerializationSchema<K, V>
        implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K, V>> {

    public TypeInformationKeyValueSerializationSchema(
        Class<K> keyClass,
        TypeInformation<K> keyTypeInfo,
        Class<V> valueClass,
        TypeInformation<V> valueTypeInfo,
        ExecutionConfig config
    );
}
```

**Usage Example:**

```java
// Create schema for String keys and Long values
TypeInformationKeyValueSerializationSchema<String, Long> schema =
        new TypeInformationKeyValueSerializationSchema<>(
                String.class,
                BasicTypeInfo.STRING_TYPE_INFO,
                Long.class,
                BasicTypeInfo.LONG_TYPE_INFO,
                env.getConfig()
        );
```

#### Table-Related Schemas

JSON schemas for Table API integration:

```java { .api }
public class JsonRowDeserializationSchema extends org.apache.flink.formats.json.JsonRowDeserializationSchema {
    // Extends format-specific JSON row deserialization
}

public class JsonRowSerializationSchema extends org.apache.flink.formats.json.JsonRowSerializationSchema {
    // Extends format-specific JSON row serialization
}
```

## Best Practices

### Error Handling

Always handle deserialization errors gracefully:

```java
@Override
public MyEvent deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
        throws IOException {
    try {
        // Deserialization logic
        return parseMessage(message);
    } catch (Exception e) {
        // Option 1: Skip malformed messages
        logger.warn("Failed to deserialize message at {}:{}:{}", topic, partition, offset, e);
        return null;

        // Option 2: Create an error record
        // return MyEvent.error(topic, partition, offset, e.getMessage());

        // Option 3: Rethrow to fail the job (strictest)
        // throw new IOException("Deserialization failed", e);
    }
}
```

### Type Safety

Always provide accurate type information:

```java
@Override
public TypeInformation<MyEvent> getProducedType() {
    // For POJOs
    return TypeInformation.of(MyEvent.class);

    // For generic types:
    // return new TypeHint<List<MyEvent>>(){}.getTypeInfo();

    // For tuples:
    // return Types.TUPLE(Types.STRING, Types.LONG);
}
```

### Performance Considerations

- Reuse serializers and deserializers when possible
- Avoid heavy computation in serialization methods (they're called for every record)
- Use efficient serialization formats (Avro, Protobuf) for high-throughput scenarios
- Consider message size impact on network and storage
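The first bullet can be sketched concretely. Schema instances are Java-serialized and shipped to task managers, so heavyweight helpers (a Jackson `ObjectMapper`, for instance) are usually held in a `transient` field and created lazily on first use rather than once per record. A framework-free sketch, with a hypothetical `ExpensiveParser` standing in for the helper:

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: the lazy transient-helper pattern often used inside
// deserialization schemas to reuse a costly, non-Serializable helper.
class LazyHelperSchema implements Serializable {

    // Stand-in for an expensive helper such as a Jackson ObjectMapper
    static class ExpensiveParser {
        static final AtomicInteger CREATED = new AtomicInteger();
        ExpensiveParser() { CREATED.incrementAndGet(); }
        String parse(byte[] bytes) { return new String(bytes, StandardCharsets.UTF_8); }
    }

    // transient: excluded when the schema object is serialized;
    // rebuilt lazily on the task manager instead
    private transient ExpensiveParser parser;

    public String deserialize(byte[] message) {
        if (parser == null) {
            parser = new ExpensiveParser(); // once per instance, not per record
        }
        return parser.parse(message);
    }
}
```

The same pattern applies on the producer side for any helper that is expensive to build or not `Serializable`.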