"""
PyArrow: Python library for the Apache Arrow columnar memory format and computing libraries.

Native support for reading and writing multiple file formats including Parquet, CSV,
JSON, Feather, and ORC. Provides high-performance I/O with configurable options for
compression, encoding, metadata handling, and integration with cloud storage systems.
"""

# Parquet: high-performance columnar storage format with advanced features including
# compression, encoding, statistics, and schema evolution support.
# Main I/O functions
def read_table(source, columns=None, use_threads=True, metadata=None, schema=None, use_pandas_metadata=False, read_dictionary=None, memory_map=False, buffer_size=None, partitioning=None, filesystem=None, filters=None, use_legacy_dataset=None, ignore_prefixes=None, pre_buffer=None, coerce_int96_timestamp_unit=None, thrift_string_size_limit=None, thrift_container_size_limit=None):
    """
    Read a Parquet file as an Arrow Table.

    Parameters:
    - source: str or file-like, path or file object to read from
    - columns: list of str, subset of columns to read (all columns if None)
    - use_threads: bool, read using multiple threads
    - metadata: FileMetaData, pre-loaded file metadata (avoids re-reading the footer)
    - schema: Schema, expected schema of the data
    - use_pandas_metadata: bool, also honor pandas metadata stored in the file
    - read_dictionary: list, column names to read as dictionary-encoded
    - memory_map: bool, memory-map the source when it is a local file path
    - buffer_size: int, read buffer size in bytes
    - partitioning: Partitioning, dataset partitioning scheme
    - filesystem: FileSystem, filesystem used to resolve `source`
    - filters: list, row-level filter predicates
    - use_legacy_dataset: bool, use the legacy dataset API
    - ignore_prefixes: list, file prefixes to ignore during dataset discovery
    - pre_buffer: bool, pre-buffer column data before decoding
    - coerce_int96_timestamp_unit: str, time unit used when converting INT96 timestamps
    - thrift_string_size_limit: int, limit on Thrift string sizes in the footer
    - thrift_container_size_limit: int, limit on Thrift container sizes in the footer

    Returns:
    Table: Arrow table with data from the Parquet file
    """
def write_table(table, where, row_group_size=None, version='2.6', use_dictionary=None, compression='snappy', write_statistics=None, use_deprecated_int96_timestamps=None, coerce_timestamps=None, allow_truncated_timestamps=False, data_page_size=None, data_page_version='1.0', compression_level=None, use_byte_stream_split=None, column_encoding=None, data_encoding=None, use_compliant_nested_type=None, encryption_properties=None, write_batch_size=None, dictionary_pagesize_limit=None, store_schema=None, write_page_index=None, write_page_checksum=None, sorting_columns=None, filesystem=None, metadata_collector=None):
    """
    Write an Arrow Table to a Parquet file.

    Parameters:
    - table: Table, Arrow table to write
    - where: str or file-like, output path or file object
    - row_group_size: int, maximum number of rows per row group
    - version: str, Parquet format version (default '2.6')
    - use_dictionary: bool or list, enable dictionary encoding (optionally per column)
    - compression: str or dict, compression codec (default 'snappy'; dict maps column -> codec)
    - write_statistics: bool or list, write column statistics (optionally per column)
    - use_deprecated_int96_timestamps: bool, store timestamps as deprecated INT96
    - coerce_timestamps: str, time unit to coerce timestamps to
    - allow_truncated_timestamps: bool, allow lossy timestamp coercion
    - data_page_size: int, target data page size in bytes
    - data_page_version: str, data page format version (default '1.0')
    - compression_level: int, codec-specific compression level
    - use_byte_stream_split: bool or list, byte-stream-split encoding (optionally per column)
    - column_encoding: dict, per-column encoding options
    - data_encoding: dict, data encoding options
    - use_compliant_nested_type: bool, use spec-compliant nested type naming
    - encryption_properties: FileEncryptionProperties, encryption settings
    - write_batch_size: int, number of rows per write batch
    - dictionary_pagesize_limit: int, dictionary page size limit in bytes
    - store_schema: bool, store the Arrow schema in the file metadata
    - write_page_index: bool, write the page index
    - write_page_checksum: bool, write page checksums
    - sorting_columns: list, column sorting information recorded in metadata
    - filesystem: FileSystem, filesystem used to resolve `where`
    - metadata_collector: list, collects FileMetaData objects for written files
    """
