Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.
The temporalio.workflow module provides core functionality for defining and executing workflows. This module contains decorators, context functions, activity execution methods, and utilities needed for workflow development. Workflows are the fundamental building blocks of Temporal applications, representing the orchestration logic for business processes.
The @workflow.defn decorator is used to mark a class as a workflow definition.
@overload
def defn(cls: ClassType) -> ClassType: ...
@overload
def defn(
*,
name: Optional[str] = None,
sandboxed: bool = True,
failure_exception_types: Sequence[Type[BaseException]] = [],
versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
) -> Callable[[ClassType], ClassType]: ...
@overload
def defn(
*,
sandboxed: bool = True,
dynamic: bool = False,
versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
) -> Callable[[ClassType], ClassType]: ...
def defn(
cls: Optional[ClassType] = None,
*,
name: Optional[str] = None,
sandboxed: bool = True,
dynamic: bool = False,
failure_exception_types: Sequence[Type[BaseException]] = [],
versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
) -> Union[ClassType, Callable[[ClassType], ClassType]]
Parameters:
cls (Optional[ClassType]): The class to decorate.
name (Optional[str]): Name to use for the workflow. Defaults to class __name__. Cannot be set if dynamic is set.
sandboxed (bool): Whether the workflow should run in a sandbox. Default is True.
dynamic (bool): If true, this workflow will be dynamic. Dynamic workflows have to accept a single 'Sequence[RawValue]' parameter. Cannot be set to true if name is present.
failure_exception_types (Sequence[Type[BaseException]]): The types of exceptions that, if a workflow-thrown exception extends, will cause the workflow/update to fail instead of suspending the workflow via task failure.
versioning_behavior (temporalio.common.VersioningBehavior): Specifies the versioning behavior to use for this workflow.
Example:
import temporalio.workflow
@temporalio.workflow.defn
class MyWorkflow:
@temporalio.workflow.run
async def run(self, input: str) -> str:
return f"Hello, {input}!"
# With custom name and settings
@temporalio.workflow.defn(name="CustomWorkflow", sandboxed=False)
class CustomWorkflow:
@temporalio.workflow.run
async def run(self) -> str:
return "Custom workflow"

def init(init_fn: CallableType) -> CallableType
Parameters:
init_fn (CallableType): The __init__ method to decorate.
The @workflow.init decorator may be used on the __init__ method of the workflow class to specify that it accepts the same workflow input arguments as the @workflow.run method. If used, the parameters of your __init__ and @workflow.run methods must be identical.
Example:
@temporalio.workflow.defn
class MyWorkflow:
@temporalio.workflow.init
def __init__(self, input: str):
self.input = input
@temporalio.workflow.run
async def run(self, input: str) -> str:
return f"Hello, {self.input}!"

def run(fn: CallableAsyncType) -> CallableAsyncType
Parameters:
fn (CallableAsyncType): The function to decorate.
The @workflow.run decorator must be used on one and only one async method defined on the same class as @workflow.defn. This can be defined on a base class method but must then be explicitly overridden and defined on the workflow class.
Example:
@temporalio.workflow.defn
class MyWorkflow:
@temporalio.workflow.run
async def run(self, name: str) -> str:
return await temporalio.workflow.start_activity(
my_activity,
name,
schedule_to_close_timeout=timedelta(minutes=5)
)

@overload
def signal(
fn: CallableSyncOrAsyncReturnNoneType,
) -> CallableSyncOrAsyncReturnNoneType: ...
@overload
def signal(
*,
unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
description: Optional[str] = None,
) -> Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]: ...
@overload
def signal(
*,
name: str,
unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
description: Optional[str] = None,
) -> Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]: ...
@overload
def signal(
*,
dynamic: Literal[True],
unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
description: Optional[str] = None,
) -> Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]: ...
def signal(
fn: Optional[CallableSyncOrAsyncReturnNoneType] = None,
*,
name: Optional[str] = None,
dynamic: Optional[bool] = False,
unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
description: Optional[str] = None,
) -> Union[CallableSyncOrAsyncReturnNoneType, Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]]
Parameters:
fn (Optional[CallableSyncOrAsyncReturnNoneType]): The function to decorate.
name (Optional[str]): Signal name. Defaults to method __name__. Cannot be present when dynamic is present.
dynamic (Optional[bool]): If true, this handles all signals not otherwise handled. Cannot be present when name is present.
unfinished_policy (HandlerUnfinishedPolicy): Actions taken if a workflow terminates with a running instance of this handler.
description (Optional[str]): A short description of the signal that may appear in the UI/CLI.
Example:
@temporalio.workflow.defn
class MyWorkflow:
def __init__(self):
self.messages = []
@temporalio.workflow.signal
async def add_message(self, message: str) -> None:
self.messages.append(message)
@temporalio.workflow.signal(name="custom_signal", description="Custom signal handler")
def handle_custom_signal(self, data: dict) -> None:
# Handle custom signal
pass

