Base classes and utilities for building Apache Flink connectors with async sink patterns, source readers, and advanced streaming capabilities
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-base@2.1.0

The Apache Flink Connector Base library provides foundational classes and utilities for building high-performance, production-ready Apache Flink connectors. It offers sophisticated async sink and source frameworks with built-in features like backpressure handling, rate limiting, checkpointing, and hybrid source switching.
Maven Coordinates:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>2.1.0</version>
</dependency>

Package: org.apache.flink.connector.base
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
import org.apache.flink.connector.base.sink.writer.BatchCreator;
import org.apache.flink.connector.base.sink.writer.RequestBuffer;
import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;

// Define your request entry type
public class MyRequestEntry implements Serializable {
private final String data;
private final long timestamp;
public MyRequestEntry(String data, long timestamp) {
this.data = data;
this.timestamp = timestamp;
}
public String getData() { return data; }
public long getTimestamp() { return timestamp; }
}
// Implement ElementConverter
public class MyElementConverter implements ElementConverter<String, MyRequestEntry> {
@Override
public MyRequestEntry apply(String element, SinkWriter.Context context) {
return new MyRequestEntry(element, context.timestamp());
}
}
// Create AsyncSink implementation
public class MyAsyncSink extends AsyncSinkBase<String, MyRequestEntry> {
public MyAsyncSink() {
super(
new MyElementConverter(), // Element converter
100, // Max batch size
10, // Max in-flight requests
1000, // Max buffered requests
1024 * 1024, // Max batch size in bytes
5000, // Max time in buffer (ms)
256 * 1024, // Max record size in bytes
60000, // Request timeout (ms)
false // Fail on timeout
);
}
@Override
public SinkWriter<String> createWriter(WriterInitContext context) throws IOException {
return new MyAsyncSinkWriter(
getElementConverter(),
context,
AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(getMaxBatchSize())
.setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
.setMaxInFlightRequests(getMaxInFlightRequests())
.setMaxBufferedRequests(getMaxBufferedRequests())
.setMaxTimeInBufferMS(getMaxTimeInBufferMS())
.setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
.build(),
Collections.emptyList()
);
}
}
// Implement AsyncSinkWriter
public class MyAsyncSinkWriter extends AsyncSinkWriter<String, MyRequestEntry> {
private final AsyncClient client;
public MyAsyncSinkWriter(
ElementConverter<String, MyRequestEntry> elementConverter,
WriterInitContext context,
AsyncSinkWriterConfiguration configuration,
Collection<BufferedRequestState<MyRequestEntry>> states) {
super(elementConverter, context, configuration, states);
this.client = new AsyncClient();
}
@Override
protected void submitRequestEntries(
List<MyRequestEntry> requestEntries,
ResultHandler<MyRequestEntry> resultHandler) {
CompletableFuture<Response> future = client.sendBatch(requestEntries);
future.whenComplete((response, error) -> {
if (error != null && isFatalError(error)) {
// Non-retriable failure: fail the writer and the job
resultHandler.completeExceptionally(new RuntimeException(error));
} else if (error != null || response.hasFailures()) {
// Transient failure: re-queue only the entries that did not succeed
List<MyRequestEntry> failedEntries = getFailedEntries(requestEntries, response);
resultHandler.retryForEntries(failedEntries);
} else {
// All entries were accepted
resultHandler.complete();
}
});
}
@Override
protected long getSizeInBytes(MyRequestEntry requestEntry) {
return requestEntry.getData().length() + 8; // Data length + timestamp
}
}

// Create file and Kafka sources
FileSource<String> fileSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), path)
.build();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("events")
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.setStartingOffsets(OffsetsInitializer.earliest())
.build();
// Create hybrid source that reads files first, then switches to Kafka
HybridSource<String> hybridSource = HybridSource.builder(fileSource)
.addSource(kafkaSource)
.build();
// Use in DataStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromSource(
hybridSource,
WatermarkStrategy.noWatermarks(),
"hybrid-source"
);

The connector base library is organized into several key architectural components:
Complete framework for building async sinks with batching, buffering, rate limiting, and fault tolerance.
Key APIs:
public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
public interface ElementConverter<InputT, RequestEntryT>
public interface ResultHandler<RequestEntryT>

Sophisticated framework for building source readers with split management and coordination.
Key APIs:
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT, SplitStateT>
public interface SplitReader<E, SplitT>
public interface RecordEmitter<E, T, SplitStateT>
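A minimal sketch of how these pieces fit together, assuming hypothetical MyRawRecord, MySplit, MySplitState, MySplitReader, and MyRecordEmitter types (Configuration, SourceReaderContext, SourceOutput, and Map come from Flink core and java.util):
// Sketch only: MyRawRecord, MySplit, MySplitState, MySplitReader and MyRecordEmitter are placeholders.
public class MySourceReader
        extends SingleThreadMultiplexSourceReaderBase<MyRawRecord, String, MySplit, MySplitState> {

    public MySourceReader(Configuration config, SourceReaderContext context) {
        // Supplier for per-fetcher split readers plus the record emitter
        super(MySplitReader::new, new MyRecordEmitter(), config, context);
    }

    @Override
    protected void onSplitFinished(Map<String, MySplitState> finishedSplitIds) {
        // A real reader might request more splits from the enumerator here
    }

    @Override
    protected MySplitState initializedState(MySplit split) {
        // Turn an assigned split into mutable per-split state
        return new MySplitState(split);
    }

    @Override
    protected MySplit toSplitType(String splitId, MySplitState splitState) {
        // Turn mutable state back into an immutable split for checkpointing
        return splitState.toSplit();
    }
}

// The emitter converts a raw fetched element into the output type and advances
// the split state so checkpoints resume from the correct position.
public class MyRecordEmitter implements RecordEmitter<MyRawRecord, String, MySplitState> {
    @Override
    public void emitRecord(MyRawRecord element, SourceOutput<String> output, MySplitState splitState) {
        output.collect(element.getValue(), element.getTimestamp());
        splitState.setCurrentOffset(element.getOffset() + 1);
    }
}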
Advanced source that can switch between multiple underlying sources with position transfer.
Key APIs:
public class HybridSource<T>
public interface SourceFactory<T, SourceT, FromEnumT>
public interface SourceSwitchContext<EnumT>

Pluggable strategies for controlling throughput, handling backpressure, and dynamic scaling.
Key APIs:
public interface RateLimitingStrategy
public interface ScalingStrategy<T>
public class CongestionControlRateLimitingStrategy
public class AIMDScalingStrategy

Base classes for integrating async sinks with Flink's Table API and SQL.
Key APIs:
public abstract class AsyncDynamicTableSinkFactory
public class AsyncDynamicTableSink
public interface ConfigurationValidator

DeliveryGuarantee
public enum DeliveryGuarantee implements DescribedEnum {
EXACTLY_ONCE, // Records delivered exactly once, even under failover
AT_LEAST_ONCE, // Records ensured delivery but may be duplicated
NONE // Best effort delivery, may lose or duplicate records
}
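A delivery guarantee is typically selected when configuring a concrete sink. The snippet below is only an illustration and assumes the KafkaSink builder from flink-connector-kafka, which accepts this enum:
// Illustrative only: KafkaSink and its serialization schema come from flink-connector-kafka.
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setTopic("events-out")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // duplicates possible, no data loss
        .build();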
RequestEntryWrapper
public class RequestEntryWrapper<RequestEntryT> {
public RequestEntryWrapper(RequestEntryT requestEntry, long size)
public RequestEntryT getRequestEntry()
public long getSize()
}

Batch
public class Batch<RequestEntryT extends Serializable> {
public Batch(List<RequestEntryT> batchEntries, long sizeInBytes)
public List<RequestEntryT> getBatchEntries()
public long getSizeInBytes()
public int getRecordCount()
}

BufferedRequestState
public class BufferedRequestState<RequestEntryT extends Serializable> {
public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries)
public List<RequestEntryWrapper<RequestEntryT>> getBufferedRequestEntries()
public long getStateSize()
public static <T extends Serializable> BufferedRequestState<T> emptyState()
}

RequestInfo
public interface RequestInfo {
int getBatchSize()
}

ResultInfo
public interface ResultInfo {
int getFailedMessages()
int getBatchSize()
}

BasicRequestInfo
public class BasicRequestInfo implements RequestInfo {
public BasicRequestInfo(int batchSize)
public int getBatchSize()
}

BasicResultInfo
public class BasicResultInfo implements ResultInfo {
public BasicResultInfo(int failedMessages, int batchSize)
public int getFailedMessages()
public int getBatchSize()
}

RecordsWithSplitIds
public interface RecordsWithSplitIds<E> {
String nextSplit()
E nextRecordFromSplit()
Set<String> finishedSplits()
void recycle()
}

RecordsBySplits
public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
public static <E> RecordsBySplits<E> forRecords(Map<String, Collection<E>> recordsBySplit)
public static <E> RecordsBySplits<E> forFinishedSplit(String splitId)
}
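A SplitReader typically returns its fetched records as a RecordsBySplits grouped by split id. The sketch below assumes hypothetical MyRawRecord and MySplit types and a hypothetical pollRecords() I/O helper (Queue, ArrayDeque, Collections, and IOException come from the JDK):
// Minimal SplitReader sketch; MyRawRecord, MySplit and pollRecords() are placeholders.
public class MySplitReader implements SplitReader<MyRawRecord, MySplit> {
    private final Queue<MySplit> assignedSplits = new ArrayDeque<>();

    @Override
    public RecordsWithSplitIds<MyRawRecord> fetch() throws IOException {
        MySplit split = assignedSplits.peek();
        if (split == null) {
            // Nothing assigned yet: return an empty batch
            return new RecordsBySplits<>(Collections.emptyMap(), Collections.emptySet());
        }
        Collection<MyRawRecord> records = pollRecords(split); // hypothetical blocking I/O call
        Set<String> finished = Collections.emptySet();
        if (split.isFinished()) {
            assignedSplits.poll();
            finished = Collections.singleton(split.splitId());
        }
        // Records are grouped by the split they came from
        return new RecordsBySplits<>(Collections.singletonMap(split.splitId(), records), finished);
    }

    @Override
    public void handleSplitsChanges(SplitsChange<MySplit> splitsChanges) {
        if (splitsChanges instanceof SplitsAddition) {
            assignedSplits.addAll(splitsChanges.splits());
        }
    }

    @Override
    public void wakeUp() {
        // Interrupt a blocking fetch() if needed
    }

    @Override
    public void close() throws Exception {
        // Release client resources
    }
}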
AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(100) // Max records per batch
.setMaxBatchSizeInBytes(1024 * 1024) // Max 1MB per batch
.setMaxInFlightRequests(10) // Max concurrent requests
.setMaxBufferedRequests(1000) // Max queued requests
.setMaxTimeInBufferMS(5000) // Max 5s buffering
.setMaxRecordSizeInBytes(256 * 1024) // Max 256KB per record
.setRequestTimeoutMS(60000) // 60s timeout
.setFailOnTimeout(false) // Retry on timeout
.build();

// AIMD scaling strategy
AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)
.setIncreaseRate(10) // Linear increase
.setDecreaseFactor(0.5) // 50% decrease on failure
.build();
// Congestion control rate limiting
CongestionControlRateLimitingStrategy rateLimiting =
CongestionControlRateLimitingStrategy.builder()
.setMaxInFlightRequests(50)
.setInitialMaxInFlightMessages(100)
.setScalingStrategy(scalingStrategy)
.build();
AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(100)
.setMaxBatchSizeInBytes(1024 * 1024)
.setMaxInFlightRequests(50)
.setMaxBufferedRequests(1000)
.setMaxTimeInBufferMS(5000)
.setMaxRecordSizeInBytes(256 * 1024)
.setRateLimitingStrategy(rateLimiting) // Custom rate limiting
.build();

HybridSource<String> hybridSource = HybridSource
.<String, FileSourceEnumerator>builder(fileSource)
.addSource(
switchContext -> {
// Get end position from previous source
FileSourceEnumerator previousEnumerator = switchContext.getPreviousEnumerator();
long endTimestamp = previousEnumerator.getEndTimestamp();
// Configure next source with derived start position
return KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("events")
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.setStartingOffsets(OffsetsInitializer.timestamp(endTimestamp))
.build();
},
Boundedness.CONTINUOUS_UNBOUNDED
)
.build();

The main extension points connector implementations override are submitRequestEntries, getSizeInBytes, and SplitReader.fetch().

This comprehensive framework enables building production-ready Flink connectors with sophisticated features like rate limiting, backpressure handling, state management, and fault tolerance built in.