# Aggregation and Grouping

Functions for grouping data and performing aggregation operations. PETL provides comprehensive aggregation capabilities including custom aggregation functions, reduction operations on grouped data, and statistical computations.

## Capabilities

### Core Aggregation

Primary functions for grouping and aggregating data with flexible aggregation specifications.

```python { .api }
def aggregate(table, key, aggregation=None, value=None, presorted=False,
              buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Aggregate data grouped by key.

    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - aggregation: Aggregation function(s) (sum, min, max, mean, count, etc.)
    - value: Field(s) to aggregate (default: all numeric fields)
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results

    Returns:
    Table with aggregated results
    """

def rowreduce(table, key, reducer, header=None, presorted=False,
              buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Reduce rows grouped by key using reducer function.

    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - reducer: Function to reduce rows (takes list of rows, returns single row)
    - header: Header for output table
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results

    Returns:
    Table with reduced rows
    """

def fold(table, key, f, header=None, presorted=False,
         buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Fold (reduce) groups of rows into single rows.

    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - f: Fold function (takes accumulator and row, returns new accumulator)
    - header: Header for output table
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results

    Returns:
    Table with folded results
    """
```

### Data Merging and Deduplication

Combine and merge rows with the same key values.

```python { .api }
def merge(table, key, missing=None, presorted=False,
          buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Merge rows with the same key values.

    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - missing: Value for missing fields during merge
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results

    Returns:
    Table with merged rows
    """

def mergeduplicates(table, key, missing=None, presorted=False,
                    buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Merge duplicate key rows.

    Parameters:
    - table: Input table
    - key: Field(s) to identify duplicates
    - missing: Value for missing fields during merge
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results

    Returns:
    Table with duplicates merged
    """

def unique(table, key=None, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Return unique rows.

    Parameters:
    - table: Input table
    - key: Field(s) to determine uniqueness (default: all fields)
    - presorted: If True, table is pre-sorted
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results

    Returns:
    Table with unique rows only
    """

def distinct(table, key=None, count=None, presorted=False,
             buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Return distinct rows with optional counts.

    Parameters:
    - table: Input table
    - key: Field(s) to determine distinctness
    - count: Field name for count column
    - presorted: If True, table is pre-sorted
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results

    Returns:
    Table with distinct rows and optional counts
    """

def duplicates(table, key=None, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Return duplicate rows.

    Parameters:
    - table: Input table
    - key: Field(s) to identify duplicates
    - presorted: If True, table is pre-sorted
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results

    Returns:
    Table with duplicate rows only
    """
```

### Group Selection Operations

Select specific rows from each group based on various criteria.

```python { .api }
def groupselectfirst(table, key, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Select first row from each group.

    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results

    Returns:
    Table with first row from each group
    """

def groupselectlast(table, key, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Select last row from each group.

    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results

    Returns:
    Table with last row from each group
    """

def groupselectmin(table, key, value, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Select row with minimum value from each group.

    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - value: Field to find minimum value for
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results

    Returns:
    Table with min-value row from each group
    """

def groupselectmax(table, key, value, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Select row with maximum value from each group.

    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - value: Field to find maximum value for
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results

    Returns:
    Table with max-value row from each group
    """
```

### Statistical Aggregations

Specialized aggregation functions for statistical analysis.

```python { .api }
def groupcountdistinctvalues(table, key, value) -> Table:
    """
    Count distinct values in groups.

    Parameters:
    - table: Input table
    - key: Field(s) to group by
    - value: Field to count distinct values for

    Returns:
    Table with distinct value counts per group
    """

def valuecounts(table, *field, **kwargs) -> Table:
    """
    Return a table with value counts for the specified field(s).

    Parameters:
    - table: Input table
    - field: Field name(s) to count values for
    - kwargs: Additional options

    Returns:
    Table with value counts
    """

def valuecounter(table, *field, **kwargs):
    """
    Return a Counter of values in the specified field(s).

    Parameters:
    - table: Input table
    - field: Field name(s) to count
    - kwargs: Additional options

    Returns:
    Counter object with value counts
    """

def typecounts(table, field) -> Table:
    """
    Return a table with type counts for the specified field.

    Parameters:
    - table: Input table
    - field: Field name to analyze types

    Returns:
    Table with Python type counts
    """

def parsecounts(table, field, parsers=(('int', int), ('float', float))) -> Table:
    """
    Return a table with parsing attempt counts.

    Parameters:
    - table: Input table
    - field: Field name to test parsing
    - parsers: List of (name, parser_function) tuples

    Returns:
    Table with parsing success counts
    """
```

### Conflict Detection

Identify and handle conflicting values during aggregation.

```python { .api }
def conflicts(table, key, missing=None, include=None, exclude=None,
              presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:
    """
    Return rows with conflicting values for the same key.

    Parameters:
    - table: Input table
    - key: Field(s) to identify conflicts
    - missing: Value to treat as missing
    - include: Fields to include in conflict detection
    - exclude: Fields to exclude from conflict detection
    - presorted: If True, table is pre-sorted by key
    - buffersize: Buffer size for sorting
    - tempdir: Directory for temporary files
    - cache: Whether to cache results

    Returns:
    Table with conflicting rows
    """

def isunique(table, field) -> bool:
    """
    Test if field values are unique.

    Parameters:
    - table: Input table
    - field: Field name to test for uniqueness

    Returns:
    Boolean indicating if all field values are unique
    """
```

