Base classes and utilities for building Apache Flink connectors with async sink patterns, source readers, and advanced streaming capabilities
—
The Source Reader Framework provides a sophisticated foundation for building custom Flink source readers with automatic split management, coordination between threads, and comprehensive state handling. It supports both single-threaded and multi-threaded split reading patterns.
The foundation class for all source reader implementations.
// NOTE(review): API outline only — method bodies and trailing semicolons are
// intentionally omitted; this section is not compilable as-is.
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
implements SourceReader<T, SplitT> {
// Constructors
// Wires together the fetcher manager (split/thread coordination), the record
// emitter (element -> output-record conversion), configuration, and context.
public SourceReaderBase(
SplitFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context)
// Variant that also takes an evaluator used to detect end-of-stream records.
public SourceReaderBase(
SplitFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
RecordEvaluator<T> eofRecordEvaluator,
Configuration config,
SourceReaderContext context)
// Public interface methods
public void start()
// Polls the next available record(s) into the output; non-blocking.
public InputStatus pollNext(ReaderOutput<T> output) throws Exception
// Completes when data is available to poll.
public CompletableFuture<Void> isAvailable()
// Returns the current splits (reconstructed via toSplitType) for checkpointing.
public List<SplitT> snapshotState(long checkpointId)
public void addSplits(List<SplitT> splits)
public void notifyNoMoreSplits()
public void handleSourceEvents(SourceEvent sourceEvent)
public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume)
public void close() throws Exception
public int getNumberOfCurrentlyAssignedSplits()
// Abstract methods to implement
// Callback invoked with the state of splits that reached end-of-data.
protected abstract void onSplitFinished(Map<String, SplitStateT> finishedSplitIds)
// Converts an assigned split into its mutable runtime state.
protected abstract SplitStateT initializedState(SplitT split)
// Converts runtime state back into an immutable split for checkpointing.
protected abstract SplitT toSplitType(String splitId, SplitStateT splitState)
}

Specialized reader for sources that use a single thread with one SplitReader.
// NOTE(review): API outline only — signatures without bodies; not compilable as-is.
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
extends SourceReaderBase<E, T, SplitT, SplitStateT> {
// Constructors
// Convenience constructor: builds a SingleThreadFetcherManager around the
// supplied SplitReader factory.
public SingleThreadMultiplexSourceReaderBase(
Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context)
// Variant taking a pre-built single-threaded fetcher manager.
public SingleThreadMultiplexSourceReaderBase(
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context)
// Variant that additionally detects end-of-stream records via the evaluator.
public SingleThreadMultiplexSourceReaderBase(
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
RecordEvaluator<T> eofRecordEvaluator,
Configuration config,
SourceReaderContext context)
}

Interface for reading records from splits.
// Runs on the fetcher thread; all methods are called from that thread except
// wakeUp(), which may be called concurrently.
@PublicEvolving
public interface SplitReader<E, SplitT> extends AutoCloseable {
// Fetches the next batch of records, grouped by split id; may block.
RecordsWithSplitIds<E> fetch() throws IOException
// Applies split additions/removals handed over from the main reader thread.
void handleSplitsChanges(SplitsChange<SplitT> splitsChanges)
// Interrupts a fetch() that is blocked inside this reader.
void wakeUp()
// Temporarily pauses/resumes reading of the given splits (e.g. watermark alignment).
void pauseOrResumeSplits(Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume)
void close() throws Exception
}

Processes records from the split reader and emits them to the output.
@PublicEvolving
public interface RecordEmitter<E, T, SplitStateT> {
// Converts one fetched element into zero or more output records and updates
// the split state (e.g. the read offset) for checkpointing.
void emitRecord(E element, SourceOutput<T> output, SplitStateT splitState) throws Exception
}

Evaluates records to determine if they represent end-of-stream markers.
@PublicEvolving
public interface RecordEvaluator<T> {
// Returns true when the given record marks the end of the stream for its split.
boolean isEndOfStream(T record)
}

/**
 * Example reader that multiplexes file splits on a single fetcher thread.
 * Delegates I/O to {@code FileSystemSplitReader} and record conversion to
 * {@code FileRecordEmitter}.
 */
