CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-temporalio

Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.

Overview
Eval results
Files

client.mddocs/

Client Operations

The temporalio.client module provides the primary interface for connecting to Temporal servers and managing workflows, schedules, and activities. This module contains the main Client class that serves as the entry point for all client-side operations.

Client Connection and Configuration

Client Class

The Client class is the main interface for interacting with a Temporal server. It provides methods for workflow management, schedule operations, and activity handling.

class Client:
    """Client for accessing Temporal.

    Most users will use connect() to create a client. The service property provides
    access to a raw gRPC client. Clients are not thread-safe and should only be used
    in the event loop they are first connected in.

    Clients do not work across forks since runtimes do not work across forks.
    """

Connection Methods

Client.connect

@staticmethod
async def connect(
    target_host: str,
    *,
    namespace: str = "default",
    api_key: Optional[str] = None,
    data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
    plugins: Sequence[Plugin] = [],
    interceptors: Sequence[Interceptor] = [],
    default_workflow_query_reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
    tls: Union[bool, TLSConfig] = False,
    retry_config: Optional[RetryConfig] = None,
    keep_alive_config: Optional[KeepAliveConfig] = KeepAliveConfig.default,
    rpc_metadata: Mapping[str, str] = {},
    identity: Optional[str] = None,
    lazy: bool = False,
    runtime: Optional[temporalio.runtime.Runtime] = None,
    http_connect_proxy_config: Optional[HttpConnectProxyConfig] = None,
    header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC,
) -> Client

Parameters:

  • target_host (str): host:port for the Temporal server. For local development, this is often "localhost:7233"
  • namespace (str): Namespace to use for client calls. Default: "default"
  • api_key (Optional[str]): API key for Temporal. This becomes the "Authorization" HTTP header with "Bearer " prepended
  • data_converter (temporalio.converter.DataConverter): Data converter to use for all data conversions to/from payloads
  • plugins (Sequence[Plugin]): Set of plugins that are chained together to allow intercepting and modifying client creation and service connection
  • interceptors (Sequence[Interceptor]): Set of interceptors that are chained together to allow intercepting of client calls
  • default_workflow_query_reject_condition (Optional[temporalio.common.QueryRejectCondition]): The default rejection condition for workflow queries
  • tls (Union[bool, TLSConfig]): TLS configuration. If False, do not use TLS. If True, use system default TLS configuration
  • retry_config (Optional[RetryConfig]): Retry configuration for direct service calls or all high-level calls
  • keep_alive_config (Optional[KeepAliveConfig]): Keep-alive configuration for the client connection
  • rpc_metadata (Mapping[str, str]): Headers to use for all calls to the server
  • identity (Optional[str]): Identity for this client. If unset, a default is created based on the version of the SDK
  • lazy (bool): If true, the client will not connect until the first call is attempted
  • runtime (Optional[temporalio.runtime.Runtime]): The runtime for this client, or the default if unset
  • http_connect_proxy_config (Optional[HttpConnectProxyConfig]): Configuration for HTTP CONNECT proxy
  • header_codec_behavior (HeaderCodecBehavior): Encoding behavior for headers sent by the client

Returns:

  • Client: Connected Temporal client

Example:

import temporalio.client

# Connect to local Temporal server
client = await temporalio.client.Client.connect("localhost:7233")

# Connect with TLS and authentication
client = await temporalio.client.Client.connect(
    "my-temporal.example.com:7233",
    namespace="production",
    api_key="my-api-key",
    tls=True
)

ClientConfig

class ClientConfig(TypedDict, total=False):
    """TypedDict of config originally passed to Client."""

    service_client: Required[temporalio.service.ServiceClient]
    namespace: Required[str]
    data_converter: Required[temporalio.converter.DataConverter]
    interceptors: Required[Sequence[Interceptor]]
    default_workflow_query_reject_condition: Required[Optional[temporalio.common.QueryRejectCondition]]
    header_codec_behavior: Required[HeaderCodecBehavior]
    plugins: Required[Sequence[Plugin]]

Client Properties

@property
def service_client(self) -> temporalio.service.ServiceClient:
    """Raw gRPC service client."""

@property
def workflow_service(self) -> temporalio.service.WorkflowService:
    """Raw gRPC workflow service client."""

@property
def operator_service(self) -> temporalio.service.OperatorService:
    """Raw gRPC operator service client."""

@property
def test_service(self) -> temporalio.service.TestService:
    """Raw gRPC test service client."""

