"""
Python library for the Apache Arrow columnar memory format and compute libraries.

Multi-file dataset interface supporting partitioned data, lazy evaluation, and
distributed processing. Enables efficient querying of large datasets stored
across multiple files, with automatic partition discovery, predicate pushdown,
and columnar processing. Create and manage datasets from various sources,
including local files, cloud storage, and in-memory data.
"""
def dataset(source, schema=None, format=None, filesystem=None, partitioning=None, partition_base_dir=None, exclude_invalid_files=None, ignore_prefixes=None):
    """
    Open a dataset from file path(s), a directory, a Table, or other sources.

    Parameters
    ----------
    source : str, list, or Table
        Path(s), directory, URI, or in-memory data to expose as a dataset.
    schema : Schema, optional
        Explicit schema; inferred from the data when omitted.
    format : FileFormat or str, optional
        File format of the source (e.g. 'parquet', 'csv', 'ipc').
    filesystem : FileSystem, optional
        Filesystem to read from; inferred from the source when omitted.
    partitioning : Partitioning, PartitioningFactory, str, or list, optional
        Scheme used to derive partition columns from file paths
        (e.g. 'hive' for key=value directories).
    partition_base_dir : str, optional
        Base directory below which partition segments are interpreted.
    exclude_invalid_files : bool, optional
        If True, skip files that are not valid for the given format.
    ignore_prefixes : list, optional
        Skip files/directories whose names start with these prefixes.

    Returns
    -------
    Dataset
        Dataset object for lazy querying.
    """
def write_dataset(data, base_dir, basename_template=None, format=None, partitioning=None, partitioning_flavor=None, schema=None, filesystem=None, file_options=None, use_threads=True, max_partitions=None, max_open_files=None, max_rows_per_file=None, min_rows_per_group=None, max_rows_per_group=None, existing_data_behavior='error'):
    """
    Write a dataset to storage, optionally splitting it into partitions.

    Parameters
    ----------
    data : Table, Dataset, or iterable of record batches
        The data to write.
    base_dir : str
        Root output directory.
    basename_template : str, optional
        Template for generated file names; must contain '{i}', which is
        replaced with an incrementing counter.
    format : FileFormat or str, optional
        Output file format.
    partitioning : Partitioning or list of str, optional
        Partitioning scheme (or partition field names) used to split output.
    partitioning_flavor : str, optional
        Flavor used when field names are given ('hive' or 'directory').
    schema : Schema, optional
        Schema of the written dataset.
    filesystem : FileSystem, optional
        Filesystem to write to.
    file_options : FileWriteOptions, optional
        Format-specific write options.
    use_threads : bool, default True
        Write using multiple threads.
    max_partitions : int, optional
        Maximum number of partitions any batch may be written into.
    max_open_files : int, optional
        Maximum number of files kept open while writing.
    max_rows_per_file : int, optional
        Maximum rows per output file.
    min_rows_per_group : int, optional
        Minimum rows per row group.
    max_rows_per_group : int, optional
        Maximum rows per row group.
    existing_data_behavior : str, default 'error'
        How to treat pre-existing data in base_dir
        (e.g. 'error', 'overwrite_or_ignore', 'delete_matching').
    """
class Dataset:
    """
    Abstract interface for reading tabular data lazily.

    Attributes
    ----------
    schema : Schema of the dataset.
    partition_expression : Expression describing this dataset's partition.
    """
    def count_rows(self, **kwargs):
        """Count total rows in the dataset without materializing the data."""
    def get_fragments(self, filter=None):
        """Yield the fragments (pieces, e.g. files) making up this dataset."""
    def head(self, num_rows, **kwargs):
        """Return the first num_rows as a Table."""
    def replace_schema(self, schema):
        """Return a copy of this dataset with a new schema."""
    def scanner(self, **kwargs):
        """Create a Scanner for this dataset (columns, filter, batch_size, ...)."""
    def take(self, indices, **kwargs):
        """Select rows by indices."""
    def to_batches(self, **kwargs):
        """Scan lazily, yielding record batches."""
    def to_table(self, **kwargs):
        """Materialize the (optionally projected/filtered) dataset as a Table."""
