Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.
Core utility classes, enums, and configuration objects used throughout the Temporal Python SDK. These classes provide fundamental building blocks for retry policies, workflow configuration, search attributes, metrics, and other common functionality.
Configure retry behavior for workflows and activities with exponential backoff and custom error handling.
@dataclass
class RetryPolicy:
initial_interval: timedelta = timedelta(seconds=1)
backoff_coefficient: float = 2.0
maximum_interval: Optional[timedelta] = None
maximum_attempts: int = 0
non_retryable_error_types: Optional[Sequence[str]] = None
@staticmethod
def from_proto(proto: temporalio.api.common.v1.RetryPolicy) -> RetryPolicy: ...
def apply_to_proto(self, proto: temporalio.api.common.v1.RetryPolicy) -> None: ...
from temporalio.common import RetryPolicy
from datetime import timedelta
# Basic retry policy with exponential backoff
retry_policy = RetryPolicy(
initial_interval=timedelta(seconds=1),
backoff_coefficient=2.0,
maximum_interval=timedelta(minutes=10),
maximum_attempts=5,
non_retryable_error_types=["ValueError", "TypeError"]
)
# Use in workflow execution
result = await client.execute_workflow(
MyWorkflow.run,
"arg",
id="workflow-id",
task_queue="my-queue",
retry_policy=retry_policy
)
Control how workflow IDs are handled when starting workflows with potentially conflicting identifiers.
class WorkflowIDReusePolicy(IntEnum):
ALLOW_DUPLICATE = 0
ALLOW_DUPLICATE_FAILED_ONLY = 1
REJECT_DUPLICATE = 2
TERMINATE_IF_RUNNING = 3
class WorkflowIDConflictPolicy(IntEnum):
UNSPECIFIED = 0
FAIL = 1
USE_EXISTING = 2
TERMINATE_EXISTING = 3
Type-safe search attribute handling with strongly typed keys and values for workflow search and filtering.
class SearchAttributeKey(ABC, Generic[SearchAttributeValueType]):
@property
@abstractmethod
def name(self) -> str: ...
@property
@abstractmethod
def indexed_value_type(self) -> SearchAttributeIndexedValueType: ...
@property
@abstractmethod
def value_type(self) -> Type[SearchAttributeValueType]: ...
@staticmethod
def for_text(name: str) -> SearchAttributeKey[str]: ...
@staticmethod
def for_keyword(name: str) -> SearchAttributeKey[str]: ...
@staticmethod
def for_int(name: str) -> SearchAttributeKey[int]: ...
@staticmethod
def for_float(name: str) -> SearchAttributeKey[float]: ...
@staticmethod
def for_bool(name: str) -> SearchAttributeKey[bool]: ...
@staticmethod
def for_datetime(name: str) -> SearchAttributeKey[datetime]: ...
@staticmethod
def for_keyword_list(name: str) -> SearchAttributeKey[Sequence[str]]: ...
@dataclass(frozen=True)
class TypedSearchAttributes(Collection[SearchAttributePair]):
search_attributes: Sequence[SearchAttributePair]
def updated(self, *search_attributes: SearchAttributePair) -> TypedSearchAttributes: ...
class SearchAttributeUpdate(ABC, Generic[SearchAttributeValueType]):
@property
@abstractmethod
def key(self) -> SearchAttributeKey[SearchAttributeValueType]: ...
@property
@abstractmethod
def value(self) -> Optional[SearchAttributeValueType]: ...
from temporalio.common import SearchAttributeKey, TypedSearchAttributes
from datetime import datetime
# Create typed search attribute keys
customer_id = SearchAttributeKey.for_keyword("CustomerId")
order_total = SearchAttributeKey.for_float("OrderTotal")
created_at = SearchAttributeKey.for_datetime("CreatedAt")
# Create search attributes
search_attributes = TypedSearchAttributes([
(customer_id, "customer-123"),
(order_total, 99.99),
(created_at, datetime.now())
])
# Use in workflow execution
result = await client.execute_workflow(
OrderWorkflow.run,
order_data,
id="order-workflow",
task_queue="orders",
search_attributes=search_attributes
)
Comprehensive metrics system for monitoring workflows, activities, and SDK performance.
class MetricMeter(ABC):
noop: ClassVar[MetricMeter]
@abstractmethod
def create_counter(
self, name: str, description: Optional[str] = None, unit: Optional[str] = None
) -> MetricCounter: ...
@abstractmethod
def create_histogram(
self, name: str, description: Optional[str] = None, unit: Optional[str] = None
) -> MetricHistogram: ...
@abstractmethod
def create_histogram_float(
self, name: str, description: Optional[str] = None, unit: Optional[str] = None
) -> MetricHistogramFloat: ...
@abstractmethod
def create_histogram_timedelta(
self, name: str, description: Optional[str] = None, unit: Optional[str] = None
) -> MetricHistogramTimedelta: ...
@abstractmethod
def create_gauge(
self, name: str, description: Optional[str] = None, unit: Optional[str] = None
) -> MetricGauge: ...
@abstractmethod
def create_gauge_float(
self, name: str, description: Optional[str] = None, unit: Optional[str] = None
) -> MetricGaugeFloat: ...
class MetricCounter(MetricCommon):
@abstractmethod
def add(
self, value: int, additional_attributes: Optional[MetricAttributes] = None
) -> None: ...
class MetricHistogram(MetricCommon):
@abstractmethod
def record(
self, value: int, additional_attributes: Optional[MetricAttributes] = None
) -> None: ...
class MetricGauge(MetricCommon):
@abstractmethod
def set(
self, value: int, additional_attributes: Optional[MetricAttributes] = None
) -> None: ...
Control task scheduling priority and fairness for workflows and activities.
@dataclass(frozen=True)
class Priority:
priority_key: Optional[int] = None
fairness_key: Optional[str] = None
fairness_weight: Optional[float] = None
default: ClassVar[Priority]
from temporalio.common import Priority
# High priority workflow
high_priority = Priority(priority_key=1)
# Tenant-based fairness
tenant_priority = Priority(
priority_key=3,
fairness_key="tenant-abc",
fairness_weight=1.5
)
Direct access to unconverted payload data for advanced use cases.
@dataclass(frozen=True)
class RawValue:
payload: temporalio.api.common.v1.Payload
Control worker versioning behavior for gradual deployments and rollbacks.
class VersioningBehavior(IntEnum):
UNSPECIFIED = 0
PINNED = 1
AUTO_UPGRADE = 2
@dataclass(frozen=True)
class WorkerDeploymentVersion:
deployment_name: str
build_id: str
def to_canonical_string(self) -> str: ...
@staticmethod
def from_canonical_string(canonical: str) -> WorkerDeploymentVersion: ...
class VersioningOverride(ABC):
@abstractmethod
def _to_proto(self) -> temporalio.api.workflow.v1.VersioningOverride: ...
@dataclass(frozen=True)
class PinnedVersioningOverride(VersioningOverride):
version: WorkerDeploymentVersion
@dataclass(frozen=True)
class AutoUpgradeVersioningOverride(VersioningOverride):
pass
Configure when queries should be rejected based on workflow state.
class QueryRejectCondition(IntEnum):
NONE = 0
NOT_OPEN = 1
NOT_COMPLETED_CLEANLY = 2
Control automatic header encoding and decoding behavior.
class HeaderCodecBehavior(IntEnum):
NO_CODEC = 1
CODEC = 2
WORKFLOW_ONLY_CODEC = 3
from typing import TypeVar, Generic, Sequence, Mapping, List, Union, Optional
from datetime import datetime, timedelta
# Search attribute types
SearchAttributeValues = Union[
List[str], List[int], List[float], List[bool], List[datetime]
]
SearchAttributes = Mapping[str, SearchAttributeValues]
SearchAttributeValue = Union[str, int, float, bool, datetime, Sequence[str]]
SearchAttributeValueType = TypeVar("SearchAttributeValueType", str, int, float, bool, datetime, Sequence[str])
# Metrics types
MetricAttributes = Mapping[str, Union[str, int, float, bool]]
class SearchAttributeIndexedValueType(IntEnum):
TEXT = 0
KEYWORD = 1
INT = 2
DOUBLE = 3
BOOL = 4
DATETIME = 5
KEYWORD_LIST = 6
Install with Tessl CLI
npx tessl i tessl/pypi-temporalio