or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

dataset-operations.mdexecution-environment.mdextensions.mdgrouped-dataset-operations.mdindex.mdjoin-operations.mdtype-system.mdutility-functions.md

grouped-dataset-operations.mddocs/

0

# Grouped DataSet Operations

1

2

Grouped DataSets are created by applying groupBy operations to DataSets. They provide specialized operations for processing data within groups.

3

4

## Creating Grouped DataSets

5

6

```scala { .api }

7

class DataSet[T] {

8

def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]

9

def groupBy(fields: Int*): GroupedDataSet[T]

10

def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]

11

}

12

```

13

14

### Grouping Examples

15

16

```scala

17

case class Sale(region: String, product: String, amount: Double, date: String)

18

19

val sales = env.fromElements(

20

Sale("US", "ProductA", 100.0, "2023-01-01"),

21

Sale("EU", "ProductA", 150.0, "2023-01-01"),

22

Sale("US", "ProductB", 200.0, "2023-01-02"),

23

Sale("EU", "ProductB", 120.0, "2023-01-02")

24

)

25

26

// Group by key function

27

val groupedByRegion = sales.groupBy(_.region)

28

29

// Group by field position (for tuples)

30

val tuples = env.fromElements(("US", 100), ("EU", 150), ("US", 200))

31

val groupedByPosition = tuples.groupBy(0)

32

33

// Group by field name (for case classes)

34

val groupedByName = sales.groupBy("region")

35

36

// Group by multiple fields

37

val groupedByRegionAndProduct = sales.groupBy(s => (s.region, s.product))

38

val groupedByMultipleFields = sales.groupBy("region", "product")

39

```

40

41

## Sorting Within Groups

42

43

```scala { .api }

44

class GroupedDataSet[T] {

45

def sortGroup(field: Int, order: Order): GroupedDataSet[T]

46

def sortGroup(field: String, order: Order): GroupedDataSet[T]

47

def sortGroup[K: TypeInformation](fun: T => K, order: Order): GroupedDataSet[T]

48

}

49

```

50

51

### Sorting Examples

52

53

```scala

54

import org.apache.flink.api.common.operators.Order

55

56

val sales = env.fromElements(

57

("US", 100, "2023-01-01"),

58

("US", 200, "2023-01-02"),

59

("EU", 150, "2023-01-01")

60

)

61

62

// Sort by field position within groups

63

val sortedByAmount = sales

64

.groupBy(0) // Group by region

65

.sortGroup(1, Order.DESCENDING) // Sort by amount within each group

66

67

// Sort by field name

68

case class Transaction(region: String, amount: Double, date: String)

69

val transactions = env.fromElements(

70

Transaction("US", 100.0, "2023-01-01"),

71

Transaction("US", 200.0, "2023-01-02")

72

)

73

74

val sortedTransactions = transactions

75

.groupBy("region")

76

.sortGroup("amount", Order.ASCENDING)

77

78

// Sort by key function

79

val sortedByDate = transactions

80

.groupBy(_.region)

81

.sortGroup(_.date, Order.DESCENDING)

82

```

83

84

## Aggregation Operations

85

86

### Basic Aggregations

87

88

```scala { .api }

89

class GroupedDataSet[T] {

90

def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]

91

def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]

92

def sum(field: Int): AggregateDataSet[T]

93

def sum(field: String): AggregateDataSet[T]

94

def max(field: Int): AggregateDataSet[T]

95

def max(field: String): AggregateDataSet[T]

96

def min(field: Int): AggregateDataSet[T]

97

def min(field: String): AggregateDataSet[T]

98

}

99

```

100

101

### Aggregation Examples

102

103

```scala

104

val salesData = env.fromElements(

105

("US", "ProductA", 100.0),

106

("US", "ProductB", 200.0),

107

("EU", "ProductA", 150.0),

108

("EU", "ProductB", 120.0)

109

)

110

111

// Sum sales by region

112

val totalByRegion = salesData

113

.groupBy(0) // Group by region

114

.sum(2) // Sum the amount field

115

116

// Max sales by region

117

val maxByRegion = salesData

118

.groupBy(0)

119

.max(2)

120

121

// Min sales by region

122

val minByRegion = salesData

123

.groupBy(0)

124

.min(2)

125

126

// Multiple aggregations

127

val stats = salesData

128

.groupBy(0)

129

.aggregate(Aggregations.SUM, 2)

130

.and(Aggregations.MAX, 2)

131

.and(Aggregations.MIN, 2)

132

```

133

134

### MaxBy and MinBy

135

136

```scala { .api }

137

class GroupedDataSet[T] {

138

def maxBy(fields: Int*): DataSet[T]

139

def maxBy(firstField: String, otherFields: String*): DataSet[T]

140

def minBy(fields: Int*): DataSet[T]

141

def minBy(firstField: String, otherFields: String*): DataSet[T]

142

}

143

```

