# Data Conversion

The Temporal Python SDK (`temporalio`) provides a data conversion system that serializes and deserializes Python values used as workflow and activity parameters, return values, and other data exchanged with the Temporal platform. The system is modular and extensible: you can plug in custom serialization strategies while staying compatible with other Temporal SDKs.

## Core Framework

### DataConverter Class

The `DataConverter` class orchestrates data conversion. It combines payload conversion with an optional codec for encoding or encrypting payload bytes:

```python { .api }
@dataclass(frozen=True)
class DataConverter:
    """Data converter for converting and encoding payloads to/from Python values.

    This combines PayloadConverter which converts values with
    PayloadCodec which encodes bytes.
    """

    payload_converter_class: Type[PayloadConverter] = DefaultPayloadConverter
    """Class to instantiate for payload conversion."""

    payload_codec: Optional[PayloadCodec] = None
    """Optional codec for encoding payload bytes."""

    failure_converter_class: Type[FailureConverter] = DefaultFailureConverter
    """Class to instantiate for failure conversion."""

    payload_converter: PayloadConverter = dataclasses.field(init=False)
    """Payload converter created from the payload_converter_class."""

    failure_converter: FailureConverter = dataclasses.field(init=False)
    """Failure converter created from the failure_converter_class."""

    default: ClassVar[DataConverter]
    """Singleton default data converter."""

    async def encode(
        self, values: Sequence[Any]
    ) -> List[temporalio.api.common.v1.Payload]:
        """Encode values into payloads.

        First converts values to payloads then encodes payloads using codec.

        Args:
            values: Values to be converted and encoded.

        Returns:
            Converted and encoded payloads. Note, this does not have to be the
            same number as values given, but must be at least one and cannot be
            more than was given.
        """

    async def decode(
        self,
        payloads: Sequence[temporalio.api.common.v1.Payload],
        type_hints: Optional[List[Type]] = None,
    ) -> List[Any]:
        """Decode payloads into values.

        First decodes payloads using codec then converts payloads to values.

        Args:
            payloads: Payloads to be decoded and converted.
            type_hints: Optional type hints to guide conversion.

        Returns:
            Decoded and converted values.
        """

    async def encode_wrapper(
        self, values: Sequence[Any]
    ) -> temporalio.api.common.v1.Payloads:
        """encode() for the temporalio.api.common.v1.Payloads wrapper."""

    async def decode_wrapper(
        self,
        payloads: Optional[temporalio.api.common.v1.Payloads],
        type_hints: Optional[List[Type]] = None,
    ) -> List[Any]:
        """decode() for the temporalio.api.common.v1.Payloads wrapper."""

    async def encode_failure(
        self, exception: BaseException, failure: temporalio.api.failure.v1.Failure
    ) -> None:
        """Convert and encode failure."""

    async def decode_failure(
        self, failure: temporalio.api.failure.v1.Failure
    ) -> BaseException:
        """Decode and convert failure."""
```

### Default Function

```python { .api }
def default() -> DataConverter:
    """Default data converter.

    .. deprecated::
        Use DataConverter.default instead.
    """
```

## Payload Conversion

### PayloadConverter Base Class

The `PayloadConverter` is the abstract base class for converting Python values to/from Temporal payloads:

```python { .api }
class PayloadConverter(ABC):
    """Base payload converter to/from multiple payloads/values."""

    default: ClassVar[PayloadConverter]
    """Default payload converter."""

    @abstractmethod
    def to_payloads(
        self, values: Sequence[Any]
    ) -> List[temporalio.api.common.v1.Payload]:
        """Encode values into payloads.

        Implementers are expected to just return the payload for
        temporalio.common.RawValue.

        Args:
            values: Values to be converted.

        Returns:
            Converted payloads. Note, this does not have to be the same number
            as values given, but must be at least one and cannot be more than
            was given.

        Raises:
            Exception: Any issue during conversion.
        """

    @abstractmethod
    def from_payloads(
        self,
        payloads: Sequence[temporalio.api.common.v1.Payload],
        type_hints: Optional[List[Type]] = None,
    ) -> List[Any]:
        """Decode payloads into values.

        Implementers are expected to treat a type hint of
        temporalio.common.RawValue as just the raw value.

        Args:
            payloads: Payloads to convert to Python values.
            type_hints: Types that are expected if any. This may not have any
                types if there are no annotations on the target. If this is
                present, it must have the exact same length as payloads even if
                the values are just "object".

        Returns:
            Collection of Python values. Note, this does not have to be the same
            number as values given, but at least one must be present.

        Raises:
            Exception: Any issue during conversion.
        """

    def to_payloads_wrapper(
        self, values: Sequence[Any]
    ) -> temporalio.api.common.v1.Payloads:
        """to_payloads() for the temporalio.api.common.v1.Payloads wrapper."""

    def from_payloads_wrapper(
        self, payloads: Optional[temporalio.api.common.v1.Payloads]
    ) -> List[Any]:
        """from_payloads() for the temporalio.api.common.v1.Payloads wrapper."""

    def to_payload(self, value: Any) -> temporalio.api.common.v1.Payload:
        """Convert a single value to a payload.

        This is a shortcut for to_payloads() with a single-item list
        and result.

        Args:
            value: Value to convert to a single payload.

        Returns:
            Single converted payload.
        """

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Convert a single payload to a value.

        This is a shortcut for from_payloads() with a single-item list
        and result.

        Args:
            payload: Payload to convert to value.
            type_hint: Optional type hint to say which type to convert to.

        Returns:
            Single converted value.
        """
```

### EncodingPayloadConverter

Base class for encoding-specific converters used with `CompositePayloadConverter`:

```python { .api }
class EncodingPayloadConverter(ABC):
    """Base converter to/from single payload/value with a known encoding for use in CompositePayloadConverter."""

    @property
    @abstractmethod
    def encoding(self) -> str:
        """Encoding for the payload this converter works with."""

    @abstractmethod
    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        """Encode a single value to a payload or None.

        Args:
            value: Value to be converted.

        Returns:
            Payload of the value or None if unable to convert.

        Raises:
            TypeError: Value is not the expected type.
            ValueError: Value is of the expected type but otherwise incorrect.
            RuntimeError: General error during encoding.
        """

    @abstractmethod
    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Decode a single payload to a Python value or raise exception.

        Args:
            payload: Payload to convert to Python value.
            type_hint: Type that is expected if any. This may not have a type if
                there are no annotations on the target.

        Returns:
            The decoded value from the payload. Since the encoding is checked by
            the caller, this should raise an exception if the payload cannot be
            converted.

        Raises:
            RuntimeError: General error during decoding.
        """
```

### CompositePayloadConverter

Combines multiple encoding payload converters into a single `PayloadConverter`:

```python { .api }
class CompositePayloadConverter(PayloadConverter):
    """Composite payload converter that delegates to a list of encoding payload converters.

    Encoding/decoding are attempted on each payload converter successively until
    it succeeds.

    Attributes:
        converters: Mapping of encoding bytes to payload converters.
    """

    converters: Mapping[bytes, EncodingPayloadConverter]

    def __init__(self, *converters: EncodingPayloadConverter) -> None:
        """Initializes the composite payload converter.

        Args:
            converters: Payload converters to delegate to, in order.
        """
```
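
The delegation described in the docstring can be illustrated with a standard-library-only sketch. This is not the SDK's implementation: `ToyPayload`, `ToyNullConverter`, `ToyJSONConverter`, and `ToyCompositeConverter` are hypothetical stand-ins, and the sketch assumes that encoding tries each converter in order until one returns a payload, while decoding looks the converter up by the payload's `encoding` metadata.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Sequence


@dataclass
class ToyPayload:
    """Hypothetical stand-in for temporalio.api.common.v1.Payload."""

    metadata: Dict[str, bytes] = field(default_factory=dict)
    data: bytes = b""


class ToyNullConverter:
    encoding = "binary/null"

    def to_payload(self, value: Any) -> Optional[ToyPayload]:
        if value is None:
            return ToyPayload(metadata={"encoding": self.encoding.encode()})
        return None  # Not handled here; the next converter gets a turn.

    def from_payload(self, payload: ToyPayload) -> Any:
        return None


class ToyJSONConverter:
    encoding = "json/plain"

    def to_payload(self, value: Any) -> Optional[ToyPayload]:
        return ToyPayload(
            metadata={"encoding": self.encoding.encode()},
            data=json.dumps(value).encode(),
        )

    def from_payload(self, payload: ToyPayload) -> Any:
        return json.loads(payload.data)


class ToyCompositeConverter:
    def __init__(self, *converters: Any) -> None:
        # Keyed by encoding bytes; dict order preserves the order given.
        self.converters = {c.encoding.encode(): c for c in converters}

    def to_payloads(self, values: Sequence[Any]) -> List[ToyPayload]:
        payloads = []
        for value in values:
            for converter in self.converters.values():
                payload = converter.to_payload(value)
                if payload is not None:
                    payloads.append(payload)
                    break
            else:
                raise RuntimeError(f"No converter accepted {type(value).__name__}")
        return payloads

    def from_payloads(self, payloads: Sequence[ToyPayload]) -> List[Any]:
        values = []
        for payload in payloads:
            encoding = payload.metadata.get("encoding", b"<unknown>")
            converter = self.converters.get(encoding)
            if converter is None:
                raise KeyError(f"Unknown payload encoding {encoding.decode()}")
            values.append(converter.from_payload(payload))
        return values


composite = ToyCompositeConverter(ToyNullConverter(), ToyJSONConverter())
payloads = composite.to_payloads([None, {"a": 1}])
encodings = [p.metadata["encoding"] for p in payloads]
decoded = composite.from_payloads(payloads)
```

Order matters in this sketch: the catch-all JSON converter is listed last so that more specific converters see each value first.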

### DefaultPayloadConverter

The default implementation compatible with other Temporal SDKs:

```python { .api }
class DefaultPayloadConverter(CompositePayloadConverter):
    """Default payload converter compatible with other Temporal SDKs.

    This handles None, bytes, all protobuf message types, and any type that
    json.dump accepts. A singleton instance of this is available at
    PayloadConverter.default.
    """

    default_encoding_payload_converters: Tuple[EncodingPayloadConverter, ...]
    """Default set of encoding payload converters the default payload converter
    uses.
    """

    def __init__(self) -> None:
        """Create a default payload converter."""
```

## Built-in Converters

### Binary Converters

#### BinaryNullPayloadConverter

Handles `None` values:

```python { .api }
class BinaryNullPayloadConverter(EncodingPayloadConverter):
    """Converter for 'binary/null' payloads supporting None values."""

    @property
    def encoding(self) -> str:
        """Returns 'binary/null'."""

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        """Convert None values to payload."""

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Convert payload back to None."""
```

#### BinaryPlainPayloadConverter

Handles `bytes` values:

```python { .api }
class BinaryPlainPayloadConverter(EncodingPayloadConverter):
    """Converter for 'binary/plain' payloads supporting bytes values."""

    @property
    def encoding(self) -> str:
        """Returns 'binary/plain'."""

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        """Convert bytes values to payload."""

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Convert payload back to bytes."""
```

### Protobuf Converters

#### JSONProtoPayloadConverter

Handles Protocol Buffer messages using JSON encoding:

```python { .api }
class JSONProtoPayloadConverter(EncodingPayloadConverter):
    """Converter for 'json/protobuf' payloads supporting protobuf Message values."""

    def __init__(self, ignore_unknown_fields: bool = False):
        """Initialize a JSON proto converter.

        Args:
            ignore_unknown_fields: Determines whether converter should error if
                unknown fields are detected
        """

    @property
    def encoding(self) -> str:
        """Returns 'json/protobuf'."""

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        """Convert protobuf Message values to JSON payload."""

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Convert JSON payload back to protobuf Message."""
```

#### BinaryProtoPayloadConverter

Handles Protocol Buffer messages using binary encoding:

```python { .api }
class BinaryProtoPayloadConverter(EncodingPayloadConverter):
    """Converter for 'binary/protobuf' payloads supporting protobuf Message values."""

    @property
    def encoding(self) -> str:
        """Returns 'binary/protobuf'."""

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        """Convert protobuf Message values to binary payload."""

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Convert binary payload back to protobuf Message."""
```

## JSON Handling

### JSONPlainPayloadConverter

Primary converter for common Python values:

```python { .api }
class JSONPlainPayloadConverter(EncodingPayloadConverter):
    """Converter for 'json/plain' payloads supporting common Python values.

    For encoding, this supports all values that json.dump supports
    and by default adds extra encoding support for dataclasses, classes with
    dict() methods, and all iterables.

    For decoding, this uses type hints to attempt to rebuild the type from the
    type hint.
    """

    def __init__(
        self,
        *,
        encoder: Optional[Type[json.JSONEncoder]] = AdvancedJSONEncoder,
        decoder: Optional[Type[json.JSONDecoder]] = None,
        encoding: str = "json/plain",
        custom_type_converters: Sequence[JSONTypeConverter] = [],
    ) -> None:
        """Initialize a JSON data converter.

        Args:
            encoder: Custom encoder class object to use.
            decoder: Custom decoder class object to use.
            encoding: Encoding name to use.
            custom_type_converters: Set of custom type converters that are used
                when converting from a payload to type-hinted values.
        """

    @property
    def encoding(self) -> str:
        """Get the encoding name."""

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        """Convert Python values to JSON payload."""

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Convert JSON payload back to Python values using type hints."""
```

### AdvancedJSONEncoder

Enhanced JSON encoder with support for additional Python types:

```python { .api }
class AdvancedJSONEncoder(json.JSONEncoder):
    """Advanced JSON encoder.

    This encoder supports dataclasses and all iterables as lists.

    It also uses Pydantic v1's "dict" methods if available on the object,
    but this is deprecated. Pydantic users should upgrade to v2 and use
    temporalio.contrib.pydantic.pydantic_data_converter.
    """

    def default(self, o: Any) -> Any:
        """Override JSON encoding default.

        Supports:
        - datetime objects (converted to ISO format)
        - dataclasses (converted using dataclasses.asdict)
        - Objects with dict() method (deprecated Pydantic v1 support)
        - Non-list iterables (converted to lists)
        - UUID objects (converted to strings)

        See json.JSONEncoder.default.
        """
```
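
To make the supported-types list concrete, here is a standard-library-only sketch of an encoder with similar behavior. `SketchJSONEncoder` and `Order` are hypothetical names, the deprecated `dict()` support is left out, and the check order is an assumption rather than the SDK's actual logic.

```python
import dataclasses
import json
import uuid
from datetime import datetime
from typing import Any


class SketchJSONEncoder(json.JSONEncoder):
    """Illustrative encoder; not the SDK's AdvancedJSONEncoder."""

    def default(self, o: Any) -> Any:
        if isinstance(o, datetime):
            return o.isoformat()
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        if isinstance(o, uuid.UUID):
            return str(o)
        try:
            # Non-list iterables such as sets, frozensets, or generators.
            return list(o)
        except TypeError:
            pass
        # Fall back to the base class, which raises TypeError.
        return super().default(o)


@dataclasses.dataclass
class Order:
    id: uuid.UUID
    placed_at: datetime
    tags: frozenset


order = Order(
    id=uuid.UUID("12345678-1234-5678-1234-567812345678"),
    placed_at=datetime(2024, 1, 2, 3, 4, 5),
    tags=frozenset({"rush"}),
)
encoded = json.dumps(order, cls=SketchJSONEncoder, sort_keys=True)
```

`json.JSONEncoder.default` is only called for objects the base encoder cannot serialize, so plain dicts, lists, strings, and numbers never reach this method.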

### JSONTypeConverter

Abstract base for custom type conversion during JSON decoding:

```python { .api }
class JSONTypeConverter(ABC):
    """Converter for converting an object from Python json.loads
    result (e.g. scalar, list, or dict) to a known type.
    """

    Unhandled = _JSONTypeConverterUnhandled(object())
    """Sentinel value that must be used as the result of
    to_typed_value to say the given type is not handled by this
    converter."""

    @abstractmethod
    def to_typed_value(
        self, hint: Type, value: Any
    ) -> Union[Optional[Any], _JSONTypeConverterUnhandled]:
        """Convert the given value to a type based on the given hint.

        Args:
            hint: Type hint to use to help in converting the value.
            value: Value as returned by json.loads. Usually a scalar,
                list, or dict.

        Returns:
            The converted value or Unhandled if this converter does
            not handle this situation.
        """
```

### Type Conversion Utilities

```python { .api }
def value_to_type(
    hint: Type,
    value: Any,
    custom_converters: Sequence[JSONTypeConverter] = [],
) -> Any:
    """Convert a given value to the given type hint.

    This is used internally to convert a raw JSON loaded value to a specific
    type hint.

    Args:
        hint: Type hint to convert the value to.
        value: Raw value (e.g. primitive, dict, or list) to convert from.
        custom_converters: Set of custom converters to try before doing default
            conversion. Converters are tried in order and the first value that
            is not JSONTypeConverter.Unhandled will be returned from
            this function instead of doing default behavior.

    Returns:
        Converted value.

    Raises:
        TypeError: Unable to convert to the given hint.
    """
```
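
The "first result that is not `Unhandled` wins" rule can be sketched without the SDK. Everything below is hypothetical: `UNHANDLED` stands in for `JSONTypeConverter.Unhandled`, converters are plain functions rather than `JSONTypeConverter` subclasses, and the default path is reduced to a bare `isinstance` check.

```python
from datetime import date
from typing import Any, Callable, Sequence, Type

# Hypothetical stand-in for JSONTypeConverter.Unhandled. A dedicated sentinel
# is needed because None is itself a legitimate converted value.
UNHANDLED = object()


def date_converter(hint: Type, value: Any) -> Any:
    if hint is date and isinstance(value, str):
        return date.fromisoformat(value)
    return UNHANDLED


def sketch_value_to_type(
    hint: Type,
    value: Any,
    custom_converters: Sequence[Callable[[Type, Any], Any]] = (),
) -> Any:
    # Custom converters run first, in order; the first handled result wins.
    for convert in custom_converters:
        result = convert(hint, value)
        if result is not UNHANDLED:
            return result
    # Greatly simplified default path: accept values already of the hinted type.
    if isinstance(value, hint):
        return value
    raise TypeError(
        f"Expected value to be {hint.__name__}, was {type(value).__name__}"
    )


parsed = sketch_value_to_type(date, "2024-01-02", [date_converter])
passthrough = sketch_value_to_type(int, 5, [date_converter])
```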

## Payload Codecs

### PayloadCodec Base Class

Abstract base for encoding/decoding payloads (e.g., compression or encryption):

```python { .api }
class PayloadCodec(ABC):
    """Codec for encoding/decoding to/from bytes.

    Commonly used for compression or encryption.
    """

    @abstractmethod
    async def encode(
        self, payloads: Sequence[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        """Encode the given payloads.

        Args:
            payloads: Payloads to encode. This value should not be mutated.

        Returns:
            Encoded payloads. Note, this does not have to be the same number as
            payloads given, but must be at least one and cannot be more than was
            given.
        """

    @abstractmethod
    async def decode(
        self, payloads: Sequence[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        """Decode the given payloads.

        Args:
            payloads: Payloads to decode. This value should not be mutated.

        Returns:
            Decoded payloads. Note, this does not have to be the same number as
            payloads given, but must be at least one and cannot be more than was
            given.
        """

    async def encode_wrapper(self, payloads: temporalio.api.common.v1.Payloads) -> None:
        """encode() for the temporalio.api.common.v1.Payloads wrapper.

        This replaces the payloads within the wrapper.
        """

    async def decode_wrapper(self, payloads: temporalio.api.common.v1.Payloads) -> None:
        """decode() for the temporalio.api.common.v1.Payloads wrapper.

        This replaces the payloads within.
        """

    async def encode_failure(self, failure: temporalio.api.failure.v1.Failure) -> None:
        """Encode payloads of a failure."""

    async def decode_failure(self, failure: temporalio.api.failure.v1.Failure) -> None:
        """Decode payloads of a failure."""
```
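
A compression codec is one way to picture the `encode`/`decode` contract. The sketch below uses only the standard library: `ToyPayload` is a hypothetical stand-in for `temporalio.api.common.v1.Payload`, `ToyCompressionCodec` does not subclass the real `PayloadCodec`, and the `binary/zlib` and `inner-encoding` metadata names are assumptions made for illustration.

```python
import asyncio
import zlib
from dataclasses import dataclass, field
from typing import Dict, List, Sequence


@dataclass
class ToyPayload:
    """Hypothetical stand-in for temporalio.api.common.v1.Payload."""

    metadata: Dict[str, bytes] = field(default_factory=dict)
    data: bytes = b""


class ToyCompressionCodec:
    ENCODING = b"binary/zlib"  # Hypothetical encoding name.

    async def encode(self, payloads: Sequence[ToyPayload]) -> List[ToyPayload]:
        # Build new payloads instead of mutating the inputs.
        return [
            ToyPayload(
                metadata={
                    "encoding": self.ENCODING,
                    "inner-encoding": p.metadata.get("encoding", b""),
                },
                data=zlib.compress(p.data),
            )
            for p in payloads
        ]

    async def decode(self, payloads: Sequence[ToyPayload]) -> List[ToyPayload]:
        decoded = []
        for p in payloads:
            if p.metadata.get("encoding") != self.ENCODING:
                decoded.append(p)  # Not produced by this codec; pass through.
                continue
            decoded.append(
                ToyPayload(
                    metadata={"encoding": p.metadata["inner-encoding"]},
                    data=zlib.decompress(p.data),
                )
            )
        return decoded


async def round_trip():
    codec = ToyCompressionCodec()
    original = [ToyPayload({"encoding": b"json/plain"}, b'{"n": 1}' * 50)]
    encoded = await codec.encode(original)
    return original, encoded, await codec.decode(encoded)


original, encoded, decoded = asyncio.run(round_trip())
```

The sketch only preserves the inner `encoding` entry; a real codec would need to round-trip all metadata, for example by serializing the whole original payload into the encoded payload's data.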

## Failure Conversion

### FailureConverter Base Class

Abstract base for converting between Python exceptions and Temporal failures:

```python { .api }
class FailureConverter(ABC):
    """Base failure converter to/from errors.

    Note, for workflow exceptions, to_failure is only invoked if the
    exception is an instance of temporalio.exceptions.FailureError.
    Users should extend temporalio.exceptions.ApplicationError if
    they want a custom workflow exception to work with this class.
    """

    default: ClassVar[FailureConverter]
    """Default failure converter."""

    @abstractmethod
    def to_failure(
        self,
        exception: BaseException,
        payload_converter: PayloadConverter,
        failure: temporalio.api.failure.v1.Failure,
    ) -> None:
        """Convert the given exception to a Temporal failure.

        Users should make sure not to alter the exception input.

        Args:
            exception: The exception to convert.
            payload_converter: The payload converter to use if needed.
            failure: The failure to update with error information.
        """

    @abstractmethod
    def from_failure(
        self,
        failure: temporalio.api.failure.v1.Failure,
        payload_converter: PayloadConverter,
    ) -> BaseException:
        """Convert the given Temporal failure to an exception.

        Users should make sure not to alter the failure input.

        Args:
            failure: The failure to convert.
            payload_converter: The payload converter to use if needed.

        Returns:
            Converted error.
        """
```

### DefaultFailureConverter

Standard implementation for failure conversion:

```python { .api }
class DefaultFailureConverter(FailureConverter):
    """Default failure converter.

    A singleton instance of this is available at
    FailureConverter.default.
    """

    def __init__(self, *, encode_common_attributes: bool = False) -> None:
        """Create the default failure converter.

        Args:
            encode_common_attributes: If True, the message and stack trace
                of the failure will be moved into the encoded attribute section
                of the failure which can be encoded with a codec.
        """
```

### DefaultFailureConverterWithEncodedAttributes

Enhanced failure converter that encodes attributes for codec processing:

```python { .api }
class DefaultFailureConverterWithEncodedAttributes(DefaultFailureConverter):
    """Implementation of DefaultFailureConverter which moves message
    and stack trace to encoded attributes subject to a codec.
    """

    def __init__(self) -> None:
        """Create a default failure converter with encoded attributes."""
```

## Search Attributes

### Core Types

```python { .api }
from typing import Union, Sequence, Mapping, List
from datetime import datetime

SearchAttributeValue = Union[str, int, float, bool, datetime, Sequence[str]]
SearchAttributeValues = Union[
    List[str], List[int], List[float], List[bool], List[datetime]
]
SearchAttributes = Mapping[str, SearchAttributeValues]
```

### SearchAttributeIndexedValueType

```python { .api }
from enum import IntEnum

class SearchAttributeIndexedValueType(IntEnum):
    """Server index type of a search attribute."""

    TEXT = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_TEXT)
    KEYWORD = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_KEYWORD)
    INT = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_INT)
    DOUBLE = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_DOUBLE)
    BOOL = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_BOOL)
    DATETIME = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_DATETIME)
    KEYWORD_LIST = int(
        temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_KEYWORD_LIST
    )
```

### SearchAttributeKey

Type-safe search attribute key:

```python { .api }
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

SearchAttributeValueType = TypeVar(
    "SearchAttributeValueType", str, int, float, bool, datetime, Sequence[str]
)

class SearchAttributeKey(ABC, Generic[SearchAttributeValueType]):
    """Typed search attribute key representation.

    Use one of the for_* static methods here to create a key.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Get the name of the key."""

    @property
    @abstractmethod
    def indexed_value_type(self) -> SearchAttributeIndexedValueType:
        """Get the server index type of the key."""

    @property
    @abstractmethod
    def value_type(self) -> Type[SearchAttributeValueType]:
        """Get the Python type of value for the key.

        This may contain generics which cannot be used in isinstance.
        origin_value_type can be used instead.
        """

    @property
    def origin_value_type(self) -> Type:
        """Get the Python type of value for the key without generics."""

    def value_set(
        self, value: SearchAttributeValueType
    ) -> SearchAttributeUpdate[SearchAttributeValueType]:
        """Create a search attribute update to set the given value on this key."""

    def value_unset(self) -> SearchAttributeUpdate[SearchAttributeValueType]:
        """Create a search attribute update to unset the value on this key."""

    @staticmethod
    def for_text(name: str) -> SearchAttributeKey[str]:
        """Create a 'Text' search attribute type."""

    @staticmethod
    def for_keyword(name: str) -> SearchAttributeKey[str]:
        """Create a 'Keyword' search attribute type."""

    @staticmethod
    def for_int(name: str) -> SearchAttributeKey[int]:
        """Create an 'Int' search attribute type."""

    @staticmethod
    def for_float(name: str) -> SearchAttributeKey[float]:
        """Create a 'Double' search attribute type."""

    @staticmethod
    def for_bool(name: str) -> SearchAttributeKey[bool]:
        """Create a 'Bool' search attribute type."""

    @staticmethod
    def for_datetime(name: str) -> SearchAttributeKey[datetime]:
        """Create a 'Datetime' search attribute type."""

    @staticmethod
    def for_keyword_list(name: str) -> SearchAttributeKey[Sequence[str]]:
        """Create a 'KeywordList' search attribute type."""
```

### SearchAttributePair and Updates

```python { .api }
from typing_extensions import NamedTuple

class SearchAttributePair(NamedTuple, Generic[SearchAttributeValueType]):
    """A named tuple representing a key/value search attribute pair."""

    key: SearchAttributeKey[SearchAttributeValueType]
    value: SearchAttributeValueType

class SearchAttributeUpdate(ABC, Generic[SearchAttributeValueType]):
    """Representation of a search attribute update."""

    @property
    @abstractmethod
    def key(self) -> SearchAttributeKey[SearchAttributeValueType]:
        """Key that is being set."""

    @property
    @abstractmethod
    def value(self) -> Optional[SearchAttributeValueType]:
        """Value that is being set or None if being unset."""
```

### TypedSearchAttributes

```python { .api }
from typing import Collection

class TypedSearchAttributes(Collection[SearchAttributePair]):
    """Collection of typed search attributes.

    This is represented as an immutable collection of
    SearchAttributePair. This can be created passing a sequence of
    pairs to the constructor.
    """

    search_attributes: Sequence[SearchAttributePair]
    """Underlying sequence of search attribute pairs. Do not mutate this, only
    create new TypedSearchAttributes instances.

    These are sorted by key name during construction. Duplicates cannot exist.
    """

    empty: ClassVar[TypedSearchAttributes]
    """Class variable representing an empty set of attributes."""

    def __len__(self) -> int:
        """Get the number of search attributes."""

    def __getitem__(
        self, key: SearchAttributeKey[SearchAttributeValueType]
    ) -> SearchAttributeValueType:
        """Get a single search attribute value by key or fail with KeyError."""

    def __iter__(self) -> Iterator[SearchAttributePair]:
        """Get an iterator over search attribute key/value pairs."""

    def __contains__(self, key: object) -> bool:
        """Check whether this search attribute contains the given key.

        This uses key equality so the key must be the same name and type.
        """

    def get(
        self,
        key: SearchAttributeKey[SearchAttributeValueType],
        default: Optional[Any] = None,
    ) -> Any:
        """Get an attribute value for a key (or default). This is similar to dict.get."""

    def updated(self, *search_attributes: SearchAttributePair) -> TypedSearchAttributes:
        """Copy this collection, replacing attributes with matching key names or
        adding if key name not present.
        """
```
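
The copy-and-replace semantics of `updated` can be pictured with a small standard-library sketch. `ToyPair` and the free function `updated` are hypothetical simplifications: keys are reduced to plain name strings, and the result is a tuple sorted by key name, mirroring the "sorted by key name, no duplicates" note above.

```python
from typing import Any, NamedTuple, Tuple


class ToyPair(NamedTuple):
    """Hypothetical simplification of SearchAttributePair with a string key."""

    key_name: str
    value: Any


def updated(existing: Tuple[ToyPair, ...], *pairs: ToyPair) -> Tuple[ToyPair, ...]:
    # Start from the existing pairs, then let new pairs override by key name.
    merged = {p.key_name: p for p in existing}
    merged.update({p.key_name: p for p in pairs})
    # Return a new sorted tuple; the input is never mutated.
    return tuple(sorted(merged.values(), key=lambda p: p.key_name))


base = (ToyPair("CustomerId", 7), ToyPair("Region", "eu"))
result = updated(base, ToyPair("Region", "us"), ToyPair("Tier", "gold"))
```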

### Search Attribute Functions

```python { .api }
def encode_search_attributes(
    attributes: Union[
        temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes
    ],
    api: temporalio.api.common.v1.SearchAttributes,
) -> None:
    """Convert search attributes into an API message.

    Args:
        attributes: Search attributes to convert. The dictionary form of this is
            DEPRECATED.
        api: API message to set converted attributes on.
    """

def encode_typed_search_attribute_value(
    key: temporalio.common.SearchAttributeKey[
        temporalio.common.SearchAttributeValueType
    ],
    value: Optional[temporalio.common.SearchAttributeValue],
) -> temporalio.api.common.v1.Payload:
    """Convert typed search attribute value into a payload.

    Args:
        key: Key for the value.
        value: Value to convert.

    Returns:
        Payload for the value.
    """

def encode_search_attribute_values(
    vals: temporalio.common.SearchAttributeValues,
) -> temporalio.api.common.v1.Payload:
    """Convert search attribute values into a payload.

    .. deprecated::
        Use typed search attributes instead.

    Args:
        vals: List of values to convert.
    """

def decode_search_attributes(
    api: temporalio.api.common.v1.SearchAttributes,
) -> temporalio.common.SearchAttributes:
    """Decode API search attributes to values.

    .. deprecated::
        Use typed search attributes instead.

    Args:
        api: API message with search attribute values to convert.

    Returns:
        Converted search attribute values (new mapping every time).
    """

def decode_typed_search_attributes(
    api: temporalio.api.common.v1.SearchAttributes,
) -> temporalio.common.TypedSearchAttributes:
    """Decode API search attributes to typed search attributes.

    Args:
        api: API message with search attribute values to convert.

    Returns:
        Typed search attribute collection (new object every time).
    """
```

## Common Configuration Types

### RetryPolicy

```python { .api }
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional, Sequence

@dataclass
class RetryPolicy:
    """Options for retrying workflows and activities."""

    initial_interval: timedelta = timedelta(seconds=1)
    """Backoff interval for the first retry. Default 1s."""

    backoff_coefficient: float = 2.0
    """Coefficient to multiply previous backoff interval by to get new
    interval. Default 2.0.
    """

    maximum_interval: Optional[timedelta] = None
    """Maximum backoff interval between retries. Default 100x
    initial_interval.
    """

    maximum_attempts: int = 0
    """Maximum number of attempts.

    If 0, the default, there is no maximum.
    """

    non_retryable_error_types: Optional[Sequence[str]] = None
    """List of error types that are not retryable."""

    @staticmethod
    def from_proto(proto: temporalio.api.common.v1.RetryPolicy) -> RetryPolicy:
        """Create a retry policy from the proto object."""

    def apply_to_proto(self, proto: temporalio.api.common.v1.RetryPolicy) -> None:
        """Apply the fields in this policy to the given proto object."""
```
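
To see how `initial_interval`, `backoff_coefficient`, and `maximum_interval` interact, the following sketch computes the waits before the first few retries. It assumes plain exponential backoff clamped at the maximum, with the documented default cap of 100x the initial interval. The helper `backoff_intervals` and its `retries` parameter are hypothetical; the real schedule is determined by the Temporal server, not by this function.

```python
from datetime import timedelta
from typing import List, Optional


def backoff_intervals(
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
    retries: int = 5,
) -> List[timedelta]:
    """Return the assumed wait before each of the first `retries` retries."""
    # Documented default: cap at 100x the initial interval.
    cap = maximum_interval if maximum_interval is not None else initial_interval * 100
    intervals = []
    current = initial_interval
    for _ in range(retries):
        intervals.append(min(current, cap))
        current = current * backoff_coefficient
    return intervals


schedule = backoff_intervals(maximum_interval=timedelta(seconds=5), retries=5)
seconds = [interval.total_seconds() for interval in schedule]
print(seconds)  # [1.0, 2.0, 4.0, 5.0, 5.0]
```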

### Priority

```python { .api }
class Priority:
    """Priority configuration for workflows and activities."""
```

### RawValue

```python { .api }
@dataclass(frozen=True)
class RawValue:
    """Representation of an unconverted, raw payload.

    This type can be used as a parameter or return type in workflows,
    activities, signals, and queries to pass through a raw payload.
    Encoding/decoding of the payload is still done by the system.
    """

    payload: temporalio.api.common.v1.Payload
```

## Usage Examples

### Custom Payload Converter

```python
import json
from typing import Any, Optional, Type

import temporalio.api.common.v1
from temporalio.converter import EncodingPayloadConverter


class CustomJSONConverter(EncodingPayloadConverter):
    """Custom JSON converter with special handling."""

    @property
    def encoding(self) -> str:
        return "json/custom"

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        # Only handle plain JSON-compatible values. Returning None lets the
        # next converter in a CompositePayloadConverter try instead.
        if isinstance(value, (dict, list, str, int, float, bool)) or value is None:
            return temporalio.api.common.v1.Payload(
                metadata={"encoding": self.encoding.encode()},
                # Compact separators keep the payload small.
                data=json.dumps(value, separators=(",", ":")).encode(),
            )
        return None

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        # The type hint is ignored; values come back as plain JSON types.
        return json.loads(payload.data.decode())
```

### Custom Data Converter with Codec

```python
from typing import List
from temporalio.converter import DataConverter, DefaultPayloadConverter, PayloadCodec
import temporalio.api.common.v1

class EncryptionCodec(PayloadCodec):
    """Simple encryption codec example."""

    async def encode(
        self, payloads: List[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        # Implement encryption logic here
        return payloads

    async def decode(
        self, payloads: List[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        # Implement decryption logic here
        return payloads

# Create data converter with encryption
data_converter = DataConverter(
    payload_converter_class=DefaultPayloadConverter,
    payload_codec=EncryptionCodec()
)
```
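
The `encode`/`decode` bodies above are placeholders. As a rough idea of what such a transformation could look like, the sketch below wraps payload bytes with zlib compression instead of encryption, since compression needs no key management and runs with the standard library alone. The `Payload` dataclass is a hypothetical stand-in for `temporalio.api.common.v1.Payload`, and the encoding name `binary/zlib-example` is made up for illustration; neither comes from the SDK.

```python
import zlib
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Payload:
    """Hypothetical stand-in for temporalio.api.common.v1.Payload."""
    metadata: Dict[str, bytes] = field(default_factory=dict)
    data: bytes = b""


ENCODING = b"binary/zlib-example"  # illustrative name, not an SDK constant


def encode(payloads: List[Payload]) -> List[Payload]:
    encoded = []
    for p in payloads:
        # Remember the original encoding so decode can restore it.
        metadata = {
            "encoding": ENCODING,
            "original-encoding": p.metadata.get("encoding", b""),
        }
        encoded.append(Payload(metadata=metadata, data=zlib.compress(p.data)))
    return encoded


def decode(payloads: List[Payload]) -> List[Payload]:
    decoded = []
    for p in payloads:
        if p.metadata.get("encoding") != ENCODING:
            decoded.append(p)  # not produced by this codec; pass through
            continue
        metadata = {"encoding": p.metadata["original-encoding"]}
        decoded.append(Payload(metadata=metadata, data=zlib.decompress(p.data)))
    return decoded
```

This sketch keeps only the `encoding` metadata entry. A codec meant for real use would need to preserve the whole original payload, and an encryption codec would replace the zlib calls with a vetted cipher.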

### Custom JSON Type Converter

```python
from datetime import date
from typing import Any, Type, Union
from temporalio.converter import JSONPlainPayloadConverter, JSONTypeConverter

class DateTypeConverter(JSONTypeConverter):
    """Convert ISO date strings to date objects."""

    def to_typed_value(self, hint: Type, value: Any) -> Union[Any, object]:
        if hint is date and isinstance(value, str):
            try:
                return date.fromisoformat(value)
            except ValueError:
                pass
        # Signal that this converter does not handle the hint/value pair
        return JSONTypeConverter.Unhandled

# Use with JSONPlainPayloadConverter
converter = JSONPlainPayloadConverter(
    custom_type_converters=[DateTypeConverter()]
)
```
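
The converter above returns a sentinel rather than `None` when it declines a value, because `None` can itself be a valid conversion result. The standard-library sketch below illustrates that fall-through pattern. `UNHANDLED`, `date_converter`, and `convert` are hypothetical names standing in for the SDK's machinery, and the fallback of returning the raw value is an assumption of this sketch, not a description of the SDK's default conversion.

```python
from datetime import date
from typing import Any, Callable, Sequence, Type

UNHANDLED = object()  # hypothetical stand-in for JSONTypeConverter.Unhandled


def date_converter(hint: Type, value: Any) -> Any:
    """Mirror of DateTypeConverter.to_typed_value as a plain function."""
    if hint is date and isinstance(value, str):
        try:
            return date.fromisoformat(value)
        except ValueError:
            pass
    return UNHANDLED


def convert(
    hint: Type, value: Any, converters: Sequence[Callable[[Type, Any], Any]]
) -> Any:
    # Try each converter in order; the first non-sentinel result wins.
    for converter in converters:
        result = converter(hint, value)
        if result is not UNHANDLED:
            return result
    # Sketch-only fallback: hand back the raw JSON value unchanged.
    return value


print(convert(date, "2024-01-15", [date_converter]))
print(convert(date, "not-a-date", [date_converter]))
```

An invalid date string is not an error in this design: the converter declines it and the value falls through to whatever handles it next.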

### Working with Search Attributes

```python
from datetime import datetime, timezone
from temporalio.common import SearchAttributeKey, TypedSearchAttributes, SearchAttributePair

# Define typed search attribute keys
customer_id_key = SearchAttributeKey.for_keyword("CustomerId")
order_amount_key = SearchAttributeKey.for_float("OrderAmount")
created_time_key = SearchAttributeKey.for_datetime("CreatedTime")

# Create search attributes (use a timezone-aware datetime)
search_attributes = TypedSearchAttributes([
    SearchAttributePair(customer_id_key, "customer-123"),
    SearchAttributePair(order_amount_key, 99.99),
    SearchAttributePair(created_time_key, datetime.now(timezone.utc))
])

# Update search attributes; this returns a new object
updated_attributes = search_attributes.updated(
    SearchAttributePair(order_amount_key, 149.99)
)
```

### Custom Failure Converter

```python
from temporalio.converter import DefaultFailureConverter, PayloadConverter
from temporalio.exceptions import ApplicationError
import temporalio.api.failure.v1

class MyCustomError(Exception):
    """Custom exception class."""

# Subclass DefaultFailureConverter so that super() reaches the default
# implementation; the FailureConverter base class is abstract.
class CustomFailureConverter(DefaultFailureConverter):
    """Custom failure converter with special error handling."""

    def to_failure(
        self,
        exception: BaseException,
        payload_converter: PayloadConverter,
        failure: temporalio.api.failure.v1.Failure,
    ) -> None:
        # Custom logic for converting exceptions to failures
        if isinstance(exception, MyCustomError):
            # Handle custom error type
            app_error = ApplicationError(
                str(exception),
                type="MyCustomError",
                non_retryable=True
            )
            app_error.__traceback__ = exception.__traceback__
            app_error.__cause__ = exception.__cause__
            # Delegate to default handling for ApplicationError
            super().to_failure(app_error, payload_converter, failure)
        else:
            # Use default handling for other errors
            super().to_failure(exception, payload_converter, failure)

    def from_failure(
        self,
        failure: temporalio.api.failure.v1.Failure,
        payload_converter: PayloadConverter,
    ) -> BaseException:
        # Custom logic for converting failures to exceptions
        if (failure.HasField("application_failure_info") and
                failure.application_failure_info.type == "MyCustomError"):
            return MyCustomError(failure.message)

        # Use default handling for other failures
        return super().from_failure(failure, payload_converter)
```

## Integration with Workflows and Activities

Workflow and activity arguments and return values pass through the configured data converter without extra code:

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import List
from temporalio import workflow, activity

@dataclass
class OrderRequest:
    customer_id: str
    items: List[str]
    total: float

@activity.defn
async def process_order(request: OrderRequest) -> str:
    # The request parameter is automatically deserialized
    # using the configured data converter
    return f"Processed order for {request.customer_id}"

@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, request: OrderRequest) -> str:
        # Both workflow parameters and activity parameters/returns
        # are handled by the data converter
        result = await workflow.execute_activity(
            process_order,
            request,
            start_to_close_timeout=timedelta(seconds=30)
        )
        return result
```
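
As a rough picture of what happens to a dataclass argument such as `OrderRequest`, the sketch below round-trips it through JSON with the standard library only. It is an approximation under the assumption that the default JSON conversion is in use: the SDK's converter builds the typed value from the parameter's type hint and handles nested types and other cases this snippet ignores.

```python
import dataclasses
import json
from dataclasses import dataclass
from typing import List


@dataclass
class OrderRequest:
    customer_id: str
    items: List[str]
    total: float


request = OrderRequest(customer_id="customer-123", items=["book", "pen"], total=24.5)

# Sending side: dataclass -> dict -> JSON bytes (the payload data).
data = json.dumps(dataclasses.asdict(request)).encode()

# Receiving side: JSON bytes -> dict -> dataclass, guided by the type hint.
restored = OrderRequest(**json.loads(data.decode()))

print(restored == request)
```

Because the wire format is plain JSON, a type hint on the receiving function is what turns the decoded dictionary back into an `OrderRequest`.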

The data conversion system covers serialization needs across Temporal applications, from plain JSON encoding to custom formats with encryption or compression applied through a payload codec.