class FileSystemDataset(Dataset):
    """
    Dataset backed by a list of files on a filesystem.

    Attributes
    ----------
    files : List of file paths making up the dataset.
    filesystem : Filesystem the files live on.
    format : File format used to read the files.
    partitioning : Partitioning scheme, if any.
    """
class InMemoryDataset(Dataset):
    """Dataset backed by in-memory tables or record batches."""
class UnionDataset(Dataset):
    """A dataset composed of the union of several child datasets."""


# Efficient scanning with projection, filtering, and lazy evaluation for
# large-scale data processing.
class Scanner:
    """
    Executes a scan of a dataset with projection and filtering applied.

    Attributes
    ----------
    projected_schema : Schema after column projection.
    filter : Filter expression applied during the scan.
    """
    def count_rows(self):
        """Count rows matching the scanner's filter."""
    def head(self, num_rows):
        """Return the first num_rows of the scan as a Table."""
    def take(self, indices):
        """Select rows of the scan result by indices."""
    def to_batches(self):
        """Run the scan, yielding record batches."""
    def to_reader(self):
        """Expose the scan as a RecordBatchReader."""
    def to_table(self):
        """Run the scan and materialize the result as a Table."""
    def scan(self):
        """Perform the scan operation."""
class TaggedRecordBatch:
    """A record batch paired with the fragment it was read from."""

    @property
    def record_batch(self):
        """The record batch of data."""

    @property
    def fragment(self):
        """The source fragment (e.g. file) this batch was read from."""


# Automatic partition discovery and custom partitioning schemes for
# organizing large datasets.
def partitioning(schema=None, field_names=None, flavor=None):
    """
    Build a partitioning scheme (or a factory that infers one).

    Parameters
    ----------
    schema : Schema, optional
        Names and types of the partition fields.
    field_names : list, optional
        Partition field names; field types are inferred from the data
        (a factory is returned in that case).
    flavor : str, optional
        Partitioning flavor ('hive' for key=value directories,
        'directory' / None for plain directory levels).

    Returns
    -------
    Partitioning
        The partitioning scheme (or a PartitioningFactory when types
        must be inferred).
    """
def get_partition_keys(path, partitioning):
    """
    Extract partition key/value pairs encoded in a file path.

    Parameters
    ----------
    path : str
        File path to parse.
    partitioning : Partitioning
        Scheme describing how keys are encoded in the path.

    Returns
    -------
    dict
        Mapping of partition field name to parsed value.
    """
class Partitioning:
    """
    Abstract partitioning scheme mapping paths to partition keys.

    Attributes
    ----------
    schema : Schema of the partition fields.
    """
    def parse(self, path):
        """Parse the partition keys encoded in a path."""
    def format(self, partition_keys):
        """Format partition keys back into a path segment."""
class PartitioningFactory:
    """Factory that produces a concrete partitioning scheme."""
    def create(self, field_names):
        """Create a partitioning from the given field names."""
    def inspect(self, paths):
        """Inspect paths to infer the partitioning schema."""
class DirectoryPartitioning(Partitioning):
    """
    Directory-based partitioning where each directory level holds one
    field's raw value (e.g. 2021/01/); field names come from the schema.
    """
class HivePartitioning(Partitioning):
    """
    Hive-style partitioning with key=value directory names
    (e.g. year=2021/month=01/).
    """
class FilenamePartitioning(Partitioning):
    """Partitioning encoded in file names rather than directory levels."""


# Support for multiple file formats with format-specific optimization and
# fragment-based processing.
class FileFormat:
    """Abstract interface describing how to read files of one format."""
    def equals(self, other):
        """Return True if this format equals `other`."""
    def get_type_name(self):
        """Return the short type name of this format."""
    def make_fragment(self, file, filesystem=None, partition_expression=None):
        """Create a Fragment that reads `file` using this format."""
class CsvFileFormat(FileFormat):
    """CSV file format; configured via parse/read/convert options."""
    def __init__(self, parse_options=None, read_options=None, convert_options=None): ...
class JsonFileFormat(FileFormat):
    """JSON file format; configured via parse/read options."""
    def __init__(self, parse_options=None, read_options=None): ...
class IpcFileFormat(FileFormat):
    """Arrow IPC file format (also used for Feather v2 files)."""
