Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.
—
Stream formats provide record-by-record reading interfaces with automatic compression support for various file formats.
Core interface for implementing record-wise file reading with compression support.
/**
 * A reader format that reads individual records from a stream.
 *
 * <p>The outer class {@code StreamFormat} acts mainly as a configuration holder and factory for
 * the reader. The actual reading is done by the {@link Reader}, which is created based on an
 * input stream in the {@link #createReader} method and restored (from checkpointed positions) in
 * the {@link #restoreReader} method.
 *
 * <p>Compared to the {@code BulkFormat}, the stream format handles a few things out-of-the-box,
 * like deciding how to batch records or dealing with compression.
 *
 * @param <T> the type of records produced by this format
 */
@PublicEvolving
public interface StreamFormat<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Creates a new reader to read in this format. This method is called when a fresh reader is
     * created for a split that was assigned from the enumerator. This method may also be called
     * on recovery from a checkpoint, if the reader never stored an offset in the checkpoint.
     *
     * <p>If the format is splittable, then the stream is positioned to the beginning of the file
     * split, otherwise it will be at position zero.
     *
     * @param config the configuration to create the reader with
     * @param stream the input stream, positioned as described above
     * @param fileLen the length of the entire file, in bytes
     * @param splitEnd the offset of the first byte after the split end boundary (exclusive end
     *     boundary); for non-splittable formats this is identical to {@code fileLen}
     * @return a new reader for the split
     * @throws IOException if the reader cannot be created
     */
    Reader<T> createReader(
            Configuration config, FSDataInputStream stream, long fileLen, long splitEnd)
            throws IOException;

    /**
     * Restores a reader from a checkpointed position. This method is called when the reader is
     * recovered from a checkpoint and the reader has previously stored an offset into the
     * checkpoint, by returning from {@link Reader#getCheckpointedPosition()} a value with a
     * non-negative offset. That value is supplied as the {@code restoredOffset}.
     *
     * <p>If the format is splittable, then the stream is positioned to the beginning of the file
     * split, otherwise it will be at position zero. The stream is NOT positioned to the
     * checkpointed offset, because the format is free to interpret this offset in a different
     * way than the byte offset in the file.
     *
     * @param config the configuration to create the reader with
     * @param stream the input stream, positioned at the split start (NOT the restored offset)
     * @param restoredOffset the offset that the reader previously stored in the checkpoint
     * @param fileLen the length of the entire file, in bytes
     * @param splitEnd the offset of the first byte after the split end boundary (exclusive end
     *     boundary)
     * @return a reader restored to the checkpointed position
     * @throws IOException if the reader cannot be restored
     */
    Reader<T> restoreReader(
            Configuration config,
            FSDataInputStream stream,
            long restoredOffset,
            long fileLen,
            long splitEnd)
            throws IOException;

    /**
     * Checks whether this format is splittable. Splittable formats allow Flink to create
     * multiple splits per file, so that Flink can read multiple regions of the file
     * concurrently.
     *
     * @return {@code true} if multiple splits may be created per file, {@code false} otherwise
     */
    boolean isSplittable();

    /**
     * Gets the type produced by this format. This type will be the type produced by the file
     * source as a whole.
     *
     * @return the type information of the produced records
     */
    @Override
    TypeInformation<T> getProducedType();

    /**
     * The config option to define how many bytes to be read by the I/O thread in one fetch
     * operation.
     */
    ConfigOption<MemorySize> FETCH_IO_SIZE =
            ConfigOptions.key("source.file.stream.io-fetch-size")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(1L))
                    .withDescription(
                            "The approximate of bytes per fetch that is passed from the I/O thread to file reader.");
}

Nested interface for reading individual records from a stream.
/**
 * The actual reader that reads the records.
 */
@PublicEvolving
public interface Reader<T> extends Closeable {

