# Workflow Development

The `temporalio.workflow` module provides the core functionality for defining and executing workflows: decorators, context functions, activity execution methods, and supporting utilities. Workflows are the fundamental building blocks of Temporal applications and hold the orchestration logic for business processes.

## Workflow Definition and Decorators

### Workflow Definition

The `@workflow.defn` decorator marks a class as a workflow definition.

```python { .api }
@overload
def defn(cls: ClassType) -> ClassType: ...

@overload
def defn(
    *,
    name: Optional[str] = None,
    sandboxed: bool = True,
    failure_exception_types: Sequence[Type[BaseException]] = [],
    versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
) -> Callable[[ClassType], ClassType]: ...

@overload
def defn(
    *,
    sandboxed: bool = True,
    dynamic: bool = False,
    versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
) -> Callable[[ClassType], ClassType]: ...

def defn(
    cls: Optional[ClassType] = None,
    *,
    name: Optional[str] = None,
    sandboxed: bool = True,
    dynamic: bool = False,
    failure_exception_types: Sequence[Type[BaseException]] = [],
    versioning_behavior: temporalio.common.VersioningBehavior = temporalio.common.VersioningBehavior.UNSPECIFIED,
) -> Union[ClassType, Callable[[ClassType], ClassType]]
```

**Parameters:**
- `cls` (Optional[ClassType]): The class to decorate
- `name` (Optional[str]): Name to use for the workflow. Defaults to the class `__name__`. Cannot be set if `dynamic` is set
- `sandboxed` (bool): Whether the workflow should run in a sandbox. Defaults to `True`
- `dynamic` (bool): If true, this workflow is dynamic. Dynamic workflows must accept a single `Sequence[RawValue]` parameter. Cannot be set to true if `name` is present
- `failure_exception_types` (Sequence[Type[BaseException]]): Exception types that, when a workflow-raised exception extends one of them, cause the workflow/update to fail instead of suspending the workflow via task failure
- `versioning_behavior` (temporalio.common.VersioningBehavior): The versioning behavior to use for this workflow

**Example:**
```python
import temporalio.workflow

@temporalio.workflow.defn
class MyWorkflow:
    @temporalio.workflow.run
    async def run(self, input: str) -> str:
        return f"Hello, {input}!"

# With custom name and settings
@temporalio.workflow.defn(name="CustomWorkflow", sandboxed=False)
class CustomWorkflow:
    @temporalio.workflow.run
    async def run(self) -> str:
        return "Custom workflow"
```
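
The overloads above allow both `@defn` and `@defn(...)`. As a rough, standard-library-only illustration of that pattern (not the SDK's implementation), the sketch below shows how one function can serve both forms and enforce the rule that `name` and `dynamic` are mutually exclusive. `defn_sketch` and the `_sketch_workflow_name` attribute are hypothetical names invented for this example.

```python
from typing import Callable, Optional, TypeVar, Union

T = TypeVar("T", bound=type)


def defn_sketch(
    cls: Optional[T] = None,
    *,
    name: Optional[str] = None,
    dynamic: bool = False,
) -> Union[T, Callable[[T], T]]:
    """Hypothetical decorator usable with or without arguments."""
    if name is not None and dynamic:
        raise ValueError("Cannot set both name and dynamic")

    def decorator(target: T) -> T:
        # Dynamic definitions carry no name; otherwise fall back to __name__.
        resolved = None if dynamic else (name or target.__name__)
        setattr(target, "_sketch_workflow_name", resolved)
        return target

    # Bare usage (@defn_sketch) passes the class directly.
    if cls is not None:
        return decorator(cls)
    # Parenthesized usage (@defn_sketch(...)) returns the real decorator.
    return decorator


@defn_sketch
class Plain:
    pass


@defn_sketch(name="CustomWorkflow")
class Renamed:
    pass
```

Here `Plain` falls back to its class name, while `Renamed` carries the explicit name passed to the decorator.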

### Workflow Initialization

```python { .api }
def init(init_fn: CallableType) -> CallableType
```

**Parameters:**
- `init_fn` (CallableType): The `__init__` method to decorate

The `@workflow.init` decorator may be used on the `__init__` method of the workflow class to specify that it accepts the same workflow input arguments as the `@workflow.run` method. If used, the parameters of your `__init__` and `@workflow.run` methods must be identical.

**Example:**
```python
@temporalio.workflow.defn
class MyWorkflow:
    @temporalio.workflow.init
    def __init__(self, input: str):
        self.input = input

    @temporalio.workflow.run
    async def run(self, input: str) -> str:
        return f"Hello, {self.input}!"
```

### Run Method Decorator

```python { .api }
def run(fn: CallableAsyncType) -> CallableAsyncType
```

**Parameters:**
- `fn` (CallableAsyncType): The function to decorate

The `@workflow.run` decorator must be used on exactly one async method defined on the same class as `@workflow.defn`. It can be applied to a base class method, but that method must then be explicitly overridden and defined on the workflow class.

**Example:**
```python
from datetime import timedelta

import temporalio.workflow

# `my_activity` is assumed to be an activity defined elsewhere
@temporalio.workflow.defn
class MyWorkflow:
    @temporalio.workflow.run
    async def run(self, name: str) -> str:
        return await temporalio.workflow.start_activity(
            my_activity,
            name,
            schedule_to_close_timeout=timedelta(minutes=5),
        )
```

### Signal Handler Decorator

```python { .api }
@overload
def signal(
    fn: CallableSyncOrAsyncReturnNoneType,
) -> CallableSyncOrAsyncReturnNoneType: ...

@overload
def signal(
    *,
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]: ...

@overload
def signal(
    *,
    name: str,
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]: ...

@overload
def signal(
    *,
    dynamic: Literal[True],
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]: ...

def signal(
    fn: Optional[CallableSyncOrAsyncReturnNoneType] = None,
    *,
    name: Optional[str] = None,
    dynamic: Optional[bool] = False,
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Union[CallableSyncOrAsyncReturnNoneType, Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]]
```

**Parameters:**
- `fn` (Optional[CallableSyncOrAsyncReturnNoneType]): The function to decorate
- `name` (Optional[str]): Signal name. Defaults to method `__name__`. Cannot be present when `dynamic` is present
- `dynamic` (Optional[bool]): If true, this handles all signals not otherwise handled. Cannot be present when `name` is present
- `unfinished_policy` (HandlerUnfinishedPolicy): Actions taken if a workflow terminates with a running instance of this handler
- `description` (Optional[str]): A short description of the signal that may appear in the UI/CLI

**Example:**
```python
@temporalio.workflow.defn
class MyWorkflow:
    def __init__(self):
        self.messages = []

    @temporalio.workflow.signal
    async def add_message(self, message: str) -> None:
        self.messages.append(message)

    @temporalio.workflow.signal(name="custom_signal", description="Custom signal handler")
    def handle_custom_signal(self, data: dict) -> None:
        # Handle custom signal
        pass
```
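
The `name` and `dynamic` parameters describe a lookup order: a handler registered under a name receives that signal, and a dynamic handler receives anything not otherwise handled. The following is a minimal, standard-library-only sketch of that dispatch idea, not the SDK's internals. `SignalRegistrySketch` and everything in it are hypothetical, and what happens when nothing matches is an assumption made only for this sketch.

```python
from typing import Callable, Dict, List, Optional

Handler = Callable[[str, List[object]], None]


class SignalRegistrySketch:
    """Hypothetical dispatcher: named handlers first, dynamic handler as fallback."""

    def __init__(self) -> None:
        self.named: Dict[str, Handler] = {}
        self.dynamic: Optional[Handler] = None

    def register(
        self, handler: Handler, *, name: Optional[str] = None, dynamic: bool = False
    ) -> None:
        # Mirrors the documented rule: name and dynamic are mutually exclusive.
        if name is not None and dynamic:
            raise ValueError("name cannot be combined with dynamic")
        if dynamic:
            self.dynamic = handler
        else:
            self.named[name or handler.__name__] = handler

    def dispatch(self, signal_name: str, args: List[object]) -> bool:
        handler = self.named.get(signal_name, self.dynamic)
        if handler is None:
            return False  # assumption for this sketch: nothing registered
        handler(signal_name, args)
        return True


seen: List[str] = []


def add_message(signal_name: str, args: List[object]) -> None:
    seen.append(f"named:{args[0]}")


def catch_all(signal_name: str, args: List[object]) -> None:
    seen.append(f"dynamic:{signal_name}")


registry = SignalRegistrySketch()
registry.register(add_message)              # name defaults to __name__
registry.register(catch_all, dynamic=True)  # fallback for everything else
registry.dispatch("add_message", ["hi"])
registry.dispatch("unknown_signal", [])
```

After the two dispatches, `seen` holds one entry from the named handler and one from the dynamic fallback.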

### Query Handler Decorator

```python { .api }
@overload
def query(fn: CallableType) -> CallableType: ...

@overload
def query(
    *, name: str, description: Optional[str] = None
) -> Callable[[CallableType], CallableType]: ...

@overload
def query(
    *, dynamic: Literal[True], description: Optional[str] = None
) -> Callable[[CallableType], CallableType]: ...

@overload
def query(*, description: str) -> Callable[[CallableType], CallableType]: ...

def query(
    fn: Optional[CallableType] = None,
    *,
    name: Optional[str] = None,
    dynamic: Optional[bool] = False,
    description: Optional[str] = None,
) -> Union[CallableType, Callable[[CallableType], CallableType]]
```

**Parameters:**
- `fn` (Optional[CallableType]): The function to decorate
- `name` (Optional[str]): Query name. Defaults to method `__name__`. Cannot be present when `dynamic` is present
- `dynamic` (Optional[bool]): If true, this handles all queries not otherwise handled. Cannot be present when `name` is present
- `description` (Optional[str]): A short description of the query that may appear in the UI/CLI

**Example:**
```python
@temporalio.workflow.defn
class MyWorkflow:
    def __init__(self):
        self.status = "running"
        # Initialized here so get_message_count has something to measure
        self.messages = []

    @temporalio.workflow.query
    def get_status(self) -> str:
        return self.status

    @temporalio.workflow.query(name="message_count", description="Get number of messages")
    def get_message_count(self) -> int:
        return len(self.messages)
```

### Update Handler Decorator

```python { .api }
@overload
def update(
    fn: UpdateMethodMultiParam[MultiParamSpec, ReturnType]
) -> UpdateMethodMultiParam[MultiParamSpec, ReturnType]: ...

@overload
def update(
    *, name: str, description: Optional[str] = None
) -> Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]: ...

@overload
def update(
    *,
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]: ...

@overload
def update(
    *,
    name: str,
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]: ...

@overload
def update(
    *,
    dynamic: Literal[True],
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]: ...

def update(
    fn: Optional[UpdateMethodMultiParam[MultiParamSpec, ReturnType]] = None,
    *,
    name: Optional[str] = None,
    dynamic: Optional[bool] = False,
    unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
    description: Optional[str] = None,
) -> Union[UpdateMethodMultiParam[MultiParamSpec, ReturnType], Callable[[UpdateMethodMultiParam[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType]]]
```

**Parameters:**
- `fn` (Optional[UpdateMethodMultiParam[MultiParamSpec, ReturnType]]): The function to decorate
- `name` (Optional[str]): Update name. Defaults to method `__name__`. Cannot be present when `dynamic` is present
- `dynamic` (Optional[bool]): If true, this handles all updates not otherwise handled. Cannot be present when `name` is present
- `unfinished_policy` (HandlerUnfinishedPolicy): Actions taken if a workflow terminates with a running instance of this handler
- `description` (Optional[str]): A short description of the update that may appear in the UI/CLI

**Example:**
```python
@temporalio.workflow.defn
class MyWorkflow:
    def __init__(self):
        self.value = 0

    @temporalio.workflow.update
    def increment_value(self, amount: int) -> int:
        self.value += amount
        return self.value

    @temporalio.workflow.update(name="set_value", description="Set the workflow value")
    def set_workflow_value(self, new_value: int) -> int:
        self.value = new_value
        return self.value
```

## Context Functions

### Workflow Information

```python { .api }
def info() -> Info
```

**Returns:**
- `Info`: Current workflow execution information

Get information about the current workflow execution. This raises an error if it is not called from within a workflow.

**Example:**
```python
# Inside a workflow
workflow_info = temporalio.workflow.info()
print(f"Workflow ID: {workflow_info.workflow_id}")
print(f"Run ID: {workflow_info.run_id}")
```

```python { .api }
def instance() -> Any
```

**Returns:**
- `Any`: Current workflow instance

Get the instance of the workflow class that is currently executing.

```python { .api }
def in_workflow() -> bool
```

**Returns:**
- `bool`: Whether currently executing in a workflow context

Check whether the caller is running inside a workflow. Returns `True` if called from a workflow and `False` otherwise.

### Time Functions

```python { .api }
def now() -> datetime
```

**Returns:**
- `datetime`: Current workflow time

Get the current workflow time as a timezone-aware datetime in UTC. This is the current time from the workflow's point of view, which may differ from system time.

```python { .api }
def time() -> float
```

**Returns:**
- `float`: Current workflow time as seconds since epoch

Get the current workflow time as seconds since the Unix epoch.

```python { .api }
def time_ns() -> int
```

**Returns:**
- `int`: Current workflow time as nanoseconds since epoch

Get the current workflow time as nanoseconds since the Unix epoch.

**Example:**
```python
# Inside a workflow
current_time = temporalio.workflow.now()
timestamp = temporalio.workflow.time()
nano_timestamp = temporalio.workflow.time_ns()

print(f"Current workflow time: {current_time}")
print(f"Timestamp: {timestamp}")
```
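
The three time functions return the same kind of value in different representations. The sketch below uses only the standard library to show how a nanosecond timestamp relates to float seconds and to a timezone-aware UTC datetime. It assumes all three describe the same instant, and `views_of` is a hypothetical helper, not part of the SDK.

```python
from datetime import datetime, timedelta, timezone


def views_of(time_ns: int):
    """Derive float seconds and an aware UTC datetime from nanoseconds (illustrative)."""
    seconds = time_ns / 1e9
    # Build the datetime from whole microseconds to avoid float rounding.
    epoch = datetime.fromtimestamp(0, tz=timezone.utc)
    aware = epoch + timedelta(microseconds=time_ns // 1000)
    return aware, seconds, time_ns


aware, seconds, nanos = views_of(1_700_000_000_500_000_000)
```

Note that `datetime` resolves only to microseconds, so the nanosecond form is the one to keep when sub-microsecond precision matters.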

### Utility Functions

```python { .api }
def uuid4() -> uuid.UUID
```

**Returns:**
- `uuid.UUID`: Deterministic UUID4

Generate a deterministic UUID4 value. This UUID will be the same for the same workflow execution on replay.

```python { .api }
def random() -> Random
```

**Returns:**
- `Random`: Deterministic random number generator

Get a deterministic random number generator. This random generator will produce the same sequence of numbers for the same workflow execution on replay.

**Example:**
```python
# Inside a workflow
workflow_uuid = temporalio.workflow.uuid4()
rng = temporalio.workflow.random()
random_number = rng.randint(1, 100)
```
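
To see why a seeded generator makes replay reproducible, here is a small standard-library sketch. It is only an illustration of the property described above: how the SDK actually seeds its generator or builds its UUIDs is not shown here, and `replayable_values` and the seed values are hypothetical.

```python
import random
import uuid


def replayable_values(seed: int):
    """Produce a UUID4-shaped value and a random int from a seeded generator."""
    rng = random.Random(seed)  # same seed -> same sequence of values
    generated_id = uuid.UUID(int=rng.getrandbits(128), version=4)
    number = rng.randint(1, 100)
    return generated_id, number


first_run = replayable_values(seed=42)
replay = replayable_values(seed=42)
```

Both calls return identical values, which is the behavior a workflow needs when its history is replayed. Calling the global `random` or `uuid.uuid4()` functions instead would give different values on each run.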

```python { .api }
def payload_converter() -> temporalio.converter.PayloadConverter
```

**Returns:**
- `temporalio.converter.PayloadConverter`: Current payload converter

Get the payload converter for the current workflow.

```python { .api }
def metric_meter() -> temporalio.common.MetricMeter
```

**Returns:**
- `temporalio.common.MetricMeter`: Current metric meter

Get the metric meter for the current workflow.

## Memo and Search Attribute Management

### Memo Operations

```python { .api }
def memo() -> Mapping[str, Any]
```

**Returns:**
- `Mapping[str, Any]`: Current workflow memo

Get the current workflow memo as a mapping of keys to values.

```python { .api }
@overload
def memo_value(key: str, default: Any = temporalio.common._arg_unset) -> Any: ...

@overload
def memo_value(key: str, *, type_hint: Type[ParamType]) -> ParamType: ...

@overload
def memo_value(
    key: str, default: ParamType, *, type_hint: Type[ParamType]
) -> ParamType: ...

def memo_value(
    key: str,
    default: Any = temporalio.common._arg_unset,
    *,
    type_hint: Optional[Type] = None,
) -> Any
```

**Parameters:**
- `key` (str): Memo key to get
- `default` (Any): Default value if key not present
- `type_hint` (Optional[Type]): Type hint for return value

**Returns:**
- `Any`: Memo value for the given key

Get a specific memo value. If no default is provided and the key is not present, this raises a `KeyError`.
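
The `default` parameter uses a sentinel rather than `None` so that `None` remains a valid default. As a minimal sketch of that lookup rule in plain Python (an illustration of the documented behavior, not the SDK's code), with `_UNSET` and `lookup` as hypothetical names:

```python
from typing import Any, Mapping

# Hypothetical sentinel: distinct from every real value, including None.
_UNSET = object()


def lookup(memo: Mapping[str, Any], key: str, default: Any = _UNSET) -> Any:
    """Return memo[key], else the default, else raise KeyError."""
    if key in memo:
        return memo[key]
    if default is _UNSET:
        raise KeyError(key)
    return default


memo = {"user_id": "u-1"}
found = lookup(memo, "user_id")
fallback = lookup(memo, "region", default="unknown")
explicit_none = lookup(memo, "region", None)  # None is a usable default
```

Only a call with a missing key and no default raises `KeyError`.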

```python { .api }
def upsert_memo(updates: Mapping[str, Any]) -> None
```

**Parameters:**
- `updates` (Mapping[str, Any]): Memo keys and values to upsert

Upsert memo values. This updates the workflow memo with the provided key-value pairs.

**Example:**
```python
# Inside a workflow
current_memo = temporalio.workflow.memo()
user_id = temporalio.workflow.memo_value("user_id", default="unknown")

# Update memo
temporalio.workflow.upsert_memo({"status": "processing", "progress": 0.5})
```

### Search Attribute Operations

```python { .api }
def upsert_search_attributes(
    updates: Union[
        temporalio.common.TypedSearchAttributes,
        Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
    ]
) -> None
```

**Parameters:**
- `updates` (Union[temporalio.common.TypedSearchAttributes, Mapping[Union[str, temporalio.common.SearchAttributeKey], Any]]): Search attributes to upsert

Upsert search attributes. This updates the workflow's search attributes with the provided values.

**Example:**
```python
# Inside a workflow
import temporalio.common

# Update search attributes
temporalio.workflow.upsert_search_attributes({
    "status": "active",
    temporalio.common.SearchAttributeKey.for_keyword("department"): "engineering"
})
```

## Activity Execution

### Starting Activities

```python { .api }
@overload
def start_activity(
    activity: CallableAsyncType,
    arg: AnyType,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[ReturnType]: ...

@overload
def start_activity(
    activity: CallableAsyncType,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[ReturnType]: ...

@overload
def start_activity(
    activity: str,
    arg: AnyType,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]: ...

@overload
def start_activity(
    activity: str,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]: ...

def start_activity(
    activity: Union[CallableType, str],
    arg: Any = temporalio.common._arg_unset,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]
```

**Parameters:**
- `activity` (Union[CallableType, str]): Activity function or string name
- `arg` (Any): Single argument to pass to the activity
- `task_queue` (Optional[str]): Task queue to run the activity on. Defaults to the current workflow's task queue
- `schedule_to_close_timeout` (Optional[timedelta]): Total time allowed for the activity, from scheduling to completion
- `schedule_to_start_timeout` (Optional[timedelta]): Time allowed between scheduling the activity and its start
- `start_to_close_timeout` (Optional[timedelta]): Time allowed for the activity to complete once started
- `heartbeat_timeout` (Optional[timedelta]): Maximum time between activity heartbeats
- `retry_policy` (Optional[temporalio.common.RetryPolicy]): How the activity is retried on failure
- `cancellation_type` (ActivityCancellationType): How to handle activity cancellation
- `versioning_intent` (Optional[temporalio.common.VersioningIntent]): Worker versioning intent for this activity
- `local_retry_threshold` (Optional[timedelta]): Threshold for local retries vs server retries

**Returns:**
- `ActivityHandle[Any]`: Handle to the started activity

Start an activity and return a handle to it. Starting the activity does not wait for it to finish; await the handle to obtain the result.

**Example:**
```python
from datetime import timedelta

import temporalio.activity
import temporalio.workflow

@temporalio.activity.defn
async def my_activity(name: str) -> str:
    return f"Hello, {name}!"

# The calls below belong inside a workflow's run method

# Start activity and await result
result = await temporalio.workflow.start_activity(
    my_activity,
    "World",
    schedule_to_close_timeout=timedelta(minutes=5),
)

# Start activity and get handle for later awaiting
handle = temporalio.workflow.start_activity(
    my_activity,
    "World",
    schedule_to_close_timeout=timedelta(minutes=5),
)
result = await handle
```
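
Keeping a handle instead of awaiting immediately lets several pieces of work run at once. The sketch below illustrates that start-now, await-later pattern with plain `asyncio` tasks standing in for activity handles. It is an analogy only and contains no Temporal code; `fake_activity` and `run_concurrently` are hypothetical.

```python
import asyncio
from typing import List


async def fake_activity(name: str) -> str:
    """Hypothetical stand-in for an activity: waits briefly, then returns."""
    await asyncio.sleep(0.01)
    return f"Hello, {name}!"


async def run_concurrently() -> List[str]:
    # Start both without awaiting, analogous to holding two handles.
    first = asyncio.create_task(fake_activity("Ada"))
    second = asyncio.create_task(fake_activity("Grace"))
    # Await later; both have been progressing in the meantime.
    return [await first, await second]


results = asyncio.run(run_concurrently())
```

The results come back in the order the handles are awaited, regardless of which task finished first.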

### Starting Activities by Class

```python { .api }
@overload
def start_activity_class(
    activity_cls: Type[ClassType],
    activity_method: str,
    arg: AnyType,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]: ...

def start_activity_class(
    activity_cls: Type,
    activity_method: str,
    arg: Any = temporalio.common._arg_unset,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]
```

**Parameters:**
- `activity_cls` (Type): Activity class
- `activity_method` (str): Method name on the activity class
- `arg` (Any): Single argument to pass to the activity
- Additional parameters same as `start_activity`

**Returns:**
- `ActivityHandle[Any]`: Handle to the started activity

Start an activity method from a class and return a handle to it.

### Starting Activity Methods

```python { .api }
@overload
def start_activity_method(
    activity_method: MethodAsyncNoParam[SelfType, ReturnType],
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[ReturnType]: ...

def start_activity_method(
    activity_method: Callable,
    arg: Any = temporalio.common._arg_unset,
    *,
    task_queue: Optional[str] = None,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    heartbeat_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
    local_retry_threshold: Optional[timedelta] = None,
) -> ActivityHandle[Any]
```

**Parameters:**
- `activity_method` (Callable): Activity method to start
- `arg` (Any): Single argument to pass to the activity
- Additional parameters same as `start_activity`

**Returns:**
- `ActivityHandle[Any]`: Handle to the started activity

Start an activity method and return a handle to it.

### Starting Local Activities

```python { .api }
@overload
def start_local_activity(
    activity: CallableAsyncType,
    arg: AnyType,
    *,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    local_retry_threshold: Optional[timedelta] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
) -> ActivityHandle[ReturnType]: ...

def start_local_activity(
    activity: Union[CallableType, str],
    arg: Any = temporalio.common._arg_unset,
    *,
    schedule_to_close_timeout: Optional[timedelta] = None,
    schedule_to_start_timeout: Optional[timedelta] = None,
    start_to_close_timeout: Optional[timedelta] = None,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    local_retry_threshold: Optional[timedelta] = None,
    cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
) -> ActivityHandle[Any]
```

**Parameters:**
- `activity` (Union[CallableType, str]): Local activity function or string name
- `arg` (Any): Single argument to pass to the activity
- `schedule_to_close_timeout` (Optional[timedelta]): Total time allowed for the activity, from scheduling to completion
- `schedule_to_start_timeout` (Optional[timedelta]): Time allowed between scheduling the activity and its start
- `start_to_close_timeout` (Optional[timedelta]): Time allowed for the activity to complete once started
- `retry_policy` (Optional[temporalio.common.RetryPolicy]): How the activity is retried on failure
- `local_retry_threshold` (Optional[timedelta]): Threshold for local retries vs server retries
- `cancellation_type` (ActivityCancellationType): How to handle activity cancellation

**Returns:**
- `ActivityHandle[Any]`: Handle to the started local activity

Start a local activity and return a handle to it. Local activities are executed in the same process as the workflow worker.

**Example:**
```python
from datetime import timedelta

import temporalio.activity
import temporalio.workflow

@temporalio.activity.defn
async def local_activity(data: str) -> str:
    return data.upper()

# Inside a workflow: start the local activity and await its result
result = await temporalio.workflow.start_local_activity(
    local_activity,
    "hello world",
    start_to_close_timeout=timedelta(seconds=30),
)
```

## Child Workflows and External Workflows

### Child Workflow Execution

```python { .api }
@overload
async def start_child_workflow(
    workflow: CallableAsyncType,
    arg: AnyType,
    *,
    id: str,
    task_queue: Optional[str] = None,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[
        Union[
            temporalio.common.TypedSearchAttributes,
            Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
        ]
    ] = None,
    cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
    parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> ChildWorkflowHandle[ReturnType]: ...

async def start_child_workflow(
    workflow: Union[CallableType, str],
    arg: Any = temporalio.common._arg_unset,
    *,
    id: str,
    task_queue: Optional[str] = None,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[
        Union[
            temporalio.common.TypedSearchAttributes,
            Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
        ]
    ] = None,
    cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
    parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> ChildWorkflowHandle[Any]
```

**Parameters:**
- `workflow` (Union[CallableType, str]): Child workflow run method or string name
- `arg` (Any): Single argument to pass to the child workflow
- `id` (str): ID for the child workflow
- `task_queue` (Optional[str]): Task queue to run the child workflow on
- `execution_timeout` (Optional[timedelta]): Maximum time for workflow execution
- `run_timeout` (Optional[timedelta]): Maximum time for a single workflow run
- `task_timeout` (Optional[timedelta]): Maximum time for workflow tasks
- `id_reuse_policy` (temporalio.common.WorkflowIDReusePolicy): Policy for workflow ID reuse
- `retry_policy` (Optional[temporalio.common.RetryPolicy]): How the workflow is retried on failure
- `cron_schedule` (str): Cron schedule for the workflow
- `memo` (Optional[Mapping[str, Any]]): Memo to attach to the workflow
- `search_attributes` (Optional[Union[temporalio.common.TypedSearchAttributes, Mapping[Union[str, temporalio.common.SearchAttributeKey], Any]]]): Search attributes for the workflow
- `cancellation_type` (ChildWorkflowCancellationType): How to handle child workflow cancellation
- `parent_close_policy` (ParentClosePolicy): What to do with the child when the parent closes
- `versioning_intent` (Optional[temporalio.common.VersioningIntent]): Worker versioning intent

**Returns:**
- `ChildWorkflowHandle[Any]`: Handle to the started child workflow

Start a child workflow and return a handle to it. Await the call to obtain the handle, then await the handle to obtain the child workflow's result.
871
872
**Example:**
873
```python
874
# Inside a workflow
875
@temporalio.workflow.defn
876
class ChildWorkflow:
877
@temporalio.workflow.run
878
async def run(self, data: str) -> str:
879
return f"Processed: {data}"
880
881
# Start child workflow
882
child_handle = temporalio.workflow.start_child_workflow(
883
ChildWorkflow.run,
884
"input data",
885
id="child-workflow-123",
886
task_queue="child-queue"
887
)
888
result = await child_handle
889
```

### External Workflow Handles

```python { .api }
@overload
def get_external_workflow_handle(workflow_id: str) -> ExternalWorkflowHandle: ...

@overload
def get_external_workflow_handle(
    workflow_id: str, *, run_id: Optional[str] = None
) -> ExternalWorkflowHandle: ...

def get_external_workflow_handle(
    workflow_id: str, *, run_id: Optional[str] = None
) -> ExternalWorkflowHandle
```

**Parameters:**
- `workflow_id` (str): Workflow ID of the external workflow
- `run_id` (Optional[str]): Run ID of the external workflow

**Returns:**
- `ExternalWorkflowHandle`: Handle for the external workflow

Get a handle to an external workflow by workflow ID and optional run ID.

```python { .api }
@overload
def get_external_workflow_handle_for(
    workflow_type: CallableAsyncType, workflow_id: str
) -> ExternalWorkflowHandle[ReturnType]: ...

@overload
def get_external_workflow_handle_for(
    workflow_type: CallableAsyncType,
    workflow_id: str,
    *,
    run_id: Optional[str] = None
) -> ExternalWorkflowHandle[ReturnType]: ...

def get_external_workflow_handle_for(
    workflow_type: Callable,
    workflow_id: str,
    *,
    run_id: Optional[str] = None,
) -> ExternalWorkflowHandle[Any]
```

**Parameters:**
- `workflow_type` (Callable): Type of the external workflow
- `workflow_id` (str): Workflow ID of the external workflow
- `run_id` (Optional[str]): Run ID of the external workflow

**Returns:**
- `ExternalWorkflowHandle[Any]`: Typed handle for the external workflow

Get a typed handle to an external workflow by workflow type, workflow ID, and optional run ID.

**Example:**
```python
# Inside a workflow
# Get handle to external workflow
external_handle = temporalio.workflow.get_external_workflow_handle("external-workflow-id")

# Signal external workflow
await external_handle.signal("signal_name", "signal_data")

# Cancel external workflow
await external_handle.cancel()
```

### Continue-as-New

```python { .api }
@overload
def continue_as_new(
    *,
    task_queue: Optional[str] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[
        Union[
            temporalio.common.TypedSearchAttributes,
            Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
        ]
    ] = None,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> NoReturn: ...

@overload
def continue_as_new(
    arg: AnyType,
    *,
    task_queue: Optional[str] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[
        Union[
            temporalio.common.TypedSearchAttributes,
            Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
        ]
    ] = None,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> NoReturn: ...

def continue_as_new(
    arg: Any = temporalio.common._arg_unset,
    *,
    task_queue: Optional[str] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[
        Union[
            temporalio.common.TypedSearchAttributes,
            Mapping[Union[str, temporalio.common.SearchAttributeKey], Any],
        ]
    ] = None,
    versioning_intent: Optional[temporalio.common.VersioningIntent] = None,
) -> NoReturn
```

**Parameters:**
- `arg` (Any): Argument to pass to the continued workflow
- `task_queue` (Optional[str]): Task queue for the continued workflow
- `run_timeout` (Optional[timedelta]): Maximum time for the continued workflow run
- `task_timeout` (Optional[timedelta]): Maximum time for continued workflow tasks
- `memo` (Optional[Mapping[str, Any]]): Memo to attach to continued workflow
- `search_attributes` (Optional[Union[temporalio.common.TypedSearchAttributes, Mapping[Union[str, temporalio.common.SearchAttributeKey], Any]]]): Search attributes for continued workflow
- `versioning_intent` (Optional[temporalio.common.VersioningIntent]): Worker versioning intent

Continue the current workflow as a new run. This completes the current run and starts a new execution with the same workflow ID. The function never returns (its return type is `NoReturn`), so code placed after the call does not execute.

**Example:**
```python
# Inside a workflow
@temporalio.workflow.defn
class MyWorkflow:
    @temporalio.workflow.run
    async def run(self, iteration: int) -> str:
        if iteration >= 100:
            return f"Completed after {iteration} iterations"

        # Continue as new with next iteration
        temporalio.workflow.continue_as_new(iteration + 1)
```
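
The chaining of runs can be modelled in plain Python without a Temporal server. The sketch below is not SDK code: `ContinueAsNew`, `run_once`, and `drive` are hypothetical stand-ins that only mimic how each `continue_as_new` call ends one run and hands its argument to the next run under the same workflow ID.

```python
from typing import List, Tuple


class ContinueAsNew(BaseException):
    """Hypothetical stand-in for the SDK's ContinueAsNewError."""

    def __init__(self, next_arg: int) -> None:
        super().__init__(next_arg)
        self.next_arg = next_arg


def run_once(iteration: int) -> str:
    """One run of the modelled workflow; mirrors MyWorkflow.run above."""
    if iteration >= 100:
        return f"Completed after {iteration} iterations"
    # Ends this run; nothing after this line executes.
    raise ContinueAsNew(iteration + 1)


def drive(workflow_id: str, first_arg: int) -> Tuple[str, List[str]]:
    """Model of the server side: restart the run until one returns a result."""
    runs: List[str] = []
    arg = first_arg
    while True:
        # The workflow ID stays the same; only the run changes.
        runs.append(f"{workflow_id}/run-{len(runs)}")
        try:
            return run_once(arg), runs
        except ContinueAsNew as request:
            arg = request.next_arg


result, runs = drive("my-workflow", 0)
print(result)     # Completed after 100 iterations
print(len(runs))  # 101
```

Starting from iteration 0, the model performs 101 runs (iterations 0 through 100), each beginning with empty local state and only the argument carried over.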

## Signal, Query, and Update Handler Management

### Dynamic Handler Registration

```python { .api }
def get_signal_handler(name: str) -> Optional[Callable]
```

**Parameters:**
- `name` (str): Signal handler name

**Returns:**
- `Optional[Callable]`: Signal handler function if found

Get a signal handler by name.

```python { .api }
def set_signal_handler(
    name: str, handler: Optional[Callable[..., None]]
) -> None
```

**Parameters:**
- `name` (str): Signal handler name
- `handler` (Optional[Callable[..., None]]): Signal handler function or None to unset

Set a signal handler for the given name. Set to None to unset.

```python { .api }
def get_dynamic_signal_handler() -> Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]
```

**Returns:**
- `Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]`: Dynamic signal handler if set

Get the dynamic signal handler.

```python { .api }
def set_dynamic_signal_handler(
    handler: Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]
) -> None
```

**Parameters:**
- `handler` (Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]): Dynamic signal handler or None to unset

Set the dynamic signal handler. Set to None to unset.

```python { .api }
def get_query_handler(name: str) -> Optional[Callable]
```

**Parameters:**
- `name` (str): Query handler name

**Returns:**
- `Optional[Callable]`: Query handler function if found

Get a query handler by name.

```python { .api }
def set_query_handler(
    name: str, handler: Optional[Callable[..., Any]]
) -> None
```

**Parameters:**
- `name` (str): Query handler name
- `handler` (Optional[Callable[..., Any]]): Query handler function or None to unset

Set a query handler for the given name. Set to None to unset.

```python { .api }
def get_dynamic_query_handler() -> Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]
```

**Returns:**
- `Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]`: Dynamic query handler if set

Get the dynamic query handler.

```python { .api }
def set_dynamic_query_handler(
    handler: Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]
) -> None
```

**Parameters:**
- `handler` (Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]): Dynamic query handler or None to unset

Set the dynamic query handler. Set to None to unset.

```python { .api }
def get_update_handler(name: str) -> Optional[Callable]
```

**Parameters:**
- `name` (str): Update handler name

**Returns:**
- `Optional[Callable]`: Update handler function if found

Get an update handler by name.

```python { .api }
def set_update_handler(
    name: str,
    handler: Optional[Callable[..., Any]],
    validator: Optional[Callable[..., None]] = None,
) -> None
```

**Parameters:**
- `name` (str): Update handler name
- `handler` (Optional[Callable[..., Any]]): Update handler function or None to unset
- `validator` (Optional[Callable[..., None]]): Optional validator function

Set an update handler for the given name. Set to None to unset.

```python { .api }
def get_dynamic_update_handler() -> Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]
```

**Returns:**
- `Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]`: Dynamic update handler if set

Get the dynamic update handler.

```python { .api }
def set_dynamic_update_handler(
    handler: Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]],
    validator: Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]] = None,
) -> None
```

**Parameters:**
- `handler` (Optional[Callable[[str, Sequence[temporalio.common.RawValue]], Any]]): Dynamic update handler or None to unset
- `validator` (Optional[Callable[[str, Sequence[temporalio.common.RawValue]], None]]): Optional validator function

Set the dynamic update handler. Set to None to unset.

**Example:**
```python
# Inside a workflow
def custom_signal_handler(message: str) -> None:
    print(f"Received signal: {message}")

def custom_query_handler(query_type: str) -> str:
    return f"Query response for {query_type}"

# Set handlers dynamically
temporalio.workflow.set_signal_handler("custom_signal", custom_signal_handler)
temporalio.workflow.set_query_handler("custom_query", custom_query_handler)

# Get handlers
signal_handler = temporalio.workflow.get_signal_handler("custom_signal")
query_handler = temporalio.workflow.get_query_handler("custom_query")
```
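
As a mental model, the get/set functions behave like a per-workflow table of named handlers plus one optional dynamic handler. The sketch below is a plain-Python illustration, not SDK code: `HandlerRegistry` and its methods are hypothetical, and the dispatch rule (a named handler wins, otherwise the dynamic handler receives the name and the argument sequence) is an assumption about how the two kinds of handler interact.

```python
from typing import Any, Callable, Dict, List, Optional, Sequence

Handler = Callable[..., Any]
DynamicHandler = Callable[[str, Sequence[Any]], Any]


class HandlerRegistry:
    """Hypothetical model of per-workflow handler storage (not the SDK)."""

    def __init__(self) -> None:
        self._named: Dict[str, Handler] = {}
        self._dynamic: Optional[DynamicHandler] = None

    def set_handler(self, name: str, handler: Optional[Handler]) -> None:
        # Passing None unsets the handler, as with set_signal_handler.
        if handler is None:
            self._named.pop(name, None)
        else:
            self._named[name] = handler

    def get_handler(self, name: str) -> Optional[Handler]:
        return self._named.get(name)

    def set_dynamic_handler(self, handler: Optional[DynamicHandler]) -> None:
        self._dynamic = handler

    def dispatch(self, name: str, *args: Any) -> Any:
        # Assumption: a named handler wins; otherwise the dynamic one is used.
        handler = self._named.get(name)
        if handler is not None:
            return handler(*args)
        if self._dynamic is not None:
            return self._dynamic(name, args)
        raise LookupError(f"no handler registered for {name!r}")


received: List[str] = []
registry = HandlerRegistry()
registry.set_handler("custom_signal", received.append)
registry.set_dynamic_handler(lambda name, args: received.append(f"{name}:{args[0]}"))

registry.dispatch("custom_signal", "hello")    # handled by the named handler
registry.dispatch("other_signal", "fallback")  # handled by the dynamic handler
registry.set_handler("custom_signal", None)    # unset the named handler
registry.dispatch("custom_signal", "again")    # now reaches the dynamic handler

print(received)  # ['hello', 'other_signal:fallback', 'custom_signal:again']
```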

## Advanced Features

### Nexus Operations

```python { .api }
@overload
def create_nexus_client(
    client: NexusClient[ServiceHandlerT], *, endpoint: str
) -> ServiceHandlerT: ...

@overload
def create_nexus_client(
    *, service: str, endpoint: str, serializer: nexusrpc.Serializer = nexusrpc.JSONSerializer()
) -> temporalio.nexus.OperationClient: ...

def create_nexus_client(
    client: Optional[NexusClient[ServiceHandlerT]] = None,
    *,
    service: Optional[str] = None,
    endpoint: str,
    serializer: nexusrpc.Serializer = nexusrpc.JSONSerializer(),
) -> Union[ServiceHandlerT, temporalio.nexus.OperationClient]
```

**Parameters:**
- `client` (Optional[NexusClient[ServiceHandlerT]]): Nexus client to use
- `service` (Optional[str]): Service name for Nexus operations
- `endpoint` (str): Nexus endpoint
- `serializer` (nexusrpc.Serializer): Serializer for Nexus operations

**Returns:**
- `Union[ServiceHandlerT, temporalio.nexus.OperationClient]`: Nexus client for operations

Create a Nexus client for calling operations on external Nexus services.

### Versioning and Patches

```python { .api }
def patched(id: str) -> bool
```

**Parameters:**
- `id` (str): Patch identifier

**Returns:**
- `bool`: `True` if the workflow should take the new (patched) code path

Check whether the code path guarded by a patch should run. Use this to change workflow code without breaking replay of executions that started on the old code.

```python { .api }
def deprecate_patch(id: str) -> None
```

**Parameters:**
- `id` (str): Patch identifier to deprecate

Mark a patch as deprecated. Call this once no remaining execution needs the old code path, so that the `patched` check and the old branch can be removed while histories that recorded the patch stay compatible.

**Example:**
```python
# Inside a workflow
if temporalio.workflow.patched("feature_v2"):
    # New version of the feature
    result = await new_feature_implementation()
else:
    # Old version for replay compatibility
    result = await old_feature_implementation()

# In newer workflow versions, deprecate old patches
temporalio.workflow.deprecate_patch("old_feature_v1")
```
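
The branching rule behind `patched` can be illustrated with a small model. This is a sketch rather than SDK code: `PatchModel` and `choose_path` are hypothetical, and the rule assumed here is that a first execution records a marker and takes the new path, while a replay takes the new path only if its history already contains that marker.

```python
from typing import Set


class PatchModel:
    """Hypothetical model of patch-marker bookkeeping (not the SDK)."""

    def __init__(self, history_markers: Set[str], replaying: bool) -> None:
        self.history_markers = set(history_markers)
        self.replaying = replaying

    def patched(self, patch_id: str) -> bool:
        if self.replaying:
            # A replay must follow whichever path the original run took.
            return patch_id in self.history_markers
        # First execution: record the marker and take the new path.
        self.history_markers.add(patch_id)
        return True


def choose_path(model: PatchModel) -> str:
    return "new" if model.patched("feature_v2") else "old"


fresh = choose_path(PatchModel(set(), replaying=False))
replay_old = choose_path(PatchModel(set(), replaying=True))
replay_new = choose_path(PatchModel({"feature_v2"}, replaying=True))

print(fresh, replay_old, replay_new)  # new old new
```

Under this model, executions that began before the patch was deployed keep replaying the old branch, which is why the `else` branch has to stay in place until those executions are gone.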

### Workflow Utilities

```python { .api }
def all_handlers_finished() -> bool
```

**Returns:**
- `bool`: Whether all handlers have finished

Check whether all signal and update handlers have finished executing.
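
A typical use is to wait for this predicate before the workflow returns, so that in-flight handlers are not abandoned; inside a workflow this is usually written as `await workflow.wait_condition(workflow.all_handlers_finished)`. The sketch below models the idea with plain `asyncio` only. `HandlerTracker` and its methods are hypothetical, and the polling loop is a stand-in for the SDK's condition waiting.

```python
import asyncio
from typing import List


class HandlerTracker:
    """Hypothetical model of in-flight handler tracking (not the SDK)."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.log: List[str] = []

    def all_handlers_finished(self) -> bool:
        return self.in_flight == 0

    async def handle_update(self, name: str, delay: float) -> None:
        self.in_flight += 1
        try:
            await asyncio.sleep(delay)  # stands in for awaited handler work
            self.log.append(f"{name} done")
        finally:
            self.in_flight -= 1

    async def wait_until_idle(self) -> None:
        # Poll the predicate; a stand-in for waiting on a workflow condition.
        while not self.all_handlers_finished():
            await asyncio.sleep(0.001)


async def main() -> List[str]:
    tracker = HandlerTracker()
    handlers = [
        asyncio.create_task(tracker.handle_update("update-a", 0.01)),
        asyncio.create_task(tracker.handle_update("update-b", 0.02)),
    ]
    await asyncio.sleep(0)  # let both handlers start
    await tracker.wait_until_idle()
    tracker.log.append("workflow returns")
    await asyncio.gather(*handlers)
    return tracker.log


log = asyncio.run(main())
print(log)  # ['update-a done', 'update-b done', 'workflow returns']
```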

```python { .api }
def as_completed(
    tasks: Iterable[Awaitable[AnyType]], *, timeout: Optional[float] = None
) -> Iterator[Awaitable[AnyType]]
```

**Parameters:**
- `tasks` (Iterable[Awaitable[AnyType]]): Tasks to wait for completion
- `timeout` (Optional[float]): Timeout in seconds

**Returns:**
- `Iterator[Awaitable[AnyType]]`: Iterator of awaitables, yielded in completion order

Return an iterator that yields tasks as they complete. Similar to `asyncio.as_completed` but works in workflows.

**Example:**
```python
# Inside a workflow
tasks = [
    temporalio.workflow.start_activity(activity1, "arg1", schedule_to_close_timeout=timedelta(minutes=1)),
    temporalio.workflow.start_activity(activity2, "arg2", schedule_to_close_timeout=timedelta(minutes=1)),
    temporalio.workflow.start_activity(activity3, "arg3", schedule_to_close_timeout=timedelta(minutes=1)),
]

# Process tasks as they complete
for task in temporalio.workflow.as_completed(tasks):
    result = await task
    print(f"Task completed with result: {result}")
```
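
To see completion-order iteration outside a worker, the same loop shape can be run against `asyncio.as_completed`, which the text above names as the counterpart. This is only a sketch: `fake_activity` is a hypothetical stand-in for an activity, and workflow code should call the `temporalio.workflow` version instead.

```python
import asyncio
from typing import List


async def fake_activity(name: str, delay: float) -> str:
    """Hypothetical stand-in for an activity; sleeps, then returns its name."""
    await asyncio.sleep(delay)
    return name


async def main() -> List[str]:
    # Submitted slowest-first so completion order differs from list order.
    tasks = [
        asyncio.ensure_future(fake_activity("slow", 0.06)),
        asyncio.ensure_future(fake_activity("medium", 0.03)),
        asyncio.ensure_future(fake_activity("fast", 0.0)),
    ]
    finished: List[str] = []
    for next_done in asyncio.as_completed(tasks):
        finished.append(await next_done)
    return finished


order = asyncio.run(main())
print(order)  # ['fast', 'medium', 'slow']
```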

### Workflow Details Management

```python { .api }
def get_current_details() -> str
```

**Returns:**
- `str`: Current workflow details

Get the current workflow details string.

```python { .api }
def set_current_details(description: str) -> None
```

**Parameters:**
- `description` (str): Description to set as current workflow details

Set current workflow details. This appears in the workflow execution details in the Temporal Web UI.

**Example:**
```python
# Inside a workflow
temporalio.workflow.set_current_details("Processing user data")
await process_user_data()

temporalio.workflow.set_current_details("Sending notifications")
await send_notifications()

current_status = temporalio.workflow.get_current_details()
```

## Core Classes and Types

### Workflow Information Classes

```python { .api }
class Info:
    """Information about the running workflow."""

    namespace: str
    workflow_id: str
    run_id: str
    workflow_type: str
    task_queue: str
    attempt: int
    cron_schedule: Optional[str]
    continued_from_execution_run_id: Optional[str]
    parent: Optional[ParentInfo]
    root: Optional[RootInfo]
    start_time: datetime
    execution_timeout: Optional[timedelta]
    run_timeout: Optional[timedelta]
    task_timeout: timedelta
    retry_policy: Optional[temporalio.common.RetryPolicy]
    typed_search_attributes: temporalio.common.TypedSearchAttributes
    raw_memo: Optional[Mapping[str, temporalio.api.common.v1.Payload]]
    headers: Mapping[str, temporalio.api.common.v1.Payload]
    unsafe: temporalio.workflow.unsafe
```

```python { .api }
class ParentInfo:
    """Information about parent workflow if this is a child workflow."""

    namespace: str
    workflow_id: str
    run_id: str
```

```python { .api }
class RootInfo:
    """Information about root workflow if this is a child workflow."""

    workflow_id: str
    run_id: str
```

```python { .api }
class UpdateInfo:
    """Information about the currently running update."""

    id: str
    name: str
```

### Activity Handle

```python { .api }
class ActivityHandle(Generic[ReturnType]):
    """Handle for a running activity."""

    @property
    def id(self) -> str:
        """ID of the activity."""
        ...

    def cancel(self) -> None:
        """Cancel the activity."""
        ...

    def is_cancelled(self) -> bool:
        """Check if the activity is cancelled."""
        ...

    def is_done(self) -> bool:
        """Check if the activity is complete."""
        ...

    def result(self) -> ReturnType:
        """Get the result of the activity. Raises exception if not complete."""
        ...
```

### Child Workflow Handle

```python { .api }
class ChildWorkflowHandle(Generic[ReturnType]):
    """Handle for a running child workflow."""

    @property
    def id(self) -> str:
        """ID of the child workflow."""
        ...

    @property
    def first_execution_run_id(self) -> Optional[str]:
        """First execution run ID of the child workflow."""
        ...

    async def signal(self, signal: str, arg: Any = temporalio.common._arg_unset) -> None:
        """Send a signal to the child workflow."""
        ...

    def cancel(self) -> None:
        """Cancel the child workflow."""
        ...

    def is_cancelled(self) -> bool:
        """Check if the child workflow is cancelled."""
        ...

    def is_done(self) -> bool:
        """Check if the child workflow is complete."""
        ...

    def result(self) -> ReturnType:
        """Get the result of the child workflow. Raises exception if not complete."""
        ...
```

### External Workflow Handle

```python { .api }
class ExternalWorkflowHandle(Generic[ReturnType]):
    """Handle for an external workflow."""

    @property
    def id(self) -> str:
        """ID of the external workflow."""
        ...

    @property
    def run_id(self) -> Optional[str]:
        """Run ID of the external workflow."""
        ...

    async def signal(self, signal: str, arg: Any = temporalio.common._arg_unset) -> None:
        """Send a signal to the external workflow."""
        ...

    async def cancel(self) -> None:
        """Cancel the external workflow."""
        ...
```

### Configuration Types

```python { .api }
class ActivityConfig(TypedDict, total=False):
    """Configuration for activity execution."""

    task_queue: Optional[str]
    schedule_to_close_timeout: Optional[timedelta]
    schedule_to_start_timeout: Optional[timedelta]
    start_to_close_timeout: Optional[timedelta]
    heartbeat_timeout: Optional[timedelta]
    retry_policy: Optional[temporalio.common.RetryPolicy]
    cancellation_type: ActivityCancellationType
    versioning_intent: Optional[temporalio.common.VersioningIntent]
    local_retry_threshold: Optional[timedelta]
```

```python { .api }
class LocalActivityConfig(TypedDict, total=False):
    """Configuration for local activity execution."""

    schedule_to_close_timeout: Optional[timedelta]
    schedule_to_start_timeout: Optional[timedelta]
    start_to_close_timeout: Optional[timedelta]
    retry_policy: Optional[temporalio.common.RetryPolicy]
    local_retry_threshold: Optional[timedelta]
    cancellation_type: ActivityCancellationType
```

```python { .api }
class ChildWorkflowConfig(TypedDict, total=False):
    """Configuration for child workflow execution."""

    id: str
    task_queue: Optional[str]
    execution_timeout: Optional[timedelta]
    run_timeout: Optional[timedelta]
    task_timeout: Optional[timedelta]
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy
    retry_policy: Optional[temporalio.common.RetryPolicy]
    cron_schedule: str
    memo: Optional[Mapping[str, Any]]
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, Mapping[Union[str, temporalio.common.SearchAttributeKey], Any]]]
    cancellation_type: ChildWorkflowCancellationType
    parent_close_policy: ParentClosePolicy
    versioning_intent: Optional[temporalio.common.VersioningIntent]
```
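
Because these types are `TypedDict`s declared with `total=False`, a partial set of options can be built once and unpacked as keyword arguments at each call site. The sketch below shows that pattern with the standard library only: `ActivityOptions` is a hypothetical local mirror of a few `ActivityConfig` keys, and `run_activity` is a stand-in for an activity call. Unpacking a real `ActivityConfig` into an activity call in the same way is an assumption about intended use.

```python
from datetime import timedelta
from typing import Any, Dict, Optional, TypedDict


class ActivityOptions(TypedDict, total=False):
    """Hypothetical local mirror of a few ActivityConfig keys."""

    task_queue: Optional[str]
    start_to_close_timeout: Optional[timedelta]
    heartbeat_timeout: Optional[timedelta]


def run_activity(name: str, **options: Any) -> Dict[str, Any]:
    """Stand-in for an activity call; returns the options it received."""
    return {"activity": name, **options}


# total=False makes every key optional, so a partial config is valid.
defaults: ActivityOptions = {"start_to_close_timeout": timedelta(seconds=30)}

# Later keys win, so a call site can override a shared default.
slow_call: ActivityOptions = {
    **defaults,
    "start_to_close_timeout": timedelta(minutes=5),
}

quick = run_activity("send_email", **defaults)
slow = run_activity("render_report", **slow_call)

print(quick["start_to_close_timeout"])  # 0:00:30
print(slow["start_to_close_timeout"])   # 0:05:00
```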

### Enumeration Types

```python { .api }
class HandlerUnfinishedPolicy(Enum):
    """Actions taken if a workflow terminates with running handlers."""

    WARN_AND_ABANDON = 1  # Issue a warning in addition to abandoning
    ABANDON = 2  # Abandon the handler
```

```python { .api }
class ActivityCancellationType(IntEnum):
    """How to handle activity cancellation."""

    TRY_CANCEL = 1  # Try to cancel the activity
    WAIT_CANCELLATION_COMPLETED = 2  # Wait for cancellation to complete
    ABANDON = 3  # Abandon the activity immediately
```

```python { .api }
class ChildWorkflowCancellationType(IntEnum):
    """How to handle child workflow cancellation."""

    WAIT_CANCELLATION_COMPLETED = 1  # Wait for cancellation to complete
    ABANDON = 2  # Abandon the child workflow immediately
```

```python { .api }
class ParentClosePolicy(IntEnum):
    """What to do with child workflows when parent closes."""

    TERMINATE = 1  # Terminate child workflows
    ABANDON = 2  # Abandon child workflows
    REQUEST_CANCEL = 3  # Request cancellation of child workflows
```

### Exception Types

```python { .api }
class ContinueAsNewError(BaseException):
    """Exception raised to continue workflow as new."""
    pass
```
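
One practical consequence of deriving from `BaseException` rather than `Exception` is that a broad `except Exception:` block in workflow code does not intercept the continue-as-new request. The sketch below demonstrates that language rule with a hypothetical stand-in class, `ContinueAsNewSignal`; it does not import the SDK, and whether this was the motive for the SDK's design is an assumption.

```python
class ContinueAsNewSignal(BaseException):
    """Hypothetical stand-in that, like ContinueAsNewError, extends BaseException."""


def step_with_broad_handler() -> str:
    try:
        raise ContinueAsNewSignal()
    except Exception:
        # Not reached: `except Exception` does not catch BaseException subclasses.
        return "swallowed"


try:
    outcome = step_with_broad_handler()
except ContinueAsNewSignal:
    outcome = "propagated"

print(outcome)  # propagated
```

A bare `except:` or `except BaseException:` would still catch it, so workflow code that uses either should re-raise.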
1578
1579
```python { .api }
1580
class NondeterminismError(Exception):
1581
"""Exception for nondeterministic behavior in workflow."""
1582
pass
1583
```
1584
1585
```python { .api }
1586
class ReadOnlyContextError(Exception):
1587
"""Exception for read-only context violations."""
1588
pass
1589
```
1590
1591
```python { .api }
1592
class UnfinishedUpdateHandlersWarning(RuntimeWarning):
1593
"""Warning for unfinished update handlers when workflow exits."""
1594
pass
1595
```
1596
1597
```python { .api }
1598
class UnfinishedSignalHandlersWarning(RuntimeWarning):
1599
"""Warning for unfinished signal handlers when workflow exits."""
1600
pass
1601
```

This guide covers the essential workflow development capabilities of the Temporal Python SDK. The `temporalio.workflow` module provides the core building blocks for creating robust, scalable, and maintainable distributed applications with Temporal's orchestration engine.