or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-distributed.md, data-processing.md, distributed-training.md, hyperparameter-tuning.md, index.md, model-serving.md, reinforcement-learning.md, utilities-advanced.md

docs/utilities-advanced.md

0

# Utilities and Advanced Features

1

2

Ray provides utility functions, placement groups, debugging tools, actor pools, and advanced distributed computing features for complex distributed applications and optimized resource management.

3

4

## Capabilities

5

6

### Placement Groups

7

8

Advanced resource management and co-location of tasks and actors.

9

10

```python { .api }

11

def placement_group(bundles, *, strategy="PACK", name="", lifetime=None):

12

"""

13

Create placement group for resource co-location.

14

15

Args:

16

bundles (list): List of resource bundle dictionaries

17

strategy (str): Placement strategy ("PACK", "SPREAD", "STRICT_PACK", "STRICT_SPREAD")

18

name (str, optional): Placement group name

19

lifetime (str, optional): Lifetime policy ("detached" or None)

20

21

Returns:

22

PlacementGroup: Placement group handle

23

"""

24

25

def get_placement_group(name):

26

"""

27

Get existing placement group by name.

28

29

Args:

30

name (str): Placement group name

31

32

Returns:

33

PlacementGroup: Placement group handle

34

"""

35

36

def remove_placement_group(placement_group):

37

"""

38

Remove placement group.

39

40

Args:

41

placement_group (PlacementGroup): Placement group to remove

42

43

Returns:

44

bool: True if removal was successful

45

"""

46

47

def list_placement_groups(*, filter_state=None):

48

"""

49

List all placement groups.

50

51

Args:

52

filter_state (str, optional): Filter by state

53

54

Returns:

55

list: List of placement group information

56

"""

57

58

class PlacementGroup:

59

"""Handle for placement group."""

60

61

def ready(self):

62

"""

63

Check if placement group is ready.

64

65

Returns:

66

ObjectRef: Object reference that becomes ready when PG is ready

67

"""

68

69

@property

70

def bundle_count(self):

71

"""Number of bundles in placement group."""

72

73

@property

74

def id(self):

75

"""Placement group ID."""

76

77

def wait(self, timeout_seconds=None):

78

"""

79

Wait for placement group to be ready.

80

81

Args:

82

timeout_seconds (float, optional): Timeout in seconds

83

84

Returns:

85

bool: True if ready, False if timed out

86

"""

87

88

class PlacementGroupSchedulingStrategy:

89

"""Scheduling strategy for placement group."""

90

91

def __init__(self, placement_group, placement_group_bundle_index=None,

92

placement_group_capture_child_tasks=None):

93

"""

94

Initialize placement group scheduling strategy.

95

96

Args:

97

placement_group (PlacementGroup): Placement group

98

placement_group_bundle_index (int, optional): Bundle index

99

placement_group_capture_child_tasks (bool, optional): Capture child tasks

100

"""

101

```

102

103

### Actor Pools

104

105

Manage pools of actors for load balancing and resource efficiency.

106

107

```python { .api }

108

class ActorPool:

109

"""Pool of actors for load balancing."""

110

111

def __init__(self, actors):

112

"""

113

Initialize actor pool.

114

115

Args:

116

actors (list): List of actor handles

117

"""

118

119

def map(self, fn, values):

120

"""

121

Map function over values using actor pool.

122

123

Args:

124

fn: Function to apply

125

values: Values to process

126

127

Yields:

128

Results from function application

129

"""

130

131

def map_unordered(self, fn, values):

132

"""

133

Map function over values, yielding results as they complete.

134

135

Args:

136

fn: Function to apply

137

values: Values to process

138

139

Yields:

140

Results in completion order

141

"""

142

143

def submit(self, fn, value):

144

"""

145

Submit task to actor pool.

146

147

Args:

148

fn: Function to apply

149

value: Value to process

150

151

Returns:

152

None: The task is scheduled on an idle actor (or queued); retrieve its result with get_next() or get_next_unordered()

153

"""

154

155

def get_next(self, timeout=None):

156

"""

157

Get next completed result.

158

159

Args:

160

timeout (float, optional): Timeout in seconds

161

162

Returns:

163

The next result, in submission order

164

"""

165

166

def get_next_unordered(self, timeout=None):

167

"""

168

Get next completed result without order guarantee.

169

170

Args:

171

timeout (float, optional): Timeout in seconds

172

173

Returns:

174

Result value

175

"""

176

177

def has_next(self):

178

"""

179

Check if there are pending results.

180

181

Returns:

182

bool: True if there are pending results

183

"""

184

185

def get_submitter(self):

186

"""

187

Get submitter for async task submission.

188

189

Returns:

190

PoolTaskSubmitter: Task submitter

191

"""

192

```

