CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pyarrow

Python library for Apache Arrow columnar memory format and computing libraries

Pending
Overview
Eval results
Files

docs/dataset-operations.md

Dataset Operations

Multi-file dataset interface supporting partitioned data, lazy evaluation, and distributed processing. Enables efficient querying of large datasets stored across multiple files with automatic partition discovery, predicate pushdown, and columnar processing.

Capabilities

Dataset Creation and Management

Create and manage datasets from various sources including local files, cloud storage, and in-memory data.

def dataset(source, schema=None, format=None, filesystem=None, partitioning=None, partition_base_dir=None, exclude_invalid_files=None, ignore_prefixes=None):
    """
    Create dataset from various sources.

    Parameters:
    - source: str, list, or Table - data source(s); a path, a list of paths,
      an in-memory table, or a list of existing Dataset objects to union
    - schema: Schema, explicit schema (inferred from the data when omitted)
    - format: FileFormat, file format; string shorthands such as 'parquet',
      'csv', 'ipc', 'json' are also accepted
    - filesystem: FileSystem, filesystem to use
    - partitioning: Partitioning, partitioning scheme; pass the string 'hive'
      to discover key=value partition directories (hive discovery is NOT
      automatic — without this the partition columns are absent from the schema)
    - partition_base_dir: str, base directory for partitioning
    - exclude_invalid_files: bool, exclude invalid files
    - ignore_prefixes: list, prefixes to ignore

    Returns:
    Dataset: Dataset object for querying
    """

def write_dataset(data, base_dir, basename_template=None, format=None, partitioning=None, partitioning_flavor=None, schema=None, filesystem=None, file_options=None, use_threads=True, max_partitions=None, max_open_files=None, max_rows_per_file=None, min_rows_per_group=None, max_rows_per_group=None, existing_data_behavior='error'):
    """
    Write dataset to storage with partitioning.

    Parameters:
    - data: Table, dataset, or iterable of batches
    - base_dir: str, base directory for output
    - basename_template: str, template for file names; must contain the
      literal placeholder '{i}', replaced by an increasing file index
    - format: FileFormat, output format
    - partitioning: Partitioning, partitioning scheme; a plain list of
      column names may be given together with partitioning_flavor
    - partitioning_flavor: str, partitioning flavor ('hive', 'directory')
    - schema: Schema, output schema
    - filesystem: FileSystem, filesystem to use
    - file_options: FileWriteOptions, format-specific options
    - use_threads: bool, use multiple threads
    - max_partitions: int, maximum number of partitions
    - max_open_files: int, maximum open files
    - max_rows_per_file: int, maximum rows per file
    - min_rows_per_group: int, minimum rows per group
    - max_rows_per_group: int, maximum rows per group
    - existing_data_behavior: str, behavior when base_dir already holds data:
      'error', 'overwrite_or_ignore', or 'delete_matching'
    """

class Dataset:
    """
    Abstract dataset interface for reading tabular data.

    The scan-related methods accept common keyword arguments such as
    `columns` (projection), `filter` (predicate pushdown), and `batch_size`.

    Attributes:
    - schema: Schema of the dataset
    - partition_expression: Partition expression
    """
    
    def count_rows(self, **kwargs):
        """Count total rows in dataset (may be answered from file metadata)."""
    
    def get_fragments(self, filter=None):
        """Get dataset fragments, optionally pruned by a partition filter."""
    
    def head(self, num_rows, **kwargs):
        """Get first num_rows as Table."""
    
    def replace_schema(self, schema):
        """Return dataset with new schema (does not rewrite the data)."""
    
    def scanner(self, **kwargs):
        """Create scanner for dataset (lazy; nothing is read until consumed)."""
    
    def take(self, indices, **kwargs):
        """Select rows by indices."""
    
    def to_batches(self, **kwargs):
        """Convert to iterator of record batches (streaming, low memory)."""
    
    def to_table(self, **kwargs):
        """Convert entire dataset to Table (materializes everything in memory)."""

class FileSystemDataset(Dataset):
    """
    Dataset backed by files in a (local or remote) filesystem.

    Attributes:
    - files: List of dataset files
    - filesystem: Filesystem object
    - format: File format shared by all files
    - partitioning: Partitioning scheme
    """

class InMemoryDataset(Dataset):
    """Dataset backed by in-memory tables; scanning performs no file I/O."""

class UnionDataset(Dataset):
    """
    Union of multiple child datasets scanned as one.

    NOTE(review): the constructor takes (schema, children); to build a union
    with automatic schema unification, prefer ds.dataset([child1, child2]).
    """

