or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-sink.md, hybrid-source.md, index.md, rate-limiting.md, source-reader.md, table-api.md

docs/table-api.md

0

# Table API Integration

1

2

The Table API Integration components provide seamless integration between the async sink framework and Flink's Table API and SQL. This enables building table sinks with all the advanced features of async sinks while maintaining compatibility with Flink's unified batch and streaming API.

3

4

## Core Components

5

6

### AsyncDynamicTableSinkFactory

7

8

Abstract base factory for creating table sinks with async sink capabilities.

9

10

```java { .api }

11

@PublicEvolving

12

public abstract class AsyncDynamicTableSinkFactory implements DynamicTableSinkFactory {

13

14

// Required method implementations

15

public Set<ConfigOption<?>> requiredOptions()

16

public Set<ConfigOption<?>> optionalOptions()

17

18

// Protected helper methods

19

protected AsyncDynamicTableSinkBuilder<?, ?> addAsyncOptionsToBuilder(

20

Properties properties,

21

AsyncDynamicTableSinkBuilder<?, ?> builder)

22

23

// Inner class for context

24

public static class AsyncDynamicSinkContext {

25

public DataType getPhysicalRowDataType()

26

public ReadableConfig getConfiguration()

27

public ClassLoader getClassLoader()

28

public boolean isStreamingMode()

29

}

30

}

31

```

32

33

### AsyncDynamicTableSink

34

35

Table sink implementation that bridges table operations to async sink writers.

36

37

```java { .api }

38

@PublicEvolving

39

public class AsyncDynamicTableSink implements DynamicTableSink {

40

41

// Constructor

42

protected AsyncDynamicTableSink(

43

DataType physicalRowDataType,

44

AsyncSinkBase<RowData, ?> asyncSinkBase)

45

46

// DynamicTableSink implementation

47

public ChangelogMode getChangelogMode(ChangelogMode requestedMode)

48

public SinkRuntimeProvider getSinkRuntimeProvider(Context context)

49

public DynamicTableSink copy()

50

public String asSummaryString()

51

}

52

```

53

54

### ConfigurationValidator

55

56

Interface for validating table sink configurations.

57

58

```java { .api }

59

@PublicEvolving

60

public interface ConfigurationValidator {

61

void validate(ReadableConfig configuration) throws ValidationException

62

}

63

```

64

65

### AsyncSinkConfigurationValidator

66

67

Built-in validator for async sink configuration options.

68

69

```java { .api }

70

@PublicEvolving

71

public class AsyncSinkConfigurationValidator implements ConfigurationValidator {

72

73

public AsyncSinkConfigurationValidator()

74

75

public void validate(ReadableConfig configuration) throws ValidationException

76

}

77

```

78

79

## Implementation Examples

80

81

### Complete Table Sink Factory

82

83

