or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

deserialization.md · index.md · sink.md · source.md

docs/deserialization.md

0

# Custom Deserialization

1

2

The Pub/Sub connector provides advanced deserialization capabilities that give access to full Pub/Sub message metadata including attributes, message ID, and publish time. This is essential for applications requiring message metadata or custom deserialization logic.

3

4

## Capabilities

5

6

### PubSubDeserializationSchema Interface

7

8

Core interface for custom deserialization with metadata access.

9

10

```java { .api }

11

/**

12

* Deserialization schema for PubsubMessage objects with metadata access

13

* @param <T> Type of deserialized objects

14

*/

15

public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

16

17

/**

18

* Initialization method called before working methods

19

* @param context Initialization context for metrics and configuration

20

* @throws Exception If initialization fails

21

*/

22

default void open(DeserializationSchema.InitializationContext context) throws Exception {}

23

24

/**

25

* Determine if element signals end of stream

26

* @param nextElement Element to test for end-of-stream signal

27

* @return True if element signals end of stream, false otherwise

28

*/

29

boolean isEndOfStream(T nextElement);

30

31

/**

32

* Deserialize a PubsubMessage to the target type

33

* @param message PubsubMessage to deserialize

34

* @return Deserialized object (null if message cannot be deserialized)

35

* @throws Exception If deserialization fails

36

*/

37

T deserialize(PubsubMessage message) throws Exception;

38

39

/**

40

* Deserialize PubsubMessage using collector for multiple output records

41

* @param message PubsubMessage to deserialize

42

* @param out Collector for output records

43

* @throws Exception If deserialization fails

44

*/

45

default void deserialize(PubsubMessage message, Collector<T> out) throws Exception {

46

T deserialized = deserialize(message);

47

if (deserialized != null) {

48

out.collect(deserialized);

49

}

50

}

51

52

/**

53

* Get type information for produced elements

54

* @return TypeInformation for type T

55

*/

56

TypeInformation<T> getProducedType();

57

}

58

```

59

60

### DeserializationSchemaWrapper

61

62

Adapter class that wraps a standard Flink `DeserializationSchema` so it can be used with `PubSubSource`.

63

64

```java { .api }

65

/**
 * Wrapper that adapts DeserializationSchema to PubSubDeserializationSchema.
 *
 * <p>This listing is a signature summary: the constructor is shown without a body
 * and the overridden interface methods are omitted.
 *
 * @param <T> Type of deserialized objects
 */
class DeserializationSchemaWrapper<T> implements PubSubDeserializationSchema<T> {

    /**
     * Constructor taking standard DeserializationSchema.
     *
     * @param deserializationSchema Standard Flink deserialization schema
     */
    DeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema);

    // Interface methods implemented to delegate to wrapped schema
}

79

```

80

81

### Available Message Metadata

82

83

PubsubMessage provides access to comprehensive message metadata:

84

85

```java { .api }

86

// From com.google.pubsub.v1.PubsubMessage
// Abbreviated listing: only accessor signatures are shown, without bodies.
public class PubsubMessage {
    public ByteString getData(); // Message payload
    public Map<String, String> getAttributesMap(); // Message attributes
    public String getMessageId(); // Unique message identifier
    public Timestamp getPublishTime(); // When message was published
    // ... other metadata fields
}

94

```

95

96

## Usage Examples

97

98

### Basic Metadata Access

99

100

```java

101

import java.util.HashMap;
import java.util.Map;

import com.google.pubsub.v1.PubsubMessage;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;

104

105

/**
 * Value holder pairing a message payload with the metadata read from the same message.
 *
 * <p>The fields stay public and non-final, and a public no-argument constructor is
 * provided, so the class can be created reflectively by serialization frameworks.
 * NOTE(review): this is the shape Flink's POJO rules ask for; confirm that
 * {@code TypeInformation.of(MessageWithMetadata.class)} picks the POJO serializer
 * on the Flink version in use.
 */
public class MessageWithMetadata {
    /** Message payload decoded as text. */
    public String data;
    /** Identifier of the message. */
    public String messageId;
    /** Publish time of the message, in seconds. */
    public long publishTimeSeconds;
    /** Message attributes; always a mutable copy owned by this object, never {@code null}. */
    public Map<String, String> attributes;

    /** Creates an empty instance with no payload and no attributes. */
    public MessageWithMetadata() {
        this.attributes = new HashMap<>();
    }

    /**
     * Creates a fully populated instance.
     *
     * @param data message payload as text
     * @param messageId identifier of the message
     * @param publishTime publish time in seconds
     * @param attributes message attributes; copied, and {@code null} is treated as empty
     */
    public MessageWithMetadata(String data, String messageId, long publishTime, Map<String, String> attributes) {
        this.data = data;
        this.messageId = messageId;
        this.publishTimeSeconds = publishTime;
        // Copy instead of keeping the caller's map: the caller may pass a read-only
        // view or change the map later, and this record should be independent of both.
        this.attributes = attributes == null ? new HashMap<>() : new HashMap<>(attributes);
    }
}

118

119

public class MetadataDeserializationSchema implements PubSubDeserializationSchema<MessageWithMetadata> {

120

121

@Override

122

public MessageWithMetadata deserialize(PubsubMessage message) throws Exception {

123

String data = message.getData().toStringUtf8();

124

String messageId = message.getMessageId();

125

long publishTime = message.getPublishTime().getSeconds();

126

Map<String, String> attributes = message.getAttributesMap();

127

128

return new MessageWithMetadata(data, messageId, publishTime, attributes);

129

}

130

131

@Override

132

public boolean isEndOfStream(MessageWithMetadata nextElement) {

133

return false; // Never signals end of stream

134

}

135

136

@Override

137

public TypeInformation<MessageWithMetadata> getProducedType() {

138

return TypeInformation.of(MessageWithMetadata.class);

139

}

140

}

141

142

// Usage

143

PubSubSource<MessageWithMetadata> source = PubSubSource.newBuilder()

144

.withDeserializationSchema(new MetadataDeserializationSchema())

145

.withProjectName("my-gcp-project")

146

.withSubscriptionName("my-subscription")

147

.build();

148

```

149

150

### Conditional Deserialization Based on Attributes

151

152

```java

153

public class ConditionalDeserializationSchema implements PubSubDeserializationSchema<String> {

154

155

@Override

156

public String deserialize(PubsubMessage message) throws Exception {

157

Map<String, String> attributes = message.getAttributesMap();

158

159

// Only process messages with specific attribute

160

if (!"IMPORTANT".equals(attributes.get("priority"))) {

161

return null; // Skip this message

162

}

163

164

// Add attribute info to the data

165

String data = message.getData().toStringUtf8();

166

String source = attributes.getOrDefault("source", "unknown");

167

168

return String.format("[%s] %s", source, data);

169

}

170

171

@Override

172

public boolean isEndOfStream(String nextElement) {

173

// End stream on special termination message

174

return "TERMINATE".equals(nextElement);

175

}

176

177

@Override

178

public TypeInformation<String> getProducedType() {

179

return TypeInformation.of(String.class);

180

}

181

}

182

```

183

184

### Multi-Record Output with Collector

185

186

```java

187

import org.apache.flink.util.Collector;

188

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

189

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;

190

191

public class BatchMessageDeserializationSchema implements PubSubDeserializationSchema<String> {

192

private ObjectMapper mapper = new ObjectMapper();

193

194

@Override

195

public String deserialize(PubsubMessage message) throws Exception {

196

// This method won't be called when using collector version

197

throw new UnsupportedOperationException("Use collector version");

198

}

199

200

@Override

201

public void deserialize(PubsubMessage message, Collector<String> out) throws Exception {

202

String jsonData = message.getData().toStringUtf8();

203

JsonNode rootNode = mapper.readTree(jsonData);

204

205

// Handle batch messages - split array into individual records

206

if (rootNode.isArray()) {

207

for (JsonNode item : rootNode) {

208

out.collect(item.toString());

209

}

210

} else {

211

out.collect(jsonData);

212

}

213

}

214

215

@Override

216

public boolean isEndOfStream(String nextElement) {

217

return false;

218

}

219

220

@Override

221

public TypeInformation<String> getProducedType() {

222

return TypeInformation.of(String.class);

223

}

224

}

225

```

226

227

### Complex Event Deserialization with Validation

228

229

```java

230

public class ValidatedEventDeserializationSchema implements PubSubDeserializationSchema<Event> {

231

private ObjectMapper mapper = new ObjectMapper();

232

233

@Override

234

public void open(DeserializationSchema.InitializationContext context) throws Exception {

235

// Setup metrics for tracking deserialization errors

236

context.getMetricGroup().counter("deserialization_errors");

237

context.getMetricGroup().counter("validation_errors");

238

}

239

240

@Override

241

public Event deserialize(PubsubMessage message) throws Exception {

242

try {

243

// Parse JSON data

244

String jsonData = message.getData().toStringUtf8();

245

Event event = mapper.readValue(jsonData, Event.class);

246

247

// Add metadata

248

event.setMessageId(message.getMessageId());

249

event.setPublishTime(message.getPublishTime().getSeconds());

250

event.setAttributes(message.getAttributesMap());

251

252

// Validate event

253

if (!isValidEvent(event)) {

254

throw new IllegalArgumentException("Event validation failed");

255

}

256

257

return event;

258

259

} catch (Exception e) {

260

// Log error but don't fail processing

261

System.err.println("Failed to deserialize message: " + e.getMessage());

262

return null; // Skip invalid messages

263

}

264

}

265

266

private boolean isValidEvent(Event event) {

267

return event.getEventType() != null &&

268

event.getTimestamp() > 0 &&

269

event.getUserId() != null;

270

}

271

272

@Override

273

public boolean isEndOfStream(Event nextElement) {

274

return nextElement != null && "SHUTDOWN".equals(nextElement.getEventType());

275

}

276

277

@Override

278

public TypeInformation<Event> getProducedType() {

279

return TypeInformation.of(Event.class);

280

}

281

}

282

```

283

284

### Schema Evolution with Version Handling

285

286

```java

287

public class VersionedDeserializationSchema implements PubSubDeserializationSchema<VersionedEvent> {

288

private ObjectMapper mapper = new ObjectMapper();

289

290

@Override

291

public VersionedEvent deserialize(PubsubMessage message) throws Exception {

292

Map<String, String> attributes = message.getAttributesMap();

293

String version = attributes.getOrDefault("schema_version", "1.0");

294

295

String jsonData = message.getData().toStringUtf8();

296

297

switch (version) {

298

case "1.0":

299

return deserializeV1(jsonData);

300

case "2.0":

301

return deserializeV2(jsonData);

302

default:

303

throw new IllegalArgumentException("Unsupported schema version: " + version);

304

}

305

}

306

307

private VersionedEvent deserializeV1(String json) throws Exception {

308

EventV1 v1Event = mapper.readValue(json, EventV1.class);

309

return new VersionedEvent(v1Event.name, v1Event.value, "default_category");

310

}

311

312

private VersionedEvent deserializeV2(String json) throws Exception {

313

EventV2 v2Event = mapper.readValue(json, EventV2.class);

314

return new VersionedEvent(v2Event.name, v2Event.value, v2Event.category);

315

}

316

317

@Override

318

public boolean isEndOfStream(VersionedEvent nextElement) {

319

return false;

320

}

321

322

@Override

323

public TypeInformation<VersionedEvent> getProducedType() {

324

return TypeInformation.of(VersionedEvent.class);

325

}

326

}

327

```

328

329

## Performance Considerations

330

331

### Memory Management

332

333

- Avoid storing large metadata objects in memory

334

- Use streaming parsing for large JSON payloads

335

- Consider object pooling for high-throughput scenarios

336

337

### Error Handling

338

339

- Return `null` from `deserialize()` to skip invalid messages

340

- Use metrics to track deserialization errors

341

- Implement fallback deserialization strategies for schema evolution

342

343

### Type Safety

344

345

- Always provide accurate `TypeInformation` in `getProducedType()`

346

- Use generic type parameters correctly to maintain type safety

347

- Consider using Flink's `TypeHint` for complex generic types

348

349

## Important Notes

350

351

- **Null Handling**: Returning `null` from `deserialize()` will skip the message entirely

352

- **Exception Handling**: Uncaught exceptions in `deserialize()` will cause the source to fail

353

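- **Collector Usage**: When you override the collector version, you must still implement the single-record `deserialize()` method because it is abstract. It will not be called, so it can simply throw `UnsupportedOperationException`, as in the multi-record example above.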

354

- **Metadata Access**: Full PubsubMessage metadata is only available through `PubSubDeserializationSchema`, not standard `DeserializationSchema`

355

- **Performance**: Metadata access has minimal performance overhead compared to standard deserialization