Apache Flink Java API - Core Java APIs for Apache Flink's batch and stream processing framework
—
Comprehensive I/O capabilities for reading from and writing to various data formats and storage systems. Flink provides built-in support for common formats like text, CSV, and custom input/output formats.
Methods for creating DataSets from various data sources.
/**
* Create DataSet from Java collection
* @param data the collection to create DataSet from
* @return DataSet containing collection elements
*/
public <T> DataSet<T> fromCollection(Collection<T> data);
/**
* Create DataSet from individual elements
* @param data the elements to include in the DataSet
* @return DataSet containing the specified elements
*/
@SafeVarargs
public final <T> DataSet<T> fromElements(T... data);
/**
* Read text file line by line
* @param filePath path to the text file
* @return DataSet where each element is a line from the file
*/
public DataSet<String> readTextFile(String filePath);
/**
* Read text file line by line with specific character encoding
* @param filePath path to the text file
* @param charsetName the charset name for decoding the file
* @return DataSet where each element is a line from the file
*/
public DataSet<String> readTextFile(String filePath, String charsetName);
/**
* Read text file as StringValue objects
* @param filePath path to the text file
* @return DataSource where each element is a StringValue from the file
*/
public DataSource<StringValue> readTextFileWithValue(String filePath);
/**
* Read text file as StringValue objects with charset and error handling
* @param filePath path to the text file
* @param charsetName the charset name for decoding the file
* @param skipInvalidLines whether to skip lines that cannot be decoded
* @return DataSource where each element is a StringValue from the file
*/
public DataSource<StringValue> readTextFileWithValue(String filePath, String charsetName, boolean skipInvalidLines);
/**
* Read file containing primitive values
* @param filePath path to the file
* @param typeClass the class of the primitive type
* @return DataSource with elements of the primitive type
*/
public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X> typeClass);
/**
* Read file containing primitive values with custom delimiter
* @param filePath path to the file
* @param delimiter the delimiter separating values
* @param typeClass the class of the primitive type
* @return DataSource with elements of the primitive type
*/
public <X> DataSource<X> readFileOfPrimitives(String filePath, String delimiter, Class<X> typeClass);
/**
* Read file using custom input format
* @param inputFormat the input format to use for reading
* @param filePath path to the file
* @return DataSet with elements read by the input format
*/
public <T> DataSet<T> readFile(FileInputFormat<T> inputFormat, String filePath);
/**
* Generate sequence of numbers
* @param from starting number (inclusive)
* @param to ending number (inclusive)
* @return DataSet containing the number sequence
*/
public DataSet<Long> generateSequence(long from, long to);

Usage Examples:
// From collection
List<String> words = Arrays.asList("hello", "world", "flink");
DataSet<String> wordsDataSet = env.fromCollection(words);
// From elements
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
// Read text file
DataSet<String> textData = env.readTextFile("/path/to/input.txt");
// Generate sequence
DataSet<Long> sequence = env.generateSequence(1, 1000);

Specialized CSV reader with extensive configuration options.
/**
* Create CSV reader for structured data reading
* @param filePath path to the CSV file
* @return CsvReader for configuration and DataSet creation
*/
public CsvReader readCsvFile(String filePath);

The CsvReader class provides a fluent API for CSV configuration.
/**
* CSV reader with configuration options
*/
public class CsvReader {
/**
* Set line delimiter (default: newline)
* @param delimiter the line delimiter
* @return CsvReader for method chaining
*/
public CsvReader lineDelimiter(String delimiter);
/**
* Set field delimiter (default: comma)
* @param delimiter the field delimiter
* @return CsvReader for method chaining
*/
public CsvReader fieldDelimiter(String delimiter);
/**
* Include only specific fields by position
* @param fields boolean array indicating which fields to include
* @return CsvReader for method chaining
*/
public CsvReader includeFields(boolean... fields);
/**
* Ignore the first line (header row)
* @return CsvReader for method chaining
*/
public CsvReader ignoreFirstLine();
/**
* Ignore lines that cannot be parsed
* @return CsvReader for method chaining
*/
public CsvReader ignoreInvalidLines();
/**
* Parse CSV into POJO objects
* @param pojoType the POJO class type
* @param pojoFields the field names in order
* @return DataSource of POJO objects
*/
public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields);
/**
* Parse CSV with specified field types
* @param fieldTypes the types for each field
* @return DataSource with typed tuples
*/
public DataSource<?> types(Class<?>... fieldTypes);
}

