0
# Device and Distributed Computing
1
2
Device management, CUDA operations, distributed training, and multi-GPU support for scaling deep learning workloads across different hardware platforms including CPU, CUDA, MPS, and XPU.
3
4
## Capabilities
5
6
### Device Management
7
8
Core device detection, selection, and management functions.
9
10
```python { .api }
11
class device:
12
"""Device specification for tensor placement."""
13
def __init__(self, device_string: str): ...
14
def __str__(self) -> str: ...
15
def __repr__(self) -> str: ...
16
17
def get_default_device() -> device:
18
"""Get the default device for new tensors."""
19
20
def set_default_device(device) -> None:
21
"""Set the default device for new tensors."""
22
23
def get_device(tensor_or_device) -> device:
24
"""Get device of tensor or validate device specification."""
25
```
26
27
### CUDA Operations (torch.cuda)
28
29
CUDA device management and GPU acceleration functions.
30
31
```python { .api }
32
def cuda.is_available() -> bool:
33
"""Check if CUDA is available."""
34
35
def cuda.device_count() -> int:
36
"""Number of available CUDA devices."""
37
38
def cuda.get_device_name(device=None) -> str:
39
"""Get name of CUDA device."""
40
41
def cuda.get_device_properties(device) -> _CudaDeviceProperties:
42
"""Get properties of CUDA device."""
43
44
def cuda.get_device_capability(device=None) -> Tuple[int, int]:
45
"""Get compute capability of device."""
46
47
def cuda.current_device() -> int:
48
"""Get current CUDA device index."""
49
50
def cuda.set_device(device) -> None:
51
"""Set current CUDA device."""
52
53
def cuda.device(device) -> ContextManager:
54
"""Context manager for device selection."""
55
56
def cuda.stream(stream=None) -> ContextManager:
57
"""Context manager for CUDA stream selection."""
58
59
def cuda.synchronize(device=None) -> None:
60
"""Wait for all kernels in all streams on the device to complete."""
61
62
def cuda.is_initialized() -> bool:
63
"""Check if CUDA is initialized."""
64
65
def cuda.init() -> None:
66
"""Initialize CUDA."""
67
```
68
69
### CUDA Memory Management
70
71
GPU memory allocation, caching, and profiling.
72
73
```python { .api }
74
def cuda.empty_cache() -> None:
75
"""Free unused cached memory."""
76
77
def cuda.memory_allocated(device=None) -> int:
78
"""Get currently allocated memory in bytes."""
79
80
def cuda.max_memory_allocated(device=None) -> int:
81
"""Get peak allocated memory in bytes."""
82
83
def cuda.memory_reserved(device=None) -> int:
84
"""Get currently reserved memory in bytes."""
85
86
def cuda.max_memory_reserved(device=None) -> int:
87
"""Get peak reserved memory in bytes."""
88
89
def cuda.memory_cached(device=None) -> int:
90
"""Deprecated alias of memory_reserved(): get currently reserved memory in bytes."""
91
92
def cuda.max_memory_cached(device=None) -> int:
93
"""Deprecated alias of max_memory_reserved(): get peak reserved memory in bytes."""
94
95
def cuda.reset_max_memory_allocated(device=None) -> None:
96
"""Reset peak memory stats."""
97
98
def cuda.reset_max_memory_cached(device=None) -> None:
99
"""Reset peak cache stats."""
100
101
def cuda.memory_stats(device=None) -> Dict[str, Any]:
102
"""Get comprehensive memory statistics."""
103
104
def cuda.memory_summary(device=None, abbreviated=False) -> str:
105
"""Get human-readable memory summary."""
106
107
def cuda.memory_snapshot() -> List[Dict[str, Any]]:
108
"""Get detailed memory snapshot."""
109
110
def cuda.set_per_process_memory_fraction(fraction: float, device=None) -> None:
111
"""Set memory fraction for process."""
112
113
def cuda.get_per_process_memory_fraction(device=None) -> float:
114
"""Get memory fraction for process."""
115
```
116
117
### CUDA Streams and Events
118
119
Asynchronous execution control for GPU operations.
120
121
```python { .api }
122
class cuda.Stream:
123
"""CUDA stream for asynchronous operations."""
124
def __init__(self, device=None, priority=0): ...
125
def wait_event(self, event): ...
126
def wait_stream(self, stream): ...
127
def record_event(self, event=None): ...
128
def query(self) -> bool: ...
129
def synchronize(self): ...
130
131
class cuda.Event:
132
"""CUDA event for synchronization."""
133
def __init__(self, enable_timing=False, blocking=False, interprocess=False): ...
134
def record(self, stream=None): ...
135
def wait(self, stream=None): ...
136
def query(self) -> bool: ...
137
def synchronize(self): ...
138
def elapsed_time(self, event) -> float: ...
139
140
def cuda.current_stream(device=None) -> cuda.Stream:
141
"""Get current CUDA stream."""
142
143
def cuda.default_stream(device=None) -> cuda.Stream:
144
"""Get default CUDA stream."""
145
146
def cuda.set_stream(stream) -> None:
147
"""Set current CUDA stream."""
148
```
149
150
### CUDA Random Number Generation
151
152
GPU random number generation functions.
153
154
```python { .api }
155
def cuda.manual_seed(seed: int) -> None:
156
"""Set CUDA random seed for the current device."""
157
158
def cuda.manual_seed_all(seed: int) -> None:
159
"""Set CUDA random seed for all devices."""
160
161
def cuda.seed() -> None:
162
"""Set the CUDA seed to a random number for the current device."""
163
164
def cuda.seed_all() -> None:
165
"""Set the CUDA seed to a random number on all devices."""
166
167
def cuda.initial_seed() -> int:
168
"""Get initial CUDA random seed."""
169
170
def cuda.get_rng_state(device='cuda') -> Tensor:
171
"""Get CUDA random number generator state."""
172
173
def cuda.get_rng_state_all() -> List[Tensor]:
174
"""Get CUDA RNG state for all devices."""
175
176
def cuda.set_rng_state(new_state: Tensor, device='cuda') -> None:
177
"""Set CUDA random number generator state."""
178
179
def cuda.set_rng_state_all(new_states: List[Tensor]) -> None:
180
"""Set CUDA RNG state for all devices."""
181
```
182
183
### MPS Operations (torch.mps)
184
185
Metal Performance Shaders (MPS) backend for GPU acceleration on Apple Silicon.
186
187
```python { .api }
188
def mps.is_available() -> bool:
189
"""Check if MPS is available."""
190
191
def mps.is_built() -> bool:
192
"""Check if PyTorch was built with MPS support."""
193
194
def mps.get_default_generator() -> Generator:
195
"""Get default MPS random number generator."""
196
197
def mps.manual_seed(seed: int) -> None:
198
"""Set MPS random seed."""
199
200
def mps.seed() -> None:
201
"""Generate random MPS seed."""
202
203
def mps.synchronize() -> None:
204
"""Synchronize MPS operations."""
205
206
def mps.empty_cache() -> None:
207
"""Free unused MPS memory."""
208
209
def mps.set_per_process_memory_fraction(fraction: float) -> None:
210
"""Set MPS memory fraction."""
211
212
class mps.Event:
213
"""MPS event for synchronization."""
214
def __init__(self): ...
215
def query(self) -> bool: ...
216
def synchronize(self): ...
217
def wait(self): ...
218
```
219
220
### XPU Operations (torch.xpu)
221
222
XPU backend support for Intel GPUs.
223
224
```python { .api }
225
def xpu.is_available() -> bool:
226
"""Check if XPU is available."""
227
228
def xpu.device_count() -> int:
229
"""Number of available XPU devices."""
230
231
def xpu.get_device_name(device=None) -> str:
232
"""Get name of XPU device."""
233
234
def xpu.current_device() -> int:
235
"""Get current XPU device index."""
236
237
def xpu.set_device(device) -> None:
238
"""Set current XPU device."""
239
240
def xpu.synchronize(device=None) -> None:
241
"""Synchronize XPU operations."""
242
243
def xpu.empty_cache() -> None:
244
"""Free unused XPU memory."""
245
```
246
247
### Distributed Computing (torch.distributed)
248
249
Distributed training and multi-process communication.
250
251
```python { .api }
252
def distributed.init_process_group(backend: str, init_method=None, timeout=default_pg_timeout,
253
world_size=-1, rank=-1, store=None, group_name='', pg_options=None) -> None:
254
"""Initialize distributed process group."""
255
256
def distributed.destroy_process_group(group=None) -> None:
257
"""Destroy process group."""
258
259
def distributed.get_rank(group=None) -> int:
260
"""Get rank of current process."""
261
262
def distributed.get_world_size(group=None) -> int:
263
"""Get number of processes in group."""
264
265
def distributed.is_available() -> bool:
266
"""Check if distributed package is available."""
267
268
def distributed.is_initialized() -> bool:
269
"""Check if distributed process group is initialized."""
270
271
def distributed.is_mpi_available() -> bool:
272
"""Check if MPI backend is available."""
273
274
def distributed.is_nccl_available() -> bool:
275
"""Check if NCCL backend is available."""
276
277
def distributed.is_gloo_available() -> bool:
278
"""Check if Gloo backend is available."""
279
280
def distributed.is_torchelastic_launched() -> bool:
281
"""Check if launched with TorchElastic."""
282
283
def distributed.get_backend(group=None) -> str:
284
"""Get backend of process group."""
285
286
def distributed.barrier(group=None, async_op=False) -> Optional[Work]:
287
"""Synchronize all processes."""
288
```
289
290
### Collective Communication Operations
291
292
Collective and point-to-point communication primitives for distributed training.
293
294
```python { .api }
295
def distributed.broadcast(tensor: Tensor, src: int, group=None, async_op=False) -> Optional[Work]:
296
"""Broadcast tensor from source to all processes."""
297
298
def distributed.all_reduce(tensor: Tensor, op=ReduceOp.SUM, group=None, async_op=False) -> Optional[Work]:
299
"""Reduce tensor across all processes."""
300
301
def distributed.reduce(tensor: Tensor, dst: int, op=ReduceOp.SUM, group=None, async_op=False) -> Optional[Work]:
302
"""Reduce tensor to destination process."""
303
304
def distributed.all_gather(tensor_list: List[Tensor], tensor: Tensor, group=None, async_op=False) -> Optional[Work]:
305
"""Gather tensors from all processes."""
306
307
def distributed.gather(tensor: Tensor, gather_list=None, dst=0, group=None, async_op=False) -> Optional[Work]:
308
"""Gather tensors to destination process."""
309
310
def distributed.scatter(tensor: Tensor, scatter_list=None, src=0, group=None, async_op=False) -> Optional[Work]:
311
"""Scatter tensors from source process."""
312
313
def distributed.reduce_scatter(output: Tensor, input_list: List[Tensor], op=ReduceOp.SUM, group=None, async_op=False) -> Optional[Work]:
314
"""Reduce and scatter tensors."""
315
316
def distributed.all_to_all(output_tensor_list: List[Tensor], input_tensor_list: List[Tensor], group=None, async_op=False) -> Optional[Work]:
317
"""All-to-all communication."""
318
319
def distributed.send(tensor: Tensor, dst: int, group=None, tag=0) -> None:
320
"""Send tensor to destination process."""
321
322
def distributed.recv(tensor: Tensor, src: int, group=None, tag=0) -> None:
323
"""Receive tensor from source process."""
324
325
def distributed.isend(tensor: Tensor, dst: int, group=None, tag=0) -> Work:
326
"""Non-blocking send."""
327
328
def distributed.irecv(tensor: Tensor, src: int, group=None, tag=0) -> Work:
329
"""Non-blocking receive."""
330
```
331
332
### Data Parallel Training
333
334
Single-process (`DataParallel`) and multi-process (`DistributedDataParallel`) data parallel training wrappers.
335
336
```python { .api }
337
class nn.DataParallel(Module):
338
"""Data parallel wrapper for single-machine multi-GPU."""
339
def __init__(self, module, device_ids=None, output_device=None, dim=0): ...
340
def forward(self, *inputs, **kwargs): ...
341
342
class nn.parallel.DistributedDataParallel(Module):
343
"""Distributed data parallel for multi-process training on one or more machines."""
344
def __init__(self, module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True,
345
process_group=None, bucket_cap_mb=25, find_unused_parameters=False,
346
check_reduction=False, gradient_as_bucket_view=False): ...
347
def forward(self, *inputs, **kwargs): ...
348
def no_sync(self) -> ContextManager: ...
349
```
350
351
### Process Groups
352
353
Advanced process group management for flexible distributed training.
354
355
```python { .api }
356
class distributed.ProcessGroup:
357
"""Process group for collective operations."""
358
359
def distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None) -> ProcessGroup:
360
"""Create new process group."""
361
362
def distributed.new_subgroups(group_size=None, group=None, timeout=None, backend=None, pg_options=None) -> List[ProcessGroup]:
363
"""Create subgroups."""
364
365
def distributed.new_subgroups_by_enumeration(ranks_per_subgroup_list, timeout=None, backend=None, pg_options=None) -> List[ProcessGroup]:
366
"""Create subgroups by enumeration."""
367
```
368
369
### Distributed Utilities
370
371
Additional utilities for distributed training.
372
373
```python { .api }
374
def distributed.get_process_group_ranks(group) -> List[int]:
375
"""Get ranks in process group."""
376
377
def distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False) -> None:
378
"""Barrier with monitoring and timeout."""
379
380
class distributed.Store:
381
"""Distributed key-value store."""
382
def get(self, key: str) -> bytes: ...
383
def set(self, key: str, value: bytes): ...
384
def add(self, key: str, value: int) -> int: ...
385
def compare_set(self, key: str, expected_value: bytes, desired_value: bytes) -> bytes: ...
386
def wait(self, keys: List[str], timeout=None): ...
387
388
class distributed.TCPStore(Store):
389
"""TCP-based distributed store."""
390
def __init__(self, host_name: str, port: int, world_size=None, is_master=False, timeout=None): ...
391
392
class distributed.FileStore(Store):
393
"""File-based distributed store."""
394
def __init__(self, file_name: str, world_size=-1): ...
395
396
class distributed.HashStore(Store):
397
"""Hash-based distributed store."""
398
def __init__(self): ...
399
```
400
401
## Usage Examples
402
403
### Basic CUDA Operations
404
405
```python
406
import torch
407
408
# Check CUDA availability
409
if torch.cuda.is_available():
410
print(f"CUDA devices: {torch.cuda.device_count()}")
411
print(f"Current device: {torch.cuda.current_device()}")
412
print(f"Device name: {torch.cuda.get_device_name()}")
413
414
# Create tensors on GPU
415
device = torch.device('cuda')
416
x = torch.randn(1000, 1000, device=device)
417
y = torch.randn(1000, 1000, device=device)
418
419
# GPU operations
420
z = torch.matmul(x, y)
421
422
# Memory management
423
print(f"Allocated memory: {torch.cuda.memory_allocated() / 1e6:.1f} MB")
424
print(f"Cached memory: {torch.cuda.memory_reserved() / 1e6:.1f} MB")
425
426
# Free unused memory
427
torch.cuda.empty_cache()
428
429
# Move back to CPU
430
z_cpu = z.cpu()
431
else:
432
print("CUDA not available")
433
```
434
435
### Multi-GPU Data Parallel
436
437
```python
438
import torch
439
import torch.nn as nn
440
441
# Check for multiple GPUs
442
if torch.cuda.device_count() > 1:
443
print(f"Using {torch.cuda.device_count()} GPUs")
444
445
# Define model
446
model = nn.Sequential(
447
nn.Linear(1000, 500),
448
nn.ReLU(),
449
nn.Linear(500, 100),
450
nn.ReLU(),
451
nn.Linear(100, 10)
452
)
453
454
# Wrap with DataParallel
455
model = nn.DataParallel(model)
456
model = model.cuda()
457
458
# Create batch data
459
batch_size = 64
460
x = torch.randn(batch_size, 1000).cuda()
461
462
# Forward pass uses all available GPUs
463
output = model(x)
464
print(f"Output shape: {output.shape}")
465
print(f"Output device: {output.device}")
466
```
467
468
### CUDA Streams and Events
469
470
```python
471
import torch
472
import time
473
474
if torch.cuda.is_available():
475
device = torch.device('cuda')
476
477
# Create streams
478
stream1 = torch.cuda.Stream()
479
stream2 = torch.cuda.Stream()
480
481
# Create events
482
start_event = torch.cuda.Event(enable_timing=True)
483
end_event = torch.cuda.Event(enable_timing=True)
484
485
# Asynchronous operations
486
x = torch.randn(1000, 1000, device=device)
487
y = torch.randn(1000, 1000, device=device)
488
489
# Record start time
490
start_event.record()
491
492
# Operations on different streams
493
with torch.cuda.stream(stream1):
494
z1 = torch.matmul(x, y)
495
496
with torch.cuda.stream(stream2):
497
z2 = torch.matmul(y, x)
498
499
# Record end time (recorded on the current stream, so it does not wait for the work queued on stream1/stream2)
500
end_event.record()
501
502
# Synchronize
503
torch.cuda.synchronize()
504
505
# Get elapsed time
506
elapsed_time = start_event.elapsed_time(end_event)
507
print(f"Elapsed time: {elapsed_time:.2f} ms")
508
```
509
510
### Distributed Data Parallel Training
511
512
```python
513
import torch
514
import torch.distributed as dist
515
import torch.nn as nn
516
import torch.optim as optim
517
from torch.nn.parallel import DistributedDataParallel as DDP
518
import os
519
520
def setup(rank, world_size):
521
"""Initialize distributed training."""
522
os.environ['MASTER_ADDR'] = 'localhost'
523
os.environ['MASTER_PORT'] = '12355'
524
525
# Initialize process group
526
dist.init_process_group("nccl", rank=rank, world_size=world_size)
527
torch.cuda.set_device(rank)
528
529
def cleanup():
530
"""Clean up distributed training."""
531
dist.destroy_process_group()
532
533
def train_ddp(rank, world_size):
534
"""Distributed training function."""
535
setup(rank, world_size)
536
537
# Create model and move to GPU
538
model = nn.Linear(100, 10).cuda(rank)
539
model = DDP(model, device_ids=[rank])
540
541
# Create optimizer
542
optimizer = optim.SGD(model.parameters(), lr=0.01)
543
544
# Training loop
545
for epoch in range(10):
546
# Create dummy data
547
data = torch.randn(32, 100).cuda(rank)
548
targets = torch.randint(0, 10, (32,)).cuda(rank)
549
550
# Forward pass
551
outputs = model(data)
552
loss = nn.CrossEntropyLoss()(outputs, targets)
553
554
# Backward pass
555
optimizer.zero_grad()
556
loss.backward()
557
optimizer.step()
558
559
if rank == 0:
560
print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
561
562
cleanup()
563
564
# To run: torch.multiprocessing.spawn(train_ddp, args=(world_size,), nprocs=world_size); torch.distributed.launch is deprecated in favor of torchrun
565
```
566
567
### Collective Communication
568
569
```python
570
import torch
571
import torch.distributed as dist
572
573
def collective_example(rank, world_size):
574
"""Example of collective communication operations."""
575
# Initialize
576
dist.init_process_group("nccl", rank=rank, world_size=world_size)
577
578
device = torch.device(f'cuda:{rank}')
579
torch.cuda.set_device(device)
580
581
# Create tensor on each process
582
tensor = torch.ones(2, 2).cuda() * rank
583
print(f"Rank {rank}: Before all_reduce: {tensor}")
584
585
# All-reduce: sum across all processes
586
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
587
print(f"Rank {rank}: After all_reduce: {tensor}")
588
589
# Broadcast from rank 0
590
broadcast_tensor = torch.zeros(2, 2).cuda()
591
if rank == 0:
592
broadcast_tensor = torch.ones(2, 2).cuda() * 42
593
594
dist.broadcast(broadcast_tensor, src=0)
595
print(f"Rank {rank}: After broadcast: {broadcast_tensor}")
596
597
# All-gather: collect tensors from all processes
598
tensor_list = [torch.zeros(2, 2).cuda() for _ in range(world_size)]
599
local_tensor = torch.ones(2, 2).cuda() * rank
600
dist.all_gather(tensor_list, local_tensor)
601
print(f"Rank {rank}: All gathered tensors: {tensor_list}")
602
603
# Barrier synchronization
604
dist.barrier()
605
print(f"Rank {rank}: All processes synchronized")
606
607
dist.destroy_process_group()
608
```
609
610
### MPS (Apple Silicon) Usage
611
612
```python
613
import torch
614
615
# Check MPS availability
616
if torch.mps.is_available():
617
print("MPS is available")
618
device = torch.device('mps')
619
620
# Create tensors on MPS
621
x = torch.randn(1000, 1000, device=device)
622
y = torch.randn(1000, 1000, device=device)
623
624
# Perform operations
625
z = torch.matmul(x, y)
626
627
# Synchronize MPS operations
628
torch.mps.synchronize()
629
630
# Memory management
631
torch.mps.empty_cache()
632
633
print(f"Computation completed on device: {z.device}")
634
else:
635
print("MPS not available, using CPU")
636
device = torch.device('cpu')
637
```
638
639
### Advanced Memory Management
640
641
```python
642
import torch
643
644
if torch.cuda.is_available():
645
device = torch.device('cuda')
646
647
# Set memory fraction
648
torch.cuda.set_per_process_memory_fraction(0.5) # Use only 50% of GPU memory
649
650
# Memory profiling
651
torch.cuda.reset_max_memory_allocated()
652
torch.cuda.reset_max_memory_cached()
653
654
# Allocate large tensors
655
tensors = []
656
for i in range(10):
657
tensor = torch.randn(1000, 1000, device=device)
658
tensors.append(tensor)
659
660
current_memory = torch.cuda.memory_allocated() / 1e6
661
max_memory = torch.cuda.max_memory_allocated() / 1e6
662
print(f"Iteration {i}: Current: {current_memory:.1f} MB, Peak: {max_memory:.1f} MB")
663
664
# Memory summary
665
print(torch.cuda.memory_summary())
666
667
# Free memory
668
del tensors
669
torch.cuda.empty_cache()
670
671
final_memory = torch.cuda.memory_allocated() / 1e6
672
print(f"Memory after cleanup: {final_memory:.1f} MB")
673
```