CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-accelerate

HuggingFace Accelerate is a PyTorch library that simplifies distributed and mixed-precision training by abstracting away the boilerplate code needed for multi-GPU, TPU, and mixed-precision setups.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

distributed-operations.mddocs/

Distributed Operations

Low-level distributed communication primitives for gathering, broadcasting, reducing, and synchronizing data across processes. These functions provide the building blocks for distributed training and inference operations.

Capabilities

Basic Communication Primitives

Core distributed operations for communicating tensors and data between processes.

def broadcast(tensor: torch.Tensor, from_process: int = 0):
    """
    Broadcast tensor from one process to all other processes.
    
    Parameters:
    - tensor: Tensor to broadcast (modified in-place on receiving processes)
    - from_process: Source process rank (default: 0)
    """

def gather(tensor: torch.Tensor):
    """
    Gather tensors from all processes to the main process.
    
    Parameters:
    - tensor: Tensor to gather from current process
    
    Returns:
    Concatenated tensor from all processes (only on main process, None elsewhere)
    """

def reduce(tensor: torch.Tensor, reduction: str = "mean"):
    """
    Reduce tensor across all processes using specified operation.
    
    Parameters:
    - tensor: Tensor to reduce (modified in-place)
    - reduction: Reduction operation ("mean", "sum")
    
    Returns:
    Reduced tensor (same shape as input)
    """

def pad_across_processes(
    tensor: torch.Tensor,
    dim: int = 0,
    pad_index: int = 0,
    pad_first: bool = False
):
    """
    Pad tensor to same size across all processes.
    
    Useful for gathering tensors of different sizes by padding
    smaller tensors to match the largest tensor size.
    
    Parameters:
    - tensor: Tensor to pad
    - dim: Dimension along which to pad
    - pad_index: Value to use for padding
    - pad_first: Whether to pad at beginning or end
    
    Returns:
    Padded tensor with same size across all processes
    """

Object Communication

Functions for communicating arbitrary Python objects between processes.

def broadcast_object_list(
    objects: list,
    from_process: int = 0
):
    """
    Broadcast list of Python objects from one process to all others.
    
    Parameters:
    - objects: List of objects to broadcast (modified in-place on receiving processes)
    - from_process: Source process rank
    """

def gather_object(obj):
    """
    Gather Python objects from all processes.
    
    Parameters:
    - obj: Object to gather from current process
    
    Returns:
    List of objects from all processes (only on main process, None elsewhere)
    """

Advanced Tensor Operations

Higher-level operations for tensor manipulation in distributed settings.

def concatenate(data, dim: int = 0):
    """
    Concatenate tensors or nested data structures along specified dimension.
    
    Handles complex nested structures including lists, tuples, and dictionaries
    containing tensors or other concatenatable objects.
    
    Parameters:
    - data: Data structure containing tensors to concatenate
    - dim: Dimension along which to concatenate
    
    Returns:
    Concatenated data structure with same nesting as input
    """

def slice_tensors(data, tensor_slice: slice | int):
    """
    Slice tensors in nested data structures.
    
    Applies the same slice operation to all tensors found in nested
    lists, tuples, and dictionaries.
    
    Parameters:
    - data: Nested data structure containing tensors
    - tensor_slice: Slice object or integer index to apply
    
    Returns:
    Sliced data structure maintaining original nesting
    """

def send_to_device(
    tensor: torch.Tensor,
    device: torch.device | str,
    non_blocking: bool = False,
    skip_keys: list[str] | str | None = None
):
    """
    Move tensor or nested data structure to specified device.
    
    Recursively moves all tensors in nested structures while preserving
    the original data organization.
    
    Parameters:
    - tensor: Tensor or nested structure to move
    - device: Target device
    - non_blocking: Whether to use non-blocking transfer
    - skip_keys: Keys to skip when moving nested dictionaries
    
    Returns:
    Data moved to target device
    """

Data Structure Utilities

Functions for analyzing and manipulating tensor data structures.

def find_batch_size(data):
    """
    Find batch size from tensor or nested data structure.
    
    Searches through nested structures to find the first tensor
    and returns its size along dimension 0 (batch dimension).
    
    Parameters:
    - data: Tensor or nested structure containing tensors
    
    Returns:
    Batch size (int) or None if no tensors found
    """

def find_device(*args):
    """
    Find device from tensor arguments.
    
    Searches through arguments to find the first tensor and
    returns its device.
    
    Parameters:
    - *args: Arguments that may contain tensors
    
    Returns:
    torch.device of first tensor found, or None
    """

def get_data_structure(data):
    """
    Analyze nested data structure containing tensors.
    
    Returns metadata about the structure including tensor shapes,
    devices, and nesting patterns.
    
    Parameters:
    - data: Nested data structure to analyze
    
    Returns:
    DataStructure object describing the input
    """

def is_torch_tensor(data):
    """
    Check if data is a PyTorch tensor.
    
    Parameters:
    - data: Object to check
    
    Returns:
    Boolean indicating if data is a torch.Tensor
    """

def is_tensor_information(data):
    """
    Check if data contains tensor metadata information.
    
    Parameters:
    - data: Object to check
    
    Returns:
    Boolean indicating if data is TensorInformation
    """

Process Synchronization

Functions for coordinating execution across distributed processes.

