tessl/pypi-temporalio

Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.

docs/worker.md

Worker Management

The temporalio.worker module provides the functionality for processing Temporal workflows and activities. Workers are the engines that execute your distributed application logic, handling task polling, execution, and resource management.

Worker Creation and Configuration

Worker Class

The primary worker class handles both workflow and activity execution:

class Worker:
    """Worker to process workflows and/or activities.

    Once created, workers can be run and shutdown explicitly via run()
    and shutdown(). Alternatively workers can be used in an async with clause.
    """

    def __init__(
        self,
        client: temporalio.client.Client,
        *,
        task_queue: str,
        activities: Sequence[Callable] = [],
        nexus_service_handlers: Sequence[Any] = [],
        workflows: Sequence[Type] = [],
        activity_executor: Optional[concurrent.futures.Executor] = None,
        workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None,
        nexus_task_executor: Optional[concurrent.futures.Executor] = None,
        workflow_runner: WorkflowRunner = SandboxedWorkflowRunner(),
        unsandboxed_workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(),
        plugins: Sequence[Plugin] = [],
        interceptors: Sequence[Interceptor] = [],
        build_id: Optional[str] = None,
        identity: Optional[str] = None,
        max_cached_workflows: int = 1000,
        max_concurrent_workflow_tasks: Optional[int] = None,
        max_concurrent_activities: Optional[int] = None,
        max_concurrent_local_activities: Optional[int] = None,
        tuner: Optional[WorkerTuner] = None,
        max_concurrent_workflow_task_polls: Optional[int] = None,
        nonsticky_to_sticky_poll_ratio: float = 0.2,
        max_concurrent_activity_task_polls: Optional[int] = None,
        no_remote_activities: bool = False,
        sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10),
        max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60),
        default_heartbeat_throttle_interval: timedelta = timedelta(seconds=30),
        max_activities_per_second: Optional[float] = None,
        max_task_queue_activities_per_second: Optional[float] = None,
        graceful_shutdown_timeout: timedelta = timedelta(),
        workflow_failure_exception_types: Sequence[Type[BaseException]] = [],
        shared_state_manager: Optional[SharedStateManager] = None,
        debug_mode: bool = False,
        disable_eager_activity_execution: bool = False,
        on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] = None,
        use_worker_versioning: bool = False,
        disable_safe_workflow_eviction: bool = False,
        deployment_config: Optional[WorkerDeploymentConfig] = None,
        workflow_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(maximum=5),
        activity_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(maximum=5),
        nexus_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(maximum=5),
    ) -> None:
        """Create a worker to process workflows and/or activities.

        Args:
            client: Client to use for this worker. This is required and must be
                the Client instance or have a worker_service_client attribute with
                reference to the original client's underlying service client.
                This client cannot be "lazy".
            task_queue: Required task queue for this worker.
            activities: Activity callables decorated with @activity.defn.
                Activities may be async functions or non-async functions.
            nexus_service_handlers: Instances of Nexus service handler classes
                decorated with @nexusrpc.handler.service_handler.
            workflows: Workflow classes decorated with @workflow.defn.
            activity_executor: Concurrent executor to use for non-async
                activities. This is required if any activities are non-async.
                ThreadPoolExecutor is recommended. If this is a
                ProcessPoolExecutor, all non-async activities must be picklable.
            workflow_task_executor: Thread pool executor for workflow tasks. If
                this is not present, a new ThreadPoolExecutor will be
                created with max_workers set to max_concurrent_workflow_tasks if it is present,
                or 500 otherwise.
            nexus_task_executor: Executor to use for non-async
                Nexus operations. This is required if any operation start methods
                are non-async def. ThreadPoolExecutor is recommended.
            workflow_runner: Runner for workflows.
            unsandboxed_workflow_runner: Runner for workflows that opt-out of
                sandboxing.
            plugins: Collection of plugins for this worker. Any plugins already
                on the client that also implement Plugin are
                prepended to this list and should not be explicitly given here
                to avoid running the plugin twice.
            interceptors: Collection of interceptors for this worker. Any
                interceptors already on the client that also implement
                Interceptor are prepended to this list and should
                not be explicitly given here.
            build_id: Unique identifier for the current runtime. This is best
                set as a hash of all code and should change only when code does.
                If unset, a best-effort identifier is generated.
                Exclusive with deployment_config.
            identity: Identity for this worker client. If unset, the client
                identity is used.
            max_cached_workflows: If nonzero, workflows will be cached and
                sticky task queues will be used.
            max_concurrent_workflow_tasks: Maximum allowed number of workflow
                tasks that will ever be given to this worker at one time. Mutually exclusive with
                tuner. Must be set to at least two if max_cached_workflows is nonzero.
            max_concurrent_activities: Maximum number of activity tasks that
                will ever be given to the activity worker concurrently. Mutually exclusive with tuner.
            max_concurrent_local_activities: Maximum number of local activity
                tasks that will ever be given to the activity worker concurrently. Mutually exclusive with tuner.
            tuner: Provide a custom WorkerTuner. Mutually exclusive with the
                max_concurrent_workflow_tasks, max_concurrent_activities, and
                max_concurrent_local_activities arguments.
                Defaults to fixed-size 100 slots for each slot kind if unset and none of the
                max_* arguments are provided.
            max_concurrent_workflow_task_polls: Maximum number of concurrent
                poll workflow task requests we will perform at a time on this worker's task queue.
                Must be set to at least two if max_cached_workflows is nonzero.
                If set, will override any value passed to workflow_task_poller_behavior.
            nonsticky_to_sticky_poll_ratio: Ratio applied to
                max_concurrent_workflow_task_polls to determine the maximum
                number of pollers allowed on the non-sticky queue when sticky
                tasks are enabled.
            max_concurrent_activity_task_polls: Maximum number of concurrent
                poll activity task requests we will perform at a time on this
                worker's task queue.
                If set, will override any value passed to activity_task_poller_behavior.
            no_remote_activities: If true, this worker will only handle workflow
                tasks and local activities, it will not poll for activity tasks.
            sticky_queue_schedule_to_start_timeout: How long a workflow task is
                allowed to sit on the sticky queue before it is timed out and
                moved to the non-sticky queue where it may be picked up by any
                worker.
            max_heartbeat_throttle_interval: Longest interval for throttling
                activity heartbeats.
            default_heartbeat_throttle_interval: Default interval for throttling
                activity heartbeats in case per-activity heartbeat timeout is
                unset. Otherwise, it's the per-activity heartbeat timeout * 0.8.
            max_activities_per_second: Limits the number of activities per
                second that this worker will process. The worker will not poll
                for new activities if by doing so it might receive and execute
                an activity which would cause it to exceed this limit.
            max_task_queue_activities_per_second: Sets the maximum number of
                activities per second the task queue will dispatch, controlled
                server-side. Note that this only takes effect upon an activity
                poll request.
            graceful_shutdown_timeout: Amount of time after shutdown is called
                that activities are given to complete before their tasks are
                cancelled.
            workflow_failure_exception_types: The types of exceptions that, if a
                workflow-thrown exception extends, will cause the
                workflow/update to fail instead of suspending the workflow via
                task failure.
            shared_state_manager: Used for obtaining cross-process friendly
                synchronization primitives. This is required for non-async
                activities where the activity_executor is not a
                ThreadPoolExecutor. Reuse of these across workers is encouraged.
            debug_mode: If true, will disable deadlock detection and may disable
                sandboxing in order to make using a debugger easier. If false
                but the environment variable TEMPORAL_DEBUG is truthy, this
                will be set to true.
            disable_eager_activity_execution: If true, will disable eager
                activity execution. Eager activity execution is an optimization
                on some servers that sends activities back to the same worker as
                the calling workflow if they can run there.
            on_fatal_error: An async function that can handle a failure before
                the worker shutdown commences. This cannot stop the shutdown and
                any exception raised is logged and ignored.
            use_worker_versioning: If true, the build_id argument must be
                specified, and this worker opts into the worker versioning
                feature. This ensures it only receives workflow tasks for
                workflows which it claims to be compatible with.
                Exclusive with deployment_config.
            disable_safe_workflow_eviction: If true, instead of letting the
                workflow collect its tasks properly, the worker will simply let
                the Python garbage collector collect the tasks.
            deployment_config: Deployment config for the worker. Exclusive with build_id and
                use_worker_versioning.
            workflow_task_poller_behavior: Specify the behavior of workflow task polling.
                Defaults to a 5-poller maximum.
            activity_task_poller_behavior: Specify the behavior of activity task polling.
                Defaults to a 5-poller maximum.
            nexus_task_poller_behavior: Specify the behavior of Nexus task polling.
                Defaults to a 5-poller maximum.
        """

