or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregation-grouping.md · data-input-output.md · dataset-operations.md · execution-environments.md · index.md · iteration-operations.md · join-cogroup-operations.md · utility-functions.md

docs/aggregation-grouping.md

0

# Aggregation and Grouping

1

2

Built-in aggregation functions and grouping operations for statistical computations and data summarization. These operations enable efficient computation of aggregates such as sum, min, and max over grouped data.

3

4

## Capabilities

5

6

### Grouping Operations

7

8

Group DataSet elements by key fields or key selector functions to enable aggregation operations.

9

10

```java { .api }

11

/**

12

* Group elements by field positions (for Tuple types)

13

* @param fields the field positions to group by

14

* @return UnsortedGrouping for aggregation operations

15

*/

16

public UnsortedGrouping<T> groupBy(int... fields);

17

18

/**

19

* Group elements by field names (for POJO types)

20

* @param fields the field names to group by

21

* @return UnsortedGrouping for aggregation operations

22

*/

23

public UnsortedGrouping<T> groupBy(String... fields);

24

25

/**

26

* Group elements by key selector function

27

* @param keyExtractor function to extract the grouping key

28

* @return UnsortedGrouping for aggregation operations

29

*/

30

public <K> UnsortedGrouping<T> groupBy(KeySelector<T, K> keyExtractor);

31

```

32

33

**Usage Examples:**

34

35

```java

36

// Group by field position (for Tuples)

37

DataSet<Tuple3<String, String, Integer>> sales = env.fromElements(

38

new Tuple3<>("Product A", "Region 1", 100),

39

new Tuple3<>("Product A", "Region 2", 150),

40

new Tuple3<>("Product B", "Region 1", 200)

41

);

42

43

// Group by product (field 0)

44

UnsortedGrouping<Tuple3<String, String, Integer>> byProduct = sales.groupBy(0);

45

46

// Group by product and region (fields 0 and 1)

47

UnsortedGrouping<Tuple3<String, String, Integer>> byProductRegion = sales.groupBy(0, 1);

48

49

// Group by key selector

50

DataSet<Person> people = getPersonDataSet();

51

UnsortedGrouping<Person> byAge = people.groupBy(person -> person.age);

52

```

53

54

### UnsortedGrouping Operations

55

56

Operations available on unsorted groupings for aggregation and reduction.

57

58

```java { .api }

59

/**

60

* Apply built-in aggregation function

61

* @param agg the aggregation function (SUM, MIN, MAX)

62

* @param field the field position to aggregate (for Tuple types only)

63

* @return AggregateOperator with aggregation result

64

*/

65

public AggregateOperator<T> aggregate(Aggregations agg, int field);

66

67

/**

68

* Sum aggregation on specified field position

69

* @param field the field position to sum (for Tuple types only)

70

* @return AggregateOperator with sum result

71

*/

72

public AggregateOperator<T> sum(int field);

73

74

/**

75

* Minimum aggregation on specified field position

76

* @param field the field position to find minimum (for Tuple types only)

77

* @return AggregateOperator with minimum result

78

*/

79

public AggregateOperator<T> min(int field);

80

81

/**

82

* Maximum aggregation on specified field position

83

* @param field the field position to find maximum (for Tuple types only)

84

* @return AggregateOperator with maximum result

85

*/

86

public AggregateOperator<T> max(int field);

87

```

88

89

**Usage Examples:**

90

91

```java

92

// Sum sales by product

93

DataSet<Tuple3<String, String, Integer>> totalSales = sales

94

.groupBy(0) // group by product

95

.sum(2); // sum the sales amount (field 2)

96

97

// Minimum and maximum sales per product (two separate aggregations)

98

DataSet<Tuple3<String, String, Integer>> minSales = sales

99

.groupBy(0)

100

.min(2);

101

102

DataSet<Tuple3<String, String, Integer>> maxSales = sales

103

.groupBy(0)

104

.max(2);

105

106

// Note: Aggregation methods only work with field positions for Tuple types

107

// For POJO types, use custom reduce functions instead

108

```

109

110

### Custom Reduce Operations

111

112

Apply custom reduction functions to grouped data.

113

114

