or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregation.mdbasic-transformations.mddata-io.mddata-reshaping.mdindex.mdsorting-joins.mdtable-operations.mdvalidation-analysis.md

aggregation.mddocs/

0

# Aggregation and Grouping

1

2

Functions for grouping data and performing aggregation operations. PETL provides comprehensive aggregation capabilities including custom aggregation functions, reduction operations on grouped data, and statistical computations.

3

4

## Capabilities

5

6

### Core Aggregation

7

8

Primary functions for grouping and aggregating data with flexible aggregation specifications.

9

10

```python { .api }

11

def aggregate(table, key, aggregation=None, value=None, presorted=False,

12

buffersize=None, tempdir=None, cache=True) -> Table:

13

"""

14

Aggregate data grouped by key.

15

16

Parameters:

17

- table: Input table

18

- key: Field(s) to group by

19

- aggregation: Aggregation function(s) (sum, min, max, mean, count, etc.)

20

- value: Field(s) to aggregate (default: all numeric fields)

21

- presorted: If True, table is pre-sorted by key

22

- buffersize: Buffer size for sorting

23

- tempdir: Directory for temporary files

24

- cache: Whether to cache results

25

26

Returns:

27

Table with aggregated results

28

"""

29

30

def rowreduce(table, key, reducer, header=None, presorted=False,

31

buffersize=None, tempdir=None, cache=True) -> Table:

32

"""

33

Reduce rows grouped by key using reducer function.

34

35

Parameters:

36

- table: Input table

37

- key: Field(s) to group by

38

- reducer: Function to reduce rows (takes list of rows, returns single row)

39

- header: Header for output table

40

- presorted: If True, table is pre-sorted by key

41

- buffersize: Buffer size for sorting

42

- tempdir: Directory for temporary files

43

- cache: Whether to cache results

44

45

Returns:

46

Table with reduced rows

47

"""

48

49

def fold(table, key, f, value=None, presorted=False,

50

buffersize=None, tempdir=None, cache=True) -> Table:

51

"""

52

Fold (reduce) groups of rows into single rows.

53

54

Parameters:

55

- table: Input table

56

- key: Field(s) to group by

57

- f: Fold function (takes accumulator and value, returns new accumulator)

58

- value: Field whose values are folded (default: entire rows)

59

- presorted: If True, table is pre-sorted by key

60

- buffersize: Buffer size for sorting

61

- tempdir: Directory for temporary files

62

- cache: Whether to cache results

63

64

Returns:

65

Table with folded results

66

"""

67

```

68

69

### Data Merging and Deduplication

70

71

Combine and merge rows with the same key values.

72

73

```python { .api }

74

def merge(table, key, missing=None, presorted=False,

75

buffersize=None, tempdir=None, cache=True) -> Table:

76

"""

77

Merge rows with the same key values. (NOTE(review): in petl itself, `merge` is a multi-table convenience — `merge(*tables, key=...)` — built on mergesort and mergeduplicates; confirm this single-table signature against the petl reference.)

78

79

Parameters:

80

- table: Input table

81

- key: Field(s) to group by

82

- missing: Value for missing fields during merge

83

- presorted: If True, table is pre-sorted by key

84

- buffersize: Buffer size for sorting

85

- tempdir: Directory for temporary files

86

- cache: Whether to cache results

87

88

Returns:

89

Table with merged rows

90

"""

91

92

def mergeduplicates(table, key, missing=None, presorted=False,

93

buffersize=None, tempdir=None, cache=True) -> Table:

94

"""

95

Merge duplicate key rows.

96

97

Parameters:

98

- table: Input table

99

- key: Field(s) to identify duplicates

100

- missing: Value for missing fields during merge

101

- presorted: If True, table is pre-sorted by key

102

- buffersize: Buffer size for sorting

103

- tempdir: Directory for temporary files

104

- cache: Whether to cache results

105

106

Returns:

107

Table with duplicates merged

108

"""

109

110

def unique(table, key=None, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:

111

"""

112

Return unique rows.

113

114

Parameters:

115

- table: Input table

116

- key: Field(s) to determine uniqueness (default: all fields)

117

- presorted: If True, table is pre-sorted

118

- buffersize: Buffer size for sorting

119

- tempdir: Directory for temporary files

120

- cache: Whether to cache results

121

122

Returns:

123

Table with unique rows only

124

"""

125

126

def distinct(table, key=None, count=None, presorted=False,

127

buffersize=None, tempdir=None, cache=True) -> Table:

128

"""

129

Return distinct rows with optional counts.

130

131

Parameters:

132

- table: Input table

133

- key: Field(s) to determine distinctness

134

- count: Field name for count column

135

- presorted: If True, table is pre-sorted

136

- buffersize: Buffer size for sorting

137

- tempdir: Directory for temporary files

138

- cache: Whether to cache results

139

140

Returns:

141

Table with distinct rows and optional counts

142

"""

143

144

def duplicates(table, key=None, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:

145

"""

146

Return duplicate rows.

147

148

Parameters:

149

- table: Input table

150

- key: Field(s) to identify duplicates

151

- presorted: If True, table is pre-sorted

152

- buffersize: Buffer size for sorting

153

- tempdir: Directory for temporary files

154

- cache: Whether to cache results

155

156

Returns:

157

Table with duplicate rows only

158

"""

159

```

