CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-ray

Ray is a unified framework for scaling AI and Python applications.

Pending
Overview
Eval results
Files

docs/utilities-advanced.md

Utilities and Advanced Features

Ray provides utility functions, placement groups, debugging tools, actor pools, and advanced distributed computing features for complex distributed applications and optimized resource management.

Capabilities

Placement Groups

Advanced resource management and co-location of tasks and actors.

def placement_group(bundles, *, strategy="PACK", name="", lifetime=None):
    """
    Create a placement group that reserves bundles of cluster resources for
    co-locating (or spreading) tasks and actors.

    Args:
        bundles (list): List of resource bundle dictionaries, each mapping a
            resource name (e.g. "CPU", "GPU") to the quantity reserved in
            that bundle
        strategy (str): Placement strategy ("PACK", "SPREAD", "STRICT_PACK", "STRICT_SPREAD")
        name (str, optional): Placement group name
        lifetime (str, optional): Lifetime policy ("detached" to outlive the
            creating job, or None for the default lifetime)

    Returns:
        PlacementGroup: Placement group handle
    """

def get_placement_group(name):
    """
    Look up an existing placement group by its name.

    Args:
        name (str): Name the placement group was created with

    Returns:
        PlacementGroup: Placement group handle
    """

def remove_placement_group(placement_group):
    """
    Remove a placement group and release the resources it reserved.

    Args:
        placement_group (PlacementGroup): Placement group handle to remove

    Returns:
        bool: True if removal was successful
    """

def list_placement_groups(*, filter_state=None):
    """
    List all placement groups in the cluster.

    Args:
        filter_state (str, optional): If given, only return placement groups
            in this state

    Returns:
        list: List of placement group information
    """

class PlacementGroup:
    """Handle for a placement group returned by ``placement_group()``."""
    
    def ready(self):
        """
        Get an object reference used to await readiness.
        
        Returns:
            ObjectRef: Object reference that becomes ready once all bundles
                of the placement group have been reserved (typically awaited
                via ``ray.get(pg.ready())``)
        """
    
    @property
    def bundle_count(self):
        """Number of resource bundles reserved by this placement group."""
    
    @property
    def id(self):
        """Unique placement group ID."""
    
    def wait(self, timeout_seconds=None):
        """
        Block until the placement group is ready or the timeout expires.
        
        Args:
            timeout_seconds (float, optional): Timeout in seconds; None waits
                indefinitely
        
        Returns:
            bool: True if ready, False if timed out
        """

class PlacementGroupSchedulingStrategy:
    """Scheduling strategy that pins a task or actor into a placement group.

    Passed as ``scheduling_strategy=`` in ``.options(...)`` on a remote
    function or actor class.
    """
    
    def __init__(self, placement_group, placement_group_bundle_index=None,
                 placement_group_capture_child_tasks=None):
        """
        Initialize placement group scheduling strategy.
        
        Args:
            placement_group (PlacementGroup): Placement group to schedule into
            placement_group_bundle_index (int, optional): Index of the bundle
                to use; None lets the scheduler pick any bundle
            placement_group_capture_child_tasks (bool, optional): Whether
                child tasks are also scheduled into the placement group
        """

Actor Pools

Manage pools of actors for load balancing and resource efficiency.

class ActorPool:
    """Pool that load-balances work across a fixed set of actor handles."""
    
    def __init__(self, actors):
        """
        Initialize actor pool.
        
        Args:
            actors (list): List of actor handles that will serve requests
        """
    
    def map(self, fn, values):
        """
        Map function over values using actor pool.
        
        Args:
            fn: Two-argument callable ``fn(actor, value)`` that returns an
                ObjectRef (e.g. ``lambda a, v: a.method.remote(v)``)
            values: Values to process
        
        Yields:
            Results from function application (in submission order, in
            contrast to ``map_unordered``)
        """
    
    def map_unordered(self, fn, values):
        """
        Map function over values, yielding results as they complete.
        
        Args:
            fn: Two-argument callable ``fn(actor, value)`` returning an ObjectRef
            values: Values to process
        
        Yields:
            Results in completion order
        """
    
    def submit(self, fn, value):
        """
        Submit a single task to the next free actor in the pool.
        
        Args:
            fn: Two-argument callable ``fn(actor, value)`` returning an ObjectRef
            value: Value to process
        
        Returns:
            ObjectRef: Result reference
        """
    
    def get_next(self, timeout=None):
        """
        Get next completed result.
        
        Args:
            timeout (float, optional): Timeout in seconds
        
        Returns:
            tuple: (actor_index, result)
        """
        # NOTE(review): real ray.util.ActorPool.get_next returns only the
        # result value, not a tuple — verify this claim against the upstream API.
    
    def get_next_unordered(self, timeout=None):
        """
        Get next completed result without order guarantee.
        
        Args:
            timeout (float, optional): Timeout in seconds
        
        Returns:
            Result value
        """
    
    def has_next(self):
        """
        Check if there are pending results.
        
        Returns:
            bool: True if there are pending results
        """
    
    def get_submitter(self):
        """
        Get submitter for async task submission.
        
        Returns:
            PoolTaskSubmitter: Task submitter
        """
        # NOTE(review): not part of the documented ray.util.ActorPool API in
        # current Ray releases — confirm this method exists before relying on it.

