# Serialization Schemas

Interfaces and implementations for serializing and deserializing Kafka messages with key-value semantics, metadata access, and type safety. These schemas bridge Flink's data types and Kafka's byte-array format.

## Capabilities

### KeyedDeserializationSchema

Interface for deserializing Kafka messages with access to the key, value, topic, partition, and offset.

```java { .api }
public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}
```

**Methods:**

- `deserialize()` - Convert Kafka message bytes to a typed object
  - `messageKey` - Message key as a byte array (null if the message has no key)
  - `message` - Message value as a byte array (null for tombstone/deleted messages)
  - `topic` - Topic the message originated from
  - `partition` - Partition number within the topic
  - `offset` - Message offset within the partition
  - **Returns:** Deserialized object (return null to skip the message)
  - **Throws:** IOException if deserialization fails
- `isEndOfStream()` - Check whether an element signals the end of the stream
  - `nextElement` - The deserialized element to test
  - **Returns:** true if this element should terminate the stream
- `getProducedType()` - Provide type information for Flink's type system

**Usage Example:**

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;

public class MyEventDeserializationSchema implements KeyedDeserializationSchema<MyEvent> {

    @Override
    public MyEvent deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        if (message == null) {
            return null; // Skip tombstone messages
        }

        String keyStr = messageKey != null ? new String(messageKey, StandardCharsets.UTF_8) : null;
        String valueStr = new String(message, StandardCharsets.UTF_8);

        return new MyEvent(keyStr, valueStr, topic, partition, offset, System.currentTimeMillis());
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return nextElement != null && "END_MARKER".equals(nextElement.getType());
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
}
```

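The null-handling rules can be exercised without any Flink dependencies: a null key is legal, a null value marks a tombstone, and returning null skips the record. The `decode` helper below is a hypothetical stand-in for the value-building part of `deserialize()`:

```java
import java.nio.charset.StandardCharsets;

public class DecodeSketch {

    // Hypothetical helper mirroring the deserialize() null-handling contract:
    // a null value (tombstone) yields null, a null key is tolerated.
    public static String decode(byte[] key, byte[] value) {
        if (value == null) {
            return null; // tombstone: skip the record
        }
        String k = key != null ? new String(key, StandardCharsets.UTF_8) : "<no key>";
        return k + "=" + new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] k = "user-1".getBytes(StandardCharsets.UTF_8);
        byte[] v = "clicked".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(k, v));    // user-1=clicked
        System.out.println(decode(null, v)); // <no key>=clicked
        System.out.println(decode(k, null)); // null
    }
}
```
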
### KeyedSerializationSchema

Interface for serializing elements to Kafka messages, with separate key and value handling and optional per-element target topic selection.

```java { .api }
public interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}
```

**Methods:**

- `serializeKey()` - Extract and serialize the message key
  - `element` - The element to serialize
  - **Returns:** Key as a byte array (null if the message should have no key)
- `serializeValue()` - Serialize the message value
  - `element` - The element to serialize
  - **Returns:** Value as a byte array
- `getTargetTopic()` - Determine the target topic for this element
  - `element` - The element being sent
  - **Returns:** Topic name (null to use the producer's default topic)

**Usage Example:**

```java
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MyEventSerializationSchema implements KeyedSerializationSchema<MyEvent> {

    // Reuse one mapper rather than allocating a new one per record;
    // ObjectMapper is thread-safe for serialization and is kept static
    // because it is not Serializable.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public byte[] serializeKey(MyEvent element) {
        return element.getUserId() != null
                ? element.getUserId().getBytes(StandardCharsets.UTF_8)
                : null;
    }

    @Override
    public byte[] serializeValue(MyEvent element) {
        try {
            return MAPPER.writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize event", e);
        }
    }

    @Override
    public String getTargetTopic(MyEvent element) {
        // Route to different topics based on event type
        switch (element.getType()) {
            case "USER_ACTION":
                return "user-actions";
            case "SYSTEM_EVENT":
                return "system-events";
            default:
                return null; // Use the producer's default topic
        }
    }
}
```

### Schema Wrappers

Utility classes for adapting simple serialization interfaces to the keyed interfaces.

#### KeyedDeserializationSchemaWrapper

Wraps a simple `DeserializationSchema` for use where a keyed schema is expected; only the value bytes are deserialized.

```java { .api }
public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
    public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema);
}
```

**Usage Example:**

```java
// Wrap a simple string deserializer
DeserializationSchema<String> simpleSchema = new SimpleStringSchema();
KeyedDeserializationSchema<String> keyedSchema = new KeyedDeserializationSchemaWrapper<>(simpleSchema);
```

#### KeyedSerializationSchemaWrapper

Wraps a simple `SerializationSchema` for use where a keyed schema is expected; the serialized bytes become the message value.

```java { .api }
public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
    public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema);
}
```

**Usage Example:**

```java
// Wrap a simple string serializer
SerializationSchema<String> simpleSchema = new SimpleStringSchema();
KeyedSerializationSchema<String> keyedSchema = new KeyedSerializationSchemaWrapper<>(simpleSchema);
```

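Both wrappers are thin delegation layers. The sketch below illustrates the idea with minimal stand-in interfaces (not the real Flink types): the keyed view forwards only the value bytes and discards key and metadata.

```java
import java.nio.charset.StandardCharsets;

public class WrapperSketch {

    // Minimal stand-in for DeserializationSchema<T>
    public interface SimpleSchema<T> {
        T deserialize(byte[] message);
    }

    // Minimal stand-in for KeyedDeserializationSchema<T>
    public interface KeyedSchema<T> {
        T deserialize(byte[] key, byte[] message, String topic, int partition, long offset);
    }

    // The wrapper: forwards only the value bytes; key and metadata are ignored
    public static <T> KeyedSchema<T> wrap(SimpleSchema<T> inner) {
        return (key, message, topic, partition, offset) -> inner.deserialize(message);
    }

    public static void main(String[] args) {
        KeyedSchema<String> keyed = wrap(b -> new String(b, StandardCharsets.UTF_8));
        // Key, topic, partition, and offset do not affect the result
        System.out.println(keyed.deserialize(null, "hello".getBytes(StandardCharsets.UTF_8), "t", 0, 42L));
    }
}
```
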
### Built-in Schema Implementations

#### JSONKeyValueDeserializationSchema

Deserializes JSON key-value messages into a Jackson `ObjectNode`, optionally including message metadata.

```java { .api }
public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
    public JSONKeyValueDeserializationSchema(boolean includeMetadata);
}
```

**Parameters:**

- `includeMetadata` - Whether to include topic, partition, and offset in the output

**Usage Example:**

```java
// Include metadata in the deserialized JSON
KeyedDeserializationSchema<ObjectNode> schema = new JSONKeyValueDeserializationSchema(true);

// The resulting ObjectNode has the structure:
// {
//   "key":   { ... },   // Original message key as JSON
//   "value": { ... },   // Original message value as JSON
//   "metadata": {
//     "topic": "my-topic",
//     "partition": 0,
//     "offset": 12345
//   }
// }
```

#### JSONDeserializationSchema

Simple JSON deserialization, extending `JsonNodeDeserializationSchema`.

```java { .api }
public class JSONDeserializationSchema extends JsonNodeDeserializationSchema {
    public JSONDeserializationSchema();
}
```

#### TypeInformationKeyValueSerializationSchema

Type-safe serialization and deserialization of key-value pairs using Flink's type information system. Two constructors are offered: one taking explicit `TypeInformation`, and a convenience variant taking plain classes.

```java { .api }
public class TypeInformationKeyValueSerializationSchema<K, V>
        implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K, V>> {

    public TypeInformationKeyValueSerializationSchema(
            TypeInformation<K> keyTypeInfo,
            TypeInformation<V> valueTypeInfo,
            ExecutionConfig config);

    public TypeInformationKeyValueSerializationSchema(
            Class<K> keyClass,
            Class<V> valueClass,
            ExecutionConfig config);
}
```

**Usage Example:**

```java
// Create a schema for String keys and Long values
TypeInformationKeyValueSerializationSchema<String, Long> schema =
        new TypeInformationKeyValueSerializationSchema<>(
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.LONG_TYPE_INFO,
                env.getConfig());
```

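Conceptually, this schema serializes the key and the value of each `Tuple2` independently, producing two separate byte arrays. A Flink-free sketch of that shape for `String` keys and `Long` values, using plain `DataOutputStream`/`DataInputStream` in place of Flink's `TypeSerializer`s (an illustration of the idea, not the actual wire format):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class KeyValueRoundTrip {

    // Key and value are serialized independently, as in the keyed schema contract
    public static byte[] serializeKey(String key) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeUTF(key);
        return bos.toByteArray();
    }

    public static byte[] serializeValue(long value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeLong(value);
        return bos.toByteArray();
    }

    public static String deserializeKey(byte[] bytes) throws IOException {
        return new DataInputStream(new ByteArrayInputStream(bytes)).readUTF();
    }

    public static long deserializeValue(byte[] bytes) throws IOException {
        return new DataInputStream(new ByteArrayInputStream(bytes)).readLong();
    }

    public static void main(String[] args) throws IOException {
        byte[] key = serializeKey("user-1");
        byte[] value = serializeValue(42L);
        System.out.println(deserializeKey(key) + " -> " + deserializeValue(value)); // user-1 -> 42
    }
}
```
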
#### Table-Related Schemas

JSON schemas for Table API integration:

```java { .api }
public class JsonRowDeserializationSchema extends org.apache.flink.formats.json.JsonRowDeserializationSchema {
    // Extends format-specific JSON row deserialization
}

public class JsonRowSerializationSchema extends org.apache.flink.formats.json.JsonRowSerializationSchema {
    // Extends format-specific JSON row serialization
}
```

## Best Practices

### Error Handling

Handle deserialization errors deliberately; decide per job whether a malformed record should be skipped, replaced, or should fail the job:

```java
@Override
public MyEvent deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
        throws IOException {
    try {
        // Deserialization logic
        return parseMessage(message);
    } catch (Exception e) {
        // Option 1: Skip malformed messages
        logger.warn("Failed to deserialize message at {}:{}:{}", topic, partition, offset, e);
        return null;

        // Option 2: Emit an error record instead
        // return MyEvent.error(topic, partition, offset, e.getMessage());

        // Option 3: Rethrow to fail the job (strictest)
        // throw new IOException("Deserialization failed", e);
    }
}
```

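The error-record option keeps malformed input visible to downstream operators instead of silently dropping it. A minimal, Flink-free sketch of such a result type (all names here are hypothetical):

```java
public class ParseResult {

    final String value; // parsed payload, null on failure
    final String error; // error message, null on success

    private ParseResult(String value, String error) {
        this.value = value;
        this.error = error;
    }

    public static ParseResult ok(String value) {
        return new ParseResult(value, null);
    }

    public static ParseResult failed(String error) {
        return new ParseResult(null, error);
    }

    public boolean isError() {
        return error != null;
    }

    // Parse a record expected to look like "key=value"; never throws,
    // malformed input becomes an error record instead.
    public static ParseResult parse(String raw) {
        int sep = raw.indexOf('=');
        if (sep < 0) {
            return failed("missing '=' in: " + raw);
        }
        return ok(raw.substring(sep + 1));
    }

    public static void main(String[] args) {
        System.out.println(parse("user=alice").value);  // alice
        System.out.println(parse("garbage").isError()); // true
    }
}
```
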
### Type Safety

Always provide accurate type information. Only one of the returns below applies to a given schema; the others are shown commented out as alternatives:

```java
@Override
public TypeInformation<MyEvent> getProducedType() {
    // For POJOs
    return TypeInformation.of(MyEvent.class);

    // For generic types:
    // return new TypeHint<List<MyEvent>>(){}.getTypeInfo();

    // For tuples:
    // return Types.TUPLE(Types.STRING, Types.LONG);
}
```

### Performance Considerations

- Reuse serializers and deserializers wherever possible
- Avoid heavy computation in serialization methods; they are called for every record
- Prefer efficient binary formats (Avro, Protobuf) for high-throughput scenarios
- Keep an eye on message size; it affects both network transfer and broker storage
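"Reuse serializers" can be as simple as holding one output buffer per schema instance and resetting it between records rather than reallocating. A JDK-only sketch (assuming, as is typical for these schemas, that each instance is used from a single task thread):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class ReusableSerializer {

    // One buffer per schema instance, reset between records instead of reallocated
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(1024);

    public byte[] serialize(String record) {
        buffer.reset(); // reuse the backing array across records
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        buffer.write(bytes, 0, bytes.length);
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        ReusableSerializer s = new ReusableSerializer();
        System.out.println(new String(s.serialize("first"), StandardCharsets.UTF_8));  // first
        System.out.println(new String(s.serialize("second"), StandardCharsets.UTF_8)); // second
    }
}
```
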