Base classes and utilities for building Apache Flink connectors with async sink patterns, source readers, and advanced streaming capabilities
—
The Table API Integration components provide seamless integration between the async sink framework and Flink's Table API and SQL. This enables building table sinks with all the advanced features of async sinks while maintaining compatibility with Flink's unified batch and streaming API.
Abstract base factory for creating table sinks with async sink capabilities.
// API OUTLINE (signatures only, not compilable source): abstract factory that
// concrete table connectors extend so the generic async-sink options (batching,
// buffering, timeouts) are wired into the sinks they create.
@PublicEvolving
public abstract class AsyncDynamicTableSinkFactory implements DynamicTableSinkFactory {
// Required method implementations
// Options a user MUST set / MAY set in the WITH (...) clause.
public Set<ConfigOption<?>> requiredOptions()
public Set<ConfigOption<?>> optionalOptions()
// Protected helper methods
// Transfers the generic sink.* properties onto the supplied builder so
// subclasses do not re-implement option parsing.
protected AsyncDynamicTableSinkBuilder<?, ?> addAsyncOptionsToBuilder(
Properties properties,
AsyncDynamicTableSinkBuilder<?, ?> builder)
// Inner class for context
// Exposes the table's physical row type, the effective configuration, the
// user classloader and the execution mode to concrete factories.
public static class AsyncDynamicSinkContext {
public DataType getPhysicalRowDataType()
public ReadableConfig getConfiguration()
public ClassLoader getClassLoader()
public boolean isStreamingMode()
}
}Table sink implementation that bridges table operations to async sink writers.
// API OUTLINE: DynamicTableSink backed by an AsyncSinkBase writer.
@PublicEvolving
public class AsyncDynamicTableSink implements DynamicTableSink {
// Constructor
// NOTE(review): the constructor is protected — connectors are expected to
// subclass this sink; direct `new AsyncDynamicTableSink(...)` from an
// unrelated class (see HttpTableSinkFactory below) may not compile. Confirm.
protected AsyncDynamicTableSink(
DataType physicalRowDataType,
AsyncSinkBase<RowData, ?> asyncSinkBase)
// DynamicTableSink implementation
public ChangelogMode getChangelogMode(ChangelogMode requestedMode)
public SinkRuntimeProvider getSinkRuntimeProvider(Context context)
public DynamicTableSink copy()
public String asSummaryString()
}Interface for validating table sink configurations.
// Validation hook: implementations inspect a ReadableConfig and throw
// ValidationException on missing or inconsistent options.
@PublicEvolving
public interface ConfigurationValidator {
void validate(ReadableConfig configuration) throws ValidationException
}Built-in validator for async sink configuration options.
// Validates the standard async sink options (batch sizes, buffering, timeouts).
@PublicEvolving
public class AsyncSinkConfigurationValidator implements ConfigurationValidator {
public AsyncSinkConfigurationValidator()
public void validate(ReadableConfig configuration) throws ValidationException
}public class HttpTableSinkFactory extends AsyncDynamicTableSinkFactory {
// Configuration options
// Declarative option set for the 'http' connector; descriptions surface in
// SQL-client / catalog documentation.
// The only mandatory option — the target URL for every request.
public static final ConfigOption<String> ENDPOINT =
ConfigOptions.key("endpoint")
.stringType()
.noDefaultValue()
.withDescription("HTTP endpoint URL for sending data");
public static final ConfigOption<String> METHOD =
ConfigOptions.key("method")
.stringType()
.defaultValue("POST")
.withDescription("HTTP method to use");
// Map-typed option; the example DDL below supplies it via 'headers.*' keys.
public static final ConfigOption<Map<String, String>> HEADERS =
ConfigOptions.key("headers")
.mapType()
.defaultValue(Collections.emptyMap())
.withDescription("HTTP headers to include in requests");
// Optional bearer/credential token; read via getOptional, may be absent.
public static final ConfigOption<String> AUTH_TOKEN =
ConfigOptions.key("auth.token")
.stringType()
.noDefaultValue()
.withDescription("Authentication token for HTTP requests");
public static final ConfigOption<String> RECORD_FORMAT =
ConfigOptions.key("format")
.stringType()
.defaultValue("json")
.withDescription("Format for serializing records (json, avro, csv)");
@Override
public String factoryIdentifier() {
    // Connector name used in the WITH ('connector' = 'http') clause.
    return "http";
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
    // Only the target endpoint is mandatory; everything else has a default.
    return Collections.singleton(ENDPOINT);
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
    // Start from the async-sink options inherited from the base factory,
    // then append the HTTP-specific ones.
    Set<ConfigOption<?>> options = new HashSet<>(super.optionalOptions());
    Collections.addAll(options, METHOD, HEADERS, AUTH_TOKEN, RECORD_FORMAT);
    return options;
}
// Builds the HTTP table sink: validates options, assembles the serializer,
// HTTP client config, element converter and async sink, then wraps them.
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
// Validate configuration
ReadableConfig config = context.getConfiguration();
validateConfiguration(config);
// Extract configuration values
String endpoint = config.get(ENDPOINT);
String method = config.get(METHOD);
Map<String, String> headers = config.get(HEADERS);
Optional<String> authToken = config.getOptional(AUTH_TOKEN);
String recordFormat = config.get(RECORD_FORMAT);
// Create record serializer based on format
RecordSerializer<RowData> serializer = createSerializer(
recordFormat,
context.getPhysicalRowDataType()
);
// Create HTTP client configuration
HttpClientConfig clientConfig = HttpClientConfig.builder()
.setEndpoint(endpoint)
.setMethod(method)
.setHeaders(headers)
// Token is optional; null means "send no auth header".
.setAuthToken(authToken.orElse(null))
.build();
// Create element converter
HttpElementConverter elementConverter = new HttpElementConverter(serializer, clientConfig);
// Create async sink base with configuration from table properties
AsyncSinkWriterConfiguration writerConfig = createAsyncWriterConfiguration(config);
HttpAsyncSinkBase asyncSink = new HttpAsyncSinkBase(
elementConverter,
writerConfig,
clientConfig
);
// Return table sink
// NOTE(review): AsyncDynamicTableSink's constructor is declared protected
// above — confirm this direct instantiation is legal from this factory
// (same package or a dedicated subclass may be required).
return new AsyncDynamicTableSink(context.getPhysicalRowDataType(), asyncSink);
}
/**
 * Validates the HTTP connector configuration before the sink is built.
 *
 * <p>Checks that an endpoint is present and is a syntactically valid URL with
 * both a scheme and a host, then delegates to {@code AsyncSinkConfigurationValidator}
 * for the shared async-sink options.
 *
 * @param config the table configuration to validate
 * @throws ValidationException if the endpoint is missing or malformed, or any
 *     async sink option is invalid
 */
