or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

binary-operations.mddata-transformations.mdexecution-environment.mdgrouping-aggregation.mdindex.mdinput-output.mdpartitioning-distribution.mdtype-system.mdutilities.md

grouping-aggregation.mddocs/

0

# Grouping and Aggregation

1

2

Group-wise operations and aggregation functions for analyzing and summarizing grouped data. These operations are essential for data analytics and reporting workflows.

3

4

## Capabilities

5

6

### DataSet Grouping

7

8

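Create a `GroupedDataSet` for group-wise operations using one of three key selection strategies: a key selector function, field positions (tuples and case classes), or field names. If you plan to use the built-in aggregations (`sum`, `min`, `max`, `aggregate`), choose a field-based strategy, because those aggregations do not support key selector groupings.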

9

10

```scala { .api }

11

class DataSet[T] {
  /**
   * Groups elements by the key extracted with a key selector function.
   *
   * Note: a grouping defined by a key selector cannot be used with the
   * built-in aggregations (sum, min, max, aggregate), nor with group sorting
   * by field position/name. Use `groupBy(fields: Int*)` or
   * `groupBy(firstField, otherFields*)` when you need those.
   *
   * @tparam K Type of the extracted key (requires implicit TypeInformation)
   * @param fun Key selector function
   * @return GroupedDataSet for group-wise operations
   */
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]

  /**
   * Groups elements by 0-based field positions.
   * Only valid for tuple and case class element types.
   *
   * @param fields Field positions to group by
   * @return GroupedDataSet for group-wise operations
   */
  def groupBy(fields: Int*): GroupedDataSet[T]

  /**
   * Groups elements by field names (field expressions), e.g. the case class
   * field name "category".
   *
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return GroupedDataSet for group-wise operations
   */
  def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}

34

```

35

36

**Usage Examples:**

37

38

```scala

39

import org.apache.flink.api.scala._

case class Sale(product: String, category: String, amount: Double, date: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val sales = env.fromElements(
  Sale("Laptop", "Electronics", 999.99, "2023-01-15"),
  Sale("Phone", "Electronics", 599.99, "2023-01-16"),
  Sale("Chair", "Furniture", 149.99, "2023-01-15"),
  Sale("Desk", "Furniture", 299.99, "2023-01-16")
)

// Group by category with a key selector function
val byCategory = sales.groupBy(_.category)

// Group by category with a field name. Unlike a key selector grouping,
// this form also supports the built-in aggregations (sum, min, max).
val byCategoryName = sales.groupBy("category")

// Group by multiple fields (composite key)
val byCategoryAndDate = sales.groupBy(s => (s.category, s.date))

// Group by field positions (for tuples/case classes)
val salesTuples = sales.map(s => (s.category, s.amount))
val groupedTuples = salesTuples.groupBy(0) // Group by first field

60

```

61

62

### Group Reductions

63

64

Combine the elements of each group pairwise with a reduce function until a single element per group remains. The result has the same type as the input elements.

65

66

```scala { .api }

67

class GroupedDataSet[T] {
  /**
   * Collapses every group into one element by repeatedly merging pairs of
   * its elements with `fun`.
   *
   * @param fun Merges two elements of the same group into one
   * @return DataSet holding one reduced element per group
   */
  def reduce(fun: (T, T) => T): DataSet[T]

  /**
   * Same as `reduce(fun)`, but also passes a hint telling the runtime
   * which combine strategy to use as an optimization.
   *
   * @param fun Merges two elements of the same group into one
   * @param strategy Requested combine strategy (see CombineHint)
   * @return DataSet holding one reduced element per group
   */
  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T]

  /**
   * Same as `reduce(fun)`, but takes the merge logic as a ReduceFunction
   * instance instead of a Scala function literal.
   *
   * @param reducer ReduceFunction merging two elements of a group
   * @return DataSet holding the reduced elements
   */
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
}

90

```

91

92

### Group Processing

93

94

Process entire groups using iterators for complex group-wise computations.

95

96

```scala { .api }

97

class GroupedDataSet[T] {
  /**
   * Hands all elements of a group to `fun` as an iterator and emits the
   * single value `fun` returns for that group.
   *
   * @tparam R Type of the per-group result
   * @param fun Maps the elements of one group to one result
   * @return DataSet with one result per group
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]

  /**
   * Hands all elements of a group to `fun` together with a Collector, so
   * the function decides how many results to emit for that group.
   *
   * @tparam R Type of the emitted results
   * @param fun Consumes the group iterator and writes results to the collector
   * @return DataSet with everything collected across all groups
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Variant of reduceGroup that takes a GroupReduceFunction implementation
   * instead of a Scala function literal.
   *
   * @tparam R Type of the emitted results
   * @param reducer GroupReduceFunction applied to every group
   * @return DataSet with the group reduction results
   */
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}

119

```

120

121

**Usage Examples:**

122

123

```scala

124

// Calculate total sales per category, keeping each total next to its category
// (a bare sum would lose the information which category it belongs to)
val categoryTotals = sales
  .groupBy(_.category)
  .reduceGroup { salesInCategory =>
    val salesList = salesInCategory.toList
    // A group always contains at least one element, so head is safe here
    (salesList.head.category, salesList.map(_.amount).sum)
  }

// Complex group processing
case class CategoryStats(category: String, totalAmount: Double, count: Int, avgAmount: Double)

val categoryStats = sales
  .groupBy(_.category)
  .reduceGroup { salesInCategory =>
    // An Iterator can be traversed only once, so materialize it before
    // deriving several statistics from it
    val salesList = salesInCategory.toList
    val total = salesList.map(_.amount).sum
    val count = salesList.length
    CategoryStats(salesList.head.category, total, count, total / count)
  }

140

```

141

142

### Group Combining

143

144

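Pre-aggregate elements within each partition, without shuffling data, to reduce the amount of data sent to a later full group reduction. A combine function may see only part of a group, so its output can contain several partial results per key. Follow it with a full `reduceGroup` (or `reduce`) on the same key to obtain final results.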

145

146

```scala { .api }

147

class GroupedDataSet[T] {
  /**
   * Combines the elements of each group locally, within each partition,
   * before any data is exchanged between partitions.
   *
   * The function may only see part of a group, so the result can contain
   * several partial results for the same key. Apply a full group reduction
   * afterwards (e.g. `groupBy(...).reduceGroup(...)`) to get one final
   * result per key.
   *
   * @tparam R Type of the combined results
   * @param fun Function receiving a (possibly partial) group iterator and a result collector
   * @return DataSet with partially combined results
   */
  def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Applies a GroupCombineFunction to (possibly partial) groups within each
   * partition. As above, the output may hold several partial results per key.
   *
   * @tparam R Type of the combined results
   * @param combiner GroupCombineFunction implementation
   * @return DataSet with partially combined results
   */
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}

162

```

163

164

### Built-in Aggregations

165

166

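Convenient aggregation functions for common operations such as sum, min, and max. They work only on tuple and case class types, and only on groupings defined by field positions or field names (for example `groupBy("category")`). A grouping created with a key selector function (`groupBy(_.category)`) is rejected with an `InvalidProgramException`.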

167

168

```scala { .api }

169

class GroupedDataSet[T] {
  // Restrictions shared by all built-in aggregations below:
  //  - the element type must be a tuple or case class type, and
  //  - the grouping must be defined by field positions or field names;
  //    a grouping created with a key selector function (groupBy(_.key))
  //    is rejected with an InvalidProgramException.
  // Only the named field is aggregated. The other fields of a result are
  // not guaranteed to come from the element holding the min/max value; use
  // minBy/maxBy to select whole elements instead.

  /**
   * Sums values in the given field across each group.
   * @param field 0-based field position to sum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def sum(field: Int): AggregateDataSet[T]

  /**
   * Sums values in the named field across each group.
   * @param field Field name to sum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def sum(field: String): AggregateDataSet[T]

  /**
   * Computes the maximum value of the given field across each group.
   * @param field 0-based field position for maximum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def max(field: Int): AggregateDataSet[T]

  /**
   * Computes the maximum value of the named field across each group.
   * @param field Field name for maximum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def max(field: String): AggregateDataSet[T]

  /**
   * Computes the minimum value of the given field across each group.
   * @param field 0-based field position for minimum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def min(field: Int): AggregateDataSet[T]

  /**
   * Computes the minimum value of the named field across each group.
   * @param field Field name for minimum
   * @return AggregateDataSet for chaining additional aggregations
   */
  def min(field: String): AggregateDataSet[T]

  /**
   * Applies the given aggregation to a field across each group.
   * @param agg Aggregation type (SUM, MAX, MIN)
   * @param field 0-based field position for aggregation
   * @return AggregateDataSet for chaining additional aggregations
   */
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]

  /**
   * Applies the given aggregation to a named field across each group.
   * @param agg Aggregation type (SUM, MAX, MIN)
   * @param field Field name for aggregation
   * @return AggregateDataSet for chaining additional aggregations
   */
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}

228

```

229

230

### Element Selection

231

232

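Select whole elements from each group. `minBy`/`maxBy` return the element with the minimum or maximum value in the given fields; they take 0-based field positions and require tuple or case class types. `first(n)` returns `n` elements per group, which are arbitrary unless the group is sorted first with `sortGroup`.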

233

234

```scala { .api }

235

class GroupedDataSet[T] {
  /**
   * Selects, from each group, the element with the minimum values in the
   * given fields (compared in the order the fields are listed).
   *
   * Fields are 0-based positions; there is no field-name overload.
   * Only supported for tuple and case class element types.
   *
   * @param fields Field positions for minimum comparison
   * @return DataSet with one minimum element per group
   */
  def minBy(fields: Int*): DataSet[T]

  /**
   * Selects, from each group, the element with the maximum values in the
   * given fields (compared in the order the fields are listed).
   *
   * Fields are 0-based positions; there is no field-name overload.
   * Only supported for tuple and case class element types.
   *
   * @param fields Field positions for maximum comparison
   * @return DataSet with one maximum element per group
   */
  def maxBy(fields: Int*): DataSet[T]

  /**
   * Selects n elements from each group. Without a preceding sortGroup the
   * selected elements are arbitrary; sort the group first to get a
   * deterministic "top n".
   *
   * @param n Number of elements to select per group
   * @return DataSet with up to n elements per group
   */
  def first(n: Int): DataSet[T]
}

257

```

258

259

**Usage Examples:**

260

261

```scala

262

import org.apache.flink.api.common.operators.Order

// Find highest sale in each category.
// maxBy takes 0-based field positions only; `amount` is field 2 of Sale.
val highestSalePerCategory = sales
  .groupBy(_.category)
  .maxBy(2)

// Get top 2 sales per category. Sort each group by amount (descending)
// first; otherwise first(2) returns two arbitrary elements per group.
// A key selector grouping must be sorted with a key selector as well.
val top2PerCategory = sales
  .groupBy(_.category)
  .sortGroup(_.amount, Order.DESCENDING)
  .first(2)

271

```

272

273

### Group Sorting

274

275

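Sort elements within each group before processing (for example before `reduceGroup` or `first`). The sort key must match the grouping style. A grouping defined by a key selector function can only be sorted with a key selector function. Groupings defined by field positions or names are sorted by field positions or names.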

276

277

```scala { .api }

278

class GroupedDataSet[T] {
  /**
   * Sorts elements within each group by a 0-based field position.
   * Not allowed on a grouping defined by a key selector function.
   * Can be chained to sort on several fields.
   *
   * @param field Field position for sorting
   * @param order Sort order (ASCENDING or DESCENDING)
   * @return Sorted GroupedDataSet
   */
  def sortGroup(field: Int, order: Order): GroupedDataSet[T]

  /**
   * Sorts elements within each group by field name.
   * Not allowed on a grouping defined by a key selector function.
   * Can be chained to sort on several fields.
   *
   * @param field Field name for sorting
   * @param order Sort order (ASCENDING or DESCENDING)
   * @return Sorted GroupedDataSet
   */
  def sortGroup(field: String, order: Order): GroupedDataSet[T]

  /**
   * Sorts elements within each group by a key extracted with a key selector.
   * Only allowed on a grouping that was itself defined by a key selector
   * function, and cannot be chained with another sortGroup call.
   *
   * @tparam K Type of the sort key (requires implicit TypeInformation)
   * @param fun Key selector function for sorting
   * @param order Sort order (ASCENDING or DESCENDING)
   * @return Sorted GroupedDataSet
   */
  def sortGroup[K: TypeInformation](fun: T => K, order: Order): GroupedDataSet[T]
}

303

```

304

305

### Custom Partitioning

306

307

Control how groups are distributed across cluster nodes.

308

309

```scala { .api }

310

class GroupedDataSet[T] {
  /**
   * Routes groups to partitions using a user-supplied Partitioner applied to
   * the grouping key, instead of the built-in key distribution.
   *
   * @tparam K Key type handled by the partitioner; must match the grouping key type
   * @param partitioner Maps a key and the partition count to a partition index
   * @return GroupedDataSet that distributes groups with the custom partitioner
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): GroupedDataSet[T]
}

318

```

319

320

## Chained Aggregations

321

322

### AggregateDataSet Operations

323

324

Chain several aggregations on different fields of the same grouping, for example summing one field and taking the maximum of another.

325

326

```scala { .api }

327

class AggregateDataSet[T] {
  // Each method appends one more aggregation to the chain started by
  // sum/min/max/aggregate and returns the extended AggregateDataSet.
  // Overloads are listed in pairs: 0-based field position, then field name.

  /**
   * Appends an aggregation of the given kind on a field position.
   * @param agg Aggregation type
   * @param field Field position to aggregate
   * @return AggregateDataSet for further chaining
   */
  def and(agg: Aggregations, field: Int): AggregateDataSet[T]

  /**
   * Appends an aggregation of the given kind on a named field.
   * @param agg Aggregation type
   * @param field Field name to aggregate
   * @return AggregateDataSet for further chaining
   */
  def and(agg: Aggregations, field: String): AggregateDataSet[T]

  /**
   * Appends a sum over a field position.
   * @param field Field position to sum
   * @return AggregateDataSet for further chaining
   */
  def andSum(field: Int): AggregateDataSet[T]

  /**
   * Appends a sum over a named field.
   * @param field Field name to sum
   * @return AggregateDataSet for further chaining
   */
  def andSum(field: String): AggregateDataSet[T]

  /**
   * Appends a maximum over a field position.
   * @param field Field position for maximum
   * @return AggregateDataSet for further chaining
   */
  def andMax(field: Int): AggregateDataSet[T]

  /**
   * Appends a maximum over a named field.
   * @param field Field name for maximum
   * @return AggregateDataSet for further chaining
   */
  def andMax(field: String): AggregateDataSet[T]

  /**
   * Appends a minimum over a field position.
   * @param field Field position for minimum
   * @return AggregateDataSet for further chaining
   */
  def andMin(field: Int): AggregateDataSet[T]

  /**
   * Appends a minimum over a named field.
   * @param field Field name for minimum
   * @return AggregateDataSet for further chaining
   */
  def andMin(field: String): AggregateDataSet[T]
}

386

```

387

388

**Usage Examples:**

389

390

```scala

391

// Multiple aggregations on grouped data
case class SalesRecord(product: String, region: String, quantity: Int, revenue: Double)

val salesData = env.fromElements(
  SalesRecord("Laptop", "North", 10, 9999.90),
  SalesRecord("Phone", "North", 25, 14999.75),
  SalesRecord("Laptop", "South", 15, 14999.85)
)

// Sum both quantity and revenue by region.
// Built-in aggregations require a field-based grouping: groupBy("region")
// works, whereas a key selector grouping (groupBy(_.region)) is rejected
// with an InvalidProgramException.
// The non-aggregated `product` field of each result is not meaningful.
val regionStats = salesData
  .groupBy("region")
  .sum("quantity")
  .andSum("revenue")

405

```

406

407

## Types

408

409

```scala { .api }

410

// GroupedDataSet does NOT extend DataSet[T]: it wraps a DataSet together
// with its grouping keys and only offers grouping-specific operations.
// Calling one of them (reduce, reduceGroup, sum, first, ...) yields a DataSet again.
class GroupedDataSet[T] {
}

// Result of a built-in aggregation: a regular DataSet[T] that can also be
// extended with further aggregations (and, andSum, andMin, andMax).
class AggregateDataSet[T] extends DataSet[T] {
}

// Strategy hint for the combine step of reduce(fun, strategy).
sealed trait CombineHint
object CombineHint {
  case object OPTIMIZER_CHOOSES extends CombineHint // leave the choice to the optimizer
  case object SORT extends CombineHint              // sort-based combine strategy
  case object HASH extends CombineHint              // hash-based combine strategy
  case object NONE extends CombineHint              // do not use a combiner
}

// Aggregation kinds accepted by aggregate(...) and AggregateDataSet.and(...).
sealed trait Aggregations
object Aggregations {
  case object SUM extends Aggregations
  case object MAX extends Aggregations
  case object MIN extends Aggregations
}

// Sort order; sortGroup is normally called with ASCENDING or DESCENDING.
sealed trait Order
object Order {
  case object NONE extends Order
  case object ASCENDING extends Order
  case object DESCENDING extends Order
  case object ANY extends Order
}

// User-defined partitioner: maps a key to a partition index in [0, numPartitions).
trait Partitioner[K] extends Serializable {
  def partition(key: K, numPartitions: Int): Int
}

438

```