CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-temporalio

Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

Temporalio

A comprehensive Python SDK for building distributed, scalable, durable, and highly available workflows and activities using the Temporal orchestration engine. The SDK transforms async Python functions into distributed workflows backed by a fault-tolerant event loop, providing seamless integration with asyncio concepts while maintaining durability and reliability in distributed environments.

Package Information

  • Package Name: temporalio
  • Language: Python
  • Installation: pip install temporalio

Core Imports

from temporalio.client import Client
from temporalio.worker import Worker
from temporalio import workflow, activity

For advanced usage:

import temporalio
from temporalio import common, converter, exceptions, runtime, testing
from temporalio.common import RetryPolicy, SearchAttributeKey, MetricMeter
from temporalio.exceptions import ApplicationError, TemporalError, is_cancelled_exception
from temporalio.runtime import Runtime, TelemetryConfig

For integrations:

# Pydantic integration
from temporalio.contrib.pydantic import pydantic_data_converter

Basic Usage

from datetime import timedelta
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker
import asyncio
import concurrent.futures

# Define an activity
@activity.defn
async def say_hello(name: str) -> str:
    return f"Hello, {name}!"

# Define a workflow
@workflow.defn
class SayHello:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello,
            name,
            schedule_to_close_timeout=timedelta(seconds=5)
        )

# Create and run worker
async def run_worker():
    client = await Client.connect("localhost:7233")

    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
        worker = Worker(
            client,
            task_queue="my-task-queue",
            workflows=[SayHello],
            activities=[say_hello],
            activity_executor=executor,
        )
        await worker.run()

# Execute workflow
async def run_workflow():
    client = await Client.connect("localhost:7233")

    result = await client.execute_workflow(
        SayHello.run,
        "World",
        id="my-workflow-id",
        task_queue="my-task-queue"
    )
    print(f"Result: {result}")

if __name__ == "__main__":
    # Run worker in production or run_workflow() for testing
    asyncio.run(run_worker())

Architecture

Temporal's architecture centers around several key concepts:

  • Workflows: Long-running, fault-tolerant business logic that coordinates activities and maintains state
  • Activities: Individual units of work that can fail and be retried, typically I/O operations or external service calls
  • Workers: Processes that execute workflows and activities by polling task queues
  • Client: Interface for starting workflows, sending signals, and querying workflow state
  • Task Queues: Named queues that route work between clients and workers

The Python SDK implements workflows using a custom asyncio event loop that provides distributed fault tolerance, deterministic execution, and seamless integration with Python's async/await patterns.

Capabilities

Client Operations

Connect to Temporal server, start workflows, manage executions, and handle schedules. The client provides the primary interface for interacting with the Temporal server.

class Client:
    @classmethod
    async def connect(
        cls,
        target_host: str = "localhost:7233",
        *,
        namespace: str = "default",
        data_converter: DataConverter = None,
        interceptors: Sequence[Interceptor] = [],
        tls: TLSConfig | bool = False,
        retry_config: RetryConfig = None,
        keep_alive_config: KeepAliveConfig = None,
        rpc_metadata: Mapping[str, str] = {},
        identity: str = None,
        lazy: bool = False,
        runtime: Runtime = None,
        http_connect_proxy_config: HttpConnectProxyConfig = None,
    ) -> Client: ...

    async def execute_workflow(
        self,
        workflow: MethodAsyncNoParam[WorkflowReturnType] | str,
        arg: Any = temporalio.common._arg_unset,
        *,
        id: str,
        task_queue: str,
        execution_timeout: timedelta = None,
        run_timeout: timedelta = None,
        task_timeout: timedelta = None,
        id_reuse_policy: WorkflowIDReusePolicy = WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY,
        id_conflict_policy: WorkflowIDConflictPolicy = WorkflowIDConflictPolicy.UNSPECIFIED,
        retry_policy: RetryPolicy = None,
        cron_schedule: str = None,
        memo: Mapping[str, Any] = None,
        search_attributes: SearchAttributes = None,
        start_signal: str = None,
        start_signal_args: Sequence[Any] = [],
        request_eager_start: bool = True,
    ) -> WorkflowReturnType: ...

    async def start_workflow(
        self,
        workflow: MethodAsyncNoParam[WorkflowReturnType] | str,
        arg: Any = temporalio.common._arg_unset,
        *,
        id: str,
        task_queue: str,
        execution_timeout: timedelta = None,
        run_timeout: timedelta = None,
        task_timeout: timedelta = None,
        id_reuse_policy: WorkflowIDReusePolicy = WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY,
        id_conflict_policy: WorkflowIDConflictPolicy = WorkflowIDConflictPolicy.UNSPECIFIED,
        retry_policy: RetryPolicy = None,
        cron_schedule: str = None,
        memo: Mapping[str, Any] = None,
        search_attributes: SearchAttributes = None,
        start_signal: str = None,
        start_signal_args: Sequence[Any] = [],
        request_eager_start: bool = True,
    ) -> WorkflowHandle[WorkflowReturnType, WorkflowReturnType]: ...

