CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-connector-base

Base classes and utilities for building Apache Flink connectors with async sink patterns, source readers, and advanced streaming capabilities

Pending
Overview
Eval results
Files

docs/table-api.md

Table API Integration

The Table API Integration components provide seamless integration between the async sink framework and Flink's Table API and SQL. This enables building table sinks with all the advanced features of async sinks while maintaining compatibility with Flink's unified batch and streaming API.

Core Components

AsyncDynamicTableSinkFactory

Abstract base factory for creating table sinks with async sink capabilities.

@PublicEvolving
public abstract class AsyncDynamicTableSinkFactory implements DynamicTableSinkFactory {
    
    // Required method implementations
    // (API sketch only — method bodies are omitted in this documentation.)
    public Set<ConfigOption<?>> requiredOptions()
    public Set<ConfigOption<?>> optionalOptions()
    
    // Protected helper methods
    // Copies the generic async sink options (batch size, timeouts, ...) from the
    // given table properties onto the supplied builder and returns it.
    protected AsyncDynamicTableSinkBuilder<?, ?> addAsyncOptionsToBuilder(
            Properties properties, 
            AsyncDynamicTableSinkBuilder<?, ?> builder)
    
    // Inner class for context
    // Exposes the planner-derived information a factory needs at creation time.
    public static class AsyncDynamicSinkContext {
        public DataType getPhysicalRowDataType()
        public ReadableConfig getConfiguration()
        public ClassLoader getClassLoader()
        public boolean isStreamingMode()
    }
}

AsyncDynamicTableSink

Table sink implementation that bridges table operations to async sink writers.

@PublicEvolving
public class AsyncDynamicTableSink implements DynamicTableSink {
    
    // Constructor
    // Wraps a fully-configured AsyncSinkBase so it can be used as a table sink.
    protected AsyncDynamicTableSink(
            DataType physicalRowDataType,
            AsyncSinkBase<RowData, ?> asyncSinkBase)
    
    // DynamicTableSink implementation
    // (API sketch only — method bodies are omitted in this documentation.)
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode)
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context)
    public DynamicTableSink copy()
    public String asSummaryString()
}

ConfigurationValidator

Interface for validating table sink configurations.

@PublicEvolving
public interface ConfigurationValidator {
    // Throws ValidationException when the given configuration violates a constraint.
    void validate(ReadableConfig configuration) throws ValidationException
}

AsyncSinkConfigurationValidator

Built-in validator for async sink configuration options.

@PublicEvolving
public class AsyncSinkConfigurationValidator implements ConfigurationValidator {
    
    public AsyncSinkConfigurationValidator()
    
    // Validates the generic async sink options (batch sizes, buffering, timeouts).
    public void validate(ReadableConfig configuration) throws ValidationException
}

Implementation Examples

Complete Table Sink Factory

/**
 * Example table sink factory that registers an HTTP endpoint as the Flink SQL
 * connector {@code 'connector' = 'http'}. Demonstrates how table options are
 * declared, validated, and wired into the async sink framework.
 */
public class HttpTableSinkFactory extends AsyncDynamicTableSinkFactory {
    
    // ---- Connector options exposed in the DDL WITH (...) clause ----
    
    public static final ConfigOption<String> ENDPOINT = 
        ConfigOptions.key("endpoint")
            .stringType()
            .noDefaultValue()
            .withDescription("HTTP endpoint URL for sending data");
    
    public static final ConfigOption<String> METHOD = 
        ConfigOptions.key("method")
            .stringType()
            .defaultValue("POST")
            .withDescription("HTTP method to use");
    
    public static final ConfigOption<Map<String, String>> HEADERS = 
        ConfigOptions.key("headers")
            .mapType()
            .defaultValue(Collections.emptyMap())
            .withDescription("HTTP headers to include in requests");
    
    public static final ConfigOption<String> AUTH_TOKEN = 
        ConfigOptions.key("auth.token")
            .stringType()
            .noDefaultValue()
            .withDescription("Authentication token for HTTP requests");
    
    public static final ConfigOption<String> RECORD_FORMAT = 
        ConfigOptions.key("format")
            .stringType()
            .defaultValue("json")
            .withDescription("Format for serializing records (json, avro, csv)");
    
    /** Connector identifier used in {@code 'connector' = 'http'}. */
    @Override
    public String factoryIdentifier() {
        return "http";
    }
    
