# Testing

The `temporalio.testing` module provides testing environments and utilities for Temporal workflows and activities: an isolated `ActivityEnvironment` for activity code, `WorkflowEnvironment` variants for workflow integration tests, and support for time manipulation and mocking.

## Core Imports

```python
from temporalio.testing import ActivityEnvironment, WorkflowEnvironment
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
```

For advanced testing patterns:

```python
from temporalio.testing import ActivityEnvironment, WorkflowEnvironment
from temporalio.common import RetryPolicy, SearchAttributeKey
from temporalio.exceptions import ActivityError, ApplicationError, CancelledError
from datetime import datetime, timedelta
```

## Testing Environments

### ActivityEnvironment

The `ActivityEnvironment` class provides an isolated environment for testing activity functions with complete control over the activity context and lifecycle.

```python { .api }
class ActivityEnvironment:
    """Activity environment for testing activities.

    This environment is used for running activity code that can access the
    functions in the :py:mod:`temporalio.activity` module. Use :py:meth:`run` to
    run an activity function or any function within an activity context.

    Attributes:
        info: The info that is returned from the
            :py:func:`temporalio.activity.info` function.
        on_heartbeat: Function called on each heartbeat invocation by the
            activity.
        payload_converter: Payload converter set on the activity context. This
            must be set before :py:meth:`run`. Changes after the activity has
            started do not take effect.
        metric_meter: Metric meter set on the activity context. This must be set
            before :py:meth:`run`. Changes after the activity has started do not
            take effect. Default is noop.
    """

    def __init__(self, client: Optional[Client] = None) -> None:
        """Create an ActivityEnvironment for running activity code.

        Args:
            client: Optional client to make available in the activity context.
                Only available for async activities.
        """

    def cancel(
        self,
        cancellation_details: activity.ActivityCancellationDetails = activity.ActivityCancellationDetails(
            cancel_requested=True
        ),
    ) -> None:
        """Cancel the activity.

        Args:
            cancellation_details: Details about the cancellation. These will be
                accessible through :py:func:`temporalio.activity.cancellation_details`
                in the activity after cancellation.

        This only has an effect on the first call.
        """

    def worker_shutdown(self) -> None:
        """Notify the activity that the worker is shutting down.

        This only has an effect on the first call.
        """

    def run(
        self,
        fn: Callable[_Params, _Return],
        *args: _Params.args,
        **kwargs: _Params.kwargs,
    ) -> _Return:
        """Run the given callable in an activity context.

        Args:
            fn: The function/callable to run.
            args: All positional arguments to the callable.
            kwargs: All keyword arguments to the callable.

        Returns:
            The callable's result.
        """
```

### WorkflowEnvironment

The `WorkflowEnvironment` class provides environments for testing workflows with different capabilities, including time skipping and full server integration.

```python { .api }
class WorkflowEnvironment:
    """Workflow environment for testing workflows.

    Most developers will want to use the static :py:meth:`start_time_skipping`
    to start a test server process that automatically skips time as needed.
    Alternatively, :py:meth:`start_local` may be used for a full, local Temporal
    server with more features. To use an existing server, use
    :py:meth:`from_client`.

    This environment is an async context manager, so it can be used with
    ``async with`` to make sure it shuts down properly. Otherwise,
    :py:meth:`shutdown` can be manually called.

    To use the environment, simply use the :py:attr:`client` on it.

    Workflows invoked on the workflow environment are automatically configured
    to have ``assert`` failures fail the workflow with the assertion error.
    """

    @staticmethod
    def from_client(client: Client) -> WorkflowEnvironment:
        """Create a workflow environment from the given client.

        :py:attr:`supports_time_skipping` will always return ``False`` for this
        environment. :py:meth:`sleep` will sleep the actual amount of time and
        :py:meth:`get_current_time` will return the current time.

        Args:
            client: The client to use for the environment.

        Returns:
            The workflow environment that runs against the given client.
        """

    @staticmethod
    async def start_local(
        *,
        namespace: str = "default",
        data_converter: converter.DataConverter = converter.DataConverter.default,
        interceptors: Sequence[client.Interceptor] = [],
        plugins: Sequence[client.Plugin] = [],
        default_workflow_query_reject_condition: Optional[
            common.QueryRejectCondition
        ] = None,
        retry_config: Optional[client.RetryConfig] = None,
        rpc_metadata: Mapping[str, str] = {},
        identity: Optional[str] = None,
        tls: bool | client.TLSConfig = False,
        ip: str = "127.0.0.1",
        port: Optional[int] = None,
        download_dest_dir: Optional[str] = None,
        ui: bool = False,
        runtime: Optional[runtime.Runtime] = None,
        search_attributes: Sequence[common.SearchAttributeKey] = (),
        dev_server_existing_path: Optional[str] = None,
        dev_server_database_filename: Optional[str] = None,
        dev_server_log_format: str = "pretty",
        dev_server_log_level: Optional[str] = "warn",
        dev_server_download_version: str = "default",
        dev_server_extra_args: Sequence[str] = [],
        dev_server_download_ttl: Optional[timedelta] = None,
    ) -> WorkflowEnvironment:
        """Start a full Temporal server locally, downloading if necessary.

        This environment is good for testing full server capabilities, but does
        not support time skipping like :py:meth:`start_time_skipping` does.
        :py:attr:`supports_time_skipping` will always return ``False`` for this
        environment. :py:meth:`sleep` will sleep the actual amount of time and
        :py:meth:`get_current_time` will return the current time.

        Args:
            namespace: Namespace name to use for this environment.
            data_converter: Data converter for serialization.
            interceptors: Client interceptors to apply.
            plugins: Client plugins to apply.
            default_workflow_query_reject_condition: Default query reject
                condition.
            retry_config: Retry configuration for client calls.
            rpc_metadata: Additional RPC metadata.
            identity: Client identity.
            tls: TLS configuration.
            ip: IP address to bind to, or 127.0.0.1 by default.
            port: Port number to bind to, or an OS-provided port by default.
            download_dest_dir: Directory to download the binary to if needed.
            ui: If ``True``, will start a UI in the dev server.
            runtime: Specific runtime to use, or the default if unset.
            search_attributes: Search attributes to register with the dev
                server.
            dev_server_existing_path: Existing path to the CLI binary.
            dev_server_database_filename: Path to the SQLite database to use.
            dev_server_log_format: Log format for the dev server.
            dev_server_log_level: Log level for the dev server.
            dev_server_download_version: Specific CLI version to download.
            dev_server_extra_args: Extra arguments for the CLI binary.
            dev_server_download_ttl: TTL for the downloaded CLI binary.

        Returns:
            The started CLI dev server workflow environment.
        """

    @staticmethod
    async def start_time_skipping(
        *,
        data_converter: converter.DataConverter = converter.DataConverter.default,
        interceptors: Sequence[client.Interceptor] = [],
        plugins: Sequence[client.Plugin] = [],
        default_workflow_query_reject_condition: Optional[
            common.QueryRejectCondition
        ] = None,
        retry_config: Optional[client.RetryConfig] = None,
        rpc_metadata: Mapping[str, str] = {},
        identity: Optional[str] = None,
        port: Optional[int] = None,
        download_dest_dir: Optional[str] = None,
        runtime: Optional[runtime.Runtime] = None,
        test_server_existing_path: Optional[str] = None,
        test_server_download_version: str = "default",
        test_server_extra_args: Sequence[str] = [],
        test_server_download_ttl: Optional[timedelta] = None,
    ) -> WorkflowEnvironment:
        """Start a time-skipping workflow environment.

        By default, this environment will automatically skip to the next events
        in time when a workflow's
        :py:meth:`temporalio.client.WorkflowHandle.result` is awaited on (which
        includes :py:meth:`temporalio.client.Client.execute_workflow`). Before
        the result is awaited on, time can be manually skipped forward using
        :py:meth:`sleep`. The currently known time can be obtained via
        :py:meth:`get_current_time`.

        Args:
            data_converter: Data converter for serialization.
            interceptors: Client interceptors to apply.
            plugins: Client plugins to apply.
            default_workflow_query_reject_condition: Default query reject
                condition.
            retry_config: Retry configuration for client calls.
            rpc_metadata: Additional RPC metadata.
            identity: Client identity.
            port: Port number to bind to, or an OS-provided port by default.
            download_dest_dir: Directory to download the binary to if needed.
            runtime: Specific runtime to use, or the default if unset.
            test_server_existing_path: Existing path to the test server binary.
            test_server_download_version: Specific test server version to
                download.
            test_server_extra_args: Extra arguments for the test server binary.
            test_server_download_ttl: TTL for the downloaded test server
                binary.

        Returns:
            The started workflow environment with time skipping.
        """

    @property
    def client(self) -> Client:
        """Client to this environment."""

    async def shutdown(self) -> None:
        """Shut down this environment."""

    async def sleep(self, duration: Union[timedelta, float]) -> None:
        """Sleep in this environment.

        This awaits a regular :py:func:`asyncio.sleep` in regular environments,
        or manually skips time in time-skipping environments.

        Args:
            duration: Amount of time to sleep.
        """

    async def get_current_time(self) -> datetime:
        """Get the current time known to this environment.

        For non-time-skipping environments this is simply the system time. For
        time-skipping environments this is whatever time has been skipped to.
        """

    @property
    def supports_time_skipping(self) -> bool:
        """Whether this environment supports time skipping."""

    @contextmanager
    def auto_time_skipping_disabled(self) -> Iterator[None]:
        """Disable any automatic time skipping if this is a time-skipping
        environment.

        This is a context manager for use via ``with``. Usually in time-skipping
        environments, waiting on a workflow result causes time to automatically
        skip until the next event. This can disable that. However, this only
        applies to results awaited inside this context. This will not disable
        automatic time skipping on previous results.

        This has no effect on non-time-skipping environments.
        """
```

## Activity Testing

### Basic Activity Testing

Test activities in isolation using `ActivityEnvironment`:

```python
import asyncio

from temporalio import activity
from temporalio.testing import ActivityEnvironment

@activity.defn
async def process_data(data: str) -> str:
    activity.heartbeat(f"Processing: {data}")
    await asyncio.sleep(1)  # Simulate work
    return f"Processed: {data}"

async def test_process_data():
    # Create test environment
    env = ActivityEnvironment()

    # Track heartbeats
    heartbeats = []
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])

    # Run activity
    result = await env.run(process_data, "test-data")

    assert result == "Processed: test-data"
    assert heartbeats == ["Processing: test-data"]
```

### Testing Activity Cancellation

Test how activities handle cancellation:

```python
import asyncio

import pytest

from temporalio import activity
from temporalio.testing import ActivityEnvironment

@activity.defn
async def cancellable_activity() -> str:
    try:
        # Wait indefinitely
        await asyncio.Future()
        return "completed"
    except asyncio.CancelledError:
        # Check cancellation details
        cancellation_details = activity.cancellation_details()
        if cancellation_details and cancellation_details.cancel_requested:
            activity.heartbeat("cancelled gracefully")
            raise
        return "unexpected cancellation"

async def test_activity_cancellation():
    env = ActivityEnvironment()
    heartbeats = []
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])

    # Start activity
    task = asyncio.create_task(env.run(cancellable_activity))

    # Give it time to start
    await asyncio.sleep(0.1)

    # Cancel with details
    env.cancel(
        activity.ActivityCancellationDetails(cancel_requested=True)
    )

    # The activity re-raises the cancellation, so awaiting the task raises
    with pytest.raises(asyncio.CancelledError):
        await task
    assert heartbeats == ["cancelled gracefully"]
```

### Testing Activity Context

Test activities that use activity context functions:

```python
from temporalio import activity
from temporalio.testing import ActivityEnvironment
from temporalio.common import MetricMeter
from temporalio.converter import DataConverter

@activity.defn
async def context_aware_activity() -> dict:
    info = activity.info()
    converter = activity.payload_converter()
    meter = activity.metric_meter()

    return {
        "activity_id": info.activity_id,
        "workflow_id": info.workflow_id,
        "has_converter": converter is not None,
        "has_meter": meter is not None,
    }

async def test_activity_context():
    env = ActivityEnvironment()

    # Configure custom context
    env.info = activity.Info(
        activity_id="test-activity",
        activity_type="context_aware_activity",
        workflow_id="test-workflow",
        # ... other required fields
    )
    env.payload_converter = DataConverter.default.payload_converter
    env.metric_meter = MetricMeter.noop

    result = await env.run(context_aware_activity)

    assert result["activity_id"] == "test-activity"
    assert result["workflow_id"] == "test-workflow"
    assert result["has_converter"] is True
    assert result["has_meter"] is True
```

### Testing Synchronous Activities

Test synchronous activities with thread-based cancellation:

```python
import threading
import time

from temporalio import activity
from temporalio.testing import ActivityEnvironment
from temporalio.exceptions import CancelledError

@activity.defn
def sync_activity(data: str) -> str:
    activity.heartbeat(f"Starting: {data}")

    # Simulate work with cancellation checks
    for i in range(10):
        if activity.is_cancelled():
            activity.heartbeat("Cancelled during processing")
            raise CancelledError()
        time.sleep(0.5)
        activity.heartbeat(f"Step {i}")

    return f"Done: {data}"

def test_sync_activity_cancellation():
    env = ActivityEnvironment()
    heartbeats = []
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])

    # Run the activity in a separate thread so the main thread can cancel
    result_holder = {}

    def run_activity():
        try:
            result_holder["result"] = env.run(sync_activity, "test")
        except CancelledError:
            result_holder["cancelled"] = True

    thread = threading.Thread(target=run_activity)
    thread.start()

    # Wait a bit, then cancel
    time.sleep(1)
    env.cancel()
    thread.join()

    assert "cancelled" in result_holder
    assert "Starting: test" in heartbeats
```

### Testing Activity with Client Access

Test activities that need access to the Temporal client:

```python
from unittest.mock import Mock

import pytest

from temporalio import activity
from temporalio.testing import ActivityEnvironment
from temporalio.client import Client

@activity.defn
async def client_activity() -> str:
    client = activity.client()
    # Use client for additional operations
    return f"Client namespace: {client.namespace}"

async def test_activity_with_client():
    # Create mock client
    mock_client = Mock(spec=Client)
    mock_client.namespace = "test-namespace"

    env = ActivityEnvironment(client=mock_client)
    result = await env.run(client_activity)

    assert result == "Client namespace: test-namespace"

async def test_activity_without_client():
    env = ActivityEnvironment()  # No client provided

    with pytest.raises(RuntimeError, match="No client available"):
        await env.run(client_activity)
```

## Workflow Testing

### Time-Skipping Environment

Test workflows with automatic time advancement:

```python
import asyncio

from temporalio import workflow
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

@workflow.defn
class TimerWorkflow:
    @workflow.run
    async def run(self, duration_seconds: int) -> str:
        await asyncio.sleep(duration_seconds)
        return f"Slept for {duration_seconds} seconds"

async def test_timer_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[TimerWorkflow],
        ) as worker:
            # Execute workflow - time will automatically skip
            result = await env.client.execute_workflow(
                TimerWorkflow.run,
                100,  # 100 seconds
                id="timer-test",
                task_queue="test-queue",
            )

            assert result == "Slept for 100 seconds"

            # Verify time has advanced
            current_time = await env.get_current_time()
            # Time should be approximately 100 seconds later
```

### Manual Time Control

Test workflows with manual time manipulation:

```python
@workflow.defn
class ScheduledWorkflow:
    @workflow.run
    async def run(self) -> list[str]:
        results = []

        for i in range(3):
            await asyncio.sleep(30)  # 30 seconds
            results.append(f"Step {i} at {workflow.now()}")

        return results

async def test_scheduled_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[ScheduledWorkflow],
        ) as worker:
            # Start workflow
            handle = await env.client.start_workflow(
                ScheduledWorkflow.run,
                id="scheduled-test",
                task_queue="test-queue",
            )

            # Manually advance time and check progress
            await env.sleep(30)  # Skip 30 seconds

            # Workflow should still be running; disable auto-skipping so
            # awaiting the result does not itself advance time
            with env.auto_time_skipping_disabled():
                try:
                    await asyncio.wait_for(handle.result(), timeout=0.1)
                    assert False, "Workflow completed too early"
                except asyncio.TimeoutError:
                    pass  # Expected

            # Skip more time to complete (70 more seconds covers the
            # remaining 60)
            await env.sleep(70)

            result = await handle.result()
            assert len(result) == 3
```

### Testing Signals and Queries

Test workflow signal and query handling:

```python
@workflow.defn
class SignalWorkflow:
    def __init__(self):
        self.messages = []
        self.completed = False

    @workflow.run
    async def run(self) -> list[str]:
        # Wait for completion signal
        await workflow.wait_condition(lambda: self.completed)
        return self.messages

    @workflow.signal
    def add_message(self, message: str) -> None:
        self.messages.append(message)

    @workflow.signal
    def complete(self) -> None:
        self.completed = True

    @workflow.query
    def get_message_count(self) -> int:
        return len(self.messages)

async def test_signal_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[SignalWorkflow],
        ) as worker:
            # Start workflow
            handle = await env.client.start_workflow(
                SignalWorkflow.run,
                id="signal-test",
                task_queue="test-queue",
            )

            # Send signals
            await handle.signal(SignalWorkflow.add_message, "Hello")
            await handle.signal(SignalWorkflow.add_message, "World")

            # Query current state
            count = await handle.query(SignalWorkflow.get_message_count)
            assert count == 2

            # Complete workflow
            await handle.signal(SignalWorkflow.complete)

            result = await handle.result()
            assert result == ["Hello", "World"]
```

### Testing Child Workflows

Test workflows that spawn child workflows:

```python
@workflow.defn
class ChildWorkflow:
    @workflow.run
    async def run(self, value: int) -> int:
        await asyncio.sleep(1)
        return value * 2

@workflow.defn
class ParentWorkflow:
    @workflow.run
    async def run(self, values: list[int]) -> list[int]:
        # Start child workflows
        child_handles = []
        for value in values:
            handle = await workflow.start_child_workflow(
                ChildWorkflow.run,
                value,
                id=f"child-{value}",
            )
            child_handles.append(handle)

        # Collect results
        results = []
        for handle in child_handles:
            result = await handle
            results.append(result)

        return results

async def test_child_workflows():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[ParentWorkflow, ChildWorkflow],
        ) as worker:
            result = await env.client.execute_workflow(
                ParentWorkflow.run,
                [1, 2, 3, 4],
                id="parent-test",
                task_queue="test-queue",
            )

            assert result == [2, 4, 6, 8]
```

### Testing Workflow Updates

Test workflow update handlers:

```python
@workflow.defn
class UpdateableWorkflow:
    def __init__(self):
        self.counter = 0
        self.running = True

    @workflow.run
    async def run(self) -> int:
        await workflow.wait_condition(lambda: not self.running)
        return self.counter

    @workflow.update
    def increment(self, amount: int) -> int:
        self.counter += amount
        return self.counter

    @workflow.update
    def stop(self) -> None:
        self.running = False

async def test_workflow_updates():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[UpdateableWorkflow],
        ) as worker:
            handle = await env.client.start_workflow(
                UpdateableWorkflow.run,
                id="update-test",
                task_queue="test-queue",
            )

            # Send updates
            result1 = await handle.execute_update(UpdateableWorkflow.increment, 5)
            assert result1 == 5

            result2 = await handle.execute_update(UpdateableWorkflow.increment, 3)
            assert result2 == 8

            # Stop workflow
            await handle.execute_update(UpdateableWorkflow.stop)

            final_result = await handle.result()
            assert final_result == 8
```

## Time Management in Tests

### Time Skipping Patterns

Control time advancement for deterministic testing:

```python
@workflow.defn
class TimeBasedWorkflow:
    @workflow.run
    async def run(self) -> list[str]:
        events = []

        # Record initial time
        start_time = workflow.now()
        events.append(f"Started at {start_time}")

        # Wait various durations
        await asyncio.sleep(60)  # 1 minute
        events.append(f"After 1 minute: {workflow.now()}")

        await asyncio.sleep(3600)  # 1 hour
        events.append(f"After 1 hour: {workflow.now()}")

        return events

async def test_time_advancement():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[TimeBasedWorkflow],
        ) as worker:
            # Record initial time
            initial_time = await env.get_current_time()

            # Execute workflow (time auto-advances)
            result = await env.client.execute_workflow(
                TimeBasedWorkflow.run,
                id="time-test",
                task_queue="test-queue",
            )

            # Verify time advancement
            final_time = await env.get_current_time()
            elapsed = final_time - initial_time

            # Should be approximately 1 hour and 1 minute
            assert abs(elapsed.total_seconds() - 3660) < 60
```

### Disabling Auto Time Skipping

Test workflows with real-time progression when needed:

```python
from time import monotonic

async def test_real_time_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[TimeBasedWorkflow],
        ) as worker:
            with env.auto_time_skipping_disabled():
                start_real_time = monotonic()

                # This will take real time
                await env.client.execute_workflow(
                    TimeBasedWorkflow.run,
                    id="real-time-test",
                    task_queue="test-queue",
                )

                elapsed_real_time = monotonic() - start_real_time
                # Should have taken actual time (limited by workflow logic)
```

### Precise Time Control

Precisely control time advancement for complex scenarios:

```python
@workflow.defn
class ComplexTimingWorkflow:
    @workflow.run
    async def run(self) -> dict:
        results = {}

        # Phase 1: Quick operations
        for i in range(5):
            await asyncio.sleep(10)
            results[f"quick_{i}"] = workflow.now()

        # Phase 2: Long operation
        await asyncio.sleep(300)  # 5 minutes
        results["long_operation"] = workflow.now()

        return results

async def test_complex_timing():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[ComplexTimingWorkflow],
        ) as worker:
            # Start workflow without auto-completion
            handle = await env.client.start_workflow(
                ComplexTimingWorkflow.run,
                id="complex-timing",
                task_queue="test-queue",
            )

            # Manually advance through the quick phase
            for i in range(5):
                await env.sleep(10)
                # Could check intermediate state here

            # Advance through the long operation
            await env.sleep(300)

            result = await handle.result()

            # Verify timing progression
            assert len(result) == 6
            assert "long_operation" in result
```

## Nexus Testing

### Testing Nexus Operations

Test Nexus service operations and handlers:

```python
from temporalio import nexus
from temporalio.testing import WorkflowEnvironment
from temporalio.client import Client

# Define Nexus service
@nexus.service.defn
class CalculatorService:
    @nexus.operation.defn
    async def add(self, a: int, b: int) -> int:
        return a + b

    @nexus.operation.defn
    async def multiply(self, a: int, b: int) -> int:
        await asyncio.sleep(1)  # Simulate work
        return a * b

# Workflow that uses Nexus
@workflow.defn
class NexusWorkflow:
    @workflow.run
    async def run(self, x: int, y: int) -> dict:
        # Create Nexus client
        nexus_client = workflow.create_nexus_client(
            "calculator-service",
            CalculatorService,
        )

        # Call operations
        sum_result = await nexus_client.add(x, y)
        product_result = await nexus_client.multiply(x, y)

        return {
            "sum": sum_result,
            "product": product_result,
        }

async def test_nexus_operations():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Create Nexus endpoint (test helper)
        endpoint = await create_test_nexus_endpoint(
            env.client,
            "calculator-service",
            "test-nexus-queue",
        )

        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[NexusWorkflow],
        ) as workflow_worker:
            async with Worker(
                env.client,
                task_queue="test-nexus-queue",
                nexus_services=[CalculatorService()],
            ) as nexus_worker:
                result = await env.client.execute_workflow(
                    NexusWorkflow.run,
                    args=[5, 3],  # Multi-argument workflows use args=
                    id="nexus-test",
                    task_queue="test-queue",
                )

                assert result["sum"] == 8
                assert result["product"] == 15
```

### Mock Nexus Services

Test workflows with mocked Nexus dependencies:

```python
from unittest.mock import AsyncMock

class MockCalculatorService:
    def __init__(self):
        self.add = AsyncMock(return_value=100)
        self.multiply = AsyncMock(return_value=200)

async def test_nexus_with_mocks():
    # This would require custom interceptors or test utilities
    # to inject mock services into the Nexus client resolution
    pass  # Implementation depends on specific mocking strategy
```

### Testing Nexus Error Handling

Test how workflows handle Nexus operation failures:

```python
@nexus.service.defn
class FailingService:
    @nexus.operation.defn
    async def unreliable_op(self, fail: bool) -> str:
        if fail:
            raise ApplicationError("Operation failed", type="ServiceError")
        return "success"

@workflow.defn
class ErrorHandlingWorkflow:
    @workflow.run
    async def run(self, should_fail: bool) -> str:
        nexus_client = workflow.create_nexus_client(
            "failing-service",
            FailingService
        )

        try:
            result = await nexus_client.unreliable_op(should_fail)
            return f"Success: {result}"
        except NexusOperationError as e:
            return f"Error: {e}"

async def test_nexus_error_handling():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Set up Nexus service and workflow workers
        # Test both success and failure cases
        pass
```
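
Before wiring up workers, the workflow's branching logic can be exercised in isolation. This sketch substitutes a plain coroutine and a local exception class for the Nexus operation (both are illustrative stand-ins, not Temporal APIs):

```python
import asyncio

class NexusOperationError(Exception):
    """Stand-in for the SDK's Nexus operation failure type."""

async def unreliable_op(fail: bool) -> str:
    # Mirrors FailingService.unreliable_op without a Nexus endpoint
    if fail:
        raise NexusOperationError("Operation failed")
    return "success"

async def run(should_fail: bool) -> str:
    # Same try/except shape as ErrorHandlingWorkflow.run
    try:
        result = await unreliable_op(should_fail)
        return f"Success: {result}"
    except NexusOperationError as e:
        return f"Error: {e}"

print(asyncio.run(run(False)))  # Success: success
print(asyncio.run(run(True)))   # Error: Operation failed
```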

## Test Utilities and Patterns

### Common Test Setup

Reusable patterns for test environment setup:

```python
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional

@asynccontextmanager
async def test_environment() -> AsyncGenerator[WorkflowEnvironment, None]:
    """Standard test environment setup."""
    async with await WorkflowEnvironment.start_time_skipping() as env:
        yield env

@asynccontextmanager
async def workflow_worker(
    env: WorkflowEnvironment,
    workflows: list,
    activities: Optional[list] = None,
    task_queue: str = "test-queue"
) -> AsyncGenerator[Worker, None]:
    """Standard worker setup for tests."""
    async with Worker(
        env.client,
        task_queue=task_queue,
        workflows=workflows,
        activities=activities or []
    ) as worker:
        yield worker

# Usage in tests
async def test_with_standard_setup():
    async with test_environment() as env:
        async with workflow_worker(env, [MyWorkflow]) as worker:
            result = await env.client.execute_workflow(
                MyWorkflow.run,
                id="test",
                task_queue="test-queue"
            )
            assert result is not None
```

### Assertion Patterns

Common patterns for asserting workflow and activity behavior:

```python
import pytest

async def assert_workflow_completed_successfully(handle):
    """Assert workflow completed without failure."""
    try:
        return await handle.result()
    except WorkflowFailureError:
        pytest.fail("Workflow failed unexpectedly")

async def assert_workflow_failed_with(handle, error_type):
    """Assert workflow failed with specific error type."""
    with pytest.raises(WorkflowFailureError) as exc_info:
        await handle.result()

    assert isinstance(exc_info.value.cause, error_type)
    return exc_info.value.cause

def assert_activity_heartbeats(env, expected_heartbeats):
    """Assert activity sent expected heartbeats."""
    heartbeats = []
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])
    # Run activity...
    assert heartbeats == expected_heartbeats
```
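
The helpers assume `pytest` and `WorkflowFailureError` (from `temporalio.client`) are in scope. The underlying await-and-inspect pattern can be checked against a stub handle; both classes below are illustrative stand-ins, not Temporal APIs:

```python
import asyncio

class WorkflowFailureError(Exception):
    """Stand-in for temporalio.client.WorkflowFailureError."""
    def __init__(self, cause: BaseException):
        super().__init__(str(cause))
        self.cause = cause

class FakeHandle:
    """Minimal stub exposing the async result() the helpers rely on."""
    def __init__(self, result=None, error=None):
        self._result, self._error = result, error

    async def result(self):
        if self._error is not None:
            raise self._error
        return self._result

async def demo() -> str:
    # Success path: result() returns the workflow result
    assert await FakeHandle(result="done").result() == "done"

    # Failure path: result() raises, and .cause carries the inner error
    failing = FakeHandle(error=WorkflowFailureError(ValueError("boom")))
    try:
        await failing.result()
    except WorkflowFailureError as e:
        assert isinstance(e.cause, ValueError)
    return "ok"

print(asyncio.run(demo()))  # ok
```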

### Test Data Management

Utilities for managing test data and state:

```python
import uuid
from dataclasses import dataclass
from typing import Optional

@dataclass
class TestWorkflowIds:
    """Generate unique workflow IDs for tests."""
    prefix: str = "test"

    def generate(self, suffix: Optional[str] = None) -> str:
        base = f"{self.prefix}-{uuid.uuid4()}"
        return f"{base}-{suffix}" if suffix else base

class TestDataBuilder:
    """Builder pattern for test data."""

    def __init__(self):
        self.reset()

    def reset(self):
        self.data = {}
        return self

    def with_field(self, key: str, value):
        self.data[key] = value
        return self

    def build(self):
        return self.data.copy()

# Usage
ids = TestWorkflowIds("integration")
workflow_id = ids.generate("signal-test")

test_data = (TestDataBuilder()
    .with_field("name", "test-user")
    .with_field("value", 42)
    .build())
```

## Integration Testing

### End-to-End Testing

Test complete workflows with all components:

```python
import asyncio
from datetime import timedelta

@workflow.defn
class OrderProcessingWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> dict:
        # Validate order
        await workflow.execute_activity(
            validate_order,
            order_id,
            schedule_to_close_timeout=timedelta(minutes=5)
        )

        # Process payment
        payment_result = await workflow.execute_activity(
            process_payment,
            order_id,
            schedule_to_close_timeout=timedelta(minutes=10)
        )

        # Ship order
        shipping_result = await workflow.execute_activity(
            ship_order,
            order_id,
            schedule_to_close_timeout=timedelta(days=1)
        )

        return {
            "order_id": order_id,
            "payment": payment_result,
            "shipping": shipping_result,
            "status": "completed"
        }

@activity.defn
async def validate_order(order_id: str) -> bool:
    # Mock validation logic
    activity.heartbeat(f"Validating order {order_id}")
    await asyncio.sleep(1)
    return True

@activity.defn
async def process_payment(order_id: str) -> str:
    activity.heartbeat(f"Processing payment for {order_id}")
    await asyncio.sleep(2)
    return f"payment-{order_id}"

@activity.defn
async def ship_order(order_id: str) -> str:
    activity.heartbeat(f"Shipping order {order_id}")
    await asyncio.sleep(5)  # Will be skipped in time-skipping tests
    return f"tracking-{order_id}"

async def test_order_processing_e2e():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="orders",
            workflows=[OrderProcessingWorkflow],
            activities=[validate_order, process_payment, ship_order]
        ) as worker:
            result = await env.client.execute_workflow(
                OrderProcessingWorkflow.run,
                "order-123",
                id="order-processing-test",
                task_queue="orders"
            )

            assert result["order_id"] == "order-123"
            assert result["status"] == "completed"
            assert "payment-order-123" in result["payment"]
            assert "tracking-order-123" in result["shipping"]
```

### Testing with External Dependencies

Test workflows that interact with external services:

```python
from unittest.mock import AsyncMock, patch

@activity.defn
async def call_external_api(url: str, data: dict) -> dict:
    # In real implementation, this would make HTTP calls
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=data)
        return response.json()

async def test_workflow_with_external_deps():
    """Test workflow with mocked external dependencies."""

    # Mock the external HTTP call; AsyncMock makes the patched
    # method awaitable
    mock_response = {"status": "success", "id": "ext-123"}

    with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post:
        mock_post.return_value.json.return_value = mock_response

        async with await WorkflowEnvironment.start_time_skipping() as env:
            async with Worker(
                env.client,
                task_queue="test-queue",
                workflows=[MyWorkflow],
                activities=[call_external_api]
            ) as worker:
                result = await env.client.execute_workflow(
                    MyWorkflow.run,
                    id="external-test",
                    task_queue="test-queue"
                )

                # Verify mock was called
                mock_post.assert_called_once()
                assert result is not None
```

### Multi-Worker Testing

Test scenarios with multiple workers and task queues:

```python
async def test_multi_worker_scenario():
    """Test a workflow spanning multiple workers and task queues."""

    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Worker for main workflows
        async with Worker(
            env.client,
            task_queue="main-queue",
            workflows=[CoordinatorWorkflow]
        ) as main_worker:
            # Worker for processing activities
            async with Worker(
                env.client,
                task_queue="processing-queue",
                activities=[process_item, validate_item]
            ) as processing_worker:
                # Worker for notification activities
                async with Worker(
                    env.client,
                    task_queue="notification-queue",
                    activities=[send_notification]
                ) as notification_worker:

                    result = await env.client.execute_workflow(
                        CoordinatorWorkflow.run,
                        ["item1", "item2", "item3"],
                        id="multi-worker-test",
                        task_queue="main-queue"
                    )

                    assert result["processed_count"] == 3
                    assert result["notifications_sent"] == 3
```

## Testing Configuration

### Custom Test Environments

Configure test environments for specific scenarios:

```python
from temporalio.converter import (
    DataConverter,
    DefaultFailureConverter,
    JSONPlainPayloadConverter,
)
from temporalio.runtime import (
    LoggingConfig,
    Runtime,
    TelemetryConfig,
    TelemetryFilter,
)

async def create_test_env_with_custom_config():
    """Create a test environment with custom configuration."""

    # Custom data converter for testing
    custom_converter = DataConverter(
        payload_converter_class=JSONPlainPayloadConverter,
        failure_converter_class=DefaultFailureConverter
    )

    # Custom interceptors for testing (user-defined classes)
    test_interceptors = [
        LoggingInterceptor(),
        MetricsInterceptor()
    ]

    return await WorkflowEnvironment.start_time_skipping(
        data_converter=custom_converter,
        interceptors=test_interceptors,
        runtime=Runtime(telemetry=TelemetryConfig(
            logging=LoggingConfig(
                filter=TelemetryFilter(core_level="DEBUG", other_level="DEBUG")
            )
        ))
    )
```

### Test Isolation Patterns

Ensure test isolation and cleanup:

```python
import uuid

class TestIsolation:
    """Helper for test isolation."""

    def __init__(self):
        self.created_workflows = []
        self.test_data = {}

    def generate_workflow_id(self, prefix: str) -> str:
        workflow_id = f"{prefix}-{uuid.uuid4()}"
        self.created_workflows.append(workflow_id)
        return workflow_id

    async def cleanup(self, client: Client):
        """Clean up test resources."""
        for workflow_id in self.created_workflows:
            try:
                handle = client.get_workflow_handle(workflow_id)
                await handle.terminate("test cleanup")
            except Exception:
                pass  # Ignore cleanup errors

# Usage in tests
@pytest.fixture
async def test_isolation():
    isolation = TestIsolation()
    yield isolation
    # Teardown runs here; cleanup is invoked explicitly in the test below

async def test_with_isolation(test_isolation):
    async with test_environment() as env:
        workflow_id = test_isolation.generate_workflow_id("test")

        # Run test...

        await test_isolation.cleanup(env.client)
```

### Performance Testing Considerations

Basic patterns for performance testing:

```python
import time
from statistics import mean, stdev

async def test_workflow_performance():
    """Basic performance test for workflow execution."""

    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="perf-test",
            workflows=[FastWorkflow],
            max_concurrent_workflow_tasks=100
        ) as worker:

            # Warm up
            await env.client.execute_workflow(
                FastWorkflow.run,
                id="warmup",
                task_queue="perf-test"
            )

            # Performance test
            execution_times = []
            for i in range(10):
                start_time = time.monotonic()

                await env.client.execute_workflow(
                    FastWorkflow.run,
                    id=f"perf-{i}",
                    task_queue="perf-test"
                )

                execution_times.append(time.monotonic() - start_time)

            # Basic performance assertions
            avg_time = mean(execution_times)
            std_dev = stdev(execution_times)

            assert avg_time < 1.0, f"Average execution time too high: {avg_time}"
            assert std_dev < 0.5, f"Execution time variance too high: {std_dev}"
```

This documentation covers the major aspects of testing Temporal workflows and activities with the temporalio.testing module, from basic examples through advanced patterns for thorough testing of distributed workflow applications.