# Client Operations

The `temporalio.client` module provides the primary interface for connecting to Temporal servers and managing workflows, schedules, and activities. This module contains the main `Client` class that serves as the entry point for all client-side operations.

## Client Connection and Configuration

### Client Class

The `Client` class is the main interface for interacting with a Temporal server. It provides methods for workflow management, schedule operations, and activity handling.

```python { .api }
class Client:
    """Client for accessing Temporal.

    Most users will use connect() to create a client. The service property provides
    access to a raw gRPC client. Clients are not thread-safe and should only be used
    in the event loop they are first connected in.

    Clients do not work across forks since runtimes do not work across forks.
    """
```

### Connection Methods

#### Client.connect

```python { .api }
@staticmethod
async def connect(
    target_host: str,
    *,
    namespace: str = "default",
    api_key: Optional[str] = None,
    data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
    plugins: Sequence[Plugin] = [],
    interceptors: Sequence[Interceptor] = [],
    default_workflow_query_reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
    tls: Union[bool, TLSConfig] = False,
    retry_config: Optional[RetryConfig] = None,
    keep_alive_config: Optional[KeepAliveConfig] = KeepAliveConfig.default,
    rpc_metadata: Mapping[str, str] = {},
    identity: Optional[str] = None,
    lazy: bool = False,
    runtime: Optional[temporalio.runtime.Runtime] = None,
    http_connect_proxy_config: Optional[HttpConnectProxyConfig] = None,
    header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC,
) -> Client
```

**Parameters:**
- `target_host` (str): `host:port` for the Temporal server. For local development, this is often "localhost:7233"
- `namespace` (str): Namespace to use for client calls. Default: "default"
- `api_key` (Optional[str]): API key for Temporal. This becomes the "Authorization" HTTP header with "Bearer " prepended
- `data_converter` (temporalio.converter.DataConverter): Data converter to use for all data conversions to/from payloads
- `plugins` (Sequence[Plugin]): Set of plugins that are chained together to allow intercepting and modifying client creation and service connection
- `interceptors` (Sequence[Interceptor]): Set of interceptors that are chained together to allow intercepting of client calls
- `default_workflow_query_reject_condition` (Optional[temporalio.common.QueryRejectCondition]): The default rejection condition for workflow queries
- `tls` (Union[bool, TLSConfig]): TLS configuration. If False, do not use TLS. If True, use system default TLS configuration
- `retry_config` (Optional[RetryConfig]): Retry configuration for direct service calls or all high-level calls
- `keep_alive_config` (Optional[KeepAliveConfig]): Keep-alive configuration for the client connection
- `rpc_metadata` (Mapping[str, str]): Headers to use for all calls to the server
- `identity` (Optional[str]): Identity for this client. If unset, a default is created based on the version of the SDK
- `lazy` (bool): If true, the client will not connect until the first call is attempted
- `runtime` (Optional[temporalio.runtime.Runtime]): The runtime for this client, or the default if unset
- `http_connect_proxy_config` (Optional[HttpConnectProxyConfig]): Configuration for HTTP CONNECT proxy
- `header_codec_behavior` (HeaderCodecBehavior): Encoding behavior for headers sent by the client

**Returns:**
- `Client`: Connected Temporal client

**Example:**
```python
import temporalio.client

# Connect to local Temporal server
client = await temporalio.client.Client.connect("localhost:7233")

# Connect with TLS and authentication
client = await temporalio.client.Client.connect(
    "my-temporal.example.com:7233",
    namespace="production",
    api_key="my-api-key",
    tls=True
)
```
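
Connection settings often come from the environment rather than from literals. The sketch below is not part of the SDK: it gathers keyword arguments for `Client.connect` from environment variables. The variable names (`TEMPORAL_ADDRESS`, `TEMPORAL_NAMESPACE`, `TEMPORAL_API_KEY`) and both helper names are assumptions chosen for illustration, as is the rule that an API key implies TLS.

```python
import os
from typing import Any, Dict, Mapping, Optional


def connect_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Build keyword arguments for Client.connect (hypothetical helper)."""
    env = os.environ if env is None else env
    kwargs: Dict[str, Any] = {
        "target_host": env.get("TEMPORAL_ADDRESS", "localhost:7233"),
        "namespace": env.get("TEMPORAL_NAMESPACE", "default"),
    }
    api_key = env.get("TEMPORAL_API_KEY")
    if api_key:
        # Assumption: an API key is only ever sent over a TLS connection.
        kwargs["api_key"] = api_key
        kwargs["tls"] = True
    return kwargs


async def connect_from_env():
    # Imported lazily so the helper above can be used without the SDK installed.
    from temporalio.client import Client

    return await Client.connect(**connect_kwargs_from_env())
```

Keeping the argument-building step separate from the call to `connect` makes the configuration logic easy to check without a running server.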

### ClientConfig

```python { .api }
class ClientConfig(TypedDict, total=False):
    """TypedDict of config originally passed to Client."""

    service_client: Required[temporalio.service.ServiceClient]
    namespace: Required[str]
    data_converter: Required[temporalio.converter.DataConverter]
    interceptors: Required[Sequence[Interceptor]]
    default_workflow_query_reject_condition: Required[Optional[temporalio.common.QueryRejectCondition]]
    header_codec_behavior: Required[HeaderCodecBehavior]
    plugins: Required[Sequence[Plugin]]
```

### Client Properties

```python { .api }
@property
def service_client(self) -> temporalio.service.ServiceClient:
    """Raw gRPC service client."""

@property
def workflow_service(self) -> temporalio.service.WorkflowService:
    """Raw gRPC workflow service client."""

@property
def operator_service(self) -> temporalio.service.OperatorService:
    """Raw gRPC operator service client."""

@property
def test_service(self) -> temporalio.service.TestService:
    """Raw gRPC test service client."""

@property
def namespace(self) -> str:
    """Namespace used in calls by this client."""

@property
def identity(self) -> str:
    """Identity used in calls by this client."""

@property
def data_converter(self) -> temporalio.converter.DataConverter:
    """Data converter used by this client."""

@property
def rpc_metadata(self) -> Mapping[str, str]:
    """Headers for every call made by this client."""

@rpc_metadata.setter
def rpc_metadata(self, value: Mapping[str, str]) -> None:
    """Update the headers for this client."""

@property
def api_key(self) -> Optional[str]:
    """API key for every call made by this client."""

@api_key.setter
def api_key(self, value: Optional[str]) -> None:
    """Update the API key for this client."""
```
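
The `api_key` and `rpc_metadata` setters make it possible to change credentials and headers on a client that is already connected. A minimal sketch of how they might be used together follows; the helper name and the `x-trace-id` header are hypothetical, and the function only relies on the two settable properties listed above.

```python
from typing import Any, Optional


def rotate_credentials(client: Any, new_api_key: Optional[str], trace_id: str) -> None:
    """Swap the API key and add one header on an existing client (hypothetical helper)."""
    # The api_key setter affects calls made after this assignment.
    client.api_key = new_api_key
    # Assign a new mapping instead of mutating the current one, carrying
    # the existing headers forward before adding the new entry.
    client.rpc_metadata = {**client.rpc_metadata, "x-trace-id": trace_id}
```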

## Workflow Management

### Starting Workflows

The `start_workflow` method is used to start new workflow executions. It has several overloads to support different calling patterns.

#### start_workflow (typed method - no parameters)

```python { .api }
async def start_workflow(
    self,
    workflow: MethodAsyncNoParam[SelfType, ReturnType],
    *,
    id: str,
    task_queue: str,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[SelfType, ReturnType]
```

#### start_workflow (typed method - single parameter)

```python { .api }
async def start_workflow(
    self,
    workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType],
    arg: ParamType,
    *,
    id: str,
    task_queue: str,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[SelfType, ReturnType]
```

#### start_workflow (typed method - multiple parameters)

```python { .api }
async def start_workflow(
    self,
    workflow: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]],
    *,
    args: Sequence[Any],
    id: str,
    task_queue: str,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[SelfType, ReturnType]
```

#### start_workflow (string name)

```python { .api }
async def start_workflow(
    self,
    workflow: str,
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    id: str,
    task_queue: str,
    result_type: Optional[Type] = None,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[Any, Any]
```

**Common Parameters:**
- `workflow`: The workflow to start (method reference or string name)
- `id` (str): Unique workflow ID
- `task_queue` (str): Task queue to run the workflow on
- `execution_timeout` (Optional[timedelta]): Timeout for the entire workflow execution
- `run_timeout` (Optional[timedelta]): Timeout for a single workflow run
- `task_timeout` (Optional[timedelta]): Timeout for individual workflow tasks
- `id_reuse_policy` (temporalio.common.WorkflowIDReusePolicy): Policy for reusing workflow IDs
- `id_conflict_policy` (temporalio.common.WorkflowIDConflictPolicy): Policy for handling workflow ID conflicts
- `retry_policy` (Optional[temporalio.common.RetryPolicy]): Retry policy for the workflow
- `cron_schedule` (str): Cron schedule for recurring workflows
- `memo` (Optional[Mapping[str, Any]]): Memo to attach to the workflow
- `search_attributes` (Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]]): Search attributes for the workflow
- `static_summary` (Optional[str]): Static summary for the workflow
- `static_details` (Optional[str]): Static details for the workflow
- `start_delay` (Optional[timedelta]): Delay before starting the workflow
- `start_signal` (Optional[str]): Signal to send when starting the workflow
- `start_signal_args` (Sequence[Any]): Arguments for the start signal
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call
- `request_eager_start` (bool): Request eager workflow start
- `priority` (temporalio.common.Priority): Workflow priority
- `versioning_override` (Optional[temporalio.common.VersioningOverride]): Versioning override for the workflow

**Returns:**
- `WorkflowHandle`: Handle to the started workflow
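
As an illustration of the string-name overload, the sketch below starts a workflow by its type name with a single argument and a few of the optional parameters. The workflow name `OrderWorkflow`, the task queue `orders`, the memo contents, and the helper itself are assumptions made for the example; `client` is expected to be a connected `Client`.

```python
from datetime import timedelta
from typing import Any


async def start_order_workflow(client: Any, order_id: str) -> Any:
    """Start a workflow by name and return its handle (hypothetical helper)."""
    return await client.start_workflow(
        "OrderWorkflow",                       # assumed workflow type name
        order_id,                              # single positional argument
        id=f"order-{order_id}",                # workflow ID derived from a business key
        task_queue="orders",                   # assumed task queue name
        execution_timeout=timedelta(hours=1),  # cap on the entire execution
        memo={"source": "docs-example"},
    )
```

Deriving the workflow ID from a business key, as here, lets the ID reuse and conflict policies decide what happens when the same order is submitted twice.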

### Executing Workflows

Similar to starting workflows, the client provides `execute_workflow` methods that start a workflow and wait for its result.

```python { .api }
async def execute_workflow(
    self,
    workflow: Union[str, Callable[..., Awaitable[Any]]],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    id: str,
    task_queue: str,
    result_type: Optional[Type] = None,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> Any
```
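
Since `execute_workflow` starts a workflow and waits for its result, the same effect can be sketched with `start_workflow` followed by `WorkflowHandle.result()`. The helper below is hypothetical and only shows the two-step form, which is useful when the caller wants the handle (for logging or signaling) before waiting.

```python
from typing import Any


async def run_to_completion(
    client: Any, workflow: str, arg: Any, *, id: str, task_queue: str
) -> Any:
    """Start a workflow, then wait for its result (hypothetical helper)."""
    handle = await client.start_workflow(workflow, arg, id=id, task_queue=task_queue)
    # The handle is available here, before the result is awaited.
    return await handle.result()
```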

### Getting Workflow Handles

#### get_workflow_handle

```python { .api }
def get_workflow_handle(
    self,
    workflow_id: str,
    *,
    run_id: Optional[str] = None,
    first_execution_run_id: Optional[str] = None,
    result_type: Optional[Type] = None,
) -> WorkflowHandle[Any, Any]
```

**Parameters:**
- `workflow_id` (str): Workflow ID to get a handle to
- `run_id` (Optional[str]): Run ID that will be used for all calls
- `first_execution_run_id` (Optional[str]): First execution run ID used for cancellation and termination
- `result_type` (Optional[Type]): The result type to deserialize into if known

**Returns:**
- `WorkflowHandle[Any, Any]`: Handle to the workflow

#### get_workflow_handle_for

```python { .api }
def get_workflow_handle_for(
    self,
    workflow: Union[MethodAsyncNoParam[SelfType, ReturnType], MethodAsyncSingleParam[SelfType, Any, ReturnType]],
    workflow_id: str,
    *,
    run_id: Optional[str] = None,
    first_execution_run_id: Optional[str] = None,
) -> WorkflowHandle[SelfType, ReturnType]
```

**Parameters:**
- `workflow`: The workflow run method to use for typing the handle
- `workflow_id` (str): Workflow ID to get a handle to
- `run_id` (Optional[str]): Run ID that will be used for all calls
- `first_execution_run_id` (Optional[str]): First execution run ID used for cancellation and termination

**Returns:**
- `WorkflowHandle[SelfType, ReturnType]`: Typed handle to the workflow

### WorkflowHandle

```python { .api }
class WorkflowHandle(Generic[SelfType, ReturnType]):
    """Handle for interacting with a workflow.

    This is usually created via Client.get_workflow_handle or
    returned from Client.start_workflow.
    """

    def __init__(
        self,
        client: Client,
        id: str,
        *,
        run_id: Optional[str] = None,
        result_run_id: Optional[str] = None,
        first_execution_run_id: Optional[str] = None,
        result_type: Optional[Type] = None,
        start_workflow_response: Optional[Union[temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse, temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse]] = None,
    )
```

#### WorkflowHandle Properties

```python { .api }
@property
def id(self) -> str:
    """ID of the workflow."""

@property
def run_id(self) -> Optional[str]:
    """Run ID used for most calls on this handle."""

@property
def result_run_id(self) -> Optional[str]:
    """Run ID used for result calls on this handle."""

@property
def first_execution_run_id(self) -> Optional[str]:
    """Run ID of the first execution used for cancel and terminate calls."""
```

#### WorkflowHandle.result

```python { .api }
async def result(
    self,
    *,
    follow_runs: bool = True,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> ReturnType
```

**Parameters:**
- `follow_runs` (bool): Whether to follow workflow runs if the workflow continues as new
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Returns:**
- `ReturnType`: The workflow result
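
A handle does not have to come from `start_workflow`. The sketch below attaches to an existing workflow by ID with `get_workflow_handle` (a plain method, not a coroutine) and then awaits its result, optionally without following later runs. The helper name is hypothetical.

```python
from typing import Any


async def fetch_result(client: Any, workflow_id: str, *, follow_runs: bool = True) -> Any:
    """Fetch the result of an already-started workflow (hypothetical helper)."""
    # get_workflow_handle is synchronous, so it is not awaited.
    handle = client.get_workflow_handle(workflow_id)
    # follow_runs=False limits the call to the run the handle points at.
    return await handle.result(follow_runs=follow_runs)
```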

### Continue-as-New Workflows

When a workflow continues as new, `result()` follows the chain of runs and returns the result of the latest run, as long as `follow_runs` is left at its default of `True`.

**Example:**
```python
# Start a workflow
handle = await client.start_workflow(
    MyWorkflow.run,
    "input_data",
    id="my-workflow-id",
    task_queue="my-task-queue"
)

# Get the result (will follow continue-as-new workflows by default)
result = await handle.result()
```

## Workflow Interaction

### Signaling Workflows

#### WorkflowHandle.signal

```python { .api }
async def signal(
    self,
    signal: Union[MethodSyncOrAsyncNoParam[SelfType], str],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None
```

**Parameters:**
- `signal`: The signal method to call or signal name as string
- `arg`: Single argument for the signal
- `args` (Sequence[Any]): Multiple arguments for the signal
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Example:**
```python
# Signal a workflow using typed method
await handle.signal(MyWorkflow.my_signal, "signal_data")

# Signal a workflow using string name
await handle.signal("my_signal", "signal_data")
```

### Querying Workflows

#### WorkflowHandle.query

```python { .api }
async def query(
    self,
    query: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], str],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> Any
```

**Parameters:**
- `query`: The query method to call or query name as string
- `arg`: Single argument for the query
- `args` (Sequence[Any]): Multiple arguments for the query
- `reject_condition` (Optional[temporalio.common.QueryRejectCondition]): Condition for rejecting the query
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Returns:**
- `Any`: The query result
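
One way to use `query` is to poll a workflow's state until it reaches an expected value. The following is a minimal sketch under stated assumptions: the query name `get_status` and the helper itself are hypothetical, and `handle` is a `WorkflowHandle` for a workflow that defines such a query.

```python
import asyncio
from typing import Any


async def wait_for_status(
    handle: Any, expected: str, *, attempts: int = 10, interval: float = 0.5
) -> bool:
    """Poll a query until it returns `expected` (hypothetical helper)."""
    for _ in range(attempts):
        # "get_status" is an assumed query name defined on the workflow.
        if await handle.query("get_status") == expected:
            return True
        await asyncio.sleep(interval)
    return False
```

Polling keeps the example simple; when the workflow should notify the caller of completion, awaiting `result()` or an update is usually a better fit than repeated queries.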

### Workflow Updates

#### Executing Updates

```python { .api }
async def execute_update(
    self,
    update: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], str],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    id: Optional[str] = None,
    first_execution_run_id: Optional[str] = None,
    wait_for_stage: WorkflowUpdateStage = WorkflowUpdateStage.COMPLETED,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> Any
```

**Parameters:**
- `update`: The update method to call or update name as string
- `arg`: Single argument for the update
- `args` (Sequence[Any]): Multiple arguments for the update
- `id` (Optional[str]): Update ID
- `first_execution_run_id` (Optional[str]): First execution run ID
- `wait_for_stage` (WorkflowUpdateStage): Stage to wait for before returning
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Returns:**
- `Any`: The update result

#### Starting Updates

```python { .api }
async def start_update(
    self,
    update: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], str],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    id: Optional[str] = None,
    first_execution_run_id: Optional[str] = None,
    wait_for_stage: WorkflowUpdateStage = WorkflowUpdateStage.ACCEPTED,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> WorkflowUpdateHandle[Any, Any]
```

#### Getting Update Handles

```python { .api }
def get_update_handle(
    self,
    id: str,
    *,
    first_execution_run_id: Optional[str] = None,
    result_type: Optional[Type] = None,
) -> WorkflowUpdateHandle[SelfType, Any]
```

```python { .api }
def get_update_handle_for(
    self,
    update: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], MethodSyncOrAsyncSingleParam[SelfType, Any, ReturnType]],
    id: str,
    *,
    first_execution_run_id: Optional[str] = None,
) -> WorkflowUpdateHandle[SelfType, ReturnType]
```

### WorkflowUpdateHandle

```python { .api }
class WorkflowUpdateHandle(Generic[SelfType, ReturnType]):
    """Handle for interacting with a workflow update."""

    @property
    def id(self) -> str:
        """ID of the update."""

    @property
    def workflow_id(self) -> str:
        """Workflow ID this update is for."""

    @property
    def workflow_run_id(self) -> Optional[str]:
        """Run ID of the workflow this update is for."""

    async def result(
        self,
        *,
        rpc_metadata: Mapping[str, str] = {},
        rpc_timeout: Optional[timedelta] = None,
    ) -> ReturnType:
        """Get the result of the update."""
```

### WorkflowUpdateStage

```python { .api }
class WorkflowUpdateStage(IntEnum):
    """Stage of workflow update execution."""

    ADMITTED = 1
    ACCEPTED = 2
    COMPLETED = 3
```
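
The two-step flow of `start_update` followed by `WorkflowUpdateHandle.result()` can be sketched as follows. The update name `set_limit` and the helper are hypothetical; the stage is passed in as a parameter so the sketch does not depend on a particular default, and in real code it would typically be `WorkflowUpdateStage.ACCEPTED`.

```python
from typing import Any


async def apply_update(handle: Any, accepted_stage: Any, new_limit: int) -> Any:
    """Send an update, return from start once accepted, then await its result."""
    update_handle = await handle.start_update(
        "set_limit",                    # assumed update name
        new_limit,
        wait_for_stage=accepted_stage,  # e.g. WorkflowUpdateStage.ACCEPTED
    )
    # start_update has returned at the requested stage; result() waits
    # for the update to complete and returns its value.
    return await update_handle.result()
```

When the intermediate handle is not needed, `execute_update` covers the same ground in a single call.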

### Canceling and Terminating Workflows

#### WorkflowHandle.cancel

```python { .api }
async def cancel(
    self,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None
```

**Parameters:**
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

#### WorkflowHandle.terminate

```python { .api }
async def terminate(
    self,
    *,
    reason: Optional[str] = None,
    details: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None
```

**Parameters:**
- `reason` (Optional[str]): Reason for terminating the workflow
- `details` (Sequence[Any]): Additional details about the termination
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Example:**
```python
# Cancel a workflow
await handle.cancel()

# Terminate a workflow with reason
await handle.terminate(reason="Manual termination", details=["User requested"])
```
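
Cancellation is a request the workflow can react to, while termination ends it without giving workflow code a chance to clean up. A common pattern, sketched here under stated assumptions, is to request cancellation first and fall back to termination after a grace period. The helper name, the grace period, and the returned labels are all hypothetical.

```python
import asyncio
from typing import Any


async def stop_workflow(handle: Any, *, grace_seconds: float = 30.0) -> str:
    """Request cancellation; terminate if the workflow has not ended in time."""
    await handle.cancel()
    try:
        await asyncio.wait_for(handle.result(), timeout=grace_seconds)
    except asyncio.TimeoutError:
        # The workflow did not end within the grace period.
        await handle.terminate(reason="Cancellation grace period elapsed")
        return "terminated"
    except Exception:
        # result() raised, which is expected when the workflow ends
        # as cancelled or failed rather than completing normally.
        return "ended"
    return "completed"
```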

## Schedule Management

### Creating and Managing Schedules

#### Client.create_schedule

```python { .api }
async def create_schedule(
    self,
    id: str,
    schedule: Schedule,
    *,
    trigger_immediately: bool = False,
    backfill: Sequence[ScheduleBackfill] = [],
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> ScheduleHandle
```

**Parameters:**
- `id` (str): Schedule ID
- `schedule` (Schedule): The schedule definition
- `trigger_immediately` (bool): Whether to trigger the schedule immediately upon creation
- `backfill` (Sequence[ScheduleBackfill]): Backfill requests for the schedule
- `memo` (Optional[Mapping[str, Any]]): Memo to attach to the schedule
- `search_attributes` (Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]]): Search attributes for the schedule
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Returns:**
- `ScheduleHandle`: Handle to the created schedule

#### Client.get_schedule_handle

```python { .api }
def get_schedule_handle(self, id: str) -> ScheduleHandle
```

**Parameters:**
- `id` (str): Schedule ID

**Returns:**
- `ScheduleHandle`: Handle to the schedule

### ScheduleHandle

```python { .api }
class ScheduleHandle:
    """Handle for interacting with a schedule.

    This is usually created via Client.get_schedule_handle or
    returned from Client.create_schedule.
    """

    def __init__(self, client: Client, id: str) -> None:
        """Create schedule handle."""

    @property
    def id(self) -> str:
        """ID of the schedule."""
```

#### ScheduleHandle Operations

```python { .api }
async def backfill(
    self,
    *backfill: ScheduleBackfill,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Backfill the schedule."""

async def delete(
    self,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Delete the schedule."""

async def describe(
    self,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> ScheduleDescription:
    """Get description of the schedule."""

async def pause(
    self,
    *,
    note: Optional[str] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Pause the schedule."""

async def trigger(
    self,
    *,
    overlap_policy: Optional[ScheduleOverlapPolicy] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Manually trigger the schedule."""

async def unpause(
    self,
    *,
    note: Optional[str] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Unpause the schedule."""

async def update(
    self,
    updater: Callable[[ScheduleUpdateInput], None],
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Update the schedule."""
```
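
The `pause` and `unpause` operations pair naturally around maintenance work. The sketch below wraps them in an async context manager so the schedule is resumed even if the enclosed block raises; the helper name and the note text are hypothetical, and `handle` is a `ScheduleHandle`.

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def schedule_paused(handle: Any, note: str) -> AsyncIterator[None]:
    """Pause a schedule for the duration of a block (hypothetical helper)."""
    await handle.pause(note=note)
    try:
        yield
    finally:
        # Runs even if the block raises, so the schedule is not left paused.
        await handle.unpause(note=f"resumed after: {note}")
```

It would be used as `async with schedule_paused(client.get_schedule_handle("my-schedule"), "db migration"): ...`.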

### Schedule Types

#### Schedule

```python { .api }
@dataclass
class Schedule:
    """Complete schedule definition."""

    action: ScheduleAction
    spec: ScheduleSpec
    policy: SchedulePolicy = dataclasses.field(default_factory=SchedulePolicy)
    state: ScheduleState = dataclasses.field(default_factory=ScheduleState)
```

#### ScheduleSpec

```python { .api }
@dataclass
class ScheduleSpec:
    """Specification for when to run a scheduled action."""

    calendars: Sequence[ScheduleCalendarSpec] = ()
    intervals: Sequence[ScheduleIntervalSpec] = ()
    cron_expressions: Sequence[str] = ()
    skip: Sequence[ScheduleCalendarSpec] = ()
    start_at: Optional[datetime] = None
    end_at: Optional[datetime] = None
    jitter: Optional[timedelta] = None
    time_zone_name: str = "UTC"
```

#### ScheduleAction

```python { .api }
class ScheduleAction(ABC):
    """Base class for schedule actions."""

    @abstractmethod
    def _to_proto(self) -> temporalio.api.schedule.v1.ScheduleAction:
        """Convert to proto."""
```

#### ScheduleActionStartWorkflow

```python { .api }
@dataclass
class ScheduleActionStartWorkflow(ScheduleAction):
    """Schedule action that starts a workflow."""

    workflow: str
    args: Sequence[Any] = ()
    id: str = ""
    task_queue: str = ""
    execution_timeout: Optional[timedelta] = None
    run_timeout: Optional[timedelta] = None
    task_timeout: Optional[timedelta] = None
    retry_policy: Optional[temporalio.common.RetryPolicy] = None
    memo: Optional[Mapping[str, Any]] = None
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None
    headers: Optional[Mapping[str, temporalio.api.common.v1.Payload]] = None
```

#### SchedulePolicy

```python { .api }
@dataclass
class SchedulePolicy:
    """General schedule policies."""

    overlap: ScheduleOverlapPolicy = ScheduleOverlapPolicy.UNSPECIFIED
    catchup_window: Optional[timedelta] = None
    pause_on_failure: bool = False
```

#### ScheduleOverlapPolicy

```python { .api }
class ScheduleOverlapPolicy(IntEnum):
    """Policy for overlapping schedule executions."""

    UNSPECIFIED = 0
    SKIP = 1
    BUFFER_ONE = 2
    BUFFER_ALL = 3
    CANCEL_OTHER = 4
    TERMINATE_OTHER = 5
    ALLOW_ALL = 6
```

**Example:**
```python
from datetime import timedelta

from temporalio.client import (
    Schedule,
    ScheduleActionStartWorkflow,
    ScheduleIntervalSpec,
    SchedulePolicy,
    ScheduleSpec,
)

# Create a schedule to run a workflow every hour
schedule = Schedule(
    action=ScheduleActionStartWorkflow(
        workflow="MyWorkflow",
        args=["scheduled_data"],
        # Base workflow ID; the server appends the scheduled time for each run
        id="scheduled-workflow",
        task_queue="my-task-queue"
    ),
    spec=ScheduleSpec(
        # intervals takes ScheduleIntervalSpec values, not bare timedeltas
        intervals=[ScheduleIntervalSpec(every=timedelta(hours=1))]
    ),
    policy=SchedulePolicy()
)

# Create the schedule
schedule_handle = await client.create_schedule("my-schedule", schedule)

# Trigger the schedule manually
await schedule_handle.trigger()
```

## Worker Build Management

### Build ID Compatibility

#### Client.update_worker_build_id_compatibility

```python { .api }
async def update_worker_build_id_compatibility(
    self,
    task_queue: str,
    operation: BuildIdOp,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None
```

**Parameters:**
- `task_queue` (str): Task queue to update build ID compatibility for
- `operation` (BuildIdOp): Build ID operation to perform
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

#### Client.get_worker_build_id_compatibility

```python { .api }
async def get_worker_build_id_compatibility(
    self,
    task_queue: str,
    *,
    max_sets: int = 0,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> WorkerBuildIdVersionSets
```

**Parameters:**
- `task_queue` (str): Task queue to get build ID compatibility for
- `max_sets` (int): Maximum number of sets to return. 0 means no limit
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Returns:**
- `WorkerBuildIdVersionSets`: Build ID version sets

### Task Reachability

#### Client.get_worker_task_reachability

```python { .api }
async def get_worker_task_reachability(
    self,
    *,
    build_ids: Sequence[str] = [],
    task_queues: Sequence[str] = [],
    reachability_type: WorkerTaskReachabilityType = WorkerTaskReachabilityType.UNSPECIFIED,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> WorkerTaskReachability
```

**Parameters:**
- `build_ids` (Sequence[str]): Build IDs to check reachability for
- `task_queues` (Sequence[str]): Task queues to check reachability for
- `reachability_type` (WorkerTaskReachabilityType): Type of reachability to check
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Returns:**
- `WorkerTaskReachability`: Task reachability information

### Worker Build Types

#### WorkerBuildIdVersionSets

```python { .api }
@dataclass
class WorkerBuildIdVersionSets:
    """Worker build ID version sets."""

    version_sets: Sequence[BuildIdVersionSet]
```

#### BuildIdVersionSet

```python { .api }
@dataclass
class BuildIdVersionSet:
    """A single build ID version set."""

    build_ids: Sequence[str]
    is_default: bool
```
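
As a rough illustration of how the two types above fit together, the sketch below walks a `WorkerBuildIdVersionSets` value to find the default set and the set containing a given build ID. The dataclasses are local mirrors of the fields listed above so the snippet runs without the SDK, and `default_build_ids` and `find_set` are hypothetical helper names, not SDK functions.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class BuildIdVersionSet:
    """Local mirror of the fields listed above."""

    build_ids: Sequence[str]
    is_default: bool


@dataclass
class WorkerBuildIdVersionSets:
    """Local mirror of the fields listed above."""

    version_sets: Sequence[BuildIdVersionSet]


def default_build_ids(sets: WorkerBuildIdVersionSets) -> Sequence[str]:
    """Return the build IDs of the first set flagged as default, or an empty tuple."""
    for version_set in sets.version_sets:
        if version_set.is_default:
            return version_set.build_ids
    return ()


def find_set(sets: WorkerBuildIdVersionSets, build_id: str) -> Optional[BuildIdVersionSet]:
    """Return the version set that contains build_id, or None if no set does."""
    for version_set in sets.version_sets:
        if build_id in version_set.build_ids:
            return version_set
    return None
```

With a real client, the same helpers could be applied to the value returned by `get_worker_build_id_compatibility`, assuming it exposes the fields shown in this section.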

#### BuildIdOp

```python { .api }
class BuildIdOp(ABC):
    """Base class for build ID operations."""

    @abstractmethod
    def _to_proto(self) -> temporalio.api.taskqueue.v1.BuildIdOp:
        """Convert to proto."""
```

#### WorkerTaskReachability

```python { .api }
@dataclass
class WorkerTaskReachability:
    """Worker task reachability information."""

    build_id_reachability: Mapping[str, BuildIdReachability]
    task_queue_reachability: Mapping[str, TaskQueueReachability]
```

### Version Management

Worker versions are managed through the build ID compatibility sets of a task queue. Registering a new build ID lets you roll out updated workers while workflows that started on an older build continue on workers compatible with it.

**Example:**
```python
from temporalio.client import BuildIdOpAddNewDefault

# Register a new build ID as the default for the task queue
# (assumes `client` is an already connected Client)
await client.update_worker_build_id_compatibility(
    "my-task-queue",
    BuildIdOpAddNewDefault("build-id-v2"),
)

# Get the current build ID version sets for the task queue
compatibility = await client.get_worker_build_id_compatibility("my-task-queue")
```

## Async Activity Management

### AsyncActivityHandle

```python { .api }
class AsyncActivityHandle:
    """Handle representing an external activity for completion and heartbeat."""

    def __init__(
        self,
        client: Client,
        id_or_token: Union[AsyncActivityIDReference, bytes]
    ) -> None:
        """Create an async activity handle."""
```

#### AsyncActivityHandle Methods

```python { .api }
async def heartbeat(
    self,
    *details: Any,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Send a heartbeat for the activity."""

async def complete(
    self,
    result: Any = None,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Complete the activity with a result."""

async def fail(
    self,
    exception: BaseException,
    *,
    last_heartbeat_details: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Fail the activity with an exception."""

async def report_cancellation(
    self,
    *details: Any,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Report that the activity was cancelled."""
```

### Getting Async Activity Handles

#### Client.get_async_activity_handle (by workflow ID and activity ID)

```python { .api }
def get_async_activity_handle(
    self,
    *,
    workflow_id: str,
    run_id: Optional[str],
    activity_id: str,
) -> AsyncActivityHandle
```

**Parameters:**
- `workflow_id` (str): ID of the workflow that scheduled the activity
- `run_id` (Optional[str]): Run ID of that workflow, if known
- `activity_id` (str): ID of the async activity

**Returns:**
- `AsyncActivityHandle`: Handle to the async activity

#### Client.get_async_activity_handle (by task token)

```python { .api }
def get_async_activity_handle(
    self,
    *,
    task_token: bytes
) -> AsyncActivityHandle
```

**Parameters:**
- `task_token` (bytes): Task token for the async activity

**Returns:**
- `AsyncActivityHandle`: Handle to the async activity

### AsyncActivityIDReference

```python { .api }
@dataclass
class AsyncActivityIDReference:
    """Reference to an async activity by ID."""

    workflow_id: str
    run_id: Optional[str]
    activity_id: str
```

### Activity Completion and Failure

An async activity can be completed or failed from an external process through its activity handle.

**Example:**
```python
# Get an async activity handle by workflow ID and activity ID
# (assumes `client` is an already connected Client)
activity_handle = client.get_async_activity_handle(
    workflow_id="my-workflow-id",
    run_id=None,
    activity_id="my-activity-123",
)

# Send a heartbeat
await activity_handle.heartbeat("Still processing...")

# Complete the activity
await activity_handle.complete("Activity completed successfully")

# Or fail the activity
try:
    # Some processing that might fail
    pass
except Exception as e:
    await activity_handle.fail(e)
```
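
The complete-or-fail decision usually depends on the outcome of some external work, so one way to structure it is a small wrapper like the sketch below. `run_and_report` and `RecordingHandle` are hypothetical names introduced here; `RecordingHandle` is a stand-in that only records calls so the snippet runs without a Temporal server. The wrapper relies solely on the `complete` and `fail` methods listed above.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple


async def run_and_report(handle: Any, work: Callable[[], Awaitable[Any]]) -> None:
    """Await work() and report its outcome through an activity-handle-like object."""
    try:
        result = await work()
    except Exception as err:
        # The work itself raised: mark the activity as failed.
        await handle.fail(err)
    else:
        # Only report completion when the work finished without raising.
        await handle.complete(result)


class RecordingHandle:
    """Hypothetical stand-in for AsyncActivityHandle that records each call."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Any]] = []

    async def complete(self, result: Any = None) -> None:
        self.calls.append(("complete", result))

    async def fail(self, exception: BaseException) -> None:
        self.calls.append(("fail", type(exception).__name__))


async def successful_work() -> str:
    """Placeholder for real processing that succeeds."""
    return "done"


if __name__ == "__main__":
    handle = RecordingHandle()
    asyncio.run(run_and_report(handle, successful_work))
    print(handle.calls)
```

Keeping `complete` in the `else` branch means an error raised by the completion call itself is not reported as an activity failure; it propagates to the caller instead.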

**Example with task token:**
```python
# Get an async activity handle by task token
# (task_token_bytes is the token captured inside the activity, e.g. from activity.info().task_token)
activity_handle = client.get_async_activity_handle(task_token=task_token_bytes)

# Report cancellation
await activity_handle.report_cancellation("User cancelled the operation")
```
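
Because the task token is raw `bytes`, it often has to be converted to text before it can be handed to another process, for example inside a JSON message. The sketch below shows one possible encoding using URL-safe Base64 from the standard library. `encode_task_token` and `decode_task_token` are hypothetical helper names, and the choice of Base64 is an assumption for illustration, not something the SDK requires.

```python
import base64


def encode_task_token(task_token: bytes) -> str:
    """Encode a raw task token as URL-safe Base64 text for transport."""
    return base64.urlsafe_b64encode(task_token).decode("ascii")


def decode_task_token(encoded: str) -> bytes:
    """Recover the raw task token bytes from its Base64 text form."""
    return base64.urlsafe_b64decode(encoded.encode("ascii"))
```

The receiving process would decode the text back to bytes and pass the result as `task_token` to `get_async_activity_handle`.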