private void validateConfiguration(ReadableConfig config) {
    String endpoint = config.get(ENDPOINT);
    if (endpoint == null || endpoint.isEmpty()) {
        throw new ValidationException("HTTP endpoint must be specified");
    }
    try {
        // java.net.URI replaces the deprecated URL(String) constructor; it parses
        // strictly and lets us reject endpoints lacking a scheme or host, which
        // new URL(...) would otherwise accept or fail on inconsistently.
        java.net.URI uri = new java.net.URI(endpoint);
        if (uri.getScheme() == null || uri.getHost() == null) {
            throw new ValidationException("Invalid HTTP endpoint URL: " + endpoint);
        }
    } catch (java.net.URISyntaxException e) {
        throw new ValidationException("Invalid HTTP endpoint URL: " + endpoint, e);
    }
    // Validate the generic async sink options (batch sizes, buffering, timeouts).
    AsyncSinkConfigurationValidator asyncValidator = new AsyncSinkConfigurationValidator();
    asyncValidator.validate(config);
}
/**
 * Creates the record serializer matching the configured {@code format} option.
 *
 * @param format format name from the 'format' table option (case-insensitive)
 * @param dataType physical row type of the table
 * @return serializer for the requested format
 * @throws ValidationException for unknown formats
 */
private RecordSerializer<RowData> createSerializer(String format, DataType dataType) {
    // Locale.ROOT makes the comparison locale-independent (e.g. safe under the
    // Turkish locale where casing of 'I' differs from ASCII expectations).
    switch (format.toLowerCase(java.util.Locale.ROOT)) {
        case "json":
            return new JsonRowDataSerializer(dataType);
        case "avro":
            // NOTE(review): AvroRowDataSerializer below only declares a 3-arg
            // constructor (dataType, registryUrl, subject) — confirm a 1-arg
            // overload exists or route registry settings in here.
            return new AvroRowDataSerializer(dataType);
        case "csv":
            return new CsvRowDataSerializer(dataType);
        default:
            throw new ValidationException("Unsupported record format: " + format);
    }
}
/** Builds the async writer configuration from the generic table options. */
private AsyncSinkWriterConfiguration createAsyncWriterConfiguration(ReadableConfig config) {
    AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder builder =
            AsyncSinkWriterConfiguration.builder();
    // Route the sink.* options through the base-class helper; the adapter
    // bridges the table-sink builder interface onto the writer-config builder.
    addAsyncOptionsToBuilder(toProperties(config), new AsyncSinkWriterConfigurationBuilderAdapter(builder));
    return builder.build();
}

