Base classes and utilities for building Apache Flink connectors with async sink patterns, source readers, and advanced streaming capabilities
The Apache Flink Connector Base library provides foundational classes and utilities for building high-performance, production-ready Apache Flink connectors. It offers sophisticated async sink and source frameworks with built-in features like backpressure handling, rate limiting, checkpointing, and hybrid source switching.
Maven Coordinates:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version> <!-- use 1.18 or any later release matching your Flink version -->
</dependency>
```

Package: `org.apache.flink.connector.base`
Core imports:

```java
// Delivery guarantees and the async sink framework
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

// Source framework
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;

// Sink configuration and rate limiting strategies
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;

// Batching, result handling, and split reading
import org.apache.flink.connector.base.sink.writer.BatchCreator;
import org.apache.flink.connector.base.sink.writer.RequestBuffer;
import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
```

Example: building a custom async sink.

```java
// Supporting imports for the example below
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;

// Define your request entry type
public class MyRequestEntry implements Serializable {
    private final String data;
    private final long timestamp;

    public MyRequestEntry(String data, long timestamp) {
        this.data = data;
        this.timestamp = timestamp;
    }

    public String getData() { return data; }
    public long getTimestamp() { return timestamp; }
}

// Implement ElementConverter to turn stream elements into request entries
public class MyElementConverter implements ElementConverter<String, MyRequestEntry> {
    @Override
    public MyRequestEntry apply(String element, SinkWriter.Context context) {
        return new MyRequestEntry(element, context.timestamp());
    }
}
// Create the AsyncSink implementation
public class MyAsyncSink extends AsyncSinkBase<String, MyRequestEntry> {
    public MyAsyncSink() {
        super(
            new MyElementConverter(), // Element converter
            100,                      // Max batch size
            10,                       // Max in-flight requests
            1000,                     // Max buffered requests
            1024 * 1024,              // Max batch size in bytes
            5000,                     // Max time in buffer (ms)
            256 * 1024,               // Max record size in bytes
            60000,                    // Request timeout (ms)
            false                     // Fail on timeout
        );
    }

    @Override
    public SinkWriter<String> createWriter(WriterInitContext context) throws IOException {
        return new MyAsyncSinkWriter(
            getElementConverter(),
            context,
            AsyncSinkWriterConfiguration.builder()
                .setMaxBatchSize(getMaxBatchSize())
                .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
                .setMaxInFlightRequests(getMaxInFlightRequests())
                .setMaxBufferedRequests(getMaxBufferedRequests())
                .setMaxTimeInBufferMS(getMaxTimeInBufferMS())
                .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
                .build(),
            Collections.emptyList());
    }

    // Note: AsyncSinkBase is a stateful sink; a complete implementation also
    // overrides restoreWriter(...) and getWriterStateSerializer() so buffered
    // requests survive checkpoints.
}
// Implement the AsyncSinkWriter. AsyncClient and Response are hypothetical
// placeholders for your destination's async client library.
public class MyAsyncSinkWriter extends AsyncSinkWriter<String, MyRequestEntry> {
    private final AsyncClient client;

    public MyAsyncSinkWriter(
            ElementConverter<String, MyRequestEntry> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<MyRequestEntry>> states) {
        super(elementConverter, context, configuration, states);
        this.client = new AsyncClient();
    }

    @Override
    protected void submitRequestEntries(
            List<MyRequestEntry> requestEntries,
            ResultHandler<MyRequestEntry> resultHandler) {
        CompletableFuture<Response> future = client.sendBatch(requestEntries);
        future.whenComplete((response, error) -> {
            if (error != null && isFatalError(error)) {
                // Unrecoverable: fail the job
                resultHandler.completeExceptionally(new RuntimeException(error));
            } else if (error != null || response.hasFailures()) {
                // Re-queue only the failed entries for retry
                List<MyRequestEntry> failedEntries = getFailedEntries(requestEntries, response);
                resultHandler.retryForEntries(failedEntries);
            } else {
                resultHandler.complete();
            }
        });
    }

    @Override
    protected long getSizeInBytes(MyRequestEntry requestEntry) {
        // Payload bytes plus the 8-byte timestamp
        return requestEntry.getData().getBytes(StandardCharsets.UTF_8).length + Long.BYTES;
    }
}
```
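With the writer in place, the sink plugs into a job like any other Sink. A minimal usage sketch, assuming `env` is a `StreamExecutionEnvironment`:

```java
// Attach the async sink to any DataStream<String>
env.fromElements("event-1", "event-2", "event-3")
    .sinkTo(new MyAsyncSink());
```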
Example: hybrid source that reads a file backlog first, then switches to Kafka.

```java
// Create file and Kafka sources ("path" points at the backlog files to read first)
FileSource<String> fileSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), path)
    .build();

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("events")
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
    .setStartingOffsets(OffsetsInitializer.earliest())
    .build();

// Create hybrid source that reads files first, then switches to Kafka
HybridSource<String> hybridSource = HybridSource.builder(fileSource)
    .addSource(kafkaSource)
    .build();

// Use in a DataStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromSource(
    hybridSource,
    WatermarkStrategy.noWatermarks(),
    "hybrid-source");
```

The connector base library is organized into several key architectural components:
### Async Sink Framework

Complete framework for building async sinks with batching, buffering, rate limiting, and fault tolerance.

Key APIs:

```java
public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
public interface ElementConverter<InputT, RequestEntryT>
public interface ResultHandler<RequestEntryT>
```

### Source Reader Framework

Sophisticated framework for building source readers with split management and coordination.
Key APIs:

```java
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT, SplitStateT>
public interface SplitReader<E, SplitT>
public interface RecordEmitter<E, T, SplitStateT>
```
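For orientation, a minimal RecordEmitter sketch: it forwards each fetched element downstream and records the position in the split state so it is captured at the next checkpoint. `MyRecord` and `MySplitState` are hypothetical example types, as are their accessor methods.

```java
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;

// Hypothetical MyRecord/MySplitState types; emits with an event-time timestamp
// and advances the split state for checkpointing.
public class MyRecordEmitter implements RecordEmitter<MyRecord, MyRecord, MySplitState> {
    @Override
    public void emitRecord(MyRecord element, SourceOutput<MyRecord> output, MySplitState splitState) {
        output.collect(element, element.getTimestamp()); // hypothetical timestamp accessor
        splitState.setPosition(element.getOffset());     // hypothetical position tracking
    }
}
```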
### Hybrid Source

Advanced source that can switch between multiple underlying sources with position transfer.

Key APIs:

```java
public class HybridSource<T>
public interface SourceFactory<T, SourceT, FromEnumT>
public interface SourceSwitchContext<EnumT>
```

### Rate Limiting Strategies

Pluggable strategies for controlling throughput, handling backpressure, and dynamic scaling (see the configuration examples below).
Key APIs:

```java
public interface RateLimitingStrategy
public interface ScalingStrategy<T>
public class CongestionControlRateLimitingStrategy
public class AIMDScalingStrategy
```

### Table API Integration

Base classes for integrating async sinks with Flink's Table API and SQL.
Key APIs:

```java
public abstract class AsyncDynamicTableSinkFactory
public class AsyncDynamicTableSink
public interface ConfigurationValidator
```

### DeliveryGuarantee

```java
public enum DeliveryGuarantee implements DescribedEnum {
    EXACTLY_ONCE,   // Records are delivered exactly once, even under failover
    AT_LEAST_ONCE,  // Delivery is ensured, but records may be duplicated
    NONE            // Best effort; records may be lost or duplicated
}
```
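Connectors built on this library conventionally accept the guarantee through their builder. A sketch against a hypothetical `MySink` builder (the setter name varies by connector):

```java
// Hypothetical builder; real connectors expose a similar delivery-guarantee setter.
MySink<String> sink = MySink.<String>builder()
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // no loss, duplicates possible
    .build();
```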
### RequestEntryWrapper

```java
public class RequestEntryWrapper<RequestEntryT> {
    public RequestEntryWrapper(RequestEntryT requestEntry, long size)
    public RequestEntryT getRequestEntry()
    public long getSize()
}
```

### Batch

```java
public class Batch<RequestEntryT extends Serializable> {
    public Batch(List<RequestEntryT> batchEntries, long sizeInBytes)
    public List<RequestEntryT> getBatchEntries()
    public long getSizeInBytes()
    public int getRecordCount()
}
```

### BufferedRequestState

```java
public class BufferedRequestState<RequestEntryT extends Serializable> {
    public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries)
    public List<RequestEntryWrapper<RequestEntryT>> getBufferedRequestEntries()
    public long getStateSize()
    public static <T extends Serializable> BufferedRequestState<T> emptyState()
}
```
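BufferedRequestState is what the async sink snapshots into checkpoints. A sketch of the matching state serializer for the `MyRequestEntry` type from the example above, assuming the `AsyncSinkWriterStateSerializer` base class from this package:

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;

// Serializes the payload and timestamp of each buffered MyRequestEntry.
public class MyRequestEntrySerializer extends AsyncSinkWriterStateSerializer<MyRequestEntry> {
    @Override
    protected void serializeRequestToStream(MyRequestEntry request, DataOutputStream out)
            throws IOException {
        out.writeUTF(request.getData());
        out.writeLong(request.getTimestamp());
    }

    @Override
    protected MyRequestEntry deserializeRequestFromStream(long requestSize, DataInputStream in)
            throws IOException {
        return new MyRequestEntry(in.readUTF(), in.readLong());
    }

    @Override
    public int getVersion() {
        return 1;
    }
}
```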
### RequestInfo

```java
public interface RequestInfo {
    int getBatchSize()
}
```

### ResultInfo

```java
public interface ResultInfo {
    int getFailedMessages()
    int getBatchSize()
}
```

### BasicRequestInfo

```java
public class BasicRequestInfo implements RequestInfo {
    public BasicRequestInfo(int batchSize)
    public int getBatchSize()
}
```

### BasicResultInfo

```java
public class BasicResultInfo implements ResultInfo {
    public BasicResultInfo(int failedMessages, int batchSize)
    public int getFailedMessages()
    public int getBatchSize()
}
```
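RequestInfo and ResultInfo are the callbacks that feed a rate limiter. A minimal fixed-concurrency sketch of a custom RateLimitingStrategy, assuming the strategy interface exposes `registerInFlightRequest`, `registerCompletedRequest`, `shouldBlock`, and `getMaxBatchSize`:

```java
// Blocks new requests once a fixed number is in flight; can be plugged in via
// AsyncSinkWriterConfiguration.builder().setRateLimitingStrategy(...) below.
public class FixedConcurrencyRateLimitingStrategy implements RateLimitingStrategy {
    private final int maxInFlightRequests;
    private final int maxBatchSize;
    private int inFlightRequests;

    public FixedConcurrencyRateLimitingStrategy(int maxInFlightRequests, int maxBatchSize) {
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxBatchSize = maxBatchSize;
    }

    @Override
    public void registerInFlightRequest(RequestInfo requestInfo) {
        inFlightRequests++;
    }

    @Override
    public void registerCompletedRequest(ResultInfo resultInfo) {
        inFlightRequests--;
    }

    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        return inFlightRequests >= maxInFlightRequests;
    }

    @Override
    public int getMaxBatchSize() {
        return maxBatchSize;
    }
}
```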
### RecordsWithSplitIds

```java
public interface RecordsWithSplitIds<E> {
    String nextSplit()
    E nextRecordFromSplit()
    Set<String> finishedSplits()
    void recycle()
}
```

### RecordsBySplits

```java
public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
    public static <E> RecordsBySplits<E> forRecords(Map<String, Collection<E>> recordsBySplit)
    public static <E> RecordsBySplits<E> forFinishedSplit(String splitId)
}
```
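A SplitReader returns its fetched records as RecordsWithSplitIds, typically via RecordsBySplits. A minimal blocking-fetch sketch, with `MySplit` (a `SourceSplit`), `MyRecord`, and `MyClient` as hypothetical types; SourceReaderBase drives `fetch()` on a dedicated fetcher thread, so blocking here is expected, and `wakeUp()` exists to interrupt it:

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Queue;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;

public class MySplitReader implements SplitReader<MyRecord, MySplit> {
    private final Queue<MySplit> assignedSplits = new ArrayDeque<>();
    private final MyClient client = new MyClient(); // hypothetical external client

    @Override
    public RecordsWithSplitIds<MyRecord> fetch() throws IOException {
        MySplit split = assignedSplits.peek();
        if (split == null) {
            return RecordsBySplits.forRecords(Collections.emptyMap()); // nothing assigned yet
        }
        Collection<MyRecord> records = client.poll(split); // hypothetical blocking poll
        if (records.isEmpty() && client.isExhausted(split)) {
            assignedSplits.poll();
            return RecordsBySplits.forFinishedSplit(split.splitId()); // signal completion
        }
        return RecordsBySplits.forRecords(Collections.singletonMap(split.splitId(), records));
    }

    @Override
    public void handleSplitsChanges(SplitsChange<MySplit> splitsChange) {
        if (splitsChange instanceof SplitsAddition) {
            assignedSplits.addAll(splitsChange.splits());
        }
    }

    @Override
    public void wakeUp() {
        // Interrupt a blocking poll here if the client supports it
    }

    @Override
    public void close() throws Exception {
        client.close();
    }
}
```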
Sink configuration:

```java
AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
    .setMaxBatchSize(100)                 // Max records per batch
    .setMaxBatchSizeInBytes(1024 * 1024)  // Max 1 MB per batch
    .setMaxInFlightRequests(10)           // Max concurrent requests
    .setMaxBufferedRequests(1000)         // Max queued requests
    .setMaxTimeInBufferMS(5000)           // Max 5 s buffering
    .setMaxRecordSizeInBytes(256 * 1024)  // Max 256 KB per record
    .setRequestTimeoutMS(60000)           // 60 s timeout
    .setFailOnTimeout(false)              // Retry on timeout instead of failing
    .build();
```

Rate limiting and scaling configuration:

```java
// AIMD (additive-increase, multiplicative-decrease) scaling strategy
AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)
    .setIncreaseRate(10)     // Linear increase on success
    .setDecreaseFactor(0.5)  // 50% decrease on failure
    .build();

// Congestion control rate limiting
CongestionControlRateLimitingStrategy rateLimiting =
    CongestionControlRateLimitingStrategy.builder()
        .setMaxInFlightRequests(50)
        .setInitialMaxInFlightMessages(100)
        .setScalingStrategy(scalingStrategy)
        .build();

AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
    .setMaxBatchSize(100)
    .setMaxBatchSizeInBytes(1024 * 1024)
    .setMaxInFlightRequests(50)
    .setMaxBufferedRequests(1000)
    .setMaxTimeInBufferMS(5000)
    .setMaxRecordSizeInBytes(256 * 1024)
    .setRateLimitingStrategy(rateLimiting) // Custom rate limiting
    .build();
```

Custom hybrid source switching with position transfer:

```java
HybridSource<String> hybridSource = HybridSource
    .<String, FileSourceEnumerator>builder(fileSource)
    .addSource(
        switchContext -> {
            // Get the end position from the previous source
            FileSourceEnumerator previousEnumerator = switchContext.getPreviousEnumerator();
            long endTimestamp = previousEnumerator.getEndTimestamp();
            // Configure the next source with the derived start position
            return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("events")
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .setStartingOffsets(OffsetsInitializer.timestamp(endTimestamp))
                .build();
        },
        Boundedness.CONTINUOUS_UNBOUNDED)
    .build();
```

Key methods to implement when building a connector: `submitRequestEntries` and `getSizeInBytes` on the async sink side, and `SplitReader.fetch()` on the source side.

This framework enables building production-ready Flink connectors with rate limiting, backpressure handling, state management, and fault tolerance built in.