WorkerConfig

TypedDict for worker configuration:

class WorkerConfig(TypedDict, total=False):
    """TypedDict of config originally passed to Worker."""

    client: temporalio.client.Client
    task_queue: str
    activities: Sequence[Callable]
    nexus_service_handlers: Sequence[Any]
    workflows: Sequence[Type]
    activity_executor: Optional[concurrent.futures.Executor]
    workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor]
    nexus_task_executor: Optional[concurrent.futures.Executor]
    workflow_runner: WorkflowRunner
    unsandboxed_workflow_runner: WorkflowRunner
    interceptors: Sequence[Interceptor]
    build_id: Optional[str]
    identity: Optional[str]
    max_cached_workflows: int
    max_concurrent_workflow_tasks: Optional[int]
    max_concurrent_activities: Optional[int]
    max_concurrent_local_activities: Optional[int]
    tuner: Optional[WorkerTuner]
    max_concurrent_workflow_task_polls: Optional[int]
    nonsticky_to_sticky_poll_ratio: float
    max_concurrent_activity_task_polls: Optional[int]
    no_remote_activities: bool
    sticky_queue_schedule_to_start_timeout: timedelta
    max_heartbeat_throttle_interval: timedelta
    default_heartbeat_throttle_interval: timedelta
    max_activities_per_second: Optional[float]
    max_task_queue_activities_per_second: Optional[float]
    graceful_shutdown_timeout: timedelta
    workflow_failure_exception_types: Sequence[Type[BaseException]]
    shared_state_manager: Optional[SharedStateManager]
    debug_mode: bool
    disable_eager_activity_execution: bool
    on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]]
    use_worker_versioning: bool
    disable_safe_workflow_eviction: bool
    deployment_config: Optional[WorkerDeploymentConfig]
    workflow_task_poller_behavior: PollerBehavior
    activity_task_poller_behavior: PollerBehavior
    nexus_task_poller_behavior: PollerBehavior

WorkerDeploymentConfig

Configuration for Worker Versioning feature:

@dataclass
class WorkerDeploymentConfig:
    """Options for configuring the Worker Versioning feature.

    WARNING: This is an experimental feature and may change in the future.
    """

    version: WorkerDeploymentVersion
    """The deployment version information."""

    use_worker_versioning: bool
    """Whether to enable worker versioning."""

    default_versioning_behavior: VersioningBehavior = VersioningBehavior.UNSPECIFIED
    """Default versioning behavior for workflow tasks."""

    def _to_bridge_worker_deployment_options(self) -> temporalio.bridge.worker.WorkerDeploymentOptions:
        """Convert to bridge worker deployment options."""

Basic Worker Usage

import asyncio
from datetime import timedelta

from temporalio import activity, client, worker, workflow

# Define activities and workflows
@activity.defn
async def say_hello(name: str) -> str:
    return f"Hello, {name}!"

@workflow.defn
class SayHelloWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello,
            name,
            schedule_to_close_timeout=timedelta(seconds=60),
        )