class WorkflowHandle(Generic[SelfType, ReturnType]):
    @property
    def id(self) -> str: ...

    @property
    def result_run_id(self) -> str | None: ...

    async def result(self) -> ReturnType: ...

    async def cancel(self) -> None: ...

    async def terminate(self, reason: str = None) -> None: ...

    async def signal(
        self,
        signal: MethodSyncOrAsyncSingleParam[SelfType, Any] | str,
        arg: Any = temporalio.common._arg_unset,
    ) -> None: ...

    async def query(
        self,
        query: MethodSyncOrAsyncNoParam[SelfType, ReturnType] | str,
        arg: Any = temporalio.common._arg_unset,
    ) -> ReturnType: ...

    async def start_update(
        self,
        update: MethodSyncOrAsyncSingleParam[SelfType, UpdateReturnType] | str,
        arg: Any = temporalio.common._arg_unset,
        *,
        id: str = None,
        wait_for_stage: WorkflowUpdateStage = WorkflowUpdateStage.COMPLETED,
    ) -> WorkflowUpdateHandle[UpdateReturnType]: ...

Client Operations

Workflow Development

Define workflows with signals, queries, updates, and child workflow management. Workflows contain the core business logic and coordinate activities and other workflows.

def defn(
    cls: Type[MultiParamSpec],
    *,
    name: str = None,
    dynamic: bool = False,
    failure_exception_types: Sequence[type[BaseException]] = [],
    versioning_intent: VersioningIntent = VersioningIntent.COMPATIBLE,
) -> Type[MultiParamSpec]: ...

def run(fn: MethodAsyncNoParam[SelfType, ReturnType]) -> MethodAsyncNoParam[SelfType, ReturnType]: ...

def signal(fn: MethodSyncOrAsyncSingleParam[SelfType, None] | None = None, *, name: str = None, dynamic: bool = False) -> Any: ...

def query(fn: MethodSyncOrAsyncNoParam[SelfType, ReturnType] | None = None, *, name: str = None, dynamic: bool = False) -> Any: ...

def update(fn: MethodSyncOrAsyncSingleParam[SelfType, ReturnType] | None = None, *, name: str = None, dynamic: bool = False) -> Any: ...

async def execute_activity(
    activity: MethodSyncOrAsyncSingleParam[Any, ActivityReturnType] | str,
    arg: Any = temporalio.common._arg_unset,
    *,
    activity_id: str = None,
    task_queue: str = None,
    schedule_to_close_timeout: timedelta = None,
    schedule_to_start_timeout: timedelta = None,
    start_to_close_timeout: timedelta = None,
    heartbeat_timeout: timedelta = None,
    retry_policy: RetryPolicy = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: VersioningIntent = VersioningIntent.UNSPECIFIED,
) -> ActivityReturnType: ...

async def execute_child_workflow(
    workflow: MethodAsyncNoParam[Any, ChildWorkflowReturnType] | str,
    arg: Any = temporalio.common._arg_unset,
    *,
    id: str = None,
    task_queue: str = None,
    execution_timeout: timedelta = None,
    run_timeout: timedelta = None,
    task_timeout: timedelta = None,
    id_reuse_policy: WorkflowIDReusePolicy = WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY,
    retry_policy: RetryPolicy = None,
    cron_schedule: str = None,
    parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
    cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
    versioning_intent: VersioningIntent = VersioningIntent.COMPATIBLE,
) -> ChildWorkflowReturnType: ...

def info() -> Info: ...

async def sleep(seconds: float) -> None: ...

def now() -> datetime: ...

def uuid4() -> str: ...

