Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.
```
npx @tessl/cli install tessl/pypi-temporalio@1.17.0
```

A comprehensive Python SDK for building distributed, scalable, durable, and highly available workflows and activities on the Temporal orchestration engine. The SDK turns async Python functions into distributed workflows backed by a fault-tolerant event loop, keeping familiar asyncio semantics while providing durability and reliability in distributed environments.
```
pip install temporalio
```

```python
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio import workflow, activity
```

For advanced usage:
```python
import temporalio
from temporalio import common, converter, exceptions, runtime, testing
from temporalio.common import RetryPolicy, SearchAttributeKey, MetricMeter
from temporalio.exceptions import ApplicationError, TemporalError, is_cancelled_exception
from temporalio.runtime import Runtime, TelemetryConfig
```

For integrations:
```python
# Pydantic integration
from temporalio.contrib.pydantic import pydantic_data_converter
```

A minimal end-to-end example:

```python
from datetime import timedelta
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker
import asyncio
import concurrent.futures
# Define an activity
@activity.defn
async def say_hello(name: str) -> str:
    return f"Hello, {name}!"

# Define a workflow
@workflow.defn
class SayHello:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello,
            name,
            schedule_to_close_timeout=timedelta(seconds=5),
        )
# Create and run worker
async def run_worker():
    client = await Client.connect("localhost:7233")
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
        worker = Worker(
            client,
            task_queue="my-task-queue",
            workflows=[SayHello],
            activities=[say_hello],
            activity_executor=executor,
        )
        await worker.run()
# Execute workflow
async def run_workflow():
    client = await Client.connect("localhost:7233")
    result = await client.execute_workflow(
        SayHello.run,
        "World",
        id="my-workflow-id",
        task_queue="my-task-queue",
    )
    print(f"Result: {result}")
if __name__ == "__main__":
    # Run the worker in production; call run_workflow() instead for testing
    asyncio.run(run_worker())
```

Temporal's architecture centers on a custom asyncio event loop: the Python SDK executes workflow code on a durable, deterministic event loop that provides distributed fault tolerance while integrating naturally with Python's async/await patterns.
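Deterministic execution means workflow code must avoid direct system calls for time, randomness, and sleeping. A minimal sketch of the workflow-safe equivalents, using the `sleep`, `now`, and `uuid4` helpers documented below (the `GenerateReport` workflow is illustrative):

```python
from temporalio import workflow


@workflow.defn
class GenerateReport:
    @workflow.run
    async def run(self) -> str:
        # workflow.now()/workflow.uuid4() are recorded in history and replay
        # identically, unlike datetime.now()/uuid.uuid4()
        report_id = workflow.uuid4()
        started_at = workflow.now()
        # workflow.sleep() is a durable timer, not asyncio.sleep()
        await workflow.sleep(60)
        return f"report {report_id} started at {started_at.isoformat()}"
```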
Connect to a Temporal server, start workflows, manage executions, and handle schedules. The client is the primary interface for interacting with the server.
```python
class Client:
    @classmethod
    async def connect(
        cls,
        target_host: str = "localhost:7233",
        *,
        namespace: str = "default",
        data_converter: DataConverter = None,
        interceptors: Sequence[Interceptor] = [],
        tls: TLSConfig | bool = False,
        retry_config: RetryConfig = None,
        keep_alive_config: KeepAliveConfig = None,
        rpc_metadata: Mapping[str, str] = {},
        identity: str = None,
        lazy: bool = False,
        runtime: Runtime = None,
        http_connect_proxy_config: HttpConnectProxyConfig = None,
    ) -> Client: ...
    async def execute_workflow(
        self,
        workflow: MethodAsyncNoParam[WorkflowReturnType] | str,
        arg: Any = temporalio.common._arg_unset,
        *,
        id: str,
        task_queue: str,
        execution_timeout: timedelta = None,
        run_timeout: timedelta = None,
        task_timeout: timedelta = None,
        id_reuse_policy: WorkflowIDReusePolicy = WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY,
        id_conflict_policy: WorkflowIDConflictPolicy = WorkflowIDConflictPolicy.UNSPECIFIED,
        retry_policy: RetryPolicy = None,
        cron_schedule: str = None,
        memo: Mapping[str, Any] = None,
        search_attributes: SearchAttributes = None,
        start_signal: str = None,
        start_signal_args: Sequence[Any] = [],
        request_eager_start: bool = True,
    ) -> WorkflowReturnType: ...
    async def start_workflow(
        self,
        workflow: MethodAsyncNoParam[WorkflowReturnType] | str,
        arg: Any = temporalio.common._arg_unset,
        *,
        id: str,
        task_queue: str,
        execution_timeout: timedelta = None,
        run_timeout: timedelta = None,
        task_timeout: timedelta = None,
        id_reuse_policy: WorkflowIDReusePolicy = WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY,
        id_conflict_policy: WorkflowIDConflictPolicy = WorkflowIDConflictPolicy.UNSPECIFIED,
        retry_policy: RetryPolicy = None,
        cron_schedule: str = None,
        memo: Mapping[str, Any] = None,
        search_attributes: SearchAttributes = None,
        start_signal: str = None,
        start_signal_args: Sequence[Any] = [],
        request_eager_start: bool = True,
    ) -> WorkflowHandle[WorkflowReturnType, WorkflowReturnType]: ...
```
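A sketch of the difference between `execute_workflow` (start and await the result) and `start_workflow` (return a handle immediately), reusing the quickstart's `SayHello` workflow by type name:

```python
import asyncio
from temporalio.client import Client


async def main() -> None:
    client = await Client.connect("localhost:7233")

    # start_workflow returns a handle without waiting for completion
    handle = await client.start_workflow(
        "SayHello",               # workflow type by name, or pass SayHello.run
        "World",
        id="greet-1",
        task_queue="my-task-queue",
    )
    print(f"Started run {handle.result_run_id}")

    # Await completion when ready
    result = await handle.result()
    print(f"Result: {result}")


asyncio.run(main())
```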
```python
class WorkflowHandle(Generic[SelfType, ReturnType]):
    @property
    def id(self) -> str: ...
    @property
    def result_run_id(self) -> str | None: ...
    async def result(self) -> ReturnType: ...
    async def cancel(self) -> None: ...
    async def terminate(self, reason: str = None) -> None: ...
    async def signal(
        self,
        signal: MethodSyncOrAsyncSingleParam[SelfType, Any] | str,
        arg: Any = temporalio.common._arg_unset,
    ) -> None: ...
    async def query(
        self,
        query: MethodSyncOrAsyncNoParam[SelfType, ReturnType] | str,
        arg: Any = temporalio.common._arg_unset,
    ) -> ReturnType: ...
    async def start_update(
        self,
        update: MethodSyncOrAsyncSingleParam[SelfType, UpdateReturnType] | str,
        arg: Any = temporalio.common._arg_unset,
        *,
        id: str = None,
        wait_for_stage: WorkflowUpdateStage = WorkflowUpdateStage.COMPLETED,
    ) -> WorkflowUpdateHandle[UpdateReturnType]: ...
```
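Handles accept typed method references or plain string names. A hedged sketch using string names: `get_workflow_handle` is part of the client surface even though it is not listed in the stubs above, and the `"approve"` signal and `"progress"` query are hypothetical handlers on the target workflow:

```python
import asyncio
from temporalio.client import Client


async def main() -> None:
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle("order-42")  # handle to an existing run

    await handle.signal("approve", {"approver": "alice"})  # fire-and-forget message
    progress = await handle.query("progress")              # synchronous state read
    print(f"Progress: {progress}")


asyncio.run(main())
```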
Define workflows with signals, queries, updates, and child-workflow management. Workflows contain the core business logic and coordinate activities and other workflows.

```python
def defn(
    cls: Type[MultiParamSpec],
    *,
    name: str = None,
    dynamic: bool = False,
    failure_exception_types: Sequence[type[BaseException]] = [],
    versioning_intent: VersioningIntent = VersioningIntent.COMPATIBLE,
) -> Type[MultiParamSpec]: ...
def run(fn: MethodAsyncNoParam[SelfType, ReturnType]) -> MethodAsyncNoParam[SelfType, ReturnType]: ...
def signal(fn: MethodSyncOrAsyncSingleParam[SelfType, None] | None = None, *, name: str = None, dynamic: bool = False) -> Any: ...
def query(fn: MethodSyncOrAsyncNoParam[SelfType, ReturnType] | None = None, *, name: str = None, dynamic: bool = False) -> Any: ...
def update(fn: MethodSyncOrAsyncSingleParam[SelfType, ReturnType] | None = None, *, name: str = None, dynamic: bool = False) -> Any: ...
async def execute_activity(
    activity: MethodSyncOrAsyncSingleParam[Any, ActivityReturnType] | str,
    arg: Any = temporalio.common._arg_unset,
    *,
    activity_id: str = None,
    task_queue: str = None,
    schedule_to_close_timeout: timedelta = None,
    schedule_to_start_timeout: timedelta = None,
    start_to_close_timeout: timedelta = None,
    heartbeat_timeout: timedelta = None,
    retry_policy: RetryPolicy = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: VersioningIntent = VersioningIntent.UNSPECIFIED,
) -> ActivityReturnType: ...
async def execute_child_workflow(
    workflow: MethodAsyncNoParam[Any, ChildWorkflowReturnType] | str,
    arg: Any = temporalio.common._arg_unset,
    *,
    id: str = None,
    task_queue: str = None,
    execution_timeout: timedelta = None,
    run_timeout: timedelta = None,
    task_timeout: timedelta = None,
    id_reuse_policy: WorkflowIDReusePolicy = WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY,
    retry_policy: RetryPolicy = None,
    cron_schedule: str = None,
    parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
    cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
    versioning_intent: VersioningIntent = VersioningIntent.COMPATIBLE,
) -> ChildWorkflowReturnType: ...
def info() -> Info: ...
async def sleep(seconds: float) -> None: ...
def now() -> datetime: ...
def uuid4() -> str: ...
class Info:
    attempt: int
    continued_run_id: str | None
    cron_schedule: str | None
    execution_timeout: timedelta | None
    namespace: str
    parent: ParentInfo | None
    retry_policy: RetryPolicy | None
    root: RootInfo | None
    run_id: str
    run_timeout: timedelta | None
    search_attributes: SearchAttributes
    start_time: datetime
    task_queue: str
    task_timeout: timedelta | None
    workflow_id: str
    workflow_type: str
```
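A sketch tying these pieces together: a signal mutates state, a query reads it, and the run method waits on the signal before delegating to a child workflow. The `ProcessOrder`/`ShipOrder` names are illustrative, and `workflow.wait_condition` comes from the same module although it is not shown in the stubs above:

```python
from datetime import timedelta
from temporalio import workflow


@workflow.defn
class ProcessOrder:
    def __init__(self) -> None:
        self._approved = False

    @workflow.signal
    def approve(self) -> None:
        self._approved = True

    @workflow.query
    def status(self) -> str:
        return "approved" if self._approved else "pending"

    @workflow.run
    async def run(self, order_id: str) -> str:
        # Durably wait until the approve signal arrives
        await workflow.wait_condition(lambda: self._approved)
        # Hand shipping off to a child workflow by type name
        return await workflow.execute_child_workflow(
            "ShipOrder",
            order_id,
            id=f"ship-{order_id}",
            execution_timeout=timedelta(hours=1),
        )
```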
Create activities with context access, heartbeating, cancellation handling, and different execution models (async, threaded, multiprocess).

```python
def defn(
    fn: CallableAsyncSingleParam[ActivityParam, ActivityReturnType] | None = None,
    *,
    name: str = None,
    dynamic: bool = False,
) -> Any: ...
def info() -> Info: ...
def in_activity() -> bool: ...
def heartbeat(*details: Any) -> None: ...
def is_cancelled() -> bool: ...
def is_worker_shutdown() -> bool: ...
def cancellation_details() -> ActivityCancellationDetails | None: ...
async def wait_for_cancelled() -> None: ...
def wait_for_cancelled_sync(timeout: timedelta = None) -> bool: ...
def wait_for_worker_shutdown_sync(timeout: timedelta = None) -> bool: ...
def raise_complete_async() -> NoReturn: ...
class Info:
    activity_id: str
    activity_type: str
    attempt: int
    current_attempt_scheduled_time: datetime
    heartbeat_details: Sequence[Any]
    heartbeat_timeout: timedelta | None
    is_local: bool
    local_retry_threshold: timedelta | None
    schedule_to_close_timeout: timedelta | None
    schedule_to_start_timeout: timedelta | None
    scheduled_time: datetime
    start_to_close_timeout: timedelta | None
    started_time: datetime
    task_queue: str
    task_token: bytes
    workflow_id: str
    workflow_namespace: str
    workflow_run_id: str
    workflow_type: str
class ActivityCancellationDetails:
    reason: str
    details: Sequence[Any]
```
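A sketch of a long-running activity that heartbeats each batch so a retry can resume; the batch loop is illustrative, and the caller must set `heartbeat_timeout` on `execute_activity` for heartbeats to take effect:

```python
import asyncio
from temporalio import activity


@activity.defn
async def sync_records(batch_ids: list[str]) -> int:
    synced = 0
    for _batch_id in batch_ids:
        # Record progress; on retry it is available via activity.info().heartbeat_details
        activity.heartbeat(synced)
        await asyncio.sleep(1)  # stand-in for per-batch work
        synced += 1
    return synced
```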
Configure and run workers that execute workflows and activities, including thread-pool management, task-queue polling, and interceptor configuration.

```python
class Worker:
    def __init__(
        self,
        client: Client,
        *,
        task_queue: str,
        activities: Sequence[Callable] = [],
        workflows: Sequence[type] = [],
        interceptors: Sequence[Interceptor] = [],
        activity_executor: Executor = None,
        workflow_task_executor: Executor = None,
        workflow_runner: WorkflowRunner = None,
        shared_state_manager: SharedStateManager = None,
        debug_mode: bool = False,
        on_fatal_error: Callable[[Exception], None] = None,
        max_cached_workflows: int = 1000,
        max_concurrent_workflow_task_polls: int = 100,
        nonsticky_to_sticky_poll_ratio: float = 0.2,
        max_concurrent_activity_task_polls: int = 100,
        no_remote_activities: bool = False,
        sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10),
        max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60),
        default_heartbeat_throttle_interval: timedelta = timedelta(seconds=1),
        max_activities_per_second: float = None,
        max_task_queue_activities_per_second: float = None,
        graceful_shutdown_timeout: timedelta = timedelta(0),
        use_worker_versioning: bool = True,
        build_id: str = None,
        identity: str = None,
        tuner: WorkerTuner = None,
        workflow_failure_exception_types: Sequence[type[BaseException]] = [],
    ): ...
    async def run(self) -> None: ...
    async def shutdown(self) -> None: ...
class WorkerConfig:
    client: Client
    task_queue: str
    activities: Sequence[Callable]
    workflows: Sequence[type]
    interceptors: Sequence[Interceptor]
    activity_executor: Executor | None
    workflow_task_executor: Executor | None
    debug_mode: bool
    max_cached_workflows: int
    max_concurrent_workflow_task_polls: int
    max_concurrent_activity_task_polls: int
    graceful_shutdown_timeout: timedelta
```
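A sketch of wiring graceful shutdown to Ctrl-C using the `run`/`shutdown` pair above; `SayHello` and `say_hello` are the quickstart definitions, and the signal wiring is illustrative:

```python
import asyncio
import signal
from datetime import timedelta
from temporalio.client import Client
from temporalio.worker import Worker


async def main() -> None:
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[SayHello],        # quickstart workflow
        activities=[say_hello],      # quickstart activity
        graceful_shutdown_timeout=timedelta(seconds=30),
    )
    # On Ctrl-C, begin shutdown; run() returns once in-flight work drains
    asyncio.get_running_loop().add_signal_handler(
        signal.SIGINT, lambda: asyncio.ensure_future(worker.shutdown())
    )
    await worker.run()


asyncio.run(main())
```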
Handle serialization and deserialization of workflow and activity data, including custom payload converters, failure converters, and search-attribute encoding.

```python
def default() -> DataConverter: ...
class DataConverter:
    def __init__(
        self,
        *,
        payload_converter: PayloadConverter = None,
        failure_converter: FailureConverter = None,
        payload_codec: PayloadCodec = None,
    ): ...
    async def encode(
        self,
        values: Sequence[Any]
    ) -> temporalio.api.common.v1.Payloads: ...
    async def decode(
        self,
        payloads: temporalio.api.common.v1.Payloads,
        type_hints: Sequence[type] = None
    ) -> list[Any]: ...
class PayloadConverter(ABC):
    @abstractmethod
    def to_payloads(self, values: Sequence[Any]) -> Sequence[temporalio.api.common.v1.Payload] | None: ...
    @abstractmethod
    def from_payloads(
        self,
        payloads: Sequence[temporalio.api.common.v1.Payload],
        type_hints: Sequence[type] = None
    ) -> list[Any]: ...
class DefaultPayloadConverter(PayloadConverter):
    def __init__(
        self,
        *,
        encoding_payload_converters: Sequence[EncodingPayloadConverter] = None,
    ): ...
class JSONPlainPayloadConverter(EncodingPayloadConverter):
    def __init__(
        self,
        *,
        encoder: type[json.JSONEncoder] = AdvancedJSONEncoder,
        decoder: Callable[[str], Any] = json.loads,
        encoding: str = "utf-8",
    ): ...
def encode_search_attributes(search_attributes: SearchAttributes) -> temporalio.api.common.v1.SearchAttributes: ...
def decode_search_attributes(proto: temporalio.api.common.v1.SearchAttributes) -> SearchAttributes: ...
```
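A sketch of round-tripping values through the default converter, e.g. to inspect how arguments will be serialized; this follows the `encode`/`decode` stubs above:

```python
import asyncio
from temporalio import converter


async def main() -> None:
    dc = converter.default()
    # Encode Python values into Temporal payloads...
    payloads = await dc.encode(["hello", {"n": 1}])
    # ...and decode them back, optionally guided by type hints
    values = await dc.decode(payloads, type_hints=[str, dict])
    assert values == ["hello", {"n": 1}]


asyncio.run(main())
```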
Comprehensive testing utilities including workflow environments, activity environments, time skipping, and mocking capabilities.

```python
class WorkflowEnvironment:
    @classmethod
    async def start_time_skipping(
        cls,
        *,
        client: Client = None,
        download_dest_dir: Path | str | None = Path("."),
        dev_server_exe: str | None = None,
        dev_server_existing_path: str | None = None,
        dev_server_extra_args: Sequence[str] = [],
        namespace: str = "default",
        data_converter: DataConverter = None,
        interceptors: Sequence[Interceptor] = [],
        tls: TLSConfig | bool = False,
        port: int | None = None,
        ui: bool = True,
        log_level: str = "warn",
        log_format: str = "pretty",
        database_filename: str | None = None,
        sqlite_pragma: Mapping[str, str] = {},
    ) -> WorkflowEnvironment: ...
    async def shutdown(self) -> None: ...
    @property
    def client(self) -> Client: ...
    async def sleep(self, duration: timedelta) -> None: ...
class ActivityEnvironment:
    def __init__(
        self,
        *,
        info: activity.Info,
        on_heartbeat: Callable[..., None] = None,
        on_complete_async: Callable[[], None] = None,
        cancelled: bool = False,
        worker_shutdown: bool = False,
    ): ...
    @contextmanager
    def as_current(self) -> Iterator[None]: ...

async def new_random_task_queue() -> str: ...
```
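A sketch of a time-skipping test for the quickstart workflow; `Worker` is used as an async context manager here (supported by the SDK though not shown in the worker stubs above), and a pytest-asyncio style test function is assumed:

```python
import uuid
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker


async def test_say_hello() -> None:
    env = await WorkflowEnvironment.start_time_skipping()
    try:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[SayHello],      # quickstart workflow
            activities=[say_hello],    # quickstart activity
        ):
            result = await env.client.execute_workflow(
                SayHello.run,
                "World",
                id=f"wf-{uuid.uuid4()}",
                task_queue="test-queue",
            )
            assert result == "Hello, World!"
    finally:
        await env.shutdown()
```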
Core utilities, configuration objects, and types used throughout the SDK, including retry policies, search attributes, metrics, and workflow configuration.

```python
@dataclass
class RetryPolicy:
    initial_interval: timedelta = timedelta(seconds=1)
    backoff_coefficient: float = 2.0
    maximum_interval: Optional[timedelta] = None
    maximum_attempts: int = 0
    non_retryable_error_types: Optional[Sequence[str]] = None

class WorkflowIDReusePolicy(IntEnum):
    ALLOW_DUPLICATE = 0
    ALLOW_DUPLICATE_FAILED_ONLY = 1
    REJECT_DUPLICATE = 2
    TERMINATE_IF_RUNNING = 3

class SearchAttributeKey(ABC, Generic[SearchAttributeValueType]):
    @staticmethod
    def for_text(name: str) -> SearchAttributeKey[str]: ...
    @staticmethod
    def for_keyword(name: str) -> SearchAttributeKey[str]: ...
    @staticmethod
    def for_int(name: str) -> SearchAttributeKey[int]: ...

class MetricMeter(ABC):
    @abstractmethod
    def create_counter(
        self, name: str, description: Optional[str] = None, unit: Optional[str] = None
    ) -> MetricCounter: ...
```
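A sketch of attaching a retry policy and a typed search attribute when starting a workflow; the `CustomerId` attribute is illustrative and must be registered with the server before use:

```python
from datetime import timedelta
from temporalio.client import Client
from temporalio.common import RetryPolicy, SearchAttributeKey

# Typed key; the server must already know this search attribute
customer_id = SearchAttributeKey.for_keyword("CustomerId")


async def start_order(client: Client) -> None:
    await client.start_workflow(
        "ProcessOrder",
        "order-42",
        id="order-42",
        task_queue="my-task-queue",
        retry_policy=RetryPolicy(
            initial_interval=timedelta(seconds=1),
            backoff_coefficient=2.0,
            maximum_attempts=5,
            non_retryable_error_types=["ValidationError"],
        ),
        search_attributes={customer_id: "cust-123"},
    )
```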
Comprehensive exception hierarchy for handling errors in workflows, activities, and client operations, with detailed error information and utility functions.

```python
class TemporalError(Exception):
    @property
    def cause(self) -> Optional[BaseException]: ...

class ApplicationError(FailureError):
    def __init__(
        self,
        message: str,
        *details: Any,
        type: Optional[str] = None,
        non_retryable: bool = False,
        next_retry_delay: Optional[timedelta] = None,
        category: ApplicationErrorCategory = ApplicationErrorCategory.UNSPECIFIED,
    ): ...

class ActivityError(FailureError):
    @property
    def activity_type(self) -> str: ...
    @property
    def retry_state(self) -> Optional[RetryState]: ...

def is_cancelled_exception(exception: BaseException) -> bool: ...
```
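A sketch of raising a typed, non-retryable failure from an activity and handling it in the calling workflow; the `charge_card`/`Checkout` names are illustrative:

```python
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.exceptions import ActivityError, ApplicationError


@activity.defn
async def charge_card(order_id: str) -> None:
    if not order_id:
        # non_retryable=True stops further retry attempts immediately
        raise ApplicationError(
            "missing order id", type="ValidationError", non_retryable=True
        )


@workflow.defn
class Checkout:
    @workflow.run
    async def run(self, order_id: str) -> str:
        try:
            await workflow.execute_activity(
                charge_card,
                order_id,
                schedule_to_close_timeout=timedelta(seconds=30),
            )
        except ActivityError as err:
            # The activity's ApplicationError arrives as the cause
            if isinstance(err.cause, ApplicationError) and err.cause.type == "ValidationError":
                return "rejected"
            raise
        return "charged"
```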
Runtime and telemetry configuration for managing SDK resources, logging, metrics collection, and observability features.

```python
class Runtime:
    @staticmethod
    def default() -> Runtime: ...
    @staticmethod
    def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None: ...
    def __init__(self, *, telemetry: TelemetryConfig) -> None: ...

@dataclass(frozen=True)
class TelemetryConfig:
    logging: Optional[LoggingConfig] = LoggingConfig.default
    metrics: Optional[Union[OpenTelemetryConfig, PrometheusConfig, MetricBuffer]] = None
    global_tags: Mapping[str, str] = field(default_factory=dict)

@dataclass(frozen=True)
class PrometheusConfig:
    bind_address: str
    counters_total_suffix: bool = False
    durations_as_seconds: bool = False
```
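A sketch of exporting SDK metrics to Prometheus by constructing a `Runtime` and passing it to `Client.connect`; the bind address is illustrative:

```python
from temporalio.client import Client
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig


async def connect_with_metrics() -> Client:
    runtime = Runtime(
        telemetry=TelemetryConfig(
            metrics=PrometheusConfig(bind_address="0.0.0.0:9464")
        )
    )
    # Clients and their workers share this runtime and expose /metrics on :9464
    return await Client.connect("localhost:7233", runtime=runtime)
```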
Pydantic v2 data converter for automatic serialization and validation of Pydantic models, with type safety and custom JSON options.

```python
pydantic_data_converter = DataConverter(
    payload_converter_class=PydanticPayloadConverter
)
class PydanticJSONPlainPayloadConverter(EncodingPayloadConverter):
    def __init__(self, to_json_options: Optional[ToJsonOptions] = None): ...
@dataclass
class ToJsonOptions:
    exclude_unset: bool = False
```
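A sketch of passing Pydantic models through workflow arguments by installing the contributed converter on the client; the `Order` model and workflow name are illustrative:

```python
from pydantic import BaseModel
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter


class Order(BaseModel):
    order_id: str
    amount_cents: int


async def start_order() -> None:
    client = await Client.connect(
        "localhost:7233", data_converter=pydantic_data_converter
    )
    await client.start_workflow(
        "ProcessOrder",
        Order(order_id="order-42", amount_cents=1999),  # validated and serialized by Pydantic
        id="order-42",
        task_queue="my-task-queue",
    )
```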
Supporting type definitions referenced in the stubs above:

```python
from typing import TypeVar, Generic, Protocol, Callable, Any, Sequence, Mapping, Iterator
from datetime import datetime, timedelta
from enum import IntEnum, Enum
import concurrent.futures
# Generic type variables
WorkflowReturnType = TypeVar("WorkflowReturnType")
ActivityReturnType = TypeVar("ActivityReturnType")
ActivityParam = TypeVar("ActivityParam")
ChildWorkflowReturnType = TypeVar("ChildWorkflowReturnType")
UpdateReturnType = TypeVar("UpdateReturnType")
SearchAttributeValueType = TypeVar("SearchAttributeValueType")
SelfType = TypeVar("SelfType")
ReturnType = TypeVar("ReturnType")
MultiParamSpec = TypeVar("MultiParamSpec")
# Protocol types for callables
class MethodAsyncNoParam(Protocol[SelfType, ReturnType]):
    async def __call__(self: SelfType) -> ReturnType: ...

class MethodSyncOrAsyncSingleParam(Protocol[SelfType, ReturnType]):
    def __call__(self: SelfType, arg: Any) -> ReturnType: ...

class CallableAsyncSingleParam(Protocol[ActivityParam, ActivityReturnType]):
    async def __call__(self, arg: ActivityParam) -> ActivityReturnType: ...
# Configuration types
class RetryPolicy:
    initial_interval: timedelta
    backoff_coefficient: float
    maximum_interval: timedelta
    maximum_attempts: int
    non_retryable_error_types: Sequence[str]

class TLSConfig:
    server_root_ca_cert: bytes | None
    domain: str | None
    client_cert: bytes | None
    client_private_key: bytes | None
# Enum types
class WorkflowIDReusePolicy(IntEnum):
    ALLOW_DUPLICATE = 0
    ALLOW_DUPLICATE_FAILED_ONLY = 1
    REJECT_DUPLICATE = 2
    TERMINATE_IF_RUNNING = 3

class WorkflowIDConflictPolicy(IntEnum):
    UNSPECIFIED = 0
    FAIL = 1
    USE_EXISTING = 2

class ActivityCancellationType(IntEnum):
    TRY_CANCEL = 0
    WAIT_CANCELLATION_COMPLETED = 1
    ABANDON = 2

class ChildWorkflowCancellationType(IntEnum):
    WAIT_CANCELLATION_COMPLETED = 0
    REQUEST_CANCEL = 1
    ABANDON = 2

class ParentClosePolicy(IntEnum):
    TERMINATE = 0
    ABANDON = 1
    REQUEST_CANCEL = 2

class VersioningIntent(Enum):
    UNSPECIFIED = "Unspecified"
    COMPATIBLE = "Compatible"
    DEFAULT = "Default"

class WorkflowUpdateStage(IntEnum):
    ADMITTED = 0
    ACCEPTED = 1
    COMPLETED = 2
# Search attributes
SearchAttributes = Mapping[SearchAttributeKey[Any], Any]
# SearchAttributeKey itself is the class defined in temporalio.common (see above)

# Common aliases
Executor = concurrent.futures.Executor
Interceptor = Any  # base type for interceptor implementations
```