async def main():
    # Create client
    client_instance = await client.Client.connect("localhost:7233")

    # Create worker
    worker_instance = worker.Worker(
        client_instance,
        task_queue="hello-task-queue",
        workflows=[SayHelloWorkflow],
        activities=[say_hello],
    )

    # Run worker
    async with worker_instance:
        # Worker runs until context exits
        await asyncio.sleep(60)

if __name__ == "__main__":
    asyncio.run(main())

Worker Execution and Lifecycle

Worker.run() Method

Start the worker and wait for shutdown:

async def run(self) -> None:
    """Run the worker and wait on it to be shut down.

    This will not return until shutdown is complete. This means that
    activities have all completed after being told to cancel after the
    graceful timeout period.

    This method will raise if there is a worker fatal error. While
    shutdown() does not need to be invoked in this case, it is
    harmless to do so. Otherwise, to shut down this worker, invoke
    shutdown().

    Technically this worker can be shut down by cancelling this async
    function while it is running. However, a cancel could also interrupt
    the shutdown process itself, so users are encouraged to use explicit
    shutdown() instead.
    """

Worker Properties

@property
def is_running(self) -> bool:
    """Whether the worker is running.

    This is only True if the worker has been started and not yet
    shut down.
    """

@property
def is_shutdown(self) -> bool:
    """Whether the worker has run and shut down.

    This is only True if the worker was once started and then shutdown.
    This is not necessarily True after shutdown() is first
    called because the shutdown process can take a bit.
    """

Worker Shutdown

async def shutdown(self) -> None:
    """Initiate a worker shutdown and wait until complete.

    This can be called before the worker has even started and is safe for
    repeated invocations. It simply sets a marker informing the worker to
    shut down as it runs.

    This will not return until the worker has completed shutting down.
    """

Async Context Manager

async def __aenter__(self) -> Worker:
    """Start the worker and return self for use by async with.

    This is a wrapper around run(). Please review that method.

    This takes a similar approach to asyncio.timeout() in that it
    will cancel the current task if there is a fatal worker error and raise
    that error out of the context manager. However, if the inner async code
    swallows/wraps the CancelledError, the exiting
    portion of the context manager will not raise the fatal worker error.
    """

async def __aexit__(
    self,
    exc_type: Optional[Type[BaseException]],
    *args
) -> None:
    """Exit the context manager and shutdown the worker."""

Lifecycle Example

import asyncio
from temporalio import worker

async def explicit_lifecycle():
    # Assumes `client`, `MyWorkflow`, and `my_activity` are defined elsewhere
    worker_instance = worker.Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    )

    try:
        # Start worker in background
        worker_task = asyncio.create_task(worker_instance.run())

        # Do other work
        await asyncio.sleep(10)

        # Shutdown worker
        await worker_instance.shutdown()

        # Wait for worker task to complete
        await worker_task

    except Exception as e:
        print(f"Worker failed: {e}")
        await worker_instance.shutdown()

async def context_manager_lifecycle():
    # Using async context manager
    async with worker.Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    ) as worker_instance:
        # Worker runs until context exits
        await asyncio.sleep(10)
    # Worker automatically shuts down here

Workflow and Activity Registration

Registration Patterns

Workflows and activities are registered explicitly when the worker is constructed:

# Direct registration
worker_instance = worker.Worker(
    client,
    task_queue="my-task-queue",
    workflows=[WorkflowClass1, WorkflowClass2],
    activities=[activity_func1, activity_func2, activity_method],
)

# Dynamic registration: imports shared by both discovery helpers
import importlib
import inspect

def discover_workflows():
    workflows = []
    # Dynamically import and discover workflow classes
    module = importlib.import_module("my_workflows")
    for name, obj in inspect.getmembers(module, inspect.isclass):
        if hasattr(obj, "__temporal_workflow_definition__"):
            workflows.append(obj)
    return workflows

def discover_activities():
    activities = []
    # Dynamically import and discover activity functions
    module = importlib.import_module("my_activities")
    for name, obj in inspect.getmembers(module, inspect.isfunction):
        if hasattr(obj, "__temporal_activity_definition__"):
            activities.append(obj)
    return activities

# Use discovered workflows and activities
worker_instance = worker.Worker(
    client,
    task_queue="my-task-queue",
    workflows=discover_workflows(),
    activities=discover_activities(),
)

Nexus Service Registration

# Nexus service handlers (experimental)
import nexusrpc.handler

@nexusrpc.handler.service_handler
class MyNexusService:
    @nexusrpc.handler.operation_handler
    async def my_operation(self, input: str) -> str:
        return f"Processed: {input}"

worker_instance = worker.Worker(
    client,
    task_queue="my-task-queue",
    nexus_service_handlers=[MyNexusService()],
)

Polling and Execution Behavior

PollerBehavior Types

Control how the worker polls for tasks:

PollerBehavior = Union[PollerBehaviorSimpleMaximum, PollerBehaviorAutoscaling]

PollerBehaviorSimpleMaximum

Simple fixed maximum poller behavior:

@dataclass(frozen=True)
class PollerBehaviorSimpleMaximum:
    """A poller behavior that will attempt to poll as long as a slot is available, up to the
    provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
    """

    maximum: int = 5
    """Maximum number of concurrent pollers."""

    def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
        """Convert to bridge poller behavior."""

PollerBehaviorAutoscaling

Dynamic autoscaling poller behavior:

@dataclass(frozen=True)
class PollerBehaviorAutoscaling:
    """A poller behavior that will automatically scale the number of pollers based on feedback
    from the server. A slot must be available before beginning polling.
    """

    minimum: int = 1
    """At least this many poll calls will always be attempted (assuming slots are available)."""

    maximum: int = 100
    """At most this many poll calls will ever be open at once. Must be >= minimum."""

    initial: int = 5
    """This many polls will be attempted initially before scaling kicks in. Must be between
       minimum and maximum."""

    def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
        """Convert to bridge poller behavior."""

Poller Configuration Example

from temporalio.worker import (
    PollerBehaviorSimpleMaximum,
    PollerBehaviorAutoscaling,
    Worker
)