class Info:
    attempt: int
    continued_run_id: str | None
    cron_schedule: str | None
    execution_timeout: timedelta | None
    namespace: str
    parent: ParentInfo | None
    retry_policy: RetryPolicy | None
    root: RootInfo | None
    run_id: str
    run_timeout: timedelta | None
    search_attributes: SearchAttributes
    start_time: datetime
    task_queue: str
    task_timeout: timedelta | None
    workflow_id: str
    workflow_type: str

Workflow Development

Activity Development

Create activities with context access, heartbeating, cancellation handling, and different execution models (async, threaded, multiprocess).

def defn(
    fn: CallableAsyncSingleParam[ActivityParam, ActivityReturnType] | None = None,
    *,
    name: str = None,
    dynamic: bool = False,
) -> Any: ...

def info() -> Info: ...

def in_activity() -> bool: ...

async def heartbeat(*details: Any) -> None: ...

def is_cancelled() -> bool: ...

def is_worker_shutdown() -> bool: ...

def cancellation_details() -> ActivityCancellationDetails | None: ...

async def wait_for_cancelled() -> None: ...

def wait_for_cancelled_sync(timeout: timedelta = None) -> bool: ...

def wait_for_worker_shutdown_sync(timeout: timedelta = None) -> bool: ...

def raise_complete_async() -> NoReturn: ...

class Info:
    activity_id: str
    activity_type: str
    attempt: int
    current_attempt_scheduled_time: datetime
    heartbeat_details: Sequence[Any]
    heartbeat_timeout: timedelta | None
    is_local: bool
    local_retry_threshold: timedelta | None
    schedule_to_close_timeout: timedelta | None
    schedule_to_start_timeout: timedelta | None
    scheduled_time: datetime
    start_to_close_timeout: timedelta | None
    started_time: datetime
    task_queue: str
    task_token: bytes
    workflow_id: str
    workflow_namespace: str
    workflow_run_id: str
    workflow_type: str

class ActivityCancellationDetails:
    reason: str
    details: Sequence[Any]

Activity Development

Worker Management

Configure and run workers that execute workflows and activities, including thread pool management, task queue polling, and interceptor configuration.

class Worker:
    def __init__(
        self,
        client: Client,
        *,
        task_queue: str,
        activities: Sequence[Callable] = [],
        workflows: Sequence[type] = [],
        interceptors: Sequence[Interceptor] = [],
        activity_executor: Executor = None,
        workflow_task_executor: Executor = None,
        workflow_runner: WorkflowRunner = None,
        shared_state_manager: SharedStateManager = None,
        debug_mode: bool = False,
        on_fatal_error: Callable[[Exception], None] = None,
        max_cached_workflows: int = 1000,
        max_concurrent_workflow_task_polls: int = 100,
        nonsticky_to_sticky_poll_ratio: float = 0.2,
        max_concurrent_activity_task_polls: int = 100,
        no_remote_activities: bool = False,
        sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10),
        max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60),
        default_heartbeat_throttle_interval: timedelta = timedelta(seconds=1),
        max_activities_per_second: float = None,
        max_task_queue_activities_per_second: float = None,
        graceful_shutdown_timeout: timedelta = timedelta(0),
        use_worker_versioning: bool = True,
        build_id: str = None,
        identity: str = None,
        tuner: WorkerTuner = None,
        workflow_failure_exception_types: Sequence[type[BaseException]] = [],
    ): ...

    async def run(self) -> None: ...

    async def shutdown(self) -> None: ...

class WorkerConfig:
    client: Client
    task_queue: str
    activities: Sequence[Callable]
    workflows: Sequence[type]
    interceptors: Sequence[Interceptor]
    activity_executor: Executor | None
    workflow_task_executor: Executor | None
    debug_mode: bool
    max_cached_workflows: int
    max_concurrent_workflow_task_polls: int
    max_concurrent_activity_task_polls: int
    graceful_shutdown_timeout: timedelta

Worker Management

Data Conversion

Handle serialization and deserialization of workflow and activity data, including custom payload converters, failure converters, and search attribute encoding.

def default() -> DataConverter: ...

class DataConverter:
    def __init__(
        self,
        *,
        payload_converter: PayloadConverter = None,
        failure_converter: FailureConverter = None,
        payload_codec: PayloadCodec = None,
    ): ...

    async def encode(
        self,
        values: Sequence[Any]
    ) -> temporalio.api.common.v1.Payloads: ...

    async def decode(
        self,
        payloads: temporalio.api.common.v1.Payloads,
        type_hints: Sequence[type] = None
    ) -> list[Any]: ...