/** Copies every key/value pair of the configuration into a Properties object. */
private Properties toProperties(ReadableConfig config) {
    Properties props = new Properties();
    for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
        props.setProperty(entry.getKey(), entry.getValue());
    }
    return props;
}
}
// Adapter to bridge the builder interfaces
// Forwards AsyncDynamicTableSinkBuilder setter calls onto the underlying
// AsyncSinkWriterConfiguration builder, so the base factory's option helper
// (addAsyncOptionsToBuilder) can populate the writer configuration.
public class AsyncSinkWriterConfigurationBuilderAdapter implements AsyncDynamicTableSinkBuilder<RowData, HttpRequestEntry> {
private final AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder delegate;
public AsyncSinkWriterConfigurationBuilderAdapter(
AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder delegate) {
this.delegate = delegate;
}
// Implement bridge methods...
// Each setter mutates the delegate and returns `this` to keep the fluent chain.
public AsyncDynamicTableSinkBuilder<RowData, HttpRequestEntry> setMaxBatchSize(int maxBatchSize) {
delegate.setMaxBatchSize(maxBatchSize);
return this;
}
// ... other bridge methods
}public class HttpElementConverter implements ElementConverter<RowData, HttpRequestEntry> {
// Turns one RowData into the HTTP request payload bytes.
private final RecordSerializer<RowData> serializer;
// Static HTTP settings (endpoint, method, headers) shared by every request.
private final HttpClientConfig clientConfig;

public HttpElementConverter(RecordSerializer<RowData> serializer, HttpClientConfig clientConfig) {
    this.serializer = serializer;
    this.clientConfig = clientConfig;
}

/**
 * Converts one table row into an HTTP request entry for the async sink buffer.
 * Any failure (serialization or entry construction) is rethrown as a
 * RuntimeException so the sink surfaces it.
 */
@Override
public HttpRequestEntry apply(RowData element, SinkWriter.Context context) {
    try {
        byte[] body = serializer.serialize(element);
        // Endpoint/method/headers come from static config; the timestamp and a
        // fresh UUID identify the individual record.
        return new HttpRequestEntry(
                clientConfig.getEndpoint(),
                clientConfig.getMethod(),
                clientConfig.getHeaders(),
                body,
                context.timestamp(),
                generateRequestId());
    } catch (Exception e) {
        throw new RuntimeException("Failed to convert row data to HTTP request", e);
    }
}
// Propagates writer initialization to the serializer (e.g. schema setup).
@Override
public void open(WriterInitContext context) {
serializer.open(context);
}
// Random UUID used to identify/correlate individual HTTP requests.
private String generateRequestId() {
return UUID.randomUUID().toString();
}
}
// JSON serializer for RowData — renders each row as a JSON object keyed by
// field name, via an intermediate Jackson JsonNode tree.
public class JsonRowDataSerializer implements RecordSerializer<RowData> {
    private final DataType dataType;
    private final ObjectMapper objectMapper;
    private final RowDataToJsonConverter converter;

    public JsonRowDataSerializer(DataType dataType) {
        this.dataType = dataType;
        this.objectMapper = new ObjectMapper();
        // The converter works on the logical type of the physical row.
        this.converter = new RowDataToJsonConverter(dataType.getLogicalType());
    }

    @Override
    public void open(WriterInitContext context) {
        // Emit java.time values as ISO-8601 text rather than numeric timestamps.
        objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        objectMapper.registerModule(new JavaTimeModule());
    }