public class CustomFileSourceReader extends SingleThreadMultiplexSourceReaderBase<
        FileRecord, String, FileSourceSplit, FileSourceSplitState> {

    public CustomFileSourceReader(
            Configuration config,
            SourceReaderContext context) {
        super(
                () -> new FileSystemSplitReader(config), // SplitReader supplier
                new FileRecordEmitter(),                 // record emitter
                config,
                context);
    }

    /** Releases per-split resources once splits have been fully consumed. */
    @Override
    protected void onSplitFinished(Map<String, FileSourceSplitState> finishedSplitIds) {
        finishedSplitIds.forEach((splitId, splitState) -> {
            LOG.info("Split {} finished at position {}", splitId, splitState.getOffset());
            // Close any split-specific resources.
            splitState.cleanup();
        });
    }

    /** Turns an assigned split into its mutable runtime state. */
    @Override
    protected FileSourceSplitState initializedState(FileSourceSplit split) {
        return new FileSourceSplitState(split.path(), split.offset(), split.length());
    }

    /** Turns runtime state back into an immutable split for checkpointing. */
    @Override
    protected FileSourceSplit toSplitType(String splitId, FileSourceSplitState splitState) {
        return new FileSourceSplit(
                splitId,
                splitState.getPath(),
                splitState.getOffset(),
                splitState.getLength());
    }
}
// Split implementation
/**
 * Immutable description of one byte range {@code [offset, offset + length)}
 * of a single file; {@code splitId} uniquely identifies the split.
 */
public class FileSourceSplit implements SourceSplit {

    private final String splitId;
    private final Path path;
    private final long offset;
    private final long length;

    public FileSourceSplit(String splitId, Path path, long offset, long length) {
        this.splitId = splitId;
        this.path = path;
        this.offset = offset;
        this.length = length;
    }

    @Override
    public String splitId() {
        return splitId;
    }

    public Path path() {
        return path;
    }

    public long offset() {
        return offset;
    }

    public long length() {
        return length;
    }
}
// Split state implementation
// Mutable per-split runtime state. The offset is advanced by the RecordEmitter
// after each record and read back when the split is snapshotted.
public class FileSourceSplitState {
private Path path;
private long offset;
private long length;
// Optional reader attached via setReader(); closed by cleanup(). May be null.
private BufferedReader reader;
public FileSourceSplitState(Path path, long offset, long length) {
this.path = path;
this.offset = offset;
this.length = length;
}
public Path getPath() { return path; }
public long getOffset() { return offset; }
public void setOffset(long offset) { this.offset = offset; }
public long getLength() { return length; }
public BufferedReader getReader() { return reader; }
public void setReader(BufferedReader reader) { this.reader = reader; }
// Best-effort close of the attached reader; a failure is logged, not thrown.
public void cleanup() {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
LOG.warn("Failed to close reader for split", e);
}
}
}
}

/**
 * SplitReader that reads line-delimited files, multiplexing all assigned
 * splits on the single fetcher thread. Paused splits are skipped during
 * fetch; finished splits are closed and reported via RecordsWithSplitIds.
 */
public class FileSystemSplitReader implements SplitReader<FileRecord, FileSourceSplit> {

    /** Upper bound on records read from one split per fetch() call (was an inline magic 100). */
    private static final int MAX_RECORDS_PER_SPLIT_PER_FETCH = 100;

    private final Configuration config;
    private final Map<String, FileReaderState> splitReaders;
    private final Set<String> pausedSplits;

    public FileSystemSplitReader(Configuration config) {
        this.config = config;
        this.splitReaders = new HashMap<>();
        this.pausedSplits = new HashSet<>();
    }

    /**
     * Reads up to {@link #MAX_RECORDS_PER_SPLIT_PER_FETCH} records from each
     * non-paused split, closing and reporting splits that hit end-of-data.
     */
    @Override
    public RecordsWithSplitIds<FileRecord> fetch() throws IOException {
        Map<String, Collection<FileRecord>> recordsBySplit = new HashMap<>();
        Set<String> finishedSplits = new HashSet<>();
        for (Map.Entry<String, FileReaderState> entry : splitReaders.entrySet()) {
            String splitId = entry.getKey();
            // Skip paused splits (e.g. paused for watermark alignment).
            if (pausedSplits.contains(splitId)) {
                continue;
            }
            FileReaderState readerState = entry.getValue();
            List<FileRecord> records = new ArrayList<>();
            try {
                // Read one bounded batch of records from this split.
                for (int i = 0; i < MAX_RECORDS_PER_SPLIT_PER_FETCH && readerState.hasMore(); i++) {
                    String line = readerState.readLine();
                    if (line != null) {
                        records.add(new FileRecord(
                                splitId,
                                line,
                                readerState.getCurrentOffset(),
                                System.currentTimeMillis()));
                    } else {
                        // End of split reached.
                        finishedSplits.add(splitId);
                        break;
                    }
                }
                if (!records.isEmpty()) {
                    recordsBySplit.put(splitId, records);
                }
            } catch (IOException e) {
                LOG.error("Error reading from split {}", splitId, e);
                throw e;
            }
        }
        // Release resources of splits that reached end-of-data this round.
        // (Removal happens after the iteration, so the map is not mutated mid-loop.)
        for (String finishedSplit : finishedSplits) {
            FileReaderState readerState = splitReaders.remove(finishedSplit);
            if (readerState != null) {
                readerState.close();
            }
        }
        return RecordsBySplits.forRecords(recordsBySplit, finishedSplits);
    }

