CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-temporalio

Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.

Overview
Eval results
Files

workflow.mddocs/

Workflow Development

The temporalio.workflow module provides core functionality for defining and executing workflows. This module contains decorators, context functions, activity execution methods, and utilities needed for workflow development. Workflows are the fundamental building blocks of Temporal applications, representing the orchestration logic for business processes.

Workflow Definition and Decorators

Workflow Definition

The @workflow.defn decorator is used to mark a class as a workflow definition.

@overload
def defn(cls: ClassType) -> ClassType: ...

@overload
def defn(
    *,
    name: Optional[str] = None,
    sandboxed: bool = True,
    failure_exception_types: Sequence[Type[BaseException]] = [],
    versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
) -> Callable[[ClassType], ClassType]: ...

@overload
def defn(
    *,
    sandboxed: bool = True,
    dynamic: bool = False,
    versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
) -> Callable[[ClassType], ClassType]: ...

def defn(
    cls: Optional[ClassType] = None,
    *,
    name: Optional[str] = None,
    sandboxed: bool = True,
    dynamic: bool = False,
    failure_exception_types: Sequence[Type[BaseException]] = [],
    versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
) -> Union[ClassType, Callable[[ClassType], ClassType]]

Parameters:

  • cls (Optional[ClassType]): The class to decorate
  • name (Optional[str]): Name to use for the workflow. Defaults to class __name__. Cannot be set if dynamic is set
  • sandboxed (bool): Whether the workflow should run in a sandbox. Default is True
  • dynamic (bool): If true, this workflow will be dynamic. Dynamic workflows have to accept a single 'Sequence[RawValue]' parameter. Cannot be set to true if name is present
  • failure_exception_types (Sequence[Type[BaseException]]): The types of exceptions that, if a workflow-thrown exception extends, will cause the workflow/update to fail instead of suspending the workflow via task failure
  • versioning_behavior (temporalio.common.VersioningBehavior): Specifies the versioning behavior to use for this workflow

Example:

import temporalio.workflow

@temporalio.workflow.defn
class MyWorkflow:
    @temporalio.workflow.run
    async def run(self, input: str) -> str:
        return f"Hello, {input}!"

# With custom name and settings
@temporalio.workflow.defn(name="CustomWorkflow", sandboxed=False)
class CustomWorkflow:
    @temporalio.workflow.run
    async def run(self) -> str:
        return "Custom workflow"

Workflow Initialization

def init(init_fn: CallableType) -> CallableType

Parameters:

  • init_fn (CallableType): The __init__ method to decorate

The @workflow.init decorator may be used on the __init__ method of the workflow class to specify that it accepts the same workflow input arguments as the @workflow.run method. If used, the parameters of your __init__ and @workflow.run methods must be identical.

Example:

@temporalio.workflow.defn
class MyWorkflow:
    @temporalio.workflow.init
    def __init__(self, input: str):
        self.input = input

    @temporalio.workflow.run
    async def run(self, input: str) -> str:
        return f"Hello, {self.input}!"

Run Method Decorator

def run(fn: CallableAsyncType) -> CallableAsyncType

Parameters:

  • fn (CallableAsyncType): The function to decorate

The @workflow.run decorator must be used on one and only one async method defined on the same class as @workflow.defn. This can be defined on a base class method but must then be explicitly overridden and defined on the workflow class.

Example:

@temporalio.workflow.defn
class MyWorkflow:
    @temporalio.workflow.run
    async def run(self, name: str) -> str:
        return await temporalio.workflow.start_activity(
            my_activity,
            name,
            schedule_to_close_timeout=timedelta(minutes=5)
        )

Signal Handler Decorator

@overload
def signal(
    fn: CallableSyncOrAsyncReturnNoneType,
) -> CallableSyncOrAsyncReturnNoneType: ...

@overload
def signal(
    *,
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]: ...

@overload
def signal(
    *,
    name: str,
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]: ...

@overload
def signal(
    *,
    dynamic: Literal[True],
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]: ...

def signal(
    fn: Optional[CallableSyncOrAsyncReturnNoneType] = None,
    *,
    name: Optional[str] = None,
    dynamic: Optional[bool] = False,
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Union[CallableSyncOrAsyncReturnNoneType, Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]]

Parameters:

  • fn (Optional[CallableSyncOrAsyncReturnNoneType]): The function to decorate
  • name (Optional[str]): Signal name. Defaults to method __name__. Cannot be present when dynamic is present
  • dynamic (Optional[bool]): If true, this handles all signals not otherwise handled. Cannot be present when name is present
  • unfinished_policy (HandlerUnfinishedPolicy): Actions taken if a workflow terminates with a running instance of this handler
  • description (Optional[str]): A short description of the signal that may appear in the UI/CLI

