# Hook Specifications

Custom pytest hooks specific to xdist for plugin authors to extend and customize distributed testing behavior.

## Capabilities

### Node Setup and Lifecycle Hooks

Hooks for managing worker node creation, configuration, and lifecycle.

```python { .api }
def pytest_xdist_setupnodes(
    config: pytest.Config,
    specs: Sequence[execnet.XSpec]
) -> None:
    """
    Called before any remote node is set up.

    This hook allows plugins to perform global setup before
    workers are created and started.

    Args:
        config: pytest configuration object
        specs: list of execution specifications for workers
    """

def pytest_xdist_newgateway(gateway: execnet.Gateway) -> None:
    """
    Called on new raw gateway creation.

    Allows customization of newly created execnet gateways
    before they're used for worker communication.

    Args:
        gateway: newly created execnet gateway
    """

def pytest_configure_node(node: WorkerController) -> None:
    """
    Configure node information before it gets instantiated.

    Called for each worker before it starts executing tests,
    allowing per-worker customization.

    Args:
        node: worker controller being configured
    """

def pytest_testnodeready(node: WorkerController) -> None:
    """
    Test Node is ready to operate.

    Called when a worker has finished initialization and
    is ready to receive and execute tests.

    Args:
        node: worker controller that became ready
    """

def pytest_testnodedown(node: WorkerController, error: object | None) -> None:
    """
    Test Node is down.

    Called when a worker shuts down, either cleanly or due to an error.

    Args:
        node: worker controller that went down
        error: exception that caused shutdown, or None for clean shutdown
    """
```
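
As a minimal sketch of how two of these lifecycle hooks might be combined, the fragment below uses `pytest_configure_node` to attach a value to `node.workerinput` and `pytest_testnodedown` to record workers that went down with an error. The `run_label` key, the `RUN_LABEL` environment variable, and the `down_errors` list are hypothetical names chosen for illustration; they are not part of pytest-xdist.

```python
# In conftest.py (runs on the controller)
import os

down_errors = []  # hypothetical log of (worker id, error text) pairs


def pytest_configure_node(node):
    """Attach a value to the data that is handed to the worker."""
    # Keep values to simple builtin types so they can be sent to the worker
    node.workerinput["run_label"] = os.environ.get("RUN_LABEL", "local")


def pytest_testnodedown(node, error):
    """Remember which workers went down with an error."""
    if error is not None:
        worker_id = node.workerinput.get("workerid", "unknown")
        down_errors.append((worker_id, str(error)))
```

On the worker side, the attached value would then be readable as `config.workerinput["run_label"]`.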

### Collection and Scheduling Hooks

Hooks for customizing test collection and distribution scheduling.

```python { .api }
def pytest_xdist_node_collection_finished(
    node: WorkerController,
    ids: Sequence[str]
) -> None:
    """
    Called by the controller node when a worker node finishes collecting.

    Allows processing of collected test IDs before scheduling begins.

    Args:
        node: worker that finished collection
        ids: list of collected test item IDs
    """

def pytest_xdist_make_scheduler(
    config: pytest.Config,
    log: Producer
) -> Scheduling | None:
    """
    Return a node scheduler implementation.

    Hook for providing custom test distribution schedulers.
    Return None to use the default scheduler for the selected distribution mode.

    Args:
        config: pytest configuration object
        log: logging producer for scheduler messages

    Returns:
        Custom scheduler instance or None for default
    """

def pytest_xdist_auto_num_workers(config: pytest.Config) -> int:
    """
    Return the number of workers to spawn when --numprocesses=auto is given.

    Hook for customizing automatic worker count detection.

    Args:
        config: pytest configuration object

    Returns:
        Number of workers to spawn

    Raises:
        NotImplementedError: If not implemented by any plugin
    """
```
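
A small sketch of `pytest_xdist_auto_num_workers`, assuming (as the usage examples in this document do) that returning `None` leaves the decision to another implementation or to pytest-xdist's default. The `XDIST_WORKERS` environment variable is a hypothetical name used only for this illustration.

```python
# In conftest.py
import os


def pytest_xdist_auto_num_workers(config):
    """Let an environment variable override the automatic worker count."""
    raw = os.environ.get("XDIST_WORKERS")  # hypothetical variable name
    if raw is None or not raw.strip().isdigit():
        return None  # no usable override: defer to the default detection
    # Never ask for fewer than one worker
    return max(1, int(raw))
```

Keeping the override outside the command line lets a CI job pin the worker count while `-n auto` stays unchanged in shared configuration.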

### Error Handling and Recovery Hooks

Hooks for handling worker crashes and test failures.

```python { .api }
def pytest_handlecrashitem(
    crashitem: str,
    report: pytest.TestReport,
    sched: Scheduling
) -> None:
    """
    Handle a crashitem, modifying the report if necessary.

    Called when a test item causes a worker crash. The scheduler
    is provided to allow rescheduling the test if desired.

    Args:
        crashitem: identifier of test that caused crash
        report: test report for the crashed test
        sched: scheduler instance for potential rescheduling

    Example:
        def pytest_handlecrashitem(crashitem, report, sched):
            if should_rerun(crashitem):
                sched.mark_test_pending(crashitem)
                report.outcome = "rerun"
    """
```

### Remote Module Hooks (Advanced)

Hook for customizing remote worker module loading.

```python { .api }
def pytest_xdist_getremotemodule() -> Any:
    """
    Called when creating remote node.

    Hook for providing custom remote worker module implementations.
    Advanced usage for specialized worker behaviors.

    Returns:
        Custom remote module or None for default
    """
```

### Deprecated Rsync Hooks

Legacy hooks for rsync functionality (deprecated, will be removed in pytest-xdist 4.0).

```python { .api }
def pytest_xdist_rsyncstart(
    source: str | os.PathLike[str],
    gateways: Sequence[execnet.Gateway],
) -> None:
    """
    Called before rsyncing a directory to remote gateways takes place.

    DEPRECATED: rsync feature is deprecated and will be removed in pytest-xdist 4.0

    Args:
        source: source directory being rsynced
        gateways: target gateways for rsync
    """

def pytest_xdist_rsyncfinish(
    source: str | os.PathLike[str],
    gateways: Sequence[execnet.Gateway],
) -> None:
    """
    Called after rsyncing a directory to remote gateways takes place.

    DEPRECATED: rsync feature is deprecated and will be removed in pytest-xdist 4.0

    Args:
        source: source directory that was rsynced
        gateways: target gateways for rsync
    """
```

## Usage Examples

### Custom Worker Count Detection

```python
# In conftest.py
import pytest
import psutil

def pytest_xdist_auto_num_workers(config):
    """Custom worker count based on available resources."""

    # Check total system memory
    memory_gb = psutil.virtual_memory().total / (1024**3)

    # Check if running resource-intensive tests
    # (default=False keeps the lookup safe if the custom flag is not registered)
    if config.getoption("--memory-intensive", default=False):
        # Use fewer workers for memory-heavy tests
        if memory_gb < 8:
            return 2
        elif memory_gb < 16:
            return 4
        else:
            return 8

    # Check CPU characteristics
    # Physical cores only; cpu_count() may return None if undetermined
    cpu_count = psutil.cpu_count(logical=False) or 1

    # Custom logic for different scenarios
    if config.getoption("--io-bound", default=False):
        # I/O bound tests can use more workers
        return cpu_count * 2
    elif config.getoption("--cpu-bound", default=False):
        # CPU bound tests should match core count
        return cpu_count

    # Default to letting pytest-xdist decide
    return None
```
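
The example above reads three custom command-line flags, which pytest only accepts on the command line once they are registered. A minimal sketch of that registration with pytest's `pytest_addoption` hook follows; the help texts are illustrative wording, not taken from any existing plugin.

```python
# In conftest.py
def pytest_addoption(parser):
    """Register the custom flags read by pytest_xdist_auto_num_workers."""
    flags = (
        ("--memory-intensive", "tests need a lot of memory; use fewer workers"),
        ("--io-bound", "tests mostly wait on I/O; use more workers"),
        ("--cpu-bound", "tests are CPU heavy; match the core count"),
    )
    for flag, help_text in flags:
        # store_true makes each flag a simple on/off switch defaulting to False
        parser.addoption(flag, action="store_true", default=False, help=help_text)
```

With the flags registered, a run such as `pytest -n auto --io-bound` would reach the I/O-bound branch of the hook.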

### Custom Scheduler Implementation

```python
# In conftest.py
import pytest
from xdist.scheduler.protocol import Scheduling
from xdist.workermanage import WorkerController
from typing import Sequence

class PriorityScheduler:
    """Custom scheduler that prioritizes certain tests."""

    def __init__(self, config, log):
        self.config = config
        self.log = log
        self._nodes = []
        self._pending_priority = []  # indices into the collected test IDs
        self._pending_normal = []
        self._collection_complete = False

    @property
    def nodes(self) -> list[WorkerController]:
        return self._nodes

    @property
    def collection_is_completed(self) -> bool:
        return self._collection_complete

    @property
    def tests_finished(self) -> bool:
        # Simplification: tests already sent to a worker are not tracked
        return not (self._pending_priority or self._pending_normal)

    @property
    def has_pending(self) -> bool:
        return bool(self._pending_priority or self._pending_normal)

    def add_node(self, node: WorkerController) -> None:
        self._nodes.append(node)

    def add_node_collection(self, node: WorkerController, collection: Sequence[str]) -> None:
        # Every worker reports its own collection; split the IDs only once
        # so tests are not queued again for each worker
        if not self._collection_complete:
            for index, test_id in enumerate(collection):
                if "priority" in test_id or "critical" in test_id:
                    self._pending_priority.append(index)
                else:
                    self._pending_normal.append(index)
        self._collection_complete = True

    def schedule(self) -> None:
        """Schedule priority tests first, then normal tests."""
        for node in self._nodes:
            if node.shutting_down:
                continue
            # Workers are addressed by collection index, not by test ID
            if self._pending_priority:
                node.send_runtest_some([self._pending_priority.pop(0)])
            elif self._pending_normal:
                node.send_runtest_some([self._pending_normal.pop(0)])

    # Implement the other methods of the Scheduling protocol
    # (mark_test_complete, mark_test_pending, remove_node, ...)

def pytest_xdist_make_scheduler(config, log):
    """Use custom priority scheduler if requested."""
    # default=False keeps the lookup safe if the custom flag is not registered
    if config.getoption("--priority-scheduler", default=False):
        return PriorityScheduler(config, log)
    return None
```
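
The partitioning step at the heart of the scheduler can be tried in isolation, without pytest-xdist installed. The helper below is a hypothetical standalone function (`split_by_priority` is not an xdist API) that applies the same substring rule and returns collection indices rather than test IDs.

```python
from typing import Sequence


def split_by_priority(test_ids: Sequence[str], keywords=("priority", "critical")):
    """Return (priority_indices, normal_indices) for the collected test IDs."""
    priority, normal = [], []
    for index, test_id in enumerate(test_ids):
        # A test is high priority if any keyword occurs anywhere in its ID
        target = priority if any(word in test_id for word in keywords) else normal
        target.append(index)
    return priority, normal


ids = [
    "tests/test_api.py::test_list",
    "tests/test_auth.py::test_critical_login",
    "tests/test_priority.py::test_checkout",
]
print(split_by_priority(ids))  # prints ([1, 2], [0])
```

Matching on substrings is deliberately crude: it also picks up file or directory names that happen to contain a keyword, so a marker-based rule may be a better fit for real suites.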

### Node Lifecycle Management

```python
# In conftest.py
import pytest
import time

class NodeLifecycleManager:
    """Monitor and manage worker node lifecycle."""

    def __init__(self):
        self.node_stats = {}
        self.setup_start_time = None

    def pytest_xdist_setupnodes(self, config, specs):
        """Called before node setup begins."""
        self.setup_start_time = time.time()
        print(f"Setting up {len(specs)} worker nodes...")

        # Pre-setup validation
        for i, spec in enumerate(specs):
            print(f"Worker {i}: {spec}")

    def pytest_configure_node(self, node):
        """Configure each worker node."""
        worker_id = node.workerinput['workerid']

        # Add custom configuration
        node.config.option.worker_start_time = time.time()

        # Track node
        self.node_stats[worker_id] = {
            'configured_at': time.time(),
            'ready_at': None,
            'finished_at': None,
            'tests_executed': 0,
            'errors': []
        }

        print(f"Configured worker {worker_id}")

    def pytest_testnodeready(self, node):
        """Track when nodes become ready."""
        worker_id = node.workerinput['workerid']
        self.node_stats[worker_id]['ready_at'] = time.time()

        setup_duration = time.time() - self.setup_start_time
        print(f"Worker {worker_id} ready after {setup_duration:.2f}s")

    def pytest_testnodedown(self, node, error):
        """Handle node shutdown."""
        worker_id = node.workerinput.get('workerid', 'unknown')

        if worker_id in self.node_stats:
            self.node_stats[worker_id]['finished_at'] = time.time()

            if error:
                self.node_stats[worker_id]['errors'].append(str(error))
                print(f"Worker {worker_id} crashed: {error}")
            else:
                print(f"Worker {worker_id} finished cleanly")

        self.print_node_summary(worker_id)

    def print_node_summary(self, worker_id):
        """Print summary statistics for a worker."""
        if worker_id not in self.node_stats:
            return

        stats = self.node_stats[worker_id]
        if stats['ready_at'] and stats['finished_at']:
            runtime = stats['finished_at'] - stats['ready_at']
            print(f"Worker {worker_id} runtime: {runtime:.2f}s, "
                  f"tests: {stats['tests_executed']}, "
                  f"errors: {len(stats['errors'])}")

def pytest_configure(config):
    # Check for the xdist plugin itself: its "dsession" plugin is registered
    # late in configuration and may not exist yet when this hook runs
    if config.pluginmanager.hasplugin("xdist"):
        manager = NodeLifecycleManager()
        config.pluginmanager.register(manager)
```

### Crash Recovery Handler

```python
# In conftest.py
import pytest

class CrashRecoveryHandler:
    """Handle worker crashes and implement recovery strategies."""

    def __init__(self):
        self.crash_counts = {}
        self.max_retries = 3

    def pytest_handlecrashitem(self, crashitem, report, sched):
        """Handle crashed test items."""
        # Track crash frequency
        if crashitem not in self.crash_counts:
            self.crash_counts[crashitem] = 0

        self.crash_counts[crashitem] += 1

        # Retry logic
        if self.crash_counts[crashitem] <= self.max_retries:
            print(f"Retrying crashed test {crashitem} "
                  f"(attempt {self.crash_counts[crashitem]}/{self.max_retries})")

            # Reschedule the test
            sched.mark_test_pending(crashitem)
            report.outcome = "rerun"
        else:
            print(f"Test {crashitem} crashed {self.crash_counts[crashitem]} times, "
                  f"marking as crashed")
            # "crashed" is a custom label; pytest's built-in outcomes are
            # "passed", "failed", and "skipped"
            report.outcome = "crashed"
            # Could also mark as skipped or failed depending on needs

def pytest_configure(config):
    # Check for the xdist plugin itself: its "dsession" plugin is registered
    # late in configuration and may not exist yet when this hook runs
    if config.pluginmanager.hasplugin("xdist"):
        handler = CrashRecoveryHandler()
        config.pluginmanager.register(handler)
```

### Collection Processing

```python
# In conftest.py
import pytest
from collections import defaultdict

def pytest_xdist_node_collection_finished(node, ids):
    """Process collected test IDs."""
    worker_id = node.workerinput['workerid']

    # Analyze collected tests
    test_counts = defaultdict(int)
    for test_id in ids:
        if '::' in test_id:
            module = test_id.split('::')[0]
            test_counts[module] += 1

    print(f"Worker {worker_id} collected {len(ids)} tests:")
    for module, count in sorted(test_counts.items()):
        print(f"  {module}: {count} tests")

    # Could implement load balancing adjustments here
    # based on collection results
```
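
One check that needs nothing beyond the hook's arguments is comparing what different workers collected. The variant below is a sketch under that assumption; `_collections` and `collection_mismatches` are hypothetical module-level names, and the first worker to report is simply treated as the reference.

```python
# In conftest.py
_collections = {}           # worker id -> tuple of collected test IDs
collection_mismatches = []  # (worker id, missing IDs, extra IDs)


def pytest_xdist_node_collection_finished(node, ids):
    """Record each worker's collection and note differences from the first one."""
    worker_id = node.workerinput["workerid"]
    _collections[worker_id] = tuple(ids)

    # Dicts preserve insertion order, so this is the first worker that reported
    reference = next(iter(_collections.values()))
    if tuple(ids) != reference:
        missing = sorted(set(reference) - set(ids))
        extra = sorted(set(ids) - set(reference))
        collection_mismatches.append((worker_id, missing, extra))
        print(f"Worker {worker_id} collection differs: "
              f"missing={missing}, extra={extra}")
```

A difference that is only in ordering is also recorded, with empty `missing` and `extra` lists, since the tuples are compared in order.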