Ray is a unified framework for scaling AI and Python applications.
—
Core Ray functionality for distributed task execution, actor management, and object storage. This provides the foundation for all Ray applications, enabling parallel and distributed execution of Python code with minimal modifications.
Initialize Ray runtime and manage cluster connections.
def init(address=None, num_cpus=None, num_gpus=None, memory=None,
         object_store_memory=None, resources=None, runtime_env=None,
         logging_level="INFO", log_to_driver=True, configure_logging=True,
         **kwargs):
    """
    Initialize Ray runtime.

    Args:
        address (str, optional): Ray cluster address to connect to
        num_cpus (int, optional): Number of CPUs to use
        num_gpus (int, optional): Number of GPUs to use
        memory (int, optional): Amount of memory to use in bytes
        object_store_memory (int, optional): Object store memory in bytes
        resources (dict, optional): Custom resources dictionary
        runtime_env (dict, optional): Runtime environment configuration
        logging_level (str): Logging level ("DEBUG", "INFO", "WARNING", "ERROR")
        log_to_driver (bool): Whether to log to driver
        configure_logging (bool): Whether to configure logging
        **kwargs: Additional keyword arguments accepted by the signature
            (not itemized in this listing)

    Returns:
        ray.ClientContext: Ray context information
    """
def is_initialized():
    """Check if Ray is initialized."""
def shutdown():
    """Shutdown Ray runtime and cleanup resources."""
def get_runtime_context():
    """Get current Ray runtime context information."""

Execute functions remotely as distributed tasks.
def remote(num_cpus=None, num_gpus=None, memory=None, resources=None,
           max_calls=None, max_retries=3, retry_exceptions=False,
           runtime_env=None, **kwargs):
    """
    Decorator to make functions executable as Ray tasks.

    Args:
        num_cpus (float, optional): Number of CPUs required
        num_gpus (float, optional): Number of GPUs required
        memory (int, optional): Memory required in bytes
        resources (dict, optional): Custom resources required
        max_calls (int, optional): Maximum number of times a worker may
            execute this remote function before the worker exits
        max_retries (int): Maximum number of retries on failure
        retry_exceptions (bool/list): Exceptions to retry on
        runtime_env (dict, optional): Runtime environment
        **kwargs: Additional keyword arguments accepted by the signature
            (not itemized in this listing)

    Returns:
        RemoteFunction: Ray remote function
    """

Manage objects in Ray's distributed object store.
def put(value, _owner=None):
    """
    Store object in Ray object store.

    Args:
        value: Object to store
        _owner (ActorHandle, optional): Actor that owns this object

    Returns:
        ObjectRef: Reference to stored object
    """
def get(object_refs, timeout=None):
    """
    Retrieve objects from object store.

    Args:
        object_refs (ObjectRef/list): Object references to retrieve
        timeout (float, optional): Timeout in seconds

    Returns:
        Object or list of objects
    """
def wait(object_refs, num_returns=1, timeout=None, fetch_local=True):
    """
    Wait for subset of object references to become ready.

    Args:
        object_refs (list): List of object references
        num_returns (int): Number of objects to wait for
        timeout (float, optional): Timeout in seconds
        fetch_local (bool): Whether to fetch objects locally

    Returns:
        tuple: (ready_refs, remaining_refs)
    """

Create and manage stateful distributed actors.
def get_actor(name, namespace=None):
    """
    Get handle to named actor.

    Args:
        name (str): Actor name
        namespace (str, optional): Actor namespace

    Returns:
        ActorHandle: Handle to the actor
    """
def kill(actor, no_restart=True):
    """
    Kill an actor forcefully.

    Args:
        actor: Handle of the actor to kill
        no_restart (bool): Whether to prevent actor restart
    """
def cancel(object_ref, force=False, recursive=True):
    """
    Cancel task execution.

    Args:
        object_ref (ObjectRef): ObjectRef returned by the task to cancel
        force (bool): Force cancellation
        recursive (bool): Whether to also cancel tasks submitted by the
            target task (its child tasks)

    Returns:
        None
    """

Monitor cluster state and resource usage.
def cluster_resources():
    """
    Get total cluster resources.

    Returns:
        dict: Resource counts by type
    """
def available_resources():
    """
    Get available cluster resources.

    Returns:
        dict: Available resource counts by type
    """
def nodes():
    """
    Get information about cluster nodes.

    Returns:
        list: List of node information dictionaries
    """
