Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.
Temporalio provides a comprehensive data conversion system that enables serialization and deserialization of Python values for workflow and activity parameters, return values, and other data used in the Temporal platform. The conversion system is modular and extensible, allowing custom serialization strategies while maintaining compatibility with other Temporal SDKs.
The DataConverter class is the main orchestrator for data conversion, combining payload conversion with optional encoding/encryption:
@dataclass(frozen=True)
class DataConverter:
"""Data converter for converting and encoding payloads to/from Python values.
This combines PayloadConverter which converts values with
PayloadCodec which encodes bytes.
"""
payload_converter_class: Type[PayloadConverter] = DefaultPayloadConverter
"""Class to instantiate for payload conversion."""
payload_codec: Optional[PayloadCodec] = None
"""Optional codec for encoding payload bytes."""
failure_converter_class: Type[FailureConverter] = DefaultFailureConverter
"""Class to instantiate for failure conversion."""
payload_converter: PayloadConverter = dataclasses.field(init=False)
"""Payload converter created from the payload_converter_class."""
failure_converter: FailureConverter = dataclasses.field(init=False)
"""Failure converter created from the failure_converter_class."""
default: ClassVar[DataConverter]
"""Singleton default data converter."""
async def encode(
self, values: Sequence[Any]
) -> List[temporalio.api.common.v1.Payload]:
"""Encode values into payloads.
First converts values to payloads then encodes payloads using codec.
Args:
values: Values to be converted and encoded.
Returns:
Converted and encoded payloads. Note, this does not have to be the
same number as values given, but must be at least one and cannot be
more than was given.
"""
async def decode(
self,
payloads: Sequence[temporalio.api.common.v1.Payload],
type_hints: Optional[List[Type]] = None,
) -> List[Any]:
"""Decode payloads into values.
First decodes payloads using codec then converts payloads to values.
Args:
payloads: Payloads to be decoded and converted.
type_hints: Optional type hints to guide conversion.
Returns:
Decoded and converted values.
"""
async def encode_wrapper(
self, values: Sequence[Any]
) -> temporalio.api.common.v1.Payloads:
"""encode() for the temporalio.api.common.v1.Payloads wrapper."""
async def decode_wrapper(
self,
payloads: Optional[temporalio.api.common.v1.Payloads],
type_hints: Optional[List[Type]] = None,
) -> List[Any]:
"""decode() for the temporalio.api.common.v1.Payloads wrapper."""
async def encode_failure(
self, exception: BaseException, failure: temporalio.api.failure.v1.Failure
) -> None:
"""Convert and encode failure."""
async def decode_failure(
self, failure: temporalio.api.failure.v1.Failure
) -> BaseException:
"""Decode and convert failure."""

def default() -> DataConverter:
"""Default data converter.
.. deprecated::
Use DataConverter.default instead.
"""

