Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.
The temporalio.activity module provides functions and utilities for defining and executing activities within Temporal workflows. Activities are the building blocks of workflows that perform actual business logic, interact with external systems, and can be retried, time out, and be cancelled.
from temporalio import activity
from temporalio.activity import Info, LoggerAdapter, ActivityCancellationDetails

The @activity.defn decorator is used to mark functions as Temporal activities:
@overload
def defn(fn: CallableType) -> CallableType: ...
@overload
def defn(
*,
name: Optional[str] = None,
no_thread_cancel_exception: bool = False
) -> Callable[[CallableType], CallableType]: ...
@overload
def defn(
*,
no_thread_cancel_exception: bool = False,
dynamic: bool = False
) -> Callable[[CallableType], CallableType]: ...
def defn(
fn: Optional[CallableType] = None,
*,
name: Optional[str] = None,
no_thread_cancel_exception: bool = False,
dynamic: bool = False,
) -> Union[CallableType, Callable[[CallableType], CallableType]]:
"""Decorator for activity functions.
Activities can be async or non-async.
Args:
fn: The function to decorate.
name: Name to use for the activity. Defaults to function ``__name__``.
This cannot be set if dynamic is set.
no_thread_cancel_exception: If set to true, an exception will not be
raised in synchronous, threaded activities upon cancellation.
dynamic: If true, this activity will be dynamic. Dynamic activities have
to accept a single 'Sequence[RawValue]' parameter. This cannot be
set to true if name is present.
"""@activity.defn
async def process_data(data: str) -> str:
"""Async activity function."""
return f"Processed: {data}"
@activity.defn
def sync_process_data(data: str) -> str:
"""Synchronous activity function."""
return f"Processed: {data}"@activity.defn(name="custom_activity_name")
async def my_activity(input: str) -> str:
return f"Result: {input}"@activity.defn(no_thread_cancel_exception=True)
def robust_activity(data: str) -> str:
"""Activity that won't have cancellation exceptions thrown in threads."""
# Critical cleanup logic that should not be interrupted
return process_critical_data(data)

from temporalio.common import RawValue
from typing import Sequence
@activity.defn(dynamic=True)
async def dynamic_activity(args: Sequence[RawValue]) -> Any:
"""Dynamic activity that can handle any activity type."""
# Use payload_converter() to decode raw values
converter = activity.payload_converter()
decoded_args = [converter.from_payload(arg.payload) for arg in args]
return process_dynamic_args(decoded_args)

def client() -> temporalio.client.Client:
"""Return a Temporal Client for use in the current activity.
The client is only available in `async def` activities.
In tests it is not available automatically, but you can pass a client when creating a
:py:class:`temporalio.testing.ActivityEnvironment`.
Returns:
:py:class:`temporalio.client.Client` for use in the current activity.
Raises:
RuntimeError: When the client is not available.
"""@activity.defn
async def query_external_service(user_id: str) -> dict:
"""Activity that uses the client to interact with Temporal."""
client = activity.client()
# Can use client to start child workflows, query other executions, etc.
result = await client.execute_workflow(
"external_workflow",
user_id,
id=f"external-{user_id}",
task_queue="external-queue"
)
return result

def info() -> Info:
"""Current activity's info.
Returns:
Info for the currently running activity.
Raises:
RuntimeError: When not in an activity.
"""@activity.defn
async def tracked_activity(data: str) -> str:
"""Activity that uses info for logging and tracking."""
info = activity.info()
# Access activity execution details
activity_logger.info(
f"Processing activity {info.activity_type} "
f"(attempt {info.attempt}) for workflow {info.workflow_id}"
)
return f"Processed {data} in attempt {info.attempt}"def in_activity() -> bool:
"""Whether the current code is inside an activity.
Returns:
True if in an activity, False otherwise.
"""def conditional_logic():
"""Function that behaves differently in activity vs non-activity context."""
if activity.in_activity():
# Use activity-specific logging and error handling
info = activity.info()
logger = activity.logger
logger.info(f"Running in activity {info.activity_type}")
else:
# Use regular application logging
logger = logging.getLogger(__name__)
logger.info("Running outside activity context")def heartbeat(*details: Any) -> None:
"""Send a heartbeat for the current activity.
Raises:
RuntimeError: When not in an activity.
"""Heartbeats are essential for long-running activities to signal that they are still alive and making progress:
@activity.defn
async def long_running_task(items: List[str]) -> List[str]:
"""Activity that reports progress via heartbeats."""
results = []
for i, item in enumerate(items):
# Process the item
result = await process_item(item)
results.append(result)
# Send heartbeat with progress details
progress = {
"processed": i + 1,
"total": len(items),
"last_item": item
}
activity.heartbeat(progress)
# Check for cancellation periodically
if activity.is_cancelled():
break
return results