Example:

@temporalio.workflow.defn
class MyWorkflow:
    def __init__(self):
        self.messages = []

    @temporalio.workflow.signal
    async def add_message(self, message: str) -> None:
        self.messages.append(message)

    @temporalio.workflow.signal(name="custom_signal", description="Custom signal handler")
    def handle_custom_signal(self, data: dict) -> None:
        # Handle custom signal
        pass

Query Handler Decorator

@overload
def query(fn: CallableType) -> CallableType: ...

@overload
def query(
    *, name: str, description: Optional[str] = None
) -> Callable[[CallableType], CallableType]: ...

@overload
def query(
    *, dynamic: Literal[True], description: Optional[str] = None
) -> Callable[[CallableType], CallableType]: ...

@overload
def query(*, description: str) -> Callable[[CallableType], CallableType]: ...

def query(
    fn: Optional[CallableType] = None,
    *,
    name: Optional[str] = None,
    dynamic: Optional[bool] = False,
    description: Optional[str] = None,
) -> Union[CallableType, Callable[[CallableType], CallableType]]

Parameters:

  • fn (Optional[CallableType]): The function to decorate
  • name (Optional[str]): Query name. Defaults to method __name__. Cannot be present when dynamic is present
  • dynamic (Optional[bool]): If true, this handles all queries not otherwise handled. Cannot be present when name is present
  • description (Optional[str]): A short description of the query that may appear in the UI/CLI

Example:

@temporalio.workflow.defn
class MyWorkflow:
    def __init__(self):
        self.status = "running"

    @temporalio.workflow.query
    def get_status(self) -> str:
        return self.status

    @temporalio.workflow.query(name="message_count", description="Get number of messages")
    def get_message_count(self) -> int:
        return len(self.messages)

Update Handler Decorator

@overload
def update(
    fn: UpdateMethodMultiParam[MultiParamSpec, ReturnType]
) -> UpdateMethodMultiParam[MultiParamSpec, ReturnType]: ...

@overload
def update(
    *, name: str, description: Optional[str] = None
) -> Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]: ...

@overload
def update(
    *,
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]: ...

@overload
def update(
    *,
    name: str,
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]: ...

@overload
def update(
    *,
    dynamic: Literal[True],
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]: ...

def update(
    fn: Optional[UpdateMethodMultiParam[MultiParamSpec, ReturnType]] = None,
    *,
    name: Optional[str] = None,
    dynamic: Optional[bool] = False,
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Union[UpdateMethodMultiParam[MultiParamSpec, ReturnType], Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]]

Parameters:

  • fn (Optional[UpdateMethodMultiParam[MultiParamSpec, ReturnType]]): The function to decorate
  • name (Optional[str]): Update name. Defaults to method __name__. Cannot be present when dynamic is present
  • dynamic (Optional[bool]): If true, this handles all updates not otherwise handled. Cannot be present when name is present
  • unfinished_policy (HandlerUnfinishedPolicy): Actions taken if a workflow terminates with a running instance of this handler
  • description (Optional[str]): A short description of the update that may appear in the UI/CLI

Example:

@temporalio.workflow.defn
class MyWorkflow:
    def __init__(self):
        self.value = 0

    @temporalio.workflow.update
    def increment_value(self, amount: int) -> int:
        self.value += amount
        return self.value

    @temporalio.workflow.update(name="set_value", description="Set the workflow value")
    def set_workflow_value(self, new_value: int) -> int:
        self.value = new_value
        return self.value

Context Functions

Workflow Information

def info() -> Info

Returns:

  • Info: Current workflow execution information

Get current workflow info. This will error if not called in a workflow.

Example:

# Inside a workflow
workflow_info = temporalio.workflow.info()
print(f"Workflow ID: {workflow_info.workflow_id}")
print(f"Run ID: {workflow_info.run_id}")
def instance() -> Any

Returns:

  • Any: Current workflow instance

Get the current workflow instance. This is the same as the workflow instance that is currently executing.

def in_workflow() -> bool

Returns:

  • bool: Whether currently executing in a workflow context

Check whether currently in a workflow. This will return True if called from a workflow and False otherwise.

Time Functions

def now() -> datetime

Returns:

  • datetime: Current workflow time

Get current workflow time as a timezone-aware datetime in UTC. This is always the current time according to the workflow's view of time which may not be the same as system time.

def time() -> float

Returns:

  • float: Current workflow time as seconds since epoch

Get current workflow time as seconds since the Unix epoch.

def time_ns() -> int

Returns:

  • int: Current workflow time as nanoseconds since epoch

Get current workflow time as nanoseconds since the Unix epoch.

Example:

