0
# Payload Conversion and Serialization
1
2
Pluggable payload conversion system for serializing workflow arguments, results, and signals with support for custom codecs and encryption.
3
4
## Capabilities
5
6
### Payload Converter
7
8
Core interface for converting between Java objects and Temporal payloads.
9
10
```java { .api }
11
/**
12
* Converts between Java objects and Temporal payloads.
13
*/
14
public interface PayloadConverter {
15
/**
16
* Converts Java object to payload.
17
* @param value object to convert
18
* @return optional payload if conversion successful
19
* @throws DataConverterException if conversion fails
20
*/
21
Optional<Payload> toData(Object value) throws DataConverterException;
22
23
/**
24
* Converts payload to Java object.
25
* @param content payload to convert
26
* @param valueClass target class
27
* @param valueType target generic type
28
* @return converted object
29
* @throws DataConverterException if conversion fails
30
*/
31
<T> T fromData(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException;
32
33
/**
34
* Gets encoding type for this converter.
35
* @return encoding type string
36
*/
37
String getEncodingType();
38
}
39
```
40
41
### Data Converter
42
43
Main interface for data conversion with support for multiple payload converters.
44
45
```java { .api }
46
/**
47
* Main interface for data conversion with support for multiple payload converters.
48
*/
49
public interface DataConverter {
50
/**
51
* Gets default data converter instance.
52
* @return default data converter
53
*/
54
static DataConverter getDefaultInstance();
55
56
/**
57
* Creates data converter with custom payload converters.
58
* @param payloadConverters array of payload converters
59
* @return data converter with custom converters
60
*/
61
static DataConverter newDefaultInstance(PayloadConverter... payloadConverters);
62
63
/**
64
* Converts single value to payloads.
65
* @param value value to convert
66
* @return optional payloads
67
*/
68
Optional<Payloads> toPayloads(Object... value) throws DataConverterException;
69
70
/**
71
* Converts payloads to array of values.
72
* @param content payloads to convert
73
* @param valueTypes target types
74
* @return array of converted values
75
*/
76
<T> T[] fromPayloads(int index, Optional<Payloads> content, Class<T> valueType, Type... valueTypes) throws DataConverterException;
77
78
/**
79
* Converts single payload to value.
80
* @param content payload to convert
81
* @param valueClass target class
82
* @param valueType target generic type
83
* @return converted value
84
*/
85
<T> T fromPayload(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException;
86
87
/**
88
* Gets payload converters.
89
* @return list of payload converters
90
*/
91
List<PayloadConverter> getPayloadConverters();
92
93
/**
94
* Converts failure to payload.
95
* @param failure failure to convert
96
* @return failure payload
97
*/
98
Failure exceptionToFailure(Throwable failure);
99
100
/**
101
* Converts payload to failure.
102
* @param failure failure payload
103
* @return failure exception
104
*/
105
Throwable failureToException(Failure failure);
106
107
/**
108
* Creates data converter with payload codec.
109
* @param payloadCodec payload codec for encoding/decoding
110
* @return data converter with codec
111
*/
112
DataConverter withPayloadCodec(PayloadCodec payloadCodec);
113
114
/**
115
* Creates data converter with failure converter.
116
* @param failureConverter failure converter
117
* @return data converter with failure converter
118
*/
119
DataConverter withFailureConverter(FailureConverter failureConverter);
120
}
121
```
122
123
**Usage Examples:**
124
125
```java
126
public class CustomDataConverterExample {
127
public static void setupCustomDataConverter() {
128
// Create custom payload converters
129
PayloadConverter jsonConverter = new JacksonJsonPayloadConverter();
130
PayloadConverter protobufConverter = new ProtobufJsonPayloadConverter();
131
PayloadConverter customConverter = new MyCustomPayloadConverter();
132
133
// Create data converter with custom converters
134
DataConverter dataConverter = DataConverter.newDefaultInstance(
135
customConverter, // Highest priority
136
protobufConverter,
137
jsonConverter // Fallback
138
);
139
140
// Use with workflow client
141
WorkflowClient client = WorkflowClient.newInstance(
142
service,
143
WorkflowClientOptions.newBuilder()
144
.setDataConverter(dataConverter)
145
.build()
146
);
147
148
// Use with worker
149
Worker worker = factory.newWorker(
150
"task-queue",
151
WorkerOptions.newBuilder()
152
.setDataConverter(dataConverter)
153
.build()
154
);
155
}
156
157
private static class MyCustomPayloadConverter implements PayloadConverter {
158
private static final String ENCODING_TYPE = "application/x-custom";
159
160
@Override
161
public String getEncodingType() {
162
return ENCODING_TYPE;
163
}
164
165
@Override
166
public Optional<Payload> toData(Object value) throws DataConverterException {
167
if (value instanceof MyCustomObject) {
168
try {
169
byte[] data = serializeCustomObject((MyCustomObject) value);
170
return Optional.of(
171
Payload.newBuilder()
172
.putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
173
ByteString.copyFromUtf8(ENCODING_TYPE))
174
.setData(ByteString.copyFrom(data))
175
.build()
176
);
177
} catch (Exception e) {
178
throw new DataConverterException("Failed to serialize custom object", e);
179
}
180
}
181
return Optional.empty();
182
}
183
184
@Override
185
public <T> T fromData(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException {
186
if (!valueClass.isAssignableFrom(MyCustomObject.class)) {
187
throw new DataConverterException("Unsupported type: " + valueClass);
188
}
189
190
try {
191
byte[] data = content.getData().toByteArray();
192
MyCustomObject obj = deserializeCustomObject(data);
193
return valueClass.cast(obj);
194
} catch (Exception e) {
195
throw new DataConverterException("Failed to deserialize custom object", e);
196
}
197
}
198
199
private byte[] serializeCustomObject(MyCustomObject obj) {
200
// Custom serialization logic
201
return obj.serialize();
202
}
203
204
private MyCustomObject deserializeCustomObject(byte[] data) {
205
// Custom deserialization logic
206
return MyCustomObject.deserialize(data);
207
}
208
}
209
}
210
```
211
212
### Payload Codec
213
214
Interface for encoding/decoding payloads (compression, encryption).
215
216
```java { .api }
217
/**
218
* Interface for encoding/decoding of payloads (encryption, compression).
219
*/
220
public interface PayloadCodec {
221
/**
222
* Encodes list of payloads.
223
* @param payloads payloads to encode
224
* @return encoded payloads
225
*/
226
List<Payload> encode(List<Payload> payloads);
227
228
/**
229
* Decodes list of payloads.
230
* @param payloads payloads to decode
231
* @return decoded payloads
232
*/
233
List<Payload> decode(List<Payload> payloads);
234
}
235
```
236
237
**Usage Examples:**
238
239
```java
240
public class EncryptionCodecExample {
241
public static class AESEncryptionCodec implements PayloadCodec {
242
private static final String ENCODING_TYPE = "binary/encrypted";
243
private final byte[] encryptionKey;
244
245
public AESEncryptionCodec(byte[] encryptionKey) {
246
this.encryptionKey = encryptionKey;
247
}
248
249
@Override
250
public List<Payload> encode(List<Payload> payloads) {
251
return payloads.stream()
252
.map(this::encryptPayload)
253
.collect(Collectors.toList());
254
}
255
256
@Override
257
public List<Payload> decode(List<Payload> payloads) {
258
return payloads.stream()
259
.map(this::decryptPayload)
260
.collect(Collectors.toList());
261
}
262
263
private Payload encryptPayload(Payload payload) {
264
try {
265
// Skip if already encrypted
266
String encoding = payload.getMetadataMap()
267
.get(EncodingKeys.METADATA_ENCODING_KEY)
268
.toStringUtf8();
269
270
if (ENCODING_TYPE.equals(encoding)) {
271
return payload;
272
}
273
274
// Encrypt payload data
275
byte[] data = payload.getData().toByteArray();
276
byte[] encryptedData = encrypt(data, encryptionKey);
277
278
return Payload.newBuilder()
279
.putAllMetadata(payload.getMetadataMap())
280
.putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
281
ByteString.copyFromUtf8(ENCODING_TYPE))
282
.putMetadata("encryption-algorithm",
283
ByteString.copyFromUtf8("AES-256-GCM"))
284
.setData(ByteString.copyFrom(encryptedData))
285
.build();
286
} catch (Exception e) {
287
throw new RuntimeException("Failed to encrypt payload", e);
288
}
289
}
290
291
private Payload decryptPayload(Payload payload) {
292
try {
293
String encoding = payload.getMetadataMap()
294
.get(EncodingKeys.METADATA_ENCODING_KEY)
295
.toStringUtf8();
296
297
if (!ENCODING_TYPE.equals(encoding)) {
298
return payload; // Not encrypted
299
}
300
301
// Decrypt payload data
302
byte[] encryptedData = payload.getData().toByteArray();
303
byte[] decryptedData = decrypt(encryptedData, encryptionKey);
304
305
// Remove encryption metadata and restore original encoding
306
Map<String, ByteString> metadata = new HashMap<>(payload.getMetadataMap());
307
metadata.remove(EncodingKeys.METADATA_ENCODING_KEY);
308
metadata.remove("encryption-algorithm");
309
310
return Payload.newBuilder()
311
.putAllMetadata(metadata)
312
.setData(ByteString.copyFrom(decryptedData))
313
.build();
314
} catch (Exception e) {
315
throw new RuntimeException("Failed to decrypt payload", e);
316
}
317
}
318
319
private byte[] encrypt(byte[] data, byte[] key) throws Exception {
320
// AES encryption implementation
321
Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
322
SecretKeySpec keySpec = new SecretKeySpec(key, "AES");
323
cipher.init(Cipher.ENCRYPT_MODE, keySpec);
324
return cipher.doFinal(data);
325
}
326
327
private byte[] decrypt(byte[] encryptedData, byte[] key) throws Exception {
328
// AES decryption implementation
329
Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
330
SecretKeySpec keySpec = new SecretKeySpec(key, "AES");
331
cipher.init(Cipher.DECRYPT_MODE, keySpec);
332
return cipher.doFinal(encryptedData);
333
}
334
}
335
336
public static void useEncryptionCodec() {
337
// Generate encryption key (in practice, use secure key management)
338
byte[] encryptionKey = generateSecureKey();
339
340
// Create data converter with encryption codec
341
PayloadCodec encryptionCodec = new AESEncryptionCodec(encryptionKey);
342
DataConverter dataConverter = DataConverter.getDefaultInstance()
343
.withPayloadCodec(encryptionCodec);
344
345
// Use with workflow client
346
WorkflowClient client = WorkflowClient.newInstance(
347
service,
348
WorkflowClientOptions.newBuilder()
349
.setDataConverter(dataConverter)
350
.build()
351
);
352
}
353
}
354
```
355
356
### Serialization Context
357
358
Context information for payload conversion.
359
360
```java { .api }
361
/**
362
* Context information for payload conversion.
363
*/
364
public interface SerializationContext {
365
/**
366
* Gets namespace for the context.
367
* @return namespace
368
*/
369
String getNamespace();
370
}
371
372
/**
373
* Activity-specific serialization context.
374
*/
375
public interface ActivitySerializationContext extends SerializationContext {
376
/**
377
* Gets namespace for the activity.
378
* @return namespace
379
*/
380
String getNamespace();
381
382
/**
383
* Gets task queue for the activity.
384
* @return task queue name
385
*/
386
String getTaskQueue();
387
388
/**
389
* Gets workflow ID that scheduled the activity.
390
* @return workflow ID
391
*/
392
String getWorkflowId();
393
394
/**
395
* Gets run ID of the workflow.
396
* @return run ID
397
*/
398
String getRunId();
399
400
/**
401
* Gets activity ID.
402
* @return activity ID
403
*/
404
String getActivityId();
405
406
/**
407
* Gets activity type.
408
* @return activity type
409
*/
410
String getActivityType();
411
412
/**
413
* Gets attempt number.
414
* @return attempt number
415
*/
416
int getAttempt();
417
}
418
```
419
420
### Built-in Payload Converters
421
422
Default payload converters provided by the SDK.
423
424
```java { .api }
425
/**
426
* Converts null values to empty payloads.
427
*/
428
public class NullPayloadConverter implements PayloadConverter {
429
public static final NullPayloadConverter INSTANCE = new NullPayloadConverter();
430
431
@Override
432
public String getEncodingType() {
433
return "binary/null";
434
}
435
436
@Override
437
public Optional<Payload> toData(Object value) throws DataConverterException {
438
if (value == null) {
439
return Optional.of(
440
Payload.newBuilder()
441
.putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
442
ByteString.copyFromUtf8(getEncodingType()))
443
.build()
444
);
445
}
446
return Optional.empty();
447
}
448
449
@Override
450
public <T> T fromData(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException {
451
return null;
452
}
453
}
454
455
/**
456
* Converts byte arrays to binary payloads.
457
*/
458
public class ByteArrayPayloadConverter implements PayloadConverter {
459
public static final ByteArrayPayloadConverter INSTANCE = new ByteArrayPayloadConverter();
460
461
@Override
462
public String getEncodingType() {
463
return "binary/plain";
464
}
465
466
@Override
467
public Optional<Payload> toData(Object value) throws DataConverterException {
468
if (value instanceof byte[]) {
469
return Optional.of(
470
Payload.newBuilder()
471
.putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
472
ByteString.copyFromUtf8(getEncodingType()))
473
.setData(ByteString.copyFrom((byte[]) value))
474
.build()
475
);
476
}
477
return Optional.empty();
478
}
479
480
@Override
481
public <T> T fromData(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException {
482
if (!valueClass.isAssignableFrom(byte[].class)) {
483
throw new DataConverterException("Cannot convert to " + valueClass);
484
}
485
return valueClass.cast(content.getData().toByteArray());
486
}
487
}
488
489
/**
490
* Converts protobuf messages to JSON payloads.
491
*/
492
public class ProtobufJsonPayloadConverter implements PayloadConverter {
493
@Override
494
public String getEncodingType() {
495
return "json/protobuf";
496
}
497
498
@Override
499
public Optional<Payload> toData(Object value) throws DataConverterException {
500
if (value instanceof Message) {
501
try {
502
String json = JsonFormat.printer().print((Message) value);
503
return Optional.of(
504
Payload.newBuilder()
505
.putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
506
ByteString.copyFromUtf8(getEncodingType()))
507
.putMetadata("messageType",
508
ByteString.copyFromUtf8(value.getClass().getName()))
509
.setData(ByteString.copyFromUtf8(json))
510
.build()
511
);
512
} catch (Exception e) {
513
throw new DataConverterException("Failed to convert protobuf to JSON", e);
514
}
515
}
516
return Optional.empty();
517
}
518
519
@Override
520
public <T> T fromData(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException {
521
if (!Message.class.isAssignableFrom(valueClass)) {
522
throw new DataConverterException("Not a protobuf message type");
523
}
524
525
try {
526
String json = content.getData().toStringUtf8();
527
Message.Builder builder = getMessageBuilder(valueClass);
528
JsonFormat.parser().merge(json, builder);
529
return valueClass.cast(builder.build());
530
} catch (Exception e) {
531
throw new DataConverterException("Failed to convert JSON to protobuf", e);
532
}
533
}
534
535
private Message.Builder getMessageBuilder(Class<?> messageClass) throws Exception {
536
Method method = messageClass.getMethod("newBuilder");
537
return (Message.Builder) method.invoke(null);
538
}
539
}
540
541
/**
542
* Converts objects to JSON using Jackson.
543
*/
544
public class JacksonJsonPayloadConverter implements PayloadConverter {
545
private final ObjectMapper objectMapper;
546
547
public JacksonJsonPayloadConverter(ObjectMapper objectMapper) {
548
this.objectMapper = objectMapper;
549
}
550
551
public JacksonJsonPayloadConverter() {
552
this(new ObjectMapper()
553
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
554
.registerModule(new JavaTimeModule())
555
);
556
}
557
558
@Override
559
public String getEncodingType() {
560
return "json/jackson";
561
}
562
563
@Override
564
public Optional<Payload> toData(Object value) throws DataConverterException {
565
// Skip primitive types and strings (handled by other converters)
566
if (value == null || isPrimitive(value.getClass())) {
567
return Optional.empty();
568
}
569
570
try {
571
byte[] json = objectMapper.writeValueAsBytes(value);
572
return Optional.of(
573
Payload.newBuilder()
574
.putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
575
ByteString.copyFromUtf8(getEncodingType()))
576
.setData(ByteString.copyFrom(json))
577
.build()
578
);
579
} catch (Exception e) {
580
throw new DataConverterException("Failed to serialize to JSON", e);
581
}
582
}
583
584
@Override
585
public <T> T fromData(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException {
586
try {
587
byte[] data = content.getData().toByteArray();
588
if (valueType != null && valueType != valueClass) {
589
TypeReference<T> typeReference = new TypeReference<T>() {
590
@Override
591
public Type getType() {
592
return valueType;
593
}
594
};
595
return objectMapper.readValue(data, typeReference);
596
} else {
597
return objectMapper.readValue(data, valueClass);
598
}
599
} catch (Exception e) {
600
throw new DataConverterException("Failed to deserialize from JSON", e);
601
}
602
}
603
604
private boolean isPrimitive(Class<?> clazz) {
605
return clazz.isPrimitive() ||
606
clazz == String.class ||
607
Number.class.isAssignableFrom(clazz) ||
608
clazz == Boolean.class;
609
}
610
}
611
```
612
613
### Data Converter Exception
614
615
Exception thrown during data conversion operations.
616
617
```java { .api }
618
/**
619
* Exception thrown during data conversion operations.
620
*/
621
public class DataConverterException extends Exception {
622
/**
623
* Creates DataConverterException with message.
624
* @param message exception message
625
*/
626
public DataConverterException(String message);
627
628
/**
629
* Creates DataConverterException with message and cause.
630
* @param message exception message
631
* @param cause underlying cause
632
*/
633
public DataConverterException(String message, Throwable cause);
634
635
/**
636
* Creates DataConverterException with cause.
637
* @param cause underlying cause
638
*/
639
public DataConverterException(Throwable cause);
640
}
641
```
642
643
**Usage Examples:**
644
645
```java
646
public class ComprehensiveDataConversionExample {
647
public static void setupComprehensiveDataConverter() {
648
// Create custom Jackson ObjectMapper
649
ObjectMapper objectMapper = new ObjectMapper()
650
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
651
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
652
.registerModule(new JavaTimeModule())
653
.registerModule(new Jdk8Module());
654
655
// Create payload converters in priority order
656
List<PayloadConverter> payloadConverters = Arrays.asList(
657
NullPayloadConverter.INSTANCE, // Handle nulls
658
ByteArrayPayloadConverter.INSTANCE, // Handle byte arrays
659
new ProtobufJsonPayloadConverter(), // Handle protobuf
660
new JacksonJsonPayloadConverter(objectMapper), // Handle POJOs
661
new MyCustomPayloadConverter() // Handle custom types
662
);
663
664
// Create encryption codec
665
PayloadCodec encryptionCodec = new AESEncryptionCodec(getEncryptionKey());
666
667
// Create compression codec
668
PayloadCodec compressionCodec = new GzipCompressionCodec();
669
670
// Chain codecs (compression then encryption)
671
PayloadCodec chainedCodec = new ChainedPayloadCodec(compressionCodec, encryptionCodec);
672
673
// Create data converter
674
DataConverter dataConverter = DataConverter.newDefaultInstance(
675
payloadConverters.toArray(new PayloadConverter[0])
676
)
677
.withPayloadCodec(chainedCodec)
678
.withFailureConverter(new CustomFailureConverter());
679
680
// Use with client and workers
681
WorkflowClientOptions clientOptions = WorkflowClientOptions.newBuilder()
682
.setDataConverter(dataConverter)
683
.build();
684
685
WorkflowClient client = WorkflowClient.newInstance(service, clientOptions);
686
}
687
688
// Compression codec example
689
public static class GzipCompressionCodec implements PayloadCodec {
690
private static final String ENCODING_TYPE = "binary/gzip";
691
private static final int MIN_SIZE_FOR_COMPRESSION = 1024; // Only compress if > 1KB
692
693
@Override
694
public List<Payload> encode(List<Payload> payloads) {
695
return payloads.stream()
696
.map(this::compressPayload)
697
.collect(Collectors.toList());
698
}
699
700
@Override
701
public List<Payload> decode(List<Payload> payloads) {
702
return payloads.stream()
703
.map(this::decompressPayload)
704
.collect(Collectors.toList());
705
}
706
707
private Payload compressPayload(Payload payload) {
708
byte[] data = payload.getData().toByteArray();
709
710
// Only compress if data is large enough
711
if (data.length < MIN_SIZE_FOR_COMPRESSION) {
712
return payload;
713
}
714
715
try {
716
byte[] compressedData = compress(data);
717
718
// Only use compression if it actually saves space
719
if (compressedData.length >= data.length) {
720
return payload;
721
}
722
723
return Payload.newBuilder()
724
.putAllMetadata(payload.getMetadataMap())
725
.putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
726
ByteString.copyFromUtf8(ENCODING_TYPE))
727
.putMetadata("original-size",
728
ByteString.copyFromUtf8(String.valueOf(data.length)))
729
.setData(ByteString.copyFrom(compressedData))
730
.build();
731
} catch (Exception e) {
732
// If compression fails, return original payload
733
return payload;
734
}
735
}
736
737
private Payload decompressPayload(Payload payload) {
738
String encoding = payload.getMetadataMap()
739
.get(EncodingKeys.METADATA_ENCODING_KEY)
740
.toStringUtf8();
741
742
if (!ENCODING_TYPE.equals(encoding)) {
743
return payload; // Not compressed
744
}
745
746
try {
747
byte[] compressedData = payload.getData().toByteArray();
748
byte[] decompressedData = decompress(compressedData);
749
750
// Remove compression metadata
751
Map<String, ByteString> metadata = new HashMap<>(payload.getMetadataMap());
752
metadata.remove(EncodingKeys.METADATA_ENCODING_KEY);
753
metadata.remove("original-size");
754
755
return Payload.newBuilder()
756
.putAllMetadata(metadata)
757
.setData(ByteString.copyFrom(decompressedData))
758
.build();
759
} catch (Exception e) {
760
throw new RuntimeException("Failed to decompress payload", e);
761
}
762
}
763
764
private byte[] compress(byte[] data) throws IOException {
765
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
766
GZIPOutputStream gzipOut = new GZIPOutputStream(baos)) {
767
gzipOut.write(data);
768
gzipOut.finish();
769
return baos.toByteArray();
770
}
771
}
772
773
private byte[] decompress(byte[] compressedData) throws IOException {
774
try (ByteArrayInputStream bais = new ByteArrayInputStream(compressedData);
775
GZIPInputStream gzipIn = new GZIPInputStream(bais);
776
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
777
byte[] buffer = new byte[1024];
778
int len;
779
while ((len = gzipIn.read(buffer)) != -1) {
780
baos.write(buffer, 0, len);
781
}
782
return baos.toByteArray();
783
}
784
}
785
}
786
}
787
```