Bridge component for Apache Flink's Table/SQL API that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs.
—
The Flink Table API Java Bridge maintains backward compatibility with legacy connector interfaces that were used before the introduction of the modern connector framework (FLIP-95). These interfaces are deprecated but still supported for existing connector implementations.
Factory interface for creating legacy stream table sources:
/**
 * Factory for creating configured instances of {@link StreamTableSource}
 * from normalized string-based descriptor properties.
 *
 * @param <T> type of the records produced by the created source
 * @deprecated superseded by {@code DynamicTableSourceFactory} (FLIP-95)
 */
@Deprecated
@PublicEvolving
public interface StreamTableSourceFactory<T> extends TableSourceFactory<T> {

    /**
     * Creates and configures a {@link StreamTableSource} using the given properties.
     *
     * @param properties normalized properties describing the table source
     * @return the configured stream table source
     */
    StreamTableSource<T> createStreamTableSource(Map<String, String> properties);

    /**
     * Delegates to {@link #createStreamTableSource(Map)} so that implementations
     * only need to provide the stream-specific factory method.
     */
    default TableSource<T> createTableSource(Map<String, String> properties) {
        return createStreamTableSource(properties);
    }
}
Migration Note: New implementations should use DynamicTableSourceFactory instead.
Factory interface for creating legacy stream table sinks:
/**
 * Factory for creating configured instances of {@link StreamTableSink}
 * from normalized string-based descriptor properties.
 *
 * @param <T> type of the records consumed by the created sink
 * @deprecated superseded by {@code DynamicTableSinkFactory} (FLIP-95)
 */
@Deprecated
@PublicEvolving
public interface StreamTableSinkFactory<T> extends TableSinkFactory<T> {

    /**
     * Creates and configures a {@link StreamTableSink} using the given properties.
     *
     * @param properties normalized properties describing the table sink
     * @return the configured stream table sink
     */
    StreamTableSink<T> createStreamTableSink(Map<String, String> properties);

    /**
     * Delegates to {@link #createStreamTableSink(Map)} so that implementations
     * only need to provide the stream-specific factory method.
     */
    default TableSink<T> createTableSink(Map<String, String> properties) {
        return createStreamTableSink(properties);
    }
}
Migration Note: New implementations should use DynamicTableSinkFactory instead.
Base interface for legacy streaming table sources:
/**
 * Defines an external table whose data is made available as a {@link DataStream}.
 *
 * @param <T> type of the records produced by this source
 * @deprecated use the FLIP-95 {@code DynamicTableSource} stack for new code
 */
@Deprecated
public interface StreamTableSource<T> extends TableSource<T> {

    /**
     * Reports whether the produced data is bounded. Streaming sources are
     * treated as unbounded unless an implementation overrides this.
     */
    default boolean isBounded() {
        return false;
    }

    /** Returns the table's data as a {@link DataStream} built on the given environment. */
    DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}
Usage Example (Deprecated):
/**
 * Example of a legacy {@link StreamTableSource} implementation. Shown only to
 * illustrate the deprecated connector stack.
 */
@Deprecated
public class MyLegacyTableSource implements StreamTableSource<Row> {

    // Schema and connection settings supplied at construction time.
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;
    private final MySourceConfig config;

    public MyLegacyTableSource(String[] fieldNames, TypeInformation<?>[] fieldTypes, MySourceConfig config) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.config = config;
    }

    /** Builds the stream: raw source records mapped into {@link Row}s. */
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        DataStream<Row> rows = execEnv
                .addSource(new MySourceFunction(config))
                .map(new MyRowMapper());
        return rows;
    }

    /** Boundedness is delegated to the source configuration. */
    @Override
    public boolean isBounded() {
        return config.isBounded();
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder().fields(fieldNames, fieldTypes).build();
    }

    @Override
    public String explainSource() {
        return "MyLegacyTableSource";
    }
}
Abstract class for bounded table sources based on InputFormat:
/**
 * Base class for bounded table sources backed by an {@link InputFormat}.
 *
 * <p>Note: {@code StreamTableSource} is an interface, so this class must
 * {@code implements} it rather than {@code extends} it.
 *
 * @param <T> type of the records produced by this source
 * @deprecated part of the legacy connector stack
 */
@Deprecated
@Experimental
public abstract class InputFormatTableSource<T> implements StreamTableSource<T> {

    /** Returns the {@link InputFormat} that reads the bounded data. */
    public abstract InputFormat<T, ?> getInputFormat();