    /** Only the endpoint is mandatory; all other options have defaults or are optional. */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.singleton(ENDPOINT);
    }
    
    /** Connector-specific options plus the async options inherited from the base factory. */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>(super.optionalOptions());
        options.addAll(Arrays.asList(
            METHOD,
            HEADERS,
            AUTH_TOKEN,
            RECORD_FORMAT
        ));
        return options;
    }
    
    /**
     * Builds the table sink: validates options, creates a format-specific serializer,
     * assembles the HTTP client config and element converter, and wraps everything
     * in an {@link AsyncDynamicTableSink}.
     */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // Validate configuration before touching any of the values.
        ReadableConfig config = context.getConfiguration();
        validateConfiguration(config);
        
        // Extract configuration values
        String endpoint = config.get(ENDPOINT);
        String method = config.get(METHOD);
        Map<String, String> headers = config.get(HEADERS);
        Optional<String> authToken = config.getOptional(AUTH_TOKEN);
        String recordFormat = config.get(RECORD_FORMAT);
        
        // Create record serializer based on format ('json', 'avro', or 'csv').
        RecordSerializer<RowData> serializer = createSerializer(
            recordFormat, 
            context.getPhysicalRowDataType()
        );
        
        // Create HTTP client configuration
        HttpClientConfig clientConfig = HttpClientConfig.builder()
            .setEndpoint(endpoint)
            .setMethod(method)
            .setHeaders(headers)
            .setAuthToken(authToken.orElse(null))
            .build();
        
        // Element converter turns each RowData into one HTTP request entry.
        HttpElementConverter elementConverter = new HttpElementConverter(serializer, clientConfig);
        
        // Create async sink base with configuration from table properties
        AsyncSinkWriterConfiguration writerConfig = createAsyncWriterConfiguration(config);
        
        HttpAsyncSinkBase asyncSink = new HttpAsyncSinkBase(
            elementConverter,
            writerConfig,
            clientConfig
        );
        
        // Return table sink
        return new AsyncDynamicTableSink(context.getPhysicalRowDataType(), asyncSink);
    }
    
    /**
     * Fails fast with a {@link ValidationException} for a missing or malformed
     * endpoint, then delegates the generic async sink options to the built-in
     * validator.
     */
    private void validateConfiguration(ReadableConfig config) {
        // ENDPOINT has no default, so defensively reject a missing/blank value with
        // a clear message instead of failing later inside the HTTP client.
        String endpoint = config.get(ENDPOINT);
        if (endpoint == null || endpoint.isEmpty()) {
            throw new ValidationException("HTTP endpoint must be specified");
        }
        
        // Validate the URL eagerly. URI.toURL() replaces the deprecated URL(String)
        // constructor (deprecated since Java 20) and performs stricter syntax
        // checking than new URL(...) did.
        try {
            new URI(endpoint).toURL();
        } catch (URISyntaxException | MalformedURLException | IllegalArgumentException e) {
            throw new ValidationException("Invalid HTTP endpoint URL: " + endpoint, e);
        }
        
        // Validate the generic async sink options (batch sizes, timeouts, ...).
        AsyncSinkConfigurationValidator asyncValidator = new AsyncSinkConfigurationValidator();
        asyncValidator.validate(config);
    }
    
    /** Maps the 'format' option to a concrete serializer; rejects unknown formats. */
    private RecordSerializer<RowData> createSerializer(String format, DataType dataType) {
        switch (format.toLowerCase()) {
            case "json":
                return new JsonRowDataSerializer(dataType);
            case "avro":
                return new AvroRowDataSerializer(dataType);
            case "csv":
                return new CsvRowDataSerializer(dataType);
            default:
                throw new ValidationException("Unsupported record format: " + format);
        }
    }
    
    /** Builds the async writer configuration from the table's 'sink.*' options. */
    private AsyncSinkWriterConfiguration createAsyncWriterConfiguration(ReadableConfig config) {
        AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder builder = 
            AsyncSinkWriterConfiguration.builder();
        
        // Add async options from table configuration via the adapter that bridges
        // the two builder interfaces.
        addAsyncOptionsToBuilder(toProperties(config), new AsyncSinkWriterConfigurationBuilderAdapter(builder));
        
        return builder.build();
    }
    
    /** Flattens the ReadableConfig into java.util.Properties for the base helper. */
    private Properties toProperties(ReadableConfig config) {
        Properties properties = new Properties();
        config.toMap().forEach(properties::setProperty);
        return properties;
    }
}