@property
def namespace(self) -> str:
    """Namespace used in calls by this client."""

@property
def identity(self) -> str:
    """Identity used in calls by this client."""

@property
def data_converter(self) -> temporalio.converter.DataConverter:
    """Data converter used by this client."""

@property
def rpc_metadata(self) -> Mapping[str, str]:
    """Headers for every call made by this client."""

@rpc_metadata.setter
def rpc_metadata(self, value: Mapping[str, str]) -> None:
    """Update the headers for this client."""

@property
def api_key(self) -> Optional[str]:
    """API key for every call made by this client."""

@api_key.setter
def api_key(self, value: Optional[str]) -> None:
    """Update the API key for this client."""

Workflow Management

Starting Workflows

The start_workflow method is used to start new workflow executions. It has several overloads to support different calling patterns.

start_workflow (typed method - no parameters)

async def start_workflow(
    self,
    workflow: MethodAsyncNoParam[SelfType, ReturnType],
    *,
    id: str,
    task_queue: str,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[SelfType, ReturnType]

start_workflow (typed method - single parameter)

async def start_workflow(
    self,
    workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType],
    arg: ParamType,
    *,
    id: str,
    task_queue: str,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[SelfType, ReturnType]

start_workflow (typed method - multiple parameters)

async def start_workflow(
    self,
    workflow: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]],
    *,
    args: Sequence[Any],
    id: str,
    task_queue: str,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[SelfType, ReturnType]

start_workflow (string name)

async def start_workflow(
    self,
    workflow: str,
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    id: str,
    task_queue: str,
    result_type: Optional[Type] = None,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[Any, Any]

Common Parameters:

  • workflow: The workflow to start (method reference or string name)
  • id (str): Unique workflow ID
  • task_queue (str): Task queue to run the workflow on
  • execution_timeout (Optional[timedelta]): Timeout for the entire workflow execution
  • run_timeout (Optional[timedelta]): Timeout for a single workflow run
  • task_timeout (Optional[timedelta]): Timeout for individual workflow tasks
  • id_reuse_policy (temporalio.common.WorkflowIDReusePolicy): Policy for reusing workflow IDs
  • id_conflict_policy (temporalio.common.WorkflowIDConflictPolicy): Policy for handling workflow ID conflicts
  • retry_policy (Optional[temporalio.common.RetryPolicy]): Retry policy for the workflow
  • cron_schedule (str): Cron schedule for recurring workflows
  • memo (Optional[Mapping[str, Any]]): Memo to attach to the workflow
  • search_attributes (Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]]): Search attributes for the workflow
  • static_summary (Optional[str]): Static summary for the workflow
  • static_details (Optional[str]): Static details for the workflow
  • start_delay (Optional[timedelta]): Delay before starting the workflow
  • start_signal (Optional[str]): Signal to send when starting the workflow
  • start_signal_args (Sequence[Any]): Arguments for the start signal
  • rpc_metadata (Mapping[str, str]): RPC metadata for this call
  • rpc_timeout (Optional[timedelta]): Timeout for the RPC call
  • request_eager_start (bool): Request eager workflow start
  • priority (temporalio.common.Priority): Workflow priority
  • versioning_override (Optional[temporalio.common.VersioningOverride]): Versioning override for the workflow

Returns:

  • WorkflowHandle: Handle to the started workflow

Executing Workflows

Similar to starting workflows, the client provides execute_workflow methods that start a workflow and wait for its result.

