# DataSet Operations

DataSet is the primary data abstraction in Flink's batch processing API, representing a distributed collection of elements of the same type. It provides comprehensive transformation operations for data processing pipelines.

## Capabilities

### Core DataSet Class

The abstract base class representing a distributed dataset with type information.

```java { .api }
/**
 * Abstract class representing a distributed dataset of elements of type T
 * @param <T> the type of elements in the dataset
 */
public abstract class DataSet<T> {
    // Core transformation and output methods
}
```

### Map Transformations

Apply a function to each element, producing a new DataSet of a potentially different element type.

```java { .api }
/**
 * Apply a map function to each element
 * @param mapper the map function to apply
 * @return MapOperator for further configuration
 */
public <R> MapOperator<T, R> map(MapFunction<T, R> mapper);

/**
 * Apply a map function to each partition
 * @param mapper the map function to apply to each partition
 * @return MapPartitionOperator for further configuration
 */
public <R> MapPartitionOperator<T, R> mapPartition(MapPartitionFunction<T, R> mapper);
```

**Usage Examples:**

```java
// Simple map transformation
DataSet<String> words = env.fromElements("hello", "world");
DataSet<Integer> lengths = words.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return value.length();
    }
});

// Using a method reference (Java 8+)
DataSet<String> uppercase = words.map(String::toUpperCase);

// mapPartition processes all elements of a partition in a single call
DataSet<String> processed = words.mapPartition(new MapPartitionFunction<String, String>() {
    @Override
    public void mapPartition(Iterable<String> values, Collector<String> out) {
        for (String value : values) {
            out.collect(value.trim().toLowerCase());
        }
    }
});
```

### FlatMap Transformations

Apply a function that can produce zero, one, or multiple output elements for each input element.

```java { .api }
/**
 * Apply a flatMap function to each element
 * @param flatMapper the flatMap function to apply
 * @return FlatMapOperator for further configuration
 */
public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper);
```

**Usage Examples:**

```java
// Split sentences into words
DataSet<String> sentences = env.fromElements("hello world", "flink java");
DataSet<String> words = sentences.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String sentence, Collector<String> out) {
        for (String word : sentence.split(" ")) {
            out.collect(word);
        }
    }
});
```

### Filter Transformations

Retain the elements for which a predicate function returns true.

```java { .api }
/**
 * Filter elements based on a predicate
 * @param filter the filter predicate
 * @return FilterOperator for further configuration
 */
public FilterOperator<T> filter(FilterFunction<T> filter);
```

**Usage Examples:**

```java
// Keep words longer than 4 characters
DataSet<String> words = env.fromElements("hello", "hi", "world", "a");
DataSet<String> longWords = words.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) {
        return value.length() > 4;
    }
});

// Using a lambda expression
DataSet<String> filtered = words.filter(word -> word.startsWith("h"));
```

### Reduce Transformations

Combine elements using an associative and commutative function.

```java { .api }
/**
 * Reduce the DataSet using a reduce function
 * @param reducer the reduce function
 * @return ReduceOperator for further configuration
 */
public ReduceOperator<T> reduce(ReduceFunction<T> reducer);

/**
 * Apply a group reduce function to all elements of the DataSet
 * @param reducer the group reduce function
 * @return GroupReduceOperator for further configuration
 */
public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer);
```
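
**Usage Example** (a minimal sketch, assuming a local `ExecutionEnvironment` and illustrative element values; `reduce` combines elements pairwise, while `reduceGroup` receives the whole input at once and may emit any number of results):

```java
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4);

// reduce: pairwise combination until a single element remains
DataSet<Integer> sum = numbers.reduce((a, b) -> a + b);

// reduceGroup: sees every element and emits the element count here
DataSet<Integer> count = numbers.reduceGroup(new GroupReduceFunction<Integer, Integer>() {
    @Override
    public void reduce(Iterable<Integer> values, Collector<Integer> out) {
        int n = 0;
        for (Integer ignored : values) {
            n++;
        }
        out.collect(n);
    }
});

List<Integer> sums = sum.collect();     // expected: [10]
List<Integer> counts = count.collect(); // expected: [4]
```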

### Distinct Operations

Remove duplicate elements from the DataSet.

```java { .api }
/**
 * Remove duplicate elements
 * @return DistinctOperator for further configuration
 */
public DistinctOperator<T> distinct();

/**
 * Remove duplicates based on key fields
 * @param fields the field positions for duplicate detection
 * @return DistinctOperator for further configuration
 */
public DistinctOperator<T> distinct(int... fields);

/**
 * Remove duplicates based on key selector
 * @param keyExtractor function to extract the key for comparison
 * @return DistinctOperator for further configuration
 */
public <K> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor);
```
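
**Usage Example** (a minimal sketch, assuming a local `ExecutionEnvironment`; field-based distinct keeps one representative element per distinct key):

```java
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Full-element distinct
DataSet<Integer> numbers = env.fromElements(1, 2, 2, 3, 3, 3);
List<Integer> unique = numbers.distinct().collect(); // 3 elements, order not guaranteed

// Distinct on field 0: one representative tuple per distinct key
DataSet<Tuple2<String, Integer>> pairs = env.fromElements(
        Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));
List<Tuple2<String, Integer>> byKey = pairs.distinct(0).collect(); // 2 elements
```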

### Set Operations

Combine multiple DataSets using set operations.

```java { .api }
/**
 * Union with another DataSet
 * @param other the other DataSet to union with
 * @return UnionOperator containing elements from both DataSets
 */
public UnionOperator<T> union(DataSet<T> other);
```
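
**Usage Example** (a minimal sketch, assuming a local `ExecutionEnvironment`; note that `union` concatenates the inputs and does not remove duplicates):

```java
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> first = env.fromElements("a", "b");
DataSet<String> second = env.fromElements("b", "c");

// union keeps duplicates: "b" appears twice in the result
List<String> combined = first.union(second).collect(); // 4 elements
```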

### Grouping Operations

Group elements by key fields or key selector functions.

```java { .api }
/**
 * Group by field positions (for Tuple types)
 * @param fields the field positions to group by
 * @return UnsortedGrouping for further operations
 */
public UnsortedGrouping<T> groupBy(int... fields);

/**
 * Group by field names (for POJO types)
 * @param fields the field names to group by
 * @return UnsortedGrouping for further operations
 */
public UnsortedGrouping<T> groupBy(String... fields);

/**
 * Group by key selector function
 * @param keyExtractor function to extract the grouping key
 * @return UnsortedGrouping for further operations
 */
public <K> UnsortedGrouping<T> groupBy(KeySelector<T, K> keyExtractor);
```
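
**Usage Example** (a word-count-style sketch, assuming a local `ExecutionEnvironment`; the `reduce` applied after `groupBy` is the per-group variant available on the returned `UnsortedGrouping`):

```java
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Integer>> sales = env.fromElements(
        Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 5));

// Group by field 0, then combine the elements of each group pairwise
DataSet<Tuple2<String, Integer>> totals = sales
        .groupBy(0)
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

List<Tuple2<String, Integer>> result = totals.collect(); // ("a",3) and ("b",5)
```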

### Partitioning Operations

Control how data is distributed across parallel instances.

```java { .api }
/**
 * Partition by hash of specified fields
 * @param fields the fields to use for hash partitioning
 * @return PartitionOperator for further configuration
 */
public PartitionOperator<T> partitionByHash(int... fields);

/**
 * Partition by range of specified fields
 * @param fields the fields to use for range partitioning
 * @return PartitionOperator for further configuration
 */
public PartitionOperator<T> partitionByRange(int... fields);

/**
 * Sort each partition by the specified field and order
 * @param field the field to sort by
 * @param order the sort order (ASCENDING or DESCENDING)
 * @return SortPartitionOperator for further configuration
 */
public SortPartitionOperator<T> sortPartition(int field, Order order);
```
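
**Usage Example** (a sketch, assuming a local `ExecutionEnvironment`; `Order` here is `org.apache.flink.api.common.operators.Order`, and `sortPartition` sorts within each partition, not globally):

```java
import java.util.List;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Integer>> data = env.fromElements(
        Tuple2.of("b", 2), Tuple2.of("a", 1), Tuple2.of("c", 3));

// Route equal keys (field 0) to the same partition, then sort each partition on field 1
DataSet<Tuple2<String, Integer>> arranged = data
        .partitionByHash(0)
        .sortPartition(1, Order.ASCENDING);

List<Tuple2<String, Integer>> out = arranged.collect(); // same 3 elements, redistributed
```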

### Projection Operations

Select specific fields from Tuple types.

```java { .api }
/**
 * Project specified fields (for Tuple types)
 * @param fieldIndexes the field indexes to project
 * @return ProjectOperator with projected fields
 */
public ProjectOperator<T, ?> project(int... fieldIndexes);
```
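
**Usage Example** (a sketch, assuming a local `ExecutionEnvironment`; the explicit type witness on `project` pins the output tuple type, since the declared return type is otherwise a wildcard):

```java
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<String, Integer, Double>> rows = env.fromElements(
        Tuple3.of("a", 1, 1.0), Tuple3.of("b", 2, 2.0));

// Keep only fields 0 and 2, producing Tuple2<String, Double>
DataSet<Tuple2<String, Double>> projected = rows.<Tuple2<String, Double>>project(0, 2);

List<Tuple2<String, Double>> out = projected.collect(); // 2 two-field tuples
```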

### Data Collection and Output

Collect data back to the driver program or output to external systems.

```java { .api }
/**
 * Collect all elements to the driver program
 * @return List containing all elements
 * @throws Exception if collection fails
 */
public List<T> collect() throws Exception;

/**
 * Get the first n elements
 * @param n number of elements to retrieve
 * @return DataSet containing the first n elements
 */
public DataSet<T> first(int n);

/**
 * Count the number of elements
 * @return the number of elements in the DataSet
 * @throws Exception if counting fails
 */
public long count() throws Exception;

/**
 * Sample elements from the DataSet
 * @param withReplacement whether to sample with replacement
 * @param fraction the fraction of elements to sample
 * @return DataSet with sampled elements
 */
public DataSet<T> sample(boolean withReplacement, double fraction);
```
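
**Usage Example** (a minimal sketch, assuming a local `ExecutionEnvironment`; `collect()` and `count()` are eager and trigger execution of the program up to that point):

```java
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

long total = numbers.count();                          // 5
List<Integer> firstThree = numbers.first(3).collect(); // 3 elements
```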

### Output Operations

Write DataSet content to external systems or print it for debugging.

```java { .api }
/**
 * Write as text file
 * @param filePath path to write the file
 * @return DataSink for execution
 */
public DataSink<T> writeAsText(String filePath);

/**
 * Write as text file with write mode
 * @param filePath path to write the file
 * @param writeMode mode for writing (OVERWRITE or NO_OVERWRITE)
 * @return DataSink for execution
 */
public DataSink<T> writeAsText(String filePath, WriteMode writeMode);

/**
 * Write as CSV file
 * @param filePath path to write the CSV file
 * @return DataSink for execution
 */
public DataSink<T> writeAsCsv(String filePath);

/**
 * Print to standard output
 * @return DataSink for execution
 */
public DataSink<T> print();

/**
 * Print to standard error
 * @return DataSink for execution
 */
public DataSink<T> printToErr();
```
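
**Usage Example** (a sketch, assuming a local `ExecutionEnvironment` and a temporary output directory; sinks are lazy, so nothing is written until `env.execute()` runs the plan, and parallelism is pinned to 1 so the sink produces a single file rather than a directory of part files):

```java
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // a single part file at the target path

DataSet<String> words = env.fromElements("hello", "world");

Path outDir = Files.createTempDirectory("flink-out");
String target = outDir.resolve("words.txt").toString();

// The sink is only registered here; env.execute() actually writes the file
words.writeAsText(target, WriteMode.OVERWRITE);
env.execute("write-example");
```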

### Type and Environment Information

Access type information and the execution environment.

```java { .api }
/**
 * Get the type information for this DataSet
 * @return TypeInformation describing the element type
 */
public TypeInformation<T> getType();

/**
 * Get the execution environment
 * @return ExecutionEnvironment that created this DataSet
 */
public ExecutionEnvironment getExecutionEnvironment();
```

## Types

```java { .api }
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.flink.core.fs.FileSystem.WriteMode;
```