CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pytest-xdist

pytest xdist plugin for distributed testing, most importantly across multiple CPUs

Pending
Overview
Eval results
Files

hook-specifications.mddocs/

Hook Specifications

Custom pytest hooks specific to xdist for plugin authors to extend and customize distributed testing behavior.

Capabilities

Node Setup and Lifecycle Hooks

Hooks for managing worker node creation, configuration, and lifecycle.

def pytest_xdist_setupnodes(
    config: pytest.Config,
    specs: Sequence[execnet.XSpec]
) -> None:
    """Run global setup before any remote worker node is created.

    Invoked once, prior to creating and starting the workers, so that
    plugins can prepare session-wide state for distributed execution.

    Args:
        config: pytest configuration object
        specs: execution specifications, one per worker to be spawned
    """

def pytest_xdist_newgateway(gateway: execnet.Gateway) -> None:
    """React to the creation of a new raw execnet gateway.

    Gives plugins a chance to customize a freshly created gateway
    before it is used to communicate with a worker.

    Args:
        gateway: newly created execnet gateway
    """

def pytest_configure_node(node: WorkerController) -> None:
    """Adjust a worker node's configuration before it is instantiated.

    Runs once per worker, before that worker starts executing tests,
    so per-worker customization can be applied.

    Args:
        node: worker controller being configured
    """

def pytest_testnodeready(node: WorkerController) -> None:
    """Signal that a test node is ready to operate.

    Fired once a worker has completed its initialization and can
    start receiving and running tests.

    Args:
        node: worker controller that became ready
    """

def pytest_testnodedown(node: WorkerController, error: object | None) -> None:
    """Signal that a test node has gone down.

    Fired on worker shutdown, whether the shutdown was clean or
    caused by a failure.

    Args:
        node: worker controller that went down
        error: exception that caused shutdown, or None for clean shutdown
    """

Collection and Scheduling Hooks

Hooks for customizing test collection and distribution scheduling.

def pytest_xdist_node_collection_finished(
    node: WorkerController,
    ids: Sequence[str]
) -> None:
    """Notify the controller that a worker has finished collecting.

    Lets plugins inspect or process the collected test IDs before
    any scheduling happens.

    Args:
        node: worker that finished collection
        ids: list of collected test item IDs
    """

def pytest_xdist_make_scheduler(
    config: pytest.Config,
    log: Producer
) -> Scheduling | None:
    """Provide a node scheduler implementation.

    Plugins can return a custom test-distribution scheduler here;
    returning None selects the default scheduler for the active
    distribution mode.

    Args:
        config: pytest configuration object
        log: logging producer for scheduler messages

    Returns:
        Custom scheduler instance, or None to use the default
    """

def pytest_xdist_auto_num_workers(config: pytest.Config) -> int:
    """Decide how many workers to spawn for --numprocesses=auto.

    Plugins implement this to override the automatic worker-count
    detection.

    Args:
        config: pytest configuration object

    Returns:
        Number of workers to spawn

    Raises:
        NotImplementedError: If not implemented by any plugin
    """

Error Handling and Recovery Hooks

Hooks for handling worker crashes and test failures.

def pytest_handlecrashitem(
    crashitem: str,
    report: pytest.TestReport,
    sched: Scheduling
) -> None:
    """Handle a crashitem, modifying the report if necessary.

    Invoked when a test item brings a worker down. The active
    scheduler is passed in so the test can be re-queued if desired.

    Args:
        crashitem: identifier of test that caused crash
        report: test report for the crashed test
        sched: scheduler instance for potential rescheduling

    Example:
        def pytest_handlecrashitem(crashitem, report, sched):
            if should_rerun(crashitem):
                sched.mark_test_pending(crashitem)
                report.outcome = "rerun"
    """

Remote Module Hooks (Advanced)

Hook for customizing remote worker module loading.

def pytest_xdist_getremotemodule() -> Any:
    """Supply the module to run on a remote node at creation time.

    Advanced hook for plugins that need a specialized remote worker
    module implementation.

    Returns:
        Custom remote module, or None to use the default
    """

Deprecated Rsync Hooks

Legacy hooks for rsync functionality (deprecated, will be removed in pytest-xdist 4.0).

def pytest_xdist_rsyncstart(
    source: str | os.PathLike[str],
    gateways: Sequence[execnet.Gateway],
) -> None:
    """Run just before a directory is rsynced to remote gateways.

    DEPRECATED: rsync feature is deprecated and will be removed in pytest-xdist 4.0

    Args:
        source: source directory being rsynced
        gateways: target gateways for rsync
    """

def pytest_xdist_rsyncfinish(
    source: str | os.PathLike[str],
    gateways: Sequence[execnet.Gateway],
) -> None:
    """Run just after a directory has been rsynced to remote gateways.

    DEPRECATED: rsync feature is deprecated and will be removed in pytest-xdist 4.0

    Args:
        source: source directory that was rsynced
        gateways: target gateways for rsync
    """