def wait_for_everyone():
    """
    Synchronization barrier - all processes wait until everyone reaches this point.
    
    Ensures all processes are synchronized before continuing execution.
    Essential for coordinating distributed operations.
    """

def synchronize_rng_states(rng_types: list[str] | None = None):
    """
    Synchronize random number generator states across all processes.
    
    Ensures reproducible results in distributed training by making
    all processes use the same random state.
    
    Parameters:
    - rng_types: Types of RNG to synchronize ("torch", "cuda", "xla")
                 If None, synchronizes all available types
    """

def set_seed(seed: int, device_specific: bool = False):
    """
    Set random seed across all processes and libraries.
    
    Sets seeds for PyTorch, NumPy, Python random, and other libraries
    to ensure reproducible results.
    
    Parameters:
    - seed: Random seed value
    - device_specific: Whether to use device-specific seeding
    """

Context Managers

Context managers for controlling distributed behavior during specific operations.

class GatheredParameters:
    """
    Context manager for gathering distributed parameters.
    
    Temporarily gathers sharded parameters from all processes,
    enabling operations that require the full parameter tensor.
    """
    
    def __init__(self, *models, modifier_rank: int | None = None):
        """
        Initialize parameter gathering context.
        
        Parameters:
        - *models: Models with parameters to gather
        - modifier_rank: Process rank that can modify parameters
        """

Precision Conversion

Functions for converting tensor precision in distributed settings.

def convert_to_fp32(tensor: torch.Tensor):
    """
    Convert tensor to FP32 precision.
    
    Parameters:
    - tensor: Tensor to convert
    
    Returns:
    Tensor converted to torch.float32
    """

def convert_outputs_to_fp32(data):
    """
    Convert nested data structure outputs to FP32.
    
    Recursively converts all tensors in nested structures to FP32,
    useful for metric computation and logging.
    
    Parameters:
    - data: Nested structure containing tensors
    
    Returns:
    Data structure with all tensors converted to FP32
    """

def honor_type(obj, generator):
    """
    Ensure generated object maintains same type hierarchy as original.
    
    Parameters:
    - obj: Original object to match type of
    - generator: Generator producing new values
    
    Returns:
    Object of same type as obj with values from generator
    """

Usage Examples

Basic Distributed Communication

from accelerate import broadcast, gather, reduce
import torch

# Initialize distributed training first
accelerator = Accelerator()

# Broadcast tensor from main process to all processes
if accelerator.is_main_process:
    data = torch.randn(10, 20)
else:
    data = torch.zeros(10, 20)

broadcast(data, from_process=0)  # Now all processes have the same data

# Gather results from all processes
local_result = model(local_batch)
all_results = gather(local_result)  # Only main process gets concatenated results

# Reduce loss across processes
loss = compute_loss(outputs, targets)
average_loss = reduce(loss, reduction="mean")

Handling Variable-Size Batches

from accelerate import pad_across_processes, gather

# When batch sizes differ across processes
predictions = model(batch)  # Different sizes on each process

# Pad to same size before gathering
padded_predictions = pad_across_processes(predictions, dim=0, pad_index=-100)
all_predictions = gather(padded_predictions)

# Remove padding after gathering (on main process)
if accelerator.is_main_process:
    # Remove padded values
    valid_predictions = all_predictions[all_predictions != -100]

Complex Data Structure Communication

from accelerate import broadcast_object_list, gather_object

# Broadcast complex configuration
if accelerator.is_main_process:
    config = {
        "model_settings": {"layers": 12, "hidden_size": 768},
        "training_params": [0.001, 0.9, 0.999],
        "metadata": {"experiment_name": "test_run", "version": "1.0"}
    }
else:
    config = None

broadcast_object_list([config])
config = config[0]  # Extract from list

# Gather evaluation results
eval_metrics = {"accuracy": 0.95, "f1": 0.93}
all_metrics = gather_object(eval_metrics)

if accelerator.is_main_process:
    # all_metrics is list of metrics from each process
    avg_accuracy = sum(m["accuracy"] for m in all_metrics) / len(all_metrics)

Advanced Tensor Manipulation

from accelerate import concatenate, slice_tensors, send_to_device

# Work with nested data structures
batch = {
    "input_ids": torch.tensor([[1, 2, 3], [4, 5, 6]]),
    "attention_mask": torch.tensor([[1, 1, 1], [1, 1, 0]]),
    "labels": torch.tensor([0, 1])
}

# Move entire structure to GPU
batch_gpu = send_to_device(batch, "cuda:0")

# Slice first sample from nested structure
first_sample = slice_tensors(batch, 0)

# Concatenate batches from multiple sources
batches = [batch1, batch2, batch3]
combined_batch = concatenate(batches, dim=0)

Process Synchronization and Reproducibility

from accelerate import wait_for_everyone, set_seed, synchronize_rng_states

# Set reproducible seeds
set_seed(42, device_specific=True)

# Synchronize RNG states across processes
synchronize_rng_states(["torch", "cuda"])

# Coordinate processes for sequential operations
if accelerator.is_main_process:
    # Download and prepare dataset
    dataset = download_and_preprocess()

wait_for_everyone()  # Wait for main process to finish

# Now all processes can safely access the dataset
dataloader = DataLoader(dataset, batch_size=32)

Install with Tessl CLI

npx tessl i tessl/pypi-accelerate

docs

big-modeling.md

cli-commands.md

configuration.md

core-training.md

distributed-operations.md

index.md

utilities.md

tile.json