pytest xdist plugin for distributed testing, most importantly across multiple CPUs
—
Custom pytest hooks specific to xdist for plugin authors to extend and customize distributed testing behavior.
Hooks for managing worker node creation, configuration, and lifecycle.
def pytest_xdist_setupnodes(
    config: pytest.Config,
    specs: Sequence[execnet.XSpec]
) -> None:
    """
    Called before any remote node is set up.

    This hook allows plugins to perform global setup before
    workers are created and started.

    Args:
        config: pytest configuration object
        specs: execution specifications for the workers about to be set up
    """
def pytest_xdist_newgateway(gateway: execnet.Gateway) -> None:
    """
    Called on new raw gateway creation.

    Allows customization of newly created execnet gateways
    before they're used for worker communication.

    Args:
        gateway: newly created execnet gateway
    """
def pytest_configure_node(node: WorkerController) -> None:
    """
    Configure node information before it gets instantiated.

    Called for each worker before it starts executing tests,
    allowing per-worker customization.

    Args:
        node: worker controller being configured
    """
def pytest_testnodeready(node: WorkerController) -> None:
    """
    Test Node is ready to operate.

    Called when a worker has finished initialization and
    is ready to receive and execute tests.

    Args:
        node: worker controller that became ready
    """
def pytest_testnodedown(node: WorkerController, error: object | None) -> None:
    """
    Test Node is down.

    Called when a worker shuts down, either cleanly or due to an error.

    Args:
        node: worker controller that went down
        error: exception that caused shutdown, or None for clean shutdown
    """

Hooks for customizing test collection and distribution scheduling.
def pytest_xdist_node_collection_finished(
    node: WorkerController,
    ids: Sequence[str]
) -> None:
    """
    Called by the controller node when a worker node finishes collecting.

    Allows processing of collected test IDs before scheduling begins.

    Args:
        node: worker that finished collection
        ids: collected test item IDs
    """
def pytest_xdist_make_scheduler(
    config: pytest.Config,
    log: Producer
) -> Scheduling | None:
    """
    Return a node scheduler implementation.

    Hook for providing custom test distribution schedulers.
    Return None to use the default scheduler for the selected distribution mode.

    Args:
        config: pytest configuration object
        log: logging producer for scheduler messages

    Returns:
        Custom scheduler instance or None for default
    """
def pytest_xdist_auto_num_workers(config: pytest.Config) -> int:
    """
    Return the number of workers to spawn when --numprocesses=auto is given.

    Hook for customizing automatic worker count detection.

    Args:
        config: pytest configuration object

    Returns:
        Number of workers to spawn

    Raises:
        NotImplementedError: If not implemented by any plugin
    """

Hooks for handling worker crashes and test failures.
def pytest_handlecrashitem(
    crashitem: str,
    report: pytest.TestReport,
    sched: Scheduling
) -> None:
    """
    Handle a crashitem, modifying the report if necessary.

    Called when a test item causes a worker crash. The scheduler
    is provided to allow rescheduling the test if desired.

    Args:
        crashitem: identifier of test that caused crash
        report: test report for the crashed test
        sched: scheduler instance for potential rescheduling

    Example:
        def pytest_handlecrashitem(crashitem, report, sched):
            if should_rerun(crashitem):
                sched.mark_test_pending(crashitem)
                report.outcome = "rerun"
    """

Hook for customizing remote worker module loading.
def pytest_xdist_getremotemodule() -> Any:
    """
    Called when creating remote node.

    Hook for providing custom remote worker module implementations.
    Advanced usage for specialized worker behaviors.

    Returns:
        Custom remote module or None for default
    """

Legacy hooks for rsync functionality (deprecated, will be removed in pytest-xdist 4.0).
def pytest_xdist_rsyncstart(
    source: str | os.PathLike[str],
    gateways: Sequence[execnet.Gateway],
) -> None:
    """
    Called before rsyncing a directory to remote gateways takes place.

    DEPRECATED: rsync feature is deprecated and will be removed in pytest-xdist 4.0

    Args:
        source: source directory being rsynced
        gateways: target gateways for rsync
    """