async def execute_workflow(
    self,
    workflow: Union[str, Callable[..., Awaitable[Any]]],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    id: str,
    task_queue: str,
    result_type: Optional[Type] = None,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> Any

Getting Workflow Handles

get_workflow_handle

def get_workflow_handle(
    self,
    workflow_id: str,
    *,
    run_id: Optional[str] = None,
    first_execution_run_id: Optional[str] = None,
    result_type: Optional[Type] = None,
) -> WorkflowHandle[Any, Any]

Parameters:

  • workflow_id (str): Workflow ID to get a handle to
  • run_id (Optional[str]): Run ID that will be used for all calls
  • first_execution_run_id (Optional[str]): First execution run ID used for cancellation and termination
  • result_type (Optional[Type]): The result type to deserialize into if known

Returns:

  • WorkflowHandle[Any, Any]: Handle to the workflow

get_workflow_handle_for

def get_workflow_handle_for(
    self,
    workflow: Union[MethodAsyncNoParam[SelfType, ReturnType], MethodAsyncSingleParam[SelfType, Any, ReturnType]],
    workflow_id: str,
    *,
    run_id: Optional[str] = None,
    first_execution_run_id: Optional[str] = None,
) -> WorkflowHandle[SelfType, ReturnType]

Parameters:

  • workflow: The workflow run method to use for typing the handle
  • workflow_id (str): Workflow ID to get a handle to
  • run_id (Optional[str]): Run ID that will be used for all calls
  • first_execution_run_id (Optional[str]): First execution run ID used for cancellation and termination

Returns:

  • WorkflowHandle[SelfType, ReturnType]: Typed handle to the workflow

WorkflowHandle

class WorkflowHandle(Generic[SelfType, ReturnType]):
    """Handle for interacting with a workflow.

    This is usually created via Client.get_workflow_handle or
    returned from Client.start_workflow.
    """

    def __init__(
        self,
        client: Client,
        id: str,
        *,
        run_id: Optional[str] = None,
        result_run_id: Optional[str] = None,
        first_execution_run_id: Optional[str] = None,
        result_type: Optional[Type] = None,
        start_workflow_response: Optional[Union[temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse, temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse]] = None,
    )

WorkflowHandle Properties

@property
def id(self) -> str:
    """ID of the workflow."""

@property
def run_id(self) -> Optional[str]:
    """Run ID used for most calls on this handle."""

@property
def result_run_id(self) -> Optional[str]:
    """Run ID used for result calls on this handle."""

@property
def first_execution_run_id(self) -> Optional[str]:
    """Run ID of the first execution used for cancel and terminate calls."""

WorkflowHandle.result

async def result(
    self,
    *,
    follow_runs: bool = True,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> ReturnType

Parameters:

  • follow_runs (bool): Whether to follow workflow runs if the workflow continues as new
  • rpc_metadata (Mapping[str, str]): RPC metadata for this call
  • rpc_timeout (Optional[timedelta]): Timeout for the RPC call

Returns:

  • ReturnType: The workflow result

Continue-as-New Workflows

Continue-as-new workflows are handled automatically by the client when following runs in the result() method.

Example:

# Start a workflow
handle = await client.start_workflow(
    MyWorkflow.run,
    "input_data",
    id="my-workflow-id",
    task_queue="my-task-queue"
)

# Get the result (will follow continue-as-new workflows by default)
result = await handle.result()

Workflow Interaction

Signaling Workflows

WorkflowHandle.signal

async def signal(
    self,
    signal: Union[MethodSyncOrAsyncNoParam[SelfType], str],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None

Parameters:

  • signal: The signal method to call or signal name as string
  • arg: Single argument for the signal
  • args (Sequence[Any]): Multiple arguments for the signal
  • rpc_metadata (Mapping[str, str]): RPC metadata for this call
  • rpc_timeout (Optional[timedelta]): Timeout for the RPC call

Example:

# Signal a workflow using typed method
await handle.signal(MyWorkflow.my_signal, "signal_data")

# Signal a workflow using string name
await handle.signal("my_signal", "signal_data")

Querying Workflows

WorkflowHandle.query

async def query(
    self,
    query: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], str],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> Any

Parameters:

  • query: The query method to call or query name as string
  • arg: Single argument for the query
  • args (Sequence[Any]): Multiple arguments for the query
  • reject_condition (Optional[temporalio.common.QueryRejectCondition]): Condition for rejecting the query
  • rpc_metadata (Mapping[str, str]): RPC metadata for this call
  • rpc_timeout (Optional[timedelta]): Timeout for the RPC call

Returns:

  • Any: The query result

Workflow Updates

Executing Updates

async def execute_update(
    self,
    update: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], str],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    id: Optional[str] = None,
    first_execution_run_id: Optional[str] = None,
    wait_for_stage: WorkflowUpdateStage = WorkflowUpdateStage.COMPLETED,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> Any

Parameters:

  • update: The update method to call or update name as string
  • arg: Single argument for the update
  • args (Sequence[Any]): Multiple arguments for the update
  • id (Optional[str]): Update ID
  • first_execution_run_id (Optional[str]): First execution run ID
  • wait_for_stage (WorkflowUpdateStage): Stage to wait for before returning
  • rpc_metadata (Mapping[str, str]): RPC metadata for this call
  • rpc_timeout (Optional[timedelta]): Timeout for the RPC call

Returns:

  • Any: The update result

Starting Updates

async def start_update(
    self,
    update: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], str],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    id: Optional[str] = None,
    first_execution_run_id: Optional[str] = None,
    wait_for_stage: WorkflowUpdateStage = WorkflowUpdateStage.ACCEPTED,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> WorkflowUpdateHandle[Any, Any]

