Hybrid Source System

The Hybrid Source System enables seamless switching across a configured chain of underlying sources: the job reads each source to completion and then switches to the next, so every source except the last must be bounded. It supports both static, pre-configured sources and deferred source creation with position transfer from the previous source's enumerator.

Core Components

HybridSource

The main class that coordinates multiple underlying sources.

@PublicEvolving
public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
    
    // Static builder method
    public static <T, EnumT extends SplitEnumerator> HybridSourceBuilder<T, EnumT> builder(
            Source<T, ?, ?> firstSource);
    
    // Source interface methods
    public Boundedness getBoundedness();
    public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext) throws Exception;
    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> createEnumerator(
            SplitEnumeratorContext<HybridSourceSplit> enumContext);
    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> restoreEnumerator(
            SplitEnumeratorContext<HybridSourceSplit> enumContext,
            HybridSourceEnumeratorState checkpoint) throws Exception;
    public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer();
    public SimpleVersionedSerializer<HybridSourceEnumeratorState> getEnumeratorCheckpointSerializer();
}

HybridSourceBuilder

Builder for constructing hybrid sources with multiple underlying sources.

@PublicEvolving
public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator> implements Serializable {
    
    // Add a pre-configured source
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source);
    
    // Add a source with deferred instantiation
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(
                    SourceFactory<T, NextSourceT, ? super EnumT> sourceFactory,
                    Boundedness boundedness);
    
    // Build the hybrid source
    public HybridSource<T> build();
}

SourceFactory

Factory interface for creating sources with dynamic configuration.

@PublicEvolving
@FunctionalInterface
public interface SourceFactory<T, SourceT extends Source<T, ?, ?>, FromEnumT extends SplitEnumerator>
        extends Serializable {
    SourceT create(SourceSwitchContext<FromEnumT> context);
}

SourceSwitchContext

Context provided to source factory for position transfer.

@PublicEvolving
public interface SourceSwitchContext<EnumT> {
    EnumT getPreviousEnumerator();
}
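
For example, a factory can pull the previous source's final position out of this context. A minimal sketch, assuming a custom MyFileEnumerator with a hypothetical getEndPosition() accessor and a hypothetical createStreamSourceStartingAt() helper:

// The builder's enumerator type parameter must match the previous source's
// enumerator type so that getPreviousEnumerator() is usefully typed.
HybridSource<String> source = HybridSource.<String, MyFileEnumerator>builder(fileSource)
    .addSource(
        switchContext -> {
            MyFileEnumerator prev = switchContext.getPreviousEnumerator();
            long endPosition = prev.getEndPosition();  // hypothetical accessor
            return createStreamSourceStartingAt(endPosition);  // hypothetical helper
        },
        Boundedness.CONTINUOUS_UNBOUNDED)
    .build();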

Implementation Examples

Simple Hybrid Source

// Create a hybrid source that reads files first, then switches to Kafka
public class FileToKafkaHybridSource {
    
    public static HybridSource<String> create(
            Path filePath, 
            String kafkaBootstrapServers, 
            String kafkaTopic) {
        
        // Create file source
        FileSource<String> fileSource = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), filePath)
            .build();
        
        // Create Kafka source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers(kafkaBootstrapServers)
            .setTopics(kafkaTopic)
            .setGroupId("hybrid-consumer")
            .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
            .setStartingOffsets(OffsetsInitializer.earliest())
            .build();
        
        // Build hybrid source
        return HybridSource.builder(fileSource)
            .addSource(kafkaSource)
            .build();
    }
}

// Usage
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

HybridSource<String> hybridSource = FileToKafkaHybridSource.create(
    new Path("/path/to/input/files"),  // org.apache.flink.core.fs.Path, not java.nio
    "localhost:9092",
    "input-events"
);

DataStream<String> stream = env.fromSource(
    hybridSource,
    WatermarkStrategy.noWatermarks(),
    "hybrid-file-kafka-source"
);

Dynamic Position Transfer

// Advanced hybrid source with position transfer from file timestamps to Kafka offsets
public class TimestampBasedHybridSource {
    private static final Logger LOG = LoggerFactory.getLogger(TimestampBasedHybridSource.class);
    
    public static HybridSource<EventRecord> createWithTimestampTransfer(
            Path filePath,
            String kafkaBootstrapServers,
            String kafkaTopic) {
        
        // Create timestamped file source
        TimestampedFileSource fileSource = new TimestampedFileSource(filePath);
        
        // Build hybrid source with dynamic Kafka configuration
        return HybridSource.<EventRecord, TimestampedFileEnumerator>builder(fileSource)
            .addSource(
                switchContext -> {
                    // Get the previous enumerator to extract end timestamp
                    TimestampedFileEnumerator fileEnumerator = switchContext.getPreviousEnumerator();
                    long endTimestamp = fileEnumerator.getMaxTimestamp();
                    
                    LOG.info("Switching from file source to Kafka at timestamp: {}", endTimestamp);
                    
                    // Create Kafka source starting from the file's end timestamp
                    return KafkaSource.<EventRecord>builder()
                        .setBootstrapServers(kafkaBootstrapServers)
                        .setTopics(kafkaTopic)
                        .setGroupId("hybrid-consumer-" + UUID.randomUUID())
                        .setDeserializer(new EventRecordDeserializer())
                        .setStartingOffsets(OffsetsInitializer.timestamp(endTimestamp))
                        .build();
                },
                Boundedness.CONTINUOUS_UNBOUNDED
            )
            .build();
    }
}

// Custom file source that tracks timestamps
public class TimestampedFileSource implements Source<EventRecord, TimestampedFileSplit, TimestampedFileEnumeratorState> {
    private final Path filePath;
    
    public TimestampedFileSource(Path filePath) {
        this.filePath = filePath;
    }
    
    @Override
    public SplitEnumerator<TimestampedFileSplit, TimestampedFileEnumeratorState> createEnumerator(
            SplitEnumeratorContext<TimestampedFileSplit> enumContext) {
        return new TimestampedFileEnumerator(enumContext, filePath);
    }
    
    @Override
    public SourceReader<EventRecord, TimestampedFileSplit> createReader(
            SourceReaderContext readerContext) {
        return new TimestampedFileReader(readerContext);
    }
    
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }
    
    // ... other required methods (sketched below)
}
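
The elided methods cover enumerator restoration and serializer wiring. A sketch, assuming hypothetical TimestampedFileSplitSerializer and TimestampedFileEnumeratorStateSerializer implementations and a hypothetical restore hook on the enumerator:

@Override
public SplitEnumerator<TimestampedFileSplit, TimestampedFileEnumeratorState> restoreEnumerator(
        SplitEnumeratorContext<TimestampedFileSplit> enumContext,
        TimestampedFileEnumeratorState checkpoint) {
    // Recreate the enumerator and restore the tracked max timestamp
    TimestampedFileEnumerator enumerator = new TimestampedFileEnumerator(enumContext, filePath);
    enumerator.restoreState(checkpoint);  // hypothetical restore hook
    return enumerator;
}

@Override
public SimpleVersionedSerializer<TimestampedFileSplit> getSplitSerializer() {
    return new TimestampedFileSplitSerializer();  // hypothetical serializer
}

@Override
public SimpleVersionedSerializer<TimestampedFileEnumeratorState> getEnumeratorCheckpointSerializer() {
    return new TimestampedFileEnumeratorStateSerializer();  // hypothetical serializer
}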

// Enumerator that tracks maximum timestamp seen
public class TimestampedFileEnumerator implements SplitEnumerator<TimestampedFileSplit, TimestampedFileEnumeratorState> {
    private static final Logger LOG = LoggerFactory.getLogger(TimestampedFileEnumerator.class);
    private final SplitEnumeratorContext<TimestampedFileSplit> context;
    private final Path filePath;
    private long maxTimestamp = 0;
    private boolean splitsAssigned = false;
    
    public TimestampedFileEnumerator(SplitEnumeratorContext<TimestampedFileSplit> context, Path filePath) {
        this.context = context;
        this.filePath = filePath;
    }
    
    @Override
    public void start() {
        // Assign splits for file reading
        if (!splitsAssigned) {
            List<TimestampedFileSplit> splits = createFileSplits();
            context.assignSplits(new SplitsAssignment<>(
                Collections.singletonMap(0, splits)  // Assign to reader 0
            ));
            splitsAssigned = true;
        }
    }
    
    @Override
    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        // No more splits to assign
    }
    
    @Override
    public void addSplitsBack(List<TimestampedFileSplit> splits, int subtaskId) {
        // Re-assign splits if needed
        context.assignSplits(new SplitsAssignment<>(
            Collections.singletonMap(subtaskId, splits)
        ));
    }
    
    @Override
    public void addReader(int subtaskId) {
        // New reader registered
    }
    
    @Override
    public TimestampedFileEnumeratorState snapshotState(long checkpointId) {
        return new TimestampedFileEnumeratorState(maxTimestamp, splitsAssigned);
    }
    
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Checkpoint completed
    }
    
    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof TimestampUpdateEvent) {
            TimestampUpdateEvent timestampEvent = (TimestampUpdateEvent) sourceEvent;
            maxTimestamp = Math.max(maxTimestamp, timestampEvent.getTimestamp());
            LOG.debug("Updated max timestamp to: {}", maxTimestamp);
        }
    }
    
    @Override
    public void close() {
        // Cleanup resources
    }
    
    public long getMaxTimestamp() {
        return maxTimestamp;
    }
    
    private List<TimestampedFileSplit> createFileSplits() {
        // Create a single split covering the whole file
        // (getFileSize is a helper method, not shown)
        return Collections.singletonList(
            new TimestampedFileSplit("file-split-0", filePath, 0, getFileSize(filePath))
        );
    }
}

// Event to communicate timestamp updates
public class TimestampUpdateEvent implements SourceEvent {
    private final long timestamp;
    
    public TimestampUpdateEvent(long timestamp) {
        this.timestamp = timestamp;
    }
    
    public long getTimestamp() {
        return timestamp;
    }
}
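
On the reader side, this event can be sent to the enumerator through SourceReaderContext.sendSourceEventToCoordinator. A sketch of what TimestampedFileReader might do as it extracts record timestamps (the timestamp extraction itself is assumed):

// Inside TimestampedFileReader: forward observed record timestamps to the
// enumerator so it can track the switch position.
private void reportTimestamp(long recordTimestamp) {
    readerContext.sendSourceEventToCoordinator(new TimestampUpdateEvent(recordTimestamp));
}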

Multi-Stage Hybrid Source

// Complex hybrid source with multiple stages: Archive → Recent Files → Real-time Stream
public class MultiStageHybridSource {
    
    public static HybridSource<LogRecord> createLogProcessingSource(
            Path archivePath,
            Path recentPath,
            String streamingEndpoint) {
        
        // Stage 1: Archive files (oldest data)
        FileSource<LogRecord> archiveSource = FileSource
            .forRecordStreamFormat(new LogRecordFormat(), archivePath)
            .build();
        
        // Stage 2 (recent files) is created lazily in the factory below so it
        // can be configured from the archive stage's final position.
        
        // FileSourceEnumerator.getMaxProcessedTimestamp() is assumed here to be
        // a custom extension; the stock file enumerators do not expose it.
        return HybridSource.<LogRecord, FileSourceEnumerator>builder(archiveSource)
            // Add recent files stage
            .addSource(
                switchContext -> {
                    FileSourceEnumerator archiveEnumerator = switchContext.getPreviousEnumerator();
                    long maxProcessedTime = archiveEnumerator.getMaxProcessedTimestamp();
                    
                    // Configure the recent source to start after the archive
                    // data. This stage must be bounded so the hybrid source can
                    // switch to the streaming stage once it finishes, hence no
                    // continuous monitoring here; setFilenameFilter is
                    // illustrative (real filtering uses a custom FileEnumerator).
                    return FileSource
                        .forRecordStreamFormat(new LogRecordFormat(), recentPath)
                        .setFilenameFilter(path -> getFileTimestamp(path) > maxProcessedTime)
                        .build();
                },
                Boundedness.BOUNDED  // Recent files are bounded
            )
            // Add real-time streaming stage
            .addSource(
                switchContext -> {
                    // Get end time from recent files
                    FileSourceEnumerator recentEnumerator = switchContext.getPreviousEnumerator();
                    long streamStartTime = recentEnumerator.getMaxProcessedTimestamp();
                    
                    // Create streaming source starting from where files ended
                    return new LogStreamSource(streamingEndpoint, streamStartTime);
                },
                Boundedness.CONTINUOUS_UNBOUNDED  // Streaming is unbounded
            )
            .build();
    }
    
    private static long getFileTimestamp(Path path) {
        // Extract the timestamp from a filename like "logs-2024-01-01-12-00.txt"
        // (parseTimestampFromFilename is sketched below)
        String filename = path.getName();  // org.apache.flink.core.fs.Path
        return parseTimestampFromFilename(filename);
    }
}
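
The parseTimestampFromFilename helper is not shown above. A minimal sketch, assuming the logs-2024-01-01-12-00.txt naming from the comment and UTC timestamps:

private static final DateTimeFormatter LOG_NAME_FORMAT =
    DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm");

private static long parseTimestampFromFilename(String filename) {
    // Strip the "logs-" prefix and ".txt" suffix before parsing
    String stamp = filename.substring("logs-".length(), filename.length() - ".txt".length());
    return LocalDateTime.parse(stamp, LOG_NAME_FORMAT)
        .toInstant(ZoneOffset.UTC)
        .toEpochMilli();
}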

Source Factory Patterns

// Factory for database sources with connection pooling
// (DatabaseSource is a custom source; HikariCP provides the pool)
public class DatabaseSourceFactory implements SourceFactory<DatabaseRecord, DatabaseSource, Object> {
    private final String connectionUrl;
    private final String query;
    // SourceFactory is Serializable, but a connection pool is not:
    // create it lazily on the target JVM instead of in the constructor.
    private transient DataSource dataSource;
    
    public DatabaseSourceFactory(String connectionUrl, String query) {
        this.connectionUrl = connectionUrl;
        this.query = query;
    }
    
    @Override
    public DatabaseSource create(SourceSwitchContext<Object> context) {
        // Create database source with a lazily initialized connection pool
        if (dataSource == null) {
            dataSource = createDataSource(connectionUrl);
        }
        return new DatabaseSource(dataSource, query);
    }
    
    private DataSource createDataSource(String url) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(url);
        config.setMaximumPoolSize(10);
        config.setMinimumIdle(2);
        return new HikariDataSource(config);
    }
}

// Factory with conditional source creation
public class ConditionalSourceFactory implements SourceFactory<String, Source<String, ?, ?>, FileSourceEnumerator> {
    
    @Override
    public Source<String, ?, ?> create(SourceSwitchContext<FileSourceEnumerator> context) {
        // getProcessedRecordCount() and hasErrors() are assumed custom-enumerator methods
        FileSourceEnumerator previousEnumerator = context.getPreviousEnumerator();
        
        // Choose next source based on previous source state
        if (previousEnumerator.getProcessedRecordCount() > 1_000_000) {
            // Large dataset - use Kafka for scalability
            return createKafkaSource();
        } else if (previousEnumerator.hasErrors()) {
            // Previous source had errors - use reliable source
            return createDatabaseSource();
        } else {
            // Normal case - use default file source
            return createFileSource();
        }
    }
    
    private Source<String, ?, ?> createKafkaSource() {
        return KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("high-volume-topic")
            .setGroupId("hybrid-high-volume")
            .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
            .setStartingOffsets(OffsetsInitializer.latest())
            .build();
    }
    
    private Source<String, ?, ?> createDatabaseSource() {
        // Illustrative: these builder calls follow the legacy JdbcInputFormat
        // naming; the actual JDBC source API differs by connector version.
        return JdbcSource.<String>builder()
            .setDrivername("com.mysql.cj.jdbc.Driver")
            .setDBUrl("jdbc:mysql://localhost:3306/backup")
            .setQuery("SELECT data FROM backup_records WHERE processed = false")
            .setRowTypeInfo(Types.STRING)
            .build();
    }
    
    private Source<String, ?, ?> createFileSource() {
        return FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), new Path("/backup/files"))
            .build();
    }
}

State Management and Checkpointing

HybridSourceEnumeratorState

public class HybridSourceEnumeratorState {
    private final int currentSourceIndex;
    private final byte[] currentEnumeratorState;
    private final List<HybridSourceSplit> remainingSplits;
    
    // Constructor and accessors for state management
    public HybridSourceEnumeratorState(
            int currentSourceIndex,
            byte[] currentEnumeratorState,
            List<HybridSourceSplit> remainingSplits);
    
    public int getCurrentSourceIndex();
    public byte[] getCurrentEnumeratorState();
    public List<HybridSourceSplit> getRemainingSplits();
}

Custom State Serialization

public class CustomHybridSourceStateSerializer 
        implements SimpleVersionedSerializer<HybridSourceEnumeratorState> {
    
    @Override
    public int getVersion() {
        return 1;
    }
    
    @Override
    public byte[] serialize(HybridSourceEnumeratorState state) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(baos)) {
            
            // Write current source index
            out.writeInt(state.getCurrentSourceIndex());
            
            // Write enumerator state
            byte[] enumState = state.getCurrentEnumeratorState();
            out.writeInt(enumState.length);
            out.write(enumState);
            
            // Write remaining splits
            List<HybridSourceSplit> splits = state.getRemainingSplits();
            out.writeInt(splits.size());
            for (HybridSourceSplit split : splits) {
                serializeSplit(split, out);
            }
            
            return baos.toByteArray();
        }
    }
    
    @Override
    public HybridSourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
        if (version != getVersion()) {
            throw new IOException("Unrecognized serializer version: " + version);
        }
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
             DataInputStream in = new DataInputStream(bais)) {
            
            // Read current source index
            int currentSourceIndex = in.readInt();
            
            // Read enumerator state
            int enumStateLength = in.readInt();
            byte[] enumState = new byte[enumStateLength];
            in.readFully(enumState);
            
            // Read remaining splits
            int splitsCount = in.readInt();
            List<HybridSourceSplit> splits = new ArrayList<>(splitsCount);
            for (int i = 0; i < splitsCount; i++) {
                splits.add(deserializeSplit(in));
            }
            
            return new HybridSourceEnumeratorState(currentSourceIndex, enumState, splits);
        }
    }
}
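
The serializeSplit and deserializeSplit helpers are left out above. A sketch of one possible wire format; the HybridSourceSplit accessors and constructor used here are illustrative, since the real class wraps the underlying split's serialized bytes together with its source index:

private void serializeSplit(HybridSourceSplit split, DataOutputStream out) throws IOException {
    out.writeInt(split.sourceIndex());          // which chained source owns the split
    out.writeUTF(split.splitId());
    byte[] wrapped = split.wrappedSplitBytes(); // the underlying split, pre-serialized
    out.writeInt(wrapped.length);
    out.write(wrapped);
}

private HybridSourceSplit deserializeSplit(DataInputStream in) throws IOException {
    int sourceIndex = in.readInt();
    String splitId = in.readUTF();
    byte[] wrapped = new byte[in.readInt()];
    in.readFully(wrapped);
    return new HybridSourceSplit(sourceIndex, splitId, wrapped);  // illustrative constructor
}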

Configuration Examples

Basic Configuration

// Simple file-to-stream hybrid
HybridSource<String> basicHybrid = HybridSource.builder(fileSource)
    .addSource(streamSource)
    .build();

// Configure in job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);  // checkpoint every minute (interval in milliseconds)

DataStream<String> stream = env.fromSource(
    basicHybrid,
    WatermarkStrategy.noWatermarks(),
    "basic-hybrid-source"
);

Advanced Configuration with Position Transfer

public class AdvancedHybridConfiguration {
    
    public static HybridSource<TransactionRecord> createTransactionSource() {
        // Historical data from a data lake. ParquetSource and withParallelism()
        // are illustrative stand-ins; Flink reads Parquet through FileSource
        // with a Parquet input format.
        ParquetSource<TransactionRecord> historicalSource = ParquetSource
            .forRecords(TransactionRecord.class, new Path("s3://data-lake/transactions/"))
            .withParallelism(16)
            .build();
        
        return HybridSource.<TransactionRecord, ParquetSourceEnumerator>builder(historicalSource)
            // Recent data from database
            .addSource(
                switchContext -> {
                    ParquetSourceEnumerator histEnumerator = switchContext.getPreviousEnumerator();
                    Instant cutoffTime = histEnumerator.getMaxRecordTimestamp();
                    
                    // Illustrative JDBC builder (see note in Source Factory Patterns)
                    return JdbcSource.<TransactionRecord>builder()
                        .setDrivername("org.postgresql.Driver")
                        .setDBUrl("jdbc:postgresql://localhost:5432/transactions")
                        .setQuery("SELECT * FROM transactions WHERE created_at > ?")
                        .setParametersProvider(new TimestampParameterProvider(cutoffTime))
                        .setRowTypeInfo(TransactionRecord.getTypeInfo())
                        .build();
                },
                Boundedness.BOUNDED
            )
            // Live stream
            .addSource(
                switchContext -> {
                    // Get end time from database source
                    JdbcSourceEnumerator dbEnumerator = switchContext.getPreviousEnumerator();
                    Instant streamStart = dbEnumerator.getMaxProcessedTimestamp();
                    
                    return KafkaSource.<TransactionRecord>builder()
                        .setBootstrapServers("kafka-cluster:9092")
                        .setTopics("live-transactions")
                        .setGroupId("hybrid-transaction-processor")
                        .setDeserializer(new TransactionRecordDeserializer())
                        .setStartingOffsets(OffsetsInitializer.timestamp(streamStart.toEpochMilli()))
                        .build();
                },
                Boundedness.CONTINUOUS_UNBOUNDED
            )
            .build();
    }
}

Best Practices

Position Transfer

  1. Implement Precise Timestamp Tracking
public class TimestampTrackingEnumerator implements SplitEnumerator<MySplit, MyEnumeratorState> {
    private volatile Instant maxProcessedTimestamp = Instant.MIN;
    
    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof TimestampProgressEvent) {
            TimestampProgressEvent event = (TimestampProgressEvent) sourceEvent;
            synchronized (this) {
                if (event.getTimestamp().isAfter(maxProcessedTimestamp)) {
                    maxProcessedTimestamp = event.getTimestamp();
                }
            }
        }
    }
    
    public Instant getMaxProcessedTimestamp() {
        return maxProcessedTimestamp;
    }
}
  2. Handle Clock Skew and Overlap
public class OverlapHandlingSourceFactory implements SourceFactory<Event, KafkaSource<Event>, FileSourceEnumerator> {
    private final Duration overlapBuffer;
    
    public OverlapHandlingSourceFactory(Duration overlapBuffer) {
        this.overlapBuffer = overlapBuffer;
    }
    
    @Override
    public KafkaSource<Event> create(SourceSwitchContext<FileSourceEnumerator> context) {
        FileSourceEnumerator prevEnum = context.getPreviousEnumerator();
        Instant switchTime = prevEnum.getMaxProcessedTimestamp();
        
        // Start slightly before the file source ended to handle clock skew
        Instant kafkaStartTime = switchTime.minus(overlapBuffer);
        
        return KafkaSource.<Event>builder()
            // Bootstrap servers, topics, etc. omitted for brevity
            .setStartingOffsets(OffsetsInitializer.timestamp(kafkaStartTime.toEpochMilli()))
            .setDeserializer(new DeduplicatingEventDeserializer(switchTime))  // drops overlap duplicates (sketched below)
            .build();
    }
}
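
The DeduplicatingEventDeserializer above is not a connector class. A minimal sketch that drops Kafka records whose broker timestamp falls at or before the switch point; Event.fromBytes is an assumed parsing helper:

public class DeduplicatingEventDeserializer implements KafkaRecordDeserializationSchema<Event> {
    private final long switchTimeMillis;
    
    public DeduplicatingEventDeserializer(Instant switchTime) {
        this.switchTimeMillis = switchTime.toEpochMilli();
    }
    
    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Event> out) throws IOException {
        // Records at or before the switch point were already emitted by the file source
        if (record.timestamp() <= switchTimeMillis) {
            return;
        }
        out.collect(Event.fromBytes(record.value()));  // assumed parsing helper
    }
    
    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}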

Resource Management

  1. Efficient Source Lifecycle Management
public class ResourceAwareHybridSource {
    
    public static <T> HybridSource<T> createWithResourceManagement(
            List<Source<T, ?, ?>> sources) {
        
        HybridSourceBuilder<T, SplitEnumerator> builder = null;
        
        for (int i = 0; i < sources.size(); i++) {
            final int sourceIndex = i;
            
            if (builder == null) {
                builder = HybridSource.builder(sources.get(0));
            } else {
                builder = builder.addSource(
                    switchContext -> {
                        // Cleanup previous source resources if needed
                        cleanupPreviousSource(switchContext.getPreviousEnumerator());
                        
                        // Pre-warm next source
                        Source<T, ?, ?> nextSource = sources.get(sourceIndex);
                        prewarmSource(nextSource);
                        
                        return nextSource;
                    },
                    sources.get(sourceIndex).getBoundedness()
                );
            }
        }
        
        return builder.build();
    }
    
    private static void cleanupPreviousSource(SplitEnumerator previousEnumerator) {
        if (previousEnumerator instanceof ResourceManager) {
            ((ResourceManager) previousEnumerator).releaseResources();
        }
    }
    
    private static <T> void prewarmSource(Source<T, ?, ?> source) {
        if (source instanceof Prewarmable) {
            ((Prewarmable) source).prewarm();
        }
    }
}
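
ResourceManager and Prewarmable are not Flink APIs; they are hypothetical opt-in interfaces that your own enumerators and sources would implement, for example:

// Hypothetical opt-in hooks for the lifecycle management shown above
public interface ResourceManager {
    void releaseResources();
}

public interface Prewarmable {
    void prewarm();
}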
  2. Memory and Performance Optimization
public class OptimizedHybridSource {
    
    public static HybridSource<Record> createOptimized(SourceChainConfig config) {
        
        return HybridSource.builder(createFirstSource(config))
            .addSource(
                switchContext -> {
                    // Adjust parallelism based on data volume.
                    // MetricsProvider and withParallelism() are illustrative;
                    // per-source parallelism is not a stock Source API.
                    int optimalParallelism = calculateOptimalParallelism(
                        switchContext.getPreviousEnumerator()
                    );
                    
                    return createSecondSource(config)
                        .withParallelism(optimalParallelism);
                },
                Boundedness.CONTINUOUS_UNBOUNDED
            )
            .build();
    }
    
    private static int calculateOptimalParallelism(SplitEnumerator previousEnumerator) {
        if (previousEnumerator instanceof MetricsProvider) {
            MetricsProvider metricsProvider = (MetricsProvider) previousEnumerator;
            long recordsPerSecond = metricsProvider.getRecordsPerSecond();
            
            // Scale parallelism based on throughput
            return Math.max(1, (int) (recordsPerSecond / 10000));  // 10k records per subtask
        }
        
        return Runtime.getRuntime().availableProcessors();
    }
}

The Hybrid Source System provides a powerful framework for building complex data ingestion pipelines that can seamlessly transition between different data sources while maintaining exactly-once processing guarantees and efficient resource utilization.
