# Testing

The `temporalio.testing` module provides environments and utilities for testing Temporal workflows and activities. It includes separate environments for activity and workflow testing, with support for time manipulation, mocking, and integration testing.

## Core Imports

```python
from temporalio.testing import ActivityEnvironment, WorkflowEnvironment
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
```

For advanced testing patterns:

```python
from temporalio.testing import ActivityEnvironment, WorkflowEnvironment
from temporalio.common import RetryPolicy, SearchAttributeKey
from temporalio.exceptions import ActivityError, ApplicationError, CancelledError
from datetime import datetime, timedelta
```

## Testing Environments

### ActivityEnvironment

The `ActivityEnvironment` class provides an isolated environment for testing activity functions, with control over the activity context and lifecycle.

```python { .api }
class ActivityEnvironment:
    """Activity environment for testing activities.

    This environment is used for running activity code that can access the
    functions in the :py:mod:`temporalio.activity` module. Use :py:meth:`run` to
    run an activity function or any function within an activity context.

    Attributes:
        info: The info that is returned from :py:func:`temporalio.activity.info`
            function.
        on_heartbeat: Function called on each heartbeat invocation by the
            activity.
        payload_converter: Payload converter set on the activity context. This
            must be set before :py:meth:`run`. Changes after the activity has
            started do not take effect.
        metric_meter: Metric meter set on the activity context. This must be set
            before :py:meth:`run`. Changes after the activity has started do not
            take effect. Default is noop.
    """

    def __init__(self, client: Optional[Client] = None) -> None:
        """Create an ActivityEnvironment for running activity code.

        Args:
            client: Optional client to make available in activity context.
                Only available for async activities.
        """

    def cancel(
        self,
        cancellation_details: activity.ActivityCancellationDetails = activity.ActivityCancellationDetails(
            cancel_requested=True
        ),
    ) -> None:
        """Cancel the activity.

        Args:
            cancellation_details: Details about the cancellation. These will
                be accessible through temporalio.activity.cancellation_details()
                in the activity after cancellation.

        This only has an effect on the first call.
        """

    def worker_shutdown(self) -> None:
        """Notify the activity that the worker is shutting down.

        This only has an effect on the first call.
        """

    def run(
        self,
        fn: Callable[_Params, _Return],
        *args: _Params.args,
        **kwargs: _Params.kwargs,
    ) -> _Return:
        """Run the given callable in an activity context.

        Args:
            fn: The function/callable to run.
            args: All positional arguments to the callable.
            kwargs: All keyword arguments to the callable.

        Returns:
            The callable's result.
        """
```

### WorkflowEnvironment

The `WorkflowEnvironment` class provides environments for testing workflows, with capabilities that range from time skipping to full server integration.

```python { .api }
class WorkflowEnvironment:
    """Workflow environment for testing workflows.

    Most developers will want to use the static :py:meth:`start_time_skipping`
    to start a test server process that automatically skips time as needed.
    Alternatively, :py:meth:`start_local` may be used for a full, local Temporal
    server with more features. To use an existing server, use
    :py:meth:`from_client`.

    This environment is an async context manager, so it can be used with
    ``async with`` to make sure it shuts down properly. Otherwise,
    :py:meth:`shutdown` can be manually called.

    To use the environment, simply use the :py:attr:`client` on it.

    Workflows invoked on the workflow environment are automatically configured
    to have ``assert`` failures fail the workflow with the assertion error.
    """

    @staticmethod
    def from_client(client: Client) -> WorkflowEnvironment:
        """Create a workflow environment from the given client.

        :py:attr:`supports_time_skipping` will always return ``False`` for this
        environment. :py:meth:`sleep` will sleep the actual amount of time and
        :py:meth:`get_current_time` will return the current time.

        Args:
            client: The client to use for the environment.

        Returns:
            The workflow environment that runs against the given client.
        """

    @staticmethod
    async def start_local(
        *,
        namespace: str = "default",
        data_converter: converter.DataConverter = converter.DataConverter.default,
        interceptors: Sequence[client.Interceptor] = [],
        plugins: Sequence[client.Plugin] = [],
        default_workflow_query_reject_condition: Optional[
            common.QueryRejectCondition
        ] = None,
        retry_config: Optional[client.RetryConfig] = None,
        rpc_metadata: Mapping[str, str] = {},
        identity: Optional[str] = None,
        tls: bool | client.TLSConfig = False,
        ip: str = "127.0.0.1",
        port: Optional[int] = None,
        download_dest_dir: Optional[str] = None,
        ui: bool = False,
        runtime: Optional[runtime.Runtime] = None,
        search_attributes: Sequence[common.SearchAttributeKey] = (),
        dev_server_existing_path: Optional[str] = None,
        dev_server_database_filename: Optional[str] = None,
        dev_server_log_format: str = "pretty",
        dev_server_log_level: Optional[str] = "warn",
        dev_server_download_version: str = "default",
        dev_server_extra_args: Sequence[str] = [],
        dev_server_download_ttl: Optional[timedelta] = None,
    ) -> WorkflowEnvironment:
        """Start a full Temporal server locally, downloading if necessary.

        This environment is good for testing full server capabilities, but does
        not support time skipping like :py:meth:`start_time_skipping` does.
        :py:attr:`supports_time_skipping` will always return ``False`` for this
        environment. :py:meth:`sleep` will sleep the actual amount of time and
        :py:meth:`get_current_time` will return the current time.

        Args:
            namespace: Namespace name to use for this environment.
            data_converter: Data converter for serialization.
            interceptors: Client interceptors to apply.
            plugins: Client plugins to apply.
            default_workflow_query_reject_condition: Default query reject condition.
            retry_config: Retry configuration for client calls.
            rpc_metadata: Additional RPC metadata.
            identity: Client identity.
            tls: TLS configuration.
            ip: IP address to bind to, or 127.0.0.1 by default.
            port: Port number to bind to, or an OS-provided port by default.
            download_dest_dir: Directory to download binary to if needed.
            ui: If ``True``, will start a UI in the dev server.
            runtime: Specific runtime to use or default if unset.
            search_attributes: Search attributes to register with the dev server.
            dev_server_existing_path: Existing path to the CLI binary.
            dev_server_database_filename: Path to the Sqlite database to use.
            dev_server_log_format: Log format for the dev server.
            dev_server_log_level: Log level for the dev server.
            dev_server_download_version: Specific CLI version to download.
            dev_server_extra_args: Extra arguments for the CLI binary.
            dev_server_download_ttl: TTL for the downloaded CLI binary.

        Returns:
            The started CLI dev server workflow environment.
        """

    @staticmethod
    async def start_time_skipping(
        *,
        data_converter: converter.DataConverter = converter.DataConverter.default,
        interceptors: Sequence[client.Interceptor] = [],
        plugins: Sequence[client.Plugin] = [],
        default_workflow_query_reject_condition: Optional[
            common.QueryRejectCondition
        ] = None,
        retry_config: Optional[client.RetryConfig] = None,
        rpc_metadata: Mapping[str, str] = {},
        identity: Optional[str] = None,
        port: Optional[int] = None,
        download_dest_dir: Optional[str] = None,
        runtime: Optional[runtime.Runtime] = None,
        test_server_existing_path: Optional[str] = None,
        test_server_download_version: str = "default",
        test_server_extra_args: Sequence[str] = [],
        test_server_download_ttl: Optional[timedelta] = None,
    ) -> WorkflowEnvironment:
        """Start a time skipping workflow environment.

        By default, this environment will automatically skip to the next events
        in time when a workflow's
        :py:meth:`temporalio.client.WorkflowHandle.result` is awaited on (which
        includes :py:meth:`temporalio.client.Client.execute_workflow`). Before
        the result is awaited on, time can be manually skipped forward using
        :py:meth:`sleep`. The currently known time can be obtained via
        :py:meth:`get_current_time`.

        Args:
            data_converter: Data converter for serialization.
            interceptors: Client interceptors to apply.
            plugins: Client plugins to apply.
            default_workflow_query_reject_condition: Default query reject condition.
            retry_config: Retry configuration for client calls.
            rpc_metadata: Additional RPC metadata.
            identity: Client identity.
            port: Port number to bind to, or an OS-provided port by default.
            download_dest_dir: Directory to download binary to if needed.
            runtime: Specific runtime to use or default if unset.
            test_server_existing_path: Existing path to the test server binary.
            test_server_download_version: Specific test server version to download.
            test_server_extra_args: Extra arguments for the test server binary.
            test_server_download_ttl: TTL for the downloaded test server binary.

        Returns:
            The started workflow environment with time skipping.
        """

    @property
    def client(self) -> Client:
        """Client to this environment."""

    async def shutdown(self) -> None:
        """Shut down this environment."""

    async def sleep(self, duration: Union[timedelta, float]) -> None:
        """Sleep in this environment.

        This awaits a regular :py:func:`asyncio.sleep` in regular environments,
        or manually skips time in time-skipping environments.

        Args:
            duration: Amount of time to sleep.
        """

    async def get_current_time(self) -> datetime:
        """Get the current time known to this environment.

        For non-time-skipping environments this is simply the system time. For
        time-skipping environments this is whatever time has been skipped to.
        """

    @property
    def supports_time_skipping(self) -> bool:
        """Whether this environment supports time skipping."""

    @contextmanager
    def auto_time_skipping_disabled(self) -> Iterator[None]:
        """Disable any automatic time skipping if this is a time-skipping
        environment.

        This is a context manager for use via ``with``. Usually in time-skipping
        environments, waiting on a workflow result causes time to automatically
        skip until the next event. This can disable that. However, this only
        applies to results awaited inside this context. This will not disable
        automatic time skipping on previous results.

        This has no effect on non-time-skipping environments.
        """
```
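
The contract described for `sleep` and `get_current_time` can be illustrated with a toy model. The sketch below is not the SDK's implementation: `ToyEnvironment` is a hypothetical stand-in that uses only the standard library, and it assumes that a bare `float` duration means seconds.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Union


class ToyEnvironment:
    """Toy stand-in that mimics the sleep/get_current_time contract."""

    def __init__(self, time_skipping: bool) -> None:
        self.supports_time_skipping = time_skipping
        self._offset = timedelta(0)  # virtual time skipped so far

    async def sleep(self, duration: Union[timedelta, float]) -> None:
        # Assumption: a bare float is a number of seconds.
        if not isinstance(duration, timedelta):
            duration = timedelta(seconds=duration)
        if self.supports_time_skipping:
            self._offset += duration  # skip forward without waiting
        else:
            await asyncio.sleep(duration.total_seconds())  # really wait

    async def get_current_time(self) -> datetime:
        # System time plus whatever has been skipped.
        return datetime.now(timezone.utc) + self._offset


async def demo() -> timedelta:
    env = ToyEnvironment(time_skipping=True)
    before = await env.get_current_time()
    await env.sleep(3600)                  # one virtual hour
    await env.sleep(timedelta(minutes=1))  # plus one virtual minute
    return (await env.get_current_time()) - before


elapsed = asyncio.run(demo())
```

With `time_skipping=False` the same calls would block for the full duration, which is the behaviour documented for `from_client` and `start_local`.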

## Activity Testing

### Basic Activity Testing

Test activities in isolation using `ActivityEnvironment`:

```python
import asyncio
from temporalio import activity
from temporalio.testing import ActivityEnvironment

@activity.defn
async def process_data(data: str) -> str:
    activity.heartbeat(f"Processing: {data}")
    await asyncio.sleep(1)  # Simulate work
    return f"Processed: {data}"

async def test_process_data():
    # Create test environment
    env = ActivityEnvironment()

    # Track heartbeats
    heartbeats = []
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])

    # Run activity
    result = await env.run(process_data, "test-data")

    assert result == "Processed: test-data"
    assert heartbeats == ["Processing: test-data"]
```
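
The lambda used for `on_heartbeat` keeps only the first detail and would raise `IndexError` on a heartbeat sent without details. A small reusable recorder avoids both problems. This is a sketch: `HeartbeatRecorder` is a hypothetical helper name, not part of `temporalio.testing`, and the calls at the bottom simulate what the environment would forward.

```python
from typing import Any, List, Tuple


class HeartbeatRecorder:
    """Callable that stores the details of every heartbeat it receives."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, ...]] = []

    def __call__(self, *details: Any) -> None:
        # Keep the full argument tuple so multi-detail heartbeats are preserved.
        self.calls.append(details)

    @property
    def first_details(self) -> List[Any]:
        """First detail of each heartbeat, or None for empty heartbeats."""
        return [call[0] if call else None for call in self.calls]


recorder = HeartbeatRecorder()
recorder("Processing: test-data")  # simulates a one-detail heartbeat
recorder("step", 2)                # simulates a two-detail heartbeat
recorder()                         # simulates a heartbeat without details
```

In a test, the recorder would be assigned with `env.on_heartbeat = recorder` before calling `env.run`.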

### Testing Activity Cancellation

Test how activities handle cancellation:

```python
import asyncio

import pytest
from temporalio import activity
from temporalio.testing import ActivityEnvironment

@activity.defn
async def cancellable_activity() -> str:
    try:
        # Wait indefinitely
        await asyncio.Future()
        return "completed"
    except asyncio.CancelledError:
        # Check cancellation details
        cancellation_details = activity.cancellation_details()
        if cancellation_details and cancellation_details.cancel_requested:
            activity.heartbeat("cancelled gracefully")
            raise
        return "unexpected cancellation"

async def test_activity_cancellation():
    env = ActivityEnvironment()
    heartbeats = []
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])

    # Start activity
    task = asyncio.create_task(env.run(cancellable_activity))

    # Give it time to start
    await asyncio.sleep(0.1)

    # Cancel with details
    env.cancel(
        activity.ActivityCancellationDetails(cancel_requested=True)
    )

    # The activity re-raises, so awaiting the task is expected to raise too
    with pytest.raises(asyncio.CancelledError):
        await task
    assert heartbeats == ["cancelled gracefully"]
```
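
A fixed `asyncio.sleep(0.1)` before cancelling is a guess about how long startup takes. One alternative is to poll for an explicit condition. The sketch below uses only `asyncio`; `wait_until` and `fake_activity` are hypothetical names, and the fake coroutine stands in for an activity started through `env.run`.

```python
import asyncio
import time
from typing import Callable


async def wait_until(
    predicate: Callable[[], bool],
    timeout: float = 2.0,
    interval: float = 0.01,
) -> None:
    """Poll `predicate` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() >= deadline:
            raise TimeoutError("condition not met before timeout")
        await asyncio.sleep(interval)


async def demo() -> bool:
    started = asyncio.Event()

    async def fake_activity() -> None:
        started.set()           # stands in for "the activity has begun"
        await asyncio.Future()  # wait until cancelled

    task = asyncio.create_task(fake_activity())
    await wait_until(started.is_set)  # returns once the task has started
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return True
    return False


cancelled = asyncio.run(demo())
```

For a real activity, the predicate could check a recorded heartbeat, for example `lambda: len(heartbeats) > 0`.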

### Testing Activity Context

Test activities that use activity context functions:

```python
from temporalio import activity
from temporalio.testing import ActivityEnvironment
from temporalio.common import MetricMeter
from temporalio.converter import DataConverter

@activity.defn
async def context_aware_activity() -> dict:
    info = activity.info()
    converter = activity.payload_converter()
    meter = activity.metric_meter()

    return {
        "activity_id": info.activity_id,
        "workflow_id": info.workflow_id,
        "has_converter": converter is not None,
        "has_meter": meter is not None
    }

async def test_activity_context():
    env = ActivityEnvironment()

    # Configure custom context
    env.info = activity.Info(
        activity_id="test-activity",
        activity_type="context_aware_activity",
        workflow_id="test-workflow",
        # ... other required fields
    )
    env.payload_converter = DataConverter.default.payload_converter
    env.metric_meter = MetricMeter.noop

    result = await env.run(context_aware_activity)

    assert result["activity_id"] == "test-activity"
    assert result["workflow_id"] == "test-workflow"
    assert result["has_converter"] is True
    assert result["has_meter"] is True
```

### Testing Synchronous Activities

Test synchronous activities with thread-based cancellation:

```python
import threading
import time
from temporalio import activity
from temporalio.testing import ActivityEnvironment
from temporalio.exceptions import CancelledError

@activity.defn
def sync_activity(data: str) -> str:
    activity.heartbeat(f"Starting: {data}")

    # Simulate work with cancellation checks
    for i in range(10):
        if activity.is_cancelled():
            activity.heartbeat("Cancelled during processing")
            raise CancelledError()
        time.sleep(0.5)
        activity.heartbeat(f"Step {i}")

    return f"Done: {data}"

def test_sync_activity_cancellation():
    env = ActivityEnvironment()
    heartbeats = []
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])

    # Use thread cancellation for sync activities
    waiting = threading.Event()
    result_holder = {}

    def run_activity():
        try:
            result_holder["result"] = env.run(sync_activity, "test")
        except CancelledError:
            result_holder["cancelled"] = True

    thread = threading.Thread(target=run_activity)
    thread.start()

    # Wait a bit then cancel
    time.sleep(1)
    env.cancel()
    thread.join()

    assert "cancelled" in result_holder
    assert "Starting: test" in heartbeats
```
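
Instead of a hand-rolled `result_holder` dictionary, the blocking call can be submitted to a `ThreadPoolExecutor`, whose future carries either the result or the exception. The sketch below models this with the standard library only: `FakeCancelled` and `blocking_work` are hypothetical stand-ins for the cancellation error and for `env.run(sync_activity, ...)`.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeCancelled(Exception):
    """Stand-in for the cancellation error a real activity would raise."""


def blocking_work(cancel_event: threading.Event) -> str:
    # Poll the event the way a sync activity polls for cancellation.
    for _ in range(200):
        if cancel_event.wait(timeout=0.01):
            raise FakeCancelled()
    return "done"


cancel_event = threading.Event()
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(blocking_work, cancel_event)
    cancel_event.set()                   # stands in for env.cancel()
    error = future.exception(timeout=5)  # blocks until the worker finishes
```

`future.exception()` returns the raised exception without re-raising it, so the test can assert on its type directly.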

### Testing Activity with Client Access

Test activities that need access to the Temporal client:

```python
import pytest
from temporalio import activity
from temporalio.testing import ActivityEnvironment
from temporalio.client import Client
from unittest.mock import Mock

@activity.defn
async def client_activity() -> str:
    client = activity.client()
    # Use client for additional operations
    return f"Client namespace: {client.namespace}"

async def test_activity_with_client():
    # Create mock client
    mock_client = Mock(spec=Client)
    mock_client.namespace = "test-namespace"

    env = ActivityEnvironment(client=mock_client)
    result = await env.run(client_activity)

    assert result == "Client namespace: test-namespace"

async def test_activity_without_client():
    env = ActivityEnvironment()  # No client provided

    with pytest.raises(RuntimeError, match="No client available"):
        await env.run(client_activity)
```

## Workflow Testing

### Time-Skipping Environment

Test workflows with automatic time advancement:

```python
from temporalio import workflow
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
from datetime import timedelta
import asyncio

@workflow.defn
class TimerWorkflow:
    @workflow.run
    async def run(self, duration_seconds: int) -> str:
        await asyncio.sleep(duration_seconds)
        return f"Slept for {duration_seconds} seconds"

async def test_timer_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[TimerWorkflow]
        ) as worker:
            # Execute workflow - time will automatically skip
            result = await env.client.execute_workflow(
                TimerWorkflow.run,
                100,  # 100 seconds
                id="timer-test",
                task_queue="test-queue"
            )

            assert result == "Slept for 100 seconds"

            # Verify time has advanced
            current_time = await env.get_current_time()
            # Time should be approximately 100 seconds later
```

### Manual Time Control

Test workflows with manual time manipulation:

```python
@workflow.defn
class ScheduledWorkflow:
    @workflow.run
    async def run(self) -> list[str]:
        results = []

        for i in range(3):
            await asyncio.sleep(30)  # 30 seconds
            results.append(f"Step {i} at {workflow.now()}")

        return results

async def test_scheduled_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[ScheduledWorkflow]
        ) as worker:
            # Start workflow
            handle = await env.client.start_workflow(
                ScheduledWorkflow.run,
                id="scheduled-test",
                task_queue="test-queue"
            )

            # Manually advance time and check progress
            await env.sleep(30)  # Skip 30 seconds

            # Workflow should still be running. Awaiting the result normally
            # triggers automatic time skipping, so disable it for this check.
            with env.auto_time_skipping_disabled():
                try:
                    result = await asyncio.wait_for(handle.result(), timeout=0.1)
                    assert False, "Workflow completed too early"
                except asyncio.TimeoutError:
                    pass  # Expected

            # Skip more time to complete
            await env.sleep(70)  # Skip total 100 seconds (covers remaining 60s)

            result = await handle.result()
            assert len(result) == 3
```

### Testing Signals and Queries

Test workflow signal and query handling:

```python
@workflow.defn
class SignalWorkflow:
    def __init__(self):
        self.messages = []
        self.completed = False

    @workflow.run
    async def run(self) -> list[str]:
        # Wait for completion signal
        await workflow.wait_condition(lambda: self.completed)
        return self.messages

    @workflow.signal
    def add_message(self, message: str) -> None:
        self.messages.append(message)

    @workflow.signal
    def complete(self) -> None:
        self.completed = True

    @workflow.query
    def get_message_count(self) -> int:
        return len(self.messages)

async def test_signal_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[SignalWorkflow]
        ) as worker:
            # Start workflow
            handle = await env.client.start_workflow(
                SignalWorkflow.run,
                id="signal-test",
                task_queue="test-queue"
            )

            # Send signals
            await handle.signal(SignalWorkflow.add_message, "Hello")
            await handle.signal(SignalWorkflow.add_message, "World")

            # Query current state
            count = await handle.query(SignalWorkflow.get_message_count)
            assert count == 2

            # Complete workflow
            await handle.signal(SignalWorkflow.complete)

            result = await handle.result()
            assert result == ["Hello", "World"]
```

### Testing Child Workflows

Test workflows that spawn child workflows:

```python
@workflow.defn
class ChildWorkflow:
    @workflow.run
    async def run(self, value: int) -> int:
        await asyncio.sleep(1)
        return value * 2

@workflow.defn
class ParentWorkflow:
    @workflow.run
    async def run(self, values: list[int]) -> list[int]:
        # Start child workflows
        child_handles = []
        for value in values:
            handle = await workflow.start_child_workflow(
                ChildWorkflow.run,
                value,
                id=f"child-{value}"
            )
            child_handles.append(handle)

        # Collect results
        results = []
        for handle in child_handles:
            result = await handle
            results.append(result)

        return results

async def test_child_workflows():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[ParentWorkflow, ChildWorkflow]
        ) as worker:
            result = await env.client.execute_workflow(
                ParentWorkflow.run,
                [1, 2, 3, 4],
                id="parent-test",
                task_queue="test-queue"
            )

            assert result == [2, 4, 6, 8]
```

### Testing Workflow Updates

Test workflow update handlers:

```python
@workflow.defn
class UpdateableWorkflow:
    def __init__(self):
        self.counter = 0
        self.running = True

    @workflow.run
    async def run(self) -> int:
        await workflow.wait_condition(lambda: not self.running)
        return self.counter

    @workflow.update
    def increment(self, amount: int) -> int:
        self.counter += amount
        return self.counter

    @workflow.update
    def stop(self) -> None:
        self.running = False

async def test_workflow_updates():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[UpdateableWorkflow]
        ) as worker:
            handle = await env.client.start_workflow(
                UpdateableWorkflow.run,
                id="update-test",
                task_queue="test-queue"
            )

            # Send updates
            result1 = await handle.execute_update(UpdateableWorkflow.increment, 5)
            assert result1 == 5

            result2 = await handle.execute_update(UpdateableWorkflow.increment, 3)
            assert result2 == 8

            # Stop workflow
            await handle.execute_update(UpdateableWorkflow.stop)

            final_result = await handle.result()
            assert final_result == 8
```

## Time Management in Tests

### Time Skipping Patterns

Control time advancement for deterministic testing:

```python
@workflow.defn
class TimeBasedWorkflow:
    @workflow.run
    async def run(self) -> list[str]:
        events = []

        # Record initial time
        start_time = workflow.now()
        events.append(f"Started at {start_time}")

        # Wait various durations
        await asyncio.sleep(60)  # 1 minute
        events.append(f"After 1 minute: {workflow.now()}")

        await asyncio.sleep(3600)  # 1 hour
        events.append(f"After 1 hour: {workflow.now()}")

        return events

async def test_time_advancement():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[TimeBasedWorkflow]
        ) as worker:
            # Record initial time
            initial_time = await env.get_current_time()

            # Execute workflow (time auto-advances)
            result = await env.client.execute_workflow(
                TimeBasedWorkflow.run,
                id="time-test",
                task_queue="test-queue"
            )

            # Verify time advancement
            final_time = await env.get_current_time()
            elapsed = final_time - initial_time

            # Should be approximately 1 hour and 1 minute
            assert abs(elapsed.total_seconds() - 3660) < 60
```
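
The tolerance check at the end of the test can be factored into a helper that reports the drift when it fails. This is a minimal sketch using only `datetime`; `assert_elapsed_close` is a hypothetical helper name, and the fixed timestamps stand in for values returned by `env.get_current_time()`.

```python
from datetime import datetime, timedelta, timezone


def assert_elapsed_close(
    start: datetime,
    end: datetime,
    expected: timedelta,
    tolerance: timedelta = timedelta(seconds=60),
) -> timedelta:
    """Assert that end - start is within `tolerance` of `expected`."""
    elapsed = end - start
    drift = abs(elapsed - expected)
    assert drift <= tolerance, f"elapsed {elapsed} differs from {expected} by {drift}"
    return elapsed


start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
end = start + timedelta(hours=1, minutes=1, seconds=5)
elapsed = assert_elapsed_close(start, end, timedelta(hours=1, minutes=1))
```

Passing the expected duration as a `timedelta` keeps the intent ("1 hour and 1 minute") visible instead of a bare `3660`.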

### Disabling Auto Time Skipping

Test workflows with real-time progression when needed:

```python
from time import monotonic

async def test_real_time_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[TimeBasedWorkflow]
        ) as worker:
            with env.auto_time_skipping_disabled():
                start_real_time = monotonic()

                # This will take real time
                await env.client.execute_workflow(
                    TimeBasedWorkflow.run,
                    id="real-time-test",
                    task_queue="test-queue"
                )

                elapsed_real_time = monotonic() - start_real_time
                # Should have taken actual time (limited by workflow logic)
```

### Manual Time Control

Precisely control time advancement for complex scenarios:

```python
@workflow.defn
class ComplexTimingWorkflow:
    @workflow.run
    async def run(self) -> dict:
        results = {}

        # Phase 1: Quick operations
        for i in range(5):
            await asyncio.sleep(10)
            results[f"quick_{i}"] = workflow.now()

        # Phase 2: Long operation
        await asyncio.sleep(300)  # 5 minutes
        results["long_operation"] = workflow.now()

        return results

async def test_complex_timing():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[ComplexTimingWorkflow]
        ) as worker:
            # Start workflow without auto-completion
            handle = await env.client.start_workflow(
                ComplexTimingWorkflow.run,
                id="complex-timing",
                task_queue="test-queue"
            )

            # Manually advance through quick phase
            for i in range(5):
                await env.sleep(10)
                # Could check intermediate state here

            # Advance through long operation
            await env.sleep(300)

            result = await handle.result()

            # Verify timing progression
            assert len(result) == 6
            assert "long_operation" in result
```

## Nexus Testing

### Testing Nexus Operations

Test Nexus service operations and handlers:

```python
import asyncio

from temporalio import nexus, workflow
from temporalio.testing import WorkflowEnvironment
from temporalio.client import Client
from temporalio.worker import Worker

# Define Nexus service
@nexus.service.defn
class CalculatorService:
    @nexus.operation.defn
    async def add(self, a: int, b: int) -> int:
        return a + b

    @nexus.operation.defn
    async def multiply(self, a: int, b: int) -> int:
        await asyncio.sleep(1)  # Simulate work
        return a * b

# Workflow that uses Nexus
@workflow.defn
class NexusWorkflow:
    @workflow.run
    async def run(self, x: int, y: int) -> dict:
        # Create Nexus client
        nexus_client = workflow.create_nexus_client(
            "calculator-service",
            CalculatorService
        )

        # Call operations
        sum_result = await nexus_client.add(x, y)
        product_result = await nexus_client.multiply(x, y)

        return {
            "sum": sum_result,
            "product": product_result
        }

async def test_nexus_operations():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Create Nexus endpoint (test helper)
        endpoint = await create_test_nexus_endpoint(
            env.client,
            "calculator-service",
            "test-nexus-queue"
        )

        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[NexusWorkflow]
        ) as workflow_worker:
            async with Worker(
                env.client,
                task_queue="test-nexus-queue",
                nexus_services=[CalculatorService()]
            ) as nexus_worker:
                result = await env.client.execute_workflow(
                    NexusWorkflow.run,
                    5, 3,
                    id="nexus-test",
                    task_queue="test-queue"
                )

                assert result["sum"] == 8
                assert result["product"] == 15
```

### Mock Nexus Services

Test workflows with mocked Nexus dependencies:

```python
from unittest.mock import AsyncMock

class MockCalculatorService:
    def __init__(self):
        self.add = AsyncMock(return_value=100)
        self.multiply = AsyncMock(return_value=200)

async def test_nexus_with_mocks():
    # This would require custom interceptors or test utilities
    # to inject mock services into the Nexus client resolution
    pass  # Implementation depends on specific mocking strategy
```
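
Independently of how a mock gets injected, the `AsyncMock` attributes can be awaited and inspected like any coroutine function. The sketch below exercises the mock directly with the standard library; `compute` is a hypothetical caller standing in for code that would invoke the service, and no Temporal component is involved.

```python
import asyncio
from unittest.mock import AsyncMock


class MockCalculatorService:
    def __init__(self) -> None:
        self.add = AsyncMock(return_value=100)
        self.multiply = AsyncMock(return_value=200)


async def compute(service, x: int, y: int) -> dict:
    # Hypothetical caller: awaits both operations and collects the results.
    return {
        "sum": await service.add(x, y),
        "product": await service.multiply(x, y),
    }


service = MockCalculatorService()
result = asyncio.run(compute(service, 5, 3))

# AsyncMock records awaits, so arguments can be checked as well as results.
service.add.assert_awaited_once_with(5, 3)
service.multiply.assert_awaited_once_with(5, 3)
```

The canned return values (100 and 200) deliberately differ from the real arithmetic, which makes it obvious in assertions that the mock, not the real service, was called.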

### Testing Nexus Error Handling

Test how workflows handle Nexus operation failures:

```python
from temporalio.exceptions import ApplicationError, NexusOperationError

@nexus.service.defn
class FailingService:
    @nexus.operation.defn
    async def unreliable_op(self, fail: bool) -> str:
        if fail:
            raise ApplicationError("Operation failed", type="ServiceError")
        return "success"

@workflow.defn
class ErrorHandlingWorkflow:
    @workflow.run
    async def run(self, should_fail: bool) -> str:
        nexus_client = workflow.create_nexus_client(
            "failing-service",
            FailingService
        )

        try:
            result = await nexus_client.unreliable_op(should_fail)
            return f"Success: {result}"
        except NexusOperationError as e:
            return f"Error: {e}"

async def test_nexus_error_handling():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Set up Nexus service and workflow workers
        # Test both success and failure cases
        pass
```

## Test Utilities and Patterns

### Common Test Setup

Reusable patterns for test environment setup:

```python
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional

@asynccontextmanager
async def test_environment() -> AsyncGenerator[WorkflowEnvironment, None]:
    """Standard test environment setup."""
    async with await WorkflowEnvironment.start_time_skipping() as env:
        yield env

@asynccontextmanager
async def workflow_worker(
    env: WorkflowEnvironment,
    workflows: list,
    activities: Optional[list] = None,
    task_queue: str = "test-queue"
) -> AsyncGenerator[Worker, None]:
    """Standard worker setup for tests."""
    async with Worker(
        env.client,
        task_queue=task_queue,
        workflows=workflows,
        activities=activities or []
    ) as worker:
        yield worker

# Usage in tests
async def test_with_standard_setup():
    async with test_environment() as env:
        async with workflow_worker(env, [MyWorkflow]) as worker:
            result = await env.client.execute_workflow(
                MyWorkflow.run,
                id="test",
                task_queue="test-queue"
            )
            assert result is not None
```
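
When a test needs an environment plus several workers, nested `async with` blocks grow deep quickly. `contextlib.AsyncExitStack` can flatten them. The sketch below shows the pattern with the standard library only: `resource` is a hypothetical stand-in for the environment and worker context managers, and it records the enter and exit order.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from typing import AsyncIterator, List

events: List[str] = []


@asynccontextmanager
async def resource(name: str) -> AsyncIterator[str]:
    # Stand-in for an environment or worker; records enter/exit order.
    events.append(f"enter {name}")
    try:
        yield name
    finally:
        events.append(f"exit {name}")


async def demo() -> None:
    async with AsyncExitStack() as stack:
        env = await stack.enter_async_context(resource("env"))
        for queue in ("test-queue", "test-nexus-queue"):
            await stack.enter_async_context(resource(f"worker:{queue}"))
        events.append(f"run test with {env}")
    # Leaving the stack exits the resources in reverse order of entry.


asyncio.run(demo())
```

Because the stack unwinds in reverse order, workers shut down before the environment, matching what nested `async with` blocks would do.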
1045
1046
### Assertion Patterns
1047
1048
Common patterns for asserting workflow and activity behavior:
1049
1050
```python
import pytest
from temporalio.client import WorkflowFailureError

# These helpers await the handle, so they must be coroutines.
async def assert_workflow_completed_successfully(handle):
    """Assert workflow completed without failure and return its result."""
    try:
        result = await handle.result()
        return result
    except WorkflowFailureError:
        pytest.fail("Workflow failed unexpectedly")

async def assert_workflow_failed_with(handle, error_type):
    """Assert workflow failed with specific error type."""
    with pytest.raises(WorkflowFailureError) as exc_info:
        await handle.result()

    assert isinstance(exc_info.value.cause, error_type)
    return exc_info.value.cause

def assert_activity_heartbeats(env, expected_heartbeats):
    """Assert activity sent expected heartbeats."""
    heartbeats = []
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])
    # Run activity...
    assert heartbeats == expected_heartbeats
```
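
The heartbeat assertion above leaves the activity run as a placeholder. One way to package the recording step, sketched here with a hypothetical `record_heartbeats` helper, is a context manager that installs a recording callback and restores the previous one on exit. The `SimpleNamespace` object is only a stand-in for an `ActivityEnvironment` so the sketch runs without a Temporal dependency; it assumes nothing beyond a writable `on_heartbeat` attribute.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def record_heartbeats(env):
    """Collect heartbeat details sent while the block runs (hypothetical helper)."""
    recorded = []
    previous = env.on_heartbeat
    # Each heartbeat call passes its details as positional arguments.
    env.on_heartbeat = lambda *details: recorded.append(details)
    try:
        yield recorded
    finally:
        env.on_heartbeat = previous  # Restore the callback that was installed before


# Stand-in for ActivityEnvironment: only the on_heartbeat attribute matters here.
fake_env = SimpleNamespace(on_heartbeat=lambda *details: None)

with record_heartbeats(fake_env) as heartbeats:
    # In a real test, the activity would be run through the environment here.
    fake_env.on_heartbeat("step 1")
    fake_env.on_heartbeat("step 2", 50)

assert heartbeats == [("step 1",), ("step 2", 50)]
```

Storing the full argument tuple, rather than only the first detail, keeps multi-value heartbeats visible in the assertion.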

### Test Data Management

Utilities for managing test data and state:

```python
import uuid
from dataclasses import dataclass
from typing import Optional

@dataclass
class TestWorkflowIds:
    """Generate unique workflow IDs for tests."""
    prefix: str = "test"

    def generate(self, suffix: Optional[str] = None) -> str:
        base = f"{self.prefix}-{uuid.uuid4()}"
        return f"{base}-{suffix}" if suffix else base

class TestDataBuilder:
    """Builder pattern for test data."""

    def __init__(self):
        self.reset()

    def reset(self):
        self.data = {}
        return self

    def with_field(self, key: str, value):
        self.data[key] = value
        return self

    def build(self):
        # Shallow copy: nested values are shared with the builder
        return self.data.copy()

# Usage
ids = TestWorkflowIds("integration")
workflow_id = ids.generate("signal-test")

test_data = (TestDataBuilder()
    .with_field("name", "test-user")
    .with_field("value", 42)
    .build())
```
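
`build()` returns `self.data.copy()`, which is a shallow copy, so nested lists or dicts stay shared between built objects. If tests mutate nested values, a deep copy avoids leakage between them. A minimal sketch, where `DeepCopyBuilder` is a hypothetical variant of the builder above:

```python
import copy


class DeepCopyBuilder:
    """Hypothetical variant of TestDataBuilder whose build() deep-copies its data."""

    def __init__(self):
        self.data = {}

    def with_field(self, key, value):
        self.data[key] = value
        return self

    def build(self):
        # deepcopy duplicates nested containers, so each result is independent.
        return copy.deepcopy(self.data)


builder = DeepCopyBuilder().with_field("tags", ["a"])
first = builder.build()
second = builder.build()

first["tags"].append("b")  # Mutating one result...
assert second["tags"] == ["a"]  # ...leaves the other untouched
```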

## Integration Testing

### End-to-End Testing

Test complete workflows with all components:

```python
import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

@workflow.defn
class OrderProcessingWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> dict:
        # Validate order
        await workflow.execute_activity(
            validate_order,
            order_id,
            schedule_to_close_timeout=timedelta(minutes=5)
        )

        # Process payment
        payment_result = await workflow.execute_activity(
            process_payment,
            order_id,
            schedule_to_close_timeout=timedelta(minutes=10)
        )

        # Ship order
        shipping_result = await workflow.execute_activity(
            ship_order,
            order_id,
            schedule_to_close_timeout=timedelta(days=1)
        )

        return {
            "order_id": order_id,
            "payment": payment_result,
            "shipping": shipping_result,
            "status": "completed"
        }

@activity.defn
async def validate_order(order_id: str) -> bool:
    # Mock validation logic
    activity.heartbeat(f"Validating order {order_id}")
    await asyncio.sleep(1)
    return True

@activity.defn
async def process_payment(order_id: str) -> str:
    activity.heartbeat(f"Processing payment for {order_id}")
    await asyncio.sleep(2)
    return f"payment-{order_id}"

@activity.defn
async def ship_order(order_id: str) -> str:
    activity.heartbeat(f"Shipping order {order_id}")
    # Runs in real time: time skipping fast-forwards workflow timers,
    # not sleeps inside activity code
    await asyncio.sleep(5)
    return f"tracking-{order_id}"

async def test_order_processing_e2e():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="orders",
            workflows=[OrderProcessingWorkflow],
            activities=[validate_order, process_payment, ship_order]
        ) as worker:
            result = await env.client.execute_workflow(
                OrderProcessingWorkflow.run,
                "order-123",
                id="order-processing-test",
                task_queue="orders"
            )

            assert result["order_id"] == "order-123"
            assert result["status"] == "completed"
            assert "payment-order-123" in result["payment"]
            assert "tracking-order-123" in result["shipping"]
```

### Testing with External Dependencies

Test workflows that interact with external services:

```python
from unittest.mock import AsyncMock, MagicMock, patch

import httpx

@activity.defn
async def call_external_api(url: str, data: dict) -> dict:
    # In a real implementation, this makes an HTTP call
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=data)
        return response.json()

async def test_workflow_with_external_deps():
    """Test workflow with mocked external dependencies."""

    # Mock the external HTTP call: post() is awaited, json() is a plain method
    mock_response = {"status": "success", "id": "ext-123"}
    fake_response = MagicMock()
    fake_response.json.return_value = mock_response

    with patch(
        "httpx.AsyncClient.post", new=AsyncMock(return_value=fake_response)
    ) as mock_post:
        async with await WorkflowEnvironment.start_time_skipping() as env:
            async with Worker(
                env.client,
                task_queue="test-queue",
                workflows=[MyWorkflow],  # Assumed to call call_external_api
                activities=[call_external_api]
            ) as worker:
                result = await env.client.execute_workflow(
                    MyWorkflow.run,
                    id="external-test",
                    task_queue="test-queue"
                )

                # Verify mock was awaited
                mock_post.assert_awaited_once()
                assert result is not None
```
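
When mocking an async HTTP client, the awaited call and the synchronous `.json()` on its result need different mock types: `AsyncMock` for the coroutine method and `MagicMock` for the response object. The sketch below shows that split using only the standard library. `FakeClient` and `fetch_status` are hypothetical stand-ins for the HTTP client and the activity body, and the URL is a placeholder.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch


class FakeClient:
    """Hypothetical stand-in for an async HTTP client."""

    async def post(self, url, json=None):
        raise RuntimeError("network access is not expected in tests")


async def fetch_status(client, url, data):
    """Hypothetical activity body: awaits post(), then reads JSON synchronously."""
    response = await client.post(url, json=data)
    return response.json()


async def main():
    fake_response = MagicMock()
    fake_response.json.return_value = {"status": "success"}

    # AsyncMock makes `await client.post(...)` resolve to fake_response.
    with patch.object(
        FakeClient, "post", new=AsyncMock(return_value=fake_response)
    ) as mock_post:
        result = await fetch_status(FakeClient(), "https://example.invalid/api", {"k": 1})

    mock_post.assert_awaited_once_with("https://example.invalid/api", json={"k": 1})
    return result


result = asyncio.run(main())
assert result == {"status": "success"}
```

If the response were also an `AsyncMock`, `response.json()` would return a coroutine instead of the dictionary, which is an easy mistake to miss.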

### Multi-Worker Testing

Test scenarios with multiple workers and task queues:

```python
async def test_multi_worker_scenario():
    """Test workflow spanning multiple workers and task queues."""

    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Worker for main workflows
        async with Worker(
            env.client,
            task_queue="main-queue",
            workflows=[CoordinatorWorkflow]
        ) as main_worker:
            # Worker for processing activities
            async with Worker(
                env.client,
                task_queue="processing-queue",
                activities=[process_item, validate_item]
            ) as processing_worker:
                # Worker for notification activities
                async with Worker(
                    env.client,
                    task_queue="notification-queue",
                    activities=[send_notification]
                ) as notification_worker:

                    result = await env.client.execute_workflow(
                        CoordinatorWorkflow.run,
                        ["item1", "item2", "item3"],
                        id="multi-worker-test",
                        task_queue="main-queue"
                    )

                    assert result["processed_count"] == 3
                    assert result["notifications_sent"] == 3
```
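
Three nested `async with` blocks get deep quickly. `contextlib.AsyncExitStack` from the standard library can enter any number of async context managers at one indentation level and exits them in reverse order. A minimal sketch follows; `fake_worker` is a hypothetical stand-in for a `Worker(...)` instance so the example runs without a Temporal server.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []


@asynccontextmanager
async def fake_worker(task_queue):
    """Hypothetical stand-in for a worker bound to one task queue."""
    events.append(f"start {task_queue}")
    try:
        yield task_queue
    finally:
        events.append(f"stop {task_queue}")


async def main():
    queues = ["main-queue", "processing-queue", "notification-queue"]
    async with AsyncExitStack() as stack:
        # Enter each worker in turn; the stack remembers them for shutdown.
        workers = [await stack.enter_async_context(fake_worker(q)) for q in queues]
        events.append("run workflow")
    return workers


workers = asyncio.run(main())
assert events[0] == "start main-queue"
assert events[-1] == "stop main-queue"  # Shutdown happens in reverse order
```

In a real test, each `fake_worker(q)` call would be replaced by a configured `Worker`, which is itself an async context manager.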

## Testing Configuration

### Custom Test Environments

Configure test environments for specific scenarios:

```python
from temporalio.converter import (
    DataConverter,
    DefaultFailureConverter,
    DefaultPayloadConverter,
)
from temporalio.runtime import LoggingConfig, Runtime, TelemetryConfig, TelemetryFilter

async def create_test_env_with_custom_config():
    """Create test environment with custom configuration."""

    # Custom data converter for testing. payload_converter_class must be a
    # PayloadConverter subclass; substitute your own subclass to customize.
    custom_converter = DataConverter(
        payload_converter_class=DefaultPayloadConverter,
        failure_converter_class=DefaultFailureConverter
    )

    # Custom interceptors for testing (user-defined classes, not part of temporalio)
    test_interceptors = [
        LoggingInterceptor(),
        MetricsInterceptor()
    ]

    return await WorkflowEnvironment.start_time_skipping(
        data_converter=custom_converter,
        interceptors=test_interceptors,
        runtime=Runtime(telemetry=TelemetryConfig(
            # LoggingConfig takes a filter rather than a level argument
            logging=LoggingConfig(
                filter=TelemetryFilter(core_level="DEBUG", other_level="WARN")
            )
        ))
    )
```

### Test Isolation Patterns

Ensure test isolation and cleanup:

```python
import uuid

import pytest
from temporalio.client import Client

class TestIsolation:
    """Helper for test isolation."""

    def __init__(self):
        self.created_workflows = []
        self.test_data = {}

    def generate_workflow_id(self, prefix: str) -> str:
        workflow_id = f"{prefix}-{uuid.uuid4()}"
        self.created_workflows.append(workflow_id)
        return workflow_id

    async def cleanup(self, client: Client):
        """Clean up test resources."""
        for workflow_id in self.created_workflows:
            try:
                handle = client.get_workflow_handle(workflow_id)
                await handle.terminate(reason="test cleanup")
            except Exception:
                pass  # Ignore cleanup errors (e.g. workflow already closed)

# Usage in tests
@pytest.fixture
def test_isolation():
    # The fixture only tracks IDs; each test calls cleanup() with its own client
    return TestIsolation()

async def test_with_isolation(test_isolation):
    async with test_environment() as env:
        try:
            workflow_id = test_isolation.generate_workflow_id("test")

            # Run test...
        finally:
            # Runs even if an assertion above fails
            await test_isolation.cleanup(env.client)
```
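
Cleanup that runs only as the last statement of a test is skipped whenever an assertion fails first. Wrapping the ID tracking in an async context manager runs cleanup either way. This is a sketch under stated assumptions: `FakeHandle` and `FakeClient` are hypothetical stand-ins that mimic only `get_workflow_handle` and `terminate`, and `isolated_ids` is an illustrative helper, not part of `temporalio`.

```python
import asyncio
import uuid
from contextlib import asynccontextmanager


class FakeHandle:
    """Hypothetical stand-in for a workflow handle."""

    def __init__(self, workflow_id, terminated):
        self.workflow_id = workflow_id
        self._terminated = terminated

    async def terminate(self, reason=None):
        self._terminated.append(self.workflow_id)


class FakeClient:
    """Hypothetical stand-in for a client; records terminated workflow IDs."""

    def __init__(self):
        self.terminated = []

    def get_workflow_handle(self, workflow_id):
        return FakeHandle(workflow_id, self.terminated)


@asynccontextmanager
async def isolated_ids(client):
    created = []

    def new_id(prefix):
        workflow_id = f"{prefix}-{uuid.uuid4()}"
        created.append(workflow_id)
        return workflow_id

    try:
        yield new_id
    finally:
        # Best-effort cleanup of every ID handed out during the test.
        for workflow_id in created:
            try:
                await client.get_workflow_handle(workflow_id).terminate(reason="test cleanup")
            except Exception:
                pass


async def failing_test(client):
    async with isolated_ids(client) as new_id:
        new_id("test")
        raise AssertionError("simulated test failure")


client = FakeClient()
try:
    asyncio.run(failing_test(client))
except AssertionError:
    pass

assert len(client.terminated) == 1  # Cleanup ran despite the failure
```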

### Performance Testing Considerations

Basic patterns for performance testing:

```python
import time
from statistics import mean, stdev

async def test_workflow_performance():
    """Basic performance test for workflow execution."""

    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="perf-test",
            workflows=[FastWorkflow],
            max_concurrent_workflow_tasks=100
        ) as worker:

            # Warm up
            await env.client.execute_workflow(
                FastWorkflow.run,
                id="warmup",
                task_queue="perf-test"
            )

            # Performance test (wall-clock time per workflow execution)
            execution_times = []
            for i in range(10):
                start_time = time.monotonic()

                await env.client.execute_workflow(
                    FastWorkflow.run,
                    id=f"perf-{i}",
                    task_queue="perf-test"
                )

                execution_times.append(time.monotonic() - start_time)

            # Basic performance assertions
            avg_time = mean(execution_times)
            std_dev = stdev(execution_times)

            assert avg_time < 1.0, f"Average execution time too high: {avg_time}"
            assert std_dev < 0.5, f"Execution time variance too high: {std_dev}"
```
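
Mean and standard deviation are sensitive to a single slow run, which is common on shared CI machines. A median and a high percentile are often steadier signals. The sketch below uses `statistics.quantiles` from the standard library; `summarize` is a hypothetical helper and the sample timings are made up for illustration.

```python
from statistics import median, quantiles


def summarize(samples):
    """Return the median and 95th percentile of durations in seconds."""
    # quantiles(n=20) returns 19 cut points; the last one is the 95th percentile.
    p95 = quantiles(samples, n=20, method="inclusive")[-1]
    return {"median": median(samples), "p95": p95}


# Illustrative timings with one outlier (made-up data).
execution_times = [0.10, 0.11, 0.12, 0.10, 0.11, 0.13, 0.10, 0.12, 0.11, 0.90]
summary = summarize(execution_times)

# The median stays low even though one run took 0.9 seconds.
assert summary["median"] < 0.2
```

Asserting on the median for typical latency and on the percentile for tail latency keeps one noisy run from failing the whole test.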

This document covers testing Temporal workflows and activities with the `temporalio.testing` module, from basic examples to advanced patterns for testing distributed workflow applications.