# Hook Specifications

Custom pytest hooks specific to xdist for plugin authors to extend and customize distributed testing behavior.

## Capabilities

### Node Setup and Lifecycle Hooks

Hooks for managing worker node creation, configuration, and lifecycle.

```python { .api }
def pytest_xdist_setupnodes(
    config: pytest.Config,
    specs: Sequence[execnet.XSpec],
) -> None:
    """
    Called before any remote node is set up.

    This hook allows plugins to perform global setup before
    workers are created and started.

    Args:
        config: pytest configuration object
        specs: list of execution specifications for workers
    """

def pytest_xdist_newgateway(gateway: execnet.Gateway) -> None:
    """
    Called on new raw gateway creation.

    Allows customization of newly created execnet gateways
    before they're used for worker communication.

    Args:
        gateway: newly created execnet gateway
    """

def pytest_configure_node(node: WorkerController) -> None:
    """
    Configure node information before it gets instantiated.

    Called for each worker before it starts executing tests,
    allowing per-worker customization.

    Args:
        node: worker controller being configured
    """

def pytest_testnodeready(node: WorkerController) -> None:
    """
    Test Node is ready to operate.

    Called when a worker has finished initialization and
    is ready to receive and execute tests.

    Args:
        node: worker controller that became ready
    """

def pytest_testnodedown(node: WorkerController, error: object | None) -> None:
    """
    Test Node is down.

    Called when a worker shuts down, either cleanly or due to an error.

    Args:
        node: worker controller that went down
        error: exception that caused shutdown, or None for clean shutdown
    """
```

### Collection and Scheduling Hooks

Hooks for customizing test collection and distribution scheduling.

```python { .api }
def pytest_xdist_node_collection_finished(
    node: WorkerController,
    ids: Sequence[str],
) -> None:
    """
    Called by the controller node when a worker node finishes collecting.

    Allows processing of collected test IDs before scheduling begins.

    Args:
        node: worker that finished collection
        ids: list of collected test item IDs
    """

def pytest_xdist_make_scheduler(
    config: pytest.Config,
    log: Producer,
) -> Scheduling | None:
    """
    Return a node scheduler implementation.

    Hook for providing custom test distribution schedulers.
    Return None to use the default scheduler for the selected distribution mode.

    Args:
        config: pytest configuration object
        log: logging producer for scheduler messages

    Returns:
        Custom scheduler instance or None for default
    """

def pytest_xdist_auto_num_workers(config: pytest.Config) -> int:
    """
    Return the number of workers to spawn when --numprocesses=auto is given.

    Hook for customizing automatic worker count detection.

    Args:
        config: pytest configuration object

    Returns:
        Number of workers to spawn

    Raises:
        NotImplementedError: If not implemented by any plugin
    """
```

### Error Handling and Recovery Hooks

Hooks for handling worker crashes and test failures.

```python { .api }
def pytest_handlecrashitem(
    crashitem: str,
    report: pytest.TestReport,
    sched: Scheduling,
) -> None:
    """
    Handle a crashitem, modifying the report if necessary.

    Called when a test item causes a worker crash. The scheduler
    is provided to allow rescheduling the test if desired.

    Args:
        crashitem: identifier of test that caused crash
        report: test report for the crashed test
        sched: scheduler instance for potential rescheduling

    Example:
        def pytest_handlecrashitem(crashitem, report, sched):
            if should_rerun(crashitem):
                sched.mark_test_pending(crashitem)
                report.outcome = "rerun"
    """
```

### Remote Module Hooks (Advanced)

Hook for customizing remote worker module loading.

```python { .api }
def pytest_xdist_getremotemodule() -> Any:
    """
    Called when creating remote node.

    Hook for providing custom remote worker module implementations.
    Advanced usage for specialized worker behaviors.

    Returns:
        Custom remote module or None for default
    """
```

### Deprecated Rsync Hooks

