Base classes and utilities for building Apache Flink connectors with async sink patterns, source readers, and advanced streaming capabilities
---
The Async Sink Framework provides a complete solution for building high-performance, fault-tolerant sinks that integrate with asynchronous destination APIs. It handles batching, buffering, rate limiting, retry logic, and state management automatically.
The foundation class for all async sink implementations.
// Foundation class for all async sinks: captures the element converter plus every
// batching, buffering, and timeout limit that a concrete sink forwards to its writer.
// NOTE(review): method bodies are elided in this reference listing.
@PublicEvolving
public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
implements SupportsWriterState<InputT, BufferedRequestState<RequestEntryT>>, Sink<InputT> {
// Stores the sink-wide limits; subclasses read them back via the getters below
// (typically inside createWriter()).
protected AsyncSinkBase(
ElementConverter<InputT, RequestEntryT> elementConverter,
int maxBatchSize, // max number of entries per batch sent to the destination
int maxInFlightRequests, // max concurrent async requests
int maxBufferedRequests, // capacity of the internal request buffer
long maxBatchSizeInBytes, // byte-size cap for a single batch
long maxTimeInBufferMS, // flush deadline: max time an entry may sit buffered
long maxRecordSizeInBytes, // upper bound for a single record's size
long requestTimeoutMS, // timeout applied to each async request
boolean failOnTimeout) // whether a request timeout is treated as fatal
// Accessors exposing the configured limits to subclasses.
protected ElementConverter<InputT, RequestEntryT> getElementConverter()
protected int getMaxBatchSize()
protected int getMaxInFlightRequests()
protected int getMaxBufferedRequests()
protected long getMaxBatchSizeInBytes()
protected long getMaxTimeInBufferMS()
protected long getMaxRecordSizeInBytes()
protected long getRequestTimeoutMS()
protected boolean getFailOnTimeout()
}The core writer that handles the async sink logic.
// Core async writer: buffers converted entries, forms batches, submits them
// asynchronously, and snapshots unsent entries for checkpointing.
// NOTE(review): method bodies are elided in this reference listing.
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
implements StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
// Constructor
// Restores previously checkpointed entries from `states`; `batchCreator` and
// `bufferedRequestEntries` are pluggable batching/buffering strategies.
public AsyncSinkWriter(
ElementConverter<InputT, RequestEntryT> elementConverter,
WriterInitContext context,
AsyncSinkWriterConfiguration configuration,
Collection<BufferedRequestState<RequestEntryT>> states,
BatchCreator<RequestEntryT> batchCreator,
RequestBuffer<RequestEntryT> bufferedRequestEntries)
// Abstract methods to implement
// Submit one batch to the destination; the outcome must be reported through
// resultHandler (complete / completeExceptionally / retryForEntries).
protected abstract void submitRequestEntries(
List<RequestEntryT> requestEntries,
ResultHandler<RequestEntryT> resultHandler)
// Per-entry size estimate used to enforce the byte-based batch limits.
protected abstract long getSizeInBytes(RequestEntryT requestEntry)
// Public interface methods
public void write(InputT element, Context context) throws IOException, InterruptedException
public void flush(boolean flush) throws InterruptedException
public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId)
public void close()
// Protected helper methods
// Consumer used to surface fatal (non-retryable) exceptions to the runtime.
protected Consumer<Exception> getFatalExceptionCons()
}Transforms stream elements into request entries for the destination.
// Maps each stream element to the request-entry type sent to the destination.
// Implementations are serialized into the job graph, hence extends Serializable.
@PublicEvolving
public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
RequestEntryT apply(InputT element, SinkWriter.Context context)
// Optional hook invoked once before writing starts (e.g. to build clients).
default void open(WriterInitContext context) {
// No-op default implementation
}
}Handles the results of async requests with support for retries and fatal errors.
// Callback handed to submitRequestEntries(); exactly one outcome must be
// signalled per submitted batch.
@PublicEvolving
public interface ResultHandler<RequestEntryT> {
void complete() // entire batch succeeded
void completeExceptionally(Exception e) // fatal, non-retryable failure
void retryForEntries(List<RequestEntryT> requestEntriesToRetry) // re-queue the given subset
}Pluggable interface for controlling how request entries are batched.
// Strategy for drawing the next batch out of the request buffer.
@PublicEvolving
public interface BatchCreator<RequestEntryT extends Serializable> {
// Removes entries from `bufferedRequestEntries` (respecting the limits carried
// by `requestInfo`) and returns them as the next batch to submit.
Batch<RequestEntryT> createNextBatch(
RequestInfo requestInfo,
RequestBuffer<RequestEntryT> bufferedRequestEntries)
}Flexible buffer interface for managing request entries.
// Queue-like buffer of pending entries; insertAtHead lets retried entries be
// re-queued ahead of newly written records.
@PublicEvolving
public interface RequestBuffer<RequestEntryT extends Serializable> {
void add(RequestEntryWrapper<RequestEntryT> entry, boolean insertAtHead)
RequestEntryWrapper<RequestEntryT> poll() // remove and return the head entry
RequestEntryWrapper<RequestEntryT> peek() // inspect the head without removing it
boolean isEmpty()
int size()
Collection<RequestEntryWrapper<RequestEntryT>> getBufferedState() // snapshot for checkpoints
long totalSizeInBytes()
}
/**
 * Example sink that posts JSON records to an HTTP endpoint via {@code AsyncSinkBase}.
 *
 * <p>Fix over the original: {@code maxBatchSize} is validated before it reaches the
 * base-class configuration, so a non-positive value fails fast, and the derived
 * {@code maxBatchSize * 10} buffer capacity uses {@link Math#multiplyExact(int, int)}
 * so it can no longer silently wrap around on overflow.
 */