## Usage Examples

### Basic Aggregation

```python
import petl as etl

sales = etl.fromcsv('sales.csv')  # date, product, region, amount

# Simple aggregation
total_by_region = etl.aggregate(sales, 'region', 'amount', sum)

# Multiple aggregations
summary = etl.aggregate(sales, 'region', {
    'total_amount': ('amount', sum),
    'avg_amount': ('amount', lambda vals: sum(vals) / len(vals)),
    'count': len
})

# Group by multiple fields
regional_product = etl.aggregate(sales, ('region', 'product'), 'amount', sum)
```

### Custom Aggregation Functions

```python
import petl as etl
from statistics import median, stdev

sales = etl.fromcsv('sales.csv')

# Custom aggregation with multiple statistics
stats_summary = etl.aggregate(sales, 'product', {
    'total_sales': ('amount', sum),
    'avg_sales': ('amount', lambda vals: sum(vals) / len(vals)),
    'median_sales': ('amount', median),
    'std_dev': ('amount', lambda vals: stdev(vals) if len(vals) > 1 else 0),
    'transaction_count': len,
    'max_sale': ('amount', max),
    'min_sale': ('amount', min)
})
```

### Row Reduction

```python
import petl as etl

transactions = etl.fromcsv('transactions.csv')

# Custom row reducer
def combine_transactions(rows):
    """Combine multiple transaction rows into summary row."""
    if not rows:
        return None

    total_amount = sum(row[2] for row in rows)  # amount field
    transaction_count = len(rows)
    avg_amount = total_amount / transaction_count

    return (rows[0][0], rows[0][1], total_amount, transaction_count, avg_amount)

# Apply row reduction
summary = etl.rowreduce(transactions, 'customer_id', combine_transactions,
                        header=['customer_id', 'customer_name', 'total_amount',
                                'transaction_count', 'avg_amount'])
```

### Deduplication and Uniqueness

```python
import petl as etl

customers = etl.fromcsv('customers.csv')

# Remove duplicate customers
unique_customers = etl.unique(customers, key='email')

# Find duplicate entries
duplicates = etl.duplicates(customers, key='email')

# Get distinct customers with count
distinct_with_count = etl.distinct(customers, key='email', count='duplicate_count')

# Merge duplicate records
merged = etl.mergeduplicates(customers, key='email')
```

### Group Selection

```python
import petl as etl

orders = etl.fromcsv('orders.csv')  # customer_id, order_date, amount

# Get first order for each customer
first_orders = etl.groupselectfirst(orders, 'customer_id')

# Get latest order for each customer (assuming sorted by date)
latest_orders = etl.groupselectlast(orders, 'customer_id')

# Get highest value order for each customer
highest_orders = etl.groupselectmax(orders, 'customer_id', 'amount')

# Get lowest value order for each customer
lowest_orders = etl.groupselectmin(orders, 'customer_id', 'amount')
```

### Value Counting and Analysis

```python
import petl as etl

survey = etl.fromcsv('survey.csv')

# Count values in a field
age_counts = etl.valuecounts(survey, 'age_group')

# Count combinations of fields
region_gender_counts = etl.valuecounts(survey, 'region', 'gender')

# Get counter object for programmatic access
age_counter = etl.valuecounter(survey, 'age_group')
print(f"Most common age group: {age_counter.most_common(1)[0]}")

# Analyze data types in a field
type_analysis = etl.typecounts(survey, 'income')

# Test parsing capabilities
parsing_analysis = etl.parsecounts(survey, 'income', [
    ('int', int),
    ('float', float),
    ('currency', lambda x: float(x.replace('$', '').replace(',', '')))
])
```

### Conflict Detection

```python
import petl as etl

customer_data = etl.fromcsv('customer_updates.csv')

# Find conflicting customer information
conflicts = etl.conflicts(customer_data, 'customer_id',
                          exclude=['last_updated'])

# Check if customer IDs are unique
id_unique = etl.isunique(customer_data, 'customer_id')

if not id_unique:
    print("Warning: Customer IDs are not unique!")
    duplicate_ids = etl.duplicates(customer_data, 'customer_id')
```

### Advanced Aggregation Patterns

```python
import petl as etl
from datetime import datetime

sales = etl.fromcsv('sales.csv')

# Time-based aggregation with date parsing
sales_with_date = etl.convert(sales, 'date', lambda x: datetime.strptime(x, '%Y-%m-%d'))
monthly_sales = etl.aggregate(
    etl.addfield(sales_with_date, 'month', lambda row: row.date.strftime('%Y-%m')),
    'month',
    'amount',
    sum
)

# Rolling/window aggregation using fold
def rolling_average(accumulator, row):
    """Calculate rolling 3-period average."""
    if accumulator is None:
        accumulator = []

    accumulator.append(row)
    if len(accumulator) > 3:
        accumulator.pop(0)

    avg = sum(r.amount for r in accumulator) / len(accumulator)
    return accumulator  # Keep accumulator for next iteration

# Complex multi-level aggregation
hierarchical = etl.aggregate(sales, ('region', 'product', 'quarter'), {
    'total_sales': ('amount', sum),
    'unit_count': ('units', sum),
    'avg_price': (lambda rows: sum(r.amount for r in rows) / sum(r.units for r in rows))
})
```