0
# Connector Framework
1
2
Extensible connector architecture with built-in connectors for testing and development, plus framework for custom connector development in Apache Flink's Table API.
3
4
## Capabilities
5
6
### Connector Framework Interfaces
7
8
Base interfaces for implementing custom table sources and sinks.
9
10
```java { .api }
11
/**
12
* Base interface for dynamic table sources
13
*/
14
public interface DynamicTableSource extends TableSource {
15
/**
16
* Create a copy of this source with additional abilities
17
* @param abilities Required abilities for the source
18
* @return New source instance with the specified abilities
19
*/
20
public DynamicTableSource copy();
21
22
/**
23
* Get a string summary of this source
24
* @return Human-readable summary
25
*/
26
public String asSummaryString();
27
}
28
29
/**
30
* Interface for scan-based table sources (batch and streaming)
31
*/
32
public interface ScanTableSource extends DynamicTableSource {
33
/**
34
* Get the change log mode supported by this source
35
* @return Change log mode (INSERT_ONLY, UPSERT, etc.)
36
*/
37
public ChangelogMode getChangelogMode();
38
39
/**
40
* Create the actual source provider for runtime
41
* @param context Context with parallelism and other runtime info
42
* @return Source provider for the Flink runtime
43
*/
44
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context);
45
}
46
47
/**
48
* Interface for lookup table sources (for joins)
49
*/
50
public interface LookupTableSource extends DynamicTableSource {
51
/**
52
* Create the lookup runtime provider
53
* @param context Context with lookup configuration
54
* @return Runtime provider for lookup operations
55
*/
56
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
57
}
58
59
/**
60
* Base interface for dynamic table sinks
61
*/
62
public interface DynamicTableSink extends TableSink {
63
/**
64
* Get the change log mode consumed by this sink
65
* @param requestedMode Requested change log mode from query
66
* @return Change log mode accepted by this sink
67
*/
68
public ChangelogMode getChangelogMode(ChangelogMode requestedMode);
69
70
/**
71
* Create the sink runtime provider
72
* @param context Context with parallelism and other runtime info
73
* @return Sink provider for the Flink runtime
74
*/
75
public SinkRuntimeProvider getSinkRuntimeProvider(SinkContext context);
76
77
/**
78
* Create a copy of this sink with additional abilities
79
* @param abilities Required abilities for the sink
80
* @return New sink instance with the specified abilities
81
*/
82
public DynamicTableSink copy();
83
84
/**
85
* Get a string summary of this sink
86
* @return Human-readable summary
87
*/
88
public String asSummaryString();
89
}
90
```
91
92
### Connector Factory Interface
93
94
Interface for creating connectors dynamically based on configuration.
95
96
```java { .api }
97
/**
98
* Factory interface for creating dynamic table sources
99
*/
100
public interface DynamicTableSourceFactory extends Factory {
101
/**
102
* Create a table source based on the context
103
* @param context Creation context with options and schema
104
* @return Created table source instance
105
*/
106
public DynamicTableSource createDynamicTableSource(Context context);
107
}
108
109
/**
110
* Factory interface for creating dynamic table sinks
111
*/
112
public interface DynamicTableSinkFactory extends Factory {
113
/**
114
* Create a table sink based on the context
115
* @param context Creation context with options and schema
116
* @return Created table sink instance
117
*/
118
public DynamicTableSink createDynamicTableSink(Context context);
119
}
120
121
/**
122
* Base factory interface with common metadata
123
*/
124
public interface Factory {
125
/**
126
* Get unique identifier for this factory
127
* @return Factory identifier (e.g., "kafka", "filesystem")
128
*/
129
public String factoryIdentifier();
130
131
/**
132
* Get required configuration options
133
* @return Set of required configuration keys
134
*/
135
public Set<ConfigOption<?>> requiredOptions();
136
137
/**
138
* Get optional configuration options
139
* @return Set of optional configuration keys
140
*/
141
public Set<ConfigOption<?>> optionalOptions();
142
}
143
```
144
145
### Built-in Connectors
146
147
Ready-to-use connectors included in the uber JAR for testing and development.
148
149
```java { .api }
150
/**
151
* Factory for data generation connector (for testing)
152
*/
153
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
154
@Override
155
public String factoryIdentifier() {
156
return "datagen";
157
}
158
159
@Override
160
public Set<ConfigOption<?>> requiredOptions() {
161
return Collections.emptySet();
162
}
163
164
@Override
165
public Set<ConfigOption<?>> optionalOptions() {
166
return Set.of(
167
DataGenConnectorOptions.ROWS_PER_SECOND,
168
DataGenConnectorOptions.NUMBER_OF_ROWS,
169
DataGenConnectorOptions.FIELDS
170
);
171
}
172
}
173
174
/**
175
* Factory for print sink connector (for debugging)
176
*/
177
public class PrintTableSinkFactory implements DynamicTableSinkFactory {
178
@Override
179
public String factoryIdentifier() {
180
return "print";
181
}
182
183
@Override
184
public Set<ConfigOption<?>> requiredOptions() {
185
return Collections.emptySet();
186
}
187
188
@Override
189
public Set<ConfigOption<?>> optionalOptions() {
190
return Set.of(
191
PrintConnectorOptions.PRINT_IDENTIFIER,
192
PrintConnectorOptions.STANDARD_ERROR
193
);
194
}
195
}
196
197
/**
198
* Factory for blackhole sink connector (for performance testing)
199
*/
200
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
201
@Override
202
public String factoryIdentifier() {
203
return "blackhole";
204
}
205
206
@Override
207
public Set<ConfigOption<?>> requiredOptions() {
208
return Collections.emptySet();
209
}
210
211
@Override
212
public Set<ConfigOption<?>> optionalOptions() {
213
return Collections.emptySet();
214
}
215
}
216
```
217
218
**Built-in Connector Usage:**
219
220
```java
221
// Data generation source for testing
222
tEnv.executeSql("CREATE TABLE test_source (" +
223
"id BIGINT," +
224
"name STRING," +
225
"amount DECIMAL(10,2)," +
226
"event_time TIMESTAMP(3)" +
227
") WITH (" +
228
"'connector' = 'datagen'," +
229
"'rows-per-second' = '100'," +
230
"'number-of-rows' = '10000'," +
231
"'fields.id.kind' = 'sequence'," +
232
"'fields.id.start' = '1'," +
233
"'fields.id.end' = '10000'," +
234
"'fields.name.length' = '10'," +
235
"'fields.amount.min' = '1.00'," +
236
"'fields.amount.max' = '1000.00'" +
237
")");
238
239
// Print sink for debugging output
240
tEnv.executeSql("CREATE TABLE debug_sink (" +
241
"id BIGINT," +
242
"name STRING," +
243
"amount DECIMAL(10,2)" +
244
") WITH (" +
245
"'connector' = 'print'," +
246
"'print-identifier' = 'debug'," +
247
"'standard-error' = 'false'" +
248
")");
249
250
// Blackhole sink for performance testing
251
tEnv.executeSql("CREATE TABLE perf_sink (" +
252
"id BIGINT," +
253
"name STRING," +
254
"processed_time TIMESTAMP(3)" +
255
") WITH (" +
256
"'connector' = 'blackhole'" +
257
")");
258
259
// Use the connectors
260
tEnv.executeSql("INSERT INTO debug_sink SELECT id, name, amount FROM test_source");
261
```
262
263
### Connector Configuration Options
264
265
Configuration utilities for connector options and validation.
266
267
```java { .api }
268
/**
269
* Configuration options for DataGen connector
270
*/
271
public class DataGenConnectorOptions {
272
public static final ConfigOption<Long> ROWS_PER_SECOND = ConfigOptions
273
.key("rows-per-second")
274
.longType()
275
.defaultValue(10000L)
276
.withDescription("Rows per second to generate");
277
278
public static final ConfigOption<Long> NUMBER_OF_ROWS = ConfigOptions
279
.key("number-of-rows")
280
.longType()
281
.noDefaultValue()
282
.withDescription("Total number of rows to generate");
283
284
public static final ConfigOption<Map<String, String>> FIELDS = ConfigOptions
285
.key("fields")
286
.mapType()
287
.noDefaultValue()
288
.withDescription("Field-specific generation options");
289
}
290
291
/**
292
* Configuration options for Print connector
293
*/
294
public class PrintConnectorOptions {
295
public static final ConfigOption<String> PRINT_IDENTIFIER = ConfigOptions
296
.key("print-identifier")
297
.stringType()
298
.noDefaultValue()
299
.withDescription("Identifier for print output");
300
301
public static final ConfigOption<Boolean> STANDARD_ERROR = ConfigOptions
302
.key("standard-error")
303
.booleanType()
304
.defaultValue(false)
305
.withDescription("Print to standard error instead of standard out");
306
}
307
```
308
309
### Connector Abilities
310
311
Interface for extending connector capabilities with additional features.
312
313
```java { .api }
314
/**
315
* Ability for sources to support reading metadata
316
*/
317
public interface SupportsReadingMetadata {
318
/**
319
* Get available metadata keys that can be read
320
* @return Map of metadata key to data type
321
*/
322
public Map<String, DataType> listReadableMetadata();
323
324
/**
325
* Apply metadata reading configuration
326
* @param metadataKeys List of metadata keys to read
327
* @param producedDataType Data type that includes metadata
328
*/
329
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType);
330
}
331
332
/**
333
* Ability for sinks to support writing metadata
334
*/
335
public interface SupportsWritingMetadata {
336
/**
337
* Get available metadata keys that can be written
338
* @return Map of metadata key to data type
339
*/
340
public Map<String, DataType> listWritableMetadata();
341
342
/**
343
* Apply metadata writing configuration
344
* @param metadataKeys List of metadata keys to write
345
* @param consumedDataType Data type that includes metadata
346
*/
347
public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType);
348
}
349
350
/**
351
* Ability for sources to support projection pushdown
352
*/
353
public interface SupportsProjectionPushDown {
354
/**
355
* Apply projection pushdown optimization
356
* @param projectedFields Indices of fields to project
357
* @param producedDataType Data type after projection
358
*/
359
public void applyProjection(int[][] projectedFields, DataType producedDataType);
360
}
361
362
/**
363
* Ability for sources to support filter pushdown
364
*/
365
public interface SupportsFilterPushDown {
366
/**
367
* Apply filter pushdown optimization
368
* @param filters List of filters to push down
369
* @return Result indicating which filters were accepted
370
*/
371
public Result applyFilters(List<ResolvedExpression> filters);
372
373
/**
374
* Result of filter pushdown application
375
*/
376
public static final class Result {
377
public static Result of(List<ResolvedExpression> acceptedFilters,
378
List<ResolvedExpression> remainingFilters) {
379
return new Result(acceptedFilters, remainingFilters);
380
}
381
382
public List<ResolvedExpression> getAcceptedFilters() { return acceptedFilters; }
383
public List<ResolvedExpression> getRemainingFilters() { return remainingFilters; }
384
}
385
}
386
387
/**
388
* Ability for sinks to support overwrite mode
389
*/
390
public interface SupportsOverwrite {
391
/**
392
* Apply overwrite mode configuration
393
* @param overwrite Whether to overwrite existing data
394
*/
395
public void applyOverwrite(boolean overwrite);
396
}
397
398
/**
399
* Ability for sinks to support partitioning
400
*/
401
public interface SupportsPartitioning {
402
/**
403
* Apply partitioning configuration
404
* @param partitionKeys List of partition key names
405
*/
406
public void applyStaticPartition(Map<String, String> partition);
407
}
408
```
409
410
### Custom Connector Development
411
412
Template and utilities for developing custom connectors.
413
414
```java { .api }
415
// Example custom source implementation
416
public class CustomTableSource implements ScanTableSource, SupportsReadingMetadata {
417
private final ResolvedSchema schema;
418
private final Map<String, String> options;
419
private List<String> metadataKeys;
420
421
public CustomTableSource(ResolvedSchema schema, Map<String, String> options) {
422
this.schema = schema;
423
this.options = options;
424
this.metadataKeys = new ArrayList<>();
425
}
426
427
@Override
428
public ChangelogMode getChangelogMode() {
429
return ChangelogMode.insertOnly();
430
}
431
432
@Override
433
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
434
return SourceProvider.of(new CustomSourceFunction(schema, options, metadataKeys));
435
}
436
437
@Override
438
public Map<String, DataType> listReadableMetadata() {
439
Map<String, DataType> metadata = new HashMap<>();
440
metadata.put("timestamp", DataTypes.TIMESTAMP_LTZ(3));
441
metadata.put("source-id", DataTypes.STRING());
442
return metadata;
443
}
444
445
@Override
446
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
447
this.metadataKeys = metadataKeys;
448
}
449
450
@Override
451
public DynamicTableSource copy() {
452
CustomTableSource copy = new CustomTableSource(schema, options);
453
copy.metadataKeys = new ArrayList<>(this.metadataKeys);
454
return copy;
455
}
456
457
@Override
458
public String asSummaryString() {
459
return "CustomSource";
460
}
461
}
462
463
// Example custom sink implementation
464
public class CustomTableSink implements DynamicTableSink, SupportsWritingMetadata {
465
private final ResolvedSchema schema;
466
private final Map<String, String> options;
467
private List<String> metadataKeys;
468
469
@Override
470
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
471
return ChangelogMode.insertOnly();
472
}
473
474
@Override
475
public SinkRuntimeProvider getSinkRuntimeProvider(SinkContext context) {
476
return SinkProvider.of(new CustomSinkFunction(schema, options, metadataKeys));
477
}
478
479
@Override
480
public Map<String, DataType> listWritableMetadata() {
481
Map<String, DataType> metadata = new HashMap<>();
482
metadata.put("timestamp", DataTypes.TIMESTAMP_LTZ(3));
483
metadata.put("partition", DataTypes.STRING());
484
return metadata;
485
}
486
487
@Override
488
public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
489
this.metadataKeys = metadataKeys;
490
}
491
492
@Override
493
public DynamicTableSink copy() {
494
CustomTableSink copy = new CustomTableSink(schema, options);
495
copy.metadataKeys = new ArrayList<>(this.metadataKeys);
496
return copy;
497
}
498
499
@Override
500
public String asSummaryString() {
501
return "CustomSink";
502
}
503
}
504
505
// Custom factory implementation
506
public class CustomConnectorFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
507
@Override
508
public String factoryIdentifier() {
509
return "custom";
510
}
511
512
@Override
513
public Set<ConfigOption<?>> requiredOptions() {
514
return Set.of(CustomOptions.HOST, CustomOptions.PORT);
515
}
516
517
@Override
518
public Set<ConfigOption<?>> optionalOptions() {
519
return Set.of(CustomOptions.USERNAME, CustomOptions.PASSWORD);
520
}
521
522
@Override
523
public DynamicTableSource createDynamicTableSource(Context context) {
524
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
525
helper.validate();
526
527
ReadableConfig options = helper.getOptions();
528
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
529
530
return new CustomTableSource(schema, options.toMap());
531
}
532
533
@Override
534
public DynamicTableSink createDynamicTableSink(Context context) {
535
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
536
helper.validate();
537
538
ReadableConfig options = helper.getOptions();
539
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
540
541
return new CustomTableSink(schema, options.toMap());
542
}
543
}
544
```
545
546
### Connector Utilities
547
548
Helper utilities for connector development and configuration.
549
550
```java { .api }
551
/**
552
* Utility for connector factory helpers
553
*/
554
public class FactoryUtil {
555
/**
556
* Create a table factory helper for validation and option extraction
557
* @param factory The connector factory
558
* @param context Creation context
559
* @return Helper for option validation and extraction
560
*/
561
public static TableFactoryHelper createTableFactoryHelper(Factory factory,
562
DynamicTableFactory.Context context);
563
564
public static final class TableFactoryHelper {
565
/**
566
* Validate required and optional options
567
*/
568
public void validate();
569
570
/**
571
* Get validated configuration options
572
* @return ReadableConfig with validated options
573
*/
574
public ReadableConfig getOptions();
575
}
576
}
577
578
/**
579
* Utility for changelog mode operations
580
*/
581
public class ChangelogMode {
582
/**
583
* Create insert-only changelog mode
584
* @return ChangelogMode for append-only sources/sinks
585
*/
586
public static ChangelogMode insertOnly();
587
588
/**
589
* Create upsert changelog mode
590
* @return ChangelogMode supporting inserts, updates, and deletes
591
*/
592
public static ChangelogMode upsert();
593
594
/**
595
* Create all-changes changelog mode
596
* @return ChangelogMode supporting all change types
597
*/
598
public static ChangelogMode all();
599
}
600
```
601
602
### Testing Connectors
603
604
Utilities and patterns for testing custom connectors.
605
606
```java { .api }
607
// Test utilities for connector development
608
public class ConnectorTestUtils {
609
/**
610
* Create test table environment for connector testing
611
* @return TableEnvironment configured for testing
612
*/
613
public static TableEnvironment createTestTableEnvironment() {
614
EnvironmentSettings settings = EnvironmentSettings.newInstance()
615
.inStreamingMode()
616
.build();
617
return TableEnvironment.create(settings);
618
}
619
620
/**
621
* Execute connector test with sample data
622
* @param sourceConnector Source connector configuration
623
* @param sinkConnector Sink connector configuration
624
* @param testData Sample data for testing
625
*/
626
public static void executeConnectorTest(String sourceConnector,
627
String sinkConnector,
628
List<Row> testData) {
629
// Implementation for automated connector testing
630
}
631
}
632
633
// Example connector test
634
@Test
635
public void testCustomConnector() {
636
TableEnvironment tEnv = ConnectorTestUtils.createTestTableEnvironment();
637
638
// Register custom connector factory
639
tEnv.executeSql("CREATE TABLE source_table (" +
640
"id BIGINT," +
641
"name STRING," +
642
"amount DECIMAL(10,2)" +
643
") WITH (" +
644
"'connector' = 'custom'," +
645
"'host' = 'localhost'," +
646
"'port' = '9092'" +
647
")");
648
649
tEnv.executeSql("CREATE TABLE sink_table (" +
650
"id BIGINT," +
651
"name STRING," +
652
"amount DECIMAL(10,2)" +
653
") WITH (" +
654
"'connector' = 'print'" +
655
")");
656
657
// Test data pipeline
658
TableResult result = tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table");
659
660
// Verify results
661
assertThat(result.getJobClient()).isPresent();
662
}
663
```