    /** Opens readers for added splits and closes readers for removed splits. */
    @Override
    public void handleSplitsChanges(SplitsChange<FileSourceSplit> splitsChanges) {
        if (splitsChanges instanceof SplitsAddition) {
            SplitsAddition<FileSourceSplit> addition = (SplitsAddition<FileSourceSplit>) splitsChanges;
            for (FileSourceSplit split : addition.splits()) {
                try {
                    FileReaderState readerState = new FileReaderState(
                            split.path(),
                            split.offset(),
                            split.length());
                    splitReaders.put(split.splitId(), readerState);
                    LOG.info("Added split {} for file {}", split.splitId(), split.path());
                } catch (IOException e) {
                    // FIX: the original logged and swallowed this exception, silently
                    // dropping an assigned split (data loss). Fail fast instead so the
                    // runtime can recover/restart the reader.
                    throw new RuntimeException(
                            "Failed to open split " + split.splitId() + " for file " + split.path(), e);
                }
            }
        } else if (splitsChanges instanceof SplitsRemoval) {
            SplitsRemoval<FileSourceSplit> removal = (SplitsRemoval<FileSourceSplit>) splitsChanges;
            for (String splitId : removal.splitIds()) {
                FileReaderState readerState = splitReaders.remove(splitId);
                if (readerState != null) {
                    readerState.close();
                    LOG.info("Removed split {}", splitId);
                }
            }
        }
    }

    @Override
    public void wakeUp() {
        // No blocking reads in this implementation; nothing to interrupt.
    }

    /** Records pause/resume requests; paused splits are skipped in fetch(). */
    @Override
    public void pauseOrResumeSplits(
            Collection<FileSourceSplit> splitsToPause,
            Collection<FileSourceSplit> splitsToResume) {
        for (FileSourceSplit split : splitsToPause) {
            pausedSplits.add(split.splitId());
            LOG.info("Paused split {}", split.splitId());
        }
        for (FileSourceSplit split : splitsToResume) {
            pausedSplits.remove(split.splitId());
            LOG.info("Resumed split {}", split.splitId());
        }
    }

    /** Closes all open split readers. */
    @Override
    public void close() throws Exception {
        for (FileReaderState readerState : splitReaders.values()) {
            readerState.close();
        }
        splitReaders.clear();
    }
}
// Helper class for managing file reading state
/**
 * Tracks the open reader and current byte offset for one file split.
 * The reader is positioned at {@code startOffset} on construction and
 * {@code readLine()} returns null once the split's byte range is exhausted.
 */
public class FileReaderState {
    private final Path path;
    private final long startOffset;
    private final long length;
    private BufferedReader reader;
    private long currentOffset;

    public FileReaderState(Path path, long startOffset, long length) throws IOException {
        this.path = path;
        this.startOffset = startOffset;
        this.length = length;
        this.currentOffset = startOffset;
        FileInputStream fis = new FileInputStream(path.toFile());
        try {
            // FIX: InputStream.skip() may skip fewer bytes than requested and its
            // return value was ignored in the original — loop until the full start
            // offset has been consumed.
            long remaining = startOffset;
            while (remaining > 0) {
                long skipped = fis.skip(remaining);
                if (skipped <= 0) {
                    throw new EOFException(
                            "File " + path + " is shorter than split start offset " + startOffset);
                }
                remaining -= skipped;
            }
            this.reader = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8));
        } catch (IOException e) {
            // FIX: do not leak the stream if positioning fails.
            fis.close();
            throw e;
        }
    }

    /**
     * Returns the next line, or null once the split's byte range is exhausted.
     * Advances currentOffset by the line's UTF-8 byte length plus one.
     */
    public String readLine() throws IOException {
        if (currentOffset >= startOffset + length) {
            return null; // end of split range
        }
        String line = reader.readLine();
        if (line != null) {
            // NOTE(review): assumes a single-byte '\n' terminator; offsets will drift
            // on "\r\n" files — TODO confirm the expected line-ending convention.
            currentOffset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return line;
    }

    /** True while the current offset is still inside the split's byte range. */
    public boolean hasMore() {
        return currentOffset < startOffset + length;
    }

    public long getCurrentOffset() {
        return currentOffset;
    }

    /** Best-effort close; a failure is logged, not thrown. */
    public void close() {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                LOG.warn("Error closing file reader for {}", path, e);
            }
        }
    }
}

