Deep learning framework providing tensor computation with GPU acceleration and dynamic neural networks with automatic differentiation
Device management, CUDA operations, distributed training, and multi-GPU support for scaling deep learning workloads across different hardware platforms including CPU, CUDA, MPS, and XPU.
Core device detection, selection, and management functions.
class device:
"""Device specification for tensor placement."""
def __init__(self, device_string: str): ...
def __str__(self) -> str: ...
def __repr__(self) -> str: ...
def get_default_device() -> device:
"""Get the default device for new tensors."""
def set_default_device(device) -> None:
"""Set the default device for new tensors."""
def get_device(tensor_or_device) -> device:
"""Get device of tensor or validate device specification."""CUDA device management and GPU acceleration functions.
def cuda.is_available() -> bool:
"""Check if CUDA is available."""
def cuda.device_count() -> int:
"""Number of available CUDA devices."""
def cuda.get_device_name(device=None) -> str:
"""Get name of CUDA device."""
def cuda.get_device_properties(device) -> _CudaDeviceProperties:
"""Get properties of CUDA device."""
def cuda.get_device_capability(device=None) -> Tuple[int, int]:
"""Get compute capability of device."""
def cuda.current_device() -> int:
"""Get current CUDA device index."""
def cuda.set_device(device) -> None:
"""Set current CUDA device."""
def cuda.device(device) -> ContextManager:
"""Context manager for device selection."""
def cuda.stream(stream=None) -> ContextManager:
"""Context manager for CUDA stream selection."""
def cuda.synchronize(device=None) -> None:
"""Synchronize all kernels on device."""
def cuda.is_initialized() -> bool:
"""Check if CUDA is initialized."""
def cuda.init() -> None:
"""Initialize CUDA."""GPU memory allocation, caching, and profiling.
def cuda.empty_cache() -> None:
"""Free unused cached memory."""
def cuda.memory_allocated(device=None) -> int:
"""Get currently allocated memory in bytes."""
def cuda.max_memory_allocated(device=None) -> int:
"""Get peak allocated memory in bytes."""
def cuda.memory_reserved(device=None) -> int:
"""Get currently reserved memory in bytes."""
def cuda.max_memory_reserved(device=None) -> int:
"""Get peak reserved memory in bytes."""
def cuda.memory_cached(device=None) -> int:
"""Get currently cached memory in bytes."""
def cuda.max_memory_cached(device=None) -> int:
"""Get peak cached memory in bytes."""
def cuda.reset_max_memory_allocated(device=None) -> None:
"""Reset peak memory stats."""
def cuda.reset_max_memory_cached(device=None) -> None:
"""Reset peak cache stats."""
def cuda.memory_stats(device=None) -> Dict[str, Any]:
"""Get comprehensive memory statistics."""
def cuda.memory_summary(device=None, abbreviated=False) -> str:
"""Get human-readable memory summary."""
def cuda.memory_snapshot() -> List[Dict[str, Any]]:
"""Get detailed memory snapshot."""
def cuda.set_per_process_memory_fraction(fraction: float, device=None) -> None:
"""Set memory fraction for process."""
def cuda.get_per_process_memory_fraction(device=None) -> float:
"""Get memory fraction for process."""Asynchronous execution control for GPU operations.
class cuda.Stream:
"""CUDA stream for asynchronous operations."""
def __init__(self, device=None, priority=0): ...
def wait_event(self, event): ...
def wait_stream(self, stream): ...
def record_event(self, event=None): ...
def query(self) -> bool: ...
def synchronize(self): ...
class cuda.Event:
"""CUDA event for synchronization."""
def __init__(self, enable_timing=False, blocking=False, interprocess=False): ...
def record(self, stream=None): ...
def wait(self, stream=None): ...
def query(self) -> bool: ...
def synchronize(self): ...
def elapsed_time(self, event) -> float: ...
def cuda.current_stream(device=None) -> cuda.Stream:
"""Get current CUDA stream."""
def cuda.default_stream(device=None) -> cuda.Stream:
"""Get default CUDA stream."""
def cuda.set_stream(stream) -> None:
"""Set current CUDA stream."""GPU random number generation functions.
def cuda.manual_seed(seed: int) -> None:
"""Set CUDA random seed."""
def cuda.manual_seed_all(seed: int) -> None:
"""Set CUDA random seed for all devices."""
def cuda.seed() -> None:
"""Generate random CUDA seed."""
def cuda.seed_all() -> None:
"""Generate random CUDA seed for all devices."""
def cuda.initial_seed() -> int:
"""Get initial CUDA random seed."""
def cuda.get_rng_state(device='cuda') -> Tensor:
"""Get CUDA random number generator state."""
def cuda.get_rng_state_all() -> List[Tensor]:
"""Get CUDA RNG state for all devices."""
def cuda.set_rng_state(new_state: Tensor, device='cuda') -> None:
"""Set CUDA random number generator state."""
def cuda.set_rng_state_all(new_states: List[Tensor]) -> None:
"""Set CUDA RNG state for all devices."""Metal Performance Shaders for Apple Silicon GPU acceleration.
def mps.is_available() -> bool:
"""Check if MPS is available."""
def mps.is_built() -> bool:
"""Check if PyTorch was built with MPS support."""
def mps.get_default_generator() -> Generator:
"""Get default MPS random number generator."""
def mps.manual_seed(seed: int) -> None:
"""Set MPS random seed."""
def mps.seed() -> None:
"""Generate random MPS seed."""
def mps.synchronize() -> None:
"""Synchronize MPS operations."""
def mps.empty_cache() -> None:
"""Free unused MPS memory."""
def mps.set_per_process_memory_fraction(fraction: float) -> None:
"""Set MPS memory fraction."""
class mps.Event:
"""MPS event for synchronization."""
def __init__(self): ...
def query(self) -> bool: ...
def synchronize(self): ...
def wait(self): ...

