tessl/pypi-fastparquet

Python support for the Parquet file format, with high-performance reading and writing.


Dataset Management

Advanced features for working with partitioned datasets, including reading from and writing to multi-file parquet collections with directory-based partitioning and dataset-level operations.

Capabilities

Multi-File Dataset Operations

Dataset Merging

Combine multiple parquet files into a logical dataset.

def merge(file_list, verify_schema=True, open_with=None, root=False):
    """
    Create logical dataset from multiple parquet files.
    
    The files must either be in the same directory or at the same level
    within a structured directory where directories provide partitioning
    information. Schemas should be consistent across files.
    
    Parameters:
    - file_list: list, paths to parquet files or ParquetFile instances
    - verify_schema: bool, verify schema consistency across all files
    - open_with: function, file opening function with (path, mode) signature
    - root: str or False, dataset root directory for partitioning disambiguation
    
    Returns:
    ParquetFile: Merged dataset representation with combined metadata
    """

Metadata Consolidation

Create unified metadata from multiple parquet files.

def metadata_from_many(file_list, verify_schema=False, open_with=None,
                       root=False, fs=None):
    """
    Create FileMetaData that points to multiple parquet files.
    
    Parameters:
    - file_list: list, paths to parquet files to consolidate
    - verify_schema: bool, assert that schemas in each file are identical
    - open_with: function, file opening function
    - root: str or False, top directory of dataset tree
    - fs: fsspec.AbstractFileSystem, filesystem interface
    
    Returns:
    tuple: (basepath, FileMetaData) with consolidated metadata
    """

Partitioned Dataset Management

Path Analysis

Analyze file paths to determine partitioning schemes and extract partition information.

def analyse_paths(file_list, root=False):
    """
    Consolidate list of file paths into parquet relative paths.
    
    Parameters:
    - file_list: list, file paths to analyze
    - root: str or False, base directory path
    
    Returns:
    tuple: (basepath, relative_paths) with common base and relative paths
    """

def get_file_scheme(paths):
    """
    Determine partitioning scheme from file paths.
    
    Parameters:
    - paths: list, file paths to analyze (from row_group.columns[0].file_path)
    
    Returns:
    str: Partitioning scheme ('empty', 'simple', 'flat', 'hive', 'drill', 'other')
    """

Partition Functions

Utility functions for working with partitioned datasets.

def partitions(row_group, only_values=False):
    """
    Extract partition values from row group file path.
    
    Parameters:
    - row_group: RowGroup object or str, row group or file path
    - only_values: bool, return only values (True) or full path (False)
    
    Returns:
    str: Partition values separated by '/' or full partition path
    """

def part_ids(row_groups):
    """
    Extract part file IDs from row group file paths.
    
    Finds integers matching "**part.*.parquet" pattern in paths.
    
    Parameters:
    - row_groups: list, row group objects to analyze
    
    Returns:
    dict: Mapping from part ID to (row_group_id, part_name) tuple
    """

def groupby_types(cats):
    """
    Group partitioning categories by their data types.
    
    Parameters:
    - cats: dict, partition categories mapping column names to values
    
    Returns:
    dict: Categories grouped by inferred type
    """

def check_column_names(columns, path):
    """
    Validate column names for parquet compatibility.
    
    Parameters:
    - columns: list, column names to validate
    - path: str, file path for error reporting
    
    Returns:
    list: Validated column names
    
    Raises:
    ValueError: If column names are invalid
    """

def ex_from_sep(sep):
    """
    Create regular expression from path separator.
    
    Parameters:
    - sep: str, path separator character
    
    Returns:
    re.Pattern: Regular expression for path matching
    """

Dataset Row Group Management

Row Group Addition

Add new row groups to existing datasets.