The PayloadConverter is the abstract base class for converting Python values to/from Temporal payloads:
class PayloadConverter(ABC):
"""Base payload converter to/from multiple payloads/values."""
default: ClassVar[PayloadConverter]
"""Default payload converter."""
@abstractmethod
def to_payloads(
self, values: Sequence[Any]
) -> List[temporalio.api.common.v1.Payload]:
"""Encode values into payloads.
Implementers are expected to just return the payload for
temporalio.common.RawValue.
Args:
values: Values to be converted.
Returns:
Converted payloads. Note, this does not have to be the same number
as values given, but must be at least one and cannot be more than
was given.
Raises:
Exception: Any issue during conversion.
"""
@abstractmethod
def from_payloads(
self,
payloads: Sequence[temporalio.api.common.v1.Payload],
type_hints: Optional[List[Type]] = None,
) -> List[Any]:
"""Decode payloads into values.
Implementers are expected to treat a type hint of
temporalio.common.RawValue as just the raw value.
Args:
payloads: Payloads to convert to Python values.
type_hints: Types that are expected if any. This may not have any
types if there are no annotations on the target. If this is
present, it must have the exact same length as payloads even if
the values are just "object".
Returns:
Collection of Python values. Note, this does not have to be the same
number as values given, but at least one must be present.
Raises:
Exception: Any issue during conversion.
"""
def to_payloads_wrapper(
self, values: Sequence[Any]
) -> temporalio.api.common.v1.Payloads:
"""to_payloads() for the temporalio.api.common.v1.Payloads wrapper."""
def from_payloads_wrapper(
self, payloads: Optional[temporalio.api.common.v1.Payloads]
) -> List[Any]:
"""from_payloads() for the temporalio.api.common.v1.Payloads wrapper."""
def to_payload(self, value: Any) -> temporalio.api.common.v1.Payload:
"""Convert a single value to a payload.
This is a shortcut for to_payloads() with a single-item list
and result.
Args:
value: Value to convert to a single payload.
Returns:
Single converted payload.
"""
def from_payload(
self,
payload: temporalio.api.common.v1.Payload,
type_hint: Optional[Type] = None,
) -> Any:
"""Convert a single payload to a value.
This is a shortcut for from_payloads() with a single-item list
and result.
Args:
payload: Payload to convert to value.
type_hint: Optional type hint to say which type to convert to.
Returns:
Single converted value.
"""

EncodingPayloadConverter is the base class for encoding-specific converters used with CompositePayloadConverter:
class EncodingPayloadConverter(ABC):
"""Base converter to/from single payload/value with a known encoding for use in CompositePayloadConverter."""
@property
@abstractmethod
def encoding(self) -> str:
"""Encoding for the payload this converter works with."""
@abstractmethod
def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
"""Encode a single value to a payload or None.
Args:
value: Value to be converted.
Returns:
Payload of the value or None if unable to convert.
Raises:
TypeError: Value is not the expected type.
ValueError: Value is of the expected type but otherwise incorrect.
RuntimeError: General error during encoding.
"""
@abstractmethod
def from_payload(
self,
payload: temporalio.api.common.v1.Payload,
type_hint: Optional[Type] = None,
) -> Any:
"""Decode a single payload to a Python value or raise exception.
Args:
payload: Payload to convert to Python value.
type_hint: Type that is expected if any. This may not have a type if
there are no annotations on the target.
Returns:
The decoded value from the payload. Since the encoding is checked by
the caller, this should raise an exception if the payload cannot be
converted.
Raises:
RuntimeError: General error during decoding.
"""

CompositePayloadConverter combines multiple encoding payload converters:
class CompositePayloadConverter(PayloadConverter):
"""Composite payload converter that delegates to a list of encoding payload converters.
Encoding/decoding are attempted on each payload converter successively until
it succeeds.
Attributes:
converters: Mapping of encoding bytes to payload converters.
"""
converters: Mapping[bytes, EncodingPayloadConverter]
def __init__(self, *converters: EncodingPayloadConverter) -> None:
"""Initializes the data converter.
Args:
converters: Payload converters to delegate to, in order.
"""

DefaultPayloadConverter is the default implementation, compatible with other Temporal SDKs:
class DefaultPayloadConverter(CompositePayloadConverter):
"""Default payload converter compatible with other Temporal SDKs.
This handles None, bytes, all protobuf message types, and any type that
json.dump accepts. A singleton instance of this is available at
PayloadConverter.default.
"""
default_encoding_payload_converters: Tuple[EncodingPayloadConverter, ...]
"""Default set of encoding payload converters the default payload converter
uses.
"""
def __init__(self) -> None:
"""Create a default payload converter."""

BinaryNullPayloadConverter handles None values:
class BinaryNullPayloadConverter(EncodingPayloadConverter):
"""Converter for 'binary/null' payloads supporting None values."""
@property
def encoding(self) -> str:
"""Returns 'binary/null'."""
def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
"""Convert None values to payload."""
def from_payload(
self,
payload: temporalio.api.common.v1.Payload,
type_hint: Optional[Type] = None,
) -> Any:
"""Convert payload back to None."""

