CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-connector-base

Base classes and utilities for building Apache Flink connectors with async sink patterns, source readers, and advanced streaming capabilities

Pending
Overview
Eval results
Files

docs/source-reader.md

Source Reader Framework

The Source Reader Framework provides a sophisticated foundation for building custom Flink source readers with automatic split management, coordination between threads, and comprehensive state handling. It supports both single-threaded and multi-threaded split reading patterns.

Core Components

SourceReaderBase

The foundation class for all source reader implementations.

// API summary (signatures only — method bodies and trailing semicolons are
// elided in this listing). Type parameters:
//   E           - raw element type produced by the SplitReader
//   T           - record type emitted to the downstream output
//   SplitT      - split type handed out by the enumerator
//   SplitStateT - mutable per-split state tracked between checkpoints
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {
    
    // Constructors
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context)
            
    // Variant that additionally takes an eofRecordEvaluator; per the
    // RecordEvaluator contract below, records it flags end their split early.
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context)
    
    // Public interface methods
    public void start()
    public InputStatus pollNext(ReaderOutput<T> output) throws Exception
    public CompletableFuture<Void> isAvailable()
    public List<SplitT> snapshotState(long checkpointId)
    public void addSplits(List<SplitT> splits)
    public void notifyNoMoreSplits()
    public void handleSourceEvents(SourceEvent sourceEvent)
    public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume)
    public void close() throws Exception
    public int getNumberOfCurrentlyAssignedSplits()
    
    // Abstract methods to implement
    // Hook invoked when splits complete; receives splitId -> final split state.
    protected abstract void onSplitFinished(Map<String, SplitStateT> finishedSplitIds)
    // Creates the initial mutable state for a newly assigned split.
    protected abstract SplitStateT initializedState(SplitT split)
    // Converts mutable state back into an immutable split for checkpointing.
    protected abstract SplitT toSplitType(String splitId, SplitStateT splitState)
}

SingleThreadMultiplexSourceReaderBase

Specialized reader for sources that use a single thread with one SplitReader.

// API summary (signatures only — bodies and semicolons elided). Convenience
// base for sources where a single fetcher thread runs one SplitReader that
// multiplexes all assigned splits.
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {
    
    // Constructors
    // Simplest form: the supplier creates the single SplitReader instance.
    public SingleThreadMultiplexSourceReaderBase(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context)
            
    // Variant taking an explicit single-threaded fetcher manager.
    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context)
            
    // Variant that also accepts an end-of-stream RecordEvaluator.
    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context)
}

SplitReader

Interface for reading records from splits.

@PublicEvolving
public interface SplitReader<E, SplitT> extends AutoCloseable {
    RecordsWithSplitIds<E> fetch() throws IOException
    void handleSplitsChanges(SplitsChange<SplitT> splitsChanges)
    void wakeUp()
    void pauseOrResumeSplits(Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume)
    void close() throws Exception
}

RecordEmitter

Processes records from the split reader and emits them to the output.

@PublicEvolving
public interface RecordEmitter<E, T, SplitStateT> {
    void emitRecord(E element, SourceOutput<T> output, SplitStateT splitState) throws Exception
}

RecordEvaluator

Evaluates records to determine if they represent end-of-stream markers.

@PublicEvolving
public interface RecordEvaluator<T> {
    boolean isEndOfStream(T record)
}

Implementation Examples

Complete File Source Reader

/**
 * Example file source reader built on the single-threaded multiplexing base:
 * one FileSystemSplitReader handles all splits, and this class only supplies
 * the split/state conversions plus the finished-split hook.
 */