def pytest_xdist_rsyncfinish(
    source: str | os.PathLike[str],
    gateways: Sequence[execnet.Gateway],
) -> None:
    """
    Called after rsyncing a directory to remote gateways takes place.

    DEPRECATED: rsync feature is deprecated and will be removed in pytest-xdist 4.0

    Args:
        source: source directory that was rsynced
        gateways: target gateways for rsync
    """

# In conftest.py
import pytest
import psutil
def pytest_xdist_auto_num_workers(config):
    """Custom worker count based on available resources.

    Args:
        config: pytest configuration object. The ``--memory-intensive``,
            ``--io-bound`` and ``--cpu-bound`` flags are read from it; a flag
            that has not been registered is treated as off instead of
            raising ``ValueError``.

    Returns:
        The number of workers to spawn, or None to let pytest-xdist decide.
    """
    # Memory-heavy tests: use fewer workers, scaled by total RAM.
    if config.getoption("--memory-intensive", default=False):
        memory_gb = psutil.virtual_memory().total / (1024**3)
        if memory_gb < 8:
            return 2
        if memory_gb < 16:
            return 4
        return 8

    # Physical cores only. psutil returns None when the count cannot be
    # determined, in which case there is nothing to base a custom value on.
    cpu_count = psutil.cpu_count(logical=False)
    if cpu_count is None:
        return None

    if config.getoption("--io-bound", default=False):
        # I/O bound tests can use more workers
        return cpu_count * 2
    if config.getoption("--cpu-bound", default=False):
        # CPU bound tests should match core count
        return cpu_count

    # Default to letting pytest-xdist decide
    return None

# In conftest.py
import pytest
from xdist.scheduler.protocol import Scheduling
from xdist.workermanage import WorkerController
from typing import Sequence
class PriorityScheduler:
    """Custom scheduler that prioritizes certain tests.

    Test IDs containing "priority" or "critical" are dispatched before all
    other tests. This is a partial sketch: the remaining methods a scheduler
    needs are not implemented here (see the comment at the end of the class).
    """

    def __init__(self, config, log):
        self.config = config
        self.log = log
        self._nodes = []
        # Test IDs still waiting to be sent to a worker, in dispatch order.
        self._pending_priority = []
        self._pending_normal = []
        self._collection_complete = False

    # Annotations naming WorkerController are quoted so defining the class
    # does not require the name to be resolvable at runtime.
    @property
    def nodes(self) -> "list[WorkerController]":
        """Worker nodes known to this scheduler."""
        return self._nodes

    @property
    def collection_is_completed(self) -> bool:
        """True once a collection has been received and queued."""
        return self._collection_complete

    @property
    def tests_finished(self) -> bool:
        """True once collection is complete and nothing is left to dispatch.

        Before any collection has arrived the queues are empty too, so the
        collection flag must be checked as well; otherwise the run would look
        finished before it started. Tests already sent to a worker are not
        tracked by this sketch.
        """
        return self._collection_complete and not self.has_pending

    @property
    def has_pending(self) -> bool:
        """True while at least one test is waiting to be dispatched."""
        return bool(self._pending_priority or self._pending_normal)

    def add_node_collection(self, node: "WorkerController", collection: Sequence[str]) -> None:
        """Queue the collected test IDs, split into priority and normal.

        Only the first collection is queued: queuing the collection reported
        by every worker would schedule each test once per worker.
        """
        if self._collection_complete:
            return
        # Separate high-priority tests
        for test_id in collection:
            if "priority" in test_id or "critical" in test_id:
                self._pending_priority.append(test_id)
            else:
                self._pending_normal.append(test_id)
        self._collection_complete = True

    def schedule(self) -> None:
        """Schedule priority tests first, then normal tests."""
        # NOTE(review): is_ready() and send_runtest() are kept from the
        # original example; confirm the node objects really provide them
        # (xdist's worker API may expect collection indices instead of IDs).
        for node in self._nodes:
            if not node.is_ready():
                continue
            # Drain the priority queue before touching the normal one.
            queue = self._pending_priority or self._pending_normal
            if queue:
                node.send_runtest(queue.pop(0))

    # Implement other required methods...