BinaryPlainPayloadConverter handles bytes values:
class BinaryPlainPayloadConverter(EncodingPayloadConverter):
"""Converter for 'binary/plain' payloads supporting bytes values."""
@property
def encoding(self) -> str:
"""Returns 'binary/plain'."""
def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
"""Convert bytes values to payload."""
def from_payload(
self,
payload: temporalio.api.common.v1.Payload,
type_hint: Optional[Type] = None,
) -> Any:
"""Convert payload back to bytes."""

JSONProtoPayloadConverter handles Protocol Buffer messages using JSON encoding:
class JSONProtoPayloadConverter(EncodingPayloadConverter):
"""Converter for 'json/protobuf' payloads supporting protobuf Message values."""
def __init__(self, ignore_unknown_fields: bool = False):
"""Initialize a JSON proto converter.
Args:
ignore_unknown_fields: Determines whether converter should error if
unknown fields are detected
"""
@property
def encoding(self) -> str:
"""Returns 'json/protobuf'."""
def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
"""Convert protobuf Message values to JSON payload."""
def from_payload(
self,
payload: temporalio.api.common.v1.Payload,
type_hint: Optional[Type] = None,
) -> Any:
"""Convert JSON payload back to protobuf Message."""

BinaryProtoPayloadConverter handles Protocol Buffer messages using binary encoding:
class BinaryProtoPayloadConverter(EncodingPayloadConverter):
"""Converter for 'binary/protobuf' payloads supporting protobuf Message values."""
@property
def encoding(self) -> str:
"""Returns 'binary/protobuf'."""
def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
"""Convert protobuf Message values to binary payload."""
def from_payload(
self,
payload: temporalio.api.common.v1.Payload,
type_hint: Optional[Type] = None,
) -> Any:
"""Convert binary payload back to protobuf Message."""

JSONPlainPayloadConverter is the primary converter for common Python values:
class JSONPlainPayloadConverter(EncodingPayloadConverter):
"""Converter for 'json/plain' payloads supporting common Python values.
For encoding, this supports all values that json.dump supports
and by default adds extra encoding support for dataclasses, classes with
dict() methods, and all iterables.
For decoding, this uses type hints to attempt to rebuild the type from the
type hint.
"""
def __init__(
self,
*,
encoder: Optional[Type[json.JSONEncoder]] = AdvancedJSONEncoder,
decoder: Optional[Type[json.JSONDecoder]] = None,
encoding: str = "json/plain",
custom_type_converters: Sequence[JSONTypeConverter] = [],
) -> None:
"""Initialize a JSON data converter.
Args:
encoder: Custom encoder class object to use.
decoder: Custom decoder class object to use.
encoding: Encoding name to use.
custom_type_converters: Set of custom type converters that are used
when converting from a payload to type-hinted values.
"""
@property
def encoding(self) -> str:
"""Get the encoding name."""
def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
"""Convert Python values to JSON payload."""
def from_payload(
self,
payload: temporalio.api.common.v1.Payload,
type_hint: Optional[Type] = None,
) -> Any:
"""Convert JSON payload back to Python values using type hints."""

AdvancedJSONEncoder is an enhanced JSON encoder with support for additional Python types:
class AdvancedJSONEncoder(json.JSONEncoder):
"""Advanced JSON encoder.
This encoder supports dataclasses and all iterables as lists.
It also uses Pydantic v1's "dict" methods if available on the object,
but this is deprecated. Pydantic users should upgrade to v2 and use
temporalio.contrib.pydantic.pydantic_data_converter.
"""
def default(self, o: Any) -> Any:
"""Override JSON encoding default.
Supports:
- datetime objects (converted to ISO format)
- dataclasses (converted using dataclasses.asdict)
- Objects with dict() method (deprecated Pydantic v1 support)
- Non-list iterables (converted to lists)
- UUID objects (converted to strings)
See json.JSONEncoder.default.
"""