```java { .api }

115

/**

116

* Apply reduce function to each group

117

* @param reducer the reduce function to apply

118

* @return ReduceOperator with reduction result

119

*/

120

public ReduceOperator<T> reduce(ReduceFunction<T> reducer);

121

122

/**

123

* Apply group reduce function to each group

124

* @param reducer the group reduce function to apply

125

* @return GroupReduceOperator with group reduction result

126

*/

127

public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer);

128

129

/**

130

* Apply combine function to each group (for pre-aggregation)

131

* @param combiner the group combine function to apply

132

* @return GroupCombineOperator with combine result

133

*/

134

public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner);

135

136

/**

137

* Select elements with minimum value for specified fields

138

* @param fields the field positions to compare

139

* @return ReduceOperator with minimum elements

140

*/

141

public ReduceOperator<T> minBy(int... fields);

142

143

/**

144

* Select elements with maximum value for specified fields

145

* @param fields the field positions to compare

146

* @return ReduceOperator with maximum elements

147

*/

148

public ReduceOperator<T> maxBy(int... fields);

149

150

/**

151

* Get first n elements from each group

152

* @param n number of elements to select from each group

153

* @return GroupReduceOperator with first n elements

154

*/

155

public GroupReduceOperator<T, T> first(int n);

156

```

157

158

**Usage Examples:**

159

160

```java

161

// Custom reduce function

162

DataSet<Tuple2<String, Integer>> wordCounts = words

163

.groupBy(0)

164

.reduce(new ReduceFunction<Tuple2<String, Integer>>() {

165

@Override

166

public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) {

167

return new Tuple2<>(v1.f0, v1.f1 + v2.f1);

168

}

169

});

170

171

// Group reduce for more complex aggregations

172

DataSet<Tuple2<String, String>> concatenated = sales

173

.groupBy(0)

174

.reduceGroup(new GroupReduceFunction<Tuple3<String, String, Integer>, Tuple2<String, String>>() {

175

@Override

176

public void reduce(Iterable<Tuple3<String, String, Integer>> values,

177

Collector<Tuple2<String, String>> out) {

178

String product = null;

179

StringBuilder regions = new StringBuilder();

180

181

for (Tuple3<String, String, Integer> value : values) {

182

if (product == null) product = value.f0;

183

regions.append(value.f1).append(",");

184

}

185

186

out.collect(new Tuple2<>(product, regions.toString()));

187

}

188

});

189

```

190

191

### Sorted Grouping

192

193

Sort groups by specific fields for ordered processing.

194

195

```java { .api }

196

/**

197

* Sort group by specified field and order

198

* @param field the field to sort by

199

* @param order the sort order (ASCENDING or DESCENDING)

200

* @return SortedGrouping for ordered group operations

201

*/

202

public SortedGrouping<T> sortGroup(int field, Order order);

203

204

/**

205

* Sort group by field name and order

206

* @param field the field name to sort by

207

* @param order the sort order (ASCENDING or DESCENDING)

208

* @return SortedGrouping for ordered group operations

209

*/

210

public SortedGrouping<T> sortGroup(String field, Order order);

211

```

212

213

**Usage Examples:**

214

215

```java

216

// Sort sales within each product group by amount (descending)

217

DataSet<Tuple3<String, String, Integer>> sortedSales = sales

218

.groupBy(0) // group by product

219

.sortGroup(2, Order.DESCENDING) // sort by sales amount descending

220

.reduceGroup(new GroupReduceFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>>() {

221

@Override

222

public void reduce(Iterable<Tuple3<String, String, Integer>> values,

223

Collector<Tuple3<String, String, Integer>> out) {

224

// Process sorted values - first value is the highest sale

225

Iterator<Tuple3<String, String, Integer>> iter = values.iterator();

226

if (iter.hasNext()) {

227

out.collect(iter.next()); // emit only the highest sale per product

228

}

229

}

230

});

231

```

232

233

### Aggregation Types

234

235

Built-in aggregation functions available for numeric fields.

236

237

```java { .api }

238

/**

239

* Enumeration of built-in aggregation functions

240

*/

241

public enum Aggregations {

242

/** Sum aggregation */

243

SUM,

244

/** Minimum aggregation */

245

MIN,

246

/** Maximum aggregation */

247

MAX

248

}

249

```