public class CustomFileSourceReader extends SingleThreadMultiplexSourceReaderBase<
        FileRecord, String, FileSourceSplit, FileSourceSplitState> {

    public CustomFileSourceReader(
            Configuration config,
            SourceReaderContext context) {
        super(
            () -> new FileSystemSplitReader(config),  // single SplitReader for all splits
            new FileRecordEmitter(),                  // converts FileRecord -> String
            config,
            context
        );
    }

    /** Releases per-split resources once the framework reports splits as done. */
    @Override
    protected void onSplitFinished(Map<String, FileSourceSplitState> finishedSplitIds) {
        // NOTE(review): LOG is assumed to be a class-level SLF4J logger; it is
        // not declared anywhere in this snippet.
        finishedSplitIds.forEach((splitId, state) -> {
            LOG.info("Split {} finished at position {}", splitId, state.getOffset());
            state.cleanup();
        });
    }

    /** Builds the mutable tracking state for a newly assigned split. */
    @Override
    protected FileSourceSplitState initializedState(FileSourceSplit split) {
        return new FileSourceSplitState(split.path(), split.offset(), split.length());
    }

    /** Freezes mutable state back into an immutable split for checkpointing. */
    @Override
    protected FileSourceSplit toSplitType(String splitId, FileSourceSplitState splitState) {
        return new FileSourceSplit(
            splitId,
            splitState.getPath(),
            splitState.getOffset(),
            splitState.getLength());
    }
}

// Split implementation
/**
 * Immutable description of one file byte range [offset, offset + length).
 */
public class FileSourceSplit implements SourceSplit {

    private final String splitId;
    private final Path path;
    private final long offset;
    private final long length;

    public FileSourceSplit(String splitId, Path path, long offset, long length) {
        this.splitId = splitId;
        this.path = path;
        this.offset = offset;
        this.length = length;
    }

    @Override
    public String splitId() {
        return splitId;
    }

    /** File backing this split. */
    public Path path() {
        return path;
    }

    /** First byte of the range covered by this split. */
    public long offset() {
        return offset;
    }

    /** Number of bytes covered by this split. */
    public long length() {
        return length;
    }
}

// Split state implementation
/**
 * Mutable per-split bookkeeping: the current read offset plus an optionally
 * attached reader. Converted back into an immutable FileSourceSplit when the
 * reader snapshots state.
 */
public class FileSourceSplitState {

    private Path path;
    private long offset;            // next byte to read; advanced by the emitter
    private long length;
    private BufferedReader reader;  // lazily attached; may remain null

    public FileSourceSplitState(Path path, long offset, long length) {
        this.path = path;
        this.offset = offset;
        this.length = length;
    }

    public Path getPath() {
        return path;
    }

    public long getOffset() {
        return offset;
    }

    public void setOffset(long offset) {
        this.offset = offset;
    }

    public long getLength() {
        return length;
    }

    public BufferedReader getReader() {
        return reader;
    }

    public void setReader(BufferedReader reader) {
        this.reader = reader;
    }

    /** Best-effort close of the attached reader; failures are logged, not thrown. */
    public void cleanup() {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (IOException e) {
            // NOTE(review): LOG is assumed to be a class-level SLF4J logger; it
            // is not declared in this snippet.
            LOG.warn("Failed to close reader for split", e);
        }
    }
}

SplitReader Implementation

/**
 * Single-threaded SplitReader that multiplexes all assigned file splits: each
 * fetch() iterates over the open splits and reads up to a fixed number of
 * lines from each. All methods except wakeUp() run on the one fetcher thread,
 * so the plain HashMap/HashSet state needs no synchronization.
 *
 * <p>NOTE(review): LOG is assumed to be a class-level SLF4J logger; it is not
 * declared in this snippet.
 */
public class FileSystemSplitReader implements SplitReader<FileRecord, FileSourceSplit> {

    /** Maximum lines read per split in one fetch() call. */
    private static final int RECORDS_PER_SPLIT_PER_FETCH = 100;

    private final Configuration config;
    private final Map<String, FileReaderState> splitReaders;  // splitId -> open reader state
    private final Set<String> pausedSplits;                   // splitIds skipped by fetch()

    public FileSystemSplitReader(Configuration config) {
        this.config = config;
        this.splitReaders = new HashMap<>();
        this.pausedSplits = new HashSet<>();
    }