144

145

```scala

146

val employeeData = env.fromElements(

147

("Engineering", "Alice", 95000),

148

("Engineering", "Bob", 85000),

149

("Sales", "Charlie", 75000),

150

("Sales", "David", 80000)

151

)

152

153

// Get highest paid employee per department

154

val highestPaid = employeeData

155

.groupBy(0) // Group by department

156

.maxBy(2) // Max by salary

157

158

// Get lowest paid employee per department

159

val lowestPaid = employeeData

160

.groupBy(0)

161

.minBy(2)

162

163

// MaxBy with multiple fields (for tie-breaking)

164

val employeesByName = env.fromElements(

165

("Dept1", "Alice", 80000, 5), // (dept, name, salary, years)

166

("Dept1", "Bob", 80000, 3),

167

("Dept2", "Charlie", 75000, 7)

168

)

169

170

// Max by salary, then by years of experience

171

val seniorHighestPaid = employeesByName

172

.groupBy(0)

173

.maxBy(2, 3) // Max by salary, then by years

174

```

175

176

## Reduce Operations

177

178

### Simple Reduce

179

180

```scala { .api }

181

class GroupedDataSet[T] {

182

def reduce(fun: (T, T) => T): DataSet[T]

183

def reduce(reducer: ReduceFunction[T]): DataSet[T]

184

}

185

```

186

187

```scala

188

val numbers = env.fromElements(

189

("A", 1), ("A", 2), ("A", 3),

190

("B", 4), ("B", 5)

191

)

192

193

// Sum numbers within each group

194

val groupSum = numbers

195

.groupBy(0)

196

.reduce((a, b) => (a._1, a._2 + b._2))

197

198

// Find maximum within each group

199

val groupMax = numbers

200

.groupBy(0)

201

.reduce((a, b) => if (a._2 > b._2) a else b)

202

```

203

204

### Group Reduce

205

206

```scala { .api }

207

class GroupedDataSet[T] {

208

def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]

209

def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

210

def reduceGroup[R: TypeInformation: ClassTag](groupReducer: GroupReduceFunction[T, R]): DataSet[R]

211

}

212

```

213

214

```scala

215

val studentGrades = env.fromElements(

216

("Math", "Alice", 85),

217

("Math", "Bob", 90),

218

("Math", "Charlie", 78),

219

("Science", "Alice", 92),

220

("Science", "Bob", 88)

221

)

222

223

// Calculate statistics per subject

224

val subjectStats = studentGrades

225

.groupBy(0) // Group by subject

226

.reduceGroup { iterator =>

227

val records = iterator.toList

228

val grades = records.map(_._3)
val subject = records.headOption.map(_._1).getOrElse("Unknown")

229

val avg = grades.sum.toDouble / grades.length

230

val max = grades.max

231

val min = grades.min

232

val count = grades.length

233

(subject, avg, max, min, count)

234

}

235

236

// Multiple output per group

237

val gradeRanges = studentGrades

238

.groupBy(0)

239

.reduceGroup { (iterator, collector) =>

240

val gradesList = iterator.toList

241

val subject = gradesList.head._1

242

243

gradesList.foreach { case (_, student, grade) =>

244

val range = grade match {

245

case g if g >= 90 => "A"

246

case g if g >= 80 => "B"

247

case g if g >= 70 => "C"

248

case _ => "D"

249

}

250

collector.collect((subject, student, grade, range))

251

}

252

}

253

```

254

255

### Combine Group

256

257

```scala { .api }

258

class GroupedDataSet[T] {

259

def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

260

def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]

261

}

262

```

263

264

```scala

265

val salesRecords = env.fromElements(

266

("US", 100), ("US", 200), ("US", 150),

267

("EU", 120), ("EU", 180), ("EU", 90)

268

)

269

270

// Combine groups efficiently (pre-aggregation)

271

val combinedSales = salesRecords

272

.groupBy(0)

273

.combineGroup { (iterator, collector) =>

274

val records = iterator.toList

275

val region = records.head._1

276

val totalSales = records.map(_._2).sum

277

val recordCount = records.length

278

collector.collect((region, totalSales, recordCount))

279

}

280

```

281

282

## First Operations

283

284

```scala { .api }

285

class GroupedDataSet[T] {

286

def first(n: Int): DataSet[T]

287

}

288

```

289

290

```scala

291

val timestampedData = env.fromElements(

292

("US", "2023-01-01", 100),

293

("US", "2023-01-02", 150),

294

("US", "2023-01-03", 200),

295

("EU", "2023-01-01", 120),

296

("EU", "2023-01-02", 180)

297

)

298

299

// Get first 2 records per region

300

val firstTwo = timestampedData

301

.groupBy(0) // Group by region

302

.first(2)

303

304

// Get first record per region after sorting

305

val latestPerRegion = timestampedData

306

.groupBy(0)

307

.sortGroup(1, Order.DESCENDING) // Sort by date descending

308

.first(1) // Get latest (first after desc sort)

309

```