Legacy hooks for rsync functionality (deprecated; will be removed in pytest-xdist 4.0).

```python { .api }
def pytest_xdist_rsyncstart(
    source: str | os.PathLike[str],
    gateways: Sequence[execnet.Gateway],
) -> None:
    """
    Called before rsyncing a directory to remote gateways takes place.

    DEPRECATED: rsync feature is deprecated and will be removed in pytest-xdist 4.0

    Args:
        source: source directory being rsynced
        gateways: target gateways for rsync
    """

def pytest_xdist_rsyncfinish(
    source: str | os.PathLike[str],
    gateways: Sequence[execnet.Gateway],
) -> None:
    """
    Called after rsyncing a directory to remote gateways takes place.

    DEPRECATED: rsync feature is deprecated and will be removed in pytest-xdist 4.0

    Args:
        source: source directory that was rsynced
        gateways: target gateways for rsync
    """
```

## Usage Examples

### Custom Worker Count Detection

```python
# In conftest.py
import pytest
import psutil

def pytest_xdist_auto_num_workers(config):
    """Custom worker count based on available resources."""

    # Check available memory
    memory_gb = psutil.virtual_memory().total / (1024**3)

    # Check if running resource-intensive tests
    if config.getoption("--memory-intensive"):
        # Use fewer workers for memory-heavy tests
        if memory_gb < 8:
            return 2
        elif memory_gb < 16:
            return 4
        else:
            return 8

    # Check CPU characteristics
    cpu_count = psutil.cpu_count(logical=False)  # Physical cores only

    # Custom logic for different scenarios
    if config.getoption("--io-bound"):
        # I/O bound tests can use more workers
        return cpu_count * 2
    elif config.getoption("--cpu-bound"):
        # CPU bound tests should match core count
        return cpu_count

    # Default to letting pytest-xdist decide
    return None
```

### Custom Scheduler Implementation

```python
# In conftest.py
import pytest
from xdist.scheduler.protocol import Scheduling
from xdist.workermanage import WorkerController
from typing import Sequence

class PriorityScheduler:
    """Custom scheduler that prioritizes certain tests."""

    def __init__(self, config, log):
        self.config = config
        self.log = log
        self._nodes = []
        self._pending_priority = []
        self._pending_normal = []
        self._collection_complete = False

    @property
    def nodes(self) -> list[WorkerController]:
        return self._nodes

    @property
    def collection_is_completed(self) -> bool:
        return self._collection_complete

    @property
    def tests_finished(self) -> bool:
        return not (self._pending_priority or self._pending_normal)

    @property
    def has_pending(self) -> bool:
        return bool(self._pending_priority or self._pending_normal)

    def add_node_collection(self, node: WorkerController, collection: Sequence[str]) -> None:
        # Separate high-priority tests from the rest of the collection
        for test_id in collection:
            if "priority" in test_id or "critical" in test_id:
                self._pending_priority.append(test_id)
            else:
                self._pending_normal.append(test_id)
        self._collection_complete = True

    def schedule(self) -> None:
        """Schedule priority tests first, then normal tests."""
        for node in self._nodes:
            if node.is_ready():
                # Schedule priority tests first
                if self._pending_priority:
                    test = self._pending_priority.pop(0)
                    node.send_runtest(test)
                elif self._pending_normal:
                    test = self._pending_normal.pop(0)
                    node.send_runtest(test)

    # Implement other required methods...

def pytest_xdist_make_scheduler(config, log):
    """Use custom priority scheduler if requested."""
    if config.getoption("--priority-scheduler"):
        return PriorityScheduler(config, log)
    return None
```

### Node Lifecycle Management

