or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

dataset-operations.md execution-environment.md extensions.md grouped-dataset-operations.md index.md join-operations.md type-system.md utility-functions.md

docs/dataset-operations.md

0

# DataSet Operations

1

2

DataSet is the main abstraction for distributed data collections in Flink. A DataSet is immutable: each transformation returns a new DataSet instead of modifying the one it is called on.

3

4

## Basic Properties

5

6

```scala { .api }

7

class DataSet[T] {

8

def getType: TypeInformation[T]

9

def getExecutionEnvironment: ExecutionEnvironment

10

def getParallelism: Int

11

def setParallelism(parallelism: Int): DataSet[T]

12

def name(name: String): DataSet[T]

13

def setDescription(description: String): DataSet[T]

14

}

15

```

16

17

## Basic Transformations

18

19

### Map Operations

20

21

```scala { .api }

22

class DataSet[T] {

23

def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]

24

def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]

25

def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

26

def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]

27

def mapPartition[R: TypeInformation: ClassTag](mapPartitionFunction: MapPartitionFunction[T, R]): DataSet[R]

28

}

29

```

30

31

#### Usage Examples

32

33

```scala

34

val numbers = env.fromElements(1, 2, 3, 4, 5)

35

36

// Simple map with lambda

37

val doubled = numbers.map(_ * 2)

38

39

// Map with an explicitly named lambda parameter

40

val squared = numbers.map(x => x * x)

41

42

// Map to different type

43

val numberStrings = numbers.map(n => s"Number: $n")

44

45

// Map partition with collector (process entire partition at once)

46

val processed = numbers.mapPartition { (iterator, collector) =>

47

val batch = iterator.toList

48

batch.foreach(x => collector.collect(x * 10))

49

}

50

51

// Map partition returning collection (alternative approach)

52

val processedAlt = numbers.mapPartition { iterator =>

53

iterator.toList.map(_ * 10)

54

}

55

```

56

57

### FlatMap Operations

58

59

```scala { .api }

60

class DataSet[T] {

61

def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]

62

def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]

63

}

64

```

65

66

```scala

67

val sentences = env.fromElements("Hello World", "Flink Scala API", "Distributed Processing")

68

69

// Split sentences into words

70

val words = sentences.flatMap(_.split(" "))

71

72

// FlatMap with filtering

73

val longWords = sentences.flatMap(_.split(" ").filter(_.length > 4))

74

75

// FlatMap each word into its individual characters

76

val chars = words.flatMap(_.toCharArray)

77

```

78

79

### Filter Operations

80

81

```scala { .api }

82

class DataSet[T] {

83

def filter(fun: T => Boolean): DataSet[T]

84

def filter(filter: FilterFunction[T]): DataSet[T]

85

}

86

```

87

88

```scala

89

val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

90

91

// Filter even numbers

92

val evenNumbers = numbers.filter(_ % 2 == 0)

93

94

// Filter with complex condition

95

val filtered = numbers.filter(x => x > 3 && x < 8)

96

```

97

98

## Distinct Operations

99

100

```scala { .api }

101

class DataSet[T] {

102

def distinct(): DataSet[T]

103

def distinct(fields: Int*): DataSet[T]

104

def distinct(firstField: String, otherFields: String*): DataSet[T]

105

def distinct[K: TypeInformation](fun: T => K): DataSet[T]

106

}

107

```

108

109

```scala

110

val duplicates = env.fromElements(1, 2, 2, 3, 3, 3, 4)

111

112

// Remove all duplicates

113

val unique = duplicates.distinct()

114

115

// Distinct by specific fields (for tuples/case classes)

116

val people = env.fromElements(("Alice", 25), ("Bob", 30), ("Alice", 35))

117

val distinctNames = people.distinct(0) // distinct by first field (name)

118

119

// Distinct by key function

120

case class Person(name: String, age: Int)

121

val personData = env.fromElements(Person("Alice", 25), Person("Bob", 30), Person("Alice", 35))

122

val distinctByName = personData.distinct(_.name)

123

```

124

125

## Aggregation Operations

126

127

### Basic Aggregations

128

129

```scala { .api }

130

class DataSet[T] {

131

def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]

132

def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]

133

def sum(field: Int): AggregateDataSet[T]

134

def sum(field: String): AggregateDataSet[T]

135

def max(field: Int): AggregateDataSet[T]

136

def max(field: String): AggregateDataSet[T]

137

def min(field: Int): AggregateDataSet[T]

138

def min(field: String): AggregateDataSet[T]

139

def maxBy(fields: Int*): DataSet[T]

140

def maxBy(firstField: String, otherFields: String*): DataSet[T]

141

def minBy(fields: Int*): DataSet[T]

142

def minBy(firstField: String, otherFields: String*): DataSet[T]

143

}

144

```

145

146

```scala

147

val sales = env.fromElements(

148

("Q1", 1000), ("Q2", 1500), ("Q3", 1200), ("Q4", 1800)

149

)

150

151

// Sum by field index

152

val totalSales = sales.sum(1)

153

154

// Max of the field at index 1 (aggregates the field value; compare with maxBy below)

155

val maxSales = sales.max(1)

156

157

// MaxBy - return entire record with maximum value

158

val bestQuarter = sales.maxBy(1)

159

```