Debugging and Profiling

Tools for debugging and profiling Ray applications.

def get_dashboard_url():
    """
    Get the URL of the Ray dashboard for the connected cluster.

    Returns:
        str: Dashboard URL (host:port)
    """

def timeline(filename=None):
    """
    Get or save the Ray execution timeline for profiling.

    Args:
        filename (str, optional): If given, write the timeline to this file
            instead of returning it

    Returns:
        list: Timeline events if no filename provided
    """

def print_timeline(filename=None):
    """
    Print Ray timeline information to stdout.

    Args:
        filename (str, optional): Timeline file to load; defaults to the
            live timeline when omitted
    """

class profiling:
    """Context manager that records a named profiling span.

    Lower-case class name is kept deliberately to mirror the documented Ray
    API (used as ``with profiling("span"): ...``).
    """
    
    def __init__(self, span_name=None):
        """
        Initialize profiling context.
        
        Args:
            span_name (str, optional): Name for the profiling span shown in
                the timeline
        """
    
    def __enter__(self):
        """Start the profiling span."""
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """End the profiling span (also on exception)."""

def get_node_ip_address():
    """
    Get the IP address of the Ray node this process is running on.

    Returns:
        str: Node IP address
    """

def get_webui_url():
    """
    Get the Ray web UI URL (legacy alias of the dashboard URL).

    Returns:
        str: Web UI URL
    """

Task and Actor Introspection

Inspect running tasks and actors.

def list_tasks(*, filters=None):
    """
    List tasks currently known to the cluster.

    Args:
        filters (list, optional): List of filter predicates to apply

    Returns:
        dict: Task information
    """

def list_actors(*, filters=None):
    """
    List actors currently known to the cluster.

    Args:
        filters (list, optional): List of filter predicates to apply

    Returns:
        dict: Actor information
    """

def list_objects(*, filters=None):
    """
    List objects currently held in the distributed object store.

    Args:
        filters (list, optional): List of filter predicates to apply

    Returns:
        dict: Object information
    """

def summarize_tasks():
    """
    Get an aggregated summary of tasks in the cluster.

    Returns:
        dict: Task summary
    """

def summarize_objects():
    """
    Get an aggregated summary of objects in the object store.

    Returns:
        dict: Object summary
    """

Progress Tracking

Track progress of Ray operations.

class ProgressBar:
    """Progress bar that tracks completion of Ray object references."""
    
    def __init__(self, total, title="", unit="it", position=0):
        """
        Initialize progress bar.
        
        Args:
            total (int): Total number of items expected
            title (str): Progress bar title
            unit (str): Unit label shown next to the count
            position (int): Row offset when stacking multiple progress bars
        """
    
    def block_until_complete(self, object_refs):
        """
        Block until all object references complete, updating the bar.
        
        Args:
            object_refs (list): List of object references to wait on
        
        Returns:
            list: Results
        """
    
    def fetch_until_complete(self, object_refs):
        """
        Fetch results one by one as they complete, updating the bar.
        
        Args:
            object_refs (list): List of object references to wait on
        
        Yields:
            Results as they complete
        """
    
    def set_description(self, description):
        """
        Set progress bar description text.
        
        Args:
            description (str): New description
        """
    
    def update(self, n=1):
        """
        Advance the progress bar by n items.
        
        Args:
            n (int): Number of items completed since the last update
        """
    
    def close(self):
        """Close the progress bar and release its display row."""

Multiprocessing Integration

Integration with Python multiprocessing.

