Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.
File sources provide unified reading capabilities for files in both batch and streaming modes, with support for various formats, continuous monitoring, and distributed processing.
Main entry point for creating file sources that can read from distributed file systems.
/**
* A unified data source that reads files - both in batch and in streaming mode.
* Supports all (distributed) file systems and object stores that can be accessed via
* the Flink's FileSystem class.
*/
@PublicEvolving
public final class FileSource<T> extends AbstractFileSource<T, FileSourceSplit>
implements DynamicParallelismInference {
/**
* Builds a new FileSource using a StreamFormat to read record-by-record from a
* file stream. When possible, stream-based formats are preferable to file-based
* formats, because they support better default behavior around I/O batching and
* progress tracking (checkpoints).
*/
public static <T> FileSourceBuilder<T> forRecordStreamFormat(
final StreamFormat<T> streamFormat, final Path... paths);
/**
* Builds a new FileSource using a BulkFormat to read batches of records from
* files. Examples for bulk readers are compressed and vectorized formats such as
* ORC or Parquet.
*/
public static <T> FileSourceBuilder<T> forBulkFileFormat(
final BulkFormat<T, FileSourceSplit> bulkFormat, final Path... paths);
/**
* The default split assigner, a lazy locality-aware assigner.
*/
public static final FileSplitAssigner.Provider DEFAULT_SPLIT_ASSIGNER;
/**
* The default file enumerator used for splittable formats.
*/
public static final FileEnumerator.Provider DEFAULT_SPLITTABLE_FILE_ENUMERATOR;
/**
* The default file enumerator used for non-splittable formats.
*/
public static final FileEnumerator.Provider DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR;
}

Builder pattern for configuring FileSource instances with various options.
/**
* The builder for the FileSource, to configure the various behaviors.
*/
public static final class FileSourceBuilder<T>
extends AbstractFileSourceBuilder<T, FileSourceSplit, FileSourceBuilder<T>> {
/**
* Sets this source to streaming ("continuous monitoring") mode.
* This makes the source a continuously running streaming source that monitors
* the input paths for new files and reads them as they are discovered.
*/
public FileSourceBuilder<T> monitorContinuously(Duration discoveryInterval);
/**
* Sets this source to bounded (batch) mode.
* In this mode, the source processes the files that are under the given paths when the
* application is started. Once all files are processed, the source will finish.
*/
public FileSourceBuilder<T> processStaticFileSet();
/**
* Configures the FileEnumerator for the source. The File Enumerator is responsible
* for selecting from the input path the set of files that should be processed (and which to
* filter out). Furthermore, the File Enumerator may split the files further into
* sub-regions, to enable parallelization beyond the number of files.
*/
public FileSourceBuilder<T> setFileEnumerator(FileEnumerator.Provider fileEnumerator);
/**
* Configures the FileSplitAssigner for the source. The File Split Assigner
* determines which parallel reader instance gets which FileSourceSplit, and in
* which order these splits are assigned.
*/
public FileSourceBuilder<T> setSplitAssigner(FileSplitAssigner.Provider splitAssigner);
/**
* Creates the file source with the settings applied to this builder.
*/
public FileSource<T> build();
}

Usage Examples:
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
import org.apache.flink.core.fs.Path;
import java.time.Duration;
// Basic file source for text files
FileSource<String> basicSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/input"))
.build();
// Streaming source with continuous monitoring
FileSource<String> streamingSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/stream"))
.monitorContinuously(Duration.ofSeconds(10))
.build();
// Source with custom split assignment for locality
FileSource<String> localityAwareSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/distributed"))
.setSplitAssigner(LocalityAwareSplitAssigner::new)
.build();
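For comparison, the default providers declared on FileSource can also be wired explicitly. This sketch assumes the default implementations are LocalityAwareSplitAssigner and (for splittable formats) BlockSplittingRecursiveEnumerator from the connector's assigners and enumerate packages, passed as constructor method references satisfying the Provider functional interfaces:

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Mirrors the declared defaults: lazy locality-aware split assignment plus
// block-aligned splitting of individual files for extra parallelism
FileSource<String> explicitDefaults = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/input"))
        .setFileEnumerator(BlockSplittingRecursiveEnumerator::new)
        .setSplitAssigner(LocalityAwareSplitAssigner::new)
        .build();
```

Passing the defaults explicitly is redundant in practice, but it shows where a custom Provider would plug in.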
// Use with DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream = env.fromSource(
streamingSource,
WatermarkStrategy.noWatermarks(),
"file-source"
);

Base class for file sources, providing common functionality and structure.
/**
* The base class for File Sources. The main implementation to use is the FileSource, which
* also has the majority of the documentation.
*
* To read new formats, one commonly does NOT need to extend this class, but should implement a
* new Format Reader (like StreamFormat, BulkFormat) and use it with the FileSource.
*
* The only reason to extend this class is when a source needs a different type of split,
* meaning an extension of the FileSourceSplit to carry additional information.
*/
@PublicEvolving
public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
implements Source<T, SplitT, PendingSplitsCheckpoint<SplitT>>, ResultTypeQueryable<T> {
protected AbstractFileSource(
final Path[] inputPaths,
final FileEnumerator.Provider fileEnumerator,
final FileSplitAssigner.Provider splitAssigner,
final BulkFormat<T, SplitT> readerFormat,
@Nullable final ContinuousEnumerationSettings continuousEnumerationSettings);
/**
* Gets the enumerator factory for this source.
*/
protected FileEnumerator.Provider getEnumeratorFactory();
/**
* Gets the assigner factory for this source.
*/
public FileSplitAssigner.Provider getAssignerFactory();
/**
* Gets the continuous enumeration settings, or null if this source is bounded.
*/
@Nullable
public ContinuousEnumerationSettings getContinuousEnumerationSettings();
/**
* Gets the boundedness of this source - bounded for batch mode, continuous unbounded for streaming.
*/
@Override
public Boundedness getBoundedness();
/**
* Creates a new source reader for reading the file splits.
*/
@Override
public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);
/**
* Creates a new split enumerator for discovering and assigning file splits.
*/
@Override
public SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createEnumerator(
SplitEnumeratorContext<SplitT> enumContext);
}

Integration points for using file sources in Flink applications.
/**
* Creates a DataStream from a file source
* @param source The file source to read from
* @param watermarkStrategy Watermark strategy for event time processing
* @param sourceName Name for the source operator
* @return DataStream containing the source data
*/
public <T> DataStreamSource<T> fromSource(
Source<T, ?, ?> source,
WatermarkStrategy<T> watermarkStrategy,
String sourceName);

Advanced Usage Examples:
// Reading multiple file paths
FileSource<String> multiPathSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(),
new Path("/data/logs/2023"),
new Path("/data/logs/2024"))
.build();
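As the AbstractFileSource notes suggest, new formats are written as format readers rather than source subclasses. A minimal custom format can be sketched by extending SimpleStreamFormat, the convenience base class in the connector's reader package for non-splittable, record-by-record formats. NonEmptyLineFormat is a hypothetical example; verify the exact signatures against your Flink version:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.FSDataInputStream;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Sketch of a custom format: emits only the non-empty lines of each file.
// SimpleStreamFormat reads whole files (non-splittable), which keeps the
// reader logic down to two methods.
public final class NonEmptyLineFormat extends SimpleStreamFormat<String> {

    @Override
    public StreamFormat.Reader<String> createReader(
            Configuration config, FSDataInputStream stream) throws IOException {
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
        return new StreamFormat.Reader<String>() {
            @Override
            public String read() throws IOException {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (!line.isEmpty()) {
                        return line; // next record
                    }
                }
                return null; // null signals end of input
            }

            @Override
            public void close() throws IOException {
                reader.close();
            }
        };
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```

The format then plugs into the builder like any built-in one: FileSource.forRecordStreamFormat(new NonEmptyLineFormat(), new Path("/data")).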
// Custom file enumeration: split files into block-sized sub-regions for extra parallelism
FileSource<String> filteredSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
.setFileEnumerator(() -> new BlockSplittingRecursiveEnumerator())
.build();
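Filtering which files are read can likewise be sketched through the enumerator. This assumes a NonSplittingRecursiveEnumerator constructor overload that accepts a path predicate; both the overload and the underscore convention below are assumptions to check against your Flink version:

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Skip files whose name starts with "_" (e.g. _SUCCESS markers)
FileSource<String> visibleFilesOnly = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
        .setFileEnumerator(() -> new NonSplittingRecursiveEnumerator(
                path -> !path.getName().startsWith("_")))
        .build();
```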
// Bulk format reading for Parquet files (example interface)
BulkFormat<RowData, FileSourceSplit> parquetFormat = /* implementation */;
FileSource<RowData> parquetSource = FileSource
.forBulkFileFormat(parquetFormat, new Path("/data/parquet"))
.build();

File sources handle various error conditions during reading:
try {
FileSource<String> source = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/invalid/path"))
.build();
} catch (IllegalArgumentException e) {
// Handle invalid configuration (e.g. null or empty paths).
// Note: a nonexistent path typically surfaces later, during split
// enumeration at job start, rather than at build() time.
}

LocalityAwareSplitAssigner for HDFS and other distributed file systems

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-files