
tessl/pypi-temporalio

Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.


Data Conversion

The temporalio SDK includes a data conversion system that serializes and deserializes Python values used as workflow and activity parameters, return values, and other data that passes through the Temporal platform. The system is modular and extensible: you can plug in custom serialization strategies while staying compatible with other Temporal SDKs.

Core Framework

DataConverter Class

The DataConverter class is the main orchestrator for data conversion, combining payload conversion with optional encoding/encryption:

@dataclass(frozen=True)
class DataConverter:
    """Data converter for converting and encoding payloads to/from Python values.

    This combines PayloadConverter which converts values with
    PayloadCodec which encodes bytes.
    """

    payload_converter_class: Type[PayloadConverter] = DefaultPayloadConverter
    """Class to instantiate for payload conversion."""

    payload_codec: Optional[PayloadCodec] = None
    """Optional codec for encoding payload bytes."""

    failure_converter_class: Type[FailureConverter] = DefaultFailureConverter
    """Class to instantiate for failure conversion."""

    payload_converter: PayloadConverter = dataclasses.field(init=False)
    """Payload converter created from the payload_converter_class."""

    failure_converter: FailureConverter = dataclasses.field(init=False)
    """Failure converter created from the failure_converter_class."""

    default: ClassVar[DataConverter]
    """Singleton default data converter."""

    async def encode(
        self, values: Sequence[Any]
    ) -> List[temporalio.api.common.v1.Payload]:
        """Encode values into payloads.

        First converts values to payloads then encodes payloads using codec.

        Args:
            values: Values to be converted and encoded.

        Returns:
            Converted and encoded payloads. Note, this does not have to be the
            same number as values given, but must be at least one and cannot be
            more than was given.
        """

    async def decode(
        self,
        payloads: Sequence[temporalio.api.common.v1.Payload],
        type_hints: Optional[List[Type]] = None,
    ) -> List[Any]:
        """Decode payloads into values.

        First decodes payloads using codec then converts payloads to values.

        Args:
            payloads: Payloads to be decoded and converted.
            type_hints: Optional type hints to guide conversion.

        Returns:
            Decoded and converted values.
        """

    async def encode_wrapper(
        self, values: Sequence[Any]
    ) -> temporalio.api.common.v1.Payloads:
        """encode() for the temporalio.api.common.v1.Payloads wrapper."""

    async def decode_wrapper(
        self,
        payloads: Optional[temporalio.api.common.v1.Payloads],
        type_hints: Optional[List[Type]] = None,
    ) -> List[Any]:
        """decode() for the temporalio.api.common.v1.Payloads wrapper."""

    async def encode_failure(
        self, exception: BaseException, failure: temporalio.api.failure.v1.Failure
    ) -> None:
        """Convert and encode failure."""

    async def decode_failure(
        self, failure: temporalio.api.failure.v1.Failure
    ) -> BaseException:
        """Decode and convert failure."""
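
The ordering described in the encode() and decode() docstrings can be sketched with the standard library alone. This is an illustrative model, not the SDK's implementation: the dict-based `Payload`, the `Base64Codec` class, and the `binary/base64` encoding name are hypothetical stand-ins chosen for this example.

```python
import asyncio
import base64
import json
from typing import Any, Dict, List, Optional, Sequence

# Hypothetical stand-in for temporalio.api.common.v1.Payload.
Payload = Dict[str, Any]


def to_payloads(values: Sequence[Any]) -> List[Payload]:
    """Conversion stage: Python values to payloads (JSON only in this sketch)."""
    return [
        {"metadata": {"encoding": b"json/plain"}, "data": json.dumps(v).encode()}
        for v in values
    ]


def from_payloads(payloads: Sequence[Payload]) -> List[Any]:
    """Conversion stage in reverse: payloads back to Python values."""
    return [json.loads(p["data"]) for p in payloads]


class Base64Codec:
    """Codec stage: rewrites payload bytes without knowing the Python types."""

    async def encode(self, payloads: Sequence[Payload]) -> List[Payload]:
        return [
            {
                "metadata": {"encoding": b"binary/base64"},
                "data": base64.b64encode(p["data"]),
            }
            for p in payloads
        ]

    async def decode(self, payloads: Sequence[Payload]) -> List[Payload]:
        return [
            {
                "metadata": {"encoding": b"json/plain"},
                "data": base64.b64decode(p["data"]),
            }
            for p in payloads
        ]


async def encode(
    values: Sequence[Any], codec: Optional[Base64Codec] = None
) -> List[Payload]:
    payloads = to_payloads(values)  # 1. convert values to payloads
    if codec is not None:
        payloads = await codec.encode(payloads)  # 2. encode payload bytes
    return payloads


async def decode(
    payloads: Sequence[Payload], codec: Optional[Base64Codec] = None
) -> List[Any]:
    if codec is not None:
        payloads = await codec.decode(payloads)  # 1. decode payload bytes
    return from_payloads(payloads)  # 2. convert payloads to values


async def round_trip(values: Sequence[Any]):
    codec = Base64Codec()
    encoded = await encode(values, codec)
    return encoded, await decode(encoded, codec)
```

Note that decoding runs the two stages in the opposite order from encoding, which is why the codec never needs to understand Python types.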

Default Function

def default() -> DataConverter:
    """Default data converter.

    .. deprecated::
        Use DataConverter.default instead.
    """

Payload Conversion

PayloadConverter Base Class

The PayloadConverter is the abstract base class for converting Python values to/from Temporal payloads:

class PayloadConverter(ABC):
    """Base payload converter to/from multiple payloads/values."""

    default: ClassVar[PayloadConverter]
    """Default payload converter."""

    @abstractmethod
    def to_payloads(
        self, values: Sequence[Any]
    ) -> List[temporalio.api.common.v1.Payload]:
        """Encode values into payloads.

        Implementers are expected to just return the payload for
        temporalio.common.RawValue.

        Args:
            values: Values to be converted.

        Returns:
            Converted payloads. Note, this does not have to be the same number
            as values given, but must be at least one and cannot be more than
            was given.

        Raises:
            Exception: Any issue during conversion.
        """

    @abstractmethod
    def from_payloads(
        self,
        payloads: Sequence[temporalio.api.common.v1.Payload],
        type_hints: Optional[List[Type]] = None,
    ) -> List[Any]:
        """Decode payloads into values.

        Implementers are expected to treat a type hint of
        temporalio.common.RawValue as just the raw value.

        Args:
            payloads: Payloads to convert to Python values.
            type_hints: Types that are expected if any. This may not have any
                types if there are no annotations on the target. If this is
                present, it must have the exact same length as payloads even if
                the values are just "object".

        Returns:
            Collection of Python values. Note, this does not have to be the same
            number as payloads given, but at least one must be present.

        Raises:
            Exception: Any issue during conversion.
        """

    def to_payloads_wrapper(
        self, values: Sequence[Any]
    ) -> temporalio.api.common.v1.Payloads:
        """to_payloads() for the temporalio.api.common.v1.Payloads wrapper."""

    def from_payloads_wrapper(
        self, payloads: Optional[temporalio.api.common.v1.Payloads]
    ) -> List[Any]:
        """from_payloads() for the temporalio.api.common.v1.Payloads wrapper."""

    def to_payload(self, value: Any) -> temporalio.api.common.v1.Payload:
        """Convert a single value to a payload.

        This is a shortcut for to_payloads() with a single-item list
        and result.

        Args:
            value: Value to convert to a single payload.

        Returns:
            Single converted payload.
        """

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Convert a single payload to a value.

        This is a shortcut for from_payloads() with a single-item list
        and result.

        Args:
            payload: Payload to convert to value.
            type_hint: Optional type hint to say which type to convert to.

        Returns:
            Single converted value.
        """

EncodingPayloadConverter

EncodingPayloadConverter is the base class for single-encoding converters that CompositePayloadConverter delegates to:

class EncodingPayloadConverter(ABC):
    """Base converter to/from single payload/value with a known encoding for use in CompositePayloadConverter."""

    @property
    @abstractmethod
    def encoding(self) -> str:
        """Encoding for the payload this converter works with."""

    @abstractmethod
    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        """Encode a single value to a payload or None.

        Args:
            value: Value to be converted.

        Returns:
            Payload of the value or None if unable to convert.

        Raises:
            TypeError: Value is not the expected type.
            ValueError: Value is of the expected type but otherwise incorrect.
            RuntimeError: General error during encoding.
        """

    @abstractmethod
    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Decode a single payload to a Python value or raise exception.

        Args:
            payload: Payload to convert to Python value.
            type_hint: Type that is expected if any. This may not have a type if
                there are no annotations on the target.

        Returns:
            The decoded value from the payload. Since the encoding is checked by
            the caller, this should raise an exception if the payload cannot be
            converted.

        Raises:
            RuntimeError: General error during decoding.
        """

CompositePayloadConverter

Combines multiple encoding payload converters:

class CompositePayloadConverter(PayloadConverter):
    """Composite payload converter that delegates to a list of encoding payload converters.

    Encoding/decoding are attempted on each payload converter successively until
    it succeeds.

    Attributes:
        converters: Mapping of encoding bytes to payload converters.
    """

    converters: Mapping[bytes, EncodingPayloadConverter]

    def __init__(self, *converters: EncodingPayloadConverter) -> None:
        """Initializes the data converter.

        Args:
            converters: Payload converters to delegate to, in order.
        """
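
The delegation described in the docstring above can be modeled as follows. This is a simplified sketch under stated assumptions, not the SDK's code: payloads are plain dicts, and `NullConverter`, `BytesConverter`, `JSONConverter`, and `CompositeSketch` are hypothetical names. Encoding tries each converter in order until one returns a payload; decoding looks the converter up by the payload's encoding metadata.

```python
import json
from typing import Any, Dict, List, Optional, Sequence

# Hypothetical stand-in for temporalio.api.common.v1.Payload.
Payload = Dict[str, Any]


class NullConverter:
    encoding = "binary/null"

    def to_payload(self, value: Any) -> Optional[Payload]:
        if value is None:
            return {"metadata": {"encoding": b"binary/null"}, "data": b""}
        return None  # not handled, let the next converter try

    def from_payload(self, payload: Payload) -> Any:
        return None


class BytesConverter:
    encoding = "binary/plain"

    def to_payload(self, value: Any) -> Optional[Payload]:
        if isinstance(value, bytes):
            return {"metadata": {"encoding": b"binary/plain"}, "data": value}
        return None

    def from_payload(self, payload: Payload) -> Any:
        return payload["data"]


class JSONConverter:
    encoding = "json/plain"

    def to_payload(self, value: Any) -> Optional[Payload]:
        data = json.dumps(value).encode()
        return {"metadata": {"encoding": b"json/plain"}, "data": data}

    def from_payload(self, payload: Payload) -> Any:
        return json.loads(payload["data"])


class CompositeSketch:
    def __init__(self, *converters: Any) -> None:
        # Keyed by encoding bytes; dicts preserve insertion order.
        self.converters = {c.encoding.encode(): c for c in converters}

    def to_payloads(self, values: Sequence[Any]) -> List[Payload]:
        payloads = []
        for index, value in enumerate(values):
            for converter in self.converters.values():
                payload = converter.to_payload(value)
                if payload is not None:
                    payloads.append(payload)
                    break
            else:
                raise RuntimeError(f"No converter for value at index {index}")
        return payloads

    def from_payloads(self, payloads: Sequence[Payload]) -> List[Any]:
        values = []
        for payload in payloads:
            encoding = payload["metadata"].get("encoding", b"<unknown>")
            converter = self.converters.get(encoding)
            if converter is None:
                raise KeyError(f"Unknown payload encoding {encoding!r}")
            values.append(converter.from_payload(payload))
        return values


composite = CompositeSketch(NullConverter(), BytesConverter(), JSONConverter())
payloads = composite.to_payloads([None, b"raw", {"a": 1}])
```

Order matters on the encoding side: a catch-all converter such as the JSON one should come last, otherwise it would claim values that a more specific converter is meant to handle.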

DefaultPayloadConverter

The default implementation compatible with other Temporal SDKs:

class DefaultPayloadConverter(CompositePayloadConverter):
    """Default payload converter compatible with other Temporal SDKs.

    This handles None, bytes, all protobuf message types, and any type that
    json.dump accepts. A singleton instance of this is available at
    PayloadConverter.default.
    """

    default_encoding_payload_converters: Tuple[EncodingPayloadConverter, ...]
    """Default set of encoding payload converters the default payload converter
    uses.
    """

    def __init__(self) -> None:
        """Create a default payload converter."""

Built-in Converters

Binary Converters

BinaryNullPayloadConverter

Handles None values:

class BinaryNullPayloadConverter(EncodingPayloadConverter):
    """Converter for 'binary/null' payloads supporting None values."""

    @property
    def encoding(self) -> str:
        """Returns 'binary/null'."""

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        """Convert None values to payload."""

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Convert payload back to None."""

BinaryPlainPayloadConverter

Handles bytes values:

class BinaryPlainPayloadConverter(EncodingPayloadConverter):
    """Converter for 'binary/plain' payloads supporting bytes values."""

    @property
    def encoding(self) -> str:
        """Returns 'binary/plain'."""

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        """Convert bytes values to payload."""

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Convert payload back to bytes."""

Protobuf Converters

JSONProtoPayloadConverter

Handles Protocol Buffer messages using JSON encoding:

class JSONProtoPayloadConverter(EncodingPayloadConverter):
    """Converter for 'json/protobuf' payloads supporting protobuf Message values."""

    def __init__(self, ignore_unknown_fields: bool = False):
        """Initialize a JSON proto converter.

        Args:
            ignore_unknown_fields: Determines whether converter should error if
                unknown fields are detected
        """

    @property
    def encoding(self) -> str:
        """Returns 'json/protobuf'."""

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        """Convert protobuf Message values to JSON payload."""

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Convert JSON payload back to protobuf Message."""

BinaryProtoPayloadConverter

Handles Protocol Buffer messages using binary encoding:

class BinaryProtoPayloadConverter(EncodingPayloadConverter):
    """Converter for 'binary/protobuf' payloads supporting protobuf Message values."""

    @property
    def encoding(self) -> str:
        """Returns 'binary/protobuf'."""

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        """Convert protobuf Message values to binary payload."""

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Convert binary payload back to protobuf Message."""

JSON Handling

JSONPlainPayloadConverter

Primary converter for common Python values:

class JSONPlainPayloadConverter(EncodingPayloadConverter):
    """Converter for 'json/plain' payloads supporting common Python values.

    For encoding, this supports all values that json.dump supports
    and by default adds extra encoding support for dataclasses, classes with
    dict() methods, and all iterables.

    For decoding, this uses type hints to attempt to rebuild the type from the
    type hint.
    """

    def __init__(
        self,
        *,
        encoder: Optional[Type[json.JSONEncoder]] = AdvancedJSONEncoder,
        decoder: Optional[Type[json.JSONDecoder]] = None,
        encoding: str = "json/plain",
        custom_type_converters: Sequence[JSONTypeConverter] = [],
    ) -> None:
        """Initialize a JSON data converter.

        Args:
            encoder: Custom encoder class object to use.
            decoder: Custom decoder class object to use.
            encoding: Encoding name to use.
            custom_type_converters: Set of custom type converters that are used
                when converting from a payload to type-hinted values.
        """

    @property
    def encoding(self) -> str:
        """Get the encoding name."""

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        """Convert Python values to JSON payload."""

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
        """Convert JSON payload back to Python values using type hints."""

AdvancedJSONEncoder

Enhanced JSON encoder with support for additional Python types:

class AdvancedJSONEncoder(json.JSONEncoder):
    """Advanced JSON encoder.

    This encoder supports dataclasses and all iterables as lists.

    It also uses Pydantic v1's "dict" methods if available on the object,
    but this is deprecated. Pydantic users should upgrade to v2 and use
    temporalio.contrib.pydantic.pydantic_data_converter.
    """

    def default(self, o: Any) -> Any:
        """Override JSON encoding default.

        Supports:
        - datetime objects (converted to ISO format)
        - dataclasses (converted using dataclasses.asdict)
        - Objects with dict() method (deprecated Pydantic v1 support)
        - Non-list iterables (converted to lists)
        - UUID objects (converted to strings)

        See json.JSONEncoder.default.
        """
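
To make the list of supported types concrete, here is a standalone encoder that follows the same rules using only the standard library. It is a sketch written from the docstring above, not a copy of the SDK class; `SketchJSONEncoder` and `Order` are hypothetical names.

```python
import collections.abc
import dataclasses
import json
import uuid
from datetime import datetime
from typing import Any


class SketchJSONEncoder(json.JSONEncoder):
    """Illustrative encoder mirroring the rules listed in the docstring."""

    def default(self, o: Any) -> Any:
        if isinstance(o, datetime):
            return o.isoformat()
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        # Objects exposing a callable dict() method (Pydantic v1 style).
        dict_fn = getattr(o, "dict", None)
        if callable(dict_fn):
            return dict_fn()
        # Non-list iterables such as sets and tuples subclasses become lists.
        if not isinstance(o, list) and isinstance(o, collections.abc.Iterable):
            return list(o)
        if isinstance(o, uuid.UUID):
            return str(o)
        return super().default(o)


@dataclasses.dataclass
class Order:
    id: uuid.UUID
    placed_at: datetime
    tags: frozenset


order = Order(
    id=uuid.UUID(int=1),
    placed_at=datetime(2024, 1, 2, 3, 4, 5),
    tags=frozenset({"rush"}),
)
encoded = json.dumps(order, cls=SketchJSONEncoder)
```

json.dumps only calls default() for objects it cannot serialize natively, so the dataclass is first turned into a dict and its UUID, datetime, and frozenset fields are then handled by later default() calls.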

JSONTypeConverter

Abstract base for custom type conversion during JSON decoding:

class JSONTypeConverter(ABC):
    """Converter for converting an object from Python json.loads
    result (e.g. scalar, list, or dict) to a known type.
    """

    Unhandled = _JSONTypeConverterUnhandled(object())
    """Sentinel value that must be used as the result of
    to_typed_value to say the given type is not handled by this
    converter."""

    @abstractmethod
    def to_typed_value(
        self, hint: Type, value: Any
    ) -> Union[Optional[Any], _JSONTypeConverterUnhandled]:
        """Convert the given value to a type based on the given hint.

        Args:
            hint: Type hint to use to help in converting the value.
            value: Value as returned by json.loads. Usually a scalar,
                list, or dict.

        Returns:
            The converted value or Unhandled if this converter does
            not handle this situation.
        """

Type Conversion Utilities

def value_to_type(
    hint: Type,
    value: Any,
    custom_converters: Sequence[JSONTypeConverter] = [],
) -> Any:
    """Convert a given value to the given type hint.

    This is used internally to convert a raw JSON loaded value to a specific
    type hint.

    Args:
        hint: Type hint to convert the value to.
        value: Raw value (e.g. primitive, dict, or list) to convert from.
        custom_converters: Set of custom converters to try before doing default
            conversion. Converters are tried in order and the first value that
            is not JSONTypeConverter.Unhandled will be returned from
            this function instead of doing default behavior.

    Returns:
        Converted value.

    Raises:
        TypeError: Unable to convert to the given hint.
    """
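
The "tried in order, first result that is not Unhandled wins" rule can be illustrated with a small stdlib-only model. The names `UNHANDLED`, `DateConverter`, and `convert` are hypothetical, and the fallback logic is greatly simplified compared with what a real type-hint converter has to do.

```python
from datetime import date
from typing import Any, Sequence

# Hypothetical stand-in for the JSONTypeConverter.Unhandled sentinel.
UNHANDLED = object()


class DateConverter:
    """Turns ISO date strings into date objects when the hint asks for a date."""

    def to_typed_value(self, hint: Any, value: Any) -> Any:
        if hint is date and isinstance(value, str):
            return date.fromisoformat(value)
        return UNHANDLED


def convert(hint: Any, value: Any, custom_converters: Sequence[Any] = ()) -> Any:
    # Custom converters run first, in the order given.
    for converter in custom_converters:
        result = converter.to_typed_value(hint, value)
        if result is not UNHANDLED:
            return result
    # Simplified default behavior: accept Any, or values already of the type.
    if hint is Any:
        return value
    if isinstance(hint, type) and isinstance(value, hint):
        return value
    raise TypeError(f"Cannot convert {type(value).__name__} to {hint}")


converted = convert(date, "2024-01-02", [DateConverter()])
```

A dedicated sentinel is used instead of None because None is itself a legitimate converted value, so it cannot double as the "not handled" signal.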

Payload Codecs

PayloadCodec Base Class

Abstract base for encoding/decoding payloads (e.g., compression or encryption):

class PayloadCodec(ABC):
    """Codec for encoding/decoding to/from bytes.

    Commonly used for compression or encryption.
    """

    @abstractmethod
    async def encode(
        self, payloads: Sequence[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        """Encode the given payloads.

        Args:
            payloads: Payloads to encode. This value should not be mutated.

        Returns:
            Encoded payloads. Note, this does not have to be the same number as
            payloads given, but must be at least one and cannot be more than was
            given.
        """

    @abstractmethod
    async def decode(
        self, payloads: Sequence[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        """Decode the given payloads.

        Args:
            payloads: Payloads to decode. This value should not be mutated.

        Returns:
            Decoded payloads. Note, this does not have to be the same number as
            payloads given, but must be at least one and cannot be more than was
            given.
        """

    async def encode_wrapper(self, payloads: temporalio.api.common.v1.Payloads) -> None:
        """encode() for the temporalio.api.common.v1.Payloads wrapper.

        This replaces the payloads within the wrapper.
        """

    async def decode_wrapper(self, payloads: temporalio.api.common.v1.Payloads) -> None:
        """decode() for the temporalio.api.common.v1.Payloads wrapper.

        This replaces the payloads within.
        """

    async def encode_failure(self, failure: temporalio.api.failure.v1.Failure) -> None:
        """Encode payloads of a failure."""

    async def decode_failure(self, failure: temporalio.api.failure.v1.Failure) -> None:
        """Decode payloads of a failure."""
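
As a concrete instance of the compression use case, the following sketch shows the shape of a codec's encode/decode pair with zlib. It does not import the SDK: the `Payload` dataclass stands in for temporalio.api.common.v1.Payload, and the `binary/zlib` encoding name and `original-encoding` metadata key are assumptions made for this example.

```python
import asyncio
import zlib
from dataclasses import dataclass, field
from typing import Dict, List, Sequence


@dataclass
class Payload:
    """Hypothetical stand-in for temporalio.api.common.v1.Payload."""

    metadata: Dict[str, bytes] = field(default_factory=dict)
    data: bytes = b""


class CompressionCodecSketch:
    ENCODING = b"binary/zlib"  # assumed marker, not an SDK constant

    async def encode(self, payloads: Sequence[Payload]) -> List[Payload]:
        # Build new payloads; the inputs are never mutated.
        return [
            Payload(
                metadata={
                    "encoding": self.ENCODING,
                    "original-encoding": p.metadata.get("encoding", b""),
                },
                data=zlib.compress(p.data),
            )
            for p in payloads
        ]

    async def decode(self, payloads: Sequence[Payload]) -> List[Payload]:
        decoded = []
        for p in payloads:
            if p.metadata.get("encoding") != self.ENCODING:
                decoded.append(p)  # not produced by this codec, pass through
                continue
            decoded.append(
                Payload(
                    metadata={"encoding": p.metadata["original-encoding"]},
                    data=zlib.decompress(p.data),
                )
            )
        return decoded


codec = CompressionCodecSketch()
original = [Payload({"encoding": b"json/plain"}, b'"hello"')]
encoded = asyncio.run(codec.encode(original))
decoded = asyncio.run(codec.decode(encoded))
```

Passing unrecognized payloads through unchanged on decode is a deliberate choice in this sketch: it lets data written before the codec was introduced remain readable.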

Failure Conversion

FailureConverter Base Class

Abstract base for converting between Python exceptions and Temporal failures:

class FailureConverter(ABC):
    """Base failure converter to/from errors.

    Note, for workflow exceptions, to_failure is only invoked if the
    exception is an instance of temporalio.exceptions.FailureError.
    Users should extend temporalio.exceptions.ApplicationError if
    they want a custom workflow exception to work with this class.
    """

    default: ClassVar[FailureConverter]
    """Default failure converter."""

    @abstractmethod
    def to_failure(
        self,
        exception: BaseException,
        payload_converter: PayloadConverter,
        failure: temporalio.api.failure.v1.Failure,
    ) -> None:
        """Convert the given exception to a Temporal failure.

        Users should make sure not to alter the exception input.

        Args:
            exception: The exception to convert.
            payload_converter: The payload converter to use if needed.
            failure: The failure to update with error information.
        """

    @abstractmethod
    def from_failure(
        self,
        failure: temporalio.api.failure.v1.Failure,
        payload_converter: PayloadConverter,
    ) -> BaseException:
        """Convert the given Temporal failure to an exception.

        Users should make sure not to alter the failure input.

        Args:
            failure: The failure to convert.
            payload_converter: The payload converter to use if needed.

        Returns:
            Converted error.
        """

DefaultFailureConverter

Standard implementation for failure conversion:

class DefaultFailureConverter(FailureConverter):
    """Default failure converter.

    A singleton instance of this is available at
    FailureConverter.default.
    """

    def __init__(self, *, encode_common_attributes: bool = False) -> None:
        """Create the default failure converter.

        Args:
            encode_common_attributes: If True, the message and stack trace
                of the failure will be moved into the encoded attribute section
                of the failure which can be encoded with a codec.
        """

DefaultFailureConverterWithEncodedAttributes

Enhanced failure converter that encodes attributes for codec processing:

class DefaultFailureConverterWithEncodedAttributes(DefaultFailureConverter):
    """Implementation of DefaultFailureConverter which moves message
    and stack trace to encoded attributes subject to a codec.
    """

    def __init__(self) -> None:
        """Create a default failure converter with encoded attributes."""
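
The effect of encoding common attributes can be pictured with plain dicts. This is a rough model of the idea described above, not the SDK's code: the dict layout, the function names, and the placeholder message text are all assumptions for illustration.

```python
import json
from typing import Any, Dict

# Hypothetical dict-based stand-in for temporalio.api.failure.v1.Failure.
Failure = Dict[str, Any]


def move_to_encoded_attributes(failure: Failure) -> None:
    """Move message and stack trace into a payload a codec could later encode."""
    attributes = {
        "message": failure["message"],
        "stack_trace": failure["stack_trace"],
    }
    failure["encoded_attributes"] = {
        "metadata": {"encoding": b"json/plain"},
        "data": json.dumps(attributes).encode(),
    }
    # The plain-text fields no longer carry the sensitive details.
    failure["message"] = "Encoded failure"  # placeholder text is an assumption
    failure["stack_trace"] = ""


def restore_from_encoded_attributes(failure: Failure) -> None:
    """Reverse the move once the payload has been decoded again."""
    payload = failure.pop("encoded_attributes", None)
    if payload is None:
        return
    attributes = json.loads(payload["data"])
    failure["message"] = attributes["message"]
    failure["stack_trace"] = attributes["stack_trace"]


failure = {"message": "card declined for user 42", "stack_trace": "File pay.py"}
move_to_encoded_attributes(failure)
hidden_message = failure["message"]
restore_from_encoded_attributes(failure)
```

The point of the move is that a payload codec only sees payloads, so message and stack trace text must live inside one before encryption can cover it.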

Search Attributes

Core Types

from typing import Union, Sequence, Mapping, List
from datetime import datetime

SearchAttributeValue = Union[str, int, float, bool, datetime, Sequence[str]]
SearchAttributeValues = Union[
    List[str], List[int], List[float], List[bool], List[datetime]
]
SearchAttributes = Mapping[str, SearchAttributeValues]

SearchAttributeIndexedValueType

from enum import IntEnum

class SearchAttributeIndexedValueType(IntEnum):
    """Server index type of a search attribute."""

    TEXT = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_TEXT)
    KEYWORD = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_KEYWORD)
    INT = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_INT)
    DOUBLE = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_DOUBLE)
    BOOL = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_BOOL)
    DATETIME = int(temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_DATETIME)
    KEYWORD_LIST = int(
        temporalio.api.enums.v1.IndexedValueType.INDEXED_VALUE_TYPE_KEYWORD_LIST
    )

SearchAttributeKey

Type-safe search attribute key:

from abc import ABC, abstractmethod
from typing import Generic, TypeVar

SearchAttributeValueType = TypeVar(
    "SearchAttributeValueType", str, int, float, bool, datetime, Sequence[str]
)

class SearchAttributeKey(ABC, Generic[SearchAttributeValueType]):
    """Typed search attribute key representation.

    Use one of the for_* static methods here to create a key.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Get the name of the key."""

    @property
    @abstractmethod
    def indexed_value_type(self) -> SearchAttributeIndexedValueType:
        """Get the server index type of the key."""

    @property
    @abstractmethod
    def value_type(self) -> Type[SearchAttributeValueType]:
        """Get the Python type of value for the key.

        This may contain generics which cannot be used in isinstance.
        origin_value_type can be used instead.
        """

    @property
    def origin_value_type(self) -> Type:
        """Get the Python type of value for the key without generics."""

    def value_set(
        self, value: SearchAttributeValueType
    ) -> SearchAttributeUpdate[SearchAttributeValueType]:
        """Create a search attribute update to set the given value on this key."""

    def value_unset(self) -> SearchAttributeUpdate[SearchAttributeValueType]:
        """Create a search attribute update to unset the value on this key."""

    @staticmethod
    def for_text(name: str) -> SearchAttributeKey[str]:
        """Create a 'Text' search attribute type."""

    @staticmethod
    def for_keyword(name: str) -> SearchAttributeKey[str]:
        """Create a 'Keyword' search attribute type."""

    @staticmethod
    def for_int(name: str) -> SearchAttributeKey[int]:
        """Create an 'Int' search attribute type."""

    @staticmethod
    def for_float(name: str) -> SearchAttributeKey[float]:
        """Create a 'Double' search attribute type."""

    @staticmethod
    def for_bool(name: str) -> SearchAttributeKey[bool]:
        """Create a 'Bool' search attribute type."""

    @staticmethod
    def for_datetime(name: str) -> SearchAttributeKey[datetime]:
        """Create a 'Datetime' search attribute type."""

    @staticmethod
    def for_keyword_list(name: str) -> SearchAttributeKey[Sequence[str]]:
        """Create a 'KeywordList' search attribute type."""

SearchAttributePair and Updates

from typing_extensions import NamedTuple

class SearchAttributePair(NamedTuple, Generic[SearchAttributeValueType]):
    """A named tuple representing a key/value search attribute pair."""

    key: SearchAttributeKey[SearchAttributeValueType]
    value: SearchAttributeValueType

class SearchAttributeUpdate(ABC, Generic[SearchAttributeValueType]):
    """Representation of a search attribute update."""

    @property
    @abstractmethod
    def key(self) -> SearchAttributeKey[SearchAttributeValueType]:
        """Key that is being set."""

    @property
    @abstractmethod
    def value(self) -> Optional[SearchAttributeValueType]:
        """Value that is being set or None if being unset."""

TypedSearchAttributes

from typing import Collection

class TypedSearchAttributes(Collection[SearchAttributePair]):
    """Collection of typed search attributes.

    This is represented as an immutable collection of
    SearchAttributePair. This can be created passing a sequence of
    pairs to the constructor.
    """

    search_attributes: Sequence[SearchAttributePair]
    """Underlying sequence of search attribute pairs. Do not mutate this, only
    create new TypedSearchAttributes instances.

    These are sorted by key name during construction. Duplicates cannot exist.
    """

    empty: ClassVar[TypedSearchAttributes]
    """Class variable representing an empty set of attributes."""

    def __len__(self) -> int:
        """Get the number of search attributes."""

    def __getitem__(
        self, key: SearchAttributeKey[SearchAttributeValueType]
    ) -> SearchAttributeValueType:
        """Get a single search attribute value by key or fail with KeyError."""

    def __iter__(self) -> Iterator[SearchAttributePair]:
        """Get an iterator over search attribute key/value pairs."""

    def __contains__(self, key: object) -> bool:
        """Check whether this search attribute contains the given key.

        This uses key equality so the key must be the same name and type.
        """

    def get(
        self,
        key: SearchAttributeKey[SearchAttributeValueType],
        default: Optional[Any] = None,
    ) -> Any:
        """Get an attribute value for a key (or default). This is similar to dict.get."""

    def updated(self, *search_attributes: SearchAttributePair) -> TypedSearchAttributes:
        """Copy this collection, replacing attributes with matching key names or
        adding if key name not present.
        """
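
The copy-on-update behavior of updated(), together with the sorted and duplicate-free invariants noted above, can be modeled in a few lines. This is a conceptual sketch only: `Pair` and `AttrsSketch` are hypothetical, and keys are plain strings here rather than SearchAttributeKey objects.

```python
from typing import Any, Iterable, NamedTuple, Tuple


class Pair(NamedTuple):
    """Hypothetical stand-in for SearchAttributePair with a string key."""

    key: str
    value: Any


class AttrsSketch:
    """Immutable-style collection: sorted by key name, no duplicate keys."""

    def __init__(self, pairs: Iterable[Pair]) -> None:
        self.pairs: Tuple[Pair, ...] = tuple(sorted(pairs, key=lambda p: p.key))
        names = [p.key for p in self.pairs]
        if len(names) != len(set(names)):
            raise ValueError("Duplicate search attribute keys")

    def updated(self, *pairs: Pair) -> "AttrsSketch":
        # Start from the existing pairs, then replace or add by key name.
        merged = {p.key: p for p in self.pairs}
        merged.update({p.key: p for p in pairs})
        return AttrsSketch(merged.values())


base = AttrsSketch([Pair("Region", "eu"), Pair("CustomerId", "c-1")])
changed = base.updated(Pair("Region", "us"), Pair("Tier", "gold"))
```

Because updated() returns a new object, `base` can be shared safely between callers while each derives its own variant.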

Search Attribute Functions

def encode_search_attributes(
    attributes: Union[
        temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes
    ],
    api: temporalio.api.common.v1.SearchAttributes,
) -> None:
    """Convert search attributes into an API message.

    Args:
        attributes: Search attributes to convert. The dictionary form of this is
            DEPRECATED.
        api: API message to set converted attributes on.
    """

def encode_typed_search_attribute_value(
    key: temporalio.common.SearchAttributeKey[
        temporalio.common.SearchAttributeValueType
    ],
    value: Optional[temporalio.common.SearchAttributeValue],
) -> temporalio.api.common.v1.Payload:
    """Convert typed search attribute value into a payload.

    Args:
        key: Key for the value.
        value: Value to convert.

    Returns:
        Payload for the value.
    """

def encode_search_attribute_values(
    vals: temporalio.common.SearchAttributeValues,
) -> temporalio.api.common.v1.Payload:
    """Convert search attribute values into a payload.

    .. deprecated::
        Use typed search attributes instead.

    Args:
        vals: List of values to convert.
    """

def decode_search_attributes(
    api: temporalio.api.common.v1.SearchAttributes,
) -> temporalio.common.SearchAttributes:
    """Decode API search attributes to values.

    .. deprecated::
        Use typed search attributes instead.

    Args:
        api: API message with search attribute values to convert.

    Returns:
        Converted search attribute values (new mapping every time).
    """

def decode_typed_search_attributes(
    api: temporalio.api.common.v1.SearchAttributes,
) -> temporalio.common.TypedSearchAttributes:
    """Decode API search attributes to typed search attributes.

    Args:
        api: API message with search attribute values to convert.

    Returns:
        Typed search attribute collection (new object every time).
    """

Common Configuration Types

RetryPolicy

from dataclasses import dataclass
from datetime import timedelta
from typing import Optional, Sequence

@dataclass
class RetryPolicy:
    """Options for retrying workflows and activities."""

    initial_interval: timedelta = timedelta(seconds=1)
    """Backoff interval for the first retry. Default 1s."""

    backoff_coefficient: float = 2.0
    """Coefficient to multiply previous backoff interval by to get new
    interval. Default 2.0.
    """

    maximum_interval: Optional[timedelta] = None
    """Maximum backoff interval between retries. Default 100x
    initial_interval.
    """

    maximum_attempts: int = 0
    """Maximum number of attempts.

    If 0, the default, there is no maximum.
    """

    non_retryable_error_types: Optional[Sequence[str]] = None
    """List of error types that are not retryable."""

    @staticmethod
    def from_proto(proto: temporalio.api.common.v1.RetryPolicy) -> RetryPolicy:
        """Create a retry policy from the proto object."""

    def apply_to_proto(self, proto: temporalio.api.common.v1.RetryPolicy) -> None:
        """Apply the fields in this policy to the given proto object."""
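
A quick way to see how initial_interval, backoff_coefficient, and maximum_interval interact is to compute the wait before each retry. The helper below is a hypothetical illustration based on the field descriptions above; actual retry scheduling is performed by Temporal, not by this function.

```python
from datetime import timedelta
from typing import List, Optional


def backoff_schedule(
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
    retries: int = 5,
) -> List[timedelta]:
    """Return the wait before each of the first `retries` retries."""
    # Per the field docs, the cap defaults to 100x the initial interval.
    cap = maximum_interval if maximum_interval is not None else initial_interval * 100
    return [
        min(initial_interval * (backoff_coefficient ** n), cap)
        for n in range(retries)
    ]


default_waits = backoff_schedule(retries=8)
capped_waits = backoff_schedule(maximum_interval=timedelta(seconds=5), retries=4)
```

With the defaults, the waits double from 1 second until they hit the 100 second cap on the eighth retry; setting maximum_interval lowers that ceiling.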

Priority

class Priority:
    """Priority configuration for workflows and activities."""

RawValue

@dataclass(frozen=True)
class RawValue:
    """Representation of an unconverted, raw payload.

    This type can be used as a parameter or return type in workflows,
    activities, signals, and queries to pass through a raw payload.
    Encoding/decoding of the payload is still done by the system.
    """

    payload: temporalio.api.common.v1.Payload

Usage Examples

Custom Payload Converter

import json
from typing import Any, Optional, Type, List
from temporalio.converter import EncodingPayloadConverter
import temporalio.api.common.v1

class CustomJSONConverter(EncodingPayloadConverter):
    """Custom JSON converter with special handling."""

    @property
    def encoding(self) -> str:
        return "json/custom"

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
        if isinstance(value, (dict, list, str, int, float, bool)) or value is None:
            return temporalio.api.common.v1.Payload(
                metadata={"encoding": self.encoding.encode()},
                data=json.dumps(value, separators=(',', ':')).encode()
            )
        return None

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None
    ) -> Any:
        return json.loads(payload.data.decode())
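
To see what the converter above would place in a payload, the same encoding steps can be run with the standard library alone. This is a sketch: `encode_value` and `decode_value` are hypothetical helpers, and a plain `(metadata, data)` tuple stands in for the `Payload` message.

```python
import json
from typing import Any, Dict, Tuple

ENCODING = "json/custom"  # same encoding name as the converter above


def encode_value(value: Any) -> Tuple[Dict[str, bytes], bytes]:
    """Build the (metadata, data) pair that would go into a Payload."""
    metadata = {"encoding": ENCODING.encode()}
    # Compact separators drop the spaces json.dumps adds by default.
    data = json.dumps(value, separators=(",", ":")).encode()
    return metadata, data


def decode_value(data: bytes) -> Any:
    """Reverse encode_value for the data part."""
    return json.loads(data.decode())


metadata, data = encode_value({"id": 7, "tags": ["a", "b"]})
print(metadata)  # {'encoding': b'json/custom'}
print(data)      # b'{"id":7,"tags":["a","b"]}'
print(decode_value(data) == {"id": 7, "tags": ["a", "b"]})  # True
```

Note that the `isinstance` check in `to_payload` only inspects the top-level value; a dict containing a non-JSON value such as a `date` would still make `json.dumps` raise `TypeError`.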

Custom Data Converter with Codec

from typing import List

from temporalio.converter import DataConverter, DefaultPayloadConverter, PayloadCodec
import temporalio.api.common.v1

class EncryptionCodec(PayloadCodec):
    """Simple encryption codec example."""

    async def encode(
        self, payloads: List[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        # Implement encryption logic here
        return payloads

    async def decode(
        self, payloads: List[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        # Implement decryption logic here
        return payloads

# Create data converter with encryption
data_converter = DataConverter(
    payload_converter_class=DefaultPayloadConverter,
    payload_codec=EncryptionCodec()
)
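
The `encode` and `decode` bodies above are placeholders. As a rough idea of what a codec pair has to do, here is a standard-library sketch that compresses instead of encrypting. Everything in it is an assumption made for illustration: `StubPayload` stands in for the `Payload` message, `binary/zlib` and the `original-` metadata prefix are made-up labels, and the functions are synchronous whereas the real codec methods are `async`.

```python
import zlib
from dataclasses import dataclass, field
from typing import Dict, List

ZLIB_ENCODING = b"binary/zlib"  # hypothetical label, not an SDK constant
PREFIX = "original-"            # hypothetical prefix for preserved metadata keys


@dataclass
class StubPayload:
    """Hypothetical stand-in for temporalio.api.common.v1.Payload."""

    metadata: Dict[str, bytes] = field(default_factory=dict)
    data: bytes = b""


def compress_payloads(payloads: List[StubPayload]) -> List[StubPayload]:
    """Compress each payload, keeping its original metadata under prefixed keys."""
    encoded = []
    for p in payloads:
        wrapped = {PREFIX + key: val for key, val in p.metadata.items()}
        wrapped["encoding"] = ZLIB_ENCODING
        encoded.append(StubPayload(metadata=wrapped, data=zlib.compress(p.data)))
    return encoded


def decompress_payloads(payloads: List[StubPayload]) -> List[StubPayload]:
    """Reverse compress_payloads; payloads it did not produce pass through."""
    decoded = []
    for p in payloads:
        if p.metadata.get("encoding") != ZLIB_ENCODING:
            decoded.append(p)
            continue
        restored = {
            key[len(PREFIX):]: val
            for key, val in p.metadata.items()
            if key.startswith(PREFIX)
        }
        decoded.append(StubPayload(metadata=restored, data=zlib.decompress(p.data)))
    return decoded


original = [StubPayload({"encoding": b"json/plain"}, b'{"total": 99.99}')]
round_tripped = decompress_payloads(compress_payloads(original))
print(round_tripped == original)  # True
```

The property that matters is the round trip: decoding must restore exactly what encoding received, and payloads the codec does not recognize should be left alone.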

Custom JSON Type Converter

from temporalio.converter import JSONTypeConverter
from typing import Type, Any, Union
from datetime import date

class DateTypeConverter(JSONTypeConverter):
    """Convert ISO date strings to date objects."""

    def to_typed_value(self, hint: Type, value: Any) -> Union[Any, object]:
        if hint is date and isinstance(value, str):
            try:
                return date.fromisoformat(value)
            except ValueError:
                pass
        return JSONTypeConverter.Unhandled

# Use with JSONPlainPayloadConverter
from temporalio.converter import JSONPlainPayloadConverter

converter = JSONPlainPayloadConverter(
    custom_type_converters=[DateTypeConverter()]
)
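
The role of the `Unhandled` return value is easier to see in isolation. The sketch below is not the SDK's implementation: `UNHANDLED`, `date_converter`, and `convert` are hypothetical stand-ins that show how a sentinel lets each converter decline a value so the next one, or a fallback, can take over.

```python
from datetime import date
from typing import Any, Callable, Sequence, Type

UNHANDLED = object()  # hypothetical stand-in for JSONTypeConverter.Unhandled


def date_converter(hint: Type, value: Any) -> Any:
    """Convert ISO strings to date when the hint asks for one."""
    if hint is date and isinstance(value, str):
        try:
            return date.fromisoformat(value)
        except ValueError:
            pass  # not an ISO date; decline below
    return UNHANDLED


def convert(
    hint: Type, value: Any, converters: Sequence[Callable[[Type, Any], Any]]
) -> Any:
    """Return the first converter result that is not the sentinel."""
    for converter in converters:
        result = converter(hint, value)
        if result is not UNHANDLED:
            return result
    return value  # fallback in this sketch: leave the value unchanged


print(convert(date, "2024-03-01", [date_converter]))  # 2024-03-01
print(convert(date, "not-a-date", [date_converter]))  # not-a-date
```

A dedicated sentinel is used rather than `None` because `None` can itself be a legitimate converted value.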

Working with Search Attributes

from temporalio.common import SearchAttributeKey, TypedSearchAttributes, SearchAttributePair
from datetime import datetime, timezone

# Define typed search attribute keys
customer_id_key = SearchAttributeKey.for_keyword("CustomerId")
order_amount_key = SearchAttributeKey.for_float("OrderAmount")
created_time_key = SearchAttributeKey.for_datetime("CreatedTime")

# Create search attributes
search_attributes = TypedSearchAttributes([
    SearchAttributePair(customer_id_key, "customer-123"),
    SearchAttributePair(order_amount_key, 99.99),
    # Use a timezone-aware timestamp so the stored value is unambiguous
    SearchAttributePair(created_time_key, datetime.now(timezone.utc))
])

# Update search attributes
updated_attributes = search_attributes.updated(
    SearchAttributePair(order_amount_key, 149.99)
)

Custom Failure Converter

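from temporalio.converter import DefaultFailureConverter, PayloadConverter
from temporalio.exceptions import ApplicationError
import temporalio.api.failure.v1

class CustomFailureConverter(DefaultFailureConverter):
    """Custom failure converter with special error handling.

    Subclasses DefaultFailureConverter so that the super() calls below reach
    a concrete implementation; the FailureConverter base class is abstract.
    """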

    def to_failure(
        self,
        exception: BaseException,
        payload_converter: PayloadConverter,
        failure: temporalio.api.failure.v1.Failure,
    ) -> None:
        # Custom logic for converting exceptions to failures
        if isinstance(exception, MyCustomError):
            # Handle custom error type
            app_error = ApplicationError(
                str(exception),
                type="MyCustomError",
                non_retryable=True
            )
            app_error.__traceback__ = exception.__traceback__
            app_error.__cause__ = exception.__cause__
            # Delegate to default handling for ApplicationError
            super().to_failure(app_error, payload_converter, failure)
        else:
            # Use default handling for other errors
            super().to_failure(exception, payload_converter, failure)

    def from_failure(
        self,
        failure: temporalio.api.failure.v1.Failure,
        payload_converter: PayloadConverter,
    ) -> BaseException:
        # Custom logic for converting failures to exceptions
        if (failure.HasField("application_failure_info") and
            failure.application_failure_info.type == "MyCustomError"):
            return MyCustomError(failure.message)

        # Use default handling for other failures
        return super().from_failure(failure, payload_converter)

class MyCustomError(Exception):
    """Custom exception class."""
    pass

Integration with Workflows and Activities

Workflow and activity arguments and return values pass through the configured data converter, so typed parameters such as dataclasses are serialized and deserialized without extra code:

from dataclasses import dataclass
from datetime import timedelta
from typing import List

from temporalio import workflow, activity

@dataclass
class OrderRequest:
    customer_id: str
    items: List[str]
    total: float

@activity.defn
async def process_order(request: OrderRequest) -> str:
    # The request parameter is automatically deserialized
    # using the configured data converter
    return f"Processed order for {request.customer_id}"

@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, request: OrderRequest) -> str:
        # Both workflow parameters and activity parameters/returns
        # are handled by the data converter
        result = await workflow.execute_activity(
            process_order,
            request,
            start_to_close_timeout=timedelta(seconds=30)
        )
        return result
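
What "handled by the data converter" means for a dataclass like `OrderRequest` can be approximated with the standard library. This is a simplified sketch, not the SDK's converter: `to_json_bytes` and `from_json_bytes` are hypothetical helpers, and rebuilding the object from the decoded dict stands in for the type-hint-driven conversion.

```python
import dataclasses
import json
from dataclasses import dataclass
from typing import List


@dataclass
class OrderRequest:
    customer_id: str
    items: List[str]
    total: float


def to_json_bytes(request: OrderRequest) -> bytes:
    """Serialize the dataclass to JSON bytes, as it might travel in a payload."""
    return json.dumps(dataclasses.asdict(request)).encode()


def from_json_bytes(data: bytes) -> OrderRequest:
    """Rebuild the dataclass; the target type plays the role of the type hint."""
    return OrderRequest(**json.loads(data.decode()))


request = OrderRequest(customer_id="customer-123", items=["book", "pen"], total=99.99)
restored = from_json_bytes(to_json_bytes(request))
print(restored == request)  # True
```

The type annotation on the activity or workflow parameter is what allows the receiving side to rebuild an `OrderRequest` instead of handing over a plain dict.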

The data conversion system covers the serialization needs of a Temporal application, from the default JSON encoding to custom payload formats and codecs for encryption or compression.