```java

84

public class HttpTableSinkFactory extends AsyncDynamicTableSinkFactory {

85

86

// Configuration options

87

public static final ConfigOption<String> ENDPOINT =

88

ConfigOptions.key("endpoint")

89

.stringType()

90

.noDefaultValue()

91

.withDescription("HTTP endpoint URL for sending data");

92

93

public static final ConfigOption<String> METHOD =

94

ConfigOptions.key("method")

95

.stringType()

96

.defaultValue("POST")

97

.withDescription("HTTP method to use");

98

99

public static final ConfigOption<Map<String, String>> HEADERS =

100

ConfigOptions.key("headers")

101

.mapType()

102

.defaultValue(Collections.emptyMap())

103

.withDescription("HTTP headers to include in requests");

104

105

public static final ConfigOption<String> AUTH_TOKEN =

106

ConfigOptions.key("auth.token")

107

.stringType()

108

.noDefaultValue()

109

.withDescription("Authentication token for HTTP requests");

110

111

public static final ConfigOption<String> RECORD_FORMAT =

112

ConfigOptions.key("format")

113

.stringType()

114

.defaultValue("json")

115

.withDescription("Format for serializing records (json, avro, csv)");

116

117

@Override

118

public String factoryIdentifier() {

119

return "http";

120

}

121

122

@Override

123

public Set<ConfigOption<?>> requiredOptions() {

124

return Collections.singleton(ENDPOINT);

125

}

126

127

@Override

128

public Set<ConfigOption<?>> optionalOptions() {

129

Set<ConfigOption<?>> options = new HashSet<>(super.optionalOptions());

130

options.addAll(Arrays.asList(

131

METHOD,

132

HEADERS,

133

AUTH_TOKEN,

134

RECORD_FORMAT

135

));

136

return options;

137

}

138

139

@Override

140

public DynamicTableSink createDynamicTableSink(Context context) {

141

// Validate configuration

142

ReadableConfig config = context.getConfiguration();

143

validateConfiguration(config);

144

145

// Extract configuration values

146

String endpoint = config.get(ENDPOINT);

147

String method = config.get(METHOD);

148

Map<String, String> headers = config.get(HEADERS);

149

Optional<String> authToken = config.getOptional(AUTH_TOKEN);

150

String recordFormat = config.get(RECORD_FORMAT);

151

152

// Create record serializer based on format

153

RecordSerializer<RowData> serializer = createSerializer(

154

recordFormat,

155

context.getPhysicalRowDataType()

156

);

157

158

// Create HTTP client configuration

159

HttpClientConfig clientConfig = HttpClientConfig.builder()

160

.setEndpoint(endpoint)

161

.setMethod(method)

162

.setHeaders(headers)

163

.setAuthToken(authToken.orElse(null))

164

.build();

165

166

// Create element converter

167

HttpElementConverter elementConverter = new HttpElementConverter(serializer, clientConfig);

168

169

// Create async sink base with configuration from table properties

170

AsyncSinkWriterConfiguration writerConfig = createAsyncWriterConfiguration(config);

171

172

HttpAsyncSinkBase asyncSink = new HttpAsyncSinkBase(

173

elementConverter,

174

writerConfig,

175

clientConfig

176

);

177

178

// Return table sink

179

return new AsyncDynamicTableSink(context.getPhysicalRowDataType(), asyncSink);

180

}

181

182

private void validateConfiguration(ReadableConfig config) {

183

// Basic validation

184

String endpoint = config.get(ENDPOINT);

185

if (endpoint == null || endpoint.isEmpty()) {

186

throw new ValidationException("HTTP endpoint must be specified");

187

}

188

189

try {

190

new URL(endpoint);

191

} catch (MalformedURLException e) {

192

throw new ValidationException("Invalid HTTP endpoint URL: " + endpoint, e);

193

}

194

195

// Validate async sink options

196

AsyncSinkConfigurationValidator asyncValidator = new AsyncSinkConfigurationValidator();

197

asyncValidator.validate(config);

198

}

199

200

private RecordSerializer<RowData> createSerializer(String format, DataType dataType) {

201

switch (format.toLowerCase()) {

202

case "json":

203

return new JsonRowDataSerializer(dataType);

204

case "avro":

205

return new AvroRowDataSerializer(dataType);

206

case "csv":

207

return new CsvRowDataSerializer(dataType);

208

default:

209

throw new ValidationException("Unsupported record format: " + format);

210

}

211

}

212

213

private AsyncSinkWriterConfiguration createAsyncWriterConfiguration(ReadableConfig config) {

214

AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder builder =

215

AsyncSinkWriterConfiguration.builder();

216

217

// Add async options from table configuration

218

addAsyncOptionsToBuilder(toProperties(config), new AsyncSinkWriterConfigurationBuilderAdapter(builder));

219

220

return builder.build();

221

}

222

223

private Properties toProperties(ReadableConfig config) {

224

Properties properties = new Properties();

225

config.toMap().forEach(properties::setProperty);

226

return properties;

227

}

228

}

229

230

// Adapter to bridge the builder interfaces

231

public class AsyncSinkWriterConfigurationBuilderAdapter implements AsyncDynamicTableSinkBuilder<RowData, HttpRequestEntry> {

232

private final AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder delegate;

233

234

public AsyncSinkWriterConfigurationBuilderAdapter(

235

AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder delegate) {

236

this.delegate = delegate;

237

}

238

239

// Implement bridge methods...

240

public AsyncDynamicTableSinkBuilder<RowData, HttpRequestEntry> setMaxBatchSize(int maxBatchSize) {

241

delegate.setMaxBatchSize(maxBatchSize);

242

return this;

243

}

244

245

// ... other bridge methods

246

}

247

```

248

249

### Element Converter for Table Data

250

251

