Ray is a unified framework for scaling AI and Python applications.
Ray provides utility functions, placement groups, debugging tools, actor pools, and other advanced distributed-computing features for building complex distributed applications with optimized resource management.
Advanced resource management and co-location of tasks and actors.
def placement_group(bundles, *, strategy="PACK", name="", lifetime=None):
    """Create a placement group that reserves and co-locates resource bundles.

    Args:
        bundles (list): Resource bundle dicts, e.g. ``{"CPU": 2, "GPU": 1}``.
        strategy (str): Placement strategy — one of "PACK", "SPREAD",
            "STRICT_PACK", or "STRICT_SPREAD".
        name (str, optional): Optional name for the placement group.
        lifetime (str, optional): "detached" to outlive the creating job,
            or None for the default fate-sharing lifetime.

    Returns:
        PlacementGroup: Handle to the created placement group.
    """
def get_placement_group(name):
    """Look up an existing placement group by its name.

    Args:
        name (str): Name the placement group was created with.

    Returns:
        PlacementGroup: Handle to the named placement group.
    """


def remove_placement_group(placement_group):
    """Remove a placement group and release its reserved resources.

    Args:
        placement_group (PlacementGroup): The group to remove.

    Returns:
        bool: True if the removal succeeded.
    """
def list_placement_groups(*, filter_state=None):
    """Enumerate placement groups in the cluster.

    Args:
        filter_state (str, optional): If given, only groups in this state
            are returned.

    Returns:
        list: Information records, one per placement group.
    """
class PlacementGroup:
    """Handle to a created placement group."""

    def ready(self):
        """Return a reference that resolves once the group is placed.

        Returns:
            ObjectRef: Reference that becomes ready when the group is ready.
        """

    @property
    def bundle_count(self):
        """Number of bundles reserved by this placement group."""

    @property
    def id(self):
        """Unique identifier of this placement group."""

    def wait(self, timeout_seconds=None):
        """Block until the group is ready or the timeout expires.

        Args:
            timeout_seconds (float, optional): Maximum seconds to wait;
                None waits indefinitely.

        Returns:
            bool: True if ready, False if the wait timed out.
        """
class PlacementGroupSchedulingStrategy:
    """Scheduling strategy that pins tasks/actors to a placement group."""

    def __init__(self, placement_group, placement_group_bundle_index=None,
                 placement_group_capture_child_tasks=None):
        """Initialize the placement group scheduling strategy.

        Args:
            placement_group (PlacementGroup): Target placement group.
            placement_group_bundle_index (int, optional): Specific bundle
                to schedule into; None lets Ray choose any bundle.
            placement_group_capture_child_tasks (bool, optional): Whether
                child tasks inherit this placement group.
        """


# Manage pools of actors for load balancing and resource efficiency.
class ActorPool:
    """Pool of actor handles used for load-balanced task execution."""

    def __init__(self, actors):
        """Initialize the pool.

        Args:
            actors (list): Actor handles that will serve submitted work.
        """

    def map(self, fn, values):
        """Apply ``fn`` over ``values`` using the pool, preserving order.

        Args:
            fn: Two-argument callable ``(actor, value)`` returning an
                ObjectRef.
            values: Iterable of values to process.

        Yields:
            Results in input order.
        """

    def map_unordered(self, fn, values):
        """Apply ``fn`` over ``values``, yielding results as they finish.

        Args:
            fn: Two-argument callable ``(actor, value)``.
            values: Iterable of values to process.

        Yields:
            Results in completion order.
        """

    def submit(self, fn, value):
        """Submit a single task to an idle actor in the pool.

        Args:
            fn: Two-argument callable ``(actor, value)``.
            value: Value to process.

        Returns:
            ObjectRef: Reference to the pending result.
        """

    def get_next(self, timeout=None):
        """Get the next result in submission order.

        Args:
            timeout (float, optional): Seconds to wait before giving up.

        Returns:
            tuple: ``(actor_index, result)``.
        """

    def get_next_unordered(self, timeout=None):
        """Get the next completed result, in any order.

        Args:
            timeout (float, optional): Seconds to wait before giving up.

        Returns:
            The result value.
        """

    def has_next(self):
        """Report whether any submitted tasks are still pending.

        Returns:
            bool: True if results remain to be collected.
        """

    def get_submitter(self):
        """Get a helper object for asynchronous task submission.

        Returns:
            PoolTaskSubmitter: The task submitter.
        """


# Tools for debugging and profiling Ray applications.
def get_dashboard_url():
    """Return the URL of the Ray dashboard.

    Returns:
        str: Dashboard URL.
    """


def timeline(filename=None):
    """Collect Ray timeline events for profiling, optionally saving them.

    Args:
        filename (str, optional): Destination file; if given, events are
            written there instead of returned.

    Returns:
        list: Timeline events when no filename is provided.
    """