Getting Update Handles

def get_update_handle(
    self,
    id: str,
    *,
    first_execution_run_id: Optional[str] = None,
    result_type: Optional[Type] = None,
) -> WorkflowUpdateHandle[SelfType, Any]
def get_update_handle_for(
    self,
    update: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], MethodSyncOrAsyncSingleParam[SelfType, Any, ReturnType]],
    id: str,
    *,
    first_execution_run_id: Optional[str] = None,
) -> WorkflowUpdateHandle[SelfType, ReturnType]

WorkflowUpdateHandle

class WorkflowUpdateHandle(Generic[SelfType, ReturnType]):
    """Handle for interacting with a workflow update."""

    @property
    def id(self) -> str:
        """ID of the update."""

    @property
    def workflow_id(self) -> str:
        """Workflow ID this update is for."""

    @property
    def workflow_run_id(self) -> Optional[str]:
        """Run ID of the workflow this update is for."""

    async def result(
        self,
        *,
        rpc_metadata: Mapping[str, str] = {},
        rpc_timeout: Optional[timedelta] = None,
    ) -> ReturnType:
        """Get the result of the update."""

WorkflowUpdateStage

class WorkflowUpdateStage(IntEnum):
    """Stage of workflow update execution."""

    ADMITTED = 1
    ACCEPTED = 2
    COMPLETED = 3

Canceling and Terminating Workflows

WorkflowHandle.cancel