    @Override
    public RecordsWithSplitIds<FileRecord> fetch() throws IOException {
        Map<String, Collection<FileRecord>> recordsBySplit = new HashMap<>();
        Set<String> finishedSplits = new HashSet<>();

        for (Map.Entry<String, FileReaderState> entry : splitReaders.entrySet()) {
            String splitId = entry.getKey();

            // Paused splits stay open but contribute no records this round.
            if (pausedSplits.contains(splitId)) {
                continue;
            }

            FileReaderState readerState = entry.getValue();
            List<FileRecord> records = new ArrayList<>();

            try {
                // Read one batch of records from this split.
                for (int i = 0; i < RECORDS_PER_SPLIT_PER_FETCH && readerState.hasMore(); i++) {
                    String line = readerState.readLine();
                    if (line != null) {
                        records.add(new FileRecord(
                            splitId,
                            line,
                            readerState.getCurrentOffset(),
                            System.currentTimeMillis()
                        ));
                    } else {
                        // Physical EOF before the split's byte range was exhausted.
                        finishedSplits.add(splitId);
                        break;
                    }
                }

                // BUG FIX: the original only marked a split finished when
                // readLine() returned null *inside* the loop. When the batch
                // loop exits because hasMore() turned false (byte range fully
                // consumed), the split was never reported finished and stayed
                // in splitReaders forever.
                if (!readerState.hasMore()) {
                    finishedSplits.add(splitId);
                }

                if (!records.isEmpty()) {
                    recordsBySplit.put(splitId, records);
                }
            } catch (IOException e) {
                LOG.error("Error reading from split {}", splitId, e);
                throw e;
            }
        }

        // Close and remove finished splits after iteration so we do not mutate
        // splitReaders while iterating over it.
        for (String finishedSplit : finishedSplits) {
            FileReaderState readerState = splitReaders.remove(finishedSplit);
            if (readerState != null) {
                readerState.close();
            }
        }

        return RecordsBySplits.forRecords(recordsBySplit, finishedSplits);
    }

    @Override
    public void handleSplitsChanges(SplitsChange<FileSourceSplit> splitsChanges) {
        if (splitsChanges instanceof SplitsAddition) {
            SplitsAddition<FileSourceSplit> addition = (SplitsAddition<FileSourceSplit>) splitsChanges;
            for (FileSourceSplit split : addition.splits()) {
                try {
                    FileReaderState readerState = new FileReaderState(
                        split.path(),
                        split.offset(),
                        split.length()
                    );
                    splitReaders.put(split.splitId(), readerState);
                    LOG.info("Added split {} for file {}", split.splitId(), split.path());
                } catch (IOException e) {
                    // BUG FIX: the original logged and swallowed this failure,
                    // silently dropping the split and all of its data. Fail
                    // loudly so the framework can surface the error.
                    LOG.error("Failed to open split {} for file {}", split.splitId(), split.path(), e);
                    throw new IllegalStateException(
                        "Failed to open split " + split.splitId(), e);
                }
            }
        } else if (splitsChanges instanceof SplitsRemoval) {
            SplitsRemoval<FileSourceSplit> removal = (SplitsRemoval<FileSourceSplit>) splitsChanges;
            for (String splitId : removal.splitIds()) {
                FileReaderState readerState = splitReaders.remove(splitId);
                if (readerState != null) {
                    readerState.close();
                    LOG.info("Removed split {}", splitId);
                }
            }
        }
    }

    @Override
    public void wakeUp() {
        // No long-blocking reads here, so there is nothing to interrupt.
    }

    @Override
    public void pauseOrResumeSplits(
            Collection<FileSourceSplit> splitsToPause,
            Collection<FileSourceSplit> splitsToResume) {

        for (FileSourceSplit split : splitsToPause) {
            pausedSplits.add(split.splitId());
            LOG.info("Paused split {}", split.splitId());
        }

        for (FileSourceSplit split : splitsToResume) {
            pausedSplits.remove(split.splitId());
            LOG.info("Resumed split {}", split.splitId());
        }
    }

    @Override
    public void close() throws Exception {
        // Release every still-open reader before dropping the map.
        for (FileReaderState readerState : splitReaders.values()) {
            readerState.close();
        }
        splitReaders.clear();
    }
}

// Helper class for managing file reading state
/**
 * Tracks reading of one file split: opens the file, positions at the split's
 * start offset, and serves lines until the split's byte range is exhausted.
 *
 * <p>NOTE(review): LOG is assumed to be a class-level SLF4J logger; it is not
 * declared in this snippet.
 */
