Python support for Parquet file format with high performance reading and writing capabilities
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Advanced features for working with partitioned datasets, including reading from and writing to multi-file parquet collections with directory-based partitioning and dataset-level operations.
Combine multiple parquet files into a logical dataset.
def merge(file_list, verify_schema=True, open_with=None, root=False):
"""
Create logical dataset from multiple parquet files.
The files must either be in the same directory or at the same level
within a structured directory where directories provide partitioning
information. Schemas should be consistent across files.
Parameters:
- file_list: list, paths to parquet files or ParquetFile instances
- verify_schema: bool, verify schema consistency across all files
- open_with: function, file opening function with (path, mode) signature
- root: str or False, dataset root directory for partitioning disambiguation
Returns:
ParquetFile: Merged dataset representation with combined metadata
"""Create unified metadata from multiple parquet files.
def metadata_from_many(file_list, verify_schema=False, open_with=None,
root=False, fs=None):
"""
Create FileMetaData that points to multiple parquet files.
Parameters:
- file_list: list, paths to parquet files to consolidate
- verify_schema: bool, assert that schemas in each file are identical
- open_with: function, file opening function
- root: str or False, top directory of dataset tree
- fs: fsspec.AbstractFileSystem, filesystem interface
Returns:
tuple: (basepath, FileMetaData) with consolidated metadata
"""Analyze file paths to determine partitioning schemes and extract partition information.
def analyse_paths(file_list, root=False):
"""
Consolidate list of file paths into parquet relative paths.
Parameters:
- file_list: list, file paths to analyze
- root: str or False, base directory path
Returns:
tuple: (basepath, relative_paths) with common base and relative paths
"""
def get_file_scheme(paths):
"""
Determine partitioning scheme from file paths.
Parameters:
- paths: list, file paths to analyze (from row_group.columns[0].file_path)
Returns:
str: Partitioning scheme ('empty', 'simple', 'flat', 'hive', 'drill', 'other')
"""Utility functions for working with partitioned datasets.
def partitions(row_group, only_values=False):
"""
Extract partition values from row group file path.
Parameters:
- row_group: RowGroup object or str, row group or file path
- only_values: bool, return only values (True) or full path (False)
Returns:
str: Partition values separated by '/' or full partition path
"""
def part_ids(row_groups):
"""
Extract part file IDs from row group file paths.
Finds integers matching "**part.*.parquet" pattern in paths.
Parameters:
- row_groups: list, row group objects to analyze
Returns:
dict: Mapping from part ID to (row_group_id, part_name) tuple
"""
def groupby_types(cats):
"""
Group partitioning categories by their data types.
Parameters:
- cats: dict, partition categories mapping column names to values
Returns:
dict: Categories grouped by inferred type
"""
def check_column_names(columns, path):
"""
Validate column names for parquet compatibility.
Parameters:
- columns: list, column names to validate
- path: str, file path for error reporting
Returns:
list: Validated column names
Raises:
ValueError: If column names are invalid
"""
def ex_from_sep(sep):
"""
Create regular expression from path separator.
Parameters:
- sep: str, path separator character
Returns:
re.Pattern: Regular expression for path matching
"""Add new row groups to existing datasets.
# ParquetFile method
def write_row_groups(self, data, row_group_offsets=None, sort_key=None,
sort_pnames=False, compression=None, write_fmd=True,
open_with=None, mkdirs=None, stats="auto"):
"""
Write data as new row groups to existing dataset.
Parameters:
- data: pandas.DataFrame or iterable, data to add to dataset
- row_group_offsets: int or list, row group size specification
- sort_key: function, sorting function for row group ordering
- sort_pnames: bool, align part file names with row group positions
- compression: str or dict, compression settings
- write_fmd: bool, write updated common metadata to disk
- open_with: function, file opening function
- mkdirs: function, directory creation function
- stats: bool or list, statistics calculation control
"""Remove row groups from datasets and update metadata.
# ParquetFile method
def remove_row_groups(self, rgs, sort_pnames=False, write_fmd=True,
open_with=None, remove_with=None):
"""
Remove row groups from disk and update metadata.
Cannot be applied to simple file scheme datasets. Removes files
and updates common metadata accordingly.
Parameters:
- rgs: RowGroup or list, row group(s) to remove
- sort_pnames: bool, align part file names after removal
- write_fmd: bool, write updated common metadata
- open_with: function, file opening function
- remove_with: function, file removal function
"""Replace existing partitions with new data while preserving other partitions.
def overwrite(dirpath, data, row_group_offsets=None, sort_pnames=True,
compression=None, open_with=None, mkdirs=None,
remove_with=None, stats=True):
"""
Overwrite partitions in existing hive-formatted parquet dataset.
Row groups with partition values overlapping with new data are
removed before new data is added. Only supports overwrite_partitioned
mode currently.
Parameters:
- dirpath: str, directory path to parquet dataset
- data: pandas.DataFrame, new data to write
- row_group_offsets: int or list, row group size specification
- sort_pnames: bool, align part file names with positions
- compression: str or dict, compression settings
- open_with: function, file opening function
- mkdirs: function, directory creation function
- remove_with: function, file removal function
- stats: bool or list, statistics calculation control
"""Write and manage common metadata files for multi-file datasets.
def write_common_metadata(fn, fmd, open_with=None, no_row_groups=True):
"""
Write parquet schema information to shared metadata file.
For hive-style parquet datasets, creates _metadata and _common_metadata
files containing schema and file organization information.
Parameters:
- fn: str, metadata file path to write
- fmd: FileMetaData, metadata information to write
- open_with: function, file opening function
- no_row_groups: bool, exclude row group info (for _common_metadata)
"""
def consolidate_categories(fmd):
"""
Consolidate categorical metadata across row groups.
Updates pandas metadata to reflect the maximum number of categories
found across all row groups for each categorical column.
Parameters:
- fmd: FileMetaData, metadata to consolidate
"""Update metadata in existing files without rewriting data.
def update_custom_metadata(obj, custom_metadata):
"""
Update custom metadata in thrift object or parquet file.
Supports adding, updating, and removing (with None values) metadata
entries without affecting other metadata or data contents.
Parameters:
- obj: ThriftObject or ParquetFile, target for metadata update
- custom_metadata: dict, metadata updates to apply
"""Functions for working with different file systems and storage backends.
def get_fs(fn, open_with, mkdirs):
"""
Get filesystem object and normalize parameters from file path.
Detects and configures appropriate filesystem interface for given
path, supporting local files, cloud storage, and custom backends.
Parameters:
- fn: str, file path or URL
- open_with: function, file opening function
- mkdirs: function, directory creation function
Returns:
tuple: (filesystem, normalized_path, open_function, mkdir_function)
"""
def join_path(*path):
"""
Join path components with forward slashes.
Parameters:
- *path: str, path components to join
Returns:
str: Joined path with forward slash separators
"""from fastparquet import merge, ParquetFile
# Merge multiple parquet files into logical dataset
file_list = [
'part_000.parquet',
'part_001.parquet',
'part_002.parquet'
]
# Create merged dataset
merged_pf = merge(file_list, verify_schema=True)
# Read from merged dataset
df = merged_pf.to_pandas()
print(f"Total rows: {merged_pf.count()}")
print(f"File scheme: {merged_pf.file_scheme}")# Work with hive-style partitioned dataset
partitioned_pf = ParquetFile('/path/to/partitioned/dataset/')
# Check partitioning information
print(f"Partitions: {list(partitioned_pf.cats.keys())}")
print(f"Partition values: {partitioned_pf.cats}")
# Add new data to partitioned dataset
new_data = pd.DataFrame({
'id': range(100, 200),
'value': range(200, 300),
'year': [2024] * 100,
'month': [1] * 50 + [2] * 50
})
partitioned_pf.write_row_groups(
new_data,
compression='SNAPPY',
stats=True
)