# Inside a workflow
current_time = temporalio.workflow.now()
timestamp = temporalio.workflow.time()
nano_timestamp = temporalio.workflow.time_ns()

print(f"Current workflow time: {current_time}")
print(f"Timestamp: {timestamp}")

Utility Functions

def uuid4() -> uuid.UUID

Returns:

  • uuid.UUID: Deterministic UUID4

Generate a deterministic UUID4 value. This UUID will be the same for the same workflow execution on replay.

def random() -> Random

Returns:

  • Random: Deterministic random number generator

Get a deterministic random number generator. This random generator will produce the same sequence of numbers for the same workflow execution on replay.

Example:

# Inside a workflow
workflow_uuid = temporalio.workflow.uuid4()
rng = temporalio.workflow.random()
random_number = rng.randint(1, 100)
def payload_converter() -> temporalio.converter.PayloadConverter

Returns:

  • temporalio.converter.PayloadConverter: Current payload converter

Get the payload converter for the current workflow.

def metric_meter() -> temporalio.common.MetricMeter

Returns:

  • temporalio.common.MetricMeter: Current metric meter

Get the metric meter for the current workflow.

Memo and Search Attribute Management

Memo Operations

def memo() -> Mapping[str, Any]

Returns:

  • Mapping[str, Any]: Current workflow memo

Get current workflow memo as a mapping of keys to values.

@overload
def memo_value(key: str, default: Any = temporalio.common._arg_unset) -> Any: ...

@overload
def memo_value(key: str, *, type_hint: Type[ParamType]) -> ParamType: ...

@overload
def memo_value(
    key: str, default: ParamType, *, type_hint: Type[ParamType]
) -> ParamType: ...

def memo_value(
    key: str,
    default: Any = temporalio.common._arg_unset,
    *,
    type_hint: Optional[Type] = None,
) -> Any

Parameters:

  • key (str): Memo key to get
  • default (Any): Default value if key not present
  • type_hint (Optional[Type]): Type hint for return value

Returns:

  • Any: Memo value for the given key

Get a specific memo value. If no default is provided and the key is not present, this will raise a KeyError.

def upsert_memo(updates: Mapping[str, Any]) -> None

Parameters:

  • updates (Mapping[str, Any]): Memo keys and values to upsert

Upsert memo values. This will update the workflow memo with the provided key-value pairs.

Example:

# Inside a workflow
current_memo = temporalio.workflow.memo()
user_id = temporalio.workflow.memo_value("user_id", default="unknown")

# Update memo
temporalio.workflow.upsert_memo({"status": "processing", "progress": 0.5})

Search Attribute Operations

def upsert_search_attributes(
    updates: Union[
        temporalio.common.TypedSearchAttributes,
        Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
    ]
) -> None

Parameters:

  • updates (Union[temporalio.common.TypedSearchAttributes, Mapping[Union[str, temporalio.common.SearchAttributeKey], Any]]): Search attributes to upsert

Upsert search attributes. This will update the workflow's search attributes with the provided values.

Example:

# Inside a workflow
import temporalio.common

# Update search attributes
temporalio.workflow.upsert_search_attributes({
    "status": "active",
    temporalio.common.SearchAttributeKey.for_keyword("department"): "engineering"
})

Activity Execution

Starting Activities