// Adapter to bridge the builder interfaces
// Lets the generic addAsyncOptionsToBuilder(...) helper (which speaks the
// AsyncDynamicTableSinkBuilder interface) populate an
// AsyncSinkWriterConfiguration builder. Each bridge method forwards to the
// delegate and returns `this` to keep the fluent chain.
// NOTE: only setMaxBatchSize is shown here; the remaining bridge methods are
// elided in this documentation.
public class AsyncSinkWriterConfigurationBuilderAdapter implements AsyncDynamicTableSinkBuilder<RowData, HttpRequestEntry> {
    private final AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder delegate;
    
    public AsyncSinkWriterConfigurationBuilderAdapter(
            AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder delegate) {
        this.delegate = delegate;
    }
    
    // Implement bridge methods...
    public AsyncDynamicTableSinkBuilder<RowData, HttpRequestEntry> setMaxBatchSize(int maxBatchSize) {
        delegate.setMaxBatchSize(maxBatchSize);
        return this;
    }
    
    // ... other bridge methods
}

Element Converter for Table Data

/**
 * Converts each {@link RowData} element into one {@link HttpRequestEntry} by
 * serializing the row with the configured {@link RecordSerializer} and attaching
 * the endpoint, method, and headers from the client configuration.
 */
public class HttpElementConverter implements ElementConverter<RowData, HttpRequestEntry> {
    private final RecordSerializer<RowData> serializer;
    private final HttpClientConfig clientConfig;
    
    public HttpElementConverter(RecordSerializer<RowData> serializer, HttpClientConfig clientConfig) {
        this.serializer = serializer;
        this.clientConfig = clientConfig;
    }
    
    /**
     * Serializes the row and wraps it in a request entry. Any failure (from
     * serialization or entry construction) is rethrown as a RuntimeException.
     */
    @Override
    public HttpRequestEntry apply(RowData element, SinkWriter.Context context) {
        try {
            final byte[] body = serializer.serialize(element);
            return new HttpRequestEntry(
                clientConfig.getEndpoint(),
                clientConfig.getMethod(),
                clientConfig.getHeaders(),
                body,
                context.timestamp(),
                newRequestId());
        } catch (Exception e) {
            throw new RuntimeException("Failed to convert row data to HTTP request", e);
        }
    }
    
    /** Propagates writer initialization to the serializer. */
    @Override
    public void open(WriterInitContext context) {
        serializer.open(context);
    }
    
    /** Random UUID used to correlate requests (e.g. in logs and retries). */
    private String newRequestId() {
        return UUID.randomUUID().toString();
    }
}

// JSON serializer for RowData
/**
 * Serializes {@link RowData} records to JSON bytes using a schema-driven
 * {@link RowDataToJsonConverter} and a Jackson {@link ObjectMapper}.
 */
public class JsonRowDataSerializer implements RecordSerializer<RowData> {
    private final DataType dataType;
    private final ObjectMapper objectMapper;
    private final RowDataToJsonConverter converter;
    
    public JsonRowDataSerializer(DataType dataType) {
        this.dataType = dataType;
        // Configure the mapper fully at construction time. The original configured
        // it only in open(), so serialize() calls made before (or without) open()
        // would emit dates as numeric timestamps and miss the JavaTimeModule.
        this.objectMapper = new ObjectMapper()
            .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
            .registerModule(new JavaTimeModule());
        this.converter = new RowDataToJsonConverter(dataType.getLogicalType());
    }
    
    /** Converts the row to a JsonNode tree and writes it as UTF-8 JSON bytes. */
    @Override
    public byte[] serialize(RowData rowData) throws IOException {
        JsonNode jsonNode = converter.convert(rowData);
        return objectMapper.writeValueAsBytes(jsonNode);
    }
    
    @Override
    public void open(WriterInitContext context) {
        // Nothing to do: the ObjectMapper is fully configured in the constructor,
        // which makes this serializer safe to use regardless of call order.
    }
}

// Converter from RowData to JSON
/**
 * Recursively converts a {@link RowData} (typed by a Flink {@link LogicalType})
 * into a Jackson {@link JsonNode} tree. Null fields become JSON null; nested rows
 * and arrays are converted recursively.
 */
public class RowDataToJsonConverter {
    private final LogicalType logicalType;
    
    public RowDataToJsonConverter(LogicalType logicalType) {
        this.logicalType = logicalType;
    }
    