Usage Examples:
// Basic CSV reading
DataSet<Tuple3<String, Integer, Double>> csvData = env
.readCsvFile("/path/to/data.csv")
.fieldDelimiter(",")
.lineDelimiter("\n")
.ignoreFirstLine()
.types(String.class, Integer.class, Double.class);
// CSV to POJO
public static class Person {
public String name;
public Integer age;
public String city;
}
DataSet<Person> people = env
.readCsvFile("/path/to/people.csv")
.ignoreFirstLine()
.pojoType(Person.class, "name", "age", "city");
// Selective field reading
DataSet<Tuple2<String, Integer>> nameAge = env
.readCsvFile("/path/to/people.csv")
.includeFields(true, true, false) // name, age, skip city
.types(String.class, Integer.class);

Built-in input formats for reading various data types.
/**
* Input format for reading text files line by line
*/
public class TextInputFormat extends FileInputFormat<String> {
// Reads text files, each line becomes a String element
}
/**
* Input format for reading text files as StringValue objects
*/
public class TextValueInputFormat extends FileInputFormat<StringValue> {
// Reads text files as StringValue for better memory efficiency
}
/**
* Input format for reading from Java collections
*/
public class CollectionInputFormat<T> implements InputFormat<T, GenericInputSplit> {
/**
* Create input format from collection
* @param dataSet the collection to read from
* @param serializer serializer for the data type
*/
public CollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer);
}
/**
* Input format for reading from iterators
*/
public class IteratorInputFormat<T> implements InputFormat<T, GenericInputSplit> {
/**
* Create input format from iterator
* @param iterator the iterator to read from
* @param serializer serializer for the data type
*/
public IteratorInputFormat(Iterator<T> iterator, TypeSerializer<T> serializer);
}
/**
* Input format for reading primitive types
*/
public class PrimitiveInputFormat<T> extends FileInputFormat<T> {
/**
* Create input format for primitive types
* @param filePath path to the file
* @param delimiter delimiter between values
* @param typeClass the primitive type class
*/
public PrimitiveInputFormat(Path filePath, String delimiter, Class<T> typeClass);
}
/**
* CSV input format for Row objects
*/
public class RowCsvInputFormat extends CsvInputFormat<Row> {
// Specialized CSV format for Row-based data
}

Methods for writing DataSet content to external systems.
/**
* Write DataSet as text file
* @param filePath path where to write the file
* @return DataSink for execution
*/
public DataSink<T> writeAsText(String filePath);
/**
* Write DataSet as text file with write mode
* @param filePath path where to write the file
* @param writeMode OVERWRITE or NO_OVERWRITE
* @return DataSink for execution
*/
public DataSink<T> writeAsText(String filePath, WriteMode writeMode);
/**
* Write DataSet as CSV file
* @param filePath path where to write the CSV file
* @return DataSink for execution
*/
public DataSink<T> writeAsCsv(String filePath);
/**
* Write with custom text formatter
* @param filePath path where to write the file
* @param formatter custom text formatter
* @return DataSink for execution
*/
public DataSink<T> writeAsFormattedText(String filePath, TextFormatter<T> formatter);
/**
* Write using custom output format
* @param outputFormat the output format to use
* @param filePath path where to write
* @return DataSink for execution
*/
public DataSink<T> write(OutputFormat<T> outputFormat, String filePath);
/**
* Output using custom output format (no file path)
* @param outputFormat the output format to use
* @return DataSink for execution
*/
public DataSink<T> output(OutputFormat<T> outputFormat);

