tessl/pypi-temporalio

Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/temporalio@1.17.x (PyPI)

To install, run

npx @tessl/cli install tessl/pypi-temporalio@1.17.0

Temporalio

A Python SDK for building distributed, scalable, durable, and highly available workflows and activities on the Temporal orchestration engine. Workflows are written as async Python functions and run on a custom, fault-tolerant asyncio event loop, so familiar asyncio constructs keep working while execution stays durable across process and network failures.

Package Information

  • Package Name: temporalio
  • Language: Python
  • Installation: pip install temporalio

Core Imports

from temporalio.client import Client
from temporalio.worker import Worker
from temporalio import workflow, activity

For advanced usage:

import temporalio
from temporalio import common, converter, exceptions, runtime, testing
from temporalio.common import RetryPolicy, SearchAttributeKey, MetricMeter
from temporalio.exceptions import ApplicationError, TemporalError, is_cancelled_exception
from temporalio.runtime import Runtime, TelemetryConfig

For integrations:

# Pydantic integration
from temporalio.contrib.pydantic import pydantic_data_converter

Basic Usage

from datetime import timedelta
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker
import asyncio
import concurrent.futures

# Define an activity
@activity.defn
async def say_hello(name: str) -> str:
    return f"Hello, {name}!"

# Define a workflow
@workflow.defn
class SayHello:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello,
            name,
            schedule_to_close_timeout=timedelta(seconds=5)
        )

# Create and run worker
async def run_worker():
    client = await Client.connect("localhost:7233")

    # The thread pool is only used for synchronous (non-async) activities.
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
        worker = Worker(
            client,
            task_queue="my-task-queue",
            workflows=[SayHello],
            activities=[say_hello],
            activity_executor=executor,
        )
        await worker.run()

# Execute workflow
async def run_workflow():
    client = await Client.connect("localhost:7233")

    result = await client.execute_workflow(
        SayHello.run,
        "World",
        id="my-workflow-id",
        task_queue="my-task-queue"
    )
    print(f"Result: {result}")

if __name__ == "__main__":
    # Starts the worker, which blocks until it is shut down.
    # Call run_workflow() from a separate process to start a workflow.
    asyncio.run(run_worker())

Architecture

Temporal's architecture centers around several key concepts:

  • Workflows: Long-running, fault-tolerant business logic that coordinates activities and maintains state
  • Activities: Individual units of work that can fail and be retried, typically I/O operations or external service calls
  • Workers: Processes that execute workflows and activities by polling task queues
  • Client: Interface for starting workflows, sending signals, and querying workflow state
  • Task Queues: Named queues that route work between clients and workers

The Python SDK implements workflows using a custom asyncio event loop that provides distributed fault tolerance, deterministic execution, and seamless integration with Python's async/await patterns.

Capabilities

Client Operations

Connect to a Temporal server, start workflows, manage executions, and handle schedules. The client is the primary interface for application code that talks to the server.

class Client:
    @classmethod
    async def connect(
        cls,
        target_host: str = "localhost:7233",
        *,
        namespace: str = "default",
        data_converter: DataConverter = None,
        interceptors: Sequence[Interceptor] = [],
        tls: TLSConfig | bool = False,
        retry_config: RetryConfig = None,
        keep_alive_config: KeepAliveConfig = None,
        rpc_metadata: Mapping[str, str] = {},
        identity: str = None,
        lazy: bool = False,
        runtime: Runtime = None,
        http_connect_proxy_config: HttpConnectProxyConfig = None,
    ) -> Client: ...

    async def execute_workflow(
        self,
        workflow: MethodAsyncNoParam[SelfType, WorkflowReturnType] | str,
        arg: Any = temporalio.common._arg_unset,
        *,
        id: str,
        task_queue: str,
        execution_timeout: timedelta = None,
        run_timeout: timedelta = None,
        task_timeout: timedelta = None,
        id_reuse_policy: WorkflowIDReusePolicy = WorkflowIDReusePolicy.ALLOW_DUPLICATE,
        id_conflict_policy: WorkflowIDConflictPolicy = WorkflowIDConflictPolicy.UNSPECIFIED,
        retry_policy: RetryPolicy = None,
        cron_schedule: str = None,
        memo: Mapping[str, Any] = None,
        search_attributes: SearchAttributes = None,
        start_signal: str = None,
        start_signal_args: Sequence[Any] = [],
        request_eager_start: bool = False,
    ) -> WorkflowReturnType: ...

    async def start_workflow(
        self,
        workflow: MethodAsyncNoParam[SelfType, WorkflowReturnType] | str,
        arg: Any = temporalio.common._arg_unset,
        *,
        id: str,
        task_queue: str,
        execution_timeout: timedelta = None,
        run_timeout: timedelta = None,
        task_timeout: timedelta = None,
        id_reuse_policy: WorkflowIDReusePolicy = WorkflowIDReusePolicy.ALLOW_DUPLICATE,
        id_conflict_policy: WorkflowIDConflictPolicy = WorkflowIDConflictPolicy.UNSPECIFIED,
        retry_policy: RetryPolicy = None,
        cron_schedule: str = None,
        memo: Mapping[str, Any] = None,
        search_attributes: SearchAttributes = None,
        start_signal: str = None,
        start_signal_args: Sequence[Any] = [],
        request_eager_start: bool = False,
    ) -> WorkflowHandle[SelfType, WorkflowReturnType]: ...

class WorkflowHandle(Generic[SelfType, ReturnType]):
    @property
    def id(self) -> str: ...

    @property
    def result_run_id(self) -> str | None: ...

    async def result(self) -> ReturnType: ...

    async def cancel(self) -> None: ...

    async def terminate(self, reason: str = None) -> None: ...

    async def signal(
        self,
        signal: MethodSyncOrAsyncSingleParam[SelfType, Any] | str,
        arg: Any = temporalio.common._arg_unset,
    ) -> None: ...

    async def query(
        self,
        query: MethodSyncOrAsyncNoParam[SelfType, ReturnType] | str,
        arg: Any = temporalio.common._arg_unset,
    ) -> ReturnType: ...

    async def start_update(
        self,
        update: MethodSyncOrAsyncSingleParam[SelfType, UpdateReturnType] | str,
        arg: Any = temporalio.common._arg_unset,
        *,
        id: str = None,
        wait_for_stage: WorkflowUpdateStage,  # required keyword argument
    ) -> WorkflowUpdateHandle[UpdateReturnType]: ...

Workflow Development

Define workflows with signals, queries, updates, and child workflow management. Workflows contain the core business logic and coordinate activities and other workflows.

def defn(
    cls: Type[MultiParamSpec],
    *,
    name: str = None,
    dynamic: bool = False,
    failure_exception_types: Sequence[type[BaseException]] = [],
    versioning_intent: VersioningIntent = VersioningIntent.COMPATIBLE,
) -> Type[MultiParamSpec]: ...

def run(fn: MethodAsyncNoParam[SelfType, ReturnType]) -> MethodAsyncNoParam[SelfType, ReturnType]: ...

def signal(fn: MethodSyncOrAsyncSingleParam[SelfType, None] | None = None, *, name: str = None, dynamic: bool = False) -> Any: ...

def query(fn: MethodSyncOrAsyncNoParam[SelfType, ReturnType] | None = None, *, name: str = None, dynamic: bool = False) -> Any: ...

def update(fn: MethodSyncOrAsyncSingleParam[SelfType, ReturnType] | None = None, *, name: str = None, dynamic: bool = False) -> Any: ...

async def execute_activity(
    activity: MethodSyncOrAsyncSingleParam[Any, ActivityReturnType] | str,
    arg: Any = temporalio.common._arg_unset,
    *,
    activity_id: str = None,
    task_queue: str = None,
    schedule_to_close_timeout: timedelta = None,
    schedule_to_start_timeout: timedelta = None,
    start_to_close_timeout: timedelta = None,
    heartbeat_timeout: timedelta = None,
    retry_policy: RetryPolicy = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: VersioningIntent = VersioningIntent.UNSPECIFIED,
) -> ActivityReturnType: ...

async def execute_child_workflow(
    workflow: MethodAsyncNoParam[Any, ChildWorkflowReturnType] | str,
    arg: Any = temporalio.common._arg_unset,
    *,
    id: str = None,
    task_queue: str = None,
    execution_timeout: timedelta = None,
    run_timeout: timedelta = None,
    task_timeout: timedelta = None,
    id_reuse_policy: WorkflowIDReusePolicy = WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    retry_policy: RetryPolicy = None,
    cron_schedule: str = None,
    parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
    cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
    versioning_intent: VersioningIntent = VersioningIntent.COMPATIBLE,
) -> ChildWorkflowReturnType: ...

def info() -> Info: ...

async def sleep(duration: float | timedelta) -> None: ...

def now() -> datetime: ...

def uuid4() -> uuid.UUID: ...

class Info:
    attempt: int
    continued_run_id: str | None
    cron_schedule: str | None
    execution_timeout: timedelta | None
    namespace: str
    parent: ParentInfo | None
    retry_policy: RetryPolicy | None
    root: RootInfo | None
    run_id: str
    run_timeout: timedelta | None
    search_attributes: SearchAttributes
    start_time: datetime
    task_queue: str
    task_timeout: timedelta | None
    workflow_id: str
    workflow_type: str

Activity Development

Create activities with context access, heartbeating, cancellation handling, and different execution models (async, threaded, multiprocess).

def defn(
    fn: CallableAsyncSingleParam[ActivityParam, ActivityReturnType] | None = None,
    *,
    name: str = None,
    dynamic: bool = False,
) -> Any: ...

def info() -> Info: ...

def in_activity() -> bool: ...

def heartbeat(*details: Any) -> None: ...

def is_cancelled() -> bool: ...

def is_worker_shutdown() -> bool: ...

def cancellation_details() -> ActivityCancellationDetails | None: ...

async def wait_for_cancelled() -> None: ...

def wait_for_cancelled_sync(timeout: timedelta = None) -> bool: ...

def wait_for_worker_shutdown_sync(timeout: timedelta = None) -> bool: ...

def raise_complete_async() -> NoReturn: ...

class Info:
    activity_id: str
    activity_type: str
    attempt: int
    current_attempt_scheduled_time: datetime
    heartbeat_details: Sequence[Any]
    heartbeat_timeout: timedelta | None
    is_local: bool
    local_retry_threshold: timedelta | None
    schedule_to_close_timeout: timedelta | None
    schedule_to_start_timeout: timedelta | None
    scheduled_time: datetime
    start_to_close_timeout: timedelta | None
    started_time: datetime
    task_queue: str
    task_token: bytes
    workflow_id: str
    workflow_namespace: str
    workflow_run_id: str
    workflow_type: str

class ActivityCancellationDetails:
    not_found: bool = False
    cancel_requested: bool = False
    paused: bool = False
    timed_out: bool = False
    worker_shutdown: bool = False

Worker Management

Configure and run workers that execute workflows and activities, including thread pool management, task queue polling, and interceptor configuration.

class Worker:
    def __init__(
        self,
        client: Client,
        *,
        task_queue: str,
        activities: Sequence[Callable] = [],
        workflows: Sequence[type] = [],
        interceptors: Sequence[Interceptor] = [],
        activity_executor: Executor = None,
        workflow_task_executor: Executor = None,
        workflow_runner: WorkflowRunner = None,
        shared_state_manager: SharedStateManager = None,
        debug_mode: bool = False,
        on_fatal_error: Callable[[Exception], None] = None,
        max_cached_workflows: int = 1000,
        max_concurrent_workflow_task_polls: int = 100,
        nonsticky_to_sticky_poll_ratio: float = 0.2,
        max_concurrent_activity_task_polls: int = 100,
        no_remote_activities: bool = False,
        sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10),
        max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60),
        default_heartbeat_throttle_interval: timedelta = timedelta(seconds=30),
        max_activities_per_second: float = None,
        max_task_queue_activities_per_second: float = None,
        graceful_shutdown_timeout: timedelta = timedelta(0),
        use_worker_versioning: bool = False,
        build_id: str = None,
        identity: str = None,
        tuner: WorkerTuner = None,
        workflow_failure_exception_types: Sequence[type[BaseException]] = [],
    ): ...

    async def run(self) -> None: ...

    async def shutdown(self) -> None: ...

class WorkerConfig:
    client: Client
    task_queue: str
    activities: Sequence[Callable]
    workflows: Sequence[type]
    interceptors: Sequence[Interceptor]
    activity_executor: Executor | None
    workflow_task_executor: Executor | None
    debug_mode: bool
    max_cached_workflows: int
    max_concurrent_workflow_task_polls: int
    max_concurrent_activity_task_polls: int
    graceful_shutdown_timeout: timedelta

Data Conversion

Handle serialization and deserialization of workflow and activity data, including custom payload converters, failure converters, and search attribute encoding.

def default() -> DataConverter: ...

class DataConverter:
    def __init__(
        self,
        *,
        payload_converter_class: type[PayloadConverter] = DefaultPayloadConverter,
        failure_converter_class: type[FailureConverter] = DefaultFailureConverter,
        payload_codec: PayloadCodec = None,
    ): ...

    async def encode(
        self,
        values: Sequence[Any]
    ) -> list[temporalio.api.common.v1.Payload]: ...

    async def decode(
        self,
        payloads: Sequence[temporalio.api.common.v1.Payload],
        type_hints: list[type] | None = None
    ) -> list[Any]: ...

class PayloadConverter(ABC):
    @abstractmethod
    def to_payloads(self, values: Sequence[Any]) -> Sequence[temporalio.api.common.v1.Payload] | None: ...

    @abstractmethod
    def from_payloads(
        self,
        payloads: Sequence[temporalio.api.common.v1.Payload],
        type_hints: Sequence[type] = None
    ) -> list[Any]: ...

class DefaultPayloadConverter(PayloadConverter):
    def __init__(
        self,
        *,
        encoding_payload_converters: Sequence[EncodingPayloadConverter] = None,
    ): ...

class JSONPlainPayloadConverter(EncodingPayloadConverter):
    def __init__(
        self,
        *,
        encoder: type[json.JSONEncoder] = AdvancedJSONEncoder,
        decoder: Callable[[str], Any] = json.loads,
        encoding: str = "utf-8",
    ): ...

def encode_search_attributes(search_attributes: SearchAttributes) -> temporalio.api.common.v1.SearchAttributes: ...

def decode_search_attributes(proto: temporalio.api.common.v1.SearchAttributes) -> SearchAttributes: ...

Testing

Comprehensive testing utilities including workflow environments, activity environments, time skipping, and mocking capabilities.

class WorkflowEnvironment:
    @classmethod
    async def start_time_skipping(
        cls,
        *,
        client: Client = None,
        download_dest_dir: Path | str | None = Path("."),
        dev_server_exe: str | None = None,
        dev_server_existing_path: str | None = None,
        dev_server_extra_args: Sequence[str] = [],
        namespace: str = "default",
        data_converter: DataConverter = None,
        interceptors: Sequence[Interceptor] = [],
        tls: TLSConfig | bool = False,
        port: int | None = None,
        ui: bool = True,
        log_level: str = "warn",
        log_format: str = "pretty",
        database_filename: str | None = None,
        sqlite_pragma: Mapping[str, str] = {},
    ) -> WorkflowEnvironment: ...

    async def shutdown(self) -> None: ...

    @property
    def client(self) -> Client: ...

    async def sleep(self, duration: timedelta) -> None: ...

class ActivityEnvironment:
    def __init__(
        self,
        *,
        info: activity.Info,
        on_heartbeat: Callable[..., None] = None,
        on_complete_async: Callable[[], None] = None,
        cancelled: bool = False,
        worker_shutdown: bool = False,
    ): ...

    @contextmanager
    def as_current(self) -> Iterator[None]: ...

async def new_random_task_queue() -> str: ...

Common Utilities

Core utilities, configuration objects, and types used throughout the SDK including retry policies, search attributes, metrics, and workflow configuration.

@dataclass
class RetryPolicy:
    initial_interval: timedelta = timedelta(seconds=1)
    backoff_coefficient: float = 2.0
    maximum_interval: Optional[timedelta] = None
    maximum_attempts: int = 0
    non_retryable_error_types: Optional[Sequence[str]] = None

class WorkflowIDReusePolicy(IntEnum):
    ALLOW_DUPLICATE = 1
    ALLOW_DUPLICATE_FAILED_ONLY = 2
    REJECT_DUPLICATE = 3
    TERMINATE_IF_RUNNING = 4

class SearchAttributeKey(ABC, Generic[SearchAttributeValueType]):
    @staticmethod
    def for_text(name: str) -> SearchAttributeKey[str]: ...

    @staticmethod
    def for_keyword(name: str) -> SearchAttributeKey[str]: ...

    @staticmethod
    def for_int(name: str) -> SearchAttributeKey[int]: ...

class MetricMeter(ABC):
    @abstractmethod
    def create_counter(
        self, name: str, description: Optional[str] = None, unit: Optional[str] = None
    ) -> MetricCounter: ...
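
The arithmetic behind the `RetryPolicy` fields can be sketched with the standard library alone. This is an illustration of exponential backoff, not SDK code: the helper `retry_delays` is hypothetical, and the fallback cap of 100 times the initial interval when `maximum_interval` is unset is an assumption based on Temporal's documented server default.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
    retries: int = 5,
) -> List[timedelta]:
    # Assumed default cap: 100x the initial interval when none is given.
    cap = maximum_interval if maximum_interval is not None else initial_interval * 100
    # Delay before retry n is initial * coefficient**n, limited by the cap.
    return [
        min(initial_interval * (backoff_coefficient ** n), cap)
        for n in range(retries)
    ]
```

With the defaults this yields waits of 1, 2, 4, 8, and 16 seconds; with `maximum_interval=timedelta(seconds=5)` the later waits level off at 5 seconds.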

Exception Handling

Comprehensive exception hierarchy for handling errors in workflows, activities, and client operations with detailed error information and utility functions.

class TemporalError(Exception):
    @property
    def cause(self) -> Optional[BaseException]: ...

class ApplicationError(FailureError):
    def __init__(
        self,
        message: str,
        *details: Any,
        type: Optional[str] = None,
        non_retryable: bool = False,
        next_retry_delay: Optional[timedelta] = None,
        category: ApplicationErrorCategory = ApplicationErrorCategory.UNSPECIFIED,
    ): ...

class ActivityError(FailureError):
    @property
    def activity_type(self) -> str: ...

    @property
    def retry_state(self) -> Optional[RetryState]: ...

def is_cancelled_exception(exception: BaseException) -> bool: ...

Runtime Configuration

Runtime and telemetry configuration for managing SDK resources, logging, metrics collection, and observability features.

class Runtime:
    @staticmethod
    def default() -> Runtime: ...

    @staticmethod
    def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None: ...

    def __init__(self, *, telemetry: TelemetryConfig) -> None: ...

@dataclass(frozen=True)
class TelemetryConfig:
    logging: Optional[LoggingConfig] = LoggingConfig.default
    metrics: Optional[Union[OpenTelemetryConfig, PrometheusConfig, MetricBuffer]] = None
    global_tags: Mapping[str, str] = field(default_factory=dict)

@dataclass(frozen=True)
class PrometheusConfig:
    bind_address: str
    counters_total_suffix: bool = False
    durations_as_seconds: bool = False

Pydantic Integration

Pydantic v2 data converter for automatic serialization and validation of Pydantic models with type safety and custom JSON options.

pydantic_data_converter = DataConverter(
    payload_converter_class=PydanticPayloadConverter
)

class PydanticJSONPlainPayloadConverter(EncodingPayloadConverter):
    def __init__(self, to_json_options: Optional[ToJsonOptions] = None): ...

@dataclass
class ToJsonOptions:
    exclude_unset: bool = False

Types

import concurrent.futures
import uuid
from typing import TypeVar, Generic, Protocol, Callable, Any, Sequence, Mapping, Iterator
from datetime import datetime, timedelta
from enum import IntEnum, Enum

# Generic type variables
WorkflowReturnType = TypeVar("WorkflowReturnType")
ChildWorkflowReturnType = TypeVar("ChildWorkflowReturnType")
UpdateReturnType = TypeVar("UpdateReturnType")
ActivityParam = TypeVar("ActivityParam")
ActivityReturnType = TypeVar("ActivityReturnType")
SelfType = TypeVar("SelfType")
ReturnType = TypeVar("ReturnType")
MultiParamSpec = TypeVar("MultiParamSpec")

# Protocol types for callables
class MethodAsyncNoParam(Protocol[SelfType, ReturnType]):
    async def __call__(self: SelfType) -> ReturnType: ...

class MethodSyncOrAsyncSingleParam(Protocol[SelfType, ReturnType]):
    def __call__(self: SelfType, arg: Any) -> ReturnType: ...

class CallableAsyncSingleParam(Protocol[ActivityParam, ActivityReturnType]):
    async def __call__(self, arg: ActivityParam) -> ActivityReturnType: ...

# Configuration types
class RetryPolicy:
    initial_interval: timedelta
    backoff_coefficient: float
    maximum_interval: timedelta | None
    maximum_attempts: int
    non_retryable_error_types: Sequence[str] | None

class TLSConfig:
    server_root_ca_cert: bytes | None
    domain: str | None
    client_cert: bytes | None
    client_private_key: bytes | None

# Enum types
class WorkflowIDReusePolicy(IntEnum):
    ALLOW_DUPLICATE = 1
    ALLOW_DUPLICATE_FAILED_ONLY = 2
    REJECT_DUPLICATE = 3
    TERMINATE_IF_RUNNING = 4

class WorkflowIDConflictPolicy(IntEnum):
    UNSPECIFIED = 0
    FAIL = 1
    USE_EXISTING = 2

class ActivityCancellationType(IntEnum):
    TRY_CANCEL = 0
    WAIT_CANCELLATION_COMPLETED = 1
    ABANDON = 2

class ChildWorkflowCancellationType(IntEnum):
    ABANDON = 0
    TRY_CANCEL = 1
    WAIT_CANCELLATION_COMPLETED = 2
    WAIT_CANCELLATION_REQUESTED = 3

class ParentClosePolicy(IntEnum):
    TERMINATE = 1
    ABANDON = 2
    REQUEST_CANCEL = 3

class VersioningIntent(Enum):
    UNSPECIFIED = "Unspecified"
    COMPATIBLE = "Compatible"
    DEFAULT = "Default"

class WorkflowUpdateStage(IntEnum):
    ADMITTED = 1
    ACCEPTED = 2
    COMPLETED = 3

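# Search attributes (untyped mapping form; keys are attribute names)
SearchAttributeValues = Sequence[str] | Sequence[int] | Sequence[float] | Sequence[bool] | Sequence[datetime]
SearchAttributes = Mapping[str, SearchAttributeValues]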

# Common aliases
Executor = concurrent.futures.Executor
Interceptor = Any  # Base type for interceptor implementations