@overload
def start_activity(
    activity: CallableAsyncType,
    arg: AnyType,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[ReturnType]: ...

@overload
def start_activity(
    activity: CallableAsyncType,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[ReturnType]: ...

@overload
def start_activity(
    activity: str,
    arg: AnyType,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]: ...

@overload
def start_activity(
    activity: str,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]: ...

def start_activity(
    activity: Union[CallableType, str],
    arg: Any = temporalio.common._arg_unset,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]

Parameters:

  • activity (Union[CallableType, str]): Activity function or string name
  • arg (Any): Single argument to pass to the activity
  • task_queue (Optional[str]): Task queue to run activity on. Defaults to current workflow's task queue
  • schedule_to_close_timeout (Optional[timedelta]): Total time allowed for activity from schedule to completion
  • schedule_to_start_timeout (Optional[timedelta]): Time allowed for activity to start from when scheduled
  • start_to_close_timeout (Optional[timedelta]): Time allowed for activity to complete from start
  • heartbeat_timeout (Optional[timedelta]): Maximum time between activity heartbeats
  • retry_policy (Optional[temporalio.common.RetryPolicy]): How the activity is retried on failure
  • cancellation_type (ActivityCancellationType): How to handle activity cancellation
  • versioning_intent (Optional[temporalio.common.VersioningIntent]): Worker versioning intent for this activity
  • local_retry_threshold (Optional[timedelta]): Threshold for local retries vs server retries

Returns:

  • ActivityHandle[Any]: Handle to the started activity

Start an activity and return a handle to it. The activity will not be awaited automatically.

Example:

# Inside a workflow
from datetime import timedelta
import temporalio.activity

@temporalio.activity.defn
async def my_activity(name: str) -> str:
    return f"Hello, {name}!"

# Start activity and await result
result = await temporalio.workflow.start_activity(
    my_activity,
    "World",
    schedule_to_close_timeout=timedelta(minutes=5)
)

# Start activity and get handle for later awaiting
handle = temporalio.workflow.start_activity(
    my_activity,
    "World",
    schedule_to_close_timeout=timedelta(minutes=5)
)
result = await handle

Starting Activities by Class

@overload
def start_activity_class(
    activity_cls: Type[ClassType],
    activity_method: str,
    arg: AnyType,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]: ...

def start_activity_class(
    activity_cls: Type,
    activity_method: str,
    arg: Any = temporalio.common._arg_unset,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]

Parameters:

  • activity_cls (Type): Activity class
  • activity_method (str): Method name on the activity class
  • arg (Any): Single argument to pass to the activity
  • Additional parameters same as start_activity

Returns:

  • ActivityHandle[Any]: Handle to the started activity

Start an activity method from a class and return a handle to it.

Starting Activity Methods

@overload
def start_activity_method(
    activity_method: MethodAsyncNoParam[SelfType, ReturnType],
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[ReturnType]: ...

def start_activity_method(
    activity_method: Callable,
    arg: Any = temporalio.common._arg_unset,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]

Parameters:

  • activity_method (Callable): Activity method to start
  • arg (Any): Single argument to pass to the activity
  • Additional parameters same as start_activity

Returns:

  • ActivityHandle[Any]: Handle to the started activity

Start an activity method and return a handle to it.

Starting Local Activities

@overload
def start_local_activity(
    activity: CallableAsyncType,
    arg: AnyType,
    *,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    local_retry_threshold: Optional[timedelta] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
) -> ActivityHandle[ReturnType]: ...

def start_local_activity(
    activity: Union[CallableType, str],
    arg: Any = temporalio.common._arg_unset,
    *,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    local_retry_threshold: Optional[timedelta] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
) -> ActivityHandle[Any]

Parameters:

  • activity (Union[CallableType, str]): Local activity function or string name
  • arg (Any): Single argument to pass to the activity
  • schedule_to_close_timeout (Optional[timedelta]): Total time allowed for activity from schedule to completion
  • schedule_to_start_timeout (Optional[timedelta]): Time allowed for activity to start from when scheduled
  • start_to_close_timeout (Optional[timedelta]): Time allowed for activity to complete from start
  • retry_policy (Optional[temporalio.common.RetryPolicy]): How the activity is retried on failure
  • local_retry_threshold (Optional[timedelta]): Threshold for local retries vs server retries
  • cancellation_type (ActivityCancellationType): How to handle activity cancellation

Returns:

  • ActivityHandle[Any]: Handle to the started local activity

Start a local activity and return a handle to it. Local activities are executed in the same process as the workflow worker.

Example:

# Inside a workflow
@temporalio.activity.defn
async def local_activity(data: str) -> str:
    return data.upper()

# Start local activity
result = await temporalio.workflow.start_local_activity(
    local_activity,
    "hello world",
    start_to_close_timeout=timedelta(seconds=30)
)

Child Workflows and External Workflows

Child Workflow Execution

@overload
def start_child_workflow(
    workflow: CallableAsyncType,
    arg: AnyType,
    *,
    id: str,
    task_queue: Optional[str] = None,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[
        Union[
            temporalio.common.TypedSearchAttributes,
            Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
        ]
    ] = None,
    cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
    parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> ChildWorkflowHandle[ReturnType]: ...

def start_child_workflow(
    workflow: Union[CallableType, str],
    arg: Any = temporalio.common._arg_unset,
    *,
    id: str,
    task_queue: Optional[str] = None,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[
        Union[
            temporalio.common.TypedSearchAttributes,
            Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
        ]
    ] = None,
    cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
    parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> ChildWorkflowHandle[Any]

Parameters:

  • workflow (Union[CallableType, str]): Child workflow function or string name
  • arg (Any): Single argument to pass to the child workflow
  • id (str): ID for the child workflow
  • task_queue (Optional[str]): Task queue to run child workflow on
  • execution_timeout (Optional[timedelta]): Maximum time for workflow execution
  • run_timeout (Optional[timedelta]): Maximum time for workflow run
  • task_timeout (Optional[timedelta]): Maximum time for workflow tasks
  • id_reuse_policy (temporalio.common.WorkflowIDReusePolicy): Policy for workflow ID reuse
  • retry_policy (Optional[temporalio.common.RetryPolicy]): How the workflow is retried on failure
  • cron_schedule (str): Cron schedule for the workflow
  • memo (Optional[Mapping[str, Any]]): Memo to attach to workflow
  • search_attributes (Optional[Union[temporalio.common.TypedSearchAttributes, Mapping[Union[str, temporalio.common.SearchAttributeKey], Any]]]): Search attributes for the workflow
  • cancellation_type (ChildWorkflowCancellationType): How to handle child workflow cancellation
  • parent_close_policy (ParentClosePolicy): What to do with child when parent closes
  • versioning_intent (Optional[temporalio.common.VersioningIntent]): Worker versioning intent

Returns:

  • ChildWorkflowHandle[Any]: Handle to the started child workflow

Start a child workflow and return a handle to it.

Example:

# Inside a workflow
@temporalio.workflow.defn
class ChildWorkflow:
    @temporalio.workflow.run
    async def run(self, data: str) -> str:
        return f"Processed: {data}"

# Start child workflow
child_handle = temporalio.workflow.start_child_workflow(
    ChildWorkflow.run,
    "input data",
    id="child-workflow-123",
    task_queue="child-queue"
)
result = await child_handle

External Workflow Handles

@overload
def get_external_workflow_handle(workflow_id: str) -> ExternalWorkflowHandle: ...

@overload
def get_external_workflow_handle(
    workflow_id: str, *, run_id: Optional[str] = None
) -> ExternalWorkflowHandle: ...

def get_external_workflow_handle(
    workflow_id: str, *, run_id: Optional[str] = None
) -> ExternalWorkflowHandle

Parameters:

  • workflow_id (str): Workflow ID of the external workflow
  • run_id (Optional[str]): Run ID of the external workflow

Returns:

  • ExternalWorkflowHandle: Handle for the external workflow

Get a handle to an external workflow by workflow ID and optional run ID.

@overload
def get_external_workflow_handle_for(
    workflow_type: CallableAsyncType, workflow_id: str
) -> ExternalWorkflowHandle[ReturnType]: ...

@overload
def get_external_workflow_handle_for(
    workflow_type: CallableAsyncType,
    workflow_id: str,
    *,
    run_id: Optional[str] = None
) -> ExternalWorkflowHandle[ReturnType]: ...

def get_external_workflow_handle_for(
    workflow_type: Callable,
    workflow_id: str,
    *,
    run_id: Optional[str] = None,
) -> ExternalWorkflowHandle[Any]

Parameters:

  • workflow_type (Callable): Type of the external workflow
  • workflow_id (str): Workflow ID of the external workflow
  • run_id (Optional[str]): Run ID of the external workflow

Returns:

  • ExternalWorkflowHandle[Any]: Typed handle for the external workflow

Get a typed handle to an external workflow by workflow type, workflow ID, and optional run ID.

Example:

# Inside a workflow
# Get handle to external workflow
external_handle = temporalio.workflow.get_external_workflow_handle("external-workflow-id")

# Signal external workflow
await external_handle.signal("signal_name", "signal_data")

# Cancel external workflow
await external_handle.cancel()

Continue-as-New

@overload
def continue_as_new(
    *,
    task_queue: Optional[str] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[
        Union[
            temporalio.common.TypedSearchAttributes,
            Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
        ]
    ] = None,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> NoReturn: ...

@overload
def continue_as_new(
    arg: AnyType,
    *,
    task_queue: Optional[str] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[
        Union[
            temporalio.common.TypedSearchAttributes,
            Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
        ]
    ] = None,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> NoReturn: ...

def continue_as_new(
    arg: Any = temporalio.common._arg_unset,
    *,
    task_queue: Optional[str] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[
        Union[
            temporalio.common.TypedSearchAttributes,
            Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
        ]
    ] = None,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> NoReturn

Parameters:

  • arg (Any): Argument to pass to the continued workflow
  • task_queue (Optional[str]): Task queue for the continued workflow
  • run_timeout (Optional[timedelta]): Maximum time for the continued workflow run
  • task_timeout (Optional[timedelta]): Maximum time for continued workflow tasks
  • memo (Optional[Mapping[str, Any]]): Memo to attach to continued workflow
  • search_attributes (Optional[Union[temporalio.common.TypedSearchAttributes, Mapping[Union[str, temporalio.common.SearchAttributeKey], Any]]]): Search attributes for continued workflow
  • versioning_intent (Optional[temporalio.common.VersioningIntent]): Worker versioning intent

Continue-as-new the current workflow. This will complete the current workflow and start a new execution with the same workflow ID.

Example:

# Inside a workflow
@temporalio.workflow.defn
class MyWorkflow:
    @temporalio.workflow.run
    async def run(self, iteration: int) -> str:
        if iteration >= 100:
            return f"Completed after {iteration} iterations"

        # Continue as new with next iteration
        temporalio.workflow.continue_as_new(iteration + 1)

Signal, Query, and Update Handler Management

Dynamic Handler Registration

def get_signal_handler(name: str) -> Optional[Callable]

Parameters:

  • name (str): Signal handler name

Returns:

  • Optional[Callable]: Signal handler function if found

Get a signal handler by name.

def set_signal_handler(
    name: str, handler: Optional[Callable[..., None]]
) -> None

Parameters:

  • name (str): Signal handler name
  • handler (Optional[Callable[..., None]]): Signal handler function or None to unset

Set a signal handler for the given name. Set to None to unset.

def get_dynamic_signal_handler() -> Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]

Returns:

  • Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]: Dynamic signal handler if set

Get the dynamic signal handler.

def set_dynamic_signal_handler(
    handler: Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]
) -> None

Parameters:

  • handler (Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]): Dynamic signal handler or None to unset

Set the dynamic signal handler. Set to None to unset.

def get_query_handler(name: str) -> Optional[Callable]

Parameters:

  • name (str): Query handler name

Returns:

  • Optional[Callable]: Query handler function if found

Get a query handler by name.

def set_query_handler(
    name: str, handler: Optional[Callable[..., Any]]
) -> None

Parameters:

  • name (str): Query handler name
  • handler (Optional[Callable[..., Any]]): Query handler function or None to unset

Set a query handler for the given name. Set to None to unset.

def get_dynamic_query_handler() -> Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]

Returns:

  • Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]: Dynamic query handler if set

Get the dynamic query handler.

def set_dynamic_query_handler(
    handler: Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]
) -> None

Parameters:

  • handler (Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]): Dynamic query handler or None to unset