    @Override
    public byte[] serialize(RowData rowData) throws IOException {
        // Convert to a JsonNode tree, then let the mapper render it to bytes.
        return objectMapper.writeValueAsBytes(converter.convert(rowData));
    }
}
// Converter from RowData to JSON
// Maps a Flink RowData to a Jackson ObjectNode, one JSON property per field.
public class RowDataToJsonConverter {
private final LogicalType logicalType;
public RowDataToJsonConverter(LogicalType logicalType) {
this.logicalType = logicalType;
}
// Converts one row to a JSON object. Null fields become JSON null.
// NOTE(review): for a non-RowType logical type this silently returns an empty
// object — confirm that is intended rather than an error.
public JsonNode convert(RowData rowData) {
ObjectNode objectNode = JsonNodeFactory.instance.objectNode();
if (logicalType instanceof RowType) {
RowType rowType = (RowType) logicalType;
List<RowType.RowField> fields = rowType.getFields();
for (int i = 0; i < fields.size(); i++) {
RowType.RowField field = fields.get(i);
String fieldName = field.getName();
LogicalType fieldType = field.getType();
if (rowData.isNullAt(i)) {
objectNode.putNull(fieldName);
} else {
JsonNode fieldValue = convertField(rowData, i, fieldType);
objectNode.set(fieldName, fieldValue);
}
}
}
return objectNode;
}
/**
 * Converts the non-null field at {@code pos} to a JsonNode according to its
 * logical type. Temporal types are emitted in Flink's internal numeric
 * representation (days/millis), not as formatted strings.
 *
 * @throws UnsupportedOperationException for types without a mapping
 */
private JsonNode convertField(RowData rowData, int pos, LogicalType fieldType) {
    switch (fieldType.getTypeRoot()) {
        case BOOLEAN:
            return JsonNodeFactory.instance.booleanNode(rowData.getBoolean(pos));
        case TINYINT:
            return JsonNodeFactory.instance.numberNode(rowData.getByte(pos));
        case SMALLINT:
            return JsonNodeFactory.instance.numberNode(rowData.getShort(pos));
        case INTEGER:
        case DATE:
        case TIME_WITHOUT_TIME_ZONE:
            // DATE/TIME are stored as int (days since epoch / millis of day).
            return JsonNodeFactory.instance.numberNode(rowData.getInt(pos));
        case BIGINT:
        case TIMESTAMP_WITHOUT_TIME_ZONE:
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            return JsonNodeFactory.instance.numberNode(rowData.getLong(pos));
        case FLOAT:
            return JsonNodeFactory.instance.numberNode(rowData.getFloat(pos));
        case DOUBLE:
            return JsonNodeFactory.instance.numberNode(rowData.getDouble(pos));
        case VARCHAR:
        case CHAR:
            return JsonNodeFactory.instance.textNode(rowData.getString(pos).toString());
        case DECIMAL:
            // Precision/scale are declared on DecimalType, not on the LogicalType
            // base class — the previous direct fieldType.getPrecision() call did
            // not resolve.
            DecimalType decimalType = (DecimalType) fieldType;
            return JsonNodeFactory.instance.numberNode(
                    rowData.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale())
                            .toBigDecimal());
        case ARRAY:
            ArrayType arrayType = (ArrayType) fieldType;
            ArrayData arrayData = rowData.getArray(pos);
            return convertArray(arrayData, arrayType.getElementType());
        case ROW:
            // Recurse with a converter for the nested row type.
            RowType nestedRowType = (RowType) fieldType;
            RowData nestedRowData = rowData.getRow(pos, nestedRowType.getFieldCount());
            return new RowDataToJsonConverter(nestedRowType).convert(nestedRowData);
        default:
            throw new UnsupportedOperationException("Unsupported type: " + fieldType);
    }
}
// Converts an ArrayData to a JSON array. Only INTEGER and VARCHAR elements are
// currently implemented; other element types throw.
private JsonNode convertArray(ArrayData arrayData, LogicalType elementType) {
ArrayNode arrayNode = JsonNodeFactory.instance.arrayNode();
for (int i = 0; i < arrayData.size(); i++) {
if (arrayData.isNullAt(i)) {
arrayNode.addNull();
} else {
// Convert array element (simplified - would need full implementation)
switch (elementType.getTypeRoot()) {
case INTEGER:
arrayNode.add(arrayData.getInt(i));
break;
case VARCHAR:
arrayNode.add(arrayData.getString(i).toString());
break;
// ... handle other types
default:
throw new UnsupportedOperationException("Unsupported array element type: " + elementType);
}
}
}
return arrayNode;
}
}-- Create HTTP table sink with async configuration
-- Example DDL: declares a sink table backed by the 'http' connector above.
CREATE TABLE http_sink (
user_id BIGINT,
event_name STRING,
event_time TIMESTAMP(3),
properties MAP<STRING, STRING>
) WITH (
'connector' = 'http',
'endpoint' = 'https://api.example.com/events',
'method' = 'POST',
'format' = 'json',
-- NOTE(review): 'headers.*' prefix keys assume the map-typed 'headers' option
-- accepts per-key entries — confirm against the factory's option parsing.
'headers.Content-Type' = 'application/json',
'headers.User-Agent' = 'Flink-HTTP-Sink/1.0',
'auth.token' = 'your-auth-token',
-- Async sink configuration
-- These map onto the sink.* options validated by the configuration validator.
'sink.max-batch-size' = '100',
'sink.max-batch-size-in-bytes' = '1048576', -- 1MB
'sink.max-in-flight-requests' = '10',
'sink.max-buffered-requests' = '1000',
'sink.max-time-in-buffer-ms' = '5000',
'sink.max-record-size-in-bytes' = '262144', -- 256KB
'sink.request-timeout-ms' = '30000',
'sink.fail-on-timeout' = 'false'
);
-- Insert data into the HTTP sink
-- Each emitted row becomes one buffered HTTP request entry; batching and
-- flushing are governed by the sink.* options declared above.
INSERT INTO http_sink
SELECT
user_id,
event_name,
event_time,
properties
FROM source_table;public class MultiFormatTableSinkFactory extends AsyncDynamicTableSinkFactory {
// Options for a factory supporting multiple serialization formats. The schema
// registry settings are only consulted for avro (and have no defaults).
public static final ConfigOption<String> FORMAT =
ConfigOptions.key("format")
.stringType()
.defaultValue("json")
.withDescription("Serialization format (json, avro, protobuf, csv)");
public static final ConfigOption<String> SCHEMA_REGISTRY_URL =
ConfigOptions.key("schema-registry.url")
.stringType()
.noDefaultValue()
.withDescription("Schema registry URL for Avro/Protobuf formats");
public static final ConfigOption<String> SUBJECT_NAME =
ConfigOptions.key("schema-registry.subject")
.stringType()
.noDefaultValue()
.withDescription("Schema registry subject name");
/**
 * Builds the table sink, choosing a serializer from the 'format' option.
 * NOTE(review): createTableSink is not defined in this file — presumably a
 * helper on the concrete subclass; confirm.
 */
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
    final ReadableConfig config = context.getConfiguration();
    final RecordSerializer<RowData> serializer =
            createFormatSerializer(config.get(FORMAT), context, config);
    return createTableSink(context, serializer, config);
}
/**
 * Creates a serializer for the requested wire format.
 *
 * @param format format name (case-insensitive)
 * @param context table context providing the physical row type
 * @param config table options (registry settings for avro, csv options, ...)
 * @throws ValidationException for unknown formats or missing avro registry options
 */