def is_cancelled() -> bool:
"""Whether a cancellation was ever requested on this activity.
Returns:
True if the activity has had a cancellation request, False otherwise.
Raises:
RuntimeError: When not in an activity.
"""@activity.defn
async def cancellable_activity(duration_seconds: int) -> str:
"""Activity that gracefully handles cancellation."""
start_time = time.time()
while time.time() - start_time < duration_seconds:
# Check for cancellation
if activity.is_cancelled():
# Perform cleanup
await cleanup_resources()
return "Cancelled gracefully"
# Do some work
await asyncio.sleep(1)
activity.heartbeat({"elapsed": time.time() - start_time})
return "Completed successfully"def is_worker_shutdown() -> bool:
"""Whether shutdown has been invoked on the worker.
Returns:
True if shutdown has been called on the worker, False otherwise.
Raises:
RuntimeError: When not in an activity.
"""@activity.defn
async def shutdown_aware_activity(data: List[str]) -> List[str]:
"""Activity that handles worker shutdown gracefully."""
results = []
for item in data:
# Check if worker is shutting down
if activity.is_worker_shutdown():
# Save partial progress and exit cleanly
await save_partial_results(results)
raise Exception("Worker shutting down, partial results saved")
result = await process_item(item)
results.append(result)
return results

def cancellation_details() -> Optional[ActivityCancellationDetails]:
"""Cancellation details of the current activity, if any. Once set, cancellation details do not change."""The ActivityCancellationDetails class provides detailed information about why an activity was cancelled:
@dataclass(frozen=True)
class ActivityCancellationDetails:
"""Provides the reasons for the activity's cancellation. Cancellation details are set once and do not change once set."""
not_found: bool = False
cancel_requested: bool = False
paused: bool = False
timed_out: bool = False
worker_shutdown: bool = False

@activity.defn
async def detailed_cancellation_handling() -> str:
"""Activity that provides detailed cancellation information."""
try:
# Perform long-running work
await perform_work()
return "Success"
except Exception:
# Check cancellation details
details = activity.cancellation_details()
if details:
if details.timed_out:
await handle_timeout_cleanup()
return "Timed out, cleanup completed"
elif details.cancel_requested:
await handle_cancellation_cleanup()
return "Cancelled by request, cleanup completed"
elif details.worker_shutdown:
await handle_shutdown_cleanup()
return "Worker shutdown, cleanup completed"
raise

@contextmanager
def shield_thread_cancel_exception() -> Iterator[None]:
"""Context manager for synchronous multithreaded activities to delay
cancellation exceptions.
By default, synchronous multithreaded activities have an exception thrown
inside when cancellation occurs. Code within a "with" block of this context
manager will delay that throwing until the end. Even if the block returns a
value or throws its own exception, if a cancellation exception is pending,
it is thrown instead. Therefore users are encouraged to not throw out of
this block and can surround this with a try/except if they wish to catch a
cancellation.
This properly supports nested calls and will only throw after the last one.
This just runs the blocks with no extra effects for async activities or
synchronous multiprocess/other activities.
Raises:
temporalio.exceptions.CancelledError: If a cancellation occurs anytime
during this block and this is not nested in another shield block.
"""@activity.defn
def sync_activity_with_cleanup(data: str) -> str:
"""Synchronous activity that performs critical cleanup."""
try:
# Critical section that should not be interrupted
with activity.shield_thread_cancel_exception():
# Perform critical operations
critical_result = perform_critical_operation(data)
# Critical cleanup that must complete
cleanup_critical_resources()
return critical_result
except temporalio.exceptions.CancelledError:
# Handle cancellation after cleanup is complete
return "Cancelled after cleanup"def wait_for_cancelled_sync(timeout: Optional[Union[timedelta, float]] = None) -> None:
"""Synchronously block while waiting for a cancellation request on this
activity.
This is essentially a wrapper around :py:meth:`threading.Event.wait`.
Args:
timeout: Max amount of time to wait for cancellation.
Raises:
RuntimeError: When not in an activity.
"""@activity.defn
def sync_monitoring_activity(check_interval: float = 1.0) -> str:
"""Synchronous activity that monitors for cancellation."""
while True:
# Perform some work
result = perform_work_unit()
# Wait for cancellation with timeout
activity.wait_for_cancelled_sync(timeout=check_interval)
# Check if actually cancelled
if activity.is_cancelled():
return "Cancelled during monitoring"
# Continue if timeout occurred (not cancelled)
if work_is_complete(result):
return "Work completed successfully"def wait_for_worker_shutdown_sync(
timeout: Optional[Union[timedelta, float]] = None,
) -> None:
"""Synchronously block while waiting for shutdown to be called on the
worker.
This is essentially a wrapper around :py:meth:`threading.Event.wait`.
Args:
timeout: Max amount of time to wait for shutdown to be called on the
worker.
Raises:
RuntimeError: When not in an activity.
"""@activity.defn
def graceful_shutdown_activity(work_items: List[str]) -> List[str]:
"""Activity that gracefully handles worker shutdown."""
results = []
for item in work_items:
# Process item
result = process_item(item)
results.append(result)
# Check for worker shutdown with timeout
activity.wait_for_worker_shutdown_sync(timeout=0.1)
if activity.is_worker_shutdown():
# Save partial results before shutdown
save_results(results)
raise Exception(f"Worker shutdown, saved {len(results)} results")
return results