Dataset Scanning and Querying

Efficient scanning with projection, filtering, and lazy evaluation for large-scale data processing.

class Scanner:
    """
    Dataset scanner with filtering and projection.

    A Scanner is a configured, lazy view over a dataset; rows are only
    read when one of the consuming methods below is invoked.

    Attributes:
    - projected_schema: Projected schema
    - filter: Applied filter expression
    """
    
    def count_rows(self):
        """Count rows matching scanner criteria."""
    
    def head(self, num_rows):
        """Get first num_rows as Table."""
    
    def take(self, indices):
        """Select rows by indices."""
    
    def to_batches(self):
        """Scan to iterator of record batches."""
    
    def to_reader(self):
        """Convert to RecordBatchReader for streaming consumption."""
    
    def to_table(self):
        """Scan entire dataset to Table."""
    
    def scan(self):
        """Perform scan operation (low-level; prefer to_batches/to_table)."""

class TaggedRecordBatch:
    """Record batch paired with the fragment (and partition info) it came from."""
    
    @property
    def record_batch(self):
        """Get record batch."""
    
    @property  
    def fragment(self):
        """Get source fragment that produced this batch."""

Partitioning

Automatic partition discovery and custom partitioning schemes for organizing large datasets.

def partitioning(schema=None, field_names=None, flavor=None):
    """
    Create partitioning scheme.

    Parameters:
    - schema: Schema, partitioning schema (field names and their types)
    - field_names: list, partition field names; types are inferred later,
      during dataset discovery
    - flavor: str, partitioning flavor ('hive', 'directory')

    Returns:
    Partitioning: Partitioning scheme (a PartitioningFactory when only
    field names, not a full schema, are supplied)
    """

def get_partition_keys(path, partitioning):
    """
    Extract partition keys from a file path.

    Parameters:
    - path: str, file path (relative to the partition base directory)
    - partitioning: Partitioning, partitioning scheme used to interpret it

    Returns:
    dict: Partition key-value pairs
    """

class Partitioning:
    """
    Abstract partitioning scheme mapping paths to/from partition key values.

    Attributes:
    - schema: Partitioning schema
    """
    
    def parse(self, path):
        """Parse partition keys from path (as an expression usable as a filter)."""
    
    def format(self, partition_keys):
        """Format partition keys as a path segment."""

class PartitioningFactory:
    """Factory that defers partition-type inference until paths are seen."""
    
    def create(self, field_names):
        """Create partitioning from field names."""
    
    def inspect(self, paths):
        """Inspect paths to infer the partitioning schema."""

class DirectoryPartitioning(Partitioning):
    """
    Directory-based partitioning where only the values appear in the path,
    e.g. /2021/01/ — field names come from the schema, not from the path.
    (Paths of the form year=2021/month=01/ are hive style, not this one.)
    """

class HivePartitioning(Partitioning):
    """
    Hive-style partitioning with key=value directories,
    e.g. year=2021/month=1/.
    """

class FilenamePartitioning(Partitioning):
    """
    Partitioning where key values are encoded in the file name itself
    rather than in directory levels.
    """

File Formats and Fragments

Support for multiple file formats with format-specific optimization and fragment-based processing.

class FileFormat:
    """Abstract file format interface shared by all concrete formats below."""
    
    def equals(self, other):
        """Check format equality."""
    
    def get_type_name(self):
        """Get format type name (e.g. 'parquet', 'csv')."""
    
    def make_fragment(self, file, filesystem=None, partition_expression=None):
        """Create a scannable fragment from a single file."""

class CsvFileFormat(FileFormat):
    """CSV file format; options mirror those of pyarrow.csv readers."""
    
    def __init__(self, parse_options=None, read_options=None, convert_options=None): ...

class JsonFileFormat(FileFormat):
    """Line-delimited JSON file format implementation."""
    
    def __init__(self, parse_options=None, read_options=None): ...

class IpcFileFormat(FileFormat):
    """Arrow IPC (a.k.a. Feather V2) file format implementation."""

class FeatherFileFormat(FileFormat):
    """Feather file format implementation (Feather V2 files are Arrow IPC)."""

class ParquetFileFormat(FileFormat):
    """
    Parquet file format implementation; supports row-group level statistics
    and predicate pushdown.

    Attributes:
    - read_options: Parquet read options
    """
    
    def __init__(self, read_options=None, **kwargs): ...