JSONTypeConverter is the abstract base for custom type conversion during JSON decoding:
class JSONTypeConverter(ABC):
"""Converter for converting an object from Python json.loads
result (e.g. scalar, list, or dict) to a known type.
"""
Unhandled = _JSONTypeConverterUnhandled(object())
"""Sentinel value that must be used as the result of
to_typed_value to say the given type is not handled by this
converter."""
@abstractmethod
def to_typed_value(
self, hint: Type, value: Any
) -> Union[Optional[Any], _JSONTypeConverterUnhandled]:
"""Convert the given value to a type based on the given hint.
Args:
hint: Type hint to use to help in converting the value.
value: Value as returned by json.loads. Usually a scalar,
list, or dict.
Returns:
The converted value or Unhandled if this converter does
not handle this situation.
"""

def value_to_type(
hint: Type,
value: Any,
custom_converters: Sequence[JSONTypeConverter] = [],
) -> Any:
"""Convert a given value to the given type hint.
This is used internally to convert a raw JSON loaded value to a specific
type hint.
Args:
hint: Type hint to convert the value to.
value: Raw value (e.g. primitive, dict, or list) to convert from.
custom_converters: Set of custom converters to try before doing default
conversion. Converters are tried in order and the first value that
is not JSONTypeConverter.Unhandled will be returned from
this function instead of doing default behavior.
Returns:
Converted value.
Raises:
TypeError: Unable to convert to the given hint.
"""

PayloadCodec is the abstract base for encoding/decoding payloads (e.g., compression or encryption):
class PayloadCodec(ABC):
"""Codec for encoding/decoding to/from bytes.
Commonly used for compression or encryption.
"""
@abstractmethod
async def encode(
self, payloads: Sequence[temporalio.api.common.v1.Payload]
) -> List[temporalio.api.common.v1.Payload]:
"""Encode the given payloads.
Args:
payloads: Payloads to encode. This value should not be mutated.
Returns:
Encoded payloads. Note, this does not have to be the same number as
payloads given, but must be at least one and cannot be more than was
given.
"""
@abstractmethod
async def decode(
self, payloads: Sequence[temporalio.api.common.v1.Payload]
) -> List[temporalio.api.common.v1.Payload]:
"""Decode the given payloads.
Args:
payloads: Payloads to decode. This value should not be mutated.
Returns:
Decoded payloads. Note, this does not have to be the same number as
payloads given, but must be at least one and cannot be more than was
given.
"""
async def encode_wrapper(self, payloads: temporalio.api.common.v1.Payloads) -> None:
"""encode() for the temporalio.api.common.v1.Payloads wrapper.
This replaces the payloads within the wrapper.
"""
async def decode_wrapper(self, payloads: temporalio.api.common.v1.Payloads) -> None:
"""decode() for the temporalio.api.common.v1.Payloads wrapper.
This replaces the payloads within.
"""
async def encode_failure(self, failure: temporalio.api.failure.v1.Failure) -> None:
"""Encode payloads of a failure."""
async def decode_failure(self, failure: temporalio.api.failure.v1.Failure) -> None:
"""Decode payloads of a failure."""

FailureConverter is the abstract base for converting between Python exceptions and Temporal failures:
class FailureConverter(ABC):
"""Base failure converter to/from errors.
Note, for workflow exceptions, to_failure is only invoked if the
exception is an instance of temporalio.exceptions.FailureError.
Users should extend temporalio.exceptions.ApplicationError if
they want a custom workflow exception to work with this class.
"""
default: ClassVar[FailureConverter]
"""Default failure converter."""
@abstractmethod
def to_failure(
self,
exception: BaseException,
payload_converter: PayloadConverter,
failure: temporalio.api.failure.v1.Failure,
) -> None:
"""Convert the given exception to a Temporal failure.
Users should make sure not to alter the exception input.
Args:
exception: The exception to convert.
payload_converter: The payload converter to use if needed.
failure: The failure to update with error information.
"""
@abstractmethod
def from_failure(
self,
failure: temporalio.api.failure.v1.Failure,
payload_converter: PayloadConverter,
) -> BaseException:
"""Convert the given Temporal failure to an exception.
Users should make sure not to alter the failure input.
Args:
failure: The failure to convert.
payload_converter: The payload converter to use if needed.
Returns:
Converted error.
"""

