A lightweight library to help with training neural networks in PyTorch.
Comprehensive distributed computing support with multiple backends including native PyTorch DDP, Horovod, and XLA/TPU support. PyTorch Ignite provides a unified API for distributed training across different platforms and scales.
Functions for initializing and configuring distributed training backends.
def initialize(backend=None, **kwargs):
    """
    Initialize distributed backend.
    Parameters:
    - backend: backend type ('nccl', 'gloo', 'mpi', 'horovod', 'xla-tpu')
    - **kwargs: backend-specific arguments
    Supported kwargs:
    - For 'nccl'/'gloo': init_method, rank, world_size, timeout
    - For 'horovod': no additional arguments
    - For 'xla-tpu': no additional arguments
    """

def finalize():
    """Finalize distributed backend and cleanup resources."""

def show_config():
    """Show current distributed configuration."""
Core distributed communication primitives for data synchronization.

def sync(group=None):
    """
    Synchronize across all processes.
    Parameters:
    - group: process group (optional)
    """

def barrier(group=None):
    """
    Synchronization barrier across all processes.
    Parameters:
    - group: process group (optional)
    """

def broadcast(tensor, src=0, group=None):
    """
    Broadcast tensor from source to all processes.
    Parameters:
    - tensor: tensor to broadcast
    - src: source rank
    - group: process group (optional)
    Returns:
    Broadcasted tensor
    """

def all_reduce(tensor, group=None, op='SUM'):
    """
    All-reduce operation across all processes.
    Parameters:
    - tensor: tensor to reduce
    - group: process group (optional)
    - op: reduction operation ('SUM', 'PRODUCT', 'MIN', 'MAX')
    Returns:
    Reduced tensor
    """

def all_gather(tensor, group=None):
    """
    All-gather operation across all processes.
    Parameters:
    - tensor: tensor to gather
    - group: process group (optional)
    Returns:
    List of tensors from all processes
    """
Functions for querying distributed environment information.

def backend():
    """
    Get current distributed backend name.
    Returns:
    String name of current backend ('nccl', 'gloo', 'horovod', 'xla-tpu', None)
    """

def available_backends():
    """
    Get list of available distributed backends.
    Returns:
    List of available backend names
    """

def model_name():
    """
    Get distributed model name.
    Returns:
    String name of distributed model
    """

def get_rank():
    """
    Get current process rank.
    Returns:
    Integer rank of current process (0 for single process)
    """

def get_local_rank():
    """
    Get local process rank within node.
    Returns:
    Integer local rank of current process
    """

def get_world_size():
    """
    Get total number of processes.
    Returns:
    Integer total number of processes (1 for single process)
    """
Functions for spawning and managing distributed processes.

def spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn'):
    """
    Spawn distributed processes.
    Parameters:
    - fn: function to run in each process
    - args: arguments to pass to function
    - nprocs: number of processes to spawn
    - join: whether to join processes
    - daemon: whether processes are daemons
    - start_method: process start method
    Returns:
    Process handles if join=False
    """

class Parallel:
    """
    Parallel execution launcher for distributed training.
    Parameters:
    - backend: distributed backend to use
    - nprocs: number of processes
    - **kwargs: additional backend arguments
    """
    def __init__(self, backend=None, nprocs=None, **kwargs): ...

    def run(self, fn, *args, **kwargs):
        """Run function in parallel across processes."""
Auto-configuration utilities for distributed data loading and model setup.

def auto_dataloader(dataloader, **kwargs):
    """
    Auto-configure dataloader for distributed training.
    Parameters:
    - dataloader: original dataloader
    - **kwargs: additional arguments for DistributedSampler
    Returns:
    Configured dataloader with distributed sampler
    """

def auto_model(model, sync_bn=False, **kwargs):
    """
    Auto-configure model for distributed training.
    Parameters:
    - model: PyTorch model
    - sync_bn: whether to use synchronized batch normalization
    - **kwargs: additional arguments for DistributedDataParallel
    Returns:
    Wrapped model for distributed training
    """

def auto_optim(optimizer, **kwargs):
    """
    Auto-configure optimizer for distributed training.
    Parameters:
    - optimizer: PyTorch optimizer
    - **kwargs: additional arguments
    Returns:
    Configured optimizer (or Horovod DistributedOptimizer)
    """
Functions for checking distributed backend capabilities.

def has_native_dist_support():
    """
    Check if native PyTorch distributed support is available.
    Returns:
    Boolean indicating availability
    """

def has_hvd_support():
    """
    Check if Horovod support is available.
    Returns:
    Boolean indicating availability
    """

def has_xla_support():
    """
    Check if XLA/TPU support is available.
    Returns:
    Boolean indicating availability
    """
Convenience utilities for distributed training workflows.

def one_rank_only(rank=0, with_barrier=True):
    """
    Decorator to execute function on single rank only.
    Parameters:
    - rank: rank to execute on (default: 0)
    - with_barrier: whether to add barrier after execution
    Returns:
    Decorator function
    """