160

161

### Group Selection Operations

162

163

Select specific rows from each group based on various criteria.

164

165

```python { .api }

166

def groupselectfirst(table, key, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:

167

"""

168

Select first row from each group.

169

170

Parameters:

171

- table: Input table

172

- key: Field(s) to group by

173

- presorted: If True, table is pre-sorted by key

174

- buffersize: Buffer size for sorting

175

- tempdir: Directory for temporary files

176

- cache: Whether to cache results

177

178

Returns:

179

Table with first row from each group

180

"""

181

182

def groupselectlast(table, key, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:

183

"""

184

Select last row from each group.

185

186

Parameters:

187

- table: Input table

188

- key: Field(s) to group by

189

- presorted: If True, table is pre-sorted by key

190

- buffersize: Buffer size for sorting

191

- tempdir: Directory for temporary files

192

- cache: Whether to cache results

193

194

Returns:

195

Table with last row from each group

196

"""

197

198

def groupselectmin(table, key, value, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:

199

"""

200

Select row with minimum value from each group.

201

202

Parameters:

203

- table: Input table

204

- key: Field(s) to group by

205

- value: Field to find minimum value for

206

- presorted: If True, table is pre-sorted by key

207

- buffersize: Buffer size for sorting

208

- tempdir: Directory for temporary files

209

- cache: Whether to cache results

210

211

Returns:

212

Table with min-value row from each group

213

"""

214

215

def groupselectmax(table, key, value, presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:

216

"""

217

Select row with maximum value from each group.

218

219

Parameters:

220

- table: Input table

221

- key: Field(s) to group by

222

- value: Field to find maximum value for

223

- presorted: If True, table is pre-sorted by key

224

- buffersize: Buffer size for sorting

225

- tempdir: Directory for temporary files

226

- cache: Whether to cache results

227

228

Returns:

229

Table with max-value row from each group

230

"""

231

```

232

233

### Statistical Aggregations

234

235

Specialized aggregation functions for statistical analysis.

236

237

