Distributed Training

Comprehensive distributed computing support across multiple backends, including native PyTorch DDP, Horovod, and XLA/TPU. PyTorch Ignite provides a unified API for distributed training across different platforms and scales.

Capabilities

Initialization and Setup

Functions for initializing and configuring distributed training backends.

def initialize(backend=None, **kwargs):
    """
    Initialize distributed backend.
    
    Parameters:
    - backend: backend type ('nccl', 'gloo', 'mpi', 'horovod', 'xla-tpu')
    - **kwargs: backend-specific arguments
    
    Supported kwargs:
    - For 'nccl'/'gloo'/'mpi': init_method, rank, world_size, timeout
      (forwarded to torch.distributed.init_process_group)
    - For 'horovod': no additional arguments
    - For 'xla-tpu': no additional arguments
    """

def finalize():
    """Finalize distributed backend and cleanup resources."""

def show_config():
    """Show current distributed configuration."""

Communication Utilities

Core distributed communication primitives for data synchronization.

def sync(temporary=False):
    """
    Force this module to re-synchronize with the current distributed context,
    e.g. when the context was created outside of ignite.
    
    Parameters:
    - temporary: if True, the synchronized configuration is not kept for later use
    """

def barrier():
    """Synchronization barrier: block until all processes reach this call."""

def broadcast(tensor, src=0, safe_mode=False):
    """
    Broadcast tensor from source rank to all processes.
    
    Parameters:
    - tensor: tensor to broadcast
    - src: source rank
    - safe_mode: if True, non-source ranks may pass None as tensor
    
    Returns:
    Broadcasted tensor
    """

def all_reduce(tensor, op='SUM', group=None):
    """
    All-reduce operation across all processes.
    
    Parameters:
    - tensor: tensor to reduce
    - op: reduction operation ('SUM', 'PRODUCT', 'MIN', 'MAX', 'AND', 'OR')
    - group: process group (optional)
    
    Returns:
    Reduced tensor
    """

def all_gather(tensor, group=None):
    """
    All-gather operation across all processes.
    
    Parameters:
    - tensor: tensor, number, or string to gather
    - group: process group (optional)
    
    Returns:
    Tensor with the values from all processes concatenated along the first
    dimension (a list when the input is a string)
    """

Information Queries

Functions for querying distributed environment information.

def backend():
    """
    Get current distributed backend name.
    
    Returns:
    String name of the current backend ('nccl', 'gloo', 'mpi', 'horovod',
    'xla-tpu'), or None if no distributed configuration is set up
    """

def available_backends():
    """
    Get list of available distributed backends.
    
    Returns:
    List of available backend names
    """

def model_name():
    """
    Get the name of the current computation model.
    
    Returns:
    String name of the computation model ('serial', 'native-dist',
    'horovod-dist', or 'xla-dist')
    """

def get_rank():
    """
    Get current process rank.
    
    Returns:
    Integer rank of current process (0 for single process)
    """

def get_local_rank():
    """
    Get local process rank within node.
    
    Returns:
    Integer local rank of current process
    """

def get_world_size():
    """
    Get total number of processes.
    
    Returns:
    Integer total number of processes (1 for single process)
    """

Process Management

Functions for spawning and managing distributed processes.

def spawn(backend, fn, args=(), kwargs_dict=None, nproc_per_node=1, **kwargs):
    """
    Spawn nproc_per_node processes and initialize the distributed backend in each.
    
    Parameters:
    - backend: backend to initialize ('nccl', 'gloo', 'mpi', 'horovod', 'xla-tpu')
    - fn: function called as fn(local_rank, *args, **kwargs_dict) in each process
    - args: positional arguments passed to fn
    - kwargs_dict: keyword arguments passed to fn
    - nproc_per_node: number of processes to spawn per node
    - **kwargs: additional spawner options (e.g. nnodes, node_rank, start_method)
    """

class Parallel:
    """
    Distributed launcher context manager for running a function across processes.
    
    Parameters:
    - backend: distributed backend to use (None runs the function serially)
    - nproc_per_node: number of processes to spawn per node
    - **kwargs: additional launcher arguments (e.g. nnodes, node_rank,
      master_addr, master_port)
    """
    def __init__(self, backend=None, nproc_per_node=None, **kwargs): ...
    
    def run(self, func, *args, **kwargs):
        """Run func(local_rank, *args, **kwargs) in parallel across processes."""

Data Handling

Auto-configuration utilities for distributed data loading and model setup.

def auto_dataloader(dataset, **kwargs):
    """
    Create a DataLoader adapted to the current distributed configuration.
    
    Parameters:
    - dataset: PyTorch dataset
    - **kwargs: keyword arguments for torch.utils.data.DataLoader
    
    Returns:
    DataLoader with a DistributedSampler attached and batch_size/num_workers
    adapted to the current world size
    """

def auto_model(model, sync_bn=False, **kwargs):
    """
    Auto-configure model for distributed training.
    
    Parameters:
    - model: PyTorch model
    - sync_bn: whether to use synchronized batch normalization
    - **kwargs: additional arguments for DistributedDataParallel
    
    Returns:
    Wrapped model for distributed training
    """

def auto_optim(optimizer, **kwargs):
    """
    Auto-configure optimizer for distributed training.
    
    Parameters:
    - optimizer: PyTorch optimizer
    - **kwargs: additional arguments
    
    Returns:
    Configured optimizer (or Horovod DistributedOptimizer)
    """

Capability Detection

Module-level boolean flags for checking which distributed backends are available.

has_native_dist_support: bool
"""True if native PyTorch distributed (torch.distributed) is available."""