250

251

### Sort Orders

252

253

Sort order options for sorted grouping operations.

254

255

```java { .api }

256

/**

257

* Sort order enumeration

258

*/

259

public enum Order {

260

/** Ascending order (1, 2, 3, ...) */

261

ASCENDING,

262

/** Descending order (..., 3, 2, 1) */

263

DESCENDING

264

}

265

```

266

267

### Function Interfaces

268

269

Interfaces for custom aggregation and reduction functions.

270

271

```java { .api }

272

/**

273

* Interface for reduce functions

274

* @param <T> the type of elements to reduce

275

*/

276

public interface ReduceFunction<T> extends Function, Serializable {

277

/**

278

* Reduce two values to one

279

* @param value1 first value

280

* @param value2 second value

281

* @return reduced value

282

*/

283

T reduce(T value1, T value2) throws Exception;

284

}

285

286

/**

287

* Interface for group reduce functions

288

* @param <IN> input element type

289

* @param <OUT> output element type

290

*/

291

public interface GroupReduceFunction<IN, OUT> extends Function, Serializable {

292

/**

293

* Reduce a group of values

294

* @param values iterable of input values

295

* @param out collector for output values

296

*/

297

void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception;

298

}

299

300

/**

301

* Interface for group combine functions (for pre-aggregation)

302

* @param <IN> input element type

303

* @param <OUT> output element type

304

*/

305

public interface GroupCombineFunction<IN, OUT> extends Function, Serializable {

306

/**

307

* Combine a group of values (partial aggregation)

308

* @param values iterable of input values

309

* @param out collector for output values

310

*/

311

void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;

312

}

313

314

/**

315

* Rich variants of the function interfaces above, with access to the runtime context

316

*/

317

public abstract class RichReduceFunction<T> extends AbstractRichFunction implements ReduceFunction<T> {}

318

public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT> {}

319

public abstract class RichGroupCombineFunction<IN, OUT> extends AbstractRichFunction implements GroupCombineFunction<IN, OUT> {}

320

```

321

322

### Exception Handling

323

324

Exception types related to aggregation operations.

325

326

```java { .api }

327

/**

328

* Exception thrown when an aggregation is applied to an unsupported type

329

*/

330

public class UnsupportedAggregationTypeException extends RuntimeException {

331

/**

332

* Create exception with message

333

* @param message error message

334

*/

335

public UnsupportedAggregationTypeException(String message);

336

}

337

```

338

339

**Usage Examples for Exception Handling:**

340

341

```java

342

try {

343

// This might throw UnsupportedAggregationTypeException

344

// if trying to sum non-numeric fields

345

DataSet<Tuple2<String, String>> result = stringData

346

.groupBy(0)

347

.sum(1); // Error: cannot sum String field

348

349

} catch (UnsupportedAggregationTypeException e) {

350

System.err.println("Cannot perform aggregation: " + e.getMessage());

351

}

352

```

353

354

## Types

355

356

```java { .api }

357

import org.apache.flink.api.java.operators.UnsortedGrouping;

358

import org.apache.flink.api.java.operators.SortedGrouping;

359

import org.apache.flink.api.java.operators.AggregateOperator;

360

import org.apache.flink.api.java.operators.ReduceOperator;

361

import org.apache.flink.api.java.operators.GroupReduceOperator;

362

import org.apache.flink.api.java.operators.GroupCombineOperator;

363

import org.apache.flink.api.java.aggregation.Aggregations;

364

import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException;

365

import org.apache.flink.api.common.functions.ReduceFunction;

366

import org.apache.flink.api.common.functions.GroupReduceFunction;

367

import org.apache.flink.api.common.functions.GroupCombineFunction;

368

import org.apache.flink.api.common.functions.RichReduceFunction;

369

import org.apache.flink.api.common.functions.RichGroupReduceFunction;

370

import org.apache.flink.api.common.functions.RichGroupCombineFunction;

371

import org.apache.flink.api.common.operators.Order;

372

import org.apache.flink.util.Collector;

373

```