Intel XPU backend support for Intel GPUs.
def xpu.is_available() -> bool:
"""Check if XPU is available."""
def xpu.device_count() -> int:
"""Number of available XPU devices."""
def xpu.get_device_name(device=None) -> str:
"""Get name of XPU device."""
def xpu.current_device() -> int:
"""Get current XPU device index."""
def xpu.set_device(device) -> None:
"""Set current XPU device."""
def xpu.synchronize(device=None) -> None:
"""Synchronize XPU operations."""
def xpu.empty_cache() -> None:
"""Free unused XPU memory."""Distributed training and multi-process communication.
def distributed.init_process_group(backend: str, init_method=None, timeout=default_pg_timeout,
world_size=-1, rank=-1, store=None, group_name='', pg_options=None) -> None:
"""Initialize distributed process group."""
def distributed.destroy_process_group(group=None) -> None:
"""Destroy process group."""
def distributed.get_rank(group=None) -> int:
"""Get rank of current process."""
def distributed.get_world_size(group=None) -> int:
"""Get number of processes in group."""
def distributed.is_available() -> bool:
"""Check if distributed package is available."""
def distributed.is_initialized() -> bool:
"""Check if distributed process group is initialized."""
def distributed.is_mpi_available() -> bool:
"""Check if MPI backend is available."""
def distributed.is_nccl_available() -> bool:
"""Check if NCCL backend is available."""
def distributed.is_gloo_available() -> bool:
"""Check if Gloo backend is available."""
def distributed.is_torchelastic_launched() -> bool:
"""Check if launched with TorchElastic."""
def distributed.get_backend(group=None) -> str:
"""Get backend of process group."""
def distributed.barrier(group=None, async_op=False) -> Optional[Work]:
"""Synchronize all processes."""Distributed communication primitives for multi-GPU training.
def distributed.broadcast(tensor: Tensor, src: int, group=None, async_op=False) -> Optional[Work]:
"""Broadcast tensor from source to all processes."""
def distributed.all_reduce(tensor: Tensor, op=ReduceOp.SUM, group=None, async_op=False) -> Optional[Work]:
"""Reduce tensor across all processes."""
def distributed.reduce(tensor: Tensor, dst: int, op=ReduceOp.SUM, group=None, async_op=False) -> Optional[Work]:
"""Reduce tensor to destination process."""
def distributed.all_gather(tensor_list: List[Tensor], tensor: Tensor, group=None, async_op=False) -> Optional[Work]:
"""Gather tensors from all processes."""
def distributed.gather(tensor: Tensor, gather_list=None, dst=0, group=None, async_op=False) -> Optional[Work]:
"""Gather tensors to destination process."""
def distributed.scatter(tensor: Tensor, scatter_list=None, src=0, group=None, async_op=False) -> Optional[Work]:
"""Scatter tensors from source process."""
def distributed.reduce_scatter(output: Tensor, input_list: List[Tensor], op=ReduceOp.SUM, group=None, async_op=False) -> Optional[Work]:
"""Reduce and scatter tensors."""
def distributed.all_to_all(output_tensor_list: List[Tensor], input_tensor_list: List[Tensor], group=None, async_op=False) -> Optional[Work]:
"""All-to-all communication."""
def distributed.send(tensor: Tensor, dst: int, group=None, tag=0) -> None:
"""Send tensor to destination process."""
def distributed.recv(tensor: Tensor, src: int, group=None, tag=0) -> None:
"""Receive tensor from source process."""
def distributed.isend(tensor: Tensor, dst: int, group=None, tag=0) -> Work:
"""Non-blocking send."""
def distributed.irecv(tensor: Tensor, src: int, group=None, tag=0) -> Work:
"""Non-blocking receive."""Distributed data parallel training utilities.
class nn.DataParallel(Module):
"""Data parallel wrapper for single-machine multi-GPU."""
def __init__(self, module, device_ids=None, output_device=None, dim=0): ...
def forward(self, *inputs, **kwargs): ...
class nn.parallel.DistributedDataParallel(Module):
"""Distributed data parallel for multi-machine training."""
def __init__(self, module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True,
process_group=None, bucket_cap_mb=25, find_unused_parameters=False,
check_reduction=False, gradient_as_bucket_view=False): ...
def forward(self, *inputs, **kwargs): ...
def no_sync(self) -> ContextManager: ...