class FeatherFileFormat(FileFormat):
    """Feather file format implementation."""
class ParquetFileFormat(FileFormat):
    """
    Parquet file format implementation.

    Attributes
    ----------
    read_options : Parquet-specific read options.
    """
    def __init__(self, read_options=None, **kwargs): ...
class OrcFileFormat(FileFormat):
    """ORC file format implementation."""
class Fragment:
    """
    A piece of a dataset (e.g. one file) that can be scanned independently.

    Attributes
    ----------
    format : File format used to read this fragment.
    partition_expression : Expression identifying this fragment's partition.
    """
    def count_rows(self, **kwargs):
        """Count rows in this fragment."""
    def head(self, num_rows, **kwargs):
        """Return the first num_rows of this fragment as a Table."""
    def scanner(self, **kwargs):
        """Create a Scanner over this fragment only."""
    def to_batches(self, **kwargs):
        """Scan this fragment, yielding record batches."""
    def to_table(self, **kwargs):
        """Materialize this fragment as a Table."""
class FileFragment(Fragment):
    """
    Fragment backed by a single file.

    Attributes
    ----------
    path : Path of the backing file.
    filesystem : Filesystem the file lives on.
    """
class ParquetFileFragment(FileFragment):
    """
    Parquet-specific file fragment with row-group awareness.

    Attributes
    ----------
    row_groups : Row group information for this file.
    """
    def get_row_group_fragments(self):
        """Return fragments for the row groups of this file."""
    def split_by_row_group(self, **kwargs):
        """Split this fragment into one fragment per row group."""


# Factory classes for creating datasets with automatic discovery and
# configuration.
class DatasetFactory:
    """Abstract factory that discovers sources and builds a Dataset."""
    def finish(self, **kwargs):
        """Build the Dataset from what the factory discovered."""
    def inspect(self, **kwargs):
        """Inspect the data sources (e.g. to obtain a common schema)."""
    def inspect_schemas(self, **kwargs):
        """Inspect the individual schemas of the discovered sources."""
class FileSystemDatasetFactory(DatasetFactory):
    """Factory discovering files on a filesystem to build a dataset."""
    def __init__(self, filesystem, paths_or_selector, format=None, options=None): ...
class UnionDatasetFactory(DatasetFactory):
    """Factory combining several child factories into a union dataset."""
class ParquetDatasetFactory(DatasetFactory):
    """Specialized factory building a dataset from a Parquet metadata file."""
    def __init__(self, metadata_path, schema=None, **kwargs): ...


# Configuration and options for writing datasets with various formats and
# partitioning schemes.
class FileWriteOptions:
    """Base class for format-specific file writing options."""
class IpcFileWriteOptions(FileWriteOptions):
    """Writing options for the Arrow IPC file format."""
class ParquetFileWriteOptions(FileWriteOptions):
    """
    Writing options for the Parquet file format.

    Attributes
    ----------
    write_batch_size : Batch size used when writing.
    dictionary_pagesize_limit : Upper bound on dictionary page size.
    data_page_size : Target data page size.
    compression : Compression codec.
    compression_level : Level for the chosen codec.
    use_dictionary : Dictionary encoding settings.
    write_statistics : Statistics writing settings.
    """
class WrittenFile:
    """
    Information about a single file produced by write_dataset.

    Attributes
    ----------
    path : Path of the written file.
    metadata : Format-specific file metadata, if collected.
    size : Size of the written file.
    """


import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import tempfile
import os

# Create sample data
table1 = pa.table({
    'year': [2020, 2020, 2021, 2021],
    'month': [1, 2, 1, 2],
    'sales': [100, 150, 120, 180],
    'region': ['North', 'South', 'North', 'South']
})
table2 = pa.table({
    'year': [2022, 2022],
    'month': [1, 2],
    'sales': [200, 220],
    'region': ['North', 'South']
})