@overload
def query(fn: CallableType) -> CallableType: ...
@overload
def query(
*, name: str, description: Optional[str] = None
) -> Callable[[CallableType], CallableType]: ...
@overload
def query(
*, dynamic: Literal[True], description: Optional[str] = None
) -> Callable[[CallableType], CallableType]: ...
@overload
def query(*, description: str) -> Callable[[CallableType], CallableType]: ...
def query(
fn: Optional[CallableType] = None,
*,
name: Optional[str] = None,
dynamic: Optional[bool] = False,
description: Optional[str] = None,
) -> Union[CallableType, Callable[[CallableType], CallableType]]
Parameters:
fn (Optional[CallableType]): The function to decorate.
name (Optional[str]): Query name. Defaults to method __name__. Cannot be present when dynamic is present.
dynamic (Optional[bool]): If true, this handles all queries not otherwise handled. Cannot be present when name is present.
description (Optional[str]): A short description of the query that may appear in the UI/CLI.
Example:
@temporalio.workflow.defn
class MyWorkflow:
def __init__(self):
self.status = "running"
self.messages = []
@temporalio.workflow.query
def get_status(self) -> str:
return self.status
@temporalio.workflow.query(name="message_count", description="Get number of messages")
def get_message_count(self) -> int:
return len(self.messages)

@overload
def update(
fn: UpdateMethodMultiParam[MultiParamSpec, ReturnType]
) -> UpdateMethodMultiParam[MultiParamSpec, ReturnType]: ...
@overload
def update(
*, name: str, description: Optional[str] = None
) -> Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]: ...
@overload
def update(
*,
unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
description: Optional[str] = None,
) -> Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]: ...
@overload
def update(
*,
name: str,
unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
description: Optional[str] = None,
) -> Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]: ...
@overload
def update(
*,
dynamic: Literal[True],
unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
description: Optional[str] = None,
) -> Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]: ...
def update(
fn: Optional[UpdateMethodMultiParam[MultiParamSpec, ReturnType]] = None,
*,
name: Optional[str] = None,
dynamic: Optional[bool] = False,
unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
description: Optional[str] = None,
) -> Union[UpdateMethodMultiParam[MultiParamSpec, ReturnType], Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]]
Parameters:
fn (Optional[UpdateMethodMultiParam[MultiParamSpec, ReturnType]]): The function to decorate.
name (Optional[str]): Update name. Defaults to method __name__. Cannot be present when dynamic is present.
dynamic (Optional[bool]): If true, this handles all updates not otherwise handled. Cannot be present when name is present.
unfinished_policy (HandlerUnfinishedPolicy): Actions taken if a workflow terminates with a running instance of this handler.
description (Optional[str]): A short description of the update that may appear in the UI/CLI.
Example:
@temporalio.workflow.defn
class MyWorkflow:
def __init__(self):
self.value = 0
@temporalio.workflow.update
def increment_value(self, amount: int) -> int:
self.value += amount
return self.value
@temporalio.workflow.update(name="set_value", description="Set the workflow value")
def set_workflow_value(self, new_value: int) -> int:
self.value = new_value
return self.value

def info() -> Info
Returns:
Info: Current workflow execution information.
Get current workflow info. This will error if not called in a workflow.
Example:
# Inside a workflow
workflow_info = temporalio.workflow.info()
print(f"Workflow ID: {workflow_info.workflow_id}")
print(f"Run ID: {workflow_info.run_id}")

def instance() -> Any
Returns:
Any: Current workflow instance.
Get the current workflow instance. This is the same as the workflow instance that is currently executing.
def in_workflow() -> bool
Returns:
bool: Whether currently executing in a workflow context.
Check whether currently in a workflow. This will return True if called from a workflow and False otherwise.
def now() -> datetime
Returns:
datetime: Current workflow time.
Get current workflow time as a timezone-aware datetime in UTC. This is always the current time according to the workflow's view of time which may not be the same as system time.
def time() -> float
Returns:
float: Current workflow time as seconds since epoch.
Get current workflow time as seconds since the Unix epoch.
def time_ns() -> int
Returns:
int: Current workflow time as nanoseconds since epoch.
Get current workflow time as nanoseconds since the Unix epoch.
Example:
# Inside a workflow
current_time = temporalio.workflow.now()
timestamp = temporalio.workflow.time()
nano_timestamp = temporalio.workflow.time_ns()
print(f"Current workflow time: {current_time}")
print(f"Timestamp: {timestamp}")