# Simple maximum polling
worker_simple = Worker(
    client,
    task_queue="simple-queue",
    workflows=[MyWorkflow],
    workflow_task_poller_behavior=PollerBehaviorSimpleMaximum(maximum=10),
    activity_task_poller_behavior=PollerBehaviorSimpleMaximum(maximum=5),
)

# Autoscaling polling
worker_autoscaling = Worker(
    client,
    task_queue="autoscaling-queue",
    workflows=[MyWorkflow],
    workflow_task_poller_behavior=PollerBehaviorAutoscaling(
        minimum=2,
        maximum=20,
        initial=5
    ),
    activity_task_poller_behavior=PollerBehaviorAutoscaling(
        minimum=1,
        maximum=10,
        initial=3
    ),
)

Interceptors and Plugins

Base Interceptor Class

class Interceptor:
    """Interceptor for workers.

    This should be extended by any worker interceptors.
    """

    def intercept_activity(
        self, next: ActivityInboundInterceptor
    ) -> ActivityInboundInterceptor:
        """Method called for intercepting an activity.

        Args:
            next: The underlying inbound interceptor this interceptor should
                delegate to.

        Returns:
            The new interceptor that will be used for the activity.
        """
        return next

    def workflow_interceptor_class(
        self, input: WorkflowInterceptorClassInput
    ) -> Optional[Type[WorkflowInboundInterceptor]]:
        """Class that will be instantiated and used to intercept workflows.

        This method is called on workflow start. The class must have the same
        init as WorkflowInboundInterceptor.__init__. The input can be
        altered to do things like add additional extern functions.

        Args:
            input: Input to this method that contains mutable properties that
                can be altered by this interceptor.

        Returns:
            The class to construct to intercept each workflow.
        """
        return None

Activity Interceptors

ActivityInboundInterceptor

class ActivityInboundInterceptor:
    """Inbound interceptor to wrap outbound creation and activity execution.

    This should be extended by any activity inbound interceptors.
    """

    def __init__(self, next: ActivityInboundInterceptor) -> None:
        """Create activity inbound interceptor."""
        self.next = next

    async def execute_activity(self, input: ExecuteActivityInput) -> Any:
        """Called when executing an activity.

        Args:
            input: Activity execution input.

        Returns:
            Result of activity execution.
        """
        return await self.next.execute_activity(input)

ActivityOutboundInterceptor

class ActivityOutboundInterceptor:
    """Outbound interceptor for activities.

    This should be extended by any activity outbound interceptors.
    """

    async def execute_activity(self, input: ExecuteActivityInput) -> Any:
        """Execute an activity.

        Args:
            input: Activity execution input.

        Returns:
            Result of activity execution.
        """

Workflow Interceptors

WorkflowInboundInterceptor

class WorkflowInboundInterceptor:
    """Inbound interceptor to wrap outbound calls and workflow execution.

    This should be extended by any workflow inbound interceptors.
    """

    def __init__(self, next: WorkflowInboundInterceptor) -> None:
        """Create workflow inbound interceptor."""
        self.next = next

    async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
        """Called when executing a workflow.

        Args:
            input: Workflow execution input.

        Returns:
            Result of workflow execution.
        """
        return await self.next.execute_workflow(input)

    async def handle_signal(self, input: HandleSignalInput) -> None:
        """Called when handling a signal."""
        await self.next.handle_signal(input)

    async def handle_query(self, input: HandleQueryInput) -> Any:
        """Called when handling a query."""
        return await self.next.handle_query(input)

    async def handle_update(self, input: HandleUpdateInput) -> Any:
        """Called when handling an update."""
        return await self.next.handle_update(input)

WorkflowOutboundInterceptor

class WorkflowOutboundInterceptor:
    """Outbound interceptor for workflows.

    This should be extended by any workflow outbound interceptors.
    """

    async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
        """Execute a workflow."""

    async def start_activity(self, input: StartActivityInput) -> ActivityHandle:
        """Start an activity."""

    async def start_local_activity(self, input: StartLocalActivityInput) -> ActivityHandle:
        """Start a local activity."""

    async def start_child_workflow(self, input: StartChildWorkflowInput) -> ChildWorkflowHandle:
        """Start a child workflow."""

Input Classes for Interceptors

@dataclass
class ExecuteActivityInput:
    """Input for ActivityInboundInterceptor.execute_activity."""

    fn: Callable[..., Any]
    """The activity function to execute."""

    args: Sequence[Any]
    """Arguments to pass to the activity function."""

    executor: Optional[concurrent.futures.Executor]
    """Executor to run the activity in."""

    headers: Mapping[str, temporalio.api.common.v1.Payload]
    """Headers for the activity."""

@dataclass
class ExecuteWorkflowInput:
    """Input for WorkflowInboundInterceptor.execute_workflow."""

    run_fn: Callable[..., Awaitable[Any]]
    """The workflow run function."""

    args: Sequence[Any]
    """Arguments to pass to the workflow run function."""

@dataclass
class HandleSignalInput:
    """Input for WorkflowInboundInterceptor.handle_signal."""

    signal: str
    """Name of the signal."""

    args: Sequence[Any]
    """Arguments for the signal."""

@dataclass
class HandleQueryInput:
    """Input for WorkflowInboundInterceptor.handle_query."""

    query: str
    """Name of the query."""

    args: Sequence[Any]
    """Arguments for the query."""

@dataclass
class HandleUpdateInput:
    """Input for WorkflowInboundInterceptor.handle_update."""

    update: str
    """Name of the update."""

    args: Sequence[Any]
    """Arguments for the update."""

@dataclass
class StartActivityInput:
    """Input for WorkflowOutboundInterceptor.start_activity."""

    activity: str
    """Activity name or function."""

    args: Sequence[Any]
    """Arguments to pass to the activity."""

    activity_id: Optional[str]
    """ID for the activity."""

    task_queue: Optional[str]
    """Task queue for the activity."""

