or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-sources-sinks.md · execution-environment.md · grouping-aggregation.md · hadoop-integration.md · index.md · iterations.md · joins-cogroups.md · transformations.md · type-system.md

docs/grouping-aggregation.md

# Grouping and Aggregation

Apache Flink Scala API provides powerful grouping and aggregation operations with type-safe field access and functional programming patterns. Data can be grouped by keys and then aggregated using built-in functions or custom reduce operations.

## Grouping Operations

### GroupBy

Group elements by key for subsequent aggregation operations.

```scala { .api }
class DataSet[T] {
  // Group by key function
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]

  // Group by field positions (for tuples/case classes)
  def groupBy(fields: Int*): GroupedDataSet[T]

  // Group by field names (for case classes)
  def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}
```

## Aggregation Operations

### Basic Aggregations

Perform common aggregation operations on grouped data.

```scala { .api }
class GroupedDataSet[T] {
  // Sum values by field
  def sum(field: Int): DataSet[T]
  def sum(field: String): DataSet[T]

  // Find maximum values by field
  def max(field: Int): DataSet[T]
  def max(field: String): DataSet[T]

  // Find minimum values by field
  def min(field: Int): DataSet[T]
  def min(field: String): DataSet[T]

  // Find records with maximum field values
  def maxBy(fields: Int*): DataSet[T]
  def maxBy(firstField: String, otherFields: String*): DataSet[T]

  // Find records with minimum field values
  def minBy(fields: Int*): DataSet[T]
  def minBy(firstField: String, otherFields: String*): DataSet[T]
}
```

### Generic Aggregation

```scala { .api }
class GroupedDataSet[T] {
  // Generic aggregation with Aggregations enum
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]

  // Chain multiple aggregations
  class AggregateDataSet[T] {
    def and(agg: Aggregations, field: Int): AggregateDataSet[T]
    def and(agg: Aggregations, field: String): AggregateDataSet[T]
  }
}

// Direct aggregation on ungrouped DataSet
class DataSet[T] {
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}
```

### Partitioning Configuration

```scala { .api }
class GroupedDataSet[T] {
  // Configure custom partitioner for grouping
  def withPartitioner[K : TypeInformation](partitioner: Partitioner[K]): GroupedDataSet[T]
}
```

## Reduce Operations

### Basic Reduce

```scala { .api }
class GroupedDataSet[T] {
  // Reduce groups using function
  def reduce(fun: (T, T) => T): DataSet[T]

  // Reduce groups using ReduceFunction
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
}
```

### Group Reduce

Transform entire groups at once, providing access to all elements in each group.

```scala { .api }
class GroupedDataSet[T] {
  // Process entire group with function returning single result
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]

  // Process entire group with function returning multiple results
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]

  // Process entire group with Collector for output
  def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  // Process entire group with GroupReduceFunction
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}

// Direct group reduce on ungrouped DataSet
class DataSet[T] {
  def aggregateGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}
```

### Combine Functions

Pre-aggregate data locally before shuffling for better performance.

```scala { .api }
class GroupedDataSet[T] {
  // Combine with function returning single result
  def combineGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]

  // Combine with function returning multiple results
  def combineGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]

  // Combine with Collector for output
  def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  // Combine with GroupCombineFunction
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}
```

## Sorted Grouping

### Sort Groups

Sort elements within each group before processing.

```scala { .api }
class GroupedDataSet[T] {
  // Sort by key function
  def sortGroup[K: TypeInformation](fun: T => K, order: Order): SortedGrouping[T]

  // Sort by field position
  def sortGroup(field: Int, order: Order): SortedGrouping[T]

  // Sort by field name
  def sortGroup(field: String, order: Order): SortedGrouping[T]
}

class SortedGrouping[T] {
  // Chain additional sort keys
  def sortGroup[K: TypeInformation](fun: T => K, order: Order): SortedGrouping[T]
  def sortGroup(field: Int, order: Order): SortedGrouping[T]
  def sortGroup(field: String, order: Order): SortedGrouping[T]

  // Reduce sorted groups
  def reduce(fun: (T, T) => T): DataSet[T]
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}
```

### First N Elements

Get the first n elements from each group.

```scala { .api }
class GroupedDataSet[T] {
  def first(n: Int): DataSet[T]
}
```

## Usage Examples

### Basic Grouping and Aggregation