160

161

### Reduce Operations

162

163

```scala { .api }

164

class DataSet[T] {

165

def reduce(fun: (T, T) => T): DataSet[T]

166

def reduce(reducer: ReduceFunction[T]): DataSet[T]

167

def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]

168

def reduceGroup[R: TypeInformation: ClassTag](groupReducer: GroupReduceFunction[T, R]): DataSet[R]

169

def combineGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]

170

def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]

171

}

172

```

173

174

```scala

175

val numbers = env.fromElements(1, 2, 3, 4, 5)

176

177

// Simple reduce - sum all numbers

178

val sum = numbers.reduce(_ + _)

179

180

// Reduce with more complex logic

181

val product = numbers.reduce((a, b) => a * b)

182

183

// Reduce group (process all elements together)

184

val statistics = numbers.reduceGroup { iterator =>

185

val values = iterator.toList

186

val sum = values.sum

187

val count = values.length

188

val avg = sum.toDouble / count

189

(sum, count, avg)

190

}

191

```

192

193

## Partitioning Operations

194

195

```scala { .api }

196

class DataSet[T] {

197

def partitionByHash(fields: Int*): DataSet[T]

198

def partitionByHash(firstField: String, otherFields: String*): DataSet[T]

199

def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]

200

def partitionByRange(fields: Int*): DataSet[T]

201

def partitionByRange(firstField: String, otherFields: String*): DataSet[T]

202

def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]

203

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]

204

def rebalance(): DataSet[T]

205

}

206

```

207

208

```scala

209

val data = env.fromElements(("A", 1), ("B", 2), ("C", 3), ("D", 4))

210

211

// Hash partition by first field

212

val hashPartitioned = data.partitionByHash(0)

213

214

// Range partition by second field

215

val rangePartitioned = data.partitionByRange(1)

216

217

// Custom partitioning

218

val customPartitioned = data.partitionCustom(new MyPartitioner(), _._1)

219

220

// Rebalance (round-robin distribution)

221

val rebalanced = data.rebalance()

222

```

223

224

### Sort Partition

225

226

```scala { .api }

227

class DataSet[T] {

228

def sortPartition(field: Int, order: Order): DataSet[T]

229

def sortPartition(field: String, order: Order): DataSet[T]

230

def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]

231

}

232

```

233

234

```scala

235

import org.apache.flink.api.common.operators.Order

236

237

val data = env.fromElements(("Alice", 25), ("Bob", 30), ("Charlie", 20))

238

239

// Sort partitions by age in ascending order

240

val sortedByAge = data.sortPartition(1, Order.ASCENDING)

241

242

// Sort by key function

243

val sortedByName = data.sortPartition(_._1, Order.DESCENDING)

244

```

245

246

## Output Operations

247

248

### Collect and Print

249

250

```scala { .api }

251

class DataSet[T] {

252

def collect(): Seq[T]

253

def count(): Long

254

def print(): Unit

255

def printToErr(): Unit

256

def printOnTaskManager(sinkIdentifier: String): DataSink[T]

257

}

258

```

259

260

```scala

261

val numbers = env.fromElements(1, 2, 3, 4, 5)

262

263

// Collect to local collection (triggers execution)

264

val results: Seq[Int] = numbers.collect()

265

266

// Count elements

267

val elementCount: Long = numbers.count()

268

269

// Print to console

270

numbers.print()

271

272

// Print to stderr

273

numbers.printToErr()

274

```

275

276

### File Output

277

278

```scala { .api }

279

class DataSet[T] {

280

def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE): DataSink[T]

281

def writeAsCsv(

282

filePath: String,

283

rowDelimiter: String = "\n",

284

fieldDelimiter: String = ",",

285

writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE

286

): DataSink[T]

287

def write[O <: FileOutputFormat[T]](

288

outputFormat: O,

289

filePath: String,

290

writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE

291

): DataSink[T]

292

def output(outputFormat: OutputFormat[T]): DataSink[T]

293

}

294

```

295

296

```scala

297

val results = env.fromElements("Hello", "World", "Flink")

298

299

// Write as text file

300

results.writeAsText("output/results.txt")

301

302

// Write as CSV with custom delimiters

303

val csvData = env.fromElements(("Alice", 25), ("Bob", 30))

304

csvData.writeAsCsv("output/people.csv", fieldDelimiter = ";")

305

306

// Write with custom output format

307

results.write(new MyOutputFormat(), "output/custom.dat")

308

```

309

310

## First Operations

311

312

```scala { .api }

313

class DataSet[T] {

314

def first(n: Int): DataSet[T]

315

}

316

```

317

318

```scala

319

val numbers = env.fromElements(10, 5, 8, 3, 1, 9, 2)

320

321

// Get first 3 elements (order not guaranteed unless sorted)

322

val firstThree = numbers.first(3)

323

324

// Get first 3 after sorting each partition (sortPartition sorts locally within a partition, not globally)

325

val topThree = numbers.sortPartition(0, Order.DESCENDING).first(3)

326

```

