# Data Transformations

Core data processing operations for mapping, filtering, reducing, and transforming DataSets. These operations form the foundation of Flink's data processing capabilities.

## Capabilities

### Map Transformations

Transform each element in a DataSet into a new element, potentially of a different type.
```scala { .api }
class DataSet[T] {
  /**
   * Applies a transformation function to each element.
   * @param fun Transformation function T => R
   * @return DataSet with transformed elements
   */
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]

  /**
   * Applies a MapFunction to each element.
   * @param mapper MapFunction implementation
   * @return DataSet with transformed elements
   */
  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5)

// Simple transformation
val doubled = numbers.map(_ * 2)

// Type transformation: parse CSV lines into case class instances
case class Person(name: String, age: Int)
val people = env.fromElements("Alice,25", "Bob,30")
val persons = people.map { line =>
  val parts = line.split(",")
  Person(parts(0), parts(1).toInt)
}
```

### FlatMap Transformations

Transform each element into zero or more elements, useful for splitting and expanding data.
```scala { .api }
class DataSet[T] {
  /**
   * Transforms each element into a traversable collection.
   * @param fun Function returning TraversableOnce[R]
   * @return DataSet with flattened results
   */
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]

  /**
   * Transforms each element, emitting results through a collector.
   * @param fun Function using a collector to emit results
   * @return DataSet with collected results
   */
  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataSet[R]

  /**
   * Applies a FlatMapFunction to each element.
   * @param flatMapper FlatMapFunction implementation
   * @return DataSet with flattened results
   */
  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]
}
```

**Usage Examples:**

```scala
val sentences = env.fromElements("Hello world", "Scala Flink", "Data processing")

// Split sentences into words
val words = sentences.flatMap(_.split(" "))

// Generate multiple values per input element
val numbers = env.fromElements(1, 2, 3)
val expanded = numbers.flatMap(n => 1 to n)
// Result: [1, 1, 2, 1, 2, 3]
```

### MapPartition Transformations

Transform entire partitions at once, useful for initialization-heavy operations.
```scala { .api }
class DataSet[T] {
  /**
   * Transforms entire partitions using an iterator.
   * @param fun Function transforming Iterator[T] to TraversableOnce[R]
   * @return DataSet with partition-wise transformations
   */
  def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]

  /**
   * Transforms partitions using an iterator and a collector.
   * @param fun Function with iterator input and collector output
   * @return DataSet with collected partition results
   */
  def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Applies a MapPartitionFunction to each partition.
   * @param partitionMapper MapPartitionFunction implementation
   * @return DataSet with partition transformations
   */
  def mapPartition[R: TypeInformation: ClassTag](partitionMapper: MapPartitionFunction[T, R]): DataSet[R]
}
```
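
**Usage Example:**

`mapPartition` is typically preferred over `map` when per-element calls would repeat expensive setup work. A minimal sketch (the per-partition counting logic and the name `partitionCounts` are illustrative, not part of the API):

```scala
val numbers = env.fromElements(1, 2, 3, 4, 5)

// Emit one element per partition: the number of records seen in that partition
val partitionCounts = numbers.mapPartition { in =>
  Iterator.single(in.size)
}
```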

### Filter Operations

Select the elements that satisfy a predicate, discarding those that do not.
```scala { .api }
class DataSet[T] {
  /**
   * Filters elements using a predicate function.
   * @param fun Predicate function returning Boolean
   * @return DataSet with filtered elements
   */
  def filter(fun: T => Boolean): DataSet[T]

  /**
   * Filters elements using a FilterFunction.
   * @param filter FilterFunction implementation
   * @return DataSet with filtered elements
   */
  def filter(filter: FilterFunction[T]): DataSet[T]
}
```

**Usage Examples:**

```scala
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Filter even numbers
val evenNumbers = numbers.filter(_ % 2 == 0)

// Filter with a compound condition
case class Product(name: String, price: Double, inStock: Boolean)
val products = env.fromElements(
  Product("Laptop", 999.99, true),
  Product("Mouse", 29.99, false),
  Product("Keyboard", 79.99, true)
)

val availableProducts = products.filter(p => p.inStock && p.price < 100)
```

### Reduce Operations

Combine elements pairwise with an associative function to produce a single aggregated result.
```scala { .api }
class DataSet[T] {
  /**
   * Reduces all elements using a combining function.
   * @param fun Binary combining function (T, T) => T
   * @return DataSet with a single reduced element
   */
  def reduce(fun: (T, T) => T): DataSet[T]

  /**
   * Reduces elements using a ReduceFunction.
   * @param reducer ReduceFunction implementation
   * @return DataSet with the reduced result
   */
  def reduce(reducer: ReduceFunction[T]): DataSet[T]
}
```
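
**Usage Example:**

A minimal sketch; `reduce` on a full DataSet yields a DataSet containing a single element:

```scala
val numbers = env.fromElements(1, 2, 3, 4, 5)

// Pairwise sum; the combining function must be associative
val sum = numbers.reduce(_ + _)
// Result: [15]
```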

### ReduceGroup Operations

Process all elements of a group in a single function call, potentially producing a different output type.
```scala { .api }
class DataSet[T] {
  /**
   * Processes all elements as a group using an iterator.
   * @param fun Function processing Iterator[T] to produce R
   * @return DataSet with the group processing result
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]

  /**
   * Processes groups using an iterator and a collector.
   * @param fun Function with iterator input and collector output
   * @return DataSet with collected group results
   */
  def reduceGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Applies a GroupReduceFunction to process groups.
   * @param reducer GroupReduceFunction implementation
   * @return DataSet with group reduction results
   */
  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
}
```
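
**Usage Example:**

A minimal sketch; on an ungrouped DataSet the entire input is passed to one function call, so this step runs non-parallel (the name `joined` is illustrative):

```scala
val words = env.fromElements("a", "b", "c")

// Consume all elements at once and produce a value of a different type
val joined = words.reduceGroup(in => in.mkString(","))
```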

### CombineGroup Operations

Pre-aggregate elements within each partition before the final group reduction, reducing the amount of data exchanged over the network.
```scala { .api }
class DataSet[T] {
  /**
   * Combines elements within partitions using an iterator and a collector.
   * @param fun Function for partition-wise combining
   * @return DataSet with partition-combined results
   */
  def combineGroup[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]

  /**
   * Applies a GroupCombineFunction for partition-wise combining.
   * @param combiner GroupCombineFunction implementation
   * @return DataSet with combined results
   */
  def combineGroup[R: TypeInformation: ClassTag](combiner: GroupCombineFunction[T, R]): DataSet[R]
}
```
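
**Usage Example:**

A minimal sketch of partition-wise pre-aggregation (assumes `import org.apache.flink.util.Collector`; a subsequent `reduce` would still be needed to obtain a global total):

```scala
import org.apache.flink.util.Collector

val numbers = env.fromElements(1, 2, 3, 4, 5)

// One partial sum per partition; results are not yet globally combined
val partialSums = numbers.combineGroup { (in: Iterator[Int], out: Collector[Int]) =>
  out.collect(in.sum)
}
```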

### Distinct Operations

Remove duplicate elements from a DataSet, with several key selection strategies.
```scala { .api }
class DataSet[T] {
  /**
   * Removes all duplicates from the DataSet.
   * @return DataSet with unique elements
   */
  def distinct(): DataSet[T]

  /**
   * Removes duplicates based on field positions.
   * @param fields Field positions to consider for uniqueness
   * @return DataSet with unique elements by fields
   */
  def distinct(fields: Int*): DataSet[T]

  /**
   * Removes duplicates based on field names.
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return DataSet with unique elements by named fields
   */
  def distinct(firstField: String, otherFields: String*): DataSet[T]

  /**
   * Removes duplicates based on a key selector function.
   * @param fun Key selector function
   * @return DataSet with unique elements by key
   */
  def distinct[K: TypeInformation](fun: T => K): DataSet[T]
}
```

**Usage Examples:**

```scala
val numbers = env.fromElements(1, 2, 2, 3, 3, 3, 4, 5)

// Remove all duplicates
val unique = numbers.distinct()

// Remove duplicates by key
case class Person(name: String, age: Int, city: String)
val people = env.fromElements(
  Person("Alice", 25, "NYC"),
  Person("Bob", 25, "NYC"),
  Person("Alice", 30, "LA")
)

// Unique by name
val uniqueByName = people.distinct(_.name)

// Unique by age and city
val uniqueByAgeCity = people.distinct(p => (p.age, p.city))
```

### Selection Operations

Select specific elements or subsets from DataSets.
```scala { .api }
class DataSet[T] {
  /**
   * Selects the first n elements.
   * @param n Number of elements to select
   * @return DataSet with the first n elements
   */
  def first(n: Int): DataSet[T]

  /**
   * Selects the element with the minimum values in the specified fields.
   * @param fields Field positions for the minimum comparison
   * @return DataSet with the minimum element
   */
  def minBy(fields: Int*): DataSet[T]

  /**
   * Selects the element with the maximum values in the specified fields.
   * @param fields Field positions for the maximum comparison
   * @return DataSet with the maximum element
   */
  def maxBy(fields: Int*): DataSet[T]
}
```
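
**Usage Examples:**

A minimal sketch using tuples, since `minBy`/`maxBy` select by field position; note that which elements `first` returns is not deterministic under parallel execution:

```scala
val scores = env.fromElements(("Alice", 90), ("Bob", 75), ("Carol", 88))

// Any two elements from the DataSet
val firstTwo = scores.first(2)

// The tuple with the maximum value in field position 1 (the score)
val top = scores.maxBy(1)
// Result: [("Alice", 90)]
```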

### Counting and Collection

Get counts and collect elements for inspection. Both operations trigger execution of the program and return their results to the driver.
```scala { .api }
class DataSet[T] {
  /**
   * Counts the number of elements in the DataSet.
   * @return Number of elements
   */
  def count(): Long

  /**
   * Collects all elements to the driver program.
   * @return Sequence containing all elements
   */
  def collect(): Seq[T]
}
```

**Usage Examples:**

```scala
val data = env.fromElements(1, 2, 3, 4, 5)

// Count elements
val elementCount = data.count()
println(s"Total elements: $elementCount")

// Collect results to the driver (use with care on large datasets)
val results = data.map(_ * 2).collect()
results.foreach(println)
```

## Types
```scala { .api }
trait MapFunction[T, O] extends Function {
  def map(value: T): O
}

trait FlatMapFunction[T, O] extends Function {
  def flatMap(value: T, out: Collector[O]): Unit
}

trait MapPartitionFunction[T, O] extends Function {
  def mapPartition(values: java.lang.Iterable[T], out: Collector[O]): Unit
}

trait FilterFunction[T] extends Function {
  def filter(value: T): Boolean
}

trait ReduceFunction[T] extends Function {
  def reduce(value1: T, value2: T): T
}

trait GroupReduceFunction[T, O] extends Function {
  def reduce(values: java.lang.Iterable[T], out: Collector[O]): Unit
}

trait GroupCombineFunction[T, O] extends Function {
  def combine(values: java.lang.Iterable[T], out: Collector[O]): Unit
}

trait Collector[T] {
  def collect(record: T): Unit
}
```