Set the dynamic query handler. Set to None to unset.

def get_update_handler(name: str) -> Optional[Callable]

Parameters:

  • name (str): Update handler name

Returns:

  • Optional[Callable]: Update handler function if found

Get an update handler by name.

def set_update_handler(
    name: str,
    handler: Optional[Callable[..., Any]],
    validator: Optional[Callable[..., None]] = None,
) -> None

Parameters:

  • name (str): Update handler name
  • handler (Optional[Callable[..., Any]]): Update handler function or None to unset
  • validator (Optional[Callable[..., None]]): Optional validator function

Set an update handler for the given name. Set to None to unset.

def get_dynamic_update_handler() -> Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]

Returns:

  • Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]: Dynamic update handler if set

Get the dynamic update handler.

def set_dynamic_update_handler(
    handler: Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]],
    validator: Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]] = None,
) -> None

Parameters:

  • handler (Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]): Dynamic update handler or None to unset
  • validator (Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]): Optional validator function

Set the dynamic update handler. Set to None to unset.

Example:

# Inside a workflow
def custom_signal_handler(message: str) -> None:
    print(f"Received signal: {message}")

def custom_query_handler(query_type: str) -> str:
    return f"Query response for {query_type}"

# Set handlers dynamically
temporalio.workflow.set_signal_handler("custom_signal", custom_signal_handler)
temporalio.workflow.set_query_handler("custom_query", custom_query_handler)

