Python support for the Parquet file format, with high-performance reading and writing capabilities
npx @tessl/cli install tessl/pypi-fastparquet@2024.11.0

A high-performance Python implementation of the Apache Parquet columnar storage format, designed for seamless integration with pandas and the Python data science ecosystem. It provides fast reading and writing of parquet files with excellent compression and query performance.
pip install fastparquet or conda install -c conda-forge fastparquet

import fastparquet

For reading parquet files:

from fastparquet import ParquetFile

For writing parquet files:

from fastparquet import write, update_file_custom_metadata

import pandas as pd
from fastparquet import ParquetFile, write
# Writing parquet files
df = pd.DataFrame({
    'id': range(1000),
    'value': range(1000, 2000),
    'category': ['A', 'B', 'C'] * 333 + ['A']
})
# Write to parquet file
write('data.parquet', df)
# Write with compression
write('data_compressed.parquet', df, compression='GZIP')
# Reading parquet files
pf = ParquetFile('data.parquet')
# Read entire file to pandas DataFrame
df_read = pf.to_pandas()
# Read specific columns
df_subset = pf.to_pandas(columns=['id', 'value'])
# Read with filters (by default these prune whole row groups using column
# statistics; pass row_filter=True for exact row-level filtering)
df_filtered = pf.to_pandas(
    filters=[('category', '==', 'A'), ('value', '>', 1500)]
)

fastparquet is built around several core components:
The library emphasizes performance and compatibility with the broader Python data ecosystem, particularly pandas, while providing comprehensive support for parquet format features.
Core functionality for reading parquet files into pandas DataFrames, with support for selective column reading, filtering, and efficient memory usage through row group iteration.
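A minimal sketch of how these capabilities combine in practice (the file name comes from the quickstart example above); the full ParquetFile API is listed below.

from fastparquet import ParquetFile

pf = ParquetFile('data.parquet')  # the file written in the quickstart above

# Quick look at the data without reading the whole file
preview = pf.head(5)

# Read only the columns you need
ids = pf.to_pandas(columns=['id'])

# Iterate row group by row group to keep memory bounded
total = 0
for chunk in pf.iter_row_groups(columns=['value']):
    total += chunk['value'].sum()

print(len(pf), 'row groups; value total =', total)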
class ParquetFile:
    def __init__(self, fn, verify=False, open_with=None, root=False,
                 sep=None, fs=None, pandas_nulls=True, dtypes=None): ...
    def to_pandas(self, columns=None, categories=None, filters=[],
                  index=None, row_filter=False, dtypes=None): ...
    def head(self, nrows, **kwargs): ...
    def iter_row_groups(self, filters=None, **kwargs): ...
    def count(self, filters=None, row_filter=False): ...
    def __getitem__(self, item): ...  # Row group selection
    def __len__(self): ...  # Number of row groups
    def read_row_group_file(self, rg, columns, categories, index=None,
                            assign=None, partition_meta=None, row_filter=False): ...
    def write_row_groups(self, data, row_group_offsets=None, **kwargs): ...
    def remove_row_groups(self, rgs, **kwargs): ...
    def check_categories(self, cats): ...
    def pre_allocate(self, size, columns, categories, index, dtypes=None): ...

Comprehensive functionality for writing pandas DataFrames to parquet format with options for compression, partitioning, encoding, and metadata management.
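A short usage sketch (file and directory names are illustrative); the full write() signature follows.

import pandas as pd
from fastparquet import write

df = pd.DataFrame({'id': range(100), 'value': range(100, 200)})

# Single-file write with roughly 50 rows per row group
write('out.parquet', df, row_group_offsets=50)

# Per-column compression; columns not listed fall back to the '_default' entry
write('out_mixed.parquet', df,
      compression={'value': 'ZSTD', '_default': 'SNAPPY'})

# Multi-file (hive) dataset that can later be appended to
write('dataset_dir', df, file_scheme='hive')
write('dataset_dir', df, file_scheme='hive', append=True)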
def write(filename, data, row_group_offsets=None, compression=None,
          file_scheme='hive', has_nulls=None, write_index=None,
          partition_on=[], append=False, object_encoding=None,
          fixed_text=None, times='int64', custom_metadata=None,
          stats="auto", open_with=None, mkdirs=None): ...

def update_file_custom_metadata(fn, custom_metadata, open_with=None): ...

Tools for working with parquet schemas, type conversions, and metadata management to ensure proper data representation and compatibility.
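As a sketch of schema inspection and custom metadata handling (the file name and metadata keys are illustrative; pf.columns, pf.dtypes and pf.key_value_metadata are ParquetFile attributes rather than part of the helpers listed below):

import pandas as pd
from fastparquet import ParquetFile, write, update_file_custom_metadata

df = pd.DataFrame({'id': range(10), 'label': list('abcdefghij')})

# Attach custom key/value metadata at write time
write('meta.parquet', df, custom_metadata={'source': 'example-pipeline'})

pf = ParquetFile('meta.parquet')
print(pf.columns)              # ['id', 'label']
print(pf.dtypes)               # mapping of column name -> dtype
print(pf.key_value_metadata)   # includes {'source': 'example-pipeline'}

# Rewrite only the footer metadata of an existing file, leaving the data untouched
update_file_custom_metadata('meta.parquet', {'source': 'revised-pipeline'})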
def find_type(data, fixed_text=None, object_encoding=None,
              times='int64', is_index=None): ...

def convert(data, se): ...

class SchemaHelper:
    def __init__(self, schema_elements): ...
    def schema_element(self, name): ...
    def is_required(self, name): ...
    def max_repetition_level(self, parts): ...
    def max_definition_level(self, parts): ...

Advanced features for working with partitioned datasets, including reading from and writing to multi-file parquet collections with directory-based partitioning.
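A minimal sketch of directory-partitioned writing and reading (the directory name and data are illustrative); merge() and metadata_from_many(), listed next, combine separately written files into one logical dataset.

import pandas as pd
from fastparquet import ParquetFile, write

df = pd.DataFrame({
    'id': range(6),
    'value': [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
    'region': ['east', 'west', 'east', 'west', 'east', 'west'],
})

# Write a hive-style dataset partitioned by 'region';
# this produces region=east/ and region=west/ subdirectories
write('sales_data', df, file_scheme='hive', partition_on=['region'])

# The directory opens like a single logical file
pf = ParquetFile('sales_data')

# Partition columns come back as categoricals and can be used in filters,
# which prune whole partitions before any data is read
east = pf.to_pandas(filters=[('region', '==', 'east')])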
def merge(file_list, verify_schema=True, open_with=None, root=False): ...
def metadata_from_many(file_list, verify_schema=False, open_with=None,
                       root=False, fs=None): ...

class ParquetException(Exception):
    """Generic Exception related to unexpected data format when reading parquet file."""
# File scheme options
FileScheme = Literal['simple', 'hive', 'drill', 'flat', 'empty', 'mixed']
# Compression algorithm options
CompressionType = Union[str, Dict[str, Union[str, Dict[str, Any]]]]
# Supported: 'UNCOMPRESSED', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4', 'ZSTD'
# Filter specification
FilterCondition = Tuple[str, str, Any] # (column, operator, value)
FilterGroup = List[FilterCondition] # AND conditions
Filter = List[Union[FilterCondition, FilterGroup]] # OR groups
# Supported filter operators
FilterOperator = Literal['==', '=', '!=', '<', '<=', '>', '>=', 'in', 'not in']
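# Illustrative values matching the filter shapes above (column names made up):
# a flat list of tuples is a single AND group; a list of lists is OR-of-ANDs
and_filter = [('category', '==', 'A'), ('value', '>', 1500)]
or_filter = [
    [('category', '==', 'A'), ('value', '>', 1500)],  # AND group 1
    [('category', 'in', ['B', 'C'])],                 # OR group 2
]
# Either form can be passed as filters= to ParquetFile.to_pandas(),
# iter_row_groups() and count()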
# Object encoding options
ObjectEncoding = Literal['infer', 'utf8', 'json', 'bytes', 'bson']
# Timestamp encoding options
TimeEncoding = Literal['int64', 'int96']
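# Illustrative use of the two options above when writing (the file and column
# names are made up for this sketch):
import pandas as pd
from fastparquet import write

events = pd.DataFrame({
    'payload': [{'a': 1}, {'a': 2}],  # Python objects serialized as JSON text
    'when': pd.to_datetime(['2024-01-01', '2024-01-02']),
})
write('events.parquet', events,
      object_encoding={'payload': 'json'},  # per-column mapping, or a single string
      times='int96')                        # int96 timestamps for older Spark/Hive readers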
# SchemaElement type
class SchemaElement:
    name: str
    type: Optional[str]
    type_length: Optional[int]
    repetition_type: Optional[str]  # 'REQUIRED', 'OPTIONAL', 'REPEATED'
    num_children: int
    converted_type: Optional[str]
    scale: Optional[int]
    precision: Optional[int]
    field_id: Optional[int]
    logical_type: Optional[dict]

# Thrift metadata types
class FileMetaData:
    version: int
    schema: List[SchemaElement]
    num_rows: int
    row_groups: List['RowGroup']
    key_value_metadata: Optional[List['KeyValue']]
    created_by: Optional[str]
    column_orders: Optional[List['ColumnOrder']]

class RowGroup:
    columns: List['ColumnChunk']
    total_byte_size: int
    num_rows: int
    sorting_columns: Optional[List['SortingColumn']]
    file_offset: Optional[int]
    total_compressed_size: Optional[int]
    ordinal: Optional[int]

class ColumnChunk:
    file_path: Optional[str]
    file_offset: int
    meta_data: Optional['ColumnMetaData']
    offset_index_offset: Optional[int]
    offset_index_length: Optional[int]
    column_index_offset: Optional[int]
    column_index_length: Optional[int]
# Function type signatures
OpenFunction = Callable[[str, str], Any] # (path, mode) -> file-like
MkdirsFunction = Callable[[str], None] # (path) -> None
RemoveFunction = Callable[[str], None] # (path) -> None
FileSystem = Any # fsspec.AbstractFileSystem compatible interface
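For instance (a sketch only, using fsspec's in-memory filesystem as a stand-in for a real remote store), these callables map onto the open_with, mkdirs and fs arguments like this:

import fsspec
import pandas as pd
from fastparquet import ParquetFile, write

fs = fsspec.filesystem('memory')          # any fsspec-compatible filesystem
df = pd.DataFrame({'x': [1, 2, 3]})

# open_with(path, mode) -> file-like; mkdirs(path) -> None
write('memory://bucket/data.parquet', df,
      open_with=fs.open,
      mkdirs=lambda path: fs.mkdirs(path, exist_ok=True))

pf = ParquetFile('memory://bucket/data.parquet', fs=fs)
print(pf.to_pandas())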