CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-temporalio

Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.

Overview
Eval results
Files

activity.mddocs/

Activity Development

The temporalio.activity module provides functions and utilities for defining and executing activities within Temporal workflows. Activities are the building blocks of workflows that perform actual business logic, interact with external systems, and can be retried, timeout, and cancelled.

Core Imports

from temporalio import activity
from temporalio.activity import Info, LoggerAdapter, ActivityCancellationDetails

Activity Definition

Activity Decorator

The @activity.defn decorator is used to mark functions as Temporal activities:

@overload
def defn(fn: CallableType) -> CallableType: ...

@overload
def defn(
    *,
    name: Optional[str] = None,
    no_thread_cancel_exception: bool = False
) -> Callable[[CallableType], CallableType]: ...

@overload
def defn(
    *,
    no_thread_cancel_exception: bool = False,
    dynamic: bool = False
) -> Callable[[CallableType], CallableType]: ...

def defn(
    fn: Optional[CallableType] = None,
    *,
    name: Optional[str] = None,
    no_thread_cancel_exception: bool = False,
    dynamic: bool = False,
) -> Union[CallableType, Callable[[CallableType], CallableType]]:
    """Decorator for activity functions.

    Activities can be async or non-async.

    Args:
        fn: The function to decorate.
        name: Name to use for the activity. Defaults to function ``__name__``.
            This cannot be set if dynamic is set.
        no_thread_cancel_exception: If set to true, an exception will not be
            raised in synchronous, threaded activities upon cancellation.
        dynamic: If true, this activity will be dynamic. Dynamic activities have
            to accept a single 'Sequence[RawValue]' parameter. This cannot be
            set to true if name is present.
    """

Basic Activity Definition

@activity.defn
async def process_data(data: str) -> str:
    """Async activity function."""
    return f"Processed: {data}"

@activity.defn
def sync_process_data(data: str) -> str:
    """Synchronous activity function."""
    return f"Processed: {data}"

Named Activities

@activity.defn(name="custom_activity_name")
async def my_activity(input: str) -> str:
    return f"Result: {input}"

Thread-Safe Cancellation Control

@activity.defn(no_thread_cancel_exception=True)
def robust_activity(data: str) -> str:
    """Activity that won't have cancellation exceptions thrown in threads."""
    # Critical cleanup logic that should not be interrupted
    return process_critical_data(data)

Dynamic Activities

from temporalio.common import RawValue
from typing import Sequence

@activity.defn(dynamic=True)
async def dynamic_activity(args: Sequence[RawValue]) -> Any:
    """Dynamic activity that can handle any activity type."""
    # Use payload_converter() to decode raw values
    converter = activity.payload_converter()
    decoded_args = [converter.from_payload(arg.payload) for arg in args]
    return process_dynamic_args(decoded_args)

Activity Context Functions

Client Access

def client() -> temporalio.client.Client:
    """Return a Temporal Client for use in the current activity.

    The client is only available in `async def` activities.

    In tests it is not available automatically, but you can pass a client when creating a
    :py:class:`temporalio.testing.ActivityEnvironment`.

    Returns:
        :py:class:`temporalio.client.Client` for use in the current activity.

    Raises:
        RuntimeError: When the client is not available.
    """
@activity.defn
async def query_external_service(user_id: str) -> dict:
    """Activity that uses the client to interact with Temporal."""
    client = activity.client()

    # Can use client to start child workflows, query other executions, etc.
    result = await client.execute_workflow(
        "external_workflow",
        user_id,
        id=f"external-{user_id}",
        task_queue="external-queue"
    )
    return result

Activity Information Access

def info() -> Info:
    """Current activity's info.

    Returns:
        Info for the currently running activity.

    Raises:
        RuntimeError: When not in an activity.
    """
@activity.defn
async def tracked_activity(data: str) -> str:
    """Activity that uses info for logging and tracking."""
    info = activity.info()

    # Access activity execution details
    activity_logger.info(
        f"Processing activity {info.activity_type} "
        f"(attempt {info.attempt}) for workflow {info.workflow_id}"
    )

    return f"Processed {data} in attempt {info.attempt}"

Context Detection

def in_activity() -> bool:
    """Whether the current code is inside an activity.

    Returns:
        True if in an activity, False otherwise.
    """
