Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.
—
File enumeration provides file discovery mechanisms for finding files and creating splits across distributed storage systems with support for filtering and recursive directory traversal.
Core interface for discovering files and creating processing splits.
/**
 * Discovers files under a set of paths and turns them into splits that can be
 * processed in parallel.
 */
public interface FileEnumerator {

    /**
     * Enumerates the files under the given paths and creates splits for them.
     *
     * @param paths paths to enumerate files from
     * @param minDesiredSplits minimum number of splits to create for parallel processing
     * @return the {@link FileSourceSplit} instances to process
     * @throws IOException if file enumeration fails
     */
    Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException;
}

Factory interface for creating FileEnumerator instances with serialization support.
/**
 * Factory for {@link FileEnumerator} instances.
 *
 * <p>The interface is {@link Serializable} and has a single abstract method, so it can be
 * supplied as a lambda or constructor reference (for example
 * {@code BlockSplittingRecursiveEnumerator::new}). The custom-enumerator example in this
 * document refers to it by the qualified name {@code FileEnumerator.Provider}.
 */
@FunctionalInterface
public interface Provider extends Serializable {

    /**
     * Creates a new {@link FileEnumerator} instance.
     *
     * @return a FileEnumerator implementation
     */
    FileEnumerator create();
}

Default enumerator for splittable formats that respects storage block boundaries.
/**
 * File enumerator that splits files into multiple regions based on block boundaries.
 * Designed for splittable file formats and distributed file systems.
 */
public class BlockSplittingRecursiveEnumerator implements FileEnumerator {

    /**
     * Creates an enumerator with the default configuration.
     */
    public BlockSplittingRecursiveEnumerator();

    /**
     * Creates an enumerator with a custom file filter.
     *
     * @param fileFilter filter selecting which files to include
     */
    public BlockSplittingRecursiveEnumerator(FileSystem.FileStatusFilter fileFilter);

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException;

    /**
     * Provider instance for use with the FileSource builder.
     *
     * <p>{@code Provider.create()} takes no arguments, so this constructor reference binds
     * to the no-argument constructor.
     */
    public static final Provider PROVIDER = BlockSplittingRecursiveEnumerator::new;
}

Enumerator for non-splittable formats that creates one split per file.
/**
 * File enumerator that creates one split per file without splitting.
 * Designed for non-splittable file formats.
 */
public class NonSplittingRecursiveEnumerator implements FileEnumerator {

    /**
     * Creates an enumerator with the default configuration.
     */
    public NonSplittingRecursiveEnumerator();

    /**
     * Creates an enumerator with a custom file filter.
     *
     * @param fileFilter filter selecting which files to include
     */
    public NonSplittingRecursiveEnumerator(FileSystem.FileStatusFilter fileFilter);

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException;

    /**
     * Provider instance for use with the FileSource builder.
     *
     * <p>{@code Provider.create()} takes no arguments, so this constructor reference binds
     * to the no-argument constructor.
     */
    public static final Provider PROVIDER = NonSplittingRecursiveEnumerator::new;
}

Specialized enumerators that include all directories, including empty ones.
/**
* Block-splitting enumerator that includes all directories
* Extends BlockSplittingRecursiveEnumerator and exposes the same two constructor shapes.
* No PROVIDER constant is listed for this class; the usage examples pass it to the
* builder through a lambda that calls the no-argument constructor.
*/
public class BlockSplittingRecursiveAllDirEnumerator extends BlockSplittingRecursiveEnumerator {
/**
* Creates the enumerator without an explicit file filter
*/
public BlockSplittingRecursiveAllDirEnumerator();
/**
* Creates the enumerator with a caller-supplied file filter
* @param fileFilter Filter supplied by the caller; presumably selects which files to
*        include, as documented for the parent class constructor (confirm)
*/
public BlockSplittingRecursiveAllDirEnumerator(FileSystem.FileStatusFilter fileFilter);
}
/**
 * Non-splitting enumerator that includes all directories.
 * Extends {@code NonSplittingRecursiveEnumerator} and exposes the same two constructor shapes.
 */
public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {

    /**
     * Creates the enumerator without an explicit file filter.
     */
    public NonSplittingRecursiveAllDirEnumerator();

    /**
     * Creates the enumerator with a caller-supplied file filter.
     *
     * @param fileFilter filter supplied by the caller; presumably selects which files to
     *     include, as documented for the parent class constructor (confirm)
     */
    public NonSplittingRecursiveAllDirEnumerator(FileSystem.FileStatusFilter fileFilter);
}

Usage Examples:
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.enumerate.*;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
// Using default block-splitting enumerator (recommended for splittable formats)
FileSource<String> splittableSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
        .setFileEnumerator(BlockSplittingRecursiveEnumerator.PROVIDER)
        .build();

// Using non-splitting enumerator for formats that cannot be split.
// CustomBinaryFormat is a placeholder for a user-defined format; it is not defined in this document.
FileSource<String> nonSplittableSource = FileSource
        .forRecordStreamFormat(new CustomBinaryFormat(), new Path("/data"))
        .setFileEnumerator(NonSplittingRecursiveEnumerator.PROVIDER)
        .build();

// Using all-directory enumerator to include empty directories.
// No PROVIDER constant is listed for this class, so the provider is written as a lambda.
FileSource<String> allDirSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
        .setFileEnumerator(() -> new BlockSplittingRecursiveAllDirEnumerator())
        .build();