```java

252

public class HttpElementConverter implements ElementConverter<RowData, HttpRequestEntry> {

253

private final RecordSerializer<RowData> serializer;

254

private final HttpClientConfig clientConfig;

255

256

public HttpElementConverter(RecordSerializer<RowData> serializer, HttpClientConfig clientConfig) {

257

this.serializer = serializer;

258

this.clientConfig = clientConfig;

259

}

260

261

@Override

262

public HttpRequestEntry apply(RowData element, SinkWriter.Context context) {

263

try {

264

// Serialize the row data

265

byte[] payload = serializer.serialize(element);

266

267

// Create HTTP request entry

268

return new HttpRequestEntry(

269

clientConfig.getEndpoint(),

270

clientConfig.getMethod(),

271

clientConfig.getHeaders(),

272

payload,

273

context.timestamp(),

274

generateRequestId()

275

);

276

} catch (Exception e) {

277

throw new RuntimeException("Failed to convert row data to HTTP request", e);

278

}

279

}

280

281

@Override

282

public void open(WriterInitContext context) {

283

serializer.open(context);

284

}

285

286

private String generateRequestId() {

287

return UUID.randomUUID().toString();

288

}

289

}

290

291

// JSON serializer for RowData

292

public class JsonRowDataSerializer implements RecordSerializer<RowData> {

293

private final DataType dataType;

294

private final ObjectMapper objectMapper;

295

private final RowDataToJsonConverter converter;

296

297

public JsonRowDataSerializer(DataType dataType) {

298

this.dataType = dataType;

299

this.objectMapper = new ObjectMapper();

300

this.converter = new RowDataToJsonConverter(dataType.getLogicalType());

301

}

302

303

@Override

304

public byte[] serialize(RowData rowData) throws IOException {

305

JsonNode jsonNode = converter.convert(rowData);

306

return objectMapper.writeValueAsBytes(jsonNode);

307

}

308

309

@Override

310

public void open(WriterInitContext context) {

311

// Configure object mapper

312

objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

313

objectMapper.registerModule(new JavaTimeModule());

314

}

315

}

316

317

// Converter from RowData to JSON

318

public class RowDataToJsonConverter {

319

private final LogicalType logicalType;

320

321

public RowDataToJsonConverter(LogicalType logicalType) {

322

this.logicalType = logicalType;

323

}

324

325

public JsonNode convert(RowData rowData) {

326

ObjectNode objectNode = JsonNodeFactory.instance.objectNode();

327

328

if (logicalType instanceof RowType) {

329

RowType rowType = (RowType) logicalType;

330

List<RowType.RowField> fields = rowType.getFields();

331

332

for (int i = 0; i < fields.size(); i++) {

333

RowType.RowField field = fields.get(i);

334

String fieldName = field.getName();

335

LogicalType fieldType = field.getType();

336

337

if (rowData.isNullAt(i)) {

338

objectNode.putNull(fieldName);

339

} else {

340

JsonNode fieldValue = convertField(rowData, i, fieldType);

341

objectNode.set(fieldName, fieldValue);

342

}

343

}

344

}

345

346

return objectNode;

347

}

348

349

private JsonNode convertField(RowData rowData, int pos, LogicalType fieldType) {

350

switch (fieldType.getTypeRoot()) {

351

case BOOLEAN:

352

return JsonNodeFactory.instance.booleanNode(rowData.getBoolean(pos));

353

case TINYINT:

354

return JsonNodeFactory.instance.numberNode(rowData.getByte(pos));

355

case SMALLINT:

356

return JsonNodeFactory.instance.numberNode(rowData.getShort(pos));

357

case INTEGER:

358

case DATE:

359

case TIME_WITHOUT_TIME_ZONE:

360

return JsonNodeFactory.instance.numberNode(rowData.getInt(pos));

361

case BIGINT:

362

case TIMESTAMP_WITHOUT_TIME_ZONE:

363

case TIMESTAMP_WITH_LOCAL_TIME_ZONE:

364

return JsonNodeFactory.instance.numberNode(rowData.getLong(pos));

365

case FLOAT:

366

return JsonNodeFactory.instance.numberNode(rowData.getFloat(pos));

367

case DOUBLE:

368

return JsonNodeFactory.instance.numberNode(rowData.getDouble(pos));

369

case VARCHAR:

370

case CHAR:

371

return JsonNodeFactory.instance.textNode(rowData.getString(pos).toString());

372

case DECIMAL:

373

return JsonNodeFactory.instance.numberNode(

374

rowData.getDecimal(pos, fieldType.getPrecision(), fieldType.getScale()).toBigDecimal());

375

case ARRAY:

376

ArrayType arrayType = (ArrayType) fieldType;

377

ArrayData arrayData = rowData.getArray(pos);

378

return convertArray(arrayData, arrayType.getElementType());

379

case ROW:

380

RowType nestedRowType = (RowType) fieldType;

381

RowData nestedRowData = rowData.getRow(pos, nestedRowType.getFieldCount());

382

return new RowDataToJsonConverter(nestedRowType).convert(nestedRowData);

383

default:

384

throw new UnsupportedOperationException("Unsupported type: " + fieldType);

385

}

386

}

387

388

private JsonNode convertArray(ArrayData arrayData, LogicalType elementType) {

389

ArrayNode arrayNode = JsonNodeFactory.instance.arrayNode();

390

391

for (int i = 0; i < arrayData.size(); i++) {

392

if (arrayData.isNullAt(i)) {

393

arrayNode.addNull();

394

} else {

395

// Convert array element (simplified - would need full implementation)

396

switch (elementType.getTypeRoot()) {

397

case INTEGER:

398

arrayNode.add(arrayData.getInt(i));

399

break;

400

case VARCHAR:

401

arrayNode.add(arrayData.getString(i).toString());

402

break;

403

// ... handle other types

404

default:

405

throw new UnsupportedOperationException("Unsupported array element type: " + elementType);

406

}

407

}

408

}

409

410

return arrayNode;

411

}

412

}

413

```