def raise_complete_async() -> NoReturn:
"""Raise an error that says the activity will be completed
asynchronously.
"""For activities that need to be completed by external systems:
@activity.defn
async def async_completion_activity(task_id: str) -> str:
"""Activity that will be completed externally."""
# Start external process
external_system.start_task(task_id, callback_url="http://callback/complete")
# Register for external completion
activity.raise_complete_async()
# This line will never be reached

The external system would then complete the activity using the client:
# External system completion
client = Client.connect("localhost:7233")
await client.complete_activity_by_task_token(
task_token=activity_task_token,
result="External completion result"
)

def payload_converter() -> temporalio.converter.PayloadConverter:
"""Get the payload converter for the current activity.
This is often used for dynamic activities to convert payloads.
"""@activity.defn(dynamic=True)
async def dynamic_payload_activity(args: Sequence[temporalio.common.RawValue]) -> Any:
"""Dynamic activity that handles various payload types."""
converter = activity.payload_converter()
# Convert raw payloads to Python objects
decoded_args = []
for raw_value in args:
decoded = converter.from_payload(raw_value.payload)
decoded_args.append(decoded)
# Process based on argument types
if len(decoded_args) == 1 and isinstance(decoded_args[0], str):
return f"String processing: {decoded_args[0]}"
elif len(decoded_args) == 2 and all(isinstance(arg, int) for arg in decoded_args):
return f"Math result: {decoded_args[0] + decoded_args[1]}"
else:
return f"Generic processing: {decoded_args}"def metric_meter() -> temporalio.common.MetricMeter:
"""Get the metric meter for the current activity.
.. warning::
This is only available in async or synchronous threaded activities. An
error is raised on non-thread-based sync activities when trying to
access this.
Returns:
Current metric meter for this activity for recording metrics.
Raises:
RuntimeError: When not in an activity or in a non-thread-based
synchronous activity.
"""@activity.defn
async def metrics_activity(data_size: int) -> str:
"""Activity that records custom metrics."""
meter = activity.metric_meter()
# Create custom metrics
processing_counter = meter.create_counter(
"activity_items_processed",
"Number of items processed by activity"
)
processing_histogram = meter.create_histogram(
"activity_processing_duration",
"Time spent processing items"
)
start_time = time.time()
# Process data with metrics
for i in range(data_size):
await process_item(i)
processing_counter.add(1)
duration = time.time() - start_time
processing_histogram.record(duration)
return f"Processed {data_size} items in {duration:.2f}s"@dataclass(frozen=True)
class Info:
"""Information about the running activity.
Retrieved inside an activity via :py:func:`info`.
"""
activity_id: str
activity_type: str
attempt: int
current_attempt_scheduled_time: datetime
heartbeat_details: Sequence[Any]
heartbeat_timeout: Optional[timedelta]
is_local: bool
schedule_to_close_timeout: Optional[timedelta]
scheduled_time: datetime
start_to_close_timeout: Optional[timedelta]
started_time: datetime
task_queue: str
task_token: bytes
workflow_id: str
workflow_namespace: str
workflow_run_id: str
workflow_type: str
priority: temporalio.common.Priority