DefaultFailureConverter is the standard implementation for failure conversion:
class DefaultFailureConverter(FailureConverter):
"""Default failure converter.
A singleton instance of this is available at
FailureConverter.default.
"""
def __init__(self, *, encode_common_attributes: bool = False) -> None:
"""Create the default failure converter.
Args:
encode_common_attributes: If True, the message and stack trace
of the failure will be moved into the encoded attribute section
of the failure which can be encoded with a codec.
"""

DefaultFailureConverterWithEncodedAttributes is an enhanced failure converter that encodes attributes for codec processing:
class DefaultFailureConverterWithEncodedAttributes(DefaultFailureConverter):
"""Implementation of DefaultFailureConverter which moves message
and stack trace to encoded attributes subject to a codec.
"""
def __init__(self) -> None:
"""Create a default failure converter with encoded attributes."""

from typing import Union, Sequence, Mapping, List
from datetime import datetime
SearchAttributeValue = Union[str, int, float, bool, datetime, Sequence[str]]
SearchAttributeValues = Union[
List[str], List[int], List[float], List[bool], List[datetime]
]
SearchAttributes = Mapping[str, SearchAttributeValues]

from enum import IntEnum
class SearchAttributeIndexedValueType(IntEnum):
"""Server index type of a search attribute."""
TEXT = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_TEXT)
KEYWORD = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_KEYWORD)
INT = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_INT)
DOUBLE = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_DOUBLE)
BOOL = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_BOOL)
DATETIME = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_DATETIME)
KEYWORD_LIST = int(
temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_KEYWORD_LIST
)

SearchAttributeKey is the type-safe search attribute key:
from abc import ABC, abstractmethod
from typing import Generic, TypeVar
SearchAttributeValueType = TypeVar(
"SearchAttributeValueType", str, int, float, bool, datetime, Sequence[str]
)
class SearchAttributeKey(ABC, Generic[SearchAttributeValueType]):
"""Typed search attribute key representation.
Use one of the `for_*` static methods here to create a key.
"""
@property
@abstractmethod
def name(self) -> str:
"""Get the name of the key."""
@property
@abstractmethod
def indexed_value_type(self) -> SearchAttributeIndexedValueType:
"""Get the server index type of the key."""
@property
@abstractmethod
def value_type(self) -> Type[SearchAttributeValueType]:
"""Get the Python type of value for the key.
This may contain generics which cannot be used in isinstance.
origin_value_type can be used instead.
"""
@property
def origin_value_type(self) -> Type:
"""Get the Python type of value for the key without generics."""
def value_set(
self, value: SearchAttributeValueType
) -> SearchAttributeUpdate[SearchAttributeValueType]:
"""Create a search attribute update to set the given value on this key."""
def value_unset(self) -> SearchAttributeUpdate[SearchAttributeValueType]:
"""Create a search attribute update to unset the value on this key."""
@staticmethod
def for_text(name: str) -> SearchAttributeKey[str]:
"""Create a 'Text' search attribute type."""
@staticmethod
def for_keyword(name: str) -> SearchAttributeKey[str]:
"""Create a 'Keyword' search attribute type."""
@staticmethod
def for_int(name: str) -> SearchAttributeKey[int]:
"""Create an 'Int' search attribute type."""
@staticmethod
def for_float(name: str) -> SearchAttributeKey[float]:
"""Create a 'Double' search attribute type."""
@staticmethod
def for_bool(name: str) -> SearchAttributeKey[bool]:
"""Create a 'Bool' search attribute type."""
@staticmethod
def for_datetime(name: str) -> SearchAttributeKey[datetime]:
"""Create a 'Datetime' search attribute type."""
@staticmethod
def for_keyword_list(name: str) -> SearchAttributeKey[Sequence[str]]:
"""Create a 'KeywordList' search attribute type."""