Advanced process group management for flexible distributed training.
class distributed.ProcessGroup:
"""Process group for collective operations."""
def distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None) -> ProcessGroup:
"""Create new process group."""
def distributed.new_subgroups(group_size=None, group=None, timeout=None, backend=None, pg_options=None) -> List[ProcessGroup]:
"""Create subgroups."""
def distributed.new_subgroups_by_enumeration(ranks_per_subgroup_list, timeout=None, backend=None, pg_options=None) -> List[ProcessGroup]:
"""Create subgroups by enumeration."""Additional utilities for distributed training.
def distributed.get_process_group_ranks(group) -> List[int]:
"""Get ranks in process group."""
def distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False) -> None:
"""Barrier with monitoring and timeout."""
class distributed.Store:
"""Distributed key-value store."""
def get(self, key: str) -> bytes: ...
def set(self, key: str, value: bytes): ...
def add(self, key: str, value: int) -> int: ...
def compare_set(self, key: str, expected_value: bytes, desired_value: bytes) -> bytes: ...
def wait(self, keys: List[str], timeout=None): ...
class distributed.TCPStore(Store):
"""TCP-based distributed store."""
def __init__(self, host_name: str, port: int, world_size=None, is_master=False, timeout=None): ...
class distributed.FileStore(Store):
"""File-based distributed store."""
def __init__(self, file_name: str, world_size=-1): ...
class distributed.HashStore(Store):
"""Hash-based distributed store."""
def __init__(self): ...