@dataclass
class StartChildWorkflowInput:
    """Input for WorkflowOutboundInterceptor.start_child_workflow."""

    workflow: str
    """Workflow name or class."""

    args: Sequence[Any]
    """Arguments to pass to the workflow."""

    id: Optional[str]
    """ID for the child workflow."""

    task_queue: Optional[str]
    """Task queue for the child workflow."""

@dataclass
class WorkflowInterceptorClassInput:
    """Input for Interceptor.workflow_interceptor_class."""

    unsafe_extern_functions: MutableMapping[str, Callable]
    """Set of external functions that can be called from the sandbox.

    WARNING: Exposing external functions to the workflow sandbox is dangerous and
    should be avoided. Use at your own risk.
    """

Plugin System

class Plugin:
    """Plugin base class.

    Plugins can be used to extend worker functionality and are applied
    during worker initialization.
    """

    def init_worker_plugin(self, root_plugin: Plugin) -> None:
        """Initialize the plugin with the root plugin."""
        pass

    def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
        """Configure worker settings.

        Args:
            config: The worker configuration to modify.

        Returns:
            Modified worker configuration.
        """
        return config

    def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
        """Configure replayer settings.

        Args:
            config: The replayer configuration to modify.

        Returns:
            Modified replayer configuration.
        """
        return config

Interceptor Example

import logging
from typing import Any

from temporalio.worker import ActivityInboundInterceptor, ExecuteActivityInput, Interceptor

class LoggingInterceptor(Interceptor):
    """Example interceptor that logs activity execution."""

    def intercept_activity(self, next: ActivityInboundInterceptor) -> ActivityInboundInterceptor:
        return LoggingActivityInboundInterceptor(next)

class LoggingActivityInboundInterceptor(ActivityInboundInterceptor):
    async def execute_activity(self, input: ExecuteActivityInput) -> Any:
        # The definition attached by @activity.defn carries the registered name
        defn = getattr(input.fn, "__temporal_activity_definition__", None)
        activity_name = getattr(defn, "name", None) or getattr(input.fn, "__name__", str(input.fn))

        logging.info(f"Starting activity: {activity_name}")
        try:
            result = await super().execute_activity(input)
            logging.info(f"Completed activity: {activity_name}")
            return result
        except Exception as e:
            logging.error(f"Activity failed: {activity_name}, error: {e}")
            raise

# Use the interceptor
worker_instance = worker.Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    interceptors=[LoggingInterceptor()],
)

Worker Tuning and Resource Management

WorkerTuner

Base class for worker tuning:

class WorkerTuner(ABC):
    """WorkerTuners allow for the dynamic customization of some aspects of worker configuration."""

    @staticmethod
    def create_resource_based(
        *,
        target_memory_usage: float,
        target_cpu_usage: float,
        workflow_config: Optional[ResourceBasedSlotConfig] = None,
        activity_config: Optional[ResourceBasedSlotConfig] = None,
        local_activity_config: Optional[ResourceBasedSlotConfig] = None,
    ) -> "WorkerTuner":
        """Create a resource-based tuner with the provided options."""

    @staticmethod
    def create_fixed(
        *,
        workflow_slots: Optional[int],
        activity_slots: Optional[int],
        local_activity_slots: Optional[int],
    ) -> "WorkerTuner":
        """Create a fixed-size tuner with the provided number of slots. Any unspecified slots will default to 100."""

    @staticmethod
    def create_composite(
        *,
        workflow_supplier: SlotSupplier,
        activity_supplier: SlotSupplier,
        local_activity_supplier: SlotSupplier,
    ) -> "WorkerTuner":
        """Create a tuner composed of the provided slot suppliers."""

    @abstractmethod
    def _get_workflow_task_slot_supplier(self) -> SlotSupplier:
        """Get the slot supplier for workflow tasks."""

    @abstractmethod
    def _get_activity_task_slot_supplier(self) -> SlotSupplier:
        """Get the slot supplier for activity tasks."""

    @abstractmethod
    def _get_local_activity_task_slot_supplier(self) -> SlotSupplier:
        """Get the slot supplier for local activity tasks."""

Slot Suppliers

FixedSizeSlotSupplier

@dataclass(frozen=True)
class FixedSizeSlotSupplier:
    """A fixed-size slot supplier that will never issue more than a fixed number of slots."""

    num_slots: int
    """The maximum number of slots that can be issued"""

ResourceBasedSlotSupplier

@dataclass(frozen=True)
class ResourceBasedSlotSupplier:
    """A slot supplier that will dynamically adjust the number of slots based on resource usage.

    WARNING: The resource based tuner is currently experimental.
    """

    slot_config: ResourceBasedSlotConfig
    """Configuration for this slot supplier."""

    tuner_config: ResourceBasedTunerConfig
    """Options for the tuner that will be used to adjust the number of slots. When used with a
    CompositeTuner, all resource-based slot suppliers must use the same tuner options."""

@dataclass(frozen=True)
class ResourceBasedTunerConfig:
    """Options for a ResourceBasedTuner or a ResourceBasedSlotSupplier.

    WARNING: The resource based tuner is currently experimental.
    """

    target_memory_usage: float
    """A value between 0 and 1 that represents the target (system) memory usage. It's not recommended
       to set this higher than 0.8, since how much memory a workflow may use is not predictable, and
       you don't want to encounter OOM errors."""

    target_cpu_usage: float
    """A value between 0 and 1 that represents the target (system) CPU usage. This can be set to 1.0
       if desired, but it's recommended to leave some headroom for other processes."""

@dataclass(frozen=True)
class ResourceBasedSlotConfig:
    """Options for a specific slot type being used with a ResourceBasedSlotSupplier.

    WARNING: The resource based tuner is currently experimental.
    """

    minimum_slots: Optional[int] = None
    """Amount of slots that will be issued regardless of any other checks. Defaults to 5 for workflows and 1 for
    activities."""

    maximum_slots: Optional[int] = None
    """Maximum amount of slots permitted. Defaults to 500."""

    ramp_throttle: Optional[timedelta] = None
    """Minimum time we will wait (after passing the minimum slots number) between handing out new slots in milliseconds.
    Defaults to 0 for workflows and 50ms for activities.

    This value matters because how many resources a task will use cannot be determined ahead of time, and thus the
    system should wait to see how much resources are used before issuing more slots."""