def conditional_logic():
    """Function that behaves differently in activity vs non-activity context."""
    if activity.in_activity():
        # Use activity-specific logging and error handling
        info = activity.info()
        logger = activity.logger
        logger.info(f"Running in activity {info.activity_type}")
    else:
        # Use regular application logging
        logger = logging.getLogger(__name__)
        logger.info("Running outside activity context")

Activity Lifecycle Management

Heartbeat Functionality

def heartbeat(*details: Any) -> None:
    """Send a heartbeat for the current activity.

    Raises:
        RuntimeError: When not in an activity.
    """

Heartbeats are essential for long-running activities to signal that they are still alive and making progress:

@activity.defn
async def long_running_task(items: List[str]) -> List[str]:
    """Activity that reports progress via heartbeats."""
    results = []

    for i, item in enumerate(items):
        # Process the item
        result = await process_item(item)
        results.append(result)

        # Send heartbeat with progress details
        progress = {
            "processed": i + 1,
            "total": len(items),
            "last_item": item
        }
        activity.heartbeat(progress)

        # Check for cancellation periodically
        if activity.is_cancelled():
            break

    return results

Cancellation Detection

def is_cancelled() -> bool:
    """Whether a cancellation was ever requested on this activity.

    Returns:
        True if the activity has had a cancellation request, False otherwise.

    Raises:
        RuntimeError: When not in an activity.
    """
@activity.defn
async def cancellable_activity(duration_seconds: int) -> str:
    """Activity that gracefully handles cancellation."""
    start_time = time.time()

    while time.time() - start_time < duration_seconds:
        # Check for cancellation
        if activity.is_cancelled():
            # Perform cleanup
            await cleanup_resources()
            return "Cancelled gracefully"

        # Do some work
        await asyncio.sleep(1)
        activity.heartbeat({"elapsed": time.time() - start_time})

    return "Completed successfully"

Worker Shutdown Detection

def is_worker_shutdown() -> bool:
    """Whether shutdown has been invoked on the worker.

    Returns:
        True if shutdown has been called on the worker, False otherwise.

    Raises:
        RuntimeError: When not in an activity.
    """
@activity.defn
async def shutdown_aware_activity(data: List[str]) -> List[str]:
    """Activity that handles worker shutdown gracefully."""
    results = []

    for item in data:
        # Check if worker is shutting down
        if activity.is_worker_shutdown():
            # Save partial progress and exit cleanly
            await save_partial_results(results)
            raise Exception("Worker shutting down, partial results saved")

        result = await process_item(item)
        results.append(result)

    return results

Activity Cancellation and Control

Cancellation Details

def cancellation_details() -> Optional[ActivityCancellationDetails]:
    """Cancellation details of the current activity, if any. Once set, cancellation details do not change."""

The ActivityCancellationDetails class provides detailed information about why an activity was cancelled:

@dataclass(frozen=True)
class ActivityCancellationDetails:
    """Provides the reasons for the activity's cancellation. Cancellation details are set once and do not change once set."""

    not_found: bool = False
    cancel_requested: bool = False
    paused: bool = False
    timed_out: bool = False
    worker_shutdown: bool = False
@activity.defn
async def detailed_cancellation_handling() -> str:
    """Activity that provides detailed cancellation information."""
    try:
        # Perform long-running work
        await perform_work()
        return "Success"
    except Exception:
        # Check cancellation details
        details = activity.cancellation_details()
        if details:
            if details.timed_out:
                await handle_timeout_cleanup()
                return "Timed out, cleanup completed"
            elif details.cancel_requested:
                await handle_cancellation_cleanup()
                return "Cancelled by request, cleanup completed"
            elif details.worker_shutdown:
                await handle_shutdown_cleanup()
                return "Worker shutdown, cleanup completed"
        raise

Shielding from Cancellation

@contextmanager
def shield_thread_cancel_exception() -> Iterator[None]:
    """Context manager for synchronous multithreaded activities to delay
    cancellation exceptions.

    By default, synchronous multithreaded activities have an exception thrown
    inside when cancellation occurs. Code within a "with" block of this context
    manager will delay that throwing until the end. Even if the block returns a
    value or throws its own exception, if a cancellation exception is pending,
    it is thrown instead. Therefore users are encouraged to not throw out of
    this block and can surround this with a try/except if they wish to catch a
    cancellation.

    This properly supports nested calls and will only throw after the last one.

    This just runs the blocks with no extra effects for async activities or
    synchronous multiprocess/other activities.

    Raises:
        temporalio.exceptions.CancelledError: If a cancellation occurs anytime
            during this block and this is not nested in another shield block.
    """