from typing_extensions import NamedTuple
class SearchAttributePair(NamedTuple, Generic[SearchAttributeValueType]):
"""A named tuple representing a key/value search attribute pair."""
key: SearchAttributeKey[SearchAttributeValueType]
value: SearchAttributeValueType
class SearchAttributeUpdate(ABC, Generic[SearchAttributeValueType]):
"""Representation of a search attribute update."""
@property
@abstractmethod
def key(self) -> SearchAttributeKey[SearchAttributeValueType]:
"""Key that is being set."""
@property
@abstractmethod
def value(self) -> Optional[SearchAttributeValueType]:
"""Value that is being set or None if being unset."""

from typing import Collection
class TypedSearchAttributes(Collection[SearchAttributePair]):
"""Collection of typed search attributes.
This is represented as an immutable collection of
SearchAttributePair. This can be created passing a sequence of
pairs to the constructor.
"""
search_attributes: Sequence[SearchAttributePair]
"""Underlying sequence of search attribute pairs. Do not mutate this, only
create new TypedSearchAttributes instances.
These are sorted by key name during construction. Duplicates cannot exist.
"""
empty: ClassVar[TypedSearchAttributes]
"""Class variable representing an empty set of attributes."""
def __len__(self) -> int:
"""Get the number of search attributes."""
def __getitem__(
self, key: SearchAttributeKey[SearchAttributeValueType]
) -> SearchAttributeValueType:
"""Get a single search attribute value by key or fail with KeyError."""
def __iter__(self) -> Iterator[SearchAttributePair]:
"""Get an iterator over search attribute key/value pairs."""
def __contains__(self, key: object) -> bool:
"""Check whether this search attribute contains the given key.
This uses key equality so the key must be the same name and type.
"""
def get(
self,
key: SearchAttributeKey[SearchAttributeValueType],
default: Optional[Any] = None,
) -> Any:
"""Get an attribute value for a key (or default). This is similar to dict.get."""
def updated(self, *search_attributes: SearchAttributePair) -> TypedSearchAttributes:
"""Copy this collection, replacing attributes with matching key names or
adding if key name not present.
"""

def encode_search_attributes(
attributes: Union[
temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes
],
api: temporalio.api.common.v1.SearchAttributes,
) -> None:
"""Convert search attributes into an API message.
Args:
attributes: Search attributes to convert. The dictionary form of this is
DEPRECATED.
api: API message to set converted attributes on.
"""
def encode_typed_search_attribute_value(
key: temporalio.common.SearchAttributeKey[
temporalio.common.SearchAttributeValueType
],
value: Optional[temporalio.common.SearchAttributeValue],
) -> temporalio.api.common.v1.Payload:
"""Convert typed search attribute value into a payload.
Args:
key: Key for the value.
value: Value to convert.
Returns:
Payload for the value.
"""
def encode_search_attribute_values(
vals: temporalio.common.SearchAttributeValues,
) -> temporalio.api.common.v1.Payload:
"""Convert search attribute values into a payload.
.. deprecated::
Use typed search attributes instead.
Args:
vals: List of values to convert.
"""
def decode_search_attributes(
api: temporalio.api.common.v1.SearchAttributes,
) -> temporalio.common.SearchAttributes:
"""Decode API search attributes to values.
.. deprecated::
Use typed search attributes instead.
Args:
api: API message with search attribute values to convert.
Returns:
Converted search attribute values (new mapping every time).
"""
def decode_typed_search_attributes(
api: temporalio.api.common.v1.SearchAttributes,
) -> temporalio.common.TypedSearchAttributes:
"""Decode API search attributes to typed search attributes.
Args:
api: API message with search attribute values to convert.
Returns:
Typed search attribute collection (new object every time).
"""

