0
# Custom Deserialization
1
2
The Pub/Sub connector provides advanced deserialization capabilities that give access to full Pub/Sub message metadata including attributes, message ID, and publish time. This is essential for applications requiring message metadata or custom deserialization logic.
3
4
## Capabilities
5
6
### PubSubDeserializationSchema Interface
7
8
Core interface for custom deserialization with metadata access.
9
10
```java { .api }
11
/**
12
* Deserialization schema for PubsubMessage objects with metadata access
13
* @param <T> Type of deserialized objects
14
*/
15
public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
16
17
/**
18
* Initialization method called before working methods
19
* @param context Initialization context for metrics and configuration
20
* @throws Exception If initialization fails
21
*/
22
default void open(DeserializationSchema.InitializationContext context) throws Exception {}
23
24
/**
25
* Determine if element signals end of stream
26
* @param nextElement Element to test for end-of-stream signal
27
* @return True if element signals end of stream, false otherwise
28
*/
29
boolean isEndOfStream(T nextElement);
30
31
/**
32
* Deserialize a PubsubMessage to the target type
33
* @param message PubsubMessage to deserialize
34
* @return Deserialized object (null if message cannot be deserialized)
35
* @throws Exception If deserialization fails
36
*/
37
T deserialize(PubsubMessage message) throws Exception;
38
39
/**
40
* Deserialize PubsubMessage using collector for multiple output records
41
* @param message PubsubMessage to deserialize
42
* @param out Collector for output records
43
* @throws Exception If deserialization fails
44
*/
45
default void deserialize(PubsubMessage message, Collector<T> out) throws Exception {
46
T deserialized = deserialize(message);
47
if (deserialized != null) {
48
out.collect(deserialized);
49
}
50
}
51
52
/**
53
* Get type information for produced elements
54
* @return TypeInformation for type T
55
*/
56
TypeInformation<T> getProducedType();
57
}
58
```
59
60
### DeserializationSchemaWrapper
61
62
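Adapter class that wraps a standard Flink `DeserializationSchema` so it can be used with `PubSubSource`. A schema used this way does not have access to Pub/Sub message metadata; implement `PubSubDeserializationSchema` directly when attributes, message ID, or publish time are needed.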
63
64
```java { .api }
65
/**
66
* Wrapper that adapts DeserializationSchema to PubSubDeserializationSchema
67
* @param <T> Type of deserialized objects
68
*/
69
class DeserializationSchemaWrapper<T> implements PubSubDeserializationSchema<T> {
70
71
/**
72
* Constructor taking standard DeserializationSchema
73
* @param deserializationSchema Standard Flink deserialization schema
74
*/
75
DeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema);
76
77
// Interface methods implemented to delegate to wrapped schema
78
}
79
```
80
81
### Available Message Metadata
82
83
PubsubMessage provides access to comprehensive message metadata:
84
85
```java { .api }
86
// From com.google.pubsub.v1.PubsubMessage
87
public class PubsubMessage {
88
public ByteString getData(); // Message payload
89
public Map<String, String> getAttributesMap(); // Message attributes
90
public String getMessageId(); // Unique message identifier
91
public Timestamp getPublishTime(); // When message was published
92
// ... other metadata fields
93
}
94
```
95
96
## Usage Examples
97
98
### Basic Metadata Access
99
100
```java
101
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.google.pubsub.v1.PubsubMessage;

import java.util.HashMap;
import java.util.Map;
104
105
/**
 * Value holder pairing a message payload with the Pub/Sub metadata it arrived with.
 *
 * <p>The class follows Flink's POJO rules (public class, public no-argument constructor,
 * public fields) so that it does not have to be handled as a generic type.
 */
public class MessageWithMetadata {
    /** Message payload decoded as text. */
    public String data;
    /** Pub/Sub message ID. */
    public String messageId;
    /** Publish time in whole seconds. */
    public long publishTimeSeconds;
    /** Message attributes; never {@code null} when set through a constructor. */
    public Map<String, String> attributes;

    /** No-argument constructor required for POJO handling; starts with empty attributes. */
    public MessageWithMetadata() {
        this.attributes = new HashMap<>();
    }

    /**
     * Creates a fully populated instance.
     *
     * @param data message payload as text
     * @param messageId Pub/Sub message ID
     * @param publishTime publish time in seconds
     * @param attributes message attributes; copied, and {@code null} is treated as empty
     */
    public MessageWithMetadata(String data, String messageId, long publishTime, Map<String, String> attributes) {
        this.data = data;
        this.messageId = messageId;
        this.publishTimeSeconds = publishTime;
        // Copy into a plain HashMap so the record owns its attributes instead of
        // holding on to whatever (possibly unmodifiable) map the caller passed in.
        this.attributes = (attributes == null) ? new HashMap<>() : new HashMap<>(attributes);
    }
}
118
119
public class MetadataDeserializationSchema implements PubSubDeserializationSchema<MessageWithMetadata> {
120
121
@Override
122
public MessageWithMetadata deserialize(PubsubMessage message) throws Exception {
123
String data = message.getData().toStringUtf8();
124
String messageId = message.getMessageId();
125
long publishTime = message.getPublishTime().getSeconds();
126
Map<String, String> attributes = message.getAttributesMap();
127
128
return new MessageWithMetadata(data, messageId, publishTime, attributes);
129
}
130
131
@Override
132
public boolean isEndOfStream(MessageWithMetadata nextElement) {
133
return false; // Never signals end of stream
134
}
135
136
@Override
137
public TypeInformation<MessageWithMetadata> getProducedType() {
138
return TypeInformation.of(MessageWithMetadata.class);
139
}
140
}
141
142
// Usage
143
PubSubSource<MessageWithMetadata> source = PubSubSource.newBuilder()
144
.withDeserializationSchema(new MetadataDeserializationSchema())
145
.withProjectName("my-gcp-project")
146
.withSubscriptionName("my-subscription")
147
.build();
148
```
149
150
### Conditional Deserialization Based on Attributes
151
152
```java
153
public class ConditionalDeserializationSchema implements PubSubDeserializationSchema<String> {
154
155
@Override
156
public String deserialize(PubsubMessage message) throws Exception {
157
Map<String, String> attributes = message.getAttributesMap();
158
159
// Only process messages with specific attribute
160
if (!"IMPORTANT".equals(attributes.get("priority"))) {
161
return null; // Skip this message
162
}
163
164
// Add attribute info to the data
165
String data = message.getData().toStringUtf8();
166
String source = attributes.getOrDefault("source", "unknown");
167
168
return String.format("[%s] %s", source, data);
169
}
170
171
@Override
172
public boolean isEndOfStream(String nextElement) {
173
// End stream on special termination message
174
return "TERMINATE".equals(nextElement);
175
}
176
177
@Override
178
public TypeInformation<String> getProducedType() {
179
return TypeInformation.of(String.class);
180
}
181
}
182
```
183
184
### Multi-Record Output with Collector
185
186
```java
187
import org.apache.flink.util.Collector;
188
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
189
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
190
191
public class BatchMessageDeserializationSchema implements PubSubDeserializationSchema<String> {
192
private ObjectMapper mapper = new ObjectMapper();
193
194
@Override
195
public String deserialize(PubsubMessage message) throws Exception {
196
// This method won't be called when using collector version
197
throw new UnsupportedOperationException("Use collector version");
198
}
199
200
@Override
201
public void deserialize(PubsubMessage message, Collector<String> out) throws Exception {
202
String jsonData = message.getData().toStringUtf8();
203
JsonNode rootNode = mapper.readTree(jsonData);
204
205
// Handle batch messages - split array into individual records
206
if (rootNode.isArray()) {
207
for (JsonNode item : rootNode) {
208
out.collect(item.toString());
209
}
210
} else {
211
out.collect(jsonData);
212
}
213
}
214
215
@Override
216
public boolean isEndOfStream(String nextElement) {
217
return false;
218
}
219
220
@Override
221
public TypeInformation<String> getProducedType() {
222
return TypeInformation.of(String.class);
223
}
224
}
225
```
226
227
### Complex Event Deserialization with Validation
228
229
```java
230
public class ValidatedEventDeserializationSchema implements PubSubDeserializationSchema<Event> {
231
private ObjectMapper mapper = new ObjectMapper();
232
233
@Override
234
public void open(DeserializationSchema.InitializationContext context) throws Exception {
235
// Setup metrics for tracking deserialization errors
236
context.getMetricGroup().counter("deserialization_errors");
237
context.getMetricGroup().counter("validation_errors");
238
}
239
240
@Override
241
public Event deserialize(PubsubMessage message) throws Exception {
242
try {
243
// Parse JSON data
244
String jsonData = message.getData().toStringUtf8();
245
Event event = mapper.readValue(jsonData, Event.class);
246
247
// Add metadata
248
event.setMessageId(message.getMessageId());
249
event.setPublishTime(message.getPublishTime().getSeconds());
250
event.setAttributes(message.getAttributesMap());
251
252
// Validate event
253
if (!isValidEvent(event)) {
254
throw new IllegalArgumentException("Event validation failed");
255
}
256
257
return event;
258
259
} catch (Exception e) {
260
// Log error but don't fail processing
261
System.err.println("Failed to deserialize message: " + e.getMessage());
262
return null; // Skip invalid messages
263
}
264
}
265
266
private boolean isValidEvent(Event event) {
267
return event.getEventType() != null &&
268
event.getTimestamp() > 0 &&
269
event.getUserId() != null;
270
}
271
272
@Override
273
public boolean isEndOfStream(Event nextElement) {
274
return nextElement != null && "SHUTDOWN".equals(nextElement.getEventType());
275
}
276
277
@Override
278
public TypeInformation<Event> getProducedType() {
279
return TypeInformation.of(Event.class);
280
}
281
}
282
```
283
284
### Schema Evolution with Version Handling
285
286
```java
287
public class VersionedDeserializationSchema implements PubSubDeserializationSchema<VersionedEvent> {
288
private ObjectMapper mapper = new ObjectMapper();
289
290
@Override
291
public VersionedEvent deserialize(PubsubMessage message) throws Exception {
292
Map<String, String> attributes = message.getAttributesMap();
293
String version = attributes.getOrDefault("schema_version", "1.0");
294
295
String jsonData = message.getData().toStringUtf8();
296
297
switch (version) {
298
case "1.0":
299
return deserializeV1(jsonData);
300
case "2.0":
301
return deserializeV2(jsonData);
302
default:
303
throw new IllegalArgumentException("Unsupported schema version: " + version);
304
}
305
}
306
307
private VersionedEvent deserializeV1(String json) throws Exception {
308
EventV1 v1Event = mapper.readValue(json, EventV1.class);
309
return new VersionedEvent(v1Event.name, v1Event.value, "default_category");
310
}
311
312
private VersionedEvent deserializeV2(String json) throws Exception {
313
EventV2 v2Event = mapper.readValue(json, EventV2.class);
314
return new VersionedEvent(v2Event.name, v2Event.value, v2Event.category);
315
}
316
317
@Override
318
public boolean isEndOfStream(VersionedEvent nextElement) {
319
return false;
320
}
321
322
@Override
323
public TypeInformation<VersionedEvent> getProducedType() {
324
return TypeInformation.of(VersionedEvent.class);
325
}
326
}
327
```
328
329
## Performance Considerations
330
331
### Memory Management
332
333
- Avoid storing large metadata objects in memory
334
- Use streaming parsing for large JSON payloads
335
- Consider object pooling for high-throughput scenarios
336
337
### Error Handling
338
339
- Return `null` from `deserialize()` to skip invalid messages
340
- Use metrics to track deserialization errors
341
- Implement fallback deserialization strategies for schema evolution
342
343
### Type Safety
344
345
- Always provide accurate `TypeInformation` in `getProducedType()`
346
- Use generic type parameters correctly to maintain type safety
347
- Consider using Flink's `TypeHint` for complex generic types
348
349
## Important Notes
350
351
- **Null Handling**: Returning `null` from `deserialize()` will skip the message entirely
352
- **Exception Handling**: Uncaught exceptions in `deserialize()` will cause the source to fail
353
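- **Collector Usage**: When overriding the collector version, you must still implement the single-record `deserialize()` method because it is abstract; if it is never meant to be called, have it throw `UnsupportedOperationException`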
354
- **Metadata Access**: Full PubsubMessage metadata is only available through `PubSubDeserializationSchema`, not standard `DeserializationSchema`
355
- **Performance**: Metadata access has minimal performance overhead compared to standard deserialization