/**
 * Emits the payload of each FileRecord to the output and advances the
 * split's offset so checkpoints resume behind the emitted record.
 */
public class FileRecordEmitter implements RecordEmitter<FileRecord, String, FileSourceSplitState> {

    @Override
    public void emitRecord(
            FileRecord element,
            SourceOutput<String> output,
            FileSourceSplitState splitState) throws Exception {
        // Advance the split offset first so state and emission stay consistent.
        splitState.setOffset(element.getOffset());
        String data = element.getData();
        long timestamp = element.getTimestamp();
        // Attach the event timestamp when one is present.
        if (timestamp > 0) {
            output.collect(data, timestamp);
        } else {
            output.collect(data);
        }
    }
}
// Record type for file data
// Immutable element passed from the SplitReader to the RecordEmitter:
// one line of text plus its originating split, byte offset, and fetch time.
public class FileRecord {
private final String splitId;
private final String data;
private final long offset;
// Wall-clock time at fetch, in epoch millis; 0 or negative means "no timestamp".
private final long timestamp;
public FileRecord(String splitId, String data, long offset, long timestamp) {
this.splitId = splitId;
this.data = data;
this.offset = offset;
this.timestamp = timestamp;
}
public String getSplitId() { return splitId; }
public String getData() { return data; }
public long getOffset() { return offset; }
public long getTimestamp() { return timestamp; }
}public class WatermarkingFileSourceReader extends SingleThreadMultiplexSourceReaderBase<
FileRecord, String, FileSourceSplit, FileSourceSplitState> {
// NOTE(review): splitWatermarkOutputs is initialized but never used in the code
// shown here — presumably consumed by the elided methods below; verify.
private final WatermarkStrategy<String> watermarkStrategy;
private final Map<String, WatermarkOutput> splitWatermarkOutputs;
// Builds the base reader with a watermark-aware emitter; per-record
// timestamping/watermarking happens in WatermarkingFileRecordEmitter.
public WatermarkingFileSourceReader(
Configuration config,
SourceReaderContext context,
WatermarkStrategy<String> watermarkStrategy) {
super(
() -> new FileSystemSplitReader(config),
new WatermarkingFileRecordEmitter(watermarkStrategy),
config,
context
);
this.watermarkStrategy = watermarkStrategy;
this.splitWatermarkOutputs = new HashMap<>();
}
// ... other methods same as before ...
}
// Emitter that assigns event timestamps and generates watermarks per split,
// caching one generator/assigner pair per split id.
public class WatermarkingFileRecordEmitter
implements RecordEmitter<FileRecord, String, FileSourceSplitState> {
private final WatermarkStrategy<String> watermarkStrategy;
// Per-split caches keyed by split id; entries are created lazily on first record.
private final Map<String, WatermarkGenerator<String>> watermarkGenerators;
private final Map<String, TimestampAssigner<String>> timestampAssigners;
public WatermarkingFileRecordEmitter(WatermarkStrategy<String> watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
this.watermarkGenerators = new HashMap<>();
this.timestampAssigners = new HashMap<>();
}
@Override
public void emitRecord(
FileRecord element,
SourceOutput<String> output,
FileSourceSplitState splitState) throws Exception {
String splitId = element.getSplitId();
String record = element.getData();
// Update split state
splitState.setOffset(element.getOffset());
// Get or create watermark generator for this split.
// NOTE(review): the context here uses TestProcessingTimeService, which looks
// like a test utility — confirm this is appropriate outside of tests.
WatermarkGenerator<String> watermarkGenerator = watermarkGenerators.computeIfAbsent(
splitId,
k -> watermarkStrategy.createWatermarkGenerator(() -> new WatermarkGeneratorSupplier.Context() {
@Override
public MetricGroup getMetricGroup() {
return new UnregisteredMetricsGroup();
}
@Override
public ProcessingTimeService getProcessingTimeService() {
return new TestProcessingTimeService();
}
})
);
// Get or create timestamp assigner
TimestampAssigner<String> timestampAssigner = timestampAssigners.computeIfAbsent(
splitId,
k -> watermarkStrategy.createTimestampAssigner(() -> new TimestampAssignerSupplier.Context() {
@Override
public MetricGroup getMetricGroup() {
return new UnregisteredMetricsGroup();
}
@Override
public ProcessingTimeService getProcessingTimeService() {
return new TestProcessingTimeService();
}
})
);
// Assign timestamp (falls back to the record's fetch-time timestamp).
long timestamp = timestampAssigner.extractTimestamp(record, element.getTimestamp());
// Update watermark generator.
// NOTE(review): a fresh WatermarkOutput is allocated for every record; consider
// caching one per split if allocation pressure matters.
watermarkGenerator.onEvent(record, timestamp, new WatermarkOutput() {
@Override
public void emitWatermark(Watermark watermark) {
output.emitWatermark(watermark);
}
@Override
public void markIdle() {
output.markIdle();
}
@Override
public void markActive() {
output.markActive();
}
});
// Emit record with timestamp
output.collect(record, timestamp);
}
}