def read_pandas(source, columns=None, **kwargs):
    """
    Read a Parquet file including pandas index metadata.

    Extra keyword arguments are forwarded to `read_table`.
    """
def read_schema(where, memory_map=False, metadata=None, filesystem=None):
    """
    Read the schema of a Parquet file without loading its data.

    Parameters:
    - where: str or file-like, path or file object
    - memory_map: bool, memory-map the source when it is a local file path
    - metadata: FileMetaData, pre-loaded file metadata
    - filesystem: FileSystem, filesystem used to resolve `where`

    Returns:
    Schema: Arrow schema of the Parquet file
    """
def read_metadata(where, memory_map=False, decryption_properties=None, filesystem=None):
    """
    Read the footer metadata of a Parquet file.

    Parameters:
    - where: str or file-like, path or file object
    - memory_map: bool, memory-map the source when it is a local file path
    - decryption_properties: FileDecryptionProperties, decryption settings
    - filesystem: FileSystem, filesystem used to resolve `where`

    Returns:
    FileMetaData: Parquet file metadata
    """
class ParquetFile:
    """
    Reader interface for a single Parquet file.

    Attributes:
    - metadata: FileMetaData object for the file
    - schema: Parquet schema of the file
    - schema_arrow: Arrow schema (alias view of `schema`)
    - num_row_groups: number of row groups in the file
    """
    def __init__(self, source, metadata=None, common_metadata=None, read_dictionary=None, memory_map=False, buffer_size=None, pre_buffer=None, coerce_int96_timestamp_unit=None, decryption_properties=None, thrift_string_size_limit=None, thrift_container_size_limit=None): ...

    def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
        """Read the entire file as an Arrow Table."""

    def read_row_group(self, i, columns=None, use_threads=True, use_pandas_metadata=False):
        """Read row group `i` as an Arrow Table."""

    def read_row_groups(self, row_groups, columns=None, use_threads=True, use_pandas_metadata=False):
        """Read the listed row groups as a single Arrow Table."""

    def iter_batches(self, batch_size=1024, row_groups=None, columns=None, use_threads=True, use_pandas_metadata=False):
        """Iterate over the file's contents as record batches."""

    def scan_contents(self, columns=None, batch_size=1024):
        """Scan the file contents (e.g. to count rows) without materializing a Table."""
class ParquetWriter:
    """
    Incremental writer for Parquet files.

    Extra keyword arguments passed to `__init__` configure the writer
    (see `write_table` for the available options).
    """
    def __init__(self, where, schema, filesystem=None, **kwargs): ...

    def write_batch(self, batch, row_group_size=None):
        """Append a RecordBatch to the file."""

    def write_table(self, table, row_group_size=None):
        """Append a Table to the file."""

    def close(self):
        """Close the writer and finalize the file footer."""
# Metadata classes
class FileMetaData:
    """
    Parquet file-level metadata.

    Attributes:
    - created_by: creator (writer) information string
    - format_version: Parquet format version
    - metadata: key-value metadata
    - num_columns: number of columns
    - num_row_groups: number of row groups
    - num_rows: total number of rows
    - schema: Parquet schema
    - serialized_size: size of the serialized metadata in bytes
    """
    def row_group(self, i):
        """Return metadata for row group `i` (RowGroupMetaData)."""

    def to_dict(self):
        """Return the metadata as a plain dictionary."""
class RowGroupMetaData:
    """
    Metadata for a single Parquet row group.

    Attributes:
    - num_columns: number of columns in the row group
    - num_rows: number of rows in the row group
    - total_byte_size: total byte size of the row group
    """
    def column(self, i):
        """Return metadata for column chunk `i` (ColumnChunkMetaData)."""
