
# Worker Management

The `temporalio.worker` module runs Temporal workflows and activities. A worker polls a task queue, executes the workflow and activity tasks it receives, and manages the concurrency and resources used to do so.

## Worker Creation and Configuration

### Worker Class

The primary worker class handles both workflow and activity execution:

```python { .api }
class Worker:
    """Worker to process workflows and/or activities.

    Once created, workers can be run and shutdown explicitly via run()
    and shutdown(). Alternatively workers can be used in an async with clause.
    """

    def __init__(
        self,
        client: temporalio.client.Client,
        *,
        task_queue: str,
        activities: Sequence[Callable] = [],
        nexus_service_handlers: Sequence[Any] = [],
        workflows: Sequence[Type] = [],
        activity_executor: Optional[concurrent.futures.Executor] = None,
        workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None,
        nexus_task_executor: Optional[concurrent.futures.Executor] = None,
        workflow_runner: WorkflowRunner = SandboxedWorkflowRunner(),
        unsandboxed_workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(),
        plugins: Sequence[Plugin] = [],
        interceptors: Sequence[Interceptor] = [],
        build_id: Optional[str] = None,
        identity: Optional[str] = None,
        max_cached_workflows: int = 1000,
        max_concurrent_workflow_tasks: Optional[int] = None,
        max_concurrent_activities: Optional[int] = None,
        max_concurrent_local_activities: Optional[int] = None,
        tuner: Optional[WorkerTuner] = None,
        max_concurrent_workflow_task_polls: Optional[int] = None,
        nonsticky_to_sticky_poll_ratio: float = 0.2,
        max_concurrent_activity_task_polls: Optional[int] = None,
        no_remote_activities: bool = False,
        sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10),
        max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60),
        default_heartbeat_throttle_interval: timedelta = timedelta(seconds=30),
        max_activities_per_second: Optional[float] = None,
        max_task_queue_activities_per_second: Optional[float] = None,
        graceful_shutdown_timeout: timedelta = timedelta(),
        workflow_failure_exception_types: Sequence[Type[BaseException]] = [],
        shared_state_manager: Optional[SharedStateManager] = None,
        debug_mode: bool = False,
        disable_eager_activity_execution: bool = False,
        on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] = None,
        use_worker_versioning: bool = False,
        disable_safe_workflow_eviction: bool = False,
        deployment_config: Optional[WorkerDeploymentConfig] = None,
        workflow_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(maximum=5),
        activity_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(maximum=5),
        nexus_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(maximum=5),
    ) -> None:
        """Create a worker to process workflows and/or activities.

        Args:
            client: Client to use for this worker. This is required and must be
                the Client instance or have a worker_service_client attribute with
                reference to the original client's underlying service client.
                This client cannot be "lazy".
            task_queue: Required task queue for this worker.
            activities: Activity callables decorated with @activity.defn.
                Activities may be async functions or non-async functions.
            nexus_service_handlers: Instances of Nexus service handler classes
                decorated with @nexusrpc.handler.service_handler.
            workflows: Workflow classes decorated with @workflow.defn.
            activity_executor: Concurrent executor to use for non-async
                activities. This is required if any activities are non-async.
                ThreadPoolExecutor is recommended. If this is a
                ProcessPoolExecutor, all non-async activities must be picklable.
            workflow_task_executor: Thread pool executor for workflow tasks. If
                this is not present, a new ThreadPoolExecutor will be
                created with max_workers set to max_concurrent_workflow_tasks if it is present,
                or 500 otherwise.
            nexus_task_executor: Executor to use for non-async
                Nexus operations. This is required if any operation start methods
                are non-async def. ThreadPoolExecutor is recommended.
            workflow_runner: Runner for workflows.
            unsandboxed_workflow_runner: Runner for workflows that opt-out of
                sandboxing.
            plugins: Collection of plugins for this worker. Any plugins already
                on the client that also implement Plugin are
                prepended to this list and should not be explicitly given here
                to avoid running the plugin twice.
            interceptors: Collection of interceptors for this worker. Any
                interceptors already on the client that also implement
                Interceptor are prepended to this list and should
                not be explicitly given here.
            build_id: Unique identifier for the current runtime. This is best
                set as a hash of all code and should change only when code does.
                If unset, a best-effort identifier is generated.
                Exclusive with deployment_config.
            identity: Identity for this worker client. If unset, the client
                identity is used.
            max_cached_workflows: If nonzero, workflows will be cached and
                sticky task queues will be used.
            max_concurrent_workflow_tasks: Maximum allowed number of workflow
                tasks that will ever be given to this worker at one time. Mutually exclusive with
                tuner. Must be set to at least two if max_cached_workflows is nonzero.
            max_concurrent_activities: Maximum number of activity tasks that
                will ever be given to the activity worker concurrently. Mutually exclusive with tuner.
            max_concurrent_local_activities: Maximum number of local activity
                tasks that will ever be given to the activity worker concurrently. Mutually exclusive with tuner.
            tuner: Provide a custom WorkerTuner. Mutually exclusive with the
                max_concurrent_workflow_tasks, max_concurrent_activities, and
                max_concurrent_local_activities arguments.
                Defaults to fixed-size 100 slots for each slot kind if unset and none of the
                max_* arguments are provided.
            max_concurrent_workflow_task_polls: Maximum number of concurrent
                poll workflow task requests we will perform at a time on this worker's task queue.
                Must be set to at least two if max_cached_workflows is nonzero.
                If set, will override any value passed to workflow_task_poller_behavior.
            nonsticky_to_sticky_poll_ratio: max_concurrent_workflow_task_polls *
                this number = the number of max pollers that will be allowed for
                the nonsticky queue when sticky tasks are enabled.
            max_concurrent_activity_task_polls: Maximum number of concurrent
                poll activity task requests we will perform at a time on this
                worker's task queue.
                If set, will override any value passed to activity_task_poller_behavior.
            no_remote_activities: If true, this worker will only handle workflow
                tasks and local activities, it will not poll for activity tasks.
            sticky_queue_schedule_to_start_timeout: How long a workflow task is
                allowed to sit on the sticky queue before it is timed out and
                moved to the non-sticky queue where it may be picked up by any
                worker.
            max_heartbeat_throttle_interval: Longest interval for throttling
                activity heartbeats.
            default_heartbeat_throttle_interval: Default interval for throttling
                activity heartbeats in case per-activity heartbeat timeout is
                unset. Otherwise, it's the per-activity heartbeat timeout * 0.8.
            max_activities_per_second: Limits the number of activities per
                second that this worker will process. The worker will not poll
                for new activities if by doing so it might receive and execute
                an activity which would cause it to exceed this limit.
            max_task_queue_activities_per_second: Sets the maximum number of
                activities per second the task queue will dispatch, controlled
                server-side. Note that this only takes effect upon an activity
                poll request.
            graceful_shutdown_timeout: Amount of time after shutdown is called
                that activities are given to complete before their tasks are
                cancelled.
            workflow_failure_exception_types: The types of exceptions that, if a
                workflow-thrown exception extends, will cause the
                workflow/update to fail instead of suspending the workflow via
                task failure.
            shared_state_manager: Used for obtaining cross-process friendly
                synchronization primitives. This is required for non-async
                activities where the activity_executor is not a
                ThreadPoolExecutor. Reuse of these across workers is encouraged.
            debug_mode: If true, will disable deadlock detection and may disable
                sandboxing in order to make using a debugger easier. If false
                but the environment variable TEMPORAL_DEBUG is truthy, this
                will be set to true.
            disable_eager_activity_execution: If true, will disable eager
                activity execution. Eager activity execution is an optimization
                on some servers that sends activities back to the same worker as
                the calling workflow if they can run there.
            on_fatal_error: An async function that can handle a failure before
                the worker shutdown commences. This cannot stop the shutdown and
                any exception raised is logged and ignored.
            use_worker_versioning: If true, the build_id argument must be
                specified, and this worker opts into the worker versioning
                feature. This ensures it only receives workflow tasks for
                workflows which it claims to be compatible with.
                Exclusive with deployment_config.
            disable_safe_workflow_eviction: If true, instead of letting the
                workflow collect its tasks properly, the worker will simply let
                the Python garbage collector collect the tasks.
            deployment_config: Deployment config for the worker. Exclusive with build_id and
                use_worker_versioning.
            workflow_task_poller_behavior: Specify the behavior of workflow task polling.
                Defaults to a 5-poller maximum.
            activity_task_poller_behavior: Specify the behavior of activity task polling.
                Defaults to a 5-poller maximum.
            nexus_task_poller_behavior: Specify the behavior of Nexus task polling.
                Defaults to a 5-poller maximum.
        """
```
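
The heartbeat throttling rules in the docstring above reduce to a small calculation. The sketch below is a stdlib-only model of that rule, not the SDK's internal code. The helper name `effective_heartbeat_throttle` is hypothetical, and treating `max_heartbeat_throttle_interval` as an upper cap on the result is an assumption based on its description as the longest interval.

```python
from datetime import timedelta
from typing import Optional


def effective_heartbeat_throttle(
    heartbeat_timeout: Optional[timedelta],
    default_interval: timedelta = timedelta(seconds=30),
    max_interval: timedelta = timedelta(seconds=60),
) -> timedelta:
    """Model of the documented throttle rule (hypothetical helper)."""
    if heartbeat_timeout is not None:
        # Per-activity heartbeat timeout is set: throttle at 80% of it.
        candidate = heartbeat_timeout * 0.8
    else:
        # No per-activity timeout: fall back to the worker default.
        candidate = default_interval
    # Assumption: the maximum acts as an upper bound on the result.
    return min(candidate, max_interval)


print(effective_heartbeat_throttle(timedelta(seconds=10)))  # 0:00:08
print(effective_heartbeat_throttle(None))                   # 0:00:30
print(effective_heartbeat_throttle(timedelta(minutes=2)))   # 0:01:00
```

Under this model, an activity with a 10 second heartbeat timeout would have its heartbeats batched into roughly 8 second intervals, while a 2 minute timeout would hit the 60 second cap.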

### WorkerConfig

TypedDict for worker configuration:

```python { .api }
class WorkerConfig(TypedDict, total=False):
    """TypedDict of config originally passed to Worker."""

    client: temporalio.client.Client
    task_queue: str
    activities: Sequence[Callable]
    nexus_service_handlers: Sequence[Any]
    workflows: Sequence[Type]
    activity_executor: Optional[concurrent.futures.Executor]
    workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor]
    nexus_task_executor: Optional[concurrent.futures.Executor]
    workflow_runner: WorkflowRunner
    unsandboxed_workflow_runner: WorkflowRunner
    interceptors: Sequence[Interceptor]
    build_id: Optional[str]
    identity: Optional[str]
    max_cached_workflows: int
    max_concurrent_workflow_tasks: Optional[int]
    max_concurrent_activities: Optional[int]
    max_concurrent_local_activities: Optional[int]
    tuner: Optional[WorkerTuner]
    max_concurrent_workflow_task_polls: Optional[int]
    nonsticky_to_sticky_poll_ratio: float
    max_concurrent_activity_task_polls: Optional[int]
    no_remote_activities: bool
    sticky_queue_schedule_to_start_timeout: timedelta
    max_heartbeat_throttle_interval: timedelta
    default_heartbeat_throttle_interval: timedelta
    max_activities_per_second: Optional[float]
    max_task_queue_activities_per_second: Optional[float]
    graceful_shutdown_timeout: timedelta
    workflow_failure_exception_types: Sequence[Type[BaseException]]
    shared_state_manager: Optional[SharedStateManager]
    debug_mode: bool
    disable_eager_activity_execution: bool
    on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]]
    use_worker_versioning: bool
    disable_safe_workflow_eviction: bool
    deployment_config: Optional[WorkerDeploymentConfig]
    workflow_task_poller_behavior: PollerBehavior
    activity_task_poller_behavior: PollerBehavior
    nexus_task_poller_behavior: PollerBehavior
```

### WorkerDeploymentConfig

Configuration for the Worker Versioning feature:

```python { .api }
@dataclass
class WorkerDeploymentConfig:
    """Options for configuring the Worker Versioning feature.

    WARNING: This is an experimental feature and may change in the future.
    """

    version: WorkerDeploymentVersion
    """The deployment version information."""

    use_worker_versioning: bool
    """Whether to enable worker versioning."""

    default_versioning_behavior: VersioningBehavior = VersioningBehavior.UNSPECIFIED
    """Default versioning behavior for workflow tasks."""

    def _to_bridge_worker_deployment_options(self) -> temporalio.bridge.worker.WorkerDeploymentOptions:
        """Convert to bridge worker deployment options."""
```
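
The `Worker` docstring recommends that a build identifier be a hash of all code and change only when the code does. One way to derive such a value is sketched below, using only the standard library. The helper name `compute_build_id`, the choice of SHA-256, the 16-character truncation, and the `*.py` file pattern are all assumptions made for illustration, not something the SDK prescribes.

```python
import hashlib
from pathlib import Path


def compute_build_id(source_root: Path, pattern: str = "*.py") -> str:
    """Hash file names and contents under source_root (hypothetical helper)."""
    digest = hashlib.sha256()
    # Sort so the result does not depend on filesystem iteration order.
    for path in sorted(source_root.rglob(pattern)):
        digest.update(path.relative_to(source_root).as_posix().encode())
        digest.update(b"\0")
        digest.update(path.read_bytes())
        digest.update(b"\0")
    # Truncate for readability; the full digest works equally well.
    return digest.hexdigest()[:16]


if __name__ == "__main__":
    print(compute_build_id(Path(".")))
```

The resulting string could then be supplied wherever a build ID is expected, so that identical code always yields the same identifier.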

### Basic Worker Usage

```python
import asyncio
from datetime import timedelta

from temporalio import activity, client, worker, workflow

# Define activities and workflows
@activity.defn
async def say_hello(name: str) -> str:
    return f"Hello, {name}!"

@workflow.defn
class SayHelloWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello,
            name,
            schedule_to_close_timeout=timedelta(seconds=60),
        )

async def main():
    # Create client
    client_instance = await client.Client.connect("localhost:7233")

    # Create worker
    worker_instance = worker.Worker(
        client_instance,
        task_queue="hello-task-queue",
        workflows=[SayHelloWorkflow],
        activities=[say_hello],
    )

    # Run worker
    async with worker_instance:
        # Worker runs until context exits
        await asyncio.sleep(60)

if __name__ == "__main__":
    asyncio.run(main())
```

## Worker Execution and Lifecycle

### Worker.run() Method

Start the worker and wait for shutdown:

```python { .api }
async def run(self) -> None:
    """Run the worker and wait on it to be shut down.

    This will not return until shutdown is complete. This means that
    activities have all completed after being told to cancel after the
    graceful timeout period.

    This method will raise if there is a worker fatal error. While
    shutdown() does not need to be invoked in this case, it is
    harmless to do so. Otherwise, to shut down this worker, invoke
    shutdown().

    Technically this worker can be shutdown by issuing a cancel to this
    async function assuming that it is currently running. A cancel could
    also cancel the shutdown process. Therefore users are encouraged to use
    explicit shutdown instead.
    """
```

### Worker Properties

```python { .api }
@property
def is_running(self) -> bool:
    """Whether the worker is running.

    This is only True if the worker has been started and not yet
    shut down.
    """

@property
def is_shutdown(self) -> bool:
    """Whether the worker has run and shut down.

    This is only True if the worker was once started and then shutdown.
    This is not necessarily True after shutdown() is first
    called because the shutdown process can take a bit.
    """
```

### Worker Shutdown

```python { .api }
async def shutdown(self) -> None:
    """Initiate a worker shutdown and wait until complete.

    This can be called before the worker has even started and is safe for
    repeated invocations. It simply sets a marker informing the worker to
    shut down as it runs.

    This will not return until the worker has completed shutting down.
    """
```
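
Because `run()` blocks until shutdown and explicit `shutdown()` is preferred over cancelling the run task, a common shape is to run the worker until some stop condition fires and then shut it down. The sketch below shows that control flow with plain `asyncio`. `StubWorker` is a hypothetical stand-in that only mimics the `run()` and `shutdown()` methods and the `is_shutdown` flag, and `run_until` is an illustrative helper, not part of the SDK.

```python
import asyncio


class StubWorker:
    """Hypothetical stand-in exposing run(), shutdown(), and is_shutdown."""

    def __init__(self) -> None:
        self._stop = asyncio.Event()
        self.is_shutdown = False

    async def run(self) -> None:
        await self._stop.wait()
        self.is_shutdown = True

    async def shutdown(self) -> None:
        self._stop.set()


async def run_until(worker, stop_event: asyncio.Event) -> None:
    """Run the worker until stop_event is set, then shut it down explicitly."""
    run_task = asyncio.create_task(worker.run())
    stop_task = asyncio.create_task(stop_event.wait())
    done, _ = await asyncio.wait(
        {run_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
    )
    if run_task in done:
        # run() ended on its own (for example a fatal error): surface it.
        stop_task.cancel()
        run_task.result()
        return
    # Prefer explicit shutdown() over cancelling run_task.
    await worker.shutdown()
    await run_task


async def demo() -> bool:
    worker = StubWorker()
    stop_event = asyncio.Event()
    # In a real service, a signal handler would set this event.
    asyncio.get_running_loop().call_later(0.01, stop_event.set)
    await run_until(worker, stop_event)
    return worker.is_shutdown


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Calling `run_task.result()` in the early-exit branch re-raises any exception from `run()`, which matches the documented behavior that `run()` raises on a fatal worker error.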

### Async Context Manager

```python { .api }
async def __aenter__(self) -> Worker:
    """Start the worker and return self for use by async with.

    This is a wrapper around run(). Please review that method.

    This takes a similar approach to asyncio.timeout() in that it
    will cancel the current task if there is a fatal worker error and raise
    that error out of the context manager. However, if the inner async code
    swallows/wraps the CancelledError, the exiting
    portion of the context manager will not raise the fatal worker error.
    """

async def __aexit__(
    self,
    exc_type: Optional[Type[BaseException]],
    *args
) -> None:
    """Exit the context manager and shutdown the worker."""
```
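
The caveat about swallowed `CancelledError` is easier to see in a small model. The sketch below is not the SDK's implementation. `FatalErrorContext` is a hypothetical context manager that cancels the enclosing task when a background failure occurs, and raises the stored error on exit only if the cancellation actually propagated out of the block.

```python
import asyncio
from typing import Optional


class FatalErrorContext:
    """Hypothetical stand-in: cancels the enclosing task on a background failure."""

    def __init__(self, fail_after: float) -> None:
        self._fail_after = fail_after
        self._error: Optional[BaseException] = None

    async def __aenter__(self) -> "FatalErrorContext":
        self._owner = asyncio.current_task()
        self._watcher = asyncio.create_task(self._fail_later())
        return self

    async def _fail_later(self) -> None:
        await asyncio.sleep(self._fail_after)
        self._error = RuntimeError("fatal worker error")
        self._owner.cancel()  # interrupt the body of the async with block

    async def __aexit__(self, exc_type, *args) -> None:
        self._watcher.cancel()
        # The stored error is only raised if the cancellation reached us.
        if exc_type is asyncio.CancelledError and self._error is not None:
            raise self._error


async def propagating_body() -> None:
    async with FatalErrorContext(0.01):
        await asyncio.sleep(1)  # CancelledError propagates out of the block


async def swallowing_body() -> str:
    async with FatalErrorContext(0.01):
        try:
            await asyncio.sleep(1)
        except asyncio.CancelledError:
            pass  # swallowed: __aexit__ sees no exception
    return "fatal error was lost"
```

In this model, `asyncio.run(propagating_body())` raises `RuntimeError`, while `asyncio.run(swallowing_body())` returns normally and the failure goes unnoticed. Code inside an `async with` worker block should therefore re-raise `CancelledError` after any cleanup.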

### Lifecycle Example

```python
import asyncio
from temporalio import worker

# `client`, `MyWorkflow`, and `my_activity` are assumed to be defined elsewhere.

async def explicit_lifecycle():
    # Create worker
    worker_instance = worker.Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    )

    try:
        # Start worker in background
        worker_task = asyncio.create_task(worker_instance.run())

        # Do other work
        await asyncio.sleep(10)

        # Shutdown worker
        await worker_instance.shutdown()

        # Wait for worker task to complete
        await worker_task

    except Exception as e:
        print(f"Worker failed: {e}")
        await worker_instance.shutdown()

async def context_manager_lifecycle():
    # Using async context manager
    async with worker.Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    ) as worker_instance:
        # Worker runs until context exits
        await asyncio.sleep(10)
    # Worker automatically shuts down here
```

## Workflow and Activity Registration

### Registration Patterns

A worker registers exactly the workflows and activities passed to its constructor. They can be listed directly or collected at startup with a discovery helper:

```python
import importlib
import inspect

from temporalio import worker

# Direct registration (`client` and the workflow/activity names are assumed
# to be defined elsewhere)
worker_instance = worker.Worker(
    client,
    task_queue="my-task-queue",
    workflows=[WorkflowClass1, WorkflowClass2],
    activities=[activity_func1, activity_func2, activity_method],
)

# Dynamic registration
# NOTE: the marker attributes checked below are internal details of the
# @workflow.defn and @activity.defn decorators, not public API, and may
# change between SDK versions.
def discover_workflows():
    workflows = []
    # Dynamically import and discover workflow classes
    module = importlib.import_module("my_workflows")
    for name, obj in inspect.getmembers(module, inspect.isclass):
        if hasattr(obj, "__temporal_workflow_definition"):
            workflows.append(obj)
    return workflows

def discover_activities():
    activities = []
    # Dynamically import and discover activity functions
    module = importlib.import_module("my_activities")
    for name, obj in inspect.getmembers(module, inspect.isfunction):
        if hasattr(obj, "__temporal_activity_definition"):
            activities.append(obj)
    return activities

# Use discovered workflows and activities
worker_instance = worker.Worker(
    client,
    task_queue="my-task-queue",
    workflows=discover_workflows(),
    activities=discover_activities(),
)
```

### Nexus Service Registration

```python
# Nexus service handlers (experimental)
import nexusrpc.handler

from temporalio import worker

@nexusrpc.handler.service_handler
class MyNexusService:
    # sync_operation exposes an async method taking (ctx, input) as an operation
    @nexusrpc.handler.sync_operation
    async def my_operation(
        self, ctx: nexusrpc.handler.StartOperationContext, input: str
    ) -> str:
        return f"Processed: {input}"

# `client` is assumed to be a connected temporalio.client.Client
worker_instance = worker.Worker(
    client,
    task_queue="my-task-queue",
    nexus_service_handlers=[MyNexusService()],
)
```

## Polling and Execution Behavior

### PollerBehavior Types

Control how the worker polls for tasks:

```python { .api }
PollerBehavior = Union[PollerBehaviorSimpleMaximum, PollerBehaviorAutoscaling]
```

### PollerBehaviorSimpleMaximum

Simple fixed maximum poller behavior:

```python { .api }
@dataclass(frozen=True)
class PollerBehaviorSimpleMaximum:
    """A poller behavior that will attempt to poll as long as a slot is available, up to the
    provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
    """

    maximum: int = 5
    """Maximum number of concurrent pollers."""

    def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
        """Convert to bridge poller behavior."""
```
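
The two-poller minimum for workflow tasks is consistent with a worker polling both a sticky and a non-sticky queue when workflow caching is enabled. The sketch below works through the `nonsticky_to_sticky_poll_ratio` arithmetic described for the `Worker` constructor. The helper `split_workflow_pollers` is hypothetical, and the rounding (truncate, then keep at least one poller per queue) is an assumption, since the reference text only gives the multiplication.

```python
from typing import Tuple


def split_workflow_pollers(
    max_pollers: int, nonsticky_ratio: float = 0.2
) -> Tuple[int, int]:
    """Return (nonsticky, sticky) poller counts under the assumed rounding."""
    if max_pollers < 2:
        raise ValueError("workflow polling needs at least two pollers")
    # Documented rule: max pollers * ratio = pollers allowed on the nonsticky queue.
    nonsticky = int(max_pollers * nonsticky_ratio)
    # Assumption: each queue keeps at least one poller.
    nonsticky = min(max(nonsticky, 1), max_pollers - 1)
    return nonsticky, max_pollers - nonsticky


print(split_workflow_pollers(5))        # (1, 4)
print(split_workflow_pollers(10, 0.5))  # (5, 5)
print(split_workflow_pollers(2))        # (1, 1)
```

With the default maximum of 5 and ratio of 0.2, this model leaves one poller for the non-sticky queue and four for the sticky queue.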

### PollerBehaviorAutoscaling

Dynamic autoscaling poller behavior:

```python { .api }
@dataclass(frozen=True)
class PollerBehaviorAutoscaling:
    """A poller behavior that will automatically scale the number of pollers based on feedback
    from the server. A slot must be available before beginning polling.
    """

    minimum: int = 1
    """At least this many poll calls will always be attempted (assuming slots are available)."""

    maximum: int = 100
    """At most this many poll calls will ever be open at once. Must be >= minimum."""

    initial: int = 5
    """This many polls will be attempted initially before scaling kicks in. Must be between
    minimum and maximum."""

    def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
        """Convert to bridge poller behavior."""
```
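
The field docstrings above state two constraints: `maximum` must be at least `minimum`, and `initial` must lie between them. If it helps to catch bad values before they reach the worker, the constraints can be checked eagerly, as in the sketch below. `AutoscalingLimits` is a hypothetical mirror of the three fields. Whether and where the SDK itself validates them is not stated here, so this is only an illustration of the documented rules.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AutoscalingLimits:
    """Hypothetical mirror of the three autoscaling fields, with eager checks."""

    minimum: int = 1
    maximum: int = 100
    initial: int = 5

    def __post_init__(self) -> None:
        if self.maximum < self.minimum:
            raise ValueError("maximum must be >= minimum")
        if not self.minimum <= self.initial <= self.maximum:
            raise ValueError("initial must be between minimum and maximum")


limits = AutoscalingLimits(minimum=2, maximum=20, initial=5)
print(limits)
```

Validated values could then be passed on as `PollerBehaviorAutoscaling(minimum=limits.minimum, maximum=limits.maximum, initial=limits.initial)`.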

### Poller Configuration Example

```python
from temporalio.worker import (
    PollerBehaviorSimpleMaximum,
    PollerBehaviorAutoscaling,
    Worker,
)

# Simple maximum polling
worker_simple = Worker(
    client,
    task_queue="simple-queue",
    workflows=[MyWorkflow],
    workflow_task_poller_behavior=PollerBehaviorSimpleMaximum(maximum=10),
    activity_task_poller_behavior=PollerBehaviorSimpleMaximum(maximum=5),
)

# Autoscaling polling
worker_autoscaling = Worker(
    client,
    task_queue="autoscaling-queue",
    workflows=[MyWorkflow],
    workflow_task_poller_behavior=PollerBehaviorAutoscaling(
        minimum=2,
        maximum=20,
        initial=5,
    ),
    activity_task_poller_behavior=PollerBehaviorAutoscaling(
        minimum=1,
        maximum=10,
        initial=3,
    ),
)
```

## Interceptors and Plugins

### Base Interceptor Class

```python { .api }
class Interceptor:
    """Interceptor for workers.

    This should be extended by any worker interceptors.
    """

    def intercept_activity(
        self, next: ActivityInboundInterceptor
    ) -> ActivityInboundInterceptor:
        """Method called for intercepting an activity.

        Args:
            next: The underlying inbound interceptor this interceptor should
                delegate to.

        Returns:
            The new interceptor that will be used for the activity.
        """
        return next

    def workflow_interceptor_class(
        self, input: WorkflowInterceptorClassInput
    ) -> Optional[Type[WorkflowInboundInterceptor]]:
        """Class that will be instantiated and used to intercept workflows.

        This method is called on workflow start. The class must have the same
        init as WorkflowInboundInterceptor.__init__. The input can be
        altered to do things like add additional extern functions.

        Args:
            input: Input to this method that contains mutable properties that
                can be altered by this interceptor.

        Returns:
            The class to construct to intercept each workflow.
        """
        return None
```
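
Since each interceptor receives a `next` object and returns a wrapper around it, several interceptors compose into a chain of nested calls. The stdlib-only sketch below models that composition. All class and function names in it are hypothetical, and the ordering it uses (the first interceptor in the list ends up outermost) is an assumption about how the list is applied, not something stated in this reference.

```python
import asyncio
from typing import Any, Callable, Dict, List


class RootInbound:
    """Innermost link: actually calls the function (hypothetical stand-in)."""

    async def execute_activity(self, input: Dict[str, Any]) -> Any:
        return input["fn"](*input["args"])


class TracingInbound:
    """Records entry and exit around the next link in the chain."""

    def __init__(self, next: Any, tag: str, log: List[str]) -> None:
        self.next = next
        self.tag = tag
        self.log = log

    async def execute_activity(self, input: Dict[str, Any]) -> Any:
        self.log.append(f"enter {self.tag}")
        try:
            return await self.next.execute_activity(input)
        finally:
            self.log.append(f"exit {self.tag}")


def build_chain(factories: List[Callable[[Any], Any]], root: Any) -> Any:
    """Wrap root so that the first factory in the list ends up outermost."""
    impl = root
    for factory in reversed(factories):
        impl = factory(impl)
    return impl


log: List[str] = []
chain = build_chain(
    [
        lambda nxt: TracingInbound(nxt, "a", log),
        lambda nxt: TracingInbound(nxt, "b", log),
    ],
    RootInbound(),
)
result = asyncio.run(chain.execute_activity({"fn": str.upper, "args": ("hi",)}))
print(result)  # HI
print(log)     # ['enter a', 'enter b', 'exit b', 'exit a']
```

The nesting means an outer interceptor sees both the input before inner ones run and the result or exception after they finish, which is why cross-cutting concerns such as logging or tracing fit this pattern.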

### Activity Interceptors

#### ActivityInboundInterceptor

```python { .api }
class ActivityInboundInterceptor:
    """Inbound interceptor to wrap outbound creation and activity execution.

    This should be extended by any activity inbound interceptors.
    """

    def __init__(self, next: ActivityOutboundInterceptor) -> None:
        """Create activity inbound interceptor."""
        self.next = next

    async def execute_activity(self, input: ExecuteActivityInput) -> Any:
        """Called when executing an activity.

        Args:
            input: Activity execution input.

        Returns:
            Result of activity execution.
        """
        return await self.next.execute_activity(input)
```

#### ActivityOutboundInterceptor

```python { .api }
class ActivityOutboundInterceptor:
    """Outbound interceptor for activities.

    This should be extended by any activity outbound interceptors.
    """

    async def execute_activity(self, input: ExecuteActivityInput) -> Any:
        """Execute an activity.

        Args:
            input: Activity execution input.

        Returns:
            Result of activity execution.
        """
```

### Workflow Interceptors

#### WorkflowInboundInterceptor

```python { .api }
class WorkflowInboundInterceptor:
    """Inbound interceptor to wrap outbound calls and workflow execution.

    This should be extended by any workflow inbound interceptors.
    """

    def __init__(self, next: WorkflowOutboundInterceptor) -> None:
        """Create workflow inbound interceptor."""
        self.next = next

    async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
        """Called when executing a workflow.

        Args:
            input: Workflow execution input.

        Returns:
            Result of workflow execution.
        """
        return await self.next.execute_workflow(input)

    async def handle_signal(self, input: HandleSignalInput) -> None:
        """Called when handling a signal."""
        await self.next.handle_signal(input)

    async def handle_query(self, input: HandleQueryInput) -> Any:
        """Called when handling a query."""
        return await self.next.handle_query(input)

    async def handle_update(self, input: HandleUpdateInput) -> Any:
        """Called when handling an update."""
        return await self.next.handle_update(input)
```

#### WorkflowOutboundInterceptor

```python { .api }
class WorkflowOutboundInterceptor:
    """Outbound interceptor for workflows.

    This should be extended by any workflow outbound interceptors.
    """

    async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
        """Execute a workflow."""

    async def start_activity(self, input: StartActivityInput) -> ActivityHandle:
        """Start an activity."""

    async def start_local_activity(self, input: StartLocalActivityInput) -> ActivityHandle:
        """Start a local activity."""

    async def start_child_workflow(self, input: StartChildWorkflowInput) -> ChildWorkflowHandle:
        """Start a child workflow."""
```

### Input Classes for Interceptors

```python { .api }
@dataclass
class ExecuteActivityInput:
    """Input for ActivityInboundInterceptor.execute_activity."""

    fn: Callable[..., Any]
    """The activity function to execute."""

    args: Sequence[Any]
    """Arguments to pass to the activity function."""

    executor: Optional[concurrent.futures.Executor]
    """Executor to run the activity in."""

    headers: Mapping[str, temporalio.api.common.v1.Payload]
    """Headers for the activity."""

@dataclass
class ExecuteWorkflowInput:
    """Input for WorkflowInboundInterceptor.execute_workflow."""

    run_fn: Callable[..., Awaitable[Any]]
    """The workflow run function."""

    args: Sequence[Any]
    """Arguments to pass to the workflow run function."""

@dataclass
class HandleSignalInput:
    """Input for WorkflowInboundInterceptor.handle_signal."""

    signal: str
    """Name of the signal."""

    args: Sequence[Any]
    """Arguments for the signal."""

@dataclass
class HandleQueryInput:
    """Input for WorkflowInboundInterceptor.handle_query."""

    query: str
    """Name of the query."""

    args: Sequence[Any]
    """Arguments for the query."""

@dataclass
class HandleUpdateInput:
    """Input for WorkflowInboundInterceptor.handle_update."""

    update: str
    """Name of the update."""

    args: Sequence[Any]
    """Arguments for the update."""

@dataclass
class StartActivityInput:
    """Input for WorkflowOutboundInterceptor.start_activity."""

    activity: str
    """Activity name or function."""

    args: Sequence[Any]
    """Arguments to pass to the activity."""

    activity_id: Optional[str]
    """ID for the activity."""

    task_queue: Optional[str]
    """Task queue for the activity."""

@dataclass
class StartChildWorkflowInput:
    """Input for WorkflowOutboundInterceptor.start_child_workflow."""

    workflow: str
    """Workflow name or class."""

    args: Sequence[Any]
    """Arguments to pass to the workflow."""

    id: Optional[str]
    """ID for the child workflow."""

    task_queue: Optional[str]
    """Task queue for the child workflow."""

@dataclass
class WorkflowInterceptorClassInput:
    """Input for Interceptor.workflow_interceptor_class."""

    unsafe_extern_functions: MutableMapping[str, Callable]
    """Set of external functions that can be called from the sandbox.

    WARNING: Exposing external functions to the workflow sandbox is dangerous and
    should be avoided. Use at your own risk.
    """
```

### Plugin System

```python { .api }
class Plugin:
    """Plugin base class.

    Plugins can be used to extend worker functionality and are applied
    during worker initialization.
    """

    def init_worker_plugin(self, root_plugin: Plugin) -> None:
        """Initialize the plugin with the root plugin."""
        pass

    def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
        """Configure worker settings.

        Args:
            config: The worker configuration to modify.

        Returns:
            Modified worker configuration.
        """
        return config

    def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
        """Configure replayer settings.

        Args:
            config: The replayer configuration to modify.

        Returns:
            Modified replayer configuration.
        """
        return config
```
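
A `configure_worker` hook takes a configuration mapping and returns a (possibly modified) one, so a plugin can, for example, append an interceptor without the caller listing it. The sketch below models that with plain dictionaries so it runs without the SDK. `AddInterceptorPlugin` and `apply_plugins` are hypothetical names, the string values stand in for real interceptor objects, and the chaining order (each plugin receives the previous plugin's output) is an assumption.

```python
from typing import Any, Dict, Iterable


class AddInterceptorPlugin:
    """Hypothetical plugin that appends one interceptor to the config."""

    def __init__(self, interceptor: Any) -> None:
        self._interceptor = interceptor

    def configure_worker(self, config: Dict[str, Any]) -> Dict[str, Any]:
        updated = dict(config)  # shallow copy so the caller's dict is untouched
        updated["interceptors"] = [
            *config.get("interceptors", []),
            self._interceptor,
        ]
        return updated


def apply_plugins(config: Dict[str, Any], plugins: Iterable[Any]) -> Dict[str, Any]:
    """Assumed chaining order: each plugin receives the previous plugin's output."""
    for plugin in plugins:
        config = plugin.configure_worker(config)
    return config


base = {"task_queue": "my-task-queue", "interceptors": ["existing"]}
final = apply_plugins(
    base, [AddInterceptorPlugin("logging"), AddInterceptorPlugin("metrics")]
)
print(final["interceptors"])  # ['existing', 'logging', 'metrics']
print(base["interceptors"])   # ['existing']
```

Returning a new mapping instead of mutating the input keeps each plugin's effect local, which makes it easier to reason about the final configuration when several plugins are combined.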

### Interceptor Example

```python
import logging
from typing import Any

from temporalio import activity, worker
from temporalio.worker import (
    ActivityInboundInterceptor,
    ExecuteActivityInput,
    Interceptor,
)

class LoggingInterceptor(Interceptor):
    """Example interceptor that logs activity execution."""

    def intercept_activity(self, next: ActivityInboundInterceptor) -> ActivityInboundInterceptor:
        return LoggingActivityInboundInterceptor(next)

class LoggingActivityInboundInterceptor(ActivityInboundInterceptor):
    async def execute_activity(self, input: ExecuteActivityInput) -> Any:
        # The activity context is active here, so the registered name can be
        # read from activity.info() instead of private attributes on input.fn.
        activity_name = activity.info().activity_type

        logging.info("Starting activity: %s", activity_name)
        try:
            result = await super().execute_activity(input)
            logging.info("Completed activity: %s", activity_name)
            return result
        except Exception as e:
            logging.error("Activity failed: %s, error: %s", activity_name, e)
            raise

# Use the interceptor (`client`, `MyWorkflow`, and `my_activity` are assumed
# to be defined elsewhere)
worker_instance = worker.Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    interceptors=[LoggingInterceptor()],
)
```

## Worker Tuning and Resource Management

### WorkerTuner

Base class for worker tuning:

```python { .api }
class WorkerTuner(ABC):
    """WorkerTuners allow for the dynamic customization of some aspects of worker configuration"""

    @staticmethod
    def create_resource_based(
        *,
        target_memory_usage: float,
        target_cpu_usage: float,
        workflow_config: Optional[ResourceBasedSlotConfig] = None,
        activity_config: Optional[ResourceBasedSlotConfig] = None,
        local_activity_config: Optional[ResourceBasedSlotConfig] = None,
    ) -> "WorkerTuner":
        """Create a resource-based tuner with the provided options."""

    @staticmethod
    def create_fixed(
        *,
        workflow_slots: Optional[int],
        activity_slots: Optional[int],
        local_activity_slots: Optional[int],
    ) -> "WorkerTuner":
        """Create a fixed-size tuner with the provided number of slots. Any unspecified slots will default to 100."""

    @staticmethod
    def create_composite(
        *,
        workflow_supplier: SlotSupplier,
        activity_supplier: SlotSupplier,
        local_activity_supplier: SlotSupplier,
    ) -> "WorkerTuner":
        """Create a tuner composed of the provided slot suppliers."""

    @abstractmethod
    def _get_workflow_task_slot_supplier(self) -> SlotSupplier:
        """Get the slot supplier for workflow tasks."""

    @abstractmethod
    def _get_activity_task_slot_supplier(self) -> SlotSupplier:
        """Get the slot supplier for activity tasks."""

    @abstractmethod
    def _get_local_activity_task_slot_supplier(self) -> SlotSupplier:
        """Get the slot supplier for local activity tasks."""
```

### Slot Suppliers

#### FixedSizeSlotSupplier

```python { .api }
@dataclass(frozen=True)
class FixedSizeSlotSupplier:
    """A fixed-size slot supplier that will never issue more than a fixed number of slots."""

    num_slots: int
    """The maximum number of slots that can be issued"""
```

#### ResourceBasedSlotSupplier

```python { .api }
@dataclass(frozen=True)
class ResourceBasedSlotSupplier:
    """A slot supplier that will dynamically adjust the number of slots based on resource usage.

    WARNING: The resource based tuner is currently experimental.
    """

    slot_config: ResourceBasedSlotConfig
    """Configuration for this slot supplier."""

    tuner_config: ResourceBasedTunerConfig
    """Options for the tuner that will be used to adjust the number of slots. When used with a
    CompositeTuner, all resource-based slot suppliers must use the same tuner options."""

@dataclass(frozen=True)
class ResourceBasedTunerConfig:
    """Options for a ResourceBasedTuner or a ResourceBasedSlotSupplier.

    WARNING: The resource based tuner is currently experimental.
    """

    target_memory_usage: float
    """A value between 0 and 1 that represents the target (system) memory usage. It's not recommended
    to set this higher than 0.8, since how much memory a workflow may use is not predictable, and
    you don't want to encounter OOM errors."""

    target_cpu_usage: float
    """A value between 0 and 1 that represents the target (system) CPU usage. This can be set to 1.0
    if desired, but it's recommended to leave some headroom for other processes."""

@dataclass(frozen=True)
class ResourceBasedSlotConfig:
    """Options for a specific slot type being used with a ResourceBasedSlotSupplier.

    WARNING: The resource based tuner is currently experimental.
    """

    minimum_slots: Optional[int] = None
    """Amount of slots that will be issued regardless of any other checks. Defaults to 5 for workflows and 1 for
    activities."""

    maximum_slots: Optional[int] = None
    """Maximum amount of slots permitted. Defaults to 500."""

    ramp_throttle: Optional[timedelta] = None
    """Minimum time we will wait (after passing the minimum slots number) between handing out new slots in milliseconds.
    Defaults to 0 for workflows and 50ms for activities.

    This value matters because how many resources a task will use cannot be determined ahead of time, and thus the
    system should wait to see how much resources are used before issuing more slots."""
```
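
To make the interplay of `minimum_slots`, `maximum_slots`, and `ramp_throttle` concrete, here is a deliberately simplified model of the decision the docstrings above describe. It is not the SDK's implementation: the real tuner weighs memory and CPU usage against their targets, which this sketch reduces to a single boolean. `SlotLimits` and `may_issue_slot` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class SlotLimits:
    """Hypothetical stand-in mirroring the ResourceBasedSlotConfig fields."""

    minimum_slots: int
    maximum_slots: int
    ramp_throttle: timedelta


def may_issue_slot(
    limits: SlotLimits,
    issued: int,
    since_last_issue: timedelta,
    resources_below_target: bool,
) -> bool:
    # Below the minimum, slots are handed out regardless of other checks.
    if issued < limits.minimum_slots:
        return True
    # Never exceed the configured maximum.
    if issued >= limits.maximum_slots:
        return False
    # Past the minimum, wait at least ramp_throttle between new slots and
    # only issue while resource usage is under its target.
    return resources_below_target and since_last_issue >= limits.ramp_throttle


activity_limits = SlotLimits(1, 500, timedelta(milliseconds=50))
print(may_issue_slot(activity_limits, 0, timedelta(0), False))
print(may_issue_slot(activity_limits, 10, timedelta(milliseconds=20), True))
print(may_issue_slot(activity_limits, 10, timedelta(milliseconds=80), True))
```

The second call is refused only because 20 ms is shorter than the 50 ms throttle; the same request succeeds once enough time has passed.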

#### CustomSlotSupplier

```python { .api }
class CustomSlotSupplier(ABC):
    """This class can be implemented to provide custom slot supplier behavior.

    WARNING: Custom slot suppliers are currently experimental.
    """

    @abstractmethod
    async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
        """This function is called before polling for new tasks. Your implementation must block until a
        slot is available then return a permit to use that slot.

        The only acceptable exception to throw is asyncio.CancelledError, as invocations of this method may
        be cancelled. Any other exceptions thrown will be logged and ignored.

        Args:
            ctx: The context for slot reservation.

        Returns:
            A permit to use the slot which may be populated with your own data.
        """

    @abstractmethod
    def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
        """This function is called when trying to reserve slots for "eager" workflow and activity tasks.
        Eager tasks are those which are returned as a result of completing a workflow task, rather than
        from polling. Your implementation must not block, and if a slot is available, return a permit
        to use that slot.

        Args:
            ctx: The context for slot reservation.

        Returns:
            Maybe a permit to use the slot which may be populated with your own data.
        """

    @abstractmethod
    def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
        """This function is called once a slot is actually being used to process some task, which may be
        some time after the slot was reserved originally.

        Args:
            ctx: The context for marking a slot as used.
        """

    @abstractmethod
    def release_slot(self, ctx: SlotReleaseContext) -> None:
        """This function is called once a permit is no longer needed.

        Args:
            ctx: The context for releasing a slot.
        """
```

### Slot Management Classes

```python { .api }
class SlotPermit:
    """A permit to use a slot for a workflow/activity/local activity task.

    You can inherit from this class to add your own data to the permit.

    WARNING: Custom slot suppliers are currently experimental.
    """

class SlotReserveContext(Protocol):
    """Context for reserving a slot from a CustomSlotSupplier.

    WARNING: Custom slot suppliers are currently experimental.
    """

    slot_type: Literal["workflow", "activity", "local-activity"]
    """The type of slot trying to be reserved. Always one of "workflow", "activity", or "local-activity"."""

    task_queue: str
    """The name of the task queue for which this reservation request is associated."""

    worker_identity: str
    """The identity of the worker that is requesting the reservation."""

    worker_build_id: str
    """The build id of the worker that is requesting the reservation."""

    worker_deployment_version: Optional[WorkerDeploymentVersion]
    """The deployment version of the worker that is requesting the reservation, if any."""

    is_sticky: bool
    """True iff this is a reservation for a sticky poll for a workflow task."""

@dataclass(frozen=True)
class SlotMarkUsedContext(Protocol):
    """Context for marking a slot used from a CustomSlotSupplier.

    WARNING: Custom slot suppliers are currently experimental.
    """

    slot_info: SlotInfo
    """Info about the task that will be using the slot."""

    permit: SlotPermit
    """The permit that was issued when the slot was reserved."""

@dataclass(frozen=True)
class SlotReleaseContext:
    """Context for releasing a slot from a CustomSlotSupplier.

    WARNING: Custom slot suppliers are currently experimental.
    """

    slot_info: Optional[SlotInfo]
    """Info about the task that will be using the slot. May be None if the slot was never used."""

    permit: SlotPermit
    """The permit that was issued when the slot was reserved."""
```

### Slot Info Types

```python { .api }
@runtime_checkable
class WorkflowSlotInfo(Protocol):
    """Info about a workflow task slot usage.

    WARNING: Custom slot suppliers are currently experimental.
    """

    workflow_type: str
    is_sticky: bool

@runtime_checkable
class ActivitySlotInfo(Protocol):
    """Info about an activity task slot usage.

    WARNING: Custom slot suppliers are currently experimental.
    """

    activity_type: str

@runtime_checkable
class LocalActivitySlotInfo(Protocol):
    """Info about a local activity task slot usage.

    WARNING: Custom slot suppliers are currently experimental.
    """

    activity_type: str

SlotInfo = Union[WorkflowSlotInfo, ActivitySlotInfo, LocalActivitySlotInfo]
```
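
Because the slot info types are `runtime_checkable` protocols with only data members, an `isinstance` check amounts to testing which attributes are present. The sketch below uses local stand-in protocols that copy the fields shown above (so it runs without the SDK) to show one consequence: `ActivitySlotInfo` and `LocalActivitySlotInfo` declare the same single attribute, so `isinstance` alone cannot tell them apart. The `describe_slot` helper and the `Fake*` classes are hypothetical.

```python
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@runtime_checkable
class WorkflowSlotInfo(Protocol):
    workflow_type: str
    is_sticky: bool


@runtime_checkable
class ActivitySlotInfo(Protocol):
    activity_type: str


@runtime_checkable
class LocalActivitySlotInfo(Protocol):
    activity_type: str


@dataclass
class FakeWorkflowInfo:
    workflow_type: str
    is_sticky: bool


@dataclass
class FakeLocalActivityInfo:
    activity_type: str


def describe_slot(info: object) -> str:
    # Check the workflow protocol first; it has distinct attribute names.
    if isinstance(info, WorkflowSlotInfo):
        return f"workflow:{info.workflow_type}"
    # Both activity protocols match any object with `activity_type`.
    if isinstance(info, ActivitySlotInfo):
        return f"activity-like:{info.activity_type}"
    return "unknown"


print(describe_slot(FakeWorkflowInfo("MyWorkflow", is_sticky=True)))
print(describe_slot(FakeLocalActivityInfo("my_local_activity")))
```

If a supplier needs to distinguish regular from local activities, one option is to record the `slot_type` from the `SlotReserveContext` on a custom permit at reservation time.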

### Tuning Examples

```python
import asyncio
from datetime import timedelta
from typing import Optional

from temporalio.worker import (
    CustomSlotSupplier,
    ResourceBasedSlotConfig,
    SlotMarkUsedContext,
    SlotPermit,
    SlotReleaseContext,
    SlotReserveContext,
    Worker,
    WorkerTuner,
)

# Fixed-size tuning
fixed_tuner = WorkerTuner.create_fixed(
    workflow_slots=50,
    activity_slots=100,
    local_activity_slots=200,
)

# Resource-based tuning
resource_tuner = WorkerTuner.create_resource_based(
    target_memory_usage=0.7,  # Target 70% memory usage
    target_cpu_usage=0.8,  # Target 80% CPU usage
    workflow_config=ResourceBasedSlotConfig(
        minimum_slots=5,
        maximum_slots=100,
        ramp_throttle=timedelta(milliseconds=0),
    ),
    activity_config=ResourceBasedSlotConfig(
        minimum_slots=10,
        maximum_slots=500,
        ramp_throttle=timedelta(milliseconds=50),
    ),
)

# Custom slot supplier
class CustomSlotSupplierImpl(CustomSlotSupplier):
    def __init__(self, max_slots: int):
        self._max_slots = max_slots
        self._current_slots = 0
        self._lock = asyncio.Lock()
        self._condition = asyncio.Condition(self._lock)

    async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
        # Block until a slot frees up, then claim it
        async with self._condition:
            while self._current_slots >= self._max_slots:
                await self._condition.wait()

            self._current_slots += 1
            return SlotPermit()

    def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
        # Must not block: claim a slot only if one is free right now
        if self._current_slots < self._max_slots:
            self._current_slots += 1
            return SlotPermit()
        return None

    def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
        # Track slot usage if needed
        pass

    def release_slot(self, ctx: SlotReleaseContext) -> None:
        async def release():
            async with self._condition:
                self._current_slots -= 1
                self._condition.notify()

        # Schedule release in event loop (assumes this is called while the loop is running)
        asyncio.create_task(release())

# Use custom tuner
custom_tuner = WorkerTuner.create_composite(
    workflow_supplier=CustomSlotSupplierImpl(max_slots=20),
    activity_supplier=CustomSlotSupplierImpl(max_slots=50),
    local_activity_supplier=CustomSlotSupplierImpl(max_slots=100),
)

# Apply tuner to worker (client, MyWorkflow, and my_activity are defined elsewhere)
worker_instance = Worker(
    client,
    task_queue="tuned-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    tuner=resource_tuner,
)
```

## Workflow Replay and Testing

### Replayer Class

Test workflow compatibility with historical executions:

```python { .api }
class Replayer:
    """Replayer to replay workflows from history."""

    def __init__(
        self,
        *,
        workflows: Sequence[Type],
        workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None,
        workflow_runner: WorkflowRunner = SandboxedWorkflowRunner(),
        unsandboxed_workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(),
        namespace: str = "ReplayNamespace",
        data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
        interceptors: Sequence[Interceptor] = [],
        plugins: Sequence[temporalio.worker.Plugin] = [],
        build_id: Optional[str] = None,
        identity: Optional[str] = None,
        workflow_failure_exception_types: Sequence[Type[BaseException]] = [],
        debug_mode: bool = False,
        runtime: Optional[temporalio.runtime.Runtime] = None,
        disable_safe_workflow_eviction: bool = False,
        header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC,
    ) -> None:
        """Create a replayer to replay workflows from history.

        Most of the same arguments need to be passed to the replayer that were passed
        to the worker when the workflow originally ran.

        Note, unlike the worker, for the replayer the workflow_task_executor
        will default to a new thread pool executor with no max_workers set that
        will be shared across all replay calls and never explicitly shut down.
        Users are encouraged to provide their own if needing more control.
        """

    def config(self) -> ReplayerConfig:
        """Config, as a dictionary, used to create this replayer.

        Returns:
            Configuration, shallow-copied.
        """

    async def replay_workflow(
        self,
        history: temporalio.client.WorkflowHistory,
        *,
        raise_on_replay_failure: bool = True,
    ) -> WorkflowReplayResult:
        """Replay a workflow for the given history.

        Args:
            history: The history to replay. Can be fetched directly, or use
                WorkflowHistory.from_json to parse a history downloaded via
                tctl or the web UI.
            raise_on_replay_failure: If True (the default), this will raise
                a WorkflowReplayResult.replay_failure if it is present.
        """

    async def replay_workflows(
        self,
        histories: AsyncIterator[temporalio.client.WorkflowHistory],
        *,
        raise_on_replay_failure: bool = True,
    ) -> WorkflowReplayResults:
        """Replay workflows for the given histories.

        This is a shortcut for workflow_replay_iterator that iterates
        all results and aggregates information about them.

        Args:
            histories: The histories to replay, from an async iterator.
            raise_on_replay_failure: If True (the default), this will raise
                the first replay failure seen.
        """

    @asynccontextmanager
    async def workflow_replay_iterator(
        self,
        histories: AsyncIterator[temporalio.client.WorkflowHistory],
    ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
        """Create an async context manager for replaying workflows.

        Args:
            histories: The histories to replay, from an async iterator.

        Returns:
            Async context manager that yields an iterator of replay results.
        """
```
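
`workflow_replay_iterator` is useful when each result should be handled as it arrives rather than aggregated at the end. The following is a minimal sketch, assuming each yielded result exposes `replay_failure` and a `history` whose `workflow_id` attribute identifies the workflow. The helper name `collect_replay_failures` is hypothetical, and the function is duck-typed so it imports nothing from the SDK.

```python
from typing import Any, AsyncIterator, Dict


async def collect_replay_failures(
    replayer: Any, histories: AsyncIterator[Any]
) -> Dict[str, Exception]:
    """Replay each history and return failures keyed by workflow ID."""
    failures: Dict[str, Exception] = {}
    # The context manager yields an async iterator of per-history results.
    async with replayer.workflow_replay_iterator(histories) as results:
        async for result in results:
            if result.replay_failure is not None:
                failures[result.history.workflow_id] = result.replay_failure
    return failures
```

In real use, `replayer` would be a `Replayer` instance and `histories` an async generator of `WorkflowHistory` objects; per-result handling (logging, early exit) can go inside the `async for` loop.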

### ReplayerConfig

```python { .api }
class ReplayerConfig(TypedDict, total=False):
    """TypedDict of config originally passed to Replayer."""

    workflows: Sequence[Type]
    workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor]
    workflow_runner: WorkflowRunner
    unsandboxed_workflow_runner: WorkflowRunner
    namespace: str
    data_converter: temporalio.converter.DataConverter
    interceptors: Sequence[Interceptor]
    build_id: Optional[str]
    identity: Optional[str]
    workflow_failure_exception_types: Sequence[Type[BaseException]]
    debug_mode: bool
    runtime: Optional[temporalio.runtime.Runtime]
    disable_safe_workflow_eviction: bool
    header_codec_behavior: HeaderCodecBehavior
```

### WorkflowReplayResult

```python { .api }
@dataclass(frozen=True)
class WorkflowReplayResult:
    """Single workflow replay result."""

    history: temporalio.client.WorkflowHistory
    """History originally passed for this workflow replay."""

    replay_failure: Optional[Exception]
    """Failure during replay if any.

    This does not mean your workflow exited by raising an error, but rather that
    some task failure such as NondeterminismError was encountered during
    replay - likely indicating your workflow code is incompatible with the
    history.
    """
```

### WorkflowReplayResults

```python { .api }
@dataclass(frozen=True)
class WorkflowReplayResults:
    """Results of replaying multiple workflows."""

    replay_failures: Mapping[str, Exception]
    """Replay failures, keyed by run ID."""
```

### Replay Testing Example

```python
from temporalio import client
from temporalio.worker import Replayer

async def test_workflow_replay():
    # Create client to fetch workflow history
    client_instance = await client.Client.connect("localhost:7233")

    # Get workflow history
    workflow_handle = client_instance.get_workflow_handle("my-workflow-id")
    history = await workflow_handle.fetch_history()

    # Create replayer with current workflow implementation
    replayer = Replayer(
        workflows=[MyWorkflow],  # Current version of workflow
        namespace="my-namespace",
    )

    # Test replay compatibility (replay_workflow raises on failure by default)
    try:
        await replayer.replay_workflow(history)
        print("Replay successful - workflow is compatible")
        return True
    except Exception as e:
        print(f"Replay failed - workflow incompatible: {e}")
        return False

async def test_multiple_workflows():
    # Create client to fetch workflow histories
    client_instance = await client.Client.connect("localhost:7233")

    # Create replayer
    replayer = Replayer(workflows=[MyWorkflow])

    # Create async iterator of histories
    async def history_generator():
        for workflow_id in ["wf-1", "wf-2", "wf-3"]:
            handle = client_instance.get_workflow_handle(workflow_id)
            history = await handle.fetch_history()
            yield history

    # Replay all workflows
    results = await replayer.replay_workflows(
        history_generator(),
        raise_on_replay_failure=False,  # Don't raise on first failure
    )

    if results.replay_failures:
        print(f"Replay failures: {len(results.replay_failures)}")
        for run_id, error in results.replay_failures.items():
            print(f"  {run_id}: {error}")
    else:
        print("All replays successful")

# Test with history from JSON file
async def test_replay_from_json():
    # Load history from tctl export or web UI download
    with open("workflow_history.json", "r") as f:
        history_json = f.read()

    # from_json takes the workflow ID followed by the JSON history
    history = client.WorkflowHistory.from_json("my-workflow-id", history_json)

    replayer = Replayer(workflows=[MyWorkflow])
    # Disable raising so the failure can be inspected on the result
    result = await replayer.replay_workflow(history, raise_on_replay_failure=False)

    return result.replay_failure is None
```

## Advanced Worker Features

### Shared State Management

Cross-process synchronization for non-async activities:

```python { .api }
class SharedStateManager:
    """Used for obtaining cross-process friendly synchronization primitives.

    This is required for non-async activities where the activity_executor
    is not a ThreadPoolExecutor. Reuse of these across workers is encouraged.
    """
```

### Shared Heartbeat Sender

Coordinated heartbeat management:

```python { .api }
class SharedHeartbeatSender:
    """Shared heartbeat sender for activities.

    Used to coordinate heartbeat sending across multiple workers or processes.
    """
```

### Workflow Runners

#### WorkflowRunner

Base class for workflow execution:

```python { .api }
class WorkflowRunner:
    """Base class for workflow runners."""
```

#### SandboxedWorkflowRunner

Default sandboxed workflow execution:

```python { .api }
class SandboxedWorkflowRunner(WorkflowRunner):
    """Sandboxed workflow runner.

    Provides isolation and deterministic execution for workflows.
    """
```

#### UnsandboxedWorkflowRunner

Unsandboxed workflow execution for special cases:

```python { .api }
class UnsandboxedWorkflowRunner(WorkflowRunner):
    """Unsandboxed workflow runner.

    Allows workflows to opt-out of sandboxing restrictions.
    WARNING: Use with caution as this can break workflow determinism.
    """
```

### Workflow Instance Management

```python { .api }
class WorkflowInstance:
    """Workflow instance representation."""

class WorkflowInstanceDetails:
    """Detailed information about a workflow instance."""
```

### Build ID Management

```python { .api }
def load_default_build_id(*, memoize: bool = True) -> str:
    """Load the default worker build ID.

    The worker build ID is a unique hash representing the entire set of code
    including Temporal code and external code. The default here is currently
    implemented by walking loaded modules and hashing their bytecode into a
    common hash.

    Args:
        memoize: If true, the default, this will cache to a global variable to
            keep from having to run again on successive calls.

    Returns:
        Unique identifier representing the set of running code.
    """
```
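
The docstring describes the default build ID as a hash over the bytecode of loaded modules. The sketch below illustrates that idea with the standard library only. It is not the SDK's implementation: the function name `hash_module_bytecode`, the `module_names` parameter, the choice of SHA-256, and hashing only each module's top-level bytecode are all assumptions made for the example.

```python
import hashlib
import json  # Imported so at least one pure-Python module is loaded.
import sys
from typing import Iterable, Optional


def hash_module_bytecode(module_names: Optional[Iterable[str]] = None) -> str:
    """Hash the top-level bytecode of the named (default: all loaded) modules."""
    names = sorted(sys.modules) if module_names is None else sorted(module_names)
    digest = hashlib.sha256()
    for name in names:
        module = sys.modules.get(name)
        spec = getattr(module, "__spec__", None)
        get_code = getattr(getattr(spec, "loader", None), "get_code", None)
        if get_code is None:
            continue  # No loader that can provide code for this module.
        try:
            code = get_code(name)
        except Exception:
            continue  # Some loaders cannot return code for every module.
        if code is not None:
            # Built-in and extension modules have no Python bytecode.
            digest.update(code.co_code)
    return digest.hexdigest()


print(hash_module_bytecode(["json"]))
```

Sorting the module names keeps the result independent of import order, which matters if the hash is meant to identify a code version rather than a particular process.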

### Advanced Configuration Example

```python
import asyncio
import concurrent.futures
from datetime import timedelta

from temporalio import worker, client
from temporalio.worker import (
    UnsandboxedWorkflowRunner,
    PollerBehaviorAutoscaling,
    WorkerTuner,
)

async def create_advanced_worker():
    client_instance = await client.Client.connect("localhost:7233")

    # Custom executors
    activity_executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=50,
        thread_name_prefix="activity-",
    )

    workflow_executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=20,
        thread_name_prefix="workflow-",
    )

    # Resource-based tuning
    tuner = WorkerTuner.create_resource_based(
        target_memory_usage=0.75,
        target_cpu_usage=0.85,
    )

    # Custom error handler
    async def handle_fatal_error(error: BaseException):
        print(f"Worker fatal error: {error}")
        # Could send alerts, write to log files, etc.

    # Advanced worker configuration
    # (workflows, activities, and interceptors are defined elsewhere)
    worker_instance = worker.Worker(
        client_instance,
        task_queue="advanced-task-queue",
        workflows=[MyWorkflow, MyOtherWorkflow],
        activities=[my_activity, my_other_activity],
        activity_executor=activity_executor,
        workflow_task_executor=workflow_executor,
        workflow_runner=worker.workflow_sandbox.SandboxedWorkflowRunner(),
        unsandboxed_workflow_runner=UnsandboxedWorkflowRunner(),
        tuner=tuner,
        max_cached_workflows=2000,
        workflow_task_poller_behavior=PollerBehaviorAutoscaling(
            minimum=3,
            maximum=30,
            initial=10,
        ),
        activity_task_poller_behavior=PollerBehaviorAutoscaling(
            minimum=5,
            maximum=50,
            initial=15,
        ),
        sticky_queue_schedule_to_start_timeout=timedelta(seconds=30),
        max_heartbeat_throttle_interval=timedelta(seconds=120),
        default_heartbeat_throttle_interval=timedelta(seconds=45),
        max_activities_per_second=100.0,
        graceful_shutdown_timeout=timedelta(seconds=30),
        # shared_state_manager is only needed when activity_executor is not a
        # ThreadPoolExecutor; SharedStateManager is abstract, so in that case create one
        # with SharedStateManager.create_from_multiprocessing(multiprocessing.Manager()).
        debug_mode=False,
        disable_eager_activity_execution=False,
        on_fatal_error=handle_fatal_error,
        use_worker_versioning=True,
        build_id="my-app-v1.2.3-abc123",
        interceptors=[MyLoggingInterceptor(), MyMetricsInterceptor()],
    )

    return worker_instance

# Usage
async def main():
    worker_instance = await create_advanced_worker()

    try:
        print("Starting advanced worker...")
        await worker_instance.run()
    except KeyboardInterrupt:
        print("Shutting down worker...")
        await worker_instance.shutdown()
    finally:
        print("Worker shutdown complete")

if __name__ == "__main__":
    asyncio.run(main())
```

The worker module covers the full lifecycle of workflow and activity execution in Temporal applications: basic worker setup, interceptors and plugins, slot-based tuning of resource usage, and replay testing of workflow code against recorded histories.