```scala
import org.apache.flink.api.scala._

case class Sale(product: String, region: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val sales = env.fromElements(
  Sale("laptop", "US", 1000.0),
  Sale("phone", "US", 500.0),
  Sale("laptop", "EU", 800.0),
  Sale("phone", "EU", 450.0),
  Sale("tablet", "US", 300.0)
)

// Group by product and sum amounts
val productSales = sales
  .groupBy(_.product)
  .sum("amount")

// Group by region and find max sale
val maxSalesByRegion = sales
  .groupBy(_.region)
  .maxBy("amount")
```

### Multiple Field Grouping

```scala
import org.apache.flink.api.scala._

case class Transaction(date: String, product: String, region: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val transactions = env.fromElements(
  Transaction("2023-01-01", "laptop", "US", 1000.0),
  Transaction("2023-01-01", "laptop", "EU", 800.0),
  Transaction("2023-01-02", "laptop", "US", 1200.0)
)

// Group by multiple fields
val dailyProductSales = transactions
  .groupBy(t => (t.date, t.product))
  .sum("amount")

// Group by field names
val regionProductSales = transactions
  .groupBy("region", "product")
  .aggregate(Aggregations.SUM, "amount")
```

### Custom Reduce Operations

```scala
import org.apache.flink.api.scala._

case class Employee(name: String, department: String, salary: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val employees = env.fromElements(
  Employee("Alice", "Engineering", 80000),
  Employee("Bob", "Engineering", 75000),
  Employee("Charlie", "Sales", 60000),
  Employee("Diana", "Sales", 65000)
)

// Find highest paid employee per department
val topEarners = employees
  .groupBy(_.department)
  .reduce((e1, e2) => if (e1.salary > e2.salary) e1 else e2)

// Calculate average salary per department
case class DeptAverage(department: String, avgSalary: Double, count: Int)

val avgSalaries = employees
  .groupBy(_.department)
  .reduceGroup { employees =>
    val list = employees.toList
    val dept = list.head.department
    val total = list.map(_.salary).sum
    val count = list.length
    DeptAverage(dept, total / count, count)
  }
```

### Group Reduce with Multiple Results

```scala
import org.apache.flink.api.scala._

case class Order(customer: String, product: String, quantity: Int, price: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val orders = env.fromElements(
  Order("Alice", "laptop", 1, 1000.0),
  Order("Alice", "mouse", 2, 20.0),
  Order("Bob", "phone", 1, 500.0),
  Order("Bob", "case", 1, 15.0)
)

// Create customer summary with multiple metrics
case class CustomerSummary(customer: String, totalSpent: Double, itemCount: Int, avgOrderValue: Double)

val customerSummaries = orders
  .groupBy(_.customer)
  .reduceGroup { orders =>
    val orderList = orders.toList
    val customer = orderList.head.customer
    val totalSpent = orderList.map(o => o.quantity * o.price).sum
    val itemCount = orderList.map(_.quantity).sum
    val avgOrderValue = totalSpent / orderList.length
    CustomerSummary(customer, totalSpent, itemCount, avgOrderValue)
  }
```

### Sorted Grouping

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

case class Score(player: String, game: String, score: Int, timestamp: Long)

val env = ExecutionEnvironment.getExecutionEnvironment
val scores = env.fromElements(
  Score("Alice", "game1", 100, 1000L),
  Score("Alice", "game1", 150, 2000L),
  Score("Alice", "game1", 120, 3000L),
  Score("Bob", "game1", 90, 1500L),
  Score("Bob", "game1", 110, 2500L)
)

// Get best score per player (latest timestamp in case of ties)
val bestScores = scores
  .groupBy(_.player)
  .sortGroup(_.score, Order.DESCENDING)
  .sortGroup(_.timestamp, Order.DESCENDING)
  .first(1)

// Get top 3 scores per player
val top3Scores = scores
  .groupBy(_.player)
  .sortGroup(_.score, Order.DESCENDING)
  .first(3)
```

### Chained Aggregations

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.java.aggregation.Aggregations

case class Metrics(region: String, product: String, sales: Double, profit: Double, units: Int)

val env = ExecutionEnvironment.getExecutionEnvironment
val metrics = env.fromElements(
  Metrics("US", "laptop", 1000.0, 200.0, 1),
  Metrics("US", "phone", 500.0, 100.0, 1),
  Metrics("EU", "laptop", 800.0, 150.0, 1)
)

// Aggregate multiple fields
val regionalSummary = metrics
  .groupBy(_.region)
  .aggregate(Aggregations.SUM, "sales")
  .and(Aggregations.SUM, "profit")
  .and(Aggregations.SUM, "units")
```