CustomSlotSupplier

class CustomSlotSupplier(ABC):
    """This class can be implemented to provide custom slot supplier behavior.

    WARNING: Custom slot suppliers are currently experimental.
    """

    @abstractmethod
    async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
        """This function is called before polling for new tasks. Your implementation must block until a
        slot is available then return a permit to use that slot.

        The only acceptable exception to throw is asyncio.CancelledError, as invocations of this method may
        be cancelled. Any other exceptions thrown will be logged and ignored.

        Args:
            ctx: The context for slot reservation.

        Returns:
            A permit to use the slot which may be populated with your own data.
        """

    @abstractmethod
    def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
        """This function is called when trying to reserve slots for "eager" workflow and activity tasks.
        Eager tasks are those which are returned as a result of completing a workflow task, rather than
        from polling. Your implementation must not block, and if a slot is available, return a permit
        to use that slot.

        Args:
            ctx: The context for slot reservation.

        Returns:
            A permit to use the slot, possibly populated with your own data, or None if no slot is available.
        """

    @abstractmethod
    def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
        """This function is called once a slot is actually being used to process some task, which may be
        some time after the slot was reserved originally.

        Args:
            ctx: The context for marking a slot as used.
        """

    @abstractmethod
    def release_slot(self, ctx: SlotReleaseContext) -> None:
        """This function is called once a permit is no longer needed.

        Args:
            ctx: The context for releasing a slot.
        """

Slot Management Classes

class SlotPermit:
    """A permit to use a slot for a workflow/activity/local activity task.

    You can inherit from this class to add your own data to the permit.

    WARNING: Custom slot suppliers are currently experimental.
    """

class SlotReserveContext(Protocol):
    """Context for reserving a slot from a CustomSlotSupplier.

    WARNING: Custom slot suppliers are currently experimental.
    """

    slot_type: Literal["workflow", "activity", "local-activity"]
    """The type of slot trying to be reserved. Always one of "workflow", "activity", or "local-activity"."""

    task_queue: str
    """The name of the task queue for which this reservation request is associated."""

    worker_identity: str
    """The identity of the worker that is requesting the reservation."""

    worker_build_id: str
    """The build id of the worker that is requesting the reservation."""

    worker_deployment_version: Optional[WorkerDeploymentVersion]
    """The deployment version of the worker that is requesting the reservation, if any."""

    is_sticky: bool
    """True iff this is a reservation for a sticky poll for a workflow task."""

class SlotMarkUsedContext(Protocol):
    """Context for marking a slot used from a CustomSlotSupplier.

    WARNING: Custom slot suppliers are currently experimental.
    """

    slot_info: SlotInfo
    """Info about the task that will be using the slot."""

    permit: SlotPermit
    """The permit that was issued when the slot was reserved."""

@dataclass(frozen=True)
class SlotReleaseContext:
    """Context for releasing a slot from a CustomSlotSupplier.

    WARNING: Custom slot suppliers are currently experimental.
    """

    slot_info: Optional[SlotInfo]
    """Info about the task that will be using the slot. May be None if the slot was never used."""

    permit: SlotPermit
    """The permit that was issued when the slot was reserved."""

Slot Info Types

@runtime_checkable
class WorkflowSlotInfo(Protocol):
    """Info about a workflow task slot usage.

    WARNING: Custom slot suppliers are currently experimental.
    """

    workflow_type: str
    is_sticky: bool

@runtime_checkable
class ActivitySlotInfo(Protocol):
    """Info about an activity task slot usage.

    WARNING: Custom slot suppliers are currently experimental.
    """

    activity_type: str

@runtime_checkable
class LocalActivitySlotInfo(Protocol):
    """Info about a local activity task slot usage.

    WARNING: Custom slot suppliers are currently experimental.
    """

    activity_type: str

SlotInfo = Union[WorkflowSlotInfo, ActivitySlotInfo, LocalActivitySlotInfo]

Tuning Examples

import asyncio
from datetime import timedelta
from typing import Optional

from temporalio.worker import (
    CustomSlotSupplier,
    ResourceBasedSlotConfig,
    SlotMarkUsedContext,
    SlotPermit,
    SlotReleaseContext,
    SlotReserveContext,
    WorkerTuner,
)

# Fixed-size tuning
fixed_tuner = WorkerTuner.create_fixed(
    workflow_slots=50,
    activity_slots=100,
    local_activity_slots=200,
)

# Resource-based tuning
resource_tuner = WorkerTuner.create_resource_based(
    target_memory_usage=0.7,  # Target 70% memory usage
    target_cpu_usage=0.8,     # Target 80% CPU usage
    workflow_config=ResourceBasedSlotConfig(
        minimum_slots=5,
        maximum_slots=100,
        ramp_throttle=timedelta(milliseconds=0),
    ),
    activity_config=ResourceBasedSlotConfig(
        minimum_slots=10,
        maximum_slots=500,
        ramp_throttle=timedelta(milliseconds=50),
    ),
)

# Custom slot supplier
class CustomSlotSupplierImpl(CustomSlotSupplier):
    def __init__(self, max_slots: int):
        self._max_slots = max_slots
        self._current_slots = 0
        self._lock = asyncio.Lock()
        self._condition = asyncio.Condition(self._lock)

    async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
        async with self._condition:
            while self._current_slots >= self._max_slots:
                await self._condition.wait()

            self._current_slots += 1
            return SlotPermit()

    def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
        # Must not block; mutating without the lock is safe only because all
        # slot supplier callbacks run on the same event loop thread
        if self._current_slots < self._max_slots:
            self._current_slots += 1
            return SlotPermit()
        return None

    def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
        # Track slot usage if needed
        pass

    def release_slot(self, ctx: SlotReleaseContext) -> None:
        # This callback is synchronous, so schedule the async release on the
        # running event loop rather than awaiting it
        async def release():
            async with self._condition:
                self._current_slots -= 1
                self._condition.notify()

        asyncio.create_task(release())

