0
# Stream Transformations and Operations
1
2
DataStream provides comprehensive transformation operations for processing stream elements, including element-wise transformations, filtering, and stateful processing with type-safe functional interfaces.
3
4
## Basic Transformations
5
6
### Map Operations
7
8
```scala { .api }
9
class DataStream[T] {
10
def map[R: TypeInformation](fun: T => R): DataStream[R]
11
def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R]
12
}
13
```
14
15
Transform each element to a new element:
16
17
```scala
18
import org.apache.flink.streaming.api.scala._
19
import org.apache.flink.api.common.functions.MapFunction
20
21
val env = StreamExecutionEnvironment.getExecutionEnvironment
22
val stream = env.fromElements(1, 2, 3, 4, 5)
23
24
// Map with lambda function
25
val doubled = stream.map(_ * 2)
26
27
// Map with explicit function
28
val stringified = stream.map(x => s"Number: $x")
29
30
// Map with MapFunction interface
31
/** Maps every incoming integer to its square. */
class SquareMapper extends MapFunction[Int, Int] {
  override def map(value: Int): Int = {
    // The square is the value multiplied by itself.
    val squared = value * value
    squared
  }
}
34
val squared = stream.map(new SquareMapper)
35
36
// Map with case class transformation
37
case class User(id: Int, name: String)
38
val users = env.fromElements(1, 2, 3)
39
.map(id => User(id, s"User$id"))
40
```
41
42
### FlatMap Operations
43
44
```scala { .api }
45
class DataStream[T] {
46
def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
47
def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R]
48
def flatMap[R: TypeInformation](flatMapper: FlatMapFunction[T, R]): DataStream[R]
49
}
50
```
51
52
Transform each element to zero or more elements:
53
54
```scala
55
import org.apache.flink.util.Collector
56
import org.apache.flink.api.common.functions.FlatMapFunction
57
58
val env = StreamExecutionEnvironment.getExecutionEnvironment
59
val sentences = env.fromElements("Hello world", "How are you", "Flink streaming")
60
61
// FlatMap with lambda returning TraversableOnce
62
val words = sentences.flatMap(_.split("\\s+"))
63
64
// FlatMap with lambda using Collector
65
val wordsWithCollector = sentences.flatMap { (sentence, out) =>
66
sentence.split("\\s+").foreach(out.collect)
67
}
68
69
// FlatMap with FlatMapFunction
70
/** Splits a sentence into lower-cased words and emits every non-empty word. */
class WordSplitter extends FlatMapFunction[String, String] {
  override def flatMap(sentence: String, out: Collector[String]): Unit = {
    // Split on runs of non-word characters; a leading separator produces an
    // empty first token, which the guard below skips.
    val tokens = sentence.toLowerCase.split("\\W+")
    for (token <- tokens if token.nonEmpty) {
      out.collect(token)
    }
  }
}
77
val cleanWords = sentences.flatMap(new WordSplitter)
78
79
// FlatMap for conditional emission
80
val evenNumbers = env.fromElements(1, 2, 3, 4, 5, 6)
81
.flatMap(x => if (x % 2 == 0) Some(x) else None)
82
```
83
84
### Filter Operations
85
86
```scala { .api }
87
class DataStream[T] {
88
def filter(fun: T => Boolean): DataStream[T]
89
def filter(filter: FilterFunction[T]): DataStream[T]
90
}
91
```
92
93
Filter elements based on predicates:
94
95
```scala
96
import org.apache.flink.api.common.functions.FilterFunction
97
98
val env = StreamExecutionEnvironment.getExecutionEnvironment
99
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
100
101
// Filter with lambda
102
val evenNumbers = numbers.filter(_ % 2 == 0)
103
104
// Filter with complex condition
105
case class Person(name: String, age: Int)
106
val people = env.fromElements(
107
Person("Alice", 25),
108
Person("Bob", 17),
109
Person("Charlie", 30)
110
)
111
val adults = people.filter(_.age >= 18)
112
113
// Filter with FilterFunction
114
/** Keeps only strictly positive integers; zero and negative values are dropped. */
class PositiveFilter extends FilterFunction[Int] {
  override def filter(value: Int): Boolean = {
    val isPositive = 0 < value
    isPositive
  }
}
117
val positives = numbers.filter(new PositiveFilter)
118
```
119
120
## Advanced Transformations
121
122
### Process Functions
123
124
```scala { .api }
125
class DataStream[T] {
126
def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
127
}
128
```
129
130
Low-level processing with access to element timestamps and watermarks, and — when applied to a keyed stream — timers and keyed state:
131
132
```scala
133
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

/**
 * Emits one record per input element and one record each time a timer fires.
 *
 * `Context` and `OnTimerContext` are non-static inner classes of the Java
 * `ProcessFunction`, so they are referenced through a type projection
 * (`ProcessFunction[I, O]#Context`) rather than imported.
 *
 * This function registers timers, so it must be applied to a keyed stream
 * (see the `keyBy` in the usage below).
 */