class OrcFileFormat(FileFormat):
    """ORC file format implementation (read-oriented)."""

class Fragment:
    """
    Abstract fragment: the unit of scan parallelism, representing one
    piece (typically one file) of a dataset.

    Attributes:
    - format: File format
    - partition_expression: Partition expression
    """
    
    def count_rows(self, **kwargs):
        """Count rows in fragment."""
    
    def head(self, num_rows, **kwargs):
        """Get first num_rows from fragment."""
    
    def scanner(self, **kwargs):
        """Create scanner restricted to this fragment."""
    
    def to_batches(self, **kwargs):
        """Convert fragment to record batches."""
    
    def to_table(self, **kwargs):
        """Convert fragment to table."""

class FileFragment(Fragment):
    """
    Fragment backed by a single file.

    Attributes:
    - path: File path
    - filesystem: Filesystem the path is resolved against
    """

class ParquetFileFragment(FileFragment):
    """
    Parquet-specific file fragment exposing row-group granularity.

    Attributes:
    - row_groups: Row group information
    """
    
    def get_row_group_fragments(self):
        """Get row group fragments."""
    
    def split_by_row_group(self, **kwargs):
        """Split fragment into one fragment per row group."""

Dataset Factories

Factory classes for creating datasets with automatic discovery and configuration.

class DatasetFactory:
    """Abstract dataset factory: discovers sources, then builds a Dataset."""
    
    def finish(self, **kwargs):
        """Create the dataset from this factory."""
    
    def inspect(self, **kwargs):
        """Inspect data sources and return the unified schema."""
    
    def inspect_schemas(self, **kwargs):
        """Inspect the individual schemas of the discovered sources."""

class FileSystemDatasetFactory(DatasetFactory):
    """
    Factory for filesystem-based datasets; paths_or_selector may be explicit
    paths or a FileSelector used for recursive discovery.
    """
    
    def __init__(self, filesystem, paths_or_selector, format=None, options=None): ...

class UnionDatasetFactory(DatasetFactory):
    """Factory combining several child factories into one union dataset."""

class ParquetDatasetFactory(DatasetFactory):
    """
    Specialized factory for Parquet datasets; metadata_path points at a
    Parquet _metadata summary file describing all member files.
    """
    
    def __init__(self, metadata_path, schema=None, **kwargs): ...

Write Support

Configuration and options for writing datasets with various formats and partitioning schemes.

class FileWriteOptions:
    """Base class for format-specific file writing options."""

class IpcFileWriteOptions(FileWriteOptions):
    """Writing options for the Arrow IPC file format."""

class ParquetFileWriteOptions(FileWriteOptions):
    """
    Parquet file writing options (mirror pyarrow.parquet writer settings).

    Attributes:
    - write_batch_size: Batch size for writing
    - dictionary_pagesize_limit: Dictionary page size limit
    - data_page_size: Data page size
    - compression: Compression codec (e.g. 'snappy', 'zstd')
    - compression_level: Compression level
    - use_dictionary: Dictionary encoding settings
    - write_statistics: Statistics writing settings
    """

class WrittenFile:
    """
    Information about a file produced by write_dataset.

    Attributes:
    - path: File path
    - metadata: File metadata
    - size: File size in bytes
    """

Usage Examples

Basic Dataset Operations

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import tempfile
import os

# Two small tables that will become separate files in the dataset.
first_chunk = pa.table({
    'year': [2020, 2020, 2021, 2021],
    'month': [1, 2, 1, 2],
    'sales': [100, 150, 120, 180],
    'region': ['North', 'South', 'North', 'South'],
})

second_chunk = pa.table({
    'year': [2022, 2022],
    'month': [1, 2],
    'sales': [200, 220],
    'region': ['North', 'South'],
})

with tempfile.TemporaryDirectory() as tmpdir:
    # Persist each table as its own Parquet file.
    pq.write_table(first_chunk, os.path.join(tmpdir, 'data1.parquet'))
    pq.write_table(second_chunk, os.path.join(tmpdir, 'data2.parquet'))

    # A dataset over the directory picks up both files automatically.
    dataset = ds.dataset(tmpdir, format='parquet')

    # Basic dataset info.
    print(f"Schema: {dataset.schema}")
    print(f"Files: {len(list(dataset.get_fragments()))}")

    # Materialize the whole dataset as one Table.
    full_table = dataset.to_table()
    print(f"Total rows: {len(full_table)}")

    # Row count can be answered from metadata without loading the data.
    row_count = dataset.count_rows()
    print(f"Row count: {row_count}")

    # Peek at the first rows.
    head_table = dataset.head(3)
    print(f"Head: {head_table}")

    # Column projection during the scan.
    projected = dataset.to_table(columns=['year', 'sales'])
    print(f"Projected columns: {projected.column_names}")

    # Predicate pushdown during the scan.
    filtered = dataset.to_table(filter=ds.field('year') >= 2021)
    print(f"Filtered rows: {len(filtered)}")

    # Stream the data batch by batch instead of materializing everything.
    total_batches = 0
    for batch in dataset.to_batches(batch_size=2):
        total_batches += 1
        print(f"Batch {total_batches}: {batch.num_rows} rows")