# Get handlers
signal_handler = temporalio.workflow.get_signal_handler("custom_signal")
query_handler = temporalio.workflow.get_query_handler("custom_query")

Advanced Features

Nexus Operations

@overload
def create_nexus_client(
    client: NexusClient[ServiceHandlerT], *, endpoint: str
) -> ServiceHandlerT: ...

@overload
def create_nexus_client(
    *, service: str, endpoint: str, serializer: nexusrpc.Serializer = nexusrpc.JSONSerializer()
) -> temporalio.nexus.OperationClient: ...

def create_nexus_client(
    client: Optional[NexusClient[ServiceHandlerT]] = None,
    *,
    service: Optional[str] = None,
    endpoint: str,
    serializer: nexusrpc.Serializer = nexusrpc.JSONSerializer(),
) -> Union[ServiceHandlerT, temporalio.nexus.OperationClient]

Parameters:

  • client (Optional[NexusClient[ServiceHandlerT]]): Nexus client to use
  • service (Optional[str]): Service name for Nexus operations
  • endpoint (str): Nexus endpoint
  • serializer (nexusrpc.Serializer): Serializer for Nexus operations

Returns:

  • Union[ServiceHandlerT, temporalio.nexus.OperationClient]: Nexus client for operations

Create a Nexus client for calling operations on external Nexus services.

Versioning and Patches

def patched(id: str) -> bool