# Use custom tuner
custom_tuner = WorkerTuner.create_composite(
    workflow_supplier=CustomSlotSupplierImpl(max_slots=20),
    activity_supplier=CustomSlotSupplierImpl(max_slots=50),
    local_activity_supplier=CustomSlotSupplierImpl(max_slots=100),
)

# Apply tuner to worker
worker_instance = Worker(
    client,
    task_queue="tuned-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    tuner=resource_tuner,
)

Workflow Replay and Testing

Replayer Class

Test workflow compatibility with historical executions:

class Replayer:
    """Replayer to replay workflows from history."""

    def __init__(
        self,
        *,
        workflows: Sequence[Type],
        workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None,
        workflow_runner: WorkflowRunner = SandboxedWorkflowRunner(),
        unsandboxed_workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(),
        namespace: str = "ReplayNamespace",
        data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
        interceptors: Sequence[Interceptor] = [],
        plugins: Sequence[temporalio.worker.Plugin] = [],
        build_id: Optional[str] = None,
        identity: Optional[str] = None,
        workflow_failure_exception_types: Sequence[Type[BaseException]] = [],
        debug_mode: bool = False,
        runtime: Optional[temporalio.runtime.Runtime] = None,
        disable_safe_workflow_eviction: bool = False,
        header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC,
    ) -> None:
        """Create a replayer to replay workflows from history.

        Most of the same arguments need to be passed to the replayer that were passed
        to the worker when the workflow originally ran.

        Note, unlike the worker, for the replayer the workflow_task_executor
        will default to a new thread pool executor with no max_workers set that
        will be shared across all replay calls and never explicitly shut down.
        Users are encouraged to provide their own if needing more control.
        """

    def config(self) -> ReplayerConfig:
        """Config, as a dictionary, used to create this replayer.

        Returns:
            Configuration, shallow-copied.
        """

    async def replay_workflow(
        self,
        history: temporalio.client.WorkflowHistory,
        *,
        raise_on_replay_failure: bool = True,
    ) -> WorkflowReplayResult:
        """Replay a workflow for the given history.

        Args:
            history: The history to replay. Can be fetched directly, or use
                WorkflowHistory.from_json to parse a history downloaded via
                tctl or the web UI.
            raise_on_replay_failure: If True (the default), this will raise
                a WorkflowReplayResult.replay_failure if it is present.
        """

    async def replay_workflows(
        self,
        histories: AsyncIterator[temporalio.client.WorkflowHistory],
        *,
        raise_on_replay_failure: bool = True,
    ) -> WorkflowReplayResults:
        """Replay workflows for the given histories.

        This is a shortcut for workflow_replay_iterator that iterates
        all results and aggregates information about them.

        Args:
            histories: The histories to replay, from an async iterator.
            raise_on_replay_failure: If True (the default), this will raise
                the first replay failure seen.
        """

    @asynccontextmanager
    async def workflow_replay_iterator(
        self,
        histories: AsyncIterator[temporalio.client.WorkflowHistory],
    ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
        """Create an async context manager for replaying workflows.

        Args:
            histories: The histories to replay, from an async iterator.

        Returns:
            Async context manager that yields an iterator of replay results.
        """

ReplayerConfig

class ReplayerConfig(TypedDict, total=False):
    """TypedDict of config originally passed to Replayer."""

    workflows: Sequence[Type]
    workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor]
    workflow_runner: WorkflowRunner
    unsandboxed_workflow_runner: WorkflowRunner
    namespace: str
    data_converter: temporalio.converter.DataConverter
    interceptors: Sequence[Interceptor]
    build_id: Optional[str]
    identity: Optional[str]
    workflow_failure_exception_types: Sequence[Type[BaseException]]
    debug_mode: bool
    runtime: Optional[temporalio.runtime.Runtime]
    disable_safe_workflow_eviction: bool
    header_codec_behavior: HeaderCodecBehavior

WorkflowReplayResult

@dataclass(frozen=True)
class WorkflowReplayResult:
    """Single workflow replay result."""

    history: temporalio.client.WorkflowHistory
    """History originally passed for this workflow replay."""

    replay_failure: Optional[Exception]
    """Failure during replay if any.

    This does not mean your workflow exited by raising an error, but rather that
    some task failure such as NondeterminismError was encountered during
    replay - likely indicating your workflow code is incompatible with the
    history.
    """

WorkflowReplayResults

@dataclass(frozen=True)
class WorkflowReplayResults:
    """Results of replaying multiple workflows."""

    replay_failures: Mapping[str, Exception]
    """Replay failures, keyed by run ID."""

Replay Testing Example

import asyncio
from temporalio import client
from temporalio.worker import Replayer, WorkflowReplayResult

async def test_workflow_replay():
    # Create client to fetch workflow history
    client_instance = await client.Client.connect("localhost:7233")

    # Get workflow history
    workflow_handle = client_instance.get_workflow_handle("my-workflow-id")
    history = await workflow_handle.fetch_history()

    # Create replayer with current workflow implementation
    replayer = Replayer(
        workflows=[MyWorkflow],  # Current version of workflow
        namespace="my-namespace",
    )

    # Test replay compatibility
    try:
        result = await replayer.replay_workflow(history)
        print("Replay successful - workflow is compatible")
        return True
    except Exception as e:
        print(f"Replay failed - workflow incompatible: {e}")
        return False