# ParquetFile method
def write_row_groups(self, data, row_group_offsets=None, sort_key=None,
                     sort_pnames=False, compression=None, write_fmd=True,
                     open_with=None, mkdirs=None, stats="auto"):
    """
    Write data as new row groups to existing dataset.
    
    Parameters:
    - data: pandas.DataFrame or iterable, data to add to dataset
    - row_group_offsets: int or list, row group size specification
    - sort_key: function, sorting function for row group ordering
    - sort_pnames: bool, align part file names with row group positions
    - compression: str or dict, compression settings
    - write_fmd: bool, write updated common metadata to disk
    - open_with: function, file opening function
    - mkdirs: function, directory creation function
    - stats: bool, list, or "auto", statistics calculation control
    """

Row Group Removal

Remove row groups from datasets and update metadata.

# ParquetFile method
def remove_row_groups(self, rgs, sort_pnames=False, write_fmd=True,
                      open_with=None, remove_with=None):
    """
    Remove row groups from disk and update metadata.
    
    Cannot be applied to simple file scheme datasets. Removes files
    and updates common metadata accordingly.
    
    Parameters:
    - rgs: RowGroup or list, row group(s) to remove
    - sort_pnames: bool, align part file names after removal
    - write_fmd: bool, write updated common metadata
    - open_with: function, file opening function
    - remove_with: function, file removal function
    """

Dataset Overwriting and Updates

Partition Overwriting

Replace existing partitions with new data while preserving other partitions.

def overwrite(dirpath, data, row_group_offsets=None, sort_pnames=True,
              compression=None, open_with=None, mkdirs=None,
              remove_with=None, stats=True):
    """
    Overwrite partitions in existing hive-formatted parquet dataset.
    
    Row groups with partition values overlapping with new data are
    removed before new data is added. Only supports overwrite_partitioned
    mode currently.
    
    Parameters:
    - dirpath: str, directory path to parquet dataset
    - data: pandas.DataFrame, new data to write
    - row_group_offsets: int or list, row group size specification
    - sort_pnames: bool, align part file names with positions
    - compression: str or dict, compression settings
    - open_with: function, file opening function
    - mkdirs: function, directory creation function
    - remove_with: function, file removal function
    - stats: bool or list, statistics calculation control
    """

Common Metadata Management

Metadata File Writing

Write and manage common metadata files for multi-file datasets.

def write_common_metadata(fn, fmd, open_with=None, no_row_groups=True):
    """
    Write parquet schema information to shared metadata file.
    
    For hive-style parquet datasets, creates _metadata and _common_metadata
    files containing schema and file organization information.
    
    Parameters:
    - fn: str, metadata file path to write
    - fmd: FileMetaData, metadata information to write
    - open_with: function, file opening function
    - no_row_groups: bool, exclude row group info (for _common_metadata)
    """

def consolidate_categories(fmd):
    """
    Consolidate categorical metadata across row groups.
    
    Updates pandas metadata to reflect the maximum number of categories
    found across all row groups for each categorical column.
    
    Parameters:
    - fmd: FileMetaData, metadata to consolidate
    """

Metadata Updates

Update metadata in existing files without rewriting data.

def update_custom_metadata(obj, custom_metadata):
    """
    Update custom metadata in thrift object or parquet file.
    
    Supports adding, updating, and removing (with None values) metadata
    entries without affecting other metadata or data contents.
    
    Parameters:
    - obj: ThriftObject or ParquetFile, target for metadata update
    - custom_metadata: dict, metadata updates to apply
    """

File System Integration

File System Utilities

Functions for working with different file systems and storage backends.

def get_fs(fn, open_with, mkdirs):
    """
    Get filesystem object and normalize parameters from file path.
    
    Detects and configures appropriate filesystem interface for given
    path, supporting local files, cloud storage, and custom backends.
    
    Parameters:
    - fn: str, file path or URL
    - open_with: function, file opening function
    - mkdirs: function, directory creation function
    
    Returns:
    tuple: (filesystem, normalized_path, open_function, mkdir_function)
    """

def join_path(*path):
    """
    Join path components with forward slashes.
    
    Parameters:
    - *path: str, path components to join
    
    Returns:
    str: Joined path with forward slash separators
    """

Usage Examples

Working with Multi-File Datasets

from fastparquet import merge, ParquetFile

# Merge multiple parquet files into logical dataset
file_list = [
    'part_000.parquet',
    'part_001.parquet', 
    'part_002.parquet'
]

