# Stream Formats

Stream formats provide record-by-record reading interfaces with automatic compression support for various file formats.

## Capabilities

### StreamFormat Interface

Core interface for implementing record-wise file reading with compression support.

```java { .api }
/**
 * A reader format that reads individual records from a stream.
 *
 * The outer class StreamFormat acts mainly as a configuration holder and factory for the
 * reader. The actual reading is done by the Reader, which is created based on
 * an input stream in the createReader method and restored (from checkpointed positions)
 * in the restoreReader method.
 *
 * Compared to the BulkFormat, the stream format handles a few things out-of-the-box,
 * like deciding how to batch records or dealing with compression.
 */
@PublicEvolving
public interface StreamFormat<T> extends Serializable, ResultTypeQueryable<T> {
    /**
     * Creates a new reader to read in this format. This method is called when a fresh reader is
     * created for a split that was assigned from the enumerator. This method may also be called on
     * recovery from a checkpoint, if the reader never stored an offset in the checkpoint.
     *
     * If the format is splittable, then the stream is positioned to the beginning of the file split,
     * otherwise it will be at position zero.
     *
     * The fileLen is the length of the entire file, while splitEnd is the offset
     * of the first byte after the split end boundary (exclusive end boundary). For non-splittable
     * formats, both values are identical.
     */
    Reader<T> createReader(
            Configuration config, FSDataInputStream stream, long fileLen, long splitEnd)
            throws IOException;

    /**
     * Restores a reader from a checkpointed position. This method is called when the reader is
     * recovered from a checkpoint and the reader has previously stored an offset into the
     * checkpoint, by returning from the Reader.getCheckpointedPosition() a value with
     * non-negative offset. That value is supplied as the restoredOffset.
     *
     * If the format is splittable, then the stream is positioned to the beginning of the file split,
     * otherwise it will be at position zero. The stream is NOT positioned to the checkpointed offset,
     * because the format is free to interpret this offset in a different way than the byte offset in the file.
     */
    Reader<T> restoreReader(
            Configuration config,
            FSDataInputStream stream,
            long restoredOffset,
            long fileLen,
            long splitEnd)
            throws IOException;

    /**
     * Checks whether this format is splittable. Splittable formats allow Flink to create multiple
     * splits per file, so that Flink can read multiple regions of the file concurrently.
     */
    boolean isSplittable();

    /**
     * Gets the type produced by this format. This type will be the type produced by the file source
     * as a whole.
     */
    @Override
    TypeInformation<T> getProducedType();

    /**
     * The config option to define how many bytes to be read by the I/O thread in one fetch
     * operation.
     */
    ConfigOption<MemorySize> FETCH_IO_SIZE =
            ConfigOptions.key("source.file.stream.io-fetch-size")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(1L))
                    .withDescription(
                            "The approximate of bytes per fetch that is passed from the I/O thread to file reader.");
}
```

### StreamFormat.Reader Interface

Nested interface for reading individual records from a stream.

```java { .api }
/**
 * The actual reader that reads the records.
 */
@PublicEvolving
public interface Reader<T> extends Closeable {

    /**
     * Reads the next record. Returns null when the input has reached its end.
     */
    @Nullable
    T read() throws IOException;

    /**
     * Closes the reader to release all resources.
     */
    @Override
    void close() throws IOException;

    /**
     * Optionally returns the current position of the reader. This can be implemented by readers
     * that want to speed up recovery from a checkpoint.
     *
     * The current position of the reader is the position of the next record that will be
     * returned in a call to read(). This can be implemented by readers that want to
     * speed up recovery from a checkpoint.
     */
    @Nullable
    default CheckpointedPosition getCheckpointedPosition() {
        return null;
    }
}
```
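
The position contract described above (the checkpointed position is where the next `read()` starts, and a restored reader continues from it) can be illustrated without any Flink dependency. The following is a minimal sketch only: `OffsetTrackingLineReader` and `checkpointedOffset()` are hypothetical stand-ins over a byte array, not Flink types, and they assume newline-delimited UTF-8 records.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a record reader over newline-delimited bytes; not a Flink class.
final class OffsetTrackingLineReader {
    private final byte[] data;
    private int position; // byte offset of the next record to be returned

    OffsetTrackingLineReader(byte[] data, int startOffset) {
        this.data = data;
        this.position = startOffset;
    }

    // Returns the next line, or null at end of input (same contract as Reader.read()).
    String read() {
        if (position >= data.length) {
            return null;
        }
        int end = position;
        while (end < data.length && data[end] != '\n') {
            end++;
        }
        String line = new String(data, position, end - position, StandardCharsets.UTF_8);
        position = Math.min(end + 1, data.length); // step over the newline
        return line;
    }

    // The offset a checkpoint would store: where the next read() starts.
    int checkpointedOffset() {
        return position;
    }

    // Reads all remaining records.
    List<String> readAll() {
        List<String> out = new ArrayList<>();
        for (String s = read(); s != null; s = read()) {
            out.add(s);
        }
        return out;
    }
}
```

Creating a second reader at the offset reported by the first one yields exactly the records that the first reader had not yet returned, which is the behavior a `restoreReader` implementation needs to reproduce.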

### SimpleStreamFormat

Abstract base class for non-splittable stream formats.

```java { .api }
/**
 * Simplified stream format for non-splittable files
 */
public abstract class SimpleStreamFormat<T> implements StreamFormat<T> {
    /**
     * Creates a reader for the entire file stream (simplified interface)
     * @param config Configuration for the reader
     * @param stream Input stream to read from
     * @return Reader instance for reading records
     * @throws IOException If reader creation fails
     */
    public abstract Reader<T> createReader(Configuration config, FSDataInputStream stream)
            throws IOException;

    /**
     * Always returns false for simple formats
     * @return false (simple formats are not splittable)
     */
    public final boolean isSplittable() {
        return false;
    }
}
```

### TextLineInputFormat

Built-in implementation for reading text files line by line.

```java { .api }
/**
 * Stream format for reading text files line by line with charset support
 */
public class TextLineInputFormat extends SimpleStreamFormat<String> {
    /**
     * Creates TextLineInputFormat with UTF-8 encoding
     */
    public TextLineInputFormat();

    /**
     * Creates TextLineInputFormat with specified charset
     * @param charsetName Name of charset to use for decoding
     */
    public TextLineInputFormat(String charsetName);

    /**
     * Creates a reader for reading text lines
     * @param config Configuration for the reader
     * @param stream Input stream to read from
     * @return Reader that returns String lines
     * @throws IOException If reader creation fails
     */
    public Reader<String> createReader(Configuration config, FSDataInputStream stream)
            throws IOException;

    /**
     * Returns TypeInformation for String output
     * @return TypeInformation describing String type
     */
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```

**Usage Examples:**

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Reading text files with UTF-8 encoding
FileSource<String> textSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/logs"))
        .build();

// Reading text files with custom encoding
FileSource<String> customEncodingSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat("ISO-8859-1"), new Path("/data/legacy"))
        .build();

// Using the source in a Flink job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lines =
        env.fromSource(textSource, WatermarkStrategy.noWatermarks(), "text-source");
```

### Custom StreamFormat Implementation

Example of implementing a custom stream format.

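```java { .api }
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.core.fs.FSDataInputStream;

/**
 * Example custom stream format for reading CSV records.
 *
 * Simplified sketch: assumes UTF-8 input, single-byte '\n' line endings,
 * and no quoted fields that contain newlines.
 */
public class CsvStreamFormat implements StreamFormat<String[]> {
    private final String delimiter;

    public CsvStreamFormat(String delimiter) {
        this.delimiter = delimiter;
    }

    @Override
    public Reader<String[]> createReader(
            Configuration config,
            FSDataInputStream stream,
            long fileLen,
            long splitEnd) throws IOException {
        // A split that starts mid-file usually starts mid-line. That first line
        // is read by the reader of the previous split, so it is skipped here.
        boolean skipFirstLine = stream.getPos() > 0;
        return new CsvReader(stream, splitEnd, delimiter, skipFirstLine);
    }

    @Override
    public Reader<String[]> restoreReader(
            Configuration config,
            FSDataInputStream stream,
            long restoredOffset,
            long fileLen,
            long splitEnd) throws IOException {
        // The stream is not pre-positioned to the checkpointed offset.
        // The offset always points at the start of a line, so nothing is skipped.
        stream.seek(restoredOffset);
        return new CsvReader(stream, splitEnd, delimiter, false);
    }

    @Override
    public boolean isSplittable() {
        return true; // splits fall on arbitrary bytes; the reader aligns them to line boundaries
    }

    @Override
    public TypeInformation<String[]> getProducedType() {
        return Types.OBJECT_ARRAY(Types.STRING);
    }

    private static class CsvReader implements StreamFormat.Reader<String[]> {
        private final BufferedReader reader;
        private final long splitEnd;
        private final String delimiter;
        private long position; // absolute byte offset of the next line in the file

        CsvReader(FSDataInputStream stream, long splitEnd, String delimiter, boolean skipFirstLine)
                throws IOException {
            this.position = stream.getPos();
            this.reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
            this.splitEnd = splitEnd;
            this.delimiter = delimiter;
            if (skipFirstLine) {
                nextLine();
            }
        }

        @Override
        public String[] read() throws IOException {
            // A line that starts at or before splitEnd belongs to this split.
            if (position > splitEnd) {
                return null;
            }
            String line = nextLine();
            // Quote the delimiter so it is not interpreted as a regular expression.
            return line == null ? null : line.split(Pattern.quote(delimiter), -1);
        }

        private String nextLine() throws IOException {
            String line = reader.readLine();
            if (line != null) {
                position += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for '\n'
            }
            return line;
        }

        @Override
        public CheckpointedPosition getCheckpointedPosition() {
            // Without a reported position, restoreReader() is never called.
            return new CheckpointedPosition(position, 0L);
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }
}
```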
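
A splittable line-based format has to deal with split boundaries that fall in the middle of a line. One common convention, sketched below under stated assumptions, is that a line belongs to the split in which it starts: a reader that begins mid-file discards everything up to the first newline, and every reader finishes the last line it started even if that line extends past the split end. `LineSplitSketch` and `readSplit` are hypothetical names operating on an in-memory byte array with `\n` line endings; this is an illustration of the convention, not Flink's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of split-boundary handling for newline-delimited data.
final class LineSplitSketch {

    // Returns the lines owned by a split that starts at byte `start` and ends at byte `end`.
    static List<String> readSplit(byte[] data, int start, int end) {
        int pos = start;
        if (start > 0) {
            // The first (possibly partial) line is read by the previous split: skip it.
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            pos++; // step over the newline
        }
        List<String> lines = new ArrayList<>();
        // Read every line that starts at or before `end`, finishing it past the boundary if needed.
        while (pos <= end && pos < data.length) {
            int lineEnd = pos;
            while (lineEnd < data.length && data[lineEnd] != '\n') {
                lineEnd++;
            }
            lines.add(new String(data, pos, lineEnd - pos, StandardCharsets.UTF_8));
            pos = lineEnd + 1;
        }
        return lines;
    }
}
```

With this convention, cutting a file at any byte offset and reading both halves independently returns every line exactly once, which is the property a format must guarantee before it returns `true` from `isSplittable()`.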

### Compression Support Integration

Stream formats automatically support compression through the compression detection system.

```java { .api }
/**
 * Stream formats automatically detect and handle compressed files
 * Supported extensions: .gz, .gzip, .bz2, .xz, .deflate
 */

// Reading compressed text files - compression is handled automatically
FileSource<String> compressedSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(),
                new Path("/data/logs.gz"),
                new Path("/data/archive.bz2"))
        .build();

// Custom format with compression support
FileSource<String[]> compressedCsvSource = FileSource
        .forRecordStreamFormat(new CsvStreamFormat(","), new Path("/data/data.csv.gz"))
        .build();
```
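
To make the idea of extension-based detection concrete, the sketch below picks a decompressing wrapper from the file name and hands the format a plain, decompressed stream. It is an illustration only and not Flink's implementation: `ExtensionDecompressorSketch` and its methods are hypothetical, and it covers only the codecs available in the Java standard library (`.gz`, `.gzip`, `.deflate`), leaving out `.bz2` and `.xz`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical sketch of choosing a decompressor by file extension.
final class ExtensionDecompressorSketch {

    // Wraps `raw` in a decompressing stream if the extension is recognized, else returns it as-is.
    static InputStream open(String fileName, InputStream raw) throws IOException {
        if (fileName.endsWith(".gz") || fileName.endsWith(".gzip")) {
            return new GZIPInputStream(raw);
        }
        if (fileName.endsWith(".deflate")) {
            return new InflaterInputStream(raw);
        }
        return raw;
    }

    // Reads the whole (possibly compressed) file content as UTF-8 text.
    static String readAll(String fileName, byte[] fileBytes) {
        try (InputStream in = open(fileName, new ByteArrayInputStream(fileBytes))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helper for trying the sketch out: gzip-compresses a string in memory.
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Because the reader only ever sees the decompressed stream, the record-parsing code of a format stays identical for compressed and uncompressed inputs.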

## Error Handling

Stream formats handle various error conditions during reading:

- **IOException**: File system read errors, stream corruption
- **UnsupportedEncodingException**: Invalid charset specifications
- **EOFException**: Unexpected end of file during reading
- **RuntimeException**: Format-specific parsing errors

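```java
// Assumption: the charset name is resolved when the reader is created, not in the
// TextLineInputFormat constructor, so an unsupported name surfaces from createReader().
// config, stream, fileLen and splitEnd are assumed to be provided by the caller.
StreamFormat<String> format = new TextLineInputFormat("INVALID-CHARSET");

// try-with-resources closes the reader even if reading fails
try (StreamFormat.Reader<String> reader =
        format.createReader(config, stream, fileLen, splitEnd)) {
    String record;
    while ((record = reader.read()) != null) {
        // Process record
    }
} catch (UnsupportedEncodingException e) {
    // Handle invalid charset (must be caught before its superclass IOException)
} catch (IOException e) {
    // Handle read errors
}
```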
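
Since an invalid charset name may only be noticed once a reader is created on a worker, it can help to validate the name up front. The sketch below uses only the Java standard library; `CharsetCheckSketch` and its methods are hypothetical helpers, not part of the stream format API.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;

// Hypothetical helper for validating a charset name before building a format with it.
final class CharsetCheckSketch {

    // Returns true if the name is syntactically legal and supported by this JVM.
    static boolean isUsable(String charsetName) {
        try {
            return Charset.isSupported(charsetName);
        } catch (IllegalCharsetNameException e) {
            return false;
        }
    }

    // Shows the failure mode at decode time: InputStreamReader rejects unknown names.
    static boolean decodingFails(String charsetName) {
        try (InputStreamReader reader =
                new InputStreamReader(new ByteArrayInputStream(new byte[0]), charsetName)) {
            return false;
        } catch (UnsupportedEncodingException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `CharsetCheckSketch.isUsable(name)` before constructing the format turns a late, per-file failure into an early one at job construction time.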

## Performance Considerations

- Implement `isSplittable()` correctly - splittable formats can be processed in parallel
- Use appropriate buffer sizes in custom readers for optimal I/O performance
- Consider memory usage when reading large records or implementing custom formats
- Compression detection adds minimal overhead and improves storage efficiency
- For high-throughput scenarios, consider BulkFormat instead of StreamFormat
- Implement proper checkpointing support for exactly-once processing guarantees