import torch
# Basic CUDA device inspection and GPU tensor workflow.
import torch

# Check CUDA availability before touching any GPU API
if torch.cuda.is_available():
    print(f"CUDA devices: {torch.cuda.device_count()}")
    print(f"Current device: {torch.cuda.current_device()}")
    print(f"Device name: {torch.cuda.get_device_name()}")

    # Allocate tensors directly on the GPU
    device = torch.device('cuda')
    x = torch.randn(1000, 1000, device=device)
    y = torch.randn(1000, 1000, device=device)

    # The matmul runs on the GPU because both operands live there
    z = torch.matmul(x, y)

    # Inspect allocator state (allocated = live tensors, reserved = cached pool)
    print(f"Allocated memory: {torch.cuda.memory_allocated() / 1e6:.1f} MB")
    print(f"Cached memory: {torch.cuda.memory_reserved() / 1e6:.1f} MB")

    # Return unused cached blocks to the driver
    torch.cuda.empty_cache()

    # Copy the result back to host memory
    z_cpu = z.cpu()
else:
    print("CUDA not available")

import torch
# Single-machine multi-GPU training with nn.DataParallel.
import torch
import torch.nn as nn

# Only use DataParallel when more than one GPU is present
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs")

    # Small feed-forward network
    model = nn.Sequential(
        nn.Linear(1000, 500),
        nn.ReLU(),
        nn.Linear(500, 100),
        nn.ReLU(),
        nn.Linear(100, 10),
    )

    # DataParallel splits each batch across the visible GPUs
    model = nn.DataParallel(model)
    model = model.cuda()

    # One dummy batch on the default GPU
    batch_size = 64
    x = torch.randn(batch_size, 1000).cuda()

    # Forward pass fans out to all GPUs; output is gathered on device 0
    output = model(x)
    print(f"Output shape: {output.shape}")
    print(f"Output device: {output.device}")

import torch
# Overlapping GPU work with CUDA streams and timing it with CUDA events.
import torch
import time

if torch.cuda.is_available():
    device = torch.device('cuda')

    # Two independent streams so the kernels may overlap
    stream1 = torch.cuda.Stream()
    stream2 = torch.cuda.Stream()

    # enable_timing is required for elapsed_time() below
    start_event = torch.cuda.Event(enable_timing=True)
    end_event = torch.cuda.Event(enable_timing=True)

    # Inputs for the asynchronous work
    x = torch.randn(1000, 1000, device=device)
    y = torch.randn(1000, 1000, device=device)

    # Mark the start on the current stream
    start_event.record()

    # Launch one matmul per stream
    with torch.cuda.stream(stream1):
        z1 = torch.matmul(x, y)
    with torch.cuda.stream(stream2):
        z2 = torch.matmul(y, x)

    # Mark the end
    end_event.record()

    # Block until every queued kernel has finished
    torch.cuda.synchronize()

    # Event-to-event wall time in milliseconds
    elapsed_time = start_event.elapsed_time(end_event)
    print(f"Elapsed time: {elapsed_time:.2f} ms")

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
import os
def setup(rank, world_size):
"""Initialize distributed training."""
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# Initialize process group
dist.init_process_group("nccl", rank=rank, world_size=world_size)
torch.cuda.set_device(rank)
def cleanup():
"""Clean up distributed training."""
dist.destroy_process_group()
def train_ddp(rank, world_size):
"""Distributed training function."""
setup(rank, world_size)
# Create model and move to GPU
model = nn.Linear(100, 10).cuda(rank)
model = DDP(model, device_ids=[rank])
# Create optimizer
optimizer = optim.SGD(model.parameters(), lr=0.01)
# Training loop
for epoch in range(10):
# Create dummy data
data = torch.randn(32, 100).cuda(rank)
targets = torch.randint(0, 10, (32,)).cuda(rank)
# Forward pass
outputs = model(data)
loss = nn.CrossEntropyLoss()(outputs, targets)
# Backward pass
optimizer.zero_grad()
loss.backward()
optimizer.step()
if rank == 0:
print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
cleanup()
# To run: python -m torch.distributed.launch --nproc_per_node=2 script.pyimport torch
# Collective communication primitives: all_reduce, broadcast, all_gather, barrier.
import torch.distributed as dist