Built-in and custom file filtering capabilities for selective file processing.
/**
* Default file filter that excludes hidden files and directories
* Implements FileSystem.FileStatusFilter, the filter type accepted by the enumerator
* constructors that take a fileFilter argument.
*/
public class DefaultFileFilter implements FileSystem.FileStatusFilter {
/**
* Singleton instance of the default filter
* Shared constant; use it wherever a default filter is needed.
*/
public static final DefaultFileFilter INSTANCE = new DefaultFileFilter();
/**
* Accepts non-hidden files and directories
* @param fileStatus File status to evaluate
* @return true if file should be included
*/
@Override
public boolean accept(FileStatus fileStatus);
}
/**
 * File filter using regular expressions for name matching.
 */
public class RegexFileFilter implements FileSystem.FileStatusFilter {

    /**
     * Creates a filter with a regex pattern.
     *
     * <p>NOTE(review): this listing does not state whether the pattern must match the whole
     * file name or only part of it; the example in this document passes a pattern that
     * covers the whole name ({@code ".*\\.log$"}).
     *
     * @param pattern regular expression pattern for file names
     */
    public RegexFileFilter(String pattern);

    /**
     * Accepts files whose names match the regex pattern.
     *
     * @param fileStatus file status to evaluate
     * @return true if the file name matches the pattern
     */
    @Override
    public boolean accept(FileStatus fileStatus);
}

Support for continuous file discovery in streaming scenarios.
/**
 * Enumerator interface for dynamic file discovery.
 * Adds incremental enumeration on top of {@link FileEnumerator}.
 */
public interface DynamicFileEnumerator extends FileEnumerator {

    /**
     * Enumerates new files that have appeared since the last enumeration.
     *
     * @param paths paths to check for new files
     * @param alreadyProcessedPaths set of paths already processed
     * @return the new splits to process
     * @throws IOException if enumeration fails
     */
    Collection<FileSourceSplit> enumerateNewSplits(
            Path[] paths, Set<Path> alreadyProcessedPaths) throws IOException;
}

Advanced Usage Examples:
// Custom file filter for specific file extensions, used by an enumerator with a custom filter.
// The provider passed to setFileEnumerator is Serializable, so the filter is created inside
// the provider instead of being captured from an outer variable.
// NOTE(review): whether FileSystem.FileStatusFilter is itself serializable is not shown here.
FileSource<String> filteredSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
        .setFileEnumerator(() -> new BlockSplittingRecursiveEnumerator(
                fileStatus -> {
                    String name = fileStatus.getPath().getName();
                    return !name.startsWith(".") && name.endsWith(".csv");
                }))
        .build();

// Regex-based file filtering (filter again created inside the provider)
FileSource<String> logSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/logs"))
        .setFileEnumerator(
                () -> new NonSplittingRecursiveEnumerator(new RegexFileFilter(".*\\.log$")))
        .build();

// Multiple path enumeration with different strategies
Path[] dataPaths = {
    new Path("/data/current"),
    new Path("/data/archive"),
    new Path("/data/backup")
};
FileSource<String> multiPathSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), dataPaths)
        .setFileEnumerator(() -> new BlockSplittingRecursiveAllDirEnumerator())
        .build();

Example of implementing a custom file enumerator with specific logic.
/**
* Example custom enumerator that prioritizes newer files
*/
public class TimestampBasedEnumerator implements FileEnumerator {
private final long maxFileAge;
private final FileSystem.FileStatusFilter filter;
public TimestampBasedEnumerator(long maxFileAgeMillis) {
this.maxFileAge = maxFileAgeMillis;
this.filter = new DefaultFileFilter();
}
@Override
public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
throws IOException {
long currentTime = System.currentTimeMillis();
List<FileSourceSplit> splits = new ArrayList<>();
for (Path path : paths) {
FileSystem fs = path.getFileSystem();
FileStatus[] statuses = fs.listStatus(path, filter);
// Sort by modification time (newest first)
Arrays.sort(statuses, (a, b) -> Long.compare(b.getModificationTime(), a.getModificationTime()));
for (FileStatus status : statuses) {
if (currentTime - status.getModificationTime() <= maxFileAge) {
FileSourceSplit split = new FileSourceSplit(
status.getPath().toString(),
status.getPath(),
0,
status.getLen(),
status.getModificationTime(),
status.getLen()
);
splits.add(split);
}
}
}
return splits;
}
public static class Provider implements FileEnumerator.Provider {
private final long maxFileAge;
public Provider(long maxFileAgeMillis) {
this.maxFileAge = maxFileAgeMillis;
}
@Override
public FileEnumerator create() {
return new TimestampBasedEnumerator(maxFileAge);
}
}
}File enumerators handle various error conditions during file discovery:
try {
    FileEnumerator enumerator = new BlockSplittingRecursiveEnumerator();
    Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
            new Path[] {new Path("/protected/path")}, 4);
} catch (IOException e) {
    // Handle file system errors
} catch (SecurityException e) {
    // Handle permission errors
}

- Use BlockSplittingRecursiveEnumerator for large files on distributed file systems
- Use NonSplittingRecursiveEnumerator for small files or non-splittable formats
- Tune minDesiredSplits values to balance parallelism and overhead

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-files