0
# Session Management
1
2
Core session management for coordinating distributed test execution, including worker lifecycle management and result aggregation.
3
4
## Capabilities
5
6
### Distributed Session
7
8
The main class that coordinates distributed test execution across multiple worker processes.
9
10
```python { .api }
11
class DSession:
12
"""
13
A pytest plugin which runs a distributed test session.
14
15
Creates and manages worker nodes, coordinates test distribution,
16
and aggregates results from all workers.
17
"""
18
19
def __init__(self, config: pytest.Config) -> None:
20
"""
21
Initialize distributed session.
22
23
Args:
24
config: pytest configuration object
25
"""
26
27
@property
28
def session_finished(self) -> bool:
29
"""
30
Return True if the distributed session has finished.
31
32
This means all nodes have executed all test items and
33
the session is ready to shut down.
34
35
Returns:
36
bool: True if session is complete
37
"""
38
39
def report_line(self, line: str) -> None:
40
"""
41
Report a line of output if verbose mode is enabled.
42
43
Args:
44
line: Output line to report
45
"""
46
```
47
48
### Session Lifecycle Hooks
49
50
Hooks that manage the distributed session lifecycle.
51
52
```python { .api }
53
def pytest_sessionstart(self, session: pytest.Session) -> None:
54
"""
55
Hook called at session start to create and start worker nodes.
56
57
Sets up the NodeManager, creates worker processes, and
58
initializes the test distribution system.
59
60
Args:
61
session: pytest session object
62
"""
63
64
def pytest_sessionfinish(self) -> None:
65
"""
66
Hook called at session end to clean up worker nodes.
67
68
Shuts down all worker processes and cleans up resources.
69
"""
70
71
def pytest_runtestloop(self) -> bool:
72
"""
73
Main test execution loop for distributed testing.
74
75
Coordinates test distribution, worker communication,
76
and result aggregation until all tests are complete.
77
78
Returns:
79
bool: True to indicate loop handled execution
80
"""
81
```
82
83
### Node Management
84
85
Management of worker node lifecycle and communication.
86
87
```python { .api }
88
class NodeManager:
89
"""Manages worker node creation, setup, and teardown."""
90
91
def __init__(
92
self,
93
config: pytest.Config,
94
specs: Sequence[execnet.XSpec | str] | None = None,
95
defaultchdir: str = "pyexecnetcache",
96
) -> None:
97
"""
98
Initialize node manager.
99
100
Args:
101
config: pytest configuration
102
specs: execution specifications for workers
103
defaultchdir: default working directory for workers
104
"""
105
106
def setup_nodes(
107
self,
108
putevent: Callable[[tuple[str, dict[str, Any]]], None],
109
) -> list[WorkerController]:
110
"""
111
Set up all worker nodes.
112
113
Args:
114
putevent: callback for worker events
115
116
Returns:
117
List of worker controllers
118
"""
119
120
def setup_node(
121
self,
122
spec: execnet.XSpec,
123
putevent: Callable[[tuple[str, dict[str, Any]]], None],
124
) -> WorkerController:
125
"""
126
Set up a single worker node.
127
128
Args:
129
spec: execution specification for worker
130
putevent: callback for worker events
131
132
Returns:
133
Worker controller instance
134
"""
135
136
def teardown_nodes(self) -> None:
137
"""Shut down all worker nodes and clean up resources."""
138
139
def rsync_roots(self, gateway: execnet.Gateway) -> None:
140
"""
141
Rsync configured directories to worker node.
142
143
Args:
144
gateway: execnet gateway to worker
145
"""
146
```
147
148
### Worker Controller
149
150
Controls individual worker processes and their communication.
151
152
```python { .api }
153
class WorkerController:
154
"""Controls a single worker process."""
155
156
def send_runtest_some(self, indices: Sequence[int]) -> None:
157
"""
158
Send test items to worker for execution.
159
160
Args:
161
indices: indices of test items to run
162
"""
163
164
def send_runtest_all(self) -> None:
165
"""Send all collected tests to worker for execution."""
166
167
def shutdown(self) -> None:
168
"""Shut down the worker process."""
169
170
def pytest_runtest_protocol_complete(
171
self,
172
item_index: int,
173
duration: float,
174
) -> None:
175
"""
176
Handle completion of test item execution.
177
178
Args:
179
item_index: index of completed test
180
duration: execution duration in seconds
181
"""
182
```
183
184
### Event Processing
185
186
Worker event processing and coordination.
187
188
```python { .api }
189
def worker_workerready(self, node: WorkerController) -> None:
190
"""
191
Handle worker ready event.
192
193
Args:
194
node: worker that became ready
195
"""
196
197
def worker_workerfinished(self, node: WorkerController) -> None:
198
"""
199
Handle worker finished event.
200
201
Args:
202
node: worker that finished
203
"""
204
205
def worker_runtest_logreport(
206
self,
207
node: WorkerController,
208
report: pytest.TestReport,
209
) -> None:
210
"""
211
Handle test report from worker.
212
213
Args:
214
node: worker that sent report
215
report: test execution report
216
"""
217
218
def worker_collectreport(
219
self,
220
node: WorkerController,
221
report: pytest.CollectReport,
222
) -> None:
223
"""
224
Handle collection report from worker.
225
226
Args:
227
node: worker that sent report
228
report: collection report
229
"""
230
231
def worker_logstart(
232
self,
233
node: WorkerController,
234
nodeid: str,
235
) -> None:
236
"""
237
Handle test start logging from worker.
238
239
Args:
240
node: worker that started test
241
nodeid: test node identifier
242
"""
243
```
244
245
### Configuration Utilities
246
247
Utilities for parsing and managing worker configuration.
248
249
```python { .api }
250
def parse_tx_spec_config(config: pytest.Config) -> list[str]:
251
"""
252
Parse tx specification configuration into list of specs.
253
254
Handles multiplication syntax (e.g., "4*popen" -> ["popen", "popen", "popen", "popen"])
255
256
Args:
257
config: pytest configuration object
258
259
Returns:
260
List of execution specifications
261
262
Raises:
263
pytest.UsageError: If no tx specs are provided
264
"""
265
266
def get_default_max_worker_restart(config: pytest.Config) -> int:
267
"""
268
Get the default maximum worker restart count.
269
270
Args:
271
config: pytest configuration object
272
273
Returns:
274
Maximum number of worker restarts allowed
275
"""
276
```
277
278
## Usage Examples
279
280
### Custom Session Event Handler
281
282
```python
283
# In conftest.py
284
import pytest
285
286
class CustomSessionHandler:
287
def pytest_sessionstart(self, session):
288
if hasattr(session.config, 'workerinput'):
289
# Worker session start
290
self.setup_worker_session()
291
else:
292
# Controller session start
293
self.setup_controller_session()
294
295
def pytest_sessionfinish(self, session):
296
if hasattr(session.config, 'workerinput'):
297
# Worker session cleanup
298
self.cleanup_worker_session()
299
else:
300
# Controller session cleanup
301
self.cleanup_controller_session()
302
303
def pytest_configure(config):
304
if config.pluginmanager.hasplugin("dsession"):
305
# xdist is active, register our handler
306
config.pluginmanager.register(CustomSessionHandler())
307
```
308
309
### Worker Event Monitoring
310
311
```python
312
import pytest
313
from xdist.dsession import DSession
314
315
class WorkerMonitor:
316
def __init__(self):
317
self.worker_stats = {}
318
319
def worker_workerready(self, node):
320
worker_id = node.workerinput['workerid']
321
self.worker_stats[worker_id] = {
322
'ready_time': time.time(),
323
'tests_run': 0,
324
'failures': 0
325
}
326
print(f"Worker {worker_id} is ready")
327
328
def worker_runtest_logreport(self, node, report):
329
worker_id = node.workerinput['workerid']
330
if worker_id in self.worker_stats:
331
self.worker_stats[worker_id]['tests_run'] += 1
332
if report.failed:
333
self.worker_stats[worker_id]['failures'] += 1
334
335
def worker_workerfinished(self, node):
336
worker_id = node.workerinput['workerid']
337
if worker_id in self.worker_stats:
338
stats = self.worker_stats[worker_id]
339
print(f"Worker {worker_id} finished: "
340
f"{stats['tests_run']} tests, "
341
f"{stats['failures']} failures")
342
343
def pytest_configure(config):
344
if config.pluginmanager.hasplugin("dsession"):
345
monitor = WorkerMonitor()
346
config.pluginmanager.register(monitor)
347
```
348
349
### Custom Node Setup
350
351
```python
352
import pytest
353
from xdist.nodemanage import NodeManager
354
355
def pytest_xdist_setupnodes(config, specs):
356
"""Hook called before nodes are set up."""
357
print(f"Setting up {len(specs)} worker nodes")
358
359
# Custom pre-setup logic
360
for i, spec in enumerate(specs):
361
print(f"Worker {i} spec: {spec}")
362
363
def pytest_configure_node(node):
364
"""Hook called to configure each worker node."""
365
worker_id = node.workerinput['workerid']
366
367
# Custom worker configuration
368
node.config.option.custom_worker_id = worker_id
369
print(f"Configured worker {worker_id}")
370
371
def pytest_testnodeready(node):
372
"""Hook called when worker is ready."""
373
worker_id = node.workerinput['workerid']
374
print(f"Worker {worker_id} is ready for tests")
375
376
def pytest_testnodedown(node, error):
377
"""Hook called when worker goes down."""
378
worker_id = node.workerinput.get('workerid', 'unknown')
379
if error:
380
print(f"Worker {worker_id} crashed: {error}")
381
else:
382
print(f"Worker {worker_id} shut down cleanly")
383
```
384
385
### Session State Management
386
387
```python
388
import pytest
389
from collections import defaultdict
390
391
class DistributedSessionState:
392
"""Manage state across distributed session."""
393
394
def __init__(self):
395
self.controller_state = {}
396
self.worker_results = defaultdict(list)
397
self.session_start_time = None
398
399
def pytest_sessionstart(self, session):
400
self.session_start_time = time.time()
401
402
if hasattr(session.config, 'workerinput'):
403
# Worker session
404
worker_id = session.config.workerinput['workerid']
405
self.setup_worker_state(worker_id)
406
else:
407
# Controller session
408
self.setup_controller_state()
409
410
def setup_controller_state(self):
411
"""Set up controller-specific state."""
412
self.controller_state['total_workers'] = len(
413
self.config.getoption('tx') or []
414
)
415
self.controller_state['active_workers'] = set()
416
417
def setup_worker_state(self, worker_id):
418
"""Set up worker-specific state."""
419
# Worker state is local to each process
420
pass
421
422
def worker_workerready(self, node):
423
"""Track worker readiness in controller."""
424
worker_id = node.workerinput['workerid']
425
self.controller_state['active_workers'].add(worker_id)
426
427
def pytest_runtest_logreport(self, report):
428
"""Collect test results."""
429
if hasattr(self.config, 'workerinput'):
430
# In worker - collect local results
431
worker_id = self.config.workerinput['workerid']
432
self.worker_results[worker_id].append(report)
433
434
# Usage
435
state_manager = DistributedSessionState()
436
437
def pytest_configure(config):
438
state_manager.config = config
439
config.pluginmanager.register(state_manager)
440
```