# Worker Management

The `temporalio.worker` module provides the classes used to run Temporal workflows and activities. A worker polls a task queue, executes the workflow and activity tasks it receives, and manages the resources (slots, pollers, and executors) needed to do so.

## Worker Creation and Configuration

### Worker Class

The `Worker` class handles both workflow and activity execution:

```python { .api }
class Worker:
    """Worker to process workflows and/or activities.

    Once created, workers can be run and shutdown explicitly via run()
    and shutdown(). Alternatively workers can be used in an async with clause.
    """

    def __init__(
        self,
        client: temporalio.client.Client,
        *,
        task_queue: str,
        activities: Sequence[Callable] = [],
        nexus_service_handlers: Sequence[Any] = [],
        workflows: Sequence[Type] = [],
        activity_executor: Optional[concurrent.futures.Executor] = None,
        workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None,
        nexus_task_executor: Optional[concurrent.futures.Executor] = None,
        workflow_runner: WorkflowRunner = SandboxedWorkflowRunner(),
        unsandboxed_workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(),
        plugins: Sequence[Plugin] = [],
        interceptors: Sequence[Interceptor] = [],
        build_id: Optional[str] = None,
        identity: Optional[str] = None,
        max_cached_workflows: int = 1000,
        max_concurrent_workflow_tasks: Optional[int] = None,
        max_concurrent_activities: Optional[int] = None,
        max_concurrent_local_activities: Optional[int] = None,
        tuner: Optional[WorkerTuner] = None,
        max_concurrent_workflow_task_polls: Optional[int] = None,
        nonsticky_to_sticky_poll_ratio: float = 0.2,
        max_concurrent_activity_task_polls: Optional[int] = None,
        no_remote_activities: bool = False,
        sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10),
        max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60),
        default_heartbeat_throttle_interval: timedelta = timedelta(seconds=30),
        max_activities_per_second: Optional[float] = None,
        max_task_queue_activities_per_second: Optional[float] = None,
        graceful_shutdown_timeout: timedelta = timedelta(),
        workflow_failure_exception_types: Sequence[Type[BaseException]] = [],
        shared_state_manager: Optional[SharedStateManager] = None,
        debug_mode: bool = False,
        disable_eager_activity_execution: bool = False,
        on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] = None,
        use_worker_versioning: bool = False,
        disable_safe_workflow_eviction: bool = False,
        deployment_config: Optional[WorkerDeploymentConfig] = None,
        workflow_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(maximum=5),
        activity_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(maximum=5),
        nexus_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(maximum=5),
    ) -> None:
        """Create a worker to process workflows and/or activities.

        Args:
            client: Client to use for this worker. This is required and must be
                the Client instance or have a worker_service_client attribute with
                reference to the original client's underlying service client.
                This client cannot be "lazy".
            task_queue: Required task queue for this worker.
            activities: Activity callables decorated with @activity.defn.
                Activities may be async functions or non-async functions.
            nexus_service_handlers: Instances of Nexus service handler classes
                decorated with @nexusrpc.handler.service_handler.
            workflows: Workflow classes decorated with @workflow.defn.
            activity_executor: Concurrent executor to use for non-async
                activities. This is required if any activities are non-async.
                ThreadPoolExecutor is recommended. If this is a
                ProcessPoolExecutor, all non-async activities must be picklable.
            workflow_task_executor: Thread pool executor for workflow tasks. If
                this is not present, a new ThreadPoolExecutor will be
                created with max_workers set to max_concurrent_workflow_tasks if it is present,
                or 500 otherwise.
            nexus_task_executor: Executor to use for non-async
                Nexus operations. This is required if any operation start methods
                are non-async def. ThreadPoolExecutor is recommended.
            workflow_runner: Runner for workflows.
            unsandboxed_workflow_runner: Runner for workflows that opt-out of
                sandboxing.
            plugins: Collection of plugins for this worker. Any plugins already
                on the client that also implement Plugin are
                prepended to this list and should not be explicitly given here
                to avoid running the plugin twice.
            interceptors: Collection of interceptors for this worker. Any
                interceptors already on the client that also implement
                Interceptor are prepended to this list and should
                not be explicitly given here.
            build_id: Unique identifier for the current runtime. This is best
                set as a hash of all code and should change only when code does.
                If unset, a best-effort identifier is generated.
                Exclusive with deployment_config.
            identity: Identity for this worker client. If unset, the client
                identity is used.
            max_cached_workflows: If nonzero, workflows will be cached and
                sticky task queues will be used.
            max_concurrent_workflow_tasks: Maximum allowed number of workflow
                tasks that will ever be given to this worker at one time. Mutually exclusive with
                tuner. Must be set to at least two if max_cached_workflows is nonzero.
            max_concurrent_activities: Maximum number of activity tasks that
                will ever be given to the activity worker concurrently. Mutually exclusive with tuner.
            max_concurrent_local_activities: Maximum number of local activity
                tasks that will ever be given to the activity worker concurrently. Mutually exclusive with tuner.
            tuner: Provide a custom WorkerTuner. Mutually exclusive with the
                max_concurrent_workflow_tasks, max_concurrent_activities, and
                max_concurrent_local_activities arguments.
                Defaults to fixed-size 100 slots for each slot kind if unset and none of the
                max_* arguments are provided.
            max_concurrent_workflow_task_polls: Maximum number of concurrent
                poll workflow task requests we will perform at a time on this worker's task queue.
                Must be set to at least two if max_cached_workflows is nonzero.
                If set, will override any value passed to workflow_task_poller_behavior.
            nonsticky_to_sticky_poll_ratio: max_concurrent_workflow_task_polls *
                this number = the number of max pollers that will be allowed for
                the nonsticky queue when sticky tasks are enabled.
            max_concurrent_activity_task_polls: Maximum number of concurrent
                poll activity task requests we will perform at a time on this
                worker's task queue.
                If set, will override any value passed to activity_task_poller_behavior.
            no_remote_activities: If true, this worker will only handle workflow
                tasks and local activities, it will not poll for activity tasks.
            sticky_queue_schedule_to_start_timeout: How long a workflow task is
                allowed to sit on the sticky queue before it is timed out and
                moved to the non-sticky queue where it may be picked up by any
                worker.
            max_heartbeat_throttle_interval: Longest interval for throttling
                activity heartbeats.
            default_heartbeat_throttle_interval: Default interval for throttling
                activity heartbeats in case per-activity heartbeat timeout is
                unset. Otherwise, it's the per-activity heartbeat timeout * 0.8.
            max_activities_per_second: Limits the number of activities per
                second that this worker will process. The worker will not poll
                for new activities if by doing so it might receive and execute
                an activity which would cause it to exceed this limit.
            max_task_queue_activities_per_second: Sets the maximum number of
                activities per second the task queue will dispatch, controlled
                server-side. Note that this only takes effect upon an activity
                poll request.
            graceful_shutdown_timeout: Amount of time after shutdown is called
                that activities are given to complete before their tasks are
                cancelled.
            workflow_failure_exception_types: The types of exceptions that, if a
                workflow-thrown exception extends, will cause the
                workflow/update to fail instead of suspending the workflow via
                task failure.
            shared_state_manager: Used for obtaining cross-process friendly
                synchronization primitives. This is required for non-async
                activities where the activity_executor is not a
                ThreadPoolExecutor. Reuse of these across workers is encouraged.
            debug_mode: If true, will disable deadlock detection and may disable
                sandboxing in order to make using a debugger easier. If false
                but the environment variable TEMPORAL_DEBUG is truthy, this
                will be set to true.
            disable_eager_activity_execution: If true, will disable eager
                activity execution. Eager activity execution is an optimization
                on some servers that sends activities back to the same worker as
                the calling workflow if they can run there.
            on_fatal_error: An async function that can handle a failure before
                the worker shutdown commences. This cannot stop the shutdown and
                any exception raised is logged and ignored.
            use_worker_versioning: If true, the build_id argument must be
                specified, and this worker opts into the worker versioning
                feature. This ensures it only receives workflow tasks for
                workflows which it claims to be compatible with.
                Exclusive with deployment_config.
            disable_safe_workflow_eviction: If true, instead of letting the
                workflow collect its tasks properly, the worker will simply let
                the Python garbage collector collect the tasks.
            deployment_config: Deployment config for the worker. Exclusive with build_id and
                use_worker_versioning.
            workflow_task_poller_behavior: Specify the behavior of workflow task polling.
                Defaults to a 5-poller maximum.
            activity_task_poller_behavior: Specify the behavior of activity task polling.
                Defaults to a 5-poller maximum.
            nexus_task_poller_behavior: Specify the behavior of Nexus task polling.
                Defaults to a 5-poller maximum.
        """
```
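The two heartbeat throttle options interact in a way that is easy to misread, so here is a minimal sketch of the arithmetic the docstring describes. It is illustrative only: `heartbeat_throttle_interval` is a hypothetical helper, not an SDK function, and treating `max_heartbeat_throttle_interval` as a hard upper bound on the result is an assumption based on its description as the "longest interval".

```python
from datetime import timedelta
from typing import Optional


def heartbeat_throttle_interval(
    heartbeat_timeout: Optional[timedelta],
    default_interval: timedelta = timedelta(seconds=30),
    max_interval: timedelta = timedelta(seconds=60),
) -> timedelta:
    """Estimate the heartbeat throttle interval (hypothetical helper)."""
    if heartbeat_timeout is None:
        # No per-activity heartbeat timeout: fall back to the default interval.
        candidate = default_interval
    else:
        # Per the docstring: 0.8 times the per-activity heartbeat timeout.
        candidate = heartbeat_timeout * 0.8
    # Assumption: the maximum interval caps whichever value was chosen.
    return min(candidate, max_interval)
```

With the default arguments, a 10 second heartbeat timeout gives an 8 second throttle interval, and a 120 second timeout is capped at 60 seconds under the assumption above.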

### WorkerConfig

`TypedDict` holding the worker configuration:

```python { .api }
class WorkerConfig(TypedDict, total=False):
    """TypedDict of config originally passed to Worker."""

    client: temporalio.client.Client
    task_queue: str
    activities: Sequence[Callable]
    nexus_service_handlers: Sequence[Any]
    workflows: Sequence[Type]
    activity_executor: Optional[concurrent.futures.Executor]
    workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor]
    nexus_task_executor: Optional[concurrent.futures.Executor]
    workflow_runner: WorkflowRunner
    unsandboxed_workflow_runner: WorkflowRunner
    interceptors: Sequence[Interceptor]
    build_id: Optional[str]
    identity: Optional[str]
    max_cached_workflows: int
    max_concurrent_workflow_tasks: Optional[int]
    max_concurrent_activities: Optional[int]
    max_concurrent_local_activities: Optional[int]
    tuner: Optional[WorkerTuner]
    max_concurrent_workflow_task_polls: Optional[int]
    nonsticky_to_sticky_poll_ratio: float
    max_concurrent_activity_task_polls: Optional[int]
    no_remote_activities: bool
    sticky_queue_schedule_to_start_timeout: timedelta
    max_heartbeat_throttle_interval: timedelta
    default_heartbeat_throttle_interval: timedelta
    max_activities_per_second: Optional[float]
    max_task_queue_activities_per_second: Optional[float]
    graceful_shutdown_timeout: timedelta
    workflow_failure_exception_types: Sequence[Type[BaseException]]
    shared_state_manager: Optional[SharedStateManager]
    debug_mode: bool
    disable_eager_activity_execution: bool
    on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]]
    use_worker_versioning: bool
    disable_safe_workflow_eviction: bool
    deployment_config: Optional[WorkerDeploymentConfig]
    workflow_task_poller_behavior: PollerBehavior
    activity_task_poller_behavior: PollerBehavior
    nexus_task_poller_behavior: PollerBehavior
```

### WorkerDeploymentConfig

Configuration for the Worker Versioning feature:

```python { .api }
@dataclass
class WorkerDeploymentConfig:
    """Options for configuring the Worker Versioning feature.

    WARNING: This is an experimental feature and may change in the future.
    """

    version: WorkerDeploymentVersion
    """The deployment version information."""

    use_worker_versioning: bool
    """Whether to enable worker versioning."""

    default_versioning_behavior: VersioningBehavior = VersioningBehavior.UNSPECIFIED
    """Default versioning behavior for workflow tasks."""

    def _to_bridge_worker_deployment_options(self) -> temporalio.bridge.worker.WorkerDeploymentOptions:
        """Convert to bridge worker deployment options."""
```

### Basic Worker Usage

```python
import asyncio
from datetime import timedelta

from temporalio import activity, client, worker, workflow

# Define activities and workflows
@activity.defn
async def say_hello(name: str) -> str:
    return f"Hello, {name}!"

@workflow.defn
class SayHelloWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello,
            name,
            schedule_to_close_timeout=timedelta(seconds=60),
        )

async def main():
    # Create client
    client_instance = await client.Client.connect("localhost:7233")

    # Create worker
    worker_instance = worker.Worker(
        client_instance,
        task_queue="hello-task-queue",
        workflows=[SayHelloWorkflow],
        activities=[say_hello],
    )

    # Run worker
    async with worker_instance:
        # Worker runs until context exits
        await asyncio.sleep(60)

if __name__ == "__main__":
    asyncio.run(main())
```

## Worker Execution and Lifecycle

### Worker.run() Method

Start the worker and wait for shutdown:

```python { .api }
async def run(self) -> None:
    """Run the worker and wait on it to be shut down.

    This will not return until shutdown is complete. This means that
    activities have all completed after being told to cancel after the
    graceful timeout period.

    This method will raise if there is a worker fatal error. While
    shutdown() does not need to be invoked in this case, it is
    harmless to do so. Otherwise, to shut down this worker, invoke
    shutdown().

    Technically this worker can be shutdown by issuing a cancel to this
    async function assuming that it is currently running. A cancel could
    also cancel the shutdown process. Therefore users are encouraged to use
    explicit shutdown instead.
    """
```

### Worker Properties

```python { .api }
@property
def is_running(self) -> bool:
    """Whether the worker is running.

    This is only True if the worker has been started and not yet
    shut down.
    """

@property
def is_shutdown(self) -> bool:
    """Whether the worker has run and shut down.

    This is only True if the worker was once started and then shutdown.
    This is not necessarily True after shutdown() is first
    called because the shutdown process can take a bit.
    """
```

### Worker Shutdown

```python { .api }
async def shutdown(self) -> None:
    """Initiate a worker shutdown and wait until complete.

    This can be called before the worker has even started and is safe for
    repeated invocations. It simply sets a marker informing the worker to
    shut down as it runs.

    This will not return until the worker has completed shutting down.
    """
```
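Because `run()` only returns once shutdown has completed, a common pattern is to run the worker as a task and call `shutdown()` when some external stop condition fires. The sketch below shows one way to do that. `RunnableWorker` and `run_until_stopped` are hypothetical names introduced here; the helper accepts any object with async `run()` and `shutdown()` methods, which a `Worker` satisfies according to the signatures above.

```python
import asyncio
from typing import Protocol


class RunnableWorker(Protocol):
    """Structural type for anything exposing async run() and shutdown()."""

    async def run(self) -> None: ...

    async def shutdown(self) -> None: ...


async def run_until_stopped(worker: RunnableWorker, stop: asyncio.Event) -> None:
    """Run the worker until it fails or the stop event is set."""
    run_task = asyncio.create_task(worker.run())
    stop_task = asyncio.create_task(stop.wait())
    done, _ = await asyncio.wait(
        {run_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
    )
    if run_task in done:
        # The worker ended on its own; clean up the waiter and surface any error.
        stop_task.cancel()
        await asyncio.gather(stop_task, return_exceptions=True)
        run_task.result()
        return
    # Stop was requested: shut down explicitly, then wait for run() to return.
    await worker.shutdown()
    await run_task
```

In a real program the event would typically be set from a signal handler, for example with `loop.add_signal_handler(signal.SIGINT, stop.set)` on platforms that support it.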

### Async Context Manager

```python { .api }
async def __aenter__(self) -> Worker:
    """Start the worker and return self for use by async with.

    This is a wrapper around run(). Please review that method.

    This takes a similar approach to asyncio.timeout() in that it
    will cancel the current task if there is a fatal worker error and raise
    that error out of the context manager. However, if the inner async code
    swallows/wraps the CancelledError, the exiting
    portion of the context manager will not raise the fatal worker error.
    """

async def __aexit__(
    self,
    exc_type: Optional[Type[BaseException]],
    *args
) -> None:
    """Exit the context manager and shutdown the worker."""
```

### Lifecycle Example

```python
import asyncio

from temporalio import worker
from temporalio.client import Client

# MyWorkflow and my_activity are placeholders for your own definitions.

async def explicit_lifecycle(client: Client) -> None:
    # Create worker
    worker_instance = worker.Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    )

    try:
        # Start worker in background
        worker_task = asyncio.create_task(worker_instance.run())

        # Do other work
        await asyncio.sleep(10)

        # Shutdown worker
        await worker_instance.shutdown()

        # Wait for worker task to complete
        await worker_task

    except Exception as e:
        print(f"Worker failed: {e}")
        # Harmless if the worker has already shut down
        await worker_instance.shutdown()

async def context_manager_lifecycle(client: Client) -> None:
    # Using async context manager
    async with worker.Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    ) as worker_instance:
        # Worker runs until context exits
        await asyncio.sleep(10)
    # Worker automatically shuts down here
```

## Workflow and Activity Registration

### Registration Patterns

Workflows and activities are registered by passing them to the `Worker` constructor, either as an explicit list or as a list built by your own discovery code:

```python
import importlib
import inspect

from temporalio import worker

# Direct registration
worker_instance = worker.Worker(
    client,
    task_queue="my-task-queue",
    workflows=[WorkflowClass1, WorkflowClass2],
    activities=[activity_func1, activity_func2, activity_method],
)

# Dynamic registration
# NOTE: the attribute names checked below are private markers set by
# @workflow.defn and @activity.defn. They are an implementation detail
# of the SDK and may change between versions.
def discover_workflows():
    workflows = []
    # Dynamically import and discover workflow classes
    module = importlib.import_module("my_workflows")
    for name, obj in inspect.getmembers(module, inspect.isclass):
        if hasattr(obj, "__temporal_workflow_definition"):
            workflows.append(obj)
    return workflows

def discover_activities():
    activities = []
    # Dynamically import and discover activity functions
    module = importlib.import_module("my_activities")
    for name, obj in inspect.getmembers(module, inspect.isfunction):
        if hasattr(obj, "__temporal_activity_definition"):
            activities.append(obj)
    return activities

# Use discovered workflows and activities
worker_instance = worker.Worker(
    client,
    task_queue="my-task-queue",
    workflows=discover_workflows(),
    activities=discover_activities(),
)
```

### Nexus Service Registration

```python
# Nexus service handlers (experimental)
import nexusrpc.handler

from temporalio import worker

@nexusrpc.handler.service_handler
class MyNexusService:
    # A synchronous operation: the handler returns the result directly.
    @nexusrpc.handler.sync_operation
    async def my_operation(
        self, ctx: nexusrpc.handler.StartOperationContext, input: str
    ) -> str:
        return f"Processed: {input}"

worker_instance = worker.Worker(
    client,
    task_queue="my-task-queue",
    nexus_service_handlers=[MyNexusService()],
)
```

## Polling and Execution Behavior

### PollerBehavior Types

Control how the worker polls for tasks:

```python { .api }
PollerBehavior = Union[PollerBehaviorSimpleMaximum, PollerBehaviorAutoscaling]
```

### PollerBehaviorSimpleMaximum

Simple fixed maximum poller behavior:

```python { .api }
@dataclass(frozen=True)
class PollerBehaviorSimpleMaximum:
    """A poller behavior that will attempt to poll as long as a slot is available, up to the
    provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
    """

    maximum: int = 5
    """Maximum number of concurrent pollers."""

    def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
        """Convert to bridge poller behavior."""
```

### PollerBehaviorAutoscaling

Dynamic autoscaling poller behavior:

```python { .api }
@dataclass(frozen=True)
class PollerBehaviorAutoscaling:
    """A poller behavior that will automatically scale the number of pollers based on feedback
    from the server. A slot must be available before beginning polling.
    """

    minimum: int = 1
    """At least this many poll calls will always be attempted (assuming slots are available)."""

    maximum: int = 100
    """At most this many poll calls will ever be open at once. Must be >= minimum."""

    initial: int = 5
    """This many polls will be attempted initially before scaling kicks in. Must be between
    minimum and maximum."""

    def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
        """Convert to bridge poller behavior."""
```
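The three autoscaling fields are constrained relative to one another, so it can help to check them before constructing a worker. The following is a minimal sketch of such a pre-flight check. `check_autoscaling_bounds` is a hypothetical helper, not part of the SDK, and it assumes the bounds are inclusive; where and how the SDK itself validates these values is not covered here.

```python
def check_autoscaling_bounds(minimum: int, maximum: int, initial: int) -> None:
    """Raise ValueError if the values violate the documented constraints."""
    # Documented: maximum must be >= minimum.
    if maximum < minimum:
        raise ValueError(
            f"maximum ({maximum}) must be >= minimum ({minimum})"
        )
    # Documented: initial must lie between minimum and maximum.
    # Assumption: both ends of the range are inclusive.
    if not (minimum <= initial <= maximum):
        raise ValueError(
            f"initial ({initial}) must be between {minimum} and {maximum}"
        )
```

For example, `check_autoscaling_bounds(2, 20, 5)` passes, while `check_autoscaling_bounds(2, 20, 30)` raises `ValueError`.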

### Poller Configuration Example

```python
from temporalio.worker import (
    PollerBehaviorAutoscaling,
    PollerBehaviorSimpleMaximum,
    Worker,
)

# client and MyWorkflow are placeholders for your own client and workflow.

# Simple maximum polling
worker_simple = Worker(
    client,
    task_queue="simple-queue",
    workflows=[MyWorkflow],
    workflow_task_poller_behavior=PollerBehaviorSimpleMaximum(maximum=10),
    activity_task_poller_behavior=PollerBehaviorSimpleMaximum(maximum=5),
)

# Autoscaling polling
worker_autoscaling = Worker(
    client,
    task_queue="autoscaling-queue",
    workflows=[MyWorkflow],
    workflow_task_poller_behavior=PollerBehaviorAutoscaling(
        minimum=2,
        maximum=20,
        initial=5,
    ),
    activity_task_poller_behavior=PollerBehaviorAutoscaling(
        minimum=1,
        maximum=10,
        initial=3,
    ),
)
```

## Interceptors and Plugins

### Base Interceptor Class

```python { .api }
class Interceptor:
    """Interceptor for workers.

    This should be extended by any worker interceptors.
    """

    def intercept_activity(
        self, next: ActivityInboundInterceptor
    ) -> ActivityInboundInterceptor:
        """Method called for intercepting an activity.

        Args:
            next: The underlying inbound interceptor this interceptor should
                delegate to.

        Returns:
            The new interceptor that will be used for the activity.
        """
        return next

    def workflow_interceptor_class(
        self, input: WorkflowInterceptorClassInput
    ) -> Optional[Type[WorkflowInboundInterceptor]]:
        """Class that will be instantiated and used to intercept workflows.

        This method is called on workflow start. The class must have the same
        init as WorkflowInboundInterceptor.__init__. The input can be
        altered to do things like add additional extern functions.

        Args:
            input: Input to this method that contains mutable properties that
                can be altered by this interceptor.

        Returns:
            The class to construct to intercept each workflow.
        """
        return None
```
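The `next` argument is what makes interceptors composable: each interceptor wraps the one it is given and delegates to it. The sketch below illustrates that chaining with plain Python stand-ins. `Inbound`, `Terminal`, `Tagging`, and `build_chain` are hypothetical names that do not exist in the SDK, and the choice that the first listed interceptor becomes the outermost wrapper is an assumption of this sketch.

```python
import asyncio
from typing import Any, List, Sequence


class Inbound:
    """Stand-in for an inbound interceptor that delegates to `next`."""

    def __init__(self, next: "Inbound") -> None:
        self.next = next

    async def execute(self, value: Any) -> Any:
        return await self.next.execute(value)


class Terminal(Inbound):
    """End of the chain: performs the actual work."""

    def __init__(self) -> None:
        pass

    async def execute(self, value: Any) -> Any:
        return f"ran({value})"


class Tagging(Inbound):
    """Records when control enters and leaves this interceptor."""

    def __init__(self, next: Inbound, tag: str, log: List[str]) -> None:
        super().__init__(next)
        self.tag = tag
        self.log = log

    async def execute(self, value: Any) -> Any:
        self.log.append(f"{self.tag}:before")
        result = await super().execute(value)
        self.log.append(f"{self.tag}:after")
        return result


def build_chain(tags: Sequence[str], log: List[str]) -> Inbound:
    """Wrap the terminal so that the first tag ends up outermost."""
    chain: Inbound = Terminal()
    for tag in reversed(tags):
        chain = Tagging(chain, tag, log)
    return chain
```

Running `build_chain(["a", "b"], log).execute("x")` enters `a`, then `b`, runs the terminal step, and unwinds through `b` and then `a`.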

### Activity Interceptors

#### ActivityInboundInterceptor

```python { .api }
class ActivityInboundInterceptor:
    """Inbound interceptor to wrap outbound creation and activity execution.

    This should be extended by any activity inbound interceptors.
    """

    def __init__(self, next: ActivityInboundInterceptor) -> None:
        """Create activity inbound interceptor."""
        self.next = next

    async def execute_activity(self, input: ExecuteActivityInput) -> Any:
        """Called when executing an activity.

        Args:
            input: Activity execution input.

        Returns:
            Result of activity execution.
        """
        return await self.next.execute_activity(input)
```

#### ActivityOutboundInterceptor

```python { .api }
class ActivityOutboundInterceptor:
    """Outbound interceptor for activities.

    This should be extended by any activity outbound interceptors.
    """

    async def execute_activity(self, input: ExecuteActivityInput) -> Any:
        """Execute an activity.

        Args:
            input: Activity execution input.

        Returns:
            Result of activity execution.
        """
```

### Workflow Interceptors

#### WorkflowInboundInterceptor

```python { .api }
class WorkflowInboundInterceptor:
    """Inbound interceptor to wrap outbound calls and workflow execution.

    This should be extended by any workflow inbound interceptors.
    """

    def __init__(self, next: WorkflowInboundInterceptor) -> None:
        """Create workflow inbound interceptor."""
        self.next = next

    async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
        """Called when executing a workflow.

        Args:
            input: Workflow execution input.

        Returns:
            Result of workflow execution.
        """
        return await self.next.execute_workflow(input)

    async def handle_signal(self, input: HandleSignalInput) -> None:
        """Called when handling a signal."""
        await self.next.handle_signal(input)

    async def handle_query(self, input: HandleQueryInput) -> Any:
        """Called when handling a query."""
        return await self.next.handle_query(input)

    async def handle_update(self, input: HandleUpdateInput) -> Any:
        """Called when handling an update."""
        return await self.next.handle_update(input)
```

#### WorkflowOutboundInterceptor

```python { .api }
class WorkflowOutboundInterceptor:
    """Outbound interceptor for workflows.

    This should be extended by any workflow outbound interceptors.
    """

    async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
        """Execute a workflow."""

    async def start_activity(self, input: StartActivityInput) -> ActivityHandle:
        """Start an activity."""

    async def start_local_activity(self, input: StartLocalActivityInput) -> ActivityHandle:
        """Start a local activity."""

    async def start_child_workflow(self, input: StartChildWorkflowInput) -> ChildWorkflowHandle:
        """Start a child workflow."""
```

### Input Classes for Interceptors

```python { .api }
@dataclass
class ExecuteActivityInput:
    """Input for ActivityInboundInterceptor.execute_activity."""

    fn: Callable[..., Any]
    """The activity function to execute."""

    args: Sequence[Any]
    """Arguments to pass to the activity function."""

    executor: Optional[concurrent.futures.Executor]
    """Executor to run the activity in."""

    headers: Mapping[str, temporalio.api.common.v1.Payload]
    """Headers for the activity."""

@dataclass
class ExecuteWorkflowInput:
    """Input for WorkflowInboundInterceptor.execute_workflow."""

    run_fn: Callable[..., Awaitable[Any]]
    """The workflow run function."""

    args: Sequence[Any]
    """Arguments to pass to the workflow run function."""

@dataclass
class HandleSignalInput:
    """Input for WorkflowInboundInterceptor.handle_signal."""

    signal: str
    """Name of the signal."""

    args: Sequence[Any]
    """Arguments for the signal."""

@dataclass
class HandleQueryInput:
    """Input for WorkflowInboundInterceptor.handle_query."""

    query: str
    """Name of the query."""

    args: Sequence[Any]
    """Arguments for the query."""

@dataclass
class HandleUpdateInput:
    """Input for WorkflowInboundInterceptor.handle_update."""

    update: str
    """Name of the update."""

    args: Sequence[Any]
    """Arguments for the update."""

@dataclass
class StartActivityInput:
    """Input for WorkflowOutboundInterceptor.start_activity."""

    activity: str
    """Activity name or function."""

    args: Sequence[Any]
    """Arguments to pass to the activity."""

    activity_id: Optional[str]
    """ID for the activity."""

    task_queue: Optional[str]
    """Task queue for the activity."""

@dataclass
class StartChildWorkflowInput:
    """Input for WorkflowOutboundInterceptor.start_child_workflow."""

    workflow: str
    """Workflow name or class."""

    args: Sequence[Any]
    """Arguments to pass to the workflow."""

    id: Optional[str]
    """ID for the child workflow."""

    task_queue: Optional[str]
    """Task queue for the child workflow."""

@dataclass
class WorkflowInterceptorClassInput:
    """Input for Interceptor.workflow_interceptor_class."""

    unsafe_extern_functions: MutableMapping[str, Callable]
    """Set of external functions that can be called from the sandbox.

    WARNING: Exposing external functions to the workflow sandbox is dangerous and
    should be avoided. Use at your own risk.
    """
```

### Plugin System

```python { .api }
class Plugin:
    """Plugin base class.

    Plugins can be used to extend worker functionality and are applied
    during worker initialization.
    """

    def init_worker_plugin(self, root_plugin: Plugin) -> None:
        """Initialize the plugin with the root plugin."""
        pass

    def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
        """Configure worker settings.

        Args:
            config: The worker configuration to modify.

        Returns:
            Modified worker configuration.
        """
        return config

    def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
        """Configure replayer settings.

        Args:
            config: The replayer configuration to modify.

        Returns:
            Modified replayer configuration.
        """
        return config
```
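A `configure_worker` hook receives the configuration as a dictionary and returns the version the worker should use. The sketch below shows the shape of such a hook using stand-in types so that it runs without the SDK installed. `SketchWorkerConfig` and `AddInterceptorPlugin` are hypothetical: the first is a trimmed-down imitation of `WorkerConfig`, and the second does not subclass the SDK's `Plugin`.

```python
from typing import Any, Sequence, TypedDict


class SketchWorkerConfig(TypedDict, total=False):
    """Hypothetical, trimmed-down stand-in for WorkerConfig."""

    task_queue: str
    interceptors: Sequence[Any]


class AddInterceptorPlugin:
    """Plugin-like class that appends one interceptor to the config."""

    def __init__(self, interceptor: Any) -> None:
        self._interceptor = interceptor

    def configure_worker(self, config: SketchWorkerConfig) -> SketchWorkerConfig:
        # Copy first so the caller's dictionary is left untouched.
        updated = config.copy()
        # total=False means the key may be absent, so fall back to an empty list.
        existing = list(config.get("interceptors", []))
        updated["interceptors"] = existing + [self._interceptor]
        return updated
```

Returning a modified copy rather than mutating the input keeps the hook free of side effects, which makes it easier to reason about when several plugins are applied in sequence.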

### Interceptor Example

```python
import logging
from typing import Any

from temporalio import activity, worker
from temporalio.worker import (
    ActivityInboundInterceptor,
    ExecuteActivityInput,
    Interceptor,
)

class LoggingInterceptor(Interceptor):
    """Example interceptor that logs activity execution."""

    def intercept_activity(self, next: ActivityInboundInterceptor) -> ActivityInboundInterceptor:
        return LoggingActivityInboundInterceptor(next)

class LoggingActivityInboundInterceptor(ActivityInboundInterceptor):
    async def execute_activity(self, input: ExecuteActivityInput) -> Any:
        # The interceptor runs inside the activity context, so
        # activity.info() can be used to look up the activity type name.
        activity_name = activity.info().activity_type

        logging.info(f"Starting activity: {activity_name}")
        try:
            result = await super().execute_activity(input)
            logging.info(f"Completed activity: {activity_name}")
            return result
        except Exception as e:
            logging.error(f"Activity failed: {activity_name}, error: {e}")
            raise

# Use the interceptor (client, MyWorkflow, and my_activity are placeholders)
worker_instance = worker.Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    interceptors=[LoggingInterceptor()],
)
```

## Worker Tuning and Resource Management

### WorkerTuner

Base class for worker tuning:

```python { .api }
class WorkerTuner(ABC):
    """WorkerTuners allow for the dynamic customization of some aspects of worker configuration"""

    @staticmethod
    def create_resource_based(
        *,
        target_memory_usage: float,
        target_cpu_usage: float,
        workflow_config: Optional[ResourceBasedSlotConfig] = None,
        activity_config: Optional[ResourceBasedSlotConfig] = None,
        local_activity_config: Optional[ResourceBasedSlotConfig] = None,
    ) -> "WorkerTuner":
        """Create a resource-based tuner with the provided options."""

    @staticmethod
    def create_fixed(
        *,
        workflow_slots: Optional[int],
        activity_slots: Optional[int],
        local_activity_slots: Optional[int],
    ) -> "WorkerTuner":
        """Create a fixed-size tuner with the provided number of slots. Any unspecified slots will default to 100."""

    @staticmethod
    def create_composite(
        *,
        workflow_supplier: SlotSupplier,
        activity_supplier: SlotSupplier,
        local_activity_supplier: SlotSupplier,
    ) -> "WorkerTuner":
        """Create a tuner composed of the provided slot suppliers."""

    @abstractmethod
    def _get_workflow_task_slot_supplier(self) -> SlotSupplier:
        """Get the slot supplier for workflow tasks."""

    @abstractmethod
    def _get_activity_task_slot_supplier(self) -> SlotSupplier:
        """Get the slot supplier for activity tasks."""

    @abstractmethod
    def _get_local_activity_task_slot_supplier(self) -> SlotSupplier:
        """Get the slot supplier for local activity tasks."""
```

### Slot Suppliers

#### FixedSizeSlotSupplier

```python { .api }
@dataclass(frozen=True)
class FixedSizeSlotSupplier:
    """A fixed-size slot supplier that will never issue more than a fixed number of slots."""

    num_slots: int
    """The maximum number of slots that can be issued"""
```

#### ResourceBasedSlotSupplier

```python { .api }
@dataclass(frozen=True)
class ResourceBasedSlotSupplier:
    """A slot supplier that will dynamically adjust the number of slots based on resource usage.

    WARNING: The resource based tuner is currently experimental.
    """

    slot_config: ResourceBasedSlotConfig
    """Configuration for this slot supplier."""

    tuner_config: ResourceBasedTunerConfig
    """Options for the tuner that will be used to adjust the number of slots. When used with a
    CompositeTuner, all resource-based slot suppliers must use the same tuner options."""

@dataclass(frozen=True)
class ResourceBasedTunerConfig:
    """Options for a ResourceBasedTuner or a ResourceBasedSlotSupplier.

    WARNING: The resource based tuner is currently experimental.
    """

    target_memory_usage: float
    """A value between 0 and 1 that represents the target (system) memory usage. It's not recommended
    to set this higher than 0.8, since how much memory a workflow may use is not predictable, and
    you don't want to encounter OOM errors."""

    target_cpu_usage: float
    """A value between 0 and 1 that represents the target (system) CPU usage. This can be set to 1.0
    if desired, but it's recommended to leave some headroom for other processes."""

@dataclass(frozen=True)
class ResourceBasedSlotConfig:
    """Options for a specific slot type being used with a ResourceBasedSlotSupplier.

    WARNING: The resource based tuner is currently experimental.
    """

    minimum_slots: Optional[int] = None
    """Number of slots that will be issued regardless of any other checks. Defaults to 5 for workflows and 1 for
    activities."""

    maximum_slots: Optional[int] = None
    """Maximum number of slots permitted. Defaults to 500."""

    ramp_throttle: Optional[timedelta] = None
    """Minimum time to wait (after passing the minimum slots number) between handing out new slots.
    Defaults to 0 for workflows and 50ms for activities.

    This value matters because how many resources a task will use cannot be determined ahead of time, and thus the
    system should wait to see how many resources are used before issuing more slots."""
```
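The interplay of `minimum_slots`, `maximum_slots`, and `ramp_throttle` can be illustrated with a simplified, standalone model. This is a sketch under stated assumptions, not the SDK's tuning algorithm: `SlotDecider` and `can_issue` are hypothetical names, and the resource readings are passed in by hand rather than sampled from the system.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass
class SlotDecider:
    """Toy model of a resource-based slot decision (illustrative only)."""

    target_memory_usage: float
    target_cpu_usage: float
    minimum_slots: int = 5
    maximum_slots: int = 500
    ramp_throttle: timedelta = timedelta(0)

    def can_issue(
        self,
        issued: int,
        memory_usage: float,
        cpu_usage: float,
        since_last_issue: Optional[timedelta],
    ) -> bool:
        # Below the minimum, slots are handed out regardless of other checks.
        if issued < self.minimum_slots:
            return True
        # Never exceed the hard cap.
        if issued >= self.maximum_slots:
            return False
        # Past the minimum, wait at least ramp_throttle between new slots so
        # the resource impact of the previous task can be observed first.
        if since_last_issue is not None and since_last_issue < self.ramp_throttle:
            return False
        # Only grow while both readings are under their targets.
        return (
            memory_usage < self.target_memory_usage
            and cpu_usage < self.target_cpu_usage
        )


decider = SlotDecider(
    target_memory_usage=0.7,
    target_cpu_usage=0.8,
    minimum_slots=1,
    maximum_slots=3,
    ramp_throttle=timedelta(milliseconds=50),
)
print(decider.can_issue(0, 0.95, 0.95, None))  # below the minimum
print(decider.can_issue(1, 0.5, 0.5, timedelta(milliseconds=10)))  # throttled
print(decider.can_issue(1, 0.5, 0.5, timedelta(milliseconds=60)))  # headroom left
print(decider.can_issue(3, 0.1, 0.1, timedelta(seconds=1)))  # at the maximum
```

The ordering of the checks mirrors the field descriptions: the minimum is unconditional, the maximum is a hard cap, and the throttle only applies once the minimum has been passed.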

#### CustomSlotSupplier

```python { .api }
class CustomSlotSupplier(ABC):
    """This class can be implemented to provide custom slot supplier behavior.

    WARNING: Custom slot suppliers are currently experimental.
    """

    @abstractmethod
    async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
        """This function is called before polling for new tasks. Your implementation must block until a
        slot is available then return a permit to use that slot.

        The only acceptable exception to throw is asyncio.CancelledError, as invocations of this method may
        be cancelled. Any other exceptions thrown will be logged and ignored.

        Args:
            ctx: The context for slot reservation.

        Returns:
            A permit to use the slot which may be populated with your own data.
        """

    @abstractmethod
    def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
        """This function is called when trying to reserve slots for "eager" workflow and activity tasks.
        Eager tasks are those which are returned as a result of completing a workflow task, rather than
        from polling. Your implementation must not block, and if a slot is available, return a permit
        to use that slot.

        Args:
            ctx: The context for slot reservation.

        Returns:
            Maybe a permit to use the slot which may be populated with your own data.
        """

    @abstractmethod
    def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
        """This function is called once a slot is actually being used to process some task, which may be
        some time after the slot was reserved originally.

        Args:
            ctx: The context for marking a slot as used.
        """

    @abstractmethod
    def release_slot(self, ctx: SlotReleaseContext) -> None:
        """This function is called once a permit is no longer needed.

        Args:
            ctx: The context for releasing a slot.
        """
```

### Slot Management Classes

```python { .api }
class SlotPermit:
    """A permit to use a slot for a workflow/activity/local activity task.

    You can inherit from this class to add your own data to the permit.

    WARNING: Custom slot suppliers are currently experimental.
    """

class SlotReserveContext(Protocol):
    """Context for reserving a slot from a CustomSlotSupplier.

    WARNING: Custom slot suppliers are currently experimental.
    """

    slot_type: Literal["workflow", "activity", "local-activity"]
    """The type of slot trying to be reserved. Always one of "workflow", "activity", or "local-activity"."""

    task_queue: str
    """The name of the task queue for which this reservation request is associated."""

    worker_identity: str
    """The identity of the worker that is requesting the reservation."""

    worker_build_id: str
    """The build id of the worker that is requesting the reservation."""

    worker_deployment_version: Optional[WorkerDeploymentVersion]
    """The deployment version of the worker that is requesting the reservation, if any."""

    is_sticky: bool
    """True iff this is a reservation for a sticky poll for a workflow task."""

@dataclass(frozen=True)
class SlotMarkUsedContext(Protocol):
    """Context for marking a slot used from a CustomSlotSupplier.

    WARNING: Custom slot suppliers are currently experimental.
    """

    slot_info: SlotInfo
    """Info about the task that will be using the slot."""

    permit: SlotPermit
    """The permit that was issued when the slot was reserved."""

@dataclass(frozen=True)
class SlotReleaseContext:
    """Context for releasing a slot from a CustomSlotSupplier.

    WARNING: Custom slot suppliers are currently experimental.
    """

    slot_info: Optional[SlotInfo]
    """Info about the task that will be using the slot. May be None if the slot was never used."""

    permit: SlotPermit
    """The permit that was issued when the slot was reserved."""
```

### Slot Info Types

```python { .api }
@runtime_checkable
class WorkflowSlotInfo(Protocol):
    """Info about a workflow task slot usage.

    WARNING: Custom slot suppliers are currently experimental.
    """

    workflow_type: str
    is_sticky: bool

@runtime_checkable
class ActivitySlotInfo(Protocol):
    """Info about an activity task slot usage.

    WARNING: Custom slot suppliers are currently experimental.
    """

    activity_type: str

@runtime_checkable
class LocalActivitySlotInfo(Protocol):
    """Info about a local activity task slot usage.

    WARNING: Custom slot suppliers are currently experimental.
    """

    activity_type: str

SlotInfo = Union[WorkflowSlotInfo, ActivitySlotInfo, LocalActivitySlotInfo]
```

### Tuning Examples

```python
import asyncio
import threading
from datetime import timedelta
from typing import Optional

from temporalio.worker import (
    CustomSlotSupplier,
    ResourceBasedSlotConfig,
    SlotMarkUsedContext,
    SlotPermit,
    SlotReleaseContext,
    SlotReserveContext,
    Worker,
    WorkerTuner,
)

# `client`, `MyWorkflow`, and `my_activity` are assumed to be defined elsewhere.

# Fixed-size tuning
fixed_tuner = WorkerTuner.create_fixed(
    workflow_slots=50,
    activity_slots=100,
    local_activity_slots=200,
)

# Resource-based tuning
resource_tuner = WorkerTuner.create_resource_based(
    target_memory_usage=0.7,  # Target 70% memory usage
    target_cpu_usage=0.8,  # Target 80% CPU usage
    workflow_config=ResourceBasedSlotConfig(
        minimum_slots=5,
        maximum_slots=100,
        ramp_throttle=timedelta(milliseconds=0),
    ),
    activity_config=ResourceBasedSlotConfig(
        minimum_slots=10,
        maximum_slots=500,
        ramp_throttle=timedelta(milliseconds=50),
    ),
)

# Custom slot supplier
class CustomSlotSupplierImpl(CustomSlotSupplier):
    def __init__(self, max_slots: int):
        self._max_slots = max_slots
        self._current_slots = 0
        # The synchronous callbacks may not run on the event loop thread, so
        # the counter is guarded by a thread lock instead of asyncio primitives.
        self._lock = threading.Lock()

    async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
        # Poll until a slot frees up. Sleeping yields to the event loop and
        # lets asyncio.CancelledError propagate if the reservation is cancelled.
        while True:
            permit = self.try_reserve_slot(ctx)
            if permit is not None:
                return permit
            await asyncio.sleep(0.01)

    def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
        # Never blocks beyond the short critical section
        with self._lock:
            if self._current_slots < self._max_slots:
                self._current_slots += 1
                return SlotPermit()
        return None

    def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
        # Track slot usage if needed
        pass

    def release_slot(self, ctx: SlotReleaseContext) -> None:
        with self._lock:
            self._current_slots -= 1

# Use custom tuner
custom_tuner = WorkerTuner.create_composite(
    workflow_supplier=CustomSlotSupplierImpl(max_slots=20),
    activity_supplier=CustomSlotSupplierImpl(max_slots=50),
    local_activity_supplier=CustomSlotSupplierImpl(max_slots=100),
)

# Apply tuner to worker. A tuner cannot be combined with the
# max_concurrent_* arguments.
worker_instance = Worker(
    client,
    task_queue="tuned-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    tuner=resource_tuner,
)
```

## Workflow Replay and Testing

### Replayer Class

Test workflow compatibility with historical executions:

```python { .api }
class Replayer:
    """Replayer to replay workflows from history."""

    def __init__(
        self,
        *,
        workflows: Sequence[Type],
        workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None,
        workflow_runner: WorkflowRunner = SandboxedWorkflowRunner(),
        unsandboxed_workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(),
        namespace: str = "ReplayNamespace",
        data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
        interceptors: Sequence[Interceptor] = [],
        plugins: Sequence[temporalio.worker.Plugin] = [],
        build_id: Optional[str] = None,
        identity: Optional[str] = None,
        workflow_failure_exception_types: Sequence[Type[BaseException]] = [],
        debug_mode: bool = False,
        runtime: Optional[temporalio.runtime.Runtime] = None,
        disable_safe_workflow_eviction: bool = False,
        header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC,
    ) -> None:
        """Create a replayer to replay workflows from history.

        Most of the same arguments need to be passed to the replayer that were passed
        to the worker when the workflow originally ran.

        Note, unlike the worker, for the replayer the workflow_task_executor
        will default to a new thread pool executor with no max_workers set that
        will be shared across all replay calls and never explicitly shut down.
        Users are encouraged to provide their own if needing more control.
        """

    def config(self) -> ReplayerConfig:
        """Config, as a dictionary, used to create this replayer.

        Returns:
            Configuration, shallow-copied.
        """

    async def replay_workflow(
        self,
        history: temporalio.client.WorkflowHistory,
        *,
        raise_on_replay_failure: bool = True,
    ) -> WorkflowReplayResult:
        """Replay a workflow for the given history.

        Args:
            history: The history to replay. Can be fetched directly, or use
                WorkflowHistory.from_json to parse a history downloaded via
                tctl or the web UI.
            raise_on_replay_failure: If True (the default), this will raise
                a WorkflowReplayResult.replay_failure if it is present.
        """

    async def replay_workflows(
        self,
        histories: AsyncIterator[temporalio.client.WorkflowHistory],
        *,
        raise_on_replay_failure: bool = True,
    ) -> WorkflowReplayResults:
        """Replay workflows for the given histories.

        This is a shortcut for workflow_replay_iterator that iterates
        all results and aggregates information about them.

        Args:
            histories: The histories to replay, from an async iterator.
            raise_on_replay_failure: If True (the default), this will raise
                the first replay failure seen.
        """

    @asynccontextmanager
    async def workflow_replay_iterator(
        self,
        histories: AsyncIterator[temporalio.client.WorkflowHistory],
    ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
        """Create an async context manager for replaying workflows.

        Args:
            histories: The histories to replay, from an async iterator.

        Returns:
            Async context manager that yields an iterator of replay results.
        """
```

### ReplayerConfig

```python { .api }
class ReplayerConfig(TypedDict, total=False):
    """TypedDict of config originally passed to Replayer."""

    workflows: Sequence[Type]
    workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor]
    workflow_runner: WorkflowRunner
    unsandboxed_workflow_runner: WorkflowRunner
    namespace: str
    data_converter: temporalio.converter.DataConverter
    interceptors: Sequence[Interceptor]
    build_id: Optional[str]
    identity: Optional[str]
    workflow_failure_exception_types: Sequence[Type[BaseException]]
    debug_mode: bool
    runtime: Optional[temporalio.runtime.Runtime]
    disable_safe_workflow_eviction: bool
    header_codec_behavior: HeaderCodecBehavior
```

### WorkflowReplayResult

```python { .api }
@dataclass(frozen=True)
class WorkflowReplayResult:
    """Single workflow replay result."""

    history: temporalio.client.WorkflowHistory
    """History originally passed for this workflow replay."""

    replay_failure: Optional[Exception]
    """Failure during replay if any.

    This does not mean your workflow exited by raising an error, but rather that
    some task failure such as NondeterminismError was encountered during
    replay - likely indicating your workflow code is incompatible with the
    history.
    """
```

### WorkflowReplayResults

```python { .api }
@dataclass(frozen=True)
class WorkflowReplayResults:
    """Results of replaying multiple workflows."""

    replay_failures: Mapping[str, Exception]
    """Replay failures, keyed by run ID."""
```

### Replay Testing Example

```python
import asyncio
from temporalio import client
from temporalio.worker import Replayer

# `MyWorkflow` is assumed to be defined elsewhere.

async def test_workflow_replay():
    # Create client to fetch workflow history
    client_instance = await client.Client.connect("localhost:7233")

    # Get workflow history
    workflow_handle = client_instance.get_workflow_handle("my-workflow-id")
    history = await workflow_handle.fetch_history()

    # Create replayer with current workflow implementation
    replayer = Replayer(
        workflows=[MyWorkflow],  # Current version of workflow
        namespace="my-namespace",
    )

    # Test replay compatibility (raises on replay failure by default)
    try:
        await replayer.replay_workflow(history)
        print("Replay successful - workflow is compatible")
        return True
    except Exception as e:
        print(f"Replay failed - workflow incompatible: {e}")
        return False

async def test_multiple_workflows():
    # Create client and replayer
    client_instance = await client.Client.connect("localhost:7233")
    replayer = Replayer(workflows=[MyWorkflow])

    # Create async iterator of histories
    async def history_generator():
        for workflow_id in ["wf-1", "wf-2", "wf-3"]:
            handle = client_instance.get_workflow_handle(workflow_id)
            history = await handle.fetch_history()
            yield history

    # Replay all workflows
    results = await replayer.replay_workflows(
        history_generator(),
        raise_on_replay_failure=False,  # Don't raise on first failure
    )

    if results.replay_failures:
        print(f"Replay failures: {len(results.replay_failures)}")
        for run_id, error in results.replay_failures.items():
            print(f"  {run_id}: {error}")
    else:
        print("All replays successful")

# Test with history from JSON file
async def test_replay_from_json():
    # Load history from tctl export or web UI download
    with open("workflow_history.json", "r") as f:
        history_json = f.read()

    # Arguments are the workflow ID and the JSON history
    history = client.WorkflowHistory.from_json("my-workflow-id", history_json)

    replayer = Replayer(workflows=[MyWorkflow])
    # Disable raising so the failure can be inspected on the result
    result = await replayer.replay_workflow(history, raise_on_replay_failure=False)

    return result.replay_failure is None
```

## Advanced Worker Features

### Shared State Management

Cross-process synchronization for non-async activities:

```python { .api }
class SharedStateManager:
    """Used for obtaining cross-process friendly synchronization primitives.

    This is required for non-async activities where the activity_executor
    is not a ThreadPoolExecutor. Reuse of these across workers is encouraged.
    """
```

### Shared Heartbeat Sender

Coordinated heartbeat management:

```python { .api }
class SharedHeartbeatSender:
    """Shared heartbeat sender for activities.

    Used to coordinate heartbeat sending across multiple workers or processes.
    """
```

### Workflow Runners

#### WorkflowRunner

Base class for workflow execution:

```python { .api }
class WorkflowRunner:
    """Base class for workflow runners."""
```

#### SandboxedWorkflowRunner

Default sandboxed workflow execution:

```python { .api }
class SandboxedWorkflowRunner(WorkflowRunner):
    """Sandboxed workflow runner.

    Provides isolation and deterministic execution for workflows.
    """
```

#### UnsandboxedWorkflowRunner

Unsandboxed workflow execution for special cases:

```python { .api }
class UnsandboxedWorkflowRunner(WorkflowRunner):
    """Unsandboxed workflow runner.

    Allows workflows to opt-out of sandboxing restrictions.
    WARNING: Use with caution as this can break workflow determinism.
    """
```

### Workflow Instance Management

```python { .api }
class WorkflowInstance:
    """Workflow instance representation."""

class WorkflowInstanceDetails:
    """Detailed information about a workflow instance."""
```

### Build ID Management

```python { .api }
def load_default_build_id(*, memoize: bool = True) -> str:
    """Load the default worker build ID.

    The worker build ID is a unique hash representing the entire set of code
    including Temporal code and external code. The default here is currently
    implemented by walking loaded modules and hashing their bytecode into a
    common hash.

    Args:
        memoize: If true, the default, this will cache to a global variable to
            keep from having to run again on successive calls.

    Returns:
        Unique identifier representing the set of running code.
    """
```
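The idea of hashing module bytecode into a single identifier can be illustrated with the standard library alone. This is a minimal sketch of the technique described in the docstring, not the SDK's implementation: `sketch_build_id` and `_hash_code` are hypothetical names, the function hashes only the modules it is given instead of walking all loaded modules, and the choice of SHA-256 is an assumption.

```python
import hashlib
import importlib.util
import types
from typing import Dict, Iterable, Tuple

# Cache of previously computed IDs, keyed by the sorted module names
_CACHE: Dict[Tuple[str, ...], str] = {}


def _hash_code(digest, code: types.CodeType) -> None:
    """Feed a code object's bytecode, including nested code objects, into the digest."""
    digest.update(code.co_code)
    for const in code.co_consts:
        if isinstance(const, types.CodeType):
            _hash_code(digest, const)


def sketch_build_id(module_names: Iterable[str], *, memoize: bool = True) -> str:
    """Hash the bytecode of the named modules into one hex identifier."""
    names = tuple(sorted(module_names))
    if memoize and names in _CACHE:
        return _CACHE[names]
    digest = hashlib.sha256()
    for name in names:
        spec = importlib.util.find_spec(name)
        loader = getattr(spec, "loader", None)
        get_code = getattr(loader, "get_code", None)
        code = get_code(name) if get_code else None
        if code is not None:  # built-in and extension modules have no bytecode
            digest.update(name.encode())
            _hash_code(digest, code)
    result = digest.hexdigest()
    if memoize:
        _CACHE[names] = result
    return result


build_id = sketch_build_id(["json", "textwrap"])
print(build_id)
```

Sorting the module names makes the result independent of argument order, so the same set of code yields the same identifier.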

### Advanced Configuration Example

```python
import asyncio
import concurrent.futures
import multiprocessing
from datetime import timedelta

from temporalio import client, worker
from temporalio.worker import (
    PollerBehaviorAutoscaling,
    SharedStateManager,
    UnsandboxedWorkflowRunner,
    WorkerTuner,
)
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner

# MyWorkflow, MyOtherWorkflow, my_activity, my_other_activity,
# MyLoggingInterceptor, and MyMetricsInterceptor are assumed to be defined elsewhere.

async def create_advanced_worker():
    client_instance = await client.Client.connect("localhost:7233")

    # Custom executors
    activity_executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=50,
        thread_name_prefix="activity-",
    )

    workflow_executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=20,
        thread_name_prefix="workflow-",
    )

    # Resource-based tuning
    tuner = WorkerTuner.create_resource_based(
        target_memory_usage=0.75,
        target_cpu_usage=0.85,
    )

    # Custom error handler
    async def handle_fatal_error(error: BaseException) -> None:
        print(f"Worker fatal error: {error}")
        # Could send alerts, write to log files, etc.

    # Advanced worker configuration
    worker_instance = worker.Worker(
        client_instance,
        task_queue="advanced-task-queue",
        workflows=[MyWorkflow, MyOtherWorkflow],
        activities=[my_activity, my_other_activity],
        activity_executor=activity_executor,
        workflow_task_executor=workflow_executor,
        workflow_runner=SandboxedWorkflowRunner(),
        unsandboxed_workflow_runner=UnsandboxedWorkflowRunner(),
        tuner=tuner,
        max_cached_workflows=2000,
        workflow_task_poller_behavior=PollerBehaviorAutoscaling(
            minimum=3,
            maximum=30,
            initial=10,
        ),
        activity_task_poller_behavior=PollerBehaviorAutoscaling(
            minimum=5,
            maximum=50,
            initial=15,
        ),
        sticky_queue_schedule_to_start_timeout=timedelta(seconds=30),
        max_heartbeat_throttle_interval=timedelta(seconds=120),
        default_heartbeat_throttle_interval=timedelta(seconds=45),
        max_activities_per_second=100.0,
        graceful_shutdown_timeout=timedelta(seconds=30),
        # Only required for non-async activities when activity_executor is not
        # a ThreadPoolExecutor; shown here for completeness.
        shared_state_manager=SharedStateManager.create_from_multiprocessing(
            multiprocessing.Manager()
        ),
        debug_mode=False,
        disable_eager_activity_execution=False,
        on_fatal_error=handle_fatal_error,
        use_worker_versioning=True,
        build_id="my-app-v1.2.3-abc123",
        interceptors=[MyLoggingInterceptor(), MyMetricsInterceptor()],
    )

    return worker_instance

# Usage
async def main():
    worker_instance = await create_advanced_worker()

    try:
        print("Starting advanced worker...")
        await worker_instance.run()
    except KeyboardInterrupt:
        print("Shutting down worker...")
        await worker_instance.shutdown()
    finally:
        print("Worker shutdown complete")

if __name__ == "__main__":
    asyncio.run(main())
```

The worker module covers the full lifecycle of workflow and activity execution in Temporal applications: basic worker setup, slot tuning, interceptors, and replay testing. Together these give fine-grained control over resource usage and execution behavior.