# Activity Development

The `temporalio.activity` module provides functions and utilities for defining and executing activities within Temporal workflows. Activities are the building blocks of workflows: they perform the actual business logic, interact with external systems, and can be retried, timed out, and cancelled.

## Core Imports

```python
from temporalio import activity
from temporalio.activity import Info, LoggerAdapter, ActivityCancellationDetails
```

## Activity Definition

### Activity Decorator

The `@activity.defn` decorator marks a function as a Temporal activity:

```python { .api }
@overload
def defn(fn: CallableType) -> CallableType: ...

@overload
def defn(
    *,
    name: Optional[str] = None,
    no_thread_cancel_exception: bool = False
) -> Callable[[CallableType], CallableType]: ...

@overload
def defn(
    *,
    no_thread_cancel_exception: bool = False,
    dynamic: bool = False
) -> Callable[[CallableType], CallableType]: ...

def defn(
    fn: Optional[CallableType] = None,
    *,
    name: Optional[str] = None,
    no_thread_cancel_exception: bool = False,
    dynamic: bool = False,
) -> Union[CallableType, Callable[[CallableType], CallableType]]:
    """Decorator for activity functions.

    Activities can be async or non-async.

    Args:
        fn: The function to decorate.
        name: Name to use for the activity. Defaults to function ``__name__``.
            This cannot be set if dynamic is set.
        no_thread_cancel_exception: If set to true, an exception will not be
            raised in synchronous, threaded activities upon cancellation.
        dynamic: If true, this activity will be dynamic. Dynamic activities have
            to accept a single 'Sequence[RawValue]' parameter. This cannot be
            set to true if name is present.
    """
```

#### Basic Activity Definition

```python
@activity.defn
async def process_data(data: str) -> str:
    """Async activity function."""
    return f"Processed: {data}"

@activity.defn
def sync_process_data(data: str) -> str:
    """Synchronous activity function."""
    return f"Processed: {data}"
```

#### Named Activities

```python
@activity.defn(name="custom_activity_name")
async def my_activity(input: str) -> str:
    return f"Result: {input}"
```

#### Thread-Safe Cancellation Control

```python
@activity.defn(no_thread_cancel_exception=True)
def robust_activity(data: str) -> str:
    """Activity that won't have cancellation exceptions thrown in threads."""
    # Critical logic that should not be interrupted
    # (process_critical_data is a placeholder for application code)
    return process_critical_data(data)
```

#### Dynamic Activities

```python
from typing import Any, Sequence

from temporalio import activity
from temporalio.common import RawValue

@activity.defn(dynamic=True)
async def dynamic_activity(args: Sequence[RawValue]) -> Any:
    """Dynamic activity that can handle any activity type."""
    # Use payload_converter() to decode raw values
    converter = activity.payload_converter()
    decoded_args = [converter.from_payload(arg.payload) for arg in args]
    # process_dynamic_args is a placeholder for application-specific logic
    return process_dynamic_args(decoded_args)
```
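A dynamic activity usually has to branch on the activity type it was invoked as, which is available through `activity.info().activity_type`. The following is a minimal sketch of one way to organize that branching with a lookup table. The names `HANDLERS` and `dispatch` and the two handlers are hypothetical and not part of the SDK; the code uses only the standard library so it can run outside a worker.

```python
from typing import Any, Callable, Dict, Sequence

# Hypothetical registry: activity type name -> handler for the decoded arguments.
HANDLERS: Dict[str, Callable[[Sequence[Any]], Any]] = {
    "greet": lambda args: f"Hello, {args[0]}!",
    "add": lambda args: sum(args),
}


def dispatch(activity_type: str, decoded_args: Sequence[Any]) -> Any:
    """Route already-decoded arguments to the handler for the given type."""
    handler = HANDLERS.get(activity_type)
    if handler is None:
        # Fail loudly so an unknown activity type is visible as an error.
        raise ValueError(f"No handler registered for activity type {activity_type!r}")
    return handler(decoded_args)


greeting = dispatch("greet", ["Ada"])
total = dispatch("add", [1, 2])
```

Inside a dynamic activity, the call would look like `dispatch(activity.info().activity_type, decoded_args)`.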

## Activity Context Functions

### Client Access

```python { .api }
def client() -> temporalio.client.Client:
    """Return a Temporal Client for use in the current activity.

    The client is only available in `async def` activities.

    In tests it is not available automatically, but you can pass a client when creating a
    :py:class:`temporalio.testing.ActivityEnvironment`.

    Returns:
        :py:class:`temporalio.client.Client` for use in the current activity.

    Raises:
        RuntimeError: When the client is not available.
    """
```

```python
@activity.defn
async def query_external_service(user_id: str) -> dict:
    """Activity that uses the client to interact with Temporal."""
    client = activity.client()

    # The client can start other workflows, query other executions, etc.
    result = await client.execute_workflow(
        "external_workflow",
        user_id,
        id=f"external-{user_id}",
        task_queue="external-queue"
    )
    return result
```

### Activity Information Access

```python { .api }
def info() -> Info:
    """Current activity's info.

    Returns:
        Info for the currently running activity.

    Raises:
        RuntimeError: When not in an activity.
    """
```

```python
@activity.defn
async def tracked_activity(data: str) -> str:
    """Activity that uses info for logging and tracking."""
    info = activity.info()

    # Access activity execution details
    activity.logger.info(
        f"Processing activity {info.activity_type} "
        f"(attempt {info.attempt}) for workflow {info.workflow_id}"
    )

    return f"Processed {data} in attempt {info.attempt}"
```

### Context Detection

```python { .api }
def in_activity() -> bool:
    """Whether the current code is inside an activity.

    Returns:
        True if in an activity, False otherwise.
    """
```

```python
import logging

from temporalio import activity

def conditional_logic():
    """Function that behaves differently in activity vs non-activity context."""
    if activity.in_activity():
        # Use activity-specific logging and error handling
        info = activity.info()
        logger = activity.logger
        logger.info(f"Running in activity {info.activity_type}")
    else:
        # Use regular application logging
        logger = logging.getLogger(__name__)
        logger.info("Running outside activity context")
```

## Activity Lifecycle Management

### Heartbeat Functionality

```python { .api }
def heartbeat(*details: Any) -> None:
    """Send a heartbeat for the current activity.

    Raises:
        RuntimeError: When not in an activity.
    """
```

Long-running activities should send heartbeats regularly to signal that they are still alive and making progress:

```python
from typing import List

@activity.defn
async def long_running_task(items: List[str]) -> List[str]:
    """Activity that reports progress via heartbeats."""
    results = []

    for i, item in enumerate(items):
        # Process the item (process_item is a placeholder)
        result = await process_item(item)
        results.append(result)

        # Send heartbeat with progress details
        progress = {
            "processed": i + 1,
            "total": len(items),
            "last_item": item
        }
        activity.heartbeat(progress)

        # Check for cancellation periodically
        if activity.is_cancelled():
            break

    return results
```
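Heartbeat details are also what lets a retried attempt pick up where the previous one stopped: `Info.heartbeat_details` holds the details of the last recorded heartbeat. The sketch below shows the resume logic on its own, assuming the progress dictionary shape used in the example above. The helper names `resume_offset` and `remaining_items` are hypothetical, and the code is plain Python so it can run without a worker.

```python
from typing import Any, List, Sequence


def resume_offset(heartbeat_details: Sequence[Any]) -> int:
    """Return how many items an earlier attempt reported as processed."""
    if not heartbeat_details:
        # First attempt, or no heartbeat was recorded: start from the beginning.
        return 0
    last = heartbeat_details[0]
    if isinstance(last, dict) and isinstance(last.get("processed"), int):
        return max(0, last["processed"])
    # Unknown shape: fall back to reprocessing everything.
    return 0


def remaining_items(items: List[str], heartbeat_details: Sequence[Any]) -> List[str]:
    """Skip the items that the previous attempt already handled."""
    return items[resume_offset(heartbeat_details):]


todo = remaining_items(
    ["a", "b", "c"],
    [{"processed": 2, "total": 3, "last_item": "b"}],
)
```

In an activity, the second argument would be `activity.info().heartbeat_details`. Skipping items this way is only safe when processing an item twice or not at all after a crash is acceptable, since a heartbeat may not have been recorded for the last item processed.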

### Cancellation Detection

```python { .api }
def is_cancelled() -> bool:
    """Whether a cancellation was ever requested on this activity.

    Returns:
        True if the activity has had a cancellation request, False otherwise.

    Raises:
        RuntimeError: When not in an activity.
    """
```

```python
import asyncio
import time

@activity.defn
async def cancellable_activity(duration_seconds: int) -> str:
    """Activity that gracefully handles cancellation."""
    start_time = time.time()

    while time.time() - start_time < duration_seconds:
        # Check for cancellation
        if activity.is_cancelled():
            # Perform cleanup (cleanup_resources is a placeholder)
            await cleanup_resources()
            return "Cancelled gracefully"

        # Do some work
        await asyncio.sleep(1)
        activity.heartbeat({"elapsed": time.time() - start_time})

    return "Completed successfully"
```
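Polling `is_cancelled()` is not the only path in `async def` activities. As far as the SDK documentation describes, cancellation of an async activity is delivered by cancelling its asyncio task, so `asyncio.CancelledError` is raised at the next `await`. The sketch below reproduces that mechanism with plain asyncio and no Temporal worker; `worker` and `run_demo` are hypothetical names used only for illustration.

```python
import asyncio
from typing import List


async def worker(log: List[str]) -> str:
    """Stand-in for an async activity body that cleans up on cancellation."""
    try:
        await asyncio.sleep(60)
        return "done"
    except asyncio.CancelledError:
        # CancelledError derives from BaseException (Python 3.8+), so a
        # plain `except Exception` would not reach this cleanup.
        log.append("cleanup")
        raise


async def run_demo() -> List[str]:
    log: List[str] = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # let the worker start and reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log


demo_log = asyncio.run(run_demo())
```

Re-raising after cleanup keeps the task marked as cancelled instead of making it look like a normal return.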

### Worker Shutdown Detection

```python { .api }
def is_worker_shutdown() -> bool:
    """Whether shutdown has been invoked on the worker.

    Returns:
        True if shutdown has been called on the worker, False otherwise.

    Raises:
        RuntimeError: When not in an activity.
    """
```

```python
from typing import List

@activity.defn
async def shutdown_aware_activity(data: List[str]) -> List[str]:
    """Activity that handles worker shutdown gracefully."""
    results = []

    for item in data:
        # Check if worker is shutting down
        if activity.is_worker_shutdown():
            # Save partial progress and fail so the activity can be retried
            await save_partial_results(results)
            raise RuntimeError("Worker shutting down, partial results saved")

        result = await process_item(item)
        results.append(result)

    return results
```

## Activity Cancellation and Control

### Cancellation Details

```python { .api }
def cancellation_details() -> Optional[ActivityCancellationDetails]:
    """Cancellation details of the current activity, if any. Once set, cancellation details do not change."""
```

The `ActivityCancellationDetails` class describes why an activity was cancelled:

```python { .api }
@dataclass(frozen=True)
class ActivityCancellationDetails:
    """Provides the reasons for the activity's cancellation. Cancellation details are set once and do not change once set."""

    not_found: bool = False
    cancel_requested: bool = False
    paused: bool = False
    timed_out: bool = False
    worker_shutdown: bool = False
```

```python
import asyncio

@activity.defn
async def detailed_cancellation_handling() -> str:
    """Activity that provides detailed cancellation information."""
    try:
        # Perform long-running work
        await perform_work()
        return "Success"
    except asyncio.CancelledError:
        # Async activities are cancelled via asyncio.CancelledError, which
        # derives from BaseException and is not caught by `except Exception`
        details = activity.cancellation_details()
        if details:
            if details.timed_out:
                await handle_timeout_cleanup()
                return "Timed out, cleanup completed"
            elif details.cancel_requested:
                await handle_cancellation_cleanup()
                return "Cancelled by request, cleanup completed"
            elif details.worker_shutdown:
                await handle_shutdown_cleanup()
                return "Worker shutdown, cleanup completed"
        raise
```
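When several code paths need to report the cancellation reason, it can help to reduce the boolean flags to a single label. The following is a minimal sketch of that idea. `FakeCancellationDetails` is a stand-in that mirrors the fields listed above so the example runs without the SDK, and `cancellation_reason` together with its flag ordering is an assumption made for illustration, not SDK behavior.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class FakeCancellationDetails:
    """Stand-in with the same boolean fields as ActivityCancellationDetails."""

    not_found: bool = False
    cancel_requested: bool = False
    paused: bool = False
    timed_out: bool = False
    worker_shutdown: bool = False


# Order in which flags are reported when more than one is set (an assumption).
_FLAG_ORDER = ("timed_out", "cancel_requested", "worker_shutdown", "paused", "not_found")


def cancellation_reason(details: Optional[Any]) -> str:
    """Return the name of the first set flag, or a fallback label."""
    if details is None:
        return "not_cancelled"
    for flag in _FLAG_ORDER:
        if getattr(details, flag, False):
            return flag
    return "unknown"


reason = cancellation_reason(FakeCancellationDetails(worker_shutdown=True))
```

Because the helper only reads attributes by name, the value returned by `activity.cancellation_details()` could be passed to it directly.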

### Shielding from Cancellation

```python { .api }
@contextmanager
def shield_thread_cancel_exception() -> Iterator[None]:
    """Context manager for synchronous multithreaded activities to delay
    cancellation exceptions.

    By default, synchronous multithreaded activities have an exception thrown
    inside when cancellation occurs. Code within a "with" block of this context
    manager will delay that throwing until the end. Even if the block returns a
    value or throws its own exception, if a cancellation exception is pending,
    it is thrown instead. Therefore users are encouraged to not throw out of
    this block and can surround this with a try/except if they wish to catch a
    cancellation.

    This properly supports nested calls and will only throw after the last one.

    This just runs the blocks with no extra effects for async activities or
    synchronous multiprocess/other activities.

    Raises:
        temporalio.exceptions.CancelledError: If a cancellation occurs anytime
            during this block and this is not nested in another shield block.
    """
```

```python
import temporalio.exceptions
from temporalio import activity

@activity.defn
def sync_activity_with_cleanup(data: str) -> str:
    """Synchronous activity that performs critical cleanup."""
    try:
        # Critical section that should not be interrupted
        with activity.shield_thread_cancel_exception():
            # Perform critical operations
            critical_result = perform_critical_operation(data)

            # Critical cleanup that must complete
            cleanup_critical_resources()

        return critical_result
    except temporalio.exceptions.CancelledError:
        # Handle cancellation after cleanup is complete
        return "Cancelled after cleanup"
```

### Synchronous Cancellation Waiting

```python { .api }
def wait_for_cancelled_sync(timeout: Optional[Union[timedelta, float]] = None) -> None:
    """Synchronously block while waiting for a cancellation request on this
    activity.

    This is essentially a wrapper around :py:meth:`threading.Event.wait`.

    Args:
        timeout: Max amount of time to wait for cancellation.

    Raises:
        RuntimeError: When not in an activity.
    """
```

```python
@activity.defn
def sync_monitoring_activity(check_interval: float = 1.0) -> str:
    """Synchronous activity that monitors for cancellation."""
    while True:
        # Perform some work
        result = perform_work_unit()

        # Wait for cancellation with timeout
        activity.wait_for_cancelled_sync(timeout=check_interval)

        # Check if actually cancelled
        if activity.is_cancelled():
            return "Cancelled during monitoring"

        # Continue if timeout occurred (not cancelled)
        if work_is_complete(result):
            return "Work completed successfully"
```
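Since the docstring describes `wait_for_cancelled_sync` as essentially a wrapper around `threading.Event.wait`, the polling pattern above can be tried out with a plain `threading.Event` standing in for the cancellation signal. This is only a sketch of the timing behavior: `poll_until_cancelled` is a hypothetical helper, and a timer plays the role of the cancellation request.

```python
import threading


def poll_until_cancelled(cancelled: threading.Event, interval: float, max_units: int) -> int:
    """Do one unit of work per loop and stop once the event is set."""
    units_done = 0
    while units_done < max_units:
        units_done += 1  # stand-in for a unit of real work
        # Event.wait returns True as soon as the event is set, or False
        # once the timeout elapses without it being set.
        if cancelled.wait(timeout=interval):
            break
    return units_done


event = threading.Event()
timer = threading.Timer(0.05, event.set)  # simulate a cancellation request
timer.start()
units = poll_until_cancelled(event, interval=0.01, max_units=1000)
timer.join()
```

One difference to keep in mind: `Event.wait` reports whether the event was set, while `wait_for_cancelled_sync` returns `None`, which is why the activity example follows it with a call to `activity.is_cancelled()`.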

### Worker Shutdown Waiting

```python { .api }
def wait_for_worker_shutdown_sync(
    timeout: Optional[Union[timedelta, float]] = None,
) -> None:
    """Synchronously block while waiting for shutdown to be called on the
    worker.

    This is essentially a wrapper around :py:meth:`threading.Event.wait`.

    Args:
        timeout: Max amount of time to wait for shutdown to be called on the
            worker.

    Raises:
        RuntimeError: When not in an activity.
    """
```

```python
from typing import List

@activity.defn
def graceful_shutdown_activity(work_items: List[str]) -> List[str]:
    """Activity that gracefully handles worker shutdown."""
    results = []

    for item in work_items:
        # Process item
        result = process_item(item)
        results.append(result)

        # Check for worker shutdown with timeout
        activity.wait_for_worker_shutdown_sync(timeout=0.1)

        if activity.is_worker_shutdown():
            # Save partial results before shutdown
            save_results(results)
            raise RuntimeError(f"Worker shutdown, saved {len(results)} results")

    return results
```

## Async Activity Completion

### External Completion

```python { .api }
def raise_complete_async() -> NoReturn:
    """Raise an error that says the activity will be completed
    asynchronously.
    """
```

For activities that need to be completed by external systems:

```python
@activity.defn
async def async_completion_activity(task_id: str) -> str:
    """Activity that will be completed externally."""
    # Start external process (external_system is a placeholder). The external
    # side needs the task token, available as activity.info().task_token,
    # to complete the activity later.
    external_system.start_task(task_id, callback_url="http://callback/complete")

    # Signal that the activity will be completed asynchronously
    activity.raise_complete_async()
    # This line will never be reached
```

The external system would then complete the activity using the client:

```python
from temporalio.client import Client

# External system completion (must run inside an async function)
client = await Client.connect("localhost:7233")
handle = client.get_async_activity_handle(task_token=activity_task_token)
await handle.complete("External completion result")
```

## Utility Functions

### Payload Converter Access

```python { .api }
def payload_converter() -> temporalio.converter.PayloadConverter:
    """Get the payload converter for the current activity.

    This is often used for dynamic activities to convert payloads.
    """
```

```python
from typing import Any, Sequence

import temporalio.common
from temporalio import activity

@activity.defn(dynamic=True)
async def dynamic_payload_activity(args: Sequence[temporalio.common.RawValue]) -> Any:
    """Dynamic activity that handles various payload types."""
    converter = activity.payload_converter()

    # Convert raw payloads to Python objects
    decoded_args = []
    for raw_value in args:
        decoded = converter.from_payload(raw_value.payload)
        decoded_args.append(decoded)

    # Process based on argument types
    if len(decoded_args) == 1 and isinstance(decoded_args[0], str):
        return f"String processing: {decoded_args[0]}"
    elif len(decoded_args) == 2 and all(isinstance(arg, int) for arg in decoded_args):
        return f"Math result: {decoded_args[0] + decoded_args[1]}"
    else:
        return f"Generic processing: {decoded_args}"
```

### Metric Meter Access

```python { .api }
def metric_meter() -> temporalio.common.MetricMeter:
    """Get the metric meter for the current activity.

    .. warning::
        This is only available in async or synchronous threaded activities. An
        error is raised on non-thread-based sync activities when trying to
        access this.

    Returns:
        Current metric meter for this activity for recording metrics.

    Raises:
        RuntimeError: When not in an activity or in a non-thread-based
            synchronous activity.
    """
```

```python
import time

@activity.defn
async def metrics_activity(data_size: int) -> str:
    """Activity that records custom metrics."""
    meter = activity.metric_meter()

    # Create custom metrics
    processing_counter = meter.create_counter(
        "activity_items_processed",
        "Number of items processed by activity"
    )

    # The duration is a float, so use the float histogram variant
    processing_histogram = meter.create_histogram_float(
        "activity_processing_duration",
        "Time spent processing items"
    )

    start_time = time.time()

    # Process data with metrics
    for i in range(data_size):
        await process_item(i)
        processing_counter.add(1)

    duration = time.time() - start_time
    processing_histogram.record(duration)

    return f"Processed {data_size} items in {duration:.2f}s"
```

## Core Classes and Types

### Info Class

```python { .api }
@dataclass(frozen=True)
class Info:
    """Information about the running activity.

    Retrieved inside an activity via :py:func:`info`.
    """

    activity_id: str
    activity_type: str
    attempt: int
    current_attempt_scheduled_time: datetime
    heartbeat_details: Sequence[Any]
    heartbeat_timeout: Optional[timedelta]
    is_local: bool
    schedule_to_close_timeout: Optional[timedelta]
    scheduled_time: datetime
    start_to_close_timeout: Optional[timedelta]
    started_time: datetime
    task_queue: str
    task_token: bytes
    workflow_id: str
    workflow_namespace: str
    workflow_run_id: str
    workflow_type: str
    priority: temporalio.common.Priority
```

The `Info` class exposes the execution context of the running activity:

```python
@activity.defn
async def info_logging_activity(data: str) -> str:
    """Activity that logs detailed execution information."""
    info = activity.info()

    # Log execution details
    activity.logger.info(f"Activity {info.activity_type} started")
    activity.logger.info(f"Attempt {info.attempt} of activity {info.activity_id}")
    activity.logger.info(f"Running for workflow {info.workflow_type}:{info.workflow_id}")
    activity.logger.info(f"Task queue: {info.task_queue}")
    activity.logger.info(f"Is local activity: {info.is_local}")

    if info.heartbeat_timeout:
        activity.logger.info(f"Heartbeat timeout: {info.heartbeat_timeout}")

    if info.start_to_close_timeout:
        activity.logger.info(f"Start-to-close timeout: {info.start_to_close_timeout}")

    # Use priority information
    if info.priority.priority_key:
        activity.logger.info(f"Priority key: {info.priority.priority_key}")

    return f"Processed {data} in attempt {info.attempt}"
```

### Priority Class

```python { .api }
@dataclass(frozen=True)
class Priority:
    """Priority contains metadata that controls relative ordering of task processing when tasks are
    backlogged in a queue."""

    priority_key: Optional[int] = None
    """Priority key is a positive integer from 1 to n, where smaller integers correspond to higher
    priorities (tasks run sooner)."""

    fairness_key: Optional[str] = None
    """A short string (max 64 bytes) that is used as a key for a fairness balancing mechanism."""

    fairness_weight: Optional[float] = None
    """A float that represents the weight for task dispatch for the associated fairness key."""

    default: ClassVar[Priority]
    """Singleton default priority instance."""
```

### LoggerAdapter Class

```python { .api }
class LoggerAdapter(logging.LoggerAdapter):
    """Adapter that adds details to the log about the running activity.

    Attributes:
        activity_info_on_message: Boolean for whether a string representation of
            a dict of some activity info will be appended to each message.
            Default is True.
        activity_info_on_extra: Boolean for whether a ``temporal_activity``
            dictionary value will be added to the ``extra`` dictionary with some
            activity info, making it present on the ``LogRecord.__dict__`` for
            use by others. Default is True.
        full_activity_info_on_extra: Boolean for whether an ``activity_info``
            value will be added to the ``extra`` dictionary with the entire
            activity info, making it present on the ``LogRecord.__dict__`` for
            use by others. Default is False.
    """

    def __init__(
        self, logger: logging.Logger, extra: Optional[Mapping[str, Any]]
    ) -> None:
        """Create the logger adapter."""

    def process(
        self, msg: Any, kwargs: MutableMapping[str, Any]
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        """Override to add activity details."""

    @property
    def base_logger(self) -> logging.Logger:
        """Underlying logger usable for actions such as adding
        handlers/formatters.
        """
```

The SDK provides a pre-configured logger adapter:

```python { .api }
logger: LoggerAdapter
"""Logger that will have contextual activity details embedded."""
```

```python
import logging

from temporalio import activity

@activity.defn
async def logging_activity(message: str) -> str:
    """Activity that demonstrates contextual logging."""
    # Use the pre-configured logger with activity context
    activity.logger.info(f"Processing message: {message}")
    activity.logger.warning("This is a warning with activity context")

    # Create custom logger adapter
    custom_logger = activity.LoggerAdapter(
        logging.getLogger("custom"),
        {"custom_field": "custom_value"}
    )
    custom_logger.activity_info_on_message = False
    custom_logger.full_activity_info_on_extra = True

    custom_logger.info("Custom logging with full activity info in extra")

    return f"Logged: {message}"
```
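To see what "appended to each message" and "added to the `extra` dictionary" mean in practice, the sketch below builds a small adapter on the standard library's `logging.LoggerAdapter`. It is an illustration of the general mechanism, not the SDK's implementation: `ContextAdapter`, `ListHandler`, and the `demo_context` key are hypothetical names.

```python
import logging
from typing import Any, List, MutableMapping, Tuple


class ContextAdapter(logging.LoggerAdapter):
    """Appends the adapter's context to the message and to the record."""

    def process(
        self, msg: Any, kwargs: MutableMapping[str, Any]
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        extra = dict(kwargs.get("extra") or {})
        # Keys in `extra` become attributes on the resulting LogRecord.
        extra["demo_context"] = dict(self.extra or {})
        kwargs["extra"] = extra
        return f"{msg} ({self.extra})", kwargs


class ListHandler(logging.Handler):
    """Collects records in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.records: List[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


base = logging.getLogger("adapter_demo")
base.setLevel(logging.INFO)
base.propagate = False
handler = ListHandler()
base.addHandler(handler)

adapter = ContextAdapter(base, {"activity_type": "demo"})
adapter.info("hello")
record = handler.records[0]
```

A structured log formatter can then read `record.demo_context` directly instead of parsing the message text, which is the use case the `activity_info_on_extra` attribute description points at.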

### Supporting Types

#### CallableType

```python { .api }
CallableType = TypeVar('CallableType', bound=Callable)
"""Type variable for callable functions used in activity definitions."""
```

#### RawValue

```python { .api }
@dataclass(frozen=True)
class RawValue:
    """Raw value container for dynamic activities."""

    payload: temporalio.api.common.v1.Payload
    """The raw payload data."""
```

## Complete Activity Examples

### Robust Activity with All Features

```python
import time
from typing import List

from temporalio import activity

@activity.defn(name="robust_data_processor")
async def robust_activity(
    data: List[dict],
    batch_size: int = 10,
    timeout_seconds: int = 300
) -> dict:
    """Comprehensive activity demonstrating all features."""
    info = activity.info()
    meter = activity.metric_meter()

    # Create metrics (the duration is a float, so use the float histogram)
    items_processed = meter.create_counter("items_processed", "Items processed")
    processing_duration = meter.create_histogram_float("processing_duration", "Processing time")

    activity.logger.info(f"Starting robust processing of {len(data)} items")

    results = []
    start_time = time.time()

    try:
        for i, item in enumerate(data):
            # Check for various cancellation conditions
            if activity.is_cancelled():
                details = activity.cancellation_details()
                if details and details.timed_out:
                    activity.logger.warning("Activity timed out, saving partial results")
                await save_partial_results(results)
                break

            if activity.is_worker_shutdown():
                activity.logger.warning("Worker shutting down, saving progress")
                await save_partial_results(results)
                break

            # Process item with error handling
            try:
                result = await process_complex_item(item)
                results.append(result)
                items_processed.add(1)
            except Exception as e:
                activity.logger.error(f"Failed to process item {i}: {e}")
                continue

            # Send heartbeat every batch
            if (i + 1) % batch_size == 0:
                progress = {
                    "processed": len(results),
                    "total": len(data),
                    "success_rate": len(results) / (i + 1),
                    "elapsed_time": time.time() - start_time
                }
                activity.heartbeat(progress)

        duration = time.time() - start_time
        processing_duration.record(duration)

        final_result = {
            "total_items": len(data),
            "processed_items": len(results),
            "success_rate": len(results) / len(data) if data else 0,
            "processing_time": duration,
            "activity_info": {
                "activity_id": info.activity_id,
                "attempt": info.attempt,
                "workflow_id": info.workflow_id
            }
        }

        activity.logger.info(f"Successfully processed {len(results)}/{len(data)} items")
        return final_result

    except Exception as e:
        activity.logger.error(f"Activity failed: {e}")
        # Save whatever progress we made
        await save_partial_results(results)
        raise
```