Parameters:

  • id (str): Patch identifier

Returns:

  • bool: Whether the patch has been applied

Check whether a patch has been applied. This is used for workflow code versioning.

def deprecate_patch(id: str) -> None

Parameters:

  • id (str): Patch identifier to deprecate

Deprecate a patch. This removes the patch from future workflow executions but maintains compatibility with existing executions.

Example:

# Inside a workflow
if temporalio.workflow.patched("feature_v2"):
    # New version of the feature
    result = await new_feature_implementation()
else:
    # Old version for replay compatibility
    result = await old_feature_implementation()

# In newer workflow versions, deprecate old patches
temporalio.workflow.deprecate_patch("old_feature_v1")

Workflow Utilities

def all_handlers_finished() -> bool

Returns:

  • bool: Whether all handlers have finished

Check whether all handlers (signal, query, update) have finished executing.

def as_completed(
    tasks: Iterable[Awaitable[AnyType]], *, timeout: Optional[float] = None
) -> Iterator[Awaitable[AnyType]]

Parameters:

  • tasks (Iterable[Awaitable[AnyType]]): Tasks to wait for completion
  • timeout (Optional[float]): Timeout in seconds

Returns:

  • Iterator[Awaitable[AnyType]]: Iterator of completed tasks

Return an iterator that yields tasks as they complete. Similar to asyncio.as_completed but works in workflows.

Example:

# Inside a workflow
tasks = [
    temporalio.workflow.start_activity(activity1, "arg1", schedule_to_close_timeout=timedelta(minutes=1)),
    temporalio.workflow.start_activity(activity2, "arg2", schedule_to_close_timeout=timedelta(minutes=1)),
    temporalio.workflow.start_activity(activity3, "arg3", schedule_to_close_timeout=timedelta(minutes=1))
]

# Process tasks as they complete
for task in temporalio.workflow.as_completed(tasks):
    result = await task
    print(f"Task completed with result: {result}")

Workflow Details Management

def get_current_details() -> str

Returns:

  • str: Current workflow details

Get the current workflow details string.

def set_current_details(description: str) -> None

Parameters:

  • description (str): Description to set as current workflow details

Set current workflow details. This appears in the workflow execution details in the Temporal Web UI.

Example:

# Inside a workflow
temporalio.workflow.set_current_details("Processing user data")
await process_user_data()

temporalio.workflow.set_current_details("Sending notifications")
await send_notifications()

current_status = temporalio.workflow.get_current_details()

Core Classes and Types

Workflow Information Classes

class Info:
    """Information about the running workflow."""

    namespace: str
    workflow_id: str
    run_id: str
    workflow_type: str
    task_queue: str
    attempt: int
    cron_schedule: Optional[str]
    continued_from_execution_run_id: Optional[str]
    parent: Optional[ParentInfo]
    root: Optional[RootInfo]
    start_time: datetime
    execution_timeout: Optional[timedelta]
    run_timeout: Optional[timedelta]
    task_timeout: timedelta
    retry_policy: Optional[temporalio.common.RetryPolicy]
    typed_search_attributes: temporalio.common.TypedSearchAttributes
    raw_memo: Optional[Mapping[str, temporalio.api.common.v1.Payload]]
    headers: Mapping[str, temporalio.api.common.v1.Payload]
    unsafe: temporalio.workflow.unsafe
class ParentInfo:
    """Information about parent workflow if this is a child workflow."""

    namespace: str
    workflow_id: str
    run_id: str
class RootInfo:
    """Information about root workflow if this is a child workflow."""

    workflow_id: str
    run_id: str
class UpdateInfo:
    """Information about the currently running update."""

    id: str
    name: str

Activity Handle

class ActivityHandle(Generic[ReturnType]):
    """Handle for a running activity."""

    @property
    def id(self) -> str:
        """ID of the activity."""
        ...

    def cancel(self) -> None:
        """Cancel the activity."""
        ...

    def is_cancelled(self) -> bool:
        """Check if the activity is cancelled."""
        ...

    def is_done(self) -> bool:
        """Check if the activity is complete."""
        ...

    def result(self) -> ReturnType:
        """Get the result of the activity. Raises exception if not complete."""
        ...

Child Workflow Handle

class ChildWorkflowHandle(Generic[ReturnType]):
    """Handle for a running child workflow."""

    @property
    def id(self) -> str:
        """ID of the child workflow."""
        ...

    @property
    def first_execution_run_id(self) -> Optional[str]:
        """First execution run ID of the child workflow."""
        ...

    async def signal(self, signal: str, arg: Any = temporalio.common._arg_unset) -> None:
        """Send a signal to the child workflow."""
        ...

    def cancel(self) -> None:
        """Cancel the child workflow."""
        ...

    def is_cancelled(self) -> bool:
        """Check if the child workflow is cancelled."""
        ...

    def is_done(self) -> bool:
        """Check if the child workflow is complete."""
        ...

    def result(self) -> ReturnType:
        """Get the result of the child workflow. Raises exception if not complete."""
        ...