Usage Examples

Custom Worker Count Detection

# In conftest.py
import pytest
import psutil

def pytest_xdist_auto_num_workers(config):
    """Choose a worker count based on available memory and CPU resources.

    Implements the pytest_xdist_auto_num_workers hook. Returning None
    defers to pytest-xdist's default auto-detection.

    Args:
        config: pytest configuration object; assumes the custom
            --memory-intensive, --io-bound and --cpu-bound options are
            registered elsewhere (e.g. via pytest_addoption).

    Returns:
        Number of workers to spawn, or None to use the default.
    """
    # Check available memory
    memory_gb = psutil.virtual_memory().total / (1024**3)

    # Check if running resource-intensive tests
    if config.getoption("--memory-intensive"):
        # Use fewer workers for memory-heavy tests
        if memory_gb < 8:
            return 2
        if memory_gb < 16:
            return 4
        return 8

    # Physical cores only. psutil.cpu_count can return None when it
    # cannot determine the core count, so guard against multiplying None.
    cpu_count = psutil.cpu_count(logical=False)
    if cpu_count is None:
        return None

    # Custom logic for different scenarios
    if config.getoption("--io-bound"):
        # I/O bound tests spend time waiting, so they can use more workers
        return cpu_count * 2
    if config.getoption("--cpu-bound"):
        # CPU bound tests should match core count
        return cpu_count

    # Default to letting pytest-xdist decide
    return None

Custom Scheduler Implementation

# In conftest.py
import pytest
from xdist.scheduler.protocol import Scheduling
from xdist.workermanage import WorkerController
from typing import Sequence

class PriorityScheduler:
    """Custom scheduler that prioritizes certain tests.

    Illustrative, partial implementation of xdist's Scheduling protocol:
    tests whose id contains "priority" or "critical" are queued ahead of
    all other tests. Only part of the protocol is shown here.
    """
    
    def __init__(self, config, log):
        # config: pytest.Config; log: xdist Producer used for scheduler logging.
        self.config = config
        self.log = log
        self._nodes = []              # workers known to this scheduler
        self._pending_priority = []   # high-priority test ids, FIFO
        self._pending_normal = []     # all remaining test ids, FIFO
        self._collection_complete = False
    
    @property
    def nodes(self) -> list[WorkerController]:
        # Workers currently attached to this scheduler.
        return self._nodes
    
    @property
    def collection_is_completed(self) -> bool:
        # True once at least one node has reported its collection.
        return self._collection_complete
    
    @property
    def tests_finished(self) -> bool:
        # All queues drained: nothing left to distribute.
        return not (self._pending_priority or self._pending_normal)
    
    @property
    def has_pending(self) -> bool:
        # Any test still waiting in either queue.
        return bool(self._pending_priority or self._pending_normal)
    
    def add_node_collection(self, node: WorkerController, collection: Sequence[str]) -> None:
        # Separate high-priority tests (keyword match on the test id).
        for test_id in collection:
            if "priority" in test_id or "critical" in test_id:
                self._pending_priority.append(test_id)
            else:
                self._pending_normal.append(test_id)
        # NOTE(review): flags completion after the FIRST node reports; a
        # production scheduler should wait until every node has collected.
        self._collection_complete = True
    
    def schedule(self) -> None:
        """Schedule priority tests first, then normal tests."""
        # NOTE(review): WorkerController exposes no is_ready()/send_runtest()
        # in current pytest-xdist (cf. send_runtest_some) -- this is
        # illustrative API; confirm against the real worker interface.
        for node in self._nodes:
            if node.is_ready():
                # Schedule priority tests first
                if self._pending_priority:
                    test = self._pending_priority.pop(0)
                    node.send_runtest(test)
                elif self._pending_normal:
                    test = self._pending_normal.pop(0) 
                    node.send_runtest(test)
    
    # Implement other required methods...
    
def pytest_xdist_make_scheduler(config, log):
    """Provide the priority scheduler when --priority-scheduler is set."""
    if not config.getoption("--priority-scheduler"):
        return None
    return PriorityScheduler(config, log)

Node Lifecycle Management

# In conftest.py
import pytest
import time