class PayloadConverter(ABC):
    @abstractmethod
    def to_payloads(self, values: Sequence[Any]) -> Sequence[temporalio.api.common.v1.Payload] | None: ...

    @abstractmethod
    def from_payloads(
        self,
        payloads: Sequence[temporalio.api.common.v1.Payload],
        type_hints: Sequence[type] = None
    ) -> list[Any]: ...

class DefaultPayloadConverter(PayloadConverter):
    def __init__(
        self,
        *,
        encoding_payload_converters: Sequence[EncodingPayloadConverter] = None,
    ): ...

class JSONPlainPayloadConverter(EncodingPayloadConverter):
    def __init__(
        self,
        *,
        encoder: type[json.JSONEncoder] = AdvancedJSONEncoder,
        decoder: Callable[[str], Any] = json.loads,
        encoding: str = "utf-8",
    ): ...

def encode_search_attributes(search_attributes: SearchAttributes) -> temporalio.api.common.v1.SearchAttributes: ...

def decode_search_attributes(proto: temporalio.api.common.v1.SearchAttributes) -> SearchAttributes: ...

Data Conversion

Testing

Comprehensive testing utilities including workflow environments, activity environments, time skipping, and mocking capabilities.

class WorkflowEnvironment:
    @classmethod
    async def start_time_skipping(
        cls,
        *,
        client: Client = None,
        download_dest_dir: Path | str | None = Path("."),
        dev_server_exe: str | None = None,
        dev_server_existing_path: str | None = None,
        dev_server_extra_args: Sequence[str] = [],
        namespace: str = "default",
        data_converter: DataConverter = None,
        interceptors: Sequence[Interceptor] = [],
        tls: TLSConfig | bool = False,
        port: int | None = None,
        ui: bool = True,
        log_level: str = "warn",
        log_format: str = "pretty",
        database_filename: str | None = None,
        sqlite_pragma: Mapping[str, str] = {},
    ) -> WorkflowEnvironment: ...

    async def shutdown(self) -> None: ...

    @property
    def client(self) -> Client: ...

    async def sleep(self, duration: timedelta) -> None: ...

class ActivityEnvironment:
    def __init__(
        self,
        *,
        info: activity.Info,
        on_heartbeat: Callable[..., None] = None,
        on_complete_async: Callable[[], None] = None,
        cancelled: bool = False,
        worker_shutdown: bool = False,
    ): ...

    @contextmanager
    def as_current(self) -> Iterator[None]: ...

async def new_random_task_queue() -> str: ...

Testing

Common Utilities

Core utilities, configuration objects, and types used throughout the SDK including retry policies, search attributes, metrics, and workflow configuration.

@dataclass
class RetryPolicy:
    initial_interval: timedelta = timedelta(seconds=1)
    backoff_coefficient: float = 2.0
    maximum_interval: Optional[timedelta] = None
    maximum_attempts: int = 0
    non_retryable_error_types: Optional[Sequence[str]] = None

class WorkflowIDReusePolicy(IntEnum):
    ALLOW_DUPLICATE = 0
    ALLOW_DUPLICATE_FAILED_ONLY = 1
    REJECT_DUPLICATE = 2
    TERMINATE_IF_RUNNING = 3

class SearchAttributeKey(ABC, Generic[SearchAttributeValueType]):
    @staticmethod
    def for_text(name: str) -> SearchAttributeKey[str]: ...

    @staticmethod
    def for_keyword(name: str) -> SearchAttributeKey[str]: ...

    @staticmethod
    def for_int(name: str) -> SearchAttributeKey[int]: ...

class MetricMeter(ABC):
    @abstractmethod
    def create_counter(
        self, name: str, description: Optional[str] = None, unit: Optional[str] = None
    ) -> MetricCounter: ...

Common Utilities

Exception Handling

Comprehensive exception hierarchy for handling errors in workflows, activities, and client operations with detailed error information and utility functions.

class TemporalError(Exception):
    @property
    def cause(self) -> Optional[BaseException]: ...

class ApplicationError(FailureError):
    def __init__(
        self,
        message: str,
        *details: Any,
        type: Optional[str] = None,
        non_retryable: bool = False,
        next_retry_delay: Optional[timedelta] = None,
        category: ApplicationErrorCategory = ApplicationErrorCategory.UNSPECIFIED,
    ): ...