public class HttpAsyncSink extends AsyncSinkBase<JsonNode, HttpRequestEntry> {

    /**
     * @param endpoint     destination URL all records are posted to
     * @param maxBatchSize maximum records per HTTP batch; must be positive
     * @throws IllegalArgumentException if {@code maxBatchSize <= 0}
     * @throws ArithmeticException      if the derived buffer capacity overflows int
     */
    public HttpAsyncSink(String endpoint, int maxBatchSize) {
        super(
                new JsonElementConverter(endpoint),          // Element converter
                requirePositive(maxBatchSize),               // Max batch size
                10,                                          // Max in-flight requests
                Math.multiplyExact(maxBatchSize, 10),        // Max buffered requests
                1024 * 1024,                                 // Max batch size in bytes (1MB)
                5000,                                        // Max time in buffer (5s)
                256 * 1024,                                  // Max record size (256KB)
                60000,                                       // Request timeout (60s)
                false                                        // Don't fail on timeout
        );
    }

    /** Rejects non-positive sizes; usable inside the {@code super(...)} call. */
    private static int requirePositive(int value) {
        if (value <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive, was: " + value);
        }
        return value;
    }

    /** Creates a fresh writer with no restored state. */
    @Override
    public SinkWriter<JsonNode> createWriter(WriterInitContext context) throws IOException {
        return new HttpAsyncSinkWriter(
                getElementConverter(),
                context,
                createWriterConfiguration(),
                Collections.emptyList());
    }

    /** Copies the limits captured by the base class into the writer configuration. */
    private AsyncSinkWriterConfiguration createWriterConfiguration() {
        return AsyncSinkWriterConfiguration.builder()
                .setMaxBatchSize(getMaxBatchSize())
                .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
                .setMaxInFlightRequests(getMaxInFlightRequests())
                .setMaxBufferedRequests(getMaxBufferedRequests())
                .setMaxTimeInBufferMS(getMaxTimeInBufferMS())
                .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
                .setRequestTimeoutMS(getRequestTimeoutMS())
                .setFailOnTimeout(getFailOnTimeout())
                .build();
    }
}
// Element converter implementation
/**
 * Converts Jackson {@code JsonNode} records into {@code HttpRequestEntry} requests.
 *
 * <p>Fixes over the original: the endpoint is null-checked at construction time
 * (converters are serialized into the job graph, so a null endpoint would otherwise
 * only surface at runtime on the task managers), and an explicit
 * {@code serialVersionUID} is declared because {@code ElementConverter} extends
 * {@code Serializable}.
 */