# Remove specific row groups
pf = ParquetFile('dataset/')
# Get row groups to remove (e.g., old data)
old_rgs = [rg for rg in pf.row_groups if some_condition(rg)]
# Remove old row groups
pf.remove_row_groups(
old_rgs,
sort_pnames=True, # Realign part file names
write_fmd=True # Update metadata
)
# Add new row groups
pf.write_row_groups(new_data, compression='GZIP')

from fastparquet import overwrite
# Overwrite specific partitions with new data
new_partition_data = pd.DataFrame({
'id': range(1000),
'value': range(1000),
'year': [2024] * 1000,
'month': [3] * 1000 # This will overwrite month=3 partition
})
overwrite(
'partitioned_dataset/',
new_partition_data,
compression='SNAPPY',
sort_pnames=True
)

from fastparquet.util import update_custom_metadata
from fastparquet.writer import update_file_custom_metadata
# Update metadata in ParquetFile object
pf = ParquetFile('data.parquet')
update_custom_metadata(pf, {
'processing_version': '2.0',
'last_updated': '2024-01-15',
'deprecated_field': None # Remove this field
})
# Update metadata directly in file
update_file_custom_metadata('data.parquet', {
'created_by': 'updated_application',
'schema_version': '1.2'
})

from fastparquet.util import get_file_scheme, analyse_paths
# Analyze file organization
file_paths = [
'year=2023/month=1/part_000.parquet',
'year=2023/month=2/part_001.parquet',
'year=2024/month=1/part_002.parquet'
]
scheme = get_file_scheme(file_paths)
print(f"Detected scheme: {scheme}") # Output: 'hive'
# Analyze path structure
basepath, relative_paths = analyse_paths(file_paths)
print(f"Base path: {basepath}")
print(f"Relative paths: {relative_paths}")# File scheme types
FileScheme = Literal['empty', 'simple', 'flat', 'hive', 'drill', 'other']
# Row group specification
RowGroupSpec = Union[int, List[int]]
# File system interface
FileSystemInterface = Any # fsspec.AbstractFileSystem compatible
# File opening function
OpenFunction = Callable[[str, str], Any] # (path, mode) -> file-like
# Directory creation function
MkdirsFunction = Callable[[str], None] # (path) -> None
# File removal function
RemoveFunction = Callable[[List[str]], None] # (paths) -> None
# Sort key function for row groups
SortKeyFunction = Callable[[Any], Union[int, str]] # (row_group) -> sort_key
# Partition information
PartitionInfo = Dict[str, List[Any]] # column_name -> list of values
# Path analysis result
PathAnalysis = Tuple[str, List[str]]  # (basepath, relative_paths)

Install with Tessl CLI
npx tessl i tessl/pypi-fastparquet