@activity.defn
def sync_activity_with_cleanup(data: str) -> str:
    """Synchronous activity that performs critical cleanup."""
    try:
        # Critical section that should not be interrupted
        with activity.shield_thread_cancel_exception():
            # Perform critical operations
            critical_result = perform_critical_operation(data)

            # Critical cleanup that must complete
            cleanup_critical_resources()

            return critical_result
    except temporalio.exceptions.CancelledError:
        # Handle cancellation after cleanup is complete
        return "Cancelled after cleanup"

Synchronous Cancellation Waiting

def wait_for_cancelled_sync(timeout: Optional[Union[timedelta, float]] = None) -> None:
    """Synchronously block while waiting for a cancellation request on this
    activity.

    This is essentially a wrapper around :py:meth:`threading.Event.wait`.

    Args:
        timeout: Max amount of time to wait for cancellation.

    Raises:
        RuntimeError: When not in an activity.
    """
@activity.defn
def sync_monitoring_activity(check_interval: float = 1.0) -> str:
    """Synchronous activity that monitors for cancellation."""
    while True:
        # Perform some work
        result = perform_work_unit()

        # Wait for cancellation with timeout
        activity.wait_for_cancelled_sync(timeout=check_interval)

        # Check if actually cancelled
        if activity.is_cancelled():
            return "Cancelled during monitoring"

        # Continue if timeout occurred (not cancelled)
        if work_is_complete(result):
            return "Work completed successfully"

Worker Shutdown Waiting

def wait_for_worker_shutdown_sync(
    timeout: Optional[Union[timedelta, float]] = None,
) -> None:
    """Synchronously block while waiting for shutdown to be called on the
    worker.

    This is essentially a wrapper around :py:meth:`threading.Event.wait`.

    Args:
        timeout: Max amount of time to wait for shutdown to be called on the
            worker.

    Raises:
        RuntimeError: When not in an activity.
    """
@activity.defn
def graceful_shutdown_activity(work_items: List[str]) -> List[str]:
    """Activity that gracefully handles worker shutdown."""
    results = []

    for item in work_items:
        # Process item
        result = process_item(item)
        results.append(result)

        # Check for worker shutdown with timeout
        activity.wait_for_worker_shutdown_sync(timeout=0.1)

        if activity.is_worker_shutdown():
            # Save partial results before shutdown
            save_results(results)
            raise Exception(f"Worker shutdown, saved {len(results)} results")

    return results

Async Activity Completion

External Completion

def raise_complete_async() -> NoReturn:
    """Raise an error that says the activity will be completed
    asynchronously.
    """

For activities that need to be completed by external systems:

@activity.defn
async def async_completion_activity(task_id: str) -> str:
    """Activity that will be completed externally."""
    # Start external process
    external_system.start_task(task_id, callback_url="http://callback/complete")

    # Register for external completion
    activity.raise_complete_async()
    # This line will never be reached

The external system would then complete the activity using the client:

# External system completion
client = Client.connect("localhost:7233")
await client.complete_activity_by_task_token(
    task_token=activity_task_token,
    result="External completion result"
)

Utility Functions

Payload Converter Access

def payload_converter() -> temporalio.converter.PayloadConverter:
    """Get the payload converter for the current activity.

    This is often used for dynamic activities to convert payloads.
    """
@activity.defn(dynamic=True)
async def dynamic_payload_activity(args: Sequence[temporalio.common.RawValue]) -> Any:
    """Dynamic activity that handles various payload types."""
    converter = activity.payload_converter()

    # Convert raw payloads to Python objects
    decoded_args = []
    for raw_value in args:
        decoded = converter.from_payload(raw_value.payload)
        decoded_args.append(decoded)

    # Process based on argument types
    if len(decoded_args) == 1 and isinstance(decoded_args[0], str):
        return f"String processing: {decoded_args[0]}"
    elif len(decoded_args) == 2 and all(isinstance(arg, int) for arg in decoded_args):
        return f"Math result: {decoded_args[0] + decoded_args[1]}"
    else:
        return f"Generic processing: {decoded_args}"