public class JsonElementConverter implements ElementConverter<JsonNode, HttpRequestEntry> {

    private static final long serialVersionUID = 1L;

    /** Destination URL attached to every generated request entry. */
    private final String endpoint;

    /** @throws NullPointerException if {@code endpoint} is null */
    public JsonElementConverter(String endpoint) {
        if (endpoint == null) {
            throw new NullPointerException("endpoint must not be null");
        }
        this.endpoint = endpoint;
    }

    /** Serializes the element to JSON text and wraps it with timestamp/id metadata. */
    @Override
    public HttpRequestEntry apply(JsonNode element, SinkWriter.Context context) {
        return new HttpRequestEntry(
                endpoint,
                element.toString(),
                context.timestamp(),
                generateRequestId());
    }

    /** Random UUID used to correlate a request through logs and retries. */
    private String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}
// Request entry implementation
/**
 * Immutable, serializable request entry that the async sink buffers and checkpoints.
 *
 * <p>Fix over the original: an explicit {@code serialVersionUID} is declared —
 * instances are written into checkpoint state, and relying on the implicit UID
 * would break state compatibility whenever the class is recompiled after an
 * unrelated change. A payload-free {@link #toString()} is added for diagnostics.
 */
public class HttpRequestEntry implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String endpoint;  // destination URL
    private final String payload;   // JSON body to send
    private final long timestamp;   // element timestamp from the sink context
    private final String requestId; // correlation id for logging/retries

    public HttpRequestEntry(String endpoint, String payload, long timestamp, String requestId) {
        this.endpoint = endpoint;
        this.payload = payload;
        this.timestamp = timestamp;
        this.requestId = requestId;
    }

    // Getters
    public String getEndpoint() { return endpoint; }
    public String getPayload() { return payload; }
    public long getTimestamp() { return timestamp; }
    public String getRequestId() { return requestId; }

    /** Diagnostic form; the payload is omitted to keep logs small and secret-free. */
    @Override
    public String toString() {
        return "HttpRequestEntry{requestId=" + requestId
                + ", endpoint=" + endpoint
                + ", timestamp=" + timestamp + "}";
    }
}
// Async sink writer implementation
/**
 * Async sink writer that submits batches of {@code HttpRequestEntry} to an HTTP
 * endpoint and maps the batch response onto complete / retry / fatal outcomes.
 *
 * <p>Fixes over the original:
 * <ul>
 *   <li>an empty batch completes immediately instead of crashing in
 *       {@code createBatchRequest} on {@code entries.get(0)};</li>
 *   <li>a partial failure whose reported indices are all invalid now completes
 *       instead of issuing an empty retry;</li>
 *   <li>{@code extractFailedEntries} rejects negative and null indices as well as
 *       out-of-range ones.</li>
 * </ul>
 */
public class HttpAsyncSinkWriter extends AsyncSinkWriter<JsonNode, HttpRequestEntry> {

    private final HttpAsyncClient httpClient;
    // NOTE(review): never read in this class; retained in case payload
    // re-serialization is needed — confirm and remove if not.
    private final ObjectMapper objectMapper;

    public HttpAsyncSinkWriter(
            ElementConverter<JsonNode, HttpRequestEntry> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<HttpRequestEntry>> states) {
        super(elementConverter, context, configuration, states);
        this.httpClient = new HttpAsyncClient();
        this.objectMapper = new ObjectMapper();
    }

