or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

distribution-scheduling.mdhook-specifications.mdindex.mdloop-on-fail.mdplugin-configuration.mdsession-management.mdworker-detection.md

session-management.mddocs/

0

# Session Management

1

2

Core session management for coordinating distributed test execution, including worker lifecycle management and result aggregation.

3

4

## Capabilities

5

6

### Distributed Session

7

8

The main class that coordinates distributed test execution across multiple worker processes.

9

10

```python { .api }

11

class DSession:

12

"""

13

A pytest plugin which runs a distributed test session.

14

15

Creates and manages worker nodes, coordinates test distribution,

16

and aggregates results from all workers.

17

"""

18

19

def __init__(self, config: pytest.Config) -> None:

20

"""

21

Initialize distributed session.

22

23

Args:

24

config: pytest configuration object

25

"""

26

27

@property

28

def session_finished(self) -> bool:

29

"""

30

Return True if the distributed session has finished.

31

32

This means all nodes have executed all test items and

33

the session is ready to shut down.

34

35

Returns:

36

bool: True if session is complete

37

"""

38

39

def report_line(self, line: str) -> None:

40

"""

41

Report a line of output if verbose mode is enabled.

42

43

Args:

44

line: Output line to report

45

"""

46

```

47

48

### Session Lifecycle Hooks

49

50

Hooks that manage the distributed session lifecycle.

51

52

```python { .api }

53

def pytest_sessionstart(self, session: pytest.Session) -> None:

54

"""

55

Hook called at session start to create and start worker nodes.

56

57

Sets up the NodeManager, creates worker processes, and

58

initializes the test distribution system.

59

60

Args:

61

session: pytest session object

62

"""

63

64

def pytest_sessionfinish(self) -> None:

65

"""

66

Hook called at session end to clean up worker nodes.

67

68

Shuts down all worker processes and cleans up resources.

69

"""

70

71

def pytest_runtestloop(self) -> bool:

72

"""

73

Main test execution loop for distributed testing.

74

75

Coordinates test distribution, worker communication,

76

and result aggregation until all tests are complete.

77

78

Returns:

79

bool: True to indicate loop handled execution

80

"""

81

```

82

83

### Node Management

84

85

Management of worker node lifecycle and communication.

86

87

```python { .api }

88

class NodeManager:

89

"""Manages worker node creation, setup, and teardown."""

90

91

def __init__(

92

self,

93

config: pytest.Config,

94

specs: Sequence[execnet.XSpec | str] | None = None,

95

defaultchdir: str = "pyexecnetcache",

96

) -> None:

97

"""

98

Initialize node manager.

99

100

Args:

101

config: pytest configuration

102

specs: execution specifications for workers

103

defaultchdir: default working directory for workers

104

"""

105

106

def setup_nodes(

107

self,

108

putevent: Callable[[tuple[str, dict[str, Any]]], None],

109

) -> list[WorkerController]:

110

"""

111

Set up all worker nodes.

112

113

Args:

114

putevent: callback for worker events

115

116

Returns:

117

List of worker controllers

118

"""

119

120

def setup_node(

121

self,

122

spec: execnet.XSpec,

123

putevent: Callable[[tuple[str, dict[str, Any]]], None],

124

) -> WorkerController:

125

"""

126

Set up a single worker node.

127

128

Args:

129

spec: execution specification for worker

130

putevent: callback for worker events

131

132

Returns:

133

Worker controller instance

134

"""

135

136

def teardown_nodes(self) -> None:

137

"""Shut down all worker nodes and clean up resources."""

138

139

def rsync_roots(self, gateway: execnet.Gateway) -> None:

140

"""

141

Rsync configured directories to worker node.

142

143

Args:

144

gateway: execnet gateway to worker

145

"""

146

```

147

148

### Worker Controller

149

150

Controls individual worker processes and their communication.

151

152

