Python support for the Parquet file format, with high-performance reading and writing capabilities.
Comprehensive functionality for reading parquet files into pandas DataFrames with high performance and flexible data access patterns.
The main class for reading parquet files, providing access to metadata, schema information, and efficient data reading methods.
class ParquetFile:
def __init__(self, fn, verify=False, open_with=None, root=False,
sep=None, fs=None, pandas_nulls=True, dtypes=None):
"""
Initialize ParquetFile for reading parquet data.
Parameters:
- fn: str, path/URL or list of paths to parquet file(s)
- verify: bool, test file start/end byte markers
- open_with: function, custom file opener with signature func(path, mode)
- root: str, dataset root directory for partitioned data
- fs: fsspec filesystem, alternative to open_with
- pandas_nulls: bool, use pandas nullable types for int/bool with nulls
- dtypes: dict, override column dtypes
"""Read entire parquet file or filtered subset into a pandas DataFrame.
def to_pandas(self, columns=None, categories=None, filters=[],
index=None, row_filter=False, dtypes=None):
"""
Read parquet data into pandas DataFrame.
Parameters:
- columns: list, column names to load (None for all)
- categories: list or dict, columns to treat as categorical
- filters: list, row filtering conditions
- index: str or list, column(s) to use as DataFrame index
- row_filter: bool or array, enable row-wise filtering
- dtypes: dict, override column data types
Returns:
pandas.DataFrame: The loaded data
"""Get a limited number of rows from the beginning of the dataset.
def head(self, nrows, **kwargs):
"""
Get the first nrows of data.
Parameters:
- nrows: int, number of rows to return
- **kwargs: additional arguments passed to to_pandas()
Returns:
pandas.DataFrame: First nrows of data
"""Iterate through the dataset one row group at a time for memory-efficient processing.
def iter_row_groups(self, filters=None, **kwargs):
"""
Iterate dataset by row groups.
Parameters:
- filters: list, optional filters to skip row groups
- **kwargs: additional arguments passed to to_pandas()
Yields:
pandas.DataFrame: One DataFrame per row group
"""Access specific row groups using indexing and slicing operations.
def __getitem__(self, item):
"""
Select row groups using integer/slicing.
Parameters:
- item: int, slice, or list, row group selector
Returns:
ParquetFile: New ParquetFile with selected row groups
"""
def __len__(self):
"""
Return number of row groups.
Returns:
int: Number of row groups in the file
"""Get total number of rows with optional filtering.
def count(self, filters=None, row_filter=False):
"""
Total number of rows in the dataset.
Parameters:
- filters: list, optional row filtering conditions
- row_filter: bool, enable row-wise filtering
Returns:
int: Total number of rows
"""Access file metadata, schema, and structural information.
@property
def columns(self):
"""Column names available in the dataset."""
@property
def dtypes(self):
"""Expected output types for each column."""
@property
def schema(self):
"""SchemaHelper object representing column structure."""
@property
def statistics(self):
"""Per-column statistics (min, max, count, null_count)."""
@property
def key_value_metadata(self):
"""Additional metadata key-value pairs."""
@property
def pandas_metadata(self):
"""Pandas-specific metadata if available."""
@property
def info(self):
"""Dataset summary information."""
@property
def file_scheme(self):
"""File organization scheme ('simple', 'hive', 'mixed', 'empty')."""Direct row group reading functions for advanced use cases and performance optimization.
Direct row group reading functions for advanced use cases and performance optimization.
def read_row_group(file, rg, columns, categories, schema=None,
cats=None, index=None, assign=None,
scheme='hive', pandas_nulls=True, dtypes=None):
"""
Read single row group from parquet file.
Parameters:
- file: file-like object or ParquetFile
- rg: RowGroup, row group metadata object
- columns: list, column names to read
- categories: list or dict, categorical column specifications
- schema: SchemaHelper, parquet schema object
- cats: dict, partition categories
- index: str or list, index column specifications
- assign: dict, values to assign for partitioned columns
- scheme: str, partitioning scheme
- pandas_nulls: bool, use pandas nullable types
- dtypes: dict, column data type overrides
Returns:
pandas.DataFrame: Row group data
"""
def read_row_group_arrays(file, rg, columns, categories, schema=None,
cats=None, assign=None, scheme='hive'):
"""
Read row group into numpy arrays.
Parameters:
- file: file-like object or ParquetFile
- rg: RowGroup, row group metadata object
- columns: list, column names to read
- categories: list or dict, categorical specifications
- schema: SchemaHelper, parquet schema
- cats: dict, partition categories
- assign: dict, partition value assignments
- scheme: str, partitioning scheme
Returns:
dict: Column name to numpy array mapping
"""Functions for reading individual columns and their data pages.
def read_col(column, schema_helper, infile, use_cat=True,
assign=None, row_filter=None):
"""
Read single column from parquet file.
Parameters:
- column: ColumnChunk, column metadata object
- schema_helper: SchemaHelper, schema navigation helper
- infile: file-like object, open parquet file
- use_cat: bool, use categorical optimization
- assign: any, value to assign for partition columns
- row_filter: array, boolean row selection mask
Returns:
numpy.ndarray: Column data
"""
def read_data_page(infile, page, compressed_size, uncompressed_size,
column, schema, use_cat=True, selfmade=True,
assign=None, decoders=None, row_filter=None):
"""
Read and decode single data page.
Parameters:
- infile: file-like object, open parquet file
- page: PageHeader, page metadata object
- compressed_size: int, compressed page size in bytes
- uncompressed_size: int, uncompressed page size in bytes
- column: ColumnChunk, column metadata
- schema: SchemaHelper, schema navigation
- use_cat: bool, use categorical optimization
- selfmade: bool, file created by fastparquet
- assign: any, partition column assignment value
- decoders: dict, custom decoder functions
- row_filter: array, row selection mask
Returns:
tuple: (values, definition_levels, repetition_levels)
"""
def read_data_page_v2(infile, page, compressed_size, uncompressed_size,
column, schema, use_cat=True, selfmade=True,
assign=None, decoders=None, row_filter=None):
"""
Read and decode data page in v2 format.
Parameters:
- infile: file-like object, open parquet file
- page: PageHeader, page metadata object
- compressed_size: int, compressed page size
- uncompressed_size: int, uncompressed page size
- column: ColumnChunk, column metadata
- schema: SchemaHelper, schema navigation
- use_cat: bool, categorical optimization
- selfmade: bool, fastparquet-created file
- assign: any, partition value assignment
- decoders: dict, custom decoders
- row_filter: array, row selection mask
Returns:
tuple: (values, definition_levels, repetition_levels)
"""
def read_dictionary_page(infile, schema_helper):
"""
Read dictionary page for categorical columns.
Parameters:
- infile: file-like object, open parquet file
- schema_helper: SchemaHelper, schema navigation helper
Returns:
numpy.ndarray: Dictionary values
"""Utility functions for working with parquet file filters and statistics.
def filter_row_groups(pf, filters, as_idx=False):
"""
Select row groups using filters.
Parameters:
- pf: ParquetFile, the parquet file object
- filters: list, filtering conditions
- as_idx: bool, return indices instead of row groups
Returns:
list: Filtered row groups or their indices
"""
def statistics(obj):
"""
Return per-column statistics for a ParquetFile.
Parameters:
- obj: ParquetFile, ColumnChunk, or RowGroup
Returns:
dict: Mapping of statistic name (min, max, distinct_count, null_count) to per-column values
"""
def sorted_partitioned_columns(pf, filters=None):
"""
Find columns that are sorted partition-by-partition.
Parameters:
- pf: ParquetFile, the parquet file object
- filters: list, optional filtering conditions
Returns:
dict: Column names to min/max value ranges
"""from fastparquet import ParquetFile
from fastparquet import ParquetFile
# Open parquet file
pf = ParquetFile('data.parquet')
# Read all data
df = pf.to_pandas()
# Read specific columns
df_subset = pf.to_pandas(columns=['col1', 'col2'])
# Check file info
print(pf.info)
print(f"Columns: {pf.columns}")
print(f"Row count: {pf.count()}")# Single condition filter
# Single condition filter
df_filtered = pf.to_pandas(filters=[('age', '>', 25)])
# Multiple conditions (AND)
df_filtered = pf.to_pandas(filters=[('age', '>', 25), ('score', '>=', 80)])
# Multiple condition groups (OR)
df_filtered = pf.to_pandas(filters=[
[('category', '==', 'A'), ('value', '>', 100)], # Group 1
[('category', '==', 'B'), ('value', '>', 200)] # Group 2
])
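Filters above prune whole row groups based on their statistics; passing row_filter=True additionally drops non-matching rows inside the row groups that are read (a sketch, reusing the 'age' column):
# Exact row-level filtering instead of row-group-level pruning only
df_exact = pf.to_pandas(filters=[('age', '>', 25)], row_filter=True)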
# Process large files in chunks
total_rows = 0
for chunk in pf.iter_row_groups():
# Process each row group
processed = chunk.groupby('category').sum()
total_rows += len(chunk)
print(f"Processed {total_rows} rows")
# Get sample of large file
sample = pf.head(1000)
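iter_row_groups accepts the same filters and column selection as to_pandas, so non-matching row groups are skipped before they are read; 'year' and 'value' are hypothetical columns:
for chunk in pf.iter_row_groups(filters=[('year', '==', 2023)], columns=['value']):
    print(chunk['value'].sum())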
# Read partitioned dataset
pf = ParquetFile('/path/to/partitioned/dataset/')
# Access partition information
print(f"Partitions: {list(pf.cats.keys())}")
print(f"File scheme: {pf.file_scheme}")
# Filter by partition values
df = pf.to_pandas(filters=[('year', '==', 2023), ('month', 'in', [1, 2, 3])])
# Filter specification
from typing import Any, Callable, List, Literal, Tuple, Union
FilterCondition = Tuple[str, str, Any]  # (column, operator, value)
FilterGroup = List[FilterCondition] # AND conditions
Filter = List[Union[FilterCondition, FilterGroup]] # OR groups
# Supported filter operators
FilterOp = Literal['==', '=', '!=', '<', '<=', '>', '>=', 'in', 'not in']
# File opening function signature
OpenFunction = Callable[[str, str], Any] # (path, mode) -> file-like object
# Filesystem interface
FileSystem = Any  # fsspec.AbstractFileSystem compatible
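A sketch of the fs and open_with options for remote data; the S3 bucket path is hypothetical and assumes fsspec with an s3 backend (e.g. s3fs) is installed:
import fsspec
from fastparquet import ParquetFile

fs = fsspec.filesystem('s3', anon=True)
pf = ParquetFile('my-bucket/path/data.parquet', fs=fs)

# Equivalent custom opener matching the documented (path, mode) signature
pf = ParquetFile('my-bucket/path/data.parquet',
                 open_with=lambda path, mode: fs.open(path, mode))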
Install with Tessl CLI
npx tessl i tessl/pypi-fastparquet