    /** Submits one batch; the result handler is signalled exactly once. */
    @Override
    protected void submitRequestEntries(
            List<HttpRequestEntry> requestEntries,
            ResultHandler<HttpRequestEntry> resultHandler) {
        if (requestEntries.isEmpty()) {
            // Nothing to send; also avoids entries.get(0) in createBatchRequest.
            resultHandler.complete();
            return;
        }
        BatchHttpRequest batchRequest = createBatchRequest(requestEntries);
        CompletableFuture<BatchHttpResponse> future = httpClient.submitBatch(batchRequest);
        future.whenComplete((response, error) -> {
            if (error != null) {
                if (isFatalError(error)) {
                    resultHandler.completeExceptionally(new RuntimeException(
                            "Fatal error in HTTP request", error));
                } else {
                    // Transient transport error: retry the whole batch.
                    resultHandler.retryForEntries(requestEntries);
                }
            } else if (response.hasFailures()) {
                // Partial failure - retry only the entries the response flagged.
                List<HttpRequestEntry> failedEntries =
                        extractFailedEntries(requestEntries, response);
                if (failedEntries.isEmpty()) {
                    // Every reported index was invalid; treat as success rather
                    // than issuing an empty retry.
                    resultHandler.complete();
                } else {
                    resultHandler.retryForEntries(failedEntries);
                }
            } else {
                // Complete success
                resultHandler.complete();
            }
        });
    }

    /** Rough wire size: payload + endpoint UTF-8 bytes plus a flat header allowance. */
    @Override
    protected long getSizeInBytes(HttpRequestEntry requestEntry) {
        return requestEntry.getPayload().getBytes(StandardCharsets.UTF_8).length +
                requestEntry.getEndpoint().getBytes(StandardCharsets.UTF_8).length +
                100; // Approximate header overhead
    }

    /** Builds one batch request; all entries share the first entry's endpoint. */
    private BatchHttpRequest createBatchRequest(List<HttpRequestEntry> entries) {
        List<String> payloads = entries.stream()
                .map(HttpRequestEntry::getPayload)
                .collect(Collectors.toList());
        return new BatchHttpRequest(
                entries.get(0).getEndpoint(),
                payloads,
                generateBatchId(),
                System.currentTimeMillis());
    }

    /** Auth/config errors cannot succeed on retry and are therefore fatal. */
    private boolean isFatalError(Throwable error) {
        return error instanceof AuthenticationException ||
                error instanceof AuthorizationException ||
                error instanceof MalformedURLException;
    }

    /** Maps the response's failed indices back onto entries, skipping invalid indices. */
    private List<HttpRequestEntry> extractFailedEntries(
            List<HttpRequestEntry> originalEntries,
            BatchHttpResponse response) {
        List<HttpRequestEntry> failed = new ArrayList<>();
        for (Integer index : response.getFailedIndices()) {
            if (index != null && index >= 0 && index < originalEntries.size()) {
                failed.add(originalEntries.get(index));
            }
        }
        return failed;
    }

    private String generateBatchId() {
        return UUID.randomUUID().toString();
    }
}
/**
 * Batch creator that caps each batch by both entry count (taken from the
 * {@code RequestInfo}) and a fixed byte budget. The head entry is always taken
 * when the batch is still empty, so a single oversized record cannot wedge the
 * buffer.
 */
public class SizeLimitedBatchCreator implements BatchCreator<HttpRequestEntry> {

    /** Upper bound, in bytes, for the sum of entry sizes in one batch. */
    private final long maxBatchSizeInBytes;

    public SizeLimitedBatchCreator(long maxBatchSizeInBytes) {
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
    }

    /** Drains entries from the buffer until the count or byte limit is hit. */
    @Override
    public Batch<HttpRequestEntry> createNextBatch(
            RequestInfo requestInfo,
            RequestBuffer<HttpRequestEntry> bufferedRequestEntries) {
        final int countLimit = requestInfo.getBatchSize();
        final List<HttpRequestEntry> selected = new ArrayList<>();
        long accumulatedBytes = 0L;
        for (;;) {
            // Stop once the count limit is reached or the buffer runs dry.
            if (selected.size() >= countLimit || bufferedRequestEntries.isEmpty()) {
                break;
            }
            final RequestEntryWrapper<HttpRequestEntry> head = bufferedRequestEntries.peek();
            if (head == null) {
                break;
            }
            // Respect the byte budget — but never leave the batch empty.
            final boolean overBudget = accumulatedBytes + head.getSize() > maxBatchSizeInBytes;
            if (overBudget && !selected.isEmpty()) {
                break;
            }
            // Commit: remove from the buffer and record it in the batch.
            bufferedRequestEntries.poll();
            selected.add(head.getRequestEntry());
            accumulatedBytes += head.getSize();
        }
        return new Batch<>(selected, accumulatedBytes);
    }
}
/**
 * Writer variant that delegates fatal-error detection to a
 * {@code FatalExceptionClassifier} chain and makes per-entry, status-code-aware
 * retry decisions on partial failures.
 *
 * <p>Fixes over the original: the class referenced {@code httpClient} and
 * {@code createBatchRequest(...)} without declaring either, so it could not
 * compile — both are now defined here. An empty batch also completes immediately
 * instead of failing on {@code entries.get(0)}.
 */
