CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-torch

Deep learning framework providing tensor computation with GPU acceleration and dynamic neural networks with automatic differentiation

Overview
Eval results
Files

devices-distributed.mddocs/

Device and Distributed Computing

Device management, CUDA operations, distributed training, and multi-GPU support for scaling deep learning workloads across different hardware platforms including CPU, CUDA, MPS, and XPU.

Capabilities

Device Management

Core device detection, selection, and management functions.

class device:
    """Device specification for tensor placement."""
    def __init__(self, device_string: str): ...
    def __str__(self) -> str: ...
    def __repr__(self) -> str: ...

def get_default_device() -> device:
    """Get the default device for new tensors."""

def set_default_device(device) -> None:
    """Set the default device for new tensors."""

def get_device(tensor_or_device) -> device:
    """Get device of tensor or validate device specification."""

CUDA Operations (torch.cuda)

CUDA device management and GPU acceleration functions.

def cuda.is_available() -> bool:
    """Check if CUDA is available."""

def cuda.device_count() -> int:
    """Number of available CUDA devices."""

def cuda.get_device_name(device=None) -> str:
    """Get name of CUDA device."""

def cuda.get_device_properties(device) -> _CudaDeviceProperties:
    """Get properties of CUDA device."""

def cuda.get_device_capability(device=None) -> Tuple[int, int]:
    """Get compute capability of device."""

def cuda.current_device() -> int:
    """Get current CUDA device index."""

def cuda.set_device(device) -> None:
    """Set current CUDA device."""

def cuda.device(device) -> ContextManager:
    """Context manager for device selection."""

def cuda.stream(stream=None) -> ContextManager:
    """Context manager for CUDA stream selection."""

def cuda.synchronize(device=None) -> None:
    """Synchronize all kernels on device."""

def cuda.is_initialized() -> bool:
    """Check if CUDA is initialized."""

def cuda.init() -> None:
    """Initialize CUDA."""

CUDA Memory Management

GPU memory allocation, caching, and profiling.

def cuda.empty_cache() -> None:
    """Free unused cached memory."""

def cuda.memory_allocated(device=None) -> int:
    """Get currently allocated memory in bytes."""

def cuda.max_memory_allocated(device=None) -> int:
    """Get peak allocated memory in bytes."""

def cuda.memory_reserved(device=None) -> int:
    """Get currently reserved memory in bytes."""

def cuda.max_memory_reserved(device=None) -> int:
    """Get peak reserved memory in bytes."""

def cuda.memory_cached(device=None) -> int:
    """Get currently cached memory in bytes (deprecated; use memory_reserved)."""

def cuda.max_memory_cached(device=None) -> int:
    """Get peak cached memory in bytes (deprecated; use max_memory_reserved)."""

def cuda.reset_max_memory_allocated(device=None) -> None:
    """Reset peak memory stats."""

def cuda.reset_max_memory_cached(device=None) -> None:
    """Reset peak cache stats."""

def cuda.memory_stats(device=None) -> Dict[str, Any]:
    """Get comprehensive memory statistics."""

def cuda.memory_summary(device=None, abbreviated=False) -> str:
    """Get human-readable memory summary."""

def cuda.memory_snapshot() -> List[Dict[str, Any]]:
    """Get detailed memory snapshot."""

def cuda.set_per_process_memory_fraction(fraction: float, device=None) -> None:
    """Set memory fraction for process."""

def cuda.get_per_process_memory_fraction(device=None) -> float:
    """Get memory fraction for process."""

CUDA Streams and Events

Asynchronous execution control for GPU operations.

class cuda.Stream:
    """CUDA stream for asynchronous operations."""
    def __init__(self, device=None, priority=0): ...
    def wait_event(self, event): ...
    def wait_stream(self, stream): ...
    def record_event(self, event=None): ...
    def query(self) -> bool: ...
    def synchronize(self): ...

class cuda.Event:
    """CUDA event for synchronization."""
    def __init__(self, enable_timing=False, blocking=False, interprocess=False): ...
    def record(self, stream=None): ...
    def wait(self, stream=None): ...
    def query(self) -> bool: ...
    def synchronize(self): ...
    def elapsed_time(self, event) -> float: ...

def cuda.current_stream(device=None) -> cuda.Stream:
    """Get current CUDA stream."""

def cuda.default_stream(device=None) -> cuda.Stream:
    """Get default CUDA stream."""