class EventProcessor extends ProcessFunction[String, String] {

  /**
   * Handles a single element.
   *
   * @param value the incoming element
   * @param ctx   gives access to the element timestamp and the timer service
   * @param out   collector for the main output
   */
  override def processElement(
      value: String,
      ctx: ProcessFunction[String, String]#Context,
      out: Collector[String]
  ): Unit = {
    // Access current timestamp (a java.lang.Long; null when the element
    // carries no timestamp, in which case "null" is printed below)
    val timestamp = ctx.timestamp()

    // Access current watermark (shown for illustration; not used further)
    val watermark = ctx.timerService().currentWatermark()

    // Register timer for 10 seconds from now
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + 10000
    )

    // Emit result
    out.collect(s"Processed: $value at $timestamp")
  }

  /**
   * Called when a previously registered timer fires.
   *
   * @param timestamp the time the firing timer was registered for
   * @param ctx       timer context
   * @param out       collector for the main output
   */
  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[String, String]#OnTimerContext,
      out: Collector[String]
  ): Unit = {
    out.collect(s"Timer fired at $timestamp")
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val processed = env.socketTextStream("localhost", 9999)
  .keyBy(line => line) // timers can only be registered on a keyed stream
  .process(new EventProcessor)
170
```
171
172
## Side Outputs
173
174
### OutputTag and Side Output Streams
175
176
```scala { .api }
177
class DataStream[T] {
178
def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X]
179
}
180
181
case class OutputTag[T](id: String)(implicit val typeInfo: TypeInformation[T])
182
```
183
184
Route different types of data to side outputs:
185
186
```scala
187
import org.apache.flink.streaming.api.scala.OutputTag
188
189
val env = StreamExecutionEnvironment.getExecutionEnvironment
190
191
// Define output tags
192
val evenTag = OutputTag[Int]("even-numbers")
193
val oddTag = OutputTag[Int]("odd-numbers")
194
195
/**
 * Routes even numbers to `evenTag` and odd numbers to `oddTag`, and emits a
 * "Processed: n" record on the main output for every element.
 */