193

194

### Debugging and Profiling

195

196

Tools for debugging and profiling Ray applications.

197

198

```python { .api }

199

def get_dashboard_url():

200

"""

201

Get Ray dashboard URL.

202

203

Returns:

204

str: Dashboard URL

205

"""

206

207

def timeline(filename=None):

208

"""

209

Get or save Ray timeline for profiling.

210

211

Args:

212

filename (str, optional): File to save timeline to

213

214

Returns:

215

list: Timeline events if no filename provided

216

"""

217

218

def print_timeline(filename=None):

219

"""

220

Print Ray timeline information.

221

222

Args:

223

filename (str, optional): Timeline file to load

224

"""

225

226

class profiling:

227

"""Context manager for Ray profiling."""

228

229

def __init__(self, span_name=None):

230

"""

231

Initialize profiling context.

232

233

Args:

234

span_name (str, optional): Name for profiling span

235

"""

236

237

def __enter__(self):

238

"""Enter profiling context."""

239

240

def __exit__(self, exc_type, exc_val, exc_tb):

241

"""Exit profiling context."""

242

243

def get_node_ip_address():

244

"""

245

Get IP address of current Ray node.

246

247

Returns:

248

str: Node IP address

249

"""

250

251

def get_webui_url():

252

"""

253

Get Ray web UI URL.

254

255

Returns:

256

str: Web UI URL

257

"""

258

```

259

260

### Task and Actor Introspection

261

262

Inspect running tasks and actors.

263

264

```python { .api }

265

def list_tasks(*, filters=None):

266

"""

267

List running tasks.

268

269

Args:

270

filters (list, optional): List of filters to apply

271

272

Returns:

273

dict: Task information

274

"""

275

276

def list_actors(*, filters=None):

277

"""

278

List running actors.

279

280

Args:

281

filters (list, optional): List of filters to apply

282

283

Returns:

284

dict: Actor information

285

"""

286

287

def list_objects(*, filters=None):

288

"""

289

List objects in object store.

290

291

Args:

292

filters (list, optional): List of filters to apply

293

294

Returns:

295

dict: Object information

296

"""

297

298

def summarize_tasks():

299

"""

300

Get summary of tasks.

301

302

Returns:

303

dict: Task summary

304

"""

305

306

def summarize_objects():

307

"""

308

Get summary of objects.

309

310

Returns:

311

dict: Object summary

312

"""

313

```

314

315

### Progress Tracking

316

317

Track progress of Ray operations.

318

319

```python { .api }

320

class ProgressBar:

321

"""Progress bar for Ray operations."""

322

323

def __init__(self, total, title="", unit="it", position=0):

324

"""

325

Initialize progress bar.

326

327

Args:

328

total (int): Total number of items

329

title (str): Progress bar title

330

unit (str): Unit of measurement

331

position (int): Position for multiple progress bars

332

"""

333

334

def block_until_complete(self, object_refs):

335

"""

336

Block until object references are complete, showing progress.

337

338

Args:

339

object_refs (list): List of object references

340

341

Returns:

342

list: Results

343

"""

344

345

def fetch_until_complete(self, object_refs):

346

"""

347

Fetch results as they complete, showing progress.

348

349

Args:

350

object_refs (list): List of object references

351

352

Yields:

353

Results as they complete

354

"""

355

356

def set_description(self, description):

357

"""

358

Set progress bar description.

359

360

Args:

361

description (str): New description

362

"""

363

364

def update(self, n=1):

365

"""

366

Update progress by n items.

367

368

Args:

369

n (int): Number of items completed

370

"""

371

372

def close(self):

373

"""Close progress bar."""

374

```

375

376

### Multiprocessing Integration

377

378

Integration with Python multiprocessing.

379

380