def uuid4() -> uuid.UUID
Returns:
uuid.UUID: Deterministic UUID4.
Generate a deterministic UUID4 value. This UUID will be the same for the same workflow execution on replay.
def random() -> Random
Returns:
Random: Deterministic random number generator.
Get a deterministic random number generator. This random generator will produce the same sequence of numbers for the same workflow execution on replay.
Example:
# Inside a workflow
workflow_uuid = temporalio.workflow.uuid4()
rng = temporalio.workflow.random()
random_number = rng.randint(1, 100)

def payload_converter() -> temporalio.converter.PayloadConverter
Returns:
temporalio.converter.PayloadConverter: Current payload converter.
Get the payload converter for the current workflow.
def metric_meter() -> temporalio.common.MetricMeter
Returns:
temporalio.common.MetricMeter: Current metric meter.
Get the metric meter for the current workflow.
def memo() -> Mapping[str, Any]
Returns:
Mapping[str, Any]: Current workflow memo.
Get current workflow memo as a mapping of keys to values.
@overload
def memo_value(key: str, default: Any = temporalio.common._arg_unset) -> Any: ...
@overload
def memo_value(key: str, *, type_hint: Type[ParamType]) -> ParamType: ...
@overload
def memo_value(
key: str, default: ParamType, *, type_hint: Type[ParamType]
) -> ParamType: ...
def memo_value(
key: str,
default: Any = temporalio.common._arg_unset,
*,
type_hint: Optional[Type] = None,
) -> Any
Parameters:
key (str): Memo key to get.
default (Any): Default value if key not present.
type_hint (Optional[Type]): Type hint for return value.
Returns:
Any: Memo value for the given key.
Get a specific memo value. If no default is provided and the key is not present, this will raise a KeyError.
def upsert_memo(updates: Mapping[str, Any]) -> None
Parameters:
updates (Mapping[str, Any]): Memo keys and values to upsert.
Upsert memo values. This will update the workflow memo with the provided key-value pairs.
Example:
# Inside a workflow
current_memo = temporalio.workflow.memo()
user_id = temporalio.workflow.memo_value("user_id", default="unknown")
# Update memo
temporalio.workflow.upsert_memo({"status": "processing", "progress": 0.5})

def upsert_search_attributes(
updates: Union[
temporalio.common.TypedSearchAttributes,
Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
]
) -> None
Parameters:
updates (Union[temporalio.common.TypedSearchAttributes, Mapping[Union[str, temporalio.common.SearchAttributeKey], Any]]): Search attributes to upsert.
Upsert search attributes. This will update the workflow's search attributes with the provided values.
Example:
# Inside a workflow
import temporalio.common
# Update search attributes
temporalio.workflow.upsert_search_attributes({
"status": "active",
temporalio.common.SearchAttributeKey.for_keyword("department"): "engineering"
})

