# DataStream Transformations

DataStream transformations are the core operations for processing unbounded streams of data in Apache Flink. These operations transform one or more DataStreams into new DataStreams, enabling complex data processing pipelines.

## Capabilities

### Basic Transformations

Transform individual elements in the stream using map, filter, and flatMap operations.

```java { .api }
/**
 * Apply a MapFunction to transform each element
 * @param mapper - the map function to apply
 * @return transformed DataStream
 */
<R> DataStream<R> map(MapFunction<T, R> mapper);

/**
 * Filter elements based on a predicate
 * @param filter - the filter function
 * @return filtered DataStream
 */
DataStream<T> filter(FilterFunction<T> filter);

/**
 * Apply a FlatMapFunction that can produce zero, one, or more elements for each input
 * @param flatMapper - the flatmap function
 * @return transformed DataStream
 */
<R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper);
```

**Usage Examples:**

```java
DataStream<String> text = env.fromElements("hello world", "flink streaming");

// Map transformation - convert to uppercase
DataStream<String> upperCase = text.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) {
        return value.toUpperCase();
    }
});

// The same transformation using a method reference
DataStream<String> upperCaseViaLambda = text.map(String::toUpperCase);

// Filter transformation - keep only strings with more than 5 characters
DataStream<String> filtered = text.filter(s -> s.length() > 5);

// FlatMap transformation - split sentences into words
DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String sentence, Collector<String> out) {
        for (String word : sentence.split(" ")) {
            out.collect(word);
        }
    }
});

// The same transformation using a lambda expression. Because the generic
// type of the Collector is erased, the output type must be declared
// explicitly with returns(...).
DataStream<String> wordsViaLambda = text
    .flatMap((String sentence, Collector<String> out) ->
        Arrays.stream(sentence.split(" ")).forEach(out::collect))
    .returns(Types.STRING);
```

### Stream Partitioning

Control how data is distributed across parallel instances of operators.

```java { .api }
/**
 * Partition the stream by key
 * @param key - the key selector function
 * @return KeyedStream partitioned by the key
 */
<K> KeyedStream<T, K> keyBy(KeySelector<T, K> key);

/**
 * Partition by field positions (for Tuple types)
 * @param fields - field positions to partition by
 * @return KeyedStream partitioned by the fields
 */
KeyedStream<T, Tuple> keyBy(int... fields);

/**
 * Partition by field names (for POJO types)
 * @param fields - field names to partition by
 * @return KeyedStream partitioned by the fields
 */
KeyedStream<T, Tuple> keyBy(String... fields);

/**
 * Random partitioning - elements are randomly distributed
 * @return randomly partitioned DataStream
 */
DataStream<T> shuffle();

/**
 * Round-robin partitioning - elements are distributed in round-robin fashion
 * @return rebalanced DataStream
 */
DataStream<T> rebalance();

/**
 * Rescale partitioning - locally rebalance between upstream and downstream operators
 * @return rescaled DataStream
 */
DataStream<T> rescale();

/**
 * Broadcast - send elements to all downstream operators
 * @return broadcasted DataStream
 */
DataStream<T> broadcast();

/**
 * Forward partitioning - send each element to the locally corresponding
 * subtask of the next operator (requires equal parallelism upstream and downstream)
 * @return forwarded DataStream
 */
DataStream<T> forward();

/**
 * Global partitioning - send all elements to the first instance of the next operator
 * @return globally partitioned DataStream
 */
DataStream<T> global();
```

**Usage Examples:**

```java
DataStream<Tuple2<String, Integer>> tuples = env.fromElements(
    Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3)
);

// Key by field position
KeyedStream<Tuple2<String, Integer>, Tuple> keyedByPosition = tuples.keyBy(0);

// Key by lambda function
KeyedStream<Tuple2<String, Integer>, String> keyedByFunction =
    tuples.keyBy(value -> value.f0);

// For POJO types
DataStream<Person> people = env.fromElements(new Person("John", 25), new Person("Jane", 30));
KeyedStream<Person, String> keyedByName = people.keyBy(person -> person.getName());

// Partitioning strategies
DataStream<String> shuffled = text.shuffle();
DataStream<String> rebalanced = text.rebalance();
DataStream<String> broadcasted = text.broadcast();
```

