# File Sources

File sources provide unified reading capabilities for files in both batch and streaming modes, with support for various formats, continuous monitoring, and distributed processing.

## Capabilities

### FileSource Class

Main entry point for creating file sources that can read from distributed file systems.

```java { .api }
/**
 * A unified data source that reads files - both in batch and in streaming mode.
 * Supports all (distributed) file systems and object stores that can be accessed via
 * Flink's FileSystem class.
 */
@PublicEvolving
public final class FileSource<T> extends AbstractFileSource<T, FileSourceSplit>
        implements DynamicParallelismInference {

    /**
     * Builds a new FileSource using a StreamFormat to read record-by-record from a
     * file stream. When possible, stream-based formats are generally easier to use than
     * (and preferable to) file-based formats, because they support better default
     * behavior around I/O batching or progress tracking (checkpoints).
     */
    public static <T> FileSourceBuilder<T> forRecordStreamFormat(
            final StreamFormat<T> streamFormat, final Path... paths);

    /**
     * Builds a new FileSource using a BulkFormat to read batches of records from
     * files. Examples for bulk readers are compressed and vectorized formats such as
     * ORC or Parquet.
     */
    public static <T> FileSourceBuilder<T> forBulkFileFormat(
            final BulkFormat<T, FileSourceSplit> bulkFormat, final Path... paths);

    /**
     * The default split assigner, a lazy locality-aware assigner.
     */
    public static final FileSplitAssigner.Provider DEFAULT_SPLIT_ASSIGNER;

    /**
     * The default file enumerator used for splittable formats.
     */
    public static final FileEnumerator.Provider DEFAULT_SPLITTABLE_FILE_ENUMERATOR;

    /**
     * The default file enumerator used for non-splittable formats.
     */
    public static final FileEnumerator.Provider DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR;
}
```

### FileSource.FileSourceBuilder

Builder pattern for configuring FileSource instances with various options.

```java { .api }
/**
 * The builder for the FileSource, to configure the various behaviors.
 */
public static final class FileSourceBuilder<T>
        extends AbstractFileSourceBuilder<T, FileSourceSplit, FileSourceBuilder<T>> {

    /**
     * Sets this source to streaming ("continuous monitoring") mode.
     * This makes the source a "continuous streaming" source that keeps running, monitoring
     * for new files, and reads these files when they appear and are discovered by the
     * monitoring.
     */
    public FileSourceBuilder<T> monitorContinuously(Duration discoveryInterval);

    /**
     * Sets this source to bounded (batch) mode.
     * In this mode, the source processes the files that are under the given paths when the
     * application is started. Once all files are processed, the source will finish.
     */
    public FileSourceBuilder<T> processStaticFileSet();

    /**
     * Configures the FileEnumerator for the source. The File Enumerator is responsible
     * for selecting from the input path the set of files that should be processed (and which to
     * filter out). Furthermore, the File Enumerator may split the files further into
     * sub-regions, to enable parallelization beyond the number of files.
     */
    public FileSourceBuilder<T> setFileEnumerator(FileEnumerator.Provider fileEnumerator);

    /**
     * Configures the FileSplitAssigner for the source. The File Split Assigner
     * determines which parallel reader instance gets which FileSourceSplit, and in
     * which order these splits are assigned.
     */
    public FileSourceBuilder<T> setSplitAssigner(FileSplitAssigner.Provider splitAssigner);

    /**
     * Creates the file source with the settings applied to this builder.
     */
    public FileSource<T> build();
}
```

**Usage Examples:**

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

// Basic file source for text files
FileSource<String> basicSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/input"))
    .build();

// Streaming source with continuous monitoring (checks for new files every 10 seconds)
FileSource<String> streamingSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/stream"))
    .monitorContinuously(Duration.ofSeconds(10))
    .build();

// Source with an explicitly configured locality-aware split assigner.
// FileSplitAssigner.Provider is a functional interface, so a constructor reference works.
FileSource<String> localityAwareSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/distributed"))
    .setSplitAssigner(LocalityAwareSplitAssigner::new)
    .build();

// Use with the DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream = env.fromSource(
    streamingSource,
    WatermarkStrategy.noWatermarks(),
    "file-source"
);
```
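
The continuous monitoring mode described above boils down to listing the input paths at every discovery interval and reading only files that have not been seen before. The following is a minimal, Flink-independent sketch of that bookkeeping, not the connector's actual enumerator; the class `DiscoverySketch` and its method `discoverNew` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: tracks which paths earlier discovery rounds already reported. */
final class DiscoverySketch {

    // Paths handed out in any earlier round (hypothetical bookkeeping).
    private final Set<String> alreadySeen = new HashSet<>();

    /** Returns the paths in the current listing that no earlier round returned. */
    List<String> discoverNew(List<String> currentListing) {
        List<String> fresh = new ArrayList<>();
        for (String path : currentListing) {
            // Set.add returns false when the path was already recorded.
            if (alreadySeen.add(path)) {
                fresh.add(path);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        DiscoverySketch discovery = new DiscoverySketch();
        // First round: one file is present.
        System.out.println(discovery.discoverNew(Arrays.asList("/data/stream/a.txt")));
        // Second round: the listing repeats a.txt and adds b.txt.
        System.out.println(discovery.discoverNew(
                Arrays.asList("/data/stream/a.txt", "/data/stream/b.txt")));
    }
}
```

In this sketch a file is identified by its path alone, so a file that is modified in place after it was first listed would not be picked up again.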

### AbstractFileSource

Base class for file sources, providing common functionality and structure.

```java { .api }
/**
 * The base class for File Sources. The main implementation to use is the FileSource, which
 * also has the majority of the documentation.
 *
 * To read new formats, one commonly does NOT need to extend this class, but should implement a
 * new Format Reader (like StreamFormat, BulkFormat) and use it with the FileSource.
 *
 * The only reason to extend this class is when a source needs a different type of split,
 * meaning an extension of the FileSourceSplit to carry additional information.
 */
@PublicEvolving
public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
        implements Source<T, SplitT, PendingSplitsCheckpoint<SplitT>>, ResultTypeQueryable<T> {

    protected AbstractFileSource(
            final Path[] inputPaths,
            final FileEnumerator.Provider fileEnumerator,
            final FileSplitAssigner.Provider splitAssigner,
            final BulkFormat<T, SplitT> readerFormat,
            @Nullable final ContinuousEnumerationSettings continuousEnumerationSettings);

    /**
     * Gets the enumerator factory for this source.
     */
    protected FileEnumerator.Provider getEnumeratorFactory();

    /**
     * Gets the assigner factory for this source.
     */
    public FileSplitAssigner.Provider getAssignerFactory();

    /**
     * Gets the continuous enumeration settings, or null if this source is bounded.
     */
    @Nullable
    public ContinuousEnumerationSettings getContinuousEnumerationSettings();

    /**
     * Gets the boundedness of this source - bounded for batch mode, continuous unbounded for streaming.
     */
    @Override
    public Boundedness getBoundedness();

    /**
     * Creates a new source reader for reading the file splits.
     */
    @Override
    public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);

    /**
     * Creates a new split enumerator for discovering and assigning file splits.
     */
    @Override
    public SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createEnumerator(
            SplitEnumeratorContext<SplitT> enumContext);
}
```
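
The relationship between the two accessors above (null continuous enumeration settings mean a bounded source, non-null settings mean a continuous unbounded one) can be sketched without Flink as follows. This is an assumption-labeled illustration, not the class's implementation: `BoundednessSketch`, `Mode`, and `modeFor` are hypothetical names, and a plain `Duration` stands in for `ContinuousEnumerationSettings`.

```java
import java.time.Duration;

/** Illustrative only: derives the execution mode from optional discovery settings. */
final class BoundednessSketch {

    /** Stand-in for the two boundedness values named in the API comments above. */
    enum Mode { BOUNDED, CONTINUOUS_UNBOUNDED }

    /**
     * discoveryInterval is a hypothetical stand-in for the continuous enumeration
     * settings: null means no continuous monitoring was configured.
     */
    static Mode modeFor(Duration discoveryInterval) {
        return discoveryInterval == null ? Mode.BOUNDED : Mode.CONTINUOUS_UNBOUNDED;
    }

    public static void main(String[] args) {
        System.out.println(modeFor(null));                   // batch-style source
        System.out.println(modeFor(Duration.ofSeconds(10))); // monitoring source
    }
}
```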

### File Source Integration

Integration points for using file sources in Flink applications.

```java { .api }
/**
 * Creates a DataStream from a file source
 * @param source The file source to read from
 * @param watermarkStrategy Watermark strategy for event time processing
 * @param sourceName Name for the source operator
 * @return DataStream containing the source data
 */
public <T> DataStreamSource<T> fromSource(
        Source<T, ?, ?> source,
        WatermarkStrategy<T> watermarkStrategy,
        String sourceName);
```

**Advanced Usage Examples:**

```java
// Reading multiple file paths
FileSource<String> multiPathSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(),
        new Path("/data/logs/2023"),
        new Path("/data/logs/2024"))
    .build();

// Explicitly configured file enumerator: walks the path recursively and may
// split files into sub-regions. The no-argument constructor applies no custom filter.
FileSource<String> filteredSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
    .setFileEnumerator(() -> new BlockSplittingRecursiveEnumerator())
    .build();

// Bulk format reading for Parquet files (example interface)
BulkFormat<RowData, FileSourceSplit> parquetFormat = /* implementation */;
FileSource<RowData> parquetSource = FileSource
    .forBulkFileFormat(parquetFormat, new Path("/data/parquet"))
    .build();
```
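
The `setFileEnumerator` documentation notes that an enumerator may split files into sub-regions to allow more parallelism than there are files. As a rough, Flink-independent sketch of that idea (not the logic of `BlockSplittingRecursiveEnumerator` itself), a file can be cut into fixed-size byte ranges. `BlockSplitSketch`, `Range`, and `split` are hypothetical names, and the fixed `blockSize` parameter is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: cuts a file length into block-sized byte ranges. */
final class BlockSplitSketch {

    /** A byte range within a file, similar in spirit to a file split. */
    static final class Range {
        final long offset;
        final long length;

        Range(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }

        @Override
        public String toString() {
            return "[offset=" + offset + ", length=" + length + "]";
        }
    }

    /** Returns ranges of at most blockSize bytes that together cover the whole file. */
    static List<Range> split(long fileLength, long blockSize) {
        if (fileLength < 0 || blockSize <= 0) {
            throw new IllegalArgumentException("fileLength must be >= 0 and blockSize > 0");
        }
        List<Range> ranges = new ArrayList<>();
        for (long offset = 0; offset < fileLength; offset += blockSize) {
            // The last range is shorter when the file is not a multiple of blockSize.
            ranges.add(new Range(offset, Math.min(blockSize, fileLength - offset)));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // A 300-byte file with 128-byte blocks yields three ranges.
        System.out.println(split(300, 128));
    }
}
```

Each range could then be read by a different parallel reader, which is only safe when the format can start reading at an arbitrary offset; this is why the API distinguishes default enumerators for splittable and non-splittable formats.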

## Error Handling

File sources handle various error conditions during reading:

- **IOException**: File system access errors, network issues
- **IllegalArgumentException**: Invalid configuration or paths
- **RuntimeException**: Format-specific parsing errors during reading

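```java
try {
    FileSource<String> source = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/invalid/path"))
        .build();
} catch (IllegalArgumentException e) {
    // Handle invalid configuration detected while building the source.
    // Note: a path that does not exist may not be detected at this point; it may
    // only surface later (for example as an IOException) once the job lists files.
}
```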

## Performance Considerations

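- Use `LocalityAwareSplitAssigner` for HDFS and other distributed file systems, so that readers are preferentially given splits stored on their own host
- Choose the discovery interval for streaming sources deliberately: a shorter interval picks up new files sooner but lists the file system more often
- Consider file sizes and layout when choosing between StreamFormat and BulkFormat; bulk formats target compressed and vectorized formats such as ORC or Parquet
- Monitor split assignment to ensure balanced processing across nodes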
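
To make the locality recommendation concrete, the sketch below shows one simple way a locality-aware assigner could choose the next split for a requesting reader: prefer a split stored on that reader's host, and fall back to any remaining split otherwise. It is an illustration under stated assumptions, not Flink's `LocalityAwareSplitAssigner`; the names `LocalitySketch`, `Split`, and `next` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

/** Illustrative only: hands out splits, preferring ones local to the requester. */
final class LocalitySketch {

    /** A split together with the hosts that store its data (hypothetical model). */
    static final class Split {
        final String path;
        final List<String> hosts;

        Split(String path, String... hosts) {
            this.path = path;
            this.hosts = Arrays.asList(hosts);
        }
    }

    private final List<Split> pending;

    LocalitySketch(List<Split> splits) {
        this.pending = new ArrayList<>(splits);
    }

    /** Returns a split for the requesting host, or empty when none remain. */
    Optional<Split> next(String requestingHost) {
        // First pass: look for a split whose data lives on the requesting host.
        for (Iterator<Split> it = pending.iterator(); it.hasNext();) {
            Split candidate = it.next();
            if (candidate.hosts.contains(requestingHost)) {
                it.remove();
                return Optional.of(candidate);
            }
        }
        // Fallback: no local split is left, so hand out a remote one if any remains.
        return pending.isEmpty() ? Optional.empty() : Optional.of(pending.remove(0));
    }

    public static void main(String[] args) {
        LocalitySketch assigner = new LocalitySketch(Arrays.asList(
                new Split("/data/a", "host1"),
                new Split("/data/b", "host2")));
        // host2 receives its local split even though /data/a is first in the list.
        System.out.println(assigner.next("host2").get().path);
    }
}
```

The fallback step keeps readers busy when no local data is left, at the cost of remote reads; watching how often it is taken is one way to judge whether assignment is balanced.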