# DataStream Operations

DataStream is the core abstraction in Flink representing a stream of data elements. It provides a rich set of transformation operations to process and manipulate streaming data.

## DataStream<T>

The fundamental stream abstraction providing transformation operations.

```java { .api }
public class DataStream<T> {
    // Basic transformations
    public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<T, R> mapper);
    public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<T, R> flatMapper);
    public SingleOutputStreamOperator<T, ?> filter(FilterFunction<T> filter);
    public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer);
    public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder);
    public <R extends Tuple> SingleOutputStreamOperator<R, ?> project(int... fieldIndexes);

    // Aggregation operations (available on non-keyed streams for single parallelism)
    public SingleOutputStreamOperator<T, ?> sum(int positionToSum);
    public SingleOutputStreamOperator<T, ?> sum(String field);
    public SingleOutputStreamOperator<T, ?> min(int positionToMin);
    public SingleOutputStreamOperator<T, ?> min(String field);
    public SingleOutputStreamOperator<T, ?> max(int positionToMax);
    public SingleOutputStreamOperator<T, ?> max(String field);
    public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy);
    public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy);
    public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first);
    public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first);
    public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy);
    public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy);
    public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first);
    public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first);
    public SingleOutputStreamOperator<Long, ?> count();

    // Keyed operations
    public GroupedDataStream<T> groupBy(KeySelector<T, ?> key);
    public GroupedDataStream<T> groupBy(int... fields);
    public GroupedDataStream<T> groupBy(String... fields);

    // Stream composition and connectivity
    public DataStream<T> union(DataStream<T>... streams);
    public <R> ConnectedDataStream<T, R> connect(DataStream<R> dataStream);
    public SplitDataStream<T> split(OutputSelector<T> outputSelector);

    // Partitioning strategies
    public DataStream<T> shuffle();
    public DataStream<T> forward();
    public DataStream<T> rebalance();
    public DataStream<T> global();
    public DataStream<T> broadcast();
    public DataStream<T> partitionByHash(int... fields);
    public DataStream<T> partitionByHash(String... fields);
    public DataStream<T> partitionByHash(KeySelector<T, ?> keySelector);

    // Temporal operations (cross and join)
    public <IN2> StreamCrossOperator<T, IN2> cross(DataStream<IN2> dataStreamToCross);
    public <IN2> StreamJoinOperator<T, IN2> join(DataStream<IN2> dataStreamToJoin);

    // Iteration support
    public IterativeDataStream<T> iterate();
    public IterativeDataStream<T> iterate(long maxWaitTimeMillis);

    // Windowing operations
    public WindowedDataStream<T> window(WindowingHelper policyHelper);
    public WindowedDataStream<T> window(TriggerPolicy<T> trigger, EvictionPolicy<T> eviction);
    public WindowedDataStream<T> every(WindowingHelper policyHelper);

    // Output operations - Console
    public DataStreamSink<T> print();
    public DataStreamSink<T> printToErr();

    // Output operations - File (Text)
    public DataStreamSink<T> writeAsText(String path);
    public DataStreamSink<T> writeAsText(String path, long millis);
    public DataStreamSink<T> writeAsText(String path, WriteMode writeMode);
    public DataStreamSink<T> writeAsText(String path, WriteMode writeMode, long millis);

    // Output operations - File (CSV)
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path);
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, long millis);
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode);
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode, long millis);

    // Output operations - Network and Generic
    public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T, byte[]> schema);
    public DataStreamSink<T> write(OutputFormat<T> format, long millis);
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);

    // Advanced operations
    public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator);

    // Basic properties and configuration
    public Integer getId();
    public int getParallelism();
    public TypeInformation<T> getType();
    public StreamExecutionEnvironment getExecutionEnvironment();
    public DataStream<T> copy();
}
```

## DataStreamSource<T>

A DataStream created from a source function.

```java { .api }
public class DataStreamSource<T> extends DataStream<T> {
    public DataStreamSource<T> setParallelism(int parallelism);
    public DataStreamSource<T> name(String name);
}
```

## GroupedDataStream<T>

A DataStream that has been partitioned by key for keyed operations.

