tessl/pypi-temporalio

Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.

Testing

The temporalio.testing module provides testing environments and utilities for Temporal workflows and activities. It includes dedicated environments for both activity and workflow testing, with support for time manipulation, mocking, and integration testing patterns.

Core Imports

from temporalio.testing import ActivityEnvironment, WorkflowEnvironment
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

For advanced testing patterns:

from temporalio.testing import ActivityEnvironment, WorkflowEnvironment
from temporalio.common import RetryPolicy, SearchAttributeKey
from temporalio.exceptions import ActivityError, ApplicationError, CancelledError
from datetime import datetime, timedelta

Testing Environments

ActivityEnvironment

The ActivityEnvironment class provides an isolated environment for testing activity functions with complete control over the activity context and lifecycle.

class ActivityEnvironment:
    """Activity environment for testing activities.

    This environment is used for running activity code that can access the
    functions in the :py:mod:`temporalio.activity` module. Use :py:meth:`run` to
    run an activity function or any function within an activity context.

    Attributes:
        info: The info that is returned from :py:func:`temporalio.activity.info`
            function.
        on_heartbeat: Function called on each heartbeat invocation by the
            activity.
        payload_converter: Payload converter set on the activity context. This
            must be set before :py:meth:`run`. Changes after the activity has
            started do not take effect.
        metric_meter: Metric meter set on the activity context. This must be set
            before :py:meth:`run`. Changes after the activity has started do not
            take effect. Default is noop.
    """

    def __init__(self, client: Optional[Client] = None) -> None:
        """Create an ActivityEnvironment for running activity code.

        Args:
            client: Optional client to make available in activity context.
                Only available for async activities.
        """

    def cancel(
        self,
        cancellation_details: activity.ActivityCancellationDetails = activity.ActivityCancellationDetails(
            cancel_requested=True
        ),
    ) -> None:
        """Cancel the activity.

        Args:
            cancellation_details: Details about the cancellation. These will
                be accessible through temporalio.activity.cancellation_details()
                in the activity after cancellation.

        This only has an effect on the first call.
        """

    def worker_shutdown(self) -> None:
        """Notify the activity that the worker is shutting down.

        This only has an effect on the first call.
        """

    def run(
        self,
        fn: Callable[_Params, _Return],
        *args: _Params.args,
        **kwargs: _Params.kwargs,
    ) -> _Return:
        """Run the given callable in an activity context.

        Args:
            fn: The function/callable to run.
            args: All positional arguments to the callable.
            kwargs: All keyword arguments to the callable.

        Returns:
            The callable's result.
        """

WorkflowEnvironment

The WorkflowEnvironment class provides environments for testing workflows with different capabilities including time skipping and full server integration.

class WorkflowEnvironment:
    """Workflow environment for testing workflows.

    Most developers will want to use the static :py:meth:`start_time_skipping`
    to start a test server process that automatically skips time as needed.
    Alternatively, :py:meth:`start_local` may be used for a full, local Temporal
    server with more features. To use an existing server, use
    :py:meth:`from_client`.

    This environment is an async context manager, so it can be used with
    ``async with`` to make sure it shuts down properly. Otherwise,
    :py:meth:`shutdown` can be manually called.

    To use the environment, simply use the :py:attr:`client` on it.

    Workflows invoked on the workflow environment are automatically configured
    to have ``assert`` failures fail the workflow with the assertion error.
    """

    @staticmethod
    def from_client(client: Client) -> WorkflowEnvironment:
        """Create a workflow environment from the given client.

        :py:attr:`supports_time_skipping` will always return ``False`` for this
        environment. :py:meth:`sleep` will sleep the actual amount of time and
        :py:meth:`get_current_time` will return the current time.

        Args:
            client: The client to use for the environment.

        Returns:
            The workflow environment that runs against the given client.
        """

    @staticmethod
    async def start_local(
        *,
        namespace: str = "default",
        data_converter: converter.DataConverter = converter.DataConverter.default,
        interceptors: Sequence[client.Interceptor] = [],
        plugins: Sequence[client.Plugin] = [],
        default_workflow_query_reject_condition: Optional[
            common.QueryRejectCondition
        ] = None,
        retry_config: Optional[client.RetryConfig] = None,
        rpc_metadata: Mapping[str, str] = {},
        identity: Optional[str] = None,
        tls: bool | client.TLSConfig = False,
        ip: str = "127.0.0.1",
        port: Optional[int] = None,
        download_dest_dir: Optional[str] = None,
        ui: bool = False,
        runtime: Optional[runtime.Runtime] = None,
        search_attributes: Sequence[common.SearchAttributeKey] = (),
        dev_server_existing_path: Optional[str] = None,
        dev_server_database_filename: Optional[str] = None,
        dev_server_log_format: str = "pretty",
        dev_server_log_level: Optional[str] = "warn",
        dev_server_download_version: str = "default",
        dev_server_extra_args: Sequence[str] = [],
        dev_server_download_ttl: Optional[timedelta] = None,
    ) -> WorkflowEnvironment:
        """Start a full Temporal server locally, downloading if necessary.

        This environment is good for testing full server capabilities, but does
        not support time skipping like :py:meth:`start_time_skipping` does.
        :py:attr:`supports_time_skipping` will always return ``False`` for this
        environment. :py:meth:`sleep` will sleep the actual amount of time and
        :py:meth:`get_current_time` will return the current time.

        Args:
            namespace: Namespace name to use for this environment.
            data_converter: Data converter for serialization.
            interceptors: Client interceptors to apply.
            plugins: Client plugins to apply.
            default_workflow_query_reject_condition: Default query reject condition.
            retry_config: Retry configuration for client calls.
            rpc_metadata: Additional RPC metadata.
            identity: Client identity.
            tls: TLS configuration.
            ip: IP address to bind to, or 127.0.0.1 by default.
            port: Port number to bind to, or an OS-provided port by default.
            download_dest_dir: Directory to download binary to if needed.
            ui: If ``True``, will start a UI in the dev server.
            runtime: Specific runtime to use or default if unset.
            search_attributes: Search attributes to register with the dev server.
            dev_server_existing_path: Existing path to the CLI binary.
            dev_server_database_filename: Path to the Sqlite database to use.
            dev_server_log_format: Log format for the dev server.
            dev_server_log_level: Log level for the dev server.
            dev_server_download_version: Specific CLI version to download.
            dev_server_extra_args: Extra arguments for the CLI binary.
            dev_server_download_ttl: TTL for the downloaded CLI binary.

        Returns:
            The started CLI dev server workflow environment.
        """

    @staticmethod
    async def start_time_skipping(
        *,
        data_converter: converter.DataConverter = converter.DataConverter.default,
        interceptors: Sequence[client.Interceptor] = [],
        plugins: Sequence[client.Plugin] = [],
        default_workflow_query_reject_condition: Optional[
            common.QueryRejectCondition
        ] = None,
        retry_config: Optional[client.RetryConfig] = None,
        rpc_metadata: Mapping[str, str] = {},
        identity: Optional[str] = None,
        port: Optional[int] = None,
        download_dest_dir: Optional[str] = None,
        runtime: Optional[runtime.Runtime] = None,
        test_server_existing_path: Optional[str] = None,
        test_server_download_version: str = "default",
        test_server_extra_args: Sequence[str] = [],
        test_server_download_ttl: Optional[timedelta] = None,
    ) -> WorkflowEnvironment:
        """Start a time skipping workflow environment.

        By default, this environment will automatically skip to the next events
        in time when a workflow's
        :py:meth:`temporalio.client.WorkflowHandle.result` is awaited on (which
        includes :py:meth:`temporalio.client.Client.execute_workflow`). Before
        the result is awaited on, time can be manually skipped forward using
        :py:meth:`sleep`. The currently known time can be obtained via
        :py:meth:`get_current_time`.

        Args:
            data_converter: Data converter for serialization.
            interceptors: Client interceptors to apply.
            plugins: Client plugins to apply.
            default_workflow_query_reject_condition: Default query reject condition.
            retry_config: Retry configuration for client calls.
            rpc_metadata: Additional RPC metadata.
            identity: Client identity.
            port: Port number to bind to, or an OS-provided port by default.
            download_dest_dir: Directory to download binary to if needed.
            runtime: Specific runtime to use or default if unset.
            test_server_existing_path: Existing path to the test server binary.
            test_server_download_version: Specific test server version to download.
            test_server_extra_args: Extra arguments for the test server binary.
            test_server_download_ttl: TTL for the downloaded test server binary.

        Returns:
            The started workflow environment with time skipping.
        """

    @property
    def client(self) -> Client:
        """Client to this environment."""

    async def shutdown(self) -> None:
        """Shut down this environment."""

    async def sleep(self, duration: Union[timedelta, float]) -> None:
        """Sleep in this environment.

        This awaits a regular :py:func:`asyncio.sleep` in regular environments,
        or manually skips time in time-skipping environments.

        Args:
            duration: Amount of time to sleep.
        """

    async def get_current_time(self) -> datetime:
        """Get the current time known to this environment.

        For non-time-skipping environments this is simply the system time. For
        time-skipping environments this is whatever time has been skipped to.
        """

    @property
    def supports_time_skipping(self) -> bool:
        """Whether this environment supports time skipping."""

    @contextmanager
    def auto_time_skipping_disabled(self) -> Iterator[None]:
        """Disable any automatic time skipping if this is a time-skipping
        environment.

        This is a context manager for use via ``with``. Usually in time-skipping
        environments, waiting on a workflow result causes time to automatically
        skip until the next event. This can disable that. However, this only
        applies to results awaited inside this context. This will not disable
        automatic time skipping on previous results.

        This has no effect on non-time-skipping environments.
        """

Activity Testing

Basic Activity Testing

Test activities in isolation using ActivityEnvironment:

import asyncio
from temporalio import activity
from temporalio.testing import ActivityEnvironment

@activity.defn
async def process_data(data: str) -> str:
    activity.heartbeat(f"Processing: {data}")
    await asyncio.sleep(1)  # Simulate work
    return f"Processed: {data}"

async def test_process_data():
    # Create test environment
    env = ActivityEnvironment()

    # Track heartbeats
    heartbeats = []
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])

    # Run activity
    result = await env.run(process_data, "test-data")

    assert result == "Processed: test-data"
    assert heartbeats == ["Processing: test-data"]

Testing Activity Cancellation

Test how activities handle cancellation:

import asyncio
import pytest
from temporalio import activity
from temporalio.testing import ActivityEnvironment

@activity.defn
async def cancellable_activity() -> str:
    try:
        # Wait indefinitely
        await asyncio.Future()
        return "completed"
    except asyncio.CancelledError:
        # Check cancellation details
        cancellation_details = activity.cancellation_details()
        if cancellation_details and cancellation_details.cancel_requested:
            activity.heartbeat("cancelled gracefully")
            raise
        return "unexpected cancellation"

async def test_activity_cancellation():
    env = ActivityEnvironment()
    heartbeats = []
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])

    # Start activity
    task = asyncio.create_task(env.run(cancellable_activity))

    # Give it time to start
    await asyncio.sleep(0.1)

    # Cancel with details
    env.cancel(
        activity.ActivityCancellationDetails(cancel_requested=True)
    )

    # The activity re-raises, so awaiting the task raises CancelledError
    with pytest.raises(asyncio.CancelledError):
        await task
    assert heartbeats == ["cancelled gracefully"]

Testing Activity Context

Test activities that use activity context functions:

from temporalio import activity
from temporalio.testing import ActivityEnvironment
from temporalio.common import MetricMeter
from temporalio.converter import DataConverter

@activity.defn
async def context_aware_activity() -> dict:
    info = activity.info()
    converter = activity.payload_converter()
    meter = activity.metric_meter()

    return {
        "activity_id": info.activity_id,
        "workflow_id": info.workflow_id,
        "has_converter": converter is not None,
        "has_meter": meter is not None
    }

async def test_activity_context():
    env = ActivityEnvironment()

    # Configure custom context
    env.info = activity.Info(
        activity_id="test-activity",
        activity_type="context_aware_activity",
        workflow_id="test-workflow",
        # ... other required fields
    )
    env.payload_converter = DataConverter.default.payload_converter
    env.metric_meter = MetricMeter.noop

    result = await env.run(context_aware_activity)

    assert result["activity_id"] == "test-activity"
    assert result["workflow_id"] == "test-workflow"
    assert result["has_converter"] is True
    assert result["has_meter"] is True

Testing Synchronous Activities

Test synchronous activities with thread-based cancellation:

import threading
import time
from temporalio import activity
from temporalio.testing import ActivityEnvironment
from temporalio.exceptions import CancelledError

@activity.defn
def sync_activity(data: str) -> str:
    activity.heartbeat(f"Starting: {data}")

    # Simulate work with cancellation checks
    for i in range(10):
        if activity.is_cancelled():
            activity.heartbeat("Cancelled during processing")
            raise CancelledError()
        time.sleep(0.5)
        activity.heartbeat(f"Step {i}")

    return f"Done: {data}"

def test_sync_activity_cancellation():
    env = ActivityEnvironment()
    heartbeats = []
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])

    # Run the sync activity on a thread so the test can cancel it
    result_holder = {}

    def run_activity():
        try:
            result_holder["result"] = env.run(sync_activity, "test")
        except CancelledError:
            result_holder["cancelled"] = True

    thread = threading.Thread(target=run_activity)
    thread.start()

    # Wait a bit then cancel
    time.sleep(1)
    env.cancel()
    thread.join()

    assert "cancelled" in result_holder
    assert "Starting: test" in heartbeats

Testing Activity with Client Access

Test activities that need access to the Temporal client:

import pytest
from temporalio import activity
from temporalio.testing import ActivityEnvironment
from temporalio.client import Client
from unittest.mock import Mock

@activity.defn
async def client_activity() -> str:
    client = activity.client()
    # Use client for additional operations
    return f"Client namespace: {client.namespace}"

async def test_activity_with_client():
    # Create mock client
    mock_client = Mock(spec=Client)
    mock_client.namespace = "test-namespace"

    env = ActivityEnvironment(client=mock_client)
    result = await env.run(client_activity)

    assert result == "Client namespace: test-namespace"

async def test_activity_without_client():
    env = ActivityEnvironment()  # No client provided

    # activity.client() raises when no client was provided to the environment
    with pytest.raises(RuntimeError):
        await env.run(client_activity)

Workflow Testing

Time-Skipping Environment

Test workflows with automatic time advancement:

from temporalio import workflow
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
from datetime import timedelta
import asyncio

@workflow.defn
class TimerWorkflow:
    @workflow.run
    async def run(self, duration_seconds: int) -> str:
        await asyncio.sleep(duration_seconds)
        return f"Slept for {duration_seconds} seconds"

async def test_timer_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[TimerWorkflow]
        ) as worker:
            # Execute workflow - time will automatically skip
            result = await env.client.execute_workflow(
                TimerWorkflow.run,
                100,  # 100 seconds
                id="timer-test",
                task_queue="test-queue"
            )

            assert result == "Slept for 100 seconds"

            # Verify time has advanced
            current_time = await env.get_current_time()
            # Time should be approximately 100 seconds later

Manual Time Control

Test workflows with manual time manipulation:

@workflow.defn
class ScheduledWorkflow:
    @workflow.run
    async def run(self) -> list[str]:
        results = []

        for i in range(3):
            await asyncio.sleep(30)  # 30 seconds
            results.append(f"Step {i} at {workflow.now()}")

        return results

async def test_scheduled_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[ScheduledWorkflow]
        ) as worker:
            # Start workflow
            handle = await env.client.start_workflow(
                ScheduledWorkflow.run,
                id="scheduled-test",
                task_queue="test-queue"
            )

            # Manually advance time and check progress
            await env.sleep(30)  # Skip 30 seconds

            # Workflow should still be running. Disable auto time skipping so
            # awaiting the result does not itself skip time to completion.
            with env.auto_time_skipping_disabled():
                try:
                    await asyncio.wait_for(handle.result(), timeout=0.1)
                    assert False, "Workflow completed too early"
                except asyncio.TimeoutError:
                    pass  # Expected

            # Skip more time to complete
            await env.sleep(70)  # Skip total 100 seconds (covers remaining 60s)

            result = await handle.result()
            assert len(result) == 3

Testing Signals and Queries

Test workflow signal and query handling:

@workflow.defn
class SignalWorkflow:
    def __init__(self):
        self.messages = []
        self.completed = False

    @workflow.run
    async def run(self) -> list[str]:
        # Wait for completion signal
        await workflow.wait_condition(lambda: self.completed)
        return self.messages

    @workflow.signal
    def add_message(self, message: str) -> None:
        self.messages.append(message)

    @workflow.signal
    def complete(self) -> None:
        self.completed = True

    @workflow.query
    def get_message_count(self) -> int:
        return len(self.messages)

async def test_signal_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[SignalWorkflow]
        ) as worker:
            # Start workflow
            handle = await env.client.start_workflow(
                SignalWorkflow.run,
                id="signal-test",
                task_queue="test-queue"
            )

            # Send signals
            await handle.signal(SignalWorkflow.add_message, "Hello")
            await handle.signal(SignalWorkflow.add_message, "World")

            # Query current state
            count = await handle.query(SignalWorkflow.get_message_count)
            assert count == 2

            # Complete workflow
            await handle.signal(SignalWorkflow.complete)

            result = await handle.result()
            assert result == ["Hello", "World"]

Testing Child Workflows

Test workflows that spawn child workflows:

@workflow.defn
class ChildWorkflow:
    @workflow.run
    async def run(self, value: int) -> int:
        await asyncio.sleep(1)
        return value * 2

@workflow.defn
class ParentWorkflow:
    @workflow.run
    async def run(self, values: list[int]) -> list[int]:
        # Start child workflows
        child_handles = []
        for value in values:
            handle = await workflow.start_child_workflow(
                ChildWorkflow.run,
                value,
                id=f"child-{value}"
            )
            child_handles.append(handle)

        # Collect results
        results = []
        for handle in child_handles:
            result = await handle
            results.append(result)

        return results

async def test_child_workflows():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[ParentWorkflow, ChildWorkflow]
        ) as worker:
            result = await env.client.execute_workflow(
                ParentWorkflow.run,
                [1, 2, 3, 4],
                id="parent-test",
                task_queue="test-queue"
            )

            assert result == [2, 4, 6, 8]

Testing Workflow Updates

Test workflow update handlers:

@workflow.defn
class UpdateableWorkflow:
    def __init__(self):
        self.counter = 0
        self.running = True

    @workflow.run
    async def run(self) -> int:
        await workflow.wait_condition(lambda: not self.running)
        return self.counter

    @workflow.update
    def increment(self, amount: int) -> int:
        self.counter += amount
        return self.counter

    @workflow.update
    def stop(self) -> None:
        self.running = False

async def test_workflow_updates():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[UpdateableWorkflow]
        ) as worker:
            handle = await env.client.start_workflow(
                UpdateableWorkflow.run,
                id="update-test",
                task_queue="test-queue"
            )

            # Send updates
            result1 = await handle.execute_update(UpdateableWorkflow.increment, 5)
            assert result1 == 5

            result2 = await handle.execute_update(UpdateableWorkflow.increment, 3)
            assert result2 == 8

            # Stop workflow
            await handle.execute_update(UpdateableWorkflow.stop)

            final_result = await handle.result()
            assert final_result == 8

Time Management in Tests

Time Skipping Patterns

Control time advancement for deterministic testing:

@workflow.defn
class TimeBasedWorkflow:
    @workflow.run
    async def run(self) -> list[str]:
        events = []

        # Record initial time
        start_time = workflow.now()
        events.append(f"Started at {start_time}")

        # Wait various durations
        await asyncio.sleep(60)  # 1 minute
        events.append(f"After 1 minute: {workflow.now()}")

        await asyncio.sleep(3600)  # 1 hour
        events.append(f"After 1 hour: {workflow.now()}")

        return events

async def test_time_advancement():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[TimeBasedWorkflow]
        ) as worker:
            # Record initial time
            initial_time = await env.get_current_time()

            # Execute workflow (time auto-advances)
            result = await env.client.execute_workflow(
                TimeBasedWorkflow.run,
                id="time-test",
                task_queue="test-queue"
            )

            # Verify time advancement
            final_time = await env.get_current_time()
            elapsed = final_time - initial_time

            # Should be approximately 1 hour and 1 minute
            assert abs(elapsed.total_seconds() - 3660) < 60

Disabling Auto Time Skipping

Test workflows with real-time progression when needed:

from time import monotonic

async def test_real_time_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[TimeBasedWorkflow]
        ) as worker:
            with env.auto_time_skipping_disabled():
                start_real_time = monotonic()

                # This will take real time
                await env.client.execute_workflow(
                    TimeBasedWorkflow.run,
                    id="real-time-test",
                    task_queue="test-queue"
                )

                elapsed_real_time = monotonic() - start_real_time
                # Should have taken actual time (limited by workflow logic)

Fine-Grained Time Control

Precisely control time advancement for complex scenarios:

@workflow.defn
class ComplexTimingWorkflow:
    @workflow.run
    async def run(self) -> dict:
        results = {}

        # Phase 1: Quick operations
        for i in range(5):
            await asyncio.sleep(10)
            results[f"quick_{i}"] = workflow.now()

        # Phase 2: Long operation
        await asyncio.sleep(300)  # 5 minutes
        results["long_operation"] = workflow.now()

        return results

async def test_complex_timing():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[ComplexTimingWorkflow]
        ) as worker:
            # Start workflow without auto-completion
            handle = await env.client.start_workflow(
                ComplexTimingWorkflow.run,
                id="complex-timing",
                task_queue="test-queue"
            )

            # Manually advance through quick phase
            for i in range(5):
                await env.sleep(10)
                # Could check intermediate state here

            # Advance through long operation
            await env.sleep(300)

            result = await handle.result()

            # Verify timing progression
            assert len(result) == 6
            assert "long_operation" in result

Nexus Testing

Testing Nexus Operations

Test Nexus service operations and handlers:

import asyncio
from temporalio import nexus, workflow
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

# Define Nexus service
@nexus.service.defn
class CalculatorService:
    @nexus.operation.defn
    async def add(self, a: int, b: int) -> int:
        return a + b

    @nexus.operation.defn
    async def multiply(self, a: int, b: int) -> int:
        await asyncio.sleep(1)  # Simulate work
        return a * b

# Workflow that uses Nexus
@workflow.defn
class NexusWorkflow:
    @workflow.run
    async def run(self, x: int, y: int) -> dict:
        # Create Nexus client
        nexus_client = workflow.create_nexus_client(
            "calculator-service",
            CalculatorService
        )

        # Call operations
        sum_result = await nexus_client.add(x, y)
        product_result = await nexus_client.multiply(x, y)

        return {
            "sum": sum_result,
            "product": product_result
        }

async def test_nexus_operations():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Create a Nexus endpoint mapping the service to the Nexus task queue.
        # create_test_nexus_endpoint is a project-specific helper, not an SDK API.
        endpoint = await create_test_nexus_endpoint(
            env.client,
            "calculator-service",
            "test-nexus-queue"
        )

        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[NexusWorkflow]
        ) as workflow_worker:
            async with Worker(
                env.client,
                task_queue="test-nexus-queue",
                nexus_services=[CalculatorService()]
            ) as nexus_worker:
                result = await env.client.execute_workflow(
                    NexusWorkflow.run,
                    5, 3,
                    id="nexus-test",
                    task_queue="test-queue"
                )

                assert result["sum"] == 8
                assert result["product"] == 15

Mock Nexus Services

Test workflows with mocked Nexus dependencies:

from unittest.mock import AsyncMock

class MockCalculatorService:
    def __init__(self):
        self.add = AsyncMock(return_value=100)
        self.multiply = AsyncMock(return_value=200)

async def test_nexus_with_mocks():
    # This would require custom interceptors or test utilities
    # to inject mock services into the Nexus client resolution
    pass  # Implementation depends on specific mocking strategy

Testing Nexus Error Handling

Test how workflows handle Nexus operation failures:

from temporalio.exceptions import ApplicationError, NexusOperationError

@nexus.service.defn
class FailingService:
    @nexus.operation.defn
    async def unreliable_op(self, fail: bool) -> str:
        if fail:
            raise ApplicationError("Operation failed", type="ServiceError")
        return "success"

@workflow.defn
class ErrorHandlingWorkflow:
    @workflow.run
    async def run(self, should_fail: bool) -> str:
        nexus_client = workflow.create_nexus_client(
            "failing-service",
            FailingService
        )

        try:
            result = await nexus_client.unreliable_op(should_fail)
            return f"Success: {result}"
        except NexusOperationError as e:
            return f"Error: {e}"

async def test_nexus_error_handling():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Set up Nexus service and workflow workers
        # Test both success and failure cases
        pass

Test Utilities and Patterns

Common Test Setup

Reusable patterns for test environment setup:

from contextlib import asynccontextmanager
from typing import AsyncGenerator

@asynccontextmanager
async def test_environment() -> AsyncGenerator[WorkflowEnvironment, None]:
    """Standard test environment setup."""
    async with await WorkflowEnvironment.start_time_skipping() as env:
        yield env

@asynccontextmanager
async def workflow_worker(
    env: WorkflowEnvironment,
    workflows: list,
    activities: list | None = None,
    task_queue: str = "test-queue"
) -> AsyncGenerator[Worker, None]:
    """Standard worker setup for tests."""
    async with Worker(
        env.client,
        task_queue=task_queue,
        workflows=workflows,
        activities=activities or []
    ) as worker:
        yield worker

# Usage in tests
async def test_with_standard_setup():
    async with test_environment() as env:
        async with workflow_worker(env, [MyWorkflow]) as worker:
            result = await env.client.execute_workflow(
                MyWorkflow.run,
                id="test",
                task_queue="test-queue"
            )
            assert result is not None

Assertion Patterns

Common patterns for asserting workflow and activity behavior:

import pytest
from temporalio.client import WorkflowFailureError

async def assert_workflow_completed_successfully(handle):
    """Assert workflow completed without failure."""
    try:
        return await handle.result()
    except WorkflowFailureError:
        pytest.fail("Workflow failed unexpectedly")

async def assert_workflow_failed_with(handle, error_type):
    """Assert workflow failed with a specific error type."""
    with pytest.raises(WorkflowFailureError) as exc_info:
        await handle.result()

    assert isinstance(exc_info.value.cause, error_type)
    return exc_info.value.cause

def assert_activity_heartbeats(env, expected_heartbeats):
    """Assert activity sent expected heartbeats (env is an ActivityEnvironment)."""
    heartbeats = []
    # on_heartbeat must be set before env.run() is called
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])
    # Run the activity here, e.g.: await env.run(my_activity, arg)
    assert heartbeats == expected_heartbeats
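Signal- and timer-driven workflows often need an assertion that waits for a condition rather than checking once. A generic polling helper can wrap a workflow query or other state check; this is a stdlib-only sketch, not part of temporalio, and the names are illustrative:

```python
import asyncio
import time

async def assert_eventually(condition, timeout: float = 5.0, interval: float = 0.05):
    """Poll an async condition until it returns truthy or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        result = await condition()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise AssertionError(f"Condition not met within {timeout}s")
        await asyncio.sleep(interval)

# Example: wait for background work to reach a target state
async def demo():
    state = {"count": 0}

    async def bump():
        for _ in range(3):
            await asyncio.sleep(0.01)
            state["count"] += 1

    async def ready():
        return state["count"] >= 3

    task = asyncio.create_task(bump())
    await assert_eventually(ready)
    await task
    return state["count"]
```

In a real test the condition would typically wrap `handle.query(...)` on a running workflow handle.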

Test Data Management

Utilities for managing test data and state:

import uuid
from dataclasses import dataclass

@dataclass
class TestWorkflowIds:
    """Generate unique workflow IDs for tests."""
    prefix: str = "test"

    def generate(self, suffix: str | None = None) -> str:
        base = f"{self.prefix}-{uuid.uuid4()}"
        return f"{base}-{suffix}" if suffix else base

class TestDataBuilder:
    """Builder pattern for test data."""

    def __init__(self):
        self.reset()

    def reset(self):
        self.data = {}
        return self

    def with_field(self, key: str, value):
        self.data[key] = value
        return self

    def build(self):
        return self.data.copy()

# Usage
ids = TestWorkflowIds("integration")
workflow_id = ids.generate("signal-test")

test_data = (TestDataBuilder()
    .with_field("name", "test-user")
    .with_field("value", 42)
    .build())
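Fully random IDs make it hard to correlate a failing run with logged server history. When reproducibility matters, a deterministic variant (an optional extension of the idea above, not a temporalio feature; the namespace value is illustrative) can derive stable IDs from the test name with uuid5:

```python
import uuid
from dataclasses import dataclass

# Fixed namespace so the same inputs always produce the same ID (illustrative value)
_TEST_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "tests.example.invalid")

@dataclass
class DeterministicWorkflowIds:
    """Stable workflow IDs derived from the test name, reproducible across runs."""
    prefix: str = "test"

    def generate(self, test_name: str) -> str:
        return f"{self.prefix}-{uuid.uuid5(_TEST_NAMESPACE, test_name)}"

ids = DeterministicWorkflowIds("integration")
assert ids.generate("signal-test") == ids.generate("signal-test")  # reproducible
```

Note that reused IDs can collide with prior runs on a shared server, so this pattern fits isolated test environments best.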

Integration Testing

End-to-End Testing

Test complete workflows with all components:

@workflow.defn
class OrderProcessingWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> dict:
        # Validate order
        await workflow.execute_activity(
            validate_order,
            order_id,
            schedule_to_close_timeout=timedelta(minutes=5)
        )

        # Process payment
        payment_result = await workflow.execute_activity(
            process_payment,
            order_id,
            schedule_to_close_timeout=timedelta(minutes=10)
        )

        # Ship order
        shipping_result = await workflow.execute_activity(
            ship_order,
            order_id,
            schedule_to_close_timeout=timedelta(days=1)
        )

        return {
            "order_id": order_id,
            "payment": payment_result,
            "shipping": shipping_result,
            "status": "completed"
        }

import asyncio

@activity.defn
async def validate_order(order_id: str) -> bool:
    # Mock validation logic
    activity.heartbeat(f"Validating order {order_id}")
    await asyncio.sleep(1)
    return True

@activity.defn
async def process_payment(order_id: str) -> str:
    activity.heartbeat(f"Processing payment for {order_id}")
    await asyncio.sleep(2)
    return f"payment-{order_id}"

@activity.defn
async def ship_order(order_id: str) -> str:
    activity.heartbeat(f"Shipping order {order_id}")
    await asyncio.sleep(5)  # Will be skipped in time-skipping tests
    return f"tracking-{order_id}"

async def test_order_processing_e2e():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="orders",
            workflows=[OrderProcessingWorkflow],
            activities=[validate_order, process_payment, ship_order]
        ) as worker:
            result = await env.client.execute_workflow(
                OrderProcessingWorkflow.run,
                "order-123",
                id="order-processing-test",
                task_queue="orders"
            )

            assert result["order_id"] == "order-123"
            assert result["status"] == "completed"
            assert "payment-order-123" in result["payment"]
            assert "tracking-order-123" in result["shipping"]

Testing with External Dependencies

Test workflows that interact with external services:

from unittest.mock import AsyncMock, patch

@activity.defn
async def call_external_api(url: str, data: dict) -> dict:
    # In real implementation, this would make HTTP calls
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=data)
        return response.json()

async def test_workflow_with_external_deps():
    """Test workflow with mocked external dependencies."""

    # Mock the external HTTP call
    mock_response = {"status": "success", "id": "ext-123"}

    with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
        mock_post.return_value.json.return_value = mock_response

        async with await WorkflowEnvironment.start_time_skipping() as env:
            async with Worker(
                env.client,
                task_queue="test-queue",
                workflows=[MyWorkflow],
                activities=[call_external_api]
            ) as worker:
                result = await env.client.execute_workflow(
                    MyWorkflow.run,
                    id="external-test",
                    task_queue="test-queue"
                )

                # Verify mock was called
                mock_post.assert_called_once()
                assert result is not None

Multi-Worker Testing

Test scenarios with multiple workers and task queues:

async def test_multi_worker_scenario():
    """Test workflow spanning multiple workers and task queues."""

    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Worker for main workflows
        async with Worker(
            env.client,
            task_queue="main-queue",
            workflows=[CoordinatorWorkflow]
        ) as main_worker:
            # Worker for processing activities
            async with Worker(
                env.client,
                task_queue="processing-queue",
                activities=[process_item, validate_item]
            ) as processing_worker:
                # Worker for notification activities
                async with Worker(
                    env.client,
                    task_queue="notification-queue",
                    activities=[send_notification]
                ) as notification_worker:

                    result = await env.client.execute_workflow(
                        CoordinatorWorkflow.run,
                        ["item1", "item2", "item3"],
                        id="multi-worker-test",
                        task_queue="main-queue"
                    )

                    assert result["processed_count"] == 3
                    assert result["notifications_sent"] == 3

Testing Configuration

Custom Test Environments

Configure test environments for specific scenarios:

async def create_test_env_with_custom_config():
    """Create test environment with custom configuration."""

    # Custom data converter for testing
    custom_converter = DataConverter(
        payload_converter_class=JSONPlainPayloadConverter,
        failure_converter_class=DefaultFailureConverter
    )

    # Custom interceptors for testing (user-defined interceptor classes)
    test_interceptors = [
        LoggingInterceptor(),
        MetricsInterceptor()
    ]

    return await WorkflowEnvironment.start_time_skipping(
        data_converter=custom_converter,
        interceptors=test_interceptors,
        runtime=Runtime(telemetry=TelemetryConfig(
            logging=LoggingConfig(filter="temporal_sdk_core=DEBUG")
        ))
    )

Test Isolation Patterns

Ensure test isolation and cleanup:

class TestIsolation:
    """Helper for test isolation."""

    def __init__(self):
        self.created_workflows = []
        self.test_data = {}

    def generate_workflow_id(self, prefix: str) -> str:
        workflow_id = f"{prefix}-{uuid.uuid4()}"
        self.created_workflows.append(workflow_id)
        return workflow_id

    async def cleanup(self, client: Client):
        """Clean up test resources."""
        for workflow_id in self.created_workflows:
            try:
                handle = client.get_workflow_handle(workflow_id)
                await handle.terminate("test cleanup")
            except Exception:
                pass  # Ignore cleanup errors

# Usage in tests
@pytest.fixture
async def test_isolation():
    isolation = TestIsolation()
    yield isolation
    # Cleanup is invoked explicitly by each test via isolation.cleanup()

async def test_with_isolation(test_isolation):
    async with test_environment() as env:
        workflow_id = test_isolation.generate_workflow_id("test")

        # Run test...

        await test_isolation.cleanup(env.client)
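Calling cleanup() manually is skipped if the test body raises before reaching it. One way to guarantee cleanup is `contextlib.AsyncExitStack`, which runs registered callbacks in LIFO order even on failure. This stdlib sketch uses a stand-in for the real `handle.terminate(...)` call:

```python
import asyncio
from contextlib import AsyncExitStack

async def run_isolated_test():
    cleaned = []

    async def terminate(workflow_id: str):
        # Stand-in for: await client.get_workflow_handle(workflow_id).terminate("test cleanup")
        cleaned.append(workflow_id)

    async with AsyncExitStack() as stack:
        for workflow_id in ["test-a", "test-b"]:
            stack.push_async_callback(terminate, workflow_id)
        # ... test body goes here; callbacks run even if it raises ...

    return cleaned  # callbacks ran in LIFO order: ["test-b", "test-a"]
```

In a real fixture the stack would register a terminate callback each time a workflow ID is generated.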

Performance Testing Considerations

Basic patterns for performance testing:

import time
from statistics import mean, stdev

async def test_workflow_performance():
    """Basic performance test for workflow execution."""

    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="perf-test",
            workflows=[FastWorkflow],
            max_concurrent_workflow_tasks=100
        ) as worker:

            # Warm up
            await env.client.execute_workflow(
                FastWorkflow.run,
                id="warmup",
                task_queue="perf-test"
            )

            # Performance test
            execution_times = []
            for i in range(10):
                start_time = time.monotonic()

                await env.client.execute_workflow(
                    FastWorkflow.run,
                    id=f"perf-{i}",
                    task_queue="perf-test"
                )

                execution_times.append(time.monotonic() - start_time)

            # Basic performance assertions
            avg_time = mean(execution_times)
            std_dev = stdev(execution_times)

            assert avg_time < 1.0, f"Average execution time too high: {avg_time}"
            assert std_dev < 0.5, f"Execution time variance too high: {std_dev}"
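Mean and standard deviation can hide tail latency; percentiles are usually the more useful assertion target. A small stdlib-only summary helper over timings collected as above might look like this (the helper name is illustrative):

```python
from statistics import mean, quantiles

def latency_summary(samples: list[float]) -> dict:
    """Summarize execution times; p95/p99 need a reasonable sample count to be meaningful."""
    cuts = quantiles(samples, n=100)  # 99 cut points between percentiles
    return {
        "avg": mean(samples),
        "p50": cuts[49],
        "p95": cuts[94],
        "p99": cuts[98],
        "max": max(samples),
    }

# Example: assert against the p95 instead of the mean
summary = latency_summary([0.1] * 95 + [0.9] * 5)
assert summary["p95"] <= 1.0, f"p95 latency too high: {summary['p95']}"
```

Asserting on p95 keeps a single slow outlier from failing the test while still catching systematic slowdowns.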

This documentation covers the major aspects of testing Temporal workflows and activities with the temporalio.testing module, from basic examples through advanced patterns for distributed workflow applications.
