# File Sources

File sources provide unified reading capabilities for files in both batch and streaming modes, with support for various formats, continuous monitoring, and distributed processing.

## Capabilities

### FileSource Class

Main entry point for creating file sources that can read from distributed file systems.
```java { .api }
/**
 * A unified data source that reads files - both in batch and in streaming mode.
 * Supports all (distributed) file systems and object stores that can be accessed via
 * Flink's FileSystem class.
 */
@PublicEvolving
public final class FileSource<T> extends AbstractFileSource<T, FileSourceSplit>
        implements DynamicParallelismInference {

    /**
     * Builds a new FileSource using a StreamFormat to read record-by-record from a
     * file stream. When possible, stream-based formats are generally easier (preferable)
     * to file-based formats, because they support better default behavior around I/O
     * batching or progress tracking (checkpoints).
     */
    public static <T> FileSourceBuilder<T> forRecordStreamFormat(
            final StreamFormat<T> streamFormat, final Path... paths);

    /**
     * Builds a new FileSource using a BulkFormat to read batches of records from
     * files. Examples for bulk readers are compressed and vectorized formats such as
     * ORC or Parquet.
     */
    public static <T> FileSourceBuilder<T> forBulkFileFormat(
            final BulkFormat<T, FileSourceSplit> bulkFormat, final Path... paths);

    /** The default split assigner, a lazy locality-aware assigner. */
    public static final FileSplitAssigner.Provider DEFAULT_SPLIT_ASSIGNER;

    /** The default file enumerator used for splittable formats. */
    public static final FileEnumerator.Provider DEFAULT_SPLITTABLE_FILE_ENUMERATOR;

    /** The default file enumerator used for non-splittable formats. */
    public static final FileEnumerator.Provider DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR;
}
```
### FileSource.FileSourceBuilder

Builder pattern for configuring FileSource instances with various options.

```java { .api }
/**
 * The builder for the FileSource, to configure the various behaviors.
 */
public static final class FileSourceBuilder<T>
        extends AbstractFileSourceBuilder<T, FileSourceSplit, FileSourceBuilder<T>> {

    /**
     * Sets this source to streaming ("continuous monitoring") mode.
     * This makes the source a "continuous streaming" source that keeps running, monitoring
     * for new files, and reads these files when they appear and are discovered by the
     * monitoring.
     */
    public FileSourceBuilder<T> monitorContinuously(Duration discoveryInterval);

    /**
     * Sets this source to bounded (batch) mode.
     * In this mode, the source processes the files that are under the given paths when the
     * application is started. Once all files are processed, the source will finish.
     */
    public FileSourceBuilder<T> processStaticFileSet();

    /**
     * Configures the FileEnumerator for the source. The FileEnumerator is responsible
     * for selecting from the input paths the set of files that should be processed (and
     * which to filter out). Furthermore, the FileEnumerator may split the files further
     * into sub-regions, to enable parallelization beyond the number of files.
     */
    public FileSourceBuilder<T> setFileEnumerator(FileEnumerator.Provider fileEnumerator);

    /**
     * Configures the FileSplitAssigner for the source. The FileSplitAssigner
     * determines which parallel reader instance gets which FileSourceSplit, and in
     * which order these splits are assigned.
     */
    public FileSourceBuilder<T> setSplitAssigner(FileSplitAssigner.Provider splitAssigner);

    /**
     * Creates the file source with the settings applied to this builder.
     */
    public FileSource<T> build();
}
```
**Usage Examples:**

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

// Basic file source for text files
FileSource<String> basicSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/input"))
        .build();

// Streaming source that checks for new files every 10 seconds
FileSource<String> streamingSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/stream"))
        .monitorContinuously(Duration.ofSeconds(10))
        .build();

// Source with an explicit locality-aware split assigner (also the default);
// FileSplitAssigner.Provider is a factory, so a constructor reference fits
FileSource<String> localityAwareSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/distributed"))
        .setSplitAssigner(LocalityAwareSplitAssigner::new)
        .build();

// Use with the DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream = env.fromSource(
        streamingSource,
        WatermarkStrategy.noWatermarks(),
        "file-source");
```
### AbstractFileSource

Base class for file sources, providing common functionality and structure.

```java { .api }
/**
 * The base class for File Sources. The main implementation to use is the FileSource, which
 * also has the majority of the documentation.
 *
 * To read new formats, one commonly does NOT need to extend this class, but should implement a
 * new Format Reader (like StreamFormat, BulkFormat) and use it with the FileSource.
 *
 * The only reason to extend this class is when a source needs a different type of split,
 * meaning an extension of the FileSourceSplit to carry additional information.
 */
@PublicEvolving
public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
        implements Source<T, SplitT, PendingSplitsCheckpoint<SplitT>>, ResultTypeQueryable<T> {

    protected AbstractFileSource(
            final Path[] inputPaths,
            final FileEnumerator.Provider fileEnumerator,
            final FileSplitAssigner.Provider splitAssigner,
            final BulkFormat<T, SplitT> readerFormat,
            @Nullable final ContinuousEnumerationSettings continuousEnumerationSettings);

    /** Gets the enumerator factory for this source. */
    protected FileEnumerator.Provider getEnumeratorFactory();

    /** Gets the assigner factory for this source. */
    public FileSplitAssigner.Provider getAssignerFactory();

    /** Gets the continuous enumeration settings, or null if this source is bounded. */
    @Nullable
    public ContinuousEnumerationSettings getContinuousEnumerationSettings();

    /**
     * Gets the boundedness of this source: bounded for batch mode, continuous unbounded
     * for streaming mode.
     */
    @Override
    public Boundedness getBoundedness();

    /** Creates a new source reader for reading the file splits. */
    @Override
    public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);

    /** Creates a new split enumerator for discovering and assigning file splits. */
    @Override
    public SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createEnumerator(
            SplitEnumeratorContext<SplitT> enumContext);
}
```
198
199
### File Source Integration

Integration points for using file sources in Flink applications.

```java { .api }
/**
 * Creates a DataStream from a file source (defined on StreamExecutionEnvironment).
 *
 * @param source The file source to read from
 * @param watermarkStrategy Watermark strategy for event time processing
 * @param sourceName Name for the source operator
 * @return DataStream containing the source data
 */
public <T> DataStreamSource<T> fromSource(
        Source<T, ?, ?> source,
        WatermarkStrategy<T> watermarkStrategy,
        String sourceName);
```
**Advanced Usage Examples:**

```java
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.data.RowData;

// Reading multiple file paths
FileSource<String> multiPathSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(),
                new Path("/data/logs/2023"),
                new Path("/data/logs/2024"))
        .build();

// Custom file enumeration that splits files along their storage block boundaries
FileSource<String> blockSplitSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
        .setFileEnumerator(BlockSplittingRecursiveEnumerator::new)
        .build();

// Bulk format reading for Parquet files (example interface)
BulkFormat<RowData, FileSourceSplit> parquetFormat = /* implementation */;
FileSource<RowData> parquetSource = FileSource
        .forBulkFileFormat(parquetFormat, new Path("/data/parquet"))
        .build();
```
239
240
## Error Handling

File sources handle various error conditions during reading:

- **IOException**: File system access errors, network issues
- **IllegalArgumentException**: Invalid configuration, such as an empty set of paths
- **RuntimeException**: Format-specific parsing errors during reading

```java
try {
    FileSource<String> source = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), new Path("/invalid/path"))
            .build();
} catch (IllegalArgumentException e) {
    // Handle invalid configuration; note that a nonexistent path typically surfaces
    // later, at enumeration time, as an IOException rather than at build time
}
```
## Performance Considerations

- Use `LocalityAwareSplitAssigner` for HDFS and other distributed file systems (it is the default assigner)
- Configure appropriate discovery intervals for streaming to balance latency and resource usage
- Consider file sizes when choosing between StreamFormat and BulkFormat
- Monitor split assignment to ensure balanced processing across nodes
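The first two considerations can be combined in one builder chain. A sketch with illustrative values (the HDFS path and the one-minute interval are assumptions, not recommendations for every workload):

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

import java.time.Duration;

// Locality-aware assignment keeps reads close to the HDFS blocks; a longer
// discovery interval reduces load on the name node at the cost of latency.
FileSource<String> tunedSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("hdfs:///data/events"))
        .setSplitAssigner(LocalityAwareSplitAssigner::new)  // explicit, matches the default
        .monitorContinuously(Duration.ofMinutes(1))
        .build();
```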