0
# Modern Connector Framework
1
2
The Modern Connector Framework in Flink Table API Java Bridge provides new connector interfaces following the FLIP-95 design. These interfaces offer better integration with the DataStream API and improved flexibility for connector development.
3
4
## Overview
5
6
The modern connector framework introduces provider-based interfaces that integrate directly with Flink's DataStream API, replacing the legacy table source/sink interfaces. This approach provides better type safety, improved performance, and cleaner separation of concerns.
7
8
## Source Providers
9
10
### DataStreamScanProvider
11
12
The `DataStreamScanProvider` interface allows connectors to directly produce DataStream instances:
13
14
```java { .api }
15
@PublicEvolving
16
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider {
17
DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv);
18
}
19
```
20
21
**Usage Example:**
22
23
```java
24
public class MyDataStreamScanProvider implements DataStreamScanProvider {
25
private final MySourceConfig config;
26
27
public MyDataStreamScanProvider(MySourceConfig config) {
28
this.config = config;
29
}
30
31
@Override
32
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
33
return execEnv.addSource(new MySourceFunction(config))
34
.map(new MyRowDataMapper());
35
}
36
}
37
38
// In your DynamicTableSource implementation
39
@Override
40
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
41
return new MyDataStreamScanProvider(sourceConfig);
42
}
43
```
44
45
### SourceFunctionProvider
46
47
The `SourceFunctionProvider` interface provides a way to create connector sources using Flink's SourceFunction:
48
49
```java { .api }
50
@PublicEvolving
51
public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider {
52
static SourceFunctionProvider of(SourceFunction<RowData> sourceFunction, boolean isBounded);
53
54
SourceFunction<RowData> createSourceFunction();
55
56
// Note: isBounded() method is inherited from ScanRuntimeProvider parent interface
57
}
58
```
59
60
**Usage Example:**
61
62
```java
63
// Create a bounded source function provider
64
SourceFunction<RowData> mySourceFunction = new MyBoundedSourceFunction(config);
65
SourceFunctionProvider provider = SourceFunctionProvider.of(mySourceFunction, true);
66
67
// Create an unbounded source function provider
68
SourceFunction<RowData> streamingSource = new MyStreamingSourceFunction(config);
69
SourceFunctionProvider streamingProvider = SourceFunctionProvider.of(streamingSource, false);
70
71
// In your DynamicTableSource implementation
72
@Override
73
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
74
if (config.isBounded()) {
75
return SourceFunctionProvider.of(new MyBoundedSourceFunction(config), true);
76
} else {
77
return SourceFunctionProvider.of(new MyStreamingSourceFunction(config), false);
78
}
79
}
80
```
81
82
## Sink Providers
83
84
### DataStreamSinkProvider
85
86
The `DataStreamSinkProvider` interface allows connectors to directly consume DataStream instances:
87
88
```java { .api }
89
@PublicEvolving
90
public interface DataStreamSinkProvider extends SinkRuntimeProvider {
91
DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
92
93
default Optional<Integer> getParallelism() {
94
return Optional.empty();
95
}
96
}
97
```
98
99
**Usage Example:**
100
101
```java
102
public class MyDataStreamSinkProvider implements DataStreamSinkProvider {
103
private final MySinkConfig config;
104
105
public MyDataStreamSinkProvider(MySinkConfig config) {
106
this.config = config;
107
}
108
109
@Override
110
public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
111
return dataStream
112
.map(new MyRowDataConverter(config))
113
.addSink(new MySinkFunction(config))
114
.name("My Custom Sink");
115
}
116
117
@Override
118
public Optional<Integer> getParallelism() {
119
return Optional.ofNullable(config.getParallelism());
120
}
121
}
122
123
// In your DynamicTableSink implementation
124
@Override
125
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
126
return new MyDataStreamSinkProvider(sinkConfig);
127
}
128
```
129
130
### SinkFunctionProvider
131
132
The `SinkFunctionProvider` interface provides a way to create connector sinks using Flink's SinkFunction:
133
134
```java { .api }
135
@PublicEvolving
136
public interface SinkFunctionProvider extends SinkRuntimeProvider {
137
static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction);
138
static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, @Nullable Integer parallelism);
139
140
SinkFunction<RowData> createSinkFunction();
141
142
default Optional<Integer> getParallelism() {
143
return Optional.empty();
144
}
145
}
146
```
147
148
**Usage Example:**
149
150
```java
151
// Create a sink function provider without parallelism constraint
152
SinkFunction<RowData> mySinkFunction = new MySinkFunction(config);
153
SinkFunctionProvider provider = SinkFunctionProvider.of(mySinkFunction);
154
155
// Create a sink function provider with specific parallelism
156
SinkFunction<RowData> parallelSink = new MyParallelSinkFunction(config);
157
SinkFunctionProvider parallelProvider = SinkFunctionProvider.of(parallelSink, 4);
158
159
// In your DynamicTableSink implementation
160
@Override
161
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
162
SinkFunction<RowData> sinkFunction = new MySinkFunction(sinkConfig);
163
164
if (sinkConfig.getParallelism() != null) {
165
return SinkFunctionProvider.of(sinkFunction, sinkConfig.getParallelism());
166
} else {
167
return SinkFunctionProvider.of(sinkFunction);
168
}
169
}
170
```
171
172
## Complete Connector Implementation Examples
173
174
### Custom DataStream Source
175
176
```java
177
public class MyCustomTableSource implements DynamicTableSource {
178
private final MySourceConfig config;
179
private final ResolvedSchema resolvedSchema;
180
181
public MyCustomTableSource(MySourceConfig config, ResolvedSchema resolvedSchema) {
182
this.config = config;
183
this.resolvedSchema = resolvedSchema;
184
}
185
186
@Override
187
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
188
return new DataStreamScanProvider() {
189
@Override
190
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
191
return execEnv
192
.addSource(new MySourceFunction(config))
.map(new MyToRowDataMapper(resolvedSchema))
.returns(context.createTypeInformation(resolvedSchema.toPhysicalRowDataType()));
195
}
196
};
197
}
198
199
@Override
200
public DynamicTableSource copy() {
201
return new MyCustomTableSource(config, resolvedSchema);
202
}
203
204
@Override
205
public String asSummaryString() {
206
return "MyCustomSource";
207
}
208
209
@Override
210
public ChangelogMode getChangelogMode() {
211
return ChangelogMode.insertOnly();
212
}
213
}
214
```
215
216
### Custom DataStream Sink
217
218
```java
219
public class MyCustomTableSink implements DynamicTableSink {
220
private final MySinkConfig config;
221
private final ResolvedSchema resolvedSchema;
222
223
public MyCustomTableSink(MySinkConfig config, ResolvedSchema resolvedSchema) {
224
this.config = config;
225
this.resolvedSchema = resolvedSchema;
226
}
227
228
@Override
229
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
230
return new DataStreamSinkProvider() {
231
@Override
232
public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
233
return dataStream
234
.map(new MyFromRowDataMapper(resolvedSchema))
235
.addSink(new MySinkFunction(config))
236
.name("MyCustomSink");
237
}
238
239
@Override
240
public Optional<Integer> getParallelism() {
241
return Optional.ofNullable(config.getParallelism());
242
}
243
};
244
}
245
246
@Override
247
public DynamicTableSink copy() {
248
return new MyCustomTableSink(config, resolvedSchema);
249
}
250
251
@Override
252
public String asSummaryString() {
253
return "MyCustomSink";
254
}
255
256
@Override
257
public ChangelogMode getChangelogMode() {
258
return ChangelogMode.insertOnly();
259
}
260
}
261
```
262
263
## Advanced Integration Patterns
264
265
### Source with Watermark Strategy
266
267
```java
268
public class WatermarkedDataStreamScanProvider implements DataStreamScanProvider {
269
private final MySourceConfig config;
270
private final WatermarkStrategy<MyRecord> watermarkStrategy;
271
272
@Override
273
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
274
return execEnv
275
.fromSource(
276
new MyFlinkSource(config),
277
watermarkStrategy,
278
"MyWatermarkedSource"
279
)
280
.map(new MyToRowDataMapper());
281
}
282
}
283
```
284
285
### Sink with State Backend Integration
286
287
```java
288
public class StatefulDataStreamSinkProvider implements DataStreamSinkProvider {
289
private final MySinkConfig config;
290
291
@Override
292
public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
293
return dataStream
294
.keyBy(new MyKeySelector())
295
.process(new MyStatefulSinkFunction(config))
296
.addSink(new MyOutputSink(config));
297
}
298
}
299
```
300
301
### Connector with Custom Serialization
302
303
```java
304
public class SerializationAwareDataStreamSinkProvider implements DataStreamSinkProvider {
305
private final MySinkConfig config;
306
private final ResolvedSchema schema;
307
308
@Override
309
public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
310
// Create custom serializer based on schema
311
MyCustomSerializer serializer = new MyCustomSerializer(schema);
312
313
return dataStream
314
.map(rowData -> serializer.serialize(rowData))
315
.addSink(new MySinkFunction(config));
316
}
317
}
318
```
319
320
## Error Handling and Resilience
321
322
### Source Error Handling
323
324
```java
325
public class ResilientDataStreamScanProvider implements DataStreamScanProvider {
326
private final MySourceConfig config;
327
328
@Override
329
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
330
return execEnv
331
.addSource(new MySourceFunction(config))
332
.map(new MyToRowDataMapper())
333
.process(new ProcessFunction<RowData, RowData>() {
334
@Override
335
public void processElement(RowData value, Context ctx, Collector<RowData> out) {
336
try {
337
// Validate and process
338
validateRowData(value);
339
out.collect(value);
340
} catch (Exception e) {
341
// Log error and optionally send to side output
342
getRuntimeContext().getMetricGroup()
343
.counter("malformed_records")
344
.inc();
345
}
346
}
347
});
348
}
349
}
350
```
351
352
### Sink Error Handling
353
354
```java
355
public class ResilientDataStreamSinkProvider implements DataStreamSinkProvider {
356
private final MySinkConfig config;
357
358
@Override
359
public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
360
return dataStream
361
.map(new MyRowDataConverter())
362
.process(new ProcessFunction<MyRecord, MyRecord>() {
363
@Override
364
public void processElement(MyRecord value, Context ctx, Collector<MyRecord> out) {
365
try {
366
out.collect(value);
367
} catch (Exception e) {
368
// Handle serialization errors
369
ctx.output(errorOutputTag, new ErrorRecord(value, e));
370
}
371
}
372
})
373
.addSink(new MyResilientSinkFunction(config));
374
}
375
}
376
```
377
378
## Performance Optimization
379
380
### Batching in Sinks
381
382
```java
383
public class BatchingDataStreamSinkProvider implements DataStreamSinkProvider {
384
private final MySinkConfig config;
385
386
@Override
387
public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
388
return dataStream
389
.map(new MyRowDataConverter())
390
.countWindowAll(config.getBatchSize())
.apply(new AllWindowFunction<MyRecord, List<MyRecord>, GlobalWindow>() {
392
@Override
393
public void apply(GlobalWindow window,
394
Iterable<MyRecord> values,
395
Collector<List<MyRecord>> out) {
396
List<MyRecord> batch = new ArrayList<>();
397
values.forEach(batch::add);
398
out.collect(batch);
399
}
400
})
401
.addSink(new MyBatchingSinkFunction(config));
402
}
403
}
404
```
405
406
### Parallel Processing
407
408
```java
409
public class ParallelDataStreamSinkProvider implements DataStreamSinkProvider {
410
private final MySinkConfig config;
411
412
@Override
413
public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
414
return dataStream
415
.rebalance() // Distribute evenly across parallel instances
416
.map(new MyRowDataConverter())
417
.addSink(new MySinkFunction(config))
418
.setParallelism(config.getParallelism());
419
}
420
421
@Override
422
public Optional<Integer> getParallelism() {
423
return Optional.of(config.getParallelism());
424
}
425
}
426
```
427
428
## Migration from Legacy Interfaces
429
430
When migrating from legacy `StreamTableSource`/`StreamTableSink` interfaces:
431
432
1. **Replace TableSource**: Implement `DynamicTableSource` with `DataStreamScanProvider`
433
2. **Replace TableSink**: Implement `DynamicTableSink` with `DataStreamSinkProvider`
434
3. **Update Factory**: Implement `DynamicTableSourceFactory`/`DynamicTableSinkFactory`
435
4. **Handle Configuration**: Use `ConfigOption` instead of string-based properties
436
437
**Before (Legacy):**
438
439
```java
440
public class LegacyTableSource implements StreamTableSource<Row> {
441
@Override
442
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
443
return execEnv.addSource(new MySourceFunction());
444
}
445
}
446
```
447
448
**After (Modern):**
449
450
```java
451
public class ModernTableSource implements DynamicTableSource {
452
@Override
453
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
454
return new DataStreamScanProvider() {
455
@Override
456
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
457
return execEnv.addSource(new MySourceFunction())
458
.map(new MyRowDataMapper());
459
}
460
};
461
}
462
}
463
```