    /** Input-format sources are always bounded; subclasses may not change this. */
    @Override
    public final boolean isBounded() {
        return true;
    }

    /** Creates the stream directly from the input format and its return type. */
    @Override
    public final DataStream<T> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.createInput(getInputFormat(), getReturnType());
    }
}
Usage Example (Deprecated):
/**
 * Example of a bounded table source built on {@link InputFormatTableSource}.
 * Shown only to illustrate the deprecated connector stack.
 */
@Deprecated
public class MyInputFormatTableSource extends InputFormatTableSource<Row> {

    private final MyInputFormat inputFormat;
    private final RowTypeInfo returnType;

    public MyInputFormatTableSource(MyInputFormat inputFormat, RowTypeInfo returnType) {
        this.inputFormat = inputFormat;
        this.returnType = returnType;
    }

    @Override
    public InputFormat<Row, ?> getInputFormat() {
        return inputFormat;
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return returnType;
    }

    /** Derives the table schema straight from the row type information. */
    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(returnType);
    }
}
Base interface for legacy streaming table sinks:
/**
 * Defines an external table sink that consumes a {@link DataStream}.
 *
 * @param <T> type of the records this sink accepts
 * @deprecated use the FLIP-95 {@code DynamicTableSink} stack for new code
 */
@Deprecated
public interface StreamTableSink<T> extends TableSink<T> {

    /** Consumes the given stream and returns the resulting sink handle. */
    DataStreamSink<?> consumeDataStream(DataStream<T> dataStream);
}
Usage Example (Deprecated):
/**
 * Example of a legacy {@link StreamTableSink} implementation.
 *
 * <p>Fix over the previous listing: the {@code final} fields were never
 * initialized and {@link #configure} invoked a three-argument constructor that
 * did not exist — the class could not compile. The constructor is now declared.
 */
@Deprecated
public class MyLegacyTableSink implements StreamTableSink<Row> {

    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;
    private final MySinkConfig config;

    public MyLegacyTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes, MySinkConfig config) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.config = config;
    }

    /** Converts rows to the external format and attaches the sink function. */
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream
                .map(new MyRowConverter(config))
                .addSink(new MySinkFunction(config))
                .name("MyLegacySink");
    }

    /** Returns a copy of this sink configured with the given schema. */
    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return new MyLegacyTableSink(fieldNames, fieldTypes, config);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
                .fields(fieldNames, fieldTypes)
                .build();
    }
}
Interface for append-only stream table sinks:
/**
 * Marker interface for sinks that only accept insert-only (append) streams.
 * Adds no methods; it communicates the append-only capability to the planner.
 *
 * @param <T> type of the records this sink accepts
 * @deprecated part of the legacy connector stack
 */
@Deprecated
@PublicEvolving
public interface AppendStreamTableSink<T> extends StreamTableSink<T> {
    // Intentionally empty: all behavior is inherited from StreamTableSink.
}
Interface for retractable stream table sinks that can handle updates:
/**
 * Sink for changelog streams encoded as {@code Tuple2<Boolean, T>}: the flag
 * is {@code true} for an insert (accumulate) and {@code false} for a delete
 * (retract) message.
 *
 * @param <T> type of the payload record
 * @deprecated part of the legacy connector stack
 */
@Deprecated
@PublicEvolving
public interface RetractStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> {

    /** Type information for the payload part of the change messages. */
    TypeInformation<T> getRecordType();

    /** Wraps the record type with the boolean change flag. */
    default TypeInformation<Tuple2<Boolean, T>> getOutputType() {
        return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
    }
}
Usage Example (Deprecated):
/**
 * Example of a legacy {@link RetractStreamTableSink} implementation.
 *
 * <p>Fix over the previous listing: {@code fieldNames} and {@code fieldTypes}
 * were referenced in {@link #getRecordType()} without ever being declared —
 * the class could not compile. They are now fields set via the constructor.
 */
@Deprecated
public class MyRetractStreamTableSink implements RetractStreamTableSink<Row> {

    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;