@overload
def start_activity(
activity: CallableAsyncType,
arg: AnyType,
*,
task_queue: Optional[str] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
heartbeat_timeout: Optional[timedelta] = None,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[ReturnType]: ...
@overload
def start_activity(
activity: CallableAsyncType,
*,
task_queue: Optional[str] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
heartbeat_timeout: Optional[timedelta] = None,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[ReturnType]: ...
@overload
def start_activity(
activity: str,
arg: AnyType,
*,
task_queue: Optional[str] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
heartbeat_timeout: Optional[timedelta] = None,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]: ...
@overload
def start_activity(
activity: str,
*,
task_queue: Optional[str] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
heartbeat_timeout: Optional[timedelta] = None,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]: ...
def start_activity(
activity: Union[CallableType, str],
arg: Any = temporalio.common._arg_unset,
*,
task_queue: Optional[str] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
heartbeat_timeout: Optional[timedelta] = None,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]
Parameters:
activity (Union[CallableType, str]): Activity function or string name.
arg (Any): Single argument to pass to the activity.
task_queue (Optional[str]): Task queue to run activity on. Defaults to current workflow's task queue.
schedule_to_close_timeout (Optional[timedelta]): Total time allowed for activity from schedule to completion.
schedule_to_start_timeout (Optional[timedelta]): Time allowed for activity to start from when scheduled.
start_to_close_timeout (Optional[timedelta]): Time allowed for activity to complete from start.
heartbeat_timeout (Optional[timedelta]): Maximum time between activity heartbeats.
retry_policy (Optional[temporalio.common.RetryPolicy]): How the activity is retried on failure.
cancellation_type (ActivityCancellationType): How to handle activity cancellation.
versioning_intent (Optional[temporalio.common.VersioningIntent]): Worker versioning intent for this activity.
local_retry_threshold (Optional[timedelta]): Threshold for local retries vs server retries.
Returns:
ActivityHandle[Any]: Handle to the started activity.
Start an activity and return a handle to it. The activity will not be awaited automatically.
Example:
# Inside a workflow
from datetime import timedelta
import temporalio.activity
@temporalio.activity.defn
async def my_activity(name: str) -> str:
return f"Hello, {name}!"
# Start activity and await result
result = await temporalio.workflow.start_activity(
my_activity,
"World",
schedule_to_close_timeout=timedelta(minutes=5)
)
# Start activity and get handle for later awaiting
handle = temporalio.workflow.start_activity(
my_activity,
"World",
schedule_to_close_timeout=timedelta(minutes=5)
)
result = await handle

@overload
def start_activity_class(
activity_cls: Type[ClassType],
activity_method: str,
arg: AnyType,
*,
task_queue: Optional[str] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
heartbeat_timeout: Optional[timedelta] = None,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]: ...
def start_activity_class(
activity_cls: Type,
activity_method: str,
arg: Any = temporalio.common._arg_unset,
*,
task_queue: Optional[str] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
heartbeat_timeout: Optional[timedelta] = None,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]
Parameters:
activity_cls (Type): Activity class.
activity_method (str): Method name on the activity class.
arg (Any): Single argument to pass to the activity.
The remaining keyword arguments are the same as for start_activity.
Returns:
ActivityHandle[Any]: Handle to the started activity.
Start an activity method from a class and return a handle to it.
@overload
def start_activity_method(
activity_method: MethodAsyncNoParam[SelfType, ReturnType],
*,
task_queue: Optional[str] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
heartbeat_timeout: Optional[timedelta] = None,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[ReturnType]: ...
def start_activity_method(
activity_method: Callable,
arg: Any = temporalio.common._arg_unset,
*,
task_queue: Optional[str] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
heartbeat_timeout: Optional[timedelta] = None,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]
Parameters:
activity_method (Callable): Activity method to start.
arg (Any): Single argument to pass to the activity.
The remaining keyword arguments are the same as for start_activity.
Returns:
ActivityHandle[Any]: Handle to the started activity.
Start an activity method and return a handle to it.
@overload
def start_local_activity(
activity: CallableAsyncType,
arg: AnyType,
*,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
local_retry_threshold: Optional[timedelta] = None,
cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
) -> ActivityHandle[ReturnType]: ...
def start_local_activity(
activity: Union[CallableType, str],
arg: Any = temporalio.common._arg_unset,
*,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
local_retry_threshold: Optional[timedelta] = None,
cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
) -> ActivityHandle[Any]
Parameters:
activity (Union[CallableType, str]): Local activity function or string name.
arg (Any): Single argument to pass to the activity.
schedule_to_close_timeout (Optional[timedelta]): Total time allowed for activity from schedule to completion.
schedule_to_start_timeout (Optional[timedelta]): Time allowed for activity to start from when scheduled.
start_to_close_timeout (Optional[timedelta]): Time allowed for activity to complete from start.
retry_policy (Optional[temporalio.common.RetryPolicy]): How the activity is retried on failure.
local_retry_threshold (Optional[timedelta]): Threshold for local retries vs server retries.
cancellation_type (ActivityCancellationType): How to handle activity cancellation.
Returns:
ActivityHandle[Any]: Handle to the started local activity.
Start a local activity and return a handle to it. Local activities are executed in the same process as the workflow worker.
Example:
# Inside a workflow
@temporalio.activity.defn
async def local_activity(data: str) -> str:
return data.upper()
# Start local activity
result = await temporalio.workflow.start_local_activity(
local_activity,
"hello world",
start_to_close_timeout=timedelta(seconds=30)
)

@overload
def start_child_workflow(
workflow: CallableAsyncType,
arg: AnyType,
*,
id: str,
task_queue: Optional[str] = None,
execution_timeout: Optional[timedelta] = None,
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[
Union[
temporalio.common.TypedSearchAttributes,
Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
]
] = None,
cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> ChildWorkflowHandle[ReturnType]: ...
def start_child_workflow(
workflow: Union[CallableType, str],
arg: Any = temporalio.common._arg_unset,
*,
id: str,
task_queue: Optional[str] = None,
execution_timeout: Optional[timedelta] = None,
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[
Union[
temporalio.common.TypedSearchAttributes,
Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
]
] = None,
cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> ChildWorkflowHandle[Any]
Parameters:
workflow (Union[CallableType, str]): Child workflow function or string name.
arg (Any): Single argument to pass to the child workflow.
id (str): ID for the child workflow.
task_queue (Optional[str]): Task queue to run child workflow on.
execution_timeout (Optional[timedelta]): Maximum time for workflow execution.
run_timeout (Optional[timedelta]): Maximum time for workflow run.
task_timeout (Optional[timedelta]): Maximum time for workflow tasks.
id_reuse_policy (temporalio.common.WorkflowIDReusePolicy): Policy for workflow ID reuse.
retry_policy (Optional[temporalio.common.RetryPolicy]): How the workflow is retried on failure.
cron_schedule (str): Cron schedule for the workflow.
memo (Optional[Mapping[str, Any]]): Memo to attach to workflow.
search_attributes (Optional[Union[temporalio.common.TypedSearchAttributes, Mapping[Union[str, temporalio.common.SearchAttributeKey], Any]]]): Search attributes for the workflow.
cancellation_type (ChildWorkflowCancellationType): How to handle child workflow cancellation.
parent_close_policy (ParentClosePolicy): What to do with child when parent closes.
versioning_intent (Optional[temporalio.common.VersioningIntent]): Worker versioning intent.
Returns:
ChildWorkflowHandle[Any]: Handle to the started child workflow.
Start a child workflow and return a handle to it.
Example:
# Inside a workflow
@temporalio.workflow.defn
class ChildWorkflow:
@temporalio.workflow.run
async def run(self, data: str) -> str:
return f"Processed: {data}"
# Start child workflow
child_handle = temporalio.workflow.start_child_workflow(
ChildWorkflow.run,
"input data",
id="child-workflow-123",
task_queue="child-queue"
)
result = await child_handle

@overload
def get_external_workflow_handle(workflow_id: str) -> ExternalWorkflowHandle: ...
@overload
def get_external_workflow_handle(
workflow_id: str, *, run_id: Optional[str] = None
) -> ExternalWorkflowHandle: ...
def get_external_workflow_handle(
workflow_id: str, *, run_id: Optional[str] = None
) -> ExternalWorkflowHandle
Parameters:
workflow_id (str): Workflow ID of the external workflow.
run_id (Optional[str]): Run ID of the external workflow.
Returns:
ExternalWorkflowHandle: Handle for the external workflow.
Get a handle to an external workflow by workflow ID and optional run ID.
@overload
def get_external_workflow_handle_for(
workflow_type: CallableAsyncType, workflow_id: str
) -> ExternalWorkflowHandle[ReturnType]: ...
@overload
def get_external_workflow_handle_for(
workflow_type: CallableAsyncType,
workflow_id: str,
*,
run_id: Optional[str] = None
) -> ExternalWorkflowHandle[ReturnType]: ...
def get_external_workflow_handle_for(
workflow_type: Callable,
workflow_id: str,
*,
run_id: Optional[str] = None,
) -> ExternalWorkflowHandle[Any]
Parameters:
workflow_type (Callable): Type of the external workflow.
workflow_id (str): Workflow ID of the external workflow.
run_id (Optional[str]): Run ID of the external workflow.
Returns:
ExternalWorkflowHandle[Any]: Typed handle for the external workflow.
Get a typed handle to an external workflow by workflow type, workflow ID, and optional run ID.
Example:
# Inside a workflow
# Get handle to external workflow
external_handle = temporalio.workflow.get_external_workflow_handle("external-workflow-id")
# Signal external workflow
await external_handle.signal("signal_name", "signal_data")
# Cancel external workflow
await external_handle.cancel()

@overload
def continue_as_new(
*,
task_queue: Optional[str] = None,
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[
Union[
temporalio.common.TypedSearchAttributes,
Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
]
] = None,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> NoReturn: ...
@overload
def continue_as_new(
arg: AnyType,
*,
task_queue: Optional[str] = None,
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[
Union[
temporalio.common.TypedSearchAttributes,
Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
]
] = None,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> NoReturn: ...
def continue_as_new(
arg: Any = temporalio.common._arg_unset,
*,
task_queue: Optional[str] = None,
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[
Union[
temporalio.common.TypedSearchAttributes,
Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
]
] = None,
versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> NoReturn
Parameters:
arg (Any): Argument to pass to the continued workflow.
task_queue (Optional[str]): Task queue for the continued workflow.
run_timeout (Optional[timedelta]): Maximum time for the continued workflow run.
task_timeout (Optional[timedelta]): Maximum time for continued workflow tasks.
memo (Optional[Mapping[str, Any]]): Memo to attach to continued workflow.
search_attributes (Optional[Union[temporalio.common.TypedSearchAttributes, Mapping[Union[str, temporalio.common.SearchAttributeKey], Any]]]): Search attributes for continued workflow.
versioning_intent (Optional[temporalio.common.VersioningIntent]): Worker versioning intent.
Continue-as-new the current workflow. This will complete the current workflow and start a new execution with the same workflow ID.
Example:
# Inside a workflow
@temporalio.workflow.defn
class MyWorkflow:
@temporalio.workflow.run
async def run(self, iteration: int) -> str:
if iteration >= 100:
return f"Completed after {iteration} iterations"
# Continue as new with next iteration
temporalio.workflow.continue_as_new(iteration + 1)

def get_signal_handler(name: str) -> Optional[Callable]
Parameters:
name (str): Signal handler name.
Returns:
Optional[Callable]: Signal handler function if found.
Get a signal handler by name.
def set_signal_handler(
name: str, handler: Optional[Callable[..., None]]
) -> None
Parameters:
name (str): Signal handler name.
handler (Optional[Callable[..., None]]): Signal handler function or None to unset.
Set a signal handler for the given name. Set to None to unset.
def get_dynamic_signal_handler() -> Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]Returns:
Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]: Dynamic signal handler if setGet the dynamic signal handler.
def set_dynamic_signal_handler(
handler: Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]
) -> None
Parameters:
handler (Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]): Dynamic signal handler or None to unset
Set the dynamic signal handler. Pass None to unset it.
def get_query_handler(name: str) -> Optional[Callable]
Parameters:
name (str): Query handler name
Returns:
Optional[Callable]: Query handler function if found, otherwise None
Get a query handler by name.
def set_query_handler(
name: str, handler: Optional[Callable[..., Any]]
) -> None
Parameters:
name (str): Query handler name
handler (Optional[Callable[..., Any]]): Query handler function or None to unset
Set a query handler for the given name. Pass None as the handler to unset it.
def get_dynamic_query_handler() -> Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]
Returns:
Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]: Dynamic query handler if set, otherwise None
Get the dynamic query handler.
def set_dynamic_query_handler(
handler: Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]
) -> None
Parameters:
handler (Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]): Dynamic query handler or None to unset
Set the dynamic query handler. Pass None to unset it.
def get_update_handler(name: str) -> Optional[Callable]
Parameters:
name (str): Update handler name
Returns:
Optional[Callable]: Update handler function if found, otherwise None
Get an update handler by name.
def set_update_handler(
name: str,
handler: Optional[Callable[..., Any]],
validator: Optional[Callable[..., None]] = None,
) -> None
Parameters:
name (str): Update handler name
handler (Optional[Callable[..., Any]]): Update handler function or None to unset
validator (Optional[Callable[..., None]]): Optional validator function
Set an update handler for the given name. Pass None as the handler to unset it.
def get_dynamic_update_handler() -> Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]
Returns:
Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]: Dynamic update handler if set, otherwise None
Get the dynamic update handler.
def set_dynamic_update_handler(
handler: Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]],
validator: Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]] = None,
) -> None
Parameters:
handler (Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]): Dynamic update handler or None to unset
validator (Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]): Optional validator function
Set the dynamic update handler. Pass None as the handler to unset it.
Example:
# Inside a workflow
def custom_signal_handler(message: str) -> None:
    print(f"Received signal: {message}")