Partitioned Datasets

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import tempfile
import os

# Create larger sample data: 300 rows spanning three years.
data = {
    'year': [2020] * 100 + [2021] * 100 + [2022] * 100,
    'month': ([1] * 50 + [2] * 50) * 3,
    'day': list(range(1, 51)) * 6,
    'sales': [100 + i for i in range(300)],
    'region': (['North', 'South'] * 150)
}
large_table = pa.table(data)

with tempfile.TemporaryDirectory() as tmpdir:
    # Write a dataset partitioned hive-style: year=2020/month=1/ directories.
    ds.write_dataset(
        large_table,
        tmpdir,
        format='parquet',
        partitioning=['year', 'month'],
        partitioning_flavor='hive'
    )
    
    # List created files
    for root, dirs, files in os.walk(tmpdir):
        for file in files:
            rel_path = os.path.relpath(os.path.join(root, file), tmpdir)
            print(f"Created: {rel_path}")
    
    # Read the partitioned dataset.
    # BUG FIX: hive-style key=value directories are NOT discovered by
    # default; partitioning='hive' is required so that 'year' and 'month'
    # become columns in the schema (otherwise the filters below would fail).
    partitioned_dataset = ds.dataset(tmpdir, format='parquet', partitioning='hive')
    
    # The discovered partitioning and unified schema.
    print(f"Partitioning: {partitioned_dataset.partitioning}")
    print(f"Schema: {partitioned_dataset.schema}")
    
    # Filter by partition column — only matching directories are read.
    year_2021 = partitioned_dataset.to_table(
        filter=ds.field('year') == 2021
    )
    print(f"Year 2021 rows: {len(year_2021)}")
    
    # Combine multiple partition filters.
    specific_partition = partitioned_dataset.to_table(
        filter=(ds.field('year') == 2021) & (ds.field('month') == 1)
    )
    print(f"2021-01 rows: {len(specific_partition)}")
    
    # Each fragment carries the partition expression of its directory.
    fragments = list(partitioned_dataset.get_fragments())
    for i, fragment in enumerate(fragments[:3]):  # First 3 fragments
        print(f"Fragment {i}: {fragment.partition_expression}")

Advanced Filtering and Projection

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.compute as pc
import tempfile

# Create dataset with complex data.
# BUG FIX: ['A', 'B', 'C'] * 334 produces 1002 entries while every other
# column has 1000, which makes pa.table() raise; trim the cycle to 1000.
table = pa.table({
    'id': range(1000),
    'category': (['A', 'B', 'C'] * 334)[:1000],  # Cycling categories
    'value': [i * 1.5 for i in range(1000)],
    'timestamp': pa.array([
        f'2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}'
        for i in range(1000)
    ], type=pa.string()),
    'tags': [['tag1', 'tag2'][:i % 3] for i in range(1000)]
})

with tempfile.TemporaryDirectory() as tmpdir:
    # Write dataset
    ds.write_dataset(table, tmpdir, format='parquet')
    dataset = ds.dataset(tmpdir, format='parquet')
    
    # Complex filters combining several predicates.
    # NOTE(review): the original third predicate was ds.field('id') % 10 == 0,
    # but Arrow compute provides no modulo kernel for expressions, so a plain
    # comparison is used instead — confirm against the targeted pyarrow version.
    complex_filter = (
        (ds.field('category').isin(['A', 'B'])) &
        (ds.field('value') > 500) &
        (ds.field('id') < 900)
    )
    
    filtered_data = dataset.to_table(filter=complex_filter)
    print(f"Complex filter result: {len(filtered_data)} rows")
    
    # String operations in filters: applying a pc function to a field
    # expression yields a new expression usable as a pushed-down filter.
    # BUG FIX: Expression has no .match_substring() method; the compute
    # function form is the supported spelling.
    timestamp_filter = pc.match_substring(ds.field('timestamp'), '2023-01')
    jan_data = dataset.to_table(filter=timestamp_filter)
    print(f"January data: {len(jan_data)} rows")
    
    # Scanner with explicit projection, filter, and batch size.
    scanner = dataset.scanner(
        columns=['id', 'category', 'value'],
        filter=ds.field('value') > 750,
        batch_size=100
    )
    
    # Process in batches
    batch_count = 0
    total_rows = 0
    for batch in scanner.to_batches():
        batch_count += 1
        total_rows += batch.num_rows
        if batch_count <= 3:  # Show first 3 batches
            print(f"Batch {batch_count}: {batch.num_rows} rows, "
                  f"value range: {pc.min(batch['value'])} - {pc.max(batch['value'])}")
    
    print(f"Total batches: {batch_count}, total rows: {total_rows}")

