0
# Utilities and Advanced Features
1
2
Ray provides utility functions, placement groups, debugging tools, actor pools, and other advanced features for building complex distributed applications and managing resources efficiently.
3
4
## Capabilities
5
6
### Placement Groups
7
8
Advanced resource management and co-location of tasks and actors.
9
10
```python { .api }
11
def placement_group(bundles, *, strategy="PACK", name="", lifetime=None):
    """
    Reserve a group of resource bundles for co-locating tasks and actors.

    Args:
        bundles (list): One dict per bundle mapping resource name to amount,
            e.g. ``[{"CPU": 2, "GPU": 1}, {"CPU": 4}]``.
        strategy (str): Bundle placement strategy: "PACK", "SPREAD",
            "STRICT_PACK" or "STRICT_SPREAD".
        name (str, optional): Name the group can later be looked up by.
        lifetime (str, optional): Lifetime policy, either "detached" or None.

    Returns:
        PlacementGroup: Handle to the new placement group; wait on
        ``ready()`` before relying on the reservation.
    """
24
25
def get_placement_group(name):
    """
    Look up a previously created placement group by its name.

    Args:
        name (str): Name given to the group at creation time.

    Returns:
        PlacementGroup: Handle to the named placement group.
    """
35
36
def remove_placement_group(placement_group):
    """
    Remove a placement group and release the resources it reserved.

    Removal is asynchronous: the call returns without waiting for the
    resources to be freed. Actors and tasks still using the group's
    reserved resources are killed.

    Args:
        placement_group (PlacementGroup): Placement group to remove.

    Returns:
        None. There is no success flag.
    """
46
47
def list_placement_groups(*, filter_state=None):
    """
    Enumerate the placement groups known to the cluster.

    NOTE(review): upstream Ray appears to expose this through
    ``ray.util.state.list_placement_groups`` / ``ray.util.placement_group_table()``
    rather than under this exact signature; confirm before relying on it.

    Args:
        filter_state (str, optional): Only include groups in this state.

    Returns:
        list: Information about each placement group.
    """
57
58
class PlacementGroup:
    """Handle to a placement group, as returned by ``placement_group()``."""

    def ready(self):
        """
        Return a reference that resolves once the placement group is ready.

        Typical use: ``ray.get(pg.ready())`` or ``ray.wait([pg.ready()])``.

        Returns:
            ObjectRef: Object reference that becomes ready when the group's
            bundles have been reserved.
        """

    @property
    def bundle_count(self):
        """Number of bundles in the placement group."""

    @property
    def id(self):
        """Placement group ID."""

    def wait(self, timeout_seconds=30):
        """
        Block until the placement group is ready or the timeout elapses.

        Args:
            timeout_seconds (float | int): Maximum time to wait, in seconds.
                Defaults to 30, as in Ray's ``PlacementGroup.wait``.

        Returns:
            bool: True if the placement group is ready, False if the wait
            timed out.
        """
87
88
class PlacementGroupSchedulingStrategy:
    """Scheduling strategy that places a task or actor inside a placement group.

    Import from ``ray.util.scheduling_strategies`` and pass it through
    ``.options(scheduling_strategy=...)``.
    """

    def __init__(self, placement_group, placement_group_bundle_index=-1,
                 placement_group_capture_child_tasks=None):
        """
        Initialize placement group scheduling strategy.

        Args:
            placement_group (PlacementGroup): Placement group to schedule into.
            placement_group_bundle_index (int): Index of the bundle to use.
                The default of -1 lets Ray pick any bundle in the group.
            placement_group_capture_child_tasks (bool, optional): Whether
                tasks/actors launched from within the scheduled task or actor
                are also placed in this group by default.
        """
        # Keep the arguments so the strategy can be inspected after creation.
        self.placement_group = placement_group
        self.placement_group_bundle_index = placement_group_bundle_index
        self.placement_group_capture_child_tasks = placement_group_capture_child_tasks
101
```
102
103
### Actor Pools
104
105
Manage pools of actors for load balancing and resource efficiency.
106
107
```python { .api }
108
class ActorPool:
    """Pool of actor handles that work is load-balanced across.

    Import from ``ray.util``. Work is expressed as a callable
    ``fn(actor, value)`` that invokes a remote method on the given actor and
    returns its ObjectRef, e.g. ``lambda a, v: a.process.remote(v)``.
    """

    def __init__(self, actors):
        """
        Initialize actor pool.

        Args:
            actors (list): List of actor handles; every actor must support
                the remote methods that ``fn`` calls.
        """

    def map(self, fn, values):
        """
        Apply ``fn`` to every value, yielding results in the order of ``values``.

        Args:
            fn: Callable ``fn(actor, value)`` returning an ObjectRef.
            values: Values to process.

        Yields:
            Result values, in the same order as ``values``.
        """

    def map_unordered(self, fn, values):
        """
        Like ``map``, but yield each result as soon as it completes.

        Args:
            fn: Callable ``fn(actor, value)`` returning an ObjectRef.
            values: Values to process.

        Yields:
            Result values, in completion order.
        """

    def submit(self, fn, value):
        """
        Schedule one ``fn(actor, value)`` call on an idle actor.

        If no actor is idle, the call is queued until one becomes free.
        Fetch the result later with ``get_next()`` or ``get_next_unordered()``.

        Args:
            fn: Callable ``fn(actor, value)`` returning an ObjectRef.
            value: Value to process.

        Returns:
            None.
        """

    def get_next(self, timeout=None, ignore_if_timedout=False):
        """
        Return the next result in submission order.

        Args:
            timeout (float, optional): Seconds to wait for the result.
            ignore_if_timedout (bool): If True, a result that times out is
                discarded (its actor goes back to the pool) before
                TimeoutError is raised. If False, it stays pending and a later
                call can still retrieve it.

        Returns:
            The result value itself (not an ``(actor_index, result)`` tuple).

        Raises:
            StopIteration: If there are no pending results.
            TimeoutError: If ``timeout`` elapses before the result is ready.
        """

    def get_next_unordered(self, timeout=None, ignore_if_timedout=False):
        """
        Return any completed result, without ordering guarantees.

        Args:
            timeout (float, optional): Seconds to wait for a result.
            ignore_if_timedout (bool): Same meaning as in ``get_next()``.

        Returns:
            Result value.

        Raises:
            StopIteration: If there are no pending results.
            TimeoutError: If ``timeout`` elapses before any result is ready.
        """

    def has_next(self):
        """
        Check if there are pending results.

        Returns:
            bool: True if submitted work has results not yet retrieved.
        """

    def has_free(self):
        """
        Check whether an actor is idle and no submissions are queued.

        Returns:
            bool: True if new work would start immediately.
        """

    def pop_idle(self):
        """
        Remove an idle actor from the pool.

        Returns:
            The removed actor handle, or None if no actor is idle.
        """

    def push(self, actor):
        """
        Add an actor to the pool; queued submissions may start on it.

        Args:
            actor: Actor handle to add.

        Raises:
            ValueError: If the actor already belongs to the pool.
        """

    def get_submitter(self):
        """
        Get submitter for async task submission.

        NOTE(review): no such method appears in upstream ``ray.util.ActorPool``;
        confirm before documenting it as public API.

        Returns:
            PoolTaskSubmitter: Task submitter
        """
192
```
193
194
### Debugging and Profiling
195
196
Tools for debugging and profiling Ray applications.
197
198
```python { .api }
199
def get_dashboard_url():
    """
    Return the URL at which the Ray dashboard is served.

    NOTE(review): the documented way to obtain this in current Ray is the
    ``dashboard_url`` attribute of the context returned by ``ray.init()``;
    confirm this top-level function exists in your version.

    Returns:
        str: Dashboard URL
    """
206
207
def timeline(filename=None):
    """
    Collect the Ray profiling timeline, optionally writing it to a file.

    Args:
        filename (str, optional): Path to write the timeline to.

    Returns:
        list: The timeline events when ``filename`` is not given.
    """
217
218
def print_timeline(filename=None):
    """
    Print Ray timeline information to standard output.

    NOTE(review): not found in upstream Ray's public API; confirm it exists.

    Args:
        filename (str, optional): Previously saved timeline file to load.
    """
225
226
class profiling:
    """Context manager that records a named profiling span."""

    def __init__(self, span_name=None):
        """
        Args:
            span_name (str, optional): Label for the recorded span.
        """

    def __enter__(self):
        """Start the profiling span."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish the profiling span."""
242
243
def get_node_ip_address():
    """
    Return the IP address of the Ray node this code is running on.

    Available as ``ray.util.get_node_ip_address()``.

    Returns:
        str: Node IP address
    """
250
251
def get_webui_url():
    """
    Return the URL of the Ray web UI.

    NOTE(review): this looks like an older alias for the dashboard URL and
    may not exist in current Ray releases; confirm.

    Returns:
        str: Web UI URL
    """
258
```
259
260
### Task and Actor Introspection
261
262
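Inspect tasks, actors, and objects in the cluster (in any state, not only running). In current Ray releases these functions are provided by the state API in `ray.util.state`.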
263
264
```python { .api }
265
def list_tasks(*, filters=None, limit=100):
    """
    List tasks known to the cluster, in any state (not only running ones).

    Provided by the state API: ``from ray.util.state import list_tasks``.

    Args:
        filters (list, optional): ``(key, predicate, value)`` tuples,
            e.g. ``[("state", "=", "RUNNING")]``. Supported predicates are
            "=" and "!=".
        limit (int): Maximum number of entries to return. Defaults to 100.

    Returns:
        list: One state record per task.
    """
275
276
def list_actors(*, filters=None, limit=100):
    """
    List actors known to the cluster, in any state (including dead ones).

    Provided by the state API: ``from ray.util.state import list_actors``.

    Args:
        filters (list, optional): ``(key, predicate, value)`` tuples,
            e.g. ``[("state", "=", "ALIVE")]``. Supported predicates are
            "=" and "!=".
        limit (int): Maximum number of entries to return. Defaults to 100.

    Returns:
        list: One state record per actor.
    """
286
287
def list_objects(*, filters=None, limit=100):
    """
    List objects in the cluster's object store.

    Provided by the state API: ``from ray.util.state import list_objects``.

    Args:
        filters (list, optional): ``(key, predicate, value)`` tuples.
            Supported predicates are "=" and "!=".
        limit (int): Maximum number of entries to return. Defaults to 100.

    Returns:
        list: One state record per object.
    """
297
298
def summarize_tasks():
    """
    Produce an aggregated overview of the cluster's tasks.

    Returns:
        dict: Task summary
    """
305
306
def summarize_objects():
    """
    Produce an aggregated overview of objects in the object store.

    Returns:
        dict: Object summary
    """
313
```
314
315
### Progress Tracking
316
317
Track progress of Ray operations.
318
319
```python { .api }
320
class ProgressBar:
    """Console progress bar that tracks completion of Ray work."""

    def __init__(self, total, title="", unit="it", position=0):
        """
        Args:
            total (int): Number of items the bar counts up to.
            title (str): Label shown next to the bar.
            unit (str): Name of one unit of work.
            position (int): Line offset when several bars are shown at once.
        """

    def block_until_complete(self, object_refs):
        """
        Wait for every reference to finish, advancing the bar as they do.

        Args:
            object_refs (list): References to wait on.

        Returns:
            list: Results
        """

    def fetch_until_complete(self, object_refs):
        """
        Stream results as each reference finishes, advancing the bar.

        Args:
            object_refs (list): References to wait on.

        Yields:
            Results in the order they complete.
        """

    def set_description(self, description):
        """
        Replace the text shown next to the bar.

        Args:
            description (str): New label.
        """

    def update(self, n=1):
        """
        Advance the bar.

        Args:
            n (int): How many items just finished.
        """

    def close(self):
        """Tear down the progress bar."""
374
```
375
376
### Multiprocessing Integration
377
378
Integration with Python multiprocessing.
379
380
```python { .api }
381
class Pool:
    """Ray-based replacement for ``multiprocessing.Pool``.

    Import from ``ray.util.multiprocessing``. Like ``multiprocessing.Pool``,
    it can be used as a context manager.
    """

    def __init__(self, processes=None, ray_remote_args=None):
        """
        Initialize Ray pool.

        Args:
            processes (int, optional): Number of worker processes
            ray_remote_args (dict, optional): Ray remote arguments
        """

    def __enter__(self):
        """Return the pool itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Terminate the pool on leaving the ``with`` block.

        This mirrors ``multiprocessing.Pool``. Exceptions are not suppressed.
        """
        self.terminate()

    def map(self, func, iterable, chunksize=None):
        """
        Map function over iterable.

        Args:
            func: Function to apply
            iterable: Items to process
            chunksize (int, optional): Chunk size for batching

        Returns:
            list: Results, in input order
        """

    def map_async(self, func, iterable, chunksize=None, callback=None,
                  error_callback=None):
        """
        Asynchronously map function over iterable.

        Args:
            func: Function to apply
            iterable: Items to process
            chunksize (int, optional): Chunk size
            callback: Success callback
            error_callback: Error callback

        Returns:
            AsyncResult: Async result handle
        """

    def imap(self, func, iterable, chunksize=1):
        """
        Lazily map function over iterable, yielding results in input order.

        Args:
            func: Function to apply
            iterable: Items to process
            chunksize (int): Chunk size

        Returns:
            Iterator: Result iterator
        """

    def imap_unordered(self, func, iterable, chunksize=1):
        """
        Lazily map function, yielding results in completion order.

        Args:
            func: Function to apply
            iterable: Items to process
            chunksize (int): Chunk size

        Returns:
            Iterator: Result iterator
        """

    def starmap(self, func, iterable, chunksize=None):
        """
        Map function over iterable of argument tuples.

        Args:
            func: Function to apply
            iterable: Tuples of arguments, each unpacked into ``func``
            chunksize (int, optional): Chunk size

        Returns:
            list: Results
        """

    def apply(self, func, args=(), kwds=None):
        """
        Apply function with arguments and wait for the result.

        Args:
            func: Function to apply
            args (tuple): Positional arguments
            kwds (dict, optional): Keyword arguments; None means none.
                (None replaces a shared mutable ``{}`` default.)

        Returns:
            Result value
        """

    def apply_async(self, func, args=(), kwds=None, callback=None,
                    error_callback=None):
        """
        Asynchronously apply function.

        Args:
            func: Function to apply
            args (tuple): Positional arguments
            kwds (dict, optional): Keyword arguments; None means none.
            callback: Success callback
            error_callback: Error callback

        Returns:
            AsyncResult: Async result handle
        """

    def close(self):
        """Stop accepting new work; running work is allowed to finish."""

    def terminate(self):
        """Stop the workers immediately without finishing outstanding work."""

    def join(self):
        """Wait for workers to exit; call after ``close()`` or ``terminate()``."""
498
```
499
500
### Runtime Environment
501
502
Define runtime environments (dependencies, working directory, environment variables) that isolate jobs, tasks, and actors from each other.
503
504
```python { .api }
505
class RuntimeEnv:
    """Declarative description of the environment a job, task or actor runs in."""

    def __init__(self, *, py_modules=None, working_dir=None, pip=None,
                 conda=None, env_vars=None, container=None,
                 excludes=None, _validate=True):
        """
        All arguments are keyword-only.

        Args:
            py_modules (list, optional): Python modules to ship with the job.
            working_dir (str, optional): Directory to use as the working dir.
            pip (list/str, optional): Pip requirements to install.
            conda (str/dict, optional): Conda environment to use.
            env_vars (dict, optional): Environment variables to set.
            container (dict, optional): Container to run in.
            excludes (list, optional): Files or patterns to leave out.
            _validate (bool): Whether the specification is validated.
        """
524
525
def runtime_env_context_manager(runtime_env):
    """
    Build a context manager that applies a runtime environment.

    NOTE(review): not found in upstream Ray's public API; confirm it exists.

    Args:
        runtime_env (dict/RuntimeEnv): Environment to apply.

    Returns:
        Context manager for runtime environment
    """
535
```
536
537
### Advanced Resource Management
538
539
Advanced resource allocation and management.
540
541
```python { .api }
542
def get_current_node_resource_key():
    """
    Return the custom-resource key that identifies the current node.

    Returns:
        str: Node resource key
    """
549
550
def list_named_actors(*, all_namespaces=False):
    """
    Enumerate actors that were created with a name.

    Args:
        all_namespaces (bool): Include actors from every namespace rather
            than only the current one.

    Returns:
        list: Named actor information
    """
560
561
class Accelerator:
    """Describes an accelerator resource requirement.

    NOTE(review): upstream Ray usually expresses this through the
    ``accelerator_type=`` option of ``@ray.remote`` / ``.options()`` rather
    than a class; confirm this class exists.
    """

    def __init__(self, accelerator_type, num=None):
        """
        Args:
            accelerator_type (str): Kind of accelerator requested.
            num (int, optional): How many accelerators are needed.
        """
572
```
573
574
### Collective Communications
575
576
Distributed communication operations for multi-GPU and multi-node training.
577
578
```python { .api }
579
def init_collective_group(world_size, rank, backend="nccl", group_name="default"):
    """
    Join the calling process to a collective communication group.

    Each of the ``world_size`` participants calls this with its own ``rank``.

    Args:
        world_size (int): Number of processes in the group.
        rank (int): This process's rank within the group.
        backend (str): Transport to use, "nccl" or "gloo".
        group_name (str): Identifier of the group.
    """
589
590
def destroy_collective_group(group_name="default"):
    """
    Tear down a collective communication group.

    Args:
        group_name (str): Identifier of the group to destroy.
    """
597
598
def allreduce(tensor, group_name="default", op="SUM"):
    """
    All-reduce ``tensor`` across every process in the group, in place.

    After the call, each rank's ``tensor`` holds the reduced value.

    Args:
        tensor: Tensor to reduce; it is overwritten with the result.
        group_name (str): Communication group name.
        op (str): Reduction operation ("SUM", "PRODUCT", "MIN", "MAX").
            Upstream ``ray.util.collective`` takes a ``ReduceOp`` member here.

    Returns:
        None. The result is written into ``tensor``.
    """
610
611
def broadcast(tensor, src_rank=0, group_name="default"):
    """
    Broadcast ``tensor`` from ``src_rank`` to every process, in place.

    On non-source ranks ``tensor`` is overwritten with the source's data.

    Args:
        tensor: Tensor to send (on the source) or receive into (elsewhere).
        src_rank (int): Rank that owns the data. Defaults to 0.
        group_name (str): Communication group name.

    Returns:
        None. The result is written into ``tensor``.
    """
623
624
def allgather(tensor, group_name="default"):
    """
    Collect one tensor from every process in the group.

    NOTE(review): upstream ``ray.util.collective.allgather`` takes
    ``(tensor_list, tensor, group_name)`` and fills ``tensor_list`` in place;
    confirm which signature this page should document.

    Args:
        tensor: This process's contribution.
        group_name (str): Communication group name.

    Returns:
        List of tensors from all processes
    """
635
636
def barrier(group_name="default"):
    """
    Block until every process in the group has reached this call.

    Args:
        group_name (str): Communication group name.
    """
643
644
def get_rank(group_name="default"):
    """
    Return this process's rank within a collective group.

    Args:
        group_name (str): Communication group name.

    Returns:
        int: Current process rank
    """
654
655
def get_world_size(group_name="default"):
    """
    Return how many processes belong to a collective group.

    NOTE(review): upstream ``ray.util.collective`` names this
    ``get_collective_group_size``; confirm.

    Args:
        group_name (str): Communication group name.

    Returns:
        int: Total number of processes
    """
665
666
def allreduce_multigpu(tensor_list, group_name="default", op="SUM"):
    """
    All-reduce a list of per-GPU tensors across the group, in place.

    Args:
        tensor_list (list): Tensors to reduce, one per GPU; each is
            overwritten with the result.
        group_name (str): Communication group name.
        op (str): Reduction operation ("SUM", "PRODUCT", "MIN", "MAX").

    Returns:
        None. Results are written into the tensors of ``tensor_list``.
    """
678
679
def broadcast_multigpu(tensor_list, src_rank=0, group_name="default"):
    """
    Broadcast from ``src_rank`` into a list of per-GPU tensors, in place.

    Args:
        tensor_list (list): Tensors, one per GPU; on receiving ranks they are
            overwritten with the broadcast data.
        src_rank (int): Rank that owns the data. Defaults to 0.
        group_name (str): Communication group name.

    Returns:
        None. Results are written into the tensors of ``tensor_list``.
    """
691
```
692
693
## Usage Examples
694
695
### Placement Groups Example
696
697
```python
698
import ray
# PlacementGroupSchedulingStrategy is not re-exported at the top level; without
# this import the example fails with NameError.
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

# Create placement group with co-located resources
# (requires a cluster with at least 2 GPUs and 8 CPUs).
pg = ray.util.placement_group([
    {"CPU": 2, "GPU": 1},  # Bundle 0
    {"CPU": 2, "GPU": 1},  # Bundle 1
    {"CPU": 4}             # Bundle 2
], strategy="PACK")

# Wait for placement group to be ready
ray.get(pg.ready())

# Use placement group for actor creation
@ray.remote(num_cpus=2, num_gpus=1)
class GPUActor:
    def train_model(self):
        return "Training on GPU"

# Create actors in specific bundles
actor1 = GPUActor.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg,
        placement_group_bundle_index=0
    )
).remote()

actor2 = GPUActor.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg,
        placement_group_bundle_index=1
    )
).remote()

# Use the actors
results = ray.get([
    actor1.train_model.remote(),
    actor2.train_model.remote()
])
print(results)

# Clean up (removal is asynchronous and kills actors still using the group)
ray.util.remove_placement_group(pg)
ray.shutdown()
743
```
744
745
### Actor Pool Example
746
747
```python
748
import ray
from ray.util import ActorPool

ray.init()


@ray.remote
class Worker:
    """Toy actor that tags every processed item with its own id."""

    def __init__(self, worker_id):
        self.worker_id = worker_id

    def process(self, item):
        import time  # imported inside the actor method

        time.sleep(1)  # stand-in for real work
        return f"Worker {self.worker_id} processed {item}"


NUM_WORKERS = 4
pool = ActorPool([Worker.remote(idx) for idx in range(NUM_WORKERS)])


def submit_item(actor, item):
    """ActorPool adapter: start one remote call and hand back its ObjectRef."""
    return actor.process.remote(item)


items = list(range(20))
results = list(pool.map(submit_item, items))

print(f"Processed {len(results)} items")

ray.shutdown()
777
```
778
779
### Progress Tracking Example
780
781
```python
782
import time

import ray
from ray.experimental import ProgressBar

ray.init()


@ray.remote
def slow_task(i):
    time.sleep(2)
    return i ** 2


NUM_TASKS = 10
progress = ProgressBar(NUM_TASKS, title="Processing")

# Launch every task up front, then let the bar block until all are done.
refs = list(map(slow_task.remote, range(NUM_TASKS)))
results = progress.block_until_complete(refs)

print(f"Results: {results}")
progress.close()

ray.shutdown()
805
```
806
807
### Multiprocessing Pool Example
808
809
```python
810
import ray
from ray.util.multiprocessing import Pool


def square(x):
    return x ** 2


ray.init()

# Pool mirrors multiprocessing.Pool but schedules the work on Ray workers.
with Pool() as pool:
    squares = pool.map(square, range(10))
    print(f"Squares: {squares}")

ray.shutdown()
824
```
825
826
### Debugging and Monitoring Example
827
828
```python
829
import ray
# The state API (list_tasks, list_actors, ...) lives in ray.util.state, not
# ray.util; it needs the dashboard components (pip install "ray[default]").
from ray.util.state import list_actors, list_tasks

# ray.init() returns a context object that carries the dashboard URL.
context = ray.init()

@ray.remote
def monitored_task(x):
    # NOTE(review): ray.profiling.profile is not documented as public API in
    # recent Ray releases; confirm it exists in your version.
    with ray.profiling.profile("computation"):
        # Some computation
        result = sum(range(x))
    return result

# Submit tasks
tasks = [monitored_task.remote(1000) for _ in range(5)]
results = ray.get(tasks)

# Get debugging information
print("Dashboard URL:", context.dashboard_url)
print("Node IP:", ray.util.get_node_ip_address())

# List tasks and actors known to the cluster
print("Tasks:", list_tasks())
print("Actors:", list_actors())

# Get timeline for profiling (returns the events when no filename is given)
timeline_data = ray.timeline()
print(f"Timeline has {len(timeline_data)} events")

ray.shutdown()
857
```
858
859
### Runtime Environment Example
860
861
```python
862
import ray

# Runtime environment: pinned packages, one env var, and a working directory.
runtime_env = dict(
    pip=["numpy==1.21.0", "pandas==1.3.0"],
    env_vars={"MY_ENV_VAR": "value"},
    working_dir="./my_project",
)

ray.init()


@ray.remote(runtime_env=runtime_env)
def task_with_runtime_env():
    """Report package versions and an env var as seen inside the runtime env."""
    import numpy as np
    import pandas as pd
    import os

    info = {}
    info["numpy_version"] = np.__version__
    info["pandas_version"] = pd.__version__
    info["env_var"] = os.environ.get("MY_ENV_VAR")
    return info


result = ray.get(task_with_runtime_env.remote())
print("Task result:", result)

ray.shutdown()
889
```
890
891
### Advanced Resource Management Example
892
893
```python
894
import ray

ray.init()

# Define custom resource requirements
@ray.remote(resources={"custom_resource": 1})
class CustomResourceActor:
    def process(self):
        return "Processing with custom resource"

# Get current node resources
print("Available resources:", ray.available_resources())
print("Cluster resources:", ray.cluster_resources())

# Create actor with custom resources. Creation stays pending until some node
# offers "custom_resource", so bound the wait with a timeout.
actor = CustomResourceActor.remote()
try:
    result = ray.get(actor.process.remote(), timeout=5)
    print(result)
except ray.exceptions.GetTimeoutError:
    # ray.get(..., timeout=...) raises GetTimeoutError; there is no
    # ray.exceptions.RayTimeoutError, so catching it would itself fail.
    print("Task timed out - custom resource not available")

ray.shutdown()
917
```