from dataclasses import dataclass
from datetime import timedelta
from typing import Optional, Sequence
@dataclass
class RetryPolicy:
"""Options for retrying workflows and activities."""
initial_interval: timedelta = timedelta(seconds=1)
"""Backoff interval for the first retry. Default 1s."""
backoff_coefficient: float = 2.0
"""Coefficient to multiply previous backoff interval by to get new
interval. Default 2.0.
"""
maximum_interval: Optional[timedelta] = None
"""Maximum backoff interval between retries. Default 100x
initial_interval.
"""
maximum_attempts: int = 0
"""Maximum number of attempts.
If 0, the default, there is no maximum.
"""
non_retryable_error_types: Optional[Sequence[str]] = None
"""List of error types that are not retryable."""
@staticmethod
def from_proto(proto: temporalio.api.common.v1.RetryPolicy) -> RetryPolicy:
"""Create a retry policy from the proto object."""
def apply_to_proto(self, proto: temporalio.api.common.v1.RetryPolicy) -> None:
"""Apply the fields in this policy to the given proto object."""

class Priority:
"""Priority configuration for workflows and activities."""

@dataclass(frozen=True)
class RawValue:
"""Representation of an unconverted, raw payload.
This type can be used as a parameter or return type in workflows,
activities, signals, and queries to pass through a raw payload.
Encoding/decoding of the payload is still done by the system.
"""
payload: temporalio.api.common.v1.Payload

import json
from typing import Any, Optional, Type, List
from temporalio.converter import EncodingPayloadConverter
import temporalio.api.common.v1
class CustomJSONConverter(EncodingPayloadConverter):
    """Example encoding converter that writes compact JSON as "json/custom"."""

    @property
    def encoding(self) -> str:
        """Encoding name recorded in each payload's metadata."""
        return "json/custom"

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        """Serialize a plain JSON-compatible value, or decline it.

        Args:
            value: Value to convert.

        Returns:
            A payload carrying the compact JSON text when ``value`` is None or
            a dict, list, str, int, float, or bool; otherwise None to signal
            that this converter does not handle the value.
        """
        handled = value is None or isinstance(
            value, (dict, list, str, int, float, bool)
        )
        if not handled:
            return None
        # Compact separators drop the insignificant whitespace json.dumps adds.
        serialized = json.dumps(value, separators=(',', ':'))
        return temporalio.api.common.v1.Payload(
            metadata={"encoding": self.encoding.encode()},
            data=serialized.encode(),
        )

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Parse the payload's JSON data; ``type_hint`` is not consulted."""
        json_text = payload.data.decode()
        return json.loads(json_text)


from temporalio.converter import DataConverter, DefaultPayloadConverter, PayloadCodec
import temporalio.api.common.v1
from typing import List, Sequence