import ignite.distributed as idist
from ignite.engine import create_supervised_trainer

# Basic workflow (model, optimizer, criterion and train_loader are assumed
# to be defined elsewhere)

# Initialize distributed backend
idist.initialize()

# Auto-configure model, optimizer, and dataloader
model = idist.auto_model(model)
optimizer = idist.auto_optim(optimizer)
train_loader = idist.auto_dataloader(train_loader)

# Create trainer
trainer = create_supervised_trainer(model, optimizer, criterion)

# Run training
trainer.run(train_loader, max_epochs=100)

# Finalize
idist.finalize()
import torch
import torch.multiprocessing as mp
import ignite.distributed as idist
from ignite.engine import Events, create_supervised_trainer

# Multi-GPU training driven by torch.multiprocessing: each spawned process
# initializes the NCCL backend itself (create_model, create_dataloader and
# criterion are assumed to be defined elsewhere)
def training(local_rank, config):
    # Initialize distributed backend
    idist.initialize("nccl")

    # Setup model and data
    model = create_model()
    model = idist.auto_model(model)

    train_loader = create_dataloader()
    train_loader = idist.auto_dataloader(train_loader)

    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    optimizer = idist.auto_optim(optimizer)

    # Create trainer
    trainer = create_supervised_trainer(model, optimizer, criterion)

    # Add logging only on rank 0
    @trainer.on(Events.ITERATION_COMPLETED(every=100))
    @idist.one_rank_only()
    def log_training(engine):
        print(f"Rank {idist.get_rank()}: Iteration {engine.state.iteration}, Loss: {engine.state.output}")

    # Run training
    trainer.run(train_loader, max_epochs=10)

    # Finalize
    idist.finalize()

if __name__ == "__main__":
    config = {}
    nprocs = torch.cuda.device_count()
    mp.spawn(training, args=(config,), nprocs=nprocs)
import torch
import ignite.distributed as idist
from ignite.engine import create_supervised_trainer

# Horovod backend (model, optimizer, criterion and train_loader are assumed
# to be defined elsewhere)

# Initialize Horovod backend
idist.initialize("horovod")

# Auto-configure components
model = idist.auto_model(model)
optimizer = idist.auto_optim(optimizer)  # Wraps with Horovod DistributedOptimizer
train_loader = idist.auto_dataloader(train_loader)

# Dummy all-reduce to ensure the backend is initialized on all workers
idist.all_reduce(torch.tensor(0.0))

# Create trainer
trainer = create_supervised_trainer(model, optimizer, criterion)

# Run training
trainer.run(train_loader, max_epochs=100)

# Finalize
idist.finalize()
import ignite.distributed as idist
from ignite.engine import create_supervised_trainer

# XLA/TPU backend (model, optimizer, criterion and train_loader are assumed
# to be defined elsewhere)

# Initialize XLA backend
idist.initialize("xla-tpu")

# Auto-configure for TPU
model = idist.auto_model(model)
optimizer = idist.auto_optim(optimizer)
train_loader = idist.auto_dataloader(train_loader)

# Create trainer
trainer = create_supervised_trainer(model, optimizer, criterion)

# Run training
trainer.run(train_loader, max_epochs=100)

# Finalize
idist.finalize()
import torch
import ignite.distributed as idist

# Manual use of the collective primitives (model and loss_value are assumed
# to be defined elsewhere)

# Check if distributed
if idist.get_world_size() > 1:
    # Broadcast model parameters from rank 0
    for param in model.parameters():
        idist.broadcast(param.data, src=0)

    # All-reduce gradients and average them over the world size
    for param in model.parameters():
        if param.grad is not None:
            idist.all_reduce(param.grad.data)
            param.grad.data /= idist.get_world_size()

# Gather loss from all processes
local_loss = torch.tensor(loss_value)
all_losses = idist.all_gather(local_loss)
avg_loss = torch.mean(torch.stack(all_losses))
import ignite.distributed as idist

# Spawning worker processes explicitly with idist.spawn
def train_worker(rank, world_size, config):
    # Worker function for each process
    print(f"Worker {rank} of {world_size} started")

    # Initialize backend within worker
    idist.initialize()

    # Training code here
    # ...

    idist.finalize()

# Spawn workers
config = {}
idist.spawn(
    train_worker,
    args=(4, config),  # world_size, config
    nprocs=4,
    join=True
)
import torch
import ignite.distributed as idist

# Rank-restricted execution (model and metrics are assumed to be defined elsewhere)

# Execute only on rank 0
@idist.one_rank_only(rank=0)
def save_checkpoint():
    torch.save(model.state_dict(), 'checkpoint.pth')

# Execute on rank 0 with barrier
@idist.one_rank_only(rank=0, with_barrier=True)
def log_metrics():
    print(f"Epoch completed, metrics: {metrics}")

# Manual rank checking
if idist.get_rank() == 0:
    print("This runs only on the master process")

Install with Tessl CLI
npx tessl i tessl/pypi-pytorch-ignite