Metric Meter Access

def metric_meter() -> temporalio.common.MetricMeter:
    """Get the metric meter for the current activity.

    .. warning::
        This is only available in async or synchronous threaded activities. An
        error is raised on non-thread-based sync activities when trying to
        access this.

    Returns:
        Current metric meter for this activity for recording metrics.

    Raises:
        RuntimeError: When not in an activity or in a non-thread-based
            synchronous activity.
    """
@activity.defn
async def metrics_activity(data_size: int) -> str:
    """Activity that records custom metrics."""
    meter = activity.metric_meter()

    # Create custom metrics
    processing_counter = meter.create_counter(
        "activity_items_processed",
        "Number of items processed by activity"
    )

    processing_histogram = meter.create_histogram(
        "activity_processing_duration",
        "Time spent processing items"
    )

    start_time = time.time()

    # Process data with metrics
    for i in range(data_size):
        await process_item(i)
        processing_counter.add(1)

    duration = time.time() - start_time
    processing_histogram.record(duration)

    return f"Processed {data_size} items in {duration:.2f}s"

Core Classes and Types

Info Class

@dataclass(frozen=True)
class Info:
    """Information about the running activity.

    Retrieved inside an activity via :py:func:`info`.
    """

    activity_id: str
    activity_type: str
    attempt: int
    current_attempt_scheduled_time: datetime
    heartbeat_details: Sequence[Any]
    heartbeat_timeout: Optional[timedelta]
    is_local: bool
    schedule_to_close_timeout: Optional[timedelta]
    scheduled_time: datetime
    start_to_close_timeout: Optional[timedelta]
    started_time: datetime
    task_queue: str
    task_token: bytes
    workflow_id: str
    workflow_namespace: str
    workflow_run_id: str
    workflow_type: str
    priority: temporalio.common.Priority

The Info class provides comprehensive information about the activity execution context:

@activity.defn
async def info_logging_activity(data: str) -> str:
    """Activity that logs detailed execution information."""
    info = activity.info()

    # Log execution details
    logger.info(f"Activity {info.activity_type} started")
    logger.info(f"Attempt {info.attempt} of activity {info.activity_id}")
    logger.info(f"Running for workflow {info.workflow_type}:{info.workflow_id}")
    logger.info(f"Task queue: {info.task_queue}")
    logger.info(f"Is local activity: {info.is_local}")

    if info.heartbeat_timeout:
        logger.info(f"Heartbeat timeout: {info.heartbeat_timeout}")

    if info.start_to_close_timeout:
        logger.info(f"Start-to-close timeout: {info.start_to_close_timeout}")

    # Use priority information
    if info.priority.priority_key:
        logger.info(f"Priority key: {info.priority.priority_key}")

    return f"Processed {data} in attempt {info.attempt}"

Priority Class

@dataclass(frozen=True)
class Priority:
    """Priority contains metadata that controls relative ordering of task processing when tasks are
    backlogged in a queue."""

    priority_key: Optional[int] = None
    """Priority key is a positive integer from 1 to n, where smaller integers correspond to higher
    priorities (tasks run sooner)."""

    fairness_key: Optional[str] = None
    """A short string (max 64 bytes) that is used as a key for a fairness balancing mechanism."""

    fairness_weight: Optional[float] = None
    """A float that represents the weight for task dispatch for the associated fairness key."""

    default: ClassVar[Priority]
    """Singleton default priority instance."""

LoggerAdapter Class

