A Python package for extracting, transforming and loading tables of data.
Aggregations
Functions for grouping data and performing aggregation operations. PETL provides comprehensive aggregation capabilities including custom aggregation functions, reduction operations on grouped data, and statistical computations.
Primary functions for grouping and aggregating data with flexible aggregation specifications.
def aggregate(table, key, aggregation=None, value=None, presorted=False,
buffersize=None, tempdir=None, cache=True) -> Table:
"""
Aggregate data grouped by key.
Parameters:
- table: Input table
- key: Field(s) to group by
- aggregation: Aggregation function(s) (sum, min, max, mean, count, etc.)
- value: Field(s) to aggregate (default: all numeric fields)
- presorted: If True, table is pre-sorted by key
- buffersize: Buffer size for sorting
- tempdir: Directory for temporary files
- cache: Whether to cache results
Returns:
Table with aggregated results
"""
def rowreduce(table, key, reducer, header=None, presorted=False,
buffersize=None, tempdir=None, cache=True) -> Table:
"""
Reduce rows grouped by key using reducer function.
Parameters:
- table: Input table
- key: Field(s) to group by
- reducer: Function to reduce rows (takes list of rows, returns single row)
- header: Header for output table
- presorted: If True, table is pre-sorted by key
- buffersize: Buffer size for sorting
- tempdir: Directory for temporary files
- cache: Whether to cache results
Returns:
Table with reduced rows
"""
def fold(table, key, f, header=None, presorted=False,
buffersize=None, tempdir=None, cache=True) -> Table:
"""
Fold (reduce) groups of rows into single rows.
Parameters:
- table: Input table
- key: Field(s) to group by
- f: Fold function (takes accumulator and row, returns new accumulator)
- header: Header for output table
- presorted: If True, table is pre-sorted by key
- buffersize: Buffer size for sorting
- tempdir: Directory for temporary files
- cache: Whether to cache results
Returns:
Table with folded results
"""Combine and merge rows with the same key values.
def merge(table, key, missing=None, presorted=False,
buffersize=None, tempdir=None, cache=True) -> Table:
"""
Merge rows with the same key values.
Parameters:
- table: Input table
- key: Field(s) to group by
- missing: Value for missing fields during merge
- presorted: If True, table is pre-sorted by key
- buffersize: Buffer size for sorting
- tempdir: Directory for temporary files
- cache: Whether to cache results
Returns:
Table with merged rows
"""
def mergeduplicates(table, key, missing=None, presorted=False,
buffersize=None, tempdir=None, cache=True) -> Table:
"""
Merge duplicate key rows.
Parameters:
- table: Input table
- key: Field(s) to identify duplicates
- missing: Value for missing fields during merge
- presorted: If True, table is pre-sorted by key
- buffersize: Buffer size for sorting
- tempdir: Directory for temporary files
- cache: Whether to cache results
Returns:
Table with duplicates merged
"""
def unique(table, key=None, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
"""
Return unique rows.
Parameters:
- table: Input table
- key: Field(s) to determine uniqueness (default: all fields)
- presorted: If True, table is pre-sorted
- buffersize: Buffer size for sorting
- tempdir: Directory for temporary files
- cache: Whether to cache results
Returns:
Table with unique rows only
"""
def distinct(table, key=None, count=None, presorted=False,
buffersize=None, tempdir=None, cache=True) -> Table:
"""
Return distinct rows with optional counts.
Parameters:
- table: Input table
- key: Field(s) to determine distinctness
- count: Field name for count column
- presorted: If True, table is pre-sorted
- buffersize: Buffer size for sorting
- tempdir: Directory for temporary files
- cache: Whether to cache results
Returns:
Table with distinct rows and optional counts
"""
def duplicates(table, key=None, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
"""
Return duplicate rows.
Parameters:
- table: Input table
- key: Field(s) to identify duplicates
- presorted: If True, table is pre-sorted
- buffersize: Buffer size for sorting
- tempdir: Directory for temporary files
- cache: Whether to cache results
Returns:
Table with duplicate rows only
"""Select specific rows from each group based on various criteria.
def groupselectfirst(table, key, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
"""
Select first row from each group.
Parameters:
- table: Input table
- key: Field(s) to group by
- presorted: If True, table is pre-sorted by key
- buffersize: Buffer size for sorting
- tempdir: Directory for temporary files
- cache: Whether to cache results
Returns:
Table with first row from each group
"""
def groupselectlast(table, key, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
"""
Select last row from each group.
Parameters:
- table: Input table
- key: Field(s) to group by
- presorted: If True, table is pre-sorted by key
- buffersize: Buffer size for sorting
- tempdir: Directory for temporary files
- cache: Whether to cache results
Returns:
Table with last row from each group
"""
def groupselectmin(table, key, value, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
"""
Select row with minimum value from each group.
Parameters:
- table: Input table
- key: Field(s) to group by
- value: Field to find minimum value for
- presorted: If True, table is pre-sorted by key
- buffersize: Buffer size for sorting
- tempdir: Directory for temporary files
- cache: Whether to cache results
Returns:
Table with min-value row from each group
"""
def groupselectmax(table, key, value, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
"""
Select row with maximum value from each group.
Parameters:
- table: Input table
- key: Field(s) to group by
- value: Field to find maximum value for
- presorted: If True, table is pre-sorted by key
- buffersize: Buffer size for sorting
- tempdir: Directory for temporary files
- cache: Whether to cache results
Returns:
Table with max-value row from each group
"""Specialized aggregation functions for statistical analysis.
def groupcountdistinctvalues(table, key, value) -> Table:
"""
Count distinct values in groups.
Parameters:
- table: Input table
- key: Field(s) to group by
- value: Field to count distinct values for
Returns:
Table with distinct value counts per group
"""
def valuecounts(table, *field, **kwargs) -> Table:
"""
Return a table with value counts for the specified field(s).
Parameters:
- table: Input table
- field: Field name(s) to count values for
- kwargs: Additional options
Returns:
Table with value counts
"""
def valuecounter(table, *field, **kwargs):
"""
Return a Counter of values in the specified field(s).
Parameters:
- table: Input table
- field: Field name(s) to count
- kwargs: Additional options
Returns:
Counter object with value counts
"""
def typecounts(table, field) -> Table:
"""
Return a table with type counts for the specified field.
Parameters:
- table: Input table
- field: Field name to analyze types
Returns:
Table with Python type counts
"""
def parsecounts(table, field, parsers=(('int', int), ('float', float))) -> Table:
"""
Return a table with parsing attempt counts.
Parameters:
- table: Input table
- field: Field name to test parsing
- parsers: List of (name, parser_function) tuples
Returns:
Table with parsing success counts
"""Identify and handle conflicting values during aggregation.
def conflicts(table, key, missing=None, include=None, exclude=None,
presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
"""
Return rows with conflicting values for the same key.
Parameters:
- table: Input table
- key: Field(s) to identify conflicts
- missing: Value to treat as missing
- include: Fields to include in conflict detection
- exclude: Fields to exclude from conflict detection
- presorted: If True, table is pre-sorted by key
- buffersize: Buffer size for sorting
- tempdir: Directory for temporary files
- cache: Whether to cache results
Returns:
Table with conflicting rows
"""
def isunique(table, field) -> bool:
"""
Test if field values are unique.
Parameters:
- table: Input table
- field: Field name to test for uniqueness
Returns:
Boolean indicating if all field values are unique
"""import petl as etl
# Load sales data; fromcsv yields every value as a string.
sales = etl.fromcsv('sales.csv')  # columns: date, product, region, amount
# Convert the numeric column so sum/mean operate on numbers, not strings.
sales = etl.convert(sales, 'amount', float)
# Simple aggregation. Argument order is aggregate(table, key, aggregation, value):
# the aggregation function comes before the field it is applied to.
total_by_region = etl.aggregate(sales, 'region', sum, 'amount')
# Multiple aggregations: output field -> (source field, aggregation function);
# a bare callable such as len is applied to the whole group of rows.
summary = etl.aggregate(sales, 'region', {
    'total_amount': ('amount', sum),
    'avg_amount': ('amount', lambda vals: sum(vals) / len(vals)),
    'count': len
})
# Group by multiple fields (composite key)
regional_product = etl.aggregate(sales, ('region', 'product'), sum, 'amount')

import petl as etl
from statistics import median, stdev
# NOTE(review): fromcsv yields strings; the numeric aggregators below assume
# 'amount' has already been converted to a number -- confirm, or add
# etl.convert(sales, 'amount', float) first.
sales = etl.fromcsv('sales.csv')
# Custom aggregation with multiple statistics: each output field maps to a
# (source field, aggregation function) pair, while a bare callable (len)
# receives the whole group of rows.
stats_summary = etl.aggregate(sales, 'product', {
'total_sales': ('amount', sum),
'avg_sales': ('amount', lambda vals: sum(vals) / len(vals)),
'median_sales': ('amount', median),
'std_dev': ('amount', lambda vals: stdev(vals) if len(vals) > 1 else 0),
'transaction_count': len,
'max_sale': ('amount', max),
'min_sale': ('amount', min)
})

import petl as etl
# Load raw transactions; fromcsv yields rows of strings.
transactions = etl.fromcsv('transactions.csv')
# Custom row reducer: collapses all rows for one customer into a summary row.
def combine_transactions(rows):
    """Combine a group of transaction rows into one summary row.

    Parameters:
    - rows: list of rows shaped (customer_id, customer_name, amount, ...);
      amount may be a string when the table was read with fromcsv.

    Returns:
    Tuple (customer_id, customer_name, total_amount, transaction_count,
    avg_amount), or None for an empty group (rowreduce should never pass one).
    """
    if not rows:
        return None
    # Coerce amounts: fromcsv delivers text, and summing strings would raise.
    total_amount = sum(float(row[2]) for row in rows)
    transaction_count = len(rows)
    avg_amount = total_amount / transaction_count
    return (rows[0][0], rows[0][1], total_amount, transaction_count, avg_amount)
# Apply row reduction: rows sharing a customer_id are grouped and collapsed
# into one summary row each; header names the five output fields.
summary = etl.rowreduce(transactions, 'customer_id', combine_transactions,
header=['customer_id', 'customer_name', 'total_amount',
'transaction_count', 'avg_amount'])

import petl as etl
customers = etl.fromcsv('customers.csv')
# Remove duplicate customers: keep rows that are unique under the email key
unique_customers = etl.unique(customers, key='email')
# Find duplicate entries: rows whose email appears more than once
duplicates = etl.duplicates(customers, key='email')
# One row per distinct email plus a 'duplicate_count' column of occurrence counts
distinct_with_count = etl.distinct(customers, key='email', count='duplicate_count')
# Merge duplicate records into a single row per email
merged = etl.mergeduplicates(customers, key='email')

import petl as etl
orders = etl.fromcsv('orders.csv') # customer_id, order_date, amount
# Get first order for each customer (first row within each customer_id group)
first_orders = etl.groupselectfirst(orders, 'customer_id')
# Get latest order for each customer (assuming rows sorted by date within group)
latest_orders = etl.groupselectlast(orders, 'customer_id')
# Get highest value order for each customer.
# NOTE(review): fromcsv yields strings, so min/max compare lexically; convert
# 'amount' to float first if numeric ordering is intended -- confirm.
highest_orders = etl.groupselectmax(orders, 'customer_id', 'amount')
# Get lowest value order for each customer
lowest_orders = etl.groupselectmin(orders, 'customer_id', 'amount')

import petl as etl
survey = etl.fromcsv('survey.csv')
# Frequency table for a single field's values
age_counts = etl.valuecounts(survey, 'age_group')
# Frequency table over combinations of two fields
region_gender_counts = etl.valuecounts(survey, 'region', 'gender')
# Counter object for programmatic access (supports most_common, arithmetic, etc.)
age_counter = etl.valuecounter(survey, 'age_group')
print(f"Most common age group: {age_counter.most_common(1)[0]}")
# Tally of Python types observed in the field
type_analysis = etl.typecounts(survey, 'income')
# Count how many values each named parser accepts; parsers are (name, callable) pairs
parsing_analysis = etl.parsecounts(survey, 'income', [
('int', int),
('float', float),
('currency', lambda x: float(x.replace('$', '').replace(',', '')))
])

import petl as etl
customer_data = etl.fromcsv('customer_updates.csv')
# Rows sharing a customer_id but disagreeing on some other field;
# last_updated is excluded from conflict detection.
conflicts = etl.conflicts(customer_data, 'customer_id',
exclude=['last_updated'])
# True when every customer_id value occurs exactly once
id_unique = etl.isunique(customer_data, 'customer_id')
if not id_unique:
print("Warning: Customer IDs are not unique!")
duplicate_ids = etl.duplicates(customer_data, 'customer_id')

import petl as etl
from datetime import datetime

# fromcsv yields strings: make amounts numeric and parse dates up front.
sales = etl.fromcsv('sales.csv')
sales = etl.convert(sales, 'amount', float)
# Time-based aggregation with date parsing (row.date then supports strftime)
sales_with_date = etl.convert(sales, 'date', lambda x: datetime.strptime(x, '%Y-%m-%d'))
# Monthly totals. Argument order is aggregate(table, key, aggregation, value):
# the aggregation function precedes the field it is applied to.
monthly_sales = etl.aggregate(
    etl.addfield(sales_with_date, 'month', lambda row: row.date.strftime('%Y-%m')),
    'month',
    sum,
    'amount'
)
# Rolling/window aggregation using fold
def rolling_average(accumulator, row):
    """Fold step maintaining a sliding window of the most recent 3 rows.

    Parameters:
    - accumulator: current window (list of rows), or None on the first call
    - row: next row in the group

    Returns:
    The updated window. Derive the rolling average from the final
    accumulator after folding, e.g. sum(r.amount for r in acc) / len(acc).
    (The original example computed the average inside the step and then
    discarded it -- dead code removed.)
    """
    if accumulator is None:
        accumulator = []
    accumulator.append(row)
    if len(accumulator) > 3:
        accumulator.pop(0)  # evict the oldest row to cap the window at 3
    return accumulator
# Complex multi-level aggregation: group on a composite (region, product, quarter) key.
# NOTE(review): the bare 'avg_price' callable receives the group's rows and assumes
# they expose .amount and .units attributes -- confirm the table yields records,
# not plain tuples, and that values are numeric.
hierarchical = etl.aggregate(sales, ('region', 'product', 'quarter'), {
'total_sales': ('amount', sum),
'unit_count': ('units', sum),
'avg_price': (lambda rows: sum(r.amount for r in rows) / sum(r.units for r in rows))
})

Install with the Tessl CLI:

    npx tessl i tessl/pypi-petl