def pytest_xdist_make_scheduler(config, log):
    """Use custom priority scheduler if requested.

    Args:
        config: pytest configuration object; ``--priority-scheduler`` is read
            from it and treated as off when the flag is not registered.
        log: logging producer handed on to the scheduler.

    Returns:
        A PriorityScheduler when the flag is set, otherwise None so the
        default scheduler is used.
    """
    # An explicit default keeps an unregistered flag from raising ValueError.
    if config.getoption("--priority-scheduler", default=False):
        return PriorityScheduler(config, log)
    return None

# In conftest.py
import pytest
import time
class NodeLifecycleManager:
    """Monitor and manage worker node lifecycle.

    Records per-worker timestamps and errors from the xdist node hooks and
    prints a progress line as workers are configured, become ready and shut
    down.
    """

    def __init__(self):
        # worker id -> stats record (see _new_stats for the keys)
        self.node_stats = {}
        # time.time() at which node setup began; None until
        # pytest_xdist_setupnodes has been called.
        self.setup_start_time = None

    @staticmethod
    def _new_stats():
        """Return an empty stats record for one worker."""
        return {
            'configured_at': None,
            'ready_at': None,
            'finished_at': None,
            # NOTE: nothing in this class increments this counter yet.
            'tests_executed': 0,
            'errors': [],
        }

    def pytest_xdist_setupnodes(self, config, specs):
        """Called before node setup begins."""
        self.setup_start_time = time.time()
        print(f"Setting up {len(specs)} worker nodes...")
        # Pre-setup validation
        for i, spec in enumerate(specs):
            print(f"Worker {i}: {spec}")

    def pytest_configure_node(self, node):
        """Configure each worker node."""
        worker_id = node.workerinput['workerid']
        # Add custom configuration
        node.config.option.worker_start_time = time.time()
        # Track node
        stats = self._new_stats()
        stats['configured_at'] = time.time()
        self.node_stats[worker_id] = stats
        print(f"Configured worker {worker_id}")

    def pytest_testnodeready(self, node):
        """Track when nodes become ready."""
        worker_id = node.workerinput['workerid']
        now = time.time()
        # A worker that was never seen by pytest_configure_node still gets a
        # record instead of raising KeyError.
        stats = self.node_stats.setdefault(worker_id, self._new_stats())
        stats['ready_at'] = now
        if self.setup_start_time is None:
            # Setup start was not observed, so no duration can be reported.
            print(f"Worker {worker_id} ready")
        else:
            setup_duration = now - self.setup_start_time
            print(f"Worker {worker_id} ready after {setup_duration:.2f}s")

    def pytest_testnodedown(self, node, error):
        """Handle node shutdown."""
        worker_id = node.workerinput.get('workerid', 'unknown')
        # Untracked workers are still reported, just not recorded.
        stats = self.node_stats.get(worker_id)
        if stats is not None:
            stats['finished_at'] = time.time()
        if error is not None:
            if stats is not None:
                stats['errors'].append(str(error))
            print(f"Worker {worker_id} crashed: {error}")
        else:
            print(f"Worker {worker_id} finished cleanly")
        self.print_node_summary(worker_id)

    def print_node_summary(self, worker_id):
        """Print summary statistics for a worker."""
        stats = self.node_stats.get(worker_id)
        if stats is None:
            return
        # Compare against None explicitly rather than relying on the
        # truthiness of a timestamp.
        if stats['ready_at'] is None or stats['finished_at'] is None:
            return
        runtime = stats['finished_at'] - stats['ready_at']
        print(f"Worker {worker_id} runtime: {runtime:.2f}s, "
              f"tests: {stats['tests_executed']}, "
              f"errors: {len(stats['errors'])}")
