Base classes and utilities for building Apache Flink connectors with async sink patterns, source readers, and advanced streaming capabilities
The Hybrid Source System enables seamless switching between multiple underlying sources based on configured source chains. It supports both static pre-configured sources and dynamic source creation with position transfer between sources.
The main class that coordinates multiple underlying sources.
@PublicEvolving
public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
// Static builder methods
public static <T, EnumT extends SplitEnumerator> HybridSourceBuilder<T, EnumT> builder(
Source<T, ?, ?> firstSource)
// Source interface methods
public Boundedness getBoundedness()
public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext) throws Exception
public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> createEnumerator(
SplitEnumeratorContext<HybridSourceSplit> enumContext)
public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> restoreEnumerator(
SplitEnumeratorContext<HybridSourceSplit> enumContext,
HybridSourceEnumeratorState checkpoint) throws Exception
public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer()
public SimpleVersionedSerializer<HybridSourceEnumeratorState> getEnumeratorCheckpointSerializer()
}

Builder for constructing hybrid sources with multiple underlying sources.
@PublicEvolving
public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator> implements Serializable {
// Add pre-configured source
public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source)
// Add source with deferred instantiation
public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
HybridSourceBuilder<T, ToEnumT> addSource(
SourceFactory<T, NextSourceT, ? super EnumT> sourceFactory,
Boundedness boundedness)
// Build the hybrid source
public HybridSource<T> build()
}

Factory interface for creating sources with dynamic configuration.
@PublicEvolving
@FunctionalInterface
public interface SourceFactory<T, SourceT extends Source<T, ?, ?>, FromEnumT extends SplitEnumerator>
extends Serializable {
SourceT create(SourceSwitchContext<FromEnumT> context)
}

Context provided to the source factory for position transfer.
@PublicEvolving
public interface SourceSwitchContext<EnumT> {
EnumT getPreviousEnumerator()
}

// Create a hybrid source that reads files first, then switches to Kafka
public class FileToKafkaHybridSource {
public static HybridSource<String> create(
Path filePath,
String kafkaBootstrapServers,
String kafkaTopic) {
// Create file source
FileSource<String> fileSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), filePath)
.build();
// Create Kafka source
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(kafkaBootstrapServers)
.setTopics(kafkaTopic)
.setGroupId("hybrid-consumer")
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.setStartingOffsets(OffsetsInitializer.earliest())
.build();
// Build hybrid source
return HybridSource.builder(fileSource)
.addSource(kafkaSource)
.build();
}
}
// Usage
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
HybridSource<String> hybridSource = FileToKafkaHybridSource.create(
Paths.get("/path/to/input/files"),
"localhost:9092",
"input-events"
);
DataStream<String> stream = env.fromSource(
hybridSource,
WatermarkStrategy.noWatermarks(),
"hybrid-file-kafka-source"
);

// Advanced hybrid source with position transfer from file timestamps to Kafka offsets
public class TimestampBasedHybridSource {
public static HybridSource<EventRecord> createWithTimestampTransfer(
Path filePath,
String kafkaBootstrapServers,
String kafkaTopic) {
// Create timestamped file source
TimestampedFileSource fileSource = new TimestampedFileSource(filePath);
// Build hybrid source with dynamic Kafka configuration
return HybridSource.<EventRecord, TimestampedFileEnumerator>builder(fileSource)
.addSource(
switchContext -> {
// Get the previous enumerator to extract end timestamp
TimestampedFileEnumerator fileEnumerator = switchContext.getPreviousEnumerator();
long endTimestamp = fileEnumerator.getMaxTimestamp();
LOG.info("Switching from file source to Kafka at timestamp: {}", endTimestamp);
// Create Kafka source starting from the file's end timestamp
return KafkaSource.<EventRecord>builder()
.setBootstrapServers(kafkaBootstrapServers)
.setTopics(kafkaTopic)
.setGroupId("hybrid-consumer-" + UUID.randomUUID())
.setDeserializer(new EventRecordDeserializer())
.setStartingOffsets(OffsetsInitializer.timestamp(endTimestamp))
.build();
},
Boundedness.CONTINUOUS_UNBOUNDED
)
.build();
}
}
// Custom file source that tracks timestamps
public class TimestampedFileSource implements Source<EventRecord, TimestampedFileSplit, TimestampedFileEnumeratorState> {
private final Path filePath;
public TimestampedFileSource(Path filePath) {
this.filePath = filePath;
}
@Override
public SplitEnumerator<TimestampedFileSplit, TimestampedFileEnumeratorState> createEnumerator(
SplitEnumeratorContext<TimestampedFileSplit> enumContext) {
return new TimestampedFileEnumerator(enumContext, filePath);
}
@Override
public SourceReader<EventRecord, TimestampedFileSplit> createReader(
SourceReaderContext readerContext) {
return new TimestampedFileReader(readerContext);
}
@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
}
// ... other required methods
}
// Enumerator that tracks maximum timestamp seen
public class TimestampedFileEnumerator implements SplitEnumerator<TimestampedFileSplit, TimestampedFileEnumeratorState> {
private final SplitEnumeratorContext<TimestampedFileSplit> context;
private final Path filePath;
private long maxTimestamp = 0;
private boolean splitsAssigned = false;
public TimestampedFileEnumerator(SplitEnumeratorContext<TimestampedFileSplit> context, Path filePath) {
this.context = context;
this.filePath = filePath;
}
@Override
public void start() {
// Assign splits for file reading
if (!splitsAssigned) {
List<TimestampedFileSplit> splits = createFileSplits();
context.assignSplits(new SplitsAssignment<>(
Collections.singletonMap(0, splits) // Assign to reader 0
));
splitsAssigned = true;
}
}
@Override
public void handleSplitRequest(int subtaskId, String requesterHostname) {
// No more splits to assign
}
@Override
public void addSplitsBack(List<TimestampedFileSplit> splits, int subtaskId) {
// Re-assign splits if needed
context.assignSplits(new SplitsAssignment<>(
Collections.singletonMap(subtaskId, splits)
));
}
@Override
public void addReader(int subtaskId) {
// New reader registered
}
@Override
public TimestampedFileEnumeratorState snapshotState(long checkpointId) {
return new TimestampedFileEnumeratorState(maxTimestamp, splitsAssigned);
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
// Checkpoint completed
}
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
if (sourceEvent instanceof TimestampUpdateEvent) {
TimestampUpdateEvent timestampEvent = (TimestampUpdateEvent) sourceEvent;
maxTimestamp = Math.max(maxTimestamp, timestampEvent.getTimestamp());
LOG.debug("Updated max timestamp to: {}", maxTimestamp);
}
}
@Override
public void close() {
// Cleanup resources
}
public long getMaxTimestamp() {
return maxTimestamp;
}
private List<TimestampedFileSplit> createFileSplits() {
// Create splits for the file
return Collections.singletonList(
new TimestampedFileSplit("file-split-0", filePath, 0, getFileSize(filePath))
);
}
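// Hypothetical helper, assuming java.nio.file.Path
// (imports: java.nio.file.Files, java.io.IOException, java.io.UncheckedIOException)
private static long getFileSize(Path filePath) {
try {
return Files.size(filePath);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}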
}
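// Reader-side hook (sketch): the enumerator above only sees timestamps that the
// readers report. A minimal helper, assuming EventRecord exposes getTimestamp();
// SourceReaderContext#sendSourceEventToCoordinator routes the event to the
// enumerator's handleSourceEvent
public class TimestampReportingHelper {
private final SourceReaderContext readerContext;
public TimestampReportingHelper(SourceReaderContext readerContext) {
this.readerContext = readerContext;
}
// Call from the reader for each emitted record
public void report(EventRecord record) {
readerContext.sendSourceEventToCoordinator(new TimestampUpdateEvent(record.getTimestamp()));
}
}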
// Event to communicate timestamp updates
public class TimestampUpdateEvent implements SourceEvent {
private final long timestamp;
public TimestampUpdateEvent(long timestamp) {
this.timestamp = timestamp;
}
public long getTimestamp() {
return timestamp;
}
}

// Complex hybrid source with multiple stages: Archive → Recent Files → Real-time Stream
public class MultiStageHybridSource {
public static HybridSource<LogRecord> createLogProcessingSource(
Path archivePath,
Path recentPath,
String streamingEndpoint) {
// Stage 1: Archive files (oldest data)
FileSource<LogRecord> archiveSource = FileSource
.forRecordStreamFormat(new LogRecordFormat(), archivePath)
.build();
// Stages 2 and 3 are created lazily via source factories below, so each
// stage can start from the position where the previous stage ended
return HybridSource.<LogRecord, FileSourceEnumerator>builder(archiveSource)
// Add recent files stage
.addSource(
switchContext -> {
FileSourceEnumerator archiveEnumerator = switchContext.getPreviousEnumerator();
long maxProcessedTime = archiveEnumerator.getMaxProcessedTimestamp();
// Configure recent source to start after archive data
// No monitorContinuously here: this stage is declared BOUNDED below, and it
// must finish so the hybrid source can switch to the streaming stage
return FileSource
.forRecordStreamFormat(new LogRecordFormat(), recentPath)
// Hypothetical builder method; real FileSource filtering is done via a custom FileEnumerator
.setFilenameFilter(path -> getFileTimestamp(path) > maxProcessedTime)
.build();
},
Boundedness.BOUNDED // Recent files are bounded
)
// Add real-time streaming stage
.addSource(
switchContext -> {
// Get end time from recent files
FileSourceEnumerator recentEnumerator = switchContext.getPreviousEnumerator();
long streamStartTime = recentEnumerator.getMaxProcessedTimestamp();
// Create streaming source starting from where files ended
return new LogStreamSource(streamingEndpoint, streamStartTime);
},
Boundedness.CONTINUOUS_UNBOUNDED // Streaming is unbounded
)
.build();
}
private static long getFileTimestamp(Path path) {
// Extract timestamp from filename or file attributes
String filename = path.getFileName().toString();
// Assuming filename like "logs-2024-01-01-12-00.txt"
// Parse and return timestamp
return parseTimestampFromFilename(filename);
}
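// Hypothetical parsing helper for filenames like "logs-2024-01-01-12-00.txt"
// (pattern yyyy-MM-dd-HH-mm, interpreted as UTC; uses java.time)
private static long parseTimestampFromFilename(String filename) {
String stamp = filename.replace("logs-", "").replace(".txt", "");
return LocalDateTime.parse(stamp, DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm"))
.toInstant(ZoneOffset.UTC)
.toEpochMilli();
}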
}

// Factory for database sources with connection pooling
public class DatabaseSourceFactory implements SourceFactory<DatabaseRecord, DatabaseSource, SplitEnumerator> {
private final String connectionUrl;
private final String query;
// SourceFactory is Serializable but a connection pool is not, so the pool
// is transient and created lazily after deserialization
private transient DataSource dataSource;
public DatabaseSourceFactory(String connectionUrl, String query) {
this.connectionUrl = connectionUrl;
this.query = query;
}
@Override
public DatabaseSource create(SourceSwitchContext<SplitEnumerator> context) {
// Create database source with a lazily initialized connection pool
if (dataSource == null) {
dataSource = createDataSource(connectionUrl);
}
return new DatabaseSource(dataSource, query);
}
private DataSource createDataSource(String url) {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(url);
config.setMaximumPoolSize(10);
config.setMinimumIdle(2);
return new HikariDataSource(config);
}
}
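// Hypothetical wiring of the factory into a hybrid chain; initialSource,
// jdbcUrl, and query are placeholders for code not shown here
HybridSource<DatabaseRecord> source =
HybridSource.<DatabaseRecord, SplitEnumerator>builder(initialSource)
.addSource(new DatabaseSourceFactory(jdbcUrl, query), Boundedness.BOUNDED)
.build();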
// Factory with conditional source creation
public class ConditionalSourceFactory implements SourceFactory<String, Source<String, ?, ?>, FileSourceEnumerator> {
@Override
public Source<String, ?, ?> create(SourceSwitchContext<FileSourceEnumerator> context) {
FileSourceEnumerator previousEnumerator = context.getPreviousEnumerator();
// Choose next source based on previous source state
if (previousEnumerator.getProcessedRecordCount() > 1_000_000) {
// Large dataset - use Kafka for scalability
return createKafkaSource();
} else if (previousEnumerator.hasErrors()) {
// Previous source had errors - use reliable source
return createDatabaseSource();
} else {
// Normal case - use default file source
return createFileSource();
}
}
private Source<String, ?, ?> createKafkaSource() {
return KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("high-volume-topic")
.setGroupId("hybrid-high-volume")
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.setStartingOffsets(OffsetsInitializer.latest())
.build();
}
private Source<String, ?, ?> createDatabaseSource() {
// Builder methods shown follow the legacy JdbcInputFormat naming; adjust to
// the JdbcSource builder API of your connector version
return JdbcSource.<String>builder()
.setDrivername("com.mysql.cj.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/backup")
.setQuery("SELECT data FROM backup_records WHERE processed = false")
.setRowTypeInfo(Types.STRING)
.build();
}
private Source<String, ?, ?> createFileSource() {
return FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/backup/files"))
.build();
}
}

public class HybridSourceEnumeratorState {
private final int currentSourceIndex;
private final byte[] currentEnumeratorState;
private final List<HybridSourceSplit> remainingSplits;
// Constructor and methods for state management
public HybridSourceEnumeratorState(
int currentSourceIndex,
byte[] currentEnumeratorState,
List<HybridSourceSplit> remainingSplits)
public int getCurrentSourceIndex()
public byte[] getCurrentEnumeratorState()
public List<HybridSourceSplit> getRemainingSplits()
}

public class CustomHybridSourceStateSerializer
implements SimpleVersionedSerializer<HybridSourceEnumeratorState> {
@Override
public int getVersion() {
return 1;
}
@Override
public byte[] serialize(HybridSourceEnumeratorState state) throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
// Write current source index
out.writeInt(state.getCurrentSourceIndex());
// Write enumerator state
byte[] enumState = state.getCurrentEnumeratorState();
out.writeInt(enumState.length);
out.write(enumState);
// Write remaining splits
List<HybridSourceSplit> splits = state.getRemainingSplits();
out.writeInt(splits.size());
for (HybridSourceSplit split : splits) {
serializeSplit(split, out);
}
return baos.toByteArray();
}
}
@Override
public HybridSourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
// Read current source index
int currentSourceIndex = in.readInt();
// Read enumerator state
int enumStateLength = in.readInt();
byte[] enumState = new byte[enumStateLength];
in.readFully(enumState);
// Read remaining splits
int splitsCount = in.readInt();
List<HybridSourceSplit> splits = new ArrayList<>(splitsCount);
for (int i = 0; i < splitsCount; i++) {
splits.add(deserializeSplit(in));
}
return new HybridSourceEnumeratorState(currentSourceIndex, enumState, splits);
}
}
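// Hypothetical split (de)serialization helpers. The exact HybridSourceSplit
// accessors and constructor vary across Flink versions, so treat this as a
// sketch; assumed members: splitId(), sourceIndex(), wrappedSplitBytes(),
// wrappedSplitSerializerVersion(), and a matching constructor
private void serializeSplit(HybridSourceSplit split, DataOutputStream out) throws IOException {
out.writeUTF(split.splitId());
out.writeInt(split.sourceIndex());
out.writeInt(split.wrappedSplitSerializerVersion());
byte[] wrapped = split.wrappedSplitBytes();
out.writeInt(wrapped.length);
out.write(wrapped);
}
private HybridSourceSplit deserializeSplit(DataInputStream in) throws IOException {
String splitId = in.readUTF();
int sourceIndex = in.readInt();
int wrappedVersion = in.readInt();
byte[] wrapped = new byte[in.readInt()];
in.readFully(wrapped);
return new HybridSourceSplit(sourceIndex, wrapped, wrappedVersion, splitId);
}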
}

// Simple file-to-stream hybrid
HybridSource<String> basicHybrid = HybridSource.builder(fileSource)
.addSource(streamSource)
.build();
// Configure in job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every minute
DataStream<String> stream = env.fromSource(
basicHybrid,
WatermarkStrategy.noWatermarks(),
"basic-hybrid-source"
);

public class AdvancedHybridConfiguration {
public static HybridSource<TransactionRecord> createTransactionSource() {
// Historical data from the data lake (hypothetical ParquetSource wrapper;
// in practice Parquet files are read through FileSource with a Parquet format)
ParquetSource<TransactionRecord> historicalSource = ParquetSource
.forRecords(TransactionRecord.class, new Path("s3://data-lake/transactions/"))
.withParallelism(16)
.build();
return HybridSource.<TransactionRecord, ParquetSourceEnumerator>builder(historicalSource)
// Recent data from database
.addSource(
switchContext -> {
ParquetSourceEnumerator histEnumerator = switchContext.getPreviousEnumerator();
Instant cutoffTime = histEnumerator.getMaxRecordTimestamp();
// Legacy-style JDBC builder names, as noted above
return JdbcSource.<TransactionRecord>builder()
.setDrivername("org.postgresql.Driver")
.setDBUrl("jdbc:postgresql://localhost:5432/transactions")
.setQuery("SELECT * FROM transactions WHERE created_at > ?")
.setParametersProvider(new TimestampParameterProvider(cutoffTime))
.setRowTypeInfo(TransactionRecord.getTypeInfo())
.build();
},
Boundedness.BOUNDED
)
// Live stream
.addSource(
switchContext -> {
// Get end time from database source
JdbcSourceEnumerator dbEnumerator = switchContext.getPreviousEnumerator();
Instant streamStart = dbEnumerator.getMaxProcessedTimestamp();
return KafkaSource.<TransactionRecord>builder()
.setBootstrapServers("kafka-cluster:9092")
.setTopics("live-transactions")
.setGroupId("hybrid-transaction-processor")
.setDeserializer(new TransactionRecordDeserializer())
.setStartingOffsets(OffsetsInitializer.timestamp(streamStart.toEpochMilli()))
.build();
},
Boundedness.CONTINUOUS_UNBOUNDED
)
.build();
}
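// Hypothetical parameter provider, modeled after the JDBC connector's
// JdbcParameterValuesProvider: supplies the cutoff value for the "?" placeholder
public static class TimestampParameterProvider implements JdbcParameterValuesProvider {
private final Instant cutoff;
public TimestampParameterProvider(Instant cutoff) {
this.cutoff = cutoff;
}
@Override
public Serializable[][] getParameterValues() {
// One query partition with a single parameter value
return new Serializable[][] {{java.sql.Timestamp.from(cutoff)}};
}
}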
}

public class TimestampTrackingEnumerator implements SplitEnumerator<MySplit, MyEnumeratorState> {
private volatile Instant maxProcessedTimestamp = Instant.MIN;
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
if (sourceEvent instanceof TimestampProgressEvent) {
TimestampProgressEvent event = (TimestampProgressEvent) sourceEvent;
synchronized (this) {
if (event.getTimestamp().isAfter(maxProcessedTimestamp)) {
maxProcessedTimestamp = event.getTimestamp();
}
}
}
}
public Instant getMaxProcessedTimestamp() {
return maxProcessedTimestamp;
}
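// The event type assumed above; readers would send it via
// SourceReaderContext#sendSourceEventToCoordinator (sketch)
public static class TimestampProgressEvent implements SourceEvent {
private final Instant timestamp;
public TimestampProgressEvent(Instant timestamp) {
this.timestamp = timestamp;
}
public Instant getTimestamp() {
return timestamp;
}
}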
// ... other SplitEnumerator methods omitted
}

public class OverlapHandlingSourceFactory implements SourceFactory<Event, KafkaSource<Event>, FileSourceEnumerator> {
private final Duration overlapBuffer;
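// Constructor added for completeness; the buffer controls how far to rewind
public OverlapHandlingSourceFactory(Duration overlapBuffer) {
this.overlapBuffer = overlapBuffer;
}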
@Override
public KafkaSource<Event> create(SourceSwitchContext<FileSourceEnumerator> context) {
FileSourceEnumerator prevEnum = context.getPreviousEnumerator();
Instant switchTime = prevEnum.getMaxProcessedTimestamp();
// Start slightly before the file source ended to handle clock skew
Instant kafkaStartTime = switchTime.minus(overlapBuffer);
return KafkaSource.<Event>builder()
// Bootstrap servers, topics, and group id omitted for brevity
.setStartingOffsets(OffsetsInitializer.timestamp(kafkaStartTime.toEpochMilli()))
.setDeserializer(new DeduplicatingEventDeserializer(switchTime)) // Handle duplicates
.build();
}
}

public class ResourceAwareHybridSource {
public static <T> HybridSource<T> createWithResourceManagement(
List<Source<T, ?, ?>> sources) {
HybridSourceBuilder<T, ?> builder = null;
for (int i = 0; i < sources.size(); i++) {
final int sourceIndex = i;
if (builder == null) {
builder = HybridSource.builder(sources.get(0));
} else {
builder = builder.addSource(
switchContext -> {
// Cleanup previous source resources if needed
cleanupPreviousSource(switchContext.getPreviousEnumerator());
// Pre-warm next source
Source<T, ?, ?> nextSource = sources.get(sourceIndex);
prewarmSource(nextSource);
return nextSource;
},
sources.get(sourceIndex).getBoundedness()
);
}
}
return builder.build();
}
private static void cleanupPreviousSource(SplitEnumerator previousEnumerator) {
if (previousEnumerator instanceof ResourceManager) {
((ResourceManager) previousEnumerator).releaseResources();
}
}
private static <T> void prewarmSource(Source<T, ?, ?> source) {
if (source instanceof Prewarmable) {
((Prewarmable) source).prewarm();
}
}
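// Hypothetical marker interfaces assumed by the helpers above
interface ResourceManager {
void releaseResources();
}
interface Prewarmable {
void prewarm();
}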
}

public class OptimizedHybridSource {
public static HybridSource<Record> createOptimized(SourceChainConfig config) {
return HybridSource.builder(createFirstSource(config))
.addSource(
switchContext -> {
// Adjust parallelism based on data volume
int optimalParallelism = calculateOptimalParallelism(
switchContext.getPreviousEnumerator()
);
return createSecondSource(config)
.withParallelism(optimalParallelism);
},
Boundedness.CONTINUOUS_UNBOUNDED
)
.build();
}
private static int calculateOptimalParallelism(SplitEnumerator previousEnumerator) {
if (previousEnumerator instanceof MetricsProvider) {
MetricsProvider metricsProvider = (MetricsProvider) previousEnumerator;
long recordsPerSecond = metricsProvider.getRecordsPerSecond();
// Scale parallelism based on throughput
return Math.max(1, (int) (recordsPerSecond / 10000)); // 10k records per subtask
}
return Runtime.getRuntime().availableProcessors();
}
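// Hypothetical interface assumed above: an enumerator that reports throughput
interface MetricsProvider {
long getRecordsPerSecond();
}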
}

The Hybrid Source System provides a framework for building complex data ingestion pipelines that transition between data sources while maintaining exactly-once processing guarantees and efficient resource utilization.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-base