class ColumnChunkMetaData:
    """
    Metadata for a single column chunk within a row group.

    Attributes:
    - column_path: dotted path of the column in the schema
    - compression: compression codec used
    - data_page_offset: file offset of the first data page
    - dictionary_page_offset: file offset of the dictionary page
    - encodings: list of encodings used
    - file_offset: file offset of the column chunk
    - file_path: file path (for externally stored columns)
    - has_dictionary_page: whether the chunk has a dictionary page
    - index_page_offset: file offset of the index page
    - num_values: number of values in the chunk
    - physical_type: physical storage type
    - statistics: column statistics (min/max/null counts)
    - total_compressed_size: compressed size in bytes
    - total_uncompressed_size: uncompressed size in bytes
    """
    def to_dict(self):
        """Return the column chunk metadata as a plain dictionary."""
class ParquetSchema:
    """
    Parquet (Thrift-level) schema representation.

    Attributes:
    - names: column names
    - pandas_metadata: pandas metadata stored in the schema, if any
    """
    def column(self, i):
        """Return the schema of column `i`."""

    def to_arrow_schema(self):
        """Convert this Parquet schema to an Arrow Schema."""
# Encryption support
class FileEncryptionProperties:
    """File-level encryption properties for writing encrypted Parquet files."""
class FileDecryptionProperties:
    """File-level decryption properties for reading encrypted Parquet files."""


# CSV: flexible CSV reading and writing with extensive parsing options, type
# inference, and error handling capabilities.
def read_csv(input_file, read_options=None, parse_options=None, convert_options=None):
    """
    Read a CSV file as an Arrow Table.

    Parameters:
    - input_file: str or file-like, CSV file to read
    - read_options: ReadOptions, reading configuration (threads, block size, header handling)
    - parse_options: ParseOptions, parsing configuration (delimiter, quoting, escaping)
    - convert_options: ConvertOptions, type conversion configuration

    Returns:
    Table: Arrow table with the CSV data
    """
def write_csv(data, output_file, write_options=None):
    """
    Write a Table or RecordBatch to a CSV file.

    Parameters:
    - data: Table or RecordBatch, data to write
    - output_file: str or file-like, output CSV file
    - write_options: WriteOptions, writing configuration (header, delimiter, quoting)
    """
def open_csv(input_file, read_options=None, parse_options=None, convert_options=None):
    """
    Open a CSV file for incremental (streaming) reading.

    Parameters:
    - input_file: str or file-like, CSV file to open
    - read_options: ReadOptions, reading configuration
    - parse_options: ParseOptions, parsing configuration
    - convert_options: ConvertOptions, type conversion configuration

    Returns:
    CSVStreamingReader: streaming reader yielding record batches
    """
class ReadOptions:
    """
    CSV reading options.

    Attributes:
    - use_threads: whether to read using multiple threads
    - block_size: size in bytes of the blocks read at once
    - skip_rows: number of rows to skip at the start of the file
    - skip_rows_after_names: rows to skip after the header row
    - column_names: explicit column names (disables header parsing)
    - autogenerate_column_names: auto-generate column names ("f0", "f1", ...)
    - encoding: character encoding of the input (default: utf8)
    """
class ParseOptions:
    """
    CSV parsing options.

    Attributes:
    - delimiter: field delimiter character
    - quote_char: quote character (or False to disable quoting)
    - double_quote: whether two adjacent quotes escape one quote inside a field
    - escape_char: escape character (or False to disable escaping)
    - newlines_in_values: allow newline characters inside quoted values
    - ignore_empty_lines: skip empty lines instead of treating them as rows
    """
