0
# Transformations
1
2
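The Apache Flink Scala API provides a rich set of transformation operations that enable functional programming patterns with type safety. All transformations are lazy: they only build up a directed acyclic graph (DAG) of operators, which is executed when the program is triggered — either explicitly via `env.execute()` or implicitly by an eager sink such as `print()`, `collect()`, or `count()`.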
3
4
## Basic Transformations
5
6
### Map
7
8
Transform each element using a function, producing exactly one output element per input element.
9
10
```scala { .api }
11
class DataSet[T] {
12
// Using function literal
13
def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
14
15
// Using MapFunction
16
def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R]
17
18
// Using RichMapFunction with RuntimeContext access
19
def map[R: TypeInformation: ClassTag](mapper: RichMapFunction[T, R]): DataSet[R]
20
}
21
```
22
23
### FlatMap
24
25
Transform each element into zero, one, or more output elements.
26
27
```scala { .api }
28
class DataSet[T] {
29
// Using function returning TraversableOnce
30
def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
31
32
// Using function with Collector
33
def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataSet[R]
34
35
// Using FlatMapFunction
36
def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R]
37
38
// Using RichFlatMapFunction with RuntimeContext access
39
def flatMap[R: TypeInformation: ClassTag](flatMapper: RichFlatMapFunction[T, R]): DataSet[R]
40
}
41
```
42
43
### Filter
44
45
Keep only elements that satisfy a predicate function.
46
47
```scala { .api }
48
class DataSet[T] {
49
// Using boolean function
50
def filter(fun: T => Boolean): DataSet[T]
51
52
// Using FilterFunction
53
def filter(filter: FilterFunction[T]): DataSet[T]
54
55
// Using RichFilterFunction with RuntimeContext access
56
def filter(filter: RichFilterFunction[T]): DataSet[T]
57
}
58
```
59
60
## Partition-wise Transformations
61
62
### MapPartition
63
64
Transform entire partitions at once, useful for expensive initialization operations.
65
66
```scala { .api }
67
class DataSet[T] {
68
// Using function on Iterator
69
def mapPartition[R: TypeInformation: ClassTag](fun: Iterator[T] => TraversableOnce[R]): DataSet[R]
70
71
// Using function with Collector
72
def mapPartition[R: TypeInformation: ClassTag](fun: (Iterator[T], Collector[R]) => Unit): DataSet[R]
73
74
// Using MapPartitionFunction
75
def mapPartition[R: TypeInformation: ClassTag](partitionMapper: MapPartitionFunction[T, R]): DataSet[R]
76
}
77
```
78
79
## Transformations with Broadcast Variables
80
81
### MapWithBroadcastSet
82
83
Transform elements with access to broadcast variables for enrichment or lookup operations.
84
85
```scala { .api }
86
class DataSet[T] {
87
def mapWithBroadcastSet[R: TypeInformation: ClassTag](
88
fun: (T, Iterable[_]) => R,
89
broadcastSetName: String
90
): DataSet[R]
91
}
92
```
93
94
### FlatMapWithBroadcastSet
95
96
```scala { .api }
97
class DataSet[T] {
98
def flatMapWithBroadcastSet[R: TypeInformation: ClassTag](
99
fun: (T, Iterable[_], Collector[R]) => Unit,
100
broadcastSetName: String
101
): DataSet[R]
102
}
103
```
104
105
### MapPartitionWithBroadcastSet
106
107
```scala { .api }
108
class DataSet[T] {
109
def mapPartitionWithBroadcastSet[R: TypeInformation: ClassTag](
110
fun: (Iterator[T], Iterable[_], Collector[R]) => Unit,
111
broadcastSetName: String
112
): DataSet[R]
113
}
114
```
115
116
### FlatMapPartitionWithBroadcastSet
117
118
```scala { .api }
119
class DataSet[T] {
120
def flatMapPartitionWithBroadcastSet[R: TypeInformation: ClassTag](
121
fun: (Iterator[T], Iterable[_], Collector[R]) => Unit,
122
broadcastSetName: String
123
): DataSet[R]
124
}
125
```
126
127
### FilterWithBroadcastSet
128
129
```scala { .api }
130
class DataSet[T] {
131
def filterWithBroadcastSet(
132
fun: (T, Iterable[_]) => Boolean,
133
broadcastSetName: String
134
): DataSet[T]
135
}
136
```
137
138
## Reduce Operations
139
140
### Reduce
141
142
Combine elements using an associative and commutative function.
143
144
```scala { .api }
145
class DataSet[T] {
146
// Using function
147
def reduce(fun: (T, T) => T): DataSet[T]
148
149
// Using ReduceFunction
150
def reduce(reducer: ReduceFunction[T]): DataSet[T]
151
152
// Using RichReduceFunction with RuntimeContext access
153
def reduce(reducer: RichReduceFunction[T]): DataSet[T]
154
}
155
```
156
157
## Set Operations
158
159
### Union
160
161
Combine multiple DataSets into one containing all elements.
162
163
```scala { .api }
164
class DataSet[T] {
165
def union(other: DataSet[T]*): DataSet[T]
166
}
167
```
168
169
### Distinct
170
171
Remove duplicate elements based on equality or key extraction.
172
173
```scala { .api }
174
class DataSet[T] {
175
// Distinct by element equality
176
def distinct(): DataSet[T]
177
178
// Distinct by key function
179
def distinct[K: TypeInformation](fun: T => K): DataSet[T]
180
181
// Distinct by field positions
182
def distinct(fields: Int*): DataSet[T]
183
184
// Distinct by field names
185
def distinct(firstField: String, otherFields: String*): DataSet[T]
186
}
187
```
188
189
## Sorting and Partitioning
190
191
### SortPartition
192
193
Locally sort the elements within each partition. This does not establish a global order across partitions.
194
195
```scala { .api }
196
class DataSet[T] {
197
// Sort by key function
198
def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
199
200
// Sort by field position
201
def sortPartition(field: Int, order: Order): DataSet[T]
202
203
// Sort by field name
204
def sortPartition(field: String, order: Order): DataSet[T]
205
}
206
```
207
208
### Partitioning Operations
209
210
Control how data is distributed across parallel instances.
211
212
```scala { .api }
213
class DataSet[T] {
214
// Hash partitioning
215
def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
216
def partitionByHash(fields: Int*): DataSet[T]
217
def partitionByHash(firstField: String, otherFields: String*): DataSet[T]
218
219
// Range partitioning
220
def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
221
def partitionByRange(fields: Int*): DataSet[T]
222
def partitionByRange(firstField: String, otherFields: String*): DataSet[T]
223
224
// Custom partitioning
225
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
226
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fields: Int*): DataSet[T]
227
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], firstField: String, otherFields: String*): DataSet[T]
228
229
// Round-robin rebalancing
230
def rebalance(): DataSet[T]
231
}
232
```
233
234
## Sampling
235
236
### Sample
237
238
Extract a random sample of elements from the DataSet.
239
240
```scala { .api }
241
class DataSet[T] {
242
// Sample with fraction
243
def sample(withReplacement: Boolean, fraction: Double): DataSet[T]
244
def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataSet[T]
245
}
246
```
247
248
### First
249
250
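Get the first `n` elements of the DataSet. Unless the data has been sorted beforehand, which elements are returned is arbitrary.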
251
252
```scala { .api }
253
class DataSet[T] {
254
def first(n: Int): DataSet[T]
255
}
256
```
257
258
## Configuration Operations
259
260
### Naming and Parallelism
261
262
```scala { .api }
263
class DataSet[T] {
264
// Set operation name for debugging
265
def name(name: String): DataSet[T]
266
267
// Set parallelism for this operation
268
def setParallelism(parallelism: Int): DataSet[T]
269
def getParallelism: Int
270
}
271
```
272
273
### Resource Configuration
274
275
```scala { .api }
276
class DataSet[T] {
277
// Get resource specifications
278
def minResources: ResourceSpec
279
def preferredResources: ResourceSpec
280
281
// Set resource requirements
282
def setResources(minResources: ResourceSpec, preferredResources: ResourceSpec): DataSet[T]
283
}
284
```
285
286
### Broadcast Variables
287
288
```scala { .api }
289
class DataSet[T] {
290
// Add broadcast variable
291
def withBroadcastSet(data: DataSet[_], name: String): DataSet[T]
292
}
293
```
294
295
## Usage Examples
296
297
### Basic Transformations
298
299
```scala
300
import org.apache.flink.api.scala._
301
302
// Entry point for building and running a batch program
val env = ExecutionEnvironment.getExecutionEnvironment

val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Map: exactly one output (the square) per input number
val squares = numbers.map(n => n * n)

// Filter: retain only the even numbers
val evens = numbers.filter(n => n % 2 == 0)

// FlatMap: expand each "lo-hi" string into every number from lo to hi (inclusive)
val ranges = env.fromElements("1-3", "4-6", "7-9")
val flatMapped = ranges.flatMap { text =>
  val bounds = text.split("-")
  Range.inclusive(bounds(0).toInt, bounds(1).toInt)
}
320
```
321
322
### Working with Case Classes
323
324
```scala
325
import org.apache.flink.api.scala._
326
327
// Simple record type used by the examples below
case class Person(name: String, age: Int, city: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val people = env.fromElements(
  Person(name = "Alice", age = 25, city = "New York"),
  Person(name = "Bob", age = 30, city = "London"),
  Person(name = "Charlie", age = 35, city = "Paris")
)

// Map: project each person onto their name
val names = people.map(person => person.name)

// Filter: keep only people aged 18 or older
val adults = people.filter(person => person.age >= 18)

// Map: render a one-line textual summary per person
val summaries = people.map { person =>
  s"${person.name} (${person.age}) from ${person.city}"
}
344
```
345
346
### Partition Operations
347
348
```scala
349
import org.apache.flink.api.scala._
350
import org.apache.flink.api.common.operators.Order
351
352
val env = ExecutionEnvironment.getExecutionEnvironment

// One sale: what was sold, for how much, and in which region
case class Sale(product: String, amount: Double, region: String)

val sales = env.fromElements(
  Sale(product = "laptop", amount = 1000.0, region = "US"),
  Sale(product = "phone", amount = 500.0, region = "EU"),
  Sale(product = "tablet", amount = 300.0, region = "US")
)

// Hash-partition on the region key so sales of one region are co-located
val partitioned = sales.partitionByHash(sale => sale.region)

// Order each partition's sales by amount, largest first
val sorted = sales.sortPartition(sale => sale.amount, Order.DESCENDING)

// Redistribute elements evenly (round-robin) to balance load
val rebalanced = sales.rebalance()
370
```
371
372
### Broadcast Variables
373
374
```scala
375
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConverters._

val env = ExecutionEnvironment.getExecutionEnvironment

// Lookup data: (currency code, rate), read here as units of that currency per 1 USD
val exchangeRates = env.fromElements(
  ("USD", 1.0),
  ("EUR", 0.85),
  ("GBP", 0.73)
)

// Transaction data: (currency code, amount in that currency)
val transactions = env.fromElements(
  ("USD", 100.0),
  ("EUR", 85.0),
  ("GBP", 73.0)
)

// Convert to USD using the broadcast variable.
// A rich function is required here: only it can reach the RuntimeContext,
// which is where the set registered under "rates" is exposed.
val convertedTransactions = transactions
  .map(new RichMapFunction[(String, Double), Double] {
    // Rates by currency code; filled in open(), before the first map() call
    private var rates: Map[String, Double] = _

    override def open(parameters: Configuration): Unit = {
      rates = getRuntimeContext
        .getBroadcastVariable[(String, Double)]("rates")
        .asScala
        .toMap
    }

    override def map(transaction: (String, Double)): Double = {
      val (currency, amount) = transaction
      // Fail loudly rather than silently guessing a rate for an unknown currency
      val rate = rates.getOrElse(
        currency,
        throw new IllegalArgumentException(s"No exchange rate for currency: $currency")
      )
      amount / rate
    }
  })
  // The name must match the one passed to getBroadcastVariable above
  .withBroadcastSet(exchangeRates, "rates")
400
```
401
402
### MapPartition for Expensive Operations
403
404
```scala
405
import org.apache.flink.api.scala._
406
407
/** Stand-in for a resource that is costly to construct (e.g. a parser or a client). */
class ExpensiveResource {
  /** Processes a single element; in this example it simply upper-cases the input. */
  def process(element: String): String = element.toUpperCase
}

/**
 * Simulates the expensive initialization.
 *
 * Declared before its first use and returning the concrete type, so that the
 * `process` call inside `mapPartition` below type-checks.
 */
def initializeExpensiveResource(): ExpensiveResource = new ExpensiveResource

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("hello", "world", "flink", "scala")

// Expensive initialization per partition
val processed = data.mapPartition { iter =>
  // Expensive setup (done once per partition instead of once per element)
  val expensiveResource = initializeExpensiveResource()

  // Process all elements in the partition, reusing the same resource
  iter.map(element => expensiveResource.process(element))
}
425
```
426
427
## Rich Functions
428
429
Rich functions provide access to the RuntimeContext for advanced features like accumulators, broadcast variables, and execution parameters.
430
431
### Rich Function Types
432
433
```scala { .api }
434
// Rich function base classes
435
abstract class RichMapFunction[T, O] extends MapFunction[T, O] with RichFunction
436
abstract class RichFlatMapFunction[T, O] extends FlatMapFunction[T, O] with RichFunction
437
abstract class RichFilterFunction[T] extends FilterFunction[T] with RichFunction
438
abstract class RichReduceFunction[T] extends ReduceFunction[T] with RichFunction
439
abstract class RichGroupReduceFunction[T, O] extends GroupReduceFunction[T, O] with RichFunction
440
abstract class RichMapPartitionFunction[T, O] extends MapPartitionFunction[T, O] with RichFunction
441
442
// RichFunction interface
443
trait RichFunction {
444
def getRuntimeContext: RuntimeContext
445
def open(parameters: Configuration): Unit = {}
446
def close(): Unit = {}
447
def setRuntimeContext(t: RuntimeContext): Unit
448
}
449
450
// RuntimeContext interface
451
/**
 * Runtime information and services exposed to a rich function, obtained through
 * `getRuntimeContext`.
 */
trait RuntimeContext {
  def getExecutionConfig: ExecutionConfig
  def getJobId: JobID
  def getTaskName: String
  // Task name with the subtask indicator appended
  def getTaskNameWithSubtasks: String
  def getNumberOfParallelSubtasks: Int
  def getIndexOfThisSubtask: Int
  def getAttemptNumber: Int

  // Accumulators
  // Registers a custom accumulator under the given name
  def addAccumulator[V, A <: java.io.Serializable](name: String, accumulator: Accumulator[V, A]): Unit
  // Returns the accumulator object itself (not its current value)
  def getAccumulator[V, A <: java.io.Serializable](name: String): Accumulator[V, A]
  def getIntCounter(name: String): IntCounter
  def getLongCounter(name: String): LongCounter
  def getDoubleCounter(name: String): DoubleCounter
  def getHistogram(name: String): Histogram

  // Broadcast variables
  // `name` is the name the set was registered under via withBroadcastSet
  def getBroadcastVariable[T](name: String): java.util.List[T]
  // Same lookup, but hands the broadcast data to `initializer` and returns its result
  def getBroadcastVariableWithInitializer[T, C](name: String, initializer: BroadcastVariableInitializer[T, C]): C
}
471
```
472
473
### Rich Function Usage Examples
474
475
```scala
476
import java.util

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

val env = ExecutionEnvironment.getExecutionEnvironment

// Rich function with accumulator access
class CountingMapper extends RichMapFunction[String, String] {
  // Assigned in open(): the runtime context is only available once the task runs
  var counter: IntCounter = _

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext.getIntCounter("processed-records")
  }

  // Counts every record it sees and emits the upper-cased value
  override def map(value: String): String = {
    counter.add(1)
    value.toUpperCase
  }
}

val data = env.fromElements("hello", "world", "flink")
val result = data.map(new CountingMapper())

// Rich function with broadcast variables
class EnrichingMapper extends RichMapFunction[String, String] {
  // Contents of the "lookup-data" broadcast set; assigned in open()
  var broadcastData: util.List[String] = _

  override def open(parameters: Configuration): Unit = {
    broadcastData = getRuntimeContext.getBroadcastVariable[String]("lookup-data")
  }

  override def map(value: String): String = {
    // Use broadcast data for enrichment: tag the value by lookup membership
    val enrichment = if (broadcastData.contains(value)) " [FOUND]" else " [NOT_FOUND]"
    value + enrichment
  }
}

val lookupData = env.fromElements("hello", "flink")
val enriched = data
  .map(new EnrichingMapper())
  // Must use the same name that EnrichingMapper passes to getBroadcastVariable
  .withBroadcastSet(lookupData, "lookup-data")
518
```