public class AdvancedHttpAsyncSinkWriter extends AsyncSinkWriter<JsonNode, HttpRequestEntry> {

    /** Client used to submit batches; was missing from the original listing. */
    private final HttpAsyncClient httpClient;

    private final FatalExceptionClassifier fatalExceptionClassifier;

    public AdvancedHttpAsyncSinkWriter(
            ElementConverter<JsonNode, HttpRequestEntry> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<HttpRequestEntry>> states) {
        super(elementConverter, context, configuration, states);
        this.httpClient = new HttpAsyncClient();
        // Chain of classifiers: a recognised root cause is reported as fatal.
        this.fatalExceptionClassifier = FatalExceptionClassifier.createChain(
                FatalExceptionClassifier.withRootCauseOfType(
                        AuthenticationException.class,
                        cause -> new RuntimeException("Authentication failed", cause)),
                FatalExceptionClassifier.withRootCauseOfType(
                        IllegalArgumentException.class,
                        cause -> new RuntimeException("Invalid request configuration", cause)));
    }

    /** Submits one batch; the result handler is signalled exactly once. */
    @Override
    protected void submitRequestEntries(
            List<HttpRequestEntry> requestEntries,
            ResultHandler<HttpRequestEntry> resultHandler) {
        if (requestEntries.isEmpty()) {
            // Nothing to send; also avoids entries.get(0) in createBatchRequest.
            resultHandler.complete();
            return;
        }
        CompletableFuture<BatchHttpResponse> future =
                httpClient.submitBatch(createBatchRequest(requestEntries));
        future.whenComplete((response, error) -> {
            if (error != null) {
                // When the classifier deems the error fatal it invokes the
                // fatal-exception consumer itself; we must not also retry.
                if (!fatalExceptionClassifier.isFatal(error, getFatalExceptionCons())) {
                    // Retryable error - retry all entries
                    resultHandler.retryForEntries(requestEntries);
                }
            } else {
                handleResponse(requestEntries, response, resultHandler);
            }
        });
    }

    /** Maps a completed HTTP response onto complete / retry outcomes. */
    private void handleResponse(
            List<HttpRequestEntry> requestEntries,
            BatchHttpResponse response,
            ResultHandler<HttpRequestEntry> resultHandler) {
        if (response.isSuccess()) {
            resultHandler.complete();
        } else if (response.hasPartialFailures()) {
            List<HttpRequestEntry> failedEntries = new ArrayList<>();
            for (int i = 0; i < requestEntries.size(); i++) {
                if (response.isFailed(i)) {
                    // Only retryable per-entry statuses are re-queued;
                    // non-retryable individual failures are dropped (logged elsewhere).
                    if (isRetryableStatusCode(response.getStatusCode(i))) {
                        failedEntries.add(requestEntries.get(i));
                    }
                }
            }
            if (!failedEntries.isEmpty()) {
                resultHandler.retryForEntries(failedEntries);
            } else {
                resultHandler.complete();
            }
        } else {
            // Whole-batch failure: retry everything if the status is transient.
            if (isRetryableStatusCode(response.getOverallStatusCode())) {
                resultHandler.retryForEntries(requestEntries);
            } else {
                // Non-retryable failure - complete (entries lost, logged elsewhere)
                resultHandler.complete();
            }
        }
    }

    /** Builds one batch request; all entries share the first entry's endpoint. */
    private BatchHttpRequest createBatchRequest(List<HttpRequestEntry> entries) {
        List<String> payloads = new ArrayList<>(entries.size());
        for (HttpRequestEntry entry : entries) {
            payloads.add(entry.getPayload());
        }
        return new BatchHttpRequest(
                entries.get(0).getEndpoint(),
                payloads,
                UUID.randomUUID().toString(),
                System.currentTimeMillis());
    }

