Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.
The temporalio.client module provides the primary interface for connecting to Temporal servers and managing workflows, schedules, and activities. This module contains the main Client class that serves as the entry point for all client-side operations.
The Client class is the main interface for interacting with a Temporal server. It provides methods for workflow management, schedule operations, and activity handling.
class Client:
"""Client for accessing Temporal.
Most users will use connect() to create a client. The service property provides
access to a raw gRPC client. Clients are not thread-safe and should only be used
in the event loop they are first connected in.
Clients do not work across forks since runtimes do not work across forks.
"""
@staticmethod
async def connect(
target_host: str,
*,
namespace: str = "default",
api_key: Optional[str] = None,
data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
plugins: Sequence[Plugin] = [],
interceptors: Sequence[Interceptor] = [],
default_workflow_query_reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
tls: Union[bool, TLSConfig] = False,
retry_config: Optional[RetryConfig] = None,
keep_alive_config: Optional[KeepAliveConfig] = KeepAliveConfig.default,
rpc_metadata: Mapping[str, str] = {},
identity: Optional[str] = None,
lazy: bool = False,
runtime: Optional[temporalio.runtime.Runtime] = None,
http_connect_proxy_config: Optional[HttpConnectProxyConfig] = None,
header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC,
) -> Client
Parameters:
- target_host (str): host:port for the Temporal server. For local development, this is often "localhost:7233"
- namespace (str): Namespace to use for client calls. Default: "default"
- api_key (Optional[str]): API key for Temporal. This becomes the "Authorization" HTTP header with "Bearer " prepended
- data_converter (temporalio.converter.DataConverter): Data converter to use for all data conversions to/from payloads
- plugins (Sequence[Plugin]): Set of plugins that are chained together to allow intercepting and modifying client creation and service connection
- interceptors (Sequence[Interceptor]): Set of interceptors that are chained together to allow intercepting of client calls
- default_workflow_query_reject_condition (Optional[temporalio.common.QueryRejectCondition]): The default rejection condition for workflow queries
- tls (Union[bool, TLSConfig]): TLS configuration. If False, do not use TLS. If True, use system default TLS configuration
- retry_config (Optional[RetryConfig]): Retry configuration for direct service calls or all high-level calls
- keep_alive_config (Optional[KeepAliveConfig]): Keep-alive configuration for the client connection
- rpc_metadata (Mapping[str, str]): Headers to use for all calls to the server
- identity (Optional[str]): Identity for this client. If unset, a default is created based on the version of the SDK
- lazy (bool): If true, the client will not connect until the first call is attempted
- runtime (Optional[temporalio.runtime.Runtime]): The runtime for this client, or the default if unset
- http_connect_proxy_config (Optional[HttpConnectProxyConfig]): Configuration for HTTP CONNECT proxy
- header_codec_behavior (HeaderCodecBehavior): Encoding behavior for headers sent by the client
Returns:
Client: Connected Temporal client
Example:
import temporalio.client
# Connect to local Temporal server
client = await temporalio.client.Client.connect("localhost:7233")
# Connect with TLS and authentication
client = await temporalio.client.Client.connect(
"my-temporal.example.com:7233",
namespace="production",
api_key="my-api-key",
tls=True
)
class ClientConfig(TypedDict, total=False):
"""TypedDict of config originally passed to Client."""
service_client: Required[temporalio.service.ServiceClient]
namespace: Required[str]
data_converter: Required[temporalio.converter.DataConverter]
interceptors: Required[Sequence[Interceptor]]
default_workflow_query_reject_condition: Required[Optional[temporalio.common.QueryRejectCondition]]
header_codec_behavior: Required[HeaderCodecBehavior]
plugins: Required[Sequence[Plugin]]
@property
def service_client(self) -> temporalio.service.ServiceClient:
"""Raw gRPC service client."""
@property
def workflow_service(self) -> temporalio.service.WorkflowService:
"""Raw gRPC workflow service client."""
@property
def operator_service(self) -> temporalio.service.OperatorService:
"""Raw gRPC operator service client."""
@property
def test_service(self) -> temporalio.service.TestService:
"""Raw gRPC test service client."""
@property
def namespace(self) -> str:
"""Namespace used in calls by this client."""
@property
def identity(self) -> str:
"""Identity used in calls by this client."""
@property
def data_converter(self) -> temporalio.converter.DataConverter:
"""Data converter used by this client."""
@property
def rpc_metadata(self) -> Mapping[str, str]:
"""Headers for every call made by this client."""
@rpc_metadata.setter
def rpc_metadata(self, value: Mapping[str, str]) -> None:
"""Update the headers for this client."""
@property
def api_key(self) -> Optional[str]:
"""API key for every call made by this client."""
@api_key.setter
def api_key(self, value: Optional[str]) -> None:
"""Update the API key for this client."""
The start_workflow method is used to start new workflow executions. It has several overloads to support different calling patterns.
async def start_workflow(
self,
workflow: MethodAsyncNoParam[SelfType, ReturnType],
*,
id: str,
task_queue: str,
execution_timeout: Optional[timedelta] = None,
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
static_summary: Optional[str] = None,
static_details: Optional[str] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
priority: temporalio.common.Priority = temporalio.common.Priority.default,
versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[SelfType, ReturnType]
async def start_workflow(
self,
workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType],
arg: ParamType,
*,
id: str,
task_queue: str,
execution_timeout: Optional[timedelta] = None,
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
static_summary: Optional[str] = None,
static_details: Optional[str] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
priority: temporalio.common.Priority = temporalio.common.Priority.default,
versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[SelfType, ReturnType]
async def start_workflow(
self,
workflow: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]],
*,
args: Sequence[Any],
id: str,
task_queue: str,
execution_timeout: Optional[timedelta] = None,
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
static_summary: Optional[str] = None,
static_details: Optional[str] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
priority: temporalio.common.Priority = temporalio.common.Priority.default,
versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[SelfType, ReturnType]
async def start_workflow(
self,
workflow: str,
arg: Any = temporalio.common._arg_unset,
*,
args: Sequence[Any] = [],
id: str,
task_queue: str,
result_type: Optional[Type] = None,
execution_timeout: Optional[timedelta] = None,
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
static_summary: Optional[str] = None,
static_details: Optional[str] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
priority: temporalio.common.Priority = temporalio.common.Priority.default,
versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[Any, Any]
Common Parameters:
- workflow: The workflow to start (method reference or string name)
- id (str): Unique workflow ID
- task_queue (str): Task queue to run the workflow on
- execution_timeout (Optional[timedelta]): Timeout for the entire workflow execution
- run_timeout (Optional[timedelta]): Timeout for a single workflow run
- task_timeout (Optional[timedelta]): Timeout for individual workflow tasks
- id_reuse_policy (temporalio.common.WorkflowIDReusePolicy): Policy for reusing workflow IDs
- id_conflict_policy (temporalio.common.WorkflowIDConflictPolicy): Policy for handling workflow ID conflicts
- retry_policy (Optional[temporalio.common.RetryPolicy]): Retry policy for the workflow
- cron_schedule (str): Cron schedule for recurring workflows
- memo (Optional[Mapping[str, Any]]): Memo to attach to the workflow
- search_attributes (Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]]): Search attributes for the workflow
- static_summary (Optional[str]): Static summary for the workflow
- static_details (Optional[str]): Static details for the workflow
- start_delay (Optional[timedelta]): Delay before starting the workflow
- start_signal (Optional[str]): Signal to send when starting the workflow
- start_signal_args (Sequence[Any]): Arguments for the start signal
- rpc_metadata (Mapping[str, str]): RPC metadata for this call
- rpc_timeout (Optional[timedelta]): Timeout for the RPC call
- request_eager_start (bool): Request eager workflow start
- priority (temporalio.common.Priority): Workflow priority
- versioning_override (Optional[temporalio.common.VersioningOverride]): Versioning override for the workflow
Returns:
WorkflowHandle: Handle to the started workflow
Similar to starting workflows, the client provides execute_workflow methods that start a workflow and wait for its result.
async def execute_workflow(
self,
workflow: Union[str, Callable[..., Awaitable[Any]]],
arg: Any = temporalio.common._arg_unset,
*,
args: Sequence[Any] = [],
id: str,
task_queue: str,
result_type: Optional[Type] = None,
execution_timeout: Optional[timedelta] = None,
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
static_summary: Optional[str] = None,
static_details: Optional[str] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
priority: temporalio.common.Priority = temporalio.common.Priority.default,
versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> Any
def get_workflow_handle(
self,
workflow_id: str,
*,
run_id: Optional[str] = None,
first_execution_run_id: Optional[str] = None,
result_type: Optional[Type] = None,
) -> WorkflowHandle[Any, Any]
Parameters:
- workflow_id (str): Workflow ID to get a handle to
- run_id (Optional[str]): Run ID that will be used for all calls
- first_execution_run_id (Optional[str]): First execution run ID used for cancellation and termination
- result_type (Optional[Type]): The result type to deserialize into if known
Returns:
WorkflowHandle[Any, Any]: Handle to the workflow
def get_workflow_handle_for(
self,
workflow: Union[MethodAsyncNoParam[SelfType, ReturnType], MethodAsyncSingleParam[SelfType, Any, ReturnType]],
workflow_id: str,
*,
run_id: Optional[str] = None,
first_execution_run_id: Optional[str] = None,
) -> WorkflowHandle[SelfType, ReturnType]
Parameters:
- workflow: The workflow run method to use for typing the handle
- workflow_id (str): Workflow ID to get a handle to
- run_id (Optional[str]): Run ID that will be used for all calls
- first_execution_run_id (Optional[str]): First execution run ID used for cancellation and termination
Returns:
WorkflowHandle[SelfType, ReturnType]: Typed handle to the workflow
class WorkflowHandle(Generic[SelfType, ReturnType]):
"""Handle for interacting with a workflow.
This is usually created via Client.get_workflow_handle or
returned from Client.start_workflow.
"""
def __init__(
self,
client: Client,
id: str,
*,
run_id: Optional[str] = None,
result_run_id: Optional[str] = None,
first_execution_run_id: Optional[str] = None,
result_type: Optional[Type] = None,
start_workflow_response: Optional[Union[temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse, temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse]] = None,
)
@property
def id(self) -> str:
"""ID of the workflow."""
@property
def run_id(self) -> Optional[str]:
"""Run ID used for most calls on this handle."""
@property
def result_run_id(self) -> Optional[str]:
"""Run ID used for result calls on this handle."""
@property
def first_execution_run_id(self) -> Optional[str]:
"""Run ID of the first execution used for cancel and terminate calls."""
async def result(
self,
*,
follow_runs: bool = True,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> ReturnType
Parameters:
- follow_runs (bool): Whether to follow workflow runs if the workflow continues as new
- rpc_metadata (Mapping[str, str]): RPC metadata for this call
- rpc_timeout (Optional[timedelta]): Timeout for the RPC call
Returns:
ReturnType: The workflow result
Continue-as-new workflows are handled automatically by the client when following runs in the result() method.
Example:
# Start a workflow
handle = await client.start_workflow(
MyWorkflow.run,
"input_data",
id="my-workflow-id",
task_queue="my-task-queue"
)
# Get the result (will follow continue-as-new workflows by default)
result = await handle.result()
async def signal(
self,
signal: Union[MethodSyncOrAsyncNoParam[SelfType], str],
arg: Any = temporalio.common._arg_unset,
*,
args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None
Parameters:
- signal: The signal method to call or signal name as string
- arg: Single argument for the signal
- args (Sequence[Any]): Multiple arguments for the signal
- rpc_metadata (Mapping[str, str]): RPC metadata for this call
- rpc_timeout (Optional[timedelta]): Timeout for the RPC call
Example:
# Signal a workflow using typed method
await handle.signal(MyWorkflow.my_signal, "signal_data")
# Signal a workflow using string name
await handle.signal("my_signal", "signal_data")
async def query(
self,
query: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], str],
arg: Any = temporalio.common._arg_unset,
*,
args: Sequence[Any] = [],
reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> Any
Parameters:
- query: The query method to call or query name as string
- arg: Single argument for the query
- args (Sequence[Any]): Multiple arguments for the query
- reject_condition (Optional[temporalio.common.QueryRejectCondition]): Condition for rejecting the query
- rpc_metadata (Mapping[str, str]): RPC metadata for this call
- rpc_timeout (Optional[timedelta]): Timeout for the RPC call
Returns:
Any: The query result
async def execute_update(
self,
update: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], str],
arg: Any = temporalio.common._arg_unset,
*,
args: Sequence[Any] = [],
id: Optional[str] = None,
first_execution_run_id: Optional[str] = None,
wait_for_stage: WorkflowUpdateStage = WorkflowUpdateStage.COMPLETED,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> Any
Parameters:
- update: The update method to call or update name as string
- arg: Single argument for the update
- args (Sequence[Any]): Multiple arguments for the update
- id (Optional[str]): Update ID
- first_execution_run_id (Optional[str]): First execution run ID
- wait_for_stage (WorkflowUpdateStage): Stage to wait for before returning
- rpc_metadata (Mapping[str, str]): RPC metadata for this call
- rpc_timeout (Optional[timedelta]): Timeout for the RPC call
Returns:
Any: The update result
async def start_update(
self,
update: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], str],
arg: Any = temporalio.common._arg_unset,
*,
args: Sequence[Any] = [],
id: Optional[str] = None,
first_execution_run_id: Optional[str] = None,
wait_for_stage: WorkflowUpdateStage = WorkflowUpdateStage.ACCEPTED,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> WorkflowUpdateHandle[Any, Any]
def get_update_handle(
self,
id: str,
*,
first_execution_run_id: Optional[str] = None,
result_type: Optional[Type] = None,
) -> WorkflowUpdateHandle[SelfType, Any]
def get_update_handle_for(
self,
update: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], MethodSyncOrAsyncSingleParam[SelfType, Any, ReturnType]],
id: str,
*,
first_execution_run_id: Optional[str] = None,
) -> WorkflowUpdateHandle[SelfType, ReturnType]
class WorkflowUpdateHandle(Generic[SelfType, ReturnType]):
"""Handle for interacting with a workflow update."""
@property
def id(self) -> str:
"""ID of the update."""
@property
def workflow_id(self) -> str:
"""Workflow ID this update is for."""
@property
def workflow_run_id(self) -> Optional[str]:
"""Run ID of the workflow this update is for."""
async def result(
self,
*,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> ReturnType:
"""Get the result of the update."""
class WorkflowUpdateStage(IntEnum):
"""Stage of workflow update execution."""
ADMITTED = 1
ACCEPTED = 2
COMPLETED = 3
async def cancel(
self,
*,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None
Parameters:
- rpc_metadata (Mapping[str, str]): RPC metadata for this call
- rpc_timeout (Optional[timedelta]): Timeout for the RPC call
async def terminate(
self,
*,
reason: Optional[str] = None,
details: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None
Parameters:
- reason (Optional[str]): Reason for terminating the workflow
- details (Sequence[Any]): Additional details about the termination
- rpc_metadata (Mapping[str, str]): RPC metadata for this call
- rpc_timeout (Optional[timedelta]): Timeout for the RPC call
Example:
# Cancel a workflow
await handle.cancel()
# Terminate a workflow with reason
await handle.terminate(reason="Manual termination", details=["User requested"])
async def create_schedule(
self,
id: str,
schedule: Schedule,
*,
trigger_immediately: bool = False,
backfill: Sequence[ScheduleBackfill] = [],
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> ScheduleHandle
Parameters:
- id (str): Schedule ID
- schedule (Schedule): The schedule definition
- trigger_immediately (bool): Whether to trigger the schedule immediately upon creation
- backfill (Sequence[ScheduleBackfill]): Backfill requests for the schedule
- memo (Optional[Mapping[str, Any]]): Memo to attach to the schedule
- search_attributes (Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]]): Search attributes for the schedule
- rpc_metadata (Mapping[str, str]): RPC metadata for this call
- rpc_timeout (Optional[timedelta]): Timeout for the RPC call
Returns:
ScheduleHandle: Handle to the created schedule
def get_schedule_handle(self, id: str) -> ScheduleHandle
Parameters:
- id (str): Schedule ID
Returns:
ScheduleHandle: Handle to the schedule
class ScheduleHandle:
"""Handle for interacting with a schedule.
This is usually created via Client.get_schedule_handle or
returned from Client.create_schedule.
"""
def __init__(self, client: Client, id: str) -> None:
"""Create schedule handle."""
@property
def id(self) -> str:
"""ID of the schedule."""
async def backfill(
self,
*backfill: ScheduleBackfill,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None:
"""Backfill the schedule."""
async def delete(
self,
*,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None:
"""Delete the schedule."""
async def describe(
self,
*,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> ScheduleDescription:
"""Get description of the schedule."""
async def pause(
self,
*,
note: Optional[str] = None,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None:
"""Pause the schedule."""
async def trigger(
self,
*,
overlap_policy: Optional[ScheduleOverlapPolicy] = None,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None:
"""Manually trigger the schedule."""
async def unpause(
self,
*,
note: Optional[str] = None,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None:
"""Unpause the schedule."""
async def update(
self,
updater: Callable[[ScheduleUpdateInput], None],
*,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None:
"""Update the schedule."""
@dataclass
class Schedule:
"""Complete schedule definition."""
action: ScheduleAction
spec: ScheduleSpec
policy: SchedulePolicy = dataclasses.field(default_factory=SchedulePolicy)
state: ScheduleState = dataclasses.field(default_factory=ScheduleState)
@dataclass
class ScheduleSpec:
"""Specification for when to run a scheduled action."""
calendars: Sequence[ScheduleCalendarSpec] = ()
intervals: Sequence[ScheduleIntervalSpec] = ()
cron_expressions: Sequence[str] = ()
skip: Sequence[ScheduleCalendarSpec] = ()
start_at: Optional[datetime] = None
end_at: Optional[datetime] = None
jitter: Optional[timedelta] = None
time_zone_name: str = "UTC"
class ScheduleAction(ABC):
"""Base class for schedule actions."""
@abstractmethod
def _to_proto(self) -> temporalio.api.schedule.v1.ScheduleAction:
"""Convert to proto."""
@dataclass
class ScheduleActionStartWorkflow(ScheduleAction):
"""Schedule action that starts a workflow."""
workflow: str
args: Sequence[Any] = ()
id: str = ""
task_queue: str = ""
execution_timeout: Optional[timedelta] = None
run_timeout: Optional[timedelta] = None
task_timeout: Optional[timedelta] = None
retry_policy: Optional[temporalio.common.RetryPolicy] = None
memo: Optional[Mapping[str, Any]] = None
search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None
headers: Optional[Mapping[str, temporalio.api.common.v1.Payload]] = None
@dataclass
class SchedulePolicy:
"""General schedule policies."""
overlap: ScheduleOverlapPolicy = ScheduleOverlapPolicy.UNSPECIFIED
catchup_window: Optional[timedelta] = None
pause_on_failure: bool = False
class ScheduleOverlapPolicy(IntEnum):
"""Policy for overlapping schedule executions."""
UNSPECIFIED = 0
SKIP = 1
BUFFER_ONE = 2
BUFFER_ALL = 3
CANCEL_OTHER = 4
TERMINATE_OTHER = 5
ALLOW_ALL = 6
Example:
import temporalio.client
from temporalio.client import ScheduleActionStartWorkflow, ScheduleSpec, Schedule, SchedulePolicy
# Create a schedule to run a workflow every hour
schedule = Schedule(
action=ScheduleActionStartWorkflow(
workflow="MyWorkflow",
args=["scheduled_data"],
id="scheduled-workflow-{{}}", # Double braces for template
task_queue="my-task-queue"
),
spec=ScheduleSpec(
intervals=[temporalio.client.ScheduleIntervalSpec(every=timedelta(hours=1))]
),
policy=SchedulePolicy()
)
# Create the schedule
schedule_handle = await client.create_schedule("my-schedule", schedule)
# Trigger the schedule manually
await schedule_handle.trigger()
async def update_worker_build_id_compatibility(
self,
task_queue: str,
operation: BuildIdOp,
*,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None
Parameters:
- task_queue (str): Task queue to update build ID compatibility for
- operation (BuildIdOp): Build ID operation to perform
- rpc_metadata (Mapping[str, str]): RPC metadata for this call
- rpc_timeout (Optional[timedelta]): Timeout for the RPC call
async def get_worker_build_id_compatibility(
self,
task_queue: str,
*,
max_sets: int = 0,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> WorkerBuildIdVersionSets
Parameters:
- task_queue (str): Task queue to get build ID compatibility for
- max_sets (int): Maximum number of sets to return. 0 means no limit
- rpc_metadata (Mapping[str, str]): RPC metadata for this call
- rpc_timeout (Optional[timedelta]): Timeout for the RPC call
Returns:
WorkerBuildIdVersionSets: Build ID version sets
async def get_worker_task_reachability(
self,
*,
build_ids: Sequence[str] = [],
task_queues: Sequence[str] = [],
reachability_type: WorkerTaskReachabilityType = WorkerTaskReachabilityType.UNSPECIFIED,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> WorkerTaskReachability
Parameters:
- build_ids (Sequence[str]): Build IDs to check reachability for
- task_queues (Sequence[str]): Task queues to check reachability for
- reachability_type (WorkerTaskReachabilityType): Type of reachability to check
- rpc_metadata (Mapping[str, str]): RPC metadata for this call
- rpc_timeout (Optional[timedelta]): Timeout for the RPC call
Returns:
WorkerTaskReachability: Task reachability information
@dataclass
class WorkerBuildIdVersionSets:
"""Worker build ID version sets."""
version_sets: Sequence[BuildIdVersionSet]
@dataclass
class BuildIdVersionSet:
"""A single build ID version set."""
build_ids: Sequence[str]
is_default: bool
class BuildIdOp(ABC):
"""Base class for build ID operations."""
@abstractmethod
def _to_proto(self) -> temporalio.api.taskqueue.v1.BuildIdOp:
"""Convert to proto."""
@dataclass
class WorkerTaskReachability:
"""Worker task reachability information."""
build_id_reachability: Mapping[str, BuildIdReachability]
task_queue_reachability: Mapping[str, TaskQueueReachability]
Version management is handled through the build ID compatibility system. Workers can be updated to new versions while maintaining compatibility with existing workflows.
Example:
# Update build ID compatibility to add a new version
from temporalio.client import AddNewIdInNewDefaultSet
await client.update_worker_build_id_compatibility(
"my-task-queue",
AddNewIdInNewDefaultSet("build-id-v2")
)
# Get current build ID compatibility
compatibility = await client.get_worker_build_id_compatibility("my-task-queue")
class AsyncActivityHandle:
"""Handle representing an external activity for completion and heartbeat."""
def __init__(
self,
client: Client,
id_or_token: Union[AsyncActivityIDReference, bytes]
) -> None:
"""Create an async activity handle."""
async def heartbeat(
self,
*details: Any,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None:
"""Send a heartbeat for the activity."""
async def complete(
self,
result: Any = None,
*,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None:
"""Complete the activity with a result."""
async def fail(
self,
exception: BaseException,
*,
last_heartbeat_details: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None:
"""Fail the activity with an exception."""
async def report_cancellation(
self,
*details: Any,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None:
"""Report that the activity was cancelled."""
def get_async_activity_handle(
self,
*,
activity_id: str,
task_queue: str,
workflow_namespace: Optional[str] = None,
) -> AsyncActivityHandle
Parameters:
- activity_id (str): ID of the async activity
- task_queue (str): Task queue the activity is running on
- workflow_namespace (Optional[str]): Workflow namespace if different from client namespace
Returns:
AsyncActivityHandle: Handle to the async activity
def get_async_activity_handle(
self,
*,
task_token: bytes
) -> AsyncActivityHandle
Parameters:
- task_token (bytes): Task token for the async activity
Returns:
AsyncActivityHandle: Handle to the async activity
@dataclass
class AsyncActivityIDReference:
"""Reference to an async activity by ID."""
activity_id: str
task_queue: str
workflow_namespace: Optional[str] = None
Async activities can be completed or failed from external processes using the activity handle.
Example:
# Get an async activity handle by ID
activity_handle = client.get_async_activity_handle(
activity_id="my-activity-123",
task_queue="my-task-queue"
)
# Send a heartbeat
await activity_handle.heartbeat("Still processing...")
# Complete the activity
await activity_handle.complete("Activity completed successfully")
# Or fail the activity
try:
# Some processing that might fail
pass
except Exception as e:
await activity_handle.fail(e)
Example with task token:
# Get an async activity handle by task token
activity_handle = client.get_async_activity_handle(task_token=task_token_bytes)
# Report cancellation
await activity_handle.report_cancellation("User cancelled the operation")
Install with Tessl CLI
npx tessl i tessl/pypi-temporalio