# Source Reader Framework

The Source Reader Framework provides a foundation for building custom Flink source readers, with automatic split management, coordination between the fetcher threads and the reader thread, and checkpointable state handling. It supports both single-threaded and multi-threaded split-reading patterns.

## Core Components

### SourceReaderBase

The foundation class for all source reader implementations.

```java { .api }
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {

    // Constructors
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context);

    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context);

    // Public interface methods
    public void start();
    public InputStatus pollNext(ReaderOutput<T> output) throws Exception;
    public CompletableFuture<Void> isAvailable();
    public List<SplitT> snapshotState(long checkpointId);
    public void addSplits(List<SplitT> splits);
    public void notifyNoMoreSplits();
    public void handleSourceEvents(SourceEvent sourceEvent);
    public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume);
    public void close() throws Exception;
    public int getNumberOfCurrentlyAssignedSplits();

    // Abstract methods to implement
    protected abstract void onSplitFinished(Map<String, SplitStateT> finishedSplitIds);
    protected abstract SplitStateT initializedState(SplitT split);
    protected abstract SplitT toSplitType(String splitId, SplitStateT splitState);
}
```

### SingleThreadMultiplexSourceReaderBase

Specialized reader base class for sources that use a single fetcher thread with one SplitReader.

```java { .api }
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {

    // Constructors
    public SingleThreadMultiplexSourceReaderBase(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context);

    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context);

    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context);
}
```

### SplitReader

The interface for reading records from splits. A `fetch()` call returns a batch of records, potentially spanning multiple splits; it may block waiting for data, but implementations should return promptly when `wakeUp()` is called from another thread.

```java { .api }
@PublicEvolving
public interface SplitReader<E, SplitT extends SourceSplit> extends AutoCloseable {
    RecordsWithSplitIds<E> fetch() throws IOException;
    void handleSplitsChanges(SplitsChange<SplitT> splitsChanges);
    void wakeUp();
    void pauseOrResumeSplits(Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume);
    void close() throws Exception;
}
```

### RecordEmitter

Processes records from the split reader and emits them to the output.

```java { .api }
@PublicEvolving
public interface RecordEmitter<E, T, SplitStateT> {
    void emitRecord(E element, SourceOutput<T> output, SplitStateT splitState) throws Exception;
}
```

### RecordEvaluator

Evaluates emitted records to determine whether they represent an end-of-stream marker; once it returns `true`, the reader stops consuming the corresponding split.

```java { .api }
@PublicEvolving
public interface RecordEvaluator<T> {
    boolean isEndOfStream(T record);
}
```
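
For example, a minimal evaluator for a line-based source might treat a designated sentinel line as the end-of-stream signal. A sketch, where the sentinel value is an illustrative assumption rather than anything defined by the framework:

```java
// A minimal sketch: treat a sentinel line as end-of-stream.
// The sentinel value "__END_OF_STREAM__" is an illustrative assumption.
public class SentinelRecordEvaluator implements RecordEvaluator<String> {

    private static final String SENTINEL = "__END_OF_STREAM__";

    @Override
    public boolean isEndOfStream(String record) {
        return SENTINEL.equals(record);
    }
}
```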

## Implementation Examples

### Complete File Source Reader

```java
public class CustomFileSourceReader extends SingleThreadMultiplexSourceReaderBase<
        FileRecord, String, FileSourceSplit, FileSourceSplitState> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomFileSourceReader.class);

    public CustomFileSourceReader(
            Configuration config,
            SourceReaderContext context) {
        super(
            () -> new FileSystemSplitReader(config), // SplitReader supplier
            new FileRecordEmitter(),                 // Record emitter
            config,
            context
        );
    }

    @Override
    protected void onSplitFinished(Map<String, FileSourceSplitState> finishedSplitIds) {
        // Clean up resources for finished splits
        for (Map.Entry<String, FileSourceSplitState> entry : finishedSplitIds.entrySet()) {
            String splitId = entry.getKey();
            FileSourceSplitState splitState = entry.getValue();

            LOG.info("Split {} finished at position {}", splitId, splitState.getOffset());

            // Close any split-specific resources
            splitState.cleanup();
        }
    }

    @Override
    protected FileSourceSplitState initializedState(FileSourceSplit split) {
        return new FileSourceSplitState(
            split.path(),
            split.offset(),
            split.length()
        );
    }

    @Override
    protected FileSourceSplit toSplitType(String splitId, FileSourceSplitState splitState) {
        return new FileSourceSplit(
            splitId,
            splitState.getPath(),
            splitState.getOffset(),
            splitState.getLength()
        );
    }
}

// Split implementation
public class FileSourceSplit implements SourceSplit {
    private final String splitId;
    private final Path path;
    private final long offset;
    private final long length;

    public FileSourceSplit(String splitId, Path path, long offset, long length) {
        this.splitId = splitId;
        this.path = path;
        this.offset = offset;
        this.length = length;
    }

    @Override
    public String splitId() {
        return splitId;
    }

    public Path path() { return path; }
    public long offset() { return offset; }
    public long length() { return length; }
}

// Split state implementation (the mutable counterpart of the split, used while reading)
public class FileSourceSplitState {

    private static final Logger LOG = LoggerFactory.getLogger(FileSourceSplitState.class);

    private final Path path;
    private long offset;
    private final long length;
    private BufferedReader reader;

    public FileSourceSplitState(Path path, long offset, long length) {
        this.path = path;
        this.offset = offset;
        this.length = length;
    }

    public Path getPath() { return path; }
    public long getOffset() { return offset; }
    public void setOffset(long offset) { this.offset = offset; }
    public long getLength() { return length; }

    public BufferedReader getReader() { return reader; }
    public void setReader(BufferedReader reader) { this.reader = reader; }

    public void cleanup() {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                LOG.warn("Failed to close reader for split", e);
            }
        }
    }
}
```
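
To put the reader to work, it has to be returned from a `Source`'s `createReader` method. The following is a minimal sketch of that wiring; `CustomFileSource` is a hypothetical class, and the enumerator logic is stubbed out:

```java
// A minimal sketch of wiring the reader into a Source implementation.
// CustomFileSource is hypothetical; split discovery is out of scope here.
public class CustomFileSource implements Source<String, FileSourceSplit, Void> {

    private final Configuration config;

    public CustomFileSource(Configuration config) {
        this.config = config;
    }

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    @Override
    public SourceReader<String, FileSourceSplit> createReader(SourceReaderContext readerContext) {
        // This is where the framework-based reader plugs in
        return new CustomFileSourceReader(config, readerContext);
    }

    @Override
    public SplitEnumerator<FileSourceSplit, Void> createEnumerator(
            SplitEnumeratorContext<FileSourceSplit> enumContext) {
        throw new UnsupportedOperationException("Enumerator omitted in this sketch");
    }

    @Override
    public SplitEnumerator<FileSourceSplit, Void> restoreEnumerator(
            SplitEnumeratorContext<FileSourceSplit> enumContext, Void checkpoint) {
        throw new UnsupportedOperationException("Enumerator omitted in this sketch");
    }

    @Override
    public SimpleVersionedSerializer<FileSourceSplit> getSplitSerializer() {
        return new FileSourceSplitSerializer(); // see the serializer sketch later in this section
    }

    @Override
    public SimpleVersionedSerializer<Void> getEnumeratorCheckpointSerializer() {
        throw new UnsupportedOperationException("Checkpoint serializer omitted in this sketch");
    }
}
```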

### SplitReader Implementation

```java
public class FileSystemSplitReader implements SplitReader<FileRecord, FileSourceSplit> {

    private static final Logger LOG = LoggerFactory.getLogger(FileSystemSplitReader.class);

    private final Configuration config;
    private final Map<String, FileReaderState> splitReaders;
    private final Set<String> pausedSplits;

    public FileSystemSplitReader(Configuration config) {
        this.config = config;
        this.splitReaders = new HashMap<>();
        this.pausedSplits = new HashSet<>();
    }

    @Override
    public RecordsWithSplitIds<FileRecord> fetch() throws IOException {
        Map<String, Collection<FileRecord>> recordsBySplit = new HashMap<>();
        Set<String> finishedSplits = new HashSet<>();

        for (Map.Entry<String, FileReaderState> entry : splitReaders.entrySet()) {
            String splitId = entry.getKey();

            // Skip paused splits
            if (pausedSplits.contains(splitId)) {
                continue;
            }

            FileReaderState readerState = entry.getValue();
            List<FileRecord> records = new ArrayList<>();

            try {
                // Read a batch of up to 100 records from this split
                for (int i = 0; i < 100 && readerState.hasMore(); i++) {
                    String line = readerState.readLine();
                    if (line != null) {
                        records.add(new FileRecord(
                            splitId,
                            line,
                            readerState.getCurrentOffset(),
                            System.currentTimeMillis()
                        ));
                    } else {
                        // End of split reached
                        finishedSplits.add(splitId);
                        break;
                    }
                }

                if (!records.isEmpty()) {
                    recordsBySplit.put(splitId, records);
                }
            } catch (IOException e) {
                LOG.error("Error reading from split {}", splitId, e);
                throw e;
            }
        }

        // Clean up finished splits
        for (String finishedSplit : finishedSplits) {
            FileReaderState readerState = splitReaders.remove(finishedSplit);
            if (readerState != null) {
                readerState.close();
            }
        }

        return new RecordsBySplits<>(recordsBySplit, finishedSplits);
    }

    @Override
    public void handleSplitsChanges(SplitsChange<FileSourceSplit> splitsChanges) {
        if (splitsChanges instanceof SplitsAddition) {
            SplitsAddition<FileSourceSplit> addition = (SplitsAddition<FileSourceSplit>) splitsChanges;
            for (FileSourceSplit split : addition.splits()) {
                try {
                    FileReaderState readerState = new FileReaderState(
                        split.path(),
                        split.offset(),
                        split.length()
                    );
                    splitReaders.put(split.splitId(), readerState);
                    LOG.info("Added split {} for file {}", split.splitId(), split.path());
                } catch (IOException e) {
                    LOG.error("Failed to open split {} for file {}", split.splitId(), split.path(), e);
                }
            }
        } else if (splitsChanges instanceof SplitsRemoval) {
            // SplitsRemoval is only available in newer Flink versions
            SplitsRemoval<FileSourceSplit> removal = (SplitsRemoval<FileSourceSplit>) splitsChanges;
            for (FileSourceSplit split : removal.splits()) {
                FileReaderState readerState = splitReaders.remove(split.splitId());
                if (readerState != null) {
                    readerState.close();
                    LOG.info("Removed split {}", split.splitId());
                }
            }
        }
    }

    @Override
    public void wakeUp() {
        // No-op: fetch() never blocks in this reader (see the wake-up sketch below)
    }

    @Override
    public void pauseOrResumeSplits(
            Collection<FileSourceSplit> splitsToPause,
            Collection<FileSourceSplit> splitsToResume) {

        // Pause splits
        for (FileSourceSplit split : splitsToPause) {
            pausedSplits.add(split.splitId());
            LOG.info("Paused split {}", split.splitId());
        }

        // Resume splits
        for (FileSourceSplit split : splitsToResume) {
            pausedSplits.remove(split.splitId());
            LOG.info("Resumed split {}", split.splitId());
        }
    }

    @Override
    public void close() throws Exception {
        // Close all split readers
        for (FileReaderState readerState : splitReaders.values()) {
            readerState.close();
        }
        splitReaders.clear();
    }
}

// Helper class for managing file reading state
public class FileReaderState {

    private static final Logger LOG = LoggerFactory.getLogger(FileReaderState.class);

    private final Path path;
    private final long startOffset;
    private final long length;
    private final BufferedReader reader;
    private long currentOffset;

    public FileReaderState(Path path, long startOffset, long length) throws IOException {
        this.path = path;
        this.startOffset = startOffset;
        this.length = length;
        this.currentOffset = startOffset;

        // Open the file and skip to the start offset
        // (skip() may skip fewer bytes than requested; a robust reader would loop)
        FileInputStream fis = new FileInputStream(path.toFile());
        fis.skip(startOffset);
        this.reader = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8));
    }

    public String readLine() throws IOException {
        if (currentOffset >= startOffset + length) {
            return null; // End of split
        }

        String line = reader.readLine();
        if (line != null) {
            currentOffset += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for newline
        }
        return line;
    }

    public boolean hasMore() {
        return currentOffset < startOffset + length;
    }

    public long getCurrentOffset() {
        return currentOffset;
    }

    public void close() {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                LOG.warn("Error closing file reader for {}", path, e);
            }
        }
    }
}
```
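
The `wakeUp()` above can stay a no-op because this `fetch()` never blocks. For a split reader that does block (polling a queue or a network client, say), a common pattern is a volatile flag checked around an interruptible wait. A minimal sketch under that assumption; the queue is a stand-in for whatever blocking source the reader polls:

```java
// A minimal sketch of a wakeable fetch loop.
public class WakeableSplitReader implements SplitReader<FileRecord, FileSourceSplit> {

    private final BlockingQueue<FileRecord> queue = new LinkedBlockingQueue<>();
    private volatile boolean wakenUp;

    @Override
    public RecordsWithSplitIds<FileRecord> fetch() throws IOException {
        Map<String, Collection<FileRecord>> recordsBySplit = new HashMap<>();
        try {
            // Wait in short intervals so a wake-up is observed quickly
            while (!wakenUp) {
                FileRecord record = queue.poll(100, TimeUnit.MILLISECONDS);
                if (record == null) {
                    continue;
                }
                recordsBySplit
                        .computeIfAbsent(record.getSplitId(), k -> new ArrayList<>())
                        .add(record);
                break; // return the batch as soon as there is data
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            wakenUp = false;
        }
        return new RecordsBySplits<>(recordsBySplit, Collections.emptySet());
    }

    @Override
    public void wakeUp() {
        wakenUp = true; // makes a blocked fetch() return promptly with an empty batch
    }

    @Override
    public void handleSplitsChanges(SplitsChange<FileSourceSplit> splitsChanges) {}

    @Override
    public void pauseOrResumeSplits(
            Collection<FileSourceSplit> splitsToPause, Collection<FileSourceSplit> splitsToResume) {}

    @Override
    public void close() throws Exception {}
}
```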

### RecordEmitter Implementation

```java
public class FileRecordEmitter implements RecordEmitter<FileRecord, String, FileSourceSplitState> {

    @Override
    public void emitRecord(
            FileRecord element,
            SourceOutput<String> output,
            FileSourceSplitState splitState) throws Exception {

        // Update split state with the current offset
        splitState.setOffset(element.getOffset());

        // Extract the actual data and emit
        String record = element.getData();

        // Emit with timestamp if available
        if (element.getTimestamp() > 0) {
            output.collect(record, element.getTimestamp());
        } else {
            output.collect(record);
        }
    }
}

// Record type for file data
public class FileRecord {
    private final String splitId;
    private final String data;
    private final long offset;
    private final long timestamp;

    public FileRecord(String splitId, String data, long offset, long timestamp) {
        this.splitId = splitId;
        this.data = data;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    public String getSplitId() { return splitId; }
    public String getData() { return data; }
    public long getOffset() { return offset; }
    public long getTimestamp() { return timestamp; }
}
```
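
Since the splits returned by `snapshotState(long)` are checkpointed, the enclosing `Source` must also provide a split serializer. A minimal sketch of a `SimpleVersionedSerializer` for the `FileSourceSplit` defined above, assuming `Path` is `java.nio.file.Path`:

```java
// A minimal sketch: versioned serialization for FileSourceSplit,
// as required by Source#getSplitSerializer for checkpointing.
public class FileSourceSplitSerializer implements SimpleVersionedSerializer<FileSourceSplit> {

    private static final int VERSION = 1;

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public byte[] serialize(FileSourceSplit split) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeUTF(split.splitId());
            out.writeUTF(split.path().toString());
            out.writeLong(split.offset());
            out.writeLong(split.length());
        }
        return baos.toByteArray();
    }

    @Override
    public FileSourceSplit deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unknown version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            String splitId = in.readUTF();
            Path path = Paths.get(in.readUTF());
            long offset = in.readLong();
            long length = in.readLong();
            return new FileSourceSplit(splitId, path, offset, length);
        }
    }
}
```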

### Advanced Source Reader with Watermarks

```java
public class WatermarkingFileSourceReader extends SingleThreadMultiplexSourceReaderBase<
        FileRecord, String, FileSourceSplit, FileSourceSplitState> {

    public WatermarkingFileSourceReader(
            Configuration config,
            SourceReaderContext context,
            WatermarkStrategy<String> watermarkStrategy) {
        super(
            () -> new FileSystemSplitReader(config),
            new WatermarkingFileRecordEmitter(watermarkStrategy),
            config,
            context
        );
    }

    // ... onSplitFinished, initializedState, toSplitType as in CustomFileSourceReader ...
}

public class WatermarkingFileRecordEmitter
        implements RecordEmitter<FileRecord, String, FileSourceSplitState> {

    private final WatermarkStrategy<String> watermarkStrategy;
    private final Map<String, WatermarkGenerator<String>> watermarkGenerators;
    private final TimestampAssigner<String> timestampAssigner;

    public WatermarkingFileRecordEmitter(WatermarkStrategy<String> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
        this.watermarkGenerators = new HashMap<>();
        // Timestamp assigners are stateless, so one instance serves all splits
        this.timestampAssigner = watermarkStrategy.createTimestampAssigner(
            new TimestampAssignerSupplier.Context() {
                @Override
                public MetricGroup getMetricGroup() {
                    return new UnregisteredMetricsGroup();
                }
            });
    }

    @Override
    public void emitRecord(
            FileRecord element,
            SourceOutput<String> output,
            FileSourceSplitState splitState) throws Exception {

        String splitId = element.getSplitId();
        String record = element.getData();

        // Update split state
        splitState.setOffset(element.getOffset());

        // Get or create a watermark generator for this split
        WatermarkGenerator<String> watermarkGenerator = watermarkGenerators.computeIfAbsent(
            splitId,
            k -> watermarkStrategy.createWatermarkGenerator(
                new WatermarkGeneratorSupplier.Context() {
                    @Override
                    public MetricGroup getMetricGroup() {
                        return new UnregisteredMetricsGroup();
                    }
                }));

        // Assign a timestamp, using the record's read time as the fallback
        long timestamp = timestampAssigner.extractTimestamp(record, element.getTimestamp());

        // Feed the watermark generator; SourceOutput is itself a WatermarkOutput,
        // so watermarks can be emitted straight to it
        watermarkGenerator.onEvent(record, timestamp, output);

        // Emit the record with its timestamp
        output.collect(record, timestamp);
    }
}
```
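
Note that when a source is attached via `StreamExecutionEnvironment.fromSource`, Flink already applies the `WatermarkStrategy` in the source operator, so per-record watermark handling inside the emitter is only needed for custom per-split behavior. A minimal usage sketch, reusing the hypothetical `CustomFileSource` from earlier:

```java
// A minimal usage sketch: the usual way to attach timestamps and
// watermarks is via fromSource, without custom emitter logic.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> lines = env.fromSource(
        new CustomFileSource(new Configuration()),
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)),
        "custom-file-source");
```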

## Configuration and Options

### SourceReaderOptions

```java
// Options defined by SourceReaderOptions in flink-connector-base
public static final ConfigOption<Integer> ELEMENT_QUEUE_CAPACITY =
    ConfigOptions.key("source.reader.element.queue.capacity")
        .intType()
        .defaultValue(2)
        .withDescription("The capacity of the element queue in the source reader.");

public static final ConfigOption<Long> SOURCE_READER_CLOSE_TIMEOUT =
    ConfigOptions.key("source.reader.close.timeout")
        .longType()
        .defaultValue(30000L)
        .withDescription("The timeout (in ms) for closing the source reader.");

// Usage in a source reader
public class ConfigurableFileSourceReader extends SingleThreadMultiplexSourceReaderBase<
        FileRecord, String, FileSourceSplit, FileSourceSplitState> {

    private static final Logger LOG = LoggerFactory.getLogger(ConfigurableFileSourceReader.class);

    public ConfigurableFileSourceReader(
            Configuration config,
            SourceReaderContext context) {
        super(
            () -> new FileSystemSplitReader(config),
            new FileRecordEmitter(),
            config,
            context
        );

        // Access configuration options
        int queueCapacity = config.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY);
        long closeTimeoutMs = config.get(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT);

        LOG.info("Source reader configured with queue capacity: {}, close timeout: {} ms",
            queueCapacity, closeTimeoutMs);
    }
}
```
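
These options can be set on the `Configuration` handed to the reader before it is constructed, for example:

```java
// Setting reader options on the Configuration passed to the reader
Configuration config = new Configuration();
config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 4);
config.set(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 60_000L);
```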

## Best Practices

### Performance Optimization

1. **Batch Size Tuning**
```java
@Override
public RecordsWithSplitIds<FileRecord> fetch() throws IOException {
    // Adjust batch size based on record size and processing speed
    int batchSize = calculateOptimalBatchSize();

    Map<String, Collection<FileRecord>> recordsBySplit = new HashMap<>();

    for (Map.Entry<String, FileReaderState> entry : splitReaders.entrySet()) {
        FileReaderState readerState = entry.getValue();
        List<FileRecord> records = new ArrayList<>();

        // Read up to the optimal batch size from each split
        for (int i = 0; i < batchSize && readerState.hasMore(); i++) {
            // ... reading logic as in FileSystemSplitReader.fetch() ...
        }

        if (!records.isEmpty()) {
            recordsBySplit.put(entry.getKey(), records);
        }
    }

    return new RecordsBySplits<>(recordsBySplit, Collections.emptySet());
}

private int calculateOptimalBatchSize() {
    // Consider factors like:
    // - Available memory
    // - Record processing time
    // - Network latency
    // - Split characteristics
    return Math.min(1000, Runtime.getRuntime().availableProcessors() * 100);
}
```

2. **Efficient State Management**
```java
@Override
protected void onSplitFinished(Map<String, FileSourceSplitState> finishedSplitIds) {
    // Efficient cleanup of finished splits
    finishedSplitIds.forEach((splitId, splitState) -> {
        try {
            // Close resources immediately
            splitState.cleanup();

            // Update metrics
            updateSplitFinishedMetrics(splitId);

            // Log completion with important details
            // (assumes the split state also tracks byte and timing metrics)
            LOG.info("Split {} finished: processed {} bytes in {} ms",
                splitId, splitState.getBytesProcessed(), splitState.getProcessingTime());
        } catch (Exception e) {
            LOG.warn("Error cleaning up finished split {}", splitId, e);
        }
    });
}
```

3. **Memory Management**
```java
// Sketches back-pressure based on memory usage; MemoryUsageTracker is a
// hypothetical helper, not Flink's MemoryManager.
public class MemoryAwareFileReader implements SplitReader<FileRecord, FileSourceSplit> {
    private final MemoryUsageTracker memoryTracker;
    private final long maxMemoryUsage;

    @Override
    public RecordsWithSplitIds<FileRecord> fetch() throws IOException {
        // Check memory usage before reading
        if (memoryTracker.getCurrentUsage() > maxMemoryUsage) {
            // Return an empty batch to back off until memory is freed
            return new RecordsBySplits<>(Collections.emptyMap(), Collections.emptySet());
        }

        // Normal fetch logic
        return fetchRecords();
    }

    // handleSplitsChanges, wakeUp, pauseOrResumeSplits, close omitted
}
```

### Error Handling and Resilience

1. **Split-Level Error Isolation**
```java
@Override
public RecordsWithSplitIds<FileRecord> fetch() throws IOException {
    Map<String, Collection<FileRecord>> recordsBySplit = new HashMap<>();
    Set<String> finishedSplits = new HashSet<>();
    List<String> failedSplits = new ArrayList<>();

    for (Map.Entry<String, FileReaderState> entry : splitReaders.entrySet()) {
        String splitId = entry.getKey();

        try {
            // Read from split (readFromSplit wraps the batch-reading logic above)
            List<FileRecord> records = readFromSplit(entry.getValue());
            if (!records.isEmpty()) {
                recordsBySplit.put(splitId, records);
            }
        } catch (IOException e) {
            LOG.error("Error reading from split {}, marking as failed", splitId, e);
            failedSplits.add(splitId);
        }
    }

    // Handle failed splits
    handleFailedSplits(failedSplits);

    return new RecordsBySplits<>(recordsBySplit, finishedSplits);
}

private void handleFailedSplits(List<String> failedSplits) {
    for (String splitId : failedSplits) {
        FileReaderState readerState = splitReaders.remove(splitId);
        if (readerState != null) {
            readerState.close();

            // Optionally retry the split or report the failure upstream
            reportSplitFailure(splitId);
        }
    }
}
```

2. **Graceful Degradation**
```java
public class ResilientFileSourceReader extends SingleThreadMultiplexSourceReaderBase<
        FileRecord, String, FileSourceSplit, FileSourceSplitState> {

    private final AtomicInteger failedSplitCount = new AtomicInteger(0);
    private final int maxFailedSplits;

    // ... constructor takes maxFailedSplits and delegates to super ...

    @Override
    protected void onSplitFinished(Map<String, FileSourceSplitState> finishedSplitIds) {
        // Check if too many splits have failed
        if (failedSplitCount.get() > maxFailedSplits) {
            LOG.warn("Too many splits have failed ({}), source may be degraded",
                failedSplitCount.get());
        }

        // onSplitFinished is abstract in SourceReaderBase, so there is no
        // super implementation to call; perform the normal cleanup here
        finishedSplitIds.values().forEach(FileSourceSplitState::cleanup);
    }
}
```

The Source Reader Framework provides a robust foundation for building source readers with automatic split coordination, efficient resource management, and comprehensive error handling.