    /**
     * Reads the next record.
     *
     * @return the next record, or {@code null} when the input has reached its end
     * @throws IOException if reading fails
     */
    @Nullable
    T read() throws IOException;

    /**
     * Closes the reader to release all resources.
     *
     * @throws IOException if closing fails
     */
    @Override
    void close() throws IOException;

    /**
     * Optionally returns the current position of the reader. This can be implemented by readers
     * that want to speed up recovery from a checkpoint.
     *
     * <p>The current position of the reader is the position of the next record that will be
     * returned in a call to {@code read()}.
     *
     * @return the checkpointed position, or {@code null} if this reader does not support
     *     position checkpointing (the default)
     */
    @Nullable
    default CheckpointedPosition getCheckpointedPosition() {
        return null;
    }
}

Abstract base class for non-splittable stream formats.
/**
 * Simplified stream format for non-splittable files.
 *
 * <p>Subclasses implement only the two-argument {@link #createReader(Configuration,
 * FSDataInputStream)} and always read the whole file as a single split.
 *
 * <p>NOTE(review): the split-aware {@code createReader}/{@code restoreReader} bridge
 * implementations from {@code StreamFormat} are not shown here — presumably this class provides
 * them and delegates to the simplified method; confirm against the actual source.
 */
public abstract class SimpleStreamFormat<T> implements StreamFormat<T> {

    /**
     * Creates a reader for the entire file stream (simplified interface).
     *
     * @param config configuration for the reader
     * @param stream input stream to read from, positioned at the start of the file
     * @return reader instance for reading records
     * @throws IOException if reader creation fails
     */
    public abstract Reader<T> createReader(Configuration config, FSDataInputStream stream)
            throws IOException;

    /**
     * Always returns {@code false}: simple formats read each file as a single split and are
     * therefore not splittable. Declared {@code final} so subclasses cannot break that
     * invariant.
     *
     * @return {@code false}
     */
    @Override
    public final boolean isSplittable() {
        return false;
    }
}

Built-in implementation for reading text files line by line.
/**
 * Stream format for reading text files line by line with charset support.
 *
 * <p>Method bodies are omitted here (documentation stubs); only {@code getProducedType()} is
 * shown in full.
 */
public class TextLineInputFormat extends SimpleStreamFormat<String> {

    /**
     * Creates a TextLineInputFormat with UTF-8 encoding.
     */
    public TextLineInputFormat();

    /**
     * Creates a TextLineInputFormat with the specified charset.
     *
     * <p>NOTE(review): whether an invalid charset name is rejected here or later at reader
     * creation is not shown — confirm against the actual source.
     *
     * @param charsetName name of the charset to use for decoding
     */
    public TextLineInputFormat(String charsetName);

    /**
     * Creates a reader for reading text lines.
     *
     * @param config configuration for the reader
     * @param stream input stream to read from
     * @return reader that returns String lines
     * @throws IOException if reader creation fails
     */
    @Override
    public Reader<String> createReader(Configuration config, FSDataInputStream stream)
            throws IOException;

    /**
     * Returns the type information for the String records produced by this format.
     *
     * @return TypeInformation describing the String type
     */
    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

Usage Examples:
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.Path;
// Reading text files with UTF-8 encoding
FileSource<String> textSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/logs"))
.build();
// Reading text files with custom encoding
FileSource<String> customEncodingSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat("ISO-8859-1"), new Path("/data/legacy"))
.build();
// Using the source in a Flink job
DataStream<String> lines = env.fromSource(textSource, WatermarkStrategy.noWatermarks(), "text-source");

Example of implementing a custom stream format.
/**
 * Example custom stream format for reading CSV records.
 *
 * <p>Each record is one line of the file, split into fields on the configured delimiter.
 */
public class CsvStreamFormat implements StreamFormat<String[]> {

    private final String delimiter;

    /**
     * Creates a CSV format.
     *
     * @param delimiter the field delimiter (interpreted as a regex by {@code String.split})
     */
    public CsvStreamFormat(String delimiter) {
        this.delimiter = delimiter;
    }