def cuda.set_stream(stream) -> None:
    """Set current CUDA stream."""

CUDA Random Number Generation

GPU random number generation functions.

def cuda.manual_seed(seed: int) -> None:
    """Set CUDA random seed."""

def cuda.manual_seed_all(seed: int) -> None:
    """Set CUDA random seed for all devices."""

def cuda.seed() -> None:
    """Generate random CUDA seed."""

def cuda.seed_all() -> None:
    """Generate random CUDA seed for all devices."""

def cuda.initial_seed() -> int:
    """Get initial CUDA random seed."""

def cuda.get_rng_state(device='cuda') -> Tensor:
    """Get CUDA random number generator state."""

def cuda.get_rng_state_all() -> List[Tensor]:
    """Get CUDA RNG state for all devices."""

def cuda.set_rng_state(new_state: Tensor, device='cuda') -> None:
    """Set CUDA random number generator state."""

def cuda.set_rng_state_all(new_states: List[Tensor]) -> None:
    """Set CUDA RNG state for all devices."""

MPS Operations (torch.mps)

Metal Performance Shaders for Apple Silicon GPU acceleration.

def mps.is_available() -> bool:
    """Check if MPS is available."""

def mps.is_built() -> bool:
    """Check if PyTorch was built with MPS support."""

def mps.get_default_generator() -> Generator:
    """Get default MPS random number generator."""

def mps.manual_seed(seed: int) -> None:
    """Set MPS random seed."""

def mps.seed() -> None:
    """Generate random MPS seed."""

def mps.synchronize() -> None:
    """Synchronize MPS operations."""

def mps.empty_cache() -> None:
    """Free unused MPS memory."""

def mps.set_per_process_memory_fraction(fraction: float) -> None:
    """Set MPS memory fraction."""

class mps.Event:
    """MPS event for synchronization."""
    def __init__(self): ...
    def query(self) -> bool: ...
    def synchronize(self): ...
    def wait(self): ...

XPU Operations (torch.xpu)

Intel XPU backend support for Intel GPUs.

def xpu.is_available() -> bool:
    """Check if XPU is available."""

def xpu.device_count() -> int:
    """Number of available XPU devices."""

def xpu.get_device_name(device=None) -> str:
    """Get name of XPU device."""

def xpu.current_device() -> int:
    """Get current XPU device index."""

def xpu.set_device(device) -> None:
    """Set current XPU device."""

def xpu.synchronize(device=None) -> None:
    """Synchronize XPU operations."""

def xpu.empty_cache() -> None:
    """Free unused XPU memory."""

Distributed Computing (torch.distributed)

Distributed training and multi-process communication.

def distributed.init_process_group(backend: str, init_method=None, timeout=default_pg_timeout, 
                                 world_size=-1, rank=-1, store=None, group_name='', pg_options=None) -> None:
    """Initialize distributed process group."""

def distributed.destroy_process_group(group=None) -> None:
    """Destroy process group."""

def distributed.get_rank(group=None) -> int:
    """Get rank of current process."""

def distributed.get_world_size(group=None) -> int:
    """Get number of processes in group."""

def distributed.is_available() -> bool:
    """Check if distributed package is available."""

def distributed.is_initialized() -> bool:
    """Check if distributed process group is initialized."""

def distributed.is_mpi_available() -> bool:
    """Check if MPI backend is available."""

def distributed.is_nccl_available() -> bool:
    """Check if NCCL backend is available."""

def distributed.is_gloo_available() -> bool:
    """Check if Gloo backend is available."""

def distributed.is_torchelastic_launched() -> bool:
    """Check if launched with TorchElastic."""

def distributed.get_backend(group=None) -> str:
    """Get backend of process group."""

def distributed.barrier(group=None, async_op=False) -> Optional[Work]:
    """Synchronize all processes."""

Collective Communication Operations

Distributed communication primitives for multi-GPU training.

def distributed.broadcast(tensor: Tensor, src: int, group=None, async_op=False) -> Optional[Work]:
    """Broadcast tensor from source to all processes."""

def distributed.all_reduce(tensor: Tensor, op=ReduceOp.SUM, group=None, async_op=False) -> Optional[Work]:
    """Reduce tensor across all processes."""

def distributed.reduce(tensor: Tensor, dst: int, op=ReduceOp.SUM, group=None, async_op=False) -> Optional[Work]:
    """Reduce tensor to destination process."""