with tempfile.TemporaryDirectory() as tmpdir:
    # Write individual Parquet files
    pq.write_table(table1, os.path.join(tmpdir, 'data1.parquet'))
    pq.write_table(table2, os.path.join(tmpdir, 'data2.parquet'))

    # Create dataset from directory
    dataset = ds.dataset(tmpdir, format='parquet')

    # Basic dataset info
    print(f"Schema: {dataset.schema}")
    print(f"Files: {len(list(dataset.get_fragments()))}")

    # Read entire dataset
    full_table = dataset.to_table()
    print(f"Total rows: {len(full_table)}")

    # Count rows without loading data
    row_count = dataset.count_rows()
    print(f"Row count: {row_count}")

    # Get first few rows
    head_table = dataset.head(3)
    print(f"Head: {head_table}")

    # Scan with projection (only the requested columns are read)
    projected = dataset.to_table(columns=['year', 'sales'])
    print(f"Projected columns: {projected.column_names}")

    # Scan with filter (predicate pushdown skips non-matching data)
    filtered = dataset.to_table(filter=ds.field('year') >= 2021)
    print(f"Filtered rows: {len(filtered)}")

    # Iterate over record batches
    total_batches = 0
    for batch in dataset.to_batches(batch_size=2):
        total_batches += 1
        print(f"Batch {total_batches}: {batch.num_rows} rows")

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import tempfile
import os

# Create larger sample data: 300 rows spanning 3 years x 2 months
data = {
    'year': [2020] * 100 + [2021] * 100 + [2022] * 100,
    'month': ([1] * 50 + [2] * 50) * 3,
    'day': list(range(1, 51)) * 6,
    'sales': [100 + i for i in range(300)],
    'region': (['North', 'South'] * 150)
}
large_table = pa.table(data)

with tempfile.TemporaryDirectory() as tmpdir:
    # Write dataset partitioned into year=.../month=... directories
    ds.write_dataset(
        large_table,
        tmpdir,
        format='parquet',
        partitioning=['year', 'month'],
        partitioning_flavor='hive'
    )

    # List created files
    for root, dirs, files in os.walk(tmpdir):
        for file in files:
            rel_path = os.path.relpath(os.path.join(root, file), tmpdir)
            print(f"Created: {rel_path}")

    # Read partitioned dataset; 'hive' tells discovery to parse the
    # key=value directory names back into partition columns (they are
    # not discovered automatically without it).
    partitioned_dataset = ds.dataset(tmpdir, format='parquet', partitioning='hive')
    print(f"Partitioning: {partitioned_dataset.partitioning}")
    print(f"Schema: {partitioned_dataset.schema}")

    # Filter by partition column (only matching directories are read)
    year_2021 = partitioned_dataset.to_table(
        filter=ds.field('year') == 2021
    )
    print(f"Year 2021 rows: {len(year_2021)}")

    # Multiple partition filters
    specific_partition = partitioned_dataset.to_table(
        filter=(ds.field('year') == 2021) & (ds.field('month') == 1)
    )
    print(f"2021-01 rows: {len(specific_partition)}")

    # Inspect the partition expression of the first few fragments
    fragments = list(partitioned_dataset.get_fragments())
    for i, fragment in enumerate(fragments[:3]):  # First 3 fragments
        print(f"Fragment {i}: {fragment.partition_expression}")

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.compute as pc
import tempfile

# Create dataset with complex data. NOTE: every column passed to
# pa.table() must have the same length (1000 here).
table = pa.table({
    'id': range(1000),
    # 334 * 3 = 1002 values, so truncate to the 1000-row column length
    'category': (['A', 'B', 'C'] * 334)[:1000],
    'value': [i * 1.5 for i in range(1000)],
    'timestamp': pa.array([
        f'2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}'
        for i in range(1000)
    ], type=pa.string()),
    'tags': [['tag1', 'tag2'][:i % 3] for i in range(1000)]
})

