0
# Serialization and Deserialization
1
2
Kinesis-specific serialization interfaces that provide access to stream metadata during deserialization and allow custom target stream specification during serialization.
3
4
## Capabilities
5
6
### Kinesis Deserialization Schema
7
8
Extended deserialization interface that provides access to Kinesis record metadata including partition key, sequence number, approximate arrival timestamp, stream name, and shard ID.
9
10
```java { .api }
11
@PublicEvolving
12
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
13
14
/**
15
* Initialize the deserialization schema with context.
16
*
17
* @param context Initialization context with runtime information
18
* @throws Exception On initialization errors
19
*/
20
default void open(DeserializationSchema.InitializationContext context) throws Exception;
21
22
/**
23
* Deserialize a Kinesis record with full metadata access.
24
*
25
* @param recordValue Raw record data bytes
26
* @param partitionKey Partition key used for the record
27
* @param seqNum Sequence number of the record within the shard
28
* @param approxArrivalTimestamp Approximate arrival timestamp in milliseconds
29
* @param stream Name of the Kinesis stream
30
* @param shardId ID of the shard containing this record
31
* @return Deserialized object of type T
32
* @throws IOException On deserialization errors
33
*/
34
T deserialize(byte[] recordValue, String partitionKey, String seqNum,
35
long approxArrivalTimestamp, String stream, String shardId) throws IOException;
36
}
37
```
38
39
### Kinesis Serialization Schema
40
41
Extended serialization interface that allows specifying the target stream for each record, enabling dynamic stream routing based on record content.
42
43
```java { .api }
44
@PublicEvolving
45
public interface KinesisSerializationSchema<T> extends Serializable {
46
47
/**
48
* Initialize the serialization schema with context.
49
*
50
* @param context Initialization context with runtime information
51
* @throws Exception On initialization errors
52
*/
53
default void open(InitializationContext context) throws Exception;
54
55
/**
56
* Serialize an object to ByteBuffer for Kinesis.
57
*
58
* @param element Object to serialize
59
* @return Serialized data as ByteBuffer
60
*/
61
ByteBuffer serialize(T element);
62
63
/**
64
* Determine the target stream for this record.
65
*
66
* @param element Object to determine stream for
67
* @return Target stream name
68
*/
69
String getTargetStream(T element);
70
}
71
```
72
73
### Schema Wrapper
74
75
Internal wrapper that adapts a standard Flink `DeserializationSchema` to `KinesisDeserializationSchema`. It deserializes records using the wrapped schema and ignores the Kinesis-specific metadata (partition key, sequence number, arrival timestamp, stream name, and shard ID).
76
77
```java { .api }
78
@Internal
79
public class KinesisDeserializationSchemaWrapper<T> implements KinesisDeserializationSchema<T> {
80
81
/**
82
* Create wrapper around standard deserialization schema.
83
*
84
* @param deserializationSchema Standard Flink deserialization schema to wrap
85
*/
86
public KinesisDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema);
87
88
/**
89
* Deserialize using wrapped schema, ignoring Kinesis metadata.
90
*
91
* @param recordValue Raw record data bytes
92
* @param partitionKey Partition key (ignored by wrapper)
93
* @param seqNum Sequence number (ignored by wrapper)
94
* @param approxArrivalTimestamp Arrival timestamp (ignored by wrapper)
95
* @param stream Stream name (ignored by wrapper)
96
* @param shardId Shard ID (ignored by wrapper)
97
* @return Deserialized object using wrapped schema
98
* @throws IOException On deserialization errors
99
*/
100
@Override
101
public T deserialize(byte[] recordValue, String partitionKey, String seqNum,
102
long approxArrivalTimestamp, String stream, String shardId) throws IOException;
103
104
/**
105
* Get type information from wrapped schema.
106
*
107
* @return Type information from wrapped deserialization schema
108
*/
109
@Override
110
public TypeInformation<T> getProducedType();
111
112
/**
113
* Open the wrapped deserialization schema.
114
*
115
* @param context Initialization context
116
* @throws Exception On initialization errors
117
*/
118
@Override
119
public void open(DeserializationSchema.InitializationContext context) throws Exception;
120
}
121
```
122
123
## Usage Examples
124
125
### Custom Deserialization with Metadata
126
127
```java
128
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
129
import org.apache.flink.api.common.typeinfo.TypeInformation;
130
import com.fasterxml.jackson.databind.ObjectMapper;
131
import java.nio.charset.StandardCharsets;
132
133
public class EventDeserializationSchema implements KinesisDeserializationSchema<EventWithMetadata> {
134
private transient ObjectMapper objectMapper;
135
136
@Override
137
public void open(DeserializationSchema.InitializationContext context) throws Exception {
138
objectMapper = new ObjectMapper();
139
}
140
141
@Override
142
public EventWithMetadata deserialize(byte[] recordValue, String partitionKey, String seqNum,
143
long approxArrivalTimestamp, String stream, String shardId)
144
throws IOException {
145
// Parse the JSON payload
146
String json = new String(recordValue, StandardCharsets.UTF_8);
147
Event event = objectMapper.readValue(json, Event.class);
148
149
// Create enriched event with Kinesis metadata
150
EventWithMetadata enrichedEvent = new EventWithMetadata();
151
enrichedEvent.setEvent(event);
152
enrichedEvent.setPartitionKey(partitionKey);
153
enrichedEvent.setSequenceNumber(seqNum);
154
enrichedEvent.setArrivalTimestamp(approxArrivalTimestamp);
155
enrichedEvent.setStreamName(stream);
156
enrichedEvent.setShardId(shardId);
157
158
return enrichedEvent;
159
}
160
161
@Override
162
public TypeInformation<EventWithMetadata> getProducedType() {
163
return TypeInformation.of(EventWithMetadata.class);
164
}
165
}
166
```
167
168
### Event-Time Extraction from Metadata
169
170
```java
171
public class TimestampedEventSchema implements KinesisDeserializationSchema<TimestampedEvent> {
172
private transient ObjectMapper objectMapper;
173
174
@Override
175
public void open(DeserializationSchema.InitializationContext context) throws Exception {
176
objectMapper = new ObjectMapper();
177
}
178
179
@Override
180
public TimestampedEvent deserialize(byte[] recordValue, String partitionKey, String seqNum,
181
long approxArrivalTimestamp, String stream, String shardId)
182
throws IOException {
183
String json = new String(recordValue, StandardCharsets.UTF_8);
184
JsonNode node = objectMapper.readTree(json);
185
186
TimestampedEvent event = new TimestampedEvent();
187
event.setData(node.get("data").asText());
188
189
// Use event timestamp if available, otherwise use Kinesis arrival time
190
if (node.has("timestamp")) {
191
event.setEventTime(node.get("timestamp").asLong());
192
} else {
193
event.setEventTime(approxArrivalTimestamp);
194
}
195
196
// Add processing metadata
197
event.setProcessingTime(System.currentTimeMillis());
198
event.setPartitionKey(partitionKey);
199
200
return event;
201
}
202
203
@Override
204
public TypeInformation<TimestampedEvent> getProducedType() {
205
return TypeInformation.of(TimestampedEvent.class);
206
}
207
}
208
```
209
210
### Multi-Format Deserialization
211
212
```java
213
public class MultiFormatDeserializationSchema implements KinesisDeserializationSchema<GenericRecord> {
214
private transient ObjectMapper jsonMapper;
215
private transient AvroDeserializer avroDeserializer;
216
217
@Override
218
public void open(DeserializationSchema.InitializationContext context) throws Exception {
219
jsonMapper = new ObjectMapper();
220
avroDeserializer = new AvroDeserializer();
221
}
222
223
@Override
224
public GenericRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,
225
long approxArrivalTimestamp, String stream, String shardId)
226
throws IOException {
227
// Detect format based on stream name or content
228
if (stream.endsWith("-json")) {
229
return deserializeJson(recordValue);
230
} else if (stream.endsWith("-avro")) {
231
return deserializeAvro(recordValue);
232
} else {
233
// Auto-detect based on content
234
return autoDetectAndDeserialize(recordValue);
235
}
236
}
237
238
private GenericRecord deserializeJson(byte[] data) throws IOException {
239
String json = new String(data, StandardCharsets.UTF_8);
240
return jsonMapper.readValue(json, GenericRecord.class);
241
}
242
243
private GenericRecord deserializeAvro(byte[] data) throws IOException {
244
return avroDeserializer.deserialize(data);
245
}
246
247
private GenericRecord autoDetectAndDeserialize(byte[] data) throws IOException {
248
// Simple heuristic: JSON starts with '{' or '['
249
if (data.length > 0 && (data[0] == '{' || data[0] == '[')) {
250
return deserializeJson(data);
251
} else {
252
return deserializeAvro(data);
253
}
254
}
255
256
@Override
257
public TypeInformation<GenericRecord> getProducedType() {
258
return TypeInformation.of(GenericRecord.class);
259
}
260
}
261
```
262
263
### Custom Serialization with Stream Routing
264
265
```java
266
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
267
import java.nio.ByteBuffer;
268
import java.nio.charset.StandardCharsets;
269
270
public class EventSerializationSchema implements KinesisSerializationSchema<Event> {
271
private transient ObjectMapper objectMapper;
272
273
@Override
274
public void open(InitializationContext context) throws Exception {
275
objectMapper = new ObjectMapper();
276
}
277
278
@Override
279
public ByteBuffer serialize(Event element) {
280
try {
281
// Add processing timestamp
282
element.setProcessedAt(System.currentTimeMillis());
283
284
// Serialize to JSON
285
String json = objectMapper.writeValueAsString(element);
286
return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
287
} catch (Exception e) {
288
throw new RuntimeException("Failed to serialize event", e);
289
}
290
}
291
292
@Override
293
public String getTargetStream(Event element) {
294
// Route to different streams based on event properties
295
if (element.isHighPriority()) {
296
return "high-priority-events";
297
} else if (element.getEventType().equals("ERROR")) {
298
return "error-events";
299
} else {
300
return "normal-events";
301
}
302
}
303
}
304
```
305
306
### Dynamic Stream Routing with Tenant Isolation
307
308
```java
309
public class TenantAwareSerializationSchema implements KinesisSerializationSchema<TenantEvent> {
310
private transient ObjectMapper objectMapper;
311
312
@Override
313
public void open(InitializationContext context) throws Exception {
314
objectMapper = new ObjectMapper();
315
}
316
317
@Override
318
public ByteBuffer serialize(TenantEvent element) {
319
try {
320
// Enrich with metadata
321
element.setIngestionTime(System.currentTimeMillis());
322
element.setProcessingRegion(System.getProperty("aws.region", "unknown"));
323
324
String json = objectMapper.writeValueAsString(element);
325
return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
326
} catch (Exception e) {
327
throw new RuntimeException("Failed to serialize tenant event", e);
328
}
329
}
330
331
@Override
332
public String getTargetStream(TenantEvent element) {
333
// Route to tenant-specific streams for isolation
334
String tenantId = element.getTenantId();
335
String eventType = element.getEventType();
336
337
return String.format("tenant-%s-events-%s", tenantId, eventType.toLowerCase());
338
}
339
}
340
```
341
342
### Binary Data Serialization
343
344
```java
345
public class BinaryDataSchema implements KinesisSerializationSchema<BinaryDataEvent> {
346
347
@Override
348
public ByteBuffer serialize(BinaryDataEvent element) {
349
// Handle binary data directly
350
ByteBuffer buffer = ByteBuffer.allocate(element.getDataSize() + 8);
351
352
// Add header with timestamp
353
buffer.putLong(element.getTimestamp());
354
355
// Add binary payload
356
buffer.put(element.getBinaryData());
357
358
buffer.flip();
359
return buffer;
360
}
361
362
@Override
363
public String getTargetStream(BinaryDataEvent element) {
364
// Route based on data type
365
return "binary-data-" + element.getDataType();
366
}
367
}
368
```
369
370
### Compression Support
371
372
```java
373
import java.io.ByteArrayOutputStream;
374
import java.util.zip.GZIPOutputStream;
375
import java.util.zip.GZIPInputStream;
376
377
public class CompressedSerializationSchema implements KinesisSerializationSchema<LargeEvent> {
378
private transient ObjectMapper objectMapper;
379
380
@Override
381
public void open(InitializationContext context) throws Exception {
382
objectMapper = new ObjectMapper();
383
}
384
385
@Override
386
public ByteBuffer serialize(LargeEvent element) {
387
try {
388
// Serialize to JSON first
389
String json = objectMapper.writeValueAsString(element);
390
byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
391
392
// Compress if data is large
393
if (jsonBytes.length > 1024) {
394
return ByteBuffer.wrap(compress(jsonBytes));
395
} else {
396
return ByteBuffer.wrap(jsonBytes);
397
}
398
} catch (Exception e) {
399
throw new RuntimeException("Failed to serialize and compress event", e);
400
}
401
}
402
403
private byte[] compress(byte[] data) throws IOException {
404
ByteArrayOutputStream baos = new ByteArrayOutputStream();
405
try (GZIPOutputStream gzipOut = new GZIPOutputStream(baos)) {
406
gzipOut.write(data);
407
}
408
return baos.toByteArray();
409
}
410
411
@Override
412
public String getTargetStream(LargeEvent element) {
413
return element.getStreamName();
414
}
415
}
416
```
417
418
## Error Handling in Serialization
419
420
### Robust Deserialization with Fallbacks
421
422
```java
423
public class RobustDeserializationSchema implements KinesisDeserializationSchema<Either<Event, ErrorRecord>> {
424
private transient ObjectMapper objectMapper;
425
private transient Logger logger;
426
427
@Override
428
public void open(DeserializationSchema.InitializationContext context) throws Exception {
429
objectMapper = new ObjectMapper();
430
logger = LoggerFactory.getLogger(getClass());
431
}
432
433
@Override
434
public Either<Event, ErrorRecord> deserialize(byte[] recordValue, String partitionKey, String seqNum,
435
long approxArrivalTimestamp, String stream, String shardId) {
436
try {
437
String json = new String(recordValue, StandardCharsets.UTF_8);
438
Event event = objectMapper.readValue(json, Event.class);
439
return Either.left(event);
440
} catch (Exception e) {
441
logger.warn("Failed to deserialize record from stream {} shard {}: {}",
442
stream, shardId, e.getMessage());
443
444
ErrorRecord errorRecord = new ErrorRecord();
445
errorRecord.setRawData(recordValue);
446
errorRecord.setError(e.getMessage());
447
errorRecord.setStreamName(stream);
448
errorRecord.setShardId(shardId);
449
errorRecord.setSequenceNumber(seqNum);
450
errorRecord.setTimestamp(approxArrivalTimestamp);
451
452
return Either.right(errorRecord);
453
}
454
}
455
456
@Override
457
public TypeInformation<Either<Event, ErrorRecord>> getProducedType() {
458
return new EitherTypeInfo<>(
459
TypeInformation.of(Event.class),
460
TypeInformation.of(ErrorRecord.class)
461
);
462
}
463
}
464
```
465
466
## Performance Considerations
467
468
### Efficient Object Reuse
469
470
```java
471
public class EfficientDeserializationSchema implements KinesisDeserializationSchema<Event> {
472
private transient ObjectMapper objectMapper;
473
private transient Event reusableEvent; // Reuse objects to reduce GC pressure
474
475
@Override
476
public void open(DeserializationSchema.InitializationContext context) throws Exception {
477
objectMapper = new ObjectMapper();
478
reusableEvent = new Event();
479
}
480
481
@Override
482
public Event deserialize(byte[] recordValue, String partitionKey, String seqNum,
483
long approxArrivalTimestamp, String stream, String shardId) throws IOException {
484
// Reuse object and reset fields
485
reusableEvent.reset();
486
487
// Efficient parsing without creating intermediate objects
488
JsonParser parser = objectMapper.getFactory().createParser(recordValue);
489
// ... parse fields directly into reusableEvent
490
491
return reusableEvent.copy(); // Return copy for thread safety
492
}
493
494
@Override
495
public TypeInformation<Event> getProducedType() {
496
return TypeInformation.of(Event.class);
497
}
498
}
499
```
500
501
### Batch Serialization Optimization
502
503
```java
504
public class BatchOptimizedSchema implements KinesisSerializationSchema<List<Event>> {
505
private transient ObjectMapper objectMapper;
506
private transient ByteArrayOutputStream buffer;
507
508
@Override
509
public void open(InitializationContext context) throws Exception {
510
objectMapper = new ObjectMapper();
511
buffer = new ByteArrayOutputStream(4096); // Pre-allocated buffer
512
}
513
514
@Override
515
public ByteBuffer serialize(List<Event> elements) {
516
try {
517
buffer.reset(); // Reuse buffer
518
519
// Efficient batch serialization
520
objectMapper.writeValue(buffer, elements);
521
522
return ByteBuffer.wrap(buffer.toByteArray());
523
} catch (Exception e) {
524
throw new RuntimeException("Failed to serialize event batch", e);
525
}
526
}
527
528
@Override
529
public String getTargetStream(List<Event> elements) {
530
// Route based on first event in batch
531
return elements.isEmpty() ? "default-stream" : elements.get(0).getStreamName();
532
}
533
}