def collective_example(rank, world_size):
    """Example of collective communication operations."""
    # One process group member per GPU
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    device = torch.device(f'cuda:{rank}')
    torch.cuda.set_device(device)

    # Each rank contributes a tensor filled with its own rank id
    tensor = torch.ones(2, 2).cuda() * rank
    print(f"Rank {rank}: Before all_reduce: {tensor}")

    # In-place sum across every process
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
    print(f"Rank {rank}: After all_reduce: {tensor}")

    # Rank 0's value overwrites everyone else's buffer
    broadcast_tensor = torch.zeros(2, 2).cuda()
    if rank == 0:
        broadcast_tensor = torch.ones(2, 2).cuda() * 42
    dist.broadcast(broadcast_tensor, src=0)
    print(f"Rank {rank}: After broadcast: {broadcast_tensor}")

    # Every rank ends up with the full list of per-rank tensors
    tensor_list = [torch.zeros(2, 2).cuda() for _ in range(world_size)]
    local_tensor = torch.ones(2, 2).cuda() * rank
    dist.all_gather(tensor_list, local_tensor)
    print(f"Rank {rank}: All gathered tensors: {tensor_list}")

    # Block until every rank reaches this point
    dist.barrier()
    print(f"Rank {rank}: All processes synchronized")

    dist.destroy_process_group()

import torch
# Apple-Silicon GPU acceleration via the MPS backend.
import torch

# Check MPS availability. torch.backends.mps.is_available() is the
# documented, portable check: it exists on every MPS-era PyTorch build
# (including non-macOS builds, where it simply returns False), whereas
# torch.mps.is_available() is a newer alias that is absent on older releases.
if torch.backends.mps.is_available():
    print("MPS is available")
    device = torch.device('mps')

    # Create tensors on the Apple GPU
    x = torch.randn(1000, 1000, device=device)
    y = torch.randn(1000, 1000, device=device)

    # Perform operations on the MPS device
    z = torch.matmul(x, y)

    # MPS kernels are asynchronous; wait for them to complete
    torch.mps.synchronize()

    # Return unused cached memory to the system
    torch.mps.empty_cache()
    print(f"Computation completed on device: {z.device}")
else:
    print("MPS not available, using CPU")
    device = torch.device('cpu')

import torch
# GPU memory capping, profiling, and cleanup.
import torch

if torch.cuda.is_available():
    device = torch.device('cuda')

    # Cap this process at 50% of the GPU's total memory
    torch.cuda.set_per_process_memory_fraction(0.5)  # Use only 50% of GPU memory

    # Reset peak counters so the profile below starts from zero
    torch.cuda.reset_max_memory_allocated()
    torch.cuda.reset_max_memory_cached()

    # Allocate large tensors while tracking current and peak usage
    tensors = []
    for i in range(10):
        tensor = torch.randn(1000, 1000, device=device)
        tensors.append(tensor)
        current_memory = torch.cuda.memory_allocated() / 1e6
        max_memory = torch.cuda.max_memory_allocated() / 1e6
        print(f"Iteration {i}: Current: {current_memory:.1f} MB, Peak: {max_memory:.1f} MB")

    # Human-readable allocator report
    print(torch.cuda.memory_summary())

    # Drop the Python references, then release cached blocks
    del tensors
    torch.cuda.empty_cache()
    final_memory = torch.cuda.memory_allocated() / 1e6
print(f"Memory after cleanup: {final_memory:.1f} MB")Install with Tessl CLI
npx tessl i tessl/pypi-torch