
tessl/maven-org-apache-flink--flink-connector-base

Base classes and utilities for building Apache Flink connectors with async sink patterns, source readers, and advanced streaming capabilities


Async Sink Framework

The Async Sink Framework provides a complete solution for building high-performance, fault-tolerant sinks that integrate with asynchronous destination APIs. It handles batching, buffering, rate limiting, retry logic, and state management automatically.

Core Components

AsyncSinkBase

The foundation class for all async sink implementations.

@PublicEvolving
public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
        implements SupportsWriterState<InputT, BufferedRequestState<RequestEntryT>>, Sink<InputT> {
    
    protected AsyncSinkBase(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            long requestTimeoutMS,
            boolean failOnTimeout);

    protected ElementConverter<InputT, RequestEntryT> getElementConverter();
    protected int getMaxBatchSize();
    protected int getMaxInFlightRequests();
    protected int getMaxBufferedRequests();
    protected long getMaxBatchSizeInBytes();
    protected long getMaxTimeInBufferMS();
    protected long getMaxRecordSizeInBytes();
    protected long getRequestTimeoutMS();
    protected boolean getFailOnTimeout();
}

AsyncSinkWriter

The core writer that handles the async sink logic.

@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
    
    // Constructor
    public AsyncSinkWriter(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<RequestEntryT>> states,
            BatchCreator<RequestEntryT> batchCreator,
            RequestBuffer<RequestEntryT> bufferedRequestEntries);

    // Abstract methods to implement
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries,
            ResultHandler<RequestEntryT> resultHandler);

    protected abstract long getSizeInBytes(RequestEntryT requestEntry);

    // Public interface methods
    public void write(InputT element, Context context) throws IOException, InterruptedException;
    public void flush(boolean flush) throws InterruptedException;
    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId);
    public void close();

    // Protected helper methods
    protected Consumer<Exception> getFatalExceptionCons();
}

ElementConverter

Transforms stream elements into request entries for the destination.

@PublicEvolving
public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
    RequestEntryT apply(InputT element, SinkWriter.Context context);
    
    default void open(WriterInitContext context) {
        // No-op default implementation
    }
}

ResultHandler

Handles the results of async requests with support for retries and fatal errors.

@PublicEvolving
public interface ResultHandler<RequestEntryT> {
    void complete();
    void completeExceptionally(Exception e);
    void retryForEntries(List<RequestEntryT> requestEntriesToRetry);
}

BatchCreator

Pluggable interface for controlling how request entries are batched.

@PublicEvolving
public interface BatchCreator<RequestEntryT extends Serializable> {
    Batch<RequestEntryT> createNextBatch(
            RequestInfo requestInfo,
            RequestBuffer<RequestEntryT> bufferedRequestEntries);
}

RequestBuffer

Flexible buffer interface for managing request entries.

@PublicEvolving
public interface RequestBuffer<RequestEntryT extends Serializable> {
    void add(RequestEntryWrapper<RequestEntryT> entry, boolean insertAtHead);
    RequestEntryWrapper<RequestEntryT> poll();
    RequestEntryWrapper<RequestEntryT> peek();
    boolean isEmpty();
    int size();
    Collection<RequestEntryWrapper<RequestEntryT>> getBufferedState();
    long totalSizeInBytes();
}
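
The contract above is straightforward to satisfy with a deque: retried entries go back to the head so they are sent before newer writes, and the buffer tracks its total byte size incrementally. Below is a minimal, self-contained sketch of that idea. Note that `Entry` is a simplified local stand-in for Flink's `RequestEntryWrapper`, used here only so the snippet compiles on its own; it is not the Flink type.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;

// Simplified stand-in for Flink's RequestEntryWrapper: an entry plus its byte size.
final class Entry<T> {
    final T value;
    final long size;
    Entry(T value, long size) { this.value = value; this.size = size; }
}

// Deque-backed buffer mirroring the RequestBuffer contract:
// insertAtHead re-queues retried entries ahead of new ones.
final class DequeBuffer<T> {
    private final Deque<Entry<T>> deque = new ArrayDeque<>();
    private long totalSizeInBytes = 0;

    void add(Entry<T> entry, boolean insertAtHead) {
        if (insertAtHead) {
            deque.addFirst(entry);   // retries jump the queue
        } else {
            deque.addLast(entry);    // new writes go to the back
        }
        totalSizeInBytes += entry.size;
    }

    Entry<T> poll() {
        Entry<T> e = deque.pollFirst();
        if (e != null) {
            totalSizeInBytes -= e.size;   // keep the running size in sync
        }
        return e;
    }

    Entry<T> peek() { return deque.peekFirst(); }
    boolean isEmpty() { return deque.isEmpty(); }
    int size() { return deque.size(); }
    long totalSizeInBytes() { return totalSizeInBytes; }
    Collection<Entry<T>> getBufferedState() { return deque; }
}
```

Maintaining `totalSizeInBytes` incrementally keeps `totalSizeInBytes()` O(1), which matters because the writer consults it on every flush decision.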

Implementation Examples

Complete Async Sink Implementation

public class HttpAsyncSink extends AsyncSinkBase<JsonNode, HttpRequestEntry> {
    
    public HttpAsyncSink(String endpoint, int maxBatchSize) {
        super(
            new JsonElementConverter(endpoint),  // Element converter
            maxBatchSize,                        // Max batch size
            10,                                 // Max in-flight requests
            maxBatchSize * 10,                  // Max buffered requests
            1024 * 1024,                       // Max batch size in bytes (1MB)
            5000,                              // Max time in buffer (5s)
            256 * 1024,                        // Max record size (256KB)
            60000,                             // Request timeout (60s)
            false                              // Don't fail on timeout
        );
    }

    @Override
    public SinkWriter<JsonNode> createWriter(WriterInitContext context) throws IOException {
        return new HttpAsyncSinkWriter(
            getElementConverter(),
            context,
            createWriterConfiguration(),
            Collections.emptyList()
        );
    }
    
    private AsyncSinkWriterConfiguration createWriterConfiguration() {
        return AsyncSinkWriterConfiguration.builder()
            .setMaxBatchSize(getMaxBatchSize())
            .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
            .setMaxInFlightRequests(getMaxInFlightRequests())
            .setMaxBufferedRequests(getMaxBufferedRequests())
            .setMaxTimeInBufferMS(getMaxTimeInBufferMS())
            .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
            .setRequestTimeoutMS(getRequestTimeoutMS())
            .setFailOnTimeout(getFailOnTimeout())
            .build();
    }
}

// Element converter implementation
public class JsonElementConverter implements ElementConverter<JsonNode, HttpRequestEntry> {
    private final String endpoint;
    
    public JsonElementConverter(String endpoint) {
        this.endpoint = endpoint;
    }
    
    @Override
    public HttpRequestEntry apply(JsonNode element, SinkWriter.Context context) {
        return new HttpRequestEntry(
            endpoint,
            element.toString(),
            context.timestamp(),
            generateRequestId()
        );
    }
    
    private String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}

// Request entry implementation
public class HttpRequestEntry implements Serializable {
    private final String endpoint;
    private final String payload;
    private final long timestamp;
    private final String requestId;
    
    public HttpRequestEntry(String endpoint, String payload, long timestamp, String requestId) {
        this.endpoint = endpoint;
        this.payload = payload;
        this.timestamp = timestamp;
        this.requestId = requestId;
    }
    
    // Getters
    public String getEndpoint() { return endpoint; }
    public String getPayload() { return payload; }
    public long getTimestamp() { return timestamp; }
    public String getRequestId() { return requestId; }
}

// Async sink writer implementation
public class HttpAsyncSinkWriter extends AsyncSinkWriter<JsonNode, HttpRequestEntry> {
    private final HttpAsyncClient httpClient;
    private final ObjectMapper objectMapper;
    
    public HttpAsyncSinkWriter(
            ElementConverter<JsonNode, HttpRequestEntry> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<HttpRequestEntry>> states) {
        super(elementConverter, context, configuration, states);
        this.httpClient = new HttpAsyncClient();
        this.objectMapper = new ObjectMapper();
    }

    @Override
    protected void submitRequestEntries(
            List<HttpRequestEntry> requestEntries, 
            ResultHandler<HttpRequestEntry> resultHandler) {
        
        // Create batch request
        BatchHttpRequest batchRequest = createBatchRequest(requestEntries);
        
        // Submit asynchronously
        CompletableFuture<BatchHttpResponse> future = httpClient.submitBatch(batchRequest);
        
        // Handle response
        future.whenComplete((response, error) -> {
            if (error != null) {
                if (isFatalError(error)) {
                    resultHandler.completeExceptionally(new RuntimeException(
                        "Fatal error in HTTP request", error));
                } else {
                    // Retry all entries on network error
                    resultHandler.retryForEntries(requestEntries);
                }
            } else if (response.hasFailures()) {
                // Partial failure - retry only failed entries
                List<HttpRequestEntry> failedEntries = extractFailedEntries(requestEntries, response);
                resultHandler.retryForEntries(failedEntries);
            } else {
                // Complete success
                resultHandler.complete();
            }
        });
    }

    @Override
    protected long getSizeInBytes(HttpRequestEntry requestEntry) {
        // Estimate size including headers and metadata
        return requestEntry.getPayload().getBytes(StandardCharsets.UTF_8).length + 
               requestEntry.getEndpoint().getBytes(StandardCharsets.UTF_8).length +
               100; // Approximate header overhead
    }
    
    private BatchHttpRequest createBatchRequest(List<HttpRequestEntry> entries) {
        List<String> payloads = entries.stream()
            .map(HttpRequestEntry::getPayload)
            .collect(Collectors.toList());
            
        return new BatchHttpRequest(
            entries.get(0).getEndpoint(),
            payloads,
            generateBatchId(),
            System.currentTimeMillis()
        );
    }
    
    private boolean isFatalError(Throwable error) {
        // Consider authentication, authorization, and configuration errors as fatal
        return error instanceof AuthenticationException ||
               error instanceof AuthorizationException ||
               error instanceof MalformedURLException;
    }
    
    private List<HttpRequestEntry> extractFailedEntries(
            List<HttpRequestEntry> originalEntries, 
            BatchHttpResponse response) {
        List<HttpRequestEntry> failed = new ArrayList<>();
        List<Integer> failedIndices = response.getFailedIndices();
        
        for (Integer index : failedIndices) {
            if (index < originalEntries.size()) {
                failed.add(originalEntries.get(index));
            }
        }
        
        return failed;
    }
    
    private String generateBatchId() {
        return UUID.randomUUID().toString();
    }
}

Custom Batch Creator

public class SizeLimitedBatchCreator implements BatchCreator<HttpRequestEntry> {
    private final long maxBatchSizeInBytes;
    
    public SizeLimitedBatchCreator(long maxBatchSizeInBytes) {
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
    }
    
    @Override
    public Batch<HttpRequestEntry> createNextBatch(
            RequestInfo requestInfo, 
            RequestBuffer<HttpRequestEntry> bufferedRequestEntries) {
        
        List<HttpRequestEntry> batch = new ArrayList<>();
        long totalSize = 0;
        int maxBatchSize = requestInfo.getBatchSize();
        
        // Add entries until we hit size or count limits
        while (!bufferedRequestEntries.isEmpty() && 
               batch.size() < maxBatchSize) {
            
            RequestEntryWrapper<HttpRequestEntry> wrapper = bufferedRequestEntries.peek();
            if (wrapper == null) {
                break;
            }
            
            // Check if adding this entry would exceed size limit
            if (totalSize + wrapper.getSize() > maxBatchSizeInBytes && !batch.isEmpty()) {
                break;
            }
            
            // Remove from buffer and add to batch
            bufferedRequestEntries.poll();
            batch.add(wrapper.getRequestEntry());
            totalSize += wrapper.getSize();
        }
        
        return new Batch<>(batch, totalSize);
    }
}

Advanced Error Handling

public class AdvancedHttpAsyncSinkWriter extends AsyncSinkWriter<JsonNode, HttpRequestEntry> {
    private final FatalExceptionClassifier fatalExceptionClassifier;
    private final HttpAsyncClient httpClient = new HttpAsyncClient();
    
    public AdvancedHttpAsyncSinkWriter(
            ElementConverter<JsonNode, HttpRequestEntry> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<HttpRequestEntry>> states) {
        super(elementConverter, context, configuration, states);
        
        // Create chain of fatal exception classifiers
        this.fatalExceptionClassifier = FatalExceptionClassifier.createChain(
            FatalExceptionClassifier.withRootCauseOfType(
                AuthenticationException.class,
                cause -> new RuntimeException("Authentication failed", cause)
            ),
            FatalExceptionClassifier.withRootCauseOfType(
                IllegalArgumentException.class,
                cause -> new RuntimeException("Invalid request configuration", cause)
            )
        );
    }
    
    @Override
    protected void submitRequestEntries(
            List<HttpRequestEntry> requestEntries, 
            ResultHandler<HttpRequestEntry> resultHandler) {
        
        // createBatchRequest(...) is the same helper shown in HttpAsyncSinkWriter above
        CompletableFuture<BatchHttpResponse> future = httpClient.submitBatch(
            createBatchRequest(requestEntries));
        
        future.whenComplete((response, error) -> {
            if (error != null) {
                // Use classifier to determine if error is fatal
                if (fatalExceptionClassifier.isFatal(error, getFatalExceptionCons())) {
                    return; // Fatal exception consumer already called
                } else {
                    // Retryable error - retry all entries
                    resultHandler.retryForEntries(requestEntries);
                }
            } else {
                handleResponse(requestEntries, response, resultHandler);
            }
        });
    }
    
    private void handleResponse(
            List<HttpRequestEntry> requestEntries,
            BatchHttpResponse response,
            ResultHandler<HttpRequestEntry> resultHandler) {
        
        if (response.isSuccess()) {
            resultHandler.complete();
        } else if (response.hasPartialFailures()) {
            List<HttpRequestEntry> failedEntries = new ArrayList<>();
            
            for (int i = 0; i < requestEntries.size(); i++) {
                if (response.isFailed(i)) {
                    int statusCode = response.getStatusCode(i);
                    
                    // Check if individual failure is retryable
                    if (isRetryableStatusCode(statusCode)) {
                        failedEntries.add(requestEntries.get(i));
                    }
                    // Non-retryable individual failures are dropped (logged elsewhere)
                }
            }
            
            if (!failedEntries.isEmpty()) {
                resultHandler.retryForEntries(failedEntries);
            } else {
                resultHandler.complete();
            }
        } else {
            // Complete failure - retry all if retryable
            if (isRetryableStatusCode(response.getOverallStatusCode())) {
                resultHandler.retryForEntries(requestEntries);
            } else {
                // Non-retryable failure - complete (entries lost, logged elsewhere)
                resultHandler.complete();
            }
        }
    }
    
    private boolean isRetryableStatusCode(int statusCode) {
        // 5xx server errors and some 4xx errors are retryable
        return statusCode >= 500 || 
               statusCode == 408 ||  // Request Timeout
               statusCode == 429;    // Too Many Requests
    }

    // getSizeInBytes(...) omitted for brevity; identical to HttpAsyncSinkWriter above
}

Configuration Patterns

Basic Configuration Builder Pattern

AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
    .setMaxBatchSize(100)                    // Records per batch
    .setMaxBatchSizeInBytes(1024 * 1024)    // Bytes per batch (1MB)
    .setMaxInFlightRequests(10)             // Concurrent requests
    .setMaxBufferedRequests(1000)           // Queue capacity
    .setMaxTimeInBufferMS(5000)             // Max buffering delay (5s)
    .setMaxRecordSizeInBytes(256 * 1024)    // Max record size (256KB)
    .setRequestTimeoutMS(30000)             // Request timeout (30s)
    .setFailOnTimeout(false)                // Retry on timeout
    .build();

Advanced Configuration with Custom Rate Limiting

// Create AIMD scaling strategy
AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)  // Rate threshold (upper bound)
    .setIncreaseRate(5)          // Increase by 5 per success
    .setDecreaseFactor(0.7)      // Decrease by 30% on failure
    .build();

// Create congestion control rate limiting
CongestionControlRateLimitingStrategy rateLimiting = 
    CongestionControlRateLimitingStrategy.builder()
        .setMaxInFlightRequests(50)
        .setInitialMaxInFlightMessages(100)
        .setScalingStrategy(scalingStrategy)
        .build();

// Apply to configuration
AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
    .setMaxBatchSize(200)
    .setMaxBatchSizeInBytes(2 * 1024 * 1024)  // 2MB
    .setMaxInFlightRequests(50)
    .setMaxBufferedRequests(2000)
    .setMaxTimeInBufferMS(3000)               // 3s
    .setMaxRecordSizeInBytes(512 * 1024)      // 512KB
    .setRateLimitingStrategy(rateLimiting)    // Custom rate limiting
    .build();
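
AIMD (additive increase, multiplicative decrease) grows the allowed rate by a fixed step after each success and cuts it by a factor on failure, probing for the destination's capacity. The following is a self-contained sketch of those dynamics using the same constants as above; it is plain arithmetic for illustration, not Flink's `AIMDScalingStrategy` implementation.

```java
// AIMD dynamics: +increaseRate per success, *decreaseFactor per failure,
// capped at the rate threshold. Illustrative only; not the Flink class.
final class AimdSimulator {
    private final int increaseRate;
    private final double decreaseFactor;
    private final int rateThreshold;
    private int currentRate;

    AimdSimulator(int initialRate, int increaseRate, double decreaseFactor, int rateThreshold) {
        this.currentRate = initialRate;
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.rateThreshold = rateThreshold;
    }

    int onSuccess() {
        // Additive increase, never exceeding the threshold
        currentRate = Math.min(currentRate + increaseRate, rateThreshold);
        return currentRate;
    }

    int onFailure() {
        // Multiplicative decrease, never dropping below 1
        currentRate = Math.max(1, (int) Math.round(currentRate * decreaseFactor));
        return currentRate;
    }

    int currentRate() { return currentRate; }
}
```

The asymmetry is the point: recovery after a failure is gradual (linear), while backoff is immediate (a 30% cut with `decreaseFactor = 0.7`), so the sink converges toward the destination's sustainable rate instead of oscillating hard against it.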

State Management

The async sink framework provides automatic state management for fault tolerance:

BufferedRequestState

@PublicEvolving
public class BufferedRequestState<RequestEntryT extends Serializable> {
    public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries);
    public BufferedRequestState(RequestBuffer<RequestEntryT> requestBuffer);

    public List<RequestEntryWrapper<RequestEntryT>> getBufferedRequestEntries();
    public long getStateSize();

    public static <T extends Serializable> BufferedRequestState<T> emptyState();
}

State Serializer (Custom Implementation)

public class HttpRequestEntryStateSerializer 
        extends AsyncSinkWriterStateSerializer<HttpRequestEntry> {
    
    @Override
    protected void serializeRequestToStream(
            HttpRequestEntry request, 
            DataOutputStream out) throws IOException {
        out.writeUTF(request.getEndpoint());
        out.writeUTF(request.getPayload());
        out.writeLong(request.getTimestamp());
        out.writeUTF(request.getRequestId());
    }
    
    @Override
    protected HttpRequestEntry deserializeRequestFromStream(
            long requestSize, 
            DataInputStream in) throws IOException {
        String endpoint = in.readUTF();
        String payload = in.readUTF();
        long timestamp = in.readLong();
        String requestId = in.readUTF();
        
        return new HttpRequestEntry(endpoint, payload, timestamp, requestId);
    }
}

Best Practices

Performance Optimization

  1. Batch Size Tuning: Balance between latency and throughput
  2. Buffer Management: Size buffers based on memory constraints and throughput requirements
  3. Rate Limiting: Configure based on destination capacity and network conditions
  4. Request Sizing: Implement efficient getSizeInBytes() calculations
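
For point 2, a useful worst-case bound is maxBufferedRequests × maxRecordSizeInBytes: the buffer can hold at most that many entries of at most that size. A quick sanity check with the basic-configuration values used earlier (1000 buffered requests, 256 KB records) shows why buffer sizing deserves attention:

```java
// Worst-case buffer memory: every buffered slot filled with a maximum-size record.
// Values mirror the basic configuration example above.
final class BufferSizing {
    static long worstCaseBufferBytes(int maxBufferedRequests, long maxRecordSizeInBytes) {
        return maxBufferedRequests * maxRecordSizeInBytes;
    }

    public static void main(String[] args) {
        long bytes = worstCaseBufferBytes(1000, 256 * 1024);
        System.out.println(bytes / (1024 * 1024) + " MiB");  // prints "250 MiB"
    }
}
```

In practice the average record is usually far smaller than the maximum, but the bound is per sink subtask, so multiply it by parallelism when budgeting task manager memory.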

Error Handling

  1. Fatal vs Retryable: Properly classify exceptions to avoid infinite retries
  2. Partial Failures: Handle individual entry failures in batch responses
  3. Timeout Handling: Configure appropriate timeouts for network conditions
  4. Exception Classification: Use FatalExceptionClassifier for sophisticated error handling

Resource Management

  1. Connection Pooling: Reuse HTTP connections in async clients
  2. Memory Management: Monitor buffer sizes and implement backpressure
  3. Thread Management: Ensure proper cleanup of async resources
  4. Metrics Integration: Use built-in metrics for monitoring and alerting

The async sink framework provides a robust foundation for building production-ready sinks with sophisticated features like automatic batching, rate limiting, fault tolerance, and state management.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-base