```java { .api }
public class GroupedDataStream<T> {
    // Aggregations
    public DataStream<T> reduce(ReduceFunction<T> reducer);
    public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> folder);

    // Field-based aggregations
    public DataStream<T> sum(int positionToSum);
    public DataStream<T> sum(String field);
    public DataStream<T> min(int positionToMin);
    public DataStream<T> min(String field);
    public DataStream<T> max(int positionToMax);
    public DataStream<T> max(String field);
    public DataStream<T> minBy(int positionToMinBy);
    public DataStream<T> minBy(String field);
    public DataStream<T> maxBy(int positionToMaxBy);
    public DataStream<T> maxBy(String field);

    // Windowing
    public WindowedDataStream<T> window(WindowingHelper<T> helper);
    public WindowedDataStream<T> every(WindowingHelper<T> helper);

    // Configuration
    public GroupedDataStream<T> setParallelism(int parallelism);
    public GroupedDataStream<T> name(String name);
}
```

## ConnectedDataStream<T1, T2>

Two connected streams that can be processed jointly.

```java { .api }
public class ConnectedDataStream<T1, T2> {
    // Joint transformations
    public <R> DataStream<R> map(CoMapFunction<T1, T2, R> coMapper);
    public <R> DataStream<R> flatMap(CoFlatMapFunction<T1, T2, R> coFlatMapper);
    public <R> DataStream<R> reduce(CoReduceFunction<T1, T2, R> coReducer);

    // Key both streams for keyed operations
    public ConnectedDataStream<T1, T2> groupBy(KeySelector<T1, ?> keySelector1, KeySelector<T2, ?> keySelector2);
    public ConnectedDataStream<T1, T2> groupBy(int key1, int key2);
    public ConnectedDataStream<T1, T2> groupBy(String key1, String key2);

    // Configuration
    public ConnectedDataStream<T1, T2> setParallelism(int parallelism);
    public ConnectedDataStream<T1, T2> name(String name);
}
```

## DataStreamSink<T>

Terminal operation that consumes stream data.

```java { .api }
public class DataStreamSink<T> {
    // Configuration
    public DataStreamSink<T> setParallelism(int parallelism);
    public int getParallelism();
    public DataStreamSink<T> name(String name);
    public DataStreamSink<T> disableChaining();
    public DataStreamSink<T> setBufferTimeout(long timeout);
    public DataStreamSink<T> slotSharingGroup(String slotSharingGroup);
}
```

## Usage Examples

### Basic Transformations

```java
DataStream<String> text = env.fromElements("hello world", "how are you");

// Map transformation
DataStream<String> upper = text.map(String::toUpperCase);

// FlatMap transformation
DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String sentence, Collector<String> out) {
        for (String word : sentence.split(" ")) {
            out.collect(word);
        }
    }
});

// Filter transformation
DataStream<String> filtered = words.filter(word -> word.length() > 3);
```

### Keyed Operations

```java
DataStream<Tuple2<String, Integer>> counts = words
    .map(word -> new Tuple2<>(word, 1))
    .groupBy(0) // Group by first field (word)
    .sum(1);    // Sum second field (count)

// Using KeySelector
DataStream<Tuple2<String, Integer>> counts2 = words
    .map(word -> new Tuple2<>(word, 1))
    .groupBy(tuple -> tuple.f0) // Group by word
    .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
```

### Stream Union

```java
DataStream<String> stream1 = env.fromElements("a", "b", "c");
DataStream<String> stream2 = env.fromElements("d", "e", "f");
DataStream<String> stream3 = env.fromElements("g", "h", "i");

// Union multiple streams
DataStream<String> unionStream = stream1.union(stream2, stream3);
```

### Connected Streams

```java
DataStream<String> stream1 = env.fromElements("hello", "world");
DataStream<Integer> stream2 = env.fromElements(1, 2, 3);

ConnectedDataStream<String, Integer> connected = stream1.connect(stream2);

DataStream<String> result = connected.map(new CoMapFunction<String, Integer, String>() {
    @Override
    public String map1(String value) {
        return "String: " + value;
    }

    @Override
    public String map2(Integer value) {
        return "Integer: " + value;
    }
});
```

### Partitioning

```java
DataStream<Tuple2<String, Integer>> data = env.fromElements(
    new Tuple2<>("key1", 1), new Tuple2<>("key2", 2));

// Different partitioning strategies
DataStream<Tuple2<String, Integer>> shuffled = data.shuffle();
DataStream<Tuple2<String, Integer>> rebalanced = data.rebalance();
DataStream<Tuple2<String, Integer>> broadcasted = data.broadcast();
```

### Output Operations

```java
DataStream<String> processed = words.map(String::toUpperCase);

// Print to standard output
processed.print();

// Write to file
processed.writeAsText("/path/to/output.txt");

// Write as CSV
DataStream<Tuple2<String, Integer>> tuples = processed
    .map(word -> new Tuple2<>(word, word.length()));
tuples.writeAsCsv("/path/to/output.csv");

// Custom sink
processed.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value) {
        System.out.println("Custom sink: " + value);
    }
});
```