Multi-Format Datasets

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pyarrow.csv as csv
import pyarrow.feather as feather
import tempfile
import os

# Create sample data
base_data = {
    'id': range(100),
    'name': [f'item_{i}' for i in range(100)],
    'price': [10.0 + i * 0.5 for i in range(100)]
}

table = pa.table(base_data)

with tempfile.TemporaryDirectory() as tmpdir:
    # Write non-overlapping slices of the data in three different formats.
    pq.write_table(table.slice(0, 30), os.path.join(tmpdir, 'data1.parquet'))
    csv.write_csv(table.slice(30, 30), os.path.join(tmpdir, 'data2.csv'))
    feather.write_feather(table.slice(60, 40), os.path.join(tmpdir, 'data3.feather'))
    
    # Create one dataset per format.
    parquet_ds = ds.dataset(
        os.path.join(tmpdir, 'data1.parquet'),
        format='parquet'
    )
    
    csv_ds = ds.dataset(
        os.path.join(tmpdir, 'data2.csv'),
        format='csv'
    )
    
    feather_ds = ds.dataset(
        os.path.join(tmpdir, 'data3.feather'),
        format='ipc'  # Feather uses IPC format
    )
    
    # BUG FIX: ds.UnionDataset([...]) omits the required schema argument
    # (its constructor takes schema and children); passing the list of
    # child datasets to ds.dataset() builds the union and unifies schemas.
    union_ds = ds.dataset([parquet_ds, csv_ds, feather_ds])
    
    # Read unified dataset
    unified_table = union_ds.to_table()
    print(f"Unified dataset rows: {len(unified_table)}")
    print(f"Schema: {unified_table.schema}")
    
    # Verify data integrity: child order is preserved, so ids are sequential.
    assert len(unified_table) == 100
    assert unified_table['id'].to_pylist() == list(range(100))
    
    # Process by source
    for i, fragment in enumerate(union_ds.get_fragments()):
        fragment_table = fragment.to_table()
        print(f"Fragment {i}: {len(fragment_table)} rows from {type(fragment).__name__}")

Dataset Schema Evolution

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import tempfile
import os

with tempfile.TemporaryDirectory() as tmpdir:
    # Version 1 schema
    v1_table = pa.table({
        'id': [1, 2, 3],
        'name': ['Alice', 'Bob', 'Charlie'],
        'value': [10.5, 20.3, 30.1]
    })
    
    # Version 2 schema (added column)
    v2_table = pa.table({
        'id': [4, 5, 6],
        'name': ['Diana', 'Eve', 'Frank'],
        'value': [40.7, 50.2, 60.8],
        'category': ['A', 'B', 'A']  # New column
    })
    
    # Version 3 schema (another added column)
    v3_table = pa.table({
        'id': [7, 8, 9],
        'name': ['Grace', 'Henry', 'Iris'],
        'value': [70.1, 80.9, 90.5],
        'category': ['B', 'A', 'B'],
        'timestamp': ['2023-01-01', '2023-01-02', '2023-01-03']  # Another new column
    })
    
    # Write different versions
    pq.write_table(v1_table, os.path.join(tmpdir, 'v1.parquet'))
    pq.write_table(v2_table, os.path.join(tmpdir, 'v2.parquet'))
    pq.write_table(v3_table, os.path.join(tmpdir, 'v3.parquet'))
    
    # Create dataset over files with differing schemas.
    dataset = ds.dataset(tmpdir, format='parquet')
    
    # Discovery unifies the schemas automatically.
    print(f"Unified schema: {dataset.schema}")
    
    # Read all data - missing columns filled with nulls
    full_table = dataset.to_table()
    print(f"Total rows: {len(full_table)}")
    print(f"Columns: {full_table.column_names}")
    
    # Show schema evolution effects.
    # BUG FIX: the original called pc.count(column, mode='only_null') without
    # ever importing pyarrow.compute; ChunkedArray.null_count gives the same
    # number directly, with no extra import.
    for col_name in full_table.column_names:
        column = full_table[col_name]
        null_count = column.null_count
        print(f"Column '{col_name}': {null_count} nulls out of {len(column)}")
    
    # Handle schema evolution explicitly by declaring a target schema.
    target_schema = pa.schema([
        pa.field('id', pa.int64()),
        pa.field('name', pa.string()),
        pa.field('value', pa.float64()),
        pa.field('category', pa.string()),
        pa.field('timestamp', pa.string()),
        pa.field('version', pa.int32())  # Add version tracking
    ])
    
    # BUG FIX: Dataset.to_table() accepts no `schema` argument; the explicit
    # schema is given when constructing the dataset instead.  Columns absent
    # from a file (e.g. 'version') come back as all-null on read.
    evolved = ds.dataset(tmpdir, format='parquet', schema=target_schema)
    projected = evolved.to_table()

