pytest xdist plugin for distributed testing, most importantly across multiple CPUs
—
Core session management for coordinating distributed test execution, including worker lifecycle management and result aggregation.
The main class that coordinates distributed test execution across multiple worker processes.
class DSession:
"""
A pytest plugin which runs a distributed test session.
Creates and manages worker nodes, coordinates test distribution,
and aggregates results from all workers.
"""
def __init__(self, config: pytest.Config) -> None:
"""
Initialize distributed session.
Args:
config: pytest configuration object
"""
@property
def session_finished(self) -> bool:
"""
Return True if the distributed session has finished.
This means all nodes have executed all test items and
the session is ready to shut down.
Returns:
bool: True if session is complete
"""
def report_line(self, line: str) -> None:
"""
Report a line of output if verbose mode is enabled.
Args:
line: Output line to report
"""Hooks that manage the distributed session lifecycle.
def pytest_sessionstart(self, session: pytest.Session) -> None:
"""
Hook called at session start to create and start worker nodes.
Sets up the NodeManager, creates worker processes, and
initializes the test distribution system.
Args:
session: pytest session object
"""
def pytest_sessionfinish(self) -> None:
"""
Hook called at session end to clean up worker nodes.
Shuts down all worker processes and cleans up resources.
"""
def pytest_runtestloop(self) -> bool:
"""
Main test execution loop for distributed testing.
Coordinates test distribution, worker communication,
and result aggregation until all tests are complete.
Returns:
bool: True to indicate loop handled execution
"""Management of worker node lifecycle and communication.
class NodeManager:
"""Manages worker node creation, setup, and teardown."""
def __init__(
self,
config: pytest.Config,
specs: Sequence[execnet.XSpec | str] | None = None,
defaultchdir: str = "pyexecnetcache",
) -> None:
"""
Initialize node manager.
Args:
config: pytest configuration
specs: execution specifications for workers
defaultchdir: default working directory for workers
"""
def setup_nodes(
self,
putevent: Callable[[tuple[str, dict[str, Any]]], None],
) -> list[WorkerController]:
"""
Set up all worker nodes.
Args:
putevent: callback for worker events
Returns:
List of worker controllers
"""
def setup_node(
self,
spec: execnet.XSpec,
putevent: Callable[[tuple[str, dict[str, Any]]], None],
) -> WorkerController:
"""
Set up a single worker node.
Args:
spec: execution specification for worker
putevent: callback for worker events
Returns:
Worker controller instance
"""
def teardown_nodes(self) -> None:
"""Shut down all worker nodes and clean up resources."""
def rsync_roots(self, gateway: execnet.Gateway) -> None:
"""
Rsync configured directories to worker node.
Args:
gateway: execnet gateway to worker
"""Controls individual worker processes and their communication.
class WorkerController:
"""Controls a single worker process."""
def send_runtest_some(self, indices: Sequence[int]) -> None:
"""
Send test items to worker for execution.
Args:
indices: indices of test items to run
"""
def send_runtest_all(self) -> None:
"""Send all collected tests to worker for execution."""
def shutdown(self) -> None:
"""Shut down the worker process."""
def pytest_runtest_protocol_complete(
self,
item_index: int,
duration: float,
) -> None:
"""
Handle completion of test item execution.
Args:
item_index: index of completed test
duration: execution duration in seconds
"""Worker event processing and coordination.
def worker_workerready(self, node: WorkerController) -> None:
"""
Handle worker ready event.
Args:
node: worker that became ready
"""
def worker_workerfinished(self, node: WorkerController) -> None:
"""
Handle worker finished event.
Args:
node: worker that finished
"""
def worker_runtest_logreport(
self,
node: WorkerController,
report: pytest.TestReport,
) -> None:
"""
Handle test report from worker.
Args:
node: worker that sent report
report: test execution report
"""
def worker_collectreport(
self,
node: WorkerController,
report: pytest.CollectReport,
) -> None:
"""
Handle collection report from worker.
Args:
node: worker that sent report
report: collection report
"""
def worker_logstart(
self,
node: WorkerController,
nodeid: str,
) -> None:
"""
Handle test start logging from worker.
Args:
node: worker that started test
nodeid: test node identifier
"""Utilities for parsing and managing worker configuration.
def parse_tx_spec_config(config: pytest.Config) -> list[str]:
"""
Parse tx specification configuration into list of specs.
Handles multiplication syntax (e.g., "4*popen" -> ["popen", "popen", "popen", "popen"])
Args:
config: pytest configuration object
Returns:
List of execution specifications
Raises:
pytest.UsageError: If no tx specs are provided
"""
def get_default_max_worker_restart(config: pytest.Config) -> int:
"""
Get the default maximum worker restart count.
Args:
config: pytest configuration object
Returns:
Maximum number of worker restarts allowed
"""# In conftest.py
import pytest
class CustomSessionHandler:
def pytest_sessionstart(self, session):
if hasattr(session.config, 'workerinput'):
# Worker session start
self.setup_worker_session()
else:
# Controller session start
self.setup_controller_session()
def pytest_sessionfinish(self, session):
if hasattr(session.config, 'workerinput'):
# Worker session cleanup
self.cleanup_worker_session()
else:
# Controller session cleanup
self.cleanup_controller_session()
def pytest_configure(config):
if config.pluginmanager.hasplugin("dsession"):
# xdist is active, register our handler
config.pluginmanager.register(CustomSessionHandler())import pytest
from xdist.dsession import DSession
class WorkerMonitor:
def __init__(self):
self.worker_stats = {}
def worker_workerready(self, node):
worker_id = node.workerinput['workerid']
self.worker_stats[worker_id] = {
'ready_time': time.time(),
'tests_run': 0,
'failures': 0
}
print(f"Worker {worker_id} is ready")
def worker_runtest_logreport(self, node, report):
worker_id = node.workerinput['workerid']
if worker_id in self.worker_stats:
self.worker_stats[worker_id]['tests_run'] += 1
if report.failed:
self.worker_stats[worker_id]['failures'] += 1
def worker_workerfinished(self, node):
worker_id = node.workerinput['workerid']
if worker_id in self.worker_stats:
stats = self.worker_stats[worker_id]
print(f"Worker {worker_id} finished: "
f"{stats['tests_run']} tests, "
f"{stats['failures']} failures")
def pytest_configure(config):
if config.pluginmanager.hasplugin("dsession"):
monitor = WorkerMonitor()
config.pluginmanager.register(monitor)import pytest
from xdist.nodemanage import NodeManager
def pytest_xdist_setupnodes(config, specs):
"""Hook called before nodes are set up."""
print(f"Setting up {len(specs)} worker nodes")
# Custom pre-setup logic
for i, spec in enumerate(specs):
print(f"Worker {i} spec: {spec}")
def pytest_configure_node(node):
"""Hook called to configure each worker node."""
worker_id = node.workerinput['workerid']
# Custom worker configuration
node.config.option.custom_worker_id = worker_id
print(f"Configured worker {worker_id}")
def pytest_testnodeready(node):
"""Hook called when worker is ready."""
worker_id = node.workerinput['workerid']
print(f"Worker {worker_id} is ready for tests")
def pytest_testnodedown(node, error):
"""Hook called when worker goes down."""
worker_id = node.workerinput.get('workerid', 'unknown')
if error:
print(f"Worker {worker_id} crashed: {error}")
else:
print(f"Worker {worker_id} shut down cleanly")import pytest
from collections import defaultdict
class DistributedSessionState:
"""Manage state across distributed session."""
def __init__(self):
self.controller_state = {}
self.worker_results = defaultdict(list)
self.session_start_time = None
def pytest_sessionstart(self, session):
self.session_start_time = time.time()
if hasattr(session.config, 'workerinput'):
# Worker session
worker_id = session.config.workerinput['workerid']
self.setup_worker_state(worker_id)
else:
# Controller session
self.setup_controller_state()
def setup_controller_state(self):
"""Set up controller-specific state."""
self.controller_state['total_workers'] = len(
self.config.getoption('tx') or []
)
self.controller_state['active_workers'] = set()
def setup_worker_state(self, worker_id):
"""Set up worker-specific state."""
# Worker state is local to each process
pass
def worker_workerready(self, node):
"""Track worker readiness in controller."""
worker_id = node.workerinput['workerid']
self.controller_state['active_workers'].add(worker_id)
def pytest_runtest_logreport(self, report):
"""Collect test results."""
if hasattr(self.config, 'workerinput'):
# In worker - collect local results
worker_id = self.config.workerinput['workerid']
self.worker_results[worker_id].append(report)
# Usage
state_manager = DistributedSessionState()
def pytest_configure(config):
state_manager.config = config
config.pluginmanager.register(state_manager)Install with Tessl CLI
npx tessl i tessl/pypi-pytest-xdist