### Stream Composition

Combine multiple streams into a single stream or create connected streams for joint processing.

```java { .api }
/**
 * Union with other DataStreams of the same type
 * @param streams - streams to union with
 * @return unified DataStream
 */
DataStream<T> union(DataStream<T>... streams);

/**
 * Connect with another DataStream for joint processing
 * @param dataStream - stream to connect with
 * @return ConnectedStreams for joint processing
 */
<R> ConnectedStreams<T, R> connect(DataStream<R> dataStream);
```

**Usage Examples:**

```java
DataStream<String> stream1 = env.fromElements("a", "b");
DataStream<String> stream2 = env.fromElements("c", "d");
DataStream<String> stream3 = env.fromElements("e", "f");

// Union streams of the same type
DataStream<String> unionedStream = stream1.union(stream2, stream3);

// Connect streams of different types
DataStream<Integer> numbers = env.fromElements(1, 2, 3);
ConnectedStreams<String, Integer> connected = stream1.connect(numbers);

// Process connected streams
DataStream<String> result = connected.map(new CoMapFunction<String, Integer, String>() {
    @Override
    public String map1(String value) {
        return "String: " + value;
    }

    @Override
    public String map2(Integer value) {
        return "Number: " + value;
    }
});
```

### Rich Transformations

Use rich functions that provide access to runtime context and lifecycle methods.

```java { .api }
/**
 * Apply a RichMapFunction with access to runtime context
 * @param mapper - the rich map function
 * @return transformed DataStream
 */
<R> DataStream<R> map(RichMapFunction<T, R> mapper);

/**
 * Apply a RichFilterFunction with access to runtime context
 * @param filter - the rich filter function
 * @return filtered DataStream
 */
DataStream<T> filter(RichFilterFunction<T> filter);

/**
 * Apply a RichFlatMapFunction with access to runtime context
 * @param flatMapper - the rich flatmap function
 * @return transformed DataStream
 */
<R> DataStream<R> flatMap(RichFlatMapFunction<T, R> flatMapper);
```

**Usage Examples:**

```java
// Rich function with initialization
DataStream<String> enriched = text.map(new RichMapFunction<String, String>() {
    private String prefix;

    @Override
    public void open(Configuration parameters) {
        // Global job parameters are exposed as a ParameterTool when the job
        // was configured with env.getConfig().setGlobalJobParameters(...)
        ParameterTool params = (ParameterTool)
            getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        prefix = params.get("prefix", "default");
    }

    @Override
    public String map(String value) {
        return prefix + ": " + value;
    }
});
```

### Process Functions

Use process functions for complex processing logic with access to timers and state.

```java { .api }
/**
 * Apply a ProcessFunction for complex stream processing
 * @param processFunction - the process function
 * @return processed DataStream
 */
<R> DataStream<R> process(ProcessFunction<T, R> processFunction);
```

**Usage Examples:**

Note: timers can only be registered on a keyed stream, so the input below is assumed to have been keyed with `keyBy(...)` first.

```java
DataStream<String> processed = keyedText.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Custom processing logic
        if (value.length() > 0) {
            out.collect("Processed: " + value);

            // Set timer for 60 seconds from now
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 60000);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("Timer fired at: " + timestamp);
    }
});
```

### Stream Splitting (Deprecated)

Note: Stream splitting using split() and select() is deprecated in newer versions. Use side outputs instead.

```java { .api }
/**
 * Split the stream based on an OutputSelector (DEPRECATED)
 * @param outputSelector - selector to determine output streams
 * @return SplitStream for selecting split streams
 */
@Deprecated
SplitStream<T> split(OutputSelector<T> outputSelector);
```

### Side Outputs

Use side outputs to emit data to multiple output streams from a single operator.