```python { .api }

381

class Pool:

382

"""Ray-based replacement for multiprocessing.Pool."""

383

384

def __init__(self, processes=None, ray_remote_args=None):

385

"""

386

Initialize Ray pool.

387

388

Args:

389

processes (int, optional): Number of worker processes

390

ray_remote_args (dict, optional): Ray remote arguments

391

"""

392

393

def map(self, func, iterable, chunksize=None):

394

"""

395

Map function over iterable.

396

397

Args:

398

func: Function to apply

399

iterable: Items to process

400

chunksize (int, optional): Chunk size for batching

401

402

Returns:

403

list: Results

404

"""

405

406

def map_async(self, func, iterable, chunksize=None, callback=None,

407

error_callback=None):

408

"""

409

Asynchronously map function over iterable.

410

411

Args:

412

func: Function to apply

413

iterable: Items to process

414

chunksize (int, optional): Chunk size

415

callback: Success callback

416

error_callback: Error callback

417

418

Returns:

419

AsyncResult: Async result handle

420

"""

421

422

def imap(self, func, iterable, chunksize=1):

423

"""

424

Lazily map function over iterable.

425

426

Args:

427

func: Function to apply

428

iterable: Items to process

429

chunksize (int): Chunk size

430

431

Returns:

432

Iterator: Result iterator

433

"""

434

435

def imap_unordered(self, func, iterable, chunksize=1):

436

"""

437

Lazily map function, yielding results in completion order.

438

439

Args:

440

func: Function to apply

441

iterable: Items to process

442

chunksize (int): Chunk size

443

444

Returns:

445

Iterator: Result iterator

446

"""

447

448

def starmap(self, func, iterable, chunksize=None):

449

"""

450

Map function over iterable of argument tuples.

451

452

Args:

453

func: Function to apply

454

iterable: Tuples of arguments

455

chunksize (int, optional): Chunk size

456

457

Returns:

458

list: Results

459

"""

460

461

def apply(self, func, args=(), kwds={}):

462

"""

463

Apply function with arguments.

464

465

Args:

466

func: Function to apply

467

args (tuple): Positional arguments

468

kwds (dict): Keyword arguments

469

470

Returns:

471

Result value

472

"""

473

474

def apply_async(self, func, args=(), kwds={}, callback=None,

475

error_callback=None):

476

"""

477

Asynchronously apply function.

478

479

Args:

480

func: Function to apply

481

args (tuple): Positional arguments

482

kwds (dict): Keyword arguments

483

callback: Success callback

484

error_callback: Error callback

485

486

Returns:

487

AsyncResult: Async result handle

488

"""

489

490

def close(self):

491

"""Close pool."""

492

493

def terminate(self):

494

"""Terminate pool."""

495

496

def join(self):

497

"""Wait for workers to exit."""

498

```

499

500

### Runtime Environment

501

502

Manage runtime environments for isolation.

503

504

```python { .api }

505

class RuntimeEnv:

506

"""Runtime environment specification."""

507

508

def __init__(self, *, py_modules=None, working_dir=None, pip=None,

509

conda=None, env_vars=None, container=None,

510

excludes=None, _validate=True):

511

"""

512

Initialize runtime environment.

513

514

Args:

515

py_modules (list, optional): Python modules to include

516

working_dir (str, optional): Working directory

517

pip (list/str, optional): Pip requirements

518

conda (str/dict, optional): Conda environment specification

519

env_vars (dict, optional): Environment variables

520

container (dict, optional): Container specification

521

excludes (list, optional): Files/patterns to exclude

522

_validate (bool): Whether to validate specification

523

"""

524

525

def runtime_env_context_manager(runtime_env):

526

"""

527

Context manager for runtime environment.

528

529

Args:

530

runtime_env (dict/RuntimeEnv): Runtime environment specification

531

532

Returns:

533

Context manager for runtime environment

534

"""

535

```

536

537

### Advanced Resource Management

538

539

Advanced resource allocation and management.

540

541

```python { .api }

542

def get_current_node_resource_key():

543

"""

544

Get resource key for current node.

545

546

Returns:

547

str: Node resource key

548

"""

549

550

def list_named_actors(*, all_namespaces=False):

551

"""

552

List named actors.

553

554

Args:

555

all_namespaces (bool): Whether to include all namespaces

556

557

Returns:

558

list: Named actor information

559

"""

560

561

class Accelerator:

562

"""Accelerator resource specification."""

563

564

def __init__(self, accelerator_type, num=None):

565

"""

566

Initialize accelerator specification.

567

568

Args:

569

accelerator_type (str): Type of accelerator

570

num (int, optional): Number of accelerators

571

"""

572

```

