Search (Ctrl+K)
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-ray

Ray is a unified framework for scaling AI and Python applications.

Pending
Overview
Eval results
Files

docs/data-processing.md

Data Processing

Ray Data provides distributed data processing capabilities for ML workloads. It offers scalable dataset operations, transformations, and integrations with ML frameworks and storage systems.

Capabilities

Dataset Creation

Create datasets from various data sources.

# API stubs documenting the ray.data dataset-creation functions.
# Signatures only: bodies are intentionally empty (implicitly return None);
# the docstrings describe the real Ray semantics.
def read_parquet(paths: "str | list[str]", *, filesystem=None, columns: "list[str] | None" = None, **kwargs) -> "Dataset":
    """
    Read Parquet files into a Dataset.
    
    Args:
        paths (str/list): Path(s) to Parquet files
        filesystem: Filesystem to use
        columns (list, optional): Columns to read
    
    Returns:
        Dataset: Ray Dataset
    """

def read_csv(paths: "str | list[str]", *, filesystem=None, **kwargs) -> "Dataset":
    """
    Read CSV files into a Dataset.
    
    Args:
        paths (str/list): Path(s) to CSV files
        filesystem: Filesystem to use
        **kwargs: Additional CSV reading options
    
    Returns:
        Dataset: Ray Dataset
    """

def read_json(paths: "str | list[str]", *, filesystem=None, **kwargs) -> "Dataset":
    """
    Read JSON files into a Dataset.
    
    Args:
        paths (str/list): Path(s) to JSON files
        filesystem: Filesystem to use
        **kwargs: Additional JSON reading options
    
    Returns:
        Dataset: Ray Dataset
    """

def read_text(paths: "str | list[str]", *, encoding: str = "utf-8", **kwargs) -> "Dataset":
    """
    Read text files into a Dataset.
    
    Args:
        paths (str/list): Path(s) to text files
        encoding (str): Text encoding
    
    Returns:
        Dataset: Ray Dataset
    """

def read_binary_files(paths: "str | list[str]", *, include_paths: bool = False, **kwargs) -> "Dataset":
    """
    Read binary files into a Dataset.
    
    Args:
        paths (str/list): Path(s) to binary files
        include_paths (bool): Include file paths in output
    
    Returns:
        Dataset: Ray Dataset
    """

def read_images(paths: "str | list[str]", *, mode: str = "RGB", **kwargs) -> "Dataset":
    """
    Read image files into a Dataset.
    
    Args:
        paths (str/list): Path(s) to image files
        mode (str): Image mode (RGB, RGBA, etc.)
    
    Returns:
        Dataset: Ray Dataset
    """

def from_items(items: list, *, parallelism: "int | None" = None) -> "Dataset":
    """
    Create Dataset from list of items.
    
    Args:
        items (list): List of items
        parallelism (int, optional): Parallelism level
    
    Returns:
        Dataset: Ray Dataset
    """

def read_bigquery(query: str, **kwargs) -> "Dataset":
    """
    Read from BigQuery into a Dataset.
    
    Args:
        query (str): BigQuery SQL query
        **kwargs: Additional BigQuery options
    
    Returns:
        Dataset: Ray Dataset
    """

def read_databricks_tables(table: str, **kwargs) -> "Dataset":
    """
    Read Databricks tables into a Dataset.
    
    Args:
        table (str): Databricks table name
        **kwargs: Additional Databricks options
    
    Returns:
        Dataset: Ray Dataset
    """

def read_delta(table_uri: str, **kwargs) -> "Dataset":
    """
    Read Delta Lake table into a Dataset.
    
    Args:
        table_uri (str): Delta table URI
        **kwargs: Additional Delta Lake options
    
    Returns:
        Dataset: Ray Dataset
    """

def read_hudi(table_uri: str, **kwargs) -> "Dataset":
    """
    Read Apache Hudi table into a Dataset.
    
    Args:
        table_uri (str): Hudi table URI
        **kwargs: Additional Hudi options
    
    Returns:
        Dataset: Ray Dataset
    """

def read_iceberg(table_uri: str, **kwargs) -> "Dataset":
    """
    Read Apache Iceberg table into a Dataset.
    
    Args:
        table_uri (str): Iceberg table URI
        **kwargs: Additional Iceberg options
    
    Returns:
        Dataset: Ray Dataset
    """