def distributed.all_gather(tensor_list: List[Tensor], tensor: Tensor, group=None, async_op=False) -> Optional[Work]:
    """Gather tensors from all processes."""

def distributed.gather(tensor: Tensor, gather_list=None, dst=0, group=None, async_op=False) -> Optional[Work]:
    """Gather tensors to destination process."""

def distributed.scatter(tensor: Tensor, scatter_list=None, src=0, group=None, async_op=False) -> Optional[Work]:
    """Scatter tensors from source process."""

def distributed.reduce_scatter(output: Tensor, input_list: List[Tensor], op=ReduceOp.SUM, group=None, async_op=False) -> Optional[Work]:
    """Reduce and scatter tensors."""

def distributed.all_to_all(output_tensor_list: List[Tensor], input_tensor_list: List[Tensor], group=None, async_op=False) -> Optional[Work]:
    """All-to-all communication."""

def distributed.send(tensor: Tensor, dst: int, group=None, tag=0) -> None:
    """Send tensor to destination process."""

def distributed.recv(tensor: Tensor, src: int, group=None, tag=0) -> None:
    """Receive tensor from source process."""

def distributed.isend(tensor: Tensor, dst: int, group=None, tag=0) -> Work:
    """Non-blocking send."""

def distributed.irecv(tensor: Tensor, src: int, group=None, tag=0) -> Work:
    """Non-blocking receive."""

Data Parallel Training

Distributed data parallel training utilities.

class nn.DataParallel(Module):
    """Data parallel wrapper for single-machine multi-GPU."""
    def __init__(self, module, device_ids=None, output_device=None, dim=0): ...
    def forward(self, *inputs, **kwargs): ...

class nn.parallel.DistributedDataParallel(Module):
    """Distributed data parallel for multi-machine training."""
    def __init__(self, module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True,
                 process_group=None, bucket_cap_mb=25, find_unused_parameters=False, 
                 check_reduction=False, gradient_as_bucket_view=False): ...
    def forward(self, *inputs, **kwargs): ...
    def no_sync(self) -> ContextManager: ...

Process Groups

Advanced process group management for flexible distributed training.

class distributed.ProcessGroup:
    """Process group for collective operations."""

def distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None) -> ProcessGroup:
    """Create new process group."""

def distributed.new_subgroups(group_size=None, group=None, timeout=None, backend=None, pg_options=None) -> List[ProcessGroup]:
    """Create subgroups."""

def distributed.new_subgroups_by_enumeration(ranks_per_subgroup_list, timeout=None, backend=None, pg_options=None) -> List[ProcessGroup]:
    """Create subgroups by enumeration."""

Distributed Utilities

Additional utilities for distributed training.

def distributed.get_process_group_ranks(group) -> List[int]:
    """Get ranks in process group."""

def distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False) -> None:
    """Barrier with monitoring and timeout."""

class distributed.Store:
    """Distributed key-value store."""
    def get(self, key: str) -> bytes: ...
    def set(self, key: str, value: bytes): ...
    def add(self, key: str, value: int) -> int: ...
    def compare_set(self, key: str, expected_value: bytes, desired_value: bytes) -> bytes: ...
    def wait(self, keys: List[str], timeout=None): ...

class distributed.TCPStore(Store):
    """TCP-based distributed store."""
    def __init__(self, host_name: str, port: int, world_size=None, is_master=False, timeout=None): ...

class distributed.FileStore(Store):
    """File-based distributed store."""
    def __init__(self, file_name: str, world_size=-1): ...

class distributed.HashStore(Store):
    """Hash-based distributed store."""
    def __init__(self): ...

Usage Examples

Basic CUDA Operations

import torch

# Check CUDA availability before touching any CUDA API
if torch.cuda.is_available():
    print(f"CUDA devices: {torch.cuda.device_count()}")
    print(f"Current device: {torch.cuda.current_device()}")
    print(f"Device name: {torch.cuda.get_device_name()}")
    
    # Create tensors on GPU (allocated directly on the device, no host copy)
    device = torch.device('cuda')
    x = torch.randn(1000, 1000, device=device)
    y = torch.randn(1000, 1000, device=device)
    
    # GPU operations — CUDA kernels are queued asynchronously
    z = torch.matmul(x, y)
    
    # Memory management: memory_allocated() counts live tensors;
    # memory_reserved() also includes blocks held by the caching allocator
    print(f"Allocated memory: {torch.cuda.memory_allocated() / 1e6:.1f} MB")
    print(f"Cached memory: {torch.cuda.memory_reserved() / 1e6:.1f} MB")
    
    # Free unused memory (returns cached blocks to the driver; does not
    # affect tensors that are still referenced)
    torch.cuda.empty_cache()
    
    # Move back to CPU — .cpu() synchronizes with the pending GPU work
    z_cpu = z.cpu()