class NodeLifecycleManager:
    """Monitor worker node lifecycle and report per-worker timing stats.

    Registered as a pytest plugin; each pytest_* method below is a
    hookimpl called by pytest-xdist on the controller.
    """

    def __init__(self):
        # Per-worker stats dicts, keyed by worker id.
        self.node_stats = {}
        # Set when pytest_xdist_setupnodes fires; stays None if this
        # plugin was registered after node setup began.
        self.setup_start_time = None

    def pytest_xdist_setupnodes(self, config, specs):
        """Record the setup start time before any worker is created."""
        self.setup_start_time = time.time()
        print(f"Setting up {len(specs)} worker nodes...")

        # Pre-setup validation
        for i, spec in enumerate(specs):
            print(f"Worker {i}: {spec}")

    def pytest_configure_node(self, node):
        """Configure each worker node and start tracking it."""
        # .get keeps us robust to a missing workerinput entry
        # (matches the access style used in pytest_testnodedown).
        worker_id = node.workerinput.get('workerid', 'unknown')

        # Add custom configuration
        node.config.option.worker_start_time = time.time()

        # Track node
        self.node_stats[worker_id] = {
            'configured_at': time.time(),
            'ready_at': None,
            'finished_at': None,
            'tests_executed': 0,
            'errors': []
        }

        print(f"Configured worker {worker_id}")

    def pytest_testnodeready(self, node):
        """Track when a worker becomes ready to run tests."""
        worker_id = node.workerinput.get('workerid', 'unknown')
        # Only record for workers we actually configured.
        if worker_id in self.node_stats:
            self.node_stats[worker_id]['ready_at'] = time.time()

        # Guard: setup_start_time is None if pytest_xdist_setupnodes
        # never ran (e.g. plugin registered late); avoid a TypeError.
        if self.setup_start_time is not None:
            setup_duration = time.time() - self.setup_start_time
            print(f"Worker {worker_id} ready after {setup_duration:.2f}s")

    def pytest_testnodedown(self, node, error):
        """Handle worker shutdown, clean or crashed."""
        worker_id = node.workerinput.get('workerid', 'unknown')

        if worker_id in self.node_stats:
            self.node_stats[worker_id]['finished_at'] = time.time()

            if error:
                self.node_stats[worker_id]['errors'].append(str(error))
                print(f"Worker {worker_id} crashed: {error}")
            else:
                print(f"Worker {worker_id} finished cleanly")

        self.print_node_summary(worker_id)

    def print_node_summary(self, worker_id):
        """Print runtime/test/error summary for one tracked worker."""
        if worker_id not in self.node_stats:
            return

        stats = self.node_stats[worker_id]
        if stats['ready_at'] and stats['finished_at']:
            runtime = stats['finished_at'] - stats['ready_at']
            print(f"Worker {worker_id} runtime: {runtime:.2f}s, "
                  f"tests: {stats['tests_executed']}, "
                  f"errors: {len(stats['errors'])}")

def pytest_configure(config):
    """Register the lifecycle monitor when running as the xdist controller."""
    if not config.pluginmanager.hasplugin("dsession"):
        return
    config.pluginmanager.register(NodeLifecycleManager())

Crash Recovery Handler

# In conftest.py
import pytest

class CrashRecoveryHandler:
    """Retry tests that crash a worker, up to a fixed number of attempts."""

    def __init__(self):
        # Crash count per test id.
        self.crash_counts = {}
        self.max_retries = 3

    def pytest_handlecrashitem(self, crashitem, report, sched):
        """Reschedule a crashed test, or mark it failed after max retries.

        Args:
            crashitem: id of the test that crashed its worker
            report: test report whose outcome may be adjusted
            sched: active scheduler, used to re-queue the test
        """
        # Track crash frequency
        self.crash_counts[crashitem] = self.crash_counts.get(crashitem, 0) + 1
        attempts = self.crash_counts[crashitem]

        # Retry logic
        if attempts <= self.max_retries:
            print(f"Retrying crashed test {crashitem} "
                  f"(attempt {attempts}/{self.max_retries})")

            # Reschedule the test
            sched.mark_test_pending(crashitem)
            report.outcome = "rerun"
        else:
            print(f"Test {crashitem} failed {self.max_retries} times, marking as crashed")
            # "crashed" is not a valid pytest report outcome
            # (passed/failed/skipped, plus "rerun" under rerun plugins);
            # report a plain failure so downstream reporting stays consistent.
            report.outcome = "failed"

def pytest_configure(config):
    """Register crash recovery when running as the xdist controller."""
    if not config.pluginmanager.hasplugin("dsession"):
        return
    config.pluginmanager.register(CrashRecoveryHandler())

Collection Processing

# In conftest.py
import pytest
from collections import defaultdict

def pytest_xdist_node_collection_finished(node, ids):
    """Summarize per-module test counts when a worker finishes collecting."""
    worker_id = node.workerinput['workerid']

    # Tally collected tests per module (the part before the first '::').
    per_module = defaultdict(int)
    for test_id in ids:
        if '::' in test_id:
            per_module[test_id.split('::')[0]] += 1

    print(f"Worker {worker_id} collected {len(ids)} tests:")
    for module in sorted(per_module):
        print(f"  {module}: {per_module[module]} tests")

    # Collection results could feed load-balancing adjustments here.

Install with Tessl CLI

npx tessl i tessl/pypi-pytest-xdist

docs

distribution-scheduling.md

hook-specifications.md

index.md

loop-on-fail.md

plugin-configuration.md

session-management.md

worker-detection.md

tile.json