public class FileReaderState {

    private final Path path;
    private final long startOffset;  // first byte of the split
    private final long length;       // split size in bytes
    private BufferedReader reader;
    private long currentOffset;      // estimated next-byte position (see readLine)

    public FileReaderState(Path path, long startOffset, long length) throws IOException {
        this.path = path;
        this.startOffset = startOffset;
        this.length = length;
        this.currentOffset = startOffset;

        FileInputStream fis = new FileInputStream(path.toFile());
        try {
            // BUG FIX: the original called fis.skip(startOffset) and ignored
            // the return value, but InputStream.skip may skip fewer bytes than
            // requested, silently starting mid-record. Seeking the underlying
            // channel positions exactly.
            fis.getChannel().position(startOffset);
        } catch (IOException e) {
            fis.close();  // don't leak the file descriptor if seeking fails
            throw e;
        }
        this.reader = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8));
    }

    /**
     * Returns the next line, or null when the split's byte range is exhausted
     * or the underlying file ends.
     */
    public String readLine() throws IOException {
        if (currentOffset >= startOffset + length) {
            return null; // End of split
        }

        String line = reader.readLine();
        if (line != null) {
            // NOTE(review): this assumes a one-byte "\n" terminator; files with
            // "\r\n" line endings drift by one byte per line. Exact accounting
            // would require counting bytes beneath the BufferedReader.
            currentOffset += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for newline
        }
        return line;
    }

    /** True while the estimated offset is still inside the split's range. */
    public boolean hasMore() {
        return currentOffset < startOffset + length;
    }

    public long getCurrentOffset() {
        return currentOffset;
    }

    /** Best-effort close; failures are logged and suppressed. */
    public void close() {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                LOG.warn("Error closing file reader for {}", path, e);
            }
        }
    }
}

RecordEmitter Implementation

/**
 * Turns each FileRecord into its payload String: records the element's offset
 * on the split state, then emits the data with its timestamp when one is set.
 */
public class FileRecordEmitter implements RecordEmitter<FileRecord, String, FileSourceSplitState> {

    @Override
    public void emitRecord(
            FileRecord element,
            SourceOutput<String> output,
            FileSourceSplitState splitState) throws Exception {

        // Track progress: remember how far into the split this element reaches.
        splitState.setOffset(element.getOffset());

        String payload = element.getData();
        long ts = element.getTimestamp();

        if (ts > 0) {
            // Timestamp present — emit as a timestamped record.
            output.collect(payload, ts);
        } else {
            // No usable timestamp — plain emission.
            output.collect(payload);
        }
    }
}

// Record type for file data
/**
 * Immutable value object pairing one line of file data with its origin split,
 * its byte offset, and the wall-clock time it was read.
 */
public class FileRecord {

    private final String splitId;
    private final String data;
    private final long offset;
    private final long timestamp;