    public MyRetractStreamTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    /** Translates (flag, row) change messages into changelog records. */
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        return dataStream
                .process(new ProcessFunction<Tuple2<Boolean, Row>, MyRecord>() {
                    @Override
                    public void processElement(Tuple2<Boolean, Row> value, Context ctx, Collector<MyRecord> out) {
                        Boolean isInsert = value.f0;
                        Row row = value.f1;
                        // true flag = accumulate (INSERT); false flag = retract (DELETE)
                        if (isInsert) {
                            out.collect(MyRecord.fromRow(row, ChangeType.INSERT));
                        } else {
                            out.collect(MyRecord.fromRow(row, ChangeType.DELETE));
                        }
                    }
                })
                .addSink(new MyChangelogSinkFunction());
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }
}
Interface for upsert stream table sinks that can handle insert/update/delete operations:
/**
 * Sink for upsert streams encoded as {@code Tuple2<Boolean, T>}: the flag is
 * {@code true} for an upsert message and {@code false} for a delete message,
 * keyed by the unique key fields the planner supplies.
 *
 * @param <T> type of the payload record
 * @deprecated part of the legacy connector stack
 */
@Deprecated
@PublicEvolving
public interface UpsertStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> {

    /** Receives the unique key fields of the query result from the planner. */
    void setKeyFields(String[] keys);

    /** Receives whether the query produces only insert messages. */
    void setIsAppendOnly(Boolean isAppendOnly);

    /** Type information for the payload part of the change messages. */
    TypeInformation<T> getRecordType();

    /** Wraps the record type with the boolean change flag. */
    default TypeInformation<Tuple2<Boolean, T>> getOutputType() {
        return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
    }
}
Usage Example (Deprecated):
/**
 * Example of a legacy {@link UpsertStreamTableSink} implementation.
 *
 * <p>Fixes over the previous listing: {@code fieldNames}/{@code fieldTypes}
 * were used in {@link #getRecordType()} but never declared, and
 * {@code getFieldIndex} was never defined — the class could not compile.
 * The schema is now passed via the constructor and the lookup helper is added.
 */
@Deprecated
public class MyUpsertStreamTableSink implements UpsertStreamTableSink<Row> {

    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;

    // Populated by the planner before the sink is consumed.
    private String[] keyFields;
    private Boolean isAppendOnly;

    public MyUpsertStreamTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    @Override
    public void setKeyFields(String[] keys) {
        this.keyFields = keys;
    }

    @Override
    public void setIsAppendOnly(Boolean isAppendOnly) {
        this.isAppendOnly = isAppendOnly;
    }

    /** Keys the stream by the concatenated key-field values, then sinks it. */
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        return dataStream
                .keyBy(new KeySelector<Tuple2<Boolean, Row>, String>() {
                    @Override
                    public String getKey(Tuple2<Boolean, Row> value) throws Exception {
                        Row row = value.f1;
                        // Build a composite key from the configured key fields.
                        StringBuilder keyBuilder = new StringBuilder();
                        for (String keyField : keyFields) {
                            int index = getFieldIndex(keyField);
                            keyBuilder.append(row.getField(index)).append("|");
                        }
                        return keyBuilder.toString();
                    }
                })
                .addSink(new MyUpsertSinkFunction(keyFields, isAppendOnly));
    }

    /** Resolves a field name to its position in the table schema. */
    private int getFieldIndex(String fieldName) {
        for (int i = 0; i < fieldNames.length; i++) {
            if (fieldNames[i].equals(fieldName)) {
                return i;
            }
        }
        throw new IllegalArgumentException("Unknown key field: " + fieldName);
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }
}
Abstract class for table sinks based on OutputFormat:
/**
 * Base class for table sinks that write every record through an
 * {@link OutputFormat}.
 *
 * @param <T> type of the records this sink accepts
 * @deprecated part of the legacy connector stack
 */
@Deprecated
public abstract class OutputFormatTableSink<T> implements StreamTableSink<T> {

    /** Returns the {@link OutputFormat} that performs the actual writing. */
    public abstract OutputFormat<T> getOutputFormat();

    /** Routes the whole stream into the configured output format. */
    @Override
    public DataStreamSink<T> consumeDataStream(DataStream<T> dataStream) {
        return dataStream.writeUsingOutputFormat(getOutputFormat());
    }
}
The CSV connector implementations are maintained only for testing the legacy connector stack:
/**
 * API outline (signatures only) of the legacy CSV table source, kept solely
 * for testing the deprecated connector stack.
 */
@Deprecated
public class CsvTableSource extends InputFormatTableSource<Row> {