External Workflow Handle

class ExternalWorkflowHandle(Generic[ReturnType]):
    """Handle for an external workflow."""

    @property
    def id(self) -> str:
        """ID of the external workflow."""
        ...

    @property
    def run_id(self) -> Optional[str]:
        """Run ID of the external workflow."""
        ...

    async def signal(self, signal: str, arg: Any = temporalio.common._arg_unset) -> None:
        """Send a signal to the external workflow."""
        ...

    async def cancel(self) -> None:
        """Cancel the external workflow."""
        ...

Configuration Types

class ActivityConfig(TypedDict, total=False):
    """Configuration for activity execution."""

    task_queue: Optional[str]
    schedule_to_close_timeout: Optional[timedelta]
    schedule_to_start_timeout: Optional[timedelta]
    start_to_close_timeout: Optional[timedelta]
    heartbeat_timeout: Optional[timedelta]
    retry_policy: Optional[temporalio.common.RetryPolicy]
    cancellation_type: ActivityCancellationType
    versioning_intent: Optional[temporalio.common.VersioningIntent]
    local_retry_threshold: Optional[timedelta]
class LocalActivityConfig(TypedDict, total=False):
    """Configuration for local activity execution."""

    schedule_to_close_timeout: Optional[timedelta]
    schedule_to_start_timeout: Optional[timedelta]
    start_to_close_timeout: Optional[timedelta]
    retry_policy: Optional[temporalio.common.RetryPolicy]
    local_retry_threshold: Optional[timedelta]
    cancellation_type: ActivityCancellationType
class ChildWorkflowConfig(TypedDict, total=False):
    """Configuration for child workflow execution."""

    id: str
    task_queue: Optional[str]
    execution_timeout: Optional[timedelta]
    run_timeout: Optional[timedelta]
    task_timeout: Optional[timedelta]
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy
    retry_policy: Optional[temporalio.common.RetryPolicy]
    cron_schedule: str
    memo: Optional[Mapping[str, Any]]
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, Mapping[Union[str, temporalio.common.SearchAttributeKey], Any]]]
    cancellation_type: ChildWorkflowCancellationType
    parent_close_policy: ParentClosePolicy
    versioning_intent: Optional[temporalio.common.VersioningIntent]

Enumeration Types

class HandlerUnfinishedPolicy(Enum):
    """Actions taken if a workflow terminates with running handlers."""

    WARN_AND_ABANDON = 1  # Issue a warning in addition to abandoning
    ABANDON = 2  # Abandon the handler
class ActivityCancellationType(IntEnum):
    """How to handle activity cancellation."""

    TRY_CANCEL = 1  # Try to cancel the activity
    WAIT_CANCELLATION_COMPLETED = 2  # Wait for cancellation to complete
    ABANDON = 3  # Abandon the activity immediately
class ChildWorkflowCancellationType(IntEnum):
    """How to handle child workflow cancellation."""

    WAIT_CANCELLATION_COMPLETED = 1  # Wait for cancellation to complete
    ABANDON = 2  # Abandon the child workflow immediately
class ParentClosePolicy(IntEnum):
    """What to do with child workflows when parent closes."""

    TERMINATE = 1  # Terminate child workflows
    ABANDON = 2  # Abandon child workflows
    REQUEST_CANCEL = 3  # Request cancellation of child workflows

Exception Types

class ContinueAsNewError(BaseException):
    """Exception raised to continue workflow as new."""
    pass
class NondeterminismError(Exception):
    """Exception for nondeterministic behavior in workflow."""
    pass
class ReadOnlyContextError(Exception):
    """Exception for read-only context violations."""
    pass
class UnfinishedUpdateHandlersWarning(RuntimeWarning):
    """Warning for unfinished update handlers when workflow exits."""
    pass
class UnfinishedSignalHandlersWarning(RuntimeWarning):
    """Warning for unfinished signal handlers when workflow exits."""
    pass

This comprehensive guide covers all the essential workflow development capabilities in the Temporalio Python SDK. The workflow module provides the core building blocks for creating robust, scalable, and maintainable distributed applications using Temporal's orchestration engine.

Install with Tessl CLI

npx tessl i tessl/pypi-temporalio

docs

activity.md

client.md

common.md

contrib-pydantic.md

data-conversion.md

exceptions.md

index.md

runtime.md

testing.md

worker.md

workflow.md

tile.json