# Create merged dataset
merged_pf = merge(file_list, verify_schema=True)

# Read from merged dataset
df = merged_pf.to_pandas()
print(f"Total rows: {merged_pf.count()}")
print(f"File scheme: {merged_pf.file_scheme}")

Managing Partitioned Datasets

# Work with hive-style partitioned dataset
import pandas as pd
from fastparquet import ParquetFile

partitioned_pf = ParquetFile('/path/to/partitioned/dataset/')

# Check partitioning information
print(f"Partitions: {list(partitioned_pf.cats.keys())}")
print(f"Partition values: {partitioned_pf.cats}")

# Add new data to partitioned dataset
new_data = pd.DataFrame({
    'id': range(100, 200),
    'value': range(200, 300),
    'year': [2024] * 100,
    'month': [1] * 50 + [2] * 50
})

partitioned_pf.write_row_groups(
    new_data,
    compression='SNAPPY',
    stats=True
)

Row Group Management

# Remove specific row groups
from fastparquet import ParquetFile

pf = ParquetFile('dataset/')

# Get row groups to remove (e.g., old data)
old_rgs = [rg for rg in pf.row_groups if some_condition(rg)]

# Remove old row groups
pf.remove_row_groups(
    old_rgs,
    sort_pnames=True,  # Realign part file names
    write_fmd=True     # Update metadata
)

# Add new row groups
pf.write_row_groups(new_data, compression='GZIP')

Dataset Overwriting

import pandas as pd
from fastparquet import overwrite

# Overwrite specific partitions with new data
new_partition_data = pd.DataFrame({
    'id': range(1000),
    'value': range(1000),
    'year': [2024] * 1000,
    'month': [3] * 1000  # This will overwrite month=3 partition
})

overwrite(
    'partitioned_dataset/',
    new_partition_data,
    compression='SNAPPY',
    sort_pnames=True
)

Custom Metadata Management

from fastparquet import ParquetFile
from fastparquet.util import update_custom_metadata
from fastparquet.writer import update_file_custom_metadata

# Update metadata in ParquetFile object
pf = ParquetFile('data.parquet')
update_custom_metadata(pf, {
    'processing_version': '2.0',
    'last_updated': '2024-01-15',
    'deprecated_field': None  # Remove this field
})

# Update metadata directly in file
update_file_custom_metadata('data.parquet', {
    'created_by': 'updated_application',
    'schema_version': '1.2'
})

Working with File Schemes

from fastparquet.util import get_file_scheme, analyse_paths

# Analyze file organization
file_paths = [
    'year=2023/month=1/part_000.parquet',
    'year=2023/month=2/part_001.parquet',
    'year=2024/month=1/part_002.parquet'
]

scheme = get_file_scheme(file_paths)
print(f"Detected scheme: {scheme}")  # Output: 'hive'

# Analyze path structure
basepath, relative_paths = analyse_paths(file_paths)
print(f"Base path: {basepath}")
print(f"Relative paths: {relative_paths}")

Type Definitions

from typing import Any, Callable, Dict, List, Literal, Tuple, Union

# File scheme types
FileScheme = Literal['empty', 'simple', 'flat', 'hive', 'drill', 'other']

# Row group specification
RowGroupSpec = Union[int, List[int]]

# File system interface
FileSystemInterface = Any  # fsspec.AbstractFileSystem compatible

# File opening function
OpenFunction = Callable[[str, str], Any]  # (path, mode) -> file-like

# Directory creation function  
MkdirsFunction = Callable[[str], None]  # (path) -> None

# File removal function
RemoveFunction = Callable[[List[str]], None]  # (paths) -> None

# Sort key function for row groups
SortKeyFunction = Callable[[Any], Union[int, str]]  # (row_group) -> sort_key

# Partition information
PartitionInfo = Dict[str, List[Any]]  # column_name -> list of values

# Path analysis result
PathAnalysis = Tuple[str, List[str]]  # (basepath, relative_paths)

Install with Tessl CLI

npx tessl i tessl/pypi-fastparquet
