CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pytest-xdist

pytest xdist plugin for distributed testing, most importantly across multiple CPUs

Pending
Overview
Eval results
Files

docs/session-management.md

Session Management

Core session management for coordinating distributed test execution, including worker lifecycle management and result aggregation.

Capabilities

Distributed Session

The main class that coordinates distributed test execution across multiple worker processes.

class DSession:
    """Pytest plugin that runs a distributed test session.

    Creates and manages worker nodes, coordinates test distribution,
    and aggregates results from all workers.

    This is an API outline: only signatures and contracts are listed,
    the method bodies are omitted.
    """

    def __init__(self, config: pytest.Config) -> None:
        """Initialize the distributed session.

        Args:
            config: pytest configuration object
        """

    @property
    def session_finished(self) -> bool:
        """Whether the distributed session has finished.

        Finished means all nodes have executed all test items and
        the session is ready to shut down.

        Returns:
            bool: True if the session is complete
        """

    def report_line(self, line: str) -> None:
        """Report a line of output if verbose mode is enabled.

        Args:
            line: output line to report
        """

Session Lifecycle Hooks

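Hook implementations that manage the distributed session lifecycle. They are shown outside their enclosing class; the `self` parameter is the plugin instance that owns them.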

def pytest_sessionstart(self, session: pytest.Session) -> None:
    """Create and start the worker nodes when the session starts.

    Sets up the NodeManager, creates worker processes, and
    initializes the test distribution system.

    Listed without its enclosing class; ``self`` is the plugin instance
    that owns the hook (presumably ``DSession`` -- confirm).

    Args:
        session: pytest session object
    """

def pytest_sessionfinish(self) -> None:
    """Clean up the worker nodes when the session ends.

    Shuts down all worker processes and cleans up resources.

    Listed without its enclosing class; ``self`` is the plugin instance
    that owns the hook (presumably ``DSession`` -- confirm).
    """

def pytest_runtestloop(self) -> bool:
    """Run the main test execution loop for distributed testing.

    Coordinates test distribution, worker communication,
    and result aggregation until all tests are complete.

    Listed without its enclosing class; ``self`` is the plugin instance
    that owns the hook (presumably ``DSession`` -- confirm).

    Returns:
        bool: True to indicate that this loop handled execution
    """

Node Management

Management of worker node lifecycle and communication.

class NodeManager:
    """Manage worker node creation, setup, and teardown.

    This is an API outline: only signatures and contracts are listed,
    the method bodies are omitted.
    """

    def __init__(
        self,
        config: pytest.Config,
        specs: Sequence[execnet.XSpec | str] | None = None,
        defaultchdir: str = "pyexecnetcache",
    ) -> None:
        """Initialize the node manager.

        Args:
            config: pytest configuration
            specs: execution specifications for workers; optional
                (how an omitted value is resolved is not shown here)
            defaultchdir: default working directory for workers
        """

    def setup_nodes(
        self,
        putevent: Callable[[tuple[str, dict[str, Any]]], None],
    ) -> list[WorkerController]:
        """Set up all worker nodes.

        Args:
            putevent: callback for worker events; it receives a single
                ``(name, payload)`` tuple per the annotation

        Returns:
            List of worker controllers
        """

    def setup_node(
        self,
        spec: execnet.XSpec,
        putevent: Callable[[tuple[str, dict[str, Any]]], None],
    ) -> WorkerController:
        """Set up a single worker node.

        Args:
            spec: execution specification for the worker
            putevent: callback for worker events; it receives a single
                ``(name, payload)`` tuple per the annotation

        Returns:
            Worker controller instance
        """

    def teardown_nodes(self) -> None:
        """Shut down all worker nodes and clean up resources."""

    def rsync_roots(self, gateway: execnet.Gateway) -> None:
        """Rsync the configured directories to a worker node.

        Args:
            gateway: execnet gateway to the worker
        """

Worker Controller

Controls individual worker processes and their communication.

class WorkerController:
    """Control a single worker process.

    This is an API outline: only signatures and contracts are listed,
    the method bodies are omitted.
    """

    def send_runtest_some(self, indices: Sequence[int]) -> None:
        """Send selected test items to the worker for execution.

        Args:
            indices: indices of the test items to run
        """

    def send_runtest_all(self) -> None:
        """Send all collected tests to the worker for execution."""

    def shutdown(self) -> None:
        """Shut down the worker process."""

    def pytest_runtest_protocol_complete(
        self,
        item_index: int,
        duration: float,
    ) -> None:
        """Handle completion of a test item's execution.

        Args:
            item_index: index of the completed test
            duration: execution duration in seconds
        """

Event Processing

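Worker event processing and coordination. The handlers below are shown outside their enclosing class; the `self` parameter is the object that receives the worker events.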

def worker_workerready(self, node: WorkerController) -> None:
    """Handle the event raised when a worker becomes ready.

    Listed without its enclosing class (note the ``self`` parameter).

    Args:
        node: worker that became ready
    """

def worker_workerfinished(self, node: WorkerController) -> None:
    """Handle the event raised when a worker has finished.

    Listed without its enclosing class (note the ``self`` parameter).

    Args:
        node: worker that finished
    """

def worker_runtest_logreport(
    self,
    node: WorkerController,
    report: pytest.TestReport,
) -> None:
    """Handle a test report sent by a worker.

    Listed without its enclosing class (note the ``self`` parameter).

    Args:
        node: worker that sent the report
        report: test execution report
    """

def worker_collectreport(
    self,
    node: WorkerController,
    report: pytest.CollectReport,
) -> None:
    """Handle a collection report sent by a worker.

    Listed without its enclosing class (note the ``self`` parameter).

    Args:
        node: worker that sent the report
        report: collection report
    """

def worker_logstart(
    self,
    node: WorkerController,
    nodeid: str,
) -> None:
    """Handle the test-start log event sent by a worker.

    Listed without its enclosing class (note the ``self`` parameter).

    Args:
        node: worker that started the test
        nodeid: test node identifier
    """

Configuration Utilities

Utilities for parsing and managing worker configuration.

def parse_tx_spec_config(config: pytest.Config) -> list[str]:
    """Parse the tx specification configuration into a list of specs.

    Expands the multiplication syntax, so one entry can yield several
    specs (e.g., "4*popen" -> ["popen", "popen", "popen", "popen"]).

    Args:
        config: pytest configuration object

    Returns:
        List of execution specifications, one element per worker spec

    Raises:
        pytest.UsageError: If no tx specs are provided
    """

def get_default_max_worker_restart(config: pytest.Config) -> int:
    """Get the default maximum worker restart count.

    NOTE(review): the annotation promises an ``int``; confirm against the
    implementation whether ``None`` is possible when no limit applies.

    Args:
        config: pytest configuration object

    Returns:
        Maximum number of worker restarts allowed
    """

Usage Examples

Custom Session Event Handler

# In conftest.py
import pytest

class CustomSessionHandler:
    """Run different setup/cleanup logic on workers and on the controller.

    A session is treated as a worker session when its config object has a
    ``workerinput`` attribute; otherwise it is treated as the controller.

    The four ``setup_*`` / ``cleanup_*`` methods are extension points that
    do nothing by default. Override them in a subclass. They are defined
    here so that the hooks below never fail with ``AttributeError``.
    """

    @staticmethod
    def _is_worker(config):
        """Return True if ``config`` carries the ``workerinput`` attribute."""
        return hasattr(config, 'workerinput')

    def pytest_sessionstart(self, session):
        """Dispatch session start to the worker or controller setup.

        Args:
            session: pytest session object; only ``session.config`` is read
        """
        if self._is_worker(session.config):
            # Worker session start
            self.setup_worker_session()
        else:
            # Controller session start
            self.setup_controller_session()

    def pytest_sessionfinish(self, session):
        """Dispatch session end to the worker or controller cleanup.

        Args:
            session: pytest session object; only ``session.config`` is read
        """
        if self._is_worker(session.config):
            # Worker session cleanup
            self.cleanup_worker_session()
        else:
            # Controller session cleanup
            self.cleanup_controller_session()

    def setup_worker_session(self):
        """Set up a worker session. No-op by default."""

    def setup_controller_session(self):
        """Set up the controller session. No-op by default."""

    def cleanup_worker_session(self):
        """Clean up a worker session. No-op by default."""

    def cleanup_controller_session(self):
        """Clean up the controller session. No-op by default."""

def pytest_configure(config):
    """Register a ``CustomSessionHandler`` if the "dsession" plugin is present."""
    plugin_manager = config.pluginmanager
    if not plugin_manager.hasplugin("dsession"):
        # No "dsession" plugin registered: leave the configuration untouched
        return
    # xdist is active, register our handler
    plugin_manager.register(CustomSessionHandler())

Worker Event Monitoring

import pytest
from xdist.dsession import DSession

class WorkerMonitor:
    """Collect per-worker readiness and test-result statistics.

    ``worker_stats`` maps a worker id (``node.workerinput['workerid']``)
    to a dict with the keys ``ready_time``, ``tests_run`` and ``failures``.
    Events for a worker that was never seen by ``worker_workerready`` are
    ignored.

    NOTE(review): confirm that pytest actually dispatches the ``worker_*``
    methods to a registered plugin. The public xdist hooks appear to be
    named ``pytest_testnodeready`` / ``pytest_testnodedown``.
    """

    def __init__(self):
        self.worker_stats = {}

    def worker_workerready(self, node):
        """Start tracking ``node`` and record when it became ready.

        Args:
            node: worker object exposing a ``workerinput`` mapping
        """
        # Imported locally because the snippet's import header does not
        # bring ``time`` into scope; without it this method raised NameError.
        import time

        worker_id = node.workerinput['workerid']
        self.worker_stats[worker_id] = {
            'ready_time': time.time(),
            'tests_run': 0,
            'failures': 0,
        }
        print(f"Worker {worker_id} is ready")

    def worker_runtest_logreport(self, node, report):
        """Count one reported test, and one failure if ``report.failed``.

        Args:
            node: worker object exposing a ``workerinput`` mapping
            report: object with a boolean ``failed`` attribute
        """
        stats = self.worker_stats.get(node.workerinput['workerid'])
        if stats is None:
            # Unknown worker: nothing to update
            return
        stats['tests_run'] += 1
        if report.failed:
            stats['failures'] += 1

    def worker_workerfinished(self, node):
        """Print the collected totals for ``node``.

        Args:
            node: worker object exposing a ``workerinput`` mapping
        """
        worker_id = node.workerinput['workerid']
        stats = self.worker_stats.get(worker_id)
        if stats is None:
            return
        print(f"Worker {worker_id} finished: "
              f"{stats['tests_run']} tests, "
              f"{stats['failures']} failures")

def pytest_configure(config):
    """Register a ``WorkerMonitor`` if the "dsession" plugin is present."""
    plugin_manager = config.pluginmanager
    if not plugin_manager.hasplugin("dsession"):
        # No "dsession" plugin registered: nothing to monitor
        return
    plugin_manager.register(WorkerMonitor())

Custom Node Setup

import pytest
from xdist.nodemanage import NodeManager

def pytest_xdist_setupnodes(config, specs):
    """Print the number of worker specs, then one line per spec.

    Args:
        config: unused here
        specs: sized iterable of worker specs
    """
    node_count = len(specs)
    print(f"Setting up {node_count} worker nodes")

    # Custom pre-setup logic
    for position, worker_spec in enumerate(specs):
        print(f"Worker {position} spec: {worker_spec}")

def pytest_configure_node(node):
    """Copy the worker id onto ``node.config.option`` and print it.

    Args:
        node: object exposing ``workerinput`` and ``config.option``
    """
    worker_id = node.workerinput['workerid']

    # Custom worker configuration
    options = node.config.option
    options.custom_worker_id = worker_id
    print(f"Configured worker {worker_id}")

def pytest_testnodeready(node):
    """Print a message saying that the given worker is ready.

    Args:
        node: object exposing a ``workerinput`` mapping with ``workerid``
    """
    ready_id = node.workerinput['workerid']
    print(f"Worker {ready_id} is ready for tests")

def pytest_testnodedown(node, error):
    """Print whether the given worker crashed or shut down cleanly.

    Args:
        node: object exposing a ``workerinput`` mapping; a missing
            ``workerid`` is reported as ``unknown``
        error: truthy value describing the crash, or a falsy value
    """
    worker_id = node.workerinput.get('workerid', 'unknown')
    message = (
        f"Worker {worker_id} crashed: {error}"
        if error
        else f"Worker {worker_id} shut down cleanly"
    )
    print(message)

Session State Management

import pytest
from collections import defaultdict

class DistributedSessionState:
    """Manage state across a distributed session.

    Attributes are set in ``__init__``:
      * ``config`` -- pytest config; taken from the session at session
        start, or passed / assigned by the caller beforehand
      * ``controller_state`` -- dict filled on the controller
      * ``worker_results`` -- worker id -> list of reports
      * ``session_start_time`` -- ``time.time()`` at session start
    """

    def __init__(self, config=None):
        """Create empty state.

        Args:
            config: optional pytest config. It may be omitted because
                ``pytest_sessionstart`` binds ``session.config``.
        """
        self.config = config
        self.controller_state = {}
        self.worker_results = defaultdict(list)
        self.session_start_time = None

    def pytest_sessionstart(self, session):
        """Record the start time and set up worker or controller state.

        Args:
            session: pytest session object; only ``session.config`` is read
        """
        # Imported locally because the snippet's import header does not
        # bring ``time`` into scope; without it this hook raised NameError.
        import time

        # Bind the config here so that the other methods do not depend on
        # an external ``instance.config = ...`` assignment.
        self.config = session.config
        self.session_start_time = time.time()

        if hasattr(session.config, 'workerinput'):
            # Worker session
            worker_id = session.config.workerinput['workerid']
            self.setup_worker_state(worker_id)
        else:
            # Controller session
            self.setup_controller_state()

    def setup_controller_state(self):
        """Set up controller-specific state.

        NOTE(review): this counts the raw ``tx`` option entries, so a
        multiplied spec such as "4*popen" would count once -- confirm
        this is the intended worker total.
        """
        self.controller_state['total_workers'] = len(
            self.config.getoption('tx') or []
        )
        self.controller_state['active_workers'] = set()

    def setup_worker_state(self, worker_id):
        """Set up worker-specific state. No-op by default.

        Args:
            worker_id: id of the worker this process runs as
        """
        # Worker state is local to each process

    def worker_workerready(self, node):
        """Track worker readiness in the controller.

        Args:
            node: object exposing a ``workerinput`` mapping with ``workerid``
        """
        worker_id = node.workerinput['workerid']
        # setdefault keeps this safe if it runs before setup_controller_state
        self.controller_state.setdefault('active_workers', set()).add(worker_id)

    def pytest_runtest_logreport(self, report):
        """Collect a test report when running inside a worker.

        Args:
            report: report object; stored as-is
        """
        if hasattr(self.config, 'workerinput'):
            # In worker - collect local results
            worker_id = self.config.workerinput['workerid']
            self.worker_results[worker_id].append(report)

# Usage: a single shared instance, created when this module is imported
state_manager = DistributedSessionState()

def pytest_configure(config):
    """Give the shared state manager the pytest config, then register it."""
    setattr(state_manager, "config", config)
    plugin_manager = config.pluginmanager
    plugin_manager.register(state_manager)

Install with Tessl CLI

npx tessl i tessl/pypi-pytest-xdist

docs

distribution-scheduling.md

hook-specifications.md

index.md

loop-on-fail.md

plugin-configuration.md

session-management.md

worker-detection.md

tile.json