# Legacy Connector Support

The Flink Table API Java Bridge maintains backward compatibility with legacy connector interfaces that were used before the introduction of the modern connector framework (FLIP-95). These interfaces are deprecated but still supported for existing connector implementations.

## Legacy Factory Interfaces
### StreamTableSourceFactory

Factory interface for creating legacy stream table sources:

```java { .api }
@Deprecated
@PublicEvolving
public interface StreamTableSourceFactory<T> extends TableSourceFactory<T> {
    StreamTableSource<T> createStreamTableSource(Map<String, String> properties);

    // Inherited from TableSourceFactory
    TableSource<T> createTableSource(Map<String, String> properties);
}
```

**Migration Note**: New implementations should use `DynamicTableSourceFactory` instead.
### StreamTableSinkFactory

Factory interface for creating legacy stream table sinks:

```java { .api }
@Deprecated
@PublicEvolving
public interface StreamTableSinkFactory<T> extends TableSinkFactory<T> {
    StreamTableSink<T> createStreamTableSink(Map<String, String> properties);

    // Inherited from TableSinkFactory
    TableSink<T> createTableSink(Map<String, String> properties);
}
```

**Migration Note**: New implementations should use `DynamicTableSinkFactory` instead.
## Legacy Source Interfaces

### StreamTableSource

Base interface for legacy streaming table sources:

```java { .api }
@Deprecated
public interface StreamTableSource<T> extends TableSource<T> {
    default boolean isBounded() {
        return false;
    }

    DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}
```
**Usage Example (Deprecated):**

```java
@Deprecated
public class MyLegacyTableSource implements StreamTableSource<Row> {
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;
    private final MySourceConfig config;

    public MyLegacyTableSource(String[] fieldNames, TypeInformation<?>[] fieldTypes, MySourceConfig config) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.config = config;
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv
                .addSource(new MySourceFunction(config))
                .map(new MyRowMapper());
    }

    @Override
    public boolean isBounded() {
        return config.isBounded();
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
                .fields(fieldNames, fieldTypes)
                .build();
    }

    @Override
    public String explainSource() {
        return "MyLegacyTableSource";
    }
}
```
### InputFormatTableSource

Abstract class for bounded table sources based on InputFormat:

```java { .api }
@Deprecated
@Experimental
public abstract class InputFormatTableSource<T> implements StreamTableSource<T> {
    public abstract InputFormat<T, ?> getInputFormat();

    @Override
    public final boolean isBounded() {
        return true;
    }

    @Override
    public final DataStream<T> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.createInput(getInputFormat(), getReturnType());
    }
}
```
**Usage Example (Deprecated):**

```java
@Deprecated
public class MyInputFormatTableSource extends InputFormatTableSource<Row> {
    private final MyInputFormat inputFormat;
    private final RowTypeInfo returnType;

    public MyInputFormatTableSource(MyInputFormat inputFormat, RowTypeInfo returnType) {
        this.inputFormat = inputFormat;
        this.returnType = returnType;
    }

    @Override
    public InputFormat<Row, ?> getInputFormat() {
        return inputFormat;
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return returnType;
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(returnType);
    }
}
```
## Legacy Sink Interfaces

### StreamTableSink

Base interface for legacy streaming table sinks:

```java { .api }
@Deprecated
public interface StreamTableSink<T> extends TableSink<T> {
    DataStreamSink<?> consumeDataStream(DataStream<T> dataStream);
}
```
**Usage Example (Deprecated):**

```java
@Deprecated
public class MyLegacyTableSink implements StreamTableSink<Row> {
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;
    private final MySinkConfig config;

    public MyLegacyTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes, MySinkConfig config) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.config = config;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream
                .map(new MyRowConverter(config))
                .addSink(new MySinkFunction(config))
                .name("MyLegacySink");
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return new MyLegacyTableSink(fieldNames, fieldTypes, config);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
                .fields(fieldNames, fieldTypes)
                .build();
    }
}
```
### AppendStreamTableSink

Interface for append-only stream table sinks:

```java { .api }
@Deprecated
@PublicEvolving
public interface AppendStreamTableSink<T> extends StreamTableSink<T> {
    // Inherits all methods from StreamTableSink
    // Semantically indicates append-only capability
}
```
### RetractStreamTableSink

Interface for retractable stream table sinks that can handle updates:

```java { .api }
@Deprecated
@PublicEvolving
public interface RetractStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> {
    TypeInformation<T> getRecordType();

    default TypeInformation<Tuple2<Boolean, T>> getOutputType() {
        return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
    }
}
```
**Usage Example (Deprecated):**

```java
@Deprecated
public class MyRetractStreamTableSink implements RetractStreamTableSink<Row> {
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;

    public MyRetractStreamTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        return dataStream
                .process(new ProcessFunction<Tuple2<Boolean, Row>, MyRecord>() {
                    @Override
                    public void processElement(Tuple2<Boolean, Row> value, Context ctx, Collector<MyRecord> out) {
                        Boolean isInsert = value.f0;
                        Row row = value.f1;

                        if (isInsert) {
                            out.collect(MyRecord.fromRow(row, ChangeType.INSERT));
                        } else {
                            out.collect(MyRecord.fromRow(row, ChangeType.DELETE));
                        }
                    }
                })
                .addSink(new MyChangelogSinkFunction());
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }
}
```
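The `Tuple2<Boolean, T>` encoding carries a simple changelog contract: `true` means accumulate (insert), `false` means retract (delete). The following dependency-free sketch (the `RetractMaterializer` class is illustrative, not a Flink API) shows how a downstream system could materialize such a stream into current state:

```java
import java.util.HashMap;
import java.util.Map;

public class RetractMaterializer {
    // Materialized view: record -> current multiplicity.
    private final Map<String, Integer> counts = new HashMap<>();

    // Apply one (flag, record) pair: true accumulates, false retracts.
    public void apply(boolean accumulate, String record) {
        counts.merge(record, accumulate ? 1 : -1, Integer::sum);
        if (counts.getOrDefault(record, 0) <= 0) {
            counts.remove(record); // fully retracted
        }
    }

    public int currentCount(String record) {
        return counts.getOrDefault(record, 0);
    }

    public static void main(String[] args) {
        RetractMaterializer view = new RetractMaterializer();
        view.apply(true, "a");   // insert "a"
        view.apply(true, "a");   // insert "a" again
        view.apply(false, "a");  // retract one "a"
        System.out.println(view.currentCount("a")); // prints 1
        view.apply(false, "a");
        System.out.println(view.currentCount("a")); // prints 0
    }
}
```

Note that a retraction may only cancel a previously emitted insert; sinks that cannot tolerate this two-message protocol should implement `UpsertStreamTableSink` instead.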
### UpsertStreamTableSink

Interface for upsert stream table sinks that can handle insert/update/delete operations:

```java { .api }
@Deprecated
@PublicEvolving
public interface UpsertStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> {
    void setKeyFields(String[] keys);
    void setIsAppendOnly(Boolean isAppendOnly);
    TypeInformation<T> getRecordType();

    default TypeInformation<Tuple2<Boolean, T>> getOutputType() {
        return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
    }
}
```
**Usage Example (Deprecated):**

```java
@Deprecated
public class MyUpsertStreamTableSink implements UpsertStreamTableSink<Row> {
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;
    private String[] keyFields;
    private Boolean isAppendOnly;

    public MyUpsertStreamTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    @Override
    public void setKeyFields(String[] keys) {
        this.keyFields = keys;
    }

    @Override
    public void setIsAppendOnly(Boolean isAppendOnly) {
        this.isAppendOnly = isAppendOnly;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        return dataStream
                .keyBy(new KeySelector<Tuple2<Boolean, Row>, String>() {
                    @Override
                    public String getKey(Tuple2<Boolean, Row> value) throws Exception {
                        Row row = value.f1;
                        // Build key from key fields
                        StringBuilder keyBuilder = new StringBuilder();
                        for (String keyField : keyFields) {
                            int index = getFieldIndex(keyField);
                            keyBuilder.append(row.getField(index)).append("|");
                        }
                        return keyBuilder.toString();
                    }
                })
                .addSink(new MyUpsertSinkFunction(keyFields, isAppendOnly));
    }

    private int getFieldIndex(String fieldName) {
        return Arrays.asList(fieldNames).indexOf(fieldName);
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }
}
```
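One caveat with the delimiter-joined key above: plain concatenation can collide (for example, a single field value `a|b` and the two values `a`, `b` produce the same key). A dependency-free sketch of a collision-free variant that escapes the delimiter (the `CompositeKeys` class is illustrative, not part of any Flink API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CompositeKeys {
    // Build a collision-free composite key by escaping backslashes
    // and delimiters in each field before joining. A naive join would
    // map ["a|b"] and ["a", "b"] to the same key.
    public static String buildKey(List<String> fields) {
        return fields.stream()
                .map(f -> f.replace("\\", "\\\\").replace("|", "\\|"))
                .collect(Collectors.joining("|"));
    }

    public static void main(String[] args) {
        System.out.println(buildKey(Arrays.asList("a", "b"))); // prints a|b
        System.out.println(buildKey(Arrays.asList("a|b")));    // prints a\|b
    }
}
```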
### OutputFormatTableSink

Abstract class for table sinks based on OutputFormat:

```java { .api }
@Deprecated
public abstract class OutputFormatTableSink<T> implements StreamTableSink<T> {
    public abstract OutputFormat<T> getOutputFormat();

    @Override
    public DataStreamSink<T> consumeDataStream(DataStream<T> dataStream) {
        return dataStream.writeUsingOutputFormat(getOutputFormat());
    }
}
```
## Legacy CSV Connector (Testing Only)

The CSV connector implementations are maintained only for testing the legacy connector stack:

### CsvTableSource

```java { .api }
@Deprecated
public class CsvTableSource extends InputFormatTableSource<Row> {
    // Constructor options
    public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes);
    public CsvTableSource(
            String path,
            String[] fieldNames,
            TypeInformation<?>[] fieldTypes,
            String fieldDelim,
            String rowDelim,
            Character quoteCharacter,
            boolean ignoreFirstLine,
            String ignoreComments,
            boolean lenient);

    // Builder pattern support
    public static CsvTableSource.Builder builder();
}
```
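To illustrate the semantics the `fieldDelim` and `quoteCharacter` options control, here is a dependency-free sketch of quoted-field splitting (this is not CsvTableSource's actual implementation; the `CsvLineSplitter` class is illustrative and ignores escaped quotes):

```java
import java.util.ArrayList;
import java.util.List;

public class CsvLineSplitter {
    // Minimal illustration of how a field delimiter and quote character
    // interact: delimiters inside a quoted field do not split the record.
    public static List<String> split(String line, char delim, char quote) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : line.toCharArray()) {
            if (c == quote) {
                inQuotes = !inQuotes;           // toggle quoted state
            } else if (c == delim && !inQuotes) {
                fields.add(current.toString()); // field boundary
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());         // last field
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(split("1,\"a,b\",3", ',', '"')); // prints [1, a,b, 3]
    }
}
```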
### CsvTableSink

```java { .api }
@Deprecated
public class CsvTableSink extends OutputFormatTableSink<Row> {
    public CsvTableSink(String path, String fieldDelim, int numFiles, WriteMode writeMode);

    @Override
    public OutputFormat<Row> getOutputFormat() {
        return new CsvOutputFormat<>(path, fieldDelim, numFiles, writeMode);
    }
}
```
## Migration Guidelines

### From Legacy to Modern Connectors

**Step 1: Replace Factory Interface**

```java
// Old (Deprecated)
public class MyConnectorFactory implements StreamTableSourceFactory<Row> {
    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        return new MyLegacyTableSource(properties);
    }
}

// New (Recommended)
public class MyConnectorFactory implements DynamicTableSourceFactory {
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        return new MyModernTableSource(context);
    }
}
```
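In both generations, factories are discovered through Java's `ServiceLoader` (SPI), so replacing the factory interface also means moving its service registration file; the `com.example.MyConnectorFactory` entry below is a placeholder for your own class:

```
# Legacy discovery:
#   META-INF/services/org.apache.flink.table.factories.TableFactory
#       com.example.MyConnectorFactory

# Modern discovery:
#   META-INF/services/org.apache.flink.table.factories.Factory
#       com.example.MyConnectorFactory
```

Modern factories are additionally matched by the value returned from `factoryIdentifier()`, which must equal the `connector` option in the table definition.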
**Step 2: Replace Source Interface**

```java
// Old (Deprecated)
public class MyLegacyTableSource implements StreamTableSource<Row> {
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new MySourceFunction());
    }
}

// New (Recommended): getScanRuntimeProvider is declared on ScanTableSource,
// a sub-interface of DynamicTableSource
public class MyModernTableSource implements ScanTableSource {
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new DataStreamScanProvider() {
            @Override
            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                return execEnv.addSource(new MySourceFunction())
                        .map(new MyRowDataMapper());
            }

            @Override
            public boolean isBounded() {
                return false;
            }
        };
    }
}
```
**Step 3: Replace Sink Interface**

```java
// Old (Deprecated)
public class MyLegacyTableSink implements StreamTableSink<Row> {
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(new MySinkFunction());
    }
}

// New (Recommended)
public class MyModernTableSink implements DynamicTableSink {
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new DataStreamSinkProvider() {
            @Override
            public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
                return dataStream
                        .map(new MyRowDataConverter())
                        .addSink(new MySinkFunction());
            }
        };
    }
}
```
### Configuration Migration

**Legacy String-based Properties:**

```java
// Old approach
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "my-connector");
properties.put("connector.host", "localhost");
properties.put("connector.port", "8080");
```
**Modern ConfigOption-based Configuration:**

```java
// New approach
public class MyConnectorOptions {
    public static final ConfigOption<String> HOST =
            ConfigOptions.key("host")
                    .stringType()
                    .defaultValue("localhost");

    public static final ConfigOption<Integer> PORT =
            ConfigOptions.key("port")
                    .intType()
                    .defaultValue(8080);
}
```
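The typed options above centralize what legacy connectors had to hand-roll for every property: defaulting and string-to-type conversion. A dependency-free sketch of that manual pattern (the `TypedProperties` class is illustrative) shows why the `ConfigOption` approach is less error-prone:

```java
import java.util.HashMap;
import java.util.Map;

public class TypedProperties {
    // Hand-rolled typed lookup, illustrating what ConfigOptions
    // standardize: defaulting and type conversion in one place.
    public static int getInt(Map<String, String> props, String key, int defaultValue) {
        String raw = props.get(key);
        if (raw == null) {
            return defaultValue; // missing -> default
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Invalid value for '" + key + "': " + raw, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("connector.port", "8080");
        System.out.println(getInt(props, "connector.port", 9090));   // prints 8080
        System.out.println(getInt(props, "connector.timeout", 30));  // prints 30
    }
}
```

With legacy connectors, every call site repeats this parsing logic; with `ConfigOptions`, the type and default live next to the key definition.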
## Compatibility Considerations

### Runtime Compatibility

1. **Legacy connectors** continue to work with current Flink versions
2. **Mixed usage** of legacy and modern connectors is supported
3. **Gradual migration** can be performed incrementally

### Feature Limitations

Legacy connectors have limitations compared to modern ones:

1. **No support for** complex data types introduced in newer versions
2. **Limited metadata** access compared to modern metadata handling
3. **No support for** advanced features like watermark pushdown
4. **String-based configuration** instead of type-safe ConfigOptions

### Performance Implications

1. **Legacy connectors** may have slight performance overhead
2. **Type conversions** between Row and RowData may be needed
3. **Modern connectors** are optimized for the current Flink runtime
## Best Practices for Legacy Support

### When to Use Legacy Interfaces

1. **Maintaining existing connectors** that haven't been migrated yet
2. **Quick prototyping** when familiar with legacy APIs
3. **Compatibility requirements** with older Flink versions

### Migration Strategy

1. **Plan migration** during major version upgrades
2. **Test thoroughly** with both legacy and modern implementations
3. **Migrate incrementally** by component rather than all at once
4. **Document migration** progress and remaining legacy components
### Code Organization

```java
// Organize legacy code clearly
@Deprecated
@SuppressWarnings("deprecation")
public class LegacyConnectorSupport {

    // Legacy implementation
    public static class LegacySource implements StreamTableSource<Row> {
        // Implementation
    }

    // Migration helper
    public static DynamicTableSource migrateToModern(LegacySource legacy) {
        return new ModernSourceAdapter(legacy);
    }
}
```