```python { .api }

238

def groupcountdistinctvalues(table, key, value) -> Table:

239

"""

240

Count distinct values in groups.

241

242

Parameters:

243

- table: Input table

244

- key: Field(s) to group by

245

- value: Field to count distinct values for

246

247

Returns:

248

Table with distinct value counts per group

249

"""

250

251

def valuecounts(table, *field, **kwargs) -> Table:

252

"""

253

Return a table with value counts for the specified field(s).

254

255

Parameters:

256

- table: Input table

257

- field: Field name(s) to count values for

258

- kwargs: Additional options

259

260

Returns:

261

Table with value counts

262

"""

263

264

def valuecounter(table, *field, **kwargs):

265

"""

266

Return a Counter of values in the specified field(s).

267

268

Parameters:

269

- table: Input table

270

- field: Field name(s) to count

271

- kwargs: Additional options

272

273

Returns:

274

Counter object with value counts

275

"""

276

277

def typecounts(table, field) -> Table:

278

"""

279

Return a table with type counts for the specified field.

280

281

Parameters:

282

- table: Input table

283

- field: Field name to analyze types

284

285

Returns:

286

Table with Python type counts

287

"""

288

289

def parsecounts(table, field, parsers=(('int', int), ('float', float))) -> Table:

290

"""

291

Return a table with parsing attempt counts.

292

293

Parameters:

294

- table: Input table

295

- field: Field name to test parsing

296

- parsers: List of (name, parser_function) tuples

297

298

Returns:

299

Table with parsing success counts

300

"""

301

```

302

303

### Conflict Detection

304

305

Identify and handle conflicting values during aggregation.

306

307

```python { .api }

308

def conflicts(table, key, missing=None, include=None, exclude=None,

309

presorted=False, buffersize=None, tempdir=None, cache=True) -> Table:

310

"""

311

Return rows with conflicting values for the same key.

312

313

Parameters:

314

- table: Input table

315

- key: Field(s) to identify conflicts

316

- missing: Value to treat as missing

317

- include: Fields to include in conflict detection

318

- exclude: Fields to exclude from conflict detection

319

- presorted: If True, table is pre-sorted by key

320

- buffersize: Buffer size for sorting

321

- tempdir: Directory for temporary files

322

- cache: Whether to cache results

323

324

Returns:

325

Table with conflicting rows

326

"""

327

328

def isunique(table, field) -> bool:

329

"""

330

Test if field values are unique.

331

332

Parameters:

333

- table: Input table

334

- field: Field name to test for uniqueness

335

336

Returns:

337

Boolean indicating if all field values are unique

338

"""

339

```

340

341

## Usage Examples

342

343

### Basic Aggregation

344

345

```python

346

import petl as etl

347

348

sales = etl.fromcsv('sales.csv') # date, product, region, amount

349

350

# Simple aggregation

351

total_by_region = etl.aggregate(sales, 'region', sum, 'amount')

352

353

# Multiple aggregations

354

summary = etl.aggregate(sales, 'region', {

355

'total_amount': ('amount', sum),

356

'avg_amount': ('amount', lambda vals: sum(vals) / len(vals)),

357

'count': len

358

})

359

360

# Group by multiple fields

361

regional_product = etl.aggregate(sales, ('region', 'product'), sum, 'amount')

362

```

363

364

### Custom Aggregation Functions

365

366

```python

367

import petl as etl

368

from statistics import median, stdev

369

370

sales = etl.fromcsv('sales.csv')

371

372

# Custom aggregation with multiple statistics

373

stats_summary = etl.aggregate(sales, 'product', {

374

'total_sales': ('amount', sum),

375

'avg_sales': ('amount', lambda vals: sum(vals) / len(vals)),

376

'median_sales': ('amount', median),

377

'std_dev': ('amount', lambda vals: stdev(vals) if len(vals) > 1 else 0),

378

'transaction_count': len,

379

'max_sale': ('amount', max),

380

'min_sale': ('amount', min)

381

})

382

```

383

384

### Row Reduction

385

386

```python

387

import petl as etl

388

389

transactions = etl.fromcsv('transactions.csv')

390

391

# Custom row reducer

392

def combine_transactions(rows):

393

"""Combine multiple transaction rows into summary row."""

394

if not rows:

395

return None

396

397

total_amount = sum(row[2] for row in rows) # amount field

398

transaction_count = len(rows)

399

avg_amount = total_amount / transaction_count

400

401

return (rows[0][0], rows[0][1], total_amount, transaction_count, avg_amount)

402

403

# Apply row reduction

404

summary = etl.rowreduce(transactions, 'customer_id', combine_transactions,

405

header=['customer_id', 'customer_name', 'total_amount',

406

'transaction_count', 'avg_amount'])

407

```

408

409

### Deduplication and Uniqueness

410

411