async def cancel(
    self,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None

Parameters:

  • rpc_metadata (Mapping[str, str]): RPC metadata for this call
  • rpc_timeout (Optional[timedelta]): Timeout for the RPC call

WorkflowHandle.terminate

async def terminate(
    self,
    *,
    reason: Optional[str] = None,
    details: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None

Parameters:

  • reason (Optional[str]): Reason for terminating the workflow
  • details (Sequence[Any]): Additional details about the termination
  • rpc_metadata (Mapping[str, str]): RPC metadata for this call
  • rpc_timeout (Optional[timedelta]): Timeout for the RPC call

Example:

# Cancel a workflow
await handle.cancel()

# Terminate a workflow with reason
await handle.terminate(reason="Manual termination", details=["User requested"])

Schedule Management

Creating and Managing Schedules

Client.create_schedule

async def create_schedule(
    self,
    id: str,
    schedule: Schedule,
    *,
    trigger_immediately: bool = False,
    backfill: Sequence[ScheduleBackfill] = [],
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> ScheduleHandle

Parameters:

  • id (str): Schedule ID
  • schedule (Schedule): The schedule definition
  • trigger_immediately (bool): Whether to trigger the schedule immediately upon creation
  • backfill (Sequence[ScheduleBackfill]): Backfill requests for the schedule
  • memo (Optional[Mapping[str, Any]]): Memo to attach to the schedule
  • search_attributes (Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]]): Search attributes for the schedule
  • rpc_metadata (Mapping[str, str]): RPC metadata for this call
  • rpc_timeout (Optional[timedelta]): Timeout for the RPC call

Returns:

  • ScheduleHandle: Handle to the created schedule

Client.get_schedule_handle

def get_schedule_handle(self, id: str) -> ScheduleHandle

Parameters:

  • id (str): Schedule ID

Returns:

  • ScheduleHandle: Handle to the schedule

ScheduleHandle

class ScheduleHandle:
    """Handle for interacting with a schedule.

    This is usually created via Client.get_schedule_handle or
    returned from Client.create_schedule.
    """

    def __init__(self, client: Client, id: str) -> None:
        """Create schedule handle."""

    @property
    def id(self) -> str:
        """ID of the schedule."""

ScheduleHandle Operations

async def backfill(
    self,
    *backfill: ScheduleBackfill,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Backfill the schedule."""

async def delete(
    self,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Delete the schedule."""

async def describe(
    self,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> ScheduleDescription:
    """Get description of the schedule."""

async def pause(
    self,
    *,
    note: Optional[str] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Pause the schedule."""

async def trigger(
    self,
    *,
    overlap_policy: Optional[ScheduleOverlapPolicy] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Manually trigger the schedule."""

async def unpause(
    self,
    *,
    note: Optional[str] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Unpause the schedule."""

async def update(
    self,
    updater: Callable[[ScheduleUpdateInput], None],
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Update the schedule."""

Schedule Types

Schedule

@dataclass
class Schedule:
    """Complete schedule definition."""

    action: ScheduleAction
    spec: ScheduleSpec
    policy: SchedulePolicy = dataclasses.field(default_factory=SchedulePolicy)
    state: ScheduleState = dataclasses.field(default_factory=ScheduleState)

ScheduleSpec

@dataclass
class ScheduleSpec:
    """Specification for when to run a scheduled action."""

    calendars: Sequence[ScheduleCalendarSpec] = ()
    intervals: Sequence[ScheduleIntervalSpec] = ()
    cron_expressions: Sequence[str] = ()
    skip: Sequence[ScheduleCalendarSpec] = ()
    start_at: Optional[datetime] = None
    end_at: Optional[datetime] = None
    jitter: Optional[timedelta] = None
    time_zone_name: str = "UTC"

ScheduleAction

class ScheduleAction(ABC):
    """Base class for schedule actions."""

    @abstractmethod
    def _to_proto(self) -> temporalio.api.schedule.v1.ScheduleAction:
        """Convert to proto."""

ScheduleActionStartWorkflow

@dataclass
class ScheduleActionStartWorkflow(ScheduleAction):
    """Schedule action that starts a workflow."""

    workflow: str
    args: Sequence[Any] = ()
    id: str = ""
    task_queue: str = ""
    execution_timeout: Optional[timedelta] = None
    run_timeout: Optional[timedelta] = None
    task_timeout: Optional[timedelta] = None
    retry_policy: Optional[temporalio.common.RetryPolicy] = None
    memo: Optional[Mapping[str, Any]] = None
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None
    headers: Optional[Mapping[str, temporalio.api.common.v1.Payload]] = None

SchedulePolicy

@dataclass
class SchedulePolicy:
    """General schedule policies."""

    overlap: ScheduleOverlapPolicy = ScheduleOverlapPolicy.UNSPECIFIED
    catchup_window: Optional[timedelta] = None
    pause_on_failure: bool = False

ScheduleOverlapPolicy

class ScheduleOverlapPolicy(IntEnum):
    """Policy for overlapping schedule executions."""

    UNSPECIFIED = 0
    SKIP = 1
    BUFFER_ONE = 2
    BUFFER_ALL = 3
    CANCEL_OTHER = 4
    TERMINATE_OTHER = 5
    ALLOW_ALL = 6

Example:

import temporalio.client
from temporalio.client import ScheduleActionStartWorkflow, ScheduleSpec, Schedule, SchedulePolicy

# Create a schedule to run a workflow every hour
schedule = Schedule(
    action=ScheduleActionStartWorkflow(
        workflow="MyWorkflow",
        args=["scheduled_data"],
        id="scheduled-workflow-{{}}", # Double braces for template
        task_queue="my-task-queue"
    ),
    spec=ScheduleSpec(
        intervals=[timedelta(hours=1)]
    ),
    policy=SchedulePolicy()
)

# Create the schedule
schedule_handle = await client.create_schedule("my-schedule", schedule)

# Trigger the schedule manually
await schedule_handle.trigger()

Worker Build Management

Build ID Compatibility

Client.update_worker_build_id_compatibility

async def update_worker_build_id_compatibility(
    self,
    task_queue: str,
    operation: BuildIdOp,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None

Parameters:

  • task_queue (str): Task queue to update build ID compatibility for
  • operation (BuildIdOp): Build ID operation to perform
  • rpc_metadata (Mapping[str, str]): RPC metadata for this call
  • rpc_timeout (Optional[timedelta]): Timeout for the RPC call

Client.get_worker_build_id_compatibility

async def get_worker_build_id_compatibility(
    self,
    task_queue: str,
    *,
    max_sets: int = 0,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> WorkerBuildIdVersionSets

Parameters:

  • task_queue (str): Task queue to get build ID compatibility for
  • max_sets (int): Maximum number of sets to return. 0 means no limit
  • rpc_metadata (Mapping[str, str]): RPC metadata for this call
  • rpc_timeout (Optional[timedelta]): Timeout for the RPC call

Returns:

  • WorkerBuildIdVersionSets: Build ID version sets

Task Reachability

Client.get_worker_task_reachability

async def get_worker_task_reachability(
    self,
    *,
    build_ids: Sequence[str] = [],
    task_queues: Sequence[str] = [],
    reachability_type: WorkerTaskReachabilityType = WorkerTaskReachabilityType.UNSPECIFIED,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> WorkerTaskReachability

Parameters:

  • build_ids (Sequence[str]): Build IDs to check reachability for
  • task_queues (Sequence[str]): Task queues to check reachability for
  • reachability_type (WorkerTaskReachabilityType): Type of reachability to check
  • rpc_metadata (Mapping[str, str]): RPC metadata for this call
  • rpc_timeout (Optional[timedelta]): Timeout for the RPC call

Returns:

  • WorkerTaskReachability: Task reachability information

Worker Build Types

WorkerBuildIdVersionSets

@dataclass
class WorkerBuildIdVersionSets:
    """Worker build ID version sets."""

    version_sets: Sequence[BuildIdVersionSet]

BuildIdVersionSet

@dataclass
class BuildIdVersionSet:
    """A single build ID version set."""

    build_ids: Sequence[str]
    is_default: bool

BuildIdOp

class BuildIdOp(ABC):
    """Base class for build ID operations."""

    @abstractmethod
    def _to_proto(self) -> temporalio.api.taskqueue.v1.BuildIdOp:
        """Convert to proto."""

WorkerTaskReachability

@dataclass
class WorkerTaskReachability:
    """Worker task reachability information."""

    build_id_reachability: Mapping[str, BuildIdReachability]
    task_queue_reachability: Mapping[str, TaskQueueReachability]

Version Management

Version management is handled through the build ID compatibility system. Workers can be updated to new versions while maintaining compatibility with existing workflows.

Example:

# Update build ID compatibility to add a new version
from temporalio.client import AddNewIdInNewDefaultSet

await client.update_worker_build_id_compatibility(
    "my-task-queue",
    AddNewIdInNewDefaultSet("build-id-v2")
)

# Get current build ID compatibility
compatibility = await client.get_worker_build_id_compatibility("my-task-queue")

Async Activity Management

AsyncActivityHandle

class AsyncActivityHandle:
    """Handle representing an external activity for completion and heartbeat."""

    def __init__(
        self,
        client: Client,
        id_or_token: Union[AsyncActivityIDReference, bytes]
    ) -> None:
        """Create an async activity handle."""

AsyncActivityHandle Methods

async def heartbeat(
    self,
    *details: Any,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Send a heartbeat for the activity."""

async def complete(
    self,
    result: Any = None,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Complete the activity with a result."""

async def fail(
    self,
    exception: BaseException,
    *,
    last_heartbeat_details: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Fail the activity with an exception."""

async def report_cancellation(
    self,
    *details: Any,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Report that the activity was cancelled."""

Getting Async Activity Handles

Client.get_async_activity_handle (by ID and task queue)

def get_async_activity_handle(
    self,
    *,
    activity_id: str,
    task_queue: str,
    workflow_namespace: Optional[str] = None,
) -> AsyncActivityHandle

Parameters:

  • activity_id (str): ID of the async activity
  • task_queue (str): Task queue the activity is running on
  • workflow_namespace (Optional[str]): Workflow namespace if different from client namespace

Returns:

  • AsyncActivityHandle: Handle to the async activity

Client.get_async_activity_handle (by task token)

def get_async_activity_handle(
    self,
    *,
    task_token: bytes
) -> AsyncActivityHandle

Parameters:

  • task_token (bytes): Task token for the async activity

Returns:

  • AsyncActivityHandle: Handle to the async activity

AsyncActivityIDReference

@dataclass
class AsyncActivityIDReference:
    """Reference to an async activity by ID."""

    activity_id: str
    task_queue: str
    workflow_namespace: Optional[str] = None

Activity Completion and Failure

Async activities can be completed or failed from external processes using the activity handle.

Example:

# Get an async activity handle by ID
activity_handle = client.get_async_activity_handle(
    activity_id="my-activity-123",
    task_queue="my-task-queue"
)

# Send a heartbeat
await activity_handle.heartbeat("Still processing...")

# Complete the activity
await activity_handle.complete("Activity completed successfully")

# Or fail the activity
try:
    # Some processing that might fail
    pass
except Exception as e:
    await activity_handle.fail(e)

Example with task token:

# Get an async activity handle by task token
activity_handle = client.get_async_activity_handle(task_token=task_token_bytes)

# Report cancellation
await activity_handle.report_cancellation("User cancelled the operation")

Install with Tessl CLI

npx tessl i tessl/pypi-temporalio

docs

activity.md

client.md

common.md

contrib-pydantic.md

data-conversion.md

exceptions.md

index.md

runtime.md

testing.md

worker.md

workflow.md

tile.json