The Info class provides comprehensive information about the activity execution context:
@activity.defn
async def info_logging_activity(data: str) -> str:
"""Activity that logs detailed execution information."""
info = activity.info()
# Log execution details
logger.info(f"Activity {info.activity_type} started")
logger.info(f"Attempt {info.attempt} of activity {info.activity_id}")
logger.info(f"Running for workflow {info.workflow_type}:{info.workflow_id}")
logger.info(f"Task queue: {info.task_queue}")
logger.info(f"Is local activity: {info.is_local}")
if info.heartbeat_timeout:
logger.info(f"Heartbeat timeout: {info.heartbeat_timeout}")
if info.start_to_close_timeout:
logger.info(f"Start-to-close timeout: {info.start_to_close_timeout}")
# Use priority information
if info.priority.priority_key:
logger.info(f"Priority key: {info.priority.priority_key}")
return f"Processed {data} in attempt {info.attempt}"@dataclass(frozen=True)
class Priority:
"""Priority contains metadata that controls relative ordering of task processing when tasks are
backlogged in a queue."""
priority_key: Optional[int] = None
"""Priority key is a positive integer from 1 to n, where smaller integers correspond to higher
priorities (tasks run sooner)."""
fairness_key: Optional[str] = None
"""A short string (max 64 bytes) that is used as a key for a fairness balancing mechanism."""
fairness_weight: Optional[float] = None
"""A float that represents the weight for task dispatch for the associated fairness key."""
default: ClassVar[Priority]
"""Singleton default priority instance."""class LoggerAdapter(logging.LoggerAdapter):
"""Adapter that adds details to the log about the running activity.
Attributes:
activity_info_on_message: Boolean for whether a string representation of
a dict of some activity info will be appended to each message.
Default is True.
activity_info_on_extra: Boolean for whether a ``temporal_activity``
dictionary value will be added to the ``extra`` dictionary with some
activity info, making it present on the ``LogRecord.__dict__`` for
use by others. Default is True.
full_activity_info_on_extra: Boolean for whether an ``activity_info``
value will be added to the ``extra`` dictionary with the entire
activity info, making it present on the ``LogRecord.__dict__`` for
use by others. Default is False.
"""
def __init__(
self, logger: logging.Logger, extra: Optional[Mapping[str, Any]]
) -> None:
"""Create the logger adapter."""
def process(
self, msg: Any, kwargs: MutableMapping[str, Any]
) -> Tuple[Any, MutableMapping[str, Any]]:
"""Override to add activity details."""
@property
def base_logger(self) -> logging.Logger:
"""Underlying logger usable for actions such as adding
handlers/formatters.
"""The SDK provides a pre-configured logger adapter:
logger: LoggerAdapter
"""Logger that will have contextual activity details embedded."""@activity.defn
async def logging_activity(message: str) -> str:
"""Activity that demonstrates contextual logging."""
# Use the pre-configured logger with activity context
activity.logger.info(f"Processing message: {message}")
activity.logger.warning("This is a warning with activity context")
# Create custom logger adapter
custom_logger = activity.LoggerAdapter(
logging.getLogger("custom"),
{"custom_field": "custom_value"}
)
custom_logger.activity_info_on_message = False
custom_logger.full_activity_info_on_extra = True
custom_logger.info("Custom logging with full activity info in extra")
return f"Logged: {message}"CallableType = TypeVar('CallableType', bound=Callable)
"""Type variable for callable functions used in activity definitions."""@dataclass(frozen=True)
class RawValue:
"""Raw value container for dynamic activities."""
payload: temporalio.api.common.v1.Payload
"""The raw payload data."""@activity.defn(name="robust_data_processor")
async def robust_activity(
data: List[dict],
batch_size: int = 10,
timeout_seconds: int = 300
) -> dict:
"""Comprehensive activity demonstrating all features."""
info = activity.info()
meter = activity.metric_meter()
# Create metrics
items_processed = meter.create_counter("items_processed", "Items processed")
processing_duration = meter.create_histogram("processing_duration", "Processing time")
activity.logger.info(f"Starting robust processing of {len(data)} items")
results = []
start_time = time.time()
try:
for i, item in enumerate(data):
# Check for various cancellation conditions
if activity.is_cancelled():
details = activity.cancellation_details()
if details and details.timed_out:
activity.logger.warning("Activity timed out, saving partial results")
await save_partial_results(results)
break
if activity.is_worker_shutdown():
activity.logger.warning("Worker shutting down, saving progress")
await save_partial_results(results)
break
# Process item with error handling
try:
result = await process_complex_item(item)
results.append(result)
items_processed.add(1)
except Exception as e:
activity.logger.error(f"Failed to process item {i}: {e}")
continue
# Send heartbeat every batch
if (i + 1) % batch_size == 0:
progress = {
"processed": len(results),
"total": len(data),
"success_rate": len(results) / (i + 1),
"elapsed_time": time.time() - start_time
}
activity.heartbeat(progress)
duration = time.time() - start_time
processing_duration.record(duration)
final_result = {
"total_items": len(data),
"processed_items": len(results),
"success_rate": len(results) / len(data) if data else 0,
"processing_time": duration,
"activity_info": {
"activity_id": info.activity_id,
"attempt": info.attempt,
"workflow_id": info.workflow_id
}
}
activity.logger.info(f"Successfully processed {len(results)}/{len(data)} items")
return final_result
except Exception as e:
activity.logger.error(f"Activity failed: {e}")
# Save whatever progress we made
await save_partial_results(results)
raise

This comprehensive activity.md sub-doc provides complete coverage of the temporalio.activity module, including all API signatures, detailed parameter documentation, usage examples, and comprehensive type definitions. The documentation follows the Knowledge Tile format and provides developers with everything needed to effectively use activities in their Temporal workflows.
Install with Tessl CLI
npx tessl i tessl/pypi-temporalio