414

415

### SQL DDL Usage Examples

416

417

```sql

418

-- Create HTTP table sink with async configuration

419

CREATE TABLE http_sink (

420

user_id BIGINT,

421

event_name STRING,

422

event_time TIMESTAMP(3),

423

properties MAP<STRING, STRING>

424

) WITH (

425

'connector' = 'http',

426

'endpoint' = 'https://api.example.com/events',

427

'method' = 'POST',

428

'format' = 'json',

429

'headers.Content-Type' = 'application/json',

430

'headers.User-Agent' = 'Flink-HTTP-Sink/1.0',

431

'auth.token' = 'your-auth-token',

432

433

-- Async sink configuration

434

'sink.max-batch-size' = '100',

435

'sink.max-batch-size-in-bytes' = '1048576', -- 1MB

436

'sink.max-in-flight-requests' = '10',

437

'sink.max-buffered-requests' = '1000',

438

'sink.max-time-in-buffer-ms' = '5000',

439

'sink.max-record-size-in-bytes' = '262144', -- 256KB

440

'sink.request-timeout-ms' = '30000',

441

'sink.fail-on-timeout' = 'false'

442

);

443

444

-- Insert data into the HTTP sink

445

INSERT INTO http_sink

446

SELECT

447

user_id,

448

event_name,

449

event_time,

450

properties

451

FROM source_table;

452

```

453

454

### Advanced Table Sink with Multiple Formats

455

456