class ConvertOptions:
    """
    CSV type conversion options.

    Attributes:
    - check_utf8: validate UTF-8 encoding of string columns
    - column_types: explicit column types (dict mapping name -> DataType)
    - null_values: string values to interpret as null
    - true_values: string values to interpret as True
    - false_values: string values to interpret as False
    - decimal_point: decimal point character for floating-point parsing
    - strings_can_be_null: whether string columns may contain nulls
    - quoted_strings_can_be_null: whether quoted values may also convert to null
    - auto_dict_encode: automatically dictionary-encode string columns
    - auto_dict_max_cardinality: cardinality limit for auto dictionary encoding
    - include_columns: names of columns to include in the result
    - include_missing_columns: emit missing include_columns as all-null columns
    - timestamp_parsers: custom timestamp parsers (e.g. format strings or ISO8601)
    """
class WriteOptions:
    """
    CSV writing options.

    Attributes:
    - include_header: write column names as the first row
    - batch_size: number of rows processed per write batch
    - delimiter: field delimiter character
    - quoting_style: when to quote fields (e.g. always vs. only when needed)
    """
class CSVStreamingReader:
    """
    Streaming CSV reader for files too large to load at once.

    Iterating over the reader yields record batches.
    """
    def __iter__(self): ...

    def read_next_batch(self):
        """Read and return the next batch of records."""

    def schema(self):
        """Return the schema of the CSV data."""
class CSVWriter:
    """Incremental CSV writer with configurable options."""
    def __init__(self, sink, schema, write_options=None): ...

    def write_batch(self, batch):
        """Write a RecordBatch to the output."""

    def write_table(self, table):
        """Write a Table to the output."""

    def close(self):
        """Close the writer and flush remaining output."""
class InvalidRow:
    """Information about a row that failed to parse (e.g. wrong column count)."""
ISO8601 = ...  # ISO8601 timestamp parsing constant (usable in ConvertOptions.timestamp_parsers)

# JSON: line-delimited JSON reading with schema inference and flexible parsing
# options for semi-structured data.
def read_json(input_file, read_options=None, parse_options=None):
    """
    Read a line-delimited JSON file as an Arrow Table.

    Parameters:
    - input_file: str or file-like, JSON file to read
    - read_options: ReadOptions, reading configuration (threads, block size)
    - parse_options: ParseOptions, parsing configuration

    Returns:
    Table: Arrow table with the JSON data
    """
def open_json(input_file, read_options=None, parse_options=None):
    """
    Open a line-delimited JSON file for incremental (streaming) reading.

    Parameters:
    - input_file: str or file-like, JSON file to open
    - read_options: ReadOptions, reading configuration
    - parse_options: ParseOptions, parsing configuration

    Returns:
    Iterator: streaming JSON reader yielding record batches
    """
class ReadOptions:
    """
    JSON reading options.

    Attributes:
    - use_threads: whether to read using multiple threads
    - block_size: size in bytes of the blocks read at once
    - schema: explicit schema for the data
    """
class ParseOptions:
    """
    JSON parsing options.

    Attributes:
    - newlines_in_values: allow newline characters inside string values
    - explicit_schema: use an explicit schema instead of inferring one
    - unexpected_field_behavior: how to handle fields absent from the schema
    """


# Feather: fast, language-agnostic columnar serialization format optimized for
# data interchange and temporary storage.
def read_table(source, columns=None, use_threads=True, memory_map=False):
    """
    Read a Feather file as an Arrow Table.

    Parameters:
    - source: str or file-like, Feather file to read
    - columns: list of str, subset of columns to read (all columns if None)
    - use_threads: bool, read using multiple threads
    - memory_map: bool, memory-map the source when it is a local file path

    Returns:
    Table: Arrow table with the Feather data
    """
def read_feather(source, columns=None, use_threads=True, memory_map=False):
    """Read a Feather file (pandas-compatibility entry point)."""
def write_feather(df, dest, compression=None, compression_level=None, chunksize=None, version=None):
    """
    Write a Table or pandas DataFrame to a Feather file.

    Parameters:
    - df: Table or pandas DataFrame, data to write
    - dest: str or file-like, output Feather file
    - compression: str, compression codec
    - compression_level: int, codec-specific compression level
    - chunksize: int, maximum number of rows per chunk
    - version: int, Feather format version
    """
class FeatherDataset:
    """Interface for reading a dataset made of multiple Feather files."""