def read_mongo(uri: str, database: str, collection: str, **kwargs) -> "Dataset":
    """
    Read from MongoDB into a Dataset.
    
    Args:
        uri (str): MongoDB connection URI
        database (str): Database name
        collection (str): Collection name
        **kwargs: Additional MongoDB options
    
    Returns:
        Dataset: Ray Dataset
    """

def read_snowflake(query: str, connection_params: dict, **kwargs) -> "Dataset":
    """
    Read from Snowflake into a Dataset.
    
    Args:
        query (str): Snowflake SQL query
        connection_params (dict): Connection parameters
        **kwargs: Additional Snowflake options
    
    Returns:
        Dataset: Ray Dataset
    """

def read_tfrecords(paths: "str | list[str]", **kwargs) -> "Dataset":
    """
    Read TensorFlow Records into a Dataset.
    
    Args:
        paths (str/list): Path(s) to TFRecord files
        **kwargs: Additional TFRecord options
    
    Returns:
        Dataset: Ray Dataset
    """

def read_avro(paths: "str | list[str]", **kwargs) -> "Dataset":
    """
    Read Apache Avro files into a Dataset.
    
    Args:
        paths (str/list): Path(s) to Avro files
        **kwargs: Additional Avro options
    
    Returns:
        Dataset: Ray Dataset
    """

def read_lance(uri: str, **kwargs) -> "Dataset":
    """
    Read Lance columnar format into a Dataset.
    
    Args:
        uri (str): Lance dataset URI
        **kwargs: Additional Lance options
    
    Returns:
        Dataset: Ray Dataset
    """

def read_audio(paths: "str | list[str]", **kwargs) -> "Dataset":
    """
    Read audio files into a Dataset.
    
    Args:
        paths (str/list): Path(s) to audio files
        **kwargs: Additional audio processing options
    
    Returns:
        Dataset: Ray Dataset
    """

def read_videos(paths: "str | list[str]", **kwargs) -> "Dataset":
    """
    Read video files into a Dataset.
    
    Args:
        paths (str/list): Path(s) to video files
        **kwargs: Additional video processing options
    
    Returns:
        Dataset: Ray Dataset
    """

def from_huggingface(dataset, **kwargs) -> "Dataset":
    """
    Create Dataset from HuggingFace dataset.
    
    Args:
        dataset: HuggingFace dataset object
        **kwargs: Additional conversion options
    
    Returns:
        Dataset: Ray Dataset
    """

# NOTE: intentionally shadows the `range` builtin — this mirrors the
# public name `ray.data.range`, so it cannot be renamed here.
def range(n: int, *, parallelism: "int | None" = None) -> "Dataset":
    """
    Create Dataset from range of integers.
    
    Args:
        n (int): Upper bound (exclusive)
        parallelism (int, optional): Parallelism level
    
    Returns:
        Dataset: Ray Dataset
    """

def range_tensor(n: int, *, shape: tuple = (), dtype: str = "float32", **kwargs) -> "Dataset":
    """
    Create Dataset of tensors from range.
    
    Args:
        n (int): Number of tensors
        shape (tuple): Tensor shape
        dtype (str): Tensor data type
    
    Returns:
        Dataset: Ray Dataset
    """

Dataset Transformations

Transform and process dataset contents.