else:
    print("CUDA not available")

Multi-GPU Data Parallel

import torch
import torch.nn as nn

# Check for multiple GPUs — DataParallel only pays off with >1 device
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs")
    
    # Define model
    model = nn.Sequential(
        nn.Linear(1000, 500),
        nn.ReLU(),
        nn.Linear(500, 100),
        nn.ReLU(),
        nn.Linear(100, 10)
    )
    
    # Wrap with DataParallel: replicates the module on each GPU and
    # scatters the input batch along dim 0 (the default split dimension)
    model = nn.DataParallel(model)
    model = model.cuda()
    
    # Create batch data
    batch_size = 64
    x = torch.randn(batch_size, 1000).cuda()
    
    # Forward pass uses all available GPUs; outputs are gathered back
    # onto the output device (GPU 0 by default)
    output = model(x)
    print(f"Output shape: {output.shape}")
    print(f"Output device: {output.device}")

CUDA Streams and Events

import torch
import time

if torch.cuda.is_available():
    device = torch.device('cuda')
    
    # Create two independent streams so the matmuls can overlap
    stream1 = torch.cuda.Stream()
    stream2 = torch.cuda.Stream()
    
    # Create events; enable_timing=True is required for elapsed_time()
    start_event = torch.cuda.Event(enable_timing=True)
    end_event = torch.cuda.Event(enable_timing=True)
    
    # Asynchronous operations
    x = torch.randn(1000, 1000, device=device)
    y = torch.randn(1000, 1000, device=device)
    
    # Record start time on the default stream
    start_event.record()
    
    # Queue work on the two side streams
    with torch.cuda.stream(stream1):
        z1 = torch.matmul(x, y)
    
    with torch.cuda.stream(stream2):
        z2 = torch.matmul(y, x)
    
    # Fix: end_event.record() runs on the *default* stream, which does not
    # implicitly wait for stream1/stream2 — without these waits the end
    # event could fire before the matmuls finish and the measured time
    # would exclude them.
    torch.cuda.current_stream().wait_stream(stream1)
    torch.cuda.current_stream().wait_stream(stream2)
    
    # Record end time (now ordered after both side streams' work)
    end_event.record()
    
    # Block the host until all queued GPU work has completed
    torch.cuda.synchronize()
    
    # Elapsed time between the two events, in milliseconds
    elapsed_time = start_event.elapsed_time(end_event)
    print(f"Elapsed time: {elapsed_time:.2f} ms")

Distributed Data Parallel Training

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
import os

def setup(rank, world_size):
    """Configure rendezvous env vars and join the NCCL process group.

    Args:
        rank: Index of this process within the group.
        world_size: Total number of participating processes.
    """
    # Rendezvous endpoint used by init_process_group's default env:// init.
    os.environ.update(MASTER_ADDR='localhost', MASTER_PORT='12355')

    # Join the collective and pin this process to its own GPU.
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

def cleanup():
    """Tear down the default process group once training is finished."""
    dist.destroy_process_group()

def train_ddp(rank, world_size):
    """Run a toy DistributedDataParallel training loop on one process.

    Args:
        rank: Index of this process; also used as its GPU id.
        world_size: Total number of processes in the job.
    """
    setup(rank, world_size)
    
    # Create model on this rank's GPU, then wrap in DDP so gradients are
    # all-reduced across processes during backward().
    model = nn.Linear(100, 10).cuda(rank)
    model = DDP(model, device_ids=[rank])
    
    # Create optimizer
    optimizer = optim.SGD(model.parameters(), lr=0.01)
    
    # Hoisted out of the loop: the loss module is stateless, so there is
    # no reason to rebuild it every iteration as the original did.
    criterion = nn.CrossEntropyLoss()
    
    # Training loop
    for epoch in range(10):
        # Create dummy data on this rank's GPU
        data = torch.randn(32, 100).cuda(rank)
        targets = torch.randint(0, 10, (32,)).cuda(rank)
        
        # Forward pass
        outputs = model(data)
        loss = criterion(outputs, targets)
        
        # Backward pass (DDP synchronizes gradients here)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Only rank 0 logs, to avoid duplicated output per process
        if rank == 0:
            print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
    
    cleanup()