private RecordSerializer<RowData> createFormatSerializer(
        String format,
        Context context,
        ReadableConfig config) {
    DataType dataType = context.getPhysicalRowDataType();
    // Locale.ROOT keeps the match locale-independent.
    switch (format.toLowerCase(java.util.Locale.ROOT)) {
        case "json":
            return new JsonRowDataSerializer(dataType);
        case "avro":
            String schemaRegistryUrl = config.get(SCHEMA_REGISTRY_URL);
            String subjectName = config.get(SUBJECT_NAME);
            // Both options have no default; fail fast with a clear message
            // instead of an NPE later in AvroRowDataSerializer.open().
            if (schemaRegistryUrl == null || subjectName == null) {
                throw new ValidationException(
                        "schema-registry.url and schema-registry.subject are required for the avro format");
            }
            return new AvroRowDataSerializer(dataType, schemaRegistryUrl, subjectName);
        case "protobuf":
            return new ProtobufRowDataSerializer(dataType, config);
        case "csv":
            return new CsvRowDataSerializer(dataType, config);
        default:
            throw new ValidationException("Unsupported format: " + format);
    }
}
}
// Avro serializer with schema registry
// Serializes RowData to Confluent wire-format Avro using a schema fetched from
// the registry.
public class AvroRowDataSerializer implements RecordSerializer<RowData> {
private final DataType dataType;
private final String schemaRegistryUrl;
private final String subjectName;
// Initialized lazily in open(); calling serialize() before open() fails.
private Schema avroSchema;
private CachedSchemaRegistryClient schemaRegistryClient;
private KafkaAvroSerializer avroSerializer;
public AvroRowDataSerializer(DataType dataType, String schemaRegistryUrl, String subjectName) {
this.dataType = dataType;
this.schemaRegistryUrl = schemaRegistryUrl;
this.subjectName = subjectName;
}
/**
 * Initializes the schema-registry client, fetches the subject's latest schema
 * and prepares the Confluent Avro serializer.
 *
 * @throws RuntimeException if the registry is unreachable or the schema
 *     cannot be fetched/parsed
 */
@Override
public void open(WriterInitContext context) {
    try {
        // 100 = identity-map capacity of the client-side schema cache.
        this.schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
        // getLatestSchemaMetadata().getSchema() returns the schema as a JSON
        // string, so it must be parsed into an Avro Schema object.
        this.avroSchema = new Schema.Parser().parse(
                schemaRegistryClient.getLatestSchemaMetadata(subjectName).getSchema());
        Map<String, Object> props = new HashMap<>();
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        this.avroSerializer = new KafkaAvroSerializer(schemaRegistryClient, props);
    } catch (Exception e) {
        throw new RuntimeException("Failed to initialize Avro serializer", e);
    }
}
/**
 * Serializes one row to Confluent wire-format Avro bytes, wrapping any failure
 * in an IOException for the caller.
 */
@Override
public byte[] serialize(RowData rowData) throws IOException {
    try {
        // Map the row onto the registry schema, then emit registry-framed bytes.
        GenericRecord record = convertToGenericRecord(rowData);
        return avroSerializer.serialize(subjectName, record);
    } catch (Exception e) {
        throw new IOException("Failed to serialize RowData to Avro", e);
    }
}
// Builds a GenericRecord from the row.
// NOTE(review): this assumes RowData field positions line up 1:1 with the Avro
// schema's field order — confirm; a mismatch silently writes wrong values.
private GenericRecord convertToGenericRecord(RowData rowData) {
GenericRecord record = new GenericData.Record(avroSchema);
// Convert fields based on schema
List<Schema.Field> fields = avroSchema.getFields();
for (int i = 0; i < fields.size(); i++) {
Schema.Field field = fields.get(i);
if (!rowData.isNullAt(i)) {
Object value = convertFieldValue(rowData, i, field.schema());
record.put(field.name(), value);
}
}
return record;
}
// Maps one field to the Java value expected by the Avro schema type.
// Only BOOLEAN/INT/LONG/STRING are implemented; others throw.
private Object convertFieldValue(RowData rowData, int pos, Schema fieldSchema) {
// Implementation depends on field type mapping
switch (fieldSchema.getType()) {
case BOOLEAN:
return rowData.getBoolean(pos);
case INT:
return rowData.getInt(pos);
case LONG:
return rowData.getLong(pos);
case STRING:
return rowData.getString(pos).toString();
// ... handle other Avro types
default:
throw new UnsupportedOperationException("Unsupported Avro type: " + fieldSchema.getType());
}
}
}public class ComprehensiveAsyncSinkConfigurationValidator implements ConfigurationValidator {
// Async sink configuration options
// Each option mirrors one knob of the async sink writer; defaults are conservative.
public static final ConfigOption<Integer> MAX_BATCH_SIZE =
        ConfigOptions.key("sink.max-batch-size")
                .intType()
                .defaultValue(100)
                .withDescription("Maximum number of records per batch");
public static final ConfigOption<Long> MAX_BATCH_SIZE_IN_BYTES =
        ConfigOptions.key("sink.max-batch-size-in-bytes")
                .longType()
                .defaultValue(1024 * 1024L) // 1MB
                .withDescription("Maximum batch size in bytes");
public static final ConfigOption<Integer> MAX_IN_FLIGHT_REQUESTS =
        ConfigOptions.key("sink.max-in-flight-requests")
                .intType()
                .defaultValue(10)
                .withDescription("Maximum number of concurrent requests");
public static final ConfigOption<Integer> MAX_BUFFERED_REQUESTS =
        ConfigOptions.key("sink.max-buffered-requests")
                .intType()
                .defaultValue(1000)
                .withDescription("Maximum number of buffered requests");
public static final ConfigOption<Long> MAX_TIME_IN_BUFFER_MS =
        ConfigOptions.key("sink.max-time-in-buffer-ms")
                .longType()
                .defaultValue(5000L)
                .withDescription("Maximum time records stay in buffer (milliseconds)");
public static final ConfigOption<Long> MAX_RECORD_SIZE_IN_BYTES =
        ConfigOptions.key("sink.max-record-size-in-bytes")
                .longType()
                .defaultValue(256 * 1024L) // 256KB
                .withDescription("Maximum size of individual records in bytes");
public static final ConfigOption<Long> REQUEST_TIMEOUT_MS =
        ConfigOptions.key("sink.request-timeout-ms")
                .longType()
                .defaultValue(30000L)
                .withDescription("Request timeout in milliseconds");
public static final ConfigOption<Boolean> FAIL_ON_TIMEOUT =
        ConfigOptions.key("sink.fail-on-timeout")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to fail job on request timeout");
// Logger for non-fatal configuration warnings. Declared here because
// validate() below references LOG, which was previously undeclared; fully
// qualified so no import-section change is needed.
private static final org.slf4j.Logger LOG =
        org.slf4j.LoggerFactory.getLogger(ComprehensiveAsyncSinkConfigurationValidator.class);
/**
 * Cross-checks the async sink options for internal consistency.
 *
 * <p>The check order (and the exact exception messages) is observable behavior
 * and is preserved: batch-size constraints first, then byte-size limits, then
 * timeout settings, finally the in-flight request cap. A request timeout
 * shorter than the buffer time only logs a warning.
 *
 * @param configuration the table options to validate
 * @throws ValidationException on the first violated constraint
 */
@Override
public void validate(ReadableConfig configuration) throws ValidationException {
    final int batchSize = configuration.get(MAX_BATCH_SIZE);
    final int bufferedRequests = configuration.get(MAX_BUFFERED_REQUESTS);
    if (batchSize <= 0) {
        throw new ValidationException("max-batch-size must be positive, got: " + batchSize);
    }
    if (bufferedRequests <= batchSize) {
        throw new ValidationException(
                "max-buffered-requests (" + bufferedRequests
                        + ") must be greater than max-batch-size (" + batchSize + ")");
    }

    final long batchBytes = configuration.get(MAX_BATCH_SIZE_IN_BYTES);
    final long recordBytes = configuration.get(MAX_RECORD_SIZE_IN_BYTES);
    if (batchBytes < recordBytes) {
        throw new ValidationException(
                "max-batch-size-in-bytes (" + batchBytes
                        + ") must be >= max-record-size-in-bytes (" + recordBytes + ")");
    }

    final long requestTimeoutMs = configuration.get(REQUEST_TIMEOUT_MS);
    final long bufferTimeMs = configuration.get(MAX_TIME_IN_BUFFER_MS);
    if (requestTimeoutMs <= 0) {
        throw new ValidationException("request-timeout-ms must be positive, got: " + requestTimeoutMs);
    }
    if (bufferTimeMs <= 0) {
        throw new ValidationException("max-time-in-buffer-ms must be positive, got: " + bufferTimeMs);
    }
    if (requestTimeoutMs < bufferTimeMs) {
        // Not fatal: requests may simply time out before the buffer flushes.
        LOG.warn("request-timeout-ms ({}) is shorter than max-time-in-buffer-ms ({}), " +
                "this may cause premature timeouts", requestTimeoutMs, bufferTimeMs);
    }

    final int inFlight = configuration.get(MAX_IN_FLIGHT_REQUESTS);
    if (inFlight <= 0) {
        throw new ValidationException("max-in-flight-requests must be positive, got: " + inFlight);
    }
}
}