class Dataset:
    """Ray Dataset for distributed data processing.

    API stub documenting the transformation surface of ray.data.Dataset.
    Method bodies are intentionally empty (they return None); the
    docstrings describe the real Ray semantics.

    Fix: the original declared ``repartition`` twice with divergent
    docstrings; the duplicate has been removed (single definition kept).
    """

    def map(self, fn, *, compute=None, **kwargs):
        """
        Apply function to each row.

        Args:
            fn: Function to apply
            compute (str, optional): Compute strategy

        Returns:
            Dataset: Transformed dataset
        """

    def map_batches(self, fn, *, batch_size=None, **kwargs):
        """
        Apply function to batches of rows.

        Args:
            fn: Function to apply to batches
            batch_size (int, optional): Batch size

        Returns:
            Dataset: Transformed dataset
        """

    def flat_map(self, fn, *, compute=None, **kwargs):
        """
        Apply function and flatten results.

        Args:
            fn: Function that returns iterable
            compute (str, optional): Compute strategy

        Returns:
            Dataset: Transformed dataset
        """

    def filter(self, fn, *, compute=None):
        """
        Filter rows using predicate function.

        Args:
            fn: Predicate function
            compute (str, optional): Compute strategy

        Returns:
            Dataset: Filtered dataset
        """

    def repartition(self, num_blocks, *, shuffle=False):
        """
        Repartition dataset into specified number of blocks.

        Args:
            num_blocks (int): Target number of blocks
            shuffle (bool): Whether to shuffle data

        Returns:
            Dataset: Repartitioned dataset
        """

    def random_shuffle(self, *, seed=None, num_blocks=None):
        """
        Randomly shuffle dataset rows.

        Args:
            seed (int, optional): Random seed
            num_blocks (int, optional): Number of blocks

        Returns:
            Dataset: Shuffled dataset
        """

    def sort(self, key=None, *, descending=False):
        """
        Sort dataset by key function.

        Args:
            key: Key function or column name
            descending (bool): Sort in descending order

        Returns:
            Dataset: Sorted dataset
        """

    def groupby(self, key):
        """
        Group dataset by key function.

        Args:
            key: Key function or column name

        Returns:
            GroupedDataset: Grouped dataset
        """

    def union(self, *other):
        """
        Union with other datasets.

        Args:
            *other: Other datasets to union with

        Returns:
            Dataset: Union dataset
        """

    def zip(self, other):
        """
        Zip with another dataset.

        Args:
            other (Dataset): Dataset to zip with

        Returns:
            Dataset: Zipped dataset
        """

    def add_column(self, col, fn, *, compute=None):
        """
        Add new column to dataset.

        Args:
            col (str): Column name
            fn: Function to compute column values
            compute (str, optional): Compute strategy

        Returns:
            Dataset: Dataset with new column
        """

    def drop_columns(self, cols):
        """
        Drop columns from dataset.

        Args:
            cols (list): Column names to drop

        Returns:
            Dataset: Dataset with columns dropped
        """

    def select_columns(self, cols):
        """
        Select specific columns from dataset.

        Args:
            cols (list): Column names to select

        Returns:
            Dataset: Dataset with selected columns
        """

    def rename_columns(self, columns):
        """
        Rename dataset columns.

        Args:
            columns (dict): Mapping of old to new column names

        Returns:
            Dataset: Dataset with renamed columns
        """

Dataset I/O and Persistence

Save datasets and convert to other formats.

class Dataset:
    """Ray Dataset — I/O, persistence, and conversion methods.

    NOTE(review): this re-declares ``Dataset`` to group the documentation
    by topic; as executable Python it would replace the class defined
    earlier in the file. API stubs only — bodies intentionally return None.
    """

    def write_parquet(self, path, *, filesystem=None, **kwargs):
        """
        Write dataset to Parquet files.
        
        Args:
            path (str): Output path
            filesystem: Filesystem to use
        """
    
    def write_csv(self, path, *, filesystem=None, **kwargs):
        """
        Write dataset to CSV files.
        
        Args:
            path (str): Output path
            filesystem: Filesystem to use
        """
    
    def write_json(self, path, *, filesystem=None, **kwargs):
        """
        Write dataset to JSON files.
        
        Args:
            path (str): Output path
            filesystem: Filesystem to use
        """
    
    def to_torch(self, *, label_column=None, feature_columns=None, 
                 batch_size=1, **kwargs):
        """
        Convert to PyTorch IterableDataset.
        
        Args:
            label_column (str, optional): Label column name
            feature_columns (list, optional): Feature column names
            batch_size (int): Batch size
        
        Returns:
            TorchIterableDataset: PyTorch dataset
        """
    
    def to_tf(self, *, label_column=None, feature_columns=None,
              batch_size=1, **kwargs):
        """
        Convert to TensorFlow Dataset.
        
        Args:
            label_column (str, optional): Label column name
            feature_columns (list, optional): Feature column names
            batch_size (int): Batch size
        
        Returns:
            tf.data.Dataset: TensorFlow dataset
        """
    
    def to_pandas(self, *, limit=None):
        """
        Convert to Pandas DataFrame.
        
        Args:
            limit (int, optional): Row limit
        
        Returns:
            pandas.DataFrame: Pandas DataFrame
        """
    
    def to_arrow(self):
        """
        Convert to PyArrow Table.
        
        Returns:
            pyarrow.Table: Arrow table
        """
    
    def iter_rows(self, *, prefetch_blocks=0):
        """
        Iterate over dataset rows.
        
        Args:
            prefetch_blocks (int): Number of blocks to prefetch
        
        Yields:
            Row data
        """
    
    def iter_batches(self, *, batch_size=None, prefetch_blocks=0):
        """
        Iterate over dataset batches.
        
        Args:
            batch_size (int, optional): Batch size
            prefetch_blocks (int): Number of blocks to prefetch
        
        Yields:
            Batch data
        """