```python { .api }

153

class WorkerController:

154

"""Controls a single worker process."""

155

156

def send_runtest_some(self, indices: Sequence[int]) -> None:

157

"""

158

Send test items to worker for execution.

159

160

Args:

161

indices: indices of test items to run

162

"""

163

164

def send_runtest_all(self) -> None:

165

"""Send all collected tests to worker for execution."""

166

167

def shutdown(self) -> None:

168

"""Shut down the worker process."""

169

170

def pytest_runtest_protocol_complete(

171

self,

172

item_index: int,

173

duration: float,

174

) -> None:

175

"""

176

Handle completion of test item execution.

177

178

Args:

179

item_index: index of completed test

180

duration: execution duration in seconds

181

"""

182

```

183

184

### Event Processing

185

186

Worker event processing and coordination.

187

188

```python { .api }

189

def worker_workerready(self, node: WorkerController) -> None:

190

"""

191

Handle worker ready event.

192

193

Args:

194

node: worker that became ready

195

"""

196

197

def worker_workerfinished(self, node: WorkerController) -> None:

198

"""

199

Handle worker finished event.

200

201

Args:

202

node: worker that finished

203

"""

204

205

def worker_runtest_logreport(

206

self,

207

node: WorkerController,

208

report: pytest.TestReport,

209

) -> None:

210

"""

211

Handle test report from worker.

212

213

Args:

214

node: worker that sent report

215

report: test execution report

216

"""

217

218

def worker_collectreport(

219

self,

220

node: WorkerController,

221

report: pytest.CollectReport,

222

) -> None:

223

"""

224

Handle collection report from worker.

225

226

Args:

227

node: worker that sent report

228

report: collection report

229

"""

230

231

def worker_logstart(

232

self,

233

node: WorkerController,

234

nodeid: str,

235

) -> None:

236

"""

237

Handle test start logging from worker.

238

239

Args:

240

node: worker that started test

241

nodeid: test node identifier

242

"""

243

```

244

245

### Configuration Utilities

246

247

Utilities for parsing and managing worker configuration.

248

249

```python { .api }

250

def parse_tx_spec_config(config: pytest.Config) -> list[str]:

251

"""

252

Parse tx specification configuration into list of specs.

253

254

Handles multiplication syntax (e.g., "4*popen" -> ["popen", "popen", "popen", "popen"])

255

256

Args:

257

config: pytest configuration object

258

259

Returns:

260

List of execution specifications

261

262

Raises:

263

pytest.UsageError: If no tx specs are provided

264

"""

265

266

def get_default_max_worker_restart(config: pytest.Config) -> int:

267

"""

268

Get the default maximum worker restart count.

269

270

Args:

271

config: pytest configuration object

272

273

Returns:

274

Maximum number of worker restarts allowed

275

"""

276

```

277

278

## Usage Examples

279

280

### Custom Session Event Handler

281

282

```python

283

# In conftest.py

284

import pytest

285

286

class CustomSessionHandler:

287

def pytest_sessionstart(self, session):

288

if hasattr(session.config, 'workerinput'):

289

# Worker session start

290

self.setup_worker_session()

291

else:

292

# Controller session start

293

self.setup_controller_session()

294

295

def pytest_sessionfinish(self, session):

296

if hasattr(session.config, 'workerinput'):

297

# Worker session cleanup

298

self.cleanup_worker_session()

299

else:

300

# Controller session cleanup

301

self.cleanup_controller_session()

302

303

def pytest_configure(config):

304

if config.pluginmanager.hasplugin("dsession"):

305

# xdist is active, register our handler

306

config.pluginmanager.register(CustomSessionHandler())

307

```

308

309

### Worker Event Monitoring

310

311