    /**
     * Converts one row to a JSON object keyed by field name.
     * NOTE: when the configured type is not a RowType the result is an empty
     * object; kept as-is for compatibility, but callers should pass a RowType.
     */
    public JsonNode convert(RowData rowData) {
        ObjectNode objectNode = JsonNodeFactory.instance.objectNode();
        
        if (logicalType instanceof RowType) {
            RowType rowType = (RowType) logicalType;
            List<RowType.RowField> fields = rowType.getFields();
            
            for (int i = 0; i < fields.size(); i++) {
                RowType.RowField field = fields.get(i);
                String fieldName = field.getName();
                LogicalType fieldType = field.getType();
                
                if (rowData.isNullAt(i)) {
                    objectNode.putNull(fieldName);
                } else {
                    JsonNode fieldValue = convertField(rowData, i, fieldType);
                    objectNode.set(fieldName, fieldValue);
                }
            }
        }
        
        return objectNode;
    }
    
    /** Converts a single non-null field at position {@code pos} to a JsonNode. */
    private JsonNode convertField(RowData rowData, int pos, LogicalType fieldType) {
        switch (fieldType.getTypeRoot()) {
            case BOOLEAN:
                return JsonNodeFactory.instance.booleanNode(rowData.getBoolean(pos));
            case TINYINT:
                return JsonNodeFactory.instance.numberNode(rowData.getByte(pos));
            case SMALLINT:
                return JsonNodeFactory.instance.numberNode(rowData.getShort(pos));
            case INTEGER:
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
                // DATE/TIME are stored as int (days / millis-of-day) in RowData.
                return JsonNodeFactory.instance.numberNode(rowData.getInt(pos));
            case BIGINT:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return JsonNodeFactory.instance.numberNode(rowData.getLong(pos));
            case FLOAT:
                return JsonNodeFactory.instance.numberNode(rowData.getFloat(pos));
            case DOUBLE:
                return JsonNodeFactory.instance.numberNode(rowData.getDouble(pos));
            case VARCHAR:
            case CHAR:
                return JsonNodeFactory.instance.textNode(rowData.getString(pos).toString());
            case DECIMAL:
                // Precision and scale live on DecimalType, not on LogicalType itself;
                // the original called fieldType.getPrecision()/getScale(), which does
                // not compile against Flink's LogicalType.
                DecimalType decimalType = (DecimalType) fieldType;
                return JsonNodeFactory.instance.numberNode(
                    rowData.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale())
                        .toBigDecimal());
            case ARRAY:
                ArrayType arrayType = (ArrayType) fieldType;
                ArrayData arrayData = rowData.getArray(pos);
                return convertArray(arrayData, arrayType.getElementType());
            case ROW:
                RowType nestedRowType = (RowType) fieldType;
                RowData nestedRowData = rowData.getRow(pos, nestedRowType.getFieldCount());
                return new RowDataToJsonConverter(nestedRowType).convert(nestedRowData);
            default:
                throw new UnsupportedOperationException("Unsupported type: " + fieldType);
        }
    }
    
    /** Converts an ArrayData to a JSON array; nulls become JSON null elements. */
    private JsonNode convertArray(ArrayData arrayData, LogicalType elementType) {
        ArrayNode arrayNode = JsonNodeFactory.instance.arrayNode();
        
        for (int i = 0; i < arrayData.size(); i++) {
            if (arrayData.isNullAt(i)) {
                arrayNode.addNull();
            } else {
                // Convert array element (simplified - would need full implementation)
                switch (elementType.getTypeRoot()) {
                    case INTEGER:
                        arrayNode.add(arrayData.getInt(i));
                        break;
                    case VARCHAR:
                        arrayNode.add(arrayData.getString(i).toString());
                        break;
                    // ... handle other types
                    default:
                        throw new UnsupportedOperationException("Unsupported array element type: " + elementType);
                }
            }
        }
        
        return arrayNode;
    }
}

SQL DDL Usage Examples