class Pool:
    """Ray-based drop-in replacement for ``multiprocessing.Pool``.

    Workers are Ray actors, so the pool can span multiple nodes.
    """
    
    def __init__(self, processes=None, ray_remote_args=None):
        """
        Initialize Ray pool.
        
        Args:
            processes (int, optional): Number of worker processes; None lets
                the pool size itself from available cluster resources
            ray_remote_args (dict, optional): Extra ``@ray.remote`` options
                applied to each worker actor
        """
    
    def map(self, func, iterable, chunksize=None):
        """
        Map function over iterable, blocking until all results are ready.
        
        Args:
            func: Function to apply
            iterable: Items to process
            chunksize (int, optional): Chunk size for batching work
        
        Returns:
            list: Results in input order
        """
    
    def map_async(self, func, iterable, chunksize=None, callback=None,
                  error_callback=None):
        """
        Asynchronously map function over iterable.
        
        Args:
            func: Function to apply
            iterable: Items to process
            chunksize (int, optional): Chunk size for batching work
            callback: Called with the result list on success
            error_callback: Called with the exception on failure
        
        Returns:
            AsyncResult: Async result handle
        """
    
    def imap(self, func, iterable, chunksize=1):
        """
        Lazily map function over iterable, yielding results in input order.
        
        Args:
            func: Function to apply
            iterable: Items to process
            chunksize (int): Chunk size for batching work
        
        Returns:
            Iterator: Result iterator
        """
    
    def imap_unordered(self, func, iterable, chunksize=1):
        """
        Lazily map function, yielding results in completion order.
        
        Args:
            func: Function to apply
            iterable: Items to process
            chunksize (int): Chunk size for batching work
        
        Returns:
            Iterator: Result iterator
        """
    
    def starmap(self, func, iterable, chunksize=None):
        """
        Map function over an iterable of argument tuples (``func(*args)``).
        
        Args:
            func: Function to apply
            iterable: Tuples of positional arguments
            chunksize (int, optional): Chunk size for batching work
        
        Returns:
            list: Results in input order
        """
    
    def apply(self, func, args=(), kwds={}):
        # NOTE(review): mutable default ``kwds={}`` mirrors the stdlib
        # multiprocessing.Pool signature; kept for API fidelity, but callers
        # must not rely on mutating the default.
        """
        Apply function with arguments, blocking until the result is ready.
        
        Args:
            func: Function to apply
            args (tuple): Positional arguments
            kwds (dict): Keyword arguments
        
        Returns:
            Result value
        """
    
    def apply_async(self, func, args=(), kwds={}, callback=None,
                   error_callback=None):
        # NOTE(review): same mutable-default caveat as ``apply``.
        """
        Asynchronously apply function with arguments.
        
        Args:
            func: Function to apply
            args (tuple): Positional arguments
            kwds (dict): Keyword arguments
            callback: Called with the result on success
            error_callback: Called with the exception on failure
        
        Returns:
            AsyncResult: Async result handle
        """
    
    def close(self):
        """Stop accepting new work; workers exit once pending work drains."""
    
    def terminate(self):
        """Stop workers immediately without waiting for pending work."""
    
    def join(self):
        """Wait for workers to exit; call after ``close`` or ``terminate``."""

Runtime Environment

Manage runtime environments for isolation.

class RuntimeEnv:
    """Runtime environment specification for dependency/environment isolation."""
    
    def __init__(self, *, py_modules=None, working_dir=None, pip=None,
                 conda=None, env_vars=None, container=None, 
                 excludes=None, _validate=True):
        """
        Initialize runtime environment.
        
        Args:
            py_modules (list, optional): Python modules to ship to workers
            working_dir (str, optional): Directory uploaded and used as the
                working directory on workers
            pip (list/str, optional): Pip requirements (list of specifiers or
                path to a requirements file)
            conda (str/dict, optional): Conda environment name or specification
            env_vars (dict, optional): Environment variables set on workers
            container (dict, optional): Container specification
            excludes (list, optional): Files/patterns to exclude from upload
            _validate (bool): Whether to validate the specification
                (private/internal flag)
        """

def runtime_env_context_manager(runtime_env):
    """
    Create a context manager that applies a runtime environment within a block.

    Args:
        runtime_env (dict/RuntimeEnv): Runtime environment specification

    Returns:
        Context manager for the runtime environment
    """

Advanced Resource Management

Advanced resource allocation and management.

def get_current_node_resource_key():
    """
    Get the node-affinity resource key identifying the current node.

    Returns:
        str: Node resource key
    """