def custom_query_handler(query_type: str) -> str:
    return f"Query response for {query_type}"
# Set handlers dynamically
temporalio.workflow.set_signal_handler("custom_signal", custom_signal_handler)
temporalio.workflow.set_query_handler("custom_query", custom_query_handler)
# Get handlers
signal_handler = temporalio.workflow.get_signal_handler("custom_signal")
query_handler = temporalio.workflow.get_query_handler("custom_query")
@overload
def create_nexus_client(
client: NexusClient[ServiceHandlerT], *, endpoint: str
) -> ServiceHandlerT: ...
@overload
def create_nexus_client(
*, service: str, endpoint: str, serializer: nexusrpc.Serializer = nexusrpc.JSONSerializer()
) -> temporalio.nexus.OperationClient: ...
def create_nexus_client(
client: Optional[NexusClient[ServiceHandlerT]] = None,
*,
service: Optional[str] = None,
endpoint: str,
serializer: nexusrpc.Serializer = nexusrpc.JSONSerializer(),
) -> Union[ServiceHandlerT, temporalio.nexus.OperationClient]
Parameters:
client (Optional[NexusClient[ServiceHandlerT]]): Nexus client to use
service (Optional[str]): Service name for Nexus operations
endpoint (str): Nexus endpoint
serializer (nexusrpc.Serializer): Serializer for Nexus operations
Returns:
Union[ServiceHandlerT, temporalio.nexus.OperationClient]: Nexus client for operations
Create a Nexus client for calling operations on external Nexus services.
def patched(id: str) -> bool
Parameters:
id (str): Patch identifier
Returns:
bool: Whether the patch has been applied
Check whether a patch has been applied. This is used for workflow code versioning.
def deprecate_patch(id: str) -> None
Parameters:
id (str): Patch identifier to deprecate
Deprecate a patch. This removes the patch from future workflow executions but maintains compatibility with existing executions.
Example:
# Inside a workflow
if temporalio.workflow.patched("feature_v2"):
    # New version of the feature
    result = await new_feature_implementation()
