Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.
The temporalio.testing module provides comprehensive testing environments and utilities for testing Temporal workflows and activities. It includes specialized environments for both activity and workflow testing, with support for time manipulation, mocking, and comprehensive integration testing patterns.
from temporalio.testing import ActivityEnvironment, WorkflowEnvironment
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import WorkerFor advanced testing patterns:
from temporalio.testing import ActivityEnvironment, WorkflowEnvironment
from temporalio.common import RetryPolicy, SearchAttributeKey
from temporalio.exceptions import ActivityError, ApplicationError, CancelledError
from datetime import datetime, timedeltaThe ActivityEnvironment class provides an isolated environment for testing activity functions with complete control over the activity context and lifecycle.
class ActivityEnvironment:
"""Activity environment for testing activities.
This environment is used for running activity code that can access the
functions in the :py:mod:`temporalio.activity` module. Use :py:meth:`run` to
run an activity function or any function within an activity context.
Attributes:
info: The info that is returned from :py:func:`temporalio.activity.info`
function.
on_heartbeat: Function called on each heartbeat invocation by the
activity.
payload_converter: Payload converter set on the activity context. This
must be set before :py:meth:`run`. Changes after the activity has
started do not take effect.
metric_meter: Metric meter set on the activity context. This must be set
before :py:meth:`run`. Changes after the activity has started do not
take effect. Default is noop.
"""
def __init__(self, client: Optional[Client] = None) -> None:
"""Create an ActivityEnvironment for running activity code.
Args:
client: Optional client to make available in activity context.
Only available for async activities.
"""
def cancel(
self,
cancellation_details: activity.ActivityCancellationDetails = activity.ActivityCancellationDetails(
cancel_requested=True
),
) -> None:
"""Cancel the activity.
Args:
cancellation_details: Details about the cancellation. These will
be accessible through temporalio.activity.cancellation_details()
in the activity after cancellation.
This only has an effect on the first call.
"""
def worker_shutdown(self) -> None:
"""Notify the activity that the worker is shutting down.
This only has an effect on the first call.
"""
def run(
self,
fn: Callable[_Params, _Return],
*args: _Params.args,
**kwargs: _Params.kwargs,
) -> _Return:
"""Run the given callable in an activity context.
Args:
fn: The function/callable to run.
args: All positional arguments to the callable.
kwargs: All keyword arguments to the callable.
Returns:
The callable's result.
"""The WorkflowEnvironment class provides environments for testing workflows with different capabilities including time skipping and full server integration.
class WorkflowEnvironment:
"""Workflow environment for testing workflows.
Most developers will want to use the static :py:meth:`start_time_skipping`
to start a test server process that automatically skips time as needed.
Alternatively, :py:meth:`start_local` may be used for a full, local Temporal
server with more features. To use an existing server, use
:py:meth:`from_client`.
This environment is an async context manager, so it can be used with
``async with`` to make sure it shuts down properly. Otherwise,
:py:meth:`shutdown` can be manually called.
To use the environment, simply use the :py:attr:`client` on it.
Workflows invoked on the workflow environment are automatically configured
to have ``assert`` failures fail the workflow with the assertion error.
"""
@staticmethod
def from_client(client: Client) -> WorkflowEnvironment:
"""Create a workflow environment from the given client.
:py:attr:`supports_time_skipping` will always return ``False`` for this
environment. :py:meth:`sleep` will sleep the actual amount of time and
:py:meth:`get_current_time` will return the current time.
Args:
client: The client to use for the environment.
Returns:
The workflow environment that runs against the given client.
"""
@staticmethod
async def start_local(
*,
namespace: str = "default",
data_converter: converter.DataConverter = converter.DataConverter.default,
interceptors: Sequence[client.Interceptor] = [],
plugins: Sequence[client.Plugin] = [],
default_workflow_query_reject_condition: Optional[
common.QueryRejectCondition
] = None,
retry_config: Optional[client.RetryConfig] = None,
rpc_metadata: Mapping[str, str] = {},
identity: Optional[str] = None,
tls: bool | client.TLSConfig = False,
ip: str = "127.0.0.1",
port: Optional[int] = None,
download_dest_dir: Optional[str] = None,
ui: bool = False,
runtime: Optional[runtime.Runtime] = None,
search_attributes: Sequence[common.SearchAttributeKey] = (),
dev_server_existing_path: Optional[str] = None,
dev_server_database_filename: Optional[str] = None,
dev_server_log_format: str = "pretty",
dev_server_log_level: Optional[str] = "warn",
dev_server_download_version: str = "default",
dev_server_extra_args: Sequence[str] = [],
dev_server_download_ttl: Optional[timedelta] = None,
) -> WorkflowEnvironment:
"""Start a full Temporal server locally, downloading if necessary.
This environment is good for testing full server capabilities, but does
not support time skipping like :py:meth:`start_time_skipping` does.
:py:attr:`supports_time_skipping` will always return ``False`` for this
environment. :py:meth:`sleep` will sleep the actual amount of time and
:py:meth:`get_current_time` will return the current time.
Args:
namespace: Namespace name to use for this environment.
data_converter: Data converter for serialization.
interceptors: Client interceptors to apply.
plugins: Client plugins to apply.
default_workflow_query_reject_condition: Default query reject condition.
retry_config: Retry configuration for client calls.
rpc_metadata: Additional RPC metadata.
identity: Client identity.
tls: TLS configuration.
ip: IP address to bind to, or 127.0.0.1 by default.
port: Port number to bind to, or an OS-provided port by default.
download_dest_dir: Directory to download binary to if needed.
ui: If ``True``, will start a UI in the dev server.
runtime: Specific runtime to use or default if unset.
search_attributes: Search attributes to register with the dev server.
dev_server_existing_path: Existing path to the CLI binary.
dev_server_database_filename: Path to the Sqlite database to use.
dev_server_log_format: Log format for the dev server.
dev_server_log_level: Log level for the dev server.
dev_server_download_version: Specific CLI version to download.
dev_server_extra_args: Extra arguments for the CLI binary.
dev_server_download_ttl: TTL for the downloaded CLI binary.
Returns:
The started CLI dev server workflow environment.
"""
@staticmethod
async def start_time_skipping(
*,
data_converter: converter.DataConverter = converter.DataConverter.default,
interceptors: Sequence[client.Interceptor] = [],
plugins: Sequence[client.Plugin] = [],
default_workflow_query_reject_condition: Optional[
common.QueryRejectCondition
] = None,
retry_config: Optional[client.RetryConfig] = None,
rpc_metadata: Mapping[str, str] = {},
identity: Optional[str] = None,
port: Optional[int] = None,
download_dest_dir: Optional[str] = None,
runtime: Optional[runtime.Runtime] = None,
test_server_existing_path: Optional[str] = None,
test_server_download_version: str = "default",
test_server_extra_args: Sequence[str] = [],
test_server_download_ttl: Optional[timedelta] = None,
) -> WorkflowEnvironment:
"""Start a time skipping workflow environment.
By default, this environment will automatically skip to the next events
in time when a workflow's
:py:meth:`temporalio.client.WorkflowHandle.result` is awaited on (which
includes :py:meth:`temporalio.client.Client.execute_workflow`). Before
the result is awaited on, time can be manually skipped forward using
:py:meth:`sleep`. The currently known time can be obtained via
:py:meth:`get_current_time`.
Args:
data_converter: Data converter for serialization.
interceptors: Client interceptors to apply.
plugins: Client plugins to apply.
default_workflow_query_reject_condition: Default query reject condition.
retry_config: Retry configuration for client calls.
rpc_metadata: Additional RPC metadata.
identity: Client identity.
port: Port number to bind to, or an OS-provided port by default.
download_dest_dir: Directory to download binary to if needed.
runtime: Specific runtime to use or default if unset.
test_server_existing_path: Existing path to the test server binary.
test_server_download_version: Specific test server version to download.
test_server_extra_args: Extra arguments for the test server binary.
test_server_download_ttl: TTL for the downloaded test server binary.
Returns:
The started workflow environment with time skipping.
"""
@property
def client(self) -> Client:
"""Client to this environment."""
async def shutdown(self) -> None:
"""Shut down this environment."""
async def sleep(self, duration: Union[timedelta, float]) -> None:
"""Sleep in this environment.
This awaits a regular :py:func:`asyncio.sleep` in regular environments,
or manually skips time in time-skipping environments.
Args:
duration: Amount of time to sleep.
"""
async def get_current_time(self) -> datetime:
"""Get the current time known to this environment.
For non-time-skipping environments this is simply the system time. For
time-skipping environments this is whatever time has been skipped to.
"""
@property
def supports_time_skipping(self) -> bool:
"""Whether this environment supports time skipping."""
@contextmanager
def auto_time_skipping_disabled(self) -> Iterator[None]:
"""Disable any automatic time skipping if this is a time-skipping
environment.
This is a context manager for use via ``with``. Usually in time-skipping
environments, waiting on a workflow result causes time to automatically
skip until the next event. This can disable that. However, this only
applies to results awaited inside this context. This will not disable
automatic time skipping on previous results.
This has no effect on non-time-skipping environments.
"""Test activities in isolation using ActivityEnvironment:
import asyncio
from temporalio import activity
from temporalio.testing import ActivityEnvironment
@activity.defn
async def process_data(data: str) -> str:
activity.heartbeat(f"Processing: {data}")
await asyncio.sleep(1) # Simulate work
return f"Processed: {data}"
async def test_process_data():
# Create test environment
env = ActivityEnvironment()
# Track heartbeats
heartbeats = []
env.on_heartbeat = lambda *args: heartbeats.append(args[0])
# Run activity
result = await env.run(process_data, "test-data")
assert result == "Processed: test-data"
assert heartbeats == ["Processing: test-data"]Test how activities handle cancellation:
from temporalio import activity
from temporalio.testing import ActivityEnvironment
from temporalio.exceptions import CancelledError
@activity.defn
async def cancellable_activity() -> str:
try:
# Wait indefinitely
await asyncio.Future()
return "completed"
except asyncio.CancelledError:
# Check cancellation details
cancellation_details = activity.cancellation_details()
if cancellation_details and cancellation_details.cancel_requested:
activity.heartbeat("cancelled gracefully")
raise
return "unexpected cancellation"
async def test_activity_cancellation():
env = ActivityEnvironment()
heartbeats = []
env.on_heartbeat = lambda *args: heartbeats.append(args[0])
# Start activity
task = asyncio.create_task(env.run(cancellable_activity))
# Give it time to start
await asyncio.sleep(0.1)
# Cancel with details
env.cancel(
activity.ActivityCancellationDetails(cancel_requested=True)
)
# Verify cancellation handled properly
result = await task
assert heartbeats == ["cancelled gracefully"]Test activities that use activity context functions:
from temporalio import activity
from temporalio.testing import ActivityEnvironment
from temporalio.common import MetricMeter
from temporalio.converter import DataConverter
@activity.defn
async def context_aware_activity() -> dict:
info = activity.info()
converter = activity.payload_converter()
meter = activity.metric_meter()
return {
"activity_id": info.activity_id,
"workflow_id": info.workflow_id,
"has_converter": converter is not None,
"has_meter": meter is not None
}
async def test_activity_context():
env = ActivityEnvironment()
# Configure custom context
env.info = activity.Info(
activity_id="test-activity",
activity_type="context_aware_activity",
workflow_id="test-workflow",
# ... other required fields
)
env.payload_converter = DataConverter.default.payload_converter
env.metric_meter = MetricMeter.noop
result = await env.run(context_aware_activity)
assert result["activity_id"] == "test-activity"
assert result["workflow_id"] == "test-workflow"
assert result["has_converter"] is True
assert result["has_meter"] is TrueTest synchronous activities with thread-based cancellation:
import threading
import time
from temporalio import activity
from temporalio.testing import ActivityEnvironment
from temporalio.exceptions import CancelledError
@activity.defn
def sync_activity(data: str) -> str:
activity.heartbeat(f"Starting: {data}")
# Simulate work with cancellation checks
for i in range(10):
if activity.is_cancelled():
activity.heartbeat("Cancelled during processing")
raise CancelledError()
time.sleep(0.5)
activity.heartbeat(f"Step {i}")
return f"Done: {data}"
def test_sync_activity_cancellation():
env = ActivityEnvironment()
heartbeats = []
env.on_heartbeat = lambda *args: heartbeats.append(args[0])
# Use thread cancellation for sync activities
waiting = threading.Event()
result_holder = {}
def run_activity():
try:
result_holder["result"] = env.run(sync_activity, "test")
except CancelledError:
result_holder["cancelled"] = True
thread = threading.Thread(target=run_activity)
thread.start()
# Wait a bit then cancel
time.sleep(1)
env.cancel()
thread.join()
assert "cancelled" in result_holder
assert "Starting: test" in heartbeatsTest activities that need access to the Temporal client:
from temporalio import activity
from temporalio.testing import ActivityEnvironment
from temporalio.client import Client
from unittest.mock import Mock
@activity.defn
async def client_activity() -> str:
client = activity.client()
# Use client for additional operations
return f"Client namespace: {client.namespace}"
async def test_activity_with_client():
# Create mock client
mock_client = Mock(spec=Client)
mock_client.namespace = "test-namespace"
env = ActivityEnvironment(client=mock_client)
result = await env.run(client_activity)
assert result == "Client namespace: test-namespace"
async def test_activity_without_client():
env = ActivityEnvironment() # No client provided
with pytest.raises(RuntimeError, match="No client available"):
await env.run(client_activity)Test workflows with automatic time advancement:
from temporalio import workflow
from temporalio.testing import WorkflowEnvironment
from datetime import timedelta
import asyncio
@workflow.defn
class TimerWorkflow:
@workflow.run
async def run(self, duration_seconds: int) -> str:
await asyncio.sleep(duration_seconds)
return f"Slept for {duration_seconds} seconds"
async def test_timer_workflow():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client,
task_queue="test-queue",
workflows=[TimerWorkflow]
) as worker:
# Execute workflow - time will automatically skip
result = await env.client.execute_workflow(
TimerWorkflow.run,
100, # 100 seconds
id="timer-test",
task_queue="test-queue"
)
assert result == "Slept for 100 seconds"
# Verify time has advanced
current_time = await env.get_current_time()
# Time should be approximately 100 seconds laterTest workflows with manual time manipulation:
@workflow.defn
class ScheduledWorkflow:
@workflow.run
async def run(self) -> list[str]:
results = []
for i in range(3):
await asyncio.sleep(30) # 30 seconds
results.append(f"Step {i} at {workflow.now()}")
return results
async def test_scheduled_workflow():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client,
task_queue="test-queue",
workflows=[ScheduledWorkflow]
) as worker:
# Start workflow
handle = await env.client.start_workflow(
ScheduledWorkflow.run,
id="scheduled-test",
task_queue="test-queue"
)
# Manually advance time and check progress
await env.sleep(30) # Skip 30 seconds
# Workflow should still be running
try:
result = await asyncio.wait_for(handle.result(), timeout=0.1)
assert False, "Workflow completed too early"
except asyncio.TimeoutError:
pass # Expected
# Skip more time to complete
await env.sleep(70) # Skip total 100 seconds (covers remaining 60s)
result = await handle.result()
assert len(result) == 3Test workflow signal and query handling:
@workflow.defn
class SignalWorkflow:
def __init__(self):
self.messages = []
self.completed = False
@workflow.run
async def run(self) -> list[str]:
# Wait for completion signal
await workflow.wait_condition(lambda: self.completed)
return self.messages
@workflow.signal
def add_message(self, message: str) -> None:
self.messages.append(message)
@workflow.signal
def complete(self) -> None:
self.completed = True
@workflow.query
def get_message_count(self) -> int:
return len(self.messages)
async def test_signal_workflow():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client,
task_queue="test-queue",
workflows=[SignalWorkflow]
) as worker:
# Start workflow
handle = await env.client.start_workflow(
SignalWorkflow.run,
id="signal-test",
task_queue="test-queue"
)
# Send signals
await handle.signal(SignalWorkflow.add_message, "Hello")
await handle.signal(SignalWorkflow.add_message, "World")
# Query current state
count = await handle.query(SignalWorkflow.get_message_count)
assert count == 2
# Complete workflow
await handle.signal(SignalWorkflow.complete)
result = await handle.result()
assert result == ["Hello", "World"]Test workflows that spawn child workflows:
@workflow.defn
class ChildWorkflow:
@workflow.run
async def run(self, value: int) -> int:
await asyncio.sleep(1)
return value * 2
@workflow.defn
class ParentWorkflow:
@workflow.run
async def run(self, values: list[int]) -> list[int]:
# Start child workflows
child_handles = []
for value in values:
handle = await workflow.start_child_workflow(
ChildWorkflow.run,
value,
id=f"child-{value}"
)
child_handles.append(handle)
# Collect results
results = []
for handle in child_handles:
result = await handle
results.append(result)
return results
async def test_child_workflows():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client,
task_queue="test-queue",
workflows=[ParentWorkflow, ChildWorkflow]
) as worker:
result = await env.client.execute_workflow(
ParentWorkflow.run,
[1, 2, 3, 4],
id="parent-test",
task_queue="test-queue"
)
assert result == [2, 4, 6, 8]Test workflow update handlers:
@workflow.defn
class UpdateableWorkflow:
def __init__(self):
self.counter = 0
self.running = True
@workflow.run
async def run(self) -> int:
await workflow.wait_condition(lambda: not self.running)
return self.counter
@workflow.update
def increment(self, amount: int) -> int:
self.counter += amount
return self.counter
@workflow.update
def stop(self) -> None:
self.running = False
async def test_workflow_updates():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client,
task_queue="test-queue",
workflows=[UpdateableWorkflow]
) as worker:
handle = await env.client.start_workflow(
UpdateableWorkflow.run,
id="update-test",
task_queue="test-queue"
)
# Send updates
result1 = await handle.execute_update(UpdateableWorkflow.increment, 5)
assert result1 == 5
result2 = await handle.execute_update(UpdateableWorkflow.increment, 3)
assert result2 == 8
# Stop workflow
await handle.execute_update(UpdateableWorkflow.stop)
final_result = await handle.result()
assert final_result == 8Control time advancement for deterministic testing:
@workflow.defn
class TimeBasedWorkflow:
@workflow.run
async def run(self) -> list[str]:
events = []
# Record initial time
start_time = workflow.now()
events.append(f"Started at {start_time}")
# Wait various durations
await asyncio.sleep(60) # 1 minute
events.append(f"After 1 minute: {workflow.now()}")
await asyncio.sleep(3600) # 1 hour
events.append(f"After 1 hour: {workflow.now()}")
return events
async def test_time_advancement():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client,
task_queue="test-queue",
workflows=[TimeBasedWorkflow]
) as worker:
# Record initial time
initial_time = await env.get_current_time()
# Execute workflow (time auto-advances)
result = await env.client.execute_workflow(
TimeBasedWorkflow.run,
id="time-test",
task_queue="test-queue"
)
# Verify time advancement
final_time = await env.get_current_time()
elapsed = final_time - initial_time
# Should be approximately 1 hour and 1 minute
assert abs(elapsed.total_seconds() - 3660) < 60Test workflows with real-time progression when needed:
async def test_real_time_workflow():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client,
task_queue="test-queue",
workflows=[TimeBasedWorkflow]
) as worker:
with env.auto_time_skipping_disabled():
start_real_time = monotonic()
# This will take real time
await env.client.execute_workflow(
TimeBasedWorkflow.run,
id="real-time-test",
task_queue="test-queue"
)
elapsed_real_time = monotonic() - start_real_time
# Should have taken actual time (limited by workflow logic)Precisely control time advancement for complex scenarios:
@workflow.defn
class ComplexTimingWorkflow:
@workflow.run
async def run(self) -> dict:
results = {}
# Phase 1: Quick operations
for i in range(5):
await asyncio.sleep(10)
results[f"quick_{i}"] = workflow.now()
# Phase 2: Long operation
await asyncio.sleep(300) # 5 minutes
results["long_operation"] = workflow.now()
return results
async def test_complex_timing():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client,
task_queue="test-queue",
workflows=[ComplexTimingWorkflow]
) as worker:
# Start workflow without auto-completion
handle = await env.client.start_workflow(
ComplexTimingWorkflow.run,
id="complex-timing",
task_queue="test-queue"
)
# Manually advance through quick phase
for i in range(5):
await env.sleep(10)
# Could check intermediate state here
# Advance through long operation
await env.sleep(300)
result = await handle.result()
# Verify timing progression
assert len(result) == 6
assert "long_operation" in resultTest Nexus service operations and handlers:
from temporalio import nexus
from temporalio.testing import WorkflowEnvironment
from temporalio.client import Client
# Define Nexus service
@nexus.service.defn
class CalculatorService:
@nexus.operation.defn
async def add(self, a: int, b: int) -> int:
return a + b
@nexus.operation.defn
async def multiply(self, a: int, b: int) -> int:
await asyncio.sleep(1) # Simulate work
return a * b
# Workflow that uses Nexus
@workflow.defn
class NexusWorkflow:
@workflow.run
async def run(self, x: int, y: int) -> dict:
# Create Nexus client
nexus_client = workflow.create_nexus_client(
"calculator-service",
CalculatorService
)
# Call operations
sum_result = await nexus_client.add(x, y)
product_result = await nexus_client.multiply(x, y)
return {
"sum": sum_result,
"product": product_result
}
async def test_nexus_operations():
async with await WorkflowEnvironment.start_time_skipping() as env:
# Create Nexus endpoint (test helper)
endpoint = await create_test_nexus_endpoint(
env.client,
"calculator-service",
"test-nexus-queue"
)
async with Worker(
env.client,
task_queue="test-queue",
workflows=[NexusWorkflow]
) as workflow_worker:
async with Worker(
env.client,
task_queue="test-nexus-queue",
nexus_services=[CalculatorService()]
) as nexus_worker:
result = await env.client.execute_workflow(
NexusWorkflow.run,
5, 3,
id="nexus-test",
task_queue="test-queue"
)
assert result["sum"] == 8
assert result["product"] == 15Test workflows with mocked Nexus dependencies:
from unittest.mock import AsyncMock
class MockCalculatorService:
def __init__(self):
self.add = AsyncMock(return_value=100)
self.multiply = AsyncMock(return_value=200)
async def test_nexus_with_mocks():
# This would require custom interceptors or test utilities
# to inject mock services into the Nexus client resolution
pass # Implementation depends on specific mocking strategyTest how workflows handle Nexus operation failures:
@nexus.service.defn
class FailingService:
@nexus.operation.defn
async def unreliable_op(self, fail: bool) -> str:
if fail:
raise ApplicationError("Operation failed", type="ServiceError")
return "success"
@workflow.defn
class ErrorHandlingWorkflow:
@workflow.run
async def run(self, should_fail: bool) -> str:
nexus_client = workflow.create_nexus_client(
"failing-service",
FailingService
)
try:
result = await nexus_client.unreliable_op(should_fail)
return f"Success: {result}"
except NexusOperationError as e:
return f"Error: {e}"
async def test_nexus_error_handling():
async with await WorkflowEnvironment.start_time_skipping() as env:
# Set up Nexus service and workflow workers
# Test both success and failure cases
passReusable patterns for test environment setup:
from contextlib import asynccontextmanager
from typing import AsyncGenerator
@asynccontextmanager
async def test_environment() -> AsyncGenerator[WorkflowEnvironment, None]:
"""Standard test environment setup."""
async with await WorkflowEnvironment.start_time_skipping() as env:
yield env
@asynccontextmanager
async def workflow_worker(
env: WorkflowEnvironment,
workflows: list,
activities: list = None,
task_queue: str = "test-queue"
) -> AsyncGenerator[Worker, None]:
"""Standard worker setup for tests."""
async with Worker(
env.client,
task_queue=task_queue,
workflows=workflows,
activities=activities or []
) as worker:
yield worker
# Usage in tests
async def test_with_standard_setup():
async with test_environment() as env:
async with workflow_worker(env, [MyWorkflow]) as worker:
result = await env.client.execute_workflow(
MyWorkflow.run,
id="test",
task_queue="test-queue"
)
assert result is not NoneCommon patterns for asserting workflow and activity behavior:
def assert_workflow_completed_successfully(handle):
"""Assert workflow completed without failure."""
try:
result = await handle.result()
return result
except WorkflowFailureError:
pytest.fail("Workflow failed unexpectedly")
def assert_workflow_failed_with(handle, error_type):
"""Assert workflow failed with specific error type."""
with pytest.raises(WorkflowFailureError) as exc_info:
await handle.result()
assert isinstance(exc_info.value.cause, error_type)
return exc_info.value.cause
def assert_activity_heartbeats(env, expected_heartbeats):
"""Assert activity sent expected heartbeats."""
heartbeats = []
env.on_heartbeat = lambda *args: heartbeats.append(args[0])
# Run activity...
assert heartbeats == expected_heartbeatsUtilities for managing test data and state:
import uuid
from dataclasses import dataclass
@dataclass
class TestWorkflowIds:
"""Generate unique workflow IDs for tests."""
prefix: str = "test"
def generate(self, suffix: str = None) -> str:
base = f"{self.prefix}-{uuid.uuid4()}"
return f"{base}-{suffix}" if suffix else base
class TestDataBuilder:
"""Builder pattern for test data."""
def __init__(self):
self.reset()
def reset(self):
self.data = {}
return self
def with_field(self, key: str, value):
self.data[key] = value
return self
def build(self):
return self.data.copy()
# Usage
ids = TestWorkflowIds("integration")
workflow_id = ids.generate("signal-test")
test_data = (TestDataBuilder()
.with_field("name", "test-user")
.with_field("value", 42)
.build())Test complete workflows with all components:
@workflow.defn
class OrderProcessingWorkflow:
@workflow.run
async def run(self, order_id: str) -> dict:
# Validate order
await workflow.execute_activity(
validate_order,
order_id,
schedule_to_close_timeout=timedelta(minutes=5)
)
# Process payment
payment_result = await workflow.execute_activity(
process_payment,
order_id,
schedule_to_close_timeout=timedelta(minutes=10)
)
# Ship order
shipping_result = await workflow.execute_activity(
ship_order,
order_id,
schedule_to_close_timeout=timedelta(days=1)
)
return {
"order_id": order_id,
"payment": payment_result,
"shipping": shipping_result,
"status": "completed"
}
@activity.defn
async def validate_order(order_id: str) -> bool:
# Mock validation logic
activity.heartbeat(f"Validating order {order_id}")
await asyncio.sleep(1)
return True
@activity.defn
async def process_payment(order_id: str) -> str:
activity.heartbeat(f"Processing payment for {order_id}")
await asyncio.sleep(2)
return f"payment-{order_id}"
@activity.defn
async def ship_order(order_id: str) -> str:
activity.heartbeat(f"Shipping order {order_id}")
await asyncio.sleep(5) # Will be skipped in time-skipping tests
return f"tracking-{order_id}"
async def test_order_processing_e2e():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client,
task_queue="orders",
workflows=[OrderProcessingWorkflow],
activities=[validate_order, process_payment, ship_order]
) as worker:
result = await env.client.execute_workflow(
OrderProcessingWorkflow.run,
"order-123",
id="order-processing-test",
task_queue="orders"
)
assert result["order_id"] == "order-123"
assert result["status"] == "completed"
assert "payment-order-123" in result["payment"]
assert "tracking-order-123" in result["shipping"]Test workflows that interact with external services:
from unittest.mock import AsyncMock, patch
@activity.defn
async def call_external_api(url: str, data: dict) -> dict:
# In real implementation, this would make HTTP calls
import httpx
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data)
return response.json()
async def test_workflow_with_external_deps():
"""Test workflow with mocked external dependencies."""
# Mock the external HTTP call
mock_response = {"status": "success", "id": "ext-123"}
with patch("httpx.AsyncClient.post") as mock_post:
mock_post.return_value.json.return_value = mock_response
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client,
task_queue="test-queue",
workflows=[MyWorkflow],
activities=[call_external_api]
) as worker:
result = await env.client.execute_workflow(
MyWorkflow.run,
id="external-test",
task_queue="test-queue"
)
# Verify mock was called
mock_post.assert_called_once()
assert result is not NoneTest scenarios with multiple workers and task queues:
async def test_multi_worker_scenario():
"""Test workflow spanning multiple workers and task queues."""
async with await WorkflowEnvironment.start_time_skipping() as env:
# Worker for main workflows
async with Worker(
env.client,
task_queue="main-queue",
workflows=[CoordinatorWorkflow]
) as main_worker:
# Worker for processing activities
async with Worker(
env.client,
task_queue="processing-queue",
activities=[process_item, validate_item]
) as processing_worker:
# Worker for notification activities
async with Worker(
env.client,
task_queue="notification-queue",
activities=[send_notification]
) as notification_worker:
result = await env.client.execute_workflow(
CoordinatorWorkflow.run,
["item1", "item2", "item3"],
id="multi-worker-test",
task_queue="main-queue"
)
assert result["processed_count"] == 3
assert result["notifications_sent"] == 3Configure test environments for specific scenarios:
async def create_test_env_with_custom_config():
"""Create test environment with custom configuration."""
# Custom data converter for testing
custom_converter = DataConverter(
payload_converter_class=JSONPlainPayloadConverter,
failure_converter_class=DefaultFailureConverter
)
# Custom interceptors for testing
test_interceptors = [
LoggingInterceptor(),
MetricsInterceptor()
]
return await WorkflowEnvironment.start_time_skipping(
data_converter=custom_converter,
interceptors=test_interceptors,
runtime=Runtime(telemetry=TelemetryConfig(
logging=LoggingConfig(level="DEBUG")
))
)Ensure test isolation and cleanup:
class TestIsolation:
"""Helper for test isolation."""
def __init__(self):
self.created_workflows = []
self.test_data = {}
def generate_workflow_id(self, prefix: str) -> str:
workflow_id = f"{prefix}-{uuid.uuid4()}"
self.created_workflows.append(workflow_id)
return workflow_id
async def cleanup(self, client: Client):
"""Clean up test resources."""
for workflow_id in self.created_workflows:
try:
handle = client.get_workflow_handle(workflow_id)
await handle.terminate("test cleanup")
except:
pass # Ignore cleanup errors
# Usage in tests
@pytest.fixture
async def test_isolation():
isolation = TestIsolation()
yield isolation
# Cleanup happens in test teardown
async def test_with_isolation(test_isolation):
async with test_environment() as env:
workflow_id = test_isolation.generate_workflow_id("test")
# Run test...
await test_isolation.cleanup(env.client)Basic patterns for performance testing:
import time
from statistics import mean, stdev
async def test_workflow_performance():
"""Basic performance test for workflow execution."""
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(
env.client,
task_queue="perf-test",
workflows=[FastWorkflow],
max_concurrent_workflows=100
) as worker:
# Warm up
await env.client.execute_workflow(
FastWorkflow.run,
id="warmup",
task_queue="perf-test"
)
# Performance test
execution_times = []
for i in range(10):
start_time = time.monotonic()
await env.client.execute_workflow(
FastWorkflow.run,
id=f"perf-{i}",
task_queue="perf-test"
)
execution_times.append(time.monotonic() - start_time)
# Basic performance assertions
avg_time = mean(execution_times)
std_dev = stdev(execution_times)
assert avg_time < 1.0, f"Average execution time too high: {avg_time}"
assert std_dev < 0.5, f"Execution time variance too high: {std_dev}"This comprehensive testing documentation covers all the major aspects of testing Temporal workflows and activities using the temporalio.testing module, providing both basic examples and advanced patterns for thorough testing of distributed workflow applications.
Install with Tessl CLI
npx tessl i tessl/pypi-temporalio