CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-petl

A Python package for extracting, transforming and loading tables of data.

Pending
Overview
Eval results
Files

aggregation.mddocs/

Aggregation and Grouping

Functions for grouping data and performing aggregation operations. PETL provides comprehensive aggregation capabilities including custom aggregation functions, reduction operations on grouped data, and statistical computations.

Capabilities

Core Aggregation

Primary functions for grouping and aggregating data with flexible aggregation specifications.

def aggregate(table, key, aggregation=None, value=None, presorted=False, 
              buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Aggregate data grouped by key.
    
    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - aggregation: Aggregation function(s) (sum, min, max, mean, count, etc.)
    - value: Field(s) to aggregate (default: all numeric fields)
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results
    
    Returns:
    Table with aggregated results
    """

def rowreduce(table, key, reducer, header=None, presorted=False, 
              buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Reduce rows grouped by key using reducer function.
    
    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - reducer: Function to reduce rows (takes list of rows, returns single row)
    - header: Header for output table
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results
    
    Returns:
    Table with reduced rows
    """

def fold(table, key, f, header=None, presorted=False, 
         buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Fold (reduce) groups of rows into single rows.
    
    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - f: Fold function (takes accumulator and row, returns new accumulator)
    - header: Header for output table
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results
    
    Returns:
    Table with folded results
    """

Data Merging and Deduplication

Combine and merge rows with the same key values.

def merge(table, key, missing=None, presorted=False, 
          buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Merge rows with the same key values.
    
    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - missing: Value for missing fields during merge
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results
    
    Returns:
    Table with merged rows
    """

def mergeduplicates(table, key, missing=None, presorted=False, 
                    buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Merge duplicate key rows.
    
    Parameters:
    - table: Input table
    - key: Field(s) to identify duplicates
    - missing: Value for missing fields during merge
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results
    
    Returns:
    Table with duplicates merged
    """

def unique(table, key=None, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Return unique rows.
    
    Parameters:
    - table: Input table
    - key: Field(s) to determine uniqueness (default: all fields)
    - presorted: If True, table is pre-sorted
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results
    
    Returns:
    Table with unique rows only
    """

def distinct(table, key=None, count=None, presorted=False, 
             buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Return distinct rows with optional counts.
    
    Parameters:
    - table: Input table
    - key: Field(s) to determine distinctness
    - count: Field name for count column
    - presorted: If True, table is pre-sorted
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results
    
    Returns:
    Table with distinct rows and optional counts
    """

def duplicates(table, key=None, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Return duplicate rows.
    
    Parameters:
    - table: Input table
    - key: Field(s) to identify duplicates
    - presorted: If True, table is pre-sorted
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results
    
    Returns:
    Table with duplicate rows only
    """

Group Selection Operations

Select specific rows from each group based on various criteria.

def groupselectfirst(table, key, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Select first row from each group.
    
    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results
    
    Returns:
    Table with first row from each group
    """

def groupselectlast(table, key, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Select last row from each group.
    
    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results
    
    Returns:
    Table with last row from each group
    """

def groupselectmin(table, key, value, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Select row with minimum value from each group.
    
    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - value: Field to find minimum value for
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results
    
    Returns:
    Table with min-value row from each group
    """

def groupselectmax(table, key, value, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Select row with maximum value from each group.
    
    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - value: Field to find maximum value for
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results
    
    Returns:
    Table with max-value row from each group
    """

Statistical Aggregations

Specialized aggregation functions for statistical analysis.

def groupcountdistinctvalues(table, key, value) -> Table:
    """
    Count distinct values in groups.
    
    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - value: Field to count distinct values for
    
    Returns:
    Table with distinct value counts per group
    """

def valuecounts(table, *field, **kwargs) -> Table:
    """
    Return a table with value counts for the specified field(s).
    
    Parameters:
    - table: Input table
    - field: Field name(s) to count values for
    - kwargs: Additional options
    
    Returns:
    Table with value counts
    """

def valuecounter(table, *field, **kwargs):
    """
    Return a Counter of values in the specified field(s).
    
    Parameters:
    - table: Input table
    - field: Field name(s) to count
    - kwargs: Additional options
    
    Returns:
    Counter object with value counts
    """

def typecounts(table, field) -> Table:
    """
    Return a table with type counts for the specified field.
    
    Parameters:
    - table: Input table
    - field: Field name to analyze types
    
    Returns:
    Table with Python type counts
    """

def parsecounts(table, field, parsers=(('int', int), ('float', float))) -> Table:
    """
    Return a table with parsing attempt counts.
    
    Parameters:
    - table: Input table
    - field: Field name to test parsing
    - parsers: List of (name, parser_function) tuples
    
    Returns:
    Table with parsing success counts
    """

Conflict Detection

Identify and handle conflicting values during aggregation.

def conflicts(table, key, missing=None, include=None, exclude=None, 
              presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Return rows with conflicting values for the same key.
    
    Parameters:
    - table: Input table
    - key: Field(s) to identify conflicts
    - missing: Value to treat as missing
    - include: Fields to include in conflict detection
    - exclude: Fields to exclude from conflict detection
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results
    
    Returns:
    Table with conflicting rows
    """

def isunique(table, field) -> bool:
    """
    Test if field values are unique.
    
    Parameters:
    - table: Input table
    - field: Field name to test for uniqueness
    
    Returns:
    Boolean indicating if all field values are unique
    """

Usage Examples

Basic Aggregation

import petl as etl

sales = etl.fromcsv('sales.csv')  # date, product, region, amount

# Simple aggregation
total_by_region = etl.aggregate(sales, 'region', 'amount', sum)

# Multiple aggregations
summary = etl.aggregate(sales, 'region', {
    'total_amount': ('amount', sum),
    'avg_amount': ('amount', lambda vals: sum(vals) / len(vals)),
    'count': len
})

# Group by multiple fields
regional_product = etl.aggregate(sales, ('region', 'product'), 'amount', sum)

Custom Aggregation Functions

import petl as etl
from statistics import median, stdev

sales = etl.fromcsv('sales.csv')

# Custom aggregation with multiple statistics
stats_summary = etl.aggregate(sales, 'product', {
    'total_sales': ('amount', sum),
    'avg_sales': ('amount', lambda vals: sum(vals) / len(vals)),
    'median_sales': ('amount', median),
    'std_dev': ('amount', lambda vals: stdev(vals) if len(vals) > 1 else 0),
    'transaction_count': len,
    'max_sale': ('amount', max),
    'min_sale': ('amount', min)
})

Row Reduction

import petl as etl

transactions = etl.fromcsv('transactions.csv')

# Custom row reducer
def combine_transactions(rows):
    """Combine multiple transaction rows into summary row."""
    if not rows:
        return None
    
    total_amount = sum(row[2] for row in rows)  # amount field
    transaction_count = len(rows)
    avg_amount = total_amount / transaction_count
    
    return (rows[0][0], rows[0][1], total_amount, transaction_count, avg_amount)

# Apply row reduction
summary = etl.rowreduce(transactions, 'customer_id', combine_transactions,
                       header=['customer_id', 'customer_name', 'total_amount', 
                              'transaction_count', 'avg_amount'])

Deduplication and Uniqueness

import petl as etl

customers = etl.fromcsv('customers.csv')

# Remove duplicate customers
unique_customers = etl.unique(customers, key='email')

# Find duplicate entries
duplicates = etl.duplicates(customers, key='email')

# Get distinct customers with count
distinct_with_count = etl.distinct(customers, key='email', count='duplicate_count')

# Merge duplicate records
merged = etl.mergeduplicates(customers, key='email')

Group Selection

import petl as etl

orders = etl.fromcsv('orders.csv')  # customer_id, order_date, amount

# Get first order for each customer
first_orders = etl.groupselectfirst(orders, 'customer_id')

# Get latest order for each customer (assuming sorted by date)
latest_orders = etl.groupselectlast(orders, 'customer_id')

# Get highest value order for each customer
highest_orders = etl.groupselectmax(orders, 'customer_id', 'amount')

# Get lowest value order for each customer
lowest_orders = etl.groupselectmin(orders, 'customer_id', 'amount')

Value Counting and Analysis

import petl as etl

survey = etl.fromcsv('survey.csv')

# Count values in a field
age_counts = etl.valuecounts(survey, 'age_group')

# Count combinations of fields
region_gender_counts = etl.valuecounts(survey, 'region', 'gender')

# Get counter object for programmatic access
age_counter = etl.valuecounter(survey, 'age_group')
print(f"Most common age group: {age_counter.most_common(1)[0]}")

# Analyze data types in a field
type_analysis = etl.typecounts(survey, 'income')

# Test parsing capabilities
parsing_analysis = etl.parsecounts(survey, 'income', [
    ('int', int),
    ('float', float),
    ('currency', lambda x: float(x.replace('$', '').replace(',', '')))
])

Conflict Detection

import petl as etl

customer_data = etl.fromcsv('customer_updates.csv')

# Find conflicting customer information
conflicts = etl.conflicts(customer_data, 'customer_id', 
                         exclude=['last_updated'])

# Check if customer IDs are unique
id_unique = etl.isunique(customer_data, 'customer_id')

if not id_unique:
    print("Warning: Customer IDs are not unique!")
    duplicate_ids = etl.duplicates(customer_data, 'customer_id')

Advanced Aggregation Patterns

import petl as etl
from datetime import datetime

sales = etl.fromcsv('sales.csv')

# Time-based aggregation with date parsing
sales_with_date = etl.convert(sales, 'date', lambda x: datetime.strptime(x, '%Y-%m-%d'))
monthly_sales = etl.aggregate(
    etl.addfield(sales_with_date, 'month', lambda row: row.date.strftime('%Y-%m')),
    'month',
    'amount',
    sum
)

# Rolling/window aggregation using fold
def rolling_average(accumulator, row):
    """Calculate rolling 3-period average."""
    if accumulator is None:
        accumulator = []
    
    accumulator.append(row)
    if len(accumulator) > 3:
        accumulator.pop(0)
    
    avg = sum(r.amount for r in accumulator) / len(accumulator)
    return accumulator  # Keep accumulator for next iteration

# Complex multi-level aggregation
hierarchical = etl.aggregate(sales, ('region', 'product', 'quarter'), {
    'total_sales': ('amount', sum),
    'unit_count': ('units', sum),
    'avg_price': (lambda rows: sum(r.amount for r in rows) / sum(r.units for r in rows))
})

Install with Tessl CLI

npx tessl i tessl/pypi-petl

docs

aggregation.md

basic-transformations.md

data-io.md

data-reshaping.md

index.md

sorting-joins.md

table-operations.md

validation-analysis.md

tile.json