with tempfile.TemporaryDirectory() as tmpdir:
    # Write dataset
    ds.write_dataset(table, tmpdir, format='parquet')
    dataset = ds.dataset(tmpdir, format='parquet')

    # Complex filters: clauses combined with & are pushed down to the scan.
    # NOTE(review): the '%' arithmetic on an expression requires a pyarrow
    # version with Expression arithmetic support — confirm for your version.
    complex_filter = (
        (ds.field('category').isin(['A', 'B'])) &
        (ds.field('value') > 500) &
        (ds.field('id') % 10 == 0)
    )
    filtered_data = dataset.to_table(filter=complex_filter)
    print(f"Complex filter result: {len(filtered_data)} rows")

    # String operations in filters: compute functions applied to a field
    # expression return a new expression (Expression has no
    # match_substring method of its own).
    timestamp_filter = pc.match_substring(ds.field('timestamp'), '2023-01')
    jan_data = dataset.to_table(filter=timestamp_filter)
    print(f"January data: {len(jan_data)} rows")

    # Scanner with custom settings
    scanner = dataset.scanner(
        columns=['id', 'category', 'value'],
        filter=ds.field('value') > 750,
        batch_size=100
    )

    # Process in batches
    batch_count = 0
    total_rows = 0
    for batch in scanner.to_batches():
        batch_count += 1
        total_rows += batch.num_rows
        if batch_count <= 3:  # Show first 3 batches
            print(f"Batch {batch_count}: {batch.num_rows} rows, "
                  f"value range: {pc.min(batch['value'])} - {pc.max(batch['value'])}")
    print(f"Total batches: {batch_count}, total rows: {total_rows}")

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pyarrow.csv as csv
import pyarrow.feather as feather
import tempfile
import os

# Create sample data
base_data = {
    'id': range(100),
    'name': [f'item_{i}' for i in range(100)],
    'price': [10.0 + i * 0.5 for i in range(100)]
}
table = pa.table(base_data)

with tempfile.TemporaryDirectory() as tmpdir:
    # Write non-overlapping slices in three different formats
    pq.write_table(table.slice(0, 30), os.path.join(tmpdir, 'data1.parquet'))
    csv.write_csv(table.slice(30, 30), os.path.join(tmpdir, 'data2.csv'))
    feather.write_feather(table.slice(60, 40), os.path.join(tmpdir, 'data3.feather'))

    # Create datasets for each format
    parquet_ds = ds.dataset(
        os.path.join(tmpdir, 'data1.parquet'),
        format='parquet'
    )
    csv_ds = ds.dataset(
        os.path.join(tmpdir, 'data2.csv'),
        format='csv'
    )
    feather_ds = ds.dataset(
        os.path.join(tmpdir, 'data3.feather'),
        format='ipc'  # Feather uses IPC format
    )

    # Union the datasets: passing a list of datasets to ds.dataset()
    # builds a UnionDataset with a unified schema (UnionDataset's own
    # constructor requires an explicit schema argument).
    union_ds = ds.dataset([parquet_ds, csv_ds, feather_ds])

    # Read unified dataset
    unified_table = union_ds.to_table()
    print(f"Unified dataset rows: {len(unified_table)}")
    print(f"Schema: {unified_table.schema}")

    # Verify data integrity
    assert len(unified_table) == 100
    assert unified_table['id'].to_pylist() == list(range(100))

    # Process by source
    for i, fragment in enumerate(union_ds.get_fragments()):
        fragment_table = fragment.to_table()
        print(f"Fragment {i}: {len(fragment_table)} rows from {type(fragment).__name__}")

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.compute as pc  # needed for the null-count computation below
import pyarrow.parquet as pq
import tempfile
import os