public class OptimizedTableSinkFactory extends AsyncDynamicTableSinkFactory {
// Builds a sink whose async writer settings are derived from the table's row
// shape and execution mode rather than taken verbatim from options.
// NOTE(review): createOptimizedSink is not defined in this file, and
// getPhysicalRowDataType()/isStreamingMode() live on AsyncDynamicSinkContext
// above, not necessarily on the factory Context — confirm.
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
ReadableConfig config = context.getConfiguration();
// Optimize configuration based on table characteristics
AsyncSinkWriterConfiguration optimizedConfig = optimizeConfiguration(
config,
context.getPhysicalRowDataType(),
context.isStreamingMode()
);
return createOptimizedSink(context, optimizedConfig);
}
// Derives writer settings from the estimated row size and execution mode:
// streaming favors latency (small buffers), batch favors throughput.
// NOTE(review): calculateOptimalBatchSize is not defined in this file — confirm.
private AsyncSinkWriterConfiguration optimizeConfiguration(
ReadableConfig config,
DataType rowDataType,
boolean isStreaming) {
AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder builder =
AsyncSinkWriterConfiguration.builder();
// Calculate optimal batch size based on row size
int estimatedRowSize = estimateRowSize(rowDataType);
int optimalBatchSize = calculateOptimalBatchSize(estimatedRowSize, isStreaming);
builder.setMaxBatchSize(optimalBatchSize);
// Adjust buffer sizes for streaming vs batch mode
if (isStreaming) {
// Smaller buffers for lower latency
builder.setMaxTimeInBufferMS(1000)
.setMaxBufferedRequests(optimalBatchSize * 5);
} else {
// Larger buffers for higher throughput
builder.setMaxTimeInBufferMS(10000)
.setMaxBufferedRequests(optimalBatchSize * 20);
}
// Set other optimized values...
return builder.build();
}
/**
 * Estimates the serialized size in bytes of one row of the given data type by
 * summing per-field estimates.
 *
 * @param dataType physical row type of the table
 * @return estimated bytes per row; 100 for non-row types
 */
