# Table API Integration

The Table API integration components bridge the async sink framework and Flink's Table API and SQL. This enables building table sinks that inherit the batching, buffering, and flow-control features of async sinks while maintaining compatibility with Flink's unified batch and streaming API.
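For orientation, here is a minimal sketch of how a table backed by such a sink is used from a `TableEnvironment`. The `http` connector identifier and its options are the hypothetical example developed later on this page, not a shipped connector.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AsyncTableSinkQuickstart {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // The DDL registers a table whose sink is produced by an
        // AsyncDynamicTableSinkFactory implementation (see below).
        tEnv.executeSql(
                "CREATE TABLE http_sink (msg STRING) WITH ("
                        + "'connector' = 'http',"
                        + "'endpoint' = 'https://example.com/ingest')");

        tEnv.executeSql("INSERT INTO http_sink VALUES ('hello')");
    }
}
```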
## Core Components

### AsyncDynamicTableSinkFactory

Abstract base factory for creating table sinks with async sink capabilities.

```java { .api }
@PublicEvolving
public abstract class AsyncDynamicTableSinkFactory implements DynamicTableSinkFactory {

    // Required method implementations
    public Set<ConfigOption<?>> requiredOptions()
    public Set<ConfigOption<?>> optionalOptions()

    // Protected helper methods
    protected AsyncDynamicTableSinkBuilder<?, ?> addAsyncOptionsToBuilder(
        Properties properties,
        AsyncDynamicTableSinkBuilder<?, ?> builder)

    // Inner class for context
    public static class AsyncDynamicSinkContext {
        public DataType getPhysicalRowDataType()
        public ReadableConfig getConfiguration()
        public ClassLoader getClassLoader()
        public boolean isStreamingMode()
    }
}
```
### AsyncDynamicTableSink

Table sink implementation that bridges table operations to async sink writers. Because the constructor is protected, concrete connectors expose the sink through a thin subclass, as the HTTP example below does.

```java { .api }
@PublicEvolving
public class AsyncDynamicTableSink implements DynamicTableSink {

    // Constructor
    protected AsyncDynamicTableSink(
        DataType physicalRowDataType,
        AsyncSinkBase<RowData, ?> asyncSinkBase)

    // DynamicTableSink implementation
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode)
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context)
    public DynamicTableSink copy()
    public String asSummaryString()
}
```
### ConfigurationValidator

Interface for validating table sink configurations.

```java { .api }
@PublicEvolving
public interface ConfigurationValidator {
    void validate(ReadableConfig configuration) throws ValidationException
}
```
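Connector-specific rules can be enforced by implementing this interface directly. A minimal sketch, assuming a hypothetical `retry.max-attempts` option that is not part of the framework:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;

public class RetryConfigurationValidator implements ConfigurationValidator {

    // Hypothetical connector-specific option, used only for illustration
    private static final ConfigOption<Integer> MAX_RETRIES =
            ConfigOptions.key("retry.max-attempts").intType().defaultValue(3);

    @Override
    public void validate(ReadableConfig configuration) throws ValidationException {
        int maxRetries = configuration.get(MAX_RETRIES);
        if (maxRetries < 0) {
            throw new ValidationException(
                    "retry.max-attempts must be non-negative, got: " + maxRetries);
        }
    }
}
```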
### AsyncSinkConfigurationValidator

Built-in validator for async sink configuration options.

```java { .api }
@PublicEvolving
public class AsyncSinkConfigurationValidator implements ConfigurationValidator {

    public AsyncSinkConfigurationValidator()

    public void validate(ReadableConfig configuration) throws ValidationException
}
```
## Implementation Examples

### Complete Table Sink Factory

```java
public class HttpTableSinkFactory extends AsyncDynamicTableSinkFactory {

    // Configuration options
    public static final ConfigOption<String> ENDPOINT =
        ConfigOptions.key("endpoint")
            .stringType()
            .noDefaultValue()
            .withDescription("HTTP endpoint URL for sending data");

    public static final ConfigOption<String> METHOD =
        ConfigOptions.key("method")
            .stringType()
            .defaultValue("POST")
            .withDescription("HTTP method to use");

    public static final ConfigOption<Map<String, String>> HEADERS =
        ConfigOptions.key("headers")
            .mapType()
            .defaultValue(Collections.emptyMap())
            .withDescription("HTTP headers to include in requests");

    public static final ConfigOption<String> AUTH_TOKEN =
        ConfigOptions.key("auth.token")
            .stringType()
            .noDefaultValue()
            .withDescription("Authentication token for HTTP requests");

    public static final ConfigOption<String> RECORD_FORMAT =
        ConfigOptions.key("format")
            .stringType()
            .defaultValue("json")
            .withDescription("Format for serializing records (json, avro, csv)");

    @Override
    public String factoryIdentifier() {
        return "http";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.singleton(ENDPOINT);
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>(super.optionalOptions());
        options.addAll(Arrays.asList(
            METHOD,
            HEADERS,
            AUTH_TOKEN,
            RECORD_FORMAT
        ));
        return options;
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // Validate configuration
        ReadableConfig config = context.getConfiguration();
        validateConfiguration(config);

        // Extract configuration values
        String endpoint = config.get(ENDPOINT);
        String method = config.get(METHOD);
        Map<String, String> headers = config.get(HEADERS);
        Optional<String> authToken = config.getOptional(AUTH_TOKEN);
        String recordFormat = config.get(RECORD_FORMAT);

        // Create record serializer based on format
        RecordSerializer<RowData> serializer = createSerializer(
            recordFormat,
            context.getPhysicalRowDataType()
        );

        // Create HTTP client configuration
        HttpClientConfig clientConfig = HttpClientConfig.builder()
            .setEndpoint(endpoint)
            .setMethod(method)
            .setHeaders(headers)
            .setAuthToken(authToken.orElse(null))
            .build();

        // Create element converter
        HttpElementConverter elementConverter = new HttpElementConverter(serializer, clientConfig);

        // Create async sink base with configuration from table properties
        AsyncSinkWriterConfiguration writerConfig = createAsyncWriterConfiguration(config);

        HttpAsyncSinkBase asyncSink = new HttpAsyncSinkBase(
            elementConverter,
            writerConfig,
            clientConfig
        );

        // Return the table sink; HttpDynamicTableSink (defined below) is needed
        // because the AsyncDynamicTableSink constructor is protected
        return new HttpDynamicTableSink(context.getPhysicalRowDataType(), asyncSink);
    }

    private void validateConfiguration(ReadableConfig config) {
        // Basic validation
        String endpoint = config.get(ENDPOINT);
        if (endpoint == null || endpoint.isEmpty()) {
            throw new ValidationException("HTTP endpoint must be specified");
        }

        try {
            new URL(endpoint);
        } catch (MalformedURLException e) {
            throw new ValidationException("Invalid HTTP endpoint URL: " + endpoint, e);
        }

        // Validate async sink options
        AsyncSinkConfigurationValidator asyncValidator = new AsyncSinkConfigurationValidator();
        asyncValidator.validate(config);
    }

    private RecordSerializer<RowData> createSerializer(String format, DataType dataType) {
        switch (format.toLowerCase()) {
            case "json":
                return new JsonRowDataSerializer(dataType);
            case "avro":
                return new AvroRowDataSerializer(dataType);
            case "csv":
                return new CsvRowDataSerializer(dataType);
            default:
                throw new ValidationException("Unsupported record format: " + format);
        }
    }

    private AsyncSinkWriterConfiguration createAsyncWriterConfiguration(ReadableConfig config) {
        AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder builder =
            AsyncSinkWriterConfiguration.builder();

        // Add async options from table configuration
        addAsyncOptionsToBuilder(toProperties(config), new AsyncSinkWriterConfigurationBuilderAdapter(builder));

        return builder.build();
    }

    private Properties toProperties(ReadableConfig config) {
        Properties properties = new Properties();
        config.toMap().forEach(properties::setProperty);
        return properties;
    }
}

// Thin subclass exposing the protected AsyncDynamicTableSink constructor
public class HttpDynamicTableSink extends AsyncDynamicTableSink {
    public HttpDynamicTableSink(DataType physicalRowDataType, AsyncSinkBase<RowData, ?> asyncSinkBase) {
        super(physicalRowDataType, asyncSinkBase);
    }
}

// Adapter to bridge the builder interfaces
public class AsyncSinkWriterConfigurationBuilderAdapter implements AsyncDynamicTableSinkBuilder<RowData, HttpRequestEntry> {
    private final AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder delegate;

    public AsyncSinkWriterConfigurationBuilderAdapter(
            AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder delegate) {
        this.delegate = delegate;
    }

    // Implement bridge methods...
    public AsyncDynamicTableSinkBuilder<RowData, HttpRequestEntry> setMaxBatchSize(int maxBatchSize) {
        delegate.setMaxBatchSize(maxBatchSize);
        return this;
    }

    // ... other bridge methods
}
```
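Like any `DynamicTableSinkFactory`, the factory is discovered at planning time through Java's `ServiceLoader`, so the connector jar must list it in a provider-configuration file. The package name below is an assumption for this example:

```
# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
com.example.connector.http.HttpTableSinkFactory
```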
### Element Converter for Table Data

```java
public class HttpElementConverter implements ElementConverter<RowData, HttpRequestEntry> {
    private final RecordSerializer<RowData> serializer;
    private final HttpClientConfig clientConfig;

    public HttpElementConverter(RecordSerializer<RowData> serializer, HttpClientConfig clientConfig) {
        this.serializer = serializer;
        this.clientConfig = clientConfig;
    }

    @Override
    public HttpRequestEntry apply(RowData element, SinkWriter.Context context) {
        try {
            // Serialize the row data
            byte[] payload = serializer.serialize(element);

            // Create HTTP request entry
            return new HttpRequestEntry(
                clientConfig.getEndpoint(),
                clientConfig.getMethod(),
                clientConfig.getHeaders(),
                payload,
                context.timestamp(),
                generateRequestId()
            );
        } catch (Exception e) {
            throw new RuntimeException("Failed to convert row data to HTTP request", e);
        }
    }

    @Override
    public void open(WriterInitContext context) {
        serializer.open(context);
    }

    private String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}

// JSON serializer for RowData
public class JsonRowDataSerializer implements RecordSerializer<RowData> {
    private final DataType dataType;
    private final ObjectMapper objectMapper;
    private final RowDataToJsonConverter converter;

    public JsonRowDataSerializer(DataType dataType) {
        this.dataType = dataType;
        this.objectMapper = new ObjectMapper();
        this.converter = new RowDataToJsonConverter(dataType.getLogicalType());
    }

    @Override
    public byte[] serialize(RowData rowData) throws IOException {
        JsonNode jsonNode = converter.convert(rowData);
        return objectMapper.writeValueAsBytes(jsonNode);
    }

    @Override
    public void open(WriterInitContext context) {
        // Configure object mapper
        objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        objectMapper.registerModule(new JavaTimeModule());
    }
}

// Converter from RowData to JSON
public class RowDataToJsonConverter {
    private final LogicalType logicalType;

    public RowDataToJsonConverter(LogicalType logicalType) {
        this.logicalType = logicalType;
    }

    public JsonNode convert(RowData rowData) {
        ObjectNode objectNode = JsonNodeFactory.instance.objectNode();

        if (logicalType instanceof RowType) {
            RowType rowType = (RowType) logicalType;
            List<RowType.RowField> fields = rowType.getFields();

            for (int i = 0; i < fields.size(); i++) {
                RowType.RowField field = fields.get(i);
                String fieldName = field.getName();
                LogicalType fieldType = field.getType();

                if (rowData.isNullAt(i)) {
                    objectNode.putNull(fieldName);
                } else {
                    JsonNode fieldValue = convertField(rowData, i, fieldType);
                    objectNode.set(fieldName, fieldValue);
                }
            }
        }

        return objectNode;
    }

    private JsonNode convertField(RowData rowData, int pos, LogicalType fieldType) {
        switch (fieldType.getTypeRoot()) {
            case BOOLEAN:
                return JsonNodeFactory.instance.booleanNode(rowData.getBoolean(pos));
            case TINYINT:
                return JsonNodeFactory.instance.numberNode(rowData.getByte(pos));
            case SMALLINT:
                return JsonNodeFactory.instance.numberNode(rowData.getShort(pos));
            case INTEGER:
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
                return JsonNodeFactory.instance.numberNode(rowData.getInt(pos));
            case BIGINT:
                return JsonNodeFactory.instance.numberNode(rowData.getLong(pos));
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                // Timestamps are stored as TimestampData, not long; emit epoch millis
                int precision = LogicalTypeChecks.getPrecision(fieldType);
                return JsonNodeFactory.instance.numberNode(
                    rowData.getTimestamp(pos, precision).getMillisecond());
            case FLOAT:
                return JsonNodeFactory.instance.numberNode(rowData.getFloat(pos));
            case DOUBLE:
                return JsonNodeFactory.instance.numberNode(rowData.getDouble(pos));
            case VARCHAR:
            case CHAR:
                return JsonNodeFactory.instance.textNode(rowData.getString(pos).toString());
            case DECIMAL:
                // Precision and scale live on DecimalType, not on LogicalType
                DecimalType decimalType = (DecimalType) fieldType;
                return JsonNodeFactory.instance.numberNode(
                    rowData.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale())
                        .toBigDecimal());
            case ARRAY:
                ArrayType arrayType = (ArrayType) fieldType;
                ArrayData arrayData = rowData.getArray(pos);
                return convertArray(arrayData, arrayType.getElementType());
            case ROW:
                RowType nestedRowType = (RowType) fieldType;
                RowData nestedRowData = rowData.getRow(pos, nestedRowType.getFieldCount());
                return new RowDataToJsonConverter(nestedRowType).convert(nestedRowData);
            default:
                throw new UnsupportedOperationException("Unsupported type: " + fieldType);
        }
    }

    private JsonNode convertArray(ArrayData arrayData, LogicalType elementType) {
        ArrayNode arrayNode = JsonNodeFactory.instance.arrayNode();

        for (int i = 0; i < arrayData.size(); i++) {
            if (arrayData.isNullAt(i)) {
                arrayNode.addNull();
            } else {
                // Convert array element (simplified - would need full implementation)
                switch (elementType.getTypeRoot()) {
                    case INTEGER:
                        arrayNode.add(arrayData.getInt(i));
                        break;
                    case VARCHAR:
                        arrayNode.add(arrayData.getString(i).toString());
                        break;
                    // ... handle other types
                    default:
                        throw new UnsupportedOperationException("Unsupported array element type: " + elementType);
                }
            }
        }

        return arrayNode;
    }
}
```
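A quick way to sanity-check the converter is to feed it a hand-built `GenericRowData`. This is a sketch; `serializer.open(...)` is skipped because this serializer only tunes the `ObjectMapper` there:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;

public class JsonSerializerSmokeTest {
    public static void main(String[] args) throws Exception {
        DataType rowType = DataTypes.ROW(
                DataTypes.FIELD("user_id", DataTypes.BIGINT()),
                DataTypes.FIELD("event_name", DataTypes.STRING()));

        JsonRowDataSerializer serializer = new JsonRowDataSerializer(rowType);

        // Internal row values use Flink's data structures (StringData, not String)
        GenericRowData row = GenericRowData.of(42L, StringData.fromString("login"));

        // Expected output: {"user_id":42,"event_name":"login"}
        System.out.println(new String(serializer.serialize(row)));
    }
}
```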
### SQL DDL Usage Examples

```sql
-- Create HTTP table sink with async configuration
CREATE TABLE http_sink (
    user_id BIGINT,
    event_name STRING,
    event_time TIMESTAMP(3),
    properties MAP<STRING, STRING>
) WITH (
    'connector' = 'http',
    'endpoint' = 'https://api.example.com/events',
    'method' = 'POST',
    'format' = 'json',
    'headers.Content-Type' = 'application/json',
    'headers.User-Agent' = 'Flink-HTTP-Sink/1.0',
    'auth.token' = 'your-auth-token',

    -- Async sink configuration
    'sink.max-batch-size' = '100',
    'sink.max-batch-size-in-bytes' = '1048576',    -- 1 MB
    'sink.max-in-flight-requests' = '10',
    'sink.max-buffered-requests' = '1000',
    'sink.max-time-in-buffer-ms' = '5000',
    'sink.max-record-size-in-bytes' = '262144',    -- 256 KB
    'sink.request-timeout-ms' = '30000',
    'sink.fail-on-timeout' = 'false'
);

-- Insert data into the HTTP sink
INSERT INTO http_sink
SELECT
    user_id,
    event_name,
    event_time,
    properties
FROM source_table;
```
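The same table can also be registered programmatically with `TableDescriptor` instead of DDL. A sketch, assuming an existing `TableEnvironment` named `tEnv`:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

// Equivalent to the CREATE TABLE statement above
tEnv.createTemporaryTable("http_sink",
        TableDescriptor.forConnector("http")
                .schema(Schema.newBuilder()
                        .column("user_id", DataTypes.BIGINT())
                        .column("event_name", DataTypes.STRING())
                        .column("event_time", DataTypes.TIMESTAMP(3))
                        .column("properties",
                                DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
                        .build())
                .option("endpoint", "https://api.example.com/events")
                .option("format", "json")
                .option("sink.max-batch-size", "100")
                .build());
```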
### Advanced Table Sink with Multiple Formats

```java
public class MultiFormatTableSinkFactory extends AsyncDynamicTableSinkFactory {

    public static final ConfigOption<String> FORMAT =
        ConfigOptions.key("format")
            .stringType()
            .defaultValue("json")
            .withDescription("Serialization format (json, avro, protobuf, csv)");

    public static final ConfigOption<String> SCHEMA_REGISTRY_URL =
        ConfigOptions.key("schema-registry.url")
            .stringType()
            .noDefaultValue()
            .withDescription("Schema registry URL for Avro/Protobuf formats");

    public static final ConfigOption<String> SUBJECT_NAME =
        ConfigOptions.key("schema-registry.subject")
            .stringType()
            .noDefaultValue()
            .withDescription("Schema registry subject name");

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        ReadableConfig config = context.getConfiguration();
        String format = config.get(FORMAT);

        // Create format-specific serializer
        RecordSerializer<RowData> serializer = createFormatSerializer(format, context, config);

        // Create sink with serializer (connector-specific helper,
        // construction as in the HTTP example above)
        return createTableSink(context, serializer, config);
    }

    private RecordSerializer<RowData> createFormatSerializer(
            String format,
            Context context,
            ReadableConfig config) {

        DataType dataType = context.getPhysicalRowDataType();

        switch (format.toLowerCase()) {
            case "json":
                return new JsonRowDataSerializer(dataType);

            case "avro":
                String schemaRegistryUrl = config.get(SCHEMA_REGISTRY_URL);
                String subjectName = config.get(SUBJECT_NAME);
                return new AvroRowDataSerializer(dataType, schemaRegistryUrl, subjectName);

            case "protobuf":
                return new ProtobufRowDataSerializer(dataType, config);

            case "csv":
                return new CsvRowDataSerializer(dataType, config);

            default:
                throw new ValidationException("Unsupported format: " + format);
        }
    }
}

// Avro serializer with schema registry
public class AvroRowDataSerializer implements RecordSerializer<RowData> {
    private final DataType dataType;
    private final String schemaRegistryUrl;
    private final String subjectName;
    private Schema avroSchema;
    private CachedSchemaRegistryClient schemaRegistryClient;
    private KafkaAvroSerializer avroSerializer;

    public AvroRowDataSerializer(DataType dataType, String schemaRegistryUrl, String subjectName) {
        this.dataType = dataType;
        this.schemaRegistryUrl = schemaRegistryUrl;
        this.subjectName = subjectName;
    }

    @Override
    public void open(WriterInitContext context) {
        try {
            // Initialize schema registry client
            this.schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);

            // Fetch the latest schema for the subject and parse it
            // (getLatestSchemaMetadata returns the schema as a String)
            String schemaString =
                schemaRegistryClient.getLatestSchemaMetadata(subjectName).getSchema();
            this.avroSchema = new Schema.Parser().parse(schemaString);

            // Initialize Avro serializer
            Map<String, Object> props = new HashMap<>();
            props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
            this.avroSerializer = new KafkaAvroSerializer(schemaRegistryClient, props);

        } catch (Exception e) {
            throw new RuntimeException("Failed to initialize Avro serializer", e);
        }
    }

    @Override
    public byte[] serialize(RowData rowData) throws IOException {
        try {
            // Convert RowData to Avro GenericRecord
            GenericRecord genericRecord = convertToGenericRecord(rowData);

            // Serialize with schema registry (the first argument is used
            // as the topic name for subject naming)
            return avroSerializer.serialize(subjectName, genericRecord);

        } catch (Exception e) {
            throw new IOException("Failed to serialize RowData to Avro", e);
        }
    }

    private GenericRecord convertToGenericRecord(RowData rowData) {
        GenericRecord record = new GenericData.Record(avroSchema);

        // Convert fields based on schema; assumes the Avro field order
        // matches the physical row's field order
        List<Schema.Field> fields = avroSchema.getFields();
        for (int i = 0; i < fields.size(); i++) {
            Schema.Field field = fields.get(i);

            if (!rowData.isNullAt(i)) {
                Object value = convertFieldValue(rowData, i, field.schema());
                record.put(field.name(), value);
            }
        }

        return record;
    }

    private Object convertFieldValue(RowData rowData, int pos, Schema fieldSchema) {
        // Implementation depends on field type mapping
        switch (fieldSchema.getType()) {
            case BOOLEAN:
                return rowData.getBoolean(pos);
            case INT:
                return rowData.getInt(pos);
            case LONG:
                return rowData.getLong(pos);
            case STRING:
                return rowData.getString(pos).toString();
            // ... handle other Avro types
            default:
                throw new UnsupportedOperationException("Unsupported Avro type: " + fieldSchema.getType());
        }
    }
}
```
### Configuration Validation

```java
public class ComprehensiveAsyncSinkConfigurationValidator implements ConfigurationValidator {

    private static final Logger LOG =
        LoggerFactory.getLogger(ComprehensiveAsyncSinkConfigurationValidator.class);

    // Async sink configuration options
    public static final ConfigOption<Integer> MAX_BATCH_SIZE =
        ConfigOptions.key("sink.max-batch-size")
            .intType()
            .defaultValue(100)
            .withDescription("Maximum number of records per batch");

    public static final ConfigOption<Long> MAX_BATCH_SIZE_IN_BYTES =
        ConfigOptions.key("sink.max-batch-size-in-bytes")
            .longType()
            .defaultValue(1024 * 1024L) // 1MB
            .withDescription("Maximum batch size in bytes");

    public static final ConfigOption<Integer> MAX_IN_FLIGHT_REQUESTS =
        ConfigOptions.key("sink.max-in-flight-requests")
            .intType()
            .defaultValue(10)
            .withDescription("Maximum number of concurrent requests");

    public static final ConfigOption<Integer> MAX_BUFFERED_REQUESTS =
        ConfigOptions.key("sink.max-buffered-requests")
            .intType()
            .defaultValue(1000)
            .withDescription("Maximum number of buffered requests");

    public static final ConfigOption<Long> MAX_TIME_IN_BUFFER_MS =
        ConfigOptions.key("sink.max-time-in-buffer-ms")
            .longType()
            .defaultValue(5000L)
            .withDescription("Maximum time records stay in buffer (milliseconds)");

    public static final ConfigOption<Long> MAX_RECORD_SIZE_IN_BYTES =
        ConfigOptions.key("sink.max-record-size-in-bytes")
            .longType()
            .defaultValue(256 * 1024L) // 256KB
            .withDescription("Maximum size of individual records in bytes");

    public static final ConfigOption<Long> REQUEST_TIMEOUT_MS =
        ConfigOptions.key("sink.request-timeout-ms")
            .longType()
            .defaultValue(30000L)
            .withDescription("Request timeout in milliseconds");

    public static final ConfigOption<Boolean> FAIL_ON_TIMEOUT =
        ConfigOptions.key("sink.fail-on-timeout")
            .booleanType()
            .defaultValue(false)
            .withDescription("Whether to fail job on request timeout");

    @Override
    public void validate(ReadableConfig configuration) throws ValidationException {
        // Validate batch size constraints
        int maxBatchSize = configuration.get(MAX_BATCH_SIZE);
        int maxBufferedRequests = configuration.get(MAX_BUFFERED_REQUESTS);

        if (maxBatchSize <= 0) {
            throw new ValidationException("max-batch-size must be positive, got: " + maxBatchSize);
        }

        if (maxBufferedRequests <= maxBatchSize) {
            throw new ValidationException(
                "max-buffered-requests (" + maxBufferedRequests +
                ") must be greater than max-batch-size (" + maxBatchSize + ")");
        }

        // Validate size constraints
        long maxBatchSizeInBytes = configuration.get(MAX_BATCH_SIZE_IN_BYTES);
        long maxRecordSizeInBytes = configuration.get(MAX_RECORD_SIZE_IN_BYTES);

        if (maxBatchSizeInBytes < maxRecordSizeInBytes) {
            throw new ValidationException(
                "max-batch-size-in-bytes (" + maxBatchSizeInBytes +
                ") must be >= max-record-size-in-bytes (" + maxRecordSizeInBytes + ")");
        }

        // Validate timeout settings
        long requestTimeout = configuration.get(REQUEST_TIMEOUT_MS);
        long maxTimeInBuffer = configuration.get(MAX_TIME_IN_BUFFER_MS);

        if (requestTimeout <= 0) {
            throw new ValidationException("request-timeout-ms must be positive, got: " + requestTimeout);
        }

        if (maxTimeInBuffer <= 0) {
            throw new ValidationException("max-time-in-buffer-ms must be positive, got: " + maxTimeInBuffer);
        }

        // Warn if timeout is too short
        if (requestTimeout < maxTimeInBuffer) {
            LOG.warn("request-timeout-ms ({}) is shorter than max-time-in-buffer-ms ({}), " +
                "this may cause premature timeouts", requestTimeout, maxTimeInBuffer);
        }

        // Validate in-flight request limits
        int maxInFlightRequests = configuration.get(MAX_IN_FLIGHT_REQUESTS);
        if (maxInFlightRequests <= 0) {
            throw new ValidationException("max-in-flight-requests must be positive, got: " + maxInFlightRequests);
        }
    }
}
```
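In a factory, the validator typically runs at the top of `createDynamicTableSink`, so misconfiguration fails at planning time rather than at runtime. A sketch, where `buildSink` stands in for the connector-specific construction shown earlier:

```java
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
    ReadableConfig config = context.getConfiguration();

    // Fail fast during planning if the async sink options are inconsistent
    new ComprehensiveAsyncSinkConfigurationValidator().validate(config);

    return buildSink(context, config); // connector-specific construction
}
```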
## Best Practices

### Performance Optimization for Table Sinks

```java
public class OptimizedTableSinkFactory extends AsyncDynamicTableSinkFactory {

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        ReadableConfig config = context.getConfiguration();

        // Optimize configuration based on table characteristics
        AsyncSinkWriterConfiguration optimizedConfig = optimizeConfiguration(
            config,
            context.getPhysicalRowDataType(),
            context.isStreamingMode()
        );

        // createOptimizedSink is a connector-specific helper (omitted)
        return createOptimizedSink(context, optimizedConfig);
    }

    private AsyncSinkWriterConfiguration optimizeConfiguration(
            ReadableConfig config,
            DataType rowDataType,
            boolean isStreaming) {

        AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder builder =
            AsyncSinkWriterConfiguration.builder();

        // Calculate optimal batch size based on row size
        // (calculateOptimalBatchSize: see the sketch after this example)
        int estimatedRowSize = estimateRowSize(rowDataType);
        int optimalBatchSize = calculateOptimalBatchSize(estimatedRowSize, isStreaming);

        builder.setMaxBatchSize(optimalBatchSize);

        // Adjust buffer sizes for streaming vs batch mode
        if (isStreaming) {
            // Smaller buffers for lower latency
            builder.setMaxTimeInBufferMS(1000)
                .setMaxBufferedRequests(optimalBatchSize * 5);
        } else {
            // Larger buffers for higher throughput
            builder.setMaxTimeInBufferMS(10000)
                .setMaxBufferedRequests(optimalBatchSize * 20);
        }

        // Set other optimized values...
        return builder.build();
    }

    private int estimateRowSize(DataType dataType) {
        // Estimate based on the logical row structure
        // (DataType itself is not a LogicalType, so unwrap it first)
        LogicalType logicalType = dataType.getLogicalType();
        if (logicalType instanceof RowType) {
            RowType rowType = (RowType) logicalType;
            return rowType.getFields().stream()
                .mapToInt(this::estimateFieldSize)
                .sum();
        }
        return 100; // Default estimate
    }

    private int estimateFieldSize(RowType.RowField field) {
        LogicalType type = field.getType();
        switch (type.getTypeRoot()) {
            case BOOLEAN:
            case TINYINT:
                return 1;
            case SMALLINT:
                return 2;
            case INTEGER:
            case FLOAT:
            case DATE:
                return 4;
            case BIGINT:
            case DOUBLE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return 8;
            case VARCHAR:
            case CHAR:
                // CHAR is backed by CharType, not VarCharType; an unbounded
                // STRING declares Integer.MAX_VALUE, so cap the estimate
                int declaredLength = type instanceof VarCharType
                    ? ((VarCharType) type).getLength()
                    : ((CharType) type).getLength();
                return Math.min(declaredLength, 64);
            default:
                return 50; // Conservative estimate
        }
    }
}
```
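The `calculateOptimalBatchSize` helper is referenced above but not shown. One plausible sketch targets a fixed per-batch byte budget; the 1 MiB budget and the clamping bounds are illustrative assumptions, not tuned values:

```java
// Sketch: pick a batch size that keeps batches near a byte budget,
// clamped to sane bounds. All constants here are assumptions.
private int calculateOptimalBatchSize(int estimatedRowSize, boolean isStreaming) {
    long batchByteBudget = 1024 * 1024; // target ~1 MiB per batch
    int size = (int) Math.max(1, batchByteBudget / Math.max(1, estimatedRowSize));

    // Streaming favors smaller batches for latency; batch mode favors throughput
    int upperBound = isStreaming ? 500 : 5000;
    return Math.min(size, upperBound);
}
```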
### Error Handling and Monitoring

```java
public class MonitoredTableSink extends AsyncDynamicTableSink {
    private final MetricGroup metricGroup;
    private final Counter recordsSent;
    private final Counter recordsFailed;
    private final Histogram serializationTime;

    public MonitoredTableSink(
            DataType physicalRowDataType,
            AsyncSinkBase<RowData, ?> asyncSinkBase,
            MetricGroup metricGroup) {
        super(physicalRowDataType, asyncSinkBase);
        this.metricGroup = metricGroup;

        this.recordsSent = metricGroup.counter("records_sent");
        this.recordsFailed = metricGroup.counter("records_failed");
        // MetricGroup#histogram requires a Histogram instance;
        // DescriptiveStatisticsHistogram is one available implementation
        this.serializationTime = metricGroup.histogram(
            "serialization_time_ms", new DescriptiveStatisticsHistogram(1000));
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return SinkV2Provider.of(new MonitoredAsyncSinkWrapper(
            asyncSinkBase,
            recordsSent,
            recordsFailed,
            serializationTime
        ));
    }
}

public class MonitoredAsyncSinkWrapper<T> implements Sink<T> {
    private final Sink<T> delegate;
    private final Counter recordsSent;
    private final Counter recordsFailed;
    private final Histogram serializationTime;

    // Implementation that wraps calls with metrics...
}
```
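The wrapper's body is elided above. A minimal sketch of the missing `createWriter` delegation, counting successful and failed writes (the Sink V2 generics and `WriterInitContext` follow the interfaces used elsewhere on this page; timing the histogram is left out for brevity):

```java
// One possible completion of MonitoredAsyncSinkWrapper (sketch)
@Override
public SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
    SinkWriter<T> writer = delegate.createWriter(context);
    return new SinkWriter<T>() {
        @Override
        public void write(T element, Context ctx) throws IOException, InterruptedException {
            try {
                writer.write(element, ctx);
                recordsSent.inc();
            } catch (IOException | RuntimeException e) {
                recordsFailed.inc();
                throw e; // surface the failure after recording it
            }
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException {
            writer.flush(endOfInput);
        }

        @Override
        public void close() throws Exception {
            writer.close();
        }
    };
}
```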
The Table API integration thus provides a complete bridge between Flink's table ecosystem and the async sink framework, enabling high-performance table sinks with comprehensive configuration options and monitoring capabilities.