    @Override
    public Reader<String[]> createReader(
            Configuration config,
            FSDataInputStream stream,
            long fileLen,
            long splitEnd) throws IOException {
        return new CsvReader(stream, splitEnd, delimiter);
    }

    /**
     * Restores a reader by seeking to the checkpointed offset.
     *
     * <p>BUGFIX: the parameter order now matches {@code StreamFormat#restoreReader}, which
     * supplies the restored offset as the THIRD argument, before {@code fileLen} and
     * {@code splitEnd}. The previous order (offset last) meant the framework-supplied restored
     * offset landed in {@code fileLen} and the reader seeked to the split end instead.
     */
    @Override
    public Reader<String[]> restoreReader(
            Configuration config,
            FSDataInputStream stream,
            long restoredOffset,
            long fileLen,
            long splitEnd) throws IOException {
        stream.seek(restoredOffset);
        return new CsvReader(stream, splitEnd, delimiter);
    }

    @Override
    public boolean isSplittable() {
        return true; // CSV can be split at line boundaries
    }

    @Override
    public TypeInformation<String[]> getProducedType() {
        return Types.OBJECT_ARRAY(Types.STRING);
    }

    /** Reads lines from the stream and splits each one into its fields. */
    private static class CsvReader implements StreamFormat.Reader<String[]> {

        private final BufferedReader reader;
        private final long splitEnd;
        private final String delimiter;
        // Bytes consumed so far, counted from where reading started.
        // NOTE(review): splitEnd is an absolute file offset, so comparing it against this
        // relative count is only correct when the split starts at offset zero — a production
        // format should track the absolute stream position instead. Confirm intended semantics.
        private long bytesRead = 0;

        public CsvReader(FSDataInputStream stream, long splitEnd, String delimiter) {
            // Use an explicit charset: the platform default varies across JVMs/OSes.
            this.reader = new BufferedReader(
                    new InputStreamReader(stream, java.nio.charset.StandardCharsets.UTF_8));
            this.splitEnd = splitEnd;
            this.delimiter = delimiter;
        }

        @Override
        public String[] read() throws IOException {
            if (bytesRead >= splitEnd) {
                return null;
            }
            String line = reader.readLine();
            if (line == null) {
                return null;
            }
            // +1 for the newline; note this undercounts "\r\n" line endings by one byte.
            bytesRead += line.getBytes(java.nio.charset.StandardCharsets.UTF_8).length + 1;
            return line.split(delimiter);
        }

        @Override
        public void close() throws IOException {
            // BUGFIX: close() was missing entirely — the class did not satisfy the abstract
            // Closeable contract and the underlying stream was never released.
            reader.close();
        }
    }
}

Stream formats automatically support compression through the compression detection system.
/**
* Stream formats automatically detect and handle compressed files
* Supported extensions: .gz, .gzip, .bz2, .xz, .deflate
*/
// Reading compressed text files - compression is handled automatically
FileSource<String> compressedSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(),
new Path("/data/logs.gz"),
new Path("/data/archive.bz2"))
.build();
// Custom format with compression support
FileSource<String[]> compressedCsvSource = FileSource
.forRecordStreamFormat(new CsvStreamFormat(","), new Path("/data/data.csv.gz"))
.build();

Stream formats handle various error conditions during reading:
try {
StreamFormat<String> format = new TextLineInputFormat("INVALID-CHARSET");
} catch (UnsupportedEncodingException e) {
// Handle invalid charset
}
// Reader error handling
StreamFormat.Reader<String> reader = format.createReader(config, stream, fileLen, splitEnd);
try {
String record;
while ((record = reader.read()) != null) {
// Process record
}
} catch (IOException e) {
// Handle read errors
}

Implement isSplittable() correctly — splittable formats can be processed in parallel.

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-connector-files