private int estimateRowSize(DataType dataType) {
    // DataType wraps a LogicalType (cf. dataType.getLogicalType() elsewhere in
    // this file); the RowType check must be done on the logical type — a
    // DataType itself is never an instance of RowType, so the previous
    // `dataType instanceof RowType` branch could never be taken.
    LogicalType logicalType = dataType.getLogicalType();
    if (logicalType instanceof RowType) {
        RowType rowType = (RowType) logicalType;
        return rowType.getFields().stream()
                .mapToInt(this::estimateFieldSize)
                .sum();
    }
    return 100; // Default estimate for non-row types
}
/**
 * Rough per-field byte estimate based on the field's logical type.
 * Unrecognized types get a conservative 50-byte estimate.
 */
private int estimateFieldSize(RowType.RowField field) {
    LogicalType type = field.getType();
    switch (type.getTypeRoot()) {
        case BOOLEAN:
        case TINYINT:
            return 1;
        case SMALLINT:
            return 2;
        case INTEGER:
        case FLOAT:
        case DATE:
            return 4;
        case BIGINT:
        case DOUBLE:
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            return 8;
        case VARCHAR:
            // VARCHAR and CHAR have distinct logical types; the previous single
            // cast to VarCharType threw ClassCastException for CHAR columns.
            return ((VarCharType) type).getLength();
        case CHAR:
            return ((CharType) type).getLength();
        default:
            return 50; // Conservative estimate
    }
}
}