310

311

## Window Operations on Groups

312

313

While the DataSet API doesn't have built-in windowing, you can simulate windowing using groupBy and custom logic:

314

315

```scala

316

case class Event(userId: String, eventType: String, timestamp: Long, value: Double)

317

318

val events = env.fromElements(

319

Event("user1", "click", 1000, 1.0),

320

Event("user1", "click", 2000, 1.0),

321

Event("user1", "purchase", 3000, 10.0),

322

Event("user2", "click", 1500, 1.0),

323

Event("user2", "purchase", 4000, 15.0)

324

)

325

326

// Group by user and time window (using truncated timestamp)

327

val windowSize = 3000L // 3 second windows

328

val windowedEvents = events

329

.map(e => (e.userId, e.timestamp / windowSize, e)) // Add window key

330

.groupBy(e => (e._1, e._2)) // Group by user and window

331

.reduceGroup { iterator =>

332

val eventsInWindow = iterator.map(_._3).toList

333

val userId = eventsInWindow.head.userId

334

val windowStart = eventsInWindow.head.timestamp / windowSize * windowSize

335

val totalValue = eventsInWindow.map(_.value).sum

336

val eventCount = eventsInWindow.length

337

(userId, windowStart, totalValue, eventCount)

338

}

339

```

340

341

## Types

342

343

```scala { .api }

344

import org.apache.flink.api.common.functions.{

345

ReduceFunction,

346

GroupReduceFunction,

347

GroupCombineFunction

348

}

349

import org.apache.flink.api.common.operators.Order

350

import org.apache.flink.util.Collector

351

352

// Function interfaces

353

trait ReduceFunction[T] extends Function {

354

def reduce(value1: T, value2: T): T

355

}

356

357

trait GroupReduceFunction[IN, OUT] extends Function {

358

def reduce(values: java.lang.Iterable[IN], out: Collector[OUT]): Unit

359

}

360

361

trait GroupCombineFunction[IN, OUT] extends Function {

362

def combine(values: java.lang.Iterable[IN], out: Collector[OUT]): Unit

363

}

364

365

// Aggregation types

366

object Aggregations extends Enumeration {

367

val SUM, MAX, MIN = Value

368

}

369

370

// Chained aggregations

371

class AggregateDataSet[T] {

372

def and(agg: Aggregations, field: Int): AggregateDataSet[T]

373

def and(agg: Aggregations, field: String): AggregateDataSet[T]

374

def andSum(field: Int): AggregateDataSet[T]

375

def andSum(field: String): AggregateDataSet[T]

376

def andMax(field: Int): AggregateDataSet[T]

377

def andMax(field: String): AggregateDataSet[T]

378

def andMin(field: Int): AggregateDataSet[T]

379

def andMin(field: String): AggregateDataSet[T]

380

}

381

382

// Order enumeration

383

object Order extends Enumeration {

384

val ASCENDING, DESCENDING = Value

385

}

386

```

387

388

## Performance Considerations

389

390

### Efficient Grouping

391

392

```scala

393

// Prefer groupBy with key functions over field names when possible

394

case class Record(key: String, value: Int)

395

val data: DataSet[Record] = // ...

396

397

// Efficient - direct key extraction

398

val grouped1 = data.groupBy(_.key)

399

400

// Less efficient - reflection-based field access

401

val grouped2 = data.groupBy("key")

402

```

403

404

### Combining vs Reducing

405

406

```scala

407

// Use combineGroup for operations that can be pre-aggregated

408

val largeSales = env.fromElements((1 to 1000000).map(i => (s"region${i % 10}", i)): _*)

409

410

// Efficient - pre-aggregates locally before sending over network

411

val combined = largeSales

412

.groupBy(0)

413

.combineGroup { (iterator, collector) =>

414

val (region, values) = iterator.toList.unzip

415

collector.collect((region.head, values.sum))

416

}

417

418

// Less efficient for large datasets - no pre-aggregation

419

val reduced = largeSales

420

.groupBy(0)

421

.reduce((a, b) => (a._1, a._2 + b._2))

422

```

423

424

### Sorting Optimization

425

426

```scala

427

// Sort within groups only when necessary

428

val transactions = env.fromElements(

429

("US", 100, "2023-01-01"),

430

("US", 200, "2023-01-02")

431

)

432

433

// If you only need the first N elements, sort only for that

434

val topTransactions = transactions

435

.groupBy(0)

436

.sortGroup(1, Order.DESCENDING)

437

.first(5) // Take the top 5 per group after sorting

438

439

// If you need all elements sorted, this is fine

440

val allSorted = transactions

441

.groupBy(0)

442

.sortGroup(1, Order.DESCENDING)

443

.reduceGroup(_.toList) // Process all sorted elements (collect each sorted group into a List)

444

```