    public FileRecord(String splitId, String data, long offset, long timestamp) {
        this.splitId = splitId;
        this.data = data;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    /** Id of the split this record was read from. */
    public String getSplitId() {
        return splitId;
    }

    /** The line content. */
    public String getData() {
        return data;
    }

    /** Byte offset recorded when the line was read. */
    public long getOffset() {
        return offset;
    }

    /** Read time in epoch millis; consumers treat values &lt;= 0 as absent. */
    public long getTimestamp() {
        return timestamp;
    }
}

Advanced Source Reader with Watermarks

public class WatermarkingFileSourceReader extends SingleThreadMultiplexSourceReaderBase<
        FileRecord, String, FileSourceSplit, FileSourceSplitState> {
    
    // NOTE(review): neither field is read by the code shown here —
    // watermarkStrategy is only stored after being forwarded to the emitter,
    // and splitWatermarkOutputs is filled with an empty map and never used.
    private final WatermarkStrategy<String> watermarkStrategy;
    private final Map<String, WatermarkOutput> splitWatermarkOutputs;
    
    public WatermarkingFileSourceReader(
            Configuration config,
            SourceReaderContext context,
            WatermarkStrategy<String> watermarkStrategy) {
        super(
            () -> new FileSystemSplitReader(config),
            // The per-split watermark work happens in this emitter.
            new WatermarkingFileRecordEmitter(watermarkStrategy),
            config,
            context
        );
        this.watermarkStrategy = watermarkStrategy;
        this.splitWatermarkOutputs = new HashMap<>();
    }
    
    // ... other methods same as before ...
}

/**
 * RecordEmitter that, besides emitting each line, lazily maintains one
 * WatermarkGenerator and TimestampAssigner per split (keyed by split id) built
 * from the configured WatermarkStrategy.
 */
public class WatermarkingFileRecordEmitter 
        implements RecordEmitter<FileRecord, String, FileSourceSplitState> {
    
    private final WatermarkStrategy<String> watermarkStrategy;
    // Per-split generators/assigners, created on first record from each split.
    private final Map<String, WatermarkGenerator<String>> watermarkGenerators;
    private final Map<String, TimestampAssigner<String>> timestampAssigners;
    
    public WatermarkingFileRecordEmitter(WatermarkStrategy<String> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
        this.watermarkGenerators = new HashMap<>();
        this.timestampAssigners = new HashMap<>();
    }
    
    @Override
    public void emitRecord(
            FileRecord element, 
            SourceOutput<String> output, 
            FileSourceSplitState splitState) throws Exception {
        
        String splitId = element.getSplitId();
        String record = element.getData();
        
        // Update split state
        splitState.setOffset(element.getOffset());
        
        // Get or create watermark generator for this split.
        // NOTE(review): the supplier contexts below return
        // UnregisteredMetricsGroup and TestProcessingTimeService, which look
        // like test utilities — production code should supply real context
        // objects; verify before copying this pattern.
        WatermarkGenerator<String> watermarkGenerator = watermarkGenerators.computeIfAbsent(
            splitId, 
            k -> watermarkStrategy.createWatermarkGenerator(() -> new WatermarkGeneratorSupplier.Context() {
                @Override
                public MetricGroup getMetricGroup() {
                    return new UnregisteredMetricsGroup();
                }
                
                @Override
                public ProcessingTimeService getProcessingTimeService() {
                    return new TestProcessingTimeService();
                }
            })
        );
        
        // Get or create timestamp assigner
        TimestampAssigner<String> timestampAssigner = timestampAssigners.computeIfAbsent(
            splitId,
            k -> watermarkStrategy.createTimestampAssigner(() -> new TimestampAssignerSupplier.Context() {
                @Override
                public MetricGroup getMetricGroup() {
                    return new UnregisteredMetricsGroup();
                }
                
                @Override
                public ProcessingTimeService getProcessingTimeService() {
                    return new TestProcessingTimeService();
                }
            })
        );
        
        // Assign timestamp (falls back to the record's read time as the
        // pre-existing timestamp argument)
        long timestamp = timestampAssigner.extractTimestamp(record, element.getTimestamp());
        
        // Update watermark generator.
        // NOTE(review): a new anonymous WatermarkOutput is allocated for every
        // record; hoisting one adapter per split would avoid per-record garbage.
        watermarkGenerator.onEvent(record, timestamp, new WatermarkOutput() {
            @Override
            public void emitWatermark(Watermark watermark) {
                output.emitWatermark(watermark);
            }
            
            @Override
            public void markIdle() {
                output.markIdle();
            }
            
            @Override
            public void markActive() {
                output.markActive();
            }
        });
        
        // Emit record with timestamp
        output.collect(record, timestamp);
    }
}

Configuration and Options

SourceReaderOptions

// Available configuration options (declared on SourceReaderOptions — these
// field definitions are shown here for reference).
// Capacity of the element queue between the fetcher and the reader.
public static final ConfigOption<Integer> ELEMENT_QUEUE_CAPACITY = 
    ConfigOptions.key("source.reader.element-queue-capacity")
        .intType()
        .defaultValue(1000)
        .withDescription("The capacity of the element queue in the source reader.");

// Timeout applied when the source reader is closed.
public static final ConfigOption<Duration> SOURCE_READER_CLOSE_TIMEOUT = 
    ConfigOptions.key("source.reader.close-timeout")
        .durationType()
        .defaultValue(Duration.ofSeconds(30))
        .withDescription("The timeout for closing the source reader.");

// Usage in source reader.
// NOTE(review): this snippet elides the abstract methods (onSplitFinished,
// initializedState, toSplitType) that a concrete subclass must implement, and
// assumes a class-level SLF4J logger named LOG that is not declared here.
public class ConfigurableFileSourceReader extends SingleThreadMultiplexSourceReaderBase<
        FileRecord, String, FileSourceSplit, FileSourceSplitState> {
    
    public ConfigurableFileSourceReader(
            Configuration config,
            SourceReaderContext context) {
        super(
            () -> new FileSystemSplitReader(config),
            new FileRecordEmitter(),
            config,
            context
        );
        
        // Access configuration options (defaults: 1000 and 30s respectively)
        int queueCapacity = config.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY);
        Duration closeTimeout = config.get(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT);
        
        LOG.info("Source reader configured with queue capacity: {}, close timeout: {}", 
                 queueCapacity, closeTimeout);
    }
}