```java

457

public class MultiFormatTableSinkFactory extends AsyncDynamicTableSinkFactory {

458

459

public static final ConfigOption<String> FORMAT =

460

ConfigOptions.key("format")

461

.stringType()

462

.defaultValue("json")

463

.withDescription("Serialization format (json, avro, protobuf, csv)");

464

465

public static final ConfigOption<String> SCHEMA_REGISTRY_URL =

466

ConfigOptions.key("schema-registry.url")

467

.stringType()

468

.noDefaultValue()

469

.withDescription("Schema registry URL for Avro/Protobuf formats");

470

471

public static final ConfigOption<String> SUBJECT_NAME =

472

ConfigOptions.key("schema-registry.subject")

473

.stringType()

474

.noDefaultValue()

475

.withDescription("Schema registry subject name");

476

477

@Override

478

public DynamicTableSink createDynamicTableSink(Context context) {

479

ReadableConfig config = context.getConfiguration();

480

String format = config.get(FORMAT);

481

482

// Create format-specific serializer

483

RecordSerializer<RowData> serializer = createFormatSerializer(format, context, config);

484

485

// Create sink with serializer

486

return createTableSink(context, serializer, config);

487

}

488

489

private RecordSerializer<RowData> createFormatSerializer(

490

String format,

491

Context context,

492

ReadableConfig config) {

493

494

DataType dataType = context.getPhysicalRowDataType();

495

496

switch (format.toLowerCase()) {

497

case "json":

498

return new JsonRowDataSerializer(dataType);

499

500

case "avro":

501

String schemaRegistryUrl = config.get(SCHEMA_REGISTRY_URL);

502

String subjectName = config.get(SUBJECT_NAME);

503

return new AvroRowDataSerializer(dataType, schemaRegistryUrl, subjectName);

504

505

case "protobuf":

506

return new ProtobufRowDataSerializer(dataType, config);

507

508

case "csv":

509

return new CsvRowDataSerializer(dataType, config);

510

511

default:

512

throw new ValidationException("Unsupported format: " + format);

513

}

514

}

515

}

516

517

// Avro serializer with schema registry

518

public class AvroRowDataSerializer implements RecordSerializer<RowData> {

519

private final DataType dataType;

520

private final String schemaRegistryUrl;

521

private final String subjectName;

522

private Schema avroSchema;

523

private CachedSchemaRegistryClient schemaRegistryClient;

524

private KafkaAvroSerializer avroSerializer;

525

526

public AvroRowDataSerializer(DataType dataType, String schemaRegistryUrl, String subjectName) {

527

this.dataType = dataType;

528

this.schemaRegistryUrl = schemaRegistryUrl;

529

this.subjectName = subjectName;

530

}

531

532

@Override

533

public void open(WriterInitContext context) {

534

try {

535

// Initialize schema registry client

536

this.schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);

537

538

// Get Avro schema from registry

539

this.avroSchema = schemaRegistryClient.getLatestSchemaMetadata(subjectName).getSchema();

540

541

// Initialize Avro serializer

542

Map<String, Object> props = new HashMap<>();

543

props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

544

this.avroSerializer = new KafkaAvroSerializer(schemaRegistryClient, props);

545

546

} catch (Exception e) {

547

throw new RuntimeException("Failed to initialize Avro serializer", e);

548

}

549

}

550

551

@Override

552

public byte[] serialize(RowData rowData) throws IOException {

553

try {

554

// Convert RowData to Avro GenericRecord

555

GenericRecord genericRecord = convertToGenericRecord(rowData);

556

557

// Serialize with schema registry

558

return avroSerializer.serialize(subjectName, genericRecord);

559

560

} catch (Exception e) {

561

throw new IOException("Failed to serialize RowData to Avro", e);

562

}

563

}

564

565

private GenericRecord convertToGenericRecord(RowData rowData) {

566

GenericRecord record = new GenericData.Record(avroSchema);

567

568

// Convert fields based on schema

569

List<Schema.Field> fields = avroSchema.getFields();

570

for (int i = 0; i < fields.size(); i++) {

571

Schema.Field field = fields.get(i);

572

573

if (!rowData.isNullAt(i)) {

574

Object value = convertFieldValue(rowData, i, field.schema());

575

record.put(field.name(), value);

576

}

577

}

578

579

return record;

580

}

581

582

private Object convertFieldValue(RowData rowData, int pos, Schema fieldSchema) {

583

// Implementation depends on field type mapping

584

switch (fieldSchema.getType()) {

585

case BOOLEAN:

586

return rowData.getBoolean(pos);

587

case INT:

588

return rowData.getInt(pos);

589

case LONG:

590

return rowData.getLong(pos);

591

case STRING:

592

return rowData.getString(pos).toString();

593

// ... handle other Avro types

594

default:

595

throw new UnsupportedOperationException("Unsupported Avro type: " + fieldSchema.getType());

596

}

597

}

598

}

599

```

600

601

### Configuration Validation

602

603