class ActivityError(FailureError):
    @property
    def activity_type(self) -> str: ...

    @property
    def retry_state(self) -> Optional[RetryState]: ...

def is_cancelled_exception(exception: BaseException) -> bool: ...

Exception Handling

Runtime Configuration

Runtime and telemetry configuration for managing SDK resources, logging, metrics collection, and observability features.

class Runtime:
    @staticmethod
    def default() -> Runtime: ...

    @staticmethod
    def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None: ...

    def __init__(self, *, telemetry: TelemetryConfig) -> None: ...

@dataclass(frozen=True)
class TelemetryConfig:
    logging: Optional[LoggingConfig] = LoggingConfig.default
    metrics: Optional[Union[OpenTelemetryConfig, PrometheusConfig, MetricBuffer]] = None
    global_tags: Mapping[str, str] = field(default_factory=dict)

@dataclass(frozen=True)
class PrometheusConfig:
    bind_address: str
    counters_total_suffix: bool = False
    durations_as_seconds: bool = False

Runtime Configuration

Pydantic Integration

Pydantic v2 data converter for automatic serialization and validation of Pydantic models with type safety and custom JSON options.

pydantic_data_converter = DataConverter(
    payload_converter_class=PydanticPayloadConverter
)

class PydanticJSONPlainPayloadConverter(EncodingPayloadConverter):
    def __init__(self, to_json_options: Optional[ToJsonOptions] = None): ...

@dataclass
class ToJsonOptions:
    exclude_unset: bool = False

Pydantic Integration

Types

from typing import TypeVar, Generic, Protocol, Callable, Any, Sequence, Mapping, Iterator
from datetime import datetime, timedelta
from enum import IntEnum, Enum

# Generic type variables
WorkflowReturnType = TypeVar("WorkflowReturnType")
ActivityReturnType = TypeVar("ActivityReturnType")
SelfType = TypeVar("SelfType")
ReturnType = TypeVar("ReturnType")
MultiParamSpec = TypeVar("MultiParamSpec")

# Protocol types for callables
class MethodAsyncNoParam(Protocol[SelfType, ReturnType]):
    async def __call__(self: SelfType) -> ReturnType: ...

class MethodSyncOrAsyncSingleParam(Protocol[SelfType, ReturnType]):
    def __call__(self: SelfType, arg: Any) -> ReturnType: ...

class CallableAsyncSingleParam(Protocol[ActivityParam, ActivityReturnType]):
    async def __call__(self, arg: ActivityParam) -> ActivityReturnType: ...

# Configuration types
class RetryPolicy:
    initial_interval: timedelta
    backoff_coefficient: float
    maximum_interval: timedelta
    maximum_attempts: int
    non_retryable_error_types: Sequence[str]

class TLSConfig:
    server_root_ca_cert: bytes | None
    domain: str | None
    client_cert: bytes | None
    client_private_key: bytes | None

# Enum types
class WorkflowIDReusePolicy(IntEnum):
    ALLOW_DUPLICATE = 0
    ALLOW_DUPLICATE_FAILED_ONLY = 1
    REJECT_DUPLICATE = 2
    TERMINATE_IF_RUNNING = 3

class WorkflowIDConflictPolicy(IntEnum):
    UNSPECIFIED = 0
    FAIL = 1
    USE_EXISTING = 2

class ActivityCancellationType(IntEnum):
    TRY_CANCEL = 0
    WAIT_CANCELLATION_COMPLETED = 1
    ABANDON = 2

class ChildWorkflowCancellationType(IntEnum):
    WAIT_CANCELLATION_COMPLETED = 0
    REQUEST_CANCEL = 1
    ABANDON = 2

class ParentClosePolicy(IntEnum):
    TERMINATE = 0
    ABANDON = 1
    REQUEST_CANCEL = 2

class VersioningIntent(Enum):
    UNSPECIFIED = "Unspecified"
    COMPATIBLE = "Compatible"
    DEFAULT = "Default"

class WorkflowUpdateStage(IntEnum):
    ADMITTED = 0
    ACCEPTED = 1
    COMPLETED = 2

# Search attributes
SearchAttributes = Mapping[SearchAttributeKey[Any], Any]
SearchAttributeKey = type[SearchAttributeKey[Any]]

# Common aliases
Executor = concurrent.futures.Executor
Interceptor = Any  # Base type for interceptor implementations
Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/temporalio@1.17.x
Publish Source
CLI
Badge
tessl/pypi-temporalio badge