-- Create HTTP table sink with async configuration.
-- Options prefixed with 'headers.' are collected into the connector's HEADERS map.
CREATE TABLE http_sink (
    user_id BIGINT,
    event_name STRING,
    event_time TIMESTAMP(3),
    properties MAP<STRING, STRING>
) WITH (
    'connector' = 'http',
    'endpoint' = 'https://api.example.com/events',
    'method' = 'POST',
    'format' = 'json',
    'headers.Content-Type' = 'application/json',
    'headers.User-Agent' = 'Flink-HTTP-Sink/1.0',
    'auth.token' = 'your-auth-token',
    
    -- Async sink configuration (see ComprehensiveAsyncSinkConfigurationValidator
    -- for the constraints between these values)
    'sink.max-batch-size' = '100',
    'sink.max-batch-size-in-bytes' = '1048576',  -- 1MB
    'sink.max-in-flight-requests' = '10',
    'sink.max-buffered-requests' = '1000',
    'sink.max-time-in-buffer-ms' = '5000',
    'sink.max-record-size-in-bytes' = '262144',  -- 256KB
    'sink.request-timeout-ms' = '30000',
    'sink.fail-on-timeout' = 'false'
);

-- Insert data into the HTTP sink; each row becomes one HTTP request payload.
INSERT INTO http_sink
SELECT 
    user_id,
    event_name,
    event_time,
    properties
FROM source_table;

Advanced Table Sink with Multiple Formats

/**
 * Table sink factory supporting multiple serialization formats (json, avro,
 * protobuf, csv), selected via the 'format' option. Avro requires schema
 * registry coordinates.
 */
public class MultiFormatTableSinkFactory extends AsyncDynamicTableSinkFactory {
    
    public static final ConfigOption<String> FORMAT = 
        ConfigOptions.key("format")
            .stringType()
            .defaultValue("json")
            .withDescription("Serialization format (json, avro, protobuf, csv)");
    
    public static final ConfigOption<String> SCHEMA_REGISTRY_URL = 
        ConfigOptions.key("schema-registry.url")
            .stringType()
            .noDefaultValue()
            .withDescription("Schema registry URL for Avro/Protobuf formats");
    
    public static final ConfigOption<String> SUBJECT_NAME = 
        ConfigOptions.key("schema-registry.subject")
            .stringType()
            .noDefaultValue()
            .withDescription("Schema registry subject name");
    
    /** Picks a serializer from the 'format' option and builds the sink with it. */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        ReadableConfig config = context.getConfiguration();
        String format = config.get(FORMAT);
        
        // Create format-specific serializer
        RecordSerializer<RowData> serializer = createFormatSerializer(format, context, config);
        
        // Create sink with serializer
        return createTableSink(context, serializer, config);
    }
    
    /**
     * Maps the format name (case-insensitive) to a serializer.
     *
     * @throws ValidationException for unknown formats or missing avro registry options
     */
    private RecordSerializer<RowData> createFormatSerializer(
            String format, 
            Context context, 
            ReadableConfig config) {
        
        DataType dataType = context.getPhysicalRowDataType();
        
        switch (format.toLowerCase()) {
            case "json":
                return new JsonRowDataSerializer(dataType);
                
            case "avro":
                String schemaRegistryUrl = config.get(SCHEMA_REGISTRY_URL);
                String subjectName = config.get(SUBJECT_NAME);
                // Both registry options have no default; fail fast with a clear
                // message instead of passing nulls into the serializer and failing
                // much later inside open().
                if (schemaRegistryUrl == null || subjectName == null) {
                    throw new ValidationException(
                        "'schema-registry.url' and 'schema-registry.subject' are required for format 'avro'");
                }
                return new AvroRowDataSerializer(dataType, schemaRegistryUrl, subjectName);
                
            case "protobuf":
                return new ProtobufRowDataSerializer(dataType, config);
                
            case "csv":
                return new CsvRowDataSerializer(dataType, config);
                
            default:
                throw new ValidationException("Unsupported format: " + format);
        }
    }
}

// Avro serializer with schema registry
/**
 * Serializes {@link RowData} to Confluent-framed Avro bytes, fetching the
 * writer schema from a schema registry in {@link #open}.
 */
public class AvroRowDataSerializer implements RecordSerializer<RowData> {
    private final DataType dataType;
    private final String schemaRegistryUrl;
    private final String subjectName;
    private Schema avroSchema;
    private CachedSchemaRegistryClient schemaRegistryClient;
    private KafkaAvroSerializer avroSerializer;
    
    public AvroRowDataSerializer(DataType dataType, String schemaRegistryUrl, String subjectName) {
        this.dataType = dataType;
        this.schemaRegistryUrl = schemaRegistryUrl;
        this.subjectName = subjectName;
    }
    
