Ray is a unified framework for scaling AI and Python applications.
Ray Data provides distributed data processing capabilities for ML workloads. It offers scalable dataset operations, transformations, and integrations with ML frameworks and storage systems.
Create datasets from various data sources.
def read_parquet(paths, *, filesystem=None, columns=None, **kwargs):
    """Read Parquet files into a Dataset.

    Args:
        paths (str or list): Path(s) to Parquet files.
        filesystem: Filesystem to use.
        columns (list, optional): Columns to read.
        **kwargs: Additional Parquet reading options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_csv(paths, *, filesystem=None, **kwargs):
    """Read CSV files into a Dataset.

    Args:
        paths (str or list): Path(s) to CSV files.
        filesystem: Filesystem to use.
        **kwargs: Additional CSV reading options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_json(paths, *, filesystem=None, **kwargs):
    """Read JSON files into a Dataset.

    Args:
        paths (str or list): Path(s) to JSON files.
        filesystem: Filesystem to use.
        **kwargs: Additional JSON reading options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_text(paths, *, encoding="utf-8", **kwargs):
    """Read text files into a Dataset.

    Args:
        paths (str or list): Path(s) to text files.
        encoding (str): Text encoding.
        **kwargs: Additional text reading options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_binary_files(paths, *, include_paths=False, **kwargs):
    """Read binary files into a Dataset.

    Args:
        paths (str or list): Path(s) to binary files.
        include_paths (bool): Include file paths in the output.
        **kwargs: Additional binary reading options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_images(paths, *, mode="RGB", **kwargs):
    """Read image files into a Dataset.

    Args:
        paths (str or list): Path(s) to image files.
        mode (str): Image mode (RGB, RGBA, etc.).
        **kwargs: Additional image reading options.

    Returns:
        Dataset: Ray Dataset.
    """
def from_items(items, *, parallelism=None):
    """Create a Dataset from a list of items.

    Args:
        items (list): List of items.
        parallelism (int, optional): Parallelism level.

    Returns:
        Dataset: Ray Dataset.
    """

def read_bigquery(query, **kwargs):
    """Read from BigQuery into a Dataset.

    Args:
        query (str): BigQuery SQL query.
        **kwargs: Additional BigQuery options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_databricks_tables(table, **kwargs):
    """Read Databricks tables into a Dataset.

    Args:
        table (str): Databricks table name.
        **kwargs: Additional Databricks options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_delta(table_uri, **kwargs):
    """Read a Delta Lake table into a Dataset.

    Args:
        table_uri (str): Delta table URI.
        **kwargs: Additional Delta Lake options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_hudi(table_uri, **kwargs):
    """Read an Apache Hudi table into a Dataset.

    Args:
        table_uri (str): Hudi table URI.
        **kwargs: Additional Hudi options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_iceberg(table_uri, **kwargs):
    """Read an Apache Iceberg table into a Dataset.

    Args:
        table_uri (str): Iceberg table URI.
        **kwargs: Additional Iceberg options.

    Returns:
        Dataset: Ray Dataset.
    """
def read_mongo(uri, database, collection, **kwargs):
    """Read from MongoDB into a Dataset.

    Args:
        uri (str): MongoDB connection URI.
        database (str): Database name.
        collection (str): Collection name.
        **kwargs: Additional MongoDB options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_snowflake(query, connection_params, **kwargs):
    """Read from Snowflake into a Dataset.

    Args:
        query (str): Snowflake SQL query.
        connection_params (dict): Connection parameters.
        **kwargs: Additional Snowflake options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_tfrecords(paths, **kwargs):
    """Read TensorFlow Records into a Dataset.

    Args:
        paths (str or list): Path(s) to TFRecord files.
        **kwargs: Additional TFRecord options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_avro(paths, **kwargs):
    """Read Apache Avro files into a Dataset.

    Args:
        paths (str or list): Path(s) to Avro files.
        **kwargs: Additional Avro options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_lance(uri, **kwargs):
    """Read the Lance columnar format into a Dataset.

    Args:
        uri (str): Lance dataset URI.
        **kwargs: Additional Lance options.

    Returns:
        Dataset: Ray Dataset.
    """
def read_audio(paths, **kwargs):
    """Read audio files into a Dataset.

    Args:
        paths (str or list): Path(s) to audio files.
        **kwargs: Additional audio processing options.

    Returns:
        Dataset: Ray Dataset.
    """

def read_videos(paths, **kwargs):
    """Read video files into a Dataset.

    Args:
        paths (str or list): Path(s) to video files.
        **kwargs: Additional video processing options.

    Returns:
        Dataset: Ray Dataset.
    """

def from_huggingface(dataset, **kwargs):
    """Create a Dataset from a HuggingFace dataset.

    Args:
        dataset: HuggingFace dataset object.
        **kwargs: Additional conversion options.

    Returns:
        Dataset: Ray Dataset.
    """

def range(n, *, parallelism=None):
    """Create a Dataset from a range of integers.

    Note: intentionally shadows the builtin ``range`` — this mirrors the
    public ``ray.data.range`` API name.

    Args:
        n (int): Upper bound (exclusive).
        parallelism (int, optional): Parallelism level.

    Returns:
        Dataset: Ray Dataset.
    """

def range_tensor(n, *, shape=(), dtype="float32", **kwargs):
    """Create a Dataset of tensors from a range.

    Args:
        n (int): Number of tensors.
        shape (tuple): Tensor shape.
        dtype (str): Tensor data type.
        **kwargs: Additional options.

    Returns:
        Dataset: Ray Dataset.
    """

# Transform and process dataset contents.
class Dataset:
    """Ray Dataset for distributed data processing."""

    def map(self, fn, *, compute=None, **kwargs):
        """Apply a function to each row.

        Args:
            fn: Function to apply.
            compute (str, optional): Compute strategy.

        Returns:
            Dataset: Transformed dataset.
        """

    def map_batches(self, fn, *, batch_size=None, **kwargs):
        """Apply a function to batches of rows.

        Args:
            fn: Function to apply to batches.
            batch_size (int, optional): Batch size.

        Returns:
            Dataset: Transformed dataset.
        """

    def flat_map(self, fn, *, compute=None, **kwargs):
        """Apply a function and flatten the results.

        Args:
            fn: Function that returns an iterable.
            compute (str, optional): Compute strategy.

        Returns:
            Dataset: Transformed dataset.
        """

    def filter(self, fn, *, compute=None):
        """Filter rows using a predicate function.

        Args:
            fn: Predicate function.
            compute (str, optional): Compute strategy.

        Returns:
            Dataset: Filtered dataset.
        """

    def repartition(self, num_blocks, *, shuffle=False):
        """Repartition the dataset into the specified number of blocks.

        Args:
            num_blocks (int): Number of blocks.
            shuffle (bool): Whether to shuffle data.

        Returns:
            Dataset: Repartitioned dataset.
        """

    def random_shuffle(self, *, seed=None, num_blocks=None):
        """Randomly shuffle dataset rows.

        Args:
            seed (int, optional): Random seed.
            num_blocks (int, optional): Number of blocks.

        Returns:
            Dataset: Shuffled dataset.
        """

    def sort(self, key=None, *, descending=False):
        """Sort the dataset by a key function.

        Args:
            key: Key function or column name.
            descending (bool): Sort in descending order.

        Returns:
            Dataset: Sorted dataset.
        """

    def groupby(self, key):
        """Group the dataset by a key function.

        Args:
            key: Key function or column name.

        Returns:
            GroupedDataset: Grouped dataset.
        """

    def union(self, *other):
        """Union with other datasets.

        Args:
            *other: Other datasets to union with.

        Returns:
            Dataset: Union dataset.
        """

    def zip(self, other):
        """Zip with another dataset.

        Args:
            other (Dataset): Dataset to zip with.

        Returns:
            Dataset: Zipped dataset.
        """

    def add_column(self, col, fn, *, compute=None):
        """Add a new column to the dataset.

        Args:
            col (str): Column name.
            fn: Function to compute column values.
            compute (str, optional): Compute strategy.

        Returns:
            Dataset: Dataset with the new column.
        """

    def drop_columns(self, cols):
        """Drop columns from the dataset.

        Args:
            cols (list): Column names to drop.

        Returns:
            Dataset: Dataset with columns dropped.
        """

    def select_columns(self, cols):
        """Select specific columns from the dataset.

        Args:
            cols (list): Column names to select.

        Returns:
            Dataset: Dataset with selected columns.
        """

    def rename_columns(self, columns):
        """Rename dataset columns.

        Args:
            columns (dict): Mapping of old to new column names.

        Returns:
            Dataset: Dataset with renamed columns.
        """

    # NOTE(review): duplicate definition of `repartition` — it silently
    # overrides the identical one defined above; one of the two should be
    # removed from the source documentation.
    def repartition(self, num_blocks, *, shuffle=False):
        """Repartition the dataset into the specified number of blocks.

        Args:
            num_blocks (int): Target number of blocks.
            shuffle (bool): Whether to shuffle data.

        Returns:
            Dataset: Repartitioned dataset.
        """

# Save datasets and convert to other formats.
class Dataset:
    """Ray Dataset — output and conversion operations.

    NOTE(review): this re-declares ``class Dataset``; in the generated docs
    each section describes a slice of the same class.
    """

    def write_parquet(self, path, *, filesystem=None, **kwargs):
        """Write the dataset to Parquet files.

        Args:
            path (str): Output path.
            filesystem: Filesystem to use.
        """

    def write_csv(self, path, *, filesystem=None, **kwargs):
        """Write the dataset to CSV files.

        Args:
            path (str): Output path.
            filesystem: Filesystem to use.
        """

    def write_json(self, path, *, filesystem=None, **kwargs):
        """Write the dataset to JSON files.

        Args:
            path (str): Output path.
            filesystem: Filesystem to use.
        """

    def to_torch(self, *, label_column=None, feature_columns=None,
                 batch_size=1, **kwargs):
        """Convert to a PyTorch IterableDataset.

        Args:
            label_column (str, optional): Label column name.
            feature_columns (list, optional): Feature column names.
            batch_size (int): Batch size.

        Returns:
            TorchIterableDataset: PyTorch dataset.
        """

    def to_tf(self, *, label_column=None, feature_columns=None,
              batch_size=1, **kwargs):
        """Convert to a TensorFlow Dataset.

        Args:
            label_column (str, optional): Label column name.
            feature_columns (list, optional): Feature column names.
            batch_size (int): Batch size.

        Returns:
            tf.data.Dataset: TensorFlow dataset.
        """

    def to_pandas(self, *, limit=None):
        """Convert to a Pandas DataFrame.

        Args:
            limit (int, optional): Row limit.

        Returns:
            pandas.DataFrame: Pandas DataFrame.
        """

    def to_arrow(self):
        """Convert to a PyArrow Table.

        Returns:
            pyarrow.Table: Arrow table.
        """

    def iter_rows(self, *, prefetch_blocks=0):
        """Iterate over dataset rows.

        Args:
            prefetch_blocks (int): Number of blocks to prefetch.

        Yields:
            Row data.
        """

    def iter_batches(self, *, batch_size=None, prefetch_blocks=0):
        """Iterate over dataset batches.

        Args:
            batch_size (int, optional): Batch size.
            prefetch_blocks (int): Number of blocks to prefetch.

        Yields:
            Batch data.
        """

# Get information about datasets.
class Dataset:
    """Ray Dataset — inspection operations.

    NOTE(review): this re-declares ``class Dataset``; in the generated docs
    each section describes a slice of the same class.
    """

    def count(self):
        """Count the total number of rows.

        Returns:
            int: Row count.
        """

    def schema(self):
        """Get the dataset schema.

        Returns:
            Schema: Dataset schema.
        """

    def columns(self):
        """Get the column names.

        Returns:
            list: Column names.
        """

    def stats(self):
        """Get dataset statistics.

        Returns:
            DatasetStats: Dataset statistics.
        """

    def show(self, limit=20):
        """Display dataset contents.

        Args:
            limit (int): Number of rows to show.
        """

    def take(self, limit=20):
        """Take the first N rows.

        Args:
            limit (int): Number of rows to take.

        Returns:
            list: Row data.
        """

    def take_batch(self, batch_size=20):
        """Take the first batch.

        Args:
            batch_size (int): Batch size.

        Returns:
            Batch data.
        """

# Operations on grouped datasets.
class GroupedDataset:
    """Grouped dataset for aggregation operations."""

    def count(self):
        """Count rows in each group.

        Returns:
            Dataset: Dataset with group counts.
        """

    def sum(self, *columns):
        """Sum columns in each group.

        Args:
            *columns: Columns to sum.

        Returns:
            Dataset: Dataset with group sums.
        """

    def min(self, *columns):
        """Find the minimum values in each group.

        Args:
            *columns: Columns to find the min for.

        Returns:
            Dataset: Dataset with group minimums.
        """

    def max(self, *columns):
        """Find the maximum values in each group.

        Args:
            *columns: Columns to find the max for.

        Returns:
            Dataset: Dataset with group maximums.
        """

    def mean(self, *columns):
        """Calculate the mean values in each group.

        Args:
            *columns: Columns to calculate the mean for.

        Returns:
            Dataset: Dataset with group means.
        """

    def std(self, *columns):
        """Calculate the standard deviation in each group.

        Args:
            *columns: Columns to calculate the std for.

        Returns:
            Dataset: Dataset with group standard deviations.
        """

import ray
# Example: basic read → transform → write pipeline.

# Initialize Ray.
ray.init()

# Create a dataset from files.
ds = ray.data.read_csv("s3://my-bucket/data.csv")

# Transform data: double each row's "value".
ds = ds.map(lambda row: {"value": row["value"] * 2})

# Filter data: keep rows with "value" greater than 10.
ds = ds.filter(lambda row: row["value"] > 10)

# Convert to PyTorch.
torch_ds = ds.to_torch(batch_size=32)

# Write results.
ds.write_parquet("s3://my-bucket/output/")

import ray
# Example: ML training data pipeline.
ray.init()

# Load training data.
train_ds = ray.data.read_parquet("train.parquet")

# Preprocess data.
def preprocess(batch):
    """Standardize the "features" column within each batch."""
    # Normalize features: subtract the batch mean, divide by the batch std.
    batch["features"] = (batch["features"] - batch["features"].mean()) / batch["features"].std()
    return batch

train_ds = train_ds.map_batches(preprocess)

# Split features and labels.
train_ds = train_ds.map(lambda row: {
    "features": row["features"],
    "label": row["target"]
})

# Convert to PyTorch for training.
torch_ds = train_ds.to_torch(
    label_column="label",
    feature_columns=["features"],
    batch_size=64
)

import ray
# Example: group-by aggregation and dataset inspection.
ray.init()

# Load dataset.
ds = ray.data.read_json("events.json")

# Group by category and aggregate.
grouped = ds.groupby("category")
stats = grouped.count()
stats.show()

# Calculate statistics.
print(f"Total rows: {ds.count()}")
print(f"Schema: {ds.schema()}")
ds.stats()

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-ray