Best Practices

Performance Optimization

  1. Batch Size Tuning
/**
 * Batch-size-aware fetch: reads up to calculateOptimalBatchSize() records
 * from each open split per round.
 */
@Override
public RecordsWithSplitIds<FileRecord> fetch() throws IOException {
    // Adjust batch size based on record size and processing speed
    int batchSize = calculateOptimalBatchSize();
    
    Map<String, Collection<FileRecord>> recordsBySplit = new HashMap<>();
    
    for (Map.Entry<String, FileReaderState> entry : splitReaders.entrySet()) {
        // BUG FIX: the original referenced an undefined 'readerState'; it must
        // come from the current map entry.
        FileReaderState readerState = entry.getValue();
        List<FileRecord> records = new ArrayList<>();
        
        // Read optimal batch size for each split
        for (int i = 0; i < batchSize && readerState.hasMore(); i++) {
            // ... reading logic
        }
        
        // BUG FIX: the original never put the batch into the result map.
        if (!records.isEmpty()) {
            recordsBySplit.put(entry.getKey(), records);
        }
    }
    
    // BUG FIX: the original method had no return statement at all, which does
    // not compile for a non-void method.
    return RecordsBySplits.forRecords(recordsBySplit, Collections.emptySet());
}

/**
 * Heuristic batch size: scales with CPU parallelism, capped at 1000 to bound
 * per-fetch memory. Other factors worth folding in: available memory, record
 * processing time, network latency, split characteristics.
 */
private int calculateOptimalBatchSize() {
    int cpuScaled = Runtime.getRuntime().availableProcessors() * 100;
    return Math.min(1000, cpuScaled);
}
  2. Efficient State Management
@Override
protected void onSplitFinished(Map<String, FileSourceSplitState> finishedSplitIds) {
    // Efficient cleanup of finished splits
    finishedSplitIds.forEach((splitId, splitState) -> {
        try {
            // Close resources immediately
            splitState.cleanup();
            
            // Update metrics (helper not shown in this snippet)
            updateSplitFinishedMetrics(splitId);
            
            // Log completion with important details.
            // NOTE(review): getBytesProcessed()/getProcessingTime() are not
            // defined on the FileSourceSplitState shown earlier in this doc;
            // they would need to be added for this snippet to compile.
            LOG.info("Split {} finished: processed {} bytes in {} ms", 
                     splitId, splitState.getBytesProcessed(), splitState.getProcessingTime());
        } catch (Exception e) {
            LOG.warn("Error cleaning up finished split {}", splitId, e);
        }
    });
}
  3. Memory Management
public class MemoryAwareFileReader implements SplitReader<FileRecord, FileSourceSplit> {
    // NOTE(review): no constructor is shown, so these final fields are never
    // initialized in this snippet; the MemoryManager type, fetchRecords(), and
    // the other SplitReader methods are also elided.
    private final MemoryManager memoryManager;
    private final long maxMemoryUsage;
    