573

574

### Collective Communications

575

576

Distributed communication operations for multi-GPU and multi-node training.

577

578

```python { .api }

579

def init_collective_group(world_size, rank, backend="nccl", group_name="default"):

580

"""

581

Initialize collective communication group.

582

583

Args:

584

world_size (int): Total number of processes

585

rank (int): Rank of current process

586

backend (str): Communication backend ("nccl", "gloo")

587

group_name (str): Name of communication group

588

"""

589

590

def destroy_collective_group(group_name="default"):

591

"""

592

Destroy collective communication group.

593

594

Args:

595

group_name (str): Name of group to destroy

596

"""

597

598

def allreduce(tensor, group_name="default", op="SUM"):

599

"""

600

All-reduce operation across all processes.

601

602

Args:

603

tensor: Input tensor to reduce

604

group_name (str): Communication group name

605

op (str): Reduction operation ("SUM", "PRODUCT", "MIN", "MAX")

606

607

Returns:

608

Reduced tensor

609

"""

610

611

def broadcast(tensor, src_rank, group_name="default"):

612

"""

613

Broadcast tensor from source to all processes.

614

615

Args:

616

tensor: Tensor to broadcast

617

src_rank (int): Source rank for broadcast

618

group_name (str): Communication group name

619

620

Returns:

621

Broadcasted tensor

622

"""

623

624

def allgather(tensor, group_name="default"):

625

"""

626

All-gather operation to collect tensors from all processes.

627

628

Args:

629

tensor: Input tensor

630

group_name (str): Communication group name

631

632

Returns:

633

List of tensors from all processes

634

"""

635

636

def barrier(group_name="default"):

637

"""

638

Synchronization barrier for all processes.

639

640

Args:

641

group_name (str): Communication group name

642

"""

643

644

def get_rank(group_name="default"):

645

"""

646

Get rank of current process in group.

647

648

Args:

649

group_name (str): Communication group name

650

651

Returns:

652

int: Current process rank

653

"""

654

655

def get_world_size(group_name="default"):

656

"""

657

Get world size of communication group.

658

659

Args:

660

group_name (str): Communication group name

661

662

Returns:

663

int: Total number of processes

664

"""

665

666

def allreduce_multigpu(tensor_list, group_name="default", op="SUM"):

667

"""

668

Multi-GPU all-reduce operation.

669

670

Args:

671

tensor_list (list): List of tensors (one per GPU)

672

group_name (str): Communication group name

673

op (str): Reduction operation

674

675

Returns:

676

List of reduced tensors

677

"""

678

679

def broadcast_multigpu(tensor_list, src_rank, group_name="default"):

680

"""

681

Multi-GPU broadcast operation.

682

683

Args:

684

tensor_list (list): List of tensors (one per GPU)

685

src_rank (int): Source rank for broadcast

686

group_name (str): Communication group name

687

688

Returns:

689

List of broadcasted tensors

690

"""

691

```

692

693

## Usage Examples

694

695

### Placement Groups Example

696

697

```python

698

import ray

699

700

ray.init()

701

702

# Create placement group with co-located resources

703

pg = ray.util.placement_group([

704

{"CPU": 2, "GPU": 1}, # Bundle 0

705

{"CPU": 2, "GPU": 1}, # Bundle 1

706

{"CPU": 4} # Bundle 2

707

], strategy="PACK")

708

709

# Wait for placement group to be ready

710

ray.get(pg.ready())

711

712

# Use placement group for actor creation

713

@ray.remote(num_cpus=2, num_gpus=1)

714

class GPUActor:

715

def train_model(self):

716

return "Training on GPU"

717

718

# Create actors in specific bundles

719

actor1 = GPUActor.options(

720

scheduling_strategy=PlacementGroupSchedulingStrategy(

721

placement_group=pg,

722

placement_group_bundle_index=0

723

)

724

).remote()

725

726

actor2 = GPUActor.options(

727

scheduling_strategy=PlacementGroupSchedulingStrategy(

728

placement_group=pg,

729

placement_group_bundle_index=1

730

)

731

).remote()

732

733

# Use the actors

734

results = ray.get([

735

actor1.train_model.remote(),

736

actor2.train_model.remote()

737

])

738

print(results)

739

740

# Clean up

741

ray.util.remove_placement_group(pg)

742

ray.shutdown()

743

```