else:
    # Old version for replay compatibility
    result = await old_feature_implementation()
# In newer workflow versions, deprecate old patches
temporalio.workflow.deprecate_patch("old_feature_v1")
def all_handlers_finished() -> bool
Returns:
bool: Whether all handlers have finished
Check whether all signal and update handlers have finished executing.
def as_completed(
tasks: Iterable[Awaitable[AnyType]], *, timeout: Optional[float] = None
) -> Iterator[Awaitable[AnyType]]
Parameters:
tasks (Iterable[Awaitable[AnyType]]): Tasks to wait for completion
timeout (Optional[float]): Timeout in seconds
Returns:
Iterator[Awaitable[AnyType]]: Iterator of completed tasks
Return an iterator that yields tasks as they complete. Similar to asyncio.as_completed but works in workflows.
Example:
# Inside a workflow
tasks = [
    temporalio.workflow.start_activity(activity1, "arg1", schedule_to_close_timeout=timedelta(minutes=1)),
    temporalio.workflow.start_activity(activity2, "arg2", schedule_to_close_timeout=timedelta(minutes=1)),
    temporalio.workflow.start_activity(activity3, "arg3", schedule_to_close_timeout=timedelta(minutes=1))
]
# Process tasks as they complete
for task in temporalio.workflow.as_completed(tasks):
    result = await task
    print(f"Task completed with result: {result}")