```java

604

public class ComprehensiveAsyncSinkConfigurationValidator implements ConfigurationValidator {

605

606

// Async sink configuration options

607

public static final ConfigOption<Integer> MAX_BATCH_SIZE =

608

ConfigOptions.key("sink.max-batch-size")

609

.intType()

610

.defaultValue(100)

611

.withDescription("Maximum number of records per batch");

612

613

public static final ConfigOption<Long> MAX_BATCH_SIZE_IN_BYTES =

614

ConfigOptions.key("sink.max-batch-size-in-bytes")

615

.longType()

616

.defaultValue(1024 * 1024L) // 1MB

617

.withDescription("Maximum batch size in bytes");

618

619

public static final ConfigOption<Integer> MAX_IN_FLIGHT_REQUESTS =

620

ConfigOptions.key("sink.max-in-flight-requests")

621

.intType()

622

.defaultValue(10)

623

.withDescription("Maximum number of concurrent requests");

624

625

public static final ConfigOption<Integer> MAX_BUFFERED_REQUESTS =

626

ConfigOptions.key("sink.max-buffered-requests")

627

.intType()

628

.defaultValue(1000)

629

.withDescription("Maximum number of buffered requests");

630

631

public static final ConfigOption<Long> MAX_TIME_IN_BUFFER_MS =

632

ConfigOptions.key("sink.max-time-in-buffer-ms")

633

.longType()

634

.defaultValue(5000L)

635

.withDescription("Maximum time records stay in buffer (milliseconds)");

636

637

public static final ConfigOption<Long> MAX_RECORD_SIZE_IN_BYTES =

638

ConfigOptions.key("sink.max-record-size-in-bytes")

639

.longType()

640

.defaultValue(256 * 1024L) // 256KB

641

.withDescription("Maximum size of individual records in bytes");

642

643

public static final ConfigOption<Long> REQUEST_TIMEOUT_MS =

644

ConfigOptions.key("sink.request-timeout-ms")

645

.longType()

646

.defaultValue(30000L)

647

.withDescription("Request timeout in milliseconds");

648

649

public static final ConfigOption<Boolean> FAIL_ON_TIMEOUT =

650

ConfigOptions.key("sink.fail-on-timeout")

651

.booleanType()

652

.defaultValue(false)

653

.withDescription("Whether to fail job on request timeout");

654

655

@Override

656

public void validate(ReadableConfig configuration) throws ValidationException {

657

// Validate batch size constraints

658

int maxBatchSize = configuration.get(MAX_BATCH_SIZE);

659

int maxBufferedRequests = configuration.get(MAX_BUFFERED_REQUESTS);

660

661

if (maxBatchSize <= 0) {

662

throw new ValidationException("max-batch-size must be positive, got: " + maxBatchSize);

663

}

664

665

if (maxBufferedRequests <= maxBatchSize) {

666

throw new ValidationException(

667

"max-buffered-requests (" + maxBufferedRequests +

668

") must be greater than max-batch-size (" + maxBatchSize + ")");

669

}

670

671

// Validate size constraints

672

long maxBatchSizeInBytes = configuration.get(MAX_BATCH_SIZE_IN_BYTES);

673

long maxRecordSizeInBytes = configuration.get(MAX_RECORD_SIZE_IN_BYTES);

674

675

if (maxBatchSizeInBytes < maxRecordSizeInBytes) {

676

throw new ValidationException(

677

"max-batch-size-in-bytes (" + maxBatchSizeInBytes +

678

") must be >= max-record-size-in-bytes (" + maxRecordSizeInBytes + ")");

679

}

680

681

// Validate timeout settings

682

long requestTimeout = configuration.get(REQUEST_TIMEOUT_MS);

683

long maxTimeInBuffer = configuration.get(MAX_TIME_IN_BUFFER_MS);

684

685

if (requestTimeout <= 0) {

686

throw new ValidationException("request-timeout-ms must be positive, got: " + requestTimeout);

687

}

688

689

if (maxTimeInBuffer <= 0) {

690

throw new ValidationException("max-time-in-buffer-ms must be positive, got: " + maxTimeInBuffer);

691

}

692

693

// Warn if timeout is too short

694

if (requestTimeout < maxTimeInBuffer) {

695

LOG.warn("request-timeout-ms ({}) is shorter than max-time-in-buffer-ms ({}), " +

696

"this may cause premature timeouts", requestTimeout, maxTimeInBuffer);

697

}

698

699

// Validate in-flight request limits

700

int maxInFlightRequests = configuration.get(MAX_IN_FLIGHT_REQUESTS);

701

if (maxInFlightRequests <= 0) {

702

throw new ValidationException("max-in-flight-requests must be positive, got: " + maxInFlightRequests);

703

}

704

}

705

}

706

```

707

708

## Best Practices

709

710

### Performance Optimization for Table Sinks

711

712