    /** Connects to the registry, resolves the latest schema, and builds the serializer. */
    @Override
    public void open(WriterInitContext context) {
        try {
            // Initialize schema registry client (cache up to 100 schema ids).
            this.schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
            
            // SchemaMetadata.getSchema() returns the schema as a String; it must be
            // parsed into an org.apache.avro.Schema. The original assigned the
            // String directly to the Schema field, which does not compile.
            String schemaString =
                schemaRegistryClient.getLatestSchemaMetadata(subjectName).getSchema();
            this.avroSchema = new Schema.Parser().parse(schemaString);
            
            // Initialize Avro serializer
            Map<String, Object> props = new HashMap<>();
            props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
            this.avroSerializer = new KafkaAvroSerializer(schemaRegistryClient, props);
            
        } catch (Exception e) {
            throw new RuntimeException("Failed to initialize Avro serializer", e);
        }
    }
    
    /**
     * Converts the row to a GenericRecord and serializes it via the registry.
     * NOTE(review): subjectName is passed where KafkaAvroSerializer expects a
     * topic; with the default TopicNameStrategy the registry subject becomes
     * "<topic>-value" — confirm this matches the registered subject.
     */
    @Override
    public byte[] serialize(RowData rowData) throws IOException {
        try {
            // Convert RowData to Avro GenericRecord
            GenericRecord genericRecord = convertToGenericRecord(rowData);
            
            // Serialize with schema registry
            return avroSerializer.serialize(subjectName, genericRecord);
            
        } catch (Exception e) {
            throw new IOException("Failed to serialize RowData to Avro", e);
        }
    }
    
    /**
     * Builds a GenericRecord from the row.
     * Assumes the row's field order matches the Avro schema's field order —
     * TODO confirm against the table schema / registry subject.
     */
    private GenericRecord convertToGenericRecord(RowData rowData) {
        GenericRecord record = new GenericData.Record(avroSchema);
        
        // Convert fields based on schema; null row fields are simply left unset.
        List<Schema.Field> fields = avroSchema.getFields();
        for (int i = 0; i < fields.size(); i++) {
            Schema.Field field = fields.get(i);
            
            if (!rowData.isNullAt(i)) {
                Object value = convertFieldValue(rowData, i, field.schema());
                record.put(field.name(), value);
            }
        }
        
        return record;
    }
    
    /** Maps one row field to the Java value expected for the given Avro type. */
    private Object convertFieldValue(RowData rowData, int pos, Schema fieldSchema) {
        // Implementation depends on field type mapping
        switch (fieldSchema.getType()) {
            case BOOLEAN:
                return rowData.getBoolean(pos);
            case INT:
                return rowData.getInt(pos);
            case LONG:
                return rowData.getLong(pos);
            case STRING:
                return rowData.getString(pos).toString();
            // ... handle other Avro types
            default:
                throw new UnsupportedOperationException("Unsupported Avro type: " + fieldSchema.getType());
        }
    }
}

Configuration Validation

/**
 * Validates the full set of async sink 'sink.*' options and the cross-option
 * constraints between them (batch vs. buffer sizes, record vs. batch bytes,
 * timeout vs. buffer time).
 */
public class ComprehensiveAsyncSinkConfigurationValidator implements ConfigurationValidator {
    
    // Missing in the original: validate() logs via LOG, but no logger was declared.
    private static final Logger LOG =
        LoggerFactory.getLogger(ComprehensiveAsyncSinkConfigurationValidator.class);
    
    // Async sink configuration options
    public static final ConfigOption<Integer> MAX_BATCH_SIZE = 
        ConfigOptions.key("sink.max-batch-size")
            .intType()
            .defaultValue(100)
            .withDescription("Maximum number of records per batch");
    
    public static final ConfigOption<Long> MAX_BATCH_SIZE_IN_BYTES = 
        ConfigOptions.key("sink.max-batch-size-in-bytes")
            .longType()
            .defaultValue(1024 * 1024L)  // 1MB
            .withDescription("Maximum batch size in bytes");
    
    public static final ConfigOption<Integer> MAX_IN_FLIGHT_REQUESTS = 
        ConfigOptions.key("sink.max-in-flight-requests")
            .intType()
            .defaultValue(10)
            .withDescription("Maximum number of concurrent requests");
    
    public static final ConfigOption<Integer> MAX_BUFFERED_REQUESTS = 
        ConfigOptions.key("sink.max-buffered-requests")
            .intType()
            .defaultValue(1000)
            .withDescription("Maximum number of buffered requests");
    
    public static final ConfigOption<Long> MAX_TIME_IN_BUFFER_MS = 
        ConfigOptions.key("sink.max-time-in-buffer-ms")
            .longType()
            .defaultValue(5000L)
            .withDescription("Maximum time records stay in buffer (milliseconds)");
    