def print_timeline(filename=None):
    """Print Ray timeline information.

    Args:
        filename (str, optional): Timeline file to load before printing.
    """
class profiling:
    """Context manager that records a Ray profiling span."""

    def __init__(self, span_name=None):
        """Set up the profiling context.

        Args:
            span_name (str, optional): Label for the recorded span.
        """

    def __enter__(self):
        """Begin the profiling span."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """End the profiling span."""
def get_node_ip_address():
    """Get the IP address of the current Ray node.

    Returns:
        str: Node IP address.
    """


def get_webui_url():
    """Get the Ray web UI URL.

    Returns:
        str: Web UI URL.
    """


# Inspect running tasks and actors.
def list_tasks(*, filters=None):
    """Enumerate tasks currently known to the cluster.

    Args:
        filters (list, optional): Filters restricting the listing.

    Returns:
        dict: Task information.
    """


def list_actors(*, filters=None):
    """Enumerate actors currently known to the cluster.

    Args:
        filters (list, optional): Filters restricting the listing.

    Returns:
        dict: Actor information.
    """


def list_objects(*, filters=None):
    """Enumerate objects held in the object store.

    Args:
        filters (list, optional): Filters restricting the listing.

    Returns:
        dict: Object information.
    """
def summarize_tasks():
    """Get an aggregated summary of tasks.

    Returns:
        dict: Task summary.
    """


def summarize_objects():
    """Get an aggregated summary of objects in the object store.

    Returns:
        dict: Object summary.
    """


# Track progress of Ray operations.
class ProgressBar:
    """Progress bar for tracking completion of Ray operations."""

    def __init__(self, total, title="", unit="it", position=0):
        """Initialize the progress bar.

        Args:
            total (int): Total number of items to track.
            title (str): Title shown next to the bar.
            unit (str): Unit-of-measurement label.
            position (int): Row position, for stacking multiple bars.
        """

    def block_until_complete(self, object_refs):
        """Block until all object references complete, showing progress.

        Args:
            object_refs (list): Object references to wait on.

        Returns:
            list: The fetched results.
        """

    def fetch_until_complete(self, object_refs):
        """Yield results as they complete, updating the bar.

        Args:
            object_refs (list): Object references to fetch.

        Yields:
            Results in completion order.
        """

    def set_description(self, description):
        """Replace the bar's description text.

        Args:
            description (str): New description.
        """

    def update(self, n=1):
        """Advance the bar by ``n`` completed items.

        Args:
            n (int): Number of items completed.
        """

    def close(self):
        """Close and finalize the progress bar."""


# Integration with Python multiprocessing.
class Pool:
    """Ray-backed drop-in replacement for ``multiprocessing.Pool``."""

    def __init__(self, processes=None, ray_remote_args=None):
        """Initialize the Ray pool.

        Args:
            processes (int, optional): Number of worker processes; None
                uses a cluster-derived default.
            ray_remote_args (dict, optional): Extra ``ray.remote`` options
                applied to the workers.
        """

    def map(self, func, iterable, chunksize=None):
        """Apply ``func`` over ``iterable`` and collect all results.

        Args:
            func: Function to apply.
            iterable: Items to process.
            chunksize (int, optional): Batch size per worker task.

        Returns:
            list: Results in input order.
        """

    def map_async(self, func, iterable, chunksize=None, callback=None,
                  error_callback=None):
        """Apply ``func`` over ``iterable`` without blocking.

        Args:
            func: Function to apply.
            iterable: Items to process.
            chunksize (int, optional): Batch size per worker task.
            callback: Invoked with the results on success.
            error_callback: Invoked with the exception on failure.

        Returns:
            AsyncResult: Handle for the in-flight computation.
        """

    def imap(self, func, iterable, chunksize=1):
        """Lazily apply ``func``, yielding results in input order.

        Args:
            func: Function to apply.
            iterable: Items to process.
            chunksize (int): Batch size per worker task.

        Returns:
            Iterator: Ordered result iterator.
        """

    def imap_unordered(self, func, iterable, chunksize=1):
        """Lazily apply ``func``, yielding results as they finish.

        Args:
            func: Function to apply.
            iterable: Items to process.
            chunksize (int): Batch size per worker task.

        Returns:
            Iterator: Completion-ordered result iterator.
        """

    def starmap(self, func, iterable, chunksize=None):
        """Apply ``func`` over an iterable of argument tuples.

        Args:
            func: Function to apply.
            iterable: Tuples of positional arguments.
            chunksize (int, optional): Batch size per worker task.

        Returns:
            list: Results in input order.
        """

    # NOTE: kwds={} mirrors the stdlib multiprocessing.Pool signature; the
    # default dict is never mutated.
    def apply(self, func, args=(), kwds={}):
        """Call ``func(*args, **kwds)`` on a worker and wait for the result.

        Args:
            func: Function to apply.
            args (tuple): Positional arguments.
            kwds (dict): Keyword arguments.

        Returns:
            The result value.
        """

    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        """Call ``func(*args, **kwds)`` on a worker without blocking.

        Args:
            func: Function to apply.
            args (tuple): Positional arguments.
            kwds (dict): Keyword arguments.
            callback: Invoked with the result on success.
            error_callback: Invoked with the exception on failure.

        Returns:
            AsyncResult: Handle for the in-flight call.
        """

    def close(self):
        """Stop accepting new work."""

    def terminate(self):
        """Stop workers immediately, discarding pending work."""

    def join(self):
        """Wait for worker processes to exit."""


# Manage runtime environments for isolation.
class RuntimeEnv:
    """Specification of a per-job/per-task runtime environment."""

    def __init__(self, *, py_modules=None, working_dir=None, pip=None,
                 conda=None, env_vars=None, container=None,
                 excludes=None, _validate=True):
        """Build a runtime environment specification.

        Args:
            py_modules (list, optional): Python modules to ship to workers.
            working_dir (str, optional): Directory uploaded as the job's
                working directory.
            pip (list/str, optional): Pip requirements (list or file path).
            conda (str/dict, optional): Conda environment name or spec.
            env_vars (dict, optional): Environment variables to set.
            container (dict, optional): Container specification.
            excludes (list, optional): File patterns excluded from upload.
            _validate (bool): Whether to validate the specification.
        """
def runtime_env_context_manager(runtime_env):
    """Context manager that applies a runtime environment.

    Args:
        runtime_env (dict/RuntimeEnv): Runtime environment specification.

    Returns:
        Context manager for the runtime environment.
    """


# Advanced resource allocation and management.
def get_current_node_resource_key():
    """Return the resource key identifying the current node.

    Returns:
        str: Node resource key.
    """


def list_named_actors(*, all_namespaces=False):
    """Enumerate named actors.

    Args:
        all_namespaces (bool): Include actors from every namespace
            instead of just the current one.

    Returns:
        list: Named actor information.
    """
class Accelerator:
    """Accelerator resource specification."""

    def __init__(self, accelerator_type, num=None):
        """Initialize the accelerator specification.

        Args:
            accelerator_type (str): Type of accelerator (e.g. a GPU model).
            num (int, optional): Number of accelerators requested.
        """


# Distributed communication operations for multi-GPU and multi-node training.
def init_collective_group(world_size, rank, backend="nccl", group_name="default"):
    """Set up a collective communication group for this process.

    Args:
        world_size (int): Total number of participating processes.
        rank (int): Rank of the calling process within the group.
        backend (str): Communication backend, "nccl" or "gloo".
        group_name (str): Name identifying the group.
    """


def destroy_collective_group(group_name="default"):
    """Tear down a collective communication group.

    Args:
        group_name (str): Name of the group to destroy.
    """
def allreduce(tensor, group_name="default", op="SUM"):
    """Reduce a tensor across every process in the group.

    Args:
        tensor: Input tensor to reduce.
        group_name (str): Communication group name.
        op (str): Reduction operator — "SUM", "PRODUCT", "MIN", or "MAX".

    Returns:
        The reduced tensor.
    """


def broadcast(tensor, src_rank, group_name="default"):
    """Send a tensor from one rank to every process in the group.

    Args:
        tensor: Tensor to broadcast.
        src_rank (int): Rank that supplies the tensor.
        group_name (str): Communication group name.

    Returns:
        The broadcast tensor.
    """
def allgather(tensor, group_name="default"):
    """Collect each process's tensor onto every process.

    Args:
        tensor: Input tensor contributed by this process.
        group_name (str): Communication group name.

    Returns:
        List of tensors, one per process.
    """


def barrier(group_name="default"):
    """Block until every process in the group reaches this call.

    Args:
        group_name (str): Communication group name.
    """
def get_rank(group_name="default"):
    """Return this process's rank within the group.

    Args:
        group_name (str): Communication group name.

    Returns:
        int: Rank of the calling process.
    """


def get_world_size(group_name="default"):
    """Return the number of processes in the group.

    Args:
        group_name (str): Communication group name.

    Returns:
        int: Total number of processes.
    """
def allreduce_multigpu(tensor_list, group_name="default", op="SUM"):
    """All-reduce across processes where each process holds multiple GPUs.

    Args:
        tensor_list (list): One tensor per local GPU.
        group_name (str): Communication group name.
        op (str): Reduction operator.

    Returns:
        List of reduced tensors, one per local GPU.
    """
def broadcast_multigpu(tensor_list, src_rank, group_name="default"):
    """Broadcast across processes where each process holds multiple GPUs.

    Args:
        tensor_list (list): One tensor per local GPU.
        src_rank (int): Rank that supplies the tensors.
        group_name (str): Communication group name.

    Returns:
        List of broadcast tensors, one per local GPU.
    """


# Example: placement groups for co-located actors.
import ray
ray.init()

# Create placement group with co-located resources
pg = ray.util.placement_group([
    {"CPU": 2, "GPU": 1},  # Bundle 0
    {"CPU": 2, "GPU": 1},  # Bundle 1
    {"CPU": 4}             # Bundle 2
], strategy="PACK")

# Wait for placement group to be ready
ray.get(pg.ready())

# Use placement group for actor creation
@ray.remote(num_cpus=2, num_gpus=1)
class GPUActor:
    def train_model(self):
        return "Training on GPU"

# Create actors in specific bundles
actor1 = GPUActor.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg,
        placement_group_bundle_index=0
    )
).remote()
actor2 = GPUActor.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg,
        placement_group_bundle_index=1
    )
).remote()

# Use the actors
results = ray.get([
    actor1.train_model.remote(),
    actor2.train_model.remote()
])
print(results)

# Clean up
ray.util.remove_placement_group(pg)
ray.shutdown()

# Example: load balancing with an ActorPool.
import ray
from ray.util import ActorPool

ray.init()

@ray.remote
class Worker:
    def __init__(self, worker_id):
        self.worker_id = worker_id

    def process(self, item):
        # Simulate processing
        import time
        time.sleep(1)
        return f"Worker {self.worker_id} processed {item}"

# Create workers
workers = [Worker.remote(i) for i in range(4)]

# Create actor pool
pool = ActorPool(workers)

# Process items using the pool
items = list(range(20))
results = list(pool.map(lambda w, item: w.process.remote(item), items))
print(f"Processed {len(results)} items")
ray.shutdown()

# Example: tracking task completion with a ProgressBar.
import ray
from ray.experimental import ProgressBar
import time

ray.init()

@ray.remote
def slow_task(i):
    time.sleep(2)
    return i ** 2

# Create tasks
num_tasks = 10
pb = ProgressBar(num_tasks, title="Processing")

# Submit tasks and track progress
tasks = [slow_task.remote(i) for i in range(num_tasks)]
results = pb.block_until_complete(tasks)
print(f"Results: {results}")
pb.close()
ray.shutdown()

# Example: Ray's drop-in multiprocessing Pool.
import ray
from ray.util.multiprocessing import Pool

def square(x):
    return x ** 2

ray.init()

# Use Ray pool instead of multiprocessing.Pool
with Pool() as pool:
    results = pool.map(square, range(10))
    print(f"Squares: {results}")
ray.shutdown()

# Example: debugging and profiling a Ray application.
import ray
ray.init()

@ray.remote
def monitored_task(x):
    with ray.profiling.profile("computation"):
        # Some computation
        result = sum(range(x))
    return result

# Submit tasks
tasks = [monitored_task.remote(1000) for _ in range(5)]
results = ray.get(tasks)

# Get debugging information
print("Dashboard URL:", ray.get_dashboard_url())
print("Node IP:", ray.get_node_ip_address())

# List running tasks and actors
print("Tasks:", ray.util.list_tasks())
print("Actors:", ray.util.list_actors())

# Get timeline for profiling
timeline_data = ray.timeline()
print(f"Timeline has {len(timeline_data)} events")
ray.shutdown()

# Example: isolating dependencies with a runtime environment.
import ray
# Define runtime environment
runtime_env = {
    "pip": ["numpy==1.21.0", "pandas==1.3.0"],
    "env_vars": {"MY_ENV_VAR": "value"},
    "working_dir": "./my_project"
}

ray.init()

@ray.remote(runtime_env=runtime_env)
def task_with_runtime_env():
    import numpy as np
    import pandas as pd
    import os
    return {
        "numpy_version": np.__version__,
        "pandas_version": pd.__version__,
        "env_var": os.environ.get("MY_ENV_VAR")
    }

result = ray.get(task_with_runtime_env.remote())
print("Task result:", result)
ray.shutdown()

# Example: scheduling against custom cluster resources.
import ray
ray.init()

# Define custom resource requirements
@ray.remote(resources={"custom_resource": 1})
class CustomResourceActor:
    def process(self):
        return "Processing with custom resource"

# Get current node resources
print("Available resources:", ray.available_resources())
print("Cluster resources:", ray.cluster_resources())

# Create actor with custom resources (will wait for resources)
try:
    actor = CustomResourceActor.remote()
    result = ray.get(actor.process.remote(), timeout=5)
    print(result)
except ray.exceptions.RayTimeoutError:
    print("Task timed out - custom resource not available")
ray.shutdown()

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-ray