Dataset Information and Statistics

Get information about datasets.

class Dataset:
    """Ray Dataset — inspection and statistics methods.

    NOTE(review): this re-declares ``Dataset`` to group the documentation
    by topic; as executable Python it would replace the classes defined
    earlier in the file. API stubs only — bodies intentionally return None.
    """

    def count(self):
        """
        Count total number of rows.
        
        Returns:
            int: Row count
        """
    
    def schema(self):
        """
        Get dataset schema.
        
        Returns:
            Schema: Dataset schema
        """
    
    def columns(self):
        """
        Get column names.
        
        Returns:
            list: Column names
        """
    
    def stats(self):
        """
        Get dataset statistics.
        
        Returns:
            DatasetStats: Dataset statistics
        """
    
    def show(self, limit=20):
        """
        Display dataset contents.
        
        Args:
            limit (int): Number of rows to show
        """
    
    def take(self, limit=20):
        """
        Take first N rows.
        
        Args:
            limit (int): Number of rows to take
        
        Returns:
            list: Row data
        """
    
    def take_batch(self, batch_size=20):
        """
        Take first batch.
        
        Args:
            batch_size (int): Batch size
        
        Returns:
            Batch data
        """

Grouped Dataset Operations

Operations on grouped datasets.

class GroupedDataset:
    """Result of ``Dataset.groupby()``; exposes per-group aggregations.

    API stub: method bodies are intentionally empty (return None); each
    aggregation conceptually yields a new Dataset with one row per group.
    """

    def count(self):
        """Return a Dataset holding the row count of every group."""

    def sum(self, *columns):
        """Return a Dataset with the per-group sum of ``*columns``."""

    def min(self, *columns):
        """Return a Dataset with the per-group minimum of ``*columns``."""

    def max(self, *columns):
        """Return a Dataset with the per-group maximum of ``*columns``."""

    def mean(self, *columns):
        """Return a Dataset with the per-group mean of ``*columns``."""

    def std(self, *columns):
        """Return a Dataset with the per-group standard deviation of ``*columns``."""

Usage Examples

Basic Dataset Operations

import ray

# Initialize Ray (connects to an existing cluster or starts a local one)
ray.init()

# Create a dataset by reading CSV from object storage
ds = ray.data.read_csv("s3://my-bucket/data.csv")

# Transform each row (double the "value" column)
ds = ds.map(lambda row: {"value": row["value"] * 2})

# Keep only rows whose doubled value exceeds 10
ds = ds.filter(lambda row: row["value"] > 10)

# Convert to a PyTorch iterable dataset for model training
torch_ds = ds.to_torch(batch_size=32)

# Persist the transformed dataset as Parquet
ds.write_parquet("s3://my-bucket/output/")

ML Pipeline Example

import ray

ray.init()

# Load training data from a Parquet file
train_ds = ray.data.read_parquet("train.parquet")

# Preprocess data
def preprocess(batch):
    """Standardize the "features" column of one batch (zero mean, unit std)."""
    # NOTE(review): normalizes per batch rather than over the whole
    # dataset — confirm this is the intended statistic.
    batch["features"] = (batch["features"] - batch["features"].mean()) / batch["features"].std()
    return batch

train_ds = train_ds.map_batches(preprocess)

# Split features and labels (rename "target" to "label")
train_ds = train_ds.map(lambda row: {
    "features": row["features"],
    "label": row["target"]
})

# Convert to PyTorch for training
torch_ds = train_ds.to_torch(
    label_column="label",
    feature_columns=["features"],
    batch_size=64
)

Data Analysis Example

import ray

ray.init()

# Load dataset of events from JSON
ds = ray.data.read_json("events.json")

# Group by category and count rows per group
grouped = ds.groupby("category")
stats = grouped.count()
stats.show()

# Print summary information about the dataset
print(f"Total rows: {ds.count()}")
print(f"Schema: {ds.schema()}")
ds.stats()  # NOTE(review): return value is discarded here — consider printing it

Install with Tessl CLI

npx tessl i tessl/pypi-ray

docs

core-distributed.md

data-processing.md

distributed-training.md

hyperparameter-tuning.md

index.md

model-serving.md

reinforcement-learning.md

utilities-advanced.md

tile.json