0
# DataSet Operations
1
2
`DataSet[T]` is the main abstraction of Flink's Scala DataSet (batch) API: a distributed collection of elements of type `T`. DataSets are immutable — every transformation returns a new DataSet instead of modifying the one it is called on.
3
4
## Basic Properties
5
6
```scala { .api }
7
class DataSet[T] {
8
def getType: TypeInformation[T]
9
def getExecutionEnvironment: ExecutionEnvironment
10
def getParallelism: Int
11
def setParallelism(parallelism: Int): DataSet[T]
12
def name(name: String): DataSet[T]
13
def setDescription(description: String): DataSet[T]
14
}
15
```
16
17
## Basic Transformations
18
19
### Map Operations
20
21
```scala { .api }
22
class DataSet[T] {
23
def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
24
def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]
25
def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
26
def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]
27
def mapPartition[R: TypeInformation: ClassTag](mapPartitionFunction: MapPartitionFunction[T, R]): DataSet[R]
28
}
29
```
30
31
#### Usage Examples
32
33
```scala
34
val numbers = env.fromElements(1, 2, 3, 4, 5)
35
36
// Simple map with lambda
37
val doubled = numbers.map(_ * 2)
38
39
// Map with function
40
val squared = numbers.map(x => x * x)
41
42
// Map to different type
43
val numberStrings = numbers.map(n => s"Number: $n")
44
45
// Map partition with collector: the function is called once per partition with an
// iterator over its elements; consuming the iterator directly (instead of calling
// toList) avoids buffering the whole partition in memory
val processed = numbers.mapPartition { (iterator, collector) =>
  iterator.foreach(x => collector.collect(x * 10))
}

// Map partition returning a collection (alternative approach); an Iterator is a
// TraversableOnce, so the mapped iterator can be returned as-is
val processedAlt = numbers.mapPartition { iterator =>
  iterator.map(_ * 10)
}
55
```
56
57
### FlatMap Operations
58
59
```scala { .api }
60
class DataSet[T] {
61
def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
62
def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]
63
}
64
```
65
66
```scala
67
val sentences = env.fromElements("Hello World", "Flink Scala API", "Distributed Processing")
68
69
// Split sentences into words
70
val words = sentences.flatMap(_.split(" "))
71
72
// FlatMap with filtering
73
val longWords = sentences.flatMap(_.split(" ").filter(_.length > 4))
74
75
// FlatMap with collections
76
val chars = words.flatMap(_.toCharArray)
77
```
78
79
### Filter Operations
80
81
```scala { .api }
82
class DataSet[T] {
83
def filter(fun: T => Boolean): DataSet[T]
84
def filter(filter: FilterFunction[T]): DataSet[T]
85
}
86
```
87
88
```scala
89
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
90
91
// Filter even numbers
92
val evenNumbers = numbers.filter(_ % 2 == 0)
93
94
// Filter with complex condition
95
val filtered = numbers.filter(x => x > 3 && x < 8)
96
```
97
98
## Distinct Operations
99
100
```scala { .api }
101
class DataSet[T] {
102
def distinct(): DataSet[T]
103
def distinct(fields: Int*): DataSet[T]
104
def distinct(firstField: String, otherFields: String*): DataSet[T]
105
def distinct[K: TypeInformation](fun: T => K): DataSet[T]
106
}
107
```
108
109
```scala
110
val duplicates = env.fromElements(1, 2, 2, 3, 3, 3, 4)
111
112
// Remove all duplicates
113
val unique = duplicates.distinct()
114
115
// Distinct by specific fields (for tuples/case classes)
116
val people = env.fromElements(("Alice", 25), ("Bob", 30), ("Alice", 35))
117
val distinctNames = people.distinct(0) // distinct by first field (name)
118
119
// Distinct by key function
120
case class Person(name: String, age: Int)
121
val personData = env.fromElements(Person("Alice", 25), Person("Bob", 30), Person("Alice", 35))
122
val distinctByName = personData.distinct(_.name)
123
```
124
125
## Aggregation Operations
126
127
### Basic Aggregations
128
129
```scala { .api }
130
class DataSet[T] {
131
def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
132
def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
133
def sum(field: Int): AggregateDataSet[T]
134
def sum(field: String): AggregateDataSet[T]
135
def max(field: Int): AggregateDataSet[T]
136
def max(field: String): AggregateDataSet[T]
137
def min(field: Int): AggregateDataSet[T]
138
def min(field: String): AggregateDataSet[T]
139
def maxBy(fields: Int*): DataSet[T]
140
def maxBy(firstField: String, otherFields: String*): DataSet[T]
141
def minBy(fields: Int*): DataSet[T]
142
def minBy(firstField: String, otherFields: String*): DataSet[T]
143
}
144
```
145
146
```scala
147
val sales = env.fromElements(
148
("Q1", 1000), ("Q2", 1500), ("Q3", 1200), ("Q4", 1800)
149
)
150
151
// Sum by field index
152
val totalSales = sales.sum(1)
153
154
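// Max of field 1 (the non-aggregated fields of the result come from an arbitrary record; use maxBy to get the actual record)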
155
val maxSales = sales.max(1)
156
157
// MaxBy - return entire record with maximum value
158
val bestQuarter = sales.maxBy(1)
159
```
160
161
### Reduce Operations
162
163
```scala { .api }
164
class DataSet[T] {
165
def reduce(fun: (T, T) => T): DataSet[T]
166
def reduce(reducer: ReduceFunction[T]): DataSet[T]
167
def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
168
def reduceGroup[R: TypeInformation: ClassTag](groupReducer: GroupReduceFunction[T, R]): DataSet[R]
169
def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
170
def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
171
}
172
```
173
174
```scala
175
val numbers = env.fromElements(1, 2, 3, 4, 5)
176
177
// Simple reduce - sum all numbers
178
val sum = numbers.reduce(_ + _)
179
180
// Reduce with more complex logic
181
val product = numbers.reduce((a, b) => a * b)
182
183
// Reduce group (process all elements together)
184
val statistics = numbers.reduceGroup { iterator =>
185
val values = iterator.toList
186
val sum = values.sum
187
val count = values.length
188
val avg = sum.toDouble / count
189
(sum, count, avg)
190
}
191
```
192
193
## Partitioning Operations
194
195
```scala { .api }
196
class DataSet[T] {
197
def partitionByHash(fields: Int*): DataSet[T]
198
def partitionByHash(firstField: String, otherFields: String*): DataSet[T]
199
def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
200
def partitionByRange(fields: Int*): DataSet[T]
201
def partitionByRange(firstField: String, otherFields: String*): DataSet[T]
202
def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
203
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
204
def rebalance(): DataSet[T]
205
}
206
```
207
208
```scala
209
val data = env.fromElements(("A", 1), ("B", 2), ("C", 3), ("D", 4))
210
211
// Hash partition by first field
212
val hashPartitioned = data.partitionByHash(0)
213
214
// Range partition by second field
215
val rangePartitioned = data.partitionByRange(1)
216
217
// Custom partitioning
218
val customPartitioned = data.partitionCustom(new MyPartitioner(), _._1)
219
220
// Rebalance (round-robin distribution)
221
val rebalanced = data.rebalance()
222
```
223
224
### Sort Partition
225
226
```scala { .api }
227
class DataSet[T] {
228
def sortPartition(field: Int, order: Order): DataSet[T]
229
def sortPartition(field: String, order: Order): DataSet[T]
230
def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
231
}
232
```
233
234
```scala
235
import org.apache.flink.api.common.operators.Order
236
237
val data = env.fromElements(("Alice", 25), ("Bob", 30), ("Charlie", 20))
238
239
// Sort partitions by age in ascending order
240
val sortedByAge = data.sortPartition(1, Order.ASCENDING)
241
242
// Sort by key function
243
val sortedByName = data.sortPartition(_._1, Order.DESCENDING)
244
```
245
246
## Output Operations
247
248
### Collect and Print
249
250
```scala { .api }
251
class DataSet[T] {
252
def collect(): Seq[T]
253
def count(): Long
254
def print(): Unit
255
def printToErr(): Unit
256
def printOnTaskManager(sinkIdentifier: String): DataSink[T]
257
}
258
```
259
260
```scala
261
val numbers = env.fromElements(1, 2, 3, 4, 5)
262
263
// Collect to local collection (triggers execution)
264
val results: Seq[Int] = numbers.collect()
265
266
// Count elements
267
val elementCount: Long = numbers.count()
268
269
// Print to console
270
numbers.print()
271
272
// Print to stderr
273
numbers.printToErr()
274
```
275
276
### File Output
277
278
```scala { .api }
279
class DataSet[T] {
280
def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE): DataSink[T]
281
def writeAsCsv(
282
filePath: String,
283
rowDelimiter: String = "\n",
284
fieldDelimiter: String = ",",
285
writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE
286
): DataSink[T]
287
def write[O <: FileOutputFormat[T]](
288
outputFormat: O,
289
filePath: String,
290
writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE
291
): DataSink[T]
292
def output(outputFormat: OutputFormat[T]): DataSink[T]
293
}
294
```
295
296
```scala
297
val results = env.fromElements("Hello", "World", "Flink")
298
299
// Write as text file
300
results.writeAsText("output/results.txt")
301
302
// Write as CSV with custom delimiters
303
val csvData = env.fromElements(("Alice", 25), ("Bob", 30))
304
csvData.writeAsCsv("output/people.csv", fieldDelimiter = ";")
305
306
// Write with custom output format
307
results.write(new MyOutputFormat(), "output/custom.dat")
308
```
309
310
## First Operations
311
312
```scala { .api }
313
class DataSet[T] {
314
def first(n: Int): DataSet[T]
315
}
316
```
317
318
```scala
319
val numbers = env.fromElements(10, 5, 8, 3, 1, 9, 2)
320
321
// Get first 3 elements (order not guaranteed unless sorted)
322
val firstThree = numbers.first(3)
323
324
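// Get the 3 largest elements. sortPartition only sorts each partition locally,
// so sort with parallelism 1 to get a single, globally sorted partition first.
// (A key selector is used because field positions only apply to tuple types.)
val topThree = numbers
  .sortPartition(x => x, Order.DESCENDING)
  .setParallelism(1)
  .first(3)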
326
```
327
328
## Union Operations
329
330
```scala { .api }
331
class DataSet[T] {
332
def union(other: DataSet[T]): DataSet[T]
333
}
334
```
335
336
```scala
337
val set1 = env.fromElements(1, 2, 3)
338
val set2 = env.fromElements(4, 5, 6)
339
340
// Union two datasets
341
val combined = set1.union(set2)
342
343
// Chain multiple unions
344
val set3 = env.fromElements(7, 8, 9)
345
val allSets = set1.union(set2).union(set3)
346
```
347
348
## Configuration and Optimization
349
350
### Broadcast Variables
351
352
```scala { .api }
353
class DataSet[T] {
354
def withBroadcastSet(data: DataSet[_], name: String): DataSet[T]
355
}
356
```
357
358
```scala
359
val lookupData = env.fromElements(("A", 100), ("B", 200), ("C", 300))
360
val mainData = env.fromElements("A", "B", "C", "A")
361
362
// Use lookup data as a broadcast variable. withBroadcastSet must be called on the
// operator that reads the variable (here: the map), not on that operator's input.
val enriched = mainData
  .map(new RichMapFunction[String, (String, Int)] {
    var lookupMap: Map[String, Int] = _

    override def open(parameters: Configuration): Unit = {
      import scala.collection.JavaConverters._
      val lookupCollection = getRuntimeContext
        .getBroadcastVariable[(String, Int)]("lookup")
        .asScala
      lookupMap = lookupCollection.toMap
    }

    override def map(value: String): (String, Int) = {
      (value, lookupMap.getOrElse(value, 0))
    }
  })
  .withBroadcastSet(lookupData, "lookup")
380
```
381
382
### Field Forwarding Hints
383
384
```scala { .api }
385
class DataSet[T] {
386
def withForwardedFields(forwardedFields: String*): DataSet[T]
387
def withForwardedFieldsFirst(forwardedFields: String*): DataSet[T]
388
def withForwardedFieldsSecond(forwardedFields: String*): DataSet[T]
389
}
390
```
391
392
```scala
393
val tuples = env.fromElements(("Alice", 25, "Engineer"), ("Bob", 30, "Manager"))
394
395
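// Hint that field 0 (name) is forwarded unchanged. Semantic hints describe the
// operator they are attached to, so call withForwardedFields on the map result.
val mapped = tuples
  .map(t => (t._1, t._2 + 1, t._3)) // only increment age
  .withForwardedFields("0") // field 0 (name) is forwarded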
399
```
400
401
## Iteration Operations
402
403
```scala { .api }
404
class DataSet[T] {
405
def iterate(maxIterations: Int)(stepFunction: DataSet[T] => DataSet[T]): DataSet[T]
406
def iterateWithTermination(maxIterations: Int)(
407
stepFunction: DataSet[T] => (DataSet[T], DataSet[_])
408
): DataSet[T]
409
def iterateDelta[R: ClassTag](
410
workset: DataSet[R],
411
maxIterations: Int,
412
keyFields: Array[Int]
413
)(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
414
def iterateDelta[R: ClassTag](
415
workset: DataSet[R],
416
maxIterations: Int,
417
keyFields: Array[String]
418
)(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
419
def iterateDelta[R: ClassTag](
workset: DataSet[R],
maxIterations: Int,
keyFields: Array[Int],
solutionSetUnManaged: Boolean
)(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
424
}
425
```
426
427
```scala
428
val initial = env.fromElements(1.0)
429
430
// Simple iteration - double the value 10 times (1.0 * 2^10 = 1024.0)
431
val result = initial.iterate(10) { current =>
432
current.map(_ * 2)
433
}
434
435
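// Iteration with termination criterion: the loop stops as soon as the criterion
// DataSet is empty (or after 100 iterations), so it must contain elements while
// the computation has NOT yet converged.
val converged = initial.iterateWithTermination(100) { current =>
  val next = current.map(x => (x + 2.0 / x) / 2.0) // Newton's method for sqrt(2)
  val notConverged = next.filter(x => Math.abs(x * x - 2.0) > 0.0001)
  (next, notConverged)
}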
441
```
442
443
## Types
444
445
```scala { .api }
446
// org.apache.flink.api.common.operators.Order is a Java enum; sketched here as a Scala Enumeration
object Order extends Enumeration {
val NONE, ASCENDING, DESCENDING, ANY = Value
}
451
452
object FileSystem {
453
object WriteMode extends Enumeration {
454
val NO_OVERWRITE, OVERWRITE = Value
455
}
456
}
457
458
trait DataSink[T] {
459
def name(name: String): DataSink[T]
460
def setParallelism(parallelism: Int): DataSink[T]
461
}
462
463
class AggregateDataSet[T](private[flink] val set: DataSet[T], private[flink] val keys: Keys[T], private[flink] val aggregations: Seq[AggregationFunction[_]]) {
464
def and(agg: Aggregations, field: Int): AggregateDataSet[T]
465
def and(agg: Aggregations, field: String): AggregateDataSet[T]
466
def andSum(field: Int): AggregateDataSet[T]
467
def andSum(field: String): AggregateDataSet[T]
468
def andMax(field: Int): AggregateDataSet[T]
469
def andMax(field: String): AggregateDataSet[T]
470
def andMin(field: Int): AggregateDataSet[T]
471
def andMin(field: String): AggregateDataSet[T]
472
}
473
```