class EncryptionCodec(PayloadCodec):
    """Simple encryption codec example.

    Both methods are pass-through placeholders; replace their bodies with real
    encryption and decryption. The signatures accept any ``Sequence`` of
    payloads, matching the abstract ``PayloadCodec`` methods, and always return
    a new list so the caller's input is never handed back for mutation.
    """

    async def encode(
        self, payloads: Sequence[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        """Return the payloads to store; the input must not be mutated."""
        # Implement encryption logic here
        return list(payloads)

    async def decode(
        self, payloads: Sequence[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        """Return the payloads to convert; the input must not be mutated."""
        # Implement decryption logic here
        return list(payloads)


# Create data converter with encryption
data_converter = DataConverter(
    payload_converter_class=DefaultPayloadConverter,
    payload_codec=EncryptionCodec(),
)

from temporalio.converter import JSONTypeConverter, value_to_type
from typing import Type, Any, Union
from datetime import date
class DateTypeConverter(JSONTypeConverter):
    """Convert ISO date strings to date objects."""

    def to_typed_value(self, hint: Type, value: Any) -> Union[Any, object]:
        """Build a ``date`` from an ISO string when the hint is exactly ``date``.

        Args:
            hint: Type hint requested for the value.
            value: Value as loaded from JSON.

        Returns:
            The parsed ``date``, or ``JSONTypeConverter.Unhandled`` when the
            hint is not ``date``, the value is not a string, or the string is
            not a valid ISO date.
        """
        if hint is not date or not isinstance(value, str):
            return JSONTypeConverter.Unhandled
        try:
            parsed = date.fromisoformat(value)
        except ValueError:
            # Malformed date text: report it as unhandled rather than failing.
            return JSONTypeConverter.Unhandled
        return parsed


# Use with JSONPlainPayloadConverter
from temporalio.converter import JSONPlainPayloadConverter

converter = JSONPlainPayloadConverter(
    custom_type_converters=[DateTypeConverter()]
)

from temporalio.common import SearchAttributeKey, TypedSearchAttributes, SearchAttributePair
from datetime import datetime
from datetime import timezone

# Define typed search attribute keys
customer_id_key = SearchAttributeKey.for_keyword("CustomerId")
order_amount_key = SearchAttributeKey.for_float("OrderAmount")
created_time_key = SearchAttributeKey.for_datetime("CreatedTime")

# Create search attributes
search_attributes = TypedSearchAttributes([
    SearchAttributePair(customer_id_key, "customer-123"),
    SearchAttributePair(order_amount_key, 99.99),
    # Use a timezone-aware timestamp: a naive datetime.now() is ambiguous once
    # serialized and may be rejected when the attribute is encoded.
    SearchAttributePair(created_time_key, datetime.now(timezone.utc)),
])

# Update search attributes: returns a new collection with OrderAmount replaced
updated_attributes = search_attributes.updated(
    SearchAttributePair(order_amount_key, 149.99)
)

from temporalio.converter import FailureConverter, PayloadConverter
from temporalio.exceptions import ApplicationError
import temporalio.api.failure.v1
class CustomFailureConverter(FailureConverter):
"""Custom failure converter with special error handling."""
def to_failure(
self,
exception: BaseException,
payload_converter: PayloadConverter,
failure: temporalio.api.failure.v1.Failure,
) -> None:
# Custom logic for converting exceptions to failures
if isinstance(exception, MyCustomError):
# Handle custom error type
app_error = ApplicationError(
str(exception),
type="MyCustomError",
non_retryable=True
)
app_error.__traceback__ = exception.__traceback__
app_error.__cause__ = exception.__cause__
# Delegate to default handling for ApplicationError
super().to_failure(app_error, payload_converter, failure)
else:
# Use default handling for other errors
super().to_failure(exception, payload_converter, failure)
def from_failure(
self,
failure: temporalio.api.failure.v1.Failure,
payload_converter: PayloadConverter,
) -> BaseException:
# Custom logic for converting failures to exceptions
if (failure.HasField("application_failure_info") and
failure.application_failure_info.type == "MyCustomError"):
return MyCustomError(failure.message)
# Use default handling for other failures
return super().from_failure(failure, payload_converter)
class MyCustomError(Exception):
"""Custom exception class."""
passThe data conversion system integrates seamlessly with workflow and activity execution:
import temporalio
from temporalio import workflow, activity
from temporalio.converter import DataConverter
from dataclasses import dataclass
@dataclass
class OrderRequest:
customer_id: str
items: List[str]
total: float
@activity.defn
async def process_order(request: OrderRequest) -> str:
# The request parameter is automatically deserialized
# using the configured data converter
return f"Processed order for {request.customer_id}"
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, request: OrderRequest) -> str:
# Both workflow parameters and activity parameters/returns
# are handled by the data converter
result = await workflow.execute_activity(
process_order,
request,
start_to_close_timeout=timedelta(seconds=30)
)
return resultThe data conversion system provides a flexible and extensible foundation for handling all serialization needs in Temporal applications, from simple JSON encoding to complex custom formats with encryption and compression support.
Install with Tessl CLI
npx tessl i tessl/pypi-temporalio