    // Minimal constructor: path plus schema.
    public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes);

    // Full constructor exposing all parsing options.
    public CsvTableSource(
            String path,
            String[] fieldNames,
            TypeInformation<?>[] fieldTypes,
            String fieldDelim,
            String rowDelim,
            Character quoteCharacter,
            boolean ignoreFirstLine,
            String ignoreComments,
            boolean lenient
    );

    // Preferred construction path: fluent builder.
    public static CsvTableSource.Builder builder();
}
@Deprecated
/**
 * Legacy CSV table sink, kept solely for testing the deprecated connector
 * stack.
 *
 * <p>Fix over the previous listing: {@code path}, {@code fieldDelim},
 * {@code numFiles} and {@code writeMode} were read in
 * {@link #getOutputFormat()} but never declared or stored — the class could
 * not compile. The constructor now retains them as fields.
 */
public class CsvTableSink extends OutputFormatTableSink<Row> {

    private final String path;
    private final String fieldDelim;
    private final int numFiles;
    private final WriteMode writeMode;

    public CsvTableSink(String path, String fieldDelim, int numFiles, WriteMode writeMode) {
        this.path = path;
        this.fieldDelim = fieldDelim;
        this.numFiles = numFiles;
        this.writeMode = writeMode;
    }

    @Override
    public OutputFormat<Row> getOutputFormat() {
        return new CsvOutputFormat<>(path, fieldDelim, numFiles, writeMode);
    }
}
Step 1: Replace Factory Interface
// Old (Deprecated): string-properties factory from the legacy stack.
public class MyConnectorFactory implements StreamTableSourceFactory<Row> {
    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        return new MyLegacyTableSource(properties);
    }
}

// New (Recommended): FLIP-95 factory driven by a typed Context.
public class MyConnectorFactory implements DynamicTableSourceFactory {
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        return new MyModernTableSource(context);
    }
}
Step 2: Replace Source Interface
// Old (Deprecated): source builds a DataStream<Row> directly.
public class MyLegacyTableSource implements StreamTableSource<Row> {
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new MySourceFunction());
    }
}

// New (Recommended): source exposes a scan provider producing RowData.
// Fix over the previous listing: DataStreamScanProvider also requires
// isBounded() (inherited from ScanTableSource.ScanRuntimeProvider), so the
// anonymous class must implement it to compile.
public class MyModernTableSource implements DynamicTableSource {
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new DataStreamScanProvider() {
            @Override
            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                return execEnv.addSource(new MySourceFunction())
                        .map(new MyRowDataMapper());
            }

            @Override
            public boolean isBounded() {
                // Unbounded streaming source; return true for batch sources.
                return false;
            }
        };
    }
}
Step 3: Replace Sink Interface
// Old (Deprecated): sink consumes a DataStream<Row> directly.
public class MyLegacyTableSink implements StreamTableSink<Row> {
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(new MySinkFunction());
    }
}

// New (Recommended): sink exposes a runtime provider consuming RowData.
public class MyModernTableSink implements DynamicTableSink {
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new DataStreamSinkProvider() {
            @Override
            public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
                // Convert internal RowData to the external format, then sink.
                return dataStream
                        .map(new MyRowDataConverter())
                        .addSink(new MySinkFunction());
            }
        };
    }
}
Legacy String-based Properties:
// Old approach: untyped, string-keyed connector configuration.
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "my-connector");
properties.put("connector.host", "localhost");
properties.put("connector.port", "8080");
Modern ConfigOption-based Configuration:
// New approach: typed, validated options declared once and reused.
public class MyConnectorOptions {

    /** Hostname of the external system; defaults to localhost. */
    public static final ConfigOption<String> HOST =
            ConfigOptions.key("host")
                    .stringType()
                    .defaultValue("localhost");

    /** Port of the external system; defaults to 8080. */
    public static final ConfigOption<Integer> PORT =
            ConfigOptions.key("port")
                    .intType()
                    .defaultValue(8080);
}
Legacy connectors have limitations compared to modern ones:
// Organize legacy code clearly.
// Fix: @SuppressWarnings("deprecation") is narrowed from the outer class to
// the nested legacy implementation, so new deprecation warnings elsewhere in
// the class are not silently hidden.
@Deprecated
public class LegacyConnectorSupport {

    /** Legacy implementation kept only until callers finish migrating. */
    @SuppressWarnings("deprecation")
    public static class LegacySource implements StreamTableSource<Row> {
        // Implementation
    }

    /** Migration helper: wraps a legacy source in a FLIP-95 adapter. */
    public static DynamicTableSource migrateToModern(LegacySource legacy) {
        return new ModernSourceAdapter(legacy);
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11