```java { .api }
// Side outputs are used within ProcessFunction
public void processElement(T element, Context ctx, Collector<R> out) {
    // Emit to main output
    out.collect(mainResult);

    // Emit to side output
    ctx.output(sideOutputTag, sideResult);
}

// Retrieve side output from SingleOutputStreamOperator
DataStream<X> getSideOutput(OutputTag<X> sideOutputTag);
```

**Usage Examples:**

```java
// Define side output tag (anonymous subclass so the generic type is preserved)
final OutputTag<String> lateDataTag = new OutputTag<String>("late-data"){};

// Process function with side output
SingleOutputStreamOperator<String> mainStream = input.process(
    new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            if (isLate(value)) {
                // Emit to side output
                ctx.output(lateDataTag, value);
            } else {
                // Emit to main output
                out.collect(value);
            }
        }
    }
);

// Get side output stream
DataStream<String> lateData = mainStream.getSideOutput(lateDataTag);
```

### Iteration

Create iterative streaming programs for machine learning and graph processing.

```java { .api }
/**
 * Create an iterative stream
 * @return IterativeStream for iteration processing
 */
IterativeStream<T> iterate();

/**
 * Create an iterative stream with timeout
 * @param maxWaitTimeMillis - maximum wait time for iteration
 * @return IterativeStream for iteration processing
 */
IterativeStream<T> iterate(long maxWaitTimeMillis);
```

**Usage Examples:**

```java
DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        return value - 1;
    }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});

// Feed elements that are still positive back into the iteration
iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
```

## Types

### Transformation Function Interfaces

```java { .api }
// Basic transformation functions
interface MapFunction<T, O> extends Function {
    O map(T value) throws Exception;
}

interface FilterFunction<T> extends Function {
    boolean filter(T value) throws Exception;
}

interface FlatMapFunction<T, O> extends Function {
    void flatMap(T value, Collector<O> out) throws Exception;
}

// Rich transformation functions
abstract class RichMapFunction<T, O> extends AbstractRichFunction implements MapFunction<T, O> {
    // Provides access to RuntimeContext and lifecycle methods
}

abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> {
    // Provides access to RuntimeContext and lifecycle methods
}

abstract class RichFlatMapFunction<T, O> extends AbstractRichFunction implements FlatMapFunction<T, O> {
    // Provides access to RuntimeContext and lifecycle methods
}

// Key selector for partitioning
interface KeySelector<IN, KEY> extends Function {
    KEY getKey(IN value) throws Exception;
}

// Output selector for splitting (deprecated)
@Deprecated
interface OutputSelector<OUT> extends Function {
    Iterable<String> select(OUT value);
}
```

### Stream Types

```java { .api }
// Main stream type
class DataStream<T> {
    // All transformation methods as documented above
}

// Result of transformations
class SingleOutputStreamOperator<T> extends DataStream<T> {
    // Additional operator configuration methods
    SingleOutputStreamOperator<T> name(String name);
    SingleOutputStreamOperator<T> uid(String uid);
    SingleOutputStreamOperator<T> setParallelism(int parallelism);
    <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag);
}

// Keyed stream for stateful operations
class KeyedStream<T, KEY> {
    // Stateful operations (documented in keyed-streams-state.md)
}

// Connected streams for joint processing
class ConnectedStreams<T1, T2> {
    // Joint processing operations (documented in connected-streams.md)
}

// Split stream (deprecated)
@Deprecated
class SplitStream<T> extends DataStream<T> {
    DataStream<T> select(String... outputNames);
}

// Iterative stream
class IterativeStream<T> extends DataStream<T> {
    DataStream<T> closeWith(DataStream<T> feedbackStream);
}
```

### Utility Types

```java { .api }
// Collector for emitting results
interface Collector<T> {
    void collect(T record);
    void close();
}

// Output tag for side outputs
class OutputTag<T> {
    public OutputTag(String id) {}
    public OutputTag(String id, TypeInformation<T> typeInfo) {}
}

// Runtime context for rich functions
interface RuntimeContext {
    String getTaskName();
    int getNumberOfParallelSubtasks();
    int getIndexOfThisSubtask();
    ExecutionConfig getExecutionConfig();
    // State access methods
}
```