Performance Optimization

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import tempfile
import time
import os  # BUG FIX: os.path.join is used below but os was never imported

# Create large dataset for performance testing
n_rows = 100000
large_table = pa.table({
    'id': range(n_rows),
    'category': ['A', 'B', 'C', 'D'] * (n_rows // 4),
    'value1': [i * 1.1 for i in range(n_rows)],
    'value2': [i * 2.2 for i in range(n_rows)],
    'timestamp': pa.array([
        f'2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}'
        for i in range(n_rows)
    ])
})

with tempfile.TemporaryDirectory() as tmpdir:
    # Write with different configurations and compare timings.
    
    # Single file.
    # BUG FIX: basename_template must contain the '{i}' placeholder
    # (write_dataset rejects templates without it).
    start_time = time.time()
    ds.write_dataset(
        large_table,
        os.path.join(tmpdir, 'single_file'),
        format='parquet',
        basename_template='data-{i}.parquet'
    )
    single_file_write_time = time.time() - start_time
    
    # Partitioned by category
    start_time = time.time()
    ds.write_dataset(
        large_table,
        os.path.join(tmpdir, 'partitioned'),
        format='parquet',
        partitioning=['category']
    )
    partitioned_write_time = time.time() - start_time
    
    # Multiple files with row limit
    start_time = time.time()
    ds.write_dataset(
        large_table,
        os.path.join(tmpdir, 'multi_file'),
        format='parquet',
        max_rows_per_file=20000
    )
    multi_file_write_time = time.time() - start_time
    
    print(f"Write times:")
    print(f"  Single file: {single_file_write_time:.3f}s")
    print(f"  Partitioned: {partitioned_write_time:.3f}s")
    print(f"  Multi-file: {multi_file_write_time:.3f}s")
    
    # Compare read performance
    datasets = {
        'single_file': ds.dataset(os.path.join(tmpdir, 'single_file')),
        'partitioned': ds.dataset(os.path.join(tmpdir, 'partitioned')),
        'multi_file': ds.dataset(os.path.join(tmpdir, 'multi_file'))
    }
    
    # Full table read
    print(f"\nFull table read times:")
    for name, dataset in datasets.items():
        start_time = time.time()
        table = dataset.to_table()
        read_time = time.time() - start_time
        print(f"  {name}: {read_time:.3f}s ({len(table)} rows)")
    
    # Filtered read (category = 'A')
    print(f"\nFiltered read times (category='A'):")
    for name, dataset in datasets.items():
        start_time = time.time()
        filtered = dataset.to_table(filter=ds.field('category') == 'A')
        read_time = time.time() - start_time
        print(f"  {name}: {read_time:.3f}s ({len(filtered)} rows)")
    
    # Column projection
    print(f"\nProjected read times (id, value1 only):")
    for name, dataset in datasets.items():
        start_time = time.time()
        projected = dataset.to_table(columns=['id', 'value1'])
        read_time = time.time() - start_time
        print(f"  {name}: {read_time:.3f}s")

Install with Tessl CLI

npx tessl i tessl/pypi-pyarrow

docs

advanced-features.md

arrow-flight.md

compute-functions.md

core-data-structures.md

data-types.md

dataset-operations.md

file-formats.md

index.md

memory-io.md

tile.json