```python

412

import petl as etl

413

414

customers = etl.fromcsv('customers.csv')

415

416

# Remove duplicate customers

417

unique_customers = etl.unique(customers, key='email')

418

419

# Find duplicate entries

420

duplicates = etl.duplicates(customers, key='email')

421

422

# Get distinct customers with count

423

distinct_with_count = etl.distinct(customers, key='email', count='duplicate_count')

424

425

# Merge duplicate records

426

merged = etl.mergeduplicates(customers, key='email')

427

```

428

429

### Group Selection

430

431

```python

432

import petl as etl

433

434

orders = etl.fromcsv('orders.csv') # customer_id, order_date, amount

435

436

# Get first order for each customer

437

first_orders = etl.groupselectfirst(orders, 'customer_id')

438

439

# Get latest order for each customer (assuming sorted by date)

440

latest_orders = etl.groupselectlast(orders, 'customer_id')

441

442

# Get highest value order for each customer

443

highest_orders = etl.groupselectmax(orders, 'customer_id', 'amount')

444

445

# Get lowest value order for each customer

446

lowest_orders = etl.groupselectmin(orders, 'customer_id', 'amount')

447

```

448

449

### Value Counting and Analysis

450

451

```python

452

import petl as etl

453

454

survey = etl.fromcsv('survey.csv')

455

456

# Count values in a field

457

age_counts = etl.valuecounts(survey, 'age_group')

458

459

# Count combinations of fields

460

region_gender_counts = etl.valuecounts(survey, 'region', 'gender')

461

462

# Get counter object for programmatic access

463

age_counter = etl.valuecounter(survey, 'age_group')

464

print(f"Most common age group: {age_counter.most_common(1)[0]}")

465

466

# Analyze data types in a field

467

type_analysis = etl.typecounts(survey, 'income')

468

469

# Test parsing capabilities

470

parsing_analysis = etl.parsecounts(survey, 'income', [

471

('int', int),

472

('float', float),

473

('currency', lambda x: float(x.replace('$', '').replace(',', '')))

474

])

475

```

476

477

### Conflict Detection

478

479

```python

480

import petl as etl

481

482

customer_data = etl.fromcsv('customer_updates.csv')

483

484

# Find conflicting customer information

485

conflicts = etl.conflicts(customer_data, 'customer_id',

486

exclude=['last_updated'])

487

488

# Check if customer IDs are unique

489

id_unique = etl.isunique(customer_data, 'customer_id')

490

491

if not id_unique:

492

print("Warning: Customer IDs are not unique!")

493

duplicate_ids = etl.duplicates(customer_data, 'customer_id')

494

```

495

496

### Advanced Aggregation Patterns

497

498

```python

499

import petl as etl

500

from datetime import datetime

501

502

sales = etl.fromcsv('sales.csv')

503

504

# Time-based aggregation with date parsing

505

sales_with_date = etl.convert(sales, 'date', lambda x: datetime.strptime(x, '%Y-%m-%d'))

506

monthly_sales = etl.aggregate(

507

etl.addfield(sales_with_date, 'month', lambda row: row.date.strftime('%Y-%m')),

508

'month',

509

sum,

510

'amount'

511

)

512

513

# Rolling/window aggregation using fold

514

def rolling_average(accumulator, row):

515

"""Calculate rolling 3-period average."""

516

if accumulator is None:

517

accumulator = []

518

519

accumulator.append(row)

520

if len(accumulator) > 3:

521

accumulator.pop(0)

522

523

avg = sum(r.amount for r in accumulator) / len(accumulator)  # NOTE: avg is discarded here — fold propagates only the returned accumulator, so store it on the accumulator if you need it in the output

524

return accumulator # Keep accumulator for next iteration

525

526

# Complex multi-level aggregation

527

hierarchical = etl.aggregate(sales, ('region', 'product', 'quarter'), {

528

'total_sales': ('amount', sum),

529

'unit_count': ('units', sum),

530

'avg_price': (lambda rows: sum(r.amount for r in rows) / sum(r.units for r in rows))

531

})

532

```