# File Enumeration

File enumeration discovers files on distributed storage systems and turns them into splits for parallel processing, with support for filtering and recursive directory traversal.

## Capabilities

### FileEnumerator Interface

Core interface for discovering files and creating processing splits.

```java { .api }
/**
 * Interface for discovering files and creating splits for parallel processing
 */
public interface FileEnumerator {
    /**
     * Enumerates files from given paths and creates splits for processing
     * @param paths Array of paths to enumerate files from
     * @param minDesiredSplits Minimum number of splits to create for parallel processing
     * @return Collection of FileSourceSplit instances for processing
     * @throws IOException If file enumeration fails
     */
    Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
        throws IOException;
}
```

### FileEnumerator.Provider Interface

Factory interface for creating `FileEnumerator` instances with serialization support.

```java { .api }
/**
 * Factory interface for creating FileEnumerator instances
 */
public interface Provider extends Serializable {
    /**
     * Creates a new FileEnumerator instance
     * @return FileEnumerator implementation
     */
    FileEnumerator create();
}
```
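
Because the provider extends `Serializable`, a constructor reference or lambda assigned to it can itself be serialized, presumably so that the factory can be shipped along with the rest of the source configuration. The following sketch illustrates that mechanism with plain Java only. `DemoEnumerator`, `DemoProvider`, and `roundTrip` are hypothetical stand-ins, not Flink types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Illustrative only: a serializable factory modeled with hypothetical stand-in types. */
final class SerializableProviderSketch {

    /** Hypothetical stand-in for an enumerator; it is not serializable itself. */
    static final class DemoEnumerator {
        String describe() {
            return "demo-enumerator";
        }
    }

    /** Mirrors the shape of a provider: a serializable factory with one create() method. */
    interface DemoProvider extends Serializable {
        DemoEnumerator create();
    }

    /** Serializes the provider to bytes and reads it back again. */
    static DemoProvider roundTrip(DemoProvider provider) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(provider);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoProvider) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The constructor reference is serializable because DemoProvider extends Serializable.
        DemoProvider provider = DemoEnumerator::new;
        System.out.println(roundTrip(provider).create().describe());
    }
}
```

Only the factory travels in serialized form in this sketch. The enumerator is created afterwards, so it does not need to be serializable.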

### BlockSplittingRecursiveEnumerator

Default enumerator for splittable formats that respects storage block boundaries.

```java { .api }
/**
 * File enumerator that splits files into multiple regions based on block boundaries
 * Designed for splittable file formats and distributed file systems
 */
public class BlockSplittingRecursiveEnumerator implements FileEnumerator {
    /**
     * Creates enumerator with default configuration
     */
    public BlockSplittingRecursiveEnumerator();

    /**
     * Creates enumerator with custom file filter
     * @param fileFilter Filter for selecting which files to include
     */
    public BlockSplittingRecursiveEnumerator(FileSystem.FileStatusFilter fileFilter);

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
        throws IOException;

    /**
     * Provider instance for use with FileSource builder
     */
    public static final Provider PROVIDER = BlockSplittingRecursiveEnumerator::new;
}
```
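
The idea of cutting a file along block boundaries can be shown without any Flink dependency. The sketch below is not the enumerator's actual implementation. It assumes a single fixed block size, and `splitByBlocks` is a hypothetical helper that maps a file length to `{offset, length}` regions.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: models block-aligned splitting with plain Java types. */
final class BlockSplitSketch {

    /**
     * Cuts a file of fileLength bytes into regions of at most blockSize bytes.
     * Each region is returned as a two-element array: {offset, length}.
     */
    static List<long[]> splitByBlocks(long fileLength, long blockSize) {
        if (fileLength < 0 || blockSize <= 0) {
            throw new IllegalArgumentException("fileLength must be >= 0 and blockSize > 0");
        }
        List<long[]> regions = new ArrayList<>();
        for (long offset = 0; offset < fileLength; offset += blockSize) {
            // The last region is shorter when the length is not a multiple of the block size.
            long length = Math.min(blockSize, fileLength - offset);
            regions.add(new long[] {offset, length});
        }
        return regions;
    }

    public static void main(String[] args) {
        // A 300-byte file with 128-byte blocks yields regions at offsets 0, 128 and 256.
        for (long[] region : splitByBlocks(300, 128)) {
            System.out.println("offset=" + region[0] + " length=" + region[1]);
        }
    }
}
```

A real file system may report blocks of varying size and location, so this only conveys the arithmetic behind the offset and length of each split.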

### NonSplittingRecursiveEnumerator

Enumerator for non-splittable formats that creates one split per file.

```java { .api }
/**
 * File enumerator that creates one split per file without splitting
 * Designed for non-splittable file formats
 */
public class NonSplittingRecursiveEnumerator implements FileEnumerator {
    /**
     * Creates enumerator with default configuration
     */
    public NonSplittingRecursiveEnumerator();

    /**
     * Creates enumerator with custom file filter
     * @param fileFilter Filter for selecting which files to include
     */
    public NonSplittingRecursiveEnumerator(FileSystem.FileStatusFilter fileFilter);

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
        throws IOException;

    /**
     * Provider instance for use with FileSource builder
     */
    public static final Provider PROVIDER = NonSplittingRecursiveEnumerator::new;
}
```

### All-Directory Enumerators

Specialized enumerators that include all directories, including empty ones.

```java { .api }
/**
 * Block-splitting enumerator that includes all directories
 */
public class BlockSplittingRecursiveAllDirEnumerator extends BlockSplittingRecursiveEnumerator {
    public BlockSplittingRecursiveAllDirEnumerator();
    public BlockSplittingRecursiveAllDirEnumerator(FileSystem.FileStatusFilter fileFilter);
}

/**
 * Non-splitting enumerator that includes all directories
 */
public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
    public NonSplittingRecursiveAllDirEnumerator();
    public NonSplittingRecursiveAllDirEnumerator(FileSystem.FileStatusFilter fileFilter);
}
```

**Usage Examples:**

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.enumerate.*;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Using default block-splitting enumerator (recommended for splittable formats)
FileSource<String> splittableSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
    .setFileEnumerator(BlockSplittingRecursiveEnumerator.PROVIDER)
    .build();

// Using non-splitting enumerator for formats that cannot be split
// (CustomBinaryFormat is a placeholder for a user-defined format; it is not defined here)
FileSource<String> nonSplittableSource = FileSource
    .forRecordStreamFormat(new CustomBinaryFormat(), new Path("/data"))
    .setFileEnumerator(NonSplittingRecursiveEnumerator.PROVIDER)
    .build();

// Using all-directory enumerator to include empty directories
FileSource<String> allDirSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
    .setFileEnumerator(() -> new BlockSplittingRecursiveAllDirEnumerator())
    .build();
```

### File Filters

Built-in and custom file filtering capabilities for selective file processing.

```java { .api }
/**
 * Default file filter that excludes hidden files and directories
 */
public class DefaultFileFilter implements FileSystem.FileStatusFilter {
    /**
     * Singleton instance of the default filter
     */
    public static final DefaultFileFilter INSTANCE = new DefaultFileFilter();

    /**
     * Accepts non-hidden files and directories
     * @param fileStatus File status to evaluate
     * @return true if file should be included
     */
    @Override
    public boolean accept(FileStatus fileStatus);
}

/**
 * File filter using regular expressions for name matching
 */
public class RegexFileFilter implements FileSystem.FileStatusFilter {
    /**
     * Creates filter with regex pattern
     * @param pattern Regular expression pattern for file names
     */
    public RegexFileFilter(String pattern);

    /**
     * Accepts files whose names match the regex pattern
     * @param fileStatus File status to evaluate
     * @return true if file name matches pattern
     */
    @Override
    public boolean accept(FileStatus fileStatus);
}
```
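
The two filtering rules described above, hiding and regex matching, can be sketched on plain file-name strings. This is an illustration with standard Java predicates, not the filters' actual code. `NameFilterSketch` is hypothetical, and treating a leading `.` or `_` as "hidden" is an assumption.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

/** Illustrative only: name-based filters modeled on plain file-name strings. */
final class NameFilterSketch {

    /** Accepts names that do not look hidden (assumption: hidden means a leading '.' or '_'). */
    static final Predicate<String> NOT_HIDDEN =
        name -> !name.isEmpty() && name.charAt(0) != '.' && name.charAt(0) != '_';

    /** Accepts names that match the whole regular expression. */
    static Predicate<String> matching(String regex) {
        Pattern pattern = Pattern.compile(regex); // compiled once, reused for every name
        return name -> pattern.matcher(name).matches();
    }

    public static void main(String[] args) {
        // Combine both rules: visible files whose name ends in ".log".
        Predicate<String> visibleLogs = NOT_HIDDEN.and(matching(".*\\.log"));
        System.out.println(visibleLogs.test("app.log"));
        System.out.println(visibleLogs.test(".app.log"));
    }
}
```

Compiling the pattern once outside the predicate keeps the per-file check cheap, which matters when a filter runs against every entry of a large directory tree.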

### Dynamic File Enumeration

Support for continuous file discovery in streaming scenarios.

```java { .api }
/**
 * Enumerator interface for dynamic file discovery
 */
public interface DynamicFileEnumerator extends FileEnumerator {
    /**
     * Enumerates new files that have appeared since the last enumeration
     * @param paths Paths to check for new files
     * @param alreadyProcessedPaths Set of paths already processed
     * @return Collection of new splits for processing
     * @throws IOException If enumeration fails
     */
    Collection<FileSourceSplit> enumerateNewSplits(
        Path[] paths, Set<Path> alreadyProcessedPaths) throws IOException;
}
```
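
Discovering only new files amounts to comparing the current listing against the set of paths already processed. The sketch below shows that bookkeeping with plain strings. `discoverNew` is a hypothetical helper, and recording newly found paths in the same set is a design assumption of this example, not documented behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Illustrative only: tracks which listed paths have not been seen before. */
final class NewFileDiscoverySketch {

    /**
     * Returns the listed paths that are not yet in alreadyProcessed, in listing order,
     * and records them in alreadyProcessed so the next call skips them.
     */
    static List<String> discoverNew(List<String> currentListing, Set<String> alreadyProcessed) {
        List<String> fresh = new ArrayList<>();
        for (String path : currentListing) {
            // Set.add returns false when the path was already present.
            if (alreadyProcessed.add(path)) {
                fresh.add(path);
            }
        }
        return fresh;
    }
}
```

Calling `discoverNew` twice with the same listing returns an empty list the second time, which is the property a periodic discovery loop relies on.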

**Advanced Usage Examples:**

```java
// Custom file filter for specific file extensions
FileSystem.FileStatusFilter csvFilter = fileStatus ->
    !fileStatus.getPath().getName().startsWith(".") &&
    fileStatus.getPath().getName().endsWith(".csv");

// Enumerator with custom filter
FileSource<String> filteredSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
    .setFileEnumerator(() -> new BlockSplittingRecursiveEnumerator(csvFilter))
    .build();

// Regex-based file filtering
FileSystem.FileStatusFilter logFilter = new RegexFileFilter(".*\\.log$");
FileSource<String> logSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/logs"))
    .setFileEnumerator(() -> new NonSplittingRecursiveEnumerator(logFilter))
    .build();

// Multiple input paths handled by a single enumerator
Path[] dataPaths = {
    new Path("/data/current"),
    new Path("/data/archive"),
    new Path("/data/backup")
};

FileSource<String> multiPathSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), dataPaths)
    .setFileEnumerator(() -> new BlockSplittingRecursiveAllDirEnumerator())
    .build();
```

### Custom FileEnumerator Implementation

Example of a custom file enumerator that keeps only files modified within a maximum age and emits them newest first, one split per file.

```java { .api }
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.enumerate.DefaultFileFilter;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

/**
 * Example custom enumerator that prioritizes newer files
 */
public class TimestampBasedEnumerator implements FileEnumerator {
    private final long maxFileAge;
    private final FileSystem.FileStatusFilter filter;

    public TimestampBasedEnumerator(long maxFileAgeMillis) {
        this.maxFileAge = maxFileAgeMillis;
        // Reuse the documented singleton instead of creating a new filter
        this.filter = DefaultFileFilter.INSTANCE;
    }

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException {
        long currentTime = System.currentTimeMillis();
        List<FileSourceSplit> splits = new ArrayList<>();

        for (Path path : paths) {
            FileSystem fs = path.getFileSystem();
            // Lists direct children only; this example does not recurse into subdirectories
            FileStatus[] statuses = fs.listStatus(path, filter);

            // Sort by modification time (newest first)
            Arrays.sort(statuses,
                Comparator.comparingLong(FileStatus::getModificationTime).reversed());

            for (FileStatus status : statuses) {
                if (status.isDir()) {
                    continue; // a directory cannot be read as a split
                }
                if (currentTime - status.getModificationTime() <= maxFileAge) {
                    // One split covering the whole file: offset 0, length = file size
                    FileSourceSplit split = new FileSourceSplit(
                        status.getPath().toString(),
                        status.getPath(),
                        0,
                        status.getLen(),
                        status.getModificationTime(),
                        status.getLen()
                    );
                    splits.add(split);
                }
            }
        }

        return splits;
    }

    public static class Provider implements FileEnumerator.Provider {
        private final long maxFileAge;

        public Provider(long maxFileAgeMillis) {
            this.maxFileAge = maxFileAgeMillis;
        }

        @Override
        public FileEnumerator create() {
            return new TimestampBasedEnumerator(maxFileAge);
        }
    }
}
```
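
The selection logic of this example, an age cutoff followed by a newest-first ordering, can be checked in isolation. The sketch below reproduces it on plain data. `Entry` and `recentNewestFirst` are hypothetical stand-ins for the file status and the enumeration loop, and the current time is passed in so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative only: age cutoff plus newest-first ordering on plain data. */
final class RecentFirstSketch {

    /** Hypothetical stand-in for a file status: a name and a modification time. */
    static final class Entry {
        final String name;
        final long modifiedMillis;

        Entry(String name, long modifiedMillis) {
            this.name = name;
            this.modifiedMillis = modifiedMillis;
        }
    }

    /** Returns the names of entries no older than maxAgeMillis, newest first. */
    static List<String> recentNewestFirst(List<Entry> entries, long nowMillis, long maxAgeMillis) {
        List<Entry> kept = new ArrayList<>();
        for (Entry entry : entries) {
            if (nowMillis - entry.modifiedMillis <= maxAgeMillis) {
                kept.add(entry);
            }
        }
        // Descending modification time puts the newest entry first.
        kept.sort(Comparator.comparingLong((Entry entry) -> entry.modifiedMillis).reversed());

        List<String> names = new ArrayList<>();
        for (Entry entry : kept) {
            names.add(entry.name);
        }
        return names;
    }
}
```

Passing the clock value as a parameter, rather than reading `System.currentTimeMillis()` inside the method, is a choice made here to keep the logic testable.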

## Error Handling

File enumerators can surface the following errors during file discovery:

- **IOException**: File system access errors, such as a failed listing or read
- **FileNotFoundException**: A specified path does not exist (a subclass of `IOException`, so a `catch (IOException e)` block also handles it)
- **SecurityException**: Insufficient permissions for file access (unchecked)

```java
try {
    FileEnumerator enumerator = new BlockSplittingRecursiveEnumerator();
    Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
        new Path[]{new Path("/protected/path")}, 4);
} catch (IOException e) {
    // Handle file system errors (this also covers FileNotFoundException)
} catch (SecurityException e) {
    // Handle permission errors
}
```
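
When a missing path needs different handling from other I/O failures, the more specific `catch` has to come first, because `FileNotFoundException` extends `IOException`. The sketch below demonstrates the ordering with standard Java exceptions only. `EnumerationCall` and `classify` are hypothetical names standing in for an enumeration call and its error handling.

```java
import java.io.FileNotFoundException;
import java.io.IOException;

/** Illustrative only: shows which handler fires for each kind of failure. */
final class EnumerationErrorSketch {

    /** Hypothetical enumeration step that may fail with a checked I/O error. */
    interface EnumerationCall {
        void run() throws IOException;
    }

    /** Runs the call and reports which handler fired. */
    static String classify(EnumerationCall call) {
        try {
            call.run();
            return "ok";
        } catch (FileNotFoundException e) {
            // Must precede the IOException handler, since it is a subclass.
            return "missing path";
        } catch (IOException e) {
            return "file system error";
        } catch (SecurityException e) {
            // Unchecked, so the compiler does not force callers to handle it.
            return "permission error";
        }
    }
}
```

Reversing the first two handlers would not compile, since the `FileNotFoundException` block would be unreachable.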

## Performance Considerations

- Use `BlockSplittingRecursiveEnumerator` for large files on distributed file systems
- Use `NonSplittingRecursiveEnumerator` for small files or non-splittable formats
- Implement efficient file filters to reduce enumeration overhead
- Consider file system characteristics when choosing enumeration strategies
- Monitor enumeration performance for large directory structures
- Use appropriate `minDesiredSplits` values to balance parallelism and overhead
- Cache enumeration results where possible to avoid repeated file system calls