class NumberSplitter extends ProcessFunction[Int, String] {
  override def processElement(
      value: Int,
      // Context is a non-static inner class of the Java ProcessFunction, so it
      // has to be referenced through a type projection.
      ctx: ProcessFunction[Int, String]#Context,
      out: Collector[String]
  ): Unit = {
    if (value % 2 == 0) {
      ctx.output(evenTag, value)
    } else {
      ctx.output(oddTag, value)
    }
    out.collect(s"Processed: $value")
  }
}
209
210
val mainStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
211
.process(new NumberSplitter)
212
213
// Get side output streams
214
val evenNumbers = mainStream.getSideOutput(evenTag)
215
val oddNumbers = mainStream.getSideOutput(oddTag)
216
217
evenNumbers.print("Even")
218
oddNumbers.print("Odd")
219
mainStream.print("Main")
220
```
221
222
## Stream Splitting (Legacy)
223
224
### Split and Select Operations
225
226
```scala { .api }
227
class DataStream[T] {
228
def split(selector: OutputSelector[T]): SplitStream[T]
229
def split(fun: T => TraversableOnce[String]): SplitStream[T]
230
}
231
232
class SplitStream[T] {
233
def select(outputNames: String*): DataStream[T]
234
}
235
```
236
237
Split a stream into named outputs based on conditions. `split`/`select` is deprecated and was removed in Flink 1.12; use side outputs instead:
238
239
```scala
240
import org.apache.flink.streaming.api.collector.selector.OutputSelector
241
242
val env = StreamExecutionEnvironment.getExecutionEnvironment
243
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
244
245
// Split with OutputSelector
246
/** Tags every element with the output name "even" or "odd". */
class EvenOddSelector extends OutputSelector[Int] {
  override def select(value: Int): java.lang.Iterable[String] = {
    val outputName = if (value % 2 == 0) "even" else "odd"
    // Build the Java collection directly so that no Scala-to-Java collection
    // converters have to be imported for `.asJava`.
    java.util.Collections.singletonList(outputName)
  }
}
251
252
val splitStream = numbers.split(new EvenOddSelector)
253
val evenStream = splitStream.select("even")
254
val oddStream = splitStream.select("odd")
255
256
// Split with lambda
257
val splitStreamLambda = numbers.split(value =>
258
if (value % 2 == 0) List("even") else List("odd")
259
)
260
```
261
262
## Stream Configuration
263
264
### Operator Configuration
265
266
```scala { .api }
267
class DataStream[T] {
268
def setParallelism(parallelism: Int): DataStream[T]
269
def setMaxParallelism(maxParallelism: Int): DataStream[T]
270
def name(name: String): DataStream[T]
271
def uid(uid: String): DataStream[T]
272
def setUidHash(hash: String): DataStream[T]
273
def disableChaining(): DataStream[T]
274
def startNewChain(): DataStream[T]
275
def slotSharingGroup(slotSharingGroup: String): DataStream[T]
276
def setBufferTimeout(timeoutMillis: Long): DataStream[T]
277
}
278
```
279
280
Configure transformation operators:
281
282
```scala
283
val env = StreamExecutionEnvironment.getExecutionEnvironment
284
val stream = env.fromElements(1, 2, 3, 4, 5)
285
286
val result = stream
287
.map(_ * 2)
288
.name("Doubler") // Set operator name
289
.uid("doubler-operator") // Set unique ID
290
.setParallelism(4) // Set parallelism
291
.disableChaining() // Disable operator chaining
292
.slotSharingGroup("group1") // Set slot sharing group
293
.filter(_ > 5)
294
.name("Filter Large Numbers")
295
.startNewChain() // Start new operator chain
296
```
297
298
## Error Handling in Transformations
299
300
### Exception Handling
301
302
```scala
303
import org.apache.flink.api.common.functions.RichMapFunction
304
import org.apache.flink.configuration.Configuration
305
306
/**
 * Parses strings to integers. Input that is not a valid integer is counted in
 * the "parsing-errors" metric and mapped to the default value -1.
 */
class SafeMapper extends RichMapFunction[String, Int] {

  // Registered once in open(). Requesting the same counter name from the
  // metric group again on every failure would cause a name collision, and the
  // increments after the first one would go to unregistered counters.
  @transient private var parsingErrors: org.apache.flink.metrics.Counter = _

  override def open(parameters: Configuration): Unit = {
    parsingErrors = getRuntimeContext.getMetricGroup.counter("parsing-errors")
  }

  override def map(value: String): Int = {
    try {
      value.toInt
    } catch {
      case _: NumberFormatException =>
        // Log error or emit to side output
        parsingErrors.inc()
        -1 // Default value
    }
  }
}
320
```
321
322
### Dead Letter Queue Pattern
323
324
```scala
325
val validTag = OutputTag[Int]("valid")
326
val invalidTag = OutputTag[String]("invalid")
327
328
/**
 * Dead-letter-queue style validation: input that parses as an integer goes to
 * `validTag` and the main output; anything else goes to `invalidTag` unchanged.
 */