has_hvd_support: bool
"""True if Horovod support is available."""

has_xla_support: bool
"""True if XLA/TPU (torch_xla) support is available."""

Utilities

Convenience utilities for distributed training workflows.

def one_rank_only(rank=0, with_barrier=False):
    """
    Decorator to execute a function on a single rank only.
    
    Parameters:
    - rank: rank to execute on (default: 0)
    - with_barrier: whether to synchronize all processes with a barrier
      after execution (default: False)
    
    Returns:
    Decorator function
    """

Usage Examples

Basic Distributed Training Setup

import torch
import ignite.distributed as idist
from ignite.engine import create_supervised_trainer

# Initialize distributed backend (e.g. when launched with torchrun)
idist.initialize(backend="nccl")

# Auto-configure model, optimizer, and dataloader
# (model, criterion, and train_dataset are defined by the user)
model = idist.auto_model(model)
optimizer = idist.auto_optim(torch.optim.SGD(model.parameters(), lr=0.01))
train_loader = idist.auto_dataloader(train_dataset, batch_size=32, shuffle=True)

# Create trainer
trainer = create_supervised_trainer(model, optimizer, criterion)

# Run training
trainer.run(train_loader, max_epochs=100)

# Finalize
idist.finalize()

Multi-GPU Training with DDP

import torch
import torch.multiprocessing as mp
import ignite.distributed as idist
from ignite.engine import Events, create_supervised_trainer

def training(local_rank, config):
    # Initialize distributed backend; rank and world size are forwarded
    # to torch.distributed.init_process_group
    idist.initialize("nccl", init_method="tcp://localhost:23456",
                     rank=local_rank, world_size=config["world_size"])
    
    # Setup model and data (create_model, create_dataset, and criterion
    # are user-defined)
    model = idist.auto_model(create_model())
    
    train_loader = idist.auto_dataloader(create_dataset(), batch_size=32)
    
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    optimizer = idist.auto_optim(optimizer)
    
    # Create trainer
    trainer = create_supervised_trainer(model, optimizer, criterion)
    
    # Add logging only on rank 0
    @trainer.on(Events.ITERATION_COMPLETED(every=100))
    @idist.one_rank_only()
    def log_training(engine):
        print(f"Rank {idist.get_rank()}: Iteration {engine.state.iteration}, Loss: {engine.state.output}")
    
    # Run training
    trainer.run(train_loader, max_epochs=10)
    
    # Finalize
    idist.finalize()

if __name__ == "__main__":
    nprocs = torch.cuda.device_count()
    config = {"world_size": nprocs}
    mp.spawn(training, args=(config,), nprocs=nprocs)

Horovod Training

import ignite.distributed as idist
from ignite.engine import create_supervised_trainer

# Initialize Horovod backend (run the script with horovodrun)
idist.initialize("horovod")

# Auto-configure components (model, optimizer, criterion, and train_dataset
# are defined by the user); auto_model broadcasts initial parameters from rank 0
model = idist.auto_model(model)
optimizer = idist.auto_optim(optimizer)  # Wraps with Horovod DistributedOptimizer
train_loader = idist.auto_dataloader(train_dataset, batch_size=32)

# Create trainer
trainer = create_supervised_trainer(model, optimizer, criterion)

# Run training
trainer.run(train_loader, max_epochs=100)

# Finalize
idist.finalize()

XLA/TPU Training

import ignite.distributed as idist
from ignite.engine import create_supervised_trainer

# Initialize XLA backend
idist.initialize("xla-tpu")

# Auto-configure for TPU (model, optimizer, criterion, and train_dataset
# are defined by the user)
model = idist.auto_model(model)
optimizer = idist.auto_optim(optimizer)
train_loader = idist.auto_dataloader(train_dataset, batch_size=32)

# Create trainer
trainer = create_supervised_trainer(model, optimizer, criterion)

# Run training
trainer.run(train_loader, max_epochs=100)

# Finalize
idist.finalize()

Custom Communication

import ignite.distributed as idist
import torch

# Check if running distributed
if idist.get_world_size() > 1:
    # Broadcast model parameters from rank 0
    for param in model.parameters():
        idist.broadcast(param.data, src=0)
    
    # All-reduce gradients and average them
    for param in model.parameters():
        if param.grad is not None:
            idist.all_reduce(param.grad.data)
            param.grad.data /= idist.get_world_size()
    
    # Gather loss values from all processes into a single tensor
    local_loss = torch.tensor(loss_value)
    all_losses = idist.all_gather(local_loss)  # shape: (world_size,)
    avg_loss = all_losses.mean()

Process Management with Spawn

import ignite.distributed as idist

def train_worker(local_rank, world_size, config):
    # idist.spawn initializes the backend before calling this function,
    # so no explicit initialize()/finalize() is needed here
    print(f"Worker {idist.get_rank()} of {world_size} started")
    
    # Training code here
    # ...

# Spawn workers with the 'gloo' backend
config = {}
idist.spawn(
    "gloo",
    train_worker,
    args=(4, config),  # world_size, config
    nproc_per_node=4,
)

Conditional Execution

import torch
import ignite.distributed as idist

# Execute only on rank 0
@idist.one_rank_only(rank=0)
def save_checkpoint():
    torch.save(model.state_dict(), 'checkpoint.pth')

# Execute on rank 0, then synchronize all processes with a barrier
@idist.one_rank_only(rank=0, with_barrier=True)
def log_metrics():
    print(f"Epoch completed, metrics: {metrics}")

# Manual rank checking
if idist.get_rank() == 0:
    print("This runs only on the master process")