def list_named_actors(*, all_namespaces=False):
    """
    List actors that were created with a name.

    Args:
        all_namespaces (bool): If True, include actors from all namespaces
            instead of only the current one

    Returns:
        list: Named actor information
    """

class Accelerator:
    """Specification of an accelerator resource requirement."""
    
    def __init__(self, accelerator_type, num=None):
        """
        Initialize accelerator specification.
        
        Args:
            accelerator_type (str): Type of accelerator (hardware model identifier)
            num (int, optional): Number of accelerators required
        """

Collective Communications

Distributed communication operations for multi-GPU and multi-node training.

def init_collective_group(world_size, rank, backend="nccl", group_name="default"):
    """
    Initialize a collective communication group; every participating process
    must call this with the same world_size and group_name.

    Args:
        world_size (int): Total number of processes in the group
        rank (int): Rank of the current process (0 to world_size - 1)
        backend (str): Communication backend ("nccl" for GPU, "gloo" for CPU)
        group_name (str): Name of the communication group
    """

def destroy_collective_group(group_name="default"):
    """
    Destroy a collective communication group and free its resources.

    Args:
        group_name (str): Name of the group to destroy
    """

def allreduce(tensor, group_name="default", op="SUM"):
    """
    All-reduce: combine tensors from all processes so every process ends up
    with the same reduced value.

    Args:
        tensor: Input tensor to reduce
        group_name (str): Communication group name
        op (str): Reduction operation ("SUM", "PRODUCT", "MIN", "MAX")

    Returns:
        Reduced tensor
    """

def broadcast(tensor, src_rank, group_name="default"):
    """
    Broadcast a tensor from the source rank to every process in the group.

    Args:
        tensor: Tensor to broadcast (source's value is sent; others receive)
        src_rank (int): Rank that provides the tensor
        group_name (str): Communication group name

    Returns:
        Broadcasted tensor
    """

def allgather(tensor, group_name="default"):
    """
    All-gather: collect each process's tensor so every process receives the
    full list.

    Args:
        tensor: Input tensor contributed by this process
        group_name (str): Communication group name

    Returns:
        List of tensors from all processes, ordered by rank
    """

def barrier(group_name="default"):
    """
    Block until every process in the group has reached this barrier.

    Args:
        group_name (str): Communication group name
    """

def get_rank(group_name="default"):
    """
    Get the rank of the current process within a communication group.

    Args:
        group_name (str): Communication group name

    Returns:
        int: Current process rank (0-based)
    """

def get_world_size(group_name="default"):
    """
    Get the total number of processes in a communication group.

    Args:
        group_name (str): Communication group name

    Returns:
        int: Total number of processes
    """

def allreduce_multigpu(tensor_list, group_name="default", op="SUM"):
    """
    All-reduce across processes where each process holds one tensor per GPU.

    Args:
        tensor_list (list): List of tensors, one per local GPU
        group_name (str): Communication group name
        op (str): Reduction operation ("SUM", "PRODUCT", "MIN", "MAX")

    Returns:
        List of reduced tensors
    """

def broadcast_multigpu(tensor_list, src_rank, group_name="default"):
    """
    Broadcast across processes where each process holds one tensor per GPU.

    Args:
        tensor_list (list): List of tensors, one per local GPU
        src_rank (int): Rank that provides the tensors
        group_name (str): Communication group name

    Returns:
        List of broadcasted tensors
    """

Usage Examples

Placement Groups Example

import ray
# Fix: PlacementGroupSchedulingStrategy was used below without being
# imported, so the example raised NameError as written.
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

# Create placement group with co-located resources
pg = ray.util.placement_group([
    {"CPU": 2, "GPU": 1},  # Bundle 0
    {"CPU": 2, "GPU": 1},  # Bundle 1
    {"CPU": 4}             # Bundle 2
], strategy="PACK")

# Wait for placement group to be ready
ray.get(pg.ready())

# Use placement group for actor creation
@ray.remote(num_cpus=2, num_gpus=1)
class GPUActor:
    def train_model(self):
        return "Training on GPU"

# Create actors in specific bundles
actor1 = GPUActor.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg,
        placement_group_bundle_index=0
    )
).remote()

actor2 = GPUActor.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg,
        placement_group_bundle_index=1
    )
).remote()

# Use the actors
results = ray.get([
    actor1.train_model.remote(),
    actor2.train_model.remote()
])
print(results)

# Clean up
ray.util.remove_placement_group(pg)
ray.shutdown()

Actor Pool Example

import ray
from ray.util import ActorPool

ray.init()