class ValidatingProcessor extends ProcessFunction[String, String] {
  override def processElement(
      value: String,
      // Context is a non-static inner class of the Java ProcessFunction, so it
      // has to be referenced through a type projection.
      ctx: ProcessFunction[String, String]#Context,
      out: Collector[String]
  ): Unit = {
    try {
      val number = value.toInt
      ctx.output(validTag, number)
      out.collect(s"Processed: $number")
    } catch {
      case _: NumberFormatException =>
        // Keep the raw input so it can be inspected or replayed later.
        ctx.output(invalidTag, value)
    }
  }
}
344
345
val env = StreamExecutionEnvironment.getExecutionEnvironment
346
val input = env.fromElements("1", "2", "invalid", "4", "not-a-number", "6")
347
348
val processed = input.process(new ValidatingProcessor)
349
val validNumbers = processed.getSideOutput(validTag)
350
val invalidInputs = processed.getSideOutput(invalidTag)
351
352
// Handle valid and invalid data separately
353
validNumbers.print("Valid")
354
invalidInputs.print("Invalid")
355
```
356
357
## Performance Considerations
358
359
### Operator Fusion and Chaining
360
361
```scala
362
val env = StreamExecutionEnvironment.getExecutionEnvironment
363
364
// Operations are chained together by default for efficiency
365
val stream = env.fromElements(1, 2, 3, 4, 5)
366
.map(_ * 2) // Chained with filter
367
.filter(_ > 5) // Chained with map
368
.map(_ + 1) // Chained with previous operations
369
370
// Explicit chaining control
371
val explicitChaining = env.fromElements(1, 2, 3, 4, 5)
372
.map(_ * 2)
373
.startNewChain() // Force new chain
374
.filter(_ > 5)
375
.disableChaining() // Disable chaining for this operator
376
.map(_ + 1)
377
```
378
379
### Memory and State Considerations
380
381
```scala
382
// Prefer stateless transformations when possible
383
val stateless = stream.map(_ * 2) // No state required
384
385
// Use process functions judiciously as they can maintain state
386
/**
 * Placeholder for a process function that would keep state.
 *
 * Per-key state (and timers) are only available when the function is applied
 * to a keyed stream, i.e. after `keyBy`.
 */
class StatefulProcessor extends ProcessFunction[Int, Int] {
  override def processElement(
      value: Int,
      // Context is a non-static inner class of the Java ProcessFunction, so it
      // has to be referenced through a type projection.
      ctx: ProcessFunction[Int, Int]#Context,
      out: Collector[Int]
  ): Unit = {
    // Processing logic
  }
}
392
```
393
394
## Complete Example: Text Processing Pipeline
395
396
```scala
397
import org.apache.flink.streaming.api.scala._
398
import org.apache.flink.streaming.api.scala.OutputTag
399
import org.apache.flink.streaming.api.functions.ProcessFunction
400
import org.apache.flink.util.Collector
401
402
/**
 * Reads lines from a socket, splits them into words, and emits a
 * `WordCount(word, 1)` per word. Short words (fewer than 4 characters) and
 * long words (more than 8 characters) are also routed to side outputs.
 */
object TextProcessingPipeline {

  case class WordCount(word: String, count: Int)

  // Output tags for different types of words
  val shortWordsTag = OutputTag[String]("short-words")
  val longWordsTag = OutputTag[String]("long-words")

  /**
   * Normalizes each word (lower case, trimmed), sends it to the matching side
   * output if it is short or long, and always emits it on the main output
   * with a count of 1. Words of 4 to 8 characters go to no side output.
   */
  class WordClassifier extends ProcessFunction[String, WordCount] {
    override def processElement(
        word: String,
        // Context is a non-static inner class of the Java ProcessFunction, so
        // it has to be referenced through a type projection.
        ctx: ProcessFunction[String, WordCount]#Context,
        out: Collector[WordCount]
    ): Unit = {
      val cleanWord = word.toLowerCase.trim

      if (cleanWord.length < 4) {
        ctx.output(shortWordsTag, cleanWord)
      } else if (cleanWord.length > 8) {
        ctx.output(longWordsTag, cleanWord)
      }

      out.collect(WordCount(cleanWord, 1))
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read text input
    val textStream = env.socketTextStream("localhost", 9999)

    // Process text through transformation pipeline.
    // Note: name(...) and setParallelism(...) apply to the operator directly
    // before them in the chain, which here is the filter.
    val words = textStream
      .flatMap(_.split("\\W+"))
      .filter(_.nonEmpty)
      .name("Word Splitter")
      .setParallelism(4)

    // Classify and count words
    val wordCounts = words
      .process(new WordClassifier)
      .name("Word Classifier")

    // Get side outputs
    val shortWords = wordCounts.getSideOutput(shortWordsTag)
    val longWords = wordCounts.getSideOutput(longWordsTag)

    // Print results
    wordCounts.print("Word Counts")
    shortWords.print("Short Words")
    longWords.print("Long Words")

    env.execute("Text Processing Pipeline")
  }
}
458
```