# To run: torchrun --nproc_per_node=2 script.py
# (the older `python -m torch.distributed.launch` launcher is deprecated)

Collective Communication

import torch
import torch.distributed as dist

def collective_example(rank, world_size):
    """Walk through the main collective communication primitives.

    Args:
        rank: Index of this process within the group.
        world_size: Total number of participating processes.
    """
    # Join the NCCL process group and pin this process to its own GPU.
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

    device = torch.device(f'cuda:{rank}')
    torch.cuda.set_device(device)

    # Every rank contributes a tensor filled with its own rank value.
    tensor = torch.ones(2, 2).cuda() * rank
    print(f"Rank {rank}: Before all_reduce: {tensor}")

    # all_reduce: element-wise sum across every process, applied in place.
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
    print(f"Rank {rank}: After all_reduce: {tensor}")

    # broadcast: rank 0's tensor overwrites every other rank's copy.
    broadcast_tensor = (
        torch.ones(2, 2).cuda() * 42 if rank == 0 else torch.zeros(2, 2).cuda()
    )
    dist.broadcast(broadcast_tensor, src=0)
    print(f"Rank {rank}: After broadcast: {broadcast_tensor}")

    # all_gather: every rank receives the tensors from all ranks.
    tensor_list = [torch.zeros(2, 2).cuda() for _ in range(world_size)]
    local_tensor = torch.ones(2, 2).cuda() * rank
    dist.all_gather(tensor_list, local_tensor)
    print(f"Rank {rank}: All gathered tensors: {tensor_list}")

    # barrier: no rank proceeds until every rank has reached this point.
    dist.barrier()
    print(f"Rank {rank}: All processes synchronized")

    dist.destroy_process_group()

MPS (Apple Silicon) Usage

import torch

# Check MPS availability
# NOTE(review): torch.backends.mps.is_available() is the long-standing
# spelling of this check — confirm torch.mps.is_available exists in the
# torch version this doc targets.
if torch.mps.is_available():
    print("MPS is available")
    device = torch.device('mps')
    
    # Create tensors on MPS (Apple Silicon GPU)
    x = torch.randn(1000, 1000, device=device)
    y = torch.randn(1000, 1000, device=device)
    
    # Perform operations — dispatched asynchronously to the GPU
    z = torch.matmul(x, y)
    
    # Synchronize MPS operations (wait for queued GPU work to finish)
    torch.mps.synchronize()
    
    # Memory management: release cached MPS memory back to the system
    torch.mps.empty_cache()
    
    print(f"Computation completed on device: {z.device}")
else:
    print("MPS not available, using CPU")
    device = torch.device('cpu')

Advanced Memory Management

import torch

if torch.cuda.is_available():
    device = torch.device('cuda')
    
    # Set memory fraction
    torch.cuda.set_per_process_memory_fraction(0.5)  # Use only 50% of GPU memory
    
    # Memory profiling: zero the peak-usage counters before measuring
    # NOTE(review): reset_max_memory_cached is the legacy name — newer
    # torch versions expose reset_peak_memory_stats; confirm the target
    # version still provides it.
    torch.cuda.reset_max_memory_allocated()
    torch.cuda.reset_max_memory_cached()
    
    # Allocate large tensors and watch current vs. peak usage grow
    tensors = []
    for i in range(10):
        tensor = torch.randn(1000, 1000, device=device)
        tensors.append(tensor)
        
        current_memory = torch.cuda.memory_allocated() / 1e6
        max_memory = torch.cuda.max_memory_allocated() / 1e6
        print(f"Iteration {i}: Current: {current_memory:.1f} MB, Peak: {max_memory:.1f} MB")
    
    # Memory summary (human-readable allocator report)
    print(torch.cuda.memory_summary())
    
    # Free memory: dropping the references frees the tensors, but the
    # caching allocator keeps the blocks until empty_cache() is called
    del tensors
    torch.cuda.empty_cache()
    
    final_memory = torch.cuda.memory_allocated() / 1e6
    print(f"Memory after cleanup: {final_memory:.1f} MB")

Install with Tessl CLI

npx tessl i tessl/pypi-torch

docs

advanced-features.md

devices-distributed.md

index.md

mathematical-functions.md

neural-networks.md

tensor-operations.md

training.md

tile.json