// Available configuration options
// NOTE(review): fragment — these constants are shown standalone here; in Flink
// they live on SourceReaderOptions (see usage below). Verify before copying.
// Bounds the handover queue between fetcher thread and main reader thread.
public static final ConfigOption<Integer> ELEMENT_QUEUE_CAPACITY =
ConfigOptions.key("source.reader.element-queue-capacity")
.intType()
.defaultValue(1000)
.withDescription("The capacity of the element queue in the source reader.");
// How long close() waits for the fetcher to shut down.
public static final ConfigOption<Duration> SOURCE_READER_CLOSE_TIMEOUT =
ConfigOptions.key("source.reader.close-timeout")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription("The timeout for closing the source reader.");
// Usage in source reader
// Demonstrates reading the framework's configuration options after the base
// class has been initialized.
public class ConfigurableFileSourceReader extends SingleThreadMultiplexSourceReaderBase<
FileRecord, String, FileSourceSplit, FileSourceSplitState> {
public ConfigurableFileSourceReader(
Configuration config,
SourceReaderContext context) {
super(
() -> new FileSystemSplitReader(config),
new FileRecordEmitter(),
config,
context
);
// Access configuration options (defaults: capacity 1000, timeout 30s).
int queueCapacity = config.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY);
Duration closeTimeout = config.get(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT);
LOG.info("Source reader configured with queue capacity: {}, close timeout: {}",
queueCapacity, closeTimeout);
}
}

/**
 * Variant of fetch() that sizes each per-split batch dynamically via
 * calculateOptimalBatchSize() instead of a fixed constant.
 */
@Override
public RecordsWithSplitIds<FileRecord> fetch() throws IOException {
    // Adjust batch size based on record size and processing speed.
    int batchSize = calculateOptimalBatchSize();
    Map<String, Collection<FileRecord>> recordsBySplit = new HashMap<>();
    for (Map.Entry<String, FileReaderState> entry : splitReaders.entrySet()) {
        // FIX: readerState was referenced but never declared in the original snippet.
        FileReaderState readerState = entry.getValue();
        List<FileRecord> records = new ArrayList<>();
        // Read up to the computed batch size from each split.
        for (int i = 0; i < batchSize && readerState.hasMore(); i++) {
            // ... reading logic
        }
        if (!records.isEmpty()) {
            recordsBySplit.put(entry.getKey(), records);
        }
    }
    // FIX: the original snippet was missing a return statement.
    return RecordsBySplits.forRecords(recordsBySplit, Collections.emptySet());
}
// Heuristic batch sizing: 100 records per available core, capped at 1000.
private int calculateOptimalBatchSize() {
// Consider factors like:
// - Available memory
// - Record processing time
// - Network latency
// - Split characteristics
return Math.min(1000, Runtime.getRuntime().availableProcessors() * 100);
}@Override
protected void onSplitFinished(Map<String, FileSourceSplitState> finishedSplitIds) {
// Efficient cleanup of finished splits.
// NOTE(review): getBytesProcessed()/getProcessingTime() and
// updateSplitFinishedMetrics(...) are not defined in the FileSourceSplitState
// shown earlier in this document — presumably an extended variant; verify.
finishedSplitIds.forEach((splitId, splitState) -> {
try {
// Close resources immediately
splitState.cleanup();
// Update metrics
updateSplitFinishedMetrics(splitId);
// Log completion with important details
LOG.info("Split {} finished: processed {} bytes in {} ms",
splitId, splitState.getBytesProcessed(), splitState.getProcessingTime());
} catch (Exception e) {
LOG.warn("Error cleaning up finished split {}", splitId, e);
}
});
}