with tempfile.TemporaryDirectory() as tmpdir:
    # Version 1 schema
    v1_table = pa.table({
        'id': [1, 2, 3],
        'name': ['Alice', 'Bob', 'Charlie'],
        'value': [10.5, 20.3, 30.1]
    })
    # Version 2 schema (added column)
    v2_table = pa.table({
        'id': [4, 5, 6],
        'name': ['Diana', 'Eve', 'Frank'],
        'value': [40.7, 50.2, 60.8],
        'category': ['A', 'B', 'A']  # New column
    })
    # Version 3 schema (added yet another column)
    v3_table = pa.table({
        'id': [7, 8, 9],
        'name': ['Grace', 'Henry', 'Iris'],
        'value': [70.1, 80.9, 90.5],
        'category': ['B', 'A', 'B'],
        'timestamp': ['2023-01-01', '2023-01-02', '2023-01-03']  # Another new column
    })

    # Write different versions
    pq.write_table(v1_table, os.path.join(tmpdir, 'v1.parquet'))
    pq.write_table(v2_table, os.path.join(tmpdir, 'v2.parquet'))
    pq.write_table(v3_table, os.path.join(tmpdir, 'v3.parquet'))

    # Create dataset with schema evolution
    dataset = ds.dataset(tmpdir, format='parquet')

    # Dataset unifies the schemas of all files automatically
    print(f"Unified schema: {dataset.schema}")

    # Read all data - missing columns filled with nulls
    full_table = dataset.to_table()
    print(f"Total rows: {len(full_table)}")
    print(f"Columns: {full_table.column_names}")

    # Show schema evolution effects
    for col_name in full_table.column_names:
        column = full_table[col_name]
        null_count = pc.count(column, mode='only_null').as_py()
        print(f"Column '{col_name}': {null_count} nulls out of {len(column)}")

    # Handle schema evolution explicitly with a target schema
    target_schema = pa.schema([
        pa.field('id', pa.int64()),
        pa.field('name', pa.string()),
        pa.field('value', pa.float64()),
        pa.field('category', pa.string()),
        pa.field('timestamp', pa.string()),
        pa.field('version', pa.int32())  # Add version tracking
    ])
    # Project to the target schema by opening the dataset with it —
    # Dataset.to_table() itself does not accept a schema argument.
    # Columns absent from a file are read as all-null columns.
    projected = ds.dataset(tmpdir, format='parquet', schema=target_schema).to_table()

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import tempfile
import time
import os  # needed for os.path.join below

# Create large dataset for performance testing
n_rows = 100000
large_table = pa.table({
    'id': range(n_rows),
    'category': ['A', 'B', 'C', 'D'] * (n_rows // 4),
    'value1': [i * 1.1 for i in range(n_rows)],
    'value2': [i * 2.2 for i in range(n_rows)],
    'timestamp': pa.array([
        f'2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}'
        for i in range(n_rows)
    ])
})

with tempfile.TemporaryDirectory() as tmpdir:
    # Write with different configurations
    # Single file; basename_template must contain the '{i}' counter
    start_time = time.time()
    ds.write_dataset(
        large_table,
        os.path.join(tmpdir, 'single_file'),
        format='parquet',
        basename_template='data-{i}.parquet'
    )
    single_file_write_time = time.time() - start_time

    # Partitioned by category
    start_time = time.time()
    ds.write_dataset(
        large_table,
        os.path.join(tmpdir, 'partitioned'),
        format='parquet',
        partitioning=['category']
    )
    partitioned_write_time = time.time() - start_time

    # Multiple files with row limit
    start_time = time.time()
    ds.write_dataset(
        large_table,
        os.path.join(tmpdir, 'multi_file'),
        format='parquet',
        max_rows_per_file=20000
    )
    multi_file_write_time = time.time() - start_time

    print(f"Write times:")
    print(f" Single file: {single_file_write_time:.3f}s")
    print(f" Partitioned: {partitioned_write_time:.3f}s")
    print(f" Multi-file: {multi_file_write_time:.3f}s")

    # Compare read performance. Plain directory partitioning is not
    # auto-discovered on read, so declare the partition field explicitly —
    # otherwise the 'category' column (and filter below) would be missing.
    datasets = {
        'single_file': ds.dataset(os.path.join(tmpdir, 'single_file')),
        'partitioned': ds.dataset(
            os.path.join(tmpdir, 'partitioned'),
            partitioning=ds.partitioning(field_names=['category'])
        ),
        'multi_file': ds.dataset(os.path.join(tmpdir, 'multi_file'))
    }

    # Full table read
    print(f"\nFull table read times:")
    for name, dataset in datasets.items():
        start_time = time.time()
        table = dataset.to_table()
        read_time = time.time() - start_time
        print(f" {name}: {read_time:.3f}s ({len(table)} rows)")

    # Filtered read (category = 'A')
    print(f"\nFiltered read times (category='A'):")
    for name, dataset in datasets.items():
        start_time = time.time()
        filtered = dataset.to_table(filter=ds.field('category') == 'A')
        read_time = time.time() - start_time
        print(f" {name}: {read_time:.3f}s ({len(filtered)} rows)")

    # Column projection
    print(f"\nProjected read times (id, value1 only):")
    for name, dataset in datasets.items():
        start_time = time.time()
        projected = dataset.to_table(columns=['id', 'value1'])
        read_time = time.time() - start_time
        print(f" {name}: {read_time:.3f}s")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-pyarrow