async def test_multiple_workflows():
    # Create client and replayer
    client_instance = await client.Client.connect("localhost:7233")
    replayer = Replayer(workflows=[MyWorkflow])

    # Create async iterator of histories
    async def history_generator():
        for workflow_id in ["wf-1", "wf-2", "wf-3"]:
            handle = client_instance.get_workflow_handle(workflow_id)
            history = await handle.fetch_history()
            yield history

    # Replay all workflows
    results = await replayer.replay_workflows(
        history_generator(),
        raise_on_replay_failure=False  # Don't raise on first failure
    )

    if results.replay_failures:
        print(f"Replay failures: {len(results.replay_failures)}")
        for run_id, error in results.replay_failures.items():
            print(f"  {run_id}: {error}")
    else:
        print("All replays successful")

# Test with history from JSON file
async def test_replay_from_json():
    # Load history from tctl export or web UI download
    with open("workflow_history.json", "r") as f:
        history_json = f.read()

    history = client.WorkflowHistory.from_json(
        "my-workflow-id",
        history_json,
    )

    replayer = Replayer(workflows=[MyWorkflow])
    result = await replayer.replay_workflow(history)

    return result.replay_failure is None

Advanced Worker Features

Shared State Management

Cross-process synchronization for non-async activities:

class SharedStateManager:
    """Used for obtaining cross-process friendly synchronization primitives.

    This is required for non-async activities where the activity_executor
    is not a ThreadPoolExecutor. Reuse of these across workers is encouraged.
    """

Shared Heartbeat Sender

Coordinated heartbeat management:

class SharedHeartbeatSender:
    """Shared heartbeat sender for activities.

    Used to coordinate heartbeat sending across multiple workers or processes.
    """

Workflow Runners

WorkflowRunner

Base class for workflow execution:

class WorkflowRunner:
    """Base class for workflow runners."""

SandboxedWorkflowRunner

Default sandboxed workflow execution:

class SandboxedWorkflowRunner(WorkflowRunner):
    """Sandboxed workflow runner.

    Provides isolation and deterministic execution for workflows.
    """

UnsandboxedWorkflowRunner

Unsandboxed workflow execution for special cases:

class UnsandboxedWorkflowRunner(WorkflowRunner):
    """Unsandboxed workflow runner.

    Allows workflows to opt-out of sandboxing restrictions.
    WARNING: Use with caution as this can break workflow determinism.
    """

Workflow Instance Management

class WorkflowInstance:
    """Workflow instance representation."""

class WorkflowInstanceDetails:
    """Detailed information about a workflow instance."""

Build ID Management

def load_default_build_id(*, memoize: bool = True) -> str:
    """Load the default worker build ID.

    The worker build ID is a unique hash representing the entire set of code
    including Temporal code and external code. The default here is currently
    implemented by walking loaded modules and hashing their bytecode into a
    common hash.

    Args:
        memoize: If true, the default, this will cache to a global variable to
            keep from having to run again on successive calls.

    Returns:
        Unique identifier representing the set of running code.
    """

Advanced Configuration Example

import asyncio
import concurrent.futures
from datetime import timedelta

from temporalio import client, worker
# Importing the sandbox submodule makes worker.workflow_sandbox resolvable
import temporalio.worker.workflow_sandbox
from temporalio.worker import (
    PollerBehaviorAutoscaling,
    SharedStateManager,
    UnsandboxedWorkflowRunner,
    WorkerTuner,
)

async def create_advanced_worker():
    client_instance = await client.Client.connect("localhost:7233")

    # Custom executors
    activity_executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=50,
        thread_name_prefix="activity-"
    )

    workflow_executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=20,
        thread_name_prefix="workflow-"
    )

    # Resource-based tuning
    tuner = WorkerTuner.create_resource_based(
        target_memory_usage=0.75,
        target_cpu_usage=0.85,
    )

    # Custom error handler
    async def handle_fatal_error(error: Exception):
        print(f"Worker fatal error: {error}")
        # Could send alerts, write to log files, etc.

    # Advanced worker configuration
    worker_instance = worker.Worker(
        client_instance,
        task_queue="advanced-task-queue",
        workflows=[MyWorkflow, MyOtherWorkflow],
        activities=[my_activity, my_other_activity],
        activity_executor=activity_executor,
        workflow_task_executor=workflow_executor,
        workflow_runner=worker.workflow_sandbox.SandboxedWorkflowRunner(),
        unsandboxed_workflow_runner=UnsandboxedWorkflowRunner(),
        tuner=tuner,
        max_cached_workflows=2000,
        workflow_task_poller_behavior=PollerBehaviorAutoscaling(
            minimum=3,
            maximum=30,
            initial=10
        ),
        activity_task_poller_behavior=PollerBehaviorAutoscaling(
            minimum=5,
            maximum=50,
            initial=15
        ),
        sticky_queue_schedule_to_start_timeout=timedelta(seconds=30),
        max_heartbeat_throttle_interval=timedelta(seconds=120),
        default_heartbeat_throttle_interval=timedelta(seconds=45),
        max_activities_per_second=100.0,
        graceful_shutdown_timeout=timedelta(seconds=30),
        # shared_state_manager is only needed when activity_executor is not a
        # ThreadPoolExecutor; see SharedStateManager.create_from_multiprocessing
        debug_mode=False,
        disable_eager_activity_execution=False,
        on_fatal_error=handle_fatal_error,
        use_worker_versioning=True,
        build_id="my-app-v1.2.3-abc123",
        interceptors=[MyLoggingInterceptor(), MyMetricsInterceptor()],
    )

    return worker_instance

# Usage
async def main():
    worker_instance = await create_advanced_worker()

    try:
        print("Starting advanced worker...")
        await worker_instance.run()
    except KeyboardInterrupt:
        print("Shutting down worker...")
        await worker_instance.shutdown()
    finally:
        print("Worker shutdown complete")

if __name__ == "__main__":
    asyncio.run(main())

The Worker Management module provides comprehensive control over workflow and activity execution in Temporal applications. From basic worker setup to advanced tuning, interceptors, and replay testing, this module gives you the tools to build robust, scalable distributed systems with fine-grained control over resource management and execution behavior.