```java

713

public class OptimizedTableSinkFactory extends AsyncDynamicTableSinkFactory {

714

715

@Override

716

public DynamicTableSink createDynamicTableSink(Context context) {

717

ReadableConfig config = context.getConfiguration();

718

719

// Optimize configuration based on table characteristics

720

AsyncSinkWriterConfiguration optimizedConfig = optimizeConfiguration(

721

config,

722

context.getPhysicalRowDataType(),

723

context.isStreamingMode()

724

);

725

726

return createOptimizedSink(context, optimizedConfig);

727

}

728

729

private AsyncSinkWriterConfiguration optimizeConfiguration(

730

ReadableConfig config,

731

DataType rowDataType,

732

boolean isStreaming) {

733

734

AsyncSinkWriterConfiguration.AsyncSinkWriterConfigurationBuilder builder =

735

AsyncSinkWriterConfiguration.builder();

736

737

// Calculate optimal batch size based on row size

738

int estimatedRowSize = estimateRowSize(rowDataType);

739

int optimalBatchSize = calculateOptimalBatchSize(estimatedRowSize, isStreaming);

740

741

builder.setMaxBatchSize(optimalBatchSize);

742

743

// Adjust buffer sizes for streaming vs batch mode

744

if (isStreaming) {

745

// Smaller buffers for lower latency

746

builder.setMaxTimeInBufferMS(1000)

747

.setMaxBufferedRequests(optimalBatchSize * 5);

748

} else {

749

// Larger buffers for higher throughput

750

builder.setMaxTimeInBufferMS(10000)

751

.setMaxBufferedRequests(optimalBatchSize * 20);

752

}

753

754

// Set other optimized values...

755

return builder.build();

756

}

757

758

private int estimateRowSize(DataType dataType) {

759

// Estimate based on data type structure

760

if (dataType instanceof RowType) {

761

RowType rowType = (RowType) dataType;

762

return rowType.getFields().stream()

763

.mapToInt(this::estimateFieldSize)

764

.sum();

765

}

766

return 100; // Default estimate

767

}

768

769

private int estimateFieldSize(RowType.RowField field) {

770

LogicalType type = field.getType();

771

switch (type.getTypeRoot()) {

772

case BOOLEAN:

773

case TINYINT:

774

return 1;

775

case SMALLINT:

776

return 2;

777

case INTEGER:

778

case FLOAT:

779

case DATE:

780

return 4;

781

case BIGINT:

782

case DOUBLE:

783

case TIMESTAMP_WITHOUT_TIME_ZONE:

784

return 8;

785

case VARCHAR:

786

case CHAR:

787

VarCharType varCharType = (VarCharType) type;

788

return varCharType.getLength();

789

default:

790

return 50; // Conservative estimate

791

}

792

}

793

}

794

```

795

796

### Error Handling and Monitoring

797

798

```java

799

public class MonitoredTableSink extends AsyncDynamicTableSink {

800

private final MetricGroup metricGroup;

801

private final Counter recordsSent;

802

private final Counter recordsFailed;

803

private final Histogram serializationTime;

804

805

public MonitoredTableSink(

806

DataType physicalRowDataType,

807

AsyncSinkBase<RowData, ?> asyncSinkBase,

808

MetricGroup metricGroup) {

809

super(physicalRowDataType, asyncSinkBase);

810

this.metricGroup = metricGroup;

811

812

this.recordsSent = metricGroup.counter("records_sent");

813

this.recordsFailed = metricGroup.counter("records_failed");

814

this.serializationTime = metricGroup.histogram("serialization_time_ms");

815

}

816

817

@Override

818

public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {

819

return SinkV2Provider.of(new MonitoredAsyncSinkWrapper(

820

asyncSinkBase,

821

recordsSent,

822

recordsFailed,

823

serializationTime

824

));

825

}

826

}

827

828

public class MonitoredAsyncSinkWrapper<T> implements Sink<T> {

829

private final Sink<T> delegate;

830

private final Counter recordsSent;

831

private final Counter recordsFailed;

832

private final Histogram serializationTime;

833

834

// Implementation that wraps calls with metrics...

835

}

836

```

837

838

The Table API Integration provides a complete bridge between Flink's table ecosystem and the advanced async sink framework, enabling powerful, high-performance table sinks with comprehensive configuration options and monitoring capabilities.