Operations for debugging and development.
/**
* Print DataSet content to standard output (executes immediately)
* @throws Exception if printing fails
*/
public void print() throws Exception;
/**
* Print DataSet content to standard error (executes immediately)
* @throws Exception if printing fails
*/
public void printToErr() throws Exception;
/**
* Print DataSet content with identifier to standard output
* @param sinkIdentifier identifier for the print sink
* @return DataSink for execution
*/
public DataSink<T> print(String sinkIdentifier);
/**
* Print DataSet content with identifier to standard error
* @param sinkIdentifier identifier for the print sink
* @return DataSink for execution
*/
public DataSink<T> printToErr(String sinkIdentifier);
/**
* Print DataSet content on task manager with prefix (for debugging)
* @param prefix prefix for the printed output
* @return DataSink for execution
*/
public DataSink<T> printOnTaskManager(String prefix);

Usage Examples:
// Write as text
DataSet<String> result = processedData;
result.writeAsText("/path/to/output.txt", WriteMode.OVERWRITE);
// Write as CSV
DataSet<Tuple3<String, Integer, Double>> data = getData();
data.writeAsCsv("/path/to/output.csv");
// Print for debugging
result.print();
// Custom formatter
result.writeAsFormattedText("/path/to/formatted.txt", new TextFormatter<String>() {
@Override
public String format(String record) {
return "Record: " + record;
}
});

Built-in output formats for writing data in various formats.
/**
* Output format for writing text files
*/
public class TextOutputFormat<T> extends FileOutputFormat<T> {
/**
* Create text output format
* @param outputPath path to write the output
*/
public TextOutputFormat(Path outputPath);
}
/**
* Output format for writing CSV files
*/
public class CsvOutputFormat<T> extends FileOutputFormat<T> {
/**
* Create CSV output format
* @param outputPath path to write the CSV
*/
public CsvOutputFormat(Path outputPath);
/**
* Set field delimiter
* @param fieldDelimiter delimiter between fields
*/
public void setFieldDelimiter(String fieldDelimiter);
/**
* Set record delimiter
* @param recordDelimiter delimiter between records
*/
public void setRecordDelimiter(String recordDelimiter);
}
/**
* Output format for printing to stdout/stderr
*/
public class PrintingOutputFormat<T> implements OutputFormat<T> {
/**
* Create printing output format
* @param targetStream target stream (System.out or System.err)
* @param sinkIdentifier identifier for the sink
*/
public PrintingOutputFormat(PrintStream targetStream, String sinkIdentifier);
}
/**
* Output format for collecting to local collection
*/
public class LocalCollectionOutputFormat<T> implements OutputFormat<T> {
/**
* Create local collection output format
* @param out the collection to write to
*/
public LocalCollectionOutputFormat(List<T> out);
}
/**
* Output format that discards all records (for testing)
*/
public class DiscardingOutputFormat<T> implements OutputFormat<T> {
// Discards all records - useful for performance testing
}

Write mode options for file operations.
/**
* Write mode for file operations
*/
public enum WriteMode {
/** Overwrite existing files */
OVERWRITE,
/** Fail if file already exists */
NO_OVERWRITE
}

Configure data properties for input splits.
/**
* Properties for data splits
*/
public class SplitDataProperties<T> {
/**
* Specify that data is partitioned by given fields
* @param fields the fields by which data is partitioned
* @return configured properties
*/
public SplitDataProperties<T> splitsPartitionedBy(int... fields);
/**
* Specify grouping properties
* @param fields the fields by which data is grouped
* @return configured properties
*/
public SplitDataProperties<T> splitsGroupedBy(int... fields);
}

import org.apache.flink.api.java.io.*;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Row;
import java.util.Collection;
import java.util.List;

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-java