# Data Input and Output

I/O capabilities of the DataSet API for reading from and writing to files, collections, and custom storage systems. Flink provides built-in support for text and CSV, and accepts custom input and output formats.

## Capabilities

### Data Sources

Methods on the execution environment for creating DataSets from collections, files, and generated sequences.

```java { .api }
/**
 * Create DataSet from Java collection
 * @param data the collection to create DataSet from
 * @return DataSet containing collection elements
 */
public <T> DataSet<T> fromCollection(Collection<T> data);

/**
 * Create DataSet from individual elements
 * @param data the elements to include in the DataSet
 * @return DataSet containing the specified elements
 */
@SafeVarargs
public final <T> DataSet<T> fromElements(T... data);

/**
 * Read text file line by line
 * @param filePath path to the text file
 * @return DataSet where each element is a line from the file
 */
public DataSet<String> readTextFile(String filePath);

/**
 * Read text file line by line with specific character encoding
 * @param filePath path to the text file
 * @param charsetName the charset name for decoding the file
 * @return DataSet where each element is a line from the file
 */
public DataSet<String> readTextFile(String filePath, String charsetName);

/**
 * Read text file as StringValue objects
 * @param filePath path to the text file
 * @return DataSet where each element is a StringValue from the file
 */
public DataSource<StringValue> readTextFileWithValue(String filePath);

/**
 * Read text file as StringValue objects with charset and error handling
 * @param filePath path to the text file
 * @param charsetName the charset name for decoding the file
 * @param skipInvalidLines whether to skip lines that cannot be decoded
 * @return DataSet where each element is a StringValue from the file
 */
public DataSource<StringValue> readTextFileWithValue(String filePath, String charsetName, boolean skipInvalidLines);

/**
 * Read file containing primitive values
 * @param filePath path to the file
 * @param typeClass the class of the primitive type
 * @return DataSet with elements of the primitive type
 */
public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X> typeClass);

/**
 * Read file containing primitive values with custom delimiter
 * @param filePath path to the file
 * @param delimiter the delimiter separating values
 * @param typeClass the class of the primitive type
 * @return DataSet with elements of the primitive type
 */
public <X> DataSource<X> readFileOfPrimitives(String filePath, String delimiter, Class<X> typeClass);

/**
 * Read file using custom input format
 * @param inputFormat the input format to use for reading
 * @param filePath path to the file
 * @return DataSet with elements read by the input format
 */
public <T> DataSet<T> readFile(FileInputFormat<T> inputFormat, String filePath);

/**
 * Generate sequence of numbers
 * @param from starting number (inclusive)
 * @param to ending number (inclusive)
 * @return DataSet containing the number sequence
 */
public DataSet<Long> generateSequence(long from, long to);
```

**Usage Examples:**

```java
// Obtain the execution environment that the source methods are called on
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// From collection
List<String> words = Arrays.asList("hello", "world", "flink");
DataSet<String> wordsDataSet = env.fromCollection(words);

// From elements
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

// Read text file
DataSet<String> textData = env.readTextFile("/path/to/input.txt");

// Generate sequence (both bounds are inclusive)
DataSet<Long> sequence = env.generateSequence(1, 1000);
```
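
Because both bounds of `generateSequence` are documented as inclusive, a call such as `generateSequence(1, 1000)` should yield 1000 elements rather than 999. The plain-Java sketch below models the same bounds with `LongStream.rangeClosed`. It does not use Flink, and the class name `SequenceBoundsSketch` is made up for illustration.

```java
import java.util.stream.LongStream;

// Hypothetical helper, not part of Flink: models the inclusive bounds of generateSequence.
final class SequenceBoundsSketch {

    // Number of elements in a sequence that includes both `from` and `to`.
    static long count(long from, long to) {
        return LongStream.rangeClosed(from, to).count();
    }

    public static void main(String[] args) {
        System.out.println(count(1, 1000)); // prints 1000
        System.out.println(count(5, 5));    // prints 1
    }
}
```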

### CSV Reading

`readCsvFile` returns a `CsvReader`, which is configured with the options below before it produces a DataSet.

```java { .api }
/**
 * Create CSV reader for structured data reading
 * @param filePath path to the CSV file
 * @return CsvReader for configuration and DataSet creation
 */
public CsvReader readCsvFile(String filePath);
```

### CsvReader Configuration

The `CsvReader` class provides a fluent API for CSV configuration.

```java { .api }
/**
 * CSV reader with configuration options
 */
public class CsvReader {
    /**
     * Set line delimiter (default: newline)
     * @param delimiter the line delimiter
     * @return CsvReader for method chaining
     */
    public CsvReader lineDelimiter(String delimiter);

    /**
     * Set field delimiter (default: comma)
     * @param delimiter the field delimiter
     * @return CsvReader for method chaining
     */
    public CsvReader fieldDelimiter(String delimiter);

    /**
     * Include only specific fields by position
     * @param fields boolean array indicating which fields to include
     * @return CsvReader for method chaining
     */
    public CsvReader includeFields(boolean... fields);

    /**
     * Ignore the first line (header row)
     * @return CsvReader for method chaining
     */
    public CsvReader ignoreFirstLine();

    /**
     * Ignore lines that cannot be parsed
     * @return CsvReader for method chaining
     */
    public CsvReader ignoreInvalidLines();

    /**
     * Parse CSV into POJO objects
     * @param pojoType the POJO class type
     * @param pojoFields the field names in order
     * @return DataSource of POJO objects
     */
    public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields);

    /**
     * Parse CSV with specified field types
     * @param fieldTypes the types for each field
     * @return DataSource with typed tuples
     */
    public DataSource<?> types(Class<?>... fieldTypes);
}
```

**Usage Examples:**

```java
// Basic CSV reading
DataSet<Tuple3<String, Integer, Double>> csvData = env
    .readCsvFile("/path/to/data.csv")
    .fieldDelimiter(",")
    .lineDelimiter("\n")
    .ignoreFirstLine()
    .types(String.class, Integer.class, Double.class);

// CSV to POJO: public fields and an implicit public no-argument constructor
public static class Person {
    public String name;
    public Integer age;
    public String city;
}

DataSet<Person> people = env
    .readCsvFile("/path/to/people.csv")
    .ignoreFirstLine()
    .pojoType(Person.class, "name", "age", "city");

// Selective field reading: one type per included field
DataSet<Tuple2<String, Integer>> nameAge = env
    .readCsvFile("/path/to/people.csv")
    .includeFields(true, true, false) // name, age, skip city
    .types(String.class, Integer.class);
```
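
The inclusion mask passed to `includeFields` is positional: each `true` keeps the column at that index, and each `false` skips it. The sketch below models that selection in plain Java so the effect on a single row is easy to see. It is not Flink's parser; `FieldMaskSketch` and `select` are hypothetical names, and dropping columns beyond the end of the mask is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: models how an inclusion mask selects columns.
final class FieldMaskSketch {

    // Keeps fields[i] when mask[i] is true. Assumption: columns past the mask's length are dropped.
    static List<String> select(String[] fields, boolean... mask) {
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < fields.length && i < mask.length; i++) {
            if (mask[i]) {
                kept.add(fields[i]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        String[] row = "Alice,34,Berlin".split(",");
        System.out.println(select(row, true, true, false)); // prints [Alice, 34]
        System.out.println(select(row, true, false, true)); // prints [Alice, Berlin]
    }
}
```

In the selective-reading example above, the mask keeps two columns, which is why `types` receives exactly two classes.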

### Input Formats

Built-in input formats for reading text files, collections, iterators, primitive values, and CSV rows.

```java { .api }
/**
 * Input format for reading text files line by line
 */
public class TextInputFormat extends FileInputFormat<String> {
    // Reads text files, each line becomes a String element
}

/**
 * Input format for reading text files as StringValue objects
 */
public class TextValueInputFormat extends FileInputFormat<StringValue> {
    // Reads text files as StringValue for better memory efficiency
}

/**
 * Input format for reading from Java collections
 */
public class CollectionInputFormat<T> implements InputFormat<T, GenericInputSplit> {
    /**
     * Create input format from collection
     * @param dataSet the collection to read from
     * @param serializer serializer for the data type
     */
    public CollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer);
}

/**
 * Input format for reading from iterators
 */
public class IteratorInputFormat<T> implements InputFormat<T, GenericInputSplit> {
    /**
     * Create input format from iterator
     * @param iterator the iterator to read from
     * @param serializer serializer for the data type
     */
    public IteratorInputFormat(Iterator<T> iterator, TypeSerializer<T> serializer);
}

/**
 * Input format for reading primitive types
 */
public class PrimitiveInputFormat<T> extends FileInputFormat<T> {
    /**
     * Create input format for primitive types
     * @param filePath path to the file
     * @param delimiter delimiter between values
     * @param typeClass the primitive type class
     */
    public PrimitiveInputFormat(Path filePath, String delimiter, Class<T> typeClass);
}

/**
 * CSV input format for Row objects
 */
public class RowCsvInputFormat extends CsvInputFormat<Row> {
    // Specialized CSV format for Row-based data
}
```

### Data Sinks and Output Operations

Methods on `DataSet<T>` for writing its content to files or to a custom output format.

```java { .api }
/**
 * Write DataSet as text file
 * @param filePath path where to write the file
 * @return DataSink for execution
 */
public DataSink<T> writeAsText(String filePath);

/**
 * Write DataSet as text file with write mode
 * @param filePath path where to write the file
 * @param writeMode OVERWRITE or NO_OVERWRITE
 * @return DataSink for execution
 */
public DataSink<T> writeAsText(String filePath, WriteMode writeMode);

/**
 * Write DataSet as CSV file
 * @param filePath path where to write the CSV file
 * @return DataSink for execution
 */
public DataSink<T> writeAsCsv(String filePath);

/**
 * Write with custom text formatter
 * @param filePath path where to write the file
 * @param formatter custom text formatter
 * @return DataSink for execution
 */
public DataSink<T> writeAsFormattedText(String filePath, TextFormatter<T> formatter);

/**
 * Write using custom output format
 * @param outputFormat the output format to use
 * @param filePath path where to write
 * @return DataSink for execution
 */
public DataSink<T> write(OutputFormat<T> outputFormat, String filePath);

/**
 * Output using custom output format (no file path)
 * @param outputFormat the output format to use
 * @return DataSink for execution
 */
public DataSink<T> output(OutputFormat<T> outputFormat);
```

### Debug Output Operations

Operations for inspecting DataSet content during debugging and development.

```java { .api }
/**
 * Print DataSet content to standard output (executes immediately)
 * @throws Exception if printing fails
 */
public void print() throws Exception;

/**
 * Print DataSet content to standard error (executes immediately)
 * @throws Exception if printing fails
 */
public void printToErr() throws Exception;

/**
 * Print DataSet content with identifier to standard output
 * @param sinkIdentifier identifier for the print sink
 * @return DataSink for execution
 */
public DataSink<T> print(String sinkIdentifier);

/**
 * Print DataSet content with identifier to standard error
 * @param sinkIdentifier identifier for the print sink
 * @return DataSink for execution
 */
public DataSink<T> printToErr(String sinkIdentifier);

/**
 * Print DataSet content on task manager with prefix (for debugging)
 * @param prefix prefix for the printed output
 * @return DataSink for execution
 */
public DataSink<T> printOnTaskManager(String prefix);
```

**Usage Examples:**

```java
// Write as text
DataSet<String> result = processedData;
result.writeAsText("/path/to/output.txt", WriteMode.OVERWRITE);

// Write as CSV
DataSet<Tuple3<String, Integer, Double>> data = getData();
data.writeAsCsv("/path/to/output.csv");

// Print for debugging
result.print();

// Custom formatter
result.writeAsFormattedText("/path/to/formatted.txt", new TextFormatter<String>() {
    @Override
    public String format(String record) {
        return "Record: " + record;
    }
});
```

### Output Formats

Built-in output formats for writing text and CSV files, printing, collecting into a local collection, and discarding records.

```java { .api }
/**
 * Output format for writing text files
 */
public class TextOutputFormat<T> extends FileOutputFormat<T> {
    /**
     * Create text output format
     * @param outputPath path to write the output
     */
    public TextOutputFormat(Path outputPath);
}

/**
 * Output format for writing CSV files
 */
public class CsvOutputFormat<T> extends FileOutputFormat<T> {
    /**
     * Create CSV output format
     * @param outputPath path to write the CSV
     */
    public CsvOutputFormat(Path outputPath);

    /**
     * Set field delimiter
     * @param fieldDelimiter delimiter between fields
     */
    public void setFieldDelimiter(String fieldDelimiter);

    /**
     * Set record delimiter
     * @param recordDelimiter delimiter between records
     */
    public void setRecordDelimiter(String recordDelimiter);
}

/**
 * Output format for printing to stdout/stderr
 */
public class PrintingOutputFormat<T> implements OutputFormat<T> {
    /**
     * Create printing output format
     * @param targetStream target stream (System.out or System.err)
     * @param sinkIdentifier identifier for the sink
     */
    public PrintingOutputFormat(PrintStream targetStream, String sinkIdentifier);
}

/**
 * Output format for collecting to local collection
 */
public class LocalCollectionOutputFormat<T> implements OutputFormat<T> {
    /**
     * Create local collection output format
     * @param out the collection to write to
     */
    public LocalCollectionOutputFormat(List<T> out);
}

/**
 * Output format that discards all records (for testing)
 */
public class DiscardingOutputFormat<T> implements OutputFormat<T> {
    // Discards all records - useful for performance testing
}
```
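
The two `CsvOutputFormat` setters control how fields and records are separated in the written file. The plain-Java sketch below models the resulting layout for a few records. It is not Flink's writer: `CsvDelimiterSketch` and `render` are hypothetical names, and ending every record (including the last) with the record delimiter is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: models field and record delimiters in CSV output.
final class CsvDelimiterSketch {

    // Joins the fields of each record with fieldDelimiter and ends each record with recordDelimiter.
    static String render(List<List<String>> records, String fieldDelimiter, String recordDelimiter) {
        StringBuilder out = new StringBuilder();
        for (List<String> record : records) {
            out.append(String.join(fieldDelimiter, record)).append(recordDelimiter);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<List<String>> records = Arrays.asList(
            Arrays.asList("alice", "34"),
            Arrays.asList("bob", "27"));
        // Pipe-separated fields, one record per line
        System.out.print(render(records, "|", "\n"));
    }
}
```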

### File System Support

Write modes that control how file sinks treat files that already exist at the target path.

```java { .api }
/**
 * Write mode for file operations
 */
public enum WriteMode {
    /** Overwrite existing files */
    OVERWRITE,
    /** Fail if file already exists */
    NO_OVERWRITE
}
```
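
The difference between the two modes shows up only when the target already exists. The sketch below reproduces that distinction for a single local file using `java.nio`. It is an illustration under stated assumptions, not Flink's file system code: `WriteModeSketch` and its nested `Mode` enum are hypothetical stand-ins, and a real sink may behave differently, for example by writing a directory of part files when it runs in parallel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical stand-in, not part of Flink: models OVERWRITE vs NO_OVERWRITE for one local file.
final class WriteModeSketch {

    enum Mode { OVERWRITE, NO_OVERWRITE }

    static void write(Path target, String text, Mode mode) throws IOException {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        if (mode == Mode.NO_OVERWRITE) {
            // CREATE_NEW throws FileAlreadyExistsException when the target exists
            Files.write(target, bytes, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        } else {
            // Replace any existing content
            Files.write(target, bytes, StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
        }
    }

    // Returns true when the write is rejected because the target already exists.
    static boolean failsOnExisting(Path target, Mode mode) {
        try {
            write(target, "second write", mode);
            return false;
        } catch (FileAlreadyExistsException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Creates a temporary file so that the target exists before writing.
    static Path existingTempFile() {
        try {
            return Files.createTempFile("write-mode-sketch", ".txt");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path target = existingTempFile();
        System.out.println(failsOnExisting(target, Mode.NO_OVERWRITE)); // prints true
        System.out.println(failsOnExisting(target, Mode.OVERWRITE));    // prints false
        Files.deleteIfExists(target);
    }
}
```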

### Data Properties

Declare properties that the data in input splits already has, such as being partitioned or grouped on certain fields.

```java { .api }
/**
 * Properties for data splits
 */
public class SplitDataProperties<T> {
    /**
     * Specify that data is partitioned across input splits on the given fields
     * @param fields the fields by which data is partitioned
     * @return configured properties
     */
    public SplitDataProperties<T> splitsPartitionedBy(int... fields);

    /**
     * Specify that data within each input split is grouped on the given fields
     * @param fields the fields by which data is grouped
     * @return configured properties
     */
    public SplitDataProperties<T> splitsGroupedBy(int... fields);
}
```
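
Split data properties are declarations about the input, not operations on it, so it helps to check that the data really has the declared property. The sketch below tests one reading of "partitioned by a key": every key value occurs in exactly one split. That reading is an assumption of this sketch. The code is plain Java, and `SplitPartitioningSketch` is a hypothetical name, not part of Flink.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: checks a partitioning claim on sample data.
final class SplitPartitioningSketch {

    // splitKeys.get(i) holds the key of every record in split i.
    // Returns true when no key appears in more than one split.
    static boolean isPartitionedByKey(List<List<String>> splitKeys) {
        Map<String, Integer> owner = new HashMap<>();
        for (int split = 0; split < splitKeys.size(); split++) {
            for (String key : splitKeys.get(split)) {
                Integer previous = owner.putIfAbsent(key, split);
                if (previous != null && previous != split) {
                    return false; // same key seen in two different splits
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Key "a" stays inside the first split
        System.out.println(isPartitionedByKey(Arrays.asList(
            Arrays.asList("a", "a", "b"), Arrays.asList("c")))); // prints true
        // Key "a" occurs in both splits
        System.out.println(isPartitionedByKey(Arrays.asList(
            Arrays.asList("a", "b"), Arrays.asList("a")))); // prints false
    }
}
```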

## Types

```java { .api }
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.*;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Row;
import java.io.PrintStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
```