```python
# In conftest.py
import pytest
import time

class NodeLifecycleManager:
    """Monitor and manage worker node lifecycle."""

    def __init__(self):
        self.node_stats = {}
        self.setup_start_time = None

    def pytest_xdist_setupnodes(self, config, specs):
        """Called before node setup begins."""
        self.setup_start_time = time.time()
        print(f"Setting up {len(specs)} worker nodes...")

        # Pre-setup validation
        for i, spec in enumerate(specs):
            print(f"Worker {i}: {spec}")

    def pytest_configure_node(self, node):
        """Configure each worker node."""
        worker_id = node.workerinput['workerid']

        # Add custom configuration
        node.config.option.worker_start_time = time.time()

        # Track node
        self.node_stats[worker_id] = {
            'configured_at': time.time(),
            'ready_at': None,
            'finished_at': None,
            'tests_executed': 0,
            'errors': []
        }

        print(f"Configured worker {worker_id}")

    def pytest_testnodeready(self, node):
        """Track when nodes become ready."""
        worker_id = node.workerinput['workerid']
        self.node_stats[worker_id]['ready_at'] = time.time()

        setup_duration = time.time() - self.setup_start_time
        print(f"Worker {worker_id} ready after {setup_duration:.2f}s")

    def pytest_testnodedown(self, node, error):
        """Handle node shutdown."""
        worker_id = node.workerinput.get('workerid', 'unknown')

        if worker_id in self.node_stats:
            self.node_stats[worker_id]['finished_at'] = time.time()

            if error:
                self.node_stats[worker_id]['errors'].append(str(error))
                print(f"Worker {worker_id} crashed: {error}")
            else:
                print(f"Worker {worker_id} finished cleanly")

            self.print_node_summary(worker_id)

    def print_node_summary(self, worker_id):
        """Print summary statistics for a worker."""
        if worker_id not in self.node_stats:
            return

        stats = self.node_stats[worker_id]
        if stats['ready_at'] and stats['finished_at']:
            runtime = stats['finished_at'] - stats['ready_at']
            print(f"Worker {worker_id} runtime: {runtime:.2f}s, "
                  f"tests: {stats['tests_executed']}, "
                  f"errors: {len(stats['errors'])}")

def pytest_configure(config):
    if config.pluginmanager.hasplugin("dsession"):
        manager = NodeLifecycleManager()
        config.pluginmanager.register(manager)
```

### Crash Recovery Handler

```python
# In conftest.py
import pytest

class CrashRecoveryHandler:
    """Handle worker crashes and implement recovery strategies."""

    def __init__(self):
        self.crash_counts = {}
        self.max_retries = 3

    def pytest_handlecrashitem(self, crashitem, report, sched):
        """Handle crashed test items."""
        # Track crash frequency
        if crashitem not in self.crash_counts:
            self.crash_counts[crashitem] = 0

        self.crash_counts[crashitem] += 1

        # Retry logic
        if self.crash_counts[crashitem] <= self.max_retries:
            print(f"Retrying crashed test {crashitem} "
                  f"(attempt {self.crash_counts[crashitem]}/{self.max_retries})")

            # Reschedule the test
            sched.mark_test_pending(crashitem)
            report.outcome = "rerun"
        else:
            print(f"Test {crashitem} failed {self.max_retries} times, marking as crashed")
            report.outcome = "crashed"
            # Could also mark as skipped or failed depending on needs

def pytest_configure(config):
    if config.pluginmanager.hasplugin("dsession"):
        handler = CrashRecoveryHandler()
        config.pluginmanager.register(handler)
```

### Collection Processing

```python
# In conftest.py
import pytest
from collections import defaultdict

def pytest_xdist_node_collection_finished(node, ids):
    """Process collected test IDs."""
    worker_id = node.workerinput['workerid']

    # Analyze collected tests, grouped per module
    test_counts = defaultdict(int)
    for test_id in ids:
        if '::' in test_id:
            module = test_id.split('::')[0]
            test_counts[module] += 1

    print(f"Worker {worker_id} collected {len(ids)} tests:")
    for module, count in sorted(test_counts.items()):
        print(f"  {module}: {count} tests")

    # Could implement load balancing adjustments here
    # based on collection results
```