or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-features.md, devices-distributed.md, index.md, mathematical-functions.md, neural-networks.md, tensor-operations.md, training.md

docs/devices-distributed.md

0

# Device and Distributed Computing

1

2

Device management, CUDA operations, distributed training, and multi-GPU support for scaling deep learning workloads across different hardware platforms including CPU, CUDA, MPS, and XPU.

3

4

## Capabilities

5

6

### Device Management

7

8

Core device detection, selection, and management functions.

9

10

```python { .api }

11

class device:

12

"""Device specification for tensor placement."""

13

def __init__(self, device_string: str): ...

14

def __str__(self) -> str: ...

15

def __repr__(self) -> str: ...

16

17

def get_default_device() -> device:

18

"""Get the default device for new tensors."""

19

20

def set_default_device(device) -> None:

21

"""Set the default device for new tensors."""

22

23

def get_device(tensor_or_device) -> device:

24

"""Get device of tensor or validate device specification."""

25

```

26

27

### CUDA Operations (torch.cuda)

28

29

CUDA device management and GPU acceleration functions.

30

31

```python { .api }

32

def cuda.is_available() -> bool:

33

"""Check if CUDA is available."""

34

35

def cuda.device_count() -> int:

36

"""Number of available CUDA devices."""

37

38

def cuda.get_device_name(device=None) -> str:

39

"""Get name of CUDA device."""

40

41

def cuda.get_device_properties(device) -> _CudaDeviceProperties:

42

"""Get properties of CUDA device."""

43

44

def cuda.get_device_capability(device=None) -> Tuple[int, int]:

45

"""Get compute capability of device."""

46

47

def cuda.current_device() -> int:

48

"""Get current CUDA device index."""

49

50

def cuda.set_device(device) -> None:

51

"""Set current CUDA device."""

52

53

def cuda.device(device) -> ContextManager:

54

"""Context manager for device selection."""

55

56

def cuda.stream(stream=None) -> ContextManager:

57

"""Context manager for CUDA stream selection."""

58

59

def cuda.synchronize(device=None) -> None:

60

"""Synchronize all kernels on device."""

61

62

def cuda.is_initialized() -> bool:

63

"""Check if CUDA is initialized."""

64

65

def cuda.init() -> None:

66

"""Initialize CUDA."""

67

```

68

69

### CUDA Memory Management

70

71

GPU memory allocation, caching, and profiling.

72

73

```python { .api }

74

def cuda.empty_cache() -> None:

75

"""Free unused cached memory."""

76

77

def cuda.memory_allocated(device=None) -> int:

78

"""Get currently allocated memory in bytes."""

79

80

def cuda.max_memory_allocated(device=None) -> int:

81

"""Get peak allocated memory in bytes."""

82

83

def cuda.memory_reserved(device=None) -> int:

84

"""Get currently reserved memory in bytes."""

85

86

def cuda.max_memory_reserved(device=None) -> int:

87

"""Get peak reserved memory in bytes."""

88

89

def cuda.memory_cached(device=None) -> int:

90

"""Deprecated alias of memory_reserved(): currently reserved (cached) memory in bytes."""

91

92

def cuda.max_memory_cached(device=None) -> int:

93

"""Deprecated alias of max_memory_reserved(): peak reserved (cached) memory in bytes."""

94

95

def cuda.reset_max_memory_allocated(device=None) -> None:

96

"""Deprecated: now calls reset_peak_memory_stats(), which resets ALL peak memory stats."""

97

98

def cuda.reset_max_memory_cached(device=None) -> None:

99

"""Deprecated: now calls reset_peak_memory_stats(), which resets ALL peak memory stats."""

100

101

def cuda.memory_stats(device=None) -> Dict[str, Any]:

102

"""Get comprehensive memory statistics."""

103

104

def cuda.memory_summary(device=None, abbreviated=False) -> str:

105

"""Get human-readable memory summary."""

106

107

def cuda.memory_snapshot() -> List[Dict[str, Any]]:

108

"""Get detailed memory snapshot."""

109

110

def cuda.set_per_process_memory_fraction(fraction: float, device=None) -> None:

111

"""Set memory fraction for process."""

112

113

def cuda.get_per_process_memory_fraction(device=None) -> float:

114

"""Get memory fraction for process."""

115

```

116

117

### CUDA Streams and Events

118

119

Asynchronous execution control for GPU operations.

120

121

```python { .api }

122

class cuda.Stream:

123

"""CUDA stream for asynchronous operations."""

124

def __init__(self, device=None, priority=0): ...

125

def wait_event(self, event): ...

126

def wait_stream(self, stream): ...

127

def record_event(self, event=None): ...

128

def query(self) -> bool: ...

129

def synchronize(self): ...

130

131

class cuda.Event:

132

"""CUDA event for synchronization."""

133

def __init__(self, enable_timing=False, blocking=False, interprocess=False): ...

134

def record(self, stream=None): ...

135

def wait(self, stream=None): ...

136

def query(self) -> bool: ...

137

def synchronize(self): ...

138

def elapsed_time(self, event) -> float: ...

139

140

def cuda.current_stream(device=None) -> cuda.Stream:

141

"""Get current CUDA stream."""

142

143

def cuda.default_stream(device=None) -> cuda.Stream:

144

"""Get default CUDA stream."""

145

146

def cuda.set_stream(stream) -> None:

147

"""Set current CUDA stream."""

148

```

149

150

### CUDA Random Number Generation

151

152

GPU random number generation functions.

153

154

```python { .api }

155

def cuda.manual_seed(seed: int) -> None:

156

"""Set CUDA random seed."""

157

158

def cuda.manual_seed_all(seed: int) -> None:

159

"""Set CUDA random seed for all devices."""

160

161

def cuda.seed() -> None:

162

"""Generate random CUDA seed."""

163

164

def cuda.seed_all() -> None:

165

"""Generate random CUDA seed for all devices."""

166

167

def cuda.initial_seed() -> int:

168

"""Get initial CUDA random seed."""

169

170

def cuda.get_rng_state(device='cuda') -> Tensor:

171

"""Get CUDA random number generator state."""

172

173

def cuda.get_rng_state_all() -> List[Tensor]:

174

"""Get CUDA RNG state for all devices."""

175

176

def cuda.set_rng_state(new_state: Tensor, device='cuda') -> None:

177

"""Set CUDA random number generator state."""

178

179

def cuda.set_rng_state_all(new_states: List[Tensor]) -> None:

180

"""Set CUDA RNG state for all devices."""

181

```

182

183

### MPS Operations (torch.mps)

184

185

Metal Performance Shaders for Apple Silicon GPU acceleration.

186

187

```python { .api }

188

def mps.is_available() -> bool:

189

"""Check if MPS is available (documented as torch.backends.mps.is_available())."""

190

191

def mps.is_built() -> bool:

192

"""Check if PyTorch was built with MPS support (documented as torch.backends.mps.is_built())."""

193

194

def mps.get_default_generator() -> Generator:

195

"""Get default MPS random number generator."""

196

197

def mps.manual_seed(seed: int) -> None:

198

"""Set MPS random seed."""

199

200

def mps.seed() -> None:

201

"""Generate random MPS seed."""

202

203

def mps.synchronize() -> None:

204

"""Synchronize MPS operations."""

205

206

def mps.empty_cache() -> None:

207

"""Free unused MPS memory."""

208

209

def mps.set_per_process_memory_fraction(fraction: float) -> None:

210

"""Set MPS memory fraction."""

211

212

class mps.Event:

213

"""MPS event for synchronization."""

214

def __init__(self): ...

215

def query(self) -> bool: ...

216

def synchronize(self): ...

217

def wait(self): ...

218

```

219

220

### XPU Operations (torch.xpu)

221

222

Intel XPU backend support for Intel GPUs.

223

224

```python { .api }

225

def xpu.is_available() -> bool:

226

"""Check if XPU is available."""

227

228

def xpu.device_count() -> int:

229

"""Number of available XPU devices."""

230

231

def xpu.get_device_name(device=None) -> str:

232

"""Get name of XPU device."""

233

234

def xpu.current_device() -> int:

235

"""Get current XPU device index."""

236

237

def xpu.set_device(device) -> None:

238

"""Set current XPU device."""

239

240

def xpu.synchronize(device=None) -> None:

241

"""Synchronize XPU operations."""

242

243

def xpu.empty_cache() -> None:

244

"""Free unused XPU memory."""

245

```

246

247

### Distributed Computing (torch.distributed)

248

249

Distributed training and multi-process communication.

250

251

```python { .api }

252

def distributed.init_process_group(backend: str, init_method=None, timeout=default_pg_timeout,

253

world_size=-1, rank=-1, store=None, group_name='', pg_options=None) -> None:

254

"""Initialize distributed process group."""

255

256

def distributed.destroy_process_group(group=None) -> None:

257

"""Destroy process group."""

258

259

def distributed.get_rank(group=None) -> int:

260

"""Get rank of current process."""

261

262

def distributed.get_world_size(group=None) -> int:

263

"""Get number of processes in group."""

264

265

def distributed.is_available() -> bool:

266

"""Check if distributed package is available."""

267

268

def distributed.is_initialized() -> bool:

269

"""Check if distributed process group is initialized."""

270

271

def distributed.is_mpi_available() -> bool:

272

"""Check if MPI backend is available."""

273

274

def distributed.is_nccl_available() -> bool:

275

"""Check if NCCL backend is available."""

276

277

def distributed.is_gloo_available() -> bool:

278

"""Check if Gloo backend is available."""

279

280

def distributed.is_torchelastic_launched() -> bool:

281

"""Check if launched with TorchElastic."""

282

283

def distributed.get_backend(group=None) -> str:

284

"""Get backend of process group."""

285

286

def distributed.barrier(group=None, async_op=False) -> Optional[Work]:

287

"""Synchronize all processes."""

288

```

289

290

### Collective Communication Operations

291

292

Distributed communication primitives for multi-GPU training.

293

294

```python { .api }

295

def distributed.broadcast(tensor: Tensor, src: int, group=None, async_op=False) -> Optional[Work]:

296

"""Broadcast tensor from source to all processes."""

297

298

def distributed.all_reduce(tensor: Tensor, op=ReduceOp.SUM, group=None, async_op=False) -> Optional[Work]:

299

"""Reduce tensor across all processes."""

300

301

def distributed.reduce(tensor: Tensor, dst: int, op=ReduceOp.SUM, group=None, async_op=False) -> Optional[Work]:

302

"""Reduce tensor to destination process."""

303

304

def distributed.all_gather(tensor_list: List[Tensor], tensor: Tensor, group=None, async_op=False) -> Optional[Work]:

305

"""Gather tensors from all processes."""

306

307

def distributed.gather(tensor: Tensor, gather_list=None, dst=0, group=None, async_op=False) -> Optional[Work]:

308

"""Gather tensors to destination process."""

309

310

def distributed.scatter(tensor: Tensor, scatter_list=None, src=0, group=None, async_op=False) -> Optional[Work]:

311

"""Scatter tensors from source process."""

312

313

def distributed.reduce_scatter(output: Tensor, input_list: List[Tensor], op=ReduceOp.SUM, group=None, async_op=False) -> Optional[Work]:

314

"""Reduce and scatter tensors."""

315

316

def distributed.all_to_all(output_tensor_list: List[Tensor], input_tensor_list: List[Tensor], group=None, async_op=False) -> Optional[Work]:

317

"""All-to-all communication."""

318

319

def distributed.send(tensor: Tensor, dst: int, group=None, tag=0) -> None:

320

"""Send tensor to destination process."""

321

322

def distributed.recv(tensor: Tensor, src: int, group=None, tag=0) -> None:

323

"""Receive tensor from source process."""

324

325

def distributed.isend(tensor: Tensor, dst: int, group=None, tag=0) -> Work:

326

"""Non-blocking send."""

327

328

def distributed.irecv(tensor: Tensor, src: int, group=None, tag=0) -> Work:

329

"""Non-blocking receive."""

330

```

331

332

### Data Parallel Training

333

334

Distributed data parallel training utilities.

335

336

```python { .api }

337

class nn.DataParallel(Module):

338

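"""Single-process, multi-thread data parallel wrapper for one machine with several GPUs (PyTorch recommends DistributedDataParallel instead, even on a single node)."""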

339

def __init__(self, module, device_ids=None, output_device=None, dim=0): ...

340

def forward(self, *inputs, **kwargs): ...

341

342

class nn.parallel.DistributedDataParallel(Module):

343

"""Multi-process data parallel training (typically one process per device), on a single machine or across many machines."""

344

def __init__(self, module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True,

345

process_group=None, bucket_cap_mb=25, find_unused_parameters=False,

346

check_reduction=False, gradient_as_bucket_view=False): ...

347

def forward(self, *inputs, **kwargs): ...

348

def no_sync(self) -> ContextManager: ...

349

```

350

351

### Process Groups

352

353

Advanced process group management for flexible distributed training.

354

355

```python { .api }

356

class distributed.ProcessGroup:

357

"""Process group for collective operations."""

358

359

def distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None) -> ProcessGroup:

360

"""Create new process group."""

361

362

def distributed.new_subgroups(group_size=None, group=None, timeout=None, backend=None, pg_options=None) -> List[ProcessGroup]:

363

"""Create subgroups."""

364

365

def distributed.new_subgroups_by_enumeration(ranks_per_subgroup_list, timeout=None, backend=None, pg_options=None) -> List[ProcessGroup]:

366

"""Create subgroups by enumeration."""

367

```

368

369

### Distributed Utilities

370

371

Additional utilities for distributed training.

372

373

```python { .api }

374

def distributed.get_process_group_ranks(group) -> List[int]:

375

"""Get ranks in process group."""

376

377

def distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False) -> None:

378

"""Barrier with monitoring and timeout."""

379

380

class distributed.Store:

381

"""Distributed key-value store."""

382

def get(self, key: str) -> bytes: ...

383

def set(self, key: str, value: bytes): ...

384

def add(self, key: str, value: int) -> int: ...

385

def compare_set(self, key: str, expected_value: bytes, desired_value: bytes) -> bytes: ...

386

def wait(self, keys: List[str], timeout=None): ...

387

388

class distributed.TCPStore(Store):

389

"""TCP-based distributed store."""

390

def __init__(self, host_name: str, port: int, world_size=None, is_master=False, timeout=None): ...

391

392

class distributed.FileStore(Store):

393

"""File-based distributed store."""

394

def __init__(self, file_name: str, world_size=-1): ...

395

396

class distributed.HashStore(Store):

397

"""Hash-based distributed store."""

398

def __init__(self): ...

399

```

400

401

## Usage Examples

402

403

### Basic CUDA Operations

404

405

```python

406

import torch

# Check CUDA availability
if torch.cuda.is_available():
    print(f"CUDA devices: {torch.cuda.device_count()}")
    print(f"Current device: {torch.cuda.current_device()}")
    print(f"Device name: {torch.cuda.get_device_name()}")

    # Create tensors on GPU
    device = torch.device('cuda')
    x = torch.randn(1000, 1000, device=device)
    y = torch.randn(1000, 1000, device=device)

    # GPU operations (launched asynchronously on the current CUDA stream)
    z = torch.matmul(x, y)

    # Memory management:
    #   memory_allocated() -> bytes currently occupied by tensors
    #   memory_reserved()  -> bytes held by the caching allocator (>= allocated)
    print(f"Allocated memory: {torch.cuda.memory_allocated() / 1e6:.1f} MB")
    print(f"Cached memory: {torch.cuda.memory_reserved() / 1e6:.1f} MB")

    # Return unoccupied cached blocks to the driver. x, y and z are still
    # referenced here, so their memory remains allocated.
    torch.cuda.empty_cache()

    # Move back to CPU
    z_cpu = z.cpu()
else:
    print("CUDA not available")

433

```

434

435

### Multi-GPU Data Parallel

436

437

```python

438

import torch
import torch.nn as nn

# Check for multiple GPUs
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs")

    # Define model
    model = nn.Sequential(
        nn.Linear(1000, 500),
        nn.ReLU(),
        nn.Linear(500, 100),
        nn.ReLU(),
        nn.Linear(100, 10),
    )

    # DataParallel expects the module's parameters on device_ids[0]
    # (the current CUDA device by default), so move the model first, then wrap.
    # NOTE: PyTorch recommends DistributedDataParallel over DataParallel,
    # even for a single machine.
    model = nn.DataParallel(model.cuda())

    # Create batch data
    batch_size = 64
    x = torch.randn(batch_size, 1000).cuda()

    # Forward pass: the batch is split along dim 0 across all visible GPUs and
    # the outputs are gathered back onto device_ids[0].
    output = model(x)
    print(f"Output shape: {output.shape}")
    print(f"Output device: {output.device}")

466

```

467

468

### CUDA Streams and Events

469

470

```python

471

import torch

if torch.cuda.is_available():
    device = torch.device('cuda')

    # Create side streams
    stream1 = torch.cuda.Stream()
    stream2 = torch.cuda.Stream()

    # Create events (enable_timing=True is required for elapsed_time())
    start_event = torch.cuda.Event(enable_timing=True)
    end_event = torch.cuda.Event(enable_timing=True)

    # Inputs are produced on the current (default) stream
    x = torch.randn(1000, 1000, device=device)
    y = torch.randn(1000, 1000, device=device)

    main_stream = torch.cuda.current_stream()

    # Record start time on the current stream
    start_event.record()

    # Work on a side stream is NOT implicitly ordered after work on the current
    # stream: make both side streams wait until x and y have been written.
    stream1.wait_stream(main_stream)
    stream2.wait_stream(main_stream)

    # Operations on different streams may run concurrently.
    # If x or y could be freed before the side streams finish, also call
    # x.record_stream(stream1) etc. so the caching allocator does not reuse them.
    with torch.cuda.stream(stream1):
        z1 = torch.matmul(x, y)

    with torch.cuda.stream(stream2):
        z2 = torch.matmul(y, x)

    # Join the side streams back into the current stream so that end_event
    # (and any later use of z1/z2 on this stream) is ordered after both matmuls.
    main_stream.wait_stream(stream1)
    main_stream.wait_stream(stream2)

    # Record end time
    end_event.record()

    # Block the host until all queued GPU work has completed
    torch.cuda.synchronize()

    # Get elapsed time (milliseconds between the two events)
    elapsed_time = start_event.elapsed_time(end_event)
    print(f"Elapsed time: {elapsed_time:.2f} ms")

508

```

509

510

### Distributed Data Parallel Training

511

512

```python

513

import torch

514

import torch.distributed as dist

515

import torch.nn as nn

516

import torch.optim as optim

517

from torch.nn.parallel import DistributedDataParallel as DDP

518

import os

519

520

def setup(rank, world_size):

521

"""Initialize distributed training."""

522

os.environ['MASTER_ADDR'] = 'localhost'

523

os.environ['MASTER_PORT'] = '12355'

524

525

# Initialize process group

526

dist.init_process_group("nccl", rank=rank, world_size=world_size)

527

torch.cuda.set_device(rank)

528

529

def cleanup():

530

"""Clean up distributed training."""

531

dist.destroy_process_group()

532

533

def train_ddp(rank, world_size):

534

"""Distributed training function."""

535

setup(rank, world_size)

536

537

# Create model and move to GPU

538

model = nn.Linear(100, 10).cuda(rank)

539

model = DDP(model, device_ids=[rank])

540

541

# Create optimizer

542

optimizer = optim.SGD(model.parameters(), lr=0.01)

543

544

# Training loop

545

for epoch in range(10):

546

# Create dummy data

547

data = torch.randn(32, 100).cuda(rank)

548

targets = torch.randint(0, 10, (32,)).cuda(rank)

549

550

# Forward pass

551

outputs = model(data)

552

loss = nn.CrossEntropyLoss()(outputs, targets)

553

554

# Backward pass

555

optimizer.zero_grad()

556

loss.backward()

557

optimizer.step()

558

559

if rank == 0:

560

print(f"Epoch {epoch}, Loss: {loss.item():.4f}")

561

562

cleanup()

563

564

# To run: python -m torch.distributed.launch --nproc_per_node=2 script.py

565

```

566

567

### Collective Communication

568

569

```python

570

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def collective_example(rank, world_size):
    """Example of collective communication operations, run once per process."""
    # The default env:// rendezvous needs these; keep launcher-provided values.
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "12355")

    # Initialize
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    try:
        device = torch.device(f'cuda:{rank}')
        torch.cuda.set_device(device)

        # Create tensor on each process
        tensor = torch.ones(2, 2, device=device) * rank
        print(f"Rank {rank}: Before all_reduce: {tensor}")

        # All-reduce: sum across all processes (in place)
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
        print(f"Rank {rank}: After all_reduce: {tensor}")

        # Broadcast from rank 0: every rank passes a same-shaped buffer,
        # only rank 0's contents are sent.
        if rank == 0:
            broadcast_tensor = torch.ones(2, 2, device=device) * 42
        else:
            broadcast_tensor = torch.zeros(2, 2, device=device)
        dist.broadcast(broadcast_tensor, src=0)
        print(f"Rank {rank}: After broadcast: {broadcast_tensor}")

        # All-gather: collect tensors from all processes
        tensor_list = [torch.zeros(2, 2, device=device) for _ in range(world_size)]
        local_tensor = torch.ones(2, 2, device=device) * rank
        dist.all_gather(tensor_list, local_tensor)
        print(f"Rank {rank}: All gathered tensors: {tensor_list}")

        # Barrier synchronization
        dist.barrier()
        print(f"Rank {rank}: All processes synchronized")
    finally:
        dist.destroy_process_group()


def main():
    """Spawn one process per visible GPU; mp.spawn passes the process index as ``rank``."""
    world_size = torch.cuda.device_count()
    if world_size < 1:
        print("Collective example requires at least one CUDA device")
        return
    mp.spawn(collective_example, args=(world_size,), nprocs=world_size, join=True)


if __name__ == "__main__":
    main()

608

```

609

610

### MPS (Apple Silicon) Usage

611

612

```python

613

import torch

# Check MPS availability via the documented torch.backends.mps API
if torch.backends.mps.is_available():
    print("MPS is available")
    device = torch.device('mps')

    # Create tensors on MPS
    x = torch.randn(1000, 1000, device=device)
    y = torch.randn(1000, 1000, device=device)

    # Perform operations
    z = torch.matmul(x, y)

    # Wait for all queued MPS work to finish
    torch.mps.synchronize()

    # Release cached memory not occupied by tensors (x, y, z are still alive)
    torch.mps.empty_cache()

    print(f"Computation completed on device: {z.device}")
else:
    print("MPS not available, using CPU")
    device = torch.device('cpu')

637

```

638

639

### Advanced Memory Management

640

641

```python

642

import torch

if torch.cuda.is_available():
    device = torch.device('cuda')

    # Cap this process's caching allocator at 50% of the device's memory.
    # The cap remains in effect for the rest of the process unless changed again.
    torch.cuda.set_per_process_memory_fraction(0.5)

    # Memory profiling: reset all peak statistics. This replaces the deprecated
    # reset_max_memory_allocated() / reset_max_memory_cached(), which both
    # forward to reset_peak_memory_stats() anyway.
    torch.cuda.reset_peak_memory_stats()

    # Allocate large tensors. Append directly so no loop variable keeps the
    # last tensor alive after the list is deleted.
    tensors = []
    for i in range(10):
        tensors.append(torch.randn(1000, 1000, device=device))

        current_memory = torch.cuda.memory_allocated() / 1e6
        max_memory = torch.cuda.max_memory_allocated() / 1e6
        print(f"Iteration {i}: Current: {current_memory:.1f} MB, Peak: {max_memory:.1f} MB")

    # Memory summary
    print(torch.cuda.memory_summary())

    # Free memory: drop every reference, then return cached blocks to the driver
    del tensors
    torch.cuda.empty_cache()

    final_memory = torch.cuda.memory_allocated() / 1e6
    print(f"Memory after cleanup: {final_memory:.1f} MB")

673

```