    public static final ConfigOption<Long> MAX_RECORD_SIZE_IN_BYTES = 
        ConfigOptions.key("sink.max-record-size-in-bytes")
            .longType()
            .defaultValue(256 * 1024L)  // 256KB
            .withDescription("Maximum size of individual records in bytes");
    
    public static final ConfigOption<Long> REQUEST_TIMEOUT_MS = 
        ConfigOptions.key("sink.request-timeout-ms")
            .longType()
            .defaultValue(30000L)
            .withDescription("Request timeout in milliseconds");
    
    public static final ConfigOption<Boolean> FAIL_ON_TIMEOUT = 
        ConfigOptions.key("sink.fail-on-timeout")
            .booleanType()
            .defaultValue(false)
            .withDescription("Whether to fail job on request timeout");
    
    /**
     * Checks each option's range and the relationships between them.
     *
     * @throws ValidationException on the first violated constraint
     */
    @Override
    public void validate(ReadableConfig configuration) throws ValidationException {
        // Validate batch size constraints
        int maxBatchSize = configuration.get(MAX_BATCH_SIZE);
        int maxBufferedRequests = configuration.get(MAX_BUFFERED_REQUESTS);
        
        if (maxBatchSize <= 0) {
            throw new ValidationException("max-batch-size must be positive, got: " + maxBatchSize);
        }
        
        // The buffer must be able to hold more than one full batch, otherwise
        // the writer could never assemble a batch without stalling.
        if (maxBufferedRequests <= maxBatchSize) {
            throw new ValidationException(
                "max-buffered-requests (" + maxBufferedRequests + 
                ") must be greater than max-batch-size (" + maxBatchSize + ")");
        }
        
        // Validate size constraints: a single record must fit in one batch.
        long maxBatchSizeInBytes = configuration.get(MAX_BATCH_SIZE_IN_BYTES);
        long maxRecordSizeInBytes = configuration.get(MAX_RECORD_SIZE_IN_BYTES);
        
        if (maxBatchSizeInBytes < maxRecordSizeInBytes) {
            throw new ValidationException(
                "max-batch-size-in-bytes (" + maxBatchSizeInBytes + 
                ") must be >= max-record-size-in-bytes (" + maxRecordSizeInBytes + ")");
        }
        
        // Validate timeout settings
        long requestTimeout = configuration.get(REQUEST_TIMEOUT_MS);
        long maxTimeInBuffer = configuration.get(MAX_TIME_IN_BUFFER_MS);
        
        if (requestTimeout <= 0) {
            throw new ValidationException("request-timeout-ms must be positive, got: " + requestTimeout);
        }
        
        if (maxTimeInBuffer <= 0) {
            throw new ValidationException("max-time-in-buffer-ms must be positive, got: " + maxTimeInBuffer);
        }
        
        // Warn (but do not fail) if the timeout is shorter than the buffer window.
        if (requestTimeout < maxTimeInBuffer) {
            LOG.warn("request-timeout-ms ({}) is shorter than max-time-in-buffer-ms ({}), " +
                     "this may cause premature timeouts", requestTimeout, maxTimeInBuffer);
        }
        
        // Validate in-flight request limits
        int maxInFlightRequests = configuration.get(MAX_IN_FLIGHT_REQUESTS);
        if (maxInFlightRequests <= 0) {
            throw new ValidationException("max-in-flight-requests must be positive, got: " + maxInFlightRequests);
        }
    }
}

Best Practices

Performance Optimization for Table Sinks

/**
 * Example factory that tunes the async writer configuration from the table's
 * row shape and execution mode (smaller/faster buffers for streaming, larger
 * ones for batch).
 */
public class OptimizedTableSinkFactory extends AsyncDynamicTableSinkFactory {
    
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        ReadableConfig config = context.getConfiguration();
        
        // Optimize configuration based on table characteristics
        AsyncSinkWriterConfiguration optimizedConfig = optimizeConfiguration(
            config, 
            context.getPhysicalRowDataType(),
            context.isStreamingMode()
        );
        