class FeatherError(Exception):
    """Feather format-specific errors."""


# ORC: Optimized Row Columnar format with advanced compression and indexing for
# big data processing.
def read_table(source, columns=None, use_threads=True, memory_map=False):
    """
    Read an ORC file as an Arrow Table.

    Parameters:
    - source: str or file-like, ORC file to read
    - columns: list of str, subset of columns to read (all columns if None)
    - use_threads: bool, read using multiple threads
    - memory_map: bool, memory-map the source when it is a local file path

    Returns:
    Table: Arrow table with the ORC data
    """
def write_table(table, where, file_version='0.12', batch_size=1024, stripe_size=67108864, compression='ZLIB', compression_block_size=65536, compression_strategy='speed', row_index_stride=10000, padding_tolerance=0.0, dictionary_key_size_threshold=0.0, bloom_filter_columns=None, bloom_filter_fpp=0.05):
    """
    Write an Arrow Table to an ORC file.

    Parameters:
    - table: Table, Arrow table to write
    - where: str or file-like, output ORC file
    - file_version: str, ORC file format version (default '0.12')
    - batch_size: int, number of rows per write batch
    - stripe_size: int, target stripe size in bytes (default 64 MiB)
    - compression: str, compression codec (default 'ZLIB')
    - compression_block_size: int, compression block size in bytes
    - compression_strategy: str, compression strategy ('speed' or compression-oriented)
    - row_index_stride: int, number of rows between row-index entries
    - padding_tolerance: float, tolerance for block padding
    - dictionary_key_size_threshold: float, threshold for enabling dictionary encoding
    - bloom_filter_columns: list, columns to build bloom filters for
    - bloom_filter_fpp: float, bloom filter false-positive probability
    """
class ORCFile:
"""
ORC file reader interface.
Attributes:
- metadata: ORC file metadata
- schema: Arrow schema
- nrows: Number of rows
- nstripes: Number of stripes
"""
def __init__(self, source, memory_map=False): ...
def read(self, columns=None, use_threads=True):
"""Read entire file as Table."""
def read_stripe(self, n, columns=None):
"""Read specific stripe."""import pyarrow as pa
import pyarrow.parquet as pq

# NOTE(review): this example assumes `pyarrow` was already imported as `pa` above.

# Build a small in-memory table.
table = pa.table({
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'value': [10.5, 20.3, 30.1, 40.7, 50.2]
})

# Basic write
pq.write_table(table, 'example.parquet')

# Advanced write with options
pq.write_table(
    table,
    'advanced.parquet',
    compression='snappy',
    use_dictionary=['name'],
    row_group_size=2,
    write_statistics=True
)

# Read the whole Parquet file
loaded_table = pq.read_table('example.parquet')

# Read specific columns only
subset = pq.read_table('example.parquet', columns=['id', 'name'])

# Read with row filtering
filtered = pq.read_table(
    'example.parquet',
    filters=[('value', '>', 25.0)]
)

# Working with the ParquetFile class
parquet_file = pq.ParquetFile('example.parquet')
print(f"Schema: {parquet_file.schema}")
print(f"Metadata: {parquet_file.metadata}")
print(f"Row groups: {parquet_file.num_row_groups}")

# Read a single row group
row_group_0 = parquet_file.read_row_group(0)

# Iterate over record batches
for batch in parquet_file.iter_batches(batch_size=2):
    print(batch)

# --- Example: reading and writing CSV files ---
import pyarrow as pa
import pyarrow.csv as csv

# Basic CSV reading
table = csv.read_csv('data.csv')

# Advanced CSV reading with explicit options
read_options = csv.ReadOptions(
    skip_rows=1,
    column_names=['id', 'name', 'age', 'salary']
)
parse_options = csv.ParseOptions(
    delimiter=',',
    quote_char='"',
    escape_char='\\'
)
convert_options = csv.ConvertOptions(
    column_types={
        'id': pa.int64(),
        'name': pa.string(),
        'age': pa.int32(),
        'salary': pa.float64()
    },
    null_values=['', 'NULL', 'null'],
    strings_can_be_null=True
)
table = csv.read_csv(
    'data.csv',
    read_options=read_options,
    parse_options=parse_options,
    convert_options=convert_options
)