/**
 * SplitReader that backs off when memory pressure is high.
 * NOTE(review): fragment — the remaining SplitReader methods
 * (handleSplitsChanges, wakeUp, pauseOrResumeSplits, close) and the
 * constructor are elided here.
 */
public class MemoryAwareFileReader implements SplitReader<FileRecord, FileSourceSplit> {
    private final MemoryManager memoryManager;
    private final long maxMemoryUsage;

    @Override
    public RecordsWithSplitIds<FileRecord> fetch() throws IOException {
        // Check memory usage before reading; returning an empty batch lets the
        // fetcher loop re-check on its next invocation.
        if (memoryManager.getCurrentUsage() > maxMemoryUsage) {
            // FIX: pass the (empty) finished-splits set too, matching the
            // two-argument forRecords(...) used everywhere else in this document.
            return RecordsBySplits.forRecords(Collections.emptyMap(), Collections.emptySet());
        }
        // Normal fetch logic (delegated to an elided helper).
        return fetchRecords();
    }
}@Override
public RecordsWithSplitIds<FileRecord> fetch() throws IOException {
Map<String, Collection<FileRecord>> recordsBySplit = new HashMap<>();
// NOTE(review): finishedSplits is never populated in this variant — end-of-data
// detection is presumably inside the elided readFromSplit(...); verify.
Set<String> finishedSplits = new HashSet<>();
List<String> failedSplits = new ArrayList<>();
for (Map.Entry<String, FileReaderState> entry : splitReaders.entrySet()) {
String splitId = entry.getKey();
try {
// Read from split (helper elided in this fragment)
List<FileRecord> records = readFromSplit(entry.getValue());
if (!records.isEmpty()) {
recordsBySplit.put(splitId, records);
}
} catch (IOException e) {
// Unlike the baseline fetch(), a per-split failure does not abort the
// whole fetch; the split is collected and handled below.
LOG.error("Error reading from split {}, marking as failed", splitId, e);
failedSplits.add(splitId);
}
}
// Handle failed splits
handleFailedSplits(failedSplits);
return RecordsBySplits.forRecords(recordsBySplit, finishedSplits);
}
// Closes readers of failed splits and reports them via an elided helper;
// splits removed here will not be retried by this reader instance.
private void handleFailedSplits(List<String> failedSplits) {
for (String splitId : failedSplits) {
FileReaderState readerState = splitReaders.remove(splitId);
if (readerState != null) {
readerState.close();
// Optionally retry split or report failure (helper elided in this fragment)
reportSplitFailure(splitId);
}
}
}

/**
 * Reader that tracks failed splits and warns when the source is degraded.
 * NOTE(review): fragment — failedSplitCount is expected to be incremented by
 * error-handling code elided here, and maxFailedSplits is set by an elided
 * constructor; initializedState/toSplitType are also elided.
 */
public class ResilientFileSourceReader extends SingleThreadMultiplexSourceReaderBase<
        FileRecord, String, FileSourceSplit, FileSourceSplitState> {
    private final AtomicInteger failedSplitCount = new AtomicInteger(0);
    private final int maxFailedSplits;

    @Override
    protected void onSplitFinished(Map<String, FileSourceSplitState> finishedSplitIds) {
        // Check if too many splits have failed.
        if (failedSplitCount.get() > maxFailedSplits) {
            LOG.warn("Too many splits have failed ({}), source may be degraded",
                    failedSplitCount.get());
        }
        // FIX: the original called super.onSplitFinished(...), but that method is
        // abstract on SourceReaderBase — a direct super call to an abstract method
        // does not compile. Perform the per-split cleanup here instead.
        finishedSplitIds.values().forEach(FileSourceSplitState::cleanup);
    }
}

The Source Reader Framework provides a robust foundation for building sophisticated source readers with automatic coordination, efficient resource management, and comprehensive error handling.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-base