def pytest_configure(config):
    """Register the lifecycle manager when tests are being distributed.

    Args:
        config: pytest configuration object.
    """
    plugins = config.pluginmanager
    # NOTE(review): the "dsession" plugin may only be registered after this
    # conftest hook has run (hook ordering) - confirm against the installed
    # pytest-xdist. Checking the distribution mode as well does not depend
    # on that ordering; the original check is kept so nothing that worked
    # before stops working.
    distributing = plugins.hasplugin("dsession") or (
        plugins.hasplugin("xdist") and config.getoption("dist", "no") != "no"
    )
    if distributing:
        manager = NodeLifecycleManager()
        plugins.register(manager)

# In conftest.py
import pytest
class CrashRecoveryHandler:
    """Handle worker crashes and implement recovery strategies.

    A test that crashes its worker is rescheduled up to ``max_retries``
    times; after that the crash is reported as a failure.
    """

    def __init__(self):
        # test id -> number of worker crashes attributed to it so far
        self.crash_counts = {}
        self.max_retries = 3

    def pytest_handlecrashitem(self, crashitem, report, sched):
        """Handle crashed test items.

        Args:
            crashitem: identifier of the test that crashed its worker.
            report: report for the crashed test; its ``outcome`` is updated.
            sched: scheduler used to put the test back in the queue.
        """
        # Track crash frequency
        crashes = self.crash_counts.get(crashitem, 0) + 1
        self.crash_counts[crashitem] = crashes

        if crashes <= self.max_retries:
            print(f"Retrying crashed test {crashitem} "
                  f"(attempt {crashes}/{self.max_retries})")
            # Reschedule the test
            sched.mark_test_pending(crashitem)
            report.outcome = "rerun"
            return

        # Retries exhausted. Report a real failure: pytest only counts the
        # "failed" outcome as a failure, so a custom value such as "crashed"
        # could let the run end without the crash being counted.
        print(f"Test {crashitem} crashed {crashes} times "
              f"({self.max_retries} retries exhausted), marking as failed")
        report.outcome = "failed"
        # Could also mark as skipped depending on needs
def pytest_configure(config):
    """Register the crash recovery handler when tests are being distributed.

    Args:
        config: pytest configuration object.
    """
    plugins = config.pluginmanager
    # NOTE(review): the "dsession" plugin may only be registered after this
    # conftest hook has run (hook ordering) - confirm against the installed
    # pytest-xdist. Checking the distribution mode as well does not depend
    # on that ordering; the original check is kept so nothing that worked
    # before stops working.
    distributing = plugins.hasplugin("dsession") or (
        plugins.hasplugin("xdist") and config.getoption("dist", "no") != "no"
    )
    if distributing:
        handler = CrashRecoveryHandler()
        plugins.register(handler)

# In conftest.py
import pytest
from collections import defaultdict
def pytest_xdist_node_collection_finished(node, ids):
    """Print how many tests a worker collected, broken down by module."""
    worker_id = node.workerinput['workerid']

    # Tally tests per module. IDs without a '::' separator are counted in
    # the total but left out of the per-module breakdown.
    per_module = defaultdict(int)
    for test_id in ids:
        module, separator, _ = test_id.partition('::')
        if separator:
            per_module[module] += 1

    print(f"Worker {worker_id} collected {len(ids)} tests:")
    for module in sorted(per_module):
        print(f" {module}: {per_module[module]} tests")
# Could implement load balancing adjustments here
# based on collection results

Install with Tessl CLI
npx tessl i tessl/pypi-pytest-xdist