@ray.remote
class Worker:
    def __init__(self, worker_id):
        self.worker_id = worker_id
    
    def process(self, item):
        # Simulate processing
        import time
        time.sleep(1)
        return f"Worker {self.worker_id} processed {item}"

# Spin up four worker actors and wrap them in a load-balancing pool.
actor_handles = [Worker.remote(i) for i in range(4)]
worker_pool = ActorPool(actor_handles)

# Fan 20 work items out across the pool; map yields one result per item.
work_items = list(range(20))
results = list(worker_pool.map(lambda w, item: w.process.remote(item), work_items))

print(f"Processed {len(results)} items")

ray.shutdown()

Progress Tracking Example

import ray
# NOTE(review): the ProgressBar import path varies across Ray versions —
# verify `ray.experimental.ProgressBar` exists in the targeted release.
from ray.experimental import ProgressBar
import time

ray.init()

@ray.remote
def slow_task(i):
    time.sleep(2)
    return i ** 2

# Create tasks
num_tasks = 10
pb = ProgressBar(num_tasks, title="Processing")

# Submit tasks and track progress
tasks = [slow_task.remote(i) for i in range(num_tasks)]
# Blocks until all 10 refs resolve, updating the bar as each completes.
results = pb.block_until_complete(tasks)

print(f"Results: {results}")
pb.close()

ray.shutdown()

Multiprocessing Pool Example

import ray
from ray.util.multiprocessing import Pool

def square(x):
    return x * x

ray.init()

# Drop-in replacement for the stdlib multiprocessing.Pool; the context
# manager closes the pool on exit.
with Pool() as pool:
    results = pool.map(square, range(10))
    print(f"Squares: {results}")

ray.shutdown()

Debugging and Monitoring Example

import ray
# Fix: the task/actor introspection helpers live in the Ray state API module
# (ray.util.state), not directly under ray.util.
from ray.util.state import list_tasks, list_actors

ray.init()

@ray.remote
def monitored_task(x):
    # NOTE(review): ray.profiling.profile availability differs across Ray
    # versions — confirm against the installed release.
    with ray.profiling.profile("computation"):
        # Some computation
        result = sum(range(x))
    return result

# Submit tasks
tasks = [monitored_task.remote(1000) for _ in range(5)]
results = ray.get(tasks)

# Get debugging information
print("Dashboard URL:", ray.get_dashboard_url())
print("Node IP:", ray.get_node_ip_address())

# List running tasks and actors
print("Tasks:", list_tasks())
print("Actors:", list_actors())

# Get timeline for profiling
timeline_data = ray.timeline()
print(f"Timeline has {len(timeline_data)} events")

ray.shutdown()

Runtime Environment Example

import ray

# Runtime environment: pinned pip packages, an env var, and an uploaded
# working directory, all applied to the task's workers.
env_spec = {
    "pip": ["numpy==1.21.0", "pandas==1.3.0"],
    "env_vars": {"MY_ENV_VAR": "value"},
    "working_dir": "./my_project"
}

ray.init()

@ray.remote(runtime_env=env_spec)
def task_with_runtime_env():
    import os

    import numpy as np
    import pandas as pd

    # Report what the isolated environment actually provided.
    return {
        "numpy_version": np.__version__,
        "pandas_version": pd.__version__,
        "env_var": os.environ.get("MY_ENV_VAR")
    }

result = ray.get(task_with_runtime_env.remote())
print("Task result:", result)

ray.shutdown()

Advanced Resource Management

import ray

ray.init()

# Define custom resource requirements
@ray.remote(resources={"custom_resource": 1})
class CustomResourceActor:
    def process(self):
        return "Processing with custom resource"

# Get current node resources
print("Available resources:", ray.available_resources())
print("Cluster resources:", ray.cluster_resources())

# Create actor with custom resources (will wait for resources)
try:
    actor = CustomResourceActor.remote()
    result = ray.get(actor.process.remote(), timeout=5)
    print(result)
# Fix: ray.exceptions has no RayTimeoutError — the original except clause
# would itself raise AttributeError. ray.get with a timeout raises
# GetTimeoutError.
except ray.exceptions.GetTimeoutError:
    print("Task timed out - custom resource not available")

ray.shutdown()

Install with Tessl CLI

npx tessl i tessl/pypi-ray

docs

core-distributed.md

data-processing.md

distributed-training.md

hyperparameter-tuning.md

index.md

model-serving.md

reinforcement-learning.md

utilities-advanced.md

tile.json