    /** 5xx plus 408/429 are transient conditions worth retrying. */
    private boolean isRetryableStatusCode(int statusCode) {
        return statusCode >= 500 ||
                statusCode == 408 || // Request Timeout
                statusCode == 429;   // Too Many Requests
    }
}AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
// Baseline writer configuration: every limit set explicitly. Values shown are
// illustrative for a moderate-throughput HTTP destination.
.setMaxBatchSize(100) // Records per batch
.setMaxBatchSizeInBytes(1024 * 1024) // Bytes per batch (1MB)
.setMaxInFlightRequests(10) // Concurrent requests
.setMaxBufferedRequests(1000) // Queue capacity
.setMaxTimeInBufferMS(5000) // Max buffering delay (5s)
.setMaxRecordSizeInBytes(256 * 1024) // Max record size (256KB)
.setRequestTimeoutMS(30000) // Request timeout (30s)
.setFailOnTimeout(false) // Retry on timeout
.build();// Create AIMD scaling strategy
// AIMD (additive-increase / multiplicative-decrease): grow the in-flight budget
// additively on success and cut it multiplicatively on failure — TCP-style
// adaptation to destination backpressure.
AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)
.setIncreaseRate(5) // Increase by 5 per success
.setDecreaseFactor(0.7) // Decrease by 30% on failure
.build();
// Create congestion control rate limiting
CongestionControlRateLimitingStrategy rateLimiting =
CongestionControlRateLimitingStrategy.builder()
.setMaxInFlightRequests(50)
.setInitialMaxInFlightMessages(100)
.setScalingStrategy(scalingStrategy)
.build();
// Apply to configuration
AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(200)
.setMaxBatchSizeInBytes(2 * 1024 * 1024) // 2MB
.setMaxInFlightRequests(50)
.setMaxBufferedRequests(2000)
.setMaxTimeInBufferMS(3000) // 3s
.setMaxRecordSizeInBytes(512 * 1024) // 512KB
.setRateLimitingStrategy(rateLimiting) // Custom rate limiting
.build();The async sink framework provides automatic state management for fault tolerance:
// Checkpointed snapshot of the writer's unsent request entries.
// NOTE(review): method bodies are elided in this reference listing.
@PublicEvolving
public class BufferedRequestState<RequestEntryT extends Serializable> {
public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries)
public BufferedRequestState(RequestBuffer<RequestEntryT> requestBuffer) // snapshot a live buffer
public List<RequestEntryWrapper<RequestEntryT>> getBufferedRequestEntries()
public long getStateSize() // total size in bytes of the buffered entries
public static <T extends Serializable> BufferedRequestState<T> emptyState()
}public class HttpRequestEntryStateSerializer
extends AsyncSinkWriterStateSerializer<HttpRequestEntry> {
// Field order here defines the on-wire checkpoint format and must match
// deserializeRequestFromStream exactly; changing it breaks state compatibility.
@Override
protected void serializeRequestToStream(
HttpRequestEntry request,
DataOutputStream out) throws IOException {
out.writeUTF(request.getEndpoint());
out.writeUTF(request.getPayload());
out.writeLong(request.getTimestamp());
out.writeUTF(request.getRequestId());
}
// Reads the fields back in the exact order they were written; requestSize is
// the framed byte length supplied by the base serializer (unused here).
@Override
protected HttpRequestEntry deserializeRequestFromStream(
long requestSize,
DataInputStream in) throws IOException {
String endpoint = in.readUTF();
String payload = in.readUTF();
long timestamp = in.readLong();
String requestId = in.readUTF();
return new HttpRequestEntry(endpoint, payload, timestamp, requestId);
}
}
Best practices: provide accurate getSizeInBytes() calculations, and use a FatalExceptionClassifier for sophisticated error handling. The async sink framework provides a robust foundation for building production-ready sinks with sophisticated features like automatic batching, rate limiting, fault tolerance, and state management.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-base