744

745

### Actor Pool Example

746

747

```python

748

import ray

749

from ray.util import ActorPool

750

751

ray.init()

752

753

@ray.remote

754

class Worker:

755

def __init__(self, worker_id):

756

self.worker_id = worker_id

757

758

def process(self, item):

759

# Simulate processing

760

import time

761

time.sleep(1)

762

return f"Worker {self.worker_id} processed {item}"

763

764

# Create workers

765

workers = [Worker.remote(i) for i in range(4)]

766

767

# Create actor pool

768

pool = ActorPool(workers)

769

770

# Process items using the pool

771

items = list(range(20))

772

results = list(pool.map(lambda w, item: w.process.remote(item), items))

773

774

print(f"Processed {len(results)} items")

775

776

ray.shutdown()

777

```

778

779

### Progress Tracking Example

780

781

```python

782

import ray

783

from ray.experimental import ProgressBar

784

import time

785

786

ray.init()

787

788

@ray.remote

789

def slow_task(i):

790

time.sleep(2)

791

return i ** 2

792

793

# Create tasks

794

num_tasks = 10

795

pb = ProgressBar(num_tasks, title="Processing")

796

797

# Submit tasks and track progress

798

tasks = [slow_task.remote(i) for i in range(num_tasks)]

799

results = pb.block_until_complete(tasks)

800

801

print(f"Results: {results}")

802

pb.close()

803

804

ray.shutdown()

805

```

806

807

### Multiprocessing Pool Example

808

809

```python

810

import ray

811

from ray.util.multiprocessing import Pool

812

813

def square(x):

814

return x ** 2

815

816

ray.init()

817

818

# Use Ray pool instead of multiprocessing.Pool

819

with Pool() as pool:

820

results = pool.map(square, range(10))

821

print(f"Squares: {results}")

822

823

ray.shutdown()

824

```

825

826

### Debugging and Monitoring Example

827

828

```python

829

import ray

830

831

ray.init()

832

833

@ray.remote

834

def monitored_task(x):

835

with ray.profiling.profile("computation"):

836

# Some computation

837

result = sum(range(x))

838

return result

839

840

# Submit tasks

841

tasks = [monitored_task.remote(1000) for _ in range(5)]

842

results = ray.get(tasks)

843

844

# Get debugging information

845

print("Dashboard URL:", ray.get_dashboard_url())

846

print("Node IP:", ray.get_node_ip_address())

847

848

# List running tasks and actors

849

print("Tasks:", ray.util.list_tasks())

850

print("Actors:", ray.util.list_actors())

851

852

# Get timeline for profiling

853

timeline_data = ray.timeline()

854

print(f"Timeline has {len(timeline_data)} events")

855

856

ray.shutdown()

857

```

858

859

### Runtime Environment Example

860

861

```python

862

import ray

863

864

# Define runtime environment

865

runtime_env = {

866

"pip": ["numpy==1.21.0", "pandas==1.3.0"],

867

"env_vars": {"MY_ENV_VAR": "value"},

868

"working_dir": "./my_project"

869

}

870

871

ray.init()

872

873

@ray.remote(runtime_env=runtime_env)

874

def task_with_runtime_env():

875

import numpy as np

876

import pandas as pd

877

import os

878

879

return {

880

"numpy_version": np.__version__,

881

"pandas_version": pd.__version__,

882

"env_var": os.environ.get("MY_ENV_VAR")

883

}

884

885

result = ray.get(task_with_runtime_env.remote())

886

print("Task result:", result)

887

888

ray.shutdown()

889

```

890

891

### Advanced Resource Management Example

892

893

```python

894

import ray

895

896

ray.init()

897

898

# Define custom resource requirements

899

@ray.remote(resources={"custom_resource": 1})

900

class CustomResourceActor:

901

def process(self):

902

return "Processing with custom resource"

903

904

# Get current node resources

905

print("Available resources:", ray.available_resources())

906

print("Cluster resources:", ray.cluster_resources())

907

908

# Create actor with custom resources (will wait for resources)

909

try:

910

actor = CustomResourceActor.remote()

911

result = ray.get(actor.process.remote(), timeout=5)

912

print(result)

913

except ray.exceptions.RayTimeoutError:

914

print("Task timed out - custom resource not available")

915

916

ray.shutdown()

917

```