def get_current_details() -> str
Returns:
str: Current workflow details
Get the current workflow details string.
def set_current_details(description: str) -> None
Parameters:
description (str): Description to set as current workflow details
Set current workflow details. This appears in the workflow execution details in the Temporal Web UI.
Example:
# Inside a workflow
temporalio.workflow.set_current_details("Processing user data")
await process_user_data()
temporalio.workflow.set_current_details("Sending notifications")
await send_notifications()
current_status = temporalio.workflow.get_current_details()
class Info:
    """Information about the running workflow."""
    namespace: str
    workflow_id: str
    run_id: str
    workflow_type: str
    task_queue: str
    attempt: int
    cron_schedule: Optional[str]
    continued_from_execution_run_id: Optional[str]
    parent: Optional[ParentInfo]
    root: Optional[RootInfo]
    start_time: datetime
    execution_timeout: Optional[timedelta]
    run_timeout: Optional[timedelta]
    task_timeout: timedelta
    retry_policy: Optional[temporalio.common.RetryPolicy]
    typed_search_attributes: temporalio.common.TypedSearchAttributes
    raw_memo: Optional[Mapping[str, temporalio.api.common.v1.Payload]]
    headers: Mapping[str, temporalio.api.common.v1.Payload]
    unsafe: temporalio.workflow.unsafe
class ParentInfo:
    """Information about parent workflow if this is a child workflow."""
    namespace: str
    workflow_id: str
    run_id: str
class RootInfo:
    """Information about root workflow if this is a child workflow."""
    workflow_id: str
    run_id: str
class UpdateInfo:
    """Information about the currently running update."""
    id: str
    name: str
class ActivityHandle(Generic[ReturnType]):
    """Handle for a running activity."""
    @property
    def id(self) -> str:
        """ID of the activity."""
        ...
    def cancel(self) -> None:
        """Cancel the activity."""
        ...
    def is_cancelled(self) -> bool:
        """Check if the activity is cancelled."""
        ...
    def is_done(self) -> bool:
        """Check if the activity is complete."""
        ...
    def result(self) -> ReturnType:
        """Get the result of the activity. Raises exception if not complete."""
        ...