public class MonitoredTableSink extends AsyncDynamicTableSink {
// Metric handles registered once at construction time.
private final MetricGroup metricGroup;
private final Counter recordsSent;
private final Counter recordsFailed;
private final Histogram serializationTime;
public MonitoredTableSink(
DataType physicalRowDataType,
AsyncSinkBase<RowData, ?> asyncSinkBase,
MetricGroup metricGroup) {
super(physicalRowDataType, asyncSinkBase);
this.metricGroup = metricGroup;
this.recordsSent = metricGroup.counter("records_sent");
this.recordsFailed = metricGroup.counter("records_failed");
this.serializationTime = metricGroup.histogram("serialization_time_ms");
}
// Wraps the underlying async sink so every write is instrumented.
// NOTE(review): `asyncSinkBase` is read from the superclass here — confirm
// AsyncDynamicTableSink exposes it to subclasses (protected or accessor).
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return SinkV2Provider.of(new MonitoredAsyncSinkWrapper(
asyncSinkBase,
recordsSent,
recordsFailed,
serializationTime
));
}
}
// SKELETON: decorator around a Sink that updates the metric handles on each
// operation. NOTE(review): the final fields are declared but no constructor is
// shown — as written this fragment would not compile; implementation elided.
public class MonitoredAsyncSinkWrapper<T> implements Sink<T> {
private final Sink<T> delegate;
private final Counter recordsSent;
private final Counter recordsFailed;
private final Histogram serializationTime;
// Implementation that wraps calls with metrics...
}The Table API Integration provides a complete bridge between Flink's table ecosystem and the advanced async sink framework, enabling powerful, high-performance table sinks with comprehensive configuration options and monitoring capabilities.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-base