# File Enumeration

File enumeration discovers the files under a source's input paths and turns them into splits for parallel reading. The built-in enumerators traverse directories recursively and support file filters.

## Capabilities

### FileEnumerator Interface

Core interface for discovering files and creating processing splits.

```java { .api }
/**
 * Interface for discovering files and creating splits for parallel processing
 */
public interface FileEnumerator {
    /**
     * Enumerates files from given paths and creates splits for processing
     * @param paths Array of paths to enumerate files from
     * @param minDesiredSplits Hint for how many splits are needed to use the available
     *     parallelism; implementations may return fewer or more splits
     * @return Collection of FileSourceSplit instances for processing
     * @throws IOException If file enumeration fails
     */
    Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException;
}
```
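
The `minDesiredSplits` hint is easier to picture with one concrete strategy. The plain-Java sketch below is hypothetical: `HintedRangeSplitter` and `FileRange` are illustrative names, it uses no Flink types, and Flink's built-in enumerators do not work this way (they split by storage block or not at all). It cuts each file into near-equal byte ranges so that the total reaches the hint, which only suits formats that can start reading at an arbitrary offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical (path, offset, length) triple standing in for a FileSourceSplit. */
final class FileRange {
    final String path;
    final long offset;
    final long length;

    FileRange(String path, long offset, long length) {
        this.path = path;
        this.offset = offset;
        this.length = length;
    }
}

final class HintedRangeSplitter {

    /** Cuts each file into near-equal ranges so that, for large enough files, at least minDesiredSplits ranges exist. */
    static List<FileRange> rangesFor(List<String> paths, List<Long> lengths, int minDesiredSplits) {
        if (paths.size() != lengths.size()) {
            throw new IllegalArgumentException("paths and lengths must have the same size");
        }
        List<FileRange> ranges = new ArrayList<>();
        int files = paths.size();
        if (files == 0) {
            return ranges;
        }
        // Pieces per file, rounded up so the total reaches the hint.
        int perFile = Math.max(1, (minDesiredSplits + files - 1) / files);
        for (int i = 0; i < files; i++) {
            long length = lengths.get(i);
            // Never more pieces than bytes; an empty file still gets one range.
            int pieces = (int) Math.max(1L, Math.min((long) perFile, length));
            long base = length / pieces;
            long remainder = length % pieces;
            long offset = 0;
            for (int p = 0; p < pieces; p++) {
                // The first `remainder` pieces are one byte longer, so the ranges cover the file exactly.
                long size = base + (p < remainder ? 1 : 0);
                ranges.add(new FileRange(paths.get(i), offset, size));
                offset += size;
            }
        }
        return ranges;
    }
}
```

A real enumerator would wrap each range in a `FileSourceSplit` with a unique ID.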

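### FileEnumerator.Provider Interface

Factory interface for creating FileEnumerator instances. The source builder takes a provider rather than an enumerator because the provider is serialized with the source and shipped to the cluster, where `create()` builds the enumerator.
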
```java { .api }
/**
 * Factory interface for creating FileEnumerator instances.
 * Declared inside FileEnumerator and referenced as FileEnumerator.Provider.
 * Implementations are serialized together with the source, so they must be serializable.
 */
public interface Provider extends Serializable {
    /**
     * Creates a new FileEnumerator instance
     * @return FileEnumerator implementation
     */
    FileEnumerator create();
}
```
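
Because the provider travels with the serialized source, everything it captures must be serializable too; a provider lambda that captures a plain `Predicate` fails once the source is serialized. The plain-Java sketch below shows the mechanism with a local `EnumeratorProvider` interface shaped like `FileEnumerator.Provider` (a stand-in, not the Flink type) and a hypothetical `NameFilteringEnumerator`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Predicate;

/** Local stand-in with the same shape as FileEnumerator.Provider. */
interface EnumeratorProvider<E> extends Serializable {
    E create();
}

/** Hypothetical enumerator that only holds a name filter. */
final class NameFilteringEnumerator {
    private final Predicate<String> filter;

    NameFilteringEnumerator(Predicate<String> filter) {
        this.filter = filter;
    }

    boolean accepts(String name) {
        return filter.test(name);
    }
}

final class ProviderSerialization {

    /** Provider whose lambda captures a plain, non-serializable predicate. */
    static EnumeratorProvider<NameFilteringEnumerator> capturingPlainPredicate() {
        Predicate<String> filter = name -> name.endsWith(".csv");
        return () -> new NameFilteringEnumerator(filter);
    }

    /** Same provider, but the captured predicate is cast to Serializable. */
    static EnumeratorProvider<NameFilteringEnumerator> capturingSerializablePredicate() {
        Predicate<String> filter = (Predicate<String> & Serializable) name -> name.endsWith(".csv");
        return () -> new NameFilteringEnumerator(filter);
    }

    /** Returns false if Java serialization rejects the object graph. */
    static boolean isSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Serializes and deserializes a value, roughly what happens when a job is shipped. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A third option avoids the question entirely: build the filter inside the provider lambda, so the provider captures nothing.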

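### BlockSplittingRecursiveEnumerator

Default enumerator for splittable formats that respects storage block boundaries: each block reported by the file system becomes its own split. File systems that do not expose block information (HDFS does expose it) keep each file as a single split, and files whose suffix marks a common compression format, such as `.gz` or `.bz2`, are never split.
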
```java { .api }
/**
 * File enumerator that recursively lists files and splits them into multiple regions
 * based on the storage blocks reported by the file system.
 * Designed for splittable file formats and distributed file systems.
 */
public class BlockSplittingRecursiveEnumerator extends NonSplittingRecursiveEnumerator {
    /**
     * Creates enumerator with default configuration: hidden files (names starting
     * with '.' or '_') are skipped, and files with common compression suffixes are not split
     */
    public BlockSplittingRecursiveEnumerator();

    /**
     * Creates enumerator with custom file filter
     * @param fileFilter Filter for selecting which files to include
     * @param nonSplittableFileSuffixes Suffixes (typically compression formats) of files
     *     that are kept as a single split
     */
    public BlockSplittingRecursiveEnumerator(
            Predicate<Path> fileFilter, String[] nonSplittableFileSuffixes);

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException;

    // No provider constant is declared on this class; pass BlockSplittingRecursiveEnumerator::new,
    // the same value as FileSource.DEFAULT_SPLITTABLE_FILE_ENUMERATOR.
}
```
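
The splitting rule can be sketched without Flink. In the plain-Java sketch below, `BlockSplitSketch` is a hypothetical helper that assumes a fixed block size and an example list of compression suffixes; Flink's enumerator instead asks the file system for the actual block locations (including the hosts used for locality-aware assignment) and uses its own list of known compression suffixes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class BlockSplitSketch {

    /** Example suffixes of compressed files that cannot be split (an assumption, not Flink's exact list). */
    private static final List<String> NON_SPLITTABLE_SUFFIXES =
            List.of(".gz", ".gzip", ".bz2", ".xz", ".deflate");

    /**
     * Returns {offset, length} pairs: one per block for splittable files,
     * or a single pair covering the whole file otherwise.
     */
    static List<long[]> splitsFor(String fileName, long fileLength, long blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<long[]> splits = new ArrayList<>();
        String lowerCaseName = fileName.toLowerCase(Locale.ROOT);
        boolean compressed = NON_SPLITTABLE_SUFFIXES.stream().anyMatch(lowerCaseName::endsWith);
        if (compressed || fileLength <= blockSize) {
            // The whole file becomes a single split.
            splits.add(new long[] {0, fileLength});
            return splits;
        }
        for (long offset = 0; offset < fileLength; offset += blockSize) {
            // The last block may be shorter than blockSize.
            splits.add(new long[] {offset, Math.min(blockSize, fileLength - offset)});
        }
        return splits;
    }
}
```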

### NonSplittingRecursiveEnumerator

Enumerator for non-splittable formats that creates one split per file.

```java { .api }
/**
 * File enumerator that recursively lists files and creates one split per file
 * without splitting. Designed for non-splittable file formats.
 */
public class NonSplittingRecursiveEnumerator implements FileEnumerator {
    /**
     * Creates enumerator with default configuration (skips hidden files whose
     * names start with '.' or '_')
     */
    public NonSplittingRecursiveEnumerator();

    /**
     * Creates enumerator with custom file filter
     * @param fileFilter Filter for selecting which files to include; it is also
     *     tested against directories while recursing
     */
    public NonSplittingRecursiveEnumerator(Predicate<Path> fileFilter);

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException;

    // No provider constant is declared on this class; pass NonSplittingRecursiveEnumerator::new,
    // the same value as FileSource.DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR.
}
```
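
The recursive traversal can be modeled over an in-memory tree. The plain-Java sketch below uses a hypothetical `Node` type and follows our reading of Flink's recursive enumerators: the filter is tested on every entry, directories and the root path included, before the walk descends, and each accepted file yields exactly one split. Under that reading, a filter that only accepts names ending in `.csv` rejects the root directory itself and yields nothing, which is worth checking against your Flink version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class RecursiveWalkSketch {

    /** Hypothetical in-memory entry: directories have children, files do not. */
    static final class Node {
        final String name;
        final List<Node> children; // null for files

        private Node(String name, List<Node> children) {
            this.name = name;
            this.children = children;
        }

        static Node file(String name) {
            return new Node(name, null);
        }

        static Node dir(String name, Node... children) {
            return new Node(name, List.of(children));
        }

        boolean isDir() {
            return children != null;
        }
    }

    /** Returns the path of every accepted file; each one stands for exactly one split. */
    static List<String> enumerate(Node root, Predicate<String> nameFilter) {
        List<String> splits = new ArrayList<>();
        walk(root, "", nameFilter, splits);
        return splits;
    }

    private static void walk(Node node, String parentPath, Predicate<String> nameFilter, List<String> out) {
        // The filter sees directories too: rejecting one skips its whole subtree.
        if (!nameFilter.test(node.name)) {
            return;
        }
        String path = parentPath + "/" + node.name;
        if (!node.isDir()) {
            out.add(path); // one split per file, never cut into blocks
            return;
        }
        for (Node child : node.children) {
            walk(child, path, nameFilter, out);
        }
    }
}
```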

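### All-Directory Enumerators

Variants of the two recursive enumerators for cases where the file filter should not prune directories. As far as we can tell from Flink's implementation, they descend into every non-hidden directory and apply the file filter only to files. Empty directories still produce no splits, because splits are created only for files.
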
```java { .api }
/**
 * Block-splitting enumerator that includes all directories
 */
public class BlockSplittingRecursiveAllDirEnumerator extends BlockSplittingRecursiveEnumerator {
    public BlockSplittingRecursiveAllDirEnumerator();
    public BlockSplittingRecursiveAllDirEnumerator(Predicate<Path> fileFilter);
}

/**
 * Non-splitting enumerator that includes all directories
 */
public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
    public NonSplittingRecursiveAllDirEnumerator();
    public NonSplittingRecursiveAllDirEnumerator(Predicate<Path> fileFilter);
}

// The filter type matches the parent enumerators (Predicate<Path>); verify the exact
// constructor set of these variants against the Javadoc of your Flink version.
```

**Usage Examples:**

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.enumerate.*;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Block-splitting enumerator: only for splittable formats (and their default).
// MySplittableFormat is a hypothetical StreamFormat<String> whose isSplittable() returns true.
FileSource<String> splittableSource = FileSource
        .forRecordStreamFormat(new MySplittableFormat(), new Path("/data"))
        .setFileEnumerator(BlockSplittingRecursiveEnumerator::new)
        .build();

// Non-splitting enumerator for formats that cannot be split. TextLineInputFormat is one,
// so FileSource would already choose this enumerator for it by default.
FileSource<String> nonSplittableSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
        .setFileEnumerator(NonSplittingRecursiveEnumerator::new)
        .build();

// All-directory variant, matching the non-splittable text format
FileSource<String> allDirSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
        .setFileEnumerator(() -> new NonSplittingRecursiveAllDirEnumerator())
        .build();
```

### File Filters

Built-in and custom file filtering capabilities for selective file processing.

```java { .api }
/**
 * Default file filter that excludes hidden files and directories,
 * i.e. entries whose names start with '.' or '_'
 */
public class DefaultFileFilter implements Predicate<Path> {
    /**
     * Creates the filter; the built-in enumerators' no-argument constructors use it
     */
    public DefaultFileFilter();

    /**
     * Accepts non-hidden files and directories
     * @param path Path to evaluate
     * @return true if the path should be included
     */
    @Override
    public boolean test(Path path);
}

/**
 * File filter using a regular expression.
 * Whether the pattern is matched against the file name or the full path should be
 * checked in the Javadoc of your Flink version.
 */
public class RegexFileFilter implements Predicate<Path> {
    /**
     * Creates filter with regex pattern
     * @param pattern Regular expression pattern
     */
    public RegexFileFilter(String pattern);

    /**
     * Accepts paths that match the regex pattern
     * @param path Path to evaluate
     * @return true if the path matches the pattern
     */
    @Override
    public boolean test(Path path);
}
```
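
In Flink, the built-in enumerators take a `java.util.function.Predicate<Path>` as their filter, so custom filters are ordinary predicates that compose with `and` and `negate`. The plain-Java sketch below works on file-name strings instead of Flink `Path` objects so it runs without Flink: `isHidden` applies the '.'/'_' naming rule that `DefaultFileFilter` uses for hidden entries, and `extensionFilter` is a hypothetical helper that also lets through names without a dot, on the assumption that those are directories, so a recursive walk is not cut off. That heuristic breaks for directory names containing dots.

```java
import java.io.Serializable;
import java.util.function.Predicate;

final class NameFilters {

    /** Names starting with '.' or '_' count as hidden. */
    static boolean isHidden(String name) {
        return name.startsWith(".") || name.startsWith("_");
    }

    /**
     * Hypothetical helper: accepts non-hidden names ending in the given extension, plus
     * names without any dot, which this sketch assumes are directories. The intersection
     * cast keeps the lambda serializable so a provider may capture it.
     */
    static Predicate<String> extensionFilter(String extension) {
        return (Predicate<String> & Serializable)
                name -> !isHidden(name) && (name.endsWith(extension) || !name.contains("."));
    }
}
```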

### Dynamic File Enumeration

Continuous file discovery in streaming jobs is enabled with `monitorContinuously(Duration)` on the `FileSource` builder: the source re-runs its configured `FileEnumerator` over the input paths at every discovery interval and creates splits only for paths it has not processed before. The set of processed paths is kept in the source's checkpointed state.

`DynamicFileEnumerator` addresses a different need: it extends `FileEnumerator` so that filtering data collected at runtime (used for dynamic partition pruning in batch table jobs) can be applied before splits are enumerated.

```java { .api }
/**
 * Enumerator that can receive dynamic filtering data before enumerating splits
 */
public interface DynamicFileEnumerator extends FileEnumerator {
    /**
     * Supplies filtering data collected at runtime, for example the partition values that can match
     * @param data Dynamic filtering data
     */
    void setDynamicFilteringData(DynamicFilteringData data);
}
```
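
The bookkeeping behind continuous discovery (list everything again, keep only paths not seen before) fits in a few lines. The plain-Java sketch below uses a hypothetical `DiscoveryState` class; it illustrates the idea only, and unlike Flink's source it keeps the seen-path set in memory rather than in checkpointed state.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DiscoveryState {

    /** Paths whose splits were already handed out in an earlier discovery round. */
    private final Set<String> alreadyProcessed = new HashSet<>();

    /** Returns the paths from this listing that were not seen in earlier rounds, in listing order. */
    List<String> newPaths(Collection<String> listing) {
        List<String> fresh = new ArrayList<>();
        for (String path : listing) {
            // Set.add returns false when the path was already recorded.
            if (alreadyProcessed.add(path)) {
                fresh.add(path);
            }
        }
        return fresh;
    }
}
```

Because only paths are remembered, a file that changes after it was processed is not picked up again in this model.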

**Advanced Usage Examples:**

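```java
import java.io.Serializable;
import java.util.function.Predicate;
import java.util.regex.Pattern;

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.enumerate.*;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Custom file filter for a specific extension. Enumerator filters are Predicate<Path>;
// the intersection cast makes the lambda serializable, so the provider lambda below can
// capture it. The recursive enumerators also test directories (including the root)
// against the filter, so names without a dot, assumed to be directories, are let through.
Predicate<Path> csvFilter = (Predicate<Path> & Serializable) path -> {
    String name = path.getName();
    boolean hidden = name.startsWith(".") || name.startsWith("_");
    return !hidden && (name.endsWith(".csv") || !name.contains("."));
};

// Enumerator with custom filter (non-splitting, because TextLineInputFormat is not splittable)
FileSource<String> filteredSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
        .setFileEnumerator(() -> new NonSplittingRecursiveEnumerator(csvFilter))
        .build();

// Regex-based filtering on file names, with the same directory assumption as above
Pattern logPattern = Pattern.compile(".*\\.log");
Predicate<Path> logFilter = (Predicate<Path> & Serializable) path ->
        logPattern.matcher(path.getName()).matches() || !path.getName().contains(".");
FileSource<String> logSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/logs"))
        .setFileEnumerator(() -> new NonSplittingRecursiveEnumerator(logFilter))
        .build();

// Several root paths; one enumerator is applied to all of them
Path[] dataPaths = {
    new Path("/data/current"),
    new Path("/data/archive"),
    new Path("/data/backup")
};

FileSource<String> multiPathSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), dataPaths)
        .setFileEnumerator(NonSplittingRecursiveEnumerator::new)
        .build();
```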

### Custom FileEnumerator Implementation

Example of a custom enumerator that keeps only recently modified files.

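```java { .api }
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.enumerate.DefaultFileFilter;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

/**
 * Example custom enumerator that keeps only files modified within maxFileAgeMillis,
 * sorted newest first. It lists only the top level of each path (no recursion) and
 * creates one split per file, ignoring minDesiredSplits. The split assigner, not this
 * ordering, decides which split a reader receives next.
 */
public class TimestampBasedEnumerator implements FileEnumerator {
    private final long maxFileAge;
    private final Predicate<Path> filter;

    public TimestampBasedEnumerator(long maxFileAgeMillis) {
        this.maxFileAge = maxFileAgeMillis;
        this.filter = new DefaultFileFilter(); // skips hidden files
    }

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException {
        long currentTime = System.currentTimeMillis();
        List<FileSourceSplit> splits = new ArrayList<>();

        for (Path path : paths) {
            FileSystem fs = path.getFileSystem();
            // Flink's FileSystem#listStatus takes no filter argument; filtering happens below
            FileStatus[] statuses = fs.listStatus(path);
            if (statuses == null) {
                continue; // guard against a null result for missing paths
            }

            // Sort by modification time (newest first)
            Arrays.sort(statuses, (a, b) -> Long.compare(b.getModificationTime(), a.getModificationTime()));

            for (FileStatus status : statuses) {
                if (status.isDir() || !filter.test(status.getPath())) {
                    continue; // skip subdirectories and hidden files
                }
                if (currentTime - status.getModificationTime() <= maxFileAge) {
                    FileSourceSplit split = new FileSourceSplit(
                            status.getPath().toString(), // split ID, unique per file
                            status.getPath(),
                            0,                           // offset: start of file
                            status.getLen(),             // length: whole file
                            status.getModificationTime(),
                            status.getLen());            // total file size
                    splits.add(split);
                }
            }
        }

        return splits;
    }

    /** Serializable provider; it captures only a long, so it ships safely with the source. */
    public static class Provider implements FileEnumerator.Provider {
        private static final long serialVersionUID = 1L;
        private final long maxFileAge;

        public Provider(long maxFileAgeMillis) {
            this.maxFileAge = maxFileAgeMillis;
        }

        @Override
        public FileEnumerator create() {
            return new TimestampBasedEnumerator(maxFileAge);
        }
    }
}
```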

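## Error Handling

The built-in enumerators do not catch errors raised during file discovery; `enumerateSplits` passes them on to the caller:

- **FileNotFoundException**: a specified path does not exist (a subclass of `IOException`)
- **IOException**: other file system access errors; permission denials usually arrive as `IOException` subclasses too, such as Hadoop's `AccessControlException` on HDFS
- **SecurityException**: only when a Java security manager blocks the access
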
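```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.core.fs.Path;

try {
    FileEnumerator enumerator = new BlockSplittingRecursiveEnumerator();
    Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
            new Path[] {new Path("/protected/path")}, 4);
} catch (FileNotFoundException e) {
    // A root path does not exist; caught before IOException, its superclass
} catch (IOException e) {
    // Other file system errors, including permission failures reported as IOException subclasses
}
```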

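## Performance Considerations

- Use `BlockSplittingRecursiveEnumerator` for large files in splittable formats; it only yields several splits per file on file systems that report block locations, such as HDFS
- Use `NonSplittingRecursiveEnumerator` for small files or non-splittable formats
- Keep file filters cheap: they run for every listed entry, and in the standard recursive enumerators a rejected directory is not descended into, which also saves listing calls
- Consider file system characteristics when choosing an enumeration strategy: on object stores every directory listing is a remote request, so deep directory trees slow enumeration down
- Monitor enumeration time for large directory structures, since the listing runs in the split enumerator on the JobManager before splits can be assigned
- `minDesiredSplits` is supplied by the source rather than configured by users; custom enumerators can treat it as a hint for producing enough splits to keep all parallel readers busy
- In continuous mode the input paths are listed again at every discovery interval; a custom enumerator can cache listings of directories known not to change (for example closed date partitions) to avoid repeated file system calls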
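
The first two guidelines come down to split counts. The plain-Java sketch below (a hypothetical `SplitCount` helper, assuming a fixed block size and that every file is splittable and uncompressed) compares how many splits each strategy yields for the same files.

```java
final class SplitCount {

    /** Block splitting: ceil(length / blockSize) splits per file, at least one per file. */
    static long blockSplitting(long[] fileLengths, long blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        long total = 0;
        for (long length : fileLengths) {
            total += Math.max(1L, (length + blockSize - 1) / blockSize);
        }
        return total;
    }

    /** Non-splitting: exactly one split per file. */
    static long nonSplitting(long[] fileLengths) {
        return fileLengths.length;
    }
}
```

For files smaller than one block both strategies produce the same count, so block splitting only pays off for large files.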