class LoggerAdapter(logging.LoggerAdapter):
    """Adapter that adds details to the log about the running activity.

    Attributes:
        activity_info_on_message: Boolean for whether a string representation of
            a dict of some activity info will be appended to each message.
            Default is True.
        activity_info_on_extra: Boolean for whether a ``temporal_activity``
            dictionary value will be added to the ``extra`` dictionary with some
            activity info, making it present on the ``LogRecord.__dict__`` for
            use by others. Default is True.
        full_activity_info_on_extra: Boolean for whether an ``activity_info``
            value will be added to the ``extra`` dictionary with the entire
            activity info, making it present on the ``LogRecord.__dict__`` for
            use by others. Default is False.
    """

    def __init__(
        self, logger: logging.Logger, extra: Optional[Mapping[str, Any]]
    ) -> None:
        """Create the logger adapter."""

    def process(
        self, msg: Any, kwargs: MutableMapping[str, Any]
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        """Override to add activity details."""

    @property
    def base_logger(self) -> logging.Logger:
        """Underlying logger usable for actions such as adding
        handlers/formatters.
        """

The SDK provides a pre-configured logger adapter:

logger: LoggerAdapter
"""Logger that will have contextual activity details embedded."""
@activity.defn
async def logging_activity(message: str) -> str:
    """Activity that demonstrates contextual logging."""
    # Use the pre-configured logger with activity context
    activity.logger.info(f"Processing message: {message}")
    activity.logger.warning("This is a warning with activity context")

    # Create custom logger adapter
    custom_logger = activity.LoggerAdapter(
        logging.getLogger("custom"),
        {"custom_field": "custom_value"}
    )
    custom_logger.activity_info_on_message = False
    custom_logger.full_activity_info_on_extra = True

    custom_logger.info("Custom logging with full activity info in extra")

    return f"Logged: {message}"

Supporting Types

CallableType

CallableType = TypeVar('CallableType', bound=Callable)
"""Type variable for callable functions used in activity definitions."""

RawValue

@dataclass(frozen=True)
class RawValue:
    """Raw value container for dynamic activities."""

    payload: temporalio.api.common.v1.Payload
    """The raw payload data."""

Complete Activity Examples

Robust Activity with All Features

@activity.defn(name="robust_data_processor")
async def robust_activity(
    data: List[dict],
    batch_size: int = 10,
    timeout_seconds: int = 300
) -> dict:
    """Comprehensive activity demonstrating all features."""
    info = activity.info()
    meter = activity.metric_meter()

    # Create metrics
    items_processed = meter.create_counter("items_processed", "Items processed")
    processing_duration = meter.create_histogram("processing_duration", "Processing time")

    activity.logger.info(f"Starting robust processing of {len(data)} items")

    results = []
    start_time = time.time()

    try:
        for i, item in enumerate(data):
            # Check for various cancellation conditions
            if activity.is_cancelled():
                details = activity.cancellation_details()
                if details and details.timed_out:
                    activity.logger.warning("Activity timed out, saving partial results")
                    await save_partial_results(results)
                break

            if activity.is_worker_shutdown():
                activity.logger.warning("Worker shutting down, saving progress")
                await save_partial_results(results)
                break

            # Process item with error handling
            try:
                result = await process_complex_item(item)
                results.append(result)
                items_processed.add(1)
            except Exception as e:
                activity.logger.error(f"Failed to process item {i}: {e}")
                continue

            # Send heartbeat every batch
            if (i + 1) % batch_size == 0:
                progress = {
                    "processed": len(results),
                    "total": len(data),
                    "success_rate": len(results) / (i + 1),
                    "elapsed_time": time.time() - start_time
                }
                activity.heartbeat(progress)

        duration = time.time() - start_time
        processing_duration.record(duration)

        final_result = {
            "total_items": len(data),
            "processed_items": len(results),
            "success_rate": len(results) / len(data) if data else 0,
            "processing_time": duration,
            "activity_info": {
                "activity_id": info.activity_id,
                "attempt": info.attempt,
                "workflow_id": info.workflow_id
            }
        }

        activity.logger.info(f"Successfully processed {len(results)}/{len(data)} items")
        return final_result

    except Exception as e:
        activity.logger.error(f"Activity failed: {e}")
        # Save whatever progress we made
        await save_partial_results(results)
        raise

This comprehensive activity.md sub-doc provides complete coverage of the temporalio.activity module, including all API signatures, detailed parameter documentation, usage examples, and comprehensive type definitions. The documentation follows the Knowledge Tile format and provides developers with everything needed to effectively use activities in their Temporal workflows.

Install with Tessl CLI

npx tessl i tessl/pypi-temporalio

docs

activity.md

client.md

common.md

contrib-pydantic.md

data-conversion.md

exceptions.md

index.md

runtime.md

testing.md

worker.md

workflow.md

tile.json