### Configuration

```java
DataStream<String> configured = text
    .map(String::toUpperCase)
    .name("UpperCase Transformation")
    .setParallelism(4)
    .setBufferTimeout(100)
    .disableChaining();
```

## Types

```java { .api }
// Function interfaces for transformations
public interface MapFunction<T, O> extends Function {
    O map(T value) throws Exception;
}

public interface FlatMapFunction<T, O> extends Function {
    void flatMap(T value, Collector<O> out) throws Exception;
}

public interface FilterFunction<T> extends Function {
    boolean filter(T value) throws Exception;
}

public interface ReduceFunction<T> extends Function {
    T reduce(T value1, T value2) throws Exception;
}

public interface FoldFunction<T, O> extends Function {
    O fold(O accumulator, T value) throws Exception;
}

public interface KeySelector<IN, KEY> extends Function {
    KEY getKey(IN value) throws Exception;
}

// CoFunction interfaces for connected streams
public interface CoMapFunction<IN1, IN2, OUT> extends Function {
    OUT map1(IN1 value) throws Exception;
    OUT map2(IN2 value) throws Exception;
}

public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function {
    void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
    void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}

public interface CoReduceFunction<IN1, IN2, OUT> extends Function {
    OUT reduce1(IN1 value, OUT accumulator) throws Exception;
    OUT reduce2(IN2 value, OUT accumulator) throws Exception;
}

// Partitioner interface
public abstract class Partitioner<T> implements Serializable {
    public abstract int partition(T key, int numPartitions);
}

// Collector interface
public interface Collector<T> {
    void collect(T record);
    void close();
}

// Additional types for advanced operations
public class SingleOutputStreamOperator<T, O extends StreamOperator<T>> extends DataStream<T> {
    // Stream operator with configuration methods
    public SingleOutputStreamOperator<T, O> name(String name);
    public SingleOutputStreamOperator<T, O> setParallelism(int parallelism);
    public SingleOutputStreamOperator<T, O> setBufferTimeout(long timeoutMillis);
    public SingleOutputStreamOperator<T, O> disableChaining();
    public SingleOutputStreamOperator<T, O> startNewChain();
    public SingleOutputStreamOperator<T, O> slotSharingGroup(String slotSharingGroup);
}

public class SplitDataStream<T> {
    // Split stream that can be selected by name
    public DataStream<T> select(String... outputNames);
}

public interface OutputSelector<T> extends Serializable {
    Iterable<String> select(T value);
}

public class IterativeDataStream<T> extends DataStream<T> {
    // Iterative data stream for feedback loops
    public DataStream<T> closeWith(DataStream<T> feedbackStream);
}

public class StreamCrossOperator<I1, I2> {
    // Cross operation between two streams
    public <OUT> DataStream<OUT> with(CrossFunction<I1, I2, OUT> crossFunction);
    public StreamCrossOperator<I1, I2> where(KeySelector<I1, ?> keySelector);
    public StreamCrossOperator<I1, I2> equalTo(KeySelector<I2, ?> keySelector);
}

public class StreamJoinOperator<I1, I2> {
    // Join operation between two streams
    public <OUT> DataStream<OUT> with(JoinFunction<I1, I2, OUT> joinFunction);
    public StreamJoinOperator<I1, I2> where(KeySelector<I1, ?> keySelector);
    public StreamJoinOperator<I1, I2> equalTo(KeySelector<I2, ?> keySelector);
}

// Cross and Join function interfaces
public interface CrossFunction<IN1, IN2, OUT> extends Function {
    OUT cross(IN1 first, IN2 second) throws Exception;
}

public interface JoinFunction<IN1, IN2, OUT> extends Function {
    OUT join(IN1 first, IN2 second) throws Exception;
}

// Windowing types referenced in API
public abstract class WindowingHelper<T> implements Serializable {
    // Base class for windowing helpers
}

public interface TriggerPolicy<T> extends Serializable {
    // Trigger policy for windowing
}

public interface EvictionPolicy<T> extends Serializable {
    // Eviction policy for windowing
}

// Serialization schema for network output
public interface SerializationSchema<T, S> extends Serializable {
    S serialize(T element);
}

// Write mode enum for file outputs
public enum WriteMode {
    NO_OVERWRITE,
    OVERWRITE
}
```