```python

312

import pytest

313

from xdist.dsession import DSession

314

315

class WorkerMonitor:

316

def __init__(self):

317

self.worker_stats = {}

318

319

def worker_workerready(self, node):

320

worker_id = node.workerinput['workerid']

321

self.worker_stats[worker_id] = {

322

'ready_time': time.time(),

323

'tests_run': 0,

324

'failures': 0

325

}

326

print(f"Worker {worker_id} is ready")

327

328

def worker_runtest_logreport(self, node, report):

329

worker_id = node.workerinput['workerid']

330

if worker_id in self.worker_stats:

331

self.worker_stats[worker_id]['tests_run'] += 1

332

if report.failed:

333

self.worker_stats[worker_id]['failures'] += 1

334

335

def worker_workerfinished(self, node):

336

worker_id = node.workerinput['workerid']

337

if worker_id in self.worker_stats:

338

stats = self.worker_stats[worker_id]

339

print(f"Worker {worker_id} finished: "

340

f"{stats['tests_run']} tests, "

341

f"{stats['failures']} failures")

342

343

def pytest_configure(config):

344

if config.pluginmanager.hasplugin("dsession"):

345

monitor = WorkerMonitor()

346

config.pluginmanager.register(monitor)

347

```

348

349

### Custom Node Setup

350

351

```python

352

import pytest

353

from xdist.nodemanage import NodeManager

354

355

def pytest_xdist_setupnodes(config, specs):

356

"""Hook called before nodes are set up."""

357

print(f"Setting up {len(specs)} worker nodes")

358

359

# Custom pre-setup logic

360

for i, spec in enumerate(specs):

361

print(f"Worker {i} spec: {spec}")

362

363

def pytest_configure_node(node):

364

"""Hook called to configure each worker node."""

365

worker_id = node.workerinput['workerid']

366

367

# Custom worker configuration

368

node.config.option.custom_worker_id = worker_id

369

print(f"Configured worker {worker_id}")

370

371

def pytest_testnodeready(node):

372

"""Hook called when worker is ready."""

373

worker_id = node.workerinput['workerid']

374

print(f"Worker {worker_id} is ready for tests")

375

376

def pytest_testnodedown(node, error):

377

"""Hook called when worker goes down."""

378

worker_id = node.workerinput.get('workerid', 'unknown')

379

if error:

380

print(f"Worker {worker_id} crashed: {error}")

381

else:

382

print(f"Worker {worker_id} shut down cleanly")

383

```

384

385

### Session State Management

386

387

```python

388

import pytest

389

from collections import defaultdict

390

391

class DistributedSessionState:

392

"""Manage state across distributed session."""

393

394

def __init__(self):

395

self.controller_state = {}

396

self.worker_results = defaultdict(list)

397

self.session_start_time = None

398

399

def pytest_sessionstart(self, session):

400

self.session_start_time = time.time()

401

402

if hasattr(session.config, 'workerinput'):

403

# Worker session

404

worker_id = session.config.workerinput['workerid']

405

self.setup_worker_state(worker_id)

406

else:

407

# Controller session

408

self.setup_controller_state()

409

410

def setup_controller_state(self):

411

"""Set up controller-specific state."""

412

self.controller_state['total_workers'] = len(

413

self.config.getoption('tx') or []

414

)

415

self.controller_state['active_workers'] = set()

416

417

def setup_worker_state(self, worker_id):

418

"""Set up worker-specific state."""

419

# Worker state is local to each process

420

pass

421

422

def worker_workerready(self, node):

423

"""Track worker readiness in controller."""

424

worker_id = node.workerinput['workerid']

425

self.controller_state['active_workers'].add(worker_id)

426

427

def pytest_runtest_logreport(self, report):

428

"""Collect test results."""

429

if hasattr(self.config, 'workerinput'):

430

# In worker - collect local results

431

worker_id = self.config.workerinput['workerid']

432

self.worker_results[worker_id].append(report)

433

434

# Usage

435

state_manager = DistributedSessionState()

436

437

def pytest_configure(config):

438

state_manager.config = config

439

config.pluginmanager.register(state_manager)

440

```