    @Override
    public RecordsWithSplitIds<FileRecord> fetch() throws IOException {
        // Check memory usage before reading
        if (memoryManager.getCurrentUsage() > maxMemoryUsage) {
            // Return smaller batch or pause reading.
            // NOTE(review): forRecords is called with one argument here but
            // with two (records, finishedSplits) elsewhere in this doc —
            // verify the intended signature.
            return RecordsBySplits.forRecords(Collections.emptyMap());
        }
        
        // Normal fetch logic (helper not shown in this snippet)
        return fetchRecords();
    }
}

Error Handling and Resilience

  1. Split-Level Error Isolation
@Override
public RecordsWithSplitIds<FileRecord> fetch() throws IOException {
    Map<String, Collection<FileRecord>> recordsBySplit = new HashMap<>();
    // NOTE(review): nothing in this snippet ever adds to finishedSplits; the
    // end-of-split detection is presumably inside readFromSplit (not shown).
    Set<String> finishedSplits = new HashSet<>();
    List<String> failedSplits = new ArrayList<>();
    
    for (Map.Entry<String, FileReaderState> entry : splitReaders.entrySet()) {
        String splitId = entry.getKey();
        
        try {
            // Read from split (helper not shown in this snippet)
            List<FileRecord> records = readFromSplit(entry.getValue());
            if (!records.isEmpty()) {
                recordsBySplit.put(splitId, records);
            }
        } catch (IOException e) {
            // Isolate the failure to this split rather than failing the fetch.
            LOG.error("Error reading from split {}, marking as failed", splitId, e);
            failedSplits.add(splitId);
        }
    }
    
    // Handle failed splits
    handleFailedSplits(failedSplits);
    
    return RecordsBySplits.forRecords(recordsBySplit, finishedSplits);
}

/**
 * Removes and closes the reader of each failed split, then reports the
 * failure so it can be retried or surfaced.
 */
private void handleFailedSplits(List<String> failedSplits) {
    for (String splitId : failedSplits) {
        FileReaderState state = splitReaders.remove(splitId);
        if (state == null) {
            continue; // nothing to clean up for this id
        }
        state.close();
        // Optionally retry split or report failure
        reportSplitFailure(splitId);
    }
}
  2. Graceful Degradation
/**
 * Example of graceful degradation: counts failed splits and warns once the
 * count exceeds a threshold instead of failing the whole source.
 *
 * <p>NOTE(review): as in the other reader examples, the abstract methods
 * initializedState/toSplitType required by the base class are elided, and a
 * class-level SLF4J logger named LOG is assumed.
 */
public class ResilientFileSourceReader extends SingleThreadMultiplexSourceReaderBase<
        FileRecord, String, FileSourceSplit, FileSourceSplitState> {
    
    private final AtomicInteger failedSplitCount = new AtomicInteger(0);
    private final int maxFailedSplits;
    
    // BUG FIX: the original had no constructor, so the final maxFailedSplits
    // field was never initialized — a compile error in Java.
    public ResilientFileSourceReader(
            Configuration config,
            SourceReaderContext context,
            int maxFailedSplits) {
        super(
            () -> new FileSystemSplitReader(config),
            new FileRecordEmitter(),
            config,
            context
        );
        this.maxFailedSplits = maxFailedSplits;
    }
    
    @Override
    protected void onSplitFinished(Map<String, FileSourceSplitState> finishedSplitIds) {
        // Check if too many splits have failed
        if (failedSplitCount.get() > maxFailedSplits) {
            LOG.warn("Too many splits have failed ({}), source may be degraded", 
                     failedSplitCount.get());
        }
        
        // BUG FIX: the original called super.onSplitFinished(...), but that
        // method is abstract on SourceReaderBase (see the API summary in this
        // doc), so there is no implementation to delegate to. Clean up the
        // finished splits directly instead.
        finishedSplitIds.values().forEach(FileSourceSplitState::cleanup);
    }
}

The Source Reader Framework provides a robust foundation for building sophisticated source readers with automatic coordination, efficient resource management, and comprehensive error handling capabilities.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-base

docs

async-sink.md

hybrid-source.md

index.md

rate-limiting.md

source-reader.md

table-api.md

tile.json