# Streaming CSV reading for large files
reader = csv.open_csv('large_data.csv')
for batch in reader:
    # Process each record batch as it is read
    print(f"Batch shape: {batch.num_rows} x {batch.num_columns}")

# Basic CSV writing
csv.write_csv(table, 'output.csv')

# Writing with explicit options
write_options = csv.WriteOptions(
    include_header=True,
    delimiter=';',
    quoting_style='needed'
)
csv.write_csv(table, 'output_custom.csv', write_options=write_options)

# --- Example: comparing file formats ---
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv as csv
import pyarrow.feather as feather
import pyarrow.orc as orc

# Create sample data shared by all formats.
table = pa.table({
    'date': pa.array(['2023-01-01', '2023-01-02', '2023-01-03']),
    'value': [100.5, 200.3, 150.7],
    'category': ['A', 'B', 'A']
})

# Write the same table to each format.
pq.write_table(table, 'data.parquet')
csv.write_csv(table, 'data.csv')
feather.write_feather(table, 'data.feather')
orc.write_table(table, 'data.orc')

# Read it back from each format.
parquet_table = pq.read_table('data.parquet')
csv_table = csv.read_csv('data.csv')
feather_table = feather.read_table('data.feather')
orc_table = orc.read_table('data.orc')

# Verify every round trip produced the same table.
assert parquet_table.equals(csv_table)
assert csv_table.equals(feather_table)
assert feather_table.equals(orc_table)

# Simple performance comparison across formats.
import time

def time_format(read_func, write_func, filename):
    """Time one write and one read of `table` through the given functions."""
    start = time.time()
    write_func(table, filename)
    write_time = time.time() - start

    start = time.time()
    result = read_func(filename)
    read_time = time.time() - start
    return write_time, read_time

formats = [
    ('Parquet', pq.read_table, pq.write_table, 'test.parquet'),
    ('Feather', feather.read_table, feather.write_feather, 'test.feather'),
    ('ORC', orc.read_table, orc.write_table, 'test.orc')
]
for name, read_func, write_func, filename in formats:
    write_time, read_time = time_format(read_func, write_func, filename)
    print(f"{name}: Write {write_time:.4f}s, Read {read_time:.4f}s")

# --- Example: schema evolution and metadata handling ---
import pyarrow as pa
import pyarrow.parquet as pq

# Schema evolution example: an old schema and its extended successor.
old_schema = pa.schema([
    pa.field('id', pa.int64()),
    pa.field('name', pa.string()),
    pa.field('value', pa.float64())
])
new_schema = pa.schema([
    pa.field('id', pa.int64()),
    pa.field('name', pa.string()),
    pa.field('value', pa.float64()),
    pa.field('category', pa.string())  # newly added column
])

# Write data using the old schema.
old_table = pa.table([
    [1, 2, 3],
    ['A', 'B', 'C'],
    [10.5, 20.3, 30.1]
], schema=old_schema)
pq.write_table(old_table, 'old_format.parquet')

# Read it back and extend it toward the new schema.
loaded = pq.read_table('old_format.parquet')
extended = loaded.add_column(3, 'category', pa.array([None, None, None]))

# Write the extended table.
pq.write_table(extended, 'new_format.parquet')

# Metadata handling.
# NOTE(review): `table` here refers to the table defined in the preceding example.
metadata = {'version': '1.0', 'created_by': 'pyarrow_example'}
table_with_metadata = table.replace_schema_metadata(metadata)
pq.write_table(table_with_metadata, 'with_metadata.parquet')

# Read the file- and schema-level metadata back.
file_metadata = pq.read_metadata('with_metadata.parquet')
print(f"File metadata: {file_metadata.metadata}")
print(f"Schema metadata: {file_metadata.schema.to_arrow_schema().metadata}")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-pyarrow