327

328

## Union Operations

329

330

```scala { .api }

331

class DataSet[T] {

332

def union(other: DataSet[T]): DataSet[T]

333

}

334

```

335

336

```scala

337

val set1 = env.fromElements(1, 2, 3)

338

val set2 = env.fromElements(4, 5, 6)

339

340

// Union two datasets

341

val combined = set1.union(set2)

342

343

// Chain multiple unions

344

val set3 = env.fromElements(7, 8, 9)

345

val allSets = set1.union(set2).union(set3)

346

```

347

348

## Configuration and Optimization

349

350

### Broadcast Variables

351

352

```scala { .api }

353

class DataSet[T] {

354

def withBroadcastSet(data: DataSet[_], name: String): DataSet[T]

355

}

356

```

357

358

```scala

359

val lookupData = env.fromElements(("A", 100), ("B", 200), ("C", 300))

360

val mainData = env.fromElements("A", "B", "C", "A")

361

362

// Use lookup data as broadcast variable

363

val enriched = mainData

364

.withBroadcastSet(lookupData, "lookup")

365

.map(new RichMapFunction[String, (String, Int)] {

366

var lookupMap: Map[String, Int] = _

367

368

override def open(parameters: Configuration): Unit = {

369

import scala.collection.JavaConverters._

370

val lookupCollection = getRuntimeContext

371

.getBroadcastVariable[(String, Int)]("lookup")

372

.asScala

373

lookupMap = lookupCollection.toMap

374

}

375

376

override def map(value: String): (String, Int) = {

377

(value, lookupMap.getOrElse(value, 0))

378

}

379

})

380

```

381

382

### Field Forwarding Hints

383

384

```scala { .api }

385

class DataSet[T] {

386

def withForwardedFields(forwardedFields: String*): DataSet[T]

387

def withForwardedFieldsFirst(forwardedFields: String*): DataSet[T]

388

def withForwardedFieldsSecond(forwardedFields: String*): DataSet[T]

389

}

390

```

391

392

```scala

393

val tuples = env.fromElements(("Alice", 25, "Engineer"), ("Bob", 30, "Manager"))

394

395

// Hint that first field is forwarded unchanged

396

val mapped = tuples

397

.withForwardedFields("0") // field 0 (name) is forwarded

398

.map(t => (t._1, t._2 + 1, t._3)) // only increment age

399

```

400

401

## Iteration Operations

402

403

```scala { .api }

404

class DataSet[T] {

405

def iterate(maxIterations: Int)(stepFunction: DataSet[T] => DataSet[T]): DataSet[T]

406

def iterateWithTermination(maxIterations: Int)(

407

stepFunction: DataSet[T] => (DataSet[T], DataSet[_])

408

): DataSet[T]

409

def iterateDelta[R: ClassTag](

410

workset: DataSet[R],

411

maxIterations: Int,

412

keyFields: Array[Int]

413

)(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]

414

def iterateDelta[R: ClassTag](

415

workset: DataSet[R],

416

maxIterations: Int,

417

keyFields: Array[String]

418

)(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]

419

def iterateDelta[R: ClassTag](

420

workset: DataSet[R],

421

maxIterations: Int,

422

keyFields: Array[Int]

423

)(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R], DataSet[_])): DataSet[T]

424

}

425

```

426

427

```scala

428

val initial = env.fromElements(1.0)

429

430

// Simple iteration - double the value 10 times (computes 1.0 * 2^10)

431

val result = initial.iterate(10) { current =>

432

current.map(_ * 2)

433

}

434

435

// Iteration with termination criterion (stops when the termination DataSet is empty or after maxIterations)

436

val converged = initial.iterateWithTermination(100) { current =>

437

val next = current.map(x => (x + 2.0 / x) / 2.0) // Newton's method for sqrt(2)

438

val termination = current.filter(Math.abs(_) < 0.0001)

439

(next, termination)

440

}

441

```

442

443

## Types

444

445

```scala { .api }

446

import org.apache.flink.api.common.operators.Order

447

448

object Order extends Enumeration {

449

val ASCENDING, DESCENDING = Value

450

}

451

452

object FileSystem {

453

object WriteMode extends Enumeration {

454

val NO_OVERWRITE, OVERWRITE = Value

455

}

456

}

457

458

trait DataSink[T] {

459

def name(name: String): DataSink[T]

460

def setParallelism(parallelism: Int): DataSink[T]

461

}

462

463

class AggregateDataSet[T](private[flink] val set: DataSet[T], private[flink] val keys: Keys[T], private[flink] val aggregations: Seq[AggregationFunction[_]]) {

464

def and(agg: Aggregations, field: Int): AggregateDataSet[T]

465

def and(agg: Aggregations, field: String): AggregateDataSet[T]

466

def andSum(field: Int): AggregateDataSet[T]

467

def andSum(field: String): AggregateDataSet[T]

468

def andMax(field: Int): AggregateDataSet[T]

469

def andMax(field: String): AggregateDataSet[T]

470

def andMin(field: Int): AggregateDataSet[T]

471

def andMin(field: String): AggregateDataSet[T]

472

}

473

```