def timeline():
    """
    Get Ray timeline for profiling and debugging.

    Returns:
        list: Timeline events
    """
def get_gpu_ids():
    """
    Get GPU IDs visible to current worker.

    Returns:
        list: List of GPU IDs
    """

Support for Java and C++ integration.
def java_function(class_name: str, function_name: str):
    """
    Create Ray remote function from Java class.

    Args:
        class_name (str): Java class name
        function_name (str): Java function name

    Returns:
        JavaFunction: Ray Java function
    """
def cpp_function(function_name: str):
    """
    Create Ray remote function from C++ function.

    Args:
        function_name (str): C++ function name

    Returns:
        CppFunction: Ray C++ function
    """
def java_actor_class(class_name):
    """
    Create Ray actor class from Java class.

    Args:
        class_name (str): Java class name

    Returns:
        JavaActorClass: Ray Java actor class
    """

Display information in Ray dashboard and debugging utilities.
def show_in_dashboard(message: str, key: str = "", dtype: str = "text"):
    """
    Display message in Ray dashboard.

    Args:
        message (str): Message to be displayed
        key (str): Key name for the message
        dtype (str): Message type ("text" or "html")
    """

Configure Ray runtime behavior.
class LoggingConfig:
    """Ray logging configuration."""

    def __init__(self, encoding="TEXT", log_level="INFO", logs_dir=None,
                 enable_default_setup=True):
        """
        Initialize logging configuration.

        Args:
            encoding (str): Log encoding ("TEXT" or "JSON")
            log_level (str): Log level
            logs_dir (str, optional): Directory for log files
            enable_default_setup (bool): Enable default logging setup
        """

# Core Object Types
class ObjectRef:
    """Reference to object in Ray object store."""
class ObjectRefGenerator:
    """Generator for streaming object references."""
class DynamicObjectRefGenerator:
    """Dynamic generator for object references."""
# ID Types
class ActorID:
    """Unique identifier for Ray actor."""
class TaskID:
    """Unique identifier for Ray task."""
class JobID:
    """Unique identifier for Ray job."""
class NodeID:
    """Unique identifier for Ray node."""
class WorkerID:
    """Unique identifier for Ray worker."""
class FunctionID:
    """Unique identifier for Ray function."""
class UniqueID:
    """Base class for Ray unique identifiers."""
class PlacementGroupID:
    """Unique identifier for placement group."""
class ClusterID:
    """Unique identifier for Ray cluster."""
class ActorClassID:
    """Unique identifier for Ray actor class."""
# Language Support
class Language:
    """Supported programming languages in Ray."""

    PYTHON = "PYTHON"
    JAVA = "JAVA"
    CPP = "CPP"
# Mode Constants
# NOTE(review): Ray's worker module appears to define these modes as integer
# codes rather than strings; confirm the values shown here against the
# installed Ray version.
LOCAL_MODE = "LOCAL_MODE"
SCRIPT_MODE = "SCRIPT_MODE"
WORKER_MODE = "WORKER_MODE"
import ray
ray.init()


@ray.remote
def square(x):
    """Return x multiplied by itself."""
    squared = x * x
    return squared


# Submit the task, then block until its result has been computed
square_ref = square.remote(4)
print(ray.get(square_ref))  # 16

ray.shutdown()

import ray
ray.init()


@ray.remote
class Counter:
    """Actor that keeps a running count."""

    def __init__(self):
        # Every new actor starts counting from zero
        self.count = 0

    def increment(self):
        """Add one to the count and return the new value."""
        self.count = self.count + 1
        return self.count

    def get_count(self):
        """Return the current count."""
        return self.count


# Start the actor
counter_actor = Counter.remote()

# Invoke actor methods remotely and wait for their results
ray.get(counter_actor.increment.remote())
current = ray.get(counter_actor.get_count.remote())
print(current)  # 1

ray.shutdown()

import ray
import time

ray.init()


@ray.remote
def slow_function(i):
    """Sleep for one second, then return i squared."""
    time.sleep(1)
    return i * i


# Submit all four tasks before waiting on any of them so they run in parallel
pending_refs = []
for task_input in range(4):
    pending_refs.append(slow_function.remote(task_input))
squares = ray.get(pending_refs)
print(squares)  # [0, 1, 4, 9]
ray.shutdown()

Install with Tessl CLI
npx tessl i tessl/pypi-ray