class ChildWorkflowHandle(Generic[ReturnType]):
    """Handle for a running child workflow."""
    @property
    def id(self) -> str:
        """ID of the child workflow."""
        ...
    @property
    def first_execution_run_id(self) -> Optional[str]:
        """First execution run ID of the child workflow."""
        ...
    async def signal(self, signal: str, arg: Any = temporalio.common._arg_unset) -> None:
        """Send a signal to the child workflow."""
        ...
    def cancel(self) -> None:
        """Cancel the child workflow."""
        ...
    def is_cancelled(self) -> bool:
        """Check if the child workflow is cancelled."""
        ...
    def is_done(self) -> bool:
        """Check if the child workflow is complete."""
        ...
    def result(self) -> ReturnType:
        """Get the result of the child workflow. Raises exception if not complete."""
        ...
class ExternalWorkflowHandle(Generic[ReturnType]):
    """Handle for an external workflow."""
    @property
    def id(self) -> str:
        """ID of the external workflow."""
        ...
    @property
    def run_id(self) -> Optional[str]:
        """Run ID of the external workflow."""
        ...
    async def signal(self, signal: str, arg: Any = temporalio.common._arg_unset) -> None:
        """Send a signal to the external workflow."""
        ...
    async def cancel(self) -> None:
        """Cancel the external workflow."""
        ...
class ActivityConfig(TypedDict, total=False):
    """Configuration for activity execution."""
    task_queue: Optional[str]
    schedule_to_close_timeout: Optional[timedelta]
    schedule_to_start_timeout: Optional[timedelta]
    start_to_close_timeout: Optional[timedelta]
    heartbeat_timeout: Optional[timedelta]
    retry_policy: Optional[temporalio.common.RetryPolicy]
    cancellation_type: ActivityCancellationType
    versioning_intent: Optional[temporalio.common.VersioningIntent]
    local_retry_threshold: Optional[timedelta]
class LocalActivityConfig(TypedDict, total=False):
    """Configuration for local activity execution."""
    schedule_to_close_timeout: Optional[timedelta]
    schedule_to_start_timeout: Optional[timedelta]
    start_to_close_timeout: Optional[timedelta]
    retry_policy: Optional[temporalio.common.RetryPolicy]
    local_retry_threshold: Optional[timedelta]
    cancellation_type: ActivityCancellationType
class ChildWorkflowConfig(TypedDict, total=False):
    """Configuration for child workflow execution."""
    id: str
    task_queue: Optional[str]
    execution_timeout: Optional[timedelta]
    run_timeout: Optional[timedelta]
    task_timeout: Optional[timedelta]
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy
    retry_policy: Optional[temporalio.common.RetryPolicy]
    cron_schedule: str
    memo: Optional[Mapping[str, Any]]
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, Mapping[Union[str, temporalio.common.SearchAttributeKey], Any]]]
    cancellation_type: ChildWorkflowCancellationType
    parent_close_policy: ParentClosePolicy
    versioning_intent: Optional[temporalio.common.VersioningIntent]
class HandlerUnfinishedPolicy(Enum):
    """Actions taken if a workflow terminates with running handlers."""
    WARN_AND_ABANDON = 1 # Issue a warning in addition to abandoning
    ABANDON = 2 # Abandon the handler
class ActivityCancellationType(IntEnum):
    """How to handle activity cancellation."""
    TRY_CANCEL = 1 # Try to cancel the activity
    WAIT_CANCELLATION_COMPLETED = 2 # Wait for cancellation to complete
    ABANDON = 3 # Abandon the activity immediately
class ChildWorkflowCancellationType(IntEnum):
    """How to handle child workflow cancellation."""
    WAIT_CANCELLATION_COMPLETED = 1 # Wait for cancellation to complete
    ABANDON = 2 # Abandon the child workflow immediately
class ParentClosePolicy(IntEnum):
    """What to do with child workflows when parent closes."""
    TERMINATE = 1 # Terminate child workflows
    ABANDON = 2 # Abandon child workflows
    REQUEST_CANCEL = 3 # Request cancellation of child workflows
class ContinueAsNewError(BaseException):
    """Exception raised to continue workflow as new."""
    pass
class NondeterminismError(Exception):
    """Exception for nondeterministic behavior in workflow."""
    pass
class ReadOnlyContextError(Exception):
    """Exception for read-only context violations."""
    pass
class UnfinishedUpdateHandlersWarning(RuntimeWarning):
    """Warning for unfinished update handlers when workflow exits."""
    pass
class UnfinishedSignalHandlersWarning(RuntimeWarning):
    """Warning for unfinished signal handlers when workflow exits."""
    pass
This comprehensive guide covers all the essential workflow development capabilities in the Temporalio Python SDK. The workflow module provides the core building blocks for creating robust, scalable, and maintainable distributed applications using Temporal's orchestration engine.
Install with Tessl CLI
npx tessl i tessl/pypi-temporalio