tessl/maven-org-apache-flink--flink-connector-base

Base classes and utilities for building Apache Flink connectors with async sink patterns, source readers, and advanced streaming capabilities

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-connector-base@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-base@2.1.0


Apache Flink Connector Base

The Apache Flink Connector Base library provides foundational classes and utilities for building high-performance, production-ready Apache Flink connectors. It offers sophisticated async sink and source frameworks with built-in features like backpressure handling, rate limiting, checkpointing, and hybrid source switching.

Package Information

Maven Coordinates:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>2.1.0</version>
</dependency>

Package: org.apache.flink.connector.base

Core Imports

Essential Base Classes

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;

Configuration and Strategy Classes

import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;

Core Interfaces

import org.apache.flink.connector.base.sink.writer.BatchCreator;
import org.apache.flink.connector.base.sink.writer.RequestBuffer;
import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;

Basic Usage

Creating an Async Sink

// Define your request entry type
public class MyRequestEntry implements Serializable {
    private final String data;
    private final long timestamp;
    
    public MyRequestEntry(String data, long timestamp) {
        this.data = data;
        this.timestamp = timestamp;
    }
    
    public String getData() { return data; }
    public long getTimestamp() { return timestamp; }
}

// Implement ElementConverter
public class MyElementConverter implements ElementConverter<String, MyRequestEntry> {
    @Override
    public MyRequestEntry apply(String element, SinkWriter.Context context) {
        return new MyRequestEntry(element, context.timestamp());
    }
}

// Create AsyncSink implementation
public class MyAsyncSink extends AsyncSinkBase<String, MyRequestEntry> {
    public MyAsyncSink() {
        super(
            new MyElementConverter(),  // Element converter
            100,                       // Max batch size
            10,                        // Max in-flight requests
            1000,                      // Max buffered requests
            1024 * 1024,              // Max batch size in bytes
            5000,                      // Max time in buffer (ms)
            256 * 1024,               // Max record size in bytes
            60000,                     // Request timeout (ms)
            false                      // Fail on timeout
        );
    }

    @Override
    public SinkWriter<String> createWriter(WriterInitContext context) throws IOException {
        return new MyAsyncSinkWriter(
            getElementConverter(),
            context,
            AsyncSinkWriterConfiguration.builder()
                .setMaxBatchSize(getMaxBatchSize())
                .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
                .setMaxInFlightRequests(getMaxInFlightRequests())
                .setMaxBufferedRequests(getMaxBufferedRequests())
                .setMaxTimeInBufferMS(getMaxTimeInBufferMS())
                .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
                .build(),
            Collections.emptyList()
        );
    }
}

// Implement AsyncSinkWriter
public class MyAsyncSinkWriter extends AsyncSinkWriter<String, MyRequestEntry> {
    private final AsyncClient client;

    public MyAsyncSinkWriter(
            ElementConverter<String, MyRequestEntry> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<MyRequestEntry>> states) {
        super(elementConverter, context, configuration, states);
        this.client = new AsyncClient();
    }

    @Override
    protected void submitRequestEntries(
            List<MyRequestEntry> requestEntries, 
            ResultHandler<MyRequestEntry> resultHandler) {
        
        CompletableFuture<Response> future = client.sendBatch(requestEntries);
        future.whenComplete((response, error) -> {
            if (error != null && isFatalError(error)) {
                resultHandler.completeExceptionally(new RuntimeException(error));
            } else if (error != null || response.hasFailures()) {
                List<MyRequestEntry> failedEntries = getFailedEntries(requestEntries, response);
                resultHandler.retryForEntries(failedEntries);
            } else {
                resultHandler.complete();
            }
        });
    }

    @Override
    protected long getSizeInBytes(MyRequestEntry requestEntry) {
        return requestEntry.getData().length() + 8; // Data length + timestamp
    }
}
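
With the sink and writer defined, wiring them into a job works like any other sink. The snippet below is a minimal example using the MyAsyncSink class above; fromData serves only as a convenient in-memory test source.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Any DataStream<String> can feed the sink; fromData is just a test source
DataStream<String> events = env.fromData("event-1", "event-2", "event-3");
events.sinkTo(new MyAsyncSink());

env.execute("async-sink-example");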

Creating a Hybrid Source

// Create file and Kafka sources
FileSource<String> fileSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), path)
    .build();

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("events")
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
    .setStartingOffsets(OffsetsInitializer.earliest())
    .build();

// Create hybrid source that reads files first, then switches to Kafka
HybridSource<String> hybridSource = HybridSource.builder(fileSource)
    .addSource(kafkaSource)
    .build();

// Use in DataStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromSource(
    hybridSource,
    WatermarkStrategy.noWatermarks(),
    "hybrid-source"
);

Architecture

The connector base library is organized into several key architectural components:

Sink Architecture

  • AsyncSinkBase: Abstract base class for destination-agnostic async sinks
  • AsyncSinkWriter: Core writer that handles batching, buffering, and retry logic
  • ElementConverter: Transforms stream elements into request entries
  • RateLimitingStrategy: Controls throughput and backpressure
  • BatchCreator: Pluggable batching logic for request grouping

Source Architecture

  • SourceReaderBase: Foundation for building custom source readers
  • SplitReader: Interface for reading from individual splits
  • RecordEmitter: Handles record processing and state updates
  • HybridSource: Enables seamless switching between multiple sources

State Management

  • BufferedRequestState: Handles checkpointing for async sinks
  • Split State Management: Automatic state tracking for source splits
  • Serializers: Built-in serialization for state persistence

Capabilities

Async Sink Framework { .api }

Complete framework for building async sinks with batching, buffering, rate limiting, and fault tolerance.

Key APIs:

public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
public interface ElementConverter<InputT, RequestEntryT>
public interface ResultHandler<RequestEntryT>

Source Reader Framework { .api }

Sophisticated framework for building source readers with split management and coordination.

Key APIs:

public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT, SplitStateT>
public interface SplitReader<E, SplitT>
public interface RecordEmitter<E, T, SplitStateT>
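
To illustrate how a RecordEmitter fits in, here is a minimal sketch; the raw element type (an offset/value pair) and MySplitState are hypothetical placeholders for whatever your SplitReader produces and your split state tracks.

// Emits the payload downstream and advances the split state so that
// checkpoints capture the last emitted offset (MySplitState is hypothetical).
public class MyRecordEmitter implements RecordEmitter<Tuple2<Long, String>, String, MySplitState> {

    @Override
    public void emitRecord(
            Tuple2<Long, String> element,
            SourceOutput<String> output,
            MySplitState splitState) {
        output.collect(element.f1);               // forward the record value
        splitState.setCurrentOffset(element.f0);  // remember how far we have read
    }
}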

Hybrid Source System { .api }

Advanced source that can switch between multiple underlying sources with position transfer.

Key APIs:

public class HybridSource<T>
public interface SourceFactory<T, SourceT, FromEnumT>
public interface SourceSwitchContext<EnumT>

Rate Limiting & Scaling { .api }

Pluggable strategies for controlling throughput, handling backpressure, and dynamic scaling.

Key APIs:

public interface RateLimitingStrategy
public interface ScalingStrategy<T>
public class CongestionControlRateLimitingStrategy
public class AIMDScalingStrategy

Table API Integration { .api }

Base classes for integrating async sinks with Flink's Table API and SQL.

Key APIs:

public abstract class AsyncDynamicTableSinkFactory
public class AsyncDynamicTableSink
public interface ConfigurationValidator

Type Definitions

Core Types

DeliveryGuarantee

public enum DeliveryGuarantee implements DescribedEnum {
    EXACTLY_ONCE,     // Records delivered exactly once, even under failover
    AT_LEAST_ONCE,    // Records delivered at least once but may be duplicated
    NONE              // Best effort delivery, may lose or duplicate records
}
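
Concrete connectors built on this base typically surface DeliveryGuarantee on their builders. For instance, the Kafka sink (from the separate flink-connector-kafka artifact) accepts it directly:

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("events")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)  // from flink-connector-base
    .build();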

RequestEntryWrapper

public class RequestEntryWrapper<RequestEntryT> {
    public RequestEntryWrapper(RequestEntryT requestEntry, long size)
    public RequestEntryT getRequestEntry()
    public long getSize()
}

Batch

public class Batch<RequestEntryT extends Serializable> {
    public Batch(List<RequestEntryT> batchEntries, long sizeInBytes)
    public List<RequestEntryT> getBatchEntries()
    public long getSizeInBytes()
    public int getRecordCount()
}

BufferedRequestState

public class BufferedRequestState<RequestEntryT extends Serializable> {
    public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries)
    public List<RequestEntryWrapper<RequestEntryT>> getBufferedRequestEntries()
    public long getStateSize()
    public static <T extends Serializable> BufferedRequestState<T> emptyState()
}

Strategy Types

RequestInfo

public interface RequestInfo {
    int getBatchSize()
}

ResultInfo

public interface ResultInfo {
    int getFailedMessages()
    int getBatchSize()
}

BasicRequestInfo

public class BasicRequestInfo implements RequestInfo {
    public BasicRequestInfo(int batchSize)
    public int getBatchSize()
}

BasicResultInfo

public class BasicResultInfo implements ResultInfo {
    public BasicResultInfo(int failedMessages, int batchSize)
    public int getFailedMessages()
    public int getBatchSize()
}
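
These request/result types are what a RateLimitingStrategy observes for each request. As a minimal sketch (assuming the interface methods listed under Rate Limiting & Scaling above), a fixed-concurrency strategy could look like:

// Caps concurrent in-flight requests at a fixed limit; a sketch, not a
// replacement for the built-in CongestionControlRateLimitingStrategy.
public class FixedConcurrencyRateLimitingStrategy implements RateLimitingStrategy {
    private final int maxInFlightRequests;
    private final int maxBatchSize;
    private int inFlightRequests;

    public FixedConcurrencyRateLimitingStrategy(int maxInFlightRequests, int maxBatchSize) {
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxBatchSize = maxBatchSize;
    }

    @Override
    public void registerInFlightRequest(RequestInfo requestInfo) {
        inFlightRequests++;
    }

    @Override
    public void registerCompletedRequest(ResultInfo resultInfo) {
        inFlightRequests--;
    }

    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        // Block new requests once the concurrency cap is reached
        return inFlightRequests >= maxInFlightRequests;
    }

    @Override
    public int getMaxBatchSize() {
        return maxBatchSize;
    }
}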

Source Types

RecordsWithSplitIds

public interface RecordsWithSplitIds<E> {
    String nextSplit()
    E nextRecordFromSplit()
    Set<String> finishedSplits()
    void recycle()
}

RecordsBySplits

public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
    public static <E> RecordsBySplits<E> forRecords(Map<String, Collection<E>> recordsBySplit)
    public static <E> RecordsBySplits<E> forFinishedSplit(String splitId)
}
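
For context, these containers are what a SplitReader.fetch() implementation returns. Below is a simplified sketch, where MySplit and MyBlockingClient are hypothetical stand-ins for a real split type and destination client:

// Drains one batch from the client and reports it keyed by split ID.
// MySplit (a SourceSplit) and MyBlockingClient are hypothetical.
public class MySplitReader implements SplitReader<Tuple2<Long, String>, MySplit> {
    private final Queue<MySplit> assignedSplits = new ArrayDeque<>();
    private final MyBlockingClient client = new MyBlockingClient();

    @Override
    public RecordsWithSplitIds<Tuple2<Long, String>> fetch() throws IOException {
        MySplit split = assignedSplits.peek();
        if (split == null) {
            return new RecordsBySplits<>(Collections.emptyMap(), Collections.emptySet());
        }
        Collection<Tuple2<Long, String>> records = client.poll(split);
        return new RecordsBySplits<>(
                Collections.singletonMap(split.splitId(), records),
                Collections.emptySet());
    }

    @Override
    public void handleSplitsChanges(SplitsChange<MySplit> splitsChange) {
        assignedSplits.addAll(splitsChange.splits());  // track newly assigned splits
    }

    @Override
    public void wakeUp() {}

    @Override
    public void close() throws Exception {
        client.close();
    }
}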

Configuration Examples

Basic Async Sink Configuration

AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
    .setMaxBatchSize(100)                    // Max records per batch
    .setMaxBatchSizeInBytes(1024 * 1024)    // Max 1MB per batch
    .setMaxInFlightRequests(10)             // Max concurrent requests
    .setMaxBufferedRequests(1000)           // Max queued requests
    .setMaxTimeInBufferMS(5000)             // Max 5s buffering
    .setMaxRecordSizeInBytes(256 * 1024)    // Max 256KB per record
    .setRequestTimeoutMS(60000)             // 60s timeout
    .setFailOnTimeout(false)                // Retry on timeout
    .build();

Advanced Rate Limiting Configuration

// AIMD scaling strategy
AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)
    .setIncreaseRate(10)         // Linear increase
    .setDecreaseFactor(0.5)      // 50% decrease on failure
    .build();

// Congestion control rate limiting
CongestionControlRateLimitingStrategy rateLimiting = 
    CongestionControlRateLimitingStrategy.builder()
        .setMaxInFlightRequests(50)
        .setInitialMaxInFlightMessages(100)
        .setScalingStrategy(scalingStrategy)
        .build();

AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
    .setMaxBatchSize(100)
    .setMaxBatchSizeInBytes(1024 * 1024)
    .setMaxInFlightRequests(50)
    .setMaxBufferedRequests(1000)
    .setMaxTimeInBufferMS(5000)
    .setMaxRecordSizeInBytes(256 * 1024)
    .setRateLimitingStrategy(rateLimiting)  // Custom rate limiting
    .build();

Hybrid Source with Dynamic Position Transfer

HybridSource<String> hybridSource = HybridSource
    .<String, FileSourceEnumerator>builder(fileSource)
    .addSource(
        switchContext -> {
            // Get end position from previous source
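            // (FileSourceEnumerator and getEndTimestamp() are illustrative
            // placeholders for your first source's enumerator type and its
            // position accessor)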
            FileSourceEnumerator previousEnumerator = switchContext.getPreviousEnumerator();
            long endTimestamp = previousEnumerator.getEndTimestamp();
            
            // Configure next source with derived start position
            return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("events")
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .setStartingOffsets(OffsetsInitializer.timestamp(endTimestamp))
                .build();
        },
        Boundedness.CONTINUOUS_UNBOUNDED
    )
    .build();

Best Practices

Sink Implementation

  1. Always implement proper error handling in submitRequestEntries
  2. Use appropriate batch sizes for your destination's characteristics
  3. Configure rate limiting based on destination capacity
  4. Implement proper size calculation in getSizeInBytes
  5. Handle fatal vs retryable exceptions correctly

Source Implementation

  1. Implement efficient split reading in SplitReader.fetch()
  2. Handle split lifecycle properly (add/remove/pause/resume)
  3. Use appropriate record emitters for state management
  4. Configure proper queue sizes for throughput requirements
  5. Implement proper cleanup in close methods

Performance Optimization

  1. Tune batch sizes for optimal throughput vs latency
  2. Configure appropriate timeouts for your network conditions
  3. Use efficient serialization for request entries (see the state-serializer sketch after this list)
  4. Monitor and tune rate limiting strategies
  5. Optimize record size calculations for performance
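
Expanding on point 3, a state serializer for the MyRequestEntry type from the earlier sink example might look like the sketch below. It assumes the AsyncSinkWriterStateSerializer base class this package provides for buffered-request state, and the field layout is purely illustrative.

// Minimal state serializer sketch for MyRequestEntry (layout is illustrative).
public class MyRequestEntryStateSerializer extends AsyncSinkWriterStateSerializer<MyRequestEntry> {

    @Override
    protected void serializeRequestToStream(MyRequestEntry request, DataOutputStream out)
            throws IOException {
        out.writeUTF(request.getData());
        out.writeLong(request.getTimestamp());
    }

    @Override
    protected MyRequestEntry deserializeRequestFromStream(long requestSize, DataInputStream in)
            throws IOException {
        return new MyRequestEntry(in.readUTF(), in.readLong());
    }

    @Override
    public int getVersion() {
        return 1;
    }
}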

This comprehensive framework enables building production-ready Flink connectors, with rate limiting, backpressure handling, state management, and fault tolerance built in.