        return createOptimizedSink(context, optimizedConfig);
    }
    
    /** Derives batch/buffer sizes from an estimated row size and the run mode. */
    private AsyncSinkWriterConfiguration optimizeConfiguration(
            ReadableConfig config,
            DataType rowDataType,
            boolean isStreaming) {
        
        AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder builder = 
            AsyncSinkWriterConfiguration.builder();
        
        // Calculate optimal batch size based on row size
        int estimatedRowSize = estimateRowSize(rowDataType);
        int optimalBatchSize = calculateOptimalBatchSize(estimatedRowSize, isStreaming);
        
        builder.setMaxBatchSize(optimalBatchSize);
        
        // Adjust buffer sizes for streaming vs batch mode
        if (isStreaming) {
            // Smaller buffers for lower latency
            builder.setMaxTimeInBufferMS(1000)
                   .setMaxBufferedRequests(optimalBatchSize * 5);
        } else {
            // Larger buffers for higher throughput
            builder.setMaxTimeInBufferMS(10000)
                   .setMaxBufferedRequests(optimalBatchSize * 20);
        }
        
        // Set other optimized values...
        return builder.build();
    }
    
    /** Rough per-row byte estimate; 100 bytes if the type is not a row. */
    private int estimateRowSize(DataType dataType) {
        // DataType is not a LogicalType, so the instanceof check must be applied
        // to the underlying logical type. The original tested
        // `dataType instanceof RowType`, which is always false and made every
        // table fall through to the default estimate.
        LogicalType logicalType = dataType.getLogicalType();
        if (logicalType instanceof RowType) {
            RowType rowType = (RowType) logicalType;
            return rowType.getFields().stream()
                .mapToInt(this::estimateFieldSize)
                .sum();
        }
        return 100; // Default estimate
    }
    
    /** Byte estimate for a single field based on its logical type root. */
    private int estimateFieldSize(RowType.RowField field) {
        LogicalType type = field.getType();
        switch (type.getTypeRoot()) {
            case BOOLEAN:
            case TINYINT:
                return 1;
            case SMALLINT:
                return 2;
            case INTEGER:
            case FLOAT:
            case DATE:
                return 4;
            case BIGINT:
            case DOUBLE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return 8;
            case VARCHAR:
                return ((VarCharType) type).getLength();
            case CHAR:
                // CHAR is backed by CharType, not VarCharType; the original's shared
                // cast to VarCharType would throw ClassCastException for CHAR columns.
                return ((CharType) type).getLength();
            default:
                return 50; // Conservative estimate
        }
    }
}

Error Handling and Monitoring

/**
 * Table sink decorator that registers send/failure counters and a serialization
 * latency histogram, wrapping the underlying async sink at runtime-provider time.
 */
public class MonitoredTableSink extends AsyncDynamicTableSink {
    private final MetricGroup metricGroup;
    private final Counter recordsSent;
    private final Counter recordsFailed;
    private final Histogram serializationTime;
    
    public MonitoredTableSink(
            DataType physicalRowDataType,
            AsyncSinkBase<RowData, ?> asyncSinkBase,
            MetricGroup metricGroup) {
        super(physicalRowDataType, asyncSinkBase);
        // Register all metrics up front so they exist before any data flows.
        this.metricGroup = metricGroup;
        this.recordsSent = metricGroup.counter("records_sent");
        this.recordsFailed = metricGroup.counter("records_failed");
        this.serializationTime = metricGroup.histogram("serialization_time_ms");
    }
    
    /**
     * Wraps the delegate sink so every write is counted and timed.
     * NOTE(review): assumes the superclass exposes `asyncSinkBase` to subclasses
     * (protected or package-private) — confirm against AsyncDynamicTableSink.
     */
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        MonitoredAsyncSinkWrapper wrapped = new MonitoredAsyncSinkWrapper(
            asyncSinkBase,
            recordsSent,
            recordsFailed,
            serializationTime
        );
        return SinkV2Provider.of(wrapped);
    }
}

// Decorator around a Sink<T> that updates the supplied metrics around each
// delegated call. Only the fields are shown here; the Sink methods (which
// forward to `delegate` while bumping the counters/histogram) are elided in
// this documentation.
public class MonitoredAsyncSinkWrapper<T> implements Sink<T> {
    private final Sink<T> delegate;
    private final Counter recordsSent;
    private final Counter recordsFailed;
    private final Histogram serializationTime;
    
    // Implementation that wraps calls with metrics...
}

The Table API Integration provides a complete bridge between Flink's table ecosystem and the advanced async sink framework, enabling powerful, high-performance table sinks with comprehensive configuration options and